-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Python bindings for index writer #8
base: main
Are you sure you want to change the base?
Changes from all commits
30bd7fe
cc72148
4950834
1fe3e1a
b4754b5
9e9a1e1
e088495
e882b38
b11054f
6ac9203
15ddf73
dd65b72
469a132
dbfc2a1
5ccaaae
094fef0
f231482
61e3cba
49e7edc
5f22072
c225eca
2f0b112
a64b026
d9e24d0
430bb0a
de41a36
acfc865
db0a89c
162e877
bf27457
c492579
0a4aa44
9d17694
250b692
2c73722
305f876
42e2f3c
14acdd5
1ab568a
bc2baaa
b3c839b
e4dc308
d377355
b215579
a4a5843
311cca7
986b98e
40fbae5
329aa85
b35811f
3306d95
e414888
918536f
f8a70c9
132b3d1
268574c
1b9b5ab
642cac1
62ebace
076f499
0e79024
9ce55eb
bff14e8
bca9a43
64a3f60
5ec7f92
32c80a9
2af1d9c
4f40a3b
71f11c5
8a7f8a4
29c6ae4
147e402
8894292
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
cmake_minimum_required(VERSION 3.15)
project(neuron_parquet VERSION 0.8.1)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Look for libraries in 'lib64' as well as in 'lib'
set_property(GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS ON)

# Determine version from git if possible
find_package(Git)
# NOTE(review): middle arguments of this call were hidden in the diff hunk;
# RESULT_VARIABLE/OUTPUT_VARIABLE reconstructed from the if(ECODE) use below —
# confirm against the original file.
execute_process(
  COMMAND ${GIT_EXECUTABLE} describe --tags
  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
  RESULT_VARIABLE ECODE
  OUTPUT_VARIABLE NEURONPARQUET_VERSION
  OUTPUT_STRIP_TRAILING_WHITESPACE
  ERROR_QUIET)

if(ECODE)
  # Non-zero exit code: not a git checkout (e.g. release tarball); fall back
  # to the version declared in project().
  message(STATUS "Failed to determine version from git, using ${CMAKE_PROJECT_VERSION}")
  set(NEURONPARQUET_VERSION ${CMAKE_PROJECT_VERSION})
endif()

# Find required packages
find_package(MPI REQUIRED)
find_package(Arrow REQUIRED)

# NOTE(review): directory-scoped MPI wiring kept for the targets under src/,
# but new targets below link the imported MPI::MPI_C target instead (carries
# includes, defines and link flags itself) — migrate src/ and drop these.
include_directories(SYSTEM ${MPI_INCLUDE_PATH})
add_definitions(${MPI_CXX_COMPILE_FLAGS})
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_CXX_LINK_FLAGS}")
# Keep <mpi.h> from dragging in Open MPI's deprecated C++ bindings.
add_definitions(-DOMPI_SKIP_MPICXX)

# Parquet's CMake config is installed next to Arrow's; search there first.
get_filename_component(MY_SEARCH_DIR ${Arrow_CONFIG} DIRECTORY)
find_package(Parquet REQUIRED HINTS ${MY_SEARCH_DIR})
find_package(HDF5 REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(Range-v3 REQUIRED)
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)

# Optional dependencies fall back to vendored submodules when not found.
find_package(HighFive)
if(NOT HighFive_FOUND)
  # NOTE(review): FORCE-setting HDF5_IS_PARALLEL assumes the HDF5 found above
  # really is an MPI build — confirm, or let HighFive detect this itself.
  set(HDF5_IS_PARALLEL ON CACHE BOOL "Enable parallel HDF5" FORCE)
  add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/deps/highfive)
endif()

find_package(CLI11)
if(NOT CLI11_FOUND)
  add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/deps/cli11)
endif()

find_package(Catch2)
if(NOT Catch2_FOUND)
  # EXCLUDE_FROM_ALL: only build Catch2 when a test target needs it.
  add_subdirectory(deps/catch2 EXCLUDE_FROM_ALL)
  list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/deps/catch2/extras)
endif()

# Locate nanobind's CMake package via the active Python interpreter so the
# bindings build against the same Python that installed nanobind.
execute_process(
  COMMAND "${Python_EXECUTABLE}" -m nanobind --cmake_dir
  OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE nanobind_ROOT)
find_package(nanobind CONFIG REQUIRED)

add_subdirectory(src)

enable_testing()
add_subdirectory(tests)

# Core index-writing code, built once and reused by the Python module.
add_library(index_writer STATIC
  src/index/index.cpp
)

# PIC is required because this static archive is linked into a shared module.
set_target_properties(index_writer PROPERTIES POSITION_INDEPENDENT_CODE ON)

target_link_libraries(index_writer PRIVATE HighFive HDF5::HDF5 MPI::MPI_C)

nanobind_add_module(index_writer_py src/index/index_bindings.cpp)

target_include_directories(index_writer_py PRIVATE ${HighFive_INCLUDE_DIRS})
target_link_libraries(index_writer_py PRIVATE index_writer HighFive)

# Install the Python module
# NOTE(review): install destination still undecided; left disabled on purpose.
# install(TARGETS index_writer_py DESTINATION .)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import index_writer_py | ||
from mpi4py import MPI | ||
import logging | ||
import sys | ||
import traceback | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
def write_index(filename, group_path, source_node_count, target_node_count):
    """Write the edge index for one HDF5 group via the native writer.

    Collective over ``MPI.COMM_WORLD``: every rank must call this, since the
    native module opens the file with MPI-IO.

    :param filename: path to the HDF5 edge file (must already exist)
    :param group_path: HDF5 group holding source_node_id/target_node_id
    :param source_node_count: number of source nodes to index
    :param target_node_count: number of target nodes to index
    :raises: re-raises any exception from the native writer after logging it
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    logger.info(f"Rank {rank}/{size}: Starting write_index")
    try:
        # Fix: the filename value was missing from this log line.
        logger.info(
            f"Rank {rank}/{size}: Filename: {filename}, "
            f"Source node count: {source_node_count}, "
            f"Target node count: {target_node_count}")

        start_time = MPI.Wtime()
        index_writer_py.write(filename, group_path, source_node_count, target_node_count)
        end_time = MPI.Wtime()

        logger.info(
            f"Rank {rank}/{size}: After index_writer_py.write, "
            f"execution time: {end_time - start_time:.2f} seconds")

        # Ensure all processes have completed writing before any caller reads
        comm.Barrier()
        logger.info(f"Rank {rank}/{size}: After final barrier")
    except Exception as e:
        logger.error(f"Rank {rank}/{size}: Error writing index: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        logger.info(f"Rank {rank}/{size}: Completed write_index")
        # Flush so interleaved per-rank output is visible in MPI launchers
        sys.stdout.flush()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move this to, e.g., |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
#include <nanobind/nanobind.h> | ||
#include <nanobind/stl/string.h> | ||
#include <highfive/H5File.hpp> | ||
#include <mpi.h> | ||
#include "index.h" | ||
#include <filesystem> | ||
|
||
namespace nb = nanobind; | ||
|
||
void write_index(const std::string& filename, const std::string& group_path, uint64_t sourceNodeCount, uint64_t targetNodeCount) { | ||
try { | ||
// Check if the file exists | ||
if (!std::filesystem::exists(filename)) { | ||
throw std::runtime_error("File does not exist: " + filename); | ||
} | ||
|
||
// Open the file in read-write mode with MPI-IO | ||
HighFive::FileAccessProps fapl; | ||
fapl.add(HighFive::MPIOFileAccess(MPI_COMM_WORLD, MPI_INFO_NULL)); | ||
HighFive::File file(filename, HighFive::File::ReadWrite, fapl); | ||
|
||
if (!file.exist(group_path)) { | ||
throw std::runtime_error("Group '" + std::string(group_path) + "' not found in file"); | ||
} | ||
|
||
HighFive::Group group = file.getGroup(group_path); | ||
|
||
if (!group.exist("source_node_id") || !group.exist("target_node_id")) { | ||
throw std::runtime_error("Required datasets 'source_node_id' or 'target_node_id' not found in group '" + std::string(group_path) + "'"); | ||
} | ||
|
||
indexing::write(group, sourceNodeCount, targetNodeCount); | ||
} catch (const HighFive::Exception& e) { | ||
throw std::runtime_error("HighFive error in write_index: " + std::string(e.what())); | ||
} catch (const std::exception& e) { | ||
throw std::runtime_error("Error in write_index: " + std::string(e.what())); | ||
} catch (...) { | ||
throw std::runtime_error("Unknown error in write_index"); | ||
} | ||
} | ||
|
||
// Python module definition: exposes write_index as index_writer_py.write.
// Keyword-argument names here form the Python-facing API; keep them stable.
NB_MODULE(index_writer_py, m) {
    m.def("write", &write_index, "Write index to HDF5 file using parallel I/O",
          nb::arg("filename"), nb::arg("groupPath"), nb::arg("sourceNodeCount"), nb::arg("targetNodeCount"));
}
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -44,9 +44,10 @@ set_tests_properties(touches_conversion_v2 parquet_conversion_v2 | |||||
|
||||||
# Unit tests for the index writer; compiles index.cpp directly into the test
# binary rather than linking the static library.
add_executable(test_indexing test_indexing.cpp
               ${${PROJECT_NAME}_SOURCE_DIR}/src/index/index.cpp)
# Link the C MPI target (reviewer request): only the C API is used, so
# MPI::MPI_CXX is unnecessary.
target_link_libraries(test_indexing Catch2::Catch2WithMain HighFive MPI::MPI_C)

target_include_directories(
  test_indexing PRIVATE $<BUILD_INTERFACE:${${PROJECT_NAME}_SOURCE_DIR}/src>)
# Stops <mpi.h> from pulling in Open MPI's deprecated C++ bindings, which
# otherwise require linking the C++ MPI library.
target_compile_definitions(test_indexing PRIVATE OMPI_SKIP_MPICXX)

include(CTest)
include(Catch)
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import pytest | ||
import h5py | ||
import numpy as np | ||
from mpi4py import MPI | ||
import index_writer_py | ||
|
||
NNODES = 10 | ||
SOURCE_OFFSET = 90 | ||
GROUP = "data" | ||
|
||
@pytest.fixture(scope="module")
def mpi_comm():
    # Module-scoped so all tests in this file share the world communicator.
    return MPI.COMM_WORLD
|
||
def generate_data(base, rank, comm):
    # Create a dense NNODES x NNODES edge population: every source node
    # (ids offset by SOURCE_OFFSET) connects to every target node, so the
    # expected index contents are fully predictable in test_indexing.
    # Only rank 0 writes; the final barrier releases the other ranks once
    # the file is complete.
    # NOTE(review): indentation reconstructed from a mangled diff — the
    # read-back verification is assumed to run on rank 0 only, before the
    # barrier; confirm against the original file.
    if rank == 0:
        source_ids = np.repeat(np.arange(SOURCE_OFFSET, SOURCE_OFFSET + NNODES), NNODES)
        target_ids = np.tile(np.arange(NNODES), NNODES)

        with h5py.File(base, 'w') as file:
            g = file.create_group(GROUP)
            g.create_dataset("source_node_id", data=source_ids)
            g.create_dataset("target_node_id", data=target_ids)

        # Verify the file exists and contains the expected datasets
        with h5py.File(base, 'r') as file:
            g = file[GROUP]
            assert "source_node_id" in g, "source_node_id dataset not found"
            assert "target_node_id" in g, "target_node_id dataset not found"

    comm.Barrier()
|
||
def test_indexing(mpi_comm):
    # End-to-end check of the native index writer against the dense
    # all-to-all edge population produced by generate_data().
    rank = mpi_comm.Get_rank()
    base = "index_test.h5"

    # Step 1: Create and write to file using all ranks
    generate_data(base, rank, mpi_comm)

    # Step 2: the writer opens the file with MPI-IO, so the call is
    # collective — every rank must participate.
    index_writer_py.write(base, GROUP, SOURCE_OFFSET + NNODES, NNODES)

    # Ensure all processes have completed writing before verification
    mpi_comm.Barrier()

    # Step 3: Verify from all ranks
    with h5py.File(base, 'r') as f:
        g = f[GROUP]
        gidx = g['indices']
        sidx = gidx['source_to_target']
        tidx = gidx['target_to_source']

        source_ranges = sidx['node_id_to_ranges'][:]
        source_edges = sidx['range_to_edge_id'][:]
        target_ranges = tidx['node_id_to_ranges'][:]
        target_edges = tidx['target_to_source' and 'range_to_edge_id'][:]

        # Check index sizes
        assert len(source_ranges) == SOURCE_OFFSET + NNODES
        assert len(target_ranges) == NNODES
        assert len(source_edges) == NNODES
        assert len(target_edges) == NNODES * NNODES

        # Node ids below SOURCE_OFFSET have no outgoing edges, so their
        # range entries must be empty ([0, 0)).
        for i in range(SOURCE_OFFSET):
            assert source_ranges[i][0] == 0
            assert source_ranges[i][1] == 0

        # Edges are sorted by source, so each real source node owns exactly
        # one contiguous range of NNODES consecutive edge ids.
        for i in range(NNODES):
            assert source_ranges[SOURCE_OFFSET + i][0] == i
            assert source_ranges[SOURCE_OFFSET + i][1] == i + 1
            assert source_edges[i][0] == NNODES * i
            assert source_edges[i][1] == NNODES * (i + 1)

        # Each target node appears once per source block, giving NNODES
        # single-edge ranges per target, strided NNODES apart in edge ids.
        for i in range(NNODES):
            assert target_ranges[i][0] == NNODES * i
            assert target_ranges[i][1] == NNODES * (i + 1)

            for j in range(NNODES):
                assert target_edges[NNODES * i + j][0] == NNODES * j + i
                assert target_edges[NNODES * i + j][1] == NNODES * j + i + 1

    # Ranks must not tear down (or rerun with a new file) while others are
    # still reading.
    mpi_comm.Barrier()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is just reiterating what the code does, does not provide a why.