/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;

class SpilledSubpartitionViewAsyncIO
implements ResultSubpartitionView {
    private static final int DEFAULT_READ_BATCH_SIZE = 2;
    private final Object lock = new Object();
    private final ResultSubpartition parent;
    private final BufferProvider bufferProvider;
    private final BufferProviderCallback bufferAvailabilityListener;
    private final int readBatchSize;
    private final AtomicInteger currentBatchSize = new AtomicInteger();
    private final BufferFileReader asyncFileReader;
    private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue();
    private NotificationListener registeredListener;
    private volatile IOException errorInIOThread;
    private volatile boolean isReleased;
    private volatile boolean hasReachedEndOfFile;
    private final long fileSize;

    SpilledSubpartitionViewAsyncIO(ResultSubpartition parent, BufferProvider bufferProvider, IOManager ioManager, FileIOChannel.ID channelId, long initialSeekPosition) throws IOException {
        this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, 2);
    }

    SpilledSubpartitionViewAsyncIO(ResultSubpartition parent, BufferProvider bufferProvider, IOManager ioManager, FileIOChannel.ID channelId, long initialSeekPosition, int readBatchSize) throws IOException {
        Preconditions.checkArgument((initialSeekPosition >= 0L ? 1 : 0) != 0, (Object)"Initial seek position is < 0.");
        Preconditions.checkArgument((readBatchSize >= 1 ? 1 : 0) != 0, (Object)"Batch read size < 1.");
        this.parent = (ResultSubpartition)Preconditions.checkNotNull((Object)parent);
        this.bufferProvider = (BufferProvider)Preconditions.checkNotNull((Object)bufferProvider);
        this.bufferAvailabilityListener = new BufferProviderCallback(this);
        this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
        if (initialSeekPosition > 0L) {
            this.asyncFileReader.seekToPosition(initialSeekPosition);
        }
        this.readBatchSize = readBatchSize;
        this.fileSize = this.asyncFileReader.getSize();
        this.readNextBatchAsync();
    }

    @Override
    public Buffer getNextBuffer() throws IOException {
        this.checkError();
        Buffer buffer = this.returnedBuffers.poll();
        if (buffer == null) {
            if (this.currentBatchSize.get() == 0) {
                this.readNextBatchAsync();
            }
        } else {
            this.currentBatchSize.decrementAndGet();
        }
        return buffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean registerListener(NotificationListener listener) throws IOException {
        Preconditions.checkNotNull((Object)listener);
        this.checkError();
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased || !this.returnedBuffers.isEmpty()) {
                return false;
            }
            if (this.registeredListener == null) {
                this.registeredListener = listener;
                return true;
            }
        }
        throw new IllegalStateException("Already registered listener.");
    }

    @Override
    public void notifySubpartitionConsumed() throws IOException {
        this.parent.onConsumedSubpartition();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseAllResources() throws IOException {
        try {
            Object object = this.lock;
            synchronized (object) {
                if (!this.isReleased) {
                    Buffer buffer;
                    while ((buffer = this.returnedBuffers.poll()) != null) {
                        buffer.recycle();
                    }
                    this.isReleased = true;
                }
            }
        }
        finally {
            this.asyncFileReader.close();
        }
    }

    @Override
    public boolean isReleased() {
        return this.parent.isReleased() || this.isReleased;
    }

    @Override
    public Throwable getFailureCause() {
        return this.parent.getFailureCause();
    }

    private void readNextBatchAsync() throws IOException {
        if (this.hasReachedEndOfFile || this.currentBatchSize.get() != 0) {
            return;
        }
        int i = 0;
        while (i < this.readBatchSize) {
            Buffer buffer = this.bufferProvider.requestBuffer();
            if (buffer == null) {
                this.currentBatchSize.incrementAndGet();
                if (this.bufferProvider.addListener(this.bufferAvailabilityListener)) {
                    ++i;
                    continue;
                }
                if (this.bufferProvider.isDestroyed()) {
                    this.currentBatchSize.decrementAndGet();
                    return;
                }
                this.currentBatchSize.decrementAndGet();
                continue;
            }
            this.currentBatchSize.incrementAndGet();
            this.asyncFileReader.readInto(buffer);
        }
    }

    private void onAvailableBuffer(Buffer buffer) {
        try {
            this.asyncFileReader.readInto(buffer);
        }
        catch (IOException e) {
            this.notifyError(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void returnBufferFromIOThread(Buffer buffer) {
        NotificationListener listener;
        Object object = this.lock;
        synchronized (object) {
            if (this.hasReachedEndOfFile || this.isReleased) {
                buffer.recycle();
                return;
            }
            this.returnedBuffers.add(buffer);
            listener = this.registeredListener;
            this.registeredListener = null;
            if (this.asyncFileReader.hasReachedEndOfFile()) {
                this.hasReachedEndOfFile = true;
            }
        }
        if (listener != null) {
            listener.onNotification();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyError(IOException error) {
        NotificationListener listener;
        if (this.errorInIOThread == null) {
            this.errorInIOThread = error;
        }
        Object object = this.lock;
        synchronized (object) {
            listener = this.registeredListener;
            this.registeredListener = null;
        }
        if (listener != null) {
            listener.onNotification();
        }
    }

    private void checkError() throws IOException {
        if (this.errorInIOThread != null) {
            throw this.errorInIOThread;
        }
    }

    public String toString() {
        return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s", this.parent.index, this.fileSize, this.parent.parent.getPartitionId());
    }

    private static class BufferProviderCallback
    implements EventListener<Buffer> {
        private final SpilledSubpartitionViewAsyncIO subpartitionView;

        private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
            this.subpartitionView = subpartitionView;
        }

        @Override
        public void onEvent(Buffer buffer) {
            if (buffer == null) {
                return;
            }
            this.subpartitionView.onAvailableBuffer(buffer);
        }
    }

    private static class IOThreadCallback
    implements RequestDoneCallback<Buffer> {
        private final SpilledSubpartitionViewAsyncIO subpartitionView;

        public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
            this.subpartitionView = subpartitionView;
        }

        @Override
        public void requestSuccessful(Buffer buffer) {
            this.subpartitionView.returnBufferFromIOThread(buffer);
        }

        @Override
        public void requestFailed(Buffer buffer, IOException error) {
            buffer.recycle();
            this.subpartitionView.notifyError(error);
        }
    }
}

