# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import atexit
import copy
import logging
import os
import signal
import sys
import time
import traceback
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import _python_exit as futures_atexit
from itertools import chain
from math import ceil
from multiprocessing import cpu_count, get_context
from multiprocessing.queues import Queue
from subprocess import CalledProcessError

import mozpack.path as mozpath
from mozversioncontrol import (
    get_repository_object,
    MissingUpstreamRepo,
    InvalidRepoPath,
)

from .errors import LintersNotConfigured, NoValidLinter
from .parser import Parser
from .pathutils import findobject
from .result import ResultSummary
from .types import supported_types

# Set to True by _worker_sigint_handler; _run_worker checks it to skip jobs
# that were already queued when SIGINT arrived.
SHUTDOWN = False
# SIGINT handler in effect when this module was imported; restored after a
# roll and chained to from the worker handler.
orig_sigint = signal.getsignal(signal.SIGINT)

logger = logging.getLogger("mozlint")
handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s.%(msecs)d %(lintname)s (%(pid)s) | %(message)s", "%H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)


def _run_worker(config, paths, **lintargs):
    """Run one linter over one chunk of paths (executed in a worker process).

    :param config: The linter definition dict produced by the parser; must
                   contain ``type`` and ``name``.
    :param paths: The paths this job should lint.
    :param lintargs: Arguments forwarded to the linter; must contain ``root``.
    :return: A :class:`~result.ResultSummary` with this job's issues,
             suppressed warnings, fixed count and failed runs.
    """
    log = logging.LoggerAdapter(
        logger, {"lintname": config.get("name"), "pid": os.getpid()}
    )
    lintargs["log"] = log
    result = ResultSummary(lintargs["root"])

    # SIGINT was received in this worker: skip jobs still on the call queue.
    if SHUTDOWN:
        return result

    # Force warnings on for code review, unless the linter opts out by setting
    # `code_review_warnings` to False in a linter.yml in use.
    if os.environ.get("CODE_REVIEW") == "1" and config.get(
        "code_review_warnings", True
    ):
        lintargs["show_warnings"] = True

    # Lint third-party code only when `include_thirdparty` is set in a
    # linter.yml in use.
    if config.get("include_thirdparty", False):
        lintargs["include_thirdparty"] = True

    func = supported_types[config["type"]]
    start_time = time.time()
    try:
        res = func(paths, config, **lintargs)
        # Some linters support fix operations and return
        # {"results": results, "fixed": fixed}; the others return a list.
        if isinstance(res, dict):
            result.fixed += res["fixed"]
            res = res["results"] or []
        elif not isinstance(res, list):
            print("Unexpected type received")
            # An explicit raise (rather than `assert False`) keeps this check
            # active under `python -O`; the handler below reports it and
            # marks the run as failed.
            raise TypeError(
                "linter returned {}, expected a list or dict".format(
                    type(res).__name__
                )
            )
    except Exception:
        traceback.print_exc()
        res = 1
    except (KeyboardInterrupt, SystemExit):
        return result
    finally:
        end_time = time.time()
        log.debug("Finished in {:.2f} seconds".format(end_time - start_time))
        sys.stdout.flush()

    if not isinstance(res, (list, tuple)):
        # A truthy non-list value (e.g. the 1 set above) means the run failed.
        if res:
            result.failed_run.add(config["name"])
    else:
        for r in res:
            if not lintargs.get("show_warnings") and r.level == "warning":
                result.suppressed_warnings[r.path] += 1
                continue

            result.issues[r.path].append(r)

    return result


class InterruptableQueue(Queue):
    """A multiprocessing.Queue that catches KeyboardInterrupt when a worker is
    blocking on it and returns None.

    This is needed to gracefully handle KeyboardInterrupts when a worker is
    blocking on ProcessPoolExecutor's call queue.
    """

    def __init__(self, *args, **kwargs):
        kwargs["ctx"] = get_context()
        super(InterruptableQueue, self).__init__(*args, **kwargs)

    def get(self, *args, **kwargs):
        try:
            return Queue.get(self, *args, **kwargs)
        except KeyboardInterrupt:
            return None


def _worker_sigint_handler(signum, frame):
    """Sigint handler for the worker subprocesses.

    Tells workers not to process the extra jobs on the call queue that couldn't
    be canceled by the parent process, then defers to the original handler.
    """
    global SHUTDOWN
    SHUTDOWN = True
    # signal.getsignal() may return SIG_IGN, SIG_DFL or None instead of a
    # Python callable; calling those would raise TypeError inside the handler.
    if callable(orig_sigint):
        orig_sigint(signum, frame)


def wrap_futures_atexit():
    """Sometimes futures' atexit handler can spew tracebacks. This wrapper
    suppresses them."""
    try:
        futures_atexit()
    except Exception:
        # Generally `atexit` handlers aren't supposed to raise exceptions, but the
        # futures' handler can sometimes raise when the user presses `CTRL-C`. We
        # suppress all possible exceptions here so users have a nice experience
        # when canceling their lint run. Any exceptions raised by this function
        # won't be useful anyway.
        pass


atexit.unregister(futures_atexit)
atexit.register(wrap_futures_atexit)


class LintRoller(object):
    """Registers and runs linters.

    :param root: Path to which relative paths will be joined. If
                 unspecified, root will either be determined from
                 version control or cwd.
    :param exclude: Paths excluded for every registered linter.
    :param setupargs: Extra arguments passed only to linter setup functions.
    :param lintargs: Arguments to pass to the underlying linter(s).
    """

    # Set a max size to prevent command lines that are too long on Windows.
    MAX_PATHS_PER_JOB = 50

    def __init__(self, root, exclude=None, setupargs=None, **lintargs):
        self.parse = Parser(root)
        try:
            self.vcs = get_repository_object(root)
        except InvalidRepoPath:
            self.vcs = None

        self.linters = []
        self.lintargs = lintargs
        self.lintargs["root"] = root
        self._setupargs = setupargs or {}

        # result state
        self.result = ResultSummary(root)

        self.root = root
        self.exclude = exclude or []

        if lintargs.get("show_verbose"):
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.WARNING)

        self.log = logging.LoggerAdapter(
            logger, {"lintname": "mozlint", "pid": os.getpid()}
        )

    def read(self, paths):
        """Parse one or more linters and add them to the registry.

        :param paths: A path or iterable of paths to linter definitions.
        """
        if isinstance(paths, str):
            paths = (paths,)

        # All definitions are parsed before any linter is registered.
        for linter in chain(*[self.parse(p) for p in paths]):
            # Keep a copy of the linter's own excludes...
            linter["local_exclude"] = linter.get("exclude", [])[:]
            # ...before adding in our global excludes.
            linter.setdefault("exclude", []).extend(self.exclude)
            self.linters.append(linter)

    def setup(self, virtualenv_manager=None):
        """Run setup for applicable linters.

        Linters whose setup fails (raises, or returns a truthy value) are
        removed from the registry.

        :param virtualenv_manager: Forwarded to setup functions when given.
        :return: 1 if any setup failed, otherwise 0.
        :raises NoValidLinter: If no linters are registered.
        """
        if not self.linters:
            raise NoValidLinter

        for linter in self.linters:
            if "setup" not in linter:
                continue

            try:
                setupargs = copy.deepcopy(self.lintargs)
                setupargs.update(self._setupargs)
                setupargs["name"] = linter["name"]
                # Wrap the module logger directly: wrapping `self.log` (itself
                # a LoggerAdapter) would have its `extra` replaced by the inner
                # adapter's, logging every setup as "mozlint".
                setupargs["log"] = logging.LoggerAdapter(
                    logger, {"lintname": linter["name"], "pid": os.getpid()}
                )
                if virtualenv_manager is not None:
                    setupargs["virtualenv_manager"] = virtualenv_manager
                start_time = time.time()
                res = findobject(linter["setup"])(
                    **setupargs,
                )
                self.log.debug(
                    f"setup for {linter['name']} finished in "
                    f"{round(time.time() - start_time, 2)} seconds"
                )
            except Exception:
                traceback.print_exc()
                res = 1

            if res:
                self.result.failed_setup.add(linter["name"])

        if self.result.failed_setup:
            print(
                "error: problem with lint setup, skipping {}".format(
                    ", ".join(sorted(self.result.failed_setup))
                )
            )
            self.linters = [
                linter
                for linter in self.linters
                if linter["name"] not in self.result.failed_setup
            ]
            return 1
        return 0

    def _generate_jobs(self, paths, vcs_paths, num_procs):
        """Split the work of every registered linter into jobs.

        A job is of the form (<linter:dict>, <paths:list>).

        :param paths: Set of absolute paths requested explicitly.
        :param vcs_paths: Set of absolute paths reported by version control.
        :param num_procs: Number of worker processes the jobs will be spread over.
        """

        def _get_current_paths(path=self.root):
            """Return the entries directly inside `path`, joined onto it."""
            return [os.path.join(path, p) for p in os.listdir(path)]

        for linter in self.linters:
            # A modified file matching one of this linter's support-files
            # patterns triggers a lint of every entry under root.
            if any(
                os.path.isfile(p) and mozpath.match(p, pattern)
                for pattern in linter.get("support-files", [])
                for p in vcs_paths
            ):
                lpaths = _get_current_paths()
                print(
                    "warning: {} support-file modified, linting entire tree "
                    "(press ctrl-c to cancel)".format(linter["name"])
                )
            else:
                lpaths = paths.union(vcs_paths)

            # Nothing requested: fall back to the entries of the cwd.
            lpaths = list(lpaths) or _get_current_paths(os.getcwd())
            chunk_size = (
                min(self.MAX_PATHS_PER_JOB, int(ceil(len(lpaths) / num_procs))) or 1
            )
            if linter["type"] == "global":
                # Global linters lint the entire tree in one job.
                chunk_size = len(lpaths) or 1
            assert chunk_size > 0

            # Slice by offset rather than repeatedly re-slicing the remainder,
            # which copied the rest of the list on every iteration.
            for start in range(0, len(lpaths), chunk_size):
                yield linter, lpaths[start : start + chunk_size]

    def _collect_results(self, future):
        """Done-callback: merge a finished job's results into ours."""
        if future.cancelled():
            return

        # Merge this job's results with our global ones.
        self.result.update(future.result())

    def _collect_vcs_paths(self, workdir, outgoing, rev):
        """Return the set of files ("AM" status) reported by version control.

        Returns an empty set when there is no repository. Failing VCS commands
        and a missing upstream are reported on stdout instead of raised.
        """
        vcs_paths = set()
        if not self.vcs:
            # Without a repository there is nothing to query; calling into
            # `self.vcs` here would raise AttributeError on None.
            return vcs_paths

        try:
            if workdir:
                vcs_paths.update(self.vcs.get_changed_files("AM", mode=workdir))
            if rev:
                vcs_paths.update(self.vcs.get_changed_files("AM", rev=rev))
            if outgoing:
                upstream = outgoing if isinstance(outgoing, str) else None
                try:
                    vcs_paths.update(
                        self.vcs.get_outgoing_files("AM", upstream=upstream)
                    )
                except MissingUpstreamRepo:
                    print(
                        "warning: could not find default push, specify a remote for --outgoing"
                    )
        except CalledProcessError as e:
            print("error running: {}".format(" ".join(e.cmd)))
            if e.output:
                print(e.output)
        return vcs_paths

    def roll(self, paths=None, outgoing=None, workdir=None, rev=None, num_procs=None):
        """Run all of the registered linters against the specified file paths.

        :param paths: An iterable of files and/or directories to lint.
        :param outgoing: Lint files touched by commits that are not on the remote repository.
        :param workdir: Lint all files touched in the working directory.
        :param rev: Lint files touched by the given revision(s).
        :param num_procs: The number of processes to use. Default: cpu count
        :return: A :class:`~result.ResultSummary` instance.
        :raises LintersNotConfigured: If no linters are registered.
        """
        if not self.linters:
            raise LintersNotConfigured

        self.result.reset()

        # Need to use a set in case vcs operations specify the same file
        # more than once.
        paths = paths or set()
        if isinstance(paths, str):
            paths = {paths}
        elif isinstance(paths, (list, tuple)):
            paths = set(paths)

        if not self.vcs and (workdir or outgoing):
            print(
                "error: '{}' is not a known repository, can't use "
                "--workdir or --outgoing".format(self.lintargs["root"])
            )

        # Calculate files from VCS
        vcs_paths = self._collect_vcs_paths(workdir, outgoing, rev)

        if not (paths or vcs_paths) and (workdir or outgoing):
            if os.environ.get("MOZ_AUTOMATION") and not os.environ.get(
                "PYTEST_CURRENT_TEST"
            ):
                raise Exception(
                    "Despite being a CI lint job, no files were linted. Is the task "
                    "missing explicit paths?"
                )

            print("warning: no files linted")
            return self.result

        # Make sure all paths are absolute. Join `paths` to cwd and `vcs_paths` to root.
        paths = set(map(os.path.abspath, paths))
        vcs_paths = {
            p if os.path.isabs(p) else os.path.join(self.root, p) for p in vcs_paths
        }

        num_procs = num_procs or cpu_count()
        jobs = list(self._generate_jobs(paths, vcs_paths, num_procs))

        # Make sure we never spawn more processes than we have jobs.
        num_procs = min(len(jobs), num_procs) or 1
        if sys.platform == "win32":
            # https://github.com/python/cpython/pull/13132
            num_procs = min(num_procs, 61)

        # Installed before workers are spawned so they start with it.
        signal.signal(signal.SIGINT, _worker_sigint_handler)
        try:
            executor = ProcessPoolExecutor(num_procs)
            # Swap in a call queue that tolerates KeyboardInterrupt. This
            # relies on ProcessPoolExecutor's private `_call_queue` attribute.
            executor._call_queue = InterruptableQueue(executor._call_queue._maxsize)

            # Submit jobs to the worker pool. The _collect_results method will be
            # called when a job is finished. We store the futures so that they can
            # be canceled in the event of a KeyboardInterrupt.
            futures = []
            for job in jobs:
                future = executor.submit(_run_worker, *job, **self.lintargs)
                future.add_done_callback(self._collect_results)
                futures.append(future)

            def _parent_sigint_handler(signum, frame):
                """Sigint handler for the parent process.

                Cancels all jobs that have not yet been placed on the call queue.
                The parent process won't exit until all workers have terminated.
                Assuming the linters are implemented properly, this shouldn't take
                more than a couple seconds.
                """
                for f in futures:
                    f.cancel()
                executor.shutdown(wait=True)
                print("\nwarning: not all files were linted")
                signal.signal(signal.SIGINT, signal.SIG_IGN)

            signal.signal(signal.SIGINT, _parent_sigint_handler)
            executor.shutdown()
        finally:
            # Always restore the original handler, even if submission failed.
            signal.signal(signal.SIGINT, orig_sigint)
        return self.result