From c50d1198959cae8c04522a558bbd9cba35ad856a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 13 Mar 2024 18:51:13 +0800 Subject: [PATCH 01/15] fix #1187 --- pulsar/{internal => backoff}/backoff.go | 16 ++++++--- pulsar/{internal => backoff}/backoff_test.go | 8 ++--- pulsar/consumer.go | 4 +-- pulsar/consumer_partition.go | 36 +++++++++++--------- pulsar/dlq_router.go | 7 ++-- pulsar/internal/http_client.go | 6 ++-- pulsar/internal/rpc_client.go | 6 ++-- pulsar/producer.go | 4 +-- pulsar/producer_partition.go | 27 +++++++++------ pulsar/reader.go | 4 +-- pulsar/reader_test.go | 3 ++ pulsar/retry_router.go | 7 ++-- 12 files changed, 77 insertions(+), 51 deletions(-) rename pulsar/{internal => backoff}/backoff.go (77%) rename pulsar/{internal => backoff}/backoff_test.go (91%) diff --git a/pulsar/internal/backoff.go b/pulsar/backoff/backoff.go similarity index 77% rename from pulsar/internal/backoff.go rename to pulsar/backoff/backoff.go index 3284fb7e33..6c1ff87d0c 100644 --- a/pulsar/internal/backoff.go +++ b/pulsar/backoff/backoff.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "math/rand" @@ -26,10 +26,14 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// BackoffPolicy parameterize the following options in the reconnection logic to +// Policy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) -type BackoffPolicy interface { +type Policy interface { + // Next returns the delay to wait before next retry Next() time.Duration + + // IsMaxBackoffReached evaluates if the max number of retries is reached + IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool } // DefaultBackoff computes the delay before retrying an action. @@ -38,6 +42,10 @@ type DefaultBackoff struct { backoff time.Duration } +func NewDefaultBackoff(backoff time.Duration) Policy { + return &DefaultBackoff{backoff: backoff} +} + const maxBackoff = 60 * time.Second // Next returns the delay to wait before next retry @@ -58,6 +66,6 @@ func (b *DefaultBackoff) Next() time.Duration { } // IsMaxBackoffReached evaluates if the max number of retries is reached -func (b *DefaultBackoff) IsMaxBackoffReached() bool { +func (b *DefaultBackoff) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool { return b.backoff >= maxBackoff } diff --git a/pulsar/internal/backoff_test.go b/pulsar/backoff/backoff_test.go similarity index 91% rename from pulsar/internal/backoff_test.go rename to pulsar/backoff/backoff_test.go index e05ea29276..e9ca9c5b2d 100644 --- a/pulsar/internal/backoff_test.go +++ b/pulsar/backoff/backoff_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "testing" @@ -35,14 +35,14 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) { backoff := &DefaultBackoff{} previousDelay := backoff.Next() // the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2) - for previousDelay < 51*time.Second { + for previousDelay < 40*time.Second { delay := backoff.Next() // the jitter introduces at most 20% difference so at least delay is 1.6=(1-0.2)*2 bigger assert.GreaterOrEqual(t, int64(delay), int64(1.6*float64(previousDelay))) // the jitter introduces at most 20% difference so delay is less than twice the previous value assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay))) previousDelay = delay - assert.Equal(t, false, backoff.IsMaxBackoffReached()) + assert.Equal(t, false, backoff.IsMaxBackoffReached(delay, time.Second)) } } @@ -55,7 +55,7 @@ func TestBackoff_NextMaxValue(t *testing.T) { cappedDelay := backoff.Next() assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff)) - assert.Equal(t, true, backoff.IsMaxBackoffReached()) + assert.Equal(t, true, backoff.IsMaxBackoffReached(cappedDelay, time.Second)) // max value is 60 seconds + 20% jitter = 72 seconds assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second)) } diff --git a/pulsar/consumer.go b/pulsar/consumer.go index fea94cf6a3..88bd473f42 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) // ConsumerMessage represents a pair of a Consumer and Message. @@ -198,7 +198,7 @@ type ConsumerOptions struct { // BackoffPolicy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackoffPolicy backoff.Policy // Decryption represents the encryption related fields required by the consumer to decrypt a message. Decryption *MessageDecryptionInfo diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 162565b2a9..707de2c14e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "google.golang.org/protobuf/proto" "github.com/apache/pulsar-client-go/pulsar/crypto" @@ -106,7 +108,7 @@ type partitionConsumerOpts struct { disableForceTopicCreation bool interceptors ConsumerInterceptors maxReconnectToBroker *uint - backoffPolicy internal.BackoffPolicy + backoffPolicy backoff.Policy keySharedPolicy *KeySharedPolicy schema Schema decryption *MessageDecryptionInfo @@ -574,11 +576,11 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } remainTime := pc.client.operationTimeout - var backoff internal.BackoffPolicy + var bo backoff.Policy if pc.options.backoffPolicy != nil { - backoff = pc.options.backoffPolicy + bo = pc.options.backoffPolicy } else { - backoff = &internal.DefaultBackoff{} + bo = &backoff.DefaultBackoff{} } request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} @@ -597,7 +599,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { pc.log.WithError(err).Error("Failed to getLastMessageID") return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) } - nextDelay := backoff.Next() + nextDelay := bo.Next() if nextDelay > remainTime { nextDelay = remainTime } @@ -1653,7 +1655,10 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { } func (pc *partitionConsumer) reconnectToBroker() { - var maxRetry int + var ( + maxRetry int + delayReconnectTime, totalDelayReconnectTime time.Duration + ) if pc.options.maxReconnectToBroker == nil { maxRetry = -1 @@ -1661,10 +1666,12 @@ func (pc *partitionConsumer) reconnectToBroker() { maxRetry = int(*pc.options.maxReconnectToBroker) } - var ( - delayReconnectTime time.Duration - defaultBackoff = internal.DefaultBackoff{} - ) + var bo backoff.Policy + if pc.options.backoffPolicy != nil { + bo = pc.options.backoffPolicy + } else { + bo = &backoff.DefaultBackoff{} + } for maxRetry != 0 { if pc.getConsumerState() != consumerReady { @@ -1673,11 +1680,8 @@ func (pc *partitionConsumer) reconnectToBroker() { return } - if pc.options.backoffPolicy == nil { - delayReconnectTime = defaultBackoff.Next() - } else { - delayReconnectTime = pc.options.backoffPolicy.Next() - } + delayReconnectTime = bo.Next() + totalDelayReconnectTime += delayReconnectTime pc.log.Info("Reconnecting to broker in ", delayReconnectTime) time.Sleep(delayReconnectTime) @@ -1707,7 +1711,7 @@ func (pc *partitionConsumer) reconnectToBroker() { maxRetry-- } pc.metrics.ConsumersReconnectFailure.Inc() - if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() { + if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) { pc.metrics.ConsumersReconnectMaxRetry.Inc() } } diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 6be35d7485..8a740bf511 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -22,7 +22,8 @@ import ( "fmt" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -155,7 +156,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { } // Retry to create producer indefinitely - backoff := &internal.DefaultBackoff{} + bo := &backoff.DefaultBackoff{} for { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic @@ -173,7 +174,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { if err != nil { r.log.WithError(err).Error("Failed to create DLQ producer") - time.Sleep(backoff.Next()) + time.Sleep(bo.Next()) continue } else { r.producer = producer diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index e68bd17c39..10e46a9761 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -29,6 +29,8 @@ import ( "path" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/log" @@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str if _, ok := err.(*url.Error); ok { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. - backoff := DefaultBackoff{100 * time.Millisecond} + bo := backoff.NewDefaultBackoff(100 * time.Millisecond) startTime := time.Now() var retryTime time.Duration for time.Since(startTime) < c.requestTimeout { - retryTime = backoff.Next() + retryTime = bo.Next() c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) _, err = c.GetWithQueryParams(endpoint, obj, params, true) diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 2213083d85..02f211c732 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/log" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" @@ -91,7 +93,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ var host *url.URL var rpcResult *RPCResult startTime := time.Now() - backoff := DefaultBackoff{100 * time.Millisecond} + bo := backoff.NewDefaultBackoff(100 * time.Millisecond) // we can retry these requests because this kind of request is // not specific to any particular broker for time.Since(startTime) < c.requestTimeout { @@ -106,7 +108,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ break } - retryTime := backoff.Next() + retryTime := bo.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) } diff --git a/pulsar/producer.go b/pulsar/producer.go index f8013a16ff..79b5159106 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) type HashingScheme int @@ -173,7 +173,7 @@ type ProducerOptions struct { // BackoffPolicy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackoffPolicy backoff.Policy // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder) // This will be used to create batch container when batching is enabled. diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index fbcc5b9776..1439a77c0b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" @@ -410,17 +412,22 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer } func (p *partitionProducer) reconnectToBroker() { - var maxRetry int + var ( + maxRetry int + delayReconnectTime, totalDelayReconnectTime time.Duration + ) if p.options.MaxReconnectToBroker == nil { maxRetry = -1 } else { maxRetry = int(*p.options.MaxReconnectToBroker) } - var ( - delayReconnectTime time.Duration - defaultBackoff = internal.DefaultBackoff{} - ) + var bo backoff.Policy + if p.options.BackoffPolicy == nil { + bo = p.options.BackoffPolicy + } else { + bo = &backoff.DefaultBackoff{} + } for maxRetry != 0 { if p.getProducerState() != producerReady { @@ -429,11 +436,9 @@ func (p *partitionProducer) reconnectToBroker() { return } - if p.options.BackoffPolicy == nil { - delayReconnectTime = defaultBackoff.Next() - } else { - delayReconnectTime = p.options.BackoffPolicy.Next() - } + delayReconnectTime = bo.Next() + totalDelayReconnectTime += delayReconnectTime + p.log.Info("Reconnecting to broker in ", delayReconnectTime) time.Sleep(delayReconnectTime) @@ -482,7 +487,7 @@ func (p *partitionProducer) reconnectToBroker() { maxRetry-- } p.metrics.ProducersReconnectFailure.Inc() - if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() { + if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) { p.metrics.ProducersReconnectMaxRetry.Inc() } } diff --git a/pulsar/reader.go b/pulsar/reader.go index 4daa889062..7c0749b2be 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) // ReaderMessage packages Reader and Message as a struct to use. @@ -91,7 +91,7 @@ type ReaderOptions struct { // BackoffPolicy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackoffPolicy backoff.Policy // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100) MaxPendingChunkedMessage int diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 78c222dac7..65097689d8 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -899,6 +899,9 @@ func (b *testBackoffPolicy) Next() time.Duration { return b.curBackoff } +func (b *testBackoffPolicy) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool { + return delayReconnectTime >= b.maxBackoff +} func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool { // Approximately equal to expected interval diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 75792adc14..74c4255f6b 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -21,7 +21,8 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -124,7 +125,7 @@ func (r *retryRouter) getProducer() Producer { } // Retry to create producer indefinitely - backoff := &internal.DefaultBackoff{} + bo := &backoff.DefaultBackoff{} for { opt := r.policy.ProducerOptions opt.Topic = r.policy.RetryLetterTopic @@ -138,7 +139,7 @@ func (r *retryRouter) getProducer() Producer { if err != nil { r.log.WithError(err).Error("Failed to create RLQ producer") - time.Sleep(backoff.Next()) + time.Sleep(bo.Next()) continue } else { r.producer = producer From f659d3fa319db43791ae1d14b62d468d977416bf Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 13 Mar 2024 19:54:58 +0800 Subject: [PATCH 02/15] fix #1187 --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1439a77c0b..252d4417c2 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -423,7 +423,7 @@ func (p *partitionProducer) reconnectToBroker() { } var bo backoff.Policy - if p.options.BackoffPolicy == nil { + if p.options.BackoffPolicy != nil { bo = p.options.BackoffPolicy } else { bo = &backoff.DefaultBackoff{} From 54a69e2c6d59db50a15298c00ae2494c4b3f1302 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 13 Mar 2024 20:08:32 +0800 Subject: [PATCH 03/15] :rewind: Reverting seconds --- pulsar/backoff/backoff_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/backoff/backoff_test.go b/pulsar/backoff/backoff_test.go index e9ca9c5b2d..050cf1c876 100644 --- a/pulsar/backoff/backoff_test.go +++ b/pulsar/backoff/backoff_test.go @@ -35,7 +35,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) { backoff := &DefaultBackoff{} previousDelay := backoff.Next() // the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2) - for previousDelay < 40*time.Second { + for previousDelay < 51*time.Second { delay := backoff.Next() // the jitter introduces at most 20% difference so at least delay is 1.6=(1-0.2)*2 bigger assert.GreaterOrEqual(t, int64(delay), int64(1.6*float64(previousDelay))) From ebbc5d06c524d192c6408a36b084d140322ab4ac Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 9 Jul 2024 21:18:17 +0800 Subject: [PATCH 04/15] Update pulsar/consumer_partition.go Co-authored-by: Zixuan Liu --- pulsar/consumer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 4069cfd6fd..1cfdbc9e76 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -587,7 +587,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if pc.options.backoffPolicy != nil { bo = pc.options.backoffPolicy } else { - bo = &backoff.DefaultBackoff{} + bo = NewDefaultBackoff() } request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} From f05cd9508701ca7deb8d17b0fa982e368eb65d6b Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 9 Jul 2024 21:18:57 +0800 Subject: [PATCH 05/15] Update pulsar/consumer_partition.go Co-authored-by: Zixuan Liu --- pulsar/consumer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1cfdbc9e76..fca8a959d5 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1697,7 +1697,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if pc.options.backoffPolicy != nil { bo = pc.options.backoffPolicy } else { - bo = &backoff.DefaultBackoff{} + bo = NewDefaultBackoff() } for maxRetry != 0 { From 3cb3b5a82316e69739d0f155cb69e015d899d8dd Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 9 Jul 2024 21:19:33 +0800 Subject: [PATCH 06/15] Update pulsar/producer_partition.go Co-authored-by: Zixuan Liu --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 067d801842..a962d57bd1 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -464,7 +464,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed if p.options.BackoffPolicy != nil { bo = p.options.BackoffPolicy } else { - bo = &backoff.DefaultBackoff{} + bo = NewDefaultBackoff() } for maxRetry != 0 { From d8b6980cd79eed49a2cfea7470354987cc61a6ae Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 10 Jul 2024 16:00:27 +0800 Subject: [PATCH 07/15] fix with cr --- pulsar/backoff/backoff.go | 16 +++++++++++--- pulsar/backoff/backoff_test.go | 6 +++-- pulsar/blue_green_migration_test.go | 15 ++++++------- pulsar/consumer.go | 4 ++-- pulsar/consumer_impl.go | 4 ++-- pulsar/consumer_partition.go | 29 ++++++++++++------------ pulsar/consumer_regex_test.go | 4 ++-- pulsar/consumer_test.go | 16 +++++++++----- pulsar/internal/http_client.go | 2 +- pulsar/internal/rpc_client.go | 5 +++-- pulsar/producer.go | 4 ++-- pulsar/producer_partition.go | 34 +++++++++++++++++------------ pulsar/producer_test.go | 20 ++++++++++------- pulsar/reader.go | 4 ++-- pulsar/reader_impl.go | 4 ++-- pulsar/reader_test.go | 25 ++++++++++++++------- pulsar/retry_router.go | 31 ++++++++++++++++---------- 17 files changed, 133 insertions(+), 90 deletions(-) diff --git a/pulsar/backoff/backoff.go b/pulsar/backoff/backoff.go index 6c1ff87d0c..b2cb2b10eb 100644 --- a/pulsar/backoff/backoff.go +++ b/pulsar/backoff/backoff.go @@ -33,7 +33,10 @@ type Policy interface { Next() time.Duration // IsMaxBackoffReached evaluates if the max number of retries is reached - IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool + IsMaxBackoffReached() bool + + // Reset the backoff to the initial state + Reset() } // DefaultBackoff computes the delay before retrying an action. @@ -42,7 +45,10 @@ type DefaultBackoff struct { backoff time.Duration } -func NewDefaultBackoff(backoff time.Duration) Policy { +func NewDefaultBackoff() Policy { + return &DefaultBackoff{} +} +func NewDefaultBackoffWithBackOff(backoff time.Duration) Policy { return &DefaultBackoff{backoff: backoff} } @@ -66,6 +72,10 @@ func (b *DefaultBackoff) Next() time.Duration { } // IsMaxBackoffReached evaluates if the max number of retries is reached -func (b *DefaultBackoff) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool { +func (b *DefaultBackoff) IsMaxBackoffReached() bool { return b.backoff >= maxBackoff } + +func (b *DefaultBackoff) Reset() { + b.backoff = 0 +} diff --git a/pulsar/backoff/backoff_test.go b/pulsar/backoff/backoff_test.go index 050cf1c876..fc0a49232b 100644 --- a/pulsar/backoff/backoff_test.go +++ b/pulsar/backoff/backoff_test.go @@ -42,7 +42,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) { // the jitter introduces at most 20% difference so delay is less than twice the previous value assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay))) previousDelay = delay - assert.Equal(t, false, backoff.IsMaxBackoffReached(delay, time.Second)) + assert.Equal(t, false, backoff.IsMaxBackoffReached()) } } @@ -55,7 +55,9 @@ func TestBackoff_NextMaxValue(t *testing.T) { cappedDelay := backoff.Next() assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff)) - assert.Equal(t, true, backoff.IsMaxBackoffReached(cappedDelay, time.Second)) + assert.Equal(t, true, backoff.IsMaxBackoffReached()) // max value is 60 seconds + 20% jitter = 72 seconds assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second)) + backoff.Reset() + assert.Equal(t, false, backoff.IsMaxBackoffReached()) } diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 91667e8d80..672ef343f8 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite) TestTopicMigration() { for _, scenario := range []topicUnloadTestCase{ { - testCaseName: "proxyConnection", - blueAdminURL: "http://localhost:8080", + testCaseName: "proxyConnection", + blueAdminURL: "http://localhost:8080", blueClientUrl: "pulsar://localhost:6650", greenAdminURL: "http://localhost:8081", migrationBody: ` @@ -83,17 +83,17 @@ func testTopicMigrate( migrationBody string) { runtime.GOMAXPROCS(1) const ( - cluster = "cluster-a" + cluster = "cluster-a" tenant = utils.PUBLICTENANT namespace = utils.DEFAULTNAMESPACE - blueBroker1URL = "pulsar://broker-1:6650" - blueBroker2URL = "pulsar://broker-2:6650" + blueBroker1URL = "pulsar://broker-1:6650" + blueBroker2URL = "pulsar://broker-2:6650" greenBroker1URL = "pulsar://green-broker-1:6650" greenBroker2URL = "pulsar://green-broker-2:6650" - blueBroker1LookupURL = "broker-1:8080" - blueBroker2LookupURL = "broker-2:8080" + blueBroker1LookupURL = "broker-1:8080" + blueBroker2LookupURL = "broker-2:8080" greenBroker1LookupURL = "green-broker-1:8080" greenBroker2LookupURL = "green-broker-2:8080" ) @@ -234,7 +234,6 @@ func testTopicMigrate( req.NoError(err) req.NotEmpty(bundleRange) - unloadURL := fmt.Sprintf( "/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s", tenant, namespace, bundleRange, dstTopicBrokerLookupURL) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 3719ed5b8c..e792978fe8 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -201,9 +201,9 @@ type ConsumerOptions struct { // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy backoff.Policy + BackOffPolicyFunc func() backoff.Policy // Decryption represents the encryption related fields required by the consumer to decrypt a message. Decryption *MessageDecryptionInfo diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 679054a044..3c0480e99f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -178,7 +178,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log) + rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log) if err != nil { return nil, err } @@ -453,7 +453,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu readCompacted: options.ReadCompacted, interceptors: options.Interceptors, maxReconnectToBroker: options.MaxReconnectToBroker, - backoffPolicy: options.BackoffPolicy, + backOffPolicyFunc: options.BackOffPolicyFunc, keySharedPolicy: options.KeySharedPolicy, schema: options.Schema, decryption: options.Decryption, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fca8a959d5..b88d6b7811 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -112,7 +112,7 @@ type partitionConsumerOpts struct { disableForceTopicCreation bool interceptors ConsumerInterceptors maxReconnectToBroker *uint - backoffPolicy backoff.Policy + backOffPolicyFunc func() backoff.Policy keySharedPolicy *KeySharedPolicy schema Schema decryption *MessageDecryptionInfo @@ -184,6 +184,7 @@ type partitionConsumer struct { lastMessageInBroker *trackingMessageID redirectedClusterURI string + backoffPolicyFunc func() backoff.Policy } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -320,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) { func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts, messageCh chan ConsumerMessage, dlq *dlqRouter, metrics *internal.LeveledMetrics) (*partitionConsumer, error) { + var boFunc func() backoff.Policy + if options.backOffPolicyFunc != nil { + boFunc = options.backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } + pc := &partitionConsumer{ parentConsumer: parent, client: client, @@ -341,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon dlq: dlq, metrics: metrics, schemaInfoCache: newSchemaInfoCache(client, options.topic), + backoffPolicyFunc: boFunc, } if pc.options.autoReceiverQueueSize { pc.currentQueueSize.Store(initialReceiverQueueSize) @@ -583,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } remainTime := pc.client.operationTimeout - var bo backoff.Policy - if pc.options.backoffPolicy != nil { - bo = pc.options.backoffPolicy - } else { - bo = NewDefaultBackoff() - } + bo := pc.backoffPolicyFunc() request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -1692,13 +1696,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose } else { maxRetry = int(*pc.options.maxReconnectToBroker) } - - var bo backoff.Policy - if pc.options.backoffPolicy != nil { - bo = pc.options.backoffPolicy - } else { - bo = NewDefaultBackoff() - } + bo := pc.backoffPolicyFunc() for maxRetry != 0 { if pc.getConsumerState() != consumerReady { @@ -1735,6 +1733,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if err == nil { // Successfully reconnected pc.log.Info("Reconnected consumer to broker") + bo.Reset() return } pc.log.WithError(err).Error("Failed to create consumer at reconnect") @@ -1749,7 +1748,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose maxRetry-- } pc.metrics.ConsumersReconnectFailure.Inc() - if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) { + if maxRetry == 0 || bo.IsMaxBackoffReached() { pc.metrics.ConsumersReconnectMaxRetry.Inc() } } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 28ba4f72b0..41aa0f8845 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -160,7 +160,7 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string } dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) - rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) + rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) @@ -199,7 +199,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string } dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) - rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) + rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 2ecdf8eb31..3cbbf7ab25 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" @@ -3789,12 +3791,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) { topicName := newTopicName() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", Type: Shared, - BackoffPolicy: backoff, + BackOffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.Nil(t, err) defer _consumer.Close() @@ -3803,22 +3807,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestAckWithMessageID(t *testing.T) { diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index 10e46a9761..cdc383a006 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -150,7 +150,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str if _, ok := err.(*url.Error); ok { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. - bo := backoff.NewDefaultBackoff(100 * time.Millisecond) + bo := backoff.NewDefaultBackoffWithBackOff(100 * time.Millisecond) startTime := time.Now() var retryTime time.Duration diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 28e795181b..5ff36674a4 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -25,8 +25,9 @@ import ( "sync/atomic" "time" - "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/backoff" + + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/log" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" @@ -116,7 +117,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver, var host *url.URL var rpcResult *RPCResult startTime := time.Now() - bo := backoff.NewDefaultBackoff(100 * time.Millisecond) + bo := backoff.NewDefaultBackoffWithBackOff(100 * time.Millisecond) // we can retry these requests because this kind of request is // not specific to any particular broker for time.Since(startTime) < c.requestTimeout { diff --git a/pulsar/producer.go b/pulsar/producer.go index 79b5159106..2ca7254426 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -171,9 +171,9 @@ type ProducerOptions struct { // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy backoff.Policy + BackOffPolicyFunc func() backoff.Policy // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder) // This will be used to create batch container when batching is enabled. diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a962d57bd1..2537121dd7 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -120,6 +120,7 @@ type partitionProducer struct { schemaCache *schemaCache topicEpoch *uint64 redirectedClusterURI string + backOffPolicyFunc func() backoff.Policy } type schemaCache struct { @@ -146,6 +147,14 @@ func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, metrics *internal.LeveledMetrics) ( *partitionProducer, error) { + + var boFunc func() backoff.Policy + if options.BackOffPolicyFunc != nil { + boFunc = options.BackOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } + var batchingMaxPublishDelay time.Duration if options.BatchingMaxPublishDelay != 0 { batchingMaxPublishDelay = options.BatchingMaxPublishDelay @@ -174,13 +183,14 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), - publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), - pendingQueue: internal.NewBlockingQueue(maxPendingMessages), - lastSequenceID: -1, - partitionIdx: int32(partitionIdx), - metrics: metrics, - epoch: 0, - schemaCache: newSchemaCache(), + publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), + pendingQueue: internal.NewBlockingQueue(maxPendingMessages), + lastSequenceID: -1, + partitionIdx: int32(partitionIdx), + metrics: metrics, + epoch: 0, + schemaCache: newSchemaCache(), + backOffPolicyFunc: boFunc, } if p.options.DisableBatching { p.batchFlushTicker.Stop() @@ -460,12 +470,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed maxRetry = int(*p.options.MaxReconnectToBroker) } - var bo backoff.Policy - if p.options.BackoffPolicy != nil { - bo = p.options.BackoffPolicy - } else { - bo = NewDefaultBackoff() - } + bo := p.backOffPolicyFunc() for maxRetry != 0 { if p.getProducerState() != producerReady { @@ -503,6 +508,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed if err == nil { // Successfully reconnected p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") + bo.Reset() return } p.log.WithError(err).Error("Failed to create producer at reconnect") @@ -536,7 +542,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed maxRetry-- } p.metrics.ProducersReconnectFailure.Inc() - if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) { + if maxRetry == 0 || bo.IsMaxBackoffReached() { p.metrics.ProducersReconnectMaxRetry.Inc() } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c4ff89752..fd650a8025 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" @@ -1273,11 +1275,13 @@ func TestProducerWithBackoffPolicy(t *testing.T) { topicName := newTopicName() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, - BackoffPolicy: backoff, + Topic: topicName, + SendTimeout: 2 * time.Second, + BackOffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.Nil(t, err) defer _producer.Close() @@ -1286,22 +1290,22 @@ func TestProducerWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestSendContextExpired(t *testing.T) { diff --git a/pulsar/reader.go b/pulsar/reader.go index 7c0749b2be..98bde4e395 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -89,9 +89,9 @@ type ReaderOptions struct { // Schema represents the schema implementation. Schema Schema - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackoffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy backoff.Policy + BackoffPolicyFunc func() backoff.Policy // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100) MaxPendingChunkedMessage int diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 7cbae05c6d..cb677fc4f4 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -112,7 +112,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { ReplicateSubscriptionState: false, Decryption: options.Decryption, Schema: options.Schema, - BackoffPolicy: options.BackoffPolicy, + BackOffPolicyFunc: options.BackoffPolicyFunc, MaxPendingChunkedMessage: options.MaxPendingChunkedMessage, ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, AutoAckIncompleteChunk: options.AutoAckIncompleteChunk, @@ -133,7 +133,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } // Provide dummy rlq router with not dlq policy - rlq, err := newRetryRouter(client, nil, false, client.log) + rlq, err := newRetryRouter(client, nil, false, options.BackoffPolicyFunc, client.log) if err != nil { return nil, err } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index e4032d2961..266d354321 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" @@ -847,8 +849,13 @@ func (b *testBackoffPolicy) Next() time.Duration { return b.curBackoff } -func (b *testBackoffPolicy) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool { - return delayReconnectTime >= b.maxBackoff +func (b *testBackoffPolicy) IsMaxBackoffReached() bool { + return false +} + +func (b *testBackoffPolicy) Reset() { + b.curBackoff, b.minBackoff, b.minBackoff = 0, 0, 0 + b.retryTime = 0 } func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool { @@ -869,11 +876,13 @@ func TestReaderWithBackoffPolicy(t *testing.T) { assert.Nil(t, err) defer client.Close() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _reader, err := client.CreateReader(ReaderOptions{ Topic: "my-topic", StartMessageID: LatestMessageID(), - BackoffPolicy: backoff, + BackoffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.NotNil(t, _reader) assert.Nil(t, err) @@ -882,22 +891,22 @@ func TestReaderWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestReaderGetLastMessageID(t *testing.T) { diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 74c4255f6b..c8aa0b9464 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -45,19 +45,28 @@ type RetryMessage struct { } type retryRouter struct { - client Client - producer Producer - policy *DLQPolicy - messageCh chan RetryMessage - closeCh chan interface{} - log log.Logger + client Client + producer Producer + policy *DLQPolicy + messageCh chan RetryMessage + closeCh chan interface{} + backOffPolicyFunc func() backoff.Policy + log log.Logger } -func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, logger log.Logger) (*retryRouter, error) { +func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, backOffPolicyFunc func() backoff.Policy, + logger log.Logger) (*retryRouter, error) { + var boFunc func() backoff.Policy + if backOffPolicyFunc != nil { + boFunc = backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } r := &retryRouter{ - client: client, - policy: policy, - log: logger, + client: client, + policy: policy, + backOffPolicyFunc: boFunc, + log: logger, } if policy != nil && retryEnabled { @@ -125,7 +134,7 @@ func (r *retryRouter) getProducer() Producer { } // Retry to create producer indefinitely - bo := &backoff.DefaultBackoff{} + bo := r.backOffPolicyFunc() for { opt := r.policy.ProducerOptions opt.Topic = r.policy.RetryLetterTopic From 1165b2b5e6cfc121474d48b653f6618e0fe0f2fb Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 10 Jul 2024 16:16:49 +0800 Subject: [PATCH 08/15] fix ci --- pulsar/reader_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 266d354321..3c928c1db7 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -854,8 +854,7 @@ func (b *testBackoffPolicy) IsMaxBackoffReached() bool { } func (b *testBackoffPolicy) Reset() { - b.curBackoff, b.minBackoff, b.minBackoff = 0, 0, 0 - b.retryTime = 0 + } func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool { From bc1fa9cb18cb2d11bbe707a9da637e3dbf49e2ab Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 11 Jul 2024 10:58:04 +0800 Subject: [PATCH 09/15] Update pulsar/producer_partition.go Co-authored-by: Zixuan Liu --- pulsar/producer_partition.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 2537121dd7..34205c64d3 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -488,7 +488,6 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed } else { delayReconnectTime = bo.Next() } - totalDelayReconnectTime += delayReconnectTime p.log.WithFields(log.Fields{ "assignedBrokerURL": assignedBrokerURL, From beda15b41080c5d7a1205cd17fb9f14fb51216cf Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 11 Jul 2024 10:59:14 +0800 Subject: [PATCH 10/15] Update pulsar/producer_partition.go Co-authored-by: Zixuan Liu --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 34205c64d3..10d1121bf6 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -462,7 +462,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) { var ( maxRetry int - delayReconnectTime, totalDelayReconnectTime time.Duration + delayReconnectTime time.Duration ) if p.options.MaxReconnectToBroker == nil { maxRetry = -1 From 61c25fede2d144d5c4e80f9547b06c28f6f9b778 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 11 Jul 2024 11:10:14 +0800 Subject: [PATCH 11/15] fix with cr --- pulsar/consumer_impl.go | 3 ++- pulsar/consumer_regex_test.go | 6 +++-- pulsar/dlq_router.go | 42 +++++++++++++++++++++-------------- pulsar/producer_partition.go | 2 +- pulsar/reader_impl.go | 3 ++- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 3c0480e99f..a3d3e3ff80 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -174,7 +174,8 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, + options.BackOffPolicyFunc, client.log) if err != nil { return nil, err } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 41aa0f8845..e1e2ca29e0 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -159,7 +159,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", + nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { @@ -198,7 +199,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", + nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 8a740bf511..9ff09eca36 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -28,26 +28,34 @@ import ( ) type dlqRouter struct { - client Client - producer Producer - policy *DLQPolicy - messageCh chan ConsumerMessage - closeCh chan interface{} - topicName string - subscriptionName string - consumerName string - log log.Logger + client Client + producer Producer + policy *DLQPolicy + messageCh chan ConsumerMessage + closeCh chan interface{} + topicName string + subscriptionName string + consumerName string + backOffPolicyFunc func() backoff.Policy + log log.Logger } func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string, - logger log.Logger) (*dlqRouter, error) { + backOffPolicyFunc func() backoff.Policy, logger log.Logger) (*dlqRouter, error) { + var boFunc func() backoff.Policy + if backOffPolicyFunc != nil { + boFunc = backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } r := &dlqRouter{ - client: client, - policy: policy, - topicName: topicName, - subscriptionName: subscriptionName, - consumerName: consumerName, - log: logger, + client: client, + policy: policy, + topicName: topicName, + subscriptionName: subscriptionName, + consumerName: consumerName, + backOffPolicyFunc: boFunc, + log: logger, } if policy != nil { @@ -156,7 +164,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { } // Retry to create producer indefinitely - bo := &backoff.DefaultBackoff{} + bo := r.backOffPolicyFunc() for { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 10d1121bf6..8ef494cb22 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -461,7 +461,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) { var ( - maxRetry int + maxRetry int delayReconnectTime time.Duration ) if p.options.MaxReconnectToBroker == nil { diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index cb677fc4f4..f76255e2e8 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -128,7 +128,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, + options.BackoffPolicyFunc, client.log) if err != nil { return nil, err } From f5c5cc0a7ae44397ff61102b2cb00432c7bfa535 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 18 Sep 2024 20:54:11 +0800 Subject: [PATCH 12/15] merge master --- pulsar/producer_partition.go | 18 +++++++++--------- pulsar/transaction_coordinator_client.go | 4 +++- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 4136bcc98e..f578ee4b96 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -187,15 +187,15 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), - publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), - pendingQueue: internal.NewBlockingQueue(maxPendingMessages), - lastSequenceID: -1, - partitionIdx: int32(partitionIdx), - metrics: metrics, - epoch: 0, - schemaCache: newSchemaCache(), - ctx: ctx, - cancelFunc: cancelFunc, + publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), + pendingQueue: internal.NewBlockingQueue(maxPendingMessages), + lastSequenceID: -1, + partitionIdx: int32(partitionIdx), + metrics: metrics, + epoch: 0, + schemaCache: newSchemaCache(), + ctx: ctx, + cancelFunc: cancelFunc, backOffPolicyFunc: boFunc, } if p.options.DisableBatching { diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index 1449d698e5..afde54278b 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -143,7 +145,7 @@ func (t *transactionHandler) runEventsLoop() { func (t *transactionHandler) reconnectToBroker() { var delayReconnectTime time.Duration - var defaultBackoff = internal.DefaultBackoff{} + var defaultBackoff = backoff.DefaultBackoff{} for { if t.getState() == txnHandlerClosed { From 8d4e29eec1edd3542152a240b7cacac4c39859b3 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 18 Sep 2024 21:17:35 +0800 Subject: [PATCH 13/15] Update pulsar/backoff/backoff.go Co-authored-by: Zike Yang --- pulsar/backoff/backoff.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/backoff/backoff.go b/pulsar/backoff/backoff.go index b2cb2b10eb..29ad5b7bc7 100644 --- a/pulsar/backoff/backoff.go +++ b/pulsar/backoff/backoff.go @@ -48,7 +48,7 @@ type DefaultBackoff struct { func NewDefaultBackoff() Policy { return &DefaultBackoff{} } -func NewDefaultBackoffWithBackOff(backoff time.Duration) Policy { +func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy { return &DefaultBackoff{backoff: backoff} } From e8bf4b207e362b4da7abeb780d80cf0864ef0897 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 18 Sep 2024 21:20:13 +0800 Subject: [PATCH 14/15] Update pulsar/backoff/backoff.go Co-authored-by: Zike Yang --- pulsar/backoff/backoff.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/backoff/backoff.go b/pulsar/backoff/backoff.go index 29ad5b7bc7..453da57865 100644 --- a/pulsar/backoff/backoff.go +++ b/pulsar/backoff/backoff.go @@ -49,7 +49,7 @@ func NewDefaultBackoff() Policy { return &DefaultBackoff{} } func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy { - return &DefaultBackoff{backoff: backoff} + return &DefaultBackoff{backoff: backoff / 2} } const maxBackoff = 60 * time.Second From 34faba741a9c213aa9a8da80ef4963785f4dbe9d Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 18 Sep 2024 21:21:42 +0800 Subject: [PATCH 15/15] merge master --- pulsar/internal/http_client.go | 2 +- pulsar/internal/rpc_client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index cdc383a006..eea0101a1a 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -150,7 +150,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str if _, ok := err.(*url.Error); ok { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. - bo := backoff.NewDefaultBackoffWithBackOff(100 * time.Millisecond) + bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) startTime := time.Now() var retryTime time.Duration diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 5c54a13244..0f99311655 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -117,7 +117,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver, var host *url.URL var rpcResult *RPCResult startTime := time.Now() - bo := backoff.NewDefaultBackoffWithBackOff(100 * time.Millisecond) + bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) // we can retry these requests because this kind of request is // not specific to any particular broker for time.Since(startTime) < c.requestTimeout {