Skip to content

Commit

Permalink
Add aggLayer settlement option
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Feb 19, 2024
1 parent ead0b1a commit 6a1940a
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 85 deletions.
136 changes: 116 additions & 20 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,6 +14,9 @@ import (
"time"
"unicode"

"github.com/0xPolygon/agglayer/client"
agglayerTypes "github.com/0xPolygon/agglayer/rpc/types"
"github.com/0xPolygon/agglayer/tx"
"github.com/0xPolygonHermez/zkevm-node/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-node/aggregator/prover"
"github.com/0xPolygonHermez/zkevm-node/config/types"
Expand Down Expand Up @@ -64,6 +68,10 @@ type Aggregator struct {
srv *grpc.Server
ctx context.Context
exit context.CancelFunc

AggLayerClient client.ClientInterface
sequencerPrivateKey *ecdsa.PrivateKey
RollupID uint32
}

// New creates a new aggregator.
Expand All @@ -72,6 +80,8 @@ func New(
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
agglayerClient client.ClientInterface,
sequencerPrivateKey *ecdsa.PrivateKey,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -93,6 +103,10 @@ func New(
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,

finalProof: make(chan finalProofMsg),

AggLayerClient: agglayerClient,
sequencerPrivateKey: sequencerPrivateKey,
RollupID: cfg.RollupID,
}

return a, nil
Expand Down Expand Up @@ -266,34 +280,116 @@ func (a *Aggregator) sendFinalProof() {

log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
switch a.cfg.SettlementBackend {
case AggLayer:
if success := a.settleWithAggLayer(ctx, proof, inputs); !success {
continue
}
default:
if success := a.settleDirect(ctx, proof, inputs); !success {
continue
}
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

a.resetVerifyProofTime()
a.endProofVerification()
}
}
}

func (a *Aggregator) settleDirect(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
// add batch verification to be monitored
// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return false
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return false
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

return true
}

func (a *Aggregator) settleWithAggLayer(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x")
proofBytes := common.Hex2Bytes(proofStrNo0x)
tx := tx.Tx{
LastVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumber - 1),
NewVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumberFinal),
ZKP: tx.ZKP{
NewStateRoot: common.BytesToHash(inputs.NewStateRoot),
NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot),
Proof: agglayerTypes.ArgBytes(proofBytes),
},
RollupID: a.RollupID,
}
signedTx, err := tx.Sign(a.sequencerPrivateKey)

if err != nil {
log.Errorf("failed to sign tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Debug("final proof signedTx: ", signedTx.Tx.ZKP.Proof.Hex())
txHash, err := a.AggLayerClient.SendTx(*signedTx)
if err != nil {
log.Errorf("failed to send tx to the interop: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Infof("tx %s sent to agglayer, waiting to be mined", txHash.Hex())
log.Debugf("Timeout set to %f seconds", a.cfg.AggLayerTxTimeout.Duration.Seconds())
waitCtx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(a.cfg.AggLayerTxTimeout.Duration))
defer cancelFunc()
if err := a.AggLayerClient.WaitTxToBeMined(txHash, waitCtx); err != nil {
log.Errorf("interop didn't mine the tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

// TODO: wait for synchronizer to catch up
return true
}

func (a *Aggregator) handleFailureToSendToAggLayer(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil

err := a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("Failed updating proof state (false): %v", err)
}

a.endProofVerification()
}

func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil
Expand Down
10 changes: 5 additions & 5 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestSendFinalProof(t *testing.T) {
stateMock := mocks.NewStateMock(t)
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
a.ctx, a.exit = context.WithCancel(context.Background())
m := mox{
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestTryAggregateProofs(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -958,7 +958,7 @@ func TestTryGenerateBatchProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1235,7 +1235,7 @@ func TestTryBuildFinalProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func TestIsSynced(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down
26 changes: 26 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import (
"github.com/0xPolygonHermez/zkevm-node/encoding"
)

// SettlementBackend is the type of the settlement backend
type SettlementBackend string

const (
// AggLayer settlement backend
AggLayer SettlementBackend = "agglayer"

// L1 settlement backend
L1 SettlementBackend = "l1"
)

// TokenAmountWithDecimals is a wrapper type that parses token amount with decimals to big int
type TokenAmountWithDecimals struct {
*big.Int `validate:"required"`
Expand Down Expand Up @@ -85,4 +96,19 @@ type Config struct {
// gas offset: 100
// final gas: 1100
GasOffset uint64 `mapstructure:"GasOffset"`

// SettlementBackend configuration defines how a final ZKP should be settled. Directly to L1 or over the Beethoven service.
SettlementBackend SettlementBackend `mapstructure:"SettlementBackend"`

// AggLayerTxTimeout is the interval time to wait for a tx to be mined from the agglayer
AggLayerTxTimeout types.Duration `mapstructure:"AggLayerTxTimeout"`

// AggLayerURL url of the agglayer service
AggLayerURL string `mapstructure:"AggLayerURL"`

// SequencerPrivateKey Private key of the trusted sequencer
SequencerPrivateKey types.KeystoreFileConfig `mapstructure:"SequencerPrivateKey"`

// RollupID ID of the rollup
RollupID uint32 `mapstructure:"RollupID"`
}
20 changes: 19 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"net"
Expand All @@ -12,6 +13,7 @@ import (
"runtime"
"time"

agglayerClient "github.com/0xPolygon/agglayer/client"
dataCommitteeClient "github.com/0xPolygon/cdk-data-availability/client"
datastreamerlog "github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node"
Expand Down Expand Up @@ -430,7 +432,23 @@ func createSequenceSender(cfg config.Config, pool *pool.Pool, etmStorage *ethtxm
}

func runAggregator(ctx context.Context, c aggregator.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State) {
agg, err := aggregator.New(c, st, ethTxManager, etherman)
var (
aggCli *agglayerClient.Client
pk *ecdsa.PrivateKey
err error
)

if c.SettlementBackend == aggregator.AggLayer {
aggCli = agglayerClient.New(c.AggLayerURL)

// Load private key
pk, err = config.NewKeyFromKeystore(c.SequencerPrivateKey)
if err != nil {
log.Fatal(err)
}
}

agg, err := aggregator.New(c, st, ethTxManager, etherman, aggCli, pk)
if err != nil {
log.Fatal(err)
}
Expand Down
20 changes: 20 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package config

import (
"bytes"
"crypto/ecdsa"
"os"
"path/filepath"
"strings"

"github.com/0xPolygonHermez/zkevm-node/aggregator"
"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
Expand All @@ -21,6 +24,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/synchronizer"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -183,3 +187,19 @@ func Load(ctx *cli.Context, loadNetworkConfig bool) (*Config, error) {
}
return cfg, nil
}

// NewKeyFromKeystore creates a private key from a keystore file
func NewKeyFromKeystore(cfg types.KeystoreFileConfig) (*ecdsa.PrivateKey, error) {
if cfg.Path == "" && cfg.Password == "" {
return nil, nil
}
keystoreEncrypted, err := os.ReadFile(filepath.Clean(cfg.Path))
if err != nil {
return nil, err
}
key, err := keystore.DecryptKey(keystoreEncrypted, cfg.Password)
if err != nil {
return nil, err
}
return key.PrivateKey, nil
}
4 changes: 4 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ ProofStatePollingInterval = "5s"
CleanupLockedProofsInterval = "2m"
GeneratingProofCleanupThreshold = "10m"
GasOffset = 0
SettlementBackend = "l1"
AggLayerTxTimeout = "5m"
AggLayerURL = ""
SequencerPrivateKey = {}
[L2GasPriceSuggester]
Type = "follower"
Expand Down
Loading

0 comments on commit 6a1940a

Please sign in to comment.