From 9bf21a0bbc18fe01079a789da8424aa8f4de8c9d Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Mon, 27 Jan 2025 23:53:38 +0530 Subject: [PATCH] feat: Add `s2-bentobox` package (#36) Common functionalities for implementing the Bento plugins. --- Taskfile.yml | 2 +- internal/pb/s2.pb.go | 14 +- s2-bentobox/bentobox.go | 62 ++++++++ s2-bentobox/input.go | 336 ++++++++++++++++++++++++++++++++++++++++ s2-bentobox/output.go | 167 ++++++++++++++++++++ 5 files changed, 573 insertions(+), 8 deletions(-) create mode 100644 s2-bentobox/bentobox.go create mode 100644 s2-bentobox/input.go create mode 100644 s2-bentobox/output.go diff --git a/Taskfile.yml b/Taskfile.yml index 394df54..b281e95 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -42,7 +42,7 @@ tasks: - | git diff --exit-code internal/pb/*.go s2/*.sync.go && echo "Up-to-date" || { echo "Not up-to-date" - echo "Run task:gen and update the generated code" + echo "Run `task gen` and update the generated code" exit 1 } diff --git a/internal/pb/s2.pb.go b/internal/pb/s2.pb.go index 7b369ec..4507fc2 100644 --- a/internal/pb/s2.pb.go +++ b/internal/pb/s2.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.3 +// protoc-gen-go v1.36.4 // protoc v5.29.3 // source: s2.proto @@ -12,6 +12,7 @@ import ( fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -2491,7 +2492,7 @@ func (x *SequencedRecordBatch) GetRecords() []*SequencedRecord { var File_s2_proto protoreflect.FileDescriptor -var file_s2_proto_rawDesc = []byte{ +var file_s2_proto_rawDesc = string([]byte{ 0x0a, 0x08, 0x73, 0x32, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x73, 0x32, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x6d, 0x61, @@ -2830,16 +2831,16 @@ var file_s2_proto_rawDesc = []byte{ 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x30, 0x01, 0x42, 0x0e, 0x0a, 0x0a, 0x73, 0x32, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x50, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_s2_proto_rawDescOnce sync.Once - file_s2_proto_rawDescData = file_s2_proto_rawDesc + file_s2_proto_rawDescData []byte ) func file_s2_proto_rawDescGZIP() []byte { file_s2_proto_rawDescOnce.Do(func() { - file_s2_proto_rawDescData = protoimpl.X.CompressGZIP(file_s2_proto_rawDescData) + file_s2_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_s2_proto_rawDesc), len(file_s2_proto_rawDesc))) }) return file_s2_proto_rawDescData } @@ -2987,7 +2988,7 @@ func file_s2_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_s2_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_s2_proto_rawDesc), len(file_s2_proto_rawDesc)), NumEnums: 2, NumMessages: 42, NumExtensions: 0, @@ -2999,7 +3000,6 @@ func file_s2_proto_init() { MessageInfos: file_s2_proto_msgTypes, }.Build() File_s2_proto = out.File - file_s2_proto_rawDesc = nil file_s2_proto_goTypes = nil file_s2_proto_depIdxs = nil } diff --git a/s2-bentobox/bentobox.go b/s2-bentobox/bentobox.go new file mode 100644 index 0000000..10fe561 --- /dev/null +++ b/s2-bentobox/bentobox.go @@ -0,0 +1,62 @@ +package s2bentobox + +import ( + "context" + + "github.com/s2-streamstore/s2-sdk-go/s2" +) + +// Plugin name. +const PluginName = "s2" + +func newBasinClient(config *Config) (*s2.BasinClient, error) { + return s2.NewBasinClient(config.Basin, config.AuthToken) +} + +func newStreamClient(config *Config, stream string) (*s2.StreamClient, error) { + client, err := newBasinClient(config) + if err != nil { + return nil, err + } + + // `s2.BasinClient.StreamClient` is a cheap operation. + return client.StreamClient(stream), nil +} + +func waitForClosers(ctx context.Context, closers ...<-chan struct{}) error { + for _, closer := range closers { + select { + case <-ctx.Done(): + return ctx.Err() + case <-closer: + } + } + + return nil +} + +func notifyOnce(ch chan<- struct{}) { + select { + case ch <- struct{}{}: + default: + } +} + +type Logger interface { + Tracef(template string, args ...any) + Trace(message string) + Debugf(template string, args ...any) + Debug(message string) + Infof(template string, args ...any) + Info(message string) + Warnf(template string, args ...any) + Warn(message string) + Errorf(template string, args ...any) + Error(message string) + With(keyValuePairs ...any) Logger +} + +type Config struct { + Basin string + AuthToken string +} diff --git a/s2-bentobox/input.go b/s2-bentobox/input.go new file mode 100644 index 0000000..126871e --- /dev/null +++ b/s2-bentobox/input.go @@ -0,0 +1,336 @@ +package s2bentobox + +import ( + "context" + "errors" + "time" + + "github.com/s2-streamstore/s2-sdk-go/s2" +) + +var ErrInputClosed = errors.New("input closed") + +type SeqNumCache interface { + Get(ctx context.Context, stream string) (uint64, error) + Set(ctx context.Context, stream string, seqNum uint64) error +} + +type InputStreams interface { + list(ctx context.Context, client *s2.BasinClient) ([]string, error) +} + +type StaticInputStreams struct { + Streams []string +} + +func (s StaticInputStreams) list(context.Context, *s2.BasinClient) ([]string, error) { //nolint:unparam + return s.Streams, nil +} + +type PrefixedInputStreams struct { + Prefix string +} + +func (p PrefixedInputStreams) list(ctx context.Context, client *s2.BasinClient) ([]string, error) { + var ( + hasMore = true + streams []string + startAfter string + ) + + for hasMore { + list, err := client.ListStreams(ctx, &s2.ListStreamsRequest{Prefix: p.Prefix, StartAfter: startAfter}) + if err != nil { + return nil, err + } + + for _, stream := range list.Streams { + if stream.DeletedAt != nil { + // The stream is deleted. + continue + } + + streams = append(streams, stream.Name) + startAfter = stream.Name + } + + hasMore = list.HasMore + } + + return streams, nil +} + +type InputConfig struct { + *Config + Streams InputStreams + MaxInFlight int + Cache SeqNumCache + Logger Logger +} + +type recvOutput struct { + Stream string + Batch *s2.SequencedRecordBatch + Err error +} + +type Input struct { + config *InputConfig + recvCh <-chan recvOutput + streamsManagerCloser <-chan struct{} + cancelSession context.CancelFunc + closeWorker chan<- string +} + +func ConnectInput(ctx context.Context, config *InputConfig) (*Input, error) { + client, err := newBasinClient(config.Config) + if err != nil { + return nil, err + } + + sessionCtx, cancelSession := context.WithCancel(ctx) + + streamsManagerCloser := make(chan struct{}) + closeWorker := make(chan string, config.MaxInFlight) + recvCh := make(chan recvOutput, config.MaxInFlight) + + go streamsManagerWorker( + sessionCtx, + client, + config, + /* updateDuration = */ time.Minute, + recvCh, + closeWorker, + streamsManagerCloser, + ) + + return &Input{ + config: config, + recvCh: recvCh, + streamsManagerCloser: streamsManagerCloser, + cancelSession: cancelSession, + closeWorker: closeWorker, + }, nil +} + +type streamWorker struct { + cancel context.CancelFunc + closer <-chan struct{} +} + +func (sw *streamWorker) IsClosed() bool { + select { + case <-sw.closer: + return true + default: + return false + } +} + +func (sw *streamWorker) Close() { + sw.cancel() +} + +func (sw *streamWorker) Wait() { + <-sw.closer +} + +func streamsManagerWorker( + ctx context.Context, + client *s2.BasinClient, + config *InputConfig, + updateDuration time.Duration, + recvCh chan<- recvOutput, + closeWorker <-chan string, + streamsManagerCloser chan<- struct{}, +) { + defer close(streamsManagerCloser) + + var ( + existingWorkers = make(map[string]streamWorker) + ticker = time.NewTicker(updateDuration) + exitNotifier = make(chan struct{}, 1) + updateListNotifier = make(chan struct{}, 1) + ) + + defer ticker.Stop() + + notifyOnce(updateListNotifier) + +outerLoop: + for { + select { + case <-ctx.Done(): + break outerLoop + + case <-exitNotifier: + for stream, worker := range existingWorkers { + if worker.IsClosed() { + delete(existingWorkers, stream) + // Maybe the worker exited due to an unexpected error. Let's list immediately. + notifyOnce(updateListNotifier) + } + } + + case stream := <-closeWorker: + if worker, found := existingWorkers[stream]; found { + worker.Close() + } + + case <-updateListNotifier: + case <-ticker.C: + } + + newStreams, err := config.Streams.list(ctx, client) + if err != nil { + config.Logger. + With("error", err). + Warn("Failed to list streams") + notifyOnce(updateListNotifier) + + continue + } + + newStreamsSet := make(map[string]struct{}) + for _, stream := range newStreams { + newStreamsSet[stream] = struct{}{} + + if _, found := existingWorkers[stream]; !found { + workerCtx, cancelWorker := context.WithCancel(ctx) + workerCloser := make(chan struct{}) + worker := streamWorker{ + cancel: cancelWorker, + closer: workerCloser, + } + existingWorkers[stream] = worker + go receiverWorker(workerCtx, client, config, stream, recvCh, workerCloser, exitNotifier) + } + } + + for stream, worker := range existingWorkers { + if _, found := newStreamsSet[stream]; !found { + // Cancel the worker that's not in the new list. + config.Logger.With("stream", stream).Warn("Not reading from S2 source anymore") + worker.Close() + delete(existingWorkers, stream) + } + } + } + + // Close and wait for all to exit. + for _, worker := range existingWorkers { + worker.Close() + worker.Wait() + } +} + +func receiverWorker( + ctx context.Context, + client *s2.BasinClient, + config *InputConfig, + stream string, + recvCh chan<- recvOutput, + closer chan<- struct{}, + exitNotifier chan<- struct{}, +) { + defer notifyOnce(exitNotifier) + defer close(closer) + + startSeqNum, err := config.Cache.Get(ctx, stream) + if err != nil { + config.Logger. + With( + "error", err, + "stream", stream, + ). + Warn("Failed to get last sequence number from cache. Setting start seq num to 0") + + startSeqNum = 0 + } + + receiver, err := client.StreamClient(stream).ReadSession(ctx, &s2.ReadSessionRequest{ + StartSeqNum: startSeqNum, + }) + if err != nil { + config.Logger. + With( + "error", err, + "stream", stream, + ). + Error("Failed to initialize receiver") + + return + } + + config.Logger.With("stream", stream).Info("Reading from S2 source") + defer config.Logger.With("stream", stream).Debug("Exiting S2 source worker") + + for { + output, err := receiver.Recv() + if err != nil { + select { + case recvCh <- recvOutput{Stream: stream, Err: err}: + case <-ctx.Done(): + } + + return + } + + if batch, ok := output.(s2.ReadOutputBatch); ok { + select { + case recvCh <- recvOutput{Stream: stream, Batch: batch.SequencedRecordBatch}: + case <-ctx.Done(): + return + } + } + } +} + +func (i *Input) ReadBatch(ctx context.Context) (*s2.SequencedRecordBatch, string, error) { + select { + case output := <-i.recvCh: + if output.Err != nil { + return nil, output.Stream, output.Err + } + + return output.Batch, output.Stream, nil + + case <-ctx.Done(): + return nil, "", ctx.Err() + + case <-i.streamsManagerCloser: + if err := waitForClosers(ctx, i.streamsManagerCloser); err != nil { + return nil, "", err + } + + return nil, "", ErrInputClosed + } +} + +func (i *Input) AckFunc(stream string, batch *s2.SequencedRecordBatch) func(context.Context, error) error { + return func(ctx context.Context, err error) error { + if err == nil && batch != nil && len(batch.Records) > 0 { + // Update the cache with the last sequence number + lastSeqNum := batch.Records[len(batch.Records)-1].SeqNum + + return i.config.Cache.Set(ctx, stream, lastSeqNum) + } + + // Close a specific worker and let it restart. + i.closeWorker <- stream + + return nil + } +} + +func (i *Input) Close(ctx context.Context) error { + i.cancelSession() + + select { + case <-ctx.Done(): + return ctx.Err() + + case <-i.streamsManagerCloser: + return waitForClosers(ctx, i.streamsManagerCloser) + } +} diff --git a/s2-bentobox/output.go b/s2-bentobox/output.go new file mode 100644 index 0000000..d5494ac --- /dev/null +++ b/s2-bentobox/output.go @@ -0,0 +1,167 @@ +package s2bentobox + +import ( + "context" + "errors" + "io" + + "github.com/s2-streamstore/s2-sdk-go/s2" +) + +// Errors. +var ( + ErrOutputClosed = errors.New("output closed") + ErrAppendRecordBatchFull = errors.New("append record batch is full") +) + +type OutputConfig struct { + *Config + Stream string + MaxInFlight int + FencingToken []byte +} + +type Output struct { + sendCh chan<- sendInput + ackStreamCloser <-chan struct{} + appendWorkerCloser <-chan struct{} + cancelSession context.CancelFunc +} + +func ConnectOutput(ctx context.Context, config *OutputConfig) (*Output, error) { + client, err := newStreamClient(config.Config, config.Stream) + if err != nil { + return nil, err + } + + sessionCtx, cancelSession := context.WithCancel(ctx) + + sender, receiver, err := client.AppendSession(sessionCtx) + if err != nil { + cancelSession() + + return nil, err + } + + var ( + sendCh = make(chan sendInput, config.MaxInFlight) + replyQ = make(chan chan<- error, config.MaxInFlight) + ackStreamCloser = make(chan struct{}) + appendWorkerCloser = make(chan struct{}) + ) + + go ackStream(receiver, replyQ, ackStreamCloser) + go appendWorker(sender, sendCh, replyQ, config, appendWorkerCloser) + + return &Output{ + sendCh: sendCh, + ackStreamCloser: ackStreamCloser, + appendWorkerCloser: appendWorkerCloser, + cancelSession: cancelSession, + }, nil +} + +type sendInput struct { + batch *s2.AppendRecordBatch + reply chan<- error +} + +func ackStream( + receiver s2.Receiver[*s2.AppendOutput], + replyQ <-chan chan<- error, + ackStreamCloser chan<- struct{}, +) { + defer close(ackStreamCloser) + + for reply := range replyQ { + _, err := receiver.Recv() + reply <- err + + if err != nil { + // End the stream since it's basically poisoned. + return + } + } +} + +func appendWorker( + sender s2.Sender[*s2.AppendInput], + sendCh <-chan sendInput, + replyQ chan<- chan<- error, + config *OutputConfig, + appendWorkerCloser chan<- struct{}, +) { + defer close(appendWorkerCloser) + defer close(replyQ) + defer func(sender s2.Sender[*s2.AppendInput]) { + _ = sender.CloseSend() + }(sender) + + for s := range sendCh { + input := s2.AppendInput{ + Records: s.batch, + FencingToken: config.FencingToken, + MatchSeqNum: nil, + } + + if err := sender.Send(&input); err != nil { + if errors.Is(err, io.EOF) { + // Stream is closed. Exit immediately. + return + } + + // TODO: Add a retry limit here. This may keep on happening due to a connection error too. + s.reply <- err + + continue + } + + replyQ <- s.reply + } +} + +func (o *Output) waitForClosers(ctx context.Context, done error) error { + if err := waitForClosers(ctx, o.ackStreamCloser, o.appendWorkerCloser); err != nil { + return err + } + + return done +} + +func (o *Output) WriteBatch(ctx context.Context, batch *s2.AppendRecordBatch) error { + reply := make(chan error, 1) + + select { + case o.sendCh <- sendInput{ + batch: batch, + reply: reply, + }: // OK + + case <-ctx.Done(): + return ctx.Err() + + case <-o.appendWorkerCloser: + return o.waitForClosers(ctx, ErrOutputClosed) + } + + select { + case err := <-reply: + return err + + case <-ctx.Done(): + return ctx.Err() + + case <-o.appendWorkerCloser: + return o.waitForClosers(ctx, ErrOutputClosed) + } +} + +func (o *Output) Close(ctx context.Context) error { + // NOTE: This assumes the `Close` method is only called once. + close(o.sendCh) + + // Cancel the session for abandoning the requests. + o.cancelSession() + + return o.waitForClosers(ctx, nil) +}