# Copyright (c) 2004-2007 Divmod.
# See LICENSE for details.

"""
Tests for L{nevow.appserver}.
"""

from zope.interface import implements

from cStringIO import StringIO
from shlex import split

from twisted.trial.unittest import TestCase
from twisted.internet.defer import Deferred

from nevow import inevow
from nevow import appserver
from nevow import context
from nevow import testutil
from nevow import util

from nevow.appserver import NevowSite
from nevow.context import RequestContext
from nevow.rend import Page
from nevow.testutil import FakeRequest


class Render:
    """
    Minimal L{inevow.IResource} used as a building block by the tests below.

    It answers every child lookup with itself (consuming all remaining
    segments) and records in C{rendered} whether C{renderHTTP} was called.
    """
    implements(inevow.IResource)

    rendered = False

    def locateChild(self, context, segs):
        return self, ()

    def renderHTTP(self, context):
        self.rendered = True
        return ''


class TestLookup(testutil.TestCase):
    """
    Tests for resource lookup through
    L{appserver.NevowSite.getPageContextForRequestContext}.
    """

    def getResourceFor(self, root, url):
        """
        Look up C{url} starting from C{root}.

        A L{testutil.FakeRequest} whose C{postpath} is C{url} split on C{'/'}
        is stored on C{self.request} and wrapped in a
        L{context.RequestContext}.

        @return: the result of C{util.maybeDeferred} applied to the site's
            C{getPageContextForRequestContext}.
        """
        r = testutil.FakeRequest()
        self.request = r
        r.postpath = url.split('/')
        ctx = context.RequestContext(tag=self.request)
        return util.maybeDeferred(
            appserver.NevowSite(root).getPageContextForRequestContext, ctx)

    def test_leafIsRoot(self):
        """
        When the root resource returns itself with no remaining segments, the
        resulting context's C{tag} is the root resource.
        """
        root = Render()
        # Return the Deferred so trial waits for it and reports a failing
        # assertion instead of silently dropping it.
        return self.getResourceFor(root, 'foo/bar/baz').addCallback(
            lambda result: self.assertIdentical(result.tag, root))

    def test_children(self):
        """
        Lookup follows a chain of C{locateChild} results, each consuming some
        of the segments, and ends on the final L{Render} instance.
        """
        class FirstTwo(Render):
            def locateChild(self, context, segs):
                return LastOne(), segs[2:]

        class LastOne(Render):
            def locateChild(self, context, segs):
                return Render(), segs[1:]

        return self.getResourceFor(FirstTwo(), 'foo/bar/baz').addCallback(
            lambda result: self.assertIdentical(result.tag.__class__, Render))

    def test_oldresource(self):
        """
        A L{twisted.web.resource.Resource} can be used as the root; the
        located resource is available as C{tag.original} on the result.
        """
        from twisted.web import resource
        r = resource.Resource()
        r.putChild('foo', r)
        return self.getResourceFor(r, 'foo').addCallback(
            lambda result: self.assertIdentical(r, result.tag.original))

    def test_deferredChild(self):
        """
        C{locateChild} may return its C{(resource, segments)} pair wrapped in
        a Deferred.
        """
        class Deferreder(Render):
            def locateChild(self, context, segs):
                d = util.succeed((self, segs[1:]))
                return d

        r = Deferreder()
        return self.getResourceFor(r, 'foo').addCallback(
            lambda result: self.assertIdentical(r, result.tag))

    def test_cycle(self):
        """
        A C{locateChild} which returns the same resource without consuming
        any segments makes the lookup fail with L{AssertionError}; a
        subsequent lookup which does consume its segment still succeeds.
        """
        class Resource(object):
            implements(inevow.IResource)

            def locateChild(self, ctx, segments):
                if segments[0] == 'self':
                    return self, segments
                return self, segments[1:]
        root = Resource()

        def asserterr(f):
            f.trap(AssertionError)
            return self.getResourceFor(root, 'notself')

        # The success callback must accept the result argument, and the
        # Deferred must be returned so trial actually observes the outcome.
        return self.getResourceFor(root, 'self').addCallbacks(
            lambda result: self.fail(
                "Lookup of a non-consuming resource should have failed."),
            asserterr)


class TestSiteAndRequest(testutil.TestCase):
    """
    Tests for the interaction between L{appserver.NevowSite} and
    L{appserver.NevowRequest}.
    """

    def renderResource(self, resource, path):
        """
        Process a L{appserver.NevowRequest} for C{path} against a site rooted
        at C{resource}.

        @return: whatever C{NevowRequest.process} returns.
        """
        s = appserver.NevowSite(resource)
        r = appserver.NevowRequest(testutil.FakeChannel(s), True)
        r.path = path
        return r.process()

    def test_deferredRender(self):
        """
        C{renderHTTP} may return a Deferred; its result is the render result.
        """
        class Deferreder(Render):
            def renderHTTP(self, context):
                return util.succeed("hello")

        return self.renderResource(Deferreder(), 'foo').addCallback(
            lambda result: self.assertEquals(result, "hello"))

    def test_regularRender(self):
        """
        C{renderHTTP} may return a plain string, which is the render result.
        """
        class Regular(Render):
            def renderHTTP(self, context):
                return "world"

        return self.renderResource(Regular(), 'bar').addCallback(
            lambda result: self.assertEquals(result, 'world'))

    def test_returnsResource(self):
        """
        C{renderHTTP} may return another resource, which is rendered in turn.
        """
        class Res2(Render):
            def renderHTTP(self, ctx):
                return "world"

        class Res1(Render):
            def renderHTTP(self, ctx):
                return Res2()

        return self.renderResource(Res1(), 'bar').addCallback(
            lambda result: self.assertEquals(result, 'world'))

    def test_connectionLost(self):
        """
        L{Request.finish} is not called when the connection is lost before
        rendering has finished.
        """
        rendering = Deferred()

        class Res(Render):
            def renderHTTP(self, ctx):
                return rendering
        site = appserver.NevowSite(Res())
        request = appserver.NevowRequest(testutil.FakeChannel(site), True)
        request.connectionLost(Exception("Just Testing"))
        rendering.callback(b"finished")

        self.assertFalse(
            request.finished, "Request was incorrectly marked as finished.")

    def test_renderPOST(self):
        """
        A POST request with form data has the form data parsed into
        C{request.fields}.
        """
        class Res(Render):
            def renderHTTP(self, ctx):
                return b''

        s = appserver.NevowSite(Res())
        r = appserver.NevowRequest(
            testutil.FakeChannel(s), True)
        r.method = b'POST'
        r.path = b'/'
        r.content = StringIO(b'foo=bar')
        self.successResultOf(r.process())
        self.assertEquals(r.fields[b'foo'].value, b'bar')


from twisted.internet import protocol, address


class FakeTransport(protocol.FileWrapper):
    """
    In-memory transport which collects written data in a C{StringIO} and
    reports fixed host and peer addresses.
    """
    disconnecting = False
    disconnect_done = False

    def __init__(self, addr, peerAddr):
        self.data = StringIO()
        protocol.FileWrapper.__init__(self, self.data)
        self.addr = addr
        self.peerAddr = peerAddr

    def getHost(self):
        return self.addr

    def getPeer(self):
        return self.peerAddr

    def loseConnection(self):
        # Only record the request to disconnect; nothing is actually closed.
        self.disconnecting = True


class Logging(testutil.TestCase):
    """
    Tests for access logging performed by L{appserver.NevowSite}.
    """

    def setUp(self):
        class Res1(Render):
            def renderHTTP(self, ctx):
                return "boring"
        self.site = appserver.NevowSite(Res1())
        self.site.startFactory()
        self.addCleanup(self.site.stopFactory)
        self.site.logFile = StringIO()

    def renderResource(self, path):
        """
        Issue an HTTP/1.0 GET for C{path} to a protocol built by C{self.site}
        over a L{FakeTransport}, and check that the transport was asked to
        disconnect.

        @todo: share me

        @return: the protocol instance which handled the request.
        """
        proto = self.site.buildProtocol(
            address.IPv4Address('TCP', 'fakeaddress', 42))
        transport = FakeTransport(
            address.IPv4Address('TCP', 'fakeaddress1', 42),
            address.IPv4Address('TCP', 'fakeaddress2', 42))
        proto.makeConnection(transport)
        proto.dataReceived('\r\n'.join(['GET %s HTTP/1.0' % path,
                                        'ReFeReR: fakerefer',
                                        'uSeR-AgEnt: fakeagent',
                                        '', '']))
        # Use a TestCase assertion rather than a bare assert so the check is
        # not stripped when Python runs with -O.
        self.assertTrue(transport.disconnecting)
        return proto

    def setSiteTime(self, when):
        """
        Forcibly override the current time as known by C{self.site}.

        This relies on knowledge of private details of
        L{twisted.web.server.Site}.  It would be nice if there were an API on
        that class for doing this more properly, to facilitate testing.
        """
        self.site._logDateTime = when

    def test_oldStyle(self):
        """
        With no L{inevow.ILogger} remembered on the site, a single line
        describing the request is written to the site's C{logFile}.
        """
        self.setSiteTime('faketime')
        proto = self.renderResource('/foo')
        logLines = proto.site.logFile.getvalue().splitlines()
        self.assertEquals(len(logLines), 1)
        self.assertEqual(
            split(logLines[0]),
            ['fakeaddress2', '-', '-', 'faketime', 'GET /foo HTTP/1.0', '200', '6',
             'fakerefer', 'fakeagent'])

    def test_newStyle(self):
        """
        When an L{inevow.ILogger} is remembered on the site, it is called for
        the request and nothing is written to the site's C{logFile}.
        """
        class FakeLogger(object):
            def __init__(self):
                # Per-instance list: a class-level list would be shared by
                # every FakeLogger ever created.
                self.logged = []

            def log(self, ctx):
                request = inevow.IRequest(ctx)
                self.logged.append(('fakeLog',
                                    request.getClientIP(),
                                    request.method,
                                    request.uri,
                                    request.clientproto,
                                    request.code,
                                    request.sentLength))

        myLog = FakeLogger()
        self.site.remember(myLog, inevow.ILogger)
        proto = self.renderResource('/foo')
        logLines = proto.site.logFile.getvalue().splitlines()
        self.assertEquals(len(logLines), 0)
        self.assertEquals(myLog.logged,
                          [
                              ('fakeLog', 'fakeaddress2', 'GET', '/foo', 'HTTP/1.0', 200, 6),
                          ])


class HandleSegment(TestCase):
    """
    Tests for L{NevowSite.handleSegment}.

    L{NevowSite.handleSegment} interprets the return value of a single call to
    L{IResource.locateChild} and makes subsequent calls to that API or returns
    a context object which will render a resource.
    """
    def test_emptyPostPath(self):
        """
        If more path segments are consumed than remain in the request's
        I{postpath}, L{NevowSite.handleSegment} should silently not update
        I{prepath}.
        """
        request = FakeRequest(currentSegments=('',))
        ctx = RequestContext(tag=request)
        rootResource = Page()
        childResource = Page()
        site = NevowSite(rootResource)
        # Only the side effects on the request are checked; the return value
        # is not needed.
        site.handleSegment(
            (childResource, ()), request, ('foo', 'bar'), ctx)
        self.assertEqual(request.prepath, [''])
        self.assertEqual(request.postpath, [])