/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventProcessorClientOptions;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.PartitionPump;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.implementation.ReactorShim;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class PartitionPumpManager {
    private static final int MAXIMUM_QUEUE_SIZE = 10000;
    private static final ClientLogger LOGGER = new ClientLogger(PartitionPumpManager.class);
    private final int schedulerSize = Runtime.getRuntime().availableProcessors() * 4;
    private final CheckpointStore checkpointStore;
    private final Map<String, PartitionPump> partitionPumps = new ConcurrentHashMap<String, PartitionPump>();
    private final Supplier<PartitionProcessor> partitionProcessorFactory;
    private final EventHubClientBuilder eventHubClientBuilder;
    private final int prefetch;
    private final EventHubsTracer tracer;
    private final EventProcessorClientOptions options;

    PartitionPumpManager(CheckpointStore checkpointStore, Supplier<PartitionProcessor> partitionProcessorFactory, EventHubClientBuilder eventHubClientBuilder, EventHubsTracer tracer, EventProcessorClientOptions options) {
        this.checkpointStore = checkpointStore;
        this.partitionProcessorFactory = partitionProcessorFactory;
        this.eventHubClientBuilder = eventHubClientBuilder;
        this.options = options;
        this.prefetch = eventHubClientBuilder.getPrefetchCount() == null ? 500 : eventHubClientBuilder.getPrefetchCount();
        this.tracer = tracer;
    }

    void stopAllPartitionPumps() {
        this.partitionPumps.forEach((partitionId, eventHubConsumer) -> {
            try {
                eventHubConsumer.close();
            }
            catch (Exception ex) {
                LOGGER.atWarning().addKeyValue("partitionId", partitionId).log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, new Object[]{ex});
            }
            finally {
                this.partitionPumps.remove(partitionId);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void verifyPartitionConnection(PartitionOwnership ownership) {
        String partitionId = ownership.getPartitionId();
        PartitionPump partitionPump = this.partitionPumps.get(partitionId);
        if (partitionPump == null) {
            LOGGER.atInfo().addKeyValue("partitionId", partitionId).addKeyValue("entity-path", ownership.getEventHubName()).log("No partition pump found for ownership record.");
            return;
        }
        EventHubConsumerAsyncClient consumerClient = partitionPump.getClient();
        if (consumerClient.isConnectionClosed()) {
            LOGGER.atInfo().addKeyValue("partitionId", partitionId).addKeyValue("entity-path", ownership.getEventHubName()).log("Connection closed for partition. Removing the consumer.");
            try {
                partitionPump.close();
            }
            catch (Exception ex) {
                LOGGER.atWarning().addKeyValue("partitionId", partitionId).log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, new Object[]{ex});
            }
            finally {
                this.partitionPumps.remove(partitionId);
            }
        }
    }

    void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) {
        String partitionId = claimedOwnership.getPartitionId();
        if (this.partitionPumps.containsKey(partitionId)) {
            LOGGER.atVerbose().addKeyValue("partitionId", partitionId).log("Consumer is already running.");
            return;
        }
        try {
            PartitionContext partitionContext = new PartitionContext(claimedOwnership.getFullyQualifiedNamespace(), claimedOwnership.getEventHubName(), claimedOwnership.getConsumerGroup(), partitionId);
            PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get();
            InitializationContext initializationContext = new InitializationContext(partitionContext);
            partitionProcessor.initialize(initializationContext);
            EventPosition startFromEventPosition = this.getInitialEventPosition(partitionId, checkpoint);
            LOGGER.atInfo().addKeyValue("partitionId", partitionId).addKeyValue("eventPosition", (Object)startFromEventPosition).log("Starting event processing.");
            ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L).setTrackLastEnqueuedEventProperties(this.options.isTrackLastEnqueuedEventProperties());
            Scheduler scheduler = Schedulers.newBoundedElastic((int)this.schedulerSize, (int)10000, (String)("partition-pump-" + partitionId));
            EventHubConsumerAsyncClient eventHubConsumer = this.eventHubClientBuilder.buildAsyncClient().createConsumer(claimedOwnership.getConsumerGroup(), this.prefetch, true);
            PartitionPump partitionPump = new PartitionPump(partitionId, eventHubConsumer, scheduler);
            this.partitionPumps.put(partitionId, partitionPump);
            Flux receiver = eventHubConsumer.receiveFromPartition(partitionId, startFromEventPosition, receiveOptions).doOnNext(partitionEvent -> {
                if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                    LOGGER.atVerbose().addKeyValue("partitionId", partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).addKeyValue("sequenceNumber", (Object)partitionEvent.getData().getSequenceNumber()).log("On next.");
                }
            });
            Flux partitionEventFlux = this.options.getMaxWaitTime() != null ? ReactorShim.windowTimeout(receiver, this.options.getMaxBatchSize(), this.options.getMaxWaitTime()) : receiver.window(this.options.getMaxBatchSize());
            int prefetchWindows = Math.max(this.prefetch / this.options.getMaxBatchSize(), 1);
            partitionEventFlux.concatMap(Flux::collectList, 0).publishOn(scheduler, false, prefetchWindows).subscribe(partitionEventBatch -> this.processEvents(partitionContext, partitionProcessor, partitionPump, (List<PartitionEvent>)partitionEventBatch), ex -> this.handleError(claimedOwnership, partitionPump, partitionProcessor, (Throwable)ex, partitionContext), () -> {
                try {
                    partitionProcessor.close(new CloseContext(partitionContext, CloseReason.EVENT_PROCESSOR_SHUTDOWN));
                }
                catch (Throwable e) {
                    LOGGER.atError().addKeyValue("partitionId", partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.close when closing partition pump.", new Object[]{e});
                }
                finally {
                    this.cleanup(claimedOwnership, partitionPump);
                }
            });
        }
        catch (Exception ex2) {
            if (this.partitionPumps.containsKey(partitionId)) {
                this.cleanup(claimedOwnership, this.partitionPumps.get(partitionId));
            }
            throw LOGGER.atError().addKeyValue("partitionId", partitionId).log((RuntimeException)((Object)new PartitionProcessorException("Error occurred while starting partition pump for partition " + partitionId, ex2)));
        }
    }

    private void processEvent(PartitionContext partitionContext, PartitionProcessor partitionProcessor, EventContext eventContext) {
        EventData eventData = eventContext.getEventData();
        try {
            if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                LOGGER.atVerbose().addKeyValue("partitionId", partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Processing event.");
            }
            partitionProcessor.processEvent(new EventContext(partitionContext, eventData, this.checkpointStore, eventContext.getLastEnqueuedEventProperties()));
            if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                LOGGER.atVerbose().addKeyValue("partitionId", partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Completed processing event.");
            }
        }
        catch (Throwable throwable) {
            throw LOGGER.logExceptionAsError((RuntimeException)((Object)new PartitionProcessorException("Error in event processing callback", throwable)));
        }
    }

    private void processEvents(PartitionContext partitionContext, PartitionProcessor partitionProcessor, PartitionPump partitionPump, List<PartitionEvent> partitionEventBatch) {
        Throwable exception = null;
        Context span = null;
        AutoCloseable scope = null;
        try {
            if (this.options.isBatchReceiveMode()) {
                LastEnqueuedEventProperties[] lastEnqueuedEventProperties = new LastEnqueuedEventProperties[1];
                List<EventData> eventDataList = partitionEventBatch.stream().map(partitionEvent -> {
                    lastEnqueuedEventProperties[0] = partitionEvent.getLastEnqueuedEventProperties();
                    return partitionEvent.getData();
                }).collect(Collectors.toList());
                LastEnqueuedEventProperties enqueuedEventProperties = this.updateOrGetLastEnqueuedEventProperties(partitionPump, lastEnqueuedEventProperties[0]);
                EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, eventDataList, this.checkpointStore, enqueuedEventProperties);
                span = this.tracer.startProcessSpan("EventHubs.process", eventDataList, Context.NONE);
                scope = this.tracer.makeSpanCurrent(span);
                if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                    LOGGER.atVerbose().addKeyValue("partitionId", partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Processing event batch.");
                }
                partitionProcessor.processEventBatch(eventBatchContext);
                if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                    LOGGER.atVerbose().addKeyValue("partitionId", partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Completed processing event batch.");
                }
            } else {
                EventData eventData = partitionEventBatch.size() == 1 ? partitionEventBatch.get(0).getData() : null;
                LastEnqueuedEventProperties lastEnqueuedEventProperties = partitionEventBatch.size() == 1 ? partitionEventBatch.get(0).getLastEnqueuedEventProperties() : null;
                LastEnqueuedEventProperties enqueuedEventProperties = this.updateOrGetLastEnqueuedEventProperties(partitionPump, lastEnqueuedEventProperties);
                EventContext eventContext = new EventContext(partitionContext, eventData, this.checkpointStore, enqueuedEventProperties);
                span = this.tracer.startProcessSpan("EventHubs.process", eventData, Context.NONE);
                scope = this.tracer.makeSpanCurrent(span);
                this.processEvent(partitionContext, partitionProcessor, eventContext);
            }
            this.tracer.endSpan(exception, span, scope);
        }
        catch (Throwable throwable) {
            try {
                exception = throwable;
                throw LOGGER.logExceptionAsError((RuntimeException)((Object)new PartitionProcessorException("Error in event processing callback", throwable)));
            }
            catch (Throwable throwable2) {
                this.tracer.endSpan(exception, span, scope);
                throw throwable2;
            }
        }
    }

    Map<String, PartitionPump> getPartitionPumps() {
        return this.partitionPumps;
    }

    EventPosition getInitialEventPosition(String partitionId, Checkpoint checkpoint) {
        EventPosition initialPosition;
        if (checkpoint != null && checkpoint.getOffset() != null) {
            return EventPosition.fromOffset(checkpoint.getOffset());
        }
        if (checkpoint != null && checkpoint.getSequenceNumber() != null) {
            return EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber());
        }
        if (this.options.getInitialEventPositionProvider() != null && (initialPosition = this.options.getInitialEventPositionProvider().apply(partitionId)) != null) {
            return initialPosition;
        }
        return EventPosition.latest();
    }

    private void handleError(PartitionOwnership claimedOwnership, PartitionPump partitionPump, PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) {
        boolean shouldRethrow = true;
        if (!(throwable instanceof PartitionProcessorException)) {
            shouldRethrow = false;
            LOGGER.atWarning().addKeyValue("partitionId", partitionContext.getPartitionId()).log("Error receiving events from partition.", new Object[]{throwable});
            try {
                partitionProcessor.processError(new ErrorContext(partitionContext, throwable));
            }
            catch (Throwable e) {
                LOGGER.atError().addKeyValue("partitionId", partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.processError.", new Object[]{e});
            }
        }
        CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
        try {
            partitionProcessor.close(new CloseContext(partitionContext, closeReason));
        }
        catch (Throwable e) {
            LOGGER.atError().addKeyValue("partitionId", partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.close.", new Object[]{e});
        }
        this.cleanup(claimedOwnership, partitionPump);
        if (shouldRethrow) {
            PartitionProcessorException exception = (PartitionProcessorException)((Object)throwable);
            throw LOGGER.logExceptionAsError((RuntimeException)((Object)exception));
        }
    }

    private void cleanup(PartitionOwnership claimedOwnership, PartitionPump partitionPump) {
        try {
            LOGGER.atInfo().addKeyValue("partitionId", claimedOwnership.getPartitionId()).log("Closing consumer.");
            partitionPump.close();
        }
        finally {
            this.partitionPumps.remove(claimedOwnership.getPartitionId());
        }
    }

    private LastEnqueuedEventProperties updateOrGetLastEnqueuedEventProperties(PartitionPump partitionPump, LastEnqueuedEventProperties last) {
        if (last != null) {
            partitionPump.setLastEnqueuedEventProperties(last);
        }
        return partitionPump.getLastEnqueuedEventProperties();
    }
}

