/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams.processors;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.ValueJoiner;
import org.apache.storm.streams.processors.BaseProcessor;
import org.apache.storm.streams.processors.BatchProcessor;
import org.apache.storm.streams.tuple.Tuple3;

public class JoinProcessor<K, R, V1, V2>
extends BaseProcessor<Pair<K, ?>>
implements BatchProcessor {
    private final ValueJoiner<V1, V2, R> valueJoiner;
    private final String leftStream;
    private final String rightStream;
    private final List<Pair<K, V1>> leftRows = new ArrayList<Pair<K, V1>>();
    private final List<Pair<K, V2>> rightRows = new ArrayList<Pair<K, V2>>();
    private final JoinType leftType;
    private final JoinType rightType;

    public JoinProcessor(String leftStream, String rightStream, ValueJoiner<V1, V2, R> valueJoiner) {
        this(leftStream, rightStream, valueJoiner, JoinType.INNER, JoinType.INNER);
    }

    public JoinProcessor(String leftStream, String rightStream, ValueJoiner<V1, V2, R> valueJoiner, JoinType leftType, JoinType rightType) {
        this.valueJoiner = valueJoiner;
        this.leftStream = leftStream;
        this.rightStream = rightStream;
        this.leftType = leftType;
        this.rightType = rightType;
    }

    @Override
    public void execute(Pair<K, ?> input, String sourceStream) {
        K key = input.getFirst();
        if (sourceStream.equals(this.leftStream)) {
            Object val = input.getSecond();
            Pair<K, ?> pair = Pair.of(key, val);
            this.leftRows.add(pair);
            if (!this.context.isWindowed()) {
                this.joinAndForward(Collections.singletonList(pair), this.rightRows);
            }
        } else if (sourceStream.equals(this.rightStream)) {
            Object val = input.getSecond();
            Pair<K, ?> pair = Pair.of(key, val);
            this.rightRows.add(pair);
            if (!this.context.isWindowed()) {
                this.joinAndForward(this.leftRows, Collections.singletonList(pair));
            }
        }
    }

    @Override
    public void finish() {
        this.joinAndForward(this.leftRows, this.rightRows);
        this.leftRows.clear();
        this.rightRows.clear();
    }

    public String getLeftStream() {
        return this.leftStream;
    }

    public String getRightStream() {
        return this.rightStream;
    }

    private void joinAndForward(List<Pair<K, V1>> leftRows, List<Pair<K, V2>> rightRows) {
        if (leftRows.size() < rightRows.size()) {
            for (Tuple3 res : this.join(this.getJoinTable(leftRows), rightRows, this.leftType, this.rightType)) {
                this.context.forward(Pair.of(res._1, this.valueJoiner.apply(res._2, res._3)));
            }
        } else {
            for (Tuple3 res : this.join(this.getJoinTable(rightRows), leftRows, this.rightType, this.leftType)) {
                this.context.forward(Pair.of(res._1, this.valueJoiner.apply(res._3, res._2)));
            }
        }
    }

    private <T1, T2> List<Tuple3<K, T1, T2>> join(Multimap<K, T1> tab, List<Pair<K, T2>> rows, JoinType leftType, JoinType rightType) {
        ArrayList<Tuple3<K, T1, T2>> res = new ArrayList<Tuple3<K, T1, T2>>();
        for (Pair<K, T2> row : rows) {
            K key = row.getFirst();
            Collection values = tab.removeAll(key);
            if (values.isEmpty()) {
                if (rightType != JoinType.OUTER) continue;
                res.add(new Tuple3(row.getFirst(), null, row.getSecond()));
                continue;
            }
            for (Object mapValue : values) {
                res.add(new Tuple3(row.getFirst(), mapValue, row.getSecond()));
            }
        }
        if (leftType == JoinType.OUTER) {
            for (Pair<K, T2> row : tab.entries()) {
                res.add(new Tuple3(row.getKey(), row.getValue(), null));
            }
        }
        return res;
    }

    private <T> Multimap<K, T> getJoinTable(List<Pair<K, T>> rows) {
        ArrayListMultimap m = ArrayListMultimap.create();
        for (Pair<K, T> v : rows) {
            m.put(v.getFirst(), v.getSecond());
        }
        return m;
    }

    public static enum JoinType {
        INNER,
        OUTER;

    }
}

