/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.grpc.runtime;

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.runtime.StreamCollector;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import java.util.function.Consumer;
import java.util.function.Function;

public class ServerCalls {
    private static StreamCollector streamCollector = StreamCollector.NO_OP;

    private ServerCalls() {
    }

    public static <I, O> void oneToOne(I request, final StreamObserver<O> response, Function<I, Uni<O>> implementation) {
        try {
            Uni<O> uni = implementation.apply(request);
            uni.subscribe().with(new Consumer<O>(){

                @Override
                public void accept(O item) {
                    response.onNext(item);
                    response.onCompleted();
                }
            }, (Consumer)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable failure) {
                    response.onError(ServerCalls.toStatusFailure(failure));
                }
            });
        }
        catch (Throwable throwable) {
            response.onError(ServerCalls.toStatusFailure(throwable));
        }
    }

    public static <I, O> void oneToMany(I request, final StreamObserver<O> response, Function<I, Multi<O>> implementation) {
        try {
            streamCollector.add(response);
            implementation.apply(request).subscribe().with(new Consumer<O>(){

                @Override
                public void accept(O v) {
                    response.onNext(v);
                }
            }, (Consumer)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    ServerCalls.onError(response, throwable);
                }
            }, new Runnable(){

                @Override
                public void run() {
                    ServerCalls.onCompleted(response);
                }
            });
        }
        catch (Throwable throwable) {
            ServerCalls.onError(response, ServerCalls.toStatusFailure(throwable));
        }
    }

    public static <I, O> StreamObserver<I> manyToOne(final StreamObserver<O> response, Function<Multi<I>, Uni<O>> implementation) {
        try {
            UnicastProcessor input = UnicastProcessor.create();
            StreamObserver<I> pump = ServerCalls.getStreamObserverFeedingProcessor(input);
            streamCollector.add(response);
            Uni<O> uni = implementation.apply((Multi<I>)input);
            uni.subscribe().with(new Consumer<O>(){

                @Override
                public void accept(O item) {
                    response.onNext(item);
                    ServerCalls.onCompleted(response);
                }
            }, (Consumer)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable failure) {
                    ServerCalls.onError(response, ServerCalls.toStatusFailure(failure));
                }
            });
            return pump;
        }
        catch (Throwable throwable) {
            response.onError(ServerCalls.toStatusFailure(throwable));
            return null;
        }
    }

    public static <I, O> StreamObserver<I> manyToMany(final StreamObserver<O> response, Function<Multi<I>, Multi<O>> implementation) {
        try {
            streamCollector.add(response);
            UnicastProcessor input = UnicastProcessor.create();
            StreamObserver<I> pump = ServerCalls.getStreamObserverFeedingProcessor(input);
            Multi<O> uni = implementation.apply((Multi<I>)input);
            uni.subscribe().with(new Consumer<O>(){

                @Override
                public void accept(O v) {
                    response.onNext(v);
                }
            }, (Consumer)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable failure) {
                    ServerCalls.onError(response, ServerCalls.toStatusFailure(failure));
                }
            }, new Runnable(){

                @Override
                public void run() {
                    ServerCalls.onCompleted(response);
                }
            });
            return pump;
        }
        catch (Throwable throwable) {
            ServerCalls.onError(response, ServerCalls.toStatusFailure(throwable));
            return null;
        }
    }

    private static <O> void onCompleted(StreamObserver<O> response) {
        response.onCompleted();
        streamCollector.remove(response);
    }

    private static <O> void onError(StreamObserver<O> response, Throwable error) {
        response.onError(error);
        streamCollector.remove(response);
    }

    private static <I> StreamObserver<I> getStreamObserverFeedingProcessor(final UnicastProcessor<I> input) {
        StreamObserver result = new StreamObserver<I>(){

            public void onNext(I i) {
                input.onNext(i);
            }

            public void onError(Throwable throwable) {
                input.onError(throwable);
                streamCollector.remove(this);
            }

            public void onCompleted() {
                input.onComplete();
                streamCollector.remove(this);
            }
        };
        streamCollector.add(result);
        return result;
    }

    private static Throwable toStatusFailure(Throwable throwable) {
        if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
            return throwable;
        }
        String desc = throwable.getClass().getName();
        if (throwable.getMessage() != null) {
            desc = desc + " - " + throwable.getMessage();
        }
        if (throwable instanceof IllegalArgumentException) {
            return Status.INVALID_ARGUMENT.withDescription(desc).asException();
        }
        return Status.fromThrowable((Throwable)throwable).withDescription(desc).asException();
    }

    public static void setStreamCollector(StreamCollector collector) {
        streamCollector = collector;
    }

    public static StreamCollector getStreamCollector() {
        return streamCollector;
    }
}

