1"""Git Utility functions.""" 2from __future__ import absolute_import 3from __future__ import print_function 4 5import itertools 6import os 7import re 8from typing import Any, Callable, List, Tuple 9 10from buildscripts import git as _git 11from buildscripts import moduleconfig 12from buildscripts.resmokelib.utils import globstar 13 14# Path to the modules in the mongodb source tree 15# Has to match the string in SConstruct 16MODULE_DIR = "src/mongo/db/modules" 17 18 19def get_base_dir(): 20 # type: () -> str 21 """ 22 Get the base directory for mongo repo. 23 24 This script assumes that it is running in buildscripts/, and uses 25 that to find the base directory. 26 """ 27 try: 28 return _git.Repository.get_base_directory() 29 except _git.GitException: 30 # We are not in a valid git directory. Use the script path instead. 31 return os.path.dirname(os.path.dirname(os.path.realpath(__file__))) 32 33 34def get_repos(): 35 # type: () -> List[Repo] 36 """Get a list of Repos to check linters for.""" 37 base_dir = get_base_dir() 38 39 # Get a list of modules 40 # TODO: how do we filter rocks, does it matter? 41 mongo_modules = moduleconfig.discover_module_directories( 42 os.path.join(base_dir, MODULE_DIR), None) 43 44 paths = [os.path.join(base_dir, MODULE_DIR, m) for m in mongo_modules] 45 46 paths.append(base_dir) 47 48 return [Repo(p) for p in paths] 49 50 51class Repo(_git.Repository): 52 """Class encapsulates all knowledge about a git repository, and its metadata to run linters.""" 53 54 def _get_local_dir(self, path): 55 # type: (str) -> str 56 """Get a directory path relative to the git root directory.""" 57 if os.path.isabs(path): 58 path = os.path.relpath(path, self.directory) 59 60 # Normalize Windows style paths to Unix style which git uses on all platforms 61 path = path.replace("\\", "/") 62 63 return path 64 65 def get_candidates(self, candidates, filter_function): 66 # type: (List[str], Callable[[str], bool]) -> List[str] 67 """ 68 Get the set of candidate files to check by querying the repository. 69 70 Returns the full path to the file for clang-format to consume. 71 """ 72 if candidates is not None and len(candidates) > 0: 73 candidates = [self._get_local_dir(f) for f in candidates] 74 valid_files = list( 75 set(candidates).intersection(self.get_candidate_files(filter_function))) 76 else: 77 valid_files = list(self.get_candidate_files(filter_function)) 78 79 # Get the full file name here 80 valid_files = [os.path.normpath(os.path.join(self.directory, f)) for f in valid_files] 81 82 return valid_files 83 84 def _git_ls_files(self, args, filter_function): 85 # type: (List[str], Callable[[str], bool]) -> List[str] 86 """Run git-ls-files and filter the list of files to a valid candidate list.""" 87 gito = self.git_ls_files(args) 88 89 # This allows us to pick all the interesting files 90 # in the mongo and mongo-enterprise repos 91 file_list = [line.rstrip() for line in gito.splitlines() if filter_function(line.rstrip())] 92 93 return file_list 94 95 def get_candidate_files(self, filter_function): 96 # type: (Callable[[str], bool]) -> List[str] 97 """Query git to get a list of all files in the repo to consider for analysis.""" 98 return self._git_ls_files(["--cached"], filter_function) 99 100 def get_working_tree_candidate_files(self, filter_function): 101 # type: (Callable[[str], bool]) -> List[str] 102 # pylint: disable=invalid-name 103 """Query git to get a list of all files in the working tree to consider for analysis.""" 104 return self._git_ls_files(["--cached", "--others"], filter_function) 105 106 def get_working_tree_candidates(self, filter_function): 107 # type: (Callable[[str], bool]) -> List[str] 108 """ 109 Get the set of candidate files to check by querying the repository. 110 111 Returns the full path to the file for clang-format to consume. 112 """ 113 valid_files = list(self.get_working_tree_candidate_files(filter_function)) 114 115 # Get the full file name here 116 valid_files = [os.path.normpath(os.path.join(self.directory, f)) for f in valid_files] 117 118 # Filter out files that git thinks exist but were removed. 119 valid_files = [f for f in valid_files if os.path.exists(f)] 120 121 return valid_files 122 123 124def expand_file_string(glob_pattern): 125 # type: (str) -> List[str] 126 """Expand a string that represents a set of files.""" 127 return [os.path.abspath(f) for f in globstar.iglob(glob_pattern)] 128 129 130def get_files_to_check_working_tree(filter_function): 131 # type: (Callable[[str], bool]) -> List[str] 132 """ 133 Get a list of files to check from the working tree. 134 135 This will pick up files not managed by git. 136 """ 137 repos = get_repos() 138 139 valid_files = list( 140 itertools.chain.from_iterable( 141 [r.get_working_tree_candidates(filter_function) for r in repos])) 142 143 return valid_files 144 145 146def get_files_to_check(files, filter_function): 147 # type: (List[str], Callable[[str], bool]) -> List[str] 148 """Get a list of files that need to be checked based on which files are managed by git.""" 149 # Get a list of candidate_files 150 candidates_nested = [expand_file_string(f) for f in files] 151 candidates = list(itertools.chain.from_iterable(candidates_nested)) 152 153 if len(files) > 0 and len(candidates) == 0: 154 raise ValueError("Globs '%s' did not find any files with glob." % (files)) 155 156 repos = get_repos() 157 158 valid_files = list( 159 itertools.chain.from_iterable( 160 [r.get_candidates(candidates, filter_function) for r in repos])) 161 162 if len(files) > 0 and len(valid_files) == 0: 163 raise ValueError("Globs '%s' did not find any files with glob in git." % (files)) 164 165 return valid_files 166 167 168def get_files_to_check_from_patch(patches, filter_function): 169 # type: (List[str], Callable[[str], bool]) -> List[str] 170 """Take a patch file generated by git diff, and scan the patch for a list of files to check.""" 171 candidates = [] # type: List[str] 172 173 # Get a list of candidate_files 174 check = re.compile(r"^diff --git a\/([\w\/\.\-]+) b\/[\w\/\.\-]+") 175 176 lines = [] # type: List[str] 177 for patch in patches: 178 with open(patch, "rb") as infile: 179 lines += infile.readlines() 180 181 candidates = [check.match(line).group(1) for line in lines if check.match(line)] 182 183 repos = get_repos() 184 185 valid_files = list( 186 itertools.chain.from_iterable( 187 [r.get_candidates(candidates, filter_function) for r in repos])) 188 189 return valid_files 190