From 6b729b7fee327b95dfb3a7fbd9a437137f71501b Mon Sep 17 00:00:00 2001 From: ribeach1 Date: Thu, 5 Jul 2018 14:53:01 +0200 Subject: [PATCH] Update dependencies to its latest version --- pom.xml | 10 +++++----- .../DataProviderPollingConsumer.java | 16 +++++++++++----- .../DataProviderComponentMultithreadingTest.java | 9 ++++++--- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 31a620d..338ce87 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 org.apache.camel camel-data-provider - 1.0.4 + 1.0.5 jar @@ -16,10 +16,10 @@ UTF-8 UTF-8 - 2.18.1 - 1.7.22 - 1.2.1 - 21.0 + 2.21.1 + 1.7.25 + 1.2.3 + 25.1-jre 1.10.19 1.8 1.8 diff --git a/src/main/java/org/apache/camel/component/dataprovider/DataProviderPollingConsumer.java b/src/main/java/org/apache/camel/component/dataprovider/DataProviderPollingConsumer.java index f28f271..6e3fba9 100644 --- a/src/main/java/org/apache/camel/component/dataprovider/DataProviderPollingConsumer.java +++ b/src/main/java/org/apache/camel/component/dataprovider/DataProviderPollingConsumer.java @@ -10,7 +10,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * {@link PollingConsumerSupport} extension for {@link IDataProvider}. @@ -30,8 +35,9 @@ public class DataProviderPollingConsumer extends PollingConsumerSupport { @Override public Exchange receive() { Range range = createRange(); - LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range)); - Iterable partition = getEndpoint().getDataProvider().partition(range); + IDataProvider dataProvider = getEndpoint().getDataProvider(); + LogUtils.info(LOG, () -> String.format("[%s] Handling range '%s'.", dataProvider.getName(), range)); + Iterable partition = dataProvider.partition(range); Exchange exchange = getEndpoint().createExchange(); int size = Iterables.size(partition); Message in = exchange.getIn(); @@ -65,12 +71,12 @@ public Exchange receive(long timeout) { } @Override - protected void doStart() throws Exception { + protected void doStart() { executorService = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, getClass().getSimpleName()); } @Override - protected void doStop() throws Exception { + protected void doStop() { getCamelContext().getExecutorServiceManager().shutdown(executorService); } diff --git a/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentMultithreadingTest.java b/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentMultithreadingTest.java index 3317aac..fde6964 100644 --- a/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentMultithreadingTest.java +++ b/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentMultithreadingTest.java @@ -1,5 +1,7 @@ package org.apache.camel.component.dataprovider; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.JndiRegistry; @@ -29,11 +31,12 @@ protected JndiRegistry createRegistry() throws Exception { } @Override - protected RouteBuilder createRouteBuilder() throws Exception { + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("dataprovider://foo?consumer.useFixedDelay=true&consumer.delay=20&consumer.maxMessagesPerPoll=20&initialDelay=20"). - to("mock:result"); + from("dataprovider://foo?consumer.maxMessagesPerPoll=20&initialDelay=20&greedy=true") + .to("seda:input"); + from("seda:input").threads(20).process(exchange -> Thread.sleep(100L)).to("mock:result"); } }; }