Merge pull request #2 from amanas/scalability
Scalability
amanas authored May 20, 2022
2 parents ba74281 + 26b2353 commit 896d14b
Showing 4 changed files with 60 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -142,3 +142,4 @@ dmypy.json
.DS_Store

tests/wip.py
dataproc-init-dev.sh
26 changes: 21 additions & 5 deletions src/dnarecords/reader.py
@@ -75,13 +75,22 @@ def _types_dict():
'double': tf.float32}

@staticmethod
def _pandas_safe_read_parquet(path):
def _pandas_safe_read_parquet(path, columns, taste):
import pandas as pd
import tensorflow as tf

files = tf.io.gfile.glob(f'{path}/*.parquet')
if files:
return pd.concat(pd.read_parquet(f) for f in files)
if columns:
if taste:
return pd.read_parquet(files[0], columns=columns)
else:
return pd.concat(pd.read_parquet(f, columns=columns) for f in files)
if not columns:
if taste:
return pd.read_parquet(files[0])
else:
return pd.concat(pd.read_parquet(f) for f in files)
return None

@staticmethod
@@ -94,11 +103,14 @@ def _pandas_safe_read_json(path):
return pd.concat(pd.read_json(f) for f in files)
return None

def metadata(self) -> Dict[str, DataFrame]:
def metadata(self, vkeys_columns: List[str] = None, skeys_columns: List[str] = None, taste: bool = False) -> Dict[str, DataFrame]:
"""Gets the metadata associated to the DNARecords dataset as a dictionary of names to pandas DataFrames.
:rtype: Dict[str, DataFrame].
:return: the metadata associated to the DNARecords as a dictionary of names to pandas DataFrames.
:param vkeys_columns: columns to return from variant metadata files (potentially big files). Defaults to None (all columns).
:param skeys_columns: columns to return from sample metadata files (potentially big files). Defaults to None (all columns).
:param taste: the full metadata DataFrames can be huge, so set taste=True to get a small taste of them without running into memory issues and then decide which columns to request. Defaults to False.
See Also
--------
@@ -109,8 +121,12 @@ def metadata(self) -> Dict[str, DataFrame]:
result = {}
tree = dr.helper.DNARecordsUtils.dnarecords_tree(self._dnarecords_path)
for k, v in tree.items():
if k in ['skeys', 'vkeys', 'swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
result.update({k: self._pandas_safe_read_parquet(v)})
if k == 'skeys':
result.update({k: self._pandas_safe_read_parquet(v, skeys_columns, taste)})
if k == 'vkeys':
result.update({k: self._pandas_safe_read_parquet(v, vkeys_columns, taste)})
if k in ['swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
result.update({k: self._pandas_safe_read_parquet(v, None, False)})
if k in ['swpsc', 'vwpsc', 'swrsc', 'vwrsc']:
result.update({k: self._pandas_safe_read_json(v)})
return result
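For context, a minimal usage sketch of the new parameters (the reader class name, dataset path, and column names below are illustrative assumptions, not taken from this diff):

import dnarecords as dr

reader = dr.reader.DNARecordsReader('/path/to/dnarecords')  # hypothetical class name and path

# taste=True reads only the first parquet file of each metadata table,
# which is a cheap way to inspect the available columns.
sample = reader.metadata(taste=True)
print(sample['vkeys'].columns)

# Then request only the columns you need from the full (potentially huge) tables.
meta = reader.metadata(vkeys_columns=['chrom', 'pos'], skeys_columns=['sample_id'])  # hypothetical columns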
39 changes: 28 additions & 11 deletions src/dnarecords/writer.py
@@ -62,7 +62,7 @@ class DNARecordsWriter:
...
:param expr: a Hail expression. Currently, only expressions coercible to numeric are supported
:param block_size: block size to handle transposing the matrix
:param block_size: entries per block in the internal operations
:param staging: path to staging directory to use for intermediate data. Default: /tmp/dnarecords/staging.
"""
from typing import TYPE_CHECKING
@@ -75,12 +75,13 @@ class DNARecordsWriter:
_j_blocks: set
_nrows: int
_ncols: int
_sparsity: float
_chrom_ranges: dict
_mt: 'MatrixTable'
_skeys: 'DataFrame'
_vkeys: 'DataFrame'

def __init__(self, expr: 'Expression', block_size=(10000, 10000), staging: str = '/tmp/dnarecords/staging'):
def __init__(self, expr: 'Expression', block_size: int = int(1e6), staging: str = '/tmp/dnarecords/staging'):
self._assert_expr_type(expr)
self._expr = expr
self._block_size = block_size
@@ -144,13 +145,26 @@ def _set_max_nrows_ncols(self):
self._nrows = self._mt.count_rows()
self._ncols = self._mt.count_cols()

def _set_sparsity(self):
mts = self._mt.head(10000, None)
entries = mts.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null').count()
self._sparsity = entries / (mts.count_rows() * mts.count_cols())

def _get_block_size(self):
import math
M, N, S = self._nrows + 1, self._ncols + 1, self._sparsity + 1e-6
B = self._block_size / S
m = math.ceil(math.sqrt(B * M / N))
n = math.ceil(math.sqrt(B * N / M))
return m, n
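A minimal sketch of the block-size arithmetic above, with made-up dimensions: B = block_size / S inflates the target block so that, at the estimated sparsity, it holds roughly block_size non-null entries, and m, n are chosen so that m * n ≈ B while m / n ≈ M / N, i.e. blocks keep the aspect ratio of the matrix.

import math

nrows, ncols = 500_000, 2_000   # hypothetical variants x samples
sparsity = 0.25                 # as estimated by _set_sparsity from the first 10,000 rows
block_size = int(1e6)           # target entries per block (the new default)

M, N, S = nrows + 1, ncols + 1, sparsity + 1e-6
B = block_size / S                    # ~4e6 matrix cells to collect ~1e6 non-null entries
m = math.ceil(math.sqrt(B * M / N))   # rows per block
n = math.ceil(math.sqrt(B * N / M))   # columns per block
print(m, n)                           # roughly 31615 x 127: m*n ≈ B and m/n ≈ M/N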

def _build_ij_blocks(self):
import pyspark.sql.functions as F
m, n = self._block_size
m, n = self._get_block_size()
df = self._mt.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null')
df = df.withColumn('ib', F.floor(F.col('i')/F.lit(m)))
df = df.withColumn('jb', F.floor(F.col('j')/F.lit(n)))
df.write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
df = df.withColumn('ib', F.expr(f"i div {m}"))
df = df.withColumn('jb', F.expr(f"j div {n}"))
df.repartition('ib', 'jb').write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
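A design note inferred from the diff, not stated in it: repartition('ib', 'jb') shuffles all entries of a given (ib, jb) block into the same Spark partition, so the subsequent partitionBy writes a single parquet file per block directory (the new test below asserts exactly this), and `i div m` is Spark SQL integer division, equivalent to the previous floor(i / m). A self-contained sketch of the same write pattern on a toy DataFrame (paths and sizes are hypothetical):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

m, n = 100, 10  # hypothetical block dimensions
df = spark.range(10_000).select(
    (F.col('id') % 1_000).alias('i'),
    (F.col('id') % 50).alias('j'),
    F.rand().alias('v'))

df = df.withColumn('ib', F.expr(f"i div {m}"))  # block row index via integer division
df = df.withColumn('jb', F.expr(f"j div {n}"))  # block column index
(df.repartition('ib', 'jb')                     # all rows of a block land in one partition
   .write.partitionBy('ib', 'jb')
   .mode('overwrite')
   .parquet('/tmp/kv-blocks-example'))          # hypothetical output path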

def _set_ij_blocks(self):
import re
@@ -266,6 +280,7 @@ def _write_dnarecords(output, output_schema, dna_blocks, write_mode, gzip, tfrec
if tfrecord_format:
df_writer = df_writer.format("tfrecord").option("recordType", "Example")
if gzip:
# Needs a huge amount of overhead memory
df_writer = df_writer.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
else:
df_writer = df_writer.format('parquet')
@@ -319,21 +334,26 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
if not tfrecord_format and not parquet_format:
raise Exception('At least one of tfrecord_format, parquet_format must be True')

otree = DNARecordsUtils.dnarecords_tree(output)

self._set_mt()
self._index_mt()
self._set_vkeys_skeys()
self._set_chrom_ranges()
self._update_vkeys_by_chrom_ranges()

self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
self._skeys.write.mode(write_mode).parquet(otree['skeys'])

self._select_ijv()
self._filter_out_undefined_entries()
if sparse:
self._filter_out_zeroes()
self._set_max_nrows_ncols()
self._set_sparsity()
self._build_ij_blocks()
self._set_ij_blocks()

otree = DNARecordsUtils.dnarecords_tree(output)

if variant_wise:
self._build_dna_blocks('i')
if tfrecord_format:
Expand All @@ -355,6 +375,3 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
self._write_dnarecords(otree['swpar'], otree['swpsc'], f'{self._sw_dna_staging}/*', write_mode,
gzip, False)
self._write_key_files(otree['swpar'], otree['swpfs'], False, write_mode)

self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
self._skeys.write.mode(write_mode).parquet(otree['skeys'])
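Finally, a hedged end-to-end sketch of the writer with the scalar block_size (the input path and the dosage entry field are hypothetical; only keyword names visible in this diff are used):

import hail as hl
import dnarecords as dr

mt = hl.read_matrix_table('/path/to/dataset.mt')  # hypothetical MatrixTable with a numeric entry field
writer = dr.writer.DNARecordsWriter(mt.dosage, block_size=int(1e6),
                                    staging='/tmp/dnarecords/staging')
writer.write('/path/to/output', sparse=True, sample_wise=True,
             tfrecord_format=True, parquet_format=False)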
10 changes: 10 additions & 0 deletions tests/test_writer.py
@@ -2,6 +2,8 @@
import pandas as pd
import pytest
import dnarecords as dr
import glob
from itertools import groupby


def check_dosage_sample_wise(skeys, vkeys, mt, dna, approx):
@@ -116,3 +118,11 @@ def test_raises_exception_when_not_tfrecord_format_and_not_parquet_format(dosage
with pytest.raises(Exception) as ex_info:
dr.writer.DNARecordsWriter(dosage_1kg.dosage).write(dosage_output, tfrecord_format=False, parquet_format=False)
assert ex_info.match(r"^At least one of tfrecord_format, parquet_format must be True$")


def test_generates_single_file_per_block(dosage_output):
kv_blocks = dosage_output.replace('/output', '/staging/kv-blocks')
pairs = [f.rsplit('/', 1) for f in glob.glob(f"{kv_blocks}/*/*/*.parquet")]
for r, g in groupby(pairs, key=lambda t: t[0]):
files = list(g)
assert len(list(files)) == 1, "kv-blocks with more than one file exists"
