Commit 7e764ac

added pipeline controller

Sandip117 committed Dec 17, 2024 · 1 parent 7bd570b
Showing 3 changed files with 136 additions and 1 deletion.
30 changes: 30 additions & 0 deletions chrisClient.py
@@ -7,6 +7,7 @@
import time
from loguru import logger
import sys
from pipeline import Pipeline

LOG = logger.debug

@@ -39,6 +40,35 @@ def pacs_push(self):
        pass

    def anonymize(self, dicom_dir: str, tag_struct: str, send_params: dict, pv_id: int):
        dsdir_inst_id = self.pl_run_dicomdir(dicom_dir, tag_struct, pv_id)
        if dsdir_inst_id is None:
            # pl_run_dicomdir found no directory to copy; nothing to schedule
            return
        plugin_params = {
            'dicom-anonymization': {
                'tagStruct': tag_struct,
                'fileFilter': '.dcm'
            },
            'push-to-orthanc': {
                'inputFileFilter': '**/*dcm',
                'orthancUrl': send_params['url'],
                'username': send_params['username'],
                'password': send_params['password'],
                'pushToRemote': send_params['aec']
            }
        }
        pipe = Pipeline(self.cl)
        pipe.workflow_schedule(dsdir_inst_id,
                               "DICOM anonymization and Orthanc push 20241217",
                               plugin_params)

    def pl_run_dicomdir(self, dicom_dir: str, tag_struct: str, pv_id: int) -> int | None:
        # Check for an empty directory before scheduling anything
        if len(dicom_dir) == 0:
            LOG(f"No directory found in CUBE containing files for search: {tag_struct}")
            return None
        # 1) Run dircopy on the located DICOM directory
        pl_id = self.__get_plugin_id({"name": "pl-dsdircopy", "version": "1.0.2"})
        pv_in_id = self.__create_feed(pl_id, {"previous_id": pv_id, "dir": dicom_dir})
        return int(pv_in_id)

    def anonymize_(self, dicom_dir: str, tag_struct: str, send_params: dict, pv_id: int):

        pl_id = self.__get_plugin_id({"name": "pl-dsdircopy", "version": "1.0.2"})
        pl_sub_id = self.__get_plugin_id({"name": "pl-pfdicom_tagsub", "version": "3.3.4"})
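For orientation, a minimal usage sketch of the new anonymize() flow follows. The ChrisClient constructor arguments, the dicom_dir path, the tag_struct value, and pv_id are illustrative assumptions, not part of this diff; only the send_params keys ("url", "username", "password", "aec") are actually read by the code above.

# Hypothetical driver for the new anonymize() path (all values are placeholders).
from chrisClient import ChrisClient

client = ChrisClient("http://localhost:8000/api/v1/", "chris", "chris1234")
send_params = {
    "url": "http://orthanc:8042",   # Orthanc REST endpoint
    "username": "orthanc",
    "password": "orthanc",
    "aec": "ORTHANC",               # remote AE title to push to
}
# pv_id: plugin instance id of an upstream CUBE node; dicom_dir: a CUBE
# storage path containing the study files located by an earlier search.
client.anonymize(
    dicom_dir="SERVICES/PACS/orthanc/some/study/dir",
    tag_struct='{"PatientName": "anon"}',
    send_params=send_params,
    pv_id=42,
)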
105 changes: 105 additions & 0 deletions pipeline.py
@@ -0,0 +1,105 @@
import json


class Pipeline:
    def __init__(self, client):
        self.cl = client

    def pluginParameters_setInNodes(self,
                                    d_piping: list,
                                    d_pluginParameters: dict
                                    ) -> list:
        """
        Override default parameters in the passed `d_piping`.

        Args:
            d_piping (list): the current default parameters for the
                             plugins in a pipeline
            d_pluginParameters (dict): a map of plugin titles to parameters
                                       to set in the response

        Returns:
            list: a new piping structure with changes to some parameter values
                  if required. If an empty d_pluginParameters is passed, the
                  piping is simply returned unchanged.
        """
        for pluginTitle, d_parameters in d_pluginParameters.items():
            for piping in d_piping:
                if pluginTitle in piping.get('title'):
                    for k, v in d_parameters.items():
                        for d_default in piping.get('plugin_parameter_defaults'):
                            if k in d_default.get('name'):
                                d_default['default'] = v
        return d_piping
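    # Example (illustrative values): given
    #   d_piping = [{'title': 'dicom-anonymization',
    #                'plugin_parameter_defaults': [{'name': 'fileFilter',
    #                                               'default': ''}]}]
    #   d_pluginParameters = {'dicom-anonymization': {'fileFilter': '.dcm'}}
    # the returned piping has that parameter's 'default' set to '.dcm'.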

    def pipelineWithName_getNodes(
            self,
            str_pipelineName: str,
            d_pluginParameters: dict = {}
    ) -> dict:
        """
        Find a pipeline that contains the passed name <str_pipelineName>
        and, if found, return a nodes dictionary. Optionally set relevant
        plugin parameters to values described in <d_pluginParameters>.

        Args:
            str_pipelineName (str): the name of the pipeline to find
            d_pluginParameters (dict): a set of optional plugin parameter
                                       overrides

        Returns:
            dict: node dictionary (name, compute env, default parameters)
                  and id of the pipeline
        """
        id_pipeline: int = -1
        ld_node: list = []
        d_pipeline: dict = self.cl.get_pipelines({'name': str_pipelineName})
        if d_pipeline.get('data'):
            id_pipeline = d_pipeline['data'][0]['id']
            d_response: dict = self.cl.get_pipeline_default_parameters(
                id_pipeline, {'limit': 1000}
            )
            if 'data' in d_response:
                ld_node = self.pluginParameters_setInNodes(
                    self.cl.compute_workflow_nodes_info(d_response['data'], True),
                    d_pluginParameters)
                for piping in ld_node:
                    if piping.get('compute_resource_name'):
                        del piping['compute_resource_name']
        return {
            'nodes': ld_node,
            'id': id_pipeline
        }
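    # Example return shape (illustrative): a match yields
    #   {'nodes': [{'title': ..., 'plugin_parameter_defaults': [...]}, ...],
    #    'id': <pipeline id>}
    # while no match yields {'nodes': [], 'id': -1}.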

    def workflow_schedule(self,
                          inputDataNodeID: int,
                          str_pipelineName: str,
                          d_pluginParameters: dict = {}
                          ) -> dict:
        """
        Schedule the workflow with name <str_pipelineName> off a given
        node id <inputDataNodeID>.

        Args:
            inputDataNodeID (int): id of the parent (plugin instance) node
            str_pipelineName (str): substring of the pipeline name to match
            d_pluginParameters (dict): optional structure of default parameter
                                       overrides

        Returns:
            dict: result from calling the client `get_workflow_plugin_instances`
        """
        d_pipeline: dict = self.pipelineWithName_getNodes(
            str_pipelineName, d_pluginParameters
        )
        d_workflow: dict = self.cl.create_workflow(
            d_pipeline['id'],
            {
                'previous_plugin_inst_id': inputDataNodeID,
                'nodes_info': json.dumps(d_pipeline['nodes'])
            })
        d_workflowInst: dict = self.cl.get_workflow_plugin_instances(
            d_workflow['id'], {'limit': 1000}
        )
        return d_workflowInst
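Taken together, a minimal sketch of driving Pipeline directly (the client construction and the override values are assumptions; the pipeline name is matched as a substring, exactly as in anonymize() above):

# Hypothetical standalone use of Pipeline against an existing CUBE client.
from chrisclient import client as chris_client
from pipeline import Pipeline

cl = chris_client.Client("http://localhost:8000/api/v1/", "chris", "chris1234")
pipe = Pipeline(cl)
d_result = pipe.workflow_schedule(
    42,                                               # parent plugin instance id
    "DICOM anonymization and Orthanc push 20241217",  # pipeline name substring
    {'dicom-anonymization': {'fileFilter': '.dcm'}},  # optional overrides
)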
2 changes: 1 addition & 1 deletion setup.py
@@ -25,7 +25,7 @@ def get_version(rel_path: str) -> str:
    author='FNNDSC',
    author_email='dev@babyMRI.org',
    url='https://github.com/FNNDSC/pl-reg_',
-   py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient'],
+   py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient','pipeline'],
    install_requires=['chris_plugin'],
    license='MIT',
    entry_points={
