diff --git a/src/starfile/functions.py b/src/starfile/functions.py
index 4a1be42..b563f8a 100644
--- a/src/starfile/functions.py
+++ b/src/starfile/functions.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Dict, List, Union
+from typing import TYPE_CHECKING, Dict, List, Union, Optional
 
 if TYPE_CHECKING:
     import pandas as pd
@@ -15,15 +15,32 @@
     from os import PathLike
 
 
-def read(filename: PathLike, read_n_blocks: int = None, always_dict: bool = False):
-    """
-    Read a star file into a pandas dataframe or dict of pandas dataframes
+def read(
+    filename: PathLike,
+    read_n_blocks: Optional[int] = None,
+    always_dict: bool = False,
+    parse_as_string: List[str] = []
+) -> Union[DataBlock, Dict[str, DataBlock]]:
+    """Read data from a STAR file.
 
-    default behaviour in the case of only one data block being present in the STAR file is to
-    return only a dataframe, this can be changed by setting 'always_dict=True'
-    """
+    Basic data blocks are read as dictionaries. Loop blocks are read as pandas
+    dataframes. When multiple data blocks are present, a dictionary of data blocks is
+    returned. When a single data block is present, only that block is returned by default.
+    To force returning a dictionary even when only one data block is present, set
+    `always_dict=True`.
 
-    parser = StarParser(filename, n_blocks_to_read=read_n_blocks)
+    Parameters
+    ----------
+    filename: PathLike
+        File from which to read data.
+    read_n_blocks: int | None
+        Limit reading the file to the first n data blocks.
+    always_dict: bool
+        Always return a dictionary, even when only a single data block is present.
+    parse_as_string: list[str]
+        A list of keys or column names which will not be coerced to numeric values.
+    """
+    parser = StarParser(filename, n_blocks_to_read=read_n_blocks, parse_as_string=parse_as_string)
     if len(parser.data_blocks) == 1 and always_dict is False:
         return list(parser.data_blocks.values())[0]
     else:
@@ -38,9 +55,24 @@ def write(
     data: DataBlock | Dict[str, DataBlock] | List[DataBlock],
     filename: PathLike,
     float_format: str = '%.6f',
     sep: str = ' ',
     na_rep: str = '',
     quote_character: str = '"',
     quote_all_strings: bool = False,
-    **kwargs,
+    **kwargs
 ):
-    """Write data blocks as STAR files."""
+    """Write data to disk in the STAR format.
+
+    Parameters
+    ----------
+    data: DataBlock | Dict[str, DataBlock] | List[DataBlock]
+        Data to be saved to file. DataBlocks are dictionaries or dataframes.
+        If a dictionary of data blocks is passed, the keys are used as the data block names.
+    filename: PathLike
+        Path where the file will be saved.
+    float_format: str
+        Float format string which will be passed to pandas.
+    sep: str
+        Separator between values, will be passed to pandas.
+    na_rep: str
+        Representation of null values, will be passed to pandas.
+ """ StarWriter( data, filename=filename, diff --git a/src/starfile/parser.py b/src/starfile/parser.py index 4de697d..c21a8c2 100644 --- a/src/starfile/parser.py +++ b/src/starfile/parser.py @@ -8,18 +8,13 @@ import numpy as np import pandas as pd from pathlib import Path -from typing import TYPE_CHECKING, Union, Optional, Dict, Tuple +from typing import TYPE_CHECKING, Union, Optional, Dict, Tuple, List from starfile.typing import DataBlock if TYPE_CHECKING: from os import PathLike -def _apply_numeric(col: pd.Series) -> pd.Series: - try: - return pd.to_numeric(col) - except ValueError: - return col class StarParser: filename: Path @@ -27,8 +22,14 @@ class StarParser: n_blocks_to_read: int current_line_number: int data_blocks: Dict[DataBlock] - - def __init__(self, filename: PathLike, n_blocks_to_read: Optional[int] = None): + parse_as_string: List[str] + + def __init__( + self, + filename: PathLike, + n_blocks_to_read: Optional[int] = None, + parse_as_string: List[str] = [], + ): # set filename, with path checking filename = Path(filename) if not filename.exists(): @@ -39,6 +40,7 @@ def __init__(self, filename: PathLike, n_blocks_to_read: Optional[int] = None): self.data_blocks = {} self.n_lines_in_file = count_lines(self.filename) self.n_blocks_to_read = n_blocks_to_read + self.parse_as_string = parse_as_string # parse file self.current_line_number = 0 @@ -78,7 +80,15 @@ def _parse_simple_block(self) -> Dict[str, Union[str, int, float]]: break elif self.current_line.startswith('_'): # '_foo bar' k, v = shlex.split(self.current_line) - block[k[1:]] = numericise(v) + column_name = k[1:] + parse_column_as_string = ( + self.parse_as_string is not None + and any(column_name == col for col in self.parse_as_string) + ) + if parse_column_as_string is True: + block[column_name] = v + else: + block[column_name] = numericise(v) self.current_line_number += 1 return block @@ -108,18 +118,27 @@ def _parse_loop_block(self) -> pd.DataFrame: n_cols = len(loop_column_names) df = pd.DataFrame(np.zeros(shape=(0, n_cols))) else: + column_name_to_index = {col: idx for idx, col in enumerate(loop_column_names)} df = pd.read_csv( StringIO(loop_data.replace("'", '"')), delimiter=r'\s+', header=None, comment='#', - keep_default_na=False + dtype={column_name_to_index[k]: str for k in self.parse_as_string if k in loop_column_names}, + keep_default_na=False, + engine='c', ) + df.columns = loop_column_names + + # Numericise all columns in temporary copy df_numeric = df.apply(_apply_numeric) - # Replace columns that are all NaN with the original string columns + + # Replace columns that are all NaN with the original columns df_numeric[df_numeric.columns[df_numeric.isna().all()]] = df[df_numeric.columns[df_numeric.isna().all()]] - df = df_numeric - df.columns = loop_column_names + + # Replace columns that should be strings + for col in df.columns: + df[col] = df_numeric[col] if col not in self.parse_as_string else df[col] return df @@ -150,3 +169,10 @@ def numericise(value: str) -> Union[str, int, float]: # If it's not a float either, leave it as a string value = value return value + + +def _apply_numeric(col: pd.Series) -> pd.Series: + try: + return pd.to_numeric(col) + except ValueError: + return col diff --git a/tests/test_parsing.py b/tests/test_parsing.py index 4df542f..a94bf9e 100644 --- a/tests/test_parsing.py +++ b/tests/test_parsing.py @@ -243,11 +243,10 @@ def test_empty_loop_block(): assert len(parser.data_blocks) == 1 - -@pytest.mark.parametrize("quote_character, filename", [("'",basic_single_quote), - 
('"',basic_double_quote), - ]) -def test_quote_basic(quote_character,filename): +@pytest.mark.parametrize("quote_character, filename", [("'", basic_single_quote), + ('"', basic_double_quote), + ]) +def test_quote_basic(quote_character, filename): parser = StarParser(filename) assert len(parser.data_blocks) == 1 assert parser.data_blocks['']['no_quote_string'] == "noquote" @@ -255,22 +254,36 @@ def test_quote_basic(quote_character,filename): assert parser.data_blocks['']['whitespace_string'] == " " assert parser.data_blocks['']['empty_string'] == "" -@pytest.mark.parametrize("quote_character, filename", [("'",loop_single_quote), - ('"',loop_double_quote), - ]) -def test_quote_loop(quote_character,filename): + +@pytest.mark.parametrize("quote_character, filename", [("'", loop_single_quote), + ('"', loop_double_quote), + ]) +def test_quote_loop(quote_character, filename): import math parser = StarParser(filename) assert len(parser.data_blocks) == 1 - assert parser.data_blocks[''].loc[0,'no_quote_string'] == "noquote" - assert parser.data_blocks[''].loc[0,'quote_string'] == "quote string" - assert parser.data_blocks[''].loc[0,'whitespace_string'] == " " - assert parser.data_blocks[''].loc[0,'empty_string'] == "" + assert parser.data_blocks[''].loc[0, 'no_quote_string'] == "noquote" + assert parser.data_blocks[''].loc[0, 'quote_string'] == "quote string" + assert parser.data_blocks[''].loc[0, 'whitespace_string'] == " " + assert parser.data_blocks[''].loc[0, 'empty_string'] == "" assert parser.data_blocks[''].dtypes['number_and_string'] == object assert parser.data_blocks[''].dtypes['number_and_empty'] == 'float64' assert parser.data_blocks[''].dtypes['number'] == 'float64' assert parser.data_blocks[''].dtypes['empty_string_and_normal_string'] == object - assert math.isnan(parser.data_blocks[''].loc[1,'number_and_empty']) - assert parser.data_blocks[''].loc[0,'empty_string_and_normal_string'] == '' + assert math.isnan(parser.data_blocks[''].loc[1, 'number_and_empty']) + assert parser.data_blocks[''].loc[0, 'empty_string_and_normal_string'] == '' + + +def test_parse_as_string(): + parser = StarParser(postprocess, parse_as_string=['rlnFinalResolution', 'rlnResolution']) + + # check 'rlnFinalResolution' is parsed as string in general (basic) block + block = parser.data_blocks['general'] + assert type(block['rlnFinalResolution']) == str + + # check 'rlnResolution' is parsed as string in fsc (loop) block + df = parser.data_blocks['fsc'] + assert df['rlnResolution'].dtype == 'object' +