From 3a570ea6a3c77de6aa4946442056ebbd79c0ad36 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 4 Apr 2024 12:30:50 +0000 Subject: [PATCH 1/2] go.mod: bump google.golang.org/grpc from 1.62.1 to 1.63.0 Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.62.1 to 1.63.0. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.62.1...v1.63.0) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d1d4d57..4daa2b7 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/matryer/is v1.4.1 go.uber.org/mock v0.4.0 go.uber.org/multierr v1.11.0 - google.golang.org/grpc v1.62.1 + google.golang.org/grpc v1.63.0 google.golang.org/protobuf v1.33.0 gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) diff --git a/go.sum b/go.sum index 306762a..d89c6e5 100644 --- a/go.sum +++ b/go.sum @@ -1107,8 +1107,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= +google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From 870695a1c32e6ee43b1efc4d4e1a2e904ba89283 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 4 Apr 2024 18:16:24 +0200 Subject: [PATCH 2/2] use grpc.NewClient instead of DialContext --- destination.go | 29 +++++++++++++++++++++-------- destination_test.go | 4 ++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/destination.go b/destination.go index b9a234e..2283733 100644 --- a/destination.go +++ b/destination.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" @@ -91,7 +92,6 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro func (d *Destination) Open(ctx context.Context) error { dialOptions := []grpc.DialOption{ grpc.WithContextDialer(d.dialer), - grpc.WithBlock(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}), } if d.config.RateLimit > 0 { @@ -109,17 +109,30 @@ func (d *Destination) Open(ctx context.Context) error { } else { dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) } - ctxTimeout, cancel := context.WithTimeout(ctx, d.config.MaxDowntime) - defer cancel() - conn, err := grpc.DialContext(ctxTimeout, - d.config.URL, - dialOptions..., - ) + conn, err := grpc.NewClient(d.config.URL, dialOptions...) if err != nil { - return fmt.Errorf("failed to dial server: %w", err) + return fmt.Errorf("failed to create grpc client: %w", err) } d.conn = conn + // Block until conn is ready. + conn.Connect() + ctxTimeout, cancel := context.WithTimeout(ctx, d.config.MaxDowntime) + defer cancel() + for { + s := conn.GetState() + if s == connectivity.Idle { + conn.Connect() + } + if s == connectivity.Ready { + break // connection is ready + } + if !conn.WaitForStateChange(ctxTimeout, s) { + // ctx got timeout or canceled. + return fmt.Errorf("failed to connect to server in time (state: %s)", s) + } + } + d.sm, err = NewStreamManager(ctx, conn, d.config.ReconnectDelay, d.config.MaxDowntime) if err != nil { return err diff --git a/destination_test.go b/destination_test.go index cb30db6..c070ba3 100644 --- a/destination_test.go +++ b/destination_test.go @@ -122,7 +122,7 @@ func TestBackoffRetry_MaxDowntime(t *testing.T) { ctx := context.Background() dest := NewDestinationWithDialer(dialer) err := dest.Configure(ctx, map[string]string{ - "url": "bufnet", + "url": "passthrough://bufnet", "rateLimit": "0", "maxDowntime": "500ms", "reconnectDelay": "200ms", @@ -171,7 +171,7 @@ func TestBackoffRetry_Reconnect(t *testing.T) { ctx := context.Background() dest := NewDestinationWithDialer(dialer) err := dest.Configure(ctx, map[string]string{ - "url": "bufnet", + "url": "passthrough://bufnet", "rateLimit": "0", "maxDowntime": "5s", "reconnectDelay": "200ms",