/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.utils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
import org.apache.storm.utils.DisruptorBackpressureCallback;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorQueue
implements IStatefulObject {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";
    private static final FlusherPool FLUSHER = new FlusherPool();
    private final RingBuffer<AtomicReference<Object>> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;
    private final int _inputBatchSize;
    private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap();
    private final Flusher _flusher;
    private final QueueMetrics _metrics;
    private String _queueName = "";
    private DisruptorBackpressureCallback _cb = null;
    private int _highWaterMark = 0;
    private int _lowWaterMark = 0;
    private boolean _enableBackpressure = false;
    private final AtomicLong _overflowCount = new AtomicLong(0L);
    private volatile boolean _throttleOn = false;

    private static int getNumFlusherPoolThreads() {
        int numThreads = 100;
        try {
            Map<String, Object> conf = Utils.readStormConfig();
            numThreads = ObjectReader.getInt(conf.get("storm.worker.disruptor.flusher.max.pool.size"), numThreads);
        }
        catch (Exception e) {
            LOG.warn("Error while trying to read system config", (Throwable)e);
        }
        try {
            String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads));
            numThreads = Integer.parseInt(threads);
        }
        catch (Exception e) {
            LOG.warn("Error while parsing number of flusher pool threads", (Throwable)e);
        }
        LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", (Object)numThreads);
        return numThreads;
    }

    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
        this._queueName = PREFIX + queueName;
        Object wait = readTimeout <= 0L ? new LiteBlockingWaitStrategy() : new TimeoutBlockingWaitStrategy(readTimeout, TimeUnit.MILLISECONDS);
        this._buffer = RingBuffer.create((ProducerType)type, (EventFactory)new ObjectEventFactory(), (int)size, (WaitStrategy)wait);
        this._consumer = new Sequence();
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.addGatingSequences(new Sequence[]{this._consumer});
        this._metrics = new QueueMetrics();
        this._inputBatchSize = Math.max(1, Math.min(inputBatchSize, size / 2));
        this._flusher = new Flusher(Math.max(flushInterval, 1L), this._queueName);
        this._flusher.start();
    }

    public DisruptorQueue(String queueName, int size, long readTimeout, int inputBatchSize, long flushInterval) {
        this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, flushInterval);
    }

    public String getName() {
        return this._queueName;
    }

    public boolean isFull() {
        return this._metrics.population() + this._overflowCount.get() >= this._metrics.capacity();
    }

    public void haltWithInterrupt() {
        try {
            this.publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
            this._flusher.close();
            this._metrics.close();
        }
        catch (InsufficientCapacityException e) {
            throw new RuntimeException(e);
        }
    }

    public void consumeBatch(EventHandler<Object> handler) {
        if (this._metrics.population() > 0L) {
            this.consumeBatchWhenAvailable(handler);
        }
    }

    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            long nextSequence = this._consumer.get() + 1L;
            long availableSequence = this._barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                this.consumeBatchToCursor(availableSequence, handler);
            }
        }
        catch (TimeoutException nextSequence) {
        }
        catch (AlertException e) {
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for (long curr = this._consumer.get() + 1L; curr <= cursor; ++curr) {
            try {
                AtomicReference mo = (AtomicReference)this._buffer.get(curr);
                Object o = mo.getAndSet(null);
                if (o == INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                }
                if (o == null) {
                    LOG.error("NULL found in {}:{}", (Object)this.getName(), (Object)cursor);
                    continue;
                }
                handler.onEvent(o, curr, curr == cursor);
                if (!this._enableBackpressure || this._cb == null || this._metrics.writePos() - curr + this._overflowCount.get() > (long)this._lowWaterMark) continue;
                try {
                    if (!this._throttleOn) continue;
                    this._throttleOn = false;
                    this._cb.lowWaterMark();
                    continue;
                }
                catch (Exception e) {
                    throw new RuntimeException("Exception during calling lowWaterMark callback!");
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this._consumer.set(cursor);
    }

    public void registerBackpressureCallback(DisruptorBackpressureCallback cb) {
        this._cb = cb;
    }

    private static Long getId() {
        return Thread.currentThread().getId();
    }

    private void publishDirectSingle(Object obj, boolean block) throws InsufficientCapacityException {
        long at = block ? this._buffer.next() : this._buffer.tryNext();
        AtomicReference m = (AtomicReference)this._buffer.get(at);
        m.set(obj);
        this._buffer.publish(at);
        this._metrics.notifyArrivals(1L);
    }

    private void publishDirect(ArrayList<Object> objs, boolean block) throws InsufficientCapacityException {
        int size = objs.size();
        if (size > 0) {
            long begin;
            long end = block ? this._buffer.next(size) : this._buffer.tryNext(size);
            long at = begin = end - (long)(size - 1);
            for (Object obj : objs) {
                AtomicReference m = (AtomicReference)this._buffer.get(at);
                m.set(obj);
                ++at;
            }
            this._buffer.publish(begin, end);
            this._metrics.notifyArrivals(size);
        }
    }

    public void publish(Object obj) {
        Long id = DisruptorQueue.getId();
        ThreadLocalInserter batcher = this._batchers.get(id);
        if (batcher == null) {
            batcher = this._inputBatchSize > 1 ? new ThreadLocalBatcher() : new ThreadLocalJustInserter();
            this._batchers.put(id, batcher);
        }
        batcher.add(obj);
        batcher.flush(false);
    }

    @Override
    public Object getState() {
        return this._metrics.getState();
    }

    public DisruptorQueue setHighWaterMark(double highWaterMark) {
        this._highWaterMark = (int)((double)this._metrics.capacity() * highWaterMark);
        return this;
    }

    public DisruptorQueue setLowWaterMark(double lowWaterMark) {
        this._lowWaterMark = (int)((double)this._metrics.capacity() * lowWaterMark);
        return this;
    }

    public int getHighWaterMark() {
        return this._highWaterMark;
    }

    public int getLowWaterMark() {
        return this._lowWaterMark;
    }

    public DisruptorQueue setEnableBackpressure(boolean enableBackpressure) {
        this._enableBackpressure = enableBackpressure;
        return this;
    }

    public QueueMetrics getMetrics() {
        return this._metrics;
    }

    public boolean getThrottleOn() {
        return this._throttleOn;
    }

    public class QueueMetrics {
        private final RateTracker _rateTracker = new RateTracker(10000, 10);

        public long writePos() {
            return DisruptorQueue.this._buffer.getCursor();
        }

        public long readPos() {
            return DisruptorQueue.this._consumer.get();
        }

        public long overflow() {
            return DisruptorQueue.this._overflowCount.get();
        }

        public long population() {
            return this.writePos() - this.readPos();
        }

        public long capacity() {
            return DisruptorQueue.this._buffer.getBufferSize();
        }

        public float pctFull() {
            return 1.0f * (float)this.population() / (float)this.capacity();
        }

        public Object getState() {
            HashMap<String, Number> state = new HashMap<String, Number>();
            long rp = this.readPos();
            long wp = this.writePos();
            double arrivalRateInSecs = this._rateTracker.reportRate();
            double sojournTime = (double)(wp - rp) / Math.max(arrivalRateInSecs, 1.0E-5) * 1000.0;
            state.put("capacity", this.capacity());
            state.put("population", wp - rp);
            state.put("write_pos", wp);
            state.put("read_pos", rp);
            state.put("arrival_rate_secs", arrivalRateInSecs);
            state.put("sojourn_time_ms", sojournTime);
            state.put("overflow", DisruptorQueue.this._overflowCount.get());
            return state;
        }

        public void notifyArrivals(long counts) {
            this._rateTracker.notify(counts);
        }

        public void close() {
            this._rateTracker.close();
        }
    }

    private class Flusher
    implements Runnable {
        private AtomicBoolean _isFlushing = new AtomicBoolean(false);
        private final long _flushInterval;

        public Flusher(long flushInterval, String name) {
            this._flushInterval = flushInterval;
        }

        @Override
        public void run() {
            if (this._isFlushing.compareAndSet(false, true)) {
                for (ThreadLocalInserter batcher : DisruptorQueue.this._batchers.values()) {
                    batcher.forceBatch();
                    batcher.flush(true);
                }
                this._isFlushing.set(false);
            }
        }

        public void start() {
            FLUSHER.start(this, this._flushInterval);
        }

        public void close() {
            FLUSHER.stop(this, this._flushInterval);
        }
    }

    private class ThreadLocalBatcher
    implements ThreadLocalInserter {
        private final ReentrantLock _flushLock = new ReentrantLock();
        private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow = new ConcurrentLinkedQueue();
        private ArrayList<Object> _currentBatch;

        public ThreadLocalBatcher() {
            this._currentBatch = new ArrayList(DisruptorQueue.this._inputBatchSize);
        }

        @Override
        public synchronized void add(Object obj) {
            this._currentBatch.add(obj);
            DisruptorQueue.this._overflowCount.incrementAndGet();
            if (DisruptorQueue.this._enableBackpressure && DisruptorQueue.this._cb != null && DisruptorQueue.this._metrics.population() + DisruptorQueue.this._overflowCount.get() >= (long)DisruptorQueue.this._highWaterMark) {
                try {
                    if (!DisruptorQueue.this._throttleOn) {
                        DisruptorQueue.this._throttleOn = true;
                        DisruptorQueue.this._cb.highWaterMark();
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException("Exception during calling highWaterMark callback!", e);
                }
            }
            if (this._currentBatch.size() >= DisruptorQueue.this._inputBatchSize) {
                boolean flushed = false;
                if (this._overflow.isEmpty()) {
                    try {
                        DisruptorQueue.this.publishDirect(this._currentBatch, false);
                        DisruptorQueue.this._overflowCount.addAndGet(0 - this._currentBatch.size());
                        this._currentBatch.clear();
                        flushed = true;
                    }
                    catch (InsufficientCapacityException insufficientCapacityException) {
                        // empty catch block
                    }
                }
                if (!flushed) {
                    this._overflow.add(this._currentBatch);
                    this._currentBatch = new ArrayList(DisruptorQueue.this._inputBatchSize);
                }
            }
        }

        @Override
        public synchronized void forceBatch() {
            if (!this._currentBatch.isEmpty()) {
                this._overflow.add(this._currentBatch);
                this._currentBatch = new ArrayList(DisruptorQueue.this._inputBatchSize);
            }
        }

        @Override
        public void flush(boolean block) {
            if (block) {
                this._flushLock.lock();
            } else if (!this._flushLock.tryLock()) {
                return;
            }
            try {
                while (!this._overflow.isEmpty()) {
                    DisruptorQueue.this.publishDirect(this._overflow.peek(), block);
                    DisruptorQueue.this._overflowCount.addAndGet(0 - this._overflow.poll().size());
                }
            }
            catch (InsufficientCapacityException insufficientCapacityException) {
            }
            finally {
                this._flushLock.unlock();
            }
        }
    }

    private class ThreadLocalJustInserter
    implements ThreadLocalInserter {
        private final ReentrantLock _flushLock = new ReentrantLock();
        private final ConcurrentLinkedQueue<Object> _overflow = new ConcurrentLinkedQueue();

        @Override
        public synchronized void add(Object obj) {
            boolean inserted = false;
            if (this._overflow.isEmpty()) {
                try {
                    DisruptorQueue.this.publishDirectSingle(obj, false);
                    inserted = true;
                }
                catch (InsufficientCapacityException insufficientCapacityException) {
                    // empty catch block
                }
            }
            if (!inserted) {
                DisruptorQueue.this._overflowCount.incrementAndGet();
                this._overflow.add(obj);
            }
            if (DisruptorQueue.this._enableBackpressure && DisruptorQueue.this._cb != null && DisruptorQueue.this._metrics.population() + DisruptorQueue.this._overflowCount.get() >= (long)DisruptorQueue.this._highWaterMark) {
                try {
                    if (!DisruptorQueue.this._throttleOn) {
                        DisruptorQueue.this._throttleOn = true;
                        DisruptorQueue.this._cb.highWaterMark();
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException("Exception during calling highWaterMark callback!", e);
                }
            }
        }

        @Override
        public void forceBatch() {
        }

        @Override
        public void flush(boolean block) {
            if (block) {
                this._flushLock.lock();
            } else if (!this._flushLock.tryLock()) {
                return;
            }
            try {
                while (!this._overflow.isEmpty()) {
                    DisruptorQueue.this.publishDirectSingle(this._overflow.peek(), block);
                    DisruptorQueue.this._overflowCount.addAndGet(-1L);
                    this._overflow.poll();
                }
            }
            catch (InsufficientCapacityException insufficientCapacityException) {
            }
            finally {
                this._flushLock.unlock();
            }
        }
    }

    private static interface ThreadLocalInserter {
        public void add(Object var1);

        public void forceBatch();

        public void flush(boolean var1);
    }

    private static class ObjectEventFactory
    implements EventFactory<AtomicReference<Object>> {
        private ObjectEventFactory() {
        }

        public AtomicReference<Object> newInstance() {
            return new AtomicReference<Object>();
        }
    }

    private static class FlusherPool {
        private static final String THREAD_PREFIX = "disruptor-flush";
        private Timer _timer = new Timer("disruptor-flush-trigger", true);
        private ThreadPoolExecutor _exec;
        private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap();
        private HashMap<Long, TimerTask> _tt = new HashMap();

        public FlusherPool() {
            this._exec = new ThreadPoolExecutor(1, DisruptorQueue.getNumFlusherPoolThreads(), 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.DiscardPolicy());
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("disruptor-flush-task-pool").build();
            this._exec.setThreadFactory(threadFactory);
        }

        public synchronized void start(Flusher flusher, final long flushInterval) {
            ArrayList<Flusher> pending = this._pendingFlush.get(flushInterval);
            if (pending == null) {
                pending = new ArrayList();
                TimerTask t = new TimerTask(){

                    @Override
                    public void run() {
                        this.invokeAll(flushInterval);
                    }
                };
                this._pendingFlush.put(flushInterval, pending);
                this._timer.schedule(t, flushInterval, flushInterval);
                this._tt.put(flushInterval, t);
            }
            pending.add(flusher);
        }

        private synchronized void invokeAll(long flushInterval) {
            ArrayList<Flusher> tasks = this._pendingFlush.get(flushInterval);
            if (tasks != null) {
                for (Flusher f : tasks) {
                    this._exec.submit(f);
                }
            }
        }

        public synchronized void stop(Flusher flusher, long flushInterval) {
            ArrayList<Flusher> pending = this._pendingFlush.get(flushInterval);
            if (pending != null) {
                pending.remove(flusher);
                if (pending.size() == 0) {
                    this._pendingFlush.remove(flushInterval);
                    this._tt.remove(flushInterval).cancel();
                }
            }
        }
    }
}

