Commit

Asynchronously handle discussion and voting queues to match the response queue

Add check for opinion response to reduce log noise in error events
NeonDaniel committed Feb 19, 2025
1 parent 0eb2e23 commit 4c43f11
Showing 2 changed files with 37 additions and 10 deletions.
neon_llm_core/chatbot.py: 6 changes (5 additions & 1 deletion)
@@ -190,9 +190,13 @@ def _get_llm_api_opinion(self, prompt: str, options: dict) -> Optional[LLMDiscussResponse]:
                 request_data=request_data.model_dump(),
                 target_queue=queue,
                 response_queue=response_queue)
+            if not resp_data:
+                LOG.error(f"Timed out waiting for response on "
+                          f"{self.mq_queue_config.vhost}/{queue}")
+                return None
             return LLMDiscussResponse.model_validate(obj=resp_data)
         except Exception as e:
-            LOG.exception(f"Failed to get response on "
+            LOG.exception(f"Error getting response on "
                           f"{self.mq_queue_config.vhost}/{queue}: {e}")
 
     def _get_llm_api_choice(self, prompt: str,
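Why the chatbot.py check helps: when the MQ request times out, resp_data comes back empty, and passing it straight to LLMDiscussResponse.model_validate raised a validation error that the surrounding except block recorded as a full traceback. The new guard turns that into a single error line. A minimal sketch of the behavior, assuming pydantic v2 semantics; DiscussResponse and its fields are simplified stand-ins, not the real LLMDiscussResponse:

from typing import Optional
from pydantic import BaseModel

class DiscussResponse(BaseModel):
    # Simplified stand-in for LLMDiscussResponse; fields are illustrative.
    message_id: str
    opinion: str

def parse_response(resp_data: Optional[dict]) -> Optional[DiscussResponse]:
    if not resp_data:
        # Without this guard, model_validate(None) raises a ValidationError,
        # which LOG.exception would record as a noisy traceback.
        print("Timed out waiting for response")
        return None
    return DiscussResponse.model_validate(obj=resp_data)

if __name__ == "__main__":
    print(parse_response(None))
    print(parse_response({"message_id": "1", "opinion": "looks good"}))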
neon_llm_core/rmq.py: 41 changes (32 additions & 9 deletions)
@@ -146,6 +146,32 @@ def handle_request(self, body: dict) -> Thread:
         t.start()
         return t
 
+    @create_mq_callback()
+    def handle_score_request(self, body: dict):
+        """
+        Handles score requests (vote) from MQ to LLM
+        :param body: request body (dict)
+        """
+        # Handle this asynchronously so multiple subminds can be handled
+        # concurrently
+        t = Thread(target=self._handle_score_async, args=(body,),
+                   daemon=True)
+        t.start()
+        return t
+
+    @create_mq_callback()
+    def handle_opinion_request(self, body: dict):
+        """
+        Handles opinion requests (discuss) from MQ to LLM
+        :param body: request body (dict)
+        """
+        # Handle this asynchronously so multiple subminds can be handled
+        # concurrently
+        t = Thread(target=self._handle_opinion_async, args=(body,),
+                   daemon=True)
+        t.start()
+        return t
+
     @create_mq_callback()
     def handle_persona_update(self, body: dict):
         """
@@ -184,14 +210,12 @@ def _handle_request_async(self, request: dict):
         api_response = LLMProposeResponse(message_id=message_id,
                                           response=response,
                                           routing_key=routing_key)
-        LOG.info(f"Sending response: {response}")
+        LOG.debug(f"Sending response: {response}")
         self.send_message(request_data=api_response.model_dump(),
                           queue=routing_key)
-        LOG.info(f"Handled ask request for message_id={message_id}")
+        LOG.info(f"Handled ask request for query={query}")
 
-    # TODO: Refactor score and opinion to work async like request
-    @create_mq_callback()
-    def handle_score_request(self, body: dict):
+    def _handle_score_async(self, body: dict):
         """
         Handles score requests (vote) from MQ to LLM
         :param body: request body (dict)
@@ -218,10 +242,9 @@ def handle_score_request(self, body: dict):
                                         sorted_answer_indexes=sorted_answer_idx)
         self.send_message(request_data=api_response.model_dump(),
                           queue=routing_key)
-        LOG.info(f"Handled score request for message_id={message_id}")
+        LOG.info(f"Handled score request for query={query}")
 
-    @create_mq_callback()
-    def handle_opinion_request(self, body: dict):
+    def _handle_opinion_async(self, body: dict):
         """
         Handles opinion requests (discuss) from MQ to LLM
         :param body: request body (dict)
@@ -255,7 +278,7 @@ def handle_opinion_request(self, body: dict):
                                           opinion=opinion)
         self.send_message(request_data=api_response.model_dump(),
                           queue=routing_key)
-        LOG.info(f"Handled ask request for message_id={message_id}")
+        LOG.info(f"Handled discuss request for query={query}")
 
     def _ask_model_for_opinion(self, respondent_nick: str, question: str,
                                answer: str, persona: dict) -> str:
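The rmq.py changes mirror the existing handle_request flow: the MQ callback returns as soon as it has spawned a daemon thread, and the worker method does the model call and publishes the reply, so several submind requests can be scored or discussed concurrently. A self-contained sketch of that dispatch pattern follows; the Thread usage matches the diff, while SketchService, the stubbed scoring work, and the print call are illustrative stand-ins for the real service, LLM call, and send_message:

from threading import Thread

class SketchService:
    def handle_score_request(self, body: dict) -> Thread:
        # Return quickly; the MQ consumer is not blocked while scoring runs.
        t = Thread(target=self._handle_score_async, args=(body,), daemon=True)
        t.start()
        return t

    def _handle_score_async(self, body: dict):
        # Placeholder for the real work: query the model, then publish the
        # result back on the routing key carried in the request body.
        result = {"message_id": body.get("message_id"),
                  "sorted_answer_indexes": []}
        print(f"would send {result} to queue {body.get('routing_key')}")

if __name__ == "__main__":
    svc = SketchService()
    threads = [svc.handle_score_request({"message_id": str(i),
                                         "routing_key": "scores"})
               for i in range(3)]
    for t in threads:
        t.join()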
