package com.spark.indy.android.features.inbox;

import a9.e;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.g;
import androidx.lifecycle.m;
import com.segment.analytics.integrations.BasePayload;
import com.spark.indy.android.data.remote.network.grpc.dispatcher.Dispatcher;
import com.spark.indy.android.managers.GrpcManager;
import e7.o;
import fa.z;
import fa.z0;
import i7.d;
import i7.f;
import ia.b;
import ia.p0;
import ia.w0;
import q7.p;
import r7.k;
import za.h;

/* loaded from: classes2.dex */
/**
 * Implementation of {@link h} for the inbox feature.
 *
 * <p>This source is decompiler output with obfuscated library names, so the exact
 * identity of types such as {@code z0}, {@code w0}, {@code g} and {@code m} cannot be
 * confirmed from this file alone. NOTE(review): they look like kotlinx.coroutines
 * {@code Job} / {@code MutableSharedFlow} and the AndroidX lifecycle
 * {@code CoroutineLiveData} / {@code asLiveData} helpers -- confirm against the
 * mapping file before relying on that.
 *
 * <p>No synchronization is applied to {@link #job}; callers are presumably expected to
 * invoke {@link #connect()} and {@link #closeSocket()} from a single thread -- TODO confirm.
 */
public final class WebSocketServiceImpl implements h {
    /** Flow of strings that {@link #getMessageStream()} exposes as {@link LiveData}. */
    private final p0<String> conversationIdFlow;
    /** Context handed to {@code z.b(...)} each time {@link #connect()} launches work. */
    private final f coroutineContext;
    /** Not read inside this class body; presumably used by the generated {@code $1} helper classes. */
    private final GrpcManager grpcManager;
    /** Handle returned by the most recent {@link #connect()}; {@code null} when nothing is active. */
    private z0 job;

    /**
     * Creates the service.
     *
     * @param grpcManager checked for null and stored
     * @param fVar        coroutine context, checked for null and stored
     */
    public WebSocketServiceImpl(GrpcManager grpcManager, f fVar) {
        k.f(grpcManager, "grpcManager");
        k.f(fVar, "coroutineContext");
        this.grpcManager = grpcManager;
        this.coroutineContext = fVar;
        // The trailing 7 looks like a Kotlin "$default" bitmask selecting the default value
        // for all three preceding arguments -- TODO confirm.
        this.conversationIdFlow = w0.b(0, 0, null, 7);
    }

    /**
     * Wraps {@code pVar} in a {@code WebSocketServiceImpl$grpcFlow$1} block and returns it as a flow.
     *
     * @param pVar suspend-style callback receiving a {@code c6.k<T>} and a continuation
     * @return a new {@code ia.b} built around the wrapped block
     */
    private final <T> ia.f<T> grpcFlow(p<? super c6.k<T>, ? super d<? super o>, ? extends Object> pVar) {
        return new b(new WebSocketServiceImpl$grpcFlow$1(pVar, null), null, 0, null, 14);
    }

    /**
     * Builds the flow of {@link Dispatcher.Message} values by passing a
     * {@code WebSocketServiceImpl$startMessagingStream$1} block to {@link #grpcFlow}.
     *
     * @return the message flow
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final ia.f<Dispatcher.Message> startMessagingStream() {
        return grpcFlow(new WebSocketServiceImpl$startMessagingStream$1(this, null));
    }

    /**
     * Cancels the handle stored by the last {@link #connect()}, if any, and clears it.
     * Does nothing when no handle is stored.
     */
    @Override // za.h
    public void closeSocket() {
        final z0 active = this.job;
        if (active == null) {
            return;
        }
        // Drop the reference first so the finished handle is neither retained nor cancelled twice.
        this.job = null;
        active.c(null);
    }

    /**
     * Launches a {@code WebSocketServiceImpl$connect$1} block on a scope derived from the
     * stored coroutine context and remembers the returned handle.
     *
     * <p>Any handle left over from an earlier call is cancelled first. Without that, a second
     * {@code connect()} would overwrite {@link #job} and leave the earlier work running with
     * no way for {@link #closeSocket()} to reach it.
     */
    @Override // za.h
    public void connect() {
        closeSocket();
        this.job = e.C(z.b(this.coroutineContext), null, 0, new WebSocketServiceImpl$connect$1(this, null), 3, null);
    }

    /**
     * Creates a new {@link LiveData} of {@link Throwable} backed by a
     * {@code WebSocketServiceImpl$getErrorStream$1} block.
     *
     * <p>A fresh instance is built on every call. The block is constructed without a reference
     * to this service, so what it emits cannot be determined from this file.
     *
     * @return the newly created live data
     */
    public LiveData<Throwable> getErrorStream() {
        final WebSocketServiceImpl$getErrorStream$1 block = new WebSocketServiceImpl$getErrorStream$1(null);
        final i7.h context = i7.h.f14681a;
        // Null checks carried over from the inlined builder call; kept so behaviour is unchanged.
        k.f(context, BasePayload.CONTEXT_KEY);
        k.f(block, "block");
        // 5000L is presumably a timeout in milliseconds -- TODO confirm against the builder's signature.
        return new g(context, 5000L, block);
    }

    /**
     * Exposes {@link #conversationIdFlow} as {@link LiveData}.
     *
     * @return live data produced by {@code m.a(...)} from the conversation-id flow
     */
    @Override // za.h
    public LiveData<String> getMessageStream() {
        // The trailing 3 looks like a "$default" bitmask for the two preceding arguments -- TODO confirm.
        return m.a(this.conversationIdFlow, null, 0L, 3);
    }

    /**
     * Not implemented: always throws.
     *
     * @throws e7.h on every call (presumably Kotlin's {@code NotImplementedError} -- TODO confirm)
     */
    public void reconnect() {
        throw new e7.h("An operation is not implemented: Not yet implemented");
    }
}
