From 55018c636a3c0f7d49a21c1a7e5a69be388dc20f Mon Sep 17 00:00:00 2001 From: mike dupont Date: Sat, 7 Dec 2024 12:54:06 -0500 Subject: [PATCH 1/3] adding emacs --- .gitignore | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/.gitignore b/.gitignore index 9f6e25b6b..65ce495ce 100644 --- a/.gitignore +++ b/.gitignore @@ -224,3 +224,52 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ .vscode/settings.json +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* + +# Org-mode +.org-id-locations +*_archive + +# flymake-mode +*_flymake.* + +# eshell files +/eshell/history +/eshell/lastdir + +# elpa packages +/elpa/ + +# reftex files +*.rel + +# AUCTeX auto folder +/auto/ + +# cask packages +.cask/ +dist/ + +# Flycheck +flycheck_*.el + +# server auth directory +/server/ + +# projectiles files +.projectile + +# directory configuration +.dir-locals.el + +# network security +/network-security.data + From 449b2db79ed82532a3e2c915510079ebdf24fd6d Mon Sep 17 00:00:00 2001 From: mike dupont Date: Sat, 7 Dec 2024 16:00:03 -0500 Subject: [PATCH 2/3] adding main --- api/agent_api.py | 3 + api/main.py | 638 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 641 insertions(+) create mode 100644 api/main.py diff --git a/api/agent_api.py b/api/agent_api.py index d1968d9d8..83d051015 100644 --- a/api/agent_api.py +++ b/api/agent_api.py @@ -619,6 +619,7 @@ def create_app() -> FastAPI: if __name__ == "__main__": # Configure uvicorn logging + print("in main") logger.info("API Starting") uvicorn.run( "main:create_app", @@ -627,3 +628,5 @@ def create_app() -> FastAPI: reload=True, workers=4, ) +else: + print("not in main") diff --git a/api/main.py b/api/main.py new file mode 100644 index 000000000..768e8d962 --- /dev/null +++ b/api/main.py @@ -0,0 +1,638 @@ +import os +from fastapi import ( + FastAPI, + HTTPException, + status, + Query, + BackgroundTasks, +) +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field +from typing import Optional, Dict, Any, List +from loguru import logger +import uvicorn +from datetime import datetime, timedelta +from uuid import UUID, uuid4 +from enum import Enum +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor +import traceback + +from swarms import Agent +from dotenv import load_dotenv + +print ("starting") +# Load environment variables +load_dotenv() + +# Configure Loguru +logger.add( + "logs/api_{time}.log", + rotation="500 MB", + retention="10 days", + level="INFO", + format="{time} {level} {message}", + backtrace=True, + diagnose=True, +) + + +class AgentStatus(str, Enum): + """Enum for agent status.""" + + IDLE = "idle" + PROCESSING = "processing" + ERROR = "error" + MAINTENANCE = "maintenance" + + +class AgentConfig(BaseModel): + """Configuration model for creating a new agent.""" + + agent_name: str = Field(..., description="Name of the agent") + model_name: str = Field( + ..., + description="Name of the llm you want to use provided by litellm", + ) + description: str = Field( + default="", description="Description of the agent's purpose" + ) + system_prompt: str = Field( + ..., description="System prompt for the agent" + ) + model_name: str = Field( + default="gpt-4", description="Model name to use" + ) + temperature: float = Field( + default=0.1, + ge=0.0, + le=2.0, + description="Temperature for the model", + ) + max_loops: int = Field( + default=1, ge=1, description="Maximum number of loops" + ) + autosave: bool = Field( + default=True, description="Enable autosave" + ) + dashboard: bool = Field( + default=False, description="Enable dashboard" + ) + verbose: bool = Field( + default=True, description="Enable verbose output" + ) + dynamic_temperature_enabled: bool = Field( + default=True, description="Enable dynamic temperature" + ) + user_name: str = Field( + default="default_user", description="Username for the agent" + ) + retry_attempts: int = Field( + default=1, ge=1, description="Number of retry attempts" + ) + context_length: int = Field( + default=200000, ge=1000, description="Context length" + ) + output_type: str = Field( + default="string", description="Output type (string or json)" + ) + streaming_on: bool = Field( + default=False, description="Enable streaming" + ) + tags: List[str] = Field( + default_factory=list, + description="Tags for categorizing the agent", + ) + + +class AgentUpdate(BaseModel): + """Model for updating agent configuration.""" + + description: Optional[str] = None + system_prompt: Optional[str] = None + temperature: Optional[float] = None + max_loops: Optional[int] = None + tags: Optional[List[str]] = None + status: Optional[AgentStatus] = None + + +class AgentSummary(BaseModel): + """Summary model for agent listing.""" + + agent_id: UUID + agent_name: str + description: str + created_at: datetime + last_used: datetime + total_completions: int + tags: List[str] + status: AgentStatus + + +class AgentMetrics(BaseModel): + """Model for agent performance metrics.""" + + total_completions: int + average_response_time: float + error_rate: float + last_24h_completions: int + total_tokens_used: int + uptime_percentage: float + success_rate: float + peak_tokens_per_minute: int + + +class CompletionRequest(BaseModel): + """Model for completion requests.""" + + prompt: str = Field(..., description="The prompt to process") + agent_id: UUID = Field(..., description="ID of the agent to use") + max_tokens: Optional[int] = Field( + None, description="Maximum tokens to generate" + ) + temperature_override: Optional[float] = None + stream: bool = Field( + default=False, description="Enable streaming response" + ) + + +class CompletionResponse(BaseModel): + """Model for completion responses.""" + + agent_id: UUID + response: str + metadata: Dict[str, Any] + timestamp: datetime + processing_time: float + token_usage: Dict[str, int] + + +class AgentStore: + """Enhanced store for managing agents.""" + + def __init__(self): + self.agents: Dict[UUID, Agent] = {} + self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} + self.executor = ThreadPoolExecutor(max_workers=4) + self._ensure_directories() + + def _ensure_directories(self): + """Ensure required directories exist.""" + Path("logs").mkdir(exist_ok=True) + Path("states").mkdir(exist_ok=True) + + async def create_agent(self, config: AgentConfig) -> UUID: + """Create a new agent with the given configuration.""" + try: + + agent = Agent( + agent_name=config.agent_name, + system_prompt=config.system_prompt, + model_name=config.model_name, + max_loops=config.max_loops, + autosave=config.autosave, + dashboard=config.dashboard, + verbose=config.verbose, + dynamic_temperature_enabled=config.dynamic_temperature_enabled, + saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", + user_name=config.user_name, + retry_attempts=config.retry_attempts, + context_length=config.context_length, + return_step_meta=True, + output_type="str", + streaming_on=config.streaming_on, + ) + + agent_id = uuid4() + self.agents[agent_id] = agent + self.agent_metadata[agent_id] = { + "description": config.description, + "created_at": datetime.utcnow(), + "last_used": datetime.utcnow(), + "total_completions": 0, + "tags": config.tags, + "total_tokens": 0, + "error_count": 0, + "response_times": [], + "status": AgentStatus.IDLE, + "start_time": datetime.utcnow(), + "downtime": timedelta(), + "successful_completions": 0, + } + + logger.info(f"Created agent with ID: {agent_id}") + return agent_id + + except Exception as e: + logger.error(f"Error creating agent: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create agent: {str(e)}", + ) + + async def get_agent(self, agent_id: UUID) -> Agent: + """Retrieve an agent by ID.""" + agent = self.agents.get(agent_id) + if not agent: + logger.error(f"Agent not found: {agent_id}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return agent + + async def update_agent( + self, agent_id: UUID, update: AgentUpdate + ) -> None: + """Update agent configuration.""" + agent = await self.get_agent(agent_id) + metadata = self.agent_metadata[agent_id] + + if update.system_prompt: + agent.system_prompt = update.system_prompt + if update.temperature is not None: + agent.llm.temperature = update.temperature + if update.max_loops is not None: + agent.max_loops = update.max_loops + if update.tags is not None: + metadata["tags"] = update.tags + if update.description is not None: + metadata["description"] = update.description + if update.status is not None: + metadata["status"] = update.status + if update.status == AgentStatus.MAINTENANCE: + metadata["downtime"] += ( + datetime.utcnow() - metadata["last_used"] + ) + + logger.info(f"Updated agent {agent_id}") + + async def list_agents( + self, + tags: Optional[List[str]] = None, + status: Optional[AgentStatus] = None, + ) -> List[AgentSummary]: + """List all agents, optionally filtered by tags and status.""" + summaries = [] + for agent_id, agent in self.agents.items(): + metadata = self.agent_metadata[agent_id] + + # Apply filters + if tags and not any( + tag in metadata["tags"] for tag in tags + ): + continue + if status and metadata["status"] != status: + continue + + summaries.append( + AgentSummary( + agent_id=agent_id, + agent_name=agent.agent_name, + description=metadata["description"], + created_at=metadata["created_at"], + last_used=metadata["last_used"], + total_completions=metadata["total_completions"], + tags=metadata["tags"], + status=metadata["status"], + ) + ) + return summaries + + async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics: + """Get performance metrics for an agent.""" + metadata = self.agent_metadata[agent_id] + response_times = metadata["response_times"] + + # Calculate metrics + total_time = datetime.utcnow() - metadata["start_time"] + uptime = total_time - metadata["downtime"] + uptime_percentage = ( + uptime.total_seconds() / total_time.total_seconds() + ) * 100 + + success_rate = ( + metadata["successful_completions"] + / metadata["total_completions"] + * 100 + if metadata["total_completions"] > 0 + else 0 + ) + + return AgentMetrics( + total_completions=metadata["total_completions"], + average_response_time=( + sum(response_times) / len(response_times) + if response_times + else 0 + ), + error_rate=( + metadata["error_count"] + / metadata["total_completions"] + if metadata["total_completions"] > 0 + else 0 + ), + last_24h_completions=sum( + 1 + for t in response_times + if (datetime.utcnow() - t).days < 1 + ), + total_tokens_used=metadata["total_tokens"], + uptime_percentage=uptime_percentage, + success_rate=success_rate, + peak_tokens_per_minute=max( + metadata.get("tokens_per_minute", [0]) + ), + ) + + async def clone_agent( + self, agent_id: UUID, new_name: str + ) -> UUID: + """Clone an existing agent with a new name.""" + original_agent = await self.get_agent(agent_id) + original_metadata = self.agent_metadata[agent_id] + + config = AgentConfig( + agent_name=new_name, + description=f"Clone of {original_agent.agent_name}", + system_prompt=original_agent.system_prompt, + model_name=original_agent.llm.model_name, + temperature=original_agent.llm.temperature, + max_loops=original_agent.max_loops, + tags=original_metadata["tags"], + ) + + return await self.create_agent(config) + + async def delete_agent(self, agent_id: UUID) -> None: + """Delete an agent.""" + if agent_id not in self.agents: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + + # Clean up any resources + agent = self.agents[agent_id] + if agent.autosave and os.path.exists(agent.saved_state_path): + os.remove(agent.saved_state_path) + + del self.agents[agent_id] + del self.agent_metadata[agent_id] + logger.info(f"Deleted agent {agent_id}") + + async def process_completion( + self, + agent: Agent, + prompt: str, + agent_id: UUID, + max_tokens: Optional[int] = None, + temperature_override: Optional[float] = None, + ) -> CompletionResponse: + """Process a completion request using the specified agent.""" + start_time = datetime.utcnow() + metadata = self.agent_metadata[agent_id] + + try: + # Update agent status + metadata["status"] = AgentStatus.PROCESSING + metadata["last_used"] = start_time + + # Apply temporary overrides if specified + original_temp = agent.llm.temperature + if temperature_override is not None: + agent.llm.temperature = temperature_override + + # Process the completion + response = agent.run(prompt) + + # Reset overrides + if temperature_override is not None: + agent.llm.temperature = original_temp + + # Update metrics + processing_time = ( + datetime.utcnow() - start_time + ).total_seconds() + metadata["response_times"].append(processing_time) + metadata["total_completions"] += 1 + metadata["successful_completions"] += 1 + + # Estimate token usage (this is a rough estimate) + prompt_tokens = len(prompt.split()) * 1.3 + completion_tokens = len(response.split()) * 1.3 + total_tokens = int(prompt_tokens + completion_tokens) + metadata["total_tokens"] += total_tokens + + # Update tokens per minute tracking + current_minute = datetime.utcnow().replace( + second=0, microsecond=0 + ) + if "tokens_per_minute" not in metadata: + metadata["tokens_per_minute"] = {} + metadata["tokens_per_minute"][current_minute] = ( + metadata["tokens_per_minute"].get(current_minute, 0) + + total_tokens + ) + + return CompletionResponse( + agent_id=agent_id, + response=response, + metadata={ + "agent_name": agent.agent_name, + "model_name": agent.llm.model_name, + "temperature": agent.llm.temperature, + }, + timestamp=datetime.utcnow(), + processing_time=processing_time, + token_usage={ + "prompt_tokens": int(prompt_tokens), + "completion_tokens": int(completion_tokens), + "total_tokens": total_tokens, + }, + ) + + except Exception as e: + metadata["error_count"] += 1 + metadata["status"] = AgentStatus.ERROR + logger.error( + f"Error in completion processing: {str(e)}\n{traceback.format_exc()}" + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + finally: + metadata["status"] = AgentStatus.IDLE + + +class SwarmsAPI: + """Enhanced API class for Swarms agent integration.""" + + def __init__(self): + self.app = FastAPI( + title="Swarms Agent API", + description="Production-grade API for Swarms agent interaction", + version="1.0.0", + docs_url="/v1/docs", + redoc_url="/v1/redoc", + ) + self.store = AgentStore() + # Configure CORS + self.app.add_middleware( + CORSMiddleware, + allow_origins=[ + "*" + ], # Configure appropriately for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + self._setup_routes() + + def _setup_routes(self): + """Set up API routes.""" + + @self.app.post("/v1/agent", response_model=Dict[str, UUID]) + async def create_agent(config: AgentConfig): + """Create a new agent with the specified configuration.""" + agent_id = await self.store.create_agent(config) + return {"agent_id": agent_id} + + @self.app.get("/v1/agents", response_model=List[AgentSummary]) + async def list_agents( + tags: Optional[List[str]] = Query(None), + status: Optional[AgentStatus] = None, + ): + """List all agents, optionally filtered by tags and status.""" + return await self.store.list_agents(tags, status) + + @self.app.patch( + "/v1/agent/{agent_id}", response_model=Dict[str, str] + ) + async def update_agent(agent_id: UUID, update: AgentUpdate): + """Update an existing agent's configuration.""" + await self.store.update_agent(agent_id, update) + return {"status": "updated"} + + @self.app.get( + "/v1/agent/{agent_id}/metrics", + response_model=AgentMetrics, + ) + async def get_agent_metrics(agent_id: UUID): + """Get performance metrics for a specific agent.""" + return await self.store.get_agent_metrics(agent_id) + + @self.app.post( + "/v1/agent/{agent_id}/clone", + response_model=Dict[str, UUID], + ) + async def clone_agent(agent_id: UUID, new_name: str): + """Clone an existing agent with a new name.""" + new_id = await self.store.clone_agent(agent_id, new_name) + return {"agent_id": new_id} + + @self.app.delete("/v1/agent/{agent_id}") + async def delete_agent(agent_id: UUID): + """Delete an agent.""" + await self.store.delete_agent(agent_id) + return {"status": "deleted"} + + @self.app.post( + "/v1/agent/completions", response_model=CompletionResponse + ) + async def create_completion( + request: CompletionRequest, + background_tasks: BackgroundTasks, + ): + """Process a completion request with the specified agent.""" + try: + agent = await self.store.get_agent(request.agent_id) + + # Process completion + response = await self.store.process_completion( + agent, + request.prompt, + request.agent_id, + request.max_tokens, + request.temperature_override, + ) + + # Schedule background cleanup + background_tasks.add_task( + self._cleanup_old_metrics, request.agent_id + ) + + return response + + except Exception as e: + logger.error(f"Error processing completion: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + + @self.app.get("/v1/agent/{agent_id}/status") + async def get_agent_status(agent_id: UUID): + """Get the current status of an agent.""" + metadata = self.store.agent_metadata.get(agent_id) + if not metadata: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return { + "agent_id": agent_id, + "status": metadata["status"], + "last_used": metadata["last_used"], + "total_completions": metadata["total_completions"], + "error_count": metadata["error_count"], + } + + async def _cleanup_old_metrics(self, agent_id: UUID): + """Clean up old metrics data to prevent memory bloat.""" + metadata = self.store.agent_metadata.get(agent_id) + if metadata: + # Keep only last 24 hours of response times + cutoff = datetime.utcnow() - timedelta(days=1) + metadata["response_times"] = [ + t + for t in metadata["response_times"] + if isinstance(t, (int, float)) + and t > cutoff.timestamp() + ] + + # Clean up old tokens per minute data + if "tokens_per_minute" in metadata: + metadata["tokens_per_minute"] = { + k: v + for k, v in metadata["tokens_per_minute"].items() + if k > cutoff + } + + +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + print("create app") + api = SwarmsAPI() + return api.app + + +#if __name__ == "__main__": +if __name__ == '__main__': + #freeze_support() + print("yes in main") + # Configure uvicorn logging + logger.info("API Starting") + + uvicorn.run( + "main:create_app", + host="0.0.0.0", + port=8000, + # reload=True, + # workers=4, + ) +else: + print("not in main") + From 823051a9f4dedabf9b5a0d4f79a553b5ea8b81fd Mon Sep 17 00:00:00 2001 From: mike dupont Date: Sat, 7 Dec 2024 16:01:00 -0500 Subject: [PATCH 3/3] remove the agent api renamed to main --- api/agent_api.py | 632 ----------------------------------------------- 1 file changed, 632 deletions(-) delete mode 100644 api/agent_api.py diff --git a/api/agent_api.py b/api/agent_api.py deleted file mode 100644 index 83d051015..000000000 --- a/api/agent_api.py +++ /dev/null @@ -1,632 +0,0 @@ -import os -from fastapi import ( - FastAPI, - HTTPException, - status, - Query, - BackgroundTasks, -) -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel, Field -from typing import Optional, Dict, Any, List -from loguru import logger -import uvicorn -from datetime import datetime, timedelta -from uuid import UUID, uuid4 -from enum import Enum -from pathlib import Path -from concurrent.futures import ThreadPoolExecutor -import traceback - -from swarms import Agent -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configure Loguru -logger.add( - "logs/api_{time}.log", - rotation="500 MB", - retention="10 days", - level="INFO", - format="{time} {level} {message}", - backtrace=True, - diagnose=True, -) - - -class AgentStatus(str, Enum): - """Enum for agent status.""" - - IDLE = "idle" - PROCESSING = "processing" - ERROR = "error" - MAINTENANCE = "maintenance" - - -class AgentConfig(BaseModel): - """Configuration model for creating a new agent.""" - - agent_name: str = Field(..., description="Name of the agent") - model_name: str = Field( - ..., - description="Name of the llm you want to use provided by litellm", - ) - description: str = Field( - default="", description="Description of the agent's purpose" - ) - system_prompt: str = Field( - ..., description="System prompt for the agent" - ) - model_name: str = Field( - default="gpt-4", description="Model name to use" - ) - temperature: float = Field( - default=0.1, - ge=0.0, - le=2.0, - description="Temperature for the model", - ) - max_loops: int = Field( - default=1, ge=1, description="Maximum number of loops" - ) - autosave: bool = Field( - default=True, description="Enable autosave" - ) - dashboard: bool = Field( - default=False, description="Enable dashboard" - ) - verbose: bool = Field( - default=True, description="Enable verbose output" - ) - dynamic_temperature_enabled: bool = Field( - default=True, description="Enable dynamic temperature" - ) - user_name: str = Field( - default="default_user", description="Username for the agent" - ) - retry_attempts: int = Field( - default=1, ge=1, description="Number of retry attempts" - ) - context_length: int = Field( - default=200000, ge=1000, description="Context length" - ) - output_type: str = Field( - default="string", description="Output type (string or json)" - ) - streaming_on: bool = Field( - default=False, description="Enable streaming" - ) - tags: List[str] = Field( - default_factory=list, - description="Tags for categorizing the agent", - ) - - -class AgentUpdate(BaseModel): - """Model for updating agent configuration.""" - - description: Optional[str] = None - system_prompt: Optional[str] = None - temperature: Optional[float] = None - max_loops: Optional[int] = None - tags: Optional[List[str]] = None - status: Optional[AgentStatus] = None - - -class AgentSummary(BaseModel): - """Summary model for agent listing.""" - - agent_id: UUID - agent_name: str - description: str - created_at: datetime - last_used: datetime - total_completions: int - tags: List[str] - status: AgentStatus - - -class AgentMetrics(BaseModel): - """Model for agent performance metrics.""" - - total_completions: int - average_response_time: float - error_rate: float - last_24h_completions: int - total_tokens_used: int - uptime_percentage: float - success_rate: float - peak_tokens_per_minute: int - - -class CompletionRequest(BaseModel): - """Model for completion requests.""" - - prompt: str = Field(..., description="The prompt to process") - agent_id: UUID = Field(..., description="ID of the agent to use") - max_tokens: Optional[int] = Field( - None, description="Maximum tokens to generate" - ) - temperature_override: Optional[float] = None - stream: bool = Field( - default=False, description="Enable streaming response" - ) - - -class CompletionResponse(BaseModel): - """Model for completion responses.""" - - agent_id: UUID - response: str - metadata: Dict[str, Any] - timestamp: datetime - processing_time: float - token_usage: Dict[str, int] - - -class AgentStore: - """Enhanced store for managing agents.""" - - def __init__(self): - self.agents: Dict[UUID, Agent] = {} - self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} - self.executor = ThreadPoolExecutor(max_workers=4) - self._ensure_directories() - - def _ensure_directories(self): - """Ensure required directories exist.""" - Path("logs").mkdir(exist_ok=True) - Path("states").mkdir(exist_ok=True) - - async def create_agent(self, config: AgentConfig) -> UUID: - """Create a new agent with the given configuration.""" - try: - - agent = Agent( - agent_name=config.agent_name, - system_prompt=config.system_prompt, - model_name=config.model_name, - max_loops=config.max_loops, - autosave=config.autosave, - dashboard=config.dashboard, - verbose=config.verbose, - dynamic_temperature_enabled=config.dynamic_temperature_enabled, - saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", - user_name=config.user_name, - retry_attempts=config.retry_attempts, - context_length=config.context_length, - return_step_meta=True, - output_type="str", - streaming_on=config.streaming_on, - ) - - agent_id = uuid4() - self.agents[agent_id] = agent - self.agent_metadata[agent_id] = { - "description": config.description, - "created_at": datetime.utcnow(), - "last_used": datetime.utcnow(), - "total_completions": 0, - "tags": config.tags, - "total_tokens": 0, - "error_count": 0, - "response_times": [], - "status": AgentStatus.IDLE, - "start_time": datetime.utcnow(), - "downtime": timedelta(), - "successful_completions": 0, - } - - logger.info(f"Created agent with ID: {agent_id}") - return agent_id - - except Exception as e: - logger.error(f"Error creating agent: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to create agent: {str(e)}", - ) - - async def get_agent(self, agent_id: UUID) -> Agent: - """Retrieve an agent by ID.""" - agent = self.agents.get(agent_id) - if not agent: - logger.error(f"Agent not found: {agent_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - return agent - - async def update_agent( - self, agent_id: UUID, update: AgentUpdate - ) -> None: - """Update agent configuration.""" - agent = await self.get_agent(agent_id) - metadata = self.agent_metadata[agent_id] - - if update.system_prompt: - agent.system_prompt = update.system_prompt - if update.temperature is not None: - agent.llm.temperature = update.temperature - if update.max_loops is not None: - agent.max_loops = update.max_loops - if update.tags is not None: - metadata["tags"] = update.tags - if update.description is not None: - metadata["description"] = update.description - if update.status is not None: - metadata["status"] = update.status - if update.status == AgentStatus.MAINTENANCE: - metadata["downtime"] += ( - datetime.utcnow() - metadata["last_used"] - ) - - logger.info(f"Updated agent {agent_id}") - - async def list_agents( - self, - tags: Optional[List[str]] = None, - status: Optional[AgentStatus] = None, - ) -> List[AgentSummary]: - """List all agents, optionally filtered by tags and status.""" - summaries = [] - for agent_id, agent in self.agents.items(): - metadata = self.agent_metadata[agent_id] - - # Apply filters - if tags and not any( - tag in metadata["tags"] for tag in tags - ): - continue - if status and metadata["status"] != status: - continue - - summaries.append( - AgentSummary( - agent_id=agent_id, - agent_name=agent.agent_name, - description=metadata["description"], - created_at=metadata["created_at"], - last_used=metadata["last_used"], - total_completions=metadata["total_completions"], - tags=metadata["tags"], - status=metadata["status"], - ) - ) - return summaries - - async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics: - """Get performance metrics for an agent.""" - metadata = self.agent_metadata[agent_id] - response_times = metadata["response_times"] - - # Calculate metrics - total_time = datetime.utcnow() - metadata["start_time"] - uptime = total_time - metadata["downtime"] - uptime_percentage = ( - uptime.total_seconds() / total_time.total_seconds() - ) * 100 - - success_rate = ( - metadata["successful_completions"] - / metadata["total_completions"] - * 100 - if metadata["total_completions"] > 0 - else 0 - ) - - return AgentMetrics( - total_completions=metadata["total_completions"], - average_response_time=( - sum(response_times) / len(response_times) - if response_times - else 0 - ), - error_rate=( - metadata["error_count"] - / metadata["total_completions"] - if metadata["total_completions"] > 0 - else 0 - ), - last_24h_completions=sum( - 1 - for t in response_times - if (datetime.utcnow() - t).days < 1 - ), - total_tokens_used=metadata["total_tokens"], - uptime_percentage=uptime_percentage, - success_rate=success_rate, - peak_tokens_per_minute=max( - metadata.get("tokens_per_minute", [0]) - ), - ) - - async def clone_agent( - self, agent_id: UUID, new_name: str - ) -> UUID: - """Clone an existing agent with a new name.""" - original_agent = await self.get_agent(agent_id) - original_metadata = self.agent_metadata[agent_id] - - config = AgentConfig( - agent_name=new_name, - description=f"Clone of {original_agent.agent_name}", - system_prompt=original_agent.system_prompt, - model_name=original_agent.llm.model_name, - temperature=original_agent.llm.temperature, - max_loops=original_agent.max_loops, - tags=original_metadata["tags"], - ) - - return await self.create_agent(config) - - async def delete_agent(self, agent_id: UUID) -> None: - """Delete an agent.""" - if agent_id not in self.agents: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - - # Clean up any resources - agent = self.agents[agent_id] - if agent.autosave and os.path.exists(agent.saved_state_path): - os.remove(agent.saved_state_path) - - del self.agents[agent_id] - del self.agent_metadata[agent_id] - logger.info(f"Deleted agent {agent_id}") - - async def process_completion( - self, - agent: Agent, - prompt: str, - agent_id: UUID, - max_tokens: Optional[int] = None, - temperature_override: Optional[float] = None, - ) -> CompletionResponse: - """Process a completion request using the specified agent.""" - start_time = datetime.utcnow() - metadata = self.agent_metadata[agent_id] - - try: - # Update agent status - metadata["status"] = AgentStatus.PROCESSING - metadata["last_used"] = start_time - - # Apply temporary overrides if specified - original_temp = agent.llm.temperature - if temperature_override is not None: - agent.llm.temperature = temperature_override - - # Process the completion - response = agent.run(prompt) - - # Reset overrides - if temperature_override is not None: - agent.llm.temperature = original_temp - - # Update metrics - processing_time = ( - datetime.utcnow() - start_time - ).total_seconds() - metadata["response_times"].append(processing_time) - metadata["total_completions"] += 1 - metadata["successful_completions"] += 1 - - # Estimate token usage (this is a rough estimate) - prompt_tokens = len(prompt.split()) * 1.3 - completion_tokens = len(response.split()) * 1.3 - total_tokens = int(prompt_tokens + completion_tokens) - metadata["total_tokens"] += total_tokens - - # Update tokens per minute tracking - current_minute = datetime.utcnow().replace( - second=0, microsecond=0 - ) - if "tokens_per_minute" not in metadata: - metadata["tokens_per_minute"] = {} - metadata["tokens_per_minute"][current_minute] = ( - metadata["tokens_per_minute"].get(current_minute, 0) - + total_tokens - ) - - return CompletionResponse( - agent_id=agent_id, - response=response, - metadata={ - "agent_name": agent.agent_name, - "model_name": agent.llm.model_name, - "temperature": agent.llm.temperature, - }, - timestamp=datetime.utcnow(), - processing_time=processing_time, - token_usage={ - "prompt_tokens": int(prompt_tokens), - "completion_tokens": int(completion_tokens), - "total_tokens": total_tokens, - }, - ) - - except Exception as e: - metadata["error_count"] += 1 - metadata["status"] = AgentStatus.ERROR - logger.error( - f"Error in completion processing: {str(e)}\n{traceback.format_exc()}" - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error processing completion: {str(e)}", - ) - finally: - metadata["status"] = AgentStatus.IDLE - - -class SwarmsAPI: - """Enhanced API class for Swarms agent integration.""" - - def __init__(self): - self.app = FastAPI( - title="Swarms Agent API", - description="Production-grade API for Swarms agent interaction", - version="1.0.0", - docs_url="/v1/docs", - redoc_url="/v1/redoc", - ) - self.store = AgentStore() - # Configure CORS - self.app.add_middleware( - CORSMiddleware, - allow_origins=[ - "*" - ], # Configure appropriately for production - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - self._setup_routes() - - def _setup_routes(self): - """Set up API routes.""" - - @self.app.post("/v1/agent", response_model=Dict[str, UUID]) - async def create_agent(config: AgentConfig): - """Create a new agent with the specified configuration.""" - agent_id = await self.store.create_agent(config) - return {"agent_id": agent_id} - - @self.app.get("/v1/agents", response_model=List[AgentSummary]) - async def list_agents( - tags: Optional[List[str]] = Query(None), - status: Optional[AgentStatus] = None, - ): - """List all agents, optionally filtered by tags and status.""" - return await self.store.list_agents(tags, status) - - @self.app.patch( - "/v1/agent/{agent_id}", response_model=Dict[str, str] - ) - async def update_agent(agent_id: UUID, update: AgentUpdate): - """Update an existing agent's configuration.""" - await self.store.update_agent(agent_id, update) - return {"status": "updated"} - - @self.app.get( - "/v1/agent/{agent_id}/metrics", - response_model=AgentMetrics, - ) - async def get_agent_metrics(agent_id: UUID): - """Get performance metrics for a specific agent.""" - return await self.store.get_agent_metrics(agent_id) - - @self.app.post( - "/v1/agent/{agent_id}/clone", - response_model=Dict[str, UUID], - ) - async def clone_agent(agent_id: UUID, new_name: str): - """Clone an existing agent with a new name.""" - new_id = await self.store.clone_agent(agent_id, new_name) - return {"agent_id": new_id} - - @self.app.delete("/v1/agent/{agent_id}") - async def delete_agent(agent_id: UUID): - """Delete an agent.""" - await self.store.delete_agent(agent_id) - return {"status": "deleted"} - - @self.app.post( - "/v1/agent/completions", response_model=CompletionResponse - ) - async def create_completion( - request: CompletionRequest, - background_tasks: BackgroundTasks, - ): - """Process a completion request with the specified agent.""" - try: - agent = await self.store.get_agent(request.agent_id) - - # Process completion - response = await self.store.process_completion( - agent, - request.prompt, - request.agent_id, - request.max_tokens, - request.temperature_override, - ) - - # Schedule background cleanup - background_tasks.add_task( - self._cleanup_old_metrics, request.agent_id - ) - - return response - - except Exception as e: - logger.error(f"Error processing completion: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error processing completion: {str(e)}", - ) - - @self.app.get("/v1/agent/{agent_id}/status") - async def get_agent_status(agent_id: UUID): - """Get the current status of an agent.""" - metadata = self.store.agent_metadata.get(agent_id) - if not metadata: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - return { - "agent_id": agent_id, - "status": metadata["status"], - "last_used": metadata["last_used"], - "total_completions": metadata["total_completions"], - "error_count": metadata["error_count"], - } - - async def _cleanup_old_metrics(self, agent_id: UUID): - """Clean up old metrics data to prevent memory bloat.""" - metadata = self.store.agent_metadata.get(agent_id) - if metadata: - # Keep only last 24 hours of response times - cutoff = datetime.utcnow() - timedelta(days=1) - metadata["response_times"] = [ - t - for t in metadata["response_times"] - if isinstance(t, (int, float)) - and t > cutoff.timestamp() - ] - - # Clean up old tokens per minute data - if "tokens_per_minute" in metadata: - metadata["tokens_per_minute"] = { - k: v - for k, v in metadata["tokens_per_minute"].items() - if k > cutoff - } - - -def create_app() -> FastAPI: - """Create and configure the FastAPI application.""" - api = SwarmsAPI() - return api.app - - -if __name__ == "__main__": - # Configure uvicorn logging - print("in main") - logger.info("API Starting") - uvicorn.run( - "main:create_app", - host="0.0.0.0", - port=8000, - reload=True, - workers=4, - ) -else: - print("not in main")