1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5r"""A database of OWNERS files. 6 7OWNERS files indicate who is allowed to approve changes in a specific directory 8(or who is allowed to make changes without needing approval of another OWNER). 9Note that all changes must still be reviewed by someone familiar with the code, 10so you may need approval from both an OWNER and a reviewer in many cases. 11 12The syntax of the OWNERS file is, roughly: 13 14lines := (\s* line? \s* comment? \s* "\n")* 15 16line := directive 17 | "per-file" \s+ glob \s* "=" \s* directive 18 19directive := "set noparent" 20 | "file:" owner_file 21 | email_address 22 | "*" 23 24glob := [a-zA-Z0-9_-*?]+ 25 26comment := "#" [^"\n"]* 27 28owner_file := "OWNERS" 29 | [^"\n"]* "_OWNERS" 30 31Email addresses must follow the foo@bar.com short form (exact syntax given 32in BASIC_EMAIL_REGEXP, below). Filename globs follow the simple unix 33shell conventions, and relative and absolute paths are not allowed (i.e., 34globs only refer to the files in the current directory). 35 36If a user's email is one of the email_addresses in the file, the user is 37considered an "OWNER" for all files in the directory. 38 39If the "per-file" directive is used, the line only applies to files in that 40directory that match the filename glob specified. 41 42If the "set noparent" directive used, then only entries in this OWNERS file 43apply to files in this directory; if the "set noparent" directive is not 44used, then entries in OWNERS files in enclosing (upper) directories also 45apply (up until a "set noparent is encountered"). 46 47If "per-file glob=set noparent" is used, then global directives are ignored 48for the glob, and only the "per-file" owners are used for files matching that 49glob. 50 51If the "file:" directive is used, the referred to OWNERS file will be parsed and 52considered when determining the valid set of OWNERS. If the filename starts with 53"//" it is relative to the root of the repository, otherwise it is relative to 54the current file. The referred to file *must* be named OWNERS or end in a suffix 55of _OWNERS. 56 57Examples for all of these combinations can be found in tests/owners_unittest.py. 58""" 59 60import collections 61import fnmatch 62import os 63import random 64import re 65 66try: 67 # This fallback applies for all versions of Python before 3.3 68 import collections.abc as collections_abc 69except ImportError: 70 import collections as collections_abc 71 72 73# If this is present by itself on a line, this means that everyone can review. 74EVERYONE = '*' 75 76 77# Recognizes 'X@Y' email addresses. Very simplistic. 78BASIC_EMAIL_REGEXP = r'^[\w\-\+\%\.]+\@[\w\-\+\%\.]+$' 79 80 81# Key for global comments per email address. Should be unlikely to be a 82# pathname. 83GLOBAL_STATUS = '*' 84 85# Returned if there is no owner and anyone +1 86ANYONE='<anyone>' 87 88 89def _assert_is_collection(obj): 90 assert not isinstance(obj, str) 91 # Module 'collections' has no 'Iterable' member 92 # pylint: disable=no-member 93 if hasattr(collections_abc, 'Iterable') and hasattr(collections_abc, 'Sized'): 94 assert (isinstance(obj, collections_abc.Iterable) and 95 isinstance(obj, collections_abc.Sized)) 96 97 98class SyntaxErrorInOwnersFile(Exception): 99 def __init__(self, path, lineno, msg): 100 super(SyntaxErrorInOwnersFile, self).__init__((path, lineno, msg)) 101 self.path = path 102 self.lineno = lineno 103 self.msg = msg 104 105 def __str__(self): 106 return '%s:%d syntax error: %s' % (self.path, self.lineno, self.msg) 107 108 109class Database(object): 110 """A database of OWNERS files for a repository. 111 112 This class allows you to find a suggested set of reviewers for a list 113 of changed files, and see if a list of changed files is covered by a 114 list of reviewers.""" 115 116 def __init__(self, root, fopen, os_path): 117 """Args: 118 root: the path to the root of the Repository 119 open: function callback to open a text file for reading 120 os_path: module/object callback with fields for 'abspath', 'dirname', 121 'exists', 'join', and 'relpath' 122 """ 123 self.root = root 124 self.fopen = fopen 125 self.os_path = os_path 126 127 # Pick a default email regexp to use; callers can override as desired. 128 self.email_regexp = re.compile(BASIC_EMAIL_REGEXP) 129 130 # Replacement contents for the given files. Maps the file name of an 131 # OWNERS file (relative to root) to an iterator returning the replacement 132 # file contents. 133 self.override_files = {} 134 135 # Mapping of owners to the paths or globs they own. 136 self._owners_to_paths = {EVERYONE: set()} 137 138 # Mappings of directories -> globs in the directory -> owners 139 # Example: "chrome/browser" -> "chrome/browser/*.h" -> ("john", "maria") 140 # Paths used as keys must use slashes as the separator, even on Windows. 141 self._paths_to_owners = {} 142 143 # Mapping reviewers to the preceding comment per file in the OWNERS files. 144 self.comments = {} 145 146 # Cache of compiled regexes for _fnmatch() 147 self._fnmatch_cache = {} 148 149 # Sets of paths that stop us from looking above them for owners. 150 # (This is implicitly true for the root directory). 151 # 152 # The implementation is a mapping: 153 # Directory -> globs in the directory, 154 # 155 # Example: 156 # 'ui/events/devices/mojo' -> 'ui/events/devices/mojo/*_struct_traits*.*' 157 self._stop_looking = {'': set([''])} 158 159 # Set of files which have already been read. 160 self.read_files = set() 161 162 # Set of files which were included from other files. Files are processed 163 # differently depending on whether they are regular owners files or 164 # being included from another file. 165 self._included_files = {} 166 167 # File with global status lines for owners. 168 self._status_file = None 169 170 def _file_affects_ownership(self, path): 171 """Returns true if the path refers to a file that could affect ownership.""" 172 filename = self.os_path.split(path)[-1] 173 return filename == 'OWNERS' or filename.endswith('_OWNERS') 174 175 176 def reviewers_for(self, files, author): 177 """Returns a suggested set of reviewers that will cover the files. 178 179 files is a sequence of paths relative to (and under) self.root. 180 If author is nonempty, we ensure it is not included in the set returned 181 in order avoid suggesting the author as a reviewer for their own changes.""" 182 self._check_paths(files) 183 self.load_data_needed_for(files) 184 185 suggested_owners = self._covering_set_of_owners_for(files, author) 186 if EVERYONE in suggested_owners: 187 if len(suggested_owners) > 1: 188 suggested_owners.remove(EVERYONE) 189 else: 190 suggested_owners = set([ANYONE]) 191 return suggested_owners 192 193 def files_not_covered_by(self, files, reviewers): 194 """Returns the files not owned by one of the reviewers. 195 196 Args: 197 files is a sequence of paths relative to (and under) self.root. 198 reviewers is a sequence of strings matching self.email_regexp. 199 """ 200 self._check_paths(files) 201 self._check_reviewers(reviewers) 202 self.load_data_needed_for(files) 203 204 return set(f for f in files if not self._is_obj_covered_by(f, reviewers)) 205 206 def _check_paths(self, files): 207 def _is_under(f, pfx): 208 return self.os_path.abspath(self.os_path.join(pfx, f)).startswith(pfx) 209 _assert_is_collection(files) 210 assert all(not self.os_path.isabs(f) and 211 _is_under(f, self.os_path.abspath(self.root)) for f in files) 212 213 def _check_reviewers(self, reviewers): 214 _assert_is_collection(reviewers) 215 assert all(self.email_regexp.match(r) for r in reviewers), reviewers 216 217 def _is_obj_covered_by(self, objname, reviewers): 218 reviewers = list(reviewers) + [EVERYONE] 219 while True: 220 for reviewer in reviewers: 221 for owned_pattern in self._owners_to_paths.get(reviewer, set()): 222 if fnmatch.fnmatch(objname, owned_pattern): 223 return True 224 if self._should_stop_looking(objname): 225 break 226 objname = self.os_path.dirname(objname) 227 return False 228 229 def enclosing_dir_with_owners(self, objname): 230 """Returns the innermost enclosing directory that has an OWNERS file.""" 231 dirpath = objname 232 while not self._owners_for(dirpath): 233 if self._should_stop_looking(dirpath): 234 break 235 dirpath = self.os_path.dirname(dirpath) 236 return dirpath 237 238 def load_data_needed_for(self, files): 239 self._read_global_comments() 240 visited_dirs = set() 241 for f in files: 242 # Always use slashes as separators. 243 f = f.replace(os.sep, '/') 244 dirpath = self.os_path.dirname(f) 245 while dirpath not in visited_dirs: 246 visited_dirs.add(dirpath) 247 248 obj_owners = self._owners_for(dirpath) 249 if obj_owners: 250 break 251 self._read_owners(self.os_path.join(dirpath, 'OWNERS')) 252 if self._should_stop_looking(dirpath): 253 break 254 255 dirpath = self.os_path.dirname(dirpath) 256 257 def _should_stop_looking(self, objname): 258 dirname = objname 259 while True: 260 if dirname in self._stop_looking: 261 if any(self._fnmatch(objname, stop_looking) 262 for stop_looking in self._stop_looking[dirname]): 263 return True 264 up_dirname = self.os_path.dirname(dirname) 265 if up_dirname == dirname: 266 break 267 dirname = up_dirname 268 return False 269 270 def _get_root_affected_dir(self, obj_name): 271 """Returns the deepest directory/path that is affected by a file pattern 272 |obj_name|.""" 273 root_affected_dir = obj_name 274 while '*' in root_affected_dir: 275 root_affected_dir = self.os_path.dirname(root_affected_dir) 276 return root_affected_dir 277 278 def _owners_for(self, objname): 279 obj_owners = set() 280 281 # Possibly relevant rules can be found stored at every directory 282 # level so iterate upwards, looking for them. 283 dirname = objname 284 while True: 285 dir_owner_rules = self._paths_to_owners.get(dirname) 286 if dir_owner_rules: 287 for owned_path, path_owners in dir_owner_rules.items(): 288 if self._fnmatch(objname, owned_path): 289 obj_owners |= path_owners 290 up_dirname = self.os_path.dirname(dirname) 291 if up_dirname == dirname: 292 break 293 dirname = up_dirname 294 295 return obj_owners 296 297 def _read_owners(self, path): 298 owners_path = self.os_path.join(self.root, path) 299 if not (self.os_path.exists(owners_path) or (path in self.override_files)): 300 return 301 302 if owners_path in self.read_files: 303 return 304 305 self.read_files.add(owners_path) 306 307 is_toplevel = path == 'OWNERS' 308 309 comment = [] 310 dirpath = self.os_path.dirname(path) 311 in_comment = False 312 # We treat the beginning of the file as an blank line. 313 previous_line_was_blank = True 314 reset_comment_after_use = False 315 lineno = 0 316 317 if path in self.override_files: 318 file_iter = self.override_files[path] 319 else: 320 file_iter = self.fopen(owners_path) 321 322 for line in file_iter: 323 lineno += 1 324 line = line.strip() 325 if line.startswith('#'): 326 if is_toplevel: 327 m = re.match(r'#\s*OWNERS_STATUS\s+=\s+(.+)$', line) 328 if m: 329 self._status_file = m.group(1).strip() 330 continue 331 if not in_comment: 332 comment = [] 333 reset_comment_after_use = not previous_line_was_blank 334 comment.append(line[1:].strip()) 335 in_comment = True 336 continue 337 in_comment = False 338 339 if line == '': 340 comment = [] 341 previous_line_was_blank = True 342 continue 343 344 # If the line ends with a comment, strip the comment and store it for this 345 # line only. 346 line, _, line_comment = line.partition('#') 347 line = line.strip() 348 line_comment = [line_comment.strip()] if line_comment else [] 349 350 previous_line_was_blank = False 351 if line == 'set noparent': 352 self._stop_looking.setdefault( 353 self._get_root_affected_dir(dirpath), set()).add(dirpath) 354 continue 355 356 m = re.match('per-file (.+)=(.+)', line) 357 if m: 358 glob_string = m.group(1).strip() 359 directive = m.group(2).strip() 360 full_glob_string = self.os_path.join(self.root, dirpath, glob_string) 361 if '/' in glob_string or '\\' in glob_string: 362 raise SyntaxErrorInOwnersFile(owners_path, lineno, 363 'per-file globs cannot span directories or use escapes: "%s"' % 364 line) 365 relative_glob_string = self.os_path.relpath(full_glob_string, self.root) 366 self._add_entry(relative_glob_string, directive, owners_path, 367 lineno, '\n'.join(comment + line_comment)) 368 if reset_comment_after_use: 369 comment = [] 370 continue 371 372 if line.startswith('set '): 373 raise SyntaxErrorInOwnersFile(owners_path, lineno, 374 'unknown option: "%s"' % line[4:].strip()) 375 376 self._add_entry(dirpath, line, owners_path, lineno, 377 ' '.join(comment + line_comment)) 378 if reset_comment_after_use: 379 comment = [] 380 381 def _read_global_comments(self): 382 if not self._status_file: 383 if not 'OWNERS' in self.read_files: 384 self._read_owners('OWNERS') 385 if not self._status_file: 386 return 387 388 owners_status_path = self.os_path.join(self.root, self._status_file) 389 if not self.os_path.exists(owners_status_path): 390 raise IOError('Could not find global status file "%s"' % 391 owners_status_path) 392 393 if owners_status_path in self.read_files: 394 return 395 396 self.read_files.add(owners_status_path) 397 398 lineno = 0 399 for line in self.fopen(owners_status_path): 400 lineno += 1 401 line = line.strip() 402 if line.startswith('#'): 403 continue 404 if line == '': 405 continue 406 407 m = re.match('(.+?):(.+)', line) 408 if m: 409 owner = m.group(1).strip() 410 comment = m.group(2).strip() 411 if not self.email_regexp.match(owner): 412 raise SyntaxErrorInOwnersFile(owners_status_path, lineno, 413 'invalid email address: "%s"' % owner) 414 415 self.comments.setdefault(owner, {}) 416 self.comments[owner][GLOBAL_STATUS] = comment 417 continue 418 419 raise SyntaxErrorInOwnersFile(owners_status_path, lineno, 420 'cannot parse status entry: "%s"' % line.strip()) 421 422 def _add_entry(self, owned_paths, directive, owners_path, lineno, comment): 423 # Consistently uses paths with slashes as the keys or else Windows will 424 # break in surprising and untested ways. 425 owned_paths = owned_paths.replace(os.sep, '/') 426 if directive == 'set noparent': 427 self._stop_looking.setdefault( 428 self._get_root_affected_dir(owned_paths), set()).add(owned_paths) 429 elif directive.startswith('file:'): 430 include_file = self._resolve_include(directive[5:], owners_path, lineno) 431 if not include_file: 432 raise SyntaxErrorInOwnersFile(owners_path, lineno, 433 ('%s does not refer to an existing file.' % directive[5:])) 434 435 included_owners = self._read_just_the_owners(include_file) 436 for owner in included_owners: 437 self._owners_to_paths.setdefault(owner, set()).add(owned_paths) 438 self._paths_to_owners.setdefault( 439 self._get_root_affected_dir(owned_paths), {}).setdefault( 440 owned_paths, set()).add(owner) 441 elif self.email_regexp.match(directive) or directive == EVERYONE: 442 if comment: 443 self.comments.setdefault(directive, {}) 444 self.comments[directive][owned_paths] = comment 445 self._owners_to_paths.setdefault(directive, set()).add(owned_paths) 446 self._paths_to_owners.setdefault( 447 self._get_root_affected_dir(owned_paths), {}).setdefault( 448 owned_paths, set()).add(directive) 449 else: 450 raise SyntaxErrorInOwnersFile(owners_path, lineno, 451 ('"%s" is not a "set noparent", file include, "*", ' 452 'or an email address.' % (directive,))) 453 454 def _resolve_include(self, path, start, lineno): 455 if path.startswith('//'): 456 include_path = path[2:] 457 else: 458 assert start.startswith(self.root) 459 start = self.os_path.dirname(self.os_path.relpath(start, self.root)) 460 include_path = self.os_path.normpath(self.os_path.join(start, path)) 461 462 if include_path in self.override_files: 463 return include_path 464 465 owners_path = self.os_path.join(self.root, include_path) 466 # Paths included via "file:" must end in OWNERS or _OWNERS. Files that can 467 # affect ownership have a different set of ownership rules, so that users 468 # cannot self-approve changes adding themselves to an OWNERS file. 469 if not self._file_affects_ownership(owners_path): 470 raise SyntaxErrorInOwnersFile(start, lineno, 'file: include must specify ' 471 'a file named OWNERS or ending in _OWNERS') 472 473 if not self.os_path.exists(owners_path): 474 return None 475 476 return include_path 477 478 def _read_just_the_owners(self, include_file): 479 if include_file in self._included_files: 480 return self._included_files[include_file] 481 482 owners = set() 483 self._included_files[include_file] = owners 484 lineno = 0 485 if include_file in self.override_files: 486 file_iter = self.override_files[include_file] 487 else: 488 file_iter = self.fopen(self.os_path.join(self.root, include_file)) 489 for line in file_iter: 490 lineno += 1 491 line = line.strip() 492 if (line.startswith('#') or line == '' or 493 line.startswith('set noparent') or 494 line.startswith('per-file')): 495 continue 496 497 # If the line ends with a comment, strip the comment. 498 line, _delim, _comment = line.partition('#') 499 line = line.strip() 500 501 if self.email_regexp.match(line) or line == EVERYONE: 502 owners.add(line) 503 continue 504 if line.startswith('file:'): 505 sub_include_file = self._resolve_include(line[5:], include_file, lineno) 506 sub_owners = self._read_just_the_owners(sub_include_file) 507 owners.update(sub_owners) 508 continue 509 510 raise SyntaxErrorInOwnersFile(include_file, lineno, 511 ('"%s" is not a "set noparent", file include, "*", ' 512 'or an email address.' % (line,))) 513 return owners 514 515 def _covering_set_of_owners_for(self, files, author): 516 dirs_remaining = set(self.enclosing_dir_with_owners(f) for f in files) 517 all_possible_owners = self.all_possible_owners(dirs_remaining, author) 518 suggested_owners = set() 519 while dirs_remaining and all_possible_owners: 520 owner = self.lowest_cost_owner(all_possible_owners, dirs_remaining) 521 suggested_owners.add(owner) 522 dirs_to_remove = set(el[0] for el in all_possible_owners[owner]) 523 dirs_remaining -= dirs_to_remove 524 # Now that we've used `owner` and covered all their dirs, remove them 525 # from consideration. 526 del all_possible_owners[owner] 527 for o, dirs in list(all_possible_owners.items()): 528 new_dirs = [(d, dist) for (d, dist) in dirs if d not in dirs_to_remove] 529 if not new_dirs: 530 del all_possible_owners[o] 531 else: 532 all_possible_owners[o] = new_dirs 533 return suggested_owners 534 535 def _all_possible_owners_for_dir_or_file(self, dir_or_file, author, 536 cache): 537 """Returns a dict of {potential owner: (dir_or_file, distance)} mappings. 538 """ 539 assert not dir_or_file.startswith("/") 540 res = cache.get(dir_or_file) 541 if res is None: 542 res = {} 543 dirname = dir_or_file 544 for owner in self._owners_for(dirname): 545 if author and owner == author: 546 continue 547 res.setdefault(owner, []) 548 res[owner] = (dir_or_file, 1) 549 if not self._should_stop_looking(dirname): 550 dirname = self.os_path.dirname(dirname) 551 552 parent_res = self._all_possible_owners_for_dir_or_file(dirname, 553 author, cache) 554 555 # Merge the parent information with our information, adjusting 556 # distances as necessary, and replacing the parent directory 557 # names with our names. 558 for owner, par_dir_and_distances in parent_res.items(): 559 if owner in res: 560 # If the same person is in multiple OWNERS files above a given 561 # directory, only count the closest one. 562 continue 563 parent_distance = par_dir_and_distances[1] 564 res[owner] = (dir_or_file, parent_distance + 1) 565 566 cache[dir_or_file] = res 567 568 return res 569 570 def all_possible_owners(self, dirs_and_files, author): 571 """Returns a dict of {potential owner: (dir, distance)} mappings. 572 573 A distance of 1 is the lowest/closest possible distance (which makes the 574 subsequent math easier). 575 """ 576 577 self.load_data_needed_for(dirs_and_files) 578 all_possible_owners_for_dir_or_file_cache = {} 579 all_possible_owners = {} 580 for current_dir in dirs_and_files: 581 # Always use slashes as separators. 582 current_dir = current_dir.replace(os.sep, '/') 583 dir_owners = self._all_possible_owners_for_dir_or_file( 584 current_dir, author, 585 all_possible_owners_for_dir_or_file_cache) 586 for owner, dir_and_distance in dir_owners.items(): 587 if owner in all_possible_owners: 588 all_possible_owners[owner].append(dir_and_distance) 589 else: 590 all_possible_owners[owner] = [dir_and_distance] 591 592 return all_possible_owners 593 594 def _fnmatch(self, filename, pattern): 595 """Same as fnmatch.fnmatch(), but internally caches the compiled regexes.""" 596 # Make sure backslashes are never used in the filename. The regex 597 # expressions generated by fnmatch.translate don't handle matching slashes 598 # to backslashes. 599 filename = filename.replace(os.sep, '/') 600 assert pattern.count('\\') == 0, 'Backslashes found in %s' % pattern 601 matcher = self._fnmatch_cache.get(pattern) 602 if matcher is None: 603 matcher = re.compile(fnmatch.translate(pattern)).match 604 self._fnmatch_cache[pattern] = matcher 605 return matcher(filename) 606 607 @staticmethod 608 def total_costs_by_owner(all_possible_owners, dirs): 609 # We want to minimize both the number of reviewers and the distance 610 # from the files/dirs needing reviews. The "pow(X, 1.75)" below is 611 # an arbitrarily-selected scaling factor that seems to work well - it 612 # will select one reviewer in the parent directory over three reviewers 613 # in subdirs, but not one reviewer over just two. 614 result = {} 615 for owner in all_possible_owners: 616 total_distance = 0 617 num_directories_owned = 0 618 for dirname, distance in all_possible_owners[owner]: 619 if dirname in dirs: 620 total_distance += distance 621 num_directories_owned += 1 622 if num_directories_owned: 623 result[owner] = (total_distance / 624 pow(num_directories_owned, 1.75)) 625 return result 626 627 @staticmethod 628 def lowest_cost_owner(all_possible_owners, dirs): 629 total_costs_by_owner = Database.total_costs_by_owner(all_possible_owners, 630 dirs) 631 # Return the lowest cost owner. In the case of a tie, pick one randomly. 632 lowest_cost = min(total_costs_by_owner.values()) 633 lowest_cost_owners = [ 634 owner for owner, cost in total_costs_by_owner.items() 635 if cost == lowest_cost] 636 return random.Random().choice(lowest_cost_owners) 637 638 def owners_rooted_at_file(self, filename): 639 """Returns a set of all owners transitively listed in filename. 640 641 This function returns a set of all the owners either listed in filename, or 642 in a file transitively included by filename. Lines that are not plain owners 643 (i.e. per-file owners) are ignored. 644 """ 645 return self._read_just_the_owners(filename) 646