[BUG] The producer flush operation is not guaranteed to flush all messages #1057

Closed
RobertIndie opened this issue Jul 18, 2023 · 5 comments · Fixed by #1058

@RobertIndie
Member

Expected behavior

producer.Flush should guarantee that all messages sent before the call are flushed to the broker.

Actual behavior

The flush operation may skip some messages in the producer.

Steps to reproduce

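// SendAsync returns immediately; the message is enqueued in the background.
producer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("hello"),
}, nil)
// Flush is expected to block until "hello" has reached the broker.
err = producer.Flush()
if err != nil {
	return
}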

Currently, Flush cannot guarantee that the message "hello" has actually been flushed to the broker.

The root cause is that SendAsync starts another goroutine to put the message into the queue.
Therefore, when Flush is called, the message may not have entered the queue yet.
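
The race described above can be reproduced in a toy model. The sketch below is not the client's code: `toyProducer`, `sendAsync`, `flush`, and the `gate` channel are hypothetical names, and the gate exists only to force the unlucky scheduling (flush running before the enqueue goroutine) so the outcome is reproducible.

```go
package main

import (
	"fmt"
	"sync"
)

// toyProducer is a hypothetical, heavily simplified stand-in for the real
// producer. It models only one thing: messages are enqueued by a separate
// goroutine rather than by the caller of sendAsync.
type toyProducer struct {
	mu      sync.Mutex
	queue   []string // messages waiting to be flushed
	flushed []string // messages "delivered to the broker"
	wg      sync.WaitGroup
}

// sendAsync hands msg to a new goroutine, which waits on gate before
// enqueuing. The gate is an assumption of this sketch, used to make the
// bad interleaving deterministic.
func (p *toyProducer) sendAsync(msg string, gate <-chan struct{}) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		<-gate
		p.mu.Lock()
		p.queue = append(p.queue, msg)
		p.mu.Unlock()
	}()
}

// flush moves whatever is in the queue right now to flushed and reports
// how many messages it moved.
func (p *toyProducer) flush() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	n := len(p.queue)
	p.flushed = append(p.flushed, p.queue...)
	p.queue = nil
	return n
}

// demo returns the number of messages the flush saw and the number of
// messages still queued after the enqueue goroutine has finished.
func demo() (flushedCount, leftBehind int) {
	p := &toyProducer{}
	gate := make(chan struct{})
	p.sendAsync("hello", gate)
	flushedCount = p.flush() // runs before the goroutine has enqueued
	close(gate)              // now let the goroutine enqueue "hello"
	p.wg.Wait()
	p.mu.Lock()
	leftBehind = len(p.queue)
	p.mu.Unlock()
	return flushedCount, leftBehind
}

func main() {
	flushed, left := demo()
	fmt.Printf("flushed=%d leftBehind=%d\n", flushed, left) // flushed=0 leftBehind=1
}
```

In the real client the interleaving depends on the scheduler, so the message is only sometimes missed; the gate here just pins down the failing case.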

System configuration

Pulsar Go version: master

@gunli
Contributor

gunli commented Jul 18, 2023

@RobertIndie @Gleiphir2769 I have reported a bug in #1042 about this, and @zengguan is preparing a PR to fix it.

Thanks. @gunli I also found a bug related to this: #1057. The initial idea I came up with is to have the operation of pushing a message to the producer queue happen in the user thread, just like the Java client does. Let's move this discussion into that issue (or a new issue if it's not related).

I have also reported an issue in #1043 about how to improve the message enqueue logic. I don't know whether the Java client has a performance issue here. If it does not, implementing the Go client the way the Java client does will be fine. If it does, a dynamic-length pendingItem queue may be an alternative: we don't care whether the pendingItem queue is full, we just use a semaphore to keep track of how many messages are pending. When the semaphore is available, we send the message; when it is not, we block. If a message has to be chunked, we treat it as ONE message that produces more than one pending item. On close, we flush the remaining messages into the pending queue and wait for them to complete.

@Gleiphir2769
Contributor

Gleiphir2769 commented Jul 18, 2023

Hi @RobertIndie, good catch! It seems to be a new problem introduced in #1029. Before #1029, the flush cmd and the send cmd were passed through the same channel, eventsChan, which made sure that all the messages in eventsChan had been handled by internalSend before internalFlush ran.

Now we should drain dataChan and pass all of its messages to internalSend before internalFlush.

If you don't have time to work on this, I think I can fix it.
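
The drain-before-flush idea from this comment could look roughly like the sketch below. It is an assumption-laden illustration, not necessarily what the fix in #1058 does: `drainThenFlush`, the `sendRequest` shape, and the callbacks are hypothetical, and only messages that have already reached `dataChan` are covered.

```go
package main

import "fmt"

// sendRequest is a hypothetical, minimal stand-in for a queued send.
type sendRequest struct {
	payload string
}

// drainThenFlush hands every request currently buffered in dataChan to
// internalSend, then calls internalFlush. It returns how many requests it
// drained. The non-blocking select stops as soon as dataChan is empty.
func drainThenFlush(
	dataChan chan *sendRequest,
	internalSend func(*sendRequest),
	internalFlush func(),
) int {
	drained := 0
	for {
		select {
		case req := <-dataChan:
			internalSend(req)
			drained++
		default:
			internalFlush()
			return drained
		}
	}
}

// demo queues three requests, then flushes and returns what was flushed.
func demo() []string {
	dataChan := make(chan *sendRequest, 8)
	for _, p := range []string{"a", "b", "c"} {
		dataChan <- &sendRequest{payload: p}
	}

	var pending, flushed []string
	internalSend := func(r *sendRequest) { pending = append(pending, r.payload) }
	internalFlush := func() {
		flushed = append(flushed, pending...)
		pending = nil
	}

	drainThenFlush(dataChan, internalSend, internalFlush)
	return flushed
}

func main() {
	fmt.Println(demo()) // [a b c]
}
```

If other goroutines keep sending while the flush runs, a loop like this may drain for a long time; draining only the `len(dataChan)` requests present when the flush starts is one way to bound it.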

@gunli
Contributor

gunli commented Jul 19, 2023

@Gleiphir2769 @RobertIndie Flush() just flushes the messages in the pending queue. I thought we intentionally designed it this way, so in #1042 I only mentioned that Close() should consume the messages in dataChan...

@Gleiphir2769
Contributor

Gleiphir2769 commented Jul 19, 2023

I thought we intentionally designed it this way

@gunli Yes, you're right. In the original design there was only one channel, eventsChan, handling both send and flush cmds. This meant that by the time a flush cmd arrived, all earlier sends had already been handled by internalSend and added to pendingQueue. After #1029, eventsChan was split into dataChan and cmdChan, and that ordering guarantee was broken.
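
The ordering guarantee of the single-channel design can be illustrated with a small event loop. This is a sketch under assumed names (`sendEvent`, `flushEvent`, `runEventLoop`, and `flushAfterSends` are hypothetical): because sends and flushes share one FIFO channel, a flush is always processed after every send queued before it.

```go
package main

import "fmt"

// sendEvent and flushEvent are hypothetical event types that share one channel.
type sendEvent struct {
	payload string
}

type flushEvent struct {
	done chan []string // receives the messages covered by this flush
}

// runEventLoop processes events strictly in arrival order. A flushEvent
// therefore sees every sendEvent that was queued ahead of it.
func runEventLoop(events <-chan interface{}) {
	var pending []string
	for ev := range events {
		switch e := ev.(type) {
		case sendEvent:
			pending = append(pending, e.payload)
		case flushEvent:
			e.done <- pending
			pending = nil
		}
	}
}

// flushAfterSends queues the payloads followed by one flush on the same
// channel and returns what the flush covered.
func flushAfterSends(payloads ...string) []string {
	events := make(chan interface{}, len(payloads)+1)
	go runEventLoop(events)

	for _, p := range payloads {
		events <- sendEvent{payload: p}
	}
	done := make(chan []string, 1)
	events <- flushEvent{done: done}

	flushed := <-done
	close(events) // lets the event loop goroutine exit
	return flushed
}

func main() {
	fmt.Println(flushAfterSends("a", "b", "c")) // [a b c]
}
```

With two channels read in one `select`, Go picks among the ready cases at random, so a flush on the command channel can be handled while sends are still waiting on the data channel.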

@RobertIndie
Member Author

Thank you all for figuring out the issue. @gunli @Gleiphir2769

So it looks like this is a regression introduced in #1029.

RobertIndie pushed a commit that referenced this issue Jul 20, 2023
…to flush all messages (#1058)

Fixes #1057 

### Motivation

`dataChan` was introduced by #1029 to fix the problem with reconnectToBroker. But that change missed that when a flush operation is executed, there may still be some messages in `dataChan`, and these messages cannot be flushed.

### Modifications

- Fix the producer flush operation not being guaranteed to flush all messages

RobertIndie pushed a commit that referenced this issue Sep 7, 2023
…to flush all messages (#1058)

Fixes #1057

### Motivation

`dataChan` was introduced by #1029 to fix the problem with reconnectToBroker. But that change missed that when a flush operation is executed, there may still be some messages in `dataChan`, and these messages cannot be flushed.

### Modifications

- Fix the producer flush operation not being guaranteed to flush all messages

(cherry picked from commit 9867c29)