/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.flume.node;

import com.google.common.base.Throwables;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.MonitoringType;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.pulsar.io.flume.node.MaterializedConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Application {
    private static final Logger logger = LoggerFactory.getLogger(Application.class);
    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
    private final List<LifecycleAware> components;
    private final LifecycleSupervisor supervisor;
    private MaterializedConfiguration materializedConfiguration;
    private MonitorService monitorServer;
    private final ReentrantLock lifecycleLock = new ReentrantLock();

    public Application() {
        this(new ArrayList<LifecycleAware>(0));
    }

    public Application(List<LifecycleAware> components) {
        this.components = components;
        this.supervisor = new LifecycleSupervisor();
    }

    public void start() {
        this.lifecycleLock.lock();
        try {
            for (LifecycleAware component : this.components) {
                this.supervisor.supervise(component, (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration conf) {
        try {
            this.lifecycleLock.lockInterruptibly();
            this.stopAllComponents();
            this.startAllComponents(conf);
        }
        catch (InterruptedException e) {
            logger.info("Interrupted while trying to handle configuration event");
            return;
        }
        finally {
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
        }
    }

    public void stop() {
        this.lifecycleLock.lock();
        this.stopAllComponents();
        try {
            this.supervisor.stop();
            if (this.monitorServer != null) {
                this.monitorServer.stop();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
            logger.info("Shutting down configuration: {}", (Object)this.materializedConfiguration);
            for (Map.Entry entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
                try {
                    logger.info("Stopping Source " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    logger.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
            for (Map.Entry entry : this.materializedConfiguration.getSinkRunners().entrySet()) {
                try {
                    logger.info("Stopping Sink " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    logger.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
            for (Map.Entry entry : this.materializedConfiguration.getChannels().entrySet()) {
                try {
                    logger.info("Stopping Channel " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    logger.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
        }
        if (this.monitorServer != null) {
            this.monitorServer.stop();
        }
    }

    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        logger.info("Starting new configuration:{}", (Object)materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;
        for (Map.Entry entry : materializedConfiguration.getChannels().entrySet()) {
            try {
                logger.info("Starting Channel " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
        for (Channel ch : materializedConfiguration.getChannels().values()) {
            while (ch.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState((LifecycleAware)ch)) {
                try {
                    logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms");
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    logger.error("Interrupted while waiting for channel to start.", (Throwable)e);
                    Throwables.propagate((Throwable)e);
                }
            }
        }
        for (Map.Entry entry : materializedConfiguration.getSinkRunners().entrySet()) {
            try {
                logger.info("Starting Sink " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
        for (Map.Entry entry : materializedConfiguration.getSourceRunners().entrySet()) {
            try {
                logger.info("Starting Source " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
        this.loadMonitoring();
    }

    private void loadMonitoring() {
        block5: {
            Properties systemProps = System.getProperties();
            Set<String> keys = systemProps.stringPropertyNames();
            try {
                Class klass;
                if (!keys.contains(CONF_MONITOR_CLASS)) break block5;
                String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
                try {
                    klass = MonitoringType.valueOf((String)monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
                }
                catch (Exception e) {
                    klass = Class.forName(monitorType);
                }
                this.monitorServer = (MonitorService)klass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                Context context = new Context();
                for (String key : keys) {
                    if (!key.startsWith(CONF_MONITOR_PREFIX)) continue;
                    context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key));
                }
                this.monitorServer.configure(context);
                this.monitorServer.start();
            }
            catch (Exception e) {
                logger.warn("Error starting monitoring. Monitoring might not be available.", (Throwable)e);
            }
        }
    }
}

