From 5c5af6ada9e7352cb6fd5bef09cc23d8d880cf39 Mon Sep 17 00:00:00 2001 From: Jiaqi Shen <18863662628@163.com> Date: Thu, 20 Jul 2023 19:35:05 +0800 Subject: [PATCH] [fix] [issue 1057]: Fix the producer flush opertion is not guarantee to flush all messages (#1058) Fixes #1057 ### Motivation `dataChan` is introduced by #1029 to fix the problem of reconnectToBroker. But it missed that if a flush operation excuted, there may still be some messages in `dataChan`. And these messages can't be flushed. ### Modifications - Fix the producer flush opertion is not guarantee to flush all messages (cherry picked from commit 9867c29ca329302e97ddd9c6a99f66853c7f447f) --- pulsar/producer_partition.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e8ad1953b8..00d4e6f0fb 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1048,6 +1048,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { + // clear all the messages which have sent to dataChan before flush + if len(p.dataChan) != 0 { + oldDataChan := p.dataChan + p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages) + for len(oldDataChan) != 0 { + pendingData := <-oldDataChan + p.internalSend(pendingData) + } + } p.internalFlushCurrentBatch()