/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/**
 * Adapter that exposes the content of an {@link AsyncInputStream} as a {@link Flux} of {@link DataBuffer}s.
 * Reads are issued one at a time and only while downstream demand is outstanding.
 */
class DataBufferPublisherAdapter {
    DataBufferPublisherAdapter() {
    }

    /**
     * Creates a {@link Flux} that reads {@code inputStream} into buffers allocated from {@code dataBufferFactory}.
     * The stream is closed through {@link AsyncInputStream#close()} on completion, error and cancellation.
     *
     * @param inputStream the stream to read from.
     * @param dataBufferFactory factory used to allocate one {@link DataBuffer} per read.
     * @return a {@link Flux} emitting the buffers that were read.
     */
    static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
        return Flux.usingWhen(Mono.just(inputStream), it -> {
            // One State per subscription: a State that was closed by an earlier subscription would
            // otherwise ignore every request of a later subscriber, which would then never get a signal.
            State state = new State(it, dataBufferFactory);
            return Flux.<DataBuffer>create(sink -> {
                sink.onDispose(state::close);
                sink.onCancel(state::close);
                sink.onRequest(n -> state.request(sink, n));
            });
        }, AsyncInputStream::close, AsyncInputStream::close, AsyncInputStream::close).concatMap(Flux::just, 1);
    }

    /**
     * Tracks outstanding demand, the open/closed flag and the "read in progress" flag for one stream.
     * All three are updated through atomic field updaters.
     */
    static class State {
        private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand");
        private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state");
        private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read");
        private static final int STATE_OPEN = 0;
        private static final int STATE_CLOSED = 1;
        private static final int READ_NONE = 0;
        private static final int READ_IN_PROGRESS = 1;
        final AsyncInputStream inputStream;
        final DataBufferFactory dataBufferFactory;
        volatile long demand;
        volatile int state = STATE_OPEN;
        volatile int read = READ_NONE;

        /**
         * Adds {@code n} to the outstanding demand and starts a read if none is in progress.
         *
         * @param sink the sink receiving the buffers.
         * @param n the additional demand.
         */
        void request(FluxSink<DataBuffer> sink, long n) {
            Operators.addCap(DEMAND, this, n);
            if (onShouldRead()) {
                emitNext(sink);
            }
        }

        /**
         * @return {@literal true} if the state is open, demand is outstanding and this caller won the
         *         right to start the next read (the read flag was flipped by this call).
         */
        boolean onShouldRead() {
            return !isClosed() && getDemand() > 0L && onWantRead();
        }

        /**
         * @return {@literal true} if the read flag was switched from "none" to "in progress" by this call.
         */
        boolean onWantRead() {
            return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
        }

        /**
         * @return {@literal true} if the read flag was switched from "in progress" back to "none" by this call.
         */
        boolean onReadDone() {
            return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
        }

        long getDemand() {
            return DEMAND.get(this);
        }

        /**
         * Decrements the demand by one.
         *
         * @return {@literal true} if demand is still outstanding afterwards.
         */
        boolean decrementDemand() {
            return DEMAND.decrementAndGet(this) > 0L;
        }

        /**
         * Marks the state as closed; has no effect if it is closed already.
         */
        void close() {
            STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
        }

        boolean isClosed() {
            return STATE.get(this) == STATE_CLOSED;
        }

        /**
         * Allocates a buffer and subscribes to a single read of the underlying stream.
         *
         * @param sink the sink receiving the buffer or the error.
         */
        void emitNext(FluxSink<DataBuffer> sink) {
            DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
            ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());

            Mono<Integer> pendingRead;
            try {
                pendingRead = Mono.from(inputStream.read(intermediate));
            }
            catch (Exception e) {
                // No subscriber has taken ownership of the buffer yet, so it must be released here.
                DataBufferUtils.release(dataBuffer);
                sink.error(e);
                return;
            }

            try {
                pendingRead.subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
            }
            catch (Exception e) {
                // From here on the subscriber owns the buffer (it may already have been emitted),
                // so only the error is propagated.
                sink.error(e);
            }
        }

        public State(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
            this.inputStream = inputStream;
            this.dataBufferFactory = dataBufferFactory;
        }

        /**
         * Subscriber for a single read: copies the bytes read into the {@link DataBuffer}, hands it to
         * the sink and triggers the next read once the read publisher completes.
         */
        private class BufferCoreSubscriber
        implements CoreSubscriber<Integer> {
            private final FluxSink<DataBuffer> sink;
            private final DataBuffer dataBuffer;
            private final ByteBuffer intermediate;

            BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
                this.sink = sink;
                this.dataBuffer = dataBuffer;
                this.intermediate = intermediate;
            }

            @Override
            public Context currentContext() {
                return sink.currentContext();
            }

            @Override
            public void onSubscribe(Subscription s) {
                s.request(1L);
            }

            @Override
            public void onNext(Integer bytes) {
                if (isClosed()) {
                    onReadDone();
                    DataBufferUtils.release(dataBuffer);
                    Operators.onNextDropped(dataBuffer, sink.currentContext());
                    return;
                }

                boolean endOfStream = bytes == -1;
                intermediate.flip();

                if (endOfStream && !intermediate.hasRemaining()) {
                    // End of stream without data: release the untouched buffer instead of emitting an
                    // empty one and consuming demand for it.
                    DataBufferUtils.release(dataBuffer);
                } else {
                    dataBuffer.write(intermediate);
                    sink.next(dataBuffer);
                    decrementDemand();
                }

                try {
                    if (endOfStream) {
                        sink.complete();
                    }
                }
                finally {
                    onReadDone();
                }
            }

            @Override
            public void onError(Throwable t) {
                if (isClosed()) {
                    // The buffer was never emitted; release it even though the error itself is dropped.
                    DataBufferUtils.release(dataBuffer);
                    Operators.onErrorDropped(t, sink.currentContext());
                    return;
                }
                onReadDone();
                DataBufferUtils.release(dataBuffer);
                Operators.onNextDropped(dataBuffer, sink.currentContext());
                sink.error(t);
            }

            @Override
            public void onComplete() {
                // Continue with the next read if the state is still open and demand remains.
                if (onShouldRead()) {
                    emitNext(sink);
                }
            }
        }
    }
}

