/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.grpc;

import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.grpc.GrpcMessageConverter;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class MessageHandlingHelper<T extends GeneratedMessageV3>
implements SmartLifecycle {
    private Log logger = LogFactory.getLog(MessageHandlingHelper.class);
    private final List<GrpcMessageConverter<?>> grpcConverters;
    private final FunctionProperties funcProperties;
    private final FunctionCatalog functionCatalog;
    private final ExecutorService executor;
    private boolean running;

    public MessageHandlingHelper(List<GrpcMessageConverter<?>> grpcConverters, FunctionCatalog functionCatalog, FunctionProperties funcProperties) {
        this.grpcConverters = grpcConverters;
        this.funcProperties = funcProperties;
        this.functionCatalog = functionCatalog;
        this.executor = Executors.newCachedThreadPool();
    }

    public void requestReply(T request, StreamObserver<T> responseObserver) {
        Message<byte[]> message = this.toSpringMessage((GeneratedMessageV3)request);
        SimpleFunctionRegistry.FunctionInvocationWrapper function = this.resolveFunction((Map<String, Object>)message.getHeaders());
        if (FunctionTypeUtils.isFlux((Type)function.getOutputType())) {
            String errorMessage = "Flux reply is not supported for `requestReply` mode";
            responseObserver.onError((Throwable)Status.UNKNOWN.withDescription(errorMessage).withCause((Throwable)new UnsupportedOperationException(errorMessage)).asRuntimeException());
            return;
        }
        Object replyMessage = function.apply(message);
        if (replyMessage instanceof Message) {
            Object reply2 = this.toGrpcMessage((Message<byte[]>)((Message)replyMessage), request.getClass());
            responseObserver.onNext(reply2);
            responseObserver.onCompleted();
        } else if (replyMessage instanceof Publisher && replyMessage instanceof Mono) {
            Mono.from((Publisher)((Publisher)replyMessage)).doOnNext(reply -> {
                Object replyGrps = this.toGrpcMessage((Message<byte[]>)((Message)reply), request.getClass());
                responseObserver.onNext(replyGrps);
                responseObserver.onCompleted();
            }).subscribe();
        }
    }

    public void serverStream(T request, StreamObserver<T> responseObserver) {
        Message<byte[]> message = this.toSpringMessage((GeneratedMessageV3)request);
        SimpleFunctionRegistry.FunctionInvocationWrapper function = this.resolveFunction((Map<String, Object>)message.getHeaders());
        Publisher replyStream = (Publisher)function.apply(message);
        Flux.from((Publisher)replyStream).doOnNext(replyMessage -> responseObserver.onNext(this.toGrpcMessage((Message<byte[]>)replyMessage, request.getClass()))).doOnComplete(() -> responseObserver.onCompleted()).subscribe();
    }

    public StreamObserver<T> clientStream(final StreamObserver<T> responseObserver, final Class<T> grpcMessageType) {
        final ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver)responseObserver;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        SimpleFunctionRegistry.FunctionInvocationWrapper function = this.resolveFunction(null);
        AtomicBoolean wasReady = new AtomicBoolean(false);
        serverCallStreamObserver.setOnReadyHandler(() -> {
            if (serverCallStreamObserver.isReady() && !wasReady.get()) {
                wasReady.set(true);
                this.logger.info((Object)"gRPC Server receiving stream is ready.");
                serverCallStreamObserver.request(1);
            }
        });
        if (!function.isInputTypePublisher()) {
            throw new UnsupportedOperationException("The client streaming is not supported for functions that accept non-Publisher: " + function);
        }
        if (function.isOutputTypePublisher()) {
            throw new UnsupportedOperationException("The client streaming is not supported for functions that return Publisher: " + function);
        }
        final Sinks.Many inputStream = Sinks.many().unicast().onBackpressureBuffer();
        Flux inputStreamFlux = inputStream.asFlux();
        final LinkedBlockingQueue resultRef = new LinkedBlockingQueue(1);
        this.executor.execute(() -> {
            Message replyMessage = (Message)function.apply((Object)inputStreamFlux);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Function invocation reply: " + replyMessage));
            }
            resultRef.offer(replyMessage);
        });
        return new StreamObserver<T>(){

            public void onNext(T inputMessage) {
                if (MessageHandlingHelper.this.logger.isDebugEnabled()) {
                    MessageHandlingHelper.this.logger.debug((Object)("gRPC Server receiving: " + inputMessage));
                }
                inputStream.tryEmitNext(MessageHandlingHelper.this.toSpringMessage((GeneratedMessageV3)inputMessage));
                serverCallStreamObserver.request(1);
            }

            public void onError(Throwable t) {
                t.printStackTrace();
                responseObserver.onError((Throwable)Status.UNKNOWN.withDescription("Error handling request").withCause(t).asRuntimeException());
            }

            public void onCompleted() {
                MessageHandlingHelper.this.logger.info((Object)"gRPC Server has finished receiving data.");
                inputStream.tryEmitComplete();
                try {
                    responseObserver.onNext(MessageHandlingHelper.this.toGrpcMessage((Message<byte[]>)((Message)resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)), grpcMessageType));
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    responseObserver.onCompleted();
                }
            }
        };
    }

    public StreamObserver<T> biStream(StreamObserver<T> responseObserver, Class<T> grpcMessageType) {
        ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver)responseObserver;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        SimpleFunctionRegistry.FunctionInvocationWrapper function = this.resolveFunction(null);
        AtomicBoolean wasReady = new AtomicBoolean(false);
        serverCallStreamObserver.setOnReadyHandler(() -> {
            if (serverCallStreamObserver.isReady() && !wasReady.get()) {
                wasReady.set(true);
                this.logger.info((Object)"gRPC Server receiving stream is ready.");
                serverCallStreamObserver.request(1);
            }
        });
        if (function.isInputTypePublisher()) {
            if (function.isOutputTypePublisher()) {
                return this.biStreamReactive(responseObserver, serverCallStreamObserver, grpcMessageType);
            }
            UnsupportedOperationException ex = new UnsupportedOperationException("The bi-directional streaming is not supported for functions that accept Publisher but return non-Publisher: " + function);
            responseObserver.onCompleted();
            throw ex;
        }
        if (!function.isOutputTypePublisher()) {
            return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady);
        }
        UnsupportedOperationException ex = new UnsupportedOperationException("The bidirection streaming is not supported for functions that accept non-Publisher but return Publisher: " + function);
        responseObserver.onCompleted();
        throw ex;
    }

    private StreamObserver<T> biStreamReactive(final StreamObserver<T> responseObserver, final ServerCallStreamObserver<T> serverCallStreamObserver, Class<T> grpcMessageType) {
        final Sinks.Many inputStream = Sinks.many().unicast().onBackpressureBuffer();
        Flux inputStreamFlux = inputStream.asFlux();
        SimpleFunctionRegistry.FunctionInvocationWrapper function = this.resolveFunction(null);
        Publisher outputPublisher = (Publisher)function.apply((Object)inputStreamFlux);
        Flux.from((Publisher)outputPublisher).subscribe(functionResult -> {
            T outputMessage = this.toGrpcMessage((Message<byte[]>)functionResult, grpcMessageType);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("gRPC Server replying: " + outputMessage));
            }
            responseObserver.onNext(outputMessage);
        });
        return new StreamObserver<T>(){

            public void onNext(T inputMessage) {
                if (MessageHandlingHelper.this.logger.isDebugEnabled()) {
                    MessageHandlingHelper.this.logger.debug((Object)("gRPC Server receiving: " + inputMessage));
                }
                inputStream.tryEmitNext(MessageHandlingHelper.this.toSpringMessage((GeneratedMessageV3)inputMessage));
                serverCallStreamObserver.request(1);
            }

            public void onError(Throwable t) {
                t.printStackTrace();
                inputStream.tryEmitComplete();
                responseObserver.onError((Throwable)Status.UNKNOWN.withDescription("Error handling request").withCause(t).asException());
            }

            public void onCompleted() {
                MessageHandlingHelper.this.logger.info((Object)"gRPC Server has finished receiving data.");
                inputStream.tryEmitComplete();
                responseObserver.onCompleted();
            }
        };
    }

    private StreamObserver<T> biStreamImperative(final StreamObserver<T> responseObserver, final ServerCallStreamObserver<T> serverCallStreamObserver, final AtomicBoolean wasReady) {
        return new StreamObserver<T>(){

            public void onNext(T request) {
                try {
                    Message<byte[]> message = MessageHandlingHelper.this.toSpringMessage((GeneratedMessageV3)request);
                    SimpleFunctionRegistry.FunctionInvocationWrapper function = MessageHandlingHelper.this.resolveFunction((Map<String, Object>)message.getHeaders());
                    Message replyMessage = (Message)function.apply(message);
                    Object reply = MessageHandlingHelper.this.toGrpcMessage((Message<byte[]>)replyMessage, request.getClass());
                    responseObserver.onNext(reply);
                    if (serverCallStreamObserver.isReady()) {
                        serverCallStreamObserver.request(1);
                    } else {
                        wasReady.set(false);
                    }
                }
                catch (Throwable throwable) {
                    throwable.printStackTrace();
                    responseObserver.onError((Throwable)Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
                }
            }

            public void onError(Throwable t) {
                t.printStackTrace();
                responseObserver.onCompleted();
            }

            public void onCompleted() {
                MessageHandlingHelper.this.logger.info((Object)"gRPC Server has finished receiving data.");
                responseObserver.onCompleted();
            }
        };
    }

    private T toGrpcMessage(Message<byte[]> request, Class<T> grpcClass) {
        for (GrpcMessageConverter<?> converter : this.grpcConverters) {
            Object grpcMessage = converter.fromSpringMessage(request, grpcClass);
            if (grpcMessage == null) continue;
            return (T)grpcMessage;
        }
        throw new IllegalStateException("Failed to convert Grpc Message to Spring Message: " + request);
    }

    public void start() {
        this.running = true;
    }

    public void stop() {
        this.executor.shutdown();
        try {
            Assert.isTrue((boolean)this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS), (String)"gRPC Server executor timed out while stopping, since there are currently executing tasks");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    private Message<byte[]> toSpringMessage(GeneratedMessageV3 request) {
        for (GrpcMessageConverter<?> converter : this.grpcConverters) {
            Message<byte[]> springMessage = converter.toSpringMessage(request);
            if (springMessage == null) continue;
            return springMessage;
        }
        throw new IllegalStateException("Failed to convert Grpc Message to Spring Message: " + request);
    }

    private SimpleFunctionRegistry.FunctionInvocationWrapper resolveFunction(Map<String, Object> headers) {
        String functionDefinition = this.funcProperties.getDefinition();
        if (!CollectionUtils.isEmpty(headers) && headers.containsKey("spring.cloud.function.definition")) {
            functionDefinition = (String)headers.get("spring.cloud.function.definition");
        }
        SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition, new String[]{"application/json"});
        Assert.notNull((Object)function, () -> "Failed to lookup function " + this.funcProperties.getDefinition());
        return function;
    }
}

