Skip to content

Commit

Permalink
Merge pull request #1939 from keboola/michaljurecko-PSGO-636
Browse files Browse the repository at this point in the history
feat: Create only one sink per source at a time
  • Loading branch information
michaljurecko authored Jul 26, 2024
2 parents c134e4f + 4ac8966 commit 73ea845
Show file tree
Hide file tree
Showing 33 changed files with 293 additions and 24 deletions.
6 changes: 6 additions & 0 deletions internal/pkg/service/stream/api/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/benbjohnson/clock"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/distlock"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/task"
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/mapper"
Expand All @@ -15,18 +17,22 @@ import (
)

type service struct {
logger log.Logger
clock clock.Clock
publicURL *url.URL
tasks *task.Node
locks *distlock.Provider
definition *definitionRepo.Repository
mapper *mapper.Mapper
}

func New(d dependencies.APIScope, cfg config.Config) api.Service {
return &service{
logger: d.Logger(),
clock: d.Clock(),
publicURL: d.APIPublicURL(),
tasks: d.TaskNode(),
locks: d.DistributedLockProvider(),
definition: d.DefinitionRepository(),
mapper: mapper.New(d, cfg),
}
Expand Down
18 changes: 17 additions & 1 deletion internal/pkg/service/stream/api/service/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"slices"
"strings"
"time"
Expand Down Expand Up @@ -40,7 +41,22 @@ func (s *service) CreateSink(ctx context.Context, d dependencies.SourceRequestSc
ProjectID: d.ProjectID(),
ObjectKey: sink.SinkKey,
Operation: func(ctx context.Context, logger log.Logger) task.Result {
if err := s.definition.Sink().Create(&sink, s.clock.Now(), d.RequestUser(), "New sink.").Do(ctx).Err(); err == nil {
// Lock: create only one sink per source at a time
lockName := fmt.Sprintf("api.source.create.sink.%s", sink.SourceKey)
lock := s.locks.NewMutex(lockName)
if err := lock.Lock(ctx); err != nil {
return task.ErrResult(err)
}
defer func() {
if err := lock.Unlock(ctx); err != nil {
s.logger.Warnf(ctx, "cannot unlock lock %q: %s", lockName, err)
}
}()

// Create sink
op := s.definition.Sink().Create(&sink, s.clock.Now(), d.RequestUser(), "New sink.")
op = op.RequireLock(lock)
if err := op.Do(ctx).Err(); err == nil {
result := task.OkResult("Sink has been created successfully.")
result = s.mapper.WithTaskOutputs(result, sink.SinkKey)
return result
Expand Down
14 changes: 4 additions & 10 deletions internal/pkg/service/stream/api/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@ const (
ExceptionIDPrefix = "keboola-stream-api-"
)

func Start(ctx context.Context, d dependencies.ServiceScope, cfg config.Config) error {
// Create dependencies
apiScp, err := dependencies.NewAPIScope(d, cfg) // nolint:forbidigo
if err != nil {
return err
}

func Start(ctx context.Context, d dependencies.APIScope, cfg config.Config) error {
// Create service
svc := service.New(apiScp, cfg)
svc := service.New(d, cfg)

// Start HTTP server
return httpserver.Start(ctx, apiScp, httpserver.Config{
return httpserver.Start(ctx, d, httpserver.Config{
ListenAddress: cfg.API.Listen,
ErrorNamePrefix: ErrorNamePrefix,
ExceptionIDPrefix: ExceptionIDPrefix,
Expand All @@ -52,7 +46,7 @@ func Start(ctx context.Context, d dependencies.ServiceScope, cfg config.Config)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
next.ServeHTTP(w, req.WithContext(context.WithValue(
req.Context(),
dependencies.PublicRequestScopeCtxKey, dependencies.NewPublicRequestScope(apiScp, req),
dependencies.PublicRequestScopeCtxKey, dependencies.NewPublicRequestScope(d, req),
)))
})
})
Expand Down
20 changes: 12 additions & 8 deletions internal/pkg/service/stream/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Components []Component

type Component string

func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg config.Config, components ...Component) (err error) {
func StartComponents(ctx context.Context, serviceScp dependencies.ServiceScope, cfg config.Config, components ...Component) (err error) {
componentsMap := make(map[Component]bool)
for _, c := range components {
componentsMap[c] = true
Expand All @@ -38,13 +38,13 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
// Common distribution scope
var distScp commonDeps.DistributionScope
if componentsMap[ComponentStorageWriter] || componentsMap[ComponentHTTPSource] || componentsMap[ComponentStorageCoordinator] {
distScp = commonDeps.NewDistributionScope(cfg.NodeID, cfg.Distribution, d)
distScp = commonDeps.NewDistributionScope(cfg.NodeID, cfg.Distribution, serviceScp)
}

// Common distribution locks scope
var distLocksScp commonDeps.DistributedLockScope
if componentsMap[ComponentStorageCoordinator] {
distLocksScp, err = commonDeps.NewDistributedLockScope(ctx, distlock.NewConfig(), d)
if componentsMap[ComponentAPI] || componentsMap[ComponentStorageCoordinator] {
distLocksScp, err = commonDeps.NewDistributedLockScope(ctx, distlock.NewConfig(), serviceScp)
if err != nil {
return err
}
Expand All @@ -53,15 +53,15 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
// Common storage scope
var storageScp dependencies.StorageScope
if componentsMap[ComponentStorageWriter] || componentsMap[ComponentStorageReader] {
storageScp, err = dependencies.NewStorageScope(ctx, d, cfg)
storageScp, err = dependencies.NewStorageScope(ctx, serviceScp, cfg)
if err != nil {
return err
}
}

// Start components, always in the same order
if componentsMap[ComponentStorageCoordinator] {
d, err := dependencies.NewCoordinatorScope(ctx, d, distScp, distLocksScp, cfg)
d, err := dependencies.NewCoordinatorScope(ctx, serviceScp, distScp, distLocksScp, cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -91,13 +91,17 @@ func StartComponents(ctx context.Context, d dependencies.ServiceScope, cfg confi
}

if componentsMap[ComponentAPI] {
if err := api.Start(ctx, d, cfg); err != nil {
apiScp, err := dependencies.NewAPIScope(serviceScp, distLocksScp, cfg) // nolint:forbidigo
if err != nil {
return err
}
if err := api.Start(ctx, apiScp, cfg); err != nil {
return err
}
}

if componentsMap[ComponentHTTPSource] {
d, err := dependencies.NewSourceScope(d, distScp, "http-source", cfg)
d, err := dependencies.NewSourceScope(serviceScp, distScp, "http-source", cfg)
if err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions internal/pkg/service/stream/dependencies/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
// apiSCope implements APIScope interface.
type apiScope struct {
ServiceScope
dependencies.DistributedLockScope
logger log.Logger
apiPublicURL *url.URL
httpSourcePublicURL *url.URL
}

func NewAPIScope(serviceScp ServiceScope, cfg config.Config) (v APIScope, err error) {
return newAPIScope(serviceScp, cfg), nil
func NewAPIScope(serviceScp ServiceScope, distLocksScp dependencies.DistributedLockScope, cfg config.Config) (v APIScope, err error) {
return newAPIScope(serviceScp, distLocksScp, cfg), nil
}

func NewMockedAPIScope(tb testing.TB, opts ...dependencies.MockedOption) (APIScope, Mocked) {
Expand All @@ -29,20 +30,22 @@ func NewMockedAPIScope(tb testing.TB, opts ...dependencies.MockedOption) (APISco
func NewMockedAPIScopeWithConfig(tb testing.TB, modifyConfig func(*config.Config), opts ...dependencies.MockedOption) (APIScope, Mocked) {
tb.Helper()

opts = append(opts, dependencies.WithEnabledTasks("test-node"))
opts = append(opts, dependencies.WithEnabledTasks("test-node"), dependencies.WithEnabledDistributedLocks())
serviceScp, mock := NewMockedServiceScopeWithConfig(tb, modifyConfig, opts...)

apiScp := newAPIScope(serviceScp, mock.TestConfig())
apiScp := newAPIScope(serviceScp, mock, mock.TestConfig())

mock.DebugLogger().Truncate()
return apiScp, mock
}

func newAPIScope(svcScope ServiceScope, cfg config.Config) APIScope {
func newAPIScope(svcScope ServiceScope, distLocksScp dependencies.DistributedLockScope, cfg config.Config) APIScope {
d := &apiScope{}

d.ServiceScope = svcScope

d.DistributedLockScope = distLocksScp

d.logger = svcScope.Logger().WithComponent("api")

d.apiPublicURL = cfg.API.PublicURL
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/service/stream/dependencies/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ServiceScope interface {

type APIScope interface {
ServiceScope
dependencies.DistributedLockScope
APIPublicURL() *url.URL
HTTPSourcePublicURL() *url.URL
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
202
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"taskId": "api.create.source/%%TEST_DEFAULT_BRANCH_ID%%/my-source/%s",
"type": "api.create.source",
"url": "https://stream.keboola.local/v1/tasks/api.create.source/%%TEST_DEFAULT_BRANCH_ID%%/my-source/%s",
"status": "processing",
"isFinished": false,
"createdAt": "%s"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"path": "/v1/branches/default/sources",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"body": {
"name": "My Source",
"type": "http"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
200
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
%A
"status": "success",
%A
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"path": "<<001-create-source:response.url>>",
"method": "GET",
"headers": {
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"repeat": {
"until": "status != 'processing'",
"timeout": 60
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
202
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"taskId": "api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-1/%s",
"type": "api.create.sink",
"url": "https://stream.keboola.local/v1/tasks/api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-1/%s",
"status": "processing",
"isFinished": false,
"createdAt": "%s"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"path": "/v1/branches/default/sources/my-source/sinks",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"body": {
"name": "My Sink 1",
"type": "table",
"table": {
"type": "keboola",
"tableId": "in.c-my-bucket.my-table-1",
"mapping": {
"columns": [
{
"type": "uuid",
"name": "id",
"primaryKey": true
},
{
"type": "body",
"name": "body"
}
]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
202
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"taskId": "api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-2/%s",
"type": "api.create.sink",
"url": "https://stream.keboola.local/v1/tasks/api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-2/%s",
"status": "processing",
"isFinished": false,
"createdAt": "%s"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"path": "/v1/branches/default/sources/my-source/sinks",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"body": {
"name": "My Sink 2",
"type": "table",
"table": {
"type": "keboola",
"tableId": "in.c-my-bucket.my-table-2",
"mapping": {
"columns": [
{
"type": "uuid",
"name": "id",
"primaryKey": true
},
{
"type": "body",
"name": "body"
}
]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
202
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"taskId": "api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-3/%s",
"type": "api.create.sink",
"url": "https://stream.keboola.local/v1/tasks/api.create.sink/%%TEST_DEFAULT_BRANCH_ID%%/my-source/my-sink-3/%s",
"status": "processing",
"isFinished": false,
"createdAt": "%s"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"path": "/v1/branches/default/sources/my-source/sinks",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"body": {
"name": "My Sink 3",
"type": "table",
"table": {
"type": "keboola",
"tableId": "in.c-my-bucket.my-table-3",
"mapping": {
"columns": [
{
"type": "uuid",
"name": "id",
"primaryKey": true
},
{
"type": "body",
"name": "body"
}
]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
200
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
%A
"status": "success",
%A
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"path": "<<003-create-sink-1:response.url>>",
"method": "GET",
"headers": {
"X-StorageApi-Token": "%%TEST_KBC_STORAGE_API_TOKEN%%"
},
"repeat": {
"until": "status != 'processing'",
"timeout": 120
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
200
Loading

0 comments on commit 73ea845

Please sign in to comment.