/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.changelog.fs.RetryPolicy;
import org.apache.flink.changelog.fs.RetryingExecutor;
import org.apache.flink.changelog.fs.SchedulerFactory;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.changelog.fs.UploadResult;
import org.apache.flink.changelog.fs.UploadThrottle;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
class BatchingStateChangeUploadScheduler
implements StateChangeUploadScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeUploadScheduler.class);
    private final RetryingExecutor retryingExecutor;
    private final RetryPolicy retryPolicy;
    private final StateChangeUploader delegate;
    private final ScheduledExecutorService scheduler;
    private final long scheduleDelayMs;
    private final long sizeThresholdBytes;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Queue<StateChangeUploadScheduler.UploadTask> scheduled;
    @GuardedBy(value="lock")
    private long scheduledBytesCounter;
    private final AvailabilityProvider.AvailabilityHelper availabilityHelper;
    @Nullable
    @GuardedBy(value="lock")
    private ScheduledFuture<?> scheduledFuture;
    @Nullable
    @GuardedBy(value="this")
    private Throwable errorUnsafe;
    @GuardedBy(value="lock")
    private final UploadThrottle uploadThrottle;
    private final Histogram uploadBatchSizes;

    BatchingStateChangeUploadScheduler(long persistDelayMs, long sizeThresholdBytes, RetryPolicy retryPolicy, StateChangeUploader delegate, int numUploadThreads, long maxBytesInFlight, ChangelogStorageMetricGroup metricGroup) {
        this(persistDelayMs, sizeThresholdBytes, maxBytesInFlight, retryPolicy, delegate, SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG), new RetryingExecutor(numUploadThreads, metricGroup.getAttemptsPerUpload(), metricGroup.getTotalAttemptsPerUpload()), metricGroup);
    }

    BatchingStateChangeUploadScheduler(long persistDelayMs, long sizeThresholdBytes, long maxBytesInFlight, RetryPolicy retryPolicy, StateChangeUploader delegate, ScheduledExecutorService scheduler, RetryingExecutor retryingExecutor, ChangelogStorageMetricGroup metricGroup) {
        Preconditions.checkArgument((sizeThresholdBytes <= maxBytesInFlight ? 1 : 0) != 0, (String)"sizeThresholdBytes (%s) must not exceed maxBytesInFlight (%s)", (Object[])new Object[]{sizeThresholdBytes, maxBytesInFlight});
        this.scheduleDelayMs = persistDelayMs;
        this.scheduled = new LinkedList<StateChangeUploadScheduler.UploadTask>();
        this.scheduler = scheduler;
        this.retryPolicy = retryPolicy;
        this.retryingExecutor = retryingExecutor;
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.delegate = delegate;
        this.uploadThrottle = new UploadThrottle(maxBytesInFlight);
        this.availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
        this.availabilityHelper.resetAvailable();
        this.uploadBatchSizes = metricGroup.getUploadBatchSizes();
        metricGroup.registerUploadQueueSizeGauge((Gauge<Integer>)((Gauge)() -> {
            Queue<StateChangeUploadScheduler.UploadTask> queue = this.scheduled;
            synchronized (queue) {
                return this.scheduled.size();
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void upload(StateChangeUploadScheduler.UploadTask uploadTask) throws IOException {
        Throwable error = this.getErrorSafe();
        if (error != null) {
            LOG.debug("don't persist {} changesets, already failed", (Object)uploadTask.changeSets.size());
            uploadTask.fail(error);
            return;
        }
        LOG.debug("persist {} changeSets", (Object)uploadTask.changeSets.size());
        try {
            long size = uploadTask.getSize();
            Object object = this.lock;
            synchronized (object) {
                while (!this.uploadThrottle.hasCapacity()) {
                    this.lock.wait();
                }
                this.uploadThrottle.seizeCapacity(size);
                if (!this.uploadThrottle.hasCapacity()) {
                    this.availabilityHelper.resetUnavailable();
                }
                this.scheduledBytesCounter += size;
                this.scheduled.add(this.wrapWithSizeUpdate(uploadTask, size));
                this.scheduleUploadIfNeeded();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            uploadTask.fail(e);
            throw new IOException(e);
        }
        catch (Exception e) {
            uploadTask.fail(e);
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseCapacity(long size) {
        CompletableFuture toNotify = null;
        Object object = this.lock;
        synchronized (object) {
            boolean hadCapacityBefore = this.uploadThrottle.hasCapacity();
            this.uploadThrottle.releaseCapacity(size);
            this.lock.notifyAll();
            if (!hadCapacityBefore && this.uploadThrottle.hasCapacity()) {
                toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
            }
        }
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    private void scheduleUploadIfNeeded() {
        Preconditions.checkState((boolean)Thread.holdsLock(this.lock));
        if (this.scheduleDelayMs == 0L || this.scheduledBytesCounter >= this.sizeThresholdBytes) {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduledFuture = null;
            }
            this.drainAndSave();
        } else if (this.scheduledFuture == null) {
            this.scheduledFuture = this.scheduler.schedule(this::drainAndSave, this.scheduleDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainAndSave() {
        ArrayList<StateChangeUploadScheduler.UploadTask> tasks;
        Object object = this.lock;
        synchronized (object) {
            tasks = new ArrayList<StateChangeUploadScheduler.UploadTask>(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
            this.scheduledFuture = null;
        }
        if (tasks.size() == 0) {
            return;
        }
        try {
            Throwable error = this.getErrorSafe();
            if (error != null) {
                tasks.forEach(task -> task.fail(error));
                return;
            }
            this.uploadBatchSizes.update((long)tasks.size());
            this.retryingExecutor.execute(this.retryPolicy, this.asRetriableAction(tasks));
        }
        catch (Throwable t) {
            tasks.forEach(task -> task.fail(t));
            if (ExceptionUtils.findThrowable((Throwable)t, IOException.class).isPresent()) {
                LOG.warn("Caught IO exception while uploading", t);
            }
            this.setErrorSafe(t);
            throw t;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws Exception {
        ArrayList<StateChangeUploadScheduler.UploadTask> drained;
        LOG.debug("close");
        this.scheduler.shutdownNow();
        if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 5s");
        }
        Object object = this.lock;
        synchronized (object) {
            drained = new ArrayList<StateChangeUploadScheduler.UploadTask>(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
        }
        CancellationException ce = new CancellationException();
        drained.forEach(task -> task.fail(ce));
        this.retryingExecutor.close();
        this.delegate.close();
    }

    private synchronized Throwable getErrorSafe() {
        return this.errorUnsafe;
    }

    private synchronized void setErrorSafe(Throwable t) {
        this.errorUnsafe = t;
    }

    private StateChangeUploadScheduler.UploadTask wrapWithSizeUpdate(StateChangeUploadScheduler.UploadTask uploadTask, long size) {
        return new StateChangeUploadScheduler.UploadTask(uploadTask.changeSets, result -> {
            try {
                this.releaseCapacity(size);
            }
            finally {
                uploadTask.complete((List<UploadResult>)result);
            }
        }, (result, error) -> {
            try {
                this.releaseCapacity(size);
            }
            finally {
                uploadTask.fail((Throwable)error);
            }
        });
    }

    @Override
    public AvailabilityProvider getAvailabilityProvider() {
        return this.availabilityHelper;
    }

    private RetryingExecutor.RetriableAction<StateChangeUploader.UploadTasksResult> asRetriableAction(final Collection<StateChangeUploadScheduler.UploadTask> tasks) {
        return new RetryingExecutor.RetriableAction<StateChangeUploader.UploadTasksResult>(){

            @Override
            public StateChangeUploader.UploadTasksResult tryExecute() throws Exception {
                return BatchingStateChangeUploadScheduler.this.delegate.upload(tasks);
            }

            @Override
            public void completeWithResult(StateChangeUploader.UploadTasksResult uploadTasksResult) {
                uploadTasksResult.complete();
            }

            @Override
            public void discardResult(StateChangeUploader.UploadTasksResult uploadTasksResult) throws Exception {
                uploadTasksResult.discard();
            }

            @Override
            public void handleFailure(Throwable throwable) {
                tasks.forEach(task -> task.fail(throwable));
            }
        };
    }
}

