From 50f189a0576f3e064ab4a09df5a7e4b66b10d0b6 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Fri, 20 Sep 2024 16:04:13 +0300 Subject: [PATCH 1/7] Add subscribe account statuses endpoint --- access/grpc/client.go | 23 ++++ access/grpc/convert/convert.go | 38 ++++++ access/grpc/grpc.go | 135 +++++++++++++++++++++ access/grpc/grpc_test.go | 215 +++++++++++++++++++++++++++++++++ account.go | 16 +++ 5 files changed, 427 insertions(+) diff --git a/access/grpc/client.go b/access/grpc/client.go index 5ca1729aa..11ed6943e 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -293,3 +293,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig { } return subsConf } + +func (c *Client) SubscribeAccountStatusesFromStartHeight( + ctx context.Context, + startBlockHeight uint64, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter) +} + +func (c *Client) SubscribeAccountStatusesFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter) +} + +func (c *Client) SubscribeAccountStatusesFromLatestBlock( + ctx context.Context, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter) +} diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index e52500a08..b614bfb56 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -23,6 +23,7 @@ import ( "fmt" "time" + "github.com/onflow/flow/protobuf/go/flow/executiondata" "google.golang.org/protobuf/types/known/timestamppb" "github.com/onflow/cadence" @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) { }, nil } +func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { + if m == nil { + return flow.AccountStatus{}, ErrEmptyMessage + } + + results, err := MessageToAccountStatusResults(m.GetResults()) + if err != nil { + return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err) + } + + return flow.AccountStatus{ + BlockID: MessageToIdentifier(m.GetBlockId()), + BlockHeight: m.GetBlockHeight(), + MessageIndex: m.GetMessageIndex(), + Results: results, + }, nil +} + +func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) { + results := make([]*flow.AccountStatusResult, len(m)) + var emptyOptions []jsoncdc.Option + + for i, r := range m { + events, err := MessagesToEvents(r.GetEvents(), emptyOptions) + if err != nil { + return nil, fmt.Errorf("error converting events: %w", err) + } + + results[i] = &flow.AccountStatusResult{ + Address: MessageToIdentifier(r.GetAddress()), + Events: events, + } + } + + return results, nil +} + func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey { return &entities.AccountKey{ Index: uint32(a.Index), diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index b70a55bed..d532a30c0 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -965,3 +965,138 @@ func (c *BaseClient) subscribeEvents( return sub, errChan, nil } + +func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( + ctx context.Context, + startHeight uint64, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{ + StartBlockHeight: startHeight, + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{ + StartBlockId: startBlockID.Bytes(), + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( + ctx context.Context, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{ + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func receiveAccountStatusesFromClient[Client interface { + Recv() (*executiondata.SubscribeAccountStatusesResponse, error) +}]( + ctx context.Context, + client Client, + accountStatutesChan chan<- flow.AccountStatus, + errChan chan<- error, +) { + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + for { + accountStatusResponse, err := client.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + + sendErr(fmt.Errorf("error receiving account status: %w", err)) + return + } + + accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse) + if err != nil { + sendErr(fmt.Errorf("error converting message to account status: %w", err)) + return + } + + select { + case <-ctx.Done(): + return + case accountStatutesChan <- accountStatus: + } + } +} diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 58a7939f6..3799c6f2f 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1678,6 +1678,191 @@ func TestClient_SubscribeEvents(t *testing.T) { })) } +func TestClient_SubscribeAccountStatuses(t *testing.T) { + filter := flow.AccountStatusFilter{} + accounts := test.AccountGenerator() + blocks := test.BlockGenerator() + + generateAccountStatusesResponses := func(count uint64) []*executiondata.SubscribeAccountStatusesResponse { + var resBlockHeaders []*executiondata.SubscribeAccountStatusesResponse + + for i := uint64(0); i < count; i++ { + account := convert.AccountToMessage(*accounts.New()) + results := []*executiondata.SubscribeAccountStatusesResponse_Result{ + { + Address: account.Address, + }, + } + + block := blocks.New() + resBlockHeaders = append(resBlockHeaders, &executiondata.SubscribeAccountStatusesResponse{ + BlockId: block.ID.Bytes(), + BlockHeight: block.Height, + MessageIndex: i, + Results: results, + }) + } + + return resBlockHeaders + } + + t.Run("Happy Path - from start height", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + startHeight := uint64(1) + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromStartHeight", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromStartHeight(ctx, startHeight, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start block id", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromStartBlockID", ctx, mock.Anything). + Return(stream, nil) + + startBlockId := convert.MessageToIdentifier(stream.responses[0].GetBlockId()) + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockId, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start latest block", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatuses, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoAccountStatuses(t, accountStatuses, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) +} + func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { responseEvents := make([]*entities.Event, 0, len(events)) for _, e := range events { @@ -1742,6 +1927,13 @@ func assertNoEvents[T any](t *testing.T, eventCh <-chan T, done func()) { } } +func assertNoAccountStatuses(t *testing.T, accountStatusesChan <-chan flow.AccountStatus, done func()) { + defer done() + for range accountStatusesChan { + require.FailNow(t, "should not receive account statuses") + } +} + type mockEventStream struct { grpc.ClientStream @@ -1787,3 +1979,26 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR return m.responses[m.offset], nil } + +type mockAccountStatutesClientStream struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*executiondata.SubscribeAccountStatusesResponse +} + +func (m *mockAccountStatutesClientStream) Recv() (*executiondata.SubscribeAccountStatusesResponse, error) { + if m.err != nil { + return nil, m.err + } + + if m.offset >= len(m.responses) { + <-m.ctx.Done() + return nil, io.EOF + } + defer func() { m.offset++ }() + + return m.responses[m.offset], nil +} diff --git a/account.go b/account.go index 4654f562a..cb7558664 100644 --- a/account.go +++ b/account.go @@ -154,3 +154,19 @@ type accountKeyWrapper struct { HashAlgo uint Weight uint } + +type AccountStatusFilter struct { + EventFilter +} + +type AccountStatus struct { + BlockID Identifier + BlockHeight uint64 + MessageIndex uint64 + Results []*AccountStatusResult +} + +type AccountStatusResult struct { + Address Identifier + Events []Event +} From 6374402e4321d5f6c728f410c6a76b06918c200b Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 2 Oct 2024 14:10:46 +0300 Subject: [PATCH 2/7] add msgIndex --- access/grpc/grpc.go | 23 ++++++++++++++++++----- examples/go.mod | 2 +- examples/go.sum | 1 + 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 81b50015d..01bd3daa5 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -25,6 +25,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "google.golang.org/grpc" @@ -82,6 +83,7 @@ type BaseClient struct { close func() error jsonOptions []json.Option eventEncoding flow.EventEncodingVersion + msgIndex atomic.Uint64 } // NewBaseClient creates a new gRPC handler for network communication. @@ -95,13 +97,17 @@ func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) { execDataClient := executiondata.NewExecutionDataAPIClient(conn) - return &BaseClient{ + client := &BaseClient{ rpcClient: grpcClient, executionDataClient: execDataClient, close: func() error { return conn.Close() }, jsonOptions: []json.Option{json.WithAllowUnstructuredStaticTypes(true)}, eventEncoding: flow.EventEncodingVersionCCF, - }, nil + } + + client.msgIndex.Store(^uint64(0)) + + return client, nil } // NewFromRPCClient initializes a Flow client using a pre-configured gRPC provider. @@ -1182,7 +1188,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1214,7 +1220,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1244,7 +1250,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1257,6 +1263,7 @@ func receiveAccountStatusesFromClient[Client interface { client Client, accountStatutesChan chan<- flow.AccountStatus, errChan chan<- error, + previousMsgIndex atomic.Uint64, ) { sendErr := func(err error) { select { @@ -1283,6 +1290,12 @@ func receiveAccountStatusesFromClient[Client interface { return } + swapped := previousMsgIndex.CompareAndSwap(accountStatus.MessageIndex-1, accountStatus.MessageIndex) + if !swapped { + sendErr(fmt.Errorf("messages are not ordered")) + return + } + select { case <-ctx.Done(): return diff --git a/examples/go.mod b/examples/go.mod index 6fd2b9c50..255c1752a 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.4 replace github.com/onflow/flow-go-sdk => ../ require ( - github.com/onflow/cadence v1.0.0-preview.52 + github.com/onflow/cadence v1.0.0 github.com/onflow/flow-cli/flowkit v1.11.0 github.com/onflow/flow-go-sdk v0.41.17 github.com/spf13/afero v1.11.0 diff --git a/examples/go.sum b/examples/go.sum index 1a3ba9dca..78cf35e3b 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -117,6 +117,7 @@ github.com/onflow/cadence v1.0.0-preview.35/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmp github.com/onflow/cadence v1.0.0-preview.36/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= +github.com/onflow/cadence v1.0.0/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= From 9748a29ce3e375e9db08193fac612f4e9017eb0c Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 2 Oct 2024 15:02:52 +0300 Subject: [PATCH 3/7] check and increment message index correctly --- access/grpc/grpc.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 01bd3daa5..954c2b4c4 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -23,6 +23,7 @@ package grpc import ( "context" + "errors" "fmt" "io" "sync/atomic" @@ -1188,7 +1189,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1220,7 +1221,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1250,7 +1251,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) }() return accountStatutesChan, errChan, nil @@ -1263,7 +1264,7 @@ func receiveAccountStatusesFromClient[Client interface { client Client, accountStatutesChan chan<- flow.AccountStatus, errChan chan<- error, - previousMsgIndex atomic.Uint64, + previousMsgIndex *atomic.Uint64, ) { sendErr := func(err error) { select { @@ -1290,9 +1291,8 @@ func receiveAccountStatusesFromClient[Client interface { return } - swapped := previousMsgIndex.CompareAndSwap(accountStatus.MessageIndex-1, accountStatus.MessageIndex) - if !swapped { - sendErr(fmt.Errorf("messages are not ordered")) + if err = checkAndIncrementMessageIndex(previousMsgIndex, accountStatus.MessageIndex); err != nil { + sendErr(fmt.Errorf("error checking message index. messages are not ordered: %w", err)) return } @@ -1303,3 +1303,18 @@ func receiveAccountStatusesFromClient[Client interface { } } } + +func checkAndIncrementMessageIndex(previousMessageIndex *atomic.Uint64, currentMessageIndex uint64) error { + local := previousMessageIndex.Load() + + if local == 0 && currentMessageIndex == 0 { + return nil + } + + if currentMessageIndex != local+1 { + return errors.New("current message index is not exactly one larger than the previous one") + } + + previousMessageIndex.Add(1) + return nil +} From 3aeee13312bde33429782e868f736bc52ee3fa08 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 2 Oct 2024 15:07:33 +0300 Subject: [PATCH 4/7] fix bugs after merge --- access/grpc/grpc.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 2f23b16f0..0f55bb1c4 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -85,6 +85,7 @@ type BaseClient struct { close func() error jsonOptions []json.Option eventEncoding flow.EventEncodingVersion + messageIndex atomic.Uint64 } // NewBaseClient creates a new gRPC handler for network communication. @@ -1324,7 +1325,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) }() return accountStatutesChan, errChan, nil @@ -1356,7 +1357,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) }() return accountStatutesChan, errChan, nil @@ -1386,7 +1387,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.msgIndex) + receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) }() return accountStatutesChan, errChan, nil From 4215a3533e3018c2adc96becf98d6e6c6eb324c8 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 9 Oct 2024 12:52:48 +0300 Subject: [PATCH 5/7] remove atomic msg index counter --- access/grpc/convert/convert.go | 2 +- access/grpc/grpc.go | 36 ++++++++++------------------------ account.go | 2 +- 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index aeddc78e1..a32d4d09a 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -106,7 +106,7 @@ func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesRe } results[i] = &flow.AccountStatusResult{ - Address: MessageToIdentifier(r.GetAddress()), + Address: flow.BytesToAddress(r.GetAddress()), Events: events, } } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 0f55bb1c4..04fd07dfc 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -26,7 +26,6 @@ import ( "errors" "fmt" "io" - "sync/atomic" "github.com/onflow/flow/protobuf/go/flow/entities" "google.golang.org/grpc" @@ -85,7 +84,6 @@ type BaseClient struct { close func() error jsonOptions []json.Option eventEncoding flow.EventEncodingVersion - messageIndex atomic.Uint64 } // NewBaseClient creates a new gRPC handler for network communication. @@ -1325,7 +1323,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) }() return accountStatutesChan, errChan, nil @@ -1357,7 +1355,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) }() return accountStatutesChan, errChan, nil @@ -1387,20 +1385,19 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( go func() { defer close(accountStatutesChan) defer close(errChan) - receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan, &c.messageIndex) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) }() return accountStatutesChan, errChan, nil } -func receiveAccountStatusesFromClient[Client interface { +func receiveAccountStatusesFromStream[Stream interface { Recv() (*executiondata.SubscribeAccountStatusesResponse, error) }]( ctx context.Context, - client Client, + stream Stream, accountStatutesChan chan<- flow.AccountStatus, errChan chan<- error, - previousMsgIndex *atomic.Uint64, ) { sendErr := func(err error) { select { @@ -1409,8 +1406,9 @@ func receiveAccountStatusesFromClient[Client interface { } } + var nextExpectedMsgIndex uint64 for { - accountStatusResponse, err := client.Recv() + accountStatusResponse, err := stream.Recv() if err != nil { if err == io.EOF { // End of stream, return gracefully @@ -1427,10 +1425,11 @@ func receiveAccountStatusesFromClient[Client interface { return } - if err = checkAndIncrementMessageIndex(previousMsgIndex, accountStatus.MessageIndex); err != nil { - sendErr(fmt.Errorf("error checking message index. messages are not ordered: %w", err)) + if accountStatus.MessageIndex != nextExpectedMsgIndex { + sendErr(fmt.Errorf("messages are not ordered")) return } + nextExpectedMsgIndex = accountStatus.MessageIndex + 1 select { case <-ctx.Done(): @@ -1439,18 +1438,3 @@ func receiveAccountStatusesFromClient[Client interface { } } } - -func checkAndIncrementMessageIndex(previousMessageIndex *atomic.Uint64, currentMessageIndex uint64) error { - local := previousMessageIndex.Load() - - if local == 0 && currentMessageIndex == 0 { - return nil - } - - if currentMessageIndex != local+1 { - return errors.New("current message index is not exactly one larger than the previous one") - } - - previousMessageIndex.Add(1) - return nil -} diff --git a/account.go b/account.go index cb7558664..a2baee8cd 100644 --- a/account.go +++ b/account.go @@ -167,6 +167,6 @@ type AccountStatus struct { } type AccountStatusResult struct { - Address Identifier + Address Address Events []Event } From 1fce6a8d9da71435f558173d550b6767fb76ef6a Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 9 Oct 2024 14:46:49 +0300 Subject: [PATCH 6/7] add test for not ordered messages error --- access/grpc/grpc_test.go | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index f24fe1c7c..bc82d3d16 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2281,6 +2281,58 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { wg.Wait() })) + + t.Run("Messages are not ordered", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + generateUnorderedAccountStatusesResponses := func(count uint64) []*executiondata.SubscribeAccountStatusesResponse { + resBlockHeaders := generateAccountStatusesResponses(count) + resBlockHeaders[1].MessageIndex += 1 + return resBlockHeaders + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "message are not ordered"), + responses: generateUnorderedAccountStatusesResponses(2), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + + for accStatus := range accountStatusesCh { + // we expect stream to send at least 1 account + require.Equal(t, accStatus.MessageIndex, 0) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + }() + + wg.Wait() + })) } func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { From b2a3948119b94d755e7ecb24a70ba5d8d4f63bf6 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Tue, 15 Oct 2024 15:36:49 +0300 Subject: [PATCH 7/7] change error message --- access/grpc/grpc.go | 2 +- access/grpc/grpc_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 939347ceb..f54ce04cf 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1498,7 +1498,7 @@ func receiveAccountStatusesFromStream[Stream interface { } if accountStatus.MessageIndex != nextExpectedMsgIndex { - sendErr(fmt.Errorf("messages are not ordered")) + sendErr(fmt.Errorf("message received out of order")) return } nextExpectedMsgIndex = accountStatus.MessageIndex + 1 diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index c00c7270d..a8167e4bd 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2294,7 +2294,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { stream := &mockAccountStatutesClientStream{ ctx: ctx, - err: status.Error(codes.Internal, "message are not ordered"), + err: status.Error(codes.Internal, "message received out of order"), responses: generateUnorderedAccountStatusesResponses(2), }