/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.hadoop;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.cassandra.client.RingCache;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.Progressable;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;

final class ColumnFamilyRecordWriter
extends org.apache.hadoop.mapreduce.RecordWriter<ByteBuffer, List<Mutation>>
implements RecordWriter<ByteBuffer, List<Mutation>> {
    private final Configuration conf;
    private final RingCache ringCache;
    private final int queueSize;
    private final Map<Range, RangeClient> clients;
    private final long batchThreshold;
    private final ConsistencyLevel consistencyLevel;
    private Progressable progressable;

    ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException {
        this(context.getConfiguration());
        this.progressable = new Progressable(context);
    }

    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException {
        this(conf);
        this.progressable = progressable;
    }

    ColumnFamilyRecordWriter(Configuration conf) throws IOException {
        this.conf = conf;
        this.ringCache = new RingCache(conf);
        this.queueSize = conf.getInt("mapreduce.output.columnfamilyoutputformat.queue.size", 32 * FBUtilities.getAvailableProcessors());
        this.clients = new HashMap<Range, RangeClient>();
        this.batchThreshold = conf.getLong("mapreduce.output.columnfamilyoutputformat.batch.threshold", 32L);
        this.consistencyLevel = ConsistencyLevel.valueOf((String)ConfigHelper.getWriteConsistencyLevel(conf));
    }

    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException {
        Range<Token> range = this.ringCache.getRange(keybuff);
        RangeClient client = this.clients.get(range);
        if (client == null) {
            client = new RangeClient(this.ringCache.getEndpoint(range));
            client.start();
            this.clients.put(range, client);
        }
        for (Mutation amut : value) {
            client.put(Pair.create(keybuff, amut));
        }
        this.progressable.progress();
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        this.close();
    }

    @Deprecated
    public void close(Reporter reporter) throws IOException {
        this.close();
    }

    private void close() throws IOException {
        IOException clientException = null;
        for (RangeClient client : this.clients.values()) {
            try {
                client.close();
            }
            catch (IOException e) {
                clientException = e;
            }
        }
        if (clientException != null) {
            throw clientException;
        }
    }

    static /* synthetic */ long access$200(ColumnFamilyRecordWriter x0) {
        return x0.batchThreshold;
    }

    static /* synthetic */ ConsistencyLevel access$300(ColumnFamilyRecordWriter x0) {
        return x0.consistencyLevel;
    }

    public class RangeClient
    extends Thread {
        private final List<InetAddress> endpoints;
        private final String columnFamily;
        private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue;
        private volatile boolean run;
        private volatile IOException lastException;
        private Cassandra.Client thriftClient;
        private TSocket thriftSocket;

        public RangeClient(List<InetAddress> endpoints) {
            super("client-" + endpoints);
            this.columnFamily = ConfigHelper.getOutputColumnFamily(ColumnFamilyRecordWriter.this.conf);
            this.queue = new ArrayBlockingQueue<Pair<ByteBuffer, Mutation>>(ColumnFamilyRecordWriter.this.queueSize);
            this.run = true;
            this.endpoints = endpoints;
        }

        /*
         * Exception decompiling
         */
        public void put(Pair<ByteBuffer, Mutation> value) throws IOException {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[DOLOOP]], but top level block is 0[TRYBLOCK]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        public void close() throws IOException {
            this.run = false;
            this.interrupt();
            try {
                this.join();
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            if (this.lastException != null) {
                throw this.lastException;
            }
        }

        private void closeInternal() {
            if (this.thriftSocket != null) {
                this.thriftSocket.close();
                this.thriftSocket = null;
                this.thriftClient = null;
            }
        }

        /*
         * Unable to fully structure code
         */
        @Override
        public void run() {
            block6: while (this.run || !this.queue.isEmpty()) {
                try {
                    mutation = this.queue.take();
                }
                catch (InterruptedException e) {
                    continue;
                }
                batch = new HashMap<T1, Map<String, ArrayList<E>>>();
                while (mutation != null) {
                    subBatch = (Map<String, ArrayList<E>>)batch.get(mutation.left);
                    if (subBatch == null) {
                        subBatch = Collections.singletonMap(this.columnFamily, new ArrayList<E>());
                        batch.put(mutation.left, subBatch);
                    }
                    ((List)subBatch.get(this.columnFamily)).add(mutation.right);
                    if ((long)batch.size() >= ColumnFamilyRecordWriter.access$200(ColumnFamilyRecordWriter.this)) break;
                    mutation = (Pair)this.queue.poll();
                }
                iter = this.endpoints.iterator();
                while (true) {
                    try {
                        this.thriftClient.batch_mutate(batch, ColumnFamilyRecordWriter.access$300(ColumnFamilyRecordWriter.this));
                        continue block6;
                    }
                    catch (Exception e) {
                        this.closeInternal();
                        if (!iter.hasNext()) {
                            this.lastException = new IOException(e);
                            break block6;
                        }
                        try {
                            address = iter.next();
                            this.thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getOutputRpcPort(ColumnFamilyRecordWriter.access$000(ColumnFamilyRecordWriter.this)));
                            this.thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(this.thriftSocket, ColumnFamilyRecordWriter.access$000(ColumnFamilyRecordWriter.this));
                            continue;
                        }
                        catch (Exception e) {
                            this.closeInternal();
                            if (!(e instanceof TException) || !iter.hasNext()) ** break;
                            continue;
                            this.lastException = new IOException(e);
                            break block6;
                        }
                    }
                    break;
                }
            }
        }

        @Override
        public String toString() {
            return "#<Client for " + this.endpoints.toString() + ">";
        }
    }
}

