package dk.dma.ais.packet;

import blcjava.util.Objects;
import blcjava.util.function.AndPredicate;
import blcjava.util.function.Consumer;
import blcjava.util.function.Predicate;
import dk.dma.ais.packet.AisPacketStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
/**
 * Default {@link AisPacketStream} implementation.
 *
 * <p>A stream is either a <em>root</em> stream ({@link #root} is {@code null}) or a filtered view
 * created through {@link #filter(Predicate)}. Filtered views share the subscription map of the
 * stream they were created from; packets can only be added through the root stream.
 */
public class AisPacketStreamImpl extends AisPacketStream {
    static final Logger LOG = LoggerFactory.getLogger(AisPacketStreamImpl.class);

    /** Monitor held by {@link #add(AisPacket)} while a packet is fanned out to the subscriptions. */
    private final Object deliveryLock;

    /** Filter applied by subscriptions created on this stream; {@code null} means "accept everything". */
    final Predicate<? super AisPacket> predicate;

    /** The root stream, or {@code null} when this instance is itself the root. */
    final AisPacketStreamImpl root;

    /** Active subscriptions (used as a set: key and value are the same object). */
    final ConcurrentHashMap<SubscriptionImpl, SubscriptionImpl> subscriptions;

    /**
     * A single subscription: a consumer, an optional predicate and a queue of pending packets.
     *
     * <p>Packets are appended to {@link #packets} and handed to the consumer by {@link #deliver()}.
     * Both delivery and cancellation run while holding {@link #lock}.
     */
    class SubscriptionImpl implements AisPacketStream.Subscription {
        final Consumer<? super AisPacket> consumer;

        /** Filter for this subscription; {@code null} means every packet is delivered. */
        final Predicate<? super AisPacket> predicate;

        /** Reaches zero once the subscription has been cancelled. */
        final CountDownLatch cancelled = new CountDownLatch(1);

        /** Number of packets that passed the predicate so far. */
        final AtomicLong count = new AtomicLong();

        /** Guards delivery and cancellation. */
        final ReentrantLock lock = new ReentrantLock();

        /** Packets waiting to be delivered to the consumer. */
        final ConcurrentLinkedQueue<AisPacket> packets = new ConcurrentLinkedQueue<>();

        /**
         * @param predicate filter to apply before delivery, or {@code null} for no filtering
         * @param consumer  receiver of the packets; rejected by {@code Objects.requireNonNull} when null
         */
        SubscriptionImpl(Predicate<? super AisPacket> predicate, Consumer<? super AisPacket> consumer) {
            Objects.requireNonNull(consumer);
            this.predicate = predicate;
            this.consumer = consumer;
        }

        /** Blocks until this subscription has been cancelled. */
        @Override
        public void awaitCancelled() throws InterruptedException {
            this.cancelled.await();
        }

        /**
         * Blocks until this subscription has been cancelled or the timeout elapses.
         *
         * @return {@code true} if the subscription was cancelled before the timeout
         */
        @Override
        public boolean awaitCancelled(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.cancelled.await(j, timeUnit);
        }

        /** Cancels this subscription without an error cause. */
        @Override
        public void cancel() {
            cancel(null);
        }

        /**
         * Cancels this subscription: removes it from the stream, releases threads blocked in
         * {@code awaitCancelled} and, for a {@code StreamConsumer}, invokes {@code end(th)}.
         * Only the first call performs the cancellation; later calls at most log {@code th}.
         *
         * <p>The whole body runs under {@link #lock}, which is always released in {@code finally}.
         * The method is deliberately not {@code synchronized}: {@link #deliver()} calls it while
         * already holding {@link #lock}, so additionally taking the object monitor here would let
         * a concurrent external {@code cancel()} (monitor first, then lock) deadlock against it.
         *
         * @param th the failure that caused the cancellation, or {@code null} for a regular cancel
         */
        void cancel(Throwable th) {
            this.lock.lock();
            try {
                if (th != null) {
                    AisPacketStreamImpl.LOG.error("Cancelling subscription, because of error", th);
                }
                if (this.cancelled.getCount() > 0) {
                    AisPacketStreamImpl.this.subscriptions.remove(this);
                    this.cancelled.countDown();
                    if (this.consumer instanceof AisPacketStream.StreamConsumer) {
                        try {
                            ((AisPacketStream.StreamConsumer) this.consumer).end(th);
                        } catch (RuntimeException e) {
                            // When th != null the primary failure was already logged above;
                            // the secondary failure from end() is not logged again.
                            if (th == null) {
                                AisPacketStreamImpl.LOG.error("Failed to write footer", e);
                            }
                        }
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Drains {@link #packets} into the consumer.
         *
         * <p>If another thread currently holds {@link #lock} this method returns immediately.
         * Otherwise every queued packet that passes the predicate is handed to the consumer; a
         * {@code StreamConsumer} gets {@code begin()} before its first packet. After a drain the
         * queue is re-checked, and the method returns once it is found empty.
         *
         * <p>A {@link RuntimeException} thrown by the predicate or consumer cancels the
         * subscription and stops delivery. Throwing {@code AisPacketStream.CANCEL} cancels without
         * reporting an error.
         */
        void deliver() {
            while (this.lock.tryLock()) {
                try {
                    if (this.packets.isEmpty()) {
                        return;
                    }
                    for (AisPacket packet = this.packets.poll(); packet != null; packet = this.packets.poll()) {
                        try {
                            if (this.predicate == null || this.predicate.test(packet)) {
                                if (this.count.getAndIncrement() == 0
                                        && this.consumer instanceof AisPacketStream.StreamConsumer) {
                                    ((AisPacketStream.StreamConsumer) this.consumer).begin();
                                }
                                this.consumer.accept(packet);
                            }
                        } catch (RuntimeException e) {
                            try {
                                // Identity comparison: CANCEL is a sentinel instance, not an error.
                                cancel(e == AisPacketStream.CANCEL ? null : e);
                            } catch (RuntimeException ignored) {
                                // Best effort: a failure while cancelling must not escape to the producer.
                            }
                            // The subscription is finished; do not retry this packet or deliver more.
                            return;
                        }
                    }
                } finally {
                    this.lock.unlock();
                }
            }
        }

        /** @return {@code true} if this subscription is no longer registered with the stream */
        @Override
        public boolean isCancelled() {
            return !AisPacketStreamImpl.this.subscriptions.containsKey(this);
        }
    }

    /**
     * Creates a root stream with no filter and no subscriptions.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally package-private.
     */
    public AisPacketStreamImpl() {
        this.deliveryLock = new Object();
        this.predicate = null;
        this.subscriptions = new ConcurrentHashMap<>();
        this.root = null;
    }

    /**
     * Creates a filtered view that shares the subscriptions of {@code aisPacketStreamImpl}.
     *
     * @param aisPacketStreamImpl the root stream; must not be null
     * @param predicate           the filter applied to subscriptions of this view; must not be null
     */
    AisPacketStreamImpl(AisPacketStreamImpl aisPacketStreamImpl, Predicate<? super AisPacket> predicate) {
        Objects.requireNonNull(aisPacketStreamImpl);
        Objects.requireNonNull(predicate);
        this.deliveryLock = new Object();
        this.root = aisPacketStreamImpl;
        this.predicate = predicate;
        this.subscriptions = aisPacketStreamImpl.subscriptions;
    }

    /**
     * Queues the packet on every current subscription and triggers delivery.
     *
     * @param aisPacket the packet to publish; must not be null
     * @throws UnsupportedOperationException if this stream is not the root stream
     */
    @Override
    public void add(AisPacket aisPacket) {
        Objects.requireNonNull(aisPacket);
        if (this.root != null) {
            throw new UnsupportedOperationException("Can only add elements to the root stream");
        }
        synchronized (this.deliveryLock) {
            for (SubscriptionImpl subscription : this.subscriptions.keySet()) {
                subscription.packets.add(aisPacket);
                subscription.deliver();
            }
        }
    }

    /**
     * Returns a view of this stream whose subscriptions only receive packets accepted by both
     * this stream's filter (if any) and {@code predicate}.
     *
     * @param predicate the additional filter; must not be null
     * @return a new filtered stream attached to the same root
     */
    @Override
    public AisPacketStream filter(Predicate<? super AisPacket> predicate) {
        Objects.requireNonNull(predicate);
        AisPacketStreamImpl rootStream = this.root == null ? this : this.root;
        Predicate<? super AisPacket> combined;
        if (this.predicate == null) {
            // No filter on this stream yet: use the new predicate as-is instead of
            // handing a null operand to AndPredicate.
            combined = predicate;
        } else {
            combined = new AndPredicate(this.predicate, predicate);
        }
        return new AisPacketStreamImpl(rootStream, combined);
    }

    /**
     * Passes a single packet straight to the subscription's consumer if it satisfies the
     * subscription's predicate. Unlike {@code SubscriptionImpl.deliver()} this bypasses the
     * packet queue, the {@code begin()} callback and the cancel-on-error handling.
     */
    void handlePacket(AisPacket aisPacket, SubscriptionImpl subscriptionImpl) {
        if (subscriptionImpl.predicate == null || subscriptionImpl.predicate.test(aisPacket)) {
            subscriptionImpl.consumer.accept(aisPacket);
        }
    }

    /**
     * Registers a consumer that receives every subsequently added packet accepted by this
     * stream's filter.
     *
     * @param consumer the receiver; must not be null
     * @return the handle used to cancel or await the subscription
     */
    @Override
    public AisPacketStream.Subscription subscribe(Consumer<AisPacket> consumer) {
        SubscriptionImpl subscription = new SubscriptionImpl(this.predicate, consumer);
        this.subscriptions.put(subscription, subscription);
        return subscription;
    }
}
