1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5r"""A database of OWNERS files.
6
7OWNERS files indicate who is allowed to approve changes in a specific directory
8(or who is allowed to make changes without needing approval of another OWNER).
9Note that all changes must still be reviewed by someone familiar with the code,
10so you may need approval from both an OWNER and a reviewer in many cases.
11
12The syntax of the OWNERS file is, roughly:
13
14lines      := (\s* line? \s* comment? \s* "\n")*
15
16line       := directive
17           | "per-file" \s+ glob \s* "=" \s* directive
18
19directive  := "set noparent"
20           |  "file:" owner_file
21           |  email_address
22           |  "*"
23
24glob       := [a-zA-Z0-9_-*?]+
25
26comment    := "#" [^"\n"]*
27
28owner_file := "OWNERS"
29           |  [^"\n"]* "_OWNERS"
30
31Email addresses must follow the foo@bar.com short form (exact syntax given
32in BASIC_EMAIL_REGEXP, below). Filename globs follow the simple unix
33shell conventions, and relative and absolute paths are not allowed (i.e.,
34globs only refer to the files in the current directory).
35
36If a user's email is one of the email_addresses in the file, the user is
37considered an "OWNER" for all files in the directory.
38
39If the "per-file" directive is used, the line only applies to files in that
40directory that match the filename glob specified.
41
42If the "set noparent" directive used, then only entries in this OWNERS file
43apply to files in this directory; if the "set noparent" directive is not
44used, then entries in OWNERS files in enclosing (upper) directories also
45apply (up until a "set noparent is encountered").
46
47If "per-file glob=set noparent" is used, then global directives are ignored
48for the glob, and only the "per-file" owners are used for files matching that
49glob.
50
51If the "file:" directive is used, the referred to OWNERS file will be parsed and
52considered when determining the valid set of OWNERS. If the filename starts with
53"//" it is relative to the root of the repository, otherwise it is relative to
54the current file. The referred to file *must* be named OWNERS or end in a suffix
55of _OWNERS.
56
57Examples for all of these combinations can be found in tests/owners_unittest.py.
58"""
59
60import collections
61import fnmatch
62import os
63import random
64import re
65
66try:
67  # This fallback applies for all versions of Python before 3.3
68  import collections.abc as collections_abc
69except ImportError:
70  import collections as collections_abc
71
72
73# If this is present by itself on a line, this means that everyone can review.
74EVERYONE = '*'
75
76
77# Recognizes 'X@Y' email addresses. Very simplistic.
78BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$'
79
80
81# Key for global comments per email address. Should be unlikely to be a
82# pathname.
83GLOBAL_STATUS = '*'
84
85# Returned if there is no owner and anyone +1
86ANYONE='<anyone>'
87
88
89def _assert_is_collection(obj):
90  assert not isinstance(obj, str)
91  # Module 'collections' has no 'Iterable' member
92  # pylint: disable=no-member
93  if hasattr(collections_abc, 'Iterable') and hasattr(collections_abc, 'Sized'):
94    assert (isinstance(obj, collections_abc.Iterable) and
95            isinstance(obj, collections_abc.Sized))
96
97
98class SyntaxErrorInOwnersFile(Exception):
99  def __init__(self, path, lineno, msg):
100    super(SyntaxErrorInOwnersFile, self).__init__((path, lineno, msg))
101    self.path = path
102    self.lineno = lineno
103    self.msg = msg
104
105  def __str__(self):
106    return '%s:%d syntax error: %s' % (self.path, self.lineno, self.msg)
107
108
109class Database(object):
110  """A database of OWNERS files for a repository.
111
112  This class allows you to find a suggested set of reviewers for a list
113  of changed files, and see if a list of changed files is covered by a
114  list of reviewers."""
115
116  def __init__(self, root, fopen, os_path):
117    """Args:
118      root: the path to the root of the Repository
119      open: function callback to open a text file for reading
120      os_path: module/object callback with fields for 'abspath', 'dirname',
121          'exists', 'join', and 'relpath'
122    """
123    self.root = root
124    self.fopen = fopen
125    self.os_path = os_path
126
127    # Pick a default email regexp to use; callers can override as desired.
128    self.email_regexp = re.compile(BASIC_EMAIL_REGEXP)
129
130    # Replacement contents for the given files. Maps the file name of an
131    # OWNERS file (relative to root) to an iterator returning the replacement
132    # file contents.
133    self.override_files = {}
134
135    # Mapping of owners to the paths or globs they own.
136    self._owners_to_paths = {EVERYONE: set()}
137
138    # Mappings of directories -> globs in the directory -> owners
139    # Example: "chrome/browser" -> "chrome/browser/*.h" -> ("john", "maria")
140    # Paths used as keys must use slashes as the separator, even on Windows.
141    self._paths_to_owners = {}
142
143    # Mapping reviewers to the preceding comment per file in the OWNERS files.
144    self.comments = {}
145
146    # Cache of compiled regexes for _fnmatch()
147    self._fnmatch_cache = {}
148
149    # Sets of paths that stop us from looking above them for owners.
150    # (This is implicitly true for the root directory).
151    #
152    # The implementation is a mapping:
153    # Directory -> globs in the directory,
154    #
155    # Example:
156    # 'ui/events/devices/mojo' -> 'ui/events/devices/mojo/*_struct_traits*.*'
157    self._stop_looking = {'': set([''])}
158
159    # Set of files which have already been read.
160    self.read_files = set()
161
162    # Set of files which were included from other files. Files are processed
163    # differently depending on whether they are regular owners files or
164    # being included from another file.
165    self._included_files = {}
166
167    # File with global status lines for owners.
168    self._status_file = None
169
170  def _file_affects_ownership(self, path):
171    """Returns true if the path refers to a file that could affect ownership."""
172    filename = self.os_path.split(path)[-1]
173    return filename == 'OWNERS' or filename.endswith('_OWNERS')
174
175
176  def reviewers_for(self, files, author):
177    """Returns a suggested set of reviewers that will cover the files.
178
179    files is a sequence of paths relative to (and under) self.root.
180    If author is nonempty, we ensure it is not included in the set returned
181    in order avoid suggesting the author as a reviewer for their own changes."""
182    self._check_paths(files)
183    self.load_data_needed_for(files)
184
185    suggested_owners = self._covering_set_of_owners_for(files, author)
186    if EVERYONE in suggested_owners:
187      if len(suggested_owners) > 1:
188        suggested_owners.remove(EVERYONE)
189      else:
190        suggested_owners = set([ANYONE])
191    return suggested_owners
192
193  def files_not_covered_by(self, files, reviewers):
194    """Returns the files not owned by one of the reviewers.
195
196    Args:
197        files is a sequence of paths relative to (and under) self.root.
198        reviewers is a sequence of strings matching self.email_regexp.
199    """
200    self._check_paths(files)
201    self._check_reviewers(reviewers)
202    self.load_data_needed_for(files)
203
204    return set(f for f in files if not self._is_obj_covered_by(f, reviewers))
205
206  def _check_paths(self, files):
207    def _is_under(f, pfx):
208      return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx)
209    _assert_is_collection(files)
210    assert all(not self.os_path.isabs(f) and
211                _is_under(f, self.os_path.abspath(self.root)) for f in files)
212
213  def _check_reviewers(self, reviewers):
214    _assert_is_collection(reviewers)
215    assert all(self.email_regexp.match(r) for r in reviewers), reviewers
216
217  def _is_obj_covered_by(self, objname, reviewers):
218    reviewers = list(reviewers) + [EVERYONE]
219    while True:
220      for reviewer in reviewers:
221        for owned_pattern in self._owners_to_paths.get(reviewer, set()):
222          if fnmatch.fnmatch(objname, owned_pattern):
223            return True
224      if self._should_stop_looking(objname):
225        break
226      objname = self.os_path.dirname(objname)
227    return False
228
229  def enclosing_dir_with_owners(self, objname):
230    """Returns the innermost enclosing directory that has an OWNERS file."""
231    dirpath = objname
232    while not self._owners_for(dirpath):
233      if self._should_stop_looking(dirpath):
234        break
235      dirpath = self.os_path.dirname(dirpath)
236    return dirpath
237
238  def load_data_needed_for(self, files):
239    self._read_global_comments()
240    visited_dirs = set()
241    for f in files:
242      # Always use slashes as separators.
243      f = f.replace(os.sep, '/')
244      dirpath = self.os_path.dirname(f)
245      while dirpath not in visited_dirs:
246        visited_dirs.add(dirpath)
247
248        obj_owners = self._owners_for(dirpath)
249        if obj_owners:
250          break
251        self._read_owners(self.os_path.join(dirpath, 'OWNERS'))
252        if self._should_stop_looking(dirpath):
253          break
254
255        dirpath = self.os_path.dirname(dirpath)
256
257  def _should_stop_looking(self, objname):
258    dirname = objname
259    while True:
260      if dirname in self._stop_looking:
261        if any(self._fnmatch(objname, stop_looking)
262               for stop_looking in self._stop_looking[dirname]):
263          return True
264      up_dirname = self.os_path.dirname(dirname)
265      if up_dirname == dirname:
266        break
267      dirname = up_dirname
268    return False
269
270  def _get_root_affected_dir(self, obj_name):
271    """Returns the deepest directory/path that is affected by a file pattern
272    |obj_name|."""
273    root_affected_dir = obj_name
274    while '*' in root_affected_dir:
275      root_affected_dir = self.os_path.dirname(root_affected_dir)
276    return root_affected_dir
277
278  def _owners_for(self, objname):
279    obj_owners = set()
280
281    # Possibly relevant rules can be found stored at every directory
282    # level so iterate upwards, looking for them.
283    dirname = objname
284    while True:
285      dir_owner_rules = self._paths_to_owners.get(dirname)
286      if dir_owner_rules:
287        for owned_path, path_owners in dir_owner_rules.items():
288          if self._fnmatch(objname, owned_path):
289            obj_owners |= path_owners
290      up_dirname = self.os_path.dirname(dirname)
291      if up_dirname == dirname:
292        break
293      dirname = up_dirname
294
295    return obj_owners
296
297  def _read_owners(self, path):
298    owners_path = self.os_path.join(self.root, path)
299    if not (self.os_path.exists(owners_path) or (path in self.override_files)):
300      return
301
302    if owners_path in self.read_files:
303      return
304
305    self.read_files.add(owners_path)
306
307    is_toplevel = path == 'OWNERS'
308
309    comment = []
310    dirpath = self.os_path.dirname(path)
311    in_comment = False
312    # We treat the beginning of the file as an blank line.
313    previous_line_was_blank = True
314    reset_comment_after_use = False
315    lineno = 0
316
317    if path in self.override_files:
318      file_iter = self.override_files[path]
319    else:
320      file_iter = self.fopen(owners_path)
321
322    for line in file_iter:
323      lineno += 1
324      line = line.strip()
325      if line.startswith('#'):
326        if is_toplevel:
327          m = re.match(r'#\s*OWNERS_STATUS\s+=\s+(.+)$', line)
328          if m:
329            self._status_file = m.group(1).strip()
330            continue
331        if not in_comment:
332          comment = []
333          reset_comment_after_use = not previous_line_was_blank
334        comment.append(line[1:].strip())
335        in_comment = True
336        continue
337      in_comment = False
338
339      if line == '':
340        comment = []
341        previous_line_was_blank = True
342        continue
343
344      # If the line ends with a comment, strip the comment and store it for this
345      # line only.
346      line, _, line_comment = line.partition('#')
347      line = line.strip()
348      line_comment = [line_comment.strip()] if line_comment else []
349
350      previous_line_was_blank = False
351      if line == 'set noparent':
352        self._stop_looking.setdefault(
353          self._get_root_affected_dir(dirpath), set()).add(dirpath)
354        continue
355
356      m = re.match('per-file (.+)=(.+)', line)
357      if m:
358        glob_string = m.group(1).strip()
359        directive = m.group(2).strip()
360        full_glob_string = self.os_path.join(self.root, dirpath, glob_string)
361        if '/' in glob_string or '\\' in glob_string:
362          raise SyntaxErrorInOwnersFile(owners_path, lineno,
363              'per-file globs cannot span directories or use escapes: "%s"' %
364              line)
365        relative_glob_string = self.os_path.relpath(full_glob_string, self.root)
366        self._add_entry(relative_glob_string, directive, owners_path,
367                        lineno, '\n'.join(comment + line_comment))
368        if reset_comment_after_use:
369          comment = []
370        continue
371
372      if line.startswith('set '):
373        raise SyntaxErrorInOwnersFile(owners_path, lineno,
374            'unknown option: "%s"' % line[4:].strip())
375
376      self._add_entry(dirpath, line, owners_path, lineno,
377                      ' '.join(comment + line_comment))
378      if reset_comment_after_use:
379        comment = []
380
381  def _read_global_comments(self):
382    if not self._status_file:
383      if not 'OWNERS' in self.read_files:
384        self._read_owners('OWNERS')
385      if not self._status_file:
386        return
387
388    owners_status_path = self.os_path.join(self.root, self._status_file)
389    if not self.os_path.exists(owners_status_path):
390      raise IOError('Could not find global status file "%s"' %
391                    owners_status_path)
392
393    if owners_status_path in self.read_files:
394      return
395
396    self.read_files.add(owners_status_path)
397
398    lineno = 0
399    for line in self.fopen(owners_status_path):
400      lineno += 1
401      line = line.strip()
402      if line.startswith('#'):
403        continue
404      if line == '':
405        continue
406
407      m = re.match('(.+?):(.+)', line)
408      if m:
409        owner = m.group(1).strip()
410        comment = m.group(2).strip()
411        if not self.email_regexp.match(owner):
412          raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
413              'invalid email address: "%s"' % owner)
414
415        self.comments.setdefault(owner, {})
416        self.comments[owner][GLOBAL_STATUS] = comment
417        continue
418
419      raise SyntaxErrorInOwnersFile(owners_status_path, lineno,
420          'cannot parse status entry: "%s"' % line.strip())
421
422  def _add_entry(self, owned_paths, directive, owners_path, lineno, comment):
423    # Consistently uses paths with slashes as the keys or else Windows will
424    # break in surprising and untested ways.
425    owned_paths = owned_paths.replace(os.sep, '/')
426    if directive == 'set noparent':
427      self._stop_looking.setdefault(
428        self._get_root_affected_dir(owned_paths), set()).add(owned_paths)
429    elif directive.startswith('file:'):
430      include_file = self._resolve_include(directive[5:], owners_path, lineno)
431      if not include_file:
432        raise SyntaxErrorInOwnersFile(owners_path, lineno,
433            ('%s does not refer to an existing file.' % directive[5:]))
434
435      included_owners = self._read_just_the_owners(include_file)
436      for owner in included_owners:
437        self._owners_to_paths.setdefault(owner, set()).add(owned_paths)
438        self._paths_to_owners.setdefault(
439          self._get_root_affected_dir(owned_paths), {}).setdefault(
440            owned_paths, set()).add(owner)
441    elif self.email_regexp.match(directive) or directive == EVERYONE:
442      if comment:
443        self.comments.setdefault(directive, {})
444        self.comments[directive][owned_paths] = comment
445      self._owners_to_paths.setdefault(directive, set()).add(owned_paths)
446      self._paths_to_owners.setdefault(
447        self._get_root_affected_dir(owned_paths), {}).setdefault(
448          owned_paths, set()).add(directive)
449    else:
450      raise SyntaxErrorInOwnersFile(owners_path, lineno,
451          ('"%s" is not a "set noparent", file include, "*", '
452           'or an email address.' % (directive,)))
453
454  def _resolve_include(self, path, start, lineno):
455    if path.startswith('//'):
456      include_path = path[2:]
457    else:
458      assert start.startswith(self.root)
459      start = self.os_path.dirname(self.os_path.relpath(start, self.root))
460      include_path = self.os_path.normpath(self.os_path.join(start, path))
461
462    if include_path in self.override_files:
463      return include_path
464
465    owners_path = self.os_path.join(self.root, include_path)
466    # Paths included via "file:" must end in OWNERS or _OWNERS. Files that can
467    # affect ownership have a different set of ownership rules, so that users
468    # cannot self-approve changes adding themselves to an OWNERS file.
469    if not self._file_affects_ownership(owners_path):
470      raise SyntaxErrorInOwnersFile(start, lineno, 'file: include must specify '
471                                    'a file named OWNERS or ending in _OWNERS')
472
473    if not self.os_path.exists(owners_path):
474      return None
475
476    return include_path
477
478  def _read_just_the_owners(self, include_file):
479    if include_file in self._included_files:
480      return self._included_files[include_file]
481
482    owners = set()
483    self._included_files[include_file] = owners
484    lineno = 0
485    if include_file in self.override_files:
486      file_iter = self.override_files[include_file]
487    else:
488      file_iter = self.fopen(self.os_path.join(self.root, include_file))
489    for line in file_iter:
490      lineno += 1
491      line = line.strip()
492      if (line.startswith('#') or line == '' or
493              line.startswith('set noparent') or
494              line.startswith('per-file')):
495        continue
496
497      # If the line ends with a comment, strip the comment.
498      line, _delim, _comment = line.partition('#')
499      line = line.strip()
500
501      if self.email_regexp.match(line) or line == EVERYONE:
502        owners.add(line)
503        continue
504      if line.startswith('file:'):
505        sub_include_file = self._resolve_include(line[5:], include_file, lineno)
506        sub_owners = self._read_just_the_owners(sub_include_file)
507        owners.update(sub_owners)
508        continue
509
510      raise SyntaxErrorInOwnersFile(include_file, lineno,
511          ('"%s" is not a "set noparent", file include, "*", '
512           'or an email address.' % (line,)))
513    return owners
514
515  def _covering_set_of_owners_for(self, files, author):
516    dirs_remaining = set(self.enclosing_dir_with_owners(f) for f in files)
517    all_possible_owners = self.all_possible_owners(dirs_remaining, author)
518    suggested_owners = set()
519    while dirs_remaining and all_possible_owners:
520      owner = self.lowest_cost_owner(all_possible_owners, dirs_remaining)
521      suggested_owners.add(owner)
522      dirs_to_remove = set(el[0] for el in all_possible_owners[owner])
523      dirs_remaining -= dirs_to_remove
524      # Now that we've used `owner` and covered all their dirs, remove them
525      # from consideration.
526      del all_possible_owners[owner]
527      for o, dirs in list(all_possible_owners.items()):
528        new_dirs = [(d, dist) for (d, dist) in dirs if d not in dirs_to_remove]
529        if not new_dirs:
530          del all_possible_owners[o]
531        else:
532          all_possible_owners[o] = new_dirs
533    return suggested_owners
534
535  def _all_possible_owners_for_dir_or_file(self, dir_or_file, author,
536                                           cache):
537    """Returns a dict of {potential owner: (dir_or_file, distance)} mappings.
538    """
539    assert not dir_or_file.startswith("/")
540    res = cache.get(dir_or_file)
541    if res is None:
542      res = {}
543      dirname = dir_or_file
544      for owner in self._owners_for(dirname):
545        if author and owner == author:
546          continue
547        res.setdefault(owner, [])
548        res[owner] = (dir_or_file, 1)
549      if not self._should_stop_looking(dirname):
550        dirname = self.os_path.dirname(dirname)
551
552        parent_res = self._all_possible_owners_for_dir_or_file(dirname,
553                                                               author, cache)
554
555        # Merge the parent information with our information, adjusting
556        # distances as necessary, and replacing the parent directory
557        # names with our names.
558        for owner, par_dir_and_distances in parent_res.items():
559          if owner in res:
560            # If the same person is in multiple OWNERS files above a given
561            # directory, only count the closest one.
562            continue
563          parent_distance = par_dir_and_distances[1]
564          res[owner] = (dir_or_file, parent_distance + 1)
565
566      cache[dir_or_file] = res
567
568    return res
569
570  def all_possible_owners(self, dirs_and_files, author):
571    """Returns a dict of {potential owner: (dir, distance)} mappings.
572
573    A distance of 1 is the lowest/closest possible distance (which makes the
574    subsequent math easier).
575    """
576
577    self.load_data_needed_for(dirs_and_files)
578    all_possible_owners_for_dir_or_file_cache = {}
579    all_possible_owners = {}
580    for current_dir in dirs_and_files:
581      # Always use slashes as separators.
582      current_dir = current_dir.replace(os.sep, '/')
583      dir_owners = self._all_possible_owners_for_dir_or_file(
584        current_dir, author,
585        all_possible_owners_for_dir_or_file_cache)
586      for owner, dir_and_distance in dir_owners.items():
587          if owner in all_possible_owners:
588            all_possible_owners[owner].append(dir_and_distance)
589          else:
590            all_possible_owners[owner] = [dir_and_distance]
591
592    return all_possible_owners
593
594  def _fnmatch(self, filename, pattern):
595    """Same as fnmatch.fnmatch(), but internally caches the compiled regexes."""
596    # Make sure backslashes are never used in the filename. The regex
597    # expressions generated by fnmatch.translate don't handle matching slashes
598    # to backslashes.
599    filename = filename.replace(os.sep, '/')
600    assert pattern.count('\\') == 0, 'Backslashes found in %s' % pattern
601    matcher = self._fnmatch_cache.get(pattern)
602    if matcher is None:
603      matcher = re.compile(fnmatch.translate(pattern)).match
604      self._fnmatch_cache[pattern] = matcher
605    return matcher(filename)
606
607  @staticmethod
608  def total_costs_by_owner(all_possible_owners, dirs):
609    # We want to minimize both the number of reviewers and the distance
610    # from the files/dirs needing reviews. The "pow(X, 1.75)" below is
611    # an arbitrarily-selected scaling factor that seems to work well - it
612    # will select one reviewer in the parent directory over three reviewers
613    # in subdirs, but not one reviewer over just two.
614    result = {}
615    for owner in all_possible_owners:
616      total_distance = 0
617      num_directories_owned = 0
618      for dirname, distance in all_possible_owners[owner]:
619        if dirname in dirs:
620          total_distance += distance
621          num_directories_owned += 1
622      if num_directories_owned:
623        result[owner] = (total_distance /
624                         pow(num_directories_owned, 1.75))
625    return result
626
627  @staticmethod
628  def lowest_cost_owner(all_possible_owners, dirs):
629    total_costs_by_owner = Database.total_costs_by_owner(all_possible_owners,
630                                                         dirs)
631    # Return the lowest cost owner. In the case of a tie, pick one randomly.
632    lowest_cost = min(total_costs_by_owner.values())
633    lowest_cost_owners = [
634        owner for owner, cost in total_costs_by_owner.items()
635        if cost == lowest_cost]
636    return random.Random().choice(lowest_cost_owners)
637
638  def owners_rooted_at_file(self, filename):
639    """Returns a set of all owners transitively listed in filename.
640
641    This function returns a set of all the owners either listed in filename, or
642    in a file transitively included by filename. Lines that are not plain owners
643    (i.e. per-file owners) are ignored.
644    """
645    return self._read_just_the_owners(filename)
646