/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.event;

import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.event.EventManager;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventManagerImp
implements EventManager {
    private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
    private AtomicInteger added;
    private AtomicInteger processed;
    private AtomicBoolean running;
    private Thread runner;
    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue();

    public EventManagerImp(boolean isDaemon) {
        this.added = new AtomicInteger();
        this.processed = new AtomicInteger();
        this.running = new AtomicBoolean(true);
        this.runner = new Thread(){

            @Override
            public void run() {
                while (EventManagerImp.this.running.get()) {
                    try {
                        Runnable r = (Runnable)EventManagerImp.this.queue.take();
                        if (r == null) {
                            return;
                        }
                        r.run();
                        EventManagerImp.this.proccessInc();
                    }
                    catch (Throwable t) {
                        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, (Throwable)t)) {
                            LOG.info("Event manager interrupted while doing IO");
                            continue;
                        }
                        if (Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, (Throwable)t)) {
                            LOG.info("Event manager interrupted while doing NIO");
                            continue;
                        }
                        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, (Throwable)t)) {
                            LOG.info("Event manager interrupted");
                            continue;
                        }
                        LOG.error("{} Error when processing event", t);
                        Utils.exitProcess((int)20, (String)"Error when processing an event");
                    }
                }
            }
        };
        this.runner.setDaemon(isDaemon);
        this.runner.start();
    }

    public void proccessInc() {
        this.processed.incrementAndGet();
    }

    @Override
    public void add(Runnable eventFn) {
        if (!this.running.get()) {
            throw new RuntimeException("Cannot add events to a shutdown event manager");
        }
        this.added.incrementAndGet();
        this.queue.add(eventFn);
    }

    @Override
    public boolean waiting() {
        return Time.isThreadWaiting((Thread)this.runner) || this.processed.get() == this.added.get();
    }

    @Override
    public void close() throws Exception {
        this.running.set(false);
        this.runner.interrupt();
        this.runner.join();
    }
}

