From e2fd3945e03dc2d067b8208c77ac10ee92285fff Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Wed, 31 Jan 2024 13:16:43 +0000 Subject: [PATCH] more code in python client library and some tests --- include/bedrock/SSGManager.hpp | 2 +- python/mochi/bedrock/client.py | 182 +++++++++++++++++- python/mochi/bedrock/test_client.py | 21 ++ .../bedrock/test_service_group_handle.py | 102 ++++++++++ python/src/py-bedrock-server.cpp | 2 +- src/SSGManager.cpp | 16 +- src/SSGManagerImpl.hpp | 5 +- tests/ValidConfigs.json | 4 +- 8 files changed, 314 insertions(+), 20 deletions(-) create mode 100644 python/mochi/bedrock/test_client.py create mode 100644 python/mochi/bedrock/test_service_group_handle.py diff --git a/include/bedrock/SSGManager.hpp b/include/bedrock/SSGManager.hpp index e082a9b..493f385 100644 --- a/include/bedrock/SSGManager.hpp +++ b/include/bedrock/SSGManager.hpp @@ -94,7 +94,7 @@ class SSGManager { */ std::shared_ptr createGroup(const std::string& name, - const ssg_group_config_t* config, + const ssg_group_config_t& config, const std::shared_ptr& pool, const std::string& bootstrap_method, const std::string& group_file = ""); diff --git a/python/mochi/bedrock/client.py b/python/mochi/bedrock/client.py index dd24b6a..4139684 100644 --- a/python/mochi/bedrock/client.py +++ b/python/mochi/bedrock/client.py @@ -15,22 +15,190 @@ import pybedrock_client import pymargo.core import pymargo +import json +from .spec import ProcSpec, XstreamSpec, PoolSpec, AbtIOSpec, SSGSpec, ProviderSpec, ClientSpec -class Client(pybedrock_client.Client): +ClientException = pybedrock_client.Exception + + +class ServiceHandle: + + def __init__(self, internal, client): + self._internal = internal + self._client = client + + @property + def client(self): + return self._client + + @property + def config(self): + return json.loads(self._internal.get_config()) + + @property + def spec(self): + return ProcSpec.from_dict(self.config) + + def query(self, script: str): + return json.loads(self._internal.query_config(script)) + + def load_module(self, name: str, path: str): + self._internal.load_module(name, path) + + def _ensure_config_str(self, config): + if isinstance(config, str): + return config + elif isinstance(config, dict): + return json.dumps(config) + else: + return config.to_json() + + def add_pool(self, config: str|dict|PoolSpec): + config = self._ensure_config_str(config) + self._internal.add_pool(config) + + def remove_pool(self, name: str): + self._internal.remove_pool(name) + + def add_xstream(self, config: str|dict|XstreamSpec): + config = self._ensure_config_str(config) + self._internal.add_xstream(config) + + def remove_xstream(self, name: str): + self._internal.remove_xstream(name) + + def add_ssg_group(self, config: str|dict|SSGSpec): + config = self._ensure_config_str(config) + self._internal.add_ssg_group(config) + + def add_abtio_instance(self, config: str|dict|AbtIOSpec): + if isinstance(config, str): + config = json.loads(config) + elif isinstance(config, AbtIOSpec): + config = config.to_dict() + if "config" not in config: + config["config"] = {} + self._internal.add_abtio_instance( + name=config["name"], pool=config["pool"], + config=json.dumps(config["config"])) + + def add_client(self, config: str|dict|ClientSpec): + if isinstance(config, str): + config = json.loads(config) + elif isinstance(config, AbtIOSpec): + config = config.to_dict() + if "config" not in config: + config["config"] = {} + if "dependencies" not in config: + config["dependencies"] = {} + if "tags" not in config: + config["tags"] = [] + self._internal.add_client( + name=config["name"], + type=config["type"], + config=json.dumps(config["config"]), + dependencies=config["dependencies"], + tags=config["tags"]) + + def start_provider(self, config: str|dict|ProviderSpec): + if isinstance(config, str): + config = json.loads(config) + elif isinstance(config, AbtIOSpec): + config = config.to_dict() + if "config" not in config: + config["config"] = {} + if "dependencies" not in config: + config["dependencies"] = {} + if "tags" not in config: + config["tags"] = [] + return self._internal.start_provider( + name=config["name"], + type=config["type"], + provider_id=config["provider_id"], + pool=config["pool"], + config=json.dumps(config["config"]), + dependencies=config["dependencies"], + tags=config["tags"]) + + def change_provider_pool(self, provider_name: str, pool_name: str): + self._internal.change_provider_pool( + provider_name, pool_name) + + +class ServiceGroupHandle: + + def __init__(self, internal, client): + self._internal = internal + self._client = client + + @property + def client(self): + return self._client + + def refresh(self): + self._internal.refresh() + + @property + def size(self): + return self._internal.size + + def __len__(self): + return self.size + + def __getitem__(self, index: int): + return ServiceHandle(self._internal[index], self.client) + + @property + def config(self): + return json.loads(self._internal.get_config()) + + @property + def spec(self): + config = self.config + spec = {} + for k, v in config.items(): + spec[k] = ProcSpec.from_dict(v) + return spec + + def query(self, script: str): + return json.loads(self._internal.query_config(script)) + + +class Client: def __init__(self, arg): if isinstance(arg, pymargo.core.Engine): - super().__init__(arg.get_internal_mid()) - self._engine = None + self._engine = arg + self._owns_engine = False elif isinstance(arg, str): - self._engine = pymargo.core.Engine( - arg, pymargo.client) - super().__init__(self._engine.get_internal_mid()) + self._engine = pymargo.core.Engine(arg, pymargo.client) + self._owns_engine = True else: raise TypeError(f'Invalid argument type {type(arg)}') + self._internal = pybedrock_client.Client(self._engine.get_internal_mid()) def __del__(self): - if self._engine is not None: + if self._owns_engine: self._engine.finalize() del self._engine + + @property + def mid(self): + return self._internal.margo_instance_id + + @property + def engine(self): + return self._engine + + def make_service_handle(self, address: str|pymargo.core.Address, provider_id: int = 0): + if isinstance(address, pymargo.core.Address): + address = str(address) + return ServiceHandle( + self._internal.make_service_handle(address=address, provider_id=provider_id), + self) + + def make_service_group_handle(self, group: str|int|list[str], provider_id: int = 0): + return ServiceGroupHandle( + self._internal.make_service_group_handle(group, provider_id), + self) diff --git a/python/mochi/bedrock/test_client.py b/python/mochi/bedrock/test_client.py new file mode 100644 index 0000000..f4171ec --- /dev/null +++ b/python/mochi/bedrock/test_client.py @@ -0,0 +1,21 @@ +import unittest +import mochi.bedrock.server as mbs +import mochi.bedrock.client as mbc +import mochi.bedrock.spec as spec + + +class TestClient(unittest.TestCase): + + def test_init_client_from_address(self): + client = mbc.Client("na+sm") + + def test_init_client_from_engine(self): + server = mbs.Server(address="na+sm") + client = mbc.Client(server.margo.engine) + del client + server.finalize() + del server + + +if __name__ == '__main__': + unittest.main() diff --git a/python/mochi/bedrock/test_service_group_handle.py b/python/mochi/bedrock/test_service_group_handle.py new file mode 100644 index 0000000..73c3833 --- /dev/null +++ b/python/mochi/bedrock/test_service_group_handle.py @@ -0,0 +1,102 @@ +import unittest +import mochi.bedrock.server as mbs +import mochi.bedrock.client as mbc +import mochi.bedrock.spec as spec +import tempfile +import os.path + + +class TestServiceGroupHandleInit(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.TemporaryDirectory() + self.groupfile = os.path.join(self.tempdir.name, "group.ssg") + config = { + "ssg": [{ + "name": "my_group", + "bootstrap": "init", + "group_file": self.groupfile + }] + } + self.server = mbs.Server(address="na+sm", config=config) + self.client = mbc.Client(self.server.margo.engine) + + def tearDown(self): + self.tempdir.cleanup() + del self.client + self.server.finalize() + del self.server + + def test_make_service_group_handle_from_file(self): + """ + Note: because the server and client are on the same process, + trying to open the group file will lead to an SSG error + about the group ID already existing. + """ + # sgh = self.client.make_service_group_handle(self.groupfile) + # self.assertIsInstance(sgh, mbc.ServiceGroupHandle) + + def test_make_service_group_handle_from_gid(self): + ssg_group = self.server.ssg["my_group"] + gid = ssg_group.handle + sgh = self.client.make_service_group_handle(gid) + self.assertIsInstance(sgh, mbc.ServiceGroupHandle) + + def test_make_service_group_handle_from_address(self): + address = str(self.server.margo.engine.address) + sgh = self.client.make_service_group_handle([address]) + self.assertIsInstance(sgh, mbc.ServiceGroupHandle) + + +class TestServiceGroupHandle(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.TemporaryDirectory() + self.groupfile = os.path.join(self.tempdir.name, "group.ssg") + config = { + "ssg": [{ + "name": "my_group", + "bootstrap": "init", + "group_file": self.groupfile + }] + } + self.server = mbs.Server(address="na+sm", config=config) + self.client = mbc.Client(self.server.margo.engine) + self.sgh = self.client.make_service_group_handle( + self.server.ssg["my_group"].handle) + + def tearDown(self): + del self.sgh + self.tempdir.cleanup() + del self.client + self.server.finalize() + del self.server + + def test_len(self): + self.assertEqual(len(self.sgh), 1) + self.assertEqual(self.sgh.size, 1) + + def test_getitem(self): + self.assertIsInstance(self.sgh[0], mbc.ServiceHandle) + with self.assertRaises(mbc.ClientException): + self.sgh[1] + + def test_config(self): + config = self.sgh.config + self.assertIsInstance(config, dict) + self.assertEqual(len(config.keys()), 1) + self_address = str(self.server.margo.engine.address) + self.assertIn(self_address, config) + for k in ["margo", "providers", "clients", "ssg", "abt_io", "bedrock"]: + self.assertIn(k, config[self_address]) + + def test_spec(self): + s = self.sgh.spec + self.assertIsInstance(s, dict) + self.assertEqual(len(s.keys()), 1) + self_address = str(self.server.margo.engine.address) + self.assertIn(self_address, s) + self.assertIsInstance(s[self_address], spec.ProcSpec) + +if __name__ == '__main__': + unittest.main() diff --git a/python/src/py-bedrock-server.cpp b/python/src/py-bedrock-server.cpp index bb0a3f4..dc5afe8 100644 --- a/python/src/py-bedrock-server.cpp +++ b/python/src/py-bedrock-server.cpp @@ -167,7 +167,7 @@ PYBIND11_MODULE(pybedrock_server, m) { GET_SSG_FIELD(swim_disabled); GET_SSG_FIELD(ssg_credential); #undef GET_SSG_FIELD - return ssg.createGroup(name, &cfg, pool, bootstrap_method, group_file); + return ssg.createGroup(name, cfg, pool, bootstrap_method, group_file); #else throw Exception{"Bedrock was not compiled with SSG support"}; #endif diff --git a/src/SSGManager.cpp b/src/SSGManager.cpp index de4ee25..7ed30b7 100644 --- a/src/SSGManager.cpp +++ b/src/SSGManager.cpp @@ -154,6 +154,8 @@ static void extractConfigParameters(const json& config, } else { pool = margo.getPool(config["pool"].get()); } + } else { + pool = margo.getDefaultHandlerPool(); } } #endif @@ -181,7 +183,7 @@ SSGManager::SSGManager(const MargoManager& margo, std::shared_ptr pool; extractConfigParameters(config, margo, name, bootstrap, group_config, group_file, pool); - createGroup(name, &group_config, pool, bootstrap, group_file); + createGroup(name, group_config, pool, bootstrap, group_file); }; std::vector existing_names; @@ -251,7 +253,7 @@ size_t SSGManager::getNumGroups() const { std::shared_ptr SSGManager::createGroup(const std::string& name, - const ssg_group_config_t* config, + const ssg_group_config_t& config, const std::shared_ptr& pool, const std::string& bootstrap_method, const std::string& group_file) { @@ -272,7 +274,7 @@ SSGManager::createGroup(const std::string& name, // The inner data of the ssg_entry will be set later. // The ssg_entry needs to be created here because the // membership callback needs it. - auto ssg_entry = std::make_shared(name); + auto ssg_entry = std::make_shared(name, pool); spdlog::trace("Creating SSG group {} with bootstrap method {}", name, bootstrap_method); @@ -312,7 +314,7 @@ SSGManager::createGroup(const std::string& name, std::vector addresses = {addr_str.c_str()}; ret = ssg_group_create(mid, name.c_str(), addresses.data(), 1, - const_cast(config), + const_cast(&config), SSGUpdateHandler::membershipUpdate, ssg_entry.get(), &gid); if (ret != SSG_SUCCESS) { @@ -352,7 +354,7 @@ SSGManager::createGroup(const std::string& name, } ret = ssg_group_create_mpi( mid, name.c_str(), MPI_COMM_WORLD, - const_cast(config), + const_cast(&config), SSGUpdateHandler::membershipUpdate, ssg_entry.get(), &gid); if (ret != SSG_SUCCESS) { throw DETAILED_EXCEPTION( @@ -403,7 +405,7 @@ SSGManager::createGroup(const std::string& name, } } ssg_entry->setSSGid(gid); - ssg_entry->config = *config; + ssg_entry->config = config; ssg_entry->bootstrap = bootstrap_method; ssg_entry->group_file = group_file; ssg_entry->pool = pool; @@ -446,7 +448,7 @@ SSGManager::createGroupFromConfig(const std::string& configString) { std::shared_ptr pool; extractConfigParameters(config, margo, name, bootstrap, group_config, group_file, pool); - return createGroup(name, &group_config, pool, bootstrap, group_file); + return createGroup(name, group_config, pool, bootstrap, group_file); #endif // ENABLE_SSG } diff --git a/src/SSGManagerImpl.hpp b/src/SSGManagerImpl.hpp index 2d2aba4..524f061 100644 --- a/src/SSGManagerImpl.hpp +++ b/src/SSGManagerImpl.hpp @@ -48,11 +48,12 @@ class SSGEntry : public NamedDependency { std::string group_file; std::shared_ptr pool; - SSGEntry(std::string name) + SSGEntry(std::string name, std::shared_ptr p) : NamedDependency( std::move(name), "ssg", SSG_GROUP_ID_INVALID, std::function()) + , pool{std::move(p)} {} SSGEntry(const SSGEntry&) = delete; @@ -67,7 +68,7 @@ class SSGEntry : public NamedDependency { c["name"] = getName(); c["bootstrap"] = bootstrap; c["group_file"] = group_file; - if(pool) c["pool"] = pool->getName(); + c["pool"] = pool->getName(); c["credential"] = config.ssg_credential; c["swim"] = json::object(); auto& swim = c["swim"]; diff --git a/tests/ValidConfigs.json b/tests/ValidConfigs.json index 79bac50..cd32333 100644 --- a/tests/ValidConfigs.json +++ b/tests/ValidConfigs.json @@ -20,13 +20,13 @@ { "test": "defining an SSG group with mpi bootstrapping", "input": {"ssg":[{"name":"my_ssg_group","bootstrap":"mpi"}]}, - "output": {"abt_io":[],"bedrock":{"pool":"__primary__","provider_id":0},"clients":[],"libraries":{},"margo":{"argobots":{"abt_mem_max_num_stacks":8,"abt_thread_stacksize":2097152,"lazy_stack_alloc":false,"pools":[{"access":"mpmc","kind":"fifo_wait","name":"__primary__"}],"profiling_dir":".","xstreams":[{"name":"__primary__","scheduler":{"pools":[0],"type":"basic_wait"}}]},"enable_abt_profiling":false,"handle_cache_size":32,"progress_pool":0,"progress_timeout_ub_msec":100,"rpc_pool":0},"providers":[],"ssg":[{"bootstrap":"mpi","credential":-1,"group_file":"","name":"my_ssg_group","swim":{"disabled":false,"period_length_ms":0,"subgroup_member_count":-1,"suspect_timeout_periods":-1}}],"mona":[]} + "output": {"abt_io":[],"bedrock":{"pool":"__primary__","provider_id":0},"clients":[],"libraries":{},"margo":{"argobots":{"abt_mem_max_num_stacks":8,"abt_thread_stacksize":2097152,"lazy_stack_alloc":false,"pools":[{"access":"mpmc","kind":"fifo_wait","name":"__primary__"}],"profiling_dir":".","xstreams":[{"name":"__primary__","scheduler":{"pools":[0],"type":"basic_wait"}}]},"enable_abt_profiling":false,"handle_cache_size":32,"progress_pool":0,"progress_timeout_ub_msec":100,"rpc_pool":0},"providers":[],"ssg":[{"bootstrap":"mpi","credential":-1,"group_file":"","name":"my_ssg_group","pool":"__primary__","swim":{"disabled":false,"period_length_ms":0,"subgroup_member_count":-1,"suspect_timeout_periods":-1}}],"mona":[]} }, { "test": "defining an SSG group with init bootstrapping", "input": {"ssg":[{"name":"my_ssg_group","bootstrap":"init"}]}, - "output": {"abt_io":[],"bedrock":{"pool":"__primary__","provider_id":0},"clients":[],"libraries":{},"margo":{"argobots":{"abt_mem_max_num_stacks":8,"abt_thread_stacksize":2097152,"lazy_stack_alloc":false,"pools":[{"access":"mpmc","kind":"fifo_wait","name":"__primary__"}],"profiling_dir":".","xstreams":[{"name":"__primary__","scheduler":{"pools":[0],"type":"basic_wait"}}]},"enable_abt_profiling":false,"handle_cache_size":32,"progress_pool":0,"progress_timeout_ub_msec":100,"rpc_pool":0},"providers":[],"ssg":[{"bootstrap":"init","credential":-1,"group_file":"","name":"my_ssg_group","swim":{"disabled":false,"period_length_ms":0,"subgroup_member_count":-1,"suspect_timeout_periods":-1}}],"mona":[]} + "output": {"abt_io":[],"bedrock":{"pool":"__primary__","provider_id":0},"clients":[],"libraries":{},"margo":{"argobots":{"abt_mem_max_num_stacks":8,"abt_thread_stacksize":2097152,"lazy_stack_alloc":false,"pools":[{"access":"mpmc","kind":"fifo_wait","name":"__primary__"}],"profiling_dir":".","xstreams":[{"name":"__primary__","scheduler":{"pools":[0],"type":"basic_wait"}}]},"enable_abt_profiling":false,"handle_cache_size":32,"progress_pool":0,"progress_timeout_ub_msec":100,"rpc_pool":0},"providers":[],"ssg":[{"bootstrap":"init","credential":-1,"group_file":"","name":"my_ssg_group","pool":"__primary__","swim":{"disabled":false,"period_length_ms":0,"subgroup_member_count":-1,"suspect_timeout_periods":-1}}],"mona":[]} }, {