/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import javax.xml.transform.Source;
import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamSource;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.StringSource;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.MulticastDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.util.IOHelper;

public class MultiCastParallelAndStreamCachingTest
extends ContextTestSupport {
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            public void configure() throws Exception {
                MultiCastParallelAndStreamCachingTest.this.context.setStreamCaching(Boolean.valueOf(true));
                MultiCastParallelAndStreamCachingTest.this.context.getStreamCachingStrategy().setEnabled(true);
                MultiCastParallelAndStreamCachingTest.this.context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
                MultiCastParallelAndStreamCachingTest.this.context.getStreamCachingStrategy().setSpoolThreshold(5L);
                ((MulticastDefinition)this.from("direct:start").multicast().parallelProcessing().stopOnException().to(new String[]{"direct:a", "direct:b"})).end().to("mock:result");
                ((RouteDefinition)this.from("direct:a").process((Processor)new SimpleProcessor(false))).to("mock:resulta");
                ((RouteDefinition)this.from("direct:b").process((Processor)new SimpleProcessor(true))).to("mock:resultb");
            }
        };
    }

    public void testByteArrayInputStreamCache() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{"<start></start>"});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{"<start></start>"});
        this.template.sendBody("direct:start", (Object)new ByteArrayInputStream("<start></start>".getBytes("UTF-8")));
        this.assertMockEndpointsSatisfied();
    }

    public void testFileInputStreamCache() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{"James,Guillaume,Hiram,Rob,Roman"});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{"James,Guillaume,Hiram,Rob,Roman"});
        InputStream in = MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream("org/apache/camel/processor/simple.txt");
        this.template.sendBody("direct:start", (Object)in);
        this.assertMockEndpointsSatisfied();
    }

    public void testInputStreamCache() throws Exception {
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{"A"});
        MockEndpoint mockb = this.getMockEndpoint("mock:resultb");
        mockb.expectedBodiesReceived(new Object[]{"A"});
        InputStream in = MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream("org/apache/camel/processor/oneCharacter.txt");
        this.template.sendBody("direct:start", (Object)in);
        this.assertMockEndpointsSatisfied();
    }

    public void testReaderCache() throws Exception {
        String abcScharpS = "ABC\u00df";
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{abcScharpS});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{abcScharpS});
        InputStreamReader isr = new InputStreamReader((InputStream)new ByteArrayInputStream(abcScharpS.getBytes("ISO-8859-1")), "ISO-8859-1");
        this.template.sendBody("direct:start", (Object)isr);
        this.assertMockEndpointsSatisfied();
    }

    public void testStreamSourceCacheWithInputStream() throws Exception {
        String input = "<A>a</A>";
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{input});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{input});
        StreamSource ss = new StreamSource(new ByteArrayInputStream(input.getBytes("UTF-8")));
        this.template.sendBody("direct:start", (Object)ss);
        this.assertMockEndpointsSatisfied();
    }

    public void testStreamSourceCacheWithReader() throws Exception {
        String input = "ABC\u00df";
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{input});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{input});
        InputStreamReader isr = new InputStreamReader((InputStream)new ByteArrayInputStream(input.getBytes("ISO-8859-1")), "ISO-8859-1");
        StreamSource ss = new StreamSource(isr);
        this.template.sendBody("direct:start", (Object)ss);
        this.assertMockEndpointsSatisfied();
    }

    public void testSourceCache() throws Exception {
        String input = "<A>a</A>";
        MockEndpoint mock = this.getMockEndpoint("mock:resulta");
        mock.expectedBodiesReceived(new Object[]{input});
        mock = this.getMockEndpoint("mock:resultb");
        mock.expectedBodiesReceived(new Object[]{input});
        StringSource ss = new StringSource(input);
        SAXSource saxSource = new SAXSource(SAXSource.sourceToInputSource((Source)ss));
        this.template.sendBody("direct:start", (Object)saxSource);
        this.assertMockEndpointsSatisfied();
    }

    private static class SimpleProcessor
    implements Processor {
        private final boolean withSleepTime;

        SimpleProcessor(boolean withSleepTime) {
            this.withSleepTime = withSleepTime;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void process(Exchange exchange) throws Exception {
            Object body;
            if (this.withSleepTime) {
                Thread.sleep(900L);
            }
            if ((body = exchange.getIn().getBody()) instanceof InputStream) {
                ByteArrayOutputStream output = new ByteArrayOutputStream();
                IOHelper.copy((InputStream)((InputStream)body), (OutputStream)output);
                exchange.getOut().setBody((Object)output.toByteArray());
                return;
            } else if (body instanceof Reader) {
                Reader reader = (Reader)body;
                StringBuilder sb = new StringBuilder();
                int i = reader.read();
                while (i > -1) {
                    sb.append((char)i);
                    i = reader.read();
                }
                reader.close();
                exchange.getOut().setBody((Object)sb.toString());
                return;
            } else {
                if (!(body instanceof StreamSource)) throw new RuntimeException("Type " + body.getClass().getName() + " not supported");
                StreamSource ss = (StreamSource)body;
                if (ss.getInputStream() != null) {
                    ByteArrayOutputStream output = new ByteArrayOutputStream();
                    IOHelper.copy((InputStream)ss.getInputStream(), (OutputStream)output);
                    exchange.getOut().setBody((Object)output.toByteArray());
                    return;
                } else {
                    if (ss.getReader() == null) throw new RuntimeException("StreamSource without InputStream and without Reader not supported");
                    Reader reader = ss.getReader();
                    StringBuilder sb = new StringBuilder();
                    int i = reader.read();
                    while (i > -1) {
                        sb.append((char)i);
                        i = reader.read();
                    }
                    reader.close();
                    exchange.getOut().setBody((Object)sb.toString());
                }
            }
        }
    }
}

