1# Copyright 2018 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# Requires Python 2.6+ and Openssl 1.0+
16#
17
18import atexit
19import json
20import os
21import platform
22import re
23import sys
24import threading
25import time
26import traceback
27from datetime import datetime
28
29import azurelinuxagent.common.conf as conf
30import azurelinuxagent.common.logger as logger
31from azurelinuxagent.common.AgentGlobals import AgentGlobals
32from azurelinuxagent.common.exception import EventError, OSUtilError
33from azurelinuxagent.common.future import ustr
34from azurelinuxagent.common.datacontract import get_properties, set_properties
35from azurelinuxagent.common.osutil import get_osutil
36from azurelinuxagent.common.telemetryevent import TelemetryEventParam, TelemetryEvent, CommonTelemetryEventSchema, \
37    GuestAgentGenericLogsSchema, GuestAgentExtensionEventsSchema, GuestAgentPerfCounterEventsSchema
38from azurelinuxagent.common.utils import fileutil, textutil
39from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib
40from azurelinuxagent.common.version import CURRENT_VERSION, CURRENT_AGENT, AGENT_NAME, DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, AGENT_EXECUTION_MODE
41from azurelinuxagent.common.protocol.imds import get_imds_client
42
# NOTE(review): not referenced in this part of the file; presumably the name of the directory
# where event files are stored -- confirm against callers.
EVENTS_DIRECTORY = "events"

# Format string used by _log_event() when an event is written to the agent log.
_EVENT_MSG = "Event: name={0}, op={1}, message={2}, duration={3}"
# Provider id and event ids used for the TelemetryEvent instances created by
# EventLogger.add_event() (TELEMETRY_EVENT_EVENT_ID) and EventLogger.add_metric()
# (TELEMETRY_METRICS_EVENT_ID).
TELEMETRY_EVENT_PROVIDER_ID = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"
TELEMETRY_EVENT_EVENT_ID = 1
TELEMETRY_METRICS_EVENT_ID = 4

# Provider id and event id used for the TelemetryEvent instances created by EventLogger.add_log_event().
TELEMETRY_LOG_PROVIDER_ID = "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F"
TELEMETRY_LOG_EVENT_ID = 7

#
# When this flag is enabled the TODO comment in Logger.log() needs to be addressed; also the tests
# marked with "Enable this test when SEND_LOGS_TO_TELEMETRY is enabled" should be enabled.
#
SEND_LOGS_TO_TELEMETRY = False

# Once the events directory holds this many files, EventLogger.save_event() removes the oldest ones
# before writing a new event.
MAX_NUMBER_OF_EVENTS = 1000

# Extension given to the event files written by EventLogger.save_event().
AGENT_EVENT_FILE_EXTENSION = '.waagent.tld'
# Matches file names ending in ".tld"; the optional named group "agent_event" captures a preceding
# ".waagent", i.e. it is set only for files that carry AGENT_EVENT_FILE_EXTENSION.
EVENT_FILE_REGEX = re.compile(r'(?P<agent_event>\.waagent)?\.tld$')
63
def send_logs_to_telemetry():
    """
    Returns the current value of the module-level SEND_LOGS_TO_TELEMETRY flag.
    """
    return SEND_LOGS_TO_TELEMETRY
66
67
class WALAEventOperation:  # pylint: disable=no-init
    """
    Names of the operations that can be reported in an event (the 'op' argument of add_event(),
    add_periodic(), report_event() and report_periodic()).

    Each attribute is a plain string whose value is equal to the attribute's name.
    """
    ActivateResourceDisk = "ActivateResourceDisk"
    AgentBlacklisted = "AgentBlacklisted"
    AgentEnabled = "AgentEnabled"
    ArtifactsProfileBlob = "ArtifactsProfileBlob"
    CGroupsCleanUp = "CGroupsCleanUp"
    CGroupsDebug = "CGroupsDebug"
    CGroupsDisabled = "CGroupsDisabled"
    CGroupsInfo = "CGroupsInfo"
    CGroupsInitialize = "CGroupsInitialize"
    CGroupsLimitsCrossed = "CGroupsLimitsCrossed"
    CollectEventErrors = "CollectEventErrors"
    CollectEventUnicodeErrors = "CollectEventUnicodeErrors"
    ConfigurationChange = "ConfigurationChange"
    CustomData = "CustomData"
    DefaultChannelChange = "DefaultChannelChange"
    Deploy = "Deploy"
    Disable = "Disable"
    Downgrade = "Downgrade"
    Download = "Download"
    Enable = "Enable"
    ExtensionProcessing = "ExtensionProcessing"
    ExtensionTelemetryEventProcessing = "ExtensionTelemetryEventProcessing"
    FetchGoalState = "FetchGoalState"
    Firewall = "Firewall"
    HealthCheck = "HealthCheck"
    HealthObservation = "HealthObservation"
    HeartBeat = "HeartBeat"
    HostPlugin = "HostPlugin"
    HostPluginHeartbeat = "HostPluginHeartbeat"
    HostPluginHeartbeatExtended = "HostPluginHeartbeatExtended"
    HttpErrors = "HttpErrors"
    ImdsHeartbeat = "ImdsHeartbeat"
    Install = "Install"
    InitializeHostPlugin = "InitializeHostPlugin"
    Log = "Log"
    LogCollection = "LogCollection"
    OSInfo = "OSInfo"
    Partition = "Partition"
    PersistFirewallRules = "PersistFirewallRules"
    PluginSettingsVersionMismatch = "PluginSettingsVersionMismatch"
    InvalidExtensionConfig = "InvalidExtensionConfig"
    ProcessGoalState = "ProcessGoalState"
    Provision = "Provision"
    ProvisionGuestAgent = "ProvisionGuestAgent"
    RemoteAccessHandling = "RemoteAccessHandling"
    ReportEventErrors = "ReportEventErrors"
    ReportEventUnicodeErrors = "ReportEventUnicodeErrors"
    ReportStatus = "ReportStatus"
    ReportStatusExtended = "ReportStatusExtended"
    Restart = "Restart"
    SequenceNumberMismatch = "SequenceNumberMismatch"
    SetCGroupsLimits = "SetCGroupsLimits"
    SkipUpdate = "SkipUpdate"
    StatusProcessing = "StatusProcessing"
    UnhandledError = "UnhandledError"
    UnInstall = "UnInstall"
    Unknown = "Unknown"
    Upgrade = "Upgrade"
    Update = "Update"
128
129
# NOTE(review): not referenced in this part of the file; presumably a message-length threshold
# related to encoding -- confirm before relying on it.
SHOULD_ENCODE_MESSAGE_LEN = 80
# Operations whose messages are compressed by _encode_message() before being written to the agent log.
SHOULD_ENCODE_MESSAGE_OP = [
    WALAEventOperation.Disable,
    WALAEventOperation.Enable,
    WALAEventOperation.Install,
    WALAEventOperation.UnInstall,
]
137
138
class EventStatus(object):
    """
    Keeps track of whether the last event reported for a given (name, version, operation) was a
    success or a failure.

    The status is kept in memory as a dictionary mapping "<name>-<version>-<op>" to a boolean and
    is written to a JSON file (EVENT_STATUS_FILE, under the directory given to initialize()) every
    time it changes.
    """
    EVENT_STATUS_FILE = "event_status.json"

    def __init__(self):
        # Full path of the status file; None until initialize() is called
        self._path = None
        self._status = {}

    def clear(self):
        """
        Forgets all the recorded statuses and saves the (now empty) status file.
        """
        self._status = {}
        self._save()

    def event_marked(self, name, version, op):
        """
        Returns True if a status has been recorded for the given event.
        """
        return self._event_name(name, version, op) in self._status

    def event_succeeded(self, name, version, op):
        """
        Returns True if the status recorded for the given event is success; events with no recorded
        status are considered successful.
        """
        return self._status.get(self._event_name(name, version, op), True) is True

    def initialize(self, status_dir=None):
        """
        Sets the location of the status file and loads any status previously saved there.

        :param status_dir: Directory containing the status file; when None, conf.get_lib_dir() is used.
        """
        # The default is resolved at call time. Using conf.get_lib_dir() as the default value of the
        # parameter would evaluate it only once, when this module is imported.
        if status_dir is None:
            status_dir = conf.get_lib_dir()
        self._path = os.path.join(status_dir, EventStatus.EVENT_STATUS_FILE)
        self._load()

    def mark_event_status(self, name, version, op, status):
        """
        Records the status of the given event (only the value True counts as success) and saves
        the status file.
        """
        event = self._event_name(name, version, op)
        self._status[event] = (status is True)
        self._save()

    def _event_name(self, name, version, op):
        """
        Returns the key used to identify the event in the status dictionary.
        """
        return "{0}-{1}-{2}".format(name, version, op)

    def _load(self):
        """
        Loads the status file, if it exists. Any error (including a file that does not contain a
        JSON object) is logged and leaves the status empty.
        """
        self._status = {}
        try:
            if os.path.isfile(self._path):
                with open(self._path, 'r') as f:
                    status = json.load(f)
                # The rest of the class indexes the status by event name, so anything other than
                # a dictionary (e.g. a corrupted file containing a list) must be rejected here.
                if not isinstance(status, dict):
                    raise ValueError("expected a JSON object but found {0}".format(type(status).__name__))
                self._status = status
        except Exception as e:
            logger.warn("Exception occurred loading event status: {0}".format(e))
            self._status = {}

    def _save(self):
        """
        Writes the status to the status file; errors are logged and otherwise ignored.
        """
        try:
            with open(self._path, 'w') as f:
                json.dump(self._status, f)
        except Exception as e:
            logger.warn("Exception occurred saving event status: {0}".format(e))
187
188
# Module-level EventStatus instance used by mark_event_status() and should_emit_event().
__event_status__ = EventStatus()
# Operations whose status is recorded in __event_status__; should_emit_event() always returns True
# for operations that are not in this list.
__event_status_operations__ = [
        WALAEventOperation.ReportStatus
    ]
193
194
def parse_json_event(data_str):
    """
    Creates a TelemetryEvent from its JSON representation.

    :param data_str: JSON text describing the event
    :return: a TelemetryEvent populated from the JSON data, with file_type set to "json"
    """
    properties = json.loads(data_str)
    telemetry_event = TelemetryEvent()
    set_properties("TelemetryEvent", telemetry_event, properties)
    telemetry_event.file_type = "json"
    return telemetry_event
201
202
def parse_event(data_str):
    """
    Parses an event given either as JSON or as XML.

    The data is first parsed as JSON; if that fails with a ValueError it is parsed as XML.

    :param data_str: Text of the event
    :return: the event returned by parse_json_event() or parse_xml_event()
    :raises EventError: if the data cannot be parsed
    """
    try:
        try:
            parsed = parse_json_event(data_str)
        except ValueError:
            # Not valid JSON; fall back to the XML format
            parsed = parse_xml_event(data_str)
    except Exception as error:
        raise EventError("Error parsing event: {0}".format(ustr(error)))
    return parsed
211
212
def parse_xml_param(param_node):
    """
    Creates a TelemetryEventParam from a 'Param' node of an XML event.

    The node's "Name" attribute is used as the name of the parameter and its "Value" attribute is
    converted according to the "T" (type) attribute:

        mt:uint64  -> int
        mt:bool    -> bool ("true" or "1", ignoring case and surrounding whitespace, is True)
        mt:float64 -> float
        otherwise  -> the value is kept as returned by getattrib()

    :param param_node: XML node for the parameter
    :return: the TelemetryEventParam for the node
    """
    name = getattrib(param_node, "Name")
    value_str = getattrib(param_node, "Value")
    attr_type = getattrib(param_node, "T")

    if attr_type == 'mt:uint64':
        value = int(value_str)
    elif attr_type == 'mt:bool':
        # bool() cannot be used to parse the text: it is True for any non-empty string,
        # including "False" and "0".
        value = value_str is not None and ustr(value_str).strip().lower() in ('true', '1')
    elif attr_type == 'mt:float64':
        value = float(value_str)
    else:
        value = value_str
    return TelemetryEventParam(name, value)
225
226
def parse_xml_event(data_str):
    """
    Creates a TelemetryEvent from its XML representation.

    The event id and provider id are taken from the 'id' attribute of the "Event" and "Provider"
    nodes, and one parameter is added for each 'Param' node.

    :param data_str: XML text describing the event
    :return: the TelemetryEvent, with file_type set to "xml"
    :raises ValueError: on any error while parsing the XML
    """
    try:
        document = parse_doc(data_str)
        event_id = getattrib(find(document, "Event"), 'id')
        provider_id = getattrib(find(document, "Provider"), 'id')
        telemetry_event = TelemetryEvent(event_id, provider_id)
        for node in findall(document, 'Param'):
            telemetry_event.parameters.append(parse_xml_param(node))
        telemetry_event.file_type = "xml"
    except Exception as error:
        # All failures are reported as ValueError
        raise ValueError(ustr(error))
    return telemetry_event
240
241
242def _encode_message(op, message):
243    """
244    Gzip and base64 encode a message based on the operation.
245
246    The intent of this message is to make the logs human readable and include the
247    stdout/stderr from extension operations.  Extension operations tend to generate
248    a lot of noise, which makes it difficult to parse the line-oriented waagent.log.
249    The compromise is to encode the stdout/stderr so we preserve the data and do
250    not destroy the line oriented nature.
251
252    The data can be recovered using the following command:
253
254      $ echo '<encoded data>' | base64 -d | pigz -zd
255
256    You may need to install the pigz command.
257
258    :param op: Operation, e.g. Enable or Install
259    :param message: Message to encode
260    :return: gzip'ed and base64 encoded message, or the original message
261    """
262
263    if len(message) == 0:
264        return message
265
266    if op not in SHOULD_ENCODE_MESSAGE_OP:
267        return message
268
269    try:
270        return textutil.compress(message)
271    except Exception:
272        # If the message could not be encoded a dummy message ('<>') is returned.
273        # The original message was still sent via telemetry, so all is not lost.
274        return "<>"
275
276
def _log_event(name, op, message, duration, is_success=True):
    """
    Writes the event to the agent log: as an info message when is_success is true, as an error
    otherwise. The message is passed through _encode_message() first.
    """
    log = logger.info if is_success else logger.error
    log(_EVENT_MSG, name, op, _encode_message(op, message), duration)
285
286
class CollectOrReportEventDebugInfo(object):
    """
    Accumulates debug information about the errors that happen while events are collected or
    reported to wireserver, and reports it as telemetry.

    Two kinds of errors are tracked separately: unicode errors and any other (unexpected) errors.
    For each kind the class keeps the total count plus the text and stack trace of a small number
    of distinct errors to help with debugging.
    """
    __MAX_ERRORS_TO_REPORT = 5
    OP_REPORT = "Report"
    OP_COLLECT = "Collect"

    def __init__(self, operation=OP_REPORT):
        """
        :param operation: OP_REPORT or OP_COLLECT; selects the event operations used by report_debug_info()
        """
        self.__op_error_count = 0
        self.__op_errors = set()
        self.__unicode_error_count = 0
        self.__unicode_errors = set()

        if operation == self.OP_REPORT:
            self.__op_errors_event = WALAEventOperation.ReportEventErrors
            self.__unicode_error_event = WALAEventOperation.ReportEventUnicodeErrors
        elif operation == self.OP_COLLECT:
            self.__op_errors_event = WALAEventOperation.CollectEventErrors
            self.__unicode_error_event = WALAEventOperation.CollectEventUnicodeErrors

    def report_debug_info(self):
        """
        Adds one (failed) event for each kind of error that occurred at least once; the message
        includes the number of errors and the captured error details.
        """
        pending = (
            (self.__op_error_count, self.__op_errors, self.__op_errors_event),
            (self.__unicode_error_count, self.__unicode_errors, self.__unicode_error_event),
        )
        for dropped_count, reasons, operation_name in pending:
            if dropped_count > 0:
                message = "DroppedEventsCount: {0}\nReasons (first {1} errors): {2}".format(
                    dropped_count, CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT, ', '.join(reasons))
                add_event(op=operation_name, message=message, is_success=False)

    @staticmethod
    def _update_errors_and_get_count(error_count, errors, error):
        """
        Adds the error (with the stack trace of the exception being handled) to 'errors', unless
        the set already holds the maximum number of errors, and returns the incremented count.
        """
        if len(errors) < CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT:
            details = "{0}: {1}".format(ustr(error), traceback.format_exc())
            errors.add(details)
        return error_count + 1

    def update_unicode_error(self, unicode_err):
        """
        Records a unicode error.
        """
        updated_count = self._update_errors_and_get_count(self.__unicode_error_count, self.__unicode_errors,
                                                          unicode_err)
        self.__unicode_error_count = updated_count

    def update_op_error(self, op_err):
        """
        Records an unexpected (non-unicode) error.
        """
        updated_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)
        self.__op_error_count = updated_count
336
337
class EventLogger(object):
    """
    Creates telemetry events (agent/extension events, log events and performance metrics), adds
    the common telemetry parameters to them and saves each event as a JSON file under event_dir.

    event_dir is None until it is set by the owner of the instance (see init_event_logger());
    save_event() only logs a warning while it is None.
    """

    # Regular expressions used by _clean_up_message(), compiled once rather than on every call.
    #
    # The first one matches messages that start with an (optional) timestamp followed by the log
    # level. The level must come right after the timestamp and be a whole word, so that a level
    # name appearing later in the text of the message (or a word that merely starts with a level
    # name) is not mistaken for the level and the text before it is not discarded.
    #
    # The second one is the catch-all for messages that include the timestamp but not the level.
    _LOG_LEVEL_FORMAT_PARSER = re.compile(r"^[0-9:/\-TZ\s.]*(INFO|WARNING|ERROR|VERBOSE)\b\s*(.*)$")
    _LOG_FORMAT_PARSER = re.compile(r"^[0-9:/\-TZ\s.]*\s(.*)$")

    def __init__(self):
        self.event_dir = None
        # Maps the hash computed by add_periodic() to the time the corresponding event was last added
        self.periodic_events = {}

        #
        # All events should have these parameters.
        #
        # The first set comes from the current OS and is initialized here. These values don't change during
        # the agent's lifetime.
        #
        # The next two sets come from the goal state and IMDS and must be explicitly initialized using
        # initialize_vminfo_common_parameters() once a protocol for communication with the host has been
        # created. Their values  don't change during the agent's lifetime. Note that we initialize these
        # parameters here using dummy values (*_UNINITIALIZED) since events sent to the host should always
        # match the schema defined for them in the telemetry pipeline.
        #
        # There is another set of common parameters that must be computed at the time the event is created
        # (e.g. the timestamp and the container ID); those are added to events (along with the parameters
        # below) in add_common_event_parameters()
        #
        # Note that different kinds of events may also include other parameters; those are added by the
        # corresponding add_* method (e.g. add_metric for performance metrics).
        #
        self._common_parameters = []

        # Parameters from OS
        osutil = get_osutil()
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.OSVersion, EventLogger._get_os_version()))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ExecutionMode, AGENT_EXECUTION_MODE))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RAM, int(EventLogger._get_ram(osutil))))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Processors, int(EventLogger._get_processors(osutil))))

        # Parameters from goal state
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.TenantName, "TenantName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleName, "RoleName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleInstanceName, "RoleInstanceName_UNINITIALIZED"))

        # Parameters from IMDS
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Location, "Location_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.SubscriptionId, "SubscriptionId_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ResourceGroupName, "ResourceGroupName_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.VMId, "VMId_UNINITIALIZED"))
        self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ImageOrigin, 0))

    @staticmethod
    def _get_os_version():
        """
        Returns the OS version as "<system>:<distro name>-<distro version>-<distro code name>:<release>"
        """
        return "{0}:{1}-{2}-{3}:{4}".format(platform.system(), DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, platform.release())

    @staticmethod
    def _get_ram(osutil):
        """
        Returns osutil.get_total_mem(), or 0 (after logging a warning) if it raises OSUtilError
        """
        try:
            return osutil.get_total_mem()
        except OSUtilError as e:
            logger.warn("Failed to get RAM info; will be missing from telemetry: {0}", ustr(e))
        return 0

    @staticmethod
    def _get_processors(osutil):
        """
        Returns osutil.get_processor_cores(), or 0 (after logging a warning) if it raises OSUtilError
        """
        try:
            return osutil.get_processor_cores()
        except OSUtilError as e:
            logger.warn("Failed to get Processors info; will be missing from telemetry: {0}", ustr(e))
        return 0

    def initialize_vminfo_common_parameters(self, protocol):
        """
        Initializes the common parameters that come from the goal state and IMDS

        Failures to retrieve either set of values are logged as warnings and leave the
        corresponding parameters with their current values.
        """
        # create an index of the event parameters for faster updates
        parameters = {}
        for p in self._common_parameters:
            parameters[p.name] = p

        try:
            vminfo = protocol.get_vminfo()
            parameters[CommonTelemetryEventSchema.TenantName].value = vminfo.tenantName
            parameters[CommonTelemetryEventSchema.RoleName].value = vminfo.roleName
            parameters[CommonTelemetryEventSchema.RoleInstanceName].value = vminfo.roleInstanceName
        except Exception as e:
            logger.warn("Failed to get VM info from goal state; will be missing from telemetry: {0}", ustr(e))

        try:
            imds_client = get_imds_client(protocol.get_endpoint())
            imds_info = imds_client.get_compute()
            parameters[CommonTelemetryEventSchema.Location].value = imds_info.location
            parameters[CommonTelemetryEventSchema.SubscriptionId].value = imds_info.subscriptionId
            parameters[CommonTelemetryEventSchema.ResourceGroupName].value = imds_info.resourceGroupName
            parameters[CommonTelemetryEventSchema.VMId].value = imds_info.vmId
            parameters[CommonTelemetryEventSchema.ImageOrigin].value = int(imds_info.image_origin)
        except Exception as e:
            logger.warn("Failed to get IMDS info; will be missing from telemetry: {0}", ustr(e))

    def save_event(self, data):
        """
        Saves the given event data to a new file in the events directory.

        The directory is created if needed. If it already contains MAX_NUMBER_OF_EVENTS or more
        files, the oldest ones (by sorted file name) are removed first so that the directory holds
        at most MAX_NUMBER_OF_EVENTS files once the new event is written. The data is written to a
        ".tmp" file that is then renamed to its final name (AGENT_EVENT_FILE_EXTENSION).

        :param data: Text of the event; it is written encoded as UTF-8
        :raises EventError: if the directory cannot be created, old events cannot be removed, or
                            the event file cannot be written
        """
        if self.event_dir is None:
            logger.warn("Cannot save event -- Event reporter is not initialized.")
            return

        try:
            fileutil.mkdir(self.event_dir, mode=0o700)
        except (IOError, OSError) as e:
            msg = "Failed to create events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
            raise EventError(msg)

        try:
            existing_events = os.listdir(self.event_dir)
            if len(existing_events) >= MAX_NUMBER_OF_EVENTS:
                logger.periodic_warn(logger.EVERY_MINUTE, "[PERIODIC] Too many files under: {0}, current count:  {1}, "
                                                          "removing oldest event files".format(self.event_dir,
                                                                                               len(existing_events)))
                existing_events.sort()
                # Keep only the newest (MAX_NUMBER_OF_EVENTS - 1) files, leaving room for the new event
                number_to_remove = len(existing_events) - (MAX_NUMBER_OF_EVENTS - 1)
                for event_file in existing_events[:number_to_remove]:
                    os.remove(os.path.join(self.event_dir, event_file))
        except (IOError, OSError) as e:
            msg = "Failed to remove old events from events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
            raise EventError(msg)

        # The file is named after the current time, in microseconds
        filename = os.path.join(self.event_dir,
                                ustr(int(time.time() * 1000000)))
        try:
            with open(filename + ".tmp", 'wb+') as hfile:
                hfile.write(data.encode("utf-8"))
            os.rename(filename + ".tmp", filename + AGENT_EVENT_FILE_EXTENSION)
        except (IOError, OSError) as e:
            msg = "Failed to write events to file: {0}".format(e)
            raise EventError(msg)

    def reset_periodic(self):
        """
        Forgets when each periodic event was last added.
        """
        self.periodic_events = {}

    def is_period_elapsed(self, delta, h):
        """
        Returns True if the periodic event identified by 'h' has never been added, or if it was
        last added at least 'delta' ago (delta is added to a datetime, e.g. a timedelta).
        """
        return h not in self.periodic_events or \
            (self.periodic_events[h] + delta) <= datetime.now()

    def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0,
                     version=str(CURRENT_VERSION), message="", log_event=True, force=False):
        """
        Adds the event only if 'force' is true or the period given by 'delta' has elapsed since
        an event with the same name, operation, status and message was last added by this method.
        """
        h = hash(name + op + ustr(is_success) + message)

        if force or self.is_period_elapsed(delta, h):
            self.add_event(name, op=op, is_success=is_success, duration=duration,
                           version=version, message=message, log_event=log_event)
            self.periodic_events[h] = datetime.now()

    def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
                  message="", log_event=True):
        """
        Creates an event with the given values plus the common parameters and saves it to the
        events directory.

        Failed events (is_success is false) are also written to the agent log when log_event is
        true. Errors saving the event are logged (periodically) and not raised.
        """
        if (not is_success) and log_event:
            _log_event(name, op, message, duration, is_success=is_success)

        event = TelemetryEvent(TELEMETRY_EVENT_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Name, str(name)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Version, str(version)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Operation, str(op)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.OperationSuccess, bool(is_success)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Message, str(message)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
        self.add_common_event_parameters(event, datetime.utcnow())

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError as e:
            logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))

    def add_log_event(self, level, message):
        """
        Creates a log event for the given message and saves it to the events directory.

        :param level: Log level; used as an index into logger.LogLevel.STRINGS
        :param message: The log message; the timestamp and level are removed by _clean_up_message()
        """
        # Use a single timestamp so that the time in Context2 agrees with the event's timestamp
        timestamp = datetime.utcnow()

        event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level]))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context1, self._clean_up_message(message)))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context2, timestamp.strftime(logger.Logger.LogTimeFormatInUTC)))
        event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
        self.add_common_event_parameters(event, timestamp)

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError:
            # Errors are deliberately not logged here.
            # NOTE(review): presumably because logging the error could itself produce another log event -- confirm
            pass

    def add_metric(self, category, counter, instance, value, log_event=False):
        """
        Create and save an event which contains a telemetry event.

        :param str category: The category of metric (e.g. "cpu", "memory")
        :param str counter: The specific metric within the category (e.g. "%idle")
        :param str instance: For instanced metrics, the instance identifier (filesystem name, cpu core#, etc.)
        :param value: Value of the metric
        :param bool log_event: If true, log the collected metric in the agent log
        """
        if log_event:
            message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
            _log_event(AGENT_NAME, "METRIC", message, 0)

        event = TelemetryEvent(TELEMETRY_METRICS_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Category, str(category)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Counter, str(counter)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Instance, str(instance)))
        event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
        self.add_common_event_parameters(event, datetime.utcnow())

        data = get_properties(event)
        try:
            self.save_event(json.dumps(data))
        except EventError as e:
            logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))

    @staticmethod
    def _clean_up_message(message):
        """
        Returns the text of a log message, with the leading timestamp and log level removed.
        Messages that do not match any of the expected formats (and empty messages) are returned unchanged.
        """
        # By the time the message has gotten to this point it is formatted as
        #
        #   Old Time format
        #   YYYY/MM/DD HH:mm:ss.fffffff LEVEL <text>.
        #   YYYY/MM/DD HH:mm:ss.fffffff <text>.
        #   YYYY/MM/DD HH:mm:ss LEVEL <text>.
        #   YYYY/MM/DD HH:mm:ss <text>.
        #
        #   UTC ISO Time format added in #1716
        #   YYYY-MM-DDTHH:mm:ss.fffffffZ LEVEL <text>.
        #   YYYY-MM-DDTHH:mm:ss.fffffffZ <text>.
        #   YYYY-MM-DDTHH:mm:ssZ LEVEL <text>.
        #   YYYY-MM-DDTHH:mm:ssZ <text>.
        #
        # The timestamp and the level are redundant, and should be stripped. The logging library does not schematize
        # this data, so I am forced to parse the message using a regex.  The format is regular, so the burden is low,
        # and usability on the telemetry side is high.

        if not message:
            return message

        # Two regexes are used to simplify the handling of logs and to keep it maintainable. Most of the logs have
        # the level included in the log itself; when they don't, the second regex is the catch-all case.

        # Parsing the log messages containing levels in it
        extract_level_message = EventLogger._LOG_LEVEL_FORMAT_PARSER.search(message)
        if extract_level_message:
            return extract_level_message.group(2)  # The message bit

        # Parsing the log messages without levels in it.
        extract_message = EventLogger._LOG_FORMAT_PARSER.search(message)
        if extract_message:
            return extract_message.group(1)  # The message bit

        return message

    def add_common_event_parameters(self, event, event_timestamp):
        """
        This method is called for all events and ensures all telemetry fields are added before the event is sent out.
        Note that the event timestamp is saved in the OpcodeName field.

        :param event: The event to which the parameters are added
        :param event_timestamp: A datetime; it is formatted using logger.Logger.LogTimeFormatInUTC
        """
        common_params = [TelemetryEventParam(CommonTelemetryEventSchema.GAVersion, CURRENT_AGENT),
                         TelemetryEventParam(CommonTelemetryEventSchema.ContainerId, AgentGlobals.get_container_id()),
                         TelemetryEventParam(CommonTelemetryEventSchema.OpcodeName, event_timestamp.strftime(logger.Logger.LogTimeFormatInUTC)),
                         TelemetryEventParam(CommonTelemetryEventSchema.EventTid, threading.current_thread().ident),
                         TelemetryEventParam(CommonTelemetryEventSchema.EventPid, os.getpid()),
                         TelemetryEventParam(CommonTelemetryEventSchema.TaskName, threading.current_thread().getName()),
                         TelemetryEventParam(CommonTelemetryEventSchema.KeywordName, '')]

        if event.eventId == TELEMETRY_EVENT_EVENT_ID and event.providerId == TELEMETRY_EVENT_PROVIDER_ID:
            # Currently only the GuestAgentExtensionEvents table has these columns; the other tables don't have
            # them, so this data is skipped for those tables.
            common_params.extend([TelemetryEventParam(GuestAgentExtensionEventsSchema.ExtensionType, event.file_type),
                                  TelemetryEventParam(GuestAgentExtensionEventsSchema.IsInternal, False)])

        event.parameters.extend(common_params)
        event.parameters.extend(self._common_parameters)
607
608
# Module-level EventLogger instance; it is the default 'reporter' of the module-level functions
# below and is the instance configured by init_event_logger().
__event_logger__ = EventLogger()
610
def get_event_logger():
    """
    Returns the module-level EventLogger instance.
    """
    return __event_logger__
613
614
def elapsed_milliseconds(utc_start):
    """
    Returns the number of whole milliseconds elapsed between utc_start and the current UTC time.

    :param utc_start: A naive datetime in UTC
    :return: the elapsed time as an int, or 0 if utc_start is in the future
    """
    current = datetime.utcnow()
    if utc_start > current:
        return 0

    elapsed = current - utc_start
    elapsed_seconds = elapsed.days * 24 * 60 * 60 + elapsed.seconds
    return int(elapsed_seconds * 1000 + elapsed.microseconds / 1000.0)
623
624
def report_event(op, is_success=True, message='', log_event=True):
    """
    Adds an event for the given operation using the agent's name and current version.
    """
    add_event(AGENT_NAME,
              op=op,
              is_success=is_success,
              version=str(CURRENT_VERSION),
              message=message,
              log_event=log_event)
632
633
def report_periodic(delta, op, is_success=True, message=''):
    """
    Adds a periodic event (see add_periodic) for the given operation using the agent's name and
    current version.
    """
    add_periodic(delta,
                 AGENT_NAME,
                 op=op,
                 is_success=is_success,
                 version=str(CURRENT_VERSION),
                 message=message)
640
641
def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__):
    """
    Send a telemetry event reporting a single instance of a performance counter.

    If the reporter has not been initialized (its event_dir is None) the metric is only written
    to the agent log. Values that cannot be converted to float are not reported; a periodic
    warning is logged instead.

    :param str category: The category of the metric (cpu, memory, etc)
    :param str counter: The name of the metric ("%idle", etc)
    :param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core#
    :param     value: The value of the metric
    :param bool log_event: If True, log the metric in the agent log as well
    :param EventLogger reporter: The EventLogger instance to which metric events should be sent
    """
    if reporter.event_dir is None:
        logger.warn("Cannot report metric event -- Event reporter is not initialized.")
        message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
        _log_event(AGENT_NAME, "METRIC", message, 0)
        return
    try:
        reporter.add_metric(category, counter, instance, float(value), log_event)
    except (ValueError, TypeError):
        # float() raises ValueError for strings that are not numbers and TypeError for values
        # that are neither strings nor numbers (e.g. None); neither should reach the caller.
        logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
                                                     "{0}/{1} [{2}] = {3}".format(category, counter, instance, value))
662
663
def initialize_event_logger_vminfo_common_parameters(protocol, reporter=__event_logger__):
    """
    Calls initialize_vminfo_common_parameters(protocol) on the given reporter (by default, the
    module-level EventLogger).
    """
    reporter.initialize_vminfo_common_parameters(protocol)
666
667
def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
              message="", log_event=True, reporter=__event_logger__):
    """
    Adds an event to the given reporter.

    If the reporter has not been initialized (its event_dir is None) the event is only written to
    the agent log. Otherwise the event is added only when should_emit_event() allows it, after
    recording its status with mark_event_status().
    """
    if reporter.event_dir is None:
        logger.warn("Cannot add event -- Event reporter is not initialized.")
        _log_event(name, op, message, duration, is_success=is_success)
        return

    if not should_emit_event(name, version, op, is_success):
        return

    mark_event_status(name, version, op, is_success)
    reporter.add_event(name,
                       op=op,
                       is_success=is_success,
                       duration=duration,
                       version=str(version),
                       message=message,
                       log_event=log_event)
680
681
def add_log_event(level, message, forced=False, reporter=__event_logger__):
    """
    Adds a log event to the given reporter. Nothing is added if the reporter has not been
    initialized (its event_dir is None) or if the level is below logger.LogLevel.WARNING.

    :param level: LoggerLevel of the log event
    :param message: Message
    :param forced: Force write the event even if send_logs_to_telemetry() is disabled
        (NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
    :param reporter: The EventLogger instance to which the log event should be sent
    """
    is_initialized = reporter.event_dir is not None
    if is_initialized and (forced or send_logs_to_telemetry()) and level >= logger.LogLevel.WARNING:
        reporter.add_log_event(level, message)
699
700
def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
                 message="", log_event=True, force=False, reporter=__event_logger__):
    """
    Adds a periodic event to the given reporter (see EventLogger.add_periodic). If the reporter
    has not been initialized (its event_dir is None) the event is only written to the agent log.
    """
    if reporter.event_dir is not None:
        reporter.add_periodic(delta,
                              name,
                              op=op,
                              is_success=is_success,
                              duration=duration,
                              version=str(version),
                              message=message,
                              log_event=log_event,
                              force=force)
        return

    logger.warn("Cannot add periodic event -- Event reporter is not initialized.")
    _log_event(name, op, message, duration, is_success=is_success)
710
711
def mark_event_status(name, version, op, status):
    """
    Records the status of the event in the module-level EventStatus; operations that are not in
    __event_status_operations__ are ignored.
    """
    if op not in __event_status_operations__:
        return
    __event_status__.mark_event_status(name, version, op, status)
715
716
def should_emit_event(name, version, op, status):
    """
    Returns True if the event should be emitted.

    Events for operations that are not in __event_status_operations__ are always emitted. For
    the tracked operations, the event is emitted only when no status has been recorded for it
    yet or when the recorded status differs from 'status'.
    """
    if op not in __event_status_operations__:
        return True
    if __event_status__ is None:
        return True
    if not __event_status__.event_marked(name, version, op):
        return True
    return __event_status__.event_succeeded(name, version, op) != status
723
724
def init_event_logger(event_dir):
    """
    Sets the directory where the module-level EventLogger saves its events.
    """
    __event_logger__.event_dir = event_dir
727
728
def init_event_status(status_dir):
    """
    Initializes the module-level EventStatus with the directory containing its status file; any
    status previously saved in that directory is loaded.
    """
    __event_status__.initialize(status_dir)
731
732
def dump_unhandled_err(name):
    """
    Adds an UnhandledError event (marked as failed) whose message is the formatted traceback of
    the exception recorded in sys.last_type, sys.last_value and sys.last_traceback.

    Nothing is done unless all three attributes are present in the sys module.
    """
    required = ('last_type', 'last_value', 'last_traceback')
    if not all(hasattr(sys, attribute) for attribute in required):
        return

    formatted = traceback.format_exception(sys.last_type, sys.last_value, sys.last_traceback)
    add_event(name,
              is_success=False,
              message="".join(formatted),
              op=WALAEventOperation.UnhandledError)
744
745
def enable_unhandled_err_dump(name):
    """
    Registers dump_unhandled_err(name) with atexit, so it is called when the interpreter exits.
    """
    atexit.register(dump_unhandled_err, name)
748