1"""Use pytype to analyze and infer types for an entire project."""
2
3import logging
4import os
5import subprocess
6import sys
7from typing import Iterable, Sequence, Tuple
8
9from pytype import file_utils
10from pytype import module_utils
11from pytype.tools.analyze_project import config
12
13
14# Generate a default pyi for builtin and system dependencies.
15DEFAULT_PYI = """
16from typing import Any
17def __getattr__(name) -> Any: ...
18"""
19
20
21class Action:
22  CHECK = 'check'
23  INFER = 'infer'
24  GENERATE_DEFAULT = 'generate default'
25
26
27class Stage:
28  SINGLE_PASS = 'single pass'
29  FIRST_PASS = 'first pass'
30  SECOND_PASS = 'second pass'
31
32
33FIRST_PASS_SUFFIX = '-1'
34
35
36def _get_executable(binary, module=None):
37  """Get the path to the executable with the given name."""
38  if binary == 'pytype-single':
39    custom_bin = os.path.join('out', 'bin', 'pytype')
40    if sys.argv[0] == custom_bin:
41      # The Travis type-check step uses custom binaries in pytype/out/bin/.
42      return [os.path.join(os.path.abspath(os.path.dirname(custom_bin)),
43                           'pytype-single')]
44  if sys.executable is not None:
45    return [sys.executable, '-m', module or binary]
46  else:
47    return [binary]
48PYTYPE_SINGLE = _get_executable('pytype-single', 'pytype.single')
49
50
51def resolved_file_to_module(f):
52  """Turn an importlab ResolvedFile into a pytype Module."""
53  full_path = f.path
54  target = f.short_path
55  path = full_path[:-len(target)]
56  name = f.module_name
57  # We want to preserve __init__ in the module_name for pytype.
58  if os.path.basename(full_path) == '__init__.py':
59    name += '.__init__'
60  return module_utils.Module(
61      path=path, target=target, name=name, kind=f.__class__.__name__)
62
63
64def deps_from_import_graph(import_graph):
65  """Construct PytypeRunner args from an importlab.ImportGraph instance.
66
67  Kept as a separate function so PytypeRunner can be tested independently of
68  importlab.
69
70  Args:
71    import_graph: An importlab.ImportGraph instance.
72
73  Returns:
74    List of (tuple of source modules, tuple of direct deps) in dependency order.
75  """
76  def get_filenames(node):
77    if isinstance(node, str):
78      return (node,)
79    else:
80      # Make the build as deterministic as possible to minimize rebuilds.
81      return tuple(sorted(node.nodes))
82  def make_module(filename):
83    return resolved_file_to_module(import_graph.provenance[filename])
84  modules = []
85  for node, deps in reversed(import_graph.deps_list()):
86    files = tuple(
87        make_module(f) for f in get_filenames(node) if not _is_type_stub(f))
88    # flatten and dedup
89    seen = set()
90    final_deps = []
91    for dep in deps:
92      for d in get_filenames(dep):
93        if d in seen:
94          continue
95        seen.add(d)
96        if not _is_type_stub(d):
97          final_deps.append(make_module(d))
98    if files:
99      modules.append((files, tuple(final_deps)))
100  return modules
101
102
103def _is_type_stub(f):
104  _, ext = os.path.splitext(f)
105  return ext in ('.pyi', '.pytd')
106
107
108def _module_to_output_path(mod):
109  """Convert a module to an output path."""
110  path, _ = os.path.splitext(mod.target)
111  if path.replace(os.path.sep, '.').endswith(mod.name):
112    # Preferentially use the short path.
113    return path[-len(mod.name):]
114  else:
115    # Fall back to computing the output path from the name, which is a last
116    # resort because it messes up hidden files. Since such files aren't valid
117    # python packages anyway, we preserve any leading '.' in order to not
118    # create a file directly in / (which would likely cause a crash with a
119    # permission error) and let the rest of the path be mangled.
120    return mod.name[0] + mod.name[1:].replace('.', os.path.sep)
121
122
123def get_imports_map(deps, module_to_imports_map, module_to_output):
124  """Get a short path -> full path map for the given deps."""
125  imports_map = {}
126  for m in deps:
127    if m in module_to_imports_map:
128      imports_map.update(module_to_imports_map[m])
129    imports_map[_module_to_output_path(m)] = module_to_output[m]
130  return imports_map
131
132
133class PytypeRunner:
134  """Runs pytype over an import graph."""
135
136  def __init__(self, conf, sorted_sources):
137    self.filenames = set(conf.inputs)  # files to type-check
138    # all source modules as a sequence of (module, direct_deps)
139    self.sorted_sources = sorted_sources
140    self.python_version = conf.python_version
141    self.pyi_dir = os.path.join(conf.output, 'pyi')
142    self.imports_dir = os.path.join(conf.output, 'imports')
143    self.ninja_file = os.path.join(conf.output, 'build.ninja')
144    self.custom_options = [
145        (k, getattr(conf, k)) for k in set(conf.__slots__) - set(config.ITEMS)]
146    self.keep_going = conf.keep_going
147    self.jobs = conf.jobs
148
149  def set_custom_options(self, flags_with_values, binary_flags):
150    """Merge self.custom_options into flags_with_values and binary_flags."""
151    for dest, value in self.custom_options:
152      arg_info = config.get_pytype_single_item(dest).arg_info
153      if arg_info.to_command_line:
154        value = arg_info.to_command_line(value)
155      if isinstance(value, bool):
156        if value:
157          binary_flags.add(arg_info.flag)
158        else:
159          binary_flags.discard(arg_info.flag)
160      elif value:
161        flags_with_values[arg_info.flag] = str(value)
162
163  def get_pytype_command_for_ninja(self, report_errors):
164    """Get the command line for running pytype."""
165    exe = PYTYPE_SINGLE
166    flags_with_values = {
167        '--imports_info': '$imports',
168        '-V': self.python_version,
169        '-o': '$out',
170        '--module-name': '$module',
171    }
172    binary_flags = {
173        '--quick',
174        '--analyze-annotated' if report_errors else '--no-report-errors',
175        '--nofail',
176    }
177    if report_errors:
178      self.set_custom_options(flags_with_values, binary_flags)
179    # Order the flags so that ninja recognizes commands across runs.
180    return (
181        exe +
182        list(sum(sorted(flags_with_values.items()), ())) +
183        sorted(binary_flags) +
184        ['$in']
185    )
186
187  def make_imports_dir(self):
188    try:
189      file_utils.makedirs(self.imports_dir)
190    except OSError:
191      logging.error('Could not create imports directory: %s', self.imports_dir)
192      return False
193    return True
194
195  def write_default_pyi(self):
196    """Write a default pyi file."""
197    output = os.path.join(self.imports_dir, 'default.pyi')
198    with open(output, 'w') as f:
199      f.write(DEFAULT_PYI)
200    return output
201
202  def write_imports(self, module_name, imports_map, suffix):
203    """Write a .imports file."""
204    output = os.path.join(self.imports_dir, module_name + '.imports' + suffix)
205    with open(output, 'w') as f:
206      for item in imports_map.items():
207        f.write('%s %s\n' % item)
208    return output
209
210  def get_module_action(self, module):
211    """Get the action for the given module.
212
213    Args:
214      module: A module_utils.Module object.
215
216    Returns:
217      An Action object, or None for a non-Python file.
218    """
219    f = module.full_path
220    # Report errors for files we are analysing directly.
221    if f in self.filenames:
222      action = Action.CHECK
223      report = logging.warning
224    else:
225      action = Action.INFER
226      report = logging.info
227    # For builtin and system files not in pytype's own pytype_extensions
228    # library, do not attempt to generate a pyi.
229    if (not module.name.startswith('pytype_extensions.') and
230        module.kind in ('Builtin', 'System')):
231      action = Action.GENERATE_DEFAULT
232      report('%s: %s module %s', action, module.kind, module.name)
233    return action
234
235  def yield_sorted_modules(self) -> Iterable[
236      Tuple[module_utils.Module, str, Sequence[module_utils.Module], str]]:
237    """Yield modules from our sorted source files."""
238    for group, deps in self.sorted_sources:
239      modules = []
240      for module in group:
241        action = self.get_module_action(module)
242        if action:
243          modules.append((module, action))
244      if len(modules) == 1:
245        # TODO(b/73562531): Remove the pytype disable once the bug is fixed.
246        yield modules[0] + (deps, Stage.SINGLE_PASS)  # pytype: disable=bad-return-type
247      else:
248        # If we have a cycle we run pytype over the files twice. So that we
249        # don't fail on missing dependencies, we'll ignore errors the first
250        # time and add the cycle itself to the dependencies the second time.
251        second_pass_deps = []
252        for module, action in modules:
253          second_pass_deps.append(module)
254          if action == Action.CHECK:
255            action = Action.INFER
256          yield module, action, deps, Stage.FIRST_PASS
257        deps += tuple(second_pass_deps)
258        for module, action in modules:
259          # We don't need to run generate_default twice
260          if action != Action.GENERATE_DEFAULT:
261            yield module, action, deps, Stage.SECOND_PASS
262
263  def write_ninja_preamble(self):
264    """Write out the pytype-single commands that the build will call."""
265    with open(self.ninja_file, 'w') as f:
266      for action, report_errors in ((Action.INFER, False),
267                                    (Action.CHECK, True)):
268        command = ' '.join(
269            self.get_pytype_command_for_ninja(report_errors=report_errors))
270        logging.info('%s command: %s', action, command)
271        f.write(
272            'rule {action}\n'
273            '  command = {command}\n'
274            '  description = {action} $module\n'.format(
275                action=action, command=command)
276        )
277
278  def write_build_statement(self, module, action, deps, imports, suffix):
279    """Write a build statement for the given module.
280
281    Args:
282      module: A module_utils.Module object.
283      action: An Action object.
284      deps: The module's dependencies.
285      imports: An imports file.
286      suffix: An output file suffix.
287
288    Returns:
289      The expected output of the build statement.
290    """
291    output = os.path.join(self.pyi_dir,
292                          _module_to_output_path(module) + '.pyi' + suffix)
293    logging.info('%s %s\n  imports: %s\n  deps: %s\n  output: %s',
294                 action, module.name, imports, deps, output)
295    with open(self.ninja_file, 'a') as f:
296      f.write('build {output}: {action} {input}{deps}\n'
297              '  imports = {imports}\n'
298              '  module = {module}\n'.format(
299                  output=output,
300                  action=action,
301                  input=module.full_path,
302                  deps=' | ' + ' '.join(deps) if deps else '',
303                  imports=imports,
304                  module=module.name))
305    return output
306
307  def setup_build(self):
308    """Write out the full build.ninja file.
309
310    Returns:
311      All files with build statements.
312    """
313    if not self.make_imports_dir():
314      return set()
315    default_output = self.write_default_pyi()
316    self.write_ninja_preamble()
317    files = set()
318    module_to_imports_map = {}
319    module_to_output = {}
320    for module, action, deps, stage in self.yield_sorted_modules():
321      if files >= self.filenames:
322        logging.info('skipped: %s %s (%s)', action, module.name, stage)
323        continue
324      if action == Action.GENERATE_DEFAULT:
325        module_to_output[module] = default_output
326        continue
327      if stage == Stage.SINGLE_PASS:
328        files.add(module.full_path)
329        suffix = ''
330      elif stage == Stage.FIRST_PASS:
331        suffix = FIRST_PASS_SUFFIX
332      else:
333        assert stage == Stage.SECOND_PASS
334        files.add(module.full_path)
335        suffix = ''
336      imports_map = module_to_imports_map[module] = get_imports_map(
337          deps, module_to_imports_map, module_to_output)
338      imports = self.write_imports(module.name, imports_map, suffix)
339      # Don't depend on default.pyi, since it's regenerated every time.
340      deps = tuple(module_to_output[m] for m in deps
341                   if module_to_output[m] != default_output)
342      module_to_output[module] = self.write_build_statement(
343          module, action, deps, imports, suffix)
344    return files
345
346  def build(self):
347    """Execute the build.ninja file."""
348    # -k N     keep going until N jobs fail (0 means infinity)
349    # -C DIR   change to DIR before doing anything else
350    # -j N     run N jobs in parallel (0 means infinity)
351    # -v       show all command lines while building
352    k = '0' if self.keep_going else '1'
353    # relpath() prevents possibly sensitive directory info from appearing in
354    # ninja's "Entering directory" message.
355    c = os.path.relpath(os.path.dirname(self.ninja_file))
356    command = _get_executable('ninja') + [
357        '-k', k, '-C', c, '-j', str(self.jobs)]
358    if logging.getLogger().isEnabledFor(logging.INFO):
359      command.append('-v')
360    ret = subprocess.call(command)
361    print('Leaving directory %r' % c)
362    return ret
363
364  def run(self):
365    """Run pytype over the project."""
366    logging.info('------------- Starting pytype run. -------------')
367    files_to_analyze = self.setup_build()
368    num_sources = len(self.filenames & files_to_analyze)
369    print('Analyzing %d sources with %d local dependencies' %
370          (num_sources, len(files_to_analyze) - num_sources))
371    ret = self.build()
372    if not ret:
373      print('Success: no errors found')
374    return ret
375