
Agent Spawner daemon #125

Merged: praiskup merged 3 commits into main on Nov 6, 2023

Conversation

@praiskup (Owner) commented Sep 6, 2023

Relates to #123

@praiskup praiskup marked this pull request as draft September 6, 2023 11:13
agentspawner/daemon.py: Fixed
@praiskup praiskup mentioned this pull request Sep 6, 2023
agentspawner/daemon.py: Fixed
continue

self.log.debug("Closing ticket %s", ticket.id)
ticket.close()

Owner Author

@siteshwar Note that we close the ticket once the call_release() hook finishes with exit status 0
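
For context, a minimal sketch of that pattern (not the PR's exact code; the `./hook-release` path and function names are illustrative):

```python
import subprocess

def call_release(data):
    """Run the configured release hook; success means exit status 0."""
    proc = subprocess.run(["./hook-release", str(data)], check=False)
    return proc.returncode == 0

def release_and_close(ticket, data):
    # Close the ticket only once the release hook has succeeded;
    # otherwise keep it open and retry on the next daemon iteration.
    if call_release(data):
        ticket.close()
```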

ticket = self.conn.newTicket(self.tags)
self.log.debug("Taking ticket id %s", ticket.id)
self.tickets.append(ticket.id)
data = ticket.wait()

Contributor

This would cause blocking in linear mode. Can we open a ticket and wait in a separate thread (or process)?

Owner Author

This is true, though resalloc prepares the resources in advance with the "preallocation" mechanism, so doing this in parallel isn't worth the initial hassle IMO. Simply put, the wait() will return immediately (or at least the subsequent wait() will). WDYT?

Contributor

Ok.

self.log.debug("Taking ticket id %s", ticket.id)
self.tickets.append(ticket.id)
data = ticket.wait()
self.call_take(data)

Contributor

I would also need a post_call_take() callback to start the osh worker on the machine.

Contributor

Or probably I can merge my changes into one and just use the call_take() hook.

@siteshwar (Contributor)

The process of starting a new osh worker is as follows:

  • Request a new machine.
  • Wait for it to appear and get its hostname.
  • Generate an osh worker configuration based on the hostname.
  • Transfer the configuration to the new osh worker.
  • Start the osh worker daemon.

The last 3 steps should be done after the hostname is known and probably belong to a separate post_call_take() hook.
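
A rough sketch of what such a hook could look like (the hook's shape, the paths, and the osh-worker service name are illustrative, not part of this PR):

```python
#!/usr/bin/env python3
"""Hypothetical post-take hook: configure and start an osh worker."""
import subprocess
import sys

def main():
    # The hostname is expected as the first argument (taken from ticket data).
    hostname = sys.argv[1]

    # Generate an osh worker configuration based on the hostname.
    config = (
        "[worker]\n"
        "hub_url = https://osh-hub.example.com/xmlrpc\n"
        f"worker_name = {hostname}\n"
    )
    with open("/tmp/osh-worker.conf", "w", encoding="utf-8") as out:
        out.write(config)

    # Transfer the configuration to the new worker.
    subprocess.run(
        ["scp", "/tmp/osh-worker.conf", f"root@{hostname}:/etc/osh/worker.conf"],
        check=True)

    # Start the osh worker daemon remotely.
    subprocess.run(
        ["ssh", f"root@{hostname}", "systemctl", "start", "osh-worker"],
        check=True)

if __name__ == "__main__":
    main()
```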

@praiskup (Owner, Author)

The last 3 steps should be done after the hostname is known and probably belong to a separate post_call_take() hook.

We can have two hooks, but if they can be logically merged, the API would be simpler.

sleep = 30

def __init__(self, resalloc_connection, logger):
self.tags = ["A"]

Contributor

This should be customizable through configuration.

if todo > 0:
self.start(todo)
elif todo < 0:
self.try_to_stop(-todo)

Contributor

There should be a periodic check here to find virtual machines that cannot be used anymore and should be deleted.

Owner Author

Can you elaborate on this one? The try_to_stop hook actually does this?

Contributor

The try_to_stop hook tries to close tickets based on a number, so if the number does not change, the tickets would not be closed. We would like to have throwaway workers in openscanhub, that is, a worker should be deleted once a task assigned to it has been completed. This would require querying the database, and it should be done through a separate hook.

On a side note, we should regularly perform health checks on the workers, and if a worker ends up in an unusable state, its related ticket should be closed.

Owner Author

Good point. Discussed on Slack 1:1: the ticket needs to be closed for finished agents, because the agents cannot close the ticket themselves. We could rely on the "FAILED" ticket state (if the VM/resource goes down by itself), but that's not ideal for OSH.

Contributor

If an agent goes down by itself and the ticket is in the failed state, there should be a separate hook to remove the agent from the osh (kobo) database.

def __init__(self, resalloc_connection, logger):
self.tags = ["A"]
# TODO: use a persistent storage so we can restart the process
self.tickets = []

Contributor

The tickets being processed by the daemon should be stored in the database to survive service restarts.

Owner Author

Can we dump the list of tickets into a persistent "text" file for now?

Contributor

It would be fine for the beginning, but I plan to use containers for deployments, so this would have to be fixed using a database before that.

Owner Author

Yes, OK -> for the beginning we can have a persistent volume or a bind-mounted file.
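
A minimal sketch of that interim solution, assuming a bind-mounted path such as /var/lib/agentspawner/tickets.json (the path is illustrative):

```python
import json
import os

TICKETS_FILE = "/var/lib/agentspawner/tickets.json"  # illustrative path

def save_tickets(ticket_ids):
    """Dump the open ticket IDs so a restarted daemon can pick them up."""
    tmp = TICKETS_FILE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as out:
        json.dump(list(ticket_ids), out)
    os.replace(tmp, TICKETS_FILE)  # atomic swap avoids partial writes

def load_tickets():
    """Return previously stored ticket IDs, or an empty list on first start."""
    try:
        with open(TICKETS_FILE, encoding="utf-8") as inp:
            return json.load(inp)
    except FileNotFoundError:
        return []
```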

def call_converge_to(self):
""" Execute the configured hook script """
while True:
result = subprocess.run(["./hook-converge-to"], capture_output=True,

Contributor

I had to replace capture_output=True with stdout=subprocess.PIPE to get this script working on a RHEL 8 machine.

Owner Author

Ah, Python 3.7+, thank you for the hint.
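
For reference, capture_output= was added to subprocess.run() in Python 3.7, while RHEL 8 defaults to Python 3.6; a version-agnostic variant of the call could look like this (sketch only):

```python
import subprocess

# Works on Python 3.6 (RHEL 8): pass the pipes explicitly instead of
# capture_output=True, and use universal_newlines= instead of text=.
result = subprocess.run(["./hook-converge-to"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        universal_newlines=True,
                        check=False)
wanted_agents = int(result.stdout.strip())
```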

"""
Call hook that prepares the resource
"""
return not subprocess.run(["./hook-take", f"{data}"], check=True)

Contributor

The hooks should be customizable through a configuration file.
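
One possible shape for that (a sketch only; the config path and the helper names are assumptions, loosely following the YAML example that shows up later in this thread):

```python
import os
import subprocess
import yaml

CONFIG_PATH = "/etc/resalloc-agent-spawner/config.yaml"  # assumed location

def load_group(name="osh_workers"):
    """Load the hook commands configured for one agent group."""
    with open(CONFIG_PATH, encoding="utf-8") as cfg:
        return yaml.safe_load(cfg)["agent_groups"][name]

def run_hook(group, key, data=None):
    """Run a configured hook (e.g. cmd_prepare); success == exit status 0."""
    env = dict(os.environ)
    if data is not None:
        env["TICKET_DATA"] = str(data)  # hypothetical way to pass ticket data
    proc = subprocess.run(group[key], shell=True, env=env, check=False)
    return proc.returncode == 0
```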

"""
while True:
start = time.time()
todo = self.call_converge_to() - len(self.tickets)

Contributor

We probably need a check here so that the number of opened tickets does not exceed the maximum number of machines allowed in a pool.

Owner Author

The set of resources may span several pools, so this is a hard task to do ... can we implement this later?

Contributor

Yes.

Contributor

If the converge-to hook returns a number greater than the number of running machines, the recycle hook would never be called and all the running machines would be in an unusable state.

Owner Author

The self.recycle() is called every time.

Contributor

This code synchronously waits for tickets, so if the converge-to hook returns a number greater than the number of running nodes, the recycle hook would never be called.

Owner Author

Indeed -> it is better to perform the preparation asynchronously. Noted. I think I'm going to start using the python-copr-common logic for the dispatcher/background workers, which depends on Redis; is that OK? While depending on Redis, I think we can save the list of opened tickets there, too.

Contributor

@kdudka The hub and the resalloc spawner daemon should be running on the same node. Would it cause any problems to have a dependency on Redis?

Owner Author

Redis can be hosted in a different container. We just need the runtime dependency on the Redis client (python3-redis).
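
A sketch of what keeping the open-ticket set in Redis could look like (the key name and connection details are illustrative; only python3-redis is needed on the client side):

```python
import redis

# Redis itself may run in a separate container; connect over the network.
conn = redis.Redis(host="redis", port=6379, decode_responses=True)
TICKETS_KEY = "agent-spawner:tickets"  # illustrative key name

def remember_ticket(ticket_id):
    conn.sadd(TICKETS_KEY, ticket_id)

def forget_ticket(ticket_id):
    conn.srem(TICKETS_KEY, ticket_id)

def known_tickets():
    return {int(tid) for tid in conn.smembers(TICKETS_KEY)}
```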

self._drop_ticket(ticket)
stopped += 1

def recyle(self):

Contributor

There is a typo here and it should be renamed to recycle.

Ticket,
)

RESALLOC_SERVER = "http://localhost:49100"

Contributor

This should be customizable through configuration.


for ticket_id in list(self.tickets):
ticket = Ticket(ticket_id, connection=self.conn)
data = ticket.collect()

Contributor

What would this call return?

Contributor

Shall we pass ticket.output to the throwaway hook? I would expect to get the IP address of the machine from there.

Owner Author

It is the data printed on stdout by cmd_alloc in resalloc. Whatever you dump there, you get here.
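
In other words, the allocation script's stdout is passed through verbatim; a hook can parse it however it likes, e.g. (sketch only, the JSON shape is just an assumption):

```python
import json
import sys

def parse_ticket_data(raw):
    """Parse whatever the allocation command printed to stdout.

    If it printed JSON like {"host": "1.2.3.4"}, use it as-is;
    otherwise treat the whole output as a bare hostname/IP.
    """
    raw = raw.strip()
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, dict):
            return parsed
    except ValueError:
        pass
    return {"host": raw}

if __name__ == "__main__":
    print(parse_ticket_data(sys.stdin.read()))
```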


ticket = Ticket(ticket_id, connection=self.conn)
data = ticket.collect()
if not self.call_release(data):

Contributor

We should pass ticket.output here.

Owner Author

Ah, OK -> if it is ticket.output then yes.

if not self.call_throwaway(data):
continue

self._drop_ticket(ticket)

Contributor

Shall we call the release hook here? Should the throwaway hook only check whether a node can be thrown away, or delete it too? That may cause code duplication.

Owner Author

I don't think we want to call the release hook in two places. If something needs to be done in both places, you can appropriately tweak both the configurable release and throwaway hooks.

Contributor

What is the difference between release and throwaway hooks? They seem to be doing the same thing under different names.

Owner Author

It should actually be named try_release vs. throwaway, or something like that.

The first one removes even workers that have not yet started any tasks, while throwaway removes workers that have already finished at least one task.

@github-advanced-security (bot) left a comment

vcs-diff-lint found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.

resalloc_agent_spawner/dispatcher.py: Fixed (6 alerts)
resalloc_agent_spawner/helpers.py: Fixed (1 alert)
resalloc_agent_spawner/worker.py: Fixed (2 alerts)
@praiskup praiskup force-pushed the praiskup-agent-spawner branch 3 times, most recently from b7dc13d to 172001e on October 24, 2023 00:38
There doesn't seem to be an ideal way to handle exceptions in XMLRPC.
The previous code just raised a Fault() error, which we mishandled on the
client side with the `survive_server_restart` feature (a non-existing
ticket id led to indefinite retries).
@praiskup praiskup marked this pull request as ready for review October 27, 2023 01:10
resalloc/client.py: Fixed
@praiskup (Owner, Author)

@siteshwar what do you think about the current variant? It seems to be working
with this configuration:

agent_groups:
  osh_workers:
    cmd_prepare: /bin/true
    cmd_terminate: echo noop
    cmd_converge_to: echo 5
    cmd_check_finished: exit $(( RANDOM % 15 ))
    cmd_try_release: exit $(( RANDOM % 5 ))
    tags:
      - kobo_worker

Of course, you must be running the resalloc server on a normal port and have some
pool defined with the kobo_worker tag.

@praiskup praiskup changed the title PoC: agent spawner Agent Spawner daemon Nov 6, 2023
from copr_common.log import setup_script_logger
from copr_common.redis_helpers import get_redis_connection

from resalloc.client import (

Check warning (Code scanning / vcs-diff-lint): No name 'client' in module 'resalloc'
import os
import subprocess

from resalloc.helpers import load_config_file

Check warning (Code scanning / vcs-diff-lint): No name 'helpers' in module 'resalloc'
@praiskup praiskup merged commit 871b629 into main Nov 6, 2023
2 of 3 checks passed
@praiskup praiskup deleted the praiskup-agent-spawner branch January 10, 2024 15:59