/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataAggregator;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

class EventHubBufferedPartitionProducer
implements Closeable {
    private static final ClientLogger LOGGER = new ClientLogger(EventHubBufferedPartitionProducer.class);
    private final AmqpRetryOptions retryOptions;
    private final EventHubProducerAsyncClient client;
    private final String partitionId;
    private final AmqpErrorContext errorContext;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Disposable publishSubscription;
    private final Sinks.Many<EventData> eventSink;
    private final CreateBatchOptions createBatchOptions;
    private final Queue<EventData> eventQueue;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final Semaphore flushSemaphore = new Semaphore(1);
    private final PublishResultSubscriber publishResultSubscriber;
    private final EventHubsTracer tracer;

    EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options, AmqpRetryOptions retryOptions, Sinks.Many<EventData> eventSink, Queue<EventData> eventQueue, Tracer tracer) {
        this.client = client;
        this.partitionId = partitionId;
        this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace());
        this.createBatchOptions = new CreateBatchOptions().setPartitionId(partitionId);
        this.retryOptions = retryOptions;
        this.eventSink = eventSink;
        this.eventQueue = eventQueue;
        EventDataAggregator eventDataBatchFlux = new EventDataAggregator((Flux<? extends EventData>)eventSink.asFlux(), this::createNewBatch, client.getFullyQualifiedNamespace(), options, partitionId);
        this.publishResultSubscriber = new PublishResultSubscriber(partitionId, options.getSendSucceededContext(), options.getSendFailedContext(), eventQueue, this.flushSemaphore, this.isFlushing, retryOptions.getTryTimeout(), LOGGER);
        this.publishSubscription = (Disposable)this.publishEvents((Flux<EventDataBatch>)eventDataBatchFlux).publishOn(Schedulers.boundedElastic(), 1).subscribeWith((Subscriber)this.publishResultSubscriber);
        this.tracer = new EventHubsTracer(tracer, client.getFullyQualifiedNamespace(), client.getEventHubName());
    }

    Mono<Void> enqueueEvent(EventData eventData) {
        Mono enqueueOperation = Mono.create(sink -> {
            if (this.isClosed.get()) {
                sink.error((Throwable)new IllegalStateException(String.format("Partition publisher id[%s] is already closed. Cannot enqueue more events.", this.partitionId)));
                return;
            }
            try {
                if (this.isFlushing.get() && !this.flushSemaphore.tryAcquire(this.retryOptions.getTryTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                    sink.error((Throwable)new TimeoutException("Timed out waiting for flush operation to complete."));
                    return;
                }
            }
            catch (InterruptedException e) {
                sink.error((Throwable)new TimeoutException("Unable to acquire flush semaphore due to interrupted exception."));
                return;
            }
            if (this.isClosed.get()) {
                sink.error((Throwable)new IllegalStateException(String.format("Partition publisher id[%s] was closed between flushing events and now. Cannot enqueue events.", this.partitionId)));
                return;
            }
            this.tracer.reportMessageSpan(eventData, eventData.getContext());
            Sinks.EmitResult emitResult = this.eventSink.tryEmitNext((Object)eventData);
            if (emitResult.isSuccess()) {
                sink.success();
                return;
            }
            if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED || emitResult == Sinks.EmitResult.FAIL_OVERFLOW) {
                LOGGER.atInfo().addKeyValue("partitionId", this.partitionId).addKeyValue("emitResult", (Object)emitResult).log("Event could not be published downstream. Applying retry.");
                sink.error((Throwable)new AmqpException(true, emitResult + " occurred.", this.errorContext));
            } else {
                LOGGER.atWarning().addKeyValue("emitResult", (Object)emitResult).log("Event could not be published downstream. Not retrying.", new Object[]{emitResult});
                sink.error((Throwable)new AmqpException(false, "Unable to buffer message for partition: " + this.getPartitionId(), this.errorContext));
            }
        });
        return RetryUtil.withRetry((Mono)enqueueOperation, (AmqpRetryOptions)this.retryOptions, (String)"Timed out trying to enqueue event data.", (boolean)true).onErrorMap(IllegalStateException.class, error -> new AmqpException(true, "Retries exhausted.", (Throwable)error, this.errorContext));
    }

    String getPartitionId() {
        return this.partitionId;
    }

    int getBufferedEventCount() {
        return this.eventQueue.size();
    }

    Mono<Void> flush() {
        return this.publishResultSubscriber.startFlush();
    }

    @Override
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        try {
            this.publishResultSubscriber.startFlush().block(this.retryOptions.getTryTimeout());
        }
        catch (IllegalStateException e) {
            LOGGER.info("Timed out waiting for flush to complete.", new Object[]{e});
        }
        finally {
            this.publishSubscription.dispose();
            this.client.close();
        }
    }

    private Flux<PublishResult> publishEvents(Flux<EventDataBatch> upstream) {
        return upstream.flatMap(batch -> this.client.send((EventDataBatch)batch).thenReturn((Object)new PublishResult((EventDataBatch)batch, null)).onErrorResume(error -> Mono.just((Object)new PublishResult((EventDataBatch)batch, (Throwable)error))), 1, 1);
    }

    private EventDataBatch createNewBatch() {
        Mono<EventDataBatch> batch = this.client.createBatch(this.createBatchOptions);
        try {
            return (EventDataBatch)batch.toFuture().get();
        }
        catch (InterruptedException e) {
            throw LOGGER.logExceptionAsError((RuntimeException)new UncheckedExecutionException(e));
        }
        catch (ExecutionException e) {
            throw LOGGER.logExceptionAsError((RuntimeException)new UncheckedExecutionException(e));
        }
    }

    private static class PublishResultSubscriber
    extends BaseSubscriber<PublishResult> {
        private final String partitionId;
        private final Consumer<SendBatchSucceededContext> onSucceed;
        private final Consumer<SendBatchFailedContext> onFailed;
        private final Queue<EventData> dataQueue;
        private final Duration operationTimeout;
        private final ClientLogger logger;
        private final AtomicBoolean isFlushing;
        private final Semaphore flushSemaphore;
        private MonoSink<Void> flushSink;

        PublishResultSubscriber(String partitionId, Consumer<SendBatchSucceededContext> onSucceed, Consumer<SendBatchFailedContext> onFailed, Queue<EventData> dataQueue, Semaphore flushSemaphore, AtomicBoolean flush, Duration operationTimeout, ClientLogger logger) {
            this.partitionId = partitionId;
            this.onSucceed = onSucceed;
            this.onFailed = onFailed;
            this.dataQueue = dataQueue;
            this.flushSemaphore = flushSemaphore;
            this.isFlushing = flush;
            this.operationTimeout = operationTimeout;
            this.logger = logger;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.requestUnbounded();
        }

        protected void hookOnNext(PublishResult result) {
            if (result.error == null) {
                this.onSucceed.accept(new SendBatchSucceededContext(result.batch.getEvents(), this.partitionId));
            } else {
                this.onFailed.accept(new SendBatchFailedContext(result.batch.getEvents(), this.partitionId, result.error));
            }
            this.tryCompleteFlush();
        }

        protected void hookOnError(Throwable throwable) {
            this.logger.atError().addKeyValue("partitionId", this.partitionId).log("Publishing subscription completed and ended in an error.", new Object[]{throwable});
            this.onFailed.accept(new SendBatchFailedContext(null, this.partitionId, throwable));
            this.tryCompleteFlush();
        }

        protected void hookOnComplete() {
            this.logger.atInfo().addKeyValue("partitionId", this.partitionId).log("Publishing subscription completed. Clearing rest of queue.");
            ArrayList<EventData> events = new ArrayList<EventData>(this.dataQueue);
            this.dataQueue.clear();
            this.onFailed.accept(new SendBatchFailedContext(events, this.partitionId, null));
            this.tryCompleteFlush();
        }

        Mono<Void> startFlush() {
            return Mono.create(sink -> {
                if (!this.isFlushing.compareAndSet(false, true)) {
                    this.logger.atInfo().addKeyValue("partitionId", this.partitionId).log("Flush operation already in progress.");
                    sink.success();
                    return;
                }
                this.flushSink = sink;
                try {
                    if (!this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        sink.error((Throwable)new TimeoutException("Unable to acquire flush semaphore to begin timeout operation."));
                    }
                    this.tryCompleteFlush();
                }
                catch (InterruptedException e) {
                    this.logger.atWarning().addKeyValue("partitionId", this.partitionId).log("Unable to acquire flush semaphore.");
                    sink.error((Throwable)e);
                }
            });
        }

        private void tryCompleteFlush() {
            if (!this.isFlushing.get()) {
                return;
            }
            if (!this.dataQueue.isEmpty()) {
                this.logger.atVerbose().addKeyValue("partitionId", this.partitionId).log("Data queue is not empty. Not completing flush.");
                return;
            }
            this.logger.atVerbose().addKeyValue("partitionId", this.partitionId).log("Completing flush operation.");
            if (this.flushSemaphore != null) {
                this.flushSemaphore.release();
            }
            this.isFlushing.compareAndSet(true, false);
            this.flushSink.success();
        }
    }

    private static class PublishResult {
        private final EventDataBatch batch;
        private final Throwable error;

        PublishResult(EventDataBatch batch, Throwable error) {
            this.batch = batch;
            this.error = error;
        }
    }
}

