Skip to content

Commit

Permalink
feat: Implement Sender.CloseSend to close the request stream (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrongmeal authored Jan 16, 2025
1 parent a2b04f1 commit 9db81ac
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 8 deletions.
2 changes: 1 addition & 1 deletion examples/starwars/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func append(tx s2.Sender[*s2.AppendInput], tail uint64) error {
if err != nil {
return err
}
defer rtx.Close()
defer rtx.CloseSend()

scanner := bufio.NewScanner(conn)
for scanner.Scan() {
Expand Down
5 changes: 4 additions & 1 deletion s2/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ func appendRecordBatchingWorker(

var lingerCh <-chan time.Time

// Close the input sender on exit.
defer sender.CloseSend() //nolint:errcheck

for {
if recordsToFlush.IsEmpty() {
lingerCh = make(<-chan time.Time) // Never
Expand Down Expand Up @@ -262,7 +265,7 @@ func (s *AppendRecordBatchingSender) Send(record AppendRecord) error {
}

// Close the sender gracefully.
func (s *AppendRecordBatchingSender) Close() error {
func (s *AppendRecordBatchingSender) CloseSend() error {
s.closeWorkerCancel()
<-s.workerExitCtx.Done()

Expand Down
19 changes: 15 additions & 4 deletions s2/batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func withMaxBatchBytes(n uint) AppendRecordBatchingConfigParam {

type testAppendSessionSender struct {
inputs atomic.Value
closed atomic.Bool
}

func (s *testAppendSessionSender) Inputs(t *testing.T) []*AppendInput {
Expand All @@ -36,6 +37,10 @@ func (s *testAppendSessionSender) Inputs(t *testing.T) []*AppendInput {
}

func (s *testAppendSessionSender) Send(input *AppendInput) error {
if s.closed.Load() {
return ErrAppendRecordSenderClosed
}

if inputs, ok := s.inputs.Load().([]*AppendInput); ok {
inputs = append(inputs, input)
s.inputs.Store(inputs)
Expand All @@ -46,6 +51,12 @@ func (s *testAppendSessionSender) Send(input *AppendInput) error {
return nil
}

func (s *testAppendSessionSender) CloseSend() error {
s.closed.Store(true)

return nil
}

func TestAppendRecordBatchingMechanics(t *testing.T) {
testCases := []struct {
MaxBatchRecords uint
Expand Down Expand Up @@ -90,7 +101,7 @@ func TestAppendRecordBatchingMechanics(t *testing.T) {
require.NoError(t, recordSender.Send(AppendRecord{Body: []byte(body)}))
}

require.NoError(t, recordSender.Close())
require.NoError(t, recordSender.CloseSend())

i := 0

Expand Down Expand Up @@ -169,7 +180,7 @@ func TestAppendRecordBatchingLinger(t *testing.T) {
sendNext("large string")
sendNext("")

require.NoError(t, recordSender.Close())
require.NoError(t, recordSender.CloseSend())

expectedBatches := [][]string{
{"r_0", "r_1"},
Expand Down Expand Up @@ -207,7 +218,7 @@ func TestAppendRecordBatchingErrorSizeLimits(t *testing.T) {
require.NoError(t, err)

require.ErrorIs(t, recordSender.Send(record), ErrRecordTooBig)
require.ErrorIs(t, recordSender.Close(), ErrRecordTooBig)
require.ErrorIs(t, recordSender.CloseSend(), ErrRecordTooBig)
}

func TestAppendRecordBatchingAppendInputOpts(t *testing.T) {
Expand Down Expand Up @@ -240,7 +251,7 @@ func TestAppendRecordBatchingAppendInputOpts(t *testing.T) {
require.NoError(t, recordSender.Send(record))
}

require.NoError(t, recordSender.Close())
require.NoError(t, recordSender.CloseSend())

batches := batchSender.Inputs(t)

Expand Down
4 changes: 2 additions & 2 deletions s2/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func ExampleStreamClient_AppendSession() {
if err != nil {
panic(err)
}
defer recordSender.Close()
defer recordSender.CloseSend() //nolint:errcheck

send := make(chan error)

Expand Down Expand Up @@ -472,7 +472,7 @@ func ExampleAppendRecordBatchingSender() {
if err != nil {
panic(err)
}
defer recordSender.Close()
defer recordSender.CloseSend() //nolint:errcheck

records := []s2.AppendRecord{
{Body: []byte("my record 1")},
Expand Down
6 changes: 6 additions & 0 deletions s2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ type Receiver[T any] interface {
type Sender[T any] interface {
// Block until the item has been sent.
Send(T) error
// Close the sender.
CloseSend() error
}

type recvInner[F, T any] struct {
Expand Down Expand Up @@ -275,6 +277,10 @@ func (r sendInner[F, T]) Send(f F) error {
return r.Client.Send(t)
}

func (r sendInner[F, T]) CloseSend() error {
return r.Client.CloseSend()
}

type implRetentionPolicy interface {
implRetentionPolicy()
}
Expand Down

0 comments on commit 9db81ac

Please sign in to comment.