diff --git a/supervisor/process.py b/supervisor/process.py index 641aafb03..4f032bc4e 100644 --- a/supervisor/process.py +++ b/supervisor/process.py @@ -205,7 +205,7 @@ def spawn(self, supervisor=None): Return the process id. If the fork() call fails, return None. """ - if self.config.depends_on is not None: + if self.config.depends_on is not None and not supervisor.abort_queing : if any([dependee.state is not ProcessStates.RUNNING for dependee in self.config.depends_on.values()]): self.queue_all_dependee_processes(supervisor) diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index 749f532bd..a2fa5ea1e 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -60,6 +60,7 @@ def __init__(self, options): self.ticks = {} self.process_spawn_dict = dict() self.process_started_dict = dict() + self.abort_queing = None def main(self): if not self.options.first: @@ -90,7 +91,7 @@ def run(self): for config in self.options.process_group_configs: self.add_process_group(config) # add processes to directed graph, to check for dependency cycles - g = Graph(len(self.options.process_group_configs)) + self.g = Graph(len(self.options.process_group_configs)) # replace depends_on string with actual process object for config in (self.options.process_group_configs): # check dependencies for all programs in group: @@ -105,11 +106,11 @@ def run(self): dependent_group, dependent_process=process.split(":") except: dependent_group=dependent_process=process - g.addEdge(config.process_configs[conf[0]].name, dependent_process) + self.g.addEdge(config.process_configs[conf[0]].name, dependent_process) process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process] config.process_configs[conf[0]].depends_on = process_dict # check for cyclical process dependencies - if g.cyclic() == 1: + if self.g.cyclic() == 1: raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!') self.options.openhttpservers(self) @@ -356,16 +357,23 @@ def _spawn_dependee_queue(self): if self.process_spawn_dict: for process_name, process_object in list(self.process_spawn_dict.items()): if process_object.config.depends_on is not None: - if any([dependee.state is ProcessStates.FATAL for dependee in - process_object.config.depends_on.values()]): - self._set_fatal_state_and_empty_queue() + if self._any_dependee_failed(process_object): + self._empty_queue() break - if all([dependee.state is ProcessStates.RUNNING for dependee in - process_object.config.depends_on.values()]): + if self._all_dependees_running(process_object): self._spawn_process_from_process_dict(process_name, process_object) else: self._spawn_process_from_process_dict(process_name, process_object) + def _any_dependee_failed(self, process_object): + return any([dependee.state is ProcessStates.BACKOFF or dependee.state + is ProcessStates.FATAL for dependee in + process_object.config.depends_on.values()]) + + def _all_dependees_running(self, process_object): + return all([dependee.state is ProcessStates.RUNNING for dependee in + process_object.config.depends_on.values()]) + def _spawn_process_from_process_dict(self, process_name, process_object): self.process_started_dict[process_name] = process_object del self.process_spawn_dict[process_name] @@ -375,12 +383,7 @@ def _spawn_process_from_process_dict(self, process_name, process_object): process_object.spawn(self) process_object.notify_timer = 5 - def _set_fatal_state_and_empty_queue(self): - for process_name, process_object in self.process_spawn_dict.items(): - process_object.record_spawnerr( - 'Dependee process did not start - set FATAL state for {}' - .format(process_name)) - process_object.change_state(ProcessStates.FATAL) + def _empty_queue(self): self.process_spawn_set = set() self.process_spawn_dict = dict() @@ -390,7 +393,11 @@ def _handle_spawn_timeout(self): Timeout if a process needs longer than spawn_timeout (default=60 seconds) to reach RUNNING """ - # check if any of the processes that was started did not make it and remove RUNNING ones. + # check if any of the processes which was started did not make it and + # remove RUNNING processes from the process_started_dict. + if self.abort_queing is not None: + self.abort_queing.change_state(ProcessStates.FATAL) + self.abort_queing = None if self.process_started_dict: for process_name, process_object in list(self.process_started_dict.items()): if process_object.state is ProcessStates.RUNNING: @@ -398,29 +405,43 @@ def _handle_spawn_timeout(self): # handle timeout error. elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout: self._timeout_process(process_name, process_object) + del self.process_started_dict[process_name] + break # notify user about waiting elif (time.time() - process_object.laststart) >= process_object.notify_timer: self._notfiy_user_about_waiting(process_name, process_object) def _timeout_process(self, process_name, process_object): - msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned" + msg = ("timeout: dependee process {} in {} did not reach RUNNING within" + " {} seconds, checking now if dependees {} can be spawned" .format(process_name, getProcessStateDescription(process_object.state), process_object.config.spawn_timeout, [process for process in self.process_spawn_dict.keys()])) process_object.config.options.logger.warn(msg) - process_object.record_spawnerr( - 'timeout: Process {} did not reach RUNNING state within {} seconds' - .format(process_name, - process_object.config.spawn_timeout)) - process_object.change_state(ProcessStates.FATAL) + process_object.stop() + # keep track of the process that timed out - will be set to FATAL in + # the next iteration + self.abort_queing = process_object + keys_to_remove = [] for process_name, process_object in self.process_spawn_dict.items(): - process_object.record_spawnerr( - 'Dependee process did not start - set FATAL state for {}' - .format(process_name)) - process_object.change_state(ProcessStates.FATAL) - self.process_spawn_dict = dict() - self.process_started_dict = dict() + if self.g.connected(process_name, self.abort_queing.config.name): + keys_to_remove.append(process_name) + msg = ("{} will not be spawned, because {} did not " + "successfully start".format(process_name, self.abort_queing.config.name)) + process_object.config.options.logger.warn(msg) + for key in keys_to_remove: + del self.process_spawn_dict[key] + keys_to_remove = [] + for process_name, process_object in self.process_started_dict.items(): + if self.g.connected(process_name, self.abort_queing.config.name): + keys_to_remove.append(process_name) + msg = ("stopping {}, because {} did not successfully " + "start".format(process_name, self.abort_queing.config.name)) + process_object.config.options.logger.warn(msg) + process_object.stop() + for key in keys_to_remove: + del self.process_started_dict[key] def _notfiy_user_about_waiting(self, process_name, process_object): process_object.notify_timer += 5