1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Run a group of subprocesses and then finish."""
15
import collections
import errno
import logging
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time
27
# cpu cost measurement
# When True, job command lines are wrapped with the Unix `time -p` command so
# that per-job CPU usage can be parsed from the captured output (see
# Job.start() / Job.state()). Mutable module-global, toggled by callers.
measure_cpu_costs = False

# Default cap on the summed cpu_cost of concurrently running jobs.
_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of job's stdout that will be stored in the result.
# Only last N bytes of stdout will be kept if the actual output longer.
_MAX_RESULT_SIZE = 64 * 1024
35
36
# NOTE: If you change this, please make sure to test reviewing the
# github PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description, which leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    """Return a copy of *s* with every non-ASCII character removed."""
    ascii_chars = [ch for ch in s if ord(ch) < 128]
    return ''.join(ascii_chars)
43
44
def sanitized_environment(env):
    """Return a copy of *env* with non-ASCII characters stripped from all keys
    and values (see the note on strip_non_ascii_chars for why)."""
    return {
        ''.join(ch for ch in key if ord(ch) < 128):
        ''.join(ch for ch in value if ord(ch) < 128)
        for key, value in env.items()
    }
50
51
def platform_string():
    """Map platform.system() onto 'windows', 'mac', 'linux', or 'posix'."""
    system = platform.system()
    if system == 'Windows' or system.startswith('MSYS_NT'):
        return 'windows'
    if system == 'Darwin':
        return 'mac'
    if system == 'Linux':
        return 'linux'
    return 'posix'
63
64
# setup a signal handler so that signal.pause registers 'something'
# when a child finishes
# not using futures and threading to avoid a dependency on subprocess32
if platform_string() == 'windows':
    pass
else:

    def alarm_handler(unused_signum, unused_frame):
        # Intentionally a no-op: the handler only exists so that SIGALRM
        # interrupts signal.pause() in Jobset.reap().
        pass

    # SIGCHLD wakes signal.pause() when a child process exits; SIGALRM gives
    # a periodic wake-up (Jobset.reap() arms signal.alarm(10) before pausing).
    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)
77
# Sentinel objects for a Job's lifecycle state (compared by identity in
# Job.state() and Jobset.reap()).
_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

# ANSI color table: name -> [foreground color code, bold flag].
_COLORS = {
    'red': [31, 0],
    'green': [32, 0],
    'yellow': [33, 0],
    'lightgray': [37, 0],
    'gray': [30, 1],
    'purple': [35, 0],
    'cyan': [36, 0]
}

# ANSI control sequences used by message() to redraw the current terminal
# line in place.
_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

# Maps each message() tag to the color it is rendered with on a tty.
_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    'SKIPPED': 'cyan'
}

# Root-logger configuration for the non-tty / windows output path in message().
_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)
112
113
def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR.

    Retries only on IOError whose errno is EINTR; any other exception
    propagates to the caller unchanged.
    """
    while True:
        try:
            result = fn()
        except IOError as err:
            if err.errno == errno.EINTR:
                continue
            raise
        return result
122
123
def message(tag, msg, explanatory_text=None, do_newline=False):
    """Print a tagged, colorized status message.

    Consecutive calls with the same (tag, msg) and no explanatory_text are
    suppressed. Falls back to plain logging on windows or when stdout is not
    a tty. Writes interrupted by EINTR are retried.
    """
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                # Rewrite the current terminal line in place using ANSI
                # escapes; the color pair comes from _COLORS[_TAG_COLOR[tag]].
                sys.stdout.write(
                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                    (_BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' %
                     explanatory_text if explanatory_text is not None else '',
                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
                     tag, msg, '\n'
                     if do_newline or explanatory_text is not None else ''))
            sys.stdout.flush()
            return
        except IOError as e:
            # Retry on EINTR (interrupted write); re-raise anything else.
            if e.errno != errno.EINTR:
                raise


# Deduplication state for message(): the last (tag, msg) pair printed.
message.old_tag = ''
message.old_msg = ''
152
153
def which(filename):
    """Resolve *filename* to a path.

    A name containing '/' is returned as-is (assumed already a path);
    otherwise each PATH directory is searched in order. Raises Exception
    when the name is not found anywhere on PATH.
    """
    if '/' in filename:
        return filename
    for directory in os.environ['PATH'].split(os.pathsep):
        candidate = os.path.join(directory, filename)
        if os.path.exists(candidate):
            return candidate
    raise Exception('%s not found' % filename)
161
162
class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False,
                 logfilename=None):
        """
    Arguments:
      cmdline: a list of arguments to pass as the command line
      environ: a dictionary of environment variables to set in the child process
      kill_handler: a handler that will be called whenever job.kill() is invoked
      cpu_cost: number of cores per second this job needs
      logfilename: use given file to store job's output, rather than using a temporary file
    """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success
        self.logfilename = logfilename
        # NOTE(review): this only fires when BOTH retry kinds are enabled;
        # the comment below suggests any retry would overwrite the log, so
        # `or` may have been intended -- confirm before changing, since `or`
        # would start rejecting currently-accepted configurations.
        if self.logfilename and self.flake_retries != 0 and self.timeout_retries != 0:
            # Forbidden to avoid overwriting the test log when retrying.
            raise Exception(
                'Cannot use custom logfile when retries are enabled')

    def identity(self):
        """Stable key for this spec: the command line plus its environment."""
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        # This was previously written as __cmp__, which Python 3 ignores
        # entirely: equality silently fell back to object identity while
        # __hash__ used identity(), breaking the hash/eq contract. Define
        # __eq__ so specs with equal identity() compare equal.
        if not isinstance(other, JobSpec):
            return NotImplemented
        return self.identity() == other.identity()

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname, ' '.join(
            '%s=%s' % kv for kv in self.environ.items()), ' '.join(
                self.cmdline))
223
224
class JobResult(object):
    """Aggregated outcome of one job, accumulated across any retries."""

    def __init__(self):
        # Final disposition: 'UNKNOWN' until set to 'PASSED', 'FAILED',
        # 'TIMEOUT', or 'SKIPPED'.
        self.state = 'UNKNOWN'
        # Exit code of the last completed process; -1 until one finishes.
        self.returncode = -1
        # Wall-clock seconds taken by the last attempt.
        self.elapsed_time = 0
        # Count of failed attempts (flakes, timeouts, final failure).
        self.num_failures = 0
        # Total retries performed (flake retries + timeout retries).
        self.retries = 0
        # Tail of the job's captured stdout (at most _MAX_RESULT_SIZE bytes).
        self.message = ''
        # CPU cores estimated by the spec vs. measured via `time -p`.
        self.cpu_estimated = 1
        self.cpu_measured = 1
236
237
def read_from_start(f):
    """Rewind file-like object *f* to the beginning and return its contents."""
    f.seek(0, os.SEEK_SET)
    return f.read()
241
242
class Job(object):
    """Manages one job."""

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        """Create the job and immediately launch its subprocess.

        Arguments:
          spec: a JobSpec describing what to run
          newline_on_success: print a newline after the PASSED message
          travis: emit CI-friendly output (always newline after START)
          add_env: extra environment variables merged over spec.environ
          quiet_success: suppress START/PASSED messages for passing jobs
        """
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        # Flake and timeout retries used so far (bounded by the spec).
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        """Return the JobSpec this job was created from."""
        return self._spec

    def start(self):
        """Launch the subprocess (also used to relaunch after a flake/timeout)."""
        if self._spec.logfilename:
            # make sure the log directory exists
            logfile_dir = os.path.dirname(
                os.path.abspath(self._spec.logfilename))
            if not os.path.exists(logfile_dir):
                os.makedirs(logfile_dir)
            self._logfile = open(self._spec.logfilename, 'w+')
        else:
            # Output is captured to an unnamed temp file and read back via
            # read_from_start() when reporting.
            self._logfile = tempfile.TemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't use it
        # with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
        else:
            # NOTE: disables CPU measurement globally once any MSBuild job
            # (or any job, when measurement is off) passes through here.
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(args=cmdline,
                                             stderr=subprocess.STDOUT,
                                             stdout=self._logfile,
                                             cwd=self._spec.cwd,
                                             shell=self._spec.shell,
                                             env=env)
        delay = 0.3
        # Retry process creation with exponential backoff; a final unguarded
        # attempt in the else-clause lets the OSError propagate if all fail.
        for i in range(0, 4):
            try:
                self._process = try_start()
                break
            except OSError:
                message(
                    'WARNING', 'Failed to start %s, retrying in %f seconds' %
                    (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        def stdout(self=self):
            # Read the whole captured output; cache its tail (at most
            # _MAX_RESULT_SIZE bytes) on the result for later reporting.
            stdout = read_from_start(self._logfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        # Case 1: the process has exited.
        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                # Non-zero exit: either retry as a flake or mark FAILED.
                if self._retries < self._spec.flake_retries:
                    message('FLAKE',
                            '%s [ret=%d, pid=%d]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid),
                            stdout(),
                            do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message('FAILED',
                                '%s [ret=%d, pid=%d, time=%.1fsec]' %
                                (self._spec.shortname, self._process.returncode,
                                 self._process.pid, elapsed),
                                stdout(),
                                do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                if measure_cpu_costs:
                    # Parse the `time -p` trailer appended to the job output.
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        stdout())
                    real = float(m.group(1))
                    user = float(m.group(2))
                    # NOTE: local 'sys' shadows the sys module in this scope.
                    sys = float(m.group(3))
                    # Only trust measurements for jobs that ran long enough.
                    if real > 0.5:
                        cores = (user + sys) / real
                        self.result.cpu_measured = float('%.01f' % cores)
                        self.result.cpu_estimated = float('%.01f' %
                                                          self._spec.cpu_cost)
                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                            self.result.cpu_measured, self.result.cpu_estimated)
                if not self._quiet_success:
                    message('PASSED',
                            '%s [time=%.1fsec, retries=%d:%d%s]' %
                            (self._spec.shortname, elapsed, self._retries,
                             self._timeout_retries, measurement),
                            stdout() if self._spec.verbose_success else None,
                            do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        # Case 2: still running but past the timeout budget.
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message('TIMEOUT_FLAKE',
                        '%s [pid=%d]' %
                        (self._spec.shortname, self._process.pid),
                        stdout(),
                        do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message('TIMEOUT',
                        '%s [pid=%d, time=%.1fsec]' %
                        (self._spec.shortname, self._process.pid, elapsed),
                        stdout(),
                        do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        """Terminate the underlying process and mark the job KILLED."""
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        """Silence the FAILED message for this job (state is still recorded)."""
        self._suppress_failure_message = True
412
413
class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        # check_cancelled: zero-arg callable polled for external cancellation.
        # maxjobs: cap on the summed cpu_cost of concurrently running jobs.
        # maxjobs_cpu_agnostic: cap on the plain count of running jobs.
        # max_time: wall-clock budget in seconds; <= 0 means unlimited.
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        # Maps job shortname -> list of JobResult entries.
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        """Record how many jobs are still queued (used for the ETA display)."""
        self._remaining = remaining

    def get_num_failures(self):
        """Return the number of failed/killed job attempts seen so far."""
        return self._failures

    def cpu_cost(self):
        """Return the summed cpu_cost of all currently running jobs."""
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        while True:
            # Past the wall-clock budget: record the job as SKIPPED instead
            # of running it (returns True so the caller keeps submitting).
            if self._max_time > 0 and time.time(
            ) - self._start_time > self._max_time:
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled(): return False
            current_cpu_cost = self.cpu_cost()
            # Nothing running: always admit the job, even if its cpu_cost
            # alone exceeds the limit.
            if current_cpu_cost == 0: break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            # Capacity saturated: block until some running job finishes.
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled(): return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                # Poll through any EINTR-interrupted I/O.
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING: continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        for job in self._running:
                            job.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            # Collected at least one job: return so the caller can re-check
            # capacity/cancellation.
            if dead: return
            if not self._travis and platform_string() != 'windows':
                # Build the WAITING status line, with an ETA estimate once
                # at least one job has completed.
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (self._remaining +
                                                           len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                time.sleep(0.1)
            else:
                # Sleep until a child exits (SIGCHLD) or at most 10 seconds
                # (SIGALRM); both handlers are installed at module load.
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled: return True
        if not self._check_cancelled(): return False
        # External cancellation requested: kill everything still running.
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        """Wait for all running jobs; True iff nothing failed or was cancelled."""
        while self._running:
            if self.cancelled(): pass  # poll cancellation
            self.reap()
        if platform_string() != 'windows':
            # Cancel any pending alarm armed by reap().
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0
535
536
537def _never_cancelled():
538    return False
539
540
def tag_remaining(xs):
    """Yield (item, remaining) pairs from iterable *xs*.

    The last 5000 items are tagged with the count of items that follow them
    (down to 0 for the final item); earlier items are tagged with None,
    since the total is not yet known while streaming.
    """
    # deque gives O(1) popleft; the original list.pop(0) shifted up to 5000
    # elements on every yield, making the streaming phase quadratic-ish.
    staging = collections.deque()
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.popleft(), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)
550
551
def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env=None,
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    """Run a collection of JobSpecs to completion.

    Arguments:
      cmdlines: iterable of JobSpec objects to run
      check_cancelled: zero-arg callable polled for external cancellation
      maxjobs / maxjobs_cpu_agnostic: concurrency caps (default
        _DEFAULT_MAX_JOBS when None)
      add_env: extra environment variables for every job (default: none)
      skip_jobs: record every job as SKIPPED without running anything
      max_time: wall-clock budget in seconds; -1 means unlimited
    Returns a (num_failures, resultset) tuple, where resultset maps job
    shortname to a list of JobResult objects.
    """
    # Was a mutable default argument ({}): a shared dict could be mutated
    # across calls. Use the None-sentinel idiom instead.
    if add_env is None:
        add_env = {}
    if skip_jobs:
        resultset = {}
        for job in cmdlines:
            # Fresh JobResult per job: previously a single shared instance
            # was stored under every key, so mutating one entry in the
            # returned resultset silently changed them all.
            skipped_job_result = JobResult()
            skipped_job_result.state = 'SKIPPED'
            message('SKIPPED', job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(
        check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
        maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else
        _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env,
        quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
584