From b84e56773440359a63a31939525e9c67a79cecf0 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Tue, 18 Jun 2024 13:36:50 +0100 Subject: [PATCH] better MPI management and possibility to use Flock files in bedrock-query and bedrock-shutdown --- CMakeLists.txt | 1 + bin/bedrock-query.cpp | 30 +++++++++++++++++++++-- bin/bedrock-shutdown.cpp | 31 ++++++++++++++++++++--- src/CMakeLists.txt | 3 ++- src/MPI.cpp | 15 ++++++++++++ src/MPI.hpp | 53 ++++++++++++++++++++++++++++++++++++++++ src/SSGManager.cpp | 9 ------- src/SSGManagerImpl.hpp | 3 --- src/Server.cpp | 32 +++--------------------- src/ServerImpl.hpp | 2 ++ 10 files changed, 132 insertions(+), 47 deletions(-) create mode 100644 src/MPI.cpp create mode 100644 src/MPI.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b6a689a..1eb1230 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,7 @@ set (BEDROCK_VERSION_MINOR 11) set (BEDROCK_VERSION_PATCH 0) set (BEDROCK_VERSION "${BEDROCK_VERSION_MAJOR}.${BEDROCK_VERSION_MINOR}.${BEDROCK_VERSION_PATCH}") +add_definitions ("-DBEDROCK_VERSION=${BEDROCK_VERSION}") # add our cmake module directory to the path set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} diff --git a/bin/bedrock-query.cpp b/bin/bedrock-query.cpp index 4ea195d..d39bcbc 100644 --- a/bin/bedrock-query.cpp +++ b/bin/bedrock-query.cpp @@ -21,12 +21,13 @@ static std::string g_protocol; static std::vector g_addresses; static std::string g_log_level; static std::string g_ssg_file; +static std::string g_flock_file; static std::string g_jx9_file; static std::string g_jx9_script_content; static uint16_t g_provider_id; static bool g_pretty; -static void parseCommandLine(int argc, char** argv); +static void parseCommandLine(int argc, char** argv); int main(int argc, char** argv) { parseCommandLine(argc, argv); @@ -50,6 +51,23 @@ int main(int argc, char** argv) { ssg_init(); #endif + if(!g_flock_file.empty()) { + json flock_file_content; + std::ifstream flock_file{g_flock_file}; + if(!flock_file.good()) { + spdlog::critical("Could not open flock file {}", g_flock_file); + exit(-1); + } + flock_file >> flock_file_content; + std::unordered_set addresses; + for(auto& member : flock_file_content["members"]) { + auto& address = member["address"].get_ref(); + if(addresses.count(address)) continue; + g_addresses.push_back(address); + addresses.insert(address); + } + } + bedrock::Client client(engine); auto sgh = g_ssg_file.empty() ? client.makeServiceGroupHandle(g_addresses, g_provider_id) @@ -69,9 +87,11 @@ int main(int argc, char** argv) { } static void parseCommandLine(int argc, char** argv) { + #define VALUE(string) #string + #define TO_LITERAL(string) VALUE(string) try { TCLAP::CmdLine cmd("Query the configuration from Bedrock daemons", ' ', - "0.6.0"); + TO_LITERAL(BEDROCK_VERSION)); TCLAP::UnlabeledValueArg protocol( "protocol", "Protocol (e.g. ofi+tcp)", true, "na+sm", "protocol"); TCLAP::ValueArg logLevel( @@ -82,6 +102,10 @@ static void parseCommandLine(int argc, char** argv) { "i", "provider-id", "Provider id to use when contacting Bedrock daemons", false, 0, "int"); + TCLAP::ValueArg flockFile( + "f", "flock-file", + "Flock file from which to read addresses of Bedrock daemons", false, + "", "filename"); #ifdef ENABLE_SSG TCLAP::ValueArg ssgFile( "s", "ssg-file", @@ -97,6 +121,7 @@ static void parseCommandLine(int argc, char** argv) { false); cmd.add(protocol); cmd.add(logLevel); + cmd.add(flockFile); #ifdef ENABLE_SSG cmd.add(ssgFile); #endif @@ -107,6 +132,7 @@ static void parseCommandLine(int argc, char** argv) { cmd.parse(argc, argv); g_addresses = addresses.getValue(); g_log_level = logLevel.getValue(); + g_flock_file = flockFile.getValue(); #ifdef ENABLE_SSG g_ssg_file = ssgFile.getValue(); #endif diff --git a/bin/bedrock-shutdown.cpp b/bin/bedrock-shutdown.cpp index 22f9e0a..70705ce 100644 --- a/bin/bedrock-shutdown.cpp +++ b/bin/bedrock-shutdown.cpp @@ -20,9 +20,10 @@ static std::string g_protocol; static std::vector g_addresses; static std::string g_log_level; static std::string g_ssg_file; +static std::string g_flock_file; -static void parseCommandLine(int argc, char** argv); -static void resolveSSGAddresses(thallium::engine& engine); +static void parseCommandLine(int argc, char** argv); +static void resolveSSGAddresses(thallium::engine& engine); int main(int argc, char** argv) { parseCommandLine(argc, argv); @@ -50,9 +51,11 @@ int main(int argc, char** argv) { } static void parseCommandLine(int argc, char** argv) { + #define VALUE(string) #string + #define TO_LITERAL(string) VALUE(string) try { TCLAP::CmdLine cmd("Query the configuration from Bedrock daemons", ' ', - "0.4"); + TO_LITERAL(BEDROCK_VERSION)); TCLAP::UnlabeledValueArg protocol( "protocol", "Protocol (e.g. ofi+tcp)", true, "na+sm", "protocol"); TCLAP::ValueArg logLevel( @@ -63,17 +66,23 @@ static void parseCommandLine(int argc, char** argv) { "s", "ssg-file", "SSG file from which to read addresses of Bedrock daemons", false, "", "filename"); + TCLAP::ValueArg flockFile( + "f", "flock-file", + "Flock file from which to read addresses of Bedrock daemons", false, + "", "filename"); TCLAP::MultiArg addresses( "a", "addresses", "Address of a Bedrock daemon", false, "address"); TCLAP::SwitchArg prettyJSON("p", "pretty", "Print human-readable JSON", false); cmd.add(protocol); cmd.add(logLevel); + cmd.add(flockFile); cmd.add(ssgFile); cmd.add(addresses); cmd.parse(argc, argv); g_addresses = addresses.getValue(); g_log_level = logLevel.getValue(); + g_flock_file = flockFile.getValue(); g_ssg_file = ssgFile.getValue(); g_protocol = protocol.getValue(); } catch (TCLAP::ArgException& e) { @@ -84,6 +93,22 @@ static void parseCommandLine(int argc, char** argv) { } static void resolveSSGAddresses(thallium::engine& engine) { + if(!g_flock_file.empty()) { + json flock_file_content; + std::ifstream flock_file{g_flock_file}; + if(!flock_file.good()) { + spdlog::critical("Could not open flock file {}", g_flock_file); + exit(-1); + } + flock_file >> flock_file_content; + std::unordered_set addresses; + for(auto& member : flock_file_content["members"]) { + auto& address = member["address"].get_ref(); + if(addresses.count(address)) continue; + g_addresses.push_back(address); + addresses.insert(address); + } + } if (g_ssg_file.empty()) return; #ifndef ENABLE_SSG (void)engine; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5998224..9fc9abf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,7 +12,8 @@ set (server-src-files Module.cpp MargoLogging.cpp NamedDependency.cpp - Jx9Manager.cpp) + Jx9Manager.cpp + MPI.cpp) set (client-src-files Client.cpp diff --git a/src/MPI.cpp b/src/MPI.cpp new file mode 100644 index 0000000..6e6a03c --- /dev/null +++ b/src/MPI.cpp @@ -0,0 +1,15 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include "MPI.hpp" + +namespace bedrock { + +#ifdef ENABLE_MPI +bool MPI::s_initialized_mpi; +size_t MPI::s_initiaze_mpi_count; +#endif + +} // namespace bedrock diff --git a/src/MPI.hpp b/src/MPI.hpp new file mode 100644 index 0000000..6095522 --- /dev/null +++ b/src/MPI.hpp @@ -0,0 +1,53 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __BEDROCK_MPI_H +#define __BEDROCK_MPI_H + +#include +#ifdef ENABLE_MPI +#include +#endif + +namespace bedrock { + +class MPI { + +#ifdef ENABLE_MPI + static bool s_initialized_mpi; + static size_t s_initiaze_mpi_count; +#endif + + public: + + MPI() { +#ifdef ENABLE_MPI + int mpi_is_initialized; + MPI_Initialized(&mpi_is_initialized); + if(!mpi_is_initialized) { + MPI_Init(nullptr, nullptr); + s_initialized_mpi = true; + } + if(s_initialized_mpi) + s_initiaze_mpi_count += 1; +#endif + } + + ~MPI() { +#ifdef ENABLE_MPI + if(s_initialized_mpi) { + s_initiaze_mpi_count -= 1; + if(s_initiaze_mpi_count == 0) { + //MPI_Finalize(); + } + s_initialized_mpi = false; + } +#endif + } +}; + +} // namespace bedrock + +#endif diff --git a/src/SSGManager.cpp b/src/SSGManager.cpp index 18f2954..148685b 100644 --- a/src/SSGManager.cpp +++ b/src/SSGManager.cpp @@ -29,9 +29,6 @@ using nlohmann::json; using namespace std::string_literals; int SSGManagerImpl::s_num_ssg_init = 0; -#ifdef ENABLE_MPI -bool SSGManagerImpl::s_initialized_mpi = false; -#endif #ifdef ENABLE_PMIX bool SSGManagerImpl::s_initialized_pmix = false; #endif @@ -238,12 +235,6 @@ SSGManager::addGroup(const std::string& name, } else if (method == "mpi") { #ifdef ENABLE_MPI - int flag; - MPI_Initialized(&flag); - if (!flag) { - MPI_Init(NULL, NULL); - SSGManagerImpl::s_initialized_mpi = true; - } ret = ssg_group_create_mpi( mid, name.c_str(), MPI_COMM_WORLD, const_cast(&config), diff --git a/src/SSGManagerImpl.hpp b/src/SSGManagerImpl.hpp index 9e7fa3d..0ba6871 100644 --- a/src/SSGManagerImpl.hpp +++ b/src/SSGManagerImpl.hpp @@ -174,9 +174,6 @@ class SSGManagerImpl { static int s_num_ssg_init; -#ifdef ENABLE_MPI - static bool s_initialized_mpi; -#endif #ifdef ENABLE_PMIX static bool s_initialized_pmix; #endif diff --git a/src/Server.cpp b/src/Server.cpp index 1197635..1868f7b 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -19,9 +19,6 @@ #include #include #include -#ifdef ENABLE_MPI -#include -#endif namespace tl = thallium; @@ -30,29 +27,13 @@ namespace bedrock { using namespace std::string_literals; using nlohmann::json; - -#ifdef ENABLE_MPI -bool s_initialized_mpi = false; -size_t s_initiaze_mpi_count = 0; -#endif - - Server::Server(const std::string& address, const std::string& configString, ConfigType configType, const Jx9ParamMap& jx9Params) { -#ifdef ENABLE_MPI - int mpi_is_initialized; - MPI_Initialized(&mpi_is_initialized); - if(!mpi_is_initialized) { - MPI_Init(nullptr, nullptr); - s_initialized_mpi = true; - } - if(s_initialized_mpi) - s_initiaze_mpi_count += 1; -#endif - std::string jsonConfigString; + auto mpi = std::make_shared(); + auto jx9Manager = Jx9Manager{}; if(configType == ConfigType::JX9) { @@ -114,6 +95,7 @@ Server::Server(const std::string& address, const std::string& configString, // Create self self = std::unique_ptr( new ServerImpl(margoMgr, bedrock_provider_id, bedrock_pool)); + self->m_mpi = mpi; self->m_jx9_manager = jx9Manager; try { @@ -191,14 +173,6 @@ Server::Server(const std::string& address, const std::string& configString, Server::~Server() { if(self) finalize(); -#ifdef ENABLE_MPI - if(s_initialized_mpi) { - s_initiaze_mpi_count -= 1; - if(s_initiaze_mpi_count == 0) - MPI_Finalize(); - s_initialized_mpi = false; - } -#endif } MargoManager Server::getMargoManager() const { return self->m_margo_manager; } diff --git a/src/ServerImpl.hpp b/src/ServerImpl.hpp index 951a77b..4fb2a5f 100644 --- a/src/ServerImpl.hpp +++ b/src/ServerImpl.hpp @@ -14,6 +14,7 @@ #include "DependencyFinderImpl.hpp" #include "SSGManagerImpl.hpp" #include "Jx9ManagerImpl.hpp" +#include "MPI.hpp" #include "bedrock/Jx9Manager.hpp" #include "bedrock/RequestResult.hpp" #include "bedrock/ModuleContext.hpp" @@ -30,6 +31,7 @@ namespace tl = thallium; class ServerImpl : public tl::provider { public: + std::shared_ptr m_mpi; std::shared_ptr m_jx9_manager; std::shared_ptr m_margo_manager; std::shared_ptr m_abtio_manager;