/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication.async;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.jcr.InvalidItemStateException;
import javax.jcr.RepositoryException;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.dataflow.ItemState;
import org.exoplatform.services.jcr.ext.replication.async.AsyncInitializer;
import org.exoplatform.services.jcr.ext.replication.async.AsyncTransmitter;
import org.exoplatform.services.jcr.ext.replication.async.ChangesSaveErrorLog;
import org.exoplatform.services.jcr.ext.replication.async.ChangesSubscriber;
import org.exoplatform.services.jcr.ext.replication.async.IncomeDataContext;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventListener;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventProducer;
import org.exoplatform.services.jcr.ext.replication.async.MergeDataManager;
import org.exoplatform.services.jcr.ext.replication.async.MergeDataManagerException;
import org.exoplatform.services.jcr.ext.replication.async.RemoteEventListener;
import org.exoplatform.services.jcr.ext.replication.async.RemoteExportException;
import org.exoplatform.services.jcr.ext.replication.async.SynchronizationLifeCycle;
import org.exoplatform.services.jcr.ext.replication.async.WorkspaceSynchronizer;
import org.exoplatform.services.jcr.ext.replication.async.storage.ChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.IncomeChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.IncomeStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.Member;
import org.exoplatform.services.jcr.ext.replication.async.storage.MemberChangesStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.RandomChangesFile;
import org.exoplatform.services.jcr.ext.replication.async.storage.StorageRuntimeException;
import org.exoplatform.services.jcr.ext.replication.async.storage.SynchronizationException;
import org.exoplatform.services.jcr.ext.replication.async.transport.ChangesPacket;
import org.exoplatform.services.jcr.ext.replication.async.transport.MemberAddress;
import org.exoplatform.services.jcr.impl.dataflow.serialization.ReaderSpoolFileHolder;
import org.exoplatform.services.jcr.impl.util.io.FileCleaner;
import org.exoplatform.services.log.ExoLogger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ChangesSubscriberImpl
extends SynchronizationLifeCycle
implements ChangesSubscriber,
RemoteEventListener,
LocalEventListener,
LocalEventProducer {
    /** Logger used by the subscriber and by its nested helper classes. */
    protected static final Log LOG = ExoLogger.getLogger("ext.ChangesSubscriberImpl");

    /** Performs the merge of members' changes; also cancelled / cleaned up from this class. */
    protected final MergeDataManager mergeManager;

    /** Supplies the local changes for the merge and receives the merge result on save. */
    protected final WorkspaceSynchronizer workspace;

    /** Creates incoming changes files and collects the completely received files per member. */
    protected final IncomeStorage incomeStorrage;

    /** Receives the exceptions raised while saving the merge result. */
    protected final ChangesSaveErrorLog errorLog;

    /** Used to send the 'Cancel' and 'Merge' notifications. */
    protected final AsyncTransmitter transmitter;

    /** Supplies the local member address and the list of other members. */
    protected final AsyncInitializer initializer;

    /** Time, in milliseconds, the first-changes waiter sleeps before checking the received members. */
    protected final int memberWaitTimeout;

    /**
     * Configured number of members. The checks in this class compare it with the number of
     * transmitters seen plus one (presumably the local member - confirm against configuration).
     */
    protected final int confMembersCount;

    /** Priority assigned to the local member when the merge is run. */
    protected final int localPriority;

    /** Counters of completely received files, keyed by the transmitter priority of the packets. */
    protected final HashMap<Integer, Counter> counterMap;

    /** Latch created with {@link #confMembersCount} permits; counted down per finished member merge. */
    protected final CountDownLatch mergeBarier;

    /** The most recently started first-changes waiter, or {@code null} if none was started yet. */
    private FirstChangesWaiter firstChangesWaiter;

    /** Files currently being received, keyed by the value of {@code getKey(packet)}. */
    protected HashMap<String, IncomeDataContext> incomChanges = new HashMap<String, IncomeDataContext>();

    /** The merge thread, or {@code null} while no merge has been started. */
    protected MergeWorker mergeWorker = null;

    /** Local listeners, notified in insertion order. */
    protected final Set<LocalEventListener> listeners = new LinkedHashSet<LocalEventListener>();

    /** Passed to the storage that wraps the local changes for the merge. */
    protected final FileCleaner fileCleaner;

    /** Passed to the storage that wraps the local changes for the merge. */
    protected final int maxBufferSize;

    /** Passed to the storage that wraps the local changes for the merge. */
    private final ReaderSpoolFileHolder holder;

    /**
     * Creates the subscriber and wires its collaborators.
     *
     * @param initializer supplies the local member and the other members
     * @param transmitter sends the 'Cancel' and 'Merge' notifications
     * @param workspace source of local changes and target of the save
     * @param mergeManager performs the merge
     * @param incomeStorage storage of incoming changes files
     * @param errorLog receives save errors
     * @param memberWaitTimeout sleep time of the first-changes waiter, in milliseconds
     * @param localPriority priority of the local member
     * @param confMembersCount configured number of members; also the initial count of the merge latch
     * @param fileCleaner passed to the storage wrapping the local changes
     * @param maxBufferSize passed to the storage wrapping the local changes
     * @param holder passed to the storage wrapping the local changes
     */
    public ChangesSubscriberImpl(AsyncInitializer initializer, AsyncTransmitter transmitter, WorkspaceSynchronizer workspace, MergeDataManager mergeManager, IncomeStorage incomeStorage, ChangesSaveErrorLog errorLog, int memberWaitTimeout, int localPriority, int confMembersCount, FileCleaner fileCleaner, int maxBufferSize, ReaderSpoolFileHolder holder) {
        // Collaborators, in parameter order.
        this.initializer = initializer;
        this.transmitter = transmitter;
        this.workspace = workspace;
        this.mergeManager = mergeManager;
        this.incomeStorrage = incomeStorage;
        this.errorLog = errorLog;

        // Plain configuration values.
        this.memberWaitTimeout = memberWaitTimeout;
        this.localPriority = localPriority;
        this.confMembersCount = confMembersCount;
        this.fileCleaner = fileCleaner;
        this.maxBufferSize = maxBufferSize;
        this.holder = holder;

        // State derived from the configuration.
        this.counterMap = new LinkedHashMap<Integer, Counter>();
        this.mergeBarier = new CountDownLatch(confMembersCount);
    }

    /**
     * Registers a local listener; registering the same listener twice has no further effect
     * because the listeners are kept in a set.
     *
     * @param listener the listener to register
     */
    @Override
    public void addLocalListener(LocalEventListener listener) {
        listeners.add(listener);
    }

    /**
     * Unregisters a previously registered local listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeLocalListener(LocalEventListener listener) {
        listeners.remove(listener);
    }

    /**
     * Processes one packet of a changes file received from a remote member.
     *
     * <p>If the inherited life cycle reports {@code isInitialized()}, the subscriber is started
     * first: a first-changes waiter is launched, {@code doStart()} is called and the local
     * listeners are notified. While started, the packet data is written to the file it belongs
     * to; once a file is complete it is registered in the income storage and counted for its
     * transmitter, and when every expected transmitter has delivered all of its files the merge
     * worker is started.
     *
     * <p>Any failure is logged and then cancels the synchronization locally.
     *
     * @param packet the received packet
     * @param member the member the packet was received from
     */
    @Override
    public void onChanges(ChangesPacket packet, Member member) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Receiving member " + member.getName() + " changes.");
            }
            if (this.isInitialized()) {
                LOG.info("START. Initiated by " + member.getName());
                this.firstChangesWaiter = new FirstChangesWaiter();
                this.firstChangesWaiter.start();
                this.doStart();
                for (LocalEventListener syncl : this.listeners) {
                    syncl.onStart(this.initializer.getOtherMembers());
                }
            }
            if (!this.isStarted()) {
                LOG.error("Changes packet received but the Subscriber is not started. Packet from " + member.getName() + " skipped");
                return;
            }

            // The key identifies the file this packet belongs to; compute it once per packet.
            String key = this.getKey(packet);
            IncomeDataContext mcf = this.incomChanges.get(key);
            if (mcf == null) {
                // First packet of this file: create the file and start tracking it.
                RandomChangesFile cf = this.incomeStorrage.createChangesFile(packet.getCRC(), packet.getTimeStamp(), member);
                mcf = new IncomeDataContext(cf, member, packet.getPacketsCount());
                this.incomChanges.put(key, mcf);
            }
            mcf.writeData(packet.getBuffer(), packet.getOffset());
            if (!mcf.isFinished()) {
                return;
            }

            // The file is complete: hand it over to the storage and count it for its transmitter.
            this.incomChanges.remove(key);
            this.incomeStorrage.addMemberChanges(mcf.getMember(), mcf.getChangesFile());
            Integer priority = packet.getTransmitterPriority();
            Counter counter = this.counterMap.get(priority);
            if (counter == null) {
                counter = new Counter((int)packet.getFileCount(), 1);
                this.counterMap.put(priority, counter);
            } else {
                counter.countUp();
            }
            if (counter.isTotalTransfer() && this.isAllTransfered()) {
                if (this.mergeWorker == null) {
                    this.mergeWorker = new MergeWorker();
                    this.mergeWorker.start();
                } else {
                    LOG.error("Error, merge process already activated.");
                }
            }
        }
        catch (Throwable e) {
            // Log before cancelling so the root cause is recorded even if the cancel itself fails.
            LOG.error("Error of ChangesFile data packet processing. Packet from " + member.getName(), e);
            this.doCancel();
        }
    }

    /**
     * Tells whether every expected transmitter has delivered all of its files.
     *
     * @return {@code true} when the number of known transmitters plus one equals the configured
     *         members count and every transmitter's counter reports a total transfer
     */
    private boolean isAllTransfered() {
        boolean everyTransmitterSeen = this.counterMap.size() + 1 == this.confMembersCount;
        if (!everyTransmitterSeen) {
            return false;
        }
        for (Counter transferred : this.counterMap.values()) {
            if (!transferred.isTotalTransfer()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Cancels the synchronization on the local side: stops the subscriber, cancels a running
     * merge, sends 'Cancel' through the transmitter and notifies the local listeners.
     * Only a warning is logged when the subscriber is not started.
     */
    private void doCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Do CANCEL (local)");
        }
        if (!this.isStarted()) {
            LOG.warn("Cannot cancel. Already stopped.");
            return;
        }

        this.doStop();
        this.cancelMerge();
        try {
            this.transmitter.sendCancel();
        }
        catch (IOException sendFailure) {
            // A failed send is logged only; the local listeners are still notified below.
            LOG.error("Cannot send 'Cancel'" + sendFailure, sendFailure);
        }
        for (LocalEventListener listener : this.listeners) {
            listener.onCancel();
        }
    }

    /**
     * Stops the life cycle, cancels the first-changes waiter (if there is one) and starts a
     * helper thread that waits for the merge worker to finish before asking the merge manager
     * to clean up.
     */
    @Override
    public void doStop() {
        super.doStop();

        if (this.firstChangesWaiter == null) {
            LOG.warn("First changes member is not initialized");
        } else {
            this.firstChangesWaiter.cancel();
        }

        Thread canceler = new Thread("Merge canceler, " + new Date()){

            @Override
            public void run() {
                MergeWorker worker = ChangesSubscriberImpl.this.mergeWorker;
                if (worker == null) {
                    // No merge was ever started, so there is nothing to clean up.
                    return;
                }
                try {
                    worker.join();
                }
                catch (InterruptedException interrupted) {
                    LOG.error("Error of merge process cancelation " + interrupted, interrupted);
                }
                // Cleanup runs even if waiting for the worker was interrupted.
                ChangesSubscriberImpl.this.mergeManager.cleanup();
            }
        };
        canceler.start();
    }

    /**
     * Handles a cancel event: stops the subscriber and cancels a running merge.
     * Only a warning is logged when the subscriber is not started.
     */
    @Override
    public void onCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On CANCEL");
        }
        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }
        this.doStop();
        this.cancelMerge();
    }

    /** Asks the merge manager to cancel, but only if a merge worker has been created. */
    private void cancelMerge() {
        if (this.mergeWorker == null) {
            return;
        }
        this.mergeManager.cancel();
    }

    /**
     * Handles a 'merge done' event of a member by counting down the merge latch.
     * The event is ignored, with a warning, when the subscriber is not started.
     *
     * @param member address of the member that reported its merge
     */
    @Override
    public void onMerge(MemberAddress member) {
        if (!this.isStarted()) {
            LOG.warn("Subscriber stopped. On Merge member " + member + " ignored.");
            return;
        }
        if (LOG.isDebugEnabled()) {
            // The latch count is reported as it is before this member is counted.
            LOG.debug("On Merge member " + member + ", done=" + this.mergeBarier.getCount() + " members=" + this.confMembersCount);
        }
        this.mergeBarier.countDown();
    }

    /**
     * Saves the merge worker's result to the workspace and then fires {@code onStop} on the
     * local listeners. The save failures handled here are logged and reported to the error
     * log; the listeners are notified afterwards in either case.
     */
    private synchronized void save() {
        LOG.info("Save changes.");
        try {
            this.dumpResultForDebug();
            this.workspace.save(this.mergeWorker.result);
        }
        catch (InvalidItemStateException invalidState) {
            LOG.error("Save error " + invalidState, invalidState);
            this.errorLog.reportError(invalidState);
        }
        catch (UnsupportedOperationException unsupported) {
            LOG.error("Save error " + unsupported, unsupported);
            this.errorLog.reportError(unsupported);
        }
        catch (RepositoryException repositoryFailure) {
            LOG.error("Save error " + repositoryFailure, repositoryFailure);
            this.errorLog.reportError(repositoryFailure);
        }
        catch (SynchronizationException syncFailure) {
            LOG.error("Save error " + syncFailure, syncFailure);
            this.errorLog.reportError(syncFailure);
        }
        this.fireLocalStop();
    }

    /** Writes a dump of the merge result to the debug log; a failing dump is logged, not thrown. */
    private void dumpResultForDebug() {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        try {
            LOG.debug("save \r\n" + this.mergeWorker.result.dump());
        }
        catch (Throwable dumpFailure) {
            LOG.error("Changes dump error " + dumpFailure);
        }
    }

    /** Notifies every local listener that the synchronization has stopped. */
    private void fireLocalStop() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fire Stop (local)");
        }
        for (LocalEventListener listener : this.listeners) {
            listener.onStop();
        }
    }

    /**
     * Member-disconnect notification. This implementation takes no action.
     *
     * @param member the disconnected members (unused)
     */
    @Override
    public void onDisconnectMembers(List<Member> member) {
    }

    /**
     * Handles a local start event: launches a new first-changes waiter and starts the
     * life cycle.
     *
     * @param members the members taking part; used here only for the debug message
     */
    @Override
    public void onStart(List<MemberAddress> members) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On START (local) " + members.size() + " members");
        }
        FirstChangesWaiter waiter = new FirstChangesWaiter();
        this.firstChangesWaiter = waiter;
        waiter.start();
        this.doStart();
    }

    /**
     * Handles a local stop event by stopping the subscriber.
     * Only a warning is logged when the subscriber is not started.
     */
    @Override
    public void onStop() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On STOP (local)");
        }
        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }
        this.doStop();
    }

    /**
     * Builds the map key that identifies the changes file a packet belongs to, from the
     * transmitter priority, the file CRC and the file time stamp.
     *
     * <p>The CRC is arbitrary binary data, so it is rendered byte by byte rather than decoded
     * as UTF-8 text: decoding is lossy for byte sequences that are not valid UTF-8 and could
     * make different CRCs produce the same key.
     *
     * @param packet the packet to build the key for
     * @return the key, in the form {@code priority:[b0, b1, ...]:timestamp}
     * @throws UnsupportedEncodingException never thrown; kept so the signature is unchanged
     */
    private String getKey(ChangesPacket packet) throws UnsupportedEncodingException {
        return packet.getTransmitterPriority() + ":" + Arrays.toString(packet.getCRC()) + ":" + packet.getTimeStamp();
    }

    private class FirstChangesWaiter
    extends Thread {
        private volatile boolean run = true;

        private FirstChangesWaiter() {
        }

        public void run() {
            try {
                Thread.sleep(ChangesSubscriberImpl.this.memberWaitTimeout);
                if (this.run) {
                    if (ChangesSubscriberImpl.this.counterMap.size() + 1 != ChangesSubscriberImpl.this.confMembersCount) {
                        LOG.error((Object)("No changes from one of members, received " + (ChangesSubscriberImpl.this.counterMap.size() + 1) + ", expected " + ChangesSubscriberImpl.this.confMembersCount + "."));
                        ChangesSubscriberImpl.this.doCancel();
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("FirstChangesWaiter stopped. Changes from all members (" + (ChangesSubscriberImpl.this.counterMap.size() + 1) + ") received."));
                    }
                }
            }
            catch (InterruptedException e) {
                LOG.error((Object)("FirstChangesWaiter is interrupted : " + e), (Throwable)e);
            }
        }

        void cancel() {
            this.run = false;
        }
    }

    private class Counter {
        int total;
        int counter;

        public Counter(int total, int counter) {
            this.total = total;
            this.counter = counter;
        }

        public void countUp() {
            ++this.counter;
        }

        public boolean isTotalTransfer() {
            return this.total == this.counter;
        }
    }

    /**
     * Thread that merges the received members' changes with the local ones, announces the
     * finished local merge, waits on the merge latch for the other members and finally saves
     * the result and stops the subscriber.
     */
    class MergeWorker
    extends Thread {
        private final Log workerLog = ExoLogger.getLogger("ext.MergeWorker");

        /** The merge result; {@code null} until the local merge has completed. */
        ChangesStorage<ItemState> result = null;

        MergeWorker() {
        }

        @Override
        public void run() {
            try {
                MemberAddress localAddress = ChangesSubscriberImpl.this.initializer.getLocalMember();
                this.runMerge(new Member(localAddress, ChangesSubscriberImpl.this.localPriority));
                if (ChangesSubscriberImpl.this.isStarted()) {
                    // Count the local merge, then tell the other members about it.
                    ChangesSubscriberImpl.this.mergeBarier.countDown();
                    try {
                        ChangesSubscriberImpl.this.transmitter.sendMerge();
                    }
                    catch (IOException ioe) {
                        this.workerLog.error("Cannot send 'Merge done'" + ioe, ioe);
                    }
                }
            }
            catch (Throwable t) {
                // Every merge failure, checked or not, is handled the same way:
                // log it and cancel the synchronization.
                this.workerLog.error("Merge error " + t, t);
                ChangesSubscriberImpl.this.doCancel();
                return;
            }
            if (!ChangesSubscriberImpl.this.isStarted()) {
                return;
            }
            try {
                this.workerLog.info("Waiting for other members.");
                ChangesSubscriberImpl.this.mergeBarier.await();
                ChangesSubscriberImpl.this.save();
                ChangesSubscriberImpl.this.doStop();
            }
            catch (InterruptedException e) {
                this.workerLog.error("Save error " + e, e);
                ChangesSubscriberImpl.this.doCancel();
            }
        }

        /**
         * Inserts the local changes into the list of received members' changes according to
         * the local priority, runs the merge and stores its outcome in {@link #result}.
         *
         * <p>The local changes are appended when the list is empty, when the last member has a
         * lower priority than the local one, or when no member has a higher priority; otherwise
         * they are inserted before the first member with a higher priority. The local changes
         * are therefore always part of the merge.
         *
         * @param localMember the local member, carrying the local priority
         */
        private void runMerge(Member localMember) throws RepositoryException, RemoteExportException, IOException, ClassCastException, ClassNotFoundException, MergeDataManagerException, StorageRuntimeException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Run merge on " + localMember);
            }
            List<MemberChangesStorage<ItemState>> membersChanges = ChangesSubscriberImpl.this.incomeStorrage.getChanges();

            // Default position: the end of the list.
            int insertAt = membersChanges.size();
            boolean lastIsLower = !membersChanges.isEmpty()
                && membersChanges.get(membersChanges.size() - 1).getMember().getPriority() < localMember.getPriority();
            if (!lastIsLower) {
                for (int i = 0; i < membersChanges.size(); ++i) {
                    if (membersChanges.get(i).getMember().getPriority() > localMember.getPriority()) {
                        insertAt = i;
                        break;
                    }
                }
            }
            membersChanges.add(insertAt, new IncomeChangesStorage<ItemState>(ChangesSubscriberImpl.this.workspace.getLocalChanges(), localMember, ChangesSubscriberImpl.this.fileCleaner, ChangesSubscriberImpl.this.maxBufferSize, ChangesSubscriberImpl.this.holder));

            if (LOG.isDebugEnabled()) {
                for (MemberChangesStorage<ItemState> ms : membersChanges) {
                    LOG.debug(">>> Member " + ms.getMember().getName() + " changes");
                    LOG.debug(ms.dump());
                }
            }
            this.workerLog.info("Start merge of " + membersChanges.size() + " members");
            ChangesSubscriberImpl.this.mergeManager.setLocalMember(localMember);
            this.result = ChangesSubscriberImpl.this.mergeManager.merge(membersChanges.iterator());
            this.workerLog.info("Local merge done");
        }
    }
}

