Producer messages stuck flushing even though there weren't any writes. #1194

Open
ylhan opened this issue May 20, 2024 · 4 comments

ylhan commented May 20, 2024

Description

Scenario:

  1. I have a producer running for 4 days
  2. No writes at all
  3. Calling flush on this producer (timeout=10 seconds) results in 2 unflushed messages

I'm at my wits' end here. I didn't write anything with this producer, yet it complains that two messages are not flushed.

I dug into the library code a bit, and perhaps this is an issue with the Len() method? Why does that method add up the lengths of three different queues? And why do I have unflushed messages when I never wrote to this producer even once?

How to reproduce

Instantiate a Kafka producer with the configuration map below, do not write anything, and flush the producer; a minimal sketch follows.
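
A condensed repro sketch (not my exact code; the broker address and credentials are placeholders, and the full ConfigMap is in the checklist below):

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Placeholder config; see the checklist below for the real ConfigMap.
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"acks":               "all",
		"enable.idempotence": "true",
		"linger.ms":          100,
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// No Produce() calls at all, then flush with a 10-second timeout.
	unflushed := p.Flush(10000)
	fmt.Printf("unflushed: %d\n", unflushed) // reports 2 after several days of uptime
}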

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): v2.3.1-0.20240315214844-f1230c0e9dd4
  • Apache Kafka broker version: 3.2.0
  • Client configuration: ConfigMap{...}:
&kafka.ConfigMap{
	"client.id":                             "blah",
	"bootstrap.servers":                     ...,
	"sasl.mechanism":                        "SCRAM-SHA-512",
	"security.protocol":                     "SASL_SSL",
	"sasl.username":                         ...,
	"sasl.password":                         ...,
	"acks":                                  "all",
	"enable.idempotence":                    "true",
	"max.in.flight.requests.per.connection": 5,
	"linger.ms":                             100,
	"retries":                               5,
	"batch.size":                            2000,
	"request.timeout.ms":                    2000,
}
  • Operating system: docker container - base image: golang:1.21-bullseye
  • Provide client logs (with "debug": ".." as necessary)
    I wrapped confluent-kafka-go's Flush like so:
func (p *Producer) Close(context.Context) error {
	defer p.writer.Close()
	unflushed := p.writer.Flush(p.flushTimeoutMs)
	if unflushed > 0 {
		p.log.Error("Failed to flush messages", zap.Int("unflushed", unflushed))
		return errors.Errorf("failed to flush %d messages", unflushed)
	}
	return nil
}

When the client shuts down, I get the following error:

failed to flush 2 messages
  • Provide broker log excerpts: attached. There are multiple services connected to the broker, but the relevant service is called market-connector. Nothing very telling in the logs from my review.
  • Critical issue: No

extract-2024-05-20T21_40_38.964Z.csv

milindl commented May 23, 2024

Hi @ylhan, is there a goroutine or something that reads from the producer.Events() channel? That channel needs to be read continuously, as there may be administrative/control events on it, and until it is drained, Flush will complain.

It's necessary to read from this channel while running a producer, even if you choose to set per-message delivery channels.
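
For illustration, a minimal sketch of such a drain goroutine (p is the producer, log is the standard-library logger; names are illustrative):

go func() {
	// Drain all events so administrative/control events don't accumulate.
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case kafka.Error:
			log.Printf("producer error: %v", ev)
		default:
			// Other event types can be ignored here.
		}
	}
}()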

ylhan commented Jun 18, 2024

Yes, we have a goroutine running in the background that consumes any result messages asynchronously.

guotie commented Jul 17, 2024

Same problem here.

When I produce a message and receive a kafka.ErrQueueFull error, I call Flush(), but it gets stuck.

guotie commented Jul 18, 2024

If I produce a message as follows:

err = p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
	Value:          buf,
}, delivery)
if err == nil {
	<-delivery
}

then when I call p.Flush(10000), it gets stuck.

But if I set delivery to nil, then Flush is OK.

Why?
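
For comparison, the commonly documented per-message delivery-channel pattern looks roughly like this (a sketch only; topic, buf, and the channel size are illustrative):

// Buffered so the delivery report can be written even before we read it.
deliveryChan := make(chan kafka.Event, 1)
err := p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          buf,
}, deliveryChan)
if err != nil {
	// e.g. kafka.ErrQueueFull: no delivery report will arrive for this message.
	log.Printf("produce failed: %v", err)
} else {
	// Wait for the delivery report for this specific message.
	e := <-deliveryChan
	if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		log.Printf("delivery failed: %v", m.TopicPartition.Error)
	}
}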
