/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.State;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IStatefulBolt;
import org.apache.storm.topology.StatefulBoltExecutor;
import org.apache.storm.tuple.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class StatefulBoltExecutorTest {
    StatefulBoltExecutor<KeyValueState<String, String>> executor;
    IStatefulBolt<KeyValueState<String, String>> mockBolt;
    TopologyContext mockTopologyContext;
    Tuple mockTuple;
    Tuple mockCheckpointTuple;
    Map<String, Object> mockStormConf = new HashMap<String, Object>();
    OutputCollector mockOutputCollector;
    State mockState;
    Map<GlobalStreamId, Grouping> mockGlobalStream;
    Set<GlobalStreamId> mockStreamIds;

    @Before
    public void setUp() throws Exception {
        this.mockBolt = (IStatefulBolt)Mockito.mock(IStatefulBolt.class);
        this.executor = new StatefulBoltExecutor(this.mockBolt);
        GlobalStreamId mockGlobalStreamId = (GlobalStreamId)Mockito.mock(GlobalStreamId.class);
        Mockito.when((Object)mockGlobalStreamId.get_streamId()).thenReturn((Object)"$checkpoint");
        this.mockStreamIds = new HashSet<GlobalStreamId>();
        this.mockStreamIds.add(mockGlobalStreamId);
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        this.mockGlobalStream = (Map)Mockito.mock(Map.class);
        this.mockState = (State)Mockito.mock(State.class);
        Mockito.when((Object)this.mockTopologyContext.getThisComponentId()).thenReturn((Object)"test");
        Mockito.when((Object)this.mockTopologyContext.getThisTaskId()).thenReturn((Object)1);
        Mockito.when((Object)this.mockTopologyContext.getThisSources()).thenReturn(this.mockGlobalStream);
        Mockito.when((Object)this.mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
        Mockito.when(this.mockGlobalStream.keySet()).thenReturn(this.mockStreamIds);
        this.mockTuple = (Tuple)Mockito.mock(Tuple.class);
        this.mockCheckpointTuple = (Tuple)Mockito.mock(Tuple.class);
        this.executor.prepare(this.mockStormConf, this.mockTopologyContext, this.mockOutputCollector, this.mockState);
    }

    @Test
    public void testHandleTupleBeforeInit() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        this.executor.execute(this.mockTuple);
        ((IStatefulBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)0))).execute((Tuple)Mockito.any(Tuple.class));
    }

    @Test
    public void testHandleTuple() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.INITSTATE);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(0L));
        ((OutputCollector)Mockito.doNothing().when((Object)this.mockOutputCollector)).ack(this.mockCheckpointTuple);
        this.executor.execute(this.mockCheckpointTuple);
        ((IStatefulBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)1))).execute(this.mockTuple);
        ((IStatefulBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)1))).initState((State)Mockito.any(KeyValueState.class));
    }

    @Test
    public void testRollback() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.ROLLBACK);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(0L));
        ((OutputCollector)Mockito.doNothing().when((Object)this.mockOutputCollector)).ack(this.mockCheckpointTuple);
        this.executor.execute(this.mockCheckpointTuple);
        ((State)Mockito.verify((Object)this.mockState, (VerificationMode)Mockito.times((int)1))).rollback();
    }

    @Test
    public void testCommit() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.COMMIT);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(0L));
        ((OutputCollector)Mockito.doNothing().when((Object)this.mockOutputCollector)).ack(this.mockCheckpointTuple);
        this.executor.execute(this.mockCheckpointTuple);
        ((IStatefulBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)1))).preCommit(new Long(0L).longValue());
        ((State)Mockito.verify((Object)this.mockState, (VerificationMode)Mockito.times((int)1))).commit(new Long(0L).longValue());
    }

    @Test
    public void testPrepareAndRollbackBeforeInitstate() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.PREPARE);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        this.executor.execute(this.mockCheckpointTuple);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).fail(this.mockCheckpointTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.ROLLBACK);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        ((OutputCollector)Mockito.doNothing().when((Object)this.mockOutputCollector)).ack(this.mockCheckpointTuple);
        this.executor.execute(this.mockCheckpointTuple);
        ((State)Mockito.verify((Object)this.mockState, (VerificationMode)Mockito.times((int)1))).rollback();
    }

    @Test
    public void testCommitBeforeInitstate() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.COMMIT);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        this.executor.execute(this.mockCheckpointTuple);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).ack(this.mockCheckpointTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.ROLLBACK);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        this.executor.execute(this.mockCheckpointTuple);
        ((State)Mockito.verify((Object)this.mockState, (VerificationMode)Mockito.times((int)1))).rollback();
    }

    @Test
    public void testPrepareAndCommit() throws Exception {
        Mockito.when((Object)this.mockTuple.getSourceStreamId()).thenReturn((Object)"default");
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.INITSTATE);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(0L));
        this.executor.execute(this.mockCheckpointTuple);
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getSourceStreamId()).thenReturn((Object)"$checkpoint");
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.PREPARE);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        this.executor.execute(this.mockCheckpointTuple);
        this.executor.execute(this.mockTuple);
        Mockito.when((Object)this.mockCheckpointTuple.getValueByField("action")).thenReturn((Object)CheckPointState.Action.COMMIT);
        Mockito.when((Object)this.mockCheckpointTuple.getLongByField("txid")).thenReturn((Object)new Long(100L));
        this.executor.execute(this.mockCheckpointTuple);
        this.mockOutputCollector.ack(this.mockTuple);
        ((State)Mockito.verify((Object)this.mockState, (VerificationMode)Mockito.times((int)1))).commit(new Long(100L).longValue());
        ((IStatefulBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)2))).execute(this.mockTuple);
        ((OutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).ack(this.mockTuple);
    }
}

