Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gravity gRPC client implementation #385

Merged
merged 7 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 24 additions & 41 deletions chain/paloma/gravity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,74 +4,57 @@ import (
"context"
"encoding/hex"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/grpc"
gravity "github.com/palomachain/paloma/x/gravity/types"
"github.com/palomachain/pigeon/chain"
)

func (c *Client) GravityQueryLastUnsignedBatch(ctx context.Context, chainReferenceID string) ([]gravity.OutgoingTxBatch, error) {
return gravityQueryLastUnsignedBatch(ctx, c.GRPCClient, c.creator, chainReferenceID)
}
// return gravityQueryLastUnsignedBatch(ctx, c.GRPCClient, c.creator, chainReferenceID)
qc := gravity.NewQueryClient(c.GRPCClient)
batches, err := qc.LastPendingBatchRequestByAddr(ctx, &gravity.QueryLastPendingBatchRequestByAddrRequest{
Address: c.creator,
})
if err != nil {
return nil, err
}

func (c *Client) GravityConfirmBatches(ctx context.Context, signatures ...chain.SignedGravityOutgoingTxBatch) error {
return gravityConfirmBatch(ctx, c.MessageSender, c.creator, signatures...)
}
filtered := make([]gravity.OutgoingTxBatch, 0, len(batches.Batch))
for _, v := range batches.Batch {
if v.GetChainReferenceID() == chainReferenceID {
filtered = append(filtered, v)
}
}

func (c *Client) GravityQueryBatchesForRelaying(ctx context.Context, chainReferenceID string) ([]chain.GravityBatchWithSignatures, error) {
return gravityQueryBatchesForRelaying(ctx, c.GRPCClient, c.valAddr, chainReferenceID)
return filtered, nil
}

func gravityConfirmBatch(
ctx context.Context,
ms MessageSender,
creator string,
signedBatches ...chain.SignedGravityOutgoingTxBatch,
) error {
if len(signedBatches) == 0 {
func (c *Client) GravityConfirmBatches(ctx context.Context, signatures ...chain.SignedGravityOutgoingTxBatch) error {
if len(signatures) == 0 {
return nil
}
for _, signedBatch := range signedBatches {
for _, signedBatch := range signatures {
msg := &gravity.MsgConfirmBatch{
Nonce: signedBatch.BatchNonce,
TokenContract: signedBatch.TokenContract,
EthSigner: signedBatch.SignedByAddress,
Orchestrator: creator,
Orchestrator: c.creator,
Signature: hex.EncodeToString(signedBatch.Signature),
}
_, err := ms.SendMsg(ctx, msg, "")
_, err := c.MessageSender.SendMsg(ctx, msg, "", c.sendingOpts...)
return err

}
return nil
}

func gravityQueryLastUnsignedBatch(ctx context.Context, grpcClient grpc.ClientConn, address string, chainReferenceID string) ([]gravity.OutgoingTxBatch, error) {
qc := gravity.NewQueryClient(grpcClient)
batches, err := qc.LastPendingBatchRequestByAddr(ctx, &gravity.QueryLastPendingBatchRequestByAddrRequest{
Address: address,
})
if err != nil {
return nil, err
}

filtered := make([]gravity.OutgoingTxBatch, 0, len(batches.Batch))
for _, v := range batches.Batch {
if v.GetChainReferenceID() == chainReferenceID {
filtered = append(filtered, v)
}
}

return filtered, nil
}

func gravityQueryBatchesForRelaying(ctx context.Context, grpcClient grpc.ClientConn, address sdk.ValAddress, chainReferenceID string) ([]chain.GravityBatchWithSignatures, error) {
qc := gravity.NewQueryClient(grpcClient)
func (c *Client) GravityQueryBatchesForRelaying(ctx context.Context, chainReferenceID string) ([]chain.GravityBatchWithSignatures, error) {
// return gravityQueryBatchesForRelaying(ctx, c.GRPCClient, c.valAddr, chainReferenceID)
qc := gravity.NewQueryClient(c.GRPCClient)

// Get batches
req := &gravity.QueryOutgoingTxBatchesRequest{
ChainReferenceId: chainReferenceID,
Assignee: address.String(),
Assignee: c.valAddr.String(),
}
batches, err := qc.OutgoingTxBatches(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion chain/paloma/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ func (m *PalomaMessageSender) SendMsg(ctx context.Context, msg sdk.Msg, memo str
signer := m.GetSigner()

logger.WithField("creator", creator).WithField("signer", signer).Debug("Injecting metadata")

if err := tryInjectMetadata(msg, vtypes.MsgMetadata{
Creator: m.GetCreator(),
Signers: []string{signer},
}); err != nil {
return nil, fmt.Errorf("failed to inject metadata: %w", err)
}

logger.WithField("msg", msg).Debug("Sending message...")
res, err := m.W.SendMsg(ctx, msg, memo, opts...)
if IsPalomaDown(err) {
return nil, whoops.Wrap(ErrPalomaIsDown, err)
Expand Down
3 changes: 3 additions & 0 deletions util/ion/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
tmtypes "github.com/cometbft/cometbft/types"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/pigeon/internal/liblog"
)

const (
Expand Down Expand Up @@ -68,6 +69,7 @@ func broadcastTx(
// point
syncRes, err := broadcaster.BroadcastTxSync(ctx, tx)
if err != nil {
liblog.WithContext(ctx).WithError(err).Warn("Failed to broadcast TX.")
if syncRes == nil {
// There are some cases where BroadcastTxSync will return an error but the associated
// ResultBroadcastTx will be nil.
Expand All @@ -84,6 +86,7 @@ func broadcastTx(
// This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go
err = errors.Unwrap(sdkerrors.ABCIError(syncRes.Codespace, syncRes.Code, "error broadcasting transaction"))
if err.Error() != errUnknown {
liblog.WithContext(ctx).WithError(err).WithField("sync-result", syncRes).Warn("Failed to broadcast TX.")
return nil, err
}
if syncRes.Code != 0 {
Expand Down
6 changes: 3 additions & 3 deletions util/ion/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (cc *Client) SendMsgs(ctx context.Context, msgs []sdk.Msg, memo string, opt
defer done()
liblog.WithContext(ctx).WithField("component", "send-msgs").WithField("key", cc.Config.Key).Info("signing transaction")
if err = tx.Sign(ctx, txf, cc.Config.Key, txb, false); err != nil {
return err
return fmt.Errorf("failed to sign tx: %w", err)
}
return nil
}()
Expand All @@ -119,13 +119,13 @@ func (cc *Client) SendMsgs(ctx context.Context, msgs []sdk.Msg, memo string, opt
// Generate the transaction bytes
txBytes, err := cc.Codec.TxConfig.TxEncoder()(txb.GetTx())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to encode tx: %w", err)
}

// Broadcast those bytes
res, err := cc.BroadcastTx(ctx, txBytes)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to broadcast tx: %w", err)
}

// transaction was executed, log the success or failure using the tx response code
Expand Down
Loading