Skip to content

Commit

Permalink
Add details from request context to mempool messages
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksej-paschenko committed Dec 1, 2023
1 parent ac76695 commit 701edfe
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 106 deletions.
8 changes: 4 additions & 4 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func main() {
if err != nil {
log.Fatal("storage init", zap.Error(err))
}
// mempool receives a copy of any payload that goes through our API method /v2/blockchain/message
mempool := sources.NewMemPool(log)
// mempoolChannel receives a copy of any payload that goes through our API method /v2/blockchain/message
mempoolChannel, emulationCh := mempool.Run(context.TODO())
mempoolCh := mempool.Run(context.TODO())

msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan []byte{mempoolChannel})
msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan<- blockchain.ExtInMsgCopy{mempoolCh})
if err != nil {
log.Fatal("failed to create msg sender", zap.Error(err))
}
Expand All @@ -57,7 +57,7 @@ func main() {
api.WithExecutor(storage),
api.WithMessageSender(msgSender),
api.WithSpamFilter(spamFilter),
api.WithEmulationChannel(emulationCh),
api.WithEmulationChannel(mempoolCh),
api.WithTonConnectSecret(cfg.TonConnect.Secret),
)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/decode_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,7 +43,6 @@ func TestHandler_DecodeMessage(t *testing.T) {
return
}
require.Nil(t, err)
fmt.Printf("response: %v\n", response)
pkgTesting.CompareResults(t, response, tt.filenamePrefix)
})
}
Expand Down
50 changes: 27 additions & 23 deletions pkg/api/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/shopspring/decimal"
"github.com/tonkeeper/opentonapi/pkg/pusher/sources"
"github.com/tonkeeper/opentonapi/pkg/blockchain"
"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/abi"
"github.com/tonkeeper/tongo/boc"
Expand All @@ -38,8 +38,16 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
return toError(http.StatusBadRequest, fmt.Errorf("boc not found"))
}
if request.Boc.IsSet() {
payload, err := sendMessage(ctx, request.Boc.Value, h.msgSender)
payload, err := base64.StdEncoding.DecodeString(request.Boc.Value)
if err != nil {
return toError(http.StatusBadRequest, fmt.Errorf("boc must be a base64 encoded string"))
}
msgCopy := blockchain.ExtInMsgCopy{
MsgBoc: request.Boc.Value,
Payload: payload,
Details: h.ctxToDetails(ctx),
}
if err := h.msgSender.SendMessage(ctx, msgCopy); err != nil {
sentry.Send("sending message", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError)
return toError(http.StatusInternalServerError, err)
}
Expand All @@ -49,25 +57,33 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
sentry.Send("addToMempool", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError)
}
}()
h.addToMempool(payload, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
h.addToMempool(ctx, payload, nil)
}()
return nil
}
var (
batchOfBoc []string
copies []blockchain.ExtInMsgCopy
shardAccount = map[tongo.AccountID]tlb.ShardAccount{}
)
for _, msgBoc := range request.Batch {
payload, err := base64.StdEncoding.DecodeString(msgBoc)
if err != nil {
return toError(http.StatusBadRequest, err)
}
shardAccount, err = h.addToMempool(payload, shardAccount)
shardAccount, err = h.addToMempool(ctx, payload, shardAccount)
if err != nil {
continue
}
batchOfBoc = append(batchOfBoc, msgBoc)
msgCopy := blockchain.ExtInMsgCopy{
MsgBoc: msgBoc,
Payload: payload,
Details: h.ctxToDetails(ctx),
}
copies = append(copies, msgCopy)
}
h.msgSender.MsgsBocAddToMempool(batchOfBoc)
h.msgSender.SendMultipleMessages(ctx, copies)
return nil
}

Expand Down Expand Up @@ -499,9 +515,7 @@ func (h *Handler) EmulateMessageToWallet(ctx context.Context, request *oas.Emula
return &consequences, nil
}

func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
func (h *Handler) addToMempool(ctx context.Context, bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) {
if shardAccount == nil {
shardAccount = map[tongo.AccountID]tlb.ShardAccount{}
}
Expand Down Expand Up @@ -554,7 +568,9 @@ func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID
traces = slices.Insert(traces, 0, hex.EncodeToString(hash))
h.mempoolEmulate.accountsTraces.Set(account, traces, cache.WithExpiration(time.Second*time.Duration(ttl)))
}
h.emulationCh <- sources.PayloadAndEmulationResults{
h.emulationCh <- blockchain.ExtInMsgCopy{
MsgBoc: base64.StdEncoding.EncodeToString(bytesBoc),
Details: h.ctxToDetails(ctx),
Payload: bytesBoc,
Accounts: accounts,
}
Expand Down Expand Up @@ -704,15 +720,3 @@ func emulatedTreeToTrace(ctx context.Context, executor executor, resolver core.L
}
return t, nil
}

func sendMessage(ctx context.Context, msgBoc string, msgSender messageSender) ([]byte, error) {
payload, err := base64.StdEncoding.DecodeString(msgBoc)
if err != nil {
return nil, err
}
err = msgSender.SendMessage(ctx, payload)
if err != nil {
return nil, err
}
return payload, nil
}
51 changes: 35 additions & 16 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sync"

"github.com/go-faster/errors"
"github.com/tonkeeper/opentonapi/pkg/blockchain"
"github.com/tonkeeper/opentonapi/pkg/chainstate"
"github.com/tonkeeper/opentonapi/pkg/core"
"github.com/tonkeeper/opentonapi/pkg/pusher/sources"
"github.com/tonkeeper/opentonapi/pkg/rates"
"github.com/tonkeeper/opentonapi/pkg/spam"
"github.com/tonkeeper/tongo"
Expand All @@ -26,6 +26,9 @@ import (
// Compile-time check for Handler.
var _ oas.Handler = (*Handler)(nil)

// ctxToDetails converts a request context to a details instance.
type ctxToDetails func(ctx context.Context) any

type Handler struct {
logger *zap.Logger

Expand All @@ -43,7 +46,9 @@ type Handler struct {
// mempoolEmulate contains results of emulation of messages that are in the mempool.
mempoolEmulate mempoolEmulate
// emulationCh is used to send emulation results to mempool subscribers.
emulationCh chan sources.PayloadAndEmulationResults
emulationCh chan<- blockchain.ExtInMsgCopy
// ctxToDetails converts a request context to a details instance.
ctxToDetails ctxToDetails

// mu protects "dns".
mu sync.Mutex
Expand All @@ -65,7 +70,8 @@ type Options struct {
spamFilter spamFilter
ratesSource ratesSource
tonConnectSecret string
emulationCh chan sources.PayloadAndEmulationResults
emulationCh chan<- blockchain.ExtInMsgCopy
ctxToDetails ctxToDetails
}

type Option func(o *Options)
Expand All @@ -88,7 +94,7 @@ func WithAddressBook(book addressBook) Option {
}

// WithEmulationChannel configures a channel that will be used to send emulation results to mempool subscribers.
func WithEmulationChannel(ch chan sources.PayloadAndEmulationResults) Option {
func WithEmulationChannel(ch chan<- blockchain.ExtInMsgCopy) Option {
return func(o *Options) {
o.emulationCh = ch
}
Expand Down Expand Up @@ -130,6 +136,12 @@ func WithTonConnectSecret(tonConnectSecret string) Option {
}
}

func WithContextToDetails(ctxToDetails ctxToDetails) Option {
return func(o *Options) {
o.ctxToDetails = ctxToDetails
}
}

func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) {
options := &Options{}
for _, o := range opts {
Expand Down Expand Up @@ -157,31 +169,38 @@ func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) {
return nil, fmt.Errorf("executor is not configured")
}
if options.emulationCh == nil {
options.emulationCh = make(chan sources.PayloadAndEmulationResults, 100)
emulationCh := make(chan blockchain.ExtInMsgCopy, 100)
options.emulationCh = emulationCh
go func() {
for {
select {
case <-options.emulationCh:
case <-emulationCh:
// drop it
}
}
}()
}
if options.ctxToDetails == nil {
options.ctxToDetails = func(ctx context.Context) any {
return nil
}
}
tonConnect, err := tonconnect.NewTonConnect(options.executor, options.tonConnectSecret)
if err != nil {
return nil, fmt.Errorf("failed to init tonconnect")
}
return &Handler{
logger: logger,
storage: options.storage,
state: options.chainState,
addressBook: options.addressBook,
msgSender: options.msgSender,
executor: options.executor,
limits: options.limits,
spamFilter: options.spamFilter,
emulationCh: options.emulationCh,
ratesSource: rates.InitCalculator(options.ratesSource),
logger: logger,
storage: options.storage,
state: options.chainState,
addressBook: options.addressBook,
msgSender: options.msgSender,
executor: options.executor,
limits: options.limits,
spamFilter: options.spamFilter,
emulationCh: options.emulationCh,
ctxToDetails: options.ctxToDetails,
ratesSource: rates.InitCalculator(options.ratesSource),
metaCache: metadataCache{
collectionsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "nft_metadata_cache"),
jettonsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "jetton_metadata_cache"),
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ed25519"
"sync"

"github.com/tonkeeper/opentonapi/pkg/blockchain"
rules "github.com/tonkeeper/scam_backoffice_rules"
"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/abi"
Expand Down Expand Up @@ -124,8 +125,10 @@ type chainState interface {

// messageSender provides a method to send a raw message to the blockchain.
type messageSender interface {
SendMessage(ctx context.Context, payload []byte) error // SendMessage sends the given payload(a message) to the blockchain.
MsgsBocAddToMempool(bocMsgs []string) // MsgsBocAddToMempool sends a list of boc to the cache for later sending
// SendMessage sends the given message to the blockchain.
SendMessage(ctx context.Context, msgCopy blockchain.ExtInMsgCopy) error
// SendMultipleMessages sends a list of messages to the cache for later sending.
SendMultipleMessages(ctx context.Context, copies []blockchain.ExtInMsgCopy)
}

// executor runs any get methods
Expand Down
Loading

0 comments on commit 701edfe

Please sign in to comment.