1# Copyright (c) 2018 Google Inc. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5""" 6TestCmd.py: a testing framework for commands and scripts. 7 8The TestCmd module provides a framework for portable automated testing 9of executable commands and scripts (in any language, not just Python), 10especially commands and scripts that require file system interaction. 11 12In addition to running tests and evaluating conditions, the TestCmd 13module manages and cleans up one or more temporary workspace 14directories, and provides methods for creating files and directories in 15those workspace directories from in-line data, here-documents), allowing 16tests to be completely self-contained. 17 18A TestCmd environment object is created via the usual invocation: 19 20 import TestCmd 21 test = TestCmd.TestCmd() 22 23There are a bunch of keyword arguments available at instantiation: 24 25 test = TestCmd.TestCmd(description = 'string', 26 program = 'program_or_script_to_test', 27 interpreter = 'script_interpreter', 28 workdir = 'prefix', 29 subdir = 'subdir', 30 verbose = Boolean, 31 match = default_match_function, 32 diff = default_diff_function, 33 combine = Boolean) 34 35There are a bunch of methods that let you do different things: 36 37 test.verbose_set(1) 38 39 test.description_set('string') 40 41 test.program_set('program_or_script_to_test') 42 43 test.interpreter_set('script_interpreter') 44 test.interpreter_set(['script_interpreter', 'arg']) 45 46 test.workdir_set('prefix') 47 test.workdir_set('') 48 49 test.workpath('file') 50 test.workpath('subdir', 'file') 51 52 test.subdir('subdir', ...) 53 54 test.rmdir('subdir', ...) 55 56 test.write('file', "contents\n") 57 test.write(['subdir', 'file'], "contents\n") 58 59 test.read('file') 60 test.read(['subdir', 'file']) 61 test.read('file', mode) 62 test.read(['subdir', 'file'], mode) 63 64 test.writable('dir', 1) 65 test.writable('dir', None) 66 67 test.preserve(condition, ...) 68 69 test.cleanup(condition) 70 71 test.command_args(program = 'program_or_script_to_run', 72 interpreter = 'script_interpreter', 73 arguments = 'arguments to pass to program') 74 75 test.run(program = 'program_or_script_to_run', 76 interpreter = 'script_interpreter', 77 arguments = 'arguments to pass to program', 78 chdir = 'directory_to_chdir_to', 79 stdin = 'input to feed to the program\n') 80 universal_newlines = True) 81 82 p = test.start(program = 'program_or_script_to_run', 83 interpreter = 'script_interpreter', 84 arguments = 'arguments to pass to program', 85 universal_newlines = None) 86 87 test.finish(self, p) 88 89 test.pass_test() 90 test.pass_test(condition) 91 test.pass_test(condition, function) 92 93 test.fail_test() 94 test.fail_test(condition) 95 test.fail_test(condition, function) 96 test.fail_test(condition, function, skip) 97 98 test.no_result() 99 test.no_result(condition) 100 test.no_result(condition, function) 101 test.no_result(condition, function, skip) 102 103 test.stdout() 104 test.stdout(run) 105 106 test.stderr() 107 test.stderr(run) 108 109 test.symlink(target, link) 110 111 test.banner(string) 112 test.banner(string, width) 113 114 test.diff(actual, expected) 115 116 test.match(actual, expected) 117 118 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n") 119 test.match_exact(["actual 1\n", "actual 2\n"], 120 ["expected 1\n", "expected 2\n"]) 121 122 test.match_re("actual 1\nactual 2\n", regex_string) 123 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes) 124 125 test.match_re_dotall("actual 1\nactual 2\n", regex_string) 126 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes) 127 128 test.tempdir() 129 test.tempdir('temporary-directory') 130 131 test.sleep() 132 test.sleep(seconds) 133 134 test.where_is('foo') 135 test.where_is('foo', 'PATH1:PATH2') 136 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') 137 138 test.unlink('file') 139 test.unlink('subdir', 'file') 140 141The TestCmd module provides pass_test(), fail_test(), and no_result() 142unbound functions that report test results for use with the Aegis change 143management system. These methods terminate the test immediately, 144reporting PASSED, FAILED, or NO RESULT respectively, and exiting with 145status 0 (success), 1 or 2 respectively. This allows for a distinction 146between an actual failed test and a test that could not be properly 147evaluated because of an external condition (such as a full file system 148or incorrect permissions). 149 150 import TestCmd 151 152 TestCmd.pass_test() 153 TestCmd.pass_test(condition) 154 TestCmd.pass_test(condition, function) 155 156 TestCmd.fail_test() 157 TestCmd.fail_test(condition) 158 TestCmd.fail_test(condition, function) 159 TestCmd.fail_test(condition, function, skip) 160 161 TestCmd.no_result() 162 TestCmd.no_result(condition) 163 TestCmd.no_result(condition, function) 164 TestCmd.no_result(condition, function, skip) 165 166The TestCmd module also provides unbound functions that handle matching 167in the same way as the match_*() methods described above. 168 169 import TestCmd 170 171 test = TestCmd.TestCmd(match = TestCmd.match_exact) 172 173 test = TestCmd.TestCmd(match = TestCmd.match_re) 174 175 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall) 176 177The TestCmd module provides unbound functions that can be used for the 178"diff" argument to TestCmd.TestCmd instantiation: 179 180 import TestCmd 181 182 test = TestCmd.TestCmd(match = TestCmd.match_re, 183 diff = TestCmd.diff_re) 184 185 test = TestCmd.TestCmd(diff = TestCmd.simple_diff) 186 187The "diff" argument can also be used with standard difflib functions: 188 189 import difflib 190 191 test = TestCmd.TestCmd(diff = difflib.context_diff) 192 193 test = TestCmd.TestCmd(diff = difflib.unified_diff) 194 195Lastly, the where_is() method also exists in an unbound function 196version. 197 198 import TestCmd 199 200 TestCmd.where_is('foo') 201 TestCmd.where_is('foo', 'PATH1:PATH2') 202 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') 203""" 204 205# Copyright 2000-2010 Steven Knight 206# This module is free software, and you may redistribute it and/or modify 207# it under the same terms as Python itself, so long as this copyright message 208# and disclaimer are retained in their original form. 209# 210# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, 211# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF 212# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH 213# DAMAGE. 214# 215# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT 216# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 217# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, 218# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, 219# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 220 221from __future__ import print_function 222 223__author__ = "Steven Knight <knight at baldmt dot com>" 224__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight" 225__version__ = "0.37" 226 227import errno 228import os 229import os.path 230import re 231import shutil 232import stat 233import sys 234import tempfile 235import time 236import traceback 237try: 238 from UserList import UserList 239except ImportError: 240 from collections import UserList 241 242__all__ = [ 243 'diff_re', 244 'fail_test', 245 'no_result', 246 'pass_test', 247 'match_exact', 248 'match_re', 249 'match_re_dotall', 250 'python_executable', 251 'TestCmd' 252] 253 254try: 255 import difflib 256except ImportError: 257 __all__.append('simple_diff') 258 259def is_List(e): 260 return (type(e) is list) or isinstance(e, UserList) 261 262try: 263 from UserString import UserString 264except ImportError: 265 try: 266 from collections import UserString 267 except ImportError: 268 class UserString: 269 pass 270 271try: 272 # basestring was removed in python3. 273 basestring 274except NameError: 275 basestring = str 276 277def is_String(e): 278 return isinstance(e, basestring) or isinstance(e, UserString) 279 280tempfile.template = 'testcmd.' 281if os.name in ('posix', 'nt'): 282 tempfile.template = 'testcmd.' + str(os.getpid()) + '.' 283else: 284 tempfile.template = 'testcmd.' 285 286re_space = re.compile('\s') 287 288_Cleanup = [] 289 290_chain_to_exitfunc = None 291 292def _clean(): 293 global _Cleanup 294 for test in reversed(_Cleanup): 295 if test: 296 test.cleanup() 297 del _Cleanup[:] 298 if _chain_to_exitfunc: 299 _chain_to_exitfunc() 300 301try: 302 import atexit 303except ImportError: 304 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc 305 try: 306 _chain_to_exitfunc = sys.exitfunc 307 except AttributeError: 308 pass 309 sys.exitfunc = _clean 310else: 311 atexit.register(_clean) 312 313try: 314 zip 315except NameError: 316 def zip(*lists): 317 result = [] 318 for i in range(min(map(len, lists))): 319 result.append(tuple(map(lambda l, i=i: l[i], lists))) 320 return result 321 322class Collector: 323 def __init__(self, top): 324 self.entries = [top] 325 def __call__(self, arg, dirname, names): 326 pathjoin = lambda n, d=dirname: os.path.join(d, n) 327 self.entries.extend(map(pathjoin, names)) 328 329def _caller(tblist, skip): 330 string = "" 331 arr = [] 332 for file, line, name, text in tblist: 333 if file[-10:] == "TestCmd.py": 334 break 335 arr = [(file, line, name, text)] + arr 336 atfrom = "at" 337 for file, line, name, text in arr[skip:]: 338 if name in ("?", "<module>"): 339 name = "" 340 else: 341 name = " (" + name + ")" 342 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) 343 atfrom = "\tfrom" 344 return string 345 346def fail_test(self = None, condition = 1, function = None, skip = 0): 347 """Cause the test to fail. 348 349 By default, the fail_test() method reports that the test FAILED 350 and exits with a status of 1. If a condition argument is supplied, 351 the test fails only if the condition is true. 352 """ 353 if not condition: 354 return 355 if not function is None: 356 function() 357 of = "" 358 desc = "" 359 sep = " " 360 if not self is None: 361 if self.program: 362 of = " of " + self.program 363 sep = "\n\t" 364 if self.description: 365 desc = " [" + self.description + "]" 366 sep = "\n\t" 367 368 at = _caller(traceback.extract_stack(), skip) 369 sys.stderr.write("FAILED test" + of + desc + sep + at) 370 371 sys.exit(1) 372 373def no_result(self = None, condition = 1, function = None, skip = 0): 374 """Causes a test to exit with no valid result. 375 376 By default, the no_result() method reports NO RESULT for the test 377 and exits with a status of 2. If a condition argument is supplied, 378 the test fails only if the condition is true. 379 """ 380 if not condition: 381 return 382 if not function is None: 383 function() 384 of = "" 385 desc = "" 386 sep = " " 387 if not self is None: 388 if self.program: 389 of = " of " + self.program 390 sep = "\n\t" 391 if self.description: 392 desc = " [" + self.description + "]" 393 sep = "\n\t" 394 395 if os.environ.get('TESTCMD_DEBUG_SKIPS'): 396 at = _caller(traceback.extract_stack(), skip) 397 sys.stderr.write("NO RESULT for test" + of + desc + sep + at) 398 else: 399 sys.stderr.write("NO RESULT\n") 400 401 sys.exit(2) 402 403def pass_test(self = None, condition = 1, function = None): 404 """Causes a test to pass. 405 406 By default, the pass_test() method reports PASSED for the test 407 and exits with a status of 0. If a condition argument is supplied, 408 the test passes only if the condition is true. 409 """ 410 if not condition: 411 return 412 if not function is None: 413 function() 414 sys.stderr.write("PASSED\n") 415 sys.exit(0) 416 417def match_exact(lines = None, matches = None): 418 """ 419 """ 420 if not is_List(lines): 421 lines = lines.split("\n") 422 if not is_List(matches): 423 matches = matches.split("\n") 424 if len(lines) != len(matches): 425 return 426 for i in range(len(lines)): 427 if lines[i] != matches[i]: 428 return 429 return 1 430 431def match_re(lines = None, res = None): 432 """ 433 """ 434 if not is_List(lines): 435 lines = lines.split("\n") 436 if not is_List(res): 437 res = res.split("\n") 438 if len(lines) != len(res): 439 return 440 for i in range(len(lines)): 441 s = "^" + res[i] + "$" 442 try: 443 expr = re.compile(s) 444 except re.error as e: 445 msg = "Regular expression error in %s: %s" 446 raise re.error(msg % (repr(s), e[0])) 447 if not expr.search(lines[i]): 448 return 449 return 1 450 451def match_re_dotall(lines = None, res = None): 452 """ 453 """ 454 if not type(lines) is type(""): 455 lines = "\n".join(lines) 456 if not type(res) is type(""): 457 res = "\n".join(res) 458 s = "^" + res + "$" 459 try: 460 expr = re.compile(s, re.DOTALL) 461 except re.error as e: 462 msg = "Regular expression error in %s: %s" 463 raise re.error(msg % (repr(s), e[0])) 464 if expr.match(lines): 465 return 1 466 467try: 468 import difflib 469except ImportError: 470 pass 471else: 472 def simple_diff(a, b, fromfile='', tofile='', 473 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 474 """ 475 A function with the same calling signature as difflib.context_diff 476 (diff -c) and difflib.unified_diff (diff -u) but which prints 477 output like the simple, unadorned 'diff" command. 478 """ 479 sm = difflib.SequenceMatcher(None, a, b) 480 def comma(x1, x2): 481 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) 482 result = [] 483 for op, a1, a2, b1, b2 in sm.get_opcodes(): 484 if op == 'delete': 485 result.append("%sd%d" % (comma(a1, a2), b1)) 486 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 487 elif op == 'insert': 488 result.append("%da%s" % (a1, comma(b1, b2))) 489 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 490 elif op == 'replace': 491 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) 492 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 493 result.append('---') 494 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 495 return result 496 497def diff_re(a, b, fromfile='', tofile='', 498 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 499 """ 500 A simple "diff" of two sets of lines when the expected lines 501 are regular expressions. This is a really dumb thing that 502 just compares each line in turn, so it doesn't look for 503 chunks of matching lines and the like--but at least it lets 504 you know exactly which line first didn't compare correctl... 505 """ 506 result = [] 507 diff = len(a) - len(b) 508 if diff < 0: 509 a = a + ['']*(-diff) 510 elif diff > 0: 511 b = b + ['']*diff 512 i = 0 513 for aline, bline in zip(a, b): 514 s = "^" + aline + "$" 515 try: 516 expr = re.compile(s) 517 except re.error as e: 518 msg = "Regular expression error in %s: %s" 519 raise re.error(msg % (repr(s), e[0])) 520 if not expr.search(bline): 521 result.append("%sc%s" % (i+1, i+1)) 522 result.append('< ' + repr(a[i])) 523 result.append('---') 524 result.append('> ' + repr(b[i])) 525 i = i+1 526 return result 527 528if os.name == 'java': 529 530 python_executable = os.path.join(sys.prefix, 'jython') 531 532else: 533 534 python_executable = sys.executable 535 536if sys.platform == 'win32': 537 538 default_sleep_seconds = 2 539 540 def where_is(file, path=None, pathext=None): 541 if path is None: 542 path = os.environ['PATH'] 543 if is_String(path): 544 path = path.split(os.pathsep) 545 if pathext is None: 546 pathext = os.environ['PATHEXT'] 547 if is_String(pathext): 548 pathext = pathext.split(os.pathsep) 549 for ext in pathext: 550 if ext.lower() == file[-len(ext):].lower(): 551 pathext = [''] 552 break 553 for dir in path: 554 f = os.path.join(dir, file) 555 for ext in pathext: 556 fext = f + ext 557 if os.path.isfile(fext): 558 return fext 559 return None 560 561else: 562 563 def where_is(file, path=None, pathext=None): 564 if path is None: 565 path = os.environ['PATH'] 566 if is_String(path): 567 path = path.split(os.pathsep) 568 for dir in path: 569 f = os.path.join(dir, file) 570 if os.path.isfile(f): 571 try: 572 st = os.stat(f) 573 except OSError: 574 continue 575 if stat.S_IMODE(st[stat.ST_MODE]) & 0o111: 576 return f 577 return None 578 579 default_sleep_seconds = 1 580 581 582 583try: 584 import subprocess 585except ImportError: 586 # The subprocess module doesn't exist in this version of Python, 587 # so we're going to cobble up something that looks just enough 588 # like its API for our purposes below. 589 import new 590 591 subprocess = new.module('subprocess') 592 593 subprocess.PIPE = 'PIPE' 594 subprocess.STDOUT = 'STDOUT' 595 subprocess.mswindows = (sys.platform == 'win32') 596 597 try: 598 import popen2 599 popen2.Popen3 600 except AttributeError: 601 class Popen3: 602 universal_newlines = 1 603 def __init__(self, command, **kw): 604 if sys.platform == 'win32' and command[0] == '"': 605 command = '"' + command + '"' 606 (stdin, stdout, stderr) = os.popen3(' ' + command) 607 self.stdin = stdin 608 self.stdout = stdout 609 self.stderr = stderr 610 def close_output(self): 611 self.stdout.close() 612 self.resultcode = self.stderr.close() 613 def wait(self): 614 resultcode = self.resultcode 615 if os.WIFEXITED(resultcode): 616 return os.WEXITSTATUS(resultcode) 617 elif os.WIFSIGNALED(resultcode): 618 return os.WTERMSIG(resultcode) 619 else: 620 return None 621 622 else: 623 try: 624 popen2.Popen4 625 except AttributeError: 626 # A cribbed Popen4 class, with some retrofitted code from 627 # the Python 1.5 Popen3 class methods to do certain things 628 # by hand. 629 class Popen4(popen2.Popen3): 630 childerr = None 631 632 def __init__(self, cmd, bufsize=-1): 633 p2cread, p2cwrite = os.pipe() 634 c2pread, c2pwrite = os.pipe() 635 self.pid = os.fork() 636 if self.pid == 0: 637 # Child 638 os.dup2(p2cread, 0) 639 os.dup2(c2pwrite, 1) 640 os.dup2(c2pwrite, 2) 641 for i in range(3, popen2.MAXFD): 642 try: 643 os.close(i) 644 except: pass 645 try: 646 os.execvp(cmd[0], cmd) 647 finally: 648 os._exit(1) 649 # Shouldn't come here, I guess 650 os._exit(1) 651 os.close(p2cread) 652 self.tochild = os.fdopen(p2cwrite, 'w', bufsize) 653 os.close(c2pwrite) 654 self.fromchild = os.fdopen(c2pread, 'r', bufsize) 655 popen2._active.append(self) 656 657 popen2.Popen4 = Popen4 658 659 class Popen3(popen2.Popen3, popen2.Popen4): 660 universal_newlines = 1 661 def __init__(self, command, **kw): 662 if kw.get('stderr') == 'STDOUT': 663 popen2.Popen4.__init__(self, command, 1) 664 else: 665 popen2.Popen3.__init__(self, command, 1) 666 self.stdin = self.tochild 667 self.stdout = self.fromchild 668 self.stderr = self.childerr 669 def wait(self, *args, **kw): 670 resultcode = popen2.Popen3.wait(self, *args, **kw) 671 if os.WIFEXITED(resultcode): 672 return os.WEXITSTATUS(resultcode) 673 elif os.WIFSIGNALED(resultcode): 674 return os.WTERMSIG(resultcode) 675 else: 676 return None 677 678 subprocess.Popen = Popen3 679 680 681 682# From Josiah Carlson, 683# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms 684# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 685 686PIPE = subprocess.PIPE 687 688if sys.platform == 'win32': 689 from win32file import ReadFile, WriteFile 690 from win32pipe import PeekNamedPipe 691 import msvcrt 692else: 693 import select 694 import fcntl 695 696 try: fcntl.F_GETFL 697 except AttributeError: fcntl.F_GETFL = 3 698 699 try: fcntl.F_SETFL 700 except AttributeError: fcntl.F_SETFL = 4 701 702class Popen(subprocess.Popen): 703 def recv(self, maxsize=None): 704 return self._recv('stdout', maxsize) 705 706 def recv_err(self, maxsize=None): 707 return self._recv('stderr', maxsize) 708 709 def send_recv(self, input='', maxsize=None): 710 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) 711 712 def get_conn_maxsize(self, which, maxsize): 713 if maxsize is None: 714 maxsize = 1024 715 elif maxsize < 1: 716 maxsize = 1 717 return getattr(self, which), maxsize 718 719 def _close(self, which): 720 getattr(self, which).close() 721 setattr(self, which, None) 722 723 if sys.platform == 'win32': 724 def send(self, input): 725 if not self.stdin: 726 return None 727 728 try: 729 x = msvcrt.get_osfhandle(self.stdin.fileno()) 730 (errCode, written) = WriteFile(x, input) 731 except ValueError: 732 return self._close('stdin') 733 except (subprocess.pywintypes.error, Exception) as why: 734 if why[0] in (109, errno.ESHUTDOWN): 735 return self._close('stdin') 736 raise 737 738 return written 739 740 def _recv(self, which, maxsize): 741 conn, maxsize = self.get_conn_maxsize(which, maxsize) 742 if conn is None: 743 return None 744 745 try: 746 x = msvcrt.get_osfhandle(conn.fileno()) 747 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) 748 if maxsize < nAvail: 749 nAvail = maxsize 750 if nAvail > 0: 751 (errCode, read) = ReadFile(x, nAvail, None) 752 except ValueError: 753 return self._close(which) 754 except (subprocess.pywintypes.error, Exception) as why: 755 if why[0] in (109, errno.ESHUTDOWN): 756 return self._close(which) 757 raise 758 759 #if self.universal_newlines: 760 # read = self._translate_newlines(read) 761 return read 762 763 else: 764 def send(self, input): 765 if not self.stdin: 766 return None 767 768 if not select.select([], [self.stdin], [], 0)[1]: 769 return 0 770 771 try: 772 written = os.write(self.stdin.fileno(), input) 773 except OSError as why: 774 if why[0] == errno.EPIPE: #broken pipe 775 return self._close('stdin') 776 raise 777 778 return written 779 780 def _recv(self, which, maxsize): 781 conn, maxsize = self.get_conn_maxsize(which, maxsize) 782 if conn is None: 783 return None 784 785 try: 786 flags = fcntl.fcntl(conn, fcntl.F_GETFL) 787 except TypeError: 788 flags = None 789 else: 790 if not conn.closed: 791 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) 792 793 try: 794 if not select.select([conn], [], [], 0)[0]: 795 return '' 796 797 r = conn.read(maxsize) 798 if not r: 799 return self._close(which) 800 801 #if self.universal_newlines: 802 # r = self._translate_newlines(r) 803 return r 804 finally: 805 if not conn.closed and not flags is None: 806 fcntl.fcntl(conn, fcntl.F_SETFL, flags) 807 808disconnect_message = "Other end disconnected!" 809 810def recv_some(p, t=.1, e=1, tr=5, stderr=0): 811 if tr < 1: 812 tr = 1 813 x = time.time()+t 814 y = [] 815 r = '' 816 pr = p.recv 817 if stderr: 818 pr = p.recv_err 819 while time.time() < x or r: 820 r = pr() 821 if r is None: 822 if e: 823 raise Exception(disconnect_message) 824 else: 825 break 826 elif r: 827 y.append(r) 828 else: 829 time.sleep(max((x-time.time())/tr, 0)) 830 return ''.join(y) 831 832def send_all(p, data): 833 data = memoryview(data) 834 while len(data): 835 sent = p.send(data) 836 if sent is None: 837 raise Exception(disconnect_message) 838 data = data[sent:] 839 840 841 842class TestCmd(object): 843 """Class TestCmd 844 """ 845 846 def __init__(self, description = None, 847 program = None, 848 interpreter = None, 849 workdir = None, 850 subdir = None, 851 verbose = None, 852 match = None, 853 diff = None, 854 combine = 0, 855 universal_newlines = 1): 856 self._cwd = os.getcwd() 857 self.description_set(description) 858 self.program_set(program) 859 self.interpreter_set(interpreter) 860 if verbose is None: 861 try: 862 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) ) 863 except ValueError: 864 verbose = 0 865 self.verbose_set(verbose) 866 self.combine = combine 867 self.universal_newlines = universal_newlines 868 if match is not None: 869 self.match_function = match 870 else: 871 self.match_function = match_re 872 if diff is not None: 873 self.diff_function = diff 874 else: 875 try: 876 difflib 877 except NameError: 878 pass 879 else: 880 self.diff_function = simple_diff 881 #self.diff_function = difflib.context_diff 882 #self.diff_function = difflib.unified_diff 883 self._dirlist = [] 884 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} 885 if 'PRESERVE' in os.environ and os.environ['PRESERVE'] is not '': 886 self._preserve['pass_test'] = os.environ['PRESERVE'] 887 self._preserve['fail_test'] = os.environ['PRESERVE'] 888 self._preserve['no_result'] = os.environ['PRESERVE'] 889 else: 890 try: 891 self._preserve['pass_test'] = os.environ['PRESERVE_PASS'] 892 except KeyError: 893 pass 894 try: 895 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL'] 896 except KeyError: 897 pass 898 try: 899 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT'] 900 except KeyError: 901 pass 902 self._stdout = [] 903 self._stderr = [] 904 self.status = None 905 self.condition = 'no_result' 906 self.workdir_set(workdir) 907 self.subdir(subdir) 908 909 def __del__(self): 910 self.cleanup() 911 912 def __repr__(self): 913 return "%x" % id(self) 914 915 banner_char = '=' 916 banner_width = 80 917 918 def banner(self, s, width=None): 919 if width is None: 920 width = self.banner_width 921 return s + self.banner_char * (width - len(s)) 922 923 if os.name == 'posix': 924 925 def escape(self, arg): 926 "escape shell special characters" 927 slash = '\\' 928 special = '"$' 929 930 arg = arg.replace(slash, slash+slash) 931 for c in special: 932 arg = arg.replace(c, slash+c) 933 934 if re_space.search(arg): 935 arg = '"' + arg + '"' 936 return arg 937 938 else: 939 940 # Windows does not allow special characters in file names 941 # anyway, so no need for an escape function, we will just quote 942 # the arg. 943 def escape(self, arg): 944 if re_space.search(arg): 945 arg = '"' + arg + '"' 946 return arg 947 948 def canonicalize(self, path): 949 if is_List(path): 950 path = os.path.join(*path) 951 if not os.path.isabs(path): 952 path = os.path.join(self.workdir, path) 953 return path 954 955 def chmod(self, path, mode): 956 """Changes permissions on the specified file or directory 957 path name.""" 958 path = self.canonicalize(path) 959 os.chmod(path, mode) 960 961 def cleanup(self, condition = None): 962 """Removes any temporary working directories for the specified 963 TestCmd environment. If the environment variable PRESERVE was 964 set when the TestCmd environment was created, temporary working 965 directories are not removed. If any of the environment variables 966 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set 967 when the TestCmd environment was created, then temporary working 968 directories are not removed if the test passed, failed, or had 969 no result, respectively. Temporary working directories are also 970 preserved for conditions specified via the preserve method. 971 972 Typically, this method is not called directly, but is used when 973 the script exits to clean up temporary working directories as 974 appropriate for the exit status. 975 """ 976 if not self._dirlist: 977 return 978 os.chdir(self._cwd) 979 self.workdir = None 980 if condition is None: 981 condition = self.condition 982 if self._preserve[condition]: 983 for dir in self._dirlist: 984 print("Preserved directory", dir) 985 else: 986 list = self._dirlist[:] 987 list.reverse() 988 for dir in list: 989 self.writable(dir, 1) 990 shutil.rmtree(dir, ignore_errors = 1) 991 self._dirlist = [] 992 993 try: 994 global _Cleanup 995 _Cleanup.remove(self) 996 except (AttributeError, ValueError): 997 pass 998 999 def command_args(self, program = None, 1000 interpreter = None, 1001 arguments = None): 1002 if program: 1003 if type(program) == type('') and not os.path.isabs(program): 1004 program = os.path.join(self._cwd, program) 1005 else: 1006 program = self.program 1007 if not interpreter: 1008 interpreter = self.interpreter 1009 if not type(program) in [type([]), type(())]: 1010 program = [program] 1011 cmd = list(program) 1012 if interpreter: 1013 if not type(interpreter) in [type([]), type(())]: 1014 interpreter = [interpreter] 1015 cmd = list(interpreter) + cmd 1016 if arguments: 1017 if type(arguments) == type(''): 1018 arguments = arguments.split() 1019 cmd.extend(arguments) 1020 return cmd 1021 1022 def description_set(self, description): 1023 """Set the description of the functionality being tested. 1024 """ 1025 self.description = description 1026 1027 try: 1028 difflib 1029 except NameError: 1030 def diff(self, a, b, name, *args, **kw): 1031 print(self.banner('Expected %s' % name)) 1032 print(a) 1033 print(self.banner('Actual %s' % name)) 1034 print(b) 1035 else: 1036 def diff(self, a, b, name, *args, **kw): 1037 print(self.banner(name)) 1038 args = (a.splitlines(), b.splitlines()) + args 1039 lines = self.diff_function(*args, **kw) 1040 for l in lines: 1041 print(l) 1042 1043 def fail_test(self, condition = 1, function = None, skip = 0): 1044 """Cause the test to fail. 1045 """ 1046 if not condition: 1047 return 1048 self.condition = 'fail_test' 1049 fail_test(self = self, 1050 condition = condition, 1051 function = function, 1052 skip = skip) 1053 1054 def interpreter_set(self, interpreter): 1055 """Set the program to be used to interpret the program 1056 under test as a script. 1057 """ 1058 self.interpreter = interpreter 1059 1060 def match(self, lines, matches): 1061 """Compare actual and expected file contents. 1062 """ 1063 return self.match_function(lines, matches) 1064 1065 def match_exact(self, lines, matches): 1066 """Compare actual and expected file contents. 1067 """ 1068 return match_exact(lines, matches) 1069 1070 def match_re(self, lines, res): 1071 """Compare actual and expected file contents. 1072 """ 1073 return match_re(lines, res) 1074 1075 def match_re_dotall(self, lines, res): 1076 """Compare actual and expected file contents. 1077 """ 1078 return match_re_dotall(lines, res) 1079 1080 def no_result(self, condition = 1, function = None, skip = 0): 1081 """Report that the test could not be run. 1082 """ 1083 if not condition: 1084 return 1085 self.condition = 'no_result' 1086 no_result(self = self, 1087 condition = condition, 1088 function = function, 1089 skip = skip) 1090 1091 def pass_test(self, condition = 1, function = None): 1092 """Cause the test to pass. 1093 """ 1094 if not condition: 1095 return 1096 self.condition = 'pass_test' 1097 pass_test(self = self, condition = condition, function = function) 1098 1099 def preserve(self, *conditions): 1100 """Arrange for the temporary working directories for the 1101 specified TestCmd environment to be preserved for one or more 1102 conditions. If no conditions are specified, arranges for 1103 the temporary working directories to be preserved for all 1104 conditions. 1105 """ 1106 if conditions is (): 1107 conditions = ('pass_test', 'fail_test', 'no_result') 1108 for cond in conditions: 1109 self._preserve[cond] = 1 1110 1111 def program_set(self, program): 1112 """Set the executable program or script to be tested. 1113 """ 1114 if program and not os.path.isabs(program): 1115 program = os.path.join(self._cwd, program) 1116 self.program = program 1117 1118 def read(self, file, mode = 'r'): 1119 """Reads and returns the contents of the specified file name. 1120 The file name may be a list, in which case the elements are 1121 concatenated with the os.path.join() method. The file is 1122 assumed to be under the temporary working directory unless it 1123 is an absolute path name. The I/O mode for the file may 1124 be specified; it must begin with an 'r'. The default is 1125 'r' (string read). 1126 """ 1127 file = self.canonicalize(file) 1128 if mode[0] != 'r': 1129 raise ValueError("mode must begin with 'r'") 1130 with open(file, mode) as f: 1131 result = f.read() 1132 return result 1133 1134 def rmdir(self, dir): 1135 """Removes the specified dir name. 1136 The dir name may be a list, in which case the elements are 1137 concatenated with the os.path.join() method. The dir is 1138 assumed to be under the temporary working directory unless it 1139 is an absolute path name. 1140 The dir must be empty. 1141 """ 1142 dir = self.canonicalize(dir) 1143 os.rmdir(dir) 1144 1145 def start(self, program = None, 1146 interpreter = None, 1147 arguments = None, 1148 universal_newlines = None, 1149 **kw): 1150 """ 1151 Starts a program or script for the test environment. 1152 1153 The specified program will have the original directory 1154 prepended unless it is enclosed in a [list]. 1155 """ 1156 cmd = self.command_args(program, interpreter, arguments) 1157 cmd_string = ' '.join(map(self.escape, cmd)) 1158 if self.verbose: 1159 sys.stderr.write(cmd_string + "\n") 1160 if universal_newlines is None: 1161 universal_newlines = self.universal_newlines 1162 1163 # On Windows, if we make stdin a pipe when we plan to send 1164 # no input, and the test program exits before 1165 # Popen calls msvcrt.open_osfhandle, that call will fail. 1166 # So don't use a pipe for stdin if we don't need one. 1167 stdin = kw.get('stdin', None) 1168 if stdin is not None: 1169 stdin = subprocess.PIPE 1170 1171 combine = kw.get('combine', self.combine) 1172 if combine: 1173 stderr_value = subprocess.STDOUT 1174 else: 1175 stderr_value = subprocess.PIPE 1176 1177 return Popen(cmd, 1178 stdin=stdin, 1179 stdout=subprocess.PIPE, 1180 stderr=stderr_value, 1181 universal_newlines=universal_newlines) 1182 1183 def finish(self, popen, **kw): 1184 """ 1185 Finishes and waits for the process being run under control of 1186 the specified popen argument, recording the exit status, 1187 standard output and error output. 1188 """ 1189 popen.stdin.close() 1190 self.status = popen.wait() 1191 if not self.status: 1192 self.status = 0 1193 self._stdout.append(popen.stdout.read()) 1194 if popen.stderr: 1195 stderr = popen.stderr.read() 1196 else: 1197 stderr = '' 1198 self._stderr.append(stderr) 1199 1200 def run(self, program = None, 1201 interpreter = None, 1202 arguments = None, 1203 chdir = None, 1204 stdin = None, 1205 universal_newlines = None): 1206 """Runs a test of the program or script for the test 1207 environment. Standard output and error output are saved for 1208 future retrieval via the stdout() and stderr() methods. 1209 1210 The specified program will have the original directory 1211 prepended unless it is enclosed in a [list]. 1212 """ 1213 if chdir: 1214 oldcwd = os.getcwd() 1215 if not os.path.isabs(chdir): 1216 chdir = os.path.join(self.workpath(chdir)) 1217 if self.verbose: 1218 sys.stderr.write("chdir(" + chdir + ")\n") 1219 os.chdir(chdir) 1220 p = self.start(program, 1221 interpreter, 1222 arguments, 1223 universal_newlines, 1224 stdin=stdin) 1225 if stdin: 1226 if is_List(stdin): 1227 for line in stdin: 1228 p.stdin.write(line) 1229 else: 1230 p.stdin.write(stdin) 1231 p.stdin.close() 1232 1233 out = p.stdout.read() 1234 if p.stderr is None: 1235 err = '' 1236 else: 1237 err = p.stderr.read() 1238 try: 1239 close_output = p.close_output 1240 except AttributeError: 1241 p.stdout.close() 1242 if not p.stderr is None: 1243 p.stderr.close() 1244 else: 1245 close_output() 1246 1247 self._stdout.append(out) 1248 self._stderr.append(err) 1249 1250 self.status = p.wait() 1251 if not self.status: 1252 self.status = 0 1253 1254 if chdir: 1255 os.chdir(oldcwd) 1256 if self.verbose >= 2: 1257 write = sys.stdout.write 1258 write('============ STATUS: %d\n' % self.status) 1259 out = self.stdout() 1260 if out or self.verbose >= 3: 1261 write('============ BEGIN STDOUT (len=%d):\n' % len(out)) 1262 write(out) 1263 write('============ END STDOUT\n') 1264 err = self.stderr() 1265 if err or self.verbose >= 3: 1266 write('============ BEGIN STDERR (len=%d)\n' % len(err)) 1267 write(err) 1268 write('============ END STDERR\n') 1269 1270 def sleep(self, seconds = default_sleep_seconds): 1271 """Sleeps at least the specified number of seconds. If no 1272 number is specified, sleeps at least the minimum number of 1273 seconds necessary to advance file time stamps on the current 1274 system. Sleeping more seconds is all right. 1275 """ 1276 time.sleep(seconds) 1277 1278 def stderr(self, run = None): 1279 """Returns the error output from the specified run number. 1280 If there is no specified run number, then returns the error 1281 output of the last run. If the run number is less than zero, 1282 then returns the error output from that many runs back from the 1283 current run. 1284 """ 1285 if not run: 1286 run = len(self._stderr) 1287 elif run < 0: 1288 run = len(self._stderr) + run 1289 run = run - 1 1290 return self._stderr[run] 1291 1292 def stdout(self, run = None): 1293 """Returns the standard output from the specified run number. 1294 If there is no specified run number, then returns the standard 1295 output of the last run. If the run number is less than zero, 1296 then returns the standard output from that many runs back from 1297 the current run. 1298 """ 1299 if not run: 1300 run = len(self._stdout) 1301 elif run < 0: 1302 run = len(self._stdout) + run 1303 run = run - 1 1304 return self._stdout[run] 1305 1306 def subdir(self, *subdirs): 1307 """Create new subdirectories under the temporary working 1308 directory, one for each argument. An argument may be a list, 1309 in which case the list elements are concatenated using the 1310 os.path.join() method. Subdirectories multiple levels deep 1311 must be created using a separate argument for each level: 1312 1313 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) 1314 1315 Returns the number of subdirectories actually created. 1316 """ 1317 count = 0 1318 for sub in subdirs: 1319 if sub is None: 1320 continue 1321 if is_List(sub): 1322 sub = os.path.join(*sub) 1323 new = os.path.join(self.workdir, sub) 1324 try: 1325 os.mkdir(new) 1326 except OSError: 1327 pass 1328 else: 1329 count = count + 1 1330 return count 1331 1332 def symlink(self, target, link): 1333 """Creates a symlink to the specified target. 1334 The link name may be a list, in which case the elements are 1335 concatenated with the os.path.join() method. The link is 1336 assumed to be under the temporary working directory unless it 1337 is an absolute path name. The target is *not* assumed to be 1338 under the temporary working directory. 1339 """ 1340 link = self.canonicalize(link) 1341 os.symlink(target, link) 1342 1343 def tempdir(self, path=None): 1344 """Creates a temporary directory. 1345 A unique directory name is generated if no path name is specified. 1346 The directory is created, and will be removed when the TestCmd 1347 object is destroyed. 1348 """ 1349 if path is None: 1350 try: 1351 path = tempfile.mktemp(prefix=tempfile.template) 1352 except TypeError: 1353 path = tempfile.mktemp() 1354 os.mkdir(path) 1355 1356 # Symlinks in the path will report things 1357 # differently from os.getcwd(), so chdir there 1358 # and back to fetch the canonical path. 1359 cwd = os.getcwd() 1360 try: 1361 os.chdir(path) 1362 path = os.getcwd() 1363 finally: 1364 os.chdir(cwd) 1365 1366 # Uppercase the drive letter since the case of drive 1367 # letters is pretty much random on win32: 1368 drive,rest = os.path.splitdrive(path) 1369 if drive: 1370 path = drive.upper() + rest 1371 1372 # 1373 self._dirlist.append(path) 1374 global _Cleanup 1375 try: 1376 _Cleanup.index(self) 1377 except ValueError: 1378 _Cleanup.append(self) 1379 1380 return path 1381 1382 def touch(self, path, mtime=None): 1383 """Updates the modification time on the specified file or 1384 directory path name. The default is to update to the 1385 current time if no explicit modification time is specified. 1386 """ 1387 path = self.canonicalize(path) 1388 atime = os.path.getatime(path) 1389 if mtime is None: 1390 mtime = time.time() 1391 os.utime(path, (atime, mtime)) 1392 1393 def unlink(self, file): 1394 """Unlinks the specified file name. 1395 The file name may be a list, in which case the elements are 1396 concatenated with the os.path.join() method. The file is 1397 assumed to be under the temporary working directory unless it 1398 is an absolute path name. 1399 """ 1400 file = self.canonicalize(file) 1401 os.unlink(file) 1402 1403 def verbose_set(self, verbose): 1404 """Set the verbose level. 1405 """ 1406 self.verbose = verbose 1407 1408 def where_is(self, file, path=None, pathext=None): 1409 """Find an executable file. 1410 """ 1411 if is_List(file): 1412 file = os.path.join(*file) 1413 if not os.path.isabs(file): 1414 file = where_is(file, path, pathext) 1415 return file 1416 1417 def workdir_set(self, path): 1418 """Creates a temporary working directory with the specified 1419 path name. If the path is a null string (''), a unique 1420 directory name is created. 1421 """ 1422 if (path != None): 1423 if path == '': 1424 path = None 1425 path = self.tempdir(path) 1426 self.workdir = path 1427 1428 def workpath(self, *args): 1429 """Returns the absolute path name to a subdirectory or file 1430 within the current temporary working directory. Concatenates 1431 the temporary working directory name with the specified 1432 arguments using the os.path.join() method. 1433 """ 1434 return os.path.join(self.workdir, *args) 1435 1436 def readable(self, top, read=1): 1437 """Make the specified directory tree readable (read == 1) 1438 or not (read == None). 1439 1440 This method has no effect on Windows systems, which use a 1441 completely different mechanism to control file readability. 1442 """ 1443 1444 if sys.platform == 'win32': 1445 return 1446 1447 if read: 1448 def do_chmod(fname): 1449 try: st = os.stat(fname) 1450 except OSError: pass 1451 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD)) 1452 else: 1453 def do_chmod(fname): 1454 try: st = os.stat(fname) 1455 except OSError: pass 1456 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD)) 1457 1458 if os.path.isfile(top): 1459 # If it's a file, that's easy, just chmod it. 1460 do_chmod(top) 1461 elif read: 1462 # It's a directory and we're trying to turn on read 1463 # permission, so it's also pretty easy, just chmod the 1464 # directory and then chmod every entry on our walk down the 1465 # tree. Because os.walk() is top-down, we'll enable 1466 # read permission on any directories that have it disabled 1467 # before os.walk() tries to list their contents. 1468 do_chmod(top) 1469 1470 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): 1471 for n in names: 1472 do_chmod(os.path.join(dirname, n)) 1473 1474 os.walk(top, chmod_entries, None) 1475 else: 1476 # It's a directory and we're trying to turn off read 1477 # permission, which means we have to chmod the directoreis 1478 # in the tree bottom-up, lest disabling read permission from 1479 # the top down get in the way of being able to get at lower 1480 # parts of the tree. But os.walk() visits things top 1481 # down, so we just use an object to collect a list of all 1482 # of the entries in the tree, reverse the list, and then 1483 # chmod the reversed (bottom-up) list. 1484 col = Collector(top) 1485 os.walk(top, col, None) 1486 col.entries.reverse() 1487 for d in col.entries: do_chmod(d) 1488 1489 def writable(self, top, write=1): 1490 """Make the specified directory tree writable (write == 1) 1491 or not (write == None). 1492 """ 1493 1494 if sys.platform == 'win32': 1495 1496 if write: 1497 def do_chmod(fname): 1498 try: os.chmod(fname, stat.S_IWRITE) 1499 except OSError: pass 1500 else: 1501 def do_chmod(fname): 1502 try: os.chmod(fname, stat.S_IREAD) 1503 except OSError: pass 1504 1505 else: 1506 1507 if write: 1508 def do_chmod(fname): 1509 try: st = os.stat(fname) 1510 except OSError: pass 1511 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0o200)) 1512 else: 1513 def do_chmod(fname): 1514 try: st = os.stat(fname) 1515 except OSError: pass 1516 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0o200)) 1517 1518 if os.path.isfile(top): 1519 do_chmod(top) 1520 else: 1521 col = Collector(top) 1522 os.walk(top, col, None) 1523 for d in col.entries: do_chmod(d) 1524 1525 def executable(self, top, execute=1): 1526 """Make the specified directory tree executable (execute == 1) 1527 or not (execute == None). 1528 1529 This method has no effect on Windows systems, which use a 1530 completely different mechanism to control file executability. 1531 """ 1532 1533 if sys.platform == 'win32': 1534 return 1535 1536 if execute: 1537 def do_chmod(fname): 1538 try: st = os.stat(fname) 1539 except OSError: pass 1540 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC)) 1541 else: 1542 def do_chmod(fname): 1543 try: st = os.stat(fname) 1544 except OSError: pass 1545 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC)) 1546 1547 if os.path.isfile(top): 1548 # If it's a file, that's easy, just chmod it. 1549 do_chmod(top) 1550 elif execute: 1551 # It's a directory and we're trying to turn on execute 1552 # permission, so it's also pretty easy, just chmod the 1553 # directory and then chmod every entry on our walk down the 1554 # tree. Because os.walk() is top-down, we'll enable 1555 # execute permission on any directories that have it disabled 1556 # before os.walk() tries to list their contents. 1557 do_chmod(top) 1558 1559 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): 1560 for n in names: 1561 do_chmod(os.path.join(dirname, n)) 1562 1563 os.walk(top, chmod_entries, None) 1564 else: 1565 # It's a directory and we're trying to turn off execute 1566 # permission, which means we have to chmod the directories 1567 # in the tree bottom-up, lest disabling execute permission from 1568 # the top down get in the way of being able to get at lower 1569 # parts of the tree. But os.walk() visits things top 1570 # down, so we just use an object to collect a list of all 1571 # of the entries in the tree, reverse the list, and then 1572 # chmod the reversed (bottom-up) list. 1573 col = Collector(top) 1574 os.walk(top, col, None) 1575 col.entries.reverse() 1576 for d in col.entries: do_chmod(d) 1577 1578 def write(self, file, content, mode = 'w'): 1579 """Writes the specified content text (second argument) to the 1580 specified file name (first argument). The file name may be 1581 a list, in which case the elements are concatenated with the 1582 os.path.join() method. The file is created under the temporary 1583 working directory. Any subdirectories in the path must already 1584 exist. The I/O mode for the file may be specified; it must 1585 begin with a 'w'. The default is 'w' (binary write). 1586 """ 1587 file = self.canonicalize(file) 1588 if mode[0] != 'w': 1589 raise ValueError("mode must begin with 'w'") 1590 with open(file, mode) as f: 1591 f.write(content) 1592 1593# Local Variables: 1594# tab-width:4 1595# indent-tabs-mode:nil 1596# End: 1597# vim: set expandtab tabstop=4 shiftwidth=4: 1598