1# Copyright 2017 The Abseil Authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Base functionality for Abseil Python tests.
16
17This module contains base classes and high-level functions for Abseil-style
18tests.
19"""
20
21from __future__ import absolute_import
22from __future__ import division
23from __future__ import print_function
24
25import contextlib
26import difflib
27import errno
28import getpass
29import inspect
30import io
31import itertools
32import json
33import os
34import random
35import re
36import shlex
37import shutil
38import signal
39import stat
40import subprocess
41import sys
42import tempfile
43import textwrap
44import unittest
45
46try:
47  # The faulthandler module isn't always available, and pytype doesn't
48  # understand that we're catching ImportError, so suppress the error.
49  # pytype: disable=import-error
50  import faulthandler
51  # pytype: enable=import-error
52except ImportError:
53  # We use faulthandler if it is available.
54  faulthandler = None
55
56from absl import app
57from absl import flags
58from absl import logging
59from absl._collections_abc import abc
60from absl._enum_module import enum
61from absl.testing import _pretty_print_reporter
62from absl.testing import xml_reporter
63from absl.third_party import unittest3_backport
64import six
65from six.moves import urllib
66from six.moves import xrange  # pylint: disable=redefined-builtin
67
68# Make typing an optional import to avoid it being a required dependency
69# in Python 2. Type checkers will still understand the imports.
70try:
71  # pylint: disable=unused-import
72  import typing
73  from typing import Any, AnyStr, BinaryIO, Callable, ContextManager, IO, Iterator, List, Mapping, MutableMapping, MutableSequence, Optional, Sequence, Text, TextIO, Tuple, Type, Union
74  # pylint: enable=unused-import
75except ImportError:
76  pass
77else:
78  # Use an if-type-checking block to prevent leakage of type-checking only
79  # symbols. We don't want people relying on these at runtime.
80  if typing.TYPE_CHECKING:
81    # Unbounded TypeVar for general usage
82    _T = typing.TypeVar('_T')
83
84    if six.PY2:
85      _OutcomeType = unittest3_backport.case._Outcome
86    else:
87      import unittest.case
88      _OutcomeType = unittest.case._Outcome  # pytype: disable=module-attr
89
90
91if six.PY3:
92  from unittest import mock  # pylint: disable=unused-import
93else:
94  try:
95    import mock  # type: ignore
96  except ImportError:
97    mock = None
98
99
100# Re-export a bunch of unittest functions we support so that people don't
101# have to import unittest to get them
102# pylint: disable=invalid-name
103skip = unittest.skip
104skipIf = unittest.skipIf
105skipUnless = unittest.skipUnless
106SkipTest = unittest.SkipTest
107expectedFailure = unittest.expectedFailure
108# pylint: enable=invalid-name
109
110# End unittest re-exports
111
112FLAGS = flags.FLAGS
113
114_TEXT_OR_BINARY_TYPES = (six.text_type, six.binary_type)
115
116# Suppress surplus entries in AssertionError stack traces.
117__unittest = True  # pylint: disable=invalid-name
118
119
120def expectedFailureIf(condition, reason):  # pylint: disable=invalid-name
121  """Expects the test to fail if the run condition is True.
122
123  Example usage:
124    @expectedFailureIf(sys.version.major == 2, "Not yet working in py2")
125    def test_foo(self):
126      ...
127
128  Args:
129    condition: bool, whether to expect failure or not.
130    reason: Text, the reason to expect failure.
131  Returns:
132    Decorator function
133  """
134  del reason  # Unused
135  if condition:
136    return unittest.expectedFailure
137  else:
138    return lambda f: f
139
140
141class TempFileCleanup(enum.Enum):
142  # Always cleanup temp files when the test completes.
143  ALWAYS = 'always'
144  # Only cleanup temp file if the test passes. This allows easier inspection
145  # of tempfile contents on test failure. absltest.TEST_TMPDIR.value determines
146  # where tempfiles are created.
147  SUCCESS = 'success'
148  # Never cleanup temp files.
149  OFF = 'never'
150
151
152# Many of the methods in this module have names like assertSameElements.
153# This kind of name does not comply with PEP8 style,
154# but it is consistent with the naming of methods in unittest.py.
155# pylint: disable=invalid-name
156
157
158def _get_default_test_random_seed():
159  # type: () -> int
160  random_seed = 301
161  value = os.environ.get('TEST_RANDOM_SEED', '')
162  try:
163    random_seed = int(value)
164  except ValueError:
165    pass
166  return random_seed
167
168
169def get_default_test_srcdir():
170  # type: () -> Text
171  """Returns default test source dir."""
172  return os.environ.get('TEST_SRCDIR', '')
173
174
175def get_default_test_tmpdir():
176  # type: () -> Text
177  """Returns default test temp dir."""
178  tmpdir = os.environ.get('TEST_TMPDIR', '')
179  if not tmpdir:
180    tmpdir = os.path.join(tempfile.gettempdir(), 'absl_testing')
181
182  return tmpdir
183
184
185def _get_default_randomize_ordering_seed():
186  # type: () -> int
187  """Returns default seed to use for randomizing test order.
188
189  This function first checks the --test_randomize_ordering_seed flag, and then
190  the TEST_RANDOMIZE_ORDERING_SEED environment variable. If the first value
191  we find is:
192    * (not set): disable test randomization
193    * 0: disable test randomization
194    * 'random': choose a random seed in [1, 4294967295] for test order
195      randomization
196    * positive integer: use this seed for test order randomization
197
198  (The values used are patterned after
199  https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED).
200
201  In principle, it would be simpler to return None if no override is provided;
202  however, the python random module has no `get_seed()`, only `getstate()`,
203  which returns far more data than we want to pass via an environment variable
204  or flag.
205
206  Returns:
207    A default value for test case randomization (int). 0 means do not randomize.
208
209  Raises:
210    ValueError: Raised when the flag or env value is not one of the options
211        above.
212  """
213  if FLAGS['test_randomize_ordering_seed'].present:
214    randomize = FLAGS.test_randomize_ordering_seed
215  elif 'TEST_RANDOMIZE_ORDERING_SEED' in os.environ:
216    randomize = os.environ['TEST_RANDOMIZE_ORDERING_SEED']
217  else:
218    randomize = ''
219  if not randomize:
220    return 0
221  if randomize == 'random':
222    return random.Random().randint(1, 4294967295)
223  if randomize == '0':
224    return 0
225  try:
226    seed = int(randomize)
227    if seed > 0:
228      return seed
229  except ValueError:
230    pass
231  raise ValueError(
232      'Unknown test randomization seed value: {}'.format(randomize))
233
234
235TEST_SRCDIR = flags.DEFINE_string(
236    'test_srcdir',
237    get_default_test_srcdir(),
238    'Root of directory tree where source files live',
239    allow_override_cpp=True)
240TEST_TMPDIR = flags.DEFINE_string(
241    'test_tmpdir',
242    get_default_test_tmpdir(),
243    'Directory for temporary testing files',
244    allow_override_cpp=True)
245
246flags.DEFINE_integer(
247    'test_random_seed',
248    _get_default_test_random_seed(),
249    'Random seed for testing. Some test frameworks may '
250    'change the default value of this flag between runs, so '
251    'it is not appropriate for seeding probabilistic tests.',
252    allow_override_cpp=True)
253flags.DEFINE_string(
254    'test_randomize_ordering_seed',
255    '',
256    'If positive, use this as a seed to randomize the '
257    'execution order for test cases. If "random", pick a '
258    'random seed to use. If 0 or not set, do not randomize '
259    'test case execution order. This flag also overrides '
260    'the TEST_RANDOMIZE_ORDERING_SEED environment variable.',
261    allow_override_cpp=True)
262flags.DEFINE_string('xml_output_file', '', 'File to store XML test results')
263
264
265# We might need to monkey-patch TestResult so that it stops considering an
266# unexpected pass as a as a "successful result".  For details, see
267# http://bugs.python.org/issue20165
268def _monkey_patch_test_result_for_unexpected_passes():
269  # type: () -> None
270  """Workaround for <http://bugs.python.org/issue20165>."""
271
272  def wasSuccessful(self):
273    # type: () -> bool
274    """Tells whether or not this result was a success.
275
276    Any unexpected pass is to be counted as a non-success.
277
278    Args:
279      self: The TestResult instance.
280
281    Returns:
282      Whether or not this result was a success.
283    """
284    return (len(self.failures) == len(self.errors) ==
285            len(self.unexpectedSuccesses) == 0)
286
287  test_result = unittest.TestResult()
288  test_result.addUnexpectedSuccess(unittest.FunctionTestCase(lambda: None))
289  if test_result.wasSuccessful():  # The bug is present.
290    unittest.TestResult.wasSuccessful = wasSuccessful
291    if test_result.wasSuccessful():  # Warn the user if our hot-fix failed.
292      sys.stderr.write('unittest.result.TestResult monkey patch to report'
293                       ' unexpected passes as failures did not work.\n')
294
295
296_monkey_patch_test_result_for_unexpected_passes()
297
298
299def _open(filepath, mode, _open_func=open):
300  # type: (Text, Text, Callable[..., IO]) -> IO
301  """Opens a file.
302
303  Like open(), but compatible with Python 2 and 3. Also ensures that we can open
304  real files even if tests stub out open().
305
306  Args:
307    filepath: A filepath.
308    mode: A mode.
309    _open_func: A built-in open() function.
310
311  Returns:
312    The opened file object.
313  """
314  if six.PY2:
315    return _open_func(filepath, mode)
316  else:
317    return _open_func(filepath, mode, encoding='utf-8')
318
319
320class _TempDir(object):
321  """Represents a temporary directory for tests.
322
323  Creation of this class is internal. Using its public methods is OK.
324
325  This class implements the `os.PathLike` interface (specifically,
326  `os.PathLike[str]`). This means, in Python 3, it can be directly passed
327  to e.g. `os.path.join()`.
328  """
329
330  def __init__(self, path):
331    # type: (Text) -> None
332    """Module-private: do not instantiate outside module."""
333    self._path = path
334
335  @property
336  def full_path(self):
337    # type: () -> Text
338    """Returns the path, as a string, for the directory.
339
340    TIP: Instead of e.g. `os.path.join(temp_dir.full_path)`, you can simply
341    do `os.path.join(temp_dir)` because `__fspath__()` is implemented.
342    """
343    return self._path
344
345  def __fspath__(self):
346    # type: () -> Text
347    """See os.PathLike."""
348    return self.full_path
349
350  def create_file(self, file_path=None, content=None, mode='w', encoding='utf8',
351                  errors='strict'):
352    # type: (Optional[Text], Optional[AnyStr], Text, Text, Text) -> _TempFile
353    """Create a file in the directory.
354
355    NOTE: If the file already exists, it will be made writable and overwritten.
356
357    Args:
358      file_path: Optional file path for the temp file. If not given, a unique
359        file name will be generated and used. Slashes are allowed in the name;
360        any missing intermediate directories will be created. NOTE: This path
361        is the path that will be cleaned up, including any directories in the
362        path, e.g., 'foo/bar/baz.txt' will `rm -r foo`
363      content: Optional string or bytes to initially write to the file. If not
364        specified, then an empty file is created.
365      mode: Mode string to use when writing content. Only used if `content` is
366        non-empty.
367      encoding: Encoding to use when writing string content. Only used if
368        `content` is text.
369      errors: How to handle text to bytes encoding errors. Only used if
370        `content` is text.
371
372    Returns:
373      A _TempFile representing the created file.
374    """
375    tf, _ = _TempFile._create(self._path, file_path, content, mode, encoding,
376                              errors)
377    return tf
378
379  def mkdir(self, dir_path=None):
380    # type: (Optional[Text]) -> _TempDir
381    """Create a directory in the directory.
382
383    Args:
384      dir_path: Optional path to the directory to create. If not given,
385        a unique name will be generated and used.
386
387    Returns:
388      A _TempDir representing the created directory.
389    """
390    if dir_path:
391      path = os.path.join(self._path, dir_path)
392    else:
393      path = tempfile.mkdtemp(dir=self._path)
394
395    # Note: there's no need to clear the directory since the containing
396    # dir was cleared by the tempdir() function.
397    _makedirs_exist_ok(path)
398    return _TempDir(path)
399
400
401class _TempFile(object):
402  """Represents a tempfile for tests.
403
404  Creation of this class is internal. Using its public methods is OK.
405
406  This class implements the `os.PathLike` interface (specifically,
407  `os.PathLike[str]`). This means, in Python 3, it can be directly passed
408  to e.g. `os.path.join()`.
409  """
410
411  def __init__(self, path):
412    # type: (Text) -> None
413    """Private: use _create instead."""
414    self._path = path
415
416  # pylint: disable=line-too-long
417  @classmethod
418  def _create(cls, base_path, file_path, content, mode, encoding, errors):
419    # type: (Text, Optional[Text], AnyStr, Text, Text, Text) -> Tuple[_TempFile, Text]
420    # pylint: enable=line-too-long
421    """Module-private: create a tempfile instance."""
422    if file_path:
423      cleanup_path = os.path.join(base_path, _get_first_part(file_path))
424      path = os.path.join(base_path, file_path)
425      _makedirs_exist_ok(os.path.dirname(path))
426      # The file may already exist, in which case, ensure it's writable so that
427      # it can be truncated.
428      if os.path.exists(path) and not os.access(path, os.W_OK):
429        stat_info = os.stat(path)
430        os.chmod(path, stat_info.st_mode | stat.S_IWUSR)
431    else:
432      _makedirs_exist_ok(base_path)
433      fd, path = tempfile.mkstemp(dir=str(base_path))
434      os.close(fd)
435      cleanup_path = path
436
437    tf = cls(path)
438
439    if content:
440      if isinstance(content, six.text_type):
441        tf.write_text(content, mode=mode, encoding=encoding, errors=errors)
442      else:
443        tf.write_bytes(content, mode)
444
445    else:
446      tf.write_bytes(b'')
447
448    return tf, cleanup_path
449
450  @property
451  def full_path(self):
452    # type: () -> Text
453    """Returns the path, as a string, for the file.
454
455    TIP: Instead of e.g. `os.path.join(temp_file.full_path)`, you can simply
456    do `os.path.join(temp_file)` because `__fspath__()` is implemented.
457    """
458    return self._path
459
460  def __fspath__(self):
461    # type: () -> Text
462    """See os.PathLike."""
463    return self.full_path
464
465  def read_text(self, encoding='utf8', errors='strict'):
466    # type: (Text, Text) -> Text
467    """Return the contents of the file as text."""
468    with self.open_text(encoding=encoding, errors=errors) as fp:
469      return fp.read()
470
471  def read_bytes(self):
472    # type: () -> bytes
473    """Return the content of the file as bytes."""
474    with self.open_bytes() as fp:
475      return fp.read()
476
477  def write_text(self, text, mode='w', encoding='utf8', errors='strict'):
478    # type: (Text, Text, Text, Text) -> None
479    """Write text to the file.
480
481    Args:
482      text: Text to write. In Python 2, it can be bytes, which will be
483        decoded using the `encoding` arg (this is as an aid for code that
484        is 2 and 3 compatible).
485      mode: The mode to open the file for writing.
486      encoding: The encoding to use when writing the text to the file.
487      errors: The error handling strategy to use when converting text to bytes.
488    """
489    if six.PY2 and isinstance(text, bytes):
490      text = text.decode(encoding, errors)
491    with self.open_text(mode, encoding=encoding, errors=errors) as fp:
492      fp.write(text)
493
494  def write_bytes(self, data, mode='wb'):
495    # type: (bytes, Text) -> None
496    """Write bytes to the file.
497
498    Args:
499      data: bytes to write.
500      mode: Mode to open the file for writing. The "b" flag is implicit if
501        not already present. It must not have the "t" flag.
502    """
503    with self.open_bytes(mode) as fp:
504      fp.write(data)
505
506  def open_text(self, mode='rt', encoding='utf8', errors='strict'):
507    # type: (Text, Text, Text) -> ContextManager[TextIO]
508    """Return a context manager for opening the file in text mode.
509
510    Args:
511      mode: The mode to open the file in. The "t" flag is implicit if not
512        already present. It must not have the "b" flag.
513      encoding: The encoding to use when opening the file.
514      errors: How to handle decoding errors.
515
516    Returns:
517      Context manager that yields an open file.
518
519    Raises:
520      ValueError: if invalid inputs are provided.
521    """
522    if 'b' in mode:
523      raise ValueError('Invalid mode {!r}: "b" flag not allowed when opening '
524                       'file in text mode'.format(mode))
525    if 't' not in mode:
526      mode += 't'
527    cm = self._open(mode, encoding, errors)
528    return cm
529
530  def open_bytes(self, mode='rb'):
531    # type: (Text) -> ContextManager[BinaryIO]
532    """Return a context manager for opening the file in binary mode.
533
534    Args:
535      mode: The mode to open the file in. The "b" mode is implicit if not
536        already present. It must not have the "t" flag.
537
538    Returns:
539      Context manager that yields an open file.
540
541    Raises:
542      ValueError: if invalid inputs are provided.
543    """
544    if 't' in mode:
545      raise ValueError('Invalid mode {!r}: "t" flag not allowed when opening '
546                       'file in binary mode'.format(mode))
547    if 'b' not in mode:
548      mode += 'b'
549    cm = self._open(mode, encoding=None, errors=None)
550    return cm
551
552  # TODO(b/123775699): Once pytype supports typing.Literal, use overload and
553  # Literal to express more precise return types. The contained type is
554  # currently `Any` to avoid [bad-return-type] errors in the open_* methods.
555  @contextlib.contextmanager
556  def _open(self, mode, encoding='utf8', errors='strict'):
557    # type: (Text, Text, Text) -> Iterator[Any]
558    with io.open(
559        self.full_path, mode=mode, encoding=encoding, errors=errors) as fp:
560      yield fp
561
562
563class _method(object):
564  """A decorator that supports both instance and classmethod invocations.
565
566  Using similar semantics to the @property builtin, this decorator can augment
567  an instance method to support conditional logic when invoked on a class
568  object. This breaks support for invoking an instance method via the class
569  (e.g. Cls.method(self, ...)) but is still situationally useful.
570  """
571
572  def __init__(self, finstancemethod):
573    # type: (Callable[..., Any]) -> None
574    self._finstancemethod = finstancemethod
575    self._fclassmethod = None
576
577  def classmethod(self, fclassmethod):
578    # type: (Callable[..., Any]) -> _method
579    self._fclassmethod = classmethod(fclassmethod)
580    return self
581
582  def __doc__(self):
583    # type: () -> str
584    if getattr(self._finstancemethod, '__doc__'):
585      return self._finstancemethod.__doc__
586    elif getattr(self._fclassmethod, '__doc__'):
587      return self._fclassmethod.__doc__
588    return ''
589
590  def __get__(self, obj, type_):
591    # type: (Optional[Any], Optional[Type[Any]]) -> Callable[..., Any]
592    func = self._fclassmethod if obj is None else self._finstancemethod
593    return func.__get__(obj, type_)  # pytype: disable=attribute-error
594
595
596class TestCase(unittest3_backport.TestCase):
597  """Extension of unittest.TestCase providing more power."""
598
599  # When to cleanup files/directories created by our `create_tempfile()` and
600  # `create_tempdir()` methods after each test case completes. This does *not*
601  # affect e.g., files created outside of those methods, e.g., using the stdlib
602  # tempfile module. This can be overridden at the class level, instance level,
603  # or with the `cleanup` arg of `create_tempfile()` and `create_tempdir()`. See
604  # `TempFileCleanup` for details on the different values.
605  # TODO(b/70517332): Remove the type comment and the disable once pytype has
606  # better support for enums.
607  tempfile_cleanup = TempFileCleanup.ALWAYS  # type: TempFileCleanup  # pytype: disable=annotation-type-mismatch
608
609  maxDiff = 80 * 20
610  longMessage = True
611
612  # Exit stacks for per-test and per-class scopes.
613  _exit_stack = None
614  _cls_exit_stack = None
615
616  def __init__(self, *args, **kwargs):
617    super(TestCase, self).__init__(*args, **kwargs)
618    # This is to work around missing type stubs in unittest.pyi
619    self._outcome = getattr(self, '_outcome')  # type: Optional[_OutcomeType]
620
621  def setUp(self):
622    super(TestCase, self).setUp()
623    # NOTE: Only Python 3 contextlib has ExitStack
624    if hasattr(contextlib, 'ExitStack'):
625      self._exit_stack = contextlib.ExitStack()
626      self.addCleanup(self._exit_stack.close)
627
628  @classmethod
629  def setUpClass(cls):
630    super(TestCase, cls).setUpClass()
631    # NOTE: Only Python 3 contextlib has ExitStack and only Python 3.8+ has
632    # addClassCleanup.
633    if hasattr(contextlib, 'ExitStack') and hasattr(cls, 'addClassCleanup'):
634      cls._cls_exit_stack = contextlib.ExitStack()
635      cls.addClassCleanup(cls._cls_exit_stack.close)
636
637  def create_tempdir(self, name=None, cleanup=None):
638    # type: (Optional[Text], Optional[TempFileCleanup]) -> _TempDir
639    """Create a temporary directory specific to the test.
640
641    NOTE: The directory and its contents will be recursively cleared before
642    creation. This ensures that there is no pre-existing state.
643
644    This creates a named directory on disk that is isolated to this test, and
645    will be properly cleaned up by the test. This avoids several pitfalls of
646    creating temporary directories for test purposes, as well as makes it easier
647    to setup directories and verify their contents. For example:
648
649        def test_foo(self):
650          out_dir = self.create_tempdir()
651          out_log = out_dir.create_file('output.log')
652          expected_outputs = [
653              os.path.join(out_dir, 'data-0.txt'),
654              os.path.join(out_dir, 'data-1.txt'),
655          ]
656          code_under_test(out_dir)
657          self.assertTrue(os.path.exists(expected_paths[0]))
658          self.assertTrue(os.path.exists(expected_paths[1]))
659          self.assertEqual('foo', out_log.read_text())
660
661    See also: `create_tempfile()` for creating temporary files.
662
663    Args:
664      name: Optional name of the directory. If not given, a unique
665        name will be generated and used.
666      cleanup: Optional cleanup policy on when/if to remove the directory (and
667        all its contents) at the end of the test. If None, then uses
668        `self.tempfile_cleanup`.
669
670    Returns:
671      A _TempDir representing the created directory; see _TempDir class docs
672      for usage.
673    """
674    test_path = self._get_tempdir_path_test()
675
676    if name:
677      path = os.path.join(test_path, name)
678      cleanup_path = os.path.join(test_path, _get_first_part(name))
679    else:
680      _makedirs_exist_ok(test_path)
681      path = tempfile.mkdtemp(dir=test_path)
682      cleanup_path = path
683
684    _rmtree_ignore_errors(cleanup_path)
685    _makedirs_exist_ok(path)
686
687    self._maybe_add_temp_path_cleanup(cleanup_path, cleanup)
688
689    return _TempDir(path)
690
691  # pylint: disable=line-too-long
692  def create_tempfile(self, file_path=None, content=None, mode='w',
693                      encoding='utf8', errors='strict', cleanup=None):
694    # type: (Optional[Text], Optional[AnyStr], Text, Text, Text, Optional[TempFileCleanup]) -> _TempFile
695    # pylint: enable=line-too-long
696    """Create a temporary file specific to the test.
697
698    This creates a named file on disk that is isolated to this test, and will
699    be properly cleaned up by the test. This avoids several pitfalls of
700    creating temporary files for test purposes, as well as makes it easier
701    to setup files, their data, read them back, and inspect them when
702    a test fails. For example:
703
704        def test_foo(self):
705          output = self.create_tempfile()
706          code_under_test(output)
707          self.assertGreater(os.path.getsize(output), 0)
708          self.assertEqual('foo', output.read_text())
709
710    NOTE: This will zero-out the file. This ensures there is no pre-existing
711    state.
712    NOTE: If the file already exists, it will be made writable and overwritten.
713
714    See also: `create_tempdir()` for creating temporary directories, and
715    `_TempDir.create_file` for creating files within a temporary directory.
716
717    Args:
718      file_path: Optional file path for the temp file. If not given, a unique
719        file name will be generated and used. Slashes are allowed in the name;
720        any missing intermediate directories will be created. NOTE: This path is
721        the path that will be cleaned up, including any directories in the path,
722        e.g., 'foo/bar/baz.txt' will `rm -r foo`.
723      content: Optional string or
724        bytes to initially write to the file. If not
725        specified, then an empty file is created.
726      mode: Mode string to use when writing content. Only used if `content` is
727        non-empty.
728      encoding: Encoding to use when writing string content. Only used if
729        `content` is text.
730      errors: How to handle text to bytes encoding errors. Only used if
731        `content` is text.
732      cleanup: Optional cleanup policy on when/if to remove the directory (and
733        all its contents) at the end of the test. If None, then uses
734        `self.tempfile_cleanup`.
735
736    Returns:
737      A _TempFile representing the created file; see _TempFile class docs for
738      usage.
739    """
740    test_path = self._get_tempdir_path_test()
741    tf, cleanup_path = _TempFile._create(test_path, file_path, content=content,
742                                         mode=mode, encoding=encoding,
743                                         errors=errors)
744    self._maybe_add_temp_path_cleanup(cleanup_path, cleanup)
745    return tf
746
747  @_method
748  def enter_context(self, manager):
749    # type: (ContextManager[_T]) -> _T
750    """Returns the CM's value after registering it with the exit stack.
751
752    Entering a context pushes it onto a stack of contexts. When `enter_context`
753    is called on the test instance (e.g. `self.enter_context`), the context is
754    exited after the test case's tearDown call. When called on the test class
755    (e.g. `TestCase.enter_context`), the context is exited after the test
756    class's tearDownClass call.
757
758    Contexts are are exited in the reverse order of entering. They will always
759    be exited, regardless of test failure/success.
760
761    This is useful to eliminate per-test boilerplate when context managers
762    are used. For example, instead of decorating every test with `@mock.patch`,
763    simply do `self.foo = self.enter_context(mock.patch(...))' in `setUp()`.
764
765    NOTE: The context managers will always be exited without any error
766    information. This is an unfortunate implementation detail due to some
767    internals of how unittest runs tests.
768
769    Args:
770      manager: The context manager to enter.
771    """
772    if not self._exit_stack:
773      raise AssertionError(
774          'self._exit_stack is not set: enter_context is Py3-only; also make '
775          'sure that AbslTest.setUp() is called.')
776    return self._exit_stack.enter_context(manager)
777
778  @enter_context.classmethod
779  def enter_context(cls, manager):  # pylint: disable=no-self-argument
780    # type: (ContextManager[_T]) -> _T
781    if not cls._cls_exit_stack:
782      raise AssertionError(
783          'cls._cls_exit_stack is not set: cls.enter_context requires '
784          'Python 3.8+; also make sure that AbslTest.setUpClass() is called.')
785    return cls._cls_exit_stack.enter_context(manager)
786
787  @classmethod
788  def _get_tempdir_path_cls(cls):
789    # type: () -> Text
790    return os.path.join(TEST_TMPDIR.value, _get_qualname(cls))
791
792  def _get_tempdir_path_test(self):
793    # type: () -> Text
794    return os.path.join(self._get_tempdir_path_cls(), self._testMethodName)
795
796  def _get_tempfile_cleanup(self, override):
797    # type: (Optional[TempFileCleanup]) -> TempFileCleanup
798    if override is not None:
799      return override
800    return self.tempfile_cleanup
801
802  def _maybe_add_temp_path_cleanup(self, path, cleanup):
803    # type: (Text, Optional[TempFileCleanup]) -> None
804    cleanup = self._get_tempfile_cleanup(cleanup)
805    if cleanup == TempFileCleanup.OFF:
806      return
807    elif cleanup == TempFileCleanup.ALWAYS:
808      self.addCleanup(_rmtree_ignore_errors, path)
809    elif cleanup == TempFileCleanup.SUCCESS:
810      self._internal_cleanup_on_success(_rmtree_ignore_errors, path)
811    else:
812      raise AssertionError('Unexpected cleanup value: {}'.format(cleanup))
813
814  def _internal_cleanup_on_success(self, function, *args, **kwargs):
815    # type: (Callable[..., object], Any, Any) -> None
816    def _call_cleaner_on_success(*args, **kwargs):
817      if not self._ran_and_passed():
818        return
819      function(*args, **kwargs)
820    self.addCleanup(_call_cleaner_on_success, *args, **kwargs)
821
822  def _ran_and_passed(self):
823    # type: () -> bool
824    outcome = self._outcome
825    result = self.defaultTestResult()
826    self._feedErrorsToResult(result, outcome.errors)  # pytype: disable=attribute-error
827    return result.wasSuccessful()
828
829  def shortDescription(self):
830    # type: () -> Text
831    """Formats both the test method name and the first line of its docstring.
832
833    If no docstring is given, only returns the method name.
834
835    This method overrides unittest.TestCase.shortDescription(), which
836    only returns the first line of the docstring, obscuring the name
837    of the test upon failure.
838
839    Returns:
840      desc: A short description of a test method.
841    """
842    desc = self.id()
843
844    # Omit the main name so that test name can be directly copy/pasted to
845    # the command line.
846    if desc.startswith('__main__.'):
847      desc = desc[len('__main__.'):]
848
849    # NOTE: super() is used here instead of directly invoking
850    # unittest.TestCase.shortDescription(self), because of the
851    # following line that occurs later on:
852    #       unittest.TestCase = TestCase
853    # Because of this, direct invocation of what we think is the
854    # superclass will actually cause infinite recursion.
855    doc_first_line = super(TestCase, self).shortDescription()
856    if doc_first_line is not None:
857      desc = '\n'.join((desc, doc_first_line))
858    return desc
859
860  def assertStartsWith(self, actual, expected_start, msg=None):
861    """Asserts that actual.startswith(expected_start) is True.
862
863    Args:
864      actual: str
865      expected_start: str
866      msg: Optional message to report on failure.
867    """
868    if not actual.startswith(expected_start):
869      self.fail('%r does not start with %r' % (actual, expected_start), msg)
870
871  def assertNotStartsWith(self, actual, unexpected_start, msg=None):
872    """Asserts that actual.startswith(unexpected_start) is False.
873
874    Args:
875      actual: str
876      unexpected_start: str
877      msg: Optional message to report on failure.
878    """
879    if actual.startswith(unexpected_start):
880      self.fail('%r does start with %r' % (actual, unexpected_start), msg)
881
882  def assertEndsWith(self, actual, expected_end, msg=None):
883    """Asserts that actual.endswith(expected_end) is True.
884
885    Args:
886      actual: str
887      expected_end: str
888      msg: Optional message to report on failure.
889    """
890    if not actual.endswith(expected_end):
891      self.fail('%r does not end with %r' % (actual, expected_end), msg)
892
893  def assertNotEndsWith(self, actual, unexpected_end, msg=None):
894    """Asserts that actual.endswith(unexpected_end) is False.
895
896    Args:
897      actual: str
898      unexpected_end: str
899      msg: Optional message to report on failure.
900    """
901    if actual.endswith(unexpected_end):
902      self.fail('%r does end with %r' % (actual, unexpected_end), msg)
903
904  def assertSequenceStartsWith(self, prefix, whole, msg=None):
905    """An equality assertion for the beginning of ordered sequences.
906
907    If prefix is an empty sequence, it will raise an error unless whole is also
908    an empty sequence.
909
910    If prefix is not a sequence, it will raise an error if the first element of
911    whole does not match.
912
913    Args:
914      prefix: A sequence expected at the beginning of the whole parameter.
915      whole: The sequence in which to look for prefix.
916      msg: Optional message to report on failure.
917    """
918    try:
919      prefix_len = len(prefix)
920    except (TypeError, NotImplementedError):
921      prefix = [prefix]
922      prefix_len = 1
923
924    try:
925      whole_len = len(whole)
926    except (TypeError, NotImplementedError):
927      self.fail('For whole: len(%s) is not supported, it appears to be type: '
928                '%s' % (whole, type(whole)), msg)
929
930    assert prefix_len <= whole_len, self._formatMessage(
931        msg,
932        'Prefix length (%d) is longer than whole length (%d).' %
933        (prefix_len, whole_len)
934    )
935
936    if not prefix_len and whole_len:
937      self.fail('Prefix length is 0 but whole length is %d: %s' %
938                (len(whole), whole), msg)
939
940    try:
941      self.assertSequenceEqual(prefix, whole[:prefix_len], msg)
942    except AssertionError:
943      self.fail('prefix: %s not found at start of whole: %s.' %
944                (prefix, whole), msg)
945
946  def assertEmpty(self, container, msg=None):
947    """Asserts that an object has zero length.
948
949    Args:
950      container: Anything that implements the collections.abc.Sized interface.
951      msg: Optional message to report on failure.
952    """
953    if not isinstance(container, abc.Sized):
954      self.fail('Expected a Sized object, got: '
955                '{!r}'.format(type(container).__name__), msg)
956
957    # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
958    # have strange __nonzero__/__bool__ behavior.
959    if len(container):  # pylint: disable=g-explicit-length-test
960      self.fail('{!r} has length of {}.'.format(container, len(container)), msg)
961
962  def assertNotEmpty(self, container, msg=None):
963    """Asserts that an object has non-zero length.
964
965    Args:
966      container: Anything that implements the collections.abc.Sized interface.
967      msg: Optional message to report on failure.
968    """
969    if not isinstance(container, abc.Sized):
970      self.fail('Expected a Sized object, got: '
971                '{!r}'.format(type(container).__name__), msg)
972
973    # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
974    # have strange __nonzero__/__bool__ behavior.
975    if not len(container):  # pylint: disable=g-explicit-length-test
976      self.fail('{!r} has length of 0.'.format(container), msg)
977
978  def assertLen(self, container, expected_len, msg=None):
979    """Asserts that an object has the expected length.
980
981    Args:
982      container: Anything that implements the collections.abc.Sized interface.
983      expected_len: The expected length of the container.
984      msg: Optional message to report on failure.
985    """
986    if not isinstance(container, abc.Sized):
987      self.fail('Expected a Sized object, got: '
988                '{!r}'.format(type(container).__name__), msg)
989    if len(container) != expected_len:
990      container_repr = unittest.util.safe_repr(container)  # pytype: disable=module-attr
991      self.fail('{} has length of {}, expected {}.'.format(
992          container_repr, len(container), expected_len), msg)
993
994  def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None,
995                                msg=None, delta=None):
996    """An approximate equality assertion for ordered sequences.
997
998    Fail if the two sequences are unequal as determined by their value
999    differences rounded to the given number of decimal places (default 7) and
1000    comparing to zero, or by comparing that the difference between each value
1001    in the two sequences is more than the given delta.
1002
1003    Note that decimal places (from zero) are usually not the same as significant
1004    digits (measured from the most significant digit).
1005
1006    If the two sequences compare equal then they will automatically compare
1007    almost equal.
1008
1009    Args:
1010      expected_seq: A sequence containing elements we are expecting.
1011      actual_seq: The sequence that we are testing.
1012      places: The number of decimal places to compare.
1013      msg: The message to be printed if the test fails.
1014      delta: The OK difference between compared values.
1015    """
1016    if len(expected_seq) != len(actual_seq):
1017      self.fail('Sequence size mismatch: {} vs {}'.format(
1018          len(expected_seq), len(actual_seq)), msg)
1019
1020    err_list = []
1021    for idx, (exp_elem, act_elem) in enumerate(zip(expected_seq, actual_seq)):
1022      try:
1023        # assertAlmostEqual should be called with at most one of `places` and
1024        # `delta`. However, it's okay for assertSequenceAlmostEqual to pass
1025        # both because we want the latter to fail if the former does.
1026        # pytype: disable=wrong-keyword-args
1027        self.assertAlmostEqual(exp_elem, act_elem, places=places, msg=msg,
1028                               delta=delta)
1029        # pytype: enable=wrong-keyword-args
1030      except self.failureException as err:
1031        err_list.append('At index {}: {}'.format(idx, err))
1032
1033    if err_list:
1034      if len(err_list) > 30:
1035        err_list = err_list[:30] + ['...']
1036      msg = self._formatMessage(msg, '\n'.join(err_list))
1037      self.fail(msg)
1038
1039  def assertContainsSubset(self, expected_subset, actual_set, msg=None):
1040    """Checks whether actual iterable is a superset of expected iterable."""
1041    missing = set(expected_subset) - set(actual_set)
1042    if not missing:
1043      return
1044
1045    self.fail('Missing elements %s\nExpected: %s\nActual: %s' % (
1046        missing, expected_subset, actual_set), msg)
1047
1048  def assertNoCommonElements(self, expected_seq, actual_seq, msg=None):
1049    """Checks whether actual iterable and expected iterable are disjoint."""
1050    common = set(expected_seq) & set(actual_seq)
1051    if not common:
1052      return
1053
1054    self.fail('Common elements %s\nExpected: %s\nActual: %s' % (
1055        common, expected_seq, actual_seq), msg)
1056
1057  def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
1058    """Deprecated, please use assertCountEqual instead.
1059
1060    This is equivalent to assertCountEqual in Python 3. An implementation of
1061    assertCountEqual is also provided by absltest.TestCase for Python 2.
1062
1063    Args:
1064      expected_seq: A sequence containing elements we are expecting.
1065      actual_seq: The sequence that we are testing.
1066      msg: The message to be printed if the test fails.
1067    """
1068    if six.PY3:
1069      # The assertItemsEqual method was renamed assertCountEqual in Python 3.2
1070      super(TestCase, self).assertCountEqual(expected_seq, actual_seq, msg)
1071    else:
1072      super(TestCase, self).assertItemsEqual(expected_seq, actual_seq, msg)
1073
1074  # Only override assertCountEqual in Python 2 to avoid unnecessary calls.
1075  if six.PY2:
1076
1077    def assertCountEqual(self, expected_seq, actual_seq, msg=None):
1078      """Tests two sequences have the same elements regardless of order.
1079
1080      It tests that the first sequence contains the same elements as the
1081      second, regardless of their order. When they don't, an error message
1082      listing the differences between the sequences will be generated.
1083
1084      Duplicate elements are not ignored when comparing first and second.
1085      It verifies whether each element has the same count in both sequences.
1086      Equivalent to:
1087
1088          self.assertEqual(Counter(list(expected_seq)),
1089                           Counter(list(actual_seq)))
1090
1091      but works with sequences of unhashable objects as well.
1092
1093      Example:
1094          - [0, 1, 1] and [1, 0, 1] compare equal.
1095          - [0, 0, 1] and [0, 1] compare unequal.
1096
1097      Args:
1098        expected_seq: A sequence containing elements we are expecting.
1099        actual_seq: The sequence that we are testing.
1100        msg: The message to be printed if the test fails.
1101
1102      """
1103      # Only call super's method to avoid potential infinite recursions.
1104      super(TestCase, self).assertItemsEqual(expected_seq, actual_seq, msg)
1105
1106  def assertSameElements(self, expected_seq, actual_seq, msg=None):
1107    """Asserts that two sequences have the same elements (in any order).
1108
1109    This method, unlike assertCountEqual, doesn't care about any
1110    duplicates in the expected and actual sequences.
1111
1112      >> assertSameElements([1, 1, 1, 0, 0, 0], [0, 1])
1113      # Doesn't raise an AssertionError
1114
1115    If possible, you should use assertCountEqual instead of
1116    assertSameElements.
1117
1118    Args:
1119      expected_seq: A sequence containing elements we are expecting.
1120      actual_seq: The sequence that we are testing.
1121      msg: The message to be printed if the test fails.
1122    """
1123    # `unittest2.TestCase` used to have assertSameElements, but it was
1124    # removed in favor of assertItemsEqual. As there's a unit test
1125    # that explicitly checks this behavior, I am leaving this method
1126    # alone.
1127    # Fail on strings: empirically, passing strings to this test method
1128    # is almost always a bug. If comparing the character sets of two strings
1129    # is desired, cast the inputs to sets or lists explicitly.
1130    if (isinstance(expected_seq, _TEXT_OR_BINARY_TYPES) or
1131        isinstance(actual_seq, _TEXT_OR_BINARY_TYPES)):
1132      self.fail('Passing string/bytes to assertSameElements is usually a bug. '
1133                'Did you mean to use assertEqual?\n'
1134                'Expected: %s\nActual: %s' % (expected_seq, actual_seq))
1135    try:
1136      expected = dict([(element, None) for element in expected_seq])
1137      actual = dict([(element, None) for element in actual_seq])
1138      missing = [element for element in expected if element not in actual]
1139      unexpected = [element for element in actual if element not in expected]
1140      missing.sort()
1141      unexpected.sort()
1142    except TypeError:
1143      # Fall back to slower list-compare if any of the objects are
1144      # not hashable.
1145      expected = list(expected_seq)
1146      actual = list(actual_seq)
1147      expected.sort()
1148      actual.sort()
1149      missing, unexpected = _sorted_list_difference(expected, actual)
1150    errors = []
1151    if msg:
1152      errors.extend((msg, ':\n'))
1153    if missing:
1154      errors.append('Expected, but missing:\n  %r\n' % missing)
1155    if unexpected:
1156      errors.append('Unexpected, but present:\n  %r\n' % unexpected)
1157    if missing or unexpected:
1158      self.fail(''.join(errors))
1159
1160  # unittest.TestCase.assertMultiLineEqual works very similarly, but it
1161  # has a different error format. However, I find this slightly more readable.
1162  def assertMultiLineEqual(self, first, second, msg=None, **kwargs):
1163    """Asserts that two multi-line strings are equal."""
1164    assert isinstance(first, six.string_types), (
1165        'First argument is not a string: %r' % (first,))
1166    assert isinstance(second, six.string_types), (
1167        'Second argument is not a string: %r' % (second,))
1168    line_limit = kwargs.pop('line_limit', 0)
1169    if kwargs:
1170      raise TypeError('Unexpected keyword args {}'.format(tuple(kwargs)))
1171
1172    if first == second:
1173      return
1174    if msg:
1175      failure_message = [msg + ':\n']
1176    else:
1177      failure_message = ['\n']
1178    if line_limit:
1179      line_limit += len(failure_message)
1180    for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)):
1181      failure_message.append(line)
1182      if not line.endswith('\n'):
1183        failure_message.append('\n')
1184    if line_limit and len(failure_message) > line_limit:
1185      n_omitted = len(failure_message) - line_limit
1186      failure_message = failure_message[:line_limit]
1187      failure_message.append(
1188          '(... and {} more delta lines omitted for brevity.)\n'.format(
1189              n_omitted))
1190
1191    raise self.failureException(''.join(failure_message))
1192
1193  def assertBetween(self, value, minv, maxv, msg=None):
1194    """Asserts that value is between minv and maxv (inclusive)."""
1195    msg = self._formatMessage(msg,
1196                              '"%r" unexpectedly not between "%r" and "%r"' %
1197                              (value, minv, maxv))
1198    self.assertTrue(minv <= value, msg)
1199    self.assertTrue(maxv >= value, msg)
1200
1201  # Backport these names so that Py2 code can be written in Py3 style.
1202  if six.PY2:
1203
1204    def assertRegex(self, *args, **kwargs):
1205      return self.assertRegexpMatches(*args, **kwargs)
1206
1207    def assertRaisesRegex(self, *args, **kwargs):
1208      return self.assertRaisesRegexp(*args, **kwargs)
1209
1210    def assertNotRegex(self, *args, **kwargs):
1211      return self.assertNotRegexpMatches(*args, **kwargs)
1212
1213  def assertRegexMatch(self, actual_str, regexes, message=None):
1214    r"""Asserts that at least one regex in regexes matches str.
1215
1216    If possible you should use `assertRegex`, which is a simpler
1217    version of this method. `assertRegex` takes a single regular
1218    expression (a string or re compiled object) instead of a list.
1219
1220    Notes:
1221    1. This function uses substring matching, i.e. the matching
1222       succeeds if *any* substring of the error message matches *any*
1223       regex in the list.  This is more convenient for the user than
1224       full-string matching.
1225
1226    2. If regexes is the empty list, the matching will always fail.
1227
1228    3. Use regexes=[''] for a regex that will always pass.
1229
1230    4. '.' matches any single character *except* the newline.  To
1231       match any character, use '(.|\n)'.
1232
1233    5. '^' matches the beginning of each line, not just the beginning
1234       of the string.  Similarly, '$' matches the end of each line.
1235
1236    6. An exception will be thrown if regexes contains an invalid
1237       regex.
1238
1239    Args:
1240      actual_str:  The string we try to match with the items in regexes.
1241      regexes:  The regular expressions we want to match against str.
1242          See "Notes" above for detailed notes on how this is interpreted.
1243      message:  The message to be printed if the test fails.
1244    """
1245    if isinstance(regexes, _TEXT_OR_BINARY_TYPES):
1246      self.fail('regexes is string or bytes; use assertRegex instead.',
1247                message)
1248    if not regexes:
1249      self.fail('No regexes specified.', message)
1250
1251    regex_type = type(regexes[0])
1252    for regex in regexes[1:]:
1253      if type(regex) is not regex_type:  # pylint: disable=unidiomatic-typecheck
1254        self.fail('regexes list must all be the same type.', message)
1255
1256    if regex_type is bytes and isinstance(actual_str, six.text_type):
1257      regexes = [regex.decode('utf-8') for regex in regexes]
1258      regex_type = six.text_type
1259    elif regex_type is six.text_type and isinstance(actual_str, bytes):
1260      regexes = [regex.encode('utf-8') for regex in regexes]
1261      regex_type = bytes
1262
1263    if regex_type is six.text_type:
1264      regex = u'(?:%s)' % u')|(?:'.join(regexes)
1265    elif regex_type is bytes:
1266      regex = b'(?:' + (b')|(?:'.join(regexes)) + b')'
1267    else:
1268      self.fail('Only know how to deal with unicode str or bytes regexes.',
1269                message)
1270
1271    if not re.search(regex, actual_str, re.MULTILINE):
1272      self.fail('"%s" does not contain any of these regexes: %s.' %
1273                (actual_str, regexes), message)
1274
1275  def assertCommandSucceeds(self, command, regexes=(b'',), env=None,
1276                            close_fds=True, msg=None):
1277    """Asserts that a shell command succeeds (i.e. exits with code 0).
1278
1279    Args:
1280      command: List or string representing the command to run.
1281      regexes: List of regular expression byte strings that match success.
1282      env: Dictionary of environment variable settings. If None, no environment
1283          variables will be set for the child process. This is to make tests
1284          more hermetic. NOTE: this behavior is different than the standard
1285          subprocess module.
1286      close_fds: Whether or not to close all open fd's in the child after
1287          forking.
1288      msg: Optional message to report on failure.
1289    """
1290    (ret_code, err) = get_command_stderr(command, env, close_fds)
1291
1292    # We need bytes regexes here because `err` is bytes.
1293    # Accommodate code which listed their output regexes w/o the b'' prefix by
1294    # converting them to bytes for the user.
1295    if isinstance(regexes[0], six.text_type):
1296      regexes = [regex.encode('utf-8') for regex in regexes]
1297
1298    command_string = get_command_string(command)
1299    self.assertEqual(
1300        ret_code, 0,
1301        self._formatMessage(msg,
1302                            'Running command\n'
1303                            '%s failed with error code %s and message\n'
1304                            '%s' % (_quote_long_string(command_string),
1305                                    ret_code,
1306                                    _quote_long_string(err)))
1307    )
1308    self.assertRegexMatch(
1309        err,
1310        regexes,
1311        message=self._formatMessage(
1312            msg,
1313            'Running command\n'
1314            '%s failed with error code %s and message\n'
1315            '%s which matches no regex in %s' % (
1316                _quote_long_string(command_string),
1317                ret_code,
1318                _quote_long_string(err),
1319                regexes)))
1320
1321  def assertCommandFails(self, command, regexes, env=None, close_fds=True,
1322                         msg=None):
1323    """Asserts a shell command fails and the error matches a regex in a list.
1324
1325    Args:
1326      command: List or string representing the command to run.
1327      regexes: the list of regular expression strings.
1328      env: Dictionary of environment variable settings. If None, no environment
1329          variables will be set for the child process. This is to make tests
1330          more hermetic. NOTE: this behavior is different than the standard
1331          subprocess module.
1332      close_fds: Whether or not to close all open fd's in the child after
1333          forking.
1334      msg: Optional message to report on failure.
1335    """
1336    (ret_code, err) = get_command_stderr(command, env, close_fds)
1337
1338    # We need bytes regexes here because `err` is bytes.
1339    # Accommodate code which listed their output regexes w/o the b'' prefix by
1340    # converting them to bytes for the user.
1341    if isinstance(regexes[0], six.text_type):
1342      regexes = [regex.encode('utf-8') for regex in regexes]
1343
1344    command_string = get_command_string(command)
1345    self.assertNotEqual(
1346        ret_code, 0,
1347        self._formatMessage(msg, 'The following command succeeded '
1348                            'while expected to fail:\n%s' %
1349                            _quote_long_string(command_string)))
1350    self.assertRegexMatch(
1351        err,
1352        regexes,
1353        message=self._formatMessage(
1354            msg,
1355            'Running command\n'
1356            '%s failed with error code %s and message\n'
1357            '%s which matches no regex in %s' % (
1358                _quote_long_string(command_string),
1359                ret_code,
1360                _quote_long_string(err),
1361                regexes)))
1362
1363  class _AssertRaisesContext(object):
1364
1365    def __init__(self, expected_exception, test_case, test_func, msg=None):
1366      self.expected_exception = expected_exception
1367      self.test_case = test_case
1368      self.test_func = test_func
1369      self.msg = msg
1370
1371    def __enter__(self):
1372      return self
1373
1374    def __exit__(self, exc_type, exc_value, tb):
1375      if exc_type is None:
1376        self.test_case.fail(self.expected_exception.__name__ + ' not raised',
1377                            self.msg)
1378      if not issubclass(exc_type, self.expected_exception):
1379        return False
1380      self.test_func(exc_value)
1381      return True
1382
1383  def assertRaisesWithPredicateMatch(self, expected_exception, predicate,
1384                                     callable_obj=None, *args, **kwargs):
1385    """Asserts that exception is thrown and predicate(exception) is true.
1386
1387    Args:
1388      expected_exception: Exception class expected to be raised.
1389      predicate: Function of one argument that inspects the passed-in exception
1390          and returns True (success) or False (please fail the test).
1391      callable_obj: Function to be called.
1392      *args: Extra args.
1393      **kwargs: Extra keyword args.
1394
1395    Returns:
1396      A context manager if callable_obj is None. Otherwise, None.
1397
1398    Raises:
1399      self.failureException if callable_obj does not raise a matching exception.
1400    """
1401    def Check(err):
1402      self.assertTrue(predicate(err),
1403                      '%r does not match predicate %r' % (err, predicate))
1404
1405    context = self._AssertRaisesContext(expected_exception, self, Check)
1406    if callable_obj is None:
1407      return context
1408    with context:
1409      callable_obj(*args, **kwargs)
1410
1411  def assertRaisesWithLiteralMatch(self, expected_exception,
1412                                   expected_exception_message,
1413                                   callable_obj=None, *args, **kwargs):
1414    """Asserts that the message in a raised exception equals the given string.
1415
1416    Unlike assertRaisesRegex, this method takes a literal string, not
1417    a regular expression.
1418
1419    with self.assertRaisesWithLiteralMatch(ExType, 'message'):
1420      DoSomething()
1421
1422    Args:
1423      expected_exception: Exception class expected to be raised.
1424      expected_exception_message: String message expected in the raised
1425          exception.  For a raise exception e, expected_exception_message must
1426          equal str(e).
1427      callable_obj: Function to be called, or None to return a context.
1428      *args: Extra args.
1429      **kwargs: Extra kwargs.
1430
1431    Returns:
1432      A context manager if callable_obj is None. Otherwise, None.
1433
1434    Raises:
1435      self.failureException if callable_obj does not raise a matching exception.
1436    """
1437    def Check(err):
1438      actual_exception_message = str(err)
1439      self.assertTrue(expected_exception_message == actual_exception_message,
1440                      'Exception message does not match.\n'
1441                      'Expected: %r\n'
1442                      'Actual: %r' % (expected_exception_message,
1443                                      actual_exception_message))
1444
1445    context = self._AssertRaisesContext(expected_exception, self, Check)
1446    if callable_obj is None:
1447      return context
1448    with context:
1449      callable_obj(*args, **kwargs)
1450
1451  def assertContainsInOrder(self, strings, target, msg=None):
1452    """Asserts that the strings provided are found in the target in order.
1453
1454    This may be useful for checking HTML output.
1455
1456    Args:
1457      strings: A list of strings, such as [ 'fox', 'dog' ]
1458      target: A target string in which to look for the strings, such as
1459          'The quick brown fox jumped over the lazy dog'.
1460      msg: Optional message to report on failure.
1461    """
1462    if isinstance(strings, (bytes, unicode if str is bytes else str)):
1463      strings = (strings,)
1464
1465    current_index = 0
1466    last_string = None
1467    for string in strings:
1468      index = target.find(str(string), current_index)
1469      if index == -1 and current_index == 0:
1470        self.fail("Did not find '%s' in '%s'" %
1471                  (string, target), msg)
1472      elif index == -1:
1473        self.fail("Did not find '%s' after '%s' in '%s'" %
1474                  (string, last_string, target), msg)
1475      last_string = string
1476      current_index = index
1477
1478  def assertContainsSubsequence(self, container, subsequence, msg=None):
1479    """Asserts that "container" contains "subsequence" as a subsequence.
1480
1481    Asserts that "container" contains all the elements of "subsequence", in
1482    order, but possibly with other elements interspersed. For example, [1, 2, 3]
1483    is a subsequence of [0, 0, 1, 2, 0, 3, 0] but not of [0, 0, 1, 3, 0, 2, 0].
1484
1485    Args:
1486      container: the list we're testing for subsequence inclusion.
1487      subsequence: the list we hope will be a subsequence of container.
1488      msg: Optional message to report on failure.
1489    """
1490    first_nonmatching = None
1491    reversed_container = list(reversed(container))
1492    subsequence = list(subsequence)
1493
1494    for e in subsequence:
1495      if e not in reversed_container:
1496        first_nonmatching = e
1497        break
1498      while e != reversed_container.pop():
1499        pass
1500
1501    if first_nonmatching is not None:
1502      self.fail('%s not a subsequence of %s. First non-matching element: %s' %
1503                (subsequence, container, first_nonmatching), msg)
1504
1505  def assertContainsExactSubsequence(self, container, subsequence, msg=None):
1506    """Asserts that "container" contains "subsequence" as an exact subsequence.
1507
1508    Asserts that "container" contains all the elements of "subsequence", in
1509    order, and without other elements interspersed. For example, [1, 2, 3] is an
1510    exact subsequence of [0, 0, 1, 2, 3, 0] but not of [0, 0, 1, 2, 0, 3, 0].
1511
1512    Args:
1513      container: the list we're testing for subsequence inclusion.
1514      subsequence: the list we hope will be an exact subsequence of container.
1515      msg: Optional message to report on failure.
1516    """
1517    container = list(container)
1518    subsequence = list(subsequence)
1519    longest_match = 0
1520
1521    for start in xrange(1 + len(container) - len(subsequence)):
1522      if longest_match == len(subsequence):
1523        break
1524      index = 0
1525      while (index < len(subsequence) and
1526             subsequence[index] == container[start + index]):
1527        index += 1
1528      longest_match = max(longest_match, index)
1529
1530    if longest_match < len(subsequence):
1531      self.fail('%s not an exact subsequence of %s. '
1532                'Longest matching prefix: %s' %
1533                (subsequence, container, subsequence[:longest_match]), msg)
1534
1535  def assertTotallyOrdered(self, *groups, **kwargs):
1536    """Asserts that total ordering has been implemented correctly.
1537
1538    For example, say you have a class A that compares only on its attribute x.
1539    Comparators other than __lt__ are omitted for brevity.
1540
1541    class A(object):
1542      def __init__(self, x, y):
1543        self.x = x
1544        self.y = y
1545
1546      def __hash__(self):
1547        return hash(self.x)
1548
1549      def __lt__(self, other):
1550        try:
1551          return self.x < other.x
1552        except AttributeError:
1553          return NotImplemented
1554
1555    assertTotallyOrdered will check that instances can be ordered correctly.
1556    For example,
1557
1558    self.assertTotallyOrdered(
1559      [None],  # None should come before everything else.
1560      [1],     # Integers sort earlier.
1561      [A(1, 'a')],
1562      [A(2, 'b')],  # 2 is after 1.
1563      [A(3, 'c'), A(3, 'd')],  # The second argument is irrelevant.
1564      [A(4, 'z')],
1565      ['foo'])  # Strings sort last.
1566
1567    Args:
1568     *groups: A list of groups of elements.  Each group of elements is a list
1569         of objects that are equal.  The elements in each group must be less
1570         than the elements in the group after it.  For example, these groups are
1571         totally ordered: [None], [1], [2, 2], [3].
1572      **kwargs: optional msg keyword argument can be passed.
1573    """
1574
1575    def CheckOrder(small, big):
1576      """Ensures small is ordered before big."""
1577      self.assertFalse(small == big,
1578                       self._formatMessage(msg, '%r unexpectedly equals %r' %
1579                                           (small, big)))
1580      self.assertTrue(small != big,
1581                      self._formatMessage(msg, '%r unexpectedly equals %r' %
1582                                          (small, big)))
1583      self.assertLess(small, big, msg)
1584      self.assertFalse(big < small,
1585                       self._formatMessage(msg,
1586                                           '%r unexpectedly less than %r' %
1587                                           (big, small)))
1588      self.assertLessEqual(small, big, msg)
1589      self.assertFalse(big <= small, self._formatMessage(
1590          '%r unexpectedly less than or equal to %r' % (big, small), msg
1591      ))
1592      self.assertGreater(big, small, msg)
1593      self.assertFalse(small > big,
1594                       self._formatMessage(msg,
1595                                           '%r unexpectedly greater than %r' %
1596                                           (small, big)))
1597      self.assertGreaterEqual(big, small)
1598      self.assertFalse(small >= big, self._formatMessage(
1599          msg,
1600          '%r unexpectedly greater than or equal to %r' % (small, big)))
1601
1602    def CheckEqual(a, b):
1603      """Ensures that a and b are equal."""
1604      self.assertEqual(a, b, msg)
1605      self.assertFalse(a != b,
1606                       self._formatMessage(msg, '%r unexpectedly unequals %r' %
1607                                           (a, b)))
1608
1609      # Objects that compare equal must hash to the same value, but this only
1610      # applies if both objects are hashable.
1611      if (isinstance(a, abc.Hashable) and
1612          isinstance(b, abc.Hashable)):
1613        self.assertEqual(
1614            hash(a), hash(b),
1615            self._formatMessage(
1616                msg, 'hash %d of %r unexpectedly not equal to hash %d of %r' %
1617                (hash(a), a, hash(b), b)))
1618
1619      self.assertFalse(a < b,
1620                       self._formatMessage(msg,
1621                                           '%r unexpectedly less than %r' %
1622                                           (a, b)))
1623      self.assertFalse(b < a,
1624                       self._formatMessage(msg,
1625                                           '%r unexpectedly less than %r' %
1626                                           (b, a)))
1627      self.assertLessEqual(a, b, msg)
1628      self.assertLessEqual(b, a, msg)
1629      self.assertFalse(a > b,
1630                       self._formatMessage(msg,
1631                                           '%r unexpectedly greater than %r' %
1632                                           (a, b)))
1633      self.assertFalse(b > a,
1634                       self._formatMessage(msg,
1635                                           '%r unexpectedly greater than %r' %
1636                                           (b, a)))
1637      self.assertGreaterEqual(a, b, msg)
1638      self.assertGreaterEqual(b, a, msg)
1639
1640    msg = kwargs.get('msg')
1641
1642    # For every combination of elements, check the order of every pair of
1643    # elements.
1644    for elements in itertools.product(*groups):
1645      elements = list(elements)
1646      for index, small in enumerate(elements[:-1]):
1647        for big in elements[index + 1:]:
1648          CheckOrder(small, big)
1649
1650    # Check that every element in each group is equal.
1651    for group in groups:
1652      for a in group:
1653        CheckEqual(a, a)
1654      for a, b in itertools.product(group, group):
1655        CheckEqual(a, b)
1656
1657  def assertDictEqual(self, a, b, msg=None):
1658    """Raises AssertionError if a and b are not equal dictionaries.
1659
1660    Args:
1661      a: A dict, the expected value.
1662      b: A dict, the actual value.
1663      msg: An optional str, the associated message.
1664
1665    Raises:
1666      AssertionError: if the dictionaries are not equal.
1667    """
1668    self.assertIsInstance(a, dict, self._formatMessage(
1669        msg,
1670        'First argument is not a dictionary'
1671    ))
1672    self.assertIsInstance(b, dict, self._formatMessage(
1673        msg,
1674        'Second argument is not a dictionary'
1675    ))
1676
1677    def Sorted(list_of_items):
1678      try:
1679        return sorted(list_of_items)  # In 3.3, unordered are possible.
1680      except TypeError:
1681        return list_of_items
1682
1683    if a == b:
1684      return
1685    a_items = Sorted(list(six.iteritems(a)))
1686    b_items = Sorted(list(six.iteritems(b)))
1687
1688    unexpected = []
1689    missing = []
1690    different = []
1691
1692    safe_repr = unittest.util.safe_repr  # pytype: disable=module-attr
1693
1694    def Repr(dikt):
1695      """Deterministic repr for dict."""
1696      # Sort the entries based on their repr, not based on their sort order,
1697      # which will be non-deterministic across executions, for many types.
1698      entries = sorted((safe_repr(k), safe_repr(v))
1699                       for k, v in six.iteritems(dikt))
1700      return '{%s}' % (', '.join('%s: %s' % pair for pair in entries))
1701
1702    message = ['%s != %s%s' % (Repr(a), Repr(b), ' (%s)' % msg if msg else '')]
1703
1704    # The standard library default output confounds lexical difference with
1705    # value difference; treat them separately.
1706    for a_key, a_value in a_items:
1707      if a_key not in b:
1708        missing.append((a_key, a_value))
1709      elif a_value != b[a_key]:
1710        different.append((a_key, a_value, b[a_key]))
1711
1712    for b_key, b_value in b_items:
1713      if b_key not in a:
1714        unexpected.append((b_key, b_value))
1715
1716    if unexpected:
1717      message.append(
1718          'Unexpected, but present entries:\n%s' % ''.join(
1719              '%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in unexpected))
1720
1721    if different:
1722      message.append(
1723          'repr() of differing entries:\n%s' % ''.join(
1724              '%s: %s != %s\n' % (safe_repr(k), safe_repr(a_value),
1725                                  safe_repr(b_value))
1726              for k, a_value, b_value in different))
1727
1728    if missing:
1729      message.append(
1730          'Missing entries:\n%s' % ''.join(
1731              ('%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in missing)))
1732
1733    raise self.failureException('\n'.join(message))
1734
1735  def assertUrlEqual(self, a, b, msg=None):
1736    """Asserts that urls are equal, ignoring ordering of query params."""
1737    parsed_a = urllib.parse.urlparse(a)
1738    parsed_b = urllib.parse.urlparse(b)
1739    self.assertEqual(parsed_a.scheme, parsed_b.scheme, msg)
1740    self.assertEqual(parsed_a.netloc, parsed_b.netloc, msg)
1741    self.assertEqual(parsed_a.path, parsed_b.path, msg)
1742    self.assertEqual(parsed_a.fragment, parsed_b.fragment, msg)
1743    self.assertEqual(sorted(parsed_a.params.split(';')),
1744                     sorted(parsed_b.params.split(';')), msg)
1745    self.assertDictEqual(
1746        urllib.parse.parse_qs(parsed_a.query, keep_blank_values=True),
1747        urllib.parse.parse_qs(parsed_b.query, keep_blank_values=True), msg)
1748
1749  def assertSameStructure(self, a, b, aname='a', bname='b', msg=None):
1750    """Asserts that two values contain the same structural content.
1751
1752    The two arguments should be data trees consisting of trees of dicts and
1753    lists. They will be deeply compared by walking into the contents of dicts
1754    and lists; other items will be compared using the == operator.
1755    If the two structures differ in content, the failure message will indicate
1756    the location within the structures where the first difference is found.
1757    This may be helpful when comparing large structures.
1758
1759    Mixed Sequence and Set types are supported. Mixed Mapping types are
1760    supported, but the order of the keys will not be considered in the
1761    comparison.
1762
1763    Args:
1764      a: The first structure to compare.
1765      b: The second structure to compare.
1766      aname: Variable name to use for the first structure in assertion messages.
1767      bname: Variable name to use for the second structure.
1768      msg: Additional text to include in the failure message.
1769    """
1770
1771    # Accumulate all the problems found so we can report all of them at once
1772    # rather than just stopping at the first
1773    problems = []
1774
1775    _walk_structure_for_problems(a, b, aname, bname, problems)
1776
1777    # Avoid spamming the user toooo much
1778    if self.maxDiff is not None:
1779      max_problems_to_show = self.maxDiff // 80
1780      if len(problems) > max_problems_to_show:
1781        problems = problems[0:max_problems_to_show-1] + ['...']
1782
1783    if problems:
1784      self.fail('; '.join(problems), msg)
1785
1786  def assertJsonEqual(self, first, second, msg=None):
1787    """Asserts that the JSON objects defined in two strings are equal.
1788
1789    A summary of the differences will be included in the failure message
1790    using assertSameStructure.
1791
1792    Args:
1793      first: A string containing JSON to decode and compare to second.
1794      second: A string containing JSON to decode and compare to first.
1795      msg: Additional text to include in the failure message.
1796    """
1797    try:
1798      first_structured = json.loads(first)
1799    except ValueError as e:
1800      raise ValueError(self._formatMessage(
1801          msg,
1802          'could not decode first JSON value %s: %s' % (first, e)))
1803
1804    try:
1805      second_structured = json.loads(second)
1806    except ValueError as e:
1807      raise ValueError(self._formatMessage(
1808          msg,
1809          'could not decode second JSON value %s: %s' % (second, e)))
1810
1811    self.assertSameStructure(first_structured, second_structured,
1812                             aname='first', bname='second', msg=msg)
1813
1814  def _getAssertEqualityFunc(self, first, second):
1815    # type: (Any, Any) -> Callable[..., None]
1816    try:
1817      return super(TestCase, self)._getAssertEqualityFunc(first, second)
1818    except AttributeError:
1819      # This is a workaround if unittest.TestCase.__init__ was never run.
1820      # It usually means that somebody created a subclass just for the
1821      # assertions and has overridden __init__. "assertTrue" is a safe
1822      # value that will not make __init__ raise a ValueError.
1823      test_method = getattr(self, '_testMethodName', 'assertTrue')
1824      super(TestCase, self).__init__(test_method)
1825
1826    return super(TestCase, self)._getAssertEqualityFunc(first, second)
1827
1828  def fail(self, msg=None, prefix=None):
1829    """Fail immediately with the given message, optionally prefixed."""
1830    return super(TestCase, self).fail(self._formatMessage(prefix, msg))
1831
1832
1833def _sorted_list_difference(expected, actual):
1834  # type: (List[_T], List[_T]) -> Tuple[List[_T], List[_T]]
1835  """Finds elements in only one or the other of two, sorted input lists.
1836
1837  Returns a two-element tuple of lists.  The first list contains those
1838  elements in the "expected" list but not in the "actual" list, and the
1839  second contains those elements in the "actual" list but not in the
1840  "expected" list.  Duplicate elements in either input list are ignored.
1841
1842  Args:
1843    expected:  The list we expected.
1844    actual:  The list we actualy got.
1845  Returns:
1846    (missing, unexpected)
1847    missing: items in expected that are not in actual.
1848    unexpected: items in actual that are not in expected.
1849  """
1850  i = j = 0
1851  missing = []
1852  unexpected = []
1853  while True:
1854    try:
1855      e = expected[i]
1856      a = actual[j]
1857      if e < a:
1858        missing.append(e)
1859        i += 1
1860        while expected[i] == e:
1861          i += 1
1862      elif e > a:
1863        unexpected.append(a)
1864        j += 1
1865        while actual[j] == a:
1866          j += 1
1867      else:
1868        i += 1
1869        try:
1870          while expected[i] == e:
1871            i += 1
1872        finally:
1873          j += 1
1874          while actual[j] == a:
1875            j += 1
1876    except IndexError:
1877      missing.extend(expected[i:])
1878      unexpected.extend(actual[j:])
1879      break
1880  return missing, unexpected
1881
1882
1883def _are_both_of_integer_type(a, b):
1884  # type: (object, object) -> bool
1885  return isinstance(a, six.integer_types) and isinstance(b, six.integer_types)
1886
1887
1888def _are_both_of_sequence_type(a, b):
1889  # type: (object, object) -> bool
1890  return isinstance(a, abc.Sequence) and isinstance(
1891      b, abc.Sequence) and not isinstance(
1892          a, _TEXT_OR_BINARY_TYPES) and not isinstance(b, _TEXT_OR_BINARY_TYPES)
1893
1894
1895def _are_both_of_set_type(a, b):
1896  # type: (object, object) -> bool
1897  return isinstance(a, abc.Set) and isinstance(b, abc.Set)
1898
1899
1900def _are_both_of_mapping_type(a, b):
1901  # type: (object, object) -> bool
1902  return isinstance(a, abc.Mapping) and isinstance(
1903      b, abc.Mapping)
1904
1905
1906def _walk_structure_for_problems(a, b, aname, bname, problem_list):
1907  """The recursive comparison behind assertSameStructure."""
1908  if type(a) != type(b) and not (  # pylint: disable=unidiomatic-typecheck
1909      _are_both_of_integer_type(a, b) or _are_both_of_sequence_type(a, b) or
1910      _are_both_of_set_type(a, b) or _are_both_of_mapping_type(a, b)):
1911    # We do not distinguish between int and long types as 99.99% of Python 2
1912    # code should never care.  They collapse into a single type in Python 3.
1913    problem_list.append('%s is a %r but %s is a %r' %
1914                        (aname, type(a), bname, type(b)))
1915    # If they have different types there's no point continuing
1916    return
1917
1918  if isinstance(a, abc.Set):
1919    for k in a:
1920      if k not in b:
1921        problem_list.append(
1922            '%s has %r but %s does not' % (aname, k, bname))
1923    for k in b:
1924      if k not in a:
1925        problem_list.append('%s lacks %r but %s has it' % (aname, k, bname))
1926
1927  # NOTE: a or b could be a defaultdict, so we must take care that the traversal
1928  # doesn't modify the data.
1929  elif isinstance(a, abc.Mapping):
1930    for k in a:
1931      if k in b:
1932        _walk_structure_for_problems(
1933            a[k], b[k], '%s[%r]' % (aname, k), '%s[%r]' % (bname, k),
1934            problem_list)
1935      else:
1936        problem_list.append(
1937            "%s has [%r] with value %r but it's missing in %s" %
1938            (aname, k, a[k], bname))
1939    for k in b:
1940      if k not in a:
1941        problem_list.append(
1942            '%s lacks [%r] but %s has it with value %r' %
1943            (aname, k, bname, b[k]))
1944
1945  # Strings/bytes are Sequences but we'll just do those with regular !=
1946  elif (isinstance(a, abc.Sequence) and
1947        not isinstance(a, _TEXT_OR_BINARY_TYPES)):
1948    minlen = min(len(a), len(b))
1949    for i in xrange(minlen):
1950      _walk_structure_for_problems(
1951          a[i], b[i], '%s[%d]' % (aname, i), '%s[%d]' % (bname, i),
1952          problem_list)
1953    for i in xrange(minlen, len(a)):
1954      problem_list.append('%s has [%i] with value %r but %s does not' %
1955                          (aname, i, a[i], bname))
1956    for i in xrange(minlen, len(b)):
1957      problem_list.append('%s lacks [%i] but %s has it with value %r' %
1958                          (aname, i, bname, b[i]))
1959
1960  else:
1961    if a != b:
1962      problem_list.append('%s is %r but %s is %r' % (aname, a, bname, b))
1963
1964
1965def get_command_string(command):
1966  """Returns an escaped string that can be used as a shell command.
1967
1968  Args:
1969    command: List or string representing the command to run.
1970  Returns:
1971    A string suitable for use as a shell command.
1972  """
1973  if isinstance(command, six.string_types):
1974    return command
1975  else:
1976    if os.name == 'nt':
1977      return ' '.join(command)
1978    else:
1979      # The following is identical to Python 3's shlex.quote function.
1980      command_string = ''
1981      for word in command:
1982        # Single quote word, and replace each ' in word with '"'"'
1983        command_string += "'" + word.replace("'", "'\"'\"'") + "' "
1984      return command_string[:-1]
1985
1986
1987def get_command_stderr(command, env=None, close_fds=True):
1988  """Runs the given shell command and returns a tuple.
1989
1990  Args:
1991    command: List or string representing the command to run.
1992    env: Dictionary of environment variable settings. If None, no environment
1993        variables will be set for the child process. This is to make tests
1994        more hermetic. NOTE: this behavior is different than the standard
1995        subprocess module.
1996    close_fds: Whether or not to close all open fd's in the child after forking.
1997        On Windows, this is ignored and close_fds is always False.
1998
1999  Returns:
2000    Tuple of (exit status, text printed to stdout and stderr by the command).
2001  """
2002  if env is None: env = {}
2003  if os.name == 'nt':
2004    # Windows does not support setting close_fds to True while also redirecting
2005    # standard handles.
2006    close_fds = False
2007
2008  use_shell = isinstance(command, six.string_types)
2009  process = subprocess.Popen(
2010      command,
2011      close_fds=close_fds,
2012      env=env,
2013      shell=use_shell,
2014      stderr=subprocess.STDOUT,
2015      stdout=subprocess.PIPE)
2016  output = process.communicate()[0]
2017  exit_status = process.wait()
2018  return (exit_status, output)
2019
2020
2021def _quote_long_string(s):
2022  # type: (Union[Text, bytes, bytearray]) -> Text
2023  """Quotes a potentially multi-line string to make the start and end obvious.
2024
2025  Args:
2026    s: A string.
2027
2028  Returns:
2029    The quoted string.
2030  """
2031  if isinstance(s, (bytes, bytearray)):
2032    try:
2033      s = s.decode('utf-8')
2034    except UnicodeDecodeError:
2035      s = str(s)
2036  return ('8<-----------\n' +
2037          s + '\n' +
2038          '----------->8\n')
2039
2040
2041def print_python_version():
2042  # type: () -> None
2043  # Having this in the test output logs by default helps debugging when all
2044  # you've got is the log and no other idea of which Python was used.
2045  sys.stderr.write('Running tests under Python {0[0]}.{0[1]}.{0[2]}: '
2046                   '{1}\n'.format(
2047                       sys.version_info,
2048                       sys.executable if sys.executable else 'embedded.'))
2049
2050
2051def main(*args, **kwargs):
2052  # type: (Text, Any) -> None
2053  """Executes a set of Python unit tests.
2054
2055  Usually this function is called without arguments, so the
2056  unittest.TestProgram instance will get created with the default settings,
2057  so it will run all test methods of all TestCase classes in the __main__
2058  module.
2059
2060  Args:
2061    *args: Positional arguments passed through to unittest.TestProgram.__init__.
2062    **kwargs: Keyword arguments passed through to unittest.TestProgram.__init__.
2063  """
2064  print_python_version()
2065  _run_in_app(run_tests, args, kwargs)
2066
2067
2068def _is_in_app_main():
2069  # type: () -> bool
2070  """Returns True iff app.run is active."""
2071  f = sys._getframe().f_back  # pylint: disable=protected-access
2072  while f:
2073    if f.f_code == six.get_function_code(app.run):  # pytype: disable=wrong-arg-types
2074      return True
2075    f = f.f_back
2076  return False
2077
2078
2079class _SavedFlag(object):
2080  """Helper class for saving and restoring a flag value."""
2081
2082  def __init__(self, flag):
2083    self.flag = flag
2084    self.value = flag.value
2085    self.present = flag.present
2086
2087  def restore_flag(self):
2088    self.flag.value = self.value
2089    self.flag.present = self.present
2090
2091
2092def _register_sigterm_with_faulthandler():
2093  # type: () -> None
2094  """Have faulthandler dump stacks on SIGTERM.  Useful to diagnose timeouts."""
2095  if faulthandler and getattr(faulthandler, 'register', None):
2096    # faulthandler.register is not avaiable on Windows.
2097    # faulthandler.enable() is already called by app.run.
2098    try:
2099      faulthandler.register(signal.SIGTERM, chain=True)  # pytype: disable=module-attr
2100    except Exception as e:  # pylint: disable=broad-except
2101      sys.stderr.write('faulthandler.register(SIGTERM) failed '
2102                       '%r; ignoring.\n' % e)
2103
2104
2105def _run_in_app(function, args, kwargs):
2106  # type: (Callable[..., None], Sequence[Text], Mapping[Text, Any]) -> None
2107  """Executes a set of Python unit tests, ensuring app.run.
2108
2109  This is a private function, users should call absltest.main().
2110
2111  _run_in_app calculates argv to be the command-line arguments of this program
2112  (without the flags), sets the default of FLAGS.alsologtostderr to True,
2113  then it calls function(argv, args, kwargs), making sure that `function'
2114  will get called within app.run(). _run_in_app does this by checking whether
2115  it is called by app.run(), or by calling app.run() explicitly.
2116
2117  The reason why app.run has to be ensured is to make sure that
2118  flags are parsed and stripped properly, and other initializations done by
2119  the app module are also carried out, no matter if absltest.run() is called
2120  from within or outside app.run().
2121
2122  If _run_in_app is called from within app.run(), then it will reparse
2123  sys.argv and pass the result without command-line flags into the argv
2124  argument of `function'. The reason why this parsing is needed is that
2125  __main__.main() calls absltest.main() without passing its argv. So the
2126  only way _run_in_app could get to know the argv without the flags is that
2127  it reparses sys.argv.
2128
2129  _run_in_app changes the default of FLAGS.alsologtostderr to True so that the
2130  test program's stderr will contain all the log messages unless otherwise
2131  specified on the command-line. This overrides any explicit assignment to
2132  FLAGS.alsologtostderr by the test program prior to the call to _run_in_app()
2133  (e.g. in __main__.main).
2134
2135  Please note that _run_in_app (and the function it calls) is allowed to make
2136  changes to kwargs.
2137
2138  Args:
2139    function: absltest.run_tests or a similar function. It will be called as
2140        function(argv, args, kwargs) where argv is a list containing the
2141        elements of sys.argv without the command-line flags.
2142    args: Positional arguments passed through to unittest.TestProgram.__init__.
2143    kwargs: Keyword arguments passed through to unittest.TestProgram.__init__.
2144  """
2145  if _is_in_app_main():
2146    _register_sigterm_with_faulthandler()
2147
2148    # Save command-line flags so the side effects of FLAGS(sys.argv) can be
2149    # undone.
2150    flag_objects = (FLAGS[name] for name in FLAGS)
2151    saved_flags = dict((f.name, _SavedFlag(f)) for f in flag_objects)
2152
2153    # Change the default of alsologtostderr from False to True, so the test
2154    # programs's stderr will contain all the log messages.
2155    # If --alsologtostderr=false is specified in the command-line, or user
2156    # has called FLAGS.alsologtostderr = False before, then the value is kept
2157    # False.
2158    FLAGS.set_default('alsologtostderr', True)
2159    # Remove it from saved flags so it doesn't get restored later.
2160    del saved_flags['alsologtostderr']
2161
2162    # The call FLAGS(sys.argv) parses sys.argv, returns the arguments
2163    # without the flags, and -- as a side effect -- modifies flag values in
2164    # FLAGS. We don't want the side effect, because we don't want to
2165    # override flag changes the program did (e.g. in __main__.main)
2166    # after the command-line has been parsed. So we have the for loop below
2167    # to change back flags to their old values.
2168    argv = FLAGS(sys.argv)
2169    for saved_flag in six.itervalues(saved_flags):
2170      saved_flag.restore_flag()
2171
2172    function(argv, args, kwargs)
2173  else:
2174    # Send logging to stderr. Use --alsologtostderr instead of --logtostderr
2175    # in case tests are reading their own logs.
2176    FLAGS.set_default('alsologtostderr', True)
2177
2178    def main_function(argv):
2179      _register_sigterm_with_faulthandler()
2180      function(argv, args, kwargs)
2181
2182    app.run(main=main_function)
2183
2184
2185def _is_suspicious_attribute(testCaseClass, name):
2186  # type: (Type, Text) -> bool
2187  """Returns True if an attribute is a method named like a test method."""
2188  if name.startswith('Test') and len(name) > 4 and name[4].isupper():
2189    attr = getattr(testCaseClass, name)
2190    if inspect.isfunction(attr) or inspect.ismethod(attr):
2191      if six.PY2:
2192        args = inspect.getargspec(attr)
2193        return (len(args.args) == 1 and args.args[0] == 'self'
2194                and args.varargs is None and args.keywords is None)
2195      else:
2196        args = inspect.getfullargspec(attr)
2197        return (len(args.args) == 1 and args.args[0] == 'self'
2198                and args.varargs is None and args.varkw is None and
2199                not args.kwonlyargs)
2200  return False
2201
2202
2203def skipThisClass(reason):
2204  # type: (Text) -> Callable[[_T], _T]
2205  """Skip tests in the decorated TestCase, but not any of its subclasses.
2206
2207  This decorator indicates that this class should skip all its tests, but not
2208  any of its subclasses. Useful for if you want to share testMethod or setUp
2209  implementations between a number of concrete testcase classes.
2210
2211  Example usage, showing how you can share some common test methods between
2212  subclasses. In this example, only 'BaseTest' will be marked as skipped, and
2213  not RealTest or SecondRealTest:
2214
2215    @absltest.skipThisClass("Shared functionality")
2216    class BaseTest(absltest.TestCase):
2217      def test_simple_functionality(self):
2218        self.assertEqual(self.system_under_test.method(), 1)
2219
2220    class RealTest(BaseTest):
2221      def setUp(self):
2222        super().setUp()
2223        self.system_under_test = MakeSystem(argument)
2224
2225      def test_specific_behavior(self):
2226        ...
2227
2228    class SecondRealTest(BaseTest):
2229      def setUp(self):
2230        super().setUp()
2231        self.system_under_test = MakeSystem(other_arguments)
2232
2233      def test_other_behavior(self):
2234        ...
2235
2236  Args:
2237    reason: The reason we have a skip in place. For instance: 'shared test
2238      methods' or 'shared assertion methods'.
2239
2240  Returns:
2241    Decorator function that will cause a class to be skipped.
2242  """
2243  if isinstance(reason, type):
2244    raise TypeError('Got {!r}, expected reason as string'.format(reason))
2245
2246  def _skip_class(test_case_class):
2247    if not issubclass(test_case_class, unittest.TestCase):
2248      raise TypeError(
2249          'Decorating {!r}, expected TestCase subclass'.format(test_case_class))
2250
2251    # Only shadow the setUpClass method if it is directly defined. If it is
2252    # in the parent class we invoke it via a super() call instead of holding
2253    # a reference to it.
2254    shadowed_setupclass = test_case_class.__dict__.get('setUpClass', None)
2255
2256    @classmethod
2257    def replacement_setupclass(cls, *args, **kwargs):
2258      # Skip this class if it is the one that was decorated with @skipThisClass
2259      if cls is test_case_class:
2260        raise SkipTest(reason)
2261      if shadowed_setupclass:
2262        # Pass along `cls` so the MRO chain doesn't break.
2263        # The original method is a `classmethod` descriptor, which can't
2264        # be directly called, but `__func__` has the underlying function.
2265        return shadowed_setupclass.__func__(cls, *args, **kwargs)
2266      else:
2267        # Because there's no setUpClass() defined directly on test_case_class,
2268        # we call super() ourselves to continue execution of the inheritance
2269        # chain.
2270        return super(test_case_class, cls).setUpClass(*args, **kwargs)
2271
2272    test_case_class.setUpClass = replacement_setupclass
2273    return test_case_class
2274
2275  return _skip_class
2276
2277
2278class TestLoader(unittest.TestLoader):
2279  """A test loader which supports common test features.
2280
2281  Supported features include:
2282   * Banning untested methods with test-like names: methods attached to this
2283     testCase with names starting with `Test` are ignored by the test runner,
2284     and often represent mistakenly-omitted test cases. This loader will raise
2285     a TypeError when attempting to load a TestCase with such methods.
2286   * Randomization of test case execution order (optional).
2287  """
2288
2289  _ERROR_MSG = textwrap.dedent("""Method '%s' is named like a test case but
2290  is not one. This is often a bug. If you want it to be a test method,
2291  name it with 'test' in lowercase. If not, rename the method to not begin
2292  with 'Test'.""")
2293
2294  def __init__(self, *args, **kwds):
2295    super(TestLoader, self).__init__(*args, **kwds)
2296    seed = _get_default_randomize_ordering_seed()
2297    if seed:
2298      self._randomize_ordering_seed = seed
2299      self._random = random.Random(self._randomize_ordering_seed)
2300    else:
2301      self._randomize_ordering_seed = None
2302      self._random = None
2303
2304  def getTestCaseNames(self, testCaseClass):  # pylint:disable=invalid-name
2305    """Validates and returns a (possibly randomized) list of test case names."""
2306    for name in dir(testCaseClass):
2307      if _is_suspicious_attribute(testCaseClass, name):
2308        raise TypeError(TestLoader._ERROR_MSG % name)
2309    names = super(TestLoader, self).getTestCaseNames(testCaseClass)
2310    if self._randomize_ordering_seed is not None:
2311      logging.info(
2312          'Randomizing test order with seed: %d', self._randomize_ordering_seed)
2313      logging.info(
2314          'To reproduce this order, re-run with '
2315          '--test_randomize_ordering_seed=%d', self._randomize_ordering_seed)
2316      self._random.shuffle(names)
2317    return names
2318
2319
2320def get_default_xml_output_filename():
2321  # type: () -> Optional[Text]
2322  if os.environ.get('XML_OUTPUT_FILE'):
2323    return os.environ['XML_OUTPUT_FILE']
2324  elif os.environ.get('RUNNING_UNDER_TEST_DAEMON'):
2325    return os.path.join(os.path.dirname(TEST_TMPDIR.value), 'test_detail.xml')
2326  elif os.environ.get('TEST_XMLOUTPUTDIR'):
2327    return os.path.join(
2328        os.environ['TEST_XMLOUTPUTDIR'],
2329        os.path.splitext(os.path.basename(sys.argv[0]))[0] + '.xml')
2330
2331
2332def _setup_filtering(argv):
2333  # type: (MutableSequence[Text]) -> None
2334  """Implements the bazel test filtering protocol.
2335
2336  The following environment variable is used in this method:
2337
2338    TESTBRIDGE_TEST_ONLY: string, if set, is forwarded to the unittest
2339      framework to use as a test filter. Its value is split with shlex, then:
2340      1. On Python 3.6 and before, split values are passed as positional
2341         arguments on argv.
2342      2. On Python 3.7+, split values are passed to unittest's `-k` flag. Tests
2343         are matched by glob patterns or substring. See
2344         https://docs.python.org/3/library/unittest.html#cmdoption-unittest-k
2345
2346  Args:
2347    argv: the argv to mutate in-place.
2348  """
2349  test_filter = os.environ.get('TESTBRIDGE_TEST_ONLY')
2350  if argv is None or not test_filter:
2351    return
2352
2353  filters = shlex.split(test_filter)
2354  if sys.version_info[:2] >= (3, 7):
2355    filters = ['-k=' + test_filter for test_filter in filters]
2356
2357  argv[1:1] = filters
2358
2359
2360def _setup_test_runner_fail_fast(argv):
2361  # type: (MutableSequence[Text]) -> None
2362  """Implements the bazel test fail fast protocol.
2363
2364  The following environment variable is used in this method:
2365
2366    TESTBRIDGE_TEST_RUNNER_FAIL_FAST=<1|0>
2367
2368  If set to 1, --failfast is passed to the unittest framework to return upon
2369  first failure.
2370
2371  Args:
2372    argv: the argv to mutate in-place.
2373  """
2374
2375  if argv is None:
2376    return
2377
2378  if os.environ.get('TESTBRIDGE_TEST_RUNNER_FAIL_FAST') != '1':
2379    return
2380
2381  argv[1:1] = ['--failfast']
2382
2383
2384def _setup_sharding(custom_loader=None):
2385  # type: (Optional[unittest.TestLoader]) -> unittest.TestLoader
2386  """Implements the bazel sharding protocol.
2387
2388  The following environment variables are used in this method:
2389
2390    TEST_SHARD_STATUS_FILE: string, if set, points to a file. We write a blank
2391      file to tell the test runner that this test implements the test sharding
2392      protocol.
2393
2394    TEST_TOTAL_SHARDS: int, if set, sharding is requested.
2395
2396    TEST_SHARD_INDEX: int, must be set if TEST_TOTAL_SHARDS is set. Specifies
2397      the shard index for this instance of the test process. Must satisfy:
2398      0 <= TEST_SHARD_INDEX < TEST_TOTAL_SHARDS.
2399
2400  Args:
2401    custom_loader: A TestLoader to be made sharded.
2402
2403  Returns:
2404    The test loader for shard-filtering or the standard test loader, depending
2405    on the sharding environment variables.
2406  """
2407
2408  # It may be useful to write the shard file even if the other sharding
2409  # environment variables are not set. Test runners may use this functionality
2410  # to query whether a test binary implements the test sharding protocol.
2411  if 'TEST_SHARD_STATUS_FILE' in os.environ:
2412    try:
2413      f = None
2414      try:
2415        f = open(os.environ['TEST_SHARD_STATUS_FILE'], 'w')
2416        f.write('')
2417      except IOError:
2418        sys.stderr.write('Error opening TEST_SHARD_STATUS_FILE (%s). Exiting.'
2419                         % os.environ['TEST_SHARD_STATUS_FILE'])
2420        sys.exit(1)
2421    finally:
2422      if f is not None: f.close()
2423
2424  base_loader = custom_loader or TestLoader()
2425  if 'TEST_TOTAL_SHARDS' not in os.environ:
2426    # Not using sharding, use the expected test loader.
2427    return base_loader
2428
2429  total_shards = int(os.environ['TEST_TOTAL_SHARDS'])
2430  shard_index = int(os.environ['TEST_SHARD_INDEX'])
2431
2432  if shard_index < 0 or shard_index >= total_shards:
2433    sys.stderr.write('ERROR: Bad sharding values. index=%d, total=%d\n' %
2434                     (shard_index, total_shards))
2435    sys.exit(1)
2436
2437  # Replace the original getTestCaseNames with one that returns
2438  # the test case names for this shard.
2439  delegate_get_names = base_loader.getTestCaseNames
2440
2441  bucket_iterator = itertools.cycle(xrange(total_shards))
2442
2443  def getShardedTestCaseNames(testCaseClass):
2444    filtered_names = []
2445    # We need to sort the list of tests in order to determine which tests this
2446    # shard is responsible for; however, it's important to preserve the order
2447    # returned by the base loader, e.g. in the case of randomized test ordering.
2448    ordered_names = delegate_get_names(testCaseClass)
2449    for testcase in sorted(ordered_names):
2450      bucket = next(bucket_iterator)
2451      if bucket == shard_index:
2452        filtered_names.append(testcase)
2453    return [x for x in ordered_names if x in filtered_names]
2454
2455  base_loader.getTestCaseNames = getShardedTestCaseNames
2456  return base_loader
2457
2458
2459# pylint: disable=line-too-long
2460def _run_and_get_tests_result(argv, args, kwargs, xml_test_runner_class):
2461  # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any], Type) -> unittest.TestResult
2462  # pylint: enable=line-too-long
2463  """Same as run_tests, except it returns the result instead of exiting."""
2464
2465  # The entry from kwargs overrides argv.
2466  argv = kwargs.pop('argv', argv)
2467
2468  # Set up test filtering if requested in environment.
2469  _setup_filtering(argv)
2470  # Set up --failfast as requested in environment
2471  _setup_test_runner_fail_fast(argv)
2472
2473  # Shard the (default or custom) loader if sharding is turned on.
2474  kwargs['testLoader'] = _setup_sharding(kwargs.get('testLoader', None))
2475
2476  # XML file name is based upon (sorted by priority):
2477  # --xml_output_file flag, XML_OUTPUT_FILE variable,
2478  # TEST_XMLOUTPUTDIR variable or RUNNING_UNDER_TEST_DAEMON variable.
2479  if not FLAGS.xml_output_file:
2480    FLAGS.xml_output_file = get_default_xml_output_filename()
2481  xml_output_file = FLAGS.xml_output_file
2482
2483  xml_buffer = None
2484  if xml_output_file:
2485    xml_output_dir = os.path.dirname(xml_output_file)
2486    if xml_output_dir and not os.path.isdir(xml_output_dir):
2487      try:
2488        os.makedirs(xml_output_dir)
2489      except OSError as e:
2490        # File exists error can occur with concurrent tests
2491        if e.errno != errno.EEXIST:
2492          raise
2493    # Fail early if we can't write to the XML output file. This is so that we
2494    # don't waste people's time running tests that will just fail anyways.
2495    with _open(xml_output_file, 'w'):
2496      pass
2497
2498    # We can reuse testRunner if it supports XML output (e. g. by inheriting
2499    # from xml_reporter.TextAndXMLTestRunner). Otherwise we need to use
2500    # xml_reporter.TextAndXMLTestRunner.
2501    if (kwargs.get('testRunner') is not None
2502        and not hasattr(kwargs['testRunner'], 'set_default_xml_stream')):
2503      sys.stderr.write('WARNING: XML_OUTPUT_FILE or --xml_output_file setting '
2504                       'overrides testRunner=%r setting (possibly from --pdb)'
2505                       % (kwargs['testRunner']))
2506      # Passing a class object here allows TestProgram to initialize
2507      # instances based on its kwargs and/or parsed command-line args.
2508      kwargs['testRunner'] = xml_test_runner_class
2509    if kwargs.get('testRunner') is None:
2510      kwargs['testRunner'] = xml_test_runner_class
2511    # Use an in-memory buffer (not backed by the actual file) to store the XML
2512    # report, because some tools modify the file (e.g., create a placeholder
2513    # with partial information, in case the test process crashes).
2514    xml_buffer = six.StringIO()
2515    kwargs['testRunner'].set_default_xml_stream(xml_buffer)  # pytype: disable=attribute-error
2516
2517    # If we've used a seed to randomize test case ordering, we want to record it
2518    # as a top-level attribute in the `testsuites` section of the XML output.
2519    randomize_ordering_seed = getattr(
2520        kwargs['testLoader'], '_randomize_ordering_seed', None)
2521    setter = getattr(kwargs['testRunner'], 'set_testsuites_property', None)
2522    if randomize_ordering_seed and setter:
2523      setter('test_randomize_ordering_seed', randomize_ordering_seed)
2524  elif kwargs.get('testRunner') is None:
2525    kwargs['testRunner'] = _pretty_print_reporter.TextTestRunner
2526
2527  if FLAGS.pdb_post_mortem:
2528    runner = kwargs['testRunner']
2529    # testRunner can be a class or an instance, which must be tested for
2530    # differently.
2531    # Overriding testRunner isn't uncommon, so only enable the debugging
2532    # integration if the runner claims it does; we don't want to accidentally
2533    # clobber something on the runner.
2534    if ((isinstance(runner, type) and
2535         issubclass(runner, _pretty_print_reporter.TextTestRunner)) or
2536        isinstance(runner, _pretty_print_reporter.TextTestRunner)):
2537      runner.run_for_debugging = True
2538
2539  # Make sure tmpdir exists.
2540  if not os.path.isdir(TEST_TMPDIR.value):
2541    try:
2542      os.makedirs(TEST_TMPDIR.value)
2543    except OSError as e:
2544      # Concurrent test might have created the directory.
2545      if e.errno != errno.EEXIST:
2546        raise
2547
2548  # Let unittest.TestProgram.__init__ do its own argv parsing, e.g. for '-v',
2549  # on argv, which is sys.argv without the command-line flags.
2550  kwargs['argv'] = argv
2551
2552  try:
2553    test_program = unittest.TestProgram(*args, **kwargs)
2554    return test_program.result
2555  finally:
2556    if xml_buffer:
2557      try:
2558        with _open(xml_output_file, 'w') as f:
2559          f.write(xml_buffer.getvalue())
2560      finally:
2561        xml_buffer.close()
2562
2563
2564def run_tests(argv, args, kwargs):  # pylint: disable=line-too-long
2565  # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any]) -> None
2566  # pylint: enable=line-too-long
2567  """Executes a set of Python unit tests.
2568
2569  Most users should call absltest.main() instead of run_tests.
2570
2571  Please note that run_tests should be called from app.run.
2572  Calling absltest.main() would ensure that.
2573
2574  Please note that run_tests is allowed to make changes to kwargs.
2575
2576  Args:
2577    argv: sys.argv with the command-line flags removed from the front, i.e. the
2578      argv with which app.run() has called __main__.main. It is passed to
2579      unittest.TestProgram.__init__(argv=), which does its own flag parsing. It
2580      is ignored if kwargs contains an argv entry.
2581    args: Positional arguments passed through to unittest.TestProgram.__init__.
2582    kwargs: Keyword arguments passed through to unittest.TestProgram.__init__.
2583  """
2584  result = _run_and_get_tests_result(
2585      argv, args, kwargs, xml_reporter.TextAndXMLTestRunner)
2586  sys.exit(not result.wasSuccessful())
2587
2588
2589def _get_qualname(cls):
2590  # type: (Type) -> Text
2591  if six.PY3:
2592    name = cls.__qualname__
2593  else:
2594    name = '{}.{}'.format(cls.__module__, cls.__name__)
2595  return name.replace('__main__.', '')
2596
2597
2598def _rmtree_ignore_errors(path):
2599  # type: (Text) -> None
2600  if os.path.isfile(path):
2601    try:
2602      os.unlink(path)
2603    except OSError:
2604      pass
2605  else:
2606    shutil.rmtree(path, ignore_errors=True)
2607
2608
2609def _makedirs_exist_ok(dir_name):
2610  # type: (Text) -> None
2611  if six.PY3:
2612    os.makedirs(dir_name, exist_ok=True)  # pylint: disable=unexpected-keyword-arg
2613  else:
2614    # Python 2 doesn't have the exist_ok arg, so we roll it ourselves
2615    try:
2616      os.makedirs(dir_name)
2617    except OSError as e:
2618      if e.errno != errno.EEXIST:
2619        raise
2620
2621
2622def _get_first_part(path):
2623  # type: (Text) -> Text
2624  parts = path.split(os.sep, 1)
2625  return parts[0]
2626