Skip to content

Commit

Permalink
Merge pull request #762 from The-K-R-O-K/illia-malachyn/747-subscribe…
Browse files Browse the repository at this point in the history
…-account-statutes-endpoint

Add subscribe account statuses endpoint
  • Loading branch information
peterargue authored Oct 16, 2024
2 parents 8b37d47 + f394c03 commit 7eb6328
Show file tree
Hide file tree
Showing 5 changed files with 486 additions and 0 deletions.
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
}
return subsConf
}

func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
38 changes: 38 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/onflow/flow/protobuf/go/flow/executiondata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) {
}, nil
}

func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) {
if m == nil {
return flow.AccountStatus{}, ErrEmptyMessage
}

results, err := MessageToAccountStatusResults(m.GetResults())
if err != nil {
return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err)
}

return flow.AccountStatus{
BlockID: MessageToIdentifier(m.GetBlockId()),
BlockHeight: m.GetBlockHeight(),
MessageIndex: m.GetMessageIndex(),
Results: results,
}, nil
}

func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) {
results := make([]*flow.AccountStatusResult, len(m))
var emptyOptions []jsoncdc.Option

for i, r := range m {
events, err := MessagesToEvents(r.GetEvents(), emptyOptions)
if err != nil {
return nil, fmt.Errorf("error converting events: %w", err)
}

results[i] = &flow.AccountStatusResult{
Address: flow.BytesToAddress(r.GetAddress()),
Events: events,
}
}

return results, nil
}

func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey {
return &entities.AccountKey{
Index: uint32(a.Index),
Expand Down
142 changes: 142 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,148 @@ func receiveBlockHeadersFromClient[Client interface {
}
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func receiveAccountStatusesFromStream[Stream interface {
Recv() (*executiondata.SubscribeAccountStatusesResponse, error)
}](
ctx context.Context,
stream Stream,
accountStatutesChan chan<- flow.AccountStatus,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

var nextExpectedMsgIndex uint64
for {
accountStatusResponse, err := stream.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving account status: %w", err))
return
}

accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse)
if err != nil {
sendErr(fmt.Errorf("error converting message to account status: %w", err))
return
}

if accountStatus.MessageIndex != nextExpectedMsgIndex {
sendErr(fmt.Errorf("message received out of order"))
return
}
nextExpectedMsgIndex = accountStatus.MessageIndex + 1

select {
case <-ctx.Done():
return
case accountStatutesChan <- accountStatus:
}
}
}

func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
Expand Down
Loading

0 comments on commit 7eb6328

Please sign in to comment.