diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index b4810d0..0a6e3df 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -73,9 +73,9 @@ func (m *manager) restoreStreams() error { } func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { - var lastInPage *fftypes.UUID for { - listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) + // Do not specify after as we just delete everything + listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, nil, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) if err != nil { return err } @@ -83,7 +83,6 @@ func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftype break } for _, def := range listenerDefs { - lastInPage = def.ID if err := m.persistence.DeleteListener(ctx, def.ID); err != nil { return err } diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 5cbfe36..054f458 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -194,6 +194,37 @@ func TestDeleteStartedListenerFail(t *testing.T) { mp.AssertExpectations(t) } +func TestDeleteStartedListenerWithPagination(t *testing.T) { + + _, m, close := newTestManagerMockPersistence(t) + defer close() + + esID := apitypes.NewULID() + lID := apitypes.NewULID() + secondID := apitypes.NewULID() + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: lID, StreamID: esID}, + {ID: secondID, StreamID: esID}, + }, nil).Once() + thirdID := apitypes.NewULID() + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: thirdID, StreamID: esID}, + }, nil).Once() + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{}, nil) + mp.On("DeleteListener", m.ctx, lID).Return(nil) + mp.On("DeleteListener", m.ctx, secondID).Return(nil) + mp.On("DeleteListener", m.ctx, thirdID).Return(nil) + + err := m.deleteAllStreamListeners(m.ctx, esID) + assert.NoError(t, err) + + mp.AssertExpectations(t) +} + func TestDeleteStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t)