/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.metadata.CompositeMetadata;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.rsocket.DefaultRSocketRequester;
import org.springframework.messaging.rsocket.PayloadUtils;
import org.springframework.messaging.rsocket.RSocketMessageHandler;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

class MessagingRSocket
extends AbstractRSocket {
    private final RSocketMessageHandler messageHandler;
    private final RouteMatcher routeMatcher;
    private final RSocketRequester requester;
    private final MimeType dataMimeType;
    private final MimeType metadataMimeType;
    private final DataBufferFactory bufferFactory;

    MessagingRSocket(RSocketMessageHandler messageHandler, RouteMatcher routeMatcher, RSocketRequester requester, MimeType dataMimeType, MimeType metadataMimeType, DataBufferFactory bufferFactory) {
        Assert.notNull((Object)messageHandler, (String)"'messageHandler' is required");
        Assert.notNull((Object)routeMatcher, (String)"'routeMatcher' is required");
        Assert.notNull((Object)requester, (String)"'requester' is required");
        Assert.notNull((Object)requester, (String)"'dataMimeType' is required");
        Assert.notNull((Object)requester, (String)"'metadataMimeType' is required");
        Assert.isTrue((boolean)DefaultRSocketRequester.METADATA_MIME_TYPES.contains(metadataMimeType), () -> "Unexpected metadatata mime type: '" + metadataMimeType + "'");
        this.messageHandler = messageHandler;
        this.routeMatcher = routeMatcher;
        this.requester = requester;
        this.dataMimeType = dataMimeType;
        this.metadataMimeType = metadataMimeType;
        this.bufferFactory = bufferFactory;
    }

    public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
        payload.retain();
        return this.handle((Payload)payload);
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return this.handle(payload);
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return this.handleAndReply(payload, (Flux<Payload>)Flux.just((Object)payload)).next();
    }

    public Flux<Payload> requestStream(Payload payload) {
        return this.handleAndReply(payload, (Flux<Payload>)Flux.just((Object)payload));
    }

    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> {
            Payload firstPayload = (Payload)signal.get();
            return firstPayload == null ? innerFlux : this.handleAndReply(firstPayload, (Flux<Payload>)innerFlux);
        });
    }

    public Mono<Void> metadataPush(Payload payload) {
        return this.handle(payload);
    }

    private Mono<Void> handle(Payload payload) {
        String destination = this.getDestination(payload);
        MessageHeaders headers = this.createHeaders(destination, null);
        DataBuffer dataBuffer = this.retainDataAndReleasePayload(payload);
        int refCount = this.refCount(dataBuffer);
        Message<DataBuffer> message = MessageBuilder.createMessage(dataBuffer, headers);
        return Mono.defer(() -> this.messageHandler.handleMessage(message)).doFinally(s -> {
            if (this.refCount(dataBuffer) == refCount) {
                DataBufferUtils.release((DataBuffer)dataBuffer);
            }
        });
    }

    private int refCount(DataBuffer dataBuffer) {
        return dataBuffer instanceof NettyDataBuffer ? ((NettyDataBuffer)dataBuffer).getNativeBuffer().refCnt() : 1;
    }

    private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payloads) {
        MonoProcessor replyMono = MonoProcessor.create();
        String destination = this.getDestination(firstPayload);
        MessageHeaders headers = this.createHeaders(destination, replyMono);
        AtomicBoolean read = new AtomicBoolean();
        Flux buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
        Message<Flux> message = MessageBuilder.createMessage(buffers, headers);
        return Mono.defer(() -> this.messageHandler.handleMessage(message)).doFinally(s -> {
            if (!read.get()) {
                buffers.subscribe(DataBufferUtils::release);
            }
        }).thenMany((Publisher)Flux.defer(() -> replyMono.isTerminated() ? replyMono.flatMapMany(Function.identity()) : Mono.error((Throwable)new IllegalStateException("Something went wrong: reply Mono not set"))));
    }

    private String getDestination(Payload payload) {
        if (this.metadataMimeType.equals((Object)DefaultRSocketRequester.COMPOSITE_METADATA)) {
            CompositeMetadata metadata = new CompositeMetadata(payload.metadata(), false);
            for (CompositeMetadata.Entry entry : metadata) {
                String mimeType = entry.getMimeType();
                if (!DefaultRSocketRequester.ROUTING.toString().equals(mimeType)) continue;
                return entry.getContent().toString(StandardCharsets.UTF_8);
            }
            return "";
        }
        if (this.metadataMimeType.equals((Object)DefaultRSocketRequester.ROUTING)) {
            return payload.getMetadataUtf8();
        }
        throw new IllegalArgumentException("Unexpected metadata MimeType");
    }

    private DataBuffer retainDataAndReleasePayload(Payload payload) {
        return PayloadUtils.retainDataAndReleasePayload(payload, this.bufferFactory);
    }

    private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) {
        MessageHeaderAccessor headers = new MessageHeaderAccessor();
        headers.setLeaveMutable(true);
        RouteMatcher.Route route = this.routeMatcher.parseRoute(destination);
        headers.setHeader("lookupDestination", route);
        headers.setContentType(this.dataMimeType);
        headers.setHeader("rsocketRequester", this.requester);
        if (replyMono != null) {
            headers.setHeader("rsocketResponse", replyMono);
        }
        headers.setHeader("dataBufferFactory", this.bufferFactory);
        return headers.getMessageHeaders();
    }
}

