diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index a3d3e3ff8..4b59684f4 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -711,22 +711,29 @@ func (c *consumer) Seek(msgID MessageID) error { return err } - if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil { - return err - } - + consumer := c.consumers[msgID.PartitionIdx()] + consumer.pauseDispatchMessage() // clear messageCh for len(c.messageCh) > 0 { <-c.messageCh } - return nil + return consumer.Seek(msgID) } func (c *consumer) SeekByTime(time time.Time) error { c.Lock() defer c.Unlock() var errs error + + for _, cons := range c.consumers { + cons.pauseDispatchMessage() + } + // clear messageCh + for len(c.messageCh) > 0 { + <-c.messageCh + } + // run SeekByTime on every partition of topic for _, cons := range c.consumers { if err := cons.SeekByTime(time); err != nil { @@ -735,11 +742,6 @@ func (c *consumer) SeekByTime(time time.Time) error { } } - // clear messageCh - for len(c.messageCh) > 0 { - <-c.messageCh - } - return errs } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 678b7547f..e0208159a 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -24,6 +24,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "time" "github.com/apache/pulsar-client-go/pulsar/backoff" @@ -185,6 +186,15 @@ type partitionConsumer struct { redirectedClusterURI string backoffPolicyFunc func() backoff.Policy + + discardMessage atomic.Bool +} + +// pauseDispatchMessage used to discard the message in the dispatcher goroutine. +// This method will be called When the parent consumer performs the seek operation. +// After the seek operation, the dispatcher will continue dispatching messages automatically. +func (pc *partitionConsumer) pauseDispatchMessage() { + pc.discardMessage.Store(true) } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -879,6 +889,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error { // wait for the request to complete <-req.doneCh + pc.discardMessage.Store(false) return req.err } @@ -937,6 +948,7 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error { // wait for the request to complete <-req.doneCh + pc.discardMessage.Store(false) return req.err }