Skip to content

Commit

Permalink
Remove epoll() usage in LTX communication
Browse files Browse the repository at this point in the history
Using epoll() is a bad idea when asyncio is involved. The reason is that
asyncio itself is using epoll() to implement concurrency, so we end up
having 2 epoll() instances running in parallel. When we record a new file
descriptor into an asyncio application, we supposed to use the
asyncio.BaseEventLoop.add_reader() method which is, unfortunately, not
working (by experience) with plain files.

For this reason, the current solution is to use threaded read() syscall
which permits to execute reading operations, as well as keeping asyncio
loop running in the background without killing UI or data acquisition.
  • Loading branch information
acerv committed Nov 29, 2023
1 parent 181ea09 commit bcb2df8
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions libkirk/ltx.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
.. moduleauthor:: Andrea Cervesato <[email protected]>
"""
import os
import select
import asyncio
import logging
import typing
Expand Down Expand Up @@ -505,6 +504,8 @@ async def connect(self) -> None:

self._logger.info("Connecting to LTX")

os.set_blocking(self._stdout_fd, False)

self._exception = None
self._task = libkirk.create_task(self._polling())

Expand Down Expand Up @@ -586,7 +587,15 @@ async def _read(self, size: int) -> bytes:
"""
Blocking I/O method to read from stdout.
"""
return await libkirk.to_thread(os.read, self._stdout_fd, size)
data = None
try:
data = await libkirk.to_thread(os.read, self._stdout_fd, size)
except BlockingIOError:
# we ensure other threads will take action if reading
# procedure is too fast for this process
os.sched_yield()

return data

async def _write(self, data: bytes) -> None:
"""
Expand All @@ -608,44 +617,35 @@ async def _polling(self) -> None:
"""
self._logger.info("Starting producer")

poller = select.epoll()
poller.register(self._stdout_fd, select.EPOLLIN)

# force utf-8 encoding by using raw=False
unpacker = msgpack.Unpacker(raw=False)

try:
while not self._stop:
events = await libkirk.to_thread(poller.poll, 0.1)

for fdesc, _ in events:
if fdesc != self._stdout_fd:
continue

data = await self._read(self.BUFFSIZE)
if not data:
continue
data = await self._read(self.BUFFSIZE)
if not data:
continue

self._logger.debug("Unpacking bytes: %s", data)
self._logger.debug("Unpacking bytes: %s", data)

unpacker.feed(data)
unpacker.feed(data)

while True:
try:
msg = unpacker.unpack()
if not msg:
continue
while True:
try:
msg = unpacker.unpack()
if not msg:
continue

self._logger.info("Received message: %s", msg)
if not isinstance(msg, list):
raise LTXError("Message must be an array")
self._logger.info("Received message: %s", msg)
if not isinstance(msg, list):
raise LTXError("Message must be an array")

if msg[0] == Request.ERROR:
raise LTXError(msg[1])
if msg[0] == Request.ERROR:
raise LTXError(msg[1])

await self._feed_requests(msg)
except msgpack.OutOfData:
break
await self._feed_requests(msg)
except msgpack.OutOfData:
break
except LTXError as err:
self._exception = err
finally:
Expand Down

0 comments on commit bcb2df8

Please sign in to comment.