Skip to content

Commit

Permalink
Add VM Scale Management Steps
Browse files Browse the repository at this point in the history
  • Loading branch information
ebattat committed Aug 9, 2024
1 parent 4e9c29f commit 7371a41
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 65 deletions.
40 changes: 34 additions & 6 deletions benchmark_runner/common/oc/oc.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,23 @@ def _get_vm_name(self, vm_name: str, namespace: str = environment_variables.envi
except Exception as err:
raise VMNameNotExist(vm_name=vm_name)

@typechecked
def _get_all_vm_names(self, namespace: str = environment_variables.environment_variables_dict['namespace']):
    """
    Return the names of all VMs found in the given namespace.

    :param namespace: namespace to query; defaults to the configured environment namespace.
    :return: list of VM names, or an empty list when none exist or the lookup fails
    """
    ns_flag = f'-n {namespace}' if namespace else ''
    cmd = f"{self.__cli} get {ns_flag} vm -o jsonpath='{{.items[*].metadata.name}}'"
    try:
        output = self.run(cmd)
        # jsonpath emits the names space-separated on a single line
        return output.split() if output else []
    except Exception:
        # best-effort lookup: any failure is reported as "no VMs"
        return []


@typechecked
def vm_exists(self, vm_name: str, namespace: str = environment_variables.environment_variables_dict['namespace']):
"""
Expand Down Expand Up @@ -926,8 +943,8 @@ def wait_for_vm_status(self, vm_name: str = '', status: VMStatus = VMStatus.Stop
current_wait_time += OC.SLEEP_TIME
raise VMStateTimeout(vm_name=vm_name, state=status)

def wait_for_vm_login(self, vm_name: str = '', node_ip: str = '', vm_node_port: str = '',
timeout: int = SHORT_TIMEOUT):
def wait_for_vm_ssh(self, vm_name: str = '', node_ip: str = '', vm_node_port: str = '',
timeout: int = SHORT_TIMEOUT):
"""
This method waits for VM to be accessible via ssh login
:param vm_name:
Expand All @@ -938,14 +955,14 @@ def wait_for_vm_login(self, vm_name: str = '', node_ip: str = '', vm_node_port:
"""
current_wait_time = 0
while timeout <= 0 or current_wait_time <= timeout:
check_vm_login = f"""if [ "$(ssh -o 'BatchMode=yes' -o ConnectTimeout=1 root@{node_ip} -p {vm_node_port} 2>&1|egrep 'denied|verification failed')" ]; then echo 'True'; else echo 'False'; fi"""
result = self.run(check_vm_login)
check_vm_ssh = f"""if [ "$(ssh -o 'BatchMode=yes' -o ConnectTimeout=1 root@{node_ip} -p {vm_node_port} 2>&1|egrep 'denied|verification failed')" ]; then echo 'True'; else echo 'False'; fi"""
result = self.run(check_vm_ssh)
if result == 'True':
return True
# sleep for x seconds
time.sleep(OC.SLEEP_TIME)
current_wait_time += OC.SLEEP_TIME
raise VMStateTimeout(vm_name=vm_name, state='login')
raise VMStateTimeout(vm_name=vm_name, state='ssh')

@logger_time_stamp
def get_vm_node(self, vm_name: str, namespace: str = environment_variables.environment_variables_dict['namespace']):
Expand All @@ -956,7 +973,18 @@ def get_vm_node(self, vm_name: str, namespace: str = environment_variables.envir
:return:
"""
namespace = f'-n {namespace}' if namespace else ''
return self.run(f"{self.__cli} get vmi {vm_name} {namespace} -o jsonpath={{.metadata.labels.'kubevirt\.io/nodeName'}}")
command = f"{self.__cli} get vmi {vm_name} {namespace} -o jsonpath={{.metadata.labels.'kubevirt\\.io/nodeName'}}"

try:
result = self.run(command)
if result and "NotFound" not in result:
return result.strip()
return None
except Exception as e:
# Log the exception details if necessary
print(f"Error occurred: {e}")
return None


@typechecked
@logger_time_stamp
Expand Down
2 changes: 1 addition & 1 deletion benchmark_runner/common/oc/oc_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(self, workload):
class VMStateTimeout(OCError):
    """Raised when a VM fails to reach the requested state before the timeout expires."""

    def __init__(self, vm_name, state):
        # keep the formatted text on the instance for callers that read .message
        self.message = f'VM: {vm_name} does not reach to state: {state}'
        super().__init__(self.message)


Expand Down
4 changes: 4 additions & 0 deletions benchmark_runner/main/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def __init__(self):

# windows url
self._environment_variables_dict['windows_url'] = EnvironmentVariables.get_env('WINDOWS_URL', '')
# Delete all resources before and after the run, default True
self._environment_variables_dict['delete_all'] = EnvironmentVariables.get_boolean_from_environment('DELETE_ALL', True)
# Verification only, without running or deleting any resources, default False
self._environment_variables_dict['verification_only'] = EnvironmentVariables.get_boolean_from_environment('VERIFICATION_ONLY', False)

# default parameter - change only if needed
# Parameters below related to 'run_workload()'
Expand Down
141 changes: 96 additions & 45 deletions benchmark_runner/workloads/bootstorm_vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from benchmark_runner.common.logger.logger_time_stamp import logger_time_stamp, logger
from benchmark_runner.common.elasticsearch.elasticsearch_exceptions import ElasticSearchDataNotUploaded
from benchmark_runner.workloads.workloads_exceptions import MissingVMs
from benchmark_runner.workloads.workloads_operations import WorkloadsOperations
from benchmark_runner.common.oc.oc import VMStatus

Expand Down Expand Up @@ -66,23 +67,33 @@ def _set_bootstorm_vm_start_time(self, vm_name: str = ''):
self._bootstorm_start_time[vm_name] = time.time()

@logger_time_stamp
def _get_bootstorm_vm_elapsed_time(self, vm_name: str):
def _ssh_vm(self, vm_name: str):
    """
    Expose the VM's ssh port and wait until the VM is reachable over ssh.

    :param vm_name: name of the VM to verify
    :return: the name of the node hosting the VM on success, False otherwise
    """
    self._virtctl.expose_vm(vm_name=vm_name)
    # look up the hosting node once (the original called get_vm_node twice:
    # once in the guard and again for the assignment — a redundant cluster query)
    vm_node = self._oc.get_vm_node(vm_name=vm_name)
    if vm_node:
        node_ip = self._oc.get_nodes_addresses()[vm_node]
        vm_node_port = self._oc.get_exposed_vm_port(vm_name=vm_name)
        if self._oc.wait_for_vm_ssh(vm_name=vm_name, node_ip=node_ip, vm_node_port=vm_node_port):
            logger.info(f"Successfully ssh into VM: '{vm_name}' in Node: '{vm_node}' ")
            return vm_node
    return False

@logger_time_stamp
def _get_bootstorm_vm_elapsed_time(self, vm_name: str, vm_node: str) -> dict:
"""
This method returns boot elapse time for specified VM in milliseconds
@return: Dictionary with vm_name, node and its boot elapse time
Returns the boot elapse time for the specified VM in milliseconds.
@return: Dictionary with vm_name, node, and boot elapse time.
"""
vm_bootstorm_time = {}
self._virtctl.expose_vm(vm_name=vm_name)
# wait till vm login
vm_node = self._oc.get_vm_node(vm_name=vm_name)
node_ip = self._oc.get_nodes_addresses()[vm_node]
vm_node_port = self._oc.get_exposed_vm_port(vm_name=vm_name)
if self._oc.wait_for_vm_login(vm_name=vm_name, node_ip=node_ip, vm_node_port=vm_node_port):
vm_bootstorm_time['vm_name'] = vm_name
vm_bootstorm_time['node'] = vm_node
delta = time.time() - self._bootstorm_start_time[vm_name]
vm_bootstorm_time['bootstorm_time'] = round(delta, 3) * self.MILLISECONDS
return vm_bootstorm_time
if vm_node:
delta = round((time.time() - self._bootstorm_start_time[vm_name]) * self.MILLISECONDS, 3)
return {'vm_name': vm_name, 'node': vm_node, 'bootstorm_time': delta, 'vm_ssh': int(bool(vm_node)),}
return {}

def _create_vm_scale(self, vm_num: str):
"""
Expand All @@ -103,8 +114,9 @@ def _finalize_vm(self):
metric_results = self._prometheus_metrics_operation.run_prometheus_queries()
prometheus_result = self._prometheus_metrics_operation.parse_prometheus_metrics(data=metric_results)
# update total vm run time
total_run_time = self._get_bootstorm_vm_total_run_time()
self._data_dict.update({'total_run_time': total_run_time})
if not self._verification_only:
total_run_time = self._get_bootstorm_vm_total_run_time()
self._data_dict.update({'total_run_time': total_run_time})
self._data_dict.update(prometheus_result)
if self._es_host:
# upload several run results
Expand All @@ -123,14 +135,41 @@ def _run_vm(self):
self._set_bootstorm_vm_first_run_time()
self._set_bootstorm_vm_start_time(vm_name=self._vm_name)
self._virtctl.start_vm_sync(vm_name=self._vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name)
self.vm_node = self._ssh_vm(vm_name=self._vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name, vm_node=self.vm_node)
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url,
f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
self._finalize_vm()
self._oc.delete_vm_sync(
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),
vm_name=self._vm_name)

def _verify_vm_ssh(self):
    """
    Verify ssh login for every VM in the namespace and upload one result per VM.

    :raises MissingVMs: when no VMs are found in the namespace
    :raises Exception: re-raises any failure after saving the run artifact logs
    """
    try:
        vm_names = self._oc._get_all_vm_names()
        if not vm_names:
            raise MissingVMs
        # loop-invariant: the artifacts archive URL is identical for every VM,
        # so compute it once instead of per iteration
        run_artifacts_url = os.path.join(
            self._run_artifacts_url,
            f"{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz"
        )
        for vm_name in vm_names:
            vm_node = self._ssh_vm(vm_name)
            self._data_dict = {
                'vm_name': vm_name,
                'node': vm_node,
                'vm_ssh': int(bool(vm_node)),
                'run_artifacts_url': run_artifacts_url,
            }
            self._finalize_vm()
    except Exception:
        # save run artifacts logs, then re-raise with the original traceback
        self.save_error_logs()
        raise

def _run_vm_scale(self, vm_num: str):
"""
This method runs VMs in parallel and wait for login to be enabled
Expand All @@ -140,7 +179,8 @@ def _run_vm_scale(self, vm_num: str):
self._set_bootstorm_vm_start_time(vm_name=f'{self._workload_name}-{self._trunc_uuid}-{vm_num}')
self._virtctl.start_vm_async(vm_name=f'{self._workload_name}-{self._trunc_uuid}-{vm_num}')
self._virtctl.wait_for_vm_status(vm_name=vm_name, status=VMStatus.Running)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=vm_name)
vm_node = self._ssh_vm(vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=vm_name, vm_node=vm_node)
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url, f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-scale-{self._time_stamp_format}.tar.gz')
self._finalize_vm()
except Exception as err:
Expand Down Expand Up @@ -203,36 +243,47 @@ def _initialize_run(self):
self._vm_name = f'{self._workload_name}-{self._trunc_uuid}'
self._kind = 'vm'
self._environment_variables_dict['kind'] = 'vm'
# create namespace
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
if not self._verification_only:
# create namespace
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))

def run_vm_workload(self):
if not self._scale:
self._run_vm()
# scale
# verification only w/o running or deleting any resource
if self._verification_only:
self._verify_vm_ssh()
else:
first_run_time_updated = False
# create run bulks
bulks = tuple(self.split_run_bulks(iterable=range(self._scale * len(self._scale_node_list)),
limit=self._threads_limit))
# create, run and delete vms
for target in (self._create_vm_scale, self._run_vm_scale, self._stop_vm_scale, self._wait_for_stop_vm_scale,
self._delete_vm_scale, self._wait_for_delete_vm_scale):
proc = []
for bulk in bulks:
for vm_num in bulk:
# save the first run vm time
if self._run_vm_scale == target and not first_run_time_updated:
self._set_bootstorm_vm_first_run_time()
first_run_time_updated = True
p = Process(target=target, args=(str(vm_num),))
p.start()
proc.append(p)
for p in proc:
p.join()
# sleep between bulks
time.sleep(self._bulk_sleep_time)
if not self._scale:
self._run_vm()
# scale
else:
first_run_time_updated = False
# Run VMs only
if not self._delete_all:
steps = (self._create_vm_scale, self._run_vm_scale)
else:
steps = (self._create_vm_scale, self._run_vm_scale, self._stop_vm_scale,
self._wait_for_stop_vm_scale,self._delete_vm_scale, self._wait_for_delete_vm_scale)

# create run bulks
bulks = tuple(self.split_run_bulks(iterable=range(self._scale * len(self._scale_node_list)),
limit=self._threads_limit))
# create, run and delete vms
for target in steps:
proc = []
for bulk in bulks:
for vm_num in bulk:
# save the first run vm time
if self._run_vm_scale == target and not first_run_time_updated:
self._set_bootstorm_vm_first_run_time()
first_run_time_updated = True
p = Process(target=target, args=(str(vm_num),))
p.start()
proc.append(p)
for p in proc:
p.join()
# sleep between bulks
time.sleep(self._bulk_sleep_time)
proc = []

@logger_time_stamp
def run(self):
Expand Down
19 changes: 11 additions & 8 deletions benchmark_runner/workloads/windows_vm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import os
import sys
import time
from multiprocessing import Process

Expand All @@ -24,19 +25,21 @@ def run(self):
:return:
"""
try:
self._initialize_run()
if self._run_type == 'test_ci':
self._es_index = 'windows-test-ci-results'
else:
self._es_index = 'windows-results'
# create windows dv
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
self._oc.wait_for_dv_status(status='Succeeded')
self._initialize_run()
if not self._verification_only:
# create windows dv
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
self._oc.wait_for_dv_status(status='Succeeded')
self.run_vm_workload()
# delete windows dv
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
# delete namespace
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
if self._delete_all:
# delete windows dv
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
# delete namespace
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
except ElasticSearchDataNotUploaded as err:
self._oc.delete_vm_sync(
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),
Expand Down
6 changes: 4 additions & 2 deletions benchmark_runner/workloads/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ def run(self):
workload_module = importlib.import_module(f'benchmark_runner.workloads.{workload}')

try:
self.initialize_workload()
if not self._verification_only:
self.initialize_workload()
success = True
# extract workload module and class
for cls in inspect.getmembers(workload_module, inspect.isclass):
if workload.replace('_', '').lower() == cls[0].lower():
if cls[1]().run() == False:
success = False
self.finalize_workload()
if not self._verification_only:
self.finalize_workload()
return success

except Exception as err:
Expand Down
9 changes: 9 additions & 0 deletions benchmark_runner/workloads/workloads_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,12 @@ class MissingRedis(BenchmarkRunnerError):
def __init__(self):
self.message = "Missing redis"
super(MissingRedis, self).__init__(self.message)


class MissingVMs(BenchmarkRunnerError):
    """Raised when no running VMs are found to operate on."""

    def __init__(self):
        self.message = "Missing running VMs"
        super().__init__(self.message)
10 changes: 7 additions & 3 deletions benchmark_runner/workloads/workloads_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def __init__(self):
self._prometheus_snap_interval = self._environment_variables_dict.get('prometheus_snap_interval', '')
self._prometheus_metrics_operation = PrometheusMetricsOperation()
self._windows_url = self._environment_variables_dict.get('windows_url', '')
self._delete_all = self._environment_variables_dict.get('delete_all', '')
self._verification_only = self._environment_variables_dict.get('verification_only', '')
if self._windows_url:
file_name = os.path.basename(self._windows_url)
self._windows_os = os.path.splitext(file_name)[0]
Expand Down Expand Up @@ -455,8 +457,9 @@ def initialize_workload(self):
# Verify that Kata operator in installed for kata workloads
if '_kata' in self._workload and not self._oc.is_kata_installed():
raise KataNotInstalled(workload=self._workload)
self.delete_all()
self.clear_nodes_cache()
if self._delete_all:
self.delete_all()
self.clear_nodes_cache()
if self._odf_pvc:
self.odf_workload_verification()
self._template.generate_yamls(scale=str(self._scale), scale_nodes=self._scale_node_list, redis=self._redis, thread_limit=self._threads_limit)
Expand All @@ -475,4 +478,5 @@ def finalize_workload(self):
self.upload_run_artifacts_to_s3()
if not self._save_artifacts_local:
self.delete_local_artifacts()
self.delete_all()
if self._delete_all:
self.delete_all()

0 comments on commit 7371a41

Please sign in to comment.