diff --git a/lib/rucio/core/message.py b/lib/rucio/core/message.py index affeba5999..caff02a249 100644 --- a/lib/rucio/core/message.py +++ b/lib/rucio/core/message.py @@ -101,6 +101,7 @@ def retrieve_messages(bulk: int = 1000, event_type: "Optional[str]" = None, lock: bool = False, old_mode: bool = True, + service_filter: "Optional[str]" = None, *, session: "Session") -> "MessagesListType": """ Retrieve up to $bulk messages. @@ -123,7 +124,11 @@ Message.created_at ) stmt_subquery = filter_thread_work(session=session, query=stmt_subquery, total_threads=total_threads, thread_id=thread) - if event_type: + if service_filter: + stmt_subquery = stmt_subquery.where( + Message.services == service_filter + ) + elif event_type: stmt_subquery = stmt_subquery.where( + Message.event_type == event_type ) diff --git a/lib/rucio/daemons/hermes/hermes.py b/lib/rucio/daemons/hermes/hermes.py index 9462c6e952..979e54fad3 100644 --- a/lib/rucio/daemons/hermes/hermes.py +++ b/lib/rucio/daemons/hermes/hermes.py @@ -240,6 +240,45 @@ def aggregate_to_influx( return 204 +def build_message_dict( + bulk: int, + worker_number: int, + total_workers: int, + message_dict: dict[str, list[dict[str, Any]]], + logger: "LoggerFunction", + service: str = "", +) -> None: + """ + Build a dictionary with the keys being the services, and the values a list of the messages (built up of dictionary / json information) + + :param bulk: Integer number of messages to retrieve; default behaviour is the total, with query_by_service it is per service. + :param worker_number: Passed to thread in retrieve_messages as the identifier of the caller thread, an integer. + :param total_workers: Passed to total_threads as the maximum number of threads, an integer. + :param message_dict: Either an empty dictionary to be built, or built upon when using query_by_service. + :param logger: The logger object. + :param service: When passed, queries the message table for bulk number of messages for that specific service.
+ + :returns: None, but builds on the dictionary message_dict passed to this function (for when querying multiple services). + """ + start_time = time.time() + messages = retrieve_messages( + bulk=bulk, + old_mode=False, + thread=worker_number, + total_threads=total_workers, + service_filter=service, + ) + + if messages and service not in message_dict: + message_dict[service] = messages.copy() + logger( + logging.DEBUG, + "Retrieved %i messages in %s seconds", + len(messages), + time.time() - start_time, + ) + + def hermes(once: bool = False, bulk: int = 1000, sleep_time: int = 10) -> None: """ Creates a Hermes Worker that can submit messages to different services (InfluXDB, ElasticSearch, ActiveMQ) @@ -301,28 +340,31 @@ def run_once(heartbeat_handler: "HeartbeatHandler", bulk: int, **_kwargs) -> boo worker_number, total_workers, logger = heartbeat_handler.live() message_dict = {} - message_ids = [] - start_time = time.time() - messages = retrieve_messages(bulk=bulk, - old_mode=False, - thread=worker_number, - total_threads=total_workers) - - to_delete = [] - if messages: - for message in messages: - service = message["services"] - if service not in message_dict: - message_dict[service] = [] - message_dict[service].append(message) - message_ids.append(message["id"]) - logger( - logging.DEBUG, - "Retrieved %i messages retrieved in %s seconds", - len(messages), - time.time() - start_time, + query_by_service = config_get_bool("hermes", "query_by_service", default=False) + + # query_by_service toggles between collecting bulk messages across all services (False) and bulk messages from each individual service (True).
+ if query_by_service: + for service in services_list: + build_message_dict( + bulk=bulk, + worker_number=worker_number, + total_workers=total_workers, + message_dict=message_dict, + logger=logger, + service=service, + ) + else: + build_message_dict( + bulk=bulk, + worker_number=worker_number, + total_workers=total_workers, + message_dict=message_dict, + logger=logger ) + if message_dict: + to_delete = [] + if "influx" in message_dict and influx_endpoint: # For influxDB, bulk submission, either everything succeeds or fails t_time = time.time()