Commit a8f6119

Merge branch 'feature/KafkaConsumeSlowProcessor-Issue#161' into develop

acaproni committed Apr 24, 2019
2 parents 13373cc + 87ad9c2 commit a8f6119

Showing 14 changed files with 334 additions and 43 deletions.
(file: …)
@@ -15,7 +15,7 @@
import org.eso.ias.heartbeat.serializer.HbJsonSerializer;
import org.eso.ias.kafkautils.KafkaHelper;
import org.eso.ias.kafkautils.KafkaIasiosConsumer;
-import org.eso.ias.kafkautils.KafkaStringsConsumer.StartPosition;
+import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition;
import org.eso.ias.kafkautils.SimpleKafkaIasiosConsumer.IasioListener;
import org.eso.ias.kafkautils.SimpleStringProducer;
import org.eso.ias.plugin.publisher.MonitorPointData;
@@ -248,7 +248,7 @@ public static void classInitializer() throws Exception {
mPointsConsumer.setUp(props);

// Start getting events
-mPointsConsumer.startGettingEvents(StartPosition.END,eventsConsumer);
+mPointsConsumer.startGettingEvents(StreamPosition.END,eventsConsumer);

// Build the producer that pushes monitor point
// in the kafka topic
(file: KafkaSubscriber.scala)
@@ -3,9 +3,9 @@ package org.eso.ias.dasu.subscriber
import java.util
import java.util.{Collection, Properties}

+import org.eso.ias.kafkautils.{KafkaHelper, KafkaIasiosConsumer}
+import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition
import org.eso.ias.kafkautils.SimpleKafkaIasiosConsumer.IasioListener
-import org.eso.ias.kafkautils.KafkaStringsConsumer.StartPosition
-import org.eso.ias.kafkautils.{KafkaHelper, KafkaIasiosConsumer}
import org.eso.ias.logging.IASLogger
import org.eso.ias.types.{IASTypes, IASValue}

@@ -110,7 +110,7 @@ extends IasioListener with InputSubscriber {
this.listener = newListener

Try {
-kafkaConsumer.startGettingEvents(StartPosition.END, this)
+kafkaConsumer.startGettingEvents(StreamPosition.END, this)
KafkaSubscriber.logger.info("The subscriber of [{}] is polling events from kafka",dasuId)
}
}
(file: DasuWithKafkaPubSubTest.scala)
@@ -9,7 +9,7 @@ import org.eso.ias.cdb.pojos.DasuDao
import org.eso.ias.dasu.DasuImpl
import org.eso.ias.dasu.publisher.KafkaPublisher
import org.eso.ias.dasu.subscriber.KafkaSubscriber
-import org.eso.ias.kafkautils.KafkaStringsConsumer.StartPosition
+import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition
import org.eso.ias.kafkautils.SimpleStringConsumer.KafkaConsumerListener
import org.eso.ias.kafkautils.{KafkaHelper, SimpleStringConsumer, SimpleStringProducer}
import org.eso.ias.logging.IASLogger
@@ -81,7 +81,7 @@ class DasuWithKafkaPubSubTest extends FlatSpec with KafkaConsumerListener {
val props = new Properties()
props.setProperty("group.id", "DasuTest-groupID")
eventsListener.setUp(props)
-eventsListener.startGettingEvents(StartPosition.END,this)
+eventsListener.startGettingEvents(StreamPosition.END,this)

val stringPublisher = new SimpleStringProducer(
KafkaHelper.DEFAULT_BOOTSTRAP_BROKERS,
(file: CalcConversionTime.scala)
@@ -109,7 +109,7 @@ class CalcConversionTime(servers: String)
this.setUp()

CalcConversionTime.logger.info("Start processing events from the core topic")
-this.startGettingEvents(KafkaStringsConsumer.StartPosition.END,this)
+this.startGettingEvents(KafkaStringsConsumer.StreamPosition.END,this)
CalcConversionTime.logger.info("Initialization done")
}

(file: HbKafkaConsumer.scala)
@@ -162,7 +162,7 @@ class HbKafkaConsumer(brokers: String, consumerId: String)
HbKafkaConsumer.logger.debug("Initializing the string consumer")
stringConsumer.setUp()
HbKafkaConsumer.logger.debug("Starting the string consumer")
-stringConsumer.startGettingEvents(this,KafkaStringsConsumer.StartPosition.END)
+stringConsumer.startGettingEvents(this,KafkaStringsConsumer.StreamPosition.END)
HbKafkaConsumer.logger.info("Initialized")
}

(file: KafkaStringsConsumer.java)
@@ -36,10 +36,10 @@
* <em>Life cycle</em>: {@link #setUp()} or {@link #setUp(Properties)}
* must be called to initialize the object;
* {@link #tearDown()} must be called when finished using the object;
-* {@link #startGettingEvents(StringsConsumer, StartPosition)} must be called to start
+* {@link #startGettingEvents(StringsConsumer, StreamPosition)} must be called to start
* polling events from the kafka topic
*
-* {@link #startGettingEvents(StringsConsumer, StartPosition)} returns when the consumer has been assigned to
+* {@link #startGettingEvents(StringsConsumer, StreamPosition)} returns when the consumer has been assigned to
* at least one partition. There are situations when the partitions assigned to the consumer
* can be revoked and reassigned, for example when another consumer subscribes or disconnects,
* as the assignment of consumers to partitions is left to kafka in this version.
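
To make the life cycle concrete, here is a minimal sketch of a client of this class, assuming KafkaStringsConsumer can be built with the same (servers, topic, consumer ID) constructor as the SimpleStringConsumer shown further down; the topic name and consumer ID are hypothetical:

    import java.util.Collection;

    import org.eso.ias.kafkautils.KafkaHelper;
    import org.eso.ias.kafkautils.KafkaStringsConsumer;
    import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition;
    import org.eso.ias.kafkautils.KafkaUtilsException;

    public class LifeCycleSketch implements KafkaStringsConsumer.StringsConsumer {

        // Invoked by the consumer thread with the strings read from the topic
        @Override
        public void stringsReceived(Collection<String> strings) {
            strings.forEach(System.out::println);
        }

        public static void main(String[] args) throws KafkaUtilsException {
            KafkaStringsConsumer consumer = new KafkaStringsConsumer(
                    KafkaHelper.DEFAULT_BOOTSTRAP_BROKERS, "test-topic", "LifeCycleSketch-ID");
            consumer.setUp(); // initialize before getting events
            // Blocks until at least one partition is assigned, then events
            // flow to stringsReceived() starting from the END of the topic
            consumer.startGettingEvents(new LifeCycleSketch(), StreamPosition.END);
            // ... process events for a while ...
            consumer.tearDown(); // mandatory cleanup
        }
    }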
@@ -66,11 +66,11 @@ public interface StringsConsumer {
*
* @author acaproni
*/
-public enum StartPosition {
+public enum StreamPosition {
// The default position, usually set in kafka configurations
DEFAULT,
// Get events from the beginning of the partition
-BEGINNING,
+BEGIN,
// Get events from the end of the partition
END
}
@@ -148,7 +148,7 @@ public enum StartPosition {
/**
* The position to start reading from
*/
-private StartPosition startReadingPos = StartPosition.DEFAULT;
+private StreamPosition startReadingPos = StreamPosition.DEFAULT;

/**
* Max time to wait for the assignment of partitions before polling
@@ -162,6 +162,12 @@
*/
private final CountDownLatch polling = new CountDownLatch(1);

+/**
+* Signal that a partition has been assigned: seeking is done only if a partition
+* is assigned
+*/
+private final AtomicBoolean isPartitionAssigned = new AtomicBoolean(false);

/**
* @return the number of records processed
*/
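
The CountDownLatch above is what lets startGettingEvents block until kafka has assigned at least one partition. Stripped of the kafka plumbing, the synchronization pattern is simply the following (class and method names are hypothetical):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class AssignmentGate {
        private final CountDownLatch polling = new CountDownLatch(1);

        // In essence what onPartitionsAssigned(...) does: release the waiter
        void onAssigned() {
            polling.countDown();
        }

        // In essence what startGettingEvents does: wait (bounded) for the
        // first assignment before returning to the caller
        boolean awaitAssignment(long timeout, TimeUnit unit) throws InterruptedException {
            return polling.await(timeout, unit);
        }
    }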
@@ -234,7 +240,7 @@ public synchronized void setUp(Properties userPros) {
*/
public synchronized void startGettingEvents(
StringsConsumer listener,
-StartPosition startReadingFrom)
+StreamPosition startReadingFrom)
throws KafkaUtilsException {
Objects.requireNonNull(startReadingFrom);
Objects.requireNonNull(listener);
@@ -332,7 +338,7 @@ public void setUp() {
*/
public synchronized void tearDown() {
if (isClosed.get()) {
KafkaStringsConsumer.logger.debug("Consumer [{}] already closed",consumerID);
KafkaStringsConsumer.logger.warn("Consumer [{}] already closed",consumerID);
return;
}
KafkaStringsConsumer.logger.debug("Closing consumer [{}]...",consumerID);
@@ -353,7 +359,7 @@ public synchronized void tearDown() {
public void run() {
KafkaStringsConsumer.logger.info("Thread of consumer [{}] to get events from the topic {} started",consumerID,topicName);

-if (startReadingPos==StartPosition.DEFAULT) {
+if (startReadingPos==StreamPosition.DEFAULT) {
consumer.subscribe(Arrays.asList(topicName));
} else {
consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
@@ -378,17 +384,19 @@ private String formatPartitionsStr(Collection<TopicPartition> parts) {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> parts) {
+isPartitionAssigned.set(false);
KafkaStringsConsumer.logger.info("Partition(s) of consumer [{}] revoked: {}",consumerID, formatPartitionsStr(parts));

}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> parts) {
+isPartitionAssigned.set(true);
KafkaStringsConsumer.logger.info("Consumer [{}] assigned to {} partition(s): {}",
consumerID,
parts.size(),
formatPartitionsStr(parts));
-if (startReadingPos==StartPosition.BEGINNING) {
+if (startReadingPos==StreamPosition.BEGIN) {
consumer.seekToBeginning(new ArrayList<>());
} else {
consumer.seekToEnd(new ArrayList<>());
@@ -447,4 +455,26 @@ private void notifyListener(ConsumerRecords<String, String> records) {
}
}

+/**
+* Seek the consumer to the passed position.
+* Kafka allows seeking only if a partition is assigned,
+* otherwise the seek is ignored.
+*
+* @param pos The position to seek the consumer to
+* @return true if the seek has been done with an assigned partition;
+* false otherwise
+*/
+public boolean seekTo(StreamPosition pos) {
+Objects.requireNonNull(pos,"Invalid position given");
+if (!isClosed.get() && isPartitionAssigned.get()) {
+if (pos==StreamPosition.BEGIN) {
+consumer.seekToBeginning(new ArrayList<>());
+} else {
+consumer.seekToEnd(new ArrayList<>());
+}
+return true;
+} else {
+return false;
+}
+}
}
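
A sketch of how a client could use the new method, for example to drop the backlog of an already-running consumer; the enclosing class is hypothetical:

    import org.eso.ias.kafkautils.KafkaStringsConsumer;
    import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition;

    class SeekSketch {
        // seekTo returns false when kafka would ignore the seek because
        // no partition is currently assigned (or the consumer is closed)
        static void skipBacklog(KafkaStringsConsumer consumer) {
            if (!consumer.seekTo(StreamPosition.END)) {
                System.err.println("Seek skipped: no partition assigned");
            }
        }
    }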
(file: SimpleKafkaIasiosConsumer.java)
@@ -1,6 +1,5 @@
package org.eso.ias.kafkautils;

-import org.eso.ias.types.IASTypes;
import org.eso.ias.types.IASValue;
import org.eso.ias.types.IasValueJsonSerializer;
import org.eso.ias.types.IasValueStringSerializer;
@@ -12,6 +11,13 @@
/**
* KafkaIasiosConsumer gets the strings from the passed IASIO kafka topic
* from the {@link KafkaStringsConsumer} and forwards the IASIOs to the listener.
+*
+* The SimpleKafkaIasiosConsumer checks the timestamps of the received IASIOs against a threshold.
+* If the timestamps are too old it means that the receiver is slow processing events: old IASIOs are discarded,
+* a log is emitted and the consumer seeks to the end of the topic.
+*
+* To decide what old means, the user can set the {@link #SeekIfOlderThanProName} property to the desired
+* number of milliseconds; otherwise {@link #SeekIfOlderThanDefault} is used
*/
public class SimpleKafkaIasiosConsumer implements KafkaStringsConsumer.StringsConsumer {
/**
@@ -53,6 +59,28 @@ public interface IasioListener {
*/
private final KafkaStringsConsumer stringsConsumer;

+/**
+* The name of the property to set the maximum number of milliseconds between the current time
+* and the timestamps of the records before seeking to the end of the topic
+*/
+public static final String SeekIfOlderThanProName="org.eso.ias.kafkautils.iasioconsumer.seekifolderthan";

+/**
+* The default number of milliseconds between the current time and the time when records have been
+* sent to the kafka topic before seeking to the end of the topic.
+* It is disabled by default.
+*
+* @see #seekIfOlderThan
+*/
+private static final long SeekIfOlderThanDefault = Long.MAX_VALUE;

+/**
+* If the difference (milliseconds) between the timestamps of the records read from the kafka topic
+* and the current time is greater than seekIfOlderThan, then the consumer seeks
+* to the end of the topic
+*/
+public final long seekIfOlderThan;

/**
* Build a FilteredStringConsumer with no filters (i.e. all the
* strings read from the kafka topic are forwarded to the listener)
Expand All @@ -62,6 +90,10 @@ public interface IasioListener {
* @param consumerID the ID of the consumer
*/
public SimpleKafkaIasiosConsumer(String servers, String topicName, String consumerID) {
+seekIfOlderThan=Long.getLong(
+SimpleKafkaIasiosConsumer.SeekIfOlderThanProName,
+SimpleKafkaIasiosConsumer.SeekIfOlderThanDefault);

stringsConsumer = new SimpleStringConsumer(servers, topicName, consumerID);
}
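
For example, to discard batches of records sent more than 30 seconds ago, the property can be passed on the command line (java -Dorg.eso.ias.kafkautils.iasioconsumer.seekifolderthan=30000 ...) or set programmatically before building the consumer, as in this sketch:

    import org.eso.ias.kafkautils.SimpleKafkaIasiosConsumer;

    class SlowProcessorConfigSketch {
        public static void main(String[] args) {
            // Must be set before new SimpleKafkaIasiosConsumer(...) because
            // the constructor reads the property once with Long.getLong(...)
            System.setProperty(SimpleKafkaIasiosConsumer.SeekIfOlderThanProName, "30000");
            // ... build and start the consumer as usual ...
        }
    }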

@@ -76,7 +108,7 @@ public SimpleKafkaIasiosConsumer(String servers, String topicName, String consumerID) {
* @param listener The listener of events published in the topic
* @throws KafkaUtilsException in case of timeout subscribing to the kafka topic
*/
-public void startGettingEvents(KafkaStringsConsumer.StartPosition startReadingFrom, KafkaIasiosConsumer.IasioListener listener)
+public void startGettingEvents(KafkaStringsConsumer.StreamPosition startReadingFrom, KafkaIasiosConsumer.IasioListener listener)
throws KafkaUtilsException {
Objects.requireNonNull(listener);
this.iasioListener=listener;
@@ -93,27 +125,47 @@ public void startGettingEvents(KafkaStringsConsumer.StartPosition startReadingFrom, KafkaIasiosConsumer.IasioListener listener)
public void stringsReceived(Collection<String> strings) {
assert(strings!=null && !strings.isEmpty());

+// Store the max difference between the current timestamp
+// and the time when the passed IASValues have been sent to the
+// kafka topic
+long maxTimeDifference=0;
Collection<IASValue<?>> ret = new ArrayList<>();
-strings.forEach( str -> {

+Iterator<String> iterator = strings.iterator();
+while (iterator.hasNext()) {
+String str = iterator.next();
IASValue<?> iasio;
try {
iasio = serializer.valueOf(str);
} catch (Exception e) {
logger.error("Error building the IASValue from string [{}]: value lost",str,e);
-return;
+continue;
}
+if (iasio.sentToBsdbTStamp.isPresent()) {
+maxTimeDifference = Long.max(maxTimeDifference,System.currentTimeMillis()-iasio.sentToBsdbTStamp.get());
+}
if (accept(iasio)) {
ret.add(iasio.updateReadFromBsdbTime(System.currentTimeMillis()));
}
-});
+}

-try {
-logger.debug("Notifying {} IASIOs to the listener listener",ret.size());
-if (!ret.isEmpty()) {
-iasioListener.iasiosReceived(ret);
+// Check if the processed records are too old and publish only if they are recent enough.
+// If they are too old then the consumer is too slow processing events and seeks to the end.
+if (maxTimeDifference<=seekIfOlderThan) {
+try {
+logger.debug("Notifying {} IASIOs to the listener", ret.size());
+if (!ret.isEmpty()) {
+iasioListener.iasiosReceived(ret);
+}
+} catch (Exception e) {
+logger.error("Error notifying IASValues to the listener: {} values potentially lost", ret.size(), e);
+}
+} else {
+logger.warn("Consumer too slow processing events: seeking to the end of the topic ({} IASIOs discarded)",ret.size());
+boolean seekOk=stringsConsumer.seekTo(KafkaStringsConsumer.StreamPosition.END);
+if (!seekOk) {
+logger.error("Cannot seek to end of stream");
+}
+}
-}
-} catch (Exception e) {
-logger.error("Error notifying IASValues to the listener: {} values potentially lost",ret.size(),e);
-}
}

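Condensed to its essentials, the check added above behaves like the following sketch (method and parameter names are hypothetical; the real code takes the timestamps from the sentToBsdbTStamp field of the received IASValues):

    import java.util.Optional;

    class SlowConsumerCheckSketch {
        // true if the oldest record of the batch was sent to the BSDB more than
        // seekIfOlderThan milliseconds ago: in that case the batch is discarded
        // and the consumer seeks to the end of the topic
        static boolean shouldSeekToEnd(Iterable<Optional<Long>> sentToBsdbTStamps, long seekIfOlderThan) {
            long maxTimeDifference = 0;
            long now = System.currentTimeMillis();
            for (Optional<Long> tStamp : sentToBsdbTStamps) {
                if (tStamp.isPresent()) {
                    maxTimeDifference = Long.max(maxTimeDifference, now - tStamp.get());
                }
            }
            return maxTimeDifference > seekIfOlderThan;
        }
    }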
(file: SimpleStringConsumer.java)
@@ -59,7 +59,7 @@ public interface KafkaConsumerListener {
/**
* The position to start reading from
*/
-private StartPosition startReadingPos = StartPosition.DEFAULT;
+private StreamPosition startReadingPos = StreamPosition.DEFAULT;

/**
* Max time to wait for the assignment of partitions before polling
@@ -96,7 +96,7 @@ public SimpleStringConsumer(String servers, String topicName, String consumerID)
* @param listener The listener of events published in the topic
* @throws KafkaUtilsException in case of timeout subscribing to the kafka topic
*/
-public synchronized void startGettingEvents(StartPosition startReadingFrom, KafkaConsumerListener listener)
+public synchronized void startGettingEvents(StreamPosition startReadingFrom, KafkaConsumerListener listener)
throws KafkaUtilsException {
Objects.requireNonNull(listener);
super.startGettingEvents(this,startReadingFrom);
(file: …)
@@ -1,7 +1,7 @@
package org.eso.ias.kafkautils.test;

import org.eso.ias.kafkautils.KafkaHelper;
-import org.eso.ias.kafkautils.KafkaStringsConsumer.StartPosition;
+import org.eso.ias.kafkautils.KafkaStringsConsumer.StreamPosition;
import org.eso.ias.kafkautils.SimpleStringConsumer;
import org.eso.ias.kafkautils.SimpleStringConsumer.KafkaConsumerListener;
import org.eso.ias.kafkautils.SimpleStringProducer;
@@ -130,7 +130,7 @@ public void setUp() throws Exception {
producer = new SimpleStringProducer(KafkaHelper.DEFAULT_BOOTSTRAP_BROKERS, topicName, "Consumer-ID");
producer.setUp();

-consumer.startGettingEvents(StartPosition.END,this);
+consumer.startGettingEvents(StreamPosition.END,this);
logger.info("Initialized.");
}

(… 5 more changed files not loaded)