Skip to content

Commit

Permalink
Add retry mechanism for subtask generation
Browse files Browse the repository at this point in the history
- Add tenacity library for retry logic in planner invocation
-  Return erorr message in error field
- Create custom SubtasksMissingError exception for handling missing subtasks
- Update supervisor agent to use retry decorator with exponential backoff
  • Loading branch information
muralov committed Feb 4, 2025
1 parent 190ecec commit 8335b5a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 29 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ python-decouple = "^3.8"
redis = "^5.0.8"
requests = "^2.32.3"
scrubadub = {extras = ["all"], version = "^2.0.1"}
tenacity = "^9.0.0"
tiktoken = "^0.7.0"

[tool.poetry.group.test.dependencies]
Expand Down Expand Up @@ -68,6 +69,9 @@ pythonpath = [
testpaths = [
"tests",
]
env_files = [
".env.test"
]

[tool.poe.tasks]
lint = "ruff check ."
Expand Down
6 changes: 6 additions & 0 deletions src/agents/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class SubtasksMissingError(Exception):
"""Exception raised when no subtasks are created for the given query."""

def __init__(self, query: str):
self.query = query
super().__init__(f"Subtasks are missing for the given query: {query}")
28 changes: 16 additions & 12 deletions src/agents/supervisor/agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Any, Literal, cast

from langchain_core.embeddings import Embeddings
Expand All @@ -9,16 +10,19 @@
from langgraph.graph import StateGraph
from langgraph.graph.graph import CompiledGraph
from pydantic import BaseModel, Field
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_exponential

from agents.common.constants import (
COMMON,
ERROR,
FINALIZER,
K8S_AGENT,
KYMA_AGENT,
MESSAGES,
NEXT,
PLANNER,
)
from agents.common.exceptions import SubtasksMissingError
from agents.common.response_converter import IResponseConverter, ResponseConverter
from agents.common.state import Plan
from agents.common.utils import create_node_output, filter_messages
Expand Down Expand Up @@ -145,8 +149,15 @@ def _create_planner_chain(self, model: IModel) -> RunnableSequence:
)
return self.planner_prompt | model.llm.with_structured_output(Plan) # type: ignore

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=4),
reraise=True,
before=before_log(logger, logging.WARNING),
after=after_log(logger, logging.DEBUG),
)
async def _invoke_planner(self, state: SupervisorState) -> Plan:
"""Invoke the planner."""
"""Invoke the planner with retry logic using tenacity."""

filtered_messages = filter_messages_via_checks(
state.messages,
Expand Down Expand Up @@ -187,23 +198,16 @@ async def _plan(self, state: SupervisorState) -> dict[str, Any]:

# if the Planner did not respond directly but also failed to create any subtasks, raise an exception
if not plan.subtasks:
raise Exception(
f"No subtasks are created for the given query: {state.messages[-1].content}"
)
raise SubtasksMissingError(state.messages[-1].content)
# return the plan with the subtasks to be dispatched by the Router
return create_node_output(
next=ROUTER,
subtasks=plan.subtasks,
)
except Exception as e:
logger.error(f"Error in planning: {e}")
except Exception:
logger.exception("Error in planning")
return {
MESSAGES: [
AIMessage(
content=f"Sorry, I encountered an error while processing the request. Error: {e}",
name=PLANNER,
)
]
ERROR: "Unexpected error while processing the request. Please try again later.",
}

def _final_response_chain(self, state: SupervisorState) -> RunnableSequence:
Expand Down
1 change: 0 additions & 1 deletion src/services/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,4 @@ async def handle_request(
async for chunk in self._companion_graph.astream(
conversation_id, message, k8s_client
):
logger.debug(f"Sending chunk: {chunk}")
yield chunk.encode()
18 changes: 3 additions & 15 deletions tests/unit/agents/supervisor/test_supervisor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.constants import END

from agents.common.constants import COMMON, PLANNER
from agents.common.constants import COMMON, ERROR, PLANNER
from agents.common.state import CompanionState, Plan, SubTask
from agents.k8s.agent import K8S_AGENT
from agents.kyma.agent import KYMA_AGENT
Expand Down Expand Up @@ -273,13 +273,7 @@ async def test_agent_generate_final_response(
"What is a Kubernetes pod?",
'{"response":null, "subtasks": null}',
{
"messages": [
AIMessage(
content="Sorry, I encountered an error while processing the request. "
"Error: No subtasks are created for the given query: What is a Kubernetes pod?",
name=PLANNER,
)
]
ERROR: "Unexpected error while processing the request. Please try again later.",
},
None,
),
Expand All @@ -305,13 +299,7 @@ async def test_agent_generate_final_response(
"What is a Kubernetes service?",
'{"response":null,"subtasks": [{"description": "Explain Kubernetes service", "assigned_to": "KubernetesAgent","status" : "pending"}]}',
{
"messages": [
AIMessage(
content="Sorry, I encountered an error while processing the request. "
"Error: fake error",
name=PLANNER,
)
]
ERROR: "Unexpected error while processing the request. Please try again later.",
},
"fake error",
),
Expand Down

0 comments on commit 8335b5a

Please sign in to comment.