Merge pull request #4720 from cphyc/ramses-speedup

neutrinoceros authored Nov 8, 2023
2 parents 2c8bf86 + 7a953bc commit de317c9
Showing 6 changed files with 139 additions and 39 deletions.
yt/frontends/ramses/field_handlers.py (2 changes: 1 addition & 1 deletion)
@@ -272,7 +272,7 @@ def offset(self):
self.domain.domain_id,
self.parameters["nvar"],
self.domain.amr_header,
skip_len=nvars * 8,
Nskip=nvars * 8,
)

self._offset = offset
yt/frontends/ramses/io_utils.pyx (81 changes: 60 additions & 21 deletions)
@@ -14,6 +14,10 @@ ctypedef np.int32_t INT32_t
ctypedef np.int64_t INT64_t
ctypedef np.float64_t DOUBLE_t

cdef int INT32_SIZE = sizeof(np.int32_t)
cdef int INT64_SIZE = sizeof(np.int64_t)
cdef int DOUBLE_SIZE = sizeof(np.float64_t)

@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
@@ -83,12 +87,16 @@ def read_amr(FortranFile f, dict headers,

return max_level


cdef inline int skip_len(int Nskip, int record_len) noexcept nogil:
return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)

@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
@cython.nonecheck(False)
cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int skip_len):
cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int Nskip):

cdef np.ndarray[np.int64_t, ndim=2] offset, level_count
cdef INT64_t ndim, twotondim, nlevelmax, n_levels, nboundary, ncpu, ncpu_and_bound
@@ -104,8 +112,8 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
ncpu_and_bound = nboundary + ncpu
twotondim = 2**ndim

if skip_len == -1:
skip_len = twotondim * nvar
if Nskip == -1:
Nskip = twotondim * nvar

# It goes: level, CPU, 8-variable (1 oct)
offset = np.full((ncpu_and_bound, n_levels), -1, dtype=np.int64)
@@ -130,7 +138,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
if ilevel >= min_level:
offset_view[icpu, ilevel - min_level] = f.tell()
level_count_view[icpu, ilevel - min_level] = <INT64_t> file_ncache
f.skip(skip_len)
f.seek(skip_len(Nskip, file_ncache), 1)

return offset, level_count

@@ -154,41 +162,72 @@ def fill_hydro(FortranFile f,
cdef dict tmp
cdef str field
cdef INT64_t twotondim
cdef int ilevel, icpu, ifield, nfields, nlevels, nc, ncpu_selected
cdef np.ndarray[np.uint8_t, ndim=1] mask
cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected
cdef int i, j, ii

twotondim = 2**ndim
nfields = len(all_fields)
nfields_selected = len(fields)

nlevels = offsets.shape[1]
ncpu_selected = len(cpu_enumerator)

mask = np.array([(field in fields) for field in all_fields], dtype=np.uint8)

cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64)

cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
cdef int jump_len
cdef np.ndarray[np.float64_t, ndim=3] buffer

jump_len = 0
j = 0
for i, field in enumerate(all_fields):
if field in fields:
jumps[j] = jump_len
j += 1
jump_len = 0
else:
jump_len += 1
jumps[j] = jump_len
cdef int first_field_index = jumps[0]

buffer = np.empty((level_count.max(), twotondim, nfields_selected), dtype="float64", order='F')
# Loop over levels
for ilevel in range(nlevels):
# Loop over cpu domains
for icpu in cpu_enumerator:
for ii in range(ncpu_selected):
icpu = cpu_list[ii]
nc = level_count[icpu, ilevel]
if nc == 0:
continue
offset = offsets[icpu, ilevel]
if offset == -1:
continue
f.seek(offset)
tmp = {}
# Initialize temporary data container for io
# note: we use Fortran ordering to reflect the in-file ordering
for field in all_fields:
tmp[field] = np.empty((nc, twotondim), dtype="float64", order='F')
f.seek(offset + skip_len(first_field_index, nc))

# We have already skipped the first fields (if any)
# so we "rewind" (this will cancel the first seek)
jump_len = -first_field_index
for i in range(twotondim):
# Read the selected fields
for ifield in range(nfields):
if not mask[ifield]:
f.skip()
else:
tmp[all_fields[ifield]][:, i] = f.read_vector('d') # i-th cell
for j in range(nfields_selected):
jump_len += jumps[j]
if jump_len > 0:
f.seek(skip_len(jump_len, nc), 1)
jump_len = 0
f.read_vector_inplace('d', <void*> &buffer[0, i, j])

jump_len += jumps[nfields_selected]

# In principle, we may be left with some fields to skip
# but since we're doing an absolute seek at the beginning of
# the loop on CPUs, we can spare one seek here
## if jump_len > 0:
## f.seek(skip_len(jump_len, nc), 1)

# Alias buffer into dictionary
tmp = {}
for i, field in enumerate(fields):
tmp[field] = buffer[:, :, i]

if ncpu_selected > 1:
oct_handler.fill_level_with_domain(
ilevel, levels, cell_inds, file_inds, domains, tr, tmp, domain=icpu+1)
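
Note on the fill_hydro rewrite above: instead of calling f.skip() once per unselected field record, the new code precomputes a "jumps" array (how many unselected records sit in front of each selected field) and clears each run with a single relative seek, reading the selected records straight into a preallocated Fortran-ordered buffer. The snippet below is a minimal pure-Python sketch of that bookkeeping, not the yt implementation; the record-size formula (nc doubles plus two 4-byte length markers per record) mirrors the skip_len() helper added in this diff.

```python
def skip_bytes(n_records, nc):
    """Bytes spanned by n_records Fortran records of nc float64 values each
    (payload plus two 4-byte length markers per record)."""
    return n_records * (nc * 8 + 8)


def build_jumps(all_fields, selected):
    """For each selected field, the number of unselected records to seek over
    right before reading it, plus one trailing entry for the records that
    follow the last selected field."""
    jumps, run = [], 0
    for field in all_fields:
        if field in selected:
            jumps.append(run)
            run = 0
        else:
            run += 1
    jumps.append(run)
    return jumps


# With fields a..e and only b and d selected: seek over one record, read b,
# seek over one record, read d, and leave one trailing record for the next
# absolute seek to take care of.
print(build_jumps(["a", "b", "c", "d", "e"], {"b", "d"}))  # -> [1, 1, 1]
```
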
yt/geometry/oct_container.pxd (23 changes: 23 additions & 0 deletions)
@@ -82,6 +82,29 @@ cdef class OctreeContainer:
cdef public object fill_style
cdef public int max_level

cpdef void fill_level(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
dict dest_fields,
dict source_fields,
np.int64_t offset = ?
)
cpdef int fill_level_with_domain(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.int32_t[:] domains,
dict dest_fields,
dict source_fields,
const np.int32_t domain,
np.int64_t offset = ?
)

cdef class SparseOctreeContainer(OctreeContainer):
cdef OctKey *root_nodes
cdef void *tree_root
yt/geometry/oct_container.pyx (39 changes: 22 additions & 17 deletions)
@@ -742,12 +742,16 @@ cdef class OctreeContainer:
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
def fill_level(self, int level,
np.ndarray[np.uint8_t, ndim=1] levels,
np.ndarray[np.uint8_t, ndim=1] cell_inds,
np.ndarray[np.int64_t, ndim=1] file_inds,
dest_fields, source_fields,
np.int64_t offset = 0):
cpdef void fill_level(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
dict dest_fields,
dict source_fields,
np.int64_t offset = 0
):
cdef np.ndarray[np.float64_t, ndim=2] source
cdef np.ndarray[np.float64_t, ndim=1] dest
cdef int i, lvl
@@ -824,17 +828,18 @@ cdef class OctreeContainer:
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
def fill_level_with_domain(
self, int level,
np.uint8_t[:] levels,
np.uint8_t[:] cell_inds,
np.int64_t[:] file_inds,
np.int32_t[:] domains,
dict dest_fields,
dict source_fields,
np.int32_t domain,
np.int64_t offset = 0
):
cpdef int fill_level_with_domain(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.int32_t[:] domains,
dict dest_fields,
dict source_fields,
const np.int32_t domain,
np.int64_t offset = 0
):
"""Similar to fill_level but accepts a domain argument.
This is particularly useful for frontends that have buffer zones at CPU boundaries.
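
As the fill_level_with_domain docstring above says, the method only fills cells owned by the requested domain, so buffer-zone (ghost) cells that a CPU file stores on behalf of its neighbours are left untouched. A rough pure-Python illustration of that selection step follows; it is not the octree code itself, and the array names are invented for the example.

```python
import numpy as np


def select_owned_cells(levels, domains, level, domain):
    """Indices of cells that sit on `level` and carry the requested `domain`
    id; buffer-zone cells owned by another CPU are filtered out."""
    levels = np.asarray(levels)
    domains = np.asarray(domains)
    return np.flatnonzero((levels == level) & (domains == domain))


# Two of the three cells on level 1 belong to domain 2, so only those two
# would be filled when domain=2 is requested.
levels = [1, 1, 1, 0]
domains = [2, 3, 2, 2]
print(select_owned_cells(levels, domains, level=1, domain=2))  # -> [0 2]
```
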
yt/utilities/cython_fortran_utils.pxd (1 change: 1 addition & 0 deletions)
@@ -13,6 +13,7 @@ cdef class FortranFile:
cdef INT64_t get_size(self, str dtype)
cpdef INT32_t read_int(self) except? -1
cpdef np.ndarray read_vector(self, str dtype)
cdef int read_vector_inplace(self, str dtype, void *data)
cpdef INT64_t tell(self) except -1
cpdef INT64_t seek(self, INT64_t pos, INT64_t whence=*) except -1
cpdef void close(self)
yt/utilities/cython_fortran_utils.pyx (32 changes: 32 additions & 0 deletions)
@@ -142,6 +142,38 @@ cdef class FortranFile:

return data

cdef int read_vector_inplace(self, str dtype, void *data):
"""Reads a record from the file.
Parameters
----------
dtype : str
This is the datatype (from the struct module) that we should read.
data : void*
Pointer to the buffer where the data will be stored.
It should be preallocated and have the correct size.
"""
cdef INT32_t s1, s2, size

if self._closed:
raise ValueError("I/O operation on closed file.")

size = self.get_size(dtype)

fread(&s1, INT32_SIZE, 1, self.cfile)

# Check record is compatible with data type
if s1 % size != 0:
raise ValueError('Size obtained (%s) does not match with the expected '
'size (%s) of multi-item record' % (s1, size))

fread(data, size, s1 // size, self.cfile)
fread(&s2, INT32_SIZE, 1, self.cfile)

if s1 != s2:
raise IOError('Sizes do not agree in the header and footer for '
'this record - check header dtype')

cpdef INT32_t read_int(self) except? -1:
"""Reads a single int32 from the file and return it.
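
The new read_vector_inplace parses the standard Fortran unformatted sequential-record framing: a 4-byte length marker, the payload, then a trailing marker that must match the leading one. For reference, a pure-Python equivalent is sketched below; it assumes little-endian byte order, whereas the Cython code reads in native byte order via fread.

```python
import struct


def read_record(f, fmt="d", itemsize=8):
    """Read one Fortran unformatted record of `fmt`-typed items (default
    float64) from the open binary file object f."""
    (s1,) = struct.unpack("<i", f.read(4))   # leading length marker, in bytes
    if s1 % itemsize != 0:
        raise ValueError(
            f"record of {s1} bytes is not a whole number of {itemsize}-byte items"
        )
    payload = f.read(s1)
    (s2,) = struct.unpack("<i", f.read(4))   # trailing length marker
    if s1 != s2:
        raise IOError("leading and trailing record markers disagree")
    return struct.unpack(f"<{s1 // itemsize}{fmt}", payload)
```
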
