/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.model.ExpressionNode;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class SplitterStreamCachingInSubRouteTest
extends ContextTestSupport {
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            public void configure() throws Exception {
                SplitterStreamCachingInSubRouteTest.this.context.setStreamCaching(Boolean.valueOf(true));
                SplitterStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setEnabled(true);
                SplitterStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
                SplitterStreamCachingInSubRouteTest.this.context.getStreamCachingStrategy().setSpoolThreshold(1L);
                ((ExpressionNode)this.from("direct:startIterable").split((Expression)this.body().tokenize(",")).streaming().aggregationStrategy((AggregationStrategy)new InternalAggregationStrategy()).stopOnException().parallelProcessing().to("direct:sub")).end().to("mock:result");
                ((ExpressionNode)this.from("direct:start").split((Expression)this.body().tokenize(",")).aggregationStrategy((AggregationStrategy)new InternalAggregationStrategy()).stopOnException().parallelProcessing().to("direct:sub")).end().to("mock:result");
                ((RouteDefinition)this.from("direct:sub").process((Processor)new InputProcessorWithStreamCache(22))).to("mock:resultsub");
                ((ExpressionNode)this.from("direct:startNested").split((Expression)this.body().tokenize(",")).aggregationStrategy((AggregationStrategy)new InternalAggregationStrategy()).stopOnException().parallelProcessing().to("direct:start")).end().to("mock:resultNested");
            }
        };
    }

    public void testWithAggregationStategyAndStreamCacheInSubRoute() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedBodiesReceived(new Object[]{"Test Message 22"});
        this.template.sendBody("direct:start", (Object)"<start></start>");
        this.assertMockEndpointsSatisfied();
    }

    public void testStreamCacheIterableSplitter() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:result");
        mock.expectedBodiesReceived(new Object[]{"Test Message 22"});
        this.template.sendBody("direct:startIterable", (Object)"<start></start>");
        this.assertMockEndpointsSatisfied();
    }

    public void testNested() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:resultNested");
        mock.expectedBodiesReceived(new Object[]{"Test Message 22"});
        this.template.sendBody("direct:startNested", (Object)"<start></start>");
        this.assertMockEndpointsSatisfied();
    }

    public static class InternalAggregationStrategy
    implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            try {
                String oldBody = (String)oldExchange.getIn().getBody(String.class);
                String newBody = (String)newExchange.getIn().getBody(String.class);
                String merged = oldBody + newBody;
                CachedOutputStream cos = new CachedOutputStream(newExchange);
                cos.write(merged.getBytes("UTF-8"));
                cos.close();
                oldExchange.getIn().setBody((Object)cos.newStreamCache());
                return oldExchange;
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static class InputProcessorWithStreamCache
    implements Processor {
        private final int number;

        public InputProcessorWithStreamCache(int number) {
            this.number = number;
        }

        public void process(Exchange exchange) throws Exception {
            CachedOutputStream cos = new CachedOutputStream(exchange);
            String s = "Test Message " + this.number;
            cos.write(s.getBytes(Charset.forName("UTF-8")));
            cos.close();
            InputStream is = (InputStream)cos.newStreamCache();
            exchange.getOut().setBody((Object)is);
        }
    }
}

