From 1f491a411c68252bebc59d670de5007ae77057ec Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 11:22:29 -0400 Subject: [PATCH 01/11] Explicitly signal stream initiation and termination --- jumpstarter/common/streams.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index b73b2273..a9b3681c 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -11,7 +11,15 @@ async def forward_server_stream(request_iterator, stream): async def client_to_server(): try: async for frame in request_iterator: - await stream.send(frame.payload) + match frame.frame_type: + case router_pb2.FRAME_TYPE_DATA: + await stream.send(frame.payload) + case router_pb2.FRAME_TYPE_PING: + pass + case router_pb2.FRAME_TYPE_GOAWAY: + break + case _: + pass except BrokenResourceError: pass finally: @@ -21,8 +29,10 @@ async def client_to_server(): # server_to_client try: + yield router_pb2.StreamResponse(frame_type=router_pb2.FRAME_TYPE_PING) async for payload in stream: yield router_pb2.StreamResponse(payload=payload) + yield router_pb2.StreamResponse(frame_type=router_pb2.FRAME_TYPE_GOAWAY) except BrokenResourceError: pass @@ -30,8 +40,10 @@ async def client_to_server(): async def forward_client_stream(router, stream, metadata): async def client_to_server(): try: + yield router_pb2.StreamRequest(frame_type=router_pb2.FRAME_TYPE_PING) async for payload in stream: yield router_pb2.StreamRequest(payload=payload) + yield router_pb2.StreamRequest(frame_type=router_pb2.FRAME_TYPE_GOAWAY) except BrokenResourceError: pass @@ -41,9 +53,15 @@ async def client_to_server(): client_to_server(), metadata=metadata, ): - if not frame.payload: - break - await stream.send(frame.payload) + match frame.frame_type: + case router_pb2.FRAME_TYPE_DATA: + await stream.send(frame.payload) + case router_pb2.FRAME_TYPE_PING: + pass + case router_pb2.FRAME_TYPE_GOAWAY: + break + case _: + pass except grpc.aio.AioRpcError: # TODO: handle connection error pass From 427d6c05f1cd4f7074706715decb7d405a5b5ae0 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 12:08:19 -0400 Subject: [PATCH 02/11] Simplify stream handling --- jumpstarter/client/lease.py | 2 + jumpstarter/common/streams.py | 82 ++++++++++++++--------------------- jumpstarter/drivers/base.py | 4 +- jumpstarter/drivers/core.py | 1 + 4 files changed, 39 insertions(+), 50 deletions(-) diff --git a/jumpstarter/client/lease.py b/jumpstarter/client/lease.py index cc84140f..abfaa5ac 100644 --- a/jumpstarter/client/lease.py +++ b/jumpstarter/client/lease.py @@ -94,6 +94,8 @@ async def start_soon(): ) as inner: yield self.portal.call(client_from_channel, inner, self.portal) + self.portal.call(tg.cancel_scope.cancel) + async def __accept(self, listener, response): async with await listener.accept() as stream: await connect_router_stream(response.router_endpoint, response.router_token, stream) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index a9b3681c..b0aa3906 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -1,72 +1,56 @@ import grpc -from anyio import BrokenResourceError, create_memory_object_stream, create_task_group +from anyio import BrokenResourceError, ClosedResourceError, create_memory_object_stream, create_task_group from anyio.streams.stapled import StapledObjectStream from jumpstarter.v1 import router_pb2, router_pb2_grpc -async def forward_server_stream(request_iterator, stream): - async with create_task_group() as tg: - - async def client_to_server(): - try: - async for frame in request_iterator: - match frame.frame_type: - case router_pb2.FRAME_TYPE_DATA: - await stream.send(frame.payload) - case router_pb2.FRAME_TYPE_PING: - pass - case router_pb2.FRAME_TYPE_GOAWAY: - break - case _: - pass - except BrokenResourceError: - pass - finally: - await stream.send_eof() - - tg.start_soon(client_to_server) - - # server_to_client - try: - yield router_pb2.StreamResponse(frame_type=router_pb2.FRAME_TYPE_PING) - async for payload in stream: - yield router_pb2.StreamResponse(payload=payload) - yield router_pb2.StreamResponse(frame_type=router_pb2.FRAME_TYPE_GOAWAY) - except BrokenResourceError: - pass - +async def encapsulate_stream(rx, cls): + try: + yield cls(frame_type=router_pb2.FRAME_TYPE_PING) + async for payload in rx: + yield cls(payload=payload) + yield cls(frame_type=router_pb2.FRAME_TYPE_GOAWAY) + except (BrokenResourceError, ClosedResourceError): + pass -async def forward_client_stream(router, stream, metadata): - async def client_to_server(): - try: - yield router_pb2.StreamRequest(frame_type=router_pb2.FRAME_TYPE_PING) - async for payload in stream: - yield router_pb2.StreamRequest(payload=payload) - yield router_pb2.StreamRequest(frame_type=router_pb2.FRAME_TYPE_GOAWAY) - except BrokenResourceError: - pass - # server_to_client +async def decapsulate_stream(tx, rx): try: - async for frame in router.Stream( - client_to_server(), - metadata=metadata, - ): + async for frame in rx: match frame.frame_type: case router_pb2.FRAME_TYPE_DATA: - await stream.send(frame.payload) + await tx.send(frame.payload) case router_pb2.FRAME_TYPE_PING: pass case router_pb2.FRAME_TYPE_GOAWAY: break case _: pass + except (BrokenResourceError, ClosedResourceError): + pass + + +async def forward_server_stream(request_iterator, stream): + async with create_task_group() as tg: + tg.start_soon(decapsulate_stream, stream, request_iterator) + + async for v in encapsulate_stream(stream, router_pb2.StreamResponse): + yield v + + +async def forward_client_stream(router, stream, metadata): + try: + response_iterator = router.Stream( + encapsulate_stream(stream, router_pb2.StreamRequest), + metadata=metadata, + ) + await decapsulate_stream(stream, response_iterator) + async for _ in response_iterator: + pass except grpc.aio.AioRpcError: # TODO: handle connection error pass - except BrokenResourceError: - pass finally: await stream.aclose() diff --git a/jumpstarter/drivers/base.py b/jumpstarter/drivers/base.py index 9bef2315..9e501a27 100644 --- a/jumpstarter/drivers/base.py +++ b/jumpstarter/drivers/base.py @@ -98,12 +98,14 @@ async def Stream(self, request_iterator, context): self.resources[resource_uuid] = resource await resource.send(str(resource_uuid).encode("utf-8")) + await resource.send_eof() async with remote: async for v in forward_server_stream(request_iterator, remote): yield v - del self.resources[resource_uuid] + # del self.resources[resource_uuid] + # small resources might be fully buffered in memory async def GetReport(self, request, context): """ diff --git a/jumpstarter/drivers/core.py b/jumpstarter/drivers/core.py index 0a111a7c..090bd080 100644 --- a/jumpstarter/drivers/core.py +++ b/jumpstarter/drivers/core.py @@ -75,6 +75,7 @@ async def stream_async(self, method): ) async with client_stream: yield client_stream + tg.cancel_scope.cancel() @asynccontextmanager async def portforward_async(self, method, listener): From ee4a2993d3a079c1da0c33db365044076fe748e5 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 15:00:07 -0400 Subject: [PATCH 03/11] Fix stream handling under lease --- jumpstarter/client/lease.py | 44 ++++++++++++++++------------------- jumpstarter/common/streams.py | 2 -- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/jumpstarter/client/lease.py b/jumpstarter/client/lease.py index abfaa5ac..6214497b 100644 --- a/jumpstarter/client/lease.py +++ b/jumpstarter/client/lease.py @@ -1,4 +1,5 @@ -from contextlib import AbstractContextManager, contextmanager +from asyncio.exceptions import InvalidStateError +from contextlib import AbstractContextManager, asynccontextmanager, contextmanager from dataclasses import dataclass from pathlib import Path from tempfile import TemporaryDirectory @@ -70,32 +71,27 @@ class Lease: def __post_init__(self, *args): jumpstarter_pb2_grpc.ControllerServiceStub.__init__(self, self.channel) - @contextmanager - def connect(self): - response = self.portal.call(self.Dial, jumpstarter_pb2.DialRequest(uuid=str(self.uuid))) + @asynccontextmanager + async def connect_async(self): + response = await self.Dial(jumpstarter_pb2.DialRequest(uuid=str(self.uuid))) with TemporaryDirectory() as tempdir: socketpath = Path(tempdir) / "socket" + async with await create_unix_listener(socketpath) as listener: + async with create_task_group() as tg: + tg.start_soon(self.__accept, listener, response) + async with await insecure_channel(f"unix://{socketpath}") as inner: + yield await client_from_channel(inner, self.portal) + tg.cancel_scope.cancel() - with self.portal.wrap_async_context_manager(self.portal.call(create_unix_listener, socketpath)) as listener: - - async def create_tg(): - return create_task_group() - - with self.portal.wrap_async_context_manager(self.portal.call(create_tg)) as tg: - - async def start_soon(): - tg.start_soon(self.__accept, listener, response) - - self.portal.call(start_soon) - - with self.portal.wrap_async_context_manager( - self.portal.call(insecure_channel, f"unix://{socketpath}") - ) as inner: - yield self.portal.call(client_from_channel, inner, self.portal) - - self.portal.call(tg.cancel_scope.cancel) + @contextmanager + def connect(self): + with self.portal.wrap_async_context_manager(self.connect_async()) as client: + yield client async def __accept(self, listener, response): - async with await listener.accept() as stream: - await connect_router_stream(response.router_endpoint, response.router_token, stream) + try: + async with await listener.accept() as stream: + await connect_router_stream(response.router_endpoint, response.router_token, stream) + except InvalidStateError: + pass diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index b0aa3906..849af8fe 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -51,8 +51,6 @@ async def forward_client_stream(router, stream, metadata): except grpc.aio.AioRpcError: # TODO: handle connection error pass - finally: - await stream.aclose() async def connect_router_stream(endpoint, token, stream): From 5891ee4c7d8e05d630c91b4dc45a386da17b92a0 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 15:36:55 -0400 Subject: [PATCH 04/11] Rework all stream handling with context manager --- jumpstarter/client/lease.py | 20 +++++---------- jumpstarter/common/streams.py | 44 ++++++++++++++++++++------------ jumpstarter/drivers/core.py | 26 +++++++++---------- jumpstarter/exporter/exporter.py | 5 ++-- 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/jumpstarter/client/lease.py b/jumpstarter/client/lease.py index 6214497b..3e297fc3 100644 --- a/jumpstarter/client/lease.py +++ b/jumpstarter/client/lease.py @@ -1,11 +1,10 @@ -from asyncio.exceptions import InvalidStateError from contextlib import AbstractContextManager, asynccontextmanager, contextmanager from dataclasses import dataclass from pathlib import Path from tempfile import TemporaryDirectory from uuid import UUID -from anyio import create_task_group, create_unix_listener +from anyio import create_unix_listener from anyio.from_thread import BlockingPortal from google.protobuf import duration_pb2 from grpc.aio import Channel @@ -78,20 +77,13 @@ async def connect_async(self): with TemporaryDirectory() as tempdir: socketpath = Path(tempdir) / "socket" async with await create_unix_listener(socketpath) as listener: - async with create_task_group() as tg: - tg.start_soon(self.__accept, listener, response) - async with await insecure_channel(f"unix://{socketpath}") as inner: - yield await client_from_channel(inner, self.portal) - tg.cancel_scope.cancel() + async with await insecure_channel(f"unix://{socketpath}") as inner: + inner.get_state(try_to_connect=True) + async with await listener.accept() as stream: + async with connect_router_stream(response.router_endpoint, response.router_token, stream): + yield await client_from_channel(inner, self.portal) @contextmanager def connect(self): with self.portal.wrap_async_context_manager(self.connect_async()) as client: yield client - - async def __accept(self, listener, response): - try: - async with await listener.accept() as stream: - await connect_router_stream(response.router_endpoint, response.router_token, stream) - except InvalidStateError: - pass diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index 849af8fe..6fa77510 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -1,3 +1,5 @@ +from contextlib import asynccontextmanager + import grpc from anyio import BrokenResourceError, ClosedResourceError, create_memory_object_stream, create_task_group from anyio.streams.stapled import StapledObjectStream @@ -15,44 +17,50 @@ async def encapsulate_stream(rx, cls): pass -async def decapsulate_stream(tx, rx): +async def decapsulate_stream(tx, rx, tg): try: async for frame in rx: match frame.frame_type: case router_pb2.FRAME_TYPE_DATA: - await tx.send(frame.payload) + try: + await tx.send(frame.payload) + except (BrokenResourceError, ClosedResourceError): + pass case router_pb2.FRAME_TYPE_PING: pass case router_pb2.FRAME_TYPE_GOAWAY: break case _: pass - except (BrokenResourceError, ClosedResourceError): - pass + # workaround for grpc + async for _ in rx: + pass + finally: + tg.cancel_scope.cancel() async def forward_server_stream(request_iterator, stream): async with create_task_group() as tg: - tg.start_soon(decapsulate_stream, stream, request_iterator) + tg.start_soon(decapsulate_stream, stream, request_iterator, tg) async for v in encapsulate_stream(stream, router_pb2.StreamResponse): yield v +@asynccontextmanager async def forward_client_stream(router, stream, metadata): - try: - response_iterator = router.Stream( - encapsulate_stream(stream, router_pb2.StreamRequest), - metadata=metadata, - ) - await decapsulate_stream(stream, response_iterator) - async for _ in response_iterator: - pass - except grpc.aio.AioRpcError: - # TODO: handle connection error - pass + response_iterator = router.Stream( + encapsulate_stream(stream, router_pb2.StreamRequest), + metadata=metadata, + ) + async with create_task_group() as tg: + tg.start_soon(decapsulate_stream, stream, response_iterator, tg) + yield + tg.cancel_scope.cancel() + +@asynccontextmanager async def connect_router_stream(endpoint, token, stream): credentials = grpc.composite_channel_credentials( grpc.local_channel_credentials(), # TODO: Use TLS @@ -61,7 +69,9 @@ async def connect_router_stream(endpoint, token, stream): async with grpc.aio.secure_channel(endpoint, credentials) as channel: router = router_pb2_grpc.RouterServiceStub(channel) - await forward_client_stream(router, stream, ()) + + async with forward_client_stream(router, stream, ()): + yield def create_memory_stream(): diff --git a/jumpstarter/drivers/core.py b/jumpstarter/drivers/core.py index 090bd080..17dcaa71 100644 --- a/jumpstarter/drivers/core.py +++ b/jumpstarter/drivers/core.py @@ -5,7 +5,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass -from anyio import create_task_group +from anyio import create_task_group, sleep_forever from anyio.streams.stapled import StapledObjectStream from google.protobuf import json_format, struct_pb2 from grpc.aio import Channel @@ -65,27 +65,24 @@ async def streamingcall_async(self, method, *args): @asynccontextmanager async def stream_async(self, method): client_stream, device_stream = create_memory_stream() - - async with create_task_group() as tg: - tg.start_soon( - forward_client_stream, - self, - device_stream, - {"kind": "connect", "uuid": str(self.uuid), "method": method}.items(), - ) + async with forward_client_stream( + self, + device_stream, + {"kind": "connect", "uuid": str(self.uuid), "method": method}.items(), + ): async with client_stream: yield client_stream - tg.cancel_scope.cancel() @asynccontextmanager async def portforward_async(self, method, listener): async def handle(client): async with client: - await forward_client_stream( + async with forward_client_stream( self, client, {"kind": "connect", "uuid": str(self.uuid), "method": method}.items(), - ) + ): + await sleep_forever() async with create_task_group() as tg: tg.start_soon(listener.serve, handle) @@ -105,11 +102,12 @@ async def resource_async( async def handle(stream): async with stream: - await forward_client_stream( + async with forward_client_stream( self, stream, {"kind": "resource", "uuid": str(self.uuid)}.items(), - ) + ): + await sleep_forever() async with create_task_group() as tg: tg.start_soon(handle, combined) diff --git a/jumpstarter/exporter/exporter.py b/jumpstarter/exporter/exporter.py index 5f739815..23aafe22 100644 --- a/jumpstarter/exporter/exporter.py +++ b/jumpstarter/exporter/exporter.py @@ -5,7 +5,7 @@ from tempfile import TemporaryDirectory import grpc -from anyio import connect_unix +from anyio import connect_unix, sleep_forever from jumpstarter.common import Metadata from jumpstarter.common.streams import connect_router_stream @@ -65,6 +65,7 @@ async def serve(self): await server.start() async with await connect_unix(socketpath) as stream: - await connect_router_stream(request.router_endpoint, request.router_token, stream) + async with connect_router_stream(request.router_endpoint, request.router_token, stream): + await sleep_forever() finally: await server.stop(grace=None) From 4b30f4e16a04260a080801a1b892db639a65a51d Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 15:43:29 -0400 Subject: [PATCH 05/11] Fixup error handling in decapsulate_stream --- jumpstarter/common/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index 6fa77510..e86c68bf 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -35,6 +35,8 @@ async def decapsulate_stream(tx, rx, tg): # workaround for grpc async for _ in rx: pass + except Exception: + pass finally: tg.cancel_scope.cancel() From 3e724c95324c17a974059c2252893ce68a8f4512 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 15:45:40 -0400 Subject: [PATCH 06/11] Drop the use of cancel_scope in resource handling --- jumpstarter/drivers/core.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/jumpstarter/drivers/core.py b/jumpstarter/drivers/core.py index 17dcaa71..e89507c2 100644 --- a/jumpstarter/drivers/core.py +++ b/jumpstarter/drivers/core.py @@ -100,18 +100,10 @@ async def resource_async( combined = StapledObjectStream(tx, ProgressStream(stream=stream)) - async def handle(stream): - async with stream: - async with forward_client_stream( - self, - stream, - {"kind": "resource", "uuid": str(self.uuid)}.items(), - ): - await sleep_forever() - - async with create_task_group() as tg: - tg.start_soon(handle, combined) - try: + async with combined: + async with forward_client_stream( + self, + combined, + {"kind": "resource", "uuid": str(self.uuid)}.items(), + ): yield (await rx.receive()).decode() - finally: - tg.cancel_scope.cancel() From b92ed4c78558d8bbacf9be07ca562a85cd2b691f Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 16:04:25 -0400 Subject: [PATCH 07/11] Only ignore rpc cancellation exception --- jumpstarter/common/streams.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index e86c68bf..3d487334 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -35,8 +35,13 @@ async def decapsulate_stream(tx, rx, tg): # workaround for grpc async for _ in rx: pass - except Exception: - pass + # ignore rpc cancellation + except grpc.aio.AioRpcError as e: + match e.code(): + case grpc.StatusCode.CANCELLED: + pass + case _: + raise finally: tg.cancel_scope.cancel() From 81a2bf3f7ab8e7fb80acf5dc2d4e73f3eb3bdf58 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 16:13:20 -0400 Subject: [PATCH 08/11] Also ignore grpc internal error --- jumpstarter/common/streams.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index 3d487334..b1f039d6 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -22,23 +22,18 @@ async def decapsulate_stream(tx, rx, tg): async for frame in rx: match frame.frame_type: case router_pb2.FRAME_TYPE_DATA: - try: - await tx.send(frame.payload) - except (BrokenResourceError, ClosedResourceError): - pass - case router_pb2.FRAME_TYPE_PING: - pass + await tx.send(frame.payload) case router_pb2.FRAME_TYPE_GOAWAY: - break + await tx.send_eof() case _: pass - # workaround for grpc - async for _ in rx: - pass - # ignore rpc cancellation + # ignore peer disconnet + except BrokenResourceError: + pass + # ignore rpc cancellation and internal error except grpc.aio.AioRpcError as e: match e.code(): - case grpc.StatusCode.CANCELLED: + case grpc.StatusCode.CANCELLED | grpc.StatusCode.INTERNAL: pass case _: raise From a56ba7dbf41ff950d64ba1adae633f5de21e72ff Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 16:17:51 -0400 Subject: [PATCH 09/11] Only call send_eof on supported types --- jumpstarter/common/streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index b1f039d6..95e3c932 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -2,6 +2,7 @@ import grpc from anyio import BrokenResourceError, ClosedResourceError, create_memory_object_stream, create_task_group +from anyio.abc import ByteStream, ObjectStream from anyio.streams.stapled import StapledObjectStream from jumpstarter.v1 import router_pb2, router_pb2_grpc @@ -24,7 +25,8 @@ async def decapsulate_stream(tx, rx, tg): case router_pb2.FRAME_TYPE_DATA: await tx.send(frame.payload) case router_pb2.FRAME_TYPE_GOAWAY: - await tx.send_eof() + if isinstance(tx, ObjectStream) or isinstance(tx, ByteStream): + await tx.send_eof() case _: pass # ignore peer disconnet From 7b389f6367fd8b37a63eaef1aa3749f2185a024c Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 1 Aug 2024 16:29:05 -0400 Subject: [PATCH 10/11] Fix typo --- jumpstarter/common/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index 95e3c932..80efe3ce 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -29,7 +29,7 @@ async def decapsulate_stream(tx, rx, tg): await tx.send_eof() case _: pass - # ignore peer disconnet + # ignore peer disconnect except BrokenResourceError: pass # ignore rpc cancellation and internal error From 2a3f62ea46554ab0786fe06b7f0cc2b9c0ca6bc2 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Tue, 6 Aug 2024 08:53:59 -0400 Subject: [PATCH 11/11] Add debug logging to ignored exceptions --- jumpstarter/common/streams.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/jumpstarter/common/streams.py b/jumpstarter/common/streams.py index 80efe3ce..51fd939a 100644 --- a/jumpstarter/common/streams.py +++ b/jumpstarter/common/streams.py @@ -1,3 +1,4 @@ +import logging from contextlib import asynccontextmanager import grpc @@ -7,6 +8,8 @@ from jumpstarter.v1 import router_pb2, router_pb2_grpc +logger = logging.getLogger(__name__) + async def encapsulate_stream(rx, cls): try: @@ -15,7 +18,7 @@ async def encapsulate_stream(rx, cls): yield cls(payload=payload) yield cls(frame_type=router_pb2.FRAME_TYPE_GOAWAY) except (BrokenResourceError, ClosedResourceError): - pass + logger.debug("stream encapsulation error ignored") async def decapsulate_stream(tx, rx, tg): @@ -28,15 +31,15 @@ async def decapsulate_stream(tx, rx, tg): if isinstance(tx, ObjectStream) or isinstance(tx, ByteStream): await tx.send_eof() case _: - pass + logger.debug(f"unrecognized frame ignored: {frame}") # ignore peer disconnect except BrokenResourceError: - pass + logger.debug("stream decapsulation peer disconnect ignored") # ignore rpc cancellation and internal error except grpc.aio.AioRpcError as e: match e.code(): case grpc.StatusCode.CANCELLED | grpc.StatusCode.INTERNAL: - pass + logger.debug("stream decapsulation grpc error ignored") case _: raise finally: