Skip to content

Commit

Permalink
fixup! Cache functionality and delivery-service-based finality listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jan 5, 2025
1 parent 345b34f commit 195f8be
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 25 deletions.
15 changes: 3 additions & 12 deletions platform/fabric/core/generic/delivery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package delivery

import (
"context"
"sync/atomic"
"time"

driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
Expand Down Expand Up @@ -124,29 +123,21 @@ func (c *Service) ScanBlock(ctx context.Context, callback driver.BlockCallback)
return c.scanBlock(ctx, &fakeVault{}, callback)
}

var ctr = &atomic.Uint32{}

func (c *Service) Scan(ctx context.Context, txID string, callback driver.DeliveryCallback) error {
localCtr := ctr.Add(1)
defer logger.Infof("Local thread [%d] finished scanning for [%s]", localCtr, txID)
logger.Infof("Start looking for [%s] on thread [%d]", txID, localCtr)
vault := &fakeVault{txID: txID}
return c.scanBlock(ctx, vault,
func(_ context.Context, block *common.Block) (bool, error) {
for i, tx := range block.Data.Data {
logger.Infof("Local thread [%d] checking tx [%d:%d] while looking for [%s]", localCtr, block.Header.Number, i, txID)
validationCode := ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[i]

if pb.TxValidationCode(validationCode) != pb.TxValidationCode_VALID {
logger.Infof("tx [%d:%d] is invalid with code [%d]", block.Header.Number, i, validationCode)
// continue
}
//if pb.TxValidationCode(validationCode) != pb.TxValidationCode_VALID {
// continue
//}
_, _, channelHeader, err := fabricutils.UnmarshalTx(tx)
if err != nil {
logger.Errorf("[%s] unmarshal tx failed: %s", c.channel, err)
return false, err
}
logger.Infof("Checking tx [%d:%d, %s]", block.Header.Number, i, channelHeader.TxId)

if common.HeaderType(channelHeader.Type) != common.HeaderType_ENDORSER_TRANSACTION {
continue
Expand Down
1 change: 0 additions & 1 deletion platform/fabric/core/generic/finality/deliveryflm.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (m *txInfoMapper) MapTxData(ctx context.Context, tx []byte, block *common.B

func (m *txInfoMapper) MapProcessedTx(tx *fabric.ProcessedTransaction) ([]txInfo, error) {
status, message := committer.MapValidationCode(tx.ValidationCode())
logger.Infof("Mapping processed tx [%s]: %v", tx.TxID(), status)
return []txInfo{{
txID: tx.TxID(),
status: status,
Expand Down
19 changes: 7 additions & 12 deletions platform/fabric/core/generic/finality/listenermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,21 @@ func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxIn
for txID, listeners := range evicted {
go delivery.Scan(context.TODO(), txID, func(tx *fabric.ProcessedTransaction) (bool, error) {

Check failure on line 89 in platform/fabric/core/generic/finality/listenermanager.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `delivery.Scan` is not checked (errcheck)
if tx.TxID() != txID {
logger.Infof("Received result for tx [%s, %v, %d]. discarding...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
return false, nil
}
logger.Infof("Received result for tx [%s, %v, %d]. keeping...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
logger.Debugf("Received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
infos, err := mapper.MapProcessedTx(tx)
if err != nil {
logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
return true, err
}
logger.Infof("Mapped result: %v", infos)
for _, info := range infos {
for _, listener := range listeners {
go listener.OnStatus(context.TODO(), info)
}
}
return true, nil
})
//if err != nil {
// logger.Infof("error fetching tx [%s]: %v", txID, err)
//}
}
}

Expand Down Expand Up @@ -187,28 +182,28 @@ func (m *listenerManager[T]) AddFinalityListener(txID string, e ListenerEntry[T]
m.mu.RLock()
if txInfo, ok := m.txInfos.Get(txID); ok {
defer m.mu.RUnlock()
logger.Infof("Found tx [%s]. Invoking listener directly", txID)
logger.Debugf("Found tx [%s]. Invoking listener directly", txID)
go e.OnStatus(context.TODO(), txInfo)
return nil
}
m.mu.RUnlock()
m.mu.Lock()
logger.Infof("Checking if value has been added meanwhile for [%s]", txID)
logger.Debugf("Checking if value has been added meanwhile for [%s]", txID)
defer m.mu.Unlock()
if txInfo, ok := m.txInfos.Get(txID); ok {
logger.Infof("Found tx [%s]! Invoking listener directly", txID)
logger.Debugf("Found tx [%s]! Invoking listener directly", txID)
go e.OnStatus(context.TODO(), txInfo)
return nil
}
logger.Infof("Value not found. Appending listener for [%s]", txID)
logger.Debugf("Value not found. Appending listener for [%s]", txID)
m.listeners.Update(txID, func(_ bool, listeners []ListenerEntry[T]) (bool, []ListenerEntry[T]) {
return true, append(listeners, e)
})
return nil
}

func (m *listenerManager[T]) RemoveFinalityListener(txID string, e ListenerEntry[T]) error {
logger.Infof("Manually invoked listener removal for [%s]", txID)
logger.Debugf("Manually invoked listener removal for [%s]", txID)
m.mu.Lock()
defer m.mu.Unlock()
ok := m.listeners.Update(txID, func(_ bool, listeners []ListenerEntry[T]) (bool, []ListenerEntry[T]) {
Expand All @@ -231,7 +226,7 @@ type parallelBlockMapper[T TxInfo] struct {
}

func (m *parallelBlockMapper[T]) Map(ctx context.Context, block *common.Block) ([]map[driver2.Namespace]T, error) {
logger.Infof("Mapping block [%d]", block.Header.Number)
logger.Debugf("Mapping block [%d]", block.Header.Number)
eg := errgroup.Group{}
eg.SetLimit(m.cap)
results := make([]map[driver2.Namespace]T, len(block.Data.Data))
Expand Down

0 comments on commit 195f8be

Please sign in to comment.