/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.changelog.fs.UploadResult;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.SequenceNumberRange;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateChangelogWriter} that buffers appended state changes in memory and hands them to a
 * {@link StateChangeUploadScheduler} for upload when they are persisted.
 *
 * <p>Life cycle of a change, as implemented below:
 *
 * <ol>
 *   <li>{@link #append(int, byte[])} / {@link #appendMeta(byte[])} add the change to the active
 *       change set; the active sequence number is not changed by appending.
 *   <li>{@link #rollover()} moves a non-empty active change set into {@link #notUploaded} under
 *       the active sequence number and advances that number.
 *   <li>{@link #persistInternal(SequenceNumber, long)} removes the requested tail from {@link
 *       #notUploaded}, submits it as one upload task and registers an {@link
 *       UploadCompletionListener} that completes the returned future.
 *   <li>On upload success the results are stored in {@link #uploaded} so that later persist calls
 *       can return them without uploading again.
 * </ol>
 *
 * <p>The class performs no synchronization of its own. Upload callbacks do not touch the state
 * directly; they enqueue their work on the {@link MailboxExecutor} passed to the constructor.
 */
@NotThreadSafe
class FsStateChangelogWriter
implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);

    /** Checkpoint id passed for pre-emptive persists; no local handle is registered for it. */
    private static final long DUMMY_PERSIST_CHECKPOINT = -1L;

    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);

    private final UUID logId;
    private final KeyGroupRange keyGroupRange;
    private final StateChangeUploadScheduler uploader;

    /** Size of the active change set (in bytes) at which a pre-emptive persist is triggered. */
    private final long preEmptivePersistThresholdInBytes;

    /** Listeners of persist calls whose futures are not yet completed. */
    private final List<UploadCompletionListener> uploadCompletionListeners = new ArrayList<>();

    /** Sequence number that the changes currently being appended will get on rollover. */
    private SequenceNumber activeSequenceNumber = INITIAL_SQN;

    /** Lower bound (inclusive) of retained changes; moved forward by {@link #truncate}. */
    private SequenceNumber lowestSequenceNumber = INITIAL_SQN;

    /** Upper bound (exclusive) of retained changes; lowered by {@link #truncateAndClose}. */
    private SequenceNumber highestSequenceNumber = SequenceNumber.of(Long.MAX_VALUE);

    /** Changes appended since the last rollover. */
    private List<StateChange> activeChangeSet = new ArrayList<>();

    /** Accumulated payload size of {@link #activeChangeSet} in bytes. */
    private long activeChangeSetSize;

    /** Rolled-over change sets that have not been handed to the uploader yet. */
    private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded = new TreeMap<>();

    /** Results of successful uploads within the retained sequence number bounds. */
    private final NavigableMap<SequenceNumber, UploadResult> uploaded = new TreeMap<>();

    /** Highest sequence number whose upload failed, together with the failure cause. */
    @Nullable
    private Tuple2<SequenceNumber, Throwable> highestFailed;

    private boolean closed;
    private final MailboxExecutor mailboxExecutor;
    private final TaskChangelogRegistry changelogRegistry;
    @Nonnull
    private final LocalRecoveryConfig localRecoveryConfig;
    private final LocalChangelogRegistry localChangelogRegistry;

    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeUploadScheduler uploader, long preEmptivePersistThresholdInBytes, MailboxExecutor mailboxExecutor, TaskChangelogRegistry changelogRegistry, LocalRecoveryConfig localRecoveryConfig, LocalChangelogRegistry localChangelogRegistry) {
        this.logId = logId;
        this.keyGroupRange = keyGroupRange;
        this.uploader = uploader;
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
        this.mailboxExecutor = mailboxExecutor;
        this.changelogRegistry = changelogRegistry;
        this.localRecoveryConfig = localRecoveryConfig;
        this.localChangelogRegistry = localChangelogRegistry;
    }

    /**
     * Appends a metadata change to the active change set. Only logs a warning and returns if the
     * writer is closed.
     *
     * @param value serialized change
     * @throws IOException if the append triggers a pre-emptive persist which is rejected
     */
    public void appendMeta(byte[] value) throws IOException {
        if (closed) {
            LOG.warn("{} is closed.", logId);
            return;
        }
        LOG.trace("append metadata to {}: {} bytes", logId, value.length);
        activeChangeSet.add(StateChange.ofMetadataChange(value));
        preEmptiveFlushIfNeeded(value);
    }

    /**
     * Appends a data change for the given key group to the active change set. Only logs a warning
     * and returns if the writer is closed.
     *
     * @param keyGroup key group the change belongs to
     * @param value serialized change
     * @throws IOException if the append triggers a pre-emptive persist which is rejected
     */
    public void append(int keyGroup, byte[] value) throws IOException {
        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
        if (closed) {
            LOG.warn("{} is closed.", logId);
            return;
        }
        activeChangeSet.add(StateChange.ofDataChange(keyGroup, value));
        preEmptiveFlushIfNeeded(value);
    }

    /** Returns the sequence number every writer starts with. */
    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    /**
     * Rolls over the active change set (if it is not empty) and returns the resulting active
     * sequence number.
     */
    public SequenceNumber nextSequenceNumber() {
        rollover();
        LOG.trace("query {} sqn: {}", logId, activeSequenceNumber);
        return activeSequenceNumber;
    }

    /**
     * Persists all changes starting from the given sequence number.
     *
     * @param from first sequence number to persist (inclusive)
     * @param checkpointId checkpoint under which local handles of the results are registered
     * @return future completed with the snapshot result once all requested changes are uploaded
     * @throws IOException if an upload at or after {@code from} has already failed
     */
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(SequenceNumber from, long checkpointId) throws IOException {
        LOG.debug("persist {} starting from sqn {} (incl.), active sqn: {}", logId, from, activeSequenceNumber);
        return persistInternal(from, checkpointId);
    }

    /**
     * Accounts for the size of an appended value and starts a persist that is not tied to any
     * checkpoint once the configured threshold is reached. The returned future is not used.
     */
    private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
        activeChangeSetSize += value.length;
        if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
            LOG.debug("pre-emptively flush {}MB of appended changes to the common store", activeChangeSetSize / 1024L / 1024L);
            // Include everything that is still waiting for upload, not only the active set.
            SequenceNumber from = notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey();
            persistInternal(from, DUMMY_PERSIST_CHECKPOINT);
        }
    }

    /**
     * Validates the request, rolls over the active change set and either returns the already
     * uploaded results directly or registers a completion listener and schedules the missing
     * uploads.
     */
    private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal(SequenceNumber from, long checkpointId) throws IOException {
        ensureCanPersist(from);
        rollover();
        Map<SequenceNumber, StateChangeSet> toUpload = drainTailMap(notUploaded, from);
        NavigableMap<SequenceNumber, UploadResult> readyToReturn = uploaded.tailMap(from, true);
        LOG.debug("collected readyToReturn: {}, toUpload: {}, checkpointId: {}.", readyToReturn, toUpload, checkpointId);
        for (UploadResult result : readyToReturn.values()) {
            registerLocalHandle(result, checkpointId);
        }

        SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
        if (range.size() == (long) readyToReturn.size()) {
            // Every requested sequence number has an upload result already.
            Preconditions.checkState(toUpload.isEmpty());
            return CompletableFuture.completedFuture(buildSnapshotResult(keyGroupRange, readyToReturn, 0L));
        }

        CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future = new CompletableFuture<>();
        uploadCompletionListeners.add(new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
        // toUpload may be empty here: the change sets were drained by an earlier persist call and
        // only a listener for their pending results has to be added.
        if (!toUpload.isEmpty()) {
            StateChangeUploadScheduler.UploadTask uploadTask = new StateChangeUploadScheduler.UploadTask(toUpload.values(), uploadResults -> handleUploadSuccess(uploadResults, checkpointId), this::handleUploadFailure);
            uploader.upload(uploadTask);
        }
        return future;
    }

    /**
     * Upload failure callback. Via the mailbox executor it fails the affected listeners and
     * remembers the highest failed sequence number so that later persist calls are rejected.
     */
    private void handleUploadFailure(List<SequenceNumber> failedSqn, Throwable throwable) {
        mailboxExecutor.execute(() -> {
            if (closed) {
                return;
            }
            uploadCompletionListeners.removeIf(listener -> listener.onFailure(failedSqn, throwable));
            failedSqn.stream()
                    .max(Comparator.naturalOrder())
                    // failures of already truncated changes are not recorded
                    .filter(sqn -> sqn.compareTo(lowestSequenceNumber) >= 0)
                    .filter(sqn -> highestFailed == null || sqn.compareTo(highestFailed.f0) > 0)
                    .ifPresent(sqn -> highestFailed = Tuple2.of(sqn, throwable));
        }, "handleUploadFailure");
    }

    /**
     * Upload success callback. Via the mailbox executor it notifies the listeners and stores the
     * results that are still within the retained bounds; results outside of them are released.
     */
    private void handleUploadSuccess(List<UploadResult> results, long checkpointId) {
        mailboxExecutor.execute(() -> {
            if (closed) {
                // NOTE(review): only the remote handle is discarded here; whether the local
                // handle needs discarding too cannot be determined from this class - confirm.
                for (UploadResult result : results) {
                    AutoCloseable discardRemote = () -> result.getStreamStateHandle().discardState();
                    IOUtils.closeAllQuietly(discardRemote);
                }
                return;
            }
            uploadCompletionListeners.removeIf(listener -> listener.onSuccess(results));
            for (UploadResult result : results) {
                registerLocalHandle(result, checkpointId);
                SequenceNumber resultSqn = result.sequenceNumber;
                boolean retained = resultSqn.compareTo(lowestSequenceNumber) >= 0 && resultSqn.compareTo(highestSequenceNumber) < 0;
                if (retained) {
                    uploaded.put(resultSqn, result);
                } else {
                    // the change was truncated while its upload was in progress
                    releaseHandles(result);
                }
            }
        }, "handleUploadSuccess");
    }

    /**
     * Marks the writer as closed and drops all buffered changes and upload results.
     *
     * @throws IllegalStateException if the writer is already closed
     */
    public void close() {
        LOG.debug("close {}", logId);
        Preconditions.checkState(!closed);
        closed = true;
        activeChangeSet.clear();
        activeChangeSetSize = 0L;
        notUploaded.clear();
        uploaded.clear();
    }

    /**
     * Drops all changes below the given sequence number: not yet uploaded change sets are
     * removed, handles of uploaded ones are released.
     *
     * @param to new lowest retained sequence number; must not exceed the active one
     */
    public void truncate(SequenceNumber to) {
        LOG.debug("truncate {} to sqn {} (excl.)", logId, to);
        Preconditions.checkArgument(to.compareTo(activeSequenceNumber) <= 0);
        lowestSequenceNumber = to;
        notUploaded.headMap(lowestSequenceNumber, false).clear();
        // headMap is a view, so clearing it removes the entries from 'uploaded'
        SortedMap<SequenceNumber, UploadResult> toDiscard = uploaded.headMap(to);
        notifyStateNotUsed(toDiscard);
        toDiscard.clear();
    }

    /**
     * Releases the handles of all uploaded changes starting from the given sequence number and
     * closes the writer.
     *
     * @param from first sequence number to drop (inclusive)
     */
    public void truncateAndClose(SequenceNumber from) {
        LOG.debug("truncate {} tail from sqn {} (incl.)", logId, from);
        highestSequenceNumber = from;
        notifyStateNotUsed(uploaded.tailMap(from));
        close();
    }

    /** Moves a non-empty active change set to {@link #notUploaded} and starts a new one. */
    private void rollover() {
        if (activeChangeSet.isEmpty()) {
            return;
        }
        notUploaded.put(activeSequenceNumber, new StateChangeSet(logId, activeSequenceNumber, activeChangeSet));
        activeSequenceNumber = activeSequenceNumber.next();
        LOG.debug("bump active sqn to {}", activeSequenceNumber);
        // a new list is allocated because the previous one is now referenced by the change set
        activeChangeSet = new ArrayList<>();
        activeChangeSetSize = 0L;
    }

    /**
     * Stops tracking the remote and local handles of the uploaded changes in {@code [from, to)}
     * in the changelog registry and discards local changelog entries up to the checkpoint.
     *
     * @throws IllegalStateException if the range is inverted or extends past the active sequence
     *     number
     */
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
        Preconditions.checkState(from.compareTo(to) <= 0, "Invalid confirm range: [%s,%s)", from, to);
        Preconditions.checkState(from.compareTo(activeSequenceNumber) <= 0 && to.compareTo(activeSequenceNumber) <= 0, "Invalid confirm range: [%s,%s), active sqn: %s", from, to, activeSequenceNumber);
        LOG.debug("Confirm [{}, {})", from, to);
        SortedMap<SequenceNumber, UploadResult> confirmed = uploaded.subMap(from, to);
        for (UploadResult result : confirmed.values()) {
            changelogRegistry.stopTracking(result.getStreamStateHandle());
        }
        for (UploadResult result : confirmed.values()) {
            StreamStateHandle localHandle = result.getLocalStreamHandleStateHandle();
            if (localHandle != null) {
                changelogRegistry.stopTracking(localHandle);
            }
        }
        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
    }

    /**
     * Discards local changelog entries up to {@code checkpointId + 1}. The sequence number
     * arguments are not used by this implementation.
     */
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
        localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1L);
    }

    /**
     * Builds the snapshot result from the given upload results. A local state handle is attached
     * only if local backup is enabled and every result carries a local handle.
     */
    private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> results, long incrementalSize) {
        List<Tuple2<StreamStateHandle, Long>> tuples = new ArrayList<>();
        long size = 0L;
        for (UploadResult uploadResult : results.values()) {
            tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), uploadResult.getOffset()));
            size += uploadResult.getSize();
        }
        ChangelogStateHandleStreamImpl jmChangelogStateHandle = new ChangelogStateHandleStreamImpl(tuples, keyGroupRange, size, incrementalSize, "filesystem");
        if (localRecoveryConfig.isLocalBackupEnabled()) {
            long localSize = 0L;
            List<Tuple2<StreamStateHandle, Long>> localTuples = new ArrayList<>();
            for (UploadResult uploadResult : results.values()) {
                StreamStateHandle localHandle = uploadResult.getLocalStreamHandleStateHandle();
                if (localHandle != null) {
                    localTuples.add(Tuple2.of(localHandle, uploadResult.getLocalOffset()));
                    localSize += uploadResult.getSize();
                }
            }
            if (localTuples.size() == tuples.size()) {
                ChangelogStateHandleStreamImpl localChangelogStateHandle = new ChangelogStateHandleStreamImpl(localTuples, keyGroupRange, localSize, 0L, "filesystem");
                return SnapshotResult.withLocalState(jmChangelogStateHandle, localChangelogStateHandle);
            }
            LOG.warn("local handles are different from remote");
        }
        return SnapshotResult.of(jmChangelogStateHandle);
    }

    /** Returns the active sequence number without rolling over. */
    @VisibleForTesting
    SequenceNumber lastAppendedSqnUnsafe() {
        return activeSequenceNumber;
    }

    /**
     * Verifies that changes starting from the given sequence number can be persisted.
     *
     * @throws IOException if an upload at or after {@code from} has failed before
     * @throws IllegalArgumentException if {@code from} was truncated or is not yet appended
     */
    private void ensureCanPersist(SequenceNumber from) throws IOException {
        Preconditions.checkNotNull(from);
        if (highestFailed != null && highestFailed.f0.compareTo(from) >= 0) {
            throw new IOException("The upload for " + highestFailed.f0 + " has already failed previously", highestFailed.f1);
        }
        if (lowestSequenceNumber.compareTo(from) > 0) {
            throw new IllegalArgumentException(String.format("Requested changes were truncated (requested: %s, truncated: %s)", from, lowestSequenceNumber));
        }
        if (activeSequenceNumber.compareTo(from) < 0) {
            throw new IllegalArgumentException(String.format("Requested changes were not yet appended (requested: %s, appended: %s)", from, activeSequenceNumber));
        }
    }

    /** Removes all entries at or after {@code fromInclusive} from {@code src} and returns a copy of them. */
    private static Map<SequenceNumber, StateChangeSet> drainTailMap(NavigableMap<SequenceNumber, StateChangeSet> src, SequenceNumber fromInclusive) {
        NavigableMap<SequenceNumber, StateChangeSet> tailMap = src.tailMap(fromInclusive, true);
        Map<SequenceNumber, StateChangeSet> toUpload = new HashMap<>(tailMap);
        // tailMap is a view, so clearing it removes the entries from src
        tailMap.clear();
        return toUpload;
    }

    /** Releases the remote and local handles of all given upload results. */
    private void notifyStateNotUsed(Map<SequenceNumber, UploadResult> notUsedState) {
        LOG.trace("Uploaded state to discard: {}", notUsedState);
        for (UploadResult result : notUsedState.values()) {
            releaseHandles(result);
        }
    }

    /** Releases the remote handle and, if present, the local handle of an upload result. */
    private void releaseHandles(UploadResult result) {
        changelogRegistry.release(result.streamStateHandle);
        if (result.localStreamHandle != null) {
            changelogRegistry.release(result.localStreamHandle);
        }
    }

    /** Registers the local handle of an upload result unless the persist is a pre-emptive one. */
    private void registerLocalHandle(UploadResult result, long checkpointId) {
        if (checkpointId != DUMMY_PERSIST_CHECKPOINT && result.localStreamHandle != null) {
            localChangelogRegistry.register(result.localStreamHandle, checkpointId);
        }
    }

    /**
     * Collects the upload results of one persist call and completes its future once a result is
     * known for every sequence number of the requested range, or fails it as soon as an upload
     * within the range fails.
     */
    private final class UploadCompletionListener {
        /** Results known so far; seeded with a copy of those available when persist was called. */
        private final NavigableMap<SequenceNumber, UploadResult> uploaded;
        private final CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> completionFuture;
        private final KeyGroupRange keyGroupRange;
        private final SequenceNumberRange changeRange;

        private UploadCompletionListener(KeyGroupRange keyGroupRange, SequenceNumberRange changeRange, Map<SequenceNumber, UploadResult> uploaded, CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> completionFuture) {
            Preconditions.checkArgument(!changeRange.isEmpty(), "Empty change range not allowed: %s", changeRange);
            this.uploaded = new TreeMap<>(uploaded);
            this.completionFuture = completionFuture;
            this.keyGroupRange = keyGroupRange;
            this.changeRange = changeRange;
        }

        /**
         * Records the results that fall into the listener's range.
         *
         * @return {@code true} if the future was completed, i.e. the listener can be removed
         */
        public boolean onSuccess(List<UploadResult> uploadResults) {
            long incrementalSize = 0L;
            for (UploadResult uploadResult : uploadResults) {
                if (!changeRange.contains(uploadResult.sequenceNumber)) {
                    continue;
                }
                this.uploaded.put(uploadResult.sequenceNumber, uploadResult);
                incrementalSize += uploadResult.getSize();
                if ((long) this.uploaded.size() == changeRange.size()) {
                    completionFuture.complete(buildSnapshotResult(keyGroupRange, this.uploaded, incrementalSize));
                    return true;
                }
            }
            return false;
        }

        /**
         * Fails the future if any of the failed sequence numbers falls into the listener's
         * range. A cause that is not an {@link IOException} is wrapped into one.
         *
         * @return {@code true} if the future was failed, i.e. the listener can be removed
         */
        public boolean onFailure(List<SequenceNumber> sequenceNumbers, Throwable throwable) {
            IOException ioException = throwable instanceof IOException ? (IOException) throwable : new IOException(throwable);
            for (SequenceNumber sequenceNumber : sequenceNumbers) {
                if (changeRange.contains(sequenceNumber)) {
                    completionFuture.completeExceptionally(ioException);
                    return true;
                }
            }
            return false;
        }
    }
}

