1# Copyright 2013 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing,
10# software distributed under the License is distributed on an
11# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12# either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14
15"""Util functions and classes for cloudstorage_api."""
16
17
18
# Public API of this module; all other names are internal helpers.
__all__ = ['set_default_retry_params',
           'RetryParams',
          ]
22
23import copy
24import httplib
25import logging
26import math
27import os
28import threading
29import time
30import urllib
31
32
33try:
34  from google.appengine.api import urlfetch
35  from google.appengine.datastore import datastore_rpc
36  from google.appengine.ext.ndb import eventloop
37  from google.appengine.ext.ndb import utils
38  from google.appengine import runtime
39  from google.appengine.runtime import apiproxy_errors
40except ImportError:
41  from google.appengine.api import urlfetch
42  from google.appengine.datastore import datastore_rpc
43  from google.appengine import runtime
44  from google.appengine.runtime import apiproxy_errors
45  from google.appengine.ext.ndb import eventloop
46  from google.appengine.ext.ndb import utils
47
48
# Transient urlfetch/API-proxy failures that are safe to retry.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error)

# Per-thread storage for the default RetryParams of the current request.
# NOTE: this initial assignment only seeds the importing thread; other
# threads see no attribute until set_default_retry_params is called, which
# is why _get_default_retry_params uses getattr with a default.
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
54
55
def set_default_retry_params(retry_params):
  """Install retry_params as the default for current thread current request.

  A shallow copy is stored so that later mutation of the caller's object
  does not change the saved default.
  """
  params_copy = copy.copy(retry_params)
  _thread_local_settings.default_retry_params = params_copy
59
60
def _get_default_retry_params():
  """Get default RetryParams for current request and current thread.

  Returns:
    A new instance of the default RetryParams.
  """
  saved = getattr(_thread_local_settings, 'default_retry_params', None)
  # A saved default only applies while the request that set it is still
  # running; otherwise fall back to a freshly constructed RetryParams.
  if saved is not None and saved.belong_to_current_request():
    return copy.copy(saved)
  return RetryParams()
72
73
74def _quote_filename(filename):
75  """Quotes filename to use as a valid URI path.
76
77  Args:
78    filename: user provided filename. /bucket/filename.
79
80  Returns:
81    The filename properly quoted to use as URI's path component.
82  """
83  return urllib.quote(filename)
84
85
86def _unquote_filename(filename):
87  """Unquotes a valid URI path back to its filename.
88
89  This is the opposite of _quote_filename.
90
91  Args:
92    filename: a quoted filename. /bucket/some%20filename.
93
94  Returns:
95    The filename unquoted.
96  """
97  return urllib.unquote(filename)
98
99
100def _should_retry(resp):
101  """Given a urlfetch response, decide whether to retry that request."""
102  return (resp.status_code == httplib.REQUEST_TIMEOUT or
103          (resp.status_code >= 500 and
104           resp.status_code < 600))
105
106
class RetryParams(object):
  """Retry configuration parameters."""

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=2,
               max_retries=5,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False):
    """Init.

    This object is unique per request per thread.

    Library will retry according to this setting when App Engine Server
    can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    500-600 response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.
    """
    # Validate in a fixed order so invalid arguments raise deterministically.
    self.backoff_factor = self._check('backoff_factor', backoff_factor)
    self.initial_delay = self._check('initial_delay', initial_delay)
    self.max_delay = self._check('max_delay', max_delay)
    self.max_retry_period = self._check('max_retry_period', max_retry_period)
    self.max_retries = self._check('max_retries', max_retries, True, int)
    self.min_retries = self._check('min_retries', min_retries, True, int)
    # min_retries may never exceed max_retries.
    self.min_retries = min(self.min_retries, self.max_retries)

    if urlfetch_timeout is None:
      # Leave it to the urlfetch module to pick its own default.
      self.urlfetch_timeout = None
    else:
      self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = self._check('save_access_token', save_access_token,
                                         True, bool)

    # Remember which request created this instance; see
    # belong_to_current_request.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  def __eq__(self, other):
    # Two instances are equal iff they are the same class with identical
    # attribute values (including _request_id).
    if isinstance(other, self.__class__):
      return self.__dict__ == other.__dict__
    return False

  def __ne__(self, other):
    return not self.__eq__(other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Check init arguments.

    Args:
      name: name of the argument. For logging purpose.
      val: value. Value has to be non negative number.
      can_be_zero: whether value can be zero.
      val_type: Python type of the value.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    # An int is acceptable wherever a float is expected. type() is used
    # rather than isinstance() so that bool does not pass as int.
    if val_type is float:
      valid_types = [float, int]
    else:
      valid_types = [val_type]

    if type(val) not in valid_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if not can_be_zero and val == 0:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    """True iff this instance was created during the current request."""
    return os.getenv('REQUEST_LOG_ID') == self._request_id

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    # Give up once max_retries is exhausted, or once the retry budget is
    # spent and the mandatory minimum number of retries has been made.
    if n > self.max_retries:
      return -1
    if (n > self.min_retries and
        time.time() - start_time > self.max_retry_period):
      return -1
    # Exponential backoff, capped at max_delay.
    uncapped = math.pow(self.backoff_factor, n - 1) * self.initial_delay
    return min(uncapped, self.max_delay)
221
222
def _retry_fetch(url, retry_params, **kwds):
  """A blocking fetch function similar to urlfetch.fetch.

  This function should be used when a urlfetch has timed out or the response
  shows http request timeout. This function will put current thread to
  sleep between retry backoffs.

  Args:
    url: url to fetch.
    retry_params: an instance of RetryParams.
    **kwds: keyword arguments for urlfetch. If deadline is specified in kwds,
      it precedes the one in RetryParams. If none is specified, it's up to
      urlfetch to use its own default.

  Returns:
    A urlfetch response from the last retry. None if no retry was attempted.

  Raises:
    Whatever exception encountered during the last retry.
  """
  # n counts attempts made inside this function only; the caller's original
  # (failed) fetch is not included, so the first attempt here is n == 1.
  n = 1
  start_time = time.time()
  delay = retry_params.delay(n, start_time)
  if delay <= 0:
    # Retry disabled (e.g. max_retries == 0): signal "no retry attempted"
    # by returning None.
    return

  logging.info('Will retry request to %s.', url)
  while delay > 0:
    resp = None
    try:
      logging.info('Retry in %s seconds.', delay)
      time.sleep(delay)
      resp = urlfetch.fetch(url, **kwds)
    except runtime.DeadlineExceededError:
      # The overall request deadline is about to pass; further retries are
      # pointless, so propagate immediately.
      logging.info(
          'Urlfetch retry %s will exceed request deadline '
          'after %s seconds total', n, time.time() - start_time)
      raise
    except _RETRIABLE_EXCEPTIONS, e:
      # Transient failure: resp stays None and the loop decides below
      # whether to try again. e is logged at the bottom of the loop.
      pass

    n += 1
    delay = retry_params.delay(n, start_time)
    if resp and not _should_retry(resp):
      # Definitive response (success or a non-retriable status): stop.
      break
    elif resp:
      logging.info(
          'Got status %s from GCS.', resp.status_code)
    else:
      # resp is None only when a retriable exception was just caught,
      # so e is guaranteed to be bound here.
      logging.info(
          'Got exception "%r" while contacting GCS.', e)

  if resp:
    return resp

  # Every attempt raised a retriable exception. The bare raise re-raises
  # the most recently handled exception (Python 2 semantics).
  logging.info('Urlfetch failed after %s retries and %s seconds in total.',
               n - 1, time.time() - start_time)
  raise
281
282
def _run_until_rpc():
  """Eagerly evaluate tasklets until it is blocking on some RPC.

  Usually ndb eventloop el isn't run until some code calls future.get_result().

  When an async tasklet is called, the tasklet wrapper evaluates the tasklet
  code into a generator, enqueues a callback _help_tasklet_along onto
  the el.current queue, and returns a future.

  _help_tasklet_along, when called by the el, will
  get one yielded value from the generator. If the value is another future,
  set up a callback _on_future_complete to invoke _help_tasklet_along
  when the dependent future fulfills. If the value is a RPC, set up a
  callback _on_rpc_complete to invoke _help_tasklet_along when the RPC
  fulfills. Thus _help_tasklet_along drills down
  the chain of futures until some future is blocked by RPC. El runs
  all callbacks and constantly check pending RPC status.
  """
  ev_loop = eventloop.get_event_loop()
  # Drain the immediate-callback queue; stop once only pending RPCs remain.
  while True:
    if not ev_loop.current:
      break
    ev_loop.run0()
304
305
def _eager_tasklet(tasklet):
  """Decorator to turn tasklet to run eagerly."""

  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwds):
    # Start the tasklet, then drive the event loop until it blocks on an
    # RPC, so work begins before anyone calls get_result().
    future = tasklet(*args, **kwds)
    _run_until_rpc()
    return future

  return eager_wrapper
316