1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5import atexit
6import copy
7import logging
8import os
9import signal
10import sys
11import time
12import traceback
13from concurrent.futures import ProcessPoolExecutor
14from concurrent.futures.process import _python_exit as futures_atexit
15from itertools import chain
16from math import ceil
17from multiprocessing import cpu_count, get_context
18from multiprocessing.queues import Queue
19from subprocess import CalledProcessError
20
21import mozpack.path as mozpath
22from mozversioncontrol import (
23    get_repository_object,
24    MissingUpstreamRepo,
25    InvalidRepoPath,
26)
27
28from .errors import LintersNotConfigured, NoValidLinter
29from .parser import Parser
30from .pathutils import findobject
31from .result import ResultSummary
32from .types import supported_types
33
# Set to True by `_worker_sigint_handler`; `_run_worker` checks it before
# starting a job so that already-queued jobs are skipped after an interrupt.
SHUTDOWN = False
# SIGINT disposition in effect when this module was imported. The worker
# handler delegates to it and `LintRoller.roll` restores it after a run.
orig_sigint = signal.getsignal(signal.SIGINT)

logger = logging.getLogger("mozlint")
handler = logging.StreamHandler()
# `lintname` and `pid` are not standard LogRecord attributes, so records must
# be emitted through a LoggerAdapter that supplies both as `extra`.
# The milliseconds are zero-padded to three digits so that e.g. 5ms is printed
# as ".005" instead of the misleading ".5".
formatter = logging.Formatter(
    "%(asctime)s.%(msecs)03d %(lintname)s (%(pid)s) | %(message)s", "%H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
44
45
def _run_worker(config, paths, **lintargs):
    """Run one linter over a chunk of paths and summarize the outcome.

    This is the job function submitted to the process pool by
    ``LintRoller.roll``.

    :param config: The linter definition (a dict). ``type`` selects the entry
                   of ``supported_types`` used to run it; ``name`` identifies
                   the linter in log output and in ``failed_run``.
    :param paths: The paths this job should lint.
    :param lintargs: Extra keyword arguments forwarded to the linter. Must
                     contain ``root``; a ``log`` adapter is added here.
    :return: A ``ResultSummary`` with the issues, suppressed warning counts,
             fixed count and failed-run names produced by this job. It is
             empty if the job was skipped or interrupted.
    """
    log = logging.LoggerAdapter(
        logger, {"lintname": config.get("name"), "pid": os.getpid()}
    )
    lintargs["log"] = log
    result = ResultSummary(lintargs["root"])

    # This process already received a SIGINT: don't start another job.
    if SHUTDOWN:
        return result

    # Override warnings setup for code review.
    # Warnings are forced on unless code_review_warnings is set to False in
    # the linter.yml in use.
    if os.environ.get("CODE_REVIEW") == "1" and config.get(
        "code_review_warnings", True
    ):
        lintargs["show_warnings"] = True

    # Override ignore thirdparty.
    # Only activated when include_thirdparty is set in the linter.yml in use.
    if config.get("include_thirdparty", False):
        lintargs["include_thirdparty"] = True

    func = supported_types[config["type"]]
    start_time = time.time()
    try:
        res = func(paths, config, **lintargs)
        # Some linters support fixed operations
        # dict returned - {"results":results,"fixed":fixed}
        if isinstance(res, dict):
            result.fixed += res["fixed"]
            res = res["results"] or []
        elif isinstance(res, list):
            res = res or []
        else:
            print("Unexpected type received")
            # Raise explicitly rather than `assert False`: asserts are removed
            # under `python -O`, which would let the bad value through instead
            # of recording the run as failed below.
            raise TypeError(
                "linter returned {} instead of a list or dict".format(
                    type(res).__name__
                )
            )
    except Exception:
        # Any linter error marks the run as failed (see `res` handling below).
        traceback.print_exc()
        res = 1
    except (KeyboardInterrupt, SystemExit):
        return result
    finally:
        end_time = time.time()
        log.debug("Finished in {:.2f} seconds".format(end_time - start_time))
        sys.stdout.flush()

    if not isinstance(res, (list, tuple)):
        # A non-list result is a status code: truthy means the run failed.
        if res:
            result.failed_run.add(config["name"])
    else:
        for r in res:
            if not lintargs.get("show_warnings") and r.level == "warning":
                result.suppressed_warnings[r.path] += 1
                continue

            result.issues[r.path].append(r)

    return result
104
105
class InterruptableQueue(Queue):
    """A multiprocessing.Queue whose ``get`` turns a KeyboardInterrupt raised
    while blocking into a ``None`` return value.

    ProcessPoolExecutor workers block on the call queue; this lets them be
    interrupted gracefully instead of dying with a traceback.
    """

    def __init__(self, *args, **kwargs):
        # Always use the default multiprocessing context, replacing any `ctx`
        # supplied by the caller.
        kwargs.update(ctx=get_context())
        super().__init__(*args, **kwargs)

    def get(self, *args, **kwargs):
        try:
            item = super().get(*args, **kwargs)
        except KeyboardInterrupt:
            item = None
        return item
123
124
def _worker_sigint_handler(signum, frame):
    """Sigint handler for the worker subprocesses.

    Tells workers not to process the extra jobs on the call queue that couldn't
    be canceled by the parent process, then defers to the SIGINT disposition
    that was in effect when this module was imported.

    :param signum: The signal number being handled.
    :param frame: The stack frame that was interrupted.
    """
    global SHUTDOWN
    SHUTDOWN = True

    # `signal.getsignal` may return SIG_IGN, SIG_DFL or None instead of a
    # Python callable (e.g. when the process was started with SIGINT ignored).
    # Calling those directly would raise a TypeError from inside the handler.
    if callable(orig_sigint):
        orig_sigint(signum, frame)
    elif orig_sigint == signal.SIG_DFL:
        # Emulate the default action: restore it and deliver the signal again.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
    # SIG_IGN / None: there is nothing to delegate to.
134
135
def wrap_futures_atexit():
    """Invoke the futures' exit handler, discarding any exception it raises so
    that no traceback is printed while the interpreter shuts down."""
    try:
        futures_atexit()
    except Exception:
        # `atexit` handlers normally shouldn't raise, but the futures' one
        # occasionally does when the user hits `CTRL-C`. Whatever it raises is
        # of no use to the user, so drop it and let a canceled lint run exit
        # quietly.
        return
148
149
# Swap the futures' own exit handler for the exception-suppressing wrapper:
# drop the stock registration (if any) and register the wrapper in its place.
# NOTE(review): per the atexit docs `unregister` does nothing when the function
# was never registered, so this is expected to be safe either way - confirm on
# the Python versions in use.
atexit.unregister(futures_atexit)
atexit.register(wrap_futures_atexit)
152
153
class LintRoller(object):
    """Registers and runs linters.

    :param root: Path to which relative paths will be joined. It is also used
                 to look up the version control repository; when it is not a
                 valid repository path, VCS based features are unavailable.
    :param exclude: Paths to exclude for every registered linter, in addition
                    to each linter's own ``exclude`` list.
    :param setupargs: Extra keyword arguments passed to linter setup functions.
    :param lintargs: Arguments to pass to the underlying linter(s).
    """

    # set a max size to prevent command lines that are too long on Windows
    MAX_PATHS_PER_JOB = 50

    def __init__(self, root, exclude=None, setupargs=None, **lintargs):
        self.parse = Parser(root)
        try:
            self.vcs = get_repository_object(root)
        except InvalidRepoPath:
            self.vcs = None

        self.linters = []
        self.lintargs = lintargs
        self.lintargs["root"] = root
        self._setupargs = setupargs or {}

        # result state
        self.result = ResultSummary(root)

        self.root = root
        self.exclude = exclude or []

        if lintargs.get("show_verbose"):
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.WARNING)

        self.log = logging.LoggerAdapter(
            logger, {"lintname": "mozlint", "pid": os.getpid()}
        )

    def read(self, paths):
        """Parse one or more linters and add them to the registry.

        :param paths: A path or iterable of paths to linter definitions.
        """
        if isinstance(paths, str):
            paths = (paths,)

        # Parse every definition up front so that a parse error leaves the
        # registry untouched.
        parsed = [self.parse(p) for p in paths]
        for linter in chain.from_iterable(parsed):
            # Add only the excludes present in paths
            linter["local_exclude"] = linter.get("exclude", [])[:]
            # Add in our global excludes
            linter.setdefault("exclude", []).extend(self.exclude)
            self.linters.append(linter)

    def setup(self, virtualenv_manager=None):
        """Run setup for applicable linters.

        Linters whose setup raises or returns a truthy value are recorded in
        ``self.result.failed_setup`` and removed from the registry.

        :param virtualenv_manager: Optional object forwarded to the setup
                                   functions as ``virtualenv_manager``.
        :return: 1 if any linter failed to set up, 0 otherwise.
        :raises NoValidLinter: If no linters are registered.
        """
        if not self.linters:
            raise NoValidLinter

        for linter in self.linters:
            if "setup" not in linter:
                continue

            try:
                setupargs = copy.deepcopy(self.lintargs)
                setupargs.update(self._setupargs)
                setupargs["name"] = linter["name"]
                # Wrap the base logger rather than `self.log`: a nested
                # adapter's `extra` is overwritten by the inner adapter, which
                # would label every setup message "mozlint".
                setupargs["log"] = logging.LoggerAdapter(
                    logger, {"lintname": linter["name"], "pid": os.getpid()}
                )
                if virtualenv_manager is not None:
                    setupargs["virtualenv_manager"] = virtualenv_manager
                start_time = time.time()
                res = findobject(linter["setup"])(
                    **setupargs,
                )
                self.log.debug(
                    f"setup for {linter['name']} finished in "
                    f"{round(time.time() - start_time, 2)} seconds"
                )
            except Exception:
                traceback.print_exc()
                res = 1

            if res:
                self.result.failed_setup.add(linter["name"])

        if self.result.failed_setup:
            print(
                "error: problem with lint setup, skipping {}".format(
                    ", ".join(sorted(self.result.failed_setup))
                )
            )
            self.linters = [
                l for l in self.linters if l["name"] not in self.result.failed_setup
            ]
            return 1
        return 0

    def _generate_jobs(self, paths, vcs_paths, num_procs):
        """Yield the jobs to run.

        A job is of the form (<linter:dict>, <paths:list>).

        :param paths: Set of explicitly requested absolute paths.
        :param vcs_paths: Set of absolute paths obtained from version control.
        :param num_procs: Number of processes the paths are spread across.
        """

        def list_dir(path):
            return [os.path.join(path, p) for p in os.listdir(path)]

        for linter in self.linters:
            if any(
                os.path.isfile(p) and mozpath.match(p, pattern)
                for pattern in linter.get("support-files", [])
                for p in vcs_paths
            ):
                lpaths = list_dir(self.root)
                print(
                    "warning: {} support-file modified, linting entire tree "
                    "(press ctrl-c to cancel)".format(linter["name"])
                )
            else:
                lpaths = paths.union(vcs_paths)

            # With nothing to lint, fall back to the current directory's entries.
            lpaths = list(lpaths) or list_dir(os.getcwd())
            chunk_size = (
                min(self.MAX_PATHS_PER_JOB, int(ceil(len(lpaths) / num_procs))) or 1
            )
            if linter["type"] == "global":
                # Global linters lint the entire tree in one job.
                chunk_size = len(lpaths) or 1
            assert chunk_size > 0

            while lpaths:
                yield linter, lpaths[:chunk_size]
                lpaths = lpaths[chunk_size:]

    def _collect_results(self, future):
        """Done-callback for a job's future: merge its result into ours."""
        if future.cancelled():
            return

        # Merge this job's results with our global ones.
        self.result.update(future.result())

    def roll(self, paths=None, outgoing=None, workdir=None, rev=None, num_procs=None):
        """Run all of the registered linters against the specified file paths.

        :param paths: An iterable of files and/or directories to lint.
        :param outgoing: Lint files touched by commits that are not on the remote repository.
                         A string is used as the upstream to compare against.
        :param workdir: Lint all files touched in the working directory.
        :param rev: Lint files changed in the given revision(s).
        :param num_procs: The number of processes to use. Default: cpu count
        :return: A :class:`~result.ResultSummary` instance.
        :raises LintersNotConfigured: If no linters are registered.
        :raises Exception: In automation, if workdir/outgoing yielded no files.
        """
        if not self.linters:
            raise LintersNotConfigured

        self.result.reset()

        # Need to use a set in case vcs operations specify the same file
        # more than once.
        paths = paths or set()
        if isinstance(paths, str):
            paths = {paths}
        elif isinstance(paths, (list, tuple)):
            paths = set(paths)

        if not self.vcs and (workdir or outgoing or rev):
            print(
                "error: '{}' is not a known repository, can't use "
                "--workdir, --outgoing or --rev".format(self.lintargs["root"])
            )

        # Calculate files from VCS. Skipped entirely when there is no
        # repository: calling into `self.vcs` would raise AttributeError.
        vcs_paths = set()
        if self.vcs is not None:
            try:
                if workdir:
                    vcs_paths.update(self.vcs.get_changed_files("AM", mode=workdir))
                if rev:
                    vcs_paths.update(self.vcs.get_changed_files("AM", rev=rev))
                if outgoing:
                    upstream = outgoing if isinstance(outgoing, str) else None
                    try:
                        vcs_paths.update(
                            self.vcs.get_outgoing_files("AM", upstream=upstream)
                        )
                    except MissingUpstreamRepo:
                        print(
                            "warning: could not find default push, specify a remote for --outgoing"
                        )
            except CalledProcessError as e:
                print("error running: {}".format(" ".join(e.cmd)))
                if e.output:
                    print(e.output)

        if not (paths or vcs_paths) and (workdir or outgoing):
            if os.environ.get("MOZ_AUTOMATION") and not os.environ.get(
                "PYTEST_CURRENT_TEST"
            ):
                raise Exception(
                    "Despite being a CI lint job, no files were linted. Is the task "
                    "missing explicit paths?"
                )

            print("warning: no files linted")
            return self.result

        # Make sure all paths are absolute. Join `paths` to cwd and `vcs_paths` to root.
        paths = set(map(os.path.abspath, paths))
        vcs_paths = {
            p if os.path.isabs(p) else os.path.join(self.root, p) for p in vcs_paths
        }

        num_procs = num_procs or cpu_count()
        jobs = list(self._generate_jobs(paths, vcs_paths, num_procs))

        # Make sure we never spawn more processes than we have jobs.
        num_procs = min(len(jobs), num_procs) or 1
        if sys.platform == "win32":
            # https://github.com/python/cpython/pull/13132
            num_procs = min(num_procs, 61)

        # Installed before the pool is created so that worker processes start
        # out with the worker handler.
        signal.signal(signal.SIGINT, _worker_sigint_handler)
        try:
            executor = ProcessPoolExecutor(num_procs)
            executor._call_queue = InterruptableQueue(executor._call_queue._maxsize)

            # Submit jobs to the worker pool. The _collect_results method will be
            # called when a job is finished. We store the futures so that they can
            # be canceled in the event of a KeyboardInterrupt.
            futures = []
            for job in jobs:
                future = executor.submit(_run_worker, *job, **self.lintargs)
                future.add_done_callback(self._collect_results)
                futures.append(future)

            def _parent_sigint_handler(signum, frame):
                """Sigint handler for the parent process.

                Cancels all jobs that have not yet been placed on the call queue.
                The parent process won't exit until all workers have terminated.
                Assuming the linters are implemented properly, this shouldn't take
                more than a couple seconds.
                """
                for f in futures:
                    f.cancel()
                executor.shutdown(wait=True)
                print("\nwarning: not all files were linted")
                signal.signal(signal.SIGINT, signal.SIG_IGN)

            signal.signal(signal.SIGINT, _parent_sigint_handler)
            executor.shutdown()
        finally:
            # Always put the original handler back, even if submitting or
            # waiting for the jobs raised.
            signal.signal(signal.SIGINT, orig_sigint)
        return self.result
403