1# -*- coding: utf-8 -*-
2"""Base command-line interface."""
3from __future__ import absolute_import, print_function, unicode_literals
4
5import argparse
6import json
7import os
8import random
9import re
10import sys
11import warnings
12from collections import defaultdict
13from heapq import heappush
14from pprint import pformat
15
16from celery import VERSION_BANNER, Celery, maybe_patch_concurrency, signals
17from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
18from celery.five import (PY2, getfullargspec, items, long_t,
19                         python_2_unicode_compatible, string, string_t,
20                         text_t)
21from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE, isatty
22from celery.utils import imports, term, text
23from celery.utils.functional import dictfilter
24from celery.utils.nodenames import host_format, node_format
25from celery.utils.objects import Bunch
26
# Option is here for backwards compatibility, as third-party commands
# may import it from here.
try:
    from optparse import Option  # pylint: disable=deprecated-module
except ImportError:  # pragma: no cover
    Option = None  # noqa

# Bind ``input`` to ``raw_input`` where that name exists (Python 2);
# otherwise the NameError is swallowed and the builtin ``input`` is kept.
try:
    input = raw_input
except NameError:  # pragma: no cover
    pass

__all__ = (
    'Error', 'UsageError', 'Extensions', 'Command', 'Option', 'daemon_options',
)

# always enable DeprecationWarnings, so our users can see them.
for warning in (CDeprecationWarning, CPendingDeprecationWarning):
    warnings.simplefilter('once', warning, 0)

# TODO: Remove this once we drop support for Python < 3.6
# (ModuleNotFoundError is aliased to ImportError on older interpreters so
# that ``except ModuleNotFoundError`` below works everywhere).
if sys.version_info < (3, 6):
    ModuleNotFoundError = ImportError

# Message template used by Command.check_args; {0} is the comma-separated
# list of unexpected positional arguments.
ARGV_DISABLED = """
Unrecognized command-line arguments: {0}

Try --help?
"""

# Message template used when importing the app module fails; {0} is the
# name of the module that could not be found.
UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND = """
Unable to load celery application.
The module {0} was not found.
"""

# Message template used when loading the app raises AttributeError; {0} is
# the capitalized exception message.
UNABLE_TO_LOAD_APP_APP_MISSING = """
Unable to load celery application.
{0}
"""

# Captures the first ``--long-option`` token on a line (group 1).
find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
# Matches an inline reST role such as :role:`target`; group 1 is the target.
find_rst_ref = re.compile(r':\w+:`(.+?)`')
# Matches a reST directive line of the form ``.. name:: argument``.
find_rst_decl = re.compile(r'^\s*\.\. .+?::.+$')
70
71
def _optparse_callback_to_type(option, callback):
    """Wrap an optparse-style callback as an argparse ``type`` callable.

    The returned function invokes ``callback(option, None, value, parser)``
    with a stand-in parser object and then returns whatever the callback
    stored under ``option.dest`` on that stand-in's ``values``.
    """
    # Minimal stand-in exposing only the ``values`` attribute.
    stub_parser = Bunch(values=Bunch())

    def _on_arg(value):
        callback(option, None, value, stub_parser)
        stored = getattr(stub_parser.values, option.dest)
        return stored

    return _on_arg
79
80
def _add_optparse_argument(parser, opt, typemap=None):
    """Register a legacy optparse ``Option`` on an argparse parser.

    Arguments:
        parser: Object providing ``add_argument`` (an argparse parser
            or argument group).
        opt: The optparse option to translate.  Note that ``opt`` is
            modified in place (``type``, ``action`` and ``default`` may
            be rewritten).
        typemap (Mapping): Optional mapping from optparse type names to
            callables; a default mapping is used when empty or omitted.
    """
    if not typemap:
        typemap = {
            'string': text_t,
            'int': int,
            'long': long_t,
            'float': float,
            'complex': complex,
            'choice': None,
        }
    if opt.callback:
        # The converter must wrap the option's *callback*; ``opt.type`` is
        # only a type name and is not callable.
        opt.type = _optparse_callback_to_type(opt, opt.callback)
    # argparse checks for existence of this kwarg
    if opt.action == 'callback':
        opt.action = None
    # store_true sets value to "('NO', 'DEFAULT')" for some
    # crazy reason, so not to set a sane default here.
    if opt.action == 'store_true' and opt.default is None:
        opt.default = False
    option_strings = opt._long_opts + opt._short_opts
    # dictfilter drops the entries that should not be passed on, so
    # argparse falls back to its own defaults for them.
    kwargs = dictfilter({
        'action': opt.action,
        'type': typemap.get(opt.type, opt.type),
        'dest': opt.dest,
        'nargs': opt.nargs,
        'choices': opt.choices,
        'help': opt.help,
        'metavar': opt.metavar,
        'default': opt.default})
    parser.add_argument(*option_strings, **kwargs)
109
110
111def _add_compat_options(parser, options):
112    for option in options or ():
113        if callable(option):
114            option(parser)
115        else:
116            _add_optparse_argument(parser, option)
117
118
@python_2_unicode_compatible
class Error(Exception):
    """Failure raised by a command; carries the exit status to report."""

    # Exit status used when none is passed to the constructor.
    status = EX_FAILURE

    def __init__(self, reason, status=None):
        super(Error, self).__init__(reason, status)
        self.reason = reason
        # Fall back to the class-level status when none was given.
        effective_status = self.status if status is None else status
        self.status = effective_status

    def __str__(self):
        return self.reason
132
133
class UsageError(Error):
    """Error signalling that the command was invoked with bad arguments."""

    # Exit status reported for usage errors.
    status = EX_USAGE
138
139
class Extensions(object):
    """Registry of command extensions found under an entry-point namespace.

    Arguments:
        namespace (str): Namespace handed to the extension loader.
        register (Callable): Called as ``register(cls, name=name)`` for
            every extension added.
    """

    def __init__(self, namespace, register):
        self.namespace = namespace
        self.register = register
        # Names are kept as a heap (see :meth:`add`).
        self.names = []

    def add(self, cls, name):
        """Record ``name`` and register ``cls`` under it."""
        heappush(self.names, name)
        self.register(cls, name=name)

    def load(self):
        """Add every extension class in the namespace; return the names."""
        discovered = imports.load_extension_classes(self.namespace)
        for ext_name, ext_cls in discovered:
            self.add(ext_cls, ext_name)
        return self.names
156
157
class Command(object):
    """Base class for command-line applications.

    Arguments:
        app (Celery): The app to use.
        get_app (Callable): Function returning the current app
            when no app provided.
    """

    # Exception classes exposed on the command so handlers can refer to
    # them as ``self.Error`` / ``self.UsageError``.
    Error = Error
    UsageError = UsageError
    # Parser class instantiated by create_parser/_parse_preload_options.
    Parser = argparse.ArgumentParser

    #: Arg list used in help.
    args = ''

    #: Application version.
    version = VERSION_BANNER

    #: If false the parser will raise an exception if positional
    #: args are provided.
    supports_args = True

    #: List of options (without preload options).
    option_list = None

    # module Rst documentation to parse help from (if any)
    doc = None

    # Some programs (multi) do not want to load the app specified
    # (Issue #1008).
    respects_app_option = True

    #: Enable if the application should support config from the cmdline.
    enable_config_from_cmdline = False

    #: Default configuration name-space.
    namespace = None

    #: Text to print at end of --help
    epilog = None

    #: Text to print in --help before option list.
    description = ''

    #: Set to true if this command doesn't have sub-commands
    leaf = True

    # checked by :meth:`say_chat` before printing the message body.
    show_body = True
    # checked by :meth:`say_remote_command_reply` before passing the
    # formatted reply on as the body.
    show_reply = True

    # Default program name; replaced by execute_from_commandline with the
    # basename of argv[0].
    prog_name = 'celery'

    #: Name of argparse option used for parsing positional args.
    args_name = 'args'
215
216    def __init__(self, app=None, get_app=None, no_color=False,
217                 stdout=None, stderr=None, quiet=False, on_error=None,
218                 on_usage_error=None):
219        self.app = app
220        self.get_app = get_app or self._get_default_app
221        self.stdout = stdout or sys.stdout
222        self.stderr = stderr or sys.stderr
223        self._colored = None
224        self._no_color = no_color
225        self.quiet = quiet
226        if not self.description:
227            self.description = self._strip_restructeredtext(self.__doc__)
228        if on_error:
229            self.on_error = on_error
230        if on_usage_error:
231            self.on_usage_error = on_usage_error
232
233    def run(self, *args, **options):
234        raise NotImplementedError('subclass responsibility')
235
236    def on_error(self, exc):
237        # pylint: disable=method-hidden
238        #   on_error argument to __init__ may override this method.
239        self.error(self.colored.red('Error: {0}'.format(exc)))
240
241    def on_usage_error(self, exc):
242        # pylint: disable=method-hidden
243        #   on_usage_error argument to __init__ may override this method.
244        self.handle_error(exc)
245
246    def on_concurrency_setup(self):
247        pass
248
249    def __call__(self, *args, **kwargs):
250        random.seed()  # maybe we were forked.
251        self.verify_args(args)
252        try:
253            ret = self.run(*args, **kwargs)
254            return ret if ret is not None else EX_OK
255        except self.UsageError as exc:
256            self.on_usage_error(exc)
257            return exc.status
258        except self.Error as exc:
259            self.on_error(exc)
260            return exc.status
261
262    def verify_args(self, given, _index=0):
263        S = getfullargspec(self.run)
264        _index = 1 if S.args and S.args[0] == 'self' else _index
265        required = S.args[_index:-len(S.defaults) if S.defaults else None]
266        missing = required[len(given):]
267        if missing:
268            raise self.UsageError('Missing required {0}: {1}'.format(
269                text.pluralize(len(missing), 'argument'),
270                ', '.join(missing)
271            ))
272
273    def execute_from_commandline(self, argv=None):
274        """Execute application from command-line.
275
276        Arguments:
277            argv (List[str]): The list of command-line arguments.
278                Defaults to ``sys.argv``.
279        """
280        if argv is None:
281            argv = list(sys.argv)
282        # Should we load any special concurrency environment?
283        self.maybe_patch_concurrency(argv)
284        self.on_concurrency_setup()
285
286        # Dump version and exit if '--version' arg set.
287        self.early_version(argv)
288        try:
289            argv = self.setup_app_from_commandline(argv)
290        except ModuleNotFoundError as e:
291            # In Python 2.7 and below, there is no name instance for exceptions
292            # TODO: Remove this once we drop support for Python 2.7
293            if PY2:
294                package_name = e.message.replace("No module named ", "")
295            else:
296                package_name = e.name
297            self.on_error(UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND.format(package_name))
298            return EX_FAILURE
299        except AttributeError as e:
300            msg = e.args[0].capitalize()
301            self.on_error(UNABLE_TO_LOAD_APP_APP_MISSING.format(msg))
302            return EX_FAILURE
303
304        self.prog_name = os.path.basename(argv[0])
305        return self.handle_argv(self.prog_name, argv[1:])
306
307    def run_from_argv(self, prog_name, argv=None, command=None):
308        return self.handle_argv(prog_name,
309                                sys.argv if argv is None else argv, command)
310
311    def maybe_patch_concurrency(self, argv=None):
312        argv = argv or sys.argv
313        pool_option = self.with_pool_option(argv)
314        if pool_option:
315            maybe_patch_concurrency(argv, *pool_option)
316
317    def usage(self, command):
318        return '%(prog)s {0} [options] {self.args}'.format(command, self=self)
319
320    def add_arguments(self, parser):
321        pass
322
323    def get_options(self):
324        # This is for optparse options, please use add_arguments.
325        return self.option_list
326
327    def add_preload_arguments(self, parser):
328        group = parser.add_argument_group('Global Options')
329        group.add_argument('-A', '--app', default=None)
330        group.add_argument('-b', '--broker', default=None)
331        group.add_argument('--result-backend', default=None)
332        group.add_argument('--loader', default=None)
333        group.add_argument('--config', default=None)
334        group.add_argument('--workdir', default=None)
335        group.add_argument(
336            '--no-color', '-C', action='store_true', default=None)
337        group.add_argument('--quiet', '-q', action='store_true')
338
339    def _add_version_argument(self, parser):
340        parser.add_argument(
341            '--version', action='version', version=self.version,
342        )
343
344    def prepare_arguments(self, parser):
345        pass
346
347    def expanduser(self, value):
348        if isinstance(value, string_t):
349            return os.path.expanduser(value)
350        return value
351
352    def ask(self, q, choices, default=None):
353        """Prompt user to choose from a tuple of string values.
354
355        If a default is not specified the question will be repeated
356        until the user gives a valid choice.
357
358        Matching is case insensitive.
359
360        Arguments:
361            q (str): the question to ask (don't include questionark)
362            choice (Tuple[str]): tuple of possible choices, must be lowercase.
363            default (Any): Default value if any.
364        """
365        schoices = choices
366        if default is not None:
367            schoices = [c.upper() if c == default else c.lower()
368                        for c in choices]
369        schoices = '/'.join(schoices)
370
371        p = '{0} ({1})? '.format(q.capitalize(), schoices)
372        while 1:
373            val = input(p).lower()
374            if val in choices:
375                return val
376            elif default is not None:
377                break
378        return default
379
380    def handle_argv(self, prog_name, argv, command=None):
381        """Parse arguments from argv and dispatch to :meth:`run`.
382
383        Warning:
384            Exits with an error message if :attr:`supports_args` is disabled
385            and ``argv`` contains positional arguments.
386
387        Arguments:
388            prog_name (str): The program name (``argv[0]``).
389            argv (List[str]): Rest of command-line arguments.
390        """
391        options, args = self.prepare_args(
392            *self.parse_options(prog_name, argv, command))
393        return self(*args, **options)
394
395    def prepare_args(self, options, args):
396        if options:
397            options = {
398                k: self.expanduser(v)
399                for k, v in items(options) if not k.startswith('_')
400            }
401        args = [self.expanduser(arg) for arg in args]
402        self.check_args(args)
403        return options, args
404
405    def check_args(self, args):
406        if not self.supports_args and args:
407            self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE)
408
409    def error(self, s):
410        self.out(s, fh=self.stderr)
411
412    def out(self, s, fh=None):
413        print(s, file=fh or self.stdout)
414
415    def die(self, msg, status=EX_FAILURE):
416        self.error(msg)
417        sys.exit(status)
418
419    def early_version(self, argv):
420        if '--version' in argv:
421            print(self.version, file=self.stdout)
422            sys.exit(0)
423
424    def parse_options(self, prog_name, arguments, command=None):
425        """Parse the available options."""
426        # Don't want to load configuration to just print the version,
427        # so we handle --version manually here.
428        self.parser = self.create_parser(prog_name, command)
429        options = vars(self.parser.parse_args(arguments))
430        return options, options.pop(self.args_name, None) or []
431
432    def create_parser(self, prog_name, command=None):
433        # for compatibility with optparse usage.
434        usage = self.usage(command).replace('%prog', '%(prog)s')
435        parser = self.Parser(
436            prog=prog_name,
437            usage=usage,
438            epilog=self._format_epilog(self.epilog),
439            formatter_class=argparse.RawDescriptionHelpFormatter,
440            description=self._format_description(self.description),
441        )
442        self._add_version_argument(parser)
443        self.add_preload_arguments(parser)
444        self.add_arguments(parser)
445        self.add_compat_options(parser, self.get_options())
446        self.add_compat_options(parser, self.app.user_options['preload'])
447
448        if self.supports_args:
449            # for backward compatibility with optparse, we automatically
450            # add arbitrary positional args.
451            parser.add_argument(self.args_name, nargs='*')
452        return self.prepare_parser(parser)
453
454    def _format_epilog(self, epilog):
455        if epilog:
456            return '\n{0}\n\n'.format(epilog)
457        return ''
458
459    def _format_description(self, description):
460        width = argparse.HelpFormatter('prog')._width
461        return text.ensure_newlines(
462            text.fill_paragraphs(text.dedent(description), width))
463
464    def add_compat_options(self, parser, options):
465        _add_compat_options(parser, options)
466
467    def prepare_parser(self, parser):
468        docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
469        for doc in docs:
470            for long_opt, help in items(doc):
471                option = parser._option_string_actions[long_opt]
472                if option is not None:
473                    option.help = ' '.join(help).format(default=option.default)
474        return parser
475
476    def setup_app_from_commandline(self, argv):
477        preload_options, remaining_options = self.parse_preload_options(argv)
478        quiet = preload_options.get('quiet')
479        if quiet is not None:
480            self.quiet = quiet
481        try:
482            self.no_color = preload_options['no_color']
483        except KeyError:
484            pass
485        workdir = preload_options.get('workdir')
486        if workdir:
487            os.chdir(workdir)
488        app = (preload_options.get('app') or
489               os.environ.get('CELERY_APP') or
490               self.app)
491        preload_loader = preload_options.get('loader')
492        if preload_loader:
493            # Default app takes loader from this env (Issue #1066).
494            os.environ['CELERY_LOADER'] = preload_loader
495        loader = (preload_loader,
496                  os.environ.get('CELERY_LOADER') or
497                  'default')
498        broker = preload_options.get('broker', None)
499        if broker:
500            os.environ['CELERY_BROKER_URL'] = broker
501        result_backend = preload_options.get('result_backend', None)
502        if result_backend:
503            os.environ['CELERY_RESULT_BACKEND'] = result_backend
504        config = preload_options.get('config')
505        if config:
506            os.environ['CELERY_CONFIG_MODULE'] = config
507        if self.respects_app_option:
508            if app:
509                self.app = self.find_app(app)
510            elif self.app is None:
511                self.app = self.get_app(loader=loader)
512            if self.enable_config_from_cmdline:
513                remaining_options = self.process_cmdline_config(remaining_options)
514        else:
515            self.app = Celery(fixups=[])
516
517        self._handle_user_preload_options(argv)
518
519        return remaining_options
520
521    def _handle_user_preload_options(self, argv):
522        user_preload = tuple(self.app.user_options['preload'] or ())
523        if user_preload:
524            user_options, _ = self._parse_preload_options(argv, user_preload)
525            signals.user_preload_options.send(
526                sender=self, app=self.app, options=user_options,
527            )
528
529    def find_app(self, app):
530        from celery.app.utils import find_app
531        return find_app(app, symbol_by_name=self.symbol_by_name)
532
533    def symbol_by_name(self, name, imp=imports.import_from_cwd):
534        return imports.symbol_by_name(name, imp=imp)
535    get_cls_by_name = symbol_by_name  # XXX compat
536
537    def process_cmdline_config(self, argv):
538        try:
539            cargs_start = argv.index('--')
540        except ValueError:
541            return argv
542        argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
543        self.app.config_from_cmdline(cargs, namespace=self.namespace)
544        return argv
545
546    def parse_preload_options(self, args):
547        return self._parse_preload_options(args, [self.add_preload_arguments])
548
549    def _parse_preload_options(self, args, options):
550        args = [arg for arg in args if arg not in ('-h', '--help')]
551        parser = self.Parser()
552        self.add_compat_options(parser, options)
553        namespace, unknown_args = parser.parse_known_args(args)
554        return vars(namespace), unknown_args
555
556    def add_append_opt(self, acc, opt, value):
557        default = opt.default or []
558
559        if opt.dest not in acc:
560            acc[opt.dest] = default
561
562        acc[opt.dest].append(value)
563
564    def parse_doc(self, doc):
565        options, in_option = defaultdict(list), None
566        for line in doc.splitlines():
567            if line.startswith('.. cmdoption::'):
568                m = find_long_opt.match(line)
569                if m:
570                    in_option = m.groups()[0].strip()
571                assert in_option, 'missing long opt'
572            elif in_option and line.startswith(' ' * 4):
573                if not find_rst_decl.match(line):
574                    options[in_option].append(
575                        find_rst_ref.sub(
576                            r'\1', line.strip()).replace('`', ''))
577        return options
578
579    def _strip_restructeredtext(self, s):
580        return '\n'.join(
581            find_rst_ref.sub(r'\1', line.replace('`', ''))
582            for line in (s or '').splitlines()
583            if not find_rst_decl.match(line)
584        )
585
586    def with_pool_option(self, argv):
587        """Return tuple of ``(short_opts, long_opts)``.
588
589        Returns only if the command
590        supports a pool argument, and used to monkey patch eventlet/gevent
591        environments as early as possible.
592
593        Example:
594              >>> has_pool_option = (['-P'], ['--pool'])
595        """
596
597    def node_format(self, s, nodename, **extra):
598        return node_format(s, nodename, **extra)
599
600    def host_format(self, s, **extra):
601        return host_format(s, **extra)
602
603    def _get_default_app(self, *args, **kwargs):
604        from celery._state import get_current_app
605        return get_current_app()  # omit proxy
606
607    def pretty_list(self, n):
608        c = self.colored
609        if not n:
610            return '- empty -'
611        return '\n'.join(
612            str(c.reset(c.white('*'), ' {0}'.format(item))) for item in n
613        )
614
615    def pretty_dict_ok_error(self, n):
616        c = self.colored
617        try:
618            return (c.green('OK'),
619                    text.indent(self.pretty(n['ok'])[1], 4))
620        except KeyError:
621            pass
622        return (c.red('ERROR'),
623                text.indent(self.pretty(n['error'])[1], 4))
624
625    def say_remote_command_reply(self, replies):
626        c = self.colored
627        node = next(iter(replies))  # <-- take first.
628        reply = replies[node]
629        status, preply = self.pretty(reply)
630        self.say_chat('->', c.cyan(node, ': ') + status,
631                      text.indent(preply, 4) if self.show_reply else '')
632
633    def pretty(self, n):
634        OK = str(self.colored.green('OK'))
635        if isinstance(n, list):
636            return OK, self.pretty_list(n)
637        if isinstance(n, dict):
638            if 'ok' in n or 'error' in n:
639                return self.pretty_dict_ok_error(n)
640            else:
641                return OK, json.dumps(n, sort_keys=True, indent=4)
642        if isinstance(n, string_t):
643            return OK, string(n)
644        return OK, pformat(n)
645
646    def say_chat(self, direction, title, body=''):
647        c = self.colored
648        if direction == '<-' and self.quiet:
649            return
650        dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
651        self.out(c.reset(dirstr, title))
652        if body and self.show_body:
653            self.out(body)
654
655    @property
656    def colored(self):
657        if self._colored is None:
658            self._colored = term.colored(
659                enabled=isatty(self.stdout) and not self.no_color)
660        return self._colored
661
662    @colored.setter
663    def colored(self, obj):
664        self._colored = obj
665
666    @property
667    def no_color(self):
668        return self._no_color
669
670    @no_color.setter
671    def no_color(self, value):
672        self._no_color = value
673        if self._colored is not None:
674            self._colored.enabled = not self._no_color
675
676
def daemon_options(parser, default_pidfile=None, default_logfile=None):
    """Add daemon options to argparse parser.

    Arguments:
        parser: The argparse parser to extend.
        default_pidfile: Default value for ``--pidfile``.
        default_logfile: Default value for ``-f``/``--logfile``.
    """
    group = parser.add_argument_group('Daemonization Options')
    # Plain statements: the stray trailing commas that used to follow each
    # call only built throwaway one-element tuples.
    group.add_argument('-f', '--logfile', default=default_logfile)
    group.add_argument('--pidfile', default=default_pidfile)
    group.add_argument('--uid', default=None)
    group.add_argument('--gid', default=None)
    group.add_argument('--umask', default=None)
    group.add_argument('--executable', default=None)
686