1# Copyright 2016 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""client provides a complete standalone service control client.
16
17:class:`Client` is a package-level facade that encapsulates all service control
18functionality.
19
20The :class:`Loaders` simplify ``Client`` initialization.
21
22``Client`` needs to stop and start a thread to implement its behaviour.  In most
23environments, the default thread class is sufficient.  However, on Google App Engine,
24it's necessary to use the appengine specific threading class instead.
25
:func:`use_gae_thread` and :func:`use_default_thread` can be used to change the
thread class used by new instances of :class:`Client`.
28
29Example:
30
31  >>> from google.api.control import client
32  >>>
33  >>> # use on appengine with package-default settings
34  >>> service_name = 'my-appengine-service-name'
35  >>> client.use_gae_thread()
36  >>> gae_client = client.Loaders.DEFAULT.load(service_name)
37  >>> gae_client.start()
38
39"""
40from __future__ import absolute_import
41
42from datetime import datetime, timedelta
43from enum import Enum
44import json
45import logging
46import os
47import threading
48import time
49
50from . import api_client, check_request, report_request
51from . import USER_AGENT
52from google.api.control.caches import CheckOptions, ReportOptions, to_cache_timer
53from google.api.control.vendor.py3 import sched
54
55
56logger = logging.getLogger(__name__)
57
58
59CONFIG_VAR = 'ENDPOINTS_SERVER_CONFIG_FILE'
60
61
def _load_from_well_known_env():
    """Loads check and report options from the JSON file named by CONFIG_VAR.

    Falls back to the package defaults (:func:`_load_default`) when the
    environment variable is unset, the file is missing, or the file cannot
    be read or parsed.

    Returns:
        tuple: (:class:`CheckOptions`, :class:`ReportOptions`)
    """
    if CONFIG_VAR not in os.environ:
        logger.info('did not load server config; no environ var %s', CONFIG_VAR)
        return _load_default()
    json_file = os.environ[CONFIG_VAR]
    if not os.path.exists(json_file):
        logger.warning('did not load service; missing config file %s', json_file)
        return _load_default()
    try:
        with open(json_file) as f:
            json_dict = json.load(f)
        check_json = json_dict['checkAggregatorConfig']
        report_json = json_dict['reportAggregatorConfig']
        check_options = CheckOptions(
            num_entries=check_json['cacheEntries'],
            expiration=timedelta(
                milliseconds=check_json['responseExpirationMs']),
            flush_interval=timedelta(
                milliseconds=check_json['flushIntervalMs']))
        report_options = ReportOptions(
            num_entries=report_json['cacheEntries'],
            flush_interval=timedelta(
                milliseconds=report_json['flushIntervalMs']))
        return check_options, report_options
    except (KeyError, ValueError, IOError):
        # IOError: the file may disappear between the exists() check above
        # and open(); degrade to the defaults rather than crash
        logger.warning('did not load service; bad json config file %s',
                       json_file,
                       exc_info=True)
        return _load_default()
91
92
def _load_default():
    """Returns the package-default check and report options."""
    return (CheckOptions(), ReportOptions())
95
96
def _load_no_cache():
    """Returns options that disable caching of both checks and reports."""
    check_opts = CheckOptions(num_entries=-1)
    report_opts = ReportOptions(num_entries=-1)
    return check_opts, report_opts
100
101
class Loaders(Enum):
    """Enumerates the functions used to load clients from server configs."""
    # pylint: disable=too-few-public-methods

    # each member's value is a 1-tuple holding the config-loading function
    ENVIRONMENT = (_load_from_well_known_env,)
    DEFAULT = (_load_default,)
    NO_CACHE = (_load_no_cache,)

    def __init__(self, loader):
        """Constructor.

        loader is the function used to load a client config
        """
        self._loader = loader

    def load(self, service_name, **kw):
        """Creates a :class:`Client` for ``service_name`` using this loader."""
        opts = self._loader()
        return Client(service_name, opts[0], opts[1], **kw)
119
120
121_THREAD_CLASS = threading.Thread
122
123
def _create_http_transport():
    """Creates a transport used to call the service control service.

    The transport sends the package ``USER_AGENT`` header on every request,
    and logs requests/responses when debug logging is enabled for this
    module's logger.
    """
    additional_http_headers = {"user-agent": USER_AGENT}
    # use the effective level via isEnabledFor(); the raw ``logger.level``
    # defaults to 0 (NOTSET), which would incorrectly enable request logging
    do_logging = logger.isEnabledFor(logging.DEBUG)
    return api_client.ServicecontrolV1(
        additional_http_headers=additional_http_headers,
        log_request=do_logging,
        log_response=do_logging)
131
132
133def _thread_local_http_transport_func():
134    local = threading.local()
135
136    def create_transport():
137        if not getattr(local, "transport", None):
138            local.transport = _create_http_transport()
139        return local.transport
140
141    return create_transport
142
143
144_CREATE_THREAD_LOCAL_TRANSPORT = _thread_local_http_transport_func()
145
146
class Client(object):
    """Client is a package-level facade that encapsulates all service control
    functionality.

    Using one of the :class:`Loaders` makes it easy to initialize ``Client``
    instances.

    Example:

      >>> from google.api.control import client
      >>> service_name = 'my-service-name'
      >>>
      >>> # create an scc client using the package default values
      >>> default_client = client.Loaders.DEFAULT.load(service_name)

      >>> # create an scc client by loading configuration from a
      >>> # JSON file configured by an environment variable
      >>> json_conf_client = client.Loaders.ENVIRONMENT.load(service_name)

    Client is thread-compatible

    """
    # pylint: disable=too-many-instance-attributes, too-many-arguments

    def __init__(self,
                 service_name,
                 check_options,
                 report_options,
                 timer=datetime.utcnow,
                 create_transport=_CREATE_THREAD_LOCAL_TRANSPORT):
        """
        Args:
            service_name (str): the name of the service to be controlled
            check_options (:class:`google.api.control.caches.CheckOptions`):
              configures checking
            report_options (:class:`google.api.control.caches.ReportOptions`):
              configures reporting
            timer (:func:`datetime.datetime.utcnow`): a callable that returns
              the current time; drives cache expiry and flush scheduling
            create_transport: a zero-argument callable that returns the
              transport used to send check and report requests
        """
        self._check_aggregator = check_request.Aggregator(service_name,
                                                          check_options,
                                                          timer=timer)
        self._report_aggregator = report_request.Aggregator(service_name,
                                                            report_options,
                                                            timer=timer)
        self._running = False
        self._scheduler = None
        self._stopped = False
        self._timer = timer
        self._thread = None
        self._create_transport = create_transport
        self._lock = threading.RLock()

    def start(self):
        """Starts processing.

        Calling this method

        - starts the thread that regularly flushes all enabled caches.
        - enables the other methods on the instance to be called successfully

        I.e, even when the configuration disables aggregation, it is invalid to
        access the other methods of an instance until ``start`` is called -
        Calls to other public methods will fail with an AssertionError.

        """
        with self._lock:
            if self._running:
                logger.info('%s is already started', self)
                return

            self._stopped = False
            self._running = True
            logger.info('starting thread of type %s to run the scheduler',
                        _THREAD_CLASS)
            self._thread = _THREAD_CLASS(target=self._schedule_flushes)
            try:
                self._thread.start()
            except Exception:  # pylint: disable=broad-except
                # e.g. on GAE, creating threads may be restricted; fall back
                # to running the scheduler inline from report(...)
                logger.warning(
                    'no scheduler thread, scheduler.run() will be invoked by report(...)',
                    exc_info=True)
                self._thread = None
                self._initialize_flushing()

    def stop(self):
        """Halts processing

        This will lead to the reports being flushed, the caches being cleared
        and a stop to the current processing thread.

        """
        with self._lock:
            if self._stopped:
                logger.info('%s is already stopped', self)
                return

            self._flush_all_reports()
            self._stopped = True
            if self._run_scheduler_directly:
                self._cleanup_if_stopped()

            if self._scheduler and self._scheduler.empty():
                # if there are events scheduled, then _running will subsequently
                # be set False by the scheduler thread.  This handles the
                # case where there are no events, e.g because all aggregation
                # was disabled
                self._running = False
            self._scheduler = None

    def check(self, check_req):
        """Process a check_request.

        The req is first passed to the check_aggregator.  If there is a valid
        cached response, that is returned, otherwise a response is obtained from
        the transport.

        Args:
          check_req (``ServicecontrolServicesCheckRequest``): to be sent to
            the service control service

        Returns:
           ``CheckResponse``: either the cached response if one is applicable
            or a response from making a transport request, or None if
            if the request to the transport fails

        """

        self._assert_is_running()
        res = self._check_aggregator.check(check_req)
        if res:
            logger.debug('using cached check response for %s: %s',
                         check_req, res)
            return res

        # Application code should not fail because check request's don't
        # complete, They should fail open, so here simply log the error and
        # return None to indicate that no response was obtained
        try:
            transport = self._create_transport()
            resp = transport.services.check(check_req)
            self._check_aggregator.add_response(check_req, resp)
            return resp
        except Exception:  # pylint: disable=broad-except
            logger.error('direct send of check request failed %s',
                         check_req, exc_info=True)
            return None

    def report(self, report_req):
        """Processes a report request.

        It will aggregate it with prior report_requests to be send later
        or it will send it immediately if that's appropriate.
        """
        self._assert_is_running()

        # no thread running, run the scheduler to ensure any pending
        # flush tasks are executed.
        if self._run_scheduler_directly:
            self._scheduler.run(blocking=False)

        if not self._report_aggregator.report(report_req):
            logger.info('need to send a report request directly')
            try:
                transport = self._create_transport()
                transport.services.report(report_req)
            except Exception:  # pylint: disable=broad-except
                logger.error('direct send for report request failed',
                             exc_info=True)

    @property
    def _run_scheduler_directly(self):
        # True when start() could not spawn a thread; flushes then piggyback
        # on calls to report(...)
        return self._running and self._thread is None

    def _assert_is_running(self):
        assert self._running, '%s needs to be running' % (self,)

    def _initialize_flushing(self):
        # creates the scheduler and seeds it with the first flush events
        with self._lock:
            logger.info('created a scheduler to control flushing')
            self._scheduler = sched.scheduler(to_cache_timer(self._timer),
                                              time.sleep)
            logger.info('scheduling initial check and flush')
            self._flush_schedule_check_aggregator()
            self._flush_schedule_report_aggregator()

    def _schedule_flushes(self):
        # the method expects to be run in the thread created in start()
        self._initialize_flushing()
        self._scheduler.run() # should block until self._stopped is set
        logger.info('scheduler.run completed, %s will exit', threading.current_thread())

    def _cleanup_if_stopped(self):
        # clears both aggregators and marks the client not-running, but only
        # once stop() has been called; returns whether cleanup happened
        if not self._stopped:
            return False

        self._check_aggregator.clear()
        self._report_aggregator.clear()
        self._running = False
        return True

    def _flush_schedule_check_aggregator(self):
        # flushes the check aggregator then re-schedules itself; exits the
        # flush loop when the client is stopped or check caching is disabled
        if self._cleanup_if_stopped():
            logger.info('did not schedule check flush: client is stopped')
            return

        flush_interval = self._check_aggregator.flush_interval
        if not flush_interval or flush_interval.total_seconds() < 0:
            logger.debug('did not schedule check flush: caching is disabled')
            return

        if self._run_scheduler_directly:
            logger.debug('did not schedule check flush: no scheduler thread')
            return

        logger.debug('flushing the check aggregator')
        transport = self._create_transport()
        for req in self._check_aggregator.flush():
            try:
                resp = transport.services.check(req)
                self._check_aggregator.add_response(req, resp)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush check_req %s', req, exc_info=True)

        # schedule a repeat of this method
        self._scheduler.enter(
            flush_interval.total_seconds(),
            2,  # a higher priority than report flushes
            self._flush_schedule_check_aggregator,
            ()
        )

    def _flush_schedule_report_aggregator(self):
        # flushes the report aggregator then re-schedules itself; exits the
        # flush loop when the client is stopped or report caching is disabled
        if self._cleanup_if_stopped():
            logger.info('did not schedule report flush: client is stopped')
            return

        flush_interval = self._report_aggregator.flush_interval
        if not flush_interval or flush_interval.total_seconds() < 0:
            logger.debug('did not schedule report flush: caching is disabled')
            return

        # flush reports and schedule a repeat of this method
        transport = self._create_transport()
        reqs = self._report_aggregator.flush()
        logger.debug("will flush %d report requests", len(reqs))
        for req in reqs:
            try:
                transport.services.report(req)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush report_req %s', req, exc_info=True)

        self._scheduler.enter(
            flush_interval.total_seconds(),
            1,  # a lower priority than check flushes
            self._flush_schedule_report_aggregator,
            ()
        )

    def _flush_all_reports(self):
        # drains the report aggregator completely; used during stop()
        all_requests = self._report_aggregator.clear()
        logger.info('flushing all reports (count=%d)', len(all_requests))
        transport = self._create_transport()
        for req in all_requests:
            try:
                transport.services.report(req)
            except Exception:  # pylint: disable=broad-except
                logger.error('failed to flush report_req %s', req, exc_info=True)
416
417
def use_default_thread():
    """Resets the thread class used by new ``Client`` instances.

    After this call, ``Client.start`` spawns its flushing thread using the
    standard :class:`threading.Thread`.
    """
    global _THREAD_CLASS  # pylint: disable=global-statement
    _THREAD_CLASS = threading.Thread
422
423
def use_gae_thread():
    """Makes ``Client``s started after this use the appengine thread class."""
    global _THREAD_CLASS  # pylint: disable=global-statement
    try:
        from google.appengine.api.background_thread import background_thread
    except ImportError:
        # not running on App Engine (or the SDK is absent); leave the
        # current thread class untouched
        logger.error(
            'Could not install appengine background threads!'
            ' Please install the python AppEngine SDK and use this from there'
        )
        return
    _THREAD_CLASS = background_thread.BackgroundThread
435