From c3fa8bf6d026150e5bb2e495259ffa4b88d05b51 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 19 Sep 2024 16:05:31 +0300 Subject: [PATCH 1/7] Added SubscribeBlockDigests endpoints --- access/grpc/client.go | 23 +++++ access/grpc/convert/convert.go | 27 +++++ access/grpc/grpc.go | 120 +++++++++++++++++++++ access/grpc/grpc_test.go | 184 ++++++++++++++++++++++++++++++++- block.go | 7 ++ 5 files changed, 356 insertions(+), 5 deletions(-) diff --git a/access/grpc/client.go b/access/grpc/client.go index 683e397a4..5b6d6366a 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -298,6 +298,29 @@ func (c *Client) SubscribeEventsByBlockHeight( return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval)) } +func (c *Client) SubscribeBlockDigestsFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus) +} + +func (c *Client) SubscribeBlockDigestsFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus) +} + +func (c *Client) SubscribeBlockDigestsFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus) +} + func (c *Client) Close() error { return c.grpc.Close() } diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index 0c45da014..c3573d4a1 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -192,6 +192,33 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { }, nil } +func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest { + return flow.BlockDigest{ + BlockID: flow.BytesToID(m.GetBlockId()), + Height: m.GetBlockHeight(), + Timestamp: m.GetBlockTimestamp().AsTime(), + } +} + +func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse { + return &access.SubscribeBlockDigestsResponse{ + BlockId: IdentifierToMessage(blockDigest.BlockID), + BlockHeight: blockDigest.Height, + BlockTimestamp: timestamppb.New(blockDigest.Timestamp), + } +} + +func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus { + switch blockStatus { + case flow.BlockStatusFinalized: + return entities.BlockStatus_BLOCK_FINALIZED + case flow.BlockStatusSealed: + return entities.BlockStatus_BLOCK_SEALED + default: + return entities.BlockStatus_BLOCK_UNKNOWN + } +} + func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) { switch encodingVersion { case flow.EventEncodingVersionCCF: diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 1616b8691..2237fabbe 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1019,3 +1019,123 @@ func (c *BaseClient) subscribeEvents( return sub, errChan, nil } + +func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{ + StartBlockId: startBlockID.Bytes(), + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + request := &access.SubscribeBlockDigestsFromStartHeightRequest{ + StartBlockHeight: startHeight, + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockDigestsFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + request := &access.SubscribeBlockDigestsFromLatestRequest{ + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func receiveBlockDigestFromClient[Client interface { + Recv() (*access.SubscribeBlockDigestsResponse, error) +}]( + ctx context.Context, + client Client, + blockDigestsChan chan<- flow.BlockDigest, + errChan chan<- error, +) { + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + for { + // Receive the next blockDigest response + blockDigestResponse, err := client.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + + sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + return + } + + blockDigest := convert.MessageToBlockDigest(blockDigestResponse) + + select { + case <-ctx.Done(): + return + case blockDigestsChan <- blockDigest: + } + } +} diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index a991d27ca..70f748e9e 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -20,11 +20,6 @@ package grpc import ( "context" - "io" - "math/rand" - "sync" - "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -32,6 +27,10 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" + "io" + "math/rand" + "sync" + "testing" "github.com/onflow/cadence" jsoncdc "github.com/onflow/cadence/encoding/json" @@ -1918,6 +1917,151 @@ func TestClient_SubscribeEvents(t *testing.T) { })) } +func Test_SubscribeBlockDigest(t *testing.T) { + blockHeaders := test.BlockHeaderGenerator() + + generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse { + var resBlockDigests []*access.SubscribeBlockDigestsResponse + + for i := uint64(0); i < count; i++ { + blockHeader := blockHeaders.New() + + digest := flow.BlockDigest{ + BlockID: blockHeader.ID, + Height: blockHeader.Height, + Timestamp: blockHeader.Timestamp, + } + + resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest)) + } + + return resBlockDigests + } + + t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + startHeight := uint64(1) + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything). + Return(stream, nil) + + startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoBlockDigests(t, blockDigestsCh, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) + +} func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { responseEvents := make([]*entities.Event, 0, len(events)) for _, e := range events { @@ -2027,3 +2171,33 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR return m.responses[m.offset], nil } + +type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*SubscribeBlockDigestsResponse +} + +func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) { + if s.err != nil { + return nil, s.err + } + + if s.offset >= len(s.responses) { + <-s.ctx.Done() + return nil, io.EOF + } + defer func() { s.offset++ }() + + return s.responses[s.offset], nil +} + +func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) { + defer done() + for range blockDigestsChan { + require.FailNow(t, "should not receive block digests") + } +} diff --git a/block.go b/block.go index 5ee39fcc9..3e9a0211e 100644 --- a/block.go +++ b/block.go @@ -76,3 +76,10 @@ type BlockSeal struct { // block produces the same receipt among all verifying nodes ExecutionReceiptID Identifier } + +// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp +type BlockDigest struct { + BlockID Identifier + Height uint64 + Timestamp time.Time +} From 1e00b1ec22313038efcf47e4e0491194be35c69c Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Sep 2024 11:49:34 +0300 Subject: [PATCH 2/7] Removed endpoints from another PR --- access/grpc/client.go | 8 ------ access/grpc/grpc.go | 26 ------------------ access/grpc/grpc_test.go | 58 ---------------------------------------- 3 files changed, 92 deletions(-) diff --git a/access/grpc/client.go b/access/grpc/client.go index 5b6d6366a..6a631f29a 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -244,14 +244,6 @@ func (c *Client) GetLatestProtocolStateSnapshot(ctx context.Context) ([]byte, er return c.grpc.GetLatestProtocolStateSnapshot(ctx) } -func (c *Client) GetProtocolStateSnapshotByBlockID(ctx context.Context, blockID flow.Identifier) ([]byte, error) { - return c.grpc.GetProtocolStateSnapshotByBlockID(ctx, blockID) -} - -func (c *Client) GetProtocolStateSnapshotByHeight(ctx context.Context, blockHeight uint64) ([]byte, error) { - return c.grpc.GetProtocolStateSnapshotByHeight(ctx, blockHeight) -} - func (c *Client) GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionResult, error) { return c.grpc.GetExecutionResultForBlockID(ctx, blockID) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 2237fabbe..be6d222b1 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -774,32 +774,6 @@ func (c *BaseClient) GetLatestProtocolStateSnapshot(ctx context.Context, opts .. return res.GetSerializedSnapshot(), nil } -func (c *BaseClient) GetProtocolStateSnapshotByBlockID(ctx context.Context, blockID flow.Identifier, opts ...grpc.CallOption) ([]byte, error) { - req := &access.GetProtocolStateSnapshotByBlockIDRequest{ - BlockId: blockID.Bytes(), - } - - res, err := c.rpcClient.GetProtocolStateSnapshotByBlockID(ctx, req, opts...) - if err != nil { - return nil, newRPCError(err) - } - - return res.GetSerializedSnapshot(), nil -} - -func (c *BaseClient) GetProtocolStateSnapshotByHeight(ctx context.Context, blockHeight uint64, opts ...grpc.CallOption) ([]byte, error) { - req := &access.GetProtocolStateSnapshotByHeightRequest{ - BlockHeight: blockHeight, - } - - res, err := c.rpcClient.GetProtocolStateSnapshotByHeight(ctx, req, opts...) - if err != nil { - return nil, newRPCError(err) - } - - return res.GetSerializedSnapshot(), nil -} - func (c *BaseClient) GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier, opts ...grpc.CallOption) (*flow.ExecutionResult, error) { er, err := c.rpcClient.GetExecutionResultForBlockID(ctx, &access.GetExecutionResultForBlockIDRequest{ BlockId: convert.IdentifierToMessage(blockID), diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 70f748e9e..12ebaf650 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1319,64 +1319,6 @@ func TestClient_GetLatestProtocolStateSnapshot(t *testing.T) { })) } -func TestClient_GetProtocolStateSnapshotByBlockID(t *testing.T) { - ids := test.IdentifierGenerator() - - t.Run("Success", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { - blockID := ids.New() - - expected := &access.ProtocolStateSnapshotResponse{ - SerializedSnapshot: make([]byte, 128), - } - _, err := rand.Read(expected.SerializedSnapshot) - assert.NoError(t, err) - - rpc.On("GetProtocolStateSnapshotByBlockID", ctx, mock.Anything).Return(expected, nil) - - res, err := c.GetProtocolStateSnapshotByBlockID(ctx, blockID) - assert.NoError(t, err) - assert.Equal(t, expected.SerializedSnapshot, res) - })) - - t.Run("Internal error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { - blockID := ids.New() - - rpc.On("GetProtocolStateSnapshotByBlockID", ctx, mock.Anything). - Return(nil, errInternal) - - _, err := c.GetProtocolStateSnapshotByBlockID(ctx, blockID) - assert.Error(t, err) - assert.Equal(t, codes.Internal, status.Code(err)) - })) -} - -func TestClient_GetProtocolStateSnapshotByHeight(t *testing.T) { - blockHeight := uint64(42) - - t.Run("Success", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { - expected := &access.ProtocolStateSnapshotResponse{ - SerializedSnapshot: make([]byte, 128), - } - _, err := rand.Read(expected.SerializedSnapshot) - assert.NoError(t, err) - - rpc.On("GetProtocolStateSnapshotByHeight", ctx, mock.Anything).Return(expected, nil) - - res, err := c.GetProtocolStateSnapshotByHeight(ctx, blockHeight) - assert.NoError(t, err) - assert.Equal(t, expected.SerializedSnapshot, res) - })) - - t.Run("Internal error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { - rpc.On("GetProtocolStateSnapshotByHeight", ctx, mock.Anything). - Return(nil, errInternal) - - _, err := c.GetProtocolStateSnapshotByHeight(ctx, blockHeight) - assert.Error(t, err) - assert.Equal(t, codes.Internal, status.Code(err)) - })) -} - func TestClient_GetExecutionResultForBlockID(t *testing.T) { ids := test.IdentifierGenerator() t.Run("Success", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { From 7fa63b38bc7274adae3a1e2f353de28e41dcf3fb Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Sep 2024 11:51:26 +0300 Subject: [PATCH 3/7] Fixed imports --- access/grpc/grpc_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 12ebaf650..f3616633e 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -20,6 +20,11 @@ package grpc import ( "context" + "io" + "math/rand" + "sync" + "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -27,10 +32,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" - "io" - "math/rand" - "sync" - "testing" "github.com/onflow/cadence" jsoncdc "github.com/onflow/cadence/encoding/json" From e6b648a5e90e0edb56e235896d0a463fd65694a3 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Sep 2024 17:43:39 +0300 Subject: [PATCH 4/7] Renamed test name --- access/grpc/grpc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index f3616633e..7fe898e6a 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1860,7 +1860,7 @@ func TestClient_SubscribeEvents(t *testing.T) { })) } -func Test_SubscribeBlockDigest(t *testing.T) { +func TestClient_SubscribeBlockDigest(t *testing.T) { blockHeaders := test.BlockHeaderGenerator() generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse { From c0e017009990c9073380cd966654223c0414cfb0 Mon Sep 17 00:00:00 2001 From: Andrii Diachuk Date: Thu, 26 Sep 2024 10:22:52 +0300 Subject: [PATCH 5/7] fixed error msg Co-authored-by: Janez Podhostnik <67895329+janezpodhostnik@users.noreply.github.com> --- access/grpc/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index be6d222b1..e31c46966 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1100,7 +1100,7 @@ func receiveBlockDigestFromClient[Client interface { return } - sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + sendErr(fmt.Errorf("error receiving blockDigest: %w", err)) return } From 42322769820f208a63576e8ebba215fbc54d1610 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 9 Oct 2024 15:07:18 +0300 Subject: [PATCH 6/7] return error if block status is unknown --- access/grpc/grpc.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index d3c77b2f0..a80d516b3 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1303,9 +1303,14 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{ StartBlockId: startBlockID.Bytes(), - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...) From 3a6992c3ec39cfe780b88f8bc9d1a1c8123e4dac Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 9 Oct 2024 15:10:04 +0300 Subject: [PATCH 7/7] handle block status in tests --- access/grpc/grpc.go | 14 ++++++++++++-- access/grpc/grpc_test.go | 8 ++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 1d0aed61f..fea0d35bf 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1408,9 +1408,14 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockDigestsFromStartHeightRequest{ StartBlockHeight: startHeight, - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...) @@ -1435,8 +1440,13 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockDigestsFromLatestRequest{ - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 975a0d4f2..09e5534a3 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2571,7 +2571,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything). Return(stream, nil) - blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusSealed) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2602,7 +2602,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { Return(stream, nil) startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId) - blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusSealed) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2632,7 +2632,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). Return(stream, nil) - blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2660,7 +2660,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). Return(stream, nil) - blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed) require.NoError(t, err) wg := sync.WaitGroup{}