Skip to content

Commit

Permalink
Fix acks (cosmos#756)
Browse files Browse the repository at this point in the history
* Use map[string]string for attributes inside event since events can have many attributes

* Fix loop end bracket placement

* update ibctest version. log event parse errors as warnings.
  • Loading branch information
agouin authored May 26, 2022
1 parent e0a2858 commit d0cf6d6
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 127 deletions.
2 changes: 1 addition & 1 deletion ibctest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/cosmos/cosmos-sdk v0.45.1
github.com/cosmos/relayer/v2 v2.0.0
github.com/ory/dockertest/v3 v3.8.1
github.com/strangelove-ventures/ibctest v0.0.0-20220524122627-7c657bd34983
github.com/strangelove-ventures/ibctest v0.0.0-20220526210715-7f10563bfe65
go.uber.org/zap v1.21.0
)

Expand Down
2 changes: 2 additions & 0 deletions ibctest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU=
github.com/strangelove-ventures/ibctest v0.0.0-20220524122627-7c657bd34983 h1:Z+K3+FvC7b9f6fH4r9xFRXh5ueMgQAeAwTdGh09tbdU=
github.com/strangelove-ventures/ibctest v0.0.0-20220524122627-7c657bd34983/go.mod h1:WQf13YWKaB2h9uzDSKUIehiOWbAqYTcJWQ2SGj04Ndg=
github.com/strangelove-ventures/ibctest v0.0.0-20220526210715-7f10563bfe65 h1:Mp9ftARQJ2VduGDJwqVyHY1umoLvW0WHTEgEzvrahmA=
github.com/strangelove-ventures/ibctest v0.0.0-20220526210715-7f10563bfe65/go.mod h1:KTmqIpN6fxLAmaa6eo/EWTCbQHv5NPmYnD3LLQ8QAds=
github.com/strangelove-ventures/lens v0.3.1-0.20220407203447-bcb1fa2e7b3a h1:9nwCWqs6BHJomom4TEQXCSQ0I8AKPwXNzSXZRr3i2vk=
github.com/strangelove-ventures/lens v0.3.1-0.20220407203447-bcb1fa2e7b3a/go.mod h1:17n4M/9F6YWAvmaqJLh0/8dZgUz23grtImvd58gBe28=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
Expand Down
18 changes: 12 additions & 6 deletions relayer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
func ParseClientIDFromEvents(events []provider.RelayerEvent) (string, error) {
for _, event := range events {
if event.EventType == clienttypes.EventTypeCreateClient {
if event.AttributeKey == clienttypes.AttributeKeyClientID {
return event.AttributeValue, nil
for attributeKey, attributeValue := range event.Attributes {
if attributeKey == clienttypes.AttributeKeyClientID {
return attributeValue, nil
}
}
}
}
Expand All @@ -27,8 +29,10 @@ func ParseClientIDFromEvents(events []provider.RelayerEvent) (string, error) {
func ParseConnectionIDFromEvents(events []provider.RelayerEvent) (string, error) {
for _, event := range events {
if event.EventType == connectiontypes.EventTypeConnectionOpenInit || event.EventType == connectiontypes.EventTypeConnectionOpenTry {
if event.AttributeKey == connectiontypes.AttributeKeyConnectionID {
return event.AttributeValue, nil
for attributeKey, attributeValue := range event.Attributes {
if attributeKey == connectiontypes.AttributeKeyConnectionID {
return attributeValue, nil
}
}
}
}
Expand All @@ -40,8 +44,10 @@ func ParseConnectionIDFromEvents(events []provider.RelayerEvent) (string, error)
func ParseChannelIDFromEvents(events []provider.RelayerEvent) (string, error) {
for _, event := range events {
if event.EventType == channeltypes.EventTypeChannelOpenInit || event.EventType == channeltypes.EventTypeChannelOpenTry {
if event.AttributeKey == channeltypes.AttributeKeyChannelID {
return event.AttributeValue, nil
for attributeKey, attributeValue := range event.Attributes {
if attributeKey == channeltypes.AttributeKeyChannelID {
return attributeValue, nil
}
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions relayer/provider/cosmos/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ func getChannelsIfPresent(events []provider.RelayerEvent) []zapcore.Field {

for _, event := range events {
for _, tag := range channelTags {
if event.AttributeKey == tag {
// Only append the tag once
// TODO: what if they are different?
if _, ok := foundTag[tag]; !ok {
fields = append(fields, zap.String(tag, event.AttributeValue))
foundTag[tag] = struct{}{}
for attributeKey, attributeValue := range event.Attributes {
if attributeKey == tag {
// Only append the tag once
// TODO: what if they are different?
if _, ok := foundTag[tag]; !ok {
fields = append(fields, zap.String(tag, attributeValue))
foundTag[tag] = struct{}{}
}
}
}
}
Expand Down
229 changes: 127 additions & 102 deletions relayer/provider/cosmos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ func (cc *CosmosProvider) AcknowledgementFromSequence(ctx context.Context, dst p
return nil, fmt.Errorf("more than one transaction returned with query")
}

acks, err := acknowledgementsFromResultTx(dstChanId, dstPortId, srcChanId, srcPortId, txs[0])
acks, err := cc.acknowledgementsFromResultTx(dstChanId, dstPortId, srcChanId, srcPortId, txs[0])
switch {
case err != nil:
return nil, err
Expand Down Expand Up @@ -1153,55 +1153,67 @@ func (cc *CosmosProvider) relayPacketsFromResultTx(ctx context.Context, src, dst
timeoutPackets []provider.RelayPacket
)

rp := &relayMsgRecvPacket{pass: false}
EventLoop:
for _, event := range resp.Events {
rp.pass = false
rp := &relayMsgRecvPacket{}

if event.EventType != spTag {
continue
}

switch event.AttributeKey {
case srcChanTag:
if event.AttributeValue != srcChanId {
rp.pass = true
continue
}
case dstChanTag:
if event.AttributeValue != dstChanId {
rp.pass = true
continue
}
case srcPortTag:
if event.AttributeValue != srcPortId {
rp.pass = true
continue
for attributeKey, attributeValue := range event.Attributes {
switch attributeKey {
case srcChanTag:
if attributeValue != srcChanId {
continue EventLoop
}
case dstChanTag:
if attributeValue != dstChanId {
continue EventLoop
}
case srcPortTag:
if attributeValue != srcPortId {
continue EventLoop
}
case dstPortTag:
if attributeValue != dstPortId {
continue EventLoop
}
case dataTag:
rp.packetData = []byte(attributeValue)
case toHeightTag:
timeout, err := clienttypes.ParseHeight(attributeValue)
if err != nil {
cc.log.Warn("error parsing height timeout",
zap.String("chain_id", cc.ChainId()),
zap.Uint64("sequence", rp.seq),
zap.Error(err),
)
continue EventLoop
}
rp.timeout = timeout
case toTSTag:
timeout, err := strconv.ParseUint(attributeValue, 10, 64)
if err != nil {
cc.log.Warn("error parsing timestamp timeout",
zap.String("chain_id", cc.ChainId()),
zap.Uint64("sequence", rp.seq),
zap.Error(err),
)
continue EventLoop
}
rp.timeoutStamp = timeout
case seqTag:
seq, err := strconv.ParseUint(attributeValue, 10, 64)
if err != nil {
cc.log.Warn("error parsing packet sequence",
zap.String("chain_id", cc.ChainId()),
zap.Error(err),
)
continue EventLoop
}
rp.seq = seq
}
case dstPortTag:
if event.AttributeValue != dstPortId {
rp.pass = true
continue
}
case dataTag:
rp.packetData = []byte(event.AttributeValue)
case toHeightTag:
timeout, err := clienttypes.ParseHeight(event.AttributeValue)
if err != nil {
return nil, nil, err
}
rp.timeout = timeout
case toTSTag:
timeout, err := strconv.ParseUint(event.AttributeValue, 10, 64)
if err != nil {
return nil, nil, err
}
rp.timeoutStamp = timeout
case seqTag:
seq, err := strconv.ParseUint(event.AttributeValue, 10, 64)
if err != nil {
return nil, nil, err
}
rp.seq = seq
}

// If packet data is nil or sequence number is 0 keep parsing events,
Expand Down Expand Up @@ -1233,75 +1245,88 @@ func (cc *CosmosProvider) relayPacketsFromResultTx(ctx context.Context, src, dst
case !rp.timeout.IsZero() && block.GetHeight().GTE(rp.timeout):
timeoutPackets = append(timeoutPackets, rp.timeoutPacket())
// If the packet matches the relay constraints relay it as a MsgReceivePacket
case !rp.pass:
default:
rcvPackets = append(rcvPackets, rp)
}
}

// If there is a relayPacket, return it
if len(rcvPackets) > 0 || len(timeoutPackets) > 0 {
return rcvPackets, timeoutPackets, nil
}
// If there is a relayPacket, return it
if len(rcvPackets) > 0 || len(timeoutPackets) > 0 {
return rcvPackets, timeoutPackets, nil
}

return nil, nil, fmt.Errorf("no packet data found")
}

// acknowledgementsFromResultTx looks through the events in a *ctypes.ResultTx and returns
// relayPackets with the appropriate data
func acknowledgementsFromResultTx(dstChanId, dstPortId, srcChanId, srcPortId string, resp *provider.RelayerTxResponse) ([]provider.RelayPacket, error) {
func (cc *CosmosProvider) acknowledgementsFromResultTx(dstChanId, dstPortId, srcChanId, srcPortId string, resp *provider.RelayerTxResponse) ([]provider.RelayPacket, error) {
var ackPackets []provider.RelayPacket

rp := &relayMsgPacketAck{pass: false}
EventLoop:
for _, event := range resp.Events {
rp.pass = false
rp := &relayMsgPacketAck{}

if event.EventType != waTag {
continue
}

switch event.AttributeKey {
case srcChanTag:
if event.AttributeValue != srcChanId {
rp.pass = true
continue
}
case dstChanTag:
if event.AttributeValue != dstChanId {
rp.pass = true
continue
}
case srcPortTag:
if event.AttributeValue != srcPortId {
rp.pass = true
continue
}
case dstPortTag:
if event.AttributeValue != dstPortId {
rp.pass = true
continue
for attributeKey, attributeValue := range event.Attributes {

switch attributeKey {
case srcChanTag:
if attributeValue != srcChanId {
continue EventLoop
}
case dstChanTag:
if attributeValue != dstChanId {
continue EventLoop
}
case srcPortTag:
if attributeValue != srcPortId {
continue EventLoop
}
case dstPortTag:
if attributeValue != dstPortId {
continue EventLoop
}
case ackTag:
rp.ack = []byte(attributeValue)
case dataTag:
rp.packetData = []byte(attributeValue)
case toHeightTag:
timeout, err := clienttypes.ParseHeight(attributeValue)
if err != nil {
cc.log.Warn("error parsing height timeout",
zap.String("chain_id", cc.ChainId()),
zap.Uint64("sequence", rp.seq),
zap.Error(err),
)
continue EventLoop
}
rp.timeout = timeout
case toTSTag:
timeout, err := strconv.ParseUint(attributeValue, 10, 64)
if err != nil {
cc.log.Warn("error parsing timestamp timeout",
zap.String("chain_id", cc.ChainId()),
zap.Uint64("sequence", rp.seq),
zap.Error(err),
)
continue EventLoop
}
rp.timeoutStamp = timeout
case seqTag:
seq, err := strconv.ParseUint(attributeValue, 10, 64)
if err != nil {
cc.log.Warn("error parsing packet sequence",
zap.String("chain_id", cc.ChainId()),
zap.Error(err),
)
continue EventLoop
}
rp.seq = seq
}
case ackTag:
rp.ack = []byte(event.AttributeValue)
case dataTag:
rp.packetData = []byte(event.AttributeValue)
case toHeightTag:
timeout, err := clienttypes.ParseHeight(event.AttributeValue)
if err != nil {
return nil, err
}
rp.timeout = timeout
case toTSTag:
timeout, err := strconv.ParseUint(event.AttributeValue, 10, 64)
if err != nil {
return nil, err
}
rp.timeoutStamp = timeout
case seqTag:
seq, err := strconv.ParseUint(event.AttributeValue, 10, 64)
if err != nil {
return nil, err
}
rp.seq = seq
}

// If packet data is nil or sequence number is 0 keep parsing events,
Expand All @@ -1310,9 +1335,8 @@ func acknowledgementsFromResultTx(dstChanId, dstPortId, srcChanId, srcPortId str
continue
}

if !rp.pass {
ackPackets = append(ackPackets, rp)
}
ackPackets = append(ackPackets, rp)

}

// If there is a relayPacket, return it
Expand Down Expand Up @@ -1818,13 +1842,14 @@ func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent {

for _, logs := range resp.Logs {
for _, event := range logs.Events {
for _, attr := range event.Attributes {
events = append(events, provider.RelayerEvent{
EventType: event.Type,
AttributeKey: attr.Key,
AttributeValue: attr.Value,
})
attributes := make(map[string]string)
for _, attribute := range event.Attributes {
attributes[attribute.Key] = attribute.Value
}
events = append(events, provider.RelayerEvent{
EventType: event.Type,
Attributes: attributes,
})
}
}
return events
Expand Down
11 changes: 6 additions & 5 deletions relayer/provider/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ func parseEventsFromResponseDeliverTx(resp abci.ResponseDeliverTx) []provider.Re
var events []provider.RelayerEvent

for _, event := range resp.Events {
attributes := make(map[string]string)
for _, attribute := range event.Attributes {
events = append(events, provider.RelayerEvent{
EventType: event.Type,
AttributeKey: string(attribute.Key),
AttributeValue: string(attribute.Value),
})
attributes[string(attribute.Key)] = string(attribute.Value)
}
events = append(events, provider.RelayerEvent{
EventType: event.Type,
Attributes: attributes,
})
}
return events
}
Expand Down
2 changes: 0 additions & 2 deletions relayer/provider/cosmos/relayer_packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ type relayMsgRecvPacket struct {
timeout clienttypes.Height
timeoutStamp uint64
dstComRes *chantypes.QueryPacketCommitmentResponse

pass bool
}

func (rp relayMsgRecvPacket) timeoutPacket() *relayMsgTimeout {
Expand Down
Loading

0 comments on commit d0cf6d6

Please sign in to comment.