1# Copyright 2020 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# Requires Python 2.6+ and Openssl 1.0+
16#
17import contextlib
18import json
19import os
20import platform
21import re
22import tempfile
23import time
24import uuid
25from datetime import datetime, timedelta
26
27from mock import MagicMock, Mock, patch, PropertyMock
28
29from azurelinuxagent.common import logger
30from azurelinuxagent.common.datacontract import get_properties
31from azurelinuxagent.common.event import WALAEventOperation, EVENTS_DIRECTORY
32from azurelinuxagent.common.exception import HttpError, ServiceStoppedError
33from azurelinuxagent.common.future import ustr
34from azurelinuxagent.common.osutil.factory import get_osutil
35from azurelinuxagent.common.protocol.util import ProtocolUtil
36from azurelinuxagent.common.protocol.wire import event_to_v1
37from azurelinuxagent.common.telemetryevent import TelemetryEvent, TelemetryEventParam, \
38    GuestAgentExtensionEventsSchema
39from azurelinuxagent.common.utils import restutil, fileutil
40from azurelinuxagent.common.version import CURRENT_VERSION, DISTRO_NAME, DISTRO_VERSION, AGENT_VERSION, CURRENT_AGENT, \
41    DISTRO_CODE_NAME
42from azurelinuxagent.ga.collect_telemetry_events import _CollectAndEnqueueEventsPeriodicOperation
43from azurelinuxagent.ga.send_telemetry_events import get_send_telemetry_events_handler
44from tests.ga.test_monitor import random_generator
45from tests.protocol.mocks import MockHttpResponse, mock_wire_protocol, HttpRequestPredicates
46from tests.protocol.mockwiredata import DATA_FILE
47from tests.tools import AgentTestCase, clear_singleton_instances, mock_sleep
48from tests.utils.event_logger_tools import EventLoggerTools
49
50
51class TestSendTelemetryEventsHandler(AgentTestCase, HttpRequestPredicates):
52    def setUp(self):
53        AgentTestCase.setUp(self)
54        clear_singleton_instances(ProtocolUtil)
55        self.lib_dir = tempfile.mkdtemp()
56        self.event_dir = os.path.join(self.lib_dir, EVENTS_DIRECTORY)
57
58        EventLoggerTools.initialize_event_logger(self.event_dir)
59
60    def tearDown(self):
61        AgentTestCase.tearDown(self)
62        fileutil.rm_dirs(self.lib_dir)
63
    # Placeholder provider id: _get_event_data stamps it on the events it builds, and
    # test_it_should_enqueue_and_send_events_properly patches it in as TELEMETRY_EVENT_PROVIDER_ID.
    _TEST_EVENT_PROVIDER_ID = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
65
66    @contextlib.contextmanager
67    def _create_send_telemetry_events_handler(self, timeout=0.5, start_thread=True, batching_queue_limit=1):
68        def http_post_handler(url, body, **__):
69            if self.is_telemetry_request(url):
70                send_telemetry_events_handler.event_calls.append((datetime.now(), body))
71                return MockHttpResponse(status=200)
72            return None
73
74        with mock_wire_protocol(DATA_FILE, http_post_handler=http_post_handler) as protocol:
75            protocol_util = MagicMock()
76            protocol_util.get_protocol = Mock(return_value=protocol)
77            send_telemetry_events_handler = get_send_telemetry_events_handler(protocol_util)
78            send_telemetry_events_handler.event_calls = []
79            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_EVENTS_TO_BATCH",
80                       batching_queue_limit):
81                with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MAX_TIMEOUT", timeout):
82
83                    send_telemetry_events_handler.get_mock_wire_protocol = lambda: protocol
84                    if start_thread:
85                        send_telemetry_events_handler.start()
86                        self.assertTrue(send_telemetry_events_handler.is_alive(), "Thread didn't start properly!")
87                    yield send_telemetry_events_handler
88
89    @staticmethod
90    def _stop_handler(telemetry_handler, timeout=0.001):
91        # Giving it some grace time to finish execution and then stopping thread
92        time.sleep(timeout)
93        telemetry_handler.stop()
94
95    def _assert_test_data_in_event_body(self, telemetry_handler, test_events):
96        # Stop the thread and Wait for the queue and thread to join
97        TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
98
99        for telemetry_event in test_events:
100            event_str = event_to_v1(telemetry_event)
101            found = False
102            for _, event_body in telemetry_handler.event_calls:
103                if event_str in event_body:
104                    found = True
105                    break
106
107            self.assertTrue(found, "Event {0} not found in any telemetry calls".format(event_str))
108
109    def _assert_error_event_reported(self, mock_add_event, expected_msg, operation=WALAEventOperation.ReportEventErrors):
110        found_msg = False
111        for call_args in mock_add_event.call_args_list:
112            _, kwargs = call_args
113            if expected_msg in kwargs['message'] and kwargs['op'] == operation:
114                found_msg = True
115                break
116        self.assertTrue(found_msg, "Error msg: {0} not reported".format(expected_msg))
117
118    def _setup_and_assert_bad_request_scenarios(self, http_post_handler, expected_msgs):
119        with self._create_send_telemetry_events_handler() as telemetry_handler:
120
121            telemetry_handler.get_mock_wire_protocol().set_http_handlers(http_post_handler=http_post_handler)
122
123            with patch("azurelinuxagent.common.event.add_event") as mock_add_event:
124                telemetry_handler.enqueue_event(TelemetryEvent())
125                TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
126                for msg in expected_msgs:
127                    self._assert_error_event_reported(mock_add_event, msg)
128
129    def test_it_should_send_events_properly(self):
130        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
131
132        with self._create_send_telemetry_events_handler() as telemetry_handler:
133            for test_event in events:
134                telemetry_handler.enqueue_event(test_event)
135
136            self._assert_test_data_in_event_body(telemetry_handler, events)
137
138    def test_it_should_send_as_soon_as_events_available_in_queue_with_minimal_batching_limits(self):
139        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
140
141        with self._create_send_telemetry_events_handler() as telemetry_handler:
142            test_start_time = datetime.now()
143            for test_event in events:
144                telemetry_handler.enqueue_event(test_event)
145
146            self._assert_test_data_in_event_body(telemetry_handler, events)
147
148            # Ensure that we send out the data as soon as we enqueue the events
149            for event_time, _ in telemetry_handler.event_calls:
150                elapsed = event_time - test_start_time
151                self.assertLessEqual(elapsed, timedelta(seconds=2), "Request was not sent as soon as possible")
152
153    def test_thread_should_wait_for_events_to_get_in_queue_before_processing(self):
154        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
155
156        with self._create_send_telemetry_events_handler(timeout=0.1) as telemetry_handler:
157
158            # Do nothing for some time
159            time.sleep(0.3)
160
161            # Ensure that no events were transmitted by the telemetry handler during this time, i.e. telemetry thread was idle
162            self.assertEqual(0, len(telemetry_handler.event_calls), "Unwanted calls to telemetry")
163
164            # Now enqueue data and verify send_telemetry_events sends them asap
165            for test_event in events:
166                telemetry_handler.enqueue_event(test_event)
167
168            self._assert_test_data_in_event_body(telemetry_handler, events)
169
    def test_it_should_honor_batch_time_limits_before_sending_telemetry(self):
        """
        Check _MIN_BATCH_WAIT_TIME when the batch size limit (5) is above the number of queued events (2).

        Phase 1: with a 10s batch wait time, nothing is sent right after enqueueing.
        Phase 2: with a 0.2s batch wait time, the events are sent within about 1 second.
        """
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
        wait_time = timedelta(seconds=10)
        # Keep a handle on the real sleep: time.sleep is patched below, and the lambdas and the polling loop need it
        orig_sleep = time.sleep

        # Phase 1: every time.sleep call becomes a real 0.01s sleep; the 10s batch wait time is never reached here
        with patch("time.sleep", lambda *_: orig_sleep(0.01)):
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
                    for test_event in events:
                        telemetry_handler.enqueue_event(test_event)

                    self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")
                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)

        # Phase 2: a short batch wait time, so the queued events should go out even below the batch size limit
        wait_time = timedelta(seconds=0.2)
        with patch("time.sleep", lambda *_: orig_sleep(0.05)):
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
                    test_start_time = datetime.now()
                    for test_event in events:
                        telemetry_handler.enqueue_event(test_event)

                    while not telemetry_handler.event_calls and (test_start_time + timedelta(seconds=1)) > datetime.now():
                        # Wait for event calls to be made, wait a max of 1 secs
                        # (polls with the saved real sleep since time.sleep is patched in this scope)
                        orig_sleep(0.1)

                    self.assertGreater(len(telemetry_handler.event_calls), 0, "No event calls made at all!")
                    self._assert_test_data_in_event_body(telemetry_handler, events)
                    for event_time, _ in telemetry_handler.event_calls:
                        elapsed = event_time - test_start_time
                        # Technically we should send out data after 0.2 secs, but keeping a buffer of 1sec while testing
                        self.assertLessEqual(elapsed, timedelta(seconds=1), "Request was not sent properly")
202
203    def test_it_should_clear_queue_before_stopping(self):
204        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
205        wait_time = timedelta(seconds=10)
206
207        with patch("time.sleep", lambda *_: mock_sleep(0.01)):
208            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
209                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
210                    for test_event in events:
211                        telemetry_handler.enqueue_event(test_event)
212
213                    self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")
214                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)
215                    # After the service is asked to stop, we should send all data in the queue
216                    self._assert_test_data_in_event_body(telemetry_handler, events)
217
218    def test_it_should_honor_batch_queue_limits_before_sending_telemetry(self):
219
220        batch_limit = 5
221
222        with self._create_send_telemetry_events_handler(batching_queue_limit=batch_limit) as telemetry_handler:
223            events = []
224
225            for _ in range(batch_limit-1):
226                test_event = TelemetryEvent(eventId=ustr(uuid.uuid4()))
227                events.append(test_event)
228                telemetry_handler.enqueue_event(test_event)
229
230            self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")
231
232            for _ in range(batch_limit):
233                test_event = TelemetryEvent(eventId=ustr(uuid.uuid4()))
234                events.append(test_event)
235                telemetry_handler.enqueue_event(test_event)
236
237            self._assert_test_data_in_event_body(telemetry_handler, events)
238
239    def test_it_should_raise_on_enqueue_if_service_stopped(self):
240        with self._create_send_telemetry_events_handler(start_thread=False) as telemetry_handler:
241            # Ensure the thread is stopped
242            telemetry_handler.stop()
243            with self.assertRaises(ServiceStoppedError) as context_manager:
244                telemetry_handler.enqueue_event(TelemetryEvent(eventId=ustr(uuid.uuid4())))
245
246            exception = context_manager.exception
247            self.assertIn("{0} is stopped, not accepting anymore events".format(telemetry_handler.get_thread_name()),
248                          str(exception))
249
250    def test_it_should_honour_the_incoming_order_of_events(self):
251
252        with self._create_send_telemetry_events_handler(timeout=0.3, start_thread=False) as telemetry_handler:
253            for index in range(5):
254                telemetry_handler.enqueue_event(TelemetryEvent(eventId=index))
255
256            telemetry_handler.start()
257            self.assertTrue(telemetry_handler.is_alive(), "Thread not alive")
258            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
259            _, event_body = telemetry_handler.event_calls[0]
260            event_orders = re.findall(r'<Event id=\"(\d+)\"><!\[CDATA\[]]></Event>', event_body)
261            self.assertEqual(sorted(event_orders), event_orders, "Events not ordered correctly")
262
263    def test_send_telemetry_events_should_report_event_if_wireserver_returns_http_error(self):
264
265        test_str = "A test exception, Guid: {0}".format(str(uuid.uuid4()))
266
267        def http_post_handler(url, _, **__):
268            if self.is_telemetry_request(url):
269                return HttpError(test_str)
270            return None
271
272        self._setup_and_assert_bad_request_scenarios(http_post_handler, [test_str])
273
274    def test_send_telemetry_events_should_report_event_when_http_post_returning_503(self):
275
276        def http_post_handler(url, _, **__):
277            if self.is_telemetry_request(url):
278                return MockHttpResponse(restutil.httpclient.SERVICE_UNAVAILABLE)
279            return None
280
281        expected_msgs = ["[ProtocolError] [Wireserver Exception] [ProtocolError] [Wireserver Failed]",
282                        "[HTTP Failed] Status Code 503"]
283
284        self._setup_and_assert_bad_request_scenarios(http_post_handler, expected_msgs)
285
286    def test_send_telemetry_events_should_add_event_on_unexpected_errors(self):
287
288        with self._create_send_telemetry_events_handler(timeout=0.1) as telemetry_handler:
289
290            with patch("azurelinuxagent.ga.send_telemetry_events.add_event") as mock_add_event:
291                with patch("azurelinuxagent.common.protocol.wire.WireClient.report_event") as patch_report_event:
292                    test_str = "Test exception, Guid: {0}".format(str(uuid.uuid4()))
293                    patch_report_event.side_effect = Exception(test_str)
294
295                    telemetry_handler.enqueue_event(TelemetryEvent())
296                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)
297
298                    self._assert_error_event_reported(mock_add_event, test_str, operation=WALAEventOperation.UnhandledError)
299
300    def _create_extension_event(self,
301                               size=0,
302                               name="DummyExtension",
303                               message="DummyMessage"):
304        event_data = self._get_event_data(name=size if size != 0 else name,
305                message=random_generator(size) if size != 0 else message)
306        event_file = os.path.join(self.event_dir, "{0}.tld".format(int(time.time() * 1000000)))
307        with open(event_file, 'wb+') as file_descriptor:
308            file_descriptor.write(event_data.encode('utf-8'))
309
310    @staticmethod
311    def _get_event_data(message, name):
312        event = TelemetryEvent(1, TestSendTelemetryEventsHandler._TEST_EVENT_PROVIDER_ID)
313        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Name, name))
314        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Version, str(CURRENT_VERSION)))
315        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Operation, WALAEventOperation.Unknown))
316        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.OperationSuccess, True))
317        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Message, message))
318        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, 0))
319
320        data = get_properties(event)
321        return json.dumps(data)
322
    @patch("azurelinuxagent.common.event.TELEMETRY_EVENT_PROVIDER_ID", _TEST_EVENT_PROVIDER_ID)
    @patch("azurelinuxagent.common.conf.get_lib_dir")
    def test_it_should_enqueue_and_send_events_properly(self, mock_lib_dir, *_):
        """
        End-to-end check: an event file on disk is collected by _CollectAndEnqueueEventsPeriodicOperation,
        enqueued on the telemetry handler and sent as exactly one request whose body contains the expected
        serialized event (with the common parameters filled in from the mocks below).
        """
        mock_lib_dir.return_value = self.lib_dir

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            monitor_handler = _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler)
            self._create_extension_event(message="Message-Test")

            test_mtime = 1000  # epoch time, in seconds (the unit of os.path.getmtime and datetime.fromtimestamp)
            test_opcodename = datetime.fromtimestamp(test_mtime).strftime(logger.Logger.LogTimeFormatInUTC)
            test_eventtid = 42
            test_eventpid = 24
            test_taskname = "TEST_TaskName"

            # Pin the file mtime, pid, thread id and thread name so the expected OpcodeName/EventPid/EventTid/TaskName
            # parameters are deterministic.
            # NOTE(review): new_callable is given a PropertyMock *instance*; patch calls it to build the replacement,
            # which presumably makes Thread.ident the plain value 42 rather than a property — confirm if this changes.
            with patch("os.path.getmtime", return_value=test_mtime):
                with patch('os.getpid', return_value=test_eventpid):
                    with patch("threading.Thread.ident", new_callable=PropertyMock(return_value=test_eventtid)):
                        with patch("threading.Thread.getName", return_value=test_taskname):
                            monitor_handler.run()

            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
            # Validating the crafted message by the collect_and_send_events call.
            self.assertEqual(1, len(telemetry_handler.event_calls), "Only 1 event should be sent")

            _, collected_event = telemetry_handler.event_calls[0]

            # Some of those expected values come from the mock protocol and imds client set up during test initialization
            osutil = get_osutil()
            osversion = u"{0}:{1}-{2}-{3}:{4}".format(platform.system(), DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME,
                                                      platform.release())

            sample_message = '<Event id="1"><![CDATA[' \
                             '<Param Name="Name" Value="DummyExtension" T="mt:wstr" />' \
                             '<Param Name="Version" Value="{0}" T="mt:wstr" />' \
                             '<Param Name="Operation" Value="Unknown" T="mt:wstr" />' \
                             '<Param Name="OperationSuccess" Value="True" T="mt:bool" />' \
                             '<Param Name="Message" Value="Message-Test" T="mt:wstr" />' \
                             '<Param Name="Duration" Value="0" T="mt:uint64" />' \
                             '<Param Name="GAVersion" Value="{1}" T="mt:wstr" />' \
                             '<Param Name="ContainerId" Value="c6d5526c-5ac2-4200-b6e2-56f2b70c5ab2" T="mt:wstr" />' \
                             '<Param Name="OpcodeName" Value="{2}" T="mt:wstr" />' \
                             '<Param Name="EventTid" Value="{3}" T="mt:uint64" />' \
                             '<Param Name="EventPid" Value="{4}" T="mt:uint64" />' \
                             '<Param Name="TaskName" Value="{5}" T="mt:wstr" />' \
                             '<Param Name="KeywordName" Value="" T="mt:wstr" />' \
                             '<Param Name="ExtensionType" Value="json" T="mt:wstr" />' \
                             '<Param Name="IsInternal" Value="False" T="mt:bool" />' \
                             '<Param Name="OSVersion" Value="{6}" T="mt:wstr" />' \
                             '<Param Name="ExecutionMode" Value="IAAS" T="mt:wstr" />' \
                             '<Param Name="RAM" Value="{7}" T="mt:uint64" />' \
                             '<Param Name="Processors" Value="{8}" T="mt:uint64" />' \
                             '<Param Name="TenantName" Value="db00a7755a5e4e8a8fe4b19bc3b330c3" T="mt:wstr" />' \
                             '<Param Name="RoleName" Value="MachineRole" T="mt:wstr" />' \
                             '<Param Name="RoleInstanceName" Value="b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0" T="mt:wstr" />' \
                             '<Param Name="Location" Value="uswest" T="mt:wstr" />' \
                             '<Param Name="SubscriptionId" Value="AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE" T="mt:wstr" />' \
                             '<Param Name="ResourceGroupName" Value="test-rg" T="mt:wstr" />' \
                             '<Param Name="VMId" Value="99999999-8888-7777-6666-555555555555" T="mt:wstr" />' \
                             '<Param Name="ImageOrigin" Value="2468" T="mt:uint64" />' \
                             ']]></Event>'.format(AGENT_VERSION, CURRENT_AGENT, test_opcodename, test_eventtid,
                                                  test_eventpid, test_taskname, osversion, int(osutil.get_total_mem()),
                                                  osutil.get_processor_cores())

            self.assertIn(sample_message, collected_event)
388
389    @patch("azurelinuxagent.common.conf.get_lib_dir")
390    def test_collect_and_send_events_with_small_events(self, mock_lib_dir):
391        mock_lib_dir.return_value = self.lib_dir
392
393        with self._create_send_telemetry_events_handler() as telemetry_handler:
394            sizes = [15, 15, 15, 15]  # get the powers of 2 - 2**16 is the limit
395
396            for power in sizes:
397                size = 2 ** power
398                self._create_extension_event(size)
399
400            _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler).run()
401
402            # The send_event call would be called each time, as we are filling up the buffer up to the brim for each call.
403            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
404            self.assertEqual(4, len(telemetry_handler.event_calls))
405
406    @patch("azurelinuxagent.common.conf.get_lib_dir")
407    def test_collect_and_send_events_with_large_events(self, mock_lib_dir):
408        mock_lib_dir.return_value = self.lib_dir
409
410        with self._create_send_telemetry_events_handler() as telemetry_handler:
411            sizes = [17, 17, 17]  # get the powers of 2
412
413            for power in sizes:
414                size = 2 ** power
415                self._create_extension_event(size)
416
417            with patch("azurelinuxagent.common.logger.periodic_warn") as patch_periodic_warn:
418                _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler).run()
419                TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
420                self.assertEqual(3, patch_periodic_warn.call_count)
421
422                # The send_event call should never be called as the events are larger than 2**16.
423                self.assertEqual(0, len(telemetry_handler.event_calls))