/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.streaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.streaming.CometDReplayExtension;
import org.apache.camel.support.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionHelper
extends ServiceSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    private static final int CONNECT_TIMEOUT = 110;
    private static final int CHANNEL_TIMEOUT = 40;
    private static final String EXCEPTION_FIELD = "exception";
    private final SalesforceComponent component;
    private final SalesforceSession session;
    private final BayeuxClient client;
    private final long timeout = 60000L;
    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
    private ClientSessionChannel.MessageListener handshakeListener;
    private ClientSessionChannel.MessageListener connectListener;
    private String handshakeError;
    private Exception handshakeException;
    private String connectError;
    private boolean reconnecting;

    public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception {
        this.component = component;
        this.session = component.getSession();
        this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
        this.client = this.createClient(topicName);
    }

    protected void doStart() throws Exception {
        if (this.handshakeListener == null) {
            this.handshakeListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        Exception exception;
                        String error = (String)message.get((Object)"error");
                        if (error != null) {
                            SubscriptionHelper.this.handshakeError = error;
                        }
                        if ((exception = (Exception)message.get((Object)SubscriptionHelper.EXCEPTION_FIELD)) != null) {
                            SubscriptionHelper.this.handshakeException = exception;
                        }
                    } else if (!SubscriptionHelper.this.listenerMap.isEmpty()) {
                        SubscriptionHelper.this.reconnecting = true;
                    }
                }
            };
        }
        this.client.getChannel("/meta/handshake").addListener((ClientSessionChannel.ClientSessionChannelListener)this.handshakeListener);
        if (this.connectListener == null) {
            this.connectListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_CONNECT]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        String error = (String)message.get((Object)"error");
                        if (error != null) {
                            SubscriptionHelper.this.connectError = error;
                        }
                    } else if (SubscriptionHelper.this.reconnecting) {
                        SubscriptionHelper.this.reconnecting = false;
                        LOG.debug("Refreshing subscriptions to {} channels on reconnect", (Object)SubscriptionHelper.this.listenerMap.size());
                        HashMap map = new HashMap();
                        map.putAll(SubscriptionHelper.this.listenerMap);
                        SubscriptionHelper.this.listenerMap.clear();
                        for (Map.Entry entry : map.entrySet()) {
                            SalesforceConsumer consumer = (SalesforceConsumer)((Object)entry.getKey());
                            String topicName = consumer.getTopicName();
                            try {
                                SubscriptionHelper.this.subscribe(topicName, consumer);
                            }
                            catch (CamelException e) {
                                consumer.handleException(String.format("Error refreshing subscription to topic [%s]: %s", topicName, e.getMessage()), e);
                            }
                        }
                    }
                }
            };
        }
        this.client.getChannel("/meta/connect").addListener((ClientSessionChannel.ClientSessionChannelListener)this.connectListener);
        this.client.handshake();
        long waitMs = TimeUnit.MILLISECONDS.convert(110L, TimeUnit.SECONDS);
        if (!this.client.waitFor(waitMs, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
            if (this.handshakeException != null) {
                throw new CamelException(String.format("Exception during HANDSHAKE: %s", this.handshakeException.getMessage()), (Throwable)this.handshakeException);
            }
            if (this.handshakeError != null) {
                throw new CamelException(String.format("Error during HANDSHAKE: %s", this.handshakeError));
            }
            if (this.connectError != null) {
                throw new CamelException(String.format("Error during CONNECT: %s", this.connectError));
            }
            throw new CamelException(String.format("Handshake request timeout after %s seconds", 110));
        }
    }

    protected void doStop() throws Exception {
        this.client.getChannel("/meta/connect").removeListener((ClientSessionChannel.ClientSessionChannelListener)this.connectListener);
        this.client.getChannel("/meta/handshake").removeListener((ClientSessionChannel.ClientSessionChannelListener)this.handshakeListener);
        boolean disconnected = this.client.disconnect(60000L);
        if (!disconnected) {
            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", (Object)this.getEndpointUrl(), (Object)60000L);
        }
    }

    private BayeuxClient createClient(String topicName) throws Exception {
        SalesforceHttpClient httpClient = this.component.getConfig().getHttpClient();
        HashMap<String, Long> options = new HashMap<String, Long>();
        options.put("maxNetworkDelay", httpClient.getTimeout());
        if (this.session.getAccessToken() == null) {
            this.session.login(null);
        }
        LongPollingTransport transport = new LongPollingTransport(options, httpClient){

            protected void customize(Request request) {
                super.customize(request);
                request.header(HttpHeader.AUTHORIZATION, "OAuth " + SubscriptionHelper.this.session.getAccessToken());
            }
        };
        BayeuxClient client = new BayeuxClient(this.getEndpointUrl(), (ClientTransport)transport, new ClientTransport[0]);
        Integer replayId = null;
        String channelName = this.getChannelName(topicName);
        Map<String, Integer> replayIdMap = this.component.getConfig().getInitialReplayIdMap();
        if (replayIdMap != null) {
            replayId = replayIdMap.get(channelName);
        }
        if (replayId == null) {
            replayId = this.component.getConfig().getDefaultReplayId();
        }
        if (replayId != null) {
            LOG.info("Sending replayId={} for channel {}", (Object)replayId, (Object)channelName);
            List extensions = client.getExtensions();
            ClientSession.Extension ext = null;
            Iterator iter = extensions.iterator();
            while (iter.hasNext()) {
                if (ext instanceof CometDReplayExtension) {
                    iter.remove();
                }
                ext = (ClientSession.Extension)iter.next();
            }
            HashMap<String, Integer> dataMap = new HashMap<String, Integer>();
            dataMap.put(channelName, replayId);
            CometDReplayExtension extension = new CometDReplayExtension(dataMap);
            client.addExtension(extension);
        }
        return client;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String topicName, final SalesforceConsumer consumer) throws CamelException {
        final String channelName = this.getChannelName(topicName);
        LOG.info("Subscribing to channel {}...", (Object)channelName);
        ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener(){

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("Received Message: {}", (Object)message);
                consumer.processMessage(channel, message);
            }
        };
        ClientSessionChannel clientChannel = this.client.getChannel(channelName);
        final CountDownLatch latch = new CountDownLatch(1);
        final String[] subscribeError = new String[]{null};
        ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener(){

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", (Object)message);
                String subscribedChannelName = message.get((Object)"subscription").toString();
                if (channelName.equals(subscribedChannelName)) {
                    if (!message.isSuccessful()) {
                        String error = (String)message.get((Object)"error");
                        if (error != null) {
                            subscribeError[0] = error;
                        }
                    } else {
                        LOG.info("Subscribed to channel {}", (Object)subscribedChannelName);
                    }
                    latch.countDown();
                }
            }
        };
        this.client.getChannel("/meta/subscribe").addListener((ClientSessionChannel.ClientSessionChannelListener)subscriptionListener);
        try {
            clientChannel.subscribe(listener);
            try {
                if (!latch.await(40L, TimeUnit.SECONDS)) {
                    String message = subscribeError[0] != null ? String.format("Error subscribing to topic %s: %s", topicName, subscribeError[0]) : String.format("Timeout error subscribing to topic %s after %s seconds", topicName, 40);
                    throw new CamelException(message);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.listenerMap.put(consumer, listener);
        }
        finally {
            this.client.getChannel("/meta/subscribe").removeListener((ClientSessionChannel.ClientSessionChannelListener)subscriptionListener);
        }
    }

    private String getChannelName(String topicName) {
        return "/topic/" + topicName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
        block6: {
            final String channelName = this.getChannelName(topicName);
            final CountDownLatch latch = new CountDownLatch(1);
            final String[] unsubscribeError = new String[]{null};
            ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", (Object)message);
                    String unsubscribedChannelName = message.get((Object)"subscription").toString();
                    if (channelName.equals(unsubscribedChannelName)) {
                        if (!message.isSuccessful()) {
                            String error = (String)message.get((Object)"error");
                            if (error != null) {
                                unsubscribeError[0] = error;
                            }
                        } else {
                            LOG.info("Unsubscribed from channel {}", (Object)unsubscribedChannelName);
                        }
                        latch.countDown();
                    }
                }
            };
            this.client.getChannel("/meta/unsubscribe").addListener((ClientSessionChannel.ClientSessionChannelListener)unsubscribeListener);
            try {
                ClientSessionChannel.MessageListener listener = this.listenerMap.remove((Object)consumer);
                if (listener == null) break block6;
                LOG.info("Unsubscribing from channel {}...", (Object)channelName);
                ClientSessionChannel clientChannel = this.client.getChannel(channelName);
                clientChannel.unsubscribe(listener);
                try {
                    if (!latch.await(40L, TimeUnit.SECONDS)) {
                        String message = unsubscribeError[0] != null ? String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeError[0]) : String.format("Timeout error unsubscribing from topic %s after %s seconds", topicName, 40);
                        throw new CamelException(message);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            finally {
                this.client.getChannel("/meta/unsubscribe").removeListener((ClientSessionChannel.ClientSessionChannelListener)unsubscribeListener);
            }
        }
    }

    public String getEndpointUrl() {
        if (Double.valueOf(this.component.getConfig().getApiVersion()) == 36.0) {
            boolean replayOptionsPresent;
            boolean bl = replayOptionsPresent = this.component.getConfig().getDefaultReplayId() != null || !this.component.getConfig().getInitialReplayIdMap().isEmpty();
            if (replayOptionsPresent) {
                return this.component.getSession().getInstanceUrl() + "/cometd/replay/" + this.component.getConfig().getApiVersion();
            }
        }
        return this.component.getSession().getInstanceUrl() + "/cometd/" + this.component.getConfig().getApiVersion();
    }
}

