Skip to content

Commit

Permalink
Merge pull request #4736 from cphyc/ramses-speedup-hydro
Browse files Browse the repository at this point in the history
Fast seek through file
  • Loading branch information
cphyc authored Nov 27, 2023
2 parents ca16b77 + ed47ea5 commit f40a712
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 65 deletions.
35 changes: 22 additions & 13 deletions yt/frontends/ramses/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,25 @@ def _fill_no_ghostzones(self, fd, fields, selector, file_handler):
data = {}
cell_count = selector.count_oct_cells(self.oct_handler, self.domain_id)

levels, cell_inds, file_inds = self.oct_handler.file_index_octs(
selector, self.domain_id, cell_count
)

# Initializing data container
for field in fields:
data[field] = np.zeros(cell_count, "float64")

# Do an early exit if the cell count is null
if cell_count == 0:
return data

level_inds, cell_inds, file_inds = self.oct_handler.file_index_octs(
selector, self.domain_id, cell_count
)

cpu_list = [self.domain_id - 1]
fill_hydro(
fd,
file_handler.offset,
file_handler.level_count,
cpu_list,
levels,
level_inds,
cell_inds,
file_inds,
ndim,
Expand All @@ -416,38 +420,43 @@ def _fill_with_ghostzones(
selector.count_octs(self.oct_handler, self.domain_id) * self.nz**ndim
)

# Initializing data container
for field in fields:
tr[field] = np.zeros(cell_count, "float64")

# Do an early exit if the cell count is null
if cell_count == 0:
return tr

gz_cache = getattr(self, "_ghost_zone_cache", None)
if gz_cache:
levels, cell_inds, file_inds, domains = gz_cache
level_inds, cell_inds, file_inds, domain_inds = gz_cache
else:
gz_cache = (
levels,
level_inds,
cell_inds,
file_inds,
domains,
domain_inds,
) = self.oct_handler.file_index_octs_with_ghost_zones(
selector, self.domain_id, cell_count, self._num_ghost_zones
)
self._ghost_zone_cache = gz_cache

# Initializing data container
for field in fields:
tr[field] = np.zeros(cell_count, "float64")
cpu_list = list(range(ncpu))
fill_hydro(
fd,
file_handler.offset,
file_handler.level_count,
cpu_list,
levels,
level_inds,
cell_inds,
file_inds,
ndim,
all_fields,
fields,
tr,
oct_handler,
domains=domains,
domain_inds=domain_inds,
)
return tr

Expand Down
89 changes: 58 additions & 31 deletions yt/frontends/ramses/io_utils.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# distutils: libraries = STD_LIBS
# distutils: include_dirs = LIB_DIR
cimport cython
from libc.stdio cimport SEEK_CUR, SEEK_SET
cimport numpy as np

import numpy as np
Expand All @@ -10,6 +11,7 @@ from yt.utilities.cython_fortran_utils cimport FortranFile

from yt.utilities.exceptions import YTIllDefinedAMRData


ctypedef np.int32_t INT32_t
ctypedef np.int64_t INT64_t
ctypedef np.float64_t DOUBLE_t
Expand All @@ -18,6 +20,11 @@ cdef int INT32_SIZE = sizeof(np.int32_t)
cdef int INT64_SIZE = sizeof(np.int64_t)
cdef int DOUBLE_SIZE = sizeof(np.float64_t)


cdef inline int skip_len(int Nskip, int record_len) noexcept nogil:
return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)


@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
Expand All @@ -29,10 +36,15 @@ def read_amr(FortranFile f, dict headers,

cdef INT64_t ncpu, nboundary, max_level, nlevelmax, ncpu_and_bound
cdef DOUBLE_t nx, ny, nz
cdef INT64_t ilevel, icpu, n, ndim, skip_len
cdef INT32_t ng, buffer_size
cdef INT64_t ilevel, icpu, n, ndim, jump_len
cdef INT32_t ng
cdef np.ndarray[np.int32_t, ndim=2] numbl
cdef np.ndarray[np.float64_t, ndim=2] pos
cdef int i

# The ordering is very important here, as we'll write directly into the memory
# address the content of the files.
cdef np.float64_t[::1, :] pos_view

ndim = headers['ndim']
numbl = headers['numbl']
Expand All @@ -43,17 +55,21 @@ def read_amr(FortranFile f, dict headers,

ncpu_and_bound = nboundary + ncpu

pos = np.empty((0, 3), dtype=np.float64)
buffer_size = 0
# Allocate more memory if required
pos = np.empty((max(numbl.max(), ngridbound.max()), 3), dtype="d", order="F")
pos_view = pos

# Compute number of fields to skip. This should be 31 in 3 dimensions
skip_len = (1 # father index
jump_len = (1 # father index
+ 2*ndim # neighbor index
+ 2**ndim # son index
+ 2**ndim # cpu map
+ 2**ndim # refinement map
)
# Initialize values
max_level = 0
cdef int record_len

for ilevel in range(nlevelmax):
for icpu in range(ncpu_and_bound):
if icpu < ncpu:
Expand All @@ -63,21 +79,24 @@ def read_amr(FortranFile f, dict headers,

if ng == 0:
continue
# Skip grid index, 'next' and 'prev' arrays (they are used
# to build the linked list in RAMSES)
f.skip(3)
# Skip grid index, 'next' and 'prev' arrays
record_len = (ng * INT32_SIZE + INT64_SIZE)
f.seek(record_len * 3, SEEK_CUR)

# Allocate more memory if required
if ng > buffer_size:
pos = np.empty((ng, 3), dtype="d")
buffer_size = ng
f.read_vector_inplace("d", <void*> &pos_view[0, 0])
f.read_vector_inplace("d", <void*> &pos_view[0, 1])
f.read_vector_inplace("d", <void*> &pos_view[0, 2])

pos[:ng, 0] = f.read_vector("d") - nx
pos[:ng, 1] = f.read_vector("d") - ny
pos[:ng, 2] = f.read_vector("d") - nz
for i in range(ng):
pos_view[i, 0] -= nx
for i in range(ng):
pos_view[i, 1] -= ny
for i in range(ng):
pos_view[i, 2] -= nz

# Skip father, neighbor, son, cpu map and refinement map
f.skip(skip_len)
f.seek(record_len * jump_len, SEEK_CUR)

# Note that we're adding *grids*, not individual cells.
if ilevel >= min_level:
n = oct_handler.add(icpu + 1, ilevel - min_level, pos[:ng, :],
Expand All @@ -87,10 +106,6 @@ def read_amr(FortranFile f, dict headers,

return max_level


cdef inline int skip_len(int Nskip, int record_len) noexcept nogil:
return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)

@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
Expand Down Expand Up @@ -138,7 +153,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
if ilevel >= min_level:
offset_view[icpu, ilevel - min_level] = f.tell()
level_count_view[icpu, ilevel - min_level] = <INT64_t> file_ncache
f.seek(skip_len(Nskip, file_ncache), 1)
f.seek(skip_len(Nskip, file_ncache), SEEK_CUR)

return offset, level_count

Expand All @@ -151,13 +166,13 @@ def fill_hydro(FortranFile f,
np.ndarray[np.int64_t, ndim=2] offsets,
np.ndarray[np.int64_t, ndim=2] level_count,
list cpu_enumerator,
np.ndarray[np.uint8_t, ndim=1] levels,
np.ndarray[np.uint8_t, ndim=1] level_inds,
np.ndarray[np.uint8_t, ndim=1] cell_inds,
np.ndarray[np.int64_t, ndim=1] file_inds,
INT64_t ndim, list all_fields, list fields,
dict tr,
RAMSESOctreeContainer oct_handler,
np.ndarray[np.int32_t, ndim=1] domains=np.array([], dtype='int32')):
np.ndarray[np.int32_t, ndim=1] domain_inds=np.array([], dtype='int32')):
cdef INT64_t offset
cdef dict tmp
cdef str field
Expand All @@ -174,8 +189,12 @@ def fill_hydro(FortranFile f,
cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64)

cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
cdef int jump_len
cdef np.ndarray[np.float64_t, ndim=3] buffer
cdef int jump_len, Ncells
cdef np.uint8_t[::1] mask_level = np.zeros(nlevels, dtype=np.uint8)

# The ordering is very important here, as we'll write directly into the memory
# address the content of the files.
cdef np.float64_t[::1, :, :] buffer

jump_len = 0
j = 0
Expand All @@ -190,9 +209,17 @@ def fill_hydro(FortranFile f,
cdef int first_field_index = jumps[0]

buffer = np.empty((level_count.max(), twotondim, nfields_selected), dtype="float64", order='F')

# Precompute which levels we need to read
Ncells = len(level_inds)
for i in range(Ncells):
mask_level[level_inds[i]] |= 1

# Loop over levels
for ilevel in range(nlevels):
# Loop over cpu domains
if mask_level[ilevel] == 0:
continue
# Loop over cpu domains. In most cases, we'll only read the CPU corresponding to the domain.
for ii in range(ncpu_selected):
icpu = cpu_list[ii]
nc = level_count[icpu, ilevel]
Expand All @@ -201,7 +228,7 @@ def fill_hydro(FortranFile f,
offset = offsets[icpu, ilevel]
if offset == -1:
continue
f.seek(offset + skip_len(first_field_index, nc))
f.seek(offset + skip_len(first_field_index, nc), SEEK_SET)

# We have already skipped the first fields (if any)
# so we "rewind" (this will cancel the first seek)
Expand All @@ -211,7 +238,7 @@ def fill_hydro(FortranFile f,
for j in range(nfields_selected):
jump_len += jumps[j]
if jump_len > 0:
f.seek(skip_len(jump_len, nc), 1)
f.seek(skip_len(jump_len, nc), SEEK_CUR)
jump_len = 0
f.read_vector_inplace('d', <void*> &buffer[0, i, j])

Expand All @@ -221,7 +248,7 @@ def fill_hydro(FortranFile f,
# but since we're doing an absolute seek at the beginning of
# the loop on CPUs, we can spare one seek here
## if jump_len > 0:
## f.seek(skip_len(jump_len, nc), 1)
## f.seek(skip_len(jump_len, nc), SEEK_CUR)

# Alias buffer into dictionary
tmp = {}
Expand All @@ -230,7 +257,7 @@ def fill_hydro(FortranFile f,

if ncpu_selected > 1:
oct_handler.fill_level_with_domain(
ilevel, levels, cell_inds, file_inds, domains, tr, tmp, domain=icpu+1)
ilevel, level_inds, cell_inds, file_inds, domain_inds, tr, tmp, domain=icpu+1)
else:
oct_handler.fill_level(
ilevel, levels, cell_inds, file_inds, tr, tmp)
ilevel, level_inds, cell_inds, file_inds, tr, tmp)
14 changes: 7 additions & 7 deletions yt/geometry/oct_container.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ cdef class OctreeContainer:
cpdef void fill_level(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.uint8_t[::1] level_inds,
const np.uint8_t[::1] cell_inds,
const np.int64_t[::1] file_inds,
dict dest_fields,
dict source_fields,
np.int64_t offset = ?
)
cpdef int fill_level_with_domain(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.int32_t[:] domains,
const np.uint8_t[::1] level_inds,
const np.uint8_t[::1] cell_inds,
const np.int64_t[::1] file_inds,
const np.int32_t[::1] domain_inds,
dict dest_fields,
dict source_fields,
const np.int32_t domain,
Expand Down
28 changes: 14 additions & 14 deletions yt/geometry/oct_container.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -745,15 +745,15 @@ cdef class OctreeContainer:
cpdef void fill_level(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.uint8_t[::1] levels,
const np.uint8_t[::1] cell_inds,
const np.int64_t[::1] file_inds,
dict dest_fields,
dict source_fields,
np.int64_t offset = 0
):
cdef np.ndarray[np.float64_t, ndim=2] source
cdef np.ndarray[np.float64_t, ndim=1] dest
cdef np.float64_t[:, :] source
cdef np.float64_t[::1] dest
cdef int i, lvl

for key in dest_fields:
Expand Down Expand Up @@ -831,10 +831,10 @@ cdef class OctreeContainer:
cpdef int fill_level_with_domain(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.int32_t[:] domains,
const np.uint8_t[::1] level_inds,
const np.uint8_t[::1] cell_inds,
const np.int64_t[::1] file_inds,
const np.int32_t[::1] domain_inds,
dict dest_fields,
dict source_fields,
const np.int32_t domain,
Expand All @@ -846,18 +846,18 @@ cdef class OctreeContainer:
These buffer oct cells have a different domain than the local one and
are usually not read, but one has to read them e.g. to compute ghost zones.
"""
cdef np.ndarray[np.float64_t, ndim=2] source
cdef np.ndarray[np.float64_t, ndim=1] dest
cdef np.float64_t[:, :] source
cdef np.float64_t[::1] dest
cdef int i, count, lev
cdef np.int32_t dom

for key in dest_fields:
dest = dest_fields[key]
source = source_fields[key]
count = 0
for i in range(levels.shape[0]):
lev = levels[i]
dom = domains[i]
for i in range(level_inds.shape[0]):
lev = level_inds[i]
dom = domain_inds[i]
if lev != level or dom != domain: continue
count += 1
if file_inds[i] < 0:
Expand Down

0 comments on commit f40a712

Please sign in to comment.