Skip to content

Commit

Permalink
overload constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
Neelab Chaudhuri committed Oct 31, 2024
1 parent 6f73d5e commit 27b72af
Showing 1 changed file with 2 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,12 @@ public class KafkaEmitter implements Emitter {
* @throws IOException
*/
public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
kafkaConfigProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
kafkaConfigProperties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl());
kafkaConfigProperties.putAll(config.getSchemaRegistryConfig());
kafkaConfigProperties.putAll(config.getProducerConfig());

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = DEFAULT_MCP_KAFKA_TOPIC;
this(config, DEFAULT_MCP_KAFKA_TOPIC);
}

/**
* Constructor that takes in KafkaEmitterConfig
* and custom mcp Kafka Topic Name
* and mcp Kafka Topic Name
*
* @param config
* @throws IOException
Expand Down

0 comments on commit 27b72af

Please sign in to comment.