Skip to content

Commit

Permalink
Add a name to data provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Ribeaud committed Mar 10, 2017
1 parent d90323f commit a244f93
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.camel</groupId>
<artifactId>camel-data-provider</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>

<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -24,6 +25,8 @@ public class DataProviderConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(DataProviderConsumer.class);

private volatile Range<Integer> range;
// We need this to avoid multiple INFO logs when all batches have been processed
private final AtomicBoolean finished = new AtomicBoolean(false);
private final Lock rangeLock = new ReentrantLock();

DataProviderConsumer(DataProviderEndpoint dataProviderEndpoint, Processor processor) {
Expand All @@ -40,7 +43,8 @@ protected void doStart() throws Exception {
} finally {
rangeLock.unlock();
}
LogUtils.info(LOG, () -> String.format("Preparing to handle %d partition(s) (%d / %d)",
LogUtils.info(LOG, () -> String.format("[%s] Preparing to handle %d partition(s) (%d / %d).",
dataProvider.getName(),
(int) Math.ceil((float) size / maxMessagesPerPoll), size, maxMessagesPerPoll));
super.doStart();
}
Expand Down Expand Up @@ -82,11 +86,13 @@ protected int poll() throws Exception {
try {
Range<Integer> range = this.range;
if (range.isEmpty()) {
LogUtils.info(LOG, () -> "Nothing to poll. Last range handled.");
if (!finished.getAndSet(true)) {
LogUtils.info(LOG, () -> String.format("[%s] Nothing to poll. Last range handled.", dataProvider.getName()));
}
return 0;
}
// Process current range
LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range));
LogUtils.info(LOG, () -> String.format("[%s] Handling range '%s'.", dataProvider.getName(), range));
int index = range.lowerEndpoint();
int size = dataProvider.getSize();
for (Object item : dataProvider.partition(range)) {
Expand All @@ -99,15 +105,15 @@ protected int poll() throws Exception {
}
// Prepare next range
Range<Integer> nextRange = createNextRange(range.upperEndpoint(), size);
LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange));
LogUtils.debug(LOG, () -> String.format("[%s] Next range will be '%s'.", dataProvider.getName(), nextRange));
this.range = nextRange;
} finally {
rangeLock.unlock();
}
Stopwatch stopwatch = Stopwatch.createStarted();
int processBatch = processBatch(CastUtils.cast(exchanges));
stopwatch.stop();
LogUtils.debug(LOG, () -> String.format("Processing of %d exchanges took '%s'.", processBatch, stopwatch));
LogUtils.debug(LOG, () -> String.format("[%s] Processing of %d exchanges took '%s'.", dataProvider.getName(), processBatch, stopwatch));
return processBatch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,16 @@ public interface IDataProvider<T> {
* @return the partition associated to given <i>range</i>. Never <code>null</code>.
*/
public Iterable<T> partition(Range<Integer> range);

/**
* Returns a name uniquely identifying this data provider.
* <p>
* Especially useful when dealing with several data providers.
* </p>
*
* @return {@code getClass().getSimpleName()} by default.
*/
public default String getName() {
return getClass().getSimpleName();
}
}

0 comments on commit a244f93

Please sign in to comment.