/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.Node;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.PairStream;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.SpoutNode;
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.streams.UniqueIdGen;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.PairFunction;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.streams.operations.Reducer;
import org.apache.storm.streams.operations.aggregators.Count;
import org.apache.storm.streams.operations.mappers.PairValueMapper;
import org.apache.storm.streams.operations.mappers.TupleValueMapper;
import org.apache.storm.streams.operations.mappers.ValueMapper;
import org.apache.storm.streams.processors.BranchProcessor;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class StreamBuilderTest {
    StreamBuilder streamBuilder;

    @Before
    public void setUp() throws Exception {
        this.streamBuilder = new StreamBuilder();
        UniqueIdGen.getInstance().reset();
    }

    @Test(expected=IllegalArgumentException.class)
    public void testSpoutNoDefaultStream() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("test"));
        stream.filter((Predicate & Serializable)x -> true);
        this.streamBuilder.build();
    }

    @Test
    public void testSpoutToBolt() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"));
        stream.to(StreamBuilderTest.newBolt());
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)1L, (long)topology.get_spouts_size());
        Assert.assertEquals((long)1L, (long)topology.get_bolts_size());
        String spoutId = (String)topology.get_spouts().keySet().iterator().next();
        HashMap<GlobalStreamId, Grouping> expected = new HashMap<GlobalStreamId, Grouping>();
        expected.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle((NullStruct)new NullStruct()));
        Assert.assertEquals(expected, (Object)((Bolt)topology.get_bolts().values().iterator().next()).get_common().get_inputs());
    }

    @Test
    public void testBranch() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"));
        Stream[] streams = stream.branch(new Predicate[]{(Predicate & Serializable)x -> true});
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)1L, (long)topology.get_spouts_size());
        Assert.assertEquals((long)1L, (long)topology.get_bolts_size());
        HashMap<GlobalStreamId, Grouping> expected = new HashMap<GlobalStreamId, Grouping>();
        String spoutId = (String)topology.get_spouts().keySet().iterator().next();
        expected.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle((NullStruct)new NullStruct()));
        Assert.assertEquals(expected, (Object)((Bolt)topology.get_bolts().values().iterator().next()).get_common().get_inputs());
        Assert.assertEquals((long)1L, (long)streams.length);
        Assert.assertEquals((long)1L, (long)streams[0].node.getOutputStreams().size());
        String parentStream = (String)streams[0].node.getOutputStreams().iterator().next() + "-branch";
        Assert.assertEquals((long)1L, (long)streams[0].node.getParents(parentStream).size());
        Node processorNdoe = (Node)streams[0].node.getParents(parentStream).iterator().next();
        Assert.assertTrue((boolean)(processorNdoe instanceof ProcessorNode));
        Assert.assertTrue((boolean)(((ProcessorNode)processorNdoe).getProcessor() instanceof BranchProcessor));
        Assert.assertTrue((boolean)(processorNdoe.getParents("default").iterator().next() instanceof SpoutNode));
    }

    @Test
    public void testJoin() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0));
        Stream[] streams = stream.branch(new Predicate[]{(Predicate & Serializable)x -> x % 2 == 0, (Predicate & Serializable)x -> x % 3 == 0});
        PairStream s1 = streams[0].mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)1));
        PairStream s2 = streams[1].mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)1));
        PairStream sj = s1.join(s2);
        Assert.assertEquals(Collections.singleton(s1.node), (Object)sj.node.getParents(s1.stream));
        Assert.assertEquals(Collections.singleton(s2.node), (Object)sj.node.getParents(s2.stream));
    }

    @Test
    public void testGroupBy() throws Exception {
        PairStream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), new PairValueMapper(0, 1), 2);
        stream.window((Window)TumblingWindows.of((BaseWindowedBolt.Count)BaseWindowedBolt.Count.of((int)10))).aggregateByKey((CombinerAggregator)new Count());
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)2L, (long)topology.get_bolts_size());
        Bolt bolt1 = (Bolt)topology.get_bolts().get("bolt1");
        Bolt bolt2 = (Bolt)topology.get_bolts().get("bolt2");
        Assert.assertEquals((Object)Grouping.shuffle((NullStruct)new NullStruct()), bolt1.get_common().get_inputs().values().iterator().next());
        Assert.assertEquals((Object)Grouping.fields(Collections.singletonList("key")), bolt2.get_common().get_inputs().values().iterator().next());
    }

    @Test
    public void testGlobalAggregate() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0), 2);
        stream.aggregate((CombinerAggregator)new Count());
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)2L, (long)topology.get_bolts_size());
        Bolt bolt1 = (Bolt)topology.get_bolts().get("bolt1");
        Bolt bolt2 = (Bolt)topology.get_bolts().get("bolt2");
        String spoutId = (String)topology.get_spouts().keySet().iterator().next();
        HashMap<GlobalStreamId, Grouping> expected1 = new HashMap<GlobalStreamId, Grouping>();
        expected1.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle((NullStruct)new NullStruct()));
        HashMap<GlobalStreamId, Grouping> expected2 = new HashMap<GlobalStreamId, Grouping>();
        expected2.put(new GlobalStreamId("bolt1", "s1"), Grouping.fields(Collections.emptyList()));
        expected2.put(new GlobalStreamId("bolt1", "s1__punctuation"), Grouping.all((NullStruct)new NullStruct()));
        Assert.assertEquals(expected1, (Object)bolt1.get_common().get_inputs());
        Assert.assertEquals(expected2, (Object)bolt2.get_common().get_inputs());
    }

    @Test
    public void testRepartition() throws Exception {
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0));
        stream.repartition(3).filter((Predicate & Serializable)x -> true).repartition(2).filter((Predicate & Serializable)x -> true).aggregate((CombinerAggregator)new Count());
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)1L, (long)topology.get_spouts_size());
        SpoutSpec spout = (SpoutSpec)topology.get_spouts().get("spout1");
        Assert.assertEquals((long)4L, (long)topology.get_bolts_size());
        Bolt bolt1 = (Bolt)topology.get_bolts().get("bolt1");
        Bolt bolt2 = (Bolt)topology.get_bolts().get("bolt2");
        Bolt bolt3 = (Bolt)topology.get_bolts().get("bolt3");
        Bolt bolt4 = (Bolt)topology.get_bolts().get("bolt4");
        Assert.assertEquals((long)1L, (long)spout.get_common().get_parallelism_hint());
        Assert.assertEquals((long)1L, (long)bolt1.get_common().get_parallelism_hint());
        Assert.assertEquals((long)3L, (long)bolt2.get_common().get_parallelism_hint());
        Assert.assertEquals((long)2L, (long)bolt3.get_common().get_parallelism_hint());
        Assert.assertEquals((long)2L, (long)bolt4.get_common().get_parallelism_hint());
    }

    @Test
    public void testBranchAndJoin() throws Exception {
        TopologyContext mockContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        OutputCollector mockCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0), 2);
        Stream[] streams = stream.branch(new Predicate[]{(Predicate & Serializable)x -> x % 2 == 0, (Predicate & Serializable)x -> x % 2 == 1});
        PairStream joined = streams[0].mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)1)).join(streams[1].mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)1)));
        Assert.assertTrue((boolean)(joined.getNode() instanceof ProcessorNode));
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)2L, (long)topology.get_bolts_size());
    }

    @Test
    public void testMultiPartitionByKey() {
        TopologyContext mockContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        OutputCollector mockCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0));
        stream.mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)x)).window((Window)TumblingWindows.of((BaseWindowedBolt.Count)BaseWindowedBolt.Count.of((int)10))).reduceByKey((Reducer & Serializable)(x, y) -> x + y).reduceByKey((Reducer & Serializable)(x, y) -> 0).print();
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)2L, (long)topology.get_bolts_size());
    }

    @Test
    public void testMultiPartitionByKeyWithRepartition() {
        TopologyContext mockContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        OutputCollector mockCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        HashMap<GlobalStreamId, Grouping> expected = new HashMap<GlobalStreamId, Grouping>();
        expected.put(new GlobalStreamId("bolt2", "s3"), Grouping.fields(Collections.singletonList("key")));
        expected.put(new GlobalStreamId("bolt2", "s3__punctuation"), Grouping.all((NullStruct)new NullStruct()));
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0));
        stream.mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)x)).window((Window)TumblingWindows.of((BaseWindowedBolt.Count)BaseWindowedBolt.Count.of((int)10))).reduceByKey((Reducer & Serializable)(x, y) -> x + y).repartition(10).reduceByKey((Reducer & Serializable)(x, y) -> 0).print();
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)3L, (long)topology.get_bolts_size());
        Assert.assertEquals(expected, (Object)((Bolt)topology.get_bolts().get("bolt3")).get_common().get_inputs());
    }

    @Test
    public void testPartitionByKeySinglePartition() {
        TopologyContext mockContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        OutputCollector mockCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        Stream stream = this.streamBuilder.newStream(StreamBuilderTest.newSpout("default"), (TupleValueMapper)new ValueMapper(0));
        stream.mapToPair((PairFunction & Serializable)x -> Pair.of((Object)x, (Object)x)).reduceByKey((Reducer & Serializable)(x, y) -> x + y).print();
        StormTopology topology = this.streamBuilder.build();
        Assert.assertEquals((long)1L, (long)topology.get_bolts_size());
    }

    private static IRichSpout newSpout(final String os) {
        return new BaseRichSpout(){

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declareStream(os, new Fields(new String[]{"value"}));
            }

            public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            }

            public void nextTuple() {
            }
        };
    }

    private static IRichBolt newBolt() {
        return new BaseRichBolt(){

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }

            public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            }

            public void execute(Tuple input) {
            }
        };
    }
}

