/*
 * Decompiled with CFR 0.152.
 */
package io.kubernetes.client.extended.workqueue;

import io.kubernetes.client.extended.workqueue.DefaultWorkQueue;
import io.kubernetes.client.extended.workqueue.DelayingQueue;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class DefaultDelayingQueue<T>
extends DefaultWorkQueue<T>
implements DelayingQueue<T> {
    public static Duration heartBeatInterval = Duration.ofSeconds(10L);
    private DelayQueue<WaitForEntry<T>> delayQueue;
    private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
    protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
    private Supplier<Instant> timeSource = Instant::now;

    public DefaultDelayingQueue(ExecutorService waitingWorker) {
        this.delayQueue = new DelayQueue();
        this.waitingEntryByData = new ConcurrentHashMap<T, WaitForEntry<T>>();
        this.waitingForAddQueue = new LinkedBlockingQueue<WaitForEntry<T>>(1000);
        waitingWorker.submit(this::waitingLoop);
    }

    public DefaultDelayingQueue() {
        this(Executors.newSingleThreadExecutor());
    }

    @Override
    public void addAfter(T item, Duration duration) {
        if (super.isShuttingDown()) {
            return;
        }
        if (duration.isZero()) {
            super.add(item);
            return;
        }
        WaitForEntry entry = new WaitForEntry(item, duration.addTo(this.timeSource.get()), this.timeSource);
        this.waitingForAddQueue.offer(entry);
    }

    protected void injectTimeSource(Supplier<Instant> fn) {
        this.timeSource = fn;
    }

    private void waitingLoop() {
        try {
            while (true) {
                WaitForEntry<T> waitForEntry;
                if (super.isShuttingDown()) {
                    return;
                }
                WaitForEntry entry = (WaitForEntry)this.delayQueue.peek();
                Duration nextReadyAt = heartBeatInterval;
                if (entry != null) {
                    Instant now = this.timeSource.get();
                    if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
                        this.delayQueue.remove(entry);
                        super.add(entry.data);
                        this.waitingEntryByData.remove(entry.data);
                        continue;
                    }
                    nextReadyAt = Duration.between(now, entry.readyAtMillis);
                }
                if ((waitForEntry = this.waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS)) == null) continue;
                if (Duration.between(((WaitForEntry)waitForEntry).readyAtMillis, this.timeSource.get()).isNegative()) {
                    this.insert(this.delayQueue, this.waitingEntryByData, waitForEntry);
                    continue;
                }
                super.add(((WaitForEntry)waitForEntry).data);
            }
        }
        catch (InterruptedException interruptedException) {
            return;
        }
    }

    private void insert(DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
        WaitForEntry<T> existing = knownEntries.get(entry.data);
        if (existing != null) {
            if (Duration.between(((WaitForEntry)existing).readyAtMillis, entry.readyAtMillis).isNegative()) {
                q.remove(existing);
                ((WaitForEntry)existing).readyAtMillis = entry.readyAtMillis;
                q.add(existing);
            }
            return;
        }
        q.offer(entry);
        knownEntries.put(entry.data, entry);
    }

    private static class WaitForEntry<T>
    implements Delayed {
        private T data;
        private Temporal readyAtMillis;
        private Supplier<Instant> timeSource;

        private WaitForEntry(T data, Temporal readyAtMillis, Supplier<Instant> timeSource) {
            this.data = data;
            this.readyAtMillis = readyAtMillis;
            this.timeSource = timeSource;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            Duration duration = Duration.between(this.timeSource.get(), this.readyAtMillis);
            return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

