/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.impl.dataflow.replication;

import java.io.File;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.config.RepositoryEntry;
import org.exoplatform.services.jcr.config.WorkspaceEntry;
import org.exoplatform.services.jcr.dataflow.ItemDataChangesLog;
import org.exoplatform.services.jcr.dataflow.ItemDataKeeper;
import org.exoplatform.services.jcr.dataflow.persistent.ItemsPersistenceListener;
import org.exoplatform.services.jcr.impl.core.query.lucene.SearchIndex;
import org.exoplatform.services.jcr.impl.dataflow.persistent.CacheableWorkspaceDataManager;
import org.exoplatform.services.jcr.impl.dataflow.replication.FixupStream;
import org.exoplatform.services.jcr.impl.dataflow.replication.Packet;
import org.exoplatform.services.jcr.impl.dataflow.replication.PendingChangesLog;
import org.exoplatform.services.jcr.impl.dataflow.replication.WorkspaceDataManagerProxy;
import org.exoplatform.services.jcr.impl.util.io.FileCleaner;
import org.exoplatform.services.jcr.util.UUIDGenerator;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.util.Util;

public class WorkspaceDataReplicator
implements ItemsPersistenceListener,
MembershipListener,
RequestHandler {
    private static final String PERSISTENT_MODE = "persistent";
    private static final String PROXY_MODE = "proxy";
    protected static Log log = ExoLogger.getLogger((String)"jcr.WorkspaceDataReplicator");
    private final CacheableWorkspaceDataManager persistentdataManager;
    private final String systemId;
    private Channel channel;
    private MessageDispatcher disp;
    private HashMap<String, PendingChangesLog> mapPendingChangesLog;
    private Vector<Address> members;
    private ItemDataKeeper dataKeeper;
    private String mode;
    private FileCleaner fileCleaner;

    public WorkspaceDataReplicator(CacheableWorkspaceDataManager dataManager, WorkspaceEntry wsConfig, RepositoryEntry rConfig) throws RepositoryConfigurationException {
        this(dataManager, null, wsConfig, rConfig);
    }

    public WorkspaceDataReplicator(CacheableWorkspaceDataManager dataManager, SearchIndex searchIndex, WorkspaceEntry wsConfig, RepositoryEntry rConfig) throws RepositoryConfigurationException {
        this.mode = rConfig.getReplication().getMode();
        if (this.mode.equals(PROXY_MODE)) {
            this.dataKeeper = new WorkspaceDataManagerProxy(dataManager, searchIndex);
        } else if (this.mode.equals(PERSISTENT_MODE)) {
            this.dataKeeper = dataManager;
        } else {
            throw new RepositoryConfigurationException("Parameter 'mode' (persistent|proxy) required for replication configuration");
        }
        this.persistentdataManager = dataManager;
        this.fileCleaner = new FileCleaner(30030L);
        this.systemId = UUIDGenerator.generate();
        this.persistentdataManager.addItemPersistenceListener(this);
        String channelName = wsConfig.getUniqueName();
        try {
            String localAdaress = this.getLocalIP(Util.getFirstNonLoopbackAddress());
            String propsTCP_NIO = rConfig.getReplication().getChannelConfig();
            String props = propsTCP_NIO.replaceAll("/LocalAddress/", localAdaress);
            this.channel = new JChannel(props);
            this.disp = new MessageDispatcher(this.channel, null, (MembershipListener)this, (RequestHandler)this);
            this.channel.connect(channelName);
        }
        catch (ChannelException e) {
            e.printStackTrace();
        }
        catch (SocketException e) {
            e.printStackTrace();
        }
        this.mapPendingChangesLog = new HashMap();
        log.info((Object)("Replicator initialized JGroup Channel name: '" + channelName + "'"));
    }

    public void onSaveItems(ItemDataChangesLog changesLog) {
        if (changesLog.getSystemId() == null && changesLog.getSessionId() != null) {
            changesLog.setSystemId(this.systemId);
            try {
                this.send(changesLog);
                if (log.isDebugEnabled()) {
                    log.debug((Object)("After save message -->" + this.systemId));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void receive(ItemDataChangesLog changesLog) throws Exception {
        if (changesLog.getSystemId() == null || changesLog.getSystemId().equals(this.systemId)) {
            throw new Exception("Invalid or same systemId " + changesLog.getSystemId());
        }
        this.dataKeeper.save(changesLog);
    }

    private void send(ItemDataChangesLog itemDataChangesLog) throws Exception {
        PendingChangesLog container = new PendingChangesLog(itemDataChangesLog, this.fileCleaner);
        switch (container.getConteinerType()) {
            case 1: {
                byte[] buf = PendingChangesLog.getAsByteArray(container.getItemDataChangesLog());
                if (buf.length > 16384) {
                    this.sendBigItemDataChangesLog(buf, container.getUUID());
                    break;
                }
                Packet firstPacket = new Packet(1, buf.length, buf, container.getUUID());
                this.sendPacket(firstPacket);
                if (!log.isDebugEnabled()) break;
                log.debug((Object)"Send-->ItemDataChangesLog_without_Streams-->");
                log.debug((Object)"---------------------");
                log.debug((Object)("Size of buffer --> " + buf.length));
                log.debug((Object)("ItemStates size  --> " + itemDataChangesLog.getAllStates().size()));
                log.debug((Object)"---------------------");
                break;
            }
            case 2: {
                byte[] buf = PendingChangesLog.getAsByteArray(container.getItemDataChangesLog());
                Packet packet = new Packet(2, buf.length, buf, container.getUUID());
                this.sendPacket(packet);
                for (int i = 0; i < container.getInputStreams().size(); ++i) {
                    this.sendStream(container.getInputStreams().get(i), container.getFixupStreams().get(i), container.getUUID());
                }
                Packet lastPacket = new Packet(6, container.getUUID());
                this.sendPacket(lastPacket);
                if (!log.isDebugEnabled()) break;
                log.debug((Object)"Send-->ItemDataChangesLog_with_Streams-->");
                log.debug((Object)"---------------------");
                log.debug((Object)("Size of damp --> " + buf.length));
                log.debug((Object)("ItemStates   --> " + itemDataChangesLog.getAllStates().size()));
                log.debug((Object)("Streams      --> " + container.getInputStreams().size()));
                log.debug((Object)"---------------------");
                break;
            }
        }
    }

    public Object handle(Message msg) {
        try {
            Packet packet = Packet.getAsPacket(msg.getBuffer());
            switch (packet.getPacketType()) {
                case 1: {
                    ItemDataChangesLog changesLog = PendingChangesLog.getAsItemDataChangesLog(packet.getByteArray());
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Received-->ItemDataChangesLog_without_Streams-->");
                        log.debug((Object)"---------------------");
                        log.debug((Object)("Size of received packet --> " + packet.getByteArray().length));
                        log.debug((Object)("Size of ItemStates          --> " + changesLog.getAllStates().size()));
                        log.debug((Object)"---------------------");
                    }
                    this.receive(changesLog);
                    break;
                }
                case 2: {
                    ItemDataChangesLog changesLog = PendingChangesLog.getAsItemDataChangesLog(packet.getByteArray());
                    PendingChangesLog container = new PendingChangesLog(changesLog, packet.getUUID(), 2, this.fileCleaner);
                    this.mapPendingChangesLog.put(packet.getUUID(), container);
                    if (!log.isDebugEnabled()) break;
                    log.debug((Object)"Item DataChangesLog of type 'ItemDataChangesLog first whith stream'");
                    break;
                }
                case 3: {
                    if (!this.mapPendingChangesLog.containsKey(packet.getUUID())) break;
                    PendingChangesLog container = this.mapPendingChangesLog.get(packet.getUUID());
                    container.getFixupStreams().add(packet.getFixupStream());
                    File f = File.createTempFile("tempFile" + packet.getUUID() + UUIDGenerator.generate(), ".tmp");
                    container.getListFile().add(f);
                    container.getListRandomAccessFiles().add(new RandomAccessFile(f, "rw"));
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"First pocket of stream'");
                    }
                    break;
                }
                case 4: {
                    if (!this.mapPendingChangesLog.containsKey(packet.getUUID())) break;
                    PendingChangesLog container = this.mapPendingChangesLog.get(packet.getUUID());
                    RandomAccessFile randomAccessFile = container.getRandomAccessFile(packet.getFixupStream());
                    if (randomAccessFile != null) {
                        randomAccessFile.seek(packet.getOffset());
                        randomAccessFile.write(packet.getByteArray());
                    }
                    break;
                }
                case 5: {
                    if (!this.mapPendingChangesLog.containsKey(packet.getUUID())) break;
                    PendingChangesLog container = this.mapPendingChangesLog.get(packet.getUUID());
                    RandomAccessFile randomAccessFile = container.getRandomAccessFile(packet.getFixupStream());
                    if (randomAccessFile != null) {
                        randomAccessFile.seek(packet.getOffset());
                        randomAccessFile.write(packet.getByteArray());
                        randomAccessFile.close();
                    }
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Last pocket of stream'");
                    }
                    break;
                }
                case 6: {
                    ItemDataChangesLog dataChangesLog;
                    if (this.mapPendingChangesLog.get(packet.getUUID()) != null) {
                        this.mapPendingChangesLog.get(packet.getUUID()).restore();
                    }
                    if ((dataChangesLog = this.mapPendingChangesLog.get(packet.getUUID()).getItemDataChangesLog()) == null) break;
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Send-->ItemDataChangesLog_with_Streams-->");
                        log.debug((Object)"---------------------");
                        log.debug((Object)("ItemStates   --> " + dataChangesLog.getAllStates().size()));
                        log.debug((Object)("Streams      --> " + this.mapPendingChangesLog.get(packet.getUUID()).getInputStreams().size()));
                        log.debug((Object)"---------------------");
                    }
                    this.receive(dataChangesLog);
                    this.mapPendingChangesLog.remove(packet.getUUID());
                    break;
                }
                case 7: {
                    PendingChangesLog bigChangesLog = new PendingChangesLog(packet.getUUID(), (int)packet.getSize());
                    bigChangesLog.putData((int)packet.getOffset(), packet.getByteArray());
                    this.mapPendingChangesLog.put(packet.getUUID(), bigChangesLog);
                    break;
                }
                case 8: {
                    if (this.mapPendingChangesLog.get(packet.getUUID()) == null) break;
                    PendingChangesLog container = this.mapPendingChangesLog.get(packet.getUUID());
                    container.putData((int)packet.getOffset(), packet.getByteArray());
                    break;
                }
                case 9: {
                    if (this.mapPendingChangesLog.get(packet.getUUID()) == null) break;
                    PendingChangesLog container = this.mapPendingChangesLog.get(packet.getUUID());
                    container.putData((int)packet.getOffset(), packet.getByteArray());
                    ItemDataChangesLog tempChangesLog = PendingChangesLog.getAsItemDataChangesLog(container.getData());
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Recive-->Big ItemDataChangesLog_without_Streams-->");
                        log.debug((Object)"---------------------");
                        log.debug((Object)("Size of recive damp --> " + container.getData().length));
                        log.debug((Object)("ItemStates          --> " + tempChangesLog.getAllStates().size()));
                        log.debug((Object)"---------------------");
                        log.debug((Object)"Item big DataChangesLog of type 'ItemDataChangesLog only'");
                    }
                    this.receive(tempChangesLog);
                    this.mapPendingChangesLog.remove(packet.getUUID());
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return new String("Success !");
    }

    private void sendPacket(Packet packet) throws Exception {
        byte[] buffer = Packet.getAsByteArray(packet);
        Message msg = new Message(null, null, buffer);
        this.disp.castMessage(this.members, msg, 6, 0L);
    }

    private void sendStream(InputStream in, FixupStream fixupStream, String uuid) throws Exception {
        Packet packet = new Packet(3, fixupStream, uuid);
        this.sendPacket(packet);
        byte[] buf = new byte[16384];
        long offset = 0L;
        try {
            int len;
            while ((len = in.read(buf)) > 0) {
                if (len == buf.length) {
                    packet = new Packet(4, fixupStream, uuid, buf);
                    packet.setOffset(offset);
                    this.sendPacket(packet);
                } else {
                    byte[] buffer = new byte[len];
                    for (int i = 0; i < len; ++i) {
                        buffer[i] = buf[i];
                    }
                    packet = new Packet(5, fixupStream, uuid, buffer);
                    packet.setOffset(offset);
                    this.sendPacket(packet);
                }
                offset += (long)len;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Send  --> " + offset));
                }
                Thread.sleep(1L);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendBigItemDataChangesLog(byte[] data, String uuid) throws Exception {
        long offset = 0L;
        byte[] tempBuffer = new byte[16384];
        this.cutData(data, offset, tempBuffer);
        Packet firsPacket = new Packet(7, data.length, tempBuffer, uuid);
        firsPacket.setOffset(offset);
        this.sendPacket(firsPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + firsPacket.getByteArray().length));
        }
        offset += (long)tempBuffer.length;
        while ((long)data.length - offset > 16384L) {
            this.cutData(data, offset, tempBuffer);
            Packet middlePacket = new Packet(8, data.length, tempBuffer, uuid);
            middlePacket.setOffset(offset);
            this.sendPacket(middlePacket);
            if (log.isDebugEnabled()) {
                log.info((Object)("Send of damp --> " + middlePacket.getByteArray().length));
            }
            offset += (long)tempBuffer.length;
        }
        byte[] lastBuffer = new byte[data.length - (int)offset];
        this.cutData(data, offset, lastBuffer);
        Packet lastPacket = new Packet(9, data.length, lastBuffer, uuid);
        lastPacket.setOffset(offset);
        this.sendPacket(lastPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + lastPacket.getByteArray().length));
        }
    }

    private void cutData(byte[] sourceData, long startPos, byte[] destination) {
        for (int i = 0; i < destination.length; ++i) {
            destination[i] = sourceData[i + (int)startPos];
        }
    }

    public void viewAccepted(View new_view) {
        this.members = new Vector();
        for (int i = 0; i < new_view.getMembers().size(); ++i) {
            Address address = (Address)new_view.getMembers().get(i);
            if (address.compareTo((Object)this.channel.getLocalAddress()) == 0) continue;
            this.members.add(address);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)this.members.size());
        }
    }

    public void suspect(Address suspected_mbr) {
    }

    public void block() {
    }

    public void unblock() {
    }

    public byte[] getState() {
        return null;
    }

    public void setState(byte[] state) {
    }

    private String getLocalIP(InetAddress adr) {
        String str = adr.toString();
        return str.replaceAll("/", "");
    }
}

