node: Support unloading stream view for inactive streams (#79)
- Unload stream views in the cache for inactive/cold streams that have subscribers
- Add Prometheus metrics to the stream cache to track the total number of streams and the number of unloaded streams
bas-vk authored May 29, 2024
1 parent 8ed45c5 commit f10e0ef
Showing 7 changed files with 340 additions and 104 deletions.
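In outline, the cleanup pass this commit adds behaves roughly as follows (a simplified sketch rather than the code in the diff below; the `stream` interface stands in for `*streamImpl`, and the real `tryCleanup` also requires an empty minipool):

```go
// Sketch of the periodic sweep: walk the stream cache on a timer, let each
// stream decide whether to unload its in-memory view, and publish the totals
// as Prometheus gauges.
package sketch

import (
	"context"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// stream stands in for *streamImpl; tryCleanup reports true when the view is
// (or already was) unloaded.
type stream interface {
	tryCleanup(expiration time.Duration) bool
}

func cleanupLoop(
	ctx context.Context,
	cache *sync.Map,
	poll, expiration time.Duration,
	sizeGauge, unloadedGauge prometheus.Gauge,
) {
	for {
		select {
		case <-time.After(poll):
			total, unloaded := 0, 0
			cache.Range(func(_, v any) bool {
				total++
				if v.(stream).tryCleanup(expiration) {
					unloaded++
				}
				return true
			})
			sizeGauge.Set(float64(total))
			unloadedGauge.Set(float64(unloaded))
		case <-ctx.Done():
			return
		}
	}
}
```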
17 changes: 9 additions & 8 deletions core/node/events/stream.go
@@ -120,7 +120,6 @@ func (s *streamImpl) loadInternal(ctx context.Context) error {
 		if AsRiverError(err).Code == Err_NOT_FOUND {
 			return s.initFromBlockchain(ctx)
 		}
-
 		return err
 	}
 
@@ -436,20 +435,21 @@ func (s *streamImpl) tryGetView() StreamView {
 	}
 }
 
+// tryCleanup unloads its internal view when s haven't got activity within the given expiration period.
+// It returns true when the view is unloaded
 func (s *streamImpl) tryCleanup(expiration time.Duration) bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	expired := time.Since(s.lastAccessedTime) >= expiration
-
-	// return immediately if the view is already purged or if the mini block creation routine
-	// is running for this stream
+	// return immediately if the view is already purged or if the mini block creation routine is running for this stream
 	if s.view == nil {
-		return false
+		return true
 	}
 
-	// only unload if there is no-one is listing to this stream and there are no events in the minipool.
-	if expired && (s.receivers == nil || s.receivers.Cardinality() == 0) && s.view.minipool.events.Len() == 0 {
+	expired := time.Since(s.lastAccessedTime) >= expiration
+
+	// unload if there is no activity within expiration
+	if expired && s.view.minipool.events.Len() == 0 {
 		s.view = nil
 		return true
 	}
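Restated as pure functions (hypothetical names; state mutation and locking omitted), the unload rule changes like this: the old rule refused to unload while anyone was subscribed and reported `false` for an already-unloaded view, while the new rule ignores subscribers and reports `true` for an already-unloaded view, so such streams are counted by the unloaded-streams gauge introduced later in this commit.

```go
package sketch

import "time"

// Old rule: only unload when expired, nobody is subscribed and the minipool
// is empty; an already-unloaded view reported false.
func shouldUnloadOld(viewLoaded bool, idle, expiration time.Duration, subscribers, minipoolLen int) bool {
	if !viewLoaded {
		return false
	}
	return idle >= expiration && subscribers == 0 && minipoolLen == 0
}

// New rule: subscribers no longer block unloading, and an already-unloaded
// view counts as already cleaned up.
func shouldUnloadNew(viewLoaded bool, idle, expiration time.Duration, minipoolLen int) bool {
	if !viewLoaded {
		return true
	}
	return idle >= expiration && minipoolLen == 0
}
```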
@@ -676,6 +676,7 @@ func (s *streamImpl) ForceFlush(ctx context.Context) {
 func (s *streamImpl) canCreateMiniblock() bool {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
+	// Loaded, has events in minipool, fake leader and periodic miniblock creation is not disabled in test settings.
 	return s.view != nil &&
 		s.view.minipool.events.Len() > 0 &&
87 changes: 74 additions & 13 deletions core/node/events/stream_cache.go
@@ -5,6 +5,9 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/river-build/river/core/node/infra"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/config"
"github.com/river-build/river/core/node/contracts"
@@ -23,6 +26,17 @@ const (
 	MiniblockCandidateBatchSize = 50
 )
 
+var (
+	streamCacheSizeGauge = infra.NewGaugeVec(
+		"stream_cache_size", "Number of streams in stream cache",
+		"chain_id", "address",
+	)
+	streamCacheUnloadedGauge = infra.NewGaugeVec(
+		"stream_cache_unloaded", "Number of unloaded streams in stream cache",
+		"chain_id", "address",
+	)
+)
+
 type StreamCacheParams struct {
 	Storage storage.StreamStorage
 	Wallet  *crypto.Wallet
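`infra.NewGaugeVec` is the node's own helper; assuming it is a thin wrapper around client_golang, the declarations above correspond to something like the following plain Prometheus code (a sketch, not the `infra` package's actual implementation). The per-node label values are then bound with `.With(prometheus.Labels{...})` in `NewStreamCache`, as a later hunk shows.

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// Roughly what one infra.NewGaugeVec call amounts to in plain client_golang
// terms (assumption: the helper wraps NewGaugeVec plus registration).
var streamCacheSizeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "stream_cache_size",
		Help: "Number of streams in stream cache",
	},
	[]string{"chain_id", "address"},
)

func init() {
	// The node presumably registers against its own registry; the default
	// registerer is used here only to keep the sketch self-contained.
	prometheus.MustRegister(streamCacheSizeGauge)
}
```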
@@ -33,6 +47,7 @@ type StreamCacheParams struct {

 type StreamCache interface {
 	GetStream(ctx context.Context, streamId StreamId) (SyncStream, StreamView, error)
+	GetSyncStream(ctx context.Context, streamId StreamId) (SyncStream, error)
 	CreateStream(ctx context.Context, streamId StreamId) (SyncStream, StreamView, error)
 	ForceFlushAll(ctx context.Context)
 	GetLoadedViews(ctx context.Context) []StreamView
@@ -54,6 +69,9 @@ type streamCacheImpl struct {
 	// transaction instead of one by one. This can be deleted once the StreamRegistry facet is updated to allow for
 	// batch registrations.
 	registerMiniBlocksBatched bool
+
+	streamCacheSizeGauge     prometheus.Gauge
+	streamCacheUnloadedGauge prometheus.Gauge
 }
 
 var _ StreamCache = (*streamCacheImpl)(nil)
@@ -67,6 +85,14 @@ func NewStreamCache(
 	s := &streamCacheImpl{
 		params:                    params,
 		registerMiniBlocksBatched: true,
+		streamCacheSizeGauge: streamCacheSizeGauge.With(prometheus.Labels{
+			"chain_id": params.Riverchain.ChainId.String(),
+			"address":  params.Wallet.Address.String(),
+		}),
+		streamCacheUnloadedGauge: streamCacheUnloadedGauge.With(prometheus.Labels{
+			"chain_id": params.Riverchain.ChainId.String(),
+			"address":  params.Wallet.Address.String(),
+		}),
 	}
 
 	streams, err := params.Registry.GetAllStreams(ctx, appliedBlockNum)
@@ -113,21 +139,33 @@ func (s *streamCacheImpl) cacheCleanup(ctx context.Context, pollInterval time.Du
 	for {
 		select {
 		case <-time.After(pollInterval):
+			totalStreamsCount := 0
+			unloadedStreamsCount := 0
+			// TODO: add data structure that supports to loop over streams that have their view loaded instead of
+			// looping over all streams.
 			s.cache.Range(func(streamID, streamVal any) bool {
+				totalStreamsCount++
 				if stream := streamVal.(*streamImpl); stream.tryCleanup(expiration) {
-					log.Debug("stream view evicted from cache", "streamId", stream.streamId)
+					unloadedStreamsCount++
+					log.Debug("stream view is unloaded from cache", "streamId", stream.streamId)
 				}
 				return true
 			})
+			s.streamCacheSizeGauge.Set(float64(totalStreamsCount))
+			s.streamCacheUnloadedGauge.Set(float64(unloadedStreamsCount))
 		case <-ctx.Done():
 			log.Debug("stream cache cache cleanup shutdown")
 			return
 		}
 	}
 }
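Both counters are recomputed from scratch on every poll, so the gauges are point-in-time snapshots rather than monotonic counters. In a test, the values could be read back with client_golang's `testutil` package, for example (a standalone illustration, not a test from this commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	unloaded := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "stream_cache_unloaded_demo",
		Help: "Demo gauge standing in for stream_cache_unloaded",
	})
	unloaded.Set(3) // what a sweep would do after unloading three views

	// testutil.ToFloat64 reads the current value of a single-metric collector.
	fmt.Println(testutil.ToFloat64(unloaded)) // 3
}
```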

-func (s *streamCacheImpl) tryLoadStreamRecord(ctx context.Context, streamId StreamId) (SyncStream, StreamView, error) {
-	// Same code is called for GetStream and CreateStream.
+func (s *streamCacheImpl) tryLoadStreamRecord(
+	ctx context.Context,
+	streamId StreamId,
+	loadView bool,
+) (SyncStream, StreamView, error) {
+	// Same code is called for GetStream, GetSyncStream and CreateStream.
 	// For GetStream the fact that record is not in cache means that there is race to get it during creation:
 	// Blockchain record is already created, but this fact is not reflected yet in local storage.
 	// This may happen if somebody observes record allocation on blockchain and tries to get stream
@@ -177,11 +215,14 @@ func (s *streamCacheImpl) tryLoadStreamRecord(ctx context.Context, streamId Stre
 	err := s.params.Storage.CreateStreamStorage(ctx, streamId, mb)
 	if err != nil {
 		if AsRiverError(err).Code == Err_ALREADY_EXISTS {
-			// Attempt to load stream from storage. Might as well do it while under lock.
-			err = stream.loadInternal(ctx)
-			if err == nil {
-				return stream, stream.view, nil
+			if loadView {
+				// Attempt to load stream from storage. Might as well do it while under lock.
+				if err = stream.loadInternal(ctx); err == nil {
+					return stream, stream.view, nil
+				}
+				return nil, nil, err
 			}
+			return stream, nil, err
 		}
 		return nil, nil, err
 	}
@@ -202,6 +243,10 @@ func (s *streamCacheImpl) tryLoadStreamRecord(ctx context.Context, streamId Stre
 		return nil, nil, RiverError(Err_INTERNAL, "tryLoadStreamRecord: Cache corruption", "streamId", streamId)
 	}
 	stream = entry.(*streamImpl)
+	if !loadView {
+		return stream, nil, err
+	}
+
 	view, err := stream.GetView(ctx)
 	if err != nil {
 		return nil, nil, err
@@ -213,7 +258,7 @@ func (s *streamCacheImpl) tryLoadStreamRecord(ctx context.Context, streamId Stre
 func (s *streamCacheImpl) GetStream(ctx context.Context, streamId StreamId) (SyncStream, StreamView, error) {
 	entry, _ := s.cache.Load(streamId)
 	if entry == nil {
-		return s.tryLoadStreamRecord(ctx, streamId)
+		return s.tryLoadStreamRecord(ctx, streamId, true)
 	}
 	stream := entry.(*streamImpl)
 
@@ -227,6 +272,15 @@ func (s *streamCacheImpl) GetStream(ctx context.Context, streamId StreamId) (Syn
 	}
 }
 
+func (s *streamCacheImpl) GetSyncStream(ctx context.Context, streamId StreamId) (SyncStream, error) {
+	entry, _ := s.cache.Load(streamId)
+	if entry == nil {
+		syncStream, _, err := s.tryLoadStreamRecord(ctx, streamId, false)
+		return syncStream, err
+	}
+	return entry.(*streamImpl), nil
+}
+
 func (s *streamCacheImpl) CreateStream(
 	ctx context.Context,
 	streamId StreamId,
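From a caller's perspective (a hypothetical fragment assuming only the package's `StreamCache`, `StreamId`, `SyncStream` and `StreamView` types), the new method gives sync paths a lookup that never forces the view to be materialized, while `GetStream` keeps its original behavior:

```go
// Hypothetical callers of the two lookup paths (not code from this commit).
func handleSync(ctx context.Context, cache StreamCache, id StreamId) (SyncStream, error) {
	// Cheap path: no StreamView is loaded for cold streams.
	return cache.GetSyncStream(ctx, id)
}

func handleRead(ctx context.Context, cache StreamCache, id StreamId) (StreamView, error) {
	// Full path: loads the view from storage if it is not already in memory.
	_, view, err := cache.GetStream(ctx, id)
	return view, err
}
```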
@@ -309,11 +363,12 @@ func (s *streamCacheImpl) onNewBlockBatch(ctx context.Context) {
 		tasks sync.WaitGroup
 	)
 
-	// TODO: visiting every stream here is not efficient,
-	// most streams are cold. Add data structure to keep track of streams
-	// that are eligible for miniblock production.
 	s.cache.Range(func(key, value interface{}) bool {
-		stream := value.(*streamImpl)
+		streamVal, ok := s.cache.Load(key)
+		if !ok {
+			return true
+		}
+		stream := streamVal.(*streamImpl)
 		if stream.canCreateMiniblock() {
 			candidates[stream.streamId] = stream
 			if len(candidates) == MiniblockCandidateBatchSize {
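The `Range` callback now re-fetches the entry by key instead of using the value it was handed, presumably so that entries removed or replaced while the loop is running are skipped; `sync.Map` does not guarantee that the value seen by `Range` is still current. A standalone illustration of the pattern (generic `sync.Map` code, not from this commit):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	m.Store("a", 1)
	m.Store("b", 2)

	m.Range(func(key, value any) bool {
		// The value passed to Range may be stale; re-Load by key to see the
		// current entry and skip keys deleted in the meantime.
		if cur, ok := m.Load(key); ok {
			fmt.Println(key, cur)
		}
		return true
	})
}
```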
@@ -362,7 +417,13 @@ func (s *streamCacheImpl) processMiniblockProposalBatch(

 		proposal, err := c.ProposeNextMiniblock(ctx, false)
 		if err != nil {
-			log.Error("processMiniblockProposalBatch: Error creating new miniblock proposal", "streamId", c.streamId, "err", err)
+			log.Error(
+				"processMiniblockProposalBatch: Error creating new miniblock proposal",
+				"streamId",
+				c.streamId,
+				"err",
+				err,
+			)
 			continue
 		}
 		if proposal == nil {
(Diffs for the remaining 5 changed files are not shown here.)
