/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.state.message;

import io.camunda.zeebe.db.ColumnFamily;
import io.camunda.zeebe.db.DbKey;
import io.camunda.zeebe.db.DbValue;
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.impl.DbCompositeKey;
import io.camunda.zeebe.db.impl.DbLong;
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.db.impl.DbTenantAwareKey;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.message.TransientPendingSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.InstantSource;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;

/**
 * Column-family-backed state for process message subscriptions.
 *
 * <p>Entries are stored in the {@code PROCESS_SUBSCRIPTION_BY_KEY} column family under a composite
 * key made of the element instance key followed by the tenant-aware message name.
 *
 * <p>In addition to the persisted entries, this class maintains a
 * {@link TransientPendingSubscriptionState} that tracks subscriptions which are currently opening
 * or closing, together with a timestamp. The transient state is rebuilt from the column family in
 * {@link #onRecovered(ReadonlyStreamProcessorContext)}.
 *
 * <p>The key objects and the value object used for inserts are shared instance fields that are
 * re-wrapped before each access.
 * NOTE(review): because of that sharing this class does not look safe for concurrent use - confirm
 * against the callers' threading model.
 */
public final class DbProcessMessageSubscriptionState
implements MutableProcessMessageSubscriptionState,
PendingProcessMessageSubscriptionState,
StreamProcessorLifecycleAware {
    private static final Logger LOG = Loggers.PROCESS_PROCESSOR_LOGGER;

    // Reusable key parts; wrapSubscriptionKeys(...) points them at the entry to access.
    private final DbLong elementInstanceKey;
    private final DbString tenantIdKey;
    private final DbString messageName;
    private final DbTenantAwareKey<DbString> tenantAwareMessageName;
    private final DbCompositeKey<DbLong, DbTenantAwareKey<DbString>> elementKeyAndMessageName;

    // Reusable value instance used when inserting a new subscription.
    private final ProcessMessageSubscription processMessageSubscription;
    private final ColumnFamily<DbCompositeKey<DbLong, DbTenantAwareKey<DbString>>, ProcessMessageSubscription> subscriptionColumnFamily;

    private final TransientPendingSubscriptionState transientState;
    private final InstantSource clock;

    /**
     * Creates the state and its backing column family.
     *
     * @param zeebeDb database used to create the subscription column family
     * @param transactionContext transaction context the column family operates in
     * @param transientProcessMessageSubscriptionState in-memory tracker of pending subscriptions
     * @param clock source of the timestamps recorded in the transient state
     */
    public DbProcessMessageSubscriptionState(ZeebeDb<ZbColumnFamilies> zeebeDb, TransactionContext transactionContext, TransientPendingSubscriptionState transientProcessMessageSubscriptionState, InstantSource clock) {
        this.clock = clock;
        this.transientState = transientProcessMessageSubscriptionState;

        this.elementInstanceKey = new DbLong();
        this.tenantIdKey = new DbString();
        this.messageName = new DbString();
        // Diamond instead of raw types so the key type parameters are checked by the compiler.
        this.tenantAwareMessageName = new DbTenantAwareKey<>(this.tenantIdKey, this.messageName, DbTenantAwareKey.PlacementType.PREFIX);
        this.elementKeyAndMessageName = new DbCompositeKey<>(this.elementInstanceKey, this.tenantAwareMessageName);

        this.processMessageSubscription = new ProcessMessageSubscription();
        this.subscriptionColumnFamily = zeebeDb.createColumnFamily(ZbColumnFamilies.PROCESS_SUBSCRIPTION_BY_KEY, transactionContext, this.elementKeyAndMessageName, this.processMessageSubscription);
    }

    /**
     * Rebuilds the transient pending state by scanning every persisted subscription and
     * registering those that are opening or closing with the current clock time.
     *
     * @param context the stream processor context (not used by this implementation)
     */
    @Override
    public void onRecovered(ReadonlyStreamProcessorContext context) {
        this.subscriptionColumnFamily.forEach(subscription -> {
            if (subscription.isOpening() || subscription.isClosing()) {
                this.transientState.add(pendingSubscriptionOf(subscription.getRecord()), this.clock.millis());
            }
        });
    }

    /**
     * Inserts a new subscription and registers it as pending with the current clock time.
     *
     * @param key key to store on the subscription
     * @param record record describing the subscription; also supplies the storage key parts
     */
    @Override
    public void put(long key, ProcessMessageSubscriptionRecord record) {
        this.wrapSubscriptionKeys(record.getElementInstanceKey(), record.getMessageNameBuffer(), record.getTenantId());

        // Reset the shared value instance before populating it for this insert.
        this.processMessageSubscription.reset();
        this.processMessageSubscription.setKey(key).setRecord(record);
        this.subscriptionColumnFamily.insert(this.elementKeyAndMessageName, this.processMessageSubscription);

        this.transientState.add(pendingSubscriptionOf(record), this.clock.millis());
    }

    /**
     * Stores the record on the matching subscription, marks it as opening and refreshes its
     * pending timestamp. The transient state is updated even when no persisted entry matches.
     *
     * @param record record identifying and describing the subscription
     */
    @Override
    public void updateToOpeningState(ProcessMessageSubscriptionRecord record) {
        this.update(record, (ProcessMessageSubscription s) -> s.setRecord(record).setOpening());
        this.transientState.update(pendingSubscriptionOf(record), this.clock.millis());
    }

    /**
     * Stores the record on the matching subscription, marks it as opened and removes it from the
     * pending state.
     *
     * @param record record identifying and describing the subscription
     */
    @Override
    public void updateToOpenedState(ProcessMessageSubscriptionRecord record) {
        this.update(record, (ProcessMessageSubscription s) -> s.setRecord(record).setOpened());
        this.transientState.remove(pendingSubscriptionOf(record));
    }

    /**
     * Stores the record on the matching subscription, marks it as closing and refreshes its
     * pending timestamp. The transient state is updated even when no persisted entry matches.
     *
     * @param record record identifying and describing the subscription
     */
    @Override
    public void updateToClosingState(ProcessMessageSubscriptionRecord record) {
        this.update(record, (ProcessMessageSubscription s) -> s.setRecord(record).setClosing());
        this.transientState.update(pendingSubscriptionOf(record), this.clock.millis());
    }

    /**
     * Removes the subscription identified by the given key parts, if present, from both the
     * column family and the pending state.
     *
     * @param elementInstanceKey key of the subscribing element instance
     * @param messageName name of the subscribed message
     * @param tenantId tenant owning the subscription
     * @return {@code true} if a subscription was found and removed, {@code false} otherwise
     */
    @Override
    public boolean remove(long elementInstanceKey, DirectBuffer messageName, String tenantId) {
        final ProcessMessageSubscription subscription = this.getSubscription(elementInstanceKey, messageName, tenantId);
        if (subscription == null) {
            return false;
        }
        this.remove(subscription);
        return true;
    }

    /**
     * Replaces the record of the matching subscription; does nothing if none is stored.
     *
     * @param key not used by this implementation; the entry is located via the record's fields
     * @param record record identifying the subscription and carrying the new content
     */
    @Override
    public void update(long key, ProcessMessageSubscriptionRecord record) {
        this.update(record, (ProcessMessageSubscription s) -> s.setRecord(record));
    }

    /**
     * Looks up a subscription by its key parts.
     *
     * @param elementInstanceKey key of the subscribing element instance
     * @param messageName name of the subscribed message
     * @param tenantId tenant owning the subscription
     * @return the value returned by the column family lookup; callers in this class treat
     *     {@code null} as "not found"
     */
    @Override
    public ProcessMessageSubscription getSubscription(long elementInstanceKey, DirectBuffer messageName, String tenantId) {
        this.wrapSubscriptionKeys(elementInstanceKey, messageName, tenantId);
        return this.subscriptionColumnFamily.get(this.elementKeyAndMessageName);
    }

    /**
     * Passes every subscription whose key starts with the given element instance key to the
     * visitor.
     *
     * @param elementInstanceKey key prefix selecting the subscriptions to visit
     * @param visitor callback invoked for each matching subscription
     */
    @Override
    public void visitElementSubscriptions(long elementInstanceKey, ProcessMessageSubscriptionState.ProcessMessageSubscriptionVisitor visitor) {
        this.elementInstanceKey.wrapLong(elementInstanceKey);
        this.subscriptionColumnFamily.whileEqualPrefix(this.elementInstanceKey, (compositeKey, subscription) -> visitor.visit(subscription));
    }

    /**
     * Checks whether a subscription is stored for the given key parts.
     *
     * @param elementInstanceKey key of the subscribing element instance
     * @param messageName name of the subscribed message
     * @param tenantId tenant owning the subscription
     * @return {@code true} if the column family contains an entry for the key
     */
    @Override
    public boolean existSubscriptionForElementInstance(long elementInstanceKey, DirectBuffer messageName, String tenantId) {
        this.wrapSubscriptionKeys(elementInstanceKey, messageName, tenantId);
        return this.subscriptionColumnFamily.exists(this.elementKeyAndMessageName);
    }

    /**
     * Visits the persisted subscription of every pending entry that the transient state returns
     * for the given deadline. Pending entries without a persisted subscription are logged and
     * skipped.
     *
     * @param deadline bound passed to {@code entriesBefore} to select pending entries
     * @param visitor callback invoked for each subscription that is found
     */
    @Override
    public void visitPending(long deadline, ProcessMessageSubscriptionState.ProcessMessageSubscriptionVisitor visitor) {
        for (TransientPendingSubscriptionState.PendingSubscription pendingSubscription : this.transientState.entriesBefore(deadline)) {
            final ProcessMessageSubscription subscription = this.getSubscription(pendingSubscription.elementInstanceKey(), BufferUtil.wrapString(pendingSubscription.messageName()), pendingSubscription.tenantId());
            if (subscription == null) {
                LOG.warn("Expected to find subscription with key {} messageName {} tenantId: {}, but no subscription found", pendingSubscription.elementInstanceKey(), pendingSubscription.messageName(), pendingSubscription.tenantId());
                continue;
            }
            visitor.visit(subscription);
        }
    }

    /**
     * Records the given timestamp for the pending entry that corresponds to the record.
     *
     * @param record record identifying the subscription
     * @param timestampMs timestamp to store in the transient state
     */
    @Override
    public void onSent(ProcessMessageSubscriptionRecord record, long timestampMs) {
        this.transientState.update(pendingSubscriptionOf(record), timestampMs);
    }

    /** Builds the transient-state entry that identifies the subscription described by the record. */
    private static TransientPendingSubscriptionState.PendingSubscription pendingSubscriptionOf(ProcessMessageSubscriptionRecord record) {
        return new TransientPendingSubscriptionState.PendingSubscription(record.getElementInstanceKey(), record.getMessageName(), record.getTenantId());
    }

    /** Applies the modifier to the subscription matching the record; no-op when none is stored. */
    private void update(ProcessMessageSubscriptionRecord record, Consumer<ProcessMessageSubscription> modifier) {
        final ProcessMessageSubscription subscription = this.getSubscription(record.getElementInstanceKey(), record.getMessageNameBuffer(), record.getTenantId());
        if (subscription == null) {
            return;
        }
        this.update(subscription, modifier);
    }

    /** Applies the modifier and writes the subscription back under the key derived from its record. */
    private void update(ProcessMessageSubscription subscription, Consumer<ProcessMessageSubscription> modifier) {
        modifier.accept(subscription);
        // Re-wrap the keys from the (possibly replaced) record before writing back.
        final ProcessMessageSubscriptionRecord record = subscription.getRecord();
        this.wrapSubscriptionKeys(record.getElementInstanceKey(), record.getMessageNameBuffer(), record.getTenantId());
        this.subscriptionColumnFamily.update(this.elementKeyAndMessageName, subscription);
    }

    /** Deletes the subscription from the column family and drops its pending entry. */
    private void remove(ProcessMessageSubscription subscription) {
        final ProcessMessageSubscriptionRecord record = subscription.getRecord();
        this.wrapSubscriptionKeys(record.getElementInstanceKey(), record.getMessageNameBuffer(), record.getTenantId());
        this.subscriptionColumnFamily.deleteExisting(this.elementKeyAndMessageName);
        this.transientState.remove(pendingSubscriptionOf(record));
    }

    /** Points the shared key parts at the given element instance, message name and tenant. */
    private void wrapSubscriptionKeys(long elementInstanceKey, DirectBuffer messageName, String tenantId) {
        this.elementInstanceKey.wrapLong(elementInstanceKey);
        this.messageName.wrapBuffer(messageName);
        this.tenantIdKey.wrapString(tenantId);
    }
}

