Skip to content

Commit

Permalink
Add pipeline for fetching and transforming submission portal entry (#247)
Browse files Browse the repository at this point in the history

* Add stub Dagster resource for accessing submission portal data API

This is just pulling from a hardcoded sample file
until the data API's authentication scheme is
worked out

* Add Dagster op to wrap data API call to get metadata

* Add Translator subclass for submission portal metadata

Currently this includes logic to generate a Study
object. Translating Biosample objects still to do.

* Use shared fake ID minting test fixture in translator tests

* Add translation of biosamples to SubmissionPortalTranslator

* Add tests for SubmissionPortalTranslator helper methods

* Add integration test case for submission portal translator

* Handle multivalued slots

* Add graph and job for submission portal metadata transformation

* Add ops for metadata submission and run status polling

* Split portal ingest graph into dry run and actual submit graphs

* Clean up empty strings and lists in submission portal translator

* Add docstrings for submission portal translator

* Update submission metadata graph names and add job descriptions

* Add validation step to submission ingest dry run job

* Get portal session cookie from launch config instead of environment

* Add smoke test for submission data translation job

* fix: hash checking is problematic

* regen reqs

* remove mongo dep from test container

* feat: no need for mongo in `test` image

* resolve

* finish merge

* Remove fields that should be null from submission portal translator expected output

* Pass submission id through op config in test

* style: black autofmt

* feat: better startup invocation

* Remove unnecessary raise_for_status calls

* fix: double type recognized

* test double-type fix

* style: refactor mgmt stuff

---------

Co-authored-by: Donny Winston <[email protected]>
  • Loading branch information
pkalita-lbl and dwinston authored May 24, 2023
1 parent 05e8906 commit 84267cf
Show file tree
Hide file tree
Showing 26 changed files with 14,593 additions and 173 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ DAGIT_HOST=http://dagster-dagit:3000
GOLD_API_BASE_URL=https://gold.jgi.doe.gov/rest/nmdc
GOLD_API_USERNAME=x
GOLD_API_PASSWORD=x

NMDC_PORTAL_API_BASE_URL=https://data-dev.microbiomedata.org/
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ services:
- nmdc_runtime_mongo_data:/data/db
- ~/nmdcdb-mongodump:/nmdc_dump:ro
- ./tests/mongorestore-nmdc-testdb.sh:/mongorestore-nmdc-testdb.sh:ro
restart: always
restart: unless-stopped
environment:
MONGODB_ROOT_USER: admin
MONGODB_ROOT_PASSWORD: root
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ services:
volumes:
- nmdc_runtime_mongo_data:/data/db
- ./tests:/app_tests
restart: always
restart: unless-stopped
environment:
MONGODB_ROOT_USER: admin
MONGODB_ROOT_PASSWORD: root
Expand Down
7 changes: 6 additions & 1 deletion nmdc_runtime/api/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,12 @@ def load_changesheet(
for ix, value, ranges in list(df[["value", "ranges"]].itertuples()):
# TODO make this way more robust,
# i.e. detect a range with a https://w3id.org/linkml/base of "float".
if ranges.endswith("float") or ranges.endswith("decimal degree"):
# TODO mongo BSON has a decimal type. Should use this for decimals!
if (
ranges.endswith("float")
or ranges.endswith("double")
or ranges.endswith("decimal degree")
):
df.at[ix, "value"] = float(value)
return df

Expand Down
24 changes: 12 additions & 12 deletions nmdc_runtime/api/endpoints/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ def request_run(
)


def _get_run_summary(run_id, mdb):
def _get_run_summary(run_id, mdb) -> RunSummary:
events_in_order = list(mdb.run_events.find({"run.id": run_id}, sort=[("time", 1)]))
raise404_if_none(events_in_order or None)
# TODO put relevant outputs in outputs! (for get_study_metadata job)
return {
"id": run_id,
"status": events_in_order[-1]["type"],
"started_at_time": events_in_order[0]["time"],
"was_started_by": events_in_order[0]["producer"],
"inputs": list(concat(e["inputs"] for e in events_in_order)),
"outputs": list(concat(e["outputs"] for e in events_in_order)),
"job": events_in_order[-1]["job"],
"producer": events_in_order[-1]["producer"],
"schemaURL": events_in_order[-1]["schemaURL"],
}
return RunSummary(
id=run_id,
status=events_in_order[-1]["type"],
started_at_time=events_in_order[0]["time"],
was_started_by=events_in_order[0]["producer"],
inputs=list(concat(e["inputs"] for e in events_in_order)),
outputs=list(concat(e["outputs"] for e in events_in_order)),
job=events_in_order[-1]["job"],
producer=events_in_order[-1]["producer"],
schemaURL=events_in_order[-1]["schemaURL"],
)


@router.get(
Expand Down
116 changes: 62 additions & 54 deletions nmdc_runtime/api/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from contextlib import asynccontextmanager
from importlib import import_module

import pkg_resources
Expand Down Expand Up @@ -223,59 +224,8 @@
},
]

# Build the FastAPI application object; the description string embeds the
# installed nmdc_schema distribution version.
app = FastAPI(
title="NMDC Runtime API",
version="0.1.0",
description=(
"This is a draft of the NMDC Runtime API."
" The resource layout currently covers aspects of workflow execution and automation,"
" and is intended to facilitate discussion as more of the API is developed."
"\n\n"
"Dependency versions:\n\n"
f'nmdc-schema={pkg_resources.get_distribution("nmdc_schema").version}'
),
openapi_tags=tags_metadata,
)
app.include_router(api_router)


# CORS: the origin regex matches http://localhost with an explicit port, or
# any https host ending in ".microbiomedata.org"; all methods and headers are
# allowed, with credentials.
app.add_middleware(
CORSMiddleware,
allow_origin_regex=r"(http://localhost:\d+)|(https://.+?\.microbiomedata\.org)",
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


@app.on_event("startup")
async def ensure_default_api_perms():
# Seed the "_runtime.api.allow" collection with default (username, action)
# permission documents. Registered as a FastAPI "startup" event handler.
db = get_mongo_db()
# Do nothing if the collection already contains at least one document.
if db["_runtime.api.allow"].count_documents({}):
return

# Maps each action string to the usernames allowed to perform it.
allowed = {
"/metadata/changesheets:submit": [
"mam",
"dwinston",
"pajau",
"montana",
"spatil",
],
"/queries:run(query_cmd:DeleteCommand)": ["scanon", "dwinston"],
}
# Flatten the mapping into one {"username", "action"} document per pair.
for doc in [
{"username": username, "action": action}
for action, usernames in allowed.items()
for username in usernames
]:
# The document is used as both filter and replacement, with upsert=True.
db["_runtime.api.allow"].replace_one(doc, doc, upsert=True)
# NOTE(review): indentation is lost in this rendering; the two create_index
# calls presumably run once after the loop rather than per document -- confirm.
db["_runtime.api.allow"].create_index("username")
db["_runtime.api.allow"].create_index("action")


@app.on_event("startup")
async def ensure_initial_resources_on_boot():
def ensure_initial_resources_on_boot():
"""ensure these resources are loaded when (re-)booting the system."""
mdb = get_mongo_db()

Expand Down Expand Up @@ -333,8 +283,7 @@ async def ensure_initial_resources_on_boot():
minter_bootstrap()


@app.on_event("startup")
async def ensure_attribute_indexes():
def ensure_attribute_indexes():
mdb = get_mongo_db()
for collection_name, index_specs in entity_attributes_to_index.items():
for spec in index_specs:
Expand All @@ -346,5 +295,64 @@ async def ensure_attribute_indexes():
mdb[collection_name].create_index([(spec, 1)], name=spec, background=True)


def ensure_default_api_perms():
# Seed the "_runtime.api.allow" collection with default (username, action)
# permission documents. Plain synchronous function with no decorator.
db = get_mongo_db()
# Do nothing if the collection already contains at least one document.
if db["_runtime.api.allow"].count_documents({}):
return

# Maps each action string to the usernames allowed to perform it.
allowed = {
"/metadata/changesheets:submit": [
"mam",
"dwinston",
"pajau",
"montana",
"spatil",
],
"/queries:run(query_cmd:DeleteCommand)": ["scanon", "dwinston"],
}
# Flatten the mapping into one {"username", "action"} document per pair.
for doc in [
{"username": username, "action": action}
for action, usernames in allowed.items()
for username in usernames
]:
# The document is used as both filter and replacement, with upsert=True.
db["_runtime.api.allow"].replace_one(doc, doc, upsert=True)
# NOTE(review): indentation is lost in this rendering; the two create_index
# calls presumably run once after the loop rather than per document -- confirm.
db["_runtime.api.allow"].create_index("username")
db["_runtime.api.allow"].create_index("action")


@asynccontextmanager
async def lifespan(app: FastAPI):
# Async context manager handed to FastAPI via its `lifespan=` argument.
# Runs the three setup functions, in this order, before yielding control;
# there is no code after the yield, so nothing runs on exit.
# The three calls are synchronous and are not awaited.
ensure_initial_resources_on_boot()
ensure_attribute_indexes()
ensure_default_api_perms()
yield


# Build the FastAPI application object; the description string embeds the
# installed nmdc_schema distribution version.
app = FastAPI(
title="NMDC Runtime API",
version="0.1.0",
description=(
"This is a draft of the NMDC Runtime API."
" The resource layout currently covers aspects of workflow execution and automation,"
" and is intended to facilitate discussion as more of the API is developed."
"\n\n"
"Dependency versions:\n\n"
f'nmdc-schema={pkg_resources.get_distribution("nmdc_schema").version}'
),
openapi_tags=tags_metadata,
# Setup work is wired in through the lifespan context manager.
lifespan=lifespan,
)
app.include_router(api_router)


# CORS: the origin regex matches http://localhost with an explicit port, or
# any https host ending in ".microbiomedata.org"; all methods and headers are
# allowed, with credentials.
app.add_middleware(
CORSMiddleware,
allow_origin_regex=r"(http://localhost:\d+)|(https://.+?\.microbiomedata\.org)",
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


# When executed as a script, serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
20 changes: 14 additions & 6 deletions nmdc_runtime/api/models/run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
import os
from functools import lru_cache
from typing import List, Optional
Expand Down Expand Up @@ -43,9 +44,16 @@ class Run(BaseModel):
facets: Optional[dict]


class RunEventType(str, Enum):
# Closed set of run event types. Mixing in `str` makes every member a string
# as well as an Enum member; each member's value is identical to its name.
REQUESTED = "REQUESTED"
STARTED = "STARTED"
FAIL = "FAIL"
COMPLETE = "COMPLETE"


class RunSummary(OpenLineageBase):
id: str
status: str
status: RunEventType
started_at_time: str
was_started_by: str
inputs: List[str]
Expand All @@ -56,7 +64,7 @@ class RunSummary(OpenLineageBase):
class RunEvent(OpenLineageBase):
run: Run
job: JobSummary
type: str
type: RunEventType
time: str
inputs: Optional[List[str]] = []
outputs: Optional[List[str]] = []
Expand All @@ -81,7 +89,7 @@ def _add_run_requested_event(run_spec: RunUserSpec, mdb: MongoDatabase, user: Us
pick(["id", "description"], job),
{"producer": PRODUCER_URL, "schemaURL": SCHEMA_URL},
),
type="REQUESTED",
type=RunEventType.REQUESTED,
time=now(as_str=True),
inputs=run_spec.inputs,
)
Expand All @@ -103,7 +111,7 @@ def _add_run_started_event(run_id: str, mdb: MongoDatabase):
schemaURL=SCHEMA_URL,
run=requested.run,
job=requested.job,
type="STARTED",
type=RunEventType.STARTED,
time=now(as_str=True),
).dict()
)
Expand All @@ -124,7 +132,7 @@ def _add_run_fail_event(run_id: str, mdb: MongoDatabase):
schemaURL=SCHEMA_URL,
run=requested.run,
job=requested.job,
type="FAIL",
type=RunEventType.FAIL,
time=now(as_str=True),
).dict()
)
Expand All @@ -145,7 +153,7 @@ def _add_run_complete_event(run_id: str, mdb: MongoDatabase, outputs: List[str])
schemaURL=SCHEMA_URL,
run=started.run,
job=started.job,
type="COMPLETE",
type=RunEventType.COMPLETE,
time=now(as_str=True),
outputs=outputs,
).dict()
Expand Down
26 changes: 26 additions & 0 deletions nmdc_runtime/site/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
gold_analysis_projects_by_study,
gold_projects_by_study,
gold_study,
poll_for_run_completion,
run_etl,
local_file_to_api_object,
get_operation,
Expand All @@ -20,6 +21,7 @@
filter_ops_done_object_puts,
hello,
mongo_stats,
submit_metadata_to_db,
update_schema,
filter_ops_undone_expired,
construct_jobs,
Expand All @@ -30,6 +32,9 @@
perform_mongo_updates,
add_output_run_event,
gold_biosamples_by_study,
fetch_nmdc_portal_submission_by_id,
translate_portal_submission_to_nmdc_schema_database,
validate_metadata,
)


Expand Down Expand Up @@ -120,3 +125,24 @@ def gold_study_to_database():

outputs = export_json_to_drs(database_dict, filename)
add_output_run_event(outputs)


@graph
def translate_metadata_submission_to_nmdc_schema_database():
# Graph: fetch a portal submission, translate it to an NMDC schema database,
# validate it, then export it as JSON and record the outputs as a run event.
# Nothing is submitted to the database in this graph.
# The fetch op is called with no arguments here; presumably the submission id
# is supplied through op config -- confirm against the op definition.
metadata_submission = fetch_nmdc_portal_submission_by_id()
database = translate_portal_submission_to_nmdc_schema_database(metadata_submission)

validate_metadata(database)

database_dict = nmdc_schema_object_to_dict(database)
# The export filename is derived from the fetched submission, not the database.
filename = nmdc_schema_database_export_filename(metadata_submission)
outputs = export_json_to_drs(database_dict, filename)
add_output_run_event(outputs)


@graph
def ingest_metadata_submission():
# Graph: fetch a portal submission, translate it to an NMDC schema database,
# submit that database, then poll using the run id returned by the submit op.
# Unlike translate_metadata_submission_to_nmdc_schema_database, this graph
# does not call validate_metadata or export JSON.
metadata_submission = fetch_nmdc_portal_submission_by_id()
database = translate_portal_submission_to_nmdc_schema_database(metadata_submission)
run_id = submit_metadata_to_db(database)
poll_for_run_completion(run_id)
Loading

0 comments on commit 84267cf

Please sign in to comment.