Skip to content

Commit

Permalink
better MPI management and possibility to use Flock files in bedrock-q…
Browse files Browse the repository at this point in the history
…uery and bedrock-shutdown
  • Loading branch information
mdorier committed Jun 18, 2024
1 parent 22122c8 commit b84e567
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 47 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set (BEDROCK_VERSION_MINOR 11)
set (BEDROCK_VERSION_PATCH 0)
set (BEDROCK_VERSION
"${BEDROCK_VERSION_MAJOR}.${BEDROCK_VERSION_MINOR}.${BEDROCK_VERSION_PATCH}")
add_definitions ("-DBEDROCK_VERSION=${BEDROCK_VERSION}")

# add our cmake module directory to the path
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
Expand Down
30 changes: 28 additions & 2 deletions bin/bedrock-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ static std::string g_protocol;
static std::vector<std::string> g_addresses;
static std::string g_log_level;
static std::string g_ssg_file;
static std::string g_flock_file;
static std::string g_jx9_file;
static std::string g_jx9_script_content;
static uint16_t g_provider_id;
static bool g_pretty;

static void parseCommandLine(int argc, char** argv);
static void parseCommandLine(int argc, char** argv);

int main(int argc, char** argv) {
parseCommandLine(argc, argv);
Expand All @@ -50,6 +51,23 @@ int main(int argc, char** argv) {
ssg_init();
#endif

if(!g_flock_file.empty()) {
json flock_file_content;
std::ifstream flock_file{g_flock_file};
if(!flock_file.good()) {
spdlog::critical("Could not open flock file {}", g_flock_file);
exit(-1);
}
flock_file >> flock_file_content;
std::unordered_set<std::string> addresses;
for(auto& member : flock_file_content["members"]) {
auto& address = member["address"].get_ref<const std::string&>();
if(addresses.count(address)) continue;
g_addresses.push_back(address);
addresses.insert(address);
}
}

bedrock::Client client(engine);

auto sgh = g_ssg_file.empty() ? client.makeServiceGroupHandle(g_addresses, g_provider_id)
Expand All @@ -69,9 +87,11 @@ int main(int argc, char** argv) {
}

static void parseCommandLine(int argc, char** argv) {
#define VALUE(string) #string
#define TO_LITERAL(string) VALUE(string)
try {
TCLAP::CmdLine cmd("Query the configuration from Bedrock daemons", ' ',
"0.6.0");
TO_LITERAL(BEDROCK_VERSION));
TCLAP::UnlabeledValueArg<std::string> protocol(
"protocol", "Protocol (e.g. ofi+tcp)", true, "na+sm", "protocol");
TCLAP::ValueArg<std::string> logLevel(
Expand All @@ -82,6 +102,10 @@ static void parseCommandLine(int argc, char** argv) {
"i", "provider-id",
"Provider id to use when contacting Bedrock daemons", false, 0,
"int");
TCLAP::ValueArg<std::string> flockFile(
"f", "flock-file",
"Flock file from which to read addresses of Bedrock daemons", false,
"", "filename");
#ifdef ENABLE_SSG
TCLAP::ValueArg<std::string> ssgFile(
"s", "ssg-file",
Expand All @@ -97,6 +121,7 @@ static void parseCommandLine(int argc, char** argv) {
false);
cmd.add(protocol);
cmd.add(logLevel);
cmd.add(flockFile);
#ifdef ENABLE_SSG
cmd.add(ssgFile);
#endif
Expand All @@ -107,6 +132,7 @@ static void parseCommandLine(int argc, char** argv) {
cmd.parse(argc, argv);
g_addresses = addresses.getValue();
g_log_level = logLevel.getValue();
g_flock_file = flockFile.getValue();
#ifdef ENABLE_SSG
g_ssg_file = ssgFile.getValue();
#endif
Expand Down
31 changes: 28 additions & 3 deletions bin/bedrock-shutdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ static std::string g_protocol;
static std::vector<std::string> g_addresses;
static std::string g_log_level;
static std::string g_ssg_file;
static std::string g_flock_file;

static void parseCommandLine(int argc, char** argv);
static void resolveSSGAddresses(thallium::engine& engine);
static void parseCommandLine(int argc, char** argv);
static void resolveSSGAddresses(thallium::engine& engine);

int main(int argc, char** argv) {
parseCommandLine(argc, argv);
Expand Down Expand Up @@ -50,9 +51,11 @@ int main(int argc, char** argv) {
}

static void parseCommandLine(int argc, char** argv) {
#define VALUE(string) #string
#define TO_LITERAL(string) VALUE(string)
try {
TCLAP::CmdLine cmd("Query the configuration from Bedrock daemons", ' ',
"0.4");
TO_LITERAL(BEDROCK_VERSION));
TCLAP::UnlabeledValueArg<std::string> protocol(
"protocol", "Protocol (e.g. ofi+tcp)", true, "na+sm", "protocol");
TCLAP::ValueArg<std::string> logLevel(
Expand All @@ -63,17 +66,23 @@ static void parseCommandLine(int argc, char** argv) {
"s", "ssg-file",
"SSG file from which to read addresses of Bedrock daemons", false,
"", "filename");
TCLAP::ValueArg<std::string> flockFile(
"f", "flock-file",
"Flock file from which to read addresses of Bedrock daemons", false,
"", "filename");
TCLAP::MultiArg<std::string> addresses(
"a", "addresses", "Address of a Bedrock daemon", false, "address");
TCLAP::SwitchArg prettyJSON("p", "pretty", "Print human-readable JSON",
false);
cmd.add(protocol);
cmd.add(logLevel);
cmd.add(flockFile);
cmd.add(ssgFile);
cmd.add(addresses);
cmd.parse(argc, argv);
g_addresses = addresses.getValue();
g_log_level = logLevel.getValue();
g_flock_file = flockFile.getValue();
g_ssg_file = ssgFile.getValue();
g_protocol = protocol.getValue();
} catch (TCLAP::ArgException& e) {
Expand All @@ -84,6 +93,22 @@ static void parseCommandLine(int argc, char** argv) {
}

static void resolveSSGAddresses(thallium::engine& engine) {
if(!g_flock_file.empty()) {
json flock_file_content;
std::ifstream flock_file{g_flock_file};
if(!flock_file.good()) {
spdlog::critical("Could not open flock file {}", g_flock_file);
exit(-1);
}
flock_file >> flock_file_content;
std::unordered_set<std::string> addresses;
for(auto& member : flock_file_content["members"]) {
auto& address = member["address"].get_ref<const std::string&>();
if(addresses.count(address)) continue;
g_addresses.push_back(address);
addresses.insert(address);
}
}
if (g_ssg_file.empty()) return;
#ifndef ENABLE_SSG
(void)engine;
Expand Down
3 changes: 2 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ set (server-src-files
Module.cpp
MargoLogging.cpp
NamedDependency.cpp
Jx9Manager.cpp)
Jx9Manager.cpp
MPI.cpp)

set (client-src-files
Client.cpp
Expand Down
15 changes: 15 additions & 0 deletions src/MPI.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "MPI.hpp"

namespace bedrock {

#ifdef ENABLE_MPI
// Out-of-line definitions of the static members that class MPI declares in
// MPI.hpp. Both have static storage duration, so the explicit initializers
// below only spell out the values they would start with anyway.

// Set to true by MPI's constructor when it is the one that called MPI_Init.
bool MPI::s_initialized_mpi = false;

// Number of MPI instances counted while s_initialized_mpi is true.
size_t MPI::s_initiaze_mpi_count = 0;
#endif

} // namespace bedrock
53 changes: 53 additions & 0 deletions src/MPI.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __BEDROCK_MPI_H
#define __BEDROCK_MPI_H

#include <iostream>
#ifdef ENABLE_MPI
#include <mpi.h>
#endif

namespace bedrock {

/**
 * @brief Reference-counted guard around MPI initialization.
 *
 * When compiled with ENABLE_MPI, constructing an instance calls MPI_Init if
 * MPI_Initialized reports that MPI has not been initialized yet, and records
 * in s_initialized_mpi that this class performed the initialization. While
 * that flag is set, every live instance (including copies) is counted in
 * s_initiaze_mpi_count; the flag is cleared again once the last counted
 * instance is destroyed.
 *
 * When ENABLE_MPI is not defined the class is an empty placeholder and all
 * of its operations are no-ops.
 */
class MPI {

#ifdef ENABLE_MPI
    // True only if MPI_Init was called by this class (not by the application).
    static bool s_initialized_mpi;
    // Number of live instances counted while s_initialized_mpi is true.
    static size_t s_initiaze_mpi_count;
#endif

    public:

    /**
     * @brief Initializes MPI if nobody did so yet and registers this instance.
     */
    MPI() {
#ifdef ENABLE_MPI
        int mpi_is_initialized = 0;
        MPI_Initialized(&mpi_is_initialized);
        if(!mpi_is_initialized) {
            MPI_Init(nullptr, nullptr);
            s_initialized_mpi = true;
        }
        // Instances are only counted when this class owns the initialization.
        if(s_initialized_mpi)
            s_initiaze_mpi_count += 1;
#endif
    }

    /**
     * @brief Copying registers a new instance, exactly like default
     * construction, so that the copy's destructor does not decrement a
     * count it never contributed to.
     */
    MPI(const MPI&) : MPI() {}

    /**
     * @brief Assignment does not create or destroy an instance, so the
     * count is left untouched.
     */
    MPI& operator=(const MPI&) noexcept { return *this; }

    /**
     * @brief Unregisters this instance; the ownership flag is cleared only
     * when the last counted instance goes away.
     */
    ~MPI() {
#ifdef ENABLE_MPI
        if(s_initialized_mpi) {
            s_initiaze_mpi_count -= 1;
            if(s_initiaze_mpi_count == 0) {
                // NOTE(review): the finalize call is left disabled as in the
                // original; the reason is not visible here -- confirm before
                // re-enabling.
                //MPI_Finalize();
                // Clear the flag only once every counted instance is gone;
                // clearing it earlier would make the remaining instances
                // skip their decrement and leave the count stuck above zero.
                s_initialized_mpi = false;
            }
        }
#endif
    }
};

} // namespace bedrock

#endif
9 changes: 0 additions & 9 deletions src/SSGManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ using nlohmann::json;
using namespace std::string_literals;

int SSGManagerImpl::s_num_ssg_init = 0;
#ifdef ENABLE_MPI
bool SSGManagerImpl::s_initialized_mpi = false;
#endif
#ifdef ENABLE_PMIX
bool SSGManagerImpl::s_initialized_pmix = false;
#endif
Expand Down Expand Up @@ -238,12 +235,6 @@ SSGManager::addGroup(const std::string& name,

} else if (method == "mpi") {
#ifdef ENABLE_MPI
int flag;
MPI_Initialized(&flag);
if (!flag) {
MPI_Init(NULL, NULL);
SSGManagerImpl::s_initialized_mpi = true;
}
ret = ssg_group_create_mpi(
mid, name.c_str(), MPI_COMM_WORLD,
const_cast<ssg_group_config_t*>(&config),
Expand Down
3 changes: 0 additions & 3 deletions src/SSGManagerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@ class SSGManagerImpl {


static int s_num_ssg_init;
#ifdef ENABLE_MPI
static bool s_initialized_mpi;
#endif
#ifdef ENABLE_PMIX
static bool s_initialized_pmix;
#endif
Expand Down
32 changes: 3 additions & 29 deletions src/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
#include <spdlog/spdlog.h>
#include <fmt/format.h>
#include <fstream>
#ifdef ENABLE_MPI
#include <mpi.h>
#endif

namespace tl = thallium;

Expand All @@ -30,29 +27,13 @@ namespace bedrock {
using namespace std::string_literals;
using nlohmann::json;


#ifdef ENABLE_MPI
bool s_initialized_mpi = false;
size_t s_initiaze_mpi_count = 0;
#endif


Server::Server(const std::string& address, const std::string& configString,
ConfigType configType, const Jx9ParamMap& jx9Params) {

#ifdef ENABLE_MPI
int mpi_is_initialized;
MPI_Initialized(&mpi_is_initialized);
if(!mpi_is_initialized) {
MPI_Init(nullptr, nullptr);
s_initialized_mpi = true;
}
if(s_initialized_mpi)
s_initiaze_mpi_count += 1;
#endif

std::string jsonConfigString;

auto mpi = std::make_shared<MPI>();

auto jx9Manager = Jx9Manager{};

if(configType == ConfigType::JX9) {
Expand Down Expand Up @@ -114,6 +95,7 @@ Server::Server(const std::string& address, const std::string& configString,
// Create self
self = std::unique_ptr<ServerImpl>(
new ServerImpl(margoMgr, bedrock_provider_id, bedrock_pool));
self->m_mpi = mpi;
self->m_jx9_manager = jx9Manager;

try {
Expand Down Expand Up @@ -191,14 +173,6 @@ Server::Server(const std::string& address, const std::string& configString,

Server::~Server() {
if(self) finalize();
#ifdef ENABLE_MPI
if(s_initialized_mpi) {
s_initiaze_mpi_count -= 1;
if(s_initiaze_mpi_count == 0)
MPI_Finalize();
s_initialized_mpi = false;
}
#endif
}

MargoManager Server::getMargoManager() const { return self->m_margo_manager; }
Expand Down
2 changes: 2 additions & 0 deletions src/ServerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "DependencyFinderImpl.hpp"
#include "SSGManagerImpl.hpp"
#include "Jx9ManagerImpl.hpp"
#include "MPI.hpp"
#include "bedrock/Jx9Manager.hpp"
#include "bedrock/RequestResult.hpp"
#include "bedrock/ModuleContext.hpp"
Expand All @@ -30,6 +31,7 @@ namespace tl = thallium;
class ServerImpl : public tl::provider<ServerImpl> {

public:
std::shared_ptr<MPI> m_mpi;
std::shared_ptr<Jx9ManagerImpl> m_jx9_manager;
std::shared_ptr<MargoManagerImpl> m_margo_manager;
std::shared_ptr<ABTioManagerImpl> m_abtio_manager;
Expand Down

0 comments on commit b84e567

Please sign in to comment.