/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.base.Function;
import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1L;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;

    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor, long allowedLateness) {
        super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger, allowedLateness);
        this.evictor = Objects.requireNonNull(evictor);
        this.windowStateDescriptor = windowStateDescriptor;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        final Object key = this.getStateBackend().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<Window> mergingWindows = this.getMergingWindowSet();
            for (Window window : elementWindows) {
                Tuple1 mergeTriggerResult;
                Window actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>(mergeTriggerResult = new Tuple1((Object)TriggerResult.CONTINUE)){
                    final /* synthetic */ Tuple1 val$mergeTriggerResult;
                    {
                        this.val$mergeTriggerResult = tuple1;
                    }

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        EvictingWindowOperator.this.context.key = key;
                        EvictingWindowOperator.this.context.window = mergeResult;
                        this.val$mergeTriggerResult.f0 = EvictingWindowOperator.this.context.onMerge(mergedWindows);
                        for (Window m : mergedWindows) {
                            EvictingWindowOperator.this.context.window = m;
                            EvictingWindowOperator.this.context.clear();
                            EvictingWindowOperator.this.deleteCleanupTimer(m);
                        }
                        EvictingWindowOperator.this.getStateBackend().mergePartitionedStates(stateWindowResult, mergedStateWindows, EvictingWindowOperator.this.windowSerializer, EvictingWindowOperator.this.windowStateDescriptor);
                    }
                });
                if (this.isLate(actualWindow)) {
                    LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                Window stateWindow = mergingWindows.getStateWindow(actualWindow);
                ListState<StreamRecord<IN>> windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element);
                this.context.key = key;
                this.context.window = actualWindow;
                TriggerResult triggerResult = this.context.onElement(element);
                TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, (TriggerResult)((Object)mergeTriggerResult.f0));
                this.fireOrContinue(combinedTriggerResult, actualWindow, windowState);
                if (combinedTriggerResult.isPurge()) {
                    this.cleanup(actualWindow, windowState, mergingWindows);
                    continue;
                }
                this.registerCleanupTimer(actualWindow);
            }
        } else {
            for (Window window : elementWindows) {
                if (this.isLate(window)) {
                    LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
                    continue;
                }
                ListState<StreamRecord<IN>> windowState = this.getPartitionedState(window, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element);
                this.context.key = key;
                this.context.window = window;
                TriggerResult triggerResult = this.context.onElement(element);
                this.fireOrContinue(triggerResult, window, windowState);
                if (triggerResult.isPurge()) {
                    this.cleanup(window, windowState, null);
                    continue;
                }
                this.registerCleanupTimer(window);
            }
        }
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        boolean fire;
        do {
            WindowOperator.Timer timer;
            if ((timer = (WindowOperator.Timer)this.watermarkTimersQueue.peek()) != null && timer.timestamp <= mark.getTimestamp()) {
                ListState<StreamRecord<IN>> windowState;
                fire = true;
                this.watermarkTimers.remove(timer);
                this.watermarkTimersQueue.remove();
                this.context.key = timer.key;
                this.context.window = timer.window;
                this.setKeyContext(timer.key);
                MergingWindowSet mergingWindows = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindows = this.getMergingWindowSet();
                    Object stateWindow = mergingWindows.getStateWindow(this.context.window);
                    windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                } else {
                    windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                TriggerResult triggerResult = this.context.onEventTime(timer.timestamp);
                this.fireOrContinue(triggerResult, this.context.window, windowState);
                if (!triggerResult.isPurge() && (!this.windowAssigner.isEventTime() || !this.isCleanupTime(timer.window, timer.timestamp))) continue;
                this.cleanup(timer.window, windowState, mergingWindows);
                continue;
            }
            fire = false;
        } while (fire);
        this.output.emitWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    @Override
    public void trigger(long time) throws Exception {
        boolean fire;
        this.processingTimeTimerFutures.remove(time);
        this.processingTimeTimerTimestamps.remove(time, this.processingTimeTimerTimestamps.count(time));
        do {
            WindowOperator.Timer timer;
            if ((timer = (WindowOperator.Timer)this.processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
                ListState<StreamRecord<IN>> windowState;
                fire = true;
                this.processingTimeTimers.remove(timer);
                this.processingTimeTimersQueue.remove();
                this.context.key = timer.key;
                this.context.window = timer.window;
                this.setKeyContext(timer.key);
                MergingWindowSet mergingWindows = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindows = this.getMergingWindowSet();
                    Object stateWindow = mergingWindows.getStateWindow(this.context.window);
                    windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                } else {
                    windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                TriggerResult triggerResult = this.context.onProcessingTime(timer.timestamp);
                this.fireOrContinue(triggerResult, this.context.window, windowState);
                if (!triggerResult.isPurge() && (this.windowAssigner.isEventTime() || !this.isCleanupTime(timer.window, timer.timestamp))) continue;
                this.cleanup(timer.window, windowState, mergingWindows);
                continue;
            }
            fire = false;
        } while (fire);
    }

    private void fireOrContinue(TriggerResult triggerResult, W window, ListState<StreamRecord<IN>> windowState) throws Exception {
        if (!triggerResult.isFire()) {
            return;
        }
        this.timestampedCollector.setAbsoluteTimestamp(((Window)window).maxTimestamp());
        Iterable contents = (Iterable)windowState.get();
        int toEvict = this.evictor.evict(contents, Iterables.size(contents), this.context.window);
        FluentIterable projectedContents = FluentIterable.from(contents).skip(toEvict).transform(new Function<StreamRecord<IN>, IN>(){

            @Override
            public IN apply(StreamRecord<IN> input) {
                return input.getValue();
            }
        });
        ((InternalWindowFunction)this.userFunction).apply(this.context.key, this.context.window, projectedContents, this.timestampedCollector);
    }

    private void cleanup(W window, ListState<StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
        }
        this.context.clear();
        this.deleteCleanupTimer(window);
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }

    @Override
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}

