1# Copyright 2016 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""report_request supports aggregation of ReportRequests. 16 17It proves :class:`.Aggregator` that aggregates and batches together 18ReportRequests. 19 20""" 21 22 23from __future__ import absolute_import 24 25import collections 26import functools 27import hashlib 28import logging 29import time 30from datetime import datetime, timedelta 31 32from apitools.base.py import encoding 33from enum import Enum 34from . import caches, label_descriptor, operation, messages 35from . import metric_descriptor, signing, timestamp 36 37logger = logging.getLogger(__name__) 38 39SIZE_NOT_SET = -1 40 41 42def _validate_int_arg(name, value): 43 if value == SIZE_NOT_SET or (isinstance(value, int) and value >= 0): 44 return 45 raise ValueError('%s should be a non-negative int/long' % (name,)) 46 47 48def _validate_timedelta_arg(name, value): 49 if value is None or isinstance(value, timedelta): 50 return 51 raise ValueError('%s should be a timedelta' % (name,)) 52 53 54class ReportingRules(collections.namedtuple('ReportingRules', 55 ['logs', 'metrics', 'labels'])): 56 """Holds information that determines how to fill a `ReportRequest`. 57 58 Attributes: 59 logs (iterable[string]): the name of logs to be included in the `ReportRequest` 60 metrics (iterable[:class:`google.api.control.metric_descriptor.KnownMetrics`]): 61 the metrics to be added to a `ReportRequest` 62 labels (iterable[:class:`google.api.control.metric_descriptor.KnownLabels`]): 63 the labels to be added to a `ReportRequest` 64 """ 65 # pylint: disable=too-few-public-methods 66 67 def __new__(cls, logs=None, metrics=None, labels=None): 68 """Invokes the base constructor with default values.""" 69 logs = set(logs) if logs else set() 70 metrics = tuple(metrics) if metrics else tuple() 71 labels = tuple(labels) if labels else tuple() 72 return super(cls, ReportingRules).__new__(cls, logs, metrics, labels) 73 74 @classmethod 75 def from_known_inputs(cls, logs=None, metric_names=None, label_names=None): 76 """An alternate constructor that assumes known metrics and labels. 77 78 This differs from the default constructor in that the metrics and labels 79 are iterables of names of 'known' metrics and labels respectively. The 80 names are used to obtain the metrics and labels from 81 :class:`google.api.control.metric_descriptor.KnownMetrics` and 82 :class:`google.api.control.label_descriptor.KnownLabels` respectively. 83 84 names that don't correspond to a known metric or label are ignored; as 85 are metrics or labels that don't yet have a way of updating the 86 `ReportRequest` operation. 87 88 Args: 89 logs (iterable[string]): the name of logs to be included in the 90 `ReportRequest` 91 metric_names (iterable[string]): the name of a known metric to be 92 added to the `ReportRequest` 93 label_names (iterable[string]): the name of a known label to be added 94 to the `ReportRequest` 95 96 """ 97 if not metric_names: 98 metric_names = () 99 if not label_names: 100 label_names = () 101 known_labels = [] 102 known_metrics = [] 103 # pylint: disable=no-member 104 # pylint is not aware of the __members__ attributes 105 for l in label_descriptor.KnownLabels.__members__.values(): 106 if l.update_label_func and l.label_name in label_names: 107 known_labels.append(l) 108 for m in metric_descriptor.KnownMetrics.__members__.values(): 109 if m.update_op_func and m.metric_name in metric_names: 110 known_metrics.append(m) 111 return cls(logs=logs, metrics=known_metrics, labels=known_labels) 112 113 114class ReportedProtocols(Enum): 115 """Enumerates the protocols that can be reported.""" 116 # pylint: disable=too-few-public-methods 117 UNKNOWN = 0 118 HTTP = 1 119 HTTP2 = 2 120 GRPC = 3 121 122 123class ReportedPlatforms(Enum): 124 """Enumerates the platforms that can be reported.""" 125 # pylint: disable=too-few-public-methods 126 UNKNOWN = 0 127 GAE_FLEX = 1 128 GAE_STANDARD = 2 129 GCE = 3 130 GKE = 4 131 DEVELOPMENT = 5 132 133 134class ErrorCause(Enum): 135 """Enumerates the causes of errors.""" 136 # pylint: disable=too-few-public-methods 137 internal = 0 # default, error in scc library code 138 application = 1 # external application error 139 auth = 2 # authentication error 140 service_control = 3 # error in service control check 141 142 143# alias the severity enum 144_SEVERITY = messages.LogEntry.SeverityValueValuesEnum 145 146 147def _struct_payload_from(a_dict): 148 return encoding.PyValueToMessage(messages.LogEntry.StructPayloadValue, a_dict) 149 150 151class Info( 152 collections.namedtuple( 153 'Info', ( 154 'api_name', 155 'api_method', 156 'api_version', 157 'auth_issuer', 158 'auth_audience', 159 'backend_time', 160 'error_cause', 161 'location', 162 'log_message', 163 'method', 164 'overhead_time', 165 'platform', 166 'producer_project_id', 167 'protocol', 168 'request_size', 169 'request_time', 170 'response_code', 171 'response_size', 172 'url', 173 ) + operation.Info._fields), 174 operation.Info): 175 """Holds the information necessary to fill in a ReportRequest. 176 177 In the attribute descriptions below, N/A means 'not available' 178 179 Attributes: 180 api_name (string): the api name and version 181 api_method (string): the full api method name 182 api_version (string): the api version 183 auth_issuer (string): the auth issuer 184 auth_audience (string): the auth audience 185 backend_time(datetime.timedelta): the backend request time, None for N/A 186 error_cause(:class:`ErrorCause`): the cause of error if one has occurred 187 location (string): the location of the service 188 log_message (string): a message to log as an info log 189 method (string): the HTTP method used to make the request 190 overhead_time(datetime.timedelta): the overhead time, None for N/A 191 platform (:class:`ReportedPlatform`): the platform in use 192 producer_project_id (string): the producer project id 193 protocol (:class:`ReportedProtocol`): the protocol used 194 request_size(int): the request size in bytes, -1 means N/A 195 request_time(datetime.timedelta): the request time 196 response_size(int): the request size in bytes, -1 means N/A 197 response_code(int): the code of the http response 198 url (string): the request url 199 200 """ 201 # pylint: disable=too-many-arguments,too-many-locals 202 203 COPYABLE_LOG_FIELDS = [ 204 'api_name', 205 'api_method', 206 'api_key', 207 'producer_project_id', 208 'referer', 209 'location', 210 'log_message', 211 'url', 212 ] 213 214 def __new__(cls, 215 api_name='', 216 api_method='', 217 api_version='', 218 auth_issuer='', 219 auth_audience='', 220 backend_time=None, 221 error_cause=ErrorCause.internal, 222 location='', 223 log_message='', 224 method='', 225 overhead_time=None, 226 platform=ReportedPlatforms.UNKNOWN, 227 producer_project_id='', 228 protocol=ReportedProtocols.UNKNOWN, 229 request_size=SIZE_NOT_SET, 230 request_time=None, 231 response_size=SIZE_NOT_SET, 232 response_code=200, 233 url='', 234 **kw): 235 """Invokes the base constructor with default values.""" 236 op_info = operation.Info(**kw) 237 _validate_timedelta_arg('backend_time', backend_time) 238 _validate_timedelta_arg('overhead_time', overhead_time) 239 _validate_timedelta_arg('request_time', request_time) 240 _validate_int_arg('request_size', request_size) 241 _validate_int_arg('response_size', response_size) 242 if not isinstance(protocol, ReportedProtocols): 243 raise ValueError('protocol should be a %s' % (ReportedProtocols,)) 244 if not isinstance(platform, ReportedPlatforms): 245 raise ValueError('platform should be a %s' % (ReportedPlatforms,)) 246 if not isinstance(error_cause, ErrorCause): 247 raise ValueError('error_cause should be a %s' % (ErrorCause,)) 248 return super(cls, Info).__new__( 249 cls, 250 api_name, 251 api_method, 252 api_version, 253 auth_issuer, 254 auth_audience, 255 backend_time, 256 error_cause, 257 location, 258 log_message, 259 method, 260 overhead_time, 261 platform, 262 producer_project_id, 263 protocol, 264 request_size, 265 request_time, 266 response_code, 267 response_size, 268 url, 269 **op_info._asdict()) 270 271 def _as_log_entry(self, name, now): 272 """Makes a `LogEntry` from this instance for the given log_name. 273 274 Args: 275 rules (:class:`ReportingRules`): determines what labels, metrics and 276 logs to include in the report request. 277 now (:class:`datetime.DateTime`): the current time 278 279 Return: 280 a ``LogEntry`` generated from this instance with the given name 281 and timestamp 282 283 Raises: 284 ValueError: if the fields in this instance are insufficient to 285 to create a valid ``ServicecontrolServicesReportRequest`` 286 287 """ 288 # initialize the struct with fields that are always present 289 d = { 290 'http_response_code': self.response_code, 291 'timestamp': time.mktime(now.timetuple()) 292 } 293 294 # compute the severity 295 severity = _SEVERITY.INFO 296 if self.response_code >= 400: 297 severity = _SEVERITY.ERROR 298 d['error_cause'] = self.error_cause.name 299 300 # add 'optional' fields to the struct 301 if self.request_size > 0: 302 d['request_size'] = self.request_size 303 if self.response_size > 0: 304 d['response_size'] = self.response_size 305 if self.method: 306 d['http_method'] = self.method 307 if self.request_time: 308 d['request_latency_in_ms'] = self.request_time.total_seconds() * 1000 309 310 # add 'copyable' fields to the struct 311 for key in self.COPYABLE_LOG_FIELDS: 312 value = getattr(self, key, None) 313 if value: 314 d[key] = value 315 316 return messages.LogEntry( 317 name=name, 318 timestamp=timestamp.to_rfc3339(now), 319 severity=severity, 320 structPayload=_struct_payload_from(d)) 321 322 def as_report_request(self, rules, timer=datetime.utcnow): 323 """Makes a `ServicecontrolServicesReportRequest` from this instance 324 325 Args: 326 rules (:class:`ReportingRules`): determines what labels, metrics and 327 logs to include in the report request. 328 timer: a function that determines the current time 329 330 Return: 331 a ``ServicecontrolServicesReportRequest`` generated from this instance 332 governed by the provided ``rules`` 333 334 Raises: 335 ValueError: if the fields in this instance cannot be used to create 336 a valid ``ServicecontrolServicesReportRequest`` 337 338 """ 339 if not self.service_name: 340 raise ValueError('the service name must be set') 341 op = super(Info, self).as_operation(timer=timer) 342 343 # Populate metrics and labels if they can be associated with a 344 # method/operation 345 if op.operationId and op.operationName: 346 labels = {} 347 for known_label in rules.labels: 348 known_label.do_labels_update(self, labels) 349 if labels: 350 op.labels = encoding.PyValueToMessage( 351 messages.Operation.LabelsValue, 352 labels) 353 for known_metric in rules.metrics: 354 known_metric.do_operation_update(self, op) 355 356 # Populate the log entries 357 now = timer() 358 op.logEntries = [self._as_log_entry(l, now) for l in rules.logs] 359 360 return messages.ServicecontrolServicesReportRequest( 361 serviceName=self.service_name, 362 reportRequest=messages.ReportRequest(operations=[op])) 363 364 365_NO_RESULTS = tuple() 366 367 368class Aggregator(object): 369 """Aggregates Service Control Report requests. 370 371 :func:`report` determines if a `ReportRequest` should be sent to the 372 service immediately 373 374 """ 375 376 CACHED_OK = object() 377 """A sentinel returned by :func:`report` when a request is cached OK.""" 378 379 MAX_OPERATION_COUNT = 1000 380 """The maximum number of operations to send in a report request.""" 381 382 def __init__(self, service_name, options, kinds=None, 383 timer=datetime.utcnow): 384 """ 385 Constructor 386 387 Args: 388 service_name (string): name of the service being aggregagated 389 options (:class:`google.api.caches.ReportOptions`): configures the behavior 390 of this aggregator 391 kinds (dict[string, [:class:`.MetricKind`]]): describes the 392 type of metrics used during aggregation 393 timer (function([[datetime]]): a function that returns the current 394 as a time as a datetime instance 395 396 """ 397 self._cache = caches.create(options, timer=timer) 398 self._options = options 399 self._kinds = kinds 400 self._service_name = service_name 401 402 @property 403 def flush_interval(self): 404 """The interval between calls to flush. 405 406 Returns: 407 timedelta: the period between calls to flush if, or ``None`` if no 408 cache is set 409 410 """ 411 return None if self._cache is None else self._options.flush_interval 412 413 @property 414 def service_name(self): 415 """The service to which all requests being aggregated should belong.""" 416 return self._service_name 417 418 def flush(self): 419 """Flushes this instance's cache. 420 421 The driver of this instance should call this method every 422 `flush_interval`. 423 424 Returns: 425 list[``ServicecontrolServicesReportRequest``]: corresponding to the 426 pending cached operations 427 428 """ 429 if self._cache is None: 430 return _NO_RESULTS 431 with self._cache as c: 432 flushed_ops = [x.as_operation() for x in list(c.out_deque)] 433 c.out_deque.clear() 434 reqs = [] 435 max_ops = self.MAX_OPERATION_COUNT 436 for x in range(0, len(flushed_ops), max_ops): 437 report_request = messages.ReportRequest( 438 operations=flushed_ops[x:x + max_ops]) 439 reqs.append( 440 messages.ServicecontrolServicesReportRequest( 441 serviceName=self.service_name, 442 reportRequest=report_request)) 443 444 return reqs 445 446 def clear(self): 447 """Clears the cache.""" 448 if self._cache is None: 449 return _NO_RESULTS 450 if self._cache is not None: 451 with self._cache as k: 452 res = [x.as_operation() for x in k.values()] 453 k.clear() 454 k.out_deque.clear() 455 return res 456 457 def report(self, req): 458 """Adds a report request to the cache. 459 460 Returns ``None`` if it could not be aggregated, and callers need to 461 send the request to the server, otherwise it returns ``CACHED_OK``. 462 463 Args: 464 req (:class:`messages.ReportRequest`): the request 465 to be aggregated 466 467 Result: 468 ``None`` if the request as not cached, otherwise ``CACHED_OK`` 469 470 """ 471 if self._cache is None: 472 return None # no cache, send request now 473 if not isinstance(req, messages.ServicecontrolServicesReportRequest): 474 raise ValueError('Invalid request') 475 if req.serviceName != self.service_name: 476 logger.error('bad report(): service_name %s does not match ours %s', 477 req.serviceName, self.service_name) 478 raise ValueError('Service name mismatch') 479 report_req = req.reportRequest 480 if report_req is None: 481 logger.error('bad report(): no report_request in %s', req) 482 raise ValueError('Expected report_request not set') 483 if _has_high_important_operation(report_req) or self._cache is None: 484 return None 485 ops_by_signature = _key_by_signature(report_req.operations, 486 _sign_operation) 487 488 # Concurrency: 489 # 490 # This holds a lock on the cache while updating it. No i/o operations 491 # are performed, so any waiting threads see minimal delays 492 with self._cache as cache: 493 for key, op in iter(ops_by_signature.items()): 494 agg = cache.get(key) 495 if agg is None: 496 cache[key] = operation.Aggregator(op, self._kinds) 497 else: 498 agg.add(op) 499 500 return self.CACHED_OK 501 502 503def _has_high_important_operation(req): 504 def is_important(op): 505 return (op.importance != 506 messages.Operation.ImportanceValueValuesEnum.LOW) 507 508 return functools.reduce(lambda x, y: x and is_important(y), 509 req.operations, True) 510 511 512def _key_by_signature(operations, signature_func): 513 """Creates a dictionary of operations keyed by signature 514 515 Args: 516 operations (iterable[Operations]): the input operations 517 518 Returns: 519 dict[string, [Operations]]: the operations keyed by signature 520 """ 521 return dict((signature_func(op), op) for op in operations) 522 523 524def _sign_operation(op): 525 """Obtains a signature for an operation in a ReportRequest. 526 527 Args: 528 op (:class:`google.api.gen.servicecontrol_v1_messages.Operation`): an 529 operation used in a `ReportRequest` 530 531 Returns: 532 string: a unique signature for that operation 533 """ 534 md5 = hashlib.md5() 535 md5.update(op.consumerId) 536 md5.update('\x00') 537 md5.update(op.operationName) 538 if op.labels: 539 signing.add_dict_to_hash(md5, encoding.MessageToPyValue(op.labels)) 540 return md5.digest() 541