/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.redis.stream;

import com.redislabs.provider.redis.RedisConfig;
import com.redislabs.provider.redis.util.ConnectionUtils$;
import com.redislabs.provider.redis.util.Logging;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.redis.stream.RedisConsumerConfig;
import org.apache.spark.sql.redis.stream.RedisSourceOffsetRange;
import org.slf4j.Logger;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.math.Ordering;
import scala.math.Ordering$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\rb\u0001B\u0001\u0003\u0001=\u0011\u0011CU3eSN\u001cFO]3b[J+\u0017\rZ3s\u0015\t\u0019A!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000b\u0019\tQA]3eSNT!a\u0002\u0005\u0002\u0007M\fHN\u0003\u0002\n\u0015\u0005)1\u000f]1sW*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0019B\u0001\u0001\t\u0017GA\u0011\u0011\u0003F\u0007\u0002%)\t1#A\u0003tG\u0006d\u0017-\u0003\u0002\u0016%\t1\u0011I\\=SK\u001a\u0004\"aF\u0011\u000e\u0003aQ!!\u0007\u000e\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u000bmQ!\u0001H\u000f\u0002\u0011A\u0014xN^5eKJT!AH\u0010\u0002\u0013I,G-[:mC\n\u001c(\"\u0001\u0011\u0002\u0007\r|W.\u0003\u0002#1\t9Aj\\4hS:<\u0007CA\t%\u0013\t)#C\u0001\u0007TKJL\u0017\r\\5{C\ndW\r\u0003\u0005(\u0001\t\u0005\t\u0015!\u0003)\u0003-\u0011X\rZ5t\u0007>tg-[4\u0011\u0005%RS\"\u0001\u000e\n\u0005-R\"a\u0003*fI&\u001c8i\u001c8gS\u001eDQ!\f\u0001\u0005\u00029\na\u0001P5oSRtDCA\u00182!\t\u0001\u0004!D\u0001\u0003\u0011\u00159C\u00061\u0001)\u0011\u0015\u0019\u0004\u0001\"\u00015\u0003M)hN]3bIN#(/Z1n\u000b:$(/[3t)\t)D\u000bE\u00027}\u0005s!a\u000e\u001f\u000f\u0005aZT\"A\u001d\u000b\u0005ir\u0011A\u0002\u001fs_>$h(C\u0001\u0014\u0013\ti$#A\u0004qC\u000e\\\u0017mZ3\n\u0005}\u0002%\u0001C%uKJ\fGo\u001c:\u000b\u0005u\u0012\u0002C\u0001\"R\u001d\t\u0019uJ\u0004\u0002E\u001d:\u0011Q)\u0014\b\u0003\r2s!aR&\u000f\u0005!SeB\u0001\u001dJ\u0013\u0005i\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011QAB\u0005\u0003\u0007\u0011I!\u0001\u0015\u0002\u0002!I+G-[:T_V\u00148-\u001a+za\u0016\u001c\u0018B\u0001*T\u0005-\u0019FO]3b[\u0016sGO]=\u000b\u0005A\u0013\u0001\"B+3\u0001\u00041\u0016aC8gMN,GOU1oO\u0016\u0004\"\u0001M,\n\u0005a\u0013!A\u0006*fI&\u001c8k\\;sG\u0016|eMZ:fiJ\u000bgnZ3\t\u000bi\u0003A\u0011B.\u0002-I,\u0017\rZ*ue\u0016\fW.\u00128uef\u0014\u0015\r^2iKN$2\u0001X0a!\t\u0011U,\u0003\u0002_'\n\u00112\u000b\u001e:fC6,e\u000e\u001e:z\u0005\u0006$8\r[3t\u0011\u0015)\u0016\f1\u0001W\u0011\u0015\t\u0017\f1\u0001c\u0003A\u0019H/\u0019:u\u000b:$(/_(gMN,G\u000f\u0005\u0003dU6,hB\u00013i\u001b\u0005)'BA\rg\u0015\u00059\u0017\u0001\u00026bm\u0006L!![3\u0002\u00075\u000b\u0007/\u0003\u0002lY\n)QI\u001c;ss*\u0011\u0011.\u001a\t\u0003]Jt!a\u001c9\u0011\u0005a\u0012\u0012BA9\u0013\u0003\u0019\u0001&/\u001a3fM&\u00111\u000f\u001e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005E\u0014\u0002C\u0001<}\u001b\u00059(B\u0001=z\u0003\u0015QW\rZ5t\u0015\tQ80A\u0004dY&,g\u000e^:\u000b\u0003\u0015I!!`<\u0003\u001bM#(/Z1n\u000b:$(/_%E\u0011\u0019y\b\u0001\"\u0003\u0002\u0002\u0005\u0019b-\u001b7uKJ\u001cFO]3b[\u0016sGO]5fgR!\u00111AA\t)\r)\u0014Q\u0001\u0005\t\u0003\u000fqH\u00111\u0001\u0002\n\u0005a1\u000f\u001e:fC6<%o\\;qgB)\u0011#a\u0003\u0002\u0010%\u0019\u0011Q\u0002\n\u0003\u0011q\u0012\u0017P\\1nKz\u00022A\u000e ]\u0011\u0015)f\u00101\u0001W\u0011\u001d\t)\u0002\u0001C\u0005\u0003/\tAC\u001a7biR,gn\u0015;sK\u0006lWI\u001c;sS\u0016\u001cHcA\u001b\u0002\u001a!A\u00111DA\n\u0001\u0004\ti\"A\u0003f]R\u0014\u0018\u0010E\u0002C\u0003?I1!!\tT\u0005A\u0019FO]3b[\u0016sGO]=CCR\u001c\u0007\u000e")
public class RedisStreamReader
implements Logging,
scala.Serializable {
    private final RedisConfig redisConfig;
    private transient Logger com$redislabs$provider$redis$util$Logging$$_logger;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public Logger logger() {
        return Logging.logger$(this);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    @Override
    public Logger com$redislabs$provider$redis$util$Logging$$_logger() {
        return this.com$redislabs$provider$redis$util$Logging$$_logger;
    }

    @Override
    public void com$redislabs$provider$redis$util$Logging$$_logger_$eq(Logger x$1) {
        this.com$redislabs$provider$redis$util$Logging$$_logger = x$1;
    }

    /*
     * WARNING - void declaration
     */
    public Iterator<Tuple2<StreamEntryID, Map<String, String>>> unreadStreamEntries(RedisSourceOffsetRange offsetRange) {
        void var3_3;
        RedisConsumerConfig config = offsetRange.config();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reading entries "})).s((Seq)Nil$.MODULE$) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"[", ", ", ", ", ", start=", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{config.streamKey(), config.groupName(), config.consumerName(), offsetRange.start()})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"end=", "]... "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{offsetRange.end()})));
        Iterator<Tuple2<StreamEntryID, Map<String, String>>> res = this.filterStreamEntries(offsetRange, (Function0<Iterator<List<Map.Entry<String, List<StreamEntry>>>>>)(Function0 & Serializable & scala.Serializable)() -> {
            AbstractMap.SimpleEntry<String, StreamEntryID> startEntryOffset = new AbstractMap.SimpleEntry<String, StreamEntryID>(config.streamKey(), StreamEntryID.UNRECEIVED_ENTRY);
            return package$.MODULE$.Iterator().continually((Function0 & Serializable & scala.Serializable)() -> this.readStreamEntryBatches(offsetRange, startEntryOffset));
        });
        return var3_3;
    }

    private List<Map.Entry<String, List<StreamEntry>>> readStreamEntryBatches(RedisSourceOffsetRange offsetRange, Map.Entry<String, StreamEntryID> startEntryOffset) {
        RedisConsumerConfig config = offsetRange.config();
        return (List)ConnectionUtils$.MODULE$.withConnection(this.redisConfig.connectionForKey(config.streamKey()), (Function1 & Serializable & scala.Serializable)conn -> {
            boolean noAck = true;
            List response = conn.xreadGroup(config.groupName(), config.consumerName(), config.batchSize(), (long)config.block(), noAck, new Map.Entry[]{startEntryOffset});
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Got entries: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{response})));
            return response;
        });
    }

    private Iterator<Tuple2<StreamEntryID, Map<String, String>>> filterStreamEntries(RedisSourceOffsetRange offsetRange, Function0<Iterator<List<Map.Entry<String, List<StreamEntry>>>>> streamGroups) {
        StreamEntryID end = new StreamEntryID(offsetRange.end());
        return ((Iterator)streamGroups.apply()).takeWhile((Function1 & Serializable & scala.Serializable)response -> BoxesRunTime.boxToBoolean((boolean)RedisStreamReader.$anonfun$filterStreamEntries$1(response))).flatMap((Function1 & Serializable & scala.Serializable)response -> ((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(response).asScala()).iterator()).flatMap((Function1 & Serializable & scala.Serializable)streamEntry -> this.flattenStreamEntries((Map.Entry<String, List<StreamEntry>>)streamEntry)).takeWhile((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)RedisStreamReader.$anonfun$filterStreamEntries$4(end, x0$1)));
    }

    private Iterator<Tuple2<StreamEntryID, Map<String, String>>> flattenStreamEntries(Map.Entry<String, List<StreamEntry>> entry) {
        return ((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(entry.getValue()).asScala()).iterator().map((Function1 & Serializable & scala.Serializable)streamEntry -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)streamEntry.getID()), (Object)streamEntry.getFields()));
    }

    public static final /* synthetic */ boolean $anonfun$filterStreamEntries$1(List response) {
        return response != null && !response.isEmpty();
    }

    public static final /* synthetic */ boolean $anonfun$filterStreamEntries$4(StreamEntryID end$1, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        StreamEntryID entryId = (StreamEntryID)tuple2._1();
        boolean bl = Ordering.Implicits$.MODULE$.infixOrderingOps((Object)entryId, Ordering$.MODULE$.ordered((Function1)Predef$.MODULE$.$conforms())).$less$eq((Object)end$1);
        return bl;
    }

    public RedisStreamReader(RedisConfig redisConfig) {
        this.redisConfig = redisConfig;
        Logging.$init$(this);
    }
}

