From 887cdd3aa1e5343f58fbed7dfc3f28af5bdca11a Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Thu, 19 Sep 2024 13:09:18 +0300 Subject: [PATCH 1/5] Add SubscribeBlockHeaders* endpoints --- access/grpc/client.go | 23 +++++ access/grpc/convert/convert.go | 11 +++ access/grpc/grpc.go | 124 +++++++++++++++++++++++ access/grpc/grpc_test.go | 175 +++++++++++++++++++++++++++++++++ 4 files changed, 333 insertions(+) diff --git a/access/grpc/client.go b/access/grpc/client.go index 5ca1729aa..e0f4a1739 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -278,6 +278,29 @@ func (c *Client) SubscribeEventsByBlockHeight( return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval)) } +func (c *Client) SubscribeBlockHeadersFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockHeader, <-chan error, error) { + return c.grpc.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, blockStatus) +} + +func (c *Client) SubscribeBlockHeadersFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockHeader, <-chan error, error) { + return c.grpc.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus) +} + +func (c *Client) SubscribeBlocksHeadersFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockHeader, <-chan error, error) { + return c.grpc.SubscribeBlockHeadersFromLatest(ctx, blockStatus) +} + func (c *Client) Close() error { return c.grpc.Close() } diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index e52500a08..4c1f1c15e 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -192,6 +192,17 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { }, nil } +func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus { + switch blockStatus { + case flow.BlockStatusFinalized: + return entities.BlockStatus_BLOCK_FINALIZED + case flow.BlockStatusSealed: + return entities.BlockStatus_BLOCK_SEALED + default: + return entities.BlockStatus_BLOCK_UNKNOWN + } +} + func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) { switch encodingVersion { case flow.EventEncodingVersionCCF: diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index b70a55bed..c21773676 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -965,3 +965,127 @@ func (c *BaseClient) subscribeEvents( return sub, errChan, nil } + +func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockHeader, <-chan error, error) { + request := &access.SubscribeBlockHeadersFromStartBlockIDRequest{ + StartBlockId: startBlockID.Bytes(), + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartBlockID(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blockHeaderChan := make(chan flow.BlockHeader) + errChan := make(chan error) + + go func() { + defer close(blockHeaderChan) + defer close(errChan) + receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) + }() + + return blockHeaderChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockHeader, <-chan error, error) { + request := &access.SubscribeBlockHeadersFromStartHeightRequest{ + StartBlockHeight: startHeight, + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartHeight(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blockHeaderChan := make(chan flow.BlockHeader) + errChan := make(chan error) + + go func() { + defer close(blockHeaderChan) + defer close(errChan) + receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) + }() + + return blockHeaderChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockHeadersFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockHeader, <-chan error, error) { + request := &access.SubscribeBlockHeadersFromLatestRequest{ + BlockStatus: convert.BlockStatusToEntity(blockStatus), + } + + subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromLatest(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blockHeaderChan := make(chan flow.BlockHeader) + errChan := make(chan error) + + go func() { + defer close(blockHeaderChan) + defer close(errChan) + receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) + }() + + return blockHeaderChan, errChan, nil +} + +func receiveBlockHeadersFromClient[Client interface { + Recv() (*access.SubscribeBlockHeadersResponse, error) +}]( + ctx context.Context, + client Client, + blockHeadersChan chan<- flow.BlockHeader, + errChan chan<- error, +) { + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + for { + // Receive the next blockHeader response + blockHeaderResponse, err := client.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + + sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + return + } + + blockHeader, err := convert.MessageToBlockHeader(blockHeaderResponse.GetHeader()) + if err != nil { + sendErr(fmt.Errorf("error converting message to block header: %w", err)) + return + } + + select { + case <-ctx.Done(): + return + case blockHeadersChan <- blockHeader: + } + } +} diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 58a7939f6..23e391ae3 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1787,3 +1787,178 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR return m.responses[m.offset], nil } + +func TestClient_SubscribeBlockHeaders(t *testing.T) { + blockHeaders := test.BlockHeaderGenerator() + + generateBlockHeaderResponses := func(count uint64) []*access.SubscribeBlockHeadersResponse { + var resBlockHeaders []*access.SubscribeBlockHeadersResponse + + for i := uint64(0); i < count; i++ { + header, err := convert.BlockHeaderToMessage(blockHeaders.New()) + require.NoError(t, err) + + resBlockHeaders = append(resBlockHeaders, &access.SubscribeBlockHeadersResponse{ + Header: header, + }) + } + + return resBlockHeaders + } + + t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + startHeight := uint64(1) + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + ctx: ctx, + responses: generateBlockHeaderResponses(responseCount), + } + + rpc. + On("SubscribeBlockHeadersFromStartHeight", ctx, mock.Anything). + Return(stream, nil) + + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualHeader := <-blockHeadersCh + expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader()) + require.NoError(t, err) + require.Equal(t, expectedHeader, actualHeader) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + ctx: ctx, + responses: generateBlockHeaderResponses(responseCount), + } + + rpc. + On("SubscribeBlockHeadersFromStartBlockID", ctx, mock.Anything). + Return(stream, nil) + + startBlockID := convert.MessageToIdentifier(stream.responses[0].GetHeader().Id) + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualHeader := <-blockHeadersCh + expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader()) + require.NoError(t, err) + require.Equal(t, expectedHeader, actualHeader) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + ctx: ctx, + responses: generateBlockHeaderResponses(responseCount), + } + + rpc. + On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualHeader := <-blockHeadersCh + expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader()) + require.NoError(t, err) + require.Equal(t, expectedHeader, actualHeader) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusUnknown) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoBlockHeaders(t, blockHeadersCh, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) +} + +type mockBlockHeaderClientStream[SubscribeBlockHeadersResponse any] struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*SubscribeBlockHeadersResponse +} + +func (s *mockBlockHeaderClientStream[SubscribeBlockHeadersResponse]) Recv() (*SubscribeBlockHeadersResponse, error) { + if s.err != nil { + return nil, s.err + } + + if s.offset >= len(s.responses) { + <-s.ctx.Done() + return nil, io.EOF + } + defer func() { s.offset++ }() + + return s.responses[s.offset], nil +} + +func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan BlockHeader, done func()) { + defer done() + for range blockHeadersChan { + require.FailNow(t, "should not receive block headers") + } +} From d61758c5774fcb5c40fe542de61f0d6941853301 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Thu, 26 Sep 2024 11:54:27 +0300 Subject: [PATCH 2/5] Update GetNodeVersionInfo response object --- access/grpc/grpc.go | 6 ++++++ access/grpc/grpc_test.go | 3 +++ flow.go | 6 ++++++ 3 files changed, 15 insertions(+) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 9c547b25b..f292e5a29 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -165,6 +165,11 @@ func (c *BaseClient) GetNodeVersionInfo(ctx context.Context, opts ...grpc.CallOp } info := res.GetInfo() + compRange := flow.CompatibleRange{ + StartHeight: info.CompatibleRange.GetStartHeight(), + EndHeight: info.CompatibleRange.GetEndHeight(), + } + return &flow.NodeVersionInfo{ Semver: info.Semver, Commit: info.Commit, @@ -172,6 +177,7 @@ func (c *BaseClient) GetNodeVersionInfo(ctx context.Context, opts ...grpc.CallOp ProtocolVersion: info.ProtocolVersion, SporkRootBlockHeight: info.SporkRootBlockHeight, NodeRootBlockHeight: info.NodeRootBlockHeight, + CompatibleRange: compRange, }, nil } diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 1e9a077ff..5e963218a 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -156,6 +156,7 @@ func TestClient_GetNodeInfo(t *testing.T) { ver := uint64(1) spork := uint64(2) root := uint64(3) + compRange := &entities.CompatibleRange{StartHeight: 1, EndHeight: 10} response := &access.GetNodeVersionInfoResponse{ Info: &entities.NodeVersionInfo{ @@ -165,6 +166,7 @@ func TestClient_GetNodeInfo(t *testing.T) { ProtocolVersion: ver, SporkRootBlockHeight: spork, NodeRootBlockHeight: root, + CompatibleRange: compRange, }, } @@ -175,6 +177,7 @@ func TestClient_GetNodeInfo(t *testing.T) { ProtocolVersion: ver, SporkRootBlockHeight: spork, NodeRootBlockHeight: root, + CompatibleRange: flow.CompatibleRange{StartHeight: compRange.StartHeight, EndHeight: compRange.EndHeight}, } rpc.On("GetNodeVersionInfo", ctx, mock.Anything).Return(response, nil) diff --git a/flow.go b/flow.go index 29963e893..a8bdbf899 100644 --- a/flow.go +++ b/flow.go @@ -134,6 +134,12 @@ type NodeVersionInfo struct { ProtocolVersion uint64 SporkRootBlockHeight uint64 NodeRootBlockHeight uint64 + CompatibleRange CompatibleRange +} + +type CompatibleRange struct { + StartHeight uint64 + EndHeight uint64 } // entityHasher is a thread-safe hasher used to hash Flow entities. From 691b549b39d5be59d2f5f8120a789773b17a8501 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Tue, 1 Oct 2024 17:34:09 +0300 Subject: [PATCH 3/5] do not accept unknown block status --- access/grpc/grpc.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index ff2c80970..0b084e8e4 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -23,9 +23,11 @@ package grpc import ( "context" + "errors" "fmt" "io" + "github.com/onflow/flow/protobuf/go/flow/entities" "google.golang.org/grpc" "github.com/onflow/cadence" @@ -1136,9 +1138,14 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockHeader, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockHeadersFromStartBlockIDRequest{ StartBlockId: startBlockID.Bytes(), - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartBlockID(ctx, request, opts...) @@ -1164,9 +1171,14 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockHeader, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockHeadersFromStartHeightRequest{ StartBlockHeight: startHeight, - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartHeight(ctx, request, opts...) @@ -1191,8 +1203,13 @@ func (c *BaseClient) SubscribeBlockHeadersFromLatest( blockStatus flow.BlockStatus, opts ...grpc.CallOption, ) (<-chan flow.BlockHeader, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + request := &access.SubscribeBlockHeadersFromLatestRequest{ - BlockStatus: convert.BlockStatusToEntity(blockStatus), + BlockStatus: status, } subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromLatest(ctx, request, opts...) From 6c72a9a19cea7bcdf0229671c50a5790d10c7516 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Tue, 1 Oct 2024 17:35:50 +0300 Subject: [PATCH 4/5] fix tests --- access/grpc/grpc_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index e4e3e450a..53332d5fe 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2182,7 +2182,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { On("SubscribeBlockHeadersFromStartHeight", ctx, mock.Anything). Return(stream, nil) - blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown) + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, flow.BlockStatusFinalized) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2214,7 +2214,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { Return(stream, nil) startBlockID := convert.MessageToIdentifier(stream.responses[0].GetHeader().Id) - blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown) + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, flow.BlockStatusFinalized) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2245,7 +2245,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything). Return(stream, nil) - blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusUnknown) + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusFinalized) require.NoError(t, err) wg := sync.WaitGroup{} @@ -2274,7 +2274,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything). Return(stream, nil) - blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusUnknown) + blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusFinalized) require.NoError(t, err) wg := sync.WaitGroup{} From 2953c1f65f6937496c9aea3855c32e5b68d4e175 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Wed, 9 Oct 2024 15:02:54 +0300 Subject: [PATCH 5/5] compatible range might be nil --- access/grpc/grpc.go | 9 ++++++--- access/grpc/grpc_test.go | 2 +- flow.go | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index ed465a29f..c47257595 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -166,10 +166,13 @@ func (c *BaseClient) GetNodeVersionInfo(ctx context.Context, opts ...grpc.CallOp return nil, newRPCError(err) } + var compRange *flow.CompatibleRange info := res.GetInfo() - compRange := flow.CompatibleRange{ - StartHeight: info.CompatibleRange.GetStartHeight(), - EndHeight: info.CompatibleRange.GetEndHeight(), + if info != nil { + compRange = &flow.CompatibleRange{ + StartHeight: info.CompatibleRange.GetStartHeight(), + EndHeight: info.CompatibleRange.GetEndHeight(), + } } return &flow.NodeVersionInfo{ diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index becce0fd8..788223faf 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -177,7 +177,7 @@ func TestClient_GetNodeInfo(t *testing.T) { ProtocolVersion: ver, SporkRootBlockHeight: spork, NodeRootBlockHeight: root, - CompatibleRange: flow.CompatibleRange{StartHeight: compRange.StartHeight, EndHeight: compRange.EndHeight}, + CompatibleRange: &flow.CompatibleRange{StartHeight: compRange.StartHeight, EndHeight: compRange.EndHeight}, } rpc.On("GetNodeVersionInfo", ctx, mock.Anything).Return(response, nil) diff --git a/flow.go b/flow.go index a8bdbf899..1623dbe8b 100644 --- a/flow.go +++ b/flow.go @@ -134,7 +134,7 @@ type NodeVersionInfo struct { ProtocolVersion uint64 SporkRootBlockHeight uint64 NodeRootBlockHeight uint64 - CompatibleRange CompatibleRange + CompatibleRange *CompatibleRange } type CompatibleRange struct {