Skip to content

Commit

Permalink
fix: pause dispatch message before performing seek
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Oct 8, 2024
1 parent 5f4ac52 commit e63efdf
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,22 +711,29 @@ func (c *consumer) Seek(msgID MessageID) error {
return err
}

if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
return err
}

consumer := c.consumers[msgID.PartitionIdx()]
consumer.pauseDispatchMessage()
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

return nil
return consumer.Seek(msgID)
}

func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error

for _, cons := range c.consumers {
cons.pauseDispatchMessage()
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
Expand All @@ -735,11 +742,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
}
}

// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

return errs
}

Expand Down
12 changes: 12 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"
Expand Down Expand Up @@ -185,6 +186,15 @@ type partitionConsumer struct {

redirectedClusterURI string
backoffPolicyFunc func() backoff.Policy

discardMessage atomic.Bool
}

// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
// This method will be called When the parent consumer performs the seek operation.
// After the seek operation, the dispatcher will continue dispatching messages automatically.
func (pc *partitionConsumer) pauseDispatchMessage() {
pc.discardMessage.Store(true)
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -879,6 +889,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {

// wait for the request to complete
<-req.doneCh
pc.discardMessage.Store(false)
return req.err
}

Expand Down Expand Up @@ -937,6 +948,7 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {

// wait for the request to complete
<-req.doneCh
pc.discardMessage.Store(false)
return req.err
}

Expand Down

0 comments on commit e63efdf

Please sign in to comment.