/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderFlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.InFlightAppend;
import io.camunda.zeebe.logstreams.impl.log.SequencedBatch;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;

final class LogStorageAppender
extends Actor
implements HealthMonitorable,
AppendErrorHandler {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final String name;
    private final AppenderFlowControl flowControl;
    private final Sequencer sequencer;
    private final LogStorage logStorage;
    private final Set<FailureListener> failureListeners = new HashSet<FailureListener>();
    private final ActorFuture<Void> closeFuture;
    private final AppenderMetrics metrics;
    private final int partitionId;

    LogStorageAppender(String name, int partitionId, LogStorage logStorage, Sequencer sequencer) {
        this.name = name;
        this.partitionId = partitionId;
        this.logStorage = logStorage;
        this.sequencer = sequencer;
        this.metrics = new AppenderMetrics(partitionId);
        this.flowControl = new AppenderFlowControl(this, this.metrics);
        this.closeFuture = new CompletableActorFuture();
    }

    protected Map<String, String> createContext() {
        Map context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    public String getName() {
        return this.name;
    }

    protected void onActorStarting() {
        this.sequencer.registerConsumer(this.actor.onCondition("sequencer", this::tryWriteBatch));
        this.actor.submit(this::tryWriteBatch);
    }

    protected void onActorClosed() {
        this.closeFuture.complete(null);
    }

    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return this.closeFuture;
        }
        super.closeAsync();
        return this.closeFuture;
    }

    protected void handleFailure(Throwable failure) {
        this.onFailure(failure);
    }

    public void onActorFailed() {
        this.closeFuture.complete(null);
    }

    public HealthReport getHealthReport() {
        return this.actor.isClosed() ? HealthReport.unhealthy((HealthMonitorable)this).withMessage("actor is closed") : HealthReport.healthy((HealthMonitorable)this);
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    private void tryWriteBatch() {
        Optional<InFlightAppend> inflightAppend = this.flowControl.tryAcquire();
        if (inflightAppend.isEmpty()) {
            this.actor.submit(this::tryWriteBatch);
            return;
        }
        this.writeBatch(inflightAppend.get());
    }

    private void writeBatch(InFlightAppend append) {
        SequencedBatch sequencedBatch = this.sequencer.tryRead();
        if (sequencedBatch == null) {
            append.discard();
            return;
        }
        long lowestPosition = sequencedBatch.firstPosition();
        long highestPosition = sequencedBatch.firstPosition() + (long)sequencedBatch.entries().size() - 1L;
        List<LogAppendEntryMetadata> metricsMetadata = this.copyMetricsMetadata(sequencedBatch);
        append.start(highestPosition);
        this.logStorage.append(lowestPosition, highestPosition, sequencedBatch, (LogStorage.AppendListener)new InstrumentedAppendListener(append, metricsMetadata, this.metrics));
        this.actor.submit(this::tryWriteBatch);
    }

    private List<LogAppendEntryMetadata> copyMetricsMetadata(SequencedBatch sequencedBatch) {
        List<LogAppendEntry> entries = sequencedBatch.entries();
        ArrayList<LogAppendEntryMetadata> metricsMetadata = new ArrayList<LogAppendEntryMetadata>(entries.size());
        for (LogAppendEntry entry : entries) {
            metricsMetadata.add(new LogAppendEntryMetadata(entry));
        }
        return metricsMetadata;
    }

    private void onFailure(Throwable error) {
        LOG.error("Actor {} failed in phase {}.", new Object[]{this.name, this.actor.getLifecyclePhase(), error});
        this.actor.fail(error);
        HealthReport report = HealthReport.unhealthy((HealthMonitorable)this).withIssue(error);
        this.failureListeners.forEach(l -> l.onFailure(report));
    }

    @Override
    public void onCommitError(Throwable error) {
        this.actor.run(() -> this.onFailure(error));
    }

    @Override
    public void onWriteError(Throwable error) {
        this.actor.run(() -> this.onFailure(error));
    }

    private record InstrumentedAppendListener(LogStorage.AppendListener delegate, List<LogAppendEntryMetadata> batchMetadata, AppenderMetrics metrics) implements LogStorage.AppendListener
    {
        @Override
        public void onWrite(long address) {
            this.delegate.onWrite(address);
            this.batchMetadata.forEach(this::recordAppendedEntry);
        }

        @Override
        public void onWriteError(Throwable error) {
            this.delegate.onWriteError(error);
        }

        @Override
        public void onCommit(long address) {
            this.delegate.onCommit(address);
        }

        @Override
        public void onCommitError(long address, Throwable error) {
            this.delegate.onCommitError(address, error);
        }

        private void recordAppendedEntry(LogAppendEntryMetadata metadata) {
            this.metrics.recordAppendedEntry(1, metadata.recordType(), metadata.valueType(), metadata.intent());
        }
    }

    private record LogAppendEntryMetadata(RecordType recordType, ValueType valueType, Intent intent) {
        private LogAppendEntryMetadata(LogAppendEntry entry) {
            this(entry.recordMetadata().getRecordType(), entry.recordMetadata().getValueType(), entry.recordMetadata().getIntent());
        }
    }
}

