1"""Generic functions."""
2
3import collections
4import contextlib
5import itertools
6import os.path
7import re
8import subprocess
9import sys
10import threading
11import traceback
12import types
13from typing import List, Tuple
14import weakref
15
16from pytype import pytype_source_utils
17
18
19# We disable the check that keeps pytype from running on not-yet-supported
20# versions when we detect that a pytype test is executing, in order to be able
21# to test upcoming versions.
22def _validate_python_version_upper_bound():
23  for frame_summary in traceback.extract_stack():
24    head, tail = os.path.split(frame_summary.filename)
25    if "/pytype/" in head + "/" and (
26        tail.startswith("test_") or tail.endswith("_test.py")):
27      return False
28  return True
29
30
31_VALIDATE_PYTHON_VERSION_UPPER_BOUND = _validate_python_version_upper_bound()
32
33
34def message(error):
35  """A convenience function which extracts a message from an exception.
36
37  Use this to replace exception.message, which is deprecated in python2 and
38  removed in python3.
39
40  Args:
41    error: The exception.
42
43  Returns:
44    A message string.
45  """
46  return error.args[0] if error.args else ""
47
48
49class UsageError(Exception):
50  """Raise this for top-level usage errors."""
51
52
53def format_version(python_version):
54  """Format a version tuple into a dotted version string."""
55  return ".".join(str(x) for x in python_version)
56
57
58def version_from_string(version_string):
59  """Parse a version string like "3" or "3.7" into a tuple."""
60  try:
61    version_int = int(version_string)
62  except ValueError:
63    return tuple(map(int, version_string.split(".")))
64  return full_version_from_major(version_int)
65
66
67# TODO(b/195453869): There's no longer any reason to accept just a major
68# version, since all supported versions are Python 3.x.
69def full_version_from_major(major_version):
70  """Get a (major, minor) Python version tuple from a major version int."""
71  if major_version == sys.version_info.major:
72    return sys.version_info[:2]
73  else:
74    raise UsageError(
75        "Cannot infer Python minor version for major version %d. "
76        "Specify the version as <major>.<minor>." % major_version)
77
78
79def normalize_version(version):
80  """Gets a version tuple from either a major version int or a version tuple."""
81  if isinstance(version, int):
82    return full_version_from_major(version)
83  else:
84    return version
85
86
87def validate_version(python_version):
88  """Raise an exception if the python version is unsupported."""
89  if len(python_version) != 2:
90    # This is typically validated in the option parser, but check here too in
91    # case we get python_version via a different entry point.
92    raise UsageError("python_version must be <major>.<minor>: %r" %
93                     format_version(python_version))
94  elif python_version <= (2, 7):
95    raise UsageError("Python version %r is not supported. "
96                     "Use pytype release 2021.08.03 for Python 2 support." %
97                     format_version(python_version))
98  elif (2, 8) <= python_version < (3, 0):
99    raise UsageError("Python version %r is not a valid Python version." %
100                     format_version(python_version))
101  elif (3, 0) <= python_version <= (3, 4):
102    # These have odd __build_class__ parameters, store co_code.co_name fields
103    # as unicode, and don't yet have the extra qualname parameter to
104    # MAKE_FUNCTION. Jumping through these extra hoops is not worth it, given
105    # that typing.py isn't introduced until 3.5, anyway.
106    raise UsageError(
107        "Python versions 3.0 - 3.4 are not supported. Use 3.5 and higher.")
108  elif python_version > (3, 9) and _VALIDATE_PYTHON_VERSION_UPPER_BOUND:
109    # We have an explicit per-minor-version mapping in opcodes.py
110    raise UsageError("Python versions > 3.9 are not yet supported.")
111
112
113def strip_prefix(string, prefix):
114  """Strip off prefix if it exists."""
115  if string.startswith(prefix):
116    return string[len(prefix):]
117  return string
118
119
120def maybe_truncate(s, length=30):
121  """Truncate long strings (and append '...'), but leave short strings alone."""
122  s = str(s)
123  if len(s) > length-3:
124    return s[0:length-3] + "..."
125  else:
126    return s
127
128
129def pretty_conjunction(conjunction):
130  """Pretty-print a conjunction. Use parentheses as necessary.
131
132  E.g. ["a", "b"] -> "(a & b)"
133
134  Args:
135    conjunction: List of strings.
136  Returns:
137    A pretty-printed string.
138  """
139  if not conjunction:
140    return "true"
141  elif len(conjunction) == 1:
142    return conjunction[0]
143  else:
144    return "(" + " & ".join(conjunction) + ")"
145
146
147def pretty_dnf(dnf):
148  """Pretty-print a disjunctive normal form (disjunction of conjunctions).
149
150  E.g. [["a", "b"], ["c"]] -> "(a & b) | c".
151
152  Args:
153    dnf: A list of list of strings. (Disjunction of conjunctions of strings)
154  Returns:
155    A pretty-printed string.
156  """
157  if not dnf:
158    return "false"
159  else:
160    return " | ".join(pretty_conjunction(c) for c in dnf)
161
162
163def numeric_sort_key(s):
164  return tuple((int(e) if e.isdigit() else e) for e in re.split(r"(\d+)", s))
165
166
167def concat_tuples(tuples):
168  return tuple(itertools.chain.from_iterable(tuples))
169
170
171def native_str(s, errors="strict"):
172  """Convert a bytes object to the native str type."""
173  if isinstance(s, str):
174    return s
175  else:
176    assert isinstance(s, bytes)
177    return s.decode("utf-8", errors)
178
179
180def get_python_exe(python_version) -> Tuple[List[str], List[str]]:
181  """Find a python executable to use.
182
183  Arguments:
184    python_version: the version tuple (e.g. (3, 7))
185  Returns:
186    A tuple of the path to the executable and any command-line flags
187  """
188  # Use custom interpreters, if provided, in preference to the ones in $PATH
189  custom_python_exe = pytype_source_utils.get_custom_python_exe(python_version)
190  if custom_python_exe:
191    python_exe = [custom_python_exe]
192  elif sys.platform == "win32":
193    python_exe = ["py", "-%d.%d" % python_version]
194  else:
195    python_exe = ["python%d.%d" % python_version]
196  return python_exe, []
197
198
199def get_python_exe_version(python_exe: List[str]):
200  """Determine the major and minor version of given Python executable.
201
202  Arguments:
203    python_exe: absolute path to the Python executable
204  Returns:
205    Version as (major, minor) tuple.
206  """
207  try:
208    python_exe_version = subprocess.check_output(
209        python_exe + ["-V"], stderr=subprocess.STDOUT).decode()
210  except subprocess.CalledProcessError:
211    return None
212
213  return parse_exe_version_string(python_exe_version)
214
215
216def parse_exe_version_string(version_str):
217  """Parse the version string of a Python executable.
218
219  Arguments:
220    version_str: Version string as emitted by running `PYTHON_EXE -V`
221  Returns:
222    Version as (major, minor) tuple.
223  """
224  # match the major.minor part of the version string, ignore the micro part
225  matcher = re.search(r"Python (\d+\.\d+)\.\d+", version_str)
226
227  if matcher:
228    return version_from_string(matcher.group(1))
229  else:
230    return None
231
232
233def can_compile_bytecode_natively(python_version):
234  # Optimization: calling compile_bytecode directly is faster than spawning a
235  # subprocess and lets us avoid extracting a large Python executable into /tmp.
236  # We can do this only when the host and target versions match.
237  return python_version == sys.version_info[:2]
238
239
240def list_startswith(l, prefix):
241  """Like str.startswith, but for lists."""
242  return l[:len(prefix)] == prefix
243
244
245def list_strip_prefix(l, prefix):
246  """Remove prefix, if it's there."""
247  return l[len(prefix):] if list_startswith(l, prefix) else l
248
249
250def _arg_names(f):
251  """Return the argument names of a function."""
252  return f.__code__.co_varnames[:f.__code__.co_argcount]
253
254
255class memoize:  # pylint: disable=invalid-name
256  """A memoizing decorator that supports expressions as keys.
257
258  Use it like this:
259    @memoize
260    def f(x):
261      ...
262  or
263    @memoize("(id(x), y)")
264    def f(x, y, z):
265      ...
266  .
267  Careful with methods. If you have code like
268    @memoize("x")
269    def f(self, x):
270      ...
271  then memoized values will be shared across instances.
272
273  This decorator contains some speed optimizations that make it not thread-safe.
274  """
275
276  def __new__(cls, key_or_function):
277    if isinstance(key_or_function, types.FunctionType):
278      f = key_or_function
279      key = "(" + ", ".join(_arg_names(f)) + ")"
280      return memoize(key)(f)
281    else:
282      key = key_or_function
283      return object.__new__(cls)
284
285  def __init__(self, key):
286    self.key = key
287
288  def __call__(self, f):
289    key_program = compile(self.key, filename=__name__, mode="eval")
290    argnames = _arg_names(f)
291    memoized = {}
292    no_result = object()
293    if f.__defaults__:
294      defaults = dict(zip(argnames[-len(f.__defaults__):], f.__defaults__))
295    else:
296      defaults = {}
297    pos_and_arg_tuples = list(zip(range(f.__code__.co_argcount), argnames))
298    shared_dict = {}
299    # TODO(b/159037011): Use functools.wraps or functools.update_wrapper to
300    # preserve the metadata of the original function.
301    def call(*posargs, **kwargs):
302      """Call a memoized function."""
303      if kwargs or defaults:
304        # Slower version; for default arguments, we need two dictionaries.
305        args = defaults.copy()
306        args.update(dict(zip(argnames, posargs)))
307        args.update(kwargs)
308        key = eval(key_program, args)  # pylint: disable=eval-used
309      else:
310        # Faster version, if we have no default args.
311        for pos, arg in pos_and_arg_tuples:
312          # We know we write *all* the values, so we can re-use the dictionary.
313          shared_dict[arg] = posargs[pos]
314        key = eval(key_program, shared_dict)  # pylint: disable=eval-used
315      result = memoized.get(key, no_result)
316      if result is no_result:
317        # Call the actual function.
318        result = f(*posargs, **kwargs)
319        memoized[key] = result
320      return result
321    return call
322
323
324def invert_dict(d):
325  """Invert a dictionary.
326
327  Converts a dictionary (mapping strings to lists of strings) to a dictionary
328  that maps into the other direction.
329
330  Arguments:
331    d: Dictionary to be inverted
332
333  Returns:
334    A dictionary n with the property that if "y in d[x]", then "x in n[y]".
335  """
336
337  inverted = collections.defaultdict(list)
338  for key, value_list in d.items():
339    for val in value_list:
340      inverted[val].append(key)
341  return inverted
342
343
344def unique_list(xs):
345  """Return a unique list from an iterable, preserving order."""
346  seen = set()
347  out = []
348  for x in xs:
349    if x not in seen:
350      seen.add(x)
351      out.append(x)
352  return out
353
354
355class DynamicVar:
356  """A dynamically scoped variable.
357
358  This is a per-thread dynamic variable, with an initial value of None.
359  The bind() call establishes a new value that will be in effect for the
360  duration of the resulting context manager.  This is intended to be used
361  in conjunction with a decorator.
362  """
363
364  def __init__(self):
365    self._local = threading.local()
366
367  def _values(self):
368    values = getattr(self._local, "values", None)
369    if values is None:
370      values = [None]  # Stack of bindings, with an initial default of None.
371      self._local.values = values
372    return values
373
374  @contextlib.contextmanager
375  def bind(self, value):
376    """Bind the dynamic variable to the supplied value."""
377    values = self._values()
378    try:
379      values.append(value)  # Push the new binding.
380      yield
381    finally:
382      values.pop()  # Pop the binding.
383
384  def get(self):
385    """Return the current value of the dynamic variable."""
386    return self._values()[-1]
387
388
389class AnnotatingDecorator:
390  """A decorator for storing function attributes.
391
392  Attributes:
393    lookup: maps functions to their attributes.
394  """
395
396  def __init__(self):
397    self.lookup = {}
398
399  def __call__(self, value):
400    def decorate(f):
401      self.lookup[f.__name__] = value
402      return f
403    return decorate
404
405
406class VirtualMachineWeakrefMixin:
407
408  __slots__ = ["vm_weakref"]
409
410  def __init__(self, vm):
411    self.vm_weakref = weakref.ref(vm)
412
413  @property
414  def vm(self):
415    return self.vm_weakref()
416