From 9372d3cf49ef49ca12c9cede45271d2ae42d932f Mon Sep 17 00:00:00 2001 From: Giulia Baldini <44327645+giuliabaldini@users.noreply.github.com> Date: Wed, 15 Feb 2023 22:03:00 +0100 Subject: [PATCH] 94 efficiency problem with merge on (#113) * Remove the merge on parameter and return one dataframe per resource * Filter out none values directly from the returned records * Modify bundle_to_dataframe to take the union of all processed bundles per resource * Adjust tests * Different outputs for query_to_dataframe * Remove always return dict overwrite, that may kill everything * Update pyproject.toml --- fhir_pyrate/pirate.py | 209 +++++++----------- .../util/bundle_processing_templates.py | 18 +- pyproject.toml | 2 +- tests/test_public.py | 62 +++++- 4 files changed, 148 insertions(+), 143 deletions(-) diff --git a/fhir_pyrate/pirate.py b/fhir_pyrate/pirate.py index 71f66b5..5ef03c0 100644 --- a/fhir_pyrate/pirate.py +++ b/fhir_pyrate/pirate.py @@ -231,9 +231,8 @@ def steal_bundles_to_dataframe( num_pages: int = -1, process_function: Callable[[FHIRObj], Any] = flatten_data, fhir_paths: List[Union[str, Tuple[str, str]]] = None, - merge_on: str = None, build_df_after_query: bool = False, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Executes a request, iterates through the result pages, and builds a DataFrame with their information. The DataFrames are either built after each @@ -250,12 +249,11 @@ def steal_bundles_to_dataframe( DataFrame, alternatively, a list of tuples can be used to specify the column name of the future column with (column_name, fhir_path). Please refer to the `bundles_to_dataframe` functions for notes on how to use the FHIR paths - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one, an example is given in `bundles_to_dataframe` :param build_df_after_query: Whether the DataFrame should be built after all bundles have been collected, or whether the bundles should be transformed just after retrieving - :return: A DataFrame containing the queried information + :return: A DataFrame per queried resource. In case only once resource is queried, then only + one dictionary is given back, otherwise a dictionary of (resourceType, DataFrame) is + returned. """ return self._query_to_dataframe(self._get_bundles)( resource_type=resource_type, @@ -264,9 +262,9 @@ def steal_bundles_to_dataframe( silence_tqdm=False, process_function=process_function, fhir_paths=fhir_paths, - merge_on=merge_on, build_df_after_query=build_df_after_query, disable_multiprocessing_build=True, + always_return_dict=False, ) def sail_through_search_space( @@ -309,9 +307,8 @@ def sail_through_search_space_to_dataframe( request_params: Dict[str, Any] = None, process_function: Callable[[FHIRObj], Any] = flatten_data, fhir_paths: List[Union[str, Tuple[str, str]]] = None, - merge_on: str = None, build_df_after_query: bool = False, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Uses the multiprocessing module to speed up some queries. The time frame is divided into multiple time spans (as many as there are processes) and each smaller @@ -332,12 +329,11 @@ def sail_through_search_space_to_dataframe( DataFrame, alternatively, a list of tuples can be used to specify the column name of the future column with (column_name, fhir_path). Please refer to the `bundles_to_dataframe` functions for notes on how to use the FHIR paths - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one, an example is given in `bundles_to_dataframe` :param build_df_after_query: Whether the DataFrame should be built after all bundles have been collected, or whether the bundles should be transformed just after retrieving - :return: A DataFrame containing FHIR bundles with the queried information for all timespans + :return: A DataFrame per queried resource for all timestamps. In case only once resource + is queried, then only one dictionary is given back, otherwise a dictionary of + (resourceType, DataFrame) is returned. """ return self._query_to_dataframe(self._sail_through_search_space)( resource_type=resource_type, @@ -347,8 +343,8 @@ def sail_through_search_space_to_dataframe( date_end=date_end, process_function=process_function, fhir_paths=fhir_paths, - merge_on=merge_on, build_df_after_query=build_df_after_query, + always_return_dict=False, ) def trade_rows_for_bundles( @@ -405,9 +401,8 @@ def trade_rows_for_dataframe( num_pages: int = -1, with_ref: bool = True, with_columns: List[Union[str, Tuple[str, str]]] = None, - merge_on: str = None, build_df_after_query: bool = False, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Go through the rows of a DataFrame (with multiprocessing), run a query, retrieve bundles for each row and transform them into a DataFrame. @@ -441,9 +436,6 @@ def trade_rows_for_dataframe( added to output DataFrame. The columns from the source DataFrame can be either specified as a list of columns `[col1, col2, ...]` or as a list of tuples `[(new_name_for_col1, col1), (new_name_for_col2, col2), ...]` - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one, an example is given in `bundles_to_dataframe` :param process_function: The transformation function going through the entries and storing the entries to save :param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the @@ -452,8 +444,9 @@ def trade_rows_for_dataframe( functions for notes on how to use the FHIR paths :param build_df_after_query: Whether the DataFrame should be built after all bundles have been collected, or whether the bundles should be transformed just after retrieving - :return: A DataFrame containing FHIR bundles with the queried information for all rows - and if requested some columns containing the original constraints + :return: A DataFrame per queried resource which contains information about all rows. + In case only once resource is queried, then only one dictionary is given back, otherwise + a dictionary of (resourceType, DataFrame) is returned. """ with logging_redirect_tqdm(): if fhir_paths is not None: @@ -473,9 +466,9 @@ def trade_rows_for_dataframe( tqdm_df_build=not build_df_after_query, ), process_function=process_function, - merge_on=merge_on, build_df_after_query=build_df_after_query, disable_multiprocessing=self.disable_multiprocessing_build, + always_return_dict=False, ) if self.caching and self.num_processes > 1: logger.info( @@ -545,11 +538,8 @@ def trade_rows_for_dataframe( } for req_sample in req_params_per_sample ] - found_dfs = [] + final_dfs: Dict[str, List[pd.DataFrame]] = {} tqdm_text = f"Query & Build DF ({resource_type})" - # TODO: Can the merge_on be run for each smaller DataFrame? - # is there the possibility to have resources referring to the same thing in - # different bundles? if ( self.disable_multiprocessing_requests or self.disable_multiprocessing_build @@ -561,18 +551,21 @@ def trade_rows_for_dataframe( desc=tqdm_text, ): # Get the dataframe - found_df = self._query_to_dataframe(self._get_bundles)( + found_dfs = self._query_to_dataframe(self._get_bundles)( process_function=process_function, build_df_after_query=False, disable_multiprocessing_build=True, + always_return_dict=True, **param, ) - self._copy_existing_columns( - df=found_df, - input_params=input_param, - key_mapping=with_columns_rename, - ) - found_dfs.append(found_df) + for resource_type, found_df in found_dfs.items(): + final_dfs.setdefault(resource_type, []) + self._copy_existing_columns( + df=found_df, + input_params=input_param, + key_mapping=with_columns_rename, + ) + final_dfs[resource_type].append(found_df) else: pool = multiprocessing.Pool(self.num_processes) results = [] @@ -591,6 +584,7 @@ def trade_rows_for_dataframe( process_function=process_function, build_df_after_query=False, disable_multiprocessing=True, + always_return_dict=True, ), ), input_param, @@ -598,29 +592,29 @@ def trade_rows_for_dataframe( ) for async_result, input_param in results: # Get the results and build the dataframes - found_df = async_result.get() - self._copy_existing_columns( - df=found_df, - input_params=input_param, - key_mapping=with_columns_rename, - ) - found_dfs.append(found_df) + found_dfs = async_result.get() + for resource_type, found_df in found_dfs.items(): + final_dfs.setdefault(resource_type, []) + self._copy_existing_columns( + df=found_df, + input_params=input_param, + key_mapping=with_columns_rename, + ) + final_dfs[resource_type].append(found_df) pool.close() pool.join() - df = pd.concat(found_dfs, ignore_index=True) - return ( - df - if merge_on is None or len(df) == 0 - else self.merge_on_col(df, merge_on) - ) + dfs = { + resource_type: pd.concat(final_dfs[resource_type], ignore_index=True) + for resource_type in final_dfs + } + return list(dfs.values())[0] if len(dfs) == 1 else dfs def bundles_to_dataframe( self, bundles: Union[List[FHIRObj], Generator[FHIRObj, None, int]], process_function: Callable[[FHIRObj], Any] = flatten_data, fhir_paths: List[Union[str, Tuple[str, str]]] = None, - merge_on: str = None, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Convert a bundle into a DataFrame using either the `flatten_data` function (default), FHIR paths or a custom processing function. For the case of `flatten_data` and the FHIR @@ -633,10 +627,9 @@ def bundles_to_dataframe( :param fhir_paths: A list of FHIR paths (https://hl7.org/fhirpath/) to be used to build the DataFrame, alternatively, a list of tuples can be used to specify the column name of the future column with (column_name, fhir_path). - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one, example below - :return: A pandas DataFrame containing the queried information + :return: A DataFrame per queried resource. In case only once resource is queried, then only + one dictionary is given back, otherwise a dictionary of (resourceType, DataFrame) is + returned. **NOTE 1 on FHIR paths**: The standard also allows some primitive math operations such as modulus (`mod`) or integer division (`div`), and this may be problematic if there are fields of the resource that use these terms as attributes. @@ -672,31 +665,6 @@ def bundles_to_dataframe( # ], num_pages=1, ) - ``` - - The following example will initially return one row for each entry, but using - `group_row="patient_id"` we choose a column to run the merge on. This will merge the - columns that contain values that for the others are empty, having then one row representing - one patient. - ``` - df = search.trade_rows_for_dataframe( - resource_type="Patient", - request_params={ - "_sort": "_id", - "_count": 10, - "birthdate": "ge1990", - "_revinclude": "Condition:subject", - }, - fhir_paths=[ - ("patient_id", "Patient.id"), - ("patient_id", "Condition.subject.reference.replace('Patient/', '')"), - "Patient.gender", - "Condition.code.coding.code", - ], - num_pages=1, - merge_on="patient_id" - ) - ``` """ with logging_redirect_tqdm(): if fhir_paths is not None: @@ -708,34 +676,11 @@ def bundles_to_dataframe( return self._bundles_to_dataframe( bundles=bundles, process_function=process_function, - merge_on=merge_on, build_df_after_query=True, disable_multiprocessing=self.disable_multiprocessing_build, + always_return_dict=False, ) - @staticmethod - def merge_on_col(df: pd.DataFrame, merge_on: str) -> pd.DataFrame: - """ - Merges rows from different resources on a given attribute. - :param df: The DataFrame where the merge should be applied - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one - :return: A DataFrame where the rows having the same `merge_on` attribute are merged. - """ - # TODO: Could probably be done more efficiently? - new_df = df[merge_on] - for col in df.columns: - if col == merge_on: - continue - new_df = pd.merge( - left=new_df, - right=df.loc[~df[col].isna(), [merge_on, col]], - how="outer", - ) - new_df = new_df.loc[new_df.astype(str).drop_duplicates().index] - return new_df.reset_index(drop=True) - @staticmethod def smash_rows( df: pd.DataFrame, @@ -1067,14 +1012,12 @@ def _get_bundles( ) bundle_total: Union[int, float] = num_pages total = self._get_total_from_bundle(bundle, count_entries=False) - if bundle_total == -1: n_entries = self._get_total_from_bundle(bundle, count_entries=True) if total and n_entries: bundle_total = math.ceil(total / n_entries) else: bundle_total = math.inf - if ( "history" not in (request_params or {}) and "_id" not in (request_params or {}) @@ -1381,10 +1324,10 @@ def _bundles_to_dataframe( self, bundles: Union[List[FHIRObj], Generator[FHIRObj, None, int]], process_function: Callable[[FHIRObj], Any] = flatten_data, - merge_on: str = None, build_df_after_query: bool = False, disable_multiprocessing: bool = False, - ) -> pd.DataFrame: + always_return_dict: bool = False, + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Convert a bundle into a DataFrame using either the `flatten_data` function (default), FHIR paths or a custom processing function. For the case of `flatten_data` and the FHIR @@ -1394,42 +1337,52 @@ def _bundles_to_dataframe( :param bundles: The bundles to transform :param process_function: The transformation function going through the entries and storing the entries to save - :param merge_on: Whether to merge the results on a certain row after computing. This is - useful when using includes, if you store the IDs on the same column you can use that column - to merge all the rows into one, an example is given in `bundles_to_dataframe` :param build_df_after_query: Whether the DataFrame should be built after all bundles have been collected, or whether the bundles should be transformed just after retrieving :param disable_multiprocessing: Whether the bundles should be processed sequentially - :return: A pandas DataFrame containing the queried information + :return: A DataFrame per queried resource. In case only once resource is queried, then only + one dictionary is given back, otherwise a dictionary of (resourceType, DataFrame) is + returned. """ if disable_multiprocessing: - results = [item for bundle in bundles for item in process_function(bundle)] + processed_bundles = [process_function(bundle) for bundle in bundles] else: # TODO: It could be that this never makes sense pool = multiprocessing.Pool(self.num_processes) if build_df_after_query or isinstance(bundles, List): bundles = list(bundles) - results = [ - item - for sublist in tqdm( + processed_bundles = [ + bundle_output + for bundle_output in tqdm( pool.imap(process_function, bundles), total=len(bundles), desc="Build DF", ) - for item in sublist ] else: - results = [ - item - for sublist in pool.imap(process_function, bundles) - for item in sublist + processed_bundles = [ + bundle_output + for bundle_output in pool.imap(process_function, bundles) ] pool.close() pool.join() - df = pd.DataFrame(results) - return ( - df if merge_on is None or len(df) == 0 else self.merge_on_col(df, merge_on) - ) + results: Dict[str, List[Dict[str, Any]]] = {} + for bundle_output in processed_bundles: + if isinstance(bundle_output, List): + bundle_output = {"SingleResource": bundle_output} + for resource_type, records in bundle_output.items(): + results.setdefault(resource_type, []) + results[resource_type] += records + dfs = { + resource_type: pd.DataFrame(results[resource_type]).dropna( + axis=1, how="all" + ) + for resource_type in results + } + if always_return_dict: + return dfs + else: + return list(dfs.values())[0] if len(dfs) == 1 else dfs def _query_to_dataframe( self, @@ -1447,12 +1400,12 @@ def _query_to_dataframe( def wrap( process_function: Callable[[FHIRObj], Any] = flatten_data, fhir_paths: List[Union[str, Tuple[str, str]]] = None, - merge_on: str = None, build_df_after_query: bool = False, disable_multiprocessing_build: bool = False, + always_return_dict: bool = False, *args: Any, **kwargs: Any, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: with logging_redirect_tqdm(): if fhir_paths is not None: logger.info( @@ -1465,9 +1418,9 @@ def wrap( *args, **kwargs, tqdm_df_build=not build_df_after_query ), process_function=process_function, - merge_on=merge_on, build_df_after_query=build_df_after_query, disable_multiprocessing=disable_multiprocessing_build, + always_return_dict=always_return_dict, ) return wrap @@ -1478,9 +1431,8 @@ def query_to_dataframe( process_function: Callable[[FHIRObj], Any] = flatten_data, fhir_paths: List[Union[str, Tuple[str, str]]] = None, build_df_after_query: bool = False, - merge_on: str = None, **kwargs: Any, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """ Wrapper function that given any of the functions that return bundles, builds the DataFrame straight away. @@ -1530,7 +1482,6 @@ def query_to_dataframe( **kwargs, process_function=process_function, fhir_paths=fhir_paths, - merge_on=merge_on, build_df_after_query=build_df_after_query, ) elif bundles_function == self.sail_through_search_space: @@ -1538,7 +1489,6 @@ def query_to_dataframe( **kwargs, process_function=process_function, fhir_paths=fhir_paths, - merge_on=merge_on, build_df_after_query=build_df_after_query, ) elif bundles_function == self.trade_rows_for_bundles: @@ -1547,7 +1497,6 @@ def query_to_dataframe( process_function=process_function, fhir_paths=fhir_paths, with_ref=False, - merge_on=merge_on, build_df_after_query=build_df_after_query, ) else: diff --git a/fhir_pyrate/util/bundle_processing_templates.py b/fhir_pyrate/util/bundle_processing_templates.py index ab953b7..1611f08 100644 --- a/fhir_pyrate/util/bundle_processing_templates.py +++ b/fhir_pyrate/util/bundle_processing_templates.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -def flatten_data(bundle: FHIRObj, col_sep: str = "_") -> List[Dict]: +def flatten_data(bundle: FHIRObj, col_sep: str = "_") -> Dict[str, List[Dict]]: """ Preprocessing function that goes through the JSON bundle and returns lists of dictionaries for all possible attributes @@ -16,14 +16,17 @@ def flatten_data(bundle: FHIRObj, col_sep: str = "_") -> List[Dict]: :param col_sep: The separator to use to generate the column names for the DataFrame :return: A dictionary containing the parsed information """ - records = [] + records: Dict[str, List[Dict[str, Any]]] = {} for entry in bundle.entry or []: resource = entry.resource + records.setdefault(resource.resourceType, []) base_dict: Dict[str, Any] = {} recurse_resource( resource=resource, base_dict=base_dict, field_name="", col_sep=col_sep ) - records.append(base_dict) + records[resource.resourceType].append( + {k: v for k, v in base_dict.items() if v is not None} + ) return records @@ -66,7 +69,7 @@ def recurse_resource( def parse_fhir_path( bundle: FHIRObj, compiled_fhir_paths: List[Tuple[str, Callable]] -) -> List[Dict]: +) -> Dict[str, List[Dict]]: """ Preprocessing function that goes through the JSON bundle and returns lists of dictionaries for all possible attributes, which have been specified using a list of compiled FHIRPath @@ -80,9 +83,10 @@ def parse_fhir_path( functions for notes on how to use the FHIR paths. :return: A dictionary containing the parsed information """ - records = [] + records: Dict[str, List[Dict[str, Any]]] = {} for entry in bundle.entry or []: resource = entry.resource + records.setdefault(resource.resourceType, []) base_dict: Dict[str, Any] = {} for name, compiled_path in compiled_fhir_paths: result = compiled_path(resource=resource.to_dict()) @@ -98,5 +102,7 @@ def parse_fhir_path( base_dict[name] = None elif len(base_dict[name]) == 1: base_dict[name] = next(iter(base_dict[name])) - records.append(base_dict) + records[resource.resourceType].append( + {k: v for k, v in base_dict.items() if v is not None} + ) return records diff --git a/pyproject.toml b/pyproject.toml index 1e9cbf7..cf9b954 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fhir-pyrate" -version = "0.2.0-beta.7" +version = "0.2.0-beta.8" description = "FHIR-PYrate is a package that provides a high-level API to query FHIR Servers for bundles of resources and return the structured information as pandas DataFrames. It can also be used to filter resources using RegEx and SpaCy and download DICOM studies and series." license = "MIT" authors = ["Rene Hosch ", "Giulia Baldini "] diff --git a/tests/test_public.py b/tests/test_public.py index 2fc1fd4..c25961e 100644 --- a/tests/test_public.py +++ b/tests/test_public.py @@ -51,8 +51,24 @@ def decode_text(text: str) -> str: return str(div.text) -def get_diagnostic_text(bundle: FHIRObj) -> List[Dict]: - records = [] +def get_diagnostic_text(bundle: FHIRObj) -> Dict[str, List[Dict]]: + records: Dict[str, List[Dict]] = {"DiagnosticReport": []} + for entry in bundle.entry or []: + resource = entry.resource + records["DiagnosticReport"].append( + { + "fhir_diagnostic_report_id": resource.id, + "report_status": resource.text.status + if resource.text is not None + else None, + "report_text": resource.text.div if resource.text is not None else None, + } + ) + return records + + +def get_diagnostic_text_list(bundle: FHIRObj) -> List[Dict]: + records: List[Dict] = [] for entry in bundle.entry or []: resource = entry.resource records.append( @@ -67,8 +83,8 @@ def get_diagnostic_text(bundle: FHIRObj) -> List[Dict]: return records -def get_observation_info(bundle: FHIRObj) -> List[Dict]: - records = [] +def get_observation_info(bundle: FHIRObj) -> Dict[str, List[Dict]]: + records: Dict[str, List[Dict]] = {"Observation": []} for entry in bundle.entry or []: resource = entry.resource # Store the ID @@ -83,7 +99,7 @@ def get_observation_info(bundle: FHIRObj) -> List[Dict]: # If the component is a valueQuantity, get the value base_dict[resource_name] = component.valueQuantity.value base_dict[resource_name + " Unit"] = component.valueQuantity.unit - records.append(base_dict) + records["Observation"].append(base_dict) return records @@ -178,6 +194,7 @@ def testServers(self) -> None: ("patient", f"{patient_ref}.reference"), ], ) + assert isinstance(condition_df, pd.DataFrame) condition_df.dropna(axis=0, inplace=True, how="any") assert len(condition_df) > 0 diagnostic_df = search.trade_rows_for_dataframe( @@ -191,6 +208,7 @@ def testServers(self) -> None: }, process_function=get_diagnostic_text, ) + assert isinstance(diagnostic_df, pd.DataFrame) if len(diagnostic_df) > 0: diagnostic_df.dropna( subset=["report_text"], @@ -242,6 +260,7 @@ def testExample1(self) -> None: ("patient", "subject.reference.replace('Patient/', ''"), ], ) + assert isinstance(observation_values, pd.DataFrame) assert len(observation_values) == 1 assert ( observation_values.iloc[0, 2] == 6.079781499882176 @@ -281,7 +300,9 @@ def testExample3(self) -> None: ("value", "component.valueQuantity.value"), ("unit", "component.valueQuantity.unit"), ], - ).explode( + ) + assert isinstance(observation_df, pd.DataFrame) + observation_df = observation_df.explode( [ "test", "value", @@ -336,6 +357,26 @@ def testExample4(self) -> None: ) assert sum(df_filtered["text_found"]) == 33 + def testExample4WithList(self) -> None: + diagnostic_df = self.search.steal_bundles_to_dataframe( + resource_type="DiagnosticReport", + request_params={ + "_count": 100, + "_lastUpdated": "ge2021", + }, + process_function=get_diagnostic_text_list, # Use processing function + ) + assert len(diagnostic_df) > 47 + miner = Miner( + target_regex="Metabolic", decode_text=decode_text, num_processes=1 + ) + df_filtered = miner.nlp_on_dataframe( + diagnostic_df, + text_column_name="report_text", + new_column_name="text_found", + ) + assert sum(df_filtered["text_found"]) == 33 + class TestPirate(unittest.TestCase): def testCaching(self) -> None: @@ -360,10 +401,12 @@ def testCaching(self) -> None: build_df_after_query=False, ) obs_df_1 = search.trade_rows_for_dataframe(**params) + assert isinstance(obs_df_1, pd.DataFrame) time_1 = time.time() - init print("First run, caching", time_1, obs_df_1.shape) init = time.time() obs_df_2 = search.trade_rows_for_dataframe(**params) + assert isinstance(obs_df_2, pd.DataFrame) time_2 = time.time() - init print("Second run, retrieve", time_2, obs_df_2.shape) assert time_2 < time_1 @@ -407,6 +450,7 @@ def testStealBundles(self) -> None: num_pages=5, build_df_after_query=build_after_query, ) + assert isinstance(obs_df1, pd.DataFrame) assert obs_df1.equals(obs_df2) search.close() @@ -456,11 +500,13 @@ def testSail(self) -> None: date_end="2022-01-01", build_df_after_query=build_after_query, ) + assert isinstance(obs_df1, pd.DataFrame) sorted_obs1 = ( obs_df1.sort_index(axis=1) .sort_values(by="id") .reset_index(drop=True) ) + assert isinstance(obs_df2, pd.DataFrame) sorted_obs2 = ( obs_df2.sort_index(axis=1) .sort_values(by="id") @@ -528,11 +574,13 @@ def testTrade(self) -> None: request_params={"_lastUpdated": "ge2020"}, build_df_after_query=build_after_query, ) + assert isinstance(obs_df1, pd.DataFrame) sorted_obs1 = ( obs_df1.sort_index(axis=1) .sort_values(by="id") .reset_index(drop=True) ) + assert isinstance(obs_df2, pd.DataFrame) sorted_obs2 = ( obs_df2.sort_index(axis=1) .sort_values(by="id") @@ -572,6 +620,7 @@ def tearDown(self) -> None: self.search.close() def testRefDiagnosticSubject(self) -> None: + assert isinstance(self.condition_df, pd.DataFrame) condition_df_pat = self.condition_df.loc[ ~self.condition_df["patient_id"].isna() ] @@ -586,6 +635,7 @@ def testRefDiagnosticSubject(self) -> None: assert len(diagnostic_df) > 0 def testRefPatientSubject(self) -> None: + assert isinstance(self.condition_df, pd.DataFrame) condition_df_pat = self.condition_df.loc[ ~self.condition_df["patient_id"].isna() ]