Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/pool-to-work-with-a…
Browse files Browse the repository at this point in the history
…rchive-nodes'
  • Loading branch information
mr-tron committed Apr 10, 2024
2 parents d12cbd2 + 102f34a commit b1c716c
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 30 deletions.
82 changes: 60 additions & 22 deletions liteapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type Client struct {
// proofPolicy specifies a policy for proof checks.
proofPolicy ProofPolicy

// archiveDetectionEnabled specifies whether
// the underlying connections pool maintains information about which nodes are archive nodes.
archiveDetectionEnabled bool

// mu protects targetBlockID.
mu sync.RWMutex
targetBlockID *ton.BlockIDExt
Expand All @@ -82,6 +86,9 @@ type Options struct {
InitCtx context.Context
// ProofPolicy specifies a policy for proof checks.
ProofPolicy ProofPolicy
// DetectArchiveNodes specifies if a liteapi connection to a node
// should detect if its node is an archive node.
DetectArchiveNodes bool
}

type Option func(o *Options) error
Expand Down Expand Up @@ -118,6 +125,13 @@ func WithProofPolicy(policy ProofPolicy) Option {
}
}

func WithDetectArchiveNodes() Option {
return func(o *Options) error {
o.DetectArchiveNodes = true
return nil
}
}

// WithInitializationContext specifies a context to be used
// when opening a new connection to lite servers during the initialization.
func WithInitializationContext(ctx context.Context) Option {
Expand Down Expand Up @@ -231,10 +245,11 @@ func NewClientWithDefaultTestnet() (*Client, error) {
// Get options and create new lite client. If no options provided - download public config for mainnet from ton.org.
func NewClient(opts ...Option) (*Client, error) {
options := &Options{
Timeout: 60 * time.Second,
MaxConnections: defaultMaxConnectionsNumber,
InitCtx: context.Background(),
ProofPolicy: ProofPolicyUnsafe,
Timeout: 60 * time.Second,
MaxConnections: defaultMaxConnectionsNumber,
InitCtx: context.Background(),
ProofPolicy: ProofPolicyUnsafe,
DetectArchiveNodes: false,
}
for _, o := range opts {
if err := o(options); err != nil {
Expand Down Expand Up @@ -268,10 +283,11 @@ func NewClient(opts ...Option) (*Client, error) {
return nil, fmt.Errorf("all liteservers are unavailable")
}
client := Client{
pool: pool.NewFailoverPool(liteclients),
proofPolicy: options.ProofPolicy,
pool: pool.NewFailoverPool(liteclients),
proofPolicy: options.ProofPolicy,
archiveDetectionEnabled: options.DetectArchiveNodes,
}
go client.pool.Run(context.TODO())
go client.pool.Run(context.TODO(), options.DetectArchiveNodes)
return &client, nil
}

Expand Down Expand Up @@ -468,7 +484,7 @@ func (c *Client) RunSmcMethodByID(ctx context.Context, accountID ton.AccountID,
if err != nil {
return 0, tlb.VmStack{}, err
}
client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID)
client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID, false)
if err != nil {
return 0, tlb.VmStack{}, err
}
Expand Down Expand Up @@ -532,7 +548,7 @@ func (c *Client) GetAccountState(ctx context.Context, accountID ton.AccountID) (
}

func (c *Client) GetAccountStateRaw(ctx context.Context, accountID ton.AccountID) (liteclient.LiteServerAccountStateC, error) {
client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID)
client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID, false)
if err != nil {
return liteclient.LiteServerAccountStateC{}, err
}
Expand Down Expand Up @@ -649,7 +665,7 @@ func (c *Client) GetOneTransactionFromBlock(
blockId ton.BlockIDExt,
lt uint64,
) (ton.Transaction, error) {
client, _, err := c.pool.BestClientByAccountID(ctx, accountID)
client, _, err := c.pool.BestClientByAccountID(ctx, accountID, false)
if err != nil {
return ton.Transaction{}, err
}
Expand Down Expand Up @@ -711,20 +727,42 @@ func (c *Client) GetTransactions(
}

func (c *Client) GetTransactionsRaw(ctx context.Context, count uint32, accountID ton.AccountID, lt uint64, hash ton.Bits256) (liteclient.LiteServerTransactionListC, error) {
client, _, err := c.pool.BestClientByAccountID(ctx, accountID)
if err != nil {
return liteclient.LiteServerTransactionListC{}, err
archiveRequired := false
for {
client, _, err := c.pool.BestClientByAccountID(ctx, accountID, archiveRequired)
if err != nil {
return liteclient.LiteServerTransactionListC{}, err
}
res, err := client.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{
Count: count,
Account: liteclient.AccountID(accountID),
Lt: lt,
Hash: tl.Int256(hash),
})
if truncatedHistory(err) {
if !c.archiveDetectionEnabled {
return liteclient.LiteServerTransactionListC{}, err
}
if archiveRequired {
return liteclient.LiteServerTransactionListC{}, err
}
archiveRequired = true
continue
}
if err != nil {
return liteclient.LiteServerTransactionListC{}, err
}
return res, nil

}
res, err := client.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{
Count: count,
Account: liteclient.AccountID(accountID),
Lt: lt,
Hash: tl.Int256(hash),
})
if err != nil {
return liteclient.LiteServerTransactionListC{}, err
}

func truncatedHistory(err error) bool {
if err == nil {
return false
}
return res, nil
e, ok := err.(liteclient.LiteServerErrorC)
return ok && int32(e.Code) == -400
}

func (c *Client) GetLastTransactions(ctx context.Context, a ton.AccountID, limit int) ([]ton.Transaction, error) {
Expand Down
26 changes: 26 additions & 0 deletions liteapi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/tonkeeper/tongo/config"
"github.com/tonkeeper/tongo/tlb"
"github.com/tonkeeper/tongo/ton"
"golang.org/x/exp/maps"
Expand All @@ -37,6 +38,31 @@ func TestNewClient_WithMaxConnectionsNumber(t *testing.T) {
}
}

func TestGetTransactions_archive(t *testing.T) {
if len(os.Getenv("ARCHIVE_NODES_CONFIG")) == 0 {
t.Skip("ARCHIVE_NODES_CONFIG env is not set")
}
value := os.Getenv("ARCHIVE_NODES_CONFIG")
servers, err := config.ParseLiteServersEnvVar(value)
if err != nil {
t.Fatalf("ParseLiteServersEnvVar() failed: %v", err)
}
if len(servers) != 2 {
t.Fatalf("expected servers length: 2, got: %v", len(servers))
}
tongoClient, err := NewClient(WithLiteServers(servers), WithDetectArchiveNodes())
if err != nil {
log.Fatalf("Unable to create tongo client: %v", err)
}
time.Sleep(15 * time.Second)
accountId, _ := ton.AccountIDFromRaw("0:6ccd325a858c379693fae2bcaab1c2906831a4e10a6c3bb44ee8b615bca1d220")
txs, err := tongoClient.GetLastTransactions(context.Background(), accountId, 1000)
if err != nil {
t.Fatalf("Get transaction error: %v", err)
}
fmt.Printf("archive txs: %v\n", len(txs))
}

func TestGetTransactions(t *testing.T) {
if len(os.Getenv("LITE_SERVERS")) == 0 {
t.Skip("LITE_SERVERS env is not set")
Expand Down
63 changes: 62 additions & 1 deletion liteapi/pool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,29 @@ type connection struct {
mu sync.RWMutex
// masterHead is the latest known masterchain head.
masterHead ton.BlockIDExt
isArchive bool
}

type masterHeadUpdated struct {
Head ton.BlockIDExt
Conn *connection
}

func (c *connection) Run(ctx context.Context) {
func (c *connection) Run(ctx context.Context, detectArchive bool) {
if detectArchive {
go func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
// TODO: retry several times on error
seqno, err := c.FindMinAvailableMasterchainSeqno(ctx)
if err != nil {
return
}
if seqno == 2 {
c.setArchive(true)
}
}()
}
for {
var head ton.BlockIDExt
for {
Expand Down Expand Up @@ -100,3 +115,49 @@ func (c *connection) SetMasterHead(head ton.BlockIDExt) {
}
}
}

func (c *connection) FindMinAvailableMasterchainSeqno(ctx context.Context) (uint32, error) {
info, err := c.client.LiteServerGetMasterchainInfo(ctx)
if err != nil {
return 0, err
}
max := info.Last.Seqno
min := uint32(2)

next := min
workchain := -1
for min+1 < max {
request := liteclient.LiteServerLookupBlockRequest{
Mode: 1,
Id: liteclient.TonNodeBlockIdC{
Workchain: uint32(workchain),
Shard: 0x8000000000000000,
Seqno: next,
},
}
_, err := c.client.LiteServerLookupBlock(ctx, request)
if err != nil {
if e, ok := err.(liteclient.LiteServerErrorC); ok && e.Code == 651 {
min = next + 1
next = (min + max) / 2
continue
}
return 0, err
}
max = next - 1
next = (min + max) / 2
}
return min, nil
}

func (c *connection) IsArchiveNode() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.isArchive
}

func (c *connection) setArchive(archive bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.isArchive = archive
}
38 changes: 37 additions & 1 deletion liteapi/pool/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Test_connection_Run(t *testing.T) {
client: liteclient.NewClient(c),
masterHeadUpdatedCh: make(chan masterHeadUpdated, 100),
}
go conn.Run(context.Background())
go conn.Run(context.Background(), false)

time.Sleep(1 * time.Second)
res, err := conn.Client().LiteServerGetMasterchainInfo(context.Background())
Expand All @@ -62,3 +62,39 @@ func Test_connection_Run(t *testing.T) {
t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, newMasterHead.Seqno)
}
}

func Test_connection_FindMinAvailableMasterchainSeqno(t *testing.T) {
tests := []struct {
name string
host string
key string
wantMinSeqno uint32
}{
{
name: "querying regular node",
host: "5.9.10.15:48014",
key: "3XO67K/qi+gu3T9v8G2hx1yNmWZhccL3O7SoosFo8G0=",
wantMinSeqno: 36283540,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pubkey, err := base64.StdEncoding.DecodeString(tt.key)
if err != nil {
panic(err)
}
c, err := liteclient.NewConnection(context.Background(), pubkey, tt.host)
conn := &connection{
client: liteclient.NewClient(c),
masterHeadUpdatedCh: make(chan masterHeadUpdated, 100),
}
seqno, err := conn.FindMinAvailableMasterchainSeqno(context.Background())
if err != nil {
t.Fatalf("FindMinAvailableMasterchainSeqno() failed: %v", err)
}
if seqno < tt.wantMinSeqno {
t.Fatalf("want seqno: %v, got: %v", tt.wantMinSeqno, seqno)
}
})
}
}
20 changes: 16 additions & 4 deletions liteapi/pool/failover_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type conn interface {
SetMasterHead(ton.BlockIDExt)
IsOK() bool
Client() *liteclient.Client
Run(ctx context.Context)
Run(ctx context.Context, detectArchive bool)
IsArchiveNode() bool
}

// NewFailoverPool returns a new instance of a failover pool.
Expand Down Expand Up @@ -71,9 +72,9 @@ func NewFailoverPool(clients []*liteclient.Client) *FailoverPool {
}
}

func (p *FailoverPool) Run(ctx context.Context) {
func (p *FailoverPool) Run(ctx context.Context, detectArchiveNodes bool) {
for _, c := range p.conns {
go c.Run(ctx)
go c.Run(ctx, detectArchiveNodes)
}
tickTock := time.NewTicker(p.updateBestInterval)
defer tickTock.Stop()
Expand Down Expand Up @@ -170,9 +171,20 @@ func (p *FailoverPool) BestMasterchainClient(ctx context.Context) (*liteclient.C
return bestConnection.Client(), head, nil
}
}
func (p *FailoverPool) BestArchiveClient(ctx context.Context) (*liteclient.Client, ton.BlockIDExt, error) {
for _, c := range p.conns {
if c.IsOK() && c.IsArchiveNode() {
return c.Client(), c.MasterHead(), nil
}
}
return nil, ton.BlockIDExt{}, fmt.Errorf("no archive nodes available")
}

// BestClientByAccountID returns a liteclient and its known masterchain head.
func (p *FailoverPool) BestClientByAccountID(ctx context.Context, accountID ton.AccountID) (*liteclient.Client, ton.BlockIDExt, error) {
func (p *FailoverPool) BestClientByAccountID(ctx context.Context, accountID ton.AccountID, archiveRequired bool) (*liteclient.Client, ton.BlockIDExt, error) {
if archiveRequired {
return p.BestArchiveClient(ctx)
}
return p.BestMasterchainClient(ctx)
}

Expand Down
9 changes: 7 additions & 2 deletions liteapi/pool/failover_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type mockConn struct {
isOK bool
}

func (m *mockConn) IsArchiveNode() bool {
//TODO implement me
panic("implement me")
}

func (m *mockConn) ID() int {
return 0
}
Expand All @@ -38,7 +43,7 @@ func (m *mockConn) Client() *liteclient.Client {
panic("implement me")
}

func (m *mockConn) Run(ctx context.Context) {
func (m *mockConn) Run(ctx context.Context, detectArchiveNodes bool) {
}

var _ conn = &mockConn{}
Expand Down Expand Up @@ -102,7 +107,7 @@ func TestFailoverPool_updateBest(t *testing.T) {
updateBestInterval: time.Second,
}
ctx := context.Background()
go p.Run(ctx)
go p.Run(ctx, false)
p.updateBest()
c := p.bestConnection().(*mockConn)
if tt.wantID != c.id {
Expand Down

0 comments on commit b1c716c

Please sign in to comment.