# util/compat.py
# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Handle Python version/platform incompatibilities."""

import sys
from contextlib import contextmanager

try:
    import threading
except ImportError:
    import dummy_threading as threading

# Interpreter / platform feature flags, computed once at import time.
py36 = sys.version_info >= (3, 6)
py33 = sys.version_info >= (3, 3)
py32 = sys.version_info >= (3, 2)
py3k = sys.version_info >= (3, 0)
py2k = sys.version_info < (3, 0)
py265 = sys.version_info >= (2, 6, 5)
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')
win32 = sys.platform.startswith('win')
cpython = not pypy and not jython  # TODO: something better for this ?

import collections
next = next

if py3k:
    import pickle
else:
    try:
        import cPickle as pickle
    except ImportError:
        import pickle

# work around http://bugs.python.org/issue2646
if py265:
    safe_kwarg = lambda arg: arg
else:
    safe_kwarg = str

ArgSpec = collections.namedtuple("ArgSpec",
                                 ["args", "varargs", "keywords", "defaults"])

if py3k:
    import builtins

    from inspect import getfullargspec as inspect_getfullargspec
    from urllib.parse import (quote_plus, unquote_plus,
                              parse_qsl, quote, unquote)
    import configparser
    from io import StringIO

    from io import BytesIO as byte_buffer

    def inspect_getargspec(func):
        """Return the first four fields of ``getfullargspec(func)`` as an
        :class:`ArgSpec` (args, varargs, keywords, defaults)."""
        return ArgSpec(
            *inspect_getfullargspec(func)[0:4]
        )

    string_types = str,
    binary_types = bytes,
    binary_type = bytes
    text_type = str
    int_types = int,
    iterbytes = iter

    def u(s):
        """Return ``s`` unchanged."""
        return s

    def ue(s):
        """Return ``s`` unchanged."""
        return s

    def b(s):
        """Encode the string ``s`` to bytes using latin-1."""
        return s.encode("latin-1")

    if py32:
        callable = callable
    else:
        def callable(fn):
            """Return True if ``fn`` has a ``__call__`` attribute."""
            return hasattr(fn, '__call__')

    def cmp(a, b):
        """Three-way comparison: -1, 0 or 1 for a < b, a == b, a > b."""
        return (a > b) - (a < b)

    from functools import reduce

    print_ = getattr(builtins, "print")

    import_ = getattr(builtins, '__import__')

    import itertools
    itertools_filterfalse = itertools.filterfalse
    itertools_filter = filter
    itertools_imap = map
    from itertools import zip_longest

    import base64

    def b64encode(x):
        """Base64-encode ``x`` and return the result as an ASCII ``str``."""
        return base64.b64encode(x).decode('ascii')

    def b64decode(x):
        """Decode the base64 ``str`` ``x`` and return the raw bytes."""
        return base64.b64decode(x.encode('ascii'))

else:
    from inspect import getargspec as inspect_getfullargspec
    inspect_getargspec = inspect_getfullargspec
    from urllib import quote_plus, unquote_plus, quote, unquote
    from urlparse import parse_qsl
    import ConfigParser as configparser
    from StringIO import StringIO
    from cStringIO import StringIO as byte_buffer

    string_types = basestring,
    binary_types = bytes,
    binary_type = str
    text_type = unicode
    int_types = int, long

    def iterbytes(buf):
        """Return a generator of ``ord(byte)`` for each item of ``buf``."""
        return (ord(byte) for byte in buf)

    def u(s):
        """Decode the byte string ``s`` to ``unicode`` as utf-8."""
        # this differs from what six does, which doesn't support non-ASCII
        # strings - we only use u() with
        # literal source strings, and all our source files with non-ascii
        # in them (all are tests) are utf-8 encoded.
        return unicode(s, "utf-8")

    def ue(s):
        """Decode ``s`` to ``unicode`` using the unicode_escape codec."""
        return unicode(s, "unicode_escape")

    def b(s):
        """Return ``s`` unchanged."""
        return s

    def import_(*args):
        """Call ``__import__`` with ``args``.

        When exactly four arguments are given, every entry of the fourth
        one is converted with ``str()`` first.
        """
        if len(args) == 4:
            args = args[0:3] + ([str(arg) for arg in args[3]],)
        return __import__(*args)

    callable = callable
    cmp = cmp
    reduce = reduce

    import base64
    b64encode = base64.b64encode
    b64decode = base64.b64decode

    def print_(*args, **kwargs):
        """Write each positional argument to a file-like object.

        :param file: keyword-only target, defaults to ``sys.stdout``;
         if it is None nothing is written.

        Arguments that are not strings are converted with ``str()``.
        No separator or trailing newline is emitted.
        """
        fp = kwargs.pop("file", sys.stdout)
        if fp is None:
            return
        # iterate the arguments themselves; wrapping them in enumerate()
        # would write "(index, value)" tuples instead of the values
        for arg in args:
            if not isinstance(arg, basestring):
                arg = str(arg)
            fp.write(arg)

    import itertools
    itertools_filterfalse = itertools.ifilterfalse
    itertools_filter = itertools.ifilter
    itertools_imap = itertools.imap
    from itertools import izip_longest as zip_longest


import time
if win32 or jython:
    # time.clock() was removed in Python 3.8; time.perf_counter() is
    # available from Python 3.3 onwards, so only fall back to clock()
    # on interpreters older than that.
    if py33:
        time_func = time.perf_counter
    else:
        time_func = time.clock
else:
    time_func = time.time

from collections import namedtuple
from operator import attrgetter as dottedgetter


if py3k:
    def reraise(tp, value, tb=None, cause=None):
        """Raise ``value``, attaching traceback ``tb`` and optional ``cause``.

        :param tp: exception type; not used by this implementation.
        :param value: the exception instance to raise.
        :param tb: traceback to attach if it differs from the one
         already present on ``value``.
        :param cause: if not None, stored as ``value.__cause__``; must
         not be ``value`` itself.
        """
        if cause is not None:
            assert cause is not value, "Same cause emitted"
            value.__cause__ = cause
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value

else:
    # not as nice as that of Py3K, but at least preserves
    # the code line where the issue occurred
    exec("def reraise(tp, value, tb=None, cause=None):\n"
         "    if cause is not None:\n"
         "        assert cause is not value, 'Same cause emitted'\n"
         "    raise tp, value, tb\n")


def raise_from_cause(exception, exc_info=None):
    """Raise ``exception`` with the traceback of an in-flight exception.

    :param exception: the exception instance to raise.
    :param exc_info: a ``(type, value, traceback)`` triple; defaults to
     ``sys.exc_info()``.

    The value from ``exc_info`` is passed to :func:`reraise` as the cause,
    unless it is ``exception`` itself.
    """
    if exc_info is None:
        exc_info = sys.exc_info()
    exc_type, exc_value, exc_tb = exc_info
    cause = exc_value if exc_value is not exception else None
    reraise(type(exception), exception, tb=exc_tb, cause=cause)

if py3k:
    exec_ = getattr(builtins, 'exec')
else:
    def exec_(func_text, globals_, lcl=None):
        """Execute ``func_text`` in ``globals_`` and, if given, ``lcl``."""
        # the py2k-only exec statement is wrapped in a string so that
        # this module still parses under py3k
        if lcl is None:
            exec('exec func_text in globals_')
        else:
            exec('exec func_text in globals_, lcl')


def with_metaclass(meta, *bases):
    """Create a base class with a metaclass.

    Drops the middle class upon creation.

    Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """

    class metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(cls, name, this_bases, d):
            # this_bases is None only for the placeholder class built
            # below; any class derived from that placeholder is instead
            # created directly from ``meta`` and the requested ``bases``.
            if this_bases is None:
                return type.__new__(cls, name, (), d)
            return meta(name, bases, d)
    return metaclass('temporary_class', None, {})


@contextmanager
def nested(*managers):
    """Implement contextlib.nested, mostly for unit tests.

    As tests still need to run on py2.6 we can't use multiple-with yet.

    Function is removed in py3k but also emits deprecation warning in 2.7
    so just roll it here for everyone.

    """

    exits = []
    vars = []
    exc = (None, None, None)
    try:
        for mgr in managers:
            exit = mgr.__exit__
            enter = mgr.__enter__
            vars.append(enter())
            exits.append(exit)
        yield vars
    except:
        # bare except is deliberate: whatever was raised is captured and
        # handed to each manager's __exit__ below
        exc = sys.exc_info()
    finally:
        # unwind in reverse order of entry
        while exits:
            exit = exits.pop()
            try:
                if exit(*exc):
                    # this manager suppressed the exception
                    exc = (None, None, None)
            except:
                # an __exit__ that raises replaces the pending exception
                exc = sys.exc_info()
        if exc != (None, None, None):
            reraise(exc[0], exc[1], exc[2])