From 33fcb016bf580b70cfe91666600c6781635ef86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Wed, 5 Oct 2022 12:13:07 +0200 Subject: [PATCH] Add support for multiple provers in the aggregator (#1214) * Aggregator Multiprover * WIP * WIP * Finish implementation * typo * Delete proof generation mark in case of error --- Makefile | 2 +- aggregator/aggregator.go | 149 ++++++++++++++++++++++--- aggregator/aggregator_internal_test.go | 18 +-- aggregator/config.go | 3 + aggregator/interfaces.go | 7 +- aggregator/mocks/mock_proverclient.go | 16 ++- aggregator/mocks/mock_state.go | 69 +++++++++++- aggregator/prover/client.go | 24 ++-- aggregator/prover/prover.go | 29 +++++ cmd/run.go | 33 +----- config/config.go | 2 +- config/config.local.toml | 4 +- db/migrations/state/0001.sql | 6 + docker-compose.yml | 2 +- proverclient/config.go | 6 +- state/pgstatestorage.go | 44 ++++++++ test/config/config.test.toml | 4 +- 17 files changed, 348 insertions(+), 70 deletions(-) create mode 100644 aggregator/prover/prover.go diff --git a/Makefile b/Makefile index ffe6343fc8..79085c54a4 100644 --- a/Makefile +++ b/Makefile @@ -357,7 +357,7 @@ generate-mocks: ## Generates mocks for the tests, using mockery tool ## mocks for the aggregator tests mockery --name=stateInterface --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=StateMock --filename=mock_state.go - mockery --name=proverClient --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=ProverClientMock --filename=mock_proverclient.go + mockery --name=proverClientInterface --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=ProverClientMock --filename=mock_proverclient.go mockery --name=etherman --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=Etherman --filename=mock_etherman.go mockery --name=ethTxManager --dir=aggregator --output=aggregator/mocks --outpkg=mocks --structname=EthTxManager --filename=mock_ethtxmanager.go diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 1000325bbd..3bbeeeb07b 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -16,6 +16,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" "github.com/iden3/go-iden3-crypto/keccak256" + "google.golang.org/grpc" ) // Aggregator represents an aggregator @@ -25,7 +26,7 @@ type Aggregator struct { State stateInterface EthTxManager ethTxManager Ethman etherman - ProverClient proverClient + ProverClients []proverClientInterface ProfitabilityChecker aggregatorTxProfitabilityChecker } @@ -35,7 +36,7 @@ func NewAggregator( state stateInterface, ethTxManager ethTxManager, etherman etherman, - zkProverClient pb.ZKProverServiceClient, + grpcClientConns []*grpc.ClientConn, ) (Aggregator, error) { var profitabilityChecker aggregatorTxProfitabilityChecker switch cfg.TxProfitabilityCheckerType { @@ -45,13 +46,22 @@ func NewAggregator( profitabilityChecker = NewTxProfitabilityCheckerAcceptAll(state, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration) } + proverClients := make([]proverClientInterface, 0, len(cfg.ProverURIs)) + + for _, proverURI := range cfg.ProverURIs { + proverClient := prover.NewClient(proverURI, cfg.IntervalFrequencyToGetProofGenerationState) + proverClients = append(proverClients, proverClient) + grpcClientConns = append(grpcClientConns, proverClient.Prover.Conn) + log.Infof("Connected to prover %v", proverURI) + } + a := Aggregator{ cfg: cfg, State: state, EthTxManager: ethTxManager, Ethman: etherman, - ProverClient: prover.NewClient(zkProverClient, cfg.IntervalFrequencyToGetProofGenerationState), + ProverClients: proverClients, ProfitabilityChecker: profitabilityChecker, } @@ -61,15 +71,70 @@ func NewAggregator( // Start starts the aggregator func (a *Aggregator) Start(ctx context.Context) { // define those vars here, bcs it can be used in case <-a.ctx.Done() - ticker := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration) - defer ticker.Stop() - for { - a.tryVerifyBatch(ctx, ticker) + tickerVerifyBatch := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration) + tickerSendVerifiedBatch := time.NewTicker(a.cfg.IntervalToConsolidateState.Duration) + defer tickerVerifyBatch.Stop() + defer tickerSendVerifiedBatch.Stop() + + for i := 0; i < len(a.ProverClients); i++ { + go func() { + for { + a.tryVerifyBatch(ctx, tickerVerifyBatch) + } + }() + time.Sleep(time.Second) + } + + go func() { + for { + a.tryToSendVerifiedBatch(ctx, tickerSendVerifiedBatch) + } + }() + // Wait until context is done + <-ctx.Done() +} + +func (a *Aggregator) tryToSendVerifiedBatch(ctx context.Context, ticker *time.Ticker) { + log.Info("checking if there is any consolidated batch to be verified") + lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil) + if err != nil && err != state.ErrNotFound { + log.Warnf("failed to get last consolidated batch, err: %v", err) + waitTick(ctx, ticker) + return + } else if err == state.ErrNotFound { + log.Warn("no consolidated batch found") + waitTick(ctx, ticker) + return + } + + batchNumberToVerify := lastVerifiedBatch.BatchNumber + 1 + + proof, err := a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil) + if err != nil && err != state.ErrNotFound { + log.Warnf("failed to get last proof for batch %v, err: %v", batchNumberToVerify, err) + waitTick(ctx, ticker) + return + } + + if proof != nil { + log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchNumberToVerify) + a.EthTxManager.VerifyBatch(batchNumberToVerify, proof) + log.Infof("proof for the batch was sent, batchNumber: %v", batchNumberToVerify) + err := a.State.DeleteGeneratedProof(ctx, batchNumberToVerify, nil) + if err != nil { + log.Warnf("failed to delete generated proof for batchNumber %v, err: %v", batchNumberToVerify, err) + return + } + } else { + log.Infof("no generated proof for batchNumber %v has been found", batchNumberToVerify) + waitTick(ctx, ticker) + return } } func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) { log.Info("checking if network is synced") + for !a.isSynced(ctx) { log.Infof("waiting for synchronizer to sync...") waitTick(ctx, ticker) @@ -104,16 +169,50 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) { return } - genProofID, err := a.ProverClient.GetGenProofID(ctx, inputProver) + log.Infof("sending a batch to the prover, OLDSTATEROOT: %s, NEWSTATEROOT: %s, BATCHNUM: %d", + inputProver.PublicInputs.OldStateRoot, inputProver.PublicInputs.NewStateRoot, inputProver.PublicInputs.BatchNum) + + var prover proverClientInterface + + // Look for a free prover + for _, prover = range a.ProverClients { + if prover.IsIdle(ctx) { + break + } + } + + if prover == nil { + log.Warn("all provers are busy") + waitTick(ctx, ticker) + return + } + + // Avoid other thread to process the same batch + err = a.State.AddGeneratedProof(ctx, batchToVerify.BatchNumber, nil, nil) + if err != nil { + log.Warnf("failed to store proof generation mark, err: %v", err) + waitTick(ctx, ticker) + return + } + + genProofID, err := prover.GetGenProofID(ctx, inputProver) if err != nil { log.Warnf("failed to get gen proof id, err: %v", err) + err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil) + if err2 != nil { + log.Errorf("failed to delete proof generation mark after error, err: %v", err2) + } waitTick(ctx, ticker) return } - resGetProof, err := a.ProverClient.GetResGetProof(ctx, genProofID, batchToVerify.BatchNumber) + resGetProof, err := prover.GetResGetProof(ctx, genProofID, batchToVerify.BatchNumber) if err != nil { log.Warnf("failed to get proof from prover, err: %v", err) + err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil) + if err2 != nil { + log.Errorf("failed to delete proof generation mark after error, err: %v", err2) + } waitTick(ctx, ticker) return } @@ -129,9 +228,17 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) { resGetProof.Public.PublicInputs.NewLocalExitRoot = inputProver.PublicInputs.NewLocalExitRoot } - log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchToVerify.BatchNumber) - a.EthTxManager.VerifyBatch(batchToVerify.BatchNumber, resGetProof) - log.Infof("proof for the batch was sent, batchNumber: %d", batchToVerify.BatchNumber) + // Store proof + err = a.State.UpdateGeneratedProof(ctx, batchToVerify.BatchNumber, resGetProof, nil) + if err != nil { + log.Warnf("failed to store generated proof, err: %v", err) + err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil) + if err2 != nil { + log.Errorf("failed to delete proof generation mark after error, err: %v", err2) + } + waitTick(ctx, ticker) + return + } } func (a *Aggregator) isSynced(ctx context.Context) bool { @@ -161,8 +268,24 @@ func (a *Aggregator) getBatchToVerify(ctx context.Context) (*state.Batch, error) if err != nil { return nil, err } - batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, lastVerifiedBatch.BatchNumber+1, nil) + batchNumberToVerify := lastVerifiedBatch.BatchNumber + 1 + + // Check if a prover is already working on this batch + _, err = a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil) + if err != nil && !errors.Is(err, state.ErrNotFound) { + return nil, err + } + + for !errors.Is(err, state.ErrNotFound) { + batchNumberToVerify++ + _, err = a.State.GetGeneratedProofByBatchNumber(ctx, batchNumberToVerify, nil) + if err != nil && !errors.Is(err, state.ErrNotFound) { + return nil, err + } + } + + batchToVerify, err := a.State.GetVirtualBatchByNumber(ctx, batchNumberToVerify, nil) if err != nil { if errors.Is(err, state.ErrNotFound) { log.Infof("there are no batches to consolidate") diff --git a/aggregator/aggregator_internal_test.go b/aggregator/aggregator_internal_test.go index 8fb5d37964..34f4b16b0f 100644 --- a/aggregator/aggregator_internal_test.go +++ b/aggregator/aggregator_internal_test.go @@ -66,10 +66,10 @@ func TestBuildInputProver(t *testing.T) { etherman := new(aggrMocks.Etherman) proverClient := new(aggrMocks.ProverClientMock) a := Aggregator{ - State: st, - EthTxManager: ethTxManager, - Ethman: etherman, - ProverClient: proverClient, + State: st, + EthTxManager: ethTxManager, + Ethman: etherman, + ProverClients: []proverClientInterface{proverClient}, } var ( oldStateRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6") @@ -145,10 +145,10 @@ func TestBuildInputProverError(t *testing.T) { etherman := new(aggrMocks.Etherman) proverClient := new(aggrMocks.ProverClientMock) a := Aggregator{ - State: st, - EthTxManager: ethTxManager, - Ethman: etherman, - ProverClient: proverClient, + State: st, + EthTxManager: ethTxManager, + Ethman: etherman, + ProverClients: []proverClientInterface{proverClient}, } var ( newLocalExitRoot = common.HexToHash("0xbdde84a5932a2f0a1a4c6c51f3b64ea265d4f1461749298cfdd09b31122ce0d6") @@ -196,7 +196,7 @@ func TestAggregatorFlow(t *testing.T) { State: st, EthTxManager: ethTxManager, Ethman: etherman, - ProverClient: proverClient, + ProverClients: []proverClientInterface{proverClient}, ProfitabilityChecker: NewTxProfitabilityCheckerAcceptAll(st, 1*time.Second), } var ( diff --git a/aggregator/config.go b/aggregator/config.go index 01681a49d5..9be23031e8 100644 --- a/aggregator/config.go +++ b/aggregator/config.go @@ -51,4 +51,7 @@ type Config struct { // ChainID is the L2 ChainID provided by the Network Config ChainID uint64 + + // Array of Prover URIs read from configuration file + ProverURIs []string } diff --git a/aggregator/interfaces.go b/aggregator/interfaces.go index d404669499..43111d5999 100644 --- a/aggregator/interfaces.go +++ b/aggregator/interfaces.go @@ -31,7 +31,8 @@ type aggregatorTxProfitabilityChecker interface { } // proverClient is a wrapper to the prover service -type proverClient interface { +type proverClientInterface interface { + IsIdle(ctx context.Context) bool GetGenProofID(ctx context.Context, inputProver *pb.InputProver) (string, error) GetResGetProof(ctx context.Context, genProofID string, batchNumber uint64) (*pb.GetProofResponse, error) } @@ -41,4 +42,8 @@ type stateInterface interface { GetLastVerifiedBatch(ctx context.Context, dbTx pgx.Tx) (*state.VerifiedBatch, error) GetVirtualBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) + AddGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error + UpdateGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error + GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*pb.GetProofResponse, error) + DeleteGeneratedProof(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error } diff --git a/aggregator/mocks/mock_proverclient.go b/aggregator/mocks/mock_proverclient.go index 788d14c726..54fd745fda 100644 --- a/aggregator/mocks/mock_proverclient.go +++ b/aggregator/mocks/mock_proverclient.go @@ -9,7 +9,7 @@ import ( mock "github.com/stretchr/testify/mock" ) -// ProverClientMock is an autogenerated mock type for the proverClient type +// ProverClientMock is an autogenerated mock type for the proverClientInterface type type ProverClientMock struct { mock.Mock } @@ -58,6 +58,20 @@ func (_m *ProverClientMock) GetResGetProof(ctx context.Context, genProofID strin return r0, r1 } +// IsIdle provides a mock function with given fields: ctx +func (_m *ProverClientMock) IsIdle(ctx context.Context) bool { + ret := _m.Called(ctx) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + type mockConstructorTestingTNewProverClientMock interface { mock.TestingT Cleanup(func()) diff --git a/aggregator/mocks/mock_state.go b/aggregator/mocks/mock_state.go index 517bd36bda..c09a9e8c3c 100644 --- a/aggregator/mocks/mock_state.go +++ b/aggregator/mocks/mock_state.go @@ -5,9 +5,11 @@ package mocks import ( context "context" - pgx "github.com/jackc/pgx/v4" + pb "github.com/0xPolygonHermez/zkevm-node/proverclient/pb" mock "github.com/stretchr/testify/mock" + pgx "github.com/jackc/pgx/v4" + state "github.com/0xPolygonHermez/zkevm-node/state" ) @@ -16,6 +18,34 @@ type StateMock struct { mock.Mock } +// AddGeneratedProof provides a mock function with given fields: ctx, batchNumber, proof, dbTx +func (_m *StateMock) AddGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error { + ret := _m.Called(ctx, batchNumber, proof, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, *pb.GetProofResponse, pgx.Tx) error); ok { + r0 = rf(ctx, batchNumber, proof, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteGeneratedProof provides a mock function with given fields: ctx, batchNumber, dbTx +func (_m *StateMock) DeleteGeneratedProof(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { + ret := _m.Called(ctx, batchNumber, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) error); ok { + r0 = rf(ctx, batchNumber, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // GetBatchByNumber provides a mock function with given fields: ctx, batchNumber, dbTx func (_m *StateMock) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) { ret := _m.Called(ctx, batchNumber, dbTx) @@ -39,6 +69,29 @@ func (_m *StateMock) GetBatchByNumber(ctx context.Context, batchNumber uint64, d return r0, r1 } +// GetGeneratedProofByBatchNumber provides a mock function with given fields: ctx, batchNumber, dbTx +func (_m *StateMock) GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*pb.GetProofResponse, error) { + ret := _m.Called(ctx, batchNumber, dbTx) + + var r0 *pb.GetProofResponse + if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) *pb.GetProofResponse); ok { + r0 = rf(ctx, batchNumber, dbTx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pb.GetProofResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uint64, pgx.Tx) error); ok { + r1 = rf(ctx, batchNumber, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetLastVerifiedBatch provides a mock function with given fields: ctx, dbTx func (_m *StateMock) GetLastVerifiedBatch(ctx context.Context, dbTx pgx.Tx) (*state.VerifiedBatch, error) { ret := _m.Called(ctx, dbTx) @@ -85,6 +138,20 @@ func (_m *StateMock) GetVirtualBatchByNumber(ctx context.Context, batchNumber ui return r0, r1 } +// UpdateGeneratedProof provides a mock function with given fields: ctx, batchNumber, proof, dbTx +func (_m *StateMock) UpdateGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error { + ret := _m.Called(ctx, batchNumber, proof, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, *pb.GetProofResponse, pgx.Tx) error); ok { + r0 = rf(ctx, batchNumber, proof, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type mockConstructorTestingTNewStateMock interface { mock.TestingT Cleanup(func()) diff --git a/aggregator/prover/client.go b/aggregator/prover/client.go index 9d31da4acc..d6c26c55a8 100644 --- a/aggregator/prover/client.go +++ b/aggregator/prover/client.go @@ -11,26 +11,36 @@ import ( "google.golang.org/grpc" ) -// Client wrapper for the zkprover client +// Client wrapper for the prover client type Client struct { - ZkProverClient pb.ZKProverServiceClient + Prover Prover IntervalFrequencyToGetProofGenerationState types.Duration } -// NewClient inits zkprover wrapper client -func NewClient(pc pb.ZKProverServiceClient, intervalFrequencyToGetProofGenerationState types.Duration) *Client { +// NewClient inits prover wrapper client +func NewClient(proverURI string, intervalFrequencyToGetProofGenerationState types.Duration) *Client { return &Client{ - ZkProverClient: pc, + Prover: NewProver(proverURI), IntervalFrequencyToGetProofGenerationState: intervalFrequencyToGetProofGenerationState, } } +// IsIdle indicates the prover is ready to process requests +func (c *Client) IsIdle(ctx context.Context) bool { + var opts []grpc.CallOption + status, err := c.Prover.Client.GetStatus(ctx, &pb.GetStatusRequest{}, opts...) + if err != nil || status.State != pb.GetStatusResponse_STATUS_PROVER_IDLE { + return false + } + return true +} + // GetGenProofID get id of generation proof request func (c *Client) GetGenProofID(ctx context.Context, inputProver *pb.InputProver) (string, error) { genProofRequest := pb.GenProofRequest{Input: inputProver} // init connection to the prover var opts []grpc.CallOption - resGenProof, err := c.ZkProverClient.GenProof(ctx, &genProofRequest, opts...) + resGenProof, err := c.Prover.Client.GenProof(ctx, &genProofRequest, opts...) if err != nil { return "", fmt.Errorf("failed to connect to the prover to gen proof, err: %v", err) } @@ -50,7 +60,7 @@ func (c *Client) GetResGetProof(ctx context.Context, genProofID string, batchNum resGetProof := &pb.GetProofResponse{Result: -1} getProofCtx, getProofCtxCancel := context.WithCancel(ctx) defer getProofCtxCancel() - getProofClient, err := c.ZkProverClient.GetProof(getProofCtx) + getProofClient, err := c.Prover.Client.GetProof(getProofCtx) if err != nil { return nil, fmt.Errorf("failed to init getProofClient, batchNumber: %d, err: %v", batchNumber, err) } diff --git a/aggregator/prover/prover.go b/aggregator/prover/prover.go new file mode 100644 index 0000000000..0efd2e9019 --- /dev/null +++ b/aggregator/prover/prover.go @@ -0,0 +1,29 @@ +package prover + +import ( + "github.com/0xPolygonHermez/zkevm-node/log" + proverclientpb "github.com/0xPolygonHermez/zkevm-node/proverclient/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// Prover struct +type Prover struct { + Client proverclientpb.ZKProverServiceClient + Conn *grpc.ClientConn +} + +// NewProver creates a new Prover +func NewProver(proverURI string) Prover { + opts := []grpc.DialOption{ + // TODO: once we have user and password for prover server, change this + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + proverConn, err := grpc.Dial(proverURI, opts...) + if err != nil { + log.Fatalf("fail to dial: %v", err) + } + + proverClient := proverclientpb.NewZKProverServiceClient(proverConn) + return Prover{Client: proverClient, Conn: proverConn} +} diff --git a/cmd/run.go b/cmd/run.go index d5d2d15adb..ace24790c7 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -21,8 +21,6 @@ import ( "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage" "github.com/0xPolygonHermez/zkevm-node/pricegetter" - "github.com/0xPolygonHermez/zkevm-node/proverclient" - proverclientpb "github.com/0xPolygonHermez/zkevm-node/proverclient/pb" "github.com/0xPolygonHermez/zkevm-node/sequencer" "github.com/0xPolygonHermez/zkevm-node/sequencer/broadcast" "github.com/0xPolygonHermez/zkevm-node/sequencer/broadcast/pb" @@ -34,7 +32,6 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/urfave/cli/v2" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) func start(cliCtx *cli.Context) error { @@ -73,23 +70,20 @@ func start(cliCtx *cli.Context) error { c.NetworkConfig.L2ChainID = chainID log.Infof("Chain ID read from POE SC = %v", c.NetworkConfig.L2ChainID) - - if strings.Contains(cliCtx.String(config.FlagComponents), RPC) { - etherman = nil - } } ctx := context.Background() st := newState(ctx, c, stateSqlDB) ethTxManager := ethtxmanager.New(c.EthTxManager, etherman) - proverClient, proverConn := newProverClient(c.Prover) + for _, item := range cliCtx.StringSlice(config.FlagComponents) { switch item { case AGGREGATOR: log.Info("Running aggregator") c.Aggregator.ChainID = c.NetworkConfig.L2ChainID - go runAggregator(ctx, c.Aggregator, etherman, ethTxManager, proverClient, st) + c.Aggregator.ProverURIs = c.Provers.ProverURIs + go runAggregator(ctx, c.Aggregator, etherman, ethTxManager, st, grpcClientConns) case SEQUENCER: log.Info("Running sequencer") poolInstance := createPool(c.PoolDB, c.NetworkConfig, st) @@ -114,8 +108,6 @@ func start(cliCtx *cli.Context) error { } } - grpcClientConns = append(grpcClientConns, proverConn) - waitSignal(grpcClientConns, cancelFuncs) return nil @@ -194,29 +186,14 @@ func createSequencer(c config.Config, pool *pool.Pool, state *state.State, ether return seq } -func runAggregator(ctx context.Context, c aggregator.Config, ethman *etherman.Client, ethTxManager *ethtxmanager.Client, - proverClient proverclientpb.ZKProverServiceClient, state *state.State) { - agg, err := aggregator.NewAggregator(c, state, ethTxManager, ethman, proverClient) +func runAggregator(ctx context.Context, c aggregator.Config, ethman *etherman.Client, ethTxManager *ethtxmanager.Client, state *state.State, grpcClientConns []*grpc.ClientConn) { + agg, err := aggregator.NewAggregator(c, state, ethTxManager, ethman, grpcClientConns) if err != nil { log.Fatal(err) } agg.Start(ctx) } -func newProverClient(c proverclient.Config) (proverclientpb.ZKProverServiceClient, *grpc.ClientConn) { - opts := []grpc.DialOption{ - // TODO: once we have user and password for prover server, change this - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - proverConn, err := grpc.Dial(c.ProverURI, opts...) - if err != nil { - log.Fatalf("fail to dial: %v", err) - } - - proverClient := proverclientpb.NewZKProverServiceClient(proverConn) - return proverClient, proverConn -} - func runBroadcastServer(c broadcast.ServerConfig, st *state.State) { s := grpc.NewServer() diff --git a/config/config.go b/config/config.go index 6da2cd92a7..987c199434 100644 --- a/config/config.go +++ b/config/config.go @@ -57,7 +57,7 @@ type Config struct { Sequencer sequencer.Config PriceGetter pricegetter.Config Aggregator aggregator.Config - Prover proverclient.Config + Provers proverclient.Config NetworkConfig NetworkConfig GasPriceEstimator gasprice.Config Executor executor.Config diff --git a/config/config.local.toml b/config/config.local.toml index 0ce93446d2..9eae2dfc49 100644 --- a/config/config.local.toml +++ b/config/config.local.toml @@ -84,8 +84,8 @@ TxProfitabilityMinReward = "1.1" Type = "default" DefaultGasPriceWei = 1000000000 -[Prover] -ProverURI = "zkevm-prover:50052" +[Provers] +ProverURIs = ["zkevm-prover:50052"] [MTServer] Host = "0.0.0.0" diff --git a/db/migrations/state/0001.sql b/db/migrations/state/0001.sql index f5af7fcc58..3a6ff463b7 100644 --- a/db/migrations/state/0001.sql +++ b/db/migrations/state/0001.sql @@ -115,3 +115,9 @@ CREATE TABLE state.log topic3 VARCHAR, PRIMARY KEY (tx_hash, log_index) ); + +CREATE TABLE state.proof +( + batch_num BIGINT NOT NULL PRIMARY KEY REFERENCES state.batch (batch_num) ON DELETE CASCADE, + proof jsonb +); diff --git a/docker-compose.yml b/docker-compose.yml index 3f77445ba1..19ae307419 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -233,7 +233,7 @@ services: zkevm-prover: container_name: zkevm-prover - image: hermeznetwork/zkevm-prover:develop@sha256:509fccbe361232ee360d6c95cd08124b9337f7473b335826c80b612b6692f65d + image: hermeznetwork/zkevm-prover:develop@sha256:408855e37f4751260bfb206e6aeb3b53c773a1615e8d675166b3647770700701 ports: # - 50051:50051 # Prover - 50052:50052 # Mock prover diff --git a/proverclient/config.go b/proverclient/config.go index 0d1a743b1f..e5160561f4 100644 --- a/proverclient/config.go +++ b/proverclient/config.go @@ -1,7 +1,7 @@ package proverclient -// Config represents the configuration of the prover client +// Config represents the configuration of the prover clients type Config struct { - // ProverURI URI to get access to the prover client - ProverURI string `mapstructure:"ProverURI"` + // ProverURIs URIs to get access to the prover clients + ProverURIs []string `mapstructure:"ProverURIs"` } diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index 102876b0c3..be2326ddfc 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -9,6 +9,7 @@ import ( "time" "github.com/0xPolygonHermez/zkevm-node/hex" + "github.com/0xPolygonHermez/zkevm-node/proverclient/pb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" @@ -1700,3 +1701,46 @@ func (p *PostgresStorage) GetExitRootByGlobalExitRoot(ctx context.Context, ger c exitRoot.GlobalExitRootNum = new(big.Int).SetUint64(globalNum) return &exitRoot, nil } + +// AddGeneratedProof adds a generated proof to the storage +func (p *PostgresStorage) AddGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error { + const addGeneratedProofSQL = "INSERT INTO state.proof (batch_num, proof) VALUES ($1, $2)" + e := p.getExecQuerier(dbTx) + _, err := e.Exec(ctx, addGeneratedProofSQL, batchNumber, proof) + return err +} + +// UpdateGeneratedProof updates a generated proof in the storage +func (p *PostgresStorage) UpdateGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error { + const addGeneratedProofSQL = "UPDATE state.proof SET proof = $2 WHERE batch_num = $1" + e := p.getExecQuerier(dbTx) + _, err := e.Exec(ctx, addGeneratedProofSQL, batchNumber, proof) + return err +} + +// GetGeneratedProofByBatchNumber gets a generated proof from the storage +func (p *PostgresStorage) GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*pb.GetProofResponse, error) { + var ( + proof *pb.GetProofResponse + err error + ) + + const getGeneratedProofSQL = "SELECT proof FROM state.proof WHERE batch_num = $1" + e := p.getExecQuerier(dbTx) + err = e.QueryRow(ctx, getGeneratedProofSQL, batchNumber).Scan(&proof) + if errors.Is(err, pgx.ErrNoRows) { + return nil, ErrNotFound + } else if err != nil { + return nil, err + } + + return proof, err +} + +// DeleteGeneratedProof deletes a generated proof from the storage +func (p *PostgresStorage) DeleteGeneratedProof(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { + const deleteGeneratedProofSQL = "DELETE FROM state.proof WHERE batch_num = $1" + e := p.getExecQuerier(dbTx) + _, err := e.Exec(ctx, deleteGeneratedProofSQL, batchNumber) + return err +} diff --git a/test/config/config.test.toml b/test/config/config.test.toml index c88d47d93a..a959c970a7 100644 --- a/test/config/config.test.toml +++ b/test/config/config.test.toml @@ -84,8 +84,8 @@ TxProfitabilityMinReward = "1.1" Type = "default" DefaultGasPriceWei = 1000000000 -[Prover] -ProverURI = "zkevm-prover:50052" +[Provers] +ProverURIs = ["zkevm-prover:50052"] [MTServer] Host = "0.0.0.0"