Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protobuf producer serialization slow #1317

Open
6 tasks done
bichselb opened this issue Oct 16, 2024 · 0 comments
Open
6 tasks done

Protobuf producer serialization slow #1317

bichselb opened this issue Oct 16, 2024 · 0 comments

Comments

@bichselb
Copy link

Description

I am using the protobuf producer outlined here. Unfortunately, its serialization is so slow that it a bottleneck in my whole system. Specifically, it takes 1.6s to serialize 1 million messages, even though pure protobuf can serialize the same 1 million messages in 100ms.

How can I speed up serialization to avoid this 15x overhead? I would like to avoid parallelization.

How to reproduce

I put my whole experimental setup here: https://github.com/bichselb/confluent-kafka-go-performance/tree/serialization-slow

My serialization is a straight-forward adaptation of the protobuf producer example, except that I enabled CacheSchemas (without this, the code is another order of magnitude slower):

func serializeWithSchemaregistry(nMessages int) {
	registry, err := schemaregistry.NewClient(schemaregistry.NewConfig("http://schema-registry:8081"))
	if err != nil {
		fmt.Printf("Error creating schema registry client...\n")
		panic(err)
	}
	defer registry.Close()

	serializer, err := protobuf.NewSerializer(
		registry,
		serde.ValueSerde,
		&protobuf.SerializerConfig{
			SerializerConfig: *serde.NewSerializerConfig(),
			CacheSchemas: true,  // big difference for performance
		},
	)
	if err != nil {
		fmt.Printf("Error creating serializer...\n")
		panic(err)
	}
	defer serializer.Close()

	start := time.Now()
	topic := "mytopic"
	for i := 0; i < nMessages; i++ {
		message := pb.MyMessage{
			A: int32(i),
			B: int32(i),
		}

		payload, err := serializer.Serialize(topic, &message)
		if err != nil {
			fmt.Printf("Error serializing message...\n")
			panic(err)
		}
		// Pseudo-use of payload to prevent compiler optimizations
		_ = copy(payload, payload)
	}
	fmt.Printf("Protobuf+schemaregistry: Serialized %d messages in %v\n", nMessages, time.Since(start))
}

Checklist

Please provide the following information:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants