Skip to content

Commit

Permalink
Add support for multiple provers in the aggregator (0xPolygonHermez#1214
Browse files Browse the repository at this point in the history
)

* Aggregator Multiprover

* WIP

* WIP

* Finish implementation

* typo

* Delete proof generation mark in case of error
  • Loading branch information
ToniRamirezM authored Oct 5, 2022
1 parent beee9c0 commit 33fcb01
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ generate-mocks: ## Generates mocks for the tests, using mockery tool

## mocks for the aggregator tests
mockery --name=stateInterface --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go
mockery --name=proverClient --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=ProverClientMock --filename=mock_proverclient.go
mockery --name=proverClientInterface --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=ProverClientMock --filename=mock_proverclient.go
mockery --name=etherman --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=Etherman --filename=mock_etherman.go
mockery --name=ethTxManager --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=EthTxManager --filename=mock_ethtxmanager.go

Expand Down
149 changes: 136 additions & 13 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/iden3/go-iden3-crypto/keccak256"
"google.golang.org/grpc"
)

// Aggregator represents an aggregator
Expand All @@ -25,7 +26,7 @@ type Aggregator struct {
State stateInterface
EthTxManager ethTxManager
Ethman etherman
ProverClient proverClient
ProverClients []proverClientInterface
ProfitabilityChecker aggregatorTxProfitabilityChecker
}

Expand All @@ -35,7 +36,7 @@ func NewAggregator(
state stateInterface,
ethTxManager ethTxManager,
etherman etherman,
zkProverClient pb.ZKProverServiceClient,
grpcClientConns []*grpc.ClientConn,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -45,13 +46,22 @@ func NewAggregator(
profitabilityChecker = NewTxProfitabilityCheckerAcceptAll(state, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration)
}

proverClients := make([]proverClientInterface, 0, len(cfg.ProverURIs))

for _, proverURI := range cfg.ProverURIs {
proverClient := prover.NewClient(proverURI, cfg.IntervalFrequencyToGetProofGenerationState)
proverClients = append(proverClients, proverClient)
grpcClientConns = append(grpcClientConns, proverClient.Prover.Conn)
log.Infof("Connected to prover %v", proverURI)
}

a := Aggregator{
cfg: cfg,

State: state,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: prover.NewClient(zkProverClient, cfg.IntervalFrequencyToGetProofGenerationState),
ProverClients: proverClients,
ProfitabilityChecker: profitabilityChecker,
}

Expand All @@ -61,15 +71,70 @@ func NewAggregator(
// Start starts the aggregator
func (a *Aggregator) Start(ctx context.Context) {
// define those vars here, bcs it can be used in case <-a.ctx.Done()
ticker := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration)
defer ticker.Stop()
for {
a.tryVerifyBatch(ctx, ticker)
tickerVerifyBatch := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration)
tickerSendVerifiedBatch := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration)
defer tickerVerifyBatch.Stop()
defer tickerSendVerifiedBatch.Stop()

for i := 0; i < len(a.ProverClients); i++ {
go func() {
for {
a.tryVerifyBatch(ctx, tickerVerifyBatch)
}
}()
time.Sleep(time.Second)
}

go func() {
for {
a.tryToSendVerifiedBatch(ctx, tickerSendVerifiedBatch)
}
}()
// Wait until context is done
<-ctx.Done()
}

func (a *Aggregator) tryToSendVerifiedBatch(ctx context.Context, ticker *time.Ticker) {
log.Info("checking if there is any consolidated batch to be verified")
lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Warnf("failed to get last consolidated batch, err: %v", err)
waitTick(ctx, ticker)
return
} else if err == state.ErrNotFound {
log.Warn("no consolidated batch found")
waitTick(ctx, ticker)
return
}

batchNumberToVerify := lastVerifiedBatch.BatchNumber + 1

proof, err := a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil)
if err != nil && err != state.ErrNotFound {
log.Warnf("failed to get last proof for batch %v, err: %v", batchNumberToVerify, err)
waitTick(ctx, ticker)
return
}

if proof != nil {
log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchNumberToVerify)
a.EthTxManager.VerifyBatch(batchNumberToVerify, proof)
log.Infof("proof for the batch was sent, batchNumber: %v", batchNumberToVerify)
err := a.State.DeleteGeneratedProof(ctx, batchNumberToVerify, nil)
if err != nil {
log.Warnf("failed to delete generated proof for batchNumber %v, err: %v", batchNumberToVerify, err)
return
}
} else {
log.Infof("no generated proof for batchNumber %v has been found", batchNumberToVerify)
waitTick(ctx, ticker)
return
}
}

func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
log.Info("checking if network is synced")

for !a.isSynced(ctx) {
log.Infof("waiting for synchronizer to sync...")
waitTick(ctx, ticker)
Expand Down Expand Up @@ -104,16 +169,50 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
return
}

genProofID, err := a.ProverClient.GetGenProofID(ctx, inputProver)
log.Infof("sending a batch to the prover, OLDSTATEROOT: %s, NEWSTATEROOT: %s, BATCHNUM: %d",
inputProver.PublicInputs.OldStateRoot, inputProver.PublicInputs.NewStateRoot, inputProver.PublicInputs.BatchNum)

var prover proverClientInterface

// Look for a free prover
for _, prover = range a.ProverClients {
if prover.IsIdle(ctx) {
break
}
}

if prover == nil {
log.Warn("all provers are busy")
waitTick(ctx, ticker)
return
}

// Avoid other thread to process the same batch
err = a.State.AddGeneratedProof(ctx, batchToVerify.BatchNumber, nil, nil)
if err != nil {
log.Warnf("failed to store proof generation mark, err: %v", err)
waitTick(ctx, ticker)
return
}

genProofID, err := prover.GetGenProofID(ctx, inputProver)
if err != nil {
log.Warnf("failed to get gen proof id, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
}
waitTick(ctx, ticker)
return
}

resGetProof, err := a.ProverClient.GetResGetProof(ctx, genProofID, batchToVerify.BatchNumber)
resGetProof, err := prover.GetResGetProof(ctx, genProofID, batchToVerify.BatchNumber)
if err != nil {
log.Warnf("failed to get proof from prover, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
}
waitTick(ctx, ticker)
return
}
Expand All @@ -129,9 +228,17 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
resGetProof.Public.PublicInputs.NewLocalExitRoot = inputProver.PublicInputs.NewLocalExitRoot
}

log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchToVerify.BatchNumber)
a.EthTxManager.VerifyBatch(batchToVerify.BatchNumber, resGetProof)
log.Infof("proof for the batch was sent, batchNumber: %d", batchToVerify.BatchNumber)
// Store proof
err = a.State.UpdateGeneratedProof(ctx, batchToVerify.BatchNumber, resGetProof, nil)
if err != nil {
log.Warnf("failed to store generated proof, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
}
waitTick(ctx, ticker)
return
}
}

func (a *Aggregator) isSynced(ctx context.Context) bool {
Expand Down Expand Up @@ -161,8 +268,24 @@ func (a *Aggregator) getBatchToVerify(ctx context.Context) (*state.Batch, error)
if err != nil {
return nil, err
}
batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, lastVerifiedBatch.BatchNumber+1, nil)

batchNumberToVerify := lastVerifiedBatch.BatchNumber + 1

// Check if a prover is already working on this batch
_, err = a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil)
if err != nil && !errors.Is(err, state.ErrNotFound) {
return nil, err
}

for !errors.Is(err, state.ErrNotFound) {
batchNumberToVerify++
_, err = a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil)
if err != nil && !errors.Is(err, state.ErrNotFound) {
return nil, err
}
}

batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, batchNumberToVerify, nil)
if err != nil {
if errors.Is(err, state.ErrNotFound) {
log.Infof("there are no batches to consolidate")
Expand Down
18 changes: 9 additions & 9 deletions aggregator/aggregator_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func TestBuildInputProver(t *testing.T) {
etherman := new(aggrMocks.Etherman)
proverClient := new(aggrMocks.ProverClientMock)
a := Aggregator{
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClients: []proverClientInterface{proverClient},
}
var (
oldStateRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6")
Expand Down Expand Up @@ -145,10 +145,10 @@ func TestBuildInputProverError(t *testing.T) {
etherman := new(aggrMocks.Etherman)
proverClient := new(aggrMocks.ProverClientMock)
a := Aggregator{
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClients: []proverClientInterface{proverClient},
}
var (
newLocalExitRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6")
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestAggregatorFlow(t *testing.T) {
State: st,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClient: proverClient,
ProverClients: []proverClientInterface{proverClient},
ProfitabilityChecker: NewTxProfitabilityCheckerAcceptAll(st, 1*time.Second),
}
var (
Expand Down
3 changes: 3 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ type Config struct {

// ChainID is the L2 ChainID provided by the Network Config
ChainID uint64

// Array of Prover URIs read from configuration file
ProverURIs []string
}
7 changes: 6 additions & 1 deletion aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type aggregatorTxProfitabilityChecker interface {
}

// proverClient is a wrapper to the prover service
type proverClient interface {
type proverClientInterface interface {
IsIdle(ctx context.Context) bool
GetGenProofID(ctx context.Context, inputProver *pb.InputProver) (string, error)
GetResGetProof(ctx context.Context, genProofID string, batchNumber uint64) (*pb.GetProofResponse, error)
}
Expand All @@ -41,4 +42,8 @@ type stateInterface interface {
GetLastVerifiedBatch(ctx context.Context, dbTx pgx.Tx) (*state.VerifiedBatch, error)
GetVirtualBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
AddGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error
UpdateGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error
GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*pb.GetProofResponse, error)
DeleteGeneratedProof(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
}
16 changes: 15 additions & 1 deletion aggregator/mocks/mock_proverclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 33fcb01

Please sign in to comment.