Skip to content

Commit

Permalink
feat: add swarm output schemas and agent fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Occupying-Mars committed Jan 10, 2025
1 parent b408faf commit 0ce21da
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 39 deletions.
70 changes: 31 additions & 39 deletions new_features_examples/swarmarrange/swarm_arange_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,49 +171,41 @@
regulatory_specialist,
]

# Define multiple flow patterns
flows = [
"Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance",
"Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist",
"Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance",
]

# Create instances of AgentRearrange for each flow pattern
blackstone_acquisition_analysis = AgentRearrange(
name="Blackstone-Acquisition-Analysis",
# Example 1: Automatic sequential flow
acquisition_analysis = AgentRearrange(
name="Acquisition-Analysis",
description="A system for analyzing potential acquisitions",
agents=agents,
flow=flows[0],
agents=agents
)
acquisition_analysis.set_flow_from_task("Analyze this acquisition step by step, starting with industry analysis and ending with financial recommendations")

blackstone_investment_strategy = AgentRearrange(
name="Blackstone-Investment-Strategy",
description="A system for evaluating investment opportunities",
agents=agents,
flow=flows[1],
# Example 2: Automatic parallel flow
market_research = AgentRearrange(
name="Market-Research",
description="A system for parallel market research",
agents=agents
)

blackstone_market_analysis = AgentRearrange(
name="Blackstone-Market-Analysis",
description="A system for analyzing market trends and opportunities",
agents=agents,
flow=flows[2],
market_research.set_flow_from_task("Analyze multiple market segments simultaneously to identify opportunities")

# Example 3: Hybrid flow with agent groups
agent_groups = {
"research": ["Industry-Analyst", "Tech-Expert", "Market-Researcher"],
"compliance": ["Regulatory-Specialist"],
"management": ["Managing-Director", "VP-Finance"]
}

investment_strategy = AgentRearrange(
name="Investment-Strategy",
description="A system for comprehensive investment analysis",
agents=agents
)
investment_strategy.flow = investment_strategy.generate_flow("hybrid", agent_groups)

swarm_arrange = SwarmRearrange(
name="Blackstone-Swarm",
description="A swarm that processes tasks concurrently using multiple agents and rearranges the flow based on the task requirements.",
swarms=[
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}",
max_loops=1,
)
# Run examples
print("Acquisition Analysis Flow:", acquisition_analysis.flow)
print("Market Research Flow:", market_research.flow)
print("Investment Strategy Flow:", investment_strategy.flow)

print(
swarm_arrange.run(
"Analyze NVIDIA's performance, market trends, and potential for growth in the AI industry"
)
)
# Example task execution
result = acquisition_analysis.run("Analyze potential acquisition of a tech startup valued at $50M")
print("\nAcquisition Analysis Result:", result)
61 changes: 61 additions & 0 deletions swarms/structs/rearrange.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,67 @@ def to_dict(self) -> Dict[str, Any]:
for attr_name, attr_value in self.__dict__.items()
}

def generate_flow(self, task_type: str = "sequential", agent_groups: Dict[str, List[str]] = None) -> str:

Check failure

Code scanning / Pyre

Incompatible variable type Error

Incompatible variable type [9]: agent_groups is declared to have type Dict[str, List[str]] but is used as type None.
"""
Automatically generates a flow pattern based on task type and agent groupings.
Args:
task_type (str): Type of flow pattern ('sequential', 'parallel', or 'hybrid')
agent_groups (Dict[str, List[str]], optional): Groups of agents that should work together
Format: {"group1": ["agent1", "agent2"], "group2": ["agent3", "agent4"]}
Returns:
str: Generated flow pattern
"""
if not self.agents:
raise ValueError("No agents available to generate flow.")

agent_names = list(self.agents.keys())

if task_type == "sequential":
# Simple sequential flow: a -> b -> c
return " -> ".join(agent_names)

elif task_type == "parallel":
# All agents work in parallel: a, b, c
return ", ".join(agent_names)

elif task_type == "hybrid" and agent_groups:
# Hybrid flow using agent groups: (a, b) -> (c, d) -> e
flow_parts = []
for group in agent_groups.values():
valid_agents = [a for a in group if a in agent_names]
if valid_agents:
flow_parts.append(", ".join(valid_agents))
return " -> ".join(flow_parts)

else:
raise ValueError("Invalid task_type or missing agent_groups for hybrid flow")

def set_flow_from_task(self, task: str):
"""
Analyzes the task description and automatically sets an appropriate flow pattern.
Args:
task (str): Task description to analyze
"""
# Keywords indicating parallel processing needs
parallel_keywords = ["compare", "analyze multiple", "parallel", "simultaneously"]
# Keywords indicating sequential processing needs
sequential_keywords = ["step by step", "in sequence", "following", "then"]

task_lower = task.lower()

if any(keyword in task_lower for keyword in parallel_keywords):
self.flow = self.generate_flow("parallel")
elif any(keyword in task_lower for keyword in sequential_keywords):
self.flow = self.generate_flow("sequential")
else:
# Default to sequential flow if no clear indicators
self.flow = self.generate_flow("sequential")

logger.info(f"Automatically set flow pattern: {self.flow}")


def rearrange(
agents: List[Agent] = None,
Expand Down

0 comments on commit 0ce21da

Please sign in to comment.