/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams;

import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.streams.Edge;
import org.apache.storm.streams.Node;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.StatefulProcessorBolt;
import org.apache.storm.streams.StreamsEdgeFactory;
import org.apache.storm.streams.operations.StateUpdater;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.jgrapht.DirectedGraph;
import org.jgrapht.EdgeFactory;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class StatefulProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    StatefulProcessorBolt<String, Long> bolt;
    Tuple mockTuple1;
    DirectedGraph<Node, Edge> graph;
    Multimap<String, ProcessorNode> mockStreamToProcessors;
    KeyValueState<String, Long> mockKeyValueState;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple)Mockito.mock(Tuple.class);
        this.mockStreamToProcessors = (Multimap)Mockito.mock(Multimap.class);
        this.mockKeyValueState = (KeyValueState)Mockito.mock(KeyValueState.class);
        this.setUpMockTuples(this.mockTuple1);
    }

    @Test
    public void testEmitAndAck() throws Exception {
        this.setUpStatefulProcessorBolt((Processor<?>)new UpdateStateByKeyProcessor((StateUpdater)new StateUpdater<Object, Long>(){

            public Long init() {
                return 0L;
            }

            public Long apply(Long state, Object value) {
                return state + 1L;
            }
        }));
        this.bolt.execute(this.mockTuple1);
        ArgumentCaptor anchor = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor os = ArgumentCaptor.forClass(String.class);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector)).emit((String)os.capture(), (Collection)anchor.capture(), (List)values.capture());
        Assert.assertEquals((Object)"outputstream", (Object)os.getValue());
        Assert.assertArrayEquals((Object[])new Object[]{this.mockTuple1}, (Object[])((Collection)anchor.getValue()).toArray());
        Assert.assertEquals((Object)new Values(new Object[]{"k", 1L}), (Object)values.getValue());
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).ack(this.mockTuple1);
        ((KeyValueState)Mockito.verify(this.mockKeyValueState, (VerificationMode)Mockito.times((int)1))).put((Object)"k", (Object)1L);
    }

    private void setUpStatefulProcessorBolt(Processor<?> processor) {
        ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        node.setEmitsPair(true);
        Mockito.when((Object)this.mockStreamToProcessors.get((Object)Mockito.anyString())).thenReturn(Collections.singletonList(node));
        this.graph = new DefaultDirectedGraph((EdgeFactory)new StreamsEdgeFactory());
        this.graph.addVertex((Object)node);
        this.bolt = new StatefulProcessorBolt("bolt1", this.graph, Collections.singletonList(node));
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        this.bolt.initState(this.mockKeyValueState);
    }

    private void setUpMockTuples(Tuple ... tuples) {
        for (Tuple tuple : tuples) {
            Mockito.when((Object)tuple.size()).thenReturn((Object)1);
            Mockito.when((Object)tuple.getValue(0)).thenReturn((Object)Pair.of((Object)"k", (Object)"v"));
            Mockito.when((Object)tuple.getSourceComponent()).thenReturn((Object)"bolt0");
            Mockito.when((Object)tuple.getSourceStreamId()).thenReturn((Object)"inputstream");
        }
    }
}

