1# -*- coding: utf-8 -*-
2"""
3RQ command line tool
4"""
5from __future__ import (absolute_import, division, print_function,
6                        unicode_literals)
7
8from functools import update_wrapper
9import os
10import sys
11
12import click
13from redis.exceptions import ConnectionError
14
15from rq import Connection, __version__ as version
16from rq.cli.helpers import (read_config_file, refresh,
17                            setup_loghandlers_from_args,
18                            show_both, show_queues, show_workers, CliConfig)
19from rq.contrib.legacy import cleanup_ghosts
20from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
21                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
22                         DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL,
23                         DEFAULT_JOB_MONITORING_INTERVAL,
24                         DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
25from rq.exceptions import InvalidJobOperationError
26from rq.registry import FailedJobRegistry, clean_registries
27from rq.utils import import_attribute
28from rq.suspension import (suspend as connection_suspend,
29                           resume as connection_resume, is_suspended)
30from rq.worker_registration import clean_worker_registry
31
32
33# Disable the warning that Click displays (as of Click version 5.0) when users
34# use unicode_literals in Python 2.
35# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
36click.disable_unicode_literals_warning = True
37
38
39shared_options = [
40    click.option('--url', '-u',
41                 envvar='RQ_REDIS_URL',
42                 help='URL describing Redis connection details.'),
43    click.option('--config', '-c',
44                 envvar='RQ_CONFIG',
45                 help='Module containing RQ settings.'),
46    click.option('--worker-class', '-w',
47                 envvar='RQ_WORKER_CLASS',
48                 default=DEFAULT_WORKER_CLASS,
49                 help='RQ Worker class to use'),
50    click.option('--job-class', '-j',
51                 envvar='RQ_JOB_CLASS',
52                 default=DEFAULT_JOB_CLASS,
53                 help='RQ Job class to use'),
54    click.option('--queue-class',
55                 envvar='RQ_QUEUE_CLASS',
56                 default=DEFAULT_QUEUE_CLASS,
57                 help='RQ Queue class to use'),
58    click.option('--connection-class',
59                 envvar='RQ_CONNECTION_CLASS',
60                 default=DEFAULT_CONNECTION_CLASS,
61                 help='Redis client class to use'),
62    click.option('--path', '-P',
63                 default='.',
64                 help='Specify the import path.',
65                 multiple=True)
66]
67
68
69def pass_cli_config(func):
70    # add all the shared options to the command
71    for option in shared_options:
72        func = option(func)
73
74    # pass the cli config object into the command
75    def wrapper(*args, **kwargs):
76        ctx = click.get_current_context()
77        cli_config = CliConfig(**kwargs)
78        return ctx.invoke(func, cli_config, *args[1:], **kwargs)
79
80    return update_wrapper(wrapper, func)
81
82
83@click.group()
84@click.version_option(version)
85def main():
86    """RQ command line tool."""
87    pass
88
89
90@main.command()
91@click.option('--all', '-a', is_flag=True, help='Empty all queues')
92@click.argument('queues', nargs=-1)
93@pass_cli_config
94def empty(cli_config, all, queues, **options):
95    """Empty given queues."""
96
97    if all:
98        queues = cli_config.queue_class.all(connection=cli_config.connection,
99                                            job_class=cli_config.job_class)
100    else:
101        queues = [cli_config.queue_class(queue,
102                                         connection=cli_config.connection,
103                                         job_class=cli_config.job_class)
104                  for queue in queues]
105
106    if not queues:
107        click.echo('Nothing to do')
108        sys.exit(0)
109
110    for queue in queues:
111        num_jobs = queue.empty()
112        click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))
113
114
115@main.command()
116@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
117@click.option('--queue', required=True, type=str)
118@click.argument('job_ids', nargs=-1)
119@pass_cli_config
120def requeue(cli_config, queue, all, job_class, job_ids,  **options):
121    """Requeue failed jobs."""
122
123    failed_job_registry = FailedJobRegistry(queue,
124                                            connection=cli_config.connection)
125    if all:
126        job_ids = failed_job_registry.get_job_ids()
127
128    if not job_ids:
129        click.echo('Nothing to do')
130        sys.exit(0)
131
132    click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids)))
133    fail_count = 0
134    with click.progressbar(job_ids) as job_ids:
135        for job_id in job_ids:
136            try:
137                failed_job_registry.requeue(job_id)
138            except InvalidJobOperationError:
139                fail_count += 1
140
141    if fail_count > 0:
142        click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red')
143
144
145@main.command()
146@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
147@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
148@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
149@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
150@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
151@click.argument('queues', nargs=-1)
152@pass_cli_config
153def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
154         **options):
155    """RQ command-line monitor."""
156
157    if only_queues:
158        func = show_queues
159    elif only_workers:
160        func = show_workers
161    else:
162        func = show_both
163
164    try:
165        with Connection(cli_config.connection):
166
167            if queues:
168                qs = list(map(cli_config.queue_class, queues))
169            else:
170                qs = cli_config.queue_class.all()
171
172            for queue in qs:
173                clean_registries(queue)
174                clean_worker_registry(queue)
175
176            refresh(interval, func, qs, raw, by_queue,
177                    cli_config.queue_class, cli_config.worker_class)
178    except ConnectionError as e:
179        click.echo(e)
180        sys.exit(1)
181    except KeyboardInterrupt:
182        click.echo()
183        sys.exit(0)
184
185
186@main.command()
187@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
188@click.option('--logging_level', type=str, default="INFO", help='Set logging level')
189@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
190@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
191@click.option('--name', '-n', help='Specify a different name')
192@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
193@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
194@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used')
195@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
196@click.option('--verbose', '-v', is_flag=True, help='Show more output')
197@click.option('--quiet', '-q', is_flag=True, help='Show less output')
198@click.option('--sentry-dsn', envvar='RQ_SENTRY_DSN', help='Report exceptions to this Sentry DSN')
199@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
200@click.option('--pid', help='Write the process ID number to a file at the specified path')
201@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
202@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
203@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
204@click.argument('queues', nargs=-1)
205@pass_cli_config
206def worker(cli_config, burst, logging_level, name, results_ttl,
207           worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn,
208           exception_handler, pid, disable_default_exception_handler, max_jobs, with_scheduler,
209           queues, log_format, date_format, **options):
210    """Starts an RQ worker."""
211    settings = read_config_file(cli_config.config) if cli_config.config else {}
212    # Worker specific default arguments
213    queues = queues or settings.get('QUEUES', ['default'])
214    sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
215    name = name or settings.get('NAME')
216
217    if pid:
218        with open(os.path.expanduser(pid), "w") as fp:
219            fp.write(str(os.getpid()))
220
221    setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
222
223    try:
224
225        cleanup_ghosts(cli_config.connection)
226        exception_handlers = []
227        for h in exception_handler:
228            exception_handlers.append(import_attribute(h))
229
230        if is_suspended(cli_config.connection):
231            click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
232            sys.exit(1)
233
234        queues = [cli_config.queue_class(queue,
235                                         connection=cli_config.connection,
236                                         job_class=cli_config.job_class)
237                  for queue in queues]
238        worker = cli_config.worker_class(
239            queues, name=name, connection=cli_config.connection,
240            default_worker_ttl=worker_ttl, default_result_ttl=results_ttl,
241            job_monitoring_interval=job_monitoring_interval,
242            job_class=cli_config.job_class, queue_class=cli_config.queue_class,
243            exception_handlers=exception_handlers or None,
244            disable_default_exception_handler=disable_default_exception_handler,
245            log_job_description=not disable_job_desc_logging
246        )
247
248        # Should we configure Sentry?
249        if sentry_dsn:
250            from rq.contrib.sentry import register_sentry
251            register_sentry(sentry_dsn)
252
253        # if --verbose or --quiet, override --logging_level
254        if verbose or quiet:
255            logging_level = None
256
257        worker.work(burst=burst, logging_level=logging_level,
258                    date_format=date_format, log_format=log_format,
259                    max_jobs=max_jobs, with_scheduler=with_scheduler)
260    except ConnectionError as e:
261        print(e)
262        sys.exit(1)
263
264
265@main.command()
266@click.option('--duration', help='Seconds you want the workers to be suspended.  Default is forever.', type=int)
267@pass_cli_config
268def suspend(cli_config, duration, **options):
269    """Suspends all workers, to resume run `rq resume`"""
270
271    if duration is not None and duration < 1:
272        click.echo("Duration must be an integer greater than 1")
273        sys.exit(1)
274
275    connection_suspend(cli_config.connection, duration)
276
277    if duration:
278        msg = """Suspending workers for {0} seconds.  No new jobs will be started during that time, but then will
279        automatically resume""".format(duration)
280        click.echo(msg)
281    else:
282        click.echo("Suspending workers.  No new jobs will be started.  But current jobs will be completed")
283
284
285@main.command()
286@pass_cli_config
287def resume(cli_config, **options):
288    """Resumes processing of queues, that were suspended with `rq suspend`"""
289    connection_resume(cli_config.connection)
290    click.echo("Resuming workers.")
291