Skip to content

Commit

Permalink
RANGER-4349: AtlasTagSource.commitToKafka() should commit the offset …
Browse files Browse the repository at this point in the history
…to the topic from which the event came from.

Signed-off-by: szymonorz <szyorz@proton.me>
Signed-off-by: Ramesh Mani <rmani@apache.org>
  • Loading branch information
szymonorz authored and Ramesh Mani committed Aug 9, 2023
1 parent d0764e2 commit a546180
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit
int partitionId = messageToCommit.getPartition();

if (offsetOfLastMessageCommittedToKafka < messageOffset) {
TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId);
TopicPartition partition = new TopicPartition(messageToCommit.getTopic(), partitionId);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka");
Expand Down

0 comments on commit a546180

Please sign in to comment.