/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams;

import com.google.common.collect.Multimap;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.streams.Edge;
import org.apache.storm.streams.Node;
import org.apache.storm.streams.ProcessorBolt;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.StreamsEdgeFactory;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.streams.operations.aggregators.LongSum;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.FilterProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.jgrapht.DirectedGraph;
import org.jgrapht.EdgeFactory;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    ProcessorBolt bolt;
    Tuple mockTuple1;
    Tuple mockTuple2;
    Tuple mockTuple3;
    Tuple punctuation;
    Multimap<String, ProcessorNode> mockStreamToProcessors;
    DirectedGraph<Node, Edge> graph;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple)Mockito.mock(Tuple.class);
        this.mockTuple2 = (Tuple)Mockito.mock(Tuple.class);
        this.mockTuple3 = (Tuple)Mockito.mock(Tuple.class);
        this.setUpMockTuples(this.mockTuple1, this.mockTuple2, this.mockTuple3);
        this.punctuation = (Tuple)Mockito.mock(Tuple.class);
        this.setUpPunctuation(this.punctuation);
        this.mockStreamToProcessors = (Multimap)Mockito.mock(Multimap.class);
        this.graph = new DefaultDirectedGraph((EdgeFactory)new StreamsEdgeFactory());
    }

    @Test
    public void testEmitAndAck() throws Exception {
        this.setUpProcessorBolt((Processor<?>)new FilterProcessor((Predicate & Serializable)x -> true));
        this.bolt.execute(this.mockTuple1);
        ArgumentCaptor anchor = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor os = ArgumentCaptor.forClass(String.class);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).emit((String)os.capture(), (Collection)anchor.capture(), (List)values.capture());
        Assert.assertEquals((Object)"outputstream", (Object)os.getValue());
        Assert.assertArrayEquals((Object[])new Object[]{this.mockTuple1}, (Object[])((Collection)anchor.getValue()).toArray());
        Assert.assertEquals((Object)new Values(new Object[]{100}), (Object)values.getValue());
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).ack(this.mockTuple1);
    }

    @Test
    public void testAggResultAndAck() throws Exception {
        this.setUpProcessorBolt((Processor<?>)new AggregateProcessor((CombinerAggregator)new LongSum()), Collections.singleton("inputstream"), true, null);
        this.bolt.execute(this.mockTuple2);
        this.bolt.execute(this.mockTuple3);
        this.bolt.execute(this.punctuation);
        ArgumentCaptor anchor = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor os = ArgumentCaptor.forClass(String.class);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)2))).emit((String)os.capture(), (Collection)anchor.capture(), (List)values.capture());
        Assert.assertArrayEquals((Object[])new Object[]{this.mockTuple2, this.mockTuple3, this.punctuation}, (Object[])((Collection)anchor.getAllValues().get(0)).toArray());
        Assert.assertArrayEquals((Object[])new Object[]{this.mockTuple2, this.mockTuple3, this.punctuation}, (Object[])((Collection)anchor.getAllValues().get(1)).toArray());
        Assert.assertArrayEquals((Object[])new Object[]{new Values(new Object[]{200L}), new Values(new Object[]{"__punctuation"})}, (Object[])values.getAllValues().toArray());
        Assert.assertArrayEquals((Object[])new Object[]{"outputstream", "outputstream__punctuation"}, (Object[])os.getAllValues().toArray());
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).ack(this.mockTuple2);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).ack(this.mockTuple3);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).ack(this.punctuation);
    }

    @Test
    public void testEmitTs() throws Exception {
        Tuple tupleWithTs = (Tuple)Mockito.mock(Tuple.class);
        this.setUpMockTuples(tupleWithTs);
        Mockito.when((Object)tupleWithTs.getLongByField("ts")).thenReturn((Object)12345L);
        this.setUpProcessorBolt((Processor<?>)new FilterProcessor((Predicate & Serializable)x -> true), "ts");
        this.bolt.execute(tupleWithTs);
        ArgumentCaptor anchor = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor os = ArgumentCaptor.forClass(String.class);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).emit((String)os.capture(), (Collection)anchor.capture(), (List)values.capture());
        Assert.assertEquals((Object)"outputstream", (Object)os.getValue());
        Assert.assertArrayEquals((Object[])new Object[]{tupleWithTs}, (Object[])((Collection)anchor.getValue()).toArray());
        Assert.assertEquals((Object)new Values(new Object[]{100, 12345L}), (Object)values.getValue());
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).ack(tupleWithTs);
    }

    private void setUpProcessorBolt(Processor<?> processor) {
        this.setUpProcessorBolt(processor, Collections.emptySet(), false, null);
    }

    private void setUpProcessorBolt(Processor<?> processor, String tsFieldName) {
        this.setUpProcessorBolt(processor, Collections.emptySet(), false, tsFieldName);
    }

    private void setUpProcessorBolt(Processor<?> processor, Set<String> windowedParentStreams, boolean isWindowed, String tsFieldName) {
        ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        node.setWindowedParentStreams(windowedParentStreams);
        node.setWindowed(isWindowed);
        Mockito.when((Object)this.mockStreamToProcessors.get((Object)Mockito.anyString())).thenReturn(Collections.singletonList(node));
        Mockito.when((Object)this.mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
        Map mockSources = (Map)Mockito.mock(Map.class);
        GlobalStreamId mockGlobalStreamId = (GlobalStreamId)Mockito.mock(GlobalStreamId.class);
        Mockito.when((Object)this.mockTopologyContext.getThisSources()).thenReturn((Object)mockSources);
        Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
        Mockito.when((Object)mockGlobalStreamId.get_streamId()).thenReturn((Object)"inputstream");
        Mockito.when((Object)mockGlobalStreamId.get_componentId()).thenReturn((Object)"bolt0");
        Mockito.when((Object)this.mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
        this.graph.addVertex((Object)node);
        this.bolt = new ProcessorBolt("bolt1", this.graph, Collections.singletonList(node));
        if (tsFieldName != null && !tsFieldName.isEmpty()) {
            this.bolt.setTimestampField(tsFieldName);
        }
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
    }

    private void setUpMockTuples(Tuple ... tuples) {
        for (Tuple tuple : tuples) {
            Mockito.when((Object)tuple.size()).thenReturn((Object)1);
            Mockito.when((Object)tuple.getValue(0)).thenReturn((Object)100);
            Mockito.when((Object)tuple.getSourceComponent()).thenReturn((Object)"bolt0");
            Mockito.when((Object)tuple.getSourceStreamId()).thenReturn((Object)"inputstream");
        }
    }

    private void setUpPunctuation(Tuple punctuation) {
        Mockito.when((Object)punctuation.size()).thenReturn((Object)1);
        Mockito.when((Object)punctuation.getValue(0)).thenReturn((Object)"__punctuation");
        Mockito.when((Object)punctuation.getSourceComponent()).thenReturn((Object)"bolt0");
        Mockito.when((Object)punctuation.getSourceStreamId()).thenReturn((Object)"inputstream");
    }
}

