diff --git a/.gitignore b/.gitignore index adfe257..f86fbf6 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ coverage.html logs.txt bin -contracts/broadcast \ No newline at end of file +contracts/broadcast +.vscode \ No newline at end of file diff --git a/Makefile b/Makefile index ef4c9f3..a87b869 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,9 @@ CHAINID=31337 STRATEGY_ADDRESS=0x7a2088a1bFc9d81c55368AE168C2C02570cB814F DEPLOYMENT_FILES_DIR=contracts/script/output/${CHAINID} +PROTOS := ./api/proto +PROTO_GEN := ./api/grpc + -----------------------------: ## ___CONTRACTS___: ## @@ -25,6 +28,22 @@ bindings: ## generates contract bindings __CLI__: ## +clean: + find $(PROTO_GEN) -name "*.pb.go" -type f | xargs rm -rf + mkdir -p $(PROTO_GEN) + +protoc: clean + protoc -I $(PROTOS) \ + --go_out=$(PROTO_GEN) \ + --go_opt=paths=source_relative \ + --go-grpc_out=$(PROTO_GEN) \ + --go-grpc_opt=paths=source_relative \ + $(PROTOS)/**/*.proto + +lint: + staticcheck ./... + golangci-lint run + build: build-operator build-aggregator build-cli build-operator: @@ -40,3 +59,4 @@ _____HELPER_____: ## mocks: ## generates mocks for tests go install go.uber.org/mock/mockgen@v0.3.0 go generate ./... + diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 7c3523c..15ff856 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -26,7 +26,7 @@ const ( // this hardcoded here because it's also hardcoded in the contracts, but should // ideally be fetched from the contracts taskChallengeWindowBlock = 100 - blockTimeSeconds = 12 * time.Second + blockTimeDuration = 12 * time.Second avsName = "mach" ) @@ -79,11 +79,14 @@ type OperatorStatus struct { type Aggregator struct { logger logging.Logger - serverIpPortAddr string - avsWriter chainio.AvsWriterer + serverIpPortAddr string + grpcServerIpPortAddr string + + avsWriter chainio.AvsWriterer service *AggregatorService legacyRpc *rpc.LegacyRpcHandler + gRpc *rpc.GRpcHandler } // NewAggregator creates a new Aggregator with the provided config. @@ -102,12 +105,20 @@ func NewAggregator(c *config.Config) (*Aggregator, error) { legacyRpc := rpc.NewLegacyRpcHandler(c.Logger, service) + var grpc_server *rpc.GRpcHandler + if c.AggregatorGRPCServerIpPortAddr != "" { + c.Logger.Infof("Create grpc server in %s", c.AggregatorGRPCServerIpPortAddr) + grpc_server = rpc.NewGRpcHandler(c.Logger, service) + } + return &Aggregator{ - logger: c.Logger, - serverIpPortAddr: c.AggregatorServerIpPortAddr, - avsWriter: avsWriter, - service: service, - legacyRpc: legacyRpc, + logger: c.Logger, + serverIpPortAddr: c.AggregatorServerIpPortAddr, + grpcServerIpPortAddr: c.AggregatorGRPCServerIpPortAddr, + avsWriter: avsWriter, + service: service, + legacyRpc: legacyRpc, + gRpc: grpc_server, }, nil } @@ -136,6 +147,10 @@ func (agg *Aggregator) Start(ctx context.Context, wg *sync.WaitGroup) error { func (agg *Aggregator) startRpcServer(ctx context.Context) { go agg.legacyRpc.StartServer(ctx, 1*time.Second, agg.serverIpPortAddr) + + if agg.gRpc != nil { + go agg.gRpc.StartServer(ctx, agg.grpcServerIpPortAddr) + } } func (agg *Aggregator) wait() { @@ -145,6 +160,10 @@ func (agg *Aggregator) wait() { agg.legacyRpc.Wait() } + if agg.gRpc != nil { + agg.gRpc.Wait() + } + agg.logger.Info("The aggregator is exited") } diff --git a/aggregator/rpc/grpc.go b/aggregator/rpc/grpc.go new file mode 100644 index 0000000..2dedd6a --- /dev/null +++ b/aggregator/rpc/grpc.go @@ -0,0 +1,110 @@ +package rpc + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/alt-research/avs/api/grpc/aggregator" + "github.com/alt-research/avs/core/message" + "google.golang.org/grpc" +) + +type GRpcHandler struct { + aggregator.UnimplementedAggregatorServer + logger logging.Logger + aggreagtor AggregatorRpcHandler + wg *sync.WaitGroup +} + +func NewGRpcHandler(logger logging.Logger, aggreagtor AggregatorRpcHandler) *GRpcHandler { + return &GRpcHandler{ + logger: logger, + aggreagtor: aggreagtor, + wg: &sync.WaitGroup{}, + } +} + +func (s *GRpcHandler) StartServer(ctx context.Context, serverIpPortAddr string) { + s.logger.Info("Start GRpcServer", "addr", serverIpPortAddr) + + lis, err := net.Listen("tcp", serverIpPortAddr) + if err != nil { + s.logger.Fatalf("GRpcServer failed to listen: %v", err) + } + + server := grpc.NewServer() + aggregator.RegisterAggregatorServer(server, s) + + serverErr := make(chan error, 1) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + serverErr <- server.Serve(lis) + }() + + select { + case <-ctx.Done(): + s.logger.Info("Stop GRpcServer by Done") + server.Stop() + case err = <-serverErr: + } + + if err != nil { + s.logger.Error("GRpcServer serve stopped by error", "err", err) + } else { + s.logger.Info("GRpcServer serve stopped") + } +} + +func (s *GRpcHandler) Wait() { + s.wg.Wait() +} + +// Send Init operator to aggregator from operator, will check if the config is matching +func (s *GRpcHandler) InitOperator(ctx context.Context, req *aggregator.InitOperatorRequest) (*aggregator.InitOperatorResponse, error) { + msg, err := message.NewInitOperatorRequest(req) + if err != nil { + return nil, fmt.Errorf("initOperator message convert error: %v", err.Error()) + } + + resp, err := s.aggreagtor.InitOperator(msg) + if err != nil { + return nil, fmt.Errorf("initOperator handler error: %v", err.Error()) + } + + return resp.ToPbType(), nil +} + +// Create a alert task +func (s *GRpcHandler) CreateTask(ctx context.Context, req *aggregator.CreateTaskRequest) (*aggregator.CreateTaskResponse, error) { + msg, err := message.NewCreateTaskRequest(req) + if err != nil { + return nil, fmt.Errorf("createTask message convert error: %v", err.Error()) + } + + resp, err := s.aggreagtor.CreateTask(msg) + if err != nil { + return nil, fmt.Errorf("createTask handler error: %v", err.Error()) + } + + return resp.ToPbType(), nil +} + +// Send signed task for alert +func (s *GRpcHandler) ProcessSignedTaskResponse(ctx context.Context, req *aggregator.SignedTaskRespRequest) (*aggregator.SignedTaskRespResponse, error) { + msg, err := message.NewSignedTaskRespRequest(req) + if err != nil { + return nil, fmt.Errorf("processSignedTaskResponse message convert error: %v", err.Error()) + } + + resp, err := s.aggreagtor.ProcessSignedTaskResponse(msg) + if err != nil { + return nil, fmt.Errorf("processSignedTaskResponse handler error: %v", err.Error()) + } + + return resp.ToPbType(), nil +} diff --git a/aggregator/rpc/handler.go b/aggregator/rpc/handler.go index c2891c9..1c698ef 100644 --- a/aggregator/rpc/handler.go +++ b/aggregator/rpc/handler.go @@ -1,20 +1,9 @@ package rpc import ( - "errors" - "github.com/alt-research/avs/core/message" ) -var ( - TaskNotFoundError400 = errors.New("400. Task not found") - OperatorNotPartOfTaskQuorum400 = errors.New("400. Operator not part of quorum") - TaskResponseDigestNotFoundError500 = errors.New("500. Failed to get task response digest") - UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature") - SignatureVerificationFailed400 = errors.New("400. Signature verification failed") - CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices") -) - type AggregatorRpcHandler interface { InitOperator(req *message.InitOperatorRequest) (*message.InitOperatorResponse, error) CreateTask(req *message.CreateTaskRequest) (*message.CreateTaskResponse, error) diff --git a/aggregator/service.go b/aggregator/service.go index c3e18d9..a154228 100644 --- a/aggregator/service.go +++ b/aggregator/service.go @@ -2,12 +2,10 @@ package aggregator import ( "context" - "errors" "fmt" "sync" "time" - "github.com/Layr-Labs/eigensdk-go/chainio/clients" sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" "github.com/Layr-Labs/eigensdk-go/logging" @@ -15,6 +13,7 @@ import ( blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation" oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys" sdktypes "github.com/Layr-Labs/eigensdk-go/types" + "github.com/alt-research/avs/aggregator/rpc" "github.com/alt-research/avs/aggregator/types" "github.com/alt-research/avs/core/chainio" @@ -23,15 +22,6 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var ( - TaskNotFoundError400 = errors.New("400. Task not found") - OperatorNotPartOfTaskQuorum400 = errors.New("400. Operator not part of quorum") - TaskResponseDigestNotFoundError500 = errors.New("500. Failed to get task response digest") - UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature") - SignatureVerificationFailed400 = errors.New("400. Signature verification failed") - CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices") -) - type AggregatorService struct { logger logging.Logger cfg *config.Config @@ -66,7 +56,7 @@ func NewAggregatorService(c *config.Config) (*AggregatorService, error) { AvsName: avsName, PromMetricsIpPortAddress: ":9090", } - clients, err := clients.BuildAll(chainioConfig, c.PrivateKey, c.Logger) + clients, err := sdkclients.BuildAll(chainioConfig, c.PrivateKey, c.Logger) if err != nil { c.Logger.Errorf("Cannot create sdk clients", "err", err) return nil, err @@ -107,14 +97,14 @@ func (agg *AggregatorService) GetTaskByIndex(taskIndex types.TaskIndex) *message agg.tasksMu.RLock() defer agg.tasksMu.RUnlock() - res, _ := agg.tasks[taskIndex] + res := agg.tasks[taskIndex] return res } func (agg *AggregatorService) newIndex() types.TaskIndex { - agg.tasksMu.Lock() - defer agg.tasksMu.Unlock() + agg.nextTaskIndexMu.Lock() + defer agg.nextTaskIndexMu.Unlock() res := agg.nextTaskIndex agg.nextTaskIndex += 1 @@ -188,7 +178,7 @@ func (agg *AggregatorService) CreateTask(req *message.CreateTaskRequest) (*messa finished := agg.GetFinishedTaskByAlertHash(req.AlertHash) if finished != nil { - return nil, fmt.Errorf("The task 0x%x already finished: 0x%x", req.AlertHash, finished.TxHash) + return nil, fmt.Errorf("the task 0x%x already finished: 0x%x", req.AlertHash, finished.TxHash) } task := agg.GetTaskByAlertHash(req.AlertHash) @@ -295,15 +285,19 @@ func (agg *AggregatorService) sendNewTask(alertHash [32]byte, taskIndex types.Ta // TODO(samlaf): we use seconds for now, but we should ideally pass a blocknumber to the blsAggregationService // and it should monitor the chain and only expire the task aggregation once the chain has reached that block number. - taskTimeToExpiry := taskChallengeWindowBlock * blockTimeSeconds + taskTimeToExpiry := taskChallengeWindowBlock * blockTimeDuration agg.logger.Infof("InitializeNewTask %v %v", taskIndex, taskTimeToExpiry) - agg.blsAggregationService.InitializeNewTask( + err = agg.blsAggregationService.InitializeNewTask( taskIndex, uint32(newAlertTask.ReferenceBlockNumber), newAlertTask.QuorumNumbers, newAlertTask.QuorumThresholdPercentages, taskTimeToExpiry, ) + if err != nil { + agg.logger.Error("InitializeNewTask failed", "err", err) + return nil, err + } return newAlertTask, nil } diff --git a/api/grpc/aggregator/aggregator.pb.go b/api/grpc/aggregator/aggregator.pb.go new file mode 100644 index 0000000..69e37ec --- /dev/null +++ b/api/grpc/aggregator/aggregator.pb.go @@ -0,0 +1,712 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.12.4 +// source: aggregator/aggregator.proto + +package aggregator + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type InitOperatorRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The layer1 chain id for operator to use + Layer1ChainId uint32 `protobuf:"varint,1,opt,name=layer1_chain_id,json=layer1ChainId,proto3" json:"layer1_chain_id,omitempty"` + // The layer2 chain id for operator to use + ChainId uint32 `protobuf:"varint,2,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + // The operator 's id + OperatorId []byte `protobuf:"bytes,3,opt,name=operator_id,json=operatorId,proto3" json:"operator_id,omitempty"` + // The operator 's ecdsa address + OperatorAddress string `protobuf:"bytes,4,opt,name=operator_address,json=operatorAddress,proto3" json:"operator_address,omitempty"` + // The operator_state_retriever_addr + OperatorStateRetrieverAddr string `protobuf:"bytes,5,opt,name=operator_state_retriever_addr,json=operatorStateRetrieverAddr,proto3" json:"operator_state_retriever_addr,omitempty"` + // The registry_coordinator_addr + RegistryCoordinatorAddr string `protobuf:"bytes,6,opt,name=registry_coordinator_addr,json=registryCoordinatorAddr,proto3" json:"registry_coordinator_addr,omitempty"` +} + +func (x *InitOperatorRequest) Reset() { + *x = InitOperatorRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InitOperatorRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InitOperatorRequest) ProtoMessage() {} + +func (x *InitOperatorRequest) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InitOperatorRequest.ProtoReflect.Descriptor instead. +func (*InitOperatorRequest) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{0} +} + +func (x *InitOperatorRequest) GetLayer1ChainId() uint32 { + if x != nil { + return x.Layer1ChainId + } + return 0 +} + +func (x *InitOperatorRequest) GetChainId() uint32 { + if x != nil { + return x.ChainId + } + return 0 +} + +func (x *InitOperatorRequest) GetOperatorId() []byte { + if x != nil { + return x.OperatorId + } + return nil +} + +func (x *InitOperatorRequest) GetOperatorAddress() string { + if x != nil { + return x.OperatorAddress + } + return "" +} + +func (x *InitOperatorRequest) GetOperatorStateRetrieverAddr() string { + if x != nil { + return x.OperatorStateRetrieverAddr + } + return "" +} + +func (x *InitOperatorRequest) GetRegistryCoordinatorAddr() string { + if x != nil { + return x.RegistryCoordinatorAddr + } + return "" +} + +type InitOperatorResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // If the operator 's state is ok + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` + // Reason + Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"` +} + +func (x *InitOperatorResponse) Reset() { + *x = InitOperatorResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InitOperatorResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InitOperatorResponse) ProtoMessage() {} + +func (x *InitOperatorResponse) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InitOperatorResponse.ProtoReflect.Descriptor instead. +func (*InitOperatorResponse) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{1} +} + +func (x *InitOperatorResponse) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +func (x *InitOperatorResponse) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +type CreateTaskRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The hash of alert + AlertHash []byte `protobuf:"bytes,1,opt,name=alert_hash,json=alertHash,proto3" json:"alert_hash,omitempty"` +} + +func (x *CreateTaskRequest) Reset() { + *x = CreateTaskRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateTaskRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateTaskRequest) ProtoMessage() {} + +func (x *CreateTaskRequest) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateTaskRequest.ProtoReflect.Descriptor instead. +func (*CreateTaskRequest) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{2} +} + +func (x *CreateTaskRequest) GetAlertHash() []byte { + if x != nil { + return x.AlertHash + } + return nil +} + +type CreateTaskResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The info of alert + Info *AlertTaskInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` +} + +func (x *CreateTaskResponse) Reset() { + *x = CreateTaskResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateTaskResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateTaskResponse) ProtoMessage() {} + +func (x *CreateTaskResponse) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateTaskResponse.ProtoReflect.Descriptor instead. +func (*CreateTaskResponse) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{3} +} + +func (x *CreateTaskResponse) GetInfo() *AlertTaskInfo { + if x != nil { + return x.Info + } + return nil +} + +type SignedTaskRespRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The alert + Alert *AlertTaskInfo `protobuf:"bytes,1,opt,name=alert,proto3" json:"alert,omitempty"` + // The operator's BLS signature signed on the keccak256 hash + OperatorRequestSignature []byte `protobuf:"bytes,2,opt,name=operator_request_signature,json=operatorRequestSignature,proto3" json:"operator_request_signature,omitempty"` + // The operator 's id + OperatorId []byte `protobuf:"bytes,3,opt,name=operator_id,json=operatorId,proto3" json:"operator_id,omitempty"` +} + +func (x *SignedTaskRespRequest) Reset() { + *x = SignedTaskRespRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SignedTaskRespRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SignedTaskRespRequest) ProtoMessage() {} + +func (x *SignedTaskRespRequest) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SignedTaskRespRequest.ProtoReflect.Descriptor instead. +func (*SignedTaskRespRequest) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{4} +} + +func (x *SignedTaskRespRequest) GetAlert() *AlertTaskInfo { + if x != nil { + return x.Alert + } + return nil +} + +func (x *SignedTaskRespRequest) GetOperatorRequestSignature() []byte { + if x != nil { + return x.OperatorRequestSignature + } + return nil +} + +func (x *SignedTaskRespRequest) GetOperatorId() []byte { + if x != nil { + return x.OperatorId + } + return nil +} + +type SignedTaskRespResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // If need reply + Reply bool `protobuf:"varint,1,opt,name=reply,proto3" json:"reply,omitempty"` + // The tx hash of send + TxHash []byte `protobuf:"bytes,2,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` +} + +func (x *SignedTaskRespResponse) Reset() { + *x = SignedTaskRespResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SignedTaskRespResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SignedTaskRespResponse) ProtoMessage() {} + +func (x *SignedTaskRespResponse) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SignedTaskRespResponse.ProtoReflect.Descriptor instead. +func (*SignedTaskRespResponse) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{5} +} + +func (x *SignedTaskRespResponse) GetReply() bool { + if x != nil { + return x.Reply + } + return false +} + +func (x *SignedTaskRespResponse) GetTxHash() []byte { + if x != nil { + return x.TxHash + } + return nil +} + +type AlertTaskInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The hash of alert + AlertHash []byte `protobuf:"bytes,1,opt,name=alert_hash,json=alertHash,proto3" json:"alert_hash,omitempty"` + // QuorumNumbers of task + QuorumNumbers []byte `protobuf:"bytes,2,opt,name=quorum_numbers,json=quorumNumbers,proto3" json:"quorum_numbers,omitempty"` + // QuorumThresholdPercentages of task + QuorumThresholdPercentages []byte `protobuf:"bytes,3,opt,name=quorum_threshold_percentages,json=quorumThresholdPercentages,proto3" json:"quorum_threshold_percentages,omitempty"` + // TaskIndex + TaskIndex uint32 `protobuf:"varint,4,opt,name=task_index,json=taskIndex,proto3" json:"task_index,omitempty"` + // ReferenceBlockNumber + ReferenceBlockNumber uint64 `protobuf:"varint,5,opt,name=reference_block_number,json=referenceBlockNumber,proto3" json:"reference_block_number,omitempty"` +} + +func (x *AlertTaskInfo) Reset() { + *x = AlertTaskInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_aggregator_aggregator_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AlertTaskInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AlertTaskInfo) ProtoMessage() {} + +func (x *AlertTaskInfo) ProtoReflect() protoreflect.Message { + mi := &file_aggregator_aggregator_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AlertTaskInfo.ProtoReflect.Descriptor instead. +func (*AlertTaskInfo) Descriptor() ([]byte, []int) { + return file_aggregator_aggregator_proto_rawDescGZIP(), []int{6} +} + +func (x *AlertTaskInfo) GetAlertHash() []byte { + if x != nil { + return x.AlertHash + } + return nil +} + +func (x *AlertTaskInfo) GetQuorumNumbers() []byte { + if x != nil { + return x.QuorumNumbers + } + return nil +} + +func (x *AlertTaskInfo) GetQuorumThresholdPercentages() []byte { + if x != nil { + return x.QuorumThresholdPercentages + } + return nil +} + +func (x *AlertTaskInfo) GetTaskIndex() uint32 { + if x != nil { + return x.TaskIndex + } + return 0 +} + +func (x *AlertTaskInfo) GetReferenceBlockNumber() uint64 { + if x != nil { + return x.ReferenceBlockNumber + } + return 0 +} + +var File_aggregator_aggregator_proto protoreflect.FileDescriptor + +var file_aggregator_aggregator_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x67, 0x67, + 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x61, + 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x22, 0xa3, 0x02, 0x0a, 0x13, 0x49, 0x6e, + 0x69, 0x74, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x26, 0x0a, 0x0f, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x31, 0x5f, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x6c, 0x61, 0x79, 0x65, + 0x72, 0x31, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, + 0x74, 0x6f, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, + 0x12, 0x41, 0x0a, 0x1d, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x65, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x5f, 0x61, 0x64, 0x64, + 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x41, + 0x64, 0x64, 0x72, 0x12, 0x3a, 0x0a, 0x19, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x5f, + 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x61, 0x64, 0x64, 0x72, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, + 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x41, 0x64, 0x64, 0x72, 0x22, + 0x3e, 0x0a, 0x14, 0x49, 0x6e, 0x69, 0x74, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, + 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, + 0x32, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x5f, 0x68, 0x61, + 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x48, + 0x61, 0x73, 0x68, 0x22, 0x43, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x61, 0x73, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x04, 0x69, 0x6e, 0x66, + 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, + 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x6e, + 0x66, 0x6f, 0x52, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0xa7, 0x01, 0x0a, 0x15, 0x53, 0x69, 0x67, + 0x6e, 0x65, 0x64, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x05, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x19, 0x2e, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x41, + 0x6c, 0x65, 0x72, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x61, 0x6c, + 0x65, 0x72, 0x74, 0x12, 0x3c, 0x0a, 0x1a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x5f, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x18, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x69, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, + 0x49, 0x64, 0x22, 0x47, 0x0a, 0x16, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x54, 0x61, 0x73, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x72, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x70, + 0x6c, 0x79, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x22, 0xec, 0x01, 0x0a, 0x0d, + 0x41, 0x6c, 0x65, 0x72, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1d, 0x0a, + 0x0a, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x09, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x48, 0x61, 0x73, 0x68, 0x12, 0x25, 0x0a, 0x0e, + 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x4e, 0x75, 0x6d, 0x62, + 0x65, 0x72, 0x73, 0x12, 0x40, 0x0a, 0x1c, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x5f, 0x74, 0x68, + 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x5f, 0x70, 0x65, 0x72, 0x63, 0x65, 0x6e, 0x74, 0x61, + 0x67, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x1a, 0x71, 0x75, 0x6f, 0x72, 0x75, + 0x6d, 0x54, 0x68, 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x50, 0x65, 0x72, 0x63, 0x65, 0x6e, + 0x74, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x6e, + 0x64, 0x65, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x49, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x34, 0x0a, 0x16, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, + 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x14, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x32, 0x96, 0x02, 0x0a, 0x0a, 0x41, + 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x53, 0x0a, 0x0c, 0x49, 0x6e, 0x69, + 0x74, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x2e, 0x61, 0x67, 0x67, 0x72, + 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4f, 0x70, 0x65, 0x72, 0x61, + 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x61, 0x67, 0x67, + 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4f, 0x70, 0x65, 0x72, + 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4d, + 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1d, 0x2e, 0x61, + 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x61, 0x67, + 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, + 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x64, 0x0a, + 0x19, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x2e, 0x61, 0x67, 0x67, + 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, + 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x65, + 0x64, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x61, 0x6c, 0x74, 0x2d, 0x72, 0x65, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x61, + 0x76, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x67, 0x72, + 0x65, 0x67, 0x61, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_aggregator_aggregator_proto_rawDescOnce sync.Once + file_aggregator_aggregator_proto_rawDescData = file_aggregator_aggregator_proto_rawDesc +) + +func file_aggregator_aggregator_proto_rawDescGZIP() []byte { + file_aggregator_aggregator_proto_rawDescOnce.Do(func() { + file_aggregator_aggregator_proto_rawDescData = protoimpl.X.CompressGZIP(file_aggregator_aggregator_proto_rawDescData) + }) + return file_aggregator_aggregator_proto_rawDescData +} + +var file_aggregator_aggregator_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_aggregator_aggregator_proto_goTypes = []interface{}{ + (*InitOperatorRequest)(nil), // 0: aggregator.InitOperatorRequest + (*InitOperatorResponse)(nil), // 1: aggregator.InitOperatorResponse + (*CreateTaskRequest)(nil), // 2: aggregator.CreateTaskRequest + (*CreateTaskResponse)(nil), // 3: aggregator.CreateTaskResponse + (*SignedTaskRespRequest)(nil), // 4: aggregator.SignedTaskRespRequest + (*SignedTaskRespResponse)(nil), // 5: aggregator.SignedTaskRespResponse + (*AlertTaskInfo)(nil), // 6: aggregator.AlertTaskInfo +} +var file_aggregator_aggregator_proto_depIdxs = []int32{ + 6, // 0: aggregator.CreateTaskResponse.info:type_name -> aggregator.AlertTaskInfo + 6, // 1: aggregator.SignedTaskRespRequest.alert:type_name -> aggregator.AlertTaskInfo + 0, // 2: aggregator.Aggregator.InitOperator:input_type -> aggregator.InitOperatorRequest + 2, // 3: aggregator.Aggregator.CreateTask:input_type -> aggregator.CreateTaskRequest + 4, // 4: aggregator.Aggregator.ProcessSignedTaskResponse:input_type -> aggregator.SignedTaskRespRequest + 1, // 5: aggregator.Aggregator.InitOperator:output_type -> aggregator.InitOperatorResponse + 3, // 6: aggregator.Aggregator.CreateTask:output_type -> aggregator.CreateTaskResponse + 5, // 7: aggregator.Aggregator.ProcessSignedTaskResponse:output_type -> aggregator.SignedTaskRespResponse + 5, // [5:8] is the sub-list for method output_type + 2, // [2:5] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_aggregator_aggregator_proto_init() } +func file_aggregator_aggregator_proto_init() { + if File_aggregator_aggregator_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_aggregator_aggregator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InitOperatorRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InitOperatorResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateTaskRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateTaskResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SignedTaskRespRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SignedTaskRespResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_aggregator_aggregator_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AlertTaskInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_aggregator_aggregator_proto_rawDesc, + NumEnums: 0, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_aggregator_aggregator_proto_goTypes, + DependencyIndexes: file_aggregator_aggregator_proto_depIdxs, + MessageInfos: file_aggregator_aggregator_proto_msgTypes, + }.Build() + File_aggregator_aggregator_proto = out.File + file_aggregator_aggregator_proto_rawDesc = nil + file_aggregator_aggregator_proto_goTypes = nil + file_aggregator_aggregator_proto_depIdxs = nil +} diff --git a/api/grpc/aggregator/aggregator_grpc.pb.go b/api/grpc/aggregator/aggregator_grpc.pb.go new file mode 100644 index 0000000..fd97c42 --- /dev/null +++ b/api/grpc/aggregator/aggregator_grpc.pb.go @@ -0,0 +1,183 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.12.4 +// source: aggregator/aggregator.proto + +package aggregator + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// AggregatorClient is the client API for Aggregator service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type AggregatorClient interface { + // Send Init operator to aggregator from operator, will check if the config is matching + InitOperator(ctx context.Context, in *InitOperatorRequest, opts ...grpc.CallOption) (*InitOperatorResponse, error) + // Create a alert task + CreateTask(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) + // Send signed task for alert + ProcessSignedTaskResponse(ctx context.Context, in *SignedTaskRespRequest, opts ...grpc.CallOption) (*SignedTaskRespResponse, error) +} + +type aggregatorClient struct { + cc grpc.ClientConnInterface +} + +func NewAggregatorClient(cc grpc.ClientConnInterface) AggregatorClient { + return &aggregatorClient{cc} +} + +func (c *aggregatorClient) InitOperator(ctx context.Context, in *InitOperatorRequest, opts ...grpc.CallOption) (*InitOperatorResponse, error) { + out := new(InitOperatorResponse) + err := c.cc.Invoke(ctx, "/aggregator.Aggregator/InitOperator", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *aggregatorClient) CreateTask(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) { + out := new(CreateTaskResponse) + err := c.cc.Invoke(ctx, "/aggregator.Aggregator/CreateTask", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *aggregatorClient) ProcessSignedTaskResponse(ctx context.Context, in *SignedTaskRespRequest, opts ...grpc.CallOption) (*SignedTaskRespResponse, error) { + out := new(SignedTaskRespResponse) + err := c.cc.Invoke(ctx, "/aggregator.Aggregator/ProcessSignedTaskResponse", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AggregatorServer is the server API for Aggregator service. +// All implementations must embed UnimplementedAggregatorServer +// for forward compatibility +type AggregatorServer interface { + // Send Init operator to aggregator from operator, will check if the config is matching + InitOperator(context.Context, *InitOperatorRequest) (*InitOperatorResponse, error) + // Create a alert task + CreateTask(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error) + // Send signed task for alert + ProcessSignedTaskResponse(context.Context, *SignedTaskRespRequest) (*SignedTaskRespResponse, error) + mustEmbedUnimplementedAggregatorServer() +} + +// UnimplementedAggregatorServer must be embedded to have forward compatible implementations. +type UnimplementedAggregatorServer struct { +} + +func (UnimplementedAggregatorServer) InitOperator(context.Context, *InitOperatorRequest) (*InitOperatorResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method InitOperator not implemented") +} +func (UnimplementedAggregatorServer) CreateTask(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateTask not implemented") +} +func (UnimplementedAggregatorServer) ProcessSignedTaskResponse(context.Context, *SignedTaskRespRequest) (*SignedTaskRespResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ProcessSignedTaskResponse not implemented") +} +func (UnimplementedAggregatorServer) mustEmbedUnimplementedAggregatorServer() {} + +// UnsafeAggregatorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AggregatorServer will +// result in compilation errors. +type UnsafeAggregatorServer interface { + mustEmbedUnimplementedAggregatorServer() +} + +func RegisterAggregatorServer(s grpc.ServiceRegistrar, srv AggregatorServer) { + s.RegisterService(&Aggregator_ServiceDesc, srv) +} + +func _Aggregator_InitOperator_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InitOperatorRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AggregatorServer).InitOperator(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/aggregator.Aggregator/InitOperator", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AggregatorServer).InitOperator(ctx, req.(*InitOperatorRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Aggregator_CreateTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateTaskRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AggregatorServer).CreateTask(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/aggregator.Aggregator/CreateTask", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AggregatorServer).CreateTask(ctx, req.(*CreateTaskRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Aggregator_ProcessSignedTaskResponse_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SignedTaskRespRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AggregatorServer).ProcessSignedTaskResponse(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/aggregator.Aggregator/ProcessSignedTaskResponse", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AggregatorServer).ProcessSignedTaskResponse(ctx, req.(*SignedTaskRespRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Aggregator_ServiceDesc is the grpc.ServiceDesc for Aggregator service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Aggregator_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "aggregator.Aggregator", + HandlerType: (*AggregatorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "InitOperator", + Handler: _Aggregator_InitOperator_Handler, + }, + { + MethodName: "CreateTask", + Handler: _Aggregator_CreateTask_Handler, + }, + { + MethodName: "ProcessSignedTaskResponse", + Handler: _Aggregator_ProcessSignedTaskResponse_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "aggregator/aggregator.proto", +} diff --git a/api/proto/aggregator/aggregator.proto b/api/proto/aggregator/aggregator.proto new file mode 100644 index 0000000..fcaedf7 --- /dev/null +++ b/api/proto/aggregator/aggregator.proto @@ -0,0 +1,76 @@ +syntax = "proto3"; + +option go_package = "github.com/alt-research/avs/api/grpc/aggregator"; +package aggregator; + +// The Aggregator is for collection the bls signatures from operator and +// then commit the task to AVS. +service Aggregator { + // Send Init operator to aggregator from operator, will check if the config is matching + rpc InitOperator(InitOperatorRequest) returns (InitOperatorResponse) {} + // Create a alert task + rpc CreateTask(CreateTaskRequest) returns (CreateTaskResponse) {} + // Send signed task for alert + rpc ProcessSignedTaskResponse(SignedTaskRespRequest) returns (SignedTaskRespResponse) {} +} + +message InitOperatorRequest { + // The layer1 chain id for operator to use + uint32 layer1_chain_id = 1; + // The layer2 chain id for operator to use + uint32 chain_id = 2; + // The operator 's id + bytes operator_id = 3; + // The operator 's ecdsa address + string operator_address = 4; + // The operator_state_retriever_addr + string operator_state_retriever_addr = 5; + // The registry_coordinator_addr + string registry_coordinator_addr = 6; +} + +message InitOperatorResponse { + // If the operator 's state is ok + bool ok = 1; + // Reason + string reason = 2; +} + +message CreateTaskRequest { + // The hash of alert + bytes alert_hash = 1; +} + +message CreateTaskResponse { + // The info of alert + AlertTaskInfo info = 1; +} + +message SignedTaskRespRequest { + // The alert + AlertTaskInfo alert = 1; + // The operator's BLS signature signed on the keccak256 hash + bytes operator_request_signature = 2; + // The operator 's id + bytes operator_id = 3; +} + +message SignedTaskRespResponse { + // If need reply + bool reply = 1; + // The tx hash of send + bytes tx_hash = 2; +} + +message AlertTaskInfo { + // The hash of alert + bytes alert_hash = 1; + // QuorumNumbers of task + bytes quorum_numbers = 2; + // QuorumThresholdPercentages of task + bytes quorum_threshold_percentages = 3; + // TaskIndex + uint32 task_index = 4; + // ReferenceBlockNumber + uint64 reference_block_number = 5; +} diff --git a/config-files/aggregator.yaml b/config-files/aggregator.yaml index dd7d56b..d33f81c 100644 --- a/config-files/aggregator.yaml +++ b/config-files/aggregator.yaml @@ -1,13 +1,16 @@ # 'production' only prints info and above. 'development' also prints debug -environment: production +environment: development eth_rpc_url: http://localhost:8545 eth_ws_url: ws://localhost:8545 # address which the aggregator listens on for operator signed messages aggregator_server_ip_port_address: localhost:8090 +# address which the aggregator grpc listens on for operator signed messages +aggregator_grpc_server_ip_port_address: localhost:8190 + # the layer1 chain id the avs contracts in layer1_chain_id: 31337 # the layer2 chain id -layer2_chain_id: 20240219 +layer2_chain_id: 0 diff --git a/config-files/operator.yaml b/config-files/operator.yaml index 2490a63..6a3b6b2 100644 --- a/config-files/operator.yaml +++ b/config-files/operator.yaml @@ -21,6 +21,9 @@ bls_private_key_store_path: ./config-files/key/test1.bls.key.json aggregator_server_ip_port_address: localhost:8090 +# address which the aggregator grpc listens on for operator signed messages +aggregator_grpc_server_ip_port_address: localhost:8190 + # avs node spec compliance https://eigen.nethermind.io/docs/spec/intro eigen_metrics_ip_port_address: localhost:9090 enable_metrics: true @@ -33,4 +36,10 @@ enable_node_api: true avs_registry_coordinator_address: 0xa82fF9aFd8f496c3d6ac40E2a0F282E47488CFc9 operator_state_retriever_address: 0x0E801D84Fa97b50751Dbf25036d067dCf18858bF -operator_server_ip_port_addr: localhost:8091 \ No newline at end of file +operator_server_ip_port_addr: localhost:8091 + +# the layer1 chain id the avs contracts in +layer1_chain_id: 31337 + +# the layer2 chain id +layer2_chain_id: 0 diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 30cf3cc..cbd68c1 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -30,7 +30,6 @@ type AvsWriter struct { AvsContractBindings *AvsManagersBindings logger logging.Logger TxMgr txmgr.TxManager - client eth.Client } var _ AvsWriterer = (*AvsWriter)(nil) diff --git a/core/config/avs_config.go b/core/config/avs_config.go index 59d53e3..cd6bdf9 100644 --- a/core/config/avs_config.go +++ b/core/config/avs_config.go @@ -2,21 +2,22 @@ package config type NodeConfig struct { // used to set the logger level (true = info, false = debug) - Production bool `yaml:"production"` - OperatorStateRetrieverAddress string `yaml:"operator_state_retriever_address"` - AVSRegistryCoordinatorAddress string `yaml:"avs_registry_coordinator_address"` - EthRpcUrl string `yaml:"eth_rpc_url"` - EthWsUrl string `yaml:"eth_ws_url"` - BlsPrivateKeyStorePath string `yaml:"bls_private_key_store_path"` - EcdsaPrivateKeyStorePath string `yaml:"ecdsa_private_key_store_path"` - AggregatorServerIpPortAddress string `yaml:"aggregator_server_ip_port_address"` - EigenMetricsIpPortAddress string `yaml:"eigen_metrics_ip_port_address"` - EnableMetrics bool `yaml:"enable_metrics"` - NodeApiIpPortAddress string `yaml:"node_api_ip_port_address"` - EnableNodeApi bool `yaml:"enable_node_api"` - OperatorServerIpPortAddr string `yaml:"operator_server_ip_port_addr"` - MetadataURI string `yaml:"metadata_uri"` - OperatorSocket string `yaml:"operator_socket"` - Layer1ChainId uint32 `yaml:"layer1_chain_id"` - Layer2ChainId uint32 `yaml:"layer2_chain_id"` + Production bool `yaml:"production"` + OperatorStateRetrieverAddress string `yaml:"operator_state_retriever_address"` + AVSRegistryCoordinatorAddress string `yaml:"avs_registry_coordinator_address"` + EthRpcUrl string `yaml:"eth_rpc_url"` + EthWsUrl string `yaml:"eth_ws_url"` + BlsPrivateKeyStorePath string `yaml:"bls_private_key_store_path"` + EcdsaPrivateKeyStorePath string `yaml:"ecdsa_private_key_store_path"` + AggregatorServerIpPortAddress string `yaml:"aggregator_server_ip_port_address"` + AggregatorGRPCServerIpPortAddress string `yaml:"aggregator_grpc_server_ip_port_address"` + EigenMetricsIpPortAddress string `yaml:"eigen_metrics_ip_port_address"` + EnableMetrics bool `yaml:"enable_metrics"` + NodeApiIpPortAddress string `yaml:"node_api_ip_port_address"` + EnableNodeApi bool `yaml:"enable_node_api"` + OperatorServerIpPortAddr string `yaml:"operator_server_ip_port_addr"` + MetadataURI string `yaml:"metadata_uri"` + OperatorSocket string `yaml:"operator_socket"` + Layer1ChainId uint32 `yaml:"layer1_chain_id"` + Layer2ChainId uint32 `yaml:"layer2_chain_id"` } diff --git a/core/config/config.go b/core/config/config.go index 75ce106..afb16ba 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -31,15 +31,16 @@ type Config struct { EigenMetricsIpPortAddress string // we need the url for the eigensdk currently... eventually standardize api so as to // only take an ethclient or an rpcUrl (and build the ethclient at each constructor site) - EthHttpRpcUrl string - EthWsRpcUrl string - EthHttpClient eth.Client - EthWsClient eth.Client - OperatorStateRetrieverAddr common.Address - RegistryCoordinatorAddr common.Address - AggregatorServerIpPortAddr string - Layer1ChainId uint32 - Layer2ChainId uint32 + EthHttpRpcUrl string + EthWsRpcUrl string + EthHttpClient eth.Client + EthWsClient eth.Client + OperatorStateRetrieverAddr common.Address + RegistryCoordinatorAddr common.Address + AggregatorServerIpPortAddr string + AggregatorGRPCServerIpPortAddr string + Layer1ChainId uint32 + Layer2ChainId uint32 // json:"-" skips this field when marshaling (only used for logging to stdout), since SignerFn doesnt implement marshalJson SignerFn signerv2.SignerFn `json:"-"` PrivateKey *ecdsa.PrivateKey `json:"-"` @@ -49,12 +50,13 @@ type Config struct { // These are read from ConfigFileFlag type ConfigRaw struct { - Environment sdklogging.LogLevel `yaml:"environment"` - EthRpcUrl string `yaml:"eth_rpc_url"` - EthWsUrl string `yaml:"eth_ws_url"` - AggregatorServerIpPortAddr string `yaml:"aggregator_server_ip_port_address"` - Layer1ChainId uint32 `yaml:"layer1_chain_id"` - Layer2ChainId uint32 `yaml:"layer2_chain_id"` + Environment sdklogging.LogLevel `yaml:"environment"` + EthRpcUrl string `yaml:"eth_rpc_url"` + EthWsUrl string `yaml:"eth_ws_url"` + AggregatorServerIpPortAddr string `yaml:"aggregator_server_ip_port_address"` + AggregatorGRPCServerIpPortAddr string `yaml:"aggregator_grpc_server_ip_port_address"` + Layer1ChainId uint32 `yaml:"layer1_chain_id"` + Layer2ChainId uint32 `yaml:"layer2_chain_id"` } // These are read from DeploymentFileFlag @@ -71,7 +73,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { var configRaw ConfigRaw configFilePath := ctx.GlobalString(ConfigFileFlag.Name) if configFilePath != "" { - sdkutils.ReadYamlConfig(configFilePath, &configRaw) + err := sdkutils.ReadYamlConfig(configFilePath, &configRaw) + if err != nil { + return nil, err + } } ethRpcUrl, ok := os.LookupEnv("ETH_RPC_URL") @@ -89,6 +94,11 @@ func NewConfig(ctx *cli.Context) (*Config, error) { configRaw.AggregatorServerIpPortAddr = aggregatorServerIpPortAddress } + aggregatorGRPCServerIpPortAddress, ok := os.LookupEnv("AGGREGATOR_GRPC_SERVER_URL") + if ok && aggregatorGRPCServerIpPortAddress != "" { + configRaw.AggregatorGRPCServerIpPortAddr = aggregatorGRPCServerIpPortAddress + } + var deploymentRaw MachAvsDeploymentRaw avsRegistryCoordinatorAddress, rcOk := os.LookupEnv("AVS_REGISTRY_COORDINATOR_ADDRESS") @@ -168,25 +178,26 @@ func NewConfig(ctx *cli.Context) (*Config, error) { txSender, err := wallet.NewPrivateKeyWallet(ethRpcClient, signerV2, aggregatorAddr, logger) if err != nil { - return nil, types.WrapError(errors.New("Failed to create transaction sender"), err) + return nil, types.WrapError(errors.New("failed to create transaction sender"), err) } txMgr := txmgr.NewSimpleTxManager(txSender, ethRpcClient, logger, aggregatorAddr) config := &Config{ - Logger: logger, - EthWsRpcUrl: configRaw.EthWsUrl, - EthHttpRpcUrl: configRaw.EthRpcUrl, - EthHttpClient: ethRpcClient, - EthWsClient: ethWsClient, - OperatorStateRetrieverAddr: common.HexToAddress(deploymentRaw.OperatorStateRetrieverAddr), - RegistryCoordinatorAddr: common.HexToAddress(deploymentRaw.RegistryCoordinatorAddr), - AggregatorServerIpPortAddr: configRaw.AggregatorServerIpPortAddr, - SignerFn: signerV2, - PrivateKey: ecdsaPrivateKey, - TxMgr: txMgr, - AggregatorAddress: aggregatorAddr, - Layer1ChainId: configRaw.Layer1ChainId, - Layer2ChainId: configRaw.Layer2ChainId, + Logger: logger, + EthWsRpcUrl: configRaw.EthWsUrl, + EthHttpRpcUrl: configRaw.EthRpcUrl, + EthHttpClient: ethRpcClient, + EthWsClient: ethWsClient, + OperatorStateRetrieverAddr: common.HexToAddress(deploymentRaw.OperatorStateRetrieverAddr), + RegistryCoordinatorAddr: common.HexToAddress(deploymentRaw.RegistryCoordinatorAddr), + AggregatorServerIpPortAddr: configRaw.AggregatorServerIpPortAddr, + AggregatorGRPCServerIpPortAddr: configRaw.AggregatorGRPCServerIpPortAddr, + SignerFn: signerV2, + PrivateKey: ecdsaPrivateKey, + TxMgr: txMgr, + AggregatorAddress: aggregatorAddr, + Layer1ChainId: configRaw.Layer1ChainId, + Layer2ChainId: configRaw.Layer2ChainId, } config.validate() return config, nil diff --git a/core/message/task.go b/core/message/task.go index d526384..03e41f8 100644 --- a/core/message/task.go +++ b/core/message/task.go @@ -1,14 +1,18 @@ package message import ( - csservicemanager "github.com/alt-research/avs/contracts/bindings/MachServiceManager" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "golang.org/x/crypto/sha3" + "fmt" "github.com/Layr-Labs/eigensdk-go/crypto/bls" sdktypes "github.com/Layr-Labs/eigensdk-go/types" "github.com/alt-research/avs/aggregator/types" + "github.com/alt-research/avs/api/grpc/aggregator" + "github.com/alt-research/avs/core" + + csservicemanager "github.com/alt-research/avs/contracts/bindings/MachServiceManager" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "golang.org/x/crypto/sha3" ) // The Alert task Information @@ -20,6 +24,34 @@ type AlertTaskInfo struct { ReferenceBlockNumber uint64 } +func NewAlertTaskInfo(req *aggregator.AlertTaskInfo) (*AlertTaskInfo, error) { + alertHash := req.GetAlertHash() + if len(alertHash) != 32 { + return nil, fmt.Errorf("operator ID len should be 32") + } + + res := &AlertTaskInfo{ + QuorumNumbers: core.ConvertQuorumNumbersFromBytes(req.GetQuorumNumbers()), + QuorumThresholdPercentages: core.ConvertQuorumThresholdPercentagesFromBytes(req.GetQuorumThresholdPercentages()), + TaskIndex: req.GetTaskIndex(), + ReferenceBlockNumber: req.GetReferenceBlockNumber(), + } + + copy(res.AlertHash[:], alertHash[:32]) + + return res, nil +} + +func (r AlertTaskInfo) ToPbType() *aggregator.AlertTaskInfo { + return &aggregator.AlertTaskInfo{ + AlertHash: r.AlertHash[:], + QuorumNumbers: r.QuorumNumbers.UnderlyingType(), + QuorumThresholdPercentages: r.QuorumThresholdPercentages.UnderlyingType(), + TaskIndex: r.TaskIndex, + ReferenceBlockNumber: r.ReferenceBlockNumber, + } +} + func (a *AlertTaskInfo) EncodeSigHash() ([]byte, error) { // The order here has to match the field ordering of ReducedBatchHeader defined in IEigenDAServiceManager.sol // ref: https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 @@ -74,21 +106,10 @@ func (a AlertTaskInfo) SignHash() ([32]byte, error) { } func (a AlertTaskInfo) ToIMachServiceManagerAlertHeader() csservicemanager.IMachServiceManagerAlertHeader { - quorumNumbers := make([]byte, len(a.QuorumNumbers)) - quorumThresholdPercentages := make([]byte, len(a.QuorumThresholdPercentages)) - - for i, _ := range a.QuorumNumbers { - quorumNumbers[i] = byte(a.QuorumNumbers[i]) - } - - for i, _ := range a.QuorumThresholdPercentages { - quorumThresholdPercentages[i] = byte(a.QuorumThresholdPercentages[i]) - } - return csservicemanager.IMachServiceManagerAlertHeader{ MessageHash: a.AlertHash, - QuorumNumbers: quorumNumbers, - QuorumThresholdPercentages: quorumThresholdPercentages, + QuorumNumbers: a.QuorumNumbers.UnderlyingType(), + QuorumThresholdPercentages: a.QuorumThresholdPercentages.UnderlyingType(), ReferenceBlockNumber: uint32(a.ReferenceBlockNumber), } } @@ -103,23 +124,80 @@ type InitOperatorRequest struct { RegistryCoordinatorAddr common.Address } +func NewInitOperatorRequest(req *aggregator.InitOperatorRequest) (*InitOperatorRequest, error) { + operatorId := req.GetOperatorId() + if len(operatorId) != 32 { + return nil, fmt.Errorf("operator ID len should be 32") + } + + if !common.IsHexAddress(req.GetOperatorAddress()) { + return nil, fmt.Errorf("operatorAddress not a hex address") + } + operatorAddress := common.HexToAddress(req.OperatorAddress) + + if !common.IsHexAddress(req.GetOperatorStateRetrieverAddr()) { + return nil, fmt.Errorf("operatorStateRetrieverAddr not a hex address") + } + operatorStateRetrieverAddr := common.HexToAddress(req.GetOperatorStateRetrieverAddr()) + + if !common.IsHexAddress(req.GetRegistryCoordinatorAddr()) { + return nil, fmt.Errorf("registryCoordinatorAddr not a hex address") + } + registryCoordinatorAddr := common.HexToAddress(req.GetRegistryCoordinatorAddr()) + + res := &InitOperatorRequest{ + Layer1ChainId: req.GetLayer1ChainId(), + ChainId: req.GetChainId(), + OperatorAddress: operatorAddress, + OperatorStateRetrieverAddr: operatorStateRetrieverAddr, + RegistryCoordinatorAddr: registryCoordinatorAddr, + } + + copy(res.OperatorId[:], operatorId[:32]) + + return res, nil +} + // The init operator response type InitOperatorResponse struct { Ok bool Res string } +func (r InitOperatorResponse) ToPbType() *aggregator.InitOperatorResponse { + return &aggregator.InitOperatorResponse{ + Ok: r.Ok, + Reason: r.Res, + } +} + // The Alert task create request type CreateTaskRequest struct { AlertHash [32]byte } +func NewCreateTaskRequest(req *aggregator.CreateTaskRequest) (*CreateTaskRequest, error) { + alertHash := req.GetAlertHash() + if len(alertHash) != 32 { + return nil, fmt.Errorf("operator ID len should be 32") + } + + res := &CreateTaskRequest{} + + copy(res.AlertHash[:], alertHash[:32]) + + return res, nil +} + // The Alert task create response type CreateTaskResponse struct { Info AlertTaskInfo } -type ProcessSignedTaskResponseResponse struct { +func (r CreateTaskResponse) ToPbType() *aggregator.CreateTaskResponse { + return &aggregator.CreateTaskResponse{ + Info: r.Info.ToPbType(), + } } type SignedTaskRespRequest struct { @@ -128,7 +206,43 @@ type SignedTaskRespRequest struct { OperatorId sdktypes.OperatorId } +func NewSignedTaskRespRequest(req *aggregator.SignedTaskRespRequest) (*SignedTaskRespRequest, error) { + operatorId := req.GetOperatorId() + if len(operatorId) != 32 { + return nil, fmt.Errorf("operator ID len should be 32") + } + + alert, err := NewAlertTaskInfo(req.GetAlert()) + if err != nil { + return nil, fmt.Errorf("new alert task info failed: %v", err.Error()) + } + + signRaw := req.GetOperatorRequestSignature() + if len(signRaw) != 64 { + return nil, fmt.Errorf("operatorRequestSignature len should be 64") + } + + g1Point := bls.NewZeroG1Point().Deserialize(signRaw) + sign := bls.Signature{G1Point: g1Point} + + res := &SignedTaskRespRequest{ + Alert: *alert, + BlsSignature: sign, + } + + copy(res.OperatorId[:], operatorId[:32]) + + return res, nil +} + type SignedTaskRespResponse struct { Reply bool TxHash [32]byte } + +func (r SignedTaskRespResponse) ToPbType() *aggregator.SignedTaskRespResponse { + return &aggregator.SignedTaskRespResponse{ + Reply: r.Reply, + TxHash: r.TxHash[:], + } +} diff --git a/core/utils.go b/core/utils.go index 248196c..403d43a 100644 --- a/core/utils.go +++ b/core/utils.go @@ -4,6 +4,7 @@ import ( "math/big" "github.com/Layr-Labs/eigensdk-go/crypto/bls" + sdktypes "github.com/Layr-Labs/eigensdk-go/types" csservicemanager "github.com/alt-research/avs/contracts/bindings/MachServiceManager" ) @@ -27,3 +28,19 @@ func ConvertToBN254G2Point(input *bls.G2Point) csservicemanager.BN254G2Point { } return output } + +func ConvertQuorumNumbersFromBytes(numbers []byte) sdktypes.QuorumNums { + quorumNumbers := make([]sdktypes.QuorumNum, len(numbers)) + for i, v := range numbers { + quorumNumbers[i] = sdktypes.QuorumNum(v) + } + return quorumNumbers +} + +func ConvertQuorumThresholdPercentagesFromBytes(numbers []byte) sdktypes.QuorumThresholdPercentages { + quorumNumbers := make([]sdktypes.QuorumThresholdPercentage, len(numbers)) + for i, v := range numbers { + quorumNumbers[i] = sdktypes.QuorumThresholdPercentage(v) + } + return quorumNumbers +} diff --git a/go.mod b/go.mod index 1a338f0..9928305 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.18.0 + google.golang.org/grpc v1.58.3 + google.golang.org/protobuf v1.33.0 ) require ( @@ -37,6 +39,7 @@ require ( github.com/getsentry/sentry-go v0.18.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/holiman/uint256 v1.2.4 // indirect @@ -53,10 +56,12 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/mod v0.16.0 // indirect + golang.org/x/net v0.20.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/go.sum b/go.sum index 52f04d2..5ecf9ef 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= @@ -401,6 +403,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -477,12 +481,17 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/operator/grpc_client.go b/operator/grpc_client.go new file mode 100644 index 0000000..66d9cdc --- /dev/null +++ b/operator/grpc_client.go @@ -0,0 +1,172 @@ +package operator + +import ( + "context" + "fmt" + "net/rpc" + "time" + + "github.com/Layr-Labs/eigensdk-go/logging" + sdktypes "github.com/Layr-Labs/eigensdk-go/types" + "github.com/alt-research/avs/api/grpc/aggregator" + "github.com/alt-research/avs/core/alert" + "github.com/alt-research/avs/core/config" + "github.com/alt-research/avs/core/message" + "github.com/alt-research/avs/metrics" + "github.com/ethereum/go-ethereum/common" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type AggregatorGRpcClient struct { + rpcClient *rpc.Client + metrics metrics.Metrics + logger logging.Logger + config config.NodeConfig + operatorId sdktypes.OperatorId + operatorAddr common.Address + OperatorStateRetrieverAddr common.Address + RegistryCoordinatorAddr common.Address + gRPCAggregatorIpPortAddr string + timeout time.Duration +} + +func NewAggregatorGRpcClient(config config.NodeConfig, operatorId sdktypes.OperatorId, operatorAddr common.Address, logger logging.Logger, metrics metrics.Metrics) (*AggregatorGRpcClient, error) { + return &AggregatorGRpcClient{ + // set to nil so that we can create an rpc client even if the aggregator is not running + rpcClient: nil, + metrics: metrics, + logger: logger, + config: config, + operatorId: operatorId, + operatorAddr: operatorAddr, + OperatorStateRetrieverAddr: common.HexToAddress(config.OperatorStateRetrieverAddress), + RegistryCoordinatorAddr: common.HexToAddress(config.AVSRegistryCoordinatorAddress), + gRPCAggregatorIpPortAddr: config.AggregatorGRPCServerIpPortAddress, + timeout: 1 * time.Second, + }, nil +} + +// CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. +func (c *AggregatorGRpcClient) InitOperatorToAggregator() error { + conn, err := grpc.Dial( + c.gRPCAggregatorIpPortAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return fmt.Errorf("dial initOperatorToAggregator connection failed: %v", err.Error()) + } + + n := aggregator.NewAggregatorClient(conn) + nodeCtx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + request := &aggregator.InitOperatorRequest{ + Layer1ChainId: c.config.Layer1ChainId, + ChainId: c.config.Layer2ChainId, + OperatorId: c.operatorId[:], + OperatorAddress: c.operatorAddr.Hex(), + OperatorStateRetrieverAddr: c.config.OperatorStateRetrieverAddress, + RegistryCoordinatorAddr: c.config.AVSRegistryCoordinatorAddress, + } + + c.logger.Info("Init operator to aggregator", "req", fmt.Sprintf("%#v", request)) + + reply, err := n.InitOperator(nodeCtx, request) + if err != nil { + return fmt.Errorf("call initOperatorToAggregator failed: %v", err.Error()) + } + + if !reply.GetOk() { + return fmt.Errorf("initOperatorToAggregator failed: %v", reply.GetReason()) + } + + return nil + +} + +// CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. +func (c *AggregatorGRpcClient) CreateAlertTaskToAggregator(alertHash [32]byte) (*message.AlertTaskInfo, error) { + conn, err := grpc.Dial( + c.gRPCAggregatorIpPortAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("dial initOperatorToAggregator connection failed: %v", err.Error()) + } + + n := aggregator.NewAggregatorClient(conn) + nodeCtx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + request := &aggregator.CreateTaskRequest{ + AlertHash: alertHash[:], + } + + c.logger.Info("CreateAlertTask to aggregator", "req", fmt.Sprintf("%#v", request)) + + reply, err := n.CreateTask(nodeCtx, request) + if err != nil { + return nil, fmt.Errorf("call CreateAlertTask failed: %v", err.Error()) + } + + info, err := message.NewAlertTaskInfo(reply.GetInfo()) + if err != nil { + return nil, fmt.Errorf("call CreateAlertTask failed by decode alert info: %v", err.Error()) + } + + return info, nil +} + +// SendSignedTaskResponseToAggregator sends a signed task response to the aggregator. +// it is meant to be ran inside a go thread, so doesn't return anything. +// this is because sending the signed task response to the aggregator is time sensitive, +// so there is no point in retrying if it fails for a few times. +// Currently hardcoded to retry sending the signed task response 5 times, waiting 2 seconds in between each attempt. +func (c *AggregatorGRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *message.SignedTaskRespRequest, resChan chan alert.AlertResponse) { + conn, err := grpc.Dial( + c.gRPCAggregatorIpPortAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + resChan <- alert.AlertResponse{ + Err: err, + Msg: "dial initOperatorToAggregator connection failed", + } + return + } + + n := aggregator.NewAggregatorClient(conn) + nodeCtx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + request := &aggregator.SignedTaskRespRequest{ + Alert: signedTaskResponse.Alert.ToPbType(), + OperatorRequestSignature: signedTaskResponse.BlsSignature.Serialize(), + OperatorId: signedTaskResponse.OperatorId[:], + } + + c.logger.Info("CreateAlertTask to aggregator", "req", fmt.Sprintf("%#v", request)) + + response, err := n.ProcessSignedTaskResponse(nodeCtx, request) + if err != nil { + resChan <- alert.AlertResponse{ + Err: err, + Msg: "call CreateAlertTask failed", + } + return + } + + c.logger.Info("Signed task response header accepted by aggregator.", "response", fmt.Sprintf("%#v", response)) + + res := alert.AlertResponse{ + Code: 0, + TaskIndex: signedTaskResponse.Alert.TaskIndex, + } + copy(res.TxHash[:], response.GetTxHash()[:32]) + + c.logger.Info("Signed task resp", "response", res) + c.metrics.IncNumTasksAcceptedByAggregator() + + resChan <- res +} diff --git a/operator/operator.go b/operator/operator.go index 4d98e24..4fdc8e6 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -27,7 +27,6 @@ import ( "github.com/Layr-Labs/eigensdk-go/chainio/txmgr" "github.com/Layr-Labs/eigensdk-go/crypto/bls" sdkEcdsa "github.com/Layr-Labs/eigensdk-go/crypto/ecdsa" - "github.com/Layr-Labs/eigensdk-go/logging" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" sdkmetrics "github.com/Layr-Labs/eigensdk-go/metrics" "github.com/Layr-Labs/eigensdk-go/metrics/collectors/economic" @@ -42,7 +41,7 @@ const SEM_VER = "0.0.1" type Operator struct { config config.NodeConfig - logger logging.Logger + logger sdklogging.Logger ethClient eth.Client metricsReg *prometheus.Registry metrics metrics.Metrics @@ -114,6 +113,11 @@ func withEnvConfig(c config.NodeConfig) config.NodeConfig { c.AggregatorServerIpPortAddress = aggregatorServerIpPortAddress } + grpcAggregatorServerIpPortAddress, ok := os.LookupEnv("AGGREGATOR_GRPC_SERVER_URL") + if ok && grpcAggregatorServerIpPortAddress != "" { + c.AggregatorGRPCServerIpPortAddress = grpcAggregatorServerIpPortAddress + } + eigenMetricsIpPortAddress, ok := os.LookupEnv("EIGEN_METRICS_URL") if ok && eigenMetricsIpPortAddress != "" { c.EigenMetricsIpPortAddress = eigenMetricsIpPortAddress @@ -193,7 +197,7 @@ func withEnvConfig(c config.NodeConfig) config.NodeConfig { // // take the config in core (which is shared with aggregator and challenger) func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) { - var logLevel logging.LogLevel + var logLevel sdklogging.LogLevel if cfg.Production { logLevel = sdklogging.Production } else { @@ -328,10 +332,28 @@ func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) { return nil, err } - aggregatorRpcClient, err := NewAggregatorRpcClient(c, operatorId, operatorAddress, logger, avsAndEigenMetrics) - if err != nil { - logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err) - return nil, err + var aggregatorRpcClient AggregatorRpcClienter + + if c.AggregatorGRPCServerIpPortAddress != "" { + logger.Info("Use grpc server to connect to the aggregator", "address", c.AggregatorGRPCServerIpPortAddress) + + cli, err := NewAggregatorGRpcClient(c, operatorId, operatorAddress, logger, avsAndEigenMetrics) + if err != nil { + logger.Error("Cannot create AggregatorGRpcClient. Is aggregator running?", "err", err) + return nil, err + } + + aggregatorRpcClient = cli + } else { + logger.Info("Use legacy rpc server to connect to the aggregator", "address", c.AggregatorServerIpPortAddress) + + cli, err := NewAggregatorRpcClient(c, operatorId, operatorAddress, logger, avsAndEigenMetrics) + if err != nil { + logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err) + return nil, err + } + + aggregatorRpcClient = cli } newTaskCreatedChan := make(chan alert.AlertRequest, 32) @@ -412,7 +434,12 @@ func (o *Operator) Start(ctx context.Context) error { o.logger.Error("Error start Rpc server", "err", err) return err } - defer o.rpcServer.Stop() + defer func() { + err := o.rpcServer.Stop() + if err != nil { + o.logger.Error("Stop Rpc server failed", "err", err) + } + }() for { select { diff --git a/operator/registration.go b/operator/registration.go index b260594..efee234 100644 --- a/operator/registration.go +++ b/operator/registration.go @@ -14,19 +14,18 @@ import ( "math/big" "time" + "github.com/alt-research/avs/core" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/Layr-Labs/eigensdk-go/crypto/bls" - eigenSdkTypes "github.com/Layr-Labs/eigensdk-go/types" sdktypes "github.com/Layr-Labs/eigensdk-go/types" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" ) func (o *Operator) RegisterOperatorWithEigenlayer() error { - op := eigenSdkTypes.Operator{ + op := sdktypes.Operator{ Address: o.operatorAddr.String(), EarningsReceiverAddress: o.operatorAddr.String(), MetadataUrl: o.metadataURI, @@ -83,10 +82,7 @@ func (o *Operator) RegisterOperatorWithAvs( "operatorToAvsRegistrationSigExpiry", operatorToAvsRegistrationSigExpiry, ) - quorumNumbersToSDK := make([]sdktypes.QuorumNum, len(quorumNumbers)) - for i, _ := range quorumNumbers { - quorumNumbersToSDK[i] = sdktypes.QuorumNum(uint8(quorumNumbers[i])) - } + quorumNumbersToSDK := core.ConvertQuorumNumbersFromBytes(quorumNumbers) _, err = o.avsWriter.RegisterOperatorInQuorumWithAVSRegistryCoordinator( context.Background(), @@ -113,10 +109,7 @@ func (o *Operator) DeregisterOperatorWithAvs() error { "operatorAddr", operatorAddr, ) - quorumNumbersToSDK := make([]sdktypes.QuorumNum, len(quorumNumbers)) - for i, _ := range quorumNumbers { - quorumNumbersToSDK[i] = sdktypes.QuorumNum(uint8(quorumNumbers[i])) - } + quorumNumbersToSDK := core.ConvertQuorumNumbersFromBytes(quorumNumbers) _, err := o.avsWriter.DeregisterOperator( context.Background(), @@ -179,10 +172,3 @@ func (o *Operator) PrintOperatorStatus() error { fmt.Println(string(operatorStatusJson)) return nil } - -func pubKeyG1ToBN254G1Point(p *bls.G1Point) regcoord.BN254G1Point { - return regcoord.BN254G1Point{ - X: p.X.BigInt(new(big.Int)), - Y: p.Y.BigInt(new(big.Int)), - } -} diff --git a/operator/rpc_client.go b/operator/rpc_client.go index 780dada..963196c 100644 --- a/operator/rpc_client.go +++ b/operator/rpc_client.go @@ -112,7 +112,7 @@ func (c *AggregatorRpcClient) InitOperatorToAggregator() error { } c.logger.Errorf("Could not send init operator to aggregator. Tried 5 times.") - return fmt.Errorf("Could not send init operator to aggregator") + return fmt.Errorf("could not send init operator to aggregator") } // CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. @@ -158,7 +158,7 @@ func (c *AggregatorRpcClient) CreateAlertTaskToAggregator(alertHash [32]byte) (* } c.logger.Errorf("Could not create task to aggregator. Tried 5 times.") - return nil, fmt.Errorf("Could not create task to aggregator") + return nil, fmt.Errorf("could not create task to aggregator") } // SendSignedTaskResponseToAggregator sends a signed task response to the aggregator. @@ -216,6 +216,6 @@ func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskRespo c.logger.Errorf("Could not send signed task response to aggregator. Tried 5 times.") resChan <- alert.AlertResponse{ - Err: fmt.Errorf("Could not send signed task response to aggregator by %v.", err), + Err: fmt.Errorf("could not send signed task response to aggregator by %v", err), } }