diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index d843984d9..731fd62f7 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -8,6 +8,7 @@ import ( "sync" "testing" + comettypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/cosmos-sdk/types/module" "github.com/gorilla/websocket" @@ -26,14 +27,14 @@ import ( // E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing type E2EApp struct { *integration.App - grpcServer *grpc.Server - grpcListener net.Listener - wsServer *http.Server - wsListener net.Listener - wsUpgrader websocket.Upgrader - wsConnMutex sync.RWMutex - wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries - blockEventChan chan *coretypes.ResultEvent + grpcServer *grpc.Server + grpcListener net.Listener + wsServer *http.Server + wsListener net.Listener + wsUpgrader websocket.Upgrader + wsConnMutex sync.RWMutex + wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries + resultEventChan chan *coretypes.ResultEvent } // NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers @@ -78,13 +79,13 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp require.NoError(t, err, "failed to create WebSocket listener") e2eApp := &E2EApp{ - App: app, - grpcListener: grpcListener, - grpcServer: grpcServer, - wsListener: wsListener, - wsConnections: make(map[*websocket.Conn]map[string]struct{}), - wsUpgrader: websocket.Upgrader{}, - blockEventChan: make(chan *coretypes.ResultEvent, 1), + App: app, + grpcListener: grpcListener, + grpcServer: grpcServer, + wsListener: wsListener, + wsConnections: make(map[*websocket.Conn]map[string]struct{}), + wsUpgrader: websocket.Upgrader{}, + resultEventChan: make(chan *coretypes.ResultEvent), } mux.Handle(http.MethodPost, rootPattern, newPostHandler(client, e2eApp)) @@ -117,7 +118,7 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp }() // Start event handling - go e2eApp.handleBlockEvents(t) + go e2eApp.handleResultEvents(t) return e2eApp } @@ -129,7 +130,7 @@ func (app *E2EApp) Close() error { return err } - close(app.blockEventChan) + close(app.resultEventChan) return nil } @@ -147,3 +148,17 @@ func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) func (app *E2EApp) GetWSEndpoint() string { return "ws://" + app.wsListener.Addr().String() + "/websocket" } + +// TODO_IN_THIS_COMMIT: godoc & move... +func (app *E2EApp) GetCometBlockID() comettypes.BlockID { + lastBlockID := app.GetSdkCtx().BlockHeader().LastBlockId + partSetHeader := lastBlockID.GetPartSetHeader() + + return comettypes.BlockID{ + Hash: lastBlockID.GetHash(), + PartSetHeader: comettypes.PartSetHeader{ + Total: partSetHeader.GetTotal(), + Hash: partSetHeader.GetHash(), + }, + } +} diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index 18f53c9b3..0d743da3a 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -124,7 +124,7 @@ func TestNewE2EApp(t *testing.T) { _, err = app.RunMsg(t, &banktypes.MsgSend{ FromAddress: app.GetFaucetBech32(), ToAddress: gateway2Addr.String(), - Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 100000000)), + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), }) require.NoError(t, err) @@ -319,3 +319,108 @@ func TestSanity3(t *testing.T) { t.Logf("result: %s", result) } + +func TestSanity4(t *testing.T) { + ctx := context.Background() + initialHeight := int64(7553) + // TODO_IN_THIS_COMMIT: does this 👆 need to be reconciled with the internal height of app? + + //app := NewE2EApp(t) + + //registry := codectypes.NewInterfaceRegistry() + //cdc := codec.NewProtoCodec(registry) + keyRing := keyring.NewInMemory(testclient.Marshaler) + _, err := keyRing.NewAccount( + "pnf", + "crumble shrimp south strategy speed kick green topic stool seminar track stand rhythm almost bubble pet knock steel pull flag weekend country major blade", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + + //// TODO_IN_THIS_COMMOT: fund gateway2 account. + //_, err = app.RunMsg(t, &banktypes.MsgSend{ + // FromAddress: app.GetFaucetBech32(), + // ToAddress: pnfAddr.String(), + // Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), + //}) + //require.NoError(t, err) + + ctrl := gomock.NewController(t) + blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) + blockQueryClient.EXPECT(). + Block(gomock.Any(), gomock.Any()). + DoAndReturn( + func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { + //time.Sleep(time.Second * 100) + blockResultMock := &cometrpctypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + Height: initialHeight, + }, + }, + } + return blockResultMock, nil + }, + ).AnyTimes() + //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") + //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:26657") + //require.NoError(t, err) + + //creds := insecure.NewCredentials() + //grpcConn := testclient.NewLocalnetClientCtx(t, flagSet).GetClient() + //grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) + //require.NoError(t, err) + + // TODO_IN_THIS_COMMIT: NOT localnet flagset NOR context, should be + // configured to match the E2E app listeners. + //flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + flagSet := testclient.NewLocalnetFlagSet(t) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + deps := depinject.Supply(clientCtx, blockQueryClient) + + sharedQueryClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) + + sharedParams, err := sharedQueryClient.GetParams(ctx) + require.NoError(t, err) + + t.Logf("shared params: %+v", sharedParams) + + eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:26657/websocket") + deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + blockClient, err := block.NewBlockClient(ctx, deps) + require.NoError(t, err) + + txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + + //_, txContext := testtx.NewE2ETxContext(t, keyRing, flagSet) + txContext, err := tx.NewTxContext(deps) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) + txClient, err := tx.NewTxClient(ctx, deps, tx.WithSigningKeyName("pnf")) + require.NoError(t, err) + + time.Sleep(time.Second * 1) + + eitherErr := txClient.SignAndBroadcast( + ctx, + &banktypes.MsgSend{ + FromAddress: "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + ToAddress: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), + }, + ) + + // TODO_IN_THIS_COMMIT: signal to the WS server to send another block result event... + //app.NextBlock(t) + + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + require.NoError(t, <-errCh) +} diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index 61a6a9d26..afd64c127 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -13,6 +13,7 @@ import ( "github.com/cometbft/cometbft/abci/types" coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + comettypes "github.com/cometbft/cometbft/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" gogogrpc "github.com/cosmos/gogoproto/grpc" "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -167,7 +168,12 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc // Simulate 1 second block production delay. time.Sleep(time.Second * 1) - app.EmitWSEvents(finalizeBlockRes.GetEvents()) + fmt.Println(">>> emitting ws events") + //app.EmitWSEvents(app.GetSdkCtx().EventManager().Events()) + + // TODO_IMPROVE: If we want/need to support multiple txs per + // block in the future, this will have to be refactored. + app.EmitWSEvents(finalizeBlockRes, txBz) }() // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as @@ -179,7 +185,7 @@ func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc Data: txRes.GetData(), Log: txRes.GetLog(), Codespace: txRes.GetCodespace(), - //Hash:, + Hash: comettypes.Tx(txBz).Hash(), } response = rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go index c38a405a6..f69d4677f 100644 --- a/testutil/e2e/ws_server.go +++ b/testutil/e2e/ws_server.go @@ -2,6 +2,7 @@ package e2e import ( "encoding/json" + "fmt" "net/http" "strings" "testing" @@ -9,9 +10,10 @@ import ( abci "github.com/cometbft/cometbft/abci/types" coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + comettypes "github.com/cometbft/cometbft/types" cosmostypes "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/tx" "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" ) // newWebSocketServer creates and configures a new WebSocket server for the E2EApp @@ -36,8 +38,7 @@ func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { } // TODO_IN_THIS_COMMIT: move -var mockBlockResultJSON = ` -{ +var mockBlockResultJSON = `{ "query" : "tm.event='NewBlock'", "data" : { "type" : "tendermint/event/NewBlock", @@ -322,6 +323,152 @@ var mockBlockResultJSON = ` } }` +const mockTxResultEventJSON = `{ +"query" : "tm.event='Tx' AND message.sender='pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw'", +"data" : { + "type" : "tendermint/event/Tx", + "value" : { + "TxResult" : { + "height" : "471", + "tx" : "CpYBCpABChwvY29zbW9zLmJhbmsudjFiZXRhMS5Nc2dTZW5kEnAKK3Bva3QxZWVla3NoMnR2a2g3d3ptZnJsam5odzR3cmhzNTVsY3V2bWVra3cSK3Bva3QxNXczZmhmeWMwbHR0djdyNTg1ZTJuY3BmNnQya2w5dWg4cnNueXoaFAoFdXBva3QSCzEwMDAwMDAwMDAwGIY7EloKUApGCh8vY29zbW9zLmNyeXB0by5zZWNwMjU2azEuUHViS2V5EiMKIQLcGKC0uUKBIbYq4/pz+5scHn0Xk/qiXX23JtYSW0rpTRIECgIIARgEEgYQqqGCyQIaQDXGt2kVA/IWj/7HMgfX3fK5tJrwJ7V0nhWAeDlnpcFcJM7k958ee9gJvk8HjRL2BOD97PEnXalu/zFu+YYChuw=", + "result" : { + "data" : "EiYKJC9jb3Ntb3MuYmFuay52MWJldGExLk1zZ1NlbmRSZXNwb25zZQ==", + "gas_wanted" : "690000042", + "gas_used" : "48137", + "events" : [ { + "type" : "tx", + "attributes" : [ { + "key" : "fee", + "value" : "", + "index" : true + }, { + "key" : "fee_payer", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + "index" : true + } ] + }, { + "type" : "tx", + "attributes" : [ { + "key" : "acc_seq", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw/4", + "index" : true + } ] + }, { + "type" : "tx", + "attributes" : [ { + "key" : "signature", + "value" : "Nca3aRUD8haP/scyB9fd8rm0mvAntXSeFYB4OWelwVwkzuT3nx572Am+TweNEvYE4P3s8SddqW7/MW75hgKG7A==", + "index" : true + } ] + }, { + "type" : "message", + "attributes" : [ { + "key" : "action", + "value" : "/cosmos.bank.v1beta1.MsgSend", + "index" : true + }, { + "key" : "sender", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + "index" : true + }, { + "key" : "module", + "value" : "bank", + "index" : true + }, { + "key" : "msg_index", + "value" : "0", + "index" : true + } ] + }, { + "type" : "coin_spent", + "attributes" : [ { + "key" : "spender", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + "index" : true + }, { + "key" : "amount", + "value" : "10000000000upokt", + "index" : true + }, { + "key" : "msg_index", + "value" : "0", + "index" : true + } ] + }, { + "type" : "coin_received", + "attributes" : [ { + "key" : "receiver", + "value" : "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + "index" : true + }, { + "key" : "amount", + "value" : "10000000000upokt", + "index" : true + }, { + "key" : "msg_index", + "value" : "0", + "index" : true + } ] + }, { + "type" : "transfer", + "attributes" : [ { + "key" : "recipient", + "value" : "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + "index" : true + }, { + "key" : "sender", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + "index" : true + }, { + "key" : "amount", + "value" : "10000000000upokt", + "index" : true + }, { + "key" : "msg_index", + "value" : "0", + "index" : true + } ] + }, { + "type" : "message", + "attributes" : [ { + "key" : "sender", + "value" : "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", + "index" : true + }, { + "key" : "msg_index", + "value" : "0", + "index" : true + } ] + } ] + } + } + } +}, +"events" : { + "coin_received.receiver" : [ "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz" ], + "tx.fee" : [ "" ], + "message.sender" : [ "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw", "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw" ], + "coin_received.amount" : [ "10000000000upokt" ], + "coin_received.msg_index" : [ "0" ], + "transfer.recipient" : [ "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz" ], + "transfer.amount" : [ "10000000000upokt" ], + "tm.event" : [ "Tx" ], + "tx.acc_seq" : [ "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw/4" ], + "coin_spent.amount" : [ "10000000000upokt" ], + "coin_spent.spender" : [ "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw" ], + "transfer.msg_index" : [ "0" ], + "tx.hash" : [ "119E4CAC3E395B256C9F87E5B2295DAC687C6312CCA9C701176A25153EE03B1A" ], + "tx.fee_payer" : [ "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw" ], + "tx.signature" : [ "Nca3aRUD8haP/scyB9fd8rm0mvAntXSeFYB4OWelwVwkzuT3nx572Am+TweNEvYE4P3s8SddqW7/MW75hgKG7A==" ], + "message.msg_index" : [ "0", "0" ], + "coin_spent.msg_index" : [ "0" ], + "transfer.sender" : [ "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw" ], + "tx.height" : [ "471" ], + "message.action" : [ "/cosmos.bank.v1beta1.MsgSend" ], + "message.module" : [ "bank" ] +} +}` + // handleWebSocketConnection handles messages from a specific WebSocket connection func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { defer func() { @@ -377,63 +524,90 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { } } -// handleBlockEvents coordinates block finalization with WebSocket event broadcasting -func (app *E2EApp) handleBlockEvents(t *testing.T) { - for event := range app.blockEventChan { +// handleResultEvents coordinates block finalization with WebSocket event broadcasting +func (app *E2EApp) handleResultEvents(t *testing.T) { + t.Helper() + + for event := range app.resultEventChan { + fmt.Printf(">>> WS event: %+v\n", event) + fmt.Printf(">>> num WS conns: %d\n", len(app.wsConnections)) + app.wsConnMutex.RLock() for conn, queries := range app.wsConnections { // Check if connection is subscribed to this event type for query := range queries { - _ = query - _ = event - //if eventMatchesQuery(event, query) { - // // Marshal the event to JSON - // eventJSON, err := json.Marshal(event) - // if err != nil { - // t.Logf("failed to marshal event: %v", err) - // continue - // } - - response := rpctypes.RPCResponse{ - JSONRPC: "2.0", - ID: nil, // Events don't have an ID - // TODO_IN_THIS_COMMIT: make this dynamic! - Result: json.RawMessage(mockBlockResultJSON), - } + queryPartPairs := parseQuery(t, query) + + for queryKey, queryValue := range queryPartPairs { + eventQueryValue, hasQueryKey := event.Events[queryKey] + if !hasQueryKey { + continue + } + + // TODO_IN_THIS_COMMIT: comment explaining 0th index... + if eventQueryValue[0] != strings.Trim(queryValue, "'") { + continue + } + + fmt.Printf(">>> checking query: %s\n", query) - if err := conn.WriteJSON(response); err != nil { - app.wsConnMutex.RUnlock() - app.wsConnMutex.Lock() - delete(app.wsConnections, conn) - app.wsConnMutex.Unlock() - app.wsConnMutex.RLock() - continue + // DEV_NOTE: An empty request ID is consistent with the cometbft + // implementation and is the reason that we MUST use a distinct + // websocket connection per query; it's not possible to determine + // to which query any given event corresponds. + response := rpctypes.NewRPCSuccessResponse(nil, event) + + if err := conn.WriteJSON(response); err != nil { + app.wsConnMutex.RUnlock() + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + app.wsConnMutex.RLock() + continue + } } - //} } } app.wsConnMutex.RUnlock() } } -// TODO_IN_THIS_COMMIT: also wrap RunMsgs... -// TODO_IN_THIS_COMMIT: godoc... -// Override RunMsg to also emit transaction events via WebSocket -func (app *E2EApp) RunMsg(t *testing.T, msg cosmostypes.Msg) (tx.MsgResponse, error) { - msgRes, err := app.App.RunMsg(t, msg) - if err != nil { - return nil, err - } +// TODO_IN_THIS_COMMIT: godoc and move... +func parseQuery(t *testing.T, query string) map[string]string { + t.Helper() - // Create and emit block event with transaction results - blockEvent := createBlockEvent(app.GetSdkCtx(), msgRes) - app.blockEventChan <- blockEvent + queryParts := strings.Split(query, " AND ") + queryPartPairs := make(map[string]string) + for _, queryPart := range queryParts { + queryPartPair := strings.Split(queryPart, "=") + require.Equal(t, 2, len(queryPartPair)) - return msgRes, nil + queryPartKey := strings.Trim(queryPartPair[0], `" `) + queryPartValue := strings.Trim(queryPartPair[1], `" `) + queryPartPairs[queryPartKey] = queryPartValue + } + + return queryPartPairs } +//// TODO_IN_THIS_COMMIT: also wrap RunMsgs... +//// TODO_IN_THIS_COMMIT: godoc... +//// Override RunMsg to also emit transaction events via WebSocket +//func (app *E2EApp) RunMsg(t *testing.T, msg cosmostypes.Msg) (tx.MsgResponse, error) { +// msgRes, err := app.App.RunMsg(t, msg) +// if err != nil { +// return nil, err +// } +// +// // Create and emit block event with transaction results +// blockEvent := createBlockEvent(app.GetSdkCtx()) +// app.resultEventChan <- blockEvent +// +// return msgRes, nil +//} + // createBlockEvent creates a CometBFT-compatible event from transaction results -func createBlockEvent(ctx *cosmostypes.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent { +func createBlockEvent(ctx *cosmostypes.Context) *coretypes.ResultEvent { // Convert SDK events to map[string][]string format that CometBFT expects events := make(map[string][]string) for _, event := range ctx.EventManager().Events() { @@ -459,26 +633,119 @@ func createBlockEvent(ctx *cosmostypes.Context, msgRes tx.MsgResponse) *coretype } // TODO_IN_THIS_COMMIT: godoc... -func (app *E2EApp) EmitWSEvents(events []abci.Event) { +func (app *E2EApp) EmitWSEvents(finalizeBlockRes *abci.ResponseFinalizeBlock, txBz []byte) { + //resultEvent := &coretypes.ResultEvent{ + // Query: "tm.event='NewBlock'", + // Data: map[string]interface{}{ + // //"height": ctx.BlockHeight(), + // //"hash": ctx.BlockHeader().LastBlockId.Hash, + // //"events": events, + // // Add other relevant block and transaction data here as needed + // }, + // //Events: events, + //} + + //emitEvent := func(event abci.Event, query string) error { + // eventAny, err := codectypes.NewAnyWithValue(&event) + // if err != nil { + // return err + // } + // + // resultEvent := &coretypes.ResultEvent{ + // Query: query, + // Data: eventAny, + // Events: nil, + // } + // + // app.resultEventChan <- resultEvent + // + // return nil + //} + //for _, event := range finalizeBlockRes.GetEvents() { + // // TODO_IN_THIS_COMMIT: reconsider how to populate the queries... + // if err := emitEvent(event, comettypes.EventQueryNewBlock.String()); err != nil { + // app.Logger().Error(err.Error()) + // } + //} + //for _, txResult := range finalizeBlockRes.GetTxResults() { + // for _, event := range txResult.GetEvents() { + // // TODO_IN_THIS_COMMIT: reconsider how to populate the queries... + // if err := emitEvent(event, comettypes.EventQueryTx.String()); err != nil { + // app.Logger().Error(err.Error()) + // } + // } + //} + // TODO_IN_THIS_COMMIT: necessary? //app.wsConnMutex.RLock() //defer app.wsConnMutex.RUnlock() + events := validateAndStringifyEvents(finalizeBlockRes.GetEvents()) + // DEV_NOTE: see https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L138 + events[comettypes.EventTypeKey] = append(events[comettypes.EventTypeKey], comettypes.EventNewBlock) + + evtDataNewBlock := comettypes.EventDataNewBlock{ + // TODO_IN_THIS_COMMIT: add block... + Block: nil, + BlockID: app.GetCometBlockID(), + ResultFinalizeBlock: abci.ResponseFinalizeBlock{}, + } + + // TODO_IN_THIS_COMMIT: comment... + resultEvent := &coretypes.ResultEvent{ + Query: comettypes.EventQueryNewBlock.String(), + Data: evtDataNewBlock, + Events: events, + } + + app.resultEventChan <- resultEvent + + // TODO_IN_THIS_COMMIT: comment... + for idx, txResult := range finalizeBlockRes.GetTxResults() { + events = validateAndStringifyEvents(txResult.GetEvents()) + // DEV_NOTE: see https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L180 + events[comettypes.EventTypeKey] = append(events[comettypes.EventTypeKey], comettypes.EventTx) + events[comettypes.TxHashKey] = append(events[comettypes.TxHashKey], fmt.Sprintf("%X", comettypes.Tx(txBz).Hash())) + events[comettypes.TxHeightKey] = append(events[comettypes.TxHeightKey], fmt.Sprintf("%d", app.GetSdkCtx().BlockHeight())) + + evtDataTx := comettypes.EventDataTx{ + TxResult: abci.TxResult{ + Height: app.GetSdkCtx().BlockHeight(), + Index: uint32(idx), + Tx: txBz, + Result: *txResult, + }, + } + + resultEvent = &coretypes.ResultEvent{ + Query: comettypes.EventQueryTx.String(), + Data: evtDataTx, + Events: events, + } + + app.resultEventChan <- resultEvent + } + + // TODO_IN_THIS_COMMIT: emit individual events... +} + +// TODO_IN_THIS_COMMIT: godoc... see: https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L112 +func validateAndStringifyEvents(events []abci.Event) map[string][]string { + result := make(map[string][]string) for _, event := range events { - for conn, queries := range app.wsConnections { - // Check if connection is subscribed to this event type - for query, _ := range queries { - if eventMatchesQuery(event, query) { - // Marshal the event to JSON - _ = conn - } + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue } + + compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) + result[compositeTag] = append(result[compositeTag], attr.Value) } } -} -// eventMatchesQuery checks if an event matches a subscription query -func eventMatchesQuery(event abci.Event, query string) bool { - // Basic implementation - should be expanded to handle more complex queries - return strings.Contains(query, event.Type) + return result } diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 2427a781d..d5fe4ad07 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -807,7 +807,7 @@ func (app *App) RunTx(t *testing.T, txBz []byte) ( require.NoError(t, err) require.NotNil(t, txMsgRes) } else { - return nil, nil, err + return nil, finalizeBlockRes, err } txMsgResps = append(txMsgResps, txMsgRes) @@ -833,10 +833,12 @@ func (app *App) emitEvents(t *testing.T, res *abci.ResponseFinalizeBlock) { } // Emit begin/end blocker events. - for _, event := range res.Events { - testutilevents.QuoteEventMode(&event) - abciEvent := cosmostypes.Event(event) - app.sdkCtx.EventManager().EmitEvent(abciEvent) + if res.Events != nil { + for _, event := range res.Events { + testutilevents.QuoteEventMode(&event) + abciEvent := cosmostypes.Event(event) + app.sdkCtx.EventManager().EmitEvent(abciEvent) + } } // Emit txResult events.