Skip to content

Commit

Permalink
add message state on conflict error
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Shorsher <[email protected]>
  • Loading branch information
shorsher committed Nov 1, 2024
1 parent 74c139d commit a116123
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error {
conflictErr, conflictTestOk := err.(operations.ConflictError)
if conflictTestOk && conflictErr.IsConflictError() {
// We know that the connector has received our batch, so we shouldn't need to retry
payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
return true, nil
}
} else {
Expand Down
23 changes: 23 additions & 0 deletions internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) {
}
}

type testConflictError struct {
err error
}

func (tce *testConflictError) Error() string {
return tce.err.Error()
}

func (tce *testConflictError) IsConflictError() bool {
return true
}

func TestUnfilledBatch(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()
Expand Down Expand Up @@ -129,6 +141,17 @@ func TestUnfilledBatch(t *testing.T) {
mim.AssertExpectations(t)
}

func TestHandleDispatchConflictError(t *testing.T) {
cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error {
conflictErr := testConflictError{err: fmt.Errorf("pop")}
return &conflictErr
})
defer cancel()
bp.dispatchBatch(&DispatchPayload{})
bp.cancelCtx()
<-bp.done
}

func TestBatchSizeOverflow(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()
Expand Down

0 comments on commit a116123

Please sign in to comment.