Skip to content

Commit

Permalink
feat: Account for missing chain ID when deduplicating chain events
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinTrinque committed Mar 11, 2024
1 parent 6a84f7d commit 7cc23f5
Show file tree
Hide file tree
Showing 40 changed files with 5,217 additions and 4,690 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
- [10246](https://github.com/vegaprotocol/vega/issues/10246) - Add quantum volumes to teams statistics API.
- [10550](https://github.com/vegaprotocol/vega/issues/10550) - Update network parameters with default values.
- [10612](https://github.com/vegaprotocol/vega/issues/10612) - Convert all assets to be associated to the configured Ethereum chain.
- [10624](https://github.com/vegaprotocol/vega/issues/10624) - Ensure chain event are not duplicated when chain identifier is missing.

### 🐛 Fixes

Expand Down
26 changes: 14 additions & 12 deletions core/banking/asset_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package banking
import (
"context"
"errors"
"fmt"
"sync/atomic"

"code.vegaprotocol.io/vega/core/assets"
Expand All @@ -37,6 +38,7 @@ type assetAction struct {
blockHeight uint64
logIndex uint64
txHash string
chainID string

// all deposit related types
builtinD *types.BuiltinAssetDeposit
Expand Down Expand Up @@ -117,23 +119,23 @@ func (t *assetAction) ERC20AssetList() *types.ERC20AssetList {
func (t *assetAction) String() string {
switch {
case t.IsBuiltinAssetDeposit():
return t.builtinD.String()
return fmt.Sprintf("builtinAssetDeposit(%s)", t.builtinD.String())
case t.IsERC20Deposit():
return t.erc20D.String()
return fmt.Sprintf("erc20Deposit(%s)", t.erc20D.String())
case t.IsERC20AssetList():
return t.erc20AL.String()
return fmt.Sprintf("erc20AssetList(%s)", t.erc20AL.String())
case t.IsERC20AssetLimitsUpdated():
return t.erc20AssetLimitsUpdated.String()
return fmt.Sprintf("erc20AssetLimitsUpdated(%s)", t.erc20AssetLimitsUpdated.String())
case t.IsERC20BridgeStopped():
return t.erc20BridgeStopped.String()
return fmt.Sprintf("erc20BridgeStopped(%s)", t.erc20BridgeStopped.String())
case t.IsERC20BridgeResumed():
return t.erc20BridgeResumed.String()
return fmt.Sprintf("erc20BridgeResumed(%s)", t.erc20BridgeResumed.String())
default:
return ""
}
}

func (t *assetAction) Check(ctx context.Context) error {
func (t *assetAction) Check(_ context.Context) error {
switch {
case t.IsBuiltinAssetDeposit():
return t.checkBuiltinAssetDeposit()
Expand Down Expand Up @@ -189,15 +191,15 @@ func (t *assetAction) getRef() snapshot.TxRef {
case t.IsBuiltinAssetDeposit():
return snapshot.TxRef{Asset: string(common.Builtin), BlockNr: 0, Hash: t.txHash, LogIndex: 0}
case t.IsERC20Deposit():
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex}
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex, ChainId: t.chainID}
case t.IsERC20AssetList():
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex}
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex, ChainId: t.chainID}
case t.IsERC20AssetLimitsUpdated():
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex}
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex, ChainId: t.chainID}
case t.IsERC20BridgeStopped():
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex}
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex, ChainId: t.chainID}
case t.IsERC20BridgeResumed():
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex}
return snapshot.TxRef{Asset: string(common.ERC20), BlockNr: t.blockHeight, Hash: t.txHash, LogIndex: t.logIndex, ChainId: t.chainID}
default:
return snapshot.TxRef{} // this is basically unreachable
}
Expand Down
26 changes: 8 additions & 18 deletions core/banking/bridge_stop_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,33 @@ import (
"code.vegaprotocol.io/vega/core/types"
)

func (e *Engine) BridgeStopped(
ctx context.Context,
stopped bool,
id string,
block, logIndex uint64,
ethTxHash string,
) error {
func (e *Engine) BridgeStopped(ctx context.Context, stopped bool, id string, block uint64, logIndex uint64, ethTxHash string, chainID string) error {
aa := &assetAction{
id: id,
state: newPendingState(),
erc20BridgeStopped: &types.ERC20EventBridgeStopped{BridgeStopped: stopped},
blockHeight: block,
logIndex: logIndex,
txHash: ethTxHash,
chainID: chainID,
erc20BridgeStopped: &types.ERC20EventBridgeStopped{BridgeStopped: stopped},
bridgeView: e.bridgeView,
}
e.assetActs[aa.id] = aa
e.assetActions[aa.id] = aa
return e.witness.StartCheck(aa, e.onCheckDone, e.timeService.GetTimeNow().Add(defaultValidationDuration))
}

func (e *Engine) BridgeResumed(
ctx context.Context,
resumed bool,
id string,
block, logIndex uint64,
ethTxHash string,
) error {
func (e *Engine) BridgeResumed(ctx context.Context, resumed bool, id string, block uint64, logIndex uint64, ethTxHash string, chainID string) error {
aa := &assetAction{
id: id,
state: newPendingState(),
erc20BridgeResumed: &types.ERC20EventBridgeResumed{BridgeResumed: resumed},
blockHeight: block,
logIndex: logIndex,
txHash: ethTxHash,
chainID: chainID,
bridgeView: e.bridgeView,
}
e.assetActs[aa.id] = aa
e.assetActions[aa.id] = aa
return e.witness.StartCheck(aa, e.onCheckDone, e.timeService.GetTimeNow().Add(defaultValidationDuration))
}

Expand All @@ -66,7 +56,7 @@ type bridgeState struct {
active bool
// last block + log index we received an update from the bridge
// this will be used later to verify no new state of the bridge is processed
// in a wrong orderi
// in a wrong order.
block, logIndex uint64
}

Expand Down
2 changes: 1 addition & 1 deletion core/banking/builtin_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (e *Engine) DepositBuiltinAsset(
asset: asset,
txHash: hex.EncodeToString(vgcrypto.Hash(b)),
}
e.assetActs[aa.id] = aa
e.assetActions[aa.id] = aa
e.deposits[dep.ID] = dep
return e.witness.StartCheck(aa, e.onCheckDone, e.timeService.GetTimeNow().Add(defaultValidationDuration))
}
Expand Down
39 changes: 28 additions & 11 deletions core/banking/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package banking

import (
"context"
"fmt"
"sort"
"sync/atomic"

Expand All @@ -31,6 +32,7 @@ import (
eventspb "code.vegaprotocol.io/vega/protos/vega/events/v1"

"github.com/emirpasic/gods/sets/treeset"
"go.uber.org/zap"
)

func (e *Engine) Name() types.CheckpointName {
Expand All @@ -47,13 +49,13 @@ func (e *Engine) Checkpoint() ([]byte, error) {
LastSeenEthBlock: e.lastSeenEthBlock,
}

msg.SeenRefs = make([]string, 0, e.seen.Size())
iter := e.seen.Iterator()
msg.SeenRefs = make([]string, 0, e.seenAssetActions.Size())
iter := e.seenAssetActions.Iterator()
for iter.Next() {
msg.SeenRefs = append(msg.SeenRefs, iter.Value().(string))
}

msg.AssetActions = make([]*checkpoint.AssetAction, 0, len(e.assetActs))
msg.AssetActions = make([]*checkpoint.AssetAction, 0, len(e.assetActions))
for _, aa := range e.getAssetActions() {
msg.AssetActions = append(msg.AssetActions, aa.IntoProto())
}
Expand Down Expand Up @@ -83,9 +85,9 @@ func (e *Engine) Load(ctx context.Context, data []byte) error {

e.loadBridgeState(b.BridgeState)

e.seen = treeset.NewWithStringComparator()
e.seenAssetActions = treeset.NewWithStringComparator()
for _, v := range b.SeenRefs {
e.seen.Add(v)
e.seenAssetActions.Add(v)
}

e.lastSeenEthBlock = b.LastSeenEthBlock
Expand All @@ -99,7 +101,7 @@ func (e *Engine) Load(ctx context.Context, data []byte) error {
aa = append(aa, types.AssetActionFromProto(a))
}
e.loadAssetActions(aa)
for _, aa := range e.assetActs {
for _, aa := range e.assetActions {
e.witness.StartCheck(aa, e.onCheckDone, e.timeService.GetTimeNow().Add(defaultValidationDuration))
}

Expand Down Expand Up @@ -145,6 +147,7 @@ func (e *Engine) loadAssetActions(aa []*types.AssetAction) {
asset: asset,
logIndex: v.TxIndex,
txHash: v.Hash,
chainID: v.ChainID,
builtinD: v.BuiltinD,
erc20AL: v.Erc20AL,
erc20D: v.Erc20D,
Expand All @@ -155,12 +158,18 @@ func (e *Engine) loadAssetActions(aa []*types.AssetAction) {
bridgeView: e.bridgeView,
}

e.log.Info("loadAssetActions",
zap.Any("action", fmt.Sprintf("%+v", aa)),
zap.String("ref", aa.getRef().Hash),
zap.String("chain-id", v.ChainID),
)

if len(aa.getRef().Hash) == 0 {
// if we're here it means that the IntoProto code has not done its job properly for a particular asset action type
e.log.Panic("asset action has not been serialised correct and is empty", logging.String("txHash", aa.txHash))
}

e.assetActs[v.ID] = aa
e.assetActions[v.ID] = aa
// store the deposit in the deposits
if v.BuiltinD != nil {
e.deposits[v.ID] = e.newDeposit(v.ID, v.BuiltinD.PartyID, v.BuiltinD.VegaAssetID, v.BuiltinD.Amount, v.Hash)
Expand Down Expand Up @@ -323,8 +332,8 @@ func (e *Engine) getScheduledGovernanceTransfers() []*checkpoint.ScheduledGovern
}

func (e *Engine) getAssetActions() []*types.AssetAction {
aa := make([]*types.AssetAction, 0, len(e.assetActs))
for _, v := range e.assetActs {
aa := make([]*types.AssetAction, 0, len(e.assetActions))
for _, v := range e.assetActions {
// this is optional as bridge action don't have one
var assetID string
if v.asset != nil {
Expand All @@ -341,20 +350,28 @@ func (e *Engine) getAssetActions() []*types.AssetAction {
bridgeResumed = true
}

aa = append(aa, &types.AssetAction{
action := types.AssetAction{
ID: v.id,
State: v.state.Load(),
BlockNumber: v.blockHeight,
Asset: assetID,
TxIndex: v.logIndex,
Hash: v.txHash,
ChainID: v.chainID,
BuiltinD: v.builtinD,
Erc20AL: v.erc20AL,
Erc20D: v.erc20D,
ERC20AssetLimitsUpdated: v.erc20AssetLimitsUpdated,
BridgeStopped: bridgeStopped,
BridgeResume: bridgeResumed,
})
}

e.log.Info("getAssetActions",
zap.Any("action", fmt.Sprintf("%+v", action)),
zap.String("ref", v.getRef().Hash),
)

aa = append(aa, &action)
}

sort.SliceStable(aa, func(i, j int) bool { return aa[i].ID < aa[j].ID })
Expand Down
22 changes: 6 additions & 16 deletions core/banking/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func TestDepositFinalisedAfterCheckpoint(t *testing.T) {
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

// then we call the callback from the fake erc
eng.erc.r.Check(context.Background())
eng.erc.f(eng.erc.r, true)
// then we call the callback from the fake witness
eng.witness.r.Check(context.Background())
eng.witness.f(eng.witness.r, true)

// now we take a checkpoint
cp, err := eng.Checkpoint()
Expand Down Expand Up @@ -203,13 +203,9 @@ func testSimpledScheduledTransfer(t *testing.T) {
e2.OnTick(context.Background(), time.Unix(12, 0))
}

func TestGovernancedScheduledTransfer(t *testing.T) {
func TestGovernanceScheduledTransfer(t *testing.T) {
e := getTestEngine(t)
e.assets.EXPECT().Get(gomock.Any()).AnyTimes().Return(assets.NewAsset(&mockAsset{name: assetNameETH, quantum: num.DecimalFromFloat(100)}), nil)
e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()

// let's do a massive fee, easy to test.
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
Expand All @@ -233,6 +229,7 @@ func TestGovernancedScheduledTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand All @@ -253,10 +250,6 @@ func TestGovernancedScheduledTransfer(t *testing.T) {

// progress time to when the scheduled gov transfer should be delivered on
// then trigger the time update, and see the transfer going
e2.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(12, 0)
}).Times(2)
e2.broker.EXPECT().Send(gomock.Any()).Times(2)
e2.broker.EXPECT().SendBatch(gomock.Any()).AnyTimes()
e2.col.EXPECT().GetSystemAccountBalance(gomock.Any(), gomock.Any(), gomock.Any()).Return(num.NewUint(1000), nil).AnyTimes()
Expand All @@ -276,10 +269,6 @@ func TestGovernanceRecurringTransfer(t *testing.T) {
e.assets.EXPECT().Get(gomock.Any()).AnyTimes().Return(assets.NewAsset(&mockAsset{name: assetNameETH, quantum: num.DecimalFromFloat(100)}), nil)

ctx := context.Background()
e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).Times(3)
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
e.OnTick(ctx, time.Unix(10, 0))
e.OnEpoch(ctx, types.Epoch{Seq: 0, StartTime: time.Unix(10, 0), Action: vega.EpochAction_EPOCH_ACTION_START})
Expand All @@ -301,6 +290,7 @@ func TestGovernanceRecurringTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down
Loading

0 comments on commit 7cc23f5

Please sign in to comment.