Merge pull request #1936 from keboola/michaljurecko-PSGO-598-close-sync
feat: Add closesync pkg to sync between source and coordinator nodes
michaljurecko authored Jul 26, 2024
2 parents 73ea845 + ec3f101 commit 875bce2
Showing 14 changed files with 517 additions and 45 deletions.
8 changes: 7 additions & 1 deletion internal/pkg/service/common/dependencies/etcdclient.go
@@ -2,6 +2,7 @@ package dependencies

import (
"context"
"reflect"

etcdPkg "go.etcd.io/etcd/client/v3"

@@ -31,7 +32,12 @@ func newEtcdClientScope(ctx context.Context, baseScp BaseScope, cfg etcdclient.C

return &etcdClientScope{
client: client,
serde: serde.NewJSON(baseScp.Validator().Validate),
serde: serde.NewJSON(func(ctx context.Context, value any) error {
	// Validate structs and pointers only; scalar values (e.g., the int64
	// revisions stored by the closesync package) have nothing to validate.
	if k := reflect.ValueOf(value).Kind(); k != reflect.Struct && k != reflect.Pointer {
		return nil
	}
	return baseScp.Validator().Validate(ctx, value)
}),
}, nil
}

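Why this wrapper is needed: serde.NewJSON runs the supplied validate function on every value it encodes or decodes, and the new closesync package below stores plain int64 revisions that a struct-based validator cannot handle. A minimal, self-contained sketch of the same guard, assuming the wrapped validator accepts only structs and pointers; the skipScalars helper name is invented for illustration, the actual change inlines the closure above:

package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// skipScalars wraps a struct-only validator so that scalar values,
// e.g. the int64 revisions stored by closesync, bypass validation.
func skipScalars(validate func(context.Context, any) error) func(context.Context, any) error {
	return func(ctx context.Context, value any) error {
		if k := reflect.ValueOf(value).Kind(); k != reflect.Struct && k != reflect.Pointer {
			return nil // nothing to validate on a scalar
		}
		return validate(ctx, value)
	}
}

func main() {
	validate := skipScalars(func(ctx context.Context, value any) error {
		return errors.New("struct validation invoked")
	})
	fmt.Println(validate(context.Background(), int64(42)))  // <nil> (validation skipped)
	fmt.Println(validate(context.Background(), struct{}{})) // struct validation invoked
}
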
4 changes: 3 additions & 1 deletion internal/pkg/service/common/etcdop/op/atomic_do.go
@@ -17,11 +17,13 @@ func (v *AtomicOp[R]) Do(ctx context.Context, opts ...Option) AtomicResult[R] {

var ok bool
var err error
var header *Header

for {
txnResult := v.DoWithoutRetry(ctx, opts...)
ok = txnResult.Succeeded()
err = txnResult.Err()
header = txnResult.Header()
if err == nil && !ok {
attempt++
if delay := b.NextBackOff(); delay == backoff.Stop {
@@ -42,7 +44,7 @@ func (v *AtomicOp[R]) Do(ctx context.Context, opts ...Option) AtomicResult[R] {
)
}

return AtomicResult[R]{result: v.result, error: err, attempt: attempt, elapsedTime: elapsedTime}
return AtomicResult[R]{result: v.result, error: err, header: header, attempt: attempt, elapsedTime: elapsedTime}
}

func (v *AtomicOp[R]) DoWithoutRetry(ctx context.Context, opts ...Option) *TxnResult[R] {
9 changes: 8 additions & 1 deletion internal/pkg/service/common/etcdop/op/atomic_result.go
@@ -1,10 +1,13 @@
package op

import "time"
import (
"time"
)

type AtomicResult[R any] struct {
result *R
error error
header *Header
attempt int
elapsedTime time.Duration
}
@@ -21,6 +24,10 @@ func (v AtomicResult[R]) Err() error {
return v.error
}

func (v AtomicResult[R]) Header() *Header {
return v.header
}

func (v AtomicResult[R]) ResultOrErr() (R, error) {
var empty R
if v.error != nil {
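The new Header() accessor exposes the response header of the last transaction attempt, so a caller can learn the etcd revision at which an atomic write was committed. A hedged sketch of how that revision might be forwarded to the close-sync mechanism introduced in this commit; the notifyAfterWrite helper is hypothetical, and it assumes NewSourceNode returns a *closesync.SourceNode whose Notify accepts an int64 revision, as the test file below suggests, and that Header exposes an etcd Revision field:

package example

import (
	"context"

	"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync"
)

// notifyAfterWrite forwards the commit revision of a successful atomic write
// to a close-sync source node (hypothetical helper, for illustration only).
func notifyAfterWrite[R any](ctx context.Context, n *closesync.SourceNode, r op.AtomicResult[R]) error {
	if err := r.Err(); err != nil {
		return err
	}
	// Header().Revision identifies the etcd revision of the committed write.
	return n.Notify(ctx, r.Header().Revision)
}
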
34 changes: 16 additions & 18 deletions internal/pkg/service/common/etcdop/session.go
@@ -33,10 +33,10 @@ const (
// Any initialization error is reported via the error channel.
// After successful initialization, a new session is created after each failure until the context ends.
type Session struct {
sessionBuilder
logger log.Logger
client *etcd.Client
backoff *backoff.ExponentialBackOff
sessionBuilder SessionBuilder
logger log.Logger
client *etcd.Client
backoff *backoff.ExponentialBackOff

mutexStore *mutexStore

@@ -53,14 +53,20 @@ type SessionBuilder struct {

type onSession func(session *concurrency.Session) error

type sessionBuilder = SessionBuilder

type NoSessionError struct{}

func (e NoSessionError) Error() string {
return "no active concurrent.Session"
}

// NewSessionBuilder creates a builder for the resistant Session.
func NewSessionBuilder() *SessionBuilder {
return &SessionBuilder{
grantTimeout: sessionDefaultGrantTimeout,
ttlSeconds: sessionDefaultTTLSeconds,
}
}

// WithGrantTimeout configures the maximum time to wait for creating a new session.
func (b SessionBuilder) WithGrantTimeout(v time.Duration) SessionBuilder {
if v <= 0 {
@@ -233,15 +239,15 @@ func (s *Session) newSession(ctx context.Context) (_ *concurrency.Session, err e

// Obtain the LeaseID
// The concurrency.NewSession below can do it by itself, but we need a separate context with a timeout here.
grantCtx, grantCancel := context.WithTimeout(ctx, s.grantTimeout)
grantCtx, grantCancel := context.WithTimeout(ctx, s.sessionBuilder.grantTimeout)
defer grantCancel()
grantResp, err := s.client.Grant(grantCtx, int64(s.ttlSeconds))
grantResp, err := s.client.Grant(grantCtx, int64(s.sessionBuilder.ttlSeconds))
if err != nil {
return nil, err
}

// Create session
session, err := concurrency.NewSession(s.client, concurrency.WithTTL(s.ttlSeconds), concurrency.WithLease(grantResp.ID))
session, err := concurrency.NewSession(s.client, concurrency.WithTTL(s.sessionBuilder.ttlSeconds), concurrency.WithLease(grantResp.ID))
if err != nil {
return nil, err
}
@@ -261,7 +267,7 @@ func (s *Session) newSession(ctx context.Context) (_ *concurrency.Session, err e

// Invoke callbacks - start session dependent work
s.logger.WithDuration(time.Since(startTime)).Infof(ctx, "created etcd session")
for i, fn := range s.onSession {
for i, fn := range s.sessionBuilder.onSession {
if err := fn(session); err != nil {
err = errors.Errorf(`callback OnSession[%d] failed: %s`, i, err)
s.logger.Error(ctx, err.Error())
@@ -301,14 +307,6 @@ func (s *Session) closeSession(ctx context.Context, reason string) error {
return nil
}

// NewSessionBuilder creates a builder for the resistant Session.
func NewSessionBuilder() *SessionBuilder {
return &SessionBuilder{
grantTimeout: sessionDefaultGrantTimeout,
ttlSeconds: sessionDefaultTTLSeconds,
}
}

func newSessionBackoff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.RandomizationFactor = 0.2
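The Session struct now holds the builder as a named field instead of embedding it through the deleted lowercase alias (type sessionBuilder = SessionBuilder), so reads like s.grantTimeout become the explicit s.sessionBuilder.grantTimeout. A generic sketch of the trade-off, with illustrative names not taken from the diff:

package main

import "fmt"

type Builder struct{ timeout int }

// Embedded: fields are promoted (s.timeout), which is concise but hides
// their origin and needs a lowercase alias to keep the field unexported.
type sessionEmbedded struct{ Builder }

// Named field: every access is explicit (s.builder.timeout).
type sessionNamed struct{ builder Builder }

func main() {
	e := sessionEmbedded{Builder{timeout: 5}}
	n := sessionNamed{builder: Builder{timeout: 5}}
	fmt.Println(e.timeout, n.builder.timeout) // 5 5
}
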
@@ -78,14 +78,14 @@ func TestConnectionManager(t *testing.T) {
assert.True(t, conn.IsConnected())
}

// Shutdown w1 and w3
// Shutdown w1 and w3 (the disconnect reason can be either "session shutdown" or "EOF", so the %s wildcard is used)
w1.Process().Shutdown(ctx, errors.New("bye bye writer 1"))
w1.Process().WaitForShutdown()
w3.Process().Shutdown(ctx, errors.New("bye bye writer 3"))
w3.Process().WaitForShutdown()
waitForLog(t, sourceLogger, `{"level":"info","message":"the list of volumes has changed, updating connections","component":"storage.router.connections"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w1\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w3\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w1\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w3\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`)
sourceLogger.Truncate()
assert.Equal(t, 2, connManager.ConnectionsCount())
_, found := connManager.ConnectionToNode("w1")
@@ -102,8 +102,8 @@ func TestConnectionManager(t *testing.T) {
// Shutdown source node - no warning/error is logged
s.Process().Shutdown(ctx, errors.New("bye bye source"))
s.Process().WaitForShutdown()
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w2\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w4\" - \"localhost:%s\": session shutdown","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w2\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`)
waitForLog(t, sourceLogger, `{"level":"info","message":"disk writer client disconnected from \"w4\" - \"localhost:%s\": %s","component":"storage.router.connections.client"}`)
sourceLogger.AssertJSONMessages(t, `{"level":"info","message":"exited"}`)
waitForLog(t, w2.DebugLogger(), `{"level":"info","message":"closed connection from \"%s\"","component":"storage.node.writer.network-file.server"}`)
waitForLog(t, w4.DebugLogger(), `{"level":"info","message":"closed connection from \"%s\"","component":"storage.node.writer.network-file.server"}`)
@@ -0,0 +1,33 @@
// Package closesync provides synchronization between source and coordinator nodes regarding the closing of slices.
// The coordinator nodes wait for slice pipelines to finish; the router nodes notify about closed slices.
package closesync

import (
etcd "go.etcd.io/etcd/client/v3"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
)

type dependencies interface {
Logger() log.Logger
Process() *servicectx.Process
EtcdClient() *etcd.Client
EtcdSerde() *serde.Serde
}

type schema struct {
prefix etcdop.PrefixT[int64]
}

func newSchema(s *serde.Serde) schema {
return schema{
prefix: etcdop.NewTypedPrefix[int64]("runtime/closesync/source/node", s),
}
}

func (s schema) SourceNode(sourceNodeID string) etcdop.KeyT[int64] {
return s.prefix.Key(sourceNodeID)
}
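
As the test below exercises it, each source node keeps its key updated with the highest revision it has finished working with, and the coordinator unblocks waiters once the minimum across all source nodes reaches the requested revision. A hedged coordinator-side sketch, assuming NewCoordinatorNode returns a *closesync.CoordinatorNode as the test suggests; the waitForRouters helper is hypothetical:

package example

import (
	"context"

	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync"
)

// waitForRouters blocks until every source node has reported a revision >= rev,
// i.e. no router still works with state older than rev (hypothetical helper).
func waitForRouters(ctx context.Context, c *closesync.CoordinatorNode, rev int64) error {
	select {
	case <-c.WaitForRevisionChan(rev):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
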
@@ -0,0 +1,170 @@
package closesync_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/closesync"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper"
)

func TestSourceAndCoordinatorNodes(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

d, mock := dependencies.NewMockedServiceScope(t)
client := mock.TestEtcdClient()

// Create 3 source nodes and 1 coordinator node
coordinator, err := closesync.NewCoordinatorNode(d)
require.NoError(t, err)
assert.Equal(t, closesync.NoSourceNode, coordinator.MinRevInUse())
s1, err := closesync.NewSourceNode(d, "source-node-1")
require.NoError(t, err)
s2, err := closesync.NewSourceNode(d, "source-node-2")
require.NoError(t, err)
s3, err := closesync.NewSourceNode(d, "source-node-3")
require.NoError(t, err)

// Helper
waitForMinRevInUse := func(t *testing.T, r int64) {
t.Helper()
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, r, coordinator.MinRevInUse())
}, 10*time.Second, 10*time.Millisecond)
}
waitForEtcdState := func(t *testing.T, expected string) {
t.Helper()
assert.EventuallyWithT(t, func(c *assert.CollectT) {
etcdhelper.AssertKVsString(c, client, expected)
}, 10*time.Second, 10*time.Millisecond)
}

// Check initial etcd state
waitForEtcdState(t, `
<<<<<
runtime/closesync/source/node/source-node-1 (lease)
-----
0
>>>>>
<<<<<
runtime/closesync/source/node/source-node-2 (lease)
-----
0
>>>>>
<<<<<
runtime/closesync/source/node/source-node-3 (lease)
-----
0
>>>>>
`)
assert.Equal(t, int64(0), coordinator.MinRevInUse())

// The progress of individual source nodes is different
assert.NoError(t, s1.Notify(ctx, 100))
assert.NoError(t, s2.Notify(ctx, 101))
assert.NoError(t, s3.Notify(ctx, 102))
waitForMinRevInUse(t, 100)
waitForEtcdState(t, `
<<<<<
runtime/closesync/source/node/source-node-1 (lease)
-----
100
>>>>>
<<<<<
runtime/closesync/source/node/source-node-2 (lease)
-----
101
>>>>>
<<<<<
runtime/closesync/source/node/source-node-3 (lease)
-----
102
>>>>>
`)

// Revisions <= 100 are unblocked
select {
case <-coordinator.WaitForRevisionChan(99):
default:
assert.Fail(t, "channel should be closed")
}
select {
case <-coordinator.WaitForRevisionChan(100):
default:
assert.Fail(t, "channel should be closed")
}

// Revisions > 100 are blocked
wait101 := coordinator.WaitForRevisionChan(101)
wait102 := coordinator.WaitForRevisionChan(102)
wait103 := coordinator.WaitForRevisionChan(103)
select {
case <-wait101:
assert.Fail(t, "channel shouldn't be closed")
case <-wait102:
assert.Fail(t, "channel shouldn't be closed")
case <-wait103:
assert.Fail(t, "channel shouldn't be closed")
default:
}

// Unblock 101
require.NoError(t, s1.Notify(ctx, 200))
waitForMinRevInUse(t, 101)
select {
case <-wait101:
default:
assert.Fail(t, "channel should be closed")
}
select {
case <-wait102:
assert.Fail(t, "channel shouldn't be closed")
case <-wait103:
assert.Fail(t, "channel shouldn't be closed")
default:
}

// Unblock 102
require.NoError(t, s2.Notify(ctx, 200))
waitForMinRevInUse(t, 102)
select {
case <-wait102:
default:
assert.Fail(t, "channel should be closed")
}
select {
case <-wait103:
assert.Fail(t, "channel shouldn't be closed")
default:
}

// Unblock 103
require.NoError(t, s3.Notify(ctx, 200))
waitForMinRevInUse(t, 200)
select {
case <-wait103:
default:
assert.Fail(t, "channel should be closed")
}

// Shutdown
d.Process().Shutdown(ctx, errors.New("bye bye"))
d.Process().WaitForShutdown()
waitForEtcdState(t, ``)

// No error is logged
mock.DebugLogger().AssertJSONMessages(t, "")
}