"""
TestCmd.py: a testing framework for commands and scripts.

The TestCmd module provides a framework for portable automated testing
of executable commands and scripts (in any language, not just Python),
especially commands and scripts that require file system interaction.

In addition to running tests and evaluating conditions, the TestCmd
module manages and cleans up one or more temporary workspace
directories, and provides methods for creating files and directories in
those workspace directories from in-line data (that is, here-documents),
allowing tests to be completely self-contained.

A TestCmd environment object is created via the usual invocation:

    import TestCmd
    test = TestCmd.TestCmd()

There are a bunch of keyword arguments available at instantiation:

    test = TestCmd.TestCmd(description = 'string',
                           program = 'program_or_script_to_test',
                           interpreter = 'script_interpreter',
                           workdir = 'prefix',
                           subdir = 'subdir',
                           verbose = Boolean,
                           match = default_match_function,
                           diff = default_diff_function,
                           combine = Boolean)

There are a bunch of methods that let you do different things:

    test.verbose_set(1)

    test.description_set('string')

    test.program_set('program_or_script_to_test')

    test.interpreter_set('script_interpreter')
    test.interpreter_set(['script_interpreter', 'arg'])

    test.workdir_set('prefix')
    test.workdir_set('')

    test.workpath('file')
    test.workpath('subdir', 'file')

    test.subdir('subdir', ...)

    test.rmdir('subdir', ...)

    test.write('file', "contents\n")
    test.write(['subdir', 'file'], "contents\n")

    test.read('file')
    test.read(['subdir', 'file'])
    test.read('file', mode)
    test.read(['subdir', 'file'], mode)

    test.writable('dir', 1)
    test.writable('dir', None)

    test.preserve(condition, ...)

    test.cleanup(condition)

    test.command_args(program = 'program_or_script_to_run',
                      interpreter = 'script_interpreter',
                      arguments = 'arguments to pass to program')

    test.run(program = 'program_or_script_to_run',
             interpreter = 'script_interpreter',
             arguments = 'arguments to pass to program',
             chdir = 'directory_to_chdir_to',
             stdin = 'input to feed to the program\n',
             universal_newlines = True)

    p = test.start(program = 'program_or_script_to_run',
                   interpreter = 'script_interpreter',
                   arguments = 'arguments to pass to program',
                   universal_newlines = None)

    test.finish(p)

    test.pass_test()
    test.pass_test(condition)
    test.pass_test(condition, function)

    test.fail_test()
    test.fail_test(condition)
    test.fail_test(condition, function)
    test.fail_test(condition, function, skip)

    test.no_result()
    test.no_result(condition)
    test.no_result(condition, function)
    test.no_result(condition, function, skip)

    test.stdout()
    test.stdout(run)

    test.stderr()
    test.stderr(run)

    test.symlink(target, link)

    test.banner(string)
    test.banner(string, width)

    test.diff(actual, expected)

    test.match(actual, expected)

    test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
    test.match_exact(["actual 1\n", "actual 2\n"],
                     ["expected 1\n", "expected 2\n"])

    test.match_re("actual 1\nactual 2\n", regex_string)
    test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)

    test.match_re_dotall("actual 1\nactual 2\n", regex_string)
    test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)

    test.tempdir()
    test.tempdir('temporary-directory')

    test.sleep()
    test.sleep(seconds)

    test.where_is('foo')
    test.where_is('foo', 'PATH1:PATH2')
    test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')

    test.unlink('file')
    test.unlink('subdir', 'file')

The TestCmd module provides pass_test(), fail_test(), and no_result()
unbound functions that report test results for use with the Aegis change
management system.  These methods terminate the test immediately,
reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
status 0 (success), 1 or 2 respectively.  This allows for a distinction
between an actual failed test and a test that could not be properly
evaluated because of an external condition (such as a full file system
or incorrect permissions).

    import TestCmd

    TestCmd.pass_test()
    TestCmd.pass_test(condition)
    TestCmd.pass_test(condition, function)

    TestCmd.fail_test()
    TestCmd.fail_test(condition)
    TestCmd.fail_test(condition, function)
    TestCmd.fail_test(condition, function, skip)

    TestCmd.no_result()
    TestCmd.no_result(condition)
    TestCmd.no_result(condition, function)
    TestCmd.no_result(condition, function, skip)

The TestCmd module also provides unbound functions that handle matching
in the same way as the match_*() methods described above.

    import TestCmd

    test = TestCmd.TestCmd(match = TestCmd.match_exact)

    test = TestCmd.TestCmd(match = TestCmd.match_re)

    test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)

The TestCmd module provides unbound functions that can be used for the
"diff" argument to TestCmd.TestCmd instantiation:

    import TestCmd

    test = TestCmd.TestCmd(match = TestCmd.match_re,
                           diff = TestCmd.diff_re)

    test = TestCmd.TestCmd(diff = TestCmd.simple_diff)

The "diff" argument can also be used with standard difflib functions:

    import difflib

    test = TestCmd.TestCmd(diff = difflib.context_diff)

    test = TestCmd.TestCmd(diff = difflib.unified_diff)

Lastly, the where_is() method also exists in an unbound function
version.

    import TestCmd

    TestCmd.where_is('foo')
    TestCmd.where_is('foo', 'PATH1:PATH2')
    TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
"""

# Copyright 2000-2010 Steven Knight
# This module is free software, and you may redistribute it and/or modify
# it under the same terms as Python itself, so long as this copyright message
# and disclaimer are retained in their original form.
#
# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

__author__ = "Steven Knight <knight at baldmt dot com>"
__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
__version__ = "0.37"

import errno
import os
import os.path
import re
import shutil
import stat
import string
import sys
import tempfile
import time
import traceback
import types
import UserList

# Names exported by "from TestCmd import *".
__all__ = [
    'diff_re',
    'fail_test',
    'no_result',
    'pass_test',
    'match_exact',
    'match_re',
    'match_re_dotall',
    'python_executable',
    'TestCmd'
]

# simple_diff() is only defined (further down in this module) when difflib
# can be imported, so it must only be exported in that case.  Advertising
# it when the import fails would make "from TestCmd import *" reference a
# name that does not exist.
try:
    import difflib
except ImportError:
    pass
else:
    __all__.append('simple_diff')

def is_List(e):
    """Return true if e is a built-in list (exact type) or a
    UserList.UserList instance.
    """
    return type(e) is types.ListType \
        or isinstance(e, UserList.UserList)

# Fall back to an empty placeholder class when the UserString module is
# unavailable, so the isinstance() checks below remain valid.
try:
    from UserString import UserString
except ImportError:
    class UserString:
        pass

# Pick the is_String() implementation once, at import time, depending on
# whether this interpreter has a separate unicode type.
if hasattr(types, 'UnicodeType'):
    def is_String(e):
        """Return true if e is a str, a unicode string, or a
        UserString instance.
        """
        return type(e) is types.StringType \
            or type(e) is types.UnicodeType \
            or isinstance(e, UserString)
else:
    def is_String(e):
        """Return true if e is a str or a UserString instance."""
        return type(e) is types.StringType or isinstance(e, UserString)

# Prefix used by the tempfile module for generated names; on posix and nt
# the current process id is embedded in the prefix.
if os.name in ('posix', 'nt'):
    tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
else:
    tempfile.template = 'testcmd.'
276 277re_space = re.compile('\s') 278 279_Cleanup = [] 280 281_chain_to_exitfunc = None 282 283def _clean(): 284 global _Cleanup 285 cleanlist = filter(None, _Cleanup) 286 del _Cleanup[:] 287 cleanlist.reverse() 288 for test in cleanlist: 289 test.cleanup() 290 if _chain_to_exitfunc: 291 _chain_to_exitfunc() 292 293try: 294 import atexit 295except ImportError: 296 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc 297 try: 298 _chain_to_exitfunc = sys.exitfunc 299 except AttributeError: 300 pass 301 sys.exitfunc = _clean 302else: 303 atexit.register(_clean) 304 305try: 306 zip 307except NameError: 308 def zip(*lists): 309 result = [] 310 for i in xrange(min(map(len, lists))): 311 result.append(tuple(map(lambda l, i=i: l[i], lists))) 312 return result 313 314class Collector: 315 def __init__(self, top): 316 self.entries = [top] 317 def __call__(self, arg, dirname, names): 318 pathjoin = lambda n, d=dirname: os.path.join(d, n) 319 self.entries.extend(map(pathjoin, names)) 320 321def _caller(tblist, skip): 322 string = "" 323 arr = [] 324 for file, line, name, text in tblist: 325 if file[-10:] == "TestCmd.py": 326 break 327 arr = [(file, line, name, text)] + arr 328 atfrom = "at" 329 for file, line, name, text in arr[skip:]: 330 if name in ("?", "<module>"): 331 name = "" 332 else: 333 name = " (" + name + ")" 334 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) 335 atfrom = "\tfrom" 336 return string 337 338def fail_test(self = None, condition = 1, function = None, skip = 0): 339 """Cause the test to fail. 340 341 By default, the fail_test() method reports that the test FAILED 342 and exits with a status of 1. If a condition argument is supplied, 343 the test fails only if the condition is true. 
344 """ 345 if not condition: 346 return 347 if not function is None: 348 function() 349 of = "" 350 desc = "" 351 sep = " " 352 if not self is None: 353 if self.program: 354 of = " of " + self.program 355 sep = "\n\t" 356 if self.description: 357 desc = " [" + self.description + "]" 358 sep = "\n\t" 359 360 at = _caller(traceback.extract_stack(), skip) 361 sys.stderr.write("FAILED test" + of + desc + sep + at) 362 363 sys.exit(1) 364 365def no_result(self = None, condition = 1, function = None, skip = 0): 366 """Causes a test to exit with no valid result. 367 368 By default, the no_result() method reports NO RESULT for the test 369 and exits with a status of 2. If a condition argument is supplied, 370 the test fails only if the condition is true. 371 """ 372 if not condition: 373 return 374 if not function is None: 375 function() 376 of = "" 377 desc = "" 378 sep = " " 379 if not self is None: 380 if self.program: 381 of = " of " + self.program 382 sep = "\n\t" 383 if self.description: 384 desc = " [" + self.description + "]" 385 sep = "\n\t" 386 387 at = _caller(traceback.extract_stack(), skip) 388 sys.stderr.write("NO RESULT for test" + of + desc + sep + at) 389 390 sys.exit(2) 391 392def pass_test(self = None, condition = 1, function = None): 393 """Causes a test to pass. 394 395 By default, the pass_test() method reports PASSED for the test 396 and exits with a status of 0. If a condition argument is supplied, 397 the test passes only if the condition is true. 
398 """ 399 if not condition: 400 return 401 if not function is None: 402 function() 403 sys.stderr.write("PASSED\n") 404 sys.exit(0) 405 406def match_exact(lines = None, matches = None): 407 """ 408 """ 409 if not is_List(lines): 410 lines = string.split(lines, "\n") 411 if not is_List(matches): 412 matches = string.split(matches, "\n") 413 if len(lines) != len(matches): 414 return 415 for i in range(len(lines)): 416 if lines[i] != matches[i]: 417 return 418 return 1 419 420def match_re(lines = None, res = None): 421 """ 422 """ 423 if not is_List(lines): 424 lines = string.split(lines, "\n") 425 if not is_List(res): 426 res = string.split(res, "\n") 427 if len(lines) != len(res): 428 return 429 for i in range(len(lines)): 430 s = "^" + res[i] + "$" 431 try: 432 expr = re.compile(s) 433 except re.error, e: 434 msg = "Regular expression error in %s: %s" 435 raise re.error, msg % (repr(s), e[0]) 436 if not expr.search(lines[i]): 437 return 438 return 1 439 440def match_re_dotall(lines = None, res = None): 441 """ 442 """ 443 if not type(lines) is type(""): 444 lines = string.join(lines, "\n") 445 if not type(res) is type(""): 446 res = string.join(res, "\n") 447 s = "^" + res + "$" 448 try: 449 expr = re.compile(s, re.DOTALL) 450 except re.error, e: 451 msg = "Regular expression error in %s: %s" 452 raise re.error, msg % (repr(s), e[0]) 453 if expr.match(lines): 454 return 1 455 456try: 457 import difflib 458except ImportError: 459 pass 460else: 461 def simple_diff(a, b, fromfile='', tofile='', 462 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 463 """ 464 A function with the same calling signature as difflib.context_diff 465 (diff -c) and difflib.unified_diff (diff -u) but which prints 466 output like the simple, unadorned 'diff" command. 
467 """ 468 sm = difflib.SequenceMatcher(None, a, b) 469 def comma(x1, x2): 470 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) 471 result = [] 472 for op, a1, a2, b1, b2 in sm.get_opcodes(): 473 if op == 'delete': 474 result.append("%sd%d" % (comma(a1, a2), b1)) 475 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 476 elif op == 'insert': 477 result.append("%da%s" % (a1, comma(b1, b2))) 478 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 479 elif op == 'replace': 480 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) 481 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 482 result.append('---') 483 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 484 return result 485 486def diff_re(a, b, fromfile='', tofile='', 487 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 488 """ 489 A simple "diff" of two sets of lines when the expected lines 490 are regular expressions. This is a really dumb thing that 491 just compares each line in turn, so it doesn't look for 492 chunks of matching lines and the like--but at least it lets 493 you know exactly which line first didn't compare correctl... 
494 """ 495 result = [] 496 diff = len(a) - len(b) 497 if diff < 0: 498 a = a + ['']*(-diff) 499 elif diff > 0: 500 b = b + ['']*diff 501 i = 0 502 for aline, bline in zip(a, b): 503 s = "^" + aline + "$" 504 try: 505 expr = re.compile(s) 506 except re.error, e: 507 msg = "Regular expression error in %s: %s" 508 raise re.error, msg % (repr(s), e[0]) 509 if not expr.search(bline): 510 result.append("%sc%s" % (i+1, i+1)) 511 result.append('< ' + repr(a[i])) 512 result.append('---') 513 result.append('> ' + repr(b[i])) 514 i = i+1 515 return result 516 517if os.name == 'java': 518 519 python_executable = os.path.join(sys.prefix, 'jython') 520 521else: 522 523 python_executable = sys.executable 524 525if sys.platform == 'win32': 526 527 default_sleep_seconds = 2 528 529 def where_is(file, path=None, pathext=None): 530 if path is None: 531 path = os.environ['PATH'] 532 if is_String(path): 533 path = string.split(path, os.pathsep) 534 if pathext is None: 535 pathext = os.environ['PATHEXT'] 536 if is_String(pathext): 537 pathext = string.split(pathext, os.pathsep) 538 for ext in pathext: 539 if string.lower(ext) == string.lower(file[-len(ext):]): 540 pathext = [''] 541 break 542 for dir in path: 543 f = os.path.join(dir, file) 544 for ext in pathext: 545 fext = f + ext 546 if os.path.isfile(fext): 547 return fext 548 return None 549 550else: 551 552 def where_is(file, path=None, pathext=None): 553 if path is None: 554 path = os.environ['PATH'] 555 if is_String(path): 556 path = string.split(path, os.pathsep) 557 for dir in path: 558 f = os.path.join(dir, file) 559 if os.path.isfile(f): 560 try: 561 st = os.stat(f) 562 except OSError: 563 continue 564 if stat.S_IMODE(st[stat.ST_MODE]) & 0111: 565 return f 566 return None 567 568 default_sleep_seconds = 1 569 570 571 572try: 573 import subprocess 574except ImportError: 575 # The subprocess module doesn't exist in this version of Python, 576 # so we're going to cobble up something that looks just enough 577 # like its API 
for our purposes below. 578 import new 579 580 subprocess = new.module('subprocess') 581 582 subprocess.PIPE = 'PIPE' 583 subprocess.STDOUT = 'STDOUT' 584 subprocess.mswindows = (sys.platform == 'win32') 585 586 try: 587 import popen2 588 popen2.Popen3 589 except AttributeError: 590 class Popen3: 591 universal_newlines = 1 592 def __init__(self, command, **kw): 593 if sys.platform == 'win32' and command[0] == '"': 594 command = '"' + command + '"' 595 (stdin, stdout, stderr) = os.popen3(' ' + command) 596 self.stdin = stdin 597 self.stdout = stdout 598 self.stderr = stderr 599 def close_output(self): 600 self.stdout.close() 601 self.resultcode = self.stderr.close() 602 def wait(self): 603 resultcode = self.resultcode 604 if os.WIFEXITED(resultcode): 605 return os.WEXITSTATUS(resultcode) 606 elif os.WIFSIGNALED(resultcode): 607 return os.WTERMSIG(resultcode) 608 else: 609 return None 610 611 else: 612 try: 613 popen2.Popen4 614 except AttributeError: 615 # A cribbed Popen4 class, with some retrofitted code from 616 # the Python 1.5 Popen3 class methods to do certain things 617 # by hand. 
618 class Popen4(popen2.Popen3): 619 childerr = None 620 621 def __init__(self, cmd, bufsize=-1): 622 p2cread, p2cwrite = os.pipe() 623 c2pread, c2pwrite = os.pipe() 624 self.pid = os.fork() 625 if self.pid == 0: 626 # Child 627 os.dup2(p2cread, 0) 628 os.dup2(c2pwrite, 1) 629 os.dup2(c2pwrite, 2) 630 for i in range(3, popen2.MAXFD): 631 try: 632 os.close(i) 633 except: pass 634 try: 635 os.execvp(cmd[0], cmd) 636 finally: 637 os._exit(1) 638 # Shouldn't come here, I guess 639 os._exit(1) 640 os.close(p2cread) 641 self.tochild = os.fdopen(p2cwrite, 'w', bufsize) 642 os.close(c2pwrite) 643 self.fromchild = os.fdopen(c2pread, 'r', bufsize) 644 popen2._active.append(self) 645 646 popen2.Popen4 = Popen4 647 648 class Popen3(popen2.Popen3, popen2.Popen4): 649 universal_newlines = 1 650 def __init__(self, command, **kw): 651 if kw.get('stderr') == 'STDOUT': 652 apply(popen2.Popen4.__init__, (self, command, 1)) 653 else: 654 apply(popen2.Popen3.__init__, (self, command, 1)) 655 self.stdin = self.tochild 656 self.stdout = self.fromchild 657 self.stderr = self.childerr 658 def wait(self, *args, **kw): 659 resultcode = apply(popen2.Popen3.wait, (self,)+args, kw) 660 if os.WIFEXITED(resultcode): 661 return os.WEXITSTATUS(resultcode) 662 elif os.WIFSIGNALED(resultcode): 663 return os.WTERMSIG(resultcode) 664 else: 665 return None 666 667 subprocess.Popen = Popen3 668 669 670 671# From Josiah Carlson, 672# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms 673# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 674 675PIPE = subprocess.PIPE 676 677if subprocess.mswindows: 678 from win32file import ReadFile, WriteFile 679 from win32pipe import PeekNamedPipe 680 import msvcrt 681else: 682 import select 683 import fcntl 684 685 try: fcntl.F_GETFL 686 except AttributeError: fcntl.F_GETFL = 3 687 688 try: fcntl.F_SETFL 689 except AttributeError: fcntl.F_SETFL = 4 690 691class Popen(subprocess.Popen): 692 def recv(self, 
maxsize=None): 693 return self._recv('stdout', maxsize) 694 695 def recv_err(self, maxsize=None): 696 return self._recv('stderr', maxsize) 697 698 def send_recv(self, input='', maxsize=None): 699 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) 700 701 def get_conn_maxsize(self, which, maxsize): 702 if maxsize is None: 703 maxsize = 1024 704 elif maxsize < 1: 705 maxsize = 1 706 return getattr(self, which), maxsize 707 708 def _close(self, which): 709 getattr(self, which).close() 710 setattr(self, which, None) 711 712 if subprocess.mswindows: 713 def send(self, input): 714 if not self.stdin: 715 return None 716 717 try: 718 x = msvcrt.get_osfhandle(self.stdin.fileno()) 719 (errCode, written) = WriteFile(x, input) 720 except ValueError: 721 return self._close('stdin') 722 except (subprocess.pywintypes.error, Exception), why: 723 if why[0] in (109, errno.ESHUTDOWN): 724 return self._close('stdin') 725 raise 726 727 return written 728 729 def _recv(self, which, maxsize): 730 conn, maxsize = self.get_conn_maxsize(which, maxsize) 731 if conn is None: 732 return None 733 734 try: 735 x = msvcrt.get_osfhandle(conn.fileno()) 736 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) 737 if maxsize < nAvail: 738 nAvail = maxsize 739 if nAvail > 0: 740 (errCode, read) = ReadFile(x, nAvail, None) 741 except ValueError: 742 return self._close(which) 743 except (subprocess.pywintypes.error, Exception), why: 744 if why[0] in (109, errno.ESHUTDOWN): 745 return self._close(which) 746 raise 747 748 #if self.universal_newlines: 749 # read = self._translate_newlines(read) 750 return read 751 752 else: 753 def send(self, input): 754 if not self.stdin: 755 return None 756 757 if not select.select([], [self.stdin], [], 0)[1]: 758 return 0 759 760 try: 761 written = os.write(self.stdin.fileno(), input) 762 except OSError, why: 763 if why[0] == errno.EPIPE: #broken pipe 764 return self._close('stdin') 765 raise 766 767 return written 768 769 def _recv(self, which, maxsize): 770 
conn, maxsize = self.get_conn_maxsize(which, maxsize) 771 if conn is None: 772 return None 773 774 try: 775 flags = fcntl.fcntl(conn, fcntl.F_GETFL) 776 except TypeError: 777 flags = None 778 else: 779 if not conn.closed: 780 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) 781 782 try: 783 if not select.select([conn], [], [], 0)[0]: 784 return '' 785 786 r = conn.read(maxsize) 787 if not r: 788 return self._close(which) 789 790 #if self.universal_newlines: 791 # r = self._translate_newlines(r) 792 return r 793 finally: 794 if not conn.closed and not flags is None: 795 fcntl.fcntl(conn, fcntl.F_SETFL, flags) 796 797disconnect_message = "Other end disconnected!" 798 799def recv_some(p, t=.1, e=1, tr=5, stderr=0): 800 if tr < 1: 801 tr = 1 802 x = time.time()+t 803 y = [] 804 r = '' 805 pr = p.recv 806 if stderr: 807 pr = p.recv_err 808 while time.time() < x or r: 809 r = pr() 810 if r is None: 811 if e: 812 raise Exception(disconnect_message) 813 else: 814 break 815 elif r: 816 y.append(r) 817 else: 818 time.sleep(max((x-time.time())/tr, 0)) 819 return ''.join(y) 820 821# TODO(3.0: rewrite to use memoryview() 822def send_all(p, data): 823 while len(data): 824 sent = p.send(data) 825 if sent is None: 826 raise Exception(disconnect_message) 827 data = buffer(data, sent) 828 829 830 831try: 832 object 833except NameError: 834 class object: 835 pass 836 837 838 839class TestCmd(object): 840 """Class TestCmd 841 """ 842 843 def __init__(self, description = None, 844 program = None, 845 interpreter = None, 846 workdir = None, 847 subdir = None, 848 verbose = None, 849 match = None, 850 diff = None, 851 combine = 0, 852 universal_newlines = 1): 853 self._cwd = os.getcwd() 854 self.description_set(description) 855 self.program_set(program) 856 self.interpreter_set(interpreter) 857 if verbose is None: 858 try: 859 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) ) 860 except ValueError: 861 verbose = 0 862 self.verbose_set(verbose) 863 self.combine = combine 
864 self.universal_newlines = universal_newlines 865 if not match is None: 866 self.match_function = match 867 else: 868 self.match_function = match_re 869 if not diff is None: 870 self.diff_function = diff 871 else: 872 try: 873 difflib 874 except NameError: 875 pass 876 else: 877 self.diff_function = simple_diff 878 #self.diff_function = difflib.context_diff 879 #self.diff_function = difflib.unified_diff 880 self._dirlist = [] 881 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} 882 if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '': 883 self._preserve['pass_test'] = os.environ['PRESERVE'] 884 self._preserve['fail_test'] = os.environ['PRESERVE'] 885 self._preserve['no_result'] = os.environ['PRESERVE'] 886 else: 887 try: 888 self._preserve['pass_test'] = os.environ['PRESERVE_PASS'] 889 except KeyError: 890 pass 891 try: 892 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL'] 893 except KeyError: 894 pass 895 try: 896 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT'] 897 except KeyError: 898 pass 899 self._stdout = [] 900 self._stderr = [] 901 self.status = None 902 self.condition = 'no_result' 903 self.workdir_set(workdir) 904 self.subdir(subdir) 905 906 def __del__(self): 907 self.cleanup() 908 909 def __repr__(self): 910 return "%x" % id(self) 911 912 banner_char = '=' 913 banner_width = 80 914 915 def banner(self, s, width=None): 916 if width is None: 917 width = self.banner_width 918 return s + self.banner_char * (width - len(s)) 919 920 if os.name == 'posix': 921 922 def escape(self, arg): 923 "escape shell special characters" 924 slash = '\\' 925 special = '"$' 926 927 arg = string.replace(arg, slash, slash+slash) 928 for c in special: 929 arg = string.replace(arg, c, slash+c) 930 931 if re_space.search(arg): 932 arg = '"' + arg + '"' 933 return arg 934 935 else: 936 937 # Windows does not allow special characters in file names 938 # anyway, so no need for an escape function, we will just quote 939 # 
the arg. 940 def escape(self, arg): 941 if re_space.search(arg): 942 arg = '"' + arg + '"' 943 return arg 944 945 def canonicalize(self, path): 946 if is_List(path): 947 path = apply(os.path.join, tuple(path)) 948 if not os.path.isabs(path): 949 path = os.path.join(self.workdir, path) 950 return path 951 952 def chmod(self, path, mode): 953 """Changes permissions on the specified file or directory 954 path name.""" 955 path = self.canonicalize(path) 956 os.chmod(path, mode) 957 958 def cleanup(self, condition = None): 959 """Removes any temporary working directories for the specified 960 TestCmd environment. If the environment variable PRESERVE was 961 set when the TestCmd environment was created, temporary working 962 directories are not removed. If any of the environment variables 963 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set 964 when the TestCmd environment was created, then temporary working 965 directories are not removed if the test passed, failed, or had 966 no result, respectively. Temporary working directories are also 967 preserved for conditions specified via the preserve method. 968 969 Typically, this method is not called directly, but is used when 970 the script exits to clean up temporary working directories as 971 appropriate for the exit status. 
972 """ 973 if not self._dirlist: 974 return 975 os.chdir(self._cwd) 976 self.workdir = None 977 if condition is None: 978 condition = self.condition 979 if self._preserve[condition]: 980 for dir in self._dirlist: 981 print "Preserved directory", dir 982 else: 983 list = self._dirlist[:] 984 list.reverse() 985 for dir in list: 986 self.writable(dir, 1) 987 shutil.rmtree(dir, ignore_errors = 1) 988 self._dirlist = [] 989 990 try: 991 global _Cleanup 992 _Cleanup.remove(self) 993 except (AttributeError, ValueError): 994 pass 995 996 def command_args(self, program = None, 997 interpreter = None, 998 arguments = None): 999 if program: 1000 if type(program) == type('') and not os.path.isabs(program): 1001 program = os.path.join(self._cwd, program) 1002 else: 1003 program = self.program 1004 if not interpreter: 1005 interpreter = self.interpreter 1006 if not type(program) in [type([]), type(())]: 1007 program = [program] 1008 cmd = list(program) 1009 if interpreter: 1010 if not type(interpreter) in [type([]), type(())]: 1011 interpreter = [interpreter] 1012 cmd = list(interpreter) + cmd 1013 if arguments: 1014 if type(arguments) == type(''): 1015 arguments = string.split(arguments) 1016 cmd.extend(arguments) 1017 return cmd 1018 1019 def description_set(self, description): 1020 """Set the description of the functionality being tested. 1021 """ 1022 self.description = description 1023 1024 try: 1025 difflib 1026 except NameError: 1027 def diff(self, a, b, name, *args, **kw): 1028 print self.banner('Expected %s' % name) 1029 print a 1030 print self.banner('Actual %s' % name) 1031 print b 1032 else: 1033 def diff(self, a, b, name, *args, **kw): 1034 print self.banner(name) 1035 args = (a.splitlines(), b.splitlines()) + args 1036 lines = apply(self.diff_function, args, kw) 1037 for l in lines: 1038 print l 1039 1040 def fail_test(self, condition = 1, function = None, skip = 0): 1041 """Cause the test to fail. 
1042 """ 1043 if not condition: 1044 return 1045 self.condition = 'fail_test' 1046 fail_test(self = self, 1047 condition = condition, 1048 function = function, 1049 skip = skip) 1050 1051 def interpreter_set(self, interpreter): 1052 """Set the program to be used to interpret the program 1053 under test as a script. 1054 """ 1055 self.interpreter = interpreter 1056 1057 def match(self, lines, matches): 1058 """Compare actual and expected file contents. 1059 """ 1060 return self.match_function(lines, matches) 1061 1062 def match_exact(self, lines, matches): 1063 """Compare actual and expected file contents. 1064 """ 1065 return match_exact(lines, matches) 1066 1067 def match_re(self, lines, res): 1068 """Compare actual and expected file contents. 1069 """ 1070 return match_re(lines, res) 1071 1072 def match_re_dotall(self, lines, res): 1073 """Compare actual and expected file contents. 1074 """ 1075 return match_re_dotall(lines, res) 1076 1077 def no_result(self, condition = 1, function = None, skip = 0): 1078 """Report that the test could not be run. 1079 """ 1080 if not condition: 1081 return 1082 self.condition = 'no_result' 1083 no_result(self = self, 1084 condition = condition, 1085 function = function, 1086 skip = skip) 1087 1088 def pass_test(self, condition = 1, function = None): 1089 """Cause the test to pass. 1090 """ 1091 if not condition: 1092 return 1093 self.condition = 'pass_test' 1094 pass_test(self = self, condition = condition, function = function) 1095 1096 def preserve(self, *conditions): 1097 """Arrange for the temporary working directories for the 1098 specified TestCmd environment to be preserved for one or more 1099 conditions. If no conditions are specified, arranges for 1100 the temporary working directories to be preserved for all 1101 conditions. 
1102 """ 1103 if conditions is (): 1104 conditions = ('pass_test', 'fail_test', 'no_result') 1105 for cond in conditions: 1106 self._preserve[cond] = 1 1107 1108 def program_set(self, program): 1109 """Set the executable program or script to be tested. 1110 """ 1111 if program and not os.path.isabs(program): 1112 program = os.path.join(self._cwd, program) 1113 self.program = program 1114 1115 def read(self, file, mode = 'rb'): 1116 """Reads and returns the contents of the specified file name. 1117 The file name may be a list, in which case the elements are 1118 concatenated with the os.path.join() method. The file is 1119 assumed to be under the temporary working directory unless it 1120 is an absolute path name. The I/O mode for the file may 1121 be specified; it must begin with an 'r'. The default is 1122 'rb' (binary read). 1123 """ 1124 file = self.canonicalize(file) 1125 if mode[0] != 'r': 1126 raise ValueError, "mode must begin with 'r'" 1127 return open(file, mode).read() 1128 1129 def rmdir(self, dir): 1130 """Removes the specified dir name. 1131 The dir name may be a list, in which case the elements are 1132 concatenated with the os.path.join() method. The dir is 1133 assumed to be under the temporary working directory unless it 1134 is an absolute path name. 1135 The dir must be empty. 1136 """ 1137 dir = self.canonicalize(dir) 1138 os.rmdir(dir) 1139 1140 def start(self, program = None, 1141 interpreter = None, 1142 arguments = None, 1143 universal_newlines = None, 1144 **kw): 1145 """ 1146 Starts a program or script for the test environment. 1147 1148 The specified program will have the original directory 1149 prepended unless it is enclosed in a [list]. 
def finish(self, popen, **kw):
    """
    Finishes and waits for the process being run under control of
    the specified popen argument, recording the exit status,
    standard output and error output.

    The output is appended to self._stdout and self._stderr and
    the exit status is stored in self.status (0 when the status
    is false).
    """
    # start() only creates a stdin pipe when input was requested,
    # so there may be nothing to close.
    if popen.stdin is not None:
        popen.stdin.close()
    # Drain the pipes before waiting: waiting first can block
    # forever if the child fills a pipe nobody is reading.
    out = popen.stdout.read()
    if popen.stderr:
        err = popen.stderr.read()
    else:
        err = ''
    self.status = popen.wait()
    if not self.status:
        self.status = 0
    self._stdout.append(out)
    self._stderr.append(err)

def run(self, program = None,
              interpreter = None,
              arguments = None,
              chdir = None,
              stdin = None,
              universal_newlines = None):
    """Runs a test of the program or script for the test
    environment.  Standard output and error output are saved for
    future retrieval via the stdout() and stderr() methods.

    The specified program will have the original directory
    prepended unless it is enclosed in a [list].

    If chdir is given, the program is run from that directory (a
    relative name is resolved with workpath()) and the previous
    working directory is restored afterwards, even on error.
    stdin may be a string or a list of strings to feed to the
    program.  The exit status is stored in self.status.
    """
    if chdir:
        oldcwd = os.getcwd()
        if not os.path.isabs(chdir):
            chdir = self.workpath(chdir)
        if self.verbose:
            sys.stderr.write("chdir(" + chdir + ")\n")
        os.chdir(chdir)
    try:
        p = self.start(program,
                       interpreter,
                       arguments,
                       universal_newlines,
                       stdin=stdin)
        if stdin:
            if is_List(stdin):
                for line in stdin:
                    p.stdin.write(line)
            else:
                p.stdin.write(stdin)
        # Close the pipe whenever one exists, not only when there was
        # data to send; an empty stdin still gets a pipe from start()
        # and the child would otherwise wait on it forever.
        if p.stdin is not None:
            p.stdin.close()

        out = p.stdout.read()
        if p.stderr is None:
            err = ''
        else:
            err = p.stderr.read()
        # Prefer the process object's own close_output() when it has
        # one; otherwise close the pipes directly.
        try:
            close_output = p.close_output
        except AttributeError:
            p.stdout.close()
            if p.stderr is not None:
                p.stderr.close()
        else:
            close_output()

        self._stdout.append(out)
        self._stderr.append(err)

        self.status = p.wait()
        if not self.status:
            self.status = 0
    finally:
        # Always return to the caller's directory, even on failure.
        if chdir:
            os.chdir(oldcwd)

    if self.verbose >= 2:
        write = sys.stdout.write
        write('============ STATUS: %d\n' % self.status)
        out = self.stdout()
        if out or self.verbose >= 3:
            write('============ BEGIN STDOUT (len=%d):\n' % len(out))
            write(out)
            write('============ END STDOUT\n')
        err = self.stderr()
        if err or self.verbose >= 3:
            write('============ BEGIN STDERR (len=%d)\n' % len(err))
            write(err)
            write('============ END STDERR\n')
def sleep(self, seconds = default_sleep_seconds):
    """Pauses for at least the given number of seconds.  When no
    number is given, the module's default_sleep_seconds value is
    used, which is intended to be long enough for file time stamps
    to advance on the current system.  Sleeping longer is harmless.
    """
    time.sleep(seconds)

def stderr(self, run = None):
    """Returns the error output captured for a given run.

    Runs are numbered from 1.  With no run number (or a false
    one), the error output of the most recent run is returned.
    A negative run number counts that many runs back from the
    most recent run.
    """
    total = len(self._stderr)
    if not run:
        position = total
    elif run < 0:
        position = total + run
    else:
        position = run
    # Run numbers are 1-based; the list is 0-based.
    return self._stderr[position - 1]
def stdout(self, run = None):
    """Returns the standard output from the specified run number.
    If there is no specified run number, then returns the standard
    output of the last run.  If the run number is less than zero,
    then returns the standard output from that many runs back from
    the current run.

    Run numbers start at 1.
    """
    if not run:
        run = len(self._stdout)
    elif run < 0:
        run = len(self._stdout) + run
    # Run numbers are 1-based; the list is 0-based.
    run = run - 1
    return self._stdout[run]

def subdir(self, *subdirs):
    """Create new subdirectories under the temporary working
    directory, one for each argument.  An argument may be a list,
    in which case the list elements are concatenated using the
    os.path.join() method.  Subdirectories multiple levels deep
    must be created using a separate argument for each level:

            test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])

    Arguments that are None are skipped.

    Returns the number of subdirectories actually created.
    """
    count = 0
    for sub in subdirs:
        if sub is None:
            continue
        if is_List(sub):
            sub = os.path.join(*sub)
        new = os.path.join(self.workdir, sub)
        try:
            os.mkdir(new)
        except OSError:
            # Deliberately best-effort: a directory that cannot be
            # created (for example because it already exists) is
            # simply not counted.
            pass
        else:
            count = count + 1
    return count

def symlink(self, target, link):
    """Creates a symlink to the specified target.
    The link name may be a list, in which case the elements are
    concatenated with the os.path.join() method.  The link is
    assumed to be under the temporary working directory unless it
    is an absolute path name.  The target is *not* assumed to be
    under the temporary working directory.
    """
    link = self.canonicalize(link)
    os.symlink(target, link)
def tempdir(self, path=None):
    """Creates a temporary directory.
    A unique directory name is generated if no path name is specified.
    The directory is created, and will be removed when the TestCmd
    object is destroyed.

    Returns the canonical path of the directory.  The path is also
    appended to self._dirlist, and this object is added to the
    module's _Cleanup list if it is not already there.
    """
    if path is None:
        # mkdtemp() picks the name and creates the directory in one
        # step, so no other process can grab the name in between.
        path = tempfile.mkdtemp(prefix=tempfile.template)
    else:
        os.mkdir(path)

    # Symlinks in the path will report things
    # differently from os.getcwd(), so chdir there
    # and back to fetch the canonical path.
    cwd = os.getcwd()
    try:
        os.chdir(path)
        path = os.getcwd()
    finally:
        os.chdir(cwd)

    # Uppercase the drive letter since the case of drive
    # letters is pretty much random on win32:
    drive, rest = os.path.splitdrive(path)
    if drive:
        path = drive.upper() + rest

    self._dirlist.append(path)
    global _Cleanup
    try:
        _Cleanup.index(self)
    except ValueError:
        _Cleanup.append(self)

    return path

def touch(self, path, mtime=None):
    """Updates the modification time on the specified file or
    directory path name.  The default is to update to the
    current time if no explicit modification time is specified.

    The access time currently recorded for the path is passed
    back to os.utime() alongside the new modification time.
    """
    path = self.canonicalize(path)
    atime = os.path.getatime(path)
    if mtime is None:
        mtime = time.time()
    os.utime(path, (atime, mtime))

def unlink(self, file):
    """Unlinks the specified file name.
    The file name may be a list, in which case the elements are
    concatenated with the os.path.join() method.  The file is
    assumed to be under the temporary working directory unless it
    is an absolute path name.
    """
    file = self.canonicalize(file)
    os.unlink(file)

def verbose_set(self, verbose):
    """Set the verbose level.

    The value is stored unchanged on self.verbose.
    """
    self.verbose = verbose
def where_is(self, file, path=None, pathext=None):
    """Find an executable file.

    The file name may be a list, in which case the elements are
    concatenated with the os.path.join() method.  An absolute
    name is returned unchanged; anything else is looked up with
    the module-level where_is() function, whose result is
    returned.
    """
    if is_List(file):
        file = os.path.join(*file)
    if not os.path.isabs(file):
        file = where_is(file, path, pathext)
    return file

def workdir_set(self, path):
    """Creates a temporary working directory with the specified
    path name.  If the path is a null string (''), a unique
    directory name is created.  If the path is None, no directory
    is created and the working directory is set to None.
    """
    if path is not None:
        if path == '':
            path = None
        path = self.tempdir(path)
    self.workdir = path

def workpath(self, *args):
    """Returns the absolute path name to a subdirectory or file
    within the current temporary working directory.  Concatenates
    the temporary working directory name with the specified
    arguments using the os.path.join() method.
    """
    return os.path.join(self.workdir, *args)

def readable(self, top, read=1):
    """Make the specified directory tree readable (read == 1)
    or not (read == None).

    This method has no effect on Windows systems, which use a
    completely different mechanism to control file readability.
    """

    if sys.platform == 'win32':
        return

    def do_chmod(fname):
        # Entries that cannot be stat'ed are silently skipped.
        try:
            st = os.stat(fname)
        except OSError:
            return
        mode = st[stat.ST_MODE]
        if read:
            mode = mode | stat.S_IREAD
        else:
            mode = mode & ~stat.S_IREAD
        os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        # If it's a file, that's easy, just chmod it.
        do_chmod(top)
    elif read:
        # It's a directory and we're trying to turn on read
        # permission, so it's also pretty easy, just chmod the
        # directory and then chmod every entry on our walk down the
        # tree.  Because os.walk() is top-down, we'll enable
        # read permission on any directories that have it disabled
        # before os.walk() tries to list their contents.
        do_chmod(top)
        for dirpath, dirnames, filenames in os.walk(top):
            for n in dirnames + filenames:
                do_chmod(os.path.join(dirpath, n))
    else:
        # It's a directory and we're trying to turn off read
        # permission, which means we have to chmod the directories
        # in the tree bottom-up, lest disabling read permission from
        # the top down get in the way of being able to get at lower
        # parts of the tree.  But os.walk() visits things top
        # down, so we just use an object to collect a list of all
        # of the entries in the tree, reverse the list, and then
        # chmod the reversed (bottom-up) list.
        col = Collector(top)
        for dirpath, dirnames, filenames in os.walk(top):
            # Same (arg, dirname, names) call that os.path.walk()
            # used to make on the collector.
            col(None, dirpath, dirnames + filenames)
        col.entries.reverse()
        for d in col.entries:
            do_chmod(d)
def writable(self, top, write=1):
    """Make the specified directory tree writable (write == 1)
    or not (write == None).

    If top is a file, only that file is changed.  Otherwise top
    and every entry gathered beneath it are changed.
    """

    if sys.platform == 'win32':

        # On Windows the mode is set outright rather than adjusted;
        # failures are ignored.
        if write:
            def do_chmod(fname):
                try:
                    os.chmod(fname, stat.S_IWRITE)
                except OSError:
                    pass
        else:
            def do_chmod(fname):
                try:
                    os.chmod(fname, stat.S_IREAD)
                except OSError:
                    pass

    else:

        def do_chmod(fname):
            # Entries that cannot be stat'ed are silently skipped.
            try:
                st = os.stat(fname)
            except OSError:
                return
            mode = st[stat.ST_MODE]
            # stat.S_IWUSR is the owner-write bit (octal 200).
            if write:
                mode = mode | stat.S_IWUSR
            else:
                mode = mode & ~stat.S_IWUSR
            os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        do_chmod(top)
    else:
        col = Collector(top)
        for dirpath, dirnames, filenames in os.walk(top):
            # Same (arg, dirname, names) call that os.path.walk()
            # used to make on the collector.
            col(None, dirpath, dirnames + filenames)
        for d in col.entries:
            do_chmod(d)
def executable(self, top, execute=1):
    """Make the specified directory tree executable (execute == 1)
    or not (execute == None).

    This method has no effect on Windows systems, which use a
    completely different mechanism to control file executability.
    """

    if sys.platform == 'win32':
        return

    def do_chmod(fname):
        # Entries that cannot be stat'ed are silently skipped.
        try:
            st = os.stat(fname)
        except OSError:
            return
        mode = st[stat.ST_MODE]
        if execute:
            mode = mode | stat.S_IEXEC
        else:
            mode = mode & ~stat.S_IEXEC
        os.chmod(fname, stat.S_IMODE(mode))

    if os.path.isfile(top):
        # If it's a file, that's easy, just chmod it.
        do_chmod(top)
    elif execute:
        # It's a directory and we're trying to turn on execute
        # permission, so it's also pretty easy, just chmod the
        # directory and then chmod every entry on our walk down the
        # tree.  Because os.walk() is top-down, we'll enable
        # execute permission on any directories that have it disabled
        # before os.walk() tries to list their contents.
        do_chmod(top)
        for dirpath, dirnames, filenames in os.walk(top):
            for n in dirnames + filenames:
                do_chmod(os.path.join(dirpath, n))
    else:
        # It's a directory and we're trying to turn off execute
        # permission, which means we have to chmod the directories
        # in the tree bottom-up, lest disabling execute permission from
        # the top down get in the way of being able to get at lower
        # parts of the tree.  But os.walk() visits things top
        # down, so we just use an object to collect a list of all
        # of the entries in the tree, reverse the list, and then
        # chmod the reversed (bottom-up) list.
        col = Collector(top)
        for dirpath, dirnames, filenames in os.walk(top):
            # Same (arg, dirname, names) call that os.path.walk()
            # used to make on the collector.
            col(None, dirpath, dirnames + filenames)
        col.entries.reverse()
        for d in col.entries:
            do_chmod(d)
def write(self, file, content, mode = 'wb'):
    """Writes the specified content text (second argument) to the
    specified file name (first argument).  The file name may be
    a list, in which case the elements are concatenated with the
    os.path.join() method.  The file is created under the temporary
    working directory.  Any subdirectories in the path must already
    exist.  The I/O mode for the file may be specified; it must
    begin with a 'w'.  The default is 'wb' (binary write).

    Raises ValueError if mode does not begin with 'w' (including
    an empty mode string).
    """
    file = self.canonicalize(file)
    # Slice instead of index so an empty mode is rejected with the
    # same ValueError rather than an IndexError.
    if mode[:1] != 'w':
        raise ValueError("mode must begin with 'w'")
    fp = open(file, mode)
    try:
        fp.write(content)
    finally:
        # Close explicitly so the content is flushed to disk before
        # returning, instead of whenever the object is collected.
        fp.close()

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4: