/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams;

import com.google.common.collect.Multimap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.storm.streams.Edge;
import org.apache.storm.streams.Node;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.StreamsEdgeFactory;
import org.apache.storm.streams.WindowedProcessorBolt;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.aggregators.Count;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.jgrapht.DirectedGraph;
import org.jgrapht.EdgeFactory;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class WindowedProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    WindowedProcessorBolt bolt;
    Tuple mockTuple1;
    Tuple mockTuple2;
    Tuple mockTuple3;
    DirectedGraph<Node, Edge> graph;
    Multimap<String, ProcessorNode> mockStreamToProcessors;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple)Mockito.mock(Tuple.class);
        this.mockTuple2 = (Tuple)Mockito.mock(Tuple.class);
        this.mockTuple3 = (Tuple)Mockito.mock(Tuple.class);
        this.setUpMockTuples(this.mockTuple1, this.mockTuple2, this.mockTuple3);
        this.mockStreamToProcessors = (Multimap)Mockito.mock(Multimap.class);
    }

    @Test
    public void testEmit() throws Exception {
        TumblingWindows window = TumblingWindows.of((BaseWindowedBolt.Count)BaseWindowedBolt.Count.of((int)2));
        this.setUpWindowedProcessorBolt((Processor<?>)new AggregateProcessor((CombinerAggregator)new Count()), (Window<?, ?>)window);
        this.bolt.execute(this.getMockTupleWindow(this.mockTuple1, this.mockTuple2, this.mockTuple3));
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor os = ArgumentCaptor.forClass(String.class);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)2))).emit((String)os.capture(), (List)values.capture());
        Assert.assertEquals((Object)"outputstream", os.getAllValues().get(0));
        Assert.assertEquals((Object)new Values(new Object[]{3L}), values.getAllValues().get(0));
        Assert.assertEquals((Object)"outputstream__punctuation", os.getAllValues().get(1));
        Assert.assertEquals((Object)new Values(new Object[]{"__punctuation"}), values.getAllValues().get(1));
    }

    private void setUpWindowedProcessorBolt(Processor<?> processor, Window<?, ?> window) {
        ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        node.setWindowed(true);
        Mockito.when((Object)this.mockStreamToProcessors.get((Object)Mockito.anyString())).thenReturn(Collections.singletonList(node));
        Mockito.when((Object)this.mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
        this.graph = new DefaultDirectedGraph((EdgeFactory)new StreamsEdgeFactory());
        this.graph.addVertex((Object)node);
        this.bolt = new WindowedProcessorBolt("bolt1", this.graph, Collections.singletonList(node), window);
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
    }

    private void setUpMockTuples(Tuple ... tuples) {
        for (Tuple tuple : tuples) {
            Mockito.when((Object)tuple.size()).thenReturn((Object)1);
            Mockito.when((Object)tuple.getValue(0)).thenReturn((Object)100);
            Mockito.when((Object)tuple.getSourceComponent()).thenReturn((Object)"bolt0");
            Mockito.when((Object)tuple.getSourceStreamId()).thenReturn((Object)"inputstream");
        }
    }

    private TupleWindow getMockTupleWindow(Tuple ... tuples) {
        TupleWindow tupleWindow = (TupleWindow)Mockito.mock(TupleWindow.class);
        Mockito.when((Object)tupleWindow.get()).thenReturn(Arrays.asList(tuples));
        return tupleWindow;
    }
}

