Telemetry pipeline improvements (iteration 1) #3133

Merged: 16 commits, Oct 12, 2024
87 changes: 51 additions & 36 deletions azurelinuxagent/common/event.py
@@ -341,11 +341,15 @@ def update_unicode_error(self, unicode_err):
def update_op_error(self, op_err):
self.__op_error_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)

def has_no_errors(self):
return self.__op_error_count == 0 and self.__unicode_error_count == 0


class EventLogger(object):
def __init__(self):
self.event_dir = None
self.periodic_events = {}
self.protocol = None
Member:

It is not a good idea to have this uninitialized property. Let's look for alternatives... maybe get the protocol on demand when sending an important event?

Contributor Author:

Sending on demand may add other complications: wherever we send an important event, we would also need a reference to the protocol object, plus we would need to add protocol as an optional parameter (defaulting to None) on add_event.

Or, if you have something else in mind for on-demand, let's have a quick call to discuss.
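For illustration only, here is a minimal sketch of the on-demand lookup being discussed; the helper and import are assumptions about the agent's protocol utilities, not code from this PR:

```python
# Sketch of the reviewer's suggestion: resolve the protocol lazily instead of
# keeping an uninitialized EventLogger.protocol attribute (hypothetical helper).
def _try_get_protocol():
    try:
        from azurelinuxagent.common.protocol.util import get_protocol_util
        return get_protocol_util().get_protocol()
    except Exception:
        return None  # no protocol available yet; caller falls back to disk

def flush_or_save_event(self, event, message, immediate_flush):
    protocol = _try_get_protocol() if immediate_flush else None
    report_success = protocol is not None and protocol.report_event([event])
    if not report_success:
        pass  # save the event to disk for the periodic flush, as in the PR
```

The author's concern still applies: every immediate-flush call site would depend on the protocol being resolvable at that moment.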


#
# All events should have these parameters.
@@ -482,16 +486,16 @@ def is_period_elapsed(self, delta, h):
(self.periodic_events[h] + delta) <= datetime.now()

def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0,
version=str(CURRENT_VERSION), message="", log_event=True, force=False):
version=str(CURRENT_VERSION), message="", log_event=True, force=False, immediate_flush=False):
h = hash(name + op + ustr(is_success) + message)

if force or self.is_period_elapsed(delta, h):
self.add_event(name, op=op, is_success=is_success, duration=duration,
version=version, message=message, log_event=log_event)
version=version, message=message, log_event=log_event, immediate_flush=immediate_flush)
self.periodic_events[h] = datetime.now()

def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True):
message="", log_event=True, immediate_flush=False):

if (not is_success) and log_event:
_log_event(name, op, message, duration, is_success=is_success)
@@ -505,14 +509,9 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)

try:
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
self.flush_or_save_event(event, message, immediate_flush)

def add_log_event(self, level, message):
def add_log_event(self, level, message, immediate_flush=False):
event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level]))
@@ -521,13 +520,9 @@ def add_log_event(self, level, message):
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)
try:
self.save_event(json.dumps(data))
except EventError:
pass
self.flush_or_save_event(event, message, immediate_flush)

def add_metric(self, category, counter, instance, value, log_event=False):
def add_metric(self, category, counter, instance, value, log_event=False, immediate_flush=False):
"""
Create and save an event which contains a telemetry event.

@@ -536,9 +531,10 @@ def add_metric(self, category, counter, instance, value, log_event=False):
:param str instance: For instanced metrics, the instance identifier (filesystem name, cpu core#, etc.)
:param value: Value of the metric
:param bool log_event: If true, log the collected metric in the agent log
:param immediate_flush: If true, flush the event to wireserver immediately
"""
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
if log_event:
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
_log_event(AGENT_NAME, "METRIC", message, 0)

event = TelemetryEvent(TELEMETRY_METRICS_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
@@ -548,11 +544,26 @@ def add_metric(self, category, counter, instance, value, log_event=False):
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)
try:
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
self.flush_or_save_event(event, message, immediate_flush)

def flush_or_save_event(self, event, message, immediate_flush):
Member:

I'd rename this to report_or_save_event

"""
Flush the event to wireserver if immediate_flush to set to true, else
save it disk if we fail to send or not required to flush immediately.
"""
report_success = False
if immediate_flush and self.protocol is not None:
report_success = self.protocol.report_event([event])
if not report_success:
logger.error("Failed to send event: '{0}' directly to Wireserver. So, agent will save it to disk for periodic flush.", message)

if not report_success:
try:
data = get_properties(event)
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))


@staticmethod
def _clean_up_message(message):
@@ -633,31 +644,32 @@ def elapsed_milliseconds(utc_start):
(d.microseconds / 1000.0))


def report_event(op, is_success=True, message='', log_event=True):
def report_event(op, is_success=True, message='', log_event=True, immediate_flush=False):
add_event(AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op,
log_event=log_event)
log_event=log_event, immediate_flush=immediate_flush)


def report_periodic(delta, op, is_success=True, message=''):
def report_periodic(delta, op, is_success=True, message='', immediate_flush=False):
add_periodic(delta, AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op)
op=op, immediate_flush=immediate_flush)


def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__):
def report_metric(category, counter, instance, value, log_event=False, immediate_flush=False, reporter=__event_logger__):
"""
Send a telemetry event reporting a single instance of a performance counter.
:param str category: The category of the metric (cpu, memory, etc)
:param str counter: The name of the metric ("%idle", etc)
:param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core#
:param value: The value of the metric
:param bool log_event: If True, log the metric in the agent log as well
:param bool immediate_flush: If True, flush the event to wireserver immediately
:param EventLogger reporter: The EventLogger instance to which metric events should be sent
"""
if reporter.event_dir is None:
@@ -666,18 +678,20 @@ def report_metric(category, counter, instance, value, log_event=False, reporter=
_log_event(AGENT_NAME, "METRIC", message, 0)
return
try:
reporter.add_metric(category, counter, instance, float(value), log_event)
reporter.add_metric(category, counter, instance, float(value), log_event, immediate_flush)
except ValueError:
logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))


def initialize_event_logger_vminfo_common_parameters(protocol, reporter=__event_logger__):
def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, reporter=__event_logger__):
# Initialize the protocol for the event logger so it can send events directly to the wireserver
reporter.protocol = protocol
reporter.initialize_vminfo_common_parameters(protocol)


def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, reporter=__event_logger__):
message="", log_event=True, immediate_flush=False, reporter=__event_logger__):
if reporter.event_dir is None:
logger.warn("Cannot add event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
@@ -687,16 +701,17 @@ def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, d
mark_event_status(name, version, op, is_success)
reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message,
log_event=log_event)
log_event=log_event, immediate_flush=immediate_flush)


def add_log_event(level, message, forced=False, reporter=__event_logger__):
def add_log_event(level, message, forced=False, immediate_flush=False, reporter=__event_logger__):
"""
:param level: LoggerLevel of the log event
:param message: Message
:param forced: Force write the event even if send_logs_to_telemetry() is disabled
(NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
:param reporter:
:param immediate_flush: Flush the event immediately
:param reporter: The EventLogger instance to which the log event should be sent
:return:
"""
if reporter.event_dir is None:
@@ -706,18 +721,18 @@ def add_log_event(level, message, forced=False, reporter=__event_logger__):
return

if level >= logger.LogLevel.WARNING:
reporter.add_log_event(level, message)
reporter.add_log_event(level, message, immediate_flush)


def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, force=False, reporter=__event_logger__):
message="", log_event=True, force=False, immediate_flush=False, reporter=__event_logger__):
if reporter.event_dir is None:
logger.warn("Cannot add periodic event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
return

reporter.add_periodic(delta, name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message, log_event=log_event, force=force)
message=message, log_event=log_event, force=force, immediate_flush=immediate_flush)


def mark_event_status(name, version, op, status):
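For reference, a short usage sketch of how the new immediate_flush flag is intended to flow through the public helpers. The operation and message below are illustrative, and it assumes the protocol has already been registered via initialize_event_logger_vminfo_common_parameters_and_protocol:

```python
from azurelinuxagent.common.event import add_event, WALAEventOperation

# With immediate_flush=True the event is reported straight to the wireserver when
# the protocol is available, and is saved to disk for the periodic flush if the
# direct send fails or the protocol has not been set yet.
add_event(op=WALAEventOperation.Unknown,   # illustrative operation
          is_success=False,
          message="example failure worth reporting immediately",
          immediate_flush=True)
```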
70 changes: 65 additions & 5 deletions azurelinuxagent/common/protocol/wire.py
@@ -23,7 +23,7 @@
import time
import zipfile

from collections import defaultdict
from collections import defaultdict, deque
from datetime import datetime, timedelta
from xml.sax import saxutils

@@ -44,6 +44,7 @@
from azurelinuxagent.common.telemetryevent import GuestAgentExtensionEventsSchema
from azurelinuxagent.common.utils import fileutil, restutil
from azurelinuxagent.common.utils.cryptutil import CryptUtil
from azurelinuxagent.common.utils.restutil import RETRY_CODES_FOR_TELEMETRY
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
findtext, gettext, remove_bom, get_bytes_from_pem, parse_json
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
@@ -62,6 +63,15 @@

_DOWNLOAD_TIMEOUT = timedelta(minutes=5)

# Maximum number of telemetrydata API calls per 15-second interval.
# The value is set to 12 (below the actual limit of 15) as a conservative approach.
TELEMETRY_MAX_CALLS_PER_INTERVAL = 12
TELEMETRY_INTERVAL = 15 # 15 seconds

# The maximum number of times to retry sending telemetry data
TELEMETRY_MAX_RETRIES = 3
TELEMETRY_DELAY = 1 # 1 second


class UploadError(HttpError):
pass
@@ -130,7 +140,7 @@ def report_vm_status(self, vm_status):
self.client.upload_status_blob()

def report_event(self, events_iterator):
self.client.report_event(events_iterator)
return self.client.report_event(events_iterator)

def upload_logs(self, logs):
self.client.upload_logs(logs)
@@ -535,6 +545,7 @@ def __init__(self, endpoint):
self._goal_state = None
self._host_plugin = None
self.status_blob = StatusBlob(self)
self.telemetry_endpoint_calls_timestamps = deque() # A thread-safe queue to store the timestamps of the telemetry endpoint calls

def get_endpoint(self):
return self._endpoint
@@ -1033,6 +1044,20 @@ def report_health(self, status, substatus, description):
resp.read()))

def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
"""
Construct the encoded event and url for telemetry endpoint call
Before calling telemetry endpoint, ensure calls are under throttling limits and checks if the number of telemetry endpoint calls 12 in the last 15 seconds
(Considered 12 instead actual limit 15 as a conservative approach plus it helps for directly flushing events not hit the throttling issues)
(Trade off between delay vs successful event delivery for immediate_flush events)
Note: throttling limit is 15 calls in 15 seconds
"""
def can_make_wireserver_call():
current_time = datetime.utcnow()
interval_start_timestamp = current_time - timedelta(seconds=TELEMETRY_INTERVAL)
while len(self.telemetry_endpoint_calls_timestamps) > 0 and self.telemetry_endpoint_calls_timestamps[0] < interval_start_timestamp:
self.telemetry_endpoint_calls_timestamps.popleft()
return len(self.telemetry_endpoint_calls_timestamps) < TELEMETRY_MAX_CALLS_PER_INTERVAL

uri = TELEMETRY_URI.format(self.get_endpoint())
data_format_header = ustr('<?xml version="1.0"?><TelemetryData version="1.0"><Provider id="{0}">').format(
provider_id).encode(encoding)
@@ -1041,10 +1066,28 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
# dividing it into parts.
data = data_format_header + event_str + data_format_footer
try:
header = self.get_header_for_xml_content()
# NOTE: The call to wireserver requests utf-8 encoding in the headers, but the body should not
# be encoded: some nodes in the telemetry pipeline do not support utf-8 encoding.
resp = self.call_wireserver(restutil.http_post, uri, data, header)
header = self.get_header_for_xml_content()

# wait until the throttling window resets before making the next call
while not can_make_wireserver_call():
next_call_time = self.telemetry_endpoint_calls_timestamps[0] + timedelta(seconds=TELEMETRY_INTERVAL)
logger.verbose("Reached telemetry endpoint throttling limit: {0}, so waiting to make next call after : {1}".format(TELEMETRY_MAX_CALLS_PER_INTERVAL, next_call_time))
sleep_timedelta = next_call_time - datetime.utcnow()
# timedelta.total_seconds() is not available on Python 2.6, do the computation manually
sleep_seconds = ((sleep_timedelta.days * 24 * 3600 + sleep_timedelta.seconds) * 10.0 ** 6 + sleep_timedelta.microseconds) / 10.0 ** 6
# next_call_time might already be in the past, so only sleep if sleep_seconds is positive
if sleep_seconds > 0:
time.sleep(sleep_seconds)

current_time = datetime.utcnow()
self.telemetry_endpoint_calls_timestamps.append(current_time)

# Since we have a throttling limit and retry logic that picks the events up in the next iteration, we set max_retry to 1 on the http call to prevent retries on errors.
# Currently, http_request resets max_retry to 26 on throttling errors, so we pass specific retry codes (which exclude the throttling codes)
# to avoid that reset and prevent http-level retries on throttling errors.
resp = self.call_wireserver(restutil.http_post, uri, data, header, max_retry=1, retry_codes=RETRY_CODES_FOR_TELEMETRY)
except HttpError as e:
raise ProtocolError("Failed to send events:{0}".format(e))

@@ -1054,13 +1097,27 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
"Failed to send events:{0}".format(resp.status))

def report_event(self, events_iterator):
"""
Report events to the wire server. The events are grouped and sent out in batches well with in the api body limit.
Note: Max body size is 64kb and throttling limit is 15 calls in 15 seconds
narrieta marked this conversation as resolved.
Show resolved Hide resolved
"""
buf = {}
debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT)
events_per_provider = defaultdict(int)

def _send_event(provider_id, debug_info):
# retry a few times and eventually drop the events if we are not able to send them out
error_count = 0
try:
self.send_encoded_event(provider_id, buf[provider_id])
while True:
try:
self.send_encoded_event(provider_id, buf[provider_id])
break
except Exception:
error_count += 1
if error_count >= TELEMETRY_MAX_RETRIES:
raise
time.sleep(TELEMETRY_DELAY)
except UnicodeError as uni_error:
debug_info.update_unicode_error(uni_error)
except Exception as error:
Expand Down Expand Up @@ -1106,6 +1163,9 @@ def _send_event(provider_id, debug_info):

debug_info.report_debug_info()

# use the debug info to determine whether the operation was successful; this is needed for immediate_flush events
return debug_info.has_no_errors()

def report_status_event(self, message, is_success):
report_event(op=WALAEventOperation.ReportStatus,
is_success=is_success,
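The batching loop inside report_event is collapsed in this view. As a rough, hypothetical sketch of the grouping the docstring describes (the constant and helper parameters below are assumptions, not the PR's code), events are accumulated per provider and a batch is sent whenever adding the next event would push the body past the size limit:

```python
# Hypothetical sketch of size-bounded batching per provider (not the PR's exact code).
MAX_BODY_SIZE = 64 * 1024  # telemetry body limit (64 KB)

def batch_and_send(events, encode_event, send_encoded_event):
    buf = {}  # provider_id -> accumulated encoded payload (bytes)
    for event in events:
        encoded = encode_event(event)                      # one encoded <Event/> element
        current = buf.get(event.providerId, b"")
        if current and len(current) + len(encoded) >= MAX_BODY_SIZE:
            send_encoded_event(event.providerId, current)  # flush before overflowing
            current = b""
        buf[event.providerId] = current + encoded
    for provider_id, payload in buf.items():               # flush whatever is left
        if payload:
            send_encoded_event(provider_id, payload)
```

Since send_encoded_event now paces itself against the 12-calls-per-15-seconds window, each flush above may block briefly, which is the delay-versus-delivery trade-off noted for immediate_flush events.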
2 changes: 1 addition & 1 deletion azurelinuxagent/common/telemetryevent.py
@@ -111,4 +111,4 @@ def get_version(self):
for param in self.parameters:
if param.name == GuestAgentExtensionEventsSchema.Version:
return param.value
return None
return None
2 changes: 2 additions & 0 deletions azurelinuxagent/common/utils/restutil.py
@@ -96,6 +96,8 @@
httpclient.BadStatusLine
]

RETRY_CODES_FOR_TELEMETRY = list(set(RETRY_CODES) - set(THROTTLE_CODES))

# http://www.gnu.org/software/wget/manual/html_node/Proxies.html
HTTP_PROXY_ENV = "http_proxy"
HTTPS_PROXY_ENV = "https_proxy"
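To make the effect of the set difference concrete, here is a small illustration with hypothetical status-code values (not the agent's actual lists). Throttling codes are removed so restutil will not retry throttled telemetry posts; pacing is instead handled by the sliding window in send_encoded_event:

```python
# Hypothetical values for illustration only; the agent's real lists differ.
RETRY_CODES = [408, 429, 500, 502, 503, 504]   # codes restutil normally retries
THROTTLE_CODES = [429, 503]                    # codes treated as throttling

RETRY_CODES_FOR_TELEMETRY = list(set(RETRY_CODES) - set(THROTTLE_CODES))
print(sorted(RETRY_CODES_FOR_TELEMETRY))       # -> [408, 500, 502, 504]
```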