diff --git a/liteapi/client.go b/liteapi/client.go index 97ee2c8c..df3b096d 100644 --- a/liteapi/client.go +++ b/liteapi/client.go @@ -67,6 +67,10 @@ type Client struct { // proofPolicy specifies a policy for proof checks. proofPolicy ProofPolicy + // archiveDetectionEnabled specifies whether + // the underlying connections pool maintains information about which nodes are archive nodes. + archiveDetectionEnabled bool + // mu protects targetBlockID. mu sync.RWMutex targetBlockID *ton.BlockIDExt @@ -82,6 +86,9 @@ type Options struct { InitCtx context.Context // ProofPolicy specifies a policy for proof checks. ProofPolicy ProofPolicy + // DetectArchiveNodes specifies if a liteapi connection to a node + // should detect if its node is an archive node. + DetectArchiveNodes bool } type Option func(o *Options) error @@ -118,6 +125,13 @@ func WithProofPolicy(policy ProofPolicy) Option { } } +func WithDetectArchiveNodes() Option { + return func(o *Options) error { + o.DetectArchiveNodes = true + return nil + } +} + // WithInitializationContext specifies a context to be used // when opening a new connection to lite servers during the initialization. func WithInitializationContext(ctx context.Context) Option { @@ -231,10 +245,11 @@ func NewClientWithDefaultTestnet() (*Client, error) { // Get options and create new lite client. If no options provided - download public config for mainnet from ton.org. func NewClient(opts ...Option) (*Client, error) { options := &Options{ - Timeout: 60 * time.Second, - MaxConnections: defaultMaxConnectionsNumber, - InitCtx: context.Background(), - ProofPolicy: ProofPolicyUnsafe, + Timeout: 60 * time.Second, + MaxConnections: defaultMaxConnectionsNumber, + InitCtx: context.Background(), + ProofPolicy: ProofPolicyUnsafe, + DetectArchiveNodes: false, } for _, o := range opts { if err := o(options); err != nil { @@ -268,10 +283,11 @@ func NewClient(opts ...Option) (*Client, error) { return nil, fmt.Errorf("all liteservers are unavailable") } client := Client{ - pool: pool.NewFailoverPool(liteclients), - proofPolicy: options.ProofPolicy, + pool: pool.NewFailoverPool(liteclients), + proofPolicy: options.ProofPolicy, + archiveDetectionEnabled: options.DetectArchiveNodes, } - go client.pool.Run(context.TODO()) + go client.pool.Run(context.TODO(), options.DetectArchiveNodes) return &client, nil } @@ -468,7 +484,7 @@ func (c *Client) RunSmcMethodByID(ctx context.Context, accountID ton.AccountID, if err != nil { return 0, tlb.VmStack{}, err } - client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID) + client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID, false) if err != nil { return 0, tlb.VmStack{}, err } @@ -532,7 +548,7 @@ func (c *Client) GetAccountState(ctx context.Context, accountID ton.AccountID) ( } func (c *Client) GetAccountStateRaw(ctx context.Context, accountID ton.AccountID) (liteclient.LiteServerAccountStateC, error) { - client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID) + client, masterHead, err := c.pool.BestClientByAccountID(ctx, accountID, false) if err != nil { return liteclient.LiteServerAccountStateC{}, err } @@ -649,7 +665,7 @@ func (c *Client) GetOneTransactionFromBlock( blockId ton.BlockIDExt, lt uint64, ) (ton.Transaction, error) { - client, _, err := c.pool.BestClientByAccountID(ctx, accountID) + client, _, err := c.pool.BestClientByAccountID(ctx, accountID, false) if err != nil { return ton.Transaction{}, err } @@ -711,20 +727,42 @@ func (c *Client) GetTransactions( } func (c *Client) GetTransactionsRaw(ctx context.Context, count uint32, accountID ton.AccountID, lt uint64, hash ton.Bits256) (liteclient.LiteServerTransactionListC, error) { - client, _, err := c.pool.BestClientByAccountID(ctx, accountID) - if err != nil { - return liteclient.LiteServerTransactionListC{}, err + archiveRequired := false + for { + client, _, err := c.pool.BestClientByAccountID(ctx, accountID, archiveRequired) + if err != nil { + return liteclient.LiteServerTransactionListC{}, err + } + res, err := client.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{ + Count: count, + Account: liteclient.AccountID(accountID), + Lt: lt, + Hash: tl.Int256(hash), + }) + if truncatedHistory(err) { + if !c.archiveDetectionEnabled { + return liteclient.LiteServerTransactionListC{}, err + } + if archiveRequired { + return liteclient.LiteServerTransactionListC{}, err + } + archiveRequired = true + continue + } + if err != nil { + return liteclient.LiteServerTransactionListC{}, err + } + return res, nil + } - res, err := client.LiteServerGetTransactions(ctx, liteclient.LiteServerGetTransactionsRequest{ - Count: count, - Account: liteclient.AccountID(accountID), - Lt: lt, - Hash: tl.Int256(hash), - }) - if err != nil { - return liteclient.LiteServerTransactionListC{}, err +} + +func truncatedHistory(err error) bool { + if err == nil { + return false } - return res, nil + e, ok := err.(liteclient.LiteServerErrorC) + return ok && int32(e.Code) == -400 } func (c *Client) GetLastTransactions(ctx context.Context, a ton.AccountID, limit int) ([]ton.Transaction, error) { diff --git a/liteapi/client_test.go b/liteapi/client_test.go index 04cb98b1..6eb36375 100644 --- a/liteapi/client_test.go +++ b/liteapi/client_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/tonkeeper/tongo/config" "github.com/tonkeeper/tongo/tlb" "github.com/tonkeeper/tongo/ton" "golang.org/x/exp/maps" @@ -37,6 +38,31 @@ func TestNewClient_WithMaxConnectionsNumber(t *testing.T) { } } +func TestGetTransactions_archive(t *testing.T) { + if len(os.Getenv("ARCHIVE_NODES_CONFIG")) == 0 { + t.Skip("ARCHIVE_NODES_CONFIG env is not set") + } + value := os.Getenv("ARCHIVE_NODES_CONFIG") + servers, err := config.ParseLiteServersEnvVar(value) + if err != nil { + t.Fatalf("ParseLiteServersEnvVar() failed: %v", err) + } + if len(servers) != 2 { + t.Fatalf("expected servers length: 2, got: %v", len(servers)) + } + tongoClient, err := NewClient(WithLiteServers(servers), WithDetectArchiveNodes()) + if err != nil { + log.Fatalf("Unable to create tongo client: %v", err) + } + time.Sleep(15 * time.Second) + accountId, _ := ton.AccountIDFromRaw("0:6ccd325a858c379693fae2bcaab1c2906831a4e10a6c3bb44ee8b615bca1d220") + txs, err := tongoClient.GetLastTransactions(context.Background(), accountId, 1000) + if err != nil { + t.Fatalf("Get transaction error: %v", err) + } + fmt.Printf("archive txs: %v\n", len(txs)) +} + func TestGetTransactions(t *testing.T) { if len(os.Getenv("LITE_SERVERS")) == 0 { t.Skip("LITE_SERVERS env is not set") diff --git a/liteapi/pool/connection.go b/liteapi/pool/connection.go index 0251c362..6144733c 100644 --- a/liteapi/pool/connection.go +++ b/liteapi/pool/connection.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -19,6 +20,7 @@ type connection struct { mu sync.RWMutex // masterHead is the latest known masterchain head. masterHead ton.BlockIDExt + isArchive bool } type masterHeadUpdated struct { @@ -26,7 +28,22 @@ type masterHeadUpdated struct { Conn *connection } -func (c *connection) Run(ctx context.Context) { +func (c *connection) Run(ctx context.Context, detectArchive bool) { + if detectArchive { + go func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + // TODO: retry several times on error + seqno, err := c.FindMinAvailableMasterchainSeqno(ctx) + if err != nil { + return + } + fmt.Printf("node %v, seqno : %v\n", c.id, seqno) + if seqno == 2 { + c.setArchive(true) + } + }() + } for { var head ton.BlockIDExt for { @@ -100,3 +117,50 @@ func (c *connection) SetMasterHead(head ton.BlockIDExt) { } } } + +func (c *connection) FindMinAvailableMasterchainSeqno(ctx context.Context) (uint32, error) { + info, err := c.client.LiteServerGetMasterchainInfo(ctx) + if err != nil { + return 0, err + } + max := info.Last.Seqno + min := uint32(2) + + next := min + workchain := -1 + for min+1 < max { + fmt.Printf("min: %v, max: %v, next: %v\n", min, max, next) + request := liteclient.LiteServerLookupBlockRequest{ + Mode: 1, + Id: liteclient.TonNodeBlockIdC{ + Workchain: uint32(workchain), + Shard: 0x8000000000000000, + Seqno: next, + }, + } + _, err := c.client.LiteServerLookupBlock(ctx, request) + if err != nil { + if e, ok := err.(liteclient.LiteServerErrorC); ok && e.Code == 651 { + min = next + 1 + next = (min + max) / 2 + continue + } + return 0, err + } + max = next - 1 + next = (min + max) / 2 + } + return min, nil +} + +func (c *connection) IsArchiveNode() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.isArchive +} + +func (c *connection) setArchive(archive bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.isArchive = archive +} diff --git a/liteapi/pool/connection_test.go b/liteapi/pool/connection_test.go index af8e8f6d..eeda4b4f 100644 --- a/liteapi/pool/connection_test.go +++ b/liteapi/pool/connection_test.go @@ -40,7 +40,7 @@ func Test_connection_Run(t *testing.T) { client: liteclient.NewClient(c), masterHeadUpdatedCh: make(chan masterHeadUpdated, 100), } - go conn.Run(context.Background()) + go conn.Run(context.Background(), false) time.Sleep(1 * time.Second) res, err := conn.Client().LiteServerGetMasterchainInfo(context.Background()) @@ -62,3 +62,39 @@ func Test_connection_Run(t *testing.T) { t.Fatalf("want seqno: %v, got: %v", res.Last.Seqno, newMasterHead.Seqno) } } + +func Test_connection_FindMinAvailableMasterchainSeqno(t *testing.T) { + tests := []struct { + name string + host string + key string + wantMinSeqno uint32 + }{ + { + name: "querying regular node", + host: "5.9.10.15:48014", + key: "3XO67K/qi+gu3T9v8G2hx1yNmWZhccL3O7SoosFo8G0=", + wantMinSeqno: 36283540, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pubkey, err := base64.StdEncoding.DecodeString(tt.key) + if err != nil { + panic(err) + } + c, err := liteclient.NewConnection(context.Background(), pubkey, tt.host) + conn := &connection{ + client: liteclient.NewClient(c), + masterHeadUpdatedCh: make(chan masterHeadUpdated, 100), + } + seqno, err := conn.FindMinAvailableMasterchainSeqno(context.Background()) + if err != nil { + t.Fatalf("FindMinAvailableMasterchainSeqno() failed: %v", err) + } + if seqno < tt.wantMinSeqno { + t.Fatalf("want seqno: %v, got: %v", tt.wantMinSeqno, seqno) + } + }) + } +} diff --git a/liteapi/pool/failover_pool.go b/liteapi/pool/failover_pool.go index 3847b697..d6462eec 100644 --- a/liteapi/pool/failover_pool.go +++ b/liteapi/pool/failover_pool.go @@ -43,7 +43,8 @@ type conn interface { SetMasterHead(ton.BlockIDExt) IsOK() bool Client() *liteclient.Client - Run(ctx context.Context) + Run(ctx context.Context, detectArchive bool) + IsArchiveNode() bool } // NewFailoverPool returns a new instance of a failover pool. @@ -71,9 +72,9 @@ func NewFailoverPool(clients []*liteclient.Client) *FailoverPool { } } -func (p *FailoverPool) Run(ctx context.Context) { +func (p *FailoverPool) Run(ctx context.Context, detectArchiveNodes bool) { for _, c := range p.conns { - go c.Run(ctx) + go c.Run(ctx, detectArchiveNodes) } tickTock := time.NewTicker(p.updateBestInterval) defer tickTock.Stop() @@ -170,9 +171,20 @@ func (p *FailoverPool) BestMasterchainClient(ctx context.Context) (*liteclient.C return bestConnection.Client(), head, nil } } +func (p *FailoverPool) BestArchiveClient(ctx context.Context) (*liteclient.Client, ton.BlockIDExt, error) { + for _, c := range p.conns { + if c.IsOK() && c.IsArchiveNode() { + return c.Client(), c.MasterHead(), nil + } + } + return nil, ton.BlockIDExt{}, fmt.Errorf("no archive nodes available") +} // BestClientByAccountID returns a liteclient and its known masterchain head. -func (p *FailoverPool) BestClientByAccountID(ctx context.Context, accountID ton.AccountID) (*liteclient.Client, ton.BlockIDExt, error) { +func (p *FailoverPool) BestClientByAccountID(ctx context.Context, accountID ton.AccountID, archiveRequired bool) (*liteclient.Client, ton.BlockIDExt, error) { + if archiveRequired { + return p.BestArchiveClient(ctx) + } return p.BestMasterchainClient(ctx) } diff --git a/liteapi/pool/failover_pool_test.go b/liteapi/pool/failover_pool_test.go index 79acbe3e..c1847750 100644 --- a/liteapi/pool/failover_pool_test.go +++ b/liteapi/pool/failover_pool_test.go @@ -15,6 +15,11 @@ type mockConn struct { isOK bool } +func (m *mockConn) IsArchiveNode() bool { + //TODO implement me + panic("implement me") +} + func (m *mockConn) ID() int { return 0 } @@ -38,7 +43,7 @@ func (m *mockConn) Client() *liteclient.Client { panic("implement me") } -func (m *mockConn) Run(ctx context.Context) { +func (m *mockConn) Run(ctx context.Context, detectArchiveNodes bool) { } var _ conn = &mockConn{} @@ -102,7 +107,7 @@ func TestFailoverPool_updateBest(t *testing.T) { updateBestInterval: time.Second, } ctx := context.Background() - go p.Run(ctx) + go p.Run(ctx, false) p.updateBest() c := p.bestConnection().(*mockConn) if tt.wantID != c.id {