Skip to content

Commit

Permalink
Merge pull request #1874 from keboola/michaljurecko-PSGO-597-improve-…
Browse files Browse the repository at this point in the history
…shutdown

feat: Move graceful shutdown code to pkgs, where it is missing
  • Loading branch information
michaljurecko authored Jul 3, 2024
2 parents cd72fa6 + 3c54408 commit d405975
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestStart(t *testing.T) {
require.NoError(t, stream.StartComponents(ctx, d, mock.TestConfig(), stream.ComponentHTTPSource))

// Wait for the HTTP server
require.NoError(t, netutils.WaitForTCP(listenAddr, time.Second))
require.NoError(t, netutils.WaitForHTTP(url, 10*time.Second))
logger.AssertJSONMessages(t, `
{"level":"info","message":"starting HTTP source node","component":"http-source"}
{"level":"info","message":"started HTTP source on \"0.0.0.0:%d\"","component":"http-source"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (
"github.com/benbjohnson/clock"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/opener"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type collection = volume.Collection[*Volume]

type Volumes struct {
*collection
logger log.Logger
logger log.Logger
collection *volume.Collection[*Volume]
}

// OpenVolumes function detects and opens all volumes in the path.
func OpenVolumes(ctx context.Context, logger log.Logger, clock clock.Clock, nodeID, volumesPath string, opts ...Option) (out *Volumes, err error) {
func OpenVolumes(ctx context.Context, logger log.Logger, clock clock.Clock, process *servicectx.Process, nodeID, volumesPath string, opts ...Option) (out *Volumes, err error) {
out = &Volumes{logger: logger}
out.collection, err = opener.OpenVolumes(ctx, logger, nodeID, volumesPath, func(spec volume.Spec) (*Volume, error) {
return Open(ctx, logger, clock, spec, opts...)
Expand All @@ -27,10 +27,19 @@ func OpenVolumes(ctx context.Context, logger log.Logger, clock clock.Clock, node
return nil, err
}

// Graceful shutdown
process.OnShutdown(func(ctx context.Context) {
logger.Info(ctx, "closing volumes")
if err := out.collection.Close(ctx); err != nil {
err := errors.PrefixError(err, "cannot close volumes")
logger.Error(ctx, err.Error())
}
logger.Info(ctx, "closed volumes")
})

return out, nil
}

func (v *Volumes) Close(ctx context.Context) error {
v.logger.Info(ctx, "closing volumes")
return v.collection.Close(ctx)
func (v *Volumes) Collection() *volume.Collection[*Volume] {
return v.collection
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/stretchr/testify/assert"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

func TestVolumes(t *testing.T) {
Expand All @@ -23,6 +25,7 @@ func TestVolumes(t *testing.T) {

logger := log.NewDebugLogger()
clk := clock.New()
process := servicectx.New()

// Create volumes directories
volumesPath := t.TempDir()
Expand All @@ -44,7 +47,7 @@ func TestVolumes(t *testing.T) {
done := make(chan struct{})
go func() {
defer close(done)
volumes, err = OpenVolumes(ctx, logger, clk, "my-node", volumesPath)
volumes, err = OpenVolumes(ctx, logger, clk, process, "my-node", volumesPath)
assert.NoError(t, err)
}()

Expand All @@ -66,16 +69,17 @@ func TestVolumes(t *testing.T) {
}

// Check opened volumes
assert.Len(t, volumes.All(), 5)
assert.Len(t, volumes.VolumeByType("foo"), 0)
assert.Len(t, volumes.VolumeByType("hdd"), 3)
assert.Len(t, volumes.VolumeByType("ssd"), 2)
assert.Len(t, volumes.Collection().All(), 5)
assert.Len(t, volumes.Collection().VolumeByType("foo"), 0)
assert.Len(t, volumes.Collection().VolumeByType("hdd"), 3)
assert.Len(t, volumes.Collection().VolumeByType("ssd"), 2)
for _, id := range []volume.ID{"HDD_1", "HDD_2", "HDD_3", "SSD_1", "SSD_2"} {
vol, err := volumes.Volume(id)
vol, err := volumes.Collection().Volume(id)
assert.NotNil(t, vol)
assert.NoError(t, err)
}

// Close volumes
assert.NoError(t, volumes.Close(ctx))
process.Shutdown(ctx, errors.New("bye bye"))
process.WaitForShutdown()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func RegisterVolumes[V volume.Volume](cfg Config, d dependencies, volumes *volum
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
d.Process().OnShutdown(func(ctx context.Context) {
logger.Info(ctx, "received shutdown request")
logger.Info(ctx, "stopping volumes registration")
cancel()
wg.Wait()
logger.Info(ctx, "shutdown done")
logger.Info(ctx, "stopped volumes registration")
})

// Register volumes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer/test"
Expand All @@ -27,6 +28,7 @@ func TestEventWriter(t *testing.T) {

logger := log.NewDebugLogger()
clk := clock.New()
process := servicectx.New()

// There are 2 volumes
volumesPath := t.TempDir()
Expand All @@ -36,7 +38,7 @@ func TestEventWriter(t *testing.T) {
assert.NoError(t, os.WriteFile(filepath.Join(volumesPath, "hdd", "2", volume.IDFile), []byte("HDD_2"), 0o640))

// Detect volumes
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, process, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
assert.NoError(t, err)

// Register "OnWriterOpen" and "OnWriterClose" events on the "volumes" level
Expand All @@ -58,9 +60,9 @@ func TestEventWriter(t *testing.T) {
})

// Register "OnWriterOpen" and "OnWriterClose" events on the "volume" level
vol1, err := volumes.Volume("HDD_1")
vol1, err := volumes.Collection().Volume("HDD_1")
assert.NoError(t, err)
vol2, err := volumes.Volume("HDD_2")
vol2, err := volumes.Collection().Volume("HDD_2")
assert.NoError(t, err)
vol1.Events().OnWriterOpen(func(w writer.Writer) error {
logger.Infof(ctx, `EVENT: slice: "%s", event: OPEN (3), level: volume1`, w.SliceKey().OpenedAt())
Expand Down Expand Up @@ -120,7 +122,8 @@ func TestEventWriter(t *testing.T) {
})

// Close all
assert.NoError(t, volumes.Close(ctx))
process.Shutdown(ctx, errors.New("bye bye"))
process.WaitForShutdown()

// Check logs, closing is parallel, so writers logs are checked separately
logger.AssertJSONMessages(t, `
Expand Down Expand Up @@ -157,14 +160,15 @@ func TestWriterEvents_OpenError(t *testing.T) {

logger := log.NewDebugLogger()
clk := clock.New()
process := servicectx.New()

// There are 2 volumes
volumesPath := t.TempDir()
assert.NoError(t, os.MkdirAll(filepath.Join(volumesPath, "hdd", "1"), 0o750))
assert.NoError(t, os.WriteFile(filepath.Join(volumesPath, "hdd", "1", volume.IDFile), []byte("HDD_1"), 0o640))

// Detect volumes
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, process, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
assert.NoError(t, err)

// Register "OnWriterOpen" event on the "volumes" level
Expand All @@ -173,7 +177,7 @@ func TestWriterEvents_OpenError(t *testing.T) {
})

// Register "OnWriterOpen" event on the "volume" level
vol, err := volumes.Volume("HDD_1")
vol, err := volumes.Collection().Volume("HDD_1")
assert.NoError(t, err)
vol.Events().OnWriterOpen(func(w writer.Writer) error {
return errors.New("error (2)")
Expand All @@ -186,7 +190,8 @@ func TestWriterEvents_OpenError(t *testing.T) {
}

// Close volumes
assert.NoError(t, volumes.Close(ctx))
process.Shutdown(ctx, errors.New("bye bye"))
process.WaitForShutdown()
}

func TestEventWriter_CloseError(t *testing.T) {
Expand All @@ -197,14 +202,15 @@ func TestEventWriter_CloseError(t *testing.T) {

logger := log.NewDebugLogger()
clk := clock.New()
process := servicectx.New()

// There are 2 volumes
volumesPath := t.TempDir()
assert.NoError(t, os.MkdirAll(filepath.Join(volumesPath, "hdd", "1"), 0o750))
assert.NoError(t, os.WriteFile(filepath.Join(volumesPath, "hdd", "1", volume.IDFile), []byte("HDD_1"), 0o640))

// Detect volumes
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
volumes, err := writerVolume.OpenVolumes(ctx, logger, clk, process, "my-node", volumesPath, writer.NewConfig(), writerVolume.WithFormatWriterFactory(test.DummyWriterFactory))
assert.NoError(t, err)

// Register "OnWriterClose" event on the "volumes" level
Expand All @@ -213,7 +219,7 @@ func TestEventWriter_CloseError(t *testing.T) {
})

// Register "OnWriterClose" event on the "volume" level
vol, err := volumes.Volume("HDD_1")
vol, err := volumes.Collection().Volume("HDD_1")
assert.NoError(t, err)
vol.Events().OnWriterClose(func(w writer.Writer, _ error) error {
return errors.New("error (2)")
Expand All @@ -235,5 +241,6 @@ func TestEventWriter_CloseError(t *testing.T) {
}

// Close volumes
assert.NoError(t, volumes.Close(ctx))
process.Shutdown(ctx, errors.New("bye bye"))
process.WaitForShutdown()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,46 @@ import (
"github.com/benbjohnson/clock"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/opener"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type collection = model.Collection[*Volume]

type Volumes struct {
*collection
logger log.Logger
events *writer.Events
logger log.Logger
events *writer.Events
collection *volume.Collection[*Volume]
}

// OpenVolumes function detects and opens all volumes in the path.
func OpenVolumes(ctx context.Context, logger log.Logger, clock clock.Clock, nodeID, volumesPath string, wrCfg writer.Config, opts ...Option) (out *Volumes, err error) {
func OpenVolumes(ctx context.Context, logger log.Logger, clock clock.Clock, process *servicectx.Process, nodeID, volumesPath string, wrCfg writer.Config, opts ...Option) (out *Volumes, err error) {
out = &Volumes{logger: logger, events: writer.NewEvents()}
out.collection, err = opener.OpenVolumes(ctx, logger, nodeID, volumesPath, func(spec model.Spec) (*Volume, error) {
out.collection, err = opener.OpenVolumes(ctx, logger, nodeID, volumesPath, func(spec volume.Spec) (*Volume, error) {
return Open(ctx, logger, clock, out.events.Clone(), wrCfg, spec, opts...)
})
if err != nil {
return nil, err
}

// Graceful shutdown
process.OnShutdown(func(ctx context.Context) {
logger.Info(ctx, "closing volumes")
if err := out.collection.Close(ctx); err != nil {
err := errors.PrefixError(err, "cannot close volumes")
logger.Error(ctx, err.Error())
}
logger.Info(ctx, "closed volumes")
})

return out, nil
}

func (v *Volumes) Collection() *model.Collection[*Volume] {
func (v *Volumes) Collection() *volume.Collection[*Volume] {
return v.collection
}

func (v *Volumes) Events() *writer.Events {
return v.events
}

func (v *Volumes) Close(ctx context.Context) error {
v.logger.Info(ctx, "closing volumes")
return v.collection.Close(ctx)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/stretchr/testify/assert"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/writer"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

func TestOpenVolumes(t *testing.T) {
Expand All @@ -23,6 +25,7 @@ func TestOpenVolumes(t *testing.T) {

logger := log.NewDebugLogger()
clk := clock.New()
process := servicectx.New()
wrCfg := writer.NewConfig()

// Create volumes directories
Expand All @@ -48,7 +51,7 @@ func TestOpenVolumes(t *testing.T) {
done := make(chan struct{})
go func() {
defer close(done)
volumes, err = OpenVolumes(ctx, logger, clk, "my-node", volumesPath, wrCfg)
volumes, err = OpenVolumes(ctx, logger, clk, process, "my-node", volumesPath, wrCfg)
assert.NoError(t, err)
}()

Expand All @@ -62,13 +65,13 @@ func TestOpenVolumes(t *testing.T) {
}

// Check opened volumes
assert.Len(t, volumes.All(), 6)
assert.Len(t, volumes.VolumeByType("foo"), 0)
assert.Len(t, volumes.VolumeByType("hdd"), 3)
assert.Len(t, volumes.VolumeByType("ssd"), 2)
assert.Len(t, volumes.VolumeByType("drained"), 1)
assert.Len(t, volumes.Collection().All(), 6)
assert.Len(t, volumes.Collection().VolumeByType("foo"), 0)
assert.Len(t, volumes.Collection().VolumeByType("hdd"), 3)
assert.Len(t, volumes.Collection().VolumeByType("ssd"), 2)
assert.Len(t, volumes.Collection().VolumeByType("drained"), 1)
for _, id := range []volume.ID{"HDD_1", "HDD_2"} {
vol, err := volumes.Volume(id)
vol, err := volumes.Collection().Volume(id)
assert.NotNil(t, vol)
assert.NoError(t, err)
}
Expand All @@ -80,11 +83,12 @@ func TestOpenVolumes(t *testing.T) {
} {
content, err := os.ReadFile(path)
assert.NoError(t, err)
vol, err := volumes.Volume(volume.ID(content))
vol, err := volumes.Collection().Volume(volume.ID(content))
assert.NotNil(t, vol)
assert.NoError(t, err)
}

// Close volumes
assert.NoError(t, volumes.Close(ctx))
process.Shutdown(ctx, errors.New("bye bye"))
process.WaitForShutdown()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/reader/volume"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type dependencies interface {
Expand All @@ -27,18 +26,10 @@ func Start(ctx context.Context, d dependencies, cfg config.Config) error {
logger.Info(ctx, `starting storage reader node`)

// Open volumes
volumes, err := volume.OpenVolumes(ctx, logger, d.Clock(), cfg.NodeID, cfg.Storage.VolumesPath)
_, err := volume.OpenVolumes(ctx, logger, d.Clock(), d.Process(), cfg.NodeID, cfg.Storage.VolumesPath)
if err != nil {
return err
}

// Graceful shutdown
d.Process().OnShutdown(func(ctx context.Context) {
if err := volumes.Close(ctx); err != nil {
err := errors.PrefixError(err, "`cannot close reader volumes")
logger.Error(ctx, err.Error())
}
})

return nil
}
Loading

0 comments on commit d405975

Please sign in to comment.