1"""Git Utility functions."""
2from __future__ import absolute_import
3from __future__ import print_function
4
5import itertools
6import os
7import re
8from typing import Any, Callable, List, Tuple
9
10from buildscripts import git as _git
11from buildscripts import moduleconfig
12from buildscripts.resmokelib.utils import globstar
13
14# Path to the modules in the mongodb source tree
15# Has to match the string in SConstruct
16MODULE_DIR = "src/mongo/db/modules"
17
18
def get_base_dir():
    # type: () -> str
    """
    Return the base directory of the mongo repo.

    The git wrapper is asked for the repository's base directory first. If it
    raises GitException, fall back to the parent of the directory that holds
    this file, which assumes the script lives in buildscripts/.
    """
    try:
        base_dir = _git.Repository.get_base_directory()
    except _git.GitException:
        # Not inside a valid git directory: derive the path from this script's location.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        base_dir = os.path.dirname(script_dir)

    return base_dir
32
33
def get_repos():
    # type: () -> List[Repo]
    """Return the Repos to run linters on: one per module directory, then the base repo."""
    root = get_base_dir()
    modules_root = os.path.join(root, MODULE_DIR)

    # Discover the modules that live under MODULE_DIR.
    # TODO: how do we filter rocks, does it matter?
    module_names = moduleconfig.discover_module_directories(modules_root, None)

    # Module repos come first; the base repo is always the last entry.
    repo_dirs = [os.path.join(modules_root, name) for name in module_names]
    repo_dirs.append(root)

    return [Repo(repo_dir) for repo_dir in repo_dirs]
49
50
class Repo(_git.Repository):
    """A git repository plus the file-discovery helpers needed to run linters on it."""

    def _get_local_dir(self, path):
        # type: (str) -> str
        """
        Return path in the form git uses for this repository.

        Absolute paths are made relative to the repository directory; relative
        paths are kept as given. Backslashes are always turned into forward slashes.
        """
        local_path = os.path.relpath(path, self.directory) if os.path.isabs(path) else path

        # git reports Unix style separators on all platforms, so normalize Windows ones.
        return local_path.replace("\\", "/")

    def get_candidates(self, candidates, filter_function):
        # type: (List[str], Callable[[str], bool]) -> List[str]
        """
        Select the files to check by querying the repository.

        With no candidates (None or empty) every file reported by
        get_candidate_files() is used; otherwise only the candidates that also
        appear in that listing are kept.

        Returns the full path to each file for clang-format to consume.
        """
        if candidates is None or len(candidates) == 0:
            selected = list(self.get_candidate_files(filter_function))
        else:
            # Convert the requested names to git's form before comparing with its listing.
            requested = set(self._get_local_dir(name) for name in candidates)
            selected = list(requested.intersection(self.get_candidate_files(filter_function)))

        # Expand the repo-relative names into full file names.
        return [os.path.normpath(os.path.join(self.directory, name)) for name in selected]

    def _git_ls_files(self, args, filter_function):
        # type: (List[str], Callable[[str], bool]) -> List[str]
        """Run git-ls-files with args and keep the output lines accepted by filter_function."""
        output = self.git_ls_files(args)

        # Trailing whitespace is stripped before a line is filtered or returned.
        # The filter is what picks the interesting files in the mongo and
        # mongo-enterprise repos.
        names = (raw_line.rstrip() for raw_line in output.splitlines())
        return [name for name in names if filter_function(name)]

    def get_candidate_files(self, filter_function):
        # type: (Callable[[str], bool]) -> List[str]
        """List the files from `git ls-files --cached` that filter_function accepts."""
        ls_files_args = ["--cached"]
        return self._git_ls_files(ls_files_args, filter_function)

    def get_working_tree_candidate_files(self, filter_function):
        # type: (Callable[[str], bool]) -> List[str]
        # pylint: disable=invalid-name
        """List the files from `git ls-files --cached --others` that filter_function accepts."""
        ls_files_args = ["--cached", "--others"]
        return self._git_ls_files(ls_files_args, filter_function)

    def get_working_tree_candidates(self, filter_function):
        # type: (Callable[[str], bool]) -> List[str]
        """
        Select the working tree files to check by querying the repository.

        Returns the full path to each file for clang-format to consume. Paths
        that do not exist on disk are left out.
        """
        names = list(self.get_working_tree_candidate_files(filter_function))

        # Expand the repo-relative names into full file names.
        full_paths = [os.path.normpath(os.path.join(self.directory, name)) for name in names]

        # Drop files that git thinks exist but were removed.
        return [full_path for full_path in full_paths if os.path.exists(full_path)]
122
123
def expand_file_string(glob_pattern):
    # type: (str) -> List[str]
    """Expand a glob pattern into the absolute paths of the files it matches."""
    matches = globstar.iglob(glob_pattern)
    return list(map(os.path.abspath, matches))
128
129
def get_files_to_check_working_tree(filter_function):
    # type: (Callable[[str], bool]) -> List[str]
    """
    Collect the files to check from the working tree of every repo.

    This will pick up files not managed by git.
    """
    # Query every repo first, then flatten the per-repo lists in repo order.
    per_repo = [repo.get_working_tree_candidates(filter_function) for repo in get_repos()]

    found = []  # type: List[str]
    for repo_files in per_repo:
        found.extend(repo_files)

    return found
144
145
def get_files_to_check(files, filter_function):
    # type: (List[str], Callable[[str], bool]) -> List[str]
    """
    Get a list of files that need to be checked based on which files are managed by git.

    Each entry of files is expanded as a glob pattern, and the matches are handed to
    every repo's get_candidates() together with filter_function. An empty files
    sequence is not an error: get_candidates() then receives no candidates and falls
    back to every file it lists that passes filter_function.

    Raises ValueError when files is non-empty but the globs match nothing on disk, or
    when none of the matches is returned by any repo.
    """
    # Expand every glob and flatten the matches into one candidate list.
    candidates = list(itertools.chain.from_iterable(expand_file_string(f) for f in files))

    if len(files) > 0 and len(candidates) == 0:
        # files is wrapped in a one-element tuple so that a tuple of globs is formatted
        # as a single value instead of being unpacked into separate format arguments.
        raise ValueError("Globs '%s' did not find any files with glob." % (files,))

    repos = get_repos()

    valid_files = list(
        itertools.chain.from_iterable(
            r.get_candidates(candidates, filter_function) for r in repos))

    if len(files) > 0 and len(valid_files) == 0:
        raise ValueError("Globs '%s' did not find any files with glob in git." % (files,))

    return valid_files
166
167
def get_files_to_check_from_patch(patches, filter_function):
    # type: (List[str], Callable[[str], bool]) -> List[str]
    """
    Take patch files generated by git diff, and scan them for a list of files to check.

    patches is a list of paths to patch files. Every "diff --git a/<path> b/<path>"
    header line contributes its a/ side path as a candidate, and the candidates are
    then handed to every repo's get_candidates() together with filter_function.

    Returns the combined results of all repos.
    """
    # Captures the a/ side path of a "diff --git" header line.
    check = re.compile(r"^diff --git a\/([\w\/\.\-]+) b\/[\w\/\.\-]+")

    candidates = []  # type: List[str]
    for patch in patches:
        with open(patch, "rb") as infile:
            for raw_line in infile:
                # The file is read as bytes but check is a text pattern, and Python 3
                # refuses to match a str pattern against bytes, so decode first.
                # Undecodable bytes are replaced rather than aborting the scan.
                match = check.match(raw_line.decode("utf-8", "replace"))
                if match:
                    candidates.append(match.group(1))

    # NOTE(review): when no header line matches, candidates is empty and
    # Repo.get_candidates() falls back to every listed file that passes
    # filter_function -- confirm that this is intended for an empty patch.
    repos = get_repos()

    valid_files = list(
        itertools.chain.from_iterable(
            r.get_candidates(candidates, filter_function) for r in repos))

    return valid_files
190