Skip to content

Commit

Permalink
[Testing] fix: flaky tests in observable & client pkgs (#124)
Browse files Browse the repository at this point in the history
* reactor: rename `event` to `eventsBz` for consistency

* fix: increase delay in `Map` test to prevent flakiness

* chore: improve test assertions

* fix: remove redundant unsubscription

* refactor: rename `publisher`  var to `publishCh`

* fix: close `publishCh` instead of relying on context cancellation to prevent test flakiness

* fix: tune delay & timeout in observable test to prevent flakiness

* chore: itest.sh prints total tests run when exiting early

* chore: simplify

* fixup: delay & timeout
  • Loading branch information
bryanchriswhite authored Nov 2, 2023
1 parent f7a5274 commit 94e2665
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 44 deletions.
4 changes: 2 additions & 2 deletions pkg/client/events_query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz(
// Read and handle messages from the websocket. This loop will exit when the
// websocket connection is isClosed and/or returns an error.
for {
event, err := conn.Receive()
eventBz, err := conn.Receive()
if err != nil {
// TODO_CONSIDERATION: should we close the publish channel here too?

Expand All @@ -226,7 +226,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz(
}

// Populate the []byte side (right) of the either and publish it.
eventsBzPublishCh <- either.Success(event)
eventsBzPublishCh <- either.Success(eventBz)
}
}

Expand Down
28 changes: 22 additions & 6 deletions pkg/client/events_query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -23,8 +24,6 @@ import (
)

func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) {
t.Skip("TODO_BUG(@bryanchriswhite): See #120 for more details")

var (
readObserverEventsTimeout = time.Second
queryCounter int
Expand Down Expand Up @@ -60,7 +59,12 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) {
readEventCounter int
// HandleEventsLimit is the total number of eventsBytesAndConns to send and
// receive through the query client's eventsBytes for this subtest.
handleEventsLimit = 250
handleEventsLimit = 250
// delayFirstEvent runs once (per test case) to delay the first event
// published by the mocked connection's Receive method to give the test
// ample time to subscribe to the events bytes observable before it
// starts receiving events, otherwise they will be dropped.
delayFirstEvent sync.Once
connClosed atomic.Bool
queryCtx, cancelQuery = context.WithCancel(rootCtx)
)
Expand All @@ -84,6 +88,8 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) {
// last message.
connMock.EXPECT().Receive().
DoAndReturn(func() (any, error) {
delayFirstEvent.Do(func() { time.Sleep(50 * time.Millisecond) })

// Simulate ErrConnClosed if connection is isClosed.
if connClosed.Load() {
return nil, eventsquery.ErrConnClosed
Expand Down Expand Up @@ -132,18 +138,26 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) {

func TestEventsQueryClient_Subscribe_Close(t *testing.T) {
var (
readAllEventsTimeout = 50 * time.Millisecond
firstEventDelay = 50 * time.Millisecond
readAllEventsTimeout = 50*time.Millisecond + firstEventDelay
handleEventsLimit = 10
readEventCounter int
connClosed atomic.Bool
ctx = context.Background()
// delayFirstEvent runs once (per test case) to delay the first event
// published by the mocked connection's Receive method to give the test
// ample time to subscribe to the events bytes observable before it
// starts receiving events, otherwise they will be dropped.
delayFirstEvent sync.Once
connClosed atomic.Bool
ctx = context.Background()
)

connMock, dialerMock := testeventsquery.NewOneTimeMockConnAndDialer(t)
connMock.EXPECT().Send(gomock.Any()).Return(nil).
Times(1)
connMock.EXPECT().Receive().
DoAndReturn(func() (any, error) {
delayFirstEvent.Do(func() { time.Sleep(firstEventDelay) })

if connClosed.Load() {
return nil, eventsquery.ErrConnClosed
}
Expand Down Expand Up @@ -289,6 +303,8 @@ func behavesLikeEitherObserver[V any](
timeout time.Duration,
onLimit func(),
) {
t.Helper()

var (
// eventsCounter is the number of events which have been received from the
// eventsBytes since this function was called.
Expand Down
2 changes: 1 addition & 1 deletion pkg/observable/channel/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestMap_Word_BytesToPalindrome(t *testing.T) {
}()

// wait a tick for the observer to receive the word
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)

// ensure that the observer received the word
require.Equal(t, int32(1), atomic.LoadInt32(&wordCounter))
Expand Down
44 changes: 16 additions & 28 deletions pkg/observable/channel/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

const (
publishDelay = 100 * time.Microsecond
notifyTimeout = publishDelay * 20
publishDelay = time.Millisecond
notifyTimeout = 50 * time.Millisecond
cancelUnsubscribeDelay = publishDelay * 2
)

Expand Down Expand Up @@ -101,11 +101,11 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

obsvbl, publisher := channel.NewObservable[int](
obsvbl, publishCh := channel.NewObservable[int](
channel.WithPublisher(tt.publishCh),
)
require.NotNil(t, obsvbl)
require.NotNil(t, publisher)
require.NotNil(t, publishCh)

// construct 3 distinct observers, each with its own channel
observers := make([]observable.Observer[int], 1)
Expand All @@ -132,9 +132,8 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {

// onDone is called when the observer channel closes
onDone := func(outputs []int) error {
if !assert.Equalf(
t, len(tt.expectedOutputs),
len(outputs),
if !assert.ElementsMatch(
t, tt.expectedOutputs, outputs,
"obsvr addr: %p", obsvr,
) {
return testerrors.ErrAsync
Expand All @@ -148,24 +147,23 @@ func TestChannelObservable_NotifyObservers(t *testing.T) {
}

// notify with test input
publish := delayedPublishFactory(publisher, publishDelay)
publish := delayedPublishFactory(publishCh, publishDelay)
for _, input := range tt.inputs {
inputPtr := new(int)
*inputPtr = input

// simulating IO delay in sequential message publishing
publish(input)
}
cancel()

// Finished sending values, close publishCh to unsubscribe all observers
// and close all fan-out channels.
close(publishCh)

// wait for obsvbl to be notified or timeout
err := group.Wait()
require.NoError(t, err)

// unsubscribing should close observer channel(s)
// closing publishCh should unsubscribe all observers, causing them
// to close their channels.
for _, observer := range observers {
observer.Unsubscribe()

// must drain the channel first to ensure it is isClosed
err := testchannel.DrainChannel(observer.Ch())
require.NoError(t, err)
Expand Down Expand Up @@ -317,20 +315,10 @@ func TestChannelObservable_SequentialPublishAndUnsubscription(t *testing.T) {
obsrvn.Lock()
defer obsrvn.Unlock()

require.Equalf(
t, len(expectedNotifications[obsnIdx]),
len(obsrvn.Notifications),
"observation index: %d, expected: %+v, actual: %+v",
obsnIdx, expectedNotifications[obsnIdx], obsrvn.Notifications,
require.EqualValuesf(
t, expectedNotifications[obsnIdx], obsrvn.Notifications,
"observation index: %d", obsnIdx,
)
for notificationIdx, expected := range expectedNotifications[obsnIdx] {
require.Equalf(
t, expected,
(obsrvn.Notifications)[notificationIdx],
"allExpected: %+v, allActual: %+v",
expectedNotifications[obsnIdx], obsrvn.Notifications,
)
}
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/observable/channel/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ func TestObserver_ConcurrentUnsubscribe(t *testing.T) {

// publish a value
obsvr.notify(idx)

// Slow this loop to prevent bogging the test down.
time.Sleep(10 * time.Microsecond)
}
}()
// send on done when the test cleans up
t.Cleanup(func() { done <- struct{}{} })

// it should still be open after a bit of inactivity
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)
require.Equal(t, false, obsvr.isClosed)

obsvr.Unsubscribe()
Expand Down
22 changes: 17 additions & 5 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func TestReplayObservable(t *testing.T) {

// send all values to the observable's publish channel
for _, value := range values {
time.Sleep(10 * time.Microsecond)
publishCh <- value
time.Sleep(10 * time.Microsecond)
}

// allow some time for values to be buffered by the replay observable
Expand All @@ -59,27 +61,37 @@ func TestReplayObservable(t *testing.T) {
// replay observer, should receive the last lastN values published prior to
// subscribing followed by subsequently published values
replayObserver := replayObsvbl.Subscribe(ctx)

// Collect values from replayObserver.
var actualValues []int
for _, expected := range expectedValues {
select {
case v := <-replayObserver.Ch():
require.Equal(t, expected, v)
actualValues = append(actualValues, v)
case <-time.After(1 * time.Second):
t.Fatalf("Did not receive expected value %d in time", expected)
}
}

// second replay observer, should receive the same values as the first
require.EqualValues(t, expectedValues, actualValues)

// Second replay observer, should receive the same values as the first
// even though it subscribed after all values were published and the
// values were already replayed by the first.
replayObserver2 := replayObsvbl.Subscribe(ctx)

// Collect values from replayObserver2.
var actualValues2 []int
for _, expected := range expectedValues {
select {
case v := <-replayObserver2.Ch():
require.Equal(t, expected, v)
actualValues2 = append(actualValues2, v)
case <-time.After(1 * time.Second):
t.Fatalf("Did not receive expected value %d in time", expected)
}
}

require.EqualValues(t, expectedValues, actualValues)
}

func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) {
Expand Down Expand Up @@ -168,7 +180,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
"Last should block until at lest 1 value has been published; actualValues: %v",
actualValues,
)
case <-time.After(200 * time.Millisecond):
case <-time.After(10 * time.Millisecond):
}

// Publish some values (up to splitIdx).
Expand Down Expand Up @@ -215,7 +227,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
case actualValues := <-getLastValues():
require.Len(t, actualValues, lastN)
require.ElementsMatch(t, values, actualValues)
case <-time.After(10 * time.Millisecond):
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for Last to return")
}

Expand Down
2 changes: 1 addition & 1 deletion tools/scripts/itest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ itest() {

# If go test fails, exit the loop.
if [[ $test_exit_status -ne 0 ]]; then
echo "go test failed on iteration $i. Exiting early."
echo "go test failed on iteration $i; exiting early. Total tests run: $total_tests_run"
return 1
fi
done
Expand Down

0 comments on commit 94e2665

Please sign in to comment.