Skip to content

Commit

Permalink
add custom fallback functions to the finality listener managers
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <[email protected]>
  • Loading branch information
adecaro committed Feb 4, 2025
1 parent 38a48b3 commit fb7e21c
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/gobuffalo/packr/v2 v2.7.1
github.com/hashicorp/go-uuid v1.0.3
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250131192054-84be834f3a67
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250204104331-6146550b7285
github.com/hyperledger-labs/orion-sdk-go v0.2.10
github.com/hyperledger-labs/orion-server v0.2.10
github.com/hyperledger/fabric v1.4.0-rc1.0.20230405174026-695dd57e01c2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,8 @@ github.com/hidal-go/hidalgo v0.0.0-20201109092204-05749a6d73df/go.mod h1:bPkrxDl
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250131192054-84be834f3a67 h1:mFVLnRAX35HX+eme3avFXJEe28rIoT7HNMj2egN6lng=
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250131192054-84be834f3a67/go.mod h1:+yKIFyakvYG5/OL1n38u/xQzqm2CmgWz0o7OkPs6l8M=
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250204104331-6146550b7285 h1:l3FWLX0zGXmUdExed6FoBlMWdcDoX56B+wRLNCsPHBg=
github.com/hyperledger-labs/fabric-smart-client v0.4.1-0.20250204104331-6146550b7285/go.mod h1:+yKIFyakvYG5/OL1n38u/xQzqm2CmgWz0o7OkPs6l8M=
github.com/hyperledger-labs/orion-sdk-go v0.2.10 h1:lFgWgxyvngIhWnIqymYGBmtmq9D6uC5d0uLG9cbyh5s=
github.com/hyperledger-labs/orion-sdk-go v0.2.10/go.mod h1:iN2xZB964AqwVJwL+EnwPOs8z1EkMEbbIg/qYeC7gDY=
github.com/hyperledger-labs/orion-server v0.2.10 h1:G4zbQEL5Egk0Oj+TwHCZWdTOLDBHOjaAEvYOT4G7ozw=
Expand Down
21 changes: 18 additions & 3 deletions token/services/network/fabric/finality/deliveryflm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type listenerEntry struct {
listener driver.FinalityListener
}

func (e *listenerEntry) Namespace() driver2.Namespace {
return e.namespace
}

func (e *listenerEntry) OnStatus(ctx context.Context, info TxInfo) {
if len(e.namespace) == 0 || len(info.Namespace) == 0 || e.namespace == info.Namespace {
e.listener.OnStatus(ctx, info.TxId, info.Status, info.Message, info.RequestHash)
Expand Down Expand Up @@ -90,9 +94,20 @@ func (p *deliveryBasedFLMProvider) NewManager(network, channel string) (Listener
if err != nil {
return nil, err
}
flm, err := finality.NewListenerManager[TxInfo](p.config, ch.Delivery(), p.tracerProvider.Tracer("finality_listener_manager", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: network,
})), p.newMapper(network, channel))
mapper := p.newMapper(network, channel)
flm, err := finality.NewListenerManager[TxInfo](
p.config,
ch.Delivery(),
&DeliveryScanQueryByID{
Delivery: ch.Delivery(),
Ledger: ch.Ledger(),
Mapper: mapper,
},
p.tracerProvider.Tracer("finality_listener_manager", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: network,
})),
mapper,
)
if err != nil {
return nil, err
}
Expand Down
85 changes: 85 additions & 0 deletions token/services/network/fabric/finality/deliveryqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package finality

import (
"context"
"fmt"
"strings"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/finality"
)

type DeliveryScanQueryByID struct {
Delivery *fabric.Delivery
Ledger *fabric.Ledger
Mapper finality.TxInfoMapper[TxInfo]
}

func (q *DeliveryScanQueryByID) QueryByID(evicted map[driver.TxID][]finality.ListenerEntry[TxInfo]) (<-chan []TxInfo, error) {
txIDs := collections.Keys(evicted)
logger.Debugf("Launching routine to scan for txs [%v]", txIDs)

results := collections.NewSet(txIDs...)
ch := make(chan []TxInfo, len(txIDs))

// for each txID, fetch the corresponding transaction.
// if the transaction is not found, start a delivery for it
for _, txID := range txIDs {
logger.Debugf("loading transaction [%s] from ledger...", txID)
pt, err := q.Ledger.GetTransactionByID(txID)
if err == nil {
logger.Debugf("transaction [%s] found on ledger", txID)
infos, err := q.Mapper.MapProcessedTx(pt)
if err != nil {
return nil, errors.Wrapf(err, "failed to map tx [%s]", txID)
}
ch <- infos
continue
}

// which kind of error do we have here?
if strings.Contains(err.Error(), fmt.Sprintf("TXID [%s] not available", txID)) {
// transaction was not found
logger.Errorf("tx [%s] not found on the ledger [%s], start a scan for future transactions", txID, err)
// start delivery for the future
// TODO: find a better starting point
err := q.Delivery.Scan(context.TODO(), "", func(tx *fabric.ProcessedTransaction) (bool, error) {
if !results.Contains(tx.TxID()) {
return false, nil
}

logger.Debugf("Received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
infos, err := q.Mapper.MapProcessedTx(tx)
if err != nil {
logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
return true, err
}
ch <- infos
results.Remove(tx.TxID())

return results.Length() == 0, nil
})
if err != nil {
logger.Errorf("Failed scanning: %v", err)
return nil, err
}

continue
}

// error not recoverable, fail
logger.Debugf("scan for tx [%s] failed with err [%s]", txID, err)
return nil, errors.Wrapf(err, "failed scanning tx [%s]", txID)
}

return ch, nil
}
18 changes: 15 additions & 3 deletions token/services/network/fabric/lookup/deliveryllm.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type listenerEntry struct {
listener Listener
}

func (e *listenerEntry) Namespace() driver2.Namespace {
return e.namespace
}

func (e *listenerEntry) OnStatus(ctx context.Context, info TxInfo) {
logger.Debugf("notify info [%v] to namespace [%s]", info, e.namespace)
if len(e.namespace) == 0 || len(info.Namespace) == 0 || e.namespace == info.Namespace {
Expand Down Expand Up @@ -100,9 +104,17 @@ func (p *deliveryBasedLLMProvider) NewManager(network, channel string) (Listener
if err != nil {
return nil, err
}
flm, err := finality.NewListenerManager[TxInfo](p.config, ch.Delivery(), p.tracerProvider.Tracer("finality_listener_manager", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: network,
})), p.newMapper(network, channel))
flm, err := finality.NewListenerManager[TxInfo](
p.config,
ch.Delivery(),
&DeliveryScanQueryByID{
Channel: ch,
},
p.tracerProvider.Tracer("finality_listener_manager", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: network,
})),
p.newMapper(network, channel),
)
if err != nil {
return nil, err
}
Expand Down
70 changes: 70 additions & 0 deletions token/services/network/fabric/lookup/deliveryqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package lookup

import (
"encoding/json"

driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/finality"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/fabric/tcc"
"github.com/pkg/errors"
)

const (
QueryStates = tcc.QueryStates
)

type DeliveryScanQueryByID struct {
Channel *fabric.Channel
}

func (q *DeliveryScanQueryByID) QueryByID(evicted map[driver2.TxID][]finality.ListenerEntry[TxInfo]) (<-chan []TxInfo, error) {
// collects keys by namespace
keysByNS := map[driver2.Namespace][]string{}
for k, v := range evicted {
ns := v[0].Namespace()
_, ok := keysByNS[ns]
if !ok {
keysByNS[ns] = []string{}
}
keysByNS[ns] = append(keysByNS[ns], k)
}

ch := make(chan []TxInfo, len(evicted))
// for each namespace, have a call to the token chaincode
for ns, keys := range keysByNS {
arg, err := json.Marshal(keys)
if err != nil {
return nil, errors.Wrapf(err, "failed marshalling args for query by ids [%v]", keys)
}

logger.Debugf("querying chaincode [%s] for the states of ids [%v]", ns, keys)
chaincode := q.Channel.Chaincode(ns)
res, err := chaincode.Query(QueryStates, arg).Query()
if err != nil {
return nil, errors.Wrapf(err, "failed querying by ids [%v]", keys)
}
values := make([][]byte, 0, len(keys))
err = json.Unmarshal(res, &values)
if err != nil {
return nil, errors.Wrapf(err, "failed unmarshalling results for query by ids [%v]", keys)
}
infos := make([]TxInfo, 0, len(values))
for i, value := range values {
infos = append(infos, TxInfo{
Namespace: ns,
Key: keys[i],
Value: value,
})
}
ch <- infos
}

return ch, nil
}

0 comments on commit fb7e21c

Please sign in to comment.