
Commit

refactor logging
alexboden committed Nov 21, 2024
1 parent 757ec29 commit 8d32e22
Showing 2 changed files with 68 additions and 46 deletions.
17 changes: 17 additions & 0 deletions KubernetesLogFormatter.py
@@ -0,0 +1,17 @@
import json
import logging
import os

POD_NAME = os.getenv('POD_NAME', 'unknown-pod')
NAMESPACE = os.getenv('NAMESPACE', 'default')

class KubernetesLogFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
            "pod_name": POD_NAME,
            "namespace": NAMESPACE,
        }
        return json.dumps(log_record)
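As a quick sanity check, here is a minimal usage sketch (not part of the commit) of what the formatter emits once attached to a handler, assuming the default environment values above:

import logging
from KubernetesLogFormatter import KubernetesLogFormatter

handler = logging.StreamHandler()
handler.setFormatter(KubernetesLogFormatter())
demo = logging.getLogger("demo")
demo.addHandler(handler)
demo.warning("allocating runner")
# {"timestamp": "2024-11-21 09:15:02,113", "level": "WARNING", "message": "allocating runner", "pod_name": "unknown-pod", "namespace": "default"}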
97 changes: 51 additions & 46 deletions main.py
@@ -11,22 +11,29 @@
from runner_size_config import get_runner_resources
from config import GITHUB_API_BASE_URL, GITHUB_REPO_URL, ALLOCATE_RUNNER_SCRIPT_PATH
from RunningJob import RunningJob
+from KubernetesLogFormatter import KubernetesLogFormatter

# Configure logging
-log_file = '/tmp/slurm-action-runners.log'
-log_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) # 10 MB per file, 5 backup files
-log_handler.setLevel(logging.INFO)
-log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-log_handler.setFormatter(log_formatter)
+log_formatter = KubernetesLogFormatter()

-logger = logging.getLogger()
-logger.setLevel(logging.INFO)
-logger.addHandler(log_handler)
+# StreamHandler for stdout (INFO and below)
+stdout_handler = logging.StreamHandler(sys.stdout)
+stdout_handler.setLevel(logging.INFO)
+stdout_handler.addFilter(lambda record: record.levelno < logging.WARNING)  # keep WARNING+ off stdout so records are not duplicated across streams
+stdout_handler.setFormatter(log_formatter)

-console_handler = logging.StreamHandler()
-console_handler.setLevel(logging.INFO)
-console_handler.setFormatter(log_formatter)
-logger.addHandler(console_handler)
+# StreamHandler for stderr (WARNING and above)
+stderr_handler = logging.StreamHandler(sys.stderr)
+stderr_handler.setLevel(logging.WARNING)
+stderr_handler.setFormatter(log_formatter)

+# Initialize the root logger
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)

+# Add handlers to logger
+logger.addHandler(stdout_handler)
+logger.addHandler(stderr_handler)
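With both handlers attached, a record's level selects its stream; a rough sketch of the intended routing (assuming the configuration above):

logger.info("polling for queued workflows")   # JSON line on stdout only
logger.error("sbatch submission failed")      # JSON line on stderr only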

# Load GitHub access token from .env file
# Only secret required is the GitHub access token
@@ -40,6 +46,8 @@
# Global tracking for allocated runners
allocated_jobs = {} # Maps job_id -> RunningJob

+POLLED_WITHOUT_ALLOCATING = False  # set True after logging an idle poll, so "Polling..." is not spammed

def get_gh_api(url, token, etag=None):
"""
Sends a GET request to the GitHub API with the given URL and access token.
@@ -52,7 +60,7 @@ def get_gh_api(url, token, etag=None):

response = requests.get(url, headers=headers)
if int(response.headers['X-RateLimit-Remaining']) % 100 == 0:
logging.info(f"Rate Limit Remaining: {response.headers['X-RateLimit-Remaining']}")
logger.info(f"Rate Limit Remaining: {response.headers['X-RateLimit-Remaining']}")
if response.status_code == 304:
return None, etag
elif response.status_code == 200:
Expand All @@ -61,11 +69,11 @@ def get_gh_api(url, token, etag=None):
elif response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers and response.headers['X-RateLimit-Remaining'] == '0':
reset_time = int(response.headers['X-RateLimit-Reset'])
sleep_time = reset_time - time.time() + 1 # Adding 1 second to ensure the reset has occurred
-        logging.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds.")
+        logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds.")
time.sleep(sleep_time)
return get_gh_api(url, token, etag) # Retry the request
else:
logging.error(f"Unexpected status code: {response.status_code}")
logger.error(f"Unexpected status code: {response.status_code}")
response.raise_for_status() # Handle HTTP errors
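The etag parameter makes this a conditional request: GitHub answers 304 Not Modified when nothing has changed, and a 304 does not count against the rate limit. A sketch of the exchange (illustrative values, not from the diff):

headers = {'Authorization': f'token {token}', 'If-None-Match': '"a64b1c"'}
response = requests.get(url, headers=headers)
if response.status_code == 304:
    pass  # nothing new since the saved ETag; skip processing
elif response.status_code == 200:
    etag = response.headers['ETag']  # save for the next poll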

def poll_github_actions_and_allocate_runners(url, token, sleep_time=5):
@@ -74,7 +82,10 @@ def poll_github_actions_and_allocate_runners(url, token, sleep_time=5):
data, _ = get_gh_api(url, token, etag)
if data:
allocate_runners_for_jobs(data, token)
logging.info("Polling for queued workflows...")
if not POLLED_WITHOUT_ALLOCATING:
logger.info("Polling for queued workflows...")
POLLED_WITHOUT_ALLOCATING = True
time.sleep(sleep_time) # issues occur if you request to frequently


@@ -94,51 +104,44 @@ def get_all_jobs(workflow_id, token):
if len(job_data['jobs']) < per_page:
break # No more pages
page += 1
logging.info(f"Getting jobs for workflow {workflow_id} page {page}")
logger.info(f"Getting jobs for workflow {workflow_id} page {page}")
else:
break # No more data

return all_jobs

def allocate_runners_for_jobs(workflow_data, token):
if "workflow_runs" not in workflow_data:
logging.error("No workflows found.")
logger.error("No workflows found.")
return

-    number_of_queued_workflows = workflow_data["total_count"]
-    # logging.info(f"Total number of queued workflows: {number_of_queued_workflows}")
+    number_of_queued_workflows = len(workflow_data["workflow_runs"])
+    # logging.info(f"Number of workflow runs: {number_of_queued_workflows}")

for i in range(number_of_queued_workflows):
workflow_id = workflow_data["workflow_runs"][i]["id"]
# logging.info(f"Evaluating workflow ID: {workflow_id}")
# If statement to check if the workflow is on the testing branch, remove this for production
logger.info(f"Evaluating workflow ID: {workflow_id}")
branch = workflow_data["workflow_runs"][i]["head_branch"]
if branch != "alexboden/test-slurm-gha-runner" and branch != "alexboden/test-ci-apptainer":
# logging.info(f"Skipping workflow {workflow_id} because it is not on the correct branch, branch: {branch}.")
continue
# else:
# logging.info(f"Processing workflow {workflow_id} because it is on the correct branch, branch: {branch}.")
job_data = get_all_jobs(workflow_id, token)
# logging.info(f"There are {len(job_data)} jobs in the workflow.")
for job in job_data:
if job["status"] == "queued":
queued_job_id = job["id"]
allocate_actions_runner(queued_job_id, token)
# logging.info(f"Job {job['name']} {job['id']} is queued.")
# else:
# logging.info(f"Job {job['name']} {job['id']} is not queued.")


def allocate_actions_runner(job_id, token):
"""
Allocates a runner for the given job ID by sending a POST request to the GitHub API to get a registration token.
Proceeds to submit a SLURM job to allocate the runner with the corresponding resources.
"""
"""
if job_id in allocated_jobs:
logging.info(f"Runner already allocated for job {job_id}")
logger.info(f"Runner already allocated for job {job_id}")
return
logging.info(f"Allocating runner for job {job_id}")
logger.info(f"Allocating runner for job {job_id}")
POLLED_WITHOUT_ALLOCATING = False
allocated_jobs[job_id] = None # mark as allocated to prevent double allocation

# get the runner registration token
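The registration token mentioned above comes from a documented GitHub endpoint; roughly, the call looks like this (a sketch, the actual request is in the collapsed lines of the diff):

response = requests.post(f'{GITHUB_API_BASE_URL}/actions/runners/registration-token', headers={'Authorization': f'token {token}'})
registration_token = response.json()['token']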
@@ -157,14 +159,14 @@ def allocate_actions_runner(job_id, token):

data, _ = get_gh_api(f'{GITHUB_API_BASE_URL}/actions/jobs/{job_id}', token)
labels = data["labels"] # should only be one label in prod
logging.info(f"Job labels: {labels}")
logger.info(f"Job labels: {labels}")

run_id = data['run_id']

allocated_jobs[job_id] = RunningJob(job_id, None, data['workflow_name'], data['name'], labels)

if labels[0] != "alexboden-gh-arc-runners-small" and labels[0] != "alexboden-gh-arc-runners-medium" and labels[0] != "alexboden-gh-arc-runners-large" and labels[0] != "alexboden-gh-arc-runners-xlarge":
logging.info(f"Skipping job because it is not for the correct runner. labels: {labels}, labels[0]: {labels[0]}")
logger.info(f"Skipping job because it is not for the correct runner. labels: {labels}, labels[0]: {labels[0]}")
del allocated_jobs[job_id]
return

@@ -176,7 +178,7 @@ def allocate_actions_runner(job_id, token):
elif "alexboden-gh-arc-runners-xlarge" in labels:
runner_size_label = "gh-arc-runners-xlarge"

logging.info(f"Using runner size label: {runner_size_label}")
logger.info(f"Using runner size label: {runner_size_label}")
runner_resources = get_runner_resources(runner_size_label)

# sbatch resource allocation command
@@ -196,23 +198,23 @@ def allocate_actions_runner(job_id, token):
str(run_id)
]

logging.info(f"Running command: {' '.join(command)}")
logger.info(f"Running command: {' '.join(command)}")

result = subprocess.run(command, capture_output=True, text=True)
output = result.stdout.strip()
error_output = result.stderr.strip()
logging.info(f"Command stdout: {output}")
logging.error(f"Command stderr: {error_output}")
logger.info(f"Command stdout: {output}")
logger.error(f"Command stderr: {error_output}")
try:
slurm_job_id = int(output.split()[-1]) # output is of the form "Submitted batch job 3828"
allocated_jobs[job_id] = RunningJob(job_id, slurm_job_id, data['workflow_name'], data['name'], labels)
logging.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
logger.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
if result.returncode != 0:
del allocated_jobs[job_id]
logging.error(f"Failed to allocate runner for job {job_id}.")
logger.error(f"Failed to allocate runner for job {job_id}.")
allocate_actions_runner(job_id, token)
except (IndexError, ValueError) as e:
logging.error(f"Failed to parse SLURM job ID from command output: {output}. Error: {e}")
logger.error(f"Failed to parse SLURM job ID from command output: {output}. Error: {e}")
del allocated_jobs[job_id]
# retry the job allocation
allocate_actions_runner(job_id, token)
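The int(output.split()[-1]) parse leans on sbatch printing a one-line confirmation; a quick illustration of both outcomes (the sample string follows the comment in the code above):

int("Submitted batch job 3828".split()[-1])  # -> 3828
# If sbatch fails, stdout is empty or non-numeric, so split()[-1] raises
# IndexError or int() raises ValueError -- both land in the retry branch.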
@@ -239,8 +241,8 @@ def check_slurm_status():
sacct_output = sacct_result.stdout.strip()

if sacct_result.returncode != 0:
logging.error(f"sacct command failed with return code {sacct_result.returncode}")
logging.error(f"Error output: {sacct_result.stderr}")
logger.error(f"sacct command failed with return code {sacct_result.returncode}")
logger.error(f"Error output: {sacct_result.stderr}")
continue

for line in sacct_output.split('\n'):
@@ -263,18 +265,18 @@ def check_slurm_status():
start_time = datetime.strptime(start_time_str, '%Y-%m-%dT%H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%dT%H:%M:%S')
except Exception as e:
logging.error(f"Error parsing start/end time for job {job_component}: {e}")
logger.error(f"Error parsing start/end time for job {job_component}: {e}")
start_time = None
end_time = None
duration = "[Unknown Duration]"

if start_time and end_time:
duration = end_time - start_time
logging.info(f"Slurm job {job_component} {status} in {duration}. Running Job Info: {str(runningjob)}")
logger.info(f"Slurm job {job_component} {status} in {duration}. Running Job Info: {str(runningjob)}")
to_remove.append(job_id)

except Exception as e:
logging.error(f"Error querying SLURM job details for job ID {runningjob.slurm_job_id}: {e}")
logger.error(f"Error querying SLURM job details for job ID {runningjob.slurm_job_id}: {e}")

for job_id in to_remove:
del allocated_jobs[job_id]
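For context, the status parser above expects whitespace-separated sacct fields with ISO-style timestamps; a hypothetical line (the actual sacct format flags live in the collapsed part of the diff, so the field order here is an assumption):

line = "3828.batch COMPLETED 2024-11-21T10:00:03 2024-11-21T10:04:51"
job_component, status, start_time_str, end_time_str = line.split()[:4]
start_time = datetime.strptime(start_time_str, '%Y-%m-%dT%H:%M:%S')  # matches the format used above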
