/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.metric;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.metric.util.DataPointExpander;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsConsumerBolt
implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples, Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {
        this._consumerClassName = consumerClassName;
        this._registrationArgument = registrationArgument;
        this._maxRetainMetricTuples = maxRetainMetricTuples;
        this._filterPredicate = filterPredicate;
        this._expander = expander;
        this._taskQueue = this._maxRetainMetricTuples > 0 ? new LinkedBlockingDeque<MetricsTask>(this._maxRetainMetricTuples) : new LinkedBlockingDeque<MetricsTask>();
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            this._metricsConsumer = (IMetricsConsumer)Class.forName(this._consumerClassName).newInstance();
        }
        catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section topology.metrics.consumer.register with fully qualified name " + this._consumerClassName, e);
        }
        this._metricsConsumer.prepare(topoConf, this._registrationArgument, context, collector);
        this._collector = collector;
        this._taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        this._taskExecuteThread.setDaemon(true);
        this._taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo)input.getValue(0);
        Collection dataPoints = (Collection)input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = this._expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = this.getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);
        while (!this._taskQueue.offer(metricsTask)) {
            this._taskQueue.poll();
        }
        this._collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList((Iterable)Iterables.filter(dataPoints, this._filterPredicate));
    }

    @Override
    public void cleanup() {
        this._running = false;
        this._metricsConsumer.cleanup();
        this._taskExecuteThread.interrupt();
    }

    class MetricsHandlerRunnable
    implements Runnable {
        MetricsHandlerRunnable() {
        }

        @Override
        public void run() {
            while (MetricsConsumerBolt.this._running) {
                try {
                    MetricsTask task = (MetricsTask)MetricsConsumerBolt.this._taskQueue.take();
                    MetricsConsumerBolt.this._metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                }
                catch (InterruptedException e) {
                    break;
                }
                catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return this.taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return this.dataPoints;
        }
    }
}

