Skip to content

Commit

Permalink
fix: pause dispatch message before performing seek
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Oct 12, 2024
1 parent 5f4ac52 commit 6920617
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 41 deletions.
22 changes: 12 additions & 10 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,22 +711,29 @@ func (c *consumer) Seek(msgID MessageID) error {
return err
}

if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
return err
}

consumer := c.consumers[msgID.PartitionIdx()]
consumer.pauseDispatchMessage()
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

return nil
return consumer.Seek(msgID)
}

func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error

for _, cons := range c.consumers {
cons.pauseDispatchMessage()
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
Expand All @@ -735,11 +742,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
}
}

// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

return errs
}

Expand Down
83 changes: 52 additions & 31 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"
Expand Down Expand Up @@ -185,6 +186,16 @@ type partitionConsumer struct {

redirectedClusterURI string
backoffPolicyFunc func() backoff.Policy

dispatcherSeekingControlCh chan bool
isSeeking atomic.Bool
}

// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
// This method will be called When the parent consumer performs the seek operation.
// After the seek operation, the dispatcher will continue dispatching messages automatically.
func (pc *partitionConsumer) pauseDispatchMessage() {
pc.dispatcherSeekingControlCh <- true
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -329,27 +340,28 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}

pc := &partitionConsumer{
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan *message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan *connectionClosed, 1),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *trackingMessageID)),
compressionProviders: sync.Map{},
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan *message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan *connectionClosed, 1),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *trackingMessageID)),
compressionProviders: sync.Map{},
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
dispatcherSeekingControlCh: make(chan bool),
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
Expand Down Expand Up @@ -1440,17 +1452,18 @@ func (pc *partitionConsumer) dispatcher() {
}
nextMessageSize = queueMsg.size()

if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
if !pc.isSeeking.Load() {
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}
pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
}

pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
} else {
queueCh = pc.queueCh
}
Expand Down Expand Up @@ -1483,6 +1496,12 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}

case val, ok := <-pc.dispatcherSeekingControlCh:
if !ok {
return
}
pc.isSeeking.Store(val)

case msg, ok := <-queueCh:
if !ok {
return
Expand Down Expand Up @@ -1674,6 +1693,8 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
pc.isSeeking.Store(false)

var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
Expand Down

0 comments on commit 6920617

Please sign in to comment.