Skip to content

Commit

Permalink
feat: Create only one sink per source at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
michaljurecko committed Jul 26, 2024
1 parent 1272427 commit a4e5756
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion internal/pkg/service/stream/api/service/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"slices"
"strings"
"time"
Expand Down Expand Up @@ -40,7 +41,22 @@ func (s *service) CreateSink(ctx context.Context, d dependencies.SourceRequestSc
ProjectID: d.ProjectID(),
ObjectKey: sink.SinkKey,
Operation: func(ctx context.Context, logger log.Logger) task.Result {
if err := s.definition.Sink().Create(&sink, s.clock.Now(), d.RequestUser(), "New sink.").Do(ctx).Err(); err == nil {
// Lock: create only one sink per source at a time
lockName := fmt.Sprintf("api.source.create.sink.%s", sink.SourceKey)
lock := s.locks.NewMutex(lockName)
if err := lock.Lock(ctx); err != nil {
return task.ErrResult(err)
}
defer func() {
if err := lock.Unlock(ctx); err != nil {
s.logger.Warnf(ctx, "cannot unlock lock %q: %s", lockName, err)
}
}()

// Create sink
op := s.definition.Sink().Create(&sink, s.clock.Now(), d.RequestUser(), "New sink.")
op = op.RequireLock(lock)
if err := op.Do(ctx).Err(); err == nil {
result := task.OkResult("Sink has been created successfully.")
result = s.mapper.WithTaskOutputs(result, sink.SinkKey)
return result
Expand Down

0 comments on commit a4e5756

Please sign in to comment.