/*
 * Decompiled with CFR 0.152.
 */
package org.apache.mahout.clustering.streaming.mapreduce;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.ClusteringUtils;
import org.apache.mahout.clustering.streaming.cluster.StreamingKMeans;
import org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansUtilsMR;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.neighborhood.UpdatableSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Callable} that feeds a sequence of data points through a {@link StreamingKMeans}
 * instance and returns that instance as an {@code Iterable<Centroid>}.
 *
 * <p>The searcher, the number of clusters and the initial distance cutoff are all read from
 * the supplied {@link Configuration}. When no distance cutoff is configured, one is estimated
 * from the first {@value #NUM_ESTIMATE_POINTS} data points (or fewer, if the input is shorter).
 */
public class StreamingKMeansThread implements Callable<Iterable<Centroid>> {
    private static final Logger log = LoggerFactory.getLogger(StreamingKMeansThread.class);

    /** Maximum number of leading data points sampled when the distance cutoff must be estimated. */
    private static final int NUM_ESTIMATE_POINTS = 1000;

    /** Default passed to the configuration lookup; seeing it back means no cutoff was configured. */
    private static final float INVALID_DISTANCE_CUTOFF = -1.0f;

    private final Configuration conf;
    private final Iterable<Centroid> dataPoints;

    /**
     * Creates a task whose data points are the {@link VectorWritable} values of the sequence
     * file at {@code input}, converted by
     * {@code StreamingKMeansUtilsMR.getCentroidsFromVectorWritable}.
     *
     * @param input path of the sequence file to read values from
     * @param conf  configuration used both to read the file and to set up the clustering
     */
    public StreamingKMeansThread(Path input, Configuration conf) {
        this(StreamingKMeansUtilsMR.getCentroidsFromVectorWritable(new SequenceFileValueIterable<VectorWritable>(input, false, conf)), conf);
    }

    /**
     * Creates a task over an already materialised or lazily produced set of data points.
     *
     * @param dataPoints the points to cluster
     * @param conf       configuration supplying the searcher settings, the
     *                   {@code estimatedNumMapClusters} value and, optionally, the
     *                   {@code estimatedDistanceCutoff} value
     */
    public StreamingKMeansThread(Iterable<Centroid> dataPoints, Configuration conf) {
        this.dataPoints = dataPoints;
        this.conf = conf;
    }

    /**
     * Clusters every data point exactly once, in iteration order.
     *
     * <p>If {@code estimatedDistanceCutoff} is absent from the configuration, up to
     * {@value #NUM_ESTIMATE_POINTS} leading points are buffered and handed to
     * {@link ClusteringUtils#estimateDistanceCutoff} to obtain the cutoff. The buffered points
     * are then clustered before the remainder of the input, so no point is skipped regardless
     * of how long the input is and the input is traversed only once.
     *
     * @return the {@link StreamingKMeans} instance after all points have been clustered and
     *         {@code reindexCentroids()} has been called on it
     */
    @Override
    public Iterable<Centroid> call() {
        UpdatableSearcher searcher = StreamingKMeansUtilsMR.searcherFromConfiguration(this.conf);
        int numClusters = this.conf.getInt("estimatedNumMapClusters", 1);
        double estimateDistanceCutoff = this.conf.getFloat("estimatedDistanceCutoff", INVALID_DISTANCE_CUTOFF);
        Iterator<Centroid> dataPointsIterator = this.dataPoints.iterator();

        // Points taken off the iterator for the estimate; stays empty when a cutoff is configured.
        boolean cutoffMissing = estimateDistanceCutoff == INVALID_DISTANCE_CUTOFF;
        List<Centroid> estimatePoints = Lists.newArrayListWithExpectedSize(cutoffMissing ? NUM_ESTIMATE_POINTS : 0);
        if (cutoffMissing) {
            while (dataPointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) {
                estimatePoints.add(dataPointsIterator.next());
            }
            if (log.isInfoEnabled()) {
                log.info("Estimated Points: {}", estimatePoints.size());
            }
            estimateDistanceCutoff = ClusteringUtils.estimateDistanceCutoff(estimatePoints, searcher.getDistanceMeasure());
        }

        StreamingKMeans streamingKMeans = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff);

        // The sampled points were consumed from the iterator above, so they must be clustered
        // explicitly; otherwise inputs longer than the sample would lose their leading points.
        for (Centroid sampledPoint : estimatePoints) {
            streamingKMeans.cluster(sampledPoint);
        }
        // Continue with whatever the estimate did not consume (everything, if no estimate ran).
        while (dataPointsIterator.hasNext()) {
            streamingKMeans.cluster(dataPointsIterator.next());
        }

        streamingKMeans.reindexCentroids();
        return streamingKMeans;
    }
}

