/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.elasticsearch.client.reactive;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.data.elasticsearch.client.reactive.RawActionResponse;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.RequestBodyEncodingException;
import org.springframework.data.elasticsearch.client.reactive.WebClientProvider;
import org.springframework.data.elasticsearch.client.util.RequestConverters;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

public class DefaultReactiveElasticsearchClient
implements ReactiveElasticsearchClient {
    private final HostProvider hostProvider;

    public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
        Assert.notNull((Object)hostProvider, (String)"HostProvider must not be null");
        this.hostProvider = hostProvider;
    }

    public static ReactiveElasticsearchClient create(HttpHeaders headers, String ... hosts) {
        Assert.notNull((Object)headers, (String)"HttpHeaders must not be null");
        Assert.notEmpty((Object[])hosts, (String)"Elasticsearch Cluster needs to consist of at least one host");
        ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(hosts).withDefaultHeaders(headers).build();
        return DefaultReactiveElasticsearchClient.create(clientConfiguration);
    }

    public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) {
        Assert.notNull((Object)clientConfiguration, (String)"ClientConfiguration must not be null");
        WebClientProvider provider = DefaultReactiveElasticsearchClient.getWebClientProvider(clientConfiguration);
        HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0]));
        return new DefaultReactiveElasticsearchClient(hostProvider);
    }

    private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
        Duration connectTimeout = clientConfiguration.getConnectTimeout();
        Duration soTimeout = clientConfiguration.getSocketTimeout();
        TcpClient tcpClient = TcpClient.create();
        if (!connectTimeout.isNegative()) {
            tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)Math.toIntExact(connectTimeout.toMillis()));
        }
        if (!soTimeout.isNegative()) {
            tcpClient = tcpClient.doOnConnected(connection -> connection.addHandlerLast((ChannelHandler)new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)).addHandlerLast((ChannelHandler)new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
        }
        String scheme = "http";
        HttpClient httpClient = HttpClient.from((TcpClient)tcpClient);
        if (clientConfiguration.useSsl()) {
            httpClient = httpClient.secure(sslConfig -> {
                Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
                sslContext.ifPresent(it -> sslConfig.sslContext((SslContext)new JdkSslContext(it, true, ClientAuth.NONE)));
            });
            scheme = "https";
        }
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        WebClientProvider provider = WebClientProvider.create(scheme, (ClientHttpConnector)connector);
        return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders());
    }

    @Override
    public Mono<Boolean> ping(HttpHeaders headers) {
        return this.sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers).map(response -> response.statusCode().is2xxSuccessful()).onErrorResume(NoReachableHostException.class, error -> Mono.just((Object)false)).next();
    }

    @Override
    public Mono<MainResponse> info(HttpHeaders headers) {
        return this.sendRequest(new MainRequest(), RequestCreator.info(), MainResponse.class, headers).next();
    }

    @Override
    public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
        return this.sendRequest(getRequest, RequestCreator.get(), GetResponse.class, headers).filter(GetResponse::isExists).map(DefaultReactiveElasticsearchClient::getResponseToGetResult).next();
    }

    @Override
    public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
        return this.sendRequest(multiGetRequest, RequestCreator.multiGet(), MultiGetResponse.class, headers).map(MultiGetResponse::getResponses).flatMap(Flux::fromArray).filter(it -> !it.isFailed() && it.getResponse().isExists()).map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
    }

    @Override
    public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
        return this.sendRequest(getRequest, RequestCreator.exists(), RawActionResponse.class, headers).map(response -> response.statusCode().is2xxSuccessful()).next();
    }

    @Override
    public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
        return this.sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext();
    }

    @Override
    public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
        return this.sendRequest(updateRequest, RequestCreator.update(), UpdateResponse.class, headers).publishNext();
    }

    @Override
    public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
        return this.sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers).publishNext();
    }

    @Override
    public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
        return this.sendRequest(searchRequest, RequestCreator.search(), SearchResponse.class, headers).map(SearchResponse::getHits).flatMap(Flux::fromIterable);
    }

    @Override
    public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
        TimeValue scrollTimeout;
        TimeValue timeValue = scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive() : TimeValue.timeValueMinutes((long)1L);
        if (searchRequest.scroll() == null) {
            searchRequest.scroll(scrollTimeout);
        }
        EmitterProcessor outbound = EmitterProcessor.create((boolean)false);
        FluxSink request = outbound.sink();
        EmitterProcessor inbound = EmitterProcessor.create((boolean)false);
        Flux exchange = outbound.startWith((Object[])new ActionRequest[]{searchRequest}).flatMap(it -> {
            if (it instanceof SearchRequest) {
                return this.sendRequest((SearchRequest)it, RequestCreator.search(), SearchResponse.class, headers);
            }
            if (it instanceof SearchScrollRequest) {
                return this.sendRequest((SearchScrollRequest)it, RequestCreator.scroll(), SearchResponse.class, headers);
            }
            if (it instanceof ClearScrollRequest) {
                return this.sendRequest((ClearScrollRequest)it, RequestCreator.clearScroll(), ClearScrollResponse.class, headers).flatMap(discard -> Flux.empty());
            }
            throw new IllegalArgumentException(String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
        });
        return Flux.usingWhen((Publisher)Mono.fromSupplier(() -> new ScrollState()), scrollState -> {
            Flux searchHits = inbound.handle((searchResponse, sink) -> {
                scrollState.updateScrollId(searchResponse.getScrollId());
                if (DefaultReactiveElasticsearchClient.isEmpty(searchResponse.getHits())) {
                    inbound.onComplete();
                    outbound.onComplete();
                } else {
                    sink.next(searchResponse);
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId()).scroll(scrollTimeout);
                    request.next((Object)searchScrollRequest);
                }
            }).map(SearchResponse::getHits).flatMap(Flux::fromIterable);
            return searchHits.doOnSubscribe(ignore -> exchange.subscribe((CoreSubscriber)inbound));
        }, state -> this.cleanupScroll(headers, (ScrollState)state), state -> this.cleanupScroll(headers, (ScrollState)state), state -> this.cleanupScroll(headers, (ScrollState)state));
    }

    private static boolean isEmpty(@Nullable SearchHits hits) {
        return hits != null && hits.getHits() != null && hits.getHits().length == 0;
    }

    private Publisher<?> cleanupScroll(HttpHeaders headers, ScrollState state) {
        if (state.getScrollIds().isEmpty()) {
            return Mono.empty();
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.scrollIds(state.getScrollIds());
        return this.sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers);
    }

    @Override
    public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
        return this.sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers).publishNext();
    }

    @Override
    public Mono<ClientResponse> execute(ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback callback) {
        return this.hostProvider.getActive(HostProvider.Verification.LAZY).flatMap(callback::doWithClient).onErrorResume(throwable -> {
            if (throwable instanceof ConnectException) {
                return this.hostProvider.getActive(HostProvider.Verification.ACTIVE).flatMap(callback::doWithClient);
            }
            return Mono.error((Throwable)throwable);
        });
    }

    @Override
    public Mono<ReactiveElasticsearchClient.Status> status() {
        return this.hostProvider.clusterInfo().map(it -> new ClientStatus(it.getNodes()));
    }

    private static GetResult getResponseToGetResult(GetResponse response) {
        return new GetResult(response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isExists(), response.getSourceAsBytesRef(), response.getFields());
    }

    private <Req extends ActionRequest, Resp extends ActionResponse> Flux<Resp> sendRequest(Req request, Function<Req, Request> converter, Class<Resp> responseType, HttpHeaders headers) {
        return this.sendRequest(converter.apply(request), responseType, headers);
    }

    private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<AR> responseType, HttpHeaders headers) {
        String logId = ClientLogger.newLogId();
        return this.execute(webClient -> this.sendRequest(webClient, logId, request, headers)).flatMapMany(response -> this.readResponseBody(logId, request, (ClientResponse)response, responseType));
    }

    private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
        WebClient.RequestBodySpec requestBodySpec = (WebClient.RequestBodySpec)((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)webClient.method(HttpMethod.valueOf((String)request.getMethod().toUpperCase())).uri(builder -> {
            builder = builder.path(request.getEndpoint());
            if (!ObjectUtils.isEmpty((Object)request.getParameters())) {
                for (Map.Entry entry : request.getParameters().entrySet()) {
                    builder = builder.queryParam((String)entry.getKey(), new Object[]{entry.getValue()});
                }
            }
            return builder.build(new Object[0]);
        })).attribute(ClientRequest.LOG_ID_ATTRIBUTE, (Object)logId)).headers(theHeaders -> {
            theHeaders.addAll((MultiValueMap)headers);
            if (request.getOptions() != null && !ObjectUtils.isEmpty((Object)request.getOptions().getHeaders())) {
                request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
            }
        });
        if (request.getEntity() != null) {
            Lazy<String> body = this.bodyExtractor(request);
            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), () -> body.get());
            requestBodySpec.contentType(MediaType.valueOf((String)request.getEntity().getContentType().getValue()));
            requestBodySpec.body((Publisher)Mono.fromSupplier(() -> body.get()), String.class);
        } else {
            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
        }
        return requestBodySpec.exchange().onErrorReturn(ConnectException.class, (Object)ClientResponse.create((HttpStatus)HttpStatus.SERVICE_UNAVAILABLE).build());
    }

    private Lazy<String> bodyExtractor(Request request) {
        return Lazy.of(() -> {
            try {
                return EntityUtils.toString((HttpEntity)request.getEntity());
            }
            catch (IOException e) {
                throw new RequestBodyEncodingException("Error encoding request", e);
            }
        });
    }

    private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response, Class<T> responseType) {
        if (RawActionResponse.class.equals(responseType)) {
            ClientLogger.logRawResponse(logId, response.statusCode());
            return Mono.just(responseType.cast((Object)RawActionResponse.create(response)));
        }
        if (response.statusCode().is5xxServerError()) {
            ClientLogger.logRawResponse(logId, response.statusCode());
            return DefaultReactiveElasticsearchClient.handleServerError(request, response);
        }
        return ((Mono)response.body(BodyExtractors.toMono(byte[].class))).map(it -> new String((byte[])it, StandardCharsets.UTF_8)).doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)).flatMap(content -> DefaultReactiveElasticsearchClient.doDecode(response, responseType, content));
    }

    private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {
        String mediaType = response.headers().contentType().map(MimeType::toString).orElse(XContentType.JSON.mediaType());
        try {
            Method fromXContent = ReflectionUtils.findMethod(responseType, (String)"fromXContent", (Class[])new Class[]{XContentParser.class});
            return Mono.justOrEmpty(responseType.cast(ReflectionUtils.invokeMethod((Method)fromXContent, responseType, (Object[])new Object[]{DefaultReactiveElasticsearchClient.createParser(mediaType, content)})));
        }
        catch (Exception errorParseFailure) {
            try {
                return Mono.error((Throwable)BytesRestResponse.errorFromXContent((XContentParser)DefaultReactiveElasticsearchClient.createParser(mediaType, content)));
            }
            catch (Exception e) {
                return Mono.error((Throwable)new ElasticsearchStatusException(content, RestStatus.fromCode((int)response.statusCode().value()), new Object[0]));
            }
        }
    }

    private static XContentParser createParser(String mediaType, String content) throws IOException {
        return XContentType.fromMediaTypeOrFormat((String)mediaType).xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
    }

    private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
        return Mono.error((Throwable)new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.", request.getMethod(), request.getEndpoint(), response.statusCode().value())));
    }

    private static class ScrollState {
        private final Object lock = new Object();
        private final List<String> pastIds = new ArrayList<String>(1);
        private String scrollId;

        private ScrollState() {
        }

        String getScrollId() {
            return this.scrollId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        List<String> getScrollIds() {
            Object object = this.lock;
            synchronized (object) {
                return Collections.unmodifiableList(new ArrayList<String>(this.pastIds));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void updateScrollId(String scrollId) {
            if (StringUtils.hasText((String)scrollId)) {
                Object object = this.lock;
                synchronized (object) {
                    this.scrollId = scrollId;
                    this.pastIds.add(scrollId);
                }
            }
        }
    }

    class ClientStatus
    implements ReactiveElasticsearchClient.Status {
        private final Collection<ElasticsearchHost> connectedHosts;

        ClientStatus(Collection<ElasticsearchHost> connectedHosts) {
            this.connectedHosts = connectedHosts;
        }

        @Override
        public Collection<ElasticsearchHost> hosts() {
            return this.connectedHosts;
        }
    }

    static class RequestCreator {
        RequestCreator() {
        }

        static Function<SearchRequest, Request> search() {
            return RequestConverters::search;
        }

        static Function<SearchScrollRequest, Request> scroll() {
            return RequestConverters::searchScroll;
        }

        static Function<ClearScrollRequest, Request> clearScroll() {
            return RequestConverters::clearScroll;
        }

        static Function<IndexRequest, Request> index() {
            return RequestConverters::index;
        }

        static Function<GetRequest, Request> get() {
            return RequestConverters::get;
        }

        static Function<MainRequest, Request> ping() {
            return request -> RequestConverters.ping();
        }

        static Function<MainRequest, Request> info() {
            return request -> RequestConverters.info();
        }

        static Function<MultiGetRequest, Request> multiGet() {
            return RequestConverters::multiGet;
        }

        static Function<GetRequest, Request> exists() {
            return RequestConverters::exists;
        }

        static Function<UpdateRequest, Request> update() {
            return RequestConverters::update;
        }

        static Function<DeleteRequest, Request> delete() {
            return RequestConverters::delete;
        }

        static Function<DeleteByQueryRequest, Request> deleteByQuery() {
            return request -> {
                try {
                    return RequestConverters.deleteByQuery(request);
                }
                catch (IOException e) {
                    throw new ElasticsearchException("Could not parse request", e);
                }
            };
        }
    }
}

