/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication.async.transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.ext.replication.ReplicationException;
import org.exoplatform.services.jcr.ext.replication.async.ConnectionListener;
import org.exoplatform.services.jcr.ext.replication.async.transport.AbstractPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.AsyncPacketListener;
import org.exoplatform.services.jcr.ext.replication.async.transport.AsyncStateEvent;
import org.exoplatform.services.jcr.ext.replication.async.transport.AsyncStateListener;
import org.exoplatform.services.jcr.ext.replication.async.transport.MemberAddress;
import org.exoplatform.services.jcr.ext.replication.async.transport.PacketTransformer;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AsyncChannelManager
implements RequestHandler,
MembershipListener {
    private static final Log LOG = ExoLogger.getLogger((String)"ext.AsyncChannelManager");
    private JChannel channel;
    private MessageDispatcher dispatcher;
    private final String channelConfig;
    private final String channelName;
    private final int confMembersCount;
    private List<AsyncPacketListener> packetListeners;
    private List<AsyncStateListener> stateListeners;
    private final List<ConnectionListener> connectionListeners;
    private final PacketHandler packetsHandler;

    public AsyncChannelManager(String channelConfig, String channelName, int confMembersCount) {
        this.channelConfig = channelConfig;
        this.channelName = channelName;
        this.confMembersCount = confMembersCount;
        this.packetListeners = new ArrayList<AsyncPacketListener>();
        this.stateListeners = new ArrayList<AsyncStateListener>();
        this.connectionListeners = new ArrayList<ConnectionListener>();
        this.packetsHandler = new PacketHandler();
        this.packetsHandler.start();
    }

    public boolean isConnected() {
        return this.channel != null;
    }

    public void connect() throws ReplicationException {
        try {
            if (this.channel == null) {
                this.channel = new JChannel(this.channelConfig);
                this.channel.setOpt(5, (Object)Boolean.TRUE);
                this.channel.setOpt(6, (Object)Boolean.TRUE);
                this.dispatcher = new MessageDispatcher((Channel)this.channel, null, null, null);
                this.dispatcher.setRequestHandler((RequestHandler)this);
                this.dispatcher.setMembershipListener((MembershipListener)this);
            }
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't create JGroups channel", e);
        }
        LOG.info((Object)("Channel name : " + this.channelName));
        try {
            this.channel.connect(this.channelName);
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't connect to JGroups channel", e);
        }
    }

    public void disconnect() {
        if (this.dispatcher != null) {
            this.dispatcher.setRequestHandler(null);
            this.dispatcher.setMembershipListener(null);
            this.dispatcher.stop();
            this.dispatcher = null;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"dispatcher stopped");
            }
            try {
                Thread.sleep(3000L);
            }
            catch (InterruptedException e) {
                LOG.error((Object)("The interapted on disconnect : " + e), (Throwable)e);
            }
        }
        if (this.channel != null) {
            this.channel.disconnect();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"channel disconnected");
            }
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                LOG.error((Object)("The interapted on disconnect : " + e), (Throwable)e);
            }
            this.channel.close();
            this.channel = null;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Disconnect done, fire connection listeners");
            }
            for (ConnectionListener cl : this.connectionListeners) {
                cl.onDisconnect();
            }
        }
    }

    public void addPacketListener(AsyncPacketListener packetListener) {
        this.packetListeners.add(packetListener);
    }

    public void removePacketListener(AsyncPacketListener packetListener) {
        this.packetListeners.remove(packetListener);
    }

    public void addStateListener(AsyncStateListener listener) {
        this.stateListeners.add(listener);
    }

    public void removeStateListener(AsyncStateListener listener) {
        this.stateListeners.remove(listener);
    }

    public void addConnectionListener(ConnectionListener listener) {
        this.connectionListeners.add(listener);
    }

    public void removeConnectionListener(ConnectionListener listener) {
        this.connectionListeners.remove(listener);
    }

    public MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    public List<MemberAddress> getOtherMembers() {
        ArrayList list = new ArrayList(this.channel.getView().getMembers());
        list.remove(this.channel.getLocalAddress());
        ArrayList<MemberAddress> members = new ArrayList<MemberAddress>();
        for (Address address : list) {
            members.add(new MemberAddress(address));
        }
        return members;
    }

    public void sendPacket(AbstractPacket packet, MemberAddress ... destinations) throws IOException {
        Vector<Address> dest = new Vector<Address>();
        for (MemberAddress address : destinations) {
            dest.add(address.getAddress());
        }
        this.sendPacket(packet, dest);
    }

    private void sendPacket(AbstractPacket packet, Vector<Address> dest) throws IOException {
        byte[] buffer = PacketTransformer.getAsByteArray(packet);
        Message msg = new Message(null, null, buffer);
        this.dispatcher.castMessage(dest, msg, 6, 0L);
    }

    public void sendPacket(AbstractPacket packet) throws IOException {
        Vector<Address> dest = new Vector<Address>(this.channel.getView().getMembers());
        dest.remove(this.channel.getLocalAddress());
        this.sendPacket(packet, dest);
    }

    public JChannel getChannel() {
        return this.channel;
    }

    public Object handle(Message message) {
        if (this.isConnected()) {
            try {
                this.packetsHandler.add(PacketTransformer.getAsPacket(message.getBuffer()), new MemberAddress(message.getSrc()));
                if (this.channel.getView() != null) {
                    if (this.channel.getView().getMembers().size() == this.confMembersCount) {
                        this.packetsHandler.handle();
                    } else {
                        LOG.warn((Object)("Not all members connected to the channel " + this.channel.getView().getMembers().size() + " != " + this.confMembersCount + ", queue message " + message));
                    }
                } else {
                    LOG.warn((Object)("No members found or channel closed, queue message " + message));
                }
                return new String("Success");
            }
            catch (IOException e) {
                LOG.error((Object)("Message handler error " + e), (Throwable)e);
                return e.getMessage();
            }
            catch (ClassNotFoundException e) {
                LOG.error((Object)("Message handler error " + e), (Throwable)e);
                return e.getMessage();
            }
        }
        LOG.warn((Object)("Channel is closed but message received " + message));
        return new String("Disconnected");
    }

    public void viewAccepted(View view) {
        if (this.isConnected()) {
            LOG.info((Object)("View accepted " + view.printDetails()));
            ArrayList<MemberAddress> members = new ArrayList<MemberAddress>();
            for (Address address : view.getMembers()) {
                members.add(new MemberAddress(address));
            }
            AsyncStateEvent event = new AsyncStateEvent(new MemberAddress(this.channel.getLocalAddress()), members);
            for (AsyncStateListener listener : this.stateListeners) {
                listener.onStateChanged(event);
            }
            this.packetsHandler.handle();
        } else {
            LOG.warn((Object)("Channel is closed but View accepted " + view.printDetails()));
        }
    }

    public void block() {
    }

    public void suspect(Address arg0) {
    }

    class PacketHandler
    extends Thread {
        private final Object lock = new Object();
        private final ConcurrentLinkedQueue<MemberPacket> queue = new ConcurrentLinkedQueue();
        private MemberPacket current;

        PacketHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void run() {
            while (true) {
                try {
                    while (true) {
                        Object object = this.lock;
                        synchronized (object) {
                            this.current = this.queue.poll();
                            while (this.current != null) {
                                AsyncPacketListener[] pl;
                                for (AsyncPacketListener handler : pl = AsyncChannelManager.this.packetListeners.toArray(new AsyncPacketListener[AsyncChannelManager.this.packetListeners.size()])) {
                                    handler.receive(this.current.packet, this.current.member);
                                }
                                this.current = this.queue.poll();
                            }
                            this.lock.wait();
                        }
                    }
                }
                catch (InterruptedException e) {
                    LOG.error((Object)("Cannot handle the queue. Wait lock failed " + e), (Throwable)e);
                    continue;
                }
                catch (Throwable e) {
                    LOG.error((Object)("Cannot handle the queue now. Error " + e), e);
                    try {
                        PacketHandler.sleep(5000L);
                        continue;
                    }
                    catch (Throwable e1) {
                        LOG.error((Object)("Sleep error " + e1));
                        continue;
                    }
                }
                break;
            }
        }

        void add(AbstractPacket packet, MemberAddress member) {
            this.queue.add(new MemberPacket(packet, member));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void handle() {
            if (this.current == null) {
                Object object = this.lock;
                synchronized (object) {
                    this.lock.notify();
                }
            }
        }
    }

    class MemberPacket {
        final AbstractPacket packet;
        final MemberAddress member;

        MemberPacket(AbstractPacket packet, MemberAddress member) {
            this.packet = packet;
            this.member = member;
        }
    }
}

