Skip to content

Commit

Permalink
Merge pull request #1927 from keboola/michaljurecko-fix-filename
Browse files Browse the repository at this point in the history
fix: Filename in diskwriter, include source node ID
  • Loading branch information
jachym-tousek-keboola authored Jul 23, 2024
2 parents c3effc8 + 1aa0519 commit 3460da6
Show file tree
Hide file tree
Showing 24 changed files with 177 additions and 159 deletions.
31 changes: 16 additions & 15 deletions internal/pkg/service/common/dependencies/mocked.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ type mocked struct {

type MockedConfig struct {
enableEtcdClient bool
enableTasks bool
enableDistribution bool
enableDistributedLocks bool
enableOrchestrator bool

tasksNodeID string
distributionNodeID string

ctx context.Context
clock clock.Clock
telemetry telemetry.ForTest
Expand Down Expand Up @@ -109,17 +110,17 @@ func WithBigQueryBackend() MockedOption {
}
}

func WithEnabledTasks() MockedOption {
func WithEnabledTasks(nodeID string) MockedOption {
return func(c *MockedConfig) {
WithEnabledEtcdClient()(c)
c.enableTasks = true
c.tasksNodeID = nodeID
}
}

func WithEnabledDistribution() MockedOption {
func WithEnabledDistribution(nodeID string) MockedOption {
return func(c *MockedConfig) {
WithEnabledEtcdClient()(c)
c.enableDistribution = true
c.distributionNodeID = nodeID
}
}

Expand All @@ -130,18 +131,18 @@ func WithEnabledDistributedLocks() MockedOption {
}
}

func WithDistributionConfig(cfg distribution.Config) MockedOption {
func WithDistributionConfig(nodeID string, cfg distribution.Config) MockedOption {
return func(c *MockedConfig) {
WithEnabledEtcdClient()(c)
WithEnabledDistribution()(c)
WithEnabledDistribution(nodeID)(c)
c.distributionConfig = cfg
}
}

func WithEnabledOrchestrator() MockedOption {
func WithEnabledOrchestrator(nodeID string) MockedOption {
return func(c *MockedConfig) {
WithEnabledTasks()(c)
WithEnabledDistribution()(c)
WithEnabledTasks(nodeID)(c)
WithEnabledDistribution(nodeID)(c)
c.enableOrchestrator = true
}
}
Expand Down Expand Up @@ -383,13 +384,13 @@ func NewMocked(tb testing.TB, opts ...MockedOption) Mocked {
require.NoError(tb, err)
}

if cfg.enableTasks {
d.taskScope, err = newTaskScope(cfg.ctx, cfg.nodeID, d)
if cfg.tasksNodeID != "" {
d.taskScope, err = newTaskScope(cfg.ctx, cfg.tasksNodeID, d)
require.NoError(tb, err)
}

if cfg.enableDistribution {
d.distributionScope = newDistributionScope(cfg.nodeID, cfg.distributionConfig, d)
if cfg.distributionNodeID != "" {
d.distributionScope = newDistributionScope(cfg.distributionNodeID, cfg.distributionConfig, d)
}

if cfg.enableDistributedLocks {
Expand Down
20 changes: 10 additions & 10 deletions internal/pkg/service/common/task/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestOrchestrator(t *testing.T) {
d1 := dependencies.NewMocked(t,
dependencies.WithCtx(ctx),
dependencies.WithEtcdConfig(etcdCfg),
dependencies.WithEnabledOrchestrator(),
dependencies.WithEnabledOrchestrator("node1"),
dependencies.WithNodeID("node1"),
)
grp1, err := d1.DistributionNode().Group("my-group")
Expand All @@ -49,7 +49,7 @@ func TestOrchestrator(t *testing.T) {
d2 := dependencies.NewMocked(t,
dependencies.WithCtx(ctx),
dependencies.WithEtcdConfig(etcdCfg),
dependencies.WithEnabledOrchestrator(),
dependencies.WithEnabledOrchestrator("node2"),
dependencies.WithNodeID("node2"),
)
grp2, err := d2.DistributionNode().Group("my-group")
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestOrchestrator_StartTaskIf(t *testing.T) {
dependencies.WithCtx(ctx),
dependencies.WithEtcdConfig(etcdCfg),
dependencies.WithNodeID("node1"),
dependencies.WithEnabledOrchestrator(),
dependencies.WithEnabledOrchestrator("test-node"),
)

dist, err := d.DistributionNode().Group("my-group")
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestOrchestrator_RestartInterval(t *testing.T) {
dependencies.WithClock(clk),
dependencies.WithEtcdConfig(etcdCfg),
dependencies.WithNodeID("node1"),
dependencies.WithEnabledOrchestrator(),
dependencies.WithEnabledOrchestrator("test-node"),
)
logger := d.DebugLogger()
dist, err := d.DistributionNode().Group("my-group")
Expand Down Expand Up @@ -328,18 +328,18 @@ func TestOrchestrator_RestartInterval(t *testing.T) {
{"level":"info","message":"watch stream consumer closed: context canceled","task":"some.task","component":"orchestrator.watch.consumer"}
{"level":"info","message":"shutdown done","component":"orchestrator"}
{"level":"info","message":"received shutdown request","component":"distribution"}
{"level":"info","message":"unregistering the node \"node1\"","component":"distribution"}
{"level":"info","message":"the node \"node1\" unregistered","component":"distribution"}
{"level":"info","message":"unregistering the node \"test-node\"","component":"distribution"}
{"level":"info","message":"the node \"test-node\" unregistered","component":"distribution"}
{"level":"info","message":"closing etcd session: context canceled","distribution.group":"my-group","component":"distribution.etcd.session"}
{"level":"info","message":"closed etcd session","distribution.group":"my-group","component":"distribution.etcd.session"}
{"level":"info","message":"shutdown done","component":"distribution"}
{"level":"info","message":"received shutdown request","component":"orchestrator"}
{"level":"info","message":"waiting for orchestrators to finish","component":"orchestrator"}
{"level":"info","message":"shutdown done","component":"orchestrator"}
{"level":"info","message":"received shutdown request","node":"node1","component":"task"}
{"level":"info","message":"closing etcd session: context canceled","node":"node1","component":"task.etcd.session"}
{"level":"info","message":"closed etcd session","node":"node1","component":"task.etcd.session"}
{"level":"info","message":"shutdown done","node":"node1","component":"task"}
{"level":"info","message":"received shutdown request","node":"test-node","component":"task"}
{"level":"info","message":"closing etcd session: context canceled","node":"test-node","component":"task.etcd.session"}
{"level":"info","message":"closed etcd session","node":"test-node","component":"task.etcd.session"}
{"level":"info","message":"shutdown done","node":"test-node","component":"task"}
{"level":"info","message":"closing etcd connection","component":"etcd.client"}
{"level":"info","message":"closed etcd connection","component":"etcd.client"}
`)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/dependencies/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewMockedAPIScope(tb testing.TB, opts ...dependencies.MockedOption) (APISco
func NewMockedAPIScopeWithConfig(tb testing.TB, modifyConfig func(*config.Config), opts ...dependencies.MockedOption) (APIScope, Mocked) {
tb.Helper()

opts = append(opts, dependencies.WithEnabledTasks())
opts = append(opts, dependencies.WithEnabledTasks("test-node"))
serviceScp, mock := NewMockedServiceScopeWithConfig(tb, modifyConfig, opts...)

apiScp := newAPIScope(serviceScp, mock.TestConfig())
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/dependencies/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewMockedCoordinatorScopeWithConfig(tb testing.TB, modifyConfig func(*confi
svcScp, mock := NewMockedServiceScopeWithConfig(
tb,
modifyConfig,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution(), dependencies.WithEnabledDistributedLocks()}, opts...)...,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution("test-node"), dependencies.WithEnabledDistributedLocks()}, opts...)...,
)
d, err := newCoordinatorScope(mock.TestContext(), coordinatorParentScopesImpl{
ServiceScope: svcScp,
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/service/stream/dependencies/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewMockedSourceScopeWithConfig(tb testing.TB, modifyConfig func(*config.Con
svcScp, mock := NewMockedServiceScopeWithConfig(
tb,
modifyConfig,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution()}, opts...)...,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution("test-node")}, opts...)...,
)
d, err := newSourceScope(sourceParentScopesImpl{
ServiceScope: svcScp,
Expand Down Expand Up @@ -95,7 +95,7 @@ func newSourceScope(parentScp sourceParentScopes, sourceType string, cfg config.
return nil, err
}

d.storageRouter, err = storageRouter.New(d, sourceType, cfg.Storage.Level.Local.Writer.Network)
d.storageRouter, err = storageRouter.New(d, cfg.NodeID, sourceType, cfg.Storage.Level.Local.Writer.Network)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/dependencies/storage_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewMockedStorageWriterScopeWithConfig(tb testing.TB, modifyConfig func(*con
storageScp, mock := NewMockedStorageScopeWithConfig(
tb,
modifyConfig,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution()}, opts...)...,
append([]dependencies.MockedOption{dependencies.WithEnabledDistribution("test-node")}, opts...)...,
)
d, err := newStorageWriterScope(mock.TestContext(), storageWriterParentScopesImpl{
StorageScope: storageScp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (tc *readerTestCase) NewReader(disableValidation bool) (diskreader.Reader,

// Write slice data
assert.NoError(tc.TB, os.MkdirAll(tc.Slice.LocalStorage.DirName(tc.VolumePath), 0o750))
assert.NoError(tc.TB, os.WriteFile(tc.Slice.LocalStorage.FileName(tc.VolumePath), tc.SliceData, 0o640))
assert.NoError(tc.TB, os.WriteFile(tc.Slice.LocalStorage.FileName(tc.VolumePath, "my-node"), tc.SliceData, 0o640))

r, err := tc.Volume.OpenReader(tc.Slice)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (v *Volume) OpenReader(slice *model.Slice) (r Reader, err error) {
}

// Open file
filePath := slice.LocalStorage.FileName(v.Path())
filePath := slice.LocalStorage.FileName(v.Path(), "my-node")
logger = logger.With(attribute.String("file.path", filePath))
file, err = opener.OpenFile(filePath)
if err == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (p *slicePipeline) tryOpen() error {
}

// Open remote RPC file
remoteFile, err := rpc.OpenNetworkFile(p.ctx, conn, p.slice.SliceKey)
remoteFile, err := rpc.OpenNetworkFile(p.ctx, p.router.nodeID, conn, p.slice.SliceKey)
if err != nil {
return errors.PrefixErrorf(err, "cannot open network file for the slice %q", p.slice.SliceKey.String())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

type Router struct {
nodeID string
config network.Config
logger log.Logger
balancer Balancer
Expand Down Expand Up @@ -57,8 +58,9 @@ type dependencies interface {
EncodingManager() *encoding.Manager
}

func New(d dependencies, sourceType string, config network.Config) (r *Router, err error) {
func New(d dependencies, sourceNodeID, sourceType string, config network.Config) (r *Router, err error) {
r = &Router{
nodeID: sourceNodeID,
config: config,
logger: d.Logger().WithComponent("storage.router"),
balancer: NewRandomBalancer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type networkFile struct {
fileID uint64
}

func OpenNetworkFile(ctx context.Context, conn *transport.ClientConnection, sliceKey model.SliceKey) (encoding.NetworkOutput, error) {
func OpenNetworkFile(ctx context.Context, sourceNodeID string, conn *transport.ClientConnection, sliceKey model.SliceKey) (encoding.NetworkOutput, error) {
// Use transport layer with multiplexer for connection
dialer := func(_ context.Context, _ string) (net.Conn, error) {
stream, err := conn.OpenStream()
Expand Down Expand Up @@ -74,16 +74,17 @@ func OpenNetworkFile(ctx context.Context, conn *transport.ClientConnection, slic

// Try to open remote file
f := &networkFile{conn: clientConn, rpc: pb.NewNetworkFileClient(clientConn), sliceKey: sliceKey}
if err := f.open(ctx); err != nil {
if err := f.open(ctx, sourceNodeID); err != nil {
_ = clientConn.Close()
return nil, err
}

return f, nil
}

func (f *networkFile) open(ctx context.Context) error {
func (f *networkFile) open(ctx context.Context, sourceNodeID string) error {
resp, err := f.rpc.Open(ctx, &pb.OpenRequest{
SourceNodeId: sourceNodeID,
SliceKey: &pb.SliceKey{
ProjectId: int64(f.sliceKey.ProjectID),
BranchId: int64(f.sliceKey.BranchID),
Expand Down
Loading

0 comments on commit 3460da6

Please sign in to comment.