Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Reconnection logic and Backoff policy doesn't work correctly #1197

Merged
merged 18 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions pulsar/internal/backoff.go → pulsar/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff
nodece marked this conversation as resolved.
Show resolved Hide resolved

import (
"math/rand"
Expand All @@ -26,10 +26,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// BackoffPolicy parameterize the following options in the reconnection logic to
// Policy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
type BackoffPolicy interface {
type Policy interface {
// Next returns the delay to wait before next retry
Next() time.Duration

// IsMaxBackoffReached evaluates if the max number of retries is reached
IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool
nodece marked this conversation as resolved.
Show resolved Hide resolved
}

// DefaultBackoff computes the delay before retrying an action.
Expand All @@ -38,6 +42,10 @@ type DefaultBackoff struct {
backoff time.Duration
}

func NewDefaultBackoff(backoff time.Duration) Policy {
return &DefaultBackoff{backoff: backoff}
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
}

const maxBackoff = 60 * time.Second

// Next returns the delay to wait before next retry
Expand All @@ -58,6 +66,6 @@ func (b *DefaultBackoff) Next() time.Duration {
}

// IsMaxBackoffReached evaluates if the max number of retries is reached
func (b *DefaultBackoff) IsMaxBackoffReached() bool {
func (b *DefaultBackoff) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool {
return b.backoff >= maxBackoff
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"testing"
Expand All @@ -42,7 +42,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
// the jitter introduces at most 20% difference so delay is less than twice the previous value
assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay)))
previousDelay = delay
assert.Equal(t, false, backoff.IsMaxBackoffReached())
assert.Equal(t, false, backoff.IsMaxBackoffReached(delay, time.Second))
}
}

Expand All @@ -55,7 +55,7 @@ func TestBackoff_NextMaxValue(t *testing.T) {

cappedDelay := backoff.Next()
assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff))
assert.Equal(t, true, backoff.IsMaxBackoffReached())
assert.Equal(t, true, backoff.IsMaxBackoffReached(cappedDelay, time.Second))
// max value is 60 seconds + 20% jitter = 72 seconds
assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
}
4 changes: 2 additions & 2 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ConsumerMessage represents a pair of a Consumer and Message.
Expand Down Expand Up @@ -203,7 +203,7 @@ type ConsumerOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
Expand Down
34 changes: 20 additions & 14 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/crypto"
Expand Down Expand Up @@ -110,7 +112,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
backoffPolicy internal.BackoffPolicy
backoffPolicy backoff.Policy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
Expand Down Expand Up @@ -581,11 +583,11 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
var bo backoff.Policy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
bo = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
bo = &backoff.DefaultBackoff{}
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
}
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
Expand All @@ -604,7 +606,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
Expand Down Expand Up @@ -1680,18 +1682,23 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
)

if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
var bo backoff.Policy
if pc.options.backoffPolicy != nil {
bo = pc.options.backoffPolicy
} else {
bo = &backoff.DefaultBackoff{}
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
}

for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
Expand All @@ -1706,11 +1713,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
delayReconnectTime = bo.Next()
}
totalDelayReconnectTime += delayReconnectTime
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved

pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand Down Expand Up @@ -1743,7 +1749,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
Expand Down
7 changes: 4 additions & 3 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/log"
)

Expand Down Expand Up @@ -155,7 +156,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
}

// Retry to create producer indefinitely
backoff := &internal.DefaultBackoff{}
bo := &backoff.DefaultBackoff{}
nodece marked this conversation as resolved.
Show resolved Hide resolved
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
Expand All @@ -173,7 +174,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {

if err != nil {
r.log.WithError(err).Error("Failed to create DLQ producer")
time.Sleep(backoff.Next())
time.Sleep(bo.Next())
continue
} else {
r.producer = producer
Expand Down
6 changes: 4 additions & 2 deletions pulsar/internal/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"path"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/auth"

"github.com/apache/pulsar-client-go/pulsar/log"
Expand Down Expand Up @@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
backoff := DefaultBackoff{100 * time.Millisecond}
bo := backoff.NewDefaultBackoff(100 * time.Millisecond)
startTime := time.Now()
var retryTime time.Duration

for time.Since(startTime) < c.requestTimeout {
retryTime = backoff.Next()
retryTime = bo.Next()
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
Expand Down
5 changes: 3 additions & 2 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/backoff"
"github.com/apache/pulsar-client-go/pulsar/log"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
var host *url.URL
var rpcResult *RPCResult
startTime := time.Now()
backoff := DefaultBackoff{100 * time.Millisecond}
bo := backoff.NewDefaultBackoff(100 * time.Millisecond)
// we can retry these requests because this kind of request is
// not specific to any particular broker
for time.Since(startTime) < c.requestTimeout {
Expand All @@ -130,7 +131,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
break
}

retryTime := backoff.Next()
retryTime := bo.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

type HashingScheme int
Expand Down Expand Up @@ -173,7 +173,7 @@ type ProducerOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
// This will be used to create batch container when batching is enabled.
Expand Down
24 changes: 15 additions & 9 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"

Expand Down Expand Up @@ -448,17 +450,22 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
}

func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
)
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
var bo backoff.Policy
if p.options.BackoffPolicy != nil {
bo = p.options.BackoffPolicy
} else {
bo = &backoff.DefaultBackoff{}
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
}

for maxRetry != 0 {
if p.getProducerState() != producerReady {
Expand All @@ -473,11 +480,10 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Only attempt once
} else if p.options.BackoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = p.options.BackoffPolicy.Next()
delayReconnectTime = bo.Next()
}
totalDelayReconnectTime += delayReconnectTime
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved

p.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand Down Expand Up @@ -530,7 +536,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ReaderMessage packages Reader and Message as a struct to use.
Expand Down Expand Up @@ -91,7 +91,7 @@ type ReaderOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
MaxPendingChunkedMessage int
Expand Down
3 changes: 3 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,9 @@ func (b *testBackoffPolicy) Next() time.Duration {

return b.curBackoff
}
func (b *testBackoffPolicy) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool {
return delayReconnectTime >= b.maxBackoff
}

func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool {
// Approximately equal to expected interval
Expand Down
7 changes: 4 additions & 3 deletions pulsar/retry_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/log"
)

Expand Down Expand Up @@ -124,7 +125,7 @@ func (r *retryRouter) getProducer() Producer {
}

// Retry to create producer indefinitely
backoff := &internal.DefaultBackoff{}
bo := &backoff.DefaultBackoff{}
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.RetryLetterTopic
Expand All @@ -138,7 +139,7 @@ func (r *retryRouter) getProducer() Producer {

if err != nil {
r.log.WithError(err).Error("Failed to create RLQ producer")
time.Sleep(backoff.Next())
time.Sleep(bo.Next())
continue
} else {
r.producer = producer
Expand Down
Loading