diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 48411b4e0c..e74fd984f4 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -848,11 +848,12 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, type pendingItem struct { sync.Mutex - buffer internal.Buffer - sequenceID uint64 - sentAt time.Time - sendRequests []interface{} - completed bool + buffer internal.Buffer + sequenceID uint64 + sentAt time.Time + sendRequests []interface{} + completed bool + flushCallback func(err error) } func (p *partitionProducer) internalFlushCurrentBatch() { @@ -990,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() { } // flag the sending has completed with error, flush make no effect - pi.Complete() + pi.Complete(errSendTimeout) pi.Unlock() // finally reached the last view item, current iteration ends @@ -1062,15 +1063,10 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { return } - sendReq := &sendRequest{ - msg: nil, - callback: func(id MessageID, message *ProducerMessage, e error) { - fr.err = e - close(fr.doneCh) - }, + pi.flushCallback = func(err error) { + fr.err = err + close(fr.doneCh) } - - pi.sendRequests = append(pi.sendRequests, sendReq) } func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { @@ -1208,27 +1204,17 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) pi.Lock() defer pi.Unlock() p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) - batchSize := int32(0) - for _, i := range pi.sendRequests { - sr := i.(*sendRequest) - if sr.msg != nil { - batchSize = batchSize + 1 - } else { // Flush request - break - } - } + batchSize := int32(len(pi.sendRequests)) for idx, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))) - p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) - p.metrics.MessagesPublished.Inc() - p.metrics.MessagesPending.Dec() - payloadSize := float64(len(sr.msg.Payload)) - p.metrics.BytesPublished.Add(payloadSize) - p.metrics.BytesPending.Sub(payloadSize) - } + atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) + p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))) + p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) + p.metrics.MessagesPublished.Inc() + p.metrics.MessagesPending.Dec() + payloadSize := float64(len(sr.msg.Payload)) + p.metrics.BytesPublished.Add(payloadSize) + p.metrics.BytesPending.Sub(payloadSize) if sr.callback != nil || len(p.options.Interceptors) > 0 { msgID := newMessageID( @@ -1274,7 +1260,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } // Mark this pending item as done - pi.Complete() + pi.Complete(nil) } } @@ -1384,12 +1370,15 @@ type flushRequest struct { err error } -func (i *pendingItem) Complete() { +func (i *pendingItem) Complete(err error) { if i.completed { return } i.completed = true buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } } // _setConn sets the internal connection field of this partition producer atomically.