# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#

import atexit
import json
import os
import platform
import re
import sys
import threading
import time
import traceback
from datetime import datetime

import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.exception import EventError, OSUtilError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.datacontract import get_properties, set_properties
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.telemetryevent import TelemetryEventParam, TelemetryEvent, CommonTelemetryEventSchema, \
    GuestAgentGenericLogsSchema, GuestAgentExtensionEventsSchema, GuestAgentPerfCounterEventsSchema
from azurelinuxagent.common.utils import fileutil, textutil
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib
from azurelinuxagent.common.version import CURRENT_VERSION, CURRENT_AGENT, AGENT_NAME, DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, AGENT_EXECUTION_MODE
from azurelinuxagent.common.protocol.imds import get_imds_client

EVENTS_DIRECTORY = "events"

_EVENT_MSG = "Event: name={0}, op={1}, message={2}, duration={3}"
TELEMETRY_EVENT_PROVIDER_ID = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"
TELEMETRY_EVENT_EVENT_ID = 1
TELEMETRY_METRICS_EVENT_ID = 4

TELEMETRY_LOG_PROVIDER_ID = "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F"
TELEMETRY_LOG_EVENT_ID = 7

#
# When this flag is enabled the TODO comment in Logger.log() needs to be addressed; also the tests
# marked with "Enable this test when SEND_LOGS_TO_TELEMETRY is enabled" should be enabled.
#
SEND_LOGS_TO_TELEMETRY = False

# Upper bound on the number of files kept in the events directory; see EventLogger.save_event()
MAX_NUMBER_OF_EVENTS = 1000

AGENT_EVENT_FILE_EXTENSION = '.waagent.tld'
EVENT_FILE_REGEX = re.compile(r'(?P<agent_event>\.waagent)?\.tld$')


def send_logs_to_telemetry():
    """
    Returns the current value of the SEND_LOGS_TO_TELEMETRY flag.
    """
    return SEND_LOGS_TO_TELEMETRY


class WALAEventOperation:  # pylint: disable=no-init
    """
    Names of the operations that can be reported in telemetry events (the 'op' argument of add_event & co.)
    """
    ActivateResourceDisk = "ActivateResourceDisk"
    AgentBlacklisted = "AgentBlacklisted"
    AgentEnabled = "AgentEnabled"
    ArtifactsProfileBlob = "ArtifactsProfileBlob"
    CGroupsCleanUp = "CGroupsCleanUp"
    CGroupsDebug = "CGroupsDebug"
    CGroupsDisabled = "CGroupsDisabled"
    CGroupsInfo = "CGroupsInfo"
    CGroupsInitialize = "CGroupsInitialize"
    CGroupsLimitsCrossed = "CGroupsLimitsCrossed"
    CollectEventErrors = "CollectEventErrors"
    CollectEventUnicodeErrors = "CollectEventUnicodeErrors"
    ConfigurationChange = "ConfigurationChange"
    CustomData = "CustomData"
    DefaultChannelChange = "DefaultChannelChange"
    Deploy = "Deploy"
    Disable = "Disable"
    Downgrade = "Downgrade"
    Download = "Download"
    Enable = "Enable"
    ExtensionProcessing = "ExtensionProcessing"
    ExtensionTelemetryEventProcessing = "ExtensionTelemetryEventProcessing"
    FetchGoalState = "FetchGoalState"
    Firewall = "Firewall"
    HealthCheck = "HealthCheck"
    HealthObservation = "HealthObservation"
    HeartBeat = "HeartBeat"
    HostPlugin = "HostPlugin"
    HostPluginHeartbeat = "HostPluginHeartbeat"
    HostPluginHeartbeatExtended = "HostPluginHeartbeatExtended"
    HttpErrors = "HttpErrors"
    ImdsHeartbeat = "ImdsHeartbeat"
    Install = "Install"
    InitializeHostPlugin = "InitializeHostPlugin"
    Log = "Log"
    LogCollection = "LogCollection"
    OSInfo = "OSInfo"
    Partition = "Partition"
    PersistFirewallRules = "PersistFirewallRules"
    PluginSettingsVersionMismatch = "PluginSettingsVersionMismatch"
    InvalidExtensionConfig = "InvalidExtensionConfig"
    ProcessGoalState = "ProcessGoalState"
    Provision = "Provision"
    ProvisionGuestAgent = "ProvisionGuestAgent"
    RemoteAccessHandling = "RemoteAccessHandling"
    ReportEventErrors = "ReportEventErrors"
    ReportEventUnicodeErrors = "ReportEventUnicodeErrors"
    ReportStatus = "ReportStatus"
    ReportStatusExtended = "ReportStatusExtended"
    Restart = "Restart"
    SequenceNumberMismatch = "SequenceNumberMismatch"
    SetCGroupsLimits = "SetCGroupsLimits"
    SkipUpdate = "SkipUpdate"
    StatusProcessing = "StatusProcessing"
    UnhandledError = "UnhandledError"
    UnInstall = "UnInstall"
    Unknown = "Unknown"
    Upgrade = "Upgrade"
    Update = "Update"


SHOULD_ENCODE_MESSAGE_LEN = 80
# Operations whose messages are compressed by _encode_message() before being written to the agent log
SHOULD_ENCODE_MESSAGE_OP = [
    WALAEventOperation.Disable,
    WALAEventOperation.Enable,
    WALAEventOperation.Install,
    WALAEventOperation.UnInstall,
]


class EventStatus(object):
    """
    Keeps track of the last reported status (success/failure) of events, keyed by name, version and
    operation, and persists that information as JSON in EVENT_STATUS_FILE.
    """
    EVENT_STATUS_FILE = "event_status.json"

    def __init__(self):
        # Both are populated by initialize(); until then no file is associated with this instance
        self._path = None
        self._status = {}

    def clear(self):
        """
        Forgets all the recorded statuses and saves the (now empty) status file.
        """
        self._status = {}
        self._save()

    def event_marked(self, name, version, op):
        """
        Returns True if a status has been recorded for the given event.
        """
        return self._event_name(name, version, op) in self._status

    def event_succeeded(self, name, version, op):
        """
        Returns the recorded status of the given event; events that have not been recorded are
        reported as succeeded.
        """
        event = self._event_name(name, version, op)
        if event not in self._status:
            return True
        return self._status[event] is True

    def initialize(self, status_dir=None):
        """
        Associates this instance with the status file under status_dir and loads its contents.

        :param status_dir: Directory containing the status file; when omitted (None) the agent's lib
                           directory, conf.get_lib_dir(), is used.
        """
        # The default is resolved here, at call time; a default argument of conf.get_lib_dir() would be
        # evaluated only once, when the module is imported, and would never see later configuration.
        if status_dir is None:
            status_dir = conf.get_lib_dir()
        self._path = os.path.join(status_dir, EventStatus.EVENT_STATUS_FILE)
        self._load()

    def mark_event_status(self, name, version, op, status):
        """
        Records the status of the given event (anything other than True is recorded as False) and
        saves the status file.
        """
        event = self._event_name(name, version, op)
        self._status[event] = (status is True)
        self._save()

    def _event_name(self, name, version, op):
        """
        Returns the key used to index the given event in the status dictionary.
        """
        return "{0}-{1}-{2}".format(name, version, op)

    def _load(self):
        """
        Loads the status file, if it exists. On any error a warning is logged and the status is reset.
        """
        try:
            self._status = {}
            if os.path.isfile(self._path):
                with open(self._path, 'r') as f:
                    self._status = json.load(f)
        except Exception as e:
            logger.warn("Exception occurred loading event status: {0}".format(e))
            self._status = {}

    def _save(self):
        """
        Writes the status to the status file. Errors are logged as warnings and otherwise ignored.
        """
        try:
            with open(self._path, 'w') as f:
                json.dump(self._status, f)
        except Exception as e:
            logger.warn("Exception occurred saving event status: {0}".format(e))


__event_status__ = EventStatus()
# Only the operations in this list are tracked by __event_status__ (see mark_event_status/should_emit_event)
__event_status_operations__ = [
        WALAEventOperation.ReportStatus
    ]


def parse_json_event(data_str):
    """
    Creates a TelemetryEvent from its JSON representation.

    :param data_str: JSON text
    :return: The TelemetryEvent, with file_type set to "json"
    :raises ValueError: (from json.loads) if data_str is not valid JSON
    """
    data = json.loads(data_str)
    event = TelemetryEvent()
    set_properties("TelemetryEvent", event, data)
    event.file_type = "json"
    return event


def parse_event(data_str):
    """
    Creates a TelemetryEvent from the given text, which is parsed as JSON first and, if that fails
    with ValueError, as XML.

    :raises EventError: if the text cannot be parsed
    """
    try:
        try:
            return parse_json_event(data_str)
        except ValueError:
            return parse_xml_event(data_str)
    except Exception as e:
        raise EventError("Error parsing event: {0}".format(ustr(e)))


def parse_xml_param(param_node):
    """
    Creates a TelemetryEventParam from a 'Param' XML node. The 'Value' attribute is converted according
    to the 'T' attribute (mt:uint64, mt:bool, mt:float64); for any other type it is passed through
    unchanged.
    """
    name = getattrib(param_node, "Name")
    value_str = getattrib(param_node, "Value")
    attr_type = getattrib(param_node, "T")
    value = value_str
    if attr_type == 'mt:uint64':
        value = int(value_str)
    elif attr_type == 'mt:bool':
        # bool() of any non-empty string is True (including "False"), so compare the text instead
        value = value_str is not None and value_str.strip().lower() in ('true', '1')
    elif attr_type == 'mt:float64':
        value = float(value_str)
    return TelemetryEventParam(name, value)


def parse_xml_event(data_str):
    """
    Creates a TelemetryEvent from its XML representation.

    :param data_str: XML text
    :return: The TelemetryEvent, with file_type set to "xml"
    :raises ValueError: on any error; parse_event() relies on ValueError to detect parsing failures
    """
    try:
        xml_doc = parse_doc(data_str)
        event_id = getattrib(find(xml_doc, "Event"), 'id')
        provider_id = getattrib(find(xml_doc, "Provider"), 'id')
        event = TelemetryEvent(event_id, provider_id)
        param_nodes = findall(xml_doc, 'Param')
        for param_node in param_nodes:
            event.parameters.append(parse_xml_param(param_node))
        event.file_type = "xml"
        return event
    except Exception as e:
        raise ValueError(ustr(e))


def _encode_message(op, message):
    """
    Gzip and base64 encode a message based on the operation.

    The intent of this message is to make the logs human readable and include the
    stdout/stderr from extension operations.  Extension operations tend to generate
    a lot of noise, which makes it difficult to parse the line-oriented waagent.log.
    The compromise is to encode the stdout/stderr so we preserve the data and do
    not destroy the line oriented nature.

    The data can be recovered using the following command:

      $ echo '<encoded data>' | base64 -d | pigz -zd

    You may need to install the pigz command.

    :param op: Operation, e.g. Enable or Install
    :param message: Message to encode
    :return: gzip'ed and base64 encoded message, or the original message
    """

    if len(message) == 0:
        return message

    if op not in SHOULD_ENCODE_MESSAGE_OP:
        return message

    try:
        return textutil.compress(message)
    except Exception:
        # If the message could not be encoded a dummy message ('<>') is returned.
        # The original message was still sent via telemetry, so all is not lost.
        return "<>"


def _log_event(name, op, message, duration, is_success=True):
    """
    Writes the event to the agent log (as an error if is_success is False, as info otherwise). The
    message is encoded with _encode_message() first.
    """
    message = _encode_message(op, message)
    if not is_success:
        logger.error(_EVENT_MSG, name, op, message, duration)
    else:
        logger.info(_EVENT_MSG, name, op, message, duration)


class CollectOrReportEventDebugInfo(object):
    """
    This class is used for capturing and reporting debug info that is captured during event collection and
    reporting to wireserver.
    It captures the count of unicode errors and any unexpected errors and also a subset of errors with stacks to help
    with debugging any potential issues.
    """
    __MAX_ERRORS_TO_REPORT = 5
    OP_REPORT = "Report"
    OP_COLLECT = "Collect"

    def __init__(self, operation=OP_REPORT):
        """
        :param operation: OP_REPORT or OP_COLLECT; selects the operation names used by report_debug_info()
        """
        self.__unicode_error_count = 0
        self.__unicode_errors = set()
        self.__op_error_count = 0
        self.__op_errors = set()

        # NOTE: for any other value of 'operation' the event names are left unset, and report_debug_info()
        # would fail with AttributeError.
        if operation == self.OP_REPORT:
            self.__unicode_error_event = WALAEventOperation.ReportEventUnicodeErrors
            self.__op_errors_event = WALAEventOperation.ReportEventErrors
        elif operation == self.OP_COLLECT:
            self.__unicode_error_event = WALAEventOperation.CollectEventUnicodeErrors
            self.__op_errors_event = WALAEventOperation.CollectEventErrors

    def report_debug_info(self):
        """
        Emits one failure event per kind of error (operation errors, unicode errors) that has a
        non-zero count.
        """

        def report_dropped_events_error(count, errors, operation_name):
            err_msg_format = "DroppedEventsCount: {0}\nReasons (first {1} errors): {2}"
            if count > 0:
                add_event(op=operation_name,
                          message=err_msg_format.format(count, CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT, ', '.join(errors)),
                          is_success=False)

        report_dropped_events_error(self.__op_error_count, self.__op_errors, self.__op_errors_event)
        report_dropped_events_error(self.__unicode_error_count, self.__unicode_errors, self.__unicode_error_event)

    @staticmethod
    def _update_errors_and_get_count(error_count, errors, error):
        """
        Returns error_count + 1; the error (and the current stack trace) is added to 'errors' only
        while that set holds fewer than __MAX_ERRORS_TO_REPORT items.
        """
        error_count += 1
        if len(errors) < CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT:
            errors.add("{0}: {1}".format(ustr(error), traceback.format_exc()))
        return error_count

    def update_unicode_error(self, unicode_err):
        """
        Records a unicode error.
        """
        self.__unicode_error_count = self._update_errors_and_get_count(self.__unicode_error_count, self.__unicode_errors,
                                                                       unicode_err)

    def update_op_error(self, op_err):
        """
        Records an unexpected (non-unicode) error.
        """
        self.__op_error_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)


class EventLogger(object):
    """
    Creates telemetry events (extension/agent events, log events and metrics), adds the common telemetry
    parameters to them and saves them as files under event_dir.
    """

    # Used by _clean_up_message(); compiled once here instead of on every call
    _LOG_LEVEL_FORMAT_PARSER = re.compile(r"^.*(INFO|WARNING|ERROR|VERBOSE)\s*(.*)$")
    _LOG_FORMAT_PARSER = re.compile(r"^[0-9:/\-TZ\s.]*\s(.*)$")

    def __init__(self):
        # Events are not saved until event_dir is set (see init_event_logger())
        self.event_dir = None
        # Maps the hash of a periodic event to the time it was last emitted (see add_periodic())
        self.periodic_events = {}

        #
        # All events should have these parameters.
        #
        # The first set comes from the current OS and is initialized here. These values don't change during
        # the agent's lifetime.
        #
        # The next two sets come from the goal state and IMDS and must be explicitly initialized using
        # initialize_vminfo_common_parameters() once a protocol for communication with the host has been
        # created. Their values don't change during the agent's lifetime. Note that we initialize these
        # parameters here using dummy values (*_UNINITIALIZED) since events sent to the host should always
        # match the schema defined for them in the telemetry pipeline.
        #
        # There is another set of common parameters that must be computed at the time the event is created
        # (e.g. the timestamp and the container ID); those are added to events (along with the parameters
        # below) in add_common_event_parameters()
        #
        # Note that different kinds of events may also include other parameters; those are added by the
        # corresponding add_* method (e.g. add_metric for performance metrics).
        #
        self._common_parameters = []

        # Parameters from OS
        osutil = get_osutil()
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.OSVersion, EventLogger._get_os_version()))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ExecutionMode, AGENT_EXECUTION_MODE))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RAM, int(EventLogger._get_ram(osutil))))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Processors, int(EventLogger._get_processors(osutil))))

        # Parameters from goal state
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.TenantName, "TenantName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleName, "RoleName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleInstanceName, "RoleInstanceName_UNINITIALIZED"))

        # Parameters from IMDS
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Location, "Location_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.SubscriptionId, "SubscriptionId_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ResourceGroupName, "ResourceGroupName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.VMId, "VMId_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ImageOrigin, 0))

    @staticmethod
    def _get_os_version():
        """
        Returns the value of the OSVersion parameter: "<system>:<distro>-<version>-<code name>:<release>"
        """
        return "{0}:{1}-{2}-{3}:{4}".format(platform.system(), DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, platform.release())

    @staticmethod
    def _get_ram(osutil):
        """
        Returns osutil.get_total_mem(), or 0 (after logging a warning) if that raises OSUtilError.
        """
        try:
            return osutil.get_total_mem()
        except OSUtilError as e:
            logger.warn("Failed to get RAM info; will be missing from telemetry: {0}", ustr(e))
        return 0

    @staticmethod
    def _get_processors(osutil):
        """
        Returns osutil.get_processor_cores(), or 0 (after logging a warning) if that raises OSUtilError.
        """
        try:
            return osutil.get_processor_cores()
        except OSUtilError as e:
            logger.warn("Failed to get Processors info; will be missing from telemetry: {0}", ustr(e))
        return 0

    def initialize_vminfo_common_parameters(self, protocol):
        """
        Initializes the common parameters that come from the goal state and IMDS
        """
        # create an index of the event parameters for faster updates
        parameters = {}
        for p in self._common_parameters:
            parameters[p.name] = p

        # The two sources are independent: a failure in one only logs a warning and leaves the
        # corresponding parameters with whatever values they had.
        try:
            vminfo = protocol.get_vminfo()
            parameters[CommonTelemetryEventSchema.TenantName].value = vminfo.tenantName
            parameters[CommonTelemetryEventSchema.RoleName].value = vminfo.roleName
            parameters[CommonTelemetryEventSchema.RoleInstanceName].value = vminfo.roleInstanceName
        except Exception as e:
            logger.warn("Failed to get VM info from goal state; will be missing from telemetry: {0}", ustr(e))

        try:
            imds_client = get_imds_client(protocol.get_endpoint())
            imds_info = imds_client.get_compute()
            parameters[CommonTelemetryEventSchema.Location].value = imds_info.location
            parameters[CommonTelemetryEventSchema.SubscriptionId].value = imds_info.subscriptionId
            parameters[CommonTelemetryEventSchema.ResourceGroupName].value = imds_info.resourceGroupName
            parameters[CommonTelemetryEventSchema.VMId].value = imds_info.vmId
            parameters[CommonTelemetryEventSchema.ImageOrigin].value = int(imds_info.image_origin)
        except Exception as e:
            logger.warn("Failed to get IMDS info; will be missing from telemetry: {0}", ustr(e))

    def save_event(self, data):
        """
        Writes the given event to a new file under event_dir. If the directory already contains
        MAX_NUMBER_OF_EVENTS files or more, the oldest ones (by sorted file name) are removed first.

        :param data: The event, as text; it is written UTF-8 encoded
        :raises EventError: if the directory cannot be created, old events cannot be removed or the
                            file cannot be written
        """
        if self.event_dir is None:
            logger.warn("Cannot save event -- Event reporter is not initialized.")
            return

        try:
            fileutil.mkdir(self.event_dir, mode=0o700)
        except (IOError, OSError) as e:
            msg = "Failed to create events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
            raise EventError(msg)

        try:
            existing_events = os.listdir(self.event_dir)
            if len(existing_events) >= MAX_NUMBER_OF_EVENTS:
                logger.periodic_warn(logger.EVERY_MINUTE, "[PERIODIC] Too many files under: {0}, current count: {1}, "
                                                          "removing oldest event files".format(self.event_dir,
                                                                                               len(existing_events)))
                existing_events.sort()
                # Keep the newest MAX_NUMBER_OF_EVENTS - 1 files, so that the directory is back at the
                # limit once the new event is written.
                excess = len(existing_events) - (MAX_NUMBER_OF_EVENTS - 1)
                oldest_files = existing_events[:excess]
                for event_file in oldest_files:
                    os.remove(os.path.join(self.event_dir, event_file))
        except (IOError, OSError) as e:
            msg = "Failed to remove old events from events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
            raise EventError(msg)

        # The file name is the current time in microseconds
        filename = os.path.join(self.event_dir,
                                ustr(int(time.time() * 1000000)))
        try:
            # Write to a temporary file and rename it, so that the final file name only appears once
            # the contents have been written.
            with open(filename + ".tmp", 'wb+') as hfile:
                hfile.write(data.encode("utf-8"))
            os.rename(filename + ".tmp", filename + AGENT_EVENT_FILE_EXTENSION)
        except (IOError, OSError) as e:
            msg = "Failed to write events to file: {0}".format(e)
            raise EventError(msg)

    def reset_periodic(self):
        """
        Forgets when the periodic events were last emitted.
        """
        self.periodic_events = {}

    def is_period_elapsed(self, delta, h):
        """
        Returns True if the periodic event with hash h has never been emitted, or was last emitted at
        least 'delta' (a timedelta) ago.
        """
        return h not in self.periodic_events or \
            (self.periodic_events[h] + delta) <= datetime.now()

    def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0,
                     version=str(CURRENT_VERSION), message="", log_event=True, force=False):
        """
        Same as add_event(), but the event is emitted only if 'delta' has elapsed since an event with the
        same name, op, is_success and message was last emitted, or if 'force' is True.
        """
        h = hash(name + op + ustr(is_success) + message)

        if force or self.is_period_elapsed(delta, h):
            self.add_event(name, op=op, is_success=is_success, duration=duration,
                           version=version, message=message, log_event=log_event)
            self.periodic_events[h] = datetime.now()

    def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
                  message="", log_event=True):
        """
        Creates an event with the given values (plus the common parameters) and saves it. Failures to
        save the event are logged (periodically) and not propagated.

        :param log_event: If True, failed events (is_success False) are also written to the agent log
        """

        if (not is_success) and log_event:
            _log_event(name, op, message, duration, is_success=is_success)

        event = TelemetryEvent(TELEMETRY_EVENT_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Name, str(name)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Version, str(version)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Operation, str(op)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.OperationSuccess, bool(is_success)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Message, str(message)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
        self.add_common_event_parameters(event, datetime.utcnow())

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError as e:
            logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))

    def add_log_event(self, level, message):
        """
        Creates a log event for the given message (plus the common parameters) and saves it. The
        timestamp/level prefix of the message is removed, see _clean_up_message().

        :param level: One of the logger.LogLevel values; used as index into logger.LogLevel.STRINGS
        :param message: The log message
        """
        event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level]))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context1, self._clean_up_message(message)))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context2, datetime.utcnow().strftime(logger.Logger.LogTimeFormatInUTC)))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
        self.add_common_event_parameters(event, datetime.utcnow())

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError:
            # Deliberately ignored, unlike in add_event/add_metric.
            # NOTE(review): presumably because logging the failure here could itself produce another
            # log event -- confirm before changing.
            pass

    def add_metric(self, category, counter, instance, value, log_event=False):
        """
        Create and save an event which contains a telemetry event.

        :param str category: The category of metric (e.g. "cpu", "memory")
        :param str counter: The specific metric within the category (e.g. "%idle")
        :param str instance: For instanced metrics, the instance identifier (filesystem name, cpu core#, etc.)
        :param value: Value of the metric
        :param bool log_event: If true, log the collected metric in the agent log
        """
        if log_event:
            message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
            _log_event(AGENT_NAME, "METRIC", message, 0)

        event = TelemetryEvent(TELEMETRY_METRICS_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Category, str(category)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Counter, str(counter)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Instance, str(instance)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
        self.add_common_event_parameters(event, datetime.utcnow())

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError as e:
            logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))

    @staticmethod
    def _clean_up_message(message):
        """
        Returns the message without its timestamp and level prefix; empty messages and messages that do
        not match the expected formats are returned unchanged.
        """
        # By the time the message has gotten to this point it is formatted as
        #
        #   Old Time format
        #     YYYY/MM/DD HH:mm:ss.fffffff LEVEL <text>.
        #     YYYY/MM/DD HH:mm:ss.fffffff <text>.
        #     YYYY/MM/DD HH:mm:ss LEVEL <text>.
        #     YYYY/MM/DD HH:mm:ss <text>.
        #
        #   UTC ISO Time format added in #1716
        #     YYYY-MM-DDTHH:mm:ss.fffffffZ LEVEL <text>.
        #     YYYY-MM-DDTHH:mm:ss.fffffffZ <text>.
        #     YYYY-MM-DDTHH:mm:ssZ LEVEL <text>.
        #     YYYY-MM-DDTHH:mm:ssZ <text>.
        #
        # The timestamp and the level are redundant, and should be stripped. The logging library does not schematize
        # this data, so I am forced to parse the message using a regex.  The format is regular, so the burden is low,
        # and usability on the telemetry side is high.

        if not message:
            return message

        # Two regexes are used to simplify the handling of logs and to keep it maintainable. Most of the logs have
        # the level included in the log itself; for those that don't, the second regex is a catch-all case.
        #
        # NOTE: the first regex is greedy, so if the text itself contains one of the level names, everything up to
        # (and including) its last occurrence is stripped.

        # Parsing the log messages containing levels in it
        extract_level_message = EventLogger._LOG_LEVEL_FORMAT_PARSER.search(message)
        if extract_level_message:
            return extract_level_message.group(2)  # The message bit

        # Parsing the log messages without levels in it.
        extract_message = EventLogger._LOG_FORMAT_PARSER.search(message)
        if extract_message:
            return extract_message.group(1)  # The message bit

        return message

    def add_common_event_parameters(self, event, event_timestamp):
        """
        This method is called for all events and ensures all telemetry fields are added before the event is sent out.
        Note that the event timestamp is saved in the OpcodeName field.
        """
        common_params = [TelemetryEventParam(CommonTelemetryEventSchema.GAVersion, CURRENT_AGENT),
                         TelemetryEventParam(CommonTelemetryEventSchema.ContainerId, AgentGlobals.get_container_id()),
                         TelemetryEventParam(CommonTelemetryEventSchema.OpcodeName, event_timestamp.strftime(logger.Logger.LogTimeFormatInUTC)),
                         TelemetryEventParam(CommonTelemetryEventSchema.EventTid, threading.current_thread().ident),
                         TelemetryEventParam(CommonTelemetryEventSchema.EventPid, os.getpid()),
                         TelemetryEventParam(CommonTelemetryEventSchema.TaskName, threading.current_thread().getName()),
                         TelemetryEventParam(CommonTelemetryEventSchema.KeywordName, '')]

        if event.eventId == TELEMETRY_EVENT_EVENT_ID and event.providerId == TELEMETRY_EVENT_PROVIDER_ID:
            # Currently only the GuestAgentExtensionEvents has these columns, the other tables dont have them so skipping
            # this data in those tables.
            common_params.extend([TelemetryEventParam(GuestAgentExtensionEventsSchema.ExtensionType, event.file_type),
                                  TelemetryEventParam(GuestAgentExtensionEventsSchema.IsInternal, False)])

        event.parameters.extend(common_params)
        event.parameters.extend(self._common_parameters)


__event_logger__ = EventLogger()


def get_event_logger():
    """
    Returns the module-level EventLogger instance.
    """
    return __event_logger__


def elapsed_milliseconds(utc_start):
    """
    Returns the number of milliseconds (as an int) between utc_start and the current UTC time, or 0 if
    utc_start is in the future.

    :param utc_start: A naive datetime in UTC (it is compared against datetime.utcnow())
    """
    now = datetime.utcnow()
    if now < utc_start:
        return 0

    d = now - utc_start
    # Computed by hand because timedelta.total_seconds() is not available in Python 2.6
    return int(((d.days * 24 * 60 * 60 + d.seconds) * 1000) + \
                    (d.microseconds / 1000.0))


def report_event(op, is_success=True, message='', log_event=True):
    """
    Adds an event for the agent itself (name AGENT_NAME, version CURRENT_VERSION); see add_event().
    """
    add_event(AGENT_NAME,
              version=str(CURRENT_VERSION),
              is_success=is_success,
              message=message,
              op=op,
              log_event=log_event)


def report_periodic(delta, op, is_success=True, message=''):
    """
    Adds a periodic event for the agent itself (name AGENT_NAME, version CURRENT_VERSION); see
    add_periodic().
    """
    add_periodic(delta, AGENT_NAME,
                 version=str(CURRENT_VERSION),
                 is_success=is_success,
                 message=message,
                 op=op)


def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__):
    """
    Send a telemetry event reporting a single instance of a performance counter.
    :param str category: The category of the metric (cpu, memory, etc)
    :param str counter: The name of the metric ("%idle", etc)
    :param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core#
    :param value: The value of the metric
    :param bool log_event: If True, log the metric in the agent log as well
    :param EventLogger reporter: The EventLogger instance to which metric events should be sent
    """
    if reporter.event_dir is None:
        # The metric cannot be saved as an event; write it to the agent log instead
        logger.warn("Cannot report metric event -- Event reporter is not initialized.")
        message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
        _log_event(AGENT_NAME, "METRIC", message, 0)
        return
    try:
        reporter.add_metric(category, counter, instance, float(value), log_event)
    except ValueError:
        logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
                                                     "{0}/{1} [{2}] = {3}".format(category, counter, instance, value))


def initialize_event_logger_vminfo_common_parameters(protocol, reporter=__event_logger__):
    """
    Initializes the reporter's common parameters that come from the goal state and IMDS.
    """
    reporter.initialize_vminfo_common_parameters(protocol)


def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
              message="", log_event=True, reporter=__event_logger__):
    """
    Adds an event using the given reporter. If the reporter has not been initialized (no event_dir) the
    event is only written to the agent log. Events for the operations tracked in
    __event_status_operations__ are emitted only when should_emit_event() returns True.
    """
    if reporter.event_dir is None:
        logger.warn("Cannot add event -- Event reporter is not initialized.")
        _log_event(name, op, message, duration, is_success=is_success)
        return

    if should_emit_event(name, version, op, is_success):
        mark_event_status(name, version, op, is_success)
        reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
                           message=message,
                           log_event=log_event)


def add_log_event(level, message, forced=False, reporter=__event_logger__):
    """
    Adds a log event using the given reporter; only messages of level WARNING or above are added.

    :param level: LoggerLevel of the log event
    :param message: Message
    :param forced: Force write the event even if send_logs_to_telemetry() is disabled
                   (NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
    :param reporter: The EventLogger instance to which the event should be sent
    :return: None
    """
    if reporter.event_dir is None:
        return

    if not (forced or send_logs_to_telemetry()):
        return

    if level >= logger.LogLevel.WARNING:
        reporter.add_log_event(level, message)


def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
                 message="", log_event=True, force=False, reporter=__event_logger__):
    """
    Adds a periodic event using the given reporter (see EventLogger.add_periodic). If the reporter has
    not been initialized (no event_dir) the event is only written to the agent log.
    """
    if reporter.event_dir is None:
        logger.warn("Cannot add periodic event -- Event reporter is not initialized.")
        _log_event(name, op, message, duration, is_success=is_success)
        return

    reporter.add_periodic(delta, name, op=op, is_success=is_success, duration=duration, version=str(version),
                          message=message, log_event=log_event, force=force)


def mark_event_status(name, version, op, status):
    """
    Records the status of the event, but only for the operations in __event_status_operations__.
    """
    if op in __event_status_operations__:
        __event_status__.mark_event_status(name, version, op, status)


def should_emit_event(name, version, op, status):
    """
    Returns False only for events whose operation is tracked in __event_status_operations__ and whose
    recorded status is the same as 'status'; returns True in any other case.
    """
    return \
        op not in __event_status_operations__ or \
        __event_status__ is None or \
        not __event_status__.event_marked(name, version, op) or \
        __event_status__.event_succeeded(name, version, op) != status


def init_event_logger(event_dir):
    """
    Sets the directory where the module-level EventLogger saves events.
    """
    __event_logger__.event_dir = event_dir


def init_event_status(status_dir):
    """
    Initializes the module-level EventStatus from the status file under status_dir.
    """
    __event_status__.initialize(status_dir)


def dump_unhandled_err(name):
    """
    If sys.last_type, sys.last_value and sys.last_traceback are all set, adds a failure event (operation
    UnhandledError) whose message is the formatted exception.
    """
    if hasattr(sys, 'last_type') and hasattr(sys, 'last_value') and \
            hasattr(sys, 'last_traceback'):
        last_type = getattr(sys, 'last_type')
        last_value = getattr(sys, 'last_value')
        last_traceback = getattr(sys, 'last_traceback')
        error = traceback.format_exception(last_type, last_value,
                                           last_traceback)
        message = "".join(error)
        add_event(name, is_success=False, message=message,
                  op=WALAEventOperation.UnhandledError)


def enable_unhandled_err_dump(name):
    """
    Registers dump_unhandled_err(name) to be executed when the interpreter exits.
    """
    atexit.register(dump_unhandled_err, name)