/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.CountEvictionPolicy;
import org.apache.storm.windowing.CountTriggerPolicy;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TimeEvictionPolicy;
import org.apache.storm.windowing.TimeTriggerPolicy;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TriggerPolicy;
import org.apache.storm.windowing.TupleWindowImpl;
import org.apache.storm.windowing.WaterMarkEventGenerator;
import org.apache.storm.windowing.WatermarkCountEvictionPolicy;
import org.apache.storm.windowing.WatermarkCountTriggerPolicy;
import org.apache.storm.windowing.WatermarkTimeEvictionPolicy;
import org.apache.storm.windowing.WatermarkTimeTriggerPolicy;
import org.apache.storm.windowing.WindowLifecycleListener;
import org.apache.storm.windowing.WindowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IRichBolt} that wraps an {@link IWindowedBolt} and feeds it windows of tuples.
 *
 * <p>Incoming tuples are handed to a {@link WindowManager}; when the configured trigger policy
 * fires, the wrapped bolt is executed with a {@link TupleWindowImpl} describing the current
 * window. Window length, sliding interval, late tuple stream and watermark settings are read
 * from the topology configuration map passed to {@link #prepare}.
 */
public class WindowedBoltExecutor implements IRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    /** Watermark event interval used when "topology.bolts.watermark.event.interval.ms" is not set. */
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000;
    /** Maximum tuple timestamp lag used when "topology.bolts.tuple.timestamp.max.lag.ms" is not set. */
    private static final int DEFAULT_MAX_LAG_MS = 0;
    /** Name of the single field declared on the late tuple stream. */
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private final IWindowedBolt bolt;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    // Non-null when tuple timestamps (rather than arrival order/time) drive the windows.
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple> triggerPolicy;
    private transient EvictionPolicy<Tuple> evictionPolicy;
    private transient BaseWindowedBolt.Duration windowLengthDuration;
    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;

    /**
     * Creates an executor around the given windowed bolt.
     *
     * @param bolt the bolt to invoke for every activated window; its timestamp extractor
     *             (possibly {@code null}) is captured here
     */
    public WindowedBoltExecutor(IWindowedBolt bolt) {
        this.bolt = bolt;
        this.timestampExtractor = bolt.getTimestampExtractor();
    }

    /**
     * Reads the topology message timeout from the configuration, in milliseconds.
     *
     * @param topoConf the topology configuration
     * @return {@link Integer#MAX_VALUE} when message timeouts are explicitly disabled; otherwise
     *         the configured timeout converted to milliseconds (0 when not configured), clamped
     *         to {@link Integer#MAX_VALUE}
     */
    private int getTopologyTimeoutMillis(Map<String, Object> topoConf) {
        Object timeoutsEnabled = topoConf.get("topology.enable.message.timeouts");
        if (timeoutsEnabled != null && !((Boolean) timeoutsEnabled).booleanValue()) {
            return Integer.MAX_VALUE;
        }
        long timeoutSecs = 0L;
        Object configuredTimeout = topoConf.get("topology.message.timeout.secs");
        if (configuredTimeout != null) {
            timeoutSecs = ((Number) configuredTimeout).intValue();
        }
        // Multiply as long and clamp: an int multiplication would wrap to a negative value for
        // timeouts above ~2.1 million seconds and make every window duration look too large.
        return (int) Math.min(timeoutSecs * 1000L, (long) Integer.MAX_VALUE);
    }

    /**
     * Reads "topology.max.spout.pending" from the configuration.
     *
     * @param topoConf the topology configuration
     * @return the configured value, or {@link Integer#MAX_VALUE} when it is not set
     */
    private int getMaxSpoutPending(Map<String, Object> topoConf) {
        int maxPending = Integer.MAX_VALUE;
        if (topoConf.get("topology.max.spout.pending") != null) {
            maxPending = ((Number) topoConf.get("topology.max.spout.pending")).intValue();
        }
        return maxPending;
    }

    /**
     * Rejects a window duration that exceeds the topology message timeout.
     *
     * @param duration window length plus sliding interval, in milliseconds
     * @param timeout  topology message timeout, in milliseconds
     * @throws IllegalArgumentException if {@code duration} is greater than {@code timeout}
     */
    private void ensureDurationLessThanTimeout(long duration, int timeout) {
        if (duration > timeout) {
            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration + " is more than " + "topology.message.timeout.secs" + " value " + timeout);
        }
    }

    /**
     * Rejects a window tuple count that exceeds the maximum number of pending spout tuples.
     *
     * @param count      window length plus sliding interval, in tuples
     * @param maxPending value of "topology.max.spout.pending"
     * @throws IllegalArgumentException if {@code count} is greater than {@code maxPending}
     */
    private void ensureCountLessThanMaxPending(long count, int maxPending) {
        if (count > maxPending) {
            throw new IllegalArgumentException("Window count (length + sliding interval) value " + count + " is more than " + "topology.max.spout.pending" + " value " + maxPending);
        }
    }

    /**
     * Validates the window parameters against the topology timeout and max spout pending.
     *
     * @throws IllegalArgumentException if no window length is given, or if the duration/count
     *                                  totals exceed the respective topology limits
     */
    private void validate(Map<String, Object> topoConf, BaseWindowedBolt.Count windowLengthCount, BaseWindowedBolt.Duration windowLengthDuration, BaseWindowedBolt.Count slidingIntervalCount, BaseWindowedBolt.Duration slidingIntervalDuration) {
        int topologyTimeout = this.getTopologyTimeoutMillis(topoConf);
        int maxSpoutPending = this.getMaxSpoutPending(topoConf);
        if (windowLengthCount == null && windowLengthDuration == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }
        // Sums are computed as long so that two large int values cannot wrap around to a
        // negative number and slip past the limit checks.
        if (windowLengthDuration != null && slidingIntervalDuration != null) {
            this.ensureDurationLessThanTimeout((long) windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
        } else if (windowLengthDuration != null) {
            this.ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
        } else if (slidingIntervalDuration != null) {
            this.ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
        }
        if (windowLengthCount != null && slidingIntervalCount != null) {
            this.ensureCountLessThanMaxPending((long) windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
        } else if (windowLengthCount != null) {
            this.ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
        } else if (slidingIntervalCount != null) {
            this.ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
        }
    }

    /**
     * Builds and configures the window manager from the topology configuration.
     *
     * <p>Side effects: sets {@code windowLengthDuration}, {@code lateTupleStream},
     * {@code maxLagMs}, {@code waterMarkEventGenerator}, {@code evictionPolicy} and
     * {@code triggerPolicy} on this instance.
     *
     * @param lifecycleListener listener notified on window expiry and activation
     * @param topoConf          the topology configuration
     * @param context           the topology context of this component
     * @return the configured window manager (policies set, not yet started)
     * @throws IllegalArgumentException if the late tuple stream is misconfigured or the window
     *                                  parameters fail validation
     */
    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf, TopologyContext context) {
        WindowManager<Tuple> manager = new WindowManager<Tuple>(lifecycleListener);
        BaseWindowedBolt.Count windowLengthCount = null;
        BaseWindowedBolt.Duration slidingIntervalDuration = null;
        BaseWindowedBolt.Count slidingIntervalCount = null;
        // Window length: a count takes precedence over a duration.
        if (topoConf.containsKey("topology.bolts.window.length.count")) {
            windowLengthCount = new BaseWindowedBolt.Count(((Number) topoConf.get("topology.bolts.window.length.count")).intValue());
        } else if (topoConf.containsKey("topology.bolts.window.length.duration.ms")) {
            this.windowLengthDuration = new BaseWindowedBolt.Duration(((Number) topoConf.get("topology.bolts.window.length.duration.ms")).intValue(), TimeUnit.MILLISECONDS);
        }
        // Sliding interval: count, then duration, otherwise slide on every tuple.
        if (topoConf.containsKey("topology.bolts.window.sliding.interval.count")) {
            slidingIntervalCount = new BaseWindowedBolt.Count(((Number) topoConf.get("topology.bolts.window.sliding.interval.count")).intValue());
        } else if (topoConf.containsKey("topology.bolts.window.sliding.interval.duration.ms")) {
            slidingIntervalDuration = new BaseWindowedBolt.Duration(((Number) topoConf.get("topology.bolts.window.sliding.interval.duration.ms")).intValue(), TimeUnit.MILLISECONDS);
        } else {
            slidingIntervalCount = new BaseWindowedBolt.Count(1);
        }
        if (this.timestampExtractor != null) {
            this.lateTupleStream = (String) topoConf.get("topology.bolts.late.tuple.stream");
            if (this.lateTupleStream != null && !context.getThisStreams().contains(this.lateTupleStream)) {
                throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
            }
            this.maxLagMs = topoConf.containsKey("topology.bolts.tuple.timestamp.max.lag.ms")
                    ? ((Number) topoConf.get("topology.bolts.tuple.timestamp.max.lag.ms")).intValue()
                    : DEFAULT_MAX_LAG_MS;
            int watermarkInterval = topoConf.containsKey("topology.bolts.watermark.event.interval.ms")
                    ? ((Number) topoConf.get("topology.bolts.watermark.event.interval.ms")).intValue()
                    : DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            this.waterMarkEventGenerator = new WaterMarkEventGenerator<Tuple>(manager, watermarkInterval, this.maxLagMs, this.getComponentStreams(context));
        } else if (topoConf.containsKey("topology.bolts.late.tuple.stream")) {
            throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
        }
        this.validate(topoConf, windowLengthCount, this.windowLengthDuration, slidingIntervalCount, slidingIntervalDuration);
        this.evictionPolicy = this.getEvictionPolicy(windowLengthCount, this.windowLengthDuration, manager);
        this.triggerPolicy = this.getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager, this.evictionPolicy);
        manager.setEvictionPolicy(this.evictionPolicy);
        manager.setTriggerPolicy(this.triggerPolicy);
        return manager;
    }

    /**
     * Collects the input streams of this component, skipping the "$checkpoint" stream.
     *
     * @param context the topology context of this component
     * @return a new set with every source stream id except those named "$checkpoint"
     */
    private Set<GlobalStreamId> getComponentStreams(TopologyContext context) {
        Set<GlobalStreamId> streams = new HashSet<GlobalStreamId>();
        for (GlobalStreamId streamId : context.getThisSources().keySet()) {
            if (streamId.get_streamId().equals("$checkpoint")) {
                continue;
            }
            streams.add(streamId);
        }
        return streams;
    }

    /**
     * Starts the watermark event generator (when one was created) and the trigger policy.
     */
    protected void start() {
        if (this.waterMarkEventGenerator != null) {
            LOG.debug("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        LOG.debug("Starting trigger policy");
        this.triggerPolicy.start();
    }

    /**
     * @return {@code true} when a timestamp extractor is set, i.e. tuple timestamps are used
     */
    private boolean isTupleTs() {
        return this.timestampExtractor != null;
    }

    /**
     * Chooses the trigger policy for the configured sliding interval.
     *
     * <p>A count interval takes precedence; watermark-based variants are used when tuple
     * timestamps are in use. Exactly one of the two interval arguments is expected to be
     * non-null (the caller defaults to a count of 1).
     */
    private TriggerPolicy<Tuple> getTriggerPolicy(BaseWindowedBolt.Count slidingIntervalCount, BaseWindowedBolt.Duration slidingIntervalDuration, WindowManager<Tuple> manager, EvictionPolicy<Tuple> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (this.isTupleTs()) {
                return new WatermarkCountTriggerPolicy<Tuple>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            }
            return new CountTriggerPolicy<Tuple>(slidingIntervalCount.value, manager, evictionPolicy);
        }
        if (this.isTupleTs()) {
            return new WatermarkTimeTriggerPolicy<Tuple>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
        }
        return new TimeTriggerPolicy<Tuple>(slidingIntervalDuration.value, manager, evictionPolicy);
    }

    /**
     * Chooses the eviction policy for the configured window length.
     *
     * <p>A count length takes precedence; watermark-based variants are used when tuple
     * timestamps are in use. The caller validates that at least one length is non-null.
     */
    private EvictionPolicy<Tuple> getEvictionPolicy(BaseWindowedBolt.Count windowLengthCount, BaseWindowedBolt.Duration windowLengthDuration, WindowManager<Tuple> manager) {
        if (windowLengthCount != null) {
            if (this.isTupleTs()) {
                return new WatermarkCountEvictionPolicy<Tuple>(windowLengthCount.value);
            }
            return new CountEvictionPolicy<Tuple>(windowLengthCount.value);
        }
        if (this.isTupleTs()) {
            return new WatermarkTimeEvictionPolicy<Tuple>(windowLengthDuration.value, this.maxLagMs);
        }
        return new TimeEvictionPolicy<Tuple>(windowLengthDuration.value);
    }

    /**
     * Prepares the wrapped bolt, builds the window manager and starts the windowing machinery.
     *
     * @param topoConf  the topology configuration
     * @param context   the topology context of this component
     * @param collector the collector to wrap; the wrapped bolt receives a collector that
     *                  anchors un-anchored emits to the tuples of the current window
     */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.windowedOutputCollector = new WindowedOutputCollector(collector);
        this.bolt.prepare(topoConf, context, this.windowedOutputCollector);
        this.listener = this.newWindowLifecycleListener();
        this.windowManager = this.initWindowManager(this.listener, topoConf, context);
        this.start();
        LOG.debug("Initialized window manager {} ", this.windowManager);
    }

    /**
     * Adds the tuple to the window manager.
     *
     * <p>When tuple timestamps are in use, the tuple is first offered to the watermark event
     * generator. A tuple it rejects is treated as late: it is emitted on the late tuple stream
     * if one is configured (otherwise only logged) and is acked immediately.
     *
     * @param input the incoming tuple
     */
    @Override
    public void execute(Tuple input) {
        if (!this.isTupleTs()) {
            this.windowManager.add(input);
            return;
        }
        long ts = this.timestampExtractor.extractTimestamp(input);
        if (this.waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
            this.windowManager.add(input, ts);
            return;
        }
        if (this.lateTupleStream != null) {
            this.windowedOutputCollector.emit(this.lateTupleStream, input, new Values(input));
        } else {
            LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
        }
        this.windowedOutputCollector.ack(input);
    }

    /**
     * Shuts down the window manager and then cleans up the wrapped bolt.
     *
     * <p>The wrapped bolt is cleaned up even if shutting down the window manager throws; the
     * exception still propagates to the caller.
     */
    @Override
    public void cleanup() {
        try {
            this.windowManager.shutdown();
        } finally {
            this.bolt.cleanup();
        }
    }

    /**
     * Declares the late tuple stream (when configured) followed by the wrapped bolt's outputs.
     *
     * @param declarer the declarer to register streams on
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The wrapped bolt may supply no component configuration at all; treat that the same
        // as "no late tuple stream configured" instead of failing with a NullPointerException.
        Map<String, Object> componentConf = this.getComponentConfiguration();
        String lateTupleStream = componentConf == null
                ? null
                : (String) componentConf.get("topology.bolts.late.tuple.stream");
        if (lateTupleStream != null) {
            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
        }
        this.bolt.declareOutputFields(declarer);
    }

    /**
     * @return the component configuration of the wrapped bolt, unchanged
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this.bolt.getComponentConfiguration();
    }

    /**
     * Creates the listener that connects window events to the wrapped bolt.
     *
     * <p>Expired tuples are acked; an activated window sets the emit context to the window's
     * tuples and executes the wrapped bolt with a {@link TupleWindowImpl}.
     *
     * @return a new lifecycle listener bound to this executor
     */
    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {

            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    WindowedBoltExecutor.this.windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                WindowedBoltExecutor.this.windowedOutputCollector.setContext(tuples);
                WindowedBoltExecutor.this.bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, this.getWindowStartTs(timestamp), timestamp));
            }

            /**
             * Derives the window start from its end timestamp and the window length duration.
             *
             * @return {@code endTs} minus the window length, or {@code null} when either the
             *         end timestamp or a duration-based window length is unavailable
             */
            private Long getWindowStartTs(Long endTs) {
                BaseWindowedBolt.Duration length = WindowedBoltExecutor.this.windowLengthDuration;
                if (endTs == null || length == null) {
                    return null;
                }
                return endTs - (long) length.value;
            }
        };
    }

    /**
     * Output collector that anchors un-anchored emits to the tuples of the current window.
     */
    private static class WindowedOutputCollector
    extends OutputCollector {
        // Tuples of the most recently activated window; used as anchors for emits.
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        /**
         * Sets the tuples that subsequent un-anchored emits will be anchored to.
         */
        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return this.emit(streamId, this.inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            this.emitDirect(taskId, streamId, this.inputTuples, tuple);
        }
    }
}

