From 44197776e18a5a4c8e98c834a8f7b46bfe0306cc Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Tue, 31 Oct 2023 18:30:44 +0000 Subject: [PATCH 01/10] Fast seek through file --- yt/frontends/ramses/io_utils.pyx | 56 +++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 7becee63486..3e57a79184c 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -14,6 +14,10 @@ ctypedef np.int32_t INT32_t ctypedef np.int64_t INT64_t ctypedef np.float64_t DOUBLE_t +cdef INT32_SIZE = sizeof(np.int32_t) +cdef INT64_SIZE = sizeof(np.int64_t) +cdef DOUBLE_SIZE = sizeof(np.float64_t) + @cython.cpow(True) @cython.boundscheck(False) @cython.wraparound(False) @@ -134,6 +138,9 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n return offset, level_count +cdef inline int skip_len(int Nskip, int record_len): + return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) + @cython.cpow(True) @cython.boundscheck(False) @cython.wraparound(False) @@ -154,16 +161,27 @@ def fill_hydro(FortranFile f, cdef dict tmp cdef str field cdef INT64_t twotondim - cdef int ilevel, icpu, ifield, nfields, nlevels, nc, ncpu_selected - cdef np.ndarray[np.uint8_t, ndim=1] mask + cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected twotondim = 2**ndim - nfields = len(all_fields) + nfields_selected = len(fields) nlevels = offsets.shape[1] ncpu_selected = len(cpu_enumerator) - mask = np.array([(field in fields) for field in all_fields], dtype=np.uint8) + cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) + cdef int jump_len + + jump_len = 0 + j = 0 + for i, field in enumerate(all_fields): + if field in fields: + jumps[j] = jump_len + j += 1 + jump_len = 0 + else: + jump_len += 1 + jumps[j] = jump_len # Loop over levels for ilevel in range(nlevels): @@ -176,19 +194,33 @@ def fill_hydro(FortranFile f, if offset == -1: continue f.seek(offset) - tmp = {} # Initialize temporary data container for io # note: we use Fortran ordering to reflect the in-file ordering - for field in all_fields: - tmp[field] = np.empty((nc, twotondim), dtype="float64", order='F') + buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') + jump_len = 0 for i in range(twotondim): # Read the selected fields - for ifield in range(nfields): - if not mask[ifield]: - f.skip() - else: - tmp[all_fields[ifield]][:, i] = f.read_vector('d') # i-th cell + for j in range(nfields_selected): + jump_len += jumps[j] + if jump_len > 0: + f.seek(skip_len(jump_len, nc), 1) + jump_len = 0 + buffer[:, i, j] = f.read_vector('d') + + jump_len += jumps[nfields_selected] + + # In principle, we may be left with some fields to skip + # but since we're doing an absolute seek above, + # we don't need to do anything here + ## if jump_len > 0: + ## f.seek(skip_len(jump_len, nc), 1) + + # Alias buffer into dictionary + tmp = {} + for i, field in enumerate(fields): + tmp[field] = buffer[:, :, i] + if ncpu_selected > 1: oct_handler.fill_level_with_domain( ilevel, levels, cell_inds, file_inds, domains, tr, tmp, domain=icpu+1) From bad7f6686da2ef9044e4fbf78a76fc6a2f7df718 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Tue, 31 Oct 2023 18:41:13 +0000 Subject: [PATCH 02/10] Make performance-critical loop C-only --- yt/frontends/ramses/io_utils.pyx | 6 +++-- yt/utilities/cython_fortran_utils.pxd | 1 + yt/utilities/cython_fortran_utils.pyx | 32 +++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 3e57a79184c..5f8fbc284f9 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -138,7 +138,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n return offset, level_count -cdef inline int skip_len(int Nskip, int record_len): +cdef inline int skip_len(int Nskip, int record_len) noexcept: return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) @cython.cpow(True) @@ -162,6 +162,7 @@ def fill_hydro(FortranFile f, cdef str field cdef INT64_t twotondim cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected + cdef int i, j twotondim = 2**ndim nfields_selected = len(fields) @@ -171,6 +172,7 @@ def fill_hydro(FortranFile f, cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) cdef int jump_len + cdef np.ndarray[np.float64_t, ndim=3] buffer jump_len = 0 j = 0 @@ -206,7 +208,7 @@ def fill_hydro(FortranFile f, if jump_len > 0: f.seek(skip_len(jump_len, nc), 1) jump_len = 0 - buffer[:, i, j] = f.read_vector('d') + f.read_vector_inplace('d', &buffer[0, i, j]) jump_len += jumps[nfields_selected] diff --git a/yt/utilities/cython_fortran_utils.pxd b/yt/utilities/cython_fortran_utils.pxd index ab6c2a0dc59..71c35fe1210 100644 --- a/yt/utilities/cython_fortran_utils.pxd +++ b/yt/utilities/cython_fortran_utils.pxd @@ -13,6 +13,7 @@ cdef class FortranFile: cdef INT64_t get_size(self, str dtype) cpdef INT32_t read_int(self) except? -1 cpdef np.ndarray read_vector(self, str dtype) + cdef int read_vector_inplace(self, str dtype, void *data) cpdef INT64_t tell(self) except -1 cpdef INT64_t seek(self, INT64_t pos, INT64_t whence=*) except -1 cpdef void close(self) diff --git a/yt/utilities/cython_fortran_utils.pyx b/yt/utilities/cython_fortran_utils.pyx index aa26add5c16..a1edbc70f4b 100644 --- a/yt/utilities/cython_fortran_utils.pyx +++ b/yt/utilities/cython_fortran_utils.pyx @@ -142,6 +142,38 @@ cdef class FortranFile: return data + cdef int read_vector_inplace(self, str dtype, void *data): + """Reads a record from the file and return it as numpy array. + + Parameters + ---------- + d : data type + This is the datatype (from the struct module) that we should read. + data : void* + The pointer to the data to be read. + + """ + cdef INT32_t s1, s2, size + + if self._closed: + raise ValueError("I/O operation on closed file.") + + size = self.get_size(dtype) + + fread(&s1, INT32_SIZE, 1, self.cfile) + + # Check record is compatible with data type + if s1 % size != 0: + raise ValueError('Size obtained (%s) does not match with the expected ' + 'size (%s) of multi-item record' % (s1, size)) + + fread(data, size, s1 // size, self.cfile) + fread(&s2, INT32_SIZE, 1, self.cfile) + + if s1 != s2: + raise IOError('Sizes do not agree in the header and footer for ' + 'this record - check header dtype') + cpdef INT32_t read_int(self) except? -1: """Reads a single int32 from the file and return it. From ada5ddab4b82bcb9708b7a05a3260d723a35b69d Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Tue, 31 Oct 2023 18:51:55 +0000 Subject: [PATCH 03/10] Also make reading offset faster --- yt/frontends/ramses/field_handlers.py | 2 +- yt/frontends/ramses/io_utils.pyx | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/yt/frontends/ramses/field_handlers.py b/yt/frontends/ramses/field_handlers.py index 10860b5afd5..edd0f612f7b 100644 --- a/yt/frontends/ramses/field_handlers.py +++ b/yt/frontends/ramses/field_handlers.py @@ -272,7 +272,7 @@ def offset(self): self.domain.domain_id, self.parameters["nvar"], self.domain.amr_header, - skip_len=nvars * 8, + Nskip=nvars * 8, ) self._offset = offset diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 5f8fbc284f9..831bc4e67c4 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -87,12 +87,16 @@ def read_amr(FortranFile f, dict headers, return max_level + +cdef inline int skip_len(int Nskip, int record_len) noexcept: + return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) + @cython.cpow(True) @cython.boundscheck(False) @cython.wraparound(False) @cython.cdivision(True) @cython.nonecheck(False) -cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int skip_len): +cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int Nskip): cdef np.ndarray[np.int64_t, ndim=2] offset, level_count cdef INT64_t ndim, twotondim, nlevelmax, n_levels, nboundary, ncpu, ncpu_and_bound @@ -108,8 +112,8 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n ncpu_and_bound = nboundary + ncpu twotondim = 2**ndim - if skip_len == -1: - skip_len = twotondim * nvar + if Nskip == -1: + Nskip = twotondim * nvar # It goes: level, CPU, 8-variable (1 oct) offset = np.full((ncpu_and_bound, n_levels), -1, dtype=np.int64) @@ -134,13 +138,10 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n if ilevel >= min_level: offset_view[icpu, ilevel - min_level] = f.tell() level_count_view[icpu, ilevel - min_level] = file_ncache - f.skip(skip_len) + f.seek(skip_len(Nskip, file_ncache), 1) return offset, level_count -cdef inline int skip_len(int Nskip, int record_len) noexcept: - return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) - @cython.cpow(True) @cython.boundscheck(False) @cython.wraparound(False) From 50711d0f60d537975d30fd9d23317884b66e244e Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Tue, 31 Oct 2023 19:06:21 +0000 Subject: [PATCH 04/10] Make performance-critical functions cpdef'ed --- yt/geometry/oct_container.pxd | 23 +++++++++++++++++++++ yt/geometry/oct_container.pyx | 39 ++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/yt/geometry/oct_container.pxd b/yt/geometry/oct_container.pxd index 5ac9f9318a7..3891f8fdab0 100644 --- a/yt/geometry/oct_container.pxd +++ b/yt/geometry/oct_container.pxd @@ -82,6 +82,29 @@ cdef class OctreeContainer: cdef public object fill_style cdef public int max_level + cpdef void fill_level( + self, + const int level, + const np.uint8_t[:] levels, + const np.uint8_t[:] cell_inds, + const np.int64_t[:] file_inds, + dict dest_fields, + dict source_fields, + np.int64_t offset = ? + ) + cpdef int fill_level_with_domain( + self, + const int level, + const np.uint8_t[:] levels, + const np.uint8_t[:] cell_inds, + const np.int64_t[:] file_inds, + const np.int32_t[:] domains, + dict dest_fields, + dict source_fields, + const np.int32_t domain, + np.int64_t offset = ? + ) + cdef class SparseOctreeContainer(OctreeContainer): cdef OctKey *root_nodes cdef void *tree_root diff --git a/yt/geometry/oct_container.pyx b/yt/geometry/oct_container.pyx index 7d3118cb252..4b09d360e51 100644 --- a/yt/geometry/oct_container.pyx +++ b/yt/geometry/oct_container.pyx @@ -742,12 +742,16 @@ cdef class OctreeContainer: @cython.boundscheck(False) @cython.wraparound(False) @cython.cdivision(True) - def fill_level(self, int level, - np.ndarray[np.uint8_t, ndim=1] levels, - np.ndarray[np.uint8_t, ndim=1] cell_inds, - np.ndarray[np.int64_t, ndim=1] file_inds, - dest_fields, source_fields, - np.int64_t offset = 0): + cpdef void fill_level( + self, + const int level, + const np.uint8_t[:] levels, + const np.uint8_t[:] cell_inds, + const np.int64_t[:] file_inds, + dict dest_fields, + dict source_fields, + np.int64_t offset = 0 + ): cdef np.ndarray[np.float64_t, ndim=2] source cdef np.ndarray[np.float64_t, ndim=1] dest cdef int i, lvl @@ -824,17 +828,18 @@ cdef class OctreeContainer: @cython.boundscheck(False) @cython.wraparound(False) @cython.cdivision(True) - def fill_level_with_domain( - self, int level, - np.uint8_t[:] levels, - np.uint8_t[:] cell_inds, - np.int64_t[:] file_inds, - np.int32_t[:] domains, - dict dest_fields, - dict source_fields, - np.int32_t domain, - np.int64_t offset = 0 - ): + cpdef int fill_level_with_domain( + self, + const int level, + const np.uint8_t[:] levels, + const np.uint8_t[:] cell_inds, + const np.int64_t[:] file_inds, + const np.int32_t[:] domains, + dict dest_fields, + dict source_fields, + const np.int32_t domain, + np.int64_t offset = 0 + ): """Similar to fill_level but accepts a domain argument. This is particularly useful for frontends that have buffer zones at CPU boundaries. From 253f0ead1bf9581071e37413669b9372fa7f3b3e Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 11:04:48 +0000 Subject: [PATCH 05/10] Fix the docstring --- yt/utilities/cython_fortran_utils.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yt/utilities/cython_fortran_utils.pyx b/yt/utilities/cython_fortran_utils.pyx index a1edbc70f4b..fc2008fc419 100644 --- a/yt/utilities/cython_fortran_utils.pyx +++ b/yt/utilities/cython_fortran_utils.pyx @@ -143,15 +143,15 @@ cdef class FortranFile: return data cdef int read_vector_inplace(self, str dtype, void *data): - """Reads a record from the file and return it as numpy array. + """Reads a record from the file. Parameters ---------- d : data type This is the datatype (from the struct module) that we should read. data : void* - The pointer to the data to be read. - + The pointer where to store the data. + It should be preallocated and have the correct size. """ cdef INT32_t s1, s2, size From 57fef2fd35af61d1738ad2458c87502f4150ee21 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 11:27:36 +0000 Subject: [PATCH 06/10] Avoid repetitive allocations --- yt/frontends/ramses/io_utils.pyx | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 831bc4e67c4..76b59d1cc47 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -14,9 +14,9 @@ ctypedef np.int32_t INT32_t ctypedef np.int64_t INT64_t ctypedef np.float64_t DOUBLE_t -cdef INT32_SIZE = sizeof(np.int32_t) -cdef INT64_SIZE = sizeof(np.int64_t) -cdef DOUBLE_SIZE = sizeof(np.float64_t) +cdef int INT32_SIZE = sizeof(np.int32_t) +cdef int INT64_SIZE = sizeof(np.int64_t) +cdef int DOUBLE_SIZE = sizeof(np.float64_t) @cython.cpow(True) @cython.boundscheck(False) @@ -163,7 +163,7 @@ def fill_hydro(FortranFile f, cdef str field cdef INT64_t twotondim cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected - cdef int i, j + cdef int i, j, ii twotondim = 2**ndim nfields_selected = len(fields) @@ -171,7 +171,9 @@ def fill_hydro(FortranFile f, nlevels = offsets.shape[1] ncpu_selected = len(cpu_enumerator) - cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) + cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64) + + cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) cdef int jump_len cdef np.ndarray[np.float64_t, ndim=3] buffer @@ -186,10 +188,12 @@ def fill_hydro(FortranFile f, jump_len += 1 jumps[j] = jump_len + cdef int nc_largest = 0 # Loop over levels for ilevel in range(nlevels): # Loop over cpu domains - for icpu in cpu_enumerator: + for ii in range(ncpu_selected): + icpu = cpu_list[ii] nc = level_count[icpu, ilevel] if nc == 0: continue @@ -199,7 +203,9 @@ def fill_hydro(FortranFile f, f.seek(offset) # Initialize temporary data container for io # note: we use Fortran ordering to reflect the in-file ordering - buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') + if nc > nc_largest: + nc_largest = nc + buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') jump_len = 0 for i in range(twotondim): From d33e2b130c47a2325bbc7342fde9b84bba905403 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 17:23:19 +0000 Subject: [PATCH 07/10] Make comment more obvious --- yt/frontends/ramses/io_utils.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 76b59d1cc47..cb58ad98c57 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -220,8 +220,8 @@ def fill_hydro(FortranFile f, jump_len += jumps[nfields_selected] # In principle, we may be left with some fields to skip - # but since we're doing an absolute seek above, - # we don't need to do anything here + # but since we're doing an absolute seek at the beginning of + # the loop on CPUs, we can spare one seek here ## if jump_len > 0: ## f.seek(skip_len(jump_len, nc), 1) From a16bda2a47ea639dfdb76233a3e4204164e8e236 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 17:31:24 +0000 Subject: [PATCH 08/10] Making the skip_len nogil, just to be safe --- yt/frontends/ramses/io_utils.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index cb58ad98c57..335176eff2e 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -88,7 +88,7 @@ def read_amr(FortranFile f, dict headers, return max_level -cdef inline int skip_len(int Nskip, int record_len) noexcept: +cdef inline int skip_len(int Nskip, int record_len) noexcept nogil: return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) @cython.cpow(True) From b3eef0d1aae7adb3bb9ade1159bba39ce0995cb9 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Fri, 3 Nov 2023 14:37:47 +0000 Subject: [PATCH 09/10] Preallocate given maximal size --- yt/frontends/ramses/io_utils.pyx | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 335176eff2e..579f36ec439 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -188,7 +188,7 @@ def fill_hydro(FortranFile f, jump_len += 1 jumps[j] = jump_len - cdef int nc_largest = 0 + buffer = np.empty((level_count.max(), twotondim, nfields_selected), dtype="float64", order='F') # Loop over levels for ilevel in range(nlevels): # Loop over cpu domains @@ -201,11 +201,6 @@ def fill_hydro(FortranFile f, if offset == -1: continue f.seek(offset) - # Initialize temporary data container for io - # note: we use Fortran ordering to reflect the in-file ordering - if nc > nc_largest: - nc_largest = nc - buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') jump_len = 0 for i in range(twotondim): From 7a953bcbecd63cddf04c4e6ec927bf72948fea9b Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Fri, 3 Nov 2023 14:50:47 +0000 Subject: [PATCH 10/10] Spare the first jump --- yt/frontends/ramses/io_utils.pyx | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 579f36ec439..9dc749e1400 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -187,6 +187,7 @@ def fill_hydro(FortranFile f, else: jump_len += 1 jumps[j] = jump_len + cdef int first_field_index = jumps[0] buffer = np.empty((level_count.max(), twotondim, nfields_selected), dtype="float64", order='F') # Loop over levels @@ -200,9 +201,11 @@ def fill_hydro(FortranFile f, offset = offsets[icpu, ilevel] if offset == -1: continue - f.seek(offset) + f.seek(offset + skip_len(first_field_index, nc)) - jump_len = 0 + # We have already skipped the first fields (if any) + # so we "rewind" (this will cancel the first seek) + jump_len = -first_field_index for i in range(twotondim): # Read the selected fields for j in range(nfields_selected):