/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.direct.DirectEndpoint;
import org.apache.camel.component.mock.MockEndpoint;

public class SamplingThrottlerTest
extends ContextTestSupport {
    public void testSamplingFromExchangeStream() throws Exception {
        NotifyBuilder notify = new NotifyBuilder((CamelContext)this.context).whenDone(15).create();
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(2);
        mock.setResultWaitTime(3000L);
        ArrayList<Exchange> sentExchanges = new ArrayList<Exchange>();
        this.sendExchangesThroughDroppingThrottler(sentExchanges, 15);
        notify.matchesMockWaitTime();
        mock.assertIsSatisfied();
        this.validateDroppedExchanges(sentExchanges, mock.getReceivedCounter());
    }

    public void testBurstySampling() throws Exception {
        NotifyBuilder notify = new NotifyBuilder((CamelContext)this.context).whenDone(5).create();
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(2);
        mock.setResultWaitTime(3000L);
        ArrayList<Exchange> sentExchanges = new ArrayList<Exchange>();
        this.sendExchangesThroughDroppingThrottler(sentExchanges, 5);
        Thread.sleep(1100L);
        this.sendExchangesThroughDroppingThrottler(sentExchanges, 5);
        notify.matchesMockWaitTime();
        mock.assertIsSatisfied();
        this.validateDroppedExchanges(sentExchanges, mock.getReceivedCounter());
    }

    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(3);
        mock.setResultWaitTime(4000L);
        final List sentExchanges = Collections.synchronizedList(new ArrayList());
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; ++i) {
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        SamplingThrottlerTest.this.sendExchangesThroughDroppingThrottler(sentExchanges, 35);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            });
        }
        mock.assertIsSatisfied();
        executor.shutdownNow();
    }

    public void testSamplingUsingMessageFrequency() throws Exception {
        long totalMessages = 100L;
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(10);
        mock.setResultWaitTime(100L);
        int i = 0;
        while ((long)i < totalMessages) {
            this.template.sendBody("direct:sample-messageFrequency", (Object)("<message>" + i + "</message>"));
            ++i;
        }
        mock.assertIsSatisfied();
    }

    public void testSamplingUsingMessageFrequencyViaDSL() throws Exception {
        long totalMessages = 50L;
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(10);
        mock.setResultWaitTime(100L);
        int i = 0;
        while ((long)i < totalMessages) {
            this.template.sendBody("direct:sample-messageFrequency-via-dsl", (Object)("<message>" + i + "</message>"));
            ++i;
        }
        mock.assertIsSatisfied();
    }

    private void sendExchangesThroughDroppingThrottler(List<Exchange> sentExchanges, int messages) throws Exception {
        ProducerTemplate myTemplate = this.context.createProducerTemplate();
        DirectEndpoint targetEndpoint = this.resolveMandatoryEndpoint("direct:sample", DirectEndpoint.class);
        for (int i = 0; i < messages; ++i) {
            Exchange e = targetEndpoint.createExchange();
            e.getIn().setBody((Object)("<message>" + i + "</message>"));
            if (!this.context.getStatus().isStarted()) continue;
            myTemplate.send((Endpoint)targetEndpoint, e);
            sentExchanges.add(e);
            Thread.sleep(100L);
        }
        myTemplate.stop();
    }

    private void validateDroppedExchanges(List<Exchange> sentExchanges, int expectedNotDroppedCount) {
        int notDropped = 0;
        for (Exchange e : sentExchanges) {
            Boolean stopped = (Boolean)e.getProperty("CamelRouteStop", Boolean.class);
            if (stopped != null) continue;
            ++notDropped;
        }
        SamplingThrottlerTest.assertEquals((int)expectedNotDroppedCount, (int)notDropped);
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder(){

            public void configure() {
                this.from("direct:sample").sample().to("mock:result");
                this.from("direct:sample-configured").sample(1L, TimeUnit.SECONDS).to("mock:result");
                this.from("direct:sample-configured-via-dsl").sample().samplePeriod(1L).timeUnits(TimeUnit.SECONDS).to("mock:result");
                this.from("direct:sample-messageFrequency").sample(10L).to("mock:result");
                this.from("direct:sample-messageFrequency-via-dsl").sample().sampleMessageFrequency(5L).to("mock:result");
            }
        };
    }
}

