1# -*- coding: utf-8 -*-
2"""Program used to daemonize the worker.
3
4Using :func:`os.execv` as forking and multiprocessing
5leads to weird issues (it was a long time ago now, but it
6could have something to do with the threading mutex bug)
7"""
8from __future__ import absolute_import, unicode_literals
9
10import argparse
11import os
12import sys
13
14import celery
15from celery.bin.base import daemon_options
16from celery.platforms import EX_FAILURE, detached
17from celery.utils.log import get_logger
18from celery.utils.nodenames import default_nodename, node_format
19
20__all__ = ('detached_celeryd', 'detach')
21
22logger = get_logger(__name__)
23C_FAKEFORK = os.environ.get('C_FAKEFORK')
24
25
26def detach(path, argv, logfile=None, pidfile=None, uid=None,
27           gid=None, umask=None, workdir=None, fake=False, app=None,
28           executable=None, hostname=None):
29    """Detach program by argv'."""
30    hostname = default_nodename(hostname)
31    logfile = node_format(logfile, hostname)
32    pidfile = node_format(pidfile, hostname)
33    fake = 1 if C_FAKEFORK else fake
34    with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
35                  after_forkers=False):
36        try:
37            if executable is not None:
38                path = executable
39            os.execv(path, [path] + argv)
40        except Exception:  # pylint: disable=broad-except
41            if app is None:
42                from celery import current_app
43                app = current_app
44            app.log.setup_logging_subsystem(
45                'ERROR', logfile, hostname=hostname)
46            logger.critical("Can't exec %r", ' '.join([path] + argv),
47                            exc_info=True)
48        return EX_FAILURE
49
50
51class detached_celeryd(object):
52    """Daemonize the celery worker process."""
53
54    usage = '%(prog)s [options] [celeryd options]'
55    version = celery.VERSION_BANNER
56    description = ('Detaches Celery worker nodes.  See `celery worker --help` '
57                   'for the list of supported worker arguments.')
58    command = sys.executable
59    execv_path = sys.executable
60    execv_argv = ['-m', 'celery', 'worker']
61
62    def __init__(self, app=None):
63        self.app = app
64
65    def create_parser(self, prog_name):
66        parser = argparse.ArgumentParser(
67            prog=prog_name,
68            usage=self.usage,
69            description=self.description,
70        )
71        self._add_version_argument(parser)
72        self.add_arguments(parser)
73        return parser
74
75    def _add_version_argument(self, parser):
76        parser.add_argument(
77            '--version', action='version', version=self.version,
78        )
79
80    def parse_options(self, prog_name, argv):
81        parser = self.create_parser(prog_name)
82        options, leftovers = parser.parse_known_args(argv)
83        if options.logfile:
84            leftovers.append('--logfile={0}'.format(options.logfile))
85        if options.pidfile:
86            leftovers.append('--pidfile={0}'.format(options.pidfile))
87        if options.hostname:
88            leftovers.append('--hostname={0}'.format(options.hostname))
89        return options, leftovers
90
91    def execute_from_commandline(self, argv=None):
92        argv = sys.argv if argv is None else argv
93        prog_name = os.path.basename(argv[0])
94        config, argv = self._split_command_line_config(argv)
95        options, leftovers = self.parse_options(prog_name, argv[1:])
96        sys.exit(detach(
97            app=self.app, path=self.execv_path,
98            argv=self.execv_argv + leftovers + config,
99            **vars(options)
100        ))
101
102    def _split_command_line_config(self, argv):
103        config = list(self._extract_command_line_config(argv))
104        try:
105            argv = argv[:argv.index('--')]
106        except ValueError:
107            pass
108        return config, argv
109
110    def _extract_command_line_config(self, argv):
111        # Extracts command-line config appearing after '--':
112        #    celery worker -l info -- worker.prefetch_multiplier=10
113        # This to make sure argparse doesn't gobble it up.
114        seen_cargs = 0
115        for arg in argv:
116            if seen_cargs:
117                yield arg
118            else:
119                if arg == '--':
120                    seen_cargs = 1
121                    yield arg
122
123    def add_arguments(self, parser):
124        daemon_options(parser, default_pidfile='celeryd.pid')
125        parser.add_argument('--workdir', default=None)
126        parser.add_argument('-n', '--hostname')
127        parser.add_argument(
128            '--fake',
129            action='store_true', default=False,
130            help="Don't fork (for debugging purposes)",
131        )
132
133
134def main(app=None):
135    detached_celeryd(app).execute_from_commandline()
136
137
138if __name__ == '__main__':  # pragma: no cover
139    main()
140