1# -*- coding: utf-8 -*- 2"""Program used to daemonize the worker. 3 4Using :func:`os.execv` as forking and multiprocessing 5leads to weird issues (it was a long time ago now, but it 6could have something to do with the threading mutex bug) 7""" 8from __future__ import absolute_import, unicode_literals 9 10import argparse 11import os 12import sys 13 14import celery 15from celery.bin.base import daemon_options 16from celery.platforms import EX_FAILURE, detached 17from celery.utils.log import get_logger 18from celery.utils.nodenames import default_nodename, node_format 19 20__all__ = ('detached_celeryd', 'detach') 21 22logger = get_logger(__name__) 23C_FAKEFORK = os.environ.get('C_FAKEFORK') 24 25 26def detach(path, argv, logfile=None, pidfile=None, uid=None, 27 gid=None, umask=None, workdir=None, fake=False, app=None, 28 executable=None, hostname=None): 29 """Detach program by argv'.""" 30 hostname = default_nodename(hostname) 31 logfile = node_format(logfile, hostname) 32 pidfile = node_format(pidfile, hostname) 33 fake = 1 if C_FAKEFORK else fake 34 with detached(logfile, pidfile, uid, gid, umask, workdir, fake, 35 after_forkers=False): 36 try: 37 if executable is not None: 38 path = executable 39 os.execv(path, [path] + argv) 40 except Exception: # pylint: disable=broad-except 41 if app is None: 42 from celery import current_app 43 app = current_app 44 app.log.setup_logging_subsystem( 45 'ERROR', logfile, hostname=hostname) 46 logger.critical("Can't exec %r", ' '.join([path] + argv), 47 exc_info=True) 48 return EX_FAILURE 49 50 51class detached_celeryd(object): 52 """Daemonize the celery worker process.""" 53 54 usage = '%(prog)s [options] [celeryd options]' 55 version = celery.VERSION_BANNER 56 description = ('Detaches Celery worker nodes. See `celery worker --help` ' 57 'for the list of supported worker arguments.') 58 command = sys.executable 59 execv_path = sys.executable 60 execv_argv = ['-m', 'celery', 'worker'] 61 62 def __init__(self, app=None): 63 self.app = app 64 65 def create_parser(self, prog_name): 66 parser = argparse.ArgumentParser( 67 prog=prog_name, 68 usage=self.usage, 69 description=self.description, 70 ) 71 self._add_version_argument(parser) 72 self.add_arguments(parser) 73 return parser 74 75 def _add_version_argument(self, parser): 76 parser.add_argument( 77 '--version', action='version', version=self.version, 78 ) 79 80 def parse_options(self, prog_name, argv): 81 parser = self.create_parser(prog_name) 82 options, leftovers = parser.parse_known_args(argv) 83 if options.logfile: 84 leftovers.append('--logfile={0}'.format(options.logfile)) 85 if options.pidfile: 86 leftovers.append('--pidfile={0}'.format(options.pidfile)) 87 if options.hostname: 88 leftovers.append('--hostname={0}'.format(options.hostname)) 89 return options, leftovers 90 91 def execute_from_commandline(self, argv=None): 92 argv = sys.argv if argv is None else argv 93 prog_name = os.path.basename(argv[0]) 94 config, argv = self._split_command_line_config(argv) 95 options, leftovers = self.parse_options(prog_name, argv[1:]) 96 sys.exit(detach( 97 app=self.app, path=self.execv_path, 98 argv=self.execv_argv + leftovers + config, 99 **vars(options) 100 )) 101 102 def _split_command_line_config(self, argv): 103 config = list(self._extract_command_line_config(argv)) 104 try: 105 argv = argv[:argv.index('--')] 106 except ValueError: 107 pass 108 return config, argv 109 110 def _extract_command_line_config(self, argv): 111 # Extracts command-line config appearing after '--': 112 # celery worker -l info -- worker.prefetch_multiplier=10 113 # This to make sure argparse doesn't gobble it up. 114 seen_cargs = 0 115 for arg in argv: 116 if seen_cargs: 117 yield arg 118 else: 119 if arg == '--': 120 seen_cargs = 1 121 yield arg 122 123 def add_arguments(self, parser): 124 daemon_options(parser, default_pidfile='celeryd.pid') 125 parser.add_argument('--workdir', default=None) 126 parser.add_argument('-n', '--hostname') 127 parser.add_argument( 128 '--fake', 129 action='store_true', default=False, 130 help="Don't fork (for debugging purposes)", 131 ) 132 133 134def main(app=None): 135 detached_celeryd(app).execute_from_commandline() 136 137 138if __name__ == '__main__': # pragma: no cover 139 main() 140