From 86cbea25b27c2078666013f99246ae19d9f5532c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 08:50:50 +0200 Subject: [PATCH 01/10] feat: etcd serde: ignore scalar types, such as int64 --- internal/pkg/service/common/dependencies/etcdclient.go | 8 +++++++- .../diskwriter/network/router/closesync/closesync.go | 0 .../diskwriter/network/router/closesync/closesync_test.go | 0 .../diskwriter/network/router/closesync/coordinator.go | 1 + .../local/diskwriter/network/router/closesync/source.go | 0 5 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go create mode 100644 internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go create mode 100644 internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go create mode 100644 internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go diff --git a/internal/pkg/service/common/dependencies/etcdclient.go b/internal/pkg/service/common/dependencies/etcdclient.go index 900eee3444..7893d45009 100644 --- a/internal/pkg/service/common/dependencies/etcdclient.go +++ b/internal/pkg/service/common/dependencies/etcdclient.go @@ -2,6 +2,7 @@ package dependencies import ( "context" + "reflect" etcdPkg "go.etcd.io/etcd/client/v3" @@ -31,7 +32,12 @@ func newEtcdClientScope(ctx context.Context, baseScp BaseScope, cfg etcdclient.C return &etcdClientScope{ client: client, - serde: serde.NewJSON(baseScp.Validator().Validate), + serde: serde.NewJSON(func(ctx context.Context, value any) error { + if k := reflect.ValueOf(value).Kind(); k != reflect.Struct && k != reflect.Pointer { + return baseScp.Validator().Validate(ctx, value) + } + return nil + }), }, nil } diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go new file mode 100644 index 0000000000..e69de29bb2 diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go new file mode 100644 index 0000000000..e69de29bb2 diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go new file mode 100644 index 0000000000..bca2e77045 --- /dev/null +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go @@ -0,0 +1 @@ +package closesync diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go new file mode 100644 index 0000000000..e69de29bb2 From 57147344af41776db17496aaad5b1a9c6b5691fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 08:51:19 +0200 Subject: [PATCH 02/10] feat: atomic result: Add Header method --- internal/pkg/service/common/etcdop/op/atomic_do.go | 4 +++- internal/pkg/service/common/etcdop/op/atomic_result.go | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/pkg/service/common/etcdop/op/atomic_do.go b/internal/pkg/service/common/etcdop/op/atomic_do.go index 2249a0a1b4..38e9d887fb 100644 --- a/internal/pkg/service/common/etcdop/op/atomic_do.go +++ b/internal/pkg/service/common/etcdop/op/atomic_do.go @@ -17,11 +17,13 @@ func (v *AtomicOp[R]) Do(ctx context.Context, opts ...Option) AtomicResult[R] { var ok bool var err error + var header *Header for { txnResult := v.DoWithoutRetry(ctx, opts...) ok = txnResult.Succeeded() err = txnResult.Err() + header = txnResult.Header() if err == nil && !ok { attempt++ if delay := b.NextBackOff(); delay == backoff.Stop { @@ -42,7 +44,7 @@ func (v *AtomicOp[R]) Do(ctx context.Context, opts ...Option) AtomicResult[R] { ) } - return AtomicResult[R]{result: v.result, error: err, attempt: attempt, elapsedTime: elapsedTime} + return AtomicResult[R]{result: v.result, error: err, header: header, attempt: attempt, elapsedTime: elapsedTime} } func (v *AtomicOp[R]) DoWithoutRetry(ctx context.Context, opts ...Option) *TxnResult[R] { diff --git a/internal/pkg/service/common/etcdop/op/atomic_result.go b/internal/pkg/service/common/etcdop/op/atomic_result.go index d0974ba3db..c04cdc52e6 100644 --- a/internal/pkg/service/common/etcdop/op/atomic_result.go +++ b/internal/pkg/service/common/etcdop/op/atomic_result.go @@ -1,10 +1,13 @@ package op -import "time" +import ( + "time" +) type AtomicResult[R any] struct { result *R error error + header *Header attempt int elapsedTime time.Duration } @@ -21,6 +24,10 @@ func (v AtomicResult[R]) Err() error { return v.error } +func (v AtomicResult[R]) Header() *Header { + return v.header +} + func (v AtomicResult[R]) ResultOrErr() (R, error) { var empty R if v.error != nil { From 73665b25c4d3af69877d70920ad6518b7aa8c8cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 08:51:50 +0200 Subject: [PATCH 03/10] fix: Hide method from the build struct --- internal/pkg/service/common/etcdop/session.go | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/internal/pkg/service/common/etcdop/session.go b/internal/pkg/service/common/etcdop/session.go index 9f929e7631..ad35cfd854 100644 --- a/internal/pkg/service/common/etcdop/session.go +++ b/internal/pkg/service/common/etcdop/session.go @@ -33,10 +33,10 @@ const ( // Any initialization error is reported via the error channel. // After successful initialization, a new session is created after each failure until the context ends. type Session struct { - sessionBuilder - logger log.Logger - client *etcd.Client - backoff *backoff.ExponentialBackOff + sessionBuilder SessionBuilder + logger log.Logger + client *etcd.Client + backoff *backoff.ExponentialBackOff mutexStore *mutexStore @@ -53,14 +53,20 @@ type SessionBuilder struct { type onSession func(session *concurrency.Session) error -type sessionBuilder = SessionBuilder - type NoSessionError struct{} func (e NoSessionError) Error() string { return "no active concurrent.Session" } +// NewSessionBuilder creates a builder for the resistant Session. +func NewSessionBuilder() *SessionBuilder { + return &SessionBuilder{ + grantTimeout: sessionDefaultGrantTimeout, + ttlSeconds: sessionDefaultTTLSeconds, + } +} + // WithGrantTimeout configures the maximum time to wait for creating a new session. func (b SessionBuilder) WithGrantTimeout(v time.Duration) SessionBuilder { if v <= 0 { @@ -233,15 +239,15 @@ func (s *Session) newSession(ctx context.Context) (_ *concurrency.Session, err e // Obtain the LeaseID // The concurrency.NewSession bellow can do it by itself, but we need a separate context with a timeout here. - grantCtx, grantCancel := context.WithTimeout(ctx, s.grantTimeout) + grantCtx, grantCancel := context.WithTimeout(ctx, s.sessionBuilder.grantTimeout) defer grantCancel() - grantResp, err := s.client.Grant(grantCtx, int64(s.ttlSeconds)) + grantResp, err := s.client.Grant(grantCtx, int64(s.sessionBuilder.ttlSeconds)) if err != nil { return nil, err } // Create session - session, err := concurrency.NewSession(s.client, concurrency.WithTTL(s.ttlSeconds), concurrency.WithLease(grantResp.ID)) + session, err := concurrency.NewSession(s.client, concurrency.WithTTL(s.sessionBuilder.ttlSeconds), concurrency.WithLease(grantResp.ID)) if err != nil { return nil, err } @@ -261,7 +267,7 @@ func (s *Session) newSession(ctx context.Context) (_ *concurrency.Session, err e // Invoke callbacks - start session dependent work s.logger.WithDuration(time.Since(startTime)).Infof(ctx, "created etcd session") - for i, fn := range s.onSession { + for i, fn := range s.sessionBuilder.onSession { if err := fn(session); err != nil { err = errors.Errorf(`callback OnSession[%d] failed: %s`, i, err) s.logger.Error(ctx, err.Error()) @@ -301,14 +307,6 @@ func (s *Session) closeSession(ctx context.Context, reason string) error { return nil } -// NewSessionBuilder creates a builder for the resistant Session. -func NewSessionBuilder() *SessionBuilder { - return &SessionBuilder{ - grantTimeout: sessionDefaultGrantTimeout, - ttlSeconds: sessionDefaultTTLSeconds, - } -} - func newSessionBackoff() *backoff.ExponentialBackOff { b := backoff.NewExponentialBackOff() b.RandomizationFactor = 0.2 From e2004ba7378545e513624c099c66bb2e1e3e31da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 08:52:27 +0200 Subject: [PATCH 04/10] feat: Add closesync pkg, sync between source and coordinators nodes regarding closing slices --- .../network/router/closesync/closesync.go | 33 ++++ .../router/closesync/closesync_test.go | 170 ++++++++++++++++++ .../network/router/closesync/coordinator.go | 135 ++++++++++++++ .../network/router/closesync/source.go | 69 +++++++ .../local/diskwriter/network/router/router.go | 18 ++ .../diskwriter/network/router/router_test.go | 29 ++- 6 files changed, 451 insertions(+), 3 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go index e69de29bb2..ea8358a2d3 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go @@ -0,0 +1,33 @@ +// Package closesync provides synchronization between source and coordinators nodes regarding closing slices +// The coordinator nodes is waiting for slice pipeline to finish, the router nodes notifies about closed slices. +package closesync + +import ( + etcd "go.etcd.io/etcd/client/v3" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx" +) + +type dependencies interface { + Logger() log.Logger + Process() *servicectx.Process + EtcdClient() *etcd.Client + EtcdSerde() *serde.Serde +} + +type schema struct { + prefix etcdop.PrefixT[int64] +} + +func newSchema(s *serde.Serde) schema { + return schema{ + prefix: etcdop.NewTypedPrefix[int64]("runtime/closesync/source/node", s), + } +} + +func (s schema) SourceNode(sourceNodeID string) etcdop.KeyT[int64] { + return s.prefix.Key(sourceNodeID) +} diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go index e69de29bb2..ddd55b7a87 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go @@ -0,0 +1,170 @@ +package closesync_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync" + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" + "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper" +) + +func TestSourceAndCoordinatorNodes(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + d, mock := dependencies.NewMockedServiceScope(t) + client := mock.TestEtcdClient() + + // Create 3 source nodes and 1 coordinator node + coordinator, err := closesync.NewCoordinatorNode(d) + require.NoError(t, err) + assert.Equal(t, closesync.NoSourceNode, coordinator.MinRevInUse()) + s1, err := closesync.NewSourceNode(d, "source-node-1") + require.NoError(t, err) + s2, err := closesync.NewSourceNode(d, "source-node-2") + require.NoError(t, err) + s3, err := closesync.NewSourceNode(d, "source-node-3") + require.NoError(t, err) + + // Helper + waitForMinRevInUse := func(t *testing.T, r int64) { + t.Helper() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, r, coordinator.MinRevInUse()) + }, 5*time.Second, 10*time.Millisecond) + } + waitForEtcdState := func(t *testing.T, expected string) { + t.Helper() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + etcdhelper.AssertKVsString(c, client, expected) + }, 5*time.Second, 10*time.Millisecond) + } + + // Check initial etcd state + waitForEtcdState(t, ` +<<<<< +runtime/closesync/source/node/source-node-1 (lease) +----- +0 +>>>>> + +<<<<< +runtime/closesync/source/node/source-node-2 (lease) +----- +0 +>>>>> + +<<<<< +runtime/closesync/source/node/source-node-3 (lease) +----- +0 +>>>>> +`) + assert.Equal(t, int64(0), coordinator.MinRevInUse()) + + // The progress of individual source nodes is different + assert.NoError(t, s1.Notify(ctx, 100)) + assert.NoError(t, s2.Notify(ctx, 101)) + assert.NoError(t, s3.Notify(ctx, 102)) + waitForMinRevInUse(t, 100) + waitForEtcdState(t, ` +<<<<< +runtime/closesync/source/node/source-node-1 (lease) +----- +100 +>>>>> + +<<<<< +runtime/closesync/source/node/source-node-2 (lease) +----- +101 +>>>>> + +<<<<< +runtime/closesync/source/node/source-node-3 (lease) +----- +102 +>>>>> +`) + + // Revisions <= 100 are unblocked + select { + case <-coordinator.WaitForRevisionChan(99): + default: + assert.Fail(t, "channel should be closed") + } + select { + case <-coordinator.WaitForRevisionChan(100): + default: + assert.Fail(t, "channel should be closed") + } + + // Revisions > 100 are blocked + wait101 := coordinator.WaitForRevisionChan(101) + wait102 := coordinator.WaitForRevisionChan(102) + wait103 := coordinator.WaitForRevisionChan(103) + select { + case <-wait101: + assert.Fail(t, "channel shouldn't be closed") + case <-wait102: + assert.Fail(t, "channel shouldn't be closed") + case <-wait103: + assert.Fail(t, "channel shouldn't be closed") + default: + } + + // Unblock 101 + require.NoError(t, s1.Notify(ctx, 200)) + waitForMinRevInUse(t, 101) + select { + case <-wait101: + default: + assert.Fail(t, "channel should be closed") + } + select { + case <-wait102: + assert.Fail(t, "channel shouldn't be closed") + case <-wait103: + assert.Fail(t, "channel shouldn't be closed") + default: + } + + // Unblock 102 + require.NoError(t, s2.Notify(ctx, 200)) + waitForMinRevInUse(t, 102) + select { + case <-wait102: + default: + assert.Fail(t, "channel should be closed") + } + select { + case <-wait103: + assert.Fail(t, "channel shouldn't be closed") + default: + } + + // Unblock 103 + require.NoError(t, s3.Notify(ctx, 200)) + waitForMinRevInUse(t, 200) + select { + case <-wait103: + default: + assert.Fail(t, "channel should be closed") + } + + // Shutdown + d.Process().Shutdown(ctx, errors.New("bye bye")) + d.Process().WaitForShutdown() + waitForEtcdState(t, ``) + + // No error is logged + mock.DebugLogger().AssertJSONMessages(t, "") +} diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go index bca2e77045..bd3e582c8d 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go @@ -1 +1,136 @@ package closesync + +import ( + "context" + "sync" + + etcd "go.etcd.io/etcd/client/v3" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" +) + +const ( + NoSourceNode = int64(-1) +) + +type CoordinatorNode struct { + logger log.Logger + client *etcd.Client + schema schema + + // revision reported by source nodes + revisions *etcdop.MirrorMap[int64, string, int64] + + listenersLock sync.Mutex + listenerID int + listeners map[int]*listener +} + +type listener struct { + id int + minRev int64 + done chan struct{} +} + +func NewCoordinatorNode(d dependencies) (*CoordinatorNode, error) { + n := &CoordinatorNode{ + client: d.EtcdClient(), + logger: d.Logger().WithComponent("close-sync.coordinator"), + schema: newSchema(d.EtcdSerde()), + listeners: make(map[int]*listener), + } + + // Graceful shutdown + wg := &sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + d.Process().OnShutdown(func(_ context.Context) { + n.logger.Infof(ctx, "closing close-sync coordinator node") + cancel() + wg.Wait() + n.logger.Infof(ctx, "closed close-sync coordinator node") + }) + + // Watch the prefix + { + n.revisions = etcdop.SetupMirrorMap[int64, string, int64]( + n.schema.prefix.GetAllAndWatch(ctx, n.client), + func(key string, value int64) string { return key }, + func(key string, value int64) int64 { return value }, + ). + WithOnUpdate(func(_ etcdop.MirrorUpdate) { + n.invokeListeners() + }). + BuildMirror() + if err := <-n.revisions.StartMirroring(ctx, wg, n.logger); err != nil { + return nil, err + } + } + + return n, nil +} + +func (n *CoordinatorNode) MinRevInUse() (out int64) { + out = NoSourceNode + n.revisions.ForEach(func(_ string, v int64) (stop bool) { + if out == NoSourceNode || out > v { + out = v + } + return false + }) + return out +} + +// WaitForRevision waits until all API nodes are synced to the required revision or the context is cancelled. +func (n *CoordinatorNode) WaitForRevision(ctx context.Context, minRev int64) error { + if greaterOrEqual(n.MinRevInUse(), minRev) { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-n.WaitForRevisionChan(minRev): + return nil + } +} + +// WaitForRevisionChan returns the channel that is closed when all API nodes are synced to the required revision. +func (n *CoordinatorNode) WaitForRevisionChan(minRev int64) <-chan struct{} { + if greaterOrEqual(n.MinRevInUse(), minRev) { + done := make(chan struct{}) + close(done) + return done + } + + return n.newListener(minRev).done +} + +func (n *CoordinatorNode) newListener(minRev int64) *listener { + n.listenersLock.Lock() + defer n.listenersLock.Unlock() + + l := &listener{id: n.listenerID, minRev: minRev, done: make(chan struct{})} + n.listeners[n.listenerID] = l + n.listenerID++ + + return l +} + +func (n *CoordinatorNode) invokeListeners() { + n.listenersLock.Lock() + defer n.listenersLock.Unlock() + + r := n.MinRevInUse() + + for id, l := range n.listeners { + if greaterOrEqual(r, l.minRev) { + close(l.done) + delete(n.listeners, id) + } + } +} + +func greaterOrEqual(actual, min int64) bool { + return actual == NoSourceNode || actual >= min +} diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go index e69de29bb2..578f60a35c 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go @@ -0,0 +1,69 @@ +package closesync + +import ( + "context" + "sync" + + etcd "go.etcd.io/etcd/client/v3" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" +) + +type SourceNode struct { + logger log.Logger + client *etcd.Client + sess *etcdop.Session + nodeID string + schema schema +} + +func NewSourceNode(d dependencies, nodeID string) (*SourceNode, error) { + n := &SourceNode{ + client: d.EtcdClient(), + logger: d.Logger().WithComponent("close-sync.source"), + nodeID: nodeID, + schema: newSchema(d.EtcdSerde()), + } + + // Graceful shutdown + wg := &sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + d.Process().OnShutdown(func(_ context.Context) { + n.logger.Infof(ctx, "closing close-sync source node") + cancel() + wg.Wait() + n.logger.Infof(ctx, "closed close-sync source node") + }) + + // Stat concurrent session with retries + { + var errCh <-chan error + n.sess, errCh = etcdop.NewSessionBuilder().Start(ctx, wg, n.logger, n.client) + if err := <-errCh; err != nil { + return nil, err + } + } + + if err := n.Notify(ctx, 0); err != nil { + return nil, err + } + + return n, nil +} + +// Notify - the source node notifies that all changes up to the reported revision has been processed. +// That means, all pipelines matching slices closed up to the revision, are already closed too. +func (n *SourceNode) Notify(ctx context.Context, rev int64) error { + sess, err := n.sess.Session() + if err != nil { + return err + } + + // Update key in the database, with the current revision. + // The key contains lease, so if the source node unexpectedly gone, the key is automatically removed, after TTL seconds. + return n.schema.SourceNode(n.nodeID). + Put(n.client, rev, etcd.WithLease(sess.Lease())). + Do(ctx). + Err() +} diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go index 2515beaf1a..603f4dbf85 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go @@ -1,3 +1,4 @@ +// Package router provides write routing and balancing, from a source node to disk writer nodes/slices. package router import ( @@ -13,12 +14,14 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/log" "github.com/keboola/keboola-as-code/internal/pkg/service/common/distribution" "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" "github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding" encodingCfg "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config" localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model" @@ -34,6 +37,7 @@ type Router struct { connections *connection.Manager encoding *encoding.Manager distribution *distribution.GroupNode + closeSyncer *closesync.SourceNode // slices field contains in-memory snapshot of all opened storage file slices slices *etcdop.MirrorTree[storage.Slice, *sliceData] @@ -56,6 +60,8 @@ type sliceData struct { type dependencies interface { Logger() log.Logger Process() *servicectx.Process + EtcdClient() *etcd.Client + EtcdSerde() *serde.Serde DistributionNode() *distribution.Node StorageRepository() *storageRepo.Repository ConnectionManager() *connection.Manager @@ -112,6 +118,12 @@ func New(d dependencies, sourceNodeID, sourceType string, config network.Config) return nil, err } + // Create utility, to report processed changes in slices (closed pipelines) + r.closeSyncer, err = closesync.NewSourceNode(d, sourceNodeID) + if err != nil { + return nil, err + } + // Start slices mirroring, only necessary data is saved { r.slices = etcdop. @@ -145,6 +157,12 @@ func New(d dependencies, sourceNodeID, sourceType string, config network.Config) // Update all affected pipelines r.updatePipelines(ctx, sinks) + + // All changes up to the revision have been processed, + // pipelines have been closed. + if err := r.closeSyncer.Notify(ctx, changes.Header.Revision); err != nil { + r.logger.Errorf(ctx, "cannot report synced revision: %s", err.Error()) + } }). BuildMirror() if err := <-r.slices.StartMirroring(ctx, &r.wg, r.logger); err != nil { diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go index c3afea6258..5854c32da2 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go @@ -13,9 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/keboola/keboola-as-code/internal/pkg/log" + commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/testconfig" @@ -39,6 +42,19 @@ func TestRouter_UpdatePipelinesOnSlicesChange(t *testing.T) { volumesCount := 2 writerNode, _ := testnode.StartDiskWriterNode(t, logger, etcdCfg, volumesCount, nil) + // Create coordinator, to check reported revisions + svcScope, _ := dependencies.NewMockedServiceScope(t, commonDeps.WithEtcdConfig(etcdCfg)) + coordinator, err := closesync.NewCoordinatorNode(svcScope) + require.NoError(t, err) + + // Helper + waitForMinRevInUse := func(t *testing.T, r int64) { + t.Helper() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, r, coordinator.MinRevInUse()) + }, 5*time.Second, 10*time.Millisecond) + } + // Wait for volumes registration require.EventuallyWithT(t, func(c *assert.CollectT) { logger.AssertJSONMessages(c, `{"level":"info","message":"disk writer listening on \"%s\""}`) @@ -64,7 +80,8 @@ func TestRouter_UpdatePipelinesOnSlicesChange(t *testing.T) { sink.Config = testconfig.LocalVolumeConfig(2, []string{"hdd"}) require.NoError(t, sourceScp.DefinitionRepository().Branch().Create(&branch, clk.Now(), test.ByUser()).Do(ctx).Err()) require.NoError(t, sourceScp.DefinitionRepository().Source().Create(&source, clk.Now(), test.ByUser(), "create").Do(ctx).Err()) - require.NoError(t, sourceScp.DefinitionRepository().Sink().Create(&sink, clk.Now(), test.ByUser(), "create").Do(ctx).Err()) + sinkResult := sourceScp.DefinitionRepository().Sink().Create(&sink, clk.Now(), test.ByUser(), "create").Do(ctx) + require.NoError(t, sinkResult.Err()) require.EventuallyWithT(t, func(c *assert.CollectT) { logger.AssertJSONMessages(c, `{"level":"debug","message":"watch stream mirror synced to revision %d","component":"sink.router"}`) }, 5*time.Second, 10*time.Millisecond) @@ -82,26 +99,32 @@ func TestRouter_UpdatePipelinesOnSlicesChange(t *testing.T) { {"level":"debug","message":"opened balanced pipeline to 2 slices, sink \"123/111/my-source/my-sink\"","component":"storage.router"} `) }, 5*time.Second, 10*time.Millisecond) + waitForMinRevInUse(t, sinkResult.Header().Revision) // Rotate file/slices - require.NoError(t, sourceScp.StorageRepository().File().Rotate(sink.SinkKey, clk.Now()).Do(ctx).Err()) + rotateResult := sourceScp.StorageRepository().File().Rotate(sink.SinkKey, clk.Now()).Do(ctx) + require.NoError(t, rotateResult.Err()) require.EventuallyWithT(t, func(c *assert.CollectT) { logger.AssertJSONMessages(c, ` {"level":"debug","message":"updated balanced pipeline, 2 opened slices, 2 closed slices, sink \"123/111/my-source/my-sink\"","component":"storage.router"} `) }, 5*time.Second, 10*time.Millisecond) + waitForMinRevInUse(t, rotateResult.Header().Revision) // Disable sink - close files/slices - require.NoError(t, sourceScp.DefinitionRepository().Sink().Disable(sink.SinkKey, clk.Now(), test.ByUser(), "reason").Do(ctx).Err()) + disableResult := sourceScp.DefinitionRepository().Sink().Disable(sink.SinkKey, clk.Now(), test.ByUser(), "reason").Do(ctx) + require.NoError(t, disableResult.Err()) require.EventuallyWithT(t, func(c *assert.CollectT) { logger.AssertJSONMessages(c, ` {"level":"debug","message":"closed balanced pipeline to 2 slices, sink \"123/111/my-source/my-sink\"","component":"storage.router"} `) }, 5*time.Second, 10*time.Millisecond) + waitForMinRevInUse(t, disableResult.Header().Revision) // Shutdown the source node sourceScp.Process().Shutdown(ctx, errors.New("bye bye")) sourceScp.Process().WaitForShutdown() + waitForMinRevInUse(t, closesync.NoSourceNode) // Shutdown the writer node writerNode.Process().Shutdown(ctx, errors.New("bye bye")) From 359007e4e484be42db90b713a9c52a3de789a431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 09:06:07 +0200 Subject: [PATCH 05/10] test: Fix test assert --- .../diskwriter/network/connection/connection_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection_test.go index 0453e436e2..df82c2851d 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection_test.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection_test.go @@ -78,14 +78,14 @@ func TestConnectionManager(t *testing.T) { assert.True(t, conn.IsConnected()) } - // Shutdown w1 and w3 + // Shutdown w1 and w3 (disconnect reason can be: 1. "session shutdown" OR 2. "EOF", so %s is used) w1.Process().Shutdown(ctx, errors.New("bye bye writer 1")) w1.Process().WaitForShutdown() w3.Process().Shutdown(ctx, errors.New("bye bye writer 3")) w3.Process().WaitForShutdown() waitForLog(t, sourceLogger, `{"level":"info","message":"the list of volumes has changed, updating connections","component":"storage.router.connections"}`) - waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w1\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`) - waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w3\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`) + waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w1\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`) + waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w3\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`) sourceLogger.Truncate() assert.Equal(t, 2, connManager.ConnectionsCount()) _, found := connManager.ConnectionToNode("w1") @@ -102,8 +102,8 @@ func TestConnectionManager(t *testing.T) { // Shutdown source node - no warning/error is logged s.Process().Shutdown(ctx, errors.New("bye bye source")) s.Process().WaitForShutdown() - waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w2\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`) - waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w4\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`) + waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w2\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`) + waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w4\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`) sourceLogger.AssertJSONMessages(t, `{"level":"info","message":"exited"}`) waitForLog(t, w2.DebugLogger(), `{"level":"info","message":"closed connection from \"%s\"","component":"storage.node.writer.network-file.server"}`) waitForLog(t, w4.DebugLogger(), `{"level":"info","message":"closed connection from \"%s\"","component":"storage.node.writer.network-file.server"}`) From 443c83df72625c12976297e279507fb7cb739af2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 09:07:33 +0200 Subject: [PATCH 06/10] tests: Increase timeout --- .../diskwriter/network/router/closesync/closesync_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go index ddd55b7a87..b8344074d9 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync_test.go @@ -17,7 +17,7 @@ import ( func TestSourceAndCoordinatorNodes(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() d, mock := dependencies.NewMockedServiceScope(t) @@ -39,13 +39,13 @@ func TestSourceAndCoordinatorNodes(t *testing.T) { t.Helper() assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.Equal(c, r, coordinator.MinRevInUse()) - }, 5*time.Second, 10*time.Millisecond) + }, 10*time.Second, 10*time.Millisecond) } waitForEtcdState := func(t *testing.T, expected string) { t.Helper() assert.EventuallyWithT(t, func(c *assert.CollectT) { etcdhelper.AssertKVsString(c, client, expected) - }, 5*time.Second, 10*time.Millisecond) + }, 10*time.Second, 10*time.Millisecond) } // Check initial etcd state From d731b926ac04be95dc090d024d8a46a7287bc30f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 11:37:56 +0200 Subject: [PATCH 07/10] tests: Fix unstable test/logic --- .../level/local/encoding/chunk/writer.go | 4 ++++ .../storage/level/local/encoding/pipeline.go | 18 +++++++++++++----- .../level/local/encoding/pipeline_test.go | 18 +++++++----------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/encoding/chunk/writer.go b/internal/pkg/service/stream/storage/level/local/encoding/chunk/writer.go index 59db7d459e..ef425ffff8 100644 --- a/internal/pkg/service/stream/storage/level/local/encoding/chunk/writer.go +++ b/internal/pkg/service/stream/storage/level/local/encoding/chunk/writer.go @@ -90,6 +90,10 @@ func (w *Writer) Flush() error { w.lock.Lock() defer w.lock.Unlock() + if w.closed { + return errors.New("chunk.Writer is closed") + } + // Swap chunks w.activeChunk.aligned = true w.swapChunks(w.emptyChunk()) diff --git a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go index 7112a44fe4..7f16d2b56e 100644 --- a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go +++ b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go @@ -92,8 +92,10 @@ type pipeline struct { // closed blocks new writes closed chan struct{} - // writeWg waits for in-progress writes before Close + // writeWg waits for in-progress writes without waiting for sync. writeWg sync.WaitGroup + // writeWg waits until all Write method calls are completed, it includes waiting for the sync. + writeCompletedWg sync.WaitGroup // writeWg waits for in-progress writes before Close chunksWg sync.WaitGroup @@ -258,7 +260,8 @@ func (p *pipeline) WriteRecord(record recordctx.Context) error { // Block Close method p.writeWg.Add(1) - defer p.writeWg.Done() + p.writeCompletedWg.Add(1) + defer p.writeCompletedWg.Done() // Check if the writer is closed if p.isClosed() { @@ -266,7 +269,9 @@ func (p *pipeline) WriteRecord(record recordctx.Context) error { } // Format and write table row - if err := p.encoder.WriteRecord(record); err != nil { + err := p.encoder.WriteRecord(record) + p.writeWg.Done() + if err != nil { return err } @@ -364,14 +369,17 @@ func (p *pipeline) Close(ctx context.Context) error { errs := errors.NewMultiError() - // Wait for running writes + // Wait for in-progress writes. p.writeWg.Wait() - // Stop syncer, it triggers also the last sync. + // Stop syncer, it triggers also the last sync to unblock write sync notifiers. if err := p.syncer.Stop(ctx); err != nil { errs.Append(err) } + // Wait for all Write method calls, it includes waiting for the sync. + p.writeCompletedWg.Wait() + // Close writers chain, it closes all writers and generates the last chunk. if err := p.chain.Close(ctx); err != nil { errs.Append(err) diff --git a/internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go b/internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go index 4e555556a1..64546353a5 100644 --- a/internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go +++ b/internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/keboola/keboola-as-code/internal/pkg/log" + commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding" @@ -221,15 +222,11 @@ foo3 {"level":"debug","message":"sync to disk done"} {"level":"info","message":"TEST: write unblocked"} {"level":"debug","message":"closing encoding pipeline"} +{"level":"debug","message":"stopping syncer"} {"level":"debug","message":"starting sync to disk"} {"level":"debug","message":"flushing writers"} {"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""} {"level":"debug","message":"writers flushed"} -{"level":"debug","message":"sync to disk done"} -{"level":"debug","message":"stopping syncer"} -{"level":"debug","message":"starting sync to disk"} -{"level":"debug","message":"flushing writers"} -{"level":"debug","message":"writers flushed"} {"level":"debug","message":"sync to disk done"} {"level":"debug","message":"syncer stopped"} {"level":"debug","message":"closing chain"} @@ -318,15 +315,11 @@ foo3 {"level":"debug","message":"sync to cache done"} {"level":"info","message":"TEST: write unblocked"} {"level":"debug","message":"closing encoding pipeline"} +{"level":"debug","message":"stopping syncer"} {"level":"debug","message":"starting sync to cache"} {"level":"debug","message":"flushing writers"} {"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""} {"level":"debug","message":"writers flushed"} -{"level":"debug","message":"sync to cache done"} -{"level":"debug","message":"stopping syncer"} -{"level":"debug","message":"starting sync to cache"} -{"level":"debug","message":"flushing writers"} -{"level":"debug","message":"writers flushed"} {"level":"debug","message":"sync to cache done"} {"level":"debug","message":"syncer stopped"} {"level":"debug","message":"closing chain"} @@ -496,7 +489,10 @@ func newEncodingTestCase(t *testing.T) *encodingTestCase { cancel() }) - d, mock := dependencies.NewMockedSourceScope(t) + // Disable real clocks, in tests, sync is triggered manually. + // The sync timer may cause unexpected log messages. + clk := clock.NewMock() + d, mock := dependencies.NewMockedSourceScope(t, commonDeps.WithClock(clk)) helper := &writerSyncHelper{writeDone: make(chan struct{}, 100)} From 8acd6d1e233c8c6c786e39d24d7637c3f5cbf07e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 11:54:02 +0200 Subject: [PATCH 08/10] fix: Typos --- .../local/diskwriter/network/router/closesync/closesync.go | 2 +- .../pkg/service/stream/storage/level/local/encoding/pipeline.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go index ea8358a2d3..9aeeff44b7 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go @@ -1,5 +1,5 @@ // Package closesync provides synchronization between source and coordinators nodes regarding closing slices -// The coordinator nodes is waiting for slice pipeline to finish, the router nodes notifies about closed slices. +// The coordinator nodes are waiting for slice pipeline to finish, the router nodes notify about closed slices. package closesync import ( diff --git a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go index 7f16d2b56e..c429b69849 100644 --- a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go +++ b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go @@ -94,7 +94,7 @@ type pipeline struct { closed chan struct{} // writeWg waits for in-progress writes without waiting for sync. writeWg sync.WaitGroup - // writeWg waits until all Write method calls are completed, it includes waiting for the sync. + // writeCompletedWg waits until all Write method calls are completed, it includes waiting for the sync. writeCompletedWg sync.WaitGroup // writeWg waits for in-progress writes before Close chunksWg sync.WaitGroup From 2f31decc17b1cf291609026233afba25955d9965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 14:29:19 +0200 Subject: [PATCH 09/10] docs: Improve comments --- .../local/diskwriter/network/router/closesync/closesync.go | 2 +- .../diskwriter/network/router/closesync/coordinator.go | 6 ++++-- .../local/diskwriter/network/router/closesync/source.go | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go index 9aeeff44b7..740e48a04f 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/closesync.go @@ -1,4 +1,4 @@ -// Package closesync provides synchronization between source and coordinators nodes regarding closing slices +// Package closesync provides synchronization between source and coordinator nodes regarding closing slices // The coordinator nodes are waiting for slice pipeline to finish, the router nodes notify about closed slices. package closesync diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go index bd3e582c8d..2b1b8a6a59 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/coordinator.go @@ -11,6 +11,8 @@ import ( ) const ( + // NoSourceNode is revision number used to describe an edge-case, when no source node is running, only a coordinator node is running. + // So it is guaranteed that no source node writes to the slice and the check can be skipped. NoSourceNode = int64(-1) ) @@ -81,7 +83,7 @@ func (n *CoordinatorNode) MinRevInUse() (out int64) { return out } -// WaitForRevision waits until all API nodes are synced to the required revision or the context is cancelled. +// WaitForRevision waits until all source nodes are synced to the required revision or the context is cancelled. func (n *CoordinatorNode) WaitForRevision(ctx context.Context, minRev int64) error { if greaterOrEqual(n.MinRevInUse(), minRev) { return nil @@ -95,7 +97,7 @@ func (n *CoordinatorNode) WaitForRevision(ctx context.Context, minRev int64) err } } -// WaitForRevisionChan returns the channel that is closed when all API nodes are synced to the required revision. +// WaitForRevisionChan returns the channel that is closed when all source nodes are synced to the required revision. func (n *CoordinatorNode) WaitForRevisionChan(minRev int64) <-chan struct{} { if greaterOrEqual(n.MinRevInUse(), minRev) { done := make(chan struct{}) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go index 578f60a35c..303052bbb9 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync/source.go @@ -36,7 +36,7 @@ func NewSourceNode(d dependencies, nodeID string) (*SourceNode, error) { n.logger.Infof(ctx, "closed close-sync source node") }) - // Stat concurrent session with retries + // Start concurrent session with retries { var errCh <-chan error n.sess, errCh = etcdop.NewSessionBuilder().Start(ctx, wg, n.logger, n.client) From ec3f10122d3323de1a129d75956de048c9056f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 26 Jul 2024 15:01:13 +0200 Subject: [PATCH 10/10] fix: Unstable test --- .../level/local/diskwriter/network/router/router_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go index 5854c32da2..33139f9ad6 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router_test.go @@ -51,7 +51,7 @@ func TestRouter_UpdatePipelinesOnSlicesChange(t *testing.T) { waitForMinRevInUse := func(t *testing.T, r int64) { t.Helper() assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, r, coordinator.MinRevInUse()) + assert.GreaterOrEqual(c, coordinator.MinRevInUse(), r) }, 5*time.Second, 10*time.Millisecond) }