/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ClosedClientTransportException;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionTask;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdOpenConnectionsHandler;
import com.azure.cosmos.implementation.guava27.Strings;
import com.azure.cosmos.models.CosmosContainerIdentity;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;

public final class ProactiveOpenConnectionsProcessor
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ProactiveOpenConnectionsProcessor.class);
    private Sinks.Many<OpenConnectionTask> openConnectionsTaskSink;
    private final ConcurrentHashMap<String, List<OpenConnectionTask>> endpointsUnderMonitorMap;
    private final ReentrantReadWriteLock.WriteLock endpointsUnderMonitorMapWriteLock;
    private final ReentrantReadWriteLock.ReadLock endpointsUnderMonitorMapReadLock;
    private final Set<String> containersUnderOpenConnectionAndInitCaches;
    private final Map<String, Set<String>> collectionRidsAndUrisUnderOpenConnectionAndInitCaches;
    private final Set<String> addressUrisUnderOpenConnectionsAndInitCaches;
    private final Object containersUnderOpenConnectionAndInitCachesLock;
    private final AtomicReference<ConnectionOpenFlowAggressivenessHint> aggressivenessHint;
    private final AtomicReference<Boolean> isClosed = new AtomicReference<Boolean>(false);
    private static final Map<ConnectionOpenFlowAggressivenessHint, ConcurrencyConfiguration> concurrencySettings = new HashMap<ConnectionOpenFlowAggressivenessHint, ConcurrencyConfiguration>();
    private final IOpenConnectionsHandler openConnectionsHandler;
    private final RntbdEndpoint.Provider endpointProvider;
    private final AddressSelector addressSelector;
    private Disposable openConnectionBackgroundTask;
    private final Sinks.EmitFailureHandler serializedEmitFailureHandler;
    private static final int OPEN_CONNECTION_SINK_BUFFER_SIZE = 100000;

    public ProactiveOpenConnectionsProcessor(RntbdEndpoint.Provider endpointProvider, AddressSelector addressSelector) {
        this.aggressivenessHint = new AtomicReference<ConnectionOpenFlowAggressivenessHint>(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
        this.endpointsUnderMonitorMap = new ConcurrentHashMap();
        ReentrantReadWriteLock throughputReadWriteLock = new ReentrantReadWriteLock();
        this.endpointsUnderMonitorMapWriteLock = throughputReadWriteLock.writeLock();
        this.endpointsUnderMonitorMapReadLock = throughputReadWriteLock.readLock();
        this.openConnectionsHandler = new RntbdOpenConnectionsHandler(endpointProvider);
        this.endpointProvider = endpointProvider;
        this.addressSelector = addressSelector;
        this.serializedEmitFailureHandler = new SerializedEmitFailureHandler();
        this.containersUnderOpenConnectionAndInitCaches = ConcurrentHashMap.newKeySet();
        this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches = new ConcurrentHashMap<String, Set<String>>();
        this.addressUrisUnderOpenConnectionsAndInitCaches = ConcurrentHashMap.newKeySet();
        this.containersUnderOpenConnectionAndInitCachesLock = new Object();
        concurrencySettings.put(ConnectionOpenFlowAggressivenessHint.AGGRESSIVE, new ConcurrencyConfiguration(Configs.getAggressiveWarmupConcurrency(), Configs.getAggressiveWarmupConcurrency()));
        concurrencySettings.put(ConnectionOpenFlowAggressivenessHint.DEFENSIVE, new ConcurrencyConfiguration(Configs.getOpenConnectionsConcurrency(), Configs.getOpenConnectionsConcurrency()));
    }

    public void init() {
        this.openConnectionBackgroundTask = this.getBackgroundOpenConnectionsPublisher();
    }

    public OpenConnectionTask submitOpenConnectionTaskOutsideLoop(String collectionRid, URI serviceEndpoint, Uri addressUri, int minConnectionsRequiredForEndpoint) {
        OpenConnectionTask openConnectionTask = new OpenConnectionTask(collectionRid, serviceEndpoint, addressUri, minConnectionsRequiredForEndpoint);
        this.submitOpenConnectionTaskOutsideLoopInternal(openConnectionTask);
        return openConnectionTask;
    }

    private void submitOpenConnectionTaskOutsideLoopInternal(OpenConnectionTask openConnectionTask) {
        String addressUriAsString = openConnectionTask.getAddressUri().getURIAsString();
        if (this.endpointProvider.isClosed() || this.isClosed.get().booleanValue()) {
            openConnectionTask.completeExceptionally(new ClosedClientTransportException(Strings.lenientFormat("%s is closed", this), null));
            return;
        }
        this.endpointsUnderMonitorMapReadLock.lock();
        try {
            this.endpointsUnderMonitorMap.compute(addressUriAsString, (key, taskList) -> {
                if (taskList == null) {
                    taskList = new ArrayList<OpenConnectionTask>();
                }
                taskList.add(openConnectionTask);
                if (taskList.size() == 1) {
                    this.submitOpenConnectionWithinLoopInternal(openConnectionTask);
                }
                return taskList;
            });
        }
        finally {
            this.endpointsUnderMonitorMapReadLock.unlock();
        }
        this.getOrCreateEndpoint(openConnectionTask);
    }

    private synchronized void submitOpenConnectionWithinLoopInternal(OpenConnectionTask openConnectionTask) {
        this.openConnectionsTaskSink.emitNext((Object)openConnectionTask, this.serializedEmitFailureHandler);
    }

    @Override
    public void close() throws IOException {
        if (this.isClosed.compareAndSet(false, true)) {
            logger.info("Shutting down ProactiveOpenConnectionsProcessor...");
            this.completeSink(this.openConnectionsTaskSink);
            this.endpointsUnderMonitorMap.forEach((addresses, taskList) -> {
                for (OpenConnectionTask openConnectionTask : taskList) {
                    openConnectionTask.completeExceptionally(new ClosedClientTransportException(Strings.lenientFormat("%s is closed", this), null));
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> containerIdentities) {
        Object object = this.containersUnderOpenConnectionAndInitCachesLock;
        synchronized (object) {
            for (CosmosContainerIdentity containerIdentity : containerIdentities) {
                this.containersUnderOpenConnectionAndInitCaches.remove(ImplementationBridgeHelpers.CosmosContainerIdentityHelper.getCosmosContainerIdentityAccessor().getContainerLink(containerIdentity));
            }
            if (this.containersUnderOpenConnectionAndInitCaches.isEmpty()) {
                this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
                this.reInstantiateOpenConnectionsPublisherAndSubscribe(true);
            } else {
                logger.debug("Cannot switch to defensive mode as some of the containers are still under openConnectionAndInitCaches flow: [{}]", this.containersUnderOpenConnectionAndInitCaches);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
        Object object = this.containersUnderOpenConnectionAndInitCachesLock;
        synchronized (object) {
            boolean shouldReInstantiatePublisher = this.containersUnderOpenConnectionAndInitCaches.size() == 0;
            for (CosmosContainerIdentity containerIdentity : cosmosContainerIdentities) {
                this.containersUnderOpenConnectionAndInitCaches.add(ImplementationBridgeHelpers.CosmosContainerIdentityHelper.getCosmosContainerIdentityAccessor().getContainerLink(containerIdentity));
            }
            if (shouldReInstantiatePublisher) {
                this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.AGGRESSIVE);
                this.reInstantiateOpenConnectionsPublisherAndSubscribe(false);
            }
        }
    }

    public void recordCollectionRidsAndUrisUnderOpenConnectionsAndInitCaches(String collectionRid, List<String> addressUrisAsString) {
        this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches.compute(collectionRid, (ignore, urisAsString) -> {
            if (urisAsString == null) {
                urisAsString = new HashSet(addressUrisAsString);
            } else {
                urisAsString.addAll(addressUrisAsString);
            }
            this.addressUrisUnderOpenConnectionsAndInitCaches.addAll(addressUrisAsString);
            return urisAsString;
        });
    }

    public boolean isAddressUriUnderOpenConnectionsFlow(String addressUriAsString) {
        return this.addressUrisUnderOpenConnectionsAndInitCaches.contains(addressUriAsString);
    }

    public boolean isCollectionRidUnderOpenConnectionsFlow(String collectionRid) {
        return this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches.containsKey(collectionRid);
    }

    private Disposable getBackgroundOpenConnectionsPublisher() {
        ConcurrencyConfiguration concurrencyConfiguration = concurrencySettings.get((Object)this.aggressivenessHint.get());
        ConcurrentHashMap<String, List<OpenConnectionTask>> mapSnapshot = new ConcurrentHashMap<String, List<OpenConnectionTask>>();
        this.endpointsUnderMonitorMapWriteLock.lock();
        try {
            this.instantiateOpenConnectionsPublisher();
            mapSnapshot.putAll(this.endpointsUnderMonitorMap);
        }
        finally {
            this.endpointsUnderMonitorMapWriteLock.unlock();
        }
        Flux initialFlux = Flux.fromIterable((Iterable)mapSnapshot.keySet().stream().map(endpoint -> (OpenConnectionTask)((List)mapSnapshot.get(endpoint)).get(0)).collect(Collectors.toList()));
        return Flux.from((Publisher)this.openConnectionsTaskSink.asFlux()).mergeWith((Publisher)initialFlux).publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC).onErrorResume(throwable -> {
            logger.warn("An error occurred with proactiveOpenConnectionsProcessor, re-initializing open connections sink", throwable);
            this.reInstantiateOpenConnectionsPublisherAndSubscribe(false);
            return Mono.empty();
        }).parallel(concurrencyConfiguration.openConnectionTaskEmissionConcurrency).runOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC).flatMap(openConnectionTask -> {
            RntbdEndpoint endpoint = this.getOrCreateEndpoint((OpenConnectionTask)openConnectionTask);
            return Flux.zip((Publisher)Mono.just((Object)openConnectionTask), this.openConnectionsHandler.openConnections(openConnectionTask.getCollectionRid(), Arrays.asList(endpoint), openConnectionTask.getMinConnectionsRequiredForEndpoint())).onErrorResume(throwable -> {
                logger.warn("An error occurred in proactiveOpenConnectionsProcessor", throwable);
                return Flux.empty();
            });
        }, true, concurrencyConfiguration.openConnectionExecutionConcurrency).flatMap(openConnectionTaskToResponse -> {
            OpenConnectionTask openConnectionTask = (OpenConnectionTask)openConnectionTaskToResponse.getT1();
            OpenConnectionResponse openConnectionResponse = (OpenConnectionResponse)openConnectionTaskToResponse.getT2();
            if (openConnectionResponse.isConnected() && openConnectionResponse.isOpenConnectionAttempted()) {
                this.submitOpenConnectionWithinLoopInternal(openConnectionTask);
                return Mono.just((Object)openConnectionResponse);
            }
            if (openConnectionResponse.isConnected() && !openConnectionResponse.isOpenConnectionAttempted()) {
                this.removeEndpointFromMonitor(openConnectionTask.getAddressUri().toString(), openConnectionResponse);
                return Mono.just((Object)openConnectionResponse);
            }
            return openConnectionTask.getRetryPolicy().shouldRetry((Exception)openConnectionResponse.getException()).flatMap(shouldRetryResult -> {
                if (shouldRetryResult.shouldRetry) {
                    return this.enqueueOpenConnectionTaskForRetry(openConnectionTask, (ShouldRetryResult)shouldRetryResult).onErrorResume(throwable -> {
                        logger.warn("An error occurred in proactiveOpenConnectionsProcessor", throwable);
                        return Mono.empty();
                    });
                }
                this.removeEndpointFromMonitor(openConnectionTask.getAddressUri().toString(), openConnectionResponse);
                return Mono.just((Object)openConnectionResponse);
            });
        }, true).subscribe();
    }

    private RntbdEndpoint getOrCreateEndpoint(OpenConnectionTask openConnectionTask) {
        RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(openConnectionTask.getServiceEndpoint(), openConnectionTask.getAddressUri(), this, openConnectionTask.getMinConnectionsRequiredForEndpoint(), this.addressSelector);
        endpoint.setMinChannelsRequired(Math.max(openConnectionTask.getMinConnectionsRequiredForEndpoint(), endpoint.getMinChannelsRequired()));
        return endpoint;
    }

    private void removeEndpointFromMonitor(String addressUriString, OpenConnectionResponse openConnectionResponse) {
        List<OpenConnectionTask> openConnectionTasks = this.endpointsUnderMonitorMap.remove(addressUriString);
        logger.debug("Open connections completed for endpoint : {}, no. of connections opened : {}", (Object)addressUriString, (Object)openConnectionResponse.getOpenConnectionCountToEndpoint());
        if (openConnectionTasks != null && !openConnectionTasks.isEmpty()) {
            for (OpenConnectionTask connectionTask : openConnectionTasks) {
                connectionTask.complete(openConnectionResponse);
            }
        }
    }

    private synchronized void reInstantiateOpenConnectionsPublisherAndSubscribe(boolean shouldForceDefensiveOpenConnections) {
        if (shouldForceDefensiveOpenConnections) {
            logger.debug("Force defensive opening of connections");
            this.forceDefensiveOpenConnections();
        }
        if (this.openConnectionBackgroundTask != null) {
            this.openConnectionBackgroundTask.dispose();
        }
        this.openConnectionBackgroundTask = this.getBackgroundOpenConnectionsPublisher();
    }

    private Mono<OpenConnectionResponse> enqueueOpenConnectionTaskForRetry(OpenConnectionTask openConnectionTask, ShouldRetryResult retryResult) {
        if (retryResult.backOffTime == Duration.ZERO || retryResult.backOffTime == null) {
            this.submitOpenConnectionWithinLoopInternal(openConnectionTask);
            return Mono.empty();
        }
        return Mono.delay((Duration)retryResult.backOffTime).flatMap(ignore -> {
            this.submitOpenConnectionWithinLoopInternal(openConnectionTask);
            return Mono.empty();
        });
    }

    private void instantiateOpenConnectionsPublisher() {
        logger.debug("Re-instantiate open connections task sink");
        this.openConnectionsTaskSink = Sinks.many().multicast().onBackpressureBuffer(100000);
    }

    private void forceDefensiveOpenConnections() {
        if (this.aggressivenessHint.get() == ConnectionOpenFlowAggressivenessHint.AGGRESSIVE) {
            this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
        }
    }

    private void completeSink(Sinks.Many<OpenConnectionTask> sink) {
        Sinks.EmitResult completeEmitResult = sink.tryEmitComplete();
        if (completeEmitResult == Sinks.EmitResult.OK) {
            logger.debug("Sink completed.");
        } else if (completeEmitResult == Sinks.EmitResult.FAIL_CANCELLED || completeEmitResult == Sinks.EmitResult.FAIL_TERMINATED) {
            logger.debug("Sink already completed, EmitResult: {}", (Object)completeEmitResult);
        } else {
            logger.warn("Sink completion failed, EmitResult: {}", (Object)completeEmitResult);
        }
    }

    private static enum ConnectionOpenFlowAggressivenessHint {
        AGGRESSIVE,
        DEFENSIVE;

    }

    private static class SerializedEmitFailureHandler
    implements Sinks.EmitFailureHandler {
        private SerializedEmitFailureHandler() {
        }

        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            if (emitResult.equals((Object)Sinks.EmitResult.FAIL_NON_SERIALIZED) || emitResult.equals((Object)Sinks.EmitResult.FAIL_OVERFLOW)) {
                logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
                return true;
            }
            logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
            return false;
        }
    }

    private static class ConcurrencyConfiguration {
        final int openConnectionTaskEmissionConcurrency;
        final int openConnectionExecutionConcurrency;

        public ConcurrencyConfiguration(int openConnectionTaskEmissionConcurrency, int openConnectionExecutionConcurrency) {
            this.openConnectionTaskEmissionConcurrency = openConnectionTaskEmissionConcurrency;
            this.openConnectionExecutionConcurrency = openConnectionExecutionConcurrency;
        }
    }
}

