Skip to content

Commit

Permalink
add model class
Browse files Browse the repository at this point in the history
  • Loading branch information
vshand11 committed Dec 20, 2023
1 parent b611575 commit aa07c3c
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 52 deletions.
3 changes: 3 additions & 0 deletions src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
NODE_TYPE = "nodeType"
S3_BUCKET_DIR = "s3_bucket_drive"
FILE_ERRORS = "fileErrors"
FILE_NAME_FIELD = "name-field"
FILE_SIZE_FIELD = "size-field"
FILE_MD5_FIELD = "md5-field"

#data model
DATA_COMMON = "data_commons"
Expand Down
35 changes: 35 additions & 0 deletions src/common/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from common.constants import IDS, NODES_LABEL, MODEL

class DataModel:
    """Convenience wrapper around one parsed data-model dict.

    The wrapped dict is expected to have the shape produced by the model
    store: {MODEL: {NODES_LABEL: {node_name: {...}}, ...}, IDS: [...]}.
    NOTE(review): callers may pass model=None (model_store wraps the result
    of a next(..., None) lookup) — accessors will then raise; confirm whether
    a None guard is wanted at construction time.
    """

    def __init__(self, model):
        self.model = model

    def get_model_ids(self):
        """Return the model's id fields, or None when the model defines none."""
        return self.model.get(IDS, None)

    def get_node_id(self, node):
        """Return the id property name of the given node, or None.

        Uses .get() rather than direct indexing so an unknown node yields
        None instead of KeyError, consistent with get_node_props.
        """
        node_def = self.model[MODEL][NODES_LABEL].get(node)
        if not node_def:
            return None
        return node_def.get("id_property", None)

    def get_node_props(self, node):
        """Return the properties dict of the given node, or None if the node is unknown."""
        node_def = self.model[MODEL][NODES_LABEL].get(node)
        if node_def:
            return node_def.get("properties", None)
        return None

    def get_node_req_props(self, node):
        """Return only the required properties of the given node.

        Returns None when the node is unknown or has no properties.
        """
        # Bug fix: previously called self.get_node_props(self.model, node),
        # a leftover from the free-function version in model_store — that
        # raised TypeError because get_node_props takes only the node name.
        props = self.get_node_props(node)
        if not props:
            return None
        return {k: v for (k, v) in props.items() if v.get("required") == True}
43 changes: 8 additions & 35 deletions src/common/model_store.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import json
import glob
from bento.common.utils import get_logger
from common.model_reader import YamlModelParser
from common.constants import DATA_COMMON, IDS, NODES_LABEL, MODELS_DEFINITION_FILE, MODEL
from common.model import DataModel
from common.constants import DATA_COMMON, IDS, MODELS_DEFINITION_FILE, MODEL
from common.utils import download_file_to_dict


Expand Down Expand Up @@ -43,41 +42,15 @@ def __init__(self, model_def_loc, tier):
model_reader.model.update({DEF_FILE_NODES: v[DEF_SEMANTICS][DEF_FILE_NODES]})
self.models.append({MODEL: model_reader.model, IDS: model_reader.id_fields})





"""
get model by data common
"""
def get_model_by_data_common(self, data_common):
return next((x for x in self.models if x[MODEL][DATA_COMMON] == data_common.upper()), None)
model = next((x for x in self.models if x[MODEL][DATA_COMMON] == data_common.upper()), None)
return DataModel(model)


# model convenience functions
"""
get model id fields in the given model
"""
def get_model_ids(self, model):
return model.get(IDS, None)

"""
get id field of a given node in the model
"""
def get_node_id(self, model, node):
if model.get(IDS):
return model[MODEL][NODES_LABEL][node].get("id_property", None)
return None

"""
get properties of a node in the model
"""
def get_node_props(self, model, node):
if model[MODEL][NODES_LABEL].get(node):
return model[MODEL][NODES_LABEL][node].get("properties", None)

"""
get required properties of a node in the model
"""
def get_node_req_props(self, model, node):
props = self.get_node_props(model, node)
if not props:
return None
return {k: v for (k, v) in props.items() if v.get("required") == True}

18 changes: 9 additions & 9 deletions src/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
from common.utils import get_uuid_str, current_datetime_str, get_exception_msg
from common.constants import MODEL, ID_PROPERTY, TYPE, ID, SUBMISSION_ID, FILE_STATUS, STATUS_NEW, \
ERRORS, WARNINGS, BATCH_CREATED, UPDATED_AT, BATCH_INTENTION, S3_FILE_INFO, FILE_NAME, \
MD5, NODES_LABEL, INTENTION_NEW, INTENTION_UPDATE, INTENTION_DELETE, SIZE, NODES_LABEL
MD5, INTENTION_NEW, INTENTION_UPDATE, INTENTION_DELETE, SIZE, \
FILE_NAME_FIELD, FILE_SIZE_FIELD, FILE_MD5_FIELD
SEPARATOR_CHAR = '\t'
UTF8_ENCODE ='utf8'
BATCH_IDS = "batchIDs"

# This script loads metadata files to the database
# input: file info list
class DataLoader:
def __init__(self, configs, model, batch, mongo_dao):
def __init__(self, model, batch, mongo_dao):
self.log = get_logger('Matedata loader')
self.configs = configs
self.model = model
self.mongo_dao =mongo_dao
self.batch = batch
self.file_nodes = model[MODEL].get("file-nodes", {})
self.file_nodes = self.model.model[MODEL].get("file-nodes", {})
self.errors = None

"""
Expand All @@ -28,7 +28,7 @@ def __init__(self, configs, model, batch, mongo_dao):
def load_data(self, file_path_list):
returnVal = True
self.errors = []
intention = self.batch.get(BATCH_INTENTION, STATUS_NEW)
intention = self.batch.get(BATCH_INTENTION, INTENTION_NEW)
file_types = None if intention == INTENTION_DELETE else [k for (k,v) in self.file_nodes.items()]
deleted_ids = [] if intention == INTENTION_DELETE else None
for file in file_path_list:
Expand Down Expand Up @@ -111,7 +111,7 @@ def get_record_id(self, intention, node):
get node id defined in model dict
"""
def get_node_id(self, type, row):
id_field = self.model[MODEL][NODES_LABEL][type].get(ID_PROPERTY, None)
id_field = self.model.get_node_id(type)
return row[id_field] if id_field else None

"""
Expand All @@ -130,9 +130,9 @@ def get_parents(self, relation_fields, row):
"""
def get_file_info(self, type, prop_names, row):
file_fields = self.file_nodes.get(type)
file_name = row[file_fields["name-field"]] if file_fields["name-field"] in prop_names else None
file_size = row[file_fields["size-field"]] if file_fields["size-field"] in prop_names else None
file_md5 = row[file_fields["md5-field"]] if file_fields["md5-field"] in prop_names else None
file_name = row[file_fields[FILE_NAME_FIELD]] if file_fields[FILE_NAME_FIELD] in prop_names else None
file_size = row[file_fields[FILE_SIZE_FIELD]] if file_fields[FILE_SIZE_FIELD] in prop_names else None
file_md5 = row[file_fields[FILE_MD5_FIELD]] if file_fields[FILE_MD5_FIELD] in prop_names else None
return {
FILE_NAME: file_name,
SIZE: file_size,
Expand Down
4 changes: 2 additions & 2 deletions src/essential_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def essentialValidate(configs, job_queue, mongo_dao):
result = validator.validate(batch)
if result and len(validator.download_file_list) > 0:
#3. call mongo_dao to load data
data_loader = DataLoader(configs, model_store.get_model_by_data_common(validator.datacommon), batch, mongo_dao)
data_loader = DataLoader(model_store.get_model_by_data_common(validator.datacommon), batch, mongo_dao)
result, errors = data_loader.load_data(validator.download_file_list)
if result:
batch[BATCH_STATUS] = BATCH_STATUS_LOADED
Expand Down Expand Up @@ -241,7 +241,7 @@ def validate_data(self, file_info):
type = self.df[TYPE][0]

# get id data fields for the type, the domain for mvp2/m3 is cds.
id_field = self.model_store.get_node_id(self.model, type)
id_field = self.model.get_node_id(type)
if not id_field: return True
# extract ids from df.
ids = self.df[id_field].tolist()
Expand Down
7 changes: 1 addition & 6 deletions src/metadata_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

import pandas as pd
import json
import os
from botocore.exceptions import ClientError
from bento.common.sqs import VisibilityExtender
from bento.common.utils import get_logger
from bento.common.s3 import S3Bucket
from common.constants import SQS_NAME, SQS_TYPE, SCOPE, MODEL, SUBMISSION_ID, ERRORS, WARNINGS, STATUS_ERROR, \
from common.constants import SQS_NAME, SQS_TYPE, SCOPE, SUBMISSION_ID, ERRORS, WARNINGS, STATUS_ERROR, \
STATUS_WARNING, STATUS_PASSED, FILE_STATUS, UPDATED_AT, MODEL_FILE_DIR, TIER_CONFIG, DATA_COMMON_NAME
from common.utils import current_datetime_str, get_exception_msg, dump_dict_to_json
from common.model_store import ModelFactory
from data_loader import DataLoader

VISIBILITY_TIMEOUT = 20

Expand All @@ -21,7 +17,6 @@ def metadataValidate(configs, job_queue, mongo_dao):
try:
model_store = ModelFactory(configs[MODEL_FILE_DIR], configs[TIER_CONFIG])
# dump models to json files
# dump_dict_to_json([model[MODEL] for model in model_store.models], f"tmp/data_models_dump.json")
dump_dict_to_json(model_store.models, f"models/data_model.json")
except Exception as e:
log.debug(e)
Expand Down

0 comments on commit aa07c3c

Please sign in to comment.