/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication;

import java.io.File;
import java.io.InputStream;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.dataflow.ChangesLogIterator;
import org.exoplatform.services.jcr.dataflow.ItemStateChangesLog;
import org.exoplatform.services.jcr.dataflow.PlainChangesLog;
import org.exoplatform.services.jcr.dataflow.TransactionChangesLog;
import org.exoplatform.services.jcr.dataflow.persistent.ItemsPersistenceListener;
import org.exoplatform.services.jcr.ext.replication.ChannelManager;
import org.exoplatform.services.jcr.ext.replication.FixupStream;
import org.exoplatform.services.jcr.ext.replication.Packet;
import org.exoplatform.services.jcr.ext.replication.PendingChangesLog;
import org.exoplatform.services.jcr.ext.replication.recovery.RecoveryManager;
import org.exoplatform.services.jcr.impl.dataflow.persistent.CacheableWorkspaceDataManager;
import org.exoplatform.services.jcr.impl.util.io.FileCleaner;
import org.exoplatform.services.log.ExoLogger;
import org.jgroups.Address;
import org.jgroups.MembershipListener;
import org.jgroups.View;

public class WorkspaceDataTransmitter
implements ItemsPersistenceListener,
MembershipListener {
    // Shared logger for this class. The category string is a runtime identifier and is kept verbatim.
    private static Log log = ExoLogger.getLogger((String)"ext.WorksapeDataTransmitter");
    // Identifier stamped onto outgoing change logs in onSaveItems(); assigned in init().
    private String systemId;
    // Transport used to send packets and binary files; assigned in init().
    private ChannelManager channelManager;
    // Receives temporary files and is passed to every PendingChangesLog; created in the constructor.
    private FileCleaner fileCleaner;
    // View members other than the local address, as computed by the last viewAccepted() call.
    private Vector<Address> members;
    // Change logs are saved here before being sent; assigned in init().
    private RecoveryManager recoveryManager;
    // Name passed to ChannelManager.sendBinaryFile() together with the change-log file; assigned in init().
    private String ownName;

    /**
     * Creates the transmitter and registers it as a persistence listener on the given data manager.
     *
     * <p>The remaining collaborators (channel manager, system id, own name, recovery manager) are
     * supplied later through {@code init(...)}.
     *
     * @param dataManager workspace data manager whose save events should be transmitted
     * @throws RepositoryConfigurationException declared by the original signature; kept for callers
     */
    public WorkspaceDataTransmitter(CacheableWorkspaceDataManager dataManager) throws RepositoryConfigurationException {
        // Initialise own state BEFORE handing `this` to the data manager, so a listener callback
        // can never observe a transmitter whose fileCleaner is still null.
        // NOTE(review): 30030L is presumably the cleaner period in milliseconds - confirm against FileCleaner.
        this.fileCleaner = new FileCleaner(30030L);
        dataManager.addItemPersistenceListener((ItemsPersistenceListener)this);
    }

    /**
     * Supplies the collaborators that are not available at construction time and logs the
     * node's own name and system id at INFO level.
     *
     * @param channelManager  transport used for all outgoing packets and files
     * @param systemId        identifier stamped onto change logs sent by this instance
     * @param ownName         name passed along with binary-file transfers
     * @param recoveryManager manager that stores each change log before it is sent
     */
    public void init(ChannelManager channelManager, String systemId, String ownName, RecoveryManager recoveryManager) {
        this.channelManager = channelManager;
        this.recoveryManager = recoveryManager;
        this.ownName = ownName;
        this.systemId = systemId;

        final String ownNameLine = "Own name  : " + this.ownName;
        final String systemIdLine = "System ID : " + this.systemId;
        log.info(ownNameLine);
        log.info(systemIdLine);
    }

    /**
     * Persistence callback: stamps a change log with this transmitter's system id and sends it.
     *
     * <p>A log is skipped when it already carries a system id or when any of its plain change
     * logs has a {@code null} session id (see {@code isSessionNull}). Failures while sending are
     * logged and not propagated to the caller.
     *
     * @param isChangesLog the saved changes; cast to {@link TransactionChangesLog} without a check
     */
    public void onSaveItems(ItemStateChangesLog isChangesLog) {
        TransactionChangesLog changesLog = (TransactionChangesLog)isChangesLog;
        if (changesLog.getSystemId() != null || this.isSessionNull(changesLog)) {
            // Already stamped, or contains a session-less change log: nothing to transmit.
            return;
        }

        changesLog.setSystemId(this.systemId);
        try {
            if (log.isDebugEnabled()) {
                // Diagnostic dumps are emitted at the level their guard checks for.
                ChangesLogIterator logIterator = changesLog.getLogIterator();
                while (logIterator.hasNextLog()) {
                    PlainChangesLog pcl = logIterator.nextLog();
                    log.debug((Object)pcl.dump());
                }
            }

            // The identifier returned by the send is not needed here.
            this.sendAsBinaryFile((ItemStateChangesLog)changesLog);

            if (log.isDebugEnabled()) {
                log.debug((Object)("After send message: the owner systemId --> " + changesLog.getSystemId()));
                log.debug((Object)("After send message: --> " + this.systemId));
            }
        }
        catch (Exception e) {
            log.error((Object)"Can not sent ChangesLog ...", (Throwable)e);
        }
    }

    /**
     * Packet-based transmission of a change log. This private method is not referenced
     * anywhere else in this class.
     *
     * <p>The log is wrapped in a {@link PendingChangesLog} and saved through the recovery manager.
     * It is then sent according to the container type:
     * <ul>
     *   <li>type 1: serialized bytes go out as a single type-1 packet, or through
     *       {@code sendBigItemDataChangesLog} when longer than 16384 bytes;</li>
     *   <li>type 2: serialized bytes go out as a single type-2 packet when shorter than 16384
     *       bytes, otherwise through {@code sendBigItemDataChangesLogWhithStream}; every input
     *       stream is then sent with its fixup descriptor, followed by a closing type-6 packet.</li>
     * </ul>
     * Any other container type sends nothing.
     *
     * @param isChangesLog change log to send; cast to {@link TransactionChangesLog}
     * @return identifier of the pending change log
     * @throws Exception if saving, serializing or sending fails
     */
    private String send(ItemStateChangesLog isChangesLog) throws Exception {
        final TransactionChangesLog txLog = (TransactionChangesLog)isChangesLog;
        final PendingChangesLog pending = new PendingChangesLog(txLog, this.fileCleaner);
        this.recoveryManager.save(isChangesLog, pending.getIdentifier());

        final int containerType = pending.getConteinerType();
        if (containerType == 1) {
            byte[] payload = PendingChangesLog.getAsByteArray(pending.getItemDataChangesLog());
            if (payload.length > 16384) {
                this.sendBigItemDataChangesLog(payload, pending.getIdentifier());
            } else {
                Packet single = new Packet(1, payload.length, payload, pending.getIdentifier());
                this.channelManager.sendPacket(single);
                if (log.isDebugEnabled()) {
                    log.debug("Send-->ItemDataChangesLog_without_Streams-->");
                    log.debug("---------------------");
                    log.debug("Size of buffer --> " + payload.length);
                    log.debug("ItemStates size  --> " + txLog.getAllStates().size());
                    log.debug("---------------------");
                }
            }
        } else if (containerType == 2) {
            byte[] payload = PendingChangesLog.getAsByteArray(pending.getItemDataChangesLog());
            if (payload.length >= 16384) {
                this.sendBigItemDataChangesLogWhithStream(payload, pending.getIdentifier());
            } else {
                Packet single = new Packet(2, payload.length, payload, pending.getIdentifier());
                this.channelManager.sendPacket(single);
            }

            int index = 0;
            while (index < pending.getInputStreams().size()) {
                this.sendStream(pending.getInputStreams().get(index), pending.getFixupStreams().get(index), pending.getIdentifier());
                ++index;
            }

            Packet closing = new Packet(6, pending.getIdentifier());
            this.channelManager.sendPacket(closing);
            if (log.isDebugEnabled()) {
                log.debug("Send-->ItemDataChangesLog_with_Streams-->");
                log.debug("---------------------");
                log.debug("Size of damp --> " + payload.length);
                log.debug("ItemStates   --> " + txLog.getAllStates().size());
                log.debug("Streams      --> " + pending.getInputStreams().size());
                log.debug("---------------------");
            }
        }
        return pending.getIdentifier();
    }

    /**
     * Saves the change log for recovery and transmits it.
     *
     * <p>The log is always written to a temporary {@code cl_*.tmp} file through the recovery
     * writer. For container type 1 the serialized bytes are sent as packets (one type-1 packet,
     * or {@code sendBigItemDataChangesLog} when longer than 16384 bytes). For container type 2
     * the temporary file itself is sent with {@code ChannelManager.sendBinaryFile}.
     *
     * <p>The temporary file is handed to the {@link FileCleaner} on every path, including
     * failures, so it no longer leaks when the packet path is taken or an exception is thrown.
     *
     * @param isChangesLog change log to send; cast to {@link TransactionChangesLog}
     * @return identifier of the pending change log
     * @throws Exception if saving, serializing or sending fails
     */
    private String sendAsBinaryFile(ItemStateChangesLog isChangesLog) throws Exception {
        TransactionChangesLog changesLog = (TransactionChangesLog)isChangesLog;
        PendingChangesLog container = new PendingChangesLog(changesLog, this.fileCleaner);
        this.recoveryManager.save(isChangesLog, container.getIdentifier());
        File f = File.createTempFile("cl_", ".tmp");
        try {
            // NOTE(review): the file is only consumed in case 2; writing it unconditionally is kept
            // in case RecoveryWriter.save has effects beyond producing the file - confirm.
            this.recoveryManager.getRecoveryWriter().save(f, changesLog);
            switch (container.getConteinerType()) {
                case 1: {
                    byte[] buf1 = PendingChangesLog.getAsByteArray(container.getItemDataChangesLog());
                    if (buf1.length > 16384) {
                        this.sendBigItemDataChangesLog(buf1, container.getIdentifier());
                        break;
                    }
                    Packet firstPacket = new Packet(1, buf1.length, buf1, container.getIdentifier());
                    this.channelManager.sendPacket(firstPacket);
                    if (!log.isDebugEnabled()) break;
                    log.debug((Object)"Send-->ItemDataChangesLog_without_Streams-->");
                    log.debug((Object)"---------------------");
                    log.debug((Object)("Size of buffer --> " + buf1.length));
                    log.debug((Object)("ItemStates size  --> " + changesLog.getAllStates().size()));
                    log.debug((Object)"---------------------");
                    break;
                }
                case 2: {
                    this.channelManager.sendBinaryFile(f.getCanonicalPath(), this.ownName, container.getIdentifier(), this.systemId, 32, 33, 34);
                    break;
                }
            }
        }
        finally {
            // Previously done only after a successful case-2 send; presumably the cleaner
            // removes the file later.
            this.fileCleaner.addFile(f);
        }
        return container.getIdentifier();
    }

    /**
     * Sends the content of a stream as a sequence of packets.
     *
     * <p>A type-3 packet carrying the fixup descriptor is sent first. The stream is then read in
     * 16384-byte chunks: every full chunk is sent as a type-4 packet, and the final partial
     * chunk (possibly empty) as a type-5 packet. Each data packet carries the byte offset of its
     * chunk within the stream.
     *
     * <p>{@link InputStream#read(byte[], int, int)} may return fewer bytes than requested before
     * end of stream, so each chunk is filled in a loop. A short read is therefore no longer
     * mistaken for the end of the stream.
     *
     * <p>Failures after the first packet are logged and not propagated, as before. The stream is
     * not closed here.
     *
     * @param in          stream to send
     * @param fixupStream descriptor attached to every packet of this stream
     * @param identifier  identifier of the change log the stream belongs to
     * @throws Exception if the initial type-3 packet cannot be sent
     */
    private void sendStream(InputStream in, FixupStream fixupStream, String identifier) throws Exception {
        Packet packet = new Packet(3, fixupStream, identifier);
        this.channelManager.sendPacket(packet);
        byte[] buf = new byte[16384];
        long offset = 0L;
        try {
            while (true) {
                // Fill the buffer completely unless the stream ends first.
                int filled = 0;
                while (filled < buf.length) {
                    int read = in.read(buf, filled, buf.length - filled);
                    if (read < 0) {
                        break;
                    }
                    filled += read;
                }

                if (filled < buf.length) {
                    // End of stream: send what is left (possibly nothing) as the closing packet.
                    byte[] buffer = new byte[filled];
                    System.arraycopy(buf, 0, buffer, 0, filled);
                    packet = new Packet(5, fixupStream, identifier, buffer);
                    packet.setOffset(offset);
                    this.channelManager.sendPacket(packet);
                    break;
                }

                packet = new Packet(4, fixupStream, identifier, buf);
                packet.setOffset(offset);
                this.channelManager.sendPacket(packet);
                offset += (long)filled;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Send  --> " + offset));
                }
                // Brief pause between chunks, kept from the original implementation.
                Thread.sleep(1L);
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            log.error((Object)("Interrupted while sending stream, identifier --> " + identifier), (Throwable)e);
        }
        catch (Exception e) {
            // Best-effort as before, but reported through the logger instead of stderr.
            log.error((Object)("Can not send stream, identifier --> " + identifier), (Throwable)e);
        }
    }

    /**
     * Splits a serialized change log into 16384-byte chunks and sends them in order.
     *
     * <p>The first chunk goes out as a type-7 packet, each following full chunk as a type-8
     * packet while more than 16384 bytes remain, and the remainder as a type-9 packet. Every
     * packet carries the total data length and the offset of its chunk.
     *
     * @param data       serialized change log; callers in this class pass more than 16384 bytes
     * @param identifier identifier of the change log
     * @throws Exception if a packet cannot be sent
     */
    private void sendBigItemDataChangesLog(byte[] data, String identifier) throws Exception {
        final byte[] chunk = new byte[16384];
        long position = 0L;
        int packetType = 7;

        // The first pass sends the opening packet; later passes send the middle packets.
        do {
            this.cutData(data, position, chunk);
            Packet part = new Packet(packetType, data.length, chunk, identifier);
            part.setOffset(position);
            this.channelManager.sendPacket(part);
            if (log.isDebugEnabled()) {
                log.info("Send of damp --> " + part.getByteArray().length);
            }
            position += (long)chunk.length;
            packetType = 8;
        } while ((long)data.length - position > 16384L);

        // Whatever is left (at most one chunk) closes the sequence.
        final byte[] tail = new byte[data.length - (int)position];
        this.cutData(data, position, tail);
        Packet closing = new Packet(9, data.length, tail, identifier);
        closing.setOffset(position);
        this.channelManager.sendPacket(closing);
        if (log.isDebugEnabled()) {
            log.info("Send of damp --> " + closing.getByteArray().length);
        }
    }

    /**
     * Chunked transmission of a serialized change log, using packet types 10 (first chunk),
     * 11 (middle chunks) and 12 (last chunk).
     *
     * <p>Chunks are 16384 bytes long. Middle packets are sent while more than 16384 bytes
     * remain, and the remainder is sent as the last packet. Every packet carries the total data
     * length and the offset of its chunk.
     *
     * @param data       serialized change log; the caller in this class passes at least 16384 bytes
     * @param identifier identifier of the change log
     * @throws Exception if a packet cannot be sent
     */
    private void sendBigItemDataChangesLogWhithStream(byte[] data, String identifier) throws Exception {
        byte[] window = new byte[16384];
        long sent = 0L;

        this.cutData(data, sent, window);
        Packet head = new Packet(10, data.length, window, identifier);
        head.setOffset(sent);
        this.channelManager.sendPacket(head);
        if (log.isDebugEnabled()) {
            log.info("Send of damp --> " + head.getByteArray().length);
        }

        for (sent += (long)window.length; (long)data.length - sent > 16384L; sent += (long)window.length) {
            this.cutData(data, sent, window);
            Packet middle = new Packet(11, data.length, window, identifier);
            middle.setOffset(sent);
            this.channelManager.sendPacket(middle);
            if (log.isDebugEnabled()) {
                log.info("Send of damp --> " + middle.getByteArray().length);
            }
        }

        int remaining = data.length - (int)sent;
        byte[] tail = new byte[remaining];
        this.cutData(data, sent, tail);
        Packet last = new Packet(12, data.length, tail, identifier);
        last.setOffset(sent);
        this.channelManager.sendPacket(last);
        if (log.isDebugEnabled()) {
            log.info("Send of damp --> " + last.getByteArray().length);
        }
    }

    /**
     * Fills {@code destination} with {@code destination.length} bytes of {@code sourceData},
     * starting at {@code startPos}.
     *
     * @param sourceData  array to copy from
     * @param startPos    index of the first byte to copy; narrowed to {@code int}
     * @param destination array to fill completely
     * @throws IndexOutOfBoundsException if the requested range does not fit in {@code sourceData}
     */
    private void cutData(byte[] sourceData, long startPos, byte[] destination) {
        if (destination.length == 0) {
            // Nothing to copy; an empty destination never touched the source before either.
            return;
        }
        // Bulk copy instead of a hand-written byte-by-byte loop.
        System.arraycopy(sourceData, (int)startPos, destination, 0, destination.length);
    }

    /**
     * {@link MembershipListener} callback; intentionally a no-op in this class.
     *
     * @param suspectedMbr address reported as suspected; ignored
     */
    public void suspect(Address suspectedMbr) {
    }

    /**
     * {@link MembershipListener} callback; intentionally a no-op in this class.
     */
    public void block() {
    }

    /**
     * Tells whether at least one plain change log inside the transaction has no session id.
     *
     * @param changesLog transaction change log to inspect
     * @return {@code true} as soon as a plain change log with a {@code null} session id is
     *         found, {@code false} if none is found (including when there are no logs)
     */
    private boolean isSessionNull(TransactionChangesLog changesLog) {
        for (ChangesLogIterator it = changesLog.getLogIterator(); it.hasNextLog(); ) {
            if (it.nextLog().getSessionId() == null) {
                return true;
            }
        }
        return false;
    }

    /**
     * {@link MembershipListener} callback: records every member of the new view except the
     * local address.
     *
     * <p>The list is built locally and assigned to the field in one step, so readers never see
     * a half-filled member list.
     *
     * @param views the newly accepted view
     */
    public void viewAccepted(View views) {
        Address localIpAddres = this.channelManager.getChannel().getLocalAddress();
        Vector<Address> remoteMembers = new Vector<Address>();
        for (int i = 0; i < views.getMembers().size(); ++i) {
            Address address = (Address)views.getMembers().get(i);
            // Skip our own address; only the other members are recorded.
            if (address.compareTo((Object)localIpAddres) == 0) continue;
            remoteMembers.add(address);
        }
        this.members = remoteMembers;
        if (log.isDebugEnabled()) {
            log.debug((Object)remoteMembers.size());
        }
    }

    /**
     * Returns the channel manager supplied to {@code init(...)}.
     *
     * @return the channel manager, or {@code null} if {@code init(...)} has not been called yet
     */
    public ChannelManager getChannelManager() {
        return this.channelManager;
    }
}

