/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.asyncprocessing.AsyncFutureImpl;
import org.apache.flink.core.asyncprocessing.CompletedAsyncFuture;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.core.state.InternalStateIterator;

/**
 * Static helpers for creating already-completed {@link StateFuture}s, for combining several
 * futures into one, and for draining a {@link StateIterator} into an {@link Iterable}.
 */
@Experimental
public class StateFutureUtils {

    /**
     * Creates a future that is already completed with a {@code null} result.
     *
     * @param <V> the nominal result type of the returned future
     * @return a completed future holding {@code null}
     */
    public static <V> StateFuture<V> completedVoidFuture() {
        return new CompletedAsyncFuture<>(null);
    }

    /**
     * Creates a future that is already completed with the given result.
     *
     * @param result the value the returned future holds; passed through unchanged
     * @param <V> the result type
     * @return a completed future holding {@code result}
     */
    public static <V> StateFuture<V> completedFuture(V result) {
        return new CompletedAsyncFuture<>(result);
    }

    /**
     * Combines several futures into a single future of all their results.
     *
     * <p>The results appear in the iteration order of {@code futures}. An empty input yields a
     * completed future holding an empty list. For two or more futures the result collection is a
     * fixed-size list view over an internal array ({@link Arrays#asList}).
     *
     * <p>When there are two or more futures, every element is cast to {@link InternalAsyncFuture};
     * an element of any other implementation causes a {@link ClassCastException}.
     *
     * @param futures the futures to combine
     * @param <T> the common result type
     * @return a future holding the results of all given futures
     */
    public static <T> StateFuture<Collection<T>> combineAll(
            Collection<? extends StateFuture<? extends T>> futures) {
        int count = futures.size();
        if (count == 0) {
            return new CompletedAsyncFuture<>(Collections.emptyList());
        }
        if (count == 1) {
            // Exactly one element: wrap its result in a singleton list, no bookkeeping needed.
            StateFuture<? extends T> firstFuture = futures.iterator().next();
            return firstFuture.thenCompose(
                    t -> StateFutureUtils.completedFuture(Collections.singletonList(t)));
        }

        // Generic arrays cannot be created directly; the array never escapes except through
        // Arrays.asList below, so the unchecked cast is safe.
        @SuppressWarnings("unchecked")
        final T[] results = (T[]) new Object[count];

        // Look for the first future of type AsyncFutureImpl; the combined future is derived from
        // it via makeNewFuture().
        AsyncFutureImpl<? extends T> pendingFuture = null;
        for (StateFuture<? extends T> future : futures) {
            if (future instanceof AsyncFutureImpl) {
                pendingFuture = (AsyncFutureImpl<? extends T>) future;
                break;
            }
        }

        if (pendingFuture == null) {
            // No AsyncFutureImpl among the inputs. The result is returned as already completed,
            // so this path relies on thenSyncAccept running each callback before it returns
            // (presumably because all inputs are completed futures; verify against
            // InternalAsyncFuture implementations).
            int position = 0;
            for (StateFuture<? extends T> future : futures) {
                final int index = position++;
                ((InternalAsyncFuture<? extends T>) future)
                        .thenSyncAccept(
                                t -> {
                                    results[index] = t;
                                });
            }
            return new CompletedAsyncFuture<>(Arrays.asList(results));
        }

        int position = 0;
        final AtomicInteger countDown = new AtomicInteger(count);
        final AsyncFutureImpl<Collection<T>> ret = pendingFuture.makeNewFuture();
        for (StateFuture<? extends T> future : futures) {
            final int index = position++;
            ((InternalAsyncFuture<? extends T>) future)
                    .thenSyncAccept(
                            t -> {
                                results[index] = t;
                                // The callback that brings the counter to zero is the last one
                                // to store its result, so it publishes the combined result.
                                if (countDown.decrementAndGet() == 0) {
                                    ret.complete(Arrays.asList(results));
                                }
                            });
        }
        return ret;
    }

    /**
     * Converts a future of a {@link StateIterator} into a future of an {@link Iterable} holding
     * the iterator's elements.
     *
     * <p>A {@code null} iterator yields an empty list. The iterator is cast to
     * {@link InternalStateIterator}. If {@code hasNextLoading()} reports {@code false}, the
     * iterator's current cache is returned directly; otherwise the elements delivered through
     * {@code onNext} are collected into a new list.
     *
     * @param future a future of the iterator to drain
     * @param <T> the element type
     * @return a future holding the iterator's elements
     */
    public static <T> StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>> future) {
        return future.thenCompose(
                iterator -> {
                    if (iterator == null) {
                        return StateFutureUtils.completedFuture(Collections.emptyList());
                    }
                    InternalStateIterator<T> theIterator = (InternalStateIterator<T>) iterator;
                    if (!theIterator.hasNextLoading()) {
                        return StateFutureUtils.completedFuture(theIterator.getCurrentCache());
                    }
                    final ArrayList<T> result = new ArrayList<>();
                    // Block-bodied lambda: explicitly discards the boolean returned by add().
                    // NOTE(review): presumably this also selects the consumer overload of
                    // onNext over a function overload; confirm against StateIterator.
                    return theIterator
                            .onNext(
                                    next -> {
                                        result.add(next);
                                    })
                            .thenApply(ignored -> result);
                });
    }
}

