From 03db330b65d740dd8f766d1fb40fe6f9c78cb76c Mon Sep 17 00:00:00 2001 From: "aleksej.paschenko" Date: Thu, 16 Nov 2023 16:04:30 +0300 Subject: [PATCH] Support streaming of account txs filtered by operation --- streaming.go | 15 +++++++++++++-- websocket.go | 31 ++++++++++++++++++++++++------- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/streaming.go b/streaming.go index 7078014..b1b80e1 100644 --- a/streaming.go +++ b/streaming.go @@ -101,7 +101,11 @@ func NewStreamingAPI(opts ...StreamingOption) *StreamingAPI { // happening in the TON blockchain. type Websocket interface { // SubscribeToTransactions subscribes to notifications about new transactions for the specified accounts. - SubscribeToTransactions(accounts []string) error + // "operations" specifies a list of operations to receive. + // Each operation is a string containing either MsgOpName from https://github.com/tonkeeper/tongo/blob/master/abi/messages.go + // or a hex string representing an unsigned 32-bit integer. + // An example of "operations" is []string{"JettonBurn", "0x595f07bc"}. + SubscribeToTransactions(accounts []string, operations []string) error // UnsubscribeFromTransactions unsubscribes from notifications about new transactions for the specified accounts. UnsubscribeFromTransactions(accounts []string) error @@ -188,14 +192,21 @@ func (s *StreamingAPI) SubscribeToMempool(ctx context.Context, handler MempoolHa // SubscribeToTransactions opens a new sse connection to tonapi.io and subscribes to new transactions for the specified accounts. // When a new transaction is received, the handler will be called. // If accounts is empty, all traces for all accounts will be received. +// "operations" specifies a list of operations to receive. +// Each operation is a string containing either MsgOpName from https://github.com/tonkeeper/tongo/blob/master/abi/messages.go +// or a hex string representing an unsigned 32-bit integer. +// An example of "operations" is []string{"JettonBurn", "0x595f07bc"}. // This function returns an error when the underlying connection fails or context is canceled. // No automatic reconnection is performed. -func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []string, handler TransactionHandler) error { +func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []string, operations []string, handler TransactionHandler) error { accountsQueryStr := "ALL" if len(accounts) > 0 { accountsQueryStr = strings.Join(accounts, ",") } url := fmt.Sprintf("%s/v2/sse/accounts/transactions?accounts=%s", s.endpoint, accountsQueryStr) + if len(operations) > 0 { + url += "&operations=" + strings.Join(operations, ",") + } return s.subscribe(ctx, url, s.apiKey, func(data []byte) { eventData := TransactionEventData{} if err := json.Unmarshal(data, &eventData); err != nil { diff --git a/websocket.go b/websocket.go index c53e2fe..52c62db 100644 --- a/websocket.go +++ b/websocket.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "net/url" + "strings" "sync" "github.com/gorilla/websocket" @@ -32,49 +33,58 @@ type JsonRPCResponse struct { type websocketConnection struct { // mu protects the handler fields below. mu sync.Mutex + requestID uint64 conn *websocket.Conn mempoolHandler MempoolHandler transactionHandler TransactionHandler traceHandler TraceHandler } -func (w *websocketConnection) SubscribeToTransactions(accounts []string) error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_account", Params: accounts} +func (w *websocketConnection) SubscribeToTransactions(accounts []string, operations []string) error { + params := accounts + if len(operations) > 0 { + params = make([]string, 0, len(accounts)) + ops := fmt.Sprintf("operations=%s", strings.Join(operations, ",")) + for _, account := range accounts { + params = append(params, fmt.Sprintf("%s;%s", account, ops)) + } + } + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_account", Params: params} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) } func (w *websocketConnection) UnsubscribeFromTransactions(accounts []string) error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_account", Params: accounts} + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_account", Params: accounts} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) } func (w *websocketConnection) SubscribeToTraces(accounts []string) error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_trace", Params: accounts} + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_trace", Params: accounts} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) } func (w *websocketConnection) UnsubscribeFromTraces(accounts []string) error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_trace", Params: accounts} + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_trace", Params: accounts} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) } func (w *websocketConnection) SubscribeToMempool() error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_mempool"} + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_mempool"} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) } func (w *websocketConnection) UnsubscribeFromMempool() error { - request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_mempool"} + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_mempool"} w.mu.Lock() defer w.mu.Unlock() return w.conn.WriteJSON(request) @@ -176,6 +186,13 @@ func (w *websocketConnection) runJsonRPC(ctx context.Context, fn WebsocketConfig return g.Wait() } +func (w *websocketConnection) currentRequestID() uint64 { + w.mu.Lock() + defer w.mu.Unlock() + w.requestID++ + return w.requestID +} + func (w *websocketConnection) processHandler(fn func()) { w.mu.Lock() defer w.mu.Unlock()