Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove monitoring queue tag switch monitoring db pre-router #3587

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,31 +316,31 @@ def start(self,
self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
priority_queue, 'priority', self._kill_event,),
priority_queue, self._kill_event,),
name="Monitoring-migrate-priority",
daemon=True,
)
self._priority_queue_pull_thread.start()

self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
node_queue, 'node', self._kill_event,),
node_queue, self._kill_event,),
name="Monitoring-migrate-node",
daemon=True,
)
self._node_queue_pull_thread.start()

self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
block_queue, 'block', self._kill_event,),
block_queue, self._kill_event,),
name="Monitoring-migrate-block",
daemon=True,
)
self._block_queue_pull_thread.start()

self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
resource_queue, 'resource', self._kill_event,),
resource_queue, self._kill_event,),
name="Monitoring-migrate-resource",
daemon=True,
)
Expand Down Expand Up @@ -577,43 +577,26 @@ def start(self,
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")

@wrap_with_logs(target="database_manager")
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None:
logger.info("Starting processing for queue {}".format(queue_tag))
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None:
logger.info("Starting _migrate_logs_to_internal")
benclifford marked this conversation as resolved.
Show resolved Hide resolved

while not kill_event.is_set() or logs_queue.qsize() != 0:
logger.debug("""Checking STOP conditions for {} threads: {}, {}"""
.format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0))
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
kill_event.is_set(), logs_queue.qsize() != 0)
try:
x, addr = logs_queue.get(timeout=0.1)
except queue.Empty:
continue
else:
if queue_tag == 'priority' and x == 'STOP':
if x == 'STOP':
self.close()
elif queue_tag == 'priority': # implicitly not 'STOP'
assert isinstance(x, tuple)
assert len(x) == 2
assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \
"_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0])
self._dispatch_to_internal(x)
elif queue_tag == 'resource':
assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x)
assert x[0] == MessageType.RESOURCE_INFO, (
"_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, "
"got tag {}, message {}".format(x[0], x)
)
self._dispatch_to_internal(x)
elif queue_tag == 'node':
assert len(x) == 2, "expected message tuple to have exactly two elements"
assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue"

self._dispatch_to_internal(x)
elif queue_tag == "block":
self._dispatch_to_internal(x)
else:
logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}")
self._dispatch_to_internal(x)

def _dispatch_to_internal(self, x: Tuple) -> None:
assert isinstance(x, tuple)
assert len(x) == 2, "expected message tuple to have exactly two elements"

if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]:
self.pending_priority_queue.put(cast(Any, x))
elif x[0] == MessageType.RESOURCE_INFO:
Expand Down
Loading