1# Copyright (c) 2004-2007 Divmod.
2# See LICENSE for details.
3
4"""
5Tests for L{nevow.appserver}.
6"""
7
8from zope.interface import implements
9
10from cStringIO import StringIO
11from shlex import split
12
13from twisted.trial.unittest import TestCase
14from twisted.internet.defer import Deferred
15
16from nevow import inevow
17from nevow import appserver
18from nevow import context
19from nevow import testutil
20from nevow import util
21
22from nevow.appserver import NevowSite
23from nevow.context import RequestContext
24from nevow.rend import Page
25from nevow.testutil import FakeRequest
26
class Render:
    """
    Minimal L{inevow.IResource} provider used as the base for the test
    resources defined in this module.

    @ivar rendered: C{False} until L{renderHTTP} has been invoked on this
        instance, C{True} afterwards.
    """
    implements(inevow.IResource)

    rendered = False

    def locateChild(self, context, segs):
        """
        Answer every lookup with this same resource and an empty tuple of
        remaining segments, whatever C{segs} contains.
        """
        remaining = ()
        return self, remaining

    def renderHTTP(self, context):
        """
        Flag this instance as rendered and produce an empty string body.
        """
        self.rendered = True
        body = ''
        return body
38
39
class TestLookup(testutil.TestCase):
    """
    Tests for resource lookup through
    L{appserver.NevowSite.getPageContextForRequestContext}.
    """
    def getResourceFor(self, root, url):
        """
        Run a lookup of C{url} against a fresh site rooted at C{root}.

        The fake request created for the lookup is kept as C{self.request}.

        @param root: The resource handed to L{appserver.NevowSite} as its
            root.
        @param url: A C{'/'}-separated path; it is split on C{'/'} and
            assigned to the fake request's C{postpath}.

        @return: The result of wrapping
            C{getPageContextForRequestContext} in L{util.maybeDeferred};
            the tests below attach callbacks to it and read C{.tag} from
            the value it fires with.
        """
        r = testutil.FakeRequest()
        self.request = r
        r.postpath = url.split('/')
        ctx = context.RequestContext(tag=self.request)
        return util.maybeDeferred(
            appserver.NevowSite(root).getPageContextForRequestContext, ctx)

    def test_leafIsRoot(self):
        """
        When the root resource answers every C{locateChild} call with
        itself and no remaining segments, the lookup result's C{tag} is the
        root.
        """
        root = Render()
        # Return the Deferred so trial waits for it and reports a failing
        # assertion against this test instead of dropping it.
        return self.getResourceFor(root, 'foo/bar/baz').addCallback(
            lambda result: self.assertIdentical(result.tag, root))

    def test_children(self):
        """
        Resources may consume several segments at once; the lookup ends at
        the last resource returned by the chain of C{locateChild} calls.
        """
        class FirstTwo(Render):
            def locateChild(self, context, segs):
                # Consume two segments and delegate to LastOne.
                return LastOne(), segs[2:]

        class LastOne(Render):
            def locateChild(self, context, segs):
                # Consume one segment and end on a plain Render.
                return Render(), segs[1:]

        return self.getResourceFor(FirstTwo(), 'foo/bar/baz').addCallback(
            lambda result: self.assertIdentical(result.tag.__class__, Render))

    def test_oldresource(self):
        """
        A L{twisted.web.resource.Resource} can be looked up; the result's
        C{tag.original} is that resource.
        """
        from twisted.web import resource
        r = resource.Resource()
        r.putChild('foo', r)
        return self.getResourceFor(r, 'foo').addCallback(
            lambda result: self.assertIdentical(r, result.tag.original))

    def test_deferredChild(self):
        """
        C{locateChild} may return its C{(resource, segments)} pair via
        L{util.succeed}.
        """
        class Deferreder(Render):
            def locateChild(self, context, segs):
                d = util.succeed((self, segs[1:]))
                return d

        r = Deferreder()
        return self.getResourceFor(r, 'foo').addCallback(
            lambda result: self.assertIdentical(r, result.tag))

    def test_cycle(self):
        """
        A C{locateChild} which returns the same resource together with the
        very same segments makes the lookup fail with L{AssertionError},
        whereas returning the same resource with one segment consumed does
        not.
        """
        class Resource(object):
            implements(inevow.IResource)
            def locateChild(self, ctx, segments):
                if segments[0] == 'self':
                    # No progress at all: same resource, same segments.
                    return self, segments
                return self, segments[1:]
        root = Resource()

        def asserterr(f):
            f.trap(AssertionError)
            # The non-cyclic lookup must succeed; chaining it here makes
            # its outcome the outcome of the returned Deferred.
            return self.getResourceFor(root, 'notself')

        # The success callback receives the lookup result, so it must
        # accept one argument; a zero-argument lambda would raise
        # TypeError instead of failing the test.
        return self.getResourceFor(root, 'self').addCallbacks(
            lambda result: self.fail(
                "Cyclic lookup unexpectedly succeeded with %r" % (result,)),
            asserterr)
98
99
100
class TestSiteAndRequest(testutil.TestCase):
    """
    Tests for rendering resources through L{appserver.NevowSite} and
    L{appserver.NevowRequest}.
    """
    def renderResource(self, resource, path):
        """
        Process a request for C{path} against a site rooted at C{resource}.

        @param resource: The root resource for the new
            L{appserver.NevowSite}.
        @param path: The value assigned to the request's C{path}.

        @return: Whatever L{appserver.NevowRequest.process} returns; the
            tests below attach callbacks to it.
        """
        s = appserver.NevowSite(resource)
        r = appserver.NevowRequest(testutil.FakeChannel(s), True)
        r.path = path
        return r.process()

    def test_deferredRender(self):
        """
        C{renderHTTP} may deliver its result via L{util.succeed}.
        """
        class Deferreder(Render):
            def renderHTTP(self, context):
                return util.succeed("hello")

        return self.renderResource(Deferreder(), 'foo').addCallback(
            lambda result: self.assertEqual(result, "hello"))

    def test_regularRender(self):
        """
        C{renderHTTP} may return a string directly.
        """
        class Regular(Render):
            def renderHTTP(self, context):
                return "world"

        return self.renderResource(Regular(), 'bar').addCallback(
            lambda result: self.assertEqual(result, 'world'))

    def test_returnsResource(self):
        """
        When C{renderHTTP} returns another resource, the final result is
        what that second resource renders.
        """
        class Res2(Render):
            def renderHTTP(self, ctx):
                return "world"

        class Res1(Render):
            def renderHTTP(self, ctx):
                return Res2()

        return self.renderResource(Res1(), 'bar').addCallback(
            lambda result: self.assertEqual(result, 'world'))

    def test_connectionLost(self):
        """
        L{Request.finish} is not called when the connection is lost before
        rendering has finished.
        """
        rendering = Deferred()
        class Res(Render):
            def renderHTTP(self, ctx):
                return rendering
        site = appserver.NevowSite(Res())
        request = appserver.NevowRequest(testutil.FakeChannel(site), True)
        # NOTE(review): process() is never called on this request, so
        # Res.renderHTTP is presumably never invoked and `rendering` is not
        # tied to the request -- confirm whether this test exercises the
        # behaviour its docstring describes.
        request.connectionLost(Exception("Just Testing"))
        rendering.callback(b"finished")

        self.assertFalse(
            request.finished, "Request was incorrectly marked as finished.")


    def test_renderPOST(self):
        """
        A POST request with form data has the form data parsed into
        C{request.fields}.
        """
        class Res(Render):
            def renderHTTP(self, ctx):
                return b''

        s = appserver.NevowSite(Res())
        r = appserver.NevowRequest(
            testutil.FakeChannel(s), True)
        r.method = b'POST'
        r.path = b'/'
        r.content = StringIO(b'foo=bar')
        self.successResultOf(r.process())
        self.assertEqual(r.fields[b'foo'].value, b'bar')
171
172
173
174from twisted.internet import protocol, address
175
class FakeTransport(protocol.FileWrapper):
    """
    A L{protocol.FileWrapper} around an in-memory C{StringIO}, reporting
    fixed host and peer addresses.

    @ivar data: The C{StringIO} handed to L{protocol.FileWrapper}.
    @ivar addr: The value returned by L{getHost}.
    @ivar peerAddr: The value returned by L{getPeer}.
    @ivar disconnecting: Set to C{True} by L{loseConnection}.
    """
    disconnecting = False
    disconnect_done = False

    def __init__(self, addr, peerAddr):
        """
        @param addr: The address to report as this side of the connection.
        @param peerAddr: The address to report as the remote side.
        """
        buf = StringIO()
        self.data = buf
        protocol.FileWrapper.__init__(self, buf)
        self.addr, self.peerAddr = addr, peerAddr

    def getHost(self):
        """
        Return the address given as C{addr} at construction time.
        """
        return self.addr

    def getPeer(self):
        """
        Return the address given as C{peerAddr} at construction time.
        """
        return self.peerAddr

    def loseConnection(self):
        """
        Only record that a disconnect was requested.
        """
        self.disconnecting = True
190
class Logging(testutil.TestCase):
    """
    Tests for the request logging done by L{appserver.NevowSite}.
    """
    def setUp(self):
        """
        Start a site whose root renders the six-character string
        C{"boring"} and replace its C{logFile} with an in-memory
        C{StringIO}.
        """
        class Res1(Render):
            def renderHTTP(self, ctx):
                return "boring"
        self.site = appserver.NevowSite(Res1())
        self.site.startFactory()
        self.addCleanup(self.site.stopFactory)
        # Installed after startFactory(), so it replaces whatever log file
        # object the site had at that point.
        self.site.logFile = StringIO()


    def renderResource(self, path):
        """
        Push a single HTTP/1.0 GET request for C{path} through a protocol
        built by C{self.site}, connected to a L{FakeTransport}.

        The request carries C{Referer} and C{User-Agent} headers spelled
        in mixed case.

        @param path: The request path interpolated into the request line.

        @return: The protocol instance the request was delivered to.
        """
        proto = self.site.buildProtocol(
            address.IPv4Address('TCP', 'fakeaddress', 42))
        transport = FakeTransport(
            address.IPv4Address('TCP', 'fakeaddress1', 42),
            address.IPv4Address('TCP', 'fakeaddress2', 42))
        proto.makeConnection(transport)
        proto.dataReceived('\r\n'.join(['GET %s HTTP/1.0' % path,
                                        'ReFeReR: fakerefer',
                                        'uSeR-AgEnt: fakeagent',
                                        '', '']))
        # Use a TestCase assertion rather than a bare ``assert`` so the
        # check still runs under ``python -O``.
        self.assertTrue(transport.disconnecting)
        return proto


    def setSiteTime(self, when):
        """
        Forcibly override the current time as known by C{self.site}.

        This relies on knowledge of private details of
        L{twisted.web.server.Site}.  It would be nice if there were an API on
        that class for doing this more properly, to facilitate testing.
        """
        self.site._logDateTime = when


    def test_oldStyle(self):
        """
        With no L{inevow.ILogger} remembered on the site, one line per
        request is written to the site's C{logFile}.
        """
        self.setSiteTime('faketime')
        proto = self.renderResource('/foo')
        logLines = proto.site.logFile.getvalue().splitlines()
        self.assertEqual(len(logLines), 1)
        self.assertEqual(
            split(logLines[0]),
            ['fakeaddress2', '-', '-', 'faketime', 'GET /foo HTTP/1.0', '200', '6',
             'fakerefer', 'fakeagent'])


    def test_newStyle(self):
        """
        With an L{inevow.ILogger} remembered on the site, its C{log} method
        is called for the request and nothing is written to C{logFile}.
        """
        class FakeLogger(object):
            def __init__(self):
                # Per-instance list; a class-level list would be shared by
                # every FakeLogger instance.
                self.logged = []

            def log(self, ctx):
                request = inevow.IRequest(ctx)
                self.logged.append(('fakeLog',
                                    request.getClientIP(),
                                    request.method,
                                    request.uri,
                                    request.clientproto,
                                    request.code,
                                    request.sentLength))

        myLog = FakeLogger()
        self.site.remember(myLog, inevow.ILogger)
        proto = self.renderResource('/foo')
        logLines = proto.site.logFile.getvalue().splitlines()
        self.assertEqual(len(logLines), 0)
        self.assertEqual(myLog.logged,
                         [
            ('fakeLog', 'fakeaddress2', 'GET', '/foo', 'HTTP/1.0', 200, 6),
            ])
262
263
264
class HandleSegment(TestCase):
    """
    Tests for L{NevowSite.handleSegment}.

    L{NevowSite.handleSegment} takes the value produced by one call to
    L{IResource.locateChild} and either keeps traversing via further calls
    to that API or hands back a context object from which a resource will
    be rendered.
    """
    def test_emptyPostPath(self):
        """
        When the number of consumed path segments exceeds what is left in
        the request's I{postpath}, L{NevowSite.handleSegment} leaves
        I{prepath} alone without raising.
        """
        fakeRequest = FakeRequest(currentSegments=('',))
        requestContext = RequestContext(tag=fakeRequest)
        root = Page()
        child = Page()
        # The child reports no remaining segments even though two segments
        # are passed as the path being handled.
        located = (child, ())
        NevowSite(root).handleSegment(
            located, fakeRequest, ('foo', 'bar'), requestContext)
        self.assertEqual(fakeRequest.prepath, [''])
        self.assertEqual(fakeRequest.postpath, [])
288