/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.util.Assert;
import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.fn.BiFunction;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.GroupedStream;

public class WindowingOffsetManager
implements OffsetManager,
InitializingBean,
DisposableBean {
    private static final BiFunction<Long, Long, Long> maxFunction;
    private static final Function<PartitionAndOffset, Long> offsetFunction;
    private static final ComputeMaximumOffsetByPartitionFunction findHighestOffsetInPartitionGroup;
    private static final Function<PartitionAndOffset, Partition> getPartitionFunction;
    private static final FindHighestOffsetsByPartitionFunction findHighestOffsetsByPartition;
    private final Consumer<PartitionAndOffset> delegateUpdateOffset = new Consumer<PartitionAndOffset>(){

        public void accept(PartitionAndOffset partitionAndOffset) {
            WindowingOffsetManager.this.delegate.updateOffset(partitionAndOffset.getPartition(), partitionAndOffset.getOffset());
        }
    };
    private final Consumer<Void> offsetComplete = new Consumer<Void>(){

        public void accept(Void aVoid) {
            WindowingOffsetManager.this.createOffsetsStream();
        }
    };
    private final ReadWriteLock offsetsLock = new ReentrantReadWriteLock();
    private final OffsetManager delegate;
    private long timespan = 10000L;
    private int count = Integer.MAX_VALUE;
    private int shutdownTimeout = 2000;
    private volatile RingBufferProcessor<PartitionAndOffset> offsets;
    private volatile boolean closed;

    public WindowingOffsetManager(OffsetManager offsetManager) {
        this.delegate = offsetManager;
    }

    public void setTimespan(long timespan) {
        Assert.isTrue((timespan >= 0L ? 1 : 0) != 0, (String)"Timespan must be a positive value");
        this.timespan = timespan;
    }

    public void setCount(int count) {
        Assert.isTrue((count >= 0 ? 1 : 0) != 0, (String)"Count must be a positive value");
        this.count = count;
    }

    public void setShutdownTimeout(int shutdownTimeout) {
        this.shutdownTimeout = shutdownTimeout;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.count != 1) {
            this.createOffsetsStream();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createOffsetsStream() {
        if (!this.closed) {
            this.offsetsLock.writeLock().lock();
            try {
                this.offsets = RingBufferProcessor.share((String)"spring-integration-kafka-offset", (int)1024);
            }
            finally {
                this.offsetsLock.writeLock().unlock();
            }
            Streams.wrap(this.offsets).window(this.count, this.timespan, TimeUnit.MILLISECONDS).flatMap((Function)findHighestOffsetsByPartition).consume(this.delegateUpdateOffset, null, this.offsetComplete);
        }
    }

    public void destroy() throws Exception {
        this.flush();
        this.close();
        if (this.delegate instanceof DisposableBean) {
            ((DisposableBean)this.delegate).destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateOffset(Partition partition, long offset) {
        if (this.offsets != null) {
            this.offsetsLock.readLock().lock();
            try {
                this.offsets.onNext((Object)new PartitionAndOffset(partition, offset));
            }
            finally {
                this.offsetsLock.readLock().unlock();
            }
        } else {
            this.delegate.updateOffset(partition, offset);
        }
    }

    @Override
    public long getOffset(Partition partition) {
        this.doFlush();
        return this.delegate.getOffset(partition);
    }

    @Override
    public void deleteOffset(Partition partition) {
        this.doFlush();
        this.delegate.deleteOffset(partition);
    }

    @Override
    public void resetOffsets(Collection<Partition> partition) {
        this.doFlush();
        this.delegate.resetOffsets(partition);
    }

    @Override
    public void close() throws IOException {
        this.closed = true;
        this.delegate.close();
    }

    @Override
    public void flush() throws IOException {
        if (this.offsets != null) {
            this.offsets.awaitAndShutdown((long)this.shutdownTimeout, TimeUnit.MILLISECONDS);
        }
        this.delegate.flush();
    }

    private void doFlush() {
        try {
            this.flush();
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static {
        Environment.initializeIfEmpty();
        maxFunction = new BiFunction<Long, Long, Long>(){

            public Long apply(Long aLong, Long bLong) {
                return Math.max(aLong, bLong);
            }
        };
        offsetFunction = new Function<PartitionAndOffset, Long>(){

            public Long apply(PartitionAndOffset partitionAndOffset) {
                return partitionAndOffset.getOffset();
            }
        };
        findHighestOffsetInPartitionGroup = new ComputeMaximumOffsetByPartitionFunction();
        getPartitionFunction = new Function<PartitionAndOffset, Partition>(){

            public Partition apply(PartitionAndOffset partitionAndOffset) {
                return partitionAndOffset.getPartition();
            }
        };
        findHighestOffsetsByPartition = new FindHighestOffsetsByPartitionFunction();
    }

    private static class FindHighestOffsetsByPartitionFunction
    implements Function<Stream<PartitionAndOffset>, Stream<PartitionAndOffset>> {
        private FindHighestOffsetsByPartitionFunction() {
        }

        public Stream<PartitionAndOffset> apply(Stream<PartitionAndOffset> windowBuffer) {
            return windowBuffer.groupBy(getPartitionFunction).flatMap((Function)findHighestOffsetInPartitionGroup);
        }
    }

    private static class ComputeMaximumOffsetByPartitionFunction
    implements Function<GroupedStream<Partition, PartitionAndOffset>, Stream<PartitionAndOffset>> {
        private ComputeMaximumOffsetByPartitionFunction() {
        }

        public Stream<PartitionAndOffset> apply(final GroupedStream<Partition, PartitionAndOffset> group) {
            return group.map(offsetFunction).reduce(maxFunction).map((Function)new Function<Long, PartitionAndOffset>(){

                public PartitionAndOffset apply(Long offset) {
                    return new PartitionAndOffset((Partition)group.key(), offset);
                }
            });
        }
    }

    private static class PartitionAndOffset {
        private final Partition partition;
        private final Long offset;

        public PartitionAndOffset(Partition partition, Long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        public Partition getPartition() {
            return this.partition;
        }

        public Long getOffset() {
            return this.offset;
        }

        public String toString() {
            return "PartitionAndOffset{partition=" + this.partition + ", offset=" + this.offset + '}';
        }
    }
}

