diff --git a/.gitignore b/.gitignore
index 9af5601..d97434b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,3 +142,4 @@ dmypy.json
 .DS_Store
 
 tests/wip.py
+dataproc-init-dev.sh
diff --git a/src/dnarecords/reader.py b/src/dnarecords/reader.py
index 453dece..fc1670c 100644
--- a/src/dnarecords/reader.py
+++ b/src/dnarecords/reader.py
@@ -75,13 +75,22 @@ def _types_dict():
                 'double': tf.float32}
 
     @staticmethod
-    def _pandas_safe_read_parquet(path):
+    def _pandas_safe_read_parquet(path, columns, taste):
         import pandas as pd
         import tensorflow as tf
 
         files = tf.io.gfile.glob(f'{path}/*.parquet')
         if files:
-            return pd.concat(pd.read_parquet(f) for f in files)
+            if columns:
+                if taste:
+                    return pd.read_parquet(files[0], columns=columns)
+                else:
+                    return pd.concat(pd.read_parquet(f, columns=columns) for f in files)
+            if not columns:
+                if taste:
+                    return pd.read_parquet(files[0])
+                else:
+                    return pd.concat(pd.read_parquet(f) for f in files)
         return None
 
     @staticmethod
@@ -94,11 +103,14 @@ def _pandas_safe_read_json(path):
         import pandas as pd
         files = tf.io.gfile.glob(f'{path}/*.json')
         if files:
            return pd.concat(pd.read_json(f) for f in files)
         return None
-    def metadata(self) -> Dict[str, DataFrame]:
+    def metadata(self, vkeys_columns: List[str] = None, skeys_columns: List[str] = None, taste: bool = False) -> Dict[str, DataFrame]:
         """Gets the metadata associated to the DNARecords dataset as a dictionary of names to pandas DataFrames.
         :rtype: Dict[str, DataFrame].
         :return: the metadata associated to the DNARecords as a dictionary of names to pandas DataFrames.
+        :param vkeys_columns: columns to return from variant metadata files (potentially big files). Defaults to None (all columns).
+        :param skeys_columns: columns to return from sample metadata files (potentially big files). Defaults to None (all columns).
+        :param taste: The full metadata DataFrames could be huge, so you can get a taste of them without running into memory issues. With that, decide which columns to get metadata for. Defaults to False.
         See Also
         --------
@@ -109,8 +121,12 @@ def metadata(self) -> Dict[str, DataFrame]:
         result = {}
         tree = dr.helper.DNARecordsUtils.dnarecords_tree(self._dnarecords_path)
         for k, v in tree.items():
-            if k in ['skeys', 'vkeys', 'swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
-                result.update({k: self._pandas_safe_read_parquet(v)})
+            if k == 'skeys':
+                result.update({k: self._pandas_safe_read_parquet(v, skeys_columns, taste)})
+            if k == 'vkeys':
+                result.update({k: self._pandas_safe_read_parquet(v, vkeys_columns, taste)})
+            if k in ['swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
+                result.update({k: self._pandas_safe_read_parquet(v, None, False)})
             if k in ['swpsc', 'vwpsc', 'swrsc', 'vwrsc']:
                 result.update({k: self._pandas_safe_read_json(v)})
         return result
diff --git a/src/dnarecords/writer.py b/src/dnarecords/writer.py
index bd6078a..2f21d0c 100644
--- a/src/dnarecords/writer.py
+++ b/src/dnarecords/writer.py
@@ -62,7 +62,7 @@ class DNARecordsWriter:
     ...
 
     :param expr: a Hail expression. Currently, ony expressions coercible to numeric are supported
-    :param block_size: block size to handle transposing the matrix
+    :param block_size: number of entries per block in the internal operations
    :param staging: path to staging directory to use for intermediate data. Default: /tmp/dnarecords/staging.
     """
     from typing import TYPE_CHECKING
@@ -75,12 +75,13 @@ class DNARecordsWriter:
     _j_blocks: set
     _nrows: int
     _ncols: int
+    _sparsity: float
     _chrom_ranges: dict
     _mt: 'MatrixTable'
     _skeys: 'DataFrame'
     _vkeys: 'DataFrame'
 
-    def __init__(self, expr: 'Expression', block_size=(10000, 10000), staging: str = '/tmp/dnarecords/staging'):
+    def __init__(self, expr: 'Expression', block_size: int = int(1e6), staging: str = '/tmp/dnarecords/staging'):
         self._assert_expr_type(expr)
         self._expr = expr
         self._block_size = block_size
@@ -144,13 +145,26 @@ def _set_max_nrows_ncols(self):
         self._nrows = self._mt.count_rows()
         self._ncols = self._mt.count_cols()
 
+    def _set_sparsity(self):
+        mts = self._mt.head(10000, None)
+        entries = mts.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null').count()
+        self._sparsity = entries / (mts.count_rows() * mts.count_cols())
+
+    def _get_block_size(self):
+        import math
+        M, N, S = self._nrows + 1, self._ncols + 1, self._sparsity + 1e-6
+        B = self._block_size / S
+        m = math.ceil(math.sqrt(B * M / N))
+        n = math.ceil(math.sqrt(B * N / M))
+        return m, n
+
     def _build_ij_blocks(self):
         import pyspark.sql.functions as F
-        m, n = self._block_size
+        m, n = self._get_block_size()
         df = self._mt.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null')
-        df = df.withColumn('ib', F.floor(F.col('i')/F.lit(m)))
-        df = df.withColumn('jb', F.floor(F.col('j')/F.lit(n)))
-        df.write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
+        df = df.withColumn('ib', F.expr(f"i div {m}"))
+        df = df.withColumn('jb', F.expr(f"j div {n}"))
+        df.repartition('ib', 'jb').write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
 
     def _set_ij_blocks(self):
         import re
@@ -266,6 +280,7 @@ def _write_dnarecords(output, output_schema, dna_blocks, write_mode, gzip, tfrec
         if tfrecord_format:
             df_writer = df_writer.format("tfrecord").option("recordType", "Example")
             if gzip:
+                # Needs a large amount of overhead memory
                 df_writer = df_writer.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
         else:
             df_writer = df_writer.format('parquet')
@@ -319,21 +334,26 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
         if not tfrecord_format and not parquet_format:
             raise Exception('At least one of tfrecord_format, parquet_format must be True')
 
+        otree = DNARecordsUtils.dnarecords_tree(output)
+
         self._set_mt()
         self._index_mt()
         self._set_vkeys_skeys()
         self._set_chrom_ranges()
         self._update_vkeys_by_chrom_ranges()
+
+        self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
+        self._skeys.write.mode(write_mode).parquet(otree['skeys'])
+
         self._select_ijv()
         self._filter_out_undefined_entries()
         if sparse:
             self._filter_out_zeroes()
         self._set_max_nrows_ncols()
+        self._set_sparsity()
         self._build_ij_blocks()
         self._set_ij_blocks()
 
-        otree = DNARecordsUtils.dnarecords_tree(output)
-
         if variant_wise:
             self._build_dna_blocks('i')
             if tfrecord_format:
@@ -355,6 +375,3 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
 
                 self._write_dnarecords(otree['swpar'], otree['swpsc'], f'{self._sw_dna_staging}/*', write_mode, gzip, False)
                 self._write_key_files(otree['swpar'], otree['swpfs'], False, write_mode)
-
-        self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
-        self._skeys.write.mode(write_mode).parquet(otree['skeys'])
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 861ea6a..46c98b5 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -2,6 +2,8 @@
 import pandas as pd
 import pytest
 import dnarecords as dr
+import glob
+from itertools import groupby
 
 
 def check_dosage_sample_wise(skeys, vkeys, mt, dna, approx):
@@ -116,3 +118,11 @@ def test_raises_exception_when_not_tfrecord_format_and_not_parquet_format(dosage
     with pytest.raises(Exception) as ex_info:
         dr.writer.DNARecordsWriter(dosage_1kg.dosage).write(dosage_output, tfrecord_format=False, parquet_format=False)
     assert ex_info.match(r"^At least one of tfrecord_format, parquet_format must be True$")
+
+
+def test_generates_single_file_per_block(dosage_output):
+    kv_blocks = dosage_output.replace('/output', '/staging/kv-blocks')
+    pairs = [f.rsplit('/', 1) for f in glob.glob(f"{kv_blocks}/*/*/*.parquet")]
+    for r, g in groupby(pairs, key=lambda t: t[0]):
+        files = list(g)
+        assert len(files) == 1, "kv-block with more than one file exists"
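
A minimal usage sketch of the new metadata() options follows. The reader class name, its constructor argument, the dataset path and the column names are assumptions for illustration; only the metadata() signature comes from this patch.

# Hypothetical usage of the new column/taste options added to metadata().
# DNARecordsReader, its constructor and the column names below are assumptions;
# only the metadata() signature is taken from this patch.
import dnarecords as dr

reader = dr.reader.DNARecordsReader('gs://my-bucket/my-dnarecords')  # hypothetical path

# taste=True reads a single parquet file per metadata table, so you can inspect
# the available columns without loading the full (potentially huge) DataFrames.
taste = reader.metadata(taste=True)
print(taste['vkeys'].columns)

# Then pull only the columns of interest across all files.
meta = reader.metadata(vkeys_columns=['chrom', 'pos'], skeys_columns=['sample_id'])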
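The block sizing added in _get_block_size picks block dimensions m x n whose aspect ratio follows the matrix shape and whose area, after correcting for sparsity, holds about block_size stored entries. A self-contained sketch of the same arithmetic; the helper name and the example figures are made up for illustration.

import math

def block_dims(nrows, ncols, sparsity, block_size=int(1e6)):
    # Mirrors _get_block_size from this patch: B is the target block area,
    # inflated by 1/sparsity so that an m x n block of the original matrix
    # keeps roughly block_size non-null entries after filtering.
    M, N, S = nrows + 1, ncols + 1, sparsity + 1e-6
    B = block_size / S
    m = math.ceil(math.sqrt(B * M / N))
    n = math.ceil(math.sqrt(B * N / M))
    return m, n

# Example: 100k variants x 3k samples with ~30% non-null entries.
# Roughly (10540, 317): m/n tracks nrows/ncols and m * n * sparsity ~ 1e6.
print(block_dims(100_000, 3_000, 0.3))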
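On the _build_ij_blocks change: repartitioning by ('ib', 'jb') before partitionBy('ib', 'jb') routes all rows of a given block to a single Spark task, so each block directory is written as exactly one parquet file instead of one file per upstream partition. That is the property test_generates_single_file_per_block asserts on the staging kv-blocks output.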