Skip to content

Commit

Permalink
added ServiceSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jul 22, 2024
1 parent ff1d496 commit a376497
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 10 deletions.
161 changes: 154 additions & 7 deletions python/mochi/bedrock/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def _IntegerOrConst(name: str, bounds: int|tuple[int, int], *,
"""
from ConfigSpace import Integer, Constant
if isinstance(bounds, int):
return Constant(name=name, value=bounds, meta=meta)
c = Constant(name=name, value=bounds, meta=meta)
setattr(c, "upper", bounds)
setattr(c, "lower", bounds)
return c
elif bounds[0] == bounds[1]:
c = Constant(name=name, value=bounds[0], meta=meta)
setattr(c, "upper", bounds[0])
Expand Down Expand Up @@ -1362,8 +1365,8 @@ def space(handle_cache_size: int|tuple[int,int] = 32,
# Note: rpc_pool and progress_pool are categorical because AI tools should not
# make the assumption that adding/removing 1 to the value will lead to a smaller
# change than adding/removing a larger value.
hp_rpc_pool = _CategoricalOrConst('rpc_pool', list(range(max_num_pools)), default=1)
hp_progress_pool = _CategoricalOrConst('progress_pool', list(range(max_num_pools)), default=1)
hp_rpc_pool = _CategoricalOrConst('rpc_pool', list(range(max_num_pools)), default=0)
hp_progress_pool = _CategoricalOrConst('progress_pool', list(range(max_num_pools)), default=0)
cs.add(hp_rpc_pool)
cs.add(hp_progress_pool)
for i in range(min_num_pools, max_num_pools):
Expand Down Expand Up @@ -2253,13 +2256,9 @@ def space(provider_space_factories: list[tuple[str, 'ConfigurationSpace', int|tu
family = provider_group['family']
provider_cs = provider_group['space']
count = provider_group.get('count', 1)
config_resolver = provider_group.get('config_resolver', None)
dependency_resolver = provider_group.get('dependency_resolver', None)
default_count = count if isinstance(count, int) else count[0]
hp_num_providers = _IntegerOrConst(f'providers.{family}.count', count, default=default_count)
cs.add(hp_num_providers)
cs.add(Constant(f'providers.{family}.config_resolver', config_resolver))
cs.add(Constant(f'providers.{family}.dependency_resolver', dependency_resolver))
conditions_to_add = {}
for i in range(0, hp_num_providers.upper):
cs.add_configuration_space(
Expand Down Expand Up @@ -2318,6 +2317,153 @@ def from_config(config: 'Configuration',
return ProcSpec(margo=margo_spec, providers=provider_specs)


@attr.s(auto_attribs=True, on_setattr=_check_validators, kw_only=True)
class ServiceSpec:
    """Service specification: a collection of process specifications.

    :param processes: List of ProcSpec
    :type processes: list
    """

    _processes: list = attr.ib(
        factory=list,
        validator=instance_of(list))

    @property
    def processes(self) -> SpecListDecorator:
        """Return a decorator to access the internal list of ProcSpec
        and validate changes to this list.
        """
        return SpecListDecorator(list=self._processes, type=ProcSpec)

    def to_dict(self) -> dict:
        """Convert the ServiceSpec into a dictionary.

        :return: A dictionary with a single 'processes' key holding the
            list of dictionaries produced by each ProcSpec.to_dict().
        """
        return {'processes': [p.to_dict() for p in self._processes]}

    @staticmethod
    def from_dict(data: dict) -> 'ServiceSpec':
        """Construct a ServiceSpec from a dictionary.

        :param data: Dictionary in the format produced by to_dict().
            A missing 'processes' key yields a ServiceSpec without processes.
        :return: The corresponding ServiceSpec.
        """
        processes = [ProcSpec.from_dict(p) for p in data.get('processes', [])]
        return ServiceSpec(processes=processes)

    def to_json(self, *args, **kwargs) -> str:
        """Convert the ServiceSpec into a JSON string.

        Extra arguments are forwarded to json.dumps().
        """
        return json.dumps(self.to_dict(), *args, **kwargs)

    @staticmethod
    def from_json(json_string: str) -> 'ServiceSpec':
        """Construct a ServiceSpec from a JSON string.
        """
        return ServiceSpec.from_dict(json.loads(json_string))

    def validate(self) -> None:
        """Validate the state of the ServiceSpec.

        Each process is validated individually, then every provider
        dependency of the form "<something>@<rank>" is checked to refer
        to a rank that exists in this service.

        :raises ValueError: if a dependency refers to a non-existing rank.
        """
        num_processes = len(self._processes)

        def check_provider_dependency(dep: str):
            parts = dep.split('@')
            # Only dependencies whose location is a plain rank number are
            # checked here. isdecimal() is used because it accepts exactly
            # the digit strings that int() can parse.
            if len(parts) > 1 and parts[1].isdecimal():
                rank = int(parts[1])
                if rank < 0 or rank >= num_processes:
                    raise ValueError(f'Dependency {dep} refers to a non-existing rank')

        attr.validate(self)
        for p in self._processes:
            p.validate()
            for provider in p.providers:
                for value in provider.dependencies.values():
                    if isinstance(value, str):
                        check_provider_dependency(value)
                    elif isinstance(value, list):
                        for dep in value:
                            check_provider_dependency(dep)

    @staticmethod
    def space(process_space_factories: list[dict] = [],
              **kwargs):
        """
        The process_space_factories argument is a list of dictionaries with the following format.
        ```
        {
            "family": "<family-name>",
            "space" : ConfigurationSpace,
            "count" : int or tuple[int,int]
        }
        ```
        - "family" is a name to use in the configuration space to represent a family of processes
          using the same ConfigurationSpace;
        - "space" is a ConfigurationSpace generated by ProcSpec.space();
        - "count" is either an int, or a pair (int, int) (if omitted, will default to 1).

        :return: A ConfigurationSpace containing, for each family, a
            'processes.<family>.count' hyperparameter and one copy of the
            family's space per possible process, prefixed by
            'processes.<family>[<i>].'.
        :raises ValueError: if two entries use the same family name.
        """
        from ConfigSpace import ConfigurationSpace, GreaterThanCondition, AndConjunction, Constant
        # The default list above is only read, never mutated.
        families = [f['family'] for f in process_space_factories]
        if len(set(families)) != len(process_space_factories):
            raise ValueError('Duplicate process family in process_space_factories')
        cs = ConfigurationSpace()
        # FIXME we are serializing the families into a string because of
        # https://github.com/automl/ConfigSpace/issues/381
        # When it is fixed, just do:
        # cs.add(Constant('processes.families', families))
        cs.add(Constant('processes.families', ','.join(families)))
        for process_group in process_space_factories:
            family = process_group['family']
            process_cs = process_group['space']
            count = process_group.get('count', 1)
            default_count = count if isinstance(count, int) else count[0]
            hp_num_processes = _IntegerOrConst(f'processes.{family}.count', count, default=default_count)
            cs.add(hp_num_processes)
            conditions_to_add = {}
            for i in range(0, hp_num_processes.upper):
                cs.add_configuration_space(
                    prefix=f'processes.{family}[{i}]', delimiter='.',
                    configuration_space=process_cs)
                # NOTE(review): index == lower is also left unconditional here,
                # although that process only exists when count > lower; confirm
                # whether `<` was intended.
                if i <= hp_num_processes.lower:
                    continue
                # FIXME: the code below will not work because some of the parameters
                # already have a condition attached to them and can't have more added
                # see https://github.com/automl/ConfigSpace/issues/380
                for param in process_cs:
                    param_key = f'processes.{family}[{i}].{param}'
                    # Parameters of process i are only active when count > i.
                    conditions_to_add.setdefault(param_key, []).append(
                        GreaterThanCondition(cs[param_key], hp_num_processes, i))
            for param_key, conditions in conditions_to_add.items():
                if len(conditions) == 1:
                    cs.add(conditions[0])
                else:
                    cs.add(AndConjunction(*conditions))
        return cs

    @staticmethod
    def from_config(config: 'Configuration',
                    prefix: str = '', **kwargs):
        """
        Create a ServiceSpec from the provided Configuration object.
        Extra parameters (**kwargs) will be propagated to the underlying
        ProcSpec.from_config().

        :param config: Configuration sampled from a space built by space().
        :param prefix: Prefix under which the service's parameters are stored.
        :return: A ServiceSpec with one ProcSpec per process of each family.
        """
        # FIXME we are serializing the families into a string because of
        # https://github.com/automl/ConfigSpace/issues/381
        # When it is fixed, just do:
        # families = config[f'{prefix}processes.families']
        families = config[f'{prefix}processes.families'].split(',')
        proc_specs = []
        for family in families:
            # An empty family list is serialized as '', which splits into [''].
            if not family:
                continue
            proc_count = int(config[f'{prefix}processes.{family}.count'])
            for i in range(proc_count):
                proc_specs.append(
                    ProcSpec.from_config(config, prefix=f'{prefix}processes.{family}[{i}].', **kwargs))
        return ServiceSpec(processes=proc_specs)


attr.resolve_types(MercurySpec, globals(), locals())
attr.resolve_types(PoolSpec, globals(), locals())
attr.resolve_types(SchedulerSpec, globals(), locals())
Expand All @@ -2331,3 +2477,4 @@ def from_config(config: 'Configuration',
attr.resolve_types(SSGSpec, globals(), locals())
attr.resolve_types(BedrockSpec, globals(), locals())
attr.resolve_types(ProcSpec, globals(), locals())
attr.resolve_types(ServiceSpec, globals(), locals())
22 changes: 22 additions & 0 deletions python/mochi/bedrock/test_config_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ def resolve_provider_dependencies(config: 'Configuration', prefix: str) -> dict:
config = space.sample_configuration()
spec = ProcSpec.from_config(address='na+sm', config=config)

def test_service_config_space(self):
    """Build a service space from two process families, sample a
    configuration from it, and convert the sample into a ServiceSpec.
    """
    # One ProcSpec configuration space per family of processes.
    family_spaces = {
        'proc_type_a': ProcSpec.space(num_pools=2, num_xstreams=1),
        'proc_type_b': ProcSpec.space(num_pools=1, num_xstreams=2),
    }
    # Every family is instantiated exactly twice.
    factories = [
        {'family': family_name, 'space': family_space, 'count': 2}
        for family_name, family_space in family_spaces.items()
    ]

    service_space = ServiceSpec.space(process_space_factories=factories)
    sampled = service_space.sample_configuration()

    # The address is forwarded to ProcSpec.from_config() through **kwargs.
    spec = ServiceSpec.from_config(address='na+sm', config=sampled)

if __name__ == '__main__':
unittest.main()
15 changes: 12 additions & 3 deletions src/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,19 @@ Server::Server(const std::string& address, const std::string& configString,

// If the config is an array, it should have only one entry remaining after filtering
if(config.is_array()) {
if(config.size() != 1) {
throw Exception{"Configuration did not resolve to a single possibility"};
if(config.size() == 1) {
config = config[0];
} else {
if(!mpi.isEnabled()) {
throw Exception{"Configuration resolved to an array but MPI is not enabled"};
}
if(config.size() == (size_t)mpi.globalSize()) {
config = config[mpi.globalRank()];
} else {
throw Exception{"Ambiguous configuration did not resolve "
"to a single possibility for the process"};
}
}
config = config[0];
}

// Extract margo section from the config
Expand Down

0 comments on commit a376497

Please sign in to comment.