1from __future__ import absolute_import, division, print_function 2 3import functools 4import hmac 5import io 6import logging 7import sys 8import os 9import re 10 11import stripe 12from stripe import six 13from stripe.six.moves.urllib.parse import parse_qsl 14 15 16STRIPE_LOG = os.environ.get("STRIPE_LOG") 17 18logger = logging.getLogger("stripe") 19 20__all__ = [ 21 "io", 22 "parse_qsl", 23 "utf8", 24 "log_info", 25 "log_debug", 26 "dashboard_link", 27 "logfmt", 28] 29 30 31def utf8(value): 32 if six.PY2 and isinstance(value, six.text_type): 33 return value.encode("utf-8") 34 else: 35 return value 36 37 38def is_appengine_dev(): 39 return "APPENGINE_RUNTIME" in os.environ and "Dev" in os.environ.get( 40 "SERVER_SOFTWARE", "" 41 ) 42 43 44def _console_log_level(): 45 if stripe.log in ["debug", "info"]: 46 return stripe.log 47 elif STRIPE_LOG in ["debug", "info"]: 48 return STRIPE_LOG 49 else: 50 return None 51 52 53def log_debug(message, **params): 54 msg = logfmt(dict(message=message, **params)) 55 if _console_log_level() == "debug": 56 print(msg, file=sys.stderr) 57 logger.debug(msg) 58 59 60def log_info(message, **params): 61 msg = logfmt(dict(message=message, **params)) 62 if _console_log_level() in ["debug", "info"]: 63 print(msg, file=sys.stderr) 64 logger.info(msg) 65 66 67def _test_or_live_environment(): 68 if stripe.api_key is None: 69 return 70 match = re.match(r"sk_(live|test)_", stripe.api_key) 71 if match is None: 72 return 73 return match.groups()[0] 74 75 76def dashboard_link(request_id): 77 return "https://dashboard.stripe.com/{env}/logs/{reqid}".format( 78 env=_test_or_live_environment() or "test", reqid=request_id 79 ) 80 81 82def logfmt(props): 83 def fmt(key, val): 84 # Handle case where val is a bytes or bytesarray 85 if six.PY3 and hasattr(val, "decode"): 86 val = val.decode("utf-8") 87 # Check if val is already a string to avoid re-encoding into 88 # ascii. Since the code is sent through 2to3, we can't just 89 # use unicode(val, encoding='utf8') since it will be 90 # translated incorrectly. 91 if not isinstance(val, six.string_types): 92 val = six.text_type(val) 93 if re.search(r"\s", val): 94 val = repr(val) 95 # key should already be a string 96 if re.search(r"\s", key): 97 key = repr(key) 98 return u"{key}={val}".format(key=key, val=val) 99 100 return u" ".join([fmt(key, val) for key, val in sorted(props.items())]) 101 102 103# Borrowed from Django's source code 104if hasattr(hmac, "compare_digest"): 105 # Prefer the stdlib implementation, when available. 106 def secure_compare(val1, val2): 107 return hmac.compare_digest(utf8(val1), utf8(val2)) 108 109 110else: 111 112 def secure_compare(val1, val2): 113 """ 114 Returns True if the two strings are equal, False otherwise. 115 The time taken is independent of the number of characters that match. 116 For the sake of simplicity, this function executes in constant time 117 only when the two strings have the same length. It short-circuits when 118 they have different lengths. 119 """ 120 val1, val2 = utf8(val1), utf8(val2) 121 if len(val1) != len(val2): 122 return False 123 result = 0 124 if six.PY3 and isinstance(val1, bytes) and isinstance(val2, bytes): 125 for x, y in zip(val1, val2): 126 result |= x ^ y 127 else: 128 for x, y in zip(val1, val2): 129 result |= ord(x) ^ ord(y) 130 return result == 0 131 132 133def get_object_classes(): 134 # This is here to avoid a circular dependency 135 from stripe.object_classes import OBJECT_CLASSES 136 137 return OBJECT_CLASSES 138 139 140def convert_to_stripe_object( 141 resp, api_key=None, stripe_version=None, stripe_account=None 142): 143 # If we get a StripeResponse, we'll want to return a 144 # StripeObject with the last_response field filled out with 145 # the raw API response information 146 stripe_response = None 147 148 if isinstance(resp, stripe.stripe_response.StripeResponse): 149 stripe_response = resp 150 resp = stripe_response.data 151 152 if isinstance(resp, list): 153 return [ 154 convert_to_stripe_object( 155 i, api_key, stripe_version, stripe_account 156 ) 157 for i in resp 158 ] 159 elif isinstance(resp, dict) and not isinstance( 160 resp, stripe.stripe_object.StripeObject 161 ): 162 resp = resp.copy() 163 klass_name = resp.get("object") 164 if isinstance(klass_name, six.string_types): 165 klass = get_object_classes().get( 166 klass_name, stripe.stripe_object.StripeObject 167 ) 168 else: 169 klass = stripe.stripe_object.StripeObject 170 171 return klass.construct_from( 172 resp, 173 api_key, 174 stripe_version=stripe_version, 175 stripe_account=stripe_account, 176 last_response=stripe_response, 177 ) 178 else: 179 return resp 180 181 182def convert_to_dict(obj): 183 """Converts a StripeObject back to a regular dict. 184 185 Nested StripeObjects are also converted back to regular dicts. 186 187 :param obj: The StripeObject to convert. 188 189 :returns: The StripeObject as a dict. 190 """ 191 if isinstance(obj, list): 192 return [convert_to_dict(i) for i in obj] 193 # This works by virtue of the fact that StripeObjects _are_ dicts. The dict 194 # comprehension returns a regular dict and recursively applies the 195 # conversion to each value. 196 elif isinstance(obj, dict): 197 return {k: convert_to_dict(v) for k, v in six.iteritems(obj)} 198 else: 199 return obj 200 201 202def populate_headers(idempotency_key): 203 if idempotency_key is not None: 204 return {"Idempotency-Key": idempotency_key} 205 return None 206 207 208class class_method_variant(object): 209 def __init__(self, class_method_name): 210 self.class_method_name = class_method_name 211 212 def __call__(self, method): 213 self.method = method 214 return self 215 216 def __get__(self, obj=None, objtype=None): 217 @functools.wraps(self.method) 218 def _wrapper(*args, **kwargs): 219 if obj is not None: 220 return self.method(obj, *args, **kwargs) 221 else: 222 class_method = getattr(objtype, self.class_method_name) 223 return class_method(*args, **kwargs) 224 225 return _wrapper 226