
Commit

Address PR feedback
tcjennings committed Feb 3, 2025
1 parent d36e6d3 commit 09beb21
Showing 3 changed files with 25 additions and 25 deletions.
3 changes: 0 additions & 3 deletions examples/example_standard_scripts.yaml
@@ -49,7 +49,6 @@
name: bps_panda_submit_script
handler: lsst.cmservice.handlers.jobs.PandaScriptHandler
data:
wms: panda
bps_wms_yaml_file: "${CTRL_BPS_PANDA_DIR}/config/bps_usdf.yaml"
# Run a bps report script
- SpecBlock:
@@ -59,8 +58,6 @@
- SpecBlock:
name: bps_htcondor_submit_script
handler: lsst.cmservice.handlers.jobs.HTCondorScriptHandler
data:
wms: htcondor
# Run a bps report script
- SpecBlock:
name: bps_htcondor_report_script
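For context on the change above: the per-SpecBlock "wms" keys are removed because the WMS choice now lives on the handler classes themselves (see the wms_method attribute added in jobs.py below), so repeating it in the example data blocks would be redundant. A minimal sketch of reading one of the trimmed blocks, assuming PyYAML; the snippet is a stand-in for a single entry, not the full example file:

import yaml  # assumes PyYAML is available

# Stand-in for one entry of example_standard_scripts.yaml after this commit:
# the "wms" key is gone from "data"; the handler class supplies it instead.
example = """
- SpecBlock:
    name: bps_panda_submit_script
    handler: lsst.cmservice.handlers.jobs.PandaScriptHandler
    data:
      bps_wms_yaml_file: "${CTRL_BPS_PANDA_DIR}/config/bps_usdf.yaml"
"""

blocks = yaml.safe_load(example)
data = blocks[0]["SpecBlock"]["data"]
assert "wms" not in data  # WMS selection is no longer part of the spec data
print(data)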
4 changes: 2 additions & 2 deletions src/lsst/cmservice/common/enums.py
@@ -248,7 +248,7 @@ class WmsMethodEnum(enum.Enum):
panda = 1
Runs under PanDA
ht_condor = 2
htcondor = 2
Runs under HTCondor
More methods to come...
@@ -257,4 +257,4 @@ class WmsMethodEnum(enum.Enum):
default = -1
bash = 0
panda = 1
ht_condor = 2
htcondor = 2
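
The rename from ht_condor to htcondor matters because the handler writes self.wms_method.name directly into the workflow values (see the jobs.py diff below), so the enum member name doubles as the WMS string. A minimal sketch of that relationship with stand-in classes, not the real cmservice handlers; the PandaScriptHandler assignment is an assumption by analogy, since its definition is not part of this diff:

import enum


class WmsMethodEnum(enum.Enum):
    default = -1
    bash = 0
    panda = 1
    htcondor = 2  # renamed from ht_condor so .name matches the expected WMS spelling


class BpsScriptHandler:
    wms_method = WmsMethodEnum.default  # subclasses override this


class PandaScriptHandler(BpsScriptHandler):
    wms_method = WmsMethodEnum.panda  # assumed analogous; not shown in this diff


class HTCondorScriptHandler(BpsScriptHandler):
    wms_method = WmsMethodEnum.htcondor


# The handler feeds the enum name into the rendering values,
# mirroring workflow_config["wms"] = self.wms_method.name in jobs.py.
print(HTCondorScriptHandler.wms_method.name)  # prints: htcondor
print(PandaScriptHandler.wms_method.name)     # prints: panda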
43 changes: 23 additions & 20 deletions src/lsst/cmservice/handlers/jobs.py
@@ -58,6 +58,8 @@ class BpsScriptHandler(ScriptHandler):
`parent.collections['run']`
"""

wms_method = WmsMethodEnum.default

async def _write_script(
self,
session: async_scoped_session,
@@ -67,25 +69,29 @@ async def _write_script(
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)

# Resolve mandatory data element inputs. All of these values must be
# provided somewhere along the SpecBlock chain.
try:
prod_area = os.path.expandvars(data_dict["prod_area"])
butler_repo = os.path.expandvars(data_dict["butler_repo"])
lsst_version = os.path.expandvars(data_dict["lsst_version"])
lsst_distrib_dir = os.path.expandvars(data_dict["lsst_distrib_dir"])
pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"])
run_coll = resolved_cols["run"]
input_colls = resolved_cols["inputs"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg

# optional stuff from data_dict
rescue = data_dict.get("rescue", False)
skip_colls = data_dict.get("skip_colls", "")
custom_lsst_setup = data_dict.get("custom_lsst_setup", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
data_query = data_dict.get("data_query", None)
extra_qgraph_options = data_dict.get("extra_qgraph_options", None)
# workflow_config is the values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["wms"] = self.wms_method.name
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["custom_lsst_setup"] = data_dict.get("custom_lsst_setup", None)
workflow_config["extra_qgraph_options"] = data_dict.get("extra_qgraph_options", None)
workflow_config["lsst_version"] = lsst_version
workflow_config["lsst_distrib_dir"] = lsst_distrib_dir

# Get the output file paths
script_url = await self._set_script_files(session, script, prod_area)
@@ -105,27 +111,23 @@ async def _write_script(
# build up the bps wrapper script
command = f"{config.bps.bps_bin} --log-file {json_url} --no-log-tty submit {config_path} > {log_url}"

await write_bash_script(script_url, command, values=data_dict)
await write_bash_script(script_url, command, values=workflow_config)

# Collect values for and render bps submit yaml from template
await session.refresh(parent, attribute_names=["c_", "p_"])
# FIXME at this point, how could the following path *not* exist?
# is this meant to be `config_url` instead?
await Path(script_url).parent.mkdir(parents=True, exist_ok=True)

# workflow_config becomes values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore
workflow_config["submit_path"] = str(submit_path)
workflow_config["lsst_version"] = os.path.expandvars(lsst_version)
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["custom_lsst_setup"] = custom_lsst_setup
workflow_config["extra_qgraph_options"] = extra_qgraph_options
workflow_config["extra_yaml_literals"] = []

include_configs = []
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]:
if to_include_:
# We want abspaths, but we need to be careful about
@@ -165,9 +167,10 @@ async def _write_script(
"outputRun": run_coll,
"inCollection": in_collection,
}
if data_query:
if data_query := data_dict.get("data_query", None):
payload["dataQuery"] = data_query.replace("\n", " ").strip()
if rescue: # pragma: no cover
if data_dict.get("rescue", False): # pragma: no cover
skip_colls = data_dict.get("skip_colls", "")
payload["extra_args"] = f"--skip-existing-in {skip_colls}"

workflow_config["payload"] = payload
@@ -425,7 +428,7 @@ def get_job_id(cls, bps_dict: dict) -> str:
class HTCondorScriptHandler(BpsScriptHandler):
"""Class to handle running Bps for ht_condor jobs"""

wms_method = WmsMethodEnum.ht_condor
wms_method = WmsMethodEnum.htcondor

@classmethod
def get_job_id(cls, bps_dict: dict) -> str:
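The comment added in _write_script stresses that workflow_config is the dictionary of values used while rendering a yaml template, not the template itself. A toy sketch of that split, using string.Template as a stand-in for whatever rendering the real handler performs; the template text and values are placeholders, not the actual bps submit template:

from string import Template

# Illustrative subset of the values _write_script collects.
workflow_config = {
    "wms": "htcondor",                          # from self.wms_method.name
    "pipeline_yaml": "/path/to/pipeline.yaml",  # placeholder path
    "lsst_version": "w_2025_05",                # placeholder weekly tag
}

# Placeholder template; the real submit yaml template lives elsewhere in cmservice.
template = Template(
    "wms: $wms\n"
    "pipelineYaml: $pipeline_yaml\n"
    "lsstVersion: $lsst_version\n"
)

print(template.substitute(workflow_config))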
