1# Copyright (c) 2018 Google Inc. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""
6TestCmd.py:  a testing framework for commands and scripts.
7
8The TestCmd module provides a framework for portable automated testing
9of executable commands and scripts (in any language, not just Python),
10especially commands and scripts that require file system interaction.
11
12In addition to running tests and evaluating conditions, the TestCmd
13module manages and cleans up one or more temporary workspace
14directories, and provides methods for creating files and directories in
15those workspace directories from in-line data, here-documents), allowing
16tests to be completely self-contained.
17
18A TestCmd environment object is created via the usual invocation:
19
20    import TestCmd
21    test = TestCmd.TestCmd()
22
23There are a bunch of keyword arguments available at instantiation:
24
25    test = TestCmd.TestCmd(description = 'string',
26                           program = 'program_or_script_to_test',
27                           interpreter = 'script_interpreter',
28                           workdir = 'prefix',
29                           subdir = 'subdir',
30                           verbose = Boolean,
31                           match = default_match_function,
32                           diff = default_diff_function,
33                           combine = Boolean)
34
35There are a bunch of methods that let you do different things:
36
37    test.verbose_set(1)
38
39    test.description_set('string')
40
41    test.program_set('program_or_script_to_test')
42
43    test.interpreter_set('script_interpreter')
44    test.interpreter_set(['script_interpreter', 'arg'])
45
46    test.workdir_set('prefix')
47    test.workdir_set('')
48
49    test.workpath('file')
50    test.workpath('subdir', 'file')
51
52    test.subdir('subdir', ...)
53
54    test.rmdir('subdir', ...)
55
56    test.write('file', "contents\n")
57    test.write(['subdir', 'file'], "contents\n")
58
59    test.read('file')
60    test.read(['subdir', 'file'])
61    test.read('file', mode)
62    test.read(['subdir', 'file'], mode)
63
64    test.writable('dir', 1)
65    test.writable('dir', None)
66
67    test.preserve(condition, ...)
68
69    test.cleanup(condition)
70
71    test.command_args(program = 'program_or_script_to_run',
72                      interpreter = 'script_interpreter',
73                      arguments = 'arguments to pass to program')
74
75    test.run(program = 'program_or_script_to_run',
76             interpreter = 'script_interpreter',
77             arguments = 'arguments to pass to program',
78             chdir = 'directory_to_chdir_to',
79             stdin = 'input to feed to the program\n')
80             universal_newlines = True)
81
82    p = test.start(program = 'program_or_script_to_run',
83                   interpreter = 'script_interpreter',
84                   arguments = 'arguments to pass to program',
85                   universal_newlines = None)
86
87    test.finish(self, p)
88
89    test.pass_test()
90    test.pass_test(condition)
91    test.pass_test(condition, function)
92
93    test.fail_test()
94    test.fail_test(condition)
95    test.fail_test(condition, function)
96    test.fail_test(condition, function, skip)
97
98    test.no_result()
99    test.no_result(condition)
100    test.no_result(condition, function)
101    test.no_result(condition, function, skip)
102
103    test.stdout()
104    test.stdout(run)
105
106    test.stderr()
107    test.stderr(run)
108
109    test.symlink(target, link)
110
111    test.banner(string)
112    test.banner(string, width)
113
114    test.diff(actual, expected)
115
116    test.match(actual, expected)
117
118    test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
119    test.match_exact(["actual 1\n", "actual 2\n"],
120                     ["expected 1\n", "expected 2\n"])
121
122    test.match_re("actual 1\nactual 2\n", regex_string)
123    test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
124
125    test.match_re_dotall("actual 1\nactual 2\n", regex_string)
126    test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
127
128    test.tempdir()
129    test.tempdir('temporary-directory')
130
131    test.sleep()
132    test.sleep(seconds)
133
134    test.where_is('foo')
135    test.where_is('foo', 'PATH1:PATH2')
136    test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
137
138    test.unlink('file')
139    test.unlink('subdir', 'file')
140
141The TestCmd module provides pass_test(), fail_test(), and no_result()
142unbound functions that report test results for use with the Aegis change
143management system.  These methods terminate the test immediately,
144reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
145status 0 (success), 1 or 2 respectively.  This allows for a distinction
146between an actual failed test and a test that could not be properly
147evaluated because of an external condition (such as a full file system
148or incorrect permissions).
149
150    import TestCmd
151
152    TestCmd.pass_test()
153    TestCmd.pass_test(condition)
154    TestCmd.pass_test(condition, function)
155
156    TestCmd.fail_test()
157    TestCmd.fail_test(condition)
158    TestCmd.fail_test(condition, function)
159    TestCmd.fail_test(condition, function, skip)
160
161    TestCmd.no_result()
162    TestCmd.no_result(condition)
163    TestCmd.no_result(condition, function)
164    TestCmd.no_result(condition, function, skip)
165
166The TestCmd module also provides unbound functions that handle matching
167in the same way as the match_*() methods described above.
168
169    import TestCmd
170
171    test = TestCmd.TestCmd(match = TestCmd.match_exact)
172
173    test = TestCmd.TestCmd(match = TestCmd.match_re)
174
175    test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
176
177The TestCmd module provides unbound functions that can be used for the
178"diff" argument to TestCmd.TestCmd instantiation:
179
180    import TestCmd
181
182    test = TestCmd.TestCmd(match = TestCmd.match_re,
183                           diff = TestCmd.diff_re)
184
185    test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
186
187The "diff" argument can also be used with standard difflib functions:
188
189    import difflib
190
191    test = TestCmd.TestCmd(diff = difflib.context_diff)
192
193    test = TestCmd.TestCmd(diff = difflib.unified_diff)
194
195Lastly, the where_is() method also exists in an unbound function
196version.
197
198    import TestCmd
199
200    TestCmd.where_is('foo')
201    TestCmd.where_is('foo', 'PATH1:PATH2')
202    TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
203"""
204
205# Copyright 2000-2010 Steven Knight
206# This module is free software, and you may redistribute it and/or modify
207# it under the same terms as Python itself, so long as this copyright message
208# and disclaimer are retained in their original form.
209#
210# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
211# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
212# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
213# DAMAGE.
214#
215# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
216# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
217# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
218# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
219# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
220
221from __future__ import print_function
222
223__author__ = "Steven Knight <knight at baldmt dot com>"
224__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
225__version__ = "0.37"
226
227import errno
228import os
229import os.path
230import re
231import shutil
232import stat
233import sys
234import tempfile
235import time
236import traceback
237try:
238    from UserList import UserList
239except ImportError:
240    from collections import UserList
241
242__all__ = [
243    'diff_re',
244    'fail_test',
245    'no_result',
246    'pass_test',
247    'match_exact',
248    'match_re',
249    'match_re_dotall',
250    'python_executable',
251    'TestCmd'
252]
253
254try:
255    import difflib
256except ImportError:
257    __all__.append('simple_diff')
258
259def is_List(e):
260    return (type(e) is list) or isinstance(e, UserList)
261
262try:
263    from UserString import UserString
264except ImportError:
265    try:
266        from collections import UserString
267    except ImportError:
268        class UserString:
269            pass
270
271try:
272  # basestring was removed in python3.
273  basestring
274except NameError:
275  basestring = str
276
277def is_String(e):
278    return isinstance(e, basestring) or isinstance(e, UserString)
279
280tempfile.template = 'testcmd.'
281if os.name in ('posix', 'nt'):
282    tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
283else:
284    tempfile.template = 'testcmd.'
285
286re_space = re.compile('\s')
287
288_Cleanup = []
289
290_chain_to_exitfunc = None
291
292def _clean():
293    global _Cleanup
294    for test in reversed(_Cleanup):
295        if test:
296            test.cleanup()
297    del _Cleanup[:]
298    if _chain_to_exitfunc:
299        _chain_to_exitfunc()
300
301try:
302    import atexit
303except ImportError:
304    # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
305    try:
306        _chain_to_exitfunc = sys.exitfunc
307    except AttributeError:
308        pass
309    sys.exitfunc = _clean
310else:
311    atexit.register(_clean)
312
313try:
314    zip
315except NameError:
316    def zip(*lists):
317        result = []
318        for i in range(min(map(len, lists))):
319            result.append(tuple(map(lambda l, i=i: l[i], lists)))
320        return result
321
322class Collector:
323    def __init__(self, top):
324        self.entries = [top]
325    def __call__(self, arg, dirname, names):
326        pathjoin = lambda n, d=dirname: os.path.join(d, n)
327        self.entries.extend(map(pathjoin, names))
328
329def _caller(tblist, skip):
330    string = ""
331    arr = []
332    for file, line, name, text in tblist:
333        if file[-10:] == "TestCmd.py":
334                break
335        arr = [(file, line, name, text)] + arr
336    atfrom = "at"
337    for file, line, name, text in arr[skip:]:
338        if name in ("?", "<module>"):
339            name = ""
340        else:
341            name = " (" + name + ")"
342        string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
343        atfrom = "\tfrom"
344    return string
345
346def fail_test(self = None, condition = 1, function = None, skip = 0):
347    """Cause the test to fail.
348
349    By default, the fail_test() method reports that the test FAILED
350    and exits with a status of 1.  If a condition argument is supplied,
351    the test fails only if the condition is true.
352    """
353    if not condition:
354        return
355    if not function is None:
356        function()
357    of = ""
358    desc = ""
359    sep = " "
360    if not self is None:
361        if self.program:
362            of = " of " + self.program
363            sep = "\n\t"
364        if self.description:
365            desc = " [" + self.description + "]"
366            sep = "\n\t"
367
368    at = _caller(traceback.extract_stack(), skip)
369    sys.stderr.write("FAILED test" + of + desc + sep + at)
370
371    sys.exit(1)
372
373def no_result(self = None, condition = 1, function = None, skip = 0):
374    """Causes a test to exit with no valid result.
375
376    By default, the no_result() method reports NO RESULT for the test
377    and exits with a status of 2.  If a condition argument is supplied,
378    the test fails only if the condition is true.
379    """
380    if not condition:
381        return
382    if not function is None:
383        function()
384    of = ""
385    desc = ""
386    sep = " "
387    if not self is None:
388        if self.program:
389            of = " of " + self.program
390            sep = "\n\t"
391        if self.description:
392            desc = " [" + self.description + "]"
393            sep = "\n\t"
394
395    if os.environ.get('TESTCMD_DEBUG_SKIPS'):
396        at = _caller(traceback.extract_stack(), skip)
397        sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
398    else:
399        sys.stderr.write("NO RESULT\n")
400
401    sys.exit(2)
402
403def pass_test(self = None, condition = 1, function = None):
404    """Causes a test to pass.
405
406    By default, the pass_test() method reports PASSED for the test
407    and exits with a status of 0.  If a condition argument is supplied,
408    the test passes only if the condition is true.
409    """
410    if not condition:
411        return
412    if not function is None:
413        function()
414    sys.stderr.write("PASSED\n")
415    sys.exit(0)
416
417def match_exact(lines = None, matches = None):
418    """
419    """
420    if not is_List(lines):
421        lines = lines.split("\n")
422    if not is_List(matches):
423        matches = matches.split("\n")
424    if len(lines) != len(matches):
425        return
426    for i in range(len(lines)):
427        if lines[i] != matches[i]:
428            return
429    return 1
430
431def match_re(lines = None, res = None):
432    """
433    """
434    if not is_List(lines):
435        lines = lines.split("\n")
436    if not is_List(res):
437        res = res.split("\n")
438    if len(lines) != len(res):
439        return
440    for i in range(len(lines)):
441        s = "^" + res[i] + "$"
442        try:
443            expr = re.compile(s)
444        except re.error as e:
445            msg = "Regular expression error in %s: %s"
446            raise re.error(msg % (repr(s), e[0]))
447        if not expr.search(lines[i]):
448            return
449    return 1
450
451def match_re_dotall(lines = None, res = None):
452    """
453    """
454    if not type(lines) is type(""):
455        lines = "\n".join(lines)
456    if not type(res) is type(""):
457        res = "\n".join(res)
458    s = "^" + res + "$"
459    try:
460        expr = re.compile(s, re.DOTALL)
461    except re.error as e:
462        msg = "Regular expression error in %s: %s"
463        raise re.error(msg % (repr(s), e[0]))
464    if expr.match(lines):
465        return 1
466
467try:
468    import difflib
469except ImportError:
470    pass
471else:
472    def simple_diff(a, b, fromfile='', tofile='',
473                    fromfiledate='', tofiledate='', n=3, lineterm='\n'):
474        """
475        A function with the same calling signature as difflib.context_diff
476        (diff -c) and difflib.unified_diff (diff -u) but which prints
477        output like the simple, unadorned 'diff" command.
478        """
479        sm = difflib.SequenceMatcher(None, a, b)
480        def comma(x1, x2):
481            return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
482        result = []
483        for op, a1, a2, b1, b2 in sm.get_opcodes():
484            if op == 'delete':
485                result.append("%sd%d" % (comma(a1, a2), b1))
486                result.extend(map(lambda l: '< ' + l, a[a1:a2]))
487            elif op == 'insert':
488                result.append("%da%s" % (a1, comma(b1, b2)))
489                result.extend(map(lambda l: '> ' + l, b[b1:b2]))
490            elif op == 'replace':
491                result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
492                result.extend(map(lambda l: '< ' + l, a[a1:a2]))
493                result.append('---')
494                result.extend(map(lambda l: '> ' + l, b[b1:b2]))
495        return result
496
497def diff_re(a, b, fromfile='', tofile='',
498                fromfiledate='', tofiledate='', n=3, lineterm='\n'):
499    """
500    A simple "diff" of two sets of lines when the expected lines
501    are regular expressions.  This is a really dumb thing that
502    just compares each line in turn, so it doesn't look for
503    chunks of matching lines and the like--but at least it lets
504    you know exactly which line first didn't compare correctl...
505    """
506    result = []
507    diff = len(a) - len(b)
508    if diff < 0:
509        a = a + ['']*(-diff)
510    elif diff > 0:
511        b = b + ['']*diff
512    i = 0
513    for aline, bline in zip(a, b):
514        s = "^" + aline + "$"
515        try:
516            expr = re.compile(s)
517        except re.error as e:
518            msg = "Regular expression error in %s: %s"
519            raise re.error(msg % (repr(s), e[0]))
520        if not expr.search(bline):
521            result.append("%sc%s" % (i+1, i+1))
522            result.append('< ' + repr(a[i]))
523            result.append('---')
524            result.append('> ' + repr(b[i]))
525        i = i+1
526    return result
527
528if os.name == 'java':
529
530    python_executable = os.path.join(sys.prefix, 'jython')
531
532else:
533
534    python_executable = sys.executable
535
536if sys.platform == 'win32':
537
538    default_sleep_seconds = 2
539
540    def where_is(file, path=None, pathext=None):
541        if path is None:
542            path = os.environ['PATH']
543        if is_String(path):
544            path = path.split(os.pathsep)
545        if pathext is None:
546            pathext = os.environ['PATHEXT']
547        if is_String(pathext):
548            pathext = pathext.split(os.pathsep)
549        for ext in pathext:
550            if ext.lower() == file[-len(ext):].lower():
551                pathext = ['']
552                break
553        for dir in path:
554            f = os.path.join(dir, file)
555            for ext in pathext:
556                fext = f + ext
557                if os.path.isfile(fext):
558                    return fext
559        return None
560
561else:
562
563    def where_is(file, path=None, pathext=None):
564        if path is None:
565            path = os.environ['PATH']
566        if is_String(path):
567            path = path.split(os.pathsep)
568        for dir in path:
569            f = os.path.join(dir, file)
570            if os.path.isfile(f):
571                try:
572                    st = os.stat(f)
573                except OSError:
574                    continue
575                if stat.S_IMODE(st[stat.ST_MODE]) & 0o111:
576                    return f
577        return None
578
579    default_sleep_seconds = 1
580
581
582
583try:
584    import subprocess
585except ImportError:
586    # The subprocess module doesn't exist in this version of Python,
587    # so we're going to cobble up something that looks just enough
588    # like its API for our purposes below.
589    import new
590
591    subprocess = new.module('subprocess')
592
593    subprocess.PIPE = 'PIPE'
594    subprocess.STDOUT = 'STDOUT'
595    subprocess.mswindows = (sys.platform == 'win32')
596
597    try:
598        import popen2
599        popen2.Popen3
600    except AttributeError:
601        class Popen3:
602            universal_newlines = 1
603            def __init__(self, command, **kw):
604                if sys.platform == 'win32' and command[0] == '"':
605                    command = '"' + command + '"'
606                (stdin, stdout, stderr) = os.popen3(' ' + command)
607                self.stdin = stdin
608                self.stdout = stdout
609                self.stderr = stderr
610            def close_output(self):
611                self.stdout.close()
612                self.resultcode = self.stderr.close()
613            def wait(self):
614                resultcode = self.resultcode
615                if os.WIFEXITED(resultcode):
616                    return os.WEXITSTATUS(resultcode)
617                elif os.WIFSIGNALED(resultcode):
618                    return os.WTERMSIG(resultcode)
619                else:
620                    return None
621
622    else:
623        try:
624            popen2.Popen4
625        except AttributeError:
626            # A cribbed Popen4 class, with some retrofitted code from
627            # the Python 1.5 Popen3 class methods to do certain things
628            # by hand.
629            class Popen4(popen2.Popen3):
630                childerr = None
631
632                def __init__(self, cmd, bufsize=-1):
633                    p2cread, p2cwrite = os.pipe()
634                    c2pread, c2pwrite = os.pipe()
635                    self.pid = os.fork()
636                    if self.pid == 0:
637                        # Child
638                        os.dup2(p2cread, 0)
639                        os.dup2(c2pwrite, 1)
640                        os.dup2(c2pwrite, 2)
641                        for i in range(3, popen2.MAXFD):
642                            try:
643                                os.close(i)
644                            except: pass
645                        try:
646                            os.execvp(cmd[0], cmd)
647                        finally:
648                            os._exit(1)
649                        # Shouldn't come here, I guess
650                        os._exit(1)
651                    os.close(p2cread)
652                    self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
653                    os.close(c2pwrite)
654                    self.fromchild = os.fdopen(c2pread, 'r', bufsize)
655                    popen2._active.append(self)
656
657            popen2.Popen4 = Popen4
658
659        class Popen3(popen2.Popen3, popen2.Popen4):
660            universal_newlines = 1
661            def __init__(self, command, **kw):
662                if kw.get('stderr') == 'STDOUT':
663                    popen2.Popen4.__init__(self, command, 1)
664                else:
665                    popen2.Popen3.__init__(self, command, 1)
666                self.stdin = self.tochild
667                self.stdout = self.fromchild
668                self.stderr = self.childerr
669            def wait(self, *args, **kw):
670                resultcode = popen2.Popen3.wait(self, *args, **kw)
671                if os.WIFEXITED(resultcode):
672                    return os.WEXITSTATUS(resultcode)
673                elif os.WIFSIGNALED(resultcode):
674                    return os.WTERMSIG(resultcode)
675                else:
676                    return None
677
678    subprocess.Popen = Popen3
679
680
681
682# From Josiah Carlson,
683# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
684# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
685
686PIPE = subprocess.PIPE
687
688if sys.platform == 'win32':
689    from win32file import ReadFile, WriteFile
690    from win32pipe import PeekNamedPipe
691    import msvcrt
692else:
693    import select
694    import fcntl
695
696    try:                    fcntl.F_GETFL
697    except AttributeError:  fcntl.F_GETFL = 3
698
699    try:                    fcntl.F_SETFL
700    except AttributeError:  fcntl.F_SETFL = 4
701
702class Popen(subprocess.Popen):
703    def recv(self, maxsize=None):
704        return self._recv('stdout', maxsize)
705
706    def recv_err(self, maxsize=None):
707        return self._recv('stderr', maxsize)
708
709    def send_recv(self, input='', maxsize=None):
710        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
711
712    def get_conn_maxsize(self, which, maxsize):
713        if maxsize is None:
714            maxsize = 1024
715        elif maxsize < 1:
716            maxsize = 1
717        return getattr(self, which), maxsize
718
719    def _close(self, which):
720        getattr(self, which).close()
721        setattr(self, which, None)
722
723    if sys.platform == 'win32':
724        def send(self, input):
725            if not self.stdin:
726                return None
727
728            try:
729                x = msvcrt.get_osfhandle(self.stdin.fileno())
730                (errCode, written) = WriteFile(x, input)
731            except ValueError:
732                return self._close('stdin')
733            except (subprocess.pywintypes.error, Exception) as why:
734                if why[0] in (109, errno.ESHUTDOWN):
735                    return self._close('stdin')
736                raise
737
738            return written
739
740        def _recv(self, which, maxsize):
741            conn, maxsize = self.get_conn_maxsize(which, maxsize)
742            if conn is None:
743                return None
744
745            try:
746                x = msvcrt.get_osfhandle(conn.fileno())
747                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
748                if maxsize < nAvail:
749                    nAvail = maxsize
750                if nAvail > 0:
751                    (errCode, read) = ReadFile(x, nAvail, None)
752            except ValueError:
753                return self._close(which)
754            except (subprocess.pywintypes.error, Exception) as why:
755                if why[0] in (109, errno.ESHUTDOWN):
756                    return self._close(which)
757                raise
758
759            #if self.universal_newlines:
760            #    read = self._translate_newlines(read)
761            return read
762
763    else:
764        def send(self, input):
765            if not self.stdin:
766                return None
767
768            if not select.select([], [self.stdin], [], 0)[1]:
769                return 0
770
771            try:
772                written = os.write(self.stdin.fileno(), input)
773            except OSError as why:
774                if why[0] == errno.EPIPE: #broken pipe
775                    return self._close('stdin')
776                raise
777
778            return written
779
780        def _recv(self, which, maxsize):
781            conn, maxsize = self.get_conn_maxsize(which, maxsize)
782            if conn is None:
783                return None
784
785            try:
786                flags = fcntl.fcntl(conn, fcntl.F_GETFL)
787            except TypeError:
788                flags = None
789            else:
790                if not conn.closed:
791                    fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
792
793            try:
794                if not select.select([conn], [], [], 0)[0]:
795                    return ''
796
797                r = conn.read(maxsize)
798                if not r:
799                    return self._close(which)
800
801                #if self.universal_newlines:
802                #    r = self._translate_newlines(r)
803                return r
804            finally:
805                if not conn.closed and not flags is None:
806                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
807
808disconnect_message = "Other end disconnected!"
809
810def recv_some(p, t=.1, e=1, tr=5, stderr=0):
811    if tr < 1:
812        tr = 1
813    x = time.time()+t
814    y = []
815    r = ''
816    pr = p.recv
817    if stderr:
818        pr = p.recv_err
819    while time.time() < x or r:
820        r = pr()
821        if r is None:
822            if e:
823                raise Exception(disconnect_message)
824            else:
825                break
826        elif r:
827            y.append(r)
828        else:
829            time.sleep(max((x-time.time())/tr, 0))
830    return ''.join(y)
831
832def send_all(p, data):
833    data = memoryview(data)
834    while len(data):
835        sent = p.send(data)
836        if sent is None:
837            raise Exception(disconnect_message)
838        data = data[sent:]
839
840
841
842class TestCmd(object):
843    """Class TestCmd
844    """
845
846    def __init__(self, description = None,
847                       program = None,
848                       interpreter = None,
849                       workdir = None,
850                       subdir = None,
851                       verbose = None,
852                       match = None,
853                       diff = None,
854                       combine = 0,
855                       universal_newlines = 1):
856        self._cwd = os.getcwd()
857        self.description_set(description)
858        self.program_set(program)
859        self.interpreter_set(interpreter)
860        if verbose is None:
861            try:
862                verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
863            except ValueError:
864                verbose = 0
865        self.verbose_set(verbose)
866        self.combine = combine
867        self.universal_newlines = universal_newlines
868        if match is not None:
869            self.match_function = match
870        else:
871            self.match_function = match_re
872        if diff is not None:
873            self.diff_function = diff
874        else:
875            try:
876                difflib
877            except NameError:
878                pass
879            else:
880                self.diff_function = simple_diff
881                #self.diff_function = difflib.context_diff
882                #self.diff_function = difflib.unified_diff
883        self._dirlist = []
884        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
885        if 'PRESERVE' in os.environ and os.environ['PRESERVE'] is not '':
886            self._preserve['pass_test'] = os.environ['PRESERVE']
887            self._preserve['fail_test'] = os.environ['PRESERVE']
888            self._preserve['no_result'] = os.environ['PRESERVE']
889        else:
890            try:
891                self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
892            except KeyError:
893                pass
894            try:
895                self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
896            except KeyError:
897                pass
898            try:
899                self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
900            except KeyError:
901                pass
902        self._stdout = []
903        self._stderr = []
904        self.status = None
905        self.condition = 'no_result'
906        self.workdir_set(workdir)
907        self.subdir(subdir)
908
909    def __del__(self):
910        self.cleanup()
911
912    def __repr__(self):
913        return "%x" % id(self)
914
915    banner_char = '='
916    banner_width = 80
917
918    def banner(self, s, width=None):
919        if width is None:
920            width = self.banner_width
921        return s + self.banner_char * (width - len(s))
922
923    if os.name == 'posix':
924
925        def escape(self, arg):
926            "escape shell special characters"
927            slash = '\\'
928            special = '"$'
929
930            arg = arg.replace(slash, slash+slash)
931            for c in special:
932                arg = arg.replace(c, slash+c)
933
934            if re_space.search(arg):
935                arg = '"' + arg + '"'
936            return arg
937
938    else:
939
940        # Windows does not allow special characters in file names
941        # anyway, so no need for an escape function, we will just quote
942        # the arg.
943        def escape(self, arg):
944            if re_space.search(arg):
945                arg = '"' + arg + '"'
946            return arg
947
948    def canonicalize(self, path):
949        if is_List(path):
950            path = os.path.join(*path)
951        if not os.path.isabs(path):
952            path = os.path.join(self.workdir, path)
953        return path
954
955    def chmod(self, path, mode):
956        """Changes permissions on the specified file or directory
957        path name."""
958        path = self.canonicalize(path)
959        os.chmod(path, mode)
960
961    def cleanup(self, condition = None):
962        """Removes any temporary working directories for the specified
963        TestCmd environment.  If the environment variable PRESERVE was
964        set when the TestCmd environment was created, temporary working
965        directories are not removed.  If any of the environment variables
966        PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
967        when the TestCmd environment was created, then temporary working
968        directories are not removed if the test passed, failed, or had
969        no result, respectively.  Temporary working directories are also
970        preserved for conditions specified via the preserve method.
971
972        Typically, this method is not called directly, but is used when
973        the script exits to clean up temporary working directories as
974        appropriate for the exit status.
975        """
976        if not self._dirlist:
977            return
978        os.chdir(self._cwd)
979        self.workdir = None
980        if condition is None:
981            condition = self.condition
982        if self._preserve[condition]:
983            for dir in self._dirlist:
984                print("Preserved directory", dir)
985        else:
986            list = self._dirlist[:]
987            list.reverse()
988            for dir in list:
989                self.writable(dir, 1)
990                shutil.rmtree(dir, ignore_errors = 1)
991            self._dirlist = []
992
993        try:
994            global _Cleanup
995            _Cleanup.remove(self)
996        except (AttributeError, ValueError):
997            pass
998
999    def command_args(self, program = None,
1000                           interpreter = None,
1001                           arguments = None):
1002        if program:
1003            if type(program) == type('') and not os.path.isabs(program):
1004                program = os.path.join(self._cwd, program)
1005        else:
1006            program = self.program
1007            if not interpreter:
1008                interpreter = self.interpreter
1009        if not type(program) in [type([]), type(())]:
1010            program = [program]
1011        cmd = list(program)
1012        if interpreter:
1013            if not type(interpreter) in [type([]), type(())]:
1014                interpreter = [interpreter]
1015            cmd = list(interpreter) + cmd
1016        if arguments:
1017            if type(arguments) == type(''):
1018                arguments = arguments.split()
1019            cmd.extend(arguments)
1020        return cmd
1021
1022    def description_set(self, description):
1023        """Set the description of the functionality being tested.
1024        """
1025        self.description = description
1026
1027    try:
1028        difflib
1029    except NameError:
1030        def diff(self, a, b, name, *args, **kw):
1031            print(self.banner('Expected %s' % name))
1032            print(a)
1033            print(self.banner('Actual %s' % name))
1034            print(b)
1035    else:
1036        def diff(self, a, b, name, *args, **kw):
1037            print(self.banner(name))
1038            args = (a.splitlines(), b.splitlines()) + args
1039            lines = self.diff_function(*args, **kw)
1040            for l in lines:
1041                print(l)
1042
1043    def fail_test(self, condition = 1, function = None, skip = 0):
1044        """Cause the test to fail.
1045        """
1046        if not condition:
1047            return
1048        self.condition = 'fail_test'
1049        fail_test(self = self,
1050                  condition = condition,
1051                  function = function,
1052                  skip = skip)
1053
1054    def interpreter_set(self, interpreter):
1055        """Set the program to be used to interpret the program
1056        under test as a script.
1057        """
1058        self.interpreter = interpreter
1059
1060    def match(self, lines, matches):
1061        """Compare actual and expected file contents.
1062        """
1063        return self.match_function(lines, matches)
1064
1065    def match_exact(self, lines, matches):
1066        """Compare actual and expected file contents.
1067        """
1068        return match_exact(lines, matches)
1069
1070    def match_re(self, lines, res):
1071        """Compare actual and expected file contents.
1072        """
1073        return match_re(lines, res)
1074
1075    def match_re_dotall(self, lines, res):
1076        """Compare actual and expected file contents.
1077        """
1078        return match_re_dotall(lines, res)
1079
1080    def no_result(self, condition = 1, function = None, skip = 0):
1081        """Report that the test could not be run.
1082        """
1083        if not condition:
1084            return
1085        self.condition = 'no_result'
1086        no_result(self = self,
1087                  condition = condition,
1088                  function = function,
1089                  skip = skip)
1090
1091    def pass_test(self, condition = 1, function = None):
1092        """Cause the test to pass.
1093        """
1094        if not condition:
1095            return
1096        self.condition = 'pass_test'
1097        pass_test(self = self, condition = condition, function = function)
1098
1099    def preserve(self, *conditions):
1100        """Arrange for the temporary working directories for the
1101        specified TestCmd environment to be preserved for one or more
1102        conditions.  If no conditions are specified, arranges for
1103        the temporary working directories to be preserved for all
1104        conditions.
1105        """
1106        if conditions is ():
1107            conditions = ('pass_test', 'fail_test', 'no_result')
1108        for cond in conditions:
1109            self._preserve[cond] = 1
1110
1111    def program_set(self, program):
1112        """Set the executable program or script to be tested.
1113        """
1114        if program and not os.path.isabs(program):
1115            program = os.path.join(self._cwd, program)
1116        self.program = program
1117
1118    def read(self, file, mode = 'r'):
1119        """Reads and returns the contents of the specified file name.
1120        The file name may be a list, in which case the elements are
1121        concatenated with the os.path.join() method.  The file is
1122        assumed to be under the temporary working directory unless it
1123        is an absolute path name.  The I/O mode for the file may
1124        be specified; it must begin with an 'r'.  The default is
1125        'r' (string read).
1126        """
1127        file = self.canonicalize(file)
1128        if mode[0] != 'r':
1129            raise ValueError("mode must begin with 'r'")
1130        with open(file, mode) as f:
1131            result = f.read()
1132        return result
1133
1134    def rmdir(self, dir):
1135        """Removes the specified dir name.
1136        The dir name may be a list, in which case the elements are
1137        concatenated with the os.path.join() method.  The dir is
1138        assumed to be under the temporary working directory unless it
1139        is an absolute path name.
1140        The dir must be empty.
1141        """
1142        dir = self.canonicalize(dir)
1143        os.rmdir(dir)
1144
1145    def start(self, program = None,
1146                    interpreter = None,
1147                    arguments = None,
1148                    universal_newlines = None,
1149                    **kw):
1150        """
1151        Starts a program or script for the test environment.
1152
1153        The specified program will have the original directory
1154        prepended unless it is enclosed in a [list].
1155        """
1156        cmd = self.command_args(program, interpreter, arguments)
1157        cmd_string = ' '.join(map(self.escape, cmd))
1158        if self.verbose:
1159            sys.stderr.write(cmd_string + "\n")
1160        if universal_newlines is None:
1161            universal_newlines = self.universal_newlines
1162
1163        # On Windows, if we make stdin a pipe when we plan to send
1164        # no input, and the test program exits before
1165        # Popen calls msvcrt.open_osfhandle, that call will fail.
1166        # So don't use a pipe for stdin if we don't need one.
1167        stdin = kw.get('stdin', None)
1168        if stdin is not None:
1169            stdin = subprocess.PIPE
1170
1171        combine = kw.get('combine', self.combine)
1172        if combine:
1173            stderr_value = subprocess.STDOUT
1174        else:
1175            stderr_value = subprocess.PIPE
1176
1177        return Popen(cmd,
1178                     stdin=stdin,
1179                     stdout=subprocess.PIPE,
1180                     stderr=stderr_value,
1181                     universal_newlines=universal_newlines)
1182
1183    def finish(self, popen, **kw):
1184        """
1185        Finishes and waits for the process being run under control of
1186        the specified popen argument, recording the exit status,
1187        standard output and error output.
1188        """
1189        popen.stdin.close()
1190        self.status = popen.wait()
1191        if not self.status:
1192            self.status = 0
1193        self._stdout.append(popen.stdout.read())
1194        if popen.stderr:
1195            stderr = popen.stderr.read()
1196        else:
1197            stderr = ''
1198        self._stderr.append(stderr)
1199
1200    def run(self, program = None,
1201                  interpreter = None,
1202                  arguments = None,
1203                  chdir = None,
1204                  stdin = None,
1205                  universal_newlines = None):
1206        """Runs a test of the program or script for the test
1207        environment.  Standard output and error output are saved for
1208        future retrieval via the stdout() and stderr() methods.
1209
1210        The specified program will have the original directory
1211        prepended unless it is enclosed in a [list].
1212        """
1213        if chdir:
1214            oldcwd = os.getcwd()
1215            if not os.path.isabs(chdir):
1216                chdir = os.path.join(self.workpath(chdir))
1217            if self.verbose:
1218                sys.stderr.write("chdir(" + chdir + ")\n")
1219            os.chdir(chdir)
1220        p = self.start(program,
1221                       interpreter,
1222                       arguments,
1223                       universal_newlines,
1224                       stdin=stdin)
1225        if stdin:
1226            if is_List(stdin):
1227                for line in stdin:
1228                    p.stdin.write(line)
1229            else:
1230                p.stdin.write(stdin)
1231            p.stdin.close()
1232
1233        out = p.stdout.read()
1234        if p.stderr is None:
1235            err = ''
1236        else:
1237            err = p.stderr.read()
1238        try:
1239            close_output = p.close_output
1240        except AttributeError:
1241            p.stdout.close()
1242            if not p.stderr is None:
1243                p.stderr.close()
1244        else:
1245            close_output()
1246
1247        self._stdout.append(out)
1248        self._stderr.append(err)
1249
1250        self.status = p.wait()
1251        if not self.status:
1252            self.status = 0
1253
1254        if chdir:
1255            os.chdir(oldcwd)
1256        if self.verbose >= 2:
1257            write = sys.stdout.write
1258            write('============ STATUS: %d\n' % self.status)
1259            out = self.stdout()
1260            if out or self.verbose >= 3:
1261                write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1262                write(out)
1263                write('============ END STDOUT\n')
1264            err = self.stderr()
1265            if err or self.verbose >= 3:
1266                write('============ BEGIN STDERR (len=%d)\n' % len(err))
1267                write(err)
1268                write('============ END STDERR\n')
1269
1270    def sleep(self, seconds = default_sleep_seconds):
1271        """Sleeps at least the specified number of seconds.  If no
1272        number is specified, sleeps at least the minimum number of
1273        seconds necessary to advance file time stamps on the current
1274        system.  Sleeping more seconds is all right.
1275        """
1276        time.sleep(seconds)
1277
1278    def stderr(self, run = None):
1279        """Returns the error output from the specified run number.
1280        If there is no specified run number, then returns the error
1281        output of the last run.  If the run number is less than zero,
1282        then returns the error output from that many runs back from the
1283        current run.
1284        """
1285        if not run:
1286            run = len(self._stderr)
1287        elif run < 0:
1288            run = len(self._stderr) + run
1289        run = run - 1
1290        return self._stderr[run]
1291
1292    def stdout(self, run = None):
1293        """Returns the standard output from the specified run number.
1294        If there is no specified run number, then returns the standard
1295        output of the last run.  If the run number is less than zero,
1296        then returns the standard output from that many runs back from
1297        the current run.
1298        """
1299        if not run:
1300            run = len(self._stdout)
1301        elif run < 0:
1302            run = len(self._stdout) + run
1303        run = run - 1
1304        return self._stdout[run]
1305
1306    def subdir(self, *subdirs):
1307        """Create new subdirectories under the temporary working
1308        directory, one for each argument.  An argument may be a list,
1309        in which case the list elements are concatenated using the
1310        os.path.join() method.  Subdirectories multiple levels deep
1311        must be created using a separate argument for each level:
1312
1313                test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1314
1315        Returns the number of subdirectories actually created.
1316        """
1317        count = 0
1318        for sub in subdirs:
1319            if sub is None:
1320                continue
1321            if is_List(sub):
1322                sub = os.path.join(*sub)
1323            new = os.path.join(self.workdir, sub)
1324            try:
1325                os.mkdir(new)
1326            except OSError:
1327                pass
1328            else:
1329                count = count + 1
1330        return count
1331
1332    def symlink(self, target, link):
1333        """Creates a symlink to the specified target.
1334        The link name may be a list, in which case the elements are
1335        concatenated with the os.path.join() method.  The link is
1336        assumed to be under the temporary working directory unless it
1337        is an absolute path name. The target is *not* assumed to be
1338        under the temporary working directory.
1339        """
1340        link = self.canonicalize(link)
1341        os.symlink(target, link)
1342
1343    def tempdir(self, path=None):
1344        """Creates a temporary directory.
1345        A unique directory name is generated if no path name is specified.
1346        The directory is created, and will be removed when the TestCmd
1347        object is destroyed.
1348        """
1349        if path is None:
1350            try:
1351                path = tempfile.mktemp(prefix=tempfile.template)
1352            except TypeError:
1353                path = tempfile.mktemp()
1354        os.mkdir(path)
1355
1356        # Symlinks in the path will report things
1357        # differently from os.getcwd(), so chdir there
1358        # and back to fetch the canonical path.
1359        cwd = os.getcwd()
1360        try:
1361            os.chdir(path)
1362            path = os.getcwd()
1363        finally:
1364            os.chdir(cwd)
1365
1366        # Uppercase the drive letter since the case of drive
1367        # letters is pretty much random on win32:
1368        drive,rest = os.path.splitdrive(path)
1369        if drive:
1370            path = drive.upper() + rest
1371
1372        #
1373        self._dirlist.append(path)
1374        global _Cleanup
1375        try:
1376            _Cleanup.index(self)
1377        except ValueError:
1378            _Cleanup.append(self)
1379
1380        return path
1381
1382    def touch(self, path, mtime=None):
1383        """Updates the modification time on the specified file or
1384        directory path name.  The default is to update to the
1385        current time if no explicit modification time is specified.
1386        """
1387        path = self.canonicalize(path)
1388        atime = os.path.getatime(path)
1389        if mtime is None:
1390            mtime = time.time()
1391        os.utime(path, (atime, mtime))
1392
1393    def unlink(self, file):
1394        """Unlinks the specified file name.
1395        The file name may be a list, in which case the elements are
1396        concatenated with the os.path.join() method.  The file is
1397        assumed to be under the temporary working directory unless it
1398        is an absolute path name.
1399        """
1400        file = self.canonicalize(file)
1401        os.unlink(file)
1402
1403    def verbose_set(self, verbose):
1404        """Set the verbose level.
1405        """
1406        self.verbose = verbose
1407
1408    def where_is(self, file, path=None, pathext=None):
1409        """Find an executable file.
1410        """
1411        if is_List(file):
1412            file = os.path.join(*file)
1413        if not os.path.isabs(file):
1414            file = where_is(file, path, pathext)
1415        return file
1416
1417    def workdir_set(self, path):
1418        """Creates a temporary working directory with the specified
1419        path name.  If the path is a null string (''), a unique
1420        directory name is created.
1421        """
1422        if (path != None):
1423            if path == '':
1424                path = None
1425            path = self.tempdir(path)
1426        self.workdir = path
1427
1428    def workpath(self, *args):
1429        """Returns the absolute path name to a subdirectory or file
1430        within the current temporary working directory.  Concatenates
1431        the temporary working directory name with the specified
1432        arguments using the os.path.join() method.
1433        """
1434        return os.path.join(self.workdir, *args)
1435
1436    def readable(self, top, read=1):
1437        """Make the specified directory tree readable (read == 1)
1438        or not (read == None).
1439
1440        This method has no effect on Windows systems, which use a
1441        completely different mechanism to control file readability.
1442        """
1443
1444        if sys.platform == 'win32':
1445            return
1446
1447        if read:
1448            def do_chmod(fname):
1449                try: st = os.stat(fname)
1450                except OSError: pass
1451                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1452        else:
1453            def do_chmod(fname):
1454                try: st = os.stat(fname)
1455                except OSError: pass
1456                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1457
1458        if os.path.isfile(top):
1459            # If it's a file, that's easy, just chmod it.
1460            do_chmod(top)
1461        elif read:
1462            # It's a directory and we're trying to turn on read
1463            # permission, so it's also pretty easy, just chmod the
1464            # directory and then chmod every entry on our walk down the
1465            # tree.  Because os.walk() is top-down, we'll enable
1466            # read permission on any directories that have it disabled
1467            # before os.walk() tries to list their contents.
1468            do_chmod(top)
1469
1470            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1471                for n in names:
1472                    do_chmod(os.path.join(dirname, n))
1473
1474            os.walk(top, chmod_entries, None)
1475        else:
1476            # It's a directory and we're trying to turn off read
1477            # permission, which means we have to chmod the directoreis
1478            # in the tree bottom-up, lest disabling read permission from
1479            # the top down get in the way of being able to get at lower
1480            # parts of the tree.  But os.walk() visits things top
1481            # down, so we just use an object to collect a list of all
1482            # of the entries in the tree, reverse the list, and then
1483            # chmod the reversed (bottom-up) list.
1484            col = Collector(top)
1485            os.walk(top, col, None)
1486            col.entries.reverse()
1487            for d in col.entries: do_chmod(d)
1488
1489    def writable(self, top, write=1):
1490        """Make the specified directory tree writable (write == 1)
1491        or not (write == None).
1492        """
1493
1494        if sys.platform == 'win32':
1495
1496            if write:
1497                def do_chmod(fname):
1498                    try: os.chmod(fname, stat.S_IWRITE)
1499                    except OSError: pass
1500            else:
1501                def do_chmod(fname):
1502                    try: os.chmod(fname, stat.S_IREAD)
1503                    except OSError: pass
1504
1505        else:
1506
1507            if write:
1508                def do_chmod(fname):
1509                    try: st = os.stat(fname)
1510                    except OSError: pass
1511                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0o200))
1512            else:
1513                def do_chmod(fname):
1514                    try: st = os.stat(fname)
1515                    except OSError: pass
1516                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0o200))
1517
1518        if os.path.isfile(top):
1519            do_chmod(top)
1520        else:
1521            col = Collector(top)
1522            os.walk(top, col, None)
1523            for d in col.entries: do_chmod(d)
1524
1525    def executable(self, top, execute=1):
1526        """Make the specified directory tree executable (execute == 1)
1527        or not (execute == None).
1528
1529        This method has no effect on Windows systems, which use a
1530        completely different mechanism to control file executability.
1531        """
1532
1533        if sys.platform == 'win32':
1534            return
1535
1536        if execute:
1537            def do_chmod(fname):
1538                try: st = os.stat(fname)
1539                except OSError: pass
1540                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1541        else:
1542            def do_chmod(fname):
1543                try: st = os.stat(fname)
1544                except OSError: pass
1545                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1546
1547        if os.path.isfile(top):
1548            # If it's a file, that's easy, just chmod it.
1549            do_chmod(top)
1550        elif execute:
1551            # It's a directory and we're trying to turn on execute
1552            # permission, so it's also pretty easy, just chmod the
1553            # directory and then chmod every entry on our walk down the
1554            # tree.  Because os.walk() is top-down, we'll enable
1555            # execute permission on any directories that have it disabled
1556            # before os.walk() tries to list their contents.
1557            do_chmod(top)
1558
1559            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1560                for n in names:
1561                    do_chmod(os.path.join(dirname, n))
1562
1563            os.walk(top, chmod_entries, None)
1564        else:
1565            # It's a directory and we're trying to turn off execute
1566            # permission, which means we have to chmod the directories
1567            # in the tree bottom-up, lest disabling execute permission from
1568            # the top down get in the way of being able to get at lower
1569            # parts of the tree.  But os.walk() visits things top
1570            # down, so we just use an object to collect a list of all
1571            # of the entries in the tree, reverse the list, and then
1572            # chmod the reversed (bottom-up) list.
1573            col = Collector(top)
1574            os.walk(top, col, None)
1575            col.entries.reverse()
1576            for d in col.entries: do_chmod(d)
1577
1578    def write(self, file, content, mode = 'w'):
1579        """Writes the specified content text (second argument) to the
1580        specified file name (first argument).  The file name may be
1581        a list, in which case the elements are concatenated with the
1582        os.path.join() method.  The file is created under the temporary
1583        working directory.  Any subdirectories in the path must already
1584        exist.  The I/O mode for the file may be specified; it must
1585        begin with a 'w'.  The default is 'w' (binary write).
1586        """
1587        file = self.canonicalize(file)
1588        if mode[0] != 'w':
1589            raise ValueError("mode must begin with 'w'")
1590        with open(file, mode) as f:
1591            f.write(content)
1592
1593# Local Variables:
1594# tab-width:4
1595# indent-tabs-mode:nil
1596# End:
1597# vim: set expandtab tabstop=4 shiftwidth=4:
1598