/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.ext.replication.FixupStream;
import org.exoplatform.services.jcr.ext.replication.Packet;
import org.exoplatform.services.jcr.ext.replication.PacketListener;
import org.exoplatform.services.jcr.ext.replication.ReplicationException;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.ChannelListener;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;

public class ChannelManager
implements RequestHandler {
    /** State value: the initial state, held until {@link #connect()} succeeds. */
    public static final int INITIALIZED = 1;
    /** State value assigned by {@link #connect()} once the channel has connected. */
    public static final int CONNECTED = 2;
    /** State value assigned by {@link #closeChannel()}. */
    public static final int DISCONNECTED = 3;
    /**
     * Current lifecycle state, one of {@link #INITIALIZED}, {@link #CONNECTED} or
     * {@link #DISCONNECTED}. Volatile because it is written by the unsynchronized
     * {@link #closeChannel()} and read by the unsynchronized {@link #sendPacket(Packet)}.
     */
    protected volatile int state = INITIALIZED;
    /**
     * Created by {@link #init()} and counted down by {@link #connect()}; the send
     * methods wait on it before checking {@link #state}. Volatile because
     * {@link #sendPacket(Packet)} reads it without holding the monitor.
     */
    private volatile CountDownLatch latch;
    private static final Log log = ExoLogger.getLogger((String)"ext.ChannelManager");
    /** The JGroups channel; {@code null} before {@link #init()} and after {@link #closeChannel()}. */
    private volatile JChannel channel;
    /** Dispatcher bound to {@link #channel}; {@code null} before {@link #init()} and after {@link #closeChannel()}. */
    private volatile MessageDispatcher dispatcher;
    /** Configuration string handed to the {@link JChannel} constructor. */
    private final String channelConfig;
    /** Cluster name used by {@link #connect()} unless {@link #testChannelName} is set. */
    private final String channelName;
    /** Alternative cluster name set through {@code setAllowConnect}; {@code null} means "use channelName". */
    private String testChannelName;
    /** Listeners below are installed on the channel/dispatcher by {@link #init()} when non-null. */
    private MembershipListener membershipListener;
    private MessageListener messageListener;
    /** Listeners that receive every packet processed by {@link #packetsHandler}. */
    private final List<PacketListener> packetListeners;
    private ChannelListener channelListener;
    /** Background thread that delivers queued packets to {@link #packetListeners}. */
    protected final PacketHandler packetsHandler;

    public ChannelManager(String channelConfig, String channelName) {
        this.channelConfig = channelConfig;
        this.channelName = channelName;
        this.packetListeners = new ArrayList<PacketListener>();
        this.packetsHandler = new PacketHandler();
        this.packetsHandler.start();
    }

    /**
     * Reports whether a channel object currently exists.
     *
     * <p>Note that this only checks that the channel field is non-null: it becomes
     * {@code true} as soon as {@link #init()} has created the channel and turns
     * {@code false} again when {@link #closeChannel()} clears it. It does not
     * consult {@link #state}.
     *
     * @return {@code true} if the channel field is non-null
     */
    public boolean isConnected() {
        return this.channel != null;
    }

    /**
     * Creates the JGroups channel and its message dispatcher if no channel exists yet,
     * and installs the listeners configured so far. Does nothing when a channel is
     * already present.
     *
     * <p>The connect latch is published first so that senders arriving while the
     * channel is being built wait for {@link #connect()}. If initialization fails the
     * latch is released again, so those senders are not blocked forever, and the
     * channel field is left {@code null} so that {@code init()} can be retried.
     *
     * @throws ReplicationException if the JGroups channel cannot be created
     */
    public synchronized void init() throws ReplicationException {
        if (this.channel != null) {
            return;
        }
        CountDownLatch pending = new CountDownLatch(1);
        this.latch = pending;
        boolean ready = false;
        try {
            JChannel newChannel = new JChannel(this.channelConfig);
            // Numeric option ids as emitted by the decompiler; presumably
            // Channel.AUTO_RECONNECT (5) and Channel.AUTO_GETSTATE (6) - TODO confirm
            // against the JGroups version in use.
            newChannel.setOpt(5, (Object)Boolean.TRUE);
            newChannel.setOpt(6, (Object)Boolean.TRUE);
            MessageDispatcher newDispatcher = new MessageDispatcher((Channel)newChannel, null, null, null);
            if (this.channelListener != null) {
                newChannel.addChannelListener(this.channelListener);
            }
            if (this.membershipListener != null) {
                newDispatcher.setMembershipListener(this.membershipListener);
            }
            if (this.messageListener != null) {
                newDispatcher.setMessageListener(this.messageListener);
            }
            newDispatcher.setRequestHandler((RequestHandler)this);
            // Publish only a fully configured pair; the channel goes last because
            // isConnected() keys off it.
            this.dispatcher = newDispatcher;
            this.channel = newChannel;
            ready = true;
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't create JGroups channel", e);
        }
        finally {
            if (!ready) {
                // Nobody will call connect() for a channel that was never created,
                // so release any sender already waiting on this latch.
                pending.countDown();
            }
        }
    }

    /**
     * Connects the channel to the cluster and marks the manager as {@link #CONNECTED}.
     *
     * <p>The cluster name is the test channel name when one has been set through
     * {@code setAllowConnect}, otherwise the regular channel name. Whether or not the
     * connection succeeds, the connect latch is released so that waiting senders can
     * proceed (they will find the state not connected on failure).
     *
     * @throws ReplicationException if {@link #init()} has not created a channel yet,
     *         or if JGroups fails to connect
     */
    public synchronized void connect() throws ReplicationException {
        log.info("channelName : " + this.channelName);
        if (log.isDebugEnabled()) {
            log.debug("testChannelName == " + this.testChannelName);
        }
        try {
            JChannel target = this.channel;
            if (target == null) {
                // Fail with the declared exception type instead of a NullPointerException.
                throw new ReplicationException("Can't connect to JGroups channel",
                        new IllegalStateException("init() must be called before connect()"));
            }
            String clusterName = this.testChannelName == null ? this.channelName : this.testChannelName;
            target.connect(clusterName);
            this.state = CONNECTED;
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't connect to JGroups channel", e);
        }
        finally {
            CountDownLatch pending = this.latch;
            if (pending != null) {
                pending.countDown();
            }
        }
    }

    /**
     * Marks the manager as {@link #DISCONNECTED}, stops the dispatcher, then
     * disconnects and closes the channel, clearing both fields.
     *
     * <p>The method pauses 3 seconds after stopping the dispatcher and 5 seconds
     * between disconnecting and closing the channel. If the calling thread is
     * interrupted during a pause the shutdown sequence still runs to completion, and
     * the interrupt flag is restored before returning so the caller can observe it.
     * The channel is closed and the field cleared even if disconnecting throws.
     */
    public void closeChannel() {
        this.state = DISCONNECTED;
        boolean interrupted = false;
        try {
            MessageDispatcher activeDispatcher = this.dispatcher;
            if (activeDispatcher != null) {
                activeDispatcher.setRequestHandler(null);
                activeDispatcher.setMembershipListener(null);
                activeDispatcher.stop();
                this.dispatcher = null;
                if (log.isDebugEnabled()) {
                    log.debug("dispatcher stopped");
                }
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    log.error("The interapted on disconnect : " + e, e);
                }
            }
            JChannel activeChannel = this.channel;
            if (activeChannel != null) {
                try {
                    activeChannel.disconnect();
                    if (log.isDebugEnabled()) {
                        log.debug("channel disconnected");
                    }
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                        log.error("The interapted on disconnect : " + e, e);
                    }
                }
                finally {
                    activeChannel.close();
                    this.channel = null;
                }
            }
        }
        finally {
            if (interrupted) {
                // Re-assert the interrupt only at the end so the pauses above behave as before.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores the membership listener to install on the dispatcher.
     *
     * <p>The listener is only applied when {@link #init()} creates a new dispatcher;
     * setting it here does not touch a dispatcher that already exists.
     *
     * @param membershipListener the listener to install on the next {@link #init()}
     */
    public void setMembershipListener(MembershipListener membershipListener) {
        this.membershipListener = membershipListener;
    }

    /**
     * Stores the message listener to install on the dispatcher.
     *
     * <p>The listener is only applied when {@link #init()} creates a new dispatcher;
     * setting it here does not touch a dispatcher that already exists.
     *
     * @param messageListener the listener to install on the next {@link #init()}
     */
    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    /**
     * Registers a listener that will receive every packet processed by the packet
     * handler thread.
     *
     * <p>NOTE(review): the backing list is a plain {@code ArrayList} that the handler
     * thread snapshots with {@code toArray} without synchronization; registering
     * listeners while packets are being delivered looks unsafe - confirm callers
     * register all listeners before traffic starts.
     *
     * @param packetListener the listener to append
     */
    public void addPacketListener(PacketListener packetListener) {
        this.packetListeners.add(packetListener);
    }

    /**
     * Stores the channel listener to add to the channel.
     *
     * <p>The listener is only applied when {@link #init()} creates a new channel;
     * setting it here does not touch a channel that already exists.
     *
     * @param channelListener the listener to add on the next {@link #init()}
     */
    public void setChannelListener(ChannelListener channelListener) {
        this.channelListener = channelListener;
    }

    /**
     * Returns the message dispatcher.
     *
     * @return the dispatcher created by {@link #init()}, or {@code null} if
     *         {@link #init()} has not run yet or {@link #closeChannel()} has cleared it
     */
    public MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Serializes the packet and casts it to the group, provided the manager is
     * connected.
     *
     * <p>If a connect latch exists and has not been released yet, the call first
     * blocks until {@link #connect()} finishes. When the manager is not in the
     * {@link #CONNECTED} state (or the dispatcher has already been torn down by
     * {@link #closeChannel()}), the packet is dropped with a debug message.
     *
     * @param packet the packet to send
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         thread is interrupted while waiting for the connection; the thread's
     *         interrupt flag is restored before throwing
     * @throws Exception if the packet cannot be serialized or sent
     */
    public void sendPacket(Packet packet) throws Exception {
        CountDownLatch pending = this.latch;
        if (pending != null && pending.getCount() != 0L) {
            try {
                pending.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        // Snapshot the dispatcher: closeChannel() may clear the field concurrently.
        MessageDispatcher activeDispatcher = this.dispatcher;
        if (this.state == CONNECTED && activeDispatcher != null) {
            byte[] buffer = Packet.getAsByteArray(packet);
            Message msg = new Message(null, null, buffer);
            // Mode 6 is the decompiled numeric constant; presumably
            // GroupRequest.GET_NONE (do not wait for responses) - TODO confirm.
            activeDispatcher.castMessage(null, msg, 6, 0L);
        } else if (log.isDebugEnabled()) {
            log.debug("Channel is not connected");
        }
    }

    /**
     * Returns the underlying JGroups channel.
     *
     * @return the channel created by {@link #init()}, or {@code null} if
     *         {@link #init()} has not run yet or {@link #closeChannel()} has cleared it
     */
    public JChannel getChannel() {
        return this.channel;
    }

    /**
     * Casts an already serialized buffer to the group, provided the manager is
     * connected.
     *
     * <p>If a connect latch exists and has not been released yet, the call first
     * blocks until {@link #connect()} finishes. When the manager is not in the
     * {@link #CONNECTED} state (or the dispatcher has already been torn down by
     * {@link #closeChannel()}), the buffer is dropped with a debug message.
     *
     * @param buffer the raw message payload
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         thread is interrupted while waiting for the connection; the thread's
     *         interrupt flag is restored before throwing
     */
    public synchronized void send(byte[] buffer) {
        CountDownLatch pending = this.latch;
        if (pending != null && pending.getCount() != 0L) {
            try {
                pending.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        // Snapshot the dispatcher: closeChannel() is not synchronized and may clear the field.
        MessageDispatcher activeDispatcher = this.dispatcher;
        if (this.state == CONNECTED && activeDispatcher != null) {
            Message msg = new Message(null, null, buffer);
            // Mode 6 is the decompiled numeric constant; presumably
            // GroupRequest.GET_NONE (do not wait for responses) - TODO confirm.
            activeDispatcher.castMessage(null, msg, 6, 0L);
        } else if (log.isDebugEnabled()) {
            log.debug("Channel is not connected");
        }
    }

    /**
     * Sends a large payload as a sequence of packets of at most 16384 bytes each:
     * one "first" packet (type 27), zero or more "middle" packets (type 28) and one
     * "last" packet (type 29) carrying the remainder, which may be empty.
     *
     * <p>Every packet carries the total payload length, the identifier and owner
     * name of {@code packet}, and the offset of its chunk within {@code data}.
     * Payloads shorter than one chunk are sent as a short first packet followed by
     * an empty last packet instead of failing with an index error.
     *
     * <p>NOTE(review): 27/28/29 are decompiled numeric packet types, presumably the
     * "big packet first/middle/last" constants of the packet type enumeration -
     * confirm against {@code Packet}.
     *
     * @param data the complete payload to split
     * @param packet template supplying the identifier and owner name
     * @throws Exception if a chunk cannot be sent or the thread is interrupted
     *         during the short pause between middle packets
     */
    public synchronized void sendBigPacket(byte[] data, Packet packet) throws Exception {
        final int chunkSize = 16384;
        long offset = 0L;
        // The first chunk must not be larger than the payload itself.
        byte[] tempBuffer = new byte[Math.min(chunkSize, data.length)];
        this.cutData(data, offset, tempBuffer);
        Packet firstPacket = new Packet(27, data.length, tempBuffer, packet.getIdentifier());
        firstPacket.setOwnName(packet.getOwnerName());
        firstPacket.setOffset(offset);
        this.sendPacket(firstPacket);
        if (log.isDebugEnabled()) {
            log.debug("Send of damp --> " + firstPacket.getByteArray().length);
        }
        offset += (long)tempBuffer.length;
        // Entered only while more than one full chunk remains, so tempBuffer is
        // guaranteed to be a full-size buffer here.
        while ((long)data.length - offset > (long)chunkSize) {
            this.cutData(data, offset, tempBuffer);
            Packet middlePacket = new Packet(28, data.length, tempBuffer, packet.getIdentifier());
            middlePacket.setOwnName(packet.getOwnerName());
            middlePacket.setOffset(offset);
            Thread.sleep(1L);
            this.sendPacket(middlePacket);
            if (log.isDebugEnabled()) {
                log.debug("Send of damp --> " + middlePacket.getByteArray().length);
            }
            offset += (long)tempBuffer.length;
        }
        byte[] lastBuffer = new byte[data.length - (int)offset];
        this.cutData(data, offset, lastBuffer);
        Packet lastPacket = new Packet(29, data.length, lastBuffer, packet.getIdentifier());
        lastPacket.setOwnName(packet.getOwnerName());
        lastPacket.setOffset(offset);
        this.sendPacket(lastPacket);
        if (log.isDebugEnabled()) {
            log.debug("Send of damp --> " + lastPacket.getByteArray().length);
        }
    }

    /**
     * Fills {@code destination} completely with bytes taken from {@code sourceData},
     * starting at {@code startPos}.
     *
     * @param sourceData the array to copy from
     * @param startPos index in {@code sourceData} of the first byte to copy
     * @param destination the array to fill; its length determines how many bytes are copied
     * @throws IndexOutOfBoundsException if {@code sourceData} does not contain
     *         {@code destination.length} bytes from {@code startPos} onwards
     */
    private void cutData(byte[] sourceData, long startPos, byte[] destination) {
        System.arraycopy(sourceData, (int)startPos, destination, 0, destination.length);
    }

    /**
     * Streams a file to the group as a sequence of packets: one header packet of
     * {@code firstPacketType} (no data), a packet of {@code middlePocketType} for
     * every full 16384-byte chunk, and one packet of {@code lastPocketType} carrying
     * the remaining bytes (possibly none).
     *
     * <p>Each packet's size field is set to a running packet counter starting at 0,
     * and data packets carry the offset of their chunk within the file. The input
     * stream is always closed, and every chunk is filled completely before it is
     * sent so that a short read cannot be mistaken for the end of the file.
     *
     * @param filePath path of the file to send
     * @param ownerName owner name stored in every packet
     * @param identifier identifier stored in every packet
     * @param systemId system id stored in the header packet
     * @param firstPacketType packet type of the header packet
     * @param middlePocketType packet type of full-chunk data packets
     * @param lastPocketType packet type of the final data packet
     * @throws Exception if the file cannot be read, a packet cannot be sent, or the
     *         thread is interrupted during the short pause between chunks
     */
    public synchronized void sendBinaryFile(String filePath, String ownerName, String identifier, String systemId, int firstPacketType, int middlePocketType, int lastPocketType) throws Exception {
        final int chunkSize = 16384;
        if (log.isDebugEnabled()) {
            log.debug("Begin send : " + filePath);
        }
        File f = new File(filePath);
        InputStream in = new FileInputStream(f);
        try {
            long count = 0L;
            Packet packet = new Packet(firstPacketType, identifier, ownerName, f.getName());
            packet.setSystemId(systemId);
            packet.setSize(count);
            ++count;
            this.sendPacket(packet);
            byte[] buf = new byte[chunkSize];
            long offset = 0L;
            int len;
            while ((len = fillBuffer(in, buf)) == chunkSize) {
                packet = new Packet(middlePocketType, new FixupStream(), identifier, buf);
                packet.setOffset(offset);
                packet.setOwnName(ownerName);
                packet.setFileName(f.getName());
                packet.setSize(count);
                ++count;
                this.sendPacket(packet);
                offset += (long)len;
                if (log.isDebugEnabled()) {
                    log.debug("Send  --> " + offset);
                }
                Thread.sleep(1L);
            }
            // len is now in [0, chunkSize): the tail of the file, empty when the
            // file size is an exact multiple of the chunk size.
            byte[] buffer = new byte[len];
            System.arraycopy(buf, 0, buffer, 0, len);
            packet = new Packet(lastPocketType, new FixupStream(), identifier, buffer);
            packet.setOffset(offset);
            packet.setOwnName(ownerName);
            packet.setFileName(f.getName());
            packet.setSize(count);
            this.sendPacket(packet);
            if (log.isDebugEnabled()) {
                log.debug("End send : " + filePath);
            }
        }
        finally {
            in.close();
        }
    }

    /**
     * Reads from the stream until the buffer is full or the end of the stream is reached.
     *
     * @param in the stream to read from
     * @param buf the buffer to fill from index 0
     * @return the number of bytes stored in {@code buf}; less than {@code buf.length}
     *         only when the end of the stream was reached
     * @throws IOException if reading fails
     */
    private static int fillBuffer(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int read = in.read(buf, total, buf.length - total);
            if (read < 0) {
                break;
            }
            total += read;
        }
        return total;
    }

    /**
     * Request handler callback: deserializes the incoming message into a packet,
     * queues it, and wakes the packet handler when the channel has a view.
     *
     * <p>The packet is queued even when no view is available; in that case it is
     * left for a later wake-up and a warning is logged.
     *
     * @param message the received JGroups message
     * @return {@code "Success"} when the packet was queued, {@code "Disconnected"}
     *         when no channel exists, or the exception message when the payload
     *         could not be deserialized
     */
    public Object handle(Message message) {
        if (!this.isConnected()) {
            log.warn("Channel is closed but message received " + message);
            return "Disconnected";
        }
        try {
            this.packetsHandler.add(Packet.getAsPacket(message.getBuffer()));
            // Snapshot the field: closeChannel() may clear it after the check above.
            JChannel activeChannel = this.channel;
            if (activeChannel != null && activeChannel.getView() != null) {
                this.packetsHandler.handle();
            } else {
                log.warn("No members found or channel closed, queue message " + message);
            }
            return "Success";
        }
        catch (IOException e) {
            log.error("Message handler error " + e, e);
            return e.getMessage();
        }
        catch (ClassNotFoundException e) {
            log.error("Message handler error " + e, e);
            return e.getMessage();
        }
    }

    /**
     * Chooses which cluster name {@link #connect()} will use.
     *
     * <p>With {@code allowConnect == true} the alternative name is cleared, so the
     * regular channel name is used. Otherwise the alternative name becomes the
     * regular channel name followed by a random number between 0 and 127.
     *
     * @param allowConnect {@code true} to use the regular channel name
     */
    public void setAllowConnect(boolean allowConnect) {
        if (allowConnect) {
            this.testChannelName = null;
            return;
        }
        long randomSuffix = Math.round(Math.random() * 127.0);
        this.testChannelName = this.channelName + randomSuffix;
    }

    /**
     * Chooses which cluster name {@link #connect()} will use.
     *
     * <p>With {@code allowConnect == true} the alternative name is cleared, so the
     * regular channel name is used. Otherwise the alternative name becomes the
     * regular channel name followed by {@code id}.
     *
     * @param allowConnect {@code true} to use the regular channel name
     * @param id suffix appended to the channel name when {@code allowConnect} is {@code false}
     */
    public void setAllowConnect(boolean allowConnect, int id) {
        if (allowConnect) {
            this.testChannelName = null;
        } else {
            this.testChannelName = this.channelName + id;
        }
    }

    /**
     * Background thread that drains the packet queue and hands every packet to all
     * registered packet listeners of the enclosing manager.
     *
     * <p>Producers call {@link #add(Packet)} to enqueue and {@link #handle()} to wake
     * the thread; the thread itself sleeps in {@code lock.wait()} whenever the queue
     * is empty.
     */
    protected class PacketHandler
    extends Thread {
        /** Monitor used for the wait/notify hand-off between {@link #handle()} and {@link #run()}. */
        private final Object lock = new Object();
        /** Packets waiting to be delivered to the listeners. */
        private final ConcurrentLinkedQueue<Packet> queue = new ConcurrentLinkedQueue();
        /**
         * Packet currently being delivered, or {@code null} when the queue was found empty.
         * NOTE(review): written by this thread under {@code lock} but read by
         * {@link #handle()} without the lock and without {@code volatile}; a stale
         * non-null read there would skip the wake-up - confirm whether this matters.
         */
        private Packet current;

        protected PacketHandler() {
        }

        /*
         * Decompiler notes (CFR):
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        /**
         * Delivery loop; never returns normally.
         *
         * <p>While holding {@code lock}, polls the queue and, for each packet, takes a
         * snapshot of the listener list and calls {@code receive} on every listener.
         * When the queue is empty the thread waits on {@code lock} until notified.
         *
         * <p>An {@link InterruptedException} is logged and the loop restarts at once.
         * Any other {@link Throwable} (for example one thrown by a listener) is
         * logged, the thread sleeps 5 seconds, and the loop restarts; the packet that
         * was being delivered is not retried because the next iteration polls a new one.
         */
        public void run() {
            while (true) {
                try {
                    while (true) {
                        Object object = this.lock;
                        synchronized (object) {
                            this.current = this.queue.poll();
                            while (this.current != null) {
                                PacketListener[] pl;
                                // Iterate over a snapshot array of the listeners rather than the live list.
                                for (PacketListener handler : pl = ChannelManager.this.packetListeners.toArray(new PacketListener[ChannelManager.this.packetListeners.size()])) {
                                    handler.receive(this.current);
                                }
                                this.current = this.queue.poll();
                            }
                            // Queue drained: release the lock and sleep until handle() notifies.
                            this.lock.wait();
                        }
                    }
                }
                catch (InterruptedException e) {
                    log.error((Object)("Cannot handle the queue. Wait lock failed " + e), (Throwable)e);
                    continue;
                }
                catch (Throwable e) {
                    log.error((Object)("Cannot handle the queue now. Error " + e), e);
                    try {
                        // Back off before resuming delivery.
                        PacketHandler.sleep(5000L);
                        continue;
                    }
                    catch (Throwable e1) {
                        log.error((Object)("Sleep error " + e1));
                        continue;
                    }
                }
                break;
            }
        }

        /**
         * Appends a packet to the queue. Does not wake the delivery thread; call
         * {@link #handle()} for that.
         *
         * @param packet the packet to enqueue
         */
        public void add(Packet packet) {
            this.queue.add(packet);
        }

        /*
         * Decompiler note (CFR):
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Wakes the delivery thread if it appears idle.
         *
         * <p>When no packet is currently being delivered ({@code current == null}) this
         * notifies the thread waiting on {@code lock} and yields. Otherwise the
         * delivery loop is already running and will pick up queued packets on its
         * own, so only a debug message with the queue size is logged.
         */
        public void handle() {
            if (this.current == null) {
                Object object = this.lock;
                synchronized (object) {
                    this.lock.notify();
                }
                Thread.yield();
            } else if (log.isDebugEnabled()) {
                log.debug((Object)("Handler already active, queue size : " + this.queue.size()));
            }
        }
    }
}

