diff --git a/server/development.ini b/server/development.ini index 4c95d83ef9..834b9e2da2 100644 --- a/server/development.ini +++ b/server/development.ini @@ -38,18 +38,18 @@ port = 6542 keys = root, fishtest [handlers] -keys = console +keys = console, events [formatters] keys = generic [logger_root] -level = INFO +level = DEBUG handlers = console [logger_fishtest] level = DEBUG -handlers = +handlers = events qualname = fishtest [handler_console] @@ -58,5 +58,9 @@ args = (sys.stderr,) level = NOTSET formatter = generic +[handler_events] +class = fishtest.rundb.EventHandler +formatter = generic + [formatter_generic] -format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s +format = %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s diff --git a/server/fishtest/actiondb.py b/server/fishtest/actiondb.py index 5a8698301f..58f2927b7d 100644 --- a/server/fishtest/actiondb.py +++ b/server/fishtest/actiondb.py @@ -1,3 +1,5 @@ +import logging + from datetime import datetime, timezone from fishtest.schemas import action_schema @@ -5,6 +7,8 @@ from pymongo import DESCENDING from vtjson import ValidationError, validate +logger = logging.getLogger(__name__) + def run_name(run): run_id = str(run["_id"]) @@ -219,13 +223,7 @@ def insert_action(self, **action): try: validate(action_schema, action, "action") except ValidationError as e: - message = ( - f"Internal Error. Request {str(action)} does not validate: {str(e)}" - ) - print(message, flush=True) - self.log_message( - username="fishtest.system", - message=message, - ) + message = f"Request {str(action)} does not validate: {str(e)}" + logger.error(message) return self.actions.insert_one(action) diff --git a/server/fishtest/api.py b/server/fishtest/api.py index 6a36baf4df..c377722910 100644 --- a/server/fishtest/api.py +++ b/server/fishtest/api.py @@ -1,6 +1,7 @@ import base64 import copy import io +import logging import re from datetime import datetime, timezone from urllib.parse import urlparse @@ -19,6 +20,8 @@ from pyramid.view import exception_view_config, view_config, view_defaults from vtjson import ValidationError, validate +logger = logging.getLogger(__name__) + """ Important note ============== @@ -69,7 +72,7 @@ def handle_error(self, error, exception=HTTPBadRequest): ) api = urlparse(full_url).path error = f"{api}: {error}" - print(error, flush=True) + logger.info(error) raise exception(self.add_time({"error": error})) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 672e7cc314..96d626d212 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -1,4 +1,5 @@ import copy +import logging import math import os import random @@ -15,6 +16,7 @@ from bson.errors import InvalidId from bson.objectid import ObjectId from fishtest.actiondb import ActionDb +from fishtest.scheduler import Scheduler from fishtest.schemas import ( RUN_VERSION, cache_schema, @@ -31,7 +33,6 @@ from fishtest.userdb import UserDb from fishtest.util import ( GeneratorAsFileReader, - Scheduler, crash_or_time, estimate_game_duration, format_bounds, @@ -48,7 +49,24 @@ from pymongo import DESCENDING, MongoClient from vtjson import ValidationError, validate -boot_time = datetime.now(timezone.utc) + +logger = logging.getLogger(__name__) + + +class EventHandler(logging.Handler): + def __init__(self): + super().__init__(level=logging.ERROR) + logging.disable(level=logging.CRITICAL) + rundb = RunDb(is_primary_instance=False) + logging.disable(level=logging.NOTSET) + self.actiondb = rundb.actiondb + + def emit(self, record): + message = self.format(record) + self.actiondb.log_message( + username="fishtest.system", + message=message, + ) class RunDb: @@ -93,7 +111,7 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): def schedule_tasks(self): if self.scheduler is None: - self.scheduler = Scheduler(jitter=0.05) + self.scheduler = Scheduler(jitter=0.05, logger=logger) self.scheduler.create_task(1.0, self.flush_buffers) self.scheduler.create_task(60.0, self.clean_cache) self.scheduler.create_task(60.0, self.scavenge_dead_tasks) @@ -121,7 +139,7 @@ def clean_wtt_map(self): task = run["tasks"][task_id] if not task["active"]: del self.wtt_map[short_worker_name] - print(f"Clean_wtt_map: {len(self.wtt_map)} active workers...") + logger.info(f"Clean_wtt_map: {len(self.wtt_map)} active workers...") # Do not use this while holding an active_run_lock! def insert_in_wtt_map(self, run, task_id): @@ -150,9 +168,8 @@ def validate_random_run(self): if not cache_entry["run"]["finished"] ] if len(run_list) == 0: - print( + logger.info( "Validate_random_run: no unfinished cache runs. No runs to validate...", - flush=True, ) return run = random.choice(run_list) @@ -162,18 +179,15 @@ def validate_random_run(self): # validating it with self.active_run_lock(run_id): validate(runs_schema, run, "run") - print( + logger.info( f"Validate_random_run: validated cache run {run_id}...", - flush=True, ) except ValidationError as e: message = f"The run object {run_id} does not validate: {str(e)}" - print(message, flush=True) if "version" in run and run["version"] >= RUN_VERSION: - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + logger.error(message) + else: + logger.info(message) def set_inactive_run(self, run): run_id = str(run["_id"]) @@ -215,7 +229,7 @@ def set_inactive_task(self, task_id, run): if self.connections_counter[remote_addr] == 0: del self.connections_counter[remote_addr] except Exception as e: - print(f"Error while deleting connection: {str(e)}", flush=True) + logger.error(f"Error while deleting connection: {str(e)}") def set_bad_task(self, task_id, run, residual=None, residual_color=None): zero_stats = { @@ -273,48 +287,43 @@ def update_aggregated_data(self): with self.active_run_lock(run_id): results = compute_results(run) if results != run["results"]: - print( + logger.warning( f"Warning: correcting results for {run_id}", f"db: {run['results']} computed:{results}", - flush=True, ) run["results"] = results changed = True cores = compute_cores(run) if cores != run["cores"]: - print( + logger.warning( f"Warning: correcting cores for {run_id}", f"db: {run['cores']} computed:{cores}", - flush=True, ) run["cores"] = cores changed = True workers = compute_workers(run) if workers != run["workers"]: - print( + logger.warning( f"Warning: correcting workers for {run_id}", f"db: {run['workers']} computed:{workers}", - flush=True, ) run["workers"] = workers changed = True committed_games = compute_committed_games(run) committed_games_run = run.get("committed_games", None) if committed_games != committed_games_run: - print( + logger.warning( f"Warning: correcting committed_games for {run_id}", f"db: {committed_games_run} computed:{committed_games}", - flush=True, ) run["committed_games"] = committed_games changed = True total_games = compute_total_games(run) total_games_run = run.get("total_games", None) if total_games != total_games_run: - print( + logger.warning( f"Warning: correcting total_games for {run_id}", f"db: {total_games_run} computed:{total_games}", - flush=True, ) run["total_games"] = total_games changed = True @@ -350,7 +359,7 @@ def update_aggregated_data(self): subs={"runs_schema": dict}, ) except ValidationError as e: - print(f"Validation of run_cache failed: {str(e)}") + logger.error(f"Validation of run_cache failed: {str(e)}") def new_run( self, @@ -493,7 +502,7 @@ def new_run( validate(runs_schema, new_run, "run") except ValidationError as e: message = f"The new run object does not validate: {str(e)}" - print(message, flush=True) + logger.warning(message) raise Exception(message) # We cannot use self.buffer since new_run does not have an id yet. @@ -513,11 +522,7 @@ def upload_pgn(self, run_id, pgn_zip): validate(pgns_schema, record) except ValidationError as e: message = f"Internal Error. Pgn record has the wrong format: {str(e)}" - print(message, flush=True) - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + logger.error(message) self.pgndb.insert_one( record, ) @@ -628,10 +633,9 @@ def get_run(self, r_id): def buffer(self, run, flush): if not self.is_primary_instance(): - print( + logger.warning( "Warning: attempt to use the run_cache on the", f"secondary instance with port number {self.port}!", - flush=True, ) return with self.run_cache_lock: @@ -664,6 +668,7 @@ def stop(self): time.sleep(1.1) def flush_all(self): + # Logging is not safe in an event handler print("flush", flush=True) # Note that we do not grab locks because this method is # called from a signal handler and grabbing locks might deadlock @@ -723,11 +728,10 @@ def scavenge_dead_tasks(self): # We release the lock to avoid deadlock for task_id, run in dead_tasks: task = run["tasks"][task_id] - print( + logger.info( "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format( run["_id"], task_id, worker_name(task["worker_info"]) ), - flush=True, ) self.handle_crash_or_time(run, task_id) self.actiondb.dead_task( @@ -948,7 +952,7 @@ def request_task(self, worker_info): self.task_semaphore.release() else: message = "Request_task: the server is currently too busy..." - print(message, flush=True) + logger.info(message) return {"task_waiting": False, "info": message} def sync_request_task(self, worker_info): @@ -986,7 +990,7 @@ def sync_request_task(self, worker_info): f'which {last_update} seconds ago sent an update for task {str(wtt_run["_id"])}/{wtt_task_id} ' f'(my name is "{my_name_long}")' ) - print(error, flush=True) + logger.info(error) return {"task_waiting": False, "error": error} # We see if the worker has reached the number of allowed connections from the same ip @@ -998,7 +1002,7 @@ def sync_request_task(self, worker_info): error = "Request_task: Machine limit reached for user {}".format( worker_info["username"] ) - print(error, flush=True) + logger.info(error) return {"task_waiting": False, "error": error} # Collect some data about the worker that will be used below. @@ -1135,7 +1139,7 @@ def priority(run): # lower is better f"Request_task: alas the run {run_id} corresponding to the " "assigned task no longer needs games. Please try again..." ) - print(info, flush=True) + logger.info(info) return {"task_waiting": False, "info": info} opening_offset = run["total_games"] @@ -1321,7 +1325,7 @@ def count_games(d): # Only log the case where the run is not yet finished, # otherwise it is expected behavior if not run["finished"]: - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} # Guard against incorrect results @@ -1350,7 +1354,7 @@ def count_games(d): ) if error != "": - print(error, flush=True) + logger.info(error) self.set_inactive_task(task_id, run) return {"task_alive": False, "error": error} @@ -1434,18 +1438,17 @@ def failed_task(self, run_id, task_id, message="Unknown reason"): # Check if the worker is still working on this task. if not task["active"]: info = "Failed_task: task {}/{} is not active".format(run_id, task_id) - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} # Mark the task as inactive. self.set_inactive_task(task_id, run) self.handle_crash_or_time(run, task_id) self.buffer(run, False) - print( + logger.info( "Failed_task: failure for: https://tests.stockfishchess.org/tests/view/{}, " "task_id: {}, worker: {}, reason: '{}'".format( run_id, task_id, worker_name(task["worker_info"]), message ), - flush=True, ) self.actiondb.failed_task( username=task["worker_info"]["username"], @@ -1477,12 +1480,10 @@ def stop_run(self, run_id): validate(runs_schema, run, "run") except ValidationError as e: message = f"The run object {run_id} does not validate: {str(e)}" - print(message, flush=True) if "version" in run and run["version"] >= RUN_VERSION: - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + logger.error(message) + else: + logger.info(message) self.buffer(run, True) # Auto-purge runs here. This may revive the run. @@ -1498,13 +1499,12 @@ def stop_run(self, run_id): ), ) if message == "": - print("Run {} was auto-purged".format(str(run_id)), flush=True) + logger.info("Run {} was auto-purged".format(str(run_id))) else: - print( + logger.info( "Run {} was not auto-purged. Message: {}.".format( str(run_id), message ), - flush=True, ) def approve_run(self, run_id, approver): @@ -1628,7 +1628,7 @@ def request_spsa(self, run_id, task_id): # Check if the worker is still working on this task. if not task["active"]: info = "Request_spsa: task {}/{} is not active".format(run_id, task_id) - print(info, flush=True) + logger.info(info) return {"task_alive": False, "info": info} result = self.generate_spsa(run) diff --git a/server/fishtest/scheduler.py b/server/fishtest/scheduler.py new file mode 100644 index 0000000000..e048316bfa --- /dev/null +++ b/server/fishtest/scheduler.py @@ -0,0 +1,211 @@ +import copy +import threading +from datetime import datetime, timedelta, timezone +from random import uniform + +""" +The following scheduling code should be thread safe. + +- First and foremost, all tasks are executed in a single main thread. +So they are atomic. In particular, during its lifetime, a task will be +executed exactly once at each scheduling point. + +- The main thread maintains a list of scheduled tasks. To safely manipulate +this list outside the main thread we rely on the atomicity of in-place +list operations in Python. + +- To signal the main thread that the task list has changed, which should +be acted upon as soon as possible as it might affect the next task to +be executed, we use a threading.Event. + +Example + +s=Scheduler() +s.add_task(3, task1) +s.add_task(2, task2) + +When the second task is scheduled, the scheduler will interrupt the +3s wait for the first task and replace it by a 2s wait for the second task. +""" + + +class Task: + """This is an opaque class representing a task. Instances should be created via + Scheduler.create_task(). Some public methods are documented below. + """ + + def __init__( + self, + period, + worker, + initial_delay=None, + one_shot=False, + jitter=0.0, + logger=None, + scheduler=None, + args=(), + kwargs={}, + ): + self.period = timedelta(seconds=period) + self.worker = worker + if initial_delay is None: + initial_delay = self.period + else: + initial_delay = timedelta(seconds=initial_delay) + self.__rel_jitter = jitter * self.period + self.__next_schedule = ( + datetime.now(timezone.utc) + + initial_delay + + uniform(-self.__rel_jitter, self.__rel_jitter) + ) + self.one_shot = one_shot + self.logger = logger + self.__expired = False + self.__scheduler = scheduler + self.__lock = threading.Lock() + self.args = args + self.kwargs = kwargs + + def _do_work(self): + if not self.__expired: + try: + self.worker(*self.args, *self.kwargs) + except Exception as e: + message = f"{type(e).__name__} while executing task: {str(e)}" + if self.logger is None: + print(message, flush=True) + else: + self.logger.info(message) + if not self.one_shot: + jitter = uniform(-self.__rel_jitter, self.__rel_jitter) + with self.__lock: + self.__next_schedule += self.period + jitter + else: + self.__expired = True + + def _next_schedule(self): + return self.__next_schedule + + def schedule_now(self): + """Schedule the task now. Note that this happens asynchronously.""" + if not self.__expired: + with self.__lock: + self.__next_schedule = datetime.now(timezone.utc) + self.__scheduler._refresh() + + def expired(self): + """Indicates if the task has stopped + + :rtype: bool + """ + return self.__expired + + def stop(self): + """This stops the task""" + if self.__expired: + return + self.__expired = True + self.__scheduler._refresh() + + +class Scheduler: + """This creates a scheduler + + :param jitter: the default value for the task jitter (see below), defaults to 0.0 + :type jitter: float, optional + """ + + def __init__(self, jitter=0.0, logger=None): + """Constructor method""" + self.jitter = jitter + self.logger = logger + self.__tasks = [] + self.__event = threading.Event() + self.__thread_stopped = False + self.__worker_thread = threading.Thread(target=self.__next_schedule) + self.__worker_thread.start() + + def create_task( + self, + period, + worker, + initial_delay=None, + one_shot=False, + jitter=None, + args=(), + kwargs={}, + ): + """This schedules a new task. + + :param period: The period after which the task will repeat + :type period: float + + :param worker: A callable that executes the task + :type worker: Callable + + :param initial_delay: The delay before the first execution of the task, defaults to period + :type initial_delay: float, optional + + :param one_shot: If true, execute the task only once, defaults to False + :type one_shot: bool, optional + + :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter + :type jitter: float, optional + + :param args: Arguments passed to the worker, defaults to () + :type args: tuple, optional + + :param kwargs: Keyword arguments passed to the worker, defaults to {} + :type kwargs: dict, optional + + :rtype: Task + """ + if jitter is None: + jitter = self.jitter + task = Task( + period, + worker, + initial_delay=initial_delay, + one_shot=one_shot, + jitter=jitter, + logger=self.logger, + scheduler=self, + args=args, + kwargs=kwargs, + ) + self.__tasks.append(task) + self._refresh() + return task + + def stop(self): + """This stops the scheduler""" + self.__thread_stopped = True + self._refresh() + + def _refresh(self): + self.__event.set() + + def __del_task(self, task): + try: + self.__tasks.remove(task) + except Exception: + pass + + def __next_schedule(self): + while not self.__thread_stopped: + next_schedule = None + for task in copy.copy(self.__tasks): + if task.expired(): + self.__del_task(task) + else: + if next_schedule is None or task._next_schedule() < next_schedule: + next_task = task + next_schedule = task._next_schedule() + if next_schedule is not None: + delay = (next_schedule - datetime.now(timezone.utc)).total_seconds() + self.__event.wait(delay) + if not self.__event.is_set(): + next_task._do_work() + else: + self.__event.wait() + self.__event.clear() diff --git a/server/fishtest/userdb.py b/server/fishtest/userdb.py index 6c2957b7f1..2b962606e6 100644 --- a/server/fishtest/userdb.py +++ b/server/fishtest/userdb.py @@ -1,3 +1,4 @@ +import logging import sys import threading import time @@ -7,6 +8,9 @@ from pymongo import ASCENDING from vtjson import ValidationError, validate +logger = logging.getLogger("__name__") + + DEFAULT_MACHINE_LIMIT = 16 @@ -15,7 +19,7 @@ def validate_user(user): validate(user_schema, user, "user") except ValidationError as e: message = f"The user object does not validate: {str(e)}" - print(message, flush=True) + logger.info(message) raise Exception(message) @@ -145,9 +149,8 @@ def remove_user(self, user, rejector): self.last_pending_time = 0 self.clear_cache() # logs rejected users to the server - print( - f"user: {user['username']} with email: {user['email']} was rejected by: {rejector}", - flush=True, + self.log( + f"user: {user['username']} with email: {user['email']} was rejected by: {rejector}" ) return True else: diff --git a/server/fishtest/util.py b/server/fishtest/util.py index 170bdf8e08..9c5dc10e53 100644 --- a/server/fishtest/util.py +++ b/server/fishtest/util.py @@ -1,11 +1,10 @@ import copy import hashlib +import logging import math import re -import threading from datetime import datetime, timedelta, timezone from functools import cache -from random import uniform import fishtest.stats.stat_util import numpy @@ -567,207 +566,3 @@ def strip_run(run): stripped[key] = str(run[key]) return stripped - - -""" -The following scheduling code should be thread safe. - -- First and foremost, all tasks are executed in a single main thread. -So they are atomic. In particular, during its lifetime, a task will be -executed exactly once at each scheduling point. - -- The main thread maintains a list of scheduled tasks. To safely manipulate -this list outside the main thread we rely on the atomicity of in-place -list operations in Python. - -- To signal the main thread that the task list has changed, which should -be acted upon as soon as possible as it might affect the next task to -be executed, we use a threading.Event. - -Example - -s=Scheduler() -s.add_task(3, task1) -s.add_task(2, task2) - -When the second task is scheduled, the scheduler will interrupt the -3s wait for the first task and replace it by a 2s wait for the second task. -""" - - -class Task: - """This is an opaque class representing a task. Instances should be created via - Scheduler.create_task(). Some public methods are documented below. - """ - - def __init__( - self, - period, - worker, - initial_delay=None, - one_shot=False, - jitter=0.0, - scheduler=None, - args=(), - kwargs={}, - ): - self.period = timedelta(seconds=period) - self.worker = worker - if initial_delay is None: - initial_delay = self.period - else: - initial_delay = timedelta(seconds=initial_delay) - self.__rel_jitter = jitter * self.period - self.__next_schedule = ( - datetime.now(timezone.utc) - + initial_delay - + uniform(-self.__rel_jitter, self.__rel_jitter) - ) - self.one_shot = one_shot - self.__expired = False - self.__scheduler = scheduler - self.__lock = threading.Lock() - self.args = args - self.kwargs = kwargs - - def _do_work(self): - if not self.__expired: - try: - self.worker(*self.args, *self.kwargs) - except Exception as e: - print(f"{type(e).__name__} while executing task: {str(e)}", flush=True) - if not self.one_shot: - jitter = uniform(-self.__rel_jitter, self.__rel_jitter) - with self.__lock: - self.__next_schedule += self.period + jitter - else: - self.__expired = True - - def _next_schedule(self): - return self.__next_schedule - - def schedule_now(self): - """Schedule the task now. Note that this happens asynchronously.""" - if not self.__expired: - with self.__lock: - self.__next_schedule = datetime.now(timezone.utc) - self.__scheduler._refresh() - - def expired(self): - """Indicates if the task has stopped - - :rtype: bool - """ - return self.__expired - - def stop(self): - """This stops the task""" - if self.__expired: - return - self.__expired = True - self.__scheduler._refresh() - - -class Scheduler: - """This creates a scheduler - - :param jitter: the default value for the task jitter (see below), defaults to 0.0 - :type jitter: float, optional - """ - - def __init__(self, jitter=0.0): - """Constructor method""" - self.jitter = jitter - self.__tasks = [] - self.__event = threading.Event() - self.__thread_stopped = False - self.__worker_thread = threading.Thread(target=self.__next_schedule) - self.__worker_thread.start() - - def create_task( - self, - period, - worker, - initial_delay=None, - one_shot=False, - jitter=None, - args=(), - kwargs={}, - ): - """This schedules a new task. - - :param period: The period after which the task will repeat - :type period: float - - :param worker: A callable that executes the task - :type worker: Callable - - :param initial_delay: The delay before the first execution of the task, defaults to period - :type initial_delay: float, optional - - :param one_shot: If true, execute the task only once, defaults to False - :type one_shot: bool, optional - - :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter - :type jitter: float, optional - - :param args: Arguments passed to the worker, defaults to () - :type args: tuple, optional - - :param kwargs: Keyword arguments passed to the worker, defaults to {} - :type kwargs: dict, optional - - :rtype: Task - """ - if jitter is None: - jitter = self.jitter - task = Task( - period, - worker, - initial_delay=initial_delay, - one_shot=one_shot, - jitter=jitter, - scheduler=self, - args=args, - kwargs=kwargs, - ) - self.__tasks.append(task) - self._refresh() - return task - - def stop(self): - """This stops the scheduler""" - self.__thread_stopped = True - self._refresh() - - def _refresh(self): - self.__event.set() - - def _del_task(self, task): - self.__del_task(task) - self._refresh() - - def __del_task(self, task): - try: - self.__tasks.remove(task) - except Exception: - pass - - def __next_schedule(self): - while not self.__thread_stopped: - next_schedule = None - for task in copy.copy(self.__tasks): - if task.expired(): - self.__del_task(task) - else: - if next_schedule is None or task._next_schedule() < next_schedule: - next_task = task - next_schedule = task._next_schedule() - if next_schedule is not None: - delay = (next_schedule - datetime.now(timezone.utc)).total_seconds() - self.__event.wait(delay) - if not self.__event.is_set(): - next_task._do_work() - else: - self.__event.wait() - self.__event.clear() diff --git a/server/fishtest/views.py b/server/fishtest/views.py index e10890d7f8..dc97bc842f 100644 --- a/server/fishtest/views.py +++ b/server/fishtest/views.py @@ -1,6 +1,7 @@ import copy import hashlib import html +import logging import os import re import threading @@ -30,6 +31,8 @@ from requests.exceptions import ConnectionError, HTTPError from vtjson import ValidationError, union, validate +logger = logging.getLogger(__name__) + HTTP_TIMEOUT = 15.0 @@ -279,7 +282,7 @@ def upload(request): ) return {} except Exception as e: - print("Error reading the network file:", e) + logger.info(f"Error reading the network file: {str(e)}") request.session.flash("Error reading the network file", "error") return {} if request.rundb.get_nn(filename): @@ -303,7 +306,7 @@ def upload(request): with open(os.path.expanduser("~/fishtest.upload"), "r") as f: upload_server = f.read().strip() except Exception as e: - print("Network upload not configured:", e) + logger.info(f"Network upload not configured: {str(e)}") request.session.flash("Network upload not configured", "error") return {} try: @@ -312,10 +315,10 @@ def upload(request): response = requests.post(upload_server, files=files, timeout=HTTP_TIMEOUT * 20) response.raise_for_status() except ConnectionError as e: - print("Failed to connect to the net server:", e) + logger.info(f"Failed to connect to the net server: {str(e)}") error = "Failed to connect to the net server" except HTTPError as e: - print("Network upload failed:", e) + logger.info("Network upload failed: {str(e)}") if response.status_code == 409: error = "Post request failed: network {} already uploaded".format(filename) elif response.status_code == 500: @@ -325,7 +328,7 @@ def upload(request): else: error = "Post request failed: other HTTP error" except Exception as e: - print("Error during connection:", e) + logger.info(f"Error during connection: {str(e)}") error = "Post request for the network upload failed" if error: @@ -410,7 +413,7 @@ def signup(request): ).json() if "success" not in response or not response["success"]: if "error-codes" in response: - print(response["error-codes"]) + logger.info(response["error-codes"]) request.session.flash("Captcha failed", "error") return {} @@ -716,7 +719,7 @@ def get_master_info(url): response = requests.get(url) response.raise_for_status() except Exception as e: - print(f"Exception getting commits:\n{e}") + logger.info(f"Exception getting commits:\n{str(e)}") return None bench_search = re.compile(r"(^|\s)[Bb]ench[ :]+([1-9]\d{5,7})(?!\d)") @@ -1394,12 +1397,10 @@ def tests_delete(request): validate(runs_schema, run, "run") except ValidationError as e: message = f"The run object {run_id} does not validate: {str(e)}" - print(message, flush=True) if "version" in run and run["version"] >= RUN_VERSION: - self.actiondb.log_message( - username="fishtest.system", - message=message, - ) + logger.error(message) + else: + logger.info(message) request.rundb.buffer(run, True) request.actiondb.delete_run( @@ -1747,7 +1748,7 @@ def tests(request): try: last_tests = homepage_results(request) except Exception as e: - print("Overview exception: " + str(e)) + logger.info(f"Overview exception: {str(e)}") if not last_tests: raise e finally: diff --git a/server/production.ini b/server/production.ini index 10066bcc3d..6897508b8f 100644 --- a/server/production.ini +++ b/server/production.ini @@ -43,18 +43,18 @@ threads = 4 keys = root, fishtest [handlers] -keys = console +keys = console, events [formatters] keys = generic [logger_root] -level = ERROR +level = DEBUG handlers = console [logger_fishtest] -level = WARN -handlers = +level = DEBUG +handlers = events qualname = fishtest [handler_console] @@ -63,5 +63,9 @@ args = (sys.stderr,) level = NOTSET formatter = generic +[handler_events] +class = fishtest.rundb.EventHandler +formatter = generic + [formatter_generic] -format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s +format = %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s