From 802c54a2632211615fde158d98b125b7d1967c86 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Sat, 28 Dec 2024 21:38:27 +0200 Subject: [PATCH 1/2] Cache functionality and delivery-service-based finality listeners Signed-off-by: Alexandros Filios --- platform/common/utils/cache/cache.go | 33 +++ platform/common/utils/cache/eviction.go | 84 ++++++ platform/common/utils/cache/lru.go | 76 ++++++ platform/common/utils/cache/lru_test.go | 106 ++++++++ platform/common/utils/cache/map.go | 59 +++++ platform/common/utils/cache/timeout.go | 79 ++++++ platform/common/utils/cache/timeout_test.go | 76 ++++++ .../core/generic/committer/endorsertx.go | 9 +- .../fabric/core/generic/delivery/service.go | 41 ++- .../core/generic/finality/committerflm.go | 28 ++ .../core/generic/finality/deliveryflm.go | 106 ++++++++ .../core/generic/finality/listenermanager.go | 243 ++++++++++++++++++ 12 files changed, 934 insertions(+), 6 deletions(-) create mode 100644 platform/common/utils/cache/cache.go create mode 100644 platform/common/utils/cache/eviction.go create mode 100644 platform/common/utils/cache/lru.go create mode 100644 platform/common/utils/cache/lru_test.go create mode 100644 platform/common/utils/cache/map.go create mode 100644 platform/common/utils/cache/timeout.go create mode 100644 platform/common/utils/cache/timeout_test.go create mode 100644 platform/fabric/core/generic/finality/committerflm.go create mode 100644 platform/fabric/core/generic/finality/deliveryflm.go create mode 100644 platform/fabric/core/generic/finality/listenermanager.go diff --git a/platform/common/utils/cache/cache.go b/platform/common/utils/cache/cache.go new file mode 100644 index 000000000..106016863 --- /dev/null +++ b/platform/common/utils/cache/cache.go @@ -0,0 +1,33 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache + +import "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging" + +var logger = logging.MustGetLogger("fabric-sdk.cache") + +type Map[K comparable, V any] interface { + Get(K) (V, bool) + Put(K, V) + Update(K, func(bool, V) (bool, V)) bool + Delete(...K) + Len() int +} + +type rwLock interface { + Lock() + Unlock() + RLock() + RUnlock() +} + +type noLock struct{} + +func (l *noLock) Lock() {} +func (l *noLock) Unlock() {} +func (l *noLock) RLock() {} +func (l *noLock) RUnlock() {} diff --git a/platform/common/utils/cache/eviction.go b/platform/common/utils/cache/eviction.go new file mode 100644 index 000000000..63ef3525a --- /dev/null +++ b/platform/common/utils/cache/eviction.go @@ -0,0 +1,84 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache + +import "fmt" + +func evict[K comparable, V any](keys []K, m map[K]V, onEvict func(map[K]V)) { + evicted := make(map[K]V, len(keys)) + for _, k := range keys { + if v, ok := m[k]; ok { + evicted[k] = v + delete(m, k) + } else { + logger.Debugf("No need to evict [%k]. 
Was already deleted.") + } + } + onEvict(evicted) +} + +type evictionCache[K comparable, V any] struct { + m map[K]V + l rwLock + evictionPolicy EvictionPolicy[K] +} + +func (c *evictionCache[K, V]) String() string { + return fmt.Sprintf("Content: [%v], Eviction policy: [%v]", c.m, c.evictionPolicy) +} + +type EvictionPolicy[K comparable] interface { + // Push adds a key and must be invoked under write-lock + Push(K) +} + +func (c *evictionCache[K, V]) Get(key K) (V, bool) { + c.l.RLock() + defer c.l.RUnlock() + v, ok := c.m[key] + return v, ok +} + +func (c *evictionCache[K, V]) Put(key K, value V) { + c.l.Lock() + defer c.l.Unlock() + c.m[key] = value + // We assume that a value is always new for performance reasons. + // If we try to put again a value, this value will be put also in the LRU keys instead of just promoting the existing one. + // If we put this value c.cap times, then this will evict all other values. + c.evictionPolicy.Push(key) +} + +func (c *evictionCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool { + c.l.Lock() + defer c.l.Unlock() + v, ok := c.m[key] + keep, newValue := f(ok, v) + if !keep { + delete(c.m, key) + } else { + c.m[key] = newValue + } + if !ok && keep { + c.evictionPolicy.Push(key) + } + return ok +} + +func (c *evictionCache[K, V]) Delete(keys ...K) { + c.l.Lock() + defer c.l.Unlock() + for _, key := range keys { + delete(c.m, key) + } +} + +func (c *evictionCache[K, V]) Len() int { + c.l.RLock() + defer c.l.RUnlock() + return len(c.m) +} diff --git a/platform/common/utils/cache/lru.go b/platform/common/utils/cache/lru.go new file mode 100644 index 000000000..824cbc3a8 --- /dev/null +++ b/platform/common/utils/cache/lru.go @@ -0,0 +1,76 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache + +import ( + "fmt" + "sync" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections" +) + +// NewLRUCache creates a cache with limited size with LRU eviction policy. +// It is guaranteed that at least size elements can be kept in the cache. +// The cache is cleaned up when the cache is full, i.e. it contains size + buffer elements +func NewLRUCache[K comparable, V any](size, buffer int, onEvict func(map[K]V)) *evictionCache[K, V] { + m := map[K]V{} + return &evictionCache[K, V]{ + m: m, + l: &sync.RWMutex{}, + evictionPolicy: NewLRUEviction(size, buffer, func(keys []K) { evict(keys, m, onEvict) }), + } +} + +func NewLRUEviction[K comparable](size, buffer int, evict func([]K)) *lruEviction[K] { + return &lruEviction[K]{ + size: size, + cap: size + buffer, + keys: make([]K, 0, size+buffer), + keySet: make(map[K]struct{}, size+buffer), + evict: evict, + } +} + +type lruEviction[K comparable] struct { + // size is the minimum amount of entries guaranteed to be kept in cache. + size int + // cap + size is the maximum amount of entries that can be kept in cache. After that, a cleanup is invoked. + cap int + // keys keeps track of which keys should be evicted. + // The last element of the slice is the most recent one. 
+ // Performance improvement: keep sliding index to avoid reallocating + keys []K + // keySet is for faster lookup whether a key exists + keySet map[K]struct{} + // evict is called when we evict + evict func([]K) + mu sync.Mutex +} + +func (c *lruEviction[K]) Push(key K) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.keySet[key]; ok { + return + } + c.keySet[key] = struct{}{} + c.keys = append(c.keys, key) + if len(c.keys) <= c.cap { + return + } + logger.Debugf("Capacity of %d exceeded. Evicting old keys by shifting LRU keys keeping only the %d most recent ones", c.cap, c.size) + evicted := c.keys[0 : c.cap-c.size+1] + for _, k := range evicted { + delete(c.keySet, k) + } + c.evict(evicted) + c.keys = (c.keys)[c.cap-c.size+1:] +} + +func (c *lruEviction[K]) String() string { + return fmt.Sprintf("Keys: [%v], KeySet: [%v]", c.keys, collections.Keys(c.keySet)) +} diff --git a/platform/common/utils/cache/lru_test.go b/platform/common/utils/cache/lru_test.go new file mode 100644 index 000000000..91b1e29e8 --- /dev/null +++ b/platform/common/utils/cache/lru_test.go @@ -0,0 +1,106 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache_test + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert" +) + +func TestLRUSimple(t *testing.T) { + allEvicted := make(map[int]string) + + c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) }) + + c.Put(1, "a") + c.Put(2, "b") + c.Put(3, "c") + c.Put(4, "d") + c.Put(5, "e") + + assert.Equal(0, len(allEvicted)) + assert.Equal(5, c.Len()) + v, _ := c.Get(1) + assert.Equal("a", v) + + c.Put(6, "f") + assert.Equal(map[int]string{1: "a", 2: "b", 3: "c"}, allEvicted) + assert.Equal(3, c.Len()) + _, ok := c.Get(1) + assert.False(ok) +} + +func TestLRUSameKey(t *testing.T) { + allEvicted := make(map[int]string) + + c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) }) + + c.Put(1, "a") + c.Put(2, "b") + c.Put(3, "c") + c.Put(1, "d") + c.Put(1, "e") + c.Put(1, "f") + assert.Equal(0, len(allEvicted)) + assert.Equal(3, c.Len()) + v, _ := c.Get(1) + assert.Equal("f", v) + + c.Put(4, "g") + c.Put(5, "h") + assert.Equal(0, len(allEvicted)) + assert.Equal(5, c.Len()) + v, _ = c.Get(1) + assert.Equal("f", v) + + c.Put(6, "i") + assert.Equal(map[int]string{1: "f", 2: "b", 3: "c"}, allEvicted) + assert.Equal(3, c.Len()) + _, ok := c.Get(1) + assert.False(ok) + + allEvicted = map[int]string{} + + c.Put(1, "j") + c.Put(2, "k") + + assert.Equal(0, len(allEvicted)) + assert.Equal(5, c.Len()) + v, _ = c.Get(4) + assert.Equal("g", v) + + c.Put(3, "l") + + assert.Equal(map[int]string{4: "g", 5: "h", 6: "i"}, allEvicted) + assert.Equal(3, c.Len()) + v, _ = c.Get(1) + assert.Equal("j", v) +} + +func TestLRUParallel(t *testing.T) { + evictedCount := atomic.Int32{} + c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { evictedCount.Add(int32(len(evicted))) }) + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + c.Put(i, fmt.Sprintf("item-%d", i)) + wg.Done() + }(i) + } + wg.Wait() + + assert.Equal(4, c.Len()) + assert.Equal(96, int(evictedCount.Load())) +} diff --git a/platform/common/utils/cache/map.go b/platform/common/utils/cache/map.go 
new file mode 100644 index 000000000..1a6194e6c --- /dev/null +++ b/platform/common/utils/cache/map.go @@ -0,0 +1,59 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache + +// NewMapCache creates a dummy implementation of the Cache interface. +// It is backed by a map with unlimited capacity. +func NewMapCache[K comparable, V any]() *mapCache[K, V] { + return &mapCache[K, V]{ + m: map[K]V{}, + l: &noLock{}, + } +} + +type mapCache[K comparable, V any] struct { + m map[K]V + l rwLock +} + +func (c *mapCache[K, V]) Get(key K) (V, bool) { + c.l.RLock() + defer c.l.RUnlock() + v, ok := c.m[key] + return v, ok +} + +func (c *mapCache[K, V]) Put(key K, value V) { + c.l.Lock() + defer c.l.Unlock() + c.m[key] = value +} + +func (c *mapCache[K, V]) Delete(keys ...K) { + c.l.Lock() + defer c.l.Unlock() + for _, key := range keys { + delete(c.m, key) + } +} + +func (c *mapCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool { + c.l.Lock() + defer c.l.Unlock() + v, ok := c.m[key] + keep, newValue := f(ok, v) + if !keep { + delete(c.m, key) + } else { + c.m[key] = newValue + } + return ok +} + +func (c *mapCache[K, V]) Len() int { + return len(c.m) +} diff --git a/platform/common/utils/cache/timeout.go b/platform/common/utils/cache/timeout.go new file mode 100644 index 000000000..3592d9304 --- /dev/null +++ b/platform/common/utils/cache/timeout.go @@ -0,0 +1,79 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache + +import ( + "sync" + "time" +) + +// NewTimeoutCache creates a cache that keeps elements for evictionTimeout time. +// An element might return even if it is marked stale. +func NewTimeoutCache[K comparable, V any](evictionTimeout time.Duration, onEvict func(map[K]V)) *evictionCache[K, V] { + m := map[K]V{} + l := &sync.RWMutex{} + return &evictionCache[K, V]{ + m: m, + l: l, + evictionPolicy: NewTimeoutEviction(evictionTimeout, func(keys []K) { + logger.Debugf("Evicting stale keys: [%v]", keys) + l.Lock() + defer l.Unlock() + evict(keys, m, onEvict) + }), + } +} + +type timeoutEviction[K comparable] struct { + keys []timeoutEntry[K] + mu sync.RWMutex + evict func([]K) +} + +type timeoutEntry[K comparable] struct { + created time.Time + key K +} + +func NewTimeoutEviction[K comparable](timeout time.Duration, evict func([]K)) *timeoutEviction[K] { + e := &timeoutEviction[K]{ + keys: make([]timeoutEntry[K], 0), + evict: evict, + } + go e.cleanup(timeout) + return e +} + +func (e *timeoutEviction[K]) cleanup(timeout time.Duration) { + logger.Infof("Launch cleanup function with eviction timeout [%v]", timeout) + for range time.Tick(1 * time.Second) { + expiry := time.Now().Add(-timeout) + logger.Debugf("Cleanup invoked: evicting everything created after [%v]", expiry) + e.mu.RLock() + evicted := make([]K, 0) + for _, entry := range e.keys { + if entry.created.After(expiry) { + break + } + evicted = append(evicted, entry.key) + } + e.mu.RUnlock() + if len(evicted) > 0 { + e.mu.Lock() + e.keys = e.keys[len(evicted):] + e.mu.Unlock() + logger.Infof("Evicting %d entries", len(evicted)) + e.evict(evicted) + } + } +} + +func (e *timeoutEviction[K]) Push(key K) { + e.mu.Lock() + defer e.mu.Unlock() + e.keys = append(e.keys, timeoutEntry[K]{key: key, created: time.Now()}) +} diff --git a/platform/common/utils/cache/timeout_test.go b/platform/common/utils/cache/timeout_test.go new file mode 100644 index 000000000..d3442e12b --- /dev/null +++ b/platform/common/utils/cache/timeout_test.go @@ 
-0,0 +1,76 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cache_test + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert" +) + +func TestTimeoutSimple(t *testing.T) { + mu := sync.RWMutex{} + allEvicted := make(map[int]string) + done := make(chan struct{}) + + c := cache.NewTimeoutCache(2*time.Second, func(evicted map[int]string) { + mu.Lock() + collections.CopyMap(allEvicted, evicted) + mu.Unlock() + done <- struct{}{} + }) + + c.Put(1, "a") + c.Put(2, "b") + c.Put(3, "c") + c.Put(4, "d") + c.Put(5, "e") + + mu.RLock() + assert.Equal(0, len(allEvicted)) + assert.Equal(5, c.Len()) + v, _ := c.Get(1) + assert.Equal("a", v) + mu.RUnlock() + <-done + + mu.RLock() + time.Sleep(3 * time.Second) + assert.Equal(map[int]string{1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}, allEvicted) + assert.Equal(0, c.Len()) + _, ok := c.Get(1) + assert.False(ok) + mu.RUnlock() +} + +func TestTimeoutParallel(t *testing.T) { + evictedCount := atomic.Int32{} + c := cache.NewTimeoutCache(2*time.Second, func(evicted map[int]string) { evictedCount.Add(int32(len(evicted))) }) + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + c.Put(i, fmt.Sprintf("item-%d", i)) + wg.Done() + }(i) + } + wg.Wait() + assert.Equal(100, c.Len()) + assert.Equal(0, int(evictedCount.Load())) + + time.Sleep(3 * time.Second) + + assert.Equal(0, c.Len()) + assert.Equal(100, int(evictedCount.Load())) +} diff --git a/platform/fabric/core/generic/committer/endorsertx.go b/platform/fabric/core/generic/committer/endorsertx.go index 2924fc4ec..5ef7bac40 100644 --- a/platform/fabric/core/generic/committer/endorsertx.go +++ b/platform/fabric/core/generic/committer/endorsertx.go @@ -61,15 +61,20 @@ func MapFinalityEvent(ctx context.Context, block *common.BlockMetadata, txNum dr } fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[txNum] + validationCode, validationMessage := MapValidationCode(int32(fabricValidationCode)) event := &FinalityEvent{ Ctx: ctx, TxID: txID, - ValidationCode: convertValidationCode(int32(fabricValidationCode)), - ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)], + ValidationCode: validationCode, + ValidationMessage: validationMessage, } return fabricValidationCode, event, nil } +func MapValidationCode(code int32) (int, string) { + return convertValidationCode(code), pb.TxValidationCode_name[code] +} + // GetChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode. 
func (c *Committer) GetChaincodeEvents(env *common.Envelope, blockNum driver2.BlockNum) error { chaincodeEvent, err := readChaincodeEvent(env, blockNum) diff --git a/platform/fabric/core/generic/delivery/service.go b/platform/fabric/core/generic/delivery/service.go index d338b042f..65d7a28ad 100644 --- a/platform/fabric/core/generic/delivery/service.go +++ b/platform/fabric/core/generic/delivery/service.go @@ -10,6 +10,7 @@ import ( "context" "time" + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/fabricutils" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/hash" @@ -129,9 +130,9 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver for i, tx := range block.Data.Data { validationCode := ValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[i] - if pb.TxValidationCode(validationCode) != pb.TxValidationCode_VALID { - continue - } + //if pb.TxValidationCode(validationCode) != pb.TxValidationCode_VALID { + // continue + //} _, _, channelHeader, err := fabricutils.UnmarshalTx(tx) if err != nil { logger.Errorf("[%s] unmarshal tx failed: %s", c.channel, err) @@ -147,7 +148,12 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver return false, err } - stop, err := callback(ptx) + stop, err := callback(&processedTransaction{ + txID: ptx.TxID(), + results: ptx.Results(), + vc: int32(validationCode), + env: ptx.Envelope(), + }) if err != nil { // if an error occurred, stop processing return false, err @@ -162,6 +168,33 @@ func (c *Service) Scan(ctx context.Context, txID string, callback driver.Deliver }) } +type processedTransaction struct { + txID driver2.TxID + results []byte + vc int32 + env []byte +} + +func (p *processedTransaction) TxID() string { + return p.txID +} + +func (p *processedTransaction) Results() []byte { + return p.results +} + +func (p *processedTransaction) IsValid() bool { + return p.vc == int32(pb.TxValidationCode_VALID) +} + +func (p *processedTransaction) Envelope() []byte { + return p.env +} + +func (p *processedTransaction) ValidationCode() int32 { + return p.vc +} + type fakeVault struct { txID string } diff --git a/platform/fabric/core/generic/finality/committerflm.go b/platform/fabric/core/generic/finality/committerflm.go new file mode 100644 index 000000000..167f99740 --- /dev/null +++ b/platform/fabric/core/generic/finality/committerflm.go @@ -0,0 +1,28 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package finality + +import ( + "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" +) + +type committerListenerManager struct { + committer *fabric.Committer +} + +func NewCommitterFLM(committer *fabric.Committer) *committerListenerManager { + return &committerListenerManager{committer: committer} +} + +func (m *committerListenerManager) AddFinalityListener(_ driver.Namespace, txID driver.TxID, listener fabric.FinalityListener) error { + return m.committer.AddFinalityListener(txID, listener) +} + +func (m *committerListenerManager) RemoveFinalityListener(txID string, listener fabric.FinalityListener) error { + return m.committer.RemoveFinalityListener(txID, listener) +} diff --git a/platform/fabric/core/generic/finality/deliveryflm.go b/platform/fabric/core/generic/finality/deliveryflm.go new file mode 100644 index 000000000..066453c88 --- /dev/null +++ b/platform/fabric/core/generic/finality/deliveryflm.go @@ -0,0 +1,106 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package finality + +import ( + "context" + + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/fabricutils" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/rwset" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/pkg/errors" + "go.opentelemetry.io/otel/trace/noop" +) + +type txInfo struct { + txID driver2.TxID + status fabric.ValidationCode + message string +} + +func (i txInfo) TxID() driver2.TxID { + return i.txID +} + +type deliveryListenerEntry struct { + l fabric.FinalityListener +} + +func (e *deliveryListenerEntry) OnStatus(ctx context.Context, info txInfo) { + e.l.OnStatus(ctx, info.txID, info.status, info.message) +} + +func (e *deliveryListenerEntry) Equals(other ListenerEntry[txInfo]) bool { + return other != nil && other.(*deliveryListenerEntry).l == e.l +} + +func NewDeliveryFLM(config DeliveryListenerManagerConfig, network string, ch *fabric.Channel) (*deliveryListenerManager, error) { + flm, err := NewListenerManager[txInfo](config, ch.Delivery(), &noop.Tracer{}, &txInfoMapper{network: network}) + if err != nil { + return nil, err + } + return &deliveryListenerManager{flm}, nil +} + +type deliveryListenerManager struct { + flm *listenerManager[txInfo] +} + +func (m *deliveryListenerManager) AddFinalityListener(txID string, listener fabric.FinalityListener) error { + return m.flm.AddFinalityListener(txID, &deliveryListenerEntry{listener}) +} + +func (m *deliveryListenerManager) RemoveFinalityListener(txID string, listener fabric.FinalityListener) error { + return m.flm.RemoveFinalityListener(txID, &deliveryListenerEntry{listener}) +} + +type txInfoMapper struct { + network string +} + +func (m *txInfoMapper) MapTxData(ctx context.Context, tx []byte, block *common.BlockMetadata, blockNum driver2.BlockNum, txNum driver2.TxNum) (map[driver2.Namespace]txInfo, error) { + _, payl, chdr, err := fabricutils.UnmarshalTx(tx) + if err != nil { + return nil, errors.Wrapf(err, "failed unmarshaling tx [%d:%d]", blockNum, txNum) + } + if common.HeaderType(chdr.Type) != common.HeaderType_ENDORSER_TRANSACTION { + logger.Warnf("Type of TX 
[%d:%d] is [%d]. Skipping...", blockNum, txNum, chdr.Type) + return nil, nil + } + rwSet, err := rwset.NewEndorserTransactionReader(m.network).Read(payl, chdr) + if err != nil { + return nil, errors.Wrapf(err, "failed extracting rwset") + } + _, finalityEvent, err := committer.MapFinalityEvent(ctx, block, txNum, chdr.TxId) + if err != nil { + return nil, errors.Wrapf(err, "failed mapping finality event") + } + + txInfos := make(map[driver2.Namespace]txInfo, len(rwSet.WriteSet.Writes)) + logger.Infof("TX [%s] has %d namespaces", chdr.TxId, len(rwSet.WriteSet.Writes)) + for ns, write := range rwSet.WriteSet.Writes { + logger.Infof("TX [%s:%s] has %d writes", chdr.TxId, ns, len(write)) + txInfos[ns] = txInfo{ + txID: chdr.TxId, + status: finalityEvent.ValidationCode, + message: finalityEvent.ValidationMessage, + } + } + return txInfos, nil +} + +func (m *txInfoMapper) MapProcessedTx(tx *fabric.ProcessedTransaction) ([]txInfo, error) { + status, message := committer.MapValidationCode(tx.ValidationCode()) + return []txInfo{{ + txID: tx.TxID(), + status: status, + message: message, + }}, nil +} diff --git a/platform/fabric/core/generic/finality/listenermanager.go b/platform/fabric/core/generic/finality/listenermanager.go new file mode 100644 index 000000000..f511e98af --- /dev/null +++ b/platform/fabric/core/generic/finality/listenermanager.go @@ -0,0 +1,243 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package finality + +import ( + "context" + "sync" + "time" + + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/pkg/errors" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +type TxInfo interface { + TxID() driver2.TxID +} + +type TxInfoMapper[T TxInfo] interface { + MapTxData(ctx context.Context, tx []byte, block *common.BlockMetadata, blockNum driver2.BlockNum, txNum driver2.TxNum) (map[driver2.Namespace]T, error) + MapProcessedTx(tx *fabric.ProcessedTransaction) ([]T, error) +} + +type ListenerEntry[T TxInfo] interface { + // OnStatus is the callback for the transaction + OnStatus(ctx context.Context, info T) + // Equals compares a listener entry for the delition + Equals(other ListenerEntry[T]) bool +} + +type ListenerManager[T TxInfo] interface { + AddFinalityListener(txID string, e ListenerEntry[T]) error + RemoveFinalityListener(txID string, e ListenerEntry[T]) error +} + +type TxInfoCallback[T TxInfo] func(T) error + +type DeliveryListenerManagerConfig struct { + MapperParallelism int + ListenerTimeout time.Duration + LRUSize int + LRUBuffer int +} + +func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery *fabric.Delivery, tracer trace.Tracer, mapper TxInfoMapper[T]) (*listenerManager[T], error) { + var listeners cache.Map[driver2.TxID, []ListenerEntry[T]] + if config.ListenerTimeout > 0 { + listeners = cache.NewTimeoutCache[driver2.TxID, []ListenerEntry[T]](config.ListenerTimeout, func(evicted map[driver2.TxID][]ListenerEntry[T]) { + logger.Debugf("Listeners for TXs [%v] timed out. Either the TX finality is too slow or it reached finality too long ago and were evicted from the txInfos cache. 
The IDs will be queried directly from ledger...", collections.Keys(evicted)) + fetchTxs(evicted, mapper, delivery) + }) + } else { + listeners = cache.NewMapCache[driver2.TxID, []ListenerEntry[T]]() + } + + var txInfos cache.Map[driver2.TxID, T] + if config.LRUSize > 0 && config.LRUBuffer > 0 { + txInfos = cache.NewLRUCache[driver2.TxID, T](10, 2, func(evicted map[driver2.TxID]T) { + logger.Debugf("Evicted keys [%v]. If they are looked up, they will be fetched directly from the ledger from now on...", collections.Keys(evicted)) + }) + } else { + txInfos = cache.NewMapCache[driver2.TxID, T]() + } + flm := &listenerManager[T]{ + mapper: ¶llelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper}, + tracer: tracer, + listeners: listeners, + txInfos: txInfos, + ignoreBlockErrors: true, + delivery: delivery, + } + logger.Infof("Starting delivery service...") + go flm.start() + + return flm, nil +} + +func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxInfoMapper[T], delivery *fabric.Delivery) { + for txID, listeners := range evicted { + go func(txID driver2.TxID, listeners []ListenerEntry[T]) { + _ = delivery.Scan(context.TODO(), txID, func(tx *fabric.ProcessedTransaction) (bool, error) { + if tx.TxID() != txID { + return false, nil + } + logger.Debugf("Received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results())) + infos, err := mapper.MapProcessedTx(tx) + if err != nil { + logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err) + return true, err + } + for _, info := range infos { + for _, listener := range listeners { + go listener.OnStatus(context.TODO(), info) + } + } + return true, nil + }) + }(txID, listeners) + } +} + +func (m *listenerManager[T]) start() { + // In case the delivery service fails, it will try to reconnect automatically. + err := m.delivery.ScanBlock(context.Background(), func(ctx context.Context, block *common.Block) (bool, error) { + err := m.onBlock(ctx, block) + return !m.ignoreBlockErrors && err != nil, err + }) + logger.Errorf("failed running delivery: %v", err) +} + +type listenerManager[T TxInfo] struct { + tracer trace.Tracer + mapper *parallelBlockMapper[T] + + mu sync.RWMutex + listeners cache.Map[driver2.TxID, []ListenerEntry[T]] + txInfos cache.Map[driver2.TxID, T] + delivery *fabric.Delivery + ignoreBlockErrors bool +} + +func (m *listenerManager[T]) onBlock(ctx context.Context, block *common.Block) error { + logger.Debugf("New block with %d txs detected [%d]", len(block.Data.Data), block.Header.Number) + + txs, err := m.mapper.Map(ctx, block) + if err != nil { + logger.Errorf("failed to process block [%d]: %v", block.Header.Number, err) + return errors.Wrapf(err, "failed to process block [%d]", block.Header.Number) + } + + invokedTxIDs := make([]driver2.TxID, 0) + + m.mu.Lock() + defer m.mu.Unlock() + + for _, txInfos := range txs { + for ns, info := range txInfos { + logger.Debugf("Look for listeners of [%s:%s]", ns, info.TxID()) + // We expect there to be only one namespace. + // The complexity is better with a deliveryListenerEntry slice (because of the write operations) + // If more namespaces are expected, it is worth switching to a map. + listeners, ok := m.listeners.Get(info.TxID()) + if ok { + invokedTxIDs = append(invokedTxIDs, info.TxID()) + } + logger.Debugf("Invoking %d listeners for [%s]", len(listeners), info.TxID()) + for _, entry := range listeners { + go entry.OnStatus(ctx, info) + } + } + } + logger.Debugf("Invoked listeners for %d TxIDs: [%v]. 
Removing listeners...", len(invokedTxIDs), invokedTxIDs) + + for _, txInfos := range txs { + for ns, info := range txInfos { + logger.Debugf("Mapping for ns [%s]", ns) + m.txInfos.Put(info.TxID(), info) + } + } + logger.Debugf("Current size of cache: %d", m.txInfos.Len()) + + m.listeners.Delete(invokedTxIDs...) + logger.Debugf("Removed listeners for %d invoked TxIDs: %v", len(invokedTxIDs), invokedTxIDs) + + return nil +} + +func (m *listenerManager[T]) AddFinalityListener(txID string, e ListenerEntry[T]) error { + m.mu.RLock() + if txInfo, ok := m.txInfos.Get(txID); ok { + defer m.mu.RUnlock() + logger.Debugf("Found tx [%s]. Invoking listener directly", txID) + go e.OnStatus(context.TODO(), txInfo) + return nil + } + m.mu.RUnlock() + m.mu.Lock() + logger.Debugf("Checking if value has been added meanwhile for [%s]", txID) + defer m.mu.Unlock() + if txInfo, ok := m.txInfos.Get(txID); ok { + logger.Debugf("Found tx [%s]! Invoking listener directly", txID) + go e.OnStatus(context.TODO(), txInfo) + return nil + } + logger.Debugf("Value not found. Appending listener for [%s]", txID) + m.listeners.Update(txID, func(_ bool, listeners []ListenerEntry[T]) (bool, []ListenerEntry[T]) { + return true, append(listeners, e) + }) + return nil +} + +func (m *listenerManager[T]) RemoveFinalityListener(txID string, e ListenerEntry[T]) error { + logger.Debugf("Manually invoked listener removal for [%s]", txID) + m.mu.Lock() + defer m.mu.Unlock() + ok := m.listeners.Update(txID, func(_ bool, listeners []ListenerEntry[T]) (bool, []ListenerEntry[T]) { + for i, entry := range listeners { + if entry.Equals(e) { + listeners = append(listeners[:i], listeners[i+1:]...) + } + } + return len(listeners) > 0, listeners + }) + if ok { + return nil + } + return errors.Errorf("could not find listener [%v] in txid [%s]", e, txID) +} + +type parallelBlockMapper[T TxInfo] struct { + mapper TxInfoMapper[T] + cap int +} + +func (m *parallelBlockMapper[T]) Map(ctx context.Context, block *common.Block) ([]map[driver2.Namespace]T, error) { + logger.Debugf("Mapping block [%d]", block.Header.Number) + eg := errgroup.Group{} + eg.SetLimit(m.cap) + results := make([]map[driver2.Namespace]T, len(block.Data.Data)) + for i, tx := range block.Data.Data { + eg.Go(func() error { + event, err := m.mapper.MapTxData(ctx, tx, block.Metadata, block.Header.Number, driver2.TxNum(i)) + if err != nil { + return err + } + results[i] = event + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + return results, nil +} From ba6ea52b478417b37491e88729c472a1f50e1e97 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Mon, 6 Jan 2025 13:57:03 +0200 Subject: [PATCH 2/2] Allow parallel block processing Signed-off-by: Alexandros Filios --- .../core/generic/finality/listenermanager.go | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/platform/fabric/core/generic/finality/listenermanager.go b/platform/fabric/core/generic/finality/listenermanager.go index f511e98af..6395630d8 100644 --- a/platform/fabric/core/generic/finality/listenermanager.go +++ b/platform/fabric/core/generic/finality/listenermanager.go @@ -45,10 +45,11 @@ type ListenerManager[T TxInfo] interface { type TxInfoCallback[T TxInfo] func(T) error type DeliveryListenerManagerConfig struct { - MapperParallelism int - ListenerTimeout time.Duration - LRUSize int - LRUBuffer int + MapperParallelism int + BlockProcessParallelism int + ListenerTimeout time.Duration + LRUSize int + LRUBuffer int } func NewListenerManager[T TxInfo](config 
DeliveryListenerManagerConfig, delivery *fabric.Delivery, tracer trace.Tracer, mapper TxInfoMapper[T]) (*listenerManager[T], error) { @@ -71,12 +72,13 @@ func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery txInfos = cache.NewMapCache[driver2.TxID, T]() } flm := &listenerManager[T]{ - mapper: ¶llelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper}, - tracer: tracer, - listeners: listeners, - txInfos: txInfos, - ignoreBlockErrors: true, - delivery: delivery, + mapper: ¶llelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper}, + tracer: tracer, + listeners: listeners, + txInfos: txInfos, + ignoreBlockErrors: true, + blockProcessingParallelism: config.BlockProcessParallelism, + delivery: delivery, } logger.Infof("Starting delivery service...") go flm.start() @@ -110,22 +112,40 @@ func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxIn func (m *listenerManager[T]) start() { // In case the delivery service fails, it will try to reconnect automatically. - err := m.delivery.ScanBlock(context.Background(), func(ctx context.Context, block *common.Block) (bool, error) { - err := m.onBlock(ctx, block) - return !m.ignoreBlockErrors && err != nil, err - }) + err := m.delivery.ScanBlock(context.Background(), m.newBlockCallback()) logger.Errorf("failed running delivery: %v", err) } +func (m *listenerManager[T]) newBlockCallback() fabric.BlockCallback { + if m.blockProcessingParallelism <= 1 { + return func(ctx context.Context, block *common.Block) (bool, error) { + err := m.onBlock(ctx, block) + return !m.ignoreBlockErrors && err != nil, err + } + } + eg := errgroup.Group{} + eg.SetLimit(m.blockProcessingParallelism) + return func(ctx context.Context, block *common.Block) (bool, error) { + eg.Go(func() error { + if err := m.onBlock(ctx, block); err != nil { + logger.Warnf("Mapping block [%d] errored: %v", block.Header.Number, err) + } + return nil + }) + return false, nil + } +} + type listenerManager[T TxInfo] struct { tracer trace.Tracer mapper *parallelBlockMapper[T] - mu sync.RWMutex - listeners cache.Map[driver2.TxID, []ListenerEntry[T]] - txInfos cache.Map[driver2.TxID, T] - delivery *fabric.Delivery - ignoreBlockErrors bool + mu sync.RWMutex + listeners cache.Map[driver2.TxID, []ListenerEntry[T]] + txInfos cache.Map[driver2.TxID, T] + delivery *fabric.Delivery + blockProcessingParallelism int + ignoreBlockErrors bool } func (m *listenerManager[T]) onBlock(ctx context.Context, block *common.Block) error {
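+	// Flow summary, based on the implementation introduced in this patch series: the block's
+	// transactions are mapped by parallelBlockMapper (bounded by MapperParallelism), listeners
+	// registered for the resulting tx IDs are notified on separate goroutines, the tx infos are
+	// cached so that listeners added afterwards can be served immediately, and the listeners
+	// that were invoked are then removed. With BlockProcessParallelism > 1, newBlockCallback
+	// hands each block to an errgroup and logs block-level errors instead of stopping delivery.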