/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IStatefulWindowedBolt;
import org.apache.storm.topology.StatefulWindowedBoltExecutor;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import org.apache.storm.windowing.TupleWindowImpl;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class StatefulWindowedBoltExecutorTest {
    StatefulWindowedBoltExecutor<KeyValueState<String, String>> executor;
    IStatefulWindowedBolt<KeyValueState<String, String>> mockBolt;
    OutputCollector mockOutputCollector;
    TopologyContext mockTopologyContext;
    Map<String, Object> mockStormConf = new HashMap<String, Object>();

    @Before
    public void setUp() throws Exception {
        this.mockBolt = (IStatefulWindowedBolt)Mockito.mock(IStatefulWindowedBolt.class);
        this.mockTopologyContext = (TopologyContext)Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector)Mockito.mock(OutputCollector.class);
        this.executor = new StatefulWindowedBoltExecutor(this.mockBolt);
    }

    @Test(expected=IllegalArgumentException.class)
    public void testPrepare() throws Exception {
        this.executor.prepare(this.mockStormConf, this.mockTopologyContext, this.mockOutputCollector);
    }

    @Test
    public void testPrepareWithMsgid() throws Exception {
        this.mockStormConf.put("topology.bolts.message.id.field.name", "msgid");
        this.mockStormConf.put("topology.bolts.window.length.count", 5);
        this.mockStormConf.put("topology.bolts.window.sliding.interval.count", 5);
        this.executor.prepare(this.mockStormConf, this.mockTopologyContext, this.mockOutputCollector);
    }

    @Test
    public void testExecute() throws Exception {
        this.mockStormConf.put("topology.bolts.message.id.field.name", "msgid");
        this.mockStormConf.put("topology.bolts.window.length.count", 5);
        this.mockStormConf.put("topology.bolts.window.sliding.interval.count", 5);
        KeyValueState mockState = (KeyValueState)Mockito.mock(KeyValueState.class);
        this.executor.prepare(this.mockStormConf, this.mockTopologyContext, this.mockOutputCollector, mockState);
        this.executor.initState(null);
        List<Tuple> tuples = this.getMockTuples(5);
        for (Tuple tuple : tuples) {
            this.executor.execute(tuple);
        }
        ((IStatefulWindowedBolt)Mockito.verify(this.mockBolt, (VerificationMode)Mockito.times((int)1))).execute(this.getTupleWindow(tuples));
        StatefulWindowedBoltExecutor.WindowState expectedState = new StatefulWindowedBoltExecutor.WindowState(Long.MIN_VALUE, 4L);
        ((KeyValueState)Mockito.verify((Object)mockState, (VerificationMode)Mockito.times((int)1))).put(Mockito.any(StatefulWindowedBoltExecutor.TaskStream.class), Mockito.eq((Object)expectedState));
    }

    @Test
    public void testRecovery() throws Exception {
        this.mockStormConf.put("topology.bolts.message.id.field.name", "msgid");
        this.mockStormConf.put("topology.bolts.window.length.count", 5);
        this.mockStormConf.put("topology.bolts.window.sliding.interval.count", 5);
        KeyValueState mockState = (KeyValueState)Mockito.mock(KeyValueState.class);
        Map mockMap = (Map)Mockito.mock(Map.class);
        Mockito.when((Object)this.mockTopologyContext.getThisSources()).thenReturn((Object)mockMap);
        Mockito.when((Object)this.mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
        Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
        StatefulWindowedBoltExecutor.WindowState mockWindowState = new StatefulWindowedBoltExecutor.WindowState(4L, 4L);
        Mockito.when((Object)mockState.get(Mockito.any(StatefulWindowedBoltExecutor.TaskStream.class))).thenReturn((Object)mockWindowState);
        this.executor.prepare(this.mockStormConf, this.mockTopologyContext, this.mockOutputCollector, mockState);
        this.executor.initState(null);
        List<Tuple> tuples = this.getMockTuples(10);
        for (Tuple tuple : tuples) {
            this.executor.execute(tuple);
        }
        StatefulWindowedBoltExecutor.WindowState expectedState = new StatefulWindowedBoltExecutor.WindowState(4L, 9L);
        ((KeyValueState)Mockito.verify((Object)mockState, (VerificationMode)Mockito.times((int)1))).put(Mockito.any(StatefulWindowedBoltExecutor.TaskStream.class), Mockito.eq((Object)expectedState));
    }

    private TupleWindow getTupleWindow(List<Tuple> tuples) {
        return new TupleWindowImpl(tuples, tuples, Collections.emptyList());
    }

    private List<Tuple> getMockTuples(int count) {
        ArrayList<Tuple> mockTuples = new ArrayList<Tuple>();
        for (long i = 0L; i < (long)count; ++i) {
            Tuple mockTuple = (Tuple)Mockito.mock(Tuple.class);
            Mockito.when((Object)mockTuple.getLongByField("msgid")).thenReturn((Object)i);
            Mockito.when((Object)mockTuple.getSourceTask()).thenReturn((Object)1);
            Mockito.when((Object)mockTuple.getSourceGlobalStreamId()).thenReturn((Object)new GlobalStreamId("a", "s"));
            mockTuples.add(mockTuple);
        }
        return mockTuples;
    }
}

