1# Copyright 2016 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""report_request supports aggregation of ReportRequests.
16
It provides :class:`.Aggregator` that aggregates and batches together
ReportRequests.
19
20"""
21
22
23from __future__ import absolute_import
24
25import collections
26import functools
27import hashlib
28import logging
29import time
30from datetime import datetime, timedelta
31
32from apitools.base.py import encoding
33from enum import Enum
34from . import caches, label_descriptor, operation, messages
35from . import metric_descriptor, signing, timestamp
36
# Module-level logger; used by Aggregator.report() to record rejected requests.
logger = logging.getLogger(__name__)

# Sentinel meaning "size not available".  It is the default for
# Info.request_size and Info.response_size and is explicitly accepted by
# _validate_int_arg.
SIZE_NOT_SET = -1
40
41
def _validate_int_arg(name, value):
    """Checks that ``value`` is either ``SIZE_NOT_SET`` or a non-negative int.

    Args:
      name (string): the argument name, used in the error message
      value: the value to check

    Raises:
      ValueError: if ``value`` is neither the sentinel nor a non-negative int
    """
    # the 'not set' sentinel is always acceptable
    if value == SIZE_NOT_SET:
        return
    if not isinstance(value, int) or value < 0:
        raise ValueError('%s should be a non-negative int/long' % (name,))
46
47
def _validate_timedelta_arg(name, value):
    """Checks that ``value`` is ``None`` or a ``timedelta``.

    Args:
      name (string): the argument name, used in the error message
      value: the value to check

    Raises:
      ValueError: if ``value`` is neither ``None`` nor a ``timedelta``
    """
    if value is not None and not isinstance(value, timedelta):
        raise ValueError('%s should be a timedelta' % (name,))
52
53
class ReportingRules(collections.namedtuple('ReportingRules',
                                            ['logs', 'metrics', 'labels'])):
    """Holds information that determines how to fill a `ReportRequest`.

    Attributes:
      logs (set[string]): the name of logs to be included in the `ReportRequest`
      metrics (tuple[:class:`google.api.control.metric_descriptor.KnownMetrics`]):
        the metrics to be added to a `ReportRequest`
      labels (tuple[:class:`google.api.control.label_descriptor.KnownLabels`]):
        the labels to be added to a `ReportRequest`
    """
    # pylint: disable=too-few-public-methods

    def __new__(cls, logs=None, metrics=None, labels=None):
        """Invokes the base constructor with default values.

        Args:
          logs (iterable[string]): log names; stored as a ``set``
          metrics (iterable): known metrics; stored as a ``tuple``
          labels (iterable): known labels; stored as a ``tuple``
        """
        logs = set(logs) if logs else set()
        metrics = tuple(metrics) if metrics else tuple()
        labels = tuple(labels) if labels else tuple()
        # super() takes (defining class, cls); the reversed form only works
        # when cls is ReportingRules itself and raises TypeError for subclasses
        return super(ReportingRules, cls).__new__(cls, logs, metrics, labels)

    @classmethod
    def from_known_inputs(cls, logs=None, metric_names=None, label_names=None):
        """An alternate constructor that assumes known metrics and labels.

        This differs from the default constructor in that the metrics and labels
        are iterables of names of 'known' metrics and labels respectively. The
        names are used to obtain the metrics and labels from
        :class:`google.api.control.metric_descriptor.KnownMetrics` and
        :class:`google.api.control.label_descriptor.KnownLabels` respectively.

        names that don't correspond to a known metric or label are ignored; as
        are metrics or labels that don't yet have a way of updating the
        `ReportRequest` operation.

        Args:
          logs (iterable[string]): the name of logs to be included in the
            `ReportRequest`
          metric_names (iterable[string]): the name of a known metric to be
            added to the `ReportRequest`
          label_names (iterable[string]): the name of a known label to be added
            to the `ReportRequest`

        Returns:
          :class:`ReportingRules`: rules holding the matching known metrics
          and labels

        """
        # Materialise the names once: membership is tested for every known
        # label/metric, which would exhaust a one-shot iterator after the
        # first test and rescans a list each time.
        wanted_metrics = frozenset(metric_names or ())
        wanted_labels = frozenset(label_names or ())
        # pylint: disable=no-member
        # pylint is not aware of the __members__ attributes
        known_labels = [
            label
            for label in label_descriptor.KnownLabels.__members__.values()
            if label.update_label_func and label.label_name in wanted_labels
        ]
        known_metrics = [
            metric
            for metric in metric_descriptor.KnownMetrics.__members__.values()
            if metric.update_op_func and metric.metric_name in wanted_metrics
        ]
        return cls(logs=logs, metrics=known_metrics, labels=known_labels)
112
113
class ReportedProtocols(Enum):
    """Enumerates the protocols that can be reported.

    ``Info`` requires its ``protocol`` field to be a member of this enum and
    uses ``UNKNOWN`` as the default.
    """
    # pylint: disable=too-few-public-methods
    UNKNOWN = 0
    HTTP = 1
    HTTP2 = 2
    GRPC = 3
121
122
class ReportedPlatforms(Enum):
    """Enumerates the platforms that can be reported.

    ``Info`` requires its ``platform`` field to be a member of this enum and
    uses ``UNKNOWN`` as the default.
    """
    # pylint: disable=too-few-public-methods
    UNKNOWN = 0
    GAE_FLEX = 1
    GAE_STANDARD = 2
    GCE = 3
    GKE = 4
    DEVELOPMENT = 5
132
133
class ErrorCause(Enum):
    """Enumerates the causes of errors.

    ``Info`` requires its ``error_cause`` field to be a member of this enum;
    the member's name is written to the log entry payload as ``error_cause``
    when the response code is 400 or greater.
    """
    # pylint: disable=too-few-public-methods
    internal = 0  # default, error in scc library code
    application = 1  # external application error
    auth = 2  # authentication error
    service_control = 3  # error in service control check
141
142
# Short alias for the LogEntry severity enum; Info._as_log_entry uses it to
# choose between INFO and ERROR.
_SEVERITY = messages.LogEntry.SeverityValueValuesEnum
145
146
def _struct_payload_from(a_dict):
    """Converts ``a_dict`` into a ``LogEntry.StructPayloadValue`` message.

    Args:
      a_dict (dict): the python value to convert

    Returns:
      the result of ``encoding.PyValueToMessage`` for the struct payload type
    """
    payload_type = messages.LogEntry.StructPayloadValue
    return encoding.PyValueToMessage(payload_type, a_dict)
149
150
class Info(
        collections.namedtuple(
            'Info', (
                'api_name',
                'api_method',
                'api_version',
                'auth_issuer',
                'auth_audience',
                'backend_time',
                'error_cause',
                'location',
                'log_message',
                'method',
                'overhead_time',
                'platform',
                'producer_project_id',
                'protocol',
                'request_size',
                'request_time',
                'response_code',
                'response_size',
                'url',
            ) + operation.Info._fields),
        operation.Info):
    """Holds the information necessary to fill in a ReportRequest.

    In the attribute descriptions below, N/A means 'not available'

    Attributes:
       api_name (string): the api name and version
       api_method (string): the full api method name
       api_version (string): the api version
       auth_issuer (string): the auth issuer
       auth_audience (string): the auth audience
       backend_time(datetime.timedelta): the backend request time, None for N/A
       error_cause(:class:`ErrorCause`): the cause of error if one has occurred
       location (string): the location of the service
       log_message (string): a message to log as an info log
       method (string): the HTTP method used to make the request
       overhead_time(datetime.timedelta): the overhead time, None for N/A
       platform (:class:`ReportedPlatforms`): the platform in use
       producer_project_id (string): the producer project id
       protocol (:class:`ReportedProtocols`): the protocol used
       request_size(int): the request size in bytes, -1 means N/A
       request_time(datetime.timedelta): the request time, None for N/A
       response_size(int): the response size in bytes, -1 means N/A
       response_code(int): the code of the http response
       url (string): the request url

    The remaining fields are those of :class:`operation.Info`.

    """
    # pylint: disable=too-many-arguments,too-many-locals

    # Attributes copied verbatim into the log entry payload when truthy; names
    # not present on the instance are skipped (see _as_log_entry).
    COPYABLE_LOG_FIELDS = [
        'api_name',
        'api_method',
        'api_key',
        'producer_project_id',
        'referer',
        'location',
        'log_message',
        'url',
    ]

    def __new__(cls,
                api_name='',
                api_method='',
                api_version='',
                auth_issuer='',
                auth_audience='',
                backend_time=None,
                error_cause=ErrorCause.internal,
                location='',
                log_message='',
                method='',
                overhead_time=None,
                platform=ReportedPlatforms.UNKNOWN,
                producer_project_id='',
                protocol=ReportedProtocols.UNKNOWN,
                request_size=SIZE_NOT_SET,
                request_time=None,
                response_size=SIZE_NOT_SET,
                response_code=200,
                url='',
                **kw):
        """Invokes the base constructor with default values.

        Keyword arguments that are not named here are passed to
        :class:`operation.Info`, and its resulting fields are stored on the
        new instance.

        Raises:
          ValueError: if a time argument is not a ``timedelta`` (or None), a
            size argument is not a non-negative int (or ``SIZE_NOT_SET``), or
            ``protocol``, ``platform`` or ``error_cause`` is not a member of
            its enum
        """
        op_info = operation.Info(**kw)
        _validate_timedelta_arg('backend_time', backend_time)
        _validate_timedelta_arg('overhead_time', overhead_time)
        _validate_timedelta_arg('request_time', request_time)
        _validate_int_arg('request_size', request_size)
        _validate_int_arg('response_size', response_size)
        if not isinstance(protocol, ReportedProtocols):
            raise ValueError('protocol should be a %s' % (ReportedProtocols,))
        if not isinstance(platform, ReportedPlatforms):
            raise ValueError('platform should be a %s' % (ReportedPlatforms,))
        if not isinstance(error_cause, ErrorCause):
            raise ValueError('error_cause should be a %s' % (ErrorCause,))
        # super() takes (defining class, cls); the reversed form raises
        # TypeError as soon as Info is subclassed.
        #
        # The positional values follow the namedtuple field order, in which
        # response_code precedes response_size (unlike the signature above).
        return super(Info, cls).__new__(
            cls,
            api_name,
            api_method,
            api_version,
            auth_issuer,
            auth_audience,
            backend_time,
            error_cause,
            location,
            log_message,
            method,
            overhead_time,
            platform,
            producer_project_id,
            protocol,
            request_size,
            request_time,
            response_code,
            response_size,
            url,
            **op_info._asdict())

    def _as_log_entry(self, name, now):
        """Makes a `LogEntry` from this instance for the given log_name.

        Args:
          name (string): the name of the log the entry belongs to
          now (:class:`datetime.datetime`): the current time

        Return:
          a ``LogEntry`` generated from this instance with the given name
          and timestamp

        """
        # initialize the struct with fields that are always present
        #
        # NOTE(review): time.mktime interprets the time tuple as local time,
        # while the default timer of as_report_request (datetime.utcnow)
        # yields a naive UTC value; confirm the intended epoch value.
        d = {
            'http_response_code': self.response_code,
            'timestamp': time.mktime(now.timetuple())
        }

        # compute the severity
        severity = _SEVERITY.INFO
        if self.response_code >= 400:
            severity = _SEVERITY.ERROR
            d['error_cause'] = self.error_cause.name

        # add 'optional' fields to the struct; sizes of 0 and the
        # SIZE_NOT_SET sentinel are both left out
        if self.request_size > 0:
            d['request_size'] = self.request_size
        if self.response_size > 0:
            d['response_size'] = self.response_size
        if self.method:
            d['http_method'] = self.method
        if self.request_time:
            d['request_latency_in_ms'] = self.request_time.total_seconds() * 1000

        # add 'copyable' fields to the struct, skipping missing/empty ones
        for key in self.COPYABLE_LOG_FIELDS:
            value = getattr(self, key, None)
            if value:
                d[key] = value

        return messages.LogEntry(
            name=name,
            timestamp=timestamp.to_rfc3339(now),
            severity=severity,
            structPayload=_struct_payload_from(d))

    def as_report_request(self, rules, timer=datetime.utcnow):
        """Makes a `ServicecontrolServicesReportRequest` from this instance

        Args:
          rules (:class:`ReportingRules`): determines what labels, metrics and
            logs to include in the report request.
          timer: a function that determines the current time

        Return:
          a ``ServicecontrolServicesReportRequest`` generated from this instance
          governed by the provided ``rules``

        Raises:
          ValueError: if the fields in this instance cannot be used to create
            a valid ``ServicecontrolServicesReportRequest``

        """
        if not self.service_name:
            raise ValueError('the service name must be set')
        op = super(Info, self).as_operation(timer=timer)

        # Populate metrics and labels if they can be associated with a
        # method/operation
        if op.operationId and op.operationName:
            labels = {}
            for known_label in rules.labels:
                known_label.do_labels_update(self, labels)
            if labels:
                op.labels = encoding.PyValueToMessage(
                    messages.Operation.LabelsValue,
                    labels)
            for known_metric in rules.metrics:
                known_metric.do_operation_update(self, op)

        # Populate the log entries; all of them share a single timestamp
        now = timer()
        op.logEntries = [self._as_log_entry(log_name, now)
                         for log_name in rules.logs]

        return messages.ServicecontrolServicesReportRequest(
            serviceName=self.service_name,
            reportRequest=messages.ReportRequest(operations=[op]))
363
364
# Shared empty result returned by Aggregator.flush() and Aggregator.clear()
# when the aggregator has no cache.
_NO_RESULTS = tuple()
366
367
class Aggregator(object):
    """Aggregates Service Control Report requests.

    :func:`report` determines if a `ReportRequest` should be sent to the
    service immediately

    """

    CACHED_OK = object()
    """A sentinel returned by :func:`report` when a request is cached OK."""

    MAX_OPERATION_COUNT = 1000
    """The maximum number of operations to send in a report request."""

    def __init__(self, service_name, options, kinds=None,
                 timer=datetime.utcnow):
        """
        Constructor

        Args:
          service_name (string): name of the service being aggregated
          options (:class:`google.api.caches.ReportOptions`): configures the behavior
            of this aggregator
          kinds (dict[string, [:class:`.MetricKind`]]): describes the
            type of metrics used during aggregation
          timer (function([[datetime]]): a function that returns the current
            time as a datetime instance

        """
        self._cache = caches.create(options, timer=timer)
        self._options = options
        self._kinds = kinds
        self._service_name = service_name

    @property
    def flush_interval(self):
        """The interval between calls to flush.

        Returns:
           timedelta: the period between calls to flush, or ``None`` if no
           cache is set

        """
        return None if self._cache is None else self._options.flush_interval

    @property
    def service_name(self):
        """The service to which all requests being aggregated should belong."""
        return self._service_name

    def flush(self):
        """Flushes this instance's cache.

        The driver of this instance should call this method every
        `flush_interval`.

        Returns:
          list[``ServicecontrolServicesReportRequest``]: corresponding to the
            pending cached operations, each holding at most
            ``MAX_OPERATION_COUNT`` operations; an empty tuple if no cache
            is set

        """
        if self._cache is None:
            return _NO_RESULTS
        with self._cache as c:
            # snapshot the pending aggregates before emptying the deque
            flushed_ops = [x.as_operation() for x in list(c.out_deque)]
            c.out_deque.clear()
            reqs = []
            max_ops = self.MAX_OPERATION_COUNT
            # split the operations into batches of at most max_ops
            for start in range(0, len(flushed_ops), max_ops):
                report_request = messages.ReportRequest(
                    operations=flushed_ops[start:start + max_ops])
                reqs.append(
                    messages.ServicecontrolServicesReportRequest(
                        serviceName=self.service_name,
                        reportRequest=report_request))

            return reqs

    def clear(self):
        """Clears the cache.

        Returns:
          list[``Operation``]: the operations of the aggregates that were in
            the cache when it was cleared; an empty tuple if no cache is set

        """
        if self._cache is None:
            return _NO_RESULTS
        with self._cache as k:
            res = [x.as_operation() for x in k.values()]
            k.clear()
            k.out_deque.clear()
            return res

    def report(self, req):
        """Adds a report request to the cache.

        Returns ``None`` if it could not be aggregated, and callers need to
        send the request to the server, otherwise it returns ``CACHED_OK``.

        Args:
           req (:class:`messages.ServicecontrolServicesReportRequest`): the
             request to be aggregated

        Returns:
           ``None`` if the request was not cached, otherwise ``CACHED_OK``

        Raises:
           ValueError: if ``req`` has the wrong type, names a different
             service, or has no ``reportRequest``

        """
        if self._cache is None:
            return None  # no cache, send request now
        if not isinstance(req, messages.ServicecontrolServicesReportRequest):
            raise ValueError('Invalid request')
        if req.serviceName != self.service_name:
            logger.error('bad report(): service_name %s does not match ours %s',
                         req.serviceName, self.service_name)
            raise ValueError('Service name mismatch')
        report_req = req.reportRequest
        if report_req is None:
            logger.error('bad report(): no report_request in %s', req)
            raise ValueError('Expected report_request not set')
        if _has_high_important_operation(report_req):
            return None

        # Sign every operation before taking the lock.  Operations are kept
        # as a list of pairs, not a dict keyed by signature, so operations
        # in the same request that share a signature are all aggregated
        # instead of all but the last being silently dropped.
        signed_ops = [(_sign_operation(op), op)
                      for op in report_req.operations]

        # Concurrency:
        #
        # This holds a lock on the cache while updating it.  No i/o operations
        # are performed, so any waiting threads see minimal delays
        with self._cache as cache:
            for key, op in signed_ops:
                agg = cache.get(key)
                if agg is None:
                    cache[key] = operation.Aggregator(op, self._kinds)
                else:
                    agg.add(op)

        return self.CACHED_OK
501
502
def _has_high_important_operation(req):
    """Determines if any operation in ``req`` is not of LOW importance.

    Args:
      req: a report request whose ``operations`` are examined

    Returns:
      bool: True if at least one operation has an importance other than
        ``LOW``; False otherwise, including when there are no operations
    """
    def is_important(op):
        return (op.importance !=
                messages.Operation.ImportanceValueValuesEnum.LOW)

    # 'has' means at least one: the previous and-fold only returned True when
    # every operation was important, and returned True for an empty list.
    return any(is_important(op) for op in req.operations)
510
511
def _key_by_signature(operations, signature_func):
    """Creates a dictionary of operations keyed by signature

    Args:
      operations (iterable[Operations]): the input operations
      signature_func (function): computes the signature of an operation

    Returns:
       dict: maps each signature to an operation; when several operations
         share a signature, the last one in iteration order is kept
    """
    keyed = {}
    for an_op in operations:
        keyed[signature_func(an_op)] = an_op
    return keyed
522
523
def _sign_operation(op):
    """Obtains a signature for an operation in a ReportRequest.

    The signature is the MD5 digest of the consumer id, a NUL separator, the
    operation name and, when present, the operation's labels.

    Args:
       op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an
         operation used in a `ReportRequest`

    Returns:
       bytes: the digest identifying operations that can be aggregated
         together
    """
    def as_bytes(text):
        # hashlib only accepts bytes: byte strings pass through unchanged and
        # text is encoded as UTF-8
        return text if isinstance(text, bytes) else text.encode('utf-8')

    md5 = hashlib.md5()
    md5.update(as_bytes(op.consumerId))
    md5.update(b'\x00')
    md5.update(as_bytes(op.operationName))
    if op.labels:
        signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels))
    return md5.digest()
541