/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.ip.AbstractInternetProtocolSendingMessageHandler;
import org.springframework.integration.ip.udp.DatagramPacketMessageMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public class UnicastSendingMessageHandler
extends AbstractInternetProtocolSendingMessageHandler
implements Runnable {
    private final DatagramPacketMessageMapper mapper = new DatagramPacketMessageMapper();
    private volatile DatagramSocket socket;
    private volatile boolean waitForAck = false;
    private volatile boolean acknowledge = false;
    private volatile int ackPort;
    private volatile int ackTimeout = 5000;
    private volatile int ackCounter = 1;
    private volatile Map<String, CountDownLatch> ackControl = Collections.synchronizedMap(new HashMap());
    private volatile Exception fatalException;
    private volatile int soReceiveBufferSize = -1;
    private volatile String localAddress;
    private volatile CountDownLatch ackLatch;
    private volatile boolean ackThreadRunning;
    private volatile Executor taskExecutor;
    private volatile boolean taskExecutorSet;

    public UnicastSendingMessageHandler(String host, int port) {
        super(host, port);
        this.mapper.setLengthCheck(false);
        this.mapper.setAcknowledge(false);
    }

    public UnicastSendingMessageHandler(String host, int port, boolean lengthCheck) {
        super(host, port);
        this.mapper.setLengthCheck(lengthCheck);
        this.mapper.setAcknowledge(false);
    }

    public UnicastSendingMessageHandler(String host, int port, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        super(host, port);
        this.setReliabilityAttributes(false, acknowledge, ackHost, ackPort, ackTimeout);
    }

    public UnicastSendingMessageHandler(String host, int port, boolean lengthCheck, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        super(host, port);
        this.setReliabilityAttributes(lengthCheck, acknowledge, ackHost, ackPort, ackTimeout);
    }

    protected final void setReliabilityAttributes(boolean lengthCheck, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        this.mapper.setLengthCheck(lengthCheck);
        this.waitForAck = acknowledge;
        this.mapper.setAcknowledge(acknowledge);
        this.mapper.setAckAddress(ackHost + ":" + ackPort);
        this.ackPort = ackPort;
        if (ackTimeout > 0) {
            this.ackTimeout = ackTimeout;
        }
        this.acknowledge = acknowledge;
        if (this.acknowledge) {
            Assert.hasLength((String)ackHost);
        }
    }

    @Override
    public void doStart() {
        if (this.acknowledge && this.taskExecutor == null) {
            ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory(){
                private final AtomicInteger n = new AtomicInteger();

                @Override
                public Thread newThread(Runnable runner) {
                    Thread thread = new Thread(runner);
                    thread.setName("UDP-Ack-Handler-" + this.n.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                }
            });
            this.taskExecutor = executor;
        }
    }

    @Override
    protected void doStop() {
        this.closeSocketIfNeeded();
        if (!this.taskExecutorSet && this.taskExecutor != null) {
            ((ExecutorService)this.taskExecutor).shutdown();
            this.taskExecutor = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleMessageInternal(Message<?> message) throws MessageRejectedException, MessageHandlingException, MessageDeliveryException {
        block22: {
            if (this.acknowledge) {
                Assert.state((boolean)this.isRunning(), (String)"When 'acknowlege' is enabled, adapter must be running");
                if (!this.ackThreadRunning) {
                    UnicastSendingMessageHandler unicastSendingMessageHandler = this;
                    synchronized (unicastSendingMessageHandler) {
                        if (!this.ackThreadRunning) {
                            this.ackLatch = new CountDownLatch(1);
                            this.taskExecutor.execute(this);
                            try {
                                this.ackLatch.await(10000L, TimeUnit.MILLISECONDS);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            }
            CountDownLatch countdownLatch = null;
            String messageId = message.getHeaders().getId().toString();
            try {
                if (this.waitForAck) {
                    if (this.fatalException != null) {
                        throw new MessagingException(message, "Acknowledgment failure", (Throwable)this.fatalException);
                    }
                    countdownLatch = new CountDownLatch(this.ackCounter);
                    this.ackControl.put(messageId, countdownLatch);
                }
                Object packet = this.mapper.fromMessage((Message)message);
                this.send((DatagramPacket)packet);
                this.logger.debug((Object)("Sent packet for message " + message));
                if (!this.waitForAck) break block22;
                try {
                    if (!countdownLatch.await(this.ackTimeout, TimeUnit.MILLISECONDS)) {
                        throw new MessagingException(message, "Failed to receive UDP Ack in " + this.ackTimeout + " millis");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            catch (MessagingException e) {
                throw e;
            }
            catch (Exception e) {
                try {
                    this.socket.close();
                }
                catch (Exception e1) {
                    // empty catch block
                }
                this.socket = null;
                throw new MessageHandlingException(message, "failed to send UDP packet", (Throwable)e);
            }
            finally {
                if (countdownLatch != null) {
                    this.ackControl.remove(messageId);
                }
            }
        }
    }

    protected void send(DatagramPacket packet) throws Exception {
        DatagramSocket socket = this.getSocket();
        packet.setSocketAddress(this.getDestinationAddress());
        socket.send(packet);
    }

    protected void setSocket(DatagramSocket socket) {
        this.socket = socket;
    }

    protected DatagramSocket getTheSocket() {
        return this.socket;
    }

    protected synchronized DatagramSocket getSocket() throws IOException {
        if (this.socket == null) {
            if (this.acknowledge) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Listening for acks on port: " + this.ackPort));
                }
                if (this.localAddress == null) {
                    this.socket = new DatagramSocket(this.ackPort);
                } else {
                    InetAddress whichNic = InetAddress.getByName(this.localAddress);
                    this.socket = new DatagramSocket(this.ackPort, whichNic);
                }
                if (this.soReceiveBufferSize > 0) {
                    this.socket.setReceiveBufferSize(this.soReceiveBufferSize);
                }
            } else {
                this.socket = new DatagramSocket();
            }
            this.setSocketAttributes(this.socket);
        }
        return this.socket;
    }

    @Override
    public void setSoReceiveBufferSize(int size) {
        this.soReceiveBufferSize = size;
    }

    @Override
    public void setLocalAddress(String localAddress) {
        this.localAddress = localAddress;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"'taskExecutor' cannot be null");
        this.taskExecutor = taskExecutor;
        this.taskExecutorSet = true;
    }

    public void setAckCounter(int ackCounter) {
        this.ackCounter = ackCounter;
    }

    public String getComponentType() {
        return "ip:udp-outbound-channel-adapter";
    }

    public boolean isAcknowledge() {
        return this.acknowledge;
    }

    public int getAckPort() {
        return this.ackPort;
    }

    public int getSoReceiveBufferSize() {
        return this.soReceiveBufferSize;
    }

    protected void onInit() throws Exception {
        super.onInit();
        this.mapper.setBeanFactory(this.getBeanFactory());
    }

    protected void setSocketAttributes(DatagramSocket socket) throws SocketException {
        if (this.getSoTimeout() >= 0) {
            socket.setSoTimeout(this.getSoTimeout());
        }
        if (this.getSoSendBufferSize() > 0) {
            socket.setSendBufferSize(this.getSoSendBufferSize());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            try {
                this.ackThreadRunning = true;
                this.ackLatch.countDown();
                DatagramPacket ackPack = new DatagramPacket(new byte[100], 100);
                while (true) {
                    CountDownLatch latch;
                    this.getSocket().receive(ackPack);
                    String id = new String(ackPack.getData(), ackPack.getOffset(), ackPack.getLength());
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Received ack for " + id + " from " + ackPack.getAddress().getHostAddress()));
                    }
                    if ((latch = this.ackControl.get(id)) == null) continue;
                    latch.countDown();
                }
            }
            catch (IOException e) {
                if (this.socket != null && !this.socket.isClosed()) {
                    this.logger.error((Object)("Error on UDP Acknowledge thread:" + e.getMessage()));
                }
                this.ackThreadRunning = false;
            }
        }
        catch (Throwable throwable) {
            this.ackThreadRunning = false;
            throw throwable;
        }
    }

    public void restartAckThread() {
        if (this.fatalException == null) {
            return;
        }
        this.fatalException = null;
        this.taskExecutor.execute(this);
    }

    @Deprecated
    public void shutDown() {
        this.stop();
    }

    private void closeSocketIfNeeded() {
        if (this.socket != null) {
            this.socket.close();
            this.socket = null;
        }
    }
}

