diff --git a/poetry.lock b/poetry.lock index 7b11f907..ef1f4785 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6166,4 +6166,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "~3.12" -content-hash = "9e2205ccfcf513cc6a7adef24999a4985171c0157f14e73e17333b81ebca3d9c" +content-hash = "de5faafa8fb2b356355ee4e406f31ebefdc03891f4e706a9949bfd610f1bf879" diff --git a/pyproject.toml b/pyproject.toml index a2143c0b..d0026714 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ python-decouple = "^3.8" redis = "^5.0.8" requests = "^2.32.3" scrubadub = {extras = ["all"], version = "^2.0.1"} +tenacity = "^9.0.0" tiktoken = "^0.7.0" [tool.poetry.group.test.dependencies] @@ -68,6 +69,9 @@ pythonpath = [ testpaths = [ "tests", ] +env_files = [ + ".env.test" +] [tool.poe.tasks] lint = "ruff check ." diff --git a/src/agents/common/exceptions.py b/src/agents/common/exceptions.py new file mode 100644 index 00000000..bfa402b1 --- /dev/null +++ b/src/agents/common/exceptions.py @@ -0,0 +1,6 @@ +class SubtasksMissingError(Exception): + """Exception raised when no subtasks are created for the given query.""" + + def __init__(self, query: str): + self.query = query + super().__init__(f"Subtasks are missing for the given query: {query}") diff --git a/src/agents/supervisor/agent.py b/src/agents/supervisor/agent.py index 427c1467..f6a8a106 100644 --- a/src/agents/supervisor/agent.py +++ b/src/agents/supervisor/agent.py @@ -1,3 +1,4 @@ +import logging from typing import Any, Literal, cast from langchain_core.embeddings import Embeddings @@ -9,9 +10,11 @@ from langgraph.graph import StateGraph from langgraph.graph.graph import CompiledGraph from pydantic import BaseModel, Field +from tenacity import after_log, before_log, retry, stop_after_attempt, wait_exponential from agents.common.constants import ( COMMON, + ERROR, FINALIZER, K8S_AGENT, KYMA_AGENT, @@ -19,6 +22,7 @@ NEXT, PLANNER, ) +from agents.common.exceptions import SubtasksMissingError from agents.common.response_converter import IResponseConverter, ResponseConverter from agents.common.state import Plan from agents.common.utils import create_node_output, filter_messages @@ -145,8 +149,15 @@ def _create_planner_chain(self, model: IModel) -> RunnableSequence: ) return self.planner_prompt | model.llm.with_structured_output(Plan) # type: ignore + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=4), + reraise=True, + before=before_log(logger, logging.WARNING), + after=after_log(logger, logging.DEBUG), + ) async def _invoke_planner(self, state: SupervisorState) -> Plan: - """Invoke the planner.""" + """Invoke the planner with retry logic using tenacity.""" filtered_messages = filter_messages_via_checks( state.messages, @@ -187,23 +198,16 @@ async def _plan(self, state: SupervisorState) -> dict[str, Any]: # if the Planner did not respond directly but also failed to create any subtasks, raise an exception if not plan.subtasks: - raise Exception( - f"No subtasks are created for the given query: {state.messages[-1].content}" - ) + raise SubtasksMissingError(state.messages[-1].content) # return the plan with the subtasks to be dispatched by the Router return create_node_output( next=ROUTER, subtasks=plan.subtasks, ) - except Exception as e: - logger.error(f"Error in planning: {e}") + except Exception: + logger.exception("Error in planning") return { - MESSAGES: [ - AIMessage( - content=f"Sorry, I encountered an error while processing the request. Error: {e}", - name=PLANNER, - ) - ] + ERROR: "Unexpected error while processing the request. Please try again later.", } def _final_response_chain(self, state: SupervisorState) -> RunnableSequence: diff --git a/src/services/conversation.py b/src/services/conversation.py index 29a8368a..376d43da 100644 --- a/src/services/conversation.py +++ b/src/services/conversation.py @@ -133,5 +133,4 @@ async def handle_request( async for chunk in self._companion_graph.astream( conversation_id, message, k8s_client ): - logger.debug(f"Sending chunk: {chunk}") yield chunk.encode() diff --git a/tests/unit/agents/supervisor/test_supervisor_agent.py b/tests/unit/agents/supervisor/test_supervisor_agent.py index 568b2866..710373d2 100644 --- a/tests/unit/agents/supervisor/test_supervisor_agent.py +++ b/tests/unit/agents/supervisor/test_supervisor_agent.py @@ -5,7 +5,7 @@ from langchain_core.messages import AIMessage, HumanMessage from langgraph.constants import END -from agents.common.constants import COMMON, PLANNER +from agents.common.constants import COMMON, ERROR, PLANNER from agents.common.state import CompanionState, Plan, SubTask from agents.k8s.agent import K8S_AGENT from agents.kyma.agent import KYMA_AGENT @@ -273,13 +273,7 @@ async def test_agent_generate_final_response( "What is a Kubernetes pod?", '{"response":null, "subtasks": null}', { - "messages": [ - AIMessage( - content="Sorry, I encountered an error while processing the request. " - "Error: No subtasks are created for the given query: What is a Kubernetes pod?", - name=PLANNER, - ) - ] + ERROR: "Unexpected error while processing the request. Please try again later.", }, None, ), @@ -305,13 +299,7 @@ async def test_agent_generate_final_response( "What is a Kubernetes service?", '{"response":null,"subtasks": [{"description": "Explain Kubernetes service", "assigned_to": "KubernetesAgent","status" : "pending"}]}', { - "messages": [ - AIMessage( - content="Sorry, I encountered an error while processing the request. " - "Error: fake error", - name=PLANNER, - ) - ] + ERROR: "Unexpected error while processing the request. Please try again later.", }, "fake error", ),