From acdee53d5d620a7a38c160c22e887d1b9ef248aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Zo=C3=AB=20Bilodeau?= <70441641+zbilodea@users.noreply.github.com>
Date: Fri, 19 Jul 2024 18:35:59 +0200
Subject: [PATCH] feat: merge_parquet (#97)

* Made parquet merge, works but needs max_files_open?

* added to test

* Fixed for nested arrays, added test

* globbing

---
 src/hepconvert/merge.py                     | 223 ++++++++++++++++++++
 tests/test_merge_parquet.py                 | 112 ++++++++++
 tests/{test_merge.py => test_merge_root.py} |   0
 3 files changed, 335 insertions(+)
 create mode 100644 tests/test_merge_parquet.py
 rename tests/{test_merge.py => test_merge_root.py} (100%)

diff --git a/src/hepconvert/merge.py b/src/hepconvert/merge.py
index 7613767..eb95164 100644
--- a/src/hepconvert/merge.py
+++ b/src/hepconvert/merge.py
@@ -14,6 +14,229 @@ from hepconvert.histogram_adding import _hadd_1d, _hadd_2d, _hadd_3d
+def merge_parquet(
+    out_file,
+    in_files,
+    *,
+    # max_files=2,
+    force=False,
+    list_to32=False,
+    string_to32=True,
+    bytestring_to32=True,
+    emptyarray_to=None,
+    categorical_as_dictionary=False,
+    extensionarray=True,
+    count_nulls=True,
+    compression="zstd",
+    compression_level=None,
+    row_group_size=64 * 1024 * 1024,
+    data_page_size=None,
+    parquet_flavor=None,
+    parquet_version="2.4",
+    parquet_page_version="1.0",
+    parquet_metadata_statistics=True,
+    parquet_dictionary_encoding=False,
+    parquet_byte_stream_split=False,
+    parquet_coerce_timestamps=None,
+    parquet_old_int96_timestamps=None,
+    parquet_compliant_nested=False,
+    parquet_extra_options=None,
+    storage_options=None,
+    skip_bad_files=False,
+):
+    """Merges Parquet files together.
+
+    Args:
+    :param out_file: Name of the output file or file path.
+    :type out_file: path-like
+    :param in_files: List of local Parquet files to merge.
+        May contain glob patterns.
+    :type in_files: str or list of str
+    :param force: If True, overwrite the output file if it already exists.
+    :type force: bool
+    :param list_to32: If True, convert Awkward lists into 32-bit Arrow lists
+        if they're small enough, even if it means an extra conversion.
+        Otherwise, signed 32-bit ak.types.ListType maps to Arrow ListType,
+        signed 64-bit ak.types.ListType maps to Arrow LargeListType, and
+        unsigned 32-bit ak.types.ListType picks whichever Arrow type its
+        values fit into. Command line option: ``--list-to32``.
+    :type list_to32: bool
+    :param string_to32: Same as the above for Arrow string and ``large_string``.
+        Command line option: ``--string-to32``.
+    :type string_to32: bool
+    :param bytestring_to32: Same as the above for Arrow binary and ``large_binary``.
+        Command line option: ``--bytestring-to32``.
+    :type bytestring_to32: bool
+    :param emptyarray_to: If None, #ak.types.UnknownType maps to Arrow's
+        null type; otherwise, it is converted to a given numeric dtype.
+        Command line option: ``--emptyarray-to``.
+    :type emptyarray_to: None or dtype
+    :param categorical_as_dictionary: If True, #ak.contents.IndexedArray and
+        #ak.contents.IndexedOptionArray labeled with ``__array__ = "categorical"``
+        are mapped to Arrow `DictionaryArray`; otherwise, the projection is
+        evaluated before conversion (always the case without
+        `__array__ = "categorical"`). Command line option: ``--categorical-as-dictionary``.
+    :type categorical_as_dictionary: bool
+    :param extensionarray: If True, this function returns extended Arrow arrays
+        (at all levels of nesting), which preserve metadata so that Awkward →
+        Arrow → Awkward preserves the array's #ak.types.Type (though not
+        the #ak.forms.Form).
+        If False, this function returns generic Arrow arrays
+        that might be needed for third-party tools that don't recognize Arrow's
+        extensions. Even with `extensionarray=False`, the values produced by
+        Arrow's `to_pylist` method are the same as the values produced by Awkward's
+        #ak.to_list. Command line option: ``--extensionarray``.
+    :type extensionarray: bool
+    :param count_nulls: If True, count the number of missing values at each level
+        and include these in the resulting Arrow array, which makes some downstream
+        applications faster. If False, skip the up-front cost of counting them.
+        Command line option: ``--count-nulls``.
+    :type count_nulls: bool
+    :param compression: Compression algorithm name, passed to
+        `pyarrow.parquet.ParquetWriter `__.
+        Parquet supports `{"NONE", "SNAPPY", "GZIP", "BROTLI", "LZ4", "ZSTD"}`
+        (where `"GZIP"` is also known as "zlib" or "deflate"). If a dict, the keys
+        are column names (the same column names that #ak.forms.Form.columns returns
+        and #ak.forms.Form.select_columns accepts) and the values are compression
+        algorithm names, to compress each column differently. Command line option: ``--compression``.
+    :type compression: None, str, or dict
+    :param compression_level: Compression level, passed to
+        `pyarrow.parquet.ParquetWriter `__.
+        Compression levels have different meanings for different compression
+        algorithms: GZIP ranges from 1 to 9, but ZSTD ranges from -7 to 22, for
+        example. Generally, higher numbers provide slower but smaller compression.
+        Command line option: ``--compression-level``.
+    :type compression_level: None, int, or dict
+    :param row_group_size: Maximum number of entries in each row group,
+        passed to `pyarrow.parquet.ParquetWriter.write_table `__.
+        If None, the Parquet default of 64 MiB is used.
+        Command line options: ``-rg`` or ``--row-group-size``.
+    :type row_group_size: int or None
+    :param data_page_size: Number of bytes in each data page, passed to
+        `pyarrow.parquet.ParquetWriter `__.
+        If None, the Parquet default of 1 MiB is used. Command line option: ``--data-page-size``.
+    :type data_page_size: None or int
+    :param parquet_flavor: If None, the output Parquet file will follow
+        Arrow conventions; if `"spark"`, it will follow Spark conventions. Some
+        systems, such as Spark and Google BigQuery, might need Spark conventions,
+        while others might need Arrow conventions. Passed to
+        `pyarrow.parquet.ParquetWriter `__
+        as `flavor`. Command line option: ``--parquet-flavor``.
+    :type parquet_flavor: None or `"spark"`
+    :param parquet_version: Parquet file format version.
+        Passed to `pyarrow.parquet.ParquetWriter `__
+        as `version`. Command line option: ``--parquet-version``.
+    :type parquet_version: `"1.0"`, `"2.4"`, or `"2.6"`
+    :param parquet_page_version: Parquet page format version.
+        Passed to `pyarrow.parquet.ParquetWriter `__
+        as `data_page_version`. Command line option: ``--parquet-page-version``.
+    :type parquet_page_version: `"1.0"` or `"2.0"`
+    :param parquet_metadata_statistics: If True, include summary
+        statistics for each data page in the Parquet metadata, which lets some
+        applications search for data more quickly (by skipping pages). If a dict
+        mapping column names to bool, include summary statistics on only the
+        specified columns. Passed to
+        `pyarrow.parquet.ParquetWriter `__
+        as `write_statistics`. Command line option: ``--parquet-metadata-statistics``.
+    :type parquet_metadata_statistics: bool or dict
+    :param parquet_dictionary_encoding: If True, allow Parquet to pre-compress
+        with dictionary encoding. If a dict mapping column names to bool, only
+        use dictionary encoding on the specified columns. Passed to
+        `pyarrow.parquet.ParquetWriter `__
+        as `use_dictionary`. Command line option: ``--parquet-dictionary-encoding``.
+    :type parquet_dictionary_encoding: bool or dict
+    :param parquet_byte_stream_split: If True, pre-compress floating
+        point fields (`float32` or `float64`) with byte stream splitting, which
+        collects all mantissas in one part of the stream and exponents in another.
+        Passed to `pyarrow.parquet.ParquetWriter `__
+        as `use_byte_stream_split`. Command line option: ``--parquet-byte-stream-split``.
+    :type parquet_byte_stream_split: bool or dict
+    :param parquet_coerce_timestamps: If None, any timestamps
+        (`datetime64` data) are coerced to a given resolution depending on
+        `parquet_version`: version `"1.0"` and `"2.4"` are coerced to microseconds,
+        but later versions use the `datetime64`'s own units. If `"ms"` is explicitly
+        specified, timestamps are coerced to milliseconds; if `"us"`, microseconds.
+        Passed to `pyarrow.parquet.ParquetWriter `__
+        as `coerce_timestamps`. Command line option: ``--parquet-coerce-timestamps``.
+    :type parquet_coerce_timestamps: None, `"ms"`, or `"us"`
+    :param parquet_old_int96_timestamps: If True, use Parquet's INT96 format
+        for any timestamps (`datetime64` data), taking priority over `parquet_coerce_timestamps`.
+        If None, let the `parquet_flavor` decide. Passed to
+        `pyarrow.parquet.ParquetWriter `__
+        as `use_deprecated_int96_timestamps`. Command line option: ``--parquet-old-int96-timestamps``.
+    :type parquet_old_int96_timestamps: None or bool
+    :param parquet_compliant_nested: If True, use the Spark/BigQuery/Parquet
+        `convention for nested lists `__,
+        in which each list is a one-field record with field name "`element`";
+        otherwise, use the Arrow convention, in which the field name is "`item`".
+        Passed to `pyarrow.parquet.ParquetWriter `__
+        as `use_compliant_nested_type`. Command line option: ``--parquet-compliant-nested``.
+    :type parquet_compliant_nested: bool
+    :param parquet_extra_options: Any additional options to pass to
+        `pyarrow.parquet.ParquetWriter `__.
+    :type parquet_extra_options: None or dict
+    :param storage_options: Any additional options to pass to
+        `fsspec.core.url_to_fs `__
+        to open a remote file for writing.
+    :type storage_options: None or dict
+    :param skip_bad_files: If True, skip input files that are missing or corrupt
+        instead of raising an error.
+    :type skip_bad_files: bool
+
+    Examples:
+    ---------
+    Merges two or more Parquet files into a single Parquet file.
+
+    >>> hepconvert.merge.merge_parquet("merged.parquet", ["file1.parquet", "file2.parquet"])
+
+    Command Line Instructions:
+    --------------------------
+    This function can be run from the command line. Use command
+
+    .. code-block:: bash
+
+    """
+    # A single (non-list) input is treated as a directory to search recursively for Parquet files.
+    if not isinstance(in_files, list) and not isinstance(in_files, tuple):
+        path = Path(in_files)
+        in_files = sorted(path.glob("**/*.parquet"))
+    if len(in_files) < 2:
+        msg = f"Must have at least 2 files to merge, not {len(in_files)} files."
+        raise AttributeError(msg)
+    path = Path(out_file)
+    if Path.is_file(path) and not force:
+        raise FileExistsError
+
+    # Read each file and concatenate, merging records whose fields differ between files.
+    data = False
+    for file in in_files:
+        try:
+            ak.metadata_from_parquet(file)
+        except FileNotFoundError:
+            if skip_bad_files:
+                continue
+            msg = f"File: {file} does not exist or is corrupt."
+            raise FileNotFoundError(msg) from None
+        if isinstance(data, bool):
+            data = ak.from_parquet(file)
+        else:
+            data = ak.merge_union_of_records(
+                ak.concatenate((data, ak.from_parquet(file))), axis=0
+            )
+
+    ak.to_parquet(
+        data,
+        out_file,
+        list_to32=list_to32,
+        string_to32=string_to32,
+        bytestring_to32=bytestring_to32,
+        emptyarray_to=emptyarray_to,
+        categorical_as_dictionary=categorical_as_dictionary,
+        extensionarray=extensionarray,
+        count_nulls=count_nulls,
+        compression=compression,
+        compression_level=compression_level,
+        row_group_size=row_group_size,
+        data_page_size=data_page_size,
+        parquet_flavor=parquet_flavor,
+        parquet_version=parquet_version,
+        parquet_page_version=parquet_page_version,
+        parquet_metadata_statistics=parquet_metadata_statistics,
+        parquet_dictionary_encoding=parquet_dictionary_encoding,
+        parquet_byte_stream_split=parquet_byte_stream_split,
+        parquet_coerce_timestamps=parquet_coerce_timestamps,
+        parquet_old_int96_timestamps=parquet_old_int96_timestamps,
+        parquet_compliant_nested=parquet_compliant_nested,
+        parquet_extra_options=parquet_extra_options,
+        storage_options=storage_options,
+    )
+
+
 def merge_root(
     destination,
     files,
diff --git a/tests/test_merge_parquet.py b/tests/test_merge_parquet.py
new file mode 100644
index 0000000..7e4a020
--- /dev/null
+++ b/tests/test_merge_parquet.py
@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+import awkward as ak
+import pytest
+
+from hepconvert import merge, root_to_parquet
+
+skhep_testdata = pytest.importorskip("skhep_testdata")
+
+
+def test_simple(tmp_path):
+    arr1 = ak.Array(
+        {
+            "a": [
+                1,
+                2,
+            ],
+            "b": [
+                1,
+                2,
+            ],
+            "c": [
+                1,
+                2,
+            ],
+        }
+    )
+    ak.to_parquet(arr1, Path(tmp_path / "arr1.parquet"))
+    arr2 = ak.Array(
+        {
+            "a": [7, 8, 9],
+            "b": [
+                3,
+                4,
+                5,
+            ],
+        }
+    )
+    ak.to_parquet(arr2, Path(tmp_path / "arr2.parquet"))
+    arr3 = ak.Array(
+        {
+            "a": [10, 11, 12, 13, 14],
+            "c": [3, 4, 5, 6, 7],
+            "d": [1, 2, 3, 4, 5],
+        }
+    )
+    ak.to_parquet(arr3, Path(tmp_path / "arr3.parquet"))
+
+    merge.merge_parquet(
+        Path(tmp_path / "new.parquet"),
+        [
+            Path(tmp_path / "arr1.parquet"),
+            Path(tmp_path / "arr2.parquet"),
+            Path(tmp_path / "arr3.parquet"),
+        ],
+        force=True,
+    )
+    array = ak.from_parquet(Path(tmp_path / "new.parquet"))
+    assert ak.all(array["a"] == [1, 2, 7, 8, 9, 10, 11, 12, 13, 14])
+    assert ak.all(
+        array["b"]
+        == [
+            1,
+            2,
+            3,
+            4,
+            5,
+            None,
+            None,
+            None,
+            None,
+            None,
+        ]
+    )
+    assert ak.all(array["c"] == [1, 2, None, None, None, 3, 4, 5, 6, 7])
+    assert ak.all(
+        array["d"]
+        == [
+            None,
+            None,
+            None,
+            None,
+            None,
+            1,
+            2,
+            3,
+            4,
+            5,
+        ]
+    )
+
+
+def test_HZZ(tmp_path):
+    # Assumes uproot-HZZ.parquet and merged_hzz.root are already present in tmp_path.
+    merge.merge_parquet(
+        Path(tmp_path / "merged_hzz.parquet"),
+        [
+            Path(tmp_path / "uproot-HZZ.parquet"),
+            Path(tmp_path / "uproot-HZZ.parquet"),
+        ],
+        force=True,
+    )
+    new_arrays = ak.from_parquet(Path(tmp_path / "merged_hzz.parquet"))
+    root_to_parquet(
+        Path(tmp_path / "merged_hzz.root"),
+        Path(tmp_path / "merged_hzz.parquet"),
+        force=True,
+    )
+    test = ak.from_parquet(Path(tmp_path / "merged_hzz.parquet"))
+    for key in new_arrays.fields:
+        assert ak.all(new_arrays[key] == test[key])
diff --git a/tests/test_merge.py b/tests/test_merge_root.py
similarity index 100%
rename from tests/test_merge.py
rename to tests/test_merge_root.py
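
For reference (outside the patch itself), here is a minimal usage sketch of the new function, mirroring the calls exercised in tests/test_merge_parquet.py. The file names ``run1.parquet``, ``run2.parquet``, and ``merged.parquet`` are hypothetical placeholders, and the sketch assumes a hepconvert installation that includes this patch.

.. code-block:: python

    from pathlib import Path

    import awkward as ak

    from hepconvert import merge

    # Hypothetical local Parquet files with compatible (or partially overlapping) fields.
    in_files = [Path("run1.parquet"), Path("run2.parquet")]

    # force=True overwrites an existing output file;
    # skip_bad_files=True skips inputs that are missing or corrupt instead of raising.
    merge.merge_parquet(
        "merged.parquet",
        in_files,
        force=True,
        skip_bad_files=True,
    )

    # Fields absent from one of the inputs come back as missing (None) values,
    # as asserted in the new test.
    merged = ak.from_parquet("merged.parquet")
    print(merged.fields)

Per the globbing branch in the patched function, a single directory path may also be passed instead of a list, in which case it is searched recursively for ``*.parquet`` files.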