Skip to content

Commit

Permalink
Renames the client.id used by the kafka producers @see #110
Browse files Browse the repository at this point in the history
  • Loading branch information
acaproni committed Jun 18, 2018
1 parent 5c3542f commit 457c9f9
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ConverterStream converterStream(String cID, Optional<String> kafkaBrokers
@Lazy(value = true)
public HbProducer hbProducer(String cID, Optional<String> kafkaBrokers, Properties props) {
return new HbKafkaProducer(
cID, getKafkaServer(kafkaBrokers, props),
cID+"HBSender", getKafkaServer(kafkaBrokers, props),
new HbJsonSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public static void main(String[] args) {
pluginConfig.getSinkPort(),
Plugin.getScheduledExecutorService());

HbProducer hbProducer = new HbKafkaProducer(pluginConfig.getId(), new HbJsonSerializer());
HbProducer hbProducer = new HbKafkaProducer(pluginConfig.getId()+"HBSender", new HbJsonSerializer());

UdpPlugin udpPlugin = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ object Supervisor {
val hbProducer: HbProducer = {
val kafkaServers = System.getProperties.getProperty(KafkaHelper.BROKERS_PROPNAME,KafkaHelper.DEFAULT_BOOTSTRAP_BROKERS)

new HbKafkaProducer(supervisorId,kafkaServers,new HbJsonSerializer())
new HbKafkaProducer(supervisorId+"HBSender",kafkaServers,new HbJsonSerializer())
}

// Build the supervisor
Expand Down

0 comments on commit 457c9f9

Please sign in to comment.