From a3764976df70d4985329a3b020691df6a8b40bed Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 22 Jul 2024 16:09:23 +0100 Subject: [PATCH] added ServiceSpec --- python/mochi/bedrock/spec.py | 161 +++++++++++++++++++++- python/mochi/bedrock/test_config_space.py | 22 +++ src/Server.cpp | 15 +- 3 files changed, 188 insertions(+), 10 deletions(-) diff --git a/python/mochi/bedrock/spec.py b/python/mochi/bedrock/spec.py index 7d11ccd..f4b84d7 100644 --- a/python/mochi/bedrock/spec.py +++ b/python/mochi/bedrock/spec.py @@ -41,7 +41,10 @@ def _IntegerOrConst(name: str, bounds: int|tuple[int, int], *, """ from ConfigSpace import Integer, Constant if isinstance(bounds, int): - return Constant(name=name, value=bounds, meta=meta) + c = Constant(name=name, value=bounds, meta=meta) + setattr(c, "upper", bounds) + setattr(c, "lower", bounds) + return c elif bounds[0] == bounds[1]: c = Constant(name=name, value=bounds[0], meta=meta) setattr(c, "upper", bounds[0]) @@ -1362,8 +1365,8 @@ def space(handle_cache_size: int|tuple[int,int] = 32, # Note: rpc_pool and progress_pool are categorical because AI tools should not # make the assumption that adding/removing 1 to the value will lead to a smaller # change than adding/removing a larger value. - hp_rpc_pool = _CategoricalOrConst('rpc_pool', list(range(max_num_pools)), default=1) - hp_progress_pool = _CategoricalOrConst('progress_pool', list(range(max_num_pools)), default=1) + hp_rpc_pool = _CategoricalOrConst('rpc_pool', list(range(max_num_pools)), default=0) + hp_progress_pool = _CategoricalOrConst('progress_pool', list(range(max_num_pools)), default=0) cs.add(hp_rpc_pool) cs.add(hp_progress_pool) for i in range(min_num_pools, max_num_pools): @@ -2253,13 +2256,9 @@ def space(provider_space_factories: list[tuple[str, 'ConfigurationSpace', int|tu family = provider_group['family'] provider_cs = provider_group['space'] count = provider_group.get('count', 1) - config_resolver = provider_group.get('config_resolver', None) - dependency_resolver = provider_group.get('dependency_resolver', None) default_count = count if isinstance(count, int) else count[0] hp_num_providers = _IntegerOrConst(f'providers.{family}.count', count, default=default_count) cs.add(hp_num_providers) - cs.add(Constant(f'providers.{family}.config_resolver', config_resolver)) - cs.add(Constant(f'providers.{family}.dependency_resolver', dependency_resolver)) conditions_to_add = {} for i in range(0, hp_num_providers.upper): cs.add_configuration_space( @@ -2318,6 +2317,153 @@ def from_config(config: 'Configuration', return ProcSpec(margo=margo_spec, providers=provider_specs) +@attr.s(auto_attribs=True, on_setattr=_check_validators, kw_only=True) +class ServiceSpec: + """Service specification. + + :param processes: List of ProcSpec + :type processes: list + """ + + _processes: list = attr.ib( + factory=list, + validator=instance_of(list)) + + @property + def processes(self) -> SpecListDecorator: + """Return a decorator to access the internal list of ProcSpec + and validate changes to this list. + """ + return SpecListDecorator(list=self._processes, type=ProcSpec) + + def to_dict(self) -> dict: + """Convert the ServiceSpec into a dictionary. + """ + data = {'processes': [p.to_dict() for p in self._processes]} + return data + + @staticmethod + def from_dict(data: dict) -> 'ProcSpec': + """Construct a ServiceSpec from a dictionary. + """ + processes = [] + if 'processes' in data: + for p in data['processes']: + processes.append(ProcSpec.from_dict(p)) + return ProcSpec(processes=processes) + + def to_json(self, *args, **kwargs) -> str: + """Convert the ProcSpec into a JSON string. + """ + return json.dumps(self.to_dict(), *args, **kwargs) + + @staticmethod + def from_json(json_string: str) -> 'ServiceSpec': + """Construct a ServiceSpec from a JSON string. + """ + return ServiceSpec.from_dict(json.loads(json_string)) + + def validate(self) -> NoReturn: + """Validate the state of the ServiceSpec. + """ + def check_provider_dependency(dep: str): + if '@' in value and dep.split('@')[1].isnumeric(): + rank = int(dep.split('@')[1].isnumeric()) + if rank < 0 or rank >= len(self._processes): + raise ValueError(f'Dependency {dep} refers to a non-existing rank') + + attr.validate(self) + for p in self._processes: + p.validate() + for provider in p.providers: + for key, value in provider.dependencies.items(): + if isinstance(value, str): + check_provider_dependency(value) + elif isinstance(value, list): + for dep in value: + check_provider_dependency(dep) + + @staticmethod + def space(process_space_factories: list[tuple[str, 'ConfigurationSpace', int|tuple[int,int]]] = [], + **kwargs): + """ + The process_space_factories argument is a list of dictionaries with the following format. + ``` + { + "family": "", + "space" : ConfigurationSpace, + "count" : int or tuple[int,int] + } + ``` + - "family" is a name to use in the configuration space to represent a family of processes + using the same ConfigurationSpace; + - "space" is a ConfigurationSpace generated by ProcSpec.space(); + - "count" is either an int, or a pair (int, int) (if ommitted, will default to 1). + """ + from ConfigSpace import ConfigurationSpace, GreaterThanCondition, AndConjunction, Constant + families = [f['family'] for f in process_space_factories] + if len(set(families)) != len(process_space_factories): + raise ValueError('Duplicate provider family in provider_space_factories') + cs = ConfigurationSpace() + # FIXME we are serializing the families into a string because of + # https://github.com/automl/ConfigSpace/issues/381 + # When it is fixed, just do: + # cs.add(Constant('processes.families', families)) + cs.add(Constant('processes.families', ','.join(families))) + for process_group in process_space_factories: + family = process_group['family'] + process_cs = process_group['space'] + count = process_group.get('count', 1) + default_count = count if isinstance(count, int) else count[0] + hp_num_processes = _IntegerOrConst(f'processes.{family}.count', count, default=default_count) + cs.add(hp_num_processes) + conditions_to_add = {} + for i in range(0, hp_num_processes.upper): + cs.add_configuration_space( + prefix=f'processes.{family}[{i}]', delimiter='.', + configuration_space=process_cs) + if i <= hp_num_processes.lower: + continue + # FIXME: the code bellow will not work because some of the parameters + # already have a condition attached to them and can't have more added + # see https://github.com/automl/ConfigSpace/issues/380 + for param in provider_cs: + param_key = f'processes.{family}[{i}].{param}' + if param_key not in conditions_to_add: + conditions_to_add[param_key] = [] + conditions_to_add[param_key].append( + GreaterThanCondition(cs[param_key], hp_num_providers, i)) + for param_key, conditions in conditions_to_add.items(): + if len(conditions) == 1: + cs.add(conditions[0]) + else: + cs.add(AndConjunction(*conditions)) + return cs + + @staticmethod + def from_config(config: 'Configuration', + prefix: str = '', **kwargs): + """ + Create a ServiceSpec from the provided Configuration object. + Extra parameters (**kwargs) will be propagated to the underlying + ProcSpec.from_config(). + """ + # FIXME we are serializing the families into a string because of + # https://github.com/automl/ConfigSpace/issues/381 + # When it is fixed, just do: + # families = config[f'{prefix}processes.families'] + families = config[f'{prefix}processes.families'].split(',') + proc_specs = [] + for family in families: + if len(family) == 0: + continue + proc_count = int(config[f'{prefix}processes.{family}.count']) + for i in range(proc_count): + proc_specs.append( + ProcSpec.from_config(config, prefix=f'{prefix}processes.{family}[{i}].', **kwargs)) + return ServiceSpec(processes=proc_specs) + + attr.resolve_types(MercurySpec, globals(), locals()) attr.resolve_types(PoolSpec, globals(), locals()) attr.resolve_types(SchedulerSpec, globals(), locals()) @@ -2331,3 +2477,4 @@ def from_config(config: 'Configuration', attr.resolve_types(SSGSpec, globals(), locals()) attr.resolve_types(BedrockSpec, globals(), locals()) attr.resolve_types(ProcSpec, globals(), locals()) +attr.resolve_types(ServiceSpec, globals(), locals()) diff --git a/python/mochi/bedrock/test_config_space.py b/python/mochi/bedrock/test_config_space.py index 3436221..9290f15 100644 --- a/python/mochi/bedrock/test_config_space.py +++ b/python/mochi/bedrock/test_config_space.py @@ -111,6 +111,28 @@ def resolve_provider_dependencies(config: 'Configuration', prefix: str) -> dict: config = space.sample_configuration() spec = ProcSpec.from_config(address='na+sm', config=config) + def test_service_config_space(self): + + proc_type_a = ProcSpec.space(num_pools=2, num_xstreams=1) + proc_type_b = ProcSpec.space(num_pools=1, num_xstreams=2) + + space = ServiceSpec.space( + process_space_factories=[ + { + 'family': 'proc_type_a', + 'space': proc_type_a, + 'count': 2 + }, + { + 'family': 'proc_type_b', + 'space': proc_type_b, + 'count': 2 + } + ]) + + config = space.sample_configuration() + + spec = ServiceSpec.from_config(address='na+sm', config=config) if __name__ == '__main__': unittest.main() diff --git a/src/Server.cpp b/src/Server.cpp index 89a4e45..c79588c 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -78,10 +78,19 @@ Server::Server(const std::string& address, const std::string& configString, // If the config is an array, it should have only one entry remaining after filtering if(config.is_array()) { - if(config.size() != 1) { - throw Exception{"Configuration did not resolve to a single possibility"}; + if(config.size() == 1) { + config = config[0]; + } else { + if(!mpi.isEnabled()) { + throw Exception{"Configuration resolved to an array but MPI is not enabled"}; + } + if(config.size() == (size_t)mpi.globalSize()) { + config = config[mpi.globalRank()]; + } else { + throw Exception{"Ambiguous configuration did not resolve " + "to a single possibility for the process"}; + } } - config = config[0]; } // Extract margo section from the config