1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A "Test Server Spawner" that handles killing/stopping per-test test servers.
6
7It's used to accept requests from the device to spawn and kill instances of the
8chrome test server on the host.
9"""
10
11import BaseHTTPServer
12import json
13import logging
14import os
15import select
16import struct
17import subprocess
18import threading
19import time
20import urlparse
21
22import constants
23from forwarder import Forwarder
24import ports
25
26
27# Path that are needed to import necessary modules when running testserver.py.
28os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + ':%s:%s:%s:%s' % (
29    os.path.join(constants.CHROME_DIR, 'third_party'),
30    os.path.join(constants.CHROME_DIR, 'third_party', 'tlslite'),
31    os.path.join(constants.CHROME_DIR, 'third_party', 'pyftpdlib', 'src'),
32    os.path.join(constants.CHROME_DIR, 'net', 'tools', 'testserver'))
33
34
35SERVER_TYPES = {
36    'http': '',
37    'ftp': '-f',
38    'sync': '--sync',
39    'tcpecho': '--tcp-echo',
40    'udpecho': '--udp-echo',
41}
42
43
44# The timeout (in seconds) of starting up the Python test server.
45TEST_SERVER_STARTUP_TIMEOUT = 10
46
47
48def _CheckPortStatus(port, expected_status):
49  """Returns True if port has expected_status.
50
51  Args:
52    port: the port number.
53    expected_status: boolean of expected status.
54
55  Returns:
56    Returns True if the status is expected. Otherwise returns False.
57  """
58  for timeout in range(1, 5):
59    if ports.IsHostPortUsed(port) == expected_status:
60      return True
61    time.sleep(timeout)
62  return False
63
64
65def _GetServerTypeCommandLine(server_type):
66  """Returns the command-line by the given server type.
67
68  Args:
69    server_type: the server type to be used (e.g. 'http').
70
71  Returns:
72    A string containing the command-line argument.
73  """
74  if server_type not in SERVER_TYPES:
75    raise NotImplementedError('Unknown server type: %s' % server_type)
76  if server_type == 'udpecho':
77    raise Exception('Please do not run UDP echo tests because we do not have '
78                    'a UDP forwarder tool.')
79  return SERVER_TYPES[server_type]
80
81
82class TestServerThread(threading.Thread):
83  """A thread to run the test server in a separate process."""
84
85  def __init__(self, ready_event, arguments, adb, tool, build_type):
86    """Initialize TestServerThread with the following argument.
87
88    Args:
89      ready_event: event which will be set when the test server is ready.
90      arguments: dictionary of arguments to run the test server.
91      adb: instance of AndroidCommands.
92      tool: instance of runtime error detection tool.
93      build_type: 'Release' or 'Debug'.
94    """
95    threading.Thread.__init__(self)
96    self.wait_event = threading.Event()
97    self.stop_flag = False
98    self.ready_event = ready_event
99    self.ready_event.clear()
100    self.arguments = arguments
101    self.adb = adb
102    self.tool = tool
103    self.test_server_process = None
104    self.is_ready = False
105    self.host_port = self.arguments['port']
106    assert isinstance(self.host_port, int)
107    self._test_server_forwarder = None
108    # The forwarder device port now is dynamically allocated.
109    self.forwarder_device_port = 0
110    # Anonymous pipe in order to get port info from test server.
111    self.pipe_in = None
112    self.pipe_out = None
113    self.command_line = []
114    self.build_type = build_type
115
116  def _WaitToStartAndGetPortFromTestServer(self):
117    """Waits for the Python test server to start and gets the port it is using.
118
119    The port information is passed by the Python test server with a pipe given
120    by self.pipe_out. It is written as a result to |self.host_port|.
121
122    Returns:
123      Whether the port used by the test server was successfully fetched.
124    """
125    assert self.host_port == 0 and self.pipe_out and self.pipe_in
126    (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
127                                   TEST_SERVER_STARTUP_TIMEOUT)
128    if len(in_fds) == 0:
129      logging.error('Failed to wait to the Python test server to be started.')
130      return False
131    # First read the data length as an unsigned 4-byte value.  This
132    # is _not_ using network byte ordering since the Python test server packs
133    # size as native byte order and all Chromium platforms so far are
134    # configured to use little-endian.
135    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
136    # use a unified byte order (either big-endian or little-endian).
137    data_length = os.read(self.pipe_in, struct.calcsize('=L'))
138    if data_length:
139      (data_length,) = struct.unpack('=L', data_length)
140      assert data_length
141    if not data_length:
142      logging.error('Failed to get length of server data.')
143      return False
144    port_json = os.read(self.pipe_in, data_length)
145    if not port_json:
146      logging.error('Failed to get server data.')
147      return False
148    logging.info('Got port json data: %s', port_json)
149    port_json = json.loads(port_json)
150    if port_json.has_key('port') and isinstance(port_json['port'], int):
151      self.host_port = port_json['port']
152      return _CheckPortStatus(self.host_port, True)
153    logging.error('Failed to get port information from the server data.')
154    return False
155
156  def _GenerateCommandLineArguments(self):
157    """Generates the command line to run the test server.
158
159    Note that all options are processed by following the definitions in
160    testserver.py.
161    """
162    if self.command_line:
163      return
164    # The following arguments must exist.
165    type_cmd = _GetServerTypeCommandLine(self.arguments['server-type'])
166    if type_cmd:
167      self.command_line.append(type_cmd)
168    self.command_line.append('--port=%d' % self.host_port)
169    # Use a pipe to get the port given by the instance of Python test server
170    # if the test does not specify the port.
171    if self.host_port == 0:
172      (self.pipe_in, self.pipe_out) = os.pipe()
173      self.command_line.append('--startup-pipe=%d' % self.pipe_out)
174    self.command_line.append('--host=%s' % self.arguments['host'])
175    data_dir = self.arguments['data-dir'] or 'chrome/test/data'
176    if not os.path.isabs(data_dir):
177      data_dir = os.path.join(constants.CHROME_DIR, data_dir)
178    self.command_line.append('--data-dir=%s' % data_dir)
179    # The following arguments are optional depending on the individual test.
180    if self.arguments.has_key('log-to-console'):
181      self.command_line.append('--log-to-console')
182    if self.arguments.has_key('auth-token'):
183      self.command_line.append('--auth-token=%s' % self.arguments['auth-token'])
184    if self.arguments.has_key('https'):
185      self.command_line.append('--https')
186      if self.arguments.has_key('cert-and-key-file'):
187        self.command_line.append('--cert-and-key-file=%s' % os.path.join(
188            constants.CHROME_DIR, self.arguments['cert-and-key-file']))
189      if self.arguments.has_key('ocsp'):
190        self.command_line.append('--ocsp=%s' % self.arguments['ocsp'])
191      if self.arguments.has_key('https-record-resume'):
192        self.command_line.append('--https-record-resume')
193      if self.arguments.has_key('ssl-client-auth'):
194        self.command_line.append('--ssl-client-auth')
195      if self.arguments.has_key('tls-intolerant'):
196        self.command_line.append('--tls-intolerant=%s' %
197                                 self.arguments['tls-intolerant'])
198      if self.arguments.has_key('ssl-client-ca'):
199        for ca in self.arguments['ssl-client-ca']:
200          self.command_line.append('--ssl-client-ca=%s' %
201                                   os.path.join(constants.CHROME_DIR, ca))
202      if self.arguments.has_key('ssl-bulk-cipher'):
203        for bulk_cipher in self.arguments['ssl-bulk-cipher']:
204          self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)
205
206  def run(self):
207    logging.info('Start running the thread!')
208    self.wait_event.clear()
209    self._GenerateCommandLineArguments()
210    command = [os.path.join(constants.CHROME_DIR, 'net', 'tools',
211                            'testserver', 'testserver.py')] + self.command_line
212    logging.info('Running: %s', command)
213    self.process = subprocess.Popen(command)
214    if self.process:
215      if self.pipe_out:
216        self.is_ready = self._WaitToStartAndGetPortFromTestServer()
217      else:
218        self.is_ready = _CheckPortStatus(self.host_port, True)
219    if self.is_ready:
220      self._test_server_forwarder = Forwarder(
221          self.adb, [(0, self.host_port)], self.tool, '127.0.0.1',
222          self.build_type)
223      # Check whether the forwarder is ready on the device.
224      self.is_ready = False
225      device_port = self._test_server_forwarder.DevicePortForHostPort(
226          self.host_port)
227      if device_port:
228        for timeout in range(1, 5):
229          if ports.IsDevicePortUsed(self.adb, device_port, 'LISTEN'):
230            self.is_ready = True
231            self.forwarder_device_port = device_port
232            break
233          time.sleep(timeout)
234    # Wake up the request handler thread.
235    self.ready_event.set()
236    # Keep thread running until Stop() gets called.
237    while not self.stop_flag:
238      time.sleep(1)
239    if self.process.poll() is None:
240      self.process.kill()
241    if self._test_server_forwarder:
242      self._test_server_forwarder.Close()
243    self.process = None
244    self.is_ready = False
245    if self.pipe_out:
246      os.close(self.pipe_in)
247      os.close(self.pipe_out)
248      self.pipe_in = None
249      self.pipe_out = None
250    logging.info('Test-server has died.')
251    self.wait_event.set()
252
253  def Stop(self):
254    """Blocks until the loop has finished.
255
256    Note that this must be called in another thread.
257    """
258    if not self.process:
259      return
260    self.stop_flag = True
261    self.wait_event.wait()
262
263
264class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
265  """A handler used to process http GET/POST request."""
266
267  def _SendResponse(self, response_code, response_reason, additional_headers,
268                    contents):
269    """Generates a response sent to the client from the provided parameters.
270
271    Args:
272      response_code: number of the response status.
273      response_reason: string of reason description of the response.
274      additional_headers: dict of additional headers. Each key is the name of
275                          the header, each value is the content of the header.
276      contents: string of the contents we want to send to client.
277    """
278    self.send_response(response_code, response_reason)
279    self.send_header('Content-Type', 'text/html')
280    # Specify the content-length as without it the http(s) response will not
281    # be completed properly (and the browser keeps expecting data).
282    self.send_header('Content-Length', len(contents))
283    for header_name in additional_headers:
284      self.send_header(header_name, additional_headers[header_name])
285    self.end_headers()
286    self.wfile.write(contents)
287    self.wfile.flush()
288
289  def _StartTestServer(self):
290    """Starts the test server thread."""
291    logging.info('Handling request to spawn a test server.')
292    content_type = self.headers.getheader('content-type')
293    if content_type != 'application/json':
294      raise Exception('Bad content-type for start request.')
295    content_length = self.headers.getheader('content-length')
296    if not content_length:
297      content_length = 0
298    try:
299      content_length = int(content_length)
300    except:
301      raise Exception('Bad content-length for start request.')
302    logging.info(content_length)
303    test_server_argument_json = self.rfile.read(content_length)
304    logging.info(test_server_argument_json)
305    assert not self.server.test_server_instance
306    ready_event = threading.Event()
307    self.server.test_server_instance = TestServerThread(
308        ready_event,
309        json.loads(test_server_argument_json),
310        self.server.adb,
311        self.server.tool,
312        self.server.build_type)
313    self.server.test_server_instance.setDaemon(True)
314    self.server.test_server_instance.start()
315    ready_event.wait()
316    if self.server.test_server_instance.is_ready:
317      self._SendResponse(200, 'OK', {}, json.dumps(
318          {'port': self.server.test_server_instance.forwarder_device_port,
319           'message': 'started'}))
320      logging.info('Test server is running on port: %d.',
321                   self.server.test_server_instance.host_port)
322    else:
323      self.server.test_server_instance.Stop()
324      self.server.test_server_instance = None
325      self._SendResponse(500, 'Test Server Error.', {}, '')
326      logging.info('Encounter problem during starting a test server.')
327
328  def _KillTestServer(self):
329    """Stops the test server instance."""
330    # There should only ever be one test server at a time. This may do the
331    # wrong thing if we try and start multiple test servers.
332    if not self.server.test_server_instance:
333      return
334    port = self.server.test_server_instance.host_port
335    logging.info('Handling request to kill a test server on port: %d.', port)
336    self.server.test_server_instance.Stop()
337    # Make sure the status of test server is correct before sending response.
338    if _CheckPortStatus(port, False):
339      self._SendResponse(200, 'OK', {}, 'killed')
340      logging.info('Test server on port %d is killed', port)
341    else:
342      self._SendResponse(500, 'Test Server Error.', {}, '')
343      logging.info('Encounter problem during killing a test server.')
344    self.server.test_server_instance = None
345
346  def do_POST(self):
347    parsed_path = urlparse.urlparse(self.path)
348    action = parsed_path.path
349    logging.info('Action for POST method is: %s.', action)
350    if action == '/start':
351      self._StartTestServer()
352    else:
353      self._SendResponse(400, 'Unknown request.', {}, '')
354      logging.info('Encounter unknown request: %s.', action)
355
356  def do_GET(self):
357    parsed_path = urlparse.urlparse(self.path)
358    action = parsed_path.path
359    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
360    logging.info('Action for GET method is: %s.', action)
361    for param in params:
362      logging.info('%s=%s', param, params[param][0])
363    if action == '/kill':
364      self._KillTestServer()
365    elif action == '/ping':
366      # The ping handler is used to check whether the spawner server is ready
367      # to serve the requests. We don't need to test the status of the test
368      # server when handling ping request.
369      self._SendResponse(200, 'OK', {}, 'ready')
370      logging.info('Handled ping request and sent response.')
371    else:
372      self._SendResponse(400, 'Unknown request', {}, '')
373      logging.info('Encounter unknown request: %s.', action)
374
375
376class SpawningServer(object):
377  """The class used to start/stop a http server."""
378
379  def __init__(self, test_server_spawner_port, adb, tool, build_type):
380    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
381    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
382                                            SpawningServerRequestHandler)
383    self.port = test_server_spawner_port
384    self.server.adb = adb
385    self.server.tool = tool
386    self.server.test_server_instance = None
387    self.server.build_type = build_type
388
389  def _Listen(self):
390    logging.info('Starting test server spawner')
391    self.server.serve_forever()
392
393  def Start(self):
394    listener_thread = threading.Thread(target=self._Listen)
395    listener_thread.setDaemon(True)
396    listener_thread.start()
397    time.sleep(1)
398
399  def Stop(self):
400    if self.server.test_server_instance:
401      self.server.test_server_instance.Stop()
402    self.server.shutdown()
403