Commit

merged with 640
vshand11 committed Dec 20, 2023
2 parents aa07c3c + d68779a commit 285bee2
Showing 20 changed files with 246 additions and 106 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -138,8 +138,11 @@ config/es_loader_*
backup/
**/.vscode/
**/.DS_Store
**/s3_download
**/s3_download/*

#config files for testing
**/configs/models
**/configs/*config.yml

# model dict dumps
**/models/*.json
50 changes: 27 additions & 23 deletions README.md
@@ -7,12 +7,12 @@ The application is programmed purely with python v3.11. It depends on bento com
The application consists of multiple Python modules/classes that support the functions listed below:

1) Linux services that keep polling CRDC DataHub messages in specific AWS SQS queues for different service types: essential, file, and metadata validation services respectively.
2) Metadata model factory that stores models in python dict for different data commons after the service started.
3) Metadata model reader that is called by the metadata model factory to parse data model yaml files.
4) Mongo database access layer for the service to retrieve batch detail, loading metadata into DB and update batch or dataRecords after validation and data loading.
2) Metadata model factory that stores models in a Python dict for different data commons after the service starts. It also provides model access functions.
3) Metadata model reader that is called by the metadata model factory; it currently parses data model YAML files and is extensible to other model sources.
4) Mongo database access layer for services to retrieve batch details, load metadata into the DB, and update batch or dataRecords after validation and data loading.
5) File downloader that gets metadata file objects from the S3 bucket based on a batch.
6) Essential validator, file validator and metadata validator to validate files and/or contents.
7) Data loader that update or insert validated data in tsv file into Mongo database.
7) Data loader that inserts, updates, or deletes validated data from TSV files in the Mongo database (a loading sketch follows this list).
8) Logging of info, errors, and exceptions.
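
The data loader's intention handling maps naturally onto pymongo bulk writes. The sketch below is a minimal illustration, not the repo's actual implementation: the `load_records` helper, the `dataRecords` collection name, and the intention strings `"New"`, `"Update"`, and `"Delete"` are assumptions for illustration (the real logic lives in src/data_loader.py and src/common/mongo_dao.py, which already import `ReplaceOne` and `DeleteOne`).

```python
# Hedged sketch of intention-based loading; the collection name, intention
# strings, and record shape are illustrative assumptions.
from pymongo import MongoClient, ReplaceOne, DeleteOne

client = MongoClient("mongodb://localhost:27017")  # placeholder connection
collection = client["crdc-datahub"]["dataRecords"]  # assumed collection name

def load_records(intention: str, records: list[dict]):
    if intention in ("New", "Update"):
        # upsert each validated record, keyed by its node ID
        ops = [ReplaceOne({"_id": r["_id"]}, r, upsert=True) for r in records]
    elif intention == "Delete":
        ops = [DeleteOne({"_id": r["_id"]}) for r in records]
    else:
        raise ValueError(f"unknown metadata intention: {intention}")
    return collection.bulk_write(ops)  # one round trip for the whole batch
```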

Major implemented modules/classes in src dir:
@@ -21,14 +21,14 @@ Major implemented modules/classes in src dir:
This is the entry point of the command line interface. It controls the workflow and dispatches to different services based on the configured service type.

2) config.py
This class manages request arguments and configurations. It receives user's arguments, validate these arguments and store them in a dictionary.
This class manages request arguments and configurations. It receives the user's arguments, validates them, and stores them in a dictionary.

3) essential_validator.py contains:
A service that polls messages in a specific sqs queue, loader_queue, and calls the essential validator class for validation and metadata loading (a polling sketch follows this list).
A class, EssentialValidator, that validates: 1) whether the batch object contains all required fields; 2) whether files in the batch are metadata; 3) whether TSV file contents are valid; 4) if the batch metadata intention is new, that no existing records match the IDs of the data in the files.

4) data_loader.py
This class loads validated metadata into Mongo DB.
This class loads validated metadata into Mongo DB; it inserts, updates, or deletes records based on the metadataIntention in a batch.

5) common/mongo_dao.py
This class is the Mongo DB access object that takes care of the DB connection, CRUD operations, and DB error handling.
@@ -38,35 +38,39 @@ Major implemented modules/classes in src dir:

7) file_validator.py contains:
A service that polls messages in a specific sqs queue, file_queue, and calls the file validator class for validation and duplication checking.
A class, FileValidator, validates individual file or files uploaded for a submission and check duplications
A class, FileValidator, that validates individual files uploaded for a submission and checks for duplicates.

8) metadata_validator.py contains:
A service that polls messages in a specific sqs queue, metadata_queue, and calls the metadata validator class for validation.
A class, MeataDataValidator, that validates metadata node by node and record by record within each node, and validates relationships among nodes.
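
All three services follow the same poll-and-dispatch loop described in items 3, 7, and 8 above. Below is a minimal sketch of that loop, written with raw boto3 rather than the repo's bento.common.sqs helpers; the `validate_batch` callable and the queue name are illustrative assumptions.

```python
# Hedged sketch of the shared SQS polling loop; queue name and the
# validate_batch callable are illustrative assumptions.
import json
import boto3

def poll_queue(queue_name: str, validate_batch) -> None:
    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    while True:
        # long-poll so empty receives stay cheap
        for msg in queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=1):
            try:
                validate_batch(json.loads(msg.body))
                msg.delete()  # delete only after successful handling
            except Exception as exc:
                # leave the message for redelivery after the visibility timeout
                print(f"validation failed, message will be retried: {exc}")
```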

Environment settings:

1) ECR tier: - key: DEV_TIER. #possible value in ["dev2", "qa"....]
2) STS queues: 1) key: LOADER_QUEUE 2) key: FILE_QUEUE 3) key: METADATA_QUEUE
3) Settings in Configuration file:
1.1 Mongo DB connection string: - key: connection-str #e.g. value: mongodb://xxx:xxx@localhost:27017/?authMechanism=DEFAULT
1.2 Database name: - key: db #e.g. crdc-datahub2
1.3 Service type: - key: service-type # possible value in ["essential", "file", "metadata"]
1.4 Data model location: - key: models-loc value: https://raw.githubusercontent.com/CBIIT/crdc-datahub-models/ #only applied for service type of essential and metadata.
1.5 Mounted S3 bucket dir: - key: s3_bucket_drive value: /s3_bucket #only applied for service type of file.
1) ECR tier: - key: TIER #possible values in ["dev2", "qa", ...]
2) SQS queues:
2-1 key: LOADER_QUEUE, for essential validation requests.
2-2 key: FILE_QUEUE, for file validation requests.
2-3 key: METADATA_QUEUE, for metadata validation requests.
3) Mongo database configurations:
3-1 key: MONGO_DB_HOST, Mongo DB server host.
3-2 key: MONGO_DB_PORT, Mongo DB server port.
3-3 key: MONGO_DB_USER, Mongo DB server user ID.
3-4 key: MONGO_DB_PASSWORD, Mongo DB server user password.
3-5 key: DATABASE_NAME, Mongo database name.
4) Settings in configuration file and/or arguments (a resolution sketch follows this list):
4-1 key: service-type # possible value in ["essential", "file", "metadata"]
4-2 key: models-loc, value: https://raw.githubusercontent.com/CBIIT/crdc-datahub-models/ #only required for service types essential and metadata.
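
The sketch below mirrors how src/config.py in this commit resolves these settings: Mongo values fall back from the configuration file to environment variables, while the SQS queue name prefers the environment variable for the chosen service type. The `resolve` helper itself is an illustrative wrapper, not the repo's API.

```python
# Hedged sketch of setting resolution; resolve() is an illustrative
# wrapper around the logic shown in the src/config.py diff below.
import os

QUEUE_ENV_BY_SERVICE = {
    "essential": "LOADER_QUEUE",
    "file": "FILE_QUEUE",
    "metadata": "METADATA_QUEUE",
}

def resolve(config: dict) -> dict:
    server = config.get("server", os.environ.get("MONGO_DB_HOST"))
    port = config.get("port", os.environ.get("MONGO_DB_PORT"))
    user = config.get("user", os.environ.get("MONGO_DB_USER"))
    pwd = config.get("pwd", os.environ.get("MONGO_DB_PASSWORD"))
    db = config.get("db", os.environ.get("MONGO_DB_NAME"))
    # the env var wins for the queue; config "sqs" is the fallback
    sqs = os.environ.get(QUEUE_ENV_BY_SERVICE[config["service-type"]], config.get("sqs"))
    return {
        "connection-str": f"mongodb://{user}:{pwd}@{server}:{port}/?authMechanism=DEFAULT",
        "db": db,
        "sqs": sqs,
    }
```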

Usage of the CLI tool:

1) Get helps command
1) Get help command:
$ python src/uploader.py -h
##Execution results:
Command line arguments / configuration

2) Start essential validation service command
$ python src/validator.py -c configs/validator-metadata-config.yml
2) Start essential validation service command:
$ python src/validator.py -c configs/validator-essential-config.yml

3) Start file validation service command
3) Start file validation service command:
$ python src/validator.py -c configs/validator-file-config.yml

4) Start metadata validation service command (TBD)

4) Start metadata validation service command:
$ python src/validator.py -c configs/validator-metadata-config.yml
7 changes: 7 additions & 0 deletions configs/validate-essential-config-dev.yml.j2
@@ -0,0 +1,7 @@
{# service type value in essential, file and metadata #}
service-type: {{service-type}}
{# data model location #}
models-loc: {{models-loc}}



12 changes: 10 additions & 2 deletions configs/validate-essential-config.example.yml
@@ -2,8 +2,16 @@ Config:
# service type value in essential, file and metadata
service-type: essential
# MongoDB configurations
# connection string
connection-str: mongodb://xxx:xxx@localhost:27017/?authMechanism=DEFAULT
# please note: all Mongo database settings here are optional since env settings will be used
# db server
server: localhost
# db port
port: 27017
# db user id
user: xxx
# db user password
pwd: ***
# db name
db: crdc-datahub
#sqs configuration
sqs: crdcdh-queue-pgu.fifo
6 changes: 6 additions & 0 deletions configs/validate-file-config-dev.yml.j2
@@ -0,0 +1,6 @@
{# service type value in essential, file and metadata #}
service-type: {{service-type}}




12 changes: 10 additions & 2 deletions configs/validate-file-config.example.yml
@@ -2,8 +2,16 @@ Config:
# service type value in essential, file and metadata
service-type: file
# MongoDB configurations
# connection string
connection-str: mongodb://xxx:xxx@localhost:27017/?authMechanism=DEFAULT
# please note: all Mongo database settings here are optional since env settings will be used
# db server
server: localhost
# db port
port: 27017
# db user id
user: xxx
# db user password
pwd: ***
# db name
db: crdc-datahub
#sqs configuration
sqs: crdcdh-queue-pgu.fifo
6 changes: 6 additions & 0 deletions configs/validate-metadata-config-dev.yml.j2
@@ -0,0 +1,6 @@
{# service type value in essential, file and metadata #}
service-type: {{service-type}}
{# data model location #}
models-loc: {{models-loc}}


12 changes: 10 additions & 2 deletions configs/validate-metadata-config.example.yml
@@ -2,8 +2,16 @@ Config:
# service type value in essential, file and metadata
service-type: metadata
# MongoDB configurations
# connection string
connection-str: mongodb://xxx:xxx@localhost:27017/?authMechanism=DEFAULT
# please note: all Mongo database settings here are optional since env settings will be used
# db server
server: localhost
# db port
port: 27017
# db user id
user: xxx
# db user password
pwd: ***
# db name
db: crdc-datahub
#sqs configuration
sqs: crdcdh-queue-pgu.fifo
1 change: 0 additions & 1 deletion models/CDS_1.3.0_model.json

This file was deleted.

1 change: 0 additions & 1 deletion models/ICDC_1.0.0_model.json

This file was deleted.

1 change: 1 addition & 0 deletions models/readme
@@ -0,0 +1 @@
data model dumped from dict in json.
18 changes: 17 additions & 1 deletion src/common/constants.py
@@ -60,6 +60,8 @@
NODE_TYPE = "nodeType"
S3_BUCKET_DIR = "s3_bucket_drive"
FILE_ERRORS = "fileErrors"
PROPERTIES = "props"

FILE_NAME_FIELD = "name-field"
FILE_SIZE_FIELD = "size-field"
FILE_MD5_FIELD = "md5-field"
@@ -84,10 +86,22 @@
PROP_REQUIRED="required"
TYPE ="type"
MODELS_DEFINITION_FILE = "content.json"
TIER = "DEV_TIER"
TIER = "TIER"
TIER_CONFIG = "tier"
UPDATED_AT = "updatedAt"
FILE_SIZE = "file_size"
MIN = 'minimum'
MAX = 'maximum'
VALID_PROP_TYPE_LIST = [
"string", # default type
"integer",
"number", # float or double
"datetime",
"date",
"boolean", # true/false or yes/no
"array" # value_type: list
]
VALIDATION_RESULT = "result"

#s3 download directory
S3_DOWNLOAD_DIR = "s3_download"
@@ -102,3 +116,5 @@





2 changes: 1 addition & 1 deletion src/common/mongo_dao.py
@@ -1,6 +1,6 @@
from pymongo import MongoClient, errors, ReplaceOne, DeleteOne
from bento.common.utils import get_logger
from common.constants import MONGO_DB, BATCH_COLLECTION, SUBMISSION_COLLECTION, DATA_COLlECTION, ID, UPDATED_AT, \
from common.constants import BATCH_COLLECTION, SUBMISSION_COLLECTION, DATA_COLlECTION, ID, UPDATED_AT, \
SUBMISSION_ID, NODE_ID, NODE_TYPE, S3_FILE_INFO, ERRORS, INTENTION_NEW, FILE_STATUS, FILE_ERRORS
from common.utils import get_exception_msg, current_datetime_str

5 changes: 2 additions & 3 deletions src/common/utils.py
@@ -7,9 +7,8 @@
import requests
import yaml
import boto3
from bento.common.utils import get_md5, get_stream_md5
from bento.common.utils import get_stream_md5
from datetime import datetime
#from bento.common.utils import get_uuid
import uuid
from common.constants import DATA_COMMON, VERSION

@@ -124,7 +123,7 @@ def current_datetime_str():


"""
get uuid v5
get uuid v4
"""
def get_uuid_str():
return str(uuid.uuid4())
74 changes: 28 additions & 46 deletions src/config.py
@@ -1,24 +1,24 @@
import argparse
import os
import yaml
from common.constants import MONGO_DB, SQS_NAME, RETRIES, DB, MODEL_FILE_DIR, \
from common.constants import MONGO_DB, SQS_NAME, DB, MODEL_FILE_DIR, \
LOADER_QUEUE, SERVICE_TYPE, SERVICE_TYPE_ESSENTIAL, SERVICE_TYPE_FILE, SERVICE_TYPE_METADATA, \
SERVICE_TYPES, DB, FILE_QUEUE, METADATA_QUEUE, TIER, S3_BUCKET_DIR, TIER_CONFIG
SERVICE_TYPES, DB, FILE_QUEUE, METADATA_QUEUE, TIER, TIER_CONFIG
from bento.common.utils import get_logger
from common.utils import clean_up_key_value

class Config():
def __init__(self):
self.log = get_logger('Upload Config')
parser = argparse.ArgumentParser(description='Upload files to AWS s3 bucket')
parser.add_argument('-s', '--service-type', type=str, choices=["essential", "file", "metadata"], help='validation type, required')
parser.add_argument('-c', '--mongo', help='Mongo database connection string, required')
parser.add_argument('-d', '--db', help='Mongo database with batch collection, required')
parser.add_argument('-p', '--models-loc', help='metadata models local, only required for essential and metadata service types')
parser.add_argument('-t', '--tier', help='current tier, optional')
parser.add_argument('-q', '--sqs', help='aws sqs name, required')
parser.add_argument('-r', '--retries', help='db connection, data loading, default value is 3, optional')

parser.add_argument('-t', '--service-type', type=str, choices=["essential", "file", "metadata"], help='validation type, required')
parser.add_argument('-s', '--server', help='Mongo database host, optional, it can be acquired from env.')
parser.add_argument('-o', '--port', help='Mongo database port, optional, it can be acquired from env.')
parser.add_argument('-u', '--user', help='Mongo database user id, optional, it can be acquired from env.')
parser.add_argument('-p', '--pwd', help='Mongo database user password, optional, it can be acquired from env.')
parser.add_argument('-d', '--db', help='Mongo database with batch collection, optional, it can be acquired from env.')
parser.add_argument('-m', '--models-loc', help='metadata models location, only required for essential and metadata service types')
parser.add_argument('-q', '--sqs', help='aws sqs name, optional, it can be acquired from env.')
parser.add_argument('config', help='configuration file path, contains all above parameters, required')

args = parser.parse_args()
@@ -52,38 +52,39 @@ def validate(self):
self.log.critical(f'Service type is required and must be "essential", "file" or "metadata"!')
return False

mongo = self.data.get(MONGO_DB)
if mongo is None:
self.log.critical(f'Mongo DB connection string is required!')
return False

db = self.data.get(DB)
if db is None:
self.log.critical(f'Mongo DB for batch is required!')
db_server = self.data.get("server", os.environ.get("MONGO_DB_HOST"))
db_port = self.data.get("port", os.environ.get("MONGO_DB_PORT"))
db_user_id = self.data.get("user", os.environ.get("MONGO_DB_USER"))
db_user_password = self.data.get("pwd", os.environ.get("MONGO_DB_PASSWORD"))
db_name= self.data.get("db", os.environ.get("MONGO_DB_NAME"))
if db_server is None or db_port is None or db_user_id is None or db_user_password is None \
or db_name is None:
self.log.critical(f'Missing Mongo DB setting(s)!')
return False

else:
self.data[DB] = db_name
self.data[MONGO_DB] = f"mongodb://{db_user_id}:{db_user_password}@{db_server}:{db_port}/?authMechanism=DEFAULT"

models_loc= self.data.get(MODEL_FILE_DIR)
if models_loc is None and self.data[SERVICE_TYPE] != SERVICE_TYPE_FILE:
self.log.critical(f'Metadata models location is required!')
return False

# try to get sqs setting from env.
if self.data[SERVICE_TYPE] == SERVICE_TYPE_ESSENTIAL:
sqs = os.environ.get(LOADER_QUEUE)
sqs = os.environ.get(LOADER_QUEUE, self.data.get(SQS_NAME))
elif self.data[SERVICE_TYPE] == SERVICE_TYPE_FILE:
sqs = os.environ.get(FILE_QUEUE)
sqs = os.environ.get(FILE_QUEUE, self.data.get(SQS_NAME))
elif self.data[SERVICE_TYPE] == SERVICE_TYPE_METADATA:
sqs = os.environ.get(METADATA_QUEUE)
sqs = os.environ.get(METADATA_QUEUE, self.data.get(SQS_NAME))
else:
sqs = None

# if no env set got sqs, check config/arg
if not sqs:
sqs = self.data.get(SQS_NAME)
if not sqs:
self.log.critical(f'AWS sqs name is required!')
return False
else:
self.log.critical(f'AWS sqs name is required!')
return False
else:
self.data[SQS_NAME] = sqs

tier = os.environ.get(TIER, self.data.get(TIER_CONFIG))
@@ -93,24 +94,5 @@ def validate(self):
else:
self.data[TIER_CONFIG] = tier

# s3_bucket_drive = self.data.get(S3_BUCKET_DIR)
# if not s3_bucket_drive and self.data[SERVICE_TYPE] == SERVICE_TYPE_FILE:
# self.log.critical(f'No s3 bucket drive configured!')
# return False
# else:
# self.data[S3_BUCKET_DIR] = s3_bucket_drive


retry = self.data.get(RETRIES, 3) #default value is 3
if isinstance(retry, str):
if not retry.isdigit():
self.log.critical(f'retries is not integer!')
return False
else:
self.data[RETRIES] =int(retry)
else:
self.data[RETRIES] =int(retry)


return True

4 changes: 3 additions & 1 deletion src/data_loader.py
@@ -56,11 +56,13 @@ def load_data(self, file_path_list):
prop_names = [name for name in col_names if not name in [TYPE, 'index'] + relation_fields]
node_id = self.get_node_id(type, row)
exist_node = None if intention == INTENTION_NEW else self.mongo_dao.get_dataRecord_nodeId(node_id)
batchIds = [self.batch[ID]] if intention == INTENTION_NEW or not exist_node else [self.batch[ID]] + exist_node[BATCH_IDS]
batchIds = [self.batch[ID]] if intention == INTENTION_NEW or not exist_node else exist_node[BATCH_IDS] + [self.batch[ID]]
dataRecord = {
ID: self.get_record_id(intention, exist_node),
SUBMISSION_ID: self.batch[SUBMISSION_ID],
BATCH_IDS: batchIds,
"latestBatchID": self.batch[ID],
"uploadedDate": current_datetime_str(),
FILE_STATUS: STATUS_NEW,
ERRORS: [] if intention == INTENTION_NEW or not exist_node else exist_node[ERRORS],
WARNINGS: [] if intention == INTENTION_NEW or not exist_node else exist_node[WARNINGS],
4 changes: 2 additions & 2 deletions src/essential_validator.py
@@ -6,8 +6,8 @@
from bento.common.sqs import VisibilityExtender
from bento.common.utils import get_logger
from bento.common.s3 import S3Bucket
from common.constants import BATCH_STATUS, BATCH_TYPE_METADATA, DATA_COMMON_NAME, ERRORS, DB, \
ERRORS, S3_DOWNLOAD_DIR, SQS_NAME, BATCH_ID, BATCH_STATUS_LOADED, INTENTION_NEW, IDS, SQS_TYPE, TYPE_LOAD,\
from common.constants import BATCH_STATUS, BATCH_TYPE_METADATA, DATA_COMMON_NAME, ERRORS, \
ERRORS, S3_DOWNLOAD_DIR, SQS_NAME, BATCH_ID, BATCH_STATUS_LOADED, INTENTION_NEW, SQS_TYPE, TYPE_LOAD,\
BATCH_STATUS_REJECTED, ID, FILE_NAME, TYPE, FILE_PREFIX, BATCH_INTENTION, NODE_LABEL, MODEL_FILE_DIR, TIER_CONFIG
from common.utils import cleanup_s3_download_dir, get_exception_msg, dump_dict_to_json
from common.model_store import ModelFactory