/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication.async;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.jcr.InvalidItemStateException;
import javax.jcr.RepositoryException;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.dataflow.ItemState;
import org.exoplatform.services.jcr.ext.replication.async.AsyncInitializer;
import org.exoplatform.services.jcr.ext.replication.async.AsyncTransmitter;
import org.exoplatform.services.jcr.ext.replication.async.ChangesSaveErrorLog;
import org.exoplatform.services.jcr.ext.replication.async.ChangesSubscriber;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventListener;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventProducer;
import org.exoplatform.services.jcr.ext.replication.async.MergeDataManager;
import org.exoplatform.services.jcr.ext.replication.async.MergeDataManagerException;
import org.exoplatform.services.jcr.ext.replication.async.RemoteEventListener;
import org.exoplatform.services.jcr.ext.replication.async.RemoteExportException;
import org.exoplatform.services.jcr.ext.replication.async.SynchronizationLifeCycle;
import org.exoplatform.services.jcr.ext.replication.async.WorkspaceSynchronizer;
import org.exoplatform.services.jcr.ext.replication.async.storage.ChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.IncomeChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.IncomeStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.Member;
import org.exoplatform.services.jcr.ext.replication.async.storage.MemberChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.RandomChangesFile;
import org.exoplatform.services.jcr.ext.replication.async.storage.StorageRuntimeException;
import org.exoplatform.services.jcr.ext.replication.async.storage.SynchronizationException;
import org.exoplatform.services.jcr.ext.replication.async.transport.ChangesPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.MemberAddress;
import org.exoplatform.services.log.ExoLogger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ChangesSubscriberImpl
extends SynchronizationLifeCycle
implements ChangesSubscriber,
RemoteEventListener,
LocalEventListener,
LocalEventProducer {
    /** Logger shared by the subscriber and its helper threads. */
    protected static final Log LOG = ExoLogger.getLogger("ext.ChangesSubscriberImpl");

    /** Performs the merge of member changes; also cancelled / cleaned up when the cycle stops. */
    protected final MergeDataManager mergeManager;

    /** Source of the local changes and target of the merged result. */
    protected final WorkspaceSynchronizer workspace;

    /** Storage where incoming changes files are created and registered per member. */
    protected final IncomeStorage incomeStorrage;

    /** Receives the errors raised while saving the merge result. */
    protected final ChangesSaveErrorLog errorLog;

    /** Used to send the 'Cancel' and 'Merge' notifications to other members. */
    protected final AsyncTransmitter transmitter;

    /** Provides the local member address and the list of other members. */
    protected final AsyncInitializer initializer;

    /** Time, in milliseconds (it is passed to {@link Thread#sleep(long)}), the first-changes waiter sleeps. */
    protected final int memberWaitTimeout;

    /** Configured number of members; the local member is counted as one of them. */
    protected final int confMembersCount;

    /** Priority used to build the local {@link Member} for the merge. */
    protected final int localPriority;

    /** Received-files counters, keyed by the transmitter priority of the sending member. */
    protected final HashMap<Integer, Counter> counterMap;

    /** Latch counted down once per 'merge done' event (local and remote); the save waits on it. */
    protected final CountDownLatch mergeBarier;

    /** Thread that cancels the cycle if not all members delivered changes in time; null until a cycle starts. */
    private FirstChangesWaiter firstChangesWaiter;

    /** Changes files currently being written, keyed by the transmitter priority of the sending member. */
    protected HashMap<Integer, MemberChangesFile> incomChanges = new HashMap<Integer, MemberChangesFile>();

    /** Merge thread; null until all expected changes files have been received. */
    protected MergeWorker mergeWorker = null;

    /** Local listeners notified on start, cancel and stop; iteration follows registration order. */
    protected final Set<LocalEventListener> listeners = new LinkedHashSet<LocalEventListener>();

    /**
     * Creates the subscriber and wires its collaborators.
     *
     * @param initializer provides the local member address and the other members
     * @param transmitter sends 'Cancel' and 'Merge' notifications
     * @param workspace supplies local changes and saves the merge result
     * @param mergeManager performs the merge
     * @param incomeStorage storage for incoming changes files
     * @param errorLog receives save errors
     * @param memberWaitTimeout sleep time of the first-changes waiter, in milliseconds
     * @param localPriority priority of the local member
     * @param confMembersCount configured number of members; also the initial count of the merge latch
     */
    public ChangesSubscriberImpl(AsyncInitializer initializer, AsyncTransmitter transmitter, WorkspaceSynchronizer workspace, MergeDataManager mergeManager, IncomeStorage incomeStorage, ChangesSaveErrorLog errorLog, int memberWaitTimeout, int localPriority, int confMembersCount) {
        // collaborators
        this.initializer = initializer;
        this.transmitter = transmitter;
        this.workspace = workspace;
        this.mergeManager = mergeManager;
        this.incomeStorrage = incomeStorage;
        this.errorLog = errorLog;

        // configuration
        this.memberWaitTimeout = memberWaitTimeout;
        this.localPriority = localPriority;
        this.confMembersCount = confMembersCount;

        // per-cycle state
        this.counterMap = new LinkedHashMap<Integer, Counter>();
        this.mergeBarier = new CountDownLatch(confMembersCount);
    }

    /**
     * Registers a local listener; adding the same listener twice has no further effect
     * because the listeners are kept in a set.
     *
     * @param listener the listener to register
     */
    @Override
    public void addLocalListener(LocalEventListener listener) {
        listeners.add(listener);
    }

    /**
     * Unregisters a previously added local listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeLocalListener(LocalEventListener listener) {
        listeners.remove(listener);
    }

    /*
     * Packet type codes handled by onChanges. The names follow the log messages used for
     * each code ("First", "Mid", "Last" data packet).
     * NOTE(review): these presumably mirror constants of the transport layer - confirm and
     * reference those instead if they are available.
     */
    private static final int FIRST_CHANGES_PACKET = 9;
    private static final int MIDDLE_CHANGES_PACKET = 10;
    private static final int LAST_CHANGES_PACKET = 11;

    /**
     * Handles a data packet with changes sent by a remote member.
     * <p>
     * A changes file arrives as a first packet, optional middle packets and a last packet.
     * Packets of any other type are ignored. Any failure while processing a packet cancels
     * the synchronization and is logged.
     *
     * @param packet the received packet
     * @param member the member that sent the packet
     */
    @Override
    public void onChanges(ChangesPacket packet, Member member) {
        switch (packet.getType()) {
            case FIRST_CHANGES_PACKET:
                this.handleFirstChangesPacket(packet, member);
                break;
            case MIDDLE_CHANGES_PACKET:
                this.handleMiddleChangesPacket(packet, member);
                break;
            case LAST_CHANGES_PACKET:
                this.handleLastChangesPacket(packet, member);
                break;
            default:
                // not a changes data packet - nothing to do here
                break;
        }
    }

    /** Starts the cycle if it is only initialized, then opens a changes file and writes the first chunk. */
    private void handleFirstChangesPacket(ChangesPacket packet, Member member) {
        try {
            LOG.info("Receiving member " + member.getName() + " changes.");
            if (this.isInitialized()) {
                // the remote member initiated the synchronization: start locally as well
                LOG.info("START. Initiated by " + member.getName());
                this.firstChangesWaiter = new FirstChangesWaiter();
                this.firstChangesWaiter.start();
                this.doStart();
                for (LocalEventListener listener : this.listeners) {
                    listener.onStart(this.initializer.getOtherMembers());
                }
            }
            if (!this.isStarted()) {
                LOG.error("First data packet received but the Subscriber is not started. Packet from " + member.getName() + " skipped");
                return;
            }
            RandomChangesFile changesFile = this.incomeStorrage.createChangesFile(packet.getCRC(), packet.getTimeStamp(), member);
            changesFile.writeData(packet.getBuffer(), packet.getOffset());
            this.incomChanges.put(packet.getTransmitterPriority(), new MemberChangesFile(changesFile, member));
        }
        catch (Throwable e) {
            this.doCancel();
            LOG.error("Error of First data packet processing. Packet from " + member.getName(), e);
        }
    }

    /** Appends a middle chunk to the changes file opened by the first packet of the same transmitter. */
    private void handleMiddleChangesPacket(ChangesPacket packet, Member member) {
        if (!this.isStarted()) {
            LOG.error("Mid data packet received but the Subscriber is not started. Packet from " + member.getName() + " skipped");
            return;
        }
        try {
            MemberChangesFile mcf = this.requireChangesFile(packet, member);
            mcf.getChangesFile().writeData(packet.getBuffer(), packet.getOffset());
        }
        catch (Throwable e) {
            this.doCancel();
            LOG.error("Error of Mid data packet processing. Packet from " + member.getName(), e);
        }
    }

    /**
     * Finishes the changes file, registers it in the income storage, updates the per-member
     * file counter and starts the merge once every expected file of every member has arrived.
     */
    private void handleLastChangesPacket(ChangesPacket packet, Member member) {
        if (!this.isStarted()) {
            LOG.error("Last data packet received but the Subscriber is not started. Packet from " + member.getName() + " skipped");
            return;
        }
        LOG.info("Member " + member.getName() + " changes received.");
        try {
            MemberChangesFile mcf = this.requireChangesFile(packet, member);
            mcf.getChangesFile().finishWrite();
            this.incomeStorrage.addMemberChanges(mcf.getMember(), mcf.getChangesFile());

            // one counter per transmitter: created with the announced file count on the first finished file
            Counter counter = this.counterMap.get(packet.getTransmitterPriority());
            if (counter == null) {
                counter = new Counter((int)packet.getFileCount(), 1);
                this.counterMap.put(packet.getTransmitterPriority(), counter);
            } else {
                counter.countUp();
            }

            if (!counter.isTotalTransfer() || !this.isAllTransfered()) {
                return;
            }
            if (this.mergeWorker != null) {
                LOG.error("Error, merge process already activated.");
                return;
            }
            this.mergeWorker = new MergeWorker();
            this.mergeWorker.start();
        }
        catch (Throwable e) {
            this.doCancel();
            LOG.error("Error of Last data packet processing. Packet from " + member.getName(), e);
        }
    }

    /**
     * Returns the changes file currently being written for the transmitter of the packet.
     *
     * @throws IllegalStateException if no first packet opened a file for that transmitter
     */
    private MemberChangesFile requireChangesFile(ChangesPacket packet, Member member) {
        MemberChangesFile mcf = this.incomChanges.get(packet.getTransmitterPriority());
        if (mcf == null) {
            throw new IllegalStateException("No changes file is open for member " + member.getName()
                + " (transmitter priority " + packet.getTransmitterPriority()
                + "), the first data packet was not received");
        }
        return mcf;
    }

    /**
     * Tells whether every remote member has delivered all of its announced changes files.
     *
     * @return {@code true} if a counter exists for each member except the local one and
     *         every counter reports a complete transfer
     */
    private boolean isAllTransfered() {
        // the local member sends nothing to itself, hence the "+ 1"
        boolean allMembersReported = this.counterMap.size() + 1 == this.confMembersCount;
        if (!allMembersReported) {
            return false;
        }
        for (Counter memberCounter : this.counterMap.values()) {
            if (!memberCounter.isTotalTransfer()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Cancels the running synchronization on local initiative: stops the subscriber,
     * cancels the merge, sends 'Cancel' through the transmitter and notifies the local
     * listeners. Only a warning is logged if the subscriber is not started.
     */
    private void doCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Do CANCEL (local)");
        }

        if (!this.isStarted()) {
            LOG.warn("Cannot cancel. Already stopped.");
            return;
        }

        this.doStop();
        this.cancelMerge();

        try {
            this.transmitter.sendCancel();
        }
        catch (IOException sendFailure) {
            // a failed notification does not prevent the local listeners from being informed
            LOG.error("Cannot send 'Cancel'" + sendFailure, sendFailure);
        }

        for (LocalEventListener listener : this.listeners) {
            listener.onCancel();
        }
    }

    /**
     * Stops the subscriber: delegates to the superclass, cancels the first-changes waiter
     * (if one exists) and starts a helper thread that waits for the merge worker to end
     * before asking the merge manager to clean up.
     */
    @Override
    public void doStop() {
        super.doStop();

        if (this.firstChangesWaiter == null) {
            LOG.warn("First changes member is not initialized");
        } else {
            this.firstChangesWaiter.cancel();
        }

        // the join is done on a separate thread so that the caller is not blocked by a running merge
        Thread mergeCanceler = new Thread("Merge canceler, " + new Date()){

            @Override
            public void run() {
                if (ChangesSubscriberImpl.this.mergeWorker == null) {
                    return;
                }
                try {
                    ChangesSubscriberImpl.this.mergeWorker.join();
                }
                catch (InterruptedException interrupted) {
                    LOG.error("Error of merge process cancelation " + interrupted, interrupted);
                }
                // cleanup runs even when the join was interrupted
                ChangesSubscriberImpl.this.mergeManager.cleanup();
            }
        };
        mergeCanceler.start();
    }

    /**
     * Reacts to a cancel event: stops the subscriber and cancels the merge.
     * Unlike the local cancel, nothing is sent to other members and no listener is notified.
     */
    @Override
    public void onCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On CANCEL");
        }

        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }

        this.doStop();
        this.cancelMerge();
    }

    /**
     * Asks the merge manager to cancel, but only if a merge worker has been created.
     */
    private void cancelMerge() {
        if (this.mergeWorker == null) {
            return;
        }
        this.mergeManager.cancel();
    }

    /**
     * Reacts to a 'merge done' event of a member by counting down the merge latch.
     * The event is ignored (with a warning) when the subscriber is not started.
     *
     * @param member address of the member that finished its merge
     */
    @Override
    public void onMerge(MemberAddress member) {
        if (!this.isStarted()) {
            LOG.warn("Subscriber stopped. On Merge member " + member + " ignored.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            // the count is read before this event is counted down
            LOG.debug("On Merge member " + member + ", done=" + this.mergeBarier.getCount() + " members=" + this.confMembersCount);
        }
        this.mergeBarier.countDown();
    }

    /**
     * Saves the result of the merge worker into the workspace and then notifies the local
     * listeners that the synchronization has stopped.
     * <p>
     * Save failures of the caught types are logged and reported to the error log; the
     * listeners are notified in that case as well.
     */
    private synchronized void save() {
        LOG.info("Save changes.");
        try {
            if (LOG.isDebugEnabled()) {
                try {
                    LOG.debug("save \r\n" + this.mergeWorker.result.dump());
                }
                catch (Throwable e1) {
                    // a failed debug dump must not prevent the save; keep the stack trace for diagnosis
                    LOG.error("Changes dump error " + e1, e1);
                }
            }
            this.workspace.save(this.mergeWorker.result);
        }
        // The catch clauses are kept separate on purpose: each call to reportError keeps the
        // static argument type it had originally.
        catch (InvalidItemStateException e) {
            LOG.error("Save error " + e, e);
            this.errorLog.reportError(e);
        }
        catch (UnsupportedOperationException e) {
            LOG.error("Save error " + e, e);
            this.errorLog.reportError(e);
        }
        catch (RepositoryException e) {
            LOG.error("Save error " + e, e);
            this.errorLog.reportError(e);
        }
        catch (SynchronizationException e) {
            LOG.error("Save error " + e, e);
            this.errorLog.reportError(e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fire Stop (local)");
        }
        for (LocalEventListener listener : this.listeners) {
            listener.onStop();
        }
    }

    /**
     * Called when members are disconnected; this subscriber takes no action.
     *
     * @param member the disconnected members
     */
    @Override
    public void onDisconnectMembers(List<Member> member) {
        // nothing to do
    }

    /**
     * Reacts to a local start event: launches a new first-changes waiter and moves the
     * subscriber to the started state.
     *
     * @param members the members taking part in the synchronization
     */
    @Override
    public void onStart(List<MemberAddress> members) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On START (local) " + members.size() + " members");
        }

        FirstChangesWaiter waiter = new FirstChangesWaiter();
        this.firstChangesWaiter = waiter;
        waiter.start();

        this.doStart();
    }

    /**
     * Reacts to a local stop event by stopping the subscriber; only a warning is logged
     * when it is not started.
     */
    @Override
    public void onStop() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On STOP (local)");
        }

        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }

        this.doStop();
    }

    private class FirstChangesWaiter
    extends Thread {
        private volatile boolean run = true;

        private FirstChangesWaiter() {
        }

        public void run() {
            try {
                Thread.sleep(ChangesSubscriberImpl.this.memberWaitTimeout);
                if (this.run) {
                    if (ChangesSubscriberImpl.this.counterMap.size() + 1 != ChangesSubscriberImpl.this.confMembersCount) {
                        LOG.error("No changes from one of members, received " + (ChangesSubscriberImpl.this.counterMap.size() + 1) + ", expected " + ChangesSubscriberImpl.this.confMembersCount + ".");
                        ChangesSubscriberImpl.this.doCancel();
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug("FirstChangesWaiter stopped. Changes from all members (" + (ChangesSubscriberImpl.this.counterMap.size() + 1) + ") received.");
                    }
                }
            }
            catch (InterruptedException e) {
                LOG.error("FirstChangesWaiter is interrupted : " + e, e);
            }
        }

        void cancel() {
            this.run = false;
        }
    }

    /**
     * Immutable pair of a changes file being received and the member it comes from.
     */
    private class MemberChangesFile {
        private final RandomChangesFile changesFile;
        private final Member member;

        /**
         * @param changesFile the file the incoming data is written to
         * @param member the member that sends the data
         */
        public MemberChangesFile(RandomChangesFile changesFile, Member member) {
            this.member = member;
            this.changesFile = changesFile;
        }

        /** @return the file the incoming data is written to */
        public RandomChangesFile getChangesFile() {
            return changesFile;
        }

        /** @return the member that sends the data */
        public Member getMember() {
            return member;
        }
    }

    /**
     * Counts received items against an expected total.
     */
    private class Counter {
        /** Expected number of items. */
        int total;
        /** Number of items counted so far. */
        int counter;

        /**
         * @param total expected number of items
         * @param counter initial number of already counted items
         */
        public Counter(int total, int counter) {
            this.counter = counter;
            this.total = total;
        }

        /** Registers one more received item. */
        public void countUp() {
            this.counter += 1;
        }

        /** @return {@code true} when the counted number equals the expected total exactly */
        public boolean isTotalTransfer() {
            return this.counter == this.total;
        }
    }

    /**
     * Thread that merges the local changes with the changes received from other members,
     * announces 'merge done', waits until all members are done and then saves the result.
     */
    class MergeWorker
    extends Thread {
        private final Log workerLog = ExoLogger.getLogger("ext.MergeWorker");

        /** Result of the merge; null until the merge has completed. */
        ChangesStorage<ItemState> result = null;

        MergeWorker() {
        }

        /**
         * Runs the merge, then - if the subscriber is still started - counts down the merge
         * latch for the local member, sends 'Merge' to the others, waits on the latch, saves
         * and stops. Any failure of the merge phase cancels the synchronization.
         */
        @Override
        public void run() {
            try {
                MemberAddress localAddress = ChangesSubscriberImpl.this.initializer.getLocalMember();
                this.runMerge(new Member(localAddress, ChangesSubscriberImpl.this.localPriority));

                if (ChangesSubscriberImpl.this.isStarted()) {
                    ChangesSubscriberImpl.this.mergeBarier.countDown();
                    try {
                        ChangesSubscriberImpl.this.transmitter.sendMerge();
                    }
                    catch (IOException ioe) {
                        // a failed notification is only logged, the worker goes on
                        this.workerLog.error("Cannot send 'Merge done'" + ioe, ioe);
                    }
                }
            }
            catch (Throwable mergeFailure) {
                // every failure of the merge phase is handled the same way
                this.workerLog.error("Merge error " + mergeFailure, mergeFailure);
                ChangesSubscriberImpl.this.doCancel();
                return;
            }

            if (!ChangesSubscriberImpl.this.isStarted()) {
                return;
            }

            try {
                this.workerLog.info("Waiting for other members.");
                // no timeout: blocks until the latch reaches zero or this thread is interrupted
                ChangesSubscriberImpl.this.mergeBarier.await();
                ChangesSubscriberImpl.this.save();
                ChangesSubscriberImpl.this.doStop();
            }
            catch (InterruptedException interrupted) {
                this.workerLog.error("Save error " + interrupted, interrupted);
                ChangesSubscriberImpl.this.doCancel();
            }
        }

        /**
         * Places the local changes among the received member changes according to the local
         * priority, merges them and stores the outcome in {@link #result}.
         *
         * @param localMember the local member
         */
        private void runMerge(Member localMember) throws RepositoryException, RemoteExportException, IOException, ClassCastException, ClassNotFoundException, MergeDataManagerException, StorageRuntimeException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Run merge on " + localMember);
            }

            List<MemberChangesStorage<ItemState>> membersChanges = ChangesSubscriberImpl.this.incomeStorrage.getChanges();
            MemberChangesStorage<ItemState> lastChanges = membersChanges.get(membersChanges.size() - 1);

            if (lastChanges.getMember().getPriority() < localMember.getPriority()) {
                // the last received member has a lower priority: the local changes go to the end
                membersChanges.add(new IncomeChangesStorage<ItemState>(ChangesSubscriberImpl.this.workspace.getLocalChanges(), localMember));
            } else {
                // otherwise insert before the first member whose priority is greater than the local one
                for (int position = 0; position < membersChanges.size(); ++position) {
                    if (membersChanges.get(position).getMember().getPriority() > localMember.getPriority()) {
                        membersChanges.add(position, new IncomeChangesStorage<ItemState>(ChangesSubscriberImpl.this.workspace.getLocalChanges(), localMember));
                        break;
                    }
                }
            }

            if (LOG.isDebugEnabled()) {
                for (MemberChangesStorage<ItemState> memberChanges : membersChanges) {
                    LOG.debug(">>> Member " + memberChanges.getMember().getName() + " changes");
                    LOG.debug(memberChanges.dump());
                }
            }

            this.workerLog.info("Start merge of " + membersChanges.size() + " members");
            ChangesSubscriberImpl.this.mergeManager.setLocalMember(localMember);
            this.result = ChangesSubscriberImpl.this.mergeManager.merge(membersChanges.iterator());
            this.workerLog.info("Local merge done");
        }
    }
}

