/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.zookeeper;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.zookeeper.ZooKeeperConfiguration;
import org.apache.camel.component.zookeeper.ZooKeeperConnectionManager;
import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
import org.apache.camel.component.zookeeper.ZooKeeperMessage;
import org.apache.camel.component.zookeeper.ZooKeeperUtils;
import org.apache.camel.component.zookeeper.operations.CreateOperation;
import org.apache.camel.component.zookeeper.operations.DeleteOperation;
import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
import org.apache.camel.component.zookeeper.operations.OperationResult;
import org.apache.camel.component.zookeeper.operations.SetDataOperation;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZookeeperProducer
extends DefaultProducer {
    public static final String ZK_OPERATION_WRITE = "WRITE";
    public static final String ZK_OPERATION_DELETE = "DELETE";
    private ZooKeeperConfiguration configuration;
    private ZooKeeperConnectionManager zkm;
    private ZooKeeper connection;

    public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
        super((Endpoint)endpoint);
        this.configuration = endpoint.getConfiguration();
        this.zkm = endpoint.getConnectionManager();
    }

    public void process(Exchange exchange) throws Exception {
        ProductionContext context = new ProductionContext(this.connection, exchange);
        String operation = (String)exchange.getIn().getHeader("CamelZookeeperOperation", String.class);
        boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
        if (ExchangeHelper.isOutCapable((Exchange)exchange)) {
            if (isDelete) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug(String.format("Deleting znode '%s', waiting for confirmation", context.node));
                }
                OperationResult result = this.synchronouslyDelete(context);
                if (this.configuration.isListChildren()) {
                    result = this.listChildren(context);
                }
                this.updateExchangeWithResult(context, result);
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug(String.format("Storing data to znode '%s', waiting for confirmation", context.node));
                }
                OperationResult result = this.synchronouslySetData(context);
                if (this.configuration.isListChildren()) {
                    result = this.listChildren(context);
                }
                this.updateExchangeWithResult(context, result);
            }
        } else if (isDelete) {
            this.asynchronouslyDeleteNode(this.connection, context);
        } else {
            this.asynchronouslySetDataOnNode(this.connection, context);
        }
    }

    protected void doStart() throws Exception {
        this.connection = this.zkm.getConnection();
        if (this.log.isTraceEnabled()) {
            this.log.trace(String.format("Starting zookeeper producer of '%s'", this.configuration.getPath()));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        if (this.log.isTraceEnabled()) {
            this.log.trace(String.format("Shutting down zookeeper producer of '%s'", this.configuration.getPath()));
        }
    }

    private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
        if (this.log.isDebugEnabled()) {
            this.log.debug(String.format("Deleting node '%s', not waiting for confirmation", context.node));
        }
        connection.delete(context.node, context.version, (AsyncCallback.VoidCallback)new AsyncDeleteCallback(), (Object)context);
    }

    private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) {
        if (this.log.isDebugEnabled()) {
            this.log.debug(String.format("Storing data to node '%s', not waiting for confirmation", context.node));
        }
        connection.setData(context.node, context.payload, context.version, (AsyncCallback.StatCallback)new AsyncSetDataCallback(), (Object)context);
    }

    private void updateExchangeWithResult(ProductionContext context, OperationResult result) {
        ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders());
        if (result.isOk()) {
            out.setBody(result.getResult());
        } else {
            context.exchange.setException((Throwable)result.getException());
        }
        context.exchange.setOut((Message)out);
    }

    private OperationResult listChildren(ProductionContext context) throws Exception {
        return new GetChildrenOperation(context.connection, this.configuration.getPath()).get();
    }

    private OperationResult<String> createNode(ProductionContext ctx) throws Exception {
        CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
        create.setPermissions(ZooKeeperUtils.getAclListFromMessage(ctx.exchange.getIn()));
        CreateMode mode = null;
        String modeString = this.configuration.getCreateMode();
        if (modeString != null) {
            try {
                mode = ZooKeeperUtils.getCreateModeFromString(modeString, CreateMode.EPHEMERAL);
            }
            catch (Exception e) {}
        } else {
            mode = ZooKeeperUtils.getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL);
        }
        create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode);
        create.setData(ctx.payload);
        return create.get();
    }

    private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception {
        SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload);
        setData.setVersion(ctx.version);
        OperationResult<Object> result = setData.get();
        if (!result.isOk() && this.configuration.shouldCreate() && result.failedDueTo(KeeperException.Code.NONODE)) {
            this.log.warn(String.format("Node '%s' did not exist, creating it.", ctx.node));
            result = this.createNode(ctx);
        }
        return result;
    }

    private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
        DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
        setData.setVersion(ctx.version);
        OperationResult<Object> result = setData.get();
        if (!result.isOk() && this.configuration.shouldCreate() && result.failedDueTo(KeeperException.Code.NONODE)) {
            this.log.warn(String.format("Node '%s' did not exist, creating it.", ctx.node));
            result = this.createNode(ctx);
        }
        return result;
    }

    private void logStoreComplete(String path, Stat statistics) {
        if (this.log.isDebugEnabled()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(String.format("Stored data to node '%s', and receive statistics %s", path, statistics));
            } else {
                this.log.debug(String.format("Stored data to node '%s'", path));
            }
        }
    }

    private class AsyncDeleteCallback
    implements AsyncCallback.VoidCallback {
        private AsyncDeleteCallback() {
        }

        public void processResult(int rc, String path, Object ctx) {
            if (ZookeeperProducer.this.log.isDebugEnabled()) {
                if (ZookeeperProducer.this.log.isTraceEnabled()) {
                    ZookeeperProducer.this.log.trace(String.format("Removed data node '%s'", path));
                } else {
                    ZookeeperProducer.this.log.debug(String.format("Removed data node '%s'", path));
                }
            }
        }
    }

    private class AsyncSetDataCallback
    implements AsyncCallback.StatCallback {
        private AsyncSetDataCallback() {
        }

        public void processResult(int rc, String node, Object ctx, Stat statistics) {
            if (KeeperException.Code.NONODE.equals((Object)KeeperException.Code.get((int)rc))) {
                if (ZookeeperProducer.this.configuration.shouldCreate()) {
                    ZookeeperProducer.this.log.warn(String.format("Node '%s' did not exist, creating it...", node));
                    ProductionContext context = (ProductionContext)ctx;
                    OperationResult result = null;
                    try {
                        result = ZookeeperProducer.this.createNode(context);
                    }
                    catch (Exception e) {
                        ZookeeperProducer.this.log.error(String.format("Error trying to create node '%s'", node), (Throwable)e);
                    }
                    if (result == null || !result.isOk()) {
                        ZookeeperProducer.this.log.error(String.format("Error creating node '%s'", node), (Throwable)result.getException());
                    }
                }
            } else {
                ZookeeperProducer.this.logStoreComplete(node, statistics);
            }
        }
    }

    private class ProductionContext {
        ZooKeeper connection;
        Exchange exchange;
        Message in;
        byte[] payload;
        int version;
        String node;

        public ProductionContext(ZooKeeper connection, Exchange exchange) {
            this.connection = connection;
            this.exchange = exchange;
            this.in = exchange.getIn();
            this.node = ZooKeeperUtils.getNodeFromMessage(this.in, ZookeeperProducer.this.configuration.getPath());
            this.version = ZooKeeperUtils.getVersionFromMessage(this.in);
            this.payload = ZooKeeperUtils.getPayloadFromExchange(exchange);
        }
    }
}

