Clean up resources in DHT/P2P, improve test robustness #636
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@            Coverage Diff             @@
##           master     #636      +/-   ##
==========================================
+ Coverage   85.39%   86.17%   +0.78%
==========================================
  Files          81       81
  Lines        8006     8010       +4
==========================================
+ Hits         6837     6903      +66
+ Misses       1169     1107      -62
-        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)
+        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=False)
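For context, mp.Pipe(duplex=False) returns a unidirectional pair where the first connection can only receive and the second can only send. A minimal sketch of the semantics (the worker and message are illustrative, not the actual hivemind code):

```python
import multiprocessing as mp

def worker(conn):
    conn.send("ready")  # the second connection is the send-only end

if __name__ == "__main__":
    # With duplex=False, the pipe is one-way: inner_pipe can only receive,
    # outer_pipe can only send, which suffices for a child -> parent channel
    inner_pipe, outer_pipe = mp.Pipe(duplex=False)
    proc = mp.Process(target=worker, args=(outer_pipe,))
    proc.start()
    assert inner_pipe.recv() == "ready"
    proc.join()
```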
logger.info(f"Caught signal {signal_number} ({strsignal(signal_number)}), shutting down") | ||
exit_event.set() | ||
|
||
signal(SIGTERM, signal_handler) |
We terminate hivemind-server and hivemind-dht in tests, so a signal handler is necessary to ensure child processes are terminated correctly
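For illustration, a minimal sketch of this pattern (only `signal_handler` and `exit_event` mirror the diff above; the surrounding setup is assumed):

```python
import logging
from signal import SIGTERM, signal, strsignal
from threading import Event

logger = logging.getLogger(__name__)
exit_event = Event()

def signal_handler(signal_number: int, _frame) -> None:
    logger.info(f"Caught signal {signal_number} ({strsignal(signal_number)}), shutting down")
    exit_event.set()

if __name__ == "__main__":
    signal(SIGTERM, signal_handler)
    exit_event.wait()  # block until SIGTERM arrives
    # clean up child processes and other resources here before exiting
```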
    writer.close()
    await writer.wait_closed()
wait_closed() is recommended after close()
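As a sketch, the recommended shutdown sequence for an asyncio.StreamWriter (the helper name is illustrative):

```python
import asyncio

async def close_writer(writer: asyncio.StreamWriter) -> None:
    writer.close()              # start closing the underlying transport
    await writer.wait_closed()  # wait until the connection is actually closed
```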
@@ -46,6 +46,7 @@ async def write_unsigned_varint(stream: asyncio.StreamWriter, integer: int, max_
             value |= 0x80
         byte = value.to_bytes(1, "big")
         stream.write(byte)
+        await stream.drain()
drain() is recommended after write()
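For completeness, a sketch of the write/drain pattern (the function is illustrative):

```python
import asyncio

async def write_payload(writer: asyncio.StreamWriter, payload: bytes) -> None:
    writer.write(payload)  # only buffers the data in the transport
    await writer.drain()   # waits until the buffer is below the high-water mark
```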
-    for _ in range(5):
+    for _ in range(10):
For some reason, we might not be getting the updated routing table in DHT processes after 5 iterations. Increasing the limit seems to help, although the solution is still quite hacky
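If the fixed retry count keeps being fragile, one possible alternative (purely a hypothetical sketch; `check` stands in for the actual routing-table query) is a deadline-based poll:

```python
import time

def wait_until(check, timeout: float = 30.0, interval: float = 0.5) -> None:
    # Poll check() until it succeeds or the deadline expires,
    # instead of relying on a hard-coded number of iterations
    deadline = time.monotonic() + timeout
    while not check():
        if time.monotonic() >= deadline:
            raise TimeoutError("condition was not met before the deadline")
        time.sleep(interval)
```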
@@ -282,6 +282,7 @@ def test_client_anomaly_detection():
     experts["expert.3"].module.ffn.weight.data[0, 0] = float("nan")

     dht = DHT(start=True)
+    dht.get_visible_maddrs(latest=True)
I'm using the solution suggested in #635; otherwise, replicating P2P seems to hang sometimes
@@ -262,6 +262,7 @@ def _check_result_and_set(future):

     future1, future2 = receiver.recv()
     future1.set_result(123)
+    events[6].wait()
Without this event, we could run the assert future2.done() check in the main process before the future is cancelled in the child process
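For illustration, a self-contained sketch of ordering a cross-process assertion with mp.Event (names are illustrative, not the actual test):

```python
import multiprocessing as mp

def child(future_cancelled):
    # ... cancel the future here ...
    future_cancelled.set()  # signal that the cancellation has happened

if __name__ == "__main__":
    future_cancelled = mp.Event()
    proc = mp.Process(target=child, args=(future_cancelled,))
    proc.start()
    future_cancelled.wait()  # don't assert until the child has acted
    # it is now safe to check the future's state in the main process
    proc.join()
```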
     During that sleep, one of the worker-less coroutines will take up the worker freed by coroutine A.
     Finally, coroutine A finishes sleeping and immediately gets stuck at lock2, because there are no free workers.
     Thus, every single coroutine is either awaiting an already acquired lock, or awaiting for free workers in executor.
     """
+    total_sleep_time = 1
I don't quite understand the purpose of this test, since there are no assert statements, but I decided to optimize it a bit (especially on machines with >= 128 cores) by fixing the total sleep time across all coroutines
-    counter.value += 1
+    with counter.get_lock():
+        counter.value += 1
In-place updates of mp.Value are not atomic, so we need to acquire the lock to avoid race conditions
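A self-contained sketch demonstrating why the lock is needed (the worker name and counts are illustrative):

```python
import multiprocessing as mp

def increment(counter, n: int = 10_000):
    for _ in range(n):
        # counter.value += 1 is a read-modify-write sequence; without the
        # lock, two processes can interleave and lose updates
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = mp.Value("i", 0)
    procs = [mp.Process(target=increment, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    assert counter.value == 4 * 10_000
```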
This PR makes several changes to the codebase that aim to make tests less flaky (the situation with their stability is quite bleak at the moment). Specifically, it does the following:
- Disables fail-fast for the test matrix to make sure all failing tests can be seen at the same time