Skip to content

Commit

Permalink
support cloning connections
Browse files Browse the repository at this point in the history
  • Loading branch information
erokhinav committed Dec 5, 2024
1 parent 3d3a044 commit c227da0
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions liteclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ type Client struct {
// if such a method makes several calls to a lite server,
// the total time is bounded by the timeout.
timeout time.Duration
connection *Connection
connections []*Connection
nextConn int
connMutex sync.Mutex
queries map[queryID]chan []byte
queriesMutex sync.Mutex
}
Expand All @@ -44,22 +46,47 @@ func OptionTimeout(t time.Duration) Options {
}
}

func OptionConnectionsNum(n int) Options {
return func(c *Client) {
if n < 1 {
n = 1
}
connFirst := c.connections[0]
for i := 1; i < n-1; i++ {
conn, err := NewConnection(context.Background(), connFirst.peerPublicKey, connFirst.host)
if err != nil {
slog.Warn("liteclient.reader() error", "err", err)
continue
}
c.connections = append(c.connections, conn)
}
}
}

func NewClient(c *Connection, opts ...Options) *Client {
c2 := &Client{
timeout: defaultTimeout,
connection: c,
queries: make(map[queryID]chan []byte),
timeout: defaultTimeout,
connections: []*Connection{c},
queries: make(map[queryID]chan []byte),
}
for _, f := range opts {
f(c2)
}
go c2.reader()

for _, conn := range c2.connections {
go c2.reader(conn)
}
return c2
}

// IsOK returns true if there is no problems with this client and its underlying connection to a lite server.
func (c *Client) IsOK() bool {
return c.connection.Status() == Connected
for _, conn := range c.connections {
if conn.Status() == Connected {
return true
}
}
return false
}

// Request sends q as query in adnl.message.query and receives answer from adnl.message.answer
Expand All @@ -83,7 +110,12 @@ func (c *Client) Request(ctx context.Context, q []byte) ([]byte, error) {
resp := c.registerCallback(id)
defer c.unregisterCallback(id)

err = c.connection.Send(p)
c.connMutex.Lock()
conn := c.connections[c.nextConn]
c.nextConn = (c.nextConn + 1) % len(c.connections)
c.connMutex.Unlock()

err = conn.Send(p)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,8 +174,8 @@ func decodeLength(b []byte) (int, []byte, error) {
return int(i) >> 8, b[4:], nil
}

func (c *Client) reader() {
for p := range c.connection.Responses() {
func (c *Client) reader(conn *Connection) {
for p := range conn.Responses() {
if p.MagicType() != magicADNLAnswer {
continue
}
Expand Down Expand Up @@ -225,7 +257,11 @@ func (c *Client) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeout
}

func (c *Client) AverageRoundTrip() time.Duration {
return c.connection.AverageRoundTrip()
var total time.Duration
for _, conn := range c.connections {
total += conn.AverageRoundTrip()
}
return total / time.Duration(len(c.connections))
}

func (c *Client) WaitMasterchainBlock(ctx context.Context, seqno uint32, timeout uint32) (res LiteServerBlockHeaderC, err error) {
Expand Down

0 comments on commit c227da0

Please sign in to comment.