Skip to content

Commit

Permalink
feat: Add distributed locks to dependencies.ApiScope
Browse files Browse the repository at this point in the history
  • Loading branch information
michaljurecko committed Jul 26, 2024
1 parent 5ed01f8 commit 1272427
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 23 deletions.
6 changes: 6 additions & 0 deletions internal/pkg/service/stream/api/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/benbjohnson/clock"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/distlock"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/task"
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/mapper"
Expand All @@ -15,18 +17,22 @@ import (
)

type service struct {
logger log.Logger
clock clock.Clock
publicURL *url.URL
tasks *task.Node
locks *distlock.Provider
definition *definitionRepo.Repository
mapper *mapper.Mapper
}

func New(d dependencies.APIScope, cfg config.Config) api.Service {
return &service{
logger: d.Logger(),
clock: d.Clock(),
publicURL: d.APIPublicURL(),
tasks: d.TaskNode(),
locks: d.DistributedLockProvider(),
definition: d.DefinitionRepository(),
mapper: mapper.New(d, cfg),
}
Expand Down
14 changes: 4 additions & 10 deletions internal/pkg/service/stream/api/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@ const (
ExceptionIDPrefix = "keboola-stream-api-"
)

func Start(ctx context.Context, d dependencies.ServiceScope, cfg config.Config) error {
// Create dependencies
apiScp, err := dependencies.NewAPIScope(d, cfg) // nolint:forbidigo
if err != nil {
return err
}

func Start(ctx context.Context, d dependencies.APIScope, cfg config.Config) error {
// Create service
svc := service.New(apiScp, cfg)
svc := service.New(d, cfg)

// Start HTTP server
return httpserver.Start(ctx, apiScp, httpserver.Config{
return httpserver.Start(ctx, d, httpserver.Config{
ListenAddress: cfg.API.Listen,
ErrorNamePrefix: ErrorNamePrefix,
ExceptionIDPrefix: ExceptionIDPrefix,
Expand All @@ -52,7 +46,7 @@ func Start(ctx context.Context, d dependencies.ServiceScope, cfg config.Config)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
next.ServeHTTP(w, req.WithContext(context.WithValue(
req.Context(),
dependencies.PublicRequestScopeCtxKey, dependencies.NewPublicRequestScope(apiScp, req),
dependencies.PublicRequestScopeCtxKey, dependencies.NewPublicRequestScope(d, req),
)))
})
})
Expand Down
20 changes: 12 additions & 8 deletions internal/pkg/service/stream/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Components []Component

type Component string

func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg config.Config, components ...Component) (err error) {
func StartComponents(ctx context.Context, serviceScp dependencies.ServiceScope, cfg config.Config, components ...Component) (err error) {
componentsMap := make(map[Component]bool)
for _, c := range components {
componentsMap[c] = true
Expand All @@ -38,13 +38,13 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
// Common distribution scope
var distScp commonDeps.DistributionScope
if componentsMap[ComponentStorageWriter] || componentsMap[ComponentHTTPSource] || componentsMap[ComponentStorageCoordinator] {
distScp = commonDeps.NewDistributionScope(cfg.NodeID, cfg.Distribution, d)
distScp = commonDeps.NewDistributionScope(cfg.NodeID, cfg.Distribution, serviceScp)
}

// Common distribution locks scope
var distLocksScp commonDeps.DistributedLockScope
if componentsMap[ComponentStorageCoordinator] {
distLocksScp, err = commonDeps.NewDistributedLockScope(ctx, distlock.NewConfig(), d)
if componentsMap[ComponentAPI] || componentsMap[ComponentStorageCoordinator] {
distLocksScp, err = commonDeps.NewDistributedLockScope(ctx, distlock.NewConfig(), serviceScp)
if err != nil {
return err
}
Expand All @@ -53,15 +53,15 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
// Common storage scope
var storageScp dependencies.StorageScope
if componentsMap[ComponentStorageWriter] || componentsMap[ComponentStorageReader] {
storageScp, err = dependencies.NewStorageScope(ctx, d, cfg)
storageScp, err = dependencies.NewStorageScope(ctx, serviceScp, cfg)
if err != nil {
return err
}
}

// Start components, always in the same order
if componentsMap[ComponentStorageCoordinator] {
d, err := dependencies.NewCoordinatorScope(ctx, d, distScp, distLocksScp, cfg)
d, err := dependencies.NewCoordinatorScope(ctx, serviceScp, distScp, distLocksScp, cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -91,13 +91,17 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
}

if componentsMap[ComponentAPI] {
if err := api.Start(ctx, d, cfg); err != nil {
apiScp, err := dependencies.NewAPIScope(serviceScp, distLocksScp, cfg) // nolint:forbidigo
if err != nil {
return err
}
if err := api.Start(ctx, apiScp, cfg); err != nil {
return err
}
}

if componentsMap[ComponentHTTPSource] {
d, err := dependencies.NewSourceScope(d, distScp, "http-source", cfg)
d, err := dependencies.NewSourceScope(serviceScp, distScp, "http-source", cfg)
if err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions internal/pkg/service/stream/dependencies/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
// apiSCope implements APIScope interface.
type apiScope struct {
ServiceScope
dependencies.DistributedLockScope
logger log.Logger
apiPublicURL *url.URL
httpSourcePublicURL *url.URL
}

func NewAPIScope(serviceScp ServiceScope, cfg config.Config) (v APIScope, err error) {
return newAPIScope(serviceScp, cfg), nil
func NewAPIScope(serviceScp ServiceScope, distLocksScp dependencies.DistributedLockScope, cfg config.Config) (v APIScope, err error) {
return newAPIScope(serviceScp, distLocksScp, cfg), nil
}

func NewMockedAPIScope(tb testing.TB, opts ...dependencies.MockedOption) (APIScope, Mocked) {
Expand All @@ -29,20 +30,22 @@ func NewMockedAPIScope(tb testing.TB, opts ...dependencies.MockedOption) (APISco
func NewMockedAPIScopeWithConfig(tb testing.TB, modifyConfig func(*config.Config), opts ...dependencies.MockedOption) (APIScope, Mocked) {
tb.Helper()

opts = append(opts, dependencies.WithEnabledTasks("test-node"))
opts = append(opts, dependencies.WithEnabledTasks("test-node"), dependencies.WithEnabledDistributedLocks())
serviceScp, mock := NewMockedServiceScopeWithConfig(tb, modifyConfig, opts...)

apiScp := newAPIScope(serviceScp, mock.TestConfig())
apiScp := newAPIScope(serviceScp, mock, mock.TestConfig())

mock.DebugLogger().Truncate()
return apiScp, mock
}

func newAPIScope(svcScope ServiceScope, cfg config.Config) APIScope {
func newAPIScope(svcScope ServiceScope, distLocksScp dependencies.DistributedLockScope, cfg config.Config) APIScope {
d := &apiScope{}

d.ServiceScope = svcScope

d.DistributedLockScope = distLocksScp

d.logger = svcScope.Logger().WithComponent("api")

d.apiPublicURL = cfg.API.PublicURL
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/service/stream/dependencies/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ServiceScope interface {

type APIScope interface {
ServiceScope
dependencies.DistributedLockScope
APIPublicURL() *url.URL
HTTPSourcePublicURL() *url.URL
}
Expand Down

0 comments on commit 1272427

Please sign in to comment.