/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams.processors;

import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.processors.BaseProcessor;
import org.apache.storm.streams.processors.BatchProcessor;

/**
 * Processor that folds every input it receives into a single accumulator by
 * delegating to a {@link CombinerAggregator}.
 *
 * <p>The accumulator is created lazily with {@code aggregator.init()} when the
 * first input arrives and is updated with {@code aggregator.apply(...)} for each
 * input. Depending on {@code emitAggregate}, the value handed downstream is
 * either the accumulator itself or {@code aggregator.result(accumulator)}.
 *
 * <p>NOTE(review): this class keeps mutable state without any synchronization;
 * it presumably relies on the caller invoking it from a single thread - confirm
 * against the hosting runtime.
 *
 * @param <T> type of the input values
 * @param <A> type of the accumulator
 * @param <R> type of the result derived from the accumulator
 */
public class AggregateProcessor<T, A, R> extends BaseProcessor<T> implements BatchProcessor {
    /** Provides the initial accumulator, the per-input update and the result conversion. */
    private final CombinerAggregator<T, A, R> aggregator;
    /** If {@code true} the accumulator is emitted as-is; otherwise its converted result is emitted. */
    private final boolean emitAggregate;
    /** Running accumulator; {@code null} before the first input and after {@link #finish()} has emitted. */
    private A state;

    /**
     * Creates a processor that emits {@code aggregator.result(accumulator)}
     * (i.e. {@code emitAggregate} is {@code false}).
     *
     * @param aggregator the aggregator used to build and convert the accumulator
     */
    public AggregateProcessor(CombinerAggregator<T, A, R> aggregator) {
        this(aggregator, false);
    }

    /**
     * Creates a processor.
     *
     * @param aggregator    the aggregator used to build and convert the accumulator
     * @param emitAggregate {@code true} to emit the accumulator itself,
     *                      {@code false} to emit {@code aggregator.result(accumulator)}
     */
    public AggregateProcessor(CombinerAggregator<T, A, R> aggregator, boolean emitAggregate) {
        this.aggregator = aggregator;
        this.emitAggregate = emitAggregate;
    }

    /**
     * Folds {@code input} into the accumulator and offers the updated value to
     * {@code mayBeForwardAggUpdate}.
     *
     * @param input the value to aggregate
     */
    @Override
    public void execute(T input) {
        // Lazily create the accumulator on the first input (or the first one after finish()).
        if (state == null) {
            state = aggregator.init();
        }
        state = aggregator.apply(state, input);
        // The value is passed as a supplier; whether and when it is evaluated is up to the
        // inherited mayBeForwardAggUpdate, whose implementation is not visible in this file.
        if (emitAggregate) {
            mayBeForwardAggUpdate(() -> state);
        } else {
            mayBeForwardAggUpdate(() -> aggregator.result(state));
        }
    }

    /**
     * Forwards the current aggregate (if any) through the processor context and
     * then clears the accumulator so the next input starts from a fresh
     * {@code aggregator.init()}. Does nothing when no accumulator is present.
     */
    @Override
    public void finish() {
        if (state == null) {
            // Nothing was aggregated since construction or the previous finish().
            return;
        }
        if (emitAggregate) {
            context.forward(state);
        } else {
            context.forward(aggregator.result(state));
        }
        state = null;
    }

    /**
     * Returns a debug representation containing the aggregator, the emit mode
     * and the current accumulator.
     */
    @Override
    public String toString() {
        return "AggregateProcessor{aggregator=" + aggregator + ", emitAggregate=" + emitAggregate + ", state=" + state + "}";
    }
}

