Update First Version
didiforgithub committed Jan 15, 2025
1 parent 1283701 commit d680212
Showing 5 changed files with 162 additions and 88 deletions.
47 changes: 37 additions & 10 deletions metagpt/ext/eflow/src/optimize_operators.py
@@ -1,10 +1,20 @@
from metagpt.ext.eflow.src.abstract import Operator
from metagpt.ext.eflow.src.prompts.opt_prompts import ATTRIBUTE_ON_RAW_BETTER_PROMPT, ATTRIBUTE_ON_OPT_BETTER_PROMPT, ATTRIBUTE_ON_BOTH_LOW_PROMPT, ATTRIBUTE_OVERALL_PROMPT, WORKFLOW_OPTIMIZE_PROMPT
from metagpt.ext.eflow.src.prompts.opt_prompts import (
ATTRIBUTE_ON_RAW_BETTER_PROMPT,
ATTRIBUTE_ON_OPT_BETTER_PROMPT,
ATTRIBUTE_ON_BOTH_LOW_PROMPT,
ATTRIBUTE_OVERALL_PROMPT,
WORKFLOW_OPTIMIZE_PROMPT,
WORKFLOW_INPUT,
WORKFLOW_CUSTOM_USE,
)
from metagpt.llm import LLM
import asyncio

class AttributeQueryOperator(Operator):
def __init__(self, model: LLM):
super().__init__(model, "Attribute")
self.max_retries = 3  # maximum number of retries
self.schema = [
{"name": "thought", "type": "str", "description": "Your thought of the attribution process for this case"},
{"name": "attribution_on_answer", "type": "str", "description": "The attribution of this case on the answer"},
@@ -36,10 +46,23 @@ async def __call__(self, raw_workflow, opt_workflow, question_description, raw_a
raw_answer=raw_answer,
opt_answer=opt_answer
)
response = await self._fill_node(
op_schema=self.schema, prompt=prompt, format="xml_fill", model=model,
)
return response

# retry logic
retries = 0
last_error = None

while retries < self.max_retries:
try:
response = await self._fill_node(
op_schema=self.schema, prompt=prompt, format="xml_fill", model=model,
)
return response
except Exception as e:
last_error = e
retries += 1
if retries == self.max_retries:
raise Exception(f"重试{self.max_retries}次后仍然失败: {str(last_error)}")
await asyncio.sleep(1 * retries) # 指数退避

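For reference, here is a minimal standalone sketch of the same retry pattern with true exponential backoff, using only the standard library. The helper name `retry_async` and its parameters are illustrative assumptions, not part of this codebase:

import asyncio

async def retry_async(coro_factory, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry an async callable with exponential backoff (illustrative helper)."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await coro_factory()
        except Exception as e:  # catching narrower exception types is preferable in practice
            last_error = e
            if attempt == max_retries - 1:
                raise Exception(f"Still failing after {max_retries} retries: {last_error}")
            # wait 1s, 2s, 4s, ... capped at max_delay
            await asyncio.sleep(min(base_delay * 2 ** attempt, max_delay))

An operator could then wrap its call as `await retry_async(lambda: self._fill_node(op_schema=self.schema, prompt=prompt, format="xml_fill", model=model))`.
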
class AttributeOverallOperator(Operator):
def __init__(self, model: LLM):
@@ -68,16 +91,20 @@ def __init__(self, model: LLM):
{"name": "prompt", "type": "str", "description": "The prompt of the workflow"},
]

async def __call__(self, optimize_signal, raw_workflow, custom_prompt, operator_description, model: LLM = None):
async def __call__(self, optimize_signal, raw_workflow, custom_prompt, operator_description, question_type="code generation", model: LLM = None):
"""
Version 1.0: optimize the raw_workflow using only the existing optimization signals
"""
prompt = WORKFLOW_OPTIMIZE_PROMPT.format(
optimize_signal=optimize_signal,
raw_workflow=raw_workflow,
custom_prompt=custom_prompt,
input_content = WORKFLOW_INPUT.format(
optimization_signals=optimize_signal,
workflow=raw_workflow,
prompt=custom_prompt,
operator_description=operator_description
)
system_content = WORKFLOW_OPTIMIZE_PROMPT.format(
question_type=question_type
)
prompt = system_content + input_content + WORKFLOW_CUSTOM_USE
response = await self._fill_node(
op_schema=self.schema, prompt=prompt, format="xml_fill", model=model,
)
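A hedged sketch of invoking this operator end to end; the variables (`llm`, `signals`, `workflow_code`, `prompts`, `op_desc`) are placeholders, and the response keys follow the schema fields the optimizer below reads:

operator = WorkflowOptimizeOperator(llm)
response = await operator(
    optimize_signal=signals,
    raw_workflow=workflow_code,
    custom_prompt=prompts,
    operator_description=op_desc,
    question_type="code generation",
)
# response is a dict with "modification", "workflow", and "prompt" keys,
# matching the fields the Optimizer extracts from it.
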
39 changes: 22 additions & 17 deletions metagpt/ext/eflow/src/optimizer.py
@@ -42,18 +42,22 @@ def _load(self):
pass

def _combine_optimize_signals(self):
optimize_signals = "对当前Workflow的改进信号:\n" + self.optimize_signal_compared_with_father + "\n"
optimize_signals = "Error Attribution and Optimization Signals for Current Workflow:\n" + self.optimize_signal_compared_with_father + "\nFailure Improvement Attribution under Current Workflow:"
for child_node in self.child_nodes:
optimize_signals += child_node.optimize_signal_compared_with_father
optimize_signals += child_node.optimize_signal_compared_with_father + "\n"
return optimize_signals

class Optimizer:
def __init__(self, dataset, optimize_model, operator_description:str, max_round:int =10):
def __init__(self, dataset, optimize_model, operator_description: str, root_path: str, max_round: int = 10):
self.dataset = dataset
self.optimize_model = optimize_model
optimize_llm_config = ModelsConfig.default().get(optimize_model)
self.optimize_model = create_llm_instance(optimize_llm_config)
self.optimize_model.cost_manager = CostManager()
self.workflow_optimize_operator = WorkflowOptimizeOperator(self.optimize_model)
self.operator_description = operator_description  # TODO: hardcoded for now; this should come from the Graph utils that were written earlier
self.max_round = max_round
self.root_path = root_path

def optimize_on_model_choice(self, workflow: Workflow, model_choices: List):
"""
@@ -62,7 +66,7 @@ def optimize_on_model_choice(self, workflow: Workflow, model_choices: List):
"""
pass

def optimize_on_workflow_structure(self, workflow, prompts, optimize_signal):
async def optimize_on_workflow_structure(self, workflow, prompts, optimize_signal):
"""
optimize the workflow's structure.
optimize signal is from the attributor.
@@ -74,34 +78,35 @@ def optimize_on_workflow_structure(self, workflow, prompts, optimize_signal):
2. attribute_table
3. optimize_signal
"""
response = self.workflow_optimize_operator(
response = await self.workflow_optimize_operator(
optimize_signal=optimize_signal,
raw_workflow=workflow,
custom_prompt=prompts,
operator_description=self.operator_description
)
modification = response["modification"]
optimize_workflow = response["workflow"]
optimize_prompts = response["prompt"]

logger.info(json.dumps(modification, indent=4))
logger.info(json.dumps(optimize_workflow, indent=4))
logger.info(json.dumps(optimize_prompts, indent=4))


# TODO: write the result into an OptimizeNode here, or into the folder for the corresponding ID so OptimizeNode can load it
return response

def optimize(self):
"""
full process of workflow optimization.
"""
depth = 0
layer_id = 0
for _ in range(self.max_round):

pass


if depth == 0:
cur_optimize_node = OptimizeNode(depth=depth, layer_id=layer_id, save_path=self.root_path)
# TODO: load the blank Workflow template here
else:
cur_save_path = f"{self.root_path}/{depth}_{layer_id}"
cur_optimize_node = OptimizeNode(depth=depth, layer_id=layer_id, save_path=cur_save_path)
# TODO: perform the tree-structured search here
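
For orientation, one hedged sketch of how the TODO-marked tree search might proceed. The `score`, `workflow`, and `prompts` attributes on OptimizeNode are assumptions for illustration only, not part of this codebase:

    async def _tree_search_sketch(self):
        # Hypothetical expansion loop: keep a frontier of explored nodes,
        # expand the best-scoring one each round, and attach the optimized
        # child back into the tree.
        root = OptimizeNode(depth=0, layer_id=0, save_path=self.root_path)
        frontier = [root]
        for round_id in range(1, self.max_round + 1):
            parent = max(frontier, key=lambda n: n.score)  # assumes nodes carry a score
            signal = parent._combine_optimize_signals()
            await self.optimize_on_workflow_structure(
                workflow=parent.workflow,  # assumes nodes store their workflow code
                prompts=parent.prompts,    # and their prompt_custom prompts
                optimize_signal=signal,
            )
            child = OptimizeNode(
                depth=parent.depth + 1,  # assumes depth is stored on the node
                layer_id=round_id,
                save_path=f"{self.root_path}/{parent.depth + 1}_{round_id}",
            )
            parent.child_nodes.append(child)
            frontier.append(child)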



class Attributor:

def __init__(self, dataset, attribute_query_model_name, attribute_overall_model_name, score_threshold=0.5):
self.dataset = dataset
self.score_threshold = score_threshold
78 changes: 53 additions & 25 deletions metagpt/ext/eflow/src/prompts/opt_prompts.py
@@ -105,31 +105,59 @@
In the "improvements" field: Suggest potential directions for improving this workflow
"""

# TODO: should the Both-Low case also be given a correct answer?
WORKFLOW_OPTIMIZE_PROMPT = """
You are building a workflow to solve {question_type} problems.
You are given a workflow and its corresponding prompt, which together can run on {question_type} problems.
Also, you are given the strengths, weaknesses, and potential improvements of this workflow.
Your task is to optimize the workflow to make it better.
## Optimization Rules
1. You can add, modify, or delete nodes, operators, or prompts in the workflow.
2. And in each optimization round, you can only make one modification. Make sure the optimized workflow and prompt are complete and correct to avoid runtime failure.
3. Ensure that all the prompts required by the current workflow from prompt_custom are included, and exclude any other prompts.
4. Output the modified workflow and all the necessary Prompts in prompt_custom (if needed).
5. The prompt you need to generate is only the one used in `prompt_custom.XXX` within Custom.
6. Other methods already have built-in prompts and are prohibited from being generated.
7. Only generate those needed for use in `prompt_custom`; please remove any unused prompts in prompt_custom.
8. The generated prompt must not contain any placeholders.
9. Considering information loss, complex workflows may yield better results, but insufficient information transmission between steps can lose the solution.
10. It's crucial to include necessary context during the process.
"""


WORKFLOW_OPTIMIZE_PROMPT = """
You are building a workflow and corresponding Prompt to jointly solve {type} problems.
Referring to the given workflow and prompt, which forms a basic example of a {type} solution approach,
please reconstruct and optimize them.
You can add, modify, or delete nodes, parameters, or prompts.
Include your single modification in XML tags in your reply.
Ensure they are complete and correct to avoid runtime failures.
When optimizing, you can incorporate critical thinking methods like review, revise, ensemble (generating multiple answers through different/similar prompts, then voting/integrating/checking the majority to obtain a final answer), selfAsk, etc. Consider
Python's loops (for, while, list comprehensions), conditional statements (if-elif-else, ternary operators), or machine learning techniques (e.g., linear regression, decision trees, neural networks, clustering).
The workflow complexity should not exceed 10.
Use logical and control flow (IF-ELSE, loops) for a more enhanced graphical representation.
Ensure that all the prompts required by the current graph from prompt_custom are included.
Exclude any other prompts.
Output the modified graph and all the necessary Prompts in prompt_custom (if needed).
The prompt you need to generate is only the one used in `prompt_custom.XXX` within Custom.
Other methods already have built-in prompts and are prohibited from being generated.
Only generate those needed for use in `prompt_custom`; please remove any unused prompts in prompt_custom.
The generated prompt must not contain any placeholders.
Considering information loss, complex graphs may yield better results, but insufficient information transmission can omit the solution.
It's crucial to include necessary context during the process.
"""

WORKFLOW_CUSTOM_USE = """\nHere's an example of using the `custom` method in workflow:
```
# You can write your own prompt in <prompt>prompt_custom</prompt> and then use it in the Custom method in the workflow
response = await self.custom(input=problem, instruction=prompt_custom.XXX_PROMPT)
# You can also concatenate previously generated string results in the input to provide more comprehensive contextual information.
# response = await self.custom(input=problem+f"xxx:{{xxx}}, xxx:{{xxx}}", instruction=prompt_custom.XXX_PROMPT)
# The output from the Custom method can be placed anywhere you need it, as shown in the example below
solution = await self.generate(problem=f"question:{{problem}}, xxx:{response['response']}")
```
Note: In custom, the input and instruction are directly concatenated (instruction + input), and placeholders are not supported. Please ensure to add comments and handle the concatenation externally.\n
**Introducing multiple operators at appropriate points can enhance performance. If you find that some provided operators are not yet used in the workflow, try incorporating them.**
"""

WORKFLOW_INPUT = """
Here is a workflow and the corresponding prompt (prompt only related to the custom method) that performed excellently in a previous iteration.
You must make further optimizations and improvements based on this workflow.
The modified workflow must differ from the provided example, and the specific differences should be noted within the <modification>xxx</modification> section.\n
<sample>
<optimization_signals>{optimization_signals}</optimization_signals>
<modification>(such as: add / delete / modify / ...)</modification>
<workflow>{workflow}</workflow>
<prompt>{prompt}</prompt>(only prompt_custom)
<operator_description>{operator_description}</operator_description>
</sample>
First, provide optimization ideas.
**Only one detail point can be modified at a time**, and no more than 5 lines of code may be changed per modification—extensive modifications are strictly prohibited to maintain project focus!
When introducing new functionalities in the workflow, please make sure to import the necessary libraries or modules yourself, except for operator, prompt_custom, create_llm_instance, and CostManager, which have already been automatically imported.
**Under no circumstances should workflow output None for any field.**
Use custom methods to restrict your output format, rather than using code (outside of the code, the system will extract answers based on certain rules and score them).
It is very important to format the workflow output answers, you can refer to the standard answer format in the log.
"""
22 changes: 22 additions & 0 deletions op_desc.json
@@ -0,0 +1,22 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"CustomCodeGenerate": {
"description": "Generates code based on customized input and instruction.",
"interface": "custom_code_generate(problem: str, entry_point: str, instruction: str) -> dict with key 'response' of type str"
},
"CodeGenerate": {
"description": "Generates code based on input and instruction.",
"interface": "code_generate(problem: str, entry_point: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str], problem: str) -> dict with key 'response' of type str"
},
"Test": {
"description": "Tests the solution using public test cases. If the solution fails, it reflects on the errors and attempts to modify the solution. Returns True and the solution if all tests pass after modifications. Returns False and the current solution if it still fails after modifications.",
"interface": "test(problem: str, solution: str, entry_point: str) -> dict with key 'result' of type bool and key 'solution' of type str"
}
}
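
A small sanity-check sketch for this file, illustrative only and using just the standard library; it verifies that each operator entry carries the two fields run.py's loaders expect:

import json

with open("op_desc.json") as f:
    operators = json.load(f)

for name, meta in operators.items():
    missing = {"description", "interface"} - meta.keys()
    assert not missing, f"{name} is missing fields: {missing}"
    print(f"{name}: {meta['interface']}")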
64 changes: 28 additions & 36 deletions run.py
@@ -1,37 +1,43 @@
import pandas as pd
import json
import asyncio

from metagpt.ext.eflow.src.optimizer import Attributor
from typing import List
from metagpt.ext.eflow.src.optimizer import Attributor, Optimizer
from metagpt.logs import logger

test_attribute_model = "gpt-4o-mini"
test_attribute_overall_model = "gpt-4o"
optimize_model = "claude-3-5-sonnet-20240620"
test_score_threshold = 0.5
dataset = "HumanEval"

def load_operators_description(operators: List[str]) -> str:
path = "op_desc.json"
operators_description = ""
for idx, operator in enumerate(operators, start=1):
operator_description = load_operator_description(idx, operator, path)
operators_description += f"{operator_description}\n"
return operators_description

def load_operator_description(id: int, operator_name: str, file_path: str) -> str:
with open(file_path, "r") as f:
operator_data = json.load(f)
matched_data = operator_data[operator_name]
desc = matched_data["description"]
interface = matched_data["interface"]
return f"{id}. {operator_name}: {desc}, with interface {interface}."
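
A quick usage sketch of the loaders above; the commented output shows what `load_operator_description` produces for the `Custom` entry of op_desc.json:

desc_line = load_operator_description(1, "Custom", "op_desc.json")
print(desc_line)
# Prints (as one line):
# 1. Custom: Generates anything based on customized input and instruction., with interface custom(input: str, instruction: str) -> dict with key 'response' of type str.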


test_attributor = Attributor(dataset, test_attribute_model, test_attribute_overall_model, test_score_threshold)
operator_desc = load_operators_description(["CodeGenerate", "Test", "Custom", "CustomCodeGenerate", "ScEnsemble"])
test_optimizer = Optimizer(dataset=dataset, optimize_model=optimize_model, operator_description=operator_desc, root_path="")


test_dataset_path = "metagpt/ext/aflow/data/humaneval_incremental.jsonl"
raw_workflow_data_path = "raw_workflow_data.csv"
opt_workflow_data_path = "opt_workflow_data.csv"

raw_workflow = """
GENERATE_PROMPT = "{{problem}}\nGenerate an answer to this question, without any additional test cases. "
REFLECTION_ON_PUBLIC_TEST_PROMPT = "
Given a code problem and a python code solution which failed to pass tests or execute, you need to analyze the reason for the failure and propose a better code solution:
### problem
{{problem}}
### Code Solution
{{solution}}
### Execution Result
{{exec_pass}}
#### Failed Test Case
{{test_fail}}
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
"
@@ -49,21 +55,6 @@ async def __call__(self, problem: str, entry_point: str):
"""

opt_workflow = """
GENERATE_PROMPT = "{{problem}}\nGenerate an answer to this question, without any additional test cases. "
REFLECTION_ON_PUBLIC_TEST_PROMPT = "
Given a code problem and a python code solution which failed to pass test or execute, you need to analyze the reason for the failure and propose a better code solution.:
### problem
{{problem}}
### Code Solution
{{solution}}
### Execution Result
{{exec_pass}}
#### Failed Test Case
{{test_fail}}
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
"
async def __call__(self, problem: str, entry_point: str):
@@ -103,16 +94,17 @@ def load_case_table(path, workflow):
return case_table




if __name__ == "__main__":

async def main(raw_case_table, opt_case_table):
attribute_table, attribute_cost, overall_cost = await test_attributor.attribute(raw_case_table, opt_case_table)
logger.info(json.dumps(attribute_table, indent=4))
opt_signal, attribute_cost, overall_cost = await test_attributor.attribute(raw_case_table, opt_case_table)
logger.info(json.dumps(opt_signal, indent=4))
logger.info(attribute_cost)
logger.info(overall_cost)

response = await test_optimizer.optimize_on_workflow_structure(workflow=opt_workflow, prompts="", optimize_signal=opt_signal)
logger.info(json.dumps(response, indent=4))

raw_case_table = load_case_table(raw_workflow_data_path, raw_workflow)
opt_case_table = load_case_table(opt_workflow_data_path, opt_workflow)

