Skip to content

Commit

Permalink
Merge pull request #9 from hyle-team/feature/closed-rabbit-conn
Browse files Browse the repository at this point in the history
Feature/closed rabbit conn
  • Loading branch information
slbmax authored Dec 5, 2024
2 parents 4db247f + bfea6d1 commit bcd5667
Show file tree
Hide file tree
Showing 23 changed files with 314 additions and 281 deletions.
15 changes: 15 additions & 0 deletions internal/bridge/chain/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package chain

import (
"fmt"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/rpcclient"
"github.com/hyle-team/bridgeless-signer/internal/bridge/types"
"github.com/pkg/errors"
"gitlab.com/distributed_lab/figure/v3"
"reflect"
"strings"
)

type Bitcoin struct {
Expand All @@ -33,6 +35,19 @@ func (c Chain) Bitcoin() Bitcoin {
panic(errors.Wrap(err, "failed to init bitcoin chain rpc"))
}

// ensuring wallet is properly configured
_, err := chain.Rpc.GetWalletInfo()
if err != nil {
if strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotFound)) {
if _, err := chain.Rpc.LoadWallet(chain.Wallet); err != nil {
panic(errors.Wrap(err, "failed to load wallet"))
}
}
if strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotSpecified)) {
panic("wallet not specified in the URL")
}
}

var receivers []string
if err := figure.Out(&receivers).FromInterface(c.BridgeAddresses).With(figure.BaseHooks).Please(); err != nil {
panic(errors.Wrap(err, "failed to decode bitcoin receivers"))
Expand Down
10 changes: 6 additions & 4 deletions internal/bridge/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ func (c *configurer) CoreConnectorConfig() ConnectorConfig {
}
connectSecurityOptions = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
keepaliveOptions := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second, // wait time before ping if no activity
Timeout: 5 * time.Second, // ping timeout
PermitWithoutStream: true,
})

client, err := grpc.Dial(cfg.Connection.Addr, connectSecurityOptions, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second, // wait time before ping if no activity
Timeout: 20 * time.Second, // ping timeout
}))
client, err := grpc.NewClient(cfg.Connection.Addr, connectSecurityOptions, keepaliveOptions)
if err != nil {
panic(errors.Wrap(err, "failed to connect to core via gRPC"))
}
Expand Down
13 changes: 8 additions & 5 deletions internal/bridge/core/submit_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
bridgetypes "github.com/hyle-team/bridgeless-core/x/bridge/types"
"github.com/hyle-team/bridgeless-signer/internal/bridge/types"
"github.com/pkg/errors"
"strings"
)

func (c *Connector) SubmitDeposits(depositTxs ...bridgetypes.Transaction) error {
Expand All @@ -12,11 +13,13 @@ func (c *Connector) SubmitDeposits(depositTxs ...bridgetypes.Transaction) error
}

msg := bridgetypes.NewMsgSubmitTransactions(c.settings.Account.CosmosAddress(), depositTxs...)
if err := c.submitMsgs(msg); err != nil {
if errors.Is(err, bridgetypes.ErrTranscationAlreadySubmitted.GRPCStatus().Err()) {
return types.ErrTransactionAlreadySubmitted
}
err := c.submitMsgs(msg)
if err == nil {
return nil
}
if strings.Contains(err.Error(), bridgetypes.ErrTranscationAlreadySubmitted.Error()) {
return types.ErrTransactionAlreadySubmitted
}

return nil
return errors.Wrap(err, "failed to submit deposits")
}
1 change: 0 additions & 1 deletion internal/bridge/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func New(
db data.DepositsQ,
signer *signer.Signer,
core bridgeTypes.Bridger,

) *Processor {
return &Processor{proxies: proxies, db: db, signer: signer, core: core}
}
Expand Down
11 changes: 7 additions & 4 deletions internal/bridge/processor/submit_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ func (p *Processor) ProcessSubmitTransactions(reqs ...SubmitTransactionRequest)
return errors.Wrap(tmperr, "failed to set deposits submitted")
}

return errors.Wrap(p.core.SubmitDeposits(depositTxs...), "failed to submit deposits")
err = p.core.SubmitDeposits(depositTxs...)
if errors.Is(err, bridgeTypes.ErrTransactionAlreadySubmitted) {
// ignoring already submitted transaction
return nil
}

return errors.Wrap(err, "failed to submit deposits")
})
if errors.Is(err, bridgeTypes.ErrTransactionAlreadySubmitted) {
return false, err
}

return err != nil, err
}
10 changes: 7 additions & 3 deletions internal/bridge/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ type WithdrawalRequest struct {
Data data.DepositData
}

func (r WithdrawalRequest) Id() int64 {
return r.DepositDbId
}

type GetDepositRequest struct {
DepositDbId int64
DepositIdentifier data.DepositIdentifier
}

func (r GetDepositRequest) Id() int64 { return r.DepositDbId }

type ZanoSignedWithdrawalRequest struct {
DepositDbId int64
Data data.DepositData
Transaction zano.SignedTransaction
}

func (r WithdrawalRequest) Id() int64 {
return r.DepositDbId
}
func (r ZanoSignedWithdrawalRequest) Id() int64 { return r.DepositDbId }

type SubmitTransactionRequest struct {
DepositDbId int64
Expand Down
24 changes: 0 additions & 24 deletions internal/bridge/proxy/btc/withdraw.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package btc

import (
"fmt"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/btcutil"
bridgeTypes "github.com/hyle-team/bridgeless-signer/internal/bridge"
"github.com/pkg/errors"
"math/big"
"strings"
)

var minWithdrawAmount = big.NewInt(minSatoshisPerOutput)
Expand All @@ -34,10 +31,6 @@ func (p *proxy) SendBitcoins(data map[string]*big.Int) (string, error) {
amounts[addr] = btcutil.Amount(value)
}

if err := p.ensureWalletLoaded(); err != nil {
return "", errors.Wrap(err, "failed to ensure wallet loaded")
}

hash, err := p.chain.Rpc.SendMany("", amounts)
if err != nil {
return "", errors.Wrap(err, "failed to send transaction")
Expand All @@ -53,20 +46,3 @@ func (p *proxy) WithdrawalAmountValid(amount *big.Int) bool {

return true
}

func (p *proxy) ensureWalletLoaded() error {
info, err := p.chain.Rpc.GetWalletInfo()
if err != nil {
if !strings.HasPrefix(err.Error(), fmt.Sprintf("%v", btcjson.ErrRPCWalletNotFound)) {
return errors.Wrap(err, "failed to get wallet info")
}
} else {
if info.WalletName == p.chain.Wallet {
return nil
}
}

_, err = p.chain.Rpc.LoadWallet(p.chain.Wallet)

return errors.Wrap(err, "failed to load wallet")
}
15 changes: 5 additions & 10 deletions internal/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package cli

import (
"context"
"github.com/pkg/errors"
"os/signal"
"syscall"

"github.com/alecthomas/kingpin"
"github.com/hyle-team/bridgeless-signer/internal/config"
"github.com/pkg/errors"
"gitlab.com/distributed_lab/kit/kv"
"gitlab.com/distributed_lab/logan/v3"
)
Expand Down Expand Up @@ -41,28 +38,26 @@ func Run(args []string) bool {
return false
}

ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)

switch cmd {
case serviceCmd.FullCommand():
err = RunService(ctx, cfg)
err = RunService(cfg)
case migrateUpCmd.FullCommand():
err = MigrateUp(cfg)
case migrateDownCmd.FullCommand():
err = MigrateDown(cfg)
// handle any custom commands here in the same way
default:
log.Errorf("unknown command %s", cmd)
return false
}
if err != nil {
if errors.Is(err, context.Canceled) {
log.Info("service stopped: context was canceled")
log.Info("service got signal to stop")
return true
}

log.WithError(err).Error("failed to exec cmd")
log.Error(err)
return false
}

return true
}
34 changes: 32 additions & 2 deletions internal/cli/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import (
"context"
coreConnector "github.com/hyle-team/bridgeless-signer/internal/bridge/core"
"github.com/hyle-team/bridgeless-signer/internal/bridge/proxy"
rabbitTypes "github.com/hyle-team/bridgeless-signer/internal/core/rabbitmq/types"
amqp "github.com/rabbitmq/amqp091-go"
"os"
"os/signal"
"sync"
"syscall"

bridgeProcessor "github.com/hyle-team/bridgeless-signer/internal/bridge/processor"
"github.com/hyle-team/bridgeless-signer/internal/config"
Expand All @@ -14,12 +19,15 @@ import (
"github.com/pkg/errors"
)

func RunService(ctx context.Context, cfg config.Config) error {
func RunService(cfg config.Config) error {
var (
wg = sync.WaitGroup{}
coreCfg = cfg.CoreConnectorConfig()
coreConn = coreConnector.NewConnector(coreCfg.Connection, coreCfg.Settings)
rabbitCfg = cfg.RabbitMQConfig()

rabbitConnChan = rabbitCfg.Connection.NotifyClose(make(chan *amqp.Error, 1))
ctx = appContext(rabbitConnChan)
)

proxiesRepo, err := proxy.NewProxiesRepository(cfg.Chains(), cfg.Log())
Expand All @@ -39,5 +47,27 @@ func RunService(ctx context.Context, cfg config.Config) error {

wg.Wait()

return ctx.Err()
return context.Cause(ctx)
}

func appContext(rabbit <-chan *amqp.Error) context.Context {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

ctx, cancel := context.WithCancelCause(context.Background())

go func() {
select {
case <-sig:
cancel(nil)
case err, ok := <-rabbit:
if ok {
cancel(rabbitTypes.ErrConnectionClosed)
} else {
cancel(err)
}
}
}()

return ctx
}
4 changes: 2 additions & 2 deletions internal/core/api/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *server) RunGRPC(ctx context.Context) error {
srv := s.grpcServer()

// graceful shutdown
go func() { <-ctx.Done(); srv.GracefulStop(); s.logger.Info("grpc serving stopped") }()
go func() { <-ctx.Done(); srv.GracefulStop(); s.logger.Info("grpc serving stopped: context canceled") }()

s.logger.Info("grpc serving started")
return srv.Serve(s.grpc)
Expand All @@ -79,7 +79,7 @@ func (s *server) RunHTTP(ctxt context.Context) error {
if err := srv.Shutdown(shutdownDeadline); err != nil {
s.logger.WithError(err).Error("failed to shutdown http server")
}
s.logger.Info("http serving stopped")
s.logger.Info("http serving stopped: context canceled")
}()

s.logger.Info("http serving started")
Expand Down
Loading

0 comments on commit bcd5667

Please sign in to comment.