Python bindings for index writer #8

Open · wants to merge 74 commits into base: main

Commits (74)
30bd7fe
Add index_writer.py
atemerev Sep 10, 2024
cc72148
Add nanobind bindings for index writer C++ code and update CMake build
atemerev Sep 10, 2024
4950834
Nanobind CMake fixes
atemerev Sep 10, 2024
1fe3e1a
feat: add sample code to index_writer.py
atemerev Sep 11, 2024
b4754b5
CMake/nanobind fix for PIC build error
atemerev Sep 17, 2024
9e9a1e1
Index writer python bindings as index_writer_py
atemerev Sep 17, 2024
e088495
Remove include directories redundancy
atemerev Sep 23, 2024
e882b38
feat: Integrate neuron_parquet and index_writer projects into a singl…
atemerev Oct 3, 2024
b11054f
fix: Add HighFive library to CMakeLists.txt
atemerev Oct 3, 2024
6ac9203
fix: Remove unnecessary installation of Python module
atemerev Oct 3, 2024
15ddf73
fix: Add HighFive include directories and link HighFive library to in…
atemerev Oct 3, 2024
dd65b72
fix: Link MPI library to touch2parquet target
atemerev Oct 3, 2024
469a132
feat: Add MPI support to touch2parquet target
atemerev Oct 3, 2024
dbfc2a1
build: Update MPI linking for tests
atemerev Oct 3, 2024
5ccaaae
feat: Add MPI setup and teardown to test_indexing.cpp
atemerev Oct 3, 2024
094fef0
test indexing revert
atemerev Oct 3, 2024
f231482
feat: Add MPI initialization to index_writer.py
atemerev Oct 3, 2024
61e3cba
feat: Add MPI support to index bindings and writer
atemerev Oct 3, 2024
49e7edc
fix: Use correct method to extract MPI communicator from nanobind object
atemerev Oct 3, 2024
5f22072
fix: Use MPI4Py to extract MPI_Comm from Python object
atemerev Oct 3, 2024
c225eca
fix: Remove direct inclusion of mpi4py header and use PyCapsule API t…
atemerev Oct 3, 2024
2f0b112
fix: Replace deprecated HighFive::MPIOFileDriver with HighFive::MPIOF…
atemerev Oct 3, 2024
a64b026
fix: add MPI_INFO_NULL parameter to MPIOFileAccess constructor
atemerev Oct 3, 2024
d9e24d0
refactor: Create datasets without parallel access in index_writer.py
atemerev Oct 3, 2024
430bb0a
refactor: Remove parallel I/O for file creation
atemerev Oct 3, 2024
de41a36
fix: Remove rank participation message in file creation
atemerev Oct 3, 2024
acfc865
fix: Correct MPI communicator handling between Python and C++
atemerev Oct 3, 2024
db0a89c
fix: Remove direct inclusion of mpi4py header and use PyCapsule API t…
atemerev Oct 3, 2024
162e877
fix: Correctly handle MPI communicator in C++ bindings
atemerev Oct 3, 2024
bf27457
fix: Use correct MPI communicator in index_writer.py
atemerev Oct 3, 2024
c492579
fix: Handle MPI communicator correctly
atemerev Oct 3, 2024
0a4aa44
fix: Remove attempts to pass MPI communicator from Python
atemerev Oct 3, 2024
9d17694
Python added to CI build
atemerev Oct 8, 2024
250b692
Python3 dep to dockerfile
atemerev Oct 8, 2024
2c73722
nanobind dependency
atemerev Oct 8, 2024
305f876
nanobind dependency fix
atemerev Oct 8, 2024
42e2f3c
nanobind dependency fix2
atemerev Oct 8, 2024
14acdd5
H5 file driver not needed
atemerev Oct 8, 2024
1ab568a
feat: Port test_indexing.cpp to Python using the new bindings
atemerev Oct 10, 2024
bc2baaa
refactor: update test to use MPI communicator fixture
atemerev Oct 10, 2024
b3c839b
fix: Make the code path more deterministic
atemerev Oct 10, 2024
e4dc308
chore: Add tracing and logging to understand code behavior
atemerev Oct 10, 2024
d377355
fix: Ensure errors from C++ bindings are properly propagated and disp…
atemerev Oct 10, 2024
b215579
fix: Add verification step to ensure datasets are written correctly
atemerev Oct 10, 2024
a4a5843
fix: add GROUP constant definition to index_bindings.cpp
atemerev Oct 10, 2024
311cca7
refactor: Improve error handling in index_bindings.cpp
atemerev Oct 10, 2024
986b98e
build: Add more logging and synchronization to diagnose MPI issue
atemerev Oct 10, 2024
40fbae5
feat: Add more detailed logging to test_indexing and generate_data
atemerev Oct 10, 2024
329aa85
fix: Add timeout mechanism and detailed logging to index_writer.py
atemerev Oct 10, 2024
b35811f
fix: Add timeout mechanism for index_writer_py.write
atemerev Oct 10, 2024
3306d95
fix: Remove threading and ensure write_index completion before verifi…
atemerev Oct 10, 2024
e414888
fix: Add file existence check and improve error handling in write_index
atemerev Oct 10, 2024
918536f
fix: resolve compilation errors in index_bindings.cpp
atemerev Oct 10, 2024
f8a70c9
fix: Ensure file existence before calling index_write
atemerev Oct 10, 2024
132b3d1
fix: Ensure file visibility on all MPI ranks before proceeding with test
atemerev Oct 10, 2024
268574c
fix: Ensure file visibility on all MPI ranks
atemerev Oct 10, 2024
1b9b5ab
fix: Use MPI-IO for file creation and verification in generate_data f…
atemerev Oct 10, 2024
642cac1
fix: Create test file in current directory
atemerev Oct 10, 2024
62ebace
fix: Update SEARCH/REPLACE block to match existing content in tests/t…
atemerev Oct 10, 2024
076f499
fix: Use MPI-IO to open HDF5 file for parallel access
atemerev Oct 10, 2024
0e79024
fix: enable parallel HDF5 support for HighFive
atemerev Oct 10, 2024
9ce55eb
fix: replace deprecated `use_mpi_comm` with `add(HighFive::MPIOFileAc…
atemerev Oct 10, 2024
bff14e8
fix: Remove MPIO driver from Python test file
atemerev Oct 10, 2024
bca9a43
chore: remove std::cout debug logging from index_bindings
atemerev Oct 10, 2024
64a3f60
chore: remove debug logging from python test_indexing
atemerev Oct 10, 2024
5ec7f92
feat: run verification from all nodes
atemerev Oct 10, 2024
32c80a9
build: Exclude MPI test from regular test run and add separate job to…
atemerev Oct 10, 2024
2af1d9c
chore: install h5py and add build directory to PYTHONPATH in CI script
atemerev Oct 10, 2024
4f40a3b
build: Combine environment variable settings and mpirun command into …
atemerev Oct 10, 2024
71f11c5
build: Update MPI run options to -n 2 --oversubscribe
atemerev Oct 10, 2024
8a7f8a4
Cleanup, remove unnecessary comments etc
atemerev Oct 21, 2024
29c6ae4
Logging cleanup
atemerev Oct 21, 2024
147e402
Group path parameter to the API
atemerev Oct 24, 2024
8894292
init_mpi no longer needed
atemerev Oct 24, 2024
Files changed
11 changes: 9 additions & 2 deletions .github/workflows/test.yml
@@ -26,6 +26,9 @@ jobs:
run: |
sudo apt-get install -y cmake g++ libhdf5-openmpi-dev librange-v3-dev ninja-build nlohmann-json3-dev
sudo apt-get install -y libarrow-dev libparquet-dev
sudo apt-get install -y python3 python3-dev python3-pip
sudo apt-get install -y openmpi-bin libopenmpi-dev
pip3 install nanobind mpi4py h5py

- name: Configure and build
run: |
@@ -39,11 +42,15 @@
- name: Set up Python
uses: actions/setup-python@v5

- name: Small integration test
- name: Small integration test (excluding MPI test)
run: |
export PATH=$PATH:$PWD/install/bin
export PYTHONPATH=$PYTHONPATH:$PWD/build
python -mpip install -r tests/test_requirements.txt
python -mpytest tests
python -mpytest tests -k "not test_indexing"

- name: Run MPI test
run: PYTHONPATH=$PYTHONPATH:$PWD/build PATH=$PATH:$PWD/install/bin mpirun -n 2 --oversubscribe python -mpytest tests/test_indexing.py

build_docker_no_submodules:
runs-on: ubuntu-latest
47 changes: 39 additions & 8 deletions CMakeLists.txt
@@ -1,11 +1,13 @@
cmake_minimum_required(VERSION 3.1)
cmake_minimum_required(VERSION 3.15)
project(neuron_parquet VERSION 0.8.1)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Look for libraries in 'lib64' as well as in 'lib'
set_property(GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS ON)

# Determine version from git if possible
find_package(Git)
execute_process(
COMMAND ${GIT_EXECUTABLE} describe --tags
@@ -15,36 +17,65 @@ execute_process(
ERROR_QUIET)

if(ECODE)
message(
"Failed to determine version from git, using ${CMAKE_PROJECT_VERSION}")
message("Failed to determine version from git, using ${CMAKE_PROJECT_VERSION}")
set(NEURONPARQUET_VERSION ${CMAKE_PROJECT_VERSION})
endif()

# Find required packages
find_package(MPI REQUIRED)
Review comment (Member):
Suggested change (remove the added line):
# Find required packages

Comment is just reiterating what the code does, does not provide a why.

find_package(Arrow REQUIRED)

# Add MPI include directories and definitions
include_directories(SYSTEM ${MPI_INCLUDE_PATH})
add_definitions(${MPI_CXX_COMPILE_FLAGS})
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_CXX_LINK_FLAGS}")
add_definitions(-DOMPI_SKIP_MPICXX)
get_filename_component(MY_SEARCH_DIR ${Arrow_CONFIG} DIRECTORY)
Review comment (Member) on lines +28 to +32:
Please use MPI targets

Review comment (Member) on lines +27 to +32:
Suggested change (remove these lines):
# Add MPI include directories and definitions
include_directories(SYSTEM ${MPI_INCLUDE_PATH})
add_definitions(${MPI_CXX_COMPILE_FLAGS})
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_CXX_LINK_FLAGS}")
add_definitions(-DOMPI_SKIP_MPICXX)

Please link against the CMake targets for MPI. For example like this: https://github.com/BlueBrain/HighFive/blob/9d4c77e719dc3fcad77ba13d809ba2cdf9f845d2/cmake/HighFiveConfig.cmake#L15
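A minimal sketch of what target-based MPI linking could look like here (not part of the PR): it reuses the index_writer target introduced in this PR, while the COMPONENTS choice and visibility keywords are assumptions.

# Sketch only: let the imported MPI target carry include paths, compile
# flags and link flags instead of the global directives above.
find_package(MPI REQUIRED COMPONENTS C)

# OMPI_SKIP_MPICXX only affects translation units that include <mpi.h>,
# so it can be attached per target rather than via add_definitions().
target_compile_definitions(index_writer PRIVATE OMPI_SKIP_MPICXX)
target_link_libraries(index_writer PRIVATE MPI::MPI_C HighFive HDF5::HDF5)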

find_package(Parquet REQUIRED HINTS ${MY_SEARCH_DIR})
find_package(HDF5 REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(Range-v3 REQUIRED)

find_package(HighFive)
find_package(CLI11)
find_package(Catch2)
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)

if(NOT HighFive_FOUND)
set(HDF5_IS_PARALLEL ON CACHE BOOL "Enable parallel HDF5" FORCE)
add_subdirectory(${CMAKE_SOURCE_DIR}/deps/highfive)
endif()

find_package(CLI11)
if(NOT CLI11_FOUND)
add_subdirectory(${CMAKE_SOURCE_DIR}/deps/cli11)
endif()

add_subdirectory(src)

find_package(Catch2)
if(NOT ${Catch2_FOUND})
add_subdirectory(deps/catch2 EXCLUDE_FROM_ALL)
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/deps/catch2/extras)
endif()

execute_process(
COMMAND "${Python_EXECUTABLE}" -m nanobind --cmake_dir
OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE nanobind_ROOT)
find_package(nanobind CONFIG REQUIRED)

Review comment (Member) on lines +57 to +60:
We normally vendor submodules for bindings, with a parameter to specify if the bindings are built.

For example here: https://github.com/BlueBrain/MorphIO/blob/master/CMakeLists.txt#L10
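A rough sketch of that vendoring pattern, assuming nanobind were added as a git submodule under deps/nanobind and gated by a build option; the option name and submodule path are hypothetical, not part of this PR.

# Sketch only: option-gated bindings build with a vendored nanobind submodule.
option(NEURONPARQUET_BUILD_PYTHON_BINDINGS "Build the Python index writer bindings" ON)

if(NEURONPARQUET_BUILD_PYTHON_BINDINGS)
    find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
    # The submodule replaces the `python -m nanobind --cmake_dir` lookup below.
    add_subdirectory(${CMAKE_SOURCE_DIR}/deps/nanobind EXCLUDE_FROM_ALL)
    nanobind_add_module(index_writer_py src/index/index_bindings.cpp)
    target_link_libraries(index_writer_py PRIVATE index_writer HighFive)
endif()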

add_subdirectory(src)

enable_testing()
add_subdirectory(tests)

add_library(index_writer STATIC
src/index/index.cpp
)

set_target_properties(index_writer PROPERTIES POSITION_INDEPENDENT_CODE ON)

target_link_libraries(index_writer PRIVATE HighFive HDF5::HDF5)

nanobind_add_module(index_writer_py src/index/index_bindings.cpp)

target_include_directories(index_writer_py PRIVATE ${HighFive_INCLUDE_DIRS})
target_link_libraries(index_writer_py PRIVATE index_writer HighFive)

# Install the Python module
#install(TARGETS index_writer_py DESTINATION .)
5 changes: 5 additions & 0 deletions Dockerfile
@@ -13,6 +13,9 @@ RUN apt-get update \
lsb-release \
ninja-build \
nlohmann-json3-dev \
python3 \
python3-dev \
python3-pip \
wget
RUN wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb \
&& apt-get install -y ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb \
@@ -21,6 +24,8 @@ RUN wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short |
libarrow-dev \
libparquet-dev

RUN pip3 install nanobind --break-system-packages

VOLUME /highfive
RUN git clone https://github.com/BlueBrain/HighFive /highfive/src \
&& cmake -B /highfive/build -S /highfive/src -DCMAKE_INSTALL_PREFIX=/highfive/install -DHIGHFIVE_UNIT_TESTS=OFF -DHIGHFIVE_EXAMPLES=OFF -DHIGHFIVE_BUILD_DOCS=OFF \
34 changes: 34 additions & 0 deletions python/index_writer.py
@@ -0,0 +1,34 @@
import index_writer_py
from mpi4py import MPI
import logging
import sys
import traceback

logger = logging.getLogger(__name__)

def write_index(filename, group_path, source_node_count, target_node_count):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
logger.info(f"Rank {rank}/{size}: Starting write_index")
try:
logger.info(
f"Rank {rank}/{size}: Filename: {filename}, Source node count: {source_node_count}, Target node count: {target_node_count}")

start_time = MPI.Wtime()
index_writer_py.write(filename, group_path, source_node_count, target_node_count)
end_time = MPI.Wtime()

logger.info(
f"Rank {rank}/{size}: After index_writer_py.write, execution time: {end_time - start_time:.2f} seconds")

# Ensure all processes have completed writing
comm.Barrier()
logger.info(f"Rank {rank}/{size}: After final barrier")
except Exception as e:
logger.error(f"Rank {rank}/{size}: Error writing index: {e}")
logger.error(traceback.format_exc())
raise
finally:
logger.info(f"Rank {rank}/{size}: Completed write_index")
sys.stdout.flush()
3 changes: 2 additions & 1 deletion src/CMakeLists.txt
@@ -35,7 +35,8 @@ target_compile_options(CircuitParquet PRIVATE -Werror=unused-result)
add_executable(touch2parquet touch2parquet.cpp)
target_link_libraries(touch2parquet
TouchParquet
CLI11::CLI11)
CLI11::CLI11
MPI::MPI_CXX)

add_executable(parquet2hdf5 parquet2hdf5.cpp)
target_link_libraries(parquet2hdf5
45 changes: 45 additions & 0 deletions src/index/index_bindings.cpp
Review comment (Member):
Please move this to, e.g., bindings/indexing.cpp - and move the associated CMake code in the same directory.
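If the sources were moved as suggested, the associated CMake wiring might look roughly like this; the bindings/ layout and file names only echo the suggestion above and are hypothetical.

# Sketch only: top-level CMakeLists.txt
add_subdirectory(bindings)

# Sketch only: bindings/CMakeLists.txt
nanobind_add_module(index_writer_py indexing.cpp)
target_link_libraries(index_writer_py PRIVATE index_writer HighFive)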

@@ -0,0 +1,45 @@
#include <nanobind/nanobind.h>
#include <nanobind/stl/string.h>
#include <highfive/H5File.hpp>
#include <mpi.h>
#include "index.h"
#include <filesystem>

namespace nb = nanobind;

void write_index(const std::string& filename, const std::string& group_path, uint64_t sourceNodeCount, uint64_t targetNodeCount) {
try {
// Check if the file exists
if (!std::filesystem::exists(filename)) {
throw std::runtime_error("File does not exist: " + filename);
}

// Open the file in read-write mode with MPI-IO
HighFive::FileAccessProps fapl;
fapl.add(HighFive::MPIOFileAccess(MPI_COMM_WORLD, MPI_INFO_NULL));
HighFive::File file(filename, HighFive::File::ReadWrite, fapl);

if (!file.exist(group_path)) {
throw std::runtime_error("Group '" + std::string(group_path) + "' not found in file");
}

HighFive::Group group = file.getGroup(group_path);

if (!group.exist("source_node_id") || !group.exist("target_node_id")) {
throw std::runtime_error("Required datasets 'source_node_id' or 'target_node_id' not found in group '" + std::string(group_path) + "'");
}

indexing::write(group, sourceNodeCount, targetNodeCount);
} catch (const HighFive::Exception& e) {
throw std::runtime_error("HighFive error in write_index: " + std::string(e.what()));
} catch (const std::exception& e) {
throw std::runtime_error("Error in write_index: " + std::string(e.what()));
} catch (...) {
throw std::runtime_error("Unknown error in write_index");
}
}

NB_MODULE(index_writer_py, m) {
m.def("write", &write_index, "Write index to HDF5 file using parallel I/O",
nb::arg("filename"), nb::arg("groupPath"), nb::arg("sourceNodeCount"), nb::arg("targetNodeCount"));
}
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
@@ -44,9 +44,10 @@ set_tests_properties(touches_conversion_v2 parquet_conversion_v2

add_executable(test_indexing test_indexing.cpp
${${PROJECT_NAME}_SOURCE_DIR}/src/index/index.cpp)
target_link_libraries(test_indexing Catch2::Catch2WithMain HighFive MPI::MPI_C)
target_link_libraries(test_indexing Catch2::Catch2WithMain HighFive MPI::MPI_CXX)
Review comment (Member):
Suggested change:
- target_link_libraries(test_indexing Catch2::Catch2WithMain HighFive MPI::MPI_CXX)
+ target_link_libraries(test_indexing Catch2::Catch2WithMain HighFive MPI::MPI_C)

We don't use C++ MPI features normally, so linking against the C target should be sufficient. Did you encounter any problems using the C target?

target_include_directories(
test_indexing PRIVATE $<BUILD_INTERFACE:${${PROJECT_NAME}_SOURCE_DIR}/src>)
target_compile_definitions(test_indexing PRIVATE OMPI_SKIP_MPICXX)
Review comment (Member):
Why is this needed? I've never seen the need for this feature.


include(CTest)
include(Catch)
85 changes: 85 additions & 0 deletions tests/test_indexing.py
@@ -0,0 +1,85 @@
import pytest
import h5py
import numpy as np
from mpi4py import MPI
import index_writer_py

NNODES = 10
SOURCE_OFFSET = 90
GROUP = "data"

@pytest.fixture(scope="module")
def mpi_comm():
return MPI.COMM_WORLD

def generate_data(base, rank, comm):
if rank == 0:
source_ids = np.repeat(np.arange(SOURCE_OFFSET, SOURCE_OFFSET + NNODES), NNODES)
target_ids = np.tile(np.arange(NNODES), NNODES)

with h5py.File(base, 'w') as file:
g = file.create_group(GROUP)
g.create_dataset("source_node_id", data=source_ids)
g.create_dataset("target_node_id", data=target_ids)

# Verify the file exists and contains the expected datasets
with h5py.File(base, 'r') as file:
g = file[GROUP]
assert "source_node_id" in g, "source_node_id dataset not found"
assert "target_node_id" in g, "target_node_id dataset not found"

comm.Barrier()

def test_indexing(mpi_comm):
rank = mpi_comm.Get_rank()
base = "index_test.h5"

# Step 1: Create and write to file using all ranks
generate_data(base, rank, mpi_comm)

# Step 2: Call index writer from all nodes
index_writer_py.write(base, GROUP, SOURCE_OFFSET + NNODES, NNODES)

# Ensure all processes have completed writing before verification
mpi_comm.Barrier()

# Step 3: Verify from all ranks
with h5py.File(base, 'r') as f:
g = f[GROUP]
gidx = g['indices']
sidx = gidx['source_to_target']
tidx = gidx['target_to_source']

source_ranges = sidx['node_id_to_ranges'][:]
source_edges = sidx['range_to_edge_id'][:]
target_ranges = tidx['node_id_to_ranges'][:]
target_edges = tidx['range_to_edge_id'][:]

# Check index sizes
assert len(source_ranges) == SOURCE_OFFSET + NNODES
assert len(target_ranges) == NNODES
assert len(source_edges) == NNODES
assert len(target_edges) == NNODES * NNODES

# Check source index
for i in range(SOURCE_OFFSET):
assert source_ranges[i][0] == 0
assert source_ranges[i][1] == 0

for i in range(NNODES):
assert source_ranges[SOURCE_OFFSET + i][0] == i
assert source_ranges[SOURCE_OFFSET + i][1] == i + 1
assert source_edges[i][0] == NNODES * i
assert source_edges[i][1] == NNODES * (i + 1)

# Check target index
for i in range(NNODES):
assert target_ranges[i][0] == NNODES * i
assert target_ranges[i][1] == NNODES * (i + 1)

for j in range(NNODES):
assert target_edges[NNODES * i + j][0] == NNODES * j + i
assert target_edges[NNODES * i + j][1] == NNODES * j + i + 1

# Ensure all processes have completed verification
mpi_comm.Barrier()