1#
2# Copyright (c) 2010 Testrepository Contributors
3#
4# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
5# license at the users choice. A copy of both licenses are available in the
6# project source as Apache-2.0 and BSD. You may not use this file except in
7# compliance with one of these two licences.
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
12# license you chose for the specific language governing permissions and
13# limitations under that license.
14
15"""The test command that test repository knows how to run."""
16
17from extras import (
18    try_import,
19    try_imports,
20    )
21
22from collections import defaultdict
23ConfigParser = try_imports(['ConfigParser', 'configparser'])
24import io
25import itertools
26import operator
27import os.path
28import re
29import subprocess
30import sys
31import tempfile
32import multiprocessing
33from textwrap import dedent
34
35from fixtures import Fixture
36v2 = try_import('subunit.v2')
37
38from testrepository import results
39from testrepository.testlist import (
40    parse_enumeration,
41    write_list,
42    )
43
# User-facing help text describing the .testr.conf options and variables that
# this module reads. dedent() strips the common four-space margin.
testrconf_help = dedent("""
    Configuring via .testr.conf:
    ---
    [DEFAULT]
    test_command=foo $IDOPTION
    test_id_option=--bar $IDFILE
    ---
    will cause 'testr run' to run 'foo' to execute tests, and
    'testr run --failing' will cause 'foo --bar failing.list ' to be run to
    execute tests. Shell variables are expanded in these commands on platforms
    that have a shell.

    The full list of options and variables for .testr.conf:
    * filter_tags -- a list of tags which should be used to filter test counts.
      This is useful for stripping out non-test results from the subunit stream
      such as Zope test layers. These filtered items are still considered for
      test failures.
    * test_command -- command line to run to execute tests.
    * test_id_option -- the value to substitute into test_command when specific
      test ids should be run.
    * test_id_list_default -- the value to use for $IDLIST when no specific
      test ids are being run.
    * test_list_option -- the option to use to cause the test runner to report
      on the tests it would run, rather than running them. When supplied the
      test_command should output on stdout all the test ids that would have
      been run if every other option and argument was honoured, one per line.
      This is required for parallel testing, and is substituted into $LISTOPT.
    * test_run_concurrency -- Optional call out to establish concurrency.
      Should return one line containing the number of concurrent test runner
      processes to run.
    * instance_provision -- provision one or more test run environments.
      Accepts $INSTANCE_COUNT for the number of instances desired.
    * instance_execute -- execute a test runner process in a given environment.
      Accepts $INSTANCE_ID, $FILES and $COMMAND. Paths in $FILES should be
      synchronised into the test runner environment filesystem. $COMMAND can
      be adjusted if the paths are synched with different names.
    * instance_dispose -- dispose of one or more test running environments.
      Accepts $INSTANCE_IDS.
    * group_regex -- If set group tests by the matched section of the test id.
    * $IDOPTION -- the variable to use to trigger running some specific tests.
    * $IDFILE -- A file created before the test command is run and deleted
      afterwards which contains a list of test ids, one per line. This can
      handle test ids with embedded whitespace.
    * $IDLIST -- A list of the test ids to run, separated by spaces. IDLIST
      defaults to an empty string when no test ids are known and no explicit
      default is provided. This will not handle test ids with spaces.

    See the testrepository manual for example .testr.conf files in different
    programming languages.

    """)
95
96
class CallWhenProcFinishes(object):
    """Proxy for a process that fires a callback once it has finished.

    The wrapped process is exposed through the same stdin/stdout/stderr,
    returncode and wait() surface. The first time ``returncode`` is read and
    found to be set (not None), the callback is invoked - exactly once.
    """

    def __init__(self, process, callback):
        """Wrap process.

        :param process: A subprocess.Popen object.
        :param callback: A zero-argument callable to invoke when the process
            is observed to have completed.
        """
        self._done = False
        self._callback = callback
        self._proc = process

    @property
    def stdin(self):
        """The wrapped process' stdin."""
        return self._proc.stdin

    @property
    def stdout(self):
        """The wrapped process' stdout."""
        return self._proc.stdout

    @property
    def stderr(self):
        """The wrapped process' stderr."""
        return self._proc.stderr

    @property
    def returncode(self):
        """The wrapped process' returncode, firing the callback if needed."""
        code = self._proc.returncode
        if code is None or self._done:
            # Either still running, or the callback has already been fired.
            return code
        # Mark as done before calling out so the callback runs at most once.
        self._done = True
        self._callback()
        return code

    def wait(self):
        """Wait for the wrapped process; does not itself fire the callback."""
        return self._proc.wait()
136
137
# The type object of compiled regular expressions, captured by compiling an
# empty pattern.
compiled_re_type = type(re.compile(''))
139
class TestListingFixture(Fixture):
    """Write a temporary file to disk with test ids in it.

    On setUp the command template is expanded into ``list_cmd`` (used to
    enumerate tests) and ``cmd`` (used to run them), the concurrency is
    decided, and - when test ids are known - a list file holding them is
    written and scheduled for removal on cleanUp.
    """

    def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
        repository, parallel=True, listpath=None, parser=None,
        test_filters=None, instance_source=None, group_callback=None):
        """Create a TestListingFixture.

        :param test_ids: The test_ids to use. May be None indicating that
            no ids are known and they should be discovered by listing or
            configuration if they must be known to run tests. Test ids are
            needed to run tests when filtering or partitioning is needed: if
            the run concurrency is > 1 partitioning is needed, and filtering is
            needed if the user has passed in filters.
        :param cmd_template: string to be filled out with the $IDOPTION,
            $IDFILE, $IDLIST and $LISTOPT variables.
        :param listopt: Option to substitute into LISTOPT to cause test listing
            to take place.
        :param idoption: Option to substitute into cmd when supplying any test
            ids.
        :param ui: The UI in use.
        :param repository: The repository to query for test times, if needed.
        :param parallel: If not True, prohibit parallel use : used to implement
            --parallel run recursively.
        :param listpath: The file listing path to use. If None, a unique path
            is created.
        :param parser: An options parser for reading options from.
        :param test_filters: An optional list of test filters to apply. Each
            filter should be a string suitable for passing to re.compile.
            filters are applied using search() rather than match(), so if
            anchoring is needed it should be included in the regex.
            The test ids used for executing are the union of all the individual
            filters: to take the intersection instead, craft a single regex that
            matches all your criteria. Filters are automatically applied by
            run_tests(), or can be applied by calling filter_tests(test_ids).
        :param instance_source: A source of test run instances. Must support
            obtain_instance(max_concurrency) -> id and release_instance(id)
            calls.
        :param group_callback: If supplied, should be a function that accepts a
            test id and returns a group id. A group id is an arbitrary value
            used as a dictionary key in the scheduler. All test ids with the
            same group id are scheduled onto the same backend test process.
        """
        self.test_ids = test_ids
        self.template = cmd_template
        self.listopt = listopt
        self.idoption = idoption
        self.ui = ui
        self.repository = repository
        self.parallel = parallel
        self._listpath = listpath
        self._parser = parser
        self.test_filters = test_filters
        self._group_callback = group_callback
        self._instance_source = instance_source

    def setUp(self):
        """Expand the command template and prepare the test id list file.

        Sets ``list_cmd``, ``concurrency``, ``test_ids``, ``list_file_name``
        (when no test ids are known it is set to None) and ``cmd``.
        """
        super(TestListingFixture, self).setUp()
        # Raw string: '\$' is not a valid string escape sequence.
        variable_regex = r'\$(IDOPTION|IDFILE|IDLIST|LISTOPT)'
        variables = {}
        list_variables = {'LISTOPT': self.listopt}
        cmd = self.template
        try:
            default_idstr = self._parser.get('DEFAULT', 'test_id_list_default')
            list_variables['IDLIST'] = default_idstr
            # In theory we should also support casting this into IDFILE etc -
            # needs this horrible class refactored.
        except ConfigParser.NoOptionError as e:
            if e.message != "No option 'test_id_list_default' in section: 'DEFAULT'":
                raise
            default_idstr = None
        def list_subst(match):
            # Variables with no value for listing expand to nothing.
            return list_variables.get(match.group(1), '')
        self.list_cmd = re.sub(variable_regex, list_subst, cmd)
        nonparallel = (not self.parallel or not
            getattr(self.ui, 'options', None) or not
            getattr(self.ui.options, 'parallel', None))
        if nonparallel:
            self.concurrency = 1
        else:
            # Explicit option, then user callout, then local CPU count, then
            # finally fall back to a single process.
            self.concurrency = self.ui.options.concurrency
            if not self.concurrency:
                self.concurrency = self.callout_concurrency()
            if not self.concurrency:
                self.concurrency = self.local_concurrency()
            if not self.concurrency:
                self.concurrency = 1
        if self.test_ids is None:
            if self.concurrency == 1:
                if default_idstr:
                    self.test_ids = default_idstr.split()
            if self.concurrency != 1 or self.test_filters is not None:
                # Have to be able to tell each worker what to run / filter
                # tests.
                self.test_ids = self.list_tests()
        def subst(match):
            # Looks variables up at substitution time, so entries added
            # below are visible to later expansions.
            return variables.get(match.group(1), '')
        if self.test_ids is None:
            # No test ids to supply to the program, and so no id option.
            self.list_file_name = None
            variables['IDLIST'] = ''
        else:
            self.test_ids = self.filter_tests(self.test_ids)
            variables['IDFILE'] = self.make_listfile()
            variables['IDLIST'] = ' '.join(self.test_ids)
            # IDOPTION is itself a template which may reference IDFILE/IDLIST.
            variables['IDOPTION'] = re.sub(
                variable_regex, subst, self.idoption)
        self.cmd = re.sub(variable_regex, subst, cmd)

    def make_listfile(self):
        """Write self.test_ids to the list file and schedule its removal.

        The file is written to the configured listpath, or to a fresh
        temporary file when no listpath was supplied.

        :return: The path of the file written.
        """
        name = None
        try:
            if self._listpath:
                name = self._listpath
                stream = open(name, 'wb')
            else:
                fd, name = tempfile.mkstemp()
                stream = os.fdopen(fd, 'wb')
            try:
                self.list_file_name = name
                write_list(stream, self.test_ids)
            finally:
                # Always release the handle, even when writing fails.
                stream.close()
        except BaseException:
            # Clean up on any failure (including interrupts). Only unlink a
            # file that exists so that a failed open() is not masked by an
            # error from unlink().
            if name and os.path.exists(name):
                os.unlink(name)
            raise
        self.addCleanup(os.unlink, name)
        return name

    def filter_tests(self, test_ids):
        """Filter test_ids by the test_filters.

        A test id is kept when any of the filters matches it via search().
        When no filters are configured test_ids is returned unchanged.

        :return: A list of test ids.
        """
        if self.test_filters is None:
            return test_ids
        filters = [re.compile(pattern) for pattern in self.test_filters]
        return [test_id for test_id in test_ids
            if any(pred.search(test_id) for pred in filters)]

    def list_tests(self):
        """List the tests returned by list_cmd.

        :return: A list of test ids.
        :raises ValueError: If $LISTOPT is not present in the command
            template, or if the listing command exits non-zero.
        """
        if '$LISTOPT' not in self.template:
            raise ValueError("LISTOPT not configured in .testr.conf")
        instance, list_cmd = self._per_instance_command(self.list_cmd)
        try:
            self.ui.output_values([('running', list_cmd)])
            run_proc = self.ui.subprocess_Popen(list_cmd, shell=True,
                stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            out, err = run_proc.communicate()
            if run_proc.returncode != 0:
                if v2 is not None:
                    # Convert the subunit v2 stream into plain file content
                    # before showing it to the user.
                    new_out = io.BytesIO()
                    v2.ByteStreamToStreamResult(io.BytesIO(out), 'stdout').run(
                        results.CatFiles(new_out))
                    out = new_out.getvalue()
                self.ui.output_stream(io.BytesIO(out))
                self.ui.output_stream(io.BytesIO(err))
                raise ValueError(
                    "Non-zero exit code (%d) from test listing."
                    % (run_proc.returncode))
            ids = parse_enumeration(out)
            return ids
        finally:
            if instance:
                self._instance_source.release_instance(instance)

    def _per_instance_command(self, cmd):
        """Customise cmd with an instance-id.

        When an instance source is configured an instance is obtained (asking
        for self.concurrency instances, to avoid death-by-1000 cuts of
        latency) and, if instance_execute is configured, cmd is wrapped in it.

        :param cmd: The command to customise.
        :return: A tuple (instance, cmd). instance is None when no instance
            source is configured or no instance could be obtained.
        """
        if self._instance_source is None:
            return None, cmd
        instance = self._instance_source.obtain_instance(self.concurrency)
        if instance is not None:
            try:
                instance_prefix = self._parser.get(
                    'DEFAULT', 'instance_execute')
                variables = {
                    'INSTANCE_ID': instance.decode('utf8'),
                    'COMMAND': cmd,
                    # --list-tests cannot use FILES, so handle it being unset.
                    'FILES': getattr(self, 'list_file_name', None) or '',
                }
                variable_regex = r'\$(INSTANCE_ID|COMMAND|FILES)'
                def subst(match):
                    return variables.get(match.group(1), '')
                cmd = re.sub(variable_regex, subst, instance_prefix)
            except ConfigParser.NoOptionError:
                # Per-instance execution environment not configured.
                pass
        return instance, cmd

    def run_tests(self):
        """Run the tests defined by the command and ui.

        With a concurrency of 1 a single process is spawned; otherwise the
        test ids are partitioned and one non-parallel child fixture is used
        per non-empty partition.

        :return: A list of spawned processes.
        """
        result = []
        test_ids = self.test_ids
        if self.concurrency == 1 and (test_ids is None or test_ids):
            # Have to customise cmd here, as instances are allocated
            # just-in-time. XXX: Indicates this whole region needs refactoring.
            instance, cmd = self._per_instance_command(self.cmd)
            self.ui.output_values([('running', cmd)])
            run_proc = self.ui.subprocess_Popen(cmd, shell=True,
                stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            # Prevent processes stalling if they read from stdin; we could
            # pass this through in future, but there is no point doing that
            # until we have a working can-run-debugger-inline story.
            run_proc.stdin.close()
            if instance:
                # Hand the instance back once the process is seen to finish.
                return [CallWhenProcFinishes(run_proc,
                    lambda: self._instance_source.release_instance(instance))]
            else:
                return [run_proc]
        test_id_groups = self.partition_tests(test_ids, self.concurrency)
        for partition in test_id_groups:
            if not partition:
                # No tests in this partition
                continue
            fixture = self.useFixture(TestListingFixture(partition,
                self.template, self.listopt, self.idoption, self.ui,
                self.repository, parallel=False, parser=self._parser,
                instance_source=self._instance_source))
            result.extend(fixture.run_tests())
        return result

    def partition_tests(self, test_ids, concurrency):
        """Partition test_ids by concurrency.

        Test durations from the repository are used to get partitions which
        have roughly the same expected runtime. New tests - those with no
        recorded duration - are allocated in round-robin fashion to the
        partitions created using test durations.

        :return: A list where each element is a distinct subset of test_ids,
            and the union of all the elements is equal to set(test_ids).
        """
        partitions = [list() for _ in range(concurrency)]
        # Pairs of [accumulated time, partition]; the partition lists are
        # shared with ``partitions`` so both views stay in sync.
        timed_partitions = [[0.0, partition] for partition in partitions]
        time_data = self.repository.get_test_times(test_ids)
        timed_tests = time_data['known']
        unknown_tests = time_data['unknown']
        # Group tests: generate group_id -> test_ids.
        group_ids = defaultdict(list)
        if self._group_callback is None:
            group_callback = lambda _: None
        else:
            group_callback = self._group_callback
        for test_id in test_ids:
            # Tests without a group form a group of their own.
            group_id = group_callback(test_id) or test_id
            group_ids[group_id].append(test_id)
        # Time groups: generate three sets of groups:
        # - fully timed dict(group_id -> time),
        # - partially timed dict(group_id -> time) and
        # - unknown (set of group_id)
        # We may in future treat partially timed different for scheduling, but
        # at least today we just schedule them after the fully timed groups.
        timed = {}
        partial = {}
        unknown = []
        for group_id, group_tests in group_ids.items():
            untimed_ids = unknown_tests.intersection(group_tests)
            # untimed_ids is a subset of group_tests, so the symmetric
            # difference is exactly the timed tests of the group.
            group_time = sum(timed_tests[test_id]
                for test_id in untimed_ids.symmetric_difference(group_tests))
            if not untimed_ids:
                timed[group_id] = group_time
            elif group_time:
                partial[group_id] = group_time
            else:
                unknown.append(group_id)
        # Scheduling is NP complete in general, so we avoid aiming for
        # perfection. A quick approximation that is sufficient for our general
        # needs:
        # sort the groups by time
        # allocate to partitions by putting each group in to the partition with
        # the current (lowest time, shortest length[in tests])
        def consume_queue(groups):
            queue = sorted(
                groups.items(), key=operator.itemgetter(1), reverse=True)
            for group_id, duration in queue:
                timed_partitions[0][0] = timed_partitions[0][0] + duration
                timed_partitions[0][1].extend(group_ids[group_id])
                timed_partitions.sort(key=lambda item: (item[0], len(item[1])))
        consume_queue(timed)
        consume_queue(partial)
        # Assign groups with entirely unknown times in round robin fashion to
        # the partitions.
        for partition, group_id in zip(itertools.cycle(partitions), unknown):
            partition.extend(group_ids[group_id])
        return partitions

    def callout_concurrency(self):
        """Callout for user defined concurrency.

        :return: The integer printed by the test_run_concurrency command, or
            None when that option is not configured.
        :raises ValueError: If the command exits non-zero.
        """
        try:
            concurrency_cmd = self._parser.get(
                'DEFAULT', 'test_run_concurrency')
        except ConfigParser.NoOptionError:
            return None
        run_proc = self.ui.subprocess_Popen(concurrency_cmd, shell=True,
            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        out, err = run_proc.communicate()
        if run_proc.returncode:
            raise ValueError(
                "test_run_concurrency failed: exit code %d, stderr='%s'" % (
                run_proc.returncode, err.decode('utf8', 'replace')))
        return int(out.strip())

    def local_concurrency(self):
        """Return the local CPU count, or None when it cannot be determined."""
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            # No concurrency logic known.
            return None
469
470
class TestCommand(Fixture):
    """Represents the test command defined in .testr.conf.

    :ivar run_factory: The fixture to use to execute a command.
    :ivar oldschool: Use failing.list rather than a unique file path.

    TestCommand is a Fixture. Many uses of it will not require it to be setUp,
    but calling get_run_command does require it: the fixture state is used to
    track test environment instances, which are disposed of when cleanUp
    happens. This is not done per-run-command, because test bisection (amongst
    other things) uses multiple get_run_command configurations.
    """

    run_factory = TestListingFixture
    oldschool = False

    def __init__(self, ui, repository):
        """Create a TestCommand.

        :param ui: A testrepository.ui.UI object which is used to obtain the
            location of the .testr.conf.
        :param repository: A testrepository.repository.Repository used for
            determining test times when partitioning tests.
        """
        super(TestCommand, self).__init__()
        self.ui = ui
        self.repository = repository
        # Both are None until setUp, which is how "not setUp" is detected.
        self._instances = None
        self._allocated_instances = None

    def setUp(self):
        """Start tracking instances and arrange for their disposal."""
        super(TestCommand, self).setUp()
        self._instances = set()
        self._allocated_instances = set()
        self.addCleanup(self._dispose_instances)

    def _dispose_instances(self):
        """Run the instance_dispose command for all known instances.

        Does nothing when the fixture was never setUp, when no .testr.conf
        can be read, or when instance_dispose is not configured.

        :raises ValueError: If the dispose command exits non-zero.
        """
        instances = self._instances
        if instances is None:
            return
        self._instances = None
        self._allocated_instances = None
        try:
            dispose_cmd = self.get_parser().get('DEFAULT', 'instance_dispose')
        except (ValueError, ConfigParser.NoOptionError):
            return
        instance_ids = ' '.join(
            sorted(instance.decode('utf8') for instance in instances))
        # A function replacement inserts the ids verbatim; a replacement
        # string would have any backslashes in it processed as escapes.
        dispose_cmd = re.sub(
            r'\$INSTANCE_IDS', lambda match: instance_ids, dispose_cmd)
        self.ui.output_values([('running', dispose_cmd)])
        run_proc = self.ui.subprocess_Popen(dispose_cmd, shell=True)
        run_proc.communicate()
        if run_proc.returncode:
            raise ValueError('Disposing of instances failed, return %d' %
                run_proc.returncode)

    def get_parser(self):
        """Get a parser with the .testr.conf in it.

        :raises ValueError: If no .testr.conf could be read.
        """
        parser = ConfigParser.ConfigParser()
        # This possibly should push down into UI.
        if self.ui.here == 'memory:':
            return parser
        if not parser.read(os.path.join(self.ui.here, '.testr.conf')):
            raise ValueError("No .testr.conf config file")
        return parser

    def get_run_command(self, test_ids=None, testargs=(), test_filters=None):
        """Get the command that would be run to run tests.

        See TestListingFixture for the definition of test_ids and test_filters.

        :param testargs: Extra arguments appended to test_command.
        :return: An (un-setUp) run_factory instance.
        :raises TypeError: If this TestCommand has not been setUp.
        :raises ValueError: If test_command is not configured, or if the
            command uses $IDOPTION / $LISTOPT without the matching option
            being configured.
        """
        if self._instances is None:
            raise TypeError('TestCommand not setUp')
        parser = self.get_parser()
        try:
            command = parser.get('DEFAULT', 'test_command')
        except ConfigParser.NoOptionError as e:
            if e.message != "No option 'test_command' in section: 'DEFAULT'":
                raise
            raise ValueError("No test_command option present in .testr.conf")
        elements = [command] + list(testargs)
        cmd = ' '.join(elements)
        idoption = ''
        if '$IDOPTION' in command:
            # IDOPTION is used, we must have it configured.
            try:
                idoption = parser.get('DEFAULT', 'test_id_option')
            except ConfigParser.NoOptionError as e:
                if e.message != "No option 'test_id_option' in section: 'DEFAULT'":
                    raise
                raise ValueError("No test_id_option option present in .testr.conf")
        listopt = ''
        if '$LISTOPT' in command:
            # LISTOPT is used, test_list_option must be configured.
            try:
                listopt = parser.get('DEFAULT', 'test_list_option')
            except ConfigParser.NoOptionError as e:
                if e.message != "No option 'test_list_option' in section: 'DEFAULT'":
                    raise
                raise ValueError("No test_list_option option present in .testr.conf")
        try:
            group_regex = parser.get('DEFAULT', 'group_regex')
        except ConfigParser.NoOptionError:
            group_regex = None
        if group_regex:
            def group_callback(test_id, regex=re.compile(group_regex)):
                # The group id is the text matched at the start of the id;
                # ids which do not match return None (no group).
                match = regex.match(test_id)
                if match:
                    return match.group(0)
        else:
            group_callback = None
        factory_kwargs = dict(parser=parser, test_filters=test_filters,
            instance_source=self, group_callback=group_callback)
        if self.oldschool:
            # Only pass listpath when asked for, leaving the factory's own
            # default in place otherwise.
            factory_kwargs['listpath'] = os.path.join(
                self.ui.here, 'failing.list')
        return self.run_factory(test_ids, cmd, listopt, idoption,
            self.ui, self.repository, **factory_kwargs)

    def get_filter_tags(self):
        """Return the set of tags configured in filter_tags (maybe empty)."""
        parser = self.get_parser()
        try:
            tags = parser.get('DEFAULT', 'filter_tags')
        except ConfigParser.NoOptionError as e:
            if e.message != "No option 'filter_tags' in section: 'DEFAULT'":
                raise
            return set()
        # split() with no argument already discards surrounding whitespace.
        return set(tags.split())

    def obtain_instance(self, concurrency):
        """If possible, get one or more test run environment instance ids.

        Note this is not threadsafe: calling it from multiple threads would
        likely result in shared results.

        :param concurrency: The number of instances that should exist; more
            are provisioned until that many are known.
        :return: An unallocated instance id, or None when instance_provision
            is not configured.
        :raises ValueError: If provisioning exits non-zero or yields no new
            instance ids.
        """
        while len(self._instances) < concurrency:
            try:
                cmd = self.get_parser().get('DEFAULT', 'instance_provision')
            except ConfigParser.NoOptionError:
                # Instance allocation not configured
                return None
            variable_regex = r'\$INSTANCE_COUNT'
            cmd = re.sub(variable_regex,
                str(concurrency - len(self._instances)), cmd)
            self.ui.output_values([('running', cmd)])
            proc = self.ui.subprocess_Popen(
                cmd, shell=True, stdout=subprocess.PIPE)
            out, _ = proc.communicate()
            if proc.returncode:
                raise ValueError('Provisioning instances failed, return %d' %
                    proc.returncode)
            new_instances = set(out.split())
            if not new_instances - self._instances:
                # Without progress this loop would never terminate.
                raise ValueError(
                    'Provisioning instances returned no new instances')
            self._instances.update(new_instances)
        # Cached first.
        available_instances = self._instances - self._allocated_instances
        # We only ask for instances when one should be available.
        result = available_instances.pop()
        self._allocated_instances.add(result)
        return result

    def release_instance(self, instance_id):
        """Return instance_ids to the pool for reuse."""
        self._allocated_instances.remove(instance_id)
639