From f7a527408350858c5735481b166be587b3d19807 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 1 Nov 2023 14:27:35 -0700 Subject: [PATCH 1/2] Added first roadmap change --- docs/roadmap_changelog.md | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 docs/roadmap_changelog.md diff --git a/docs/roadmap_changelog.md b/docs/roadmap_changelog.md new file mode 100644 index 000000000..e93fb5948 --- /dev/null +++ b/docs/roadmap_changelog.md @@ -0,0 +1,40 @@ +# Roadmap Changelog + +The purpose of this doc is to keep track of the changes made to the [Shannon roadmap](https://github.com/orgs/pokt-network/projects/144). + +- [Relevant links](#relevant-links) +- [11/01/2023](#11012023) + - [Changes](#changes) + - [After](#after) + - [Before](#before) + +## Relevant links + +- [Shannon Project](https://github.com/orgs/pokt-network/projects/144?query=is%3Aopen+sort%3Aupdated-desc) - GitHub dashboard +- [Shannon Roadmap](https://github.com/orgs/pokt-network/projects/144/views/4?query=is%3Aopen+sort%3Aupdated-desc) - GitHub Roadmap +- [PoktRoll Repo](https://github.com/pokt-network/poktroll) - Source Code +- [PoktRoll Issues](https://github.com/pokt-network/poktroll/issues) - GitHub Issues +- [PoktRoll Milestones](https://github.com/pokt-network/poktroll/milestones) - GitHub Milestones + +## 11/01/2023 + +### Changes + +1. We're adding a 1 week `E2E Relay` iteration to focus solely on finishing off `Foundation` & `Integration` related work needed to enable automating end-to-end relays. +2. We've delayed the `Govern` iteration to next year because: + - It is not a blocker for TestNetT + - here are still open-ended questions from PNF that need to be addressed first. +3. We've introduced `TECHDEBT` iterations to tackle `TODOs` left throughout the code. + - The first iteration will be focused on `TODO_BLOCKER` in the source code + - Details to other iterations will be ironed out closer to the iteration. +4. We have decided to have multiple `Test` iterations, each of which will be focused on testing different components. + - The first iteration will be focused on load testing relays to de-risk permissionless applications and verify the Claim & Proof lifecycle. + - Details to each iteration will be ironed out closer to the iteration. + +### After + +![Screenshot 2023-11-01 at 2 15 09 PM](https://github.com/pokt-network/poktroll/assets/1892194/e8ef99e6-aecc-433b-8a32-5fb42c05cb86) + +### Before + +![Screenshot 2023-11-01 at 11 05 21 AM](https://github.com/pokt-network/poktroll/assets/1892194/0826d4af-d0e1-4edc-a173-362425672c64) From 94e26650d8b4d20fefa3593b0664a8540aa6dc55 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 2 Nov 2023 07:12:37 +0100 Subject: [PATCH 2/2] [Testing] fix: flaky tests in observable & client pkgs (#124) * reactor: rename `event` to `eventsBz` for consistency * fix: increase delay in `Map` test to prevent flakiness * chore: improve test assertions * fix: remove redundant unsubscription * refactor: rename `publisher` var to `publishCh` * fix: close `publishCh` instead of relying on context cancellation to prevent test flakiness * fix: tune delay & timeout in observable test to prevent flakiness * chore: itest.sh prints total tests run when exiting early * chore: simplify * fixup: delay & timeout --- pkg/client/events_query/client.go | 4 +-- pkg/client/events_query/client_test.go | 28 +++++++++++---- pkg/observable/channel/map_test.go | 2 +- pkg/observable/channel/observable_test.go | 44 +++++++++-------------- pkg/observable/channel/observer_test.go | 5 ++- pkg/observable/channel/replay_test.go | 22 +++++++++--- tools/scripts/itest.sh | 2 +- 7 files changed, 63 insertions(+), 44 deletions(-) diff --git a/pkg/client/events_query/client.go b/pkg/client/events_query/client.go index f41e3e536..bd11e57fb 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events_query/client.go @@ -203,7 +203,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz( // Read and handle messages from the websocket. This loop will exit when the // websocket connection is isClosed and/or returns an error. for { - event, err := conn.Receive() + eventBz, err := conn.Receive() if err != nil { // TODO_CONSIDERATION: should we close the publish channel here too? @@ -226,7 +226,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz( } // Populate the []byte side (right) of the either and publish it. - eventsBzPublishCh <- either.Success(event) + eventsBzPublishCh <- either.Success(eventBz) } } diff --git a/pkg/client/events_query/client_test.go b/pkg/client/events_query/client_test.go index 4d3b41b30..a96516f0e 100644 --- a/pkg/client/events_query/client_test.go +++ b/pkg/client/events_query/client_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "sync/atomic" "testing" "time" @@ -23,8 +24,6 @@ import ( ) func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { - t.Skip("TODO_BUG(@bryanchriswhite): See #120 for more details") - var ( readObserverEventsTimeout = time.Second queryCounter int @@ -60,7 +59,12 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { readEventCounter int // HandleEventsLimit is the total number of eventsBytesAndConns to send and // receive through the query client's eventsBytes for this subtest. - handleEventsLimit = 250 + handleEventsLimit = 250 + // delayFirstEvent runs once (per test case) to delay the first event + // published by the mocked connection's Receive method to give the test + // ample time to subscribe to the events bytes observable before it + // starts receiving events, otherwise they will be dropped. + delayFirstEvent sync.Once connClosed atomic.Bool queryCtx, cancelQuery = context.WithCancel(rootCtx) ) @@ -84,6 +88,8 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { // last message. connMock.EXPECT().Receive(). DoAndReturn(func() (any, error) { + delayFirstEvent.Do(func() { time.Sleep(50 * time.Millisecond) }) + // Simulate ErrConnClosed if connection is isClosed. if connClosed.Load() { return nil, eventsquery.ErrConnClosed @@ -132,11 +138,17 @@ func TestEventsQueryClient_Subscribe_Succeeds(t *testing.T) { func TestEventsQueryClient_Subscribe_Close(t *testing.T) { var ( - readAllEventsTimeout = 50 * time.Millisecond + firstEventDelay = 50 * time.Millisecond + readAllEventsTimeout = 50*time.Millisecond + firstEventDelay handleEventsLimit = 10 readEventCounter int - connClosed atomic.Bool - ctx = context.Background() + // delayFirstEvent runs once (per test case) to delay the first event + // published by the mocked connection's Receive method to give the test + // ample time to subscribe to the events bytes observable before it + // starts receiving events, otherwise they will be dropped. + delayFirstEvent sync.Once + connClosed atomic.Bool + ctx = context.Background() ) connMock, dialerMock := testeventsquery.NewOneTimeMockConnAndDialer(t) @@ -144,6 +156,8 @@ func TestEventsQueryClient_Subscribe_Close(t *testing.T) { Times(1) connMock.EXPECT().Receive(). DoAndReturn(func() (any, error) { + delayFirstEvent.Do(func() { time.Sleep(firstEventDelay) }) + if connClosed.Load() { return nil, eventsquery.ErrConnClosed } @@ -289,6 +303,8 @@ func behavesLikeEitherObserver[V any]( timeout time.Duration, onLimit func(), ) { + t.Helper() + var ( // eventsCounter is the number of events which have been received from the // eventsBytes since this function was called. diff --git a/pkg/observable/channel/map_test.go b/pkg/observable/channel/map_test.go index 37d7f5744..01014619f 100644 --- a/pkg/observable/channel/map_test.go +++ b/pkg/observable/channel/map_test.go @@ -69,7 +69,7 @@ func TestMap_Word_BytesToPalindrome(t *testing.T) { }() // wait a tick for the observer to receive the word - time.Sleep(time.Millisecond) + time.Sleep(10 * time.Millisecond) // ensure that the observer received the word require.Equal(t, int32(1), atomic.LoadInt32(&wordCounter)) diff --git a/pkg/observable/channel/observable_test.go b/pkg/observable/channel/observable_test.go index e94679630..918370cb0 100644 --- a/pkg/observable/channel/observable_test.go +++ b/pkg/observable/channel/observable_test.go @@ -17,8 +17,8 @@ import ( ) const ( - publishDelay = 100 * time.Microsecond - notifyTimeout = publishDelay * 20 + publishDelay = time.Millisecond + notifyTimeout = 50 * time.Millisecond cancelUnsubscribeDelay = publishDelay * 2 ) @@ -101,11 +101,11 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - obsvbl, publisher := channel.NewObservable[int]( + obsvbl, publishCh := channel.NewObservable[int]( channel.WithPublisher(tt.publishCh), ) require.NotNil(t, obsvbl) - require.NotNil(t, publisher) + require.NotNil(t, publishCh) // construct 3 distinct observers, each with its own channel observers := make([]observable.Observer[int], 1) @@ -132,9 +132,8 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { // onDone is called when the observer channel closes onDone := func(outputs []int) error { - if !assert.Equalf( - t, len(tt.expectedOutputs), - len(outputs), + if !assert.ElementsMatch( + t, tt.expectedOutputs, outputs, "obsvr addr: %p", obsvr, ) { return testerrors.ErrAsync @@ -148,24 +147,23 @@ func TestChannelObservable_NotifyObservers(t *testing.T) { } // notify with test input - publish := delayedPublishFactory(publisher, publishDelay) + publish := delayedPublishFactory(publishCh, publishDelay) for _, input := range tt.inputs { - inputPtr := new(int) - *inputPtr = input - // simulating IO delay in sequential message publishing publish(input) } - cancel() + + // Finished sending values, close publishCh to unsubscribe all observers + // and close all fan-out channels. + close(publishCh) // wait for obsvbl to be notified or timeout err := group.Wait() require.NoError(t, err) - // unsubscribing should close observer channel(s) + // closing publishCh should unsubscribe all observers, causing them + // to close their channels. for _, observer := range observers { - observer.Unsubscribe() - // must drain the channel first to ensure it is isClosed err := testchannel.DrainChannel(observer.Ch()) require.NoError(t, err) @@ -317,20 +315,10 @@ func TestChannelObservable_SequentialPublishAndUnsubscription(t *testing.T) { obsrvn.Lock() defer obsrvn.Unlock() - require.Equalf( - t, len(expectedNotifications[obsnIdx]), - len(obsrvn.Notifications), - "observation index: %d, expected: %+v, actual: %+v", - obsnIdx, expectedNotifications[obsnIdx], obsrvn.Notifications, + require.EqualValuesf( + t, expectedNotifications[obsnIdx], obsrvn.Notifications, + "observation index: %d", obsnIdx, ) - for notificationIdx, expected := range expectedNotifications[obsnIdx] { - require.Equalf( - t, expected, - (obsrvn.Notifications)[notificationIdx], - "allExpected: %+v, allActual: %+v", - expectedNotifications[obsnIdx], obsrvn.Notifications, - ) - } }) } } diff --git a/pkg/observable/channel/observer_test.go b/pkg/observable/channel/observer_test.go index ccda5c66c..fe7c865a9 100644 --- a/pkg/observable/channel/observer_test.go +++ b/pkg/observable/channel/observer_test.go @@ -70,13 +70,16 @@ func TestObserver_ConcurrentUnsubscribe(t *testing.T) { // publish a value obsvr.notify(idx) + + // Slow this loop to prevent bogging the test down. + time.Sleep(10 * time.Microsecond) } }() // send on done when the test cleans up t.Cleanup(func() { done <- struct{}{} }) // it should still be open after a bit of inactivity - time.Sleep(10 * time.Millisecond) + time.Sleep(time.Millisecond) require.Equal(t, false, obsvr.isClosed) obsvr.Unsubscribe() diff --git a/pkg/observable/channel/replay_test.go b/pkg/observable/channel/replay_test.go index a34fb0f92..b04857196 100644 --- a/pkg/observable/channel/replay_test.go +++ b/pkg/observable/channel/replay_test.go @@ -50,7 +50,9 @@ func TestReplayObservable(t *testing.T) { // send all values to the observable's publish channel for _, value := range values { + time.Sleep(10 * time.Microsecond) publishCh <- value + time.Sleep(10 * time.Microsecond) } // allow some time for values to be buffered by the replay observable @@ -59,27 +61,37 @@ func TestReplayObservable(t *testing.T) { // replay observer, should receive the last lastN values published prior to // subscribing followed by subsequently published values replayObserver := replayObsvbl.Subscribe(ctx) + + // Collect values from replayObserver. + var actualValues []int for _, expected := range expectedValues { select { case v := <-replayObserver.Ch(): - require.Equal(t, expected, v) + actualValues = append(actualValues, v) case <-time.After(1 * time.Second): t.Fatalf("Did not receive expected value %d in time", expected) } } - // second replay observer, should receive the same values as the first + require.EqualValues(t, expectedValues, actualValues) + + // Second replay observer, should receive the same values as the first // even though it subscribed after all values were published and the // values were already replayed by the first. replayObserver2 := replayObsvbl.Subscribe(ctx) + + // Collect values from replayObserver2. + var actualValues2 []int for _, expected := range expectedValues { select { case v := <-replayObserver2.Ch(): - require.Equal(t, expected, v) + actualValues2 = append(actualValues2, v) case <-time.After(1 * time.Second): t.Fatalf("Did not receive expected value %d in time", expected) } } + + require.EqualValues(t, expectedValues, actualValues) } func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) { @@ -168,7 +180,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { "Last should block until at lest 1 value has been published; actualValues: %v", actualValues, ) - case <-time.After(200 * time.Millisecond): + case <-time.After(10 * time.Millisecond): } // Publish some values (up to splitIdx). @@ -215,7 +227,7 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { case actualValues := <-getLastValues(): require.Len(t, actualValues, lastN) require.ElementsMatch(t, values, actualValues) - case <-time.After(10 * time.Millisecond): + case <-time.After(50 * time.Millisecond): t.Fatal("timed out waiting for Last to return") } diff --git a/tools/scripts/itest.sh b/tools/scripts/itest.sh index 642cdbfe8..323db3d71 100755 --- a/tools/scripts/itest.sh +++ b/tools/scripts/itest.sh @@ -44,7 +44,7 @@ itest() { # If go test fails, exit the loop. if [[ $test_exit_status -ne 0 ]]; then - echo "go test failed on iteration $i. Exiting early." + echo "go test failed on iteration $i; exiting early. Total tests run: $total_tests_run" return 1 fi done