## License
# Copyright (c) 2020 Software AG, Darmstadt, Germany and/or its licensors

# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the
# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from pysys.constants import *
import json
import urllib.error
import sys, os, time, pathlib, glob
import csv
import math, statistics
from pysys.utils.linecount import linecount
from apamax.eplapplications.basetest import ApamaC8YBaseTest
from apamax.eplapplications.eplapps import EPLApps

# Constants for the performance metric names.
PERF_TIMESTAMP = 'timestamp'
PERF_TOTAL_MEMORY_USAGE = 'total_memory_usage'
PERF_MEMORY_CORR = 'memory_usage_corr'
PERF_MEMORY_APCTRL = 'memory_usage_apctrl'
PERF_CORR_IQ_SIZE = 'correlator_iq_size'
PERF_CORR_OQ_SIZE = 'correlator_oq_size'
PERF_CORR_SPAW_RATE = 'correlator_swap_read_write'
PERF_CORR_NUM_OUTPUT_SENT = 'correlator_num_output_sent'
PERF_CORR_NUM_INPUT_RECEIVED = 'correlator_num_input_received'
PERF_CEP_PROXY_REQ_STARTED = 'cep_proxy_requests_started'
PERF_CEP_PROXY_REQ_COMPLETED = 'cep_proxy_requests_completed'
PERF_CEP_PROXY_REQ_FAILED = 'cep_proxy_requests_failed'
PERF_CPU_USAGE_MILLI = 'cpu_usage_milli'

# Descriptions of the metrics, used as headings in the report.
METRICS_DESCRIPTION = {
    PERF_TOTAL_MEMORY_USAGE: 'Total Memory Usage (MB)',
    PERF_MEMORY_CORR: 'Correlator Memory Usage (MB)',
    PERF_MEMORY_APCTRL: 'Apama-ctrl Memory Usage (MB)',
    PERF_CORR_IQ_SIZE: 'Correlator Input Queue Size',
    PERF_CORR_OQ_SIZE: 'Correlator Output Queue Size',
    PERF_CORR_SPAW_RATE: 'Correlator Swapping Rate',
    PERF_CORR_NUM_INPUT_RECEIVED: 'Number of Inputs Received',
    PERF_CORR_NUM_OUTPUT_SENT: 'Number of Outputs Sent',
    PERF_CPU_USAGE_MILLI: 'CPU Usage (millicpu)',
    PERF_CEP_PROXY_REQ_STARTED: 'CEP Requests Started',
    PERF_CEP_PROXY_REQ_COMPLETED: 'CEP Requests Completed',
    PERF_CEP_PROXY_REQ_FAILED: 'CEP Requests Failed',
}

# Base names of the output files generated by the performance tests.
OUTFILE_PERF_RAW_DATA = 'perf_raw_data'
OUTFILE_PERF_CPU_USAGE = 'perf_cpuusage'
OUTFILE_PERF_STATS = 'perf_statistics'
OUTFILE_PERF_COUNTERS = 'perf_counters'
OUTFILE_ENV_DETAILS = 'env_details'

class ObjectCreator:
    """
    Base class for object creator implementations.
    """

    def createObject(self, device, time):
        """
        Creates an object to publish.

        :param str device: The ID of the device to create an object for.
        :param str time: The source time to use for the object.
        :return: A new object instance to publish.

        For example::

            # Create and return a measurement object
            return {
                'time': time,
                'type': 'my_measurement',
                'source': {'id': device},
                'my_fragment': {
                    'my_series': {
                        'value': random.uniform(0, 100)
                    }
                }
            }
        """
        raise Exception('Not implemented')

class EPLAppsPerfTest(ApamaC8YBaseTest):
    """
    Base class for EPL applications performance tests.

    Requires the following to be set on the project in the pysysproject.xml file (typically from the environment):

    - EPL_TESTING_SDK

    :ivar eplapps: The object for deploying and un-deploying EPL apps.
    """

    def setup(self):
        super(EPLAppsPerfTest, self).setup()
        self.addCleanupFunction(lambda: self._shutdown())
        self.eplapps = EPLApps(self.platform.getC8YConnection())

        self.perfMonitorThread = None  # The current performance monitoring thread.
        self.perfMonitorCount = 0      # The number of times performance monitoring has been started.
        self.simulators = []           # All simulator processes started by the test.

    def prepareTenant(self, restartMicroservice=False):
        """
        Prepares the tenant for a performance test by deleting all devices created by previous tests, deleting all EPL test applications, and clearing all active alarms.

        This must be called by the test before the application is deployed.

        :param bool restartMicroservice: Restart the Apama-ctrl microservice.
        """
        self.log.info('Preparing tenant to run performance test')

        # Delete existing EPL test apps
        self._deleteTestEPLApps()

        # Clear all active alarms
        self._clearActiveAlarms()

        # Delete devices that were created by tests
        self._deleteTestDevices()

        # Stop any running performance monitoring thread
        if self.perfMonitorThread:
            self.perfMonitorThread.stop()
            self.perfMonitorThread.join()

        # Stop any running simulators
        for s in self.simulators:
            s.stop()
        self.simulators = []

        if restartMicroservice:
            self._restartApamaMicroserviceImpl()
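
    # Illustrative sketch only (not part of the SDK): a performance test would typically
    # call prepareTenant() at the start of its execute() method, before deploying the EPL
    # app under test and starting any simulators, for example:
    #
    #     def execute(self):
    #         self.prepareTenant(restartMicroservice=True)
    #         # ... deploy the EPL app under test, start simulators, and start monitoring ...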

    def _shutdown(self):
        """
        Performs common cleanup during test shutdown, such as stopping the performance monitoring thread, deactivating EPL test apps, and generating the final HTML report.
        """
        if self.perfMonitorThread:
            self.perfMonitorThread.stop()
            self.perfMonitorThread.join()
        self._deactivateTestEPLApps()
        self._generateFinalHTMLReport()

    def restartApamaMicroservice(self):
        """
        Restarts the Apama-ctrl microservice and waits for it to come back up.
        """
        self._restartApamaMicroserviceImpl()

    def _restartApamaMicroserviceImpl(self):
        """ Restarts the Apama-ctrl microservice and waits for it to come back up. """

        self.log.info('Restarting Apama-ctrl microservice')
        count1 = linecount(self.platform.getApamaLogFile(), 'Microservice restart Microservice .* is being restarted')
        count2 = linecount(self.platform.getApamaLogFile(), 'httpServer-instance.*Started receiving messages')
        try:
            self.platform.getC8YConnection().do_request_json('PUT', '/service/cep/restart', {})
            self.log.info('Restart requested')
        except (urllib.error.HTTPError, urllib.error.URLError) as ex:
            statuscode = int(ex.code)
            if statuscode // 10 == 50:
                # A 50x response is expected while the microservice goes down, so treat it as success
                self.log.info('Restart requested')
            else:
                raise Exception(f'Failed to restart Apama-ctrl: {ex}')
        except Exception as ex:
            raise Exception(f'Failed to restart Apama-ctrl: {ex}')
        self.waitForGrep('platform.log', expr='Microservice restart Microservice .* is being restarted', condition=f'>={count1+1}')
        self.waitForGrep('platform.log', expr='httpServer-instance.*Started receiving messages', condition=f'>={count2+1}', timeout=TIMEOUTS['WaitForProcess'])
        self.log.info('Apama-ctrl microservice restarted successfully')

    def _deactivateTestEPLApps(self):
        """
        Deactivates all EPL test apps.
        """
        eplapps = self.eplapps.getEPLApps(False) or []
        for app in eplapps:
            name = app['name']
            if name.startswith(self.EPL_APP_PREFIX):
                try:
                    self.eplapps.update(name, state='inactive')
                except Exception as e:
                    self.log.info(f'Failed to deactivate app {name}: {e}')

    def startMeasurementSimulator(self, devices, perDeviceRate, creatorFile, creatorClassName, creatorParams, duration=None, processingMode='CEP'):
        """
        Starts a measurement simulator process to publish simulated measurements to Cumulocity IoT.

        The simulator uses an instance of the provided measurement creator class to create new measurements to send,
        allowing the test to publish measurements of different types and sizes.
        The simulator looks up the specified class in the specified Python file and creates a new instance of the class
        using the provided parameters. The measurement creator class must extend the :class:`apamax.eplapplications.perf.basetest.ObjectCreator` class.

        :param list[str] devices: List of device IDs to generate measurements for.
        :param float perDeviceRate: The rate of measurements to publish per device.
        :param str creatorFile: The path to the Python file containing the measurement creator class.
        :param str creatorClassName: The name of the measurement creator class that extends the :class:`apamax.eplapplications.perf.basetest.ObjectCreator` class.
        :param list creatorParams: The list of parameters to pass to the constructor of the measurement creator class to create a new instance.
        :param str processingMode: Cumulocity IoT processing mode. Possible values are CEP, PERSISTENT, TRANSIENT, and QUIESCENT.
        :param duration: The duration (in seconds) to run the simulator for. If no duration is specified, the simulator runs until it is either stopped or the test ends.
        :type duration: float, optional
        :return: The process handle of the simulator process.
        :rtype: pysys.process.Process

        For example::

            # In a 'creator.py' file in the test input directory.
            class MyMeasurementCreator(ObjectCreator):
                def __init__(self, lowerBound, upperBound):
                    self.lowerBound = lowerBound
                    self.upperBound = upperBound

                def createObject(self, device, time):
                    return {
                        'time': time,
                        'type': 'my_measurement',
                        'source': {'id': device},
                        'my_fragment': {
                            'my_series': {
                                'value': random.uniform(self.lowerBound, self.upperBound)
                            }
                        }
                    }

            ...

            # In the test
            self.startMeasurementSimulator(
                ['12345'],                  # device IDs
                1,                          # rate of measurements to publish per device
                f'{self.input}/creator.py', # Python file path
                'MyMeasurementCreator',     # class name
                [10, 50],                   # constructor parameters for the MyMeasurementCreator class
            )
        """
        return self._startPublisher(devices, perDeviceRate, '/measurement/measurements', creatorFile, creatorClassName, creatorParams, duration, processingMode)
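
    # Illustrative sketch only: the returned process handle can be kept to stop the
    # simulator explicitly once the test has published for long enough, for example:
    #
    #     sim = self.startMeasurementSimulator(['12345'], 10, f'{self.input}/creator.py',
    #         'MyMeasurementCreator', [10, 50])
    #     self.wait(300)   # let the simulator publish for 5 minutes
    #     sim.stop()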

    def _startPublisher(self, devices, perDeviceRate, resourceUrl, creatorFile, creatorClassName, creatorParams, duration=None, processingMode='CEP'):
        """
        Starts a publisher process to publish simulated data to Cumulocity IoT using the provided object creator class.

        :param list[str] devices: List of device IDs.
        :param float perDeviceRate: The rate of objects to publish per device.
        :param str resourceUrl: The resource URL, for example /measurement/measurements.
        :param str creatorFile: The path to the Python file containing the object creator class.
        :param str creatorClassName: The name of the object creator class.
        :param list creatorParams: The list of parameters to pass to the constructor of the object creator class.
        :param str processingMode: Cumulocity IoT processing mode. Possible values are CEP, PERSISTENT, TRANSIENT, and QUIESCENT.
        :param duration: The duration (in seconds) to run the publisher for. If no duration is specified, it runs until it is either stopped or the test ends.
        :type duration: float, optional
        :return: The publisher process, which can be stopped by calling its stop() method.
        :rtype: pysys.process.Process
        """
        object_creator_info = {
            'className': creatorClassName,
            'constructorParams': creatorParams,
            'file': creatorFile,
        }
        env = self.getDefaultEnvirons(command=sys.executable)
        test_framework_root = f'{self.project.EPL_TESTING_SDK}/testframework'
        env['PYTHONPATH'] = f'{test_framework_root}{os.pathsep}{env.get("PYTHONPATH", "")}'
        env['PYTHONDONTWRITEBYTECODE'] = 'true'

        (url, tenant_id, username, password) = self.platform.getC8yConnectionDetails()
        script_path = str(pathlib.Path(__file__).parent.joinpath('publisher.py'))
        arguments = [script_path,
            '--base_url', url,
            '--username', username,
            '--password', password,
            '--devices', json.dumps(devices),
            '--per_device_rate', str(perDeviceRate),
            '--resource_url', resourceUrl,
            '--processing_mode', processingMode,
            '--object_creator_info', json.dumps(object_creator_info),
        ]

        if duration is not None:
            arguments.extend(['--duration', str(duration)])

        self.mkdir(f'{self.output}/simulators')
        stdouterr = self.allocateUniqueStdOutErr('simulators/measurementpublisher')
        p = self.startPython(arguments, stdouterr=stdouterr, disableCoverage=True, environs=env, background=True)
        self.simulators.append(p)
        self.waitForGrep(stdouterr[0], expr='Started publishing Cumulocity', errorExpr=['ERROR ', 'DataPublisher failed'])
        return p

    def _perfMonitorSuffix(self, noSuffixForFirst=True):
        """
        Gets the suffix to add to the names of generated files.

        :param noSuffixForFirst: Do not generate a suffix for the first performance monitoring run; defaults to True.
        :type noSuffixForFirst: bool, optional
        :return: The suffix string.
        :rtype: str
        """
        if self.perfMonitorCount > 1 or not noSuffixForFirst:
            return '.' + str(self.perfMonitorCount - 1).rjust(2, '0')
        else:
            return ''
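
    # For example (assuming the default noSuffixForFirst=True): with perfMonitorCount=1 the
    # suffix is '' so the first run writes perf_raw_data.csv, while perfMonitorCount=2 and 3
    # give '.01' and '.02' (perf_raw_data.01.csv, perf_raw_data.02.csv, and so on).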

    def _getEnvironmentDetails(self):
        """
        Gets details of the environment in which the test is running.

        Used for the HTML report.
        """
        paq_version = self.platform.getC8YConnection().do_get('/service/cep/diagnostics/componentVersion').get('componentVersion', '<unknown>')
        apamaCtrlStatus = self.platform.getC8YConnection().do_get('/service/cep/diagnostics/apamaCtrlStatus')
        microservice_name = apamaCtrlStatus.get('microservice_name', '<unknown>')
        uptime = apamaCtrlStatus.get('uptime_secs', '<unknown>')
        c8y_url = self.platform.getC8YConnection().base_url
        c8y_version = self.platform.getC8YConnection().do_get('/tenant/system/options/system/version').get('value', '<unknown>')
        pam_version = self.platform.getC8YConnection().do_get('/service/cep/diagnostics/info', headers={'Accept': 'application/json'}).get('productVersion', '<unknown>')
        app_manifest = self.platform.getC8YConnection().do_get(f'/application/applicationsByName/{microservice_name}')
        microservice_resources = app_manifest.get('applications', [{}])[0]['manifest']['resources']
        cpu_limit = microservice_resources['cpu']
        if cpu_limit == '1':
            cpu_limit += ' core'
        else:
            cpu_limit += ' cores'

        return {
            'Cumulocity IoT Tenant': c8y_url,
            'Cumulocity IoT Version': c8y_version,
            'Microservice name': microservice_name,
            'Microservice CPU Limit': cpu_limit,
            'Microservice Memory Limit': microservice_resources['memory'],
            'Apama Version': f'PAM {pam_version}, PAQ {paq_version}',
            'Uptime (secs)': uptime,
        }

    def _monitorPerformance(self, stopping, log, pollingInterval):
        """
        Implements the performance gathering thread.

        :param stopping: Event used to check whether the thread should stop.
        :param log: The logger.
        :param pollingInterval: The polling interval (in seconds) at which to gather performance data.
        """
        log.info('Started gathering performance metrics')
        cpu_monitoring_thread = self.startBackgroundThread('cpu_monitoring_thread', self._monitor_cpu_usage_impl)
        suffix = self._perfMonitorSuffix()

        def num(value):
            # Convert a value to a float, treating NaN and unparsable values as 0
            try:
                f = float(value)
                return f if not math.isnan(f) else 0
            except Exception:
                return 0

        csv_file = None
        try:
            fieldnames = [PERF_TIMESTAMP, PERF_TOTAL_MEMORY_USAGE, PERF_MEMORY_CORR, PERF_MEMORY_APCTRL,
                PERF_CORR_IQ_SIZE, PERF_CORR_OQ_SIZE, PERF_CORR_SPAW_RATE,
                PERF_CORR_NUM_OUTPUT_SENT, PERF_CORR_NUM_INPUT_RECEIVED, PERF_CEP_PROXY_REQ_STARTED,
                PERF_CEP_PROXY_REQ_COMPLETED, PERF_CEP_PROXY_REQ_FAILED,
            ]
            csv_file = open(f'{self.output}/{OUTFILE_PERF_RAW_DATA}{suffix}.csv', 'w', encoding='utf8')
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            while not stopping.is_set():
                data = {}
                # Gather performance data:
                # 1) get the correlator status
                corr_status = self.platform.getC8YConnection().do_get('/service/cep/diagnostics/correlator/status', headers={'Accept': 'application/json'})

                # 2) get the apama-ctrl status
                apctrl_status = self.platform.getC8YConnection().do_get('/service/cep/diagnostics/apamaCtrlStatus')

                # Write the data
                data[PERF_TIMESTAMP] = time.time()
                data[PERF_MEMORY_CORR] = num(corr_status.get('physicalMemoryMB', 0))
                data[PERF_MEMORY_APCTRL] = num(apctrl_status.get('apama_ctrl_physical_mb', 0))
                data[PERF_TOTAL_MEMORY_USAGE] = data[PERF_MEMORY_CORR] + data[PERF_MEMORY_APCTRL]
                data[PERF_CORR_IQ_SIZE] = num(corr_status.get('numQueuedInput', 0))  # numInputQueuedInput
                data[PERF_CORR_OQ_SIZE] = num(corr_status.get('numOutEventsQueued', 0))
                data[PERF_CORR_NUM_OUTPUT_SENT] = num(corr_status.get('numOutEventsSent', 0))
                data[PERF_CORR_NUM_INPUT_RECEIVED] = num(corr_status.get('numReceived', 0))
                data[PERF_CORR_SPAW_RATE] = num(corr_status.get('swapPagesRead', 0)) + num(corr_status.get('swapPagesWrite', 0))

                # Sum the Cumulocity-to-CEP proxy request counts across all endpoints
                cep_proxy_requests_started = 0
                cep_proxy_requests_completed = 0
                cep_proxy_requests_failed = 0
                cep_proxy_request_counts = apctrl_status.get('cep_proxy_request_counts', {})
                for key in cep_proxy_request_counts.keys():
                    cep_proxy_requests_started += num(cep_proxy_request_counts[key].get('requestsStarted', 0))
                    cep_proxy_requests_completed += num(cep_proxy_request_counts[key].get('requestsCompleted', 0))
                    cep_proxy_requests_failed += num(cep_proxy_request_counts[key].get('requestsFailed', 0))
                data[PERF_CEP_PROXY_REQ_STARTED] = cep_proxy_requests_started
                data[PERF_CEP_PROXY_REQ_COMPLETED] = cep_proxy_requests_completed
                data[PERF_CEP_PROXY_REQ_FAILED] = cep_proxy_requests_failed

                writer.writerow(data)
                csv_file.flush()
                if not stopping.is_set():
                    time.sleep(pollingInterval)
        except Exception as ex:
            log.error(f'Exception while gathering performance data: {ex}')
            raise Exception(f'Exception while gathering performance data: {ex}').with_traceback(ex.__traceback__)
        finally:
            if csv_file: csv_file.close()
            cpu_monitoring_thread.stop()
            cpu_monitoring_thread.join()
            self._generatePerfStatistics()
            log.info('Finished performance monitoring')

    def _monitor_cpu_usage_impl(self, stopping, log):
        """
        Implements the CPU usage monitoring thread.

        :param stopping: Event used to check whether the thread should stop.
        :param log: The logger.
        """

        url = '/service/cep/diagnostics/cpuUsageMillicores'

        # Check whether CPU usage can be monitored (the REST endpoint is exposed and able to calculate CPU usage)
        try:
            cpu_usage = float(self.platform.getC8YConnection().do_get(url + '?sampleDurationMSec=10'))
        except Exception as ex:
            log.info('Unable to monitor CPU usage')
            return

        log.info('Started gathering CPU usage')
        url = url + '?sampleDurationMSec=2000'
        suffix = self._perfMonitorSuffix()
        with open(f'{self.output}/{OUTFILE_PERF_CPU_USAGE}{suffix}.csv', 'w', encoding='utf8') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=[PERF_TIMESTAMP, PERF_CPU_USAGE_MILLI])
            writer.writeheader()
            while not stopping.is_set():
                try:
                    cpu_usage = self.platform.getC8YConnection().do_get(url)
                    writer.writerow({
                        PERF_TIMESTAMP: time.time(),
                        PERF_CPU_USAGE_MILLI: cpu_usage,
                    })
                    csv_file.flush()
                except Exception as e:
                    log.error('Unable to get CPU usage: ' + str(e))

    def _generatePerfStatistics(self):
        """
        Generates performance statistics from the raw data gathered by the monitoring threads.
        """
        suffix = self._perfMonitorSuffix()
        # Read the raw data points
        datapoints = {}
        with open(f'{self.output}/{OUTFILE_PERF_RAW_DATA}{suffix}.csv', 'r', encoding='utf8') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for row in csv_reader:
                for metric_name in [PERF_TOTAL_MEMORY_USAGE, PERF_MEMORY_CORR, PERF_MEMORY_APCTRL,
                        PERF_CORR_IQ_SIZE, PERF_CORR_OQ_SIZE, PERF_CORR_SPAW_RATE,
                        PERF_CORR_NUM_OUTPUT_SENT, PERF_CORR_NUM_INPUT_RECEIVED, PERF_CEP_PROXY_REQ_STARTED,
                        PERF_CEP_PROXY_REQ_COMPLETED, PERF_CEP_PROXY_REQ_FAILED]:
                    datapoints.setdefault(metric_name, [])
                    datapoints[metric_name].append(float(row[metric_name]))

        cpu_perf_file = f'{self.output}/{OUTFILE_PERF_CPU_USAGE}{suffix}.csv'
        if os.path.exists(cpu_perf_file):
            metric_name = PERF_CPU_USAGE_MILLI
            datapoints.setdefault(metric_name, [])
            with open(cpu_perf_file, 'r', encoding='utf8') as csv_file:
                csv_reader = csv.DictReader(csv_file)
                for row in csv_reader:
                    datapoints[metric_name].append(float(row[metric_name]))

        # Extract counter-like values into a separate object. For these we capture only the difference between the first and last value.
        counter_values = {}
        for name in [PERF_CORR_NUM_INPUT_RECEIVED, PERF_CORR_NUM_OUTPUT_SENT, PERF_CEP_PROXY_REQ_STARTED,
                PERF_CEP_PROXY_REQ_COMPLETED, PERF_CEP_PROXY_REQ_FAILED]:
            values = datapoints[name]
            counter_values[name] = int(values[-1]) - int(values[0])
            del datapoints[name]

        self.write_text(f'{OUTFILE_PERF_COUNTERS}{suffix}.json', json.dumps(counter_values, indent=2), encoding='utf8')
        self.write_text(f'{OUTFILE_PERF_RAW_DATA}{suffix}.json', json.dumps(datapoints, indent=2), encoding='utf8')

        def percentile(data, percent):
            # Calculate the percentile using the nearest-rank method
            size = len(data)
            return sorted(data)[int(math.ceil((size * percent) / 100)) - 1]

        # Calculate statistics for each metric
        stats = {}
        for name in datapoints.keys():
            values = datapoints[name]
            stats[name] = {}
            stats[name]['min'] = min(values)
            stats[name]['max'] = max(values)
            stats[name]['mean'] = statistics.mean(values)
            stats[name]['median'] = statistics.median(values)
            stats[name]['75th_percentile'] = percentile(values, 75)
            stats[name]['90th_percentile'] = percentile(values, 90)
            stats[name]['95th_percentile'] = percentile(values, 95)
            stats[name]['99th_percentile'] = percentile(values, 99)

        self.write_text(f'{OUTFILE_PERF_STATS}{suffix}.json', json.dumps(stats, indent=2), encoding='utf8')

        with open(f'{self.output}/{OUTFILE_PERF_STATS}{suffix}.csv', 'w', encoding='utf8') as csv_file:
            columns = ['name'] + list(stats[PERF_TOTAL_MEMORY_USAGE].keys())
            writer = csv.DictWriter(csv_file, fieldnames=columns)
            writer.writeheader()

            for name in stats.keys():
                row = {'name': name}
                for col in columns[1:]:
                    row[col] = stats[name][col]
                writer.writerow(row)
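
    # A worked example of the nearest-rank percentile used above (illustrative only):
    # for data = [10, 20, 30, 40], percentile(data, 90) computes ceil(4 * 90 / 100) = 4,
    # so it returns sorted(data)[3] = 40, while percentile(data, 50) returns sorted(data)[1] = 20.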

    def read_json(self, fileName, fileDirectory=None):
        """
        Reads a JSON file and returns its content.

        :param fileName: Name of the file.
        :type fileName: str
        :param fileDirectory: Directory of the file. The test output directory is used if not specified.
        :type fileDirectory: str, optional
        :return: The decoded content of the file.
        """
        fileDirectory = fileDirectory or self.output
        return json.loads(pathlib.Path(fileDirectory).joinpath(fileName).read_text(encoding='utf8'))
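
    # For example (illustrative only, using the default output file names produced by this class),
    # a test can load the generated statistics back for its own checks:
    #
    #     stats = self.read_json(f'{OUTFILE_PERF_STATS}.json')
    #     self.log.info('Median correlator memory: %s MB', stats[PERF_MEMORY_CORR]['median'])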

    def _confirmStableQueueSize(self, queue_name, perf_data, noise_floor=200, ratio_threshold=0.5, discard_fraction=0.2):
        """
        Checks that the queue sizes are stable or coming down. Logs an error or warning if queue sizes are still increasing.

        :param queue_name: The name of the correlator queue ('input' or 'output').
        :param perf_data: The raw performance data.
        :param noise_floor: Skip the analysis if the difference between the queue sizes at the end and at the start is less than this.
        :param ratio_threshold: Log an error if the ratio of the slope of the second half of the graph to that of the first half is more than this.
        :param discard_fraction: The fraction of data to discard at the beginning and the end before performing the analysis.
        """
        values = perf_data[PERF_CORR_IQ_SIZE if queue_name == 'input' else PERF_CORR_OQ_SIZE]
        discard_size = int(len(values) * discard_fraction)
        values = values[discard_size:-discard_size]
        if len(values) <= 5:
            self.log.warn('Not enough datapoints to analyse queue growth')
            return

        # Check that the queue size graph flattens off, or at least starts to flatten off,
        # by comparing the averages of three segments of the data
        left = values[0 : int(len(values) * 0.35)]
        left = float(sum(left)) / len(left)
        mid = values[int(len(values) * 0.35) : int(len(values) * 0.65)]
        mid = float(sum(mid)) / len(mid)
        right = values[int(len(values) * 0.65) : int(len(values) * 0.95)]
        right = float(sum(right)) / len(right)

        # Easy case - the queue size is going downwards for a large part of the graph, or the change is beneath the noise floor
        if right - left <= noise_floor or right - mid <= noise_floor:
            return
        if mid - left == 0 and right - mid == 0:
            return
        try:
            slope_ratio = (right - mid) / (mid - left)
        except ZeroDivisionError:
            slope_ratio = float('inf')

        # If the slope is not coming down fast enough then the queue would most probably have filled up eventually
        mean_size_towards_end = statistics.mean(values[int(len(values) * 0.85) : int(len(values) * 0.95)])
        if slope_ratio > ratio_threshold:
            self.log.error(f'Correlator {queue_name} queue was increasing continuously. It would probably have filled up eventually. Mean queue size towards the end was {mean_size_towards_end}')
        elif slope_ratio > 0.2:
            self.log.warn(f'Correlator {queue_name} queue was increasing slowly. It might have filled up eventually. Mean queue size towards the end was {mean_size_towards_end}')
        # The test is not failed because this might be a false positive
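
    # Illustrative example of the heuristic above: for queue sizes that keep climbing, say
    # segment averages left=1000, mid=3000, right=6000, the slope ratio is
    # (6000-3000)/(3000-1000) = 1.5 > ratio_threshold, so an error is logged; if the growth
    # tails off (for example right=3500) the ratio drops to 0.25 and only a warning is logged.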

    def _to_html_list(self, values):
        """
        Generates an HTML list, to be embedded in an HTML page, from the provided values.
        """
        if values is None: return ''
        result = ''
        if isinstance(values, dict):
            for key, value in values.items():
                result += f'<li><span class="key">{key}: </span>{value}</li>'
        elif isinstance(values, list):
            for value in values:
                result += f'<li>{value}</li>'
        else:
            result += f'<li>{str(values)}</li>'
        return f'<ul>{result}</ul>'

    def _dict_to_html_table(self, values, column_names):
        """
        Generates an HTML table, to be embedded in an HTML page, from the provided dictionary.

        :param values: Dictionary of values; each value may itself be a dictionary keyed by column name.
        :param column_names: The names of the columns to include.
        """
        result = '<tr><td> </td>'
        for key in column_names:
            result += f'<th scope="col">{key}</th>'
        result += '</tr>'

        def format_value(val):
            if isinstance(val, float): return f'{val:0.2f}'
            else: return str(val)

        for metric_name, row in values.items():
            result += f'<tr><th scope="row">{metric_name}</th>'
            if isinstance(row, dict):
                for c in column_names:
                    val = row.get(c, '')
                    result += f'<td>{format_value(val)}</td>'
            else:
                result += f'<td>{format_value(row)}</td>'
            result += '</tr>'

        return f'<table>{result}</table>'

    def generateHTMLReport(self, description, testConfigurationDetails=None, extraPerformanceMetrics=None):
        """
        Generates an HTML report of the performance results. The report is generated at the end of the test.

        When testing multiple variations, the HTML reports for all variations are combined into a single HTML report.

        :param description: A brief description of the test.
        :type description: str
        :param testConfigurationDetails: Details of the test configuration to include in the report.
        :type testConfigurationDetails: dict, list, str, optional
        :param extraPerformanceMetrics: Extra application-specific performance metrics to include in the report.
        :type extraPerformanceMetrics: dict, list, str, optional
        """
        if self.perfMonitorThread.is_alive():
            self.perfMonitorThread.stop()
            self.perfMonitorThread.join()

        suffix = self._perfMonitorSuffix()

        if not os.path.exists(f'{self.output}/{OUTFILE_PERF_STATS}{suffix}.json'):
            self._generatePerfStatistics()

        # Generate a textual representation of a time range
        def format_time_range(timestamp1, timestamp2):
            import datetime
            datetime1 = datetime.datetime.fromtimestamp(timestamp1, datetime.timezone.utc)
            datetime2 = datetime.datetime.fromtimestamp(timestamp2, datetime.timezone.utc)

            format1 = datetime1.strftime('%a %Y-%m-%d %H:%M:%S') + ' UTC'
            if datetime1.date() == datetime2.date():
                format2 = datetime2.strftime('%H:%M:%S')
            else:
                format2 = datetime2.strftime('%a %Y-%m-%d %H:%M:%S') + ' UTC'

            delta = datetime2 - datetime1
            delta = delta - datetime.timedelta(microseconds=delta.microseconds)
            return f'{format1} to {format2} (={delta})'

        ### Generate the HTML fragments to fill into the template to produce the final report

        ## Generate HTML for the table of standard performance statistics
        # Read the statistics into a single dict
        table_data = self.read_json(f'{OUTFILE_PERF_STATS}{suffix}.json')
        # Add the counter-type data to the table as well
        counters = self.read_json(f'{OUTFILE_PERF_COUNTERS}{suffix}.json')
        for s in [PERF_CORR_NUM_INPUT_RECEIVED, PERF_CORR_NUM_OUTPUT_SENT]:
            table_data[s] = counters[s]
        column_names = list(table_data[PERF_TOTAL_MEMORY_USAGE].keys())

        # Prepare the dictionary for HTML using more descriptive names for the metrics
        for key in list(table_data.keys()):
            table_data[METRICS_DESCRIPTION.get(key, key)] = table_data[key]
            del table_data[key]
        standard_perf_stats_table = self._dict_to_html_table(table_data, column_names)

        ## Generate an HTML list of the test configuration
        test_config_list = self._to_html_list(testConfigurationDetails)

        ## Generate HTML for any additional performance metrics
        additional_perf_statistics = self._to_html_list(extraPerformanceMetrics)
        if extraPerformanceMetrics:
            additional_perf_statistics = f'<h4>App Specific Performance Statistics</h4>{additional_perf_statistics}'

        ## Generate HTML for the graphs
        # Generate data for the memory and queue-size graphs
        queue_data = []
        memory_data = []
        cpu_usage_data = []
        start_time = -1
        end_time = -1
        with open(f'{self.output}/{OUTFILE_PERF_RAW_DATA}{suffix}.csv', 'r', encoding='utf8') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for row in csv_reader:
                timestamp = float(row[PERF_TIMESTAMP])
                if start_time < 0:
                    start_time = timestamp
                end_time = timestamp
                timestamp_milli = 1000.0 * timestamp
                iq = int(float(row[PERF_CORR_IQ_SIZE]))
                oq = int(float(row[PERF_CORR_OQ_SIZE]))
                # Generate JavaScript code to create an array of data for a timestamp - [new Date(...), y1_value, y2_value, ...]
                queue_data.append(f'[new Date({timestamp_milli}),{iq}, {oq}]')
                memory = float(row[PERF_TOTAL_MEMORY_USAGE])
                memory_data.append(f'[new Date({timestamp_milli}), {memory}, {float(row[PERF_MEMORY_APCTRL])}, {float(row[PERF_MEMORY_CORR])}]')
        queue_time_range = memory_time_range = format_time_range(start_time, end_time)

        # Generate data for the CPU usage graph
        cpu_usage_time_range = ''
        if os.path.exists(f'{self.output}/{OUTFILE_PERF_CPU_USAGE}{suffix}.csv'):
            start_time = -1
            end_time = -1
            with open(f'{self.output}/{OUTFILE_PERF_CPU_USAGE}{suffix}.csv', 'r', encoding='utf8') as csv_file:
                csv_reader = csv.DictReader(csv_file)
                for row in csv_reader:
                    timestamp = float(row[PERF_TIMESTAMP])
                    if start_time < 0:
                        start_time = timestamp
                    end_time = timestamp
                    timestamp_milli = 1000.0 * timestamp
                    cpu_usage = float(row[PERF_CPU_USAGE_MILLI])
                    cpu_usage_data.append(f'[new Date({timestamp_milli}), {cpu_usage}]')
            cpu_usage_time_range = format_time_range(start_time, end_time)

        ### Write out the report data for this variation by filling the various details into the replacements dictionary
        variation_replacements = {
            'TEST_TITLE': self.descriptor.title,
            'VARIATION_DESCRIPTION': description,
            'VARIATION_TITLE': description,
            'VARIATION_LINKS': '',
            'VARIATION_ID': '0',
            'TEST_CONFIGURATION': test_config_list,
            'STATS_TABLE': standard_perf_stats_table,
            'ADDITIONAL_PERF_STATISTICS': additional_perf_statistics,
            'CORRELATOR_QUEUES_DATA': ','.join(queue_data),
            'CORRELATOR_QUEUES_TIMERANGE': queue_time_range,
            'MEMORY_DATA': ','.join(memory_data),
            'MEMORY_TIMERANGE': memory_time_range,
            'CPU_USAGE_DATA': ','.join(cpu_usage_data),
            'CPU_USAGE_TIMERANGE': cpu_usage_time_range,
        }

        self.write_text(f'html_report_data{self._perfMonitorSuffix(False)}.json', json.dumps(variation_replacements, indent=2), encoding='utf8')
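
    # Illustrative sketch only: a test would typically call generateHTMLReport() once per
    # variation after the load has finished, passing its own configuration details, e.g.:
    #
    #     self.generateHTMLReport('10 devices at 1 measurement/s each',
    #         testConfigurationDetails={'Devices': 10, 'Rate per device (eps)': 1},
    #         extraPerformanceMetrics={'Alarms raised': 42})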

    def _generateFinalHTMLReport(self):
        """
        Generates the final HTML report.

        Combines the HTML report data for all variations into one report.
        """
        template_dir = f'{self.project.EPL_TESTING_SDK}/testframework/resources'
        variation_template = pathlib.Path(f'{template_dir}/template_perf_details.html').read_text(encoding='utf8')

        files = glob.glob(f'{self.output}/html_report_data*.json')
        files = sorted(files)

        variation_htmls = []
        variation_links = []
        env_details = self.read_json(f'{OUTFILE_ENV_DETAILS}.json')
        for i, f in enumerate(files):
            f = f.strip()
            filename = os.path.basename(f)
            replacements = self.read_json(filename)
            variation_desc = replacements['VARIATION_DESCRIPTION']
            replacements['VARIATION_ID'] = f'variation_{i}'
            replacements['VARIATION_TITLE'] = variation_desc

            variation_html = variation_template
            for (key, value) in replacements.items():
                variation_html = variation_html.replace(f'@{key}@', value)
            if i > 0:
                variation_html = '<br/><a class="backtotoplink" href="#top_of_the_page">Back to top</a><br/><hr/>' + variation_html
            variation_htmls.append(variation_html)
            variation_links.append(f'<a href="#test_variation_variation_{i}">{variation_desc}</a>')

        if len(files) > 1:
            if 'Uptime (secs)' in env_details:
                # No need to log the uptime in a combined report
                del env_details['Uptime (secs)']

        variation_links_html = '' if len(variation_links) <= 1 else f'<h2>Variation List</h2>{self._to_html_list(variation_links)}'

        replacements = {
            'TEST_TITLE': self.descriptor.title,
            'ENVIRONMENT_DETAILS': self._to_html_list(env_details),
            'VARIATION_DATA': '\n\n'.join(variation_htmls),
            'VARIATION_LINKS': variation_links_html,
        }

        self.copyWithReplace(f'{template_dir}/template_perf_report.html', f'{self.output}/report.html', replacements, marker='@')
        self.log.info(f'Generated performance report {os.path.abspath(os.path.join(self.output, "report.html"))}')

    def validate(self):
        """
        Performs standard validations of the test run.

        The following validations are performed:

        - No errors in the microservice log.
        - The microservice did not terminate because of high memory usage.
        - The microservice's memory usage remained below 90% of the available memory.
        - The correlator was not swapping memory.

        The test should define its own `validate` method for performing any application-specific validation. Ensure that
        the test calls the super implementation of the `validate` method, using `super(PySysTest, self).validate()`.
        """
        logFile = self.platform.getApamaLogFile()

        self.assertGrep(logFile, expr=' (ERROR|FATAL) .*', contains=False)

        # Check that the microservice did not use more than 90% of the available memory
        self.assertGrep(logFile, expr='apama_highmemoryusage.*Apama is using 90. of available memory', contains=False)

        # Check that the microservice did not exit because of high memory usage
        self.assertGrep(logFile, expr='(Java exit 137|exit code 137)', contains=False)

        # Check that no request to /cep from Cumulocity IoT failed
        for file in glob.glob(f'{self.output}/{OUTFILE_PERF_COUNTERS}*.json'):
            perf_counters = self.read_json(file)
            self.assertThat('num_failed_cep_requests == 0', num_failed_cep_requests=perf_counters[PERF_CEP_PROXY_REQ_FAILED])

        # Check that the correlator was not swapping
        for file in glob.glob(f'{self.output}/{OUTFILE_PERF_STATS}*.json'):
            perf_stats = self.read_json(file)
            self.assertThat('correlator_swap_read_write == 0', correlator_swap_read_write=perf_stats[PERF_CORR_SPAW_RATE]['min'])

            # Check that the mean and median sizes of the input and output queues are reasonable
            filename = os.path.basename(file).replace(OUTFILE_PERF_STATS, OUTFILE_PERF_RAW_DATA)
            raw_perf_data = self.read_json(f'{self.output}/{filename}')
            for (queue, max_size) in [('input', 20_000), ('output', 10_000)]:
                stats = perf_stats[PERF_CORR_IQ_SIZE if queue == 'input' else PERF_CORR_OQ_SIZE]
                self.assertThat('median_queue_size < max_queue_size * 0.8', median_queue_size=stats['median'], max_queue_size=max_size)
                self.assertThat('mean_queue_size < max_queue_size * 0.8', mean_queue_size=stats['mean'], max_queue_size=max_size)
                self._confirmStableQueueSize(queue, raw_perf_data)
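
    # Illustrative sketch only: a concrete performance test usually extends validate() with
    # application-specific checks while keeping the standard ones, for example:
    #
    #     class PySysTest(EPLAppsPerfTest):
    #         def validate(self):
    #             super(PySysTest, self).validate()
    #             # ... add application-specific assertions here ...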