diff --git a/access/grpc/client.go b/access/grpc/client.go index a87b935ac..73c174f0f 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -413,3 +413,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig { } return subsConf } + +func (c *Client) SubscribeAccountStatusesFromStartHeight( + ctx context.Context, + startBlockHeight uint64, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter) +} + +func (c *Client) SubscribeAccountStatusesFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter) +} + +func (c *Client) SubscribeAccountStatusesFromLatestBlock( + ctx context.Context, + filter flow.AccountStatusFilter, +) (<-chan flow.AccountStatus, <-chan error, error) { + return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter) +} diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index 5bdaf217e..7b902a184 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -23,6 +23,7 @@ import ( "fmt" "time" + "github.com/onflow/flow/protobuf/go/flow/executiondata" "google.golang.org/protobuf/types/known/timestamppb" "github.com/onflow/cadence" @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) { }, nil } +func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { + if m == nil { + return flow.AccountStatus{}, ErrEmptyMessage + } + + results, err := MessageToAccountStatusResults(m.GetResults()) + if err != nil { + return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err) + } + + return flow.AccountStatus{ + BlockID: MessageToIdentifier(m.GetBlockId()), + BlockHeight: m.GetBlockHeight(), + MessageIndex: m.GetMessageIndex(), + Results: results, + }, nil +} + +func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) { + results := make([]*flow.AccountStatusResult, len(m)) + var emptyOptions []jsoncdc.Option + + for i, r := range m { + events, err := MessagesToEvents(r.GetEvents(), emptyOptions) + if err != nil { + return nil, fmt.Errorf("error converting events: %w", err) + } + + results[i] = &flow.AccountStatusResult{ + Address: flow.BytesToAddress(r.GetAddress()), + Events: events, + } + } + + return results, nil +} + func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey { return &entities.AccountKey{ Index: uint32(a.Index), diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index dc21cb43d..c2f668908 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1517,6 +1517,148 @@ func receiveBlockHeadersFromClient[Client interface { } } +func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( + ctx context.Context, + startHeight uint64, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{ + StartBlockHeight: startHeight, + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{ + StartBlockId: startBlockID.Bytes(), + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( + ctx context.Context, + filter flow.AccountStatusFilter, + opts ...grpc.CallOption, +) (<-chan flow.AccountStatus, <-chan error, error) { + request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{ + EventEncodingVersion: c.eventEncoding, + } + request.Filter = &executiondata.StatusFilter{ + EventType: filter.EventTypes, + Address: filter.Addresses, + } + + subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + accountStatutesChan := make(chan flow.AccountStatus) + errChan := make(chan error) + + go func() { + defer close(accountStatutesChan) + defer close(errChan) + receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) + }() + + return accountStatutesChan, errChan, nil +} + +func receiveAccountStatusesFromStream[Stream interface { + Recv() (*executiondata.SubscribeAccountStatusesResponse, error) +}]( + ctx context.Context, + stream Stream, + accountStatutesChan chan<- flow.AccountStatus, + errChan chan<- error, +) { + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + var nextExpectedMsgIndex uint64 + for { + accountStatusResponse, err := stream.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + + sendErr(fmt.Errorf("error receiving account status: %w", err)) + return + } + + accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse) + if err != nil { + sendErr(fmt.Errorf("error converting message to account status: %w", err)) + return + } + + if accountStatus.MessageIndex != nextExpectedMsgIndex { + sendErr(fmt.Errorf("message received out of order")) + return + } + nextExpectedMsgIndex = accountStatus.MessageIndex + 1 + + select { + case <-ctx.Done(): + return + case accountStatutesChan <- accountStatus: + } + } +} + func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 0c1596292..30844d9a7 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2101,6 +2101,243 @@ func TestClient_SubscribeEvents(t *testing.T) { })) } +func TestClient_SubscribeAccountStatuses(t *testing.T) { + filter := flow.AccountStatusFilter{} + accounts := test.AccountGenerator() + blocks := test.BlockGenerator() + + generateAccountStatusesResponses := func(count uint64) []*executiondata.SubscribeAccountStatusesResponse { + var resBlockHeaders []*executiondata.SubscribeAccountStatusesResponse + + for i := uint64(0); i < count; i++ { + account := convert.AccountToMessage(*accounts.New()) + results := []*executiondata.SubscribeAccountStatusesResponse_Result{ + { + Address: account.Address, + }, + } + + block := blocks.New() + resBlockHeaders = append(resBlockHeaders, &executiondata.SubscribeAccountStatusesResponse{ + BlockId: block.ID.Bytes(), + BlockHeight: block.Height, + MessageIndex: i, + Results: results, + }) + } + + return resBlockHeaders + } + + t.Run("Happy Path - from start height", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + startHeight := uint64(1) + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromStartHeight", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromStartHeight(ctx, startHeight, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start block id", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromStartBlockID", ctx, mock.Anything). + Return(stream, nil) + + startBlockId := convert.MessageToIdentifier(stream.responses[0].GetBlockId()) + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockId, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start latest block", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + responses: generateAccountStatusesResponses(responseCount), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualAccountStatus := <-accountStatusesCh + + blockId := convert.MessageToIdentifier(stream.responses[i].GetBlockId()) + require.Equal(t, blockId, actualAccountStatus.BlockID) + + blockHeight := stream.responses[i].GetBlockHeight() + require.Equal(t, blockHeight, actualAccountStatus.BlockHeight) + + messageIndex := stream.responses[i].GetMessageIndex() + require.Equal(t, messageIndex, actualAccountStatus.MessageIndex) + + results, err := convert.MessageToAccountStatusResults(stream.responses[i].GetResults()) + require.NoError(t, err) + require.Equal(t, results, actualAccountStatus.Results) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatuses, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoAccountStatuses(t, accountStatuses, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) + + t.Run("Messages are not ordered", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { + generateUnorderedAccountStatusesResponses := func(count uint64) []*executiondata.SubscribeAccountStatusesResponse { + resBlockHeaders := generateAccountStatusesResponses(count) + resBlockHeaders[1].MessageIndex += 1 + return resBlockHeaders + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + stream := &mockAccountStatutesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "message received out of order"), + responses: generateUnorderedAccountStatusesResponses(2), + } + + rpc. + On("SubscribeAccountStatusesFromLatestBlock", ctx, mock.Anything). + Return(stream, nil) + + accountStatusesCh, errCh, err := c.SubscribeAccountStatusesFromLatestBlock(ctx, filter) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + + for accStatus := range accountStatusesCh { + // we expect stream to send at least 1 account + require.Equal(t, accStatus.MessageIndex, 0) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + }() + + wg.Wait() + })) +} + func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { responseEvents := make([]*entities.Event, 0, len(events)) for _, e := range events { @@ -2165,6 +2402,13 @@ func assertNoEvents[T any](t *testing.T, eventCh <-chan T, done func()) { } } +func assertNoAccountStatuses(t *testing.T, accountStatusesChan <-chan flow.AccountStatus, done func()) { + defer done() + for range accountStatusesChan { + require.FailNow(t, "should not receive account statuses") + } +} + type mockEventStream struct { grpc.ClientStream @@ -2888,3 +3132,26 @@ func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan require.FailNow(t, "should not receive block digests") } } + +type mockAccountStatutesClientStream struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*executiondata.SubscribeAccountStatusesResponse +} + +func (m *mockAccountStatutesClientStream) Recv() (*executiondata.SubscribeAccountStatusesResponse, error) { + if m.err != nil { + return nil, m.err + } + + if m.offset >= len(m.responses) { + <-m.ctx.Done() + return nil, io.EOF + } + defer func() { m.offset++ }() + + return m.responses[m.offset], nil +} diff --git a/account.go b/account.go index 4654f562a..a2baee8cd 100644 --- a/account.go +++ b/account.go @@ -154,3 +154,19 @@ type accountKeyWrapper struct { HashAlgo uint Weight uint } + +type AccountStatusFilter struct { + EventFilter +} + +type AccountStatus struct { + BlockID Identifier + BlockHeight uint64 + MessageIndex uint64 + Results []*AccountStatusResult +} + +type AccountStatusResult struct { + Address Address + Events []Event +}