1"""Common logic for the coverage subcommand.""" 2from __future__ import (absolute_import, division, print_function) 3__metaclass__ = type 4 5import os 6import re 7 8from .. import types as t 9 10from ..encoding import ( 11 to_bytes, 12) 13 14from ..io import ( 15 open_binary_file, 16 read_json_file, 17) 18 19from ..util import ( 20 ApplicationError, 21 common_environment, 22 display, 23 ANSIBLE_TEST_DATA_ROOT, 24) 25 26from ..util_common import ( 27 intercept_command, 28 ResultType, 29) 30 31from ..config import ( 32 EnvironmentConfig, 33) 34 35from ..executor import ( 36 Delegate, 37 install_command_requirements, 38) 39 40from .. target import ( 41 walk_module_targets, 42) 43 44from ..data import ( 45 data_context, 46) 47 48if t.TYPE_CHECKING: 49 import coverage as coverage_module 50 51COVERAGE_GROUPS = ('command', 'target', 'environment', 'version') 52COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc') 53COVERAGE_OUTPUT_FILE_NAME = 'coverage' 54 55 56class CoverageConfig(EnvironmentConfig): 57 """Configuration for the coverage command.""" 58 def __init__(self, args): # type: (t.Any) -> None 59 super(CoverageConfig, self).__init__(args, 'coverage') 60 61 self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else set() # type: t.FrozenSet[str] 62 self.all = args.all if 'all' in args else False # type: bool 63 self.stub = args.stub if 'stub' in args else False # type: bool 64 self.export = args.export if 'export' in args else None # type: str 65 self.coverage = False # temporary work-around to support intercept_command in cover.py 66 67 68def initialize_coverage(args): # type: (CoverageConfig) -> coverage_module 69 """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available.""" 70 if args.delegate: 71 raise Delegate() 72 73 if args.requirements: 74 install_command_requirements(args) 75 76 try: 77 import coverage 78 except ImportError: 79 coverage = None 80 81 if not coverage: 82 raise ApplicationError('You must install the "coverage" python module to use this command.') 83 84 coverage_version_string = coverage.__version__ 85 coverage_version = tuple(int(v) for v in coverage_version_string.split('.')) 86 87 min_version = (4, 2) 88 max_version = (5, 0) 89 90 supported_version = True 91 recommended_version = '4.5.4' 92 93 if coverage_version < min_version or coverage_version >= max_version: 94 supported_version = False 95 96 if not supported_version: 97 raise ApplicationError('Version %s of "coverage" is not supported. Version %s is known to work and is recommended.' % ( 98 coverage_version_string, recommended_version)) 99 100 return coverage 101 102 103def run_coverage(args, output_file, command, cmd): # type: (CoverageConfig, str, str, t.List[str]) -> None 104 """Run the coverage cli tool with the specified options.""" 105 env = common_environment() 106 env.update(dict(COVERAGE_FILE=output_file)) 107 108 cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd 109 110 intercept_command(args, target_name='coverage', env=env, cmd=cmd, disable_coverage=True) 111 112 113def get_python_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str] 114 """Return the list of Python coverage file paths.""" 115 return get_coverage_files('python', path) 116 117 118def get_powershell_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str] 119 """Return the list of PowerShell coverage file paths.""" 120 return get_coverage_files('powershell', path) 121 122 123def get_coverage_files(language, path=None): # type: (str, t.Optional[str]) -> t.List[str] 124 """Return the list of coverage file paths for the given language.""" 125 coverage_dir = path or ResultType.COVERAGE.path 126 coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir) 127 if '=coverage.' in f and '=%s' % language in f] 128 129 return coverage_files 130 131 132def get_collection_path_regexes(): # type: () -> t.Tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]] 133 """Return a pair of regexes used for identifying and manipulating collection paths.""" 134 if data_context().content.collection: 135 collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory) 136 collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory) 137 else: 138 collection_search_re = None 139 collection_sub_re = None 140 141 return collection_search_re, collection_sub_re 142 143 144def get_python_modules(): # type: () -> t.Dict[str, str] 145 """Return a dictionary of Ansible module names and their paths.""" 146 return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py')) 147 148 149def enumerate_python_arcs( 150 path, # type: str 151 coverage, # type: coverage_module 152 modules, # type: t.Dict[str, str] 153 collection_search_re, # type: t.Optional[t.Pattern] 154 collection_sub_re, # type: t.Optional[t.Pattern] 155): # type: (...) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]]] 156 """Enumerate Python code coverage arcs in the given file.""" 157 if os.path.getsize(path) == 0: 158 display.warning('Empty coverage file: %s' % path, verbosity=2) 159 return 160 161 original = coverage.CoverageData() 162 163 try: 164 original.read_file(path) 165 except Exception as ex: # pylint: disable=locally-disabled, broad-except 166 with open_binary_file(path) as file_obj: 167 header = file_obj.read(6) 168 169 if header == b'SQLite': 170 display.error('File created by "coverage" 5.0+: %s' % os.path.relpath(path)) 171 else: 172 display.error(u'%s' % ex) 173 174 return 175 176 for filename in original.measured_files(): 177 arcs = original.arcs(filename) 178 179 if not arcs: 180 # This is most likely due to using an unsupported version of coverage. 181 display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path)) 182 continue 183 184 filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) 185 186 if not filename: 187 continue 188 189 yield filename, set(arcs) 190 191 192def enumerate_powershell_lines( 193 path, # type: str 194 collection_search_re, # type: t.Optional[t.Pattern] 195 collection_sub_re, # type: t.Optional[t.Pattern] 196): # type: (...) -> t.Generator[t.Tuple[str, t.Dict[int, int]]] 197 """Enumerate PowerShell code coverage lines in the given file.""" 198 if os.path.getsize(path) == 0: 199 display.warning('Empty coverage file: %s' % path, verbosity=2) 200 return 201 202 try: 203 coverage_run = read_json_file(path) 204 except Exception as ex: # pylint: disable=locally-disabled, broad-except 205 display.error(u'%s' % ex) 206 return 207 208 for filename, hits in coverage_run.items(): 209 filename = sanitize_filename(filename, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) 210 211 if not filename: 212 continue 213 214 if isinstance(hits, dict) and not hits.get('Line'): 215 # Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage. 216 # This format differs from the more verbose format of raw coverage data from the remote Windows hosts. 217 hits = dict((int(key), value) for key, value in hits.items()) 218 219 yield filename, hits 220 continue 221 222 # PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that 223 if not isinstance(hits, list): 224 hits = [hits] 225 226 hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit) 227 228 yield filename, hits 229 230 231def sanitize_filename( 232 filename, # type: str 233 modules=None, # type: t.Optional[t.Dict[str, str]] 234 collection_search_re=None, # type: t.Optional[t.Pattern] 235 collection_sub_re=None, # type: t.Optional[t.Pattern] 236): # type: (...) -> t.Optional[str] 237 """Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid.""" 238 ansible_path = os.path.abspath('lib/ansible/') + '/' 239 root_path = data_context().content.root + '/' 240 integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep 241 242 if modules is None: 243 modules = {} 244 245 if '/ansible_modlib.zip/ansible/' in filename: 246 # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier. 247 new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename) 248 display.info('%s -> %s' % (filename, new_name), verbosity=3) 249 filename = new_name 250 elif collection_search_re and collection_search_re.search(filename): 251 new_name = os.path.abspath(collection_sub_re.sub('', filename)) 252 display.info('%s -> %s' % (filename, new_name), verbosity=3) 253 filename = new_name 254 elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename): 255 # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later. 256 new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename) 257 display.info('%s -> %s' % (filename, new_name), verbosity=3) 258 filename = new_name 259 elif '/ansible_module_' in filename: 260 # Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier. 261 module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename) 262 if module_name not in modules: 263 display.warning('Skipping coverage of unknown module: %s' % module_name) 264 return None 265 new_name = os.path.abspath(modules[module_name]) 266 display.info('%s -> %s' % (filename, new_name), verbosity=3) 267 filename = new_name 268 elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename): 269 # Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later. 270 # AnsiballZ versions using zipimporter will match the `.zip` portion of the regex. 271 # AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex. 272 module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$', 273 '\\g<module>', filename).rstrip('_') 274 if module_name not in modules: 275 display.warning('Skipping coverage of unknown module: %s' % module_name) 276 return None 277 new_name = os.path.abspath(modules[module_name]) 278 display.info('%s -> %s' % (filename, new_name), verbosity=3) 279 filename = new_name 280 elif re.search('^(/.*?)?/root/ansible/', filename): 281 # Rewrite the path of code running on a remote host or in a docker container as root. 282 new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename) 283 display.info('%s -> %s' % (filename, new_name), verbosity=3) 284 filename = new_name 285 elif integration_temp_path in filename: 286 # Rewrite the path of code running from an integration test temporary directory. 287 new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename) 288 display.info('%s -> %s' % (filename, new_name), verbosity=3) 289 filename = new_name 290 291 filename = os.path.abspath(filename) # make sure path is absolute (will be relative if previously exported) 292 293 return filename 294 295 296class PathChecker: 297 """Checks code coverage paths to verify they are valid and reports on the findings.""" 298 def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None 299 self.args = args 300 self.collection_search_re = collection_search_re 301 self.invalid_paths = [] 302 self.invalid_path_chars = 0 303 304 def check_path(self, path): # type: (str) -> bool 305 """Return True if the given coverage path is valid, otherwise display a warning and return False.""" 306 if os.path.isfile(to_bytes(path)): 307 return True 308 309 if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py': 310 # the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk 311 # coverage is still reported for these non-existent files, but warnings are not needed 312 return False 313 314 self.invalid_paths.append(path) 315 self.invalid_path_chars += len(path) 316 317 if self.args.verbosity > 1: 318 display.warning('Invalid coverage path: %s' % path) 319 320 return False 321 322 def report(self): # type: () -> None 323 """Display a warning regarding invalid paths if any were found.""" 324 if self.invalid_paths: 325 display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths))) 326