Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free up the *Radio namespace for future config structures #3520

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadio
from parsl.monitoring.radios import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(
*,
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
monitoring_radio: Optional[MonitoringRadio] = None,
monitoring_radio: Optional[MonitoringRadioSender] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
Expand Down Expand Up @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def monitoring_radio(self) -> Optional[MonitoringRadio]:
def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
"""Local radio for sending monitoring messages
"""
return self._monitoring_radio

@monitoring_radio.setter
def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None:
def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._monitoring_radio = value
4 changes: 2 additions & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadio
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import AddressedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down Expand Up @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadio(self.block_msgs)
self.radio = MultiprocessingQueueRadioSender(self.block_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
Expand Down
14 changes: 7 additions & 7 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
logger = logging.getLogger(__name__)


class MonitoringRadio(metaclass=ABCMeta):
class MonitoringRadioSender(metaclass=ABCMeta):
@abstractmethod
def send(self, message: object) -> None:
pass


class FilesystemRadio(MonitoringRadio):
"""A MonitoringRadio that sends messages over a shared filesystem.
class FilesystemRadioSender(MonitoringRadioSender):
"""A MonitoringRadioSender that sends messages over a shared filesystem.

The messsage directory structure is based on maildir,
https://en.wikipedia.org/wiki/Maildir
Expand All @@ -36,7 +36,7 @@ class FilesystemRadio(MonitoringRadio):
This avoids a race condition of reading partially written messages.

This radio is likely to give higher shared filesystem load compared to
the UDPRadio, but should be much more reliable.
the UDP radio, but should be much more reliable.
"""

def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str):
Expand Down Expand Up @@ -66,7 +66,7 @@ def send(self, message: object) -> None:
os.rename(tmp_filename, new_filename)


class HTEXRadio(MonitoringRadio):
class HTEXRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
"""
Expand Down Expand Up @@ -120,7 +120,7 @@ def send(self, message: object) -> None:
return


class UDPRadio(MonitoringRadio):
class UDPRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
"""
Expand Down Expand Up @@ -174,7 +174,7 @@ def send(self, message: object) -> None:
return


class MultiprocessingQueueRadio(MonitoringRadio):
class MultiprocessingQueueRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over a multiprocessing Queue.
This radio is intended to be used on the submit side, where components
in the submit process, or processes launched by multiprocessing, will have
Expand Down
24 changes: 12 additions & 12 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import (
FilesystemRadio,
HTEXRadio,
MonitoringRadio,
UDPRadio,
FilesystemRadioSender,
HTEXRadioSender,
MonitoringRadioSender,
UDPRadioSender,
)
from parsl.multiprocessing import ForkProcess
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -100,17 +100,17 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return (wrapped, args, new_kwargs)


def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio:
radio: MonitoringRadio
def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender:
radio: MonitoringRadioSender
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
radio = UDPRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
radio = HTEXRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
return radio
Expand Down
Loading