1#!/usr/bin/env python
2#
3# Copyright 2011 Facebook
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17"""Utilities for working with multiple processes, including both forking
18the server into multiple processes and managing subprocesses.
19"""
20
21from __future__ import absolute_import, division, print_function
22
23import errno
24import os
25import signal
26import subprocess
27import sys
28import time
29
30from binascii import hexlify
31
32from tornado.concurrent import Future
33from tornado import ioloop
34from tornado.iostream import PipeIOStream
35from tornado.log import gen_log
36from tornado.platform.auto import set_close_exec
37from tornado import stack_context
38from tornado.util import errno_from_exception, PY3
39
40try:
41    import multiprocessing
42except ImportError:
43    # Multiprocessing is not available on Google App Engine.
44    multiprocessing = None
45
46if PY3:
47    long = int
48
49# Re-export this exception for convenience.
50try:
51    CalledProcessError = subprocess.CalledProcessError
52except AttributeError:
53    # The subprocess module exists in Google App Engine, but is empty.
54    # This module isn't very useful in that case, but it should
55    # at least be importable.
56    if 'APPENGINE_RUNTIME' not in os.environ:
57        raise
58
59
60def cpu_count():
61    """Returns the number of processors on this machine."""
62    if multiprocessing is None:
63        return 1
64    try:
65        return multiprocessing.cpu_count()
66    except NotImplementedError:
67        pass
68    try:
69        return os.sysconf("SC_NPROCESSORS_CONF")
70    except (AttributeError, ValueError):
71        pass
72    gen_log.error("Could not detect number of processors; assuming 1")
73    return 1
74
75
76def _reseed_random():
77    if 'random' not in sys.modules:
78        return
79    import random
80    # If os.urandom is available, this method does the same thing as
81    # random.seed (at least as of python 2.6).  If os.urandom is not
82    # available, we mix in the pid in addition to a timestamp.
83    try:
84        seed = long(hexlify(os.urandom(16)), 16)
85    except NotImplementedError:
86        seed = int(time.time() * 1000) ^ os.getpid()
87    random.seed(seed)
88
89
90def _pipe_cloexec():
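    """Creates a pipe and marks both ends close-on-exec."""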
91    r, w = os.pipe()
92    set_close_exec(r)
93    set_close_exec(w)
94    return r, w
95
96
97_task_id = None
98
99
100def fork_processes(num_processes, max_restarts=100):
101    """Starts multiple worker processes.
102
103    If ``num_processes`` is None or <= 0, we detect the number of cores
104    available on this machine and fork that number of child
105    processes. If ``num_processes`` is given and > 0, we fork that
106    specific number of sub-processes.
107
    Since we use processes and not threads, no memory is shared
    between the server processes.
110
    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`,
    which defaults to True when ``debug=True``).
114    When using multiple processes, no IOLoops can be created or
115    referenced until after the call to ``fork_processes``.
116
    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 and ``num_processes - 1``, inclusive.  Processes
    that exit abnormally (due to a signal or non-zero exit status) are
    restarted with the same id (up to ``max_restarts`` times).  In the
    parent process, ``fork_processes`` returns None if all child
    processes have exited normally, but will otherwise only exit by
    throwing an exception.
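
    A typical pattern binds listening sockets before forking and starts
    one server per child (a minimal sketch; assumes an existing
    `tornado.web.Application` instance named ``app``)::

        from tornado import httpserver, ioloop, netutil, process

        sockets = netutil.bind_sockets(8888)  # bind before forking
        process.fork_processes(0)             # fork one child per CPU
        server = httpserver.HTTPServer(app)   # ``app`` assumed to exist
        server.add_sockets(sockets)
        ioloop.IOLoop.current().start()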
124    """
125    global _task_id
126    assert _task_id is None
127    if num_processes is None or num_processes <= 0:
128        num_processes = cpu_count()
129    if ioloop.IOLoop.initialized():
130        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
131                           "has already been initialized. You cannot call "
132                           "IOLoop.instance() before calling start_processes()")
133    gen_log.info("Starting %d processes", num_processes)
134    children = {}
135
136    def start_child(i):
137        pid = os.fork()
138        if pid == 0:
139            # child process
140            _reseed_random()
141            global _task_id
142            _task_id = i
143            return i
144        else:
145            children[pid] = i
146            return None
147
148    for i in range(num_processes):
149        id = start_child(i)
150        if id is not None:
151            return id
152    num_restarts = 0
153    while children:
154        try:
155            pid, status = os.wait()
156        except OSError as e:
157            if errno_from_exception(e) == errno.EINTR:
158                continue
159            raise
160        if pid not in children:
161            continue
162        id = children.pop(pid)
163        if os.WIFSIGNALED(status):
164            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
165                            id, pid, os.WTERMSIG(status))
166        elif os.WEXITSTATUS(status) != 0:
167            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
168                            id, pid, os.WEXITSTATUS(status))
169        else:
170            gen_log.info("child %d (pid %d) exited normally", id, pid)
171            continue
172        num_restarts += 1
173        if num_restarts > max_restarts:
174            raise RuntimeError("Too many child restarts, giving up")
175        new_id = start_child(id)
176        if new_id is not None:
177            return new_id
178    # All child processes exited cleanly, so exit the master process
179    # instead of just returning to right after the call to
180    # fork_processes (which will probably just start up another IOLoop
181    # unless the caller checks the return value).
182    sys.exit(0)
183
184
185def task_id():
186    """Returns the current task id, if any.
187
188    Returns None if this process was not created by `fork_processes`.
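
    For example, a child created by `fork_processes` might derive a
    per-process value from its id (a hypothetical sketch)::

        port = 8000 + task_id()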
189    """
    return _task_id
192
193
194class Subprocess(object):
195    """Wraps ``subprocess.Popen`` with IOStream support.
196
197    The constructor is the same as ``subprocess.Popen`` with the following
198    additions:
199
200    * ``stdin``, ``stdout``, and ``stderr`` may have the value
201      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
202      attribute of the resulting Subprocess a `.PipeIOStream`.
203    * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
204
205    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
206    ``wait_for_exit`` methods do not work on Windows. There is
207    therefore no reason to use this class instead of
208    ``subprocess.Popen`` on that platform.
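
    A minimal sketch of capturing a child's output with a coroutine
    (assumes a running `.IOLoop`; the function name is illustrative)::

        from tornado import gen
        from tornado.process import Subprocess

        @gen.coroutine
        def run_echo():
            proc = Subprocess(["echo", "hello"],
                              stdout=Subprocess.STREAM)
            output = yield proc.stdout.read_until_close()
            yield proc.wait_for_exit()
            raise gen.Return(output)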
209
210    .. versionchanged:: 4.1
211       The ``io_loop`` argument is deprecated.
212
213    """
214    STREAM = object()
215
216    _initialized = False
217    _waiting = {}  # type: ignore
218
219    def __init__(self, *args, **kwargs):
220        self.io_loop = kwargs.pop('io_loop', None) or ioloop.IOLoop.current()
221        # All FDs we create should be closed on error; those in to_close
222        # should be closed in the parent process on success.
223        pipe_fds = []
224        to_close = []
225        if kwargs.get('stdin') is Subprocess.STREAM:
226            in_r, in_w = _pipe_cloexec()
227            kwargs['stdin'] = in_r
228            pipe_fds.extend((in_r, in_w))
229            to_close.append(in_r)
230            self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
231        if kwargs.get('stdout') is Subprocess.STREAM:
232            out_r, out_w = _pipe_cloexec()
233            kwargs['stdout'] = out_w
234            pipe_fds.extend((out_r, out_w))
235            to_close.append(out_w)
236            self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
237        if kwargs.get('stderr') is Subprocess.STREAM:
238            err_r, err_w = _pipe_cloexec()
239            kwargs['stderr'] = err_w
240            pipe_fds.extend((err_r, err_w))
241            to_close.append(err_w)
242            self.stderr = PipeIOStream(err_r, io_loop=self.io_loop)
243        try:
244            self.proc = subprocess.Popen(*args, **kwargs)
245        except:
246            for fd in pipe_fds:
247                os.close(fd)
248            raise
249        for fd in to_close:
250            os.close(fd)
251        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
252            if not hasattr(self, attr):  # don't clobber streams set above
253                setattr(self, attr, getattr(self.proc, attr))
254        self._exit_callback = None
255        self.returncode = None
256
257    def set_exit_callback(self, callback):
258        """Runs ``callback`` when this process exits.
259
260        The callback takes one argument, the return code of the process.
261
262        This method uses a ``SIGCHLD`` handler, which is a global setting
263        and may conflict if you have other libraries trying to handle the
264        same signal.  If you are using more than one ``IOLoop`` it may
265        be necessary to call `Subprocess.initialize` first to designate
266        one ``IOLoop`` to run the signal handlers.
267
268        In many cases a close callback on the stdout or stderr streams
269        can be used as an alternative to an exit callback if the
270        signal handler is causing a problem.
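
        A minimal sketch (assumes a running `.IOLoop`)::

            proc = Subprocess(["sleep", "1"])
            proc.set_exit_callback(
                lambda returncode: print("exited with", returncode))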
271        """
272        self._exit_callback = stack_context.wrap(callback)
273        Subprocess.initialize(self.io_loop)
274        Subprocess._waiting[self.pid] = self
275        Subprocess._try_cleanup_process(self.pid)
276
277    def wait_for_exit(self, raise_error=True):
278        """Returns a `.Future` which resolves when the process exits.
279
280        Usage::
281
282            ret = yield proc.wait_for_exit()
283
284        This is a coroutine-friendly alternative to `set_exit_callback`
285        (and a replacement for the blocking `subprocess.Popen.wait`).
286
287        By default, raises `subprocess.CalledProcessError` if the process
288        has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
289        to suppress this behavior and return the exit status without raising.
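
        For example, to inspect the exit status without raising
        (``handle_failure`` is a hypothetical error handler)::

            ret = yield proc.wait_for_exit(raise_error=False)
            if ret != 0:
                handle_failure(ret)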
290
291        .. versionadded:: 4.2
292        """
293        future = Future()
294
295        def callback(ret):
296            if ret != 0 and raise_error:
297                # Unfortunately we don't have the original args any more.
298                future.set_exception(CalledProcessError(ret, None))
299            else:
300                future.set_result(ret)
301        self.set_exit_callback(callback)
302        return future
303
304    @classmethod
305    def initialize(cls, io_loop=None):
306        """Initializes the ``SIGCHLD`` handler.
307
308        The signal handler is run on an `.IOLoop` to avoid locking issues.
309        Note that the `.IOLoop` used for signal handling need not be the
310        same one used by individual Subprocess objects (as long as the
311        ``IOLoops`` are each running in separate threads).
312
313        .. versionchanged:: 4.1
314           The ``io_loop`` argument is deprecated.
315        """
316        if cls._initialized:
317            return
318        if io_loop is None:
319            io_loop = ioloop.IOLoop.current()
320        cls._old_sigchld = signal.signal(
321            signal.SIGCHLD,
322            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup))
323        cls._initialized = True
324
325    @classmethod
326    def uninitialize(cls):
327        """Removes the ``SIGCHLD`` handler."""
328        if not cls._initialized:
329            return
330        signal.signal(signal.SIGCHLD, cls._old_sigchld)
331        cls._initialized = False
332
333    @classmethod
334    def _cleanup(cls):
335        for pid in list(cls._waiting.keys()):  # make a copy
336            cls._try_cleanup_process(pid)
337
338    @classmethod
339    def _try_cleanup_process(cls, pid):
340        try:
341            ret_pid, status = os.waitpid(pid, os.WNOHANG)
        except OSError as e:
            if errno_from_exception(e) == errno.ECHILD:
                return
            # Re-raise unexpected errors instead of falling through,
            # which would leave ret_pid unbound below.
            raise
345        if ret_pid == 0:
346            return
347        assert ret_pid == pid
348        subproc = cls._waiting.pop(pid)
349        subproc.io_loop.add_callback_from_signal(
350            subproc._set_returncode, status)
351
352    def _set_returncode(self, status):
353        if os.WIFSIGNALED(status):
354            self.returncode = -os.WTERMSIG(status)
355        else:
356            assert os.WIFEXITED(status)
357            self.returncode = os.WEXITSTATUS(status)
358        # We've taken over wait() duty from the subprocess.Popen
359        # object. If we don't inform it of the process's return code,
360        # it will log a warning at destruction in python 3.6+.
361        self.proc.returncode = self.returncode
362        if self._exit_callback:
363            callback = self._exit_callback
364            self._exit_callback = None
365            callback(self.returncode)
366