Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry pipeline improvements (iteration 1) #3133

Merged
merged 16 commits into from
Oct 12, 2024
Merged
76 changes: 46 additions & 30 deletions azurelinuxagent/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,9 @@ def parse_json_event(data_str):

def parse_event(data_str):
try:
try:
return parse_json_event(data_str)
except ValueError:
return parse_xml_event(data_str)
except Exception as e:
raise EventError("Error parsing event: {0}".format(ustr(e)))

return parse_json_event(data_str)
except ValueError:
return parse_xml_event(data_str)
maddieford marked this conversation as resolved.
Show resolved Hide resolved

def parse_xml_param(param_node):
name = getattrib(param_node, "Name")
Expand Down Expand Up @@ -342,11 +338,15 @@ def update_unicode_error(self, unicode_err):
def update_op_error(self, op_err):
self.__op_error_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)

def get_error_count(self):
    """
    Return the sum of the operation-error counter and the unicode-error counter.
    """
    op_errors = self.__op_error_count
    unicode_errors = self.__unicode_error_count
    return op_errors + unicode_errors


class EventLogger(object):
def __init__(self):
self.event_dir = None
self.periodic_events = {}
self.protocol = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a good idea to have this uninitialized property. Let's look for alternatives... maybe get the protocol on demand when sending an important event?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending on demand may add other complications: wherever we send an important event, we would also need a reference to the protocol object, plus we would need to add protocol as an optional parameter (defaulting to None) to the add_event method.

Or, if you have something else in mind for on-demand sending, let's have a quick call to discuss.


#
# All events should have these parameters.
Expand Down Expand Up @@ -492,7 +492,10 @@ def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=Tr
self.periodic_events[h] = datetime.now()

def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True):
message="", log_event=True, flush=False):
"""
:param flush: Flush the event immediately to the wire server
"""

if (not is_success) and log_event:
_log_event(name, op, message, duration, is_success=is_success)
Expand All @@ -506,12 +509,7 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)

try:
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
self.report_or_save_event(event, flush)

def add_log_event(self, level, message):
event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
Expand All @@ -522,11 +520,7 @@ def add_log_event(self, level, message):
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)
try:
self.save_event(json.dumps(data))
except EventError:
pass
self.report_or_save_event(event)

def add_metric(self, category, counter, instance, value, log_event=False):
"""
Expand All @@ -549,11 +543,25 @@ def add_metric(self, category, counter, instance, value, log_event=False):
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
self.add_common_event_parameters(event, datetime.utcnow())

data = get_properties(event)
try:
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
self.report_or_save_event(event)

def report_or_save_event(self, event, flush=False):
    """
    Send the event directly to the wire server when an immediate flush is requested;
    otherwise, or when that send does not report success, save the event to disk.

    :param event: the telemetry event to send or save
    :param flush: if true (and a protocol has been set on this logger), try to send the event immediately

    TODO: pick up as many events as possible and send them in one go.
    """
    can_send_now = flush and self.protocol is not None
    if can_send_now and self.protocol.report_event([event], flush):
        # The event was delivered; there is nothing left to persist.
        return

    # Either no immediate flush was requested/possible, or the send failed: fall back to saving the event.
    try:
        self.save_event(json.dumps(get_properties(event)))
    except EventError as error:
        logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(error)))


@staticmethod
def _clean_up_message(message):
Expand Down Expand Up @@ -634,13 +642,16 @@ def elapsed_milliseconds(utc_start):
(d.microseconds / 1000.0))


def report_event(op, is_success=True, message='', log_event=True):
def report_event(op, is_success=True, message='', log_event=True, flush=False):
"""
:param flush: if true, flush the event immediately to the wire server
"""
add_event(AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op,
log_event=log_event)
log_event=log_event, flush=flush)


def report_periodic(delta, op, is_success=True, message=''):
Expand Down Expand Up @@ -673,12 +684,17 @@ def report_metric(category, counter, instance, value, log_event=False, reporter=
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))


def initialize_event_logger_vminfo_common_parameters(protocol, reporter=__event_logger__):
def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, reporter=__event_logger__):
# Initialize the protocol for the event logger so it can send events directly to the wireserver
reporter.protocol = protocol
reporter.initialize_vminfo_common_parameters(protocol)


def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, reporter=__event_logger__):
message="", log_event=True, flush=False, reporter=__event_logger__):
"""
:param flush: if true, flush the event immediately to the wire server
"""
if reporter.event_dir is None:
logger.warn("Cannot add event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
Expand All @@ -688,7 +704,7 @@ def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, d
mark_event_status(name, version, op, is_success)
reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message,
log_event=log_event)
log_event=log_event, flush=flush)


def info(op, fmt, *args):
Expand Down Expand Up @@ -721,7 +737,7 @@ def add_log_event(level, message, forced=False, reporter=__event_logger__):
:param message: Message
:param forced: Force write the event even if send_logs_to_telemetry() is disabled
(NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
:param reporter:
:param reporter: The EventLogger instance to which log events should be sent
:return:
"""
if reporter.event_dir is None:
Expand Down
31 changes: 20 additions & 11 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@
from azurelinuxagent.common.telemetryevent import GuestAgentExtensionEventsSchema
from azurelinuxagent.common.utils import fileutil, restutil
from azurelinuxagent.common.utils.cryptutil import CryptUtil
from azurelinuxagent.common.utils.restutil import TELEMETRY_THROTTLE_DELAY_IN_SECONDS, \
TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS, TELEMETRY_DATA
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
findtext, gettext, remove_bom, get_bytes_from_pem, parse_json
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION

VERSION_INFO_URI = "http://{0}/?comp=versions"
HEALTH_REPORT_URI = "http://{0}/machine?comp=health"
ROLE_PROP_URI = "http://{0}/machine?comp=roleProperties"
TELEMETRY_URI = "http://{0}/machine?comp=telemetrydata"
TELEMETRY_URI = "http://{0}/machine?comp={1}"

PROTOCOL_VERSION = "2012-11-30"
ENDPOINT_FINE_NAME = "WireServer"
Expand Down Expand Up @@ -144,8 +146,8 @@ def report_vm_status(self, vm_status):
self.client.status_blob.set_vm_status(vm_status)
self.client.upload_status_blob()

def report_event(self, events_iterator):
self.client.report_event(events_iterator)
def report_event(self, events_iterator, flush=False):
return self.client.report_event(events_iterator, flush)

def upload_logs(self, logs):
self.client.upload_logs(logs)
Expand Down Expand Up @@ -1047,8 +1049,8 @@ def report_health(self, status, substatus, description):
u",{0}: {1}").format(resp.status,
resp.read()))

def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
uri = TELEMETRY_URI.format(self.get_endpoint())
def _send_encoded_event(self, provider_id, event_str, flush, encoding='utf8'):
uri = TELEMETRY_URI.format(self.get_endpoint(), TELEMETRY_DATA)
data_format_header = ustr('<?xml version="1.0"?><TelemetryData version="1.0"><Provider id="{0}">').format(
provider_id).encode(encoding)
data_format_footer = ustr('</Provider></TelemetryData>').encode(encoding)
Expand All @@ -1059,7 +1061,12 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
header = self.get_header_for_xml_content()
# NOTE: The call to wireserver requests utf-8 encoding in the headers, but the body should not
# be encoded: some nodes in the telemetry pipeline do not support utf-8 encoding.
resp = self.call_wireserver(restutil.http_post, uri, data, header)

# If this is a flush of an important event, use a shorter throttle delay on throttling errors (to avoid a long delay in completing this operation)
if flush:
resp = self.call_wireserver(restutil.http_post, uri, data, header, max_retry=3, throttle_delay=TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS)
else:
resp = self.call_wireserver(restutil.http_post, uri, data, header, max_retry=3, throttle_delay=TELEMETRY_THROTTLE_DELAY_IN_SECONDS)
except HttpError as e:
raise ProtocolError("Failed to send events:{0}".format(e))

Expand All @@ -1068,14 +1075,14 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
raise ProtocolError(
"Failed to send events:{0}".format(resp.status))

def report_event(self, events_iterator):
def report_event(self, events_iterator, flush=False):
buf = {}
debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT)
events_per_provider = defaultdict(int)

def _send_event(provider_id, debug_info):
def _send_event(provider_id, debug_info, flush):
try:
self.send_encoded_event(provider_id, buf[provider_id])
self._send_encoded_event(provider_id, buf[provider_id], flush)
except UnicodeError as uni_error:
debug_info.update_unicode_error(uni_error)
except Exception as error:
Expand All @@ -1102,7 +1109,7 @@ def _send_event(provider_id, debug_info):
# If buffer is full, send out the events in buffer and reset buffer
if len(buf[event.providerId] + event_str) >= MAX_EVENT_BUFFER_SIZE:
logger.verbose("No of events this request = {0}".format(events_per_provider[event.providerId]))
_send_event(event.providerId, debug_info)
_send_event(event.providerId, debug_info, flush)
buf[event.providerId] = b""
events_per_provider[event.providerId] = 0

Expand All @@ -1117,10 +1124,12 @@ def _send_event(provider_id, debug_info):
for provider_id in list(buf.keys()):
if buf[provider_id]:
logger.verbose("No of events this request = {0}".format(events_per_provider[provider_id]))
_send_event(provider_id, debug_info)
_send_event(provider_id, debug_info, flush)

debug_info.report_debug_info()

return debug_info.get_error_count() == 0

def report_status_event(self, message, is_success):
report_event(op=WALAEventOperation.ReportStatus,
is_success=is_success,
Expand Down
2 changes: 1 addition & 1 deletion azurelinuxagent/common/telemetryevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ def get_version(self):
for param in self.parameters:
if param.name == GuestAgentExtensionEventsSchema.Version:
return param.value
return None
return None
27 changes: 21 additions & 6 deletions azurelinuxagent/common/utils/restutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@

THROTTLE_RETRIES = 25
THROTTLE_DELAY_IN_SECONDS = 1
# Reduce the rate of retry attempts when throttled, since the telemetrydata endpoint has a limit of 15 calls per 15 seconds
TELEMETRY_THROTTLE_DELAY_IN_SECONDS = 8
# Use a shorter delay when flushing important telemetry events
TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS = 2

REDACTED_TEXT = "<SAS_SIGNATURE>"
SAS_TOKEN_RETRIEVAL_REGEX = re.compile(r'^(https?://[a-zA-Z0-9.].*sig=)([a-zA-Z0-9%-]*)(.*)$')
Expand Down Expand Up @@ -109,6 +113,7 @@
KNOWN_WIRESERVER_IP = '168.63.129.16'
HOST_PLUGIN_PORT = 32526

TELEMETRY_DATA = "telemetrydata"

class IOErrorCounter(object):
_lock = threading.RLock()
Expand Down Expand Up @@ -163,6 +168,10 @@ def _is_retry_exception(e):
def _is_throttle_status(status):
return status in THROTTLE_CODES

def _is_telemetry_req(url):
    """
    Return True when TELEMETRY_DATA appears anywhere in the given url, False otherwise.
    """
    return TELEMETRY_DATA in url

def _parse_url(url):
"""
Expand Down Expand Up @@ -364,6 +373,7 @@ def http_request(method,
max_retry=None,
retry_codes=None,
retry_delay=DELAY_IN_SECONDS,
maddieford marked this conversation as resolved.
Show resolved Hide resolved
throttle_delay=THROTTLE_DELAY_IN_SECONDS,
redact_data=False,
return_raw_response=False):
"""
Expand Down Expand Up @@ -427,10 +437,10 @@ def http_request(method,
# (with a safe, minimum number of retry attempts)
# -- Otherwise, compute a delay that is the product of the next
# item in the Fibonacci series and the initial delay value
delay = THROTTLE_DELAY_IN_SECONDS \
if was_throttled \
else _compute_delay(retry_attempt=attempt,
delay=retry_delay)
if was_throttled:
delay = throttle_delay
else:
delay = _compute_delay(retry_attempt=attempt, delay=retry_delay)

logger.verbose("[HTTP Retry] "
"Attempt {0} of {1} will delay {2} seconds: {3}",
Expand Down Expand Up @@ -468,7 +478,10 @@ def http_request(method,
# retry attempts
if _is_throttle_status(resp.status):
was_throttled = True
max_retry = max(max_retry, THROTTLE_RETRIES)
# Today, THROTTLE_RETRIES is set to a large number (25) for retries, as opposed to backing off and attempting fewer retries.
# However, for telemetry calls (due to the throttle limit of 15 calls per 15 seconds), we use the max_retry set by the caller for overall retry attempts instead of THROTTLE_RETRIES.
if not _is_telemetry_req(url):
max_retry = max(max_retry, THROTTLE_RETRIES)
continue

# If we got a 410 (resource gone) for any reason, raise an exception. The caller will handle it by
Expand Down Expand Up @@ -563,6 +576,7 @@ def http_post(url,
max_retry=None,
retry_codes=None,
retry_delay=DELAY_IN_SECONDS,
throttle_delay=THROTTLE_DELAY_IN_SECONDS,
timeout=10):

if max_retry is None:
Expand All @@ -575,7 +589,8 @@ def http_post(url,
use_proxy=use_proxy,
max_retry=max_retry,
retry_codes=retry_codes,
retry_delay=retry_delay)
retry_delay=retry_delay,
throttle_delay=throttle_delay)


def http_put(url,
Expand Down
4 changes: 2 additions & 2 deletions azurelinuxagent/daemon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.fileutil as fileutil

from azurelinuxagent.common.event import add_event, WALAEventOperation, initialize_event_logger_vminfo_common_parameters
from azurelinuxagent.common.event import add_event, WALAEventOperation, initialize_event_logger_vminfo_common_parameters_and_protocol
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.protocol.goal_state import GoalState, GoalStateProperties
Expand Down Expand Up @@ -119,7 +119,7 @@ def initialize_environment(self):

def _initialize_telemetry(self):
protocol = self.protocol_util.get_protocol()
initialize_event_logger_vminfo_common_parameters(protocol)
initialize_event_logger_vminfo_common_parameters_and_protocol(protocol)

def daemon(self, child_args=None):
logger.info("Run daemon")
Expand Down
Loading
Loading