Skip to content

Commit

Permalink
Merge pull request #2 from tonkeeper/streaming-api-filter-by-ops
Browse files Browse the repository at this point in the history
Support streaming of account txs filtered by operation
  • Loading branch information
aleksej-paschenko authored Nov 23, 2023
2 parents 39092d7 + 03db330 commit ec7e31c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
15 changes: 13 additions & 2 deletions streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ func NewStreamingAPI(opts ...StreamingOption) *StreamingAPI {
// happening in the TON blockchain.
type Websocket interface {
// SubscribeToTransactions subscribes to notifications about new transactions for the specified accounts.
SubscribeToTransactions(accounts []string) error
// "operations" specifies a list of operations to receive.
// Each operation is a string containing either MsgOpName from https://github.com/tonkeeper/tongo/blob/master/abi/messages.go
// or a hex string representing an unsigned 32-bit integer.
// An example of "operations" is []string{"JettonBurn", "0x595f07bc"}.
SubscribeToTransactions(accounts []string, operations []string) error
// UnsubscribeFromTransactions unsubscribes from notifications about new transactions for the specified accounts.
UnsubscribeFromTransactions(accounts []string) error

Expand Down Expand Up @@ -188,14 +192,21 @@ func (s *StreamingAPI) SubscribeToMempool(ctx context.Context, handler MempoolHa
// SubscribeToTransactions opens a new sse connection to tonapi.io and subscribes to new transactions for the specified accounts.
// When a new transaction is received, the handler will be called.
// If accounts is empty, all traces for all accounts will be received.
// "operations" specifies a list of operations to receive.
// Each operation is a string containing either MsgOpName from https://github.com/tonkeeper/tongo/blob/master/abi/messages.go
// or a hex string representing an unsigned 32-bit integer.
// An example of "operations" is []string{"JettonBurn", "0x595f07bc"}.
// This function returns an error when the underlying connection fails or context is canceled.
// No automatic reconnection is performed.
func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []string, handler TransactionHandler) error {
func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []string, operations []string, handler TransactionHandler) error {
accountsQueryStr := "ALL"
if len(accounts) > 0 {
accountsQueryStr = strings.Join(accounts, ",")
}
url := fmt.Sprintf("%s/v2/sse/accounts/transactions?accounts=%s", s.endpoint, accountsQueryStr)
if len(operations) > 0 {
url += "&operations=" + strings.Join(operations, ",")
}
return s.subscribe(ctx, url, s.apiKey, func(data []byte) {
eventData := TransactionEventData{}
if err := json.Unmarshal(data, &eventData); err != nil {
Expand Down
31 changes: 24 additions & 7 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"

"github.com/gorilla/websocket"
Expand All @@ -32,49 +33,58 @@ type JsonRPCResponse struct {
type websocketConnection struct {
// mu protects the handler fields below.
mu sync.Mutex
requestID uint64
conn *websocket.Conn
mempoolHandler MempoolHandler
transactionHandler TransactionHandler
traceHandler TraceHandler
}

func (w *websocketConnection) SubscribeToTransactions(accounts []string) error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_account", Params: accounts}
func (w *websocketConnection) SubscribeToTransactions(accounts []string, operations []string) error {
params := accounts
if len(operations) > 0 {
params = make([]string, 0, len(accounts))
ops := fmt.Sprintf("operations=%s", strings.Join(operations, ","))
for _, account := range accounts {
params = append(params, fmt.Sprintf("%s;%s", account, ops))
}
}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_account", Params: params}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) UnsubscribeFromTransactions(accounts []string) error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_account", Params: accounts}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_account", Params: accounts}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) SubscribeToTraces(accounts []string) error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_trace", Params: accounts}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_trace", Params: accounts}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) UnsubscribeFromTraces(accounts []string) error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_trace", Params: accounts}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_trace", Params: accounts}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) SubscribeToMempool() error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "subscribe_mempool"}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_mempool"}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) UnsubscribeFromMempool() error {
request := JsonRPCRequest{ID: 1, JSONRPC: "2.0", Method: "unsubscribe_mempool"}
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_mempool"}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
Expand Down Expand Up @@ -176,6 +186,13 @@ func (w *websocketConnection) runJsonRPC(ctx context.Context, fn WebsocketConfig
return g.Wait()
}

func (w *websocketConnection) currentRequestID() uint64 {
w.mu.Lock()
defer w.mu.Unlock()
w.requestID++
return w.requestID
}

func (w *websocketConnection) processHandler(fn func()) {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down

0 comments on commit ec7e31c

Please sign in to comment.