/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.jcr.ext.replication.async;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.exoplatform.services.jcr.ext.replication.async.AsyncInitializer;
import org.exoplatform.services.jcr.ext.replication.async.AsyncTransmitter;
import org.exoplatform.services.jcr.ext.replication.async.ChangesPublisher;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventListener;
import org.exoplatform.services.jcr.ext.replication.async.LocalEventProducer;
import org.exoplatform.services.jcr.ext.replication.async.RemoteEventListener;
import org.exoplatform.services.jcr.ext.replication.async.SynchronizationLifeCycle;
import org.exoplatform.services.jcr.ext.replication.async.storage.ChangesFile;
import org.exoplatform.services.jcr.ext.replication.async.storage.LocalStorage;
import org.exoplatform.services.jcr.ext.replication.async.storage.Member;
import org.exoplatform.services.jcr.ext.replication.async.transport.MemberAddress;
import org.exoplatform.services.log.ExoLogger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Publishes locally stored changes to a list of members on a background thread.
 *
 * <p>On {@link #onStart(List)} a {@link PublisherWorker} is spawned which reads the local
 * changes from the {@link LocalStorage} and hands each {@link ChangesFile} to the
 * {@link AsyncTransmitter}. {@link #onStop()} and {@link #onCancel()} ask that worker to stop
 * and release it. If sending fails with an {@link IOException}, the worker triggers
 * {@link #doCancel()}, which sends a cancel via the transmitter and notifies the registered
 * {@link LocalEventListener}s.
 *
 * <p>NOTE(review): the {@code listeners} set is a plain {@link LinkedHashSet} that is iterated
 * from the worker thread in {@link #doCancel()}; callers are presumably expected to register
 * listeners before synchronization starts - confirm.
 */
public class ChangesPublisherImpl
        extends SynchronizationLifeCycle
        implements ChangesPublisher, RemoteEventListener, LocalEventListener, LocalEventProducer {

    private static final Log LOG = ExoLogger.getLogger("ext.ChangesPublisherImpl");

    /** Stored at construction; not used by the code of this class itself. */
    protected final AsyncInitializer initializer;

    /** Used to send changes files and the 'Cancel' notification. */
    protected final AsyncTransmitter transmitter;

    /** Source of the local changes that get published. */
    protected final LocalStorage storage;

    /** Listeners notified from {@link #doCancel()}, in registration order. */
    protected final Set<LocalEventListener> listeners = new LinkedHashSet<LocalEventListener>();

    /** The current worker, or {@code null} when none has been started or it was released. */
    protected PublisherWorker publisherWorker;

    /**
     * Creates a publisher bound to the given collaborators.
     *
     * @param initializer stored in {@link #initializer}
     * @param transmitter used to send changes and cancel notifications
     * @param storage     provides the local changes to publish
     */
    public ChangesPublisherImpl(AsyncInitializer initializer, AsyncTransmitter transmitter, LocalStorage storage) {
        this.initializer = initializer;
        this.transmitter = transmitter;
        this.storage = storage;
    }

    /**
     * Cancel callback: stops the worker and marks this publisher stopped.
     * Logs a warning and does nothing else when the publisher is not started.
     */
    @Override
    public void onCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On CANCEL (local)");
        }
        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }
        this.cancelWorker();
        this.doStop();
        // Release the cancelled worker, exactly as onStop() does, so it is not kept referenced.
        this.publisherWorker = null;
    }

    /** No-op: this publisher does not react to member disconnects. */
    @Override
    public void onDisconnectMembers(List<Member> member) {
    }

    /** No-op: this publisher does not react to merge events. */
    @Override
    public void onMerge(MemberAddress member) {
    }

    /**
     * Start callback: marks this publisher started and launches a new worker thread that
     * publishes the local changes to the given members.
     *
     * @param members addresses the changes are sent to; copied by the worker
     */
    @Override
    public void onStart(List<MemberAddress> members) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On START (local) " + members.size() + " members");
        }
        this.doStart();
        // A worker left over from an earlier start would become unreachable (and therefore
        // impossible to cancel) once the field is overwritten, so ask it to stop first.
        this.cancelWorker();
        this.publisherWorker = new PublisherWorker(members);
        this.publisherWorker.start();
    }

    /**
     * Stop callback: stops the worker, marks this publisher stopped and releases the worker.
     * Logs a warning and does nothing else when the publisher is not started.
     */
    @Override
    public void onStop() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("On STOP (local)");
        }
        if (!this.isStarted()) {
            LOG.warn("Not started or already stopped");
            return;
        }
        this.cancelWorker();
        this.doStop();
        this.publisherWorker = null;
    }

    /** Asks the current worker, if any, to stop before sending its next changes file. */
    private void cancelWorker() {
        // Read the field once: another thread may reset it between the null check and the call.
        PublisherWorker worker = this.publisherWorker;
        if (worker != null) {
            worker.cancel();
        }
    }

    /**
     * Initiates a cancel from this side: sends 'Cancel' through the transmitter and then
     * calls {@code onCancel()} on every registered listener. A failure to send is logged and
     * does not prevent the listeners from being notified. Only a warning is logged when the
     * publisher is not started.
     */
    protected void doCancel() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Do CANCEL (local)");
        }
        if (!this.isStarted()) {
            LOG.warn("Cannot cancel. Already stopped.");
            return;
        }
        try {
            this.transmitter.sendCancel();
        } catch (IOException ioe) {
            LOG.error("Cannot send 'Cancel' " + ioe, ioe);
        }
        for (LocalEventListener syncl : this.listeners) {
            syncl.onCancel();
        }
    }

    /**
     * Registers a listener to be notified by {@link #doCancel()}.
     *
     * @param listener the listener to add
     */
    @Override
    public void addLocalListener(LocalEventListener listener) {
        this.listeners.add(listener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeLocalListener(LocalEventListener listener) {
        this.listeners.remove(listener);
    }

    /**
     * Background thread that sends every local changes file to a fixed list of subscribers.
     * It can be asked to stop via {@link #cancel()}; the flag is checked before each file.
     */
    private class PublisherWorker extends Thread {

        /** Private copy of the subscriber list taken at construction. */
        private final List<MemberAddress> subscribers;

        /** Cleared by {@link #cancel()}; volatile because it is written from another thread. */
        private volatile boolean run = true;

        /**
         * @param subscribers members to send to; copied so later changes to the caller's
         *                    list do not affect this worker
         */
        public PublisherWorker(List<MemberAddress> subscribers) {
            this.subscribers = new ArrayList<MemberAddress>(subscribers);
        }

        /**
         * Loads the local changes and, while the publisher is started and this worker has not
         * been cancelled, sends each file to the subscribers together with the total number of
         * files. An {@link IOException} is logged and escalated through
         * {@link ChangesPublisherImpl#doCancel()}.
         */
        @Override
        public void run() {
            try {
                ChangesFile[] localChanges = ChangesPublisherImpl.this.storage.getLocalChanges(true).getChangesFile();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Local changes : " + localChanges.length);
                }
                if (ChangesPublisherImpl.this.isStarted() && this.run) {
                    for (ChangesFile cf : localChanges) {
                        if (!this.run) {
                            // Cancelled while sending: stop without touching the remaining files.
                            return;
                        }
                        ChangesPublisherImpl.this.transmitter.sendChanges(cf, this.subscribers, localChanges.length);
                    }
                }
            } catch (IOException e) {
                LOG.error("Cannot send changes " + e, e);
                ChangesPublisherImpl.this.doCancel();
            }
        }

        /** Requests that this worker stop before sending its next changes file. */
        void cancel() {
            this.run = false;
        }
    }
}

