# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""client provides a complete standalone service control client.

:class:`Client` is a package-level facade that encapsulates all service control
functionality.

The :class:`Loaders` simplify ``Client`` initialization.

``Client`` needs to stop and start a thread to implement its behaviour.  In most
environments, the default thread class is sufficient.  However, on Google App
Engine, it's necessary to use the appengine specific threading class instead.

:func:`use_gae_thread` and :func:`use_default_thread` can be used to change the
thread class used by new instances of ``Client``.

Example:

    >>> from google.api.control import client
    >>>
    >>> # use on appengine with package-default settings
    >>> service_name = 'my-appengine-service-name'
    >>> client.use_gae_thread()
    >>> gae_client = client.Loaders.DEFAULT.load(service_name)
    >>> gae_client.start()

"""
from __future__ import absolute_import

from datetime import datetime, timedelta
from enum import Enum
import json
import logging
import os
import threading
import time

from . import api_client, check_request, report_request
from . import USER_AGENT
from google.api.control.caches import CheckOptions, ReportOptions, to_cache_timer
from google.api.control.vendor.py3 import sched


logger = logging.getLogger(__name__)


# Name of the environment variable that points at the JSON server config file.
CONFIG_VAR = 'ENDPOINTS_SERVER_CONFIG_FILE'


def _load_from_well_known_env():
    """Loads check and report options from the file named by ``CONFIG_VAR``.

    Falls back to :func:`_load_default` when the environment variable is not
    set, when the file it names does not exist, or when the file's contents
    cannot be interpreted as a server config.

    Returns:
        tuple: a ``(CheckOptions, ReportOptions)`` pair
    """
    if CONFIG_VAR not in os.environ:
        logger.info('did not load server config; no environ var %s', CONFIG_VAR)
        return _load_default()
    json_file = os.environ[CONFIG_VAR]
    if not os.path.exists(json_file):
        logger.warning('did not load service; missing config file %s',
                       json_file)
        return _load_default()
    try:
        with open(json_file) as f:
            json_dict = json.load(f)
            check_json = json_dict['checkAggregatorConfig']
            report_json = json_dict['reportAggregatorConfig']
            check_options = CheckOptions(
                num_entries=check_json['cacheEntries'],
                expiration=timedelta(
                    milliseconds=check_json['responseExpirationMs']),
                flush_interval=timedelta(
                    milliseconds=check_json['flushIntervalMs']))
            report_options = ReportOptions(
                num_entries=report_json['cacheEntries'],
                flush_interval=timedelta(
                    milliseconds=report_json['flushIntervalMs']))
            return check_options, report_options
    except (KeyError, TypeError, ValueError):
        # KeyError: a required key is missing.
        # ValueError: the file is not valid JSON.
        # TypeError: the JSON has the wrong shape, e.g. the top-level value
        # is a list or an interval is not a number.
        logger.warning('did not load service; bad json config file %s',
                       json_file,
                       exc_info=True)
        return _load_default()


def _load_default():
    """Returns ``CheckOptions`` and ``ReportOptions`` built with no arguments."""
    return CheckOptions(), ReportOptions()


def _load_no_cache():
    """Returns check and report options constructed with ``num_entries=-1``."""
    return (CheckOptions(num_entries=-1),
            ReportOptions(num_entries=-1))


class Loaders(Enum):
    """Enumerates the functions used to load clients from server configs."""
    # pylint: disable=too-few-public-methods
    ENVIRONMENT = (_load_from_well_known_env,)
    DEFAULT = (_load_default,)
    NO_CACHE = (_load_no_cache,)

    def __init__(self, load_func):
        """Constructor.

        load_func is used to load a client config
        """
        self._load_func = load_func

    def load(self, service_name, **kw):
        """Creates a :class:`Client` configured by this loader.

        Args:
            service_name (str): the name of the service to be controlled
            **kw: passed through unchanged to the ``Client`` constructor

        Returns:
            :class:`Client`: a new, not yet started, client
        """
        check_opts, report_opts = self._load_func()
        return Client(service_name, check_opts, report_opts, **kw)


# The thread class used by ``Client.start``; replaced by ``use_gae_thread`` and
# ``use_default_thread``.
_THREAD_CLASS = threading.Thread


def _create_http_transport():
    """Creates a new ``api_client.ServicecontrolV1`` transport.

    Request and response logging is enabled on the transport when this module's
    logger level is ``DEBUG`` or lower.
    """
    additional_http_headers = {"user-agent": USER_AGENT}
    do_logging = logger.level <= logging.DEBUG
    return api_client.ServicecontrolV1(
        additional_http_headers=additional_http_headers,
        log_request=do_logging,
        log_response=do_logging)


def _thread_local_http_transport_func():
    """Returns a no-argument function that provides a per-thread transport.

    The returned function creates a transport the first time it is called on a
    given thread and returns that same transport on later calls from that
    thread.
    """
    local = threading.local()

    def create_transport():
        if not getattr(local, "transport", None):
            local.transport = _create_http_transport()
        return local.transport

    return create_transport


_CREATE_THREAD_LOCAL_TRANSPORT = _thread_local_http_transport_func()


class Client(object):
    """Client is a package-level facade that encapsulates all service control
    functionality.

    Using one of the :class:`Loaders` makes it easy to initialize ``Client``
    instances.

    Example:

      >>> from google.api.control import client
      >>> service_name = 'my-service-name'
      >>>
      >>> # create an scc client using the package default values
      >>> default_client = client.Loaders.DEFAULT.load(service_name)

      >>> # create an scc client by loading configuration from
      >>> # a JSON file configured by an environment variable
      >>> json_conf_client = client.Loaders.ENVIRONMENT.load(service_name)

    Client is thread-compatible

    """
    # pylint: disable=too-many-instance-attributes, too-many-arguments

    def __init__(self,
                 service_name,
                 check_options,
                 report_options,
                 timer=datetime.utcnow,
                 create_transport=_CREATE_THREAD_LOCAL_TRANSPORT):
        """

        Args:
            service_name (str): the name of the service to be controlled
            check_options (:class:`google.api.control.caches.CheckOptions`):
              configures checking
            report_options (:class:`google.api.control.caches.ReportOptions`):
              configures reporting
            timer (:func:[[datetime.datetime]]): used to obtain the current
              time.
            create_transport (callable): a no-argument function that returns
              the transport used to send check and report requests.  The
              default returns a transport that is cached per thread.
        """
        self._check_aggregator = check_request.Aggregator(service_name,
                                                          check_options,
                                                          timer=timer)
        self._report_aggregator = report_request.Aggregator(service_name,
                                                            report_options,
                                                            timer=timer)
        self._running = False
        self._scheduler = None
        self._stopped = False
        self._timer = timer
        self._thread = None
        self._create_transport = create_transport
        self._lock = threading.RLock()

    def start(self):
        """Starts processing.

        Calling this method

        - starts the thread that regularly flushes all enabled caches.
        - enables the other methods on the instance to be called successfully

        I.e, even when the configuration disables aggregation, it is invalid to
        access the other methods of an instance until ``start`` is called -
        Calls to other public methods will fail with an AssertionError.

        If the flushing thread cannot be started, the failure is logged and
        the scheduler is created on the calling thread instead; it is then run
        from within :meth:`report`.

        """
        with self._lock:
            if self._running:
                logger.info('%s is already started', self)
                return

            self._stopped = False
            self._running = True
            logger.info('starting thread of type %s to run the scheduler',
                        _THREAD_CLASS)
            self._thread = _THREAD_CLASS(target=self._schedule_flushes)
            try:
                self._thread.start()
            except Exception:  # pylint: disable=broad-except
                logger.warning(
                    'no scheduler thread, scheduler.run() will be invoked by report(...)',
                    exc_info=True)
                # With no thread, ``_run_scheduler_directly`` becomes True and
                # the scheduler has to be set up here rather than in
                # ``_schedule_flushes``.
                self._thread = None
                self._initialize_flushing()

    def stop(self):
        """Halts processing

        This will lead to the reports being flushed, the caches being cleared
        and a stop to the current processing thread.

        """
        with self._lock:
            if self._stopped:
                logger.info('%s is already stopped', self)
                return

            self._flush_all_reports()
            self._stopped = True
            if self._run_scheduler_directly:
                self._cleanup_if_stopped()

            if self._scheduler and self._scheduler.empty():
                # if there are events scheduled, then _running will subsequently
                # be set False by the scheduler thread.  This handles the
                # case where there are no events, e.g because all aggregation
                # was disabled
                self._running = False
            self._scheduler = None

    def check(self, check_req):
        """Process a check_request.

        The req is first passed to the check_aggregator.  If there is a valid
        cached response, that is returned, otherwise a response is obtained from
        the transport.

        Args:
          check_req (``ServicecontrolServicesCheckRequest``): to be sent to
            the service control service

        Returns:
           ``CheckResponse``: either the cached response if one is applicable
            or a response from making a transport request, or None if
            the request to the transport fails

        Raises:
           AssertionError: if the client has not been started

        """

        self._assert_is_running()
        res = self._check_aggregator.check(check_req)
        if res:
            logger.debug('using cached check response for %s: %s',
                         check_req, res)
            return res

        # Application code should not fail because check request's don't
        # complete, They should fail open, so here simply log the error and
        # return None to indicate that no response was obtained
        try:
            transport = self._create_transport()
            resp = transport.services.check(check_req)
            self._check_aggregator.add_response(check_req, resp)
            return resp
        except Exception:  # pylint: disable=broad-except
            logger.error('direct send of check request failed %s',
                         check_req, exc_info=True)
            return None

    def report(self, report_req):
        """Processes a report request.

        It will aggregate it with prior report_requests to be send later
        or it will send it immediately if that's appropriate.

        Args:
          report_req: the report request to aggregate or send

        Raises:
           AssertionError: if the client has not been started
        """
        self._assert_is_running()

        # no thread running, run the scheduler to ensure any pending
        # flush tasks are executed.
        if self._run_scheduler_directly:
            self._scheduler.run(blocking=False)

        if not self._report_aggregator.report(report_req):
            logger.info('need to send a report request directly')
            try:
                transport = self._create_transport()
                transport.services.report(report_req)
            except Exception:  # pylint: disable=broad-except
                logger.error('direct send for report request failed',
                             exc_info=True)

    @property
    def _run_scheduler_directly(self):
        """True when the client is running without a scheduler thread."""
        return self._running and self._thread is None

    def _assert_is_running(self):
        """Raises AssertionError unless the client is currently running."""
        # Raised explicitly rather than with an ``assert`` statement so that
        # the documented check is still enforced when Python runs with -O.
        if not self._running:
            raise AssertionError('%s needs to be running' % (self,))

    def _initialize_flushing(self):
        """Creates the scheduler and performs the initial check/report flush."""
        with self._lock:
            logger.info('created a scheduler to control flushing')
            self._scheduler = sched.scheduler(to_cache_timer(self._timer),
                                              time.sleep)
            logger.info('scheduling initial check and flush')
            self._flush_schedule_check_aggregator()
            self._flush_schedule_report_aggregator()

    def _schedule_flushes(self):
        """Sets up flushing and runs the scheduler until it completes."""
        # the method expects to be run in the thread created in start()
        self._initialize_flushing()
        self._scheduler.run()  # should block until self._stopped is set
        logger.info('scheduler.run completed, %s will exit',
                    threading.current_thread())

    def _cleanup_if_stopped(self):
        """Clears both aggregators if the client has been stopped.

        Returns:
            bool: True if the client was stopped and cleanup was performed,
            otherwise False
        """
        if not self._stopped:
            return False

        self._check_aggregator.clear()
        self._report_aggregator.clear()
        self._running = False
        return True

    def _flush_schedule_check_aggregator(self):
        """Flushes the check aggregator, then schedules itself to run again.

        Nothing is flushed or scheduled when the client is stopped, when the
        check aggregator has no non-negative flush interval, or when there is
        no scheduler thread.
        """
        if self._cleanup_if_stopped():
            logger.info('did not schedule check flush: client is stopped')
            return

        flush_interval = self._check_aggregator.flush_interval
        if not flush_interval or flush_interval.total_seconds() < 0:
            logger.debug('did not schedule check flush: caching is disabled')
            return

        if self._run_scheduler_directly:
            logger.debug('did not schedule check flush: no scheduler thread')
            return

        logger.debug('flushing the check aggregator')
        transport = self._create_transport()
        for req in self._check_aggregator.flush():
            try:
                resp = transport.services.check(req)
                self._check_aggregator.add_response(req, resp)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush check_req %s', req, exc_info=True)

        # schedule a repeat of this method
        # NOTE(review): in the stdlib ``sched`` module a lower number means a
        # higher priority, which would make the priority comments here and in
        # _flush_schedule_report_aggregator inverted - confirm against the
        # vendored ``sched``.
        self._scheduler.enter(
            flush_interval.total_seconds(),
            2,  # a higher priority than report flushes
            self._flush_schedule_check_aggregator,
            ()
        )

    def _flush_schedule_report_aggregator(self):
        """Flushes the report aggregator, then schedules itself to run again.

        Nothing is flushed or scheduled when the client is stopped or when the
        report aggregator has no non-negative flush interval.
        """
        if self._cleanup_if_stopped():
            logger.info('did not schedule report flush: client is stopped')
            return

        flush_interval = self._report_aggregator.flush_interval
        if not flush_interval or flush_interval.total_seconds() < 0:
            logger.debug('did not schedule report flush: caching is disabled')
            return

        # flush reports and schedule a repeat of this method
        transport = self._create_transport()
        reqs = self._report_aggregator.flush()
        logger.debug("will flush %d report requests", len(reqs))
        for req in reqs:
            try:
                transport.services.report(req)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush report_req %s', req, exc_info=True)

        self._scheduler.enter(
            flush_interval.total_seconds(),
            1,  # a lower priority than check flushes
            self._flush_schedule_report_aggregator,
            ()
        )

    def _flush_all_reports(self):
        """Clears the report aggregator and sends every request it returned.

        A failure to send one request is logged and does not prevent the
        remaining requests from being sent.
        """
        all_requests = self._report_aggregator.clear()
        logger.info('flushing all reports (count=%d)', len(all_requests))
        transport = self._create_transport()
        for req in all_requests:
            try:
                transport.services.report(req)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush report_req %s', req, exc_info=True)


def use_default_thread():
    """Makes ``Client``s started after this use the standard Thread class."""
    global _THREAD_CLASS  # pylint: disable=global-statement
    _THREAD_CLASS = threading.Thread


def use_gae_thread():
    """Makes ``Client``s started after this use the appengine thread class.

    If the appengine SDK cannot be imported, an error is logged and the thread
    class is left unchanged.
    """
    global _THREAD_CLASS  # pylint: disable=global-statement
    try:
        from google.appengine.api.background_thread import background_thread
        _THREAD_CLASS = background_thread.BackgroundThread
    except ImportError:
        logger.error(
            'Could not install appengine background threads!'
            ' Please install the python AppEngine SDK and use this from there'
        )