Question - Using Confluent Schema Registry and AVRO: How to Serialize and Send a Message with Unknown Structure? #1269

Open
msempere opened this issue Aug 12, 2024 · 2 comments

msempere commented Aug 12, 2024

Description

I'm working on a project where I need to serialize messages with Avro and the Confluent Schema Registry and send them to Confluent Kafka. The challenge is that the messages have an unknown structure: I only have the schema ID and the topic.

I've tried using `interface{}` and `map[string]interface{}`, but `ser.Serialize(topic, &record)` fails and the message cannot be serialized. One of the errors I get is `unknown type interface`.

Detailed Scenario

I have a log file with JSON records that contain the schema ID, the topic, and a JSON message to be sent to Confluent Kafka. The JSON message is arbitrary, and I don't know its structure in advance. Here's the struct I decode each record into:

```go
type Record struct {
	Topic    string                 `json:"topic"`
	SchemaID int                    `json:"id"`
	Msg      map[string]interface{} `json:"msg"`
}
```
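A minimal sketch of reading one log line into `Record` with `encoding/json`. The sample line and its field values are hypothetical. The sketch also shows where the `float64` error below originates: when the target is `map[string]interface{}`, `encoding/json` stores every JSON number as `float64`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record mirrors one line of the log file.
type Record struct {
	Topic    string                 `json:"topic"`
	SchemaID int                    `json:"id"`
	Msg      map[string]interface{} `json:"msg"`
}

// parseRecord decodes a single JSON log line into a Record.
func parseRecord(line []byte) (Record, error) {
	var r Record
	err := json.Unmarshal(line, &r)
	return r, err
}

func main() {
	// Hypothetical log line; real topics, IDs, and fields will differ.
	line := []byte(`{"topic":"orders","id":7,"msg":{"count":3,"name":"a"}}`)
	r, err := parseRecord(line)
	if err != nil {
		panic(err)
	}
	// Prints "float64": numbers inside Msg lose their integer type.
	fmt.Printf("%T\n", r.Msg["count"])
}
```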

The Issue

I'm unable to serialize the Msg without knowing its structure. Here is the code I'm using:

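```go
configuration := avrov2.NewSerializerConfig()
configuration.UseSchemaID = record.SchemaID // serialize against this registered schema
configuration.AutoRegisterSchemas = false   // never register a schema from the client

ser, err := avrov2.NewSerializer(schemaRegistryClient, serde.ValueSerde, configuration)
if err != nil {
	log.Fatal(err)
}
// Fails when Msg is map[string]interface{} or interface{}
serializedMsg, err := ser.Serialize(record.Topic, &record.Msg)
```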

This returns an `unknown type interface` error when `Msg` is declared as `map[string]interface{}` or `interface{}`.

Alternative Attempt

As an alternative, I tried using github.com/hamba/avro/v2 to serialize the Msg with the schema retrieved from schemaRegistryClient.GetBySubjectAndID, but it didn't work either:

```go
// Fetch the registered schema for this subject and ID, then marshal Msg against it.
metadata, err = schemaRegistryClient.GetBySubjectAndID(schemaValue, record.SchemaID)
avroSchema, err = avro.Parse(metadata.Schema)
msg, err = avro.Marshal(avroSchema, record.Msg)
```

This returns a `float64 is unsupported for Avro int` error, even when `Msg` contains just a single int value.
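The `float64` error is a side effect of `encoding/json` rather than of hamba/avro: decoding into `map[string]interface{}` turns every JSON number into `float64`, which an Avro `int` field rejects. One option is a minimal sketch like the one below, which decodes with `UseNumber` and converts each `json.Number` to `int` when it is integral and to `float64` otherwise. It assumes hamba/avro accepts a Go `int` for Avro `int` and `long` fields; `decodeMsg` and `normalize` are hypothetical helpers. Because it does not consult the schema, a whole number destined for an Avro `float` or `double` field would become `int` and may still be rejected; a schema-aware walk over the record fields would be needed for that case (not shown).

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decodeMsg (hypothetical helper) decodes arbitrary JSON while keeping
// numbers as json.Number instead of float64.
func decodeMsg(data []byte) (map[string]interface{}, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber()
	var m map[string]interface{}
	if err := dec.Decode(&m); err != nil {
		return nil, err
	}
	return normalize(m).(map[string]interface{}), nil
}

// normalize walks nested maps and slices, replacing each json.Number with
// int when it parses as an integer and with float64 otherwise.
func normalize(v interface{}) interface{} {
	switch t := v.(type) {
	case map[string]interface{}:
		for k, val := range t {
			t[k] = normalize(val)
		}
		return t
	case []interface{}:
		for i, val := range t {
			t[i] = normalize(val)
		}
		return t
	case json.Number:
		if i, err := t.Int64(); err == nil {
			return int(i) // assumes a 64-bit platform for large values
		}
		f, _ := t.Float64()
		return f
	default:
		return v
	}
}

func main() {
	m, err := decodeMsg([]byte(`{"count":3,"price":2.5}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %T\n", m["count"], m["price"])
}
```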

Question

How can I serialize an arbitrary JSON message with Avro and the Schema Registry? Any help or guidance would be greatly appreciated.
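If the hamba/avro route is used, `avro.Marshal` returns only the Avro binary body. A Schema Registry-aware deserializer expects the Confluent wire format: a magic byte `0`, then the schema ID as a 4-byte big-endian integer, then the Avro data. A minimal sketch of adding that header by hand, assuming the payload already came from `avro.Marshal`; `frameWithSchemaID` is a hypothetical helper, not a library function.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameWithSchemaID (hypothetical helper) prepends the Confluent wire-format
// header to an Avro-encoded payload: magic byte 0, then the 4-byte
// big-endian schema ID.
func frameWithSchemaID(schemaID int, payload []byte) []byte {
	buf := make([]byte, 5+len(payload))
	buf[0] = 0 // magic byte
	binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID))
	copy(buf[5:], payload)
	return buf
}

func main() {
	// Stand-in for the bytes returned by avro.Marshal.
	payload := []byte{0x06}
	fmt.Printf("% x\n", frameWithSchemaID(42, payload))
}
```

The framed bytes would then be used as the Kafka message value in place of the raw `avro.Marshal` output.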

Thanks!


How to reproduce

See code above. I'm happy to share the full code if needed.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 2.5.0
  • Apache Kafka broker version:
  • Client configuration: ConfigMap{...}
  • Operating system: macOS
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue

chiang8 commented Aug 29, 2024

I have the same requirement. Using map[string]interface{} breaks an assumption of the library, i.e. it only supports structs.
Reasons:

  • A struct is required to derive a schema; a map type cannot be converted to a schema.
  • This local schema is used by the SubjectNameStrategy. It may not be needed for the default implementation, which uses the topic name as the subject, but if your app must support multiple schemas for the same topic, you may need a different strategy that extracts the record name from the schema.
  • This local schema is required to register the schema when AutoRegisterSchemas is true.
  • This local schema is required for schema lookup when the latest schema version is not preferred.

It would be nice if this library could support map[string]interface{}, even at the cost of losing some features.


OneCricketeer commented Oct 16, 2024

I'd suggest an intermediate data format that encodes maps as text or bytes within the Avro spec. Base64 is a popular option.
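A minimal sketch of that suggestion, assuming a hypothetical wrapper schema with a single string field, such as `{"type":"record","name":"Envelope","fields":[{"name":"payload","type":"string"}]}`. The arbitrary message is JSON-encoded and then Base64-encoded into that field, so the Avro schema never needs to know its structure. `Envelope`, `wrap`, and `unwrap` are illustrative names, and the `avro` struct tag follows the hamba/avro convention. Because `Envelope` is an ordinary struct, it sidesteps the map limitation described above; serializing it is not shown.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Envelope matches the hypothetical wrapper schema (one string field).
type Envelope struct {
	Payload string `avro:"payload"`
}

// wrap JSON-encodes an arbitrary message and stores it as Base64 text.
func wrap(msg map[string]interface{}) (Envelope, error) {
	raw, err := json.Marshal(msg)
	if err != nil {
		return Envelope{}, err
	}
	return Envelope{Payload: base64.StdEncoding.EncodeToString(raw)}, nil
}

// unwrap reverses wrap on the consumer side.
func unwrap(e Envelope) (map[string]interface{}, error) {
	raw, err := base64.StdEncoding.DecodeString(e.Payload)
	if err != nil {
		return nil, err
	}
	var msg map[string]interface{}
	err = json.Unmarshal(raw, &msg)
	return msg, err
}

func main() {
	env, err := wrap(map[string]interface{}{"count": 3, "name": "a"})
	if err != nil {
		panic(err)
	}
	back, err := unwrap(env)
	if err != nil {
		panic(err)
	}
	fmt.Println(env.Payload, back["name"])
}
```

The trade-off is that consumers lose Avro-level typing for the payload and must decode it themselves.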
