Skip to content

Commit

Permalink
(WIP) Refactor ackIDCommon
Browse files Browse the repository at this point in the history
TODO: split this method for list ack
  • Loading branch information
BewareMyPower committed Oct 31, 2024
1 parent 1c4792f commit 113c54d
Showing 1 changed file with 53 additions and 47 deletions.
100 changes: 53 additions & 47 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,33 +450,69 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}
if !checkMessageIDType(msgID) {
pc.log.Errorf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
}

msgIDsToAck := make([]MessageID, 0, 1)
callback := func() {}
if cmid, ok := msgID.(*chunkMessageID); ok {
if txn == nil {
return pc.unAckChunksTracker.ack(cmid)
ids := pc.unAckChunksTracker.get(cmid)
for _, id := range ids {
msgIDsToAck = append(msgIDsToAck, id)
}
callback = func() {
pc.unAckChunksTracker.remove(cmid)
}
} else {
msgIDsToAck = append(msgIDsToAck, msgID)
}

trackingIDs := make([]*trackingMessageID, 0, len(msgIDsToAck))
completedPositions := make(map[position]struct{})

for _, msgIDToAck := range msgIDsToAck {
trackingID := toTrackingMessageID(msgIDToAck)
trackingIDs = append(trackingIDs, trackingID)
if trackingID.ack() {
completedPositions[position{
ledgerID: uint64(trackingID.ledgerID),
entryID: uint64(trackingID.entryID),
}] = struct{}{}
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
}
return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}

trackingID := toTrackingMessageID(msgID)
// The incomplete message IDs can only be handled when the batch index ACK is enabled
incompleteTrackIDs := make([]*trackingMessageID, 0)
for _, trackingID := range trackingIDs {
position := position{ledgerID: uint64(trackingID.ledgerID), entryID: uint64(trackingID.entryID)}
if _, found := completedPositions[position]; !found {
incompleteTrackIDs = append(incompleteTrackIDs, trackingID)
}
}

if trackingID != nil && trackingID.ack() {
// All messages in the same batch have been acknowledged, we only need to acknowledge the
// MessageID that represents the entry that stores the whole batch
trackingID = &trackingMessageID{
trackingIDsToAck := incompleteTrackIDs
if !pc.options.enableBatchIndexAck {
trackingIDsToAck = make([]*trackingMessageID, 0, len(completedPositions))
}
for position := range completedPositions {
trackingIDsToAck = append(trackingIDsToAck, &trackingMessageID{
messageID: &messageID{
ledgerID: trackingID.ledgerID,
entryID: trackingID.entryID,
ledgerID: int64(position.ledgerID),
entryID: int64(position.entryID),
},
}
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
} else if !pc.options.enableBatchIndexAck {
return nil
})
}

var err error
if withResponse {
for _, trackingID := range trackingIDsToAck {
if !withResponse {
pc.ackGroupingTracker.add(trackingID)
continue
}
if txn != nil {
ackReq := pc.sendIndividualAckWithTxn(trackingID, txn.(*transaction))
<-ackReq.doneCh
Expand All @@ -486,9 +522,8 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
<-ackReq.doneCh
err = ackReq.err
}
} else {
pc.ackGroupingTracker.add(trackingID)
}
callback()
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
return err
}
Expand Down Expand Up @@ -673,18 +708,10 @@ func (pc *partitionConsumer) sendIndividualAckWithTxn(msgID MessageID, txn *tran
}

func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
if !checkMessageIDType(msgID) {
pc.log.Errorf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
}
return pc.ackID(msgID, true)
}

func (pc *partitionConsumer) AckID(msgID MessageID) error {
if !checkMessageIDType(msgID) {
pc.log.Errorf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
}
return pc.ackID(msgID, false)
}

Expand Down Expand Up @@ -2436,27 +2463,6 @@ func (u *unAckChunksTracker) remove(cmid *chunkMessageID) {
delete(u.chunkIDs, *cmid)
}

func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
return u.ackWithTxn(cmid, nil)
}

func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) error {
ids := u.get(cmid)
for _, id := range ids {
var err error
if txn == nil {
err = u.pc.AckID(id)
} else {
err = u.pc.AckIDWithTxn(id, txn)
}
if err != nil {
return err
}
}
u.remove(cmid)
return nil
}

func (u *unAckChunksTracker) nack(cmid *chunkMessageID) {
ids := u.get(cmid)
for _, id := range ids {
Expand Down

0 comments on commit 113c54d

Please sign in to comment.