/*
 * Decompiled with CFR 0.152.
 */
package org.jenkinsci.plugins.ssegateway.sse;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.Extension;
import hudson.model.User;
import hudson.util.CopyOnWriteMap;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.servlet.http.HttpSessionEvent;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import jenkins.model.Jenkins;
import jenkins.util.HttpSessionListener;
import net.sf.json.JSONObject;
import org.acegisecurity.Authentication;
import org.jenkinsci.plugins.pubsub.ChannelSubscriber;
import org.jenkinsci.plugins.pubsub.EventFilter;
import org.jenkinsci.plugins.pubsub.EventProps;
import org.jenkinsci.plugins.pubsub.Message;
import org.jenkinsci.plugins.pubsub.MessageException;
import org.jenkinsci.plugins.pubsub.PubsubBus;
import org.jenkinsci.plugins.pubsub.SimpleMessage;
import org.jenkinsci.plugins.ssegateway.EventHistoryStore;
import org.jenkinsci.plugins.ssegateway.Util;
import org.jenkinsci.plugins.ssegateway.sse.EventDispatcherFactory;
import org.jenkinsci.plugins.ssegateway.sse.SSEChannel;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Restricted(value={NoExternalUse.class})
public abstract class EventDispatcher
implements Serializable {
    public static final String SESSION_SYNC_OBJ = "org.jenkinsci.plugins.ssegateway.sse.session.sync";
    private static final Logger LOGGER = LoggerFactory.getLogger((String)EventDispatcher.class.getName());
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Integer.getInteger(EventDispatcher.class.getName() + ".scheduledExecutorService.size", 4), r -> new Thread(r, "EventDispatcher.retryProcessor"));
    private volatile boolean isRetryLoopActive = false;
    @SuppressFBWarnings(value={"MS_SHOULD_BE_FINAL"})
    public static long RETRY_QUEUE_EVENT_LIFETIME = Integer.getInteger(EventDispatcher.class.getName() + ".RETRY_QUEUE_EVENT_LIFETIME", 60) * 1000;
    @SuppressFBWarnings(value={"MS_SHOULD_BE_FINAL"})
    public static long RETRY_QUEUE_PROCESSING_DELAY = Integer.getInteger(EventDispatcher.class.getName() + ".RETRY_QUEUE_PROCESSING_DELAY", 250).intValue();
    private String id = null;
    private final transient PubsubBus bus;
    private final transient Authentication authentication;
    transient Map<EventFilter, ChannelSubscriber> subscribers = new CopyOnWriteMap.Hash();
    private long timestamp_dispatchEventOK = System.currentTimeMillis();
    @SuppressFBWarnings(value={"MS_SHOULD_BE_FINAL"})
    public static long TIMEOUT_DISPATCHERFAIL = Integer.getInteger(EventDispatcher.class.getName() + ".TIMEOUT_DISPATCHERFAIL", 900) * 1000;
    transient Queue<Retry> retryQueue = new ConcurrentLinkedQueue<Retry>();

    public EventDispatcher() {
        this.bus = PubsubBus.getBus();
        User current = this.getUser();
        this.authentication = current != null ? Jenkins.getAuthentication() : Jenkins.ANONYMOUS;
    }

    public abstract void start(HttpServletRequest var1, HttpServletResponse var2) throws IOException, ServletException;

    public abstract HttpServletResponse getResponse();

    public Map<EventFilter, ChannelSubscriber> getSubscribers() {
        return Collections.unmodifiableMap(this.subscribers);
    }

    public final String getId() {
        if (this.id == null) {
            throw new IllegalStateException("Call to getId before the ID was set.");
        }
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String toString() {
        return String.format("%s (%s)", this.id, System.identityHashCode(this));
    }

    private void checkDispatcherFailTimeout(String step) {
        long t_curr = System.currentTimeMillis();
        long t_diff = t_curr - this.timestamp_dispatchEventOK;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("SSE dispatcher %s %s fail - %d - %d - %d", this, step, t_curr, t_diff, TIMEOUT_DISPATCHERFAIL));
        }
        if (t_diff > TIMEOUT_DISPATCHERFAIL) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("SSE dispatcher %s %s fail - timediff > TIMEOUT_DISPATCHERFAIL", this, step));
            }
            this.retryQueue.clear();
            this.unsubscribeAll();
        }
    }

    public synchronized boolean dispatchEvent(String name, String data) throws IOException, ServletException {
        PrintWriter writer;
        HttpServletResponse response = null;
        try {
            response = this.getResponse();
        }
        catch (IllegalStateException e) {
            LOGGER.debug("cannot get response channel connection may have been closed", (Throwable)e);
        }
        if (response == null) {
            this.checkDispatcherFailTimeout("response");
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("SSE dispatcher %s sending event: %s", this, data));
        }
        if ((writer = response.getWriter()).checkError()) {
            this.checkDispatcherFailTimeout("writer.checkError");
            return false;
        }
        if (name != null) {
            writer.write("event: " + name + "\n");
        }
        if (data != null) {
            writer.write("data: " + data + "\n");
        }
        writer.write("\n");
        boolean writerStatus = writer.checkError();
        if (!writerStatus) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("SSE dispatcher %s writer ok - %d", this, System.currentTimeMillis()));
            }
            this.timestamp_dispatchEventOK = System.currentTimeMillis();
        } else {
            this.checkDispatcherFailTimeout("writer.write");
        }
        return !writerStatus;
    }

    public void stop() {
    }

    void setDefaultHeaders() {
        HttpServletResponse response = this.getResponse();
        response.setStatus(200);
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");
    }

    public boolean subscribe(@NonNull EventFilter filter) {
        String channelName = filter.getChannelName();
        if (channelName != null) {
            SSEChannelSubscriber subscriber = (SSEChannelSubscriber)this.subscribers.get(filter);
            if (subscriber == null) {
                subscriber = new SSEChannelSubscriber();
                this.bus.subscribe(channelName, (ChannelSubscriber)subscriber, this.authentication, filter);
                this.subscribers.put(filter, subscriber);
            }
            ++subscriber.numSubscribers;
            this.publishStateEvent(SSEChannel.Event.subscribe, ((SimpleMessage)((SimpleMessage)new SimpleMessage().set((Enum)SSEChannel.EventProps.sse_subs_dispatcher, this.id)).set((Enum)SSEChannel.EventProps.sse_subs_channel_name, channelName)).set((Enum)SSEChannel.EventProps.sse_subs_filter, filter.toJSON()));
            return true;
        }
        LOGGER.error(String.format("Invalid SSE subscribe configuration. '%s' not specified.", EventProps.Jenkins.jenkins_channel));
        return false;
    }

    protected User getUser() {
        return User.current();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean unsubscribe(@NonNull EventFilter filter) {
        String channelName = filter.getChannelName();
        if (channelName != null) {
            SSEChannelSubscriber subscriber = (SSEChannelSubscriber)this.subscribers.get(filter);
            if (subscriber != null) {
                --subscriber.numSubscribers;
                if (subscriber.numSubscribers == 0) {
                    try {
                        this.bus.unsubscribe(channelName, (ChannelSubscriber)subscriber);
                    }
                    finally {
                        this.subscribers.remove(filter);
                    }
                }
                this.publishStateEvent(SSEChannel.Event.unsubscribe, ((SimpleMessage)((SimpleMessage)new SimpleMessage().set((Enum)SSEChannel.EventProps.sse_subs_dispatcher, this.id)).set((Enum)SSEChannel.EventProps.sse_subs_channel_name, channelName)).set((Enum)SSEChannel.EventProps.sse_subs_filter, filter.toJSON()));
                return true;
            }
            LOGGER.info("Invalid SSE unsubscribe configuration. No active subscription for channel: {}", (Object)channelName);
        } else {
            LOGGER.error(String.format("Invalid SSE unsubscribe configuration. '%s' not specified.", EventProps.Jenkins.jenkins_channel));
        }
        return false;
    }

    public void unsubscribeAll() {
        Set<Map.Entry<EventFilter, ChannelSubscriber>> entries = this.subscribers.entrySet();
        for (Map.Entry<EventFilter, ChannelSubscriber> entry : entries) {
            SSEChannelSubscriber subscriber = (SSEChannelSubscriber)entry.getValue();
            EventFilter filter = entry.getKey();
            String channelName = filter.getChannelName();
            this.bus.unsubscribe(channelName, (ChannelSubscriber)subscriber);
        }
        this.subscribers.clear();
    }

    private void scheduleRetryQueueProcessing(long delay) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("EventDispatcher (%s) - scheduleRetryQueueProcessing(%d)", this, delay));
        }
        if (delay > 0L) {
            try {
                scheduledExecutorService.schedule(this::processRetries, delay, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOGGER.info(String.format("EventDispatcher (%s) - scheduleRetryQueueProcessing - Error scheduling retry.", this), (Throwable)e);
            }
        } else {
            this.processRetries();
        }
    }

    private void publishStateEvent(SSEChannel.Event event, Message additional) {
        if (!Util.isTestEnv()) {
            return;
        }
        try {
            SimpleMessage message = (SimpleMessage)((SimpleMessage)((SimpleMessage)new SimpleMessage().setChannelName("sse")).setEventName((Enum)event)).set("sse_numsubs", Integer.toString(this.subscribers.size()));
            if (additional != null) {
                message.putAll((Map)additional);
            }
            this.bus.publish((Message)message);
        }
        catch (MessageException e) {
            LOGGER.warn("Failed to publish SSE Dispatcher state event.", (Throwable)e);
        }
    }

    private void dispatchReload() {
        this.retryQueue.clear();
        try {
            this.dispatchEvent("reload", null);
        }
        catch (Exception e) {
            LOGGER.warn("Unable to send reload event to client.", (Throwable)e);
        }
    }

    private void validateDispatcher() {
        Retry retry = this.retryQueue.peek();
        if (retry != null) {
            long ctime = System.currentTimeMillis();
            long retry_age = ctime - retry.timestamp;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("EventDispatcher (%s) - timestamp: %d - current: %d - age: %d", this, retry.timestamp, ctime, retry_age));
            }
            if (retry_age > RETRY_QUEUE_EVENT_LIFETIME) {
                LOGGER.debug("EventDispatcher {} processRetries - clear retryQueue", (Object)this);
                this.retryQueue.clear();
            }
        }
        this.checkDispatcherFailTimeout("dispatcher.validation");
    }

    void addToRetryQueue(@NonNull Message message) {
        this.validateDispatcher();
        boolean isFirstEvent = this.retryQueue.isEmpty();
        if (!this.retryQueue.add(new Retry(message)) || this.subscribers.isEmpty()) {
            this.dispatchReload();
        } else if (isFirstEvent) {
            this.scheduleRetryQueueProcessing(RETRY_QUEUE_PROCESSING_DELAY);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized void processRetries() {
        if (!this.isRetryLoopActive) {
            this.isRetryLoopActive = true;
            this.validateDispatcher();
            Retry retry = this.retryQueue.peek();
            try {
                while (retry != null) {
                    String eventJSON;
                    block20: {
                        try {
                            eventJSON = EventHistoryStore.getChannelEvent(retry.channelName, retry.eventUUID);
                            if (eventJSON != null) break block20;
                            if (!retry.needsMoreTimeToLandInStore()) {
                                this.dispatchReload();
                                return;
                            }
                            return;
                        }
                        catch (Exception e) {
                            LOGGER.debug(String.format("EventDispatcher (%s) - Error dispatching retry event to SSE channel. Write failed.", this), (Throwable)e);
                            return;
                        }
                    }
                    if (Util.isTestEnv()) {
                        JSONObject eventJSONObj = JSONObject.fromObject((Object)eventJSON);
                        eventJSONObj.put(SSEChannel.EventProps.sse_dispatch_retry.name(), (Object)"true");
                        eventJSON = eventJSONObj.toString();
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(String.format("EventDispatcher (%s) - retry event: %s", this, eventJSON));
                    }
                    if (!this.dispatchEvent(retry.channelName, eventJSON)) {
                        LOGGER.debug(String.format("EventDispatcher (%s) - Error dispatching retry event to SSE channel. dispatchEvent failed.", this));
                        return;
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("EventDispatcher ({0}) - Dispatched retry event to SSE channel. Event {1}.", new Object[]{this, eventJSON});
                    }
                    this.retryQueue.remove();
                    retry = this.retryQueue.peek();
                }
            }
            catch (Exception e) {
                LOGGER.warn(String.format("EventDispatcher (%s) - Error dispatching retry event to SSE channel. Write failed.", this), (Throwable)e);
                return;
            }
            finally {
                if (!this.retryQueue.isEmpty()) {
                    this.scheduleRetryQueueProcessing(RETRY_QUEUE_PROCESSING_DELAY);
                }
                this.isRetryLoopActive = false;
            }
            if (!this.retryQueue.isEmpty()) {
                this.scheduleRetryQueueProcessing(RETRY_QUEUE_PROCESSING_DELAY);
            }
            this.isRetryLoopActive = false;
        }
    }

    private void doDispatch(@NonNull Message message) {
        if (!this.retryQueue.isEmpty()) {
            this.addToRetryQueue(message);
        } else {
            try {
                message.set((Enum)SSEChannel.EventProps.sse_subs_dispatcher, this.id);
                message.set((Enum)SSEChannel.EventProps.sse_subs_dispatcher_inst, Integer.toString(System.identityHashCode(this)));
                if (!this.dispatchEvent(message.getChannelName(), message.toJSON())) {
                    LOGGER.debug("Error dispatching event to SSE channel. dispatchEvent failed.");
                    this.addToRetryQueue(message);
                }
            }
            catch (Exception e) {
                LOGGER.debug("Error dispatching event to SSE channel.", (Throwable)e);
                this.addToRetryQueue(message);
            }
        }
    }

    private final class SSEChannelSubscriber
    implements ChannelSubscriber {
        private int numSubscribers = 0;

        private SSEChannelSubscriber() {
        }

        public void onMessage(@NonNull Message message) {
            EventDispatcher.this.doDispatch(message);
        }
    }

    private static class Retry {
        private final long timestamp = System.currentTimeMillis();
        private final String channelName;
        private final String eventUUID;

        private Retry(@NonNull Message message) {
            String channelName = message.getChannelName();
            this.channelName = channelName != null ? channelName.intern() : "";
            String eventUUID = message.getEventUUID();
            this.eventUUID = eventUUID != null ? eventUUID.intern() : "";
        }

        private boolean needsMoreTimeToLandInStore() {
            return System.currentTimeMillis() - this.timestamp < 10000L;
        }
    }

    @Extension
    public static final class SSEHttpSessionListener
    extends HttpSessionListener {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void sessionDestroyed(HttpSessionEvent httpSessionEvent) {
            try {
                Map<String, EventDispatcher> dispatchers = EventDispatcherFactory.getDispatchers(httpSessionEvent.getSession());
                try {
                    for (EventDispatcher dispatcher : dispatchers.values()) {
                        try {
                            dispatcher.unsubscribeAll();
                        }
                        catch (Exception e) {
                            if (!LOGGER.isDebugEnabled()) continue;
                            LOGGER.debug("Error during unsubscribeAll() for dispatcher " + dispatcher.getId() + ".", (Throwable)e);
                        }
                    }
                }
                finally {
                    dispatchers.clear();
                }
            }
            catch (Exception e) {
                LOGGER.debug("Error during session cleanup. The session has probably timed out." + String.valueOf((Object)this), (Throwable)e);
            }
        }
    }
}

