/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.RequestData;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.engine.state.mutable.MutableMessageCorrelationState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageCorrelationRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import java.util.concurrent.atomic.AtomicBoolean;

public final class MessageSubscriptionRejectProcessor
implements TypedRecordProcessor<MessageSubscriptionRecord> {
    private static final String SUBSCRIPTION_NOT_FOUND = "Expected to find subscription for message with name '%s' and correlation key '%s', but none was found.";
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final MutableMessageCorrelationState messageCorrelationState;
    private final SubscriptionCommandSender commandSender;
    private final StateWriter stateWriter;
    private final TypedResponseWriter responseWriter;

    public MessageSubscriptionRejectProcessor(MessageState messageState, MessageSubscriptionState subscriptionState, MutableMessageCorrelationState messageCorrelationState, SubscriptionCommandSender commandSender, Writers writers) {
        this.messageState = messageState;
        this.subscriptionState = subscriptionState;
        this.messageCorrelationState = messageCorrelationState;
        this.commandSender = commandSender;
        this.stateWriter = writers.state();
        this.responseWriter = writers.response();
    }

    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> record) {
        MessageSubscriptionRecord subscriptionRecord = (MessageSubscriptionRecord)record.getValue();
        this.stateWriter.appendFollowUpEvent(record.getKey(), (Intent)MessageSubscriptionIntent.REJECTED, (RecordValue)subscriptionRecord);
        boolean foundSubscription = this.findSubscriptionToCorrelate(subscriptionRecord);
        if (!foundSubscription) {
            this.writeNotCorrelatedResponse(record);
        }
    }

    private boolean findSubscriptionToCorrelate(MessageSubscriptionRecord subscriptionRecord) {
        long messageKey = subscriptionRecord.getMessageKey();
        StoredMessage storedMessage = this.messageState.getMessage(messageKey);
        if (storedMessage == null) {
            return false;
        }
        AtomicBoolean foundSubscription = new AtomicBoolean(false);
        this.subscriptionState.visitSubscriptions(subscriptionRecord.getTenantId(), subscriptionRecord.getMessageNameBuffer(), subscriptionRecord.getCorrelationKeyBuffer(), subscription -> {
            boolean canBeCorrelated;
            MessageSubscriptionRecord correlatingSubscription = subscription.getRecord();
            boolean bl = canBeCorrelated = correlatingSubscription.getBpmnProcessIdBuffer().equals((Object)subscriptionRecord.getBpmnProcessIdBuffer()) && !subscription.isCorrelating();
            if (canBeCorrelated) {
                correlatingSubscription.setMessageKey(messageKey).setVariables(storedMessage.getMessage().getVariablesBuffer());
                this.stateWriter.appendFollowUpEvent(subscription.getKey(), (Intent)MessageSubscriptionIntent.CORRELATING, (RecordValue)correlatingSubscription);
                this.sendCorrelateCommand(correlatingSubscription);
                foundSubscription.set(true);
            }
            return !canBeCorrelated;
        });
        return foundSubscription.get();
    }

    private void sendCorrelateCommand(MessageSubscriptionRecord subscription) {
        this.commandSender.correlateProcessMessageSubscription(subscription.getProcessInstanceKey(), subscription.getElementInstanceKey(), subscription.getBpmnProcessIdBuffer(), subscription.getMessageNameBuffer(), subscription.getMessageKey(), subscription.getVariablesBuffer(), subscription.getCorrelationKeyBuffer(), subscription.getTenantId());
    }

    private void writeNotCorrelatedResponse(TypedRecord<MessageSubscriptionRecord> record) {
        MessageSubscriptionRecord messageSubscription = (MessageSubscriptionRecord)record.getValue();
        long messageKey = messageSubscription.getMessageKey();
        if (this.messageCorrelationState.existsRequestDataForMessageKey(messageKey)) {
            RequestData requestData = this.messageCorrelationState.getRequestData(messageKey);
            MessageCorrelationRecord messageCorrelationRecord = new MessageCorrelationRecord().setName(messageSubscription.getMessageName()).setCorrelationKey(messageSubscription.getCorrelationKey()).setVariables(messageSubscription.getVariablesBuffer()).setTenantId(messageSubscription.getTenantId()).setMessageKey(messageKey);
            this.stateWriter.appendFollowUpEvent(messageKey, (Intent)MessageCorrelationIntent.NOT_CORRELATED, (RecordValue)messageCorrelationRecord);
            this.responseWriter.writeRejection(record, RejectionType.NOT_FOUND, SUBSCRIPTION_NOT_FOUND.formatted(messageSubscription.getMessageKey(), messageSubscription.getCorrelationKey()), requestData.getRequestId(), requestData.getRequestStreamId());
        }
    }
}

