Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sims): Complete sims integration for app v2 #23478

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions scripts/build/simulations.mk
Original file line number Diff line number Diff line change
@@ -1,32 +1,10 @@
# TODO: This should be ported to work with SimApp v2.

#? test-sim-nondeterminism: Run non-determinism test for simapp
test-sim-nondeterminism:
@echo "Running non-determinism test..."
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \
-NumBlocks=100 -BlockSize=200


# Requires an exported plugin. See store/streaming/README.md for documentation.
#
# example:
# export COSMOS_SDK_ABCI_V1=<path-to-plugin-binary>
# make test-sim-nondeterminism-streaming
#
# Using the built-in examples:
# export COSMOS_SDK_ABCI_V1=<path-to-sdk>/store/streaming/abci/examples/file/file
# make test-sim-nondeterminism-streaming
test-sim-nondeterminism-streaming:
# @echo "Running non-determinism-streaming test..."
# @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \
# -NumBlocks=100 -BlockSize=200 -EnableStreaming=true

test-sim-custom-genesis-fast:
# @echo "Running custom genesis simulation..."
# @echo "By default, ${HOME}/.simapp/config/genesis.json will be used."
# @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \
# -NumBlocks=100 -BlockSize=200 -Seed=99 -SigverifyTx=false

test-sim-import-export:
@echo "Running application import/export simulation. This may take several minutes..."
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 20m -tags='sims' -run TestAppImportExport \
Expand All @@ -37,11 +15,6 @@ test-sim-after-import:
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestAppSimulationAfterImport \
-NumBlocks=50

test-sim-custom-genesis-multi-seed:
# @echo "Running multi-seed custom genesis simulation..."
# @echo "By default, ${HOME}/.simapp/config/genesis.json will be used."
# @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \
# -NumBlocks=400

test-sim-multi-seed-long:
@echo "Running long multi-seed application simulation. This may take awhile!"
Expand All @@ -55,11 +28,8 @@ test-sim-multi-seed-short:

.PHONY: \
test-sim-nondeterminism \
test-sim-nondeterminism-streaming \
test-sim-custom-genesis-fast \
test-sim-import-export \
test-sim-after-import \
test-sim-custom-genesis-multi-seed \
test-sim-multi-seed-short \
test-sim-multi-seed-long \

Expand Down
1 change: 1 addition & 0 deletions server/v2/appmanager/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format:

## [Unreleased]


## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/appmanager%2Fv1.0.0-beta.2)

* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `TransactionFuzzer`, an interface for processing and generated state transitions.
Expand Down
11 changes: 1 addition & 10 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
"cosmossdk.io/core/event"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
Expand Down Expand Up @@ -510,16 +509,8 @@ func (c *consensus[T]) FinalizeBlock(
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
}

var events []event.Event
events = append(events, resp.PreBlockEvents...)
events = append(events, resp.BeginBlockEvents...)
for _, tx := range resp.TxResults {
events = append(events, tx.Events...)
}
events = append(events, resp.EndBlockEvents...)

// listen to state streaming changes in accordance with the block
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, resp.TxResults, events, stateChanges)
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, *resp, stateChanges)
if err != nil {
return nil, err
}
Expand Down
109 changes: 91 additions & 18 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package cometbft
import (
"context"
"encoding/json"
"fmt"

"cosmossdk.io/core/event"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
Expand All @@ -18,14 +20,69 @@ func (c *consensus[T]) streamDeliverBlockChanges(
height int64,
txs [][]byte,
decodedTxs []T,
blockResp server.BlockResponse,
stateChanges []store.StateChanges,
) error {
return StreamOut(ctx, height, txs, decodedTxs, blockResp, stateChanges, c.streamingManager, c.listener, c.cfg.AppTomlConfig.Trace, c.logger.Error)
}

// StreamOut stream all the changes happened during deliver block.
func StreamOut[T transaction.Tx](
ctx context.Context,
height int64,
rawTXs [][]byte,
decodedTXs []T,
blockRsp server.BlockResponse,
stateChanges []store.StateChanges,
streamingManager streaming.Manager,
listener *appdata.Listener,
traceErrs bool,
logErrFn func(msg string, keyVals ...any),
) error {
var events []event.Event
events = append(events, blockRsp.PreBlockEvents...)
events = append(events, blockRsp.BeginBlockEvents...)
for _, tx := range blockRsp.TxResults {
events = append(events, tx.Events...)
}
events = append(events, blockRsp.EndBlockEvents...)
txResults := blockRsp.TxResults

err := doServeStreamListeners(
ctx,
height,
rawTXs,
txResults,
traceErrs,
streamingManager,
events,
logErrFn,
stateChanges,
)
if err != nil {
return err
}
return doServeHookListener(listener, height, rawTXs, decodedTXs, events, stateChanges)
}

func doServeStreamListeners(
ctx context.Context,
height int64,
rawTXs [][]byte,
txResults []server.TxResult,
traceErrs bool,
streamingManager streaming.Manager,
events []event.Event,
logErrFn func(msg string, keyVals ...any),
stateChanges []store.StateChanges,
) error {
if len(streamingManager.Listeners) == 0 {
return nil
}
// convert txresults to streaming txresults
streamingTxResults := make([]*streaming.ExecTxResult, len(txResults))
for i, txResult := range txResults {
space, code, log := errorsmod.ABCIInfo(txResult.Error, c.cfg.AppTomlConfig.Trace)
space, code, log := errorsmod.ABCIInfo(txResult.Error, traceErrs)

events, err := streaming.IntoStreamingEvents(txResult.Events)
if err != nil {
Expand All @@ -42,31 +99,47 @@ func (c *consensus[T]) streamDeliverBlockChanges(
}
}

for _, streamingListener := range c.streamingManager.Listeners {
for _, streamingListener := range streamingManager.Listeners {
events, err := streaming.IntoStreamingEvents(events)
if err != nil {
alpe marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if err := streamingListener.ListenDeliverBlock(ctx, streaming.ListenDeliverBlockRequest{
BlockHeight: height,
Txs: txs,
Txs: rawTXs,
TxResults: streamingTxResults,
Events: events,
}); err != nil {
c.logger.Error("ListenDeliverBlock listening hook failed", "height", height, "err", err)
if streamingManager.StopNodeOnErr {
return fmt.Errorf("listen deliver block: %w", err)
}
logErrFn("ListenDeliverBlock listening hook failed", "height", height, "err", err)
}

if err := streamingListener.ListenStateChanges(ctx, intoStreamingKVPairs(stateChanges)); err != nil {
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
if streamingManager.StopNodeOnErr {
return fmt.Errorf("listen state changes: %w", err)
}
logErrFn("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}
return nil
}

if c.listener == nil {
func doServeHookListener[T transaction.Tx](
listener *appdata.Listener,
height int64,
rawTXs [][]byte,
decodedTXs []T,
events []event.Event,
stateChanges []store.StateChanges,
) error {
if listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
if listener.StartBlock != nil {
if err := listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
Expand All @@ -75,35 +148,35 @@ func (c *consensus[T]) streamDeliverBlockChanges(
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
if listener.OnTx != nil {
for i, tx := range rawTXs {
if err := listener.OnTx(appdata.TxData{
BlockNumber: uint64(height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: func() (json.RawMessage, error) {
return json.Marshal(decodedTxs[i])
return json.Marshal(decodedTXs[i])
},
}); err != nil {
return err
}
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
if listener.OnEvent != nil {
if err := listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
if listener.OnKVPair != nil {
if err := listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
if listener.Commit != nil {
if completionCallback, err := listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/v2/stf/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format:

## [Unreleased]


## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/stf%2Fv1.0.0-beta.2)

* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `DeliverSims`, an interface for state transitions by sims.
Expand Down
3 changes: 2 additions & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ require (
google.golang.org/protobuf v1.36.3
)

require cosmossdk.io/schema v1.0.0

require (
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.36.3-20241120201313-68e42a58b301.1 // indirect
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.36.3-20240130113600-88ef6483f90f.1 // indirect
Expand All @@ -64,7 +66,6 @@ require (
cosmossdk.io/core/testing v0.0.1 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/errors/v2 v2.0.0 // indirect
cosmossdk.io/schema v1.0.0 // indirect
cosmossdk.io/server/v2/stf v1.0.0-beta.2 // indirect
cosmossdk.io/store v1.10.0-rc.1.0.20241218084712-ca559989da43 // indirect
cosmossdk.io/x/tx v1.0.1 // indirect
Expand Down
Loading
Loading