/*
 * Decompiled with CFR 0.152.
 */
package com.github.fppt.jedismock.operations.streams;

import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.datastructures.streams.SequencedMap;
import com.github.fppt.jedismock.datastructures.streams.SequencedMapForwardIterator;
import com.github.fppt.jedismock.datastructures.streams.StreamId;
import com.github.fppt.jedismock.exception.WrongStreamKeyException;
import com.github.fppt.jedismock.operations.AbstractRedisOperation;
import com.github.fppt.jedismock.operations.RedisCommand;
import com.github.fppt.jedismock.server.Response;
import com.github.fppt.jedismock.storage.OperationExecutorState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@RedisCommand(value="xread")
public class XRead
extends AbstractRedisOperation {
    private final Object lock;
    private final boolean isInTransaction;

    public XRead(OperationExecutorState state, List<Slice> params) {
        super(state.base(), params);
        this.lock = state.lock();
        this.isInTransaction = state.isTransactionModeOn();
    }

    @Override
    protected int minArgs() {
        return 3;
    }

    @Override
    protected Slice response() {
        int count;
        int streamInd = 0;
        long blockTimeNanosec = 0L;
        boolean isBlocking = false;
        if ("count".equalsIgnoreCase(this.params().get(streamInd).toString())) {
            count = Integer.parseInt(this.params().get(++streamInd).toString());
            ++streamInd;
        } else {
            count = Integer.MAX_VALUE;
        }
        if ("block".equalsIgnoreCase(this.params().get(streamInd).toString())) {
            blockTimeNanosec = Long.parseLong(this.params().get(++streamInd).toString()) * 1000000L;
            isBlocking = true;
            if (blockTimeNanosec < 0L) {
                return Response.error("ERR timeout is negative");
            }
        }
        int n = ++streamInd;
        ++streamInd;
        if (!"streams".equalsIgnoreCase(this.params().get(n).toString())) {
            return Response.error("ERR syntax error");
        }
        if ((this.params().size() - streamInd) % 2 != 0) {
            return Response.error("ERR Unbalanced 'xread' list of streams: for each stream key an ID or '$' must be specified");
        }
        int streamsCount = (this.params().size() - streamInd) / 2;
        SequencedMap<Slice, StreamId> mapKeyToBeginEntryId = new SequencedMap<Slice, StreamId>();
        for (int i = 0; i < streamsCount; ++i) {
            Slice key2 = this.params().get(streamInd + i);
            Slice id2 = this.params().get(streamInd + streamsCount + i);
            try {
                if (!this.base().exists(key2)) {
                    mapKeyToBeginEntryId.append(key2, "$".equalsIgnoreCase(id2.toString()) ? new StreamId(0L, 1L) : new StreamId(id2));
                    continue;
                }
                mapKeyToBeginEntryId.append(key2, "$".equalsIgnoreCase(id2.toString()) ? this.getStreamFromBaseOrCreateEmpty(key2).getStoredData().getTail() : new StreamId(id2));
                continue;
            }
            catch (WrongStreamKeyException e) {
                return Response.error(e.getMessage());
            }
        }
        ArrayList<Slice> output = new ArrayList<Slice>();
        long waitEnd = System.nanoTime() + blockTimeNanosec;
        if (isBlocking) {
            if (blockTimeNanosec > 0L) {
                try {
                    long waitTimeNanos;
                    while (!this.isInTransaction && (waitTimeNanos = waitEnd - System.nanoTime()) >= 0L) {
                        this.lock.wait(waitTimeNanos / 1000000L, (int)(waitTimeNanos % 1000000L));
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Response.NULL;
                }
            }
            boolean updated = false;
            try {
                while (!this.isInTransaction && !updated) {
                    for (Map.Entry entry : mapKeyToBeginEntryId) {
                        if (!this.base().exists((Slice)entry.getKey()) || this.getStreamFromBaseOrCreateEmpty((Slice)entry.getKey()).getStoredData().getTail().compareTo((StreamId)entry.getValue()) <= 0) continue;
                        updated = true;
                        break;
                    }
                    this.lock.wait(500L, 0);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Response.NULL;
            }
        }
        mapKeyToBeginEntryId.forEach((key, id) -> {
            SequencedMap<StreamId, SequencedMap<Slice, Slice>> map = this.getStreamFromBaseOrCreateEmpty((Slice)key).getStoredData();
            if (!this.base().exists((Slice)key)) {
                return;
            }
            if (map.getTail() == null || id.compareTo(map.getTail()) >= 0) {
                return;
            }
            try {
                id = id.increment();
            }
            catch (WrongStreamKeyException e) {
                return;
            }
            SequencedMapForwardIterator<StreamId, SequencedMap<Slice, Slice>> it = map.iterator((StreamId)id);
            ArrayList<Slice> data = new ArrayList<Slice>();
            int addedEntries = 1;
            while (it.hasNext() && addedEntries++ <= count) {
                Object entry = it.next();
                ArrayList<Slice> values = new ArrayList<Slice>();
                ((SequencedMap)entry.getValue()).forEach((k, v) -> {
                    values.add(Response.bulkString(k));
                    values.add(Response.bulkString(v));
                });
                data.add(Response.array(Arrays.asList(Response.bulkString(((StreamId)entry.getKey()).toSlice()), Response.array(values))));
            }
            output.add(Response.array(Arrays.asList(Response.bulkString(key), Response.array(data))));
        });
        return Response.array(output);
    }
}

