Skip to content

Commit

Permalink
Feature/aggregator txman (0xPolygonHermez#1025)
Browse files Browse the repository at this point in the history
  • Loading branch information
KonradIT authored Aug 10, 2022
1 parent 7faa8f2 commit 2f483a0
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 206 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ stop-explorer-db: ## Stops the explorer database
run: ## Runs all the services
$(RUNDB)
$(RUNL1NETWORK)
sleep 2
$(RUNZKPROVER)
sleep 5
$(RUNSEQUENCER)
Expand Down
40 changes: 12 additions & 28 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type Aggregator struct {
Ethman etherman
ProverClient proverClient
ProfitabilityChecker aggregatorTxProfitabilityChecker

lastVerifiedBatchNum uint64
batchesSent map[uint64]bool
}

// NewAggregator creates a new aggregator
Expand All @@ -56,8 +53,6 @@ func NewAggregator(
Ethman: etherman,
ProverClient: prover.NewClient(zkProverClient),
ProfitabilityChecker: profitabilityChecker,

batchesSent: make(map[uint64]bool),
}

return a, nil
Expand Down Expand Up @@ -119,15 +114,9 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
}
a.compareInputHashes(inputProver, resGetProof)

log.Infof("sending verified proof to the ethereum smart contract, batchNumber", batchToVerify.BatchNumber)
err = a.EthTxManager.VerifyBatch(batchToVerify.BatchNumber, resGetProof)
if err != nil {
log.Warnf("failed to send request to consolidate batch to ethereum, batch number: %d, err: %v",
batchToVerify.BatchNumber, err)
return
}
a.batchesSent[batchToVerify.BatchNumber] = true
log.Infof("proof for the batch was send, batchNumber: %d", batchToVerify.BatchNumber)
log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchToVerify.BatchNumber)
a.EthTxManager.VerifyBatch(batchToVerify.BatchNumber, resGetProof)
log.Infof("proof for the batch was sent, batchNumber: %d", batchToVerify.BatchNumber)
}

func (a *Aggregator) isSynced(ctx context.Context) bool {
Expand All @@ -136,26 +125,28 @@ func (a *Aggregator) isSynced(ctx context.Context) bool {
log.Warnf("failed to get last consolidated batch, err: %v", err)
return false
}
if lastVerifiedBatch != nil {
a.lastVerifiedBatchNum = lastVerifiedBatch.BatchNumber
if lastVerifiedBatch == nil {
return false
}
lastVerifiedEthBatchNum, err := a.Ethman.GetLatestVerifiedBatchNum()
if err != nil {
log.Warnf("failed to get last eth batch, err: %v", err)
return false
}
if a.lastVerifiedBatchNum < lastVerifiedEthBatchNum {
if lastVerifiedBatch.BatchNumber < lastVerifiedEthBatchNum {
log.Infof("waiting for the state to be synced, lastVerifiedBatchNum: %d, lastVerifiedEthBatchNum: %d",
a.lastVerifiedBatchNum, lastVerifiedEthBatchNum)
lastVerifiedBatch.BatchNumber, lastVerifiedEthBatchNum)
return false
}
return true
}

func (a *Aggregator) getBatchToVerify(ctx context.Context) (*state.Batch, error) {
delete(a.batchesSent, a.lastVerifiedBatchNum)

batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, a.lastVerifiedBatchNum+1, nil)
lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
if err != nil {
return nil, err
}
batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, lastVerifiedBatch.BatchNumber+1, nil)

if err != nil {
if errors.Is(err, state.ErrNotFound) {
Expand All @@ -165,13 +156,6 @@ func (a *Aggregator) getBatchToVerify(ctx context.Context) (*state.Batch, error)
log.Warnf("failed to get batch to consolidate, err: %v", err)
return nil, err
}

if a.batchesSent[batchToVerify.BatchNumber] {
log.Infof("batch with number %d was already sent, but not yet consolidated by synchronizer",
batchToVerify.BatchNumber)
return nil, nil
}

return batchToVerify, nil
}

Expand Down
77 changes: 41 additions & 36 deletions aggregator/aggregator_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,17 @@ func TestIsSyncedNotSynced(t *testing.T) {
func TestGetBatchToVerify(t *testing.T) {
st := new(aggrMocks.StateMock)
batchToVerify := &state.Batch{BatchNumber: 1}
a := Aggregator{State: st, batchesSent: make(map[uint64]bool)}
a.batchesSent[a.lastVerifiedBatchNum] = true
a := Aggregator{State: st}
ctx := context.Background()
st.On("GetVirtualBatchByNumber", ctx, a.lastVerifiedBatchNum+1, nil).Return(batchToVerify, nil)
res, err := a.getBatchToVerify(ctx)
require.NoError(t, err)
require.Equal(t, batchToVerify, res)
require.False(t, a.batchesSent[a.lastVerifiedBatchNum])
}

func TestGetBatchToVerifyBatchAlreadySent(t *testing.T) {
st := new(aggrMocks.StateMock)
batchToVerify := &state.Batch{BatchNumber: 2}
a := Aggregator{State: st, batchesSent: make(map[uint64]bool)}
a.lastVerifiedBatchNum = 1
a.batchesSent[a.lastVerifiedBatchNum+1] = true
ctx := context.Background()
st.On("GetVirtualBatchByNumber", ctx, a.lastVerifiedBatchNum+1, nil).Return(batchToVerify, nil)
verifiedBatch := &state.VerifiedBatch{BatchNumber: 1}

st.On("GetLastVerifiedBatch", ctx, nil).Return(verifiedBatch, nil)
st.On("GetVirtualBatchByNumber", ctx, verifiedBatch.BatchNumber+1, nil).Return(batchToVerify, nil)

res, err := a.getBatchToVerify(ctx)
require.NoError(t, err)
require.Nil(t, res)
require.False(t, a.batchesSent[a.lastVerifiedBatchNum])
require.Equal(t, batchToVerify, res)
}

func TestBuildInputProver(t *testing.T) {
Expand All @@ -77,11 +66,10 @@ func TestBuildInputProver(t *testing.T) {
etherman := new(aggrMocks.Etherman)
proverClient := new(aggrMocks.ProverClientMock)
a := Aggregator{
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
lastVerifiedBatchNum: 1,
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
}
var (
oldStateRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6")
Expand All @@ -96,9 +84,19 @@ func TestBuildInputProver(t *testing.T) {
LocalExitRoot: oldLocalExitRoot,
}
)
st.On("GetBatchByNumber", mock.Anything, a.lastVerifiedBatchNum, nil).Return(previousBatch, nil)

ctx := context.Background()

verifiedBatch := &state.VerifiedBatch{BatchNumber: 1}

st.On("GetLastVerifiedBatch", ctx, nil).Return(verifiedBatch, nil)
etherman.On("GetLatestVerifiedBatchNum").Return(verifiedBatch.BatchNumber, nil)

lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
require.NoError(t, err)

st.On("GetBatchByNumber", mock.Anything, lastVerifiedBatch.BatchNumber, nil).Return(previousBatch, nil)

tx := *types.NewTransaction(1, common.HexToAddress("1"), big.NewInt(1), 0, big.NewInt(1), []byte("bbb"))
batchToVerify := &state.Batch{
BatchNumber: 2,
Expand Down Expand Up @@ -145,20 +143,28 @@ func TestBuildInputProverError(t *testing.T) {
etherman := new(aggrMocks.Etherman)
proverClient := new(aggrMocks.ProverClientMock)
a := Aggregator{
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
lastVerifiedBatchNum: 1,
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
}
var (
newLocalExitRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6")
seqAddress = common.HexToAddress("0x123")
batchL2Data = []byte("data")
)
st.On("GetBatchByNumber", mock.Anything, a.lastVerifiedBatchNum, nil).Return(nil, errors.New("error"))

ctx := context.Background()

verifiedBatch := &state.VerifiedBatch{BatchNumber: 1}

st.On("GetLastVerifiedBatch", ctx, nil).Return(verifiedBatch, nil)
etherman.On("GetLatestVerifiedBatchNum").Return(verifiedBatch.BatchNumber, nil)

lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
require.NoError(t, err)

st.On("GetBatchByNumber", mock.Anything, lastVerifiedBatch.BatchNumber, nil).Return(nil, errors.New("error"))

tx := *types.NewTransaction(1, common.HexToAddress("1"), big.NewInt(1), 0, big.NewInt(1), []byte("bbb"))
batchToVerify := &state.Batch{
BatchNumber: 2,
Expand Down Expand Up @@ -189,8 +195,6 @@ func TestAggregatorFlow(t *testing.T) {
Ethman: etherman,
ProverClient: proverClient,
ProfitabilityChecker: NewTxProfitabilityCheckerAcceptAll(st, 1*time.Second),
lastVerifiedBatchNum: 1,
batchesSent: make(map[uint64]bool),
}
var (
oldStateRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6")
Expand Down Expand Up @@ -256,10 +260,12 @@ func TestAggregatorFlow(t *testing.T) {
st.On("GetLastVerifiedBatch", mock.Anything, nil).Return(verifiedBatch, nil)
etherman.On("GetLatestVerifiedBatchNum").Return(uint64(1), nil)

lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(context.Background(), nil)
require.NoError(t, err)
// get batch to verify
st.On("GetVirtualBatchByNumber", mock.Anything, a.lastVerifiedBatchNum+1, nil).Return(batchToVerify, nil)
st.On("GetVirtualBatchByNumber", mock.Anything, lastVerifiedBatch.BatchNumber+1, nil).Return(batchToVerify, nil)
// build input prover
st.On("GetBatchByNumber", mock.Anything, a.lastVerifiedBatchNum, nil).Return(previousBatch, nil)
st.On("GetBatchByNumber", mock.Anything, lastVerifiedBatch.BatchNumber, nil).Return(previousBatch, nil)
// gen proof id
proverClient.On("GetGenProofID", mock.Anything, expectedInputProver).Return("1", nil)
// get proof
Expand All @@ -270,5 +276,4 @@ func TestAggregatorFlow(t *testing.T) {
defer ticker.Stop()
ctx := context.Background()
a.tryVerifyBatch(ctx, ticker)
require.True(t, a.batchesSent[batchToVerify.BatchNumber])
}
2 changes: 1 addition & 1 deletion aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// ethTxManager contains the methods required to send txs to
// ethereum.
type ethTxManager interface {
VerifyBatch(batchNum uint64, proof *pb.GetProofResponse) error
VerifyBatch(batchNum uint64, proof *pb.GetProofResponse)
}

// etherman contains the methods required to interact with ethereum
Expand Down
13 changes: 2 additions & 11 deletions aggregator/mocks/mock_ethtxmanager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2f483a0

Please sign in to comment.