#!/usr/bin/env python
#
# Copyright 2011 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Utilities for working with multiple processes, including both forking
the server into multiple processes and managing subprocesses.
"""

from __future__ import absolute_import, division, print_function

import errno
import os
import signal
import subprocess
import sys
import time

from binascii import hexlify

from tornado.concurrent import Future
from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
from tornado.platform.auto import set_close_exec
from tornado import stack_context
from tornado.util import errno_from_exception, PY3

try:
    import multiprocessing
except ImportError:
    # Multiprocessing is not available on Google App Engine.
    multiprocessing = None

if PY3:
    long = int

# Re-export this exception for convenience.
try:
    CalledProcessError = subprocess.CalledProcessError
except AttributeError:
    # The subprocess module exists in Google App Engine, but is empty.
    # This module isn't very useful in that case, but it should
    # at least be importable.
    if 'APPENGINE_RUNTIME' not in os.environ:
        raise


def cpu_count():
    """Returns the number of processors on this machine."""
    if multiprocessing is None:
        return 1
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        return os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError):
        pass
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1


def _reseed_random():
    # Re-seed the ``random`` module after fork so children do not share
    # the parent's PRNG state.  Only done if ``random`` is already loaded.
    if 'random' not in sys.modules:
        return
    import random
    # If os.urandom is available, this method does the same thing as
    # random.seed (at least as of python 2.6).  If os.urandom is not
    # available, we mix in the pid in addition to a timestamp.
    try:
        seed = long(hexlify(os.urandom(16)), 16)
    except NotImplementedError:
        seed = int(time.time() * 1000) ^ os.getpid()
    random.seed(seed)


def _pipe_cloexec():
    # Create a pipe with both ends marked close-on-exec so child
    # processes do not inherit stray descriptors.
    r, w = os.pipe()
    set_close_exec(r)
    set_close_exec(w)
    return r, w


_task_id = None


def fork_processes(num_processes, max_restarts=100):
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 and ``num_processes``.  Processes that exit
    abnormally (due to a signal or non-zero exit status) are restarted
    with the same id (up to ``max_restarts`` times).  In the parent
    process, ``fork_processes`` returns None if all child processes
    have exited normally, but will otherwise only exit by throwing an
    exception.
    """
    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    gen_log.info("Starting %d processes", num_processes)
    children = {}  # maps child pid -> task id

    def start_child(i):
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i
        else:
            children[pid] = i
            return None

    for i in range(num_processes):
        child_id = start_child(i)
        if child_id is not None:
            # We are in a child: return its task id to the caller.
            return child_id
    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError as e:
            if errno_from_exception(e) == errno.EINTR:
                continue
            raise
        if pid not in children:
            continue
        child_id = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                            child_id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                            child_id, pid, os.WEXITSTATUS(status))
        else:
            gen_log.info("child %d (pid %d) exited normally", child_id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(child_id)
        if new_id is not None:
            return new_id
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)


def task_id():
    """Returns the current task id, if any.

    Returns None if this process was not created by `fork_processes`.
    """
    global _task_id
    return _task_id


class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`.
    * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.

    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
    ``wait_for_exit`` methods do not work on Windows. There is
    therefore no reason to use this class instead of
    ``subprocess.Popen`` on that platform.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.

    """
    STREAM = object()

    _initialized = False
    _waiting = {}  # type: ignore

    def __init__(self, *args, **kwargs):
        self.io_loop = kwargs.pop('io_loop', None) or ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # should be closed in the parent process on success.
        pipe_fds = []
        to_close = []
        if kwargs.get('stdin') is Subprocess.STREAM:
            in_r, in_w = _pipe_cloexec()
            kwargs['stdin'] = in_r
            pipe_fds.extend((in_r, in_w))
            to_close.append(in_r)
            self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
        if kwargs.get('stdout') is Subprocess.STREAM:
            out_r, out_w = _pipe_cloexec()
            kwargs['stdout'] = out_w
            pipe_fds.extend((out_r, out_w))
            to_close.append(out_w)
            self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
        if kwargs.get('stderr') is Subprocess.STREAM:
            err_r, err_w = _pipe_cloexec()
            kwargs['stderr'] = err_w
            pipe_fds.extend((err_r, err_w))
            to_close.append(err_w)
            self.stderr = PipeIOStream(err_r, io_loop=self.io_loop)
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except:
            # Bare except is deliberate: clean up all fds on any failure,
            # then re-raise the original exception unchanged.
            for fd in pipe_fds:
                os.close(fd)
            raise
        for fd in to_close:
            os.close(fd)
        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
            if not hasattr(self, attr):  # don't clobber streams set above
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None
        self.returncode = None

    def set_exit_callback(self, callback):
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal.  If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.
        """
        self._exit_callback = stack_context.wrap(callback)
        Subprocess.initialize(self.io_loop)
        Subprocess._waiting[self.pid] = self
        # The child may have already exited before we registered; check now.
        Subprocess._try_cleanup_process(self.pid)

    def wait_for_exit(self, raise_error=True):
        """Returns a `.Future` which resolves when the process exits.

        Usage::

            ret = yield proc.wait_for_exit()

        This is a coroutine-friendly alternative to `set_exit_callback`
        (and a replacement for the blocking `subprocess.Popen.wait`).

        By default, raises `subprocess.CalledProcessError` if the process
        has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
        to suppress this behavior and return the exit status without raising.

        .. versionadded:: 4.2
        """
        future = Future()

        def callback(ret):
            if ret != 0 and raise_error:
                # Unfortunately we don't have the original args any more.
                future.set_exception(CalledProcessError(ret, None))
            else:
                future.set_result(ret)
        self.set_exit_callback(callback)
        return future

    @classmethod
    def initialize(cls, io_loop=None):
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        .. versionchanged:: 4.1
           The ``io_loop`` argument is deprecated.
        """
        if cls._initialized:
            return
        if io_loop is None:
            io_loop = ioloop.IOLoop.current()
        cls._old_sigchld = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup))
        cls._initialized = True

    @classmethod
    def uninitialize(cls):
        """Removes the ``SIGCHLD`` handler."""
        if not cls._initialized:
            return
        signal.signal(signal.SIGCHLD, cls._old_sigchld)
        cls._initialized = False

    @classmethod
    def _cleanup(cls):
        # Reap every process we are waiting on; called from the SIGCHLD
        # handler via the IOLoop.
        for pid in list(cls._waiting.keys()):  # make a copy
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid):
        # Non-blocking reap of a single child; dispatches _set_returncode
        # on the owning IOLoop if the child has exited.
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)
        except OSError as e:
            if errno_from_exception(e) == errno.ECHILD:
                return
            # Fix: previously a non-ECHILD OSError fell out of this handler
            # and the following "if ret_pid == 0" raised UnboundLocalError,
            # masking the real error.  Re-raise the original exception.
            raise
        if ret_pid == 0:
            # Child has not exited yet.
            return
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        subproc.io_loop.add_callback_from_signal(
            subproc._set_returncode, status)

    def _set_returncode(self, status):
        if os.WIFSIGNALED(status):
            self.returncode = -os.WTERMSIG(status)
        else:
            assert os.WIFEXITED(status)
            self.returncode = os.WEXITSTATUS(status)
        # We've taken over wait() duty from the subprocess.Popen
        # object. If we don't inform it of the process's return code,
        # it will log a warning at destruction in python 3.6+.
        self.proc.returncode = self.returncode
        if self._exit_callback:
            callback = self._exit_callback
            self._exit_callback = None
            callback(self.returncode)