/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.WindowedBoltExecutor;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.apache.storm.windowing.TupleWindow;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class WindowedBoltExecutorTest {
    private WindowedBoltExecutor executor;
    private TestWindowedBolt testWindowedBolt;

    private GeneralTopologyContext getContext(final Fields fields) {
        TopologyBuilder builder = new TopologyBuilder();
        return new GeneralTopologyContext(builder.createTopology(), (Map)new Config(), new HashMap(), new HashMap(), new HashMap(), ""){

            public Fields getComponentOutputFields(String componentId, String streamId) {
                return fields;
            }
        };
    }

    private Tuple getTuple(String streamId, Fields fields, Values values) {
        return new TupleImpl(this.getContext(fields), (List)values, 1, streamId){

            public GlobalStreamId getSourceGlobalStreamId() {
                return new GlobalStreamId("s1", "default");
            }
        };
    }

    private OutputCollector getOutputCollector() {
        return (OutputCollector)Mockito.mock(OutputCollector.class);
    }

    private TopologyContext getTopologyContext() {
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Map<GlobalStreamId, Object> sources = Collections.singletonMap(new GlobalStreamId("s1", "default"), null);
        Mockito.when((Object)context.getThisSources()).thenReturn(sources);
        return context;
    }

    @Before
    public void setUp() {
        this.testWindowedBolt = new TestWindowedBolt();
        this.testWindowedBolt.withTimestampField("ts");
        this.executor = new WindowedBoltExecutor((IWindowedBolt)this.testWindowedBolt);
        HashMap<String, Integer> conf = new HashMap<String, Integer>();
        conf.put("topology.message.timeout.secs", 100000);
        conf.put("topology.bolts.window.length.duration.ms", 20);
        conf.put("topology.bolts.window.sliding.interval.duration.ms", 10);
        conf.put("topology.bolts.tuple.timestamp.max.lag.ms", 5);
        conf.put("topology.bolts.watermark.event.interval.ms", 100000);
        this.executor.prepare(conf, this.getTopologyContext(), this.getOutputCollector());
    }

    @Test(expected=IllegalArgumentException.class)
    public void testExecuteWithoutTs() throws Exception {
        this.executor.execute(this.getTuple("s1", new Fields(new String[]{"a"}), new Values(new Object[]{1})));
    }

    @Test
    public void testExecuteWithTs() throws Exception {
        long[] timstamps;
        for (long ts : timstamps = new long[]{603L, 605L, 607L, 618L, 626L, 636L}) {
            this.executor.execute(this.getTuple("s1", new Fields(new String[]{"ts"}), new Values(new Object[]{ts})));
        }
        this.executor.waterMarkEventGenerator.run();
        Assert.assertEquals((long)3L, (long)this.testWindowedBolt.tupleWindows.size());
        TupleWindow first = this.testWindowedBolt.tupleWindows.get(0);
        Assert.assertArrayEquals((long[])new long[]{603L, 605L, 607L}, (long[])new long[]{(Long)((Tuple)first.get().get(0)).getValue(0), (Long)((Tuple)first.get().get(1)).getValue(0), (Long)((Tuple)first.get().get(2)).getValue(0)});
        TupleWindow second = this.testWindowedBolt.tupleWindows.get(1);
        Assert.assertArrayEquals((long[])new long[]{603L, 605L, 607L, 618L}, (long[])new long[]{(Long)((Tuple)second.get().get(0)).getValue(0), (Long)((Tuple)second.get().get(1)).getValue(0), (Long)((Tuple)second.get().get(2)).getValue(0), (Long)((Tuple)second.get().get(3)).getValue(0)});
        TupleWindow third = this.testWindowedBolt.tupleWindows.get(2);
        Assert.assertArrayEquals((long[])new long[]{618L, 626L}, (long[])new long[]{(Long)((Tuple)third.get().get(0)).getValue(0), (Long)((Tuple)third.get().get(1)).getValue(0)});
    }

    @Test
    public void testPrepareLateTupleStreamWithoutTs() throws Exception {
        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put("topology.message.timeout.secs", 100000);
        conf.put("topology.bolts.window.length.duration.ms", 20);
        conf.put("topology.bolts.window.sliding.interval.duration.ms", 10);
        conf.put("topology.bolts.late.tuple.stream", "$late");
        conf.put("topology.bolts.tuple.timestamp.max.lag.ms", 5);
        conf.put("topology.bolts.watermark.event.interval.ms", 10);
        this.testWindowedBolt = new TestWindowedBolt();
        this.executor = new WindowedBoltExecutor((IWindowedBolt)this.testWindowedBolt);
        TopologyContext context = this.getTopologyContext();
        Mockito.when((Object)context.getThisStreams()).thenReturn(new HashSet<String>(Arrays.asList("default", "$late")));
        try {
            this.executor.prepare(conf, context, this.getOutputCollector());
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertThat((Object)e.getMessage(), (Matcher)Is.is((Object)"Late tuple stream can be defined only when specifying a timestamp field"));
        }
    }

    @Test
    public void testPrepareLateTUpleStreamWithoutBuilder() throws Exception {
        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put("topology.message.timeout.secs", 100000);
        conf.put("topology.bolts.window.length.duration.ms", 20);
        conf.put("topology.bolts.window.sliding.interval.duration.ms", 10);
        conf.put("topology.bolts.late.tuple.stream", "$late");
        conf.put("topology.bolts.tuple.timestamp.max.lag.ms", 5);
        conf.put("topology.bolts.watermark.event.interval.ms", 10);
        this.testWindowedBolt = new TestWindowedBolt();
        this.testWindowedBolt.withTimestampField("ts");
        this.executor = new WindowedBoltExecutor((IWindowedBolt)this.testWindowedBolt);
        TopologyContext context = this.getTopologyContext();
        try {
            this.executor.prepare(conf, context, this.getOutputCollector());
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertThat((Object)e.getMessage(), (Matcher)Is.is((Object)"Stream for late tuples must be defined with the builder method withLateTupleStream"));
        }
    }

    @Test
    public void testExecuteWithLateTupleStream() throws Exception {
        this.testWindowedBolt = new TestWindowedBolt();
        this.testWindowedBolt.withTimestampField("ts");
        this.executor = new WindowedBoltExecutor((IWindowedBolt)this.testWindowedBolt);
        TopologyContext context = this.getTopologyContext();
        Mockito.when((Object)context.getThisStreams()).thenReturn(new HashSet<String>(Arrays.asList("default", "$late")));
        OutputCollector outputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put("topology.message.timeout.secs", 100000);
        conf.put("topology.bolts.window.length.duration.ms", 20);
        conf.put("topology.bolts.window.sliding.interval.duration.ms", 10);
        conf.put("topology.bolts.late.tuple.stream", "$late");
        conf.put("topology.bolts.tuple.timestamp.max.lag.ms", 5);
        conf.put("topology.bolts.watermark.event.interval.ms", 10);
        this.executor.prepare(conf, context, outputCollector);
        long[] timstamps = new long[]{603L, 605L, 607L, 618L, 626L, 636L, 600L};
        ArrayList<Tuple> tuples = new ArrayList<Tuple>(timstamps.length);
        this.executor.waterMarkEventGenerator.run();
        for (long ts : timstamps) {
            Tuple tuple = this.getTuple("s1", new Fields(new String[]{"ts"}), new Values(new Object[]{ts}));
            tuples.add(tuple);
            this.executor.execute(tuple);
            Time.sleep((long)10L);
        }
        System.out.println(this.testWindowedBolt.tupleWindows);
        Tuple tuple = (Tuple)tuples.get(tuples.size() - 1);
        ((OutputCollector)Mockito.verify((Object)outputCollector)).emit("$late", Arrays.asList(tuple), (List)new Values(new Object[]{tuple}));
    }

    private static class TestWindowedBolt
    extends BaseWindowedBolt {
        List<TupleWindow> tupleWindows = new ArrayList<TupleWindow>();

        private TestWindowedBolt() {
        }

        public void execute(TupleWindow input) {
            this.tupleWindows.add(input);
        }
    }
}

