Skip to content

Commit

Permalink
Merge pull request #758 from The-K-R-O-K/illia-malachyn/746-subscribe…
Browse files Browse the repository at this point in the history
…-blocks-endpoinds

Add subscribe blocks endpoints
  • Loading branch information
franklywatson authored Oct 1, 2024
2 parents 195b667 + ec522cd commit bd1d074
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 0 deletions.
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
Expand Down
11 changes: 11 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
return entities.BlockStatus_BLOCK_FINALIZED
case flow.BlockStatusSealed:
return entities.BlockStatus_BLOCK_SEALED
default:
return entities.BlockStatus_BLOCK_UNKNOWN
}
}

func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) {
switch encodingVersion {
case flow.EventEncodingVersionCCF:
Expand Down
141 changes: 141 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ package grpc

import (
"context"
"errors"
"fmt"
"io"

"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -1155,3 +1157,142 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlocksFromLatestRequest{
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlocksFromClient[Client interface {
Recv() (*access.SubscribeBlocksResponse, error)
}](
ctx context.Context,
client Client,
blocksChan chan<- flow.Block,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next block response
blockResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving block: %w", err))
return
}

block, err := convert.MessageToBlock(blockResponse.GetBlock())
if err != nil {
sendErr(fmt.Errorf("error converting message to block: %w", err))
return
}

select {
case <-ctx.Done():
return
case blocksChan <- block:
}
}
}
174 changes: 174 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,3 +2207,177 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR

return m.responses[m.offset], nil
}

func TestClient_SubscribeBlocks(t *testing.T) {
blocks := test.BlockGenerator()

generateBlockResponses := func(count uint64) []*access.SubscribeBlocksResponse {
var resBlocks []*access.SubscribeBlocksResponse

for i := uint64(0); i < count; i++ {
b, err := convert.BlockToMessage(*blocks.New())
require.NoError(t, err)

resBlocks = append(resBlocks, &access.SubscribeBlocksResponse{
Block: b,
})
}

return resBlocks
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromStartHeight(ctx, startHeight, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].Block.Id)
blockCh, errCh, err := c.SubscribeBlocksFromStartBlockID(ctx, startBlockID, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlocks(t, blockCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockClientStream[SubscribeBlocksResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlocksResponse
}

func (s *mockBlockClientStream[SubscribeBlocksResponse]) Recv() (*SubscribeBlocksResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) {
defer done()
for range blocksCh {
require.FailNow(t, "should not receive blocks")
}
}

0 comments on commit bd1d074

Please sign in to comment.