Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Feb 3, 2025
1 parent d36e6d3 commit e900a60
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 32 deletions.
3 changes: 0 additions & 3 deletions examples/example_standard_scripts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
name: bps_panda_submit_script
handler: lsst.cmservice.handlers.jobs.PandaScriptHandler
data:
wms: panda
bps_wms_yaml_file: "${CTRL_BPS_PANDA_DIR}/config/bps_usdf.yaml"
# Run a bps report script
- SpecBlock:
Expand All @@ -59,8 +58,6 @@
- SpecBlock:
name: bps_htcondor_submit_script
handler: lsst.cmservice.handlers.jobs.HTCondorScriptHandler
data:
wms: htcondor
# Run a bps report script
- SpecBlock:
name: bps_htcondor_report_script
Expand Down
11 changes: 9 additions & 2 deletions src/lsst/cmservice/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class WmsMethodEnum(enum.Enum):
panda = 1
Runs under PanDA
ht_condor = 2
htcondor = 2
Runs under HTCondor
More methods to come...
Expand All @@ -257,4 +257,11 @@ class WmsMethodEnum(enum.Enum):
default = -1
bash = 0
panda = 1
ht_condor = 2
htcondor = 2


class WmsComputeSite(enum.Enum):
    """Enumerate the compute sites a WMS workflow may be directed to.

    Members are looked up by name (e.g. ``WmsComputeSite["usdf"]``) when
    resolving configuration values.
    """

    # Placeholder member with a negative value.
    default = -1
    # The USDF compute site.
    usdf = 1
22 changes: 21 additions & 1 deletion src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from .common.enums import ScriptMethodEnum, StatusEnum
from .common.enums import ScriptMethodEnum, StatusEnum, WmsComputeSite

__all__ = ["Configuration", "config"]

Expand Down Expand Up @@ -373,6 +373,11 @@ class Configuration(BaseSettings):
default=ScriptMethodEnum.htcondor,
)

compute_site: WmsComputeSite = Field(
description="The default WMS compute site",
default=WmsComputeSite.usdf,
)

mock_status: StatusEnum | None = Field(
description="A fake status to return from all operations",
default=None,
Expand All @@ -389,6 +394,7 @@ def validate_mock_status_by_name(cls, value: str | StatusEnum) -> StatusEnum | N
warn(f"Invalid mock status ({value}) provided to config, using default.")
return None

# TODO refactor these identical field validators with type generics
@field_validator("script_handler", mode="before")
@classmethod
def validate_script_method_by_name(cls, value: str | ScriptMethodEnum) -> ScriptMethodEnum:
Expand All @@ -403,6 +409,20 @@ def validate_script_method_by_name(cls, value: str | ScriptMethodEnum) -> Script
warn(f"Invalid script handler ({value}) provided to config, using default.")
return ScriptMethodEnum.htcondor

@field_validator("compute_site", mode="before")
@classmethod
def validate_compute_site_by_name(cls, value: str | WmsComputeSite) -> WmsComputeSite:
    """Resolve a compute site from its enum member name.

    Parameters
    ----------
    value
        Either an existing ``WmsComputeSite`` member, which is returned
        unchanged, or the name of a member (e.g. ``"usdf"``).

    Returns
    -------
    WmsComputeSite
        The matching member, or ``WmsComputeSite.usdf`` when ``value`` does
        not name a known member.
    """
    if isinstance(value, WmsComputeSite):
        return value
    try:
        return WmsComputeSite[value]
    except KeyError:
        # Name the setting that was actually invalid; the message previously
        # referred to the script handler, which misdirected troubleshooting.
        warn(f"Invalid compute site ({value}) provided to config, using default.")
        return WmsComputeSite.usdf


config = Configuration()
"""Configuration for cm-service."""
55 changes: 29 additions & 26 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,41 @@ class BpsScriptHandler(ScriptHandler):
`parent.collections['run']`
"""

wms_method = WmsMethodEnum.default

async def _write_script(
self,
session: async_scoped_session,
script: Script,
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
resolved_cols = await script.resolve_collections(session)

# Resolve mandatory data element inputs. All of these values must be
# provided somewhere along the SpecBlock chain.
try:
prod_area = os.path.expandvars(data_dict["prod_area"])
butler_repo = os.path.expandvars(data_dict["butler_repo"])
lsst_version = os.path.expandvars(data_dict["lsst_version"])
lsst_distrib_dir = os.path.expandvars(data_dict["lsst_distrib_dir"])
pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"])
run_coll = resolved_cols["run"]
input_colls = resolved_cols["inputs"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg

# optional stuff from data_dict
rescue = data_dict.get("rescue", False)
skip_colls = data_dict.get("skip_colls", "")
custom_lsst_setup = data_dict.get("custom_lsst_setup", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
data_query = data_dict.get("data_query", None)
extra_qgraph_options = data_dict.get("extra_qgraph_options", None)
# workflow_config is the values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["lsst_version"] = lsst_version
workflow_config["lsst_distrib_dir"] = lsst_distrib_dir
workflow_config["wms"] = self.wms_method.name
workflow_config["compute_site"] = data_dict.get("compute_site", self.default_compute_site.name)
workflow_config["custom_lsst_setup"] = data_dict.get("custom_lsst_setup", None)
workflow_config["extra_qgraph_options"] = data_dict.get("extra_qgraph_options", None)

# Get the output file paths
script_url = await self._set_script_files(session, script, prod_area)
Expand All @@ -105,27 +112,23 @@ async def _write_script(
# build up the bps wrapper script
command = f"{config.bps.bps_bin} --log-file {json_url} --no-log-tty submit {config_path} > {log_url}"

await write_bash_script(script_url, command, values=data_dict)
await write_bash_script(script_url, command, values=workflow_config)

# Collect values for and render bps submit yaml from template
await session.refresh(parent, attribute_names=["c_", "p_"])
# FIXME at this point, how could the following path *not* exist?
# is this meant to be `config_url` instead?
await Path(script_url).parent.mkdir(parents=True, exist_ok=True)

# workflow_config becomes values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore
workflow_config["submit_path"] = str(submit_path)
workflow_config["lsst_version"] = os.path.expandvars(lsst_version)
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["custom_lsst_setup"] = custom_lsst_setup
workflow_config["extra_qgraph_options"] = extra_qgraph_options
workflow_config["extra_yaml_literals"] = []

include_configs = []
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]:
if to_include_:
# We want abspaths, but we need to be careful about
Expand Down Expand Up @@ -160,14 +163,14 @@ async def _write_script(
in_collection = input_colls

payload = {
"payloadName": parent.c_.name, # type: ignore
"butlerConfig": butler_repo,
"outputRun": run_coll,
"inCollection": in_collection,
"name": parent.c_.name, # type: ignore
"butler_config": butler_repo,
"output_run_collection": run_coll,
"input_collection": in_collection,
"data_query": data_dict.get("data_query", None),
}
if data_query:
payload["dataQuery"] = data_query.replace("\n", " ").strip()
if rescue: # pragma: no cover
if data_dict.get("rescue", False): # pragma: no cover
skip_colls = data_dict.get("skip_colls", "")
payload["extra_args"] = f"--skip-existing-in {skip_colls}"

workflow_config["payload"] = payload
Expand Down Expand Up @@ -425,7 +428,7 @@ def get_job_id(cls, bps_dict: dict) -> str:
class HTCondorScriptHandler(BpsScriptHandler):
"""Class to handle running Bps for ht_condor jobs"""

wms_method = WmsMethodEnum.ht_condor
wms_method = WmsMethodEnum.htcondor

@classmethod
def get_job_id(cls, bps_dict: dict) -> str:
Expand Down
1 change: 1 addition & 0 deletions src/lsst/cmservice/handlers/script_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class ScriptHandler(BaseScriptHandler):
"""SubClass of Handler to deal with script operations using real scripts"""

default_method = config.script_handler
default_compute_site = config.compute_site

@staticmethod
async def _check_stamp_file(
Expand Down

0 comments on commit e900a60

Please sign in to comment.