# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import contextlib
import json
import os
import platform
import re
import tempfile
import time
import uuid
from datetime import datetime, timedelta

from mock import MagicMock, Mock, patch, PropertyMock

from azurelinuxagent.common import logger
from azurelinuxagent.common.datacontract import get_properties
from azurelinuxagent.common.event import WALAEventOperation, EVENTS_DIRECTORY
from azurelinuxagent.common.exception import HttpError, ServiceStoppedError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil.factory import get_osutil
from azurelinuxagent.common.protocol.util import ProtocolUtil
from azurelinuxagent.common.protocol.wire import event_to_v1
from azurelinuxagent.common.telemetryevent import TelemetryEvent, TelemetryEventParam, \
    GuestAgentExtensionEventsSchema
from azurelinuxagent.common.utils import restutil, fileutil
from azurelinuxagent.common.version import CURRENT_VERSION, DISTRO_NAME, DISTRO_VERSION, AGENT_VERSION, CURRENT_AGENT, \
    DISTRO_CODE_NAME
from azurelinuxagent.ga.collect_telemetry_events import _CollectAndEnqueueEventsPeriodicOperation
from azurelinuxagent.ga.send_telemetry_events import get_send_telemetry_events_handler
from tests.ga.test_monitor import random_generator
from tests.protocol.mocks import MockHttpResponse, mock_wire_protocol, HttpRequestPredicates
from tests.protocol.mockwiredata import DATA_FILE
from tests.tools import AgentTestCase, clear_singleton_instances, mock_sleep
from tests.utils.event_logger_tools import EventLoggerTools


class TestSendTelemetryEventsHandler(AgentTestCase, HttpRequestPredicates):
    """
    Tests for the handler returned by get_send_telemetry_events_handler(), executed against a mock wire protocol
    whose telemetry POST requests are recorded in the handler's 'event_calls' list.
    """

    def setUp(self):
        AgentTestCase.setUp(self)
        clear_singleton_instances(ProtocolUtil)
        # Each test gets its own temporary lib directory; event files are written under its events sub-directory
        self.lib_dir = tempfile.mkdtemp()
        self.event_dir = os.path.join(self.lib_dir, EVENTS_DIRECTORY)

        EventLoggerTools.initialize_event_logger(self.event_dir)

    def tearDown(self):
        AgentTestCase.tearDown(self)
        fileutil.rm_dirs(self.lib_dir)

    # Provider id used for the event files created by these tests; it is also patched in as
    # azurelinuxagent.common.event.TELEMETRY_EVENT_PROVIDER_ID where needed.
    _TEST_EVENT_PROVIDER_ID = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"

    @contextlib.contextmanager
    def _create_send_telemetry_events_handler(self, timeout=0.5, start_thread=True, batching_queue_limit=1):
        """
        Context manager that yields a send-telemetry-events handler wired to a mock wire protocol.

        :param timeout: value patched into SendTelemetryEventsHandler._MAX_TIMEOUT while the context is active
        :param start_thread: when True, the handler is started and asserted to be alive before it is yielded
        :param batching_queue_limit: value patched into SendTelemetryEventsHandler._MIN_EVENTS_TO_BATCH

        The yielded handler carries two test-only attributes:
          * event_calls: list of (datetime, body) tuples, one per telemetry POST received by the mock protocol
          * get_mock_wire_protocol: callable returning the mock protocol, so tests can replace its HTTP handlers
        """
        def http_post_handler(url, body, **__):
            # Record only telemetry requests; returning None leaves every other request to the mock protocol
            if self.is_telemetry_request(url):
                send_telemetry_events_handler.event_calls.append((datetime.now(), body))
                return MockHttpResponse(status=200)
            return None

        with mock_wire_protocol(DATA_FILE, http_post_handler=http_post_handler) as protocol:
            protocol_util = MagicMock()
            protocol_util.get_protocol = Mock(return_value=protocol)
            send_telemetry_events_handler = get_send_telemetry_events_handler(protocol_util)
            send_telemetry_events_handler.event_calls = []
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_EVENTS_TO_BATCH",
                       batching_queue_limit):
                with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MAX_TIMEOUT", timeout):

                    send_telemetry_events_handler.get_mock_wire_protocol = lambda: protocol
                    if start_thread:
                        send_telemetry_events_handler.start()
                        self.assertTrue(send_telemetry_events_handler.is_alive(), "Thread didn't start properly!")
                    yield send_telemetry_events_handler

    @staticmethod
    def _stop_handler(telemetry_handler, timeout=0.001):
        """
        Sleep for 'timeout' seconds and then stop the given handler.
        """
        # Giving it some grace time to finish execution and then stopping thread
        time.sleep(timeout)
        telemetry_handler.stop()

    def _assert_test_data_in_event_body(self, telemetry_handler, test_events):
        """
        Stop the handler and assert that the serialized form of each event in 'test_events' appears in the body
        of at least one of the recorded telemetry calls.
        """
        # Stop the thread and Wait for the queue and thread to join
        TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)

        for telemetry_event in test_events:
            event_str = event_to_v1(telemetry_event)
            found = any(event_str in event_body for _, event_body in telemetry_handler.event_calls)
            self.assertTrue(found, "Event {0} not found in any telemetry calls".format(event_str))

    def _assert_error_event_reported(self, mock_add_event, expected_msg, operation=WALAEventOperation.ReportEventErrors):
        """
        Assert that 'mock_add_event' recorded at least one call whose 'op' keyword equals 'operation' and whose
        'message' keyword contains 'expected_msg'.
        """
        found_msg = False
        for _, kwargs in mock_add_event.call_args_list:
            # Use .get() instead of indexing: a recorded call that does not pass 'message' or 'op' as keywords
            # must simply be skipped rather than abort the search with a KeyError.
            message = kwargs.get('message') or ''
            if expected_msg in message and kwargs.get('op') == operation:
                found_msg = True
                break
        self.assertTrue(found_msg, "Error msg: {0} not reported".format(expected_msg))

    def _setup_and_assert_bad_request_scenarios(self, http_post_handler, expected_msgs):
        """
        Install 'http_post_handler' on the mock protocol, enqueue a single event, stop the handler and assert
        that every message in 'expected_msgs' was reported through azurelinuxagent.common.event.add_event.
        """
        with self._create_send_telemetry_events_handler() as telemetry_handler:

            telemetry_handler.get_mock_wire_protocol().set_http_handlers(http_post_handler=http_post_handler)

            with patch("azurelinuxagent.common.event.add_event") as mock_add_event:
                telemetry_handler.enqueue_event(TelemetryEvent())
                TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
                for msg in expected_msgs:
                    self._assert_error_event_reported(mock_add_event, msg)

    def test_it_should_send_events_properly(self):
        # Every enqueued event must show up in the body of some telemetry request
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            for test_event in events:
                telemetry_handler.enqueue_event(test_event)

            self._assert_test_data_in_event_body(telemetry_handler, events)

    def test_it_should_send_as_soon_as_events_available_in_queue_with_minimal_batching_limits(self):
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            test_start_time = datetime.now()
            for test_event in events:
                telemetry_handler.enqueue_event(test_event)

            self._assert_test_data_in_event_body(telemetry_handler, events)

            # Ensure that we send out the data as soon as we enqueue the events
            for event_time, _ in telemetry_handler.event_calls:
                elapsed = event_time - test_start_time
                self.assertLessEqual(elapsed, timedelta(seconds=2), "Request was not sent as soon as possible")

    def test_thread_should_wait_for_events_to_get_in_queue_before_processing(self):
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]

        with self._create_send_telemetry_events_handler(timeout=0.1) as telemetry_handler:

            # Do nothing for some time
            time.sleep(0.3)

            # Ensure that no events were transmitted by the telemetry handler during this time, i.e. telemetry thread was idle
            self.assertEqual(0, len(telemetry_handler.event_calls), "Unwanted calls to telemetry")

            # Now enqueue data and verify send_telemetry_events sends them asap
            for test_event in events:
                telemetry_handler.enqueue_event(test_event)

            self._assert_test_data_in_event_body(telemetry_handler, events)

    def test_it_should_honor_batch_time_limits_before_sending_telemetry(self):
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
        wait_time = timedelta(seconds=10)
        # Keep a reference to the real sleep: time.sleep is patched below with a short, fixed-duration sleep
        orig_sleep = time.sleep

        # First scenario: fewer events than the batch limit and a long batch wait time - nothing should be sent
        # while the handler is running.
        with patch("time.sleep", lambda *_: orig_sleep(0.01)):
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
                    for test_event in events:
                        telemetry_handler.enqueue_event(test_event)

                    self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")
                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)

        # Second scenario: same number of events but a short batch wait time - the events should be sent shortly
        # after being enqueued even though the batch limit was not reached.
        wait_time = timedelta(seconds=0.2)
        with patch("time.sleep", lambda *_: orig_sleep(0.05)):
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
                    test_start_time = datetime.now()
                    for test_event in events:
                        telemetry_handler.enqueue_event(test_event)

                    while not telemetry_handler.event_calls and (test_start_time + timedelta(seconds=1)) > datetime.now():
                        # Wait for event calls to be made, wait a max of 1 secs
                        orig_sleep(0.1)

                    self.assertGreater(len(telemetry_handler.event_calls), 0, "No event calls made at all!")
                    self._assert_test_data_in_event_body(telemetry_handler, events)
                    for event_time, _ in telemetry_handler.event_calls:
                        elapsed = event_time - test_start_time
                        # Technically we should send out data after 0.2 secs, but keeping a buffer of 1sec while testing
                        self.assertLessEqual(elapsed, timedelta(seconds=1), "Request was not sent properly")

    def test_it_should_clear_queue_before_stopping(self):
        events = [TelemetryEvent(eventId=ustr(uuid.uuid4())), TelemetryEvent(eventId=ustr(uuid.uuid4()))]
        wait_time = timedelta(seconds=10)

        with patch("time.sleep", lambda *_: mock_sleep(0.01)):
            with patch("azurelinuxagent.ga.send_telemetry_events.SendTelemetryEventsHandler._MIN_BATCH_WAIT_TIME", wait_time):
                with self._create_send_telemetry_events_handler(batching_queue_limit=5) as telemetry_handler:
                    for test_event in events:
                        telemetry_handler.enqueue_event(test_event)

                    self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")
                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)
                    # After the service is asked to stop, we should send all data in the queue
                    self._assert_test_data_in_event_body(telemetry_handler, events)

    def test_it_should_honor_batch_queue_limits_before_sending_telemetry(self):

        batch_limit = 5

        with self._create_send_telemetry_events_handler(batching_queue_limit=batch_limit) as telemetry_handler:
            events = []

            # One event short of the batch limit: nothing should have been sent yet
            for _ in range(batch_limit-1):
                test_event = TelemetryEvent(eventId=ustr(uuid.uuid4()))
                events.append(test_event)
                telemetry_handler.enqueue_event(test_event)

            self.assertEqual(0, len(telemetry_handler.event_calls), "No events should have been logged")

            # Go past the batch limit: all the events enqueued so far must eventually be sent
            for _ in range(batch_limit):
                test_event = TelemetryEvent(eventId=ustr(uuid.uuid4()))
                events.append(test_event)
                telemetry_handler.enqueue_event(test_event)

            self._assert_test_data_in_event_body(telemetry_handler, events)

    def test_it_should_raise_on_enqueue_if_service_stopped(self):
        with self._create_send_telemetry_events_handler(start_thread=False) as telemetry_handler:
            # Ensure the thread is stopped
            telemetry_handler.stop()
            with self.assertRaises(ServiceStoppedError) as context_manager:
                telemetry_handler.enqueue_event(TelemetryEvent(eventId=ustr(uuid.uuid4())))

            exception = context_manager.exception
            self.assertIn("{0} is stopped, not accepting anymore events".format(telemetry_handler.get_thread_name()),
                          str(exception))

    def test_it_should_honour_the_incoming_order_of_events(self):

        with self._create_send_telemetry_events_handler(timeout=0.3, start_thread=False) as telemetry_handler:
            # Enqueue before starting the thread so that the events are already waiting, in order, in the queue
            for index in range(5):
                telemetry_handler.enqueue_event(TelemetryEvent(eventId=index))

            telemetry_handler.start()
            self.assertTrue(telemetry_handler.is_alive(), "Thread not alive")
            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
            _, event_body = telemetry_handler.event_calls[0]
            # Compare the ids as integers: comparing the raw strings would order them lexicographically
            event_orders = [int(event_id) for event_id in
                            re.findall(r'<Event id=\"(\d+)\"><!\[CDATA\[]]></Event>', event_body)]
            # Without this check an event body with no matches would make the ordering assertion pass vacuously
            self.assertTrue(event_orders, "No events found in the first telemetry call")
            self.assertEqual(sorted(event_orders), event_orders, "Events not ordered correctly")

    def test_send_telemetry_events_should_report_event_if_wireserver_returns_http_error(self):

        test_str = "A test exception, Guid: {0}".format(str(uuid.uuid4()))

        def http_post_handler(url, _, **__):
            if self.is_telemetry_request(url):
                return HttpError(test_str)
            return None

        self._setup_and_assert_bad_request_scenarios(http_post_handler, [test_str])

    def test_send_telemetry_events_should_report_event_when_http_post_returning_503(self):

        def http_post_handler(url, _, **__):
            if self.is_telemetry_request(url):
                return MockHttpResponse(restutil.httpclient.SERVICE_UNAVAILABLE)
            return None

        expected_msgs = ["[ProtocolError] [Wireserver Exception] [ProtocolError] [Wireserver Failed]",
                         "[HTTP Failed] Status Code 503"]

        self._setup_and_assert_bad_request_scenarios(http_post_handler, expected_msgs)

    def test_send_telemetry_events_should_add_event_on_unexpected_errors(self):

        with self._create_send_telemetry_events_handler(timeout=0.1) as telemetry_handler:

            with patch("azurelinuxagent.ga.send_telemetry_events.add_event") as mock_add_event:
                with patch("azurelinuxagent.common.protocol.wire.WireClient.report_event") as patch_report_event:
                    test_str = "Test exception, Guid: {0}".format(str(uuid.uuid4()))
                    patch_report_event.side_effect = Exception(test_str)

                    telemetry_handler.enqueue_event(TelemetryEvent())
                    TestSendTelemetryEventsHandler._stop_handler(telemetry_handler, timeout=0.01)

                    self._assert_error_event_reported(mock_add_event, test_str, operation=WALAEventOperation.UnhandledError)

    def _create_extension_event(self,
                                size=0,
                                name="DummyExtension",
                                message="DummyMessage"):
        """
        Write one JSON event file (extension .tld, named after the current time in microseconds) into the
        test events directory.

        When 'size' is non-zero it replaces 'name' and a random message produced by random_generator(size)
        replaces 'message'; otherwise 'name' and 'message' are used as given.
        """
        event_data = self._get_event_data(name=size if size != 0 else name,
                                          message=random_generator(size) if size != 0 else message)
        event_file = os.path.join(self.event_dir, "{0}.tld".format(int(time.time() * 1000000)))
        with open(event_file, 'wb+') as file_descriptor:
            file_descriptor.write(event_data.encode('utf-8'))

    @staticmethod
    def _get_event_data(message, name):
        """
        Build a TelemetryEvent with id 1 and the test provider id, populated with the given name and message
        plus fixed values for the remaining extension-event parameters, and return it serialized as JSON.
        """
        event = TelemetryEvent(1, TestSendTelemetryEventsHandler._TEST_EVENT_PROVIDER_ID)
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Name, name))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Version, str(CURRENT_VERSION)))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Operation, WALAEventOperation.Unknown))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.OperationSuccess, True))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Message, message))
        event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, 0))

        data = get_properties(event)
        return json.dumps(data)

    @patch("azurelinuxagent.common.event.TELEMETRY_EVENT_PROVIDER_ID", _TEST_EVENT_PROVIDER_ID)
    @patch("azurelinuxagent.common.conf.get_lib_dir")
    def test_it_should_enqueue_and_send_events_properly(self, mock_lib_dir, *_):
        mock_lib_dir.return_value = self.lib_dir

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            monitor_handler = _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler)
            self._create_extension_event(message="Message-Test")

            # Epoch time, in seconds: the value is returned by the patched os.path.getmtime and is also passed
            # to datetime.fromtimestamp, both of which work in seconds.
            test_mtime = 1000
            test_opcodename = datetime.fromtimestamp(test_mtime).strftime(logger.Logger.LogTimeFormatInUTC)
            test_eventtid = 42
            test_eventpid = 24
            test_taskname = "TEST_TaskName"

            # Pin down the values that would otherwise vary from run to run so that the expected message
            # below can be compared verbatim.
            with patch("os.path.getmtime", return_value=test_mtime):
                with patch('os.getpid', return_value=test_eventpid):
                    with patch("threading.Thread.ident", new_callable=PropertyMock(return_value=test_eventtid)):
                        with patch("threading.Thread.getName", return_value=test_taskname):
                            monitor_handler.run()

            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
            # Validating the crafted message by the collect_and_send_events call.
            self.assertEqual(1, len(telemetry_handler.event_calls), "Only 1 event should be sent")

            _, collected_event = telemetry_handler.event_calls[0]

            # Some of those expected values come from the mock protocol and imds client set up during test initialization
            osutil = get_osutil()
            osversion = u"{0}:{1}-{2}-{3}:{4}".format(platform.system(), DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME,
                                                      platform.release())

            sample_message = '<Event id="1"><![CDATA[' \
                             '<Param Name="Name" Value="DummyExtension" T="mt:wstr" />' \
                             '<Param Name="Version" Value="{0}" T="mt:wstr" />' \
                             '<Param Name="Operation" Value="Unknown" T="mt:wstr" />' \
                             '<Param Name="OperationSuccess" Value="True" T="mt:bool" />' \
                             '<Param Name="Message" Value="Message-Test" T="mt:wstr" />' \
                             '<Param Name="Duration" Value="0" T="mt:uint64" />' \
                             '<Param Name="GAVersion" Value="{1}" T="mt:wstr" />' \
                             '<Param Name="ContainerId" Value="c6d5526c-5ac2-4200-b6e2-56f2b70c5ab2" T="mt:wstr" />' \
                             '<Param Name="OpcodeName" Value="{2}" T="mt:wstr" />' \
                             '<Param Name="EventTid" Value="{3}" T="mt:uint64" />' \
                             '<Param Name="EventPid" Value="{4}" T="mt:uint64" />' \
                             '<Param Name="TaskName" Value="{5}" T="mt:wstr" />' \
                             '<Param Name="KeywordName" Value="" T="mt:wstr" />' \
                             '<Param Name="ExtensionType" Value="json" T="mt:wstr" />' \
                             '<Param Name="IsInternal" Value="False" T="mt:bool" />' \
                             '<Param Name="OSVersion" Value="{6}" T="mt:wstr" />' \
                             '<Param Name="ExecutionMode" Value="IAAS" T="mt:wstr" />' \
                             '<Param Name="RAM" Value="{7}" T="mt:uint64" />' \
                             '<Param Name="Processors" Value="{8}" T="mt:uint64" />' \
                             '<Param Name="TenantName" Value="db00a7755a5e4e8a8fe4b19bc3b330c3" T="mt:wstr" />' \
                             '<Param Name="RoleName" Value="MachineRole" T="mt:wstr" />' \
                             '<Param Name="RoleInstanceName" Value="b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0" T="mt:wstr" />' \
                             '<Param Name="Location" Value="uswest" T="mt:wstr" />' \
                             '<Param Name="SubscriptionId" Value="AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE" T="mt:wstr" />' \
                             '<Param Name="ResourceGroupName" Value="test-rg" T="mt:wstr" />' \
                             '<Param Name="VMId" Value="99999999-8888-7777-6666-555555555555" T="mt:wstr" />' \
                             '<Param Name="ImageOrigin" Value="2468" T="mt:uint64" />' \
                             ']]></Event>'.format(AGENT_VERSION, CURRENT_AGENT, test_opcodename, test_eventtid,
                                                  test_eventpid, test_taskname, osversion, int(osutil.get_total_mem()),
                                                  osutil.get_processor_cores())

            self.assertIn(sample_message, collected_event)

    @patch("azurelinuxagent.common.conf.get_lib_dir")
    def test_collect_and_send_events_with_small_events(self, mock_lib_dir):
        mock_lib_dir.return_value = self.lib_dir

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            sizes = [15, 15, 15, 15]  # get the powers of 2 - 2**16 is the limit

            for power in sizes:
                size = 2 ** power
                self._create_extension_event(size)

            _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler).run()

            # The send_event call would be called each time, as we are filling up the buffer up to the brim for each call.
            TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
            self.assertEqual(4, len(telemetry_handler.event_calls))

    @patch("azurelinuxagent.common.conf.get_lib_dir")
    def test_collect_and_send_events_with_large_events(self, mock_lib_dir):
        mock_lib_dir.return_value = self.lib_dir

        with self._create_send_telemetry_events_handler() as telemetry_handler:
            sizes = [17, 17, 17]  # get the powers of 2

            for power in sizes:
                size = 2 ** power
                self._create_extension_event(size)

            with patch("azurelinuxagent.common.logger.periodic_warn") as patch_periodic_warn:
                _CollectAndEnqueueEventsPeriodicOperation(telemetry_handler).run()
                TestSendTelemetryEventsHandler._stop_handler(telemetry_handler)
                # One warning is expected for each of the oversized events
                self.assertEqual(3, patch_periodic_warn.call_count)

                # The send_event call should never be called as the events are larger than 2**16.
                self.assertEqual(0, len(telemetry_handler.event_calls))