package org.dna.mqtt.moquette.messaging.spi.impl;

import c.e.a.b;
import c.e.a.f;
import c.e.a.m;
import c.e.a.n;
import com.icecoldapps.serversultimate.packb.x;
import java.net.InetSocketAddress;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.core.session.IoSession;
import org.dna.mqtt.moquette.messaging.spi.IMatchingCondition;
import org.dna.mqtt.moquette.messaging.spi.IMessaging;
import org.dna.mqtt.moquette.messaging.spi.IStorageService;
import org.dna.mqtt.moquette.messaging.spi.impl.HawtDBStorageService;
import org.dna.mqtt.moquette.messaging.spi.impl.events.DisconnectEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.InitEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.MessagingEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.NotifyEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.ProtocolEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.PubAckEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.PublishEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.RepublishEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.events.StopEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.subscriptions.Subscription;
import org.dna.mqtt.moquette.messaging.spi.impl.subscriptions.SubscriptionsStore;
import org.dna.mqtt.moquette.proto.PubCompMessage;
import org.dna.mqtt.moquette.proto.messages.AbstractMessage;
import org.dna.mqtt.moquette.proto.messages.ConnectMessage;
import org.dna.mqtt.moquette.proto.messages.DisconnectMessage;
import org.dna.mqtt.moquette.proto.messages.PubAckMessage;
import org.dna.mqtt.moquette.proto.messages.PubRecMessage;
import org.dna.mqtt.moquette.proto.messages.PubRelMessage;
import org.dna.mqtt.moquette.proto.messages.PublishMessage;
import org.dna.mqtt.moquette.proto.messages.SubAckMessage;
import org.dna.mqtt.moquette.proto.messages.SubscribeMessage;
import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage;
import org.dna.mqtt.moquette.proto.messages.UnsubscribeMessage;
import org.dna.mqtt.moquette.server.ConnectionDescriptor;
import org.dna.mqtt.moquette.server.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class SimpleMessaging implements IMessaging, f<ValueEvent> {
    // Compiler-generated flag that backs `assert` statements; an artifact of decompilation.
    static final /* synthetic */ boolean $assertionsDisabled = false;
    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessaging.class);
    // Host-application object (obfuscated type). This class reads the storage file
    // path from it (processInit) and reports illegal messages through it (onEvent).
    public x _ClassThreadMQTT;
    // Processor that consumes the ring buffer; created and submitted to m_executor in init().
    b<ValueEvent> m_eventProcessor;
    // Single-thread executor that runs m_eventProcessor; shut down by processStop().
    private ExecutorService m_executor;
    // Ring buffer through which every MessagingEvent is handed to onEvent().
    private m<ValueEvent> m_ringBuffer;
    // Persistent store; assigned in processInit(), so it is null until the InitEvent is handled.
    private IStorageService m_storageService;
    private SubscriptionsStore subscriptions = new SubscriptionsStore();
    // Connected clients keyed by client ID; used to find the session to write replies to.
    Map<String, ConnectionDescriptor> m_clientIDs = new HashMap();

    /**
     * Creates the messaging component and remembers the host object.
     *
     * <p>Only the reference is stored here; the executor, ring buffer and
     * storage are set up later by {@link #init()}.
     *
     * @param xVar host-application object kept in {@code _ClassThreadMQTT}
     */
    public SimpleMessaging(x xVar) {
        this._ClassThreadMQTT = xVar;
    }

    /**
     * Places an event into the ring buffer.
     *
     * <p>NOTE(review): the ring buffer type is obfuscated; the three calls look
     * like the usual "claim sequence / fill slot / publish sequence" steps of a
     * Disruptor-style ring buffer - confirm against the real library.
     *
     * @param messagingEvent event to store in the claimed slot
     */
    private void disruptorPublish(MessagingEvent messagingEvent) {
        final long sequence = this.m_ringBuffer.a();
        ValueEvent slot = this.m_ringBuffer.b(sequence);
        slot.setEvent(messagingEvent);
        this.m_ringBuffer.a(sequence);
    }

    /**
     * Builds a PUBLISH message from the given notification and writes it to the
     * session registered for the target client.
     *
     * <p>Delivery is best effort: if no connection is registered for the client,
     * or writing fails, the problem is logged and the method returns normally.
     *
     * @param notifyEvent carries the target client ID, topic, QoS, payload,
     *                    retained flag and (for QoS above MOST_ONE) the message ID
     */
    private void notify(NotifyEvent notifyEvent) {
        LOG.debug("notify invoked with event " + notifyEvent);
        String clientId = notifyEvent.getClientId();
        PublishMessage publishMessage = new PublishMessage();
        publishMessage.setRetainFlag(notifyEvent.isRetained());
        publishMessage.setTopicName(notifyEvent.getTopic());
        publishMessage.setQos(notifyEvent.getQos());
        publishMessage.setPayload(notifyEvent.getMessage());
        // A message ID is only set when the QoS is not MOST_ONE.
        if (publishMessage.getQos() != AbstractMessage.QOSType.MOST_ONE) {
            publishMessage.setMessageID(Integer.valueOf(notifyEvent.getMessageID()));
        }
        try {
            LOG.debug("clientIDs are " + this.m_clientIDs);
            LOG.debug("clientID is " + clientId);
            // Look the descriptor up once and handle "client not connected" explicitly
            // instead of relying on a NullPointerException being swallowed below.
            ConnectionDescriptor descriptor = this.m_clientIDs.get(clientId);
            if (descriptor == null) {
                LOG.error("Cannot notify client {}: no connection is registered for it", clientId);
                return;
            }
            IoSession session = descriptor.getSession();
            LOG.debug("Session for clientId " + clientId + " is " + session);
            if (session == null) {
                LOG.error("Cannot notify client {}: its connection has no session", clientId);
                return;
            }
            session.write(publishMessage);
        } catch (Throwable th) {
            // Deliberately broad: a failed delivery must not stop event processing.
            LOG.error("Failed to deliver publish message to client " + clientId, th);
        }
    }

    /**
     * Tears down a client connection: forgets the client ID, closes the session
     * and marks the client's subscriptions as inactive.
     *
     * @param ioSession session to close
     * @param str       client ID that owned the session
     * @throws InterruptedException declared by the signature; nothing visible here throws it
     */
    private void processDisconnect(IoSession ioSession, String str) throws InterruptedException {
        this.m_clientIDs.remove(str);
        // `true` is passed straight to IoSession.close; presumably "close immediately" - confirm against MINA docs.
        ioSession.close(true);
        this.subscriptions.deactivate(str);
    }

    /**
     * Handles the {@code InitEvent}: creates the HawtDB-backed storage service
     * from the configured storage file, initialises it, and hands it to the
     * subscriptions store.
     */
    private void processInit() {
        IStorageService store = new HawtDBStorageService(this._ClassThreadMQTT.f2945e._mqtt_storagefile);
        this.m_storageService = store;
        store.initStore();
        this.subscriptions.init(store);
    }

    /**
     * Handles a PUBCOMP from a client by dropping the matching in-flight entry.
     *
     * @param str client ID
     * @param i2  message ID; the storage key is the client ID followed by this number
     */
    private void processPubComp(String str, int i2) {
        final String inFlightKey = String.format("%s%d", str, Integer.valueOf(i2));
        this.m_storageService.cleanInFlight(inFlightKey);
    }

    /**
     * Handles a PUBREC from a client by answering with a PUBREL that carries
     * the same message ID.
     *
     * <p>If no connection is registered for the client ID (for example the
     * session has no client ID attribute, or the client is already gone) the
     * reply is skipped and an error is logged, instead of failing with a
     * {@link NullPointerException} inside the event handler.
     *
     * @param str client ID
     * @param i2  message ID to acknowledge
     */
    private void processPubRec(String str, int i2) {
        LOG.debug(String.format("processPubRec invoked for clientID %s ad messageID %d", str, Integer.valueOf(i2)));
        ConnectionDescriptor descriptor = this.m_clientIDs.get(str);
        if (descriptor == null) {
            LOG.error("Cannot send PUBREL for messageID {} to client {}: no connection is registered for it", Integer.valueOf(i2), str);
            return;
        }
        PubRelMessage pubRelMessage = new PubRelMessage();
        pubRelMessage.setMessageID(Integer.valueOf(i2));
        descriptor.getSession().write(pubRelMessage);
    }

    /**
     * Handles a {@code RepublishEvent}: sends every publish that was persisted
     * for the client to that client via {@code notify}.
     *
     * @param republishEvent names the client whose stored publishes are replayed
     * @throws InterruptedException declared by the signature; nothing visible here throws it
     */
    private void processRepublish(RepublishEvent republishEvent) throws InterruptedException {
        LOG.debug("processRepublish invoked");
        final List<PublishEvent> stored = this.m_storageService.retrivePersistedPublishes(republishEvent.getClientID());
        if (stored != null) {
            for (PublishEvent pending : stored) {
                // Replayed messages are always sent with the retained flag cleared.
                NotifyEvent replay = new NotifyEvent(pending.getClientID(), pending.getTopic(), pending.getQos(), pending.getMessage(), false, pending.getMessageID());
                notify(replay);
            }
            return;
        }
        LOG.debug("processRepublish, no stored publish events");
    }

    /**
     * Handles the {@code StopEvent}: closes the storage service and then
     * shuts down the executor that runs the event processor.
     */
    private void processStop() {
        LOG.debug("processStop invoked");
        this.m_storageService.close();
        // shutdown() stops accepting new tasks; it does not interrupt the running processor.
        this.m_executor.shutdown();
    }

    /**
     * Fans a published message out to every subscription that matches the topic.
     *
     * <ul>
     *   <li>QoS MOST_ONE: the subscriber is notified directly, without a message ID.</li>
     *   <li>Higher QoS, subscriber neither clean-session nor active: the publish is
     *       stored for later delivery.</li>
     *   <li>Higher QoS otherwise: the subscriber is notified with the message ID;
     *       for EXACTLY_ONCE the publish is first recorded as in flight.</li>
     * </ul>
     *
     * <p>The message ID is passed on to the notification for QoS above MOST_ONE.
     * Previously it was dropped, although {@code notify} copies it into the
     * outgoing message and the in-flight key is built from it.
     *
     * @param str     topic the message was published on
     * @param qOSType QoS of the publish
     * @param bArr    payload
     * @param z       retain flag of the original publish (kept in stored events)
     * @param num     message ID; must be non-null when the QoS is above MOST_ONE
     */
    private void publish2Subscribers(String str, AbstractMessage.QOSType qOSType, byte[] bArr, boolean z, Integer num) {
        for (Subscription subscription : this.subscriptions.matches(str)) {
            if (qOSType == AbstractMessage.QOSType.MOST_ONE) {
                notify(new NotifyEvent(subscription.getClientId(), str, qOSType, bArr, false));
                continue;
            }
            if (!subscription.isCleanSession() && !subscription.isActive()) {
                // Persistent session that is currently offline: keep the message for later.
                this.m_storageService.storePublishForFuture(new PublishEvent(str, qOSType, bArr, z, subscription.getClientId(), num.intValue(), null));
                continue;
            }
            if (qOSType == AbstractMessage.QOSType.EXACTLY_ONCE) {
                // Key format (client ID followed by message ID) matches the one processPubComp cleans up.
                this.m_storageService.addInFlight(new PublishEvent(str, qOSType, bArr, z, subscription.getClientId(), num.intValue(), null), String.format("%s%d", subscription.getClientId(), num));
            }
            notify(new NotifyEvent(subscription.getClientId(), str, qOSType, bArr, false, num.intValue()));
        }
    }

    /**
     * Re-queues an event by delegating to {@code disruptorPublish}.
     *
     * <p>This private method has no callers in this class.
     *
     * @param messagingEvent event to put back into the ring buffer
     */
    private void refill(MessagingEvent messagingEvent) {
        disruptorPublish(messagingEvent);
    }

    /**
     * Sends a PUBACK for the given event to the client that published the message.
     *
     * <p>Delivery is best effort: if no connection is registered for the client,
     * or writing fails, the problem is logged and the method returns normally.
     *
     * @param pubAckEvent carries the client ID and the message ID to acknowledge
     */
    private void sendPubAck(PubAckEvent pubAckEvent) {
        LOG.debug("sendPubAck invoked");
        String clientID = pubAckEvent.getClientID();
        PubAckMessage pubAckMessage = new PubAckMessage();
        pubAckMessage.setMessageID(Integer.valueOf(pubAckEvent.getMessageId()));
        try {
            LOG.debug("clientIDs are " + this.m_clientIDs);
            // Single lookup with an explicit "not connected" branch, rather than
            // letting a NullPointerException fall into the catch-all below.
            ConnectionDescriptor descriptor = this.m_clientIDs.get(clientID);
            if (descriptor == null) {
                LOG.error("Cannot send PUBACK to client {}: no connection is registered for it", clientID);
                return;
            }
            IoSession session = descriptor.getSession();
            LOG.debug("Session for clientId " + clientID + " is " + session);
            session.write(pubAckMessage);
        } catch (Throwable th) {
            // Deliberately broad: a failed acknowledgement must not stop event processing.
            LOG.error("Failed to send PUBACK to client " + clientID, th);
        }
    }

    /**
     * Sends a PUBCOMP with the given message ID to the client.
     *
     * <p>If no connection is registered for the client ID the reply is skipped
     * and an error is logged, instead of failing with a
     * {@link NullPointerException} inside the event handler.
     *
     * @param str client ID
     * @param i2  message ID being completed
     */
    private void sendPubComp(String str, int i2) {
        LOG.debug(String.format("sendPubComp invoked for clientID %s ad messageID %d", str, Integer.valueOf(i2)));
        ConnectionDescriptor descriptor = this.m_clientIDs.get(str);
        if (descriptor == null) {
            LOG.error("Cannot send PUBCOMP for messageID {} to client {}: no connection is registered for it", Integer.valueOf(i2), str);
            return;
        }
        PubCompMessage pubCompMessage = new PubCompMessage();
        pubCompMessage.setMessageID(Integer.valueOf(i2));
        descriptor.getSession().write(pubCompMessage);
    }

    /**
     * Sends a PUBREC with the given message ID to the client.
     *
     * <p>If no connection is registered for the client ID the reply is skipped
     * and an error is logged, instead of failing with a
     * {@link NullPointerException} inside the event handler.
     *
     * @param str client ID
     * @param i2  message ID being acknowledged
     */
    private void sendPubRec(String str, int i2) {
        LOG.debug(String.format("sendPubRec invoked for clientID %s ad messageID %d", str, Integer.valueOf(i2)));
        ConnectionDescriptor descriptor = this.m_clientIDs.get(str);
        if (descriptor == null) {
            LOG.error("Cannot send PUBREC for messageID {} to client {}: no connection is registered for it", Integer.valueOf(i2), str);
            return;
        }
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(Integer.valueOf(i2));
        descriptor.getSession().write(pubRecMessage);
    }

    /**
     * Registers one subscription and then sends the subscriber every stored
     * message whose topic matches the subscribed topic.
     *
     * @param subscription subscription to add to the store
     * @param str          subscribed topic, used both for matching stored
     *                     messages and as the topic of the notifications sent
     */
    private void subscribeSingleTopic(Subscription subscription, final String str) {
        this.subscriptions.add(subscription);

        IMatchingCondition topicFilter = new IMatchingCondition() {
            @Override // org.dna.mqtt.moquette.messaging.spi.IMatchingCondition
            public boolean match(String str2) {
                return SubscriptionsStore.matchTopics(str2, str);
            }
        };

        for (HawtDBStorageService.StoredMessage stored : this.m_storageService.searchMatching(topicFilter)) {
            LOG.debug("Inserting NotifyEvent into outbound for topic " + str);
            // Stored messages are delivered with the retained flag set.
            NotifyEvent retainedNotification = new NotifyEvent(subscription.getClientId(), str, stored.getQos(), stored.getPayload(), true);
            notify(retainedNotification);
        }
    }

    /**
     * Queues a {@code DisconnectEvent} for the session; the actual teardown
     * happens when {@link #onEvent} receives that event.
     *
     * @param ioSession session that is going away
     */
    @Override // org.dna.mqtt.moquette.messaging.spi.IMessaging
    public void disconnect(IoSession ioSession) {
        DisconnectEvent disconnectEvent = new DisconnectEvent(ioSession);
        disruptorPublish(disconnectEvent);
    }

    /**
     * Returns the live subscriptions store used by this instance (not a copy).
     *
     * @return the subscriptions store
     */
    protected SubscriptionsStore getSubscriptions() {
        return this.subscriptions;
    }

    /**
     * Queues a received protocol message together with its session; it is
     * dispatched by message type when {@link #onEvent} receives the event.
     *
     * @param ioSession       session the message arrived on
     * @param abstractMessage decoded protocol message
     */
    @Override // org.dna.mqtt.moquette.messaging.spi.IMessaging
    public void handleProtocolMessage(IoSession ioSession, AbstractMessage abstractMessage) {
        ProtocolEvent protocolEvent = new ProtocolEvent(ioSession, abstractMessage);
        disruptorPublish(protocolEvent);
    }

    /**
     * Sets up the executor, ring buffer and event processor, starts the
     * processor, and queues the initial {@code InitEvent}.
     *
     * <p>NOTE(review): {@code m} and {@code b} are obfuscated third-party types
     * (package {@code c.e.a}). Judging by how they are used they look like the
     * ring buffer and batch event processor of a Disruptor-style library;
     * confirm against the un-obfuscated dependency before changing this wiring.
     */
    public void init() {
        // One worker thread, so events are handled by onEvent one at a time.
        this.m_executor = Executors.newFixedThreadPool(1);
        // 32768 is the second constructor argument - presumably the buffer capacity.
        this.m_ringBuffer = new m<>(ValueEvent.EVENT_FACTORY, 32768);
        // `this` is passed as the handler; this class implements f<ValueEvent> via onEvent.
        this.m_eventProcessor = new b<>(this.m_ringBuffer, this.m_ringBuffer.a(new n[0]), this);
        // Presumably registers the processor's sequence as a gating sequence - TODO confirm.
        this.m_ringBuffer.b(this.m_eventProcessor.a());
        this.m_executor.submit(this.m_eventProcessor);
        disruptorPublish(new InitEvent());
    }

    /**
     * Computes the MD5 digest of a string's UTF-8 bytes.
     *
     * @param str text to hash; must not be {@code null}
     * @return the digest as 32 lowercase hexadecimal characters
     * @throws Exception if the MD5 algorithm or the UTF-8 charset is unavailable
     */
    public String md5(String str) throws Exception {
        final char[] hexDigits = "0123456789abcdef".toCharArray();
        byte[] hash = MessageDigest.getInstance("MD5").digest(str.getBytes("UTF-8"));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (int i = 0; i < hash.length; i++) {
            // Mask to treat the byte as unsigned, then emit high and low nibble.
            int unsigned = hash[i] & 0xFF;
            hex.append(hexDigits[unsigned >>> 4]);
            hex.append(hexDigits[unsigned & 0x0F]);
        }
        return hex.toString();
    }

    /**
     * Event-handler callback: takes the {@code MessagingEvent} out of the ring
     * buffer slot and dispatches it by type.
     *
     * <p>Internal events (publish, stop, disconnect, republish, init) go to the
     * matching {@code processXxx} method. A {@code ProtocolEvent} is dispatched
     * further by the type of the protocol message it carries; the client ID and
     * clean-session flag are read from the session attributes. Events of any
     * other type are ignored.
     *
     * @param valueEvent ring buffer slot holding the event
     * @param j          second callback argument (not used here)
     * @param z          third callback argument (not used here)
     * @throws RuntimeException if a protocol message of an unsupported type is
     *                          received; the incident is reported to the host
     *                          object first
     * @throws Exception        whatever the invoked handlers throw
     */
    @Override // c.e.a.f
    public void onEvent(ValueEvent valueEvent, long j, boolean z) throws Exception {
        final MessagingEvent event = valueEvent.getEvent();
        LOG.debug("onEvent processing messaging event " + event);

        if (event instanceof PublishEvent) {
            processPublish((PublishEvent) event);
        } else if (event instanceof StopEvent) {
            processStop();
        } else if (event instanceof DisconnectEvent) {
            DisconnectEvent disconnectEvent = (DisconnectEvent) event;
            IoSession closingSession = disconnectEvent.getSession();
            String closingClientId = (String) disconnectEvent.getSession().getAttribute(Constants.ATTR_CLIENTID);
            processDisconnect(closingSession, closingClientId);
        } else if (event instanceof RepublishEvent) {
            processRepublish((RepublishEvent) event);
        } else if (event instanceof ProtocolEvent) {
            ProtocolEvent protocolEvent = (ProtocolEvent) event;
            final IoSession session = protocolEvent.getSession();
            final AbstractMessage message = protocolEvent.getMessage();

            if (message instanceof ConnectMessage) {
                processConnect(session, (ConnectMessage) message);
            } else if (message instanceof PublishMessage) {
                PublishMessage publishMessage = (PublishMessage) message;
                String publisherId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                PublishEvent publishEvent;
                if (message.getQos() == AbstractMessage.QOSType.MOST_ONE) {
                    // MOST_ONE publishes are built without a message ID.
                    publishEvent = new PublishEvent(publishMessage.getTopicName(), publishMessage.getQos(), publishMessage.getPayload(), publishMessage.isRetainFlag(), publisherId, session);
                } else {
                    publishEvent = new PublishEvent(publishMessage.getTopicName(), publishMessage.getQos(), publishMessage.getPayload(), publishMessage.isRetainFlag(), publisherId, publishMessage.getMessageID().intValue(), session);
                }
                processPublish(publishEvent);
            } else if (message instanceof DisconnectMessage) {
                String leavingClientId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                boolean cleanSession = ((Boolean) session.getAttribute(Constants.CLEAN_SESSION)).booleanValue();
                if (cleanSession) {
                    processRemoveAllSubscriptions(leavingClientId);
                }
                processDisconnect(session, leavingClientId);
            } else if (message instanceof UnsubscribeMessage) {
                UnsubscribeMessage unsubscribeMessage = (UnsubscribeMessage) message;
                String unsubscriberId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                processUnsubscribe(session, unsubscriberId, unsubscribeMessage.topics(), unsubscribeMessage.getMessageID().intValue());
            } else if (message instanceof SubscribeMessage) {
                SubscribeMessage subscribeMessage = (SubscribeMessage) message;
                String subscriberId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                boolean cleanSession = ((Boolean) session.getAttribute(Constants.CLEAN_SESSION)).booleanValue();
                processSubscribe(session, subscribeMessage, subscriberId, cleanSession);
            } else if (message instanceof PubRelMessage) {
                String releasingClientId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                int releasedId = ((PubRelMessage) message).getMessageID().intValue();
                processPubRel(releasingClientId, releasedId);
            } else if (message instanceof PubRecMessage) {
                String receivingClientId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                int receivedId = ((PubRecMessage) message).getMessageID().intValue();
                processPubRec(receivingClientId, receivedId);
            } else if (message instanceof PubCompMessage) {
                String completingClientId = (String) session.getAttribute(Constants.ATTR_CLIENTID);
                int completedId = ((PubCompMessage) message).getMessageID().intValue();
                processPubComp(completingClientId, completedId);
            } else {
                // Unknown message type: report it with the peer address (empty if
                // the address cannot be determined) and fail.
                String remoteHost;
                try {
                    remoteHost = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();
                } catch (Exception ignored) {
                    remoteHost = "";
                }
                this._ClassThreadMQTT.f2942b.b("Illegal message received: " + message + "...", remoteHost);
                throw new RuntimeException("Illegal message received " + message);
            }
        } else if (event instanceof InitEvent) {
            processInit();
        }
    }

    /* JADX WARN: Can't wrap try/catch for region: R(15:32|33|34|(11:36|38|39|(1:41)|(1:44)|(1:46)|47|(4:55|(4:59|60|(1:62)|63)|50|(2:52|53)(1:54))|49|50|(0)(0))|69|38|39|(0)|(0)|(0)|47|(0)|49|50|(0)(0)) */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x01af, code lost:
    
        if ((r6 != null ? r6 : "").equals(r2) != false) goto L65;
     */
    /* JADX WARN: Removed duplicated region for block: B:41:0x0153 A[Catch: Exception -> 0x0158, TRY_LEAVE, TryCatch #3 {Exception -> 0x0158, blocks: (B:39:0x014d, B:41:0x0153), top: B:38:0x014d }] */
    /* JADX WARN: Removed duplicated region for block: B:44:0x015b  */
    /* JADX WARN: Removed duplicated region for block: B:46:0x015e  */
    /* JADX WARN: Removed duplicated region for block: B:52:0x01b5  */
    /* JADX WARN: Removed duplicated region for block: B:54:0x01dd  */
    /* JADX WARN: Removed duplicated region for block: B:55:0x016b  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Handles a CONNECT message; {@code onEvent} routes every
     * {@code ConnectMessage} to this method.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method (see
     * the JADX warnings above). The body below is only a placeholder that
     * always throws {@link UnsupportedOperationException}; the real logic has
     * to be recovered from the bytecode before connections can be accepted.
     *
     * @param r14 session the CONNECT arrived on
     * @param r15 the CONNECT message
     */
    protected void processConnect(org.apache.mina.core.session.IoSession r14, org.dna.mqtt.moquette.proto.messages.ConnectMessage r15) {
        /*
            Method dump skipped, instructions count: 559
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.processConnect(org.apache.mina.core.session.IoSession, org.dna.mqtt.moquette.proto.messages.ConnectMessage):void");
    }

    /**
     * Handles a PUBREL from a publishing client: forwards the stored QoS 2
     * message to the subscribers, removes it from storage, stores it as
     * retained if it was flagged so, and answers with a PUBCOMP.
     *
     * <p>If no message is stored under the key (for example a retransmitted
     * PUBREL whose message was already released) only the PUBCOMP is sent,
     * instead of failing with a {@link NullPointerException}.
     * NOTE(review): this assumes {@code retrieveQoS2Message} returns
     * {@code null} for an unknown key - confirm against the storage service.
     *
     * @param str client ID of the publisher
     * @param i2  message ID being released
     */
    protected void processPubRel(String str, int i2) {
        String format = String.format("%s%d", str, Integer.valueOf(i2));
        PublishEvent retrieveQoS2Message = this.m_storageService.retrieveQoS2Message(format);
        if (retrieveQoS2Message == null) {
            LOG.warn("processPubRel: no stored QoS 2 message for key {}, replying with PUBCOMP only", format);
            sendPubComp(str, i2);
            return;
        }
        String topic = retrieveQoS2Message.getTopic();
        AbstractMessage.QOSType qos = retrieveQoS2Message.getQos();
        byte[] message = retrieveQoS2Message.getMessage();
        boolean isRetain = retrieveQoS2Message.isRetain();
        publish2Subscribers(topic, qos, message, isRetain, Integer.valueOf(retrieveQoS2Message.getMessageID()));
        this.m_storageService.removeQoS2Message(format);
        if (isRetain) {
            this.m_storageService.storeRetained(topic, message, qos);
        }
        sendPubComp(str, i2);
    }

    /**
     * Processes a publish: performs the QoS-specific bookkeeping, forwards the
     * message to matching subscribers, acknowledges it where required, and
     * stores it as retained when the retain flag is set.
     *
     * <ul>
     *   <li>LEAST_ONE: tracked as in flight while forwarding, then cleaned up
     *       and acknowledged with a PUBACK.</li>
     *   <li>EXACTLY_ONCE: persisted as a QoS 2 message and answered with a
     *       PUBREC before forwarding.</li>
     *   <li>Any other QoS: forwarded without bookkeeping.</li>
     * </ul>
     *
     * <p>NOTE(review): for EXACTLY_ONCE the message is forwarded to subscribers
     * here and again from {@code processPubRel} - confirm that this double
     * fan-out is intended.
     *
     * @param publishEvent the publish to process
     */
    protected void processPublish(PublishEvent publishEvent) {
        LOG.debug("processPublish invoked with " + publishEvent);
        final String topic = publishEvent.getTopic();
        final AbstractMessage.QOSType qos = publishEvent.getQos();
        final byte[] payload = publishEvent.getMessage();
        final boolean retain = publishEvent.isRetain();
        final boolean atLeastOnce = qos == AbstractMessage.QOSType.LEAST_ONE;

        String inFlightKey = null;
        if (atLeastOnce) {
            inFlightKey = String.format("%s%d", publishEvent.getClientID(), Integer.valueOf(publishEvent.getMessageID()));
            this.m_storageService.addInFlight(publishEvent, inFlightKey);
        } else if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
            String qos2Key = String.format("%s%d", publishEvent.getClientID(), Integer.valueOf(publishEvent.getMessageID()));
            this.m_storageService.persistQoS2Message(qos2Key, publishEvent);
            sendPubRec(publishEvent.getClientID(), publishEvent.getMessageID());
        }

        publish2Subscribers(topic, qos, payload, retain, Integer.valueOf(publishEvent.getMessageID()));

        if (atLeastOnce) {
            this.m_storageService.cleanInFlight(inFlightKey);
            sendPubAck(new PubAckEvent(publishEvent.getMessageID(), publishEvent.getClientID()));
        }
        if (retain) {
            this.m_storageService.storeRetained(topic, payload, qos);
        }
    }

    /**
     * Removes every subscription of the client and discards the publishes
     * that were persisted for it.
     *
     * @param str client ID
     */
    protected void processRemoveAllSubscriptions(String str) {
        LOG.debug("processRemoveAllSubscriptions invoked");
        this.subscriptions.removeForClient(str);
        this.m_storageService.cleanPersistedPublishes(str);
    }

    /**
     * Handles a SUBSCRIBE: registers each requested topic (which also delivers
     * matching stored messages) and replies with a SUBACK.
     *
     * <p>The SUBACK carries one MOST_ONE entry per requested subscription,
     * whatever QoS was requested.
     *
     * @param ioSession        session to write the SUBACK to
     * @param subscribeMessage the SUBSCRIBE message
     * @param str              client ID of the subscriber
     * @param z                clean-session flag recorded on each subscription
     */
    protected void processSubscribe(IoSession ioSession, SubscribeMessage subscribeMessage, String str, boolean z) {
        LOG.debug("processSubscribe invoked");
        for (SubscribeMessage.Couple requested : subscribeMessage.subscriptions()) {
            // The requested QoS value is used as an index into the QOSType constants.
            Subscription newSubscription = new Subscription(str, requested.getTopic(), AbstractMessage.QOSType.values()[requested.getQos()], z);
            subscribeSingleTopic(newSubscription, requested.getTopic());
        }

        SubAckMessage ack = new SubAckMessage();
        ack.setMessageID(subscribeMessage.getMessageID());
        int remaining = subscribeMessage.subscriptions().size();
        while (remaining > 0) {
            ack.addType(AbstractMessage.QOSType.MOST_ONE);
            remaining--;
        }

        LOG.info("replying with SubAct to MSG ID " + subscribeMessage.getMessageID());
        ioSession.write(ack);
    }

    /**
     * Handles an UNSUBSCRIBE: removes the client's subscription for each listed
     * topic and replies with an UNSUBACK carrying the same message ID.
     *
     * @param ioSession session to write the UNSUBACK to
     * @param str       client ID of the unsubscriber
     * @param list      topics to unsubscribe from
     * @param i2        message ID of the UNSUBSCRIBE, echoed in the reply
     */
    protected void processUnsubscribe(IoSession ioSession, String str, List<String> list, int i2) {
        // Was logged as "processSubscribe invoked" (copy-paste slip).
        LOG.debug("processUnsubscribe invoked");
        for (String topic : list) {
            this.subscriptions.removeSubscription(topic, str);
        }
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageID(Integer.valueOf(i2));
        // SLF4J substitutes "{}" only; the former "{0}" was printed literally.
        LOG.info("replying with UnsubAck to MSG ID {}", Integer.valueOf(i2));
        ioSession.write(unsubAckMessage);
    }

    /**
     * Queues a {@code RepublishEvent} so that the publishes stored for the
     * client are replayed when {@link #onEvent} receives the event.
     *
     * @param str client ID whose stored messages should be republished
     */
    @Override // org.dna.mqtt.moquette.messaging.spi.IMessaging
    public void republishStored(String str) {
        LOG.debug("republishStored invoked to publish soterd messages for clientID " + str);
        RepublishEvent republishEvent = new RepublishEvent(str);
        disruptorPublish(republishEvent);
    }

    /**
     * Queues a {@code StopEvent}; storage and executor are shut down when
     * {@link #onEvent} receives it.
     */
    @Override // org.dna.mqtt.moquette.messaging.spi.IMessaging
    public void stop() {
        StopEvent stopEvent = new StopEvent();
        disruptorPublish(stopEvent);
    }
}
