Merge branch 'main' into bugfix/llm-env-fix-for-daniel
bhancockio authored Jan 30, 2025
2 parents c471f67 + 7bed63a commit a8124cd
Showing 6 changed files with 210 additions and 137 deletions.
27 changes: 19 additions & 8 deletions src/crewai/agents/agent_builder/base_agent_executor_mixin.py
@@ -95,18 +95,29 @@ def _create_long_term_memory(self, output) -> None:
         pass
 
     def _ask_human_input(self, final_answer: str) -> str:
-        """Prompt human input for final decision making."""
+        """Prompt human input with mode-appropriate messaging."""
         self._printer.print(
             content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
         )
 
-        self._printer.print(
-            content=(
-                "\n\n=====\n"
-                "## Please provide feedback on the Final Result and the Agent's actions. "
-                "Respond with 'looks good' or a similar phrase when you're satisfied.\n"
-                "=====\n"
-            ),
-            color="bold_yellow",
-        )
+        # Training mode prompt (single iteration)
+        if self.crew and getattr(self.crew, "_train", False):
+            prompt = (
+                "\n\n=====\n"
+                "## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
+                "This will be used to train better versions of the agent.\n"
+                "Please provide detailed feedback about the result quality and reasoning process.\n"
+                "=====\n"
+            )
+        # Regular human-in-the-loop prompt (multiple iterations)
+        else:
+            prompt = (
+                "\n\n=====\n"
+                "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
+                "Respond with 'looks good' to accept or provide specific improvement requests.\n"
+                "You can provide multiple rounds of feedback until satisfied.\n"
+                "=====\n"
+            )
+
+        self._printer.print(content=prompt, color="bold_yellow")
         return input()
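The change above routes both messages through a single prompt variable selected by mode. A minimal standalone sketch of that selection logic, using a SimpleNamespace stand-in rather than crewAI's actual Crew and Printer classes:

    from types import SimpleNamespace

    def ask_human_input(crew, final_answer: str) -> str:
        # Mirror of the mode check in the diff: training mode is flagged
        # by a private `_train` attribute on the crew object.
        print(f"## Final Result: {final_answer}")
        if crew and getattr(crew, "_train", False):
            prompt = "## TRAINING MODE: provide detailed feedback (single pass)."
        else:
            prompt = "## HUMAN FEEDBACK: respond 'looks good' or request changes."
        print(prompt)
        return input()

    # Illustrative crews: only the `_train` flag matters for prompt selection.
    training_crew = SimpleNamespace(_train=True)
    regular_crew = SimpleNamespace(_train=False)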
243 changes: 135 additions & 108 deletions src/crewai/agents/crew_agent_executor.py
@@ -100,6 +100,12 @@ def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
 
         try:
             formatted_answer = self._invoke_loop()
+        except AssertionError:
+            self._printer.print(
+                content="Agent failed to reach a final answer. This is likely a bug - please report it.",
+                color="red",
+            )
+            raise
         except Exception as e:
             if e.__class__.__module__.startswith("litellm"):
                 # Do not retry on litellm errors
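The new except AssertionError branch sits in front of the existing handler, which routes on the module that defines the exception class. A small sketch of that routing idea (a hypothetical should_retry helper, not part of the diff):

    def should_retry(exc: Exception) -> bool:
        # Exceptions raised by litellm are not retried, per the comment above;
        # anything else is considered retryable.
        module = exc.__class__.__module__ or ""
        return not module.startswith("litellm")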
@@ -115,7 +121,7 @@ def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
             self._create_long_term_memory(formatted_answer)
         return {"output": formatted_answer.output}
 
-    def _invoke_loop(self):
+    def _invoke_loop(self) -> AgentFinish:
         """
         Main loop to invoke the agent's thought process until it reaches a conclusion
         or the maximum number of iterations is reached.
@@ -161,6 +167,11 @@ def _invoke_loop(self):
             finally:
                 self.iterations += 1
 
+        # During the invoke loop, formatted_answer alternates between AgentAction
+        # (when the agent is using tools) and eventually becomes AgentFinish
+        # (when the agent reaches a final answer). This assertion confirms we've
+        # reached a final answer and helps type checking understand this transition.
+        assert isinstance(formatted_answer, AgentFinish)
         self._show_logs(formatted_answer)
         return formatted_answer
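The comment and assertion above rely on assert isinstance(...) narrowing a union type. A self-contained sketch of that idea, with illustrative dataclasses (crewAI's real AgentAction/AgentFinish differ):

    from dataclasses import dataclass
    from typing import Union

    @dataclass
    class AgentAction:
        tool: str

    @dataclass
    class AgentFinish:
        output: str

    def final_output(answer: Union[AgentAction, AgentFinish]) -> str:
        # Raises AssertionError if the loop somehow exits on an AgentAction;
        # afterwards a type checker treats `answer` as AgentFinish.
        assert isinstance(answer, AgentFinish)
        return answer.output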

@@ -292,8 +303,11 @@ def _show_start_logs(self):
             self._printer.print(
                 content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
             )
+            description = (
+                getattr(self.task, "description") if self.task else "Not Found"
+            )
             self._printer.print(
-                content=f"\033[95m## Task:\033[00m \033[92m{self.task.description}\033[00m"
+                content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
             )
 
     def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
@@ -418,58 +432,50 @@ def _handle_context_length(self) -> None:
             )
 
     def _handle_crew_training_output(
-        self, result: AgentFinish, human_feedback: str | None = None
+        self, result: AgentFinish, human_feedback: Optional[str] = None
     ) -> None:
-        """Function to handle the process of the training data."""
+        """Handle the process of saving training data."""
         agent_id = str(self.agent.id)  # type: ignore
+        train_iteration = (
+            getattr(self.crew, "_train_iteration", None) if self.crew else None
+        )
+
+        if train_iteration is None or not isinstance(train_iteration, int):
+            self._printer.print(
+                content="Invalid or missing train iteration. Cannot save training data.",
+                color="red",
+            )
+            return
 
         # Load training data
         training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
-        training_data = training_handler.load()
-
-        # Check if training data exists, human input is not requested, and self.crew is valid
-        if training_data and not self.ask_for_human_input:
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if agent_id in training_data and isinstance(train_iteration, int):
-                    training_data[agent_id][train_iteration][
-                        "improved_output"
-                    ] = result.output
-                    training_handler.save(training_data)
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type or agent_id not in training data.",
-                        color="red",
-                    )
-            else:
-                self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
-                    color="red",
-                )
+        training_data = training_handler.load() or {}
 
-        if self.ask_for_human_input and human_feedback is not None:
-            training_data = {
+        # Initialize or retrieve agent's training data
+        agent_training_data = training_data.get(agent_id, {})
+
+        if human_feedback is not None:
+            # Save initial output and human feedback
+            agent_training_data[train_iteration] = {
                 "initial_output": result.output,
                 "human_feedback": human_feedback,
                 "agent": agent_id,
                 "agent_role": self.agent.role,  # type: ignore
             }
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if isinstance(train_iteration, int):
-                    CrewTrainingHandler(TRAINING_DATA_FILE).append(
-                        train_iteration, agent_id, training_data
-                    )
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type. Expected int.",
-                        color="red",
-                    )
-            else:
+        else:
+            # Save improved output
+            if train_iteration in agent_training_data:
+                agent_training_data[train_iteration]["improved_output"] = result.output
+            else:
                 self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
+                    content=(
+                        f"No existing training data for agent {agent_id} and iteration "
+                        f"{train_iteration}. Cannot save improved output."
+                    ),
                     color="red",
                 )
+                return
+
+        # Update the training data and save
+        training_data[agent_id] = agent_training_data
+        training_handler.save(training_data)
 
     def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
         prompt = prompt.replace("{input}", inputs["input"])
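Read together, the two branches above maintain one nested mapping per agent. A sketch of the resulting structure, with field names taken from the diff and illustrative values:

    training_data = {
        "agent-uuid": {                # keyed by str(agent.id)
            0: {                       # keyed by train iteration (int)
                "initial_output": "first draft",         # saved with human feedback
                "human_feedback": "tighten the summary",
                "agent": "agent-uuid",
                "agent_role": "Researcher",
                "improved_output": "revised draft",      # saved on the follow-up call
            },
        },
    }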
@@ -485,82 +491,103 @@ def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
         return {"role": role, "content": prompt}
 
     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-        """
-        Handles the human feedback loop, allowing the user to provide feedback
-        on the agent's output and determining if additional iterations are needed.
+        """Handle human feedback with different flows for training vs regular use.
 
-        Parameters:
-            formatted_answer (AgentFinish): The initial output from the agent.
+        Args:
+            formatted_answer: The initial AgentFinish result to get feedback on
 
         Returns:
-            AgentFinish: The final output after incorporating human feedback.
+            AgentFinish: The final answer after processing feedback
         """
-        while self.ask_for_human_input:
-            human_feedback = self._ask_human_input(formatted_answer.output)
-
-            if self.crew and self.crew._train:
-                self._handle_crew_training_output(formatted_answer, human_feedback)
-
-            # Make an LLM call to verify if additional changes are requested based on human feedback
-            additional_changes_prompt = self._i18n.slice(
-                "human_feedback_classification"
-            ).format(feedback=human_feedback)
-
-            retry_count = 0
-            llm_call_successful = False
-            additional_changes_response = None
-
-            while retry_count < MAX_LLM_RETRY and not llm_call_successful:
-                try:
-                    additional_changes_response = (
-                        self.llm.call(
-                            [
-                                self._format_msg(
-                                    additional_changes_prompt, role="system"
-                                )
-                            ],
-                            callbacks=self.callbacks,
-                        )
-                        .strip()
-                        .lower()
-                    )
-                    llm_call_successful = True
-                except Exception as e:
-                    retry_count += 1
-                    self._printer.print(
-                        content=f"Error during LLM call to classify human feedback: {e}. Retrying... ({retry_count}/{MAX_LLM_RETRY})",
-                        color="red",
-                    )
-
-            if not llm_call_successful:
-                self._printer.print(
-                    content="Error processing feedback after multiple attempts.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
-                break
-
-            if additional_changes_response == "false":
-                self.ask_for_human_input = False
-            elif additional_changes_response == "true":
-                self.ask_for_human_input = True
-                # Add human feedback to messages
-                self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
-                # Invoke the loop again with updated messages
-                formatted_answer = self._invoke_loop()
-
-                if self.crew and self.crew._train:
-                    self._handle_crew_training_output(formatted_answer)
-            else:
-                # Unexpected response
-                self._printer.print(
-                    content=f"Unexpected response from LLM: '{additional_changes_response}'. Assuming no additional changes requested.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
-
-        return formatted_answer
+        human_feedback = self._ask_human_input(formatted_answer.output)
+
+        if self._is_training_mode():
+            return self._handle_training_feedback(formatted_answer, human_feedback)
+
+        return self._handle_regular_feedback(formatted_answer, human_feedback)
+
+    def _is_training_mode(self) -> bool:
+        """Check if crew is in training mode."""
+        return bool(self.crew and self.crew._train)
+
+    def _handle_training_feedback(
+        self, initial_answer: AgentFinish, feedback: str
+    ) -> AgentFinish:
+        """Process feedback for training scenarios with single iteration."""
+        self._printer.print(
+            content="\nProcessing training feedback.\n",
+            color="yellow",
+        )
+        self._handle_crew_training_output(initial_answer, feedback)
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        improved_answer = self._invoke_loop()
+        self._handle_crew_training_output(improved_answer)
+        self.ask_for_human_input = False
+        return improved_answer
+
+    def _handle_regular_feedback(
+        self, current_answer: AgentFinish, initial_feedback: str
+    ) -> AgentFinish:
+        """Process feedback for regular use with potential multiple iterations."""
+        feedback = initial_feedback
+        answer = current_answer
+
+        while self.ask_for_human_input:
+            response = self._get_llm_feedback_response(feedback)
+
+            if not self._feedback_requires_changes(response):
+                self.ask_for_human_input = False
+            else:
+                answer = self._process_feedback_iteration(feedback)
+                feedback = self._ask_human_input(answer.output)
+
+        return answer
+
+    def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
+        """Get LLM classification of whether feedback requires changes."""
+        prompt = self._i18n.slice("human_feedback_classification").format(
+            feedback=feedback
+        )
+        message = self._format_msg(prompt, role="system")
+
+        for retry in range(MAX_LLM_RETRY):
+            try:
+                response = self.llm.call([message], callbacks=self.callbacks)
+                return response.strip().lower() if response else None
+            except Exception as error:
+                self._log_feedback_error(retry, error)
+
+        self._log_max_retries_exceeded()
+        return None
+
+    def _feedback_requires_changes(self, response: Optional[str]) -> bool:
+        """Determine if feedback response indicates need for changes."""
+        return response == "true" if response else False
+
+    def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
+        """Process a single feedback iteration."""
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        return self._invoke_loop()
+
+    def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
+        """Log feedback processing errors."""
+        self._printer.print(
+            content=(
+                f"Error processing feedback: {error}. "
+                f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
+            ),
+            color="red",
+        )
+
+    def _log_max_retries_exceeded(self) -> None:
+        """Log when max retries for feedback processing are exceeded."""
+        self._printer.print(
+            content=(
+                f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
+                "Ending feedback loop."
+            ),
+            color="red",
+        )
 
     def _handle_max_iterations_exceeded(self, formatted_answer):
         """
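The _get_llm_feedback_response method above is a bounded-retry loop around one LLM call. A generic sketch of the same pattern, with a call parameter standing in for the flaky operation:

    from typing import Callable, Optional

    MAX_LLM_RETRY = 3  # mirrors the constant used in the diff

    def classify_with_retry(call: Callable[[], str]) -> Optional[str]:
        for attempt in range(MAX_LLM_RETRY):
            try:
                response = call()
                return response.strip().lower() if response else None
            except Exception as error:
                print(f"Error processing feedback: {error}. "
                      f"Retrying... ({attempt + 1}/{MAX_LLM_RETRY})")
        # None is treated as "no changes requested" by _feedback_requires_changes.
        return None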
31 changes: 18 additions & 13 deletions src/crewai/crew.py
@@ -494,21 +494,26 @@ def train(
         train_crew = self.copy()
         train_crew._setup_for_training(filename)
 
-        for n_iteration in range(n_iterations):
-            train_crew._train_iteration = n_iteration
-            train_crew.kickoff(inputs=inputs)
+        try:
+            for n_iteration in range(n_iterations):
+                train_crew._train_iteration = n_iteration
+                train_crew.kickoff(inputs=inputs)
 
-        training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
+            training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
 
-        for agent in train_crew.agents:
-            if training_data.get(str(agent.id)):
-                result = TaskEvaluator(agent).evaluate_training_data(
-                    training_data=training_data, agent_id=str(agent.id)
-                )
-
-                CrewTrainingHandler(filename).save_trained_data(
-                    agent_id=str(agent.role), trained_data=result.model_dump()
-                )
+            for agent in train_crew.agents:
+                if training_data.get(str(agent.id)):
+                    result = TaskEvaluator(agent).evaluate_training_data(
+                        training_data=training_data, agent_id=str(agent.id)
+                    )
+                    CrewTrainingHandler(filename).save_trained_data(
+                        agent_id=str(agent.role), trained_data=result.model_dump()
+                    )
+        except Exception as e:
+            self._logger.log("error", f"Training failed: {e}", color="red")
+            CrewTrainingHandler(TRAINING_DATA_FILE).clear()
+            CrewTrainingHandler(filename).clear()
+            raise
 
     def kickoff(
         self,
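The train() change wraps the whole run in try/except so partial artifacts are cleared before the error propagates. A minimal sketch of that cleanup-on-failure pattern, with an illustrative handler rather than the real CrewTrainingHandler:

    class TrainingHandler:
        """Illustrative stand-in for CrewTrainingHandler; not the real API."""
        def __init__(self) -> None:
            self.data: dict = {}

        def clear(self) -> None:
            self.data = {}

    def run_training(handler: TrainingHandler, n_iterations: int) -> None:
        try:
            for i in range(n_iterations):
                handler.data[i] = f"result-{i}"  # stands in for kickoff/evaluation
        except Exception as e:
            print(f"Training failed: {e}")
            handler.clear()  # drop partial training data before propagating
            raise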
(The remaining three changed files are not shown.)
