diff --git a/server/fishtest/actiondb.py b/server/fishtest/actiondb.py index df0d5b01e..c3d6eadcc 100644 --- a/server/fishtest/actiondb.py +++ b/server/fishtest/actiondb.py @@ -218,8 +218,10 @@ def insert_action(self, **action): action["time"] = datetime.now(timezone.utc).timestamp() try: validate(action_schema, action, "action") - except ValidationError: - message = f"Internal Error. Request {str(action)} does not validate" + except ValidationError as e: + message = ( + f"Internal Error. Request {str(action)} does not validate: {str(e)}" + ) print(message, flush=True) self.log_message( username="fishtest.system", diff --git a/server/fishtest/schemas.py b/server/fishtest/schemas.py index 56306cce5..272d6f8f4 100644 --- a/server/fishtest/schemas.py +++ b/server/fishtest/schemas.py @@ -10,8 +10,10 @@ from bson.objectid import ObjectId from vtjson import ( + anything, at_least_one_of, at_most_one_of, + cond, div, email, glob, @@ -21,6 +23,7 @@ ip_address, keys, lax, + nothing, number, one_of, quote, @@ -125,150 +128,236 @@ def first_test_before_last(x): } -action_schema = union( - { - "_id?": ObjectId, - "time": float, - "action": "failed_task", - "username": str, - "worker": long_worker_name, - "run_id": run_id, - "run": run_name, - "task_id": int, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "crash_or_time", - "username": str, - "worker": long_worker_name, - "run_id": run_id, - "run": run_name, - "task_id": int, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "dead_task", - "username": str, - "worker": long_worker_name, - "run_id": run_id, - "run": run_name, - "task_id": int, - }, - { - "_id?": ObjectId, - "time": float, - "action": "system_event", - "username": "fishtest.system", - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "new_run", - "username": str, - "run_id": run_id, - "run": run_name, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "upload_nn", - "username": str, - "nn": str, - }, - { - "_id?": ObjectId, - "time": float, - "action": "modify_run", - "username": str, - "run_id": run_id, - "run": run_name, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "delete_run", - "username": str, - "run_id": run_id, - "run": run_name, - }, - intersect( +action_name = set_name( + union( + "failed_task", + "crash_or_time", + "dead_task", + "system_event", + "new_run", + "upload_nn", + "modify_run", + "delete_run", + "stop_run", + "finished_run", + "approve_run", + "purge_run", + "block_user", + "accept_user", + "block_worker", + "log_message", + ), + "action_name", +) + + +def action_is(x): + return lax({"action": x}) + + +action_schema = intersect( + # First make sure that we recognize the action name. + lax( { - "_id?": ObjectId, - "time": float, - "action": "stop_run", - "username": str, - "run_id": run_id, - "run": run_name, - "message": action_message, - "worker?": long_worker_name, - "task_id?": int, - }, - ifthen(at_least_one_of("worker", "task_id"), keys("worker", "task_id")), + "action": action_name, + } + ), + # For every action name introduce a specific schema. + cond( + ( + action_is("failed_task"), + { + "_id?": ObjectId, + "time": float, + "action": "failed_task", + "username": str, + "worker": long_worker_name, + "run_id": run_id, + "run": run_name, + "task_id": int, + "message": action_message, + }, + ), + ( + action_is("crash_or_time"), + { + "_id?": ObjectId, + "time": float, + "action": "crash_or_time", + "username": str, + "worker": long_worker_name, + "run_id": run_id, + "run": run_name, + "task_id": int, + "message": action_message, + }, + ), + ( + action_is("dead_task"), + { + "_id?": ObjectId, + "time": float, + "action": "dead_task", + "username": str, + "worker": long_worker_name, + "run_id": run_id, + "run": run_name, + "task_id": int, + }, + ), + ( + action_is("system_event"), + { + "_id?": ObjectId, + "time": float, + "action": "system_event", + "username": "fishtest.system", + "message": action_message, + }, + ), + ( + action_is("new_run"), + { + "_id?": ObjectId, + "time": float, + "action": "new_run", + "username": str, + "run_id": run_id, + "run": run_name, + "message": action_message, + }, + ), + ( + action_is("upload_nn"), + { + "_id?": ObjectId, + "time": float, + "action": "upload_nn", + "username": str, + "nn": str, + }, + ), + ( + action_is("modify_run"), + { + "_id?": ObjectId, + "time": float, + "action": "modify_run", + "username": str, + "run_id": run_id, + "run": run_name, + "message": action_message, + }, + ), + ( + action_is("delete_run"), + { + "_id?": ObjectId, + "time": float, + "action": "delete_run", + "username": str, + "run_id": run_id, + "run": run_name, + }, + ), + ( + action_is("stop_run"), + intersect( + { + "_id?": ObjectId, + "time": float, + "action": "stop_run", + "username": str, + "run_id": run_id, + "run": run_name, + "message": action_message, + "worker?": long_worker_name, + "task_id?": int, + }, + ifthen(at_least_one_of("worker", "task_id"), keys("worker", "task_id")), + ), + ), + ( + action_is("finished_run"), + { + "_id?": ObjectId, + "time": float, + "action": "finished_run", + "username": str, + "run_id": run_id, + "run": run_name, + "message": action_message, + }, + ), + ( + action_is("approve_run"), + { + "_id?": ObjectId, + "time": float, + "action": "approve_run", + "username": str, + "run_id": run_id, + "run": run_name, + }, + ), + ( + action_is("purge_run"), + { + "_id?": ObjectId, + "time": float, + "action": "purge_run", + "username": str, + "run_id": run_id, + "run": run_name, + "message": action_message, + }, + ), + ( + action_is("block_user"), + { + "_id?": ObjectId, + "time": float, + "action": "block_user", + "username": str, + "user": str, + "message": union("blocked", "unblocked"), + }, + ), + ( + action_is("accept_user"), + { + "_id?": ObjectId, + "time": float, + "action": "accept_user", + "username": str, + "user": str, + "message": "accepted", + }, + ), + ( + action_is("block_worker"), + { + "_id?": ObjectId, + "time": float, + "action": "block_worker", + "username": str, + "worker": short_worker_name, + "message": union("blocked", "unblocked"), + }, + ), + ( + action_is("log_message"), + { + "_id?": ObjectId, + "time": float, + "action": "log_message", + "username": str, + "message": action_message, + }, + ), + # we should never get here + (anything, nothing), ), - { - "_id?": ObjectId, - "time": float, - "action": "finished_run", - "username": str, - "run_id": run_id, - "run": run_name, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "approve_run", - "username": str, - "run_id": run_id, - "run": run_name, - }, - { - "_id?": ObjectId, - "time": float, - "action": "purge_run", - "username": str, - "run_id": run_id, - "run": run_name, - "message": action_message, - }, - { - "_id?": ObjectId, - "time": float, - "action": "block_user", - "username": str, - "user": str, - "message": union("blocked", "unblocked"), - }, - { - "_id?": ObjectId, - "time": float, - "action": "accept_user", - "username": str, - "user": str, - "message": "accepted", - }, - { - "_id?": ObjectId, - "time": float, - "action": "block_worker", - "username": str, - "worker": short_worker_name, - "message": union("blocked", "unblocked"), - }, - { - "_id?": ObjectId, - "time": float, - "action": "log_message", - "username": str, - "message": action_message, - }, )