/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.ext.replication.Packet;
import org.exoplatform.services.jcr.ext.replication.PacketListener;
import org.exoplatform.services.jcr.ext.replication.ReplicationException;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.ChannelListener;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;

public class ChannelManager
implements RequestHandler {
    private static Log log = ExoLogger.getLogger((String)"ext.ChannelManager");
    private JChannel channel;
    private MessageDispatcher dispatcher;
    private final String channelConfig;
    private final String channelName;
    private String testChannelName;
    private MembershipListener membershipListener;
    private MessageListener messageListener;
    private List<PacketListener> packetListeners;
    private ChannelListener channelListener;
    private final PacketHandler packetsHandler;

    public ChannelManager(String channelConfig, String channelName) {
        this.channelConfig = channelConfig;
        this.channelName = channelName;
        this.packetListeners = new ArrayList<PacketListener>();
        this.packetsHandler = new PacketHandler();
        this.packetsHandler.start();
    }

    public synchronized void init() throws ReplicationException {
        try {
            if (this.channel == null) {
                this.channel = new JChannel(this.channelConfig);
                this.channel.setOpt(5, (Object)Boolean.TRUE);
                this.channel.setOpt(6, (Object)Boolean.TRUE);
                this.dispatcher = new MessageDispatcher((Channel)this.channel, null, null, null);
                if (this.channelListener != null) {
                    this.channel.addChannelListener(this.channelListener);
                }
                if (this.membershipListener != null) {
                    this.dispatcher.setMembershipListener(this.membershipListener);
                }
                if (this.messageListener != null) {
                    this.dispatcher.setMessageListener(this.messageListener);
                }
                this.dispatcher.setRequestHandler((RequestHandler)this);
            }
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't create JGroups channel", e);
        }
    }

    public synchronized void connect() throws ReplicationException {
        log.info((Object)("channalName : " + this.channelName));
        if (log.isDebugEnabled()) {
            log.info((Object)("testChannalName == " + this.testChannelName));
        }
        try {
            if (this.testChannelName == null) {
                this.channel.connect(this.channelName);
            } else {
                this.channel.connect(this.testChannelName);
            }
        }
        catch (ChannelException e) {
            throw new ReplicationException("Can't connect to JGroups channel", e);
        }
    }

    public void closeChannel() {
        if (this.dispatcher != null) {
            this.dispatcher.setRequestHandler(null);
            this.dispatcher.setMembershipListener(null);
            this.dispatcher.stop();
            this.dispatcher = null;
            if (log.isDebugEnabled()) {
                log.debug((Object)"dispatcher stopped");
            }
            try {
                Thread.sleep(3000L);
            }
            catch (InterruptedException e) {
                log.error((Object)("The interapted on disconnect : " + e), (Throwable)e);
            }
        }
        if (this.channel != null) {
            this.channel.disconnect();
            if (log.isDebugEnabled()) {
                log.debug((Object)"channel disconnected");
            }
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                log.error((Object)("The interapted on disconnect : " + e), (Throwable)e);
            }
            this.channel.close();
            this.channel = null;
        }
    }

    public void setMembershipListener(MembershipListener membershipListener) {
        this.membershipListener = membershipListener;
    }

    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    public void addPacketListener(PacketListener packetListener) {
        this.packetListeners.add(packetListener);
    }

    public void setChannelListener(ChannelListener channelListener) {
        this.channelListener = channelListener;
    }

    public MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    public void sendPacket(Packet packet) throws Exception {
        byte[] buffer = Packet.getAsByteArray(packet);
        Message msg = new Message(null, null, buffer);
        Vector addr = new Vector(this.channel.getView().getMembers());
        addr.remove(this.channel.getLocalAddress());
        this.dispatcher.castMessage(addr, msg, 6, 0L);
    }

    public void sendPacketToAll(Packet packet) throws Exception {
        byte[] buffer = Packet.getAsByteArray(packet);
        Message msg = new Message(null, null, buffer);
        this.dispatcher.castMessage(null, msg, 6, 0L);
    }

    public JChannel getChannel() {
        return this.channel;
    }

    public synchronized void send(byte[] buffer) {
        Message msg = new Message(null, null, buffer);
        this.dispatcher.castMessage(null, msg, 6, 0L);
    }

    public synchronized void sendBigPacket(byte[] data, Packet packet) throws Exception {
        int len;
        long totalPacketCount = this.getPacketCount(data.length, 16384L);
        int offset = 0;
        while ((len = data.length - offset) > 0) {
            int l = len > 16384 ? 16384 : len;
            byte[] buf = new byte[l];
            System.arraycopy(data, offset, buf, 0, l);
            Packet bigPacket = new Packet(27, packet.getIdentifier(), totalPacketCount, data.length, offset, buf);
            this.sendPacket(bigPacket);
            offset += l;
            if (!log.isDebugEnabled()) continue;
            log.debug((Object)("Send of damp --> " + bigPacket.getByteArray().length));
        }
    }

    public synchronized void sendBinaryFile(String filePath, String ownerName, String identifier, String systemId, int packetType) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Begin send : " + filePath));
        }
        File f = new File(filePath);
        long packetCount = this.getPacketCount(f.length(), 16384L);
        FileInputStream in = new FileInputStream(f);
        byte[] buf = new byte[16384];
        long offset = 0L;
        int len = ((InputStream)in).read(buf);
        if (len < 16384) {
            byte[] b = new byte[len];
            System.arraycopy(buf, 0, b, 0, len);
            buf = b;
        }
        Packet packet = new Packet(packetType, systemId, identifier, ownerName, f.getName(), packetCount, offset, buf);
        this.sendPacket(packet);
        offset += (long)len;
        if (log.isDebugEnabled()) {
            log.debug((Object)("Send packet type [" + packetType + "] --> " + offset));
        }
        while ((len = ((InputStream)in).read(buf)) > 0) {
            if (len < 16384) {
                byte[] b = new byte[len];
                System.arraycopy(buf, 0, b, 0, len);
                buf = b;
            }
            packet = new Packet(packetType, systemId, identifier, ownerName, f.getName(), packetCount, offset, buf);
            this.sendPacket(packet);
            offset += (long)len;
            if (!log.isDebugEnabled()) continue;
            log.debug((Object)("Send packet type [" + packetType + "] --> " + offset));
        }
        ((InputStream)in).close();
    }

    public Object handle(Message message) {
        try {
            Packet packet = Packet.getAsPacket(message.getBuffer());
            this.packetsHandler.add(packet);
            if (this.channel != null || this.channel.getView() != null) {
                this.packetsHandler.handle();
            } else {
                log.warn((Object)("No members found or channel closed, queue message " + message));
            }
        }
        catch (IOException e) {
            log.error((Object)"An error in processing packet : ", (Throwable)e);
        }
        catch (ClassNotFoundException e) {
            log.error((Object)"An error in processing packet : ", (Throwable)e);
        }
        return new String("Success !");
    }

    public void setAllowConnect(boolean allowConnect) {
        this.testChannelName = !allowConnect ? this.channelName + Math.round(Math.random() * 127.0) : null;
    }

    public void setAllowConnect(boolean allowConnect, int id) {
        this.testChannelName = !allowConnect ? this.channelName + id : null;
    }

    private long getPacketCount(long contentLength, long packetSize) {
        long count;
        return count += (count = contentLength / packetSize) * packetSize - contentLength != 0L ? 1L : 0L;
    }

    class PacketHandler
    extends Thread {
        private final Object lock = new Object();
        private final ConcurrentLinkedQueue<Packet> queue = new ConcurrentLinkedQueue();
        private Packet current;

        PacketHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void run() {
            while (true) {
                try {
                    while (true) {
                        Object object = this.lock;
                        synchronized (object) {
                            this.current = this.queue.poll();
                            while (this.current != null) {
                                PacketListener[] pl;
                                for (PacketListener handler : pl = ChannelManager.this.packetListeners.toArray(new PacketListener[ChannelManager.this.packetListeners.size()])) {
                                    handler.receive(this.current);
                                }
                                this.current = this.queue.poll();
                            }
                            this.lock.wait();
                        }
                    }
                }
                catch (InterruptedException e) {
                    log.error((Object)("Cannot handle the queue. Wait lock failed " + e), (Throwable)e);
                    continue;
                }
                catch (Throwable e) {
                    log.error((Object)("Cannot handle the queue now. Error " + e), e);
                    try {
                        PacketHandler.sleep(5000L);
                        continue;
                    }
                    catch (Throwable e1) {
                        log.error((Object)("Sleep error " + e1));
                        continue;
                    }
                }
                break;
            }
        }

        void add(Packet packet) {
            this.queue.add(packet);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void handle() {
            if (this.current == null) {
                Object object = this.lock;
                synchronized (object) {
                    this.lock.notify();
                }
            }
        }
    }
}

