package com.amplifyframework.datastore.syncengine;

import a6.b;
import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;
import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.graphql.GraphQLResponse;
import com.amplifyframework.api.graphql.SubscriptionType;
import com.amplifyframework.core.Action;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.Consumer;
import com.amplifyframework.core.async.Cancelable;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.core.model.ModelProvider;
import com.amplifyframework.core.model.ModelSchema;
import com.amplifyframework.core.model.SchemaRegistry;
import com.amplifyframework.core.model.SerializedModel;
import com.amplifyframework.datastore.AmplifyDisposables;
import com.amplifyframework.datastore.DataStoreChannelEventName;
import com.amplifyframework.datastore.DataStoreException;
import com.amplifyframework.datastore.appsync.AppSync;
import com.amplifyframework.datastore.appsync.AppSyncExtensions;
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
import com.amplifyframework.datastore.syncengine.SubscriptionEvent;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.hub.HubEvent;
import com.amplifyframework.logging.Logger;
import com.amplifyframework.util.Empty;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import p5.d;

/* loaded from: classes2.dex */
public final class SubscriptionProcessor {
    private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
    private static final long NETWORK_OP_TIMEOUT_SECONDS = 60;
    private static final long TIMEOUT_SECONDS_PER_MODEL = 20;
    private final long adjustedTimeoutSeconds;
    private final AppSync appSync;
    private a6.b<SubscriptionEvent<? extends Model>> buffer;
    private final Merger merger;
    private final ModelProvider modelProvider;
    private final Consumer<Throwable> onFailure;
    private final f5.a ongoingOperationsDisposable;
    private final QueryPredicateProvider queryPredicateProvider;
    private final SchemaRegistry schemaRegistry;

    /* renamed from: com.amplifyframework.datastore.syncengine.SubscriptionProcessor$1 */
    /* loaded from: classes2.dex */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType;

        static {
            int[] iArr = new int[SubscriptionType.values().length];
            $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType = iArr;
            try {
                iArr[SubscriptionType.ON_UPDATE.ordinal()] = 1;
            } catch (NoSuchFieldError unused) {
            }
            try {
                $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[SubscriptionType.ON_DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError unused2) {
            }
            try {
                $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[SubscriptionType.ON_CREATE.ordinal()] = 3;
            } catch (NoSuchFieldError unused3) {
            }
        }
    }

    /* loaded from: classes2.dex */
    public interface AppSyncStep {
        @NonNull
        ModelProviderStep appSync(@NonNull AppSync appSync);
    }

    /* loaded from: classes2.dex */
    public interface BuildStep {
        @NonNull
        SubscriptionProcessor build();
    }

    /* loaded from: classes2.dex */
    public static final class Builder implements AppSyncStep, ModelProviderStep, SchemaRegistryStep, MergerStep, QueryPredicateProviderStep, OnFailureStep, BuildStep {
        private AppSync appSync;
        private Merger merger;
        private ModelProvider modelProvider;
        private Consumer<Throwable> onFailure;
        private QueryPredicateProvider queryPredicateProvider;
        private SchemaRegistry schemaRegistry;

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.AppSyncStep
        @NonNull
        public ModelProviderStep appSync(@NonNull AppSync appSync) {
            Objects.requireNonNull(appSync);
            this.appSync = appSync;
            return this;
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.BuildStep
        @NonNull
        public SubscriptionProcessor build() {
            return new SubscriptionProcessor(this, null);
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.MergerStep
        @NonNull
        public QueryPredicateProviderStep merger(@NonNull Merger merger) {
            Objects.requireNonNull(merger);
            this.merger = merger;
            return this;
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.ModelProviderStep
        @NonNull
        public SchemaRegistryStep modelProvider(@NonNull ModelProvider modelProvider) {
            Objects.requireNonNull(modelProvider);
            this.modelProvider = modelProvider;
            return this;
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.OnFailureStep
        @NonNull
        public BuildStep onFailure(Consumer<Throwable> consumer) {
            Objects.requireNonNull(consumer);
            this.onFailure = consumer;
            return this;
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.QueryPredicateProviderStep
        @NonNull
        public OnFailureStep queryPredicateProvider(QueryPredicateProvider queryPredicateProvider) {
            Objects.requireNonNull(queryPredicateProvider);
            this.queryPredicateProvider = queryPredicateProvider;
            return this;
        }

        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SchemaRegistryStep
        @NonNull
        public MergerStep schemaRegistry(@NonNull SchemaRegistry schemaRegistry) {
            Objects.requireNonNull(schemaRegistry);
            this.schemaRegistry = schemaRegistry;
            return this;
        }
    }

    /* loaded from: classes2.dex */
    public interface MergerStep {
        @NonNull
        QueryPredicateProviderStep merger(@NonNull Merger merger);
    }

    /* loaded from: classes2.dex */
    public interface ModelProviderStep {
        @NonNull
        SchemaRegistryStep modelProvider(@NonNull ModelProvider modelProvider);
    }

    /* loaded from: classes2.dex */
    public interface OnFailureStep {
        @NonNull
        BuildStep onFailure(Consumer<Throwable> consumer);
    }

    /* loaded from: classes2.dex */
    public interface QueryPredicateProviderStep {
        @NonNull
        OnFailureStep queryPredicateProvider(QueryPredicateProvider queryPredicateProvider);
    }

    /* loaded from: classes2.dex */
    public interface SchemaRegistryStep {
        @NonNull
        MergerStep schemaRegistry(@NonNull SchemaRegistry schemaRegistry);
    }

    /* loaded from: classes2.dex */
    public interface SubscriptionMethod {
        <T extends Model> Cancelable subscribe(@NonNull ModelSchema modelSchema, @NonNull Consumer<String> consumer, @NonNull Consumer<GraphQLResponse<ModelWithMetadata<T>>> consumer2, @NonNull Consumer<DataStoreException> consumer3, @NonNull Action action);
    }

    private SubscriptionProcessor(Builder builder) {
        this.appSync = builder.appSync;
        this.modelProvider = builder.modelProvider;
        this.merger = builder.merger;
        this.queryPredicateProvider = builder.queryPredicateProvider;
        this.onFailure = builder.onFailure;
        this.schemaRegistry = builder.schemaRegistry;
        this.ongoingOperationsDisposable = new f5.a();
        this.adjustedTimeoutSeconds = Math.max(60L, Math.max(r0.models().size(), r0.modelSchemas().size()) * TIMEOUT_SECONDS_PER_MODEL);
    }

    public /* synthetic */ SubscriptionProcessor(Builder builder, AnonymousClass1 anonymousClass1) {
        this(builder);
    }

    public static AppSyncStep builder() {
        return new Builder();
    }

    private static SubscriptionEvent.Type fromSubscriptionType(SubscriptionType subscriptionType) {
        int i10 = AnonymousClass1.$SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[subscriptionType.ordinal()];
        if (i10 == 1) {
            return SubscriptionEvent.Type.UPDATE;
        }
        if (i10 == 2) {
            return SubscriptionEvent.Type.DELETE;
        }
        if (i10 == 3) {
            return SubscriptionEvent.Type.CREATE;
        }
        throw new IllegalArgumentException("Unknown subscription type: " + subscriptionType);
    }

    private boolean isExceptionType(DataStoreException dataStoreException, AppSyncExtensions.AppSyncErrorType appSyncErrorType) {
        if (!(dataStoreException instanceof DataStoreException.GraphQLResponseException)) {
            return false;
        }
        GraphQLResponse.Error error = ((DataStoreException.GraphQLResponseException) dataStoreException).getErrors().get(0);
        if (Empty.check(error.getExtensions())) {
            return false;
        }
        return appSyncErrorType.equals(new AppSyncExtensions(error.getExtensions()).getErrorType());
    }

    public static /* synthetic */ void lambda$null$3(SubscriptionType subscriptionType, ModelSchema modelSchema, AtomicReference atomicReference, AbortableCountDownLatch abortableCountDownLatch, String str) {
        LOG.debug("Subscription started for " + subscriptionType.name() + " " + modelSchema.getName() + " subscriptionId: " + str);
        atomicReference.set(str);
        abortableCountDownLatch.countDown();
    }

    public /* synthetic */ void lambda$null$4(AbortableCountDownLatch abortableCountDownLatch, SubscriptionType subscriptionType, ModelSchema modelSchema, DataStoreException dataStoreException) {
        if (isExceptionType(dataStoreException, AppSyncExtensions.AppSyncErrorType.UNAUTHORIZED)) {
            abortableCountDownLatch.countDown();
            LOG.warn("Unauthorized failure:" + subscriptionType.name() + " " + modelSchema.getName());
            return;
        }
        if (!isExceptionType(dataStoreException, AppSyncExtensions.AppSyncErrorType.OPERATION_DISABLED)) {
            if (abortableCountDownLatch.getCount() > 0) {
                abortableCountDownLatch.abort(dataStoreException);
                return;
            } else {
                this.onFailure.accept(dataStoreException);
                return;
            }
        }
        abortableCountDownLatch.countDown();
        LOG.warn("Operation disabled:" + subscriptionType.name() + " " + modelSchema.getName());
    }

    public static void lambda$null$5(AtomicReference atomicReference, e5.m mVar) {
        LOG.debug("Subscription completed:" + ((String) atomicReference.get()));
        d.a aVar = (d.a) mVar;
        if (aVar.a()) {
            return;
        }
        try {
            aVar.f7339a.onComplete();
        } finally {
            h5.a.dispose(aVar);
        }
    }

    public static /* synthetic */ void lambda$startDrainingMutationBuffer$10(f5.b bVar) throws Throwable {
        LOG.info("Starting processing subscription data buffer.");
    }

    public static /* synthetic */ void lambda$startDrainingMutationBuffer$11(Throwable th) throws Throwable {
        LOG.warn("Reading subscriptions buffer has failed.", th);
    }

    public static /* synthetic */ void lambda$startDrainingMutationBuffer$12() throws Throwable {
        LOG.warn("Reading from subscriptions buffer is completed.");
    }

    public static /* synthetic */ void lambda$startSubscriptions$0(f5.b bVar) throws Throwable {
        LOG.info("Starting processing subscription events.");
    }

    public static /* synthetic */ void lambda$startSubscriptions$1(Throwable th) throws Throwable {
        LOG.warn("Reading subscription events has failed.", th);
    }

    public static /* synthetic */ void lambda$startSubscriptions$2() throws Throwable {
        LOG.warn("Reading subscription events is completed.");
    }

    public void lambda$subscriptionObservable$6(AppSync appSync, SubscriptionType subscriptionType, ModelSchema modelSchema, AbortableCountDownLatch abortableCountDownLatch, final e5.m mVar) throws Throwable {
        SubscriptionMethod subscriptionMethodFor = subscriptionMethodFor(appSync, subscriptionType);
        final AtomicReference atomicReference = new AtomicReference();
        v vVar = new v(subscriptionType, modelSchema, atomicReference, abortableCountDownLatch, 1);
        mVar.getClass();
        h5.a.set((d.a) mVar, AmplifyDisposables.fromCancelable(subscriptionMethodFor.subscribe(modelSchema, vVar, new x(mVar, 2), new v(this, abortableCountDownLatch, subscriptionType, modelSchema, 2), new Action() { // from class: com.amplifyframework.datastore.syncengine.a0
            @Override // com.amplifyframework.core.Action
            public final void call() {
                SubscriptionProcessor.lambda$null$5(atomicReference, mVar);
            }
        })));
    }

    public static /* synthetic */ void lambda$subscriptionObservable$7(SubscriptionType subscriptionType, ModelSchema modelSchema, Throwable th) throws Throwable {
        LOG.warn("An error occurred on the remote " + subscriptionType.name() + " subscription for model " + modelSchema.getName(), th);
    }

    public /* synthetic */ boolean lambda$subscriptionObservable$8(ModelSchema modelSchema, ModelWithMetadata modelWithMetadata) throws Throwable {
        return this.queryPredicateProvider.getPredicate(modelSchema.getName()).evaluate(modelWithMetadata.getModel());
    }

    public static /* synthetic */ SubscriptionEvent lambda$subscriptionObservable$9(SubscriptionType subscriptionType, ModelSchema modelSchema, ModelWithMetadata modelWithMetadata) throws Throwable {
        return SubscriptionEvent.builder().type(fromSubscriptionType(subscriptionType)).modelWithMetadata(modelWithMetadata).modelSchema(modelSchema).build();
    }

    public e5.a mergeEvent(SubscriptionEvent<? extends Model> subscriptionEvent) {
        ModelWithMetadata<? extends Model> modelWithMetadata = subscriptionEvent.modelWithMetadata();
        if (!(modelWithMetadata.getModel() instanceof SerializedModel)) {
            return this.merger.merge(modelWithMetadata);
        }
        return this.merger.merge(new ModelWithMetadata(SerializedModel.builder().serializedData(SerializedModel.parseSerializedData(((SerializedModel) modelWithMetadata.getModel()).getSerializedData(), subscriptionEvent.modelSchema().getName(), this.schemaRegistry)).modelSchema(subscriptionEvent.modelSchema()).build(), modelWithMetadata.getSyncMetadata()));
    }

    @VisibleForTesting
    public static SubscriptionMethod subscriptionMethodFor(final AppSync appSync, SubscriptionType subscriptionType) throws DataStoreException {
        int i10 = AnonymousClass1.$SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[subscriptionType.ordinal()];
        final int i11 = 1;
        if (i10 == 1) {
            appSync.getClass();
            final int i12 = 0;
            return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.y
                @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
                public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                    switch (i12) {
                        case 0:
                            return appSync.onUpdate(modelSchema, consumer, consumer2, consumer3, action);
                        case 1:
                            return appSync.onDelete(modelSchema, consumer, consumer2, consumer3, action);
                        default:
                            return appSync.onCreate(modelSchema, consumer, consumer2, consumer3, action);
                    }
                }
            };
        }
        final int i13 = 2;
        if (i10 == 2) {
            appSync.getClass();
            return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.y
                @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
                public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                    switch (i11) {
                        case 0:
                            return appSync.onUpdate(modelSchema, consumer, consumer2, consumer3, action);
                        case 1:
                            return appSync.onDelete(modelSchema, consumer, consumer2, consumer3, action);
                        default:
                            return appSync.onCreate(modelSchema, consumer, consumer2, consumer3, action);
                    }
                }
            };
        }
        if (i10 != 3) {
            throw new DataStoreException("Failed to establish a model subscription.", "Was a new subscription type created?");
        }
        appSync.getClass();
        return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.y
            @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
            public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                switch (i13) {
                    case 0:
                        return appSync.onUpdate(modelSchema, consumer, consumer2, consumer3, action);
                    case 1:
                        return appSync.onDelete(modelSchema, consumer, consumer2, consumer3, action);
                    default:
                        return appSync.onCreate(modelSchema, consumer, consumer2, consumer3, action);
                }
            }
        };
    }

    private <T extends Model> e5.l<SubscriptionEvent<? extends Model>> subscriptionObservable(AppSync appSync, SubscriptionType subscriptionType, AbortableCountDownLatch<DataStoreException> abortableCountDownLatch, ModelSchema modelSchema) {
        p5.e eVar = new p5.e(new p5.d(new l(this, appSync, subscriptionType, modelSchema, abortableCountDownLatch)), new z(subscriptionType, modelSchema), i5.a.c);
        s5.c cVar = z5.a.c;
        Objects.requireNonNull(cVar, "scheduler is null");
        return new p5.n(new p5.h(new p5.n(new p5.q(eVar, cVar).e(cVar), new b(11)), new c(7, this, modelSchema)), new z(subscriptionType, modelSchema));
    }

    public static <T extends Model> ModelWithMetadata<T> unwrapResponse(GraphQLResponse<? extends ModelWithMetadata<T>> graphQLResponse) throws DataStoreException {
        String format = graphQLResponse.hasErrors() ? String.format("Errors on subscription: %s", graphQLResponse.getErrors()) : !graphQLResponse.hasData() ? "Empty data received on subscription." : null;
        if (format == null) {
            return graphQLResponse.getData();
        }
        throw new DataStoreException(format, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION);
    }

    public void startDrainingMutationBuffer() {
        f5.a aVar = this.ongoingOperationsDisposable;
        a6.b<SubscriptionEvent<? extends Model>> bVar = this.buffer;
        b bVar2 = new b(12);
        bVar.getClass();
        l5.i d10 = new p5.j(new p5.f(bVar, bVar2), new n(this, 4)).e(new b(13)).d(new b(14));
        k5.h hVar = new k5.h();
        d10.a(hVar);
        aVar.b(hVar);
    }

    public synchronized void startSubscriptions() throws DataStoreException {
        AbortableCountDownLatch<DataStoreException> abortableCountDownLatch = new AbortableCountDownLatch<>(this.modelProvider.modelNames().size() * SubscriptionType.values().length);
        this.buffer = new a6.b<>(new b.c());
        HashSet hashSet = new HashSet();
        Iterator<ModelSchema> it = this.modelProvider.modelSchemas().values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ModelSchema next = it.next();
            for (SubscriptionType subscriptionType : SubscriptionType.values()) {
                hashSet.add(subscriptionObservable(this.appSync, subscriptionType, abortableCountDownLatch, next));
            }
        }
        f5.a aVar = this.ongoingOperationsDisposable;
        e5.l d10 = e5.l.d(hashSet);
        s5.c cVar = z5.a.c;
        d10.getClass();
        Objects.requireNonNull(cVar, "scheduler is null");
        p5.e eVar = new p5.e(new p5.e(new p5.f(new p5.q(d10, cVar).e(cVar), new b(15)), new b(16), i5.a.c), i5.a.f3214d, new b(17));
        a6.b<SubscriptionEvent<? extends Model>> bVar = this.buffer;
        bVar.getClass();
        b0 b0Var = new b0(bVar, 0);
        a6.b<SubscriptionEvent<? extends Model>> bVar2 = this.buffer;
        bVar2.getClass();
        b0 b0Var2 = new b0(bVar2, 1);
        a6.b<SubscriptionEvent<? extends Model>> bVar3 = this.buffer;
        bVar3.getClass();
        k5.i iVar = new k5.i(b0Var, b0Var2, new b0(bVar3, 2));
        eVar.c(iVar);
        aVar.b(iVar);
        try {
            Logger logger = LOG;
            logger.debug("Waiting for subscriptions to start.");
            if (!abortableCountDownLatch.abortableAwait(this.adjustedTimeoutSeconds, TimeUnit.SECONDS)) {
                throw new DataStoreException("Timed out waiting for subscription processor to start.", "Retry");
            }
            Amplify.Hub.publish(HubChannel.DATASTORE, HubEvent.create(DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED));
            logger.info(String.format(Locale.US, "Started subscription processor for models: %s of types %s.", this.modelProvider.modelNames(), Arrays.toString(SubscriptionType.values())));
        } catch (InterruptedException unused) {
            LOG.warn("Subscription operations were interrupted during setup.");
        }
    }

    public synchronized void stopAllSubscriptionActivity() {
        Logger logger = LOG;
        logger.info("Stopping subscription processor.");
        this.ongoingOperationsDisposable.d();
        logger.info("Stopped subscription processor.");
    }
}
