1# -*- coding: utf-8 -*- 2"""Base command-line interface.""" 3from __future__ import absolute_import, print_function, unicode_literals 4 5import argparse 6import json 7import os 8import random 9import re 10import sys 11import warnings 12from collections import defaultdict 13from heapq import heappush 14from pprint import pformat 15 16from celery import VERSION_BANNER, Celery, maybe_patch_concurrency, signals 17from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning 18from celery.five import (PY2, getfullargspec, items, long_t, 19 python_2_unicode_compatible, string, string_t, 20 text_t) 21from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE, isatty 22from celery.utils import imports, term, text 23from celery.utils.functional import dictfilter 24from celery.utils.nodenames import host_format, node_format 25from celery.utils.objects import Bunch 26 27# Option is here for backwards compatibility, as third-party commands 28# may import it from here. 29try: 30 from optparse import Option # pylint: disable=deprecated-module 31except ImportError: # pragma: no cover 32 Option = None # noqa 33 34try: 35 input = raw_input 36except NameError: # pragma: no cover 37 pass 38 39__all__ = ( 40 'Error', 'UsageError', 'Extensions', 'Command', 'Option', 'daemon_options', 41) 42 43# always enable DeprecationWarnings, so our users can see them. 44for warning in (CDeprecationWarning, CPendingDeprecationWarning): 45 warnings.simplefilter('once', warning, 0) 46 47# TODO: Remove this once we drop support for Python < 3.6 48if sys.version_info < (3, 6): 49 ModuleNotFoundError = ImportError 50 51ARGV_DISABLED = """ 52Unrecognized command-line arguments: {0} 53 54Try --help? 55""" 56 57UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND = """ 58Unable to load celery application. 59The module {0} was not found. 60""" 61 62UNABLE_TO_LOAD_APP_APP_MISSING = """ 63Unable to load celery application. 64{0} 65""" 66 67find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)') 68find_rst_ref = re.compile(r':\w+:`(.+?)`') 69find_rst_decl = re.compile(r'^\s*\.\. .+?::.+$') 70 71 72def _optparse_callback_to_type(option, callback): 73 parser = Bunch(values=Bunch()) 74 75 def _on_arg(value): 76 callback(option, None, value, parser) 77 return getattr(parser.values, option.dest) 78 return _on_arg 79 80 81def _add_optparse_argument(parser, opt, typemap=None): 82 typemap = { 83 'string': text_t, 84 'int': int, 85 'long': long_t, 86 'float': float, 87 'complex': complex, 88 'choice': None} if not typemap else typemap 89 if opt.callback: 90 opt.type = _optparse_callback_to_type(opt, opt.type) 91 # argparse checks for existence of this kwarg 92 if opt.action == 'callback': 93 opt.action = None 94 # store_true sets value to "('NO', 'DEFAULT')" for some 95 # crazy reason, so not to set a sane default here. 96 if opt.action == 'store_true' and opt.default is None: 97 opt.default = False 98 parser.add_argument( 99 *opt._long_opts + opt._short_opts, 100 **dictfilter({ 101 'action': opt.action, 102 'type': typemap.get(opt.type, opt.type), 103 'dest': opt.dest, 104 'nargs': opt.nargs, 105 'choices': opt.choices, 106 'help': opt.help, 107 'metavar': opt.metavar, 108 'default': opt.default})) 109 110 111def _add_compat_options(parser, options): 112 for option in options or (): 113 if callable(option): 114 option(parser) 115 else: 116 _add_optparse_argument(parser, option) 117 118 119@python_2_unicode_compatible 120class Error(Exception): 121 """Exception raised by commands.""" 122 123 status = EX_FAILURE 124 125 def __init__(self, reason, status=None): 126 self.reason = reason 127 self.status = status if status is not None else self.status 128 super(Error, self).__init__(reason, status) 129 130 def __str__(self): 131 return self.reason 132 133 134class UsageError(Error): 135 """Exception raised for malformed arguments.""" 136 137 status = EX_USAGE 138 139 140class Extensions(object): 141 """Loads extensions from setuptools entrypoints.""" 142 143 def __init__(self, namespace, register): 144 self.names = [] 145 self.namespace = namespace 146 self.register = register 147 148 def add(self, cls, name): 149 heappush(self.names, name) 150 self.register(cls, name=name) 151 152 def load(self): 153 for name, cls in imports.load_extension_classes(self.namespace): 154 self.add(cls, name) 155 return self.names 156 157 158class Command(object): 159 """Base class for command-line applications. 160 161 Arguments: 162 app (Celery): The app to use. 163 get_app (Callable): Fucntion returning the current app 164 when no app provided. 165 """ 166 167 Error = Error 168 UsageError = UsageError 169 Parser = argparse.ArgumentParser 170 171 #: Arg list used in help. 172 args = '' 173 174 #: Application version. 175 version = VERSION_BANNER 176 177 #: If false the parser will raise an exception if positional 178 #: args are provided. 179 supports_args = True 180 181 #: List of options (without preload options). 182 option_list = None 183 184 # module Rst documentation to parse help from (if any) 185 doc = None 186 187 # Some programs (multi) does not want to load the app specified 188 # (Issue #1008). 189 respects_app_option = True 190 191 #: Enable if the application should support config from the cmdline. 192 enable_config_from_cmdline = False 193 194 #: Default configuration name-space. 195 namespace = None 196 197 #: Text to print at end of --help 198 epilog = None 199 200 #: Text to print in --help before option list. 201 description = '' 202 203 #: Set to true if this command doesn't have sub-commands 204 leaf = True 205 206 # used by :meth:`say_remote_command_reply`. 207 show_body = True 208 # used by :meth:`say_chat`. 209 show_reply = True 210 211 prog_name = 'celery' 212 213 #: Name of argparse option used for parsing positional args. 214 args_name = 'args' 215 216 def __init__(self, app=None, get_app=None, no_color=False, 217 stdout=None, stderr=None, quiet=False, on_error=None, 218 on_usage_error=None): 219 self.app = app 220 self.get_app = get_app or self._get_default_app 221 self.stdout = stdout or sys.stdout 222 self.stderr = stderr or sys.stderr 223 self._colored = None 224 self._no_color = no_color 225 self.quiet = quiet 226 if not self.description: 227 self.description = self._strip_restructeredtext(self.__doc__) 228 if on_error: 229 self.on_error = on_error 230 if on_usage_error: 231 self.on_usage_error = on_usage_error 232 233 def run(self, *args, **options): 234 raise NotImplementedError('subclass responsibility') 235 236 def on_error(self, exc): 237 # pylint: disable=method-hidden 238 # on_error argument to __init__ may override this method. 239 self.error(self.colored.red('Error: {0}'.format(exc))) 240 241 def on_usage_error(self, exc): 242 # pylint: disable=method-hidden 243 # on_usage_error argument to __init__ may override this method. 244 self.handle_error(exc) 245 246 def on_concurrency_setup(self): 247 pass 248 249 def __call__(self, *args, **kwargs): 250 random.seed() # maybe we were forked. 251 self.verify_args(args) 252 try: 253 ret = self.run(*args, **kwargs) 254 return ret if ret is not None else EX_OK 255 except self.UsageError as exc: 256 self.on_usage_error(exc) 257 return exc.status 258 except self.Error as exc: 259 self.on_error(exc) 260 return exc.status 261 262 def verify_args(self, given, _index=0): 263 S = getfullargspec(self.run) 264 _index = 1 if S.args and S.args[0] == 'self' else _index 265 required = S.args[_index:-len(S.defaults) if S.defaults else None] 266 missing = required[len(given):] 267 if missing: 268 raise self.UsageError('Missing required {0}: {1}'.format( 269 text.pluralize(len(missing), 'argument'), 270 ', '.join(missing) 271 )) 272 273 def execute_from_commandline(self, argv=None): 274 """Execute application from command-line. 275 276 Arguments: 277 argv (List[str]): The list of command-line arguments. 278 Defaults to ``sys.argv``. 279 """ 280 if argv is None: 281 argv = list(sys.argv) 282 # Should we load any special concurrency environment? 283 self.maybe_patch_concurrency(argv) 284 self.on_concurrency_setup() 285 286 # Dump version and exit if '--version' arg set. 287 self.early_version(argv) 288 try: 289 argv = self.setup_app_from_commandline(argv) 290 except ModuleNotFoundError as e: 291 # In Python 2.7 and below, there is no name instance for exceptions 292 # TODO: Remove this once we drop support for Python 2.7 293 if PY2: 294 package_name = e.message.replace("No module named ", "") 295 else: 296 package_name = e.name 297 self.on_error(UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND.format(package_name)) 298 return EX_FAILURE 299 except AttributeError as e: 300 msg = e.args[0].capitalize() 301 self.on_error(UNABLE_TO_LOAD_APP_APP_MISSING.format(msg)) 302 return EX_FAILURE 303 304 self.prog_name = os.path.basename(argv[0]) 305 return self.handle_argv(self.prog_name, argv[1:]) 306 307 def run_from_argv(self, prog_name, argv=None, command=None): 308 return self.handle_argv(prog_name, 309 sys.argv if argv is None else argv, command) 310 311 def maybe_patch_concurrency(self, argv=None): 312 argv = argv or sys.argv 313 pool_option = self.with_pool_option(argv) 314 if pool_option: 315 maybe_patch_concurrency(argv, *pool_option) 316 317 def usage(self, command): 318 return '%(prog)s {0} [options] {self.args}'.format(command, self=self) 319 320 def add_arguments(self, parser): 321 pass 322 323 def get_options(self): 324 # This is for optparse options, please use add_arguments. 325 return self.option_list 326 327 def add_preload_arguments(self, parser): 328 group = parser.add_argument_group('Global Options') 329 group.add_argument('-A', '--app', default=None) 330 group.add_argument('-b', '--broker', default=None) 331 group.add_argument('--result-backend', default=None) 332 group.add_argument('--loader', default=None) 333 group.add_argument('--config', default=None) 334 group.add_argument('--workdir', default=None) 335 group.add_argument( 336 '--no-color', '-C', action='store_true', default=None) 337 group.add_argument('--quiet', '-q', action='store_true') 338 339 def _add_version_argument(self, parser): 340 parser.add_argument( 341 '--version', action='version', version=self.version, 342 ) 343 344 def prepare_arguments(self, parser): 345 pass 346 347 def expanduser(self, value): 348 if isinstance(value, string_t): 349 return os.path.expanduser(value) 350 return value 351 352 def ask(self, q, choices, default=None): 353 """Prompt user to choose from a tuple of string values. 354 355 If a default is not specified the question will be repeated 356 until the user gives a valid choice. 357 358 Matching is case insensitive. 359 360 Arguments: 361 q (str): the question to ask (don't include questionark) 362 choice (Tuple[str]): tuple of possible choices, must be lowercase. 363 default (Any): Default value if any. 364 """ 365 schoices = choices 366 if default is not None: 367 schoices = [c.upper() if c == default else c.lower() 368 for c in choices] 369 schoices = '/'.join(schoices) 370 371 p = '{0} ({1})? '.format(q.capitalize(), schoices) 372 while 1: 373 val = input(p).lower() 374 if val in choices: 375 return val 376 elif default is not None: 377 break 378 return default 379 380 def handle_argv(self, prog_name, argv, command=None): 381 """Parse arguments from argv and dispatch to :meth:`run`. 382 383 Warning: 384 Exits with an error message if :attr:`supports_args` is disabled 385 and ``argv`` contains positional arguments. 386 387 Arguments: 388 prog_name (str): The program name (``argv[0]``). 389 argv (List[str]): Rest of command-line arguments. 390 """ 391 options, args = self.prepare_args( 392 *self.parse_options(prog_name, argv, command)) 393 return self(*args, **options) 394 395 def prepare_args(self, options, args): 396 if options: 397 options = { 398 k: self.expanduser(v) 399 for k, v in items(options) if not k.startswith('_') 400 } 401 args = [self.expanduser(arg) for arg in args] 402 self.check_args(args) 403 return options, args 404 405 def check_args(self, args): 406 if not self.supports_args and args: 407 self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE) 408 409 def error(self, s): 410 self.out(s, fh=self.stderr) 411 412 def out(self, s, fh=None): 413 print(s, file=fh or self.stdout) 414 415 def die(self, msg, status=EX_FAILURE): 416 self.error(msg) 417 sys.exit(status) 418 419 def early_version(self, argv): 420 if '--version' in argv: 421 print(self.version, file=self.stdout) 422 sys.exit(0) 423 424 def parse_options(self, prog_name, arguments, command=None): 425 """Parse the available options.""" 426 # Don't want to load configuration to just print the version, 427 # so we handle --version manually here. 428 self.parser = self.create_parser(prog_name, command) 429 options = vars(self.parser.parse_args(arguments)) 430 return options, options.pop(self.args_name, None) or [] 431 432 def create_parser(self, prog_name, command=None): 433 # for compatibility with optparse usage. 434 usage = self.usage(command).replace('%prog', '%(prog)s') 435 parser = self.Parser( 436 prog=prog_name, 437 usage=usage, 438 epilog=self._format_epilog(self.epilog), 439 formatter_class=argparse.RawDescriptionHelpFormatter, 440 description=self._format_description(self.description), 441 ) 442 self._add_version_argument(parser) 443 self.add_preload_arguments(parser) 444 self.add_arguments(parser) 445 self.add_compat_options(parser, self.get_options()) 446 self.add_compat_options(parser, self.app.user_options['preload']) 447 448 if self.supports_args: 449 # for backward compatibility with optparse, we automatically 450 # add arbitrary positional args. 451 parser.add_argument(self.args_name, nargs='*') 452 return self.prepare_parser(parser) 453 454 def _format_epilog(self, epilog): 455 if epilog: 456 return '\n{0}\n\n'.format(epilog) 457 return '' 458 459 def _format_description(self, description): 460 width = argparse.HelpFormatter('prog')._width 461 return text.ensure_newlines( 462 text.fill_paragraphs(text.dedent(description), width)) 463 464 def add_compat_options(self, parser, options): 465 _add_compat_options(parser, options) 466 467 def prepare_parser(self, parser): 468 docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc] 469 for doc in docs: 470 for long_opt, help in items(doc): 471 option = parser._option_string_actions[long_opt] 472 if option is not None: 473 option.help = ' '.join(help).format(default=option.default) 474 return parser 475 476 def setup_app_from_commandline(self, argv): 477 preload_options, remaining_options = self.parse_preload_options(argv) 478 quiet = preload_options.get('quiet') 479 if quiet is not None: 480 self.quiet = quiet 481 try: 482 self.no_color = preload_options['no_color'] 483 except KeyError: 484 pass 485 workdir = preload_options.get('workdir') 486 if workdir: 487 os.chdir(workdir) 488 app = (preload_options.get('app') or 489 os.environ.get('CELERY_APP') or 490 self.app) 491 preload_loader = preload_options.get('loader') 492 if preload_loader: 493 # Default app takes loader from this env (Issue #1066). 494 os.environ['CELERY_LOADER'] = preload_loader 495 loader = (preload_loader, 496 os.environ.get('CELERY_LOADER') or 497 'default') 498 broker = preload_options.get('broker', None) 499 if broker: 500 os.environ['CELERY_BROKER_URL'] = broker 501 result_backend = preload_options.get('result_backend', None) 502 if result_backend: 503 os.environ['CELERY_RESULT_BACKEND'] = result_backend 504 config = preload_options.get('config') 505 if config: 506 os.environ['CELERY_CONFIG_MODULE'] = config 507 if self.respects_app_option: 508 if app: 509 self.app = self.find_app(app) 510 elif self.app is None: 511 self.app = self.get_app(loader=loader) 512 if self.enable_config_from_cmdline: 513 remaining_options = self.process_cmdline_config(remaining_options) 514 else: 515 self.app = Celery(fixups=[]) 516 517 self._handle_user_preload_options(argv) 518 519 return remaining_options 520 521 def _handle_user_preload_options(self, argv): 522 user_preload = tuple(self.app.user_options['preload'] or ()) 523 if user_preload: 524 user_options, _ = self._parse_preload_options(argv, user_preload) 525 signals.user_preload_options.send( 526 sender=self, app=self.app, options=user_options, 527 ) 528 529 def find_app(self, app): 530 from celery.app.utils import find_app 531 return find_app(app, symbol_by_name=self.symbol_by_name) 532 533 def symbol_by_name(self, name, imp=imports.import_from_cwd): 534 return imports.symbol_by_name(name, imp=imp) 535 get_cls_by_name = symbol_by_name # XXX compat 536 537 def process_cmdline_config(self, argv): 538 try: 539 cargs_start = argv.index('--') 540 except ValueError: 541 return argv 542 argv, cargs = argv[:cargs_start], argv[cargs_start + 1:] 543 self.app.config_from_cmdline(cargs, namespace=self.namespace) 544 return argv 545 546 def parse_preload_options(self, args): 547 return self._parse_preload_options(args, [self.add_preload_arguments]) 548 549 def _parse_preload_options(self, args, options): 550 args = [arg for arg in args if arg not in ('-h', '--help')] 551 parser = self.Parser() 552 self.add_compat_options(parser, options) 553 namespace, unknown_args = parser.parse_known_args(args) 554 return vars(namespace), unknown_args 555 556 def add_append_opt(self, acc, opt, value): 557 default = opt.default or [] 558 559 if opt.dest not in acc: 560 acc[opt.dest] = default 561 562 acc[opt.dest].append(value) 563 564 def parse_doc(self, doc): 565 options, in_option = defaultdict(list), None 566 for line in doc.splitlines(): 567 if line.startswith('.. cmdoption::'): 568 m = find_long_opt.match(line) 569 if m: 570 in_option = m.groups()[0].strip() 571 assert in_option, 'missing long opt' 572 elif in_option and line.startswith(' ' * 4): 573 if not find_rst_decl.match(line): 574 options[in_option].append( 575 find_rst_ref.sub( 576 r'\1', line.strip()).replace('`', '')) 577 return options 578 579 def _strip_restructeredtext(self, s): 580 return '\n'.join( 581 find_rst_ref.sub(r'\1', line.replace('`', '')) 582 for line in (s or '').splitlines() 583 if not find_rst_decl.match(line) 584 ) 585 586 def with_pool_option(self, argv): 587 """Return tuple of ``(short_opts, long_opts)``. 588 589 Returns only if the command 590 supports a pool argument, and used to monkey patch eventlet/gevent 591 environments as early as possible. 592 593 Example: 594 >>> has_pool_option = (['-P'], ['--pool']) 595 """ 596 597 def node_format(self, s, nodename, **extra): 598 return node_format(s, nodename, **extra) 599 600 def host_format(self, s, **extra): 601 return host_format(s, **extra) 602 603 def _get_default_app(self, *args, **kwargs): 604 from celery._state import get_current_app 605 return get_current_app() # omit proxy 606 607 def pretty_list(self, n): 608 c = self.colored 609 if not n: 610 return '- empty -' 611 return '\n'.join( 612 str(c.reset(c.white('*'), ' {0}'.format(item))) for item in n 613 ) 614 615 def pretty_dict_ok_error(self, n): 616 c = self.colored 617 try: 618 return (c.green('OK'), 619 text.indent(self.pretty(n['ok'])[1], 4)) 620 except KeyError: 621 pass 622 return (c.red('ERROR'), 623 text.indent(self.pretty(n['error'])[1], 4)) 624 625 def say_remote_command_reply(self, replies): 626 c = self.colored 627 node = next(iter(replies)) # <-- take first. 628 reply = replies[node] 629 status, preply = self.pretty(reply) 630 self.say_chat('->', c.cyan(node, ': ') + status, 631 text.indent(preply, 4) if self.show_reply else '') 632 633 def pretty(self, n): 634 OK = str(self.colored.green('OK')) 635 if isinstance(n, list): 636 return OK, self.pretty_list(n) 637 if isinstance(n, dict): 638 if 'ok' in n or 'error' in n: 639 return self.pretty_dict_ok_error(n) 640 else: 641 return OK, json.dumps(n, sort_keys=True, indent=4) 642 if isinstance(n, string_t): 643 return OK, string(n) 644 return OK, pformat(n) 645 646 def say_chat(self, direction, title, body=''): 647 c = self.colored 648 if direction == '<-' and self.quiet: 649 return 650 dirstr = not self.quiet and c.bold(c.white(direction), ' ') or '' 651 self.out(c.reset(dirstr, title)) 652 if body and self.show_body: 653 self.out(body) 654 655 @property 656 def colored(self): 657 if self._colored is None: 658 self._colored = term.colored( 659 enabled=isatty(self.stdout) and not self.no_color) 660 return self._colored 661 662 @colored.setter 663 def colored(self, obj): 664 self._colored = obj 665 666 @property 667 def no_color(self): 668 return self._no_color 669 670 @no_color.setter 671 def no_color(self, value): 672 self._no_color = value 673 if self._colored is not None: 674 self._colored.enabled = not self._no_color 675 676 677def daemon_options(parser, default_pidfile=None, default_logfile=None): 678 """Add daemon options to argparse parser.""" 679 group = parser.add_argument_group('Daemonization Options') 680 group.add_argument('-f', '--logfile', default=default_logfile), 681 group.add_argument('--pidfile', default=default_pidfile), 682 group.add_argument('--uid', default=None), 683 group.add_argument('--gid', default=None), 684 group.add_argument('--umask', default=None), 685 group.add_argument('--executable', default=None), 686