/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.behavior;

import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.deployment.DeployedProcess;
import io.camunda.zeebe.engine.state.immutable.MessageStartEventSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.immutable.ProcessingState;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageStartEventSubscriptionRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import java.time.InstantSource;
import java.util.Optional;
import org.agrona.DirectBuffer;

/**
 * Looks up buffered messages that match a message start event subscription of a process and
 * triggers the corresponding message start event through the {@link EventHandle}.
 */
public final class BpmnBufferedMessageStartEventBehavior {
    private final MessageState messageState;
    private final ProcessState processState;
    private final MessageStartEventSubscriptionState messageStartEventSubscriptionState;
    private final EventHandle eventHandle;
    private final InstantSource clock;

    /**
     * Wires the behavior to the states it reads and builds the {@link EventHandle} it uses to
     * trigger message start events.
     *
     * @param processingState source of the message, process, subscription and event scope states
     * @param keyGenerator passed through to the {@link EventHandle}
     * @param eventTriggerBehavior passed through to the {@link EventHandle}
     * @param stateBehavior passed through to the {@link EventHandle}
     * @param writers passed through to the {@link EventHandle}
     * @param clock time source compared against message deadlines
     */
    public BpmnBufferedMessageStartEventBehavior(ProcessingState processingState, KeyGenerator keyGenerator, EventTriggerBehavior eventTriggerBehavior, BpmnStateBehavior stateBehavior, Writers writers, InstantSource clock) {
        this.clock = clock;
        this.messageState = processingState.getMessageState();
        this.processState = processingState.getProcessState();
        this.messageStartEventSubscriptionState = processingState.getMessageStartEventSubscriptionState();
        // Must come after processState is assigned: the handle receives that same instance.
        this.eventHandle = new EventHandle(keyGenerator, processingState.getEventScopeInstanceState(), writers, this.processState, eventTriggerBehavior, stateBehavior);
    }

    /**
     * Reads the correlation key that the message state holds for the process instance of the
     * given context.
     *
     * @param context element context providing the process instance key
     * @return the stored correlation key, or an empty optional when the message state returns
     *     {@code null} for that process instance
     */
    public Optional<DirectBuffer> findCorrelationKey(BpmnElementContext context) {
        return Optional.ofNullable(this.messageState.getProcessInstanceCorrelationKey(context.getProcessInstanceKey()));
    }

    /**
     * Correlates the next buffered message for the given correlation key, if there is one.
     *
     * @param context element context providing the BPMN process id and tenant id
     * @param correlationKey correlation key to look up; when {@code null} this method does nothing
     */
    public void correlateMessage(BpmnElementContext context, DirectBuffer correlationKey) {
        if (correlationKey == null) {
            return;
        }
        this.correlateNextBufferedMessage(correlationKey, context);
    }

    /**
     * Resolves the latest deployed version of the context's process and, when a matching buffered
     * message is found for it, triggers the message start event.
     */
    private void correlateNextBufferedMessage(DirectBuffer correlationKey, BpmnElementContext context) {
        DeployedProcess latestProcess = this.processState.getLatestProcessVersionByProcessId(context.getBpmnProcessId(), context.getTenantId());
        this.findNextMessageToCorrelate(latestProcess, correlationKey).ifPresent(this::triggerStartEvent);
    }

    /**
     * Loads the stored message referenced by the correlation and hands its key, name, correlation
     * key and variables to the {@link EventHandle} together with the selected subscription.
     */
    private void triggerStartEvent(Correlation selected) {
        StoredMessage buffered = this.messageState.getMessage(selected.messageKey);
        this.eventHandle.triggerMessageStartEvent(selected.subscriptionKey, selected.subscriptionRecord, buffered.getMessageKey(), buffered.getMessage().getNameBuffer(), buffered.getMessage().getCorrelationKeyBuffer(), buffered.getMessage().getVariablesBuffer());
    }

    /**
     * Searches all message start event subscriptions of the given process definition for a
     * buffered message with the given correlation key.
     *
     * <p>A message qualifies when its deadline is later than the clock's current time and the
     * message state reports no existing correlation for the message key and the BPMN process id.
     * Among qualifying messages the one with the smallest message key is kept.
     *
     * @return the selected message/subscription pair, or an empty optional when no subscription
     *     key greater than zero was recorded
     */
    private Optional<Correlation> findNextMessageToCorrelate(DeployedProcess process, DirectBuffer correlationKey) {
        Correlation earliest = new Correlation();
        this.messageStartEventSubscriptionState.visitSubscriptionsByProcessDefinition(process.getKey(), subscription -> {
            MessageStartEventSubscriptionRecord startEventSubscription = subscription.getRecord();
            DirectBuffer subscribedMessageName = startEventSubscription.getMessageNameBuffer();
            this.messageState.visitMessages(startEventSubscription.getTenantId(), subscribedMessageName, correlationKey, candidate -> {
                boolean expired = candidate.getMessage().getDeadline() <= this.clock.millis();
                if (expired || this.messageState.existMessageCorrelation(candidate.getMessageKey(), process.getBpmnProcessId())) {
                    // Not usable; returning true presumably lets the visitor move on to the next message.
                    return true;
                }
                long candidateKey = candidate.getMessageKey();
                if (candidateKey < earliest.messageKey) {
                    // Smaller key than the best match so far, so this message replaces it.
                    earliest.messageKey = candidateKey;
                    earliest.subscriptionKey = subscription.getKey();
                    earliest.subscriptionRecord.wrap(subscription.getRecord());
                }
                // First qualifying message for this subscription was handled; returning false
                // presumably ends the visit of this subscription's messages.
                return false;
            });
        });
        return earliest.subscriptionKey > 0L ? Optional.of(earliest) : Optional.empty();
    }

    /**
     * Mutable holder for the best message/subscription match found so far. The initial values
     * ({@code Long.MAX_VALUE} message key, {@code -1} subscription key) mean "nothing found".
     */
    private static final class Correlation {
        private long messageKey = Long.MAX_VALUE;
        private long subscriptionKey = -1L;
        private final MessageStartEventSubscriptionRecord subscriptionRecord = new MessageStartEventSubscriptionRecord();

        private Correlation() {
        }
    }
}

