diff --git a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py
index bcc585731f..924cef71cf 100644
--- a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py
+++ b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py
@@ -95,18 +95,29 @@ def _create_long_term_memory(self, output) -> None:
         pass
 
     def _ask_human_input(self, final_answer: str) -> str:
-        """Prompt human input for final decision making."""
+        """Prompt human input with mode-appropriate messaging."""
         self._printer.print(
             content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
         )
 
-        self._printer.print(
-            content=(
+        # Training mode prompt (single iteration)
+        if self.crew and getattr(self.crew, "_train", False):
+            prompt = (
                 "\n\n=====\n"
-                "## Please provide feedback on the Final Result and the Agent's actions. "
-                "Respond with 'looks good' or a similar phrase when you're satisfied.\n"
+                "## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
+                "This will be used to train better versions of the agent.\n"
+                "Please provide detailed feedback about the result quality and reasoning process.\n"
                 "=====\n"
-            ),
-            color="bold_yellow",
-        )
+            )
+        # Regular human-in-the-loop prompt (multiple iterations)
+        else:
+            prompt = (
+                "\n\n=====\n"
+                "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
+                "Respond with 'looks good' to accept or provide specific improvement requests.\n"
+                "You can provide multiple rounds of feedback until satisfied.\n"
+                "=====\n"
+            )
+
+        self._printer.print(content=prompt, color="bold_yellow")
         return input()
diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py
index b9797193c5..b144872b19 100644
--- a/src/crewai/agents/crew_agent_executor.py
+++ b/src/crewai/agents/crew_agent_executor.py
@@ -100,6 +100,12 @@ def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
 
         try:
             formatted_answer = self._invoke_loop()
+        except AssertionError:
+            self._printer.print(
+                content="Agent failed to reach a final answer. This is likely a bug - please report it.",
+                color="red",
+            )
+            raise
         except Exception as e:
             if e.__class__.__module__.startswith("litellm"):
                 # Do not retry on litellm errors
@@ -115,7 +121,7 @@ def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
         self._create_long_term_memory(formatted_answer)
         return {"output": formatted_answer.output}
 
-    def _invoke_loop(self):
+    def _invoke_loop(self) -> AgentFinish:
         """
         Main loop to invoke the agent's thought process until it reaches a conclusion
         or the maximum number of iterations is reached.
@@ -161,6 +167,11 @@ def _invoke_loop(self):
             finally:
                 self.iterations += 1
 
+        # During the invoke loop, formatted_answer alternates between AgentAction
+        # (when the agent is using tools) and eventually becomes AgentFinish
+        # (when the agent reaches a final answer). This assertion confirms we've
+        # reached a final answer and helps type checking understand this transition.
+        assert isinstance(formatted_answer, AgentFinish)
         self._show_logs(formatted_answer)
         return formatted_answer
 
@@ -292,8 +303,11 @@ def _show_start_logs(self):
         self._printer.print(
             content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
         )
+        description = (
+            getattr(self.task, "description") if self.task else "Not Found"
+        )
         self._printer.print(
-            content=f"\033[95m## Task:\033[00m \033[92m{self.task.description}\033[00m"
+            content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
         )
 
     def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
@@ -418,58 +432,50 @@ def _handle_context_length(self) -> None:
         )
 
     def _handle_crew_training_output(
-        self, result: AgentFinish, human_feedback: str | None = None
+        self, result: AgentFinish, human_feedback: Optional[str] = None
     ) -> None:
-        """Function to handle the process of the training data."""
+        """Handle the process of saving training data."""
         agent_id = str(self.agent.id)  # type: ignore
+        train_iteration = (
+            getattr(self.crew, "_train_iteration", None) if self.crew else None
+        )
+
+        if train_iteration is None or not isinstance(train_iteration, int):
+            self._printer.print(
+                content="Invalid or missing train iteration. Cannot save training data.",
+                color="red",
+            )
+            return
 
-        # Load training data
         training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
-        training_data = training_handler.load()
-
-        # Check if training data exists, human input is not requested, and self.crew is valid
-        if training_data and not self.ask_for_human_input:
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if agent_id in training_data and isinstance(train_iteration, int):
-                    training_data[agent_id][train_iteration][
-                        "improved_output"
-                    ] = result.output
-                    training_handler.save(training_data)
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type or agent_id not in training data.",
-                        color="red",
-                    )
-            else:
-                self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
-                    color="red",
-                )
+        training_data = training_handler.load() or {}
 
-        if self.ask_for_human_input and human_feedback is not None:
-            training_data = {
+        # Initialize or retrieve agent's training data
+        agent_training_data = training_data.get(agent_id, {})
+
+        if human_feedback is not None:
+            # Save initial output and human feedback
+            agent_training_data[train_iteration] = {
                 "initial_output": result.output,
                 "human_feedback": human_feedback,
-                "agent": agent_id,
-                "agent_role": self.agent.role,  # type: ignore
             }
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if isinstance(train_iteration, int):
-                    CrewTrainingHandler(TRAINING_DATA_FILE).append(
-                        train_iteration, agent_id, training_data
-                    )
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type. Expected int.",
-                        color="red",
-                    )
+        else:
+            # Save improved output
+            if train_iteration in agent_training_data:
+                agent_training_data[train_iteration]["improved_output"] = result.output
             else:
                 self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
+                    content=(
+                        f"No existing training data for agent {agent_id} and iteration "
+                        f"{train_iteration}. Cannot save improved output."
+                    ),
                     color="red",
                 )
+                return
+
+        # Update the training data and save
+        training_data[agent_id] = agent_training_data
+        training_handler.save(training_data)
 
     def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
         prompt = prompt.replace("{input}", inputs["input"])
@@ -485,82 +491,103 @@ def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
         return {"role": role, "content": prompt}
 
     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-        """
-        Handles the human feedback loop, allowing the user to provide feedback
-        on the agent's output and determining if additional iterations are needed.
+        """Handle human feedback with different flows for training vs regular use.
 
-        Parameters:
-            formatted_answer (AgentFinish): The initial output from the agent.
+        Args:
+            formatted_answer: The initial AgentFinish result to get feedback on
 
         Returns:
-            AgentFinish: The final output after incorporating human feedback.
+            AgentFinish: The final answer after processing feedback
         """
-        while self.ask_for_human_input:
-            human_feedback = self._ask_human_input(formatted_answer.output)
-
-            if self.crew and self.crew._train:
-                self._handle_crew_training_output(formatted_answer, human_feedback)
-
-            # Make an LLM call to verify if additional changes are requested based on human feedback
-            additional_changes_prompt = self._i18n.slice(
-                "human_feedback_classification"
-            ).format(feedback=human_feedback)
-
-            retry_count = 0
-            llm_call_successful = False
-            additional_changes_response = None
-
-            while retry_count < MAX_LLM_RETRY and not llm_call_successful:
-                try:
-                    additional_changes_response = (
-                        self.llm.call(
-                            [
-                                self._format_msg(
-                                    additional_changes_prompt, role="system"
-                                )
-                            ],
-                            callbacks=self.callbacks,
-                        )
-                        .strip()
-                        .lower()
-                    )
-                    llm_call_successful = True
-                except Exception as e:
-                    retry_count += 1
+        human_feedback = self._ask_human_input(formatted_answer.output)
 
-                    self._printer.print(
-                        content=f"Error during LLM call to classify human feedback: {e}. Retrying... ({retry_count}/{MAX_LLM_RETRY})",
-                        color="red",
-                    )
+        if self._is_training_mode():
+            return self._handle_training_feedback(formatted_answer, human_feedback)
 
-            if not llm_call_successful:
-                self._printer.print(
-                    content="Error processing feedback after multiple attempts.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
-                break
+        return self._handle_regular_feedback(formatted_answer, human_feedback)
+
+    def _is_training_mode(self) -> bool:
+        """Check if crew is in training mode."""
+        return bool(self.crew and self.crew._train)
+
+    def _handle_training_feedback(
+        self, initial_answer: AgentFinish, feedback: str
+    ) -> AgentFinish:
+        """Process feedback for training scenarios with single iteration."""
+        self._printer.print(
+            content="\nProcessing training feedback.\n",
+            color="yellow",
+        )
+        self._handle_crew_training_output(initial_answer, feedback)
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        improved_answer = self._invoke_loop()
+        self._handle_crew_training_output(improved_answer)
+        self.ask_for_human_input = False
+        return improved_answer
 
-            if additional_changes_response == "false":
+    def _handle_regular_feedback(
+        self, current_answer: AgentFinish, initial_feedback: str
+    ) -> AgentFinish:
+        """Process feedback for regular use with potential multiple iterations."""
+        feedback = initial_feedback
+        answer = current_answer
+
+        while self.ask_for_human_input:
+            response = self._get_llm_feedback_response(feedback)
+
+            if not self._feedback_requires_changes(response):
                 self.ask_for_human_input = False
-            elif additional_changes_response == "true":
-                self.ask_for_human_input = True
-                # Add human feedback to messages
-                self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
-                # Invoke the loop again with updated messages
-                formatted_answer = self._invoke_loop()
-
-                if self.crew and self.crew._train:
-                    self._handle_crew_training_output(formatted_answer)
             else:
-                # Unexpected response
-                self._printer.print(
-                    content=f"Unexpected response from LLM: '{additional_changes_response}'. Assuming no additional changes requested.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
+                answer = self._process_feedback_iteration(feedback)
+                feedback = self._ask_human_input(answer.output)
 
-        return formatted_answer
+        return answer
+
+    def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
+        """Get LLM classification of whether feedback requires changes."""
+        prompt = self._i18n.slice("human_feedback_classification").format(
+            feedback=feedback
+        )
+        message = self._format_msg(prompt, role="system")
+
+        for retry in range(MAX_LLM_RETRY):
+            try:
+                response = self.llm.call([message], callbacks=self.callbacks)
+                return response.strip().lower() if response else None
+            except Exception as error:
+                self._log_feedback_error(retry, error)
+
+        self._log_max_retries_exceeded()
+        return None
+
+    def _feedback_requires_changes(self, response: Optional[str]) -> bool:
+        """Determine if feedback response indicates need for changes."""
+        return response == "true" if response else False
+
+    def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
+        """Process a single feedback iteration."""
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        return self._invoke_loop()
+
+    def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
+        """Log feedback processing errors."""
+        self._printer.print(
+            content=(
+                f"Error processing feedback: {error}. "
+                f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
+            ),
+            color="red",
+        )
+
+    def _log_max_retries_exceeded(self) -> None:
+        """Log when max retries for feedback processing are exceeded."""
+        self._printer.print(
+            content=(
+                f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
+                "Ending feedback loop."
+            ),
+            color="red",
+        )
 
     def _handle_max_iterations_exceeded(self, formatted_answer):
         """
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index b446670424..93987f3b86 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -494,21 +494,26 @@ def train(
         train_crew = self.copy()
         train_crew._setup_for_training(filename)
 
-        for n_iteration in range(n_iterations):
-            train_crew._train_iteration = n_iteration
-            train_crew.kickoff(inputs=inputs)
+        try:
+            for n_iteration in range(n_iterations):
+                train_crew._train_iteration = n_iteration
+                train_crew.kickoff(inputs=inputs)
 
-        training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
+            training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
 
-        for agent in train_crew.agents:
-            if training_data.get(str(agent.id)):
-                result = TaskEvaluator(agent).evaluate_training_data(
-                    training_data=training_data, agent_id=str(agent.id)
-                )
-
-                CrewTrainingHandler(filename).save_trained_data(
-                    agent_id=str(agent.role), trained_data=result.model_dump()
-                )
+            for agent in train_crew.agents:
+                if training_data.get(str(agent.id)):
+                    result = TaskEvaluator(agent).evaluate_training_data(
+                        training_data=training_data, agent_id=str(agent.id)
+                    )
+                    CrewTrainingHandler(filename).save_trained_data(
+                        agent_id=str(agent.role), trained_data=result.model_dump()
+                    )
+        except Exception as e:
+            self._logger.log("error", f"Training failed: {e}", color="red")
+            CrewTrainingHandler(TRAINING_DATA_FILE).clear()
+            CrewTrainingHandler(filename).clear()
+            raise
 
     def kickoff(
         self,
diff --git a/src/crewai/utilities/evaluators/task_evaluator.py b/src/crewai/utilities/evaluators/task_evaluator.py
index acfdceed63..2946292743 100644
--- a/src/crewai/utilities/evaluators/task_evaluator.py
+++ b/src/crewai/utilities/evaluators/task_evaluator.py
@@ -92,13 +92,34 @@ def evaluate_training_data(
         """
         output_training_data = training_data[agent_id]
-        final_aggregated_data = ""
-        for _, data in output_training_data.items():
+        final_aggregated_data = ""
+
+        for iteration, data in output_training_data.items():
+            improved_output = data.get("improved_output")
+            initial_output = data.get("initial_output")
+            human_feedback = data.get("human_feedback")
+
+            if not all([improved_output, initial_output, human_feedback]):
+                missing_fields = [
+                    field
+                    for field in ["improved_output", "initial_output", "human_feedback"]
+                    if not data.get(field)
+                ]
+                error_msg = (
+                    f"Critical training data error: Missing fields ({', '.join(missing_fields)}) "
+                    f"for agent {agent_id} in iteration {iteration}.\n"
+                    "This indicates a broken training process. "
+                    "Cannot proceed with evaluation.\n"
+                    "Please check your training implementation."
+                )
+                raise ValueError(error_msg)
+
             final_aggregated_data += (
-                f"Initial Output:\n{data.get('initial_output', '')}\n\n"
-                f"Human Feedback:\n{data.get('human_feedback', '')}\n\n"
-                f"Improved Output:\n{data.get('improved_output', '')}\n\n"
+                f"Iteration: {iteration}\n"
+                f"Initial Output:\n{initial_output}\n\n"
+                f"Human Feedback:\n{human_feedback}\n\n"
+                f"Improved Output:\n{improved_output}\n\n"
+                "------------------------------------------------\n\n"
             )
 
         evaluation_query = (
diff --git a/src/crewai/utilities/training_handler.py b/src/crewai/utilities/training_handler.py
index 5cadde619f..b6b3c38b62 100644
--- a/src/crewai/utilities/training_handler.py
+++ b/src/crewai/utilities/training_handler.py
@@ -1,3 +1,5 @@
+import os
+
 from crewai.utilities.file_handler import PickleHandler
 
 
@@ -29,3 +31,10 @@ def append(self, train_iteration: int, agent_id: str, new_data) -> None:
             data[agent_id] = {train_iteration: new_data}
 
         self.save(data)
+
+    def clear(self) -> None:
+        """Clear the training data by removing the file or resetting its contents."""
+        if os.path.exists(self.file_path):
+            with open(self.file_path, "wb") as file:
+                # Overwrite with an empty dictionary
+                self.save({})
diff --git a/tests/utilities/evaluators/test_task_evaluator.py b/tests/utilities/evaluators/test_task_evaluator.py
index 8a0be027a3..e4de1db620 100644
--- a/tests/utilities/evaluators/test_task_evaluator.py
+++ b/tests/utilities/evaluators/test_task_evaluator.py
@@ -48,9 +48,9 @@ def test_evaluate_training_data(converter_mock):
         mock.call(
             llm=original_agent.llm,
             text="Assess the quality of the training data based on the llm output, human feedback , and llm "
-            "output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
-            "1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
-            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- Provide "
+            "output improved result.\n\nIteration: data1\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
+            "1\n\nImproved Output:\nImproved output 1\n\n------------------------------------------------\n\nIteration: data2\nInitial Output:\nInitial output 2\n\nHuman "
+            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\n------------------------------------------------\n\nPlease provide:\n- Provide "
             "a list of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's "
             "performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific "
            "action items for future tasks. Ensure all key and specificpoints from the human feedback are "