def anonymize(self, dicom_dir: str, tag_struct: str, send_params: dict, pv_id: int):
    """
    Anonymize the DICOMs under ``dicom_dir`` and push the result to Orthanc.

    First copies ``dicom_dir`` into a new plugin instance via
    ``pl_run_dicomdir``, then schedules the
    "DICOM anonymization and Orthanc push 20241217" pipeline on that
    instance, overriding the pipeline's default plugin parameters with
    the tag-substitution spec and the Orthanc connection details.

    Args:
        dicom_dir: CUBE path of the directory holding the DICOM files
        tag_struct: tag-substitution specification for the anonymizer
        send_params: dict with keys 'url', 'username', 'password', 'aec'
                     describing the Orthanc destination
        pv_id: id of the plugin instance to attach the dircopy to
    """
    dsdir_inst_id = self.pl_run_dicomdir(dicom_dir, tag_struct, pv_id)
    # Bail out when nothing was copied (empty dicom_dir): scheduling a
    # workflow on a None parent instance id would fail downstream.
    if dsdir_inst_id is None:
        return
    plugin_params = {
        'dicom-anonymization': {
            "tagStruct": tag_struct,
            'fileFilter': '.dcm'
        },
        'push-to-orthanc': {
            'inputFileFilter': "**/*dcm",
            "orthancUrl": send_params["url"],
            "username": send_params["username"],
            "password": send_params["password"],
            "pushToRemote": send_params["aec"]
        }
    }
    pipe = Pipeline(self.cl)
    pipe.workflow_schedule(dsdir_inst_id,
                           "DICOM anonymization and Orthanc push 20241217",
                           plugin_params)

def pl_run_dicomdir(self, dicom_dir: str, tag_struct: str, pv_id: int):
    """
    Create a new feed by copying ``dicom_dir`` with pl-dsdircopy.

    Args:
        dicom_dir: CUBE path to copy; may be empty when the upstream
                   search matched no files
        tag_struct: tag-substitution spec (used here only for logging)
        pv_id: id of the previous plugin instance to attach to

    Returns:
        int: id of the created pl-dsdircopy plugin instance, or
        None when ``dicom_dir`` is empty and nothing was scheduled.
    """
    pl_id = self.__get_plugin_id({"name": "pl-dsdircopy", "version": "1.0.2"})

    # Guard: an empty directory string means the search found nothing.
    if not dicom_dir:
        LOG(f"No directory found in CUBE containing files for search : {tag_struct}")
        return None

    pv_in_id = self.__create_feed(pl_id, {"previous_id": pv_id, 'dir': dicom_dir})
    return int(pv_in_id)
import json


class Pipeline:
    """
    Thin helper around a ChRIS client for finding a named pipeline,
    overriding its default plugin parameters, and scheduling it as a
    workflow off an existing plugin instance.
    """

    def __init__(self, client):
        # client: an authenticated ChRIS client object; assumed to expose
        # get_pipelines, get_pipeline_default_parameters,
        # compute_workflow_nodes_info, create_workflow and
        # get_workflow_plugin_instances -- TODO confirm against the
        # concrete client class used by callers.
        self.cl = client

    def pluginParameters_setInNodes(self,
                                    d_piping: dict,
                                    d_pluginParameters: dict
                                    ) -> dict:
        """
        Override default parameters in the ``d_piping`` structure.

        Matching is by substring: a plugin entry is selected when the
        override title occurs in the piping's 'title', and a parameter
        is selected when the override key occurs in the default's 'name'.

        Args:
            d_piping (dict): the current default parameters for the
                plugins in a pipeline (a list of piping dicts)
            d_pluginParameters (dict): a map of plugin title -> {param: value}
                overrides to apply

        Returns:
            dict: the same piping structure, with matching parameter
            defaults replaced. If ``d_pluginParameters`` is empty the
            piping is returned unchanged.
        """
        for pluginTitle, d_parameters in d_pluginParameters.items():
            for piping in d_piping:
                # `or ''` guards against a piping entry without a title,
                # which would otherwise raise TypeError on `in`
                if pluginTitle in (piping.get('title') or ''):
                    for k, v in d_parameters.items():
                        for d_default in piping.get('plugin_parameter_defaults'):
                            if k in d_default.get('name'):
                                d_default['default'] = v
        return d_piping

    def pipelineWithName_getNodes(
            self,
            str_pipelineName: str,
            d_pluginParameters: dict = None
    ) -> dict:
        """
        Find a pipeline whose name contains ``str_pipelineName`` and, if
        found, return its nodes dictionary, optionally applying the
        plugin parameter overrides in ``d_pluginParameters``.

        Args:
            str_pipelineName (str): the name of the pipeline to find
            d_pluginParameters (dict): optional plugin parameter overrides

        Returns:
            dict: {'nodes': <node list>, 'id': <pipeline id>}; when no
            pipeline matches, 'nodes' is [] and 'id' is -1.
        """
        if d_pluginParameters is None:
            # avoid a mutable default argument shared across calls
            d_pluginParameters = {}
        id_pipeline: int = -1
        ld_node: list = []
        d_pipeline: dict = self.cl.get_pipelines({'name': str_pipelineName})
        # truthiness check also covers an empty 'data' list, which the
        # plain `'data' in d_pipeline` test would let through and then
        # IndexError on [0]
        if d_pipeline.get('data'):
            id_pipeline = d_pipeline['data'][0]['id']
            d_response: dict = self.cl.get_pipeline_default_parameters(
                id_pipeline, {'limit': 1000}
            )
            if 'data' in d_response:
                ld_node = self.pluginParameters_setInNodes(
                    self.cl.compute_workflow_nodes_info(d_response['data'], True),
                    d_pluginParameters)
                for piping in ld_node:
                    # compute_resource_name is informational only and is
                    # rejected by create_workflow's nodes_info payload
                    if piping.get('compute_resource_name'):
                        del piping['compute_resource_name']
        return {
            'nodes': ld_node,
            'id': id_pipeline
        }

    def workflow_schedule(self,
                          inputDataNodeID: str,
                          str_pipelineName: str,
                          d_pluginParameters: dict = None
                          ) -> dict:
        """
        Schedule the workflow named ``str_pipelineName`` off the plugin
        instance ``inputDataNodeID``.

        Args:
            inputDataNodeID (str): id of the parent plugin instance
            str_pipelineName (str): substring of the pipeline name to run
            d_pluginParameters (dict): optional structure of default
                parameter overrides

        Returns:
            dict: result from the client's `get_workflow_plugin_instances`
        """
        if d_pluginParameters is None:
            # avoid a mutable default argument shared across calls
            d_pluginParameters = {}
        d_pipeline: dict = self.pipelineWithName_getNodes(
            str_pipelineName, d_pluginParameters
        )
        d_workflow: dict = self.cl.create_workflow(
            d_pipeline['id'],
            {
                'previous_plugin_inst_id': inputDataNodeID,
                'nodes_info': json.dumps(d_pipeline['nodes'])
            })
        d_workflowInst: dict = self.cl.get_workflow_plugin_instances(
            d_workflow['id'], {'limit': 1000}
        )
        return d_workflowInst