diff --git a/liteapi/client.go b/liteapi/client.go index f6cc92e4..5f704bfa 100644 --- a/liteapi/client.go +++ b/liteapi/client.go @@ -24,7 +24,7 @@ import ( const ( LiteServerEnvName = "LITE_SERVERS" - defaultMaxConnectionsNumber = 3 + defaultMaxConnectionsNumber = 1 ) var ( @@ -32,11 +32,26 @@ var ( ErrAccountNotFound = errors.New("account not found") ) +// Client provides a convenient way to interact with TON blockchain. +// +// By default, it uses a single connection to a lite server. +// But internally, it makes use of a failover pool, +// so it is possible to force it to use multiple connections. Take a look at WithMaxConnectionsNumber() option. +// +// When the client is configured with several connections, +// two different lite servers can be used for two consequent requests. +// Because a blockchain is inherently a distributed system, +// this could lead to some inconsistencies. +// For example, +// 1. you obtain a master head with GetMasterchainInfo, +// 2. you get an account state with GetAccountState, +// the account state can be obtained from a block that is earlier in the blockchain than the master head you obtained at step 1. +// To avoid this, you can use WithBlock() method to specify a target block for all requests. type Client struct { - pool *pool.FailoverPool - targetBlockID *tongo.BlockIDExt - masterchainLastBlockCache *liteclient.TonNodeBlockIdExtC - masterchainLastBlockUpdateTime time.Time + pool *pool.FailoverPool + + mu sync.RWMutex + targetBlockID *tongo.BlockIDExt } // Options holds parameters to configure a lite api instance. @@ -167,69 +182,48 @@ func NewClient(opts ...Option) (*Client, error) { pool: pool.NewFailoverPool(liteclients), } go client.pool.Run(context.TODO()) - go client.refreshMasterchainTask() return &client, nil } -func (c *Client) targetBlock(ctx context.Context) (tongo.BlockIDExt, error) { - if c.targetBlockID != nil { - return *c.targetBlockID, nil - } - if time.Since(c.masterchainLastBlockUpdateTime) < 20*time.Second && c.masterchainLastBlockCache != nil { - return c.masterchainLastBlockCache.ToBlockIdExt(), nil - } - r, err := c.pool.BestMasterchainServer().LiteServerGetMasterchainInfo(context.TODO()) - if err != nil { - return tongo.BlockIDExt{}, err - } - return r.Last.ToBlockIdExt(), nil -} - -func (c *Client) getlastBlock(ctx context.Context) (liteclient.TonNodeBlockIdExtC, error) { - info, err := c.pool.BestMasterchainServer().LiteServerGetMasterchainInfo(ctx) - if err != nil { - return liteclient.TonNodeBlockIdExtC{}, err - } - return info.Last, nil -} - -func (c *Client) refreshMasterchainTask() { - for { - time.Sleep(time.Second) //todo: switch to wait function - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - block, err := c.getlastBlock(ctx) - cancel() - if err != nil { - continue - } - c.masterchainLastBlockCache = &block +func (c *Client) targetBlockOr(blockID tongo.BlockIDExt) tongo.BlockIDExt { + c.mu.RLock() + defer c.mu.RUnlock() + if c.targetBlockID == nil { + return blockID } + return *c.targetBlockID } func (c *Client) WithBlock(block tongo.BlockIDExt) *Client { return &Client{ - pool: c.pool, - targetBlockID: &block, - masterchainLastBlockCache: c.masterchainLastBlockCache, - masterchainLastBlockUpdateTime: c.masterchainLastBlockUpdateTime, + pool: c.pool, + targetBlockID: &block, } } func (c *Client) GetMasterchainInfo(ctx context.Context) (liteclient.LiteServerMasterchainInfoC, error) { - return c.pool.BestMasterchainServer().LiteServerGetMasterchainInfo(ctx) + return c.pool.BestMasterchainInfoClient().LiteServerGetMasterchainInfo(ctx) } func (c *Client) GetMasterchainInfoExt(ctx context.Context, mode uint32) (liteclient.LiteServerMasterchainInfoExtC, error) { - return c.pool.BestMasterchainServer().LiteServerGetMasterchainInfoExt(ctx, liteclient.LiteServerGetMasterchainInfoExtRequest{Mode: mode}) + return c.pool.BestMasterchainInfoClient().LiteServerGetMasterchainInfoExt(ctx, liteclient.LiteServerGetMasterchainInfoExtRequest{Mode: mode}) } func (c *Client) GetTime(ctx context.Context) (uint32, error) { - res, err := c.pool.BestMasterchainServer().LiteServerGetTime(ctx) + client, _, err := c.pool.BestMasterchainClient(ctx) + if err != nil { + return 0, err + } + res, err := client.LiteServerGetTime(ctx) return res.Now, err } func (c *Client) GetVersion(ctx context.Context) (liteclient.LiteServerVersionC, error) { - return c.pool.BestMasterchainServer().LiteServerGetVersion(ctx) + client, _, err := c.pool.BestMasterchainClient(ctx) + if err != nil { + return liteclient.LiteServerVersionC{}, err + } + return client.LiteServerGetVersion(ctx) } func (c *Client) GetBlock(ctx context.Context, blockID tongo.BlockIDExt) (tlb.Block, error) { @@ -253,12 +247,11 @@ func (c *Client) GetBlock(ctx context.Context, blockID tongo.BlockIDExt) (tlb.Bl } func (c *Client) GetBlockRaw(ctx context.Context, blockID tongo.BlockIDExt) (liteclient.LiteServerBlockDataC, error) { - server, err := c.pool.BestServerByBlockID(blockID.BlockID) + client, err := c.pool.BestClientByBlockID(ctx, blockID.BlockID) if err != nil { return liteclient.LiteServerBlockDataC{}, err } - - res, err := server.LiteServerGetBlock(ctx, liteclient.LiteServerGetBlockRequest{liteclient.BlockIDExt(blockID)}) + res, err := client.LiteServerGetBlock(ctx, liteclient.LiteServerGetBlockRequest{liteclient.BlockIDExt(blockID)}) if err != nil { return liteclient.LiteServerBlockDataC{}, err } @@ -275,11 +268,11 @@ func (c *Client) GetState(ctx context.Context, blockID tongo.BlockIDExt) ([]byte } func (c *Client) GetStateRaw(ctx context.Context, blockID tongo.BlockIDExt) (liteclient.LiteServerBlockStateC, error) { - server, err := c.pool.BestServerByBlockID(blockID.BlockID) + client, err := c.pool.BestClientByBlockID(ctx, blockID.BlockID) if err != nil { return liteclient.LiteServerBlockStateC{}, err } - res, err := server.LiteServerGetState(ctx, liteclient.LiteServerGetStateRequest{Id: liteclient.BlockIDExt(blockID)}) + res, err := client.LiteServerGetState(ctx, liteclient.LiteServerGetStateRequest{Id: liteclient.BlockIDExt(blockID)}) if err != nil { return liteclient.LiteServerBlockStateC{}, err } @@ -296,11 +289,11 @@ func (c *Client) GetBlockHeader(ctx context.Context, blockID tongo.BlockIDExt, m } func (c *Client) GetBlockHeaderRaw(ctx context.Context, blockID tongo.BlockIDExt, mode uint32) (liteclient.LiteServerBlockHeaderC, error) { - server, err := c.pool.BestServerByBlockID(blockID.BlockID) + client, err := c.pool.BestClientByBlockID(ctx, blockID.BlockID) if err != nil { return liteclient.LiteServerBlockHeaderC{}, err } - res, err := server.LiteServerGetBlockHeader(ctx, liteclient.LiteServerGetBlockHeaderRequest{ + res, err := client.LiteServerGetBlockHeader(ctx, liteclient.LiteServerGetBlockHeaderRequest{ Id: liteclient.BlockIDExt(blockID), Mode: mode, }) @@ -311,11 +304,11 @@ func (c *Client) GetBlockHeaderRaw(ctx context.Context, blockID tongo.BlockIDExt } func (c *Client) LookupBlock(ctx context.Context, blockID tongo.BlockID, mode uint32, lt *uint64, utime *uint32) (tongo.BlockIDExt, tlb.BlockInfo, error) { - server, err := c.pool.BestServerByBlockID(blockID) + client, err := c.pool.BestClientByBlockID(ctx, blockID) if err != nil { return tongo.BlockIDExt{}, tlb.BlockInfo{}, err } - res, err := server.LiteServerLookupBlock(ctx, liteclient.LiteServerLookupBlockRequest{ + res, err := client.LiteServerLookupBlock(ctx, liteclient.LiteServerLookupBlockRequest{ Mode: mode, Id: liteclient.TonNodeBlockIdC{ Workchain: uint32(blockID.Workchain), @@ -354,7 +347,11 @@ func (c *Client) SendMessage(ctx context.Context, payload []byte) (uint32, error if err := VerifySendMessagePayload(payload); err != nil { return 0, err } - res, err := c.pool.BestMasterchainServer().LiteServerSendMessage(ctx, liteclient.LiteServerSendMessageRequest{Body: payload}) + client, _, err := c.pool.BestMasterchainClient(ctx) + if err != nil { + return 0, err + } + res, err := client.LiteServerSendMessage(ctx, liteclient.LiteServerSendMessageRequest{Body: payload}) return res.Status, err } @@ -368,22 +365,18 @@ func (c *Client) RunSmcMethodByID(ctx context.Context, accountID tongo.AccountID if err != nil { return 0, tlb.VmStack{}, err } - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID) if err != nil { return 0, tlb.VmStack{}, err } req := liteclient.LiteServerRunSmcMethodRequest{ Mode: 4, - Id: liteclient.BlockIDExt(id), + Id: liteclient.BlockIDExt(c.targetBlockOr(masterHead)), Account: liteclient.AccountID(accountID), MethodId: uint64(methodID), Params: b, } - server, err := c.pool.BestServerByAccountID(accountID) - if err != nil { - return 0, tlb.VmStack{}, err - } - res, err := server.LiteServerRunSmcMethod(ctx, req) + res, err := client.LiteServerRunSmcMethod(ctx, req) if err != nil { return 0, tlb.VmStack{}, err } @@ -436,17 +429,14 @@ func (c *Client) GetAccountState(ctx context.Context, accountID tongo.AccountID) } func (c *Client) GetAccountStateRaw(ctx context.Context, accountID tongo.AccountID) (liteclient.LiteServerAccountStateC, error) { - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID) if err != nil { return liteclient.LiteServerAccountStateC{}, err } - server, err := c.pool.BestServerByAccountID(accountID) - if err != nil { - return liteclient.LiteServerAccountStateC{}, err - } - res, err := server.LiteServerGetAccountState(ctx, liteclient.LiteServerGetAccountStateRequest{ + blockID := c.targetBlockOr(masterHead) + res, err := client.LiteServerGetAccountState(ctx, liteclient.LiteServerGetAccountStateRequest{ Account: liteclient.AccountID(accountID), - Id: liteclient.BlockIDExt(id), + Id: liteclient.BlockIDExt(blockID), }) if err != nil { return liteclient.LiteServerAccountStateC{}, err @@ -494,7 +484,11 @@ func (c *Client) GetShardInfo( } func (c *Client) GetShardInfoRaw(ctx context.Context, blockID tongo.BlockIDExt, workchain uint32, shard uint64, exact bool) (liteclient.LiteServerShardInfoC, error) { - res, err := c.pool.BestMasterchainServer().LiteServerGetShardInfo(ctx, liteclient.LiteServerGetShardInfoRequest{ + client, _, err := c.pool.BestMasterchainClient(ctx) + if err != nil { + return liteclient.LiteServerShardInfoC{}, err + } + res, err := client.LiteServerGetShardInfo(ctx, liteclient.LiteServerGetShardInfoRequest{ Id: liteclient.BlockIDExt(blockID), Workchain: workchain, Shard: shard, @@ -534,7 +528,11 @@ func (c *Client) GetAllShardsInfo(ctx context.Context, blockID tongo.BlockIDExt) } func (c *Client) GetAllShardsInfoRaw(ctx context.Context, blockID tongo.BlockIDExt) (liteclient.LiteServerAllShardsInfoC, error) { - res, err := c.pool.BestMasterchainServer().LiteServerGetAllShardsInfo(ctx, liteclient.LiteServerGetAllShardsInfoRequest{ + client, _, err := c.pool.BestMasterchainClient(ctx) + if err != nil { + return liteclient.LiteServerAllShardsInfoC{}, err + } + res, err := client.LiteServerGetAllShardsInfo(ctx, liteclient.LiteServerGetAllShardsInfoRequest{ Id: liteclient.BlockIDExt(blockID)}) if err != nil { return liteclient.LiteServerAllShardsInfoC{}, err @@ -548,11 +546,11 @@ func (c *Client) GetOneTransactionFromBlock( blockId tongo.BlockIDExt, lt uint64, ) (tongo.Transaction, error) { - server, err := c.pool.BestServerByAccountID(accountID) + client, _, err := c.pool.BestClientByAccountID(ctx, accountID) if err != nil { return tongo.Transaction{}, err } - r, err := server.LiteServerGetOneTransaction(ctx, liteclient.LiteServerGetOneTransactionRequest{ + r, err := client.LiteServerGetOneTransaction(ctx, liteclient.LiteServerGetOneTransactionRequest{ Id: liteclient.BlockIDExt(blockId), Account: liteclient.AccountID(accountID), Lt: lt, @@ -610,11 +608,11 @@ func (c *Client) GetTransactions( } func (c *Client) GetTransactionsRaw(ctx context.Context, count uint32, accountID tongo.AccountID, lt uint64, hash tongo.Bits256) (liteclient.LiteServerTransactionListC, error) { - server, err := c.pool.BestServerByAccountID(accountID) + client, _, err := c.pool.BestClientByAccountID(ctx, accountID) if err != nil { return liteclient.LiteServerTransactionListC{}, err } - res, err := server.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{ + res, err := client.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{ Count: count, Account: liteclient.AccountID(accountID), Lt: lt, @@ -673,11 +671,11 @@ func (c *Client) ListBlockTransactions( } func (c *Client) ListBlockTransactionsRaw(ctx context.Context, blockID tongo.BlockIDExt, mode, count uint32, after *liteclient.LiteServerTransactionId3C) (liteclient.LiteServerBlockTransactionsC, error) { - server, err := c.pool.BestServerByBlockID(blockID.BlockID) + client, err := c.pool.BestClientByBlockID(ctx, blockID.BlockID) if err != nil { return liteclient.LiteServerBlockTransactionsC{}, err } - res, err := server.LiteServerListBlockTransactions(ctx, liteclient.LiteServerListBlockTransactionsRequest{ + res, err := client.LiteServerListBlockTransactions(ctx, liteclient.LiteServerListBlockTransactionsRequest{ Id: liteclient.BlockIDExt(blockID), Mode: mode, Count: count, @@ -705,14 +703,14 @@ func (c *Client) GetBlockProof( func (c *Client) GetBlockProofRaw(ctx context.Context, knownBlock tongo.BlockIDExt, targetBlock *tongo.BlockIDExt) (liteclient.LiteServerPartialBlockProofC, error) { var ( err error - server *liteclient.Client + client *liteclient.Client mode uint32 = 0 ) if targetBlock != nil { - server, err = c.pool.BestServerByBlockID(targetBlock.BlockID) + client, err = c.pool.BestClientByBlockID(ctx, targetBlock.BlockID) mode = 1 } else { - server, err = c.pool.BestServerByBlockID(knownBlock.BlockID) + client, err = c.pool.BestClientByBlockID(ctx, knownBlock.BlockID) } if err != nil { return liteclient.LiteServerPartialBlockProofC{}, err @@ -722,7 +720,7 @@ func (c *Client) GetBlockProofRaw(ctx context.Context, knownBlock tongo.BlockIDE b := liteclient.BlockIDExt(*targetBlock) tb = &b } - res, err := server.LiteServerGetBlockProof(ctx, liteclient.LiteServerGetBlockProofRequest{ + res, err := client.LiteServerGetBlockProof(ctx, liteclient.LiteServerGetBlockProofRequest{ Mode: mode, KnownBlock: liteclient.BlockIDExt(knownBlock), TargetBlock: tb, @@ -743,13 +741,13 @@ func (c *Client) GetConfigAll(ctx context.Context, mode ConfigMode) (tlb.ConfigP } func (c *Client) GetConfigAllRaw(ctx context.Context, mode ConfigMode) (liteclient.LiteServerConfigInfoC, error) { - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestMasterchainClient(ctx) if err != nil { return liteclient.LiteServerConfigInfoC{}, err } - res, err := c.pool.BestMasterchainServer().LiteServerGetConfigAll(ctx, liteclient.LiteServerGetConfigAllRequest{ + res, err := client.LiteServerGetConfigAll(ctx, liteclient.LiteServerGetConfigAllRequest{ Mode: uint32(mode), - Id: liteclient.BlockIDExt(id), + Id: liteclient.BlockIDExt(c.targetBlockOr(masterHead)), }) if err != nil { return liteclient.LiteServerConfigInfoC{}, err @@ -758,13 +756,13 @@ func (c *Client) GetConfigAllRaw(ctx context.Context, mode ConfigMode) (liteclie } func (c *Client) GetConfigParams(ctx context.Context, mode ConfigMode, paramList []uint32) (tlb.ConfigParams, error) { - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestMasterchainClient(ctx) if err != nil { return tlb.ConfigParams{}, err } - r, err := c.pool.BestMasterchainServer().LiteServerGetConfigParams(ctx, liteclient.LiteServerGetConfigParamsRequest{ + r, err := client.LiteServerGetConfigParams(ctx, liteclient.LiteServerGetConfigParamsRequest{ Mode: uint32(mode), - Id: liteclient.BlockIDExt(id), + Id: liteclient.BlockIDExt(c.targetBlockOr(masterHead)), ParamList: paramList, }) if err != nil { @@ -800,7 +798,7 @@ func (c *Client) GetValidatorStats( startAfter *tongo.Bits256, modifiedAfter *uint32, ) (*tlb.McStateExtra, error) { - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestMasterchainClient(ctx) if err != nil { return nil, err } @@ -809,9 +807,9 @@ func (c *Client) GetValidatorStats( b := tl.Int256(*startAfter) sa = &b } - r, err := c.pool.BestMasterchainServer().LiteServerGetValidatorStats(ctx, liteclient.LiteServerGetValidatorStatsRequest{ + r, err := client.LiteServerGetValidatorStats(ctx, liteclient.LiteServerGetValidatorStatsRequest{ Mode: mode, - Id: liteclient.BlockIDExt(id), + Id: liteclient.BlockIDExt(c.targetBlockOr(masterHead)), Limit: limit, StartAfter: sa, ModifiedAfter: modifiedAfter, @@ -839,11 +837,7 @@ func (c *Client) GetValidatorStats( } func (c *Client) GetLibraries(ctx context.Context, libraryList []tongo.Bits256) (map[tongo.Bits256]*boc.Cell, error) { - id, err := c.targetBlock(ctx) - if err != nil { - return nil, err - } - server, err := c.pool.BestServerByBlockID(id.BlockID) + client, _, err := c.pool.BestMasterchainClient(ctx) if err != nil { return nil, err } @@ -851,7 +845,7 @@ func (c *Client) GetLibraries(ctx context.Context, libraryList []tongo.Bits256) for _, l := range libraryList { ll = append(ll, tl.Int256(l)) } - r, err := server.LiteServerGetLibraries(ctx, liteclient.LiteServerGetLibrariesRequest{ + r, err := client.LiteServerGetLibraries(ctx, liteclient.LiteServerGetLibrariesRequest{ LibraryList: ll, }) if err != nil { @@ -880,19 +874,22 @@ func (c *Client) GetShardBlockProof(ctx context.Context) (liteclient.LiteServerS } func (c *Client) GetShardBlockProofRaw(ctx context.Context) (liteclient.LiteServerShardBlockProofC, error) { - id, err := c.targetBlock(ctx) + client, masterHead, err := c.pool.BestMasterchainClient(ctx) if err != nil { return liteclient.LiteServerShardBlockProofC{}, err } - server, err := c.pool.BestServerByBlockID(id.BlockID) - if err != nil { - return liteclient.LiteServerShardBlockProofC{}, err - } - return server.LiteServerGetShardBlockProof(ctx, liteclient.LiteServerGetShardBlockProofRequest{ - Id: liteclient.BlockIDExt(id), + return client.LiteServerGetShardBlockProof(ctx, liteclient.LiteServerGetShardBlockProofRequest{ + Id: liteclient.BlockIDExt(c.targetBlockOr(masterHead)), }) } +// WaitMasterchainSeqno waits for a masterchain block with the given seqno. +// If any connection in the pool becomes aware of this seqno, the function returns. +// If the timeout is reached, the function returns an error. +func (c *Client) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeout time.Duration) error { + return c.pool.WaitMasterchainSeqno(ctx, seqno, timeout) +} + var configCache = make(map[string]*config.GlobalConfigurationFile) var configCacheMutex sync.RWMutex diff --git a/liteapi/client_test.go b/liteapi/client_test.go index 65afe2b8..9d5de5ce 100644 --- a/liteapi/client_test.go +++ b/liteapi/client_test.go @@ -189,17 +189,13 @@ func TestLookupBlock(t *testing.T) { } func TestGetOneTransaction(t *testing.T) { - tongoClient, err := NewClientWithDefaultMainnet() if err != nil { log.Fatalf("Unable to create tongo client: %v", err) } ctx := context.Background() - lastBlockID, err := tongoClient.getlastBlock(ctx) - if err != nil { - t.Fatal(err) - } - shards, err := tongoClient.GetAllShardsInfo(ctx, lastBlockID.ToBlockIdExt()) + info, err := tongoClient.GetMasterchainInfo(ctx) + shards, err := tongoClient.GetAllShardsInfo(ctx, info.Last.ToBlockIdExt()) if err != nil { t.Fatal(err) } diff --git a/liteapi/pool/connection.go b/liteapi/pool/connection.go index f212e915..2c8d0482 100644 --- a/liteapi/pool/connection.go +++ b/liteapi/pool/connection.go @@ -5,18 +5,30 @@ import ( "sync" "time" + "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/liteclient" ) type connection struct { - client *liteclient.Client - mu sync.RWMutex - masterSeqno uint32 + id int + client *liteclient.Client + + // masterHeadUpdatedCh is used to send a notification when a known master head is changed. + masterHeadUpdatedCh chan masterHeadUpdated + + mu sync.RWMutex + // masterHead is the latest known masterchain head. + masterHead tongo.BlockIDExt +} + +type masterHeadUpdated struct { + Head tongo.BlockIDExt + Conn *connection } func (c *connection) Run(ctx context.Context) { for { - var seqno uint32 + var head tongo.BlockIDExt for { res, err := c.client.LiteServerGetMasterchainInfo(ctx) if err != nil { @@ -24,12 +36,24 @@ func (c *connection) Run(ctx context.Context) { time.Sleep(1000 * time.Millisecond) continue } - seqno = res.Last.Seqno + head = res.Last.ToBlockIdExt() break } - c.setMasterSeqno(seqno) + c.SetMasterHead(head) for { - if err := c.client.WaitMasterchainSeqno(ctx, seqno+1, 15_000); err != nil { + if err := c.client.WaitMasterchainSeqno(ctx, head.Seqno+1, 15_000); err != nil { + // TODO: log error + time.Sleep(1000 * time.Millisecond) + // we want to request seqno again with LiteServerGetMasterchainInfo + // to avoid situation when this server has been offline for too long, + // and it doesn't contain a block with the latest known seqno anymore. + break + } + if ctx.Err() != nil { + return + } + res, err := c.client.LiteServerGetMasterchainInfo(ctx) + if err != nil { // TODO: log error time.Sleep(1000 * time.Millisecond) // we want to request seqno again with LiteServerGetMasterchainInfo @@ -37,11 +61,11 @@ func (c *connection) Run(ctx context.Context) { // and it doesn't contain a block with the latest known seqno anymore. break } - seqno += 1 if ctx.Err() != nil { return } - c.setMasterSeqno(seqno) + head = res.Last.ToBlockIdExt() + c.SetMasterHead(head) } } } @@ -51,18 +75,28 @@ func (c *connection) IsOK() bool { return c.client.IsOK() } +func (c *connection) ID() int { + return c.id +} + func (c *connection) Client() *liteclient.Client { return c.client } -func (c *connection) MasterSeqno() uint32 { +func (c *connection) MasterHead() tongo.BlockIDExt { c.mu.RLock() defer c.mu.RUnlock() - return c.masterSeqno + return c.masterHead } -func (c *connection) setMasterSeqno(seqno uint32) { +func (c *connection) SetMasterHead(head tongo.BlockIDExt) { c.mu.Lock() defer c.mu.Unlock() - c.masterSeqno = seqno + if head.Seqno > c.masterHead.Seqno { + c.masterHead = head + c.masterHeadUpdatedCh <- masterHeadUpdated{ + Head: head, + Conn: c, + } + } } diff --git a/liteapi/pool/connection_test.go b/liteapi/pool/connection_test.go index 2efc09b8..1ef71bc8 100644 --- a/liteapi/pool/connection_test.go +++ b/liteapi/pool/connection_test.go @@ -19,7 +19,8 @@ func Test_connection_Run(t *testing.T) { t.Fatalf("NewConnection() failed: %v", err) } conn := &connection{ - client: liteclient.NewClient(c), + client: liteclient.NewClient(c), + masterHeadUpdatedCh: make(chan masterHeadUpdated, 100), } go conn.Run(context.Background()) @@ -28,16 +29,16 @@ func Test_connection_Run(t *testing.T) { if err != nil { t.Fatalf("LiteServerGetMasterchainInfo() failed: %v", err) } - if res.Last.Seqno != conn.MasterSeqno() { - t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, conn.MasterSeqno()) + if res.Last.Seqno != conn.MasterHead().Seqno { + t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, conn.MasterHead()) } if err := conn.Client().WaitMasterchainSeqno(context.Background(), res.Last.Seqno+1, 15_000); err != nil { t.Fatalf("WaitMasterchainSeqno() failed: %v", err) } // give a few milliseconds to the connection's goroutine - time.Sleep(150 * time.Millisecond) + time.Sleep(1 * time.Second) - if res.Last.Seqno+1 != conn.MasterSeqno() { - t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, conn.MasterSeqno()) + if res.Last.Seqno+1 != conn.MasterHead().Seqno { + t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, conn.MasterHead()) } } diff --git a/liteapi/pool/failover_pool.go b/liteapi/pool/failover_pool.go index 5504e33b..7f07cba6 100644 --- a/liteapi/pool/failover_pool.go +++ b/liteapi/pool/failover_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -26,14 +27,20 @@ type FailoverPool struct { conns []conn updateBestInterval time.Duration - mu sync.RWMutex - bestConn conn + masterHeadUpdatedCh chan masterHeadUpdated + + mu sync.RWMutex + bestConn conn + waitListID uint64 + waitList map[uint64]chan tongo.BlockIDExt } // conn contains all methods needed by a pool. // used to implement tests. type conn interface { - MasterSeqno() uint32 + ID() int + MasterHead() tongo.BlockIDExt + SetMasterHead(tongo.BlockIDExt) IsOK() bool Client() *liteclient.Client Run(ctx context.Context) @@ -42,14 +49,25 @@ type conn interface { // NewFailoverPool returns a new instance of a failover pool. // The given list of clients is ordered by priority and starts with a connection with the highest priority. func NewFailoverPool(clients []*liteclient.Client) *FailoverPool { + if len(clients) == 0 { + panic("empty list of clients") + } conns := make([]conn, 0, len(clients)) - for _, cli := range clients { - conns = append(conns, &connection{client: cli}) + masterHeadUpdatedCh := make(chan masterHeadUpdated, 10) + + for connID, cli := range clients { + conns = append(conns, &connection{ + id: connID, + client: cli, + masterHeadUpdatedCh: masterHeadUpdatedCh, + }) } return &FailoverPool{ - conns: conns, - updateBestInterval: updateBestConnectionInterval, - bestConn: conns[0], + conns: conns, + updateBestInterval: updateBestConnectionInterval, + bestConn: conns[0], + waitList: map[uint64]chan tongo.BlockIDExt{}, + masterHeadUpdatedCh: masterHeadUpdatedCh, } } @@ -57,12 +75,17 @@ func (p *FailoverPool) Run(ctx context.Context) { for _, c := range p.conns { go c.Run(ctx) } + tickTock := time.NewTicker(p.updateBestInterval) + defer tickTock.Stop() + for { select { case <-ctx.Done(): return - case <-time.After(p.updateBestInterval): + case <-tickTock.C: p.updateBest() + case update := <-p.masterHeadUpdatedCh: + p.notifySubscribers(update) } } } @@ -71,7 +94,7 @@ func (p *FailoverPool) Run(ctx context.Context) { func (p *FailoverPool) updateBest() { var maxSeqno uint32 for _, c := range p.conns { - masterSeqno := c.MasterSeqno() + masterSeqno := c.MasterHead().Seqno if maxSeqno < masterSeqno { maxSeqno = masterSeqno } @@ -81,7 +104,7 @@ func (p *FailoverPool) updateBest() { if !c.IsOK() { continue } - if c.MasterSeqno()+1 >= maxSeqno { + if c.MasterHead().Seqno+1 >= maxSeqno { p.setBestConnection(c) return } @@ -95,24 +118,123 @@ func (p *FailoverPool) setBestConnection(conn conn) { } func (p *FailoverPool) bestConnection() conn { - p.mu.Lock() - defer p.mu.Unlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.bestConn } -func (p *FailoverPool) BestMasterchainServer() *liteclient.Client { - return p.bestConnection().Client() +type MasterchainInfoClient struct { + conn conn } -func (p *FailoverPool) BestServerByAccountID(tongo.AccountID) (*liteclient.Client, error) { - return p.BestMasterchainServer(), nil +func (s *MasterchainInfoClient) LiteServerGetMasterchainInfoExt(ctx context.Context, request liteclient.LiteServerGetMasterchainInfoExtRequest) (res liteclient.LiteServerMasterchainInfoExtC, err error) { + info, err := s.conn.Client().LiteServerGetMasterchainInfoExt(ctx, request) + if err != nil { + return liteclient.LiteServerMasterchainInfoExtC{}, err + } + s.conn.SetMasterHead(info.Last.ToBlockIdExt()) + return info, err } -func (p *FailoverPool) BestServerByBlockID(tongo.BlockID) (*liteclient.Client, error) { - return p.BestMasterchainServer(), nil +func (s *MasterchainInfoClient) LiteServerGetMasterchainInfo(ctx context.Context) (liteclient.LiteServerMasterchainInfoC, error) { + info, err := s.conn.Client().LiteServerGetMasterchainInfo(ctx) + if err != nil { + return liteclient.LiteServerMasterchainInfoC{}, err + } + s.conn.SetMasterHead(info.Last.ToBlockIdExt()) + return info, err +} + +func (p *FailoverPool) BestMasterchainInfoClient() *MasterchainInfoClient { + return &MasterchainInfoClient{ + conn: p.bestConnection(), + } +} + +// BestMasterchainClient returns a liteclient and its known masterchain head. +func (p *FailoverPool) BestMasterchainClient(ctx context.Context) (*liteclient.Client, tongo.BlockIDExt, error) { + bestConnection := p.bestConnection() + masterHead := bestConnection.MasterHead() + if masterHead.Seqno > 0 { + return bestConnection.Client(), bestConnection.MasterHead(), nil + } + // so this client is not initialized yet, + // let's wait for it to be initialized. + waitID, ch := p.subscribe(1) + defer p.unsubscribe(waitID) + + select { + case <-ctx.Done(): + return nil, tongo.BlockIDExt{}, ctx.Err() + case head := <-ch: + return bestConnection.Client(), head, nil + } +} + +// BestClientByAccountID returns a liteclient and its known masterchain head. +func (p *FailoverPool) BestClientByAccountID(ctx context.Context, accountID tongo.AccountID) (*liteclient.Client, tongo.BlockIDExt, error) { + return p.BestMasterchainClient(ctx) +} + +// BestClientByBlockID returns a liteclient and its known masterchain head. +func (p *FailoverPool) BestClientByBlockID(ctx context.Context, blockID tongo.BlockID) (*liteclient.Client, error) { + server, _, err := p.BestMasterchainClient(ctx) + return server, err } // ConnectionsNumber returns a number of connections in this pool. func (p *FailoverPool) ConnectionsNumber() int { return len(p.conns) } + +func (p *FailoverPool) notifySubscribers(update masterHeadUpdated) { + p.mu.RLock() + defer p.mu.RUnlock() + + if update.Conn.ID() != p.bestConn.ID() { + return + } + for _, ch := range p.waitList { + ch <- update.Head + } +} + +func (p *FailoverPool) subscribe(seqno uint32) (uint64, chan tongo.BlockIDExt) { + ch := make(chan tongo.BlockIDExt, 1) + + p.mu.Lock() + defer p.mu.Unlock() + + head := p.bestConn.MasterHead() + if head.Seqno >= seqno { + ch <- head + return 0, ch + } + p.waitListID++ + p.waitList[p.waitListID] = ch + return p.waitListID, ch +} + +func (p *FailoverPool) unsubscribe(waitID uint64) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.waitList, waitID) +} + +func (p *FailoverPool) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeout time.Duration) error { + waitID, ch := p.subscribe(seqno) + defer p.unsubscribe(waitID) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(timeout): + return fmt.Errorf("timeout") + case head := <-ch: + if head.Seqno >= seqno { + return nil + } + } + } +} diff --git a/liteapi/pool/failover_pool_test.go b/liteapi/pool/failover_pool_test.go index 0d55963d..132da9a8 100644 --- a/liteapi/pool/failover_pool_test.go +++ b/liteapi/pool/failover_pool_test.go @@ -3,7 +3,9 @@ package pool import ( "context" "testing" + "time" + "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/liteclient" ) @@ -13,6 +15,17 @@ type mockConn struct { isOK bool } +func (m *mockConn) ID() int { + return 0 +} + +func (m *mockConn) MasterHead() tongo.BlockIDExt { + return tongo.BlockIDExt{BlockID: tongo.BlockID{Seqno: m.seqno}} +} + +func (m *mockConn) SetMasterHead(ext tongo.BlockIDExt) { +} + func (m *mockConn) MasterSeqno() uint32 { return m.seqno } @@ -85,7 +98,8 @@ func TestFailoverPool_updateBest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &FailoverPool{ - conns: tt.conns, + conns: tt.conns, + updateBestInterval: time.Second, } ctx := context.Background() go p.Run(ctx)