Skip to content

Commit

Permalink
User should be able to pass a custom MCP Kafka topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Neelab Chaudhuri committed Oct 31, 2024
1 parent 55e3d1d commit 3b29930
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private Optional<Emitter> getEmitter() {
(KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter =
Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
Optional.of(
new KafkaEmitter(
datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
datahubKafkaEmitterConfig.getMcpTopic()));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,13 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
});
kafkaEmitterConfig.producerConfig(kafkaConfig);
}

return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
}
else {
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
}
case "file":
log.info("File Emitter Configuration: File emitter will be used");
FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datahub.spark.conf;

import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,8 +12,16 @@
public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
  final String type = "kafka";
  KafkaEmitterConfig kafkaEmitterConfig;
  String mcpTopic;

  /**
   * Creates a Kafka emitter config that emits to the default MCP topic
   * ({@link KafkaEmitter#DEFAULT_MCP_KAFKA_TOPIC}).
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
    // Delegate to the canonical constructor so the default topic is defined in one place.
    this(kafkaEmitterConfig, KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC);
  }

  /**
   * Creates a Kafka emitter config that emits to a caller-supplied MCP topic.
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   * @param mcpTopic the Kafka topic to which MetadataChangeProposals are produced
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
    this.kafkaEmitterConfig = kafkaEmitterConfig;
    this.mcpTopic = mcpTopic;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class SparkConfigParser {
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class AvroSerializer {
public class AvroSerializer {

private final Schema _recordSchema;
private final Schema _genericAspectSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class KafkaEmitter implements Emitter {
private final Properties kafkaConfigProperties;
private AvroSerializer _avroSerializer;
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
private final String mcpKafkaTopic;

/**
* The default constructor
Expand All @@ -54,6 +55,34 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException {

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = DEFAULT_MCP_KAFKA_TOPIC;
}

/**
* Constructor that takes in KafkaEmitterConfig
* and custom mcp Kafka Topic Name
*
* @param config
* @throws IOException
*/
public KafkaEmitter(KafkaEmitterConfig config,
String mcpKafkaTopic) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
kafkaConfigProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
kafkaConfigProperties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl());
kafkaConfigProperties.putAll(config.getSchemaRegistryConfig());
kafkaConfigProperties.putAll(config.getProducerConfig());

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = mcpKafkaTopic;
}

@Override
Expand All @@ -74,7 +103,7 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback d
GenericRecord genricRecord = _avroSerializer.serialize(mcp);
ProducerRecord<Object, Object> record =
new ProducerRecord<>(
KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord);
this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
org.apache.kafka.clients.producer.Callback callback =
new org.apache.kafka.clients.producer.Callback() {

Expand Down

0 comments on commit 3b29930

Please sign in to comment.