1# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. (HP) 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15import os 16import re 17import subprocess 18import sys 19try: 20 # python 2 21 from urllib2 import urlopen 22except ImportError: 23 # python 3 24 from urllib.request import urlopen 25 26from pbr.tests import base 27 28 29class TestWsgiScripts(base.BaseTestCase): 30 31 cmd_names = ('pbr_test_wsgi', 'pbr_test_wsgi_with_class') 32 33 def _get_path(self): 34 if os.path.isdir("%s/lib64" % self.temp_dir): 35 path = "%s/lib64" % self.temp_dir 36 elif os.path.isdir("%s/lib" % self.temp_dir): 37 path = "%s/lib" % self.temp_dir 38 elif os.path.isdir("%s/site-packages" % self.temp_dir): 39 return ".:%s/site-packages" % self.temp_dir 40 else: 41 raise Exception("Could not determine path for test") 42 return ".:%s/python%s.%s/site-packages" % ( 43 path, 44 sys.version_info[0], 45 sys.version_info[1]) 46 47 def test_wsgi_script_install(self): 48 """Test that we install a non-pkg-resources wsgi script.""" 49 if os.name == 'nt': 50 self.skipTest('Windows support is passthrough') 51 52 stdout, _, return_code = self.run_setup( 53 'install', '--prefix=%s' % self.temp_dir) 54 55 self._check_wsgi_install_content(stdout) 56 57 def test_wsgi_script_run(self): 58 """Test that we install a runnable wsgi script. 59 60 This test actually attempts to start and interact with the 61 wsgi script in question to demonstrate that it's a working 62 wsgi script using simple server. 63 64 """ 65 if os.name == 'nt': 66 self.skipTest('Windows support is passthrough') 67 68 stdout, _, return_code = self.run_setup( 69 'install', '--prefix=%s' % self.temp_dir) 70 71 self._check_wsgi_install_content(stdout) 72 73 # Live test run the scripts and see that they respond to wsgi 74 # requests. 75 for cmd_name in self.cmd_names: 76 self._test_wsgi(cmd_name, b'Hello World') 77 78 def _test_wsgi(self, cmd_name, output, extra_args=None): 79 cmd = os.path.join(self.temp_dir, 'bin', cmd_name) 80 print("Running %s -p 0 -b 127.0.0.1" % cmd) 81 popen_cmd = [cmd, '-p', '0', '-b', '127.0.0.1'] 82 if extra_args: 83 popen_cmd.extend(extra_args) 84 85 env = {'PYTHONPATH': self._get_path()} 86 87 p = subprocess.Popen(popen_cmd, stdout=subprocess.PIPE, 88 stderr=subprocess.PIPE, cwd=self.temp_dir, 89 env=env) 90 self.addCleanup(p.kill) 91 92 stdoutdata = p.stdout.readline() # ****... 93 94 stdoutdata = p.stdout.readline() # STARTING test server... 95 self.assertIn( 96 b"STARTING test server pbr_testpackage.wsgi", 97 stdoutdata) 98 99 stdoutdata = p.stdout.readline() # Available at ... 100 print(stdoutdata) 101 m = re.search(br'(http://[^:]+:\d+)/', stdoutdata) 102 self.assertIsNotNone(m, "Regex failed to match on %s" % stdoutdata) 103 104 stdoutdata = p.stdout.readline() # DANGER! ... 105 self.assertIn( 106 b"DANGER! For testing only, do not use in production", 107 stdoutdata) 108 109 stdoutdata = p.stdout.readline() # ***... 110 111 f = urlopen(m.group(1).decode('utf-8')) 112 self.assertEqual(output, f.read()) 113 114 # Request again so that the application can force stderr.flush(), 115 # otherwise the log is buffered and the next readline() will hang. 116 urlopen(m.group(1).decode('utf-8')) 117 118 stdoutdata = p.stderr.readline() 119 # we should have logged an HTTP request, return code 200, that 120 # returned the right amount of bytes 121 status = '"GET / HTTP/1.1" 200 %d' % len(output) 122 self.assertIn(status.encode('utf-8'), stdoutdata) 123 124 def _check_wsgi_install_content(self, install_stdout): 125 for cmd_name in self.cmd_names: 126 install_txt = 'Installing %s script to %s' % (cmd_name, 127 self.temp_dir) 128 self.assertIn(install_txt, install_stdout) 129 130 cmd_filename = os.path.join(self.temp_dir, 'bin', cmd_name) 131 132 script_txt = open(cmd_filename, 'r').read() 133 self.assertNotIn('pkg_resources', script_txt) 134 135 main_block = """if __name__ == "__main__": 136 import argparse 137 import socket 138 import sys 139 import wsgiref.simple_server as wss""" 140 141 if cmd_name == 'pbr_test_wsgi': 142 app_name = "main" 143 else: 144 app_name = "WSGI.app" 145 146 starting_block = ("STARTING test server pbr_testpackage.wsgi." 147 "%s" % app_name) 148 149 else_block = """else: 150 application = None""" 151 152 self.assertIn(main_block, script_txt) 153 self.assertIn(starting_block, script_txt) 154 self.assertIn(else_block, script_txt) 155 156 def test_with_argument(self): 157 if os.name == 'nt': 158 self.skipTest('Windows support is passthrough') 159 160 stdout, _, return_code = self.run_setup( 161 'install', '--prefix=%s' % self.temp_dir) 162 163 self._test_wsgi('pbr_test_wsgi', b'Foo Bar', ["--", "-c", "Foo Bar"]) 164