diff --git a/internal/bridge/chain/btc.go b/internal/bridge/chain/btc.go index a12f324..8de3385 100644 --- a/internal/bridge/chain/btc.go +++ b/internal/bridge/chain/btc.go @@ -2,6 +2,7 @@ package chain import ( "fmt" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/rpcclient" @@ -9,6 +10,7 @@ import ( "github.com/pkg/errors" "gitlab.com/distributed_lab/figure/v3" "reflect" + "strings" ) type Bitcoin struct { @@ -33,6 +35,19 @@ func (c Chain) Bitcoin() Bitcoin { panic(errors.Wrap(err, "failed to init bitcoin chain rpc")) } + // ensuring wallet is properly configured + _, err := chain.Rpc.GetWalletInfo() + if err != nil { + if strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotFound)) { + if _, err := chain.Rpc.LoadWallet(chain.Wallet); err != nil { + panic(errors.Wrap(err, "failed to load wallet")) + } + } + if strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotSpecified)) { + panic("wallet not specified in the URL") + } + } + var receivers []string if err := figure.Out(&receivers).FromInterface(c.BridgeAddresses).With(figure.BaseHooks).Please(); err != nil { panic(errors.Wrap(err, "failed to decode bitcoin receivers")) diff --git a/internal/bridge/core/config/config.go b/internal/bridge/core/config/config.go index f20465a..5d591b7 100644 --- a/internal/bridge/core/config/config.go +++ b/internal/bridge/core/config/config.go @@ -62,11 +62,13 @@ func (c *configurer) CoreConnectorConfig() ConnectorConfig { } connectSecurityOptions = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } + keepaliveOptions := grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 20 * time.Second, // wait time before ping if no activity + Timeout: 5 * time.Second, // ping timeout + PermitWithoutStream: true, + }) - client, err := grpc.Dial(cfg.Connection.Addr, connectSecurityOptions, grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, // wait time before ping if no activity - Timeout: 20 * time.Second, // ping timeout - })) + client, err := grpc.NewClient(cfg.Connection.Addr, connectSecurityOptions, keepaliveOptions) if err != nil { panic(errors.Wrap(err, "failed to connect to core via gRPC")) } diff --git a/internal/bridge/core/submit_tx.go b/internal/bridge/core/submit_tx.go index 7e5b536..152b039 100644 --- a/internal/bridge/core/submit_tx.go +++ b/internal/bridge/core/submit_tx.go @@ -4,6 +4,7 @@ import ( bridgetypes "github.com/hyle-team/bridgeless-core/x/bridge/types" "github.com/hyle-team/bridgeless-signer/internal/bridge/types" "github.com/pkg/errors" + "strings" ) func (c *Connector) SubmitDeposits(depositTxs ...bridgetypes.Transaction) error { @@ -12,11 +13,13 @@ func (c *Connector) SubmitDeposits(depositTxs ...bridgetypes.Transaction) error } msg := bridgetypes.NewMsgSubmitTransactions(c.settings.Account.CosmosAddress(), depositTxs...) - if err := c.submitMsgs(msg); err != nil { - if errors.Is(err, bridgetypes.ErrTranscationAlreadySubmitted.GRPCStatus().Err()) { - return types.ErrTransactionAlreadySubmitted - } + err := c.submitMsgs(msg) + if err == nil { + return nil + } + if strings.Contains(err.Error(), bridgetypes.ErrTranscationAlreadySubmitted.Error()) { + return types.ErrTransactionAlreadySubmitted } - return nil + return errors.Wrap(err, "failed to submit deposits") } diff --git a/internal/bridge/processor/main.go b/internal/bridge/processor/main.go index 1c618f0..8187f8a 100644 --- a/internal/bridge/processor/main.go +++ b/internal/bridge/processor/main.go @@ -20,7 +20,6 @@ func New( db data.DepositsQ, signer *signer.Signer, core bridgeTypes.Bridger, - ) *Processor { return &Processor{proxies: proxies, db: db, signer: signer, core: core} } diff --git a/internal/bridge/processor/submit_transaction.go b/internal/bridge/processor/submit_transaction.go index bb5ef82..9c7d508 100644 --- a/internal/bridge/processor/submit_transaction.go +++ b/internal/bridge/processor/submit_transaction.go @@ -36,11 +36,14 @@ func (p *Processor) ProcessSubmitTransactions(reqs ...SubmitTransactionRequest) return errors.Wrap(tmperr, "failed to set deposits submitted") } - return errors.Wrap(p.core.SubmitDeposits(depositTxs...), "failed to submit deposits") + err = p.core.SubmitDeposits(depositTxs...) + if errors.Is(err, bridgeTypes.ErrTransactionAlreadySubmitted) { + // ignoring already submitted transaction + return nil + } + + return errors.Wrap(err, "failed to submit deposits") }) - if errors.Is(err, bridgeTypes.ErrTransactionAlreadySubmitted) { - return false, err - } return err != nil, err } diff --git a/internal/bridge/processor/types.go b/internal/bridge/processor/types.go index c803412..8d8b208 100644 --- a/internal/bridge/processor/types.go +++ b/internal/bridge/processor/types.go @@ -12,20 +12,24 @@ type WithdrawalRequest struct { Data data.DepositData } +func (r WithdrawalRequest) Id() int64 { + return r.DepositDbId +} + type GetDepositRequest struct { DepositDbId int64 DepositIdentifier data.DepositIdentifier } +func (r GetDepositRequest) Id() int64 { return r.DepositDbId } + type ZanoSignedWithdrawalRequest struct { DepositDbId int64 Data data.DepositData Transaction zano.SignedTransaction } -func (r WithdrawalRequest) Id() int64 { - return r.DepositDbId -} +func (r ZanoSignedWithdrawalRequest) Id() int64 { return r.DepositDbId } type SubmitTransactionRequest struct { DepositDbId int64 diff --git a/internal/bridge/proxy/btc/withdraw.go b/internal/bridge/proxy/btc/withdraw.go index 49acd33..21890be 100644 --- a/internal/bridge/proxy/btc/withdraw.go +++ b/internal/bridge/proxy/btc/withdraw.go @@ -1,13 +1,10 @@ package btc import ( - "fmt" - "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/btcutil" bridgeTypes "github.com/hyle-team/bridgeless-signer/internal/bridge" "github.com/pkg/errors" "math/big" - "strings" ) var minWithdrawAmount = big.NewInt(minSatoshisPerOutput) @@ -34,10 +31,6 @@ func (p *proxy) SendBitcoins(data map[string]*big.Int) (string, error) { amounts[addr] = btcutil.Amount(value) } - if err := p.ensureWalletLoaded(); err != nil { - return "", errors.Wrap(err, "failed to ensure wallet loaded") - } - hash, err := p.chain.Rpc.SendMany("", amounts) if err != nil { return "", errors.Wrap(err, "failed to send transaction") @@ -53,20 +46,3 @@ func (p *proxy) WithdrawalAmountValid(amount *big.Int) bool { return true } - -func (p *proxy) ensureWalletLoaded() error { - info, err := p.chain.Rpc.GetWalletInfo() - if err != nil { - if !strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotFound)) { - return errors.Wrap(err, "failed to get wallet info") - } - } else { - if info.WalletName == p.chain.Wallet { - return nil - } - } - - _, err = p.chain.Rpc.LoadWallet(p.chain.Wallet) - - return errors.Wrap(err, "failed to load wallet") -} diff --git a/internal/cli/main.go b/internal/cli/main.go index aa9f8be..f685fa2 100644 --- a/internal/cli/main.go +++ b/internal/cli/main.go @@ -2,12 +2,9 @@ package cli import ( "context" - "github.com/pkg/errors" - "os/signal" - "syscall" - "github.com/alecthomas/kingpin" "github.com/hyle-team/bridgeless-signer/internal/config" + "github.com/pkg/errors" "gitlab.com/distributed_lab/kit/kv" "gitlab.com/distributed_lab/logan/v3" ) @@ -41,28 +38,26 @@ func Run(args []string) bool { return false } - ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) - switch cmd { case serviceCmd.FullCommand(): - err = RunService(ctx, cfg) + err = RunService(cfg) case migrateUpCmd.FullCommand(): err = MigrateUp(cfg) case migrateDownCmd.FullCommand(): err = MigrateDown(cfg) - // handle any custom commands here in the same way default: log.Errorf("unknown command %s", cmd) return false } if err != nil { if errors.Is(err, context.Canceled) { - log.Info("service stopped: context was canceled") + log.Info("service got signal to stop") return true } - log.WithError(err).Error("failed to exec cmd") + log.Error(err) return false } + return true } diff --git a/internal/cli/service.go b/internal/cli/service.go index 0252dc6..9b29eb5 100644 --- a/internal/cli/service.go +++ b/internal/cli/service.go @@ -4,7 +4,12 @@ import ( "context" coreConnector "github.com/hyle-team/bridgeless-signer/internal/bridge/core" "github.com/hyle-team/bridgeless-signer/internal/bridge/proxy" + rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" + amqp "github.com/rabbitmq/amqp091-go" + "os" + "os/signal" "sync" + "syscall" bridgeProcessor "github.com/hyle-team/bridgeless-signer/internal/bridge/processor" "github.com/hyle-team/bridgeless-signer/internal/config" @@ -14,12 +19,15 @@ import ( "github.com/pkg/errors" ) -func RunService(ctx context.Context, cfg config.Config) error { +func RunService(cfg config.Config) error { var ( wg = sync.WaitGroup{} coreCfg = cfg.CoreConnectorConfig() coreConn = coreConnector.NewConnector(coreCfg.Connection, coreCfg.Settings) rabbitCfg = cfg.RabbitMQConfig() + + rabbitConnChan = rabbitCfg.Connection.NotifyClose(make(chan *amqp.Error, 1)) + ctx = appContext(rabbitConnChan) ) proxiesRepo, err := proxy.NewProxiesRepository(cfg.Chains(), cfg.Log()) @@ -39,5 +47,27 @@ func RunService(ctx context.Context, cfg config.Config) error { wg.Wait() - return ctx.Err() + return context.Cause(ctx) +} + +func appContext(rabbit <-chan *amqp.Error) context.Context { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + + ctx, cancel := context.WithCancelCause(context.Background()) + + go func() { + select { + case <-sig: + cancel(nil) + case err, ok := <-rabbit: + if ok { + cancel(rabbitTypes.ErrConnectionClosed) + } else { + cancel(err) + } + } + }() + + return ctx } diff --git a/internal/core/api/server/main.go b/internal/core/api/server/main.go index c0699a8..40ffaa9 100644 --- a/internal/core/api/server/main.go +++ b/internal/core/api/server/main.go @@ -62,7 +62,7 @@ func (s *server) RunGRPC(ctx context.Context) error { srv := s.grpcServer() // graceful shutdown - go func() { <-ctx.Done(); srv.GracefulStop(); s.logger.Info("grpc serving stopped") }() + go func() { <-ctx.Done(); srv.GracefulStop(); s.logger.Info("grpc serving stopped: context canceled") }() s.logger.Info("grpc serving started") return srv.Serve(s.grpc) @@ -79,7 +79,7 @@ func (s *server) RunHTTP(ctxt context.Context) error { if err := srv.Shutdown(shutdownDeadline); err != nil { s.logger.WithError(err).Error("failed to shutdown http server") } - s.logger.Info("http serving stopped") + s.logger.Info("http serving stopped: context canceled") }() s.logger.Info("http serving started") diff --git a/internal/core/main.go b/internal/core/main.go index f47b158..7eda810 100644 --- a/internal/core/main.go +++ b/internal/core/main.go @@ -22,11 +22,6 @@ const ( componentConsumer = "consumer" ) -type baseConsumer struct { - deliveryProcessor rabbitTypes.DeliveryProcessor - prefix string -} - // RunConsumers runs consumers for all queues. func RunConsumers( ctx context.Context, @@ -36,57 +31,69 @@ func RunConsumers( processor *bridgeProcessor.Processor, ) { var ( - logger = cfg.Log() - rabbitCfg = cfg.RabbitMQConfig() - consumersMap = map[string]baseConsumer{ - rabbitTypes.GetDepositQueue: { - deliveryProcessor: consumerProcessors.NewGetDepositHandler(processor, producer), - prefix: consumer.GetDepositConsumerPrefix, - }, - rabbitTypes.EthSignWithdrawalQueue: { - deliveryProcessor: consumerProcessors.NewEthereumSignWithdrawalHandler(processor, producer), - prefix: consumer.EthSignWithdrawalConsumerPrefix, - }, - rabbitTypes.ZanoSignWithdrawalQueue: { - deliveryProcessor: consumerProcessors.NewZanoSignWithdrawalHandler(processor, producer), - prefix: consumer.ZanoSignWithdrawalConsumerPrefix, - }, - rabbitTypes.ZanoSendWithdrawalQueue: { - deliveryProcessor: consumerProcessors.NewZanoSendWithdrawalHandler(processor, producer), - prefix: consumer.ZanoSendWithdrawalConsumerPrefix, - }, - } + logger = cfg.Log() + rabbitCfg = cfg.RabbitMQConfig() ) - for i := uint(0); i < rabbitCfg.ConsumerInstances; i++ { + for i := uint(0); i < rabbitCfg.BaseConsumerInstances; i++ { idx := i + 1 - for queue, consumerCfg := range consumersMap { - wg.Add(1) - go func(id uint, queue string, consumerCfg baseConsumer) { - defer wg.Done() - consumerName := consumer.GetName(consumerCfg.prefix, id) - cns := consumer.NewBase( - rabbitCfg.NewChannel(), - consumerName, - logger. - WithField(serviceComponent, componentConsumer). - WithField(componentPart, consumerName), - consumerCfg.deliveryProcessor, - producer, - ) + // initializing new instances per loop + baseConsumers := map[string]rabbitTypes.Consumer{ + rabbitTypes.GetDepositQueue: consumer.NewBase[bridgeProcessor.GetDepositRequest]( + rabbitCfg.NewChannel(), + consumer.GetName(consumer.GetDepositConsumerPrefix, idx), + logger. + WithField(serviceComponent, componentConsumer). + WithField(componentPart, consumer.GetName(consumer.GetDepositConsumerPrefix, idx)), + consumerProcessors.NewGetDepositHandler(processor, producer), + producer, + ), + rabbitTypes.EthSignWithdrawalQueue: consumer.NewBase[bridgeProcessor.WithdrawalRequest]( + rabbitCfg.NewChannel(), + consumer.GetName(consumer.EthSignWithdrawalConsumerPrefix, idx), + logger. + WithField(serviceComponent, componentConsumer). + WithField(componentPart, consumer.GetName(consumer.EthSignWithdrawalConsumerPrefix, idx)), + consumerProcessors.NewEthereumSignWithdrawalHandler(processor, producer), + producer, + ), + rabbitTypes.ZanoSignWithdrawalQueue: consumer.NewBase[bridgeProcessor.WithdrawalRequest]( + rabbitCfg.NewChannel(), + consumer.GetName(consumer.ZanoSignWithdrawalConsumerPrefix, idx), + logger. + WithField(serviceComponent, componentConsumer). + WithField(componentPart, consumer.GetName(consumer.ZanoSignWithdrawalConsumerPrefix, idx)), + consumerProcessors.NewZanoSignWithdrawalHandler(processor, producer), + producer, + ), + rabbitTypes.ZanoSendWithdrawalQueue: consumer.NewBase[bridgeProcessor.ZanoSignedWithdrawalRequest]( + rabbitCfg.NewChannel(), + consumer.GetName(consumer.ZanoSendWithdrawalConsumerPrefix, idx), + logger. + WithField(serviceComponent, componentConsumer). + WithField(componentPart, consumer.GetName(consumer.ZanoSendWithdrawalConsumerPrefix, idx)), + consumerProcessors.NewZanoSendWithdrawalHandler(processor, producer), + producer, + ), + } + + wg.Add(len(baseConsumers)) + + for queue, cns := range baseConsumers { + go func(cns rabbitTypes.Consumer, queue string) { + defer wg.Done() if err := cns.Consume(ctx, queue); err != nil { - logger.WithError(err).Error(fmt.Sprintf("failed to consume for %s", consumerName)) + logger.WithError(err).Error(fmt.Sprintf("failed to consume for %s", cns.Name())) } - }(idx, queue, consumerCfg) + + }(cns, queue) } } - wg.Add(2) - go func() { - defer wg.Done() - cns := consumer.NewBatch[bridgeProcessor.SubmitTransactionRequest]( + batchConsumers := map[string]rabbitTypes.Consumer{ + rabbitTypes.SubmitTransactionQueue: consumer.NewBatch[bridgeProcessor.SubmitTransactionRequest]( rabbitCfg.NewChannel(), consumer.SubmitTransactionConsumerPrefix, logger. @@ -95,14 +102,8 @@ func RunConsumers( consumerProcessors.NewSubmitTransactionHandler(processor), producer, rabbitCfg.TxSubmitterOpts, - ) - if err := cns.Consume(ctx, rabbitTypes.SubmitTransactionQueue); err != nil { - logger.WithError(err).Error(fmt.Sprintf("failed to consume for %s", consumer.SubmitTransactionConsumerPrefix)) - } - }() - go func() { - defer wg.Done() - cns := consumer.NewBatch[bridgeProcessor.WithdrawalRequest]( + ), + rabbitTypes.BtcSendWithdrawalQueue: consumer.NewBatch[bridgeProcessor.WithdrawalRequest]( rabbitCfg.NewChannel(), consumer.BitcoinSendWithdrawalConsumerPrefix, logger. @@ -111,12 +112,21 @@ func RunConsumers( consumerProcessors.NewBitcoinSendWithdrawalHandler(processor, producer), producer, rabbitCfg.BitcoinSubmitterOpts, - ) - if err := cns.Consume(ctx, rabbitTypes.BtcSendWithdrawalQueue); err != nil { - logger.WithError(err).Error(fmt.Sprintf("failed to consume for %s", consumer.BitcoinSendWithdrawalConsumerPrefix)) - } - }() + ), + } + + wg.Add(len(batchConsumers)) + + for queue, cns := range batchConsumers { + go func(cns rabbitTypes.Consumer, queue string) { + defer wg.Done() + if err := cns.Consume(ctx, queue); err != nil { + logger.WithError(err).Error(fmt.Sprintf("failed to consume for %s", cns.Name())) + } + + }(cns, queue) + } } func RunServer( diff --git a/internal/core/rabbitmq/config/types.go b/internal/core/rabbitmq/config/types.go index a4123a2..670d098 100644 --- a/internal/core/rabbitmq/config/types.go +++ b/internal/core/rabbitmq/config/types.go @@ -9,11 +9,11 @@ import ( ) type Config struct { - Connection *amqp.Connection `fig:"url,required"` - ConsumerInstances uint `fig:"consumer_instances,required"` - ResendParams ResendParams `fig:"resend_params,required"` - TxSubmitterOpts BatchConsumingOpts `fig:"tx_submitter,required"` - BitcoinSubmitterOpts BatchConsumingOpts `fig:"bitcoin_submitter,required"` + Connection *amqp.Connection `fig:"url,required"` + BaseConsumerInstances uint `fig:"base_consumer_instances,required"` + ResendParams ResendParams `fig:"resend_params,required"` + TxSubmitterOpts BatchConsumingOpts `fig:"tx_submitter,required"` + BitcoinSubmitterOpts BatchConsumingOpts `fig:"bitcoin_submitter,required"` } type BatchConsumingOpts struct { @@ -31,8 +31,8 @@ func (c *Config) Validate() error { return errors.New("delays should not be empty") } - if c.ConsumerInstances == 0 { - c.ConsumerInstances = uint(runtime.NumCPU()) + if c.BaseConsumerInstances == 0 { + c.BaseConsumerInstances = uint(runtime.NumCPU()) } if c.ResendParams.MaxRetryCount == 0 { diff --git a/internal/core/rabbitmq/consumer/base.go b/internal/core/rabbitmq/consumer/base.go index 39498d5..ef83fae 100644 --- a/internal/core/rabbitmq/consumer/base.go +++ b/internal/core/rabbitmq/consumer/base.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "encoding/json" rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" @@ -15,22 +16,22 @@ const ( ZanoSendWithdrawalConsumerPrefix = "zano_send_withdrawal_consumer" ) -type BaseConsumer struct { +type BaseConsumer[T rabbitTypes.Identifiable] struct { channel *amqp.Channel name string logger *logan.Entry - deliveryProcessor rabbitTypes.DeliveryProcessor + deliveryProcessor rabbitTypes.RequestProcessor[T] deliveryResender rabbitTypes.DeliveryResender } -func NewBase( +func NewBase[T rabbitTypes.Identifiable]( channel *amqp.Channel, name string, logger *logan.Entry, - deliveryProcessor rabbitTypes.DeliveryProcessor, + deliveryProcessor rabbitTypes.RequestProcessor[T], deliveryResender rabbitTypes.DeliveryResender, ) rabbitTypes.Consumer { - return &BaseConsumer{ + return &BaseConsumer[T]{ channel: channel, name: name, logger: logger, @@ -39,7 +40,7 @@ func NewBase( } } -func (c *BaseConsumer) Consume(ctx context.Context, queue string) error { +func (c *BaseConsumer[T]) Consume(ctx context.Context, queue string) error { deliveries, err := c.channel.Consume(queue, c.name, false, false, false, false, nil) if err != nil { return errors.Wrap(err, "failed to get consumer channel") @@ -50,48 +51,56 @@ func (c *BaseConsumer) Consume(ctx context.Context, queue string) error { for { select { case <-ctx.Done(): - c.logger.Info("consuming stopped") + c.logger.Info("consuming stopped: context canceled") + return errors.Wrap(c.channel.Close(), "failed to close channel") case delivery, ok := <-deliveries: if !ok { + c.logger.Info("consuming stopped: delivery channel closed") + return nil } logger := c.logger.WithField("delivery_tag", delivery.DeliveryTag) logger.Debug("delivery received") - reprocessable, callback, err := c.deliveryProcessor.ProcessDelivery(delivery) + var request T + if err = json.Unmarshal(delivery.Body, &request); err != nil { + logger.WithError(err).Error("failed to unmarshal delivery body") + nack(logger, delivery, false) + continue + } + + reprocessable, err := c.deliveryProcessor.ProcessRequest(request) if err == nil { ack(logger, delivery) continue } - logger.WithError(err).Error("failed to process delivery") + nack(logger, delivery, false) + logger.WithError(err).Error("failed to process request") if !reprocessable { - logger.Debug("delivery is not reprocessable") - nack(logger, delivery, false) + logger.Debug("request is not reprocessable") continue } if err = c.deliveryResender.ResendDelivery(queue, delivery); err == nil { logger.Debug("delivery resent") - ack(logger, delivery) continue } - - if errors.Is(err, rabbitTypes.ErrorMaxResendReached) { - logger.Debug(err.Error()) - if callback != nil { - if err := callback(); err != nil { - logger.WithError(err).Error("failed to call reprocess fail callback") - } - } - - nack(logger, delivery, false) + if errors.Is(err, rabbitTypes.ErrMaxResendReached) { + logger.Debug(rabbitTypes.ErrMaxResendReached) } else { logger.WithError(err).Error("failed to resend delivery") - nack(logger, delivery, true) + } + + if err = c.deliveryProcessor.ReprocessFailedCallback(request); err != nil { + logger.WithError(err).Error("failed to execute failed reprocessing callback") } } } } + +func (c *BaseConsumer[T]) Name() string { + return c.name +} diff --git a/internal/core/rabbitmq/consumer/batch.go b/internal/core/rabbitmq/consumer/batch.go index 97f14f4..94467b7 100644 --- a/internal/core/rabbitmq/consumer/batch.go +++ b/internal/core/rabbitmq/consumer/batch.go @@ -68,20 +68,15 @@ func (c *BatchConsumer[T]) Consume(ctx context.Context, queue string) error { for { select { case <-ctx.Done(): - if len(c.batch) != 0 { - // return ack-ed deliveries to the sender in case of context cancellation - c.logger.Info("resending ack-ed deliveries") - for _, entry := range c.batch { - _ = c.deliveryResender.ResendDelivery(queue, entry.Delivery) - } - } - - c.logger.Info("consuming stopped") - c.batch = c.batch[:0] + c.logger.Info("consuming stopped: context canceled") + c.processBatch(queue) return errors.Wrap(c.channel.Close(), "failed to close channel") case delivery, ok := <-deliveries: if !ok { + c.logger.Info("consuming stopped: delivery channel closed") + c.processBatch(queue) + return nil } @@ -123,7 +118,7 @@ func (c *BatchConsumer[T]) processBatch(queue string) { entryBatch[i] = entry.Entry } - reprocessable, callback, err := c.batchProcessor.ProcessBatch(entryBatch) + reprocessable, err := c.batchProcessor.ProcessBatch(entryBatch) if err == nil { logger.Debug("batch processed") return @@ -135,7 +130,7 @@ func (c *BatchConsumer[T]) processBatch(queue string) { return } - var callbackIds []int64 + var callbackRequests []T for _, entry := range c.batch { // shadowing original logger logger := logger.WithField("delivery_tag", entry.Delivery.DeliveryTag) @@ -145,16 +140,22 @@ func (c *BatchConsumer[T]) processBatch(queue string) { logger.Debug("delivery resent") continue } - - logger.WithError(err).Error("failed to resend delivery") - if errors.Is(err, rabbitTypes.ErrorMaxResendReached) { - callbackIds = append(callbackIds, entry.Entry.Id()) + if errors.Is(err, rabbitTypes.ErrMaxResendReached) { + logger.Debug(rabbitTypes.ErrMaxResendReached) + } else { + logger.WithError(err).Error("failed to resend delivery") } + + callbackRequests = append(callbackRequests, entry.Entry) } - if callback != nil && len(callbackIds) > 0 { - if err = callback(callbackIds...); err != nil { - logger.WithField("callback_ids", callbackIds).WithError(err).Error("failed to set batch status failed") + if len(callbackRequests) > 0 { + if err = c.batchProcessor.ReprocessFailedCallback(callbackRequests); err != nil { + logger.WithError(err).Error("failed to execute failed reprocessing callback") } } } + +func (c *BatchConsumer[T]) Name() string { + return c.name +} diff --git a/internal/core/rabbitmq/consumer/processors/btc_send_withdrawal.go b/internal/core/rabbitmq/consumer/processors/btc_send_withdrawal.go index 77f8ced..db8f629 100644 --- a/internal/core/rabbitmq/consumer/processors/btc_send_withdrawal.go +++ b/internal/core/rabbitmq/consumer/processors/btc_send_withdrawal.go @@ -21,29 +21,34 @@ func NewBitcoinSendWithdrawalHandler( } } -func (h *BitcoinSendWithdrawalHandler) ProcessBatch(batch []processor.WithdrawalRequest) (reprocessable bool, rprFailCallback func(ids ...int64) error, err error) { +func (h BitcoinSendWithdrawalHandler) ProcessBatch(batch []processor.WithdrawalRequest) (reprocessable bool, err error) { if len(batch) == 0 { - return false, nil, nil - } - - rprFailCallback = func(ids ...int64) error { - return errors.Wrap( - h.processor.SetWithdrawStatusFailed(ids...), - "failed to set withdraw status failed", - ) + return false, nil } reprocessable, err = h.processor.ProcessSendBitcoinWithdrawals(batch...) if err != nil { - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process send bitcoin withdrawal request") + return reprocessable, errors.Wrap(err, "failed to process send bitcoin withdrawal request") } for _, entry := range batch { submitTxReq := processor.SubmitTransactionRequest{DepositDbId: entry.DepositDbId} if err = h.publisher.PublishSubmitTransactionRequest(submitTxReq); err != nil { - return true, rprFailCallback, errors.Wrap(err, "failed to send submit transaction request") + return true, errors.Wrap(err, "failed to send submit transaction request") } } - return false, nil, nil + return false, nil +} + +func (h BitcoinSendWithdrawalHandler) ReprocessFailedCallback(batch []processor.WithdrawalRequest) error { + ids := make([]int64, len(batch)) + for i, req := range batch { + ids[i] = req.DepositDbId + } + + return errors.Wrap( + h.processor.SetWithdrawStatusFailed(ids...), + "failed to set withdraw status failed", + ) } diff --git a/internal/core/rabbitmq/consumer/processors/eth_sign_withdrawal.go b/internal/core/rabbitmq/consumer/processors/eth_sign_withdrawal.go index 8866c6c..4b92a4e 100644 --- a/internal/core/rabbitmq/consumer/processors/eth_sign_withdrawal.go +++ b/internal/core/rabbitmq/consumer/processors/eth_sign_withdrawal.go @@ -1,12 +1,9 @@ package processors import ( - "encoding/json" - "github.com/hyle-team/bridgeless-signer/internal/bridge/processor" rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" ) type EthereumSignWithdrawalHandler struct { @@ -17,34 +14,29 @@ type EthereumSignWithdrawalHandler struct { func NewEthereumSignWithdrawalHandler( processor *processor.Processor, publisher rabbitTypes.Producer, -) rabbitTypes.DeliveryProcessor { +) rabbitTypes.RequestProcessor[processor.WithdrawalRequest] { return &EthereumSignWithdrawalHandler{ processor: processor, publisher: publisher, } } -func (h *EthereumSignWithdrawalHandler) ProcessDelivery(delivery amqp.Delivery) (reprocessable bool, rprFailCallback func() error, err error) { - var request processor.WithdrawalRequest - if err = json.Unmarshal(delivery.Body, &request); err != nil { - return false, nil, errors.Wrap(err, "failed to unmarshal delivery body") - } - - rprFailCallback = func() error { - return errors.Wrap( - h.processor.SetWithdrawStatusFailed(request.DepositDbId), - "failed to set withdraw status failed", - ) - } - +func (h EthereumSignWithdrawalHandler) ProcessRequest(request processor.WithdrawalRequest) (reprocessable bool, err error) { submitReq, reprocessable, err := h.processor.ProcessEthSignWithdrawalRequest(request) if err != nil { - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process eth sign withdrawal request") + return reprocessable, errors.Wrap(err, "failed to process eth sign withdrawal request") } if err = h.publisher.PublishSubmitTransactionRequest(*submitReq); err != nil { - return true, rprFailCallback, errors.Wrap(err, "failed to send submit withdraw request") + return true, errors.Wrap(err, "failed to send submit withdraw request") } - return false, nil, nil + return false, nil +} + +func (h EthereumSignWithdrawalHandler) ReprocessFailedCallback(request processor.WithdrawalRequest) error { + return errors.Wrap( + h.processor.SetWithdrawStatusFailed(request.DepositDbId), + "failed to set withdraw status failed", + ) } diff --git a/internal/core/rabbitmq/consumer/processors/get_deposit.go b/internal/core/rabbitmq/consumer/processors/get_deposit.go index 68ae170..8621d75 100644 --- a/internal/core/rabbitmq/consumer/processors/get_deposit.go +++ b/internal/core/rabbitmq/consumer/processors/get_deposit.go @@ -1,13 +1,11 @@ package processors import ( - "encoding/json" "fmt" "github.com/hyle-team/bridgeless-signer/internal/bridge/processor" bridgeTypes "github.com/hyle-team/bridgeless-signer/internal/bridge/types" rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" ) type GetDepositHandler struct { @@ -18,29 +16,17 @@ type GetDepositHandler struct { func NewGetDepositHandler( processor *processor.Processor, publisher rabbitTypes.Producer, -) rabbitTypes.DeliveryProcessor { +) rabbitTypes.RequestProcessor[processor.GetDepositRequest] { return &GetDepositHandler{ processor: processor, publisher: publisher, } } -func (h *GetDepositHandler) ProcessDelivery(delivery amqp.Delivery) (reprocessable bool, rprFailCallback func() error, err error) { - var request processor.GetDepositRequest - if err = json.Unmarshal(delivery.Body, &request); err != nil { - return false, nil, errors.Wrap(err, "failed to unmarshal delivery body") - } - - rprFailCallback = func() error { - return errors.Wrap( - h.processor.SetWithdrawStatusFailed(request.DepositDbId), - "failed to set withdraw status failed", - ) - } - +func (h GetDepositHandler) ProcessRequest(request processor.GetDepositRequest) (reprocessable bool, err error) { withdrawReq, reprocessable, err := h.processor.ProcessGetDepositRequest(request) if err != nil { - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process get deposit request") + return reprocessable, errors.Wrap(err, "failed to process get deposit request") } reprocessable = true @@ -56,5 +42,12 @@ func (h *GetDepositHandler) ProcessDelivery(delivery amqp.Delivery) (reprocessab reprocessable = false } - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to send deposit processing request") + return reprocessable, errors.Wrap(err, "failed to send deposit processing request") +} + +func (h GetDepositHandler) ReprocessFailedCallback(request processor.GetDepositRequest) error { + return errors.Wrap( + h.processor.SetWithdrawStatusFailed(request.DepositDbId), + "failed to set withdraw status failed", + ) } diff --git a/internal/core/rabbitmq/consumer/processors/submit_transaction.go b/internal/core/rabbitmq/consumer/processors/submit_transaction.go index 6ccc135..b3dabe4 100644 --- a/internal/core/rabbitmq/consumer/processors/submit_transaction.go +++ b/internal/core/rabbitmq/consumer/processors/submit_transaction.go @@ -18,19 +18,24 @@ func NewSubmitTransactionHandler( } } -func (s SubmitTransactionHandler) ProcessBatch(batch []processor.SubmitTransactionRequest) (reprocessable bool, rprFailCallback func(ids ...int64) error, err error) { +func (s SubmitTransactionHandler) ProcessBatch(batch []processor.SubmitTransactionRequest) (reprocessable bool, err error) { if len(batch) == 0 { - return false, nil, nil - } - - rprFailCallback = func(ids ...int64) error { - return errors.Wrap( - s.processor.SetSubmitStatusFailed(ids...), - "failed to set submit status failed", - ) + return false, nil } reprocessable, err = s.processor.ProcessSubmitTransactions(batch...) - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process submit transaction request") + return reprocessable, errors.Wrap(err, "failed to process submit transaction request") +} + +func (s SubmitTransactionHandler) ReprocessFailedCallback(batch []processor.SubmitTransactionRequest) error { + ids := make([]int64, len(batch)) + for i, req := range batch { + ids[i] = req.DepositDbId + } + + return errors.Wrap( + s.processor.SetSubmitStatusFailed(ids...), + "failed to set submit status failed", + ) } diff --git a/internal/core/rabbitmq/consumer/processors/zano_send_withdrawal.go b/internal/core/rabbitmq/consumer/processors/zano_send_withdrawal.go index ec461ef..e0c359e 100644 --- a/internal/core/rabbitmq/consumer/processors/zano_send_withdrawal.go +++ b/internal/core/rabbitmq/consumer/processors/zano_send_withdrawal.go @@ -1,12 +1,9 @@ package processors import ( - "encoding/json" - "github.com/hyle-team/bridgeless-signer/internal/bridge/processor" rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" ) type ZanoSendWithdrawalHandler struct { @@ -17,34 +14,29 @@ type ZanoSendWithdrawalHandler struct { func NewZanoSendWithdrawalHandler( processor *processor.Processor, publisher rabbitTypes.Producer, -) rabbitTypes.DeliveryProcessor { +) rabbitTypes.RequestProcessor[processor.ZanoSignedWithdrawalRequest] { return &ZanoSendWithdrawalHandler{ processor: processor, publisher: publisher, } } -func (h *ZanoSendWithdrawalHandler) ProcessDelivery(delivery amqp.Delivery) (reprocessable bool, rprFailCallback func() error, err error) { - var request processor.ZanoSignedWithdrawalRequest - if err = json.Unmarshal(delivery.Body, &request); err != nil { - return false, nil, errors.Wrap(err, "failed to unmarshal delivery body") - } - - rprFailCallback = func() error { - return errors.Wrap( - h.processor.SetWithdrawStatusFailed(request.DepositDbId), - "failed to set withdraw status failed", - ) - } - +func (h ZanoSendWithdrawalHandler) ProcessRequest(request processor.ZanoSignedWithdrawalRequest) (reprocessable bool, err error) { submitReq, reprocessable, err := h.processor.ProcessZanoSendWithdrawalRequest(request) if err != nil { - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process zano send withdrawal request") + return reprocessable, errors.Wrap(err, "failed to process zano send withdrawal request") } if err = h.publisher.PublishSubmitTransactionRequest(*submitReq); err != nil { - return true, rprFailCallback, errors.Wrap(err, "failed to send submit withdraw request") + return true, errors.Wrap(err, "failed to send submit withdraw request") } - return false, nil, nil + return false, nil +} + +func (h ZanoSendWithdrawalHandler) ReprocessFailedCallback(request processor.ZanoSignedWithdrawalRequest) error { + return errors.Wrap( + h.processor.SetWithdrawStatusFailed(request.DepositDbId), + "failed to set withdraw status failed", + ) } diff --git a/internal/core/rabbitmq/consumer/processors/zano_sign_withdrawal.go b/internal/core/rabbitmq/consumer/processors/zano_sign_withdrawal.go index 7b568d9..e203c2f 100644 --- a/internal/core/rabbitmq/consumer/processors/zano_sign_withdrawal.go +++ b/internal/core/rabbitmq/consumer/processors/zano_sign_withdrawal.go @@ -1,12 +1,9 @@ package processors import ( - "encoding/json" - "github.com/hyle-team/bridgeless-signer/internal/bridge/processor" rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types" "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" ) type ZanoSignWithdrawalHandler struct { @@ -17,34 +14,29 @@ type ZanoSignWithdrawalHandler struct { func NewZanoSignWithdrawalHandler( processor *processor.Processor, publisher rabbitTypes.Producer, -) rabbitTypes.DeliveryProcessor { +) rabbitTypes.RequestProcessor[processor.WithdrawalRequest] { return &ZanoSignWithdrawalHandler{ processor: processor, publisher: publisher, } } -func (h *ZanoSignWithdrawalHandler) ProcessDelivery(delivery amqp.Delivery) (reprocessable bool, rprFailCallback func() error, err error) { - var request processor.WithdrawalRequest - if err = json.Unmarshal(delivery.Body, &request); err != nil { - return false, nil, errors.Wrap(err, "failed to unmarshal delivery body") - } - - rprFailCallback = func() error { - return errors.Wrap( - h.processor.SetWithdrawStatusFailed(request.DepositDbId), - "failed to set withdraw status failed", - ) - } - +func (h ZanoSignWithdrawalHandler) ProcessRequest(request processor.WithdrawalRequest) (reprocessable bool, err error) { signedWithdrawReq, reprocessable, err := h.processor.ProcessZanoSignWithdrawalRequest(request) if err != nil { - return reprocessable, rprFailCallback, errors.Wrap(err, "failed to process zano sign withdrawal request") + return reprocessable, errors.Wrap(err, "failed to process zano sign withdrawal request") } if err = h.publisher.PublishZanoSendWithdrawalRequest(*signedWithdrawReq); err != nil { - return true, rprFailCallback, errors.Wrap(err, "failed to send zano send withdraw request") + return true, errors.Wrap(err, "failed to send zano send withdraw request") } - return false, nil, nil + return false, nil +} + +func (h ZanoSignWithdrawalHandler) ReprocessFailedCallback(request processor.WithdrawalRequest) error { + return errors.Wrap( + h.processor.SetWithdrawStatusFailed(request.DepositDbId), + "failed to set withdraw status failed", + ) } diff --git a/internal/core/rabbitmq/producer/resend.go b/internal/core/rabbitmq/producer/resend.go index ba096b0..0f14b0f 100644 --- a/internal/core/rabbitmq/producer/resend.go +++ b/internal/core/rabbitmq/producer/resend.go @@ -8,7 +8,7 @@ import ( func (p *Producer) ResendDelivery(queue string, msg amqp.Delivery) error { retryCount := p.getCurrentRetryNumber(msg) if retryCount >= int32(p.maxRetryCount) { - return rabbitTypes.ErrorMaxResendReached + return rabbitTypes.ErrMaxResendReached } retryCount++ diff --git a/internal/core/rabbitmq/types/main.go b/internal/core/rabbitmq/types/main.go index 6fc4106..e880a13 100644 --- a/internal/core/rabbitmq/types/main.go +++ b/internal/core/rabbitmq/types/main.go @@ -32,7 +32,10 @@ const ( SubmitTransactionQueue = "submit-transaction-queue" ) -var ErrorMaxResendReached = errors.New("max resend count reached") +var ( + ErrMaxResendReached = errors.New("max resend count reached") + ErrConnectionClosed = errors.New("RabbitMQ connection was closed") +) type Producer interface { PublishGetDepositRequest(request bridgeTypes.GetDepositRequest) error @@ -52,21 +55,6 @@ type DeliveryResender interface { } type Consumer interface { + Name() string Consume(ctx context.Context, queue string) error } - -type DeliveryProcessor interface { - // ProcessDelivery processes the delivery and returns whether the delivery should be reprocessed, - // a callback to be called if the reprocessing fails, and an error. - ProcessDelivery(delivery amqp.Delivery) (reprocessable bool, rprFailCallback func() error, err error) -} - -type Identifiable interface { - Id() int64 -} - -type BatchProcessor[T Identifiable] interface { - // ProcessBatch processes the batch and returns whether the batch should be reprocessed, - // a callback to be called if the reprocessing fails, and an error. - ProcessBatch(batch []T) (reprocessable bool, rprFailCallback func(ids ...int64) error, err error) -} diff --git a/internal/core/rabbitmq/types/processors.go b/internal/core/rabbitmq/types/processors.go new file mode 100644 index 0000000..861f82c --- /dev/null +++ b/internal/core/rabbitmq/types/processors.go @@ -0,0 +1,19 @@ +package types + +type Identifiable interface { + Id() int64 +} + +type RequestProcessor[Request Identifiable] interface { + // ProcessRequest processes the request and returns whether the request can be reprocessed + ProcessRequest(request Request) (reprocessable bool, err error) + // ReprocessFailedCallback is a callback to be called if the reprocessing fails + ReprocessFailedCallback(request Request) error +} + +type BatchProcessor[Request Identifiable] interface { + // ProcessBatch processes the batch and returns whether the batch should be reprocessed + ProcessBatch(batch []Request) (reprocessable bool, err error) + // ReprocessFailedCallback is a callback to be called if the reprocessing fails + ReprocessFailedCallback(batch []Request) error +}