1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Generic utils.""" 6 7from __future__ import print_function 8 9import codecs 10import collections 11import contextlib 12import datetime 13import errno 14import functools 15import io 16import logging 17import operator 18import os 19import pipes 20import platform 21import re 22import stat 23import subprocess 24import sys 25import tempfile 26import threading 27import time 28import subprocess2 29 30if sys.version_info.major == 2: 31 from cStringIO import StringIO 32 import collections as collections_abc 33 import Queue as queue 34 import urlparse 35else: 36 from collections import abc as collections_abc 37 from io import StringIO 38 import queue 39 import urllib.parse as urlparse 40 41 42RETRY_MAX = 3 43RETRY_INITIAL_SLEEP = 0.5 44START = datetime.datetime.now() 45 46 47_WARNINGS = [] 48 49 50# These repos are known to cause OOM errors on 32-bit platforms, due the the 51# very large objects they contain. It is not safe to use threaded index-pack 52# when cloning/fetching them. 53THREADED_INDEX_PACK_BLOCKLIST = [ 54 'https://chromium.googlesource.com/chromium/reference_builds/chrome_win.git' 55] 56 57"""To support rethrowing exceptions with tracebacks on both Py2 and 3.""" 58if sys.version_info.major == 2: 59 # We have to use exec to avoid a SyntaxError in Python 3. 
  exec("def reraise(typ, value, tb=None):\n raise typ, value, tb\n")
else:
  def reraise(typ, value, tb=None):
    """Re-raises `value` with traceback `tb`, Python 3 flavor."""
    if value is None:
      value = typ()
    if value.__traceback__ is not tb:
      raise value.with_traceback(tb)
    raise value


class Error(Exception):
  """gclient exception class."""
  def __init__(self, msg, *args, **kwargs):
    # Prefix each message line with the worker-thread index (NN>) when the
    # exception is raised inside an ExecutionQueue worker.
    index = getattr(threading.currentThread(), 'index', 0)
    if index:
      msg = '\n'.join('%d> %s' % (index, l) for l in msg.splitlines())
    super(Error, self).__init__(msg, *args, **kwargs)


def Elapsed(until=None):
  """Returns wall-clock time since module load (START) as a string,
  with the fractional-seconds part stripped."""
  if until is None:
    until = datetime.datetime.now()
  return str(until - START).partition('.')[0]


def PrintWarnings():
  """Prints any accumulated warnings."""
  if _WARNINGS:
    print('\n\nWarnings:', file=sys.stderr)
    for warning in _WARNINGS:
      print(warning, file=sys.stderr)


def AddWarning(msg):
  """Adds the given warning message to the list of accumulated warnings."""
  _WARNINGS.append(msg)


def SplitUrlRevision(url):
  """Splits url and returns a two-tuple: url, rev"""
  if url.startswith('ssh:'):
    # Make sure ssh://user-name@example.com/~/test.git@stable works
    regex = r'(ssh://(?:[-.\w]+@)?[-\w:\.]+/[-~\w\./]+)(?:@(.+))?'
    components = re.search(regex, url).groups()
  else:
    components = url.rsplit('@', 1)
    # A lone user@host with no path: the '@' is part of the URL, not a
    # revision separator.
    if re.match(r'^\w+\@', url) and '@' not in components[0]:
      components = [url]

  if len(components) == 1:
    components += [None]
  return tuple(components)


def IsGitSha(revision):
  """Returns true if the given string is a valid hex-encoded sha"""
  return re.match('^[a-fA-F0-9]{6,40}$', revision) is not None


def IsFullGitSha(revision):
  """Returns true if the given string is a valid hex-encoded full sha"""
  return re.match('^[a-fA-F0-9]{40}$', revision) is not None


def IsDateRevision(revision):
  """Returns true if the given revision is of the form "{ ...
  }"."""
  return bool(revision and re.match(r'^\{.+\}$', str(revision)))


def MakeDateRevision(date):
  """Returns a revision representing the latest revision before the given
  date."""
  return "{" + date + "}"


def SyntaxErrorToError(filename, e):
  """Raises a gclient_utils.Error exception with the human readable message"""
  try:
    # Try to construct a human readable error message
    if filename:
      error_message = 'There is a syntax error in %s\n' % filename
    else:
      error_message = 'There is a syntax error\n'
    error_message += 'Line #%s, character %s: "%s"' % (
        e.lineno, e.offset, re.sub(r'[\r\n]*$', '', e.text))
  except:
    # Something went wrong, re-raise the original exception
    raise e
  else:
    raise Error(error_message)


class PrintableObject(object):
  """Debug helper: __str__ dumps every non-dunder attribute as 'name = value'."""
  def __str__(self):
    output = ''
    for i in dir(self):
      if i.startswith('__'):
        continue
      output += '%s = %s\n' % (i, str(getattr(self, i, '')))
    return output


def AskForData(message):
  """Prompts the user for input; exits quietly on Ctrl-C."""
  # Use this so that it can be mocked in tests on Python 2 and 3.
  try:
    if sys.version_info.major == 2:
      return raw_input(message)
    return input(message)
  except KeyboardInterrupt:
    # Hide the exception.
    sys.exit(1)


def FileRead(filename, mode='rbU'):
  # Always decodes output to a Unicode string.
  # On Python 3 newlines are converted to '\n' by default and 'U' is deprecated.
  if mode == 'rbU' and sys.version_info.major == 3:
    mode = 'rb'
  with open(filename, mode=mode) as f:
    s = f.read()
    # Binary reads are decoded to text; undecodable bytes are replaced rather
    # than raising, so callers always get a str.
    if isinstance(s, bytes):
      return s.decode('utf-8', 'replace')
    return s


def FileWrite(filename, content, mode='w', encoding='utf-8'):
  """Writes `content` to `filename` with an explicit text encoding."""
  with codecs.open(filename, mode=mode, encoding=encoding) as f:
    f.write(content)


@contextlib.contextmanager
def temporary_directory(**kwargs):
  """Context manager yielding a fresh temp directory, removed on exit."""
  tdir = tempfile.mkdtemp(**kwargs)
  try:
    yield tdir
  finally:
    if tdir:
      rmtree(tdir)


@contextlib.contextmanager
def temporary_file():
  """Creates a temporary file.

  On Windows, a file must be closed before it can be opened again. This function
  allows to write something like:

    with gclient_utils.temporary_file() as tmp:
      gclient_utils.FileWrite(tmp, foo)
      useful_stuff(tmp)

  Instead of something like:

    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      tmp.write(foo)
      tmp.close()
      try:
        useful_stuff(tmp)
      finally:
        os.remove(tmp.name)
  """
  handle, name = tempfile.mkstemp()
  # Close immediately so the caller (and Windows) can reopen the path.
  os.close(handle)
  try:
    yield name
  finally:
    os.remove(name)


def safe_rename(old, new):
  """Renames a file reliably.

  Sometimes os.rename does not work because a dying git process keeps a handle
  on it for a few seconds. An exception is then thrown, which make the program
  give up what it was doing and remove what was deleted.
  The only solution is to catch the exception and try again until it works.
  """
  # roughly 10s
  retries = 100
  for i in range(retries):
    try:
      os.rename(old, new)
      break
    except OSError:
      if i == (retries - 1):
        # Give up.
        raise
      # retry
      logging.debug("Renaming failed from %s to %s. Retrying ..."
                    % (old, new))
      time.sleep(0.1)


def rm_file_or_tree(path):
  """Removes `path` whether it is a file, a symlink, or a directory tree."""
  if os.path.isfile(path) or os.path.islink(path):
    os.remove(path)
  else:
    rmtree(path)


def rmtree(path):
  """shutil.rmtree() on steroids.

  Recursively removes a directory, even if it's marked read-only.

  shutil.rmtree() doesn't work on Windows if any of the files or directories
  are read-only. We need to be able to force the files to be writable (i.e.,
  deletable) as we traverse the tree.

  Even with all this, Windows still sometimes fails to delete a file, citing
  a permission error (maybe something to do with antivirus scans or disk
  indexing). The best suggestion any of the user forums had was to wait a
  bit and try again, so we do that too. It's hand-waving, but sometimes it
  works. :/

  On POSIX systems, things are a little bit simpler. The modes of the files
  to be deleted doesn't matter, only the modes of the directories containing
  them are significant. As the directory tree is traversed, each directory
  has its mode set appropriately before descending into it. This should
  result in the entire tree being removed, with the possible exception of
  *path itself, because nothing attempts to change the mode of its parent.
  Doing so would be hazardous, as it's not a directory slated for removal.
  In the ordinary case, this is not a problem: for our purposes, the user
  will never lack write permission on *path's parent.
  """
  if not os.path.exists(path):
    return

  if os.path.islink(path) or not os.path.isdir(path):
    raise Error('Called rmtree(%s) in non-directory' % path)

  if sys.platform == 'win32':
    # Give up and use cmd.exe's rd command.
    path = os.path.normcase(path)
    # Retry a few times: Windows can transiently refuse the delete (antivirus,
    # indexing); see the docstring above.
    for _ in range(3):
      exitcode = subprocess.call(['cmd.exe', '/c', 'rd', '/q', '/s', path])
      if exitcode == 0:
        return
      else:
        print('rd exited with code %d' % exitcode, file=sys.stderr)
      time.sleep(3)
    raise Exception('Failed to remove path %s' % path)

  # On POSIX systems, we need the x-bit set on the directory to access it,
  # the r-bit to see its contents, and the w-bit to remove files from it.
  # The actual modes of the files within the directory is irrelevant.
  os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

  def remove(func, subpath):
    func(subpath)

  for fn in os.listdir(path):
    # If fullpath is a symbolic link that points to a directory, isdir will
    # be True, but we don't want to descend into that as a directory, we just
    # want to remove the link. Check islink and treat links as ordinary files
    # would be treated regardless of what they reference.
    fullpath = os.path.join(path, fn)
    if os.path.islink(fullpath) or not os.path.isdir(fullpath):
      remove(os.remove, fullpath)
    else:
      # Recurse.
      rmtree(fullpath)

  remove(os.rmdir, path)


def safe_makedirs(tree):
  """Creates the directory in a safe manner.

  Because multiple threads can create these directories concurrently, trap the
  exception and pass on.
  """
  count = 0
  while not os.path.exists(tree):
    count += 1
    try:
      os.makedirs(tree)
    except OSError as e:
      # 17 POSIX, 183 Windows
      if e.errno not in (17, 183):
        raise
      if count > 40:
        # Give up.
        raise


def CommandToStr(args):
  """Converts an arg list into a shell escaped string."""
  return ' '.join(pipes.quote(arg) for arg in args)


class Wrapper(object):
  """Wraps an object, acting as a transparent proxy for all properties by
  default.
  """
  def __init__(self, wrapped):
    self._wrapped = wrapped

  def __getattr__(self, name):
    # Delegate any attribute not defined here to the wrapped object.
    return getattr(self._wrapped, name)


class AutoFlush(Wrapper):
  """Creates a file object clone to automatically flush after N seconds."""
  def __init__(self, wrapped, delay):
    super(AutoFlush, self).__init__(wrapped)
    if not hasattr(self, 'lock'):
      self.lock = threading.Lock()
    self.__last_flushed_at = time.time()
    self.delay = delay

  @property
  def autoflush(self):
    # Sentinel used by MakeFileAutoFlush() to detect an already-wrapped file.
    return self

  def write(self, out, *args, **kwargs):
    self._wrapped.write(out, *args, **kwargs)
    should_flush = False
    self.lock.acquire()
    try:
      if self.delay and (time.time() - self.__last_flushed_at) > self.delay:
        should_flush = True
        self.__last_flushed_at = time.time()
    finally:
      self.lock.release()
    # Flush outside the lock to avoid holding it during I/O.
    if should_flush:
      self.flush()


class Annotated(Wrapper):
  """Creates a file object clone to automatically prepends every line in worker
  threads with a NN> prefix.
  """
  def __init__(self, wrapped, include_zero=False):
    super(Annotated, self).__init__(wrapped)
    if not hasattr(self, 'lock'):
      self.lock = threading.Lock()
    self.__output_buffers = {}
    self.__include_zero = include_zero
    # Write bytes via the underlying binary buffer when one exists (Py3).
    self._wrapped_write = getattr(self._wrapped, 'buffer', self._wrapped).write

  @property
  def annotated(self):
    # Sentinel used by MakeFileAnnotated() to detect an already-wrapped file.
    return self

  def write(self, out):
    # Store as bytes to ensure Unicode characters get output correctly.
    if not isinstance(out, bytes):
      out = out.encode('utf-8')

    index = getattr(threading.currentThread(), 'index', 0)
    if not index and not self.__include_zero:
      # Unindexed threads aren't buffered.
      return self._wrapped_write(out)

    self.lock.acquire()
    try:
      # Use a dummy array to hold the string so the code can be lockless.
      # Strings are immutable, requiring to keep a lock for the whole dictionary
      # otherwise.
      # Using an array is faster than using a dummy object.
      if not index in self.__output_buffers:
        obj = self.__output_buffers[index] = [b'']
      else:
        obj = self.__output_buffers[index]
    finally:
      self.lock.release()

    # Continue lockless.
    obj[0] += out
    # Emit one prefixed line per complete \n- or \r-terminated line, whichever
    # terminator comes first in the buffer.
    while True:
      cr_loc = obj[0].find(b'\r')
      lf_loc = obj[0].find(b'\n')
      if cr_loc == lf_loc == -1:
        break
      elif cr_loc == -1 or (lf_loc >= 0 and lf_loc < cr_loc):
        line, remaining = obj[0].split(b'\n', 1)
        if line:
          self._wrapped_write(b'%d>%s\n' % (index, line))
      elif lf_loc == -1 or (cr_loc >= 0 and cr_loc < lf_loc):
        line, remaining = obj[0].split(b'\r', 1)
        if line:
          self._wrapped_write(b'%d>%s\r' % (index, line))
      obj[0] = remaining

  def flush(self):
    """Flush buffered output."""
    orphans = []
    self.lock.acquire()
    try:
      # Detect threads no longer existing.
      indexes = (getattr(t, 'index', None) for t in threading.enumerate())
      # NOTE(review): on Python 3, filter() returns a one-shot iterator; the
      # repeated `in indexes` membership tests below exhaust it after the
      # first buffered index, so later live threads can be misclassified as
      # orphans. Looks like this should be `set(filter(None, indexes))` —
      # TODO confirm and fix.
      indexes = filter(None, indexes)
      for index in self.__output_buffers:
        if not index in indexes:
          orphans.append((index, self.__output_buffers[index][0]))
      for orphan in orphans:
        del self.__output_buffers[orphan[0]]
    finally:
      self.lock.release()

    # Don't keep the lock while writing. Will append \n when it shouldn't.
    for orphan in orphans:
      if orphan[1]:
        self._wrapped_write(b'%d>%s\n' % (orphan[0], orphan[1]))
    return self._wrapped.flush()


def MakeFileAutoFlush(fileobj, delay=10):
  """Wraps `fileobj` in AutoFlush (or retunes an existing wrapper's delay)."""
  autoflush = getattr(fileobj, 'autoflush', None)
  if autoflush:
    autoflush.delay = delay
    return fileobj
  return AutoFlush(fileobj, delay)


def MakeFileAnnotated(fileobj, include_zero=False):
  """Wraps `fileobj` in Annotated unless it is already wrapped."""
  if getattr(fileobj, 'annotated', None):
    return fileobj
  return Annotated(fileobj, include_zero)


# Module-global registry of live child processes, guarded by its lock, so
# they can be killed on shutdown (see GClientChildren).
GCLIENT_CHILDREN = []
GCLIENT_CHILDREN_LOCK = threading.Lock()


class GClientChildren(object):
  """Tracks subprocesses spawned by CheckCallAndFilter for cleanup."""
  @staticmethod
  def add(popen_obj):
    with GCLIENT_CHILDREN_LOCK:
      GCLIENT_CHILDREN.append(popen_obj)

  @staticmethod
  def remove(popen_obj):
    with GCLIENT_CHILDREN_LOCK:
      GCLIENT_CHILDREN.remove(popen_obj)

  @staticmethod
  def _attemptToKillChildren():
    global GCLIENT_CHILDREN
    with GCLIENT_CHILDREN_LOCK:
      # poll() is None means the child is still running.
      zombies = [c for c in GCLIENT_CHILDREN if c.poll() is None]

    for zombie in zombies:
      try:
        zombie.kill()
      except OSError:
        pass

    with GCLIENT_CHILDREN_LOCK:
      GCLIENT_CHILDREN = [k for k in GCLIENT_CHILDREN if k.poll() is not None]

  @staticmethod
  def _areZombies():
    with GCLIENT_CHILDREN_LOCK:
      return bool(GCLIENT_CHILDREN)

  @staticmethod
  def KillAllRemainingChildren():
    GClientChildren._attemptToKillChildren()

    if GClientChildren._areZombies():
      # Give survivors a moment to die, then try once more.
      time.sleep(0.5)
      GClientChildren._attemptToKillChildren()

    with GCLIENT_CHILDREN_LOCK:
      if GCLIENT_CHILDREN:
        print('Could not kill the following subprocesses:', file=sys.stderr)
        for zombie in GCLIENT_CHILDREN:
          print(' ', zombie.pid, file=sys.stderr)


def CheckCallAndFilter(args, print_stdout=False, filter_fn=None,
                       show_header=False, always_show_header=False, retry=False,
                       **kwargs):
  """Runs a command and calls back a filter function if
  needed.

  Accepts all subprocess2.Popen() parameters plus:
    print_stdout: If True, the command's stdout is forwarded to stdout.
    filter_fn: A function taking a single string argument called with each line
               of the subprocess2's output. Each line has the trailing newline
               character trimmed.
    show_header: Whether to display a header before the command output.
    always_show_header: Show header even when the command produced no output.
    retry: If the process exits non-zero, sleep for a brief interval and try
           again, up to RETRY_MAX times.

  stderr is always redirected to stdout.

  Returns the output of the command as a binary string.
  """
  def show_header_if_necessary(needs_header, attempt):
    """Show the header at most once."""
    if not needs_header[0]:
      return

    needs_header[0] = False
    # Automatically generated header. We only prepend a newline if
    # always_show_header is false, since it usually indicates there's an
    # external progress display, and it's better not to clobber it in that case.
    header = '' if always_show_header else '\n'
    header += '________ running \'%s\' in \'%s\'' % (
        ' '.join(args), kwargs.get('cwd', '.'))
    if attempt:
      header += ' attempt %s / %s' % (attempt + 1, RETRY_MAX + 1)
    header += '\n'

    if print_stdout:
      stdout_write = getattr(sys.stdout, 'buffer', sys.stdout).write
      stdout_write(header.encode())
    if filter_fn:
      filter_fn(header)

  def filter_line(command_output, line_start):
    """Extract the last line from command output and filter it."""
    if not filter_fn or line_start is None:
      return
    command_output.seek(line_start)
    filter_fn(command_output.read().decode('utf-8'))

  # Initialize stdout writer if needed. On Python 3, sys.stdout does not accept
  # byte inputs and sys.stdout.buffer must be used instead.
  if print_stdout:
    sys.stdout.flush()
    stdout_write = getattr(sys.stdout, 'buffer', sys.stdout).write
  else:
    stdout_write = lambda _: None

  sleep_interval = RETRY_INITIAL_SLEEP
  run_cwd = kwargs.get('cwd', os.getcwd())
  for attempt in range(RETRY_MAX + 1):
    # If our stdout is a terminal, then pass in a pseudo-tty pipe to our
    # subprocess when filtering its output. This makes the subproc believe
    # it was launched from a terminal, which will preserve ANSI color codes.
    os_type = GetMacWinAixOrLinux()
    if sys.stdout.isatty() and os_type != 'win' and os_type != 'aix':
      pipe_reader, pipe_writer = os.openpty()
    else:
      pipe_reader, pipe_writer = os.pipe()

    kid = subprocess2.Popen(
        args, bufsize=0, stdout=pipe_writer, stderr=subprocess2.STDOUT,
        **kwargs)
    # Close the write end of the pipe once we hand it off to the child proc.
    os.close(pipe_writer)

    GClientChildren.add(kid)

    # Store the output of the command regardless of the value of print_stdout or
    # filter_fn.
    command_output = io.BytesIO()

    # Passed as a list for "by ref" semantics.
    needs_header = [show_header]
    if always_show_header:
      show_header_if_necessary(needs_header, attempt)

    # Also, we need to forward stdout to prevent weird re-ordering of output.
    # This has to be done on a per byte basis to make sure it is not buffered:
    # normally buffering is done for each line, but if the process requests
    # input, no end-of-line character is output after the prompt and it would
    # not show up.
    try:
      line_start = None
      while True:
        try:
          in_byte = os.read(pipe_reader, 1)
        except (IOError, OSError) as e:
          if e.errno == errno.EIO:
            # An errno.EIO means EOF?
            in_byte = None
          else:
            raise e
        is_newline = in_byte in (b'\n', b'\r')
        if not in_byte:
          break

        show_header_if_necessary(needs_header, attempt)

        if is_newline:
          filter_line(command_output, line_start)
          line_start = None
        elif line_start is None:
          line_start = command_output.tell()

        stdout_write(in_byte)
        command_output.write(in_byte)

      # Flush the rest of buffered output.
      sys.stdout.flush()
      if line_start is not None:
        filter_line(command_output, line_start)

      os.close(pipe_reader)
      rv = kid.wait()

      # Don't put this in a 'finally,' since the child may still run if we get
      # an exception.
      GClientChildren.remove(kid)

    except KeyboardInterrupt:
      print('Failed while running "%s"' % ' '.join(args), file=sys.stderr)
      raise

    if rv == 0:
      return command_output.getvalue()

    if not retry:
      break

    # Exponential backoff between attempts.
    print("WARNING: subprocess '%s' in %s failed; will retry after a short "
          'nap...' % (' '.join('"%s"' % x for x in args), run_cwd))
    time.sleep(sleep_interval)
    sleep_interval *= 2

  raise subprocess2.CalledProcessError(
      rv, args, kwargs.get('cwd', None), None, None)


class GitFilter(object):
  """A filter_fn implementation for quieting down git output messages.

  Allows a custom function to skip certain lines (predicate), and will throttle
  the output of percentage completed lines to only output every X seconds.
  """
  PERCENT_RE = re.compile('(.*) ([0-9]{1,3})% .*')

  def __init__(self, time_throttle=0, predicate=None, out_fh=None):
    """
    Args:
      time_throttle (int): GitFilter will throttle 'noisy' output (such as the
        XX% complete messages) to only be printed at least |time_throttle|
        seconds apart.
      predicate (f(line)): An optional function which is invoked for every line.
        The line will be skipped if predicate(line) returns False.
      out_fh: File handle to write output to.
    """
    self.first_line = True
    self.last_time = 0
    self.time_throttle = time_throttle
    self.predicate = predicate
    self.out_fh = out_fh or sys.stdout
    # Prefix of the last progress line seen; a new prefix always prints.
    self.progress_prefix = None

  def __call__(self, line):
    # git uses an escape sequence to clear the line; elide it.
    esc = line.find(chr(0o33))
    if esc > -1:
      line = line[:esc]
    if self.predicate and not self.predicate(line):
      return
    now = time.time()
    match = self.PERCENT_RE.match(line)
    if match:
      if match.group(1) != self.progress_prefix:
        self.progress_prefix = match.group(1)
      elif now - self.last_time < self.time_throttle:
        # Same progress prefix too soon after the last print: throttle it.
        return
    self.last_time = now
    if not self.first_line:
      self.out_fh.write('[%s] ' % Elapsed())
    self.first_line = False
    print(line, file=self.out_fh)


def FindFileUpwards(filename, path=None):
  """Search upwards from the a directory (default: current) to find a file.

  Returns nearest upper-level directory with the passed in file.
  """
  if not path:
    path = os.getcwd()
  path = os.path.realpath(path)
  while True:
    file_path = os.path.join(path, filename)
    if os.path.exists(file_path):
      return path
    (new_path, _) = os.path.split(path)
    if new_path == path:
      # Reached the filesystem root without finding the file.
      return None
    path = new_path


def GetMacWinAixOrLinux():
  """Returns 'mac', 'win', or 'linux', matching the current platform."""
  if sys.platform.startswith(('cygwin', 'win')):
    return 'win'
  elif sys.platform.startswith('linux'):
    return 'linux'
  elif sys.platform == 'darwin':
    return 'mac'
  elif sys.platform.startswith('aix'):
    return 'aix'
  raise Error('Unknown platform: ' + sys.platform)


def GetGClientRootAndEntries(path=None):
  """Returns the gclient root and the dict of entries."""
  config_file = '.gclient_entries'
  root = FindFileUpwards(config_file, path)
  if not root:
    print("Can't find %s" % config_file)
    return None
  config_path = os.path.join(root, config_file)
  env = {}
  with open(config_path) as config:
    # The entries file is Python source defining an 'entries' dict.
    exec(config.read(), env)
  config_dir = os.path.dirname(config_path)
  return config_dir, env['entries']


def lockedmethod(method):
  """Method decorator that holds self.lock for the duration of the call."""
  def inner(self, *args, **kwargs):
    try:
      try:
        self.lock.acquire()
      except KeyboardInterrupt:
        print('Was deadlocked', file=sys.stderr)
        raise
      return method(self, *args, **kwargs)
    finally:
      self.lock.release()
  return inner


class WorkItem(object):
  """One work item."""
  # On cygwin, creating a lock throws randomly when nearing ~100 locks.
  # As a workaround, use a single lock. Yep you read it right. Single lock for
  # all the 100 objects.
  lock = threading.Lock()

  def __init__(self, name):
    # A unique string representing this work item.
    self._name = name
    # Per-item output buffer; ExecutionQueue.format_task_output() reads it.
    self.outbuf = StringIO()
    self.start = self.finish = None
    self.resources = []  # List of resources this work item requires.

  def run(self, work_queue):
    """work_queue is passed as keyword argument so it should be
    the last parameters of the function when you override it."""
    pass

  @property
  def name(self):
    return self._name


class ExecutionQueue(object):
  """Runs a set of WorkItem that have interdependencies and where WorkItem are
  added as they are processed.

  This class manages that all the required dependencies are run
  before running each one.

  Methods of this class are thread safe.
  """
  def __init__(self, jobs, progress, ignore_requirements, verbose=False):
    """jobs specifies the number of concurrent tasks to allow. progress is a
    Progress instance."""
    # Set when a thread is done or a new item is enqueued.
    self.ready_cond = threading.Condition()
    # Maximum number of concurrent tasks.
    self.jobs = jobs
    # List of WorkItem, for gclient, these are Dependency instances.
    self.queued = []
    # List of strings representing each Dependency.name that was run.
    self.ran = []
    # List of items currently running.
    self.running = []
    # Exceptions thrown if any.
    self.exceptions = queue.Queue()
    # Progress status
    self.progress = progress
    if self.progress:
      self.progress.update(0)

    self.ignore_requirements = ignore_requirements
    self.verbose = verbose
    # Timestamps used by flush() to decide when to print a liveness message.
    self.last_join = None
    self.last_subproc_output = None

  def enqueue(self, d):
    """Enqueue one Dependency to be executed later once its requirements are
    satisfied.
    """
    assert isinstance(d, WorkItem)
    self.ready_cond.acquire()
    try:
      self.queued.append(d)
      total = len(self.queued) + len(self.ran) + len(self.running)
      if self.jobs == 1:
        total += 1
      logging.debug('enqueued(%s)' % d.name)
      if self.progress:
        self.progress._total = total
        self.progress.update(0)
      # Wake up flush() so it can consider the new item.
      self.ready_cond.notifyAll()
    finally:
      self.ready_cond.release()

  def out_cb(self, _):
    # Records when a subprocess last produced output; see flush().
    self.last_subproc_output = datetime.datetime.now()
    return True

  @staticmethod
  def format_task_output(task, comment=''):
    """Formats a task's name, timing, and buffered output for display."""
    if comment:
      comment = ' (%s)' % comment
    if task.start and task.finish:
      elapsed = ' (Elapsed: %s)' % (
          str(task.finish - task.start).partition('.')[0])
    else:
      elapsed = ''
    return """
%s%s%s
----------------------------------------
%s
----------------------------------------""" % (
        task.name, comment, elapsed, task.outbuf.getvalue().strip())

  def _is_conflict(self, job):
    """Checks to see if a job will conflict with another running job."""
    for running_job in self.running:
      for used_resource in running_job.item.resources:
        logging.debug('Checking resource %s' % used_resource)
        if used_resource in job.resources:
          return True
    return False

  def flush(self, *args, **kwargs):
    """Runs all enqueued items until all are executed."""
    kwargs['work_queue'] = self
    self.last_subproc_output = self.last_join = datetime.datetime.now()
    self.ready_cond.acquire()
    try:
      while True:
        # Check for task to run first, then wait.
        while True:
          if not self.exceptions.empty():
            # Systematically flush the queue when an exception logged.
            self.queued = []
          self._flush_terminated_threads()
          if (not self.queued and not self.running or
              self.jobs == len(self.running)):
            logging.debug('No more worker threads or can\'t queue anything.')
            break

          # Check for new tasks to start.
          for i in range(len(self.queued)):
            # Verify its requirements.
            if (self.ignore_requirements or
                not (set(self.queued[i].requirements) - set(self.ran))):
              if not self._is_conflict(self.queued[i]):
                # Start one work item: all its requirements are satisfied.
                self._run_one_task(self.queued.pop(i), args, kwargs)
                break
          else:
            # Couldn't find an item that could run. Break out the outer loop.
            break

        if not self.queued and not self.running:
          # We're done.
          break
        # We need to poll here otherwise Ctrl-C isn't processed.
        try:
          self.ready_cond.wait(10)
          # If we haven't printed to terminal for a while, but we have received
          # spew from a subprocess, let the user know we're still progressing.
          now = datetime.datetime.now()
          if (now - self.last_join > datetime.timedelta(seconds=60) and
              self.last_subproc_output > self.last_join):
            if self.progress:
              print('')
              sys.stdout.flush()
            elapsed = Elapsed()
            print('[%s] Still working on:' % elapsed)
            sys.stdout.flush()
            for task in self.running:
              print('[%s] %s' % (elapsed, task.item.name))
              sys.stdout.flush()
        except KeyboardInterrupt:
          # Help debugging by printing some information:
          print(
              ('\nAllowed parallel jobs: %d\n# queued: %d\nRan: %s\n'
               'Running: %d') % (self.jobs, len(self.queued), ', '.join(
                   self.ran), len(self.running)),
              file=sys.stderr)
          for i in self.queued:
            print(
                '%s (not started): %s' % (i.name, ', '.join(i.requirements)),
                file=sys.stderr)
          for i in self.running:
            print(
                self.format_task_output(i.item, 'interrupted'), file=sys.stderr)
          raise
        # Something happened: self.enqueue() or a thread terminated. Loop again.
    finally:
      self.ready_cond.release()

    assert not self.running, 'Now guaranteed to be single-threaded'
    if not self.exceptions.empty():
      if self.progress:
        print('')
      # To get back the stack location correctly, the raise a, b, c form must be
      # used, passing a tuple as the first argument doesn't work.
      e, task = self.exceptions.get()
      print(self.format_task_output(task.item, 'ERROR'), file=sys.stderr)
      reraise(e[0], e[1], e[2])
    elif self.progress:
      self.progress.end()

  def _flush_terminated_threads(self):
    """Flush threads that have terminated."""
    running = self.running
    self.running = []
    for t in running:
      if t.is_alive():
        self.running.append(t)
      else:
        t.join()
        self.last_join = datetime.datetime.now()
        sys.stdout.flush()
        if self.verbose:
          print(self.format_task_output(t.item))
        if self.progress:
          self.progress.update(1, t.item.name)
        if t.item.name in self.ran:
          raise Error(
              'gclient is confused, "%s" is already in "%s"' % (
                t.item.name, ', '.join(self.ran)))
        if not t.item.name in self.ran:
          self.ran.append(t.item.name)

  def _run_one_task(self, task_item, args, kwargs):
    """Runs one task, threaded when jobs > 1, inline otherwise."""
    if self.jobs > 1:
      # Start the thread.
      index = len(self.ran) + len(self.running) + 1
      new_thread = self._Worker(task_item, index, args, kwargs)
      self.running.append(new_thread)
      new_thread.start()
    else:
      # Run the 'thread' inside the main thread. Don't try to catch any
      # exception.
      try:
        task_item.start = datetime.datetime.now()
        print('[%s] Started.' % Elapsed(task_item.start), file=task_item.outbuf)
        task_item.run(*args, **kwargs)
        task_item.finish = datetime.datetime.now()
        print(
            '[%s] Finished.'
            % Elapsed(task_item.finish), file=task_item.outbuf)
        self.ran.append(task_item.name)
        if self.verbose:
          if self.progress:
            print('')
          print(self.format_task_output(task_item))
        if self.progress:
          self.progress.update(1, ', '.join(t.item.name for t in self.running))
      except KeyboardInterrupt:
        print(
            self.format_task_output(task_item, 'interrupted'), file=sys.stderr)
        raise
      except Exception:
        print(self.format_task_output(task_item, 'ERROR'), file=sys.stderr)
        raise


  class _Worker(threading.Thread):
    """One thread to execute one WorkItem."""
    def __init__(self, item, index, args, kwargs):
      threading.Thread.__init__(self, name=item.name or 'Worker')
      logging.info('_Worker(%s) reqs:%s' % (item.name, item.requirements))
      self.item = item
      # Thread index; Error and Annotated use it to prefix output lines.
      self.index = index
      self.args = args
      self.kwargs = kwargs
      self.daemon = True

    def run(self):
      """Runs in its own thread."""
      logging.debug('_Worker.run(%s)' % self.item.name)
      work_queue = self.kwargs['work_queue']
      try:
        self.item.start = datetime.datetime.now()
        print('[%s] Started.' % Elapsed(self.item.start), file=self.item.outbuf)
        self.item.run(*self.args, **self.kwargs)
        self.item.finish = datetime.datetime.now()
        print(
            '[%s] Finished.' % Elapsed(self.item.finish), file=self.item.outbuf)
      except KeyboardInterrupt:
        logging.info('Caught KeyboardInterrupt in thread %s', self.item.name)
        logging.info(str(sys.exc_info()))
        work_queue.exceptions.put((sys.exc_info(), self))
        raise
      except Exception:
        # Catch exception location.
1056 logging.info('Caught exception in thread %s', self.item.name) 1057 logging.info(str(sys.exc_info())) 1058 work_queue.exceptions.put((sys.exc_info(), self)) 1059 finally: 1060 logging.info('_Worker.run(%s) done', self.item.name) 1061 work_queue.ready_cond.acquire() 1062 try: 1063 work_queue.ready_cond.notifyAll() 1064 finally: 1065 work_queue.ready_cond.release() 1066 1067 1068def GetEditor(git_editor=None): 1069 """Returns the most plausible editor to use. 1070 1071 In order of preference: 1072 - GIT_EDITOR environment variable 1073 - core.editor git configuration variable (if supplied by git-cl) 1074 - VISUAL environment variable 1075 - EDITOR environment variable 1076 - vi (non-Windows) or notepad (Windows) 1077 1078 In the case of git-cl, this matches git's behaviour, except that it does not 1079 include dumb terminal detection. 1080 """ 1081 editor = os.environ.get('GIT_EDITOR') or git_editor 1082 if not editor: 1083 editor = os.environ.get('VISUAL') 1084 if not editor: 1085 editor = os.environ.get('EDITOR') 1086 if not editor: 1087 if sys.platform.startswith('win'): 1088 editor = 'notepad' 1089 else: 1090 editor = 'vi' 1091 return editor 1092 1093 1094def RunEditor(content, git, git_editor=None): 1095 """Opens up the default editor in the system to get the CL description.""" 1096 file_handle, filename = tempfile.mkstemp(text=True, prefix='cl_description') 1097 # Make sure CRLF is handled properly by requiring none. 1098 if '\r' in content: 1099 print( 1100 '!! Please remove \\r from your change description !!', file=sys.stderr) 1101 fileobj = os.fdopen(file_handle, 'w') 1102 # Still remove \r if present. 1103 content = re.sub('\r?\n', '\n', content) 1104 # Some editors complain when the file doesn't end in \n. 
1105 if not content.endswith('\n'): 1106 content += '\n' 1107 fileobj.write(content) 1108 fileobj.close() 1109 1110 try: 1111 editor = GetEditor(git_editor=git_editor) 1112 if not editor: 1113 return None 1114 cmd = '%s %s' % (editor, filename) 1115 if sys.platform == 'win32' and os.environ.get('TERM') == 'msys': 1116 # Msysgit requires the usage of 'env' to be present. 1117 cmd = 'env ' + cmd 1118 try: 1119 # shell=True to allow the shell to handle all forms of quotes in 1120 # $EDITOR. 1121 subprocess2.check_call(cmd, shell=True) 1122 except subprocess2.CalledProcessError: 1123 return None 1124 return FileRead(filename) 1125 finally: 1126 os.remove(filename) 1127 1128 1129def UpgradeToHttps(url): 1130 """Upgrades random urls to https://. 1131 1132 Do not touch unknown urls like ssh:// or git://. 1133 Do not touch http:// urls with a port number, 1134 Fixes invalid GAE url. 1135 """ 1136 if not url: 1137 return url 1138 if not re.match(r'[a-z\-]+\://.*', url): 1139 # Make sure it is a valid uri. Otherwise, urlparse() will consider it a 1140 # relative url and will use http:///foo. Note that it defaults to http:// 1141 # for compatibility with naked url like "localhost:8080". 1142 url = 'http://%s' % url 1143 parsed = list(urlparse.urlparse(url)) 1144 # Do not automatically upgrade http to https if a port number is provided. 1145 if parsed[0] == 'http' and not re.match(r'^.+?\:\d+$', parsed[1]): 1146 parsed[0] = 'https' 1147 return urlparse.urlunparse(parsed) 1148 1149 1150def ParseCodereviewSettingsContent(content): 1151 """Process a codereview.settings file properly.""" 1152 lines = (l for l in content.splitlines() if not l.strip().startswith("#")) 1153 try: 1154 keyvals = dict([x.strip() for x in l.split(':', 1)] for l in lines if l) 1155 except ValueError: 1156 raise Error( 1157 'Failed to process settings, please fix. 
Content:\n\n%s' % content) 1158 def fix_url(key): 1159 if keyvals.get(key): 1160 keyvals[key] = UpgradeToHttps(keyvals[key]) 1161 fix_url('CODE_REVIEW_SERVER') 1162 fix_url('VIEW_VC') 1163 return keyvals 1164 1165 1166def NumLocalCpus(): 1167 """Returns the number of processors. 1168 1169 multiprocessing.cpu_count() is permitted to raise NotImplementedError, and 1170 is known to do this on some Windows systems and OSX 10.6. If we can't get the 1171 CPU count, we will fall back to '1'. 1172 """ 1173 # Surround the entire thing in try/except; no failure here should stop gclient 1174 # from working. 1175 try: 1176 # Use multiprocessing to get CPU count. This may raise 1177 # NotImplementedError. 1178 try: 1179 import multiprocessing 1180 return multiprocessing.cpu_count() 1181 except NotImplementedError: # pylint: disable=bare-except 1182 # (UNIX) Query 'os.sysconf'. 1183 # pylint: disable=no-member 1184 if hasattr(os, 'sysconf') and 'SC_NPROCESSORS_ONLN' in os.sysconf_names: 1185 return int(os.sysconf('SC_NPROCESSORS_ONLN')) 1186 1187 # (Windows) Query 'NUMBER_OF_PROCESSORS' environment variable. 1188 if 'NUMBER_OF_PROCESSORS' in os.environ: 1189 return int(os.environ['NUMBER_OF_PROCESSORS']) 1190 except Exception as e: 1191 logging.exception("Exception raised while probing CPU count: %s", e) 1192 1193 logging.debug('Failed to get CPU count. Defaulting to 1.') 1194 return 1 1195 1196 1197def DefaultDeltaBaseCacheLimit(): 1198 """Return a reasonable default for the git config core.deltaBaseCacheLimit. 1199 1200 The primary constraint is the address space of virtual memory. The cache 1201 size limit is per-thread, and 32-bit systems can hit OOM errors if this 1202 parameter is set too high. 1203 """ 1204 if platform.architecture()[0].startswith('64'): 1205 return '2g' 1206 else: 1207 return '512m' 1208 1209 1210def DefaultIndexPackConfig(url=''): 1211 """Return reasonable default values for configuring git-index-pack. 
1212 1213 Experiments suggest that higher values for pack.threads don't improve 1214 performance.""" 1215 cache_limit = DefaultDeltaBaseCacheLimit() 1216 result = ['-c', 'core.deltaBaseCacheLimit=%s' % cache_limit] 1217 if url in THREADED_INDEX_PACK_BLOCKLIST: 1218 result.extend(['-c', 'pack.threads=1']) 1219 return result 1220 1221 1222def FindExecutable(executable): 1223 """This mimics the "which" utility.""" 1224 path_folders = os.environ.get('PATH').split(os.pathsep) 1225 1226 for path_folder in path_folders: 1227 target = os.path.join(path_folder, executable) 1228 # Just in case we have some ~/blah paths. 1229 target = os.path.abspath(os.path.expanduser(target)) 1230 if os.path.isfile(target) and os.access(target, os.X_OK): 1231 return target 1232 if sys.platform.startswith('win'): 1233 for suffix in ('.bat', '.cmd', '.exe'): 1234 alt_target = target + suffix 1235 if os.path.isfile(alt_target) and os.access(alt_target, os.X_OK): 1236 return alt_target 1237 return None 1238 1239 1240def freeze(obj): 1241 """Takes a generic object ``obj``, and returns an immutable version of it. 1242 1243 Supported types: 1244 * dict / OrderedDict -> FrozenDict 1245 * list -> tuple 1246 * set -> frozenset 1247 * any object with a working __hash__ implementation (assumes that hashable 1248 means immutable) 1249 1250 Will raise TypeError if you pass an object which is not hashable. 1251 """ 1252 if isinstance(obj, collections_abc.Mapping): 1253 return FrozenDict((freeze(k), freeze(v)) for k, v in obj.items()) 1254 elif isinstance(obj, (list, tuple)): 1255 return tuple(freeze(i) for i in obj) 1256 elif isinstance(obj, set): 1257 return frozenset(freeze(i) for i in obj) 1258 else: 1259 hash(obj) 1260 return obj 1261 1262 1263class FrozenDict(collections_abc.Mapping): 1264 """An immutable OrderedDict. 
1265 1266 Modified From: http://stackoverflow.com/a/2704866 1267 """ 1268 def __init__(self, *args, **kwargs): 1269 self._d = collections.OrderedDict(*args, **kwargs) 1270 1271 # Calculate the hash immediately so that we know all the items are 1272 # hashable too. 1273 self._hash = functools.reduce( 1274 operator.xor, (hash(i) for i in enumerate(self._d.items())), 0) 1275 1276 def __eq__(self, other): 1277 if not isinstance(other, collections_abc.Mapping): 1278 return NotImplemented 1279 if self is other: 1280 return True 1281 if len(self) != len(other): 1282 return False 1283 for k, v in self.items(): 1284 if k not in other or other[k] != v: 1285 return False 1286 return True 1287 1288 def __iter__(self): 1289 return iter(self._d) 1290 1291 def __len__(self): 1292 return len(self._d) 1293 1294 def __getitem__(self, key): 1295 return self._d[key] 1296 1297 def __hash__(self): 1298 return self._hash 1299 1300 def __repr__(self): 1301 return 'FrozenDict(%r)' % (self._d.items(),) 1302