Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jan 14, 2025
1 parent 2b76ef1 commit 0b1ff7f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 59 deletions.
42 changes: 19 additions & 23 deletions testutil/e2e/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/cometbft/cometbft/abci/types"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
coregrpc "github.com/cometbft/cometbft/rpc/grpc"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"
Expand Down Expand Up @@ -151,39 +151,35 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc

// TODO_CONSIDERATION: more correct implementation of the different
// broadcast_tx methods (i.e. sync, async, commit) is a matter of
// the sequence of running the tx and sending the JSON-RPC response.
// the sequencing of the following:
// - calling the finalize block ABCI method
// - returning the JSON-RPC response
// - emitting websocket event

_, finalizeBlockRes, err := app.RunTx(nil, txBz)
if err != nil {
writeErrorResponseFromErr(w, req, err)
return
}

// TODO_IN_THIS_COMMIT: something better...
go func() {
// Simulate 1 second block production delay.
time.Sleep(time.Second * 1)

app.EmitWSEvents(finalizeBlockRes.GetEvents())
}()

// DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as
// we're finalizing one tx at a time (single tx blocks).
txRes := finalizeBlockRes.GetTxResults()[0]

bcastTxRes := coregrpc.ResponseBroadcastTx{
CheckTx: &types.ResponseCheckTx{
Code: txRes.GetCode(),
Data: txRes.GetData(),
Log: txRes.GetLog(),
Info: txRes.GetInfo(),
GasWanted: txRes.GetGasWanted(),
GasUsed: txRes.GetGasUsed(),
Events: txRes.GetEvents(),
Codespace: txRes.GetCodespace(),
},
TxResult: &types.ExecTxResult{
Code: txRes.GetCode(),
Data: txRes.GetData(),
Log: txRes.GetLog(),
Info: txRes.GetInfo(),
GasWanted: txRes.GetGasWanted(),
GasUsed: txRes.GetGasUsed(),
Events: txRes.GetEvents(),
Codespace: txRes.GetCodespace(),
},
bcastTxRes := coretypes.ResultBroadcastTx{
Code: txRes.GetCode(),
Data: txRes.GetData(),
Log: txRes.GetLog(),
Codespace: txRes.GetCodespace(),
//Hash:,
}

response = rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes)
Expand Down
42 changes: 24 additions & 18 deletions testutil/e2e/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"strings"
"testing"

abci "github.com/cometbft/cometbft/abci/types"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
sdk "github.com/cosmos/cosmos-sdk/types"
cosmostypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -418,7 +419,7 @@ func (app *E2EApp) handleBlockEvents(t *testing.T) {
// TODO_IN_THIS_COMMIT: also wrap RunMsgs...
// TODO_IN_THIS_COMMIT: godoc...
// Override RunMsg to also emit transaction events via WebSocket
func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) {
func (app *E2EApp) RunMsg(t *testing.T, msg cosmostypes.Msg) (tx.MsgResponse, error) {
msgRes, err := app.App.RunMsg(t, msg)
if err != nil {
return nil, err
Expand All @@ -432,7 +433,7 @@ func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) {
}

// createBlockEvent creates a CometBFT-compatible event from transaction results
func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent {
func createBlockEvent(ctx *cosmostypes.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent {
// Convert SDK events to map[string][]string format that CometBFT expects
events := make(map[string][]string)
for _, event := range ctx.EventManager().Events() {
Expand All @@ -457,22 +458,27 @@ func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.Result
}
}

//// createTxEvent creates a CometBFT-compatible event from transaction results
//func createTxEvent(tx *coretypes.ResultTx, index int) *coretypes.ResultEvent {
// return &coretypes.ResultEvent{
// Query: "tm.event='Tx'",
// Data: map[string]interface{}{
// "height": ctx.BlockHeight(),
// "hash": ctx.BlockHeader().LastBlockId.Hash,
// "events": events,
// // Add other relevant block and transaction data here as needed
// },
// Events: events,
// }
//}
// TODO_IN_THIS_COMMIT: godoc...
func (app *E2EApp) EmitWSEvents(events []abci.Event) {
// TODO_IN_THIS_COMMIT: necessary?
//app.wsConnMutex.RLock()
//defer app.wsConnMutex.RUnlock()

for _, event := range events {
for conn, queries := range app.wsConnections {
// Check if connection is subscribed to this event type
for query, _ := range queries {
if eventMatchesQuery(event, query) {
// Marshal the event to JSON
_ = conn
}
}
}
}
}

// eventMatchesQuery checks if an event matches a subscription query
func eventMatchesQuery(event *coretypes.ResultEvent, query string) bool {
func eventMatchesQuery(event abci.Event, query string) bool {
// Basic implementation - should be expanded to handle more complex queries
return strings.Contains(query, event.Query)
return strings.Contains(query, event.Type)
}
37 changes: 19 additions & 18 deletions testutil/integration/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,22 +709,6 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo
t.Helper()
}

// Commit the updated state after the message has been handled.
var finalizeBlockRes *abci.ResponseFinalizeBlock
defer func() {
if _, commitErr := app.Commit(); commitErr != nil {
err = fmt.Errorf("committing state: %w", commitErr)
return
}

app.nextBlockUpdateCtx()

// Emit events MUST happen AFTER the context has been updated so that
// events are available on the context for the block after their actions
// were committed (e.g. msgs, begin/end block trigger).
app.emitEvents(t, finalizeBlockRes)
}()

// Package the message into a transaction.
txBuilder := app.txCfg.NewTxBuilder()
if err = txBuilder.SetMsgs(msgs...); err != nil {
Expand All @@ -740,7 +724,7 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo
app.logger.Info("Running msg", "msg", msg.String())
}

txMsgResps, finalizeBlockRes, err = app.RunTx(t, txBz)
txMsgResps, _, err = app.RunTx(t, txBz)
if err != nil {
// DEV_NOTE: Intentionally returning and not asserting nil error to improve reusability.
return nil, err
Expand All @@ -759,6 +743,21 @@ func (app *App) RunTx(t *testing.T, txBz []byte) (
t.Helper()
}

// Commit the updated state after the message has been handled.
defer func() {
if _, commitErr := app.Commit(); commitErr != nil {
err = fmt.Errorf("committing state: %w", commitErr)
return
}

app.nextBlockUpdateCtx()

// Emit events MUST happen AFTER the context has been updated so that
// events are available on the context for the block after their actions
// were committed (e.g. msgs, begin/end block trigger).
app.emitEvents(t, finalizeBlockRes)
}()

// Finalize the block with the transaction.
finalizeBlockReq := &cmtabcitypes.RequestFinalizeBlock{
Height: app.LastBlockHeight() + 1,
Expand Down Expand Up @@ -829,7 +828,9 @@ func (app *App) NextBlocks(t *testing.T, numBlocks int) {
// emitEvents emits the events from the finalized block such that they are available
// via the current context's event manager (i.e. app.GetSdkCtx().EventManager.Events()).
func (app *App) emitEvents(t *testing.T, res *abci.ResponseFinalizeBlock) {
t.Helper()
if t != nil {
t.Helper()
}

// Emit begin/end blocker events.
for _, event := range res.Events {
Expand Down

0 comments on commit 0b1ff7f

Please sign in to comment.