
Commit

Update and fix several issues with job_creator and job_watcher; should be final now.
Pasarus committed Feb 16, 2024
1 parent 22c230c commit ada3dfd
Showing 9 changed files with 111 additions and 237 deletions.
55 changes: 10 additions & 45 deletions job_creator/jobcreator/database/db_updater.py
@@ -267,67 +267,32 @@ def add_detected_run(

         return int(reduction.id)

-    def add_completed_run(
-        self,
-        db_reduction_id: int,
-        reduction_inputs: Dict[str, Any],
-        state: State,
-        status_message: str,
-        output_files: List[str],
-        reduction_script: str,
-        script_sha: str,
-        reduction_start: str,
-        reduction_end: str,
-    ) -> None:
+    def update_script(self, db_reduction_id: int, reduction_script: str, script_sha: str):
         """
-        This function submits data to the database from what is initially available on the completed-runs message
-        broker station/topic
-        :param db_reduction_id: The ID for the reduction row in the reduction table
-        :param reduction_inputs: The inputs used in the reduction script by the IR-API
-        :param state: The state of how the run ended
-        :param status_message: The message that accompanies the state for how the run ended; if the state was,
-        for example, unsuccessful or an error, it holds the reason/error message.
-        :param output_files: The files output from the reduction job
-        :param reduction_script: The script used in the reduction
-        :param script_sha: The git sha of the script used in the reduction
-        :param reduction_start: The time the pod running the reduction started working
-        :param reduction_end: The time the pod running the reduction stopped working
+        Updates the script tied to a reduction in the DB
+        :param db_reduction_id: The ID for the reduction to be updated
+        :param reduction_script: The contents of the script to be added
+        :param script_sha: The sha of that script
         :return:
         """
         logger.info(
-            "Submitting completed-run to the database: {id: %s, reduction_inputs: %s, state: %s, "
-            "status_message: %s, output_files: %s, reduction_script: %s}",
+            "Submitting script to the database: {db_reduction_id: %s, reduction_script: %s, script_sha: %s}",
             db_reduction_id,
-            reduction_inputs,
-            str(state),
-            status_message,
-            output_files,
             textwrap.shorten(reduction_script, width=10, placeholder="..."),
+            script_sha
         )
         with self.session_maker_func() as session:
-            script = session.query(Script).filter_by(script=reduction_script).first()
+            script = session.query(Script).filter_by(sha=script_sha).first()
             if script is None:
                 script = Script(script=reduction_script, sha=script_sha)

             reduction = session.query(Reduction).filter_by(id=db_reduction_id).one()
-            reduction.reduction_state = state
-            reduction.reduction_inputs = reduction_inputs
             reduction.script = script
-            reduction.reduction_outputs = str(output_files)
-            reduction.reduction_status_message = status_message
-            reduction.reduction_start = reduction_start
-            reduction.reduction_end = reduction_end
             session.commit()
         logger.info(
-            "Submitted completed-run to the database successfully: {id: %s, reduction_inputs: %s, state: %s, "
-            "status_message: %s, output_files: %s, reduction_script: %s}",
+            "Submitted script to the database: {db_reduction_id: %s, reduction_script: %s, script_sha: %s}",
             db_reduction_id,
-            reduction_inputs,
-            str(state),
-            status_message,
-            output_files,
             textwrap.shorten(reduction_script, width=10, placeholder="..."),
+            script_sha
         )


 # pylint: enable=too-many-arguments, too-many-locals
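
For illustration only, a minimal sketch of the lookup-or-create pattern the new update_script relies on: querying Script by sha (rather than by the full script text, as before) means two reductions that ran identical scripts share a single Script row. The standalone helper and the explicit session.add are assumptions for this sketch; the commit itself persists a new Script via the reduction.script relationship when the session commits.

    def get_or_create_script(session, reduction_script: str, script_sha: str) -> "Script":
        # Key on the sha, not the script body: identical scripts dedupe to one row.
        script = session.query(Script).filter_by(sha=script_sha).first()
        if script is None:
            script = Script(script=reduction_script, sha=script_sha)
            session.add(script)  # hypothetical; the diff adds it via the relationship instead
        return script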
71 changes: 1 addition & 70 deletions job_creator/jobcreator/job_creator.py
@@ -217,7 +217,6 @@ def spawn_job(self, job_name: str, script: str, job_namespace: str, user_id: str
             service_account_name="jobwatcher",
             containers=[main_container, watcher_container],
             restart_policy="Never",
-            security_context=V1SecurityContext(run_as_user=int(user_id)),
             tolerations=[V1Toleration(key="queue-worker", effect="NoSchedule", operator="Exists")],
             volumes=[
                 V1Volume(name="archive-mount",
@@ -243,6 +242,7 @@
                 "reduction-id": str(reduction_id),
                 "pvs": str(pv_names),
                 "pvcs": str(pvc_names),
+                "kubectl.kubernetes.io/default-container": main_container.name
             }
         )

@@ -252,73 +252,4 @@ def spawn_job(self, job_name: str, script: str, job_namespace: str, user_id: str
             metadata=metadata,
             spec=spec,
         )

-        # job = client.V1Job(
-        #     api_version="batch/v1",
-        #     kind="Job",
-        #     metadata={
-        #         "name": job_name,
-        #         "annotations": {
-        #             "reduction-id": str(reduction_id),
-        #             "pvs": pv_names,
-        #             "pvcs": pvc_names,
-        #         }
-        #     },
-        #     spec={
-        #         "backoffLimit": 0,
-        #         "ttlSecondsAfterFinished": 21600,  # 6 hours
-        #         "template": {
-        #             "spec": {
-        #                 "security_context": {
-        #                     "runAsUser": user_id,
-        #                 },
-        #                 "containers": [
-        #                     {
-        #                         "name": job_name,
-        #                         "image": f"ghcr.io/interactivereduction/runner@sha256:{runner_sha}",
-        #                         "args": [script],
-        #                         "volumeMounts": [
-        #                             {"name": "archive-mount", "mountPath": "/archive"},
-        #                             {"name": "ceph-mount", "mountPath": "/output"},
-        #                         ],
-        #                     },
-        #                     {
-        #                         "name": "job-watcher",
-        #                         "image": f"ghcr.io/interactivereduction/jobwatcher@sha256:{self.watcher_sha}",
-        #                         "env": [
-        #                             {"name": "DB_IP", "value": db_ip},
-        #                             {"name": "DB_USERNAME", "value": db_username},
-        #                             {"name": "DB_PASSWORD", "value": db_password},
-        #                             {"name": "MAX_TIME_TO_COMPLETE_JOB", "value": str(max_time_to_complete_job)},
-        #                             {"name": "CONTAINER_NAME", "value": job_name},
-        #                             {"name": "JOB_NAME", "value": job_name},
-        #                             {"name": "POD_NAME", "value": job_name},
-        #                         ]
-        #                     }
-        #                 ],
-        #                 "restartPolicy": "Never",
-        #                 "tolerations": [{"key": "queue-worker", "effect": "NoSchedule", "operator": "Exists"}],
-        #                 "volumes": [
-        #                     {
-        #                         "name": "archive-mount",
-        #                         "persistentVolumeClaim": {"claimName": f"{job_name}-archive-pvc", "readOnly": True},
-        #                     },
-        #                     {
-        #                         "name": "ceph-mount",
-        #                         "persistentVolumeClaim": {"claimName": f"{job_name}-ceph-pvc", "readOnly": False}
-        #                     },
-        #                 ],
-        #             },
-        #         },
-        #     },
-        # )
-        # if self.dev_mode:
-        #     # Use an emptyDir instead of a ceph mount for the jobs, this will be deleted when the pod dies.
-        #     job.spec["template"]["spec"]["volumes"][1] = {
-        #         "name": "ceph-mount",
-        #         "emptyDir": {
-        #             "sizeLimit": "10000Mi"
-        #         }
-        #     }

         client.BatchV1Api().create_namespaced_job(namespace=job_namespace, body=job)
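
The annotation added above, kubectl.kubernetes.io/default-container, is the standard kubectl hint for multi-container pods: with it set, kubectl logs and kubectl exec target the reduction container by default instead of the job-watcher sidecar. A minimal sketch of attaching it with the kubernetes Python client (the names here are illustrative, not the real job's):

    from kubernetes.client import V1ObjectMeta

    job_name = "run-12345"  # hypothetical job name
    metadata = V1ObjectMeta(
        name=job_name,
        annotations={
            "reduction-id": "42",  # hypothetical reduction id
            "kubectl.kubernetes.io/default-container": job_name,  # the main container's name
        },
    )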
1 change: 1 addition & 0 deletions job_creator/jobcreator/main.py
@@ -100,6 +100,7 @@ async def process_message(message: Dict[str, Any]):
         reduction_id=db_reduction_id,
         instrument=instrument_name,
     )
+    DB_UPDATER.update_script(db_reduction_id, script, script_sha)
     ceph_mount_path = create_ceph_mount_path(instrument_name, rb_number)
     JOB_CREATOR.spawn_job(
         job_name=job_name,
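
With the db_updater change above, the handler now writes the Script row before spawning the job. A condensed, hypothetical sketch of the reordered flow (the message fields and elided arguments are assumptions, not the real handler):

    async def process_message(message: Dict[str, Any]) -> None:
        script, script_sha = message["script"], message["script_sha"]  # assumed message shape
        db_reduction_id = DB_UPDATER.add_detected_run(
            # ...run metadata from the message, as in the real handler...
        )
        # New in this commit: the script is persisted up front, so the
        # job_watcher only has to record the outcome on completion.
        DB_UPDATER.update_script(db_reduction_id, script, script_sha)
        JOB_CREATOR.spawn_job(
            # ...job parameters, as in the real handler...
        )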
2 changes: 1 addition & 1 deletion job_creator/pyproject.toml
@@ -3,7 +3,7 @@ name = "jobcreator"
 readme = "README.md"
 version = "0.0.1"
 dependencies = [
-    "kubernetes==25.3.0",
+    "kubernetes==29.0.0",
     "psycopg2==2.9.6",
     "SQLAlchemy==2.0.13",
     "pika==1.3.2"
91 changes: 0 additions & 91 deletions job_watcher/jobwatcher/database/db_updater.py
@@ -176,96 +176,6 @@ def __init__(self, ip: str, username: str, password: str):
         self.session_maker_func = sessionmaker(bind=engine)

     # pylint: disable=too-many-arguments, too-many-locals
-    def add_detected_run(
-        self,
-        filename: str,
-        title: str,
-        instrument_name: str,
-        users: str,
-        experiment_number: str,
-        run_start: str,
-        run_end: str,
-        good_frames: str,
-        raw_frames: str,
-        reduction_inputs: Dict[str, Any],
-    ) -> int:
-        """
-        This function submits data to the database from what is initially available on the detected-runs message
-        broker station/topic
-        :param filename: the filename of the run that needs to be reduced
-        :param title: The title of the run file
-        :param instrument_name: The name of the instrument for the run
-        :param users: The users entered into the run file
-        :param experiment_number: The RB number of the run entered by users
-        :param run_start: The time at which the run started, created using the standard python time format.
-        :param run_end: The time at which the run ended, created using the standard python time format.
-        :param good_frames: The number of frames that are considered "good" in the file
-        :param raw_frames: The number of frames that are in the file
-        :param reduction_inputs: The inputs to be used by the reduction
-        :return: The id of the reduction row entry
-        """
-        logger.info(
-            "Submitting detected-run to the database: {filename: %s, title: %s, instrument_name: %s, users: %s, "
-            "experiment_number: %s, run_start: %s, run_end: %s, good_frames: %s, raw_frames: %s, "
-            "reduction_inputs: %s}",
-            filename,
-            title,
-            instrument_name,
-            users,
-            experiment_number,
-            run_start,
-            run_end,
-            good_frames,
-            raw_frames,
-            reduction_inputs,
-        )
-        with self.session_maker_func() as session:
-            instrument = session.query(Instrument).filter_by(instrument_name=instrument_name).first()
-            if instrument is None:
-                instrument = Instrument(instrument_name=instrument_name)
-
-            run = Run(
-                filename=filename,
-                title=title,
-                users=users,
-                experiment_number=experiment_number,
-                run_start=run_start,
-                run_end=run_end,
-                good_frames=good_frames,
-                raw_frames=raw_frames,
-            )
-            run.instrument = instrument
-            reduction = Reduction(
-                reduction_start=None,
-                reduction_end=None,
-                reduction_state=State.NOT_STARTED,
-                reduction_inputs=reduction_inputs,
-                script_id=None,
-                reduction_outputs=None,
-            )
-            # Now create the run_reduction entry and add it
-            run_reduction = RunReduction(run_relationship=run, reduction_relationship=reduction)
-            session.add(run_reduction)
-            session.commit()
-
-            logger.info(
-                "Submitted detected-run to the database successfully: {filename: %s, title: %s, "
-                "instrument_name: %s, users: %s, experiment_number: %s, run_start: %s, run_end: %s, "
-                "good_frames: %s, raw_frames: %s, reduction_inputs: %s}",
-                filename,
-                title,
-                instrument_name,
-                users,
-                experiment_number,
-                run_start,
-                run_end,
-                good_frames,
-                raw_frames,
-                reduction_inputs,
-            )
-
-            return int(reduction.id)
-
     def update_completed_run(
         self,
         db_reduction_id: int,
@@ -312,5 +222,4 @@ def update_completed_run(
             output_files,
         )

-
 # pylint: enable=too-many-arguments, too-many-locals
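
With add_detected_run removed here, the division of labour is now clean: job_creator creates the reduction row (add_detected_run) and attaches its script (update_script) before the job starts, and the watcher's updater only finalises runs via update_completed_run once the pod completes.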
