1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7#     http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import json
14import logging
15import re
16import time
17
18from botocore.compat import ensure_unicode, ensure_bytes, urlparse
19from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
20
21
22logger = logging.getLogger(__name__)
23
24
class Monitor(object):
    # Emitter events the monitor listens to; register() hooks capture() onto
    # each of them.
    _EVENTS_TO_REGISTER = [
        'before-parameter-build',
        'request-created',
        'response-received',
        'after-call',
        'after-call-error',
    ]

    def __init__(self, adapter, publisher):
        """Monitor the API calls made by a client.

        :param adapter: Object whose ``feed(event_name, payload)`` turns an
            event emitter event into a monitor event (or a falsy value when
            there is nothing to publish)

        :param publisher: Object whose ``publish(event)`` sends out the
            monitor events produced by the adapter
        """
        self._adapter = adapter
        self._publisher = publisher

    def register(self, event_emitter):
        """Attach :meth:`capture` to every monitored event of an emitter.

        Registration uses ``register_last`` (presumably so the monitor runs
        after the emitter's other handlers for the same event).
        """
        for name in self._EVENTS_TO_REGISTER:
            event_emitter.register_last(name, self.capture)

    def capture(self, event_name, **payload):
        """Adapt one emitter event and publish the result, if any.

        The event name and keyword payload are handed to the adapter; a
        truthy result is passed to the publisher. Any exception raised along
        the way is logged at debug level and not re-raised.
        """
        try:
            monitor_event = self._adapter.feed(event_name, payload)
            if not monitor_event:
                return
            self._publisher.publish(monitor_event)
        except Exception as e:
            logger.debug(
                'Exception %s raised by client monitor in handling event %s',
                e, event_name, exc_info=True)
64
65
class MonitorEventAdapter(object):
    def __init__(self, time=time.time):
        """Translate event emitter events into monitor events.

        :type time: callable
        :param time: Zero-argument callable returning the current time in
            seconds; it is scaled to integer milliseconds internally
        """
        self._time = time

    def feed(self, emitter_event_name, emitter_payload):
        """Route an emitter event to its handler and return the result.

        :type emitter_event_name: str
        :param emitter_event_name: Name of the emitted event; only the part
            before the first '.' selects the handler

        :type emitter_payload: dict
        :param emitter_payload: Keyword arguments the event was emitted with

        :rtype: BaseMonitorEvent
        :returns: The monitor event produced by the handler, or None when
            the handler only records state for later events
        """
        handler = self._get_handler(emitter_event_name)
        return handler(**emitter_payload)

    def _get_handler(self, event_name):
        # 'after-call-error.<anything>' -> self._handle_after_call_error.
        # An unknown prefix raises AttributeError from getattr.
        base_name = event_name.partition('.')[0]
        return getattr(self, '_handle_%s' % base_name.replace('-', '_'))

    def _handle_before_parameter_build(self, model, context, **kwargs):
        # Open a new API call event and stash it in the request context;
        # nothing is returned, so nothing is published yet.
        call_event = APICallEvent(
            service=model.service_model.service_id,
            operation=model.wire_name,
            timestamp=self._get_current_time(),
        )
        context['current_api_call_event'] = call_event

    def _handle_request_created(self, request, **kwargs):
        # Each created request starts a new attempt on the in-flight call.
        context = request.context
        call_event = context['current_api_call_event']
        attempt = call_event.new_api_call_attempt(
            timestamp=self._get_current_time())
        attempt.request_headers = request.headers
        attempt.url = request.url
        context['current_api_call_attempt_event'] = attempt

    def _handle_response_received(self, parsed_response, context, exception,
                                  **kwargs):
        # Close out the current attempt and return it for publishing.
        attempt = context.pop('current_api_call_attempt_event')
        attempt.latency = self._get_latency(attempt)
        if parsed_response is None:
            # Without a parsed response, record the exception instead.
            attempt.wire_exception = exception
            return attempt
        metadata = parsed_response['ResponseMetadata']
        attempt.http_status_code = metadata['HTTPStatusCode']
        attempt.response_headers = metadata['HTTPHeaders']
        attempt.parsed_error = parsed_response.get('Error')
        return attempt

    def _handle_after_call(self, context, parsed, **kwargs):
        metadata = parsed['ResponseMetadata']
        call_event = context['current_api_call_event']
        call_event.retries_exceeded = metadata.get('MaxAttemptsReached', False)
        return self._complete_api_call(context)

    def _handle_after_call_error(self, context, exception, **kwargs):
        # after-call-error fires only once retrying is over, so a retryable
        # connection error reaching this point means retries ran out.
        exceeded = self._is_retryable_exception(exception)
        context['current_api_call_event'].retries_exceeded = exceeded
        return self._complete_api_call(context)

    def _is_retryable_exception(self, exception):
        connection_errors = tuple(
            RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
        return isinstance(exception, connection_errors)

    def _complete_api_call(self, context):
        # Remove the finished call from the context and stamp its latency.
        finished = context.pop('current_api_call_event')
        finished.latency = self._get_latency(finished)
        return finished

    def _get_latency(self, event):
        now = self._get_current_time()
        return now - event.timestamp

    def _get_current_time(self):
        seconds = self._time()
        return int(seconds * 1000)
154
155
class BaseMonitorEvent(object):
    def __init__(self, service, operation, timestamp):
        """Base monitor event

        :type service: str
        :param service: A string identifying the service associated to
            the event

        :type operation: str
        :param operation: A string identifying the operation of service
            associated to the event

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds from when the event began
        """
        self.service = service
        self.operation = operation
        self.timestamp = timestamp

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.__dict__)

    def __eq__(self, other):
        # Events are equal when ``other`` is an instance of this event's
        # class and every instance attribute matches.
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        return False

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # ``a != b`` would fall back to identity there and disagree with
        # ``==``. On Python 3 this matches the default behavior.
        return not self.__eq__(other)
182
183
class APICallEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp, latency=None,
                 attempts=None, retries_exceeded=False):
        """Monitor event describing one complete client method call

        The call covers every HTTP request attempt issued while trying to
        complete that client method call.

        :type service: str
        :param service: Identifier of the service the call targets

        :type operation: str
        :param operation: Identifier of the service operation being called

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds at which the call began

        :type latency: int
        :param latency: Milliseconds taken to complete the client call

        :type attempts: list
        :param attempts: APICallAttemptEvents belonging to this call; a new
            empty list is used when omitted

        :type retries_exceeded: bool
        :param retries_exceeded: Whether the call ran out of retries
        """
        super(APICallEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp)
        self.latency = latency
        self.attempts = [] if attempts is None else attempts
        self.retries_exceeded = retries_exceeded

    def new_api_call_attempt(self, timestamp):
        """Create an attempt event for this call and record it

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds for the new
            APICallAttemptEvent

        :returns: The newly created APICallAttemptEvent, which is also
            appended to ``self.attempts``
        """
        attempt = APICallAttemptEvent(
            service=self.service, operation=self.operation,
            timestamp=timestamp)
        self.attempts.append(attempt)
        return attempt
236
237
class APICallAttemptEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp,
                 latency=None, url=None, http_status_code=None,
                 request_headers=None, response_headers=None,
                 parsed_error=None, wire_exception=None):
        """Monitor event describing one HTTP request attempt

        An attempt is a single HTTP request issued while completing a
        client method call.

        :type service: str
        :param service: Identifier of the service the attempt targets

        :type operation: str
        :param operation: Identifier of the service operation being called

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds at which the HTTP
            request started

        :type latency: int
        :param latency: Milliseconds the HTTP request took, regardless of
            whether it succeeded

        :type url: str
        :param url: URL the request was sent to

        :type http_status_code: int
        :param http_status_code: Status code of the HTTP response, when one
            was received

        :type request_headers: dict
        :param request_headers: Headers sent with the HTTP request

        :type response_headers: dict
        :param response_headers: Headers of the HTTP response, when one was
            received

        :type parsed_error: dict
        :param parsed_error: Error parsed from the service response, when
            the service returned one

        :type wire_exception: Exception
        :param wire_exception: Exception raised while sending the HTTP
            request (i.e. ConnectionError)
        """
        super(APICallAttemptEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp)
        # Assignment order is kept stable because __repr__ prints __dict__.
        self.latency = latency
        self.url = url
        self.http_status_code = http_status_code
        self.request_headers = request_headers
        self.response_headers = response_headers
        self.parsed_error = parsed_error
        self.wire_exception = wire_exception
297
298
class CSMSerializer(object):
    """Serialize monitor events into Client Side Monitoring (CSM) JSON."""

    # Maximum lengths enforced on individual fields of a serialized event.
    _MAX_CLIENT_ID_LENGTH = 255
    _MAX_EXCEPTION_CLASS_LENGTH = 128
    _MAX_ERROR_CODE_LENGTH = 128
    _MAX_USER_AGENT_LENGTH = 256
    _MAX_MESSAGE_LENGTH = 512
    # Response headers copied into the event, mapped to their event entry.
    _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
        'x-amzn-requestid': 'XAmznRequestId',
        'x-amz-request-id': 'XAmzRequestId',
        'x-amz-id-2': 'XAmzId2',
    }
    # Patterns used to pull the access key (and, for 'v4', the signing
    # region) out of the Authorization request header.
    _AUTH_REGEXS = {
        'v4': re.compile(
            r'AWS4-HMAC-SHA256 '
            r'Credential=(?P<access_key>\w+)/\d+/'
            r'(?P<signing_region>[a-z0-9-]+)/'
        ),
        's3': re.compile(
            r'AWS (?P<access_key>\w+):'
        )
    }
    # Event attributes visited by serialize(); each has a matching
    # ``_serialize_<name>`` method.
    _SERIALIZEABLE_EVENT_PROPERTIES = [
        'service',
        'operation',
        'timestamp',
        'attempts',
        'latency',
        'retries_exceeded',
        'url',
        'request_headers',
        'http_status_code',
        'response_headers',
        'parsed_error',
        'wire_exception',
    ]

    def __init__(self, csm_client_id):
        """Serializes monitor events to CSM (Client Side Monitoring) format

        :type csm_client_id: str
        :param csm_client_id: The application identifier to associate
            to the serialized events

        :raises ValueError: If ``csm_client_id`` is longer than
            ``_MAX_CLIENT_ID_LENGTH`` characters
        """
        self._validate_client_id(csm_client_id)
        self.csm_client_id = csm_client_id

    def _validate_client_id(self, csm_client_id):
        if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
            raise ValueError(
                'The value provided for csm_client_id: %s exceeds the '
                'maximum length of %s characters' % (
                    csm_client_id, self._MAX_CLIENT_ID_LENGTH)
            )

    def serialize(self, event):
        """Serializes a monitor event to the CSM format

        :type event: BaseMonitorEvent
        :param event: The event to serialize to bytes

        :rtype: bytes
        :returns: The CSM serialized form of the event, as compact JSON
        """
        event_dict = self._get_base_event_dict(event)
        event_type = self._get_event_type(event)
        event_dict['Type'] = event_type
        for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
            value = getattr(event, attr, None)
            # Attributes that are missing or None are left out entirely.
            if value is not None:
                getattr(self, '_serialize_' + attr)(
                    value, event_dict, event_type=event_type)
        return ensure_bytes(
            json.dumps(event_dict, separators=(',', ':')))

    def _get_base_event_dict(self, event):
        return {
            'Version': 1,
            'ClientId': self.csm_client_id,
        }

    def _serialize_service(self, service, event_dict, **kwargs):
        event_dict['Service'] = service

    def _serialize_operation(self, operation, event_dict, **kwargs):
        event_dict['Api'] = operation

    def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
        event_dict['Timestamp'] = timestamp

    def _serialize_attempts(self, attempts, event_dict, **kwargs):
        event_dict['AttemptCount'] = len(attempts)
        if attempts:
            self._add_fields_from_last_attempt(event_dict, attempts[-1])

    def _add_fields_from_last_attempt(self, event_dict, last_attempt):
        if last_attempt.request_headers:
            # It does not matter which attempt to use to grab the region
            # for the ApiCall event, but SDKs typically do the last one.
            region = self._get_region(last_attempt.request_headers)
            if region is not None:
                event_dict['Region'] = region
            event_dict['UserAgent'] = self._get_user_agent(
                last_attempt.request_headers)
        if last_attempt.http_status_code is not None:
            event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
        if last_attempt.parsed_error is not None:
            self._serialize_parsed_error(
                last_attempt.parsed_error, event_dict, 'ApiCall')
        if last_attempt.wire_exception is not None:
            self._serialize_wire_exception(
                last_attempt.wire_exception, event_dict, 'ApiCall')

    def _serialize_latency(self, latency, event_dict, event_type):
        if event_type == 'ApiCall':
            event_dict['Latency'] = latency
        elif event_type == 'ApiCallAttempt':
            event_dict['AttemptLatency'] = latency

    def _serialize_retries_exceeded(self, retries_exceeded, event_dict,
                                    **kwargs):
        event_dict['MaxRetriesExceeded'] = (1 if retries_exceeded else 0)

    def _serialize_url(self, url, event_dict, **kwargs):
        event_dict['Fqdn'] = urlparse(url).netloc

    def _serialize_request_headers(self, request_headers, event_dict,
                                   **kwargs):
        event_dict['UserAgent'] = self._get_user_agent(request_headers)
        if self._is_signed(request_headers):
            access_key = self._get_access_key(request_headers)
            # An Authorization value that none of _AUTH_REGEXS recognizes
            # yields no access key; omit the field instead of failing the
            # serialization of the whole event.
            if access_key is not None:
                event_dict['AccessKey'] = access_key
        region = self._get_region(request_headers)
        if region is not None:
            event_dict['Region'] = region
        if 'X-Amz-Security-Token' in request_headers:
            event_dict['SessionToken'] = request_headers[
                'X-Amz-Security-Token']

    def _serialize_http_status_code(self, http_status_code, event_dict,
                                    **kwargs):
        event_dict['HttpStatusCode'] = http_status_code

    def _serialize_response_headers(self, response_headers, event_dict,
                                    **kwargs):
        for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
            if header in response_headers:
                event_dict[entry] = response_headers[header]

    def _serialize_parsed_error(self, parsed_error, event_dict, event_type,
                                **kwargs):
        # ApiCall events report the error of the final attempt, hence the
        # 'Final' prefix on their field names.
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        # A parsed error may lack 'Code' or 'Message'; emit only what is
        # present rather than raising KeyError and dropping the event.
        code = parsed_error.get('Code')
        if code is not None:
            event_dict[field_prefix + 'AwsException'] = self._truncate(
                code, self._MAX_ERROR_CODE_LENGTH)
        message = parsed_error.get('Message')
        if message is not None:
            event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
                message, self._MAX_MESSAGE_LENGTH)

    def _serialize_wire_exception(self, wire_exception, event_dict, event_type,
                                  **kwargs):
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        event_dict[field_prefix + 'SdkException'] = self._truncate(
            wire_exception.__class__.__name__,
            self._MAX_EXCEPTION_CLASS_LENGTH)
        event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
            str(wire_exception), self._MAX_MESSAGE_LENGTH)

    def _get_event_type(self, event):
        # Returns None for any other event type.
        if isinstance(event, APICallEvent):
            return 'ApiCall'
        elif isinstance(event, APICallAttemptEvent):
            return 'ApiCallAttempt'

    def _get_access_key(self, request_headers):
        """Return the access key from the Authorization header.

        :returns: The access key, or None when the header matches none of
            the known signature formats in ``_AUTH_REGEXS``
        """
        auth_val = self._get_auth_value(request_headers)
        _, auth_match = self._get_auth_match(auth_val)
        if auth_match is None:
            return None
        return auth_match.group('access_key')

    def _get_region(self, request_headers):
        # Only SigV4 Authorization headers carry a signing region.
        if not self._is_signed(request_headers):
            return None
        auth_val = self._get_auth_value(request_headers)
        signature_version, auth_match = self._get_auth_match(auth_val)
        if signature_version != 'v4':
            return None
        return auth_match.group('signing_region')

    def _get_user_agent(self, request_headers):
        return self._truncate(
            ensure_unicode(request_headers.get('User-Agent', '')),
            self._MAX_USER_AGENT_LENGTH
        )

    def _is_signed(self, request_headers):
        return 'Authorization' in request_headers

    def _get_auth_value(self, request_headers):
        return ensure_unicode(request_headers['Authorization'])

    def _get_auth_match(self, auth_val):
        # Returns (signature_version, match) for the first matching pattern,
        # or (None, None) when no pattern matches.
        for signature_version, regex in self._AUTH_REGEXS.items():
            match = regex.match(auth_val)
            if match:
                return signature_version, match
        return None, None

    def _truncate(self, text, max_length):
        """Return ``text`` cut to at most ``max_length`` characters."""
        if len(text) > max_length:
            # Arguments follow the order of the placeholders in the message.
            logger.debug(
                'Truncating following value to maximum length of '
                '%s: %s', max_length, text)
            return text[:max_length]
        return text
509
510
class SocketPublisher(object):
    # Serialized events larger than this many bytes are dropped, not sent.
    _MAX_MONITOR_EVENT_LENGTH = 8 * 1024

    def __init__(self, socket, host, port, serializer):
        """Send monitor events over a socket

        :type socket: socket.socket
        :param socket: Socket used to send events via ``sendto``

        :type host: string
        :param host: Destination host for events

        :type port: integer
        :param port: Destination port on that host

        :param serializer: Object whose ``serialize()`` method turns a
            monitor event into the bytes written to the socket
        """
        self._socket = socket
        self._address = (host, port)
        self._serializer = serializer

    def publish(self, event):
        """Serialize one monitor event and send it to the configured address

        Events whose serialized size exceeds ``_MAX_MONITOR_EVENT_LENGTH``
        are logged at debug level and not sent.

        :type event: BaseMonitorEvent
        :param event: The monitor event to send
        """
        payload = self._serializer.serialize(event)
        payload_size = len(payload)
        if payload_size <= self._MAX_MONITOR_EVENT_LENGTH:
            self._socket.sendto(payload, self._address)
            return
        logger.debug(
            'Serialized event of size %s exceeds the maximum length '
            'allowed: %s. Not sending event to socket.',
            payload_size, self._MAX_MONITOR_EVENT_LENGTH
        )
551