Skip to content

Commit

Permalink
Merge pull request #251 from microbiomedata/neon-ingest-pipeline
Browse files Browse the repository at this point in the history
NEON metadata ingest pipeline
  • Loading branch information
sujaypatil96 authored Aug 30, 2023
2 parents 8f1a736 + 6bd168a commit ab63cdd
Show file tree
Hide file tree
Showing 10 changed files with 2,142 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ GOLD_API_USERNAME=x
GOLD_API_PASSWORD=x

NMDC_PORTAL_API_BASE_URL=https://data-dev.microbiomedata.org/

NEON_API_TOKEN=y
NEON_API_BASE_URL=https://data.neonscience.org/api/v0
35 changes: 35 additions & 0 deletions nmdc_runtime/site/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
fetch_nmdc_portal_submission_by_id,
translate_portal_submission_to_nmdc_schema_database,
validate_metadata,
neon_data_by_product,
nmdc_schema_database_from_neon_data,
nmdc_schema_database_export_filename_neon,
get_neon_pipeline_mms_data_product,
get_neon_pipeline_sls_data_product
)


Expand Down Expand Up @@ -146,3 +151,33 @@ def ingest_metadata_submission():
database = translate_portal_submission_to_nmdc_schema_database(metadata_submission)
run_id = submit_metadata_to_db(database)
poll_for_run_completion(run_id)


@graph
def translate_neon_api_metadata_to_nmdc_schema_database():
mms_data_product = get_neon_pipeline_mms_data_product()
sls_data_product = get_neon_pipeline_sls_data_product()

mms_data = neon_data_by_product(mms_data_product)
sls_data = neon_data_by_product(sls_data_product)

database = nmdc_schema_database_from_neon_data(mms_data, sls_data)

database_dict = nmdc_schema_object_to_dict(database)
filename = nmdc_schema_database_export_filename_neon()

outputs = export_json_to_drs(database_dict, filename)
add_output_run_event(outputs)


@graph
def ingest_neon_metadata():
mms_data_product = get_neon_pipeline_mms_data_product()
sls_data_product = get_neon_pipeline_sls_data_product()

mms_data = neon_data_by_product(mms_data_product)
sls_data = neon_data_by_product(sls_data_product)

database = nmdc_schema_database_from_neon_data(mms_data, sls_data)
run_id = submit_metadata_to_db(database)
poll_for_run_completion(run_id)
84 changes: 73 additions & 11 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime, timezone
from io import BytesIO
from zipfile import ZipFile
import pandas as pd

from bson import ObjectId, json_util
from dagster import (
Expand Down Expand Up @@ -56,8 +57,10 @@
GoldApiClient,
RuntimeApiSiteClient,
RuntimeApiUserClient,
NeonApiClient
)
from nmdc_runtime.site.translation.gold_translator import GoldStudyTranslator
from nmdc_runtime.site.translation.neon_translator import NeonDataTranslator
from nmdc_runtime.site.translation.submission_portal_translator import (
SubmissionPortalTranslator,
)
Expand Down Expand Up @@ -586,23 +589,23 @@ def get_gold_study_pipeline_inputs(context: OpExecutionContext) -> str:

@op(required_resource_keys={"gold_api_client"})
def gold_biosamples_by_study(
context: OpExecutionContext, study_id: str
context: OpExecutionContext, study_id: str
) -> List[Dict[str, Any]]:
client: GoldApiClient = context.resources.gold_api_client
return client.fetch_biosamples_by_study(study_id)


@op(required_resource_keys={"gold_api_client"})
def gold_projects_by_study(
context: OpExecutionContext, study_id: str
context: OpExecutionContext, study_id: str
) -> List[Dict[str, Any]]:
client: GoldApiClient = context.resources.gold_api_client
return client.fetch_projects_by_study(study_id)


@op(required_resource_keys={"gold_api_client"})
def gold_analysis_projects_by_study(
context: OpExecutionContext, study_id: str
context: OpExecutionContext, study_id: str
) -> List[Dict[str, Any]]:
client: GoldApiClient = context.resources.gold_api_client
return client.fetch_analysis_projects_by_study(study_id)
Expand All @@ -616,11 +619,11 @@ def gold_study(context: OpExecutionContext, study_id: str) -> Dict[str, Any]:

@op(required_resource_keys={"runtime_api_site_client"})
def nmdc_schema_database_from_gold_study(
context: OpExecutionContext,
study: Dict[str, Any],
projects: List[Dict[str, Any]],
biosamples: List[Dict[str, Any]],
analysis_projects: List[Dict[str, Any]],
context: OpExecutionContext,
study: Dict[str, Any],
projects: List[Dict[str, Any]],
biosamples: List[Dict[str, Any]],
analysis_projects: List[Dict[str, Any]],
) -> nmdc.Database:
client: RuntimeApiSiteClient = context.resources.runtime_api_site_client

Expand All @@ -647,8 +650,8 @@ def fetch_nmdc_portal_submission_by_id(context: OpExecutionContext) -> Dict[str,

@op(required_resource_keys={"runtime_api_site_client"})
def translate_portal_submission_to_nmdc_schema_database(
context: OpExecutionContext,
metadata_submission: Dict[str, Any],
context: OpExecutionContext,
metadata_submission: Dict[str, Any],
) -> nmdc.Database:
client: RuntimeApiSiteClient = context.resources.runtime_api_site_client

Expand Down Expand Up @@ -678,7 +681,7 @@ def nmdc_schema_object_to_dict(object: YAMLRoot) -> Dict[str, Any]:

@op(required_resource_keys={"mongo"}, config_schema={"username": str})
def export_json_to_drs(
context: OpExecutionContext, data: Dict, filename: str, description: str = ""
context: OpExecutionContext, data: Dict, filename: str, description: str = ""
) -> List[str]:
mdb = context.resources.mongo.db
username = context.op_config.get("username")
Expand Down Expand Up @@ -711,3 +714,62 @@ def export_json_to_drs(

def unique_field_values(docs: List[Dict[str, Any]], field: str):
return {doc[field] for doc in docs if field in doc}


@op(config_schema={"mms_data_product": dict})
def get_neon_pipeline_mms_data_product(context: OpExecutionContext) -> dict:
return context.op_config["mms_data_product"]


@op(config_schema={"sls_data_product": dict})
def get_neon_pipeline_sls_data_product(context: OpExecutionContext) -> dict:
return context.op_config["sls_data_product"]


@op(required_resource_keys={"neon_api_client"})
def neon_data_by_product(
context: OpExecutionContext, data_product: dict
) -> Dict[str, pd.DataFrame]:
df_dict = {}
client: NeonApiClient = context.resources.neon_api_client

product_id = data_product["product_id"]
product_tables = data_product["product_tables"]

product_table_list = [t.strip() for t in product_tables.split(",")]
product = client.fetch_product_by_id(product_id)
for table_name in product_table_list:
df = pd.DataFrame()
for site in product["data"]["siteCodes"]:
for data_url in site["availableDataUrls"]:
data_files = client.request(data_url)
for file in data_files["data"]["files"]:
if table_name in file["name"] and "expanded" in file["name"]:
current_df = pd.read_csv(file['url'])
df = pd.concat([df, current_df], ignore_index=True)
df_dict[table_name] = df

return df_dict


@op(required_resource_keys={"runtime_api_site_client"})
def nmdc_schema_database_from_neon_data(
context: OpExecutionContext,
mms_data: Dict[str, pd.DataFrame],
sls_data: Dict[str, pd.DataFrame]
) -> nmdc.Database:
client: RuntimeApiSiteClient = context.resources.runtime_api_site_client

def id_minter(*args, **kwargs):
response = client.mint_id(*args, **kwargs)
return response.json()

translator = NeonDataTranslator(mms_data, sls_data, id_minter=id_minter)

database = translator.get_database()
return database


@op
def nmdc_schema_database_export_filename_neon() -> str:
return "database_from_neon_metadata.json"
75 changes: 75 additions & 0 deletions nmdc_runtime/site/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
apply_changesheet,
apply_metadata_in,
hello_graph,
translate_neon_api_metadata_to_nmdc_schema_database,
ingest_neon_metadata,
)
from nmdc_runtime.site.resources import (
get_mongo,
runtime_api_site_client_resource,
runtime_api_user_client_resource,
nmdc_portal_api_client_resource,
gold_api_client_resource,
neon_api_client_resource,
terminus_resource,
mongo_resource,
)
Expand All @@ -58,6 +61,7 @@
"runtime_api_user_client": runtime_api_user_client_resource,
"nmdc_portal_api_client": nmdc_portal_api_client_resource,
"gold_api_client": gold_api_client_resource,
"neon_api_client": neon_api_client_resource,
"terminus": terminus_resource,
"mongo": mongo_resource,
}
Expand Down Expand Up @@ -544,6 +548,77 @@ def biosample_submission_ingest():
},
},
),
translate_neon_api_metadata_to_nmdc_schema_database.to_job(
description="This job fetches the metadata associated with a given NEON data product code and translates it into an equivalent nmdc:Database object. The object is serialized to JSON and stored in DRS. This can be considered a dry-run for the `ingest_neon_metadata` job.",
resource_defs=resource_defs,
config={
"resources": merge(
unfreeze(normal_resources),
{
"neon_api_client": {
"config": {
"base_url": {"env": "NEON_API_BASE_URL"},
"api_token": {"env": "NEON_API_TOKEN"},
},
}
},
),
"ops": {
"get_neon_pipeline_mms_data_product": {
"config": {
"mms_data_product": {
"product_id": "DP1.10107.001",
"product_tables": "mms_metagenomeDnaExtraction, mms_metagenomeSequencing",
}
}
},
"get_neon_pipeline_sls_data_product": {
"config": {
"sls_data_product": {
"product_id": "DP1.10086.001",
"product_tables": "sls_metagenomicsPooling, sls_soilCoreCollection, sls_soilChemistry, sls_soilMoisture, sls_soilpH, ntr_externalLab, ntr_internalLab",
}
}
},
"export_json_to_drs": {"config": {"username": ""}},
},
},
),
ingest_neon_metadata.to_job(
description="This job fetches the metadata associated with a given data product code and translates it into an equivalent nmdc:Database object. This object is validated and ingested into Mongo via a `POST /metadata/json:submit` request.",
resource_defs=resource_defs,
config={
"resources": merge(
unfreeze(normal_resources),
{
"neon_api_client": {
"config": {
"base_url": {"env": "NEON_API_BASE_URL"},
"api_token": {"env": "NEON_API_TOKEN"},
},
}
},
),
"ops": {
"get_neon_pipeline_mms_data_product": {
"config": {
"mms_data_product": {
"product_id": "DP1.10107.001",
"product_tables": "mms_metagenomeDnaExtraction, mms_metagenomeSequencing",
}
}
},
"get_neon_pipeline_sls_data_product": {
"config": {
"sls_data_product": {
"product_id": "DP1.10086.001",
"product_tables": "sls_metagenomicsPooling, sls_soilCoreCollection, sls_soilChemistry, sls_soilMoisture, sls_soilpH, ntr_externalLab, ntr_internalLab",
}
}
},
},
},
),
]


Expand Down
33 changes: 33 additions & 0 deletions nmdc_runtime/site/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Dict, List, Optional, Union

import requests
import requests_cache
from requests.auth import HTTPBasicAuth
from dagster import (
build_init_resource_context,
Expand Down Expand Up @@ -193,6 +194,7 @@ def claim_job(self, job_id):

def mint_id(self, schema_class, how_many=1):
body = {"schema_class": {"id": schema_class}, "how_many": how_many}
print(body)
return self.request("POST", "/pids/mint", body)


Expand Down Expand Up @@ -336,6 +338,37 @@ def nmdc_portal_api_client_resource(context: InitResourceContext):
)


@dataclass
class NeonApiClient:

base_url: str
api_token: str
session = requests_cache.CachedSession("neon_cache")

def request(self, url):
response = self.session.get(url, headers={
'X-API-Token': self.api_token
})
response.raise_for_status()
return response.json()

def fetch_product_by_id(self, product_id: str):
return self.request(self.base_url + f"/products/{product_id}")


@resource(
config_schema={
"base_url": StringSource,
"api_token": StringSource
}
)
def neon_api_client_resource(context: InitResourceContext):
return NeonApiClient(
base_url=context.resource_config["base_url"],
api_token=context.resource_config["api_token"]
)


class MongoDB:
def __init__(
self,
Expand Down
Loading

0 comments on commit ab63cdd

Please sign in to comment.