Skip to content

Commit

Permalink
more code in python client library and some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jan 31, 2024
1 parent 75f35f0 commit e2fd394
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 20 deletions.
2 changes: 1 addition & 1 deletion include/bedrock/SSGManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class SSGManager {
*/
std::shared_ptr<NamedDependency>
createGroup(const std::string& name,
const ssg_group_config_t* config,
const ssg_group_config_t& config,
const std::shared_ptr<NamedDependency>& pool,
const std::string& bootstrap_method,
const std::string& group_file = "");
Expand Down
182 changes: 175 additions & 7 deletions python/mochi/bedrock/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,190 @@
import pybedrock_client
import pymargo.core
import pymargo
import json
from .spec import ProcSpec, XstreamSpec, PoolSpec, AbtIOSpec, SSGSpec, ProviderSpec, ClientSpec


class Client(pybedrock_client.Client):
ClientException = pybedrock_client.Exception


class ServiceHandle:

def __init__(self, internal, client):
self._internal = internal
self._client = client

@property
def client(self):
return self._client

@property
def config(self):
return json.loads(self._internal.get_config())

@property
def spec(self):
return ProcSpec.from_dict(self.config)

def query(self, script: str):
return json.loads(self._internal.query_config(script))

def load_module(self, name: str, path: str):
self._internal.load_module(name, path)

def _ensure_config_str(self, config):
if isinstance(config, str):
return config
elif isinstance(config, dict):
return json.dumps(config)
else:
return config.to_json()

def add_pool(self, config: str|dict|PoolSpec):
config = self._ensure_config_str(config)
self._internal.add_pool(config)

def remove_pool(self, name: str):
self._internal.remove_pool(name)

def add_xstream(self, config: str|dict|XstreamSpec):
config = self._ensure_config_str(config)
self._internal.add_xstream(config)

def remove_xstream(self, name: str):
self._internal.remove_xstream(name)

def add_ssg_group(self, config: str|dict|SSGSpec):
config = self._ensure_config_str(config)
self._internal.add_ssg_group(config)

def add_abtio_instance(self, config: str|dict|AbtIOSpec):
if isinstance(config, str):
config = json.loads(config)
elif isinstance(config, AbtIOSpec):
config = config.to_dict()
if "config" not in config:
config["config"] = {}
self._internal.add_abtio_instance(
name=config["name"], pool=config["pool"],
config=json.dumps(config["config"]))

def add_client(self, config: str|dict|ClientSpec):
if isinstance(config, str):
config = json.loads(config)
elif isinstance(config, AbtIOSpec):
config = config.to_dict()
if "config" not in config:
config["config"] = {}
if "dependencies" not in config:
config["dependencies"] = {}
if "tags" not in config:
config["tags"] = []
self._internal.add_client(
name=config["name"],
type=config["type"],
config=json.dumps(config["config"]),
dependencies=config["dependencies"],
tags=config["tags"])

def start_provider(self, config: str|dict|ProviderSpec):
if isinstance(config, str):
config = json.loads(config)
elif isinstance(config, AbtIOSpec):
config = config.to_dict()
if "config" not in config:
config["config"] = {}
if "dependencies" not in config:
config["dependencies"] = {}
if "tags" not in config:
config["tags"] = []
return self._internal.start_provider(
name=config["name"],
type=config["type"],
provider_id=config["provider_id"],
pool=config["pool"],
config=json.dumps(config["config"]),
dependencies=config["dependencies"],
tags=config["tags"])

def change_provider_pool(self, provider_name: str, pool_name: str):
self._internal.change_provider_pool(
provider_name, pool_name)


class ServiceGroupHandle:

def __init__(self, internal, client):
self._internal = internal
self._client = client

@property
def client(self):
return self._client

def refresh(self):
self._internal.refresh()

@property
def size(self):
return self._internal.size

def __len__(self):
return self.size

def __getitem__(self, index: int):
return ServiceHandle(self._internal[index], self.client)

@property
def config(self):
return json.loads(self._internal.get_config())

@property
def spec(self):
config = self.config
spec = {}
for k, v in config.items():
spec[k] = ProcSpec.from_dict(v)
return spec

def query(self, script: str):
return json.loads(self._internal.query_config(script))


class Client:

def __init__(self, arg):
if isinstance(arg, pymargo.core.Engine):
super().__init__(arg.get_internal_mid())
self._engine = None
self._engine = arg
self._owns_engine = False
elif isinstance(arg, str):
self._engine = pymargo.core.Engine(
arg, pymargo.client)
super().__init__(self._engine.get_internal_mid())
self._engine = pymargo.core.Engine(arg, pymargo.client)
self._owns_engine = True
else:
raise TypeError(f'Invalid argument type {type(arg)}')
self._internal = pybedrock_client.Client(self._engine.get_internal_mid())

def __del__(self):
if self._engine is not None:
if self._owns_engine:
self._engine.finalize()
del self._engine

@property
def mid(self):
return self._internal.margo_instance_id

@property
def engine(self):
return self._engine

def make_service_handle(self, address: str|pymargo.core.Address, provider_id: int = 0):
if isinstance(address, pymargo.core.Address):
address = str(address)
return ServiceHandle(
self._internal.make_service_handle(address=address, provider_id=provider_id),
self)

def make_service_group_handle(self, group: str|int|list[str], provider_id: int = 0):
return ServiceGroupHandle(
self._internal.make_service_group_handle(group, provider_id),
self)
21 changes: 21 additions & 0 deletions python/mochi/bedrock/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import unittest
import mochi.bedrock.server as mbs
import mochi.bedrock.client as mbc
import mochi.bedrock.spec as spec


class TestClient(unittest.TestCase):

def test_init_client_from_address(self):
client = mbc.Client("na+sm")

def test_init_client_from_engine(self):
server = mbs.Server(address="na+sm")
client = mbc.Client(server.margo.engine)
del client
server.finalize()
del server


if __name__ == '__main__':
unittest.main()
102 changes: 102 additions & 0 deletions python/mochi/bedrock/test_service_group_handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import unittest
import mochi.bedrock.server as mbs
import mochi.bedrock.client as mbc
import mochi.bedrock.spec as spec
import tempfile
import os.path


class TestServiceGroupHandleInit(unittest.TestCase):

def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
self.groupfile = os.path.join(self.tempdir.name, "group.ssg")
config = {
"ssg": [{
"name": "my_group",
"bootstrap": "init",
"group_file": self.groupfile
}]
}
self.server = mbs.Server(address="na+sm", config=config)
self.client = mbc.Client(self.server.margo.engine)

def tearDown(self):
self.tempdir.cleanup()
del self.client
self.server.finalize()
del self.server

def test_make_service_group_handle_from_file(self):
"""
Note: because the server and client are on the same process,
trying to open the group file will lead to an SSG error
about the group ID already existing.
"""
# sgh = self.client.make_service_group_handle(self.groupfile)
# self.assertIsInstance(sgh, mbc.ServiceGroupHandle)

def test_make_service_group_handle_from_gid(self):
ssg_group = self.server.ssg["my_group"]
gid = ssg_group.handle
sgh = self.client.make_service_group_handle(gid)
self.assertIsInstance(sgh, mbc.ServiceGroupHandle)

def test_make_service_group_handle_from_address(self):
address = str(self.server.margo.engine.address)
sgh = self.client.make_service_group_handle([address])
self.assertIsInstance(sgh, mbc.ServiceGroupHandle)


class TestServiceGroupHandle(unittest.TestCase):

def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
self.groupfile = os.path.join(self.tempdir.name, "group.ssg")
config = {
"ssg": [{
"name": "my_group",
"bootstrap": "init",
"group_file": self.groupfile
}]
}
self.server = mbs.Server(address="na+sm", config=config)
self.client = mbc.Client(self.server.margo.engine)
self.sgh = self.client.make_service_group_handle(
self.server.ssg["my_group"].handle)

def tearDown(self):
del self.sgh
self.tempdir.cleanup()
del self.client
self.server.finalize()
del self.server

def test_len(self):
self.assertEqual(len(self.sgh), 1)
self.assertEqual(self.sgh.size, 1)

def test_getitem(self):
self.assertIsInstance(self.sgh[0], mbc.ServiceHandle)
with self.assertRaises(mbc.ClientException):
self.sgh[1]

def test_config(self):
config = self.sgh.config
self.assertIsInstance(config, dict)
self.assertEqual(len(config.keys()), 1)
self_address = str(self.server.margo.engine.address)
self.assertIn(self_address, config)
for k in ["margo", "providers", "clients", "ssg", "abt_io", "bedrock"]:
self.assertIn(k, config[self_address])

def test_spec(self):
s = self.sgh.spec
self.assertIsInstance(s, dict)
self.assertEqual(len(s.keys()), 1)
self_address = str(self.server.margo.engine.address)
self.assertIn(self_address, s)
self.assertIsInstance(s[self_address], spec.ProcSpec)

if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion python/src/py-bedrock-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ PYBIND11_MODULE(pybedrock_server, m) {
GET_SSG_FIELD(swim_disabled);
GET_SSG_FIELD(ssg_credential);
#undef GET_SSG_FIELD
return ssg.createGroup(name, &cfg, pool, bootstrap_method, group_file);
return ssg.createGroup(name, cfg, pool, bootstrap_method, group_file);
#else
throw Exception{"Bedrock was not compiled with SSG support"};
#endif
Expand Down
Loading

0 comments on commit e2fd394

Please sign in to comment.