1# -*- coding: utf-8 -*- 2""" 3RQ command line tool 4""" 5from __future__ import (absolute_import, division, print_function, 6 unicode_literals) 7 8from functools import update_wrapper 9import os 10import sys 11 12import click 13from redis.exceptions import ConnectionError 14 15from rq import Connection, __version__ as version 16from rq.cli.helpers import (read_config_file, refresh, 17 setup_loghandlers_from_args, 18 show_both, show_queues, show_workers, CliConfig) 19from rq.contrib.legacy import cleanup_ghosts 20from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, 21 DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, 22 DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, 23 DEFAULT_JOB_MONITORING_INTERVAL, 24 DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) 25from rq.exceptions import InvalidJobOperationError 26from rq.registry import FailedJobRegistry, clean_registries 27from rq.utils import import_attribute 28from rq.suspension import (suspend as connection_suspend, 29 resume as connection_resume, is_suspended) 30from rq.worker_registration import clean_worker_registry 31 32 33# Disable the warning that Click displays (as of Click version 5.0) when users 34# use unicode_literals in Python 2. 35# See http://click.pocoo.org/dev/python3/#unicode-literals for more details. 36click.disable_unicode_literals_warning = True 37 38 39shared_options = [ 40 click.option('--url', '-u', 41 envvar='RQ_REDIS_URL', 42 help='URL describing Redis connection details.'), 43 click.option('--config', '-c', 44 envvar='RQ_CONFIG', 45 help='Module containing RQ settings.'), 46 click.option('--worker-class', '-w', 47 envvar='RQ_WORKER_CLASS', 48 default=DEFAULT_WORKER_CLASS, 49 help='RQ Worker class to use'), 50 click.option('--job-class', '-j', 51 envvar='RQ_JOB_CLASS', 52 default=DEFAULT_JOB_CLASS, 53 help='RQ Job class to use'), 54 click.option('--queue-class', 55 envvar='RQ_QUEUE_CLASS', 56 default=DEFAULT_QUEUE_CLASS, 57 help='RQ Queue class to use'), 58 click.option('--connection-class', 59 envvar='RQ_CONNECTION_CLASS', 60 default=DEFAULT_CONNECTION_CLASS, 61 help='Redis client class to use'), 62 click.option('--path', '-P', 63 default='.', 64 help='Specify the import path.', 65 multiple=True) 66] 67 68 69def pass_cli_config(func): 70 # add all the shared options to the command 71 for option in shared_options: 72 func = option(func) 73 74 # pass the cli config object into the command 75 def wrapper(*args, **kwargs): 76 ctx = click.get_current_context() 77 cli_config = CliConfig(**kwargs) 78 return ctx.invoke(func, cli_config, *args[1:], **kwargs) 79 80 return update_wrapper(wrapper, func) 81 82 83@click.group() 84@click.version_option(version) 85def main(): 86 """RQ command line tool.""" 87 pass 88 89 90@main.command() 91@click.option('--all', '-a', is_flag=True, help='Empty all queues') 92@click.argument('queues', nargs=-1) 93@pass_cli_config 94def empty(cli_config, all, queues, **options): 95 """Empty given queues.""" 96 97 if all: 98 queues = cli_config.queue_class.all(connection=cli_config.connection, 99 job_class=cli_config.job_class) 100 else: 101 queues = [cli_config.queue_class(queue, 102 connection=cli_config.connection, 103 job_class=cli_config.job_class) 104 for queue in queues] 105 106 if not queues: 107 click.echo('Nothing to do') 108 sys.exit(0) 109 110 for queue in queues: 111 num_jobs = queue.empty() 112 click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name)) 113 114 115@main.command() 116@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') 117@click.option('--queue', required=True, type=str) 118@click.argument('job_ids', nargs=-1) 119@pass_cli_config 120def requeue(cli_config, queue, all, job_class, job_ids, **options): 121 """Requeue failed jobs.""" 122 123 failed_job_registry = FailedJobRegistry(queue, 124 connection=cli_config.connection) 125 if all: 126 job_ids = failed_job_registry.get_job_ids() 127 128 if not job_ids: 129 click.echo('Nothing to do') 130 sys.exit(0) 131 132 click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids))) 133 fail_count = 0 134 with click.progressbar(job_ids) as job_ids: 135 for job_id in job_ids: 136 try: 137 failed_job_registry.requeue(job_id) 138 except InvalidJobOperationError: 139 fail_count += 1 140 141 if fail_count > 0: 142 click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red') 143 144 145@main.command() 146@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)') 147@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') 148@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') 149@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') 150@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') 151@click.argument('queues', nargs=-1) 152@pass_cli_config 153def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, 154 **options): 155 """RQ command-line monitor.""" 156 157 if only_queues: 158 func = show_queues 159 elif only_workers: 160 func = show_workers 161 else: 162 func = show_both 163 164 try: 165 with Connection(cli_config.connection): 166 167 if queues: 168 qs = list(map(cli_config.queue_class, queues)) 169 else: 170 qs = cli_config.queue_class.all() 171 172 for queue in qs: 173 clean_registries(queue) 174 clean_worker_registry(queue) 175 176 refresh(interval, func, qs, raw, by_queue, 177 cli_config.queue_class, cli_config.worker_class) 178 except ConnectionError as e: 179 click.echo(e) 180 sys.exit(1) 181 except KeyboardInterrupt: 182 click.echo() 183 sys.exit(0) 184 185 186@main.command() 187@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') 188@click.option('--logging_level', type=str, default="INFO", help='Set logging level') 189@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs') 190@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs') 191@click.option('--name', '-n', help='Specify a different name') 192@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') 193@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') 194@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used') 195@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.') 196@click.option('--verbose', '-v', is_flag=True, help='Show more output') 197@click.option('--quiet', '-q', is_flag=True, help='Show less output') 198@click.option('--sentry-dsn', envvar='RQ_SENTRY_DSN', help='Report exceptions to this Sentry DSN') 199@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) 200@click.option('--pid', help='Write the process ID number to a file at the specified path') 201@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') 202@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') 203@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') 204@click.argument('queues', nargs=-1) 205@pass_cli_config 206def worker(cli_config, burst, logging_level, name, results_ttl, 207 worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn, 208 exception_handler, pid, disable_default_exception_handler, max_jobs, with_scheduler, 209 queues, log_format, date_format, **options): 210 """Starts an RQ worker.""" 211 settings = read_config_file(cli_config.config) if cli_config.config else {} 212 # Worker specific default arguments 213 queues = queues or settings.get('QUEUES', ['default']) 214 sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN') 215 name = name or settings.get('NAME') 216 217 if pid: 218 with open(os.path.expanduser(pid), "w") as fp: 219 fp.write(str(os.getpid())) 220 221 setup_loghandlers_from_args(verbose, quiet, date_format, log_format) 222 223 try: 224 225 cleanup_ghosts(cli_config.connection) 226 exception_handlers = [] 227 for h in exception_handler: 228 exception_handlers.append(import_attribute(h)) 229 230 if is_suspended(cli_config.connection): 231 click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') 232 sys.exit(1) 233 234 queues = [cli_config.queue_class(queue, 235 connection=cli_config.connection, 236 job_class=cli_config.job_class) 237 for queue in queues] 238 worker = cli_config.worker_class( 239 queues, name=name, connection=cli_config.connection, 240 default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, 241 job_monitoring_interval=job_monitoring_interval, 242 job_class=cli_config.job_class, queue_class=cli_config.queue_class, 243 exception_handlers=exception_handlers or None, 244 disable_default_exception_handler=disable_default_exception_handler, 245 log_job_description=not disable_job_desc_logging 246 ) 247 248 # Should we configure Sentry? 249 if sentry_dsn: 250 from rq.contrib.sentry import register_sentry 251 register_sentry(sentry_dsn) 252 253 # if --verbose or --quiet, override --logging_level 254 if verbose or quiet: 255 logging_level = None 256 257 worker.work(burst=burst, logging_level=logging_level, 258 date_format=date_format, log_format=log_format, 259 max_jobs=max_jobs, with_scheduler=with_scheduler) 260 except ConnectionError as e: 261 print(e) 262 sys.exit(1) 263 264 265@main.command() 266@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int) 267@pass_cli_config 268def suspend(cli_config, duration, **options): 269 """Suspends all workers, to resume run `rq resume`""" 270 271 if duration is not None and duration < 1: 272 click.echo("Duration must be an integer greater than 1") 273 sys.exit(1) 274 275 connection_suspend(cli_config.connection, duration) 276 277 if duration: 278 msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will 279 automatically resume""".format(duration) 280 click.echo(msg) 281 else: 282 click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed") 283 284 285@main.command() 286@pass_cli_config 287def resume(cli_config, **options): 288 """Resumes processing of queues, that were suspended with `rq suspend`""" 289 connection_resume(cli_config.connection) 290 click.echo("Resuming workers.") 291