###
# Copyright (c) 2002-2005, Jeremiah Fincher
# Copyright (c) 2011, James McCoy
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#   * Redistributions of source code must retain the above copyright notice,
#     this list of conditions, and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright notice,
#     this list of conditions, and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#   * Neither the name of the author of this software nor the name of
#     contributors to this software may be used to endorse or promote products
#     derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###

import gc
import os
import re
import sys
import time
import shutil
import urllib
import unittest
import functools
import threading

from . import (callbacks, conf, drivers, httpserver, i18n, ircdb, irclib,
        ircmsgs, ircutils, log, plugin, registry, utils, world)
from .utils import minisix

if minisix.PY2:
    from httplib import HTTPConnection
    from urllib import splithost, splituser
    from urllib import URLopener
else:
    from http.client import HTTPConnection
    from urllib.parse import splithost, splituser
    from urllib.request import URLopener

# Verbosity thresholds compared against ``world.myVerbose`` by the
# _feedMsg() implementations below.
class verbosity:
    NONE = 0
    EXCEPTIONS = 1
    MESSAGES = 2

i18n.import_conf()
network = True
setuid = True

# This is the global list of suites that are to be run.
suites = []

# Default number of seconds _feedMsg() waits for a reply.
timeout = 10

# callbacks.getHelp is wrapped so that the most recently generated help string
# is remembered; the assert* helpers compare replies against it.
originalCallbacksGetHelp = callbacks.getHelp
lastGetHelp = 'x' * 1000
def cachingGetHelp(method, name=None, doc=None):
    """Call the original callbacks.getHelp and remember its result.

    The result is stored in the module-level ``lastGetHelp`` and returned
    unchanged.
    """
    global lastGetHelp
    lastGetHelp = originalCallbacksGetHelp(method, name, doc)
    return lastGetHelp
callbacks.getHelp = cachingGetHelp

def retry(tries=3):
    """Return a decorator that re-runs a test method on AssertionError.

    The decorated method is called up to ``tries`` times.  It stops at the
    first call that does not raise AssertionError; if every call fails, the
    AssertionError from the *first* attempt is re-raised.  The wrapper only
    accepts ``self`` and discards the wrapped method's return value.
    """
    assert tries > 0
    def decorator(f):
        @functools.wraps(f)
        def newf(self):
            try:
                f(self)
            except AssertionError as e:
                first_exception = e
                for _ in range(1, tries):
                    try:
                        f(self)
                    except AssertionError:
                        pass
                    else:
                        break
                else:
                    # All failed
                    raise first_exception
        return newf
    return decorator

def getTestIrc():
    """Create an irclib.Irc named 'test' with an empty outgoing queue."""
    irc = irclib.Irc('test')
    # Gotta clear the connect messages (USER, NICK, etc.)
    while irc.takeMsg():
        pass
    return irc

class TimeoutError(AssertionError):
    """Raised by the assert* helpers when no reply arrived for a query."""
    def __str__(self):
        return '%r timed out' % self.args[0]

# NOTE: no class docstring is added here on purpose; only comments are used so
# that nothing the plugin machinery might read from __doc__ is altered.
class TestPlugin(callbacks.Plugin):
    def eval(self, irc, msg, args):
        """<text>

        This is the help for eval. Since Owner doesn't have an eval command
        anymore, we needed to add this so as not to invalidate any of the tests
        that depended on that eval command.
        """
        # eval() on the command text is deliberate here: this plugin only
        # exists for the test-suite and must never be loaded in a real bot.
        try:
            irc.reply(repr(eval(' '.join(args))))
        except callbacks.ArgumentError:
            raise
        except Exception as e:
            irc.reply(utils.exnToString(e))
# Since we know we don't now need the Irc object, we just give None.  This
# might break if callbacks.Privmsg ever *requires* the Irc object.
TestInstance = TestPlugin(None)
conf.registerPlugin('TestPlugin', True, public=False)

class SupyTestCase(unittest.TestCase):
    """This class exists simply for extra logging.  It's come in useful in the
    past."""
    def setUp(self):
        """Log the test id and the names of all live threads."""
        log.critical('Beginning test case %s', self.id())
        # ``Thread.name`` replaces the deprecated ``Thread.getName()``.
        threads = [t.name for t in threading.enumerate()]
        log.critical('Threads: %L', threads)
        unittest.TestCase.setUp(self)

    def tearDown(self):
        """Kill every Irc object still registered in ``world.ircs``."""
        # Iterate over a copy of the list.
        for irc in world.ircs[:]:
            irc._reallyDie()

    # Backports of assertion helpers for interpreters older than 2.7.
    if sys.version_info < (2, 7, 0):
        def assertIn(self, member, container, msg=None):
            """Just like self.assertTrue(a in b), but with a nicer default message."""
            if member not in container:
                standardMsg = '%s not found in %s' % (repr(member),
                                                      repr(container))
                self.fail(self._formatMessage(msg, standardMsg))

        def assertNotIn(self, member, container, msg=None):
            """Just like self.assertTrue(a not in b), but with a nicer default message."""
            if member in container:
                standardMsg = '%s unexpectedly found in %s' % (repr(member),
                                                               repr(container))
                self.fail(self._formatMessage(msg, standardMsg))

        def assertIs(self, expr1, expr2, msg=None):
            """Just like self.assertTrue(a is b), but with a nicer default message."""
            if expr1 is not expr2:
                standardMsg = '%s is not %s' % (repr(expr1),
                                                repr(expr2))
                self.fail(self._formatMessage(msg, standardMsg))

        def assertIsNot(self, expr1, expr2, msg=None):
            """Just like self.assertTrue(a is not b), but with a nicer default message."""
            if expr1 is expr2:
                standardMsg = 'unexpectedly identical: %s' % (repr(expr1),)
                self.fail(self._formatMessage(msg, standardMsg))


class PluginTestCase(SupyTestCase):
    """Subclass this to write a test case for a plugin.  See
    plugins/Plugin/test.py for an example.
    """
    # Plugin name (str) or list of plugin names to load; must be overridden.
    plugins = None
    # Whether setUp() empties the conf / data directories.
    cleanConfDir = True
    cleanDataDir = True
    # Mapping of registry variable name -> value applied during setUp() and
    # restored during tearDown().
    config = {}
    def __init__(self, methodName='runTest'):
        """Wrap the test method so it is skipped for deprecated plugins.

        The method named ``methodName`` is replaced *on the class* by a
        wrapper that only calls the original when none of the loaded
        callbacks comes from a module with a truthy ``deprecated`` attribute.
        """
        self.timeout = timeout
        originalRunTest = getattr(self, methodName)
        def runTest(self):
            run = True
            if hasattr(self, 'irc') and self.irc:
                for cb in self.irc.callbacks:
                    cbModule = sys.modules[cb.__class__.__module__]
                    if hasattr(cbModule, 'deprecated') and cbModule.deprecated:
                        print('')
                        print('Ignored, %s is deprecated.' % cb.name())
                        run = False
            if run:
                originalRunTest()
        runTest = utils.python.changeFunctionName(runTest, methodName)
        setattr(self.__class__, methodName, runTest)
        SupyTestCase.__init__(self, methodName=methodName)
        # Registry group -> original value, filled in by setUp().
        self.originals = {}

    def setUp(self, nick='test', forceSetup=False):
        """Reset global state, create a test Irc and load the plugins.

        ``nick`` is the nick used to build ``self.prefix``, the hostmask the
        fed messages come from.  ``forceSetup`` runs the setup even when the
        class is PluginTestCase or ChannelPluginTestCase itself.

        Raises ValueError if the ``plugins`` class attribute was not set.
        """
        if not forceSetup and \
                self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            # Necessary because there's a test in here that shouldn't run.
            return
        SupyTestCase.setUp(self)
        # Just in case, let's do this.  Too many people forget to call their
        # super methods.
        for irc in world.ircs[:]:
            irc._reallyDie()
        # Set conf variables appropriately.
        conf.supybot.reply.whenAddressedBy.chars.setValue('@')
        conf.supybot.reply.error.detailed.setValue(True)
        conf.supybot.reply.whenNotCommand.setValue(True)
        self.myVerbose = world.myVerbose
        def rmFiles(dir):
            # Remove every entry of ``dir`` but keep the directory itself.
            for filename in os.listdir(dir):
                file = os.path.join(dir, filename)
                if os.path.isfile(file):
                    os.remove(file)
                else:
                    shutil.rmtree(file)
        if self.cleanConfDir:
            rmFiles(conf.supybot.directories.conf())
        if self.cleanDataDir:
            rmFiles(conf.supybot.directories.data())
        ircdb.users.reload()
        ircdb.ignores.reload()
        ircdb.channels.reload()
        if self.plugins is None:
            raise ValueError('PluginTestCase must have a "plugins" attribute.')
        self.nick = nick
        self.prefix = ircutils.joinHostmask(nick, 'user', 'host.domain.tld')
        self.irc = getTestIrc()
        MiscModule = plugin.loadPluginModule('Misc')
        OwnerModule = plugin.loadPluginModule('Owner')
        ConfigModule = plugin.loadPluginModule('Config')
        plugin.loadPluginClass(self.irc, MiscModule)
        plugin.loadPluginClass(self.irc, OwnerModule)
        plugin.loadPluginClass(self.irc, ConfigModule)
        # A single plugin name is normalised to a list and then loaded like
        # any other.  (The loading loop used to sit in an ``else:`` branch,
        # so a string-valued ``plugins`` was never actually loaded.)
        if isinstance(self.plugins, str):
            self.plugins = [self.plugins]
        for name in self.plugins:
            # Owner, Misc and Config are always loaded above.
            if name not in ('Owner', 'Misc', 'Config'):
                module = plugin.loadPluginModule(name,
                                                 ignoreDeprecation=True)
                plugin.loadPluginClass(self.irc, module)
        self.irc.addCallback(TestInstance)
        for (name, value) in self.config.items():
            group = conf.supybot
            parts = registry.split(name)
            # Accept names with or without the leading 'supybot.' component.
            if parts[0] == 'supybot':
                parts.pop(0)
            for part in parts:
                group = group.get(part)
            # Remember the previous value so tearDown() can restore it.
            self.originals[group] = group()
            group.setValue(value)

    def tearDown(self):
        """Restore overridden config values and close the ircdb databases."""
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            # Necessary because there's a test in here that shouldn't run.
            return
        for (group, original) in self.originals.items():
            group.setValue(original)
        ircdb.users.close()
        ircdb.ignores.close()
        ircdb.channels.close()
        SupyTestCase.tearDown(self)
        self.irc = None
        gc.collect()

    def _feedMsg(self, query, timeout=None, to=None, frm=None,
                 usePrefixChar=True, expectException=False):
        """Feed ``query`` to the bot as a PRIVMSG and return the first reply.

        ``to`` defaults to the bot's nick, ``frm`` to ``self.prefix`` and
        ``timeout`` to ``self.timeout``.  When ``usePrefixChar`` is false, a
        leading prefix character is stripped from the query.  Unless
        ``expectException`` is set, stdout logging is enabled while the
        message is processed if the verbosity is at least EXCEPTIONS.

        Returns the reply message, or None if none arrived within ``timeout``
        seconds.
        """
        if to is None:
            to = self.irc.nick
        if frm is None:
            frm = self.prefix
        if timeout is None:
            timeout = self.timeout
        if self.myVerbose >= verbosity.MESSAGES:
            print('') # Extra newline, so it's pretty.
        prefixChars = conf.supybot.reply.whenAddressedBy.chars()
        if not usePrefixChar and query[0] in prefixChars:
            query = query[1:]
        # Only unicode objects need encoding on Python 2 (same guard as
        # ChannelPluginTestCase._feedMsg); ``unicode`` is never evaluated on
        # Python 3 thanks to short-circuiting.
        if minisix.PY2 and isinstance(query, unicode):
            query = query.encode('utf8') # unicode->str
        msg = ircmsgs.privmsg(to, query, prefix=frm)
        if self.myVerbose >= verbosity.MESSAGES:
            print('Feeding: %r' % msg)
        if not expectException and self.myVerbose >= verbosity.EXCEPTIONS:
            conf.supybot.log.stdout.setValue(True)
        self.irc.feedMsg(msg)
        fed = time.time()
        response = self.irc.takeMsg()
        while response is None and time.time() - fed < timeout:
            time.sleep(0.01) # So it doesn't suck up 100% cpu.
            drivers.run()
            response = self.irc.takeMsg()
        if self.myVerbose >= verbosity.MESSAGES:
            print('Response: %r' % response)
        if not expectException and self.myVerbose >= verbosity.EXCEPTIONS:
            conf.supybot.log.stdout.setValue(False)
        return response

    def getMsg(self, query, **kwargs):
        """Feed ``query`` and return the reply (or None) without asserting."""
        return self._feedMsg(query, **kwargs)

    def feedMsg(self, query, to=None, frm=None):
        """Just feeds it a message, that's all."""
        if to is None:
            to = self.irc.nick
        if frm is None:
            frm = self.prefix
        self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))

    # These assertError/assertNoError are somewhat fragile.  The proper way to
    # do them would be to use a proxy for the irc object and intercept .error.
    # But that would be hard, so I don't bother.  When this breaks, it'll get
    # fixed, but not until then.
    def assertError(self, query, **kwargs):
        """Assert the reply is the last help string or starts with 'Error:'.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, expectException=True, **kwargs)
        if m is None:
            raise TimeoutError(query)
        if lastGetHelp not in m.args[1]:
            self.assertTrue(m.args[1].startswith('Error:'),
                            '%r did not error: %s' % (query, m.args[1]))
        return m

    def assertSnarfError(self, query, **kwargs):
        """assertError() with ``usePrefixChar=False``."""
        return self.assertError(query, usePrefixChar=False, **kwargs)

    def assertNotError(self, query, **kwargs):
        """Assert the reply is neither an 'Error:' reply nor the help string.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertFalse(m.args[1].startswith('Error:'),
                         '%r errored: %s' % (query, m.args[1]))
        self.assertFalse(lastGetHelp in m.args[1],
                         '%r returned the help string.' % query)
        return m

    def assertSnarfNotError(self, query, **kwargs):
        """assertNotError() with ``usePrefixChar=False``."""
        return self.assertNotError(query, usePrefixChar=False, **kwargs)

    def assertHelp(self, query, **kwargs):
        """Assert the reply text is contained in the last help string.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        msg = m.args[1]
        if 'more message' in msg:
            msg = msg[0:-27] # Strip (XXX more messages)
        self.assertTrue(msg in lastGetHelp,
                        '%s is not the help (%s)' % (m.args[1], lastGetHelp))
        return m

    def assertNoResponse(self, query, timeout=0, **kwargs):
        """Assert that no (truthy) reply arrives within ``timeout`` seconds."""
        m = self._feedMsg(query, timeout=timeout, **kwargs)
        self.assertFalse(m, 'Unexpected response: %r' % m)
        return m

    def assertSnarfNoResponse(self, query, timeout=0, **kwargs):
        """assertNoResponse() with ``usePrefixChar=False``."""
        return self.assertNoResponse(query, timeout=timeout,
                                     usePrefixChar=False, **kwargs)

    def assertResponse(self, query, expectedResponse, **kwargs):
        """Assert the reply text equals ``expectedResponse`` exactly.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertEqual(m.args[1], expectedResponse,
                         '%r != %r' % (expectedResponse, m.args[1]))
        return m

    def assertSnarfResponse(self, query, expectedResponse, **kwargs):
        """assertResponse() with ``usePrefixChar=False``."""
        return self.assertResponse(query, expectedResponse,
                                   usePrefixChar=False, **kwargs)

    def assertRegexp(self, query, regexp, flags=re.I, **kwargs):
        """Assert ``re.search(regexp, reply_text, flags)`` matches.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertTrue(re.search(regexp, m.args[1], flags),
                        '%r does not match %r' % (m.args[1], regexp))
        return m

    def assertSnarfRegexp(self, query, regexp, flags=re.I, **kwargs):
        """assertRegexp() with ``usePrefixChar=False``."""
        # Forward the caller's flags (they used to be replaced by re.I).
        return self.assertRegexp(query, regexp, flags=flags,
                                 usePrefixChar=False, **kwargs)

    def assertNotRegexp(self, query, regexp, flags=re.I, **kwargs):
        """Assert ``re.search(regexp, reply_text, flags)`` does not match.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertTrue(re.search(regexp, m.args[1], flags) is None,
                        '%r matched %r' % (m.args[1], regexp))
        return m

    def assertSnarfNotRegexp(self, query, regexp, flags=re.I, **kwargs):
        """assertNotRegexp() with ``usePrefixChar=False``."""
        # Forward the caller's flags (they used to be replaced by re.I).
        return self.assertNotRegexp(query, regexp, flags=flags,
                                    usePrefixChar=False, **kwargs)

    def assertAction(self, query, expectedResponse=None, **kwargs):
        """Assert the reply is an action, optionally with the given text.

        Raises TimeoutError when there is no reply.  Returns the reply.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertTrue(ircmsgs.isAction(m), '%r is not an action.' % m)
        if expectedResponse is not None:
            s = ircmsgs.unAction(m)
            self.assertEqual(s, expectedResponse,
                             '%r != %r' % (s, expectedResponse))
        return m

    def assertSnarfAction(self, query, expectedResponse=None, **kwargs):
        """assertAction() with ``usePrefixChar=False``."""
        # Forward the expected text (it used to be replaced by None, which
        # silently disabled the content check).
        return self.assertAction(query, expectedResponse=expectedResponse,
                                 usePrefixChar=False, **kwargs)

    def assertActionRegexp(self, query, regexp, flags=re.I, **kwargs):
        """Assert the reply is an action whose text matches ``regexp``.

        Raises TimeoutError when there is no reply.  Returns the reply, like
        the other assert* helpers.
        """
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError(query)
        self.assertTrue(ircmsgs.isAction(m))
        s = ircmsgs.unAction(m)
        self.assertTrue(re.search(regexp, s, flags),
                        '%r does not match %r' % (s, regexp))
        return m

    def assertSnarfActionRegexp(self, query, regexp, flags=re.I, **kwargs):
        """assertActionRegexp() with ``usePrefixChar=False``."""
        # Forward the caller's flags (they used to be replaced by re.I).
        return self.assertActionRegexp(query, regexp, flags=flags,
                                       usePrefixChar=False, **kwargs)

    # Callbacks whose documentation is not checked, unless the test class
    # name contains the callback's name.
    _noTestDoc = ('Admin', 'Channel', 'Config',
                  'Misc', 'Owner', 'User', 'TestPlugin')
    def TestDocumentation(self):
        # Checks that every loaded callback has module documentation and that
        # every canonical command method has a docstring.  (Comment rather
        # than docstring so the test's reported description is unchanged.)
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            return
        for cb in self.irc.callbacks:
            name = cb.name()
            if ((name in self._noTestDoc) and \
               not name.lower() in self.__class__.__name__.lower()):
                continue
            # NOTE(review): this looks the module up by the *class* name,
            # unlike __init__ which uses __module__; presumably plugin
            # packages are registered in sys.modules under the plugin's
            # name -- confirm before changing.
            self.assertTrue(sys.modules[cb.__class__.__name__].__doc__,
                            '%s has no module documentation.' % name)
            if hasattr(cb, 'isCommandMethod'):
                for attr in dir(cb):
                    if cb.isCommandMethod(attr) and \
                       attr == callbacks.canonicalName(attr):
                        self.assertTrue(getattr(cb, attr, None).__doc__,
                                        '%s.%s has no help.' % (name, attr))



class ChannelPluginTestCase(PluginTestCase):
    """PluginTestCase variant that joins ``channel`` and talks in it."""
    channel = '#test'
    def setUp(self, nick='test', forceSetup=False):
        """Run PluginTestCase.setUp(), join the channel, drain the replies.

        After the JOIN is fed, the bot is expected to emit MODE, MODE and WHO
        messages, in that order.
        """
        if not forceSetup and \
                self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            return
        # Forward both arguments; they used to be dropped, so a custom
        # ``nick`` was silently ignored.
        PluginTestCase.setUp(self, nick=nick, forceSetup=forceSetup)
        self.irc.feedMsg(ircmsgs.join(self.channel, prefix=self.prefix))
        m = self.irc.takeMsg()
        self.assertFalse(m is None, 'No message back from joining channel.')
        self.assertEqual(m.command, 'MODE')
        m = self.irc.takeMsg()
        self.assertFalse(m is None, 'No message back from joining channel.')
        self.assertEqual(m.command, 'MODE')
        m = self.irc.takeMsg()
        self.assertFalse(m is None, 'No message back from joining channel.')
        self.assertEqual(m.command, 'WHO')

    def _feedMsg(self, query, timeout=None, to=None, frm=None, private=False,
                 usePrefixChar=True, expectException=False):
        """Feed ``query`` to the bot and return the first reply.

        Differences from PluginTestCase._feedMsg: the message goes to
        ``self.channel`` unless ``private`` is true (or ``to`` is given); a
        prefix character is *prepended* when ``usePrefixChar`` is true and
        the query lacks one; and for PRIVMSG replies whose text starts with
        the test nick, the first space-delimited word (the "nick:" address)
        is stripped and a rebuilt PRIVMSG is returned.

        Returns the (possibly rebuilt) reply, or None on timeout.
        """
        if to is None:
            if private:
                to = self.irc.nick
            else:
                to = self.channel
        if frm is None:
            frm = self.prefix
        if timeout is None:
            timeout = self.timeout
        if self.myVerbose >= verbosity.MESSAGES:
            print('') # Newline, just like PluginTestCase.
        prefixChars = conf.supybot.reply.whenAddressedBy.chars()
        if query[0] not in prefixChars and usePrefixChar:
            query = prefixChars[0] + query
        if minisix.PY2 and isinstance(query, unicode):
            query = query.encode('utf8') # unicode->str
        if not expectException and self.myVerbose >= verbosity.EXCEPTIONS:
            conf.supybot.log.stdout.setValue(True)
        msg = ircmsgs.privmsg(to, query, prefix=frm)
        if self.myVerbose >= verbosity.MESSAGES:
            print('Feeding: %r' % msg)
        self.irc.feedMsg(msg)
        fed = time.time()
        response = self.irc.takeMsg()
        while response is None and time.time() - fed < timeout:
            time.sleep(0.1)
            drivers.run()
            response = self.irc.takeMsg()
        if response is not None:
            if response.command == 'PRIVMSG':
                args = list(response.args)
                # Strip off nick: at beginning of response.
                if args[1].startswith(self.nick) or \
                   args[1].startswith(ircutils.nickFromHostmask(self.prefix)):
                    try:
                        args[1] = args[1].split(' ', 1)[1]
                    except IndexError:
                        # Odd.  We'll skip this.
                        pass
                ret = ircmsgs.privmsg(*args)
            else:
                ret = response
        else:
            ret = None
        if self.myVerbose >= verbosity.MESSAGES:
            print('Returning: %r' % ret)
        if not expectException and self.myVerbose >= verbosity.EXCEPTIONS:
            conf.supybot.log.stdout.setValue(False)
        return ret

    def feedMsg(self, query, to=None, frm=None, private=False):
        """Just feeds it a message, that's all."""
        if to is None:
            if private:
                to = self.irc.nick
            else:
                to = self.channel
        if frm is None:
            frm = self.prefix
        self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))

class TestRequestHandler(httpserver.SupyHTTPRequestHandler):
    """Request handler driven by in-memory files instead of a socket.

    The response code passed to send_response() is recorded in
    ``self._response``.
    """
    def __init__(self, rfile, wfile, *args, **kwargs):
        """Store the two file objects and handle one request immediately.

        The base-class constructor is deliberately not called.
        """
        self._headers_mode = True
        # send_headers() stores into this dict; it used to be left
        # uninitialised, so calling send_headers() raised AttributeError.
        self._headers = {}
        self.rfile = rfile
        self.wfile = wfile
        self.handle_one_request()

    def send_response(self, code):
        assert self._headers_mode
        self._response = code
    def send_headers(self, name, value):
        assert self._headers_mode
        self._headers[name] = value
    def end_headers(self):
        assert self._headers_mode
        self._headers_mode = False

    def do_X(self, *args, **kwargs):
        assert httpserver.http_servers, \
                'The HTTP server is not started.'
        self.server = httpserver.http_servers[0]
        httpserver.SupyHTTPRequestHandler.do_X(self, *args, **kwargs)

httpserver.http_servers = [httpserver.TestSupyHTTPServer()]

# Partially stolen from the standard Python library :)
# NOTE(review): this function looks unusable as written and should be fixed or
# removed once its intended use is known:
#   * it calls FakeHTTPConnection(host) although that class's __init__ takes
#     (rfile, wfile);
#   * urllib.unquote, urllib.splittype and urllib.proxy_bypass are Python 2
#     names that the plain ``urllib`` package does not provide on Python 3;
#   * base64.b64encode() requires bytes on Python 3.
# The code is left functionally unchanged because the intended behaviour
# cannot be determined from this file.
def open_http(url, data=None):
    """Use HTTP protocol."""
    user_passwd = None
    proxy_passwd= None
    if isinstance(url, str):
        host, selector = splithost(url)
        if host:
            user_passwd, host = splituser(host)
            host = urllib.unquote(host)
        realhost = host
    else:
        host, selector = url
        # check whether the proxy contains authorization information
        proxy_passwd, host = splituser(host)
        # now we proceed with the url we want to obtain
        urltype, rest = urllib.splittype(selector)
        url = rest
        user_passwd = None
        if urltype.lower() != 'http':
            realhost = None
        else:
            realhost, rest = splithost(rest)
            if realhost:
                user_passwd, realhost = splituser(realhost)
            if user_passwd:
                selector = "%s://%s%s" % (urltype, realhost, rest)
            if urllib.proxy_bypass(realhost):
                host = realhost

        #print "proxy via http:", host, selector
    if not host: raise IOError('http error', 'no host given')

    if proxy_passwd:
        import base64
        proxy_auth = base64.b64encode(proxy_passwd).strip()
    else:
        proxy_auth = None

    if user_passwd:
        import base64
        auth = base64.b64encode(user_passwd).strip()
    else:
        auth = None
    c = FakeHTTPConnection(host)
    if data is not None:
        c.putrequest('POST', selector)
        c.putheader('Content-Type', 'application/x-www-form-urlencoded')
        c.putheader('Content-Length', '%d' % len(data))
    else:
        c.putrequest('GET', selector)
    if proxy_auth: c.putheader('Proxy-Authorization', 'Basic %s' % proxy_auth)
    if auth: c.putheader('Authorization', 'Basic %s' % auth)
    if realhost: c.putheader('Host', realhost)
    for args in URLopener().addheaders: c.putheader(*args)
    c.endheaders()
    return c

class FakeHTTPConnection(HTTPConnection):
    """HTTPConnection whose send() writes to a file object, not a socket.

    Only __init__() and send() are overridden; everything else is inherited
    from HTTPConnection.
    """
    _data = ''
    _headers = {}

    def __init__(self, rfile, wfile):
        """Create a connection to 'localhost' bound to the given files."""
        HTTPConnection.__init__(self, 'localhost')
        self.rfile = rfile
        self.wfile = wfile

    def send(self, data):
        """Write ``data`` to ``self.wfile``, decoding bytes on Python 3."""
        needs_decoding = minisix.PY3 and isinstance(data, bytes)
        if needs_decoding:
            data = data.decode()
        self.wfile.write(data)


class HTTPPluginTestCase(PluginTestCase):
    """PluginTestCase with helpers to send requests to the test HTTP server."""

    def setUp(self):
        """Always run the full PluginTestCase setup."""
        PluginTestCase.setUp(self, forceSetup=True)

    def request(self, url, method='GET', read=True, data={}):
        """Send a request line for ``url`` through a TestRequestHandler.

        ``url`` must start with '/'.  Returns the recorded response code, or
        a ``(code, body)`` tuple when ``read`` is true, where ``body`` is
        whatever is read from the handler's output file.  ``data`` is
        accepted but not used.
        """
        assert url.startswith('/')
        handler_output = minisix.io.StringIO()
        handler_input = minisix.io.StringIO()
        # The client writes into what the handler will read from, hence the
        # crossed-over files.
        client = FakeHTTPConnection(rfile=handler_output, wfile=handler_input)
        client.putrequest(method, url)
        client.endheaders()
        for stream in (handler_input, handler_output):
            stream.seek(0)
        handler = TestRequestHandler(handler_input, handler_output)
        status = handler._response
        if not read:
            return status
        return (status, handler_output.read())

    def assertHTTPResponse(self, uri, expectedResponse, **kwargs):
        """Assert that requesting ``uri`` yields ``expectedResponse``."""
        actual = self.request(uri, read=False, **kwargs)
        self.assertEqual(actual, expectedResponse)

    def assertNotHTTPResponse(self, uri, expectedResponse, **kwargs):
        """Assert that requesting ``uri`` does not yield ``expectedResponse``."""
        actual = self.request(uri, read=False, **kwargs)
        self.assertNotEqual(actual, expectedResponse)


class ChannelHTTPPluginTestCase(ChannelPluginTestCase, HTTPPluginTestCase):
    """Combines the channel and HTTP test helpers."""

    def setUp(self):
        """Always run the full ChannelPluginTestCase setup."""
        ChannelPluginTestCase.setUp(self, forceSetup=True)

# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
