/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication.async;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.ext.replication.async.AsyncTransmitter;
import org.exoplatform.services.jcr.ext.replication.async.storage.ChangesFile;
import org.exoplatform.services.jcr.ext.replication.async.transport.AbstractPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.AsyncChannelManager;
import org.exoplatform.services.jcr.ext.replication.async.transport.CancelPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.ChangesPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.ErrorPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.ExportChangesPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.GetExportPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.MemberAddress;
import org.exoplatform.services.jcr.ext.replication.async.transport.MergePacket;
import org.exoplatform.services.log.ExoLogger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AsyncTransmitterImpl
implements AsyncTransmitter {
    private static final Log LOG = ExoLogger.getLogger("ext.AsyncTransmitterImpl");
    protected final AsyncChannelManager channel;
    protected final int priority;

    AsyncTransmitterImpl(AsyncChannelManager channel, int priority) {
        this.channel = channel;
        this.priority = priority;
    }

    @Override
    public void sendChanges(ChangesFile[] changes, List<MemberAddress> subscribers) throws IOException {
        for (ChangesFile cf : changes) {
            if (cf == null) continue;
            this.sendChangesLogFile(subscribers, cf, this.priority, changes.length);
        }
    }

    @Override
    public void sendChanges(ChangesFile cf, List<MemberAddress> subscribers, int totalFiles) throws IOException {
        this.sendChangesLogFile(subscribers, cf, this.priority, totalFiles);
    }

    @Override
    public void sendGetExport(String nodeId, MemberAddress address) throws IOException {
        GetExportPacket packet = new GetExportPacket(nodeId, this.priority);
        try {
            this.channel.sendPacket((AbstractPacket)packet, address);
        }
        catch (IOException e) {
            LOG.error("Cannot send export data", e);
            throw e;
        }
    }

    @Override
    public void sendExport(ChangesFile changes, MemberAddress destAddress) throws IOException {
        try {
            this.sendExportChangesLogFile(destAddress, changes, 1);
        }
        catch (IOException e) {
            LOG.error("Cannot send export data", e);
            this.sendError("Cannot send export data. Internal error ossurs.", destAddress);
            throw e;
        }
    }

    @Override
    public void sendError(String error, MemberAddress destAddress) throws IOException {
        try {
            ErrorPacket packet = new ErrorPacket(19, error, this.priority);
            this.channel.sendPacket((AbstractPacket)packet, destAddress);
        }
        catch (IOException e) {
            LOG.error("Cannot send export data", e);
            throw e;
        }
    }

    @Override
    public void sendCancel() throws IOException {
        CancelPacket cancelPacket = new CancelPacket(17, this.priority);
        this.channel.sendPacket(cancelPacket);
    }

    @Override
    public void sendMerge() throws IOException {
        MergePacket mergePacket = new MergePacket(18, this.priority);
        this.channel.sendPacket(mergePacket);
    }

    protected void sendChangesLogFile(MemberAddress destinationAddress, ChangesFile clFile, int transmitterPriority, int totalFiles) throws IOException {
        ArrayList<MemberAddress> destinationAddresses = new ArrayList<MemberAddress>();
        destinationAddresses.add(destinationAddress);
        this.sendChangesLogFile(destinationAddresses, clFile, transmitterPriority, totalFiles);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendChangesLogFile(List<MemberAddress> destinationAddresses, ChangesFile clFile, int transmitterPriority, int totalFiles) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin send : " + clFile.getId());
        }
        InputStream in = clFile.getInputStream();
        try {
            byte[] b;
            byte[] buff = new byte[16384];
            long offset = 0L;
            int len = in.read(buff);
            if (len < 16384) {
                b = new byte[len];
                System.arraycopy(buff, 0, b, 0, len);
                buff = b;
            }
            ChangesPacket packet = new ChangesPacket(9, transmitterPriority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, buff);
            for (MemberAddress dm : destinationAddresses) {
                this.channel.sendPacket((AbstractPacket)packet, dm);
            }
            offset += (long)len;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Send PacType [BINARY_CHANGESLOG_FIRST_PACKET] --> " + offset);
            }
            while ((len = in.read(buff)) > 0) {
                if (len < 16384) {
                    b = new byte[len];
                    System.arraycopy(buff, 0, b, 0, len);
                    buff = b;
                }
                packet = new ChangesPacket(10, transmitterPriority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, buff);
                for (MemberAddress dm : destinationAddresses) {
                    this.channel.sendPacket((AbstractPacket)packet, dm);
                }
                offset += (long)len;
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug("Send PacType [BINARY_CHANGESLOG_MIDDLE_PACKET] --> " + offset);
            }
            packet = new ChangesPacket(11, transmitterPriority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, new byte[0]);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Send PacType [BINARY_CHANGESLOG_LAST_PACKET] --> " + offset);
            }
            for (MemberAddress dm : destinationAddresses) {
                this.channel.sendPacket((AbstractPacket)packet, dm);
            }
        }
        finally {
            try {
                in.close();
            }
            catch (IOException e) {
                LOG.error("Error fo local storage stream close. " + e, e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("End send : " + clFile.getChecksum());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendExportChangesLogFile(MemberAddress destinationAddress, ChangesFile clFile, int totalFiles) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin send : " + clFile.getChecksum());
        }
        ArrayList<MemberAddress> destinationAddresses = new ArrayList<MemberAddress>();
        destinationAddresses.add(destinationAddress);
        InputStream in = clFile.getInputStream();
        try {
            byte[] b;
            byte[] buf = new byte[16384];
            long offset = 0L;
            int len = in.read(buf);
            if (len < 16384) {
                b = new byte[len];
                System.arraycopy(buf, 0, b, 0, len);
                buf = b;
            }
            ExportChangesPacket packet = new ExportChangesPacket(1, this.priority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, buf);
            for (MemberAddress dm : destinationAddresses) {
                this.channel.sendPacket((AbstractPacket)packet, dm);
            }
            offset += (long)len;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Send PacType [EXPORT_CHANGES_FIRST_PACKET] --> " + offset);
            }
            while ((len = in.read(buf)) > 0) {
                if (len < 16384) {
                    b = new byte[len];
                    System.arraycopy(buf, 0, b, 0, len);
                    buf = b;
                }
                packet = new ExportChangesPacket(2, this.priority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, buf);
                for (MemberAddress dm : destinationAddresses) {
                    this.channel.sendPacket((AbstractPacket)packet, dm);
                }
                offset += (long)len;
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug("Send PacType [EXPORT_CHANGES_MIDDLE_PACKET] --> " + offset);
            }
            packet = new ExportChangesPacket(3, this.priority, clFile.getChecksum(), clFile.getId(), totalFiles, offset, new byte[0]);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Send PacType [EXPORT_CHANGES_LAST_PACKET] --> " + offset);
            }
            for (MemberAddress dm : destinationAddresses) {
                this.channel.sendPacket((AbstractPacket)packet, dm);
            }
        }
        finally {
            try {
                in.close();
            }
            catch (IOException e) {
                LOG.error("Error of local storage stream close. " + e, e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("End send : " + clFile.getChecksum());
        }
    }
}

