From 34aa6484e424fbdf5b9b67dba0b4411e125d3b48 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 11 Nov 2024 22:25:18 +0530 Subject: [PATCH] create the makim scheduler --- poetry.lock | 14 +- pyproject.toml | 2 +- src/makim/cli/__init__.py | 254 ++++++++++++++++++++++++++---- src/makim/core.py | 7 + src/makim/logs.py | 3 + src/makim/scheduler.py | 319 ++++++++++++++++++++++++++++++++++++++ src/makim/schema.json | 29 ++++ 7 files changed, 594 insertions(+), 34 deletions(-) create mode 100644 src/makim/scheduler.py diff --git a/poetry.lock b/poetry.lock index 9f75758..d3224a3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -198,14 +198,14 @@ dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"] [[package]] name = "bandit" -version = "1.8.0" +version = "1.8.2" description = "Security oriented static analyser for python code." optional = false python-versions = ">=3.9" groups = ["dev"] files = [ - {file = "bandit-1.8.0-py3-none-any.whl", hash = "sha256:b1a61d829c0968aed625381e426aa378904b996529d048f8d908fa28f6b13e38"}, - {file = "bandit-1.8.0.tar.gz", hash = "sha256:b5bfe55a095abd9fe20099178a7c6c060f844bfd4fe4c76d28e35e4c52b9d31e"}, + {file = "bandit-1.8.2-py3-none-any.whl", hash = "sha256:df6146ad73dd30e8cbda4e29689ddda48364e36ff655dbfc86998401fcf1721f"}, + {file = "bandit-1.8.2.tar.gz", hash = "sha256:e00ad5a6bc676c0954669fe13818024d66b70e42cf5adb971480cf3b671e835f"}, ] [package.dependencies] @@ -1840,14 +1840,14 @@ min-versions = ["babel (==2.9.0)", "click (==7.0)", "colorama (==0.4)", "ghp-imp [[package]] name = "mkdocs-autorefs" -version = "1.2.0" +version = "1.3.0" description = "Automatically link across pages in MkDocs." optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["dev"] files = [ - {file = "mkdocs_autorefs-1.2.0-py3-none-any.whl", hash = "sha256:d588754ae89bd0ced0c70c06f58566a4ee43471eeeee5202427da7de9ef85a2f"}, - {file = "mkdocs_autorefs-1.2.0.tar.gz", hash = "sha256:a86b93abff653521bda71cf3fc5596342b7a23982093915cb74273f67522190f"}, + {file = "mkdocs_autorefs-1.3.0-py3-none-any.whl", hash = "sha256:d180f9778a04e78b7134e31418f238bba56f56d6a8af97873946ff661befffb3"}, + {file = "mkdocs_autorefs-1.3.0.tar.gz", hash = "sha256:6867764c099ace9025d6ac24fd07b85a98335fbd30107ef01053697c8f46db61"}, ] [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index fc6e93d..627ed05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -121,7 +121,7 @@ quote-style = "single" [tool.bandit] exclude_dirs = ["tests"] targets = "src/makim/" -skips = ["B102", "B701", "B507", "B601"] +skips = ["B102", "B701", "B507", "B601", "B603"] [tool.vulture] exclude = ["tests"] diff --git a/src/makim/cli/__init__.py b/src/makim/cli/__init__.py index 4860c90..821e411 100644 --- a/src/makim/cli/__init__.py +++ b/src/makim/cli/__init__.py @@ -1,4 +1,4 @@ -"""Cli functions to define the arguments and to call Makim.""" +"""CLI functions to define the arguments and call Makim.""" from __future__ import annotations @@ -9,6 +9,9 @@ import typer +from rich.console import Console +from rich.table import Table + from makim import __version__ from makim.cli.auto_generator import ( create_dynamic_command, @@ -60,7 +63,7 @@ def main( help='Execute the command in verbose mode', ), ) -> None: - """Process envers for specific flags, otherwise show the help menu.""" + """Process top-level flags; otherwise, show the help menu.""" typer.echo(f'Makim file: {file}') if version: @@ -96,10 +99,226 @@ def _get_command_from_cli() -> str: return command +def _create_cron_table() -> Table: + """Create a table for displaying scheduled tasks.""" + table = Table(show_header=True, header_style='bold magenta') + table.add_column('Name', style='cyan') + table.add_column('Task', style='blue') + table.add_column('Schedule', style='yellow') + table.add_column('Status', style='green') + table.add_column('Next Run', style='magenta') + return table + + +def _handle_cron_list(makim_instance: Makim) -> None: + """Handle the cron list command.""" + scheduled_tasks = makim_instance.global_data.get('scheduler', {}) + + if not scheduled_tasks: + typer.echo('No scheduled tasks configured in .makim.yaml') + return + + console = Console() + table = _create_cron_table() + + active_jobs = { + job['name']: job + for job in ( + makim_instance.scheduler.list_jobs() + if makim_instance.scheduler + else [] + ) + } + + for name, config in scheduled_tasks.items(): + active_job = active_jobs.get(name) + status = 'Active' if active_job else 'Inactive' + next_run = ( + active_job['next_run_time'] if active_job else 'Not scheduled' + ) + + table.add_row( + name, + config.get('task', 'N/A'), + config.get('schedule', 'N/A'), + status, + next_run or 'Not scheduled', + ) + + console.print(table) + + +def _handle_cron_start( + makim_instance: Makim, + name: str | None, + all_jobs: bool, +) -> None: + """Handle the cron start command.""" + if not makim_instance.scheduler: + typer.echo('No scheduler configured.') + return + + scheduled_tasks = makim_instance.global_data.get('scheduler', {}) + + if all_jobs: + success_count = 0 + error_count = 0 + for schedule_name, schedule_config in scheduled_tasks.items(): + try: + makim_instance.scheduler.add_job( + name=schedule_name, + schedule=schedule_config['schedule'], + task=schedule_config['task'], + args=schedule_config.get('args', {}), + ) + success_count += 1 + typer.echo(f"Successfully started schedule '{schedule_name}'") + except Exception as e: + error_count += 1 + typer.echo( + f"Failed to start schedule '{schedule_name}': {e}", + err=True, + ) + + typer.echo( + f'\nSummary: {success_count} jobs started successfully, ' + f'{error_count} failed' + ) + return + + if not name: + typer.echo("Please provide a scheduler name or use '--all' flag") + raise typer.Exit(1) + + try: + schedule_config = scheduled_tasks.get(name) + if not schedule_config: + typer.echo(f"No configuration found for schedule '{name}'") + return + + makim_instance.scheduler.add_job( + name=name, + schedule=schedule_config['schedule'], + task=schedule_config['task'], + args=schedule_config.get('args', {}), + ) + typer.echo(f"Successfully started schedule '{name}'") + except Exception as e: + typer.echo(f"Failed to start schedule '{name}': {e}", err=True) + + +def _handle_cron_stop( + makim_instance: Makim, + name: str | None, + all_jobs: bool, +) -> None: + """Handle the cron stop command.""" + if not makim_instance.scheduler: + typer.echo('No scheduler configured.') + return + + if all_jobs: + active_jobs = makim_instance.scheduler.list_jobs() + success_count = 0 + error_count = 0 + + for job in active_jobs: + try: + makim_instance.scheduler.remove_job(job['name']) + success_count += 1 + typer.echo(f"Successfully stopped schedule '{job['name']}'") + except Exception as e: + error_count += 1 + typer.echo( + f"Failed to stop schedule '{job['name']}': {e}", + err=True, + ) + + typer.echo( + f'\nSummary: {success_count} jobs stopped successfully, ' + f'{error_count} failed' + ) + return + + if not name: + typer.echo("Please provide a scheduler name or use '--all' flag") + raise typer.Exit(1) + + try: + makim_instance.scheduler.remove_job(name) + typer.echo(f"Successfully stopped schedule '{name}'") + except Exception as e: + typer.echo(f"Failed to stop schedule '{name}': {e}", err=True) + + +def _handle_cron_commands(makim_instance: Makim) -> typer.Typer: + """Create and handle cron-related commands. + + Returns + ------- + typer.Typer: The cron command group with all subcommands. + """ + typer_cron = typer.Typer( + help='Tasks Scheduler', + invoke_without_command=True, + ) + + if 'scheduler' in makim_instance.global_data: + for schedule_name, schedule_params in makim_instance.global_data.get( + 'scheduler', {} + ).items(): + create_dynamic_command_cron( + makim_instance, + typer_cron, + schedule_name, + schedule_params or {}, + ) + + @typer_cron.command(help='List all scheduled tasks') + def list() -> None: + """List tasks defined in .makim.yaml and their current status.""" + _handle_cron_list(makim_instance) + + @typer_cron.command(help='Start a scheduler by its name') + def start( + name: str = typer.Argument( + None, + help="""Name of the scheduler to start. + Use '--all' for all schedulers""", + ), + all: bool = typer.Option( + False, + '--all', + help='Start all available schedulers', + is_flag=True, + ), + ) -> None: + """Start (enable) a scheduled task.""" + _handle_cron_start(makim_instance, name, all) + + @typer_cron.command(help='Stop a scheduler by its name') + def stop( + name: str = typer.Argument( + None, + help="""Name of the scheduler to stop. + Use '--all' for all schedulers""", + ), + all: bool = typer.Option( + False, + '--all', + help='Stop all running schedulers', + is_flag=True, + ), + ) -> None: + """Stop (disable) scheduled task(s).""" + _handle_cron_stop(makim_instance, name, all) + + return typer_cron + + def run_app() -> None: - """Run the typer app.""" + """Run the Typer app.""" root_config = extract_root_config() - config_file_path = cast(str, root_config.get('file', '.makim.yaml')) cli_completion_words = [ @@ -107,7 +326,6 @@ def run_app() -> None: ] if not makim._check_makim_file(config_file_path) and cli_completion_words: - # autocomplete call root_config = extract_root_config(cli_completion_words) config_file_path = cast(str, root_config.get('file', '.makim.yaml')) if not makim._check_makim_file(config_file_path): @@ -119,43 +337,28 @@ def run_app() -> None: verbose=cast(bool, root_config.get('verbose', False)), ) - # create tasks data tasks: dict[str, Any] = {} for group_name, group_data in makim.global_data.get('groups', {}).items(): for task_name, task_data in group_data.get('tasks', {}).items(): tasks[f'{group_name}.{task_name}'] = task_data - # Add dynamically cron commands to Typer app - if 'scheduler' in makim.global_data: - typer_cron = typer.Typer( - help='Tasks Scheduler', - invoke_without_command=True, - ) + # Add cron commands if scheduler is configured + typer_cron = _handle_cron_commands(makim) + app.add_typer(typer_cron, name='cron', rich_help_panel='Extensions') - for schedule_name, schedule_params in makim.global_data.get( - 'scheduler', {} - ).items(): - create_dynamic_command_cron( - makim, typer_cron, schedule_name, schedule_params or {} - ) - - # Add cron command - app.add_typer(typer_cron, name='cron', rich_help_panel='Extensions') - - # Add dynamically commands to Typer app + # Add dynamic commands for name, args in tasks.items(): create_dynamic_command(makim, app, name, args) try: app() except SystemExit as e: - # code 2 means code not found + # Code 2 means command not found error_code = 2 if e.code != error_code: raise e command_used = _get_command_from_cli() - available_cmds = [ cmd.name for cmd in app.registered_commands if cmd.name is not None ] @@ -165,7 +368,6 @@ def run_app() -> None: f"Command {command_used} not found. Did you mean '{suggestion}'?", fg='red', ) - raise e diff --git a/src/makim/core.py b/src/makim/core.py index 7fab6dc..95ae1b0 100644 --- a/src/makim/core.py +++ b/src/makim/core.py @@ -33,6 +33,7 @@ from makim.console import get_terminal_size from makim.logs import MakimError, MakimLogs +from makim.scheduler import MakimScheduler MAKIM_CURRENT_PATH = Path(__file__).parent @@ -132,6 +133,7 @@ class Makim: task_name: str = '' task_data: dict[str, Any] = {} ssh_config: dict[str, Any] = {} + scheduler: Optional[MakimScheduler] = None def __init__(self) -> None: """Prepare the Makim class with the default configuration.""" @@ -145,6 +147,7 @@ def __init__(self) -> None: self.shell_app = sh.xonsh self.shell_args: list[str] = [] self.tmp_suffix: str = '.makim' + self.scheduler = None def _call_shell_app(self, cmd: str) -> None: self._load_shell_app() @@ -386,6 +389,10 @@ def _load_config_data(self) -> None: self._validate_config() + if 'scheduler' in self.global_data: + if self.scheduler is None: + self.scheduler = MakimScheduler(self) + def _resolve_working_directory(self, scope: str) -> Optional[Path]: scope_options = ('global', 'group', 'task') if scope not in scope_options: diff --git a/src/makim/logs.py b/src/makim/logs.py index fb025ec..4b0009f 100644 --- a/src/makim/logs.py +++ b/src/makim/logs.py @@ -28,6 +28,9 @@ class MakimError(Enum): SSH_CONNECTION_ERROR = 16 SSH_EXECUTION_ERROR = 17 REMOTE_HOST_NOT_FOUND = 18 + SCHEDULER_JOB_ERROR = 19 + SCHEDULER_JOB_NOT_FOUND = 20 + SCHEDULER_INVALID_SCHEDULE = 21 class MakimLogs: diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py new file mode 100644 index 0000000..c793fe5 --- /dev/null +++ b/src/makim/scheduler.py @@ -0,0 +1,319 @@ +"""Module for handling task scheduling in Makim.""" + +from __future__ import annotations + +import json +import shlex +import subprocess # nosec B404 - subprocess is required for task execution + +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional, cast + +from apscheduler.job import Job +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + + +class Config: + """Global configuration state handler.""" + + config_file: Optional[str] = None + job_history_path: Optional[Path] = None + + +def init_globals(config_file: str, history_path: Path) -> None: + """Initialize global variables needed for job execution.""" + Config.config_file = config_file + Config.job_history_path = history_path + + +def _sanitize_command(cmd_list: list[str]) -> list[str]: + """Sanitize command arguments to prevent command injection.""" + return [shlex.quote(str(arg)) for arg in cmd_list] + + +def log_execution( + name: str, + event: str, + result: Optional[str] = None, + error: Optional[str] = None, +) -> None: + """Log execution details. + + Args: + name: The scheduler name (e.g. 'hourly_status') + event: Event ('scheduled', 'execution_completed', 'execution_failed') + result: Output from task execution + error: Error message if execution failed + """ + if Config.job_history_path is None: + raise RuntimeError('Job history path not initialized') + + try: + # Load existing history + if Config.job_history_path.exists(): + with open(Config.job_history_path, 'r') as f: + history = json.load(f) + else: + history = {} + + current_time = datetime.now().isoformat() + + # Create the scheduler entry if it doesn't exist + if name not in history: + history[name] = { + 'scheduled_timestamp': current_time, + 'event': 'scheduled', + 'execution_timestamp': None, + 'output': None, + 'error': None, + 'task': None, # Add task field to track the associated task + } + + entry = history[name] + + if event == 'scheduled': + # Only update scheduling information + entry.update({'scheduled_timestamp': current_time, 'event': event}) + else: # execution_completed or execution_failed + # Preserve the scheduled timestamp and update execution details + entry.update( + { + 'execution_timestamp': current_time, + 'event': event, + 'output': result, + 'error': error, + } + ) + + # Save updated history + with open(Config.job_history_path, 'w') as f: + json.dump(history, f, indent=2) + + except Exception as e: + print(f'Failed to log execution: {e}') + + +def run_makim_task(task: str, args: Optional[Dict[str, Any]] = None) -> None: + """Standalone function to execute a Makim task.""" + if Config.config_file is None or Config.job_history_path is None: + raise RuntimeError('Global configuration not initialized') + + # Extract scheduler name from the task path + scheduler_name = None + if Config.job_history_path.exists(): + with open(Config.job_history_path, 'r') as f: + history = json.load(f) + # Find the scheduler entry that matches this task + for name, entry in history.items(): + if entry.get('task') == task: + scheduler_name = name + break + + if not scheduler_name: + # Fallback to task name if scheduler name not found + scheduler_name = task.split('.')[-1] + + # Build base command with known safe values + cmd = ['makim', '--file', Config.config_file, task] + + if args: + for key, value in args.items(): + safe_key = str(key) + if isinstance(value, bool): + if value: + cmd.append(f'--{safe_key}') + else: + cmd.extend([f'--{safe_key}', str(value)]) + + safe_cmd = _sanitize_command(cmd) + + try: + # nosec B603 - we've sanitized the input and are not using shell=True + result = subprocess.run( + safe_cmd, + capture_output=True, + text=True, + check=True, + ) + # Log successful execution + log_execution( + scheduler_name, 'execution_completed', result=result.stdout + ) + + except subprocess.CalledProcessError as e: + error_msg = ( + f'Job execution failed:\nSTDERR: {e.stderr}\nSTDOUT: {e.stdout}' + ) + # Log failed execution + log_execution(scheduler_name, 'execution_failed', error=error_msg) + raise + + +class MakimScheduler: + """Handles task scheduling for Makim.""" + + def __init__(self, makim_instance: Any): + """Initialize the scheduler with configuration.""" + self.config_file = makim_instance.file + self.scheduler = None + self.job_store_path = Path.home() / '.makim' / 'jobs.sqlite' + self.job_history_path = Path.home() / '.makim' / 'history.json' + self._setup_directories() + self._initialize_scheduler() + self.job_history: Dict[str, list[Dict[str, Any]]] = ( + self._load_history() + ) + + init_globals(self.config_file, self.job_history_path) + + self._sync_jobs_with_config( + makim_instance.global_data.get('scheduler', {}) + ) + + def _sync_jobs_with_config(self, config_jobs: Dict[str, Any]) -> None: + """Synchronize scheduler jobs with current config file.""" + if not self.scheduler: + return + + # Use job IDs instead of Job objects + current_job_ids = {job.id for job in self.scheduler.get_jobs()} + config_job_ids = set(config_jobs.keys()) + + # Remove jobs not in current config + for job_id in current_job_ids - config_job_ids: + self.scheduler.remove_job(job_id) + + # Clear history for removed jobs + self.job_history = { + name: history + for name, history in self.job_history.items() + if name in config_job_ids + } + self._save_history() + + def _setup_directories(self) -> None: + """Create necessary directories for job storage.""" + self.job_store_path.parent.mkdir(parents=True, exist_ok=True) + + def _initialize_scheduler(self) -> None: + """Initialize the APScheduler with SQLite backend.""" + jobstores = { + 'default': SQLAlchemyJobStore( + url=f'sqlite:///{self.job_store_path}' + ) + } + self.scheduler = BackgroundScheduler(jobstores=jobstores) + if self.scheduler is not None: + self.scheduler.start() + + def _load_history(self) -> Dict[str, list[Dict[str, Any]]]: + """Load job execution history from file.""" + if self.job_history_path.exists(): + with open(self.job_history_path, 'r') as f: + loaded_history = json.load(f) + # Ensure the loaded data matches the expected type + if not isinstance(loaded_history, dict): + return {} + return cast(Dict[str, list[Dict[str, Any]]], loaded_history) + return {} + + def _save_history(self) -> None: + """Save job execution history to file.""" + with open(self.job_history_path, 'w') as f: + json.dump(self.job_history, f) + + def add_job( + self, + name: str, + schedule: str, + task: str, + args: Optional[Dict[str, Any]] = None, + ) -> None: + """Add a new scheduled job.""" + if not self.scheduler: + raise RuntimeError('Scheduler not initialized') + + try: + # Create trigger from schedule + trigger = CronTrigger.from_crontab(schedule) + + # Log the scheduling event first + log_execution(name, 'scheduled') + + # Add the job using the module-level function + self.scheduler.add_job( + func='makim.scheduler:run_makim_task', + trigger=trigger, + args=[task], + kwargs={'args': args or {}}, + id=name, + name=name, + replace_existing=True, + misfire_grace_time=None, + ) + + except Exception as e: + log_execution(name, 'schedule_failed', error=str(e)) + raise + + def remove_job(self, name: str) -> None: + """Remove a scheduled job.""" + if not self.scheduler: + raise RuntimeError('Scheduler not initialized') + + try: + self.scheduler.remove_job(name) + log_execution(name, 'removed') + except Exception as e: + log_execution(name, 'remove_failed', error=str(e)) + raise + + def get_job(self, name: str) -> Optional[Job]: + """Get a job by name.""" + if not self.scheduler: + raise RuntimeError('Scheduler not initialized') + + return self.scheduler.get_job(name) + + def list_jobs(self) -> list[Dict[str, Any]]: + """List all scheduled jobs.""" + if not self.scheduler: + raise RuntimeError('Scheduler not initialized') + + jobs = [] + for job in self.scheduler.get_jobs(): + job_info = { + 'id': job.id, + 'name': job.name, + 'next_run_time': job.next_run_time.isoformat() + if job.next_run_time + else None, + 'schedule': str(job.trigger), + } + jobs.append(job_info) + return jobs + + def get_job_status(self, name: str) -> Dict[str, Any]: + """Get detailed status of a specific job.""" + job = self.get_job(name) + if not job: + return {'error': 'Job not found'} + + history = self.job_history.get(name, []) + + return { + 'name': name, + 'next_run_time': job.next_run_time.isoformat() + if job.next_run_time + else None, + 'schedule': str(job.trigger), + 'history': history, + } + + def shutdown(self) -> None: + """Shutdown the scheduler.""" + if self.scheduler: + self.scheduler.shutdown() diff --git a/src/makim/schema.json b/src/makim/schema.json index ba3bafb..c484dfc 100644 --- a/src/makim/schema.json +++ b/src/makim/schema.json @@ -270,6 +270,35 @@ } }, "additionalProperties": false + }, + "scheduler": { + "type": "object", + "description": "Scheduler configuration for tasks", + "patternProperties": { + "^[a-zA-Z0-9_-]+$": { + "type": "object", + "required": ["schedule", "task"], + "properties": { + "schedule": { + "type": "string", + "description": "Cron-style schedule for the task (e.g., '* * * * *' for every minute)" + }, + "task": { + "type": "string", + "description": "Full task path in the format 'group.task'" + }, + "args": { + "type": "object", + "description": "Arguments to pass to the scheduled task", + "additionalProperties": { + "type": ["string", "boolean"] + } + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false } }, "additionalProperties": false