Separate the multi_queue behaviour from default, abstracted the building of the message dictionary to prevent code repetition
thysk committed Feb 19, 2025
1 parent e383672 commit 3eccea3
Showing 2 changed files with 68 additions and 21 deletions.
7 changes: 6 additions & 1 deletion lib/rucio/core/message.py
@@ -101,6 +101,7 @@ def retrieve_messages(bulk: int = 1000,
                       event_type: "Optional[str]" = None,
                       lock: bool = False,
                       old_mode: bool = True,
+                      service_filter: "Optional[str]" = None,
                       *, session: "Session") -> "MessagesListType":
     """
     Retrieve up to $bulk messages.
@@ -123,7 +124,11 @@
         Message.created_at
     )
     stmt_subquery = filter_thread_work(session=session, query=stmt_subquery, total_threads=total_threads, thread_id=thread)
-    if event_type:
+    if service_filter:
+        stmt_subquery = stmt_subquery.where(
+            Message.services == service_filter
+        )
+    elif event_type:
         stmt_subquery = stmt_subquery.where(
             Message.event_type == event_type
         )
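For context, here is a minimal sketch of how a caller might use the new service_filter argument. The bulk value, thread numbers and the "influx" service name are illustrative, and the database session is assumed to be injected by the function's session decorator, as in the hermes caller further down.

from rucio.core.message import retrieve_messages

# Fetch up to 500 unprocessed messages destined for one specific service.
# When service_filter is given it takes precedence over event_type,
# since the new branch is checked first.
messages = retrieve_messages(
    bulk=500,
    old_mode=False,
    thread=0,            # this worker's thread id (illustrative)
    total_threads=1,     # total number of workers (illustrative)
    service_filter="influx",
)
for message in messages:
    print(message["id"], message["services"])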
82 changes: 62 additions & 20 deletions lib/rucio/daemons/hermes/hermes.py
@@ -240,6 +240,45 @@ def aggregate_to_influx(
     return 204


+def build_message_dict(
+    bulk: int,
+    worker_number: int,
+    total_workers: int,
+    message_dict: dict[str, list[dict[str, Any]]],
+    logger: "LoggerFunction",
+    service: str = "",
+) -> None:
+    """
+    Build a dictionary with the keys being the services and the values a list of the messages (each built up of dictionary / JSON information).
+
+    :param bulk: Number of messages to retrieve; the default behaviour retrieves this many in total, query_by_service retrieves this many per service.
+    :param worker_number: Passed to thread in retrieve_messages as the integer identifier of the caller thread.
+    :param total_workers: Passed to total_threads as the maximum number of threads, as an integer.
+    :param message_dict: Either an empty dictionary to be built, or one to build upon when using query_by_service.
+    :param logger: The logger object.
+    :param service: When passed, queries the message table for bulk messages for that specific service.
+    :returns: None, but builds on the message_dict dictionary passed to this function (for when querying multiple services).
+    """
+    start_time = time.time()
+    messages = retrieve_messages(
+        bulk=bulk,
+        old_mode=False,
+        thread=worker_number,
+        total_threads=total_workers,
+        service_filter=service,
+    )
+
+    if messages and service not in message_dict:
+        message_dict[service] = messages.copy()
+        logger(
+            logging.DEBUG,
+            "Retrieved %i messages in %s seconds",
+            len(messages),
+            time.time() - start_time,
+        )


 def hermes(once: bool = False, bulk: int = 1000, sleep_time: int = 10) -> None:
     """
     Creates a Hermes Worker that can submit messages to different services (InfluXDB, ElasticSearch, ActiveMQ)
@@ -301,28 +340,31 @@ def run_once(heartbeat_handler: "HeartbeatHandler", bulk: int, **_kwargs) -> boo

     worker_number, total_workers, logger = heartbeat_handler.live()
     message_dict = {}
-    message_ids = []
-    start_time = time.time()
-    messages = retrieve_messages(bulk=bulk,
-                                 old_mode=False,
-                                 thread=worker_number,
-                                 total_threads=total_workers)
-
-    to_delete = []
-    if messages:
-        for message in messages:
-            service = message["services"]
-            if service not in message_dict:
-                message_dict[service] = []
-            message_dict[service].append(message)
-            message_ids.append(message["id"])
-        logger(
-            logging.DEBUG,
-            "Retrieved %i messages retrieved in %s seconds",
-            len(messages),
-            time.time() - start_time,
+    query_by_service = config_get_bool("hermes", "query_by_service", default=False)
+
+    # query_by_service toggles between collecting bulk messages across all services
+    # (when False) and collecting bulk messages from each service (when True).
+    if query_by_service:
+        for service in services_list:
+            build_message_dict(
+                bulk=bulk,
+                worker_number=worker_number,
+                total_workers=total_workers,
+                message_dict=message_dict,
+                logger=logger,
+                service=service,
+            )
+    else:
+        build_message_dict(
+            bulk=bulk,
+            worker_number=worker_number,
+            total_workers=total_workers,
+            message_dict=message_dict,
+            logger=logger
+        )
+
+    if message_dict:
+        to_delete = []

         if "influx" in message_dict and influx_endpoint:
             # For influxDB, bulk submission, either everything succeeds or fails
             t_time = time.time()
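As a usage sketch (not part of this commit): the per-service behaviour is controlled by a boolean query_by_service option in the hermes section of the Rucio configuration, read with config_get_bool("hermes", "query_by_service", default=False). The snippet below assumes that option is enabled, that "influx" and "activemq" stand in for the entries of services_list, and uses a stand-in logger in place of the one returned by heartbeat_handler.live().

import logging
from typing import Any

from rucio.daemons.hermes.hermes import build_message_dict

def logger(level: int, msg: str, *args: Any) -> None:
    # Stand-in for the logger returned by heartbeat_handler.live() in run_once.
    logging.log(level, msg, *args)

message_dict: dict[str, list[dict[str, Any]]] = {}

# With query_by_service enabled ([hermes] query_by_service = True), run_once
# calls the helper once per configured service, so each service gets its own
# bulk-sized batch of messages. The service names here are illustrative.
for service in ["influx", "activemq"]:
    build_message_dict(
        bulk=1000,
        worker_number=0,    # this worker's thread id (illustrative)
        total_workers=1,    # total number of hermes workers (illustrative)
        message_dict=message_dict,
        logger=logger,
        service=service,
    )

# message_dict now maps each service name to its retrieved messages, e.g.
# {"influx": [{"id": ..., "services": "influx", ...}], "activemq": [...]}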
