package org.dna.mqtt.moquette.messaging.spi.impl;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.dna.mqtt.moquette.MQTTException;
import org.dna.mqtt.moquette.messaging.spi.IMatchingCondition;
import org.dna.mqtt.moquette.messaging.spi.IStorageService;
import org.dna.mqtt.moquette.messaging.spi.impl.events.PublishEvent;
import org.dna.mqtt.moquette.messaging.spi.impl.subscriptions.Subscription;
import org.dna.mqtt.moquette.proto.messages.AbstractMessage;
import org.fusesource.hawtbuf.codec.StringCodec;
import org.fusesource.hawtdb.api.BTreeIndexFactory;
import org.fusesource.hawtdb.api.MultiIndexFactory;
import org.fusesource.hawtdb.api.PageFileFactory;
import org.fusesource.hawtdb.api.SortedIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class HawtDBStorageService implements IStorageService {
    private static final Logger LOG = LoggerFactory.getLogger(HawtDBStorageService.class);
    private SortedIndex<String, PublishEvent> m_inflightStore;
    private MultiIndexFactory m_multiIndexFactory;
    private SortedIndex<String, List<PublishEvent>> m_persistentMessageStore;
    private SortedIndex<String, Set<Subscription>> m_persistentSubscriptions;
    private SortedIndex<String, PublishEvent> m_qos2Store;
    private SortedIndex<String, StoredMessage> m_retainedStore;
    private PageFileFactory pageFactory = new PageFileFactory();

    /* loaded from: classes.dex */
    public static class StoredMessage implements Serializable {
        byte[] m_payload;
        AbstractMessage.QOSType m_qos;

        StoredMessage(byte[] bArr, AbstractMessage.QOSType qOSType) {
            this.m_qos = qOSType;
            this.m_payload = bArr;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public byte[] getPayload() {
            return this.m_payload;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AbstractMessage.QOSType getQos() {
            return this.m_qos;
        }
    }

    public HawtDBStorageService(String str) {
        try {
            File file = new File(str);
            file.createNewFile();
            this.pageFactory.setFile(file);
            this.pageFactory.open();
            this.m_multiIndexFactory = new MultiIndexFactory(this.pageFactory.getPageFile());
        } catch (IOException e2) {
            LOG.error((String) null, (Throwable) e2);
            throw new MQTTException("Can't create temp file for subscriptions storage [" + str + "]", e2);
        }
    }

    private void initInflightMessageStore() {
        BTreeIndexFactory bTreeIndexFactory = new BTreeIndexFactory();
        bTreeIndexFactory.setKeyCodec(StringCodec.INSTANCE);
        this.m_inflightStore = (SortedIndex) this.m_multiIndexFactory.openOrCreate("inflight", bTreeIndexFactory);
    }

    private void initPersistentMessageStore() {
        BTreeIndexFactory bTreeIndexFactory = new BTreeIndexFactory();
        bTreeIndexFactory.setKeyCodec(StringCodec.INSTANCE);
        this.m_persistentMessageStore = (SortedIndex) this.m_multiIndexFactory.openOrCreate("persistedMessages", bTreeIndexFactory);
    }

    private void initPersistentQoS2MessageStore() {
        BTreeIndexFactory bTreeIndexFactory = new BTreeIndexFactory();
        bTreeIndexFactory.setKeyCodec(StringCodec.INSTANCE);
        this.m_qos2Store = (SortedIndex) this.m_multiIndexFactory.openOrCreate("qos2Store", bTreeIndexFactory);
    }

    private void initPersistentSubscriptions() {
        BTreeIndexFactory bTreeIndexFactory = new BTreeIndexFactory();
        bTreeIndexFactory.setKeyCodec(StringCodec.INSTANCE);
        this.m_persistentSubscriptions = (SortedIndex) this.m_multiIndexFactory.openOrCreate("subscriptions", bTreeIndexFactory);
    }

    private void initRetainedStore() {
        BTreeIndexFactory bTreeIndexFactory = new BTreeIndexFactory();
        bTreeIndexFactory.setKeyCodec(StringCodec.INSTANCE);
        this.m_retainedStore = (SortedIndex) this.m_multiIndexFactory.openOrCreate("retained", bTreeIndexFactory);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void addInFlight(PublishEvent publishEvent, String str) {
        this.m_inflightStore.put(str, publishEvent);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void addNewSubscription(Subscription subscription, String str) {
        if (!this.m_persistentSubscriptions.containsKey(str)) {
            this.m_persistentSubscriptions.put(str, new HashSet());
        }
        Set<Subscription> set = this.m_persistentSubscriptions.get(str);
        if (set.contains(subscription)) {
            return;
        }
        set.add(subscription);
        this.m_persistentSubscriptions.put(str, set);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void cleanInFlight(String str) {
        this.m_inflightStore.remove(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void cleanPersistedPublishes(String str) {
        this.m_persistentMessageStore.remove(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void close() {
        try {
            this.pageFactory.close();
        } catch (IOException e2) {
            LOG.error((String) null, (Throwable) e2);
        }
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void initStore() {
        initRetainedStore();
        initPersistentMessageStore();
        initInflightMessageStore();
        initPersistentSubscriptions();
        initPersistentQoS2MessageStore();
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void persistQoS2Message(String str, PublishEvent publishEvent) {
        LOG.debug(String.format("persistQoS2Message store pubKey %s, evt %s", str, publishEvent));
        this.m_qos2Store.put(str, publishEvent);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void removeAllSubscriptions(String str) {
        this.m_persistentSubscriptions.remove(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void removeQoS2Message(String str) {
        this.m_qos2Store.remove(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public List<Subscription> retrieveAllSubscriptions() {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, Set<Subscription>>> it = this.m_persistentSubscriptions.iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getValue());
        }
        return arrayList;
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public PublishEvent retrieveQoS2Message(String str) {
        return this.m_qos2Store.get(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public List<PublishEvent> retrivePersistedPublishes(String str) {
        return this.m_persistentMessageStore.get(str);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public Collection<StoredMessage> searchMatching(IMatchingCondition iMatchingCondition) {
        LOG.debug("searchMatching scanning all retained messages, presents are " + this.m_retainedStore.size());
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, StoredMessage> entry : this.m_retainedStore) {
            StoredMessage value = entry.getValue();
            if (iMatchingCondition.match(entry.getKey())) {
                arrayList.add(value);
            }
        }
        return arrayList;
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void storePublishForFuture(PublishEvent publishEvent) {
        LOG.debug("storePublishForFuture store evt " + publishEvent);
        String clientID = publishEvent.getClientID();
        List<PublishEvent> arrayList = !this.m_persistentMessageStore.containsKey(clientID) ? new ArrayList<>() : this.m_persistentMessageStore.get(clientID);
        arrayList.add(publishEvent);
        this.m_persistentMessageStore.put(clientID, arrayList);
    }

    @Override // org.dna.mqtt.moquette.messaging.spi.IStorageService
    public void storeRetained(String str, byte[] bArr, AbstractMessage.QOSType qOSType) {
        if (bArr.length == 0) {
            this.m_retainedStore.remove(str);
        } else {
            this.m_retainedStore.put(str, new StoredMessage(bArr, qOSType));
        }
    }
}
