Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finality listener manager #722

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions platform/common/utils/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cache

import "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"

var logger = logging.MustGetLogger("fabric-sdk.cache")

type Map[K comparable, V any] interface {
Get(K) (V, bool)
Put(K, V)
Update(K, func(bool, V) (bool, V)) bool
Delete(...K)
Len() int
}

type rwLock interface {
Lock()
Unlock()
RLock()
RUnlock()
}

type noLock struct{}

func (l *noLock) Lock() {}
func (l *noLock) Unlock() {}
func (l *noLock) RLock() {}
func (l *noLock) RUnlock() {}
84 changes: 84 additions & 0 deletions platform/common/utils/cache/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cache

import "fmt"

func evict[K comparable, V any](keys []K, m map[K]V, onEvict func(map[K]V)) {
evicted := make(map[K]V, len(keys))
for _, k := range keys {
if v, ok := m[k]; ok {
evicted[k] = v
delete(m, k)
} else {
logger.Debugf("No need to evict [%k]. Was already deleted.")
}
}
onEvict(evicted)
}

type evictionCache[K comparable, V any] struct {
m map[K]V
l rwLock
evictionPolicy EvictionPolicy[K]
}

func (c *evictionCache[K, V]) String() string {
return fmt.Sprintf("Content: [%v], Eviction policy: [%v]", c.m, c.evictionPolicy)
}

type EvictionPolicy[K comparable] interface {
// Push adds a key and must be invoked under write-lock
Push(K)
}

func (c *evictionCache[K, V]) Get(key K) (V, bool) {
c.l.RLock()
defer c.l.RUnlock()
v, ok := c.m[key]
return v, ok
}

func (c *evictionCache[K, V]) Put(key K, value V) {
c.l.Lock()
defer c.l.Unlock()
c.m[key] = value
// We assume that a value is always new for performance reasons.
// If we try to put again a value, this value will be put also in the LRU keys instead of just promoting the existing one.
// If we put this value c.cap times, then this will evict all other values.
c.evictionPolicy.Push(key)
}

func (c *evictionCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool {
c.l.Lock()
defer c.l.Unlock()
v, ok := c.m[key]
keep, newValue := f(ok, v)
if !keep {
delete(c.m, key)
} else {
c.m[key] = newValue
}
if !ok && keep {
c.evictionPolicy.Push(key)
}
return ok
}

func (c *evictionCache[K, V]) Delete(keys ...K) {
c.l.Lock()
defer c.l.Unlock()
for _, key := range keys {
delete(c.m, key)
}
}

func (c *evictionCache[K, V]) Len() int {
c.l.RLock()
defer c.l.RUnlock()
return len(c.m)
}
76 changes: 76 additions & 0 deletions platform/common/utils/cache/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cache

import (
"fmt"
"sync"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
)

// NewLRUCache creates a cache with limited size with LRU eviction policy.
// It is guaranteed that at least size elements can be kept in the cache.
// The cache is cleaned up when the cache is full, i.e. it contains size + buffer elements
func NewLRUCache[K comparable, V any](size, buffer int, onEvict func(map[K]V)) *evictionCache[K, V] {
m := map[K]V{}
return &evictionCache[K, V]{
m: m,
l: &sync.RWMutex{},
evictionPolicy: NewLRUEviction(size, buffer, func(keys []K) { evict(keys, m, onEvict) }),
}
}

func NewLRUEviction[K comparable](size, buffer int, evict func([]K)) *lruEviction[K] {
return &lruEviction[K]{
size: size,
cap: size + buffer,
keys: make([]K, 0, size+buffer),
keySet: make(map[K]struct{}, size+buffer),
evict: evict,
}
}

type lruEviction[K comparable] struct {
// size is the minimum amount of entries guaranteed to be kept in cache.
size int
// cap + size is the maximum amount of entries that can be kept in cache. After that, a cleanup is invoked.
cap int
// keys keeps track of which keys should be evicted.
// The last element of the slice is the most recent one.
// Performance improvement: keep sliding index to avoid reallocating
keys []K
// keySet is for faster lookup whether a key exists
keySet map[K]struct{}
// evict is called when we evict
evict func([]K)
mu sync.Mutex
}

func (c *lruEviction[K]) Push(key K) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.keySet[key]; ok {
return
}
c.keySet[key] = struct{}{}
c.keys = append(c.keys, key)
if len(c.keys) <= c.cap {
return
}
logger.Debugf("Capacity of %d exceeded. Evicting old keys by shifting LRU keys keeping only the %d most recent ones", c.cap, c.size)
evicted := c.keys[0 : c.cap-c.size+1]
for _, k := range evicted {
delete(c.keySet, k)
}
c.evict(evicted)
c.keys = (c.keys)[c.cap-c.size+1:]
}

func (c *lruEviction[K]) String() string {
return fmt.Sprintf("Keys: [%v], KeySet: [%v]", c.keys, collections.Keys(c.keySet))
}
106 changes: 106 additions & 0 deletions platform/common/utils/cache/lru_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cache_test

import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
)

func TestLRUSimple(t *testing.T) {
allEvicted := make(map[int]string)

c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) })

c.Put(1, "a")
c.Put(2, "b")
c.Put(3, "c")
c.Put(4, "d")
c.Put(5, "e")

assert.Equal(0, len(allEvicted))
assert.Equal(5, c.Len())
v, _ := c.Get(1)
assert.Equal("a", v)

c.Put(6, "f")
assert.Equal(map[int]string{1: "a", 2: "b", 3: "c"}, allEvicted)
assert.Equal(3, c.Len())
_, ok := c.Get(1)
assert.False(ok)
}

func TestLRUSameKey(t *testing.T) {
allEvicted := make(map[int]string)

c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) })

c.Put(1, "a")
c.Put(2, "b")
c.Put(3, "c")
c.Put(1, "d")
c.Put(1, "e")
c.Put(1, "f")
assert.Equal(0, len(allEvicted))
assert.Equal(3, c.Len())
v, _ := c.Get(1)
assert.Equal("f", v)

c.Put(4, "g")
c.Put(5, "h")
assert.Equal(0, len(allEvicted))
assert.Equal(5, c.Len())
v, _ = c.Get(1)
assert.Equal("f", v)

c.Put(6, "i")
assert.Equal(map[int]string{1: "f", 2: "b", 3: "c"}, allEvicted)
assert.Equal(3, c.Len())
_, ok := c.Get(1)
assert.False(ok)

allEvicted = map[int]string{}

c.Put(1, "j")
c.Put(2, "k")

assert.Equal(0, len(allEvicted))
assert.Equal(5, c.Len())
v, _ = c.Get(4)
assert.Equal("g", v)

c.Put(3, "l")

assert.Equal(map[int]string{4: "g", 5: "h", 6: "i"}, allEvicted)
assert.Equal(3, c.Len())
v, _ = c.Get(1)
assert.Equal("j", v)
}

func TestLRUParallel(t *testing.T) {
evictedCount := atomic.Int32{}
c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { evictedCount.Add(int32(len(evicted))) })

var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
c.Put(i, fmt.Sprintf("item-%d", i))
wg.Done()
}(i)
}
wg.Wait()

assert.Equal(4, c.Len())
assert.Equal(96, int(evictedCount.Load()))
}
59 changes: 59 additions & 0 deletions platform/common/utils/cache/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cache

// NewMapCache creates a dummy implementation of the Cache interface.
// It is backed by a map with unlimited capacity.
func NewMapCache[K comparable, V any]() *mapCache[K, V] {
return &mapCache[K, V]{
m: map[K]V{},
l: &noLock{},
}
}

type mapCache[K comparable, V any] struct {
m map[K]V
l rwLock
}

func (c *mapCache[K, V]) Get(key K) (V, bool) {
c.l.RLock()
defer c.l.RUnlock()
v, ok := c.m[key]
return v, ok
}

func (c *mapCache[K, V]) Put(key K, value V) {
c.l.Lock()
defer c.l.Unlock()
c.m[key] = value
}

func (c *mapCache[K, V]) Delete(keys ...K) {
c.l.Lock()
defer c.l.Unlock()
for _, key := range keys {
delete(c.m, key)
}
}

func (c *mapCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool {
c.l.Lock()
defer c.l.Unlock()
v, ok := c.m[key]
keep, newValue := f(ok, v)
if !keep {
delete(c.m, key)
} else {
c.m[key] = newValue
}
return ok
}

func (c *mapCache[K, V]) Len() int {
return len(c.m)
}
Loading
Loading