/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.io.netty.chunk;

import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufUtil;
import com.couchbase.client.core.deps.io.netty.channel.ChannelConfig;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpResponse;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.io.netty.chunk.ChunkResponseParser;
import com.couchbase.client.core.json.stream.JsonStreamParser;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.msg.chunk.ChunkHeader;
import com.couchbase.client.core.msg.chunk.ChunkRow;
import com.couchbase.client.core.msg.chunk.ChunkTrailer;
import java.nio.ByteBuffer;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * Skeleton implementation of {@link ChunkResponseParser} that owns the JSON stream parser, the row
 * and trailer sinks, and the demand bookkeeping used to toggle the channel's auto-read flag.
 *
 * <p>Subclasses provide the concrete parser through {@link #parserBuilder()}, release their own
 * state in {@link #doCleanup()} and decide what to signal once the input ended without a decoding
 * failure in {@link #signalComplete()}.
 *
 * @param <H> the chunk header type.
 * @param <ROW> the chunk row type.
 * @param <T> the chunk trailer type.
 */
public abstract class BaseChunkResponseParser<H extends ChunkHeader, ROW extends ChunkRow, T extends ChunkTrailer>
implements ChunkResponseParser<H, ROW, T> {
    /** Parser for the current response; {@code null} until {@link #initialize} ran and after {@link #cleanup()}. */
    private JsonStreamParser parser;
    /** First decoding failure recorded for the current response, or {@code null} if none. */
    private CouchbaseException decodingFailure;
    /** Set by subclasses through {@link #markHeaderComplete()}; reset by {@link #cleanup()}. */
    private boolean headerComplete;
    /** Channel config whose auto-read flag is toggled according to {@link #demand}. */
    private ChannelConfig channelConfig;
    private Sinks.One<T> trailer;
    private Flux<ROW> rows;
    private Sinks.Many<ROW> rowSink;
    /** Outstanding row demand; {@code Long.MAX_VALUE} means "unbounded". Guarded by {@link #autoReadLock}. */
    private long demand = 0L;
    /** Guards {@link #demand} and the auto-read changes that are derived from it. */
    private final Object autoReadLock = new Object();
    private volatile RequestContext requestContext;
    private volatile HttpResponse responseHeader;

    /**
     * Supplies the builder for the JSON stream parser used for one response.
     *
     * @return the builder; {@code build()} is invoked on it by {@link #initialize}.
     */
    protected abstract JsonStreamParser.Builder parserBuilder();

    /**
     * Closes and discards the current parser (if any), clears the recorded decoding failure and the
     * header-complete flag, then delegates to {@link #doCleanup()}.
     *
     * <p>The sinks, the rows flux and the channel config are not touched here; they are replaced by
     * the next call to {@link #initialize}.
     */
    @Override
    public final void cleanup() {
        if (this.parser != null) {
            this.parser.close();
        }
        this.parser = null;
        this.decodingFailure = null;
        this.headerComplete = false;
        this.doCleanup();
    }

    /**
     * Stores the request context so that subclasses can read it via {@link #requestContext()}.
     *
     * @param requestContext the context to store.
     */
    @Override
    public void updateRequestContext(RequestContext requestContext) {
        this.requestContext = requestContext;
    }

    /**
     * @return the request context last passed to {@link #updateRequestContext}, possibly {@code null}.
     */
    protected RequestContext requestContext() {
        return this.requestContext;
    }

    /**
     * Stores the HTTP response header so that subclasses can read it via {@link #responseHeader()}.
     *
     * @param responseHeader the header to store.
     */
    @Override
    public void updateResponseHeader(HttpResponse responseHeader) {
        this.responseHeader = responseHeader;
    }

    /**
     * @return the response header last passed to {@link #updateResponseHeader}, possibly {@code null}.
     */
    protected HttpResponse responseHeader() {
        return this.responseHeader;
    }

    /** Marks the header as complete; the flag stays set until the next {@link #cleanup()}. */
    protected void markHeaderComplete() {
        this.headerComplete = true;
    }

    /**
     * @return {@code true} if {@link #markHeaderComplete()} was called since the last {@link #cleanup()}.
     */
    protected boolean isHeaderComplete() {
        return this.headerComplete;
    }

    /** Hook invoked at the end of {@link #cleanup()} so subclasses can reset their own state. */
    protected abstract void doCleanup();

    /**
     * Feeds the content of {@code input} into the JSON stream parser.
     *
     * <p>If a decoding failure was already recorded the content is skipped. Any exception raised
     * while feeding is wrapped in a new {@link DecodingFailureException}, recorded, and emitted as an
     * error on both the row sink and the trailer sink.
     *
     * <p>{@code input} is released before this method returns, on every path (including the skip
     * path and the failure path).
     *
     * @param input the buffer to consume and release.
     */
    @Override
    public void feed(ByteBuf input) {
        try {
            if (this.decodingFailure != null) {
                return;
            }
            if (input.nioBufferCount() != -1) {
                // The buffer can be exposed as NIO buffers: hand each one to the parser in order.
                for (ByteBuffer nioBuffer : input.nioBuffers()) {
                    this.parser.feed(nioBuffer);
                }
            } else {
                // No NIO view available: go through a byte array obtained from ByteBufUtil.
                this.parser.feed(ByteBuffer.wrap(ByteBufUtil.getBytes(input)));
            }
        } catch (Exception e) {
            this.decodingFailure = new DecodingFailureException(e);
            this.failRows(this.decodingFailure);
            this.failTrailer(this.decodingFailure);
        } finally {
            input.release();
        }
    }

    /**
     * Prepares this parser for a new response.
     *
     * <p>Runs {@link #cleanup()}, builds a fresh JSON stream parser, creates new row and trailer
     * sinks, resets the demand counter to zero and wires up the rows flux:
     * <ul>
     *   <li>every request adds to the demand counter (see {@link #increaseDemand});</li>
     *   <li>termination or cancellation of the flux switches to unbounded demand and turns auto-read
     *       on (see {@link #readRemainingRowsWithoutBackpressure()}).</li>
     * </ul>
     *
     * @param channelConfig the channel config whose auto-read flag is controlled by row demand.
     */
    @Override
    public void initialize(ChannelConfig channelConfig) {
        this.cleanup();
        this.parser = this.parserBuilder().build();
        this.channelConfig = channelConfig;
        this.trailer = Sinks.one();
        synchronized (this.autoReadLock) {
            this.demand = 0L;
        }
        this.rowSink = Sinks.many().unicast().onBackpressureBuffer();
        // The request callback deliberately uses the config passed to THIS call rather than the
        // field, so a flux created by an earlier initialize() keeps talking to its own config.
        this.rows = this.rowSink.asFlux()
            .doOnRequest(requested -> this.increaseDemand(channelConfig, requested))
            .doOnTerminate(this::readRemainingRowsWithoutBackpressure)
            .doOnCancel(this::readRemainingRowsWithoutBackpressure)
            .publish()
            .refCount();
    }

    /**
     * Adds {@code requested} to the outstanding demand and enables auto-read when there is demand.
     *
     * <p>Once the demand is {@code Long.MAX_VALUE} it stays there. An overflowing addition saturates
     * to {@code Long.MAX_VALUE} and unconditionally enables auto-read.
     *
     * @param config the channel config to enable auto-read on.
     * @param requested the number of rows requested by the subscriber.
     */
    private void increaseDemand(ChannelConfig config, long requested) {
        synchronized (this.autoReadLock) {
            if (this.demand == Long.MAX_VALUE) {
                return;
            }
            try {
                this.demand = Math.addExact(this.demand, requested);
            } catch (ArithmeticException ignored) {
                // Overflow: treat as unbounded demand from here on.
                this.demand = Long.MAX_VALUE;
                config.setAutoRead(true);
                return;
            }
            if (this.demand > 0L && !config.isAutoRead()) {
                config.setAutoRead(true);
            }
        }
    }

    /** Switches to unbounded demand and turns auto-read on for the current channel config. */
    private void readRemainingRowsWithoutBackpressure() {
        synchronized (this.autoReadLock) {
            this.demand = Long.MAX_VALUE;
            this.channelConfig.setAutoRead(true);
        }
    }

    /**
     * @return the rows flux created by the last {@link #initialize} call ({@code null} before that).
     */
    @Override
    public Flux<ROW> rows() {
        return this.rows;
    }

    /**
     * @return a mono view of the trailer sink created by the last {@link #initialize} call.
     */
    @Override
    public Mono<T> trailer() {
        return this.trailer.asMono();
    }

    /**
     * Signals the end of the response body to the JSON stream parser.
     *
     * <p>Does nothing if a decoding failure was already recorded. If the parser raises a
     * {@link DecodingFailureException}, that exception is recorded and emitted as an error on both
     * sinks, and {@link #signalComplete()} is not called. Otherwise {@link #signalComplete()} runs.
     */
    @Override
    public void endOfInput() {
        if (this.decodingFailure != null) {
            return;
        }
        try {
            this.parser.endOfInput();
        } catch (DecodingFailureException e) {
            this.decodingFailure = e;
            this.failRows(e);
            this.failTrailer(e);
            return;
        }
        this.signalComplete();
    }

    /**
     * @return the decoding failure recorded since the last {@link #cleanup()}, or empty if none.
     */
    @Override
    public Optional<CouchbaseException> decodingFailure() {
        return Optional.ofNullable(this.decodingFailure);
    }

    /** Hook invoked by {@link #endOfInput()} when the input ended without a decoding failure. */
    protected abstract void signalComplete();

    /**
     * Emits {@code row} on the row sink and then accounts for it against the outstanding demand.
     *
     * <p>With unbounded demand ({@code Long.MAX_VALUE}) nothing is counted. Otherwise the demand is
     * decremented, and auto-read is turned off once the demand is zero or negative.
     *
     * @param row the row to emit.
     */
    protected void emitRow(ROW row) {
        this.rowSink.emitNext(row, Reactor.emitFailureHandler());
        synchronized (this.autoReadLock) {
            if (this.demand == Long.MAX_VALUE) {
                return;
            }
            --this.demand;
            if (this.demand <= 0L && this.channelConfig.isAutoRead()) {
                this.channelConfig.setAutoRead(false);
            }
        }
    }

    /**
     * Emits {@code t} as an error on the row sink.
     *
     * @param t the error to emit.
     */
    protected void failRows(Throwable t) {
        this.rowSink.emitError(t, Reactor.emitFailureHandler());
    }

    /** Emits completion on the row sink. */
    protected void completeRows() {
        this.rowSink.emitComplete(Reactor.emitFailureHandler());
    }

    /**
     * Emits {@code t} as an error on the trailer sink.
     *
     * @param t the error to emit.
     */
    private void failTrailer(Throwable t) {
        this.trailer.emitError(t, Reactor.emitFailureHandler());
    }

    /**
     * Emits {@code trailer} as the value of the trailer sink.
     *
     * @param trailer the trailer to emit.
     */
    protected void completeTrailer(T trailer) {
        this.trailer.emitValue(trailer, Reactor.emitFailureHandler());
    }
}

