From 196659d5f1151609340e6704e5fd10d3d835debd Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Wed, 12 Jun 2024 11:51:22 +0000 Subject: [PATCH] Replace print by the Python logging framework We use the logger "fishtest" as configured in production.ini. For messages to the event log we use the child logger "fishtest.event", also configured in "production.ini". Log messages are formatted as follows: INFO [fishtest] [waitress-0:rundb.py:failed_task:1446] <...message...> Also: move scheduler to a separate file scheduler.py. --- server/development.ini | 15 ++- server/fishtest/actiondb.py | 12 +- server/fishtest/api.py | 4 +- server/fishtest/rundb.py | 131 +++++++++----------- server/fishtest/scheduler.py | 223 +++++++++++++++++++++++++++++++++ server/fishtest/userdb.py | 8 +- server/fishtest/util.py | 232 +++-------------------------------- server/fishtest/views.py | 30 +++-- server/production.ini | 18 ++- 9 files changed, 350 insertions(+), 323 deletions(-) create mode 100644 server/fishtest/scheduler.py diff --git a/server/development.ini b/server/development.ini index 4c95d83ef9..4667114ade 100644 --- a/server/development.ini +++ b/server/development.ini @@ -35,10 +35,10 @@ port = 6542 ### [loggers] -keys = root, fishtest +keys = root, fishtest, fishtest.event [handlers] -keys = console +keys = console, events [formatters] keys = generic @@ -52,11 +52,20 @@ level = DEBUG handlers = qualname = fishtest +[logger_fishtest.event] +level = DEBUG +handlers = events +qualname = fishtest.event + [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic +[handler_events] +class = fishtest.util.EventHandler +formatter = generic + [formatter_generic] -format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s +format = %(levelname)s [%(name)s] [%(threadName)s:%(filename)s:%(funcName)s:%(lineno)s] %(message)s diff --git a/server/fishtest/actiondb.py b/server/fishtest/actiondb.py index 5a8698301f..4c7e267993 100644 
--- a/server/fishtest/actiondb.py +++ b/server/fishtest/actiondb.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from fishtest.schemas import action_schema -from fishtest.util import hex_print, worker_name +from fishtest.util import event_log, hex_print, logger, worker_name from pymongo import DESCENDING from vtjson import ValidationError, validate @@ -219,13 +219,7 @@ def insert_action(self, **action): try: validate(action_schema, action, "action") except ValidationError as e: - message = ( - f"Internal Error. Request {str(action)} does not validate: {str(e)}" - ) - print(message, flush=True) - self.log_message( - username="fishtest.system", - message=message, - ) + message = f"Request {str(action)} does not validate: {str(e)}" + event_log.error(message) return self.actions.insert_one(action) diff --git a/server/fishtest/api.py b/server/fishtest/api.py index 071d1c8ffa..f657eb653f 100644 --- a/server/fishtest/api.py +++ b/server/fishtest/api.py @@ -7,7 +7,7 @@ from fishtest.schemas import api_access_schema, api_schema, gzip_data from fishtest.stats.stat_util import SPRT_elo, get_elo -from fishtest.util import strip_run, worker_name +from fishtest.util import event_log, logger, strip_run, worker_name from pyramid.httpexceptions import ( HTTPBadRequest, HTTPException, @@ -69,7 +69,7 @@ def handle_error(self, error, exception=HTTPBadRequest): ) api = urlparse(full_url).path error = f"{api}: {error}" - print(error, flush=True) + logger.info(error) raise exception(self.add_time({"error": error})) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 0e3c105ea5..50c9fe1470 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -1,4 +1,5 @@ import copy +import logging import math import os import random @@ -15,6 +16,7 @@ from bson.errors import InvalidId from bson.objectid import ObjectId from fishtest.actiondb import ActionDb +from fishtest.scheduler import Scheduler from fishtest.schemas import ( RUN_VERSION, 
active_runs_schema, @@ -37,14 +39,15 @@ from fishtest.userdb import UserDb from fishtest.util import ( GeneratorAsFileReader, - Scheduler, crash_or_time, estimate_game_duration, + event_log, format_results, get_bad_workers, get_chi2, get_hash, get_tc_ratio, + logger, remaining_hours, update_residuals, worker_name, @@ -53,12 +56,10 @@ from pymongo import DESCENDING, MongoClient from vtjson import ValidationError, validate -boot_time = datetime.now(timezone.utc) +class OpenDb: -class RunDb: - - def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): + def __init__(self, db_name="fishtest_new"): # MongoDB server is assumed to be on the same machine, if not user should # use ssh with port forwarding to access the remote host. self.conn = MongoClient(os.getenv("FISHTEST_HOST") or "localhost") @@ -71,6 +72,20 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): self.nndb = self.db["nns"] self.runs = self.db["runs"] self.deltas = self.db["deltas"] + + +class RunDb: + + def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): + opendb = OpenDb(db_name=db_name) + self.userdb = opendb.userdb + self.actiondb = opendb.actiondb + self.workerdb = opendb.workerdb + self.pgndb = opendb.pgndb + self.nndb = opendb.nndb + self.runs = opendb.runs + self.deltas = opendb.deltas + self.port = port self.unfinished_runs = set() self.unfinished_runs_lock = threading.Lock() @@ -106,7 +121,7 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): def schedule_tasks(self): if self.scheduler is None: - self.scheduler = Scheduler(jitter=0.05) + self.scheduler = Scheduler(jitter=0.05, logger=logger) self.scheduler.create_task(1.0, self.flush_buffers, min_delay=1.0) self.scheduler.create_task(60.0, self.clean_cache) self.scheduler.create_task(60.0, self.scavenge_dead_tasks) @@ -121,9 +136,8 @@ def schedule_tasks(self): def validate_data_structures(self): # The main purpose of task is to ensure that 
the schemas # in schemas.py are kept up-to-date. - print( - "Validate_data_structures: validating Fishtest's internal data structures...", - flush=True, + logger.info( + "Validate_data_structures: validating Fishtest's internal data structures..." ) try: with self.run_cache_lock: @@ -166,11 +180,8 @@ def validate_data_structures(self): ) except ValidationError as e: message = f"Validation of internal data structures failed: {str(e)}" - print(message, flush=True) - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + logger.error(message) + event_log.error(message) def update_itp(self): with self.unfinished_runs_lock: @@ -191,7 +202,7 @@ def clean_wtt_map(self): task = run["tasks"][task_id] if not task["active"]: del self.wtt_map[short_worker_name] - print(f"Clean_wtt_map: {len(self.wtt_map)} active workers...") + logger.info(f"Clean_wtt_map: {len(self.wtt_map)} active workers...") # Do not use this while holding an active_run_lock! def insert_in_wtt_map(self, run, task_id): @@ -220,9 +231,8 @@ def validate_random_run(self): if not cache_entry["run"]["finished"] ] if len(run_list) == 0: - print( + logger.info( "Validate_random_run: no unfinished cache runs. 
No runs to validate...", - flush=True, ) return run = random.choice(run_list) @@ -232,18 +242,15 @@ def validate_random_run(self): # validating it with self.active_run_lock(run_id): validate(runs_schema, run, "run") - print( + logger.info( f"Validate_random_run: validated cache run {run_id}...", - flush=True, ) except ValidationError as e: message = f"The run object {run_id} does not validate: {str(e)}" - print(message, flush=True) if "version" in run and run["version"] >= RUN_VERSION: - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + event_log.error(message) + else: + logger.info(message) def set_inactive_run(self, run): run_id = str(run["_id"]) @@ -287,7 +294,7 @@ def set_inactive_task(self, task_id, run): if self.connections_counter[remote_addr] == 0: del self.connections_counter[remote_addr] except Exception as e: - print(f"Error while deleting connection: {str(e)}", flush=True) + event_log.error(f"Error while deleting connection: {str(e)}") def set_bad_task(self, task_id, run, residual=None, residual_color=None): zero_stats = { @@ -345,66 +352,59 @@ def update_aggregated_data(self): with self.active_run_lock(run_id): version = run.get("version", 0) if version < RUN_VERSION: - print( - f"Warning: upgrading run {run_id} to version {RUN_VERSION}", - flush=True, + logger.warning( + f"Warning: upgrading run {run_id} to version {RUN_VERSION}" ) run["version"] = RUN_VERSION changed = True results = compute_results(run) if results != run["results"]: - print( + logger.warning( f"Warning: correcting results for {run_id}", f"db: {run['results']} computed:{results}", - flush=True, ) run["results"] = results changed = True cores = compute_cores(run) if cores != run["cores"]: - print( + logger.warning( f"Warning: correcting cores for {run_id}", f"db: {run['cores']} computed:{cores}", - flush=True, ) run["cores"] = cores changed = True workers = compute_workers(run) if workers != run["workers"]: - print( + logger.warning( f"Warning: 
correcting workers for {run_id}", f"db: {run['workers']} computed:{workers}", - flush=True, ) run["workers"] = workers changed = True committed_games = compute_committed_games(run) committed_games_run = run.get("committed_games", None) if committed_games != committed_games_run: - print( + logger.warning( f"Warning: correcting committed_games for {run_id}", f"db: {committed_games_run} computed:{committed_games}", - flush=True, ) run["committed_games"] = committed_games changed = True total_games = compute_total_games(run) total_games_run = run.get("total_games", None) if total_games != total_games_run: - print( + logger.warning( f"Warning: correcting total_games for {run_id}", f"db: {total_games_run} computed:{total_games}", - flush=True, ) run["total_games"] = total_games changed = True flags = compute_flags(run) flags_run = {"is_green": run["is_green"], "is_yellow": run["is_yellow"]} if flags != flags_run: - print( - f"Warning: correcting flags for {run_id}", - f"db: {flags_run} computed:{flags}", - flush=True, + logger.warning( + f"Warning: correcting flags for {run_id} " + f"db: {flags_run} computed:{flags}" ) run.update(flags) changed = True @@ -571,7 +571,7 @@ def new_run( validate(runs_schema, new_run, "run") except ValidationError as e: message = f"The new run object does not validate: {str(e)}" - print(message, flush=True) + logger.error(message) raise Exception(message) # We cannot use self.buffer since new_run does not have an id yet. @@ -591,11 +591,7 @@ def upload_pgn(self, run_id, pgn_zip): validate(pgns_schema, record) except ValidationError as e: message = f"Internal Error. 
Pgn record has the wrong format: {str(e)}" - print(message, flush=True) - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + event_log.error(message) self.pgndb.insert_one( record, ) @@ -705,10 +701,9 @@ def get_run(self, r_id): def buffer(self, run, flush): if not self.is_primary_instance(): - print( + logger.warning( "Warning: attempt to use the run_cache on the", f"secondary instance with port number {self.port}!", - flush=True, ) return r_id = str(run["_id"]) @@ -742,6 +737,7 @@ def stop(self): time.sleep(1.1) def flush_all(self): + # Logging is not safe in a signal handler print("flush", flush=True) # Note that we do not grab locks because this method is # called from a signal handler and grabbing locks might deadlock @@ -803,11 +799,10 @@ def scavenge_dead_tasks(self): # We release the lock to avoid deadlock for task_id, run in dead_tasks: task = run["tasks"][task_id] - print( + logger.info( "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format( run["_id"], task_id, worker_name(task["worker_info"]) ), - flush=True, ) self.handle_crash_or_time(run, task_id) self.actiondb.dead_task( @@ -1025,7 +1020,7 @@ def request_task(self, worker_info): self.task_semaphore.release() else: message = "Request_task: the server is currently too busy..." 
- print(message, flush=True) + logger.info(message) return {"task_waiting": False, "info": message} def sync_request_task(self, worker_info): @@ -1063,7 +1058,7 @@ def sync_request_task(self, worker_info): f'which {last_update} seconds ago sent an update for task {str(wtt_run["_id"])}/{wtt_task_id} ' f'(my name is "{my_name_long}")' ) - print(error, flush=True) + logger.info(error) return {"task_waiting": False, "error": error} # We see if the worker has reached the number of allowed connections from the same ip @@ -1075,7 +1070,7 @@ def sync_request_task(self, worker_info): error = "Request_task: Machine limit reached for user {}".format( worker_info["username"] ) - print(error, flush=True) + logger.info(error) return {"task_waiting": False, "error": error} # Collect some data about the worker that will be used below. @@ -1214,7 +1209,7 @@ def priority(run): # lower is better f"Request_task: alas the run {run_id} corresponding to the " "assigned task no longer needs games. Please try again..." ) - print(info, flush=True) + logger.info(info) return {"task_waiting": False, "info": info} opening_offset = run["total_games"] @@ -1396,7 +1391,7 @@ def count_games(d): # Only log the case where the run is not yet finished, # otherwise it is expected behavior if not run["finished"]: - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} # Guard against incorrect results @@ -1425,7 +1420,7 @@ def count_games(d): ) if error != "": - print(error, flush=True) + logger.error(error) self.set_inactive_task(task_id, run) return {"task_alive": False, "error": error} @@ -1509,18 +1504,17 @@ def failed_task(self, run_id, task_id, message="Unknown reason"): # Check if the worker is still working on this task. if not task["active"]: info = "Failed_task: task {}/{} is not active".format(run_id, task_id) - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} # Mark the task as inactive. 
self.set_inactive_task(task_id, run) self.handle_crash_or_time(run, task_id) self.buffer(run, False) - print( + logger.info( "Failed_task: failure for: https://tests.stockfishchess.org/tests/view/{}, " "task_id: {}, worker: {}, reason: '{}'".format( run_id, task_id, worker_name(task["worker_info"]), message ), - flush=True, ) self.actiondb.failed_task( username=task["worker_info"]["username"], @@ -1552,12 +1546,10 @@ def stop_run(self, run_id): validate(runs_schema, run, "run") except ValidationError as e: message = f"The run object {run_id} does not validate: {str(e)}" - print(message, flush=True) if "version" in run and run["version"] >= RUN_VERSION: - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + event_log.error(message) + else: + logger.info(message) self.buffer(run, True) # Auto-purge runs here. This may revive the run. @@ -1573,13 +1565,12 @@ def stop_run(self, run_id): ), ) if message == "": - print("Run {} was auto-purged".format(str(run_id)), flush=True) + logger.info("Run {} was auto-purged".format(str(run_id))) else: - print( + logger.info( "Run {} was not auto-purged. Message: {}.".format( str(run_id), message ), - flush=True, ) def approve_run(self, run_id, approver): @@ -1693,7 +1684,7 @@ def request_spsa(self, run_id, task_id): # Check if the worker is still working on this task. if not task["active"]: info = "Request_spsa: task {}/{} is not active".format(run_id, task_id) - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} result = self.generate_spsa(run) diff --git a/server/fishtest/scheduler.py b/server/fishtest/scheduler.py new file mode 100644 index 0000000000..75646e9c28 --- /dev/null +++ b/server/fishtest/scheduler.py @@ -0,0 +1,223 @@ +import copy +import threading +import traceback +from datetime import datetime, timedelta, timezone +from random import uniform + +""" +The following scheduling code should be thread safe. 
+ +- First and foremost, all tasks are executed in a single main thread. +So they are atomic. In particular, during its lifetime, a task will be +executed exactly once at each scheduling point. + +- The main thread maintains a list of scheduled tasks. To safely manipulate +this list outside the main thread we rely on the atomicity of in-place +list operations in Python. + +- To signal the main thread that the task list has changed, which should +be acted upon as soon as possible as it might affect the next task to +be executed, we use a threading.Event. + +Example + +s=Scheduler() +s.create_task(3, task1) +s.create_task(2, task2) + +When the second task is scheduled, the scheduler will interrupt the +3s wait for the first task and replace it by a 2s wait for the second task. +""" + + +class Task: + """This is an opaque class representing a task. Instances should be created via + Scheduler.create_task(). Some public methods are documented below. + """ + + def __init__( + self, + period, + worker, + initial_delay=None, + min_delay=0.0, + one_shot=False, + jitter=0.0, + scheduler=None, + logger=None, + args=(), + kwargs={}, + ): + self.period = timedelta(seconds=period) + self.worker = worker + if initial_delay is None: + initial_delay = self.period + else: + initial_delay = timedelta(seconds=initial_delay) + self.min_delay = timedelta(seconds=min_delay) + self.__rel_jitter = jitter * self.period + self.__next_schedule = ( + datetime.now(timezone.utc) + + initial_delay + + uniform(-self.__rel_jitter, self.__rel_jitter) + ) + self.one_shot = one_shot + self.__expired = False + self.__scheduler = scheduler + self.__lock = threading.Lock() + self.logger = logger + self.args = args + self.kwargs = kwargs + + def _do_work(self): + if not self.__expired: + try: + self.worker(*self.args, **self.kwargs) + except Exception as e: + self.logger.warning( + f"{type(e).__name__} while executing task: {str(e)}" + ) + if not self.one_shot: + jitter = uniform(-self.__rel_jitter, 
self.__rel_jitter) + with self.__lock: + self.__next_schedule = ( + max( + self.__next_schedule + self.period, + datetime.now(timezone.utc) + self.min_delay, + ) + + jitter + ) + else: + self.__expired = True + + def _next_schedule(self): + return self.__next_schedule + + def schedule_now(self): + """Schedule the task now. Note that this happens asynchronously.""" + if not self.__expired: + with self.__lock: + self.__next_schedule = datetime.now(timezone.utc) + self.__scheduler._refresh() + + def expired(self): + """Indicates if the task has stopped + + :rtype: bool + """ + return self.__expired + + def stop(self): + """This stops the task""" + if self.__expired: + return + self.__expired = True + self.__scheduler._refresh() + + +class Scheduler: + """This creates a scheduler + + :param jitter: the default value for the task jitter (see below), defaults to 0.0 + :type jitter: float, optional + """ + + def __init__(self, jitter=0.0, logger=None): + """Constructor method""" + self.jitter = jitter + self.logger = logger + self.__tasks = [] + self.__event = threading.Event() + self.__thread_stopped = False + self.__worker_thread = threading.Thread(target=self.__next_schedule) + self.__worker_thread.start() + + def create_task( + self, + period, + worker, + initial_delay=None, + min_delay=0.0, + one_shot=False, + jitter=None, + args=(), + kwargs={}, + ): + """This schedules a new task. 
+ + :param period: The period after which the task will repeat + :type period: float + + :param worker: A callable that executes the task + :type worker: Callable + + :param initial_delay: The delay before the first execution of the task, defaults to period + :type initial_delay: float, optional + + :param min_delay: The minimum delay before the same task is repeated, defaults to 0.0 + :type min_delay: float, optional + + :param one_shot: If true, execute the task only once, defaults to False + :type one_shot: bool, optional + + :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter + :type jitter: float, optional + + :param args: Arguments passed to the worker, defaults to () + :type args: tuple, optional + + :param kwargs: Keyword arguments passed to the worker, defaults to {} + :type kwargs: dict, optional + + :rtype: Task + """ + if jitter is None: + jitter = self.jitter + task = Task( + period, + worker, + initial_delay=initial_delay, + min_delay=min_delay, + one_shot=one_shot, + jitter=jitter, + scheduler=self, + logger=self.logger, + args=args, + kwargs=kwargs, + ) + self.__tasks.append(task) + self._refresh() + return task + + def stop(self): + """This stops the scheduler""" + self.__thread_stopped = True + self._refresh() + + def _refresh(self): + self.__event.set() + + def __del_task(self, task): + try: + self.__tasks.remove(task) + except Exception: + pass + + def __next_schedule(self): + while not self.__thread_stopped: + next_schedule = None + for task in copy.copy(self.__tasks): + if task.expired(): + self.__del_task(task) + else: + if next_schedule is None or task._next_schedule() < next_schedule: + next_task = task + next_schedule = task._next_schedule() + if next_schedule is not None: + delay = (next_schedule - datetime.now(timezone.utc)).total_seconds() + self.__event.wait(delay) + if not self.__event.is_set(): + next_task._do_work() + else: + self.__event.wait() + self.__event.clear() diff --git 
a/server/fishtest/userdb.py b/server/fishtest/userdb.py index 6c2957b7f1..5d5fdd1876 100644 --- a/server/fishtest/userdb.py +++ b/server/fishtest/userdb.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from fishtest.schemas import user_schema +from fishtest.util import event_log, logger from pymongo import ASCENDING from vtjson import ValidationError, validate @@ -15,7 +16,7 @@ def validate_user(user): validate(user_schema, user, "user") except ValidationError as e: message = f"The user object does not validate: {str(e)}" - print(message, flush=True) + logger.error(message) raise Exception(message) @@ -145,9 +146,8 @@ def remove_user(self, user, rejector): self.last_pending_time = 0 self.clear_cache() # logs rejected users to the server - print( - f"user: {user['username']} with email: {user['email']} was rejected by: {rejector}", - flush=True, + logger.info( + f"user: {user['username']} with email: {user['email']} was rejected by: {rejector}" ) return True else: diff --git a/server/fishtest/util.py b/server/fishtest/util.py index 589f842f9f..38e2bc356c 100644 --- a/server/fishtest/util.py +++ b/server/fishtest/util.py @@ -1,11 +1,10 @@ import copy import hashlib +import logging import math import re -import threading from datetime import datetime, timedelta, timezone from functools import cache -from random import uniform import fishtest.stats.stat_util import numpy @@ -15,6 +14,9 @@ FISH_URL = "https://tests.stockfishchess.org/tests/view/" +logger = logging.getLogger("fishtest") +event_log = logging.getLogger("fishtest.event") + class GeneratorAsFileReader: def __init__(self, generator): @@ -579,218 +581,18 @@ def strip_run(run): return stripped -""" -The following scheduling code should be thread safe. - -- First and foremost, all tasks are executed in a single main thread. -So they are atomic. In particular, during its lifetime, a task will be -executed exactly once at each scheduling point. - -- The main thread maintains a list of scheduled tasks. 
To safely manipulate -this list outside the main thread we rely on the atomicity of in-place -list operations in Python. - -- To signal the main thread that the task list has changed, which should -be acted upon as soon as possible as it might affect the next task to -be executed, we use a threading.Event. - -Example - -s=Scheduler() -s.add_task(3, task1) -s.add_task(2, task2) - -When the second task is scheduled, the scheduler will interrupt the -3s wait for the first task and replace it by a 2s wait for the second task. -""" - - -class Task: - """This is an opaque class representing a task. Instances should be created via - Scheduler.create_task(). Some public methods are documented below. - """ - - def __init__( - self, - period, - worker, - initial_delay=None, - min_delay=0.0, - one_shot=False, - jitter=0.0, - scheduler=None, - args=(), - kwargs={}, - ): - self.period = timedelta(seconds=period) - self.worker = worker - if initial_delay is None: - initial_delay = self.period - else: - initial_delay = timedelta(seconds=initial_delay) - self.min_delay = timedelta(seconds=min_delay) - self.__rel_jitter = jitter * self.period - self.__next_schedule = ( - datetime.now(timezone.utc) - + initial_delay - + uniform(-self.__rel_jitter, self.__rel_jitter) - ) - self.one_shot = one_shot - self.__expired = False - self.__scheduler = scheduler - self.__lock = threading.Lock() - self.args = args - self.kwargs = kwargs - - def _do_work(self): - if not self.__expired: - try: - self.worker(*self.args, *self.kwargs) - except Exception as e: - print(f"{type(e).__name__} while executing task: {str(e)}", flush=True) - if not self.one_shot: - jitter = uniform(-self.__rel_jitter, self.__rel_jitter) - with self.__lock: - self.__next_schedule = ( - max( - self.__next_schedule + self.period, - datetime.now(timezone.utc) + self.min_delay, - ) - + jitter - ) - else: - self.__expired = True - - def _next_schedule(self): - return self.__next_schedule - - def schedule_now(self): - """Schedule 
the task now. Note that this happens asynchronously.""" - if not self.__expired: - with self.__lock: - self.__next_schedule = datetime.now(timezone.utc) - self.__scheduler._refresh() - - def expired(self): - """Indicates if the task has stopped - - :rtype: bool - """ - return self.__expired - - def stop(self): - """This stops the task""" - if self.__expired: - return - self.__expired = True - self.__scheduler._refresh() - - -class Scheduler: - """This creates a scheduler - - :param jitter: the default value for the task jitter (see below), defaults to 0.0 - :type jitter: float, optional - """ - - def __init__(self, jitter=0.0): - """Constructor method""" - self.jitter = jitter - self.__tasks = [] - self.__event = threading.Event() - self.__thread_stopped = False - self.__worker_thread = threading.Thread(target=self.__next_schedule) - self.__worker_thread.start() - - def create_task( - self, - period, - worker, - initial_delay=None, - min_delay=0.0, - one_shot=False, - jitter=None, - args=(), - kwargs={}, - ): - """This schedules a new task. 
- - :param period: The period after which the task will repeat - :type period: float - - :param worker: A callable that executes the task - :type worker: Callable - - :param initial_delay: The delay before the first execution of the task, defaults to period - :type initial_delay: float, optional - - :param min_delay: The minimum delay before the same task is repeated, defaults to 0.0 - :type min_delay: float, optional - - :param one_shot: If true, execute the task only once, defaults to False - :type one_shot: bool, optional - - :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter - :type jitter: float, optional - - :param args: Arguments passed to the worker, defaults to () - :type args: tuple, optional - - :param kwargs: Keyword arguments passed to the worker, defaults to {} - :type kwargs: dict, optional - - :rtype: Task - """ - if jitter is None: - jitter = self.jitter - task = Task( - period, - worker, - initial_delay=initial_delay, - min_delay=min_delay, - one_shot=one_shot, - jitter=jitter, - scheduler=self, - args=args, - kwargs=kwargs, - ) - self.__tasks.append(task) - self._refresh() - return task - - def stop(self): - """This stops the scheduler""" - self.__thread_stopped = True - self._refresh() - - def _refresh(self): - self.__event.set() +class EventHandler(logging.Handler): + def __init__(self): + super().__init__() + # Avoid circular import + from fishtest.rundb import OpenDb - def _del_task(self, task): - self.__del_task(task) - self._refresh() + opendb = OpenDb() + self.actiondb = opendb.actiondb - def __del_task(self, task): - try: - self.__tasks.remove(task) - except Exception: - pass - - def __next_schedule(self): - while not self.__thread_stopped: - next_schedule = None - for task in copy.copy(self.__tasks): - if task.expired(): - self.__del_task(task) - else: - if next_schedule is None or task._next_schedule() < next_schedule: - next_task = task - next_schedule = task._next_schedule() - if 
next_schedule is not None: - delay = (next_schedule - datetime.now(timezone.utc)).total_seconds() - self.__event.wait(delay) - if not self.__event.is_set(): - next_task._do_work() - else: - self.__event.wait() - self.__event.clear() + def emit(self, record): + message = self.format(record) + self.actiondb.log_message( + username="fishtest.system", + message=message, + ) diff --git a/server/fishtest/views.py b/server/fishtest/views.py index b4706da160..0db4476ed8 100644 --- a/server/fishtest/views.py +++ b/server/fishtest/views.py @@ -14,6 +14,7 @@ from fishtest.schemas import RUN_VERSION, runs_schema, short_worker_name from fishtest.util import ( email_valid, + event_log, extract_repo_from_link, format_bounds, format_date, @@ -23,6 +24,7 @@ get_tc_ratio, github_repo_valid, is_sprt_ltc_data, + logger, password_strength, update_residuals, ) @@ -291,7 +293,7 @@ def upload(request): ) return {} except Exception as e: - print("Error reading the network file:", e) + logger.error(f"Error reading the network file: {str(e)}") request.session.flash("Error reading the network file", "error") return {} if request.rundb.get_nn(filename): @@ -315,7 +317,7 @@ def upload(request): with open(os.path.expanduser("~/fishtest.upload"), "r") as f: upload_server = f.read().strip() except Exception as e: - print("Network upload not configured:", e) + logger.error(f"Network upload not configured: {str(e)}") request.session.flash("Network upload not configured", "error") return {} try: @@ -324,10 +326,10 @@ def upload(request): response = requests.post(upload_server, files=files, timeout=HTTP_TIMEOUT * 20) response.raise_for_status() except ConnectionError as e: - print("Failed to connect to the net server:", e) + logger.error(f"Failed to connect to the net server: {str(e)}") error = "Failed to connect to the net server" except HTTPError as e: - print("Network upload failed:", e) + logger.error(f"Network upload failed: {str(e)}") if response.status_code == 409: error = "Post request failed: 
network {} already uploaded".format(filename) elif response.status_code == 500: @@ -337,7 +339,7 @@ def upload(request): else: error = "Post request failed: other HTTP error" except Exception as e: - print("Error during connection:", e) + logger.error(f"Error during connection: {str(e)}") error = "Post request for the network upload failed" if error: @@ -422,7 +424,7 @@ def signup(request): ).json() if "success" not in response or not response["success"]: if "error-codes" in response: - print(response["error-codes"]) + logger.error(response["error-codes"]) request.session.flash("Captcha failed", "error") return {} @@ -731,7 +733,7 @@ def get_master_info(url): response = requests.get(url) response.raise_for_status() except Exception as e: - print(f"Exception getting commits:\n{e}") + logger.error(f"Exception getting commits:\n{str(e)}") return None bench_search = re.compile(r"(^|\s)[Bb]ench[ :]+([1-9]\d{5,7})(?!\d)") @@ -1408,15 +1410,11 @@ def tests_delete(request): try: validate(runs_schema, run, "run") except ValidationError as e: - message = ( - f"The run object {request.POST['run-id']} does not validate: {str(e)}" - ) - print(message, flush=True) + message = f"The run object {run_id} does not validate: {str(e)}" if "version" in run and run["version"] >= RUN_VERSION: - request.actiondb.log_message( - username="fishtest.system", - message=message, - ) + event_log.error(message) + else: + logger.info(message) request.rundb.buffer(run, True) request.actiondb.delete_run( @@ -1769,7 +1767,7 @@ def tests(request): try: last_tests = homepage_results(request) except Exception as e: - print("Overview exception: " + str(e)) + logger.error(f"Overview exception: {str(e)}") if not last_tests: raise e finally: diff --git a/server/production.ini b/server/production.ini index 10066bcc3d..80f1e57e41 100644 --- a/server/production.ini +++ b/server/production.ini @@ -40,10 +40,10 @@ threads = 4 ### [loggers] -keys = root, fishtest +keys = root, fishtest, fishtest.event [handlers] 
-keys = console +keys = console, events [formatters] keys = generic @@ -53,15 +53,25 @@ level = ERROR handlers = console [logger_fishtest] -level = WARN +level = DEBUG handlers = qualname = fishtest +[logger_fishtest.event] +level = DEBUG +handlers = events +qualname = fishtest.event + [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic +[handler_events] +class = fishtest.util.EventHandler +level = NOTSET +formatter = generic + [formatter_generic] -format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s +format = %(levelname)s [%(name)s] [%(threadName)s:%(filename)s:%(funcName)s:%(lineno)s] %(message)s