OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: On completion failure when doing file processing


Have you tried adding bridgeErrorHandler=true to the sftp operation?



> On May 10, 2018, at 10:35 AM, Hugo Marcelino <hugo.m.marcelino@xxxxxxxxx> wrote:
> 
> Hello,
> 
> I have inherited some code that is running Apache Camel and was asked if it was possible to send a notification whenever a processed file failed. I've found two links that looked promising to me.
> 
> 1. https://issues.apache.org/jira/browse/CAMEL-3372
> 2. https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java 
> 
> The idea would be to use the ".onCompletion().onFailureOnly()" to react to failed file process and send a notification. But in fact what's happening is that the file is being moved to the failed folder but is not invoking my FileRollback:onFailure method.
> 
> I've built a test that is the closest as possible to our current code. The difference is that instead of being file:// is sftp:// but for the test case is not important.
> 
> Can you help me?
> 
> Thanks
> 
> public class FileProcessingFailedNotificationTest extends ContextTestSupport {
> 
>    private static final String basePath = "resources/";
> 
>    public void testSmoke() throws InterruptedException {
>        Thread.sleep(10000);
>    }
> 
>    @Override
>    protected RouteBuilder createRouteBuilder() {
> 
>        return new RouteBuilder() {
>            @Override
>            public void configure() {
>                from("file://" + basePath + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed") 
>                        .to("direct:csvProcessor");
> 
>                from("direct:csvProcessor")
>                        .onCompletion().onFailureOnly()
>                            .bean(FileRollback.class, "onFailure")
>                        .end()
>                        .process(new CSVProcessor())
>                        .split(body()).streaming().parallelProcessing()
>                        .process(new LineProcessor())
>                        .end();
>            }
>        };
>    }
> 
>    public static class FileRollback implements Synchronization {
> 
>        public void onComplete(Exchange exchange) {
>            System.out.println("FileRollback:onComplete");
>        }
> 
>        public void onFailure(Exchange exchange) {
>            System.out.println("FileRollback:onFailure");
>        }
>    }
> 
>    private static class CSVProcessor implements Processor {
> 
>        @Override
>        public void process(Exchange exchange) throws Exception {
>            CsvDataFormat csvDataFormat = new CsvDataFormat();
>            csvDataFormat.setDelimiter(',');
>            csvDataFormat.setLazyLoad(true);
>            csvDataFormat.setUseMaps(true);
> 
>            ServiceHelper.startService(csvDataFormat);
> 
>            InputStream stream = exchange.getIn().getMandatoryBody(InputStream.class);
>            Message out = exchange.getOut();
>            out.copyFrom(exchange.getIn());
>            Object result = csvDataFormat.unmarshal(exchange, stream);
>            out.setBody(result);
>        }
>    }
> 
>    private static class LineProcessor implements Processor {
>        @Override
>        public void process(Exchange exchange) {
>            String body = exchange.getIn().getBody(String.class);
> 
>            if (body.contains("kaboom")) {
>                throw new RuntimeException("kaboom");
> 
>            } else {
>                System.out.println(body);
>            }
>        }
>    }
> }