1# 2# Copyright (c) 2010 Testrepository Contributors 3# 4# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause 5# license at the users choice. A copy of both licenses are available in the 6# project source as Apache-2.0 and BSD. You may not use this file except in 7# compliance with one of these two licences. 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# license you chose for the specific language governing permissions and 13# limitations under that license. 14 15"""The test command that test repository knows how to run.""" 16 17from extras import ( 18 try_import, 19 try_imports, 20 ) 21 22from collections import defaultdict 23ConfigParser = try_imports(['ConfigParser', 'configparser']) 24import io 25import itertools 26import operator 27import os.path 28import re 29import subprocess 30import sys 31import tempfile 32import multiprocessing 33from textwrap import dedent 34 35from fixtures import Fixture 36v2 = try_import('subunit.v2') 37 38from testrepository import results 39from testrepository.testlist import ( 40 parse_enumeration, 41 write_list, 42 ) 43 44testrconf_help = dedent(""" 45 Configuring via .testr.conf: 46 --- 47 [DEFAULT] 48 test_command=foo $IDOPTION 49 test_id_option=--bar $IDFILE 50 --- 51 will cause 'testr run' to run 'foo' to execute tests, and 52 'testr run --failing' will cause 'foo --bar failing.list ' to be run to 53 execute tests. Shell variables are expanded in these commands on platforms 54 that have a shell. 55 56 The full list of options and variables for .testr.conf: 57 * filter_tags -- a list of tags which should be used to filter test counts. 58 This is useful for stripping out non-test results from the subunit stream 59 such as Zope test layers. These filtered items are still considered for 60 test failures. 61 * test_command -- command line to run to execute tests. 62 * test_id_option -- the value to substitute into test_command when specific 63 test ids should be run. 64 * test_id_list_default -- the value to use for $IDLIST when no specific 65 test ids are being run. 66 * test_list_option -- the option to use to cause the test runner to report 67 on the tests it would run, rather than running them. When supplied the 68 test_command should output on stdout all the test ids that would have 69 been run if every other option and argument was honoured, one per line. 70 This is required for parallel testing, and is substituted into $LISTOPT. 71 * test_run_concurrency -- Optional call out to establish concurrency. 72 Should return one line containing the number of concurrent test runner 73 processes to run. 74 * instance_provision -- provision one or more test run environments. 75 Accepts $INSTANCE_COUNT for the number of instances desired. 76 * instance_execute -- execute a test runner process in a given environment. 77 Accepts $INSTANCE_ID, $FILES and $COMMAND. Paths in $FILES should be 78 synchronised into the test runner environment filesystem. $COMMAND can 79 be adjusted if the paths are synched with different names. 80 * instance_dispose -- dispose of one or more test running environments. 81 Accepts $INSTANCE_IDS. 82 * group_regex -- If set group tests by the matched section of the test id. 83 * $IDOPTION -- the variable to use to trigger running some specific tests. 84 * $IDFILE -- A file created before the test command is run and deleted 85 afterwards which contains a list of test ids, one per line. This can 86 handle test ids with emedded whitespace. 87 * $IDLIST -- A list of the test ids to run, separated by spaces. IDLIST 88 defaults to an empty string when no test ids are known and no explicit 89 default is provided. This will not handle test ids with spaces. 90 91 See the testrepository manual for example .testr.conf files in different 92 programming languages. 93 94 """) 95 96 97class CallWhenProcFinishes(object): 98 """Convert a process object to trigger a callback when returncode is set. 99 100 This just wraps the entire object and when the returncode attribute access 101 finds a set value, calls the callback. 102 """ 103 104 def __init__(self, process, callback): 105 """Adapt process 106 107 :param process: A subprocess.Popen object. 108 :param callback: The process to call when the process completes. 109 """ 110 self._proc = process 111 self._callback = callback 112 self._done = False 113 114 @property 115 def stdin(self): 116 return self._proc.stdin 117 118 @property 119 def stdout(self): 120 return self._proc.stdout 121 122 @property 123 def stderr(self): 124 return self._proc.stderr 125 126 @property 127 def returncode(self): 128 result = self._proc.returncode 129 if not self._done and result is not None: 130 self._done = True 131 self._callback() 132 return result 133 134 def wait(self): 135 return self._proc.wait() 136 137 138compiled_re_type = type(re.compile('')) 139 140class TestListingFixture(Fixture): 141 """Write a temporary file to disk with test ids in it.""" 142 143 def __init__(self, test_ids, cmd_template, listopt, idoption, ui, 144 repository, parallel=True, listpath=None, parser=None, 145 test_filters=None, instance_source=None, group_callback=None): 146 """Create a TestListingFixture. 147 148 :param test_ids: The test_ids to use. May be None indicating that 149 no ids are known and they should be discovered by listing or 150 configuration if they must be known to run tests. Test ids are 151 needed to run tests when filtering or partitioning is needed: if 152 the run concurrency is > 1 partitioning is needed, and filtering is 153 needed if the user has passed in filters. 154 :param cmd_template: string to be filled out with 155 IDFILE. 156 :param listopt: Option to substitute into LISTOPT to cause test listing 157 to take place. 158 :param idoption: Option to substitutde into cmd when supplying any test 159 ids. 160 :param ui: The UI in use. 161 :param repository: The repository to query for test times, if needed. 162 :param parallel: If not True, prohibit parallel use : used to implement 163 --parallel run recursively. 164 :param listpath: The file listing path to use. If None, a unique path 165 is created. 166 :param parser: An options parser for reading options from. 167 :param test_filters: An optional list of test filters to apply. Each 168 filter should be a string suitable for passing to re.compile. 169 filters are applied using search() rather than match(), so if 170 anchoring is needed it should be included in the regex. 171 The test ids used for executing are the union of all the individual 172 filters: to take the intersection instead, craft a single regex that 173 matches all your criteria. Filters are automatically applied by 174 run_tests(), or can be applied by calling filter_tests(test_ids). 175 :param instance_source: A source of test run instances. Must support 176 obtain_instance(max_concurrency) -> id and release_instance(id) 177 calls. 178 :param group_callback: If supplied, should be a function that accepts a 179 test id and returns a group id. A group id is an arbitrary value 180 used as a dictionary key in the scheduler. All test ids with the 181 same group id are scheduled onto the same backend test process. 182 """ 183 self.test_ids = test_ids 184 self.template = cmd_template 185 self.listopt = listopt 186 self.idoption = idoption 187 self.ui = ui 188 self.repository = repository 189 self.parallel = parallel 190 self._listpath = listpath 191 self._parser = parser 192 self.test_filters = test_filters 193 self._group_callback = group_callback 194 self._instance_source = instance_source 195 196 def setUp(self): 197 super(TestListingFixture, self).setUp() 198 variable_regex = '\$(IDOPTION|IDFILE|IDLIST|LISTOPT)' 199 variables = {} 200 list_variables = {'LISTOPT': self.listopt} 201 cmd = self.template 202 try: 203 default_idstr = self._parser.get('DEFAULT', 'test_id_list_default') 204 list_variables['IDLIST'] = default_idstr 205 # In theory we should also support casting this into IDFILE etc - 206 # needs this horrible class refactored. 207 except ConfigParser.NoOptionError as e: 208 if e.message != "No option 'test_id_list_default' in section: 'DEFAULT'": 209 raise 210 default_idstr = None 211 def list_subst(match): 212 return list_variables.get(match.groups(1)[0], '') 213 self.list_cmd = re.sub(variable_regex, list_subst, cmd) 214 nonparallel = (not self.parallel or not 215 getattr(self.ui, 'options', None) or not 216 getattr(self.ui.options, 'parallel', None)) 217 if nonparallel: 218 self.concurrency = 1 219 else: 220 self.concurrency = self.ui.options.concurrency 221 if not self.concurrency: 222 self.concurrency = self.callout_concurrency() 223 if not self.concurrency: 224 self.concurrency = self.local_concurrency() 225 if not self.concurrency: 226 self.concurrency = 1 227 if self.test_ids is None: 228 if self.concurrency == 1: 229 if default_idstr: 230 self.test_ids = default_idstr.split() 231 if self.concurrency != 1 or self.test_filters is not None: 232 # Have to be able to tell each worker what to run / filter 233 # tests. 234 self.test_ids = self.list_tests() 235 if self.test_ids is None: 236 # No test ids to supply to the program. 237 self.list_file_name = None 238 name = '' 239 idlist = '' 240 else: 241 self.test_ids = self.filter_tests(self.test_ids) 242 name = self.make_listfile() 243 variables['IDFILE'] = name 244 idlist = ' '.join(self.test_ids) 245 variables['IDLIST'] = idlist 246 def subst(match): 247 return variables.get(match.groups(1)[0], '') 248 if self.test_ids is None: 249 # No test ids, no id option. 250 idoption = '' 251 else: 252 idoption = re.sub(variable_regex, subst, self.idoption) 253 variables['IDOPTION'] = idoption 254 self.cmd = re.sub(variable_regex, subst, cmd) 255 256 def make_listfile(self): 257 name = None 258 try: 259 if self._listpath: 260 name = self._listpath 261 stream = open(name, 'wb') 262 else: 263 fd, name = tempfile.mkstemp() 264 stream = os.fdopen(fd, 'wb') 265 self.list_file_name = name 266 write_list(stream, self.test_ids) 267 stream.close() 268 except: 269 if name: 270 os.unlink(name) 271 raise 272 self.addCleanup(os.unlink, name) 273 return name 274 275 def filter_tests(self, test_ids): 276 """Filter test_ids by the test_filters. 277 278 :return: A list of test ids. 279 """ 280 if self.test_filters is None: 281 return test_ids 282 filters = list(map(re.compile, self.test_filters)) 283 def include(test_id): 284 for pred in filters: 285 if pred.search(test_id): 286 return True 287 return list(filter(include, test_ids)) 288 289 def list_tests(self): 290 """List the tests returned by list_cmd. 291 292 :return: A list of test ids. 293 """ 294 if '$LISTOPT' not in self.template: 295 raise ValueError("LISTOPT not configured in .testr.conf") 296 instance, list_cmd = self._per_instance_command(self.list_cmd) 297 try: 298 self.ui.output_values([('running', list_cmd)]) 299 run_proc = self.ui.subprocess_Popen(list_cmd, shell=True, 300 stdout=subprocess.PIPE, stdin=subprocess.PIPE) 301 out, err = run_proc.communicate() 302 if run_proc.returncode != 0: 303 if v2 is not None: 304 new_out = io.BytesIO() 305 v2.ByteStreamToStreamResult(io.BytesIO(out), 'stdout').run( 306 results.CatFiles(new_out)) 307 out = new_out.getvalue() 308 self.ui.output_stream(io.BytesIO(out)) 309 self.ui.output_stream(io.BytesIO(err)) 310 raise ValueError( 311 "Non-zero exit code (%d) from test listing." 312 % (run_proc.returncode)) 313 ids = parse_enumeration(out) 314 return ids 315 finally: 316 if instance: 317 self._instance_source.release_instance(instance) 318 319 def _per_instance_command(self, cmd): 320 """Customise cmd to with an instance-id. 321 322 :param concurrency: The number of instances to ask for (used to avoid 323 death-by-1000 cuts of latency. 324 """ 325 if self._instance_source is None: 326 return None, cmd 327 instance = self._instance_source.obtain_instance(self.concurrency) 328 if instance is not None: 329 try: 330 instance_prefix = self._parser.get( 331 'DEFAULT', 'instance_execute') 332 variables = { 333 'INSTANCE_ID': instance.decode('utf8'), 334 'COMMAND': cmd, 335 # --list-tests cannot use FILES, so handle it being unset. 336 'FILES': getattr(self, 'list_file_name', None) or '', 337 } 338 variable_regex = '\$(INSTANCE_ID|COMMAND|FILES)' 339 def subst(match): 340 return variables.get(match.groups(1)[0], '') 341 cmd = re.sub(variable_regex, subst, instance_prefix) 342 except ConfigParser.NoOptionError: 343 # Per-instance execution environment not configured. 344 pass 345 return instance, cmd 346 347 def run_tests(self): 348 """Run the tests defined by the command and ui. 349 350 :return: A list of spawned processes. 351 """ 352 result = [] 353 test_ids = self.test_ids 354 if self.concurrency == 1 and (test_ids is None or test_ids): 355 # Have to customise cmd here, as instances are allocated 356 # just-in-time. XXX: Indicates this whole region needs refactoring. 357 instance, cmd = self._per_instance_command(self.cmd) 358 self.ui.output_values([('running', cmd)]) 359 run_proc = self.ui.subprocess_Popen(cmd, shell=True, 360 stdout=subprocess.PIPE, stdin=subprocess.PIPE) 361 # Prevent processes stalling if they read from stdin; we could 362 # pass this through in future, but there is no point doing that 363 # until we have a working can-run-debugger-inline story. 364 run_proc.stdin.close() 365 if instance: 366 return [CallWhenProcFinishes(run_proc, 367 lambda:self._instance_source.release_instance(instance))] 368 else: 369 return [run_proc] 370 test_id_groups = self.partition_tests(test_ids, self.concurrency) 371 for test_ids in test_id_groups: 372 if not test_ids: 373 # No tests in this partition 374 continue 375 fixture = self.useFixture(TestListingFixture(test_ids, 376 self.template, self.listopt, self.idoption, self.ui, 377 self.repository, parallel=False, parser=self._parser, 378 instance_source=self._instance_source)) 379 result.extend(fixture.run_tests()) 380 return result 381 382 def partition_tests(self, test_ids, concurrency): 383 """Parition test_ids by concurrency. 384 385 Test durations from the repository are used to get partitions which 386 have roughly the same expected runtime. New tests - those with no 387 recorded duration - are allocated in round-robin fashion to the 388 partitions created using test durations. 389 390 :return: A list where each element is a distinct subset of test_ids, 391 and the union of all the elements is equal to set(test_ids). 392 """ 393 partitions = [list() for i in range(concurrency)] 394 timed_partitions = [[0.0, partition] for partition in partitions] 395 time_data = self.repository.get_test_times(test_ids) 396 timed_tests = time_data['known'] 397 unknown_tests = time_data['unknown'] 398 # Group tests: generate group_id -> test_ids. 399 group_ids = defaultdict(list) 400 if self._group_callback is None: 401 group_callback = lambda _:None 402 else: 403 group_callback = self._group_callback 404 for test_id in test_ids: 405 group_id = group_callback(test_id) or test_id 406 group_ids[group_id].append(test_id) 407 # Time groups: generate three sets of groups: 408 # - fully timed dict(group_id -> time), 409 # - partially timed dict(group_id -> time) and 410 # - unknown (set of group_id) 411 # We may in future treat partially timed different for scheduling, but 412 # at least today we just schedule them after the fully timed groups. 413 timed = {} 414 partial = {} 415 unknown = [] 416 for group_id, group_tests in group_ids.items(): 417 untimed_ids = unknown_tests.intersection(group_tests) 418 group_time = sum([timed_tests[test_id] 419 for test_id in untimed_ids.symmetric_difference(group_tests)]) 420 if not untimed_ids: 421 timed[group_id] = group_time 422 elif group_time: 423 partial[group_id] = group_time 424 else: 425 unknown.append(group_id) 426 # Scheduling is NP complete in general, so we avoid aiming for 427 # perfection. A quick approximation that is sufficient for our general 428 # needs: 429 # sort the groups by time 430 # allocate to partitions by putting each group in to the partition with 431 # the current (lowest time, shortest length[in tests]) 432 def consume_queue(groups): 433 queue = sorted( 434 groups.items(), key=operator.itemgetter(1), reverse=True) 435 for group_id, duration in queue: 436 timed_partitions[0][0] = timed_partitions[0][0] + duration 437 timed_partitions[0][1].extend(group_ids[group_id]) 438 timed_partitions.sort(key=lambda item:(item[0], len(item[1]))) 439 consume_queue(timed) 440 consume_queue(partial) 441 # Assign groups with entirely unknown times in round robin fashion to 442 # the partitions. 443 for partition, group_id in zip(itertools.cycle(partitions), unknown): 444 partition.extend(group_ids[group_id]) 445 return partitions 446 447 def callout_concurrency(self): 448 """Callout for user defined concurrency.""" 449 try: 450 concurrency_cmd = self._parser.get( 451 'DEFAULT', 'test_run_concurrency') 452 except ConfigParser.NoOptionError: 453 return None 454 run_proc = self.ui.subprocess_Popen(concurrency_cmd, shell=True, 455 stdout=subprocess.PIPE, stdin=subprocess.PIPE) 456 out, err = run_proc.communicate() 457 if run_proc.returncode: 458 raise ValueError( 459 "test_run_concurrency failed: exit code %d, stderr='%s'" % ( 460 run_proc.returncode, err.decode('utf8', 'replace'))) 461 return int(out.strip()) 462 463 def local_concurrency(self): 464 try: 465 return multiprocessing.cpu_count() 466 except NotImplementedError: 467 # No concurrency logic known. 468 return None 469 470 471class TestCommand(Fixture): 472 """Represents the test command defined in .testr.conf. 473 474 :ivar run_factory: The fixture to use to execute a command. 475 :ivar oldschool: Use failing.list rather than a unique file path. 476 477 TestCommand is a Fixture. Many uses of it will not require it to be setUp, 478 but calling get_run_command does require it: the fixture state is used to 479 track test environment instances, which are disposed of when cleanUp 480 happens. This is not done per-run-command, because test bisection (amongst 481 other things) uses multiple get_run_command configurations. 482 """ 483 484 run_factory = TestListingFixture 485 oldschool = False 486 487 def __init__(self, ui, repository): 488 """Create a TestCommand. 489 490 :param ui: A testrepository.ui.UI object which is used to obtain the 491 location of the .testr.conf. 492 :param repository: A testrepository.repository.Repository used for 493 determining test times when partitioning tests. 494 """ 495 super(TestCommand, self).__init__() 496 self.ui = ui 497 self.repository = repository 498 self._instances = None 499 self._allocated_instances = None 500 501 def setUp(self): 502 super(TestCommand, self).setUp() 503 self._instances = set() 504 self._allocated_instances = set() 505 self.addCleanup(self._dispose_instances) 506 507 def _dispose_instances(self): 508 instances = self._instances 509 if instances is None: 510 return 511 self._instances = None 512 self._allocated_instances = None 513 try: 514 dispose_cmd = self.get_parser().get('DEFAULT', 'instance_dispose') 515 except (ValueError, ConfigParser.NoOptionError): 516 return 517 variable_regex = '\$INSTANCE_IDS' 518 dispose_cmd = re.sub(variable_regex, ' '.join(sorted(instance.decode('utf') for instance in instances)), 519 dispose_cmd) 520 self.ui.output_values([('running', dispose_cmd)]) 521 run_proc = self.ui.subprocess_Popen(dispose_cmd, shell=True) 522 run_proc.communicate() 523 if run_proc.returncode: 524 raise ValueError('Disposing of instances failed, return %d' % 525 run_proc.returncode) 526 527 def get_parser(self): 528 """Get a parser with the .testr.conf in it.""" 529 parser = ConfigParser.ConfigParser() 530 # This possibly should push down into UI. 531 if self.ui.here == 'memory:': 532 return parser 533 if not parser.read(os.path.join(self.ui.here, '.testr.conf')): 534 raise ValueError("No .testr.conf config file") 535 return parser 536 537 def get_run_command(self, test_ids=None, testargs=(), test_filters=None): 538 """Get the command that would be run to run tests. 539 540 See TestListingFixture for the definition of test_ids and test_filters. 541 """ 542 if self._instances is None: 543 raise TypeError('TestCommand not setUp') 544 parser = self.get_parser() 545 try: 546 command = parser.get('DEFAULT', 'test_command') 547 except ConfigParser.NoOptionError as e: 548 if e.message != "No option 'test_command' in section: 'DEFAULT'": 549 raise 550 raise ValueError("No test_command option present in .testr.conf") 551 elements = [command] + list(testargs) 552 cmd = ' '.join(elements) 553 idoption = '' 554 if '$IDOPTION' in command: 555 # IDOPTION is used, we must have it configured. 556 try: 557 idoption = parser.get('DEFAULT', 'test_id_option') 558 except ConfigParser.NoOptionError as e: 559 if e.message != "No option 'test_id_option' in section: 'DEFAULT'": 560 raise 561 raise ValueError("No test_id_option option present in .testr.conf") 562 listopt = '' 563 if '$LISTOPT' in command: 564 # LISTOPT is used, test_list_option must be configured. 565 try: 566 listopt = parser.get('DEFAULT', 'test_list_option') 567 except ConfigParser.NoOptionError as e: 568 if e.message != "No option 'test_list_option' in section: 'DEFAULT'": 569 raise 570 raise ValueError("No test_list_option option present in .testr.conf") 571 try: 572 group_regex = parser.get('DEFAULT', 'group_regex') 573 except ConfigParser.NoOptionError: 574 group_regex = None 575 if group_regex: 576 def group_callback(test_id, regex=re.compile(group_regex)): 577 match = regex.match(test_id) 578 if match: 579 return match.group(0) 580 else: 581 group_callback = None 582 if self.oldschool: 583 listpath = os.path.join(self.ui.here, 'failing.list') 584 result = self.run_factory(test_ids, cmd, listopt, idoption, 585 self.ui, self.repository, listpath=listpath, parser=parser, 586 test_filters=test_filters, instance_source=self, 587 group_callback=group_callback) 588 else: 589 result = self.run_factory(test_ids, cmd, listopt, idoption, 590 self.ui, self.repository, parser=parser, 591 test_filters=test_filters, instance_source=self, 592 group_callback=group_callback) 593 return result 594 595 def get_filter_tags(self): 596 parser = self.get_parser() 597 try: 598 tags = parser.get('DEFAULT', 'filter_tags') 599 except ConfigParser.NoOptionError as e: 600 if e.message != "No option 'filter_tags' in section: 'DEFAULT'": 601 raise 602 return set() 603 return set([tag.strip() for tag in tags.split()]) 604 605 def obtain_instance(self, concurrency): 606 """If possible, get one or more test run environment instance ids. 607 608 Note this is not threadsafe: calling it from multiple threads would 609 likely result in shared results. 610 """ 611 while len(self._instances) < concurrency: 612 try: 613 cmd = self.get_parser().get('DEFAULT', 'instance_provision') 614 except ConfigParser.NoOptionError: 615 # Instance allocation not configured 616 return None 617 variable_regex = '\$INSTANCE_COUNT' 618 cmd = re.sub(variable_regex, 619 str(concurrency - len(self._instances)), cmd) 620 self.ui.output_values([('running', cmd)]) 621 proc = self.ui.subprocess_Popen( 622 cmd, shell=True, stdout=subprocess.PIPE) 623 out, _ = proc.communicate() 624 if proc.returncode: 625 raise ValueError('Provisioning instances failed, return %d' % 626 proc.returncode) 627 new_instances = set([item.strip() for item in out.split()]) 628 self._instances.update(new_instances) 629 # Cached first. 630 available_instances = self._instances - self._allocated_instances 631 # We only ask for instances when one should be available. 632 result = available_instances.pop() 633 self._allocated_instances.add(result) 634 return result 635 636 def release_instance(self, instance_id): 637 """Return instance_ids to the pool for reuse.""" 638 self._allocated_instances.remove(instance_id) 639