# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import json
import logging
import re
import time

from botocore.compat import ensure_unicode, ensure_bytes, urlparse
from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS


logger = logging.getLogger(__name__)


class Monitor(object):
    # Event emitter events that are forwarded to ``capture()``.
    _EVENTS_TO_REGISTER = [
        'before-parameter-build',
        'request-created',
        'response-received',
        'after-call',
        'after-call-error',
    ]

    def __init__(self, adapter, publisher):
        """Abstraction for monitoring clients API calls

        :param adapter: An adapter that takes event emitter events
            and produces monitor events

        :param publisher: A publisher for generated monitor events
        """
        self._adapter = adapter
        self._publisher = publisher

    def register(self, event_emitter):
        """Register an event emitter to the monitor"""
        for event_to_register in self._EVENTS_TO_REGISTER:
            event_emitter.register_last(event_to_register, self.capture)

    def capture(self, event_name, **payload):
        """Captures an incoming event from the event emitter

        It will feed an event emitter event to the monitor's adapter to create
        a monitor event and then publish that event to the monitor's publisher.

        :type event_name: str
        :param event_name: The name of the event emitted

        :param payload: The keyword arguments the event was emitted with
        """
        # Deliberately best-effort: any failure raised by the adapter or
        # the publisher is logged at debug level and never re-raised.
        try:
            monitor_event = self._adapter.feed(event_name, payload)
            if monitor_event:
                self._publisher.publish(monitor_event)
        except Exception as e:
            logger.debug(
                'Exception %s raised by client monitor in handling event %s',
                e, event_name, exc_info=True)


class MonitorEventAdapter(object):
    def __init__(self, time=time.time):
        """Adapts event emitter events to produce monitor events

        :type time: callable
        :param time: A callable that produces the current time
        """
        self._time = time

    def feed(self, emitter_event_name, emitter_payload):
        """Feed an event emitter event to generate a monitor event

        :type emitter_event_name: str
        :param emitter_event_name: The name of the event emitted

        :type emitter_payload: dict
        :param emitter_payload: The payload associated to the event
            emitted

        :rtype: BaseMonitorEvent
        :returns: A monitor event based on the event emitter events
            fired
        """
        return self._get_handler(emitter_event_name)(**emitter_payload)

    def _get_handler(self, event_name):
        """Look up the ``_handle_*`` method for an emitted event name

        Only the part of the name before the first ``.`` is used and
        dashes are replaced with underscores, e.g. an event name starting
        with ``request-created.`` maps to ``_handle_request_created``.
        """
        return getattr(
            self, '_handle_' + event_name.split('.')[0].replace('-', '_')
        )

    def _handle_before_parameter_build(self, model, context, **kwargs):
        """Start tracking a new API call in the request context"""
        context['current_api_call_event'] = APICallEvent(
            service=model.service_model.service_id,
            operation=model.wire_name,
            timestamp=self._get_current_time(),
        )

    def _handle_request_created(self, request, **kwargs):
        """Start tracking a new attempt for the current API call"""
        context = request.context
        new_attempt_event = context[
            'current_api_call_event'].new_api_call_attempt(
                timestamp=self._get_current_time())
        new_attempt_event.request_headers = request.headers
        new_attempt_event.url = request.url
        context['current_api_call_attempt_event'] = new_attempt_event

    def _handle_response_received(self, parsed_response, context, exception,
                                  **kwargs):
        """Complete the current attempt event and return it

        When ``parsed_response`` is ``None`` the ``exception`` is recorded
        as the attempt's wire exception instead of response details.
        """
        attempt_event = context.pop('current_api_call_attempt_event')
        attempt_event.latency = self._get_latency(attempt_event)
        if parsed_response is not None:
            attempt_event.http_status_code = parsed_response[
                'ResponseMetadata']['HTTPStatusCode']
            attempt_event.response_headers = parsed_response[
                'ResponseMetadata']['HTTPHeaders']
            attempt_event.parsed_error = parsed_response.get('Error')
        else:
            attempt_event.wire_exception = exception
        return attempt_event

    def _handle_after_call(self, context, parsed, **kwargs):
        """Complete the current API call event and return it"""
        context['current_api_call_event'].retries_exceeded = parsed[
            'ResponseMetadata'].get('MaxAttemptsReached', False)
        return self._complete_api_call(context)

    def _handle_after_call_error(self, context, exception, **kwargs):
        """Complete the current API call event after an error was raised"""
        # If the after-call-error was emitted and the error being raised
        # was a retryable connection error, then the retries must have exceeded
        # for that exception as this event gets emitted **after** retries
        # happen.
        context['current_api_call_event'].retries_exceeded = \
            self._is_retryable_exception(exception)
        return self._complete_api_call(context)

    def _is_retryable_exception(self, exception):
        """Return True if the exception is a general connection error"""
        return isinstance(
            exception, tuple(RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR']))

    def _complete_api_call(self, context):
        """Remove the API call event from the context and set its latency"""
        call_event = context.pop('current_api_call_event')
        call_event.latency = self._get_latency(call_event)
        return call_event

    def _get_latency(self, event):
        """Return the time elapsed since the event's timestamp"""
        return self._get_current_time() - event.timestamp

    def _get_current_time(self):
        """Return the time callable's value scaled by 1000 as an int

        With the default ``time.time`` this is epoch time in milliseconds.
        """
        return int(self._time() * 1000)


class BaseMonitorEvent(object):
    def __init__(self, service, operation, timestamp):
        """Base monitor event

        :type service: str
        :param service: A string identifying the service associated to
            the event

        :type operation: str
        :param operation: A string identifying the operation of service
            associated to the event

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds from when the event began
        """
        self.service = service
        self.operation = operation
        self.timestamp = timestamp

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.__dict__)

    def __eq__(self, other):
        # Events are equal when ``other`` is an instance of this event's
        # class and all instance attributes match.
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        return False


class APICallEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp, latency=None,
                 attempts=None, retries_exceeded=False):
        """Monitor event for a single API call

        This event corresponds to a single client method call, which includes
        every HTTP request attempt made in order to complete the client call

        :type service: str
        :param service: A string identifying the service associated to
            the event

        :type operation: str
        :param operation: A string identifying the operation of service
            associated to the event

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds from when the event began

        :type latency: int
        :param latency: The time in milliseconds to complete the client call

        :type attempts: list
        :param attempts: The list of APICallAttempts associated to the
            APICall

        :type retries_exceeded: bool
        :param retries_exceeded: True if API call exceeded retries. False
            otherwise
        """
        super(APICallEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp)
        self.latency = latency
        self.attempts = attempts
        # A fresh list per instance avoids sharing a mutable default.
        if attempts is None:
            self.attempts = []
        self.retries_exceeded = retries_exceeded

    def new_api_call_attempt(self, timestamp):
        """Instantiates APICallAttemptEvent associated to the APICallEvent

        The new attempt is appended to this event's ``attempts`` list.

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds to associate to the
            APICallAttemptEvent

        :rtype: APICallAttemptEvent
        :returns: The newly created attempt event
        """
        attempt_event = APICallAttemptEvent(
            service=self.service,
            operation=self.operation,
            timestamp=timestamp
        )
        self.attempts.append(attempt_event)
        return attempt_event


class APICallAttemptEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp,
                 latency=None, url=None, http_status_code=None,
                 request_headers=None, response_headers=None,
                 parsed_error=None, wire_exception=None):
        """Monitor event for a single API call attempt

        This event corresponds to a single HTTP request attempt in completing
        the entire client method call.

        :type service: str
        :param service: A string identifying the service associated to
            the event

        :type operation: str
        :param operation: A string identifying the operation of service
            associated to the event

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds from when the HTTP request
            started

        :type latency: int
        :param latency: The time in milliseconds to complete the HTTP request
            whether it succeeded or failed

        :type url: str
        :param url: The URL the attempt was sent to

        :type http_status_code: int
        :param http_status_code: The HTTP status code of the HTTP response
            if there was a response

        :type request_headers: dict
        :param request_headers: The HTTP headers sent in making the HTTP
            request

        :type response_headers: dict
        :param response_headers: The HTTP headers returned in the HTTP response
            if there was a response

        :type parsed_error: dict
        :param parsed_error: The error parsed if the service returned an
            error back

        :type wire_exception: Exception
        :param wire_exception: The exception raised in sending the HTTP
            request (i.e. ConnectionError)
        """
        super(APICallAttemptEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp
        )
        self.latency = latency
        self.url = url
        self.http_status_code = http_status_code
        self.request_headers = request_headers
        self.response_headers = response_headers
        self.parsed_error = parsed_error
        self.wire_exception = wire_exception


class CSMSerializer(object):
    _MAX_CLIENT_ID_LENGTH = 255
    _MAX_EXCEPTION_CLASS_LENGTH = 128
    _MAX_ERROR_CODE_LENGTH = 128
    _MAX_USER_AGENT_LENGTH = 256
    _MAX_MESSAGE_LENGTH = 512
    # Response header name -> name of the entry in the serialized event.
    _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
        'x-amzn-requestid': 'XAmznRequestId',
        'x-amz-request-id': 'XAmzRequestId',
        'x-amz-id-2': 'XAmzId2',
    }
    # Patterns used to pull the access key (and, for 'v4', the signing
    # region) out of the Authorization request header.
    _AUTH_REGEXS = {
        'v4': re.compile(
            r'AWS4-HMAC-SHA256 '
            r'Credential=(?P<access_key>\w+)/\d+/'
            r'(?P<signing_region>[a-z0-9-]+)/'
        ),
        's3': re.compile(
            r'AWS (?P<access_key>\w+):'
        )
    }
    # Event attributes that have a matching ``_serialize_<name>`` method.
    _SERIALIZEABLE_EVENT_PROPERTIES = [
        'service',
        'operation',
        'timestamp',
        'attempts',
        'latency',
        'retries_exceeded',
        'url',
        'request_headers',
        'http_status_code',
        'response_headers',
        'parsed_error',
        'wire_exception',
    ]

    def __init__(self, csm_client_id):
        """Serializes monitor events to CSM (Client Side Monitoring) format

        :type csm_client_id: str
        :param csm_client_id: The application identifier to associate
            to the serialized events

        :raises ValueError: If ``csm_client_id`` is longer than
            ``_MAX_CLIENT_ID_LENGTH`` characters
        """
        self._validate_client_id(csm_client_id)
        self.csm_client_id = csm_client_id

    def _validate_client_id(self, csm_client_id):
        """Raise ValueError if the client id exceeds the maximum length"""
        if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
            raise ValueError(
                'The value provided for csm_client_id: %s exceeds the '
                'maximum length of %s characters' % (
                    csm_client_id, self._MAX_CLIENT_ID_LENGTH)
            )

    def serialize(self, event):
        """Serializes a monitor event to the CSM format

        :type event: BaseMonitorEvent
        :param event: The event to serialize to bytes

        :rtype: bytes
        :returns: The CSM serialized form of the event
        """
        event_dict = self._get_base_event_dict(event)
        event_type = self._get_event_type(event)
        event_dict['Type'] = event_type
        # Each attribute that is present and not None on the event is
        # handed to its ``_serialize_<attr>`` method, which adds entries
        # to event_dict.
        for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
            value = getattr(event, attr, None)
            if value is not None:
                getattr(self, '_serialize_' + attr)(
                    value, event_dict, event_type=event_type)
        return ensure_bytes(
            json.dumps(event_dict, separators=(',', ':')))

    def _get_base_event_dict(self, event):
        """Return the entries that are part of every serialized event"""
        return {
            'Version': 1,
            'ClientId': self.csm_client_id,
        }

    def _serialize_service(self, service, event_dict, **kwargs):
        event_dict['Service'] = service

    def _serialize_operation(self, operation, event_dict, **kwargs):
        event_dict['Api'] = operation

    def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
        event_dict['Timestamp'] = timestamp

    def _serialize_attempts(self, attempts, event_dict, **kwargs):
        event_dict['AttemptCount'] = len(attempts)
        if attempts:
            self._add_fields_from_last_attempt(event_dict, attempts[-1])

    def _add_fields_from_last_attempt(self, event_dict, last_attempt):
        """Copy details of the last attempt onto an ApiCall event dict"""
        if last_attempt.request_headers:
            # It does not matter which attempt to use to grab the region
            # for the ApiCall event, but SDKs typically do the last one.
            region = self._get_region(last_attempt.request_headers)
            if region is not None:
                event_dict['Region'] = region
            event_dict['UserAgent'] = self._get_user_agent(
                last_attempt.request_headers)
        if last_attempt.http_status_code is not None:
            event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
        if last_attempt.parsed_error is not None:
            self._serialize_parsed_error(
                last_attempt.parsed_error, event_dict, 'ApiCall')
        if last_attempt.wire_exception is not None:
            self._serialize_wire_exception(
                last_attempt.wire_exception, event_dict, 'ApiCall')

    def _serialize_latency(self, latency, event_dict, event_type):
        # The entry name depends on the type of event being serialized.
        if event_type == 'ApiCall':
            event_dict['Latency'] = latency
        elif event_type == 'ApiCallAttempt':
            event_dict['AttemptLatency'] = latency

    def _serialize_retries_exceeded(self, retries_exceeded, event_dict,
                                    **kwargs):
        event_dict['MaxRetriesExceeded'] = (1 if retries_exceeded else 0)

    def _serialize_url(self, url, event_dict, **kwargs):
        event_dict['Fqdn'] = urlparse(url).netloc

    def _serialize_request_headers(self, request_headers, event_dict,
                                   **kwargs):
        event_dict['UserAgent'] = self._get_user_agent(request_headers)
        if self._is_signed(request_headers):
            # The access key is omitted when the Authorization header
            # matches none of the known signature formats.
            access_key = self._get_access_key(request_headers)
            if access_key is not None:
                event_dict['AccessKey'] = access_key
        region = self._get_region(request_headers)
        if region is not None:
            event_dict['Region'] = region
        if 'X-Amz-Security-Token' in request_headers:
            event_dict['SessionToken'] = request_headers[
                'X-Amz-Security-Token']

    def _serialize_http_status_code(self, http_status_code, event_dict,
                                    **kwargs):
        event_dict['HttpStatusCode'] = http_status_code

    def _serialize_response_headers(self, response_headers, event_dict,
                                    **kwargs):
        for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
            if header in response_headers:
                event_dict[entry] = response_headers[header]

    def _serialize_parsed_error(self, parsed_error, event_dict, event_type,
                                **kwargs):
        # Entries on an ApiCall event are prefixed with 'Final'.
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        event_dict[field_prefix + 'AwsException'] = self._truncate(
            parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH)
        event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
            parsed_error['Message'], self._MAX_MESSAGE_LENGTH)

    def _serialize_wire_exception(self, wire_exception, event_dict, event_type,
                                  **kwargs):
        # Entries on an ApiCall event are prefixed with 'Final'.
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        event_dict[field_prefix + 'SdkException'] = self._truncate(
            wire_exception.__class__.__name__,
            self._MAX_EXCEPTION_CLASS_LENGTH)
        event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
            str(wire_exception), self._MAX_MESSAGE_LENGTH)

    def _get_event_type(self, event):
        """Return the CSM type name for the event, or None if unknown"""
        if isinstance(event, APICallEvent):
            return 'ApiCall'
        elif isinstance(event, APICallAttemptEvent):
            return 'ApiCallAttempt'
        return None

    def _get_access_key(self, request_headers):
        """Return the access key from the Authorization header

        Returns None when the header matches none of the known
        signature formats.
        """
        auth_val = self._get_auth_value(request_headers)
        _, auth_match = self._get_auth_match(auth_val)
        if auth_match is None:
            return None
        return auth_match.group('access_key')

    def _get_region(self, request_headers):
        """Return the signing region from a 'v4' Authorization header

        Returns None when the request is unsigned or the header does not
        match the 'v4' pattern.
        """
        if not self._is_signed(request_headers):
            return None
        auth_val = self._get_auth_value(request_headers)
        signature_version, auth_match = self._get_auth_match(auth_val)
        if signature_version != 'v4':
            return None
        return auth_match.group('signing_region')

    def _get_user_agent(self, request_headers):
        """Return the (possibly truncated) User-Agent header as text"""
        return self._truncate(
            ensure_unicode(request_headers.get('User-Agent', '')),
            self._MAX_USER_AGENT_LENGTH
        )

    def _is_signed(self, request_headers):
        return 'Authorization' in request_headers

    def _get_auth_value(self, request_headers):
        return ensure_unicode(request_headers['Authorization'])

    def _get_auth_match(self, auth_val):
        """Match the Authorization value against the known patterns

        :returns: A ``(signature_version, match)`` tuple for the first
            pattern that matches, or ``(None, None)`` if none does
        """
        for signature_version, regex in self._AUTH_REGEXS.items():
            match = regex.match(auth_val)
            if match:
                return signature_version, match
        return None, None

    def _truncate(self, text, max_length):
        """Return ``text`` cut down to at most ``max_length`` characters"""
        if len(text) > max_length:
            # Argument order follows the placeholders: length, then value.
            logger.debug(
                'Truncating following value to maximum length of '
                '%s: %s', max_length, text)
            return text[:max_length]
        return text


class SocketPublisher(object):
    _MAX_MONITOR_EVENT_LENGTH = 8 * 1024

    def __init__(self, socket, host, port, serializer):
        """Publishes monitor events to a socket

        :type socket: socket.socket
        :param socket: The socket object to use to publish events

        :type host: string
        :param host: The host to send events to

        :type port: integer
        :param port: The port on the host to send events to

        :param serializer: The serializer to use to serialize the event
            to a form that can be published to the socket. This must
            have a `serialize()` method that accepts a monitor event
            and return bytes
        """
        self._socket = socket
        self._address = (host, port)
        self._serializer = serializer

    def publish(self, event):
        """Publishes a specified monitor event

        Events whose serialized form is longer than
        ``_MAX_MONITOR_EVENT_LENGTH`` are logged at debug level and not sent.

        :type event: BaseMonitorEvent
        :param event: The monitor event to be sent
            over the publisher's socket to the desired address.
        """
        serialized_event = self._serializer.serialize(event)
        if len(serialized_event) > self._MAX_MONITOR_EVENT_LENGTH:
            logger.debug(
                'Serialized event of size %s exceeds the maximum length '
                'allowed: %s. Not sending event to socket.',
                len(serialized_event), self._MAX_MONITOR_EVENT_LENGTH
            )
            return
        self._socket.sendto(serialized_event, self._address)