/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.engine.state.routing.RoutingInfo;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
 * Distributes commands from the current partition to other partitions and handles continuation
 * commands that have to wait for a distribution queue.
 *
 * <p>Requests are assembled through the fluent builder returned by {@link #withKey(long)}.
 *
 * <p>The {@link CommandDistributionRecord} fields are scratch instances: each one is
 * {@code reset()} right before it is populated instead of allocating a new record per call.
 */
public final class CommandDistributionBehavior {
    private final DistributionState distributionState;
    private final TypedCommandWriter commandWriter;
    private final StateWriter stateWriter;
    private final SideEffectWriter sideEffectWriter;
    private final RoutingInfo routingInfo;
    private final InterPartitionCommandSender interPartitionCommandSender;
    private final int currentPartitionId;
    private final CommandDistributionRecord commandDistributionStarted = new CommandDistributionRecord();
    private final CommandDistributionRecord commandDistributionDistributing = new CommandDistributionRecord();
    private final CommandDistributionRecord commandDistributionEnqueued = new CommandDistributionRecord();
    private final CommandDistributionRecord commandDistributionAcknowledge = new CommandDistributionRecord();
    private final CommandDistributionRecord commandDistributionContinuation = new CommandDistributionRecord();

    /**
     * Creates the behavior for one partition.
     *
     * @param distributionState state queried for queued distributions and stored records
     * @param writers source of the command, state and side-effect writers used by this class
     * @param currentPartition id of the partition this behavior runs on
     * @param routingInfo supplies the default set of target partitions
     * @param partitionCommandSender used to send commands to other partitions
     */
    public CommandDistributionBehavior(
            final DistributionState distributionState,
            final Writers writers,
            final int currentPartition,
            final RoutingInfo routingInfo,
            final InterPartitionCommandSender partitionCommandSender) {
        this.distributionState = distributionState;
        this.commandWriter = writers.command();
        this.stateWriter = writers.state();
        this.sideEffectWriter = writers.sideEffect();
        this.routingInfo = routingInfo;
        this.interPartitionCommandSender = partitionCommandSender;
        this.currentPartitionId = currentPartition;
    }

    /**
     * Starts building a distribution or continuation request.
     *
     * @param distributionKey key used for every record written on behalf of the request
     * @return a builder that initially targets {@code routingInfo.partitions()}
     */
    public RequestBuilder withKey(final long distributionKey) {
        return new DistributionRequest(distributionKey);
    }

    /**
     * Appends a {@code STARTED} event for the command and then distributes it to every target
     * partition except the current one. Does nothing when {@code partitions} is empty or contains
     * only the current partition.
     */
    private <T extends UnifiedRecordValue> void distributeCommand(
            final String queue,
            final long distributionKey,
            final ValueType valueType,
            final Intent intent,
            final T value,
            final Set<Integer> partitions) {
        if (partitions.isEmpty()
                || (partitions.size() == 1 && partitions.contains(currentPartitionId))) {
            return;
        }

        commandDistributionStarted.reset();
        final CommandDistributionRecord distributionRecord =
                commandDistributionStarted
                        .setQueueId(queue)
                        .setPartitionId(currentPartitionId)
                        .setValueType(valueType)
                        .setIntent(intent)
                        .setCommandValue(value);
        stateWriter.appendFollowUpEvent(
                distributionKey, CommandDistributionIntent.STARTED, distributionRecord);

        for (final int partition : partitions) {
            // The current partition never distributes to itself.
            if (partition != currentPartitionId) {
                distributeToPartition(partition, distributionRecord, distributionKey);
            }
        }
    }

    /**
     * Distributes to a single partition. Without a queue id the distribution starts right away.
     * With a queue id the distribution is enqueued first and only started when no other
     * distribution key is next in that queue for the partition.
     */
    private void distributeToPartition(
            final int partition,
            final CommandDistributionRecord distributionRecord,
            final long distributionKey) {
        final String queue = distributionRecord.getQueueId();
        if (queue == null) {
            startDistributing(partition, distributionRecord, distributionKey);
            return;
        }

        enqueueDistribution(queue, partition, distributionKey);

        // Looked up after enqueueing, exactly as before; a different key at the head of the queue
        // means this distribution has to wait.
        final Optional<Long> nextQueuedKey =
                distributionState.getNextQueuedDistributionKey(queue, partition);
        final boolean blockedByOtherDistribution =
                nextQueuedKey.filter(nextKey -> nextKey != distributionKey).isPresent();
        if (!blockedByOtherDistribution) {
            startDistributing(partition, distributionRecord, distributionKey);
        }
    }

    /** Appends an {@code ENQUEUED} event for the given queue and partition. */
    private void enqueueDistribution(
            final String queue, final int partition, final long distributionKey) {
        commandDistributionEnqueued.reset();
        final CommandDistributionRecord enqueued =
                commandDistributionEnqueued.setQueueId(queue).setPartitionId(partition);
        stateWriter.appendFollowUpEvent(
                distributionKey, CommandDistributionIntent.ENQUEUED, enqueued);
    }

    /**
     * Starts distributing the next queued distribution for the given queue and partition, if the
     * state reports one.
     *
     * @param queue id of the queue to look at
     * @param partition partition the queue entry is targeted at
     */
    void distributeNextInQueue(final String queue, final int partition) {
        distributionState
                .getNextQueuedDistributionKey(queue, partition)
                .ifPresent(
                        nextDistributionKey ->
                                startDistributing(
                                        partition,
                                        distributionState.getCommandDistributionRecord(
                                                nextDistributionKey, partition),
                                        nextDistributionKey));
    }

    /**
     * Writes a {@code CONTINUE} command for every continuation command stored for the queue, but
     * only when the queue has no queued distributions left.
     *
     * @param queue id of the queue to check
     */
    void continueAfterQueue(final String queue) {
        if (distributionState.hasQueuedDistributions(queue)) {
            return;
        }
        distributionState.forEachContinuationCommand(
                queue, key -> handleContinuationCommand(queue, key));
    }

    /** Appends a {@code CONTINUE} follow-up command for one stored continuation. */
    private void handleContinuationCommand(final String queue, final long key) {
        commandDistributionContinuation.reset();
        commandDistributionContinuation.setQueueId(queue);
        commandDistributionContinuation.setPartitionId(currentPartitionId);
        commandWriter.appendFollowUpCommand(
                key, CommandDistributionIntent.CONTINUE, commandDistributionContinuation);
    }

    /**
     * Appends a {@code DISTRIBUTING} event for the target partition and registers a side effect
     * that sends the wrapped command to that partition.
     */
    private void startDistributing(
            final int partition,
            final CommandDistributionRecord distributionRecord,
            final long distributionKey) {
        final ValueType valueType = distributionRecord.getValueType();
        final Intent intent = distributionRecord.getIntent();

        commandDistributionDistributing.reset();
        stateWriter.appendFollowUpEvent(
                distributionKey,
                CommandDistributionIntent.DISTRIBUTING,
                commandDistributionDistributing
                        .setPartitionId(partition)
                        .setValueType(valueType)
                        .setIntent(intent));

        final UnifiedRecordValue commandValue = distributionRecord.getCommandValue();
        sideEffectWriter.appendSideEffect(
                () -> {
                    interPartitionCommandSender.sendCommand(
                            partition, valueType, intent, distributionKey, commandValue);
                    return true;
                });
    }

    /**
     * Registers a side effect that sends an {@code ACKNOWLEDGE} command for the given distributed
     * command. The receiving partition is decoded from the command's key.
     *
     * @param command the distributed command that was received on this partition
     * @param <T> type of the command value
     */
    public <T extends UnifiedRecordValue> void acknowledgeCommand(final TypedRecord<T> command) {
        final long distributionKey = command.getKey();

        commandDistributionAcknowledge.reset();
        // NOTE(review): the side effect below captures this shared scratch record; presumably it
        // is consumed before the next call resets it - confirm.
        final CommandDistributionRecord distributionRecord =
                commandDistributionAcknowledge
                        .setPartitionId(currentPartitionId)
                        .setValueType(command.getValueType())
                        .setIntent(command.getIntent());

        final int receiverPartitionId = Protocol.decodePartitionId(distributionKey);
        sideEffectWriter.appendSideEffect(
                () -> {
                    interPartitionCommandSender.sendCommand(
                            receiverPartitionId,
                            ValueType.COMMAND_DISTRIBUTION,
                            CommandDistributionIntent.ACKNOWLEDGE,
                            distributionKey,
                            distributionRecord);
                    return true;
                });
    }

    /**
     * Writes the continuation command immediately when the queue has no queued distributions.
     * Otherwise appends a {@code CONTINUATION_REQUESTED} event that carries the command.
     */
    private <T extends UnifiedRecordValue> void requestContinuation(
            final String queue,
            final long key,
            final ValueType valueType,
            final Intent intent,
            final T value) {
        final boolean writeImmediately = !distributionState.hasQueuedDistributions(queue);
        if (writeImmediately) {
            commandWriter.appendFollowUpCommand(key, intent, value);
            return;
        }

        commandDistributionContinuation.reset();
        commandDistributionContinuation.setQueueId(queue);
        commandDistributionContinuation.setPartitionId(currentPartitionId);
        commandDistributionContinuation.setValueType(valueType);
        commandDistributionContinuation.setIntent(intent);
        commandDistributionContinuation.setCommandValue(value);
        stateWriter.appendFollowUpEvent(
                key,
                CommandDistributionIntent.CONTINUATION_REQUESTED,
                commandDistributionContinuation);
    }

    /**
     * Single mutable builder behind all three builder interfaces. It collects the key, the
     * optional queue and the target partitions, then delegates to the enclosing behavior.
     */
    private class DistributionRequest
            implements RequestBuilder, DistributionRequestBuilder, ContinuationRequestBuilder {
        private final long key;
        private String queue;
        private Set<Integer> partitions;

        /** Creates a request that targets {@code routingInfo.partitions()} until told otherwise. */
        DistributionRequest(final long key) {
            this.partitions = routingInfo.partitions();
            this.key = key;
        }

        @Override
        public DistributionRequest unordered() {
            this.queue = null;
            return this;
        }

        @Override
        public DistributionRequest inQueue(final String queue) {
            this.queue = Objects.requireNonNull(queue);
            return this;
        }

        @Override
        public ContinuationRequestBuilder afterQueue(final String queue) {
            this.queue = Objects.requireNonNull(queue);
            return this;
        }

        @Override
        public DistributionRequestBuilder forPartition(final int partition) {
            this.partitions = Set.of(partition);
            return this;
        }

        @Override
        public DistributionRequestBuilder forPartitions(final Set<Integer> partitions) {
            this.partitions = Objects.requireNonNull(partitions);
            return this;
        }

        @Override
        public DistributionRequestBuilder forOtherPartitions() {
            // The current partition may be part of this set; it is skipped while distributing.
            this.partitions = routingInfo.partitions();
            return this;
        }

        @Override
        public <T extends UnifiedRecordValue> void distribute(final TypedRecord<T> command) {
            distributeCommand(
                    queue,
                    key,
                    command.getValueType(),
                    command.getIntent(),
                    command.getValue(),
                    partitions);
        }

        @Override
        public <T extends UnifiedRecordValue> void distribute(
                final ValueType valueType, final Intent intent, final T value) {
            distributeCommand(
                    queue,
                    key,
                    Objects.requireNonNull(valueType),
                    Objects.requireNonNull(intent),
                    Objects.requireNonNull(value),
                    partitions);
        }

        @Override
        public <T extends UnifiedRecordValue> void continueWith(final TypedRecord<T> command) {
            requestContinuation(
                    queue, key, command.getValueType(), command.getIntent(), command.getValue());
        }

        @Override
        public <T extends UnifiedRecordValue> void continueWith(
                final ValueType valueType, final Intent intent, final T value) {
            requestContinuation(queue, key, valueType, intent, value);
        }
    }

    /** Final step of a continuation request created via {@link RequestBuilder#afterQueue(String)}. */
    public interface ContinuationRequestBuilder {
        /**
         * Requests continuation with the value type, intent and value of the given command.
         *
         * @param command record whose value type, intent and value form the continuation command
         * @param <T> type of the command value
         */
        <T extends UnifiedRecordValue> void continueWith(TypedRecord<T> command);

        /**
         * Requests continuation with an explicitly specified command.
         *
         * @param valueType value type of the continuation command
         * @param intent intent of the continuation command
         * @param value value of the continuation command
         * @param <T> type of the command value
         */
        <T extends UnifiedRecordValue> void continueWith(ValueType valueType, Intent intent, T value);
    }

    /** Selects the target partitions of a distribution request and triggers the distribution. */
    public interface DistributionRequestBuilder {
        /**
         * Targets exactly one partition.
         *
         * @param partition id of the partition to distribute to
         * @return this builder
         */
        DistributionRequestBuilder forPartition(int partition);

        /**
         * Targets the given partitions.
         *
         * @param partitions ids of the partitions to distribute to; must not be {@code null}
         * @return this builder
         */
        DistributionRequestBuilder forPartitions(Set<Integer> partitions);

        /**
         * Targets the partitions reported by the routing info; the current partition is skipped
         * when the command is distributed.
         *
         * @return this builder
         */
        DistributionRequestBuilder forOtherPartitions();

        /**
         * Distributes the value type, intent and value of the given command.
         *
         * @param command record whose value type, intent and value are distributed
         * @param <T> type of the command value
         */
        <T extends UnifiedRecordValue> void distribute(TypedRecord<T> command);

        /**
         * Distributes an explicitly specified command; none of the arguments may be {@code null}.
         *
         * @param valueType value type of the distributed command
         * @param intent intent of the distributed command
         * @param value value of the distributed command
         * @param <T> type of the command value
         */
        <T extends UnifiedRecordValue> void distribute(ValueType valueType, Intent intent, T value);
    }

    /** First step of the request builder returned by {@link #withKey(long)}. */
    public interface RequestBuilder {
        /**
         * Requests a distribution that is not assigned to any queue.
         *
         * @return builder for choosing the target partitions
         */
        DistributionRequestBuilder unordered();

        /**
         * Requests a distribution that is assigned to the given queue.
         *
         * @param queue id of the queue; must not be {@code null}
         * @return builder for choosing the target partitions
         */
        DistributionRequestBuilder inQueue(String queue);

        /**
         * Requests a continuation that depends on the given queue.
         *
         * @param queue id of the queue; must not be {@code null}
         * @return builder for specifying the continuation command
         */
        ContinuationRequestBuilder afterQueue(String queue);
    }
}

