1# Copyright 2011 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17"""Unit tests for `wsgi`."""
18
19import os
20import platform
21import socket
22import tempfile
23import testtools
24from unittest import mock
25
26import eventlet
27import eventlet.wsgi
28import requests
29import webob
30
31from oslo_config import cfg
32from oslo_service import sslutils
33from oslo_service.tests import base
34from oslo_service import wsgi
35from oslo_utils import netutils
36
37
# Normalized absolute path of the ``ssl_cert`` directory located next to this
# module; the SSL tests below read their certificate and key files from it.
SSL_CERT_DIR = os.path.normpath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), 'ssl_cert'))
# Module-level alias for the global oslo.config configuration object.
CONF = cfg.CONF
42
43
class WsgiTestCase(base.ServiceBaseTestCase):
    """Common parent class for the WSGI test cases in this module."""

    def setUp(self):
        super().setUp()
        # Invoke the configuration object with an empty argument list and no
        # default config files.
        # NOTE(review): presumably this keeps the environment's CLI arguments
        # and config files out of the tests -- confirm against oslo.config.
        self.conf(default_config_files=[], args=[])
50
51
class TestLoaderNothingExists(WsgiTestCase):
    """Loader tests run while ``os.path.exists`` reports False for any path."""

    def setUp(self):
        super().setUp()
        # Replace os.path.exists for the duration of each test; the patch is
        # undone through the cleanup registered below.
        exists_patch = mock.patch.object(os.path, 'exists', lambda _: False)
        exists_patch.start()
        self.addCleanup(exists_patch.stop)

    def test_relpath_config_not_found(self):
        # A relative paste config path must raise ConfigNotFound.
        self.config(api_paste_config='api-paste.ini')
        self.assertRaises(wsgi.ConfigNotFound, wsgi.Loader, self.conf)

    def test_asbpath_config_not_found(self):
        # An absolute paste config path must raise ConfigNotFound as well.
        self.config(api_paste_config='/etc/openstack-srv/api-paste.ini')
        self.assertRaises(wsgi.ConfigNotFound, wsgi.Loader, self.conf)
77
78
class TestLoaderNormalFilesystem(WsgiTestCase):
    """Loader tests with normal filesystem (unmodified os.path module)."""

    # Paste configuration written to a temporary file in setUp(); it declares
    # a single application named ``test_app``.
    _paste_config = """
[app:test_app]
use = egg:Paste#static
document_root = /tmp
    """

    def setUp(self):
        super(TestLoaderNormalFilesystem, self).setUp()
        self.paste_config = tempfile.NamedTemporaryFile(mode="w+t")
        # Register the close immediately: cleanups run even when a later
        # setUp() step raises, whereas tearDown() would be skipped and the
        # temporary file would be leaked.
        self.addCleanup(self.paste_config.close)
        self.paste_config.write(self._paste_config.lstrip())
        # Push the content to disk so it can be read back through the file
        # name, then rewind the handle.
        self.paste_config.flush()
        self.paste_config.seek(0)

        self.config(api_paste_config=self.paste_config.name)
        # Use the per-test configuration object, like the other test classes
        # in this module, rather than the module-level CONF alias.
        self.loader = wsgi.Loader(self.conf)

    def test_config_found(self):
        # The loader must report the path of the temporary config file.
        self.assertEqual(self.paste_config.name, self.loader.config_path)

    def test_app_not_found(self):
        # Asking for an app the config does not declare must raise.
        self.assertRaises(
            wsgi.PasteAppNotFound,
            self.loader.load_app,
            "nonexistent app",
        )

    def test_app_found(self):
        # The loaded app must expose the configured document root.
        url_parser = self.loader.load_app("test_app")
        self.assertEqual("/tmp", url_parser.directory)
115
116
class TestWSGIServer(WsgiTestCase):
    """WSGI server tests."""

    def test_no_app(self):
        # A server can be constructed without a WSGI application.
        server = wsgi.Server(self.conf, "test_app", None)
        self.assertEqual("test_app", server.name)

    def test_custom_max_header_line(self):
        self.config(max_header_line=4096)  # Default value is 16384
        wsgi.Server(self.conf, "test_custom_max_header_line", None)
        # Constructing the server must have pushed the configured value into
        # eventlet's module-level limit (expected value first, as elsewhere).
        self.assertEqual(self.conf.max_header_line,
                         eventlet.wsgi.MAX_HEADER_LINE)

    def test_start_random_port(self):
        # port=0 must be replaced by the port that was actually bound.
        server = wsgi.Server(self.conf, "test_random_port", None,
                             host="127.0.0.1", port=0)
        server.start()
        self.assertNotEqual(0, server.port)
        server.stop()
        server.wait()

    @testtools.skipIf(not netutils.is_ipv6_enabled(), "no ipv6 support")
    def test_start_random_port_with_ipv6(self):
        # Same as test_start_random_port, on the IPv6 loopback address.
        server = wsgi.Server(self.conf, "test_random_port", None,
                             host="::1", port=0)
        server.start()
        self.assertEqual("::1", server.host)
        self.assertNotEqual(0, server.port)
        server.stop()
        server.wait()

    @testtools.skipIf(platform.mac_ver()[0] != '',
                      'SO_REUSEADDR behaves differently '
                      'on OSX, see bug 1436895')
    def test_socket_options_for_simple_server(self):
        # test that the normal socket options have been set properly
        self.config(tcp_keepidle=500)
        server = wsgi.Server(self.conf, "test_socket_options", None,
                             host="127.0.0.1", port=0)
        server.start()
        sock = server.socket
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_REUSEADDR))
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_KEEPALIVE))
        # TCP_KEEPIDLE is not defined by the socket module on every platform.
        if hasattr(socket, 'TCP_KEEPIDLE'):
            self.assertEqual(self.conf.tcp_keepidle,
                             sock.getsockopt(socket.IPPROTO_TCP,
                                             socket.TCP_KEEPIDLE))
        self.assertFalse(server._server.dead)
        server.stop()
        server.wait()
        self.assertTrue(server._server.dead)

    @testtools.skipIf(not hasattr(socket, "AF_UNIX"),
                      'UNIX sockets not supported')
    def test_server_with_unix_socket(self):
        socket_file = self.get_temp_file_path('sock')
        socket_mode = 0o644
        server = wsgi.Server(self.conf, "test_socket_options", None,
                             socket_family=socket.AF_UNIX,
                             socket_mode=socket_mode,
                             socket_file=socket_file)
        # The socket must already be bound to the requested path, with the
        # requested permission bits, before the server is started.
        self.assertEqual(socket_file, server.socket.getsockname())
        self.assertEqual(socket_mode,
                         os.stat(socket_file).st_mode & 0o777)
        server.start()
        self.assertFalse(server._server.dead)
        server.stop()
        server.wait()
        self.assertTrue(server._server.dead)

    def test_server_pool_waitall(self):
        # test pools waitall method gets called while stopping server
        server = wsgi.Server(self.conf, "test_server", None, host="127.0.0.1")
        server.start()
        with mock.patch.object(server._pool,
                               'waitall') as mock_waitall:
            server.stop()
            server.wait()
            mock_waitall.assert_called_once_with()

    def test_uri_length_limit(self):
        eventlet.monkey_patch(os=False, thread=False)
        # Bind to an ephemeral port (port=0) instead of a fixed one so the
        # test cannot fail merely because the port is already in use; the
        # real port is read back from server.port below.
        server = wsgi.Server(self.conf, "test_uri_length_limit", None,
                             host="127.0.0.1", max_url_len=16384, port=0)
        server.start()
        self.assertFalse(server._server.dead)

        # A URI below max_url_len must not be rejected as too large.
        uri = "http://127.0.0.1:%d/%s" % (server.port, 10000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertNotEqual(requests.codes.REQUEST_URI_TOO_LARGE,
                            resp.status_code)

        # A URI above max_url_len must be rejected as too large.
        uri = "http://127.0.0.1:%d/%s" % (server.port, 20000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertEqual(requests.codes.REQUEST_URI_TOO_LARGE,
                         resp.status_code)
        server.stop()
        server.wait()

    def test_reset_pool_size_to_default(self):
        server = wsgi.Server(self.conf, "test_resize", None,
                             host="127.0.0.1", max_url_len=16384)
        server.start()

        # Stopping the server, which in turn sets pool size to 0
        server.stop()
        self.assertEqual(0, server._pool.size)

        # Resetting pool size to default
        server.reset()
        server.start()
        self.assertEqual(self.conf.wsgi_default_pool_size, server._pool.size)

        # Shut the restarted server down so it does not outlive the test.
        server.stop()
        server.wait()

    def test_client_socket_timeout(self):
        self.config(client_socket_timeout=5)

        # mocking eventlet spawn method to check it is called with
        # configured 'client_socket_timeout' value.
        with mock.patch.object(eventlet,
                               'spawn') as mock_spawn:
            server = wsgi.Server(self.conf, "test_app", None,
                                 host="127.0.0.1", port=0)
            server.start()
            _, kwargs = mock_spawn.call_args
            self.assertEqual(self.conf.client_socket_timeout,
                             kwargs['socket_timeout'])
            server.stop()

    def test_wsgi_keep_alive(self):
        self.config(wsgi_keep_alive=False)

        # mocking eventlet spawn method to check it is called with
        # configured 'wsgi_keep_alive' value.
        with mock.patch.object(eventlet,
                               'spawn') as mock_spawn:
            server = wsgi.Server(self.conf, "test_app", None,
                                 host="127.0.0.1", port=0)
            server.start()
            _, kwargs = mock_spawn.call_args
            self.assertEqual(self.conf.wsgi_keep_alive,
                             kwargs['keepalive'])
            server.stop()
266
267
def requesting(host, port, ca_certs=None, method="POST",
               content_type="application/x-www-form-urlencoded",
               address_familly=socket.AF_INET):
    """Send a bare HTTP/1.1 request for ``/`` and return the raw reply.

    :param host: address to connect to.
    :param port: TCP port to connect to.
    :param ca_certs: when truthy, the socket is wrapped with
        ``eventlet.wrap_ssl`` and this value is passed as its ``ca_certs``
        argument; otherwise the request goes over the plain socket.
    :param method: HTTP verb placed on the request line.
    :param content_type: accepted for interface compatibility; it is not
        used when building the request.
    :param address_familly: socket address family (spelling kept because
        callers pass it by keyword).
    :returns: the decoded result of a single ``recv(1024)``, i.e. at most
        the first 1024 bytes of the reply.
    """
    frame = "{verb} / HTTP/1.1\r\n\r\n".format(verb=method).encode("utf-8")
    with socket.socket(address_familly, socket.SOCK_STREAM) as sock:
        if ca_certs:
            with eventlet.wrap_ssl(sock, ca_certs=ca_certs) as wrappedSocket:
                wrappedSocket.connect((host, port))
                # sendall() keeps writing until the whole frame is sent;
                # send() is allowed to transmit only part of the buffer.
                wrappedSocket.sendall(frame)
                return wrappedSocket.recv(1024).decode()
        sock.connect((host, port))
        sock.sendall(frame)
        return sock.recv(1024).decode()
284
285
class TestWSGIServerWithSSL(WsgiTestCase):
    """WSGI server with SSL tests."""

    def setUp(self):
        super(TestWSGIServerWithSSL, self).setUp()
        cert_file_name = os.path.join(SSL_CERT_DIR, 'certificate.crt')
        key_file_name = os.path.join(SSL_CERT_DIR, 'privatekey.key')
        eventlet.monkey_patch(os=False, thread=False)
        self.host = "127.0.0.1"

        # Point the SSL option group at the certificate and key fixtures.
        self.config(cert_file=cert_file_name,
                    key_file=key_file_name,
                    group=sslutils.config_section)

    @staticmethod
    def _pong_app(env, start_response):
        # Minimal WSGI application answering every request with "PONG".
        # Per PEP 3333 the headers are a list and body chunks are bytes.
        start_response('200 OK', [])
        return [b'PONG']

    def test_ssl_server(self):
        fake_ssl_server = wsgi.Server(self.conf, "fake_ssl", self._pong_app,
                                      host=self.host, port=0, use_ssl=True)
        fake_ssl_server.start()
        self.assertNotEqual(0, fake_ssl_server.port)

        response = requesting(
            method='GET',
            host=self.host,
            port=fake_ssl_server.port,
            ca_certs=os.path.join(SSL_CERT_DIR, 'ca.crt'),
        )
        self.assertEqual('PONG', response[-4:])

        fake_ssl_server.stop()
        fake_ssl_server.wait()

    def test_two_servers(self):
        # An SSL server and a plain server must be able to run side by side
        # and both answer requests.
        fake_ssl_server = wsgi.Server(self.conf, "fake_ssl", self._pong_app,
                                      host=self.host, port=0, use_ssl=True)
        fake_ssl_server.start()
        self.assertNotEqual(0, fake_ssl_server.port)

        fake_server = wsgi.Server(self.conf, "fake", self._pong_app,
                                  host=self.host, port=0)
        fake_server.start()
        self.assertNotEqual(0, fake_server.port)

        response = requesting(
            method='GET',
            host=self.host,
            port=fake_ssl_server.port,
            ca_certs=os.path.join(SSL_CERT_DIR, 'ca.crt'),
        )
        self.assertEqual('PONG', response[-4:])

        response = requesting(
            method='GET',
            host=self.host,
            port=fake_server.port,
        )
        self.assertEqual('PONG', response[-4:])

        fake_ssl_server.stop()
        fake_ssl_server.wait()

        fake_server.stop()
        fake_server.wait()

    @testtools.skipIf(platform.mac_ver()[0] != '',
                      'SO_REUSEADDR behaves differently '
                      'on OSX, see bug 1436895')
    def test_socket_options_for_ssl_server(self):
        # test that the normal socket options have been set properly
        self.config(tcp_keepidle=500)
        server = wsgi.Server(self.conf, "test_socket_options", None,
                             host=self.host, port=0, use_ssl=True)
        server.start()
        sock = server.socket
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_REUSEADDR))
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_KEEPALIVE))
        # TCP_KEEPIDLE is not defined by the socket module on every platform.
        if hasattr(socket, 'TCP_KEEPIDLE'):
            self.assertEqual(self.conf.tcp_keepidle,
                             sock.getsockopt(socket.IPPROTO_TCP,
                                             socket.TCP_KEEPIDLE))
        server.stop()
        server.wait()

    @testtools.skipIf(not netutils.is_ipv6_enabled(), "no ipv6 support")
    def test_app_using_ipv6_and_ssl(self):
        greetings = 'Hello, World!!!'

        @webob.dec.wsgify
        def hello_world(req):
            return greetings

        server = wsgi.Server(self.conf, "fake_ssl",
                             hello_world,
                             host="::1",
                             port=0,
                             use_ssl=True)

        server.start()

        response = requesting(
            method='GET',
            host='::1',
            port=server.port,
            ca_certs=os.path.join(SSL_CERT_DIR, 'ca.crt'),
            address_familly=socket.AF_INET6
        )
        # Compare the tail of the reply, sized from the greeting itself.
        self.assertEqual(greetings, response[-len(greetings):])

        server.stop()
        server.wait()
404