/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpManagementNode;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpChannelProcessor;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpExceptionHandler;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.ClaimsBasedSecurityChannel;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.ManagementChannel;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.ReactorExecutor;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ReactorConnection
implements AmqpConnection {
    // Session name, node address and link name used when creating the CBS channel.
    private static final String CBS_SESSION_NAME = "cbs-session";
    private static final String CBS_ADDRESS = "$cbs";
    private static final String CBS_LINK_NAME = "cbs";
    // Suffixes appended to an entity path to build the management session name, address and link name.
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_ADDRESS = "$management";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private final ClientLogger logger;
    // Sessions created on this connection, keyed by session name.
    private final ConcurrentMap<String, SessionSubscription> sessionMap = new ConcurrentHashMap<String, SessionSubscription>();
    // Management nodes created on this connection, keyed by entity path.
    private final ConcurrentHashMap<String, AmqpManagementNode> managementNodes = new ConcurrentHashMap();
    // Set to true by whichever code path first starts closing this connection.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    // Receives the shutdown signal passed to closeAsync(AmqpShutdownSignal).
    private final Sinks.One<AmqpShutdownSignal> shutdownSignalSink = Sinks.one();
    private final Flux<AmqpEndpointState> endpointStates;
    // Completed by closeConnectionWork() once the close work has finished.
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final String connectionId;
    // Creates the connection on subscription and emits it after an ACTIVE endpoint state is seen.
    private final Mono<Connection> connectionMono;
    private final ConnectionHandler handler;
    private final ReactorHandlerProvider handlerProvider;
    private final AmqpLinkProvider linkProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final ConnectionOptions connectionOptions;
    private final ReactorProvider reactorProvider;
    private final AmqpRetryPolicy retryPolicy;
    private final SenderSettleMode senderSettleMode;
    private final ReceiverSettleMode receiverSettleMode;
    // Try-timeout taken from the connection's retry options.
    private final Duration operationTimeout;
    private final Disposable.Composite subscriptions;
    // Assigned in getOrCreateConnection(); null until the connection has been created.
    private ReactorExecutor reactorExecutor;
    // Both CBS fields are assigned together in getOrCreateCBSNode().
    private volatile ClaimsBasedSecurityChannel cbsChannel;
    private volatile AmqpChannelProcessor<RequestResponseChannel> cbsChannelProcessor;
    // Assigned in getOrCreateConnection(); null until the connection has been created.
    private volatile Connection connection;
    private final boolean isV2;

    /**
     * Creates a connection wrapper. No connection is opened here; the underlying connection is
     * created when {@code connectionMono} is first subscribed to.
     *
     * @param connectionId Identifier for this connection, also used as logging context.
     * @param connectionOptions Options for the connection; its retry options supply the retry policy and
     *     the operation timeout.
     * @param reactorProvider Provider for the reactor, its executor and its dispatcher.
     * @param handlerProvider Provider used to create the connection handler and session handlers.
     * @param linkProvider Provider passed on to sessions created by this connection.
     * @param tokenManagerProvider Provider passed on to sessions created by this connection.
     * @param messageSerializer Serializer passed on to sessions and request/response channels.
     * @param senderSettleMode Settle mode passed to request/response channels.
     * @param receiverSettleMode Settle mode passed to request/response channels.
     * @param isV2 Flag passed to request/response channels; when false, an activation timeout is also
     *     reported to the connection handler.
     * @throws NullPointerException if {@code tokenManagerProvider} is null.
     */
    public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, boolean isV2) {
        this.connectionOptions = connectionOptions;
        this.reactorProvider = reactorProvider;
        this.connectionId = connectionId;
        this.logger = new ClientLogger(ReactorConnection.class, AmqpLoggingUtils.createContextWithConnectionId(connectionId));
        this.handlerProvider = handlerProvider;
        this.linkProvider = linkProvider;
        this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.messageSerializer = messageSerializer;
        this.handler = handlerProvider.createConnectionHandler(connectionId, connectionOptions);
        this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry());
        this.operationTimeout = connectionOptions.getRetry().getTryTimeout();
        this.senderSettleMode = senderSettleMode;
        this.receiverSettleMode = receiverSettleMode;
        this.isV2 = isV2;
        // Creates (or reuses) the connection, then waits for the first ACTIVE endpoint state,
        // failing with a TIMEOUT_ERROR AmqpException if none arrives within operationTimeout.
        this.connectionMono = Mono.fromCallable(this::getOrCreateConnection).flatMap(reactorConnection -> {
            Mono activeEndpoint = this.getEndpointStates().filter(state -> state == AmqpEndpointState.ACTIVE).next().timeout(this.operationTimeout, Mono.error(() -> {
                AmqpException exception = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, String.format("Connection '%s' not active within the timout: %s.", connectionId, this.operationTimeout), this.handler.getErrorContext());
                // Only the non-V2 path reports the timeout to the connection handler.
                if (!isV2) {
                    this.handler.onError((Throwable)((Object)exception));
                }
                return exception;
            }));
            return activeEndpoint.thenReturn(reactorConnection);
        }).doOnError(error -> {
            // The first code path to flip isDisposed is the one that starts the close.
            if (this.isDisposed.getAndSet(true)) {
                this.logger.verbose("Connection was already disposed: Error occurred while connection was starting.", new Object[]{error});
            } else {
                this.closeAsync(new AmqpShutdownSignal(false, false, "Error occurred while connection was starting. Error: " + error)).subscribe();
            }
        });
        // Handler endpoint states, cut off once a shutdown signal is emitted. An error or completion
        // from the handler closes this connection unless a close has already been started.
        this.endpointStates = this.handler.getEndpointStates().takeUntilOther((Publisher)this.shutdownSignalSink.asMono()).map(state -> {
            this.logger.atVerbose().addKeyValue("state", state).log("getConnectionState");
            return AmqpEndpointStateUtil.getConnectionState(state);
        }).onErrorResume(error -> {
            if (!this.isDisposed.getAndSet(true)) {
                this.logger.verbose("Disposing of active sessions due to error.");
                // Wait for the close to finish, then re-emit the original error downstream.
                return this.closeAsync(new AmqpShutdownSignal(false, false, error.getMessage())).then(Mono.error((Throwable)error));
            }
            return Mono.error((Throwable)error);
        }).doOnComplete(() -> {
            if (!this.isDisposed.getAndSet(true)) {
                this.logger.verbose("Disposing of active sessions due to connection close.");
                this.closeAsync(new AmqpShutdownSignal(false, false, "Connection handler closed.")).subscribe();
            }
        }).cache(1);
        // Subscribe immediately; the subscription is kept so it can be disposed when the connection closes.
        this.subscriptions = Disposables.composite((Disposable[])new Disposable[]{this.endpointStates.subscribe()});
    }

    /**
     * Subscribes to the underlying connection and emits this instance once the connection has been
     * obtained, provided this connection has not been disposed in the meantime.
     *
     * @return A Mono that emits this connection, or errors with a transient {@link AmqpException}
     *     when the connection was disposed before it could be handed out.
     */
    public Mono<ReactorConnection> connectAndAwaitToActive() {
        return this.connectionMono.handle((ignored, sink) -> {
            if (!this.isDisposed()) {
                sink.complete();
                return;
            }
            String message = String.format("Connection '%s' completed without being active.", this.connectionId);
            sink.error(new AmqpException(true, message, null));
        }).thenReturn(this);
    }

    /**
     * Returns the endpoint-state stream that was assembled in the constructor.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStates;
    }

    /**
     * Returns the shutdown signal of this connection as a Flux backed by a cached view of the
     * shutdown sink.
     */
    @Override
    public Flux<AmqpShutdownSignal> getShutdownSignals() {
        Mono<AmqpShutdownSignal> cachedSignal = this.shutdownSignalSink.asMono().cache();
        return cachedSignal.flux();
    }

    /**
     * Gets the management node for {@code entityPath}, creating and caching one if none exists.
     *
     * @param entityPath Entity path the management node is keyed by.
     * @return A Mono that emits the management node, or errors with an {@link IllegalStateException}
     *     (passed through {@code Exceptions.propagate}) when this connection is disposed.
     */
    @Override
    public Mono<AmqpManagementNode> getManagementNode(String entityPath) {
        return Mono.defer(() -> {
            if (this.isDisposed()) {
                return FluxUtil.monoError((LoggingEventBuilder)this.logger.atError().addKeyValue("entityPath", entityPath), (RuntimeException)Exceptions.propagate((Throwable)new IllegalStateException("Connection is disposed. Cannot get management instance.")));
            }
            // Fast path: a node was already created for this entity path.
            AmqpManagementNode existing = this.managementNodes.get(entityPath);
            if (existing != null) {
                return Mono.just((Object)existing);
            }
            // A new AzureTokenManagerProvider is built here; the tokenManagerProvider field is not used.
            TokenManager tokenManager = new AzureTokenManagerProvider(this.connectionOptions.getAuthorizationType(), this.connectionOptions.getFullyQualifiedNamespace(), this.connectionOptions.getAuthorizationScope()).getTokenManager(this.getClaimsBasedSecurityNode(), entityPath);
            // NOTE(review): compute(...) is an argument to thenReturn, so it is evaluated when this
            // lambda runs, i.e. before authorize() has completed. Confirm this ordering is intended.
            return tokenManager.authorize().thenReturn((Object)this.managementNodes.compute(entityPath, (key, current) -> {
                // Another caller registered a node first: release the token manager just created and reuse it.
                if (current != null) {
                    this.logger.info("A management node exists already, returning it.");
                    tokenManager.close();
                    return current;
                }
                String sessionName = entityPath + "-" + MANAGEMENT_SESSION_NAME;
                String linkName = entityPath + "-" + MANAGEMENT_LINK_NAME;
                String address = entityPath + "/" + MANAGEMENT_ADDRESS;
                this.logger.atInfo().addKeyValue("entityPath", entityPath).addKeyValue("linkName", linkName).addKeyValue("address", address).log("Creating management node.");
                AmqpChannelProcessor<RequestResponseChannel> requestResponseChannel = this.createRequestResponseChannel(sessionName, linkName, address);
                return new ManagementChannel(requestResponseChannel, this.getFullyQualifiedNamespace(), entityPath, tokenManager);
            }));
        });
    }

    /**
     * Waits for the connection Mono to complete and then emits the CBS node, creating it on first use.
     */
    @Override
    public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
        Mono<ClaimsBasedSecurityNode> cbsNode = Mono.fromCallable(this::getOrCreateCBSNode);
        return this.connectionMono.then(cbsNode);
    }

    /**
     * Returns the identifier this connection was constructed with.
     */
    @Override
    public String getId() {
        return connectionId;
    }

    /**
     * Returns the host name reported by the connection handler.
     */
    @Override
    public String getFullyQualifiedNamespace() {
        return handler.getHostname();
    }

    /**
     * Returns the maximum frame size reported by the connection handler.
     */
    @Override
    public int getMaxFrameSize() {
        return handler.getMaxFrameSize();
    }

    /**
     * Returns the connection properties reported by the connection handler.
     */
    @Override
    public Map<String, Object> getConnectionProperties() {
        return handler.getConnectionProperties();
    }

    /**
     * Gets the session registered under {@code sessionName}, creating and registering one if absent,
     * and emits it once the session reports an ACTIVE endpoint state.
     *
     * <p>A newly created session is watched: when its endpoint states error or complete while this
     * connection is not disposed, the session is removed and disposed.</p>
     *
     * <p>If the session does not become active within the retry policy's try-timeout, the returned
     * Mono errors with a TIMEOUT_ERROR {@link AmqpException}, and the session this caller was waiting
     * on is unregistered and disposed.</p>
     *
     * @param sessionName Name of the session.
     * @return A Mono that emits the active session.
     */
    @Override
    public Mono<AmqpSession> createSession(String sessionName) {
        return this.connectionMono.map(connection -> this.sessionMap.computeIfAbsent(sessionName, key -> {
            SessionHandler sessionHandler = this.handlerProvider.createSessionHandler(this.connectionId, this.getFullyQualifiedNamespace(), key, this.connectionOptions.getRetry().getTryTimeout());
            Session session = connection.session();
            BaseHandler.setHandler((Extendable)session, (Handler)sessionHandler);
            AmqpSession amqpSession = this.createSession(key, session, sessionHandler);
            Disposable subscription = amqpSession.getEndpointStates().subscribe(state -> {}, error -> {
                // While the connection itself is closing, session cleanup is left to the close path.
                if (this.isDisposed.get()) {
                    return;
                }
                this.logger.atInfo().addKeyValue("sessionName", sessionName).log("Error occurred. Removing and disposing session", new Object[]{error});
                this.removeSession(key);
            }, () -> {
                if (this.isDisposed.get()) {
                    return;
                }
                this.logger.atVerbose().addKeyValue("sessionName", sessionName).log("Complete. Removing and disposing session.");
                this.removeSession(key);
            });
            return new SessionSubscription(amqpSession, subscription);
        })).flatMap(sessionSubscription -> {
            Mono<AmqpEndpointState> activeSession = sessionSubscription.getSession().getEndpointStates().filter(state -> state == AmqpEndpointState.ACTIVE).next().timeout(this.retryPolicy.getRetryOptions().getTryTimeout(), Mono.error(() -> new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, String.format("connectionId[%s] sessionName[%s] Timeout waiting for session to be active.", this.connectionId, sessionName), this.handler.getErrorContext()))).doOnError(error -> {
                if (!(error instanceof AmqpException)) {
                    return;
                }
                AmqpException amqpException = (AmqpException)error;
                if (amqpException.getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                    // The entry may already have been removed (another waiter timed out, or the
                    // endpoint-state subscriber removed it) or replaced by a newer session with the
                    // same name. Remove only the mapping to the session we waited on, so a missing
                    // entry cannot cause a NullPointerException and a newer session is never evicted.
                    this.sessionMap.remove(sessionName, sessionSubscription);
                    // dispose() is guarded by an AtomicBoolean, so repeating it is harmless.
                    sessionSubscription.dispose();
                }
            });
            return activeSession.thenReturn(sessionSubscription.getSession());
        });
    }

    /**
     * Builds the {@link AmqpSession} wrapper for a proton-j session. The CBS node Mono is obtained
     * from {@link #getClaimsBasedSecurityNode()} at construction time.
     *
     * @param sessionName Name of the session.
     * @param session Underlying proton-j session.
     * @param handler Handler attached to the session.
     * @return A new {@link ReactorSession}.
     */
    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new ReactorSession(
            this,
            session,
            handler,
            sessionName,
            reactorProvider,
            handlerProvider,
            linkProvider,
            getClaimsBasedSecurityNode(),
            tokenManagerProvider,
            messageSerializer,
            connectionOptions.getRetry());
    }

    /**
     * Unregisters the session with the given name and disposes it.
     *
     * @param sessionName Name of the session to remove; {@code null} is treated as "not found".
     * @return {@code true} if a session was registered under that name and has been removed.
     */
    @Override
    public boolean removeSession(String sessionName) {
        if (sessionName == null) {
            return false;
        }
        SessionSubscription existing = this.sessionMap.remove(sessionName);
        if (existing == null) {
            return false;
        }
        existing.dispose();
        return true;
    }

    /**
     * Returns whether a close of this connection has been started.
     */
    public boolean isDisposed() {
        return isDisposed.get();
    }

    /**
     * Closes this connection, blocking the caller for at most twice the operation timeout.
     */
    public void dispose() {
        final Duration doubledTimeout = this.operationTimeout.plus(this.operationTimeout);
        final Mono<Void> closing = this.closeAsync();
        closing.block(doubledTimeout);
    }

    /**
     * Returns the Mono that creates the underlying connection and emits it once it is active.
     */
    protected Mono<Connection> getReactorConnection() {
        return connectionMono;
    }

    /**
     * Creates a channel processor that is fed request/response channels built on the named session.
     * The source re-subscribes (producing another channel) for as long as this connection is not
     * disposed.
     *
     * @param sessionName Name of the session that hosts the channel.
     * @param linkName Link name passed to each channel.
     * @param entityPath Entity path (address) of the channel.
     * @return The processor, already subscribed to the channel source.
     * @throws NullPointerException if {@code entityPath} is null.
     */
    protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChannel(String sessionName, String linkName, String entityPath) {
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");

        Mono<RequestResponseChannel> singleChannel = this.createSession(sessionName)
            .cast(ReactorSession.class)
            .map(reactorSession -> new RequestResponseChannel(this, this.getId(), this.getFullyQualifiedNamespace(), linkName, entityPath, reactorSession.session(), this.connectionOptions.getRetry(), this.handlerProvider, this.reactorProvider, this.messageSerializer, this.senderSettleMode, this.receiverSettleMode, this.handlerProvider.getMetricProvider(this.getFullyQualifiedNamespace(), entityPath), this.isV2));

        Flux<RequestResponseChannel> channelSource = singleChannel
            .doOnNext(e -> this.logger.atInfo().addKeyValue("entityPath", entityPath).addKeyValue("linkName", linkName).log("Emitting new response channel."))
            .repeat(() -> !this.isDisposed());

        Map<String, Object> loggingContext = AmqpLoggingUtils.createContextWithConnectionId(this.connectionId);
        loggingContext.put("entityPath", entityPath);

        AmqpChannelProcessor<RequestResponseChannel> processor = new AmqpChannelProcessor<RequestResponseChannel>(this.getFullyQualifiedNamespace(), channel -> channel.getEndpointStates(), this.retryPolicy, loggingContext);
        return channelSource.subscribeWith(processor);
    }

    /**
     * Creates a single request/response channel on the named session. Asserts that this connection
     * was constructed with {@code isV2} set.
     *
     * @param sessionName Name of the session that hosts the channel.
     * @param linksNamePrefix Link name prefix passed to the channel.
     * @param entityPath Entity path (address) of the channel.
     * @return A Mono that emits the new channel.
     * @throws NullPointerException if {@code entityPath} is null.
     */
    Mono<RequestResponseChannel> newRequestResponseChannel(String sessionName, String linksNamePrefix, String entityPath) {
        assert (this.isV2);
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");

        Mono<ReactorSession> hostSession = this.createSession(sessionName).cast(ReactorSession.class);
        return hostSession.map(reactorSession -> new RequestResponseChannel(this, this.getId(), this.getFullyQualifiedNamespace(), linksNamePrefix, entityPath, reactorSession.session(), this.connectionOptions.getRetry(), this.handlerProvider, this.reactorProvider, this.messageSerializer, this.senderSettleMode, this.receiverSettleMode, this.handlerProvider.getMetricProvider(this.getFullyQualifiedNamespace(), entityPath), this.isV2));
    }

    /**
     * Starts closing this connection on behalf of the client. If a close was already started, no new
     * close is triggered and the returned Mono simply completes when that close finishes.
     *
     * @return A Mono that completes when the connection is closed.
     */
    @Override
    public Mono<Void> closeAsync() {
        boolean alreadyDisposed = this.isDisposed.getAndSet(true);
        if (!alreadyDisposed) {
            return this.closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client."));
        }
        this.logger.verbose("Connection was already closed. Not disposing again.");
        return this.isClosedMono.asMono();
    }

    /**
     * Closes this connection for the given reason. This method does not touch {@code isDisposed}
     * itself; every caller in this class sets that flag before calling it.
     *
     * <p>The shutdown signal is emitted immediately. The returned Mono, once subscribed, closes the
     * CBS channel and the management nodes, then schedules {@code closeConnectionWork()}, and finally
     * waits for {@code isClosedMono} to complete.</p>
     *
     * @param shutdownSignal Reason for the shutdown; emitted to shutdown-signal subscribers.
     * @return A Mono that completes when the connection is closed.
     */
    public Mono<Void> closeAsync(AmqpShutdownSignal shutdownSignal) {
        AmqpLoggingUtils.addShutdownSignal(this.logger.atInfo(), shutdownSignal).log("Disposing of ReactorConnection.");
        Sinks.EmitResult result = this.shutdownSignalSink.tryEmitValue((Object)shutdownSignal);
        // A failed emit is only logged; the close continues regardless.
        if (result.isFailure()) {
            AmqpLoggingUtils.addShutdownSignal(this.logger.atInfo(), shutdownSignal).addKeyValue("emitResult", (Object)result).log("Unable to emit shutdown signal.");
        }
        // The CBS processor only exists if a CBS node was ever requested.
        Mono cbsCloseOperation = this.cbsChannelProcessor != null ? this.cbsChannelProcessor.flatMap(channel -> channel.closeAsync()) : Mono.empty();
        Mono managementNodeCloseOperations = Mono.when((Publisher[])new Publisher[]{Flux.fromStream(this.managementNodes.values().stream()).flatMap(node -> node.closeAsync())});
        Mono closeReactor = Mono.fromRunnable(() -> {
            this.logger.verbose("Scheduling closeConnection work.");
            ReactorDispatcher dispatcher = this.reactorProvider.getReactorDispatcher();
            if (dispatcher != null) {
                try {
                    dispatcher.invoke(() -> this.closeConnectionWork());
                }
                // If the work cannot be handed to the dispatcher, run it on the calling thread instead.
                catch (IOException e) {
                    this.logger.warning("IOException while scheduling closeConnection work. Manually disposing.", new Object[]{e});
                    this.closeConnectionWork();
                }
                catch (RejectedExecutionException e) {
                    this.logger.info("Could not schedule closeConnection work. Manually disposing.");
                    this.closeConnectionWork();
                }
            } else {
                this.closeConnectionWork();
            }
        });
        // whenDelayError lets both node-close operations finish before any of their errors is surfaced.
        return Mono.whenDelayError((Publisher[])new Publisher[]{cbsCloseOperation.doFinally(signalType -> this.logger.atVerbose().addKeyValue("signalType", signalType).log("Closed CBS node.")), managementNodeCloseOperations.doFinally(signalType -> this.logger.atVerbose().addKeyValue("signalType", signalType).log("Closed management nodes."))}).then(closeReactor.doFinally(signalType -> this.logger.atVerbose().addKeyValue("signalType", signalType).log("Closed reactor dispatcher."))).then(this.isClosedMono.asMono());
    }

    /**
     * Closes the underlying connection and its handler, waits for the registered sessions to report
     * closed, closes the executor, and finally completes {@code isClosedMono}.
     */
    private synchronized void closeConnectionWork() {
        // No connection was ever created: there is nothing to close, so just signal completion.
        if (this.connection == null) {
            this.isClosedMono.emitEmpty((signalType, emitResult) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atInfo(), signalType, emitResult).log("Unable to complete closeMono.");
                return false;
            });
            return;
        }
        this.connection.close();
        this.handler.close();
        // Collect the "is closed" Mono of every session currently registered.
        ArrayList closingSessions = new ArrayList();
        this.sessionMap.values().forEach(link -> closingSessions.add(((SessionSubscription)link).isClosed()));
        Mono closedExecutor = this.reactorExecutor != null ? Mono.defer(() -> {
            ReactorConnection reactorConnection = this;
            synchronized (reactorConnection) {
                this.logger.info("Closing executor.");
                return this.reactorExecutor.closeAsync();
            }
        }) : Mono.empty();
        // Wait for the sessions for at most operationTimeout. Any error from that wait (not only a
        // timeout) is logged with the message below and ignored, so the executor is still closed.
        Mono closeSessionAndExecutorMono = Mono.when(closingSessions).timeout(this.operationTimeout).onErrorResume(error -> {
            this.logger.info("Timed out waiting for all sessions to close.");
            return Mono.empty();
        }).then(closedExecutor).then(Mono.fromRunnable(() -> {
            this.isClosedMono.emitEmpty((signalType, result) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, result).log("Unable to emit connection closed signal.");
                return false;
            });
            this.subscriptions.dispose();
        }));
        this.subscriptions.add(closeSessionAndExecutorMono.subscribe());
    }

    /**
     * Returns the CBS channel, creating it together with its channel processor on the first call.
     */
    private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
        if (this.cbsChannel != null) {
            return this.cbsChannel;
        }

        this.logger.info("Setting CBS channel.");
        this.cbsChannelProcessor = this.createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, CBS_ADDRESS);
        this.cbsChannel = new ClaimsBasedSecurityChannel(
            this.cbsChannelProcessor,
            this.connectionOptions.getTokenCredential(),
            this.connectionOptions.getAuthorizationType(),
            this.connectionOptions.getRetry());
        return this.cbsChannel;
    }

    /**
     * Returns the underlying connection. On the first call it creates the reactor, the connection
     * and the executor, hooks the dispatcher's shutdown signal up to the exception handler, and
     * starts the executor.
     *
     * @return The underlying connection.
     * @throws IOException declared by the reactor/connection creation calls.
     */
    private synchronized Connection getOrCreateConnection() throws IOException {
        if (this.connection != null) {
            return this.connection;
        }

        this.logger.atInfo()
            .addKeyValue("hostName", this.handler.getHostname())
            .addKeyValue("port", (long)this.handler.getProtocolPort())
            .log("Creating and starting connection.");

        Reactor reactor = this.reactorProvider.createReactor(this.connectionId, this.handler.getMaxFrameSize());
        this.connection = reactor.connectionToHost(this.handler.getHostname(), this.handler.getProtocolPort(), (Handler)this.handler);

        ReactorExceptionHandler exceptionHandler = new ReactorExceptionHandler();
        this.reactorExecutor = this.reactorProvider.createExecutor(reactor, this.connectionId, this.connectionOptions.getFullyQualifiedNamespace(), exceptionHandler, this.connectionOptions.getRetry());

        // Deferred so the executor is only closed when the dispatcher actually shuts down or fails.
        Mono executorCloseMono = Mono.defer(() -> {
            synchronized (this) {
                return this.reactorExecutor.closeAsync();
            }
        });

        this.reactorProvider.getReactorDispatcher().getShutdownSignal().flatMap(signal -> {
            exceptionHandler.onConnectionShutdown((AmqpShutdownSignal)signal);
            return executorCloseMono;
        }).onErrorResume(error -> {
            exceptionHandler.onConnectionError((Throwable)error);
            return executorCloseMono;
        }).subscribe();

        this.reactorExecutor.start();
        return this.connection;
    }

    /**
     * Pairs a session with the subscription to its endpoint states so both can be released together.
     */
    private static final class SessionSubscription {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final AmqpSession session;
        private final Disposable subscription;

        private SessionSubscription(AmqpSession session, Disposable subscription) {
            this.session = session;
            this.subscription = subscription;
        }

        /** Returns the wrapped session. */
        private AmqpSession getSession() {
            return session;
        }

        /**
         * Closes the session and cancels the endpoint-state subscription. Only the first call has
         * any effect.
         */
        private void dispose() {
            if (!isDisposed.compareAndSet(false, true)) {
                return;
            }

            if (!(session instanceof ReactorSession)) {
                session.dispose();
            } else {
                ReactorSession reactorSession = (ReactorSession) session;
                reactorSession.closeAsync("Closing session.", null, true).subscribe();
            }

            subscription.dispose();
        }

        /**
         * Returns the session's "is closed" Mono when it is a {@link ReactorSession}; otherwise an
         * empty Mono.
         */
        private Mono<Void> isClosed() {
            if (!(session instanceof ReactorSession)) {
                return Mono.empty();
            }
            return ((ReactorSession) session).isClosed();
        }
    }

    /**
     * Exception handler handed to the reactor executor. Both callbacks log the event and, unless a
     * close has already been started, close the enclosing connection.
     */
    public final class ReactorExceptionHandler
    extends AmqpExceptionHandler {
        private ReactorExceptionHandler() {
        }

        /**
         * Logs the error and closes the enclosing connection if no close has been started yet.
         *
         * @param exception Error reported for the connection.
         */
        @Override
        public void onConnectionError(Throwable exception) {
            final String namespace = ReactorConnection.this.getFullyQualifiedNamespace();
            ReactorConnection.this.logger.atInfo().addKeyValue("namespace", namespace).log("onConnectionError, Starting new reactor", new Object[]{exception});

            if (ReactorConnection.this.isDisposed.getAndSet(true)) {
                return;
            }

            ReactorConnection.this.logger.atVerbose().addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onReactorError: Disposing.");
            AmqpShutdownSignal signal = new AmqpShutdownSignal(false, false, "onReactorError: " + exception.toString());
            ReactorConnection.this.closeAsync(signal).subscribe();
        }

        /**
         * Logs the shutdown signal and closes the enclosing connection with it if no close has been
         * started yet.
         *
         * @param shutdownSignal Signal describing why the connection is shutting down.
         */
        @Override
        void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
            final String namespace = ReactorConnection.this.getFullyQualifiedNamespace();
            AmqpLoggingUtils.addShutdownSignal(ReactorConnection.this.logger.atInfo(), shutdownSignal).addKeyValue("namespace", namespace).log("onConnectionShutdown. Shutting down.");

            if (ReactorConnection.this.isDisposed.getAndSet(true)) {
                return;
            }

            ReactorConnection.this.logger.atVerbose().addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onConnectionShutdown: disposing.");
            ReactorConnection.this.closeAsync(shutdownSignal).subscribe();
        }
    }
}

