/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication;

import java.io.InputStream;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.dataflow.ChangesLogIterator;
import org.exoplatform.services.jcr.dataflow.ItemStateChangesLog;
import org.exoplatform.services.jcr.dataflow.PlainChangesLog;
import org.exoplatform.services.jcr.dataflow.TransactionChangesLog;
import org.exoplatform.services.jcr.dataflow.persistent.ItemsPersistenceListener;
import org.exoplatform.services.jcr.ext.replication.FixupStream;
import org.exoplatform.services.jcr.ext.replication.Packet;
import org.exoplatform.services.jcr.ext.replication.PendingChangesLog;
import org.exoplatform.services.jcr.impl.dataflow.persistent.CacheableWorkspaceDataManager;
import org.exoplatform.services.jcr.impl.util.io.FileCleaner;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Address;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;

public class WorkspaceDataTransmitter
implements ItemsPersistenceListener,
MembershipListener {
    protected static Log log = ExoLogger.getLogger((String)"ext.WorksapeDataTransmitter");
    private String systemId;
    private MessageDispatcher disp;
    private FileCleaner fileCleaner;
    private Vector<Address> members;

    public WorkspaceDataTransmitter(CacheableWorkspaceDataManager dataManager) throws RepositoryConfigurationException {
        dataManager.addItemPersistenceListener((ItemsPersistenceListener)this);
        this.fileCleaner = new FileCleaner(30030L);
    }

    public void init(MessageDispatcher messageDispatcher, String systemId) {
        this.systemId = systemId;
        this.disp = messageDispatcher;
        log.info((Object)("REPLICATION: WorkspaceDataTransmitter initialized, JGroup Channel name: '" + this.disp.getChannel().getClusterName() + "'"));
    }

    public void onSaveItems(ItemStateChangesLog changesLog_) {
        TransactionChangesLog changesLog = (TransactionChangesLog)changesLog_;
        if (changesLog.getSystemId() == null && !this.isSessionNull(changesLog)) {
            changesLog.setSystemId(this.systemId);
            try {
                if (log.isDebugEnabled()) {
                    ChangesLogIterator logIterator = changesLog.getLogIterator();
                    while (logIterator.hasNextLog()) {
                        PlainChangesLog pcl = logIterator.nextLog();
                        log.info((Object)pcl.dump());
                    }
                }
                this.send((ItemStateChangesLog)changesLog);
                if (log.isDebugEnabled()) {
                    log.debug((Object)("After save message -->" + this.systemId));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void send(ItemStateChangesLog itemDataChangesLog_) throws Exception {
        TransactionChangesLog itemDataChangesLog = (TransactionChangesLog)itemDataChangesLog_;
        PendingChangesLog container = new PendingChangesLog(itemDataChangesLog, this.fileCleaner);
        switch (container.getConteinerType()) {
            case 1: {
                byte[] buf = PendingChangesLog.getAsByteArray(container.getItemDataChangesLog());
                if (buf.length > 16384) {
                    this.sendBigItemDataChangesLog(buf, container.getIdentifier());
                    break;
                }
                Packet firstPacket = new Packet(1, buf.length, buf, container.getIdentifier());
                this.sendPacket(firstPacket);
                if (!log.isDebugEnabled()) break;
                log.debug((Object)"Send-->ItemDataChangesLog_without_Streams-->");
                log.debug((Object)"---------------------");
                log.debug((Object)("Size of buffer --> " + buf.length));
                log.debug((Object)("ItemStates size  --> " + itemDataChangesLog.getAllStates().size()));
                log.debug((Object)"---------------------");
                break;
            }
            case 2: {
                byte[] buf = PendingChangesLog.getAsByteArray(container.getItemDataChangesLog());
                if (buf.length < 16384) {
                    Packet packet = new Packet(2, buf.length, buf, container.getIdentifier());
                    this.sendPacket(packet);
                } else {
                    this.sendBigItemDataChangesLogWhithStream(buf, container.getIdentifier());
                }
                for (int i = 0; i < container.getInputStreams().size(); ++i) {
                    this.sendStream(container.getInputStreams().get(i), container.getFixupStreams().get(i), container.getIdentifier());
                }
                Packet lastPacket = new Packet(6, container.getIdentifier());
                this.sendPacket(lastPacket);
                if (!log.isDebugEnabled()) break;
                log.debug((Object)"Send-->ItemDataChangesLog_with_Streams-->");
                log.debug((Object)"---------------------");
                log.debug((Object)("Size of damp --> " + buf.length));
                log.debug((Object)("ItemStates   --> " + itemDataChangesLog.getAllStates().size()));
                log.debug((Object)("Streams      --> " + container.getInputStreams().size()));
                log.debug((Object)"---------------------");
                break;
            }
        }
    }

    private void sendPacket(Packet packet) throws Exception {
        byte[] buffer = Packet.getAsByteArray(packet);
        Message msg = new Message(null, null, buffer);
        this.disp.castMessage(null, msg, 6, 0L);
    }

    private void sendStream(InputStream in, FixupStream fixupStream, String identifier) throws Exception {
        Packet packet = new Packet(3, fixupStream, identifier);
        this.sendPacket(packet);
        byte[] buf = new byte[16384];
        long offset = 0L;
        try {
            int len;
            while ((len = in.read(buf)) > 0 && len == 16384) {
                packet = new Packet(4, fixupStream, identifier, buf);
                packet.setOffset(offset);
                this.sendPacket(packet);
                offset += (long)len;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Send  --> " + offset));
                }
                Thread.sleep(1L);
            }
            if (len < 16384) {
                len = len == -1 ? 0 : len;
                byte[] buffer = new byte[len];
                for (int i = 0; i < len; ++i) {
                    buffer[i] = buf[i];
                }
                packet = new Packet(5, fixupStream, identifier, buffer);
                packet.setOffset(offset);
                this.sendPacket(packet);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendBigItemDataChangesLog(byte[] data, String identifier) throws Exception {
        long offset = 0L;
        byte[] tempBuffer = new byte[16384];
        this.cutData(data, offset, tempBuffer);
        Packet firsPacket = new Packet(7, data.length, tempBuffer, identifier);
        firsPacket.setOffset(offset);
        this.sendPacket(firsPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + firsPacket.getByteArray().length));
        }
        offset += (long)tempBuffer.length;
        while ((long)data.length - offset > 16384L) {
            this.cutData(data, offset, tempBuffer);
            Packet middlePacket = new Packet(8, data.length, tempBuffer, identifier);
            middlePacket.setOffset(offset);
            this.sendPacket(middlePacket);
            if (log.isDebugEnabled()) {
                log.info((Object)("Send of damp --> " + middlePacket.getByteArray().length));
            }
            offset += (long)tempBuffer.length;
        }
        byte[] lastBuffer = new byte[data.length - (int)offset];
        this.cutData(data, offset, lastBuffer);
        Packet lastPacket = new Packet(9, data.length, lastBuffer, identifier);
        lastPacket.setOffset(offset);
        this.sendPacket(lastPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + lastPacket.getByteArray().length));
        }
    }

    private void sendBigItemDataChangesLogWhithStream(byte[] data, String identifier) throws Exception {
        long offset = 0L;
        byte[] tempBuffer = new byte[16384];
        this.cutData(data, offset, tempBuffer);
        Packet firsPacket = new Packet(10, data.length, tempBuffer, identifier);
        firsPacket.setOffset(offset);
        this.sendPacket(firsPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + firsPacket.getByteArray().length));
        }
        offset += (long)tempBuffer.length;
        while ((long)data.length - offset > 16384L) {
            this.cutData(data, offset, tempBuffer);
            Packet middlePacket = new Packet(11, data.length, tempBuffer, identifier);
            middlePacket.setOffset(offset);
            this.sendPacket(middlePacket);
            if (log.isDebugEnabled()) {
                log.info((Object)("Send of damp --> " + middlePacket.getByteArray().length));
            }
            offset += (long)tempBuffer.length;
        }
        byte[] lastBuffer = new byte[data.length - (int)offset];
        this.cutData(data, offset, lastBuffer);
        Packet lastPacket = new Packet(12, data.length, lastBuffer, identifier);
        lastPacket.setOffset(offset);
        this.sendPacket(lastPacket);
        if (log.isDebugEnabled()) {
            log.info((Object)("Send of damp --> " + lastPacket.getByteArray().length));
        }
    }

    private void cutData(byte[] sourceData, long startPos, byte[] destination) {
        for (int i = 0; i < destination.length; ++i) {
            destination[i] = sourceData[i + (int)startPos];
        }
    }

    public void suspect(Address suspected_mbr) {
    }

    public void block() {
    }

    public void unblock() {
    }

    public byte[] getState() {
        return null;
    }

    public void setState(byte[] state) {
    }

    private boolean isSessionNull(TransactionChangesLog changesLog) {
        boolean isSessionNull = false;
        ChangesLogIterator logIterator = changesLog.getLogIterator();
        while (logIterator.hasNextLog()) {
            if (logIterator.nextLog().getSessionId() != null) continue;
            isSessionNull = true;
            break;
        }
        return isSessionNull;
    }

    public void viewAccepted(View views) {
        Address localIpAddres = this.disp.getChannel().getLocalAddress();
        this.members = new Vector();
        for (int i = 0; i < views.getMembers().size(); ++i) {
            Address address = (Address)views.getMembers().get(i);
            if (address.compareTo((Object)localIpAddres) == 0) continue;
            this.members.add(address);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)this.members.size());
        }
    }
}

