/*
 * Decompiled with CFR 0.152.
 */
package com.redislabs.provider.redis.streaming;

import com.redislabs.provider.redis.ReadWriteConfig;
import com.redislabs.provider.redis.RedisConfig;
import com.redislabs.provider.redis.streaming.ConsumerConfig;
import com.redislabs.provider.redis.streaming.Earliest$;
import com.redislabs.provider.redis.streaming.IdOffset;
import com.redislabs.provider.redis.streaming.ItemId;
import com.redislabs.provider.redis.streaming.Latest$;
import com.redislabs.provider.redis.streaming.Offset;
import com.redislabs.provider.redis.streaming.StreamItem;
import com.redislabs.provider.redis.util.Logging;
import com.redislabs.provider.redis.util.PipelineUtils$;
import com.redislabs.provider.redis.util.StreamUtils$;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.sparkproject.guava.util.concurrent.RateLimiter;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005]d\u0001B\u0001\u0003\u00015\u00111CU3eSN\u001cFO]3b[J+7-Z5wKJT!a\u0001\u0003\u0002\u0013M$(/Z1nS:<'BA\u0003\u0007\u0003\u0015\u0011X\rZ5t\u0015\t9\u0001\"\u0001\u0005qe>4\u0018\u000eZ3s\u0015\tI!\"A\u0005sK\u0012L7\u000f\\1cg*\t1\"A\u0002d_6\u001c\u0001aE\u0002\u0001\u001d}\u00012aD\r\u001c\u001b\u0005\u0001\"BA\t\u0013\u0003!\u0011XmY3jm\u0016\u0014(BA\u0002\u0014\u0015\t!R#A\u0003ta\u0006\u00148N\u0003\u0002\u0017/\u00051\u0011\r]1dQ\u0016T\u0011\u0001G\u0001\u0004_J<\u0017B\u0001\u000e\u0011\u0005!\u0011VmY3jm\u0016\u0014\bC\u0001\u000f\u001e\u001b\u0005\u0011\u0011B\u0001\u0010\u0003\u0005)\u0019FO]3b[&#X-\u001c\t\u0003A\rj\u0011!\t\u0006\u0003E\u0011\tA!\u001e;jY&\u0011A%\t\u0002\b\u0019><w-\u001b8h\u0011!1\u0003A!A!\u0002\u00139\u0013aD2p]N,X.\u001a:t\u0007>tg-[4\u0011\u0007!\u0012TG\u0004\u0002*_9\u0011!&L\u0007\u0002W)\u0011A\u0006D\u0001\u0007yI|w\u000e\u001e \n\u00039\nQa]2bY\u0006L!\u0001M\u0019\u0002\u000fA\f7m[1hK*\ta&\u0003\u00024i\t\u00191+Z9\u000b\u0005A\n\u0004C\u0001\u000f7\u0013\t9$A\u0001\bD_:\u001cX/\\3s\u0007>tg-[4\t\u0011e\u0002!\u0011!Q\u0001\ni\n1B]3eSN\u001cuN\u001c4jOB\u00111\bP\u0007\u0002\t%\u0011Q\b\u0002\u0002\f%\u0016$\u0017n]\"p]\u001aLw\r\u0003\u0005@\u0001\t\u0005\t\u0015!\u0003A\u0003=\u0011X-\u00193Xe&$XmQ8oM&<\u0007CA\u001eB\u0013\t\u0011EAA\bSK\u0006$wK]5uK\u000e{gNZ5h\u0011%!\u0005A!A!\u0002\u0013)5*\u0001\u0007ti>\u0014\u0018mZ3MKZ,G\u000e\u0005\u0002G\u00136\tqI\u0003\u0002I'\u000591\u000f^8sC\u001e,\u0017B\u0001&H\u00051\u0019Fo\u001c:bO\u0016dUM^3m\u0013\t!\u0015\u0004C\u0003N\u0001\u0011\u0005a*\u0001\u0004=S:LGO\u0010\u000b\u0006\u001fB\u000b&k\u0015\t\u00039\u0001AQA\n'A\u0002\u001dBQ!\u000f'A\u0002iBQa\u0010'A\u0002\u0001CQ\u0001\u0012'A\u0002\u0015CQ!\u0016\u0001\u0005BY\u000bqa\u001c8Ti\u0006\u0014H\u000fF\u0001X!\tA\u0016,D\u00012\u0013\tQ\u0016G\u0001\u0003V]&$\b\"\u0002/\u0001\t\u00032\u0016AB8o'R|\u0007O\u0002\u0003_\u0001\u0011y&AD'fgN\fw-\u001a%b]\u0012dWM]\n\u0004;\u0002D\u0007CA1g\u001b\u0005\u0011'BA2e\u0003\u0011a\u0017M\\4\u000b\u0003\u0015\fAA[1wC&\u0011qM\u0019\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005\u0005L\u0017B\u00016c\u0005!\u0011VO\u001c8bE2,\u0007\u0002\u00037^\u0005\u0003\u0005\u000b\u0011B\u001b\u0002\t\r|gN\u001a\u0005\tsu\u0013\t\u0011)A\u0005u!Aq(\u0018BC\u0002\u0013\rq.F\u0001A\u0011!\tXL!A!\u0002\u0013\u0001\u0015\u0001\u0005:fC\u0012<&/\u001b;f\u0007>tg-[4!\u0011\u0015iU\f\"\u0001t)\u0011!ho\u001e=\u0011\u0005UlV\"\u0001\u0001\t\u000b1\u0014\b\u0019A\u001b\t\u000be\u0012\b\u0019\u0001\u001e\t\u000b}\u0012\b9\u0001!\t\u000fil&\u0019!C\u0001w\u0006)!.\u001a3jgV\tA\u0010E\u0002~\u0003\u000bi\u0011A \u0006\u0003u~TA!!\u0001\u0002\u0004\u000591\r\\5f]R\u001c(\"A\u0003\n\u0007\u0005\u001daPA\u0003KK\u0012L7\u000fC\u0004\u0002\fu\u0003\u000b\u0011\u0002?\u0002\r),G-[:!\u0011%\ty!\u0018b\u0001\n\u0003\t\t\"\u0001\bsCR,G*[7ji\u0016\u0014x\n\u001d;\u0016\u0005\u0005M\u0001#\u0002-\u0002\u0016\u0005e\u0011bAA\fc\t1q\n\u001d;j_:\u0004B!a\u0007\u0002,5\u0011\u0011Q\u0004\u0006\u0005\u0003?\t\t#\u0001\u0006d_:\u001cWO\u001d:f]RT1AIA\u0012\u0015\u0011\t)#a\n\u0002\u000b\u001d,\u0018M^1\u000b\u0007\u0005%r#\u0001\u0007ta\u0006\u00148\u000e\u001d:pU\u0016\u001cG/\u0003\u0003\u0002.\u0005u!a\u0003*bi\u0016d\u0015.\\5uKJD\u0001\"!\r^A\u0003%\u00111C\u0001\u0010e\u0006$X\rT5nSR,'o\u00149uA!1\u0011QG/\u0005BY\u000b1A];o\u0011\u0019\tI$\u0018C\u0001-\u0006i2M]3bi\u0016\u001cuN\\:v[\u0016\u0014xI]8va&3gj\u001c;Fq&\u001cH\u000f\u0003\u0004\u0002>u#\tAV\u0001\u0016e\u0016\u001cW-\u001b<f+:\f7m\u001b8po2,GmZ3e\u0011\u0019\t\t%\u0018C\u0001-\u0006\u0011\"/Z2fSZ,g*Z<NKN\u001c\u0018mZ3t\u0011\u001d\t)%\u0018C\u0001\u0003\u000f\n1b\u001d;pe\u0016\fe\u000eZ!dWR)q+!\u0013\u0002^!A\u00111JA\"\u0001\u0004\ti%A\u0005tiJ,\u0017-\\&fsB!\u0011qJA,\u001d\u0011\t\t&a\u0015\u0011\u0005)\n\u0014bAA+c\u00051\u0001K]3eK\u001aLA!!\u0017\u0002\\\t11\u000b\u001e:j]\u001eT1!!\u00162\u0011!\ty&a\u0011A\u0002\u0005\u0005\u0014aB3oiJLWm\u001d\t\u0005QI\n\u0019\u0007E\u0002~\u0003KJ1!a\u001a\u007f\u0005-\u0019FO]3b[\u0016sGO]=\t\u000f\u0005-T\f\"\u0001\u0002n\u0005qQM\u001c;sS\u0016\u001cHk\\%uK6\u001cHCBA8\u0003c\n)\bE\u0002)emA\u0001\"a\u001d\u0002j\u0001\u0007\u0011QJ\u0001\u0004W\u0016L\b\u0002CA0\u0003S\u0002\r!!\u0019")
public class RedisStreamReceiver
extends Receiver<StreamItem>
implements Logging {
    private final Seq<ConsumerConfig> consumersConfig;
    private final RedisConfig redisConfig;
    private final ReadWriteConfig readWriteConfig;
    private transient Logger com$redislabs$provider$redis$util$Logging$$_logger;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public Logger logger() {
        return Logging.logger$(this);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    @Override
    public Logger com$redislabs$provider$redis$util$Logging$$_logger() {
        return this.com$redislabs$provider$redis$util$Logging$$_logger;
    }

    @Override
    public void com$redislabs$provider$redis$util$Logging$$_logger_$eq(Logger x$1) {
        this.com$redislabs$provider$redis$util$Logging$$_logger = x$1;
    }

    public void onStart() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting Redis Stream Receiver");
        ExecutorService executorPool = ThreadUtils.newFixedThreadPool((int)this.consumersConfig.size(), (String)"RedisStreamMessageHandler");
        try {
            this.consumersConfig.foreach((Function1 & Serializable & scala.Serializable)c -> executorPool.submit(new MessageHandler(this, (ConsumerConfig)c, $this.redisConfig, $this.readWriteConfig)));
        }
        finally {
            executorPool.shutdown();
        }
    }

    public void onStop() {
    }

    public RedisStreamReceiver(Seq<ConsumerConfig> consumersConfig, RedisConfig redisConfig, ReadWriteConfig readWriteConfig, StorageLevel storageLevel) {
        this.consumersConfig = consumersConfig;
        this.redisConfig = redisConfig;
        this.readWriteConfig = readWriteConfig;
        super(storageLevel);
        Logging.$init$(this);
    }

    public class MessageHandler
    implements Runnable {
        private final ConsumerConfig conf;
        private final ReadWriteConfig readWriteConfig;
        private final Jedis jedis;
        private final Option<RateLimiter> rateLimiterOpt;
        public final /* synthetic */ RedisStreamReceiver $outer;

        public ReadWriteConfig readWriteConfig() {
            return this.readWriteConfig;
        }

        public Jedis jedis() {
            return this.jedis;
        }

        public Option<RateLimiter> rateLimiterOpt() {
            return this.rateLimiterOpt;
        }

        @Override
        public void run() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting MessageHandler ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.conf})));
            try {
                this.createConsumerGroupIfNotExist();
                this.receiveUnacknowledged();
                this.receiveNewMessages();
            }
            catch (Exception e) {
                this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().restart("Error handling message. Restarting.", e);
            }
        }

        public void createConsumerGroupIfNotExist() {
            StreamEntryID streamEntryID;
            Offset offset = this.conf.offset();
            if (Earliest$.MODULE$.equals(offset)) {
                streamEntryID = new StreamEntryID(0L, 0L);
            } else if (Latest$.MODULE$.equals(offset)) {
                streamEntryID = StreamEntryID.LAST_ENTRY;
            } else if (offset instanceof IdOffset) {
                IdOffset idOffset = (IdOffset)offset;
                long v1 = idOffset.v1();
                long v2 = idOffset.v2();
                streamEntryID = new StreamEntryID(v1, v2);
            } else {
                throw new MatchError((Object)offset);
            }
            StreamEntryID entryId = streamEntryID;
            StreamUtils$.MODULE$.createConsumerGroupIfNotExist(this.jedis(), this.conf.streamKey(), this.conf.groupName(), entryId);
        }

        public void receiveUnacknowledged() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting receiving unacknowledged messages for key ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.conf.streamKey()})));
            boolean bl = true;
            AbstractMap.SimpleEntry<String, StreamEntryID> unackId = new AbstractMap.SimpleEntry<String, StreamEntryID>(this.conf.streamKey(), new StreamEntryID(0L, 0L));
            while (!this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().isStopped() && bl) {
                List response = this.jedis().xreadGroup(this.conf.groupName(), this.conf.consumerName(), this.conf.batchSize(), this.conf.block(), false, new Map.Entry[]{unackId});
                Map unackMessagesMap = ((TraversableOnce)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(response).map((Function1 & Serializable & scala.Serializable)e -> new Tuple2(e.getKey(), e.getValue()), Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                List entries = (List)unackMessagesMap.apply((Object)this.conf.streamKey());
                if (entries.isEmpty()) {
                    bl = false;
                }
                this.storeAndAck(this.conf.streamKey(), (Seq<StreamEntry>)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(entries));
            }
        }

        public void receiveNewMessages() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting receiving new messages for key ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.conf.streamKey()})));
            AbstractMap.SimpleEntry<String, StreamEntryID> newMessId = new AbstractMap.SimpleEntry<String, StreamEntryID>(this.conf.streamKey(), StreamEntryID.UNRECEIVED_ENTRY);
            while (!this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().isStopped()) {
                List response = this.jedis().xreadGroup(this.conf.groupName(), this.conf.consumerName(), this.conf.batchSize(), this.conf.block(), false, new Map.Entry[]{newMessId});
                if (response == null) continue;
                JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(response).foreach((Function1 & Serializable & scala.Serializable)streamMessages -> {
                    MessageHandler.$anonfun$receiveNewMessages$2(this, streamMessages);
                    return BoxedUnit.UNIT;
                });
            }
        }

        public void storeAndAck(String streamKey, Seq<StreamEntry> entries) {
            block0: {
                if (!entries.nonEmpty()) break block0;
                this.rateLimiterOpt().foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                    x$1.acquire(entries.size());
                    return BoxedUnit.UNIT;
                });
                Seq<StreamItem> streamItems = this.entriesToItems(streamKey, entries);
                this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().store(streamItems.iterator());
                PipelineUtils$.MODULE$.foreachWithPipeline(this.jedis(), entries, (Function2 & Serializable & scala.Serializable)(pipeline, entry) -> {
                    pipeline.xack(streamKey, this.conf.groupName(), new StreamEntryID[]{entry.getID()});
                    return BoxedUnit.UNIT;
                }, this.readWriteConfig());
            }
        }

        public Seq<StreamItem> entriesToItems(String key, Seq<StreamEntry> entries) {
            return (Seq)entries.map((Function1 & Serializable & scala.Serializable)e -> {
                ItemId itemId = new ItemId(e.getID().getTime(), e.getID().getSequence());
                return new StreamItem(key, itemId, (Map<String, String>)JavaConversions$.MODULE$.deprecated$u0020mapAsScalaMap(e.getFields()).toMap(Predef$.MODULE$.$conforms()));
            }, Seq$.MODULE$.canBuildFrom());
        }

        public /* synthetic */ RedisStreamReceiver com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ RateLimiter $anonfun$rateLimiterOpt$1(int r) {
            return RateLimiter.create((double)r);
        }

        public static final /* synthetic */ void $anonfun$receiveNewMessages$2(MessageHandler $this, Map.Entry streamMessages) {
            String key = (String)streamMessages.getKey();
            List entries = (List)streamMessages.getValue();
            $this.storeAndAck(key, (Seq<StreamEntry>)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(entries));
        }

        public MessageHandler(RedisStreamReceiver $outer, ConsumerConfig conf, RedisConfig redisConfig, ReadWriteConfig readWriteConfig) {
            this.conf = conf;
            this.readWriteConfig = readWriteConfig;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            this.jedis = redisConfig.connectionForKey(conf.streamKey());
            this.rateLimiterOpt = conf.rateLimitPerConsumer().map((Function1 & Serializable & scala.Serializable)r -> MessageHandler.$anonfun$rateLimiterOpt$1(BoxesRunTime.unboxToInt((Object)r)));
        }
    }
}

