Skip to content

Commit

Permalink
Merge branch 'master' into illia-malachyn/747-subscribe-account-statu…
Browse files Browse the repository at this point in the history
…tes-endpoint
  • Loading branch information
illia-malachyn committed Oct 15, 2024
2 parents b2a3948 + 853e844 commit bbc5240
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 0 deletions.
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,29 @@ func (c *Client) SubscribeBlocksFromLatest(
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockHeadersFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksHeadersFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockHeader, <-chan error, error) {
return c.grpc.SubscribeBlockHeadersFromLatest(ctx, blockStatus)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
Expand Down
148 changes: 148 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,23 @@ func (c *BaseClient) GetNodeVersionInfo(ctx context.Context, opts ...grpc.CallOp
return nil, newRPCError(err)
}

var compRange *flow.CompatibleRange
info := res.GetInfo()
if info != nil {
compRange = &flow.CompatibleRange{
StartHeight: info.CompatibleRange.GetStartHeight(),
EndHeight: info.CompatibleRange.GetEndHeight(),
}
}

return &flow.NodeVersionInfo{
Semver: info.Semver,
Commit: info.Commit,
SporkId: flow.BytesToID(info.SporkId),
ProtocolVersion: info.ProtocolVersion,
SporkRootBlockHeight: info.SporkRootBlockHeight,
NodeRootBlockHeight: info.NodeRootBlockHeight,
CompatibleRange: compRange,
}, nil
}

Expand Down Expand Up @@ -1369,6 +1378,145 @@ func receiveBlocksFromClient[Client interface {
}
}

func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockHeadersFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockHeadersFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromLatestRequest{
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func receiveBlockHeadersFromClient[Client interface {
Recv() (*access.SubscribeBlockHeadersResponse, error)
}](
ctx context.Context,
client Client,
blockHeadersChan chan<- flow.BlockHeader,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next blockHeader response
blockHeaderResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving blockHeader: %w", err))
return
}

blockHeader, err := convert.MessageToBlockHeader(blockHeaderResponse.GetHeader())
if err != nil {
sendErr(fmt.Errorf("error converting message to block header: %w", err))
return
}

select {
case <-ctx.Done():
return
case blockHeadersChan <- blockHeader:
}
}
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
Expand Down
Loading

0 comments on commit bbc5240

Please sign in to comment.