Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

user should be able to pass custom mcp kafka topic #11767

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private Optional<Emitter> getEmitter() {
(KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter =
Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
Optional.of(
new KafkaEmitter(
datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
datahubKafkaEmitterConfig.getMcpTopic()));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,12 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
});
kafkaEmitterConfig.producerConfig(kafkaConfig);
}

return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
} else {
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
}
case "file":
log.info("File Emitter Configuration: File emitter will be used");
FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datahub.spark.conf;

import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,8 +12,15 @@
/**
 * Emitter configuration for the Kafka-backed DataHub emitter.
 *
 * <p>Bundles the underlying {@link KafkaEmitterConfig} with the Kafka topic that
 * MetadataChangeProposal records are published to. When no topic is supplied, the topic falls back
 * to {@link KafkaEmitter#DEFAULT_MCP_KAFKA_TOPIC}.
 */
public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
  final String type = "kafka";
  KafkaEmitterConfig kafkaEmitterConfig;
  String mcpTopic;

  /**
   * Creates a configuration that publishes MCPs to the default Kafka topic.
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
    // Delegate so the default-topic decision lives in exactly one constructor.
    this(kafkaEmitterConfig, KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC);
  }

  /**
   * Creates a configuration that publishes MCPs to a custom Kafka topic.
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   * @param mcpTopic the Kafka topic to which MetadataChangeProposals are emitted
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
    this.kafkaEmitterConfig = kafkaEmitterConfig;
    this.mcpTopic = mcpTopic;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class SparkConfigParser {
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class AvroSerializer {
public class AvroSerializer {

private final Schema _recordSchema;
private final Schema _genericAspectSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter {
private final Properties kafkaConfigProperties;
private AvroSerializer _avroSerializer;
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
private final String mcpKafkaTopic;

/**
 * The default constructor. Emits MetadataChangeProposals to the default MCP Kafka topic by
 * delegating to the two-argument constructor with {@code DEFAULT_MCP_KAFKA_TOPIC}.
 *
 * @param config KafkaEmitterConfig used to build the underlying Kafka producer
 * @throws IOException when Avro Serialization fails
 */
public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
  this(config, DEFAULT_MCP_KAFKA_TOPIC);
}

/**
 * Constructor that takes in a KafkaEmitterConfig and the MCP Kafka topic name.
 *
 * @param config KafkaEmitterConfig
 * @param mcpKafkaTopic the Kafka topic to which MetadataChangeProposals are emitted
 * @throws IOException when Avro Serialization fails
 */
public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
Expand All @@ -54,6 +65,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException {

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = mcpKafkaTopic;
}

@Override
Expand All @@ -73,8 +85,7 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback d
throws IOException {
GenericRecord genricRecord = _avroSerializer.serialize(mcp);
ProducerRecord<Object, Object> record =
new ProducerRecord<>(
KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord);
new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
org.apache.kafka.clients.producer.Callback callback =
new org.apache.kafka.clients.producer.Callback() {

Expand Down
Loading