From 113c54d7924de6fe63e6025bbcee2762b7b0b3f4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 31 Oct 2024 18:04:16 +0800 Subject: [PATCH] (WIP) Refactor ackIDCommon TODO: split this method for list ack --- pulsar/consumer_partition.go | 100 +++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 47 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 5acd69130..86b74b676 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -450,33 +450,69 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return errors.New("consumer state is closed") } + if !checkMessageIDType(msgID) { + pc.log.Errorf("invalid message id type %T", msgID) + return fmt.Errorf("invalid message id type %T", msgID) + } + msgIDsToAck := make([]MessageID, 0, 1) + callback := func() {} if cmid, ok := msgID.(*chunkMessageID); ok { - if txn == nil { - return pc.unAckChunksTracker.ack(cmid) + ids := pc.unAckChunksTracker.get(cmid) + for _, id := range ids { + msgIDsToAck = append(msgIDsToAck, id) + } + callback = func() { + pc.unAckChunksTracker.remove(cmid) + } + } else { + msgIDsToAck = append(msgIDsToAck, msgID) + } + + trackingIDs := make([]*trackingMessageID, 0, len(msgIDsToAck)) + completedPositions := make(map[position]struct{}) + + for _, msgIDToAck := range msgIDsToAck { + trackingID := toTrackingMessageID(msgIDToAck) + trackingIDs = append(trackingIDs, trackingID) + if trackingID.ack() { + completedPositions[position{ + ledgerID: uint64(trackingID.ledgerID), + entryID: uint64(trackingID.entryID), + }] = struct{}{} + pc.metrics.AcksCounter.Inc() + pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9) } - return pc.unAckChunksTracker.ackWithTxn(cmid, txn) } - trackingID := toTrackingMessageID(msgID) + // The incomplete message IDs can only be handled when the batch index ACK is enabled + incompleteTrackIDs := make([]*trackingMessageID, 0) + for _, trackingID := range trackingIDs { + position := position{ledgerID: uint64(trackingID.ledgerID), entryID: uint64(trackingID.entryID)} + if _, found := completedPositions[position]; !found { + incompleteTrackIDs = append(incompleteTrackIDs, trackingID) + } + } - if trackingID != nil && trackingID.ack() { - // All messages in the same batch have been acknowledged, we only need to acknowledge the - // MessageID that represents the entry that stores the whole batch - trackingID = &trackingMessageID{ + trackingIDsToAck := incompleteTrackIDs + if !pc.options.enableBatchIndexAck { + trackingIDsToAck = make([]*trackingMessageID, 0, len(completedPositions)) + } + for position := range completedPositions { + trackingIDsToAck = append(trackingIDsToAck, &trackingMessageID{ messageID: &messageID{ - ledgerID: trackingID.ledgerID, - entryID: trackingID.entryID, + ledgerID: int64(position.ledgerID), + entryID: int64(position.entryID), }, - } - pc.metrics.AcksCounter.Inc() - pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9) - } else if !pc.options.enableBatchIndexAck { - return nil + }) } var err error - if withResponse { + for _, trackingID := range trackingIDsToAck { + if !withResponse { + pc.ackGroupingTracker.add(trackingID) + continue + } if txn != nil { ackReq := pc.sendIndividualAckWithTxn(trackingID, txn.(*transaction)) <-ackReq.doneCh @@ -486,9 +522,8 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn <-ackReq.doneCh err = ackReq.err } - } else { - pc.ackGroupingTracker.add(trackingID) } + callback() pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) return err } @@ -673,18 +708,10 @@ func (pc *partitionConsumer) sendIndividualAckWithTxn(msgID MessageID, txn *tran } func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error { - if !checkMessageIDType(msgID) { - pc.log.Errorf("invalid message id type %T", msgID) - return fmt.Errorf("invalid message id type %T", msgID) - } return pc.ackID(msgID, true) } func (pc *partitionConsumer) AckID(msgID MessageID) error { - if !checkMessageIDType(msgID) { - pc.log.Errorf("invalid message id type %T", msgID) - return fmt.Errorf("invalid message id type %T", msgID) - } return pc.ackID(msgID, false) } @@ -2436,27 +2463,6 @@ func (u *unAckChunksTracker) remove(cmid *chunkMessageID) { delete(u.chunkIDs, *cmid) } -func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error { - return u.ackWithTxn(cmid, nil) -} - -func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) error { - ids := u.get(cmid) - for _, id := range ids { - var err error - if txn == nil { - err = u.pc.AckID(id) - } else { - err = u.pc.AckIDWithTxn(id, txn) - } - if err != nil { - return err - } - } - u.remove(cmid) - return nil -} - func (u *unAckChunksTracker) nack(cmid *chunkMessageID) { ids := u.get(cmid) for _, id := range ids {