diff --git a/pom.xml b/pom.xml index ad6dc02..734c54d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 org.apache.camel camel-data-provider - 1.0.3 + 1.0.4 jar diff --git a/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java index 42a2bf3..7986981 100644 --- a/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java +++ b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java @@ -11,6 +11,7 @@ import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -24,6 +25,8 @@ public class DataProviderConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(DataProviderConsumer.class); private volatile Range range; + // We need this to avoid multiple INFO logs when all batches have been processed + private final AtomicBoolean finished = new AtomicBoolean(false); private final Lock rangeLock = new ReentrantLock(); DataProviderConsumer(DataProviderEndpoint dataProviderEndpoint, Processor processor) { @@ -40,7 +43,8 @@ protected void doStart() throws Exception { } finally { rangeLock.unlock(); } - LogUtils.info(LOG, () -> String.format("Preparing to handle %d partition(s) (%d / %d)", + LogUtils.info(LOG, () -> String.format("[%s] Preparing to handle %d partition(s) (%d / %d).", + dataProvider.getName(), (int) Math.ceil((float) size / maxMessagesPerPoll), size, maxMessagesPerPoll)); super.doStart(); } @@ -82,11 +86,13 @@ protected int poll() throws Exception { try { Range range = this.range; if (range.isEmpty()) { - LogUtils.info(LOG, () -> "Nothing to poll. Last range handled."); + if (!finished.getAndSet(true)) { + LogUtils.info(LOG, () -> String.format("[%s] Nothing to poll. Last range handled.", dataProvider.getName())); + } return 0; } // Process current range - LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range)); + LogUtils.info(LOG, () -> String.format("[%s] Handling range '%s'.", dataProvider.getName(), range)); int index = range.lowerEndpoint(); int size = dataProvider.getSize(); for (Object item : dataProvider.partition(range)) { @@ -99,7 +105,7 @@ protected int poll() throws Exception { } // Prepare next range Range nextRange = createNextRange(range.upperEndpoint(), size); - LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange)); + LogUtils.debug(LOG, () -> String.format("[%s] Next range will be '%s'.", dataProvider.getName(), nextRange)); this.range = nextRange; } finally { rangeLock.unlock(); @@ -107,7 +113,7 @@ protected int poll() throws Exception { Stopwatch stopwatch = Stopwatch.createStarted(); int processBatch = processBatch(CastUtils.cast(exchanges)); stopwatch.stop(); - LogUtils.debug(LOG, () -> String.format("Processing of %d exchanges took '%s'.", processBatch, stopwatch)); + LogUtils.debug(LOG, () -> String.format("[%s] Processing of %d exchanges took '%s'.", dataProvider.getName(), processBatch, stopwatch)); return processBatch; } diff --git a/src/main/java/org/apache/camel/component/dataprovider/IDataProvider.java b/src/main/java/org/apache/camel/component/dataprovider/IDataProvider.java index cb8ef1c..4905c1e 100644 --- a/src/main/java/org/apache/camel/component/dataprovider/IDataProvider.java +++ b/src/main/java/org/apache/camel/component/dataprovider/IDataProvider.java @@ -24,4 +24,16 @@ public interface IDataProvider { * @return the partition associated to given range. Never null. */ public Iterable partition(Range range); + + /** + * Returns a name uniquely identifying this data provider. + *

+ * Especially useful when dealing with several data providers. + *

+ * + * @return {@code getClass().getSimpleName()} by default. + */ + public default String getName() { + return getClass().getSimpleName(); + } }