From 0b1ff7f76f7eb520bb718bb038fdf8cea8fdb227 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 14 Jan 2025 13:01:51 +0100 Subject: [PATCH] wip --- testutil/e2e/comet.go | 42 +++++++++++++++++-------------------- testutil/e2e/ws_server.go | 42 +++++++++++++++++++++---------------- testutil/integration/app.go | 37 ++++++++++++++++---------------- 3 files changed, 62 insertions(+), 59 deletions(-) diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index 5e607f039..61a6a9d26 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -8,10 +8,10 @@ import ( "fmt" "io" "net/http" + "time" "github.com/cometbft/cometbft/abci/types" coretypes "github.com/cometbft/cometbft/rpc/core/types" - coregrpc "github.com/cometbft/cometbft/rpc/grpc" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" gogogrpc "github.com/cosmos/gogoproto/grpc" @@ -151,7 +151,10 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc // TODO_CONSIDERATION: more correct implementation of the different // broadcast_tx methods (i.e. sync, async, commit) is a matter of - // the sequence of running the tx and sending the JSON-RPC response. + // the sequencing of the following: + // - calling the finalize block ABCI method + // - returning the JSON-RPC response + // - emitting websocket event _, finalizeBlockRes, err := app.RunTx(nil, txBz) if err != nil { @@ -159,31 +162,24 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc return } + // TODO_IN_THIS_COMMIT: something better... + go func() { + // Simulate 1 second block production delay. + time.Sleep(time.Second * 1) + + app.EmitWSEvents(finalizeBlockRes.GetEvents()) + }() + // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as // we're finalizing one tx at a time (single tx blocks). txRes := finalizeBlockRes.GetTxResults()[0] - bcastTxRes := coregrpc.ResponseBroadcastTx{ - CheckTx: &types.ResponseCheckTx{ - Code: txRes.GetCode(), - Data: txRes.GetData(), - Log: txRes.GetLog(), - Info: txRes.GetInfo(), - GasWanted: txRes.GetGasWanted(), - GasUsed: txRes.GetGasUsed(), - Events: txRes.GetEvents(), - Codespace: txRes.GetCodespace(), - }, - TxResult: &types.ExecTxResult{ - Code: txRes.GetCode(), - Data: txRes.GetData(), - Log: txRes.GetLog(), - Info: txRes.GetInfo(), - GasWanted: txRes.GetGasWanted(), - GasUsed: txRes.GetGasUsed(), - Events: txRes.GetEvents(), - Codespace: txRes.GetCodespace(), - }, + bcastTxRes := coretypes.ResultBroadcastTx{ + Code: txRes.GetCode(), + Data: txRes.GetData(), + Log: txRes.GetLog(), + Codespace: txRes.GetCodespace(), + //Hash:, } response = rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go index cb6b10ec4..c38a405a6 100644 --- a/testutil/e2e/ws_server.go +++ b/testutil/e2e/ws_server.go @@ -6,9 +6,10 @@ import ( "strings" "testing" + abci "github.com/cometbft/cometbft/abci/types" coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" - sdk "github.com/cosmos/cosmos-sdk/types" + cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/tx" "github.com/gorilla/websocket" ) @@ -418,7 +419,7 @@ func (app *E2EApp) handleBlockEvents(t *testing.T) { // TODO_IN_THIS_COMMIT: also wrap RunMsgs... // TODO_IN_THIS_COMMIT: godoc... // Override RunMsg to also emit transaction events via WebSocket -func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) { +func (app *E2EApp) RunMsg(t *testing.T, msg cosmostypes.Msg) (tx.MsgResponse, error) { msgRes, err := app.App.RunMsg(t, msg) if err != nil { return nil, err @@ -432,7 +433,7 @@ func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) { } // createBlockEvent creates a CometBFT-compatible event from transaction results -func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent { +func createBlockEvent(ctx *cosmostypes.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent { // Convert SDK events to map[string][]string format that CometBFT expects events := make(map[string][]string) for _, event := range ctx.EventManager().Events() { @@ -457,22 +458,27 @@ func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.Result } } -//// createTxEvent creates a CometBFT-compatible event from transaction results -//func createTxEvent(tx *coretypes.ResultTx, index int) *coretypes.ResultEvent { -// return &coretypes.ResultEvent{ -// Query: "tm.event='Tx'", -// Data: map[string]interface{}{ -// "height": ctx.BlockHeight(), -// "hash": ctx.BlockHeader().LastBlockId.Hash, -// "events": events, -// // Add other relevant block and transaction data here as needed -// }, -// Events: events, -// } -//} +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) EmitWSEvents(events []abci.Event) { + // TODO_IN_THIS_COMMIT: necessary? + //app.wsConnMutex.RLock() + //defer app.wsConnMutex.RUnlock() + + for _, event := range events { + for conn, queries := range app.wsConnections { + // Check if connection is subscribed to this event type + for query, _ := range queries { + if eventMatchesQuery(event, query) { + // Marshal the event to JSON + _ = conn + } + } + } + } +} // eventMatchesQuery checks if an event matches a subscription query -func eventMatchesQuery(event *coretypes.ResultEvent, query string) bool { +func eventMatchesQuery(event abci.Event, query string) bool { // Basic implementation - should be expanded to handle more complex queries - return strings.Contains(query, event.Query) + return strings.Contains(query, event.Type) } diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 84caf1544..2427a781d 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -709,22 +709,6 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo t.Helper() } - // Commit the updated state after the message has been handled. - var finalizeBlockRes *abci.ResponseFinalizeBlock - defer func() { - if _, commitErr := app.Commit(); commitErr != nil { - err = fmt.Errorf("committing state: %w", commitErr) - return - } - - app.nextBlockUpdateCtx() - - // Emit events MUST happen AFTER the context has been updated so that - // events are available on the context for the block after their actions - // were committed (e.g. msgs, begin/end block trigger). - app.emitEvents(t, finalizeBlockRes) - }() - // Package the message into a transaction. txBuilder := app.txCfg.NewTxBuilder() if err = txBuilder.SetMsgs(msgs...); err != nil { @@ -740,7 +724,7 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo app.logger.Info("Running msg", "msg", msg.String()) } - txMsgResps, finalizeBlockRes, err = app.RunTx(t, txBz) + txMsgResps, _, err = app.RunTx(t, txBz) if err != nil { // DEV_NOTE: Intentionally returning and not asserting nil error to improve reusability. return nil, err @@ -759,6 +743,21 @@ func (app *App) RunTx(t *testing.T, txBz []byte) ( t.Helper() } + // Commit the updated state after the message has been handled. + defer func() { + if _, commitErr := app.Commit(); commitErr != nil { + err = fmt.Errorf("committing state: %w", commitErr) + return + } + + app.nextBlockUpdateCtx() + + // Emit events MUST happen AFTER the context has been updated so that + // events are available on the context for the block after their actions + // were committed (e.g. msgs, begin/end block trigger). + app.emitEvents(t, finalizeBlockRes) + }() + // Finalize the block with the transaction. finalizeBlockReq := &cmtabcitypes.RequestFinalizeBlock{ Height: app.LastBlockHeight() + 1, @@ -829,7 +828,9 @@ func (app *App) NextBlocks(t *testing.T, numBlocks int) { // emitEvents emits the events from the finalized block such that they are available // via the current context's event manager (i.e. app.GetSdkCtx().EventManager.Events()). func (app *App) emitEvents(t *testing.T, res *abci.ResponseFinalizeBlock) { - t.Helper() + if t != nil { + t.Helper() + } // Emit begin/end blocker events. for _, event := range res.Events {