diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 071f52c4aa..9a761b33c3 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -316,7 +316,7 @@ private void commitToKafka(AtlasKafkaMessage messageToCommit int partitionId = messageToCommit.getPartition(); if (offsetOfLastMessageCommittedToKafka < messageOffset) { - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId); + TopicPartition partition = new TopicPartition(messageToCommit.getTopic(), partitionId); try { if (LOG.isDebugEnabled()) { LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka");