Skip to content

Commit

Permalink
Merge pull request #2783 from Azure/release-2.9.0.0
Browse files Browse the repository at this point in the history
Merge release-2.9.0.0 to master
  • Loading branch information
nagworld9 authored Mar 9, 2023
2 parents 44701b8 + de77b1e commit dce0341
Show file tree
Hide file tree
Showing 117 changed files with 2,834 additions and 1,455 deletions.
3 changes: 2 additions & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
1
# See https://help.github.com/articles/about-codeowners/
# for more info about CODEOWNERS file

Expand All @@ -20,4 +21,4 @@
#
# Linux Agent team
#
* @narrieta @larohra @kevinclark19a @ZhidongPeng @dhivyaganesan @nagworld9
* @narrieta @ZhidongPeng @nagworld9
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ _Default: y_

If set, agent logs will be periodically collected and uploaded to a secure location for improved supportability.

NOTE: This feature is only supported ubuntu 16.04+; this flag will not take effect on any other distro.
NOTE: This feature relies on the agent's resource usage features (cgroups); this flag will not take effect on distros where cgroups are not supported.

#### __Logs.CollectPeriod__

Expand Down
31 changes: 20 additions & 11 deletions azurelinuxagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
PY_VERSION_MAJOR, PY_VERSION_MINOR, \
PY_VERSION_MICRO, GOAL_STATE_AGENT_VERSION, \
get_daemon_version, set_daemon_version
from azurelinuxagent.ga.collect_logs import CollectLogsHandler
from azurelinuxagent.ga.collect_logs import CollectLogsHandler, get_log_collector_monitor_handler
from azurelinuxagent.pa.provision.default import ProvisionHandler


Expand Down Expand Up @@ -196,36 +196,45 @@ def show_configuration(self):
print("{0} = {1}".format(k, configuration[k]))

def collect_logs(self, is_full_mode):
logger.set_prefix("LogCollector")

if is_full_mode:
print("Running log collector mode full")
logger.info("Running log collector mode full")
else:
print("Running log collector mode normal")
logger.info("Running log collector mode normal")

# Check the cgroups unit
cpu_cgroup_path, memory_cgroup_path, log_collector_monitor = None, None, None
if CollectLogsHandler.should_validate_cgroups():
cpu_cgroup_path, memory_cgroup_path = SystemdCgroupsApi.get_process_cgroup_relative_paths("self")
cgroups_api = SystemdCgroupsApi()
cpu_cgroup_path, memory_cgroup_path = cgroups_api.get_process_cgroup_paths("self")

cpu_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in cpu_cgroup_path)
memory_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in memory_cgroup_path)

if not cpu_slice_matches or not memory_slice_matches:
print("The Log Collector process is not in the proper cgroups:")
logger.info("The Log Collector process is not in the proper cgroups:")
if not cpu_slice_matches:
print("\tunexpected cpu slice")
logger.info("\tunexpected cpu slice")
if not memory_slice_matches:
print("\tunexpected memory slice")
logger.info("\tunexpected memory slice")

sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

try:
log_collector = LogCollector(is_full_mode)
log_collector = LogCollector(is_full_mode, cpu_cgroup_path, memory_cgroup_path)
log_collector_monitor = get_log_collector_monitor_handler(log_collector.cgroups)
log_collector_monitor.run()
archive = log_collector.collect_logs_and_get_archive()
print("Log collection successfully completed. Archive can be found at {0} "
logger.info("Log collection successfully completed. Archive can be found at {0} "
"and detailed log output can be found at {1}".format(archive, OUTPUT_RESULTS_FILE_PATH))
except Exception as e:
print("Log collection completed unsuccessfully. Error: {0}".format(ustr(e)))
print("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH))
logger.error("Log collection completed unsuccessfully. Error: {0}".format(ustr(e)))
logger.info("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH))
sys.exit(1)
finally:
if log_collector_monitor is not None:
log_collector_monitor.stop()

@staticmethod
def setup_firewall(firewall_metadata):
Expand Down
151 changes: 123 additions & 28 deletions azurelinuxagent/common/cgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,61 @@
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
from collections import namedtuple

import errno
import os
import re
from datetime import timedelta

from azurelinuxagent.common import logger
from azurelinuxagent.common import logger, conf
from azurelinuxagent.common.exception import CGroupsException
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.utils import fileutil

_REPORT_EVERY_HOUR = timedelta(hours=1)
_DEFAULT_REPORT_PERIOD = timedelta(seconds=conf.get_cgroup_check_period())

AGENT_NAME_TELEMETRY = "walinuxagent.service" # Name used for telemetry; it needs to be consistent even if the name of the service changes
AGENT_LOG_COLLECTOR = "azure-walinuxagent-logcollector"


class CounterNotFound(Exception):
pass


class MetricValue(object):

"""
Class for defining all the required metric fields to send telemetry.
"""

def __init__(self, category, counter, instance, value, report_period=_DEFAULT_REPORT_PERIOD):
self._category = category
self._counter = counter
self._instance = instance
self._value = value
self._report_period = report_period

@property
def category(self):
return self._category

@property
def counter(self):
return self._counter

@property
def instance(self):
return self._instance

MetricValue = namedtuple('Metric', ['category', 'counter', 'instance', 'value'])
@property
def value(self):
return self._value

@property
def report_period(self):
return self._report_period


class MetricsCategory(object):
Expand All @@ -40,6 +80,9 @@ class MetricsCounter(object):
TOTAL_MEM_USAGE = "Total Memory Usage"
MAX_MEM_USAGE = "Max Memory Usage"
THROTTLED_TIME = "Throttled Time"
SWAP_MEM_USAGE = "Swap Memory Usage"
AVAILABLE_MEM = "Available MBytes"
USED_MEM = "Used MBytes"


re_user_system_times = re.compile(r'user (\d+)\nsystem (\d+)\n')
Expand Down Expand Up @@ -166,12 +209,13 @@ def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False):
#
match = re_user_system_times.match(cpuacct_stat)
if not match:
raise CGroupsException("The contents of {0} are invalid: {1}".format(self._get_cgroup_file('cpuacct.stat'), cpuacct_stat))
raise CGroupsException(
"The contents of {0} are invalid: {1}".format(self._get_cgroup_file('cpuacct.stat'), cpuacct_stat))
cpu_ticks = int(match.groups()[0]) + int(match.groups()[1])

return cpu_ticks

def _get_throttled_time(self):
def get_throttled_time(self):
try:
with open(os.path.join(self.path, 'cpu.stat')) as cpu_stat:
#
Expand Down Expand Up @@ -205,7 +249,7 @@ def initialize_cpu_usage(self):
raise CGroupsException("initialize_cpu_usage() should be invoked only once")
self._current_cgroup_cpu = self._get_cpu_ticks(allow_no_such_file_or_directory_error=True)
self._current_system_cpu = self._osutil.get_total_cpu_ticks_since_boot()
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

def get_cpu_usage(self):
"""
Expand All @@ -229,70 +273,121 @@ def get_cpu_usage(self):

return round(100.0 * self._osutil.get_processor_cores() * float(cgroup_delta) / float(system_delta), 3)

def get_throttled_time(self):
def get_cpu_throttled_time(self, read_previous_throttled_time=True):
"""
Computes the throttled time (in seconds) since the last call to this function.
NOTE: initialize_cpu_usage() must be invoked before calling this function
Compute only current throttled time if read_previous_throttled_time set to False
"""
if not read_previous_throttled_time:
return float(self.get_throttled_time() / 1E9)

if not self._cpu_usage_initialized():
raise CGroupsException("initialize_cpu_usage() must be invoked before the first call to get_throttled_time()")
raise CGroupsException(
"initialize_cpu_usage() must be invoked before the first call to get_throttled_time()")

self._previous_throttled_time = self._current_throttled_time
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

return float(self._current_throttled_time - self._previous_throttled_time) / 1E9

def get_tracked_metrics(self, **kwargs):
tracked = []
cpu_usage = self.get_cpu_usage()
if cpu_usage >= float(0):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.PROCESSOR_PERCENT_TIME, self.name, cpu_usage))
tracked.append(
MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.PROCESSOR_PERCENT_TIME, self.name, cpu_usage))

if 'track_throttled_time' in kwargs and kwargs['track_throttled_time']:
throttled_time = self.get_throttled_time()
throttled_time = self.get_cpu_throttled_time()
if cpu_usage >= float(0) and throttled_time >= float(0):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.THROTTLED_TIME, self.name, throttled_time))
tracked.append(
MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.THROTTLED_TIME, self.name, throttled_time))

return tracked


class MemoryCgroup(CGroup):
def __init__(self, name, cgroup_path):
super(MemoryCgroup, self).__init__(name, cgroup_path)

self._counter_not_found_error_count = 0

def _get_memory_stat_counter(self, counter_name):
try:
with open(os.path.join(self.path, 'memory.stat')) as memory_stat:
# cat /sys/fs/cgroup/memory/azure.slice/memory.stat
# cache 67178496
# rss 42340352
# rss_huge 6291456
# swap 0
for line in memory_stat:
re_memory_counter = r'{0}\s+(\d+)'.format(counter_name)
match = re.match(re_memory_counter, line)
if match is not None:
return int(match.groups()[0])
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
raise
raise CGroupsException("Failed to read memory.stat: {0}".format(ustr(e)))
except Exception as e:
raise CGroupsException("Failed to read memory.stat: {0}".format(ustr(e)))

raise CounterNotFound("Cannot find counter: {0}".format(counter_name))

def get_memory_usage(self):
"""
Collect memory.usage_in_bytes from the cgroup.
Collect RSS+CACHE from memory.stat cgroup.
:return: Memory usage in bytes
:rtype: int
"""
usage = None
try:
usage = self._get_parameters('memory.usage_in_bytes', first_line_only=True)
except Exception as e:
if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT: # pylint: disable=E1101
raise
raise CGroupsException("Exception while attempting to read {0}".format("memory.usage_in_bytes"), e)

return int(usage)
cache = self._get_memory_stat_counter("cache")
rss = self._get_memory_stat_counter("rss")
return cache + rss

def try_swap_memory_usage(self):
"""
Collect SWAP from memory.stat cgroup.
:return: Memory usage in bytes
:rtype: int
Note: the memory.stat file is the only place to get the SWAP usage, since the other swap-related file, memory.memsw.usage_in_bytes, reports the combined Memory+SWAP total.
"""
try:
return self._get_memory_stat_counter("swap")
except CounterNotFound as e:
if self._counter_not_found_error_count < 1:
logger.periodic_info(logger.EVERY_HALF_HOUR,
'Could not find swap counter from "memory.stat" file in the cgroup: {0}.'
' Internal error: {1}'.format(self.path, ustr(e)))
self._counter_not_found_error_count += 1
return 0

def get_max_memory_usage(self):
"""
Collect memory.usage_in_bytes from the cgroup.
Collect memory.max_usage_in_bytes from the cgroup.
:return: Memory usage in bytes
:rtype: int
"""
usage = None
usage = 0
try:
usage = self._get_parameters('memory.max_usage_in_bytes', first_line_only=True)
usage = int(self._get_parameters('memory.max_usage_in_bytes', first_line_only=True))
except Exception as e:
if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT: # pylint: disable=E1101
raise
raise CGroupsException("Exception while attempting to read {0}".format("memory.usage_in_bytes"), e)
raise CGroupsException("Exception while attempting to read {0}".format("memory.max_usage_in_bytes"), e)

return int(usage)
return usage

def get_tracked_metrics(self, **_):
return [
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.TOTAL_MEM_USAGE, self.name, self.get_memory_usage()),
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.MAX_MEM_USAGE, self.name, self.get_max_memory_usage()),
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.TOTAL_MEM_USAGE, self.name,
self.get_memory_usage()),
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.MAX_MEM_USAGE, self.name,
self.get_max_memory_usage(), _REPORT_EVERY_HOUR),
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.SWAP_MEM_USAGE, self.name,
self.try_swap_memory_usage(), _REPORT_EVERY_HOUR)
]
26 changes: 18 additions & 8 deletions azurelinuxagent/common/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import uuid

from azurelinuxagent.common import logger
from azurelinuxagent.common.cgroup import CpuCgroup
from azurelinuxagent.common.cgroup import CpuCgroup, MemoryCgroup
from azurelinuxagent.common.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.common.conf import get_agent_pid_file_path
from azurelinuxagent.common.exception import CGroupsException, ExtensionErrorCodes, ExtensionError, \
Expand Down Expand Up @@ -59,9 +59,7 @@ def cgroups_supported():
distro_version = FlexibleVersion(distro_info[1])
except ValueError:
return False
return ((distro_name.lower() == 'ubuntu' and distro_version.major >= 16) or
(distro_name.lower() in ("centos", "redhat") and
((distro_version.major == 7 and distro_version.minor >= 4) or distro_version.major >= 8)))
return distro_name.lower() == 'ubuntu' and distro_version.major >= 16

@staticmethod
def track_cgroups(extension_cgroups):
Expand Down Expand Up @@ -265,7 +263,10 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
extension_slice_name = self.get_extension_slice_name(extension_name)
with self._systemd_run_commands_lock:
process = subprocess.Popen( # pylint: disable=W1509
"systemd-run --unit={0} --scope --slice={1} {2}".format(scope, extension_slice_name, command),
# Some distros, such as Ubuntu 20.04, have CPU and memory accounting enabled by default, which causes
# systemd to create nested cgroups under the extension slice. Disabling CPU and memory accounting here
# prevents those nested cgroups from being created, so all counters remain in the extension cgroup
# (the slice unit file is already configured with accounting enabled).
"systemd-run --property=CPUAccounting=no --property=MemoryAccounting=no --unit={0} --scope --slice={1} {2}".format(scope, extension_slice_name, command),
shell=shell,
cwd=cwd,
stdout=stdout,
Expand All @@ -280,16 +281,25 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh

logger.info("Started extension in unit '{0}'", scope_name)

cpu_cgroup = None
try:
cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name)

cpu_cgroup_mountpoint, _ = self.get_cgroup_mount_points()
cpu_cgroup_mountpoint, memory_cgroup_mountpoint = self.get_cgroup_mount_points()

if cpu_cgroup_mountpoint is None:
logger.info("The CPU controller is not mounted; will not track resource usage")
else:
cpu_cgroup_path = os.path.join(cpu_cgroup_mountpoint, cgroup_relative_path)
CGroupsTelemetry.track_cgroup(CpuCgroup(extension_name, cpu_cgroup_path))
cpu_cgroup = CpuCgroup(extension_name, cpu_cgroup_path)
CGroupsTelemetry.track_cgroup(cpu_cgroup)

if memory_cgroup_mountpoint is None:
logger.info("The Memory controller is not mounted; will not track resource usage")
else:
memory_cgroup_path = os.path.join(memory_cgroup_mountpoint, cgroup_relative_path)
memory_cgroup = MemoryCgroup(extension_name, memory_cgroup_path)
CGroupsTelemetry.track_cgroup(memory_cgroup)

except IOError as e:
if e.errno == 2: # 'No such file or directory'
Expand All @@ -301,7 +311,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
# Wait for process completion or timeout
try:
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout,
stderr=stderr, error_code=error_code)
stderr=stderr, error_code=error_code, cpu_cgroup=cpu_cgroup)
except ExtensionError as e:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
Expand Down
Loading

0 comments on commit dce0341

Please sign in to comment.