/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.spout;

import java.util.HashMap;
import java.util.List;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CheckpointSpoutTest {
    CheckpointSpout spout = new CheckpointSpout();
    TopologyContext mockTopologyContext;
    SpoutOutputCollector mockOutputCollector;

    @Before
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)this.mockTopologyContext.getThisComponentId()).thenReturn((Object)"test");
        Mockito.when((Object)this.mockTopologyContext.getThisTaskId()).thenReturn((Object)1);
        this.mockOutputCollector = (SpoutOutputCollector)Mockito.mock(SpoutOutputCollector.class);
    }

    @Test
    public void testInitState() throws Exception {
        this.spout.open(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        this.spout.nextTuple();
        Values expectedTuple = new Values(new Object[]{-1L, CheckPointState.Action.INITSTATE});
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector)).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)-1L, (Object)msgId.getValue());
        this.spout.ack((Object)-1L);
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector)).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        expectedTuple = new Values(new Object[]{-1L, CheckPointState.Action.INITSTATE});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)-1L, (Object)msgId.getValue());
    }

    @Test
    public void testPrepare() throws Exception {
        this.spout.open(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack((Object)-1L);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)2))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{0L, CheckPointState.Action.PREPARE});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)0L, (Object)msgId.getValue());
    }

    @Test
    public void testPrepareWithFail() throws Exception {
        HashMap topoConf = new HashMap();
        KeyValueState state = (KeyValueState)StateFactory.getState((String)"__state", topoConf, (TopologyContext)this.mockTopologyContext);
        CheckPointState txState = new CheckPointState(-1L, CheckPointState.State.COMMITTED);
        state.put((Object)"__state", (Object)txState);
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack((Object)-1L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.ack((Object)0L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.ack((Object)0L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.fail((Object)1L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.fail((Object)1L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.ack((Object)1L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.ack((Object)0L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)8))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{1L, CheckPointState.Action.PREPARE});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)1L, (Object)msgId.getValue());
    }

    @Test
    public void testCommit() throws Exception {
        HashMap<String, Integer> topoConf = new HashMap<String, Integer>();
        topoConf.put("topology.state.checkpoint.interval.ms", 0);
        this.spout.open(topoConf, this.mockTopologyContext, this.mockOutputCollector);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack((Object)-1L);
        this.spout.nextTuple();
        this.spout.ack((Object)0L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        this.spout.fail((Object)0L);
        Utils.sleep((long)10L);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)4))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{0L, CheckPointState.Action.COMMIT});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)0L, (Object)msgId.getValue());
    }

    @Test
    public void testRecoveryRollback() throws Exception {
        HashMap topoConf = new HashMap();
        KeyValueState state = (KeyValueState)StateFactory.getState((String)"test-1", topoConf, (TopologyContext)this.mockTopologyContext);
        CheckPointState checkPointState = new CheckPointState(100L, CheckPointState.State.PREPARING);
        state.put((Object)"__state", (Object)checkPointState);
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{100L, CheckPointState.Action.ROLLBACK});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)100L, (Object)msgId.getValue());
    }

    @Test
    public void testRecoveryRollbackAck() throws Exception {
        HashMap topoConf = new HashMap();
        KeyValueState state = (KeyValueState)StateFactory.getState((String)"test-1", topoConf, (TopologyContext)this.mockTopologyContext);
        CheckPointState checkPointState = new CheckPointState(100L, CheckPointState.State.PREPARING);
        state.put((Object)"__state", (Object)checkPointState);
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack((Object)100L);
        this.spout.nextTuple();
        this.spout.ack((Object)99L);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)3))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{100L, CheckPointState.Action.PREPARE});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)100L, (Object)msgId.getValue());
    }

    @Test
    public void testRecoveryCommit() throws Exception {
        HashMap topoConf = new HashMap();
        KeyValueState state = (KeyValueState)StateFactory.getState((String)"test-1", topoConf, (TopologyContext)this.mockTopologyContext);
        CheckPointState checkPointState = new CheckPointState(100L, CheckPointState.State.COMMITTING);
        state.put((Object)"__state", (Object)checkPointState);
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor stream = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor values = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor msgId = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector)Mockito.verify((Object)this.mockOutputCollector, (VerificationMode)Mockito.times((int)1))).emit((String)stream.capture(), (List)values.capture(), msgId.capture());
        Values expectedTuple = new Values(new Object[]{100L, CheckPointState.Action.COMMIT});
        Assert.assertEquals((Object)"$checkpoint", (Object)stream.getValue());
        Assert.assertEquals((Object)expectedTuple, (Object)values.getValue());
        Assert.assertEquals((Object)100L, (Object)msgId.getValue());
    }
}

