Skip to content

Commit

Permalink
[azeventhubs,azservicebus] Applying retry fix to both eventhubs and s…
Browse files Browse the repository at this point in the history
…ervicebus (#23562)

* Some lint errors.
* Fixing some broken tests. It looks like these weren't running in CI because what I've changed should not have affected them at all.

Fixes #23523
  • Loading branch information
richardpark-msft authored Oct 11, 2024
1 parent 5df372c commit 42922c1
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 233 deletions.
6 changes: 3 additions & 3 deletions eng/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@
},
{
"Name": "messaging/azservicebus",
"CoverageGoal": 0.48
"CoverageGoal": 0.39
},
{
"Name": "messaging/azeventhubs",
"CoverageGoal": 0.60
"CoverageGoal": 0.45
},
{
"Name": "messaging/eventgrid/aznamespaces",
Expand Down Expand Up @@ -134,4 +134,4 @@
"CoverageGoal": 0.75
}
]
}
}
10 changes: 6 additions & 4 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Release History

## 1.2.3 (Unreleased)
## 1.2.3 (2024-10-14)

### Bugs Fixed

- Fixed a bug where cancelling RenewMessageLock() calls could cause hangs in future RenewMessageLock calls. (PR#23400)
- Fixed a bug where cancelling management link calls, such as GetEventHubProperties() or GetPartitionProperties(), could result in blocked calls. (PR#23400)
- Apply fix from @bcho for overflows with retries. (PR#23562)

## 1.2.2 (2024-08-15)

Expand All @@ -23,9 +24,10 @@
### Bugs Fixed

Processor.Run had unclear behavior for some cases:
- Run() now returns an explicit error when called more than once on a single

- Run() now returns an explicit error when called more than once on a single
Processor instance or if multiple Run calls are made concurrently. (PR#22833)
- NextProcessorClient now properly terminates (and returns nil) if called on a
- NextProcessorClient now properly terminates (and returns nil) if called on a
stopped Processor. (PR#22833)

## 1.1.0 (2024-04-02)
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func sendEventToPartition(t *testing.T, producer *azeventhubs.ProducerClient, pa

eventToSend.Properties = props

err = batch.AddEventData(event, nil)
err = batch.AddEventData(&eventToSend, nil)
require.NoError(t, err)
}

Expand Down
47 changes: 30 additions & 17 deletions sdk/messaging/azeventhubs/internal/links_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package internal
import (
"context"
"fmt"
"net"
"testing"
"time"

Expand Down Expand Up @@ -113,6 +112,8 @@ func TestLinksRecoverLinkWithConnectionFailureAndExpiredContext(t *testing.T) {
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)

t.Logf("Getting links (original), manually")

oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

Expand All @@ -123,32 +124,44 @@ func TestLinksRecoverLinkWithConnectionFailureAndExpiredContext(t *testing.T) {
err = origConn.Close()
require.NoError(t, err)

err = oldLWID.Link().Send(context.Background(), &amqp.Message{}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

// Try to recover, but using an expired context. We'll get a network error (not enough time to resolve or
// create a connection), which would normally be a connection level recovery event.
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()

err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID))
var netErr net.Error
require.ErrorAs(t, err, &netErr)
t.Logf("Sending message, within retry loop, with an already expired context")

// now recover like normal
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID))
require.NoError(t, err)
err = links.Retry(cancelledCtx, "(expired context) retry loop with precancelled context", "send", "0", exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
// ignoring the cancelled context, let's see what happens.
t.Logf("(expired context) Sending message")
err = lwid.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("(expired context) hello world")},
}, nil)

newLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)
t.Logf("(expired context) Message sent, error: %#v", err)
return err
})
require.ErrorIs(t, err, context.DeadlineExceeded)

requireNewLinkNewConn(t, oldLWID, newLWID)
t.Logf("Sending message, within retry loop, NO expired context")

err = newLWID.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
var newLWID LinkWithID[amqpwrap.AMQPSenderCloser]

err = links.Retry(context.Background(), "(normal) retry loop without cancelled context", "send", "0", exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
// ignoring the cancelled context, let's see what happens.
t.Logf("(normal) Sending message")
err = lwid.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
t.Logf("(normal) Message sent, error: %#v", err)

newLWID = lwid
return err
})
require.NoError(t, err)

requireNewLinkNewConn(t, oldLWID, newLWID)
require.Equal(t, newLWID.ConnID(), uint64(2), "we should have recovered the connection")
}

func TestLinkFailureWhenConnectionIsDead(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions sdk/messaging/azeventhubs/internal/mock/mock_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func SetupRPC(sender *MockAMQPSenderCloser, receiver *MockAMQPReceiverCloser, ex
CorrelationID: sentMessage.Properties.MessageID,
},
}
receiver.EXPECT().AcceptMessage(gomock.Any(), gomock.Any()).Return(nil)

// let the caller fill in the blanks of whatever needs to happen here.
handler(sentMessage, response)
return response, nil
Expand Down
18 changes: 0 additions & 18 deletions sdk/messaging/azeventhubs/internal/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ func TestRPCLinkNonErrorRequiresNoRecovery(t *testing.T) {
require.Equal(t, 200, resp.Code)
require.Equal(t, "response from service", resp.Message.Value)

acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")

require.NoError(t, link.Close(context.Background()))

logMessages := getLogs()
Expand Down Expand Up @@ -219,9 +216,6 @@ func TestRPCLinkNonErrorLockLostDoesNotBreakAnything(t *testing.T) {
require.ErrorAs(t, err, &rpcErr)
require.Equal(t, 400, rpcErr.RPCCode())

acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")

// validate that a normal error doesn't cause the response router to shut down
resp, err = link.RPC(context.Background(), &amqp.Message{
ApplicationProperties: map[string]any{
Expand All @@ -232,8 +226,6 @@ func TestRPCLinkNonErrorLockLostDoesNotBreakAnything(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, "response from service", resp.Message.Value)
acceptedMessage = <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
}

func TestRPCLinkClosingClean_SessionCreationFailed(t *testing.T) {
Expand Down Expand Up @@ -424,7 +416,6 @@ func TestRPCLinkUsesCorrectFlags(t *testing.T) {
func NewRPCTester(t *testing.T) *rpcTester {
return &rpcTester{t: t,
ResponsesCh: make(chan *rpcTestResp, 1000),
Accepted: make(chan *amqp.Message, 1),
RPCLoopStarted: make(chan struct{}, 1),
}
}
Expand All @@ -440,9 +431,6 @@ type rpcTester struct {
amqpwrap.AMQPReceiverCloser
receiverOpts *amqp.ReceiverOptions

// Accepted contains all the messages where we called AcceptMessage(msg)
// We only call this when we
Accepted chan *amqp.Message
ResponsesCh chan *rpcTestResp
t *testing.T

Expand Down Expand Up @@ -501,12 +489,6 @@ func (tester *rpcTester) LinkName() string {

// receiver functions

func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error {
require.NotNil(tester.t, tester.Accepted, "No messages should be AcceptMessage()'d since the tester.Accepted channel was nil")
tester.Accepted <- msg
return nil
}

func (tester *rpcTester) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
tester.closeRPCLoopStarted.Do(func() {
close(tester.RPCLoopStarted)
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/sas/sas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func parseSig(sigStr string) (*sig, error) {
case "skn":
parsed.skn = keyValue[1]
default:
return nil, fmt.Errorf(fmt.Sprintf("unknown key / value: %q", keyValue))
return nil, fmt.Errorf("unknown key / value: %q", keyValue)
}
}
return parsed, nil
Expand Down
34 changes: 21 additions & 13 deletions sdk/messaging/azeventhubs/internal/utils/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,33 @@ func setDefaults(o *exported.RetryOptions) {
}

// (adapted from from azcore/policy_retry)
func calcDelay(o exported.RetryOptions, try int32) time.Duration {
if try == 0 {
return 0
func calcDelay(o exported.RetryOptions, try int32) time.Duration { // try is >=1; never 0
// avoid overflow when shifting left
factor := time.Duration(math.MaxInt64)
if try < 63 {
factor = time.Duration(int64(1<<try) - 1)
}

pow := func(number int64, exponent int32) int64 { // pow is nested helper function
var result int64 = 1
for n := int32(0); n < exponent; n++ {
result *= number
}
return result
delay := factor * o.RetryDelay
if delay < factor {
// overflow has happened so set to max value
delay = time.Duration(math.MaxInt64)
}

delay := time.Duration(pow(2, try)-1) * o.RetryDelay
// Introduce jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
jitterMultiplier := rand.Float64()/2 + 0.8 // NOTE: We want math/rand; not crypto/rand

delayFloat := float64(delay) * jitterMultiplier
if delayFloat > float64(math.MaxInt64) {
// the jitter pushed us over MaxInt64, so just use MaxInt64
delay = time.Duration(math.MaxInt64)
} else {
delay = time.Duration(delayFloat)
}

// Introduce some jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
delay = time.Duration(delay.Seconds() * (rand.Float64()/2 + 0.8) * float64(time.Second)) // NOTE: We want math/rand; not crypto/rand
if delay > o.MaxRetryDelay {
if delay > o.MaxRetryDelay { // MaxRetryDelay is backfilled with non-negative value
delay = o.MaxRetryDelay
}

return delay
}
73 changes: 69 additions & 4 deletions sdk/messaging/azeventhubs/internal/utils/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ func TestRetrier(t *testing.T) {
return errors.Is(err, context.Canceled)
}

customRetryOptions := fastRetryOptions
customRetryOptions.MaxRetries = 1

var actualAttempts []int32

maxRetries := int32(2)
Expand Down Expand Up @@ -276,7 +273,7 @@ func TestRetryDefaults(t *testing.T) {
require.EqualValues(t, time.Duration(0), ro.RetryDelay)
}

func TestCalcDelay(t *testing.T) {
func TestCalcDelay2(t *testing.T) {
// calcDelay introduces some jitter, automatically.
ro := exported.RetryOptions{}
setDefaults(&ro)
Expand Down Expand Up @@ -409,6 +406,74 @@ func TestRetryLogging(t *testing.T) {
})
}

// BenchmarkCalcDelay_defaultSettings measures calcDelay for try 32 using the
// options setDefaults produces from a zero-value RetryOptions.
func BenchmarkCalcDelay_defaultSettings(b *testing.B) {
	var opts exported.RetryOptions
	setDefaults(&opts)

	for n := 0; n < b.N; n++ {
		calcDelay(opts, 32)
	}
}

// BenchmarkCalcDelay_overflow measures calcDelay for try 100 with a RetryDelay
// of 1 and a MaxRetryDelay of math.MaxInt64.
func BenchmarkCalcDelay_overflow(b *testing.B) {
	opts := exported.RetryOptions{
		MaxRetryDelay: math.MaxInt64,
		RetryDelay:    1,
	}
	setDefaults(&opts)

	for n := 0; n < b.N; n++ {
		calcDelay(opts, 100)
	}
}

// TestCalcDelay checks calcDelay's exponential growth, its MaxRetryDelay cap,
// and that very large try counts stay within the expected range instead of
// overflowing.
func TestCalcDelay(t *testing.T) {
	// requireWithinJitter asserts that actual lies within [0.8, 1.3] * expected,
	// which covers the jitter multiplier calcDelay applies.
	requireWithinJitter := func(t testing.TB, expected, actual time.Duration) {
		t.Helper() // report failures at the caller's line, not inside this closure
		lower, upper := float64(expected)*0.8, float64(expected)*1.3
		require.Truef(
			t, float64(actual) >= lower && float64(actual) <= upper,
			"%.2f not within jitter of %.2f", actual.Seconds(), expected.Seconds(),
		)
	}

	t.Run("basic cases", func(t *testing.T) {
		retryOptions := exported.RetryOptions{
			RetryDelay:    1 * time.Second,
			MaxRetryDelay: 30 * time.Second,
		}
		setDefaults(&retryOptions)

		// Tries 1-5: (2^try - 1) * RetryDelay, give or take jitter. Try 5 (31s) may
		// be trimmed to the 30s cap, which is still inside the jitter window.
		for i := int32(1); i <= 5; i++ {
			expected := time.Duration(1<<i-1) * retryOptions.RetryDelay
			requireWithinJitter(t, expected, calcDelay(retryOptions, i))
		}

		// From try 6 on, even the smallest jittered delay (63s * 0.8) exceeds
		// MaxRetryDelay, so the result is always exactly the cap.
		for i := int32(6); i < 100; i++ {
			require.Equal(
				t,
				retryOptions.MaxRetryDelay,
				calcDelay(retryOptions, i),
			)
		}
	})

	t.Run("overflow", func(t *testing.T) {
		retryOptions := exported.RetryOptions{
			RetryDelay:    1,
			MaxRetryDelay: math.MaxInt64,
		}
		setDefaults(&retryOptions)

		// 2^try no longer fits in an int64 once try >= 63; the delay must stay in
		// the jitter range around math.MaxInt64 rather than wrap around.
		for i := int32(63); i < 100000; i++ {
			requireWithinJitter(
				t, math.MaxInt64, calcDelay(retryOptions, i),
			)
		}
	})
}

// retryRE is used to replace the 'retry time' with a consistent string to make
// unit tests against logging simpler
// A typical string: "[azsb.Retry] (retry) Attempt 1 sleeping for 1.10233ms"
Expand Down
10 changes: 8 additions & 2 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 1.7.3 (2024-10-14)

### Bugs Fixed

- Apply fix from @bcho for overflows with retries. (PR#23562)

## 1.7.2 (2024-09-11)

### Bugs Fixed
Expand Down Expand Up @@ -33,7 +39,7 @@

### Bugs Fixed

- Settling a message (using CompleteMessage, AbandonMessage, etc..) on a different Receiver instance than you received on no
- Settling a message (using CompleteMessage, AbandonMessage, etc..) on a different Receiver instance than you received on no
longer leaks memory. (PR#22253)

## 1.5.0 (2023-10-10)
Expand All @@ -57,7 +63,7 @@

### Features Added

- `admin.SubscriptionProperties` now allow for a `DefaultRule` to be set. This allows Subscriptions to be created with an immediate filter/action.
- `admin.SubscriptionProperties` now allow for a `DefaultRule` to be set. This allows Subscriptions to be created with an immediate filter/action.
Contributed by @StrawbrryFlurry. (PR#20888)

## 1.3.0 (2023-05-09)
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
package internal

// Version is the semantic version number
const Version = "v1.7.2"
const Version = "v1.7.3"
Loading

0 comments on commit 42922c1

Please sign in to comment.