/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.netflix.turbine.amqp;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.netflix.turbine.amqp.TurbineAmqpProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import rx.Observable;
import rx.subjects.PublishSubject;

@Configuration
@EnableConfigurationProperties(value={TurbineAmqpProperties.class})
public class TurbineAmqpConfiguration
implements SmartLifecycle {
    private static final Log log = LogFactory.getLog(TurbineAmqpConfiguration.class);
    private boolean running = false;
    @Autowired
    private TurbineAmqpProperties turbine;
    private int turbinePort;

    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        return PublishSubject.create();
    }

    @Bean
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        Observable publishedStreams = StreamAggregator.aggregateGroupedStreams((Observable)this.hystrixSubject().groupBy(data -> InstanceKey.create((String)((String)data.get("instanceId"))))).doOnUnsubscribe(() -> log.info((Object)"Unsubscribing aggregation.")).doOnSubscribe(() -> log.info((Object)"Starting aggregation")).flatMap(o -> o).publish().refCount();
        this.turbinePort = this.turbine.getPort();
        if (this.turbinePort <= 0) {
            this.turbinePort = SocketUtils.findAvailableTcpPort((int)40000);
        }
        HttpServer httpServer = RxNetty.createHttpServer((int)this.turbinePort, (request, response) -> {
            log.info((Object)"SSE Request Received");
            response.getHeaders().setHeader("Content-Type", (Object)"text/event-stream");
            return publishedStreams.doOnUnsubscribe(() -> log.info((Object)"Unsubscribing RxNetty server connection")).flatMap(data -> response.writeAndFlush((Object)new ServerSentEvent(null, null, JsonUtility.mapToJson((Map)data))));
        }, (PipelineConfigurator)PipelineConfigurators.sseServerConfigurator());
        return httpServer;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        this.aggregatorServer().start();
    }

    public void stop() {
        try {
            this.aggregatorServer().shutdown();
        }
        catch (InterruptedException ex) {
            log.error((Object)"Error shutting down", (Throwable)ex);
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public int getTurbinePort() {
        return this.turbinePort;
    }
}

