# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import uuid
import warnings

from datetime import datetime

from redis import WatchError

from .compat import as_text, string_types, total_ordering, utc
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError, UnpickleError
from .job import Job, JobStatus
from .utils import backend_class, import_attribute, parse_timeout, utcnow


def compact(lst):
    """Returns a new list with every `None` item of `lst` removed."""
    return [item for item in lst if item is not None]


@total_ordering
class Queue(object):
    """A named queue of job IDs stored in a Redis list.

    The list lives under the key `redis_queue_namespace_prefix + name`, and
    the keys of all queues that have had a job enqueued are tracked in the
    Redis set `redis_queues_keys`.

    Queues compare (and hash) by name only.
    """
    # Class used to create and fetch jobs; can be overridden per instance.
    job_class = Job
    DEFAULT_TIMEOUT = 180  # Default timeout seconds.
    # Prefix prepended to a queue's name to build its Redis key.
    redis_queue_namespace_prefix = 'rq:queue:'
    # Redis set holding the keys of all known queues.
    redis_queues_keys = 'rq:queues'

    @classmethod
    def all(cls, connection=None, job_class=None):
        """Returns a list of all Queues whose keys are registered in the
        `redis_queues_keys` set.
        """
        connection = resolve_connection(connection)

        def to_queue(queue_key):
            return cls.from_queue_key(as_text(queue_key),
                                      connection=connection,
                                      job_class=job_class)
        # Empty members are skipped; they cannot be valid queue keys.
        return [to_queue(rq_key)
                for rq_key in connection.smembers(cls.redis_queues_keys)
                if rq_key]

    @classmethod
    def from_queue_key(cls, queue_key, connection=None, job_class=None):
        """Returns a Queue instance, based on the naming conventions for naming
        the internal Redis keys.  Can be used to reverse-lookup Queues by their
        Redis keys.

        Raises ValueError if `queue_key` does not start with
        `redis_queue_namespace_prefix`.
        """
        prefix = cls.redis_queue_namespace_prefix
        if not queue_key.startswith(prefix):
            raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
        name = queue_key[len(prefix):]
        return cls(name, connection=connection, job_class=job_class)

    def __init__(self, name='default', default_timeout=None, connection=None,
                 is_async=True, job_class=None, **kwargs):
        """Creates a queue called `name`.

        `default_timeout` is run through `parse_timeout`; a falsy result
        falls back to `DEFAULT_TIMEOUT`.  `job_class` may be a class or a
        string that `import_attribute` can resolve.  The deprecated `async`
        keyword is still honoured (it overrides `is_async`) but emits a
        DeprecationWarning.
        """
        self.connection = resolve_connection(connection)
        prefix = self.redis_queue_namespace_prefix
        self.name = name
        self._key = '{0}{1}'.format(prefix, name)
        self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT
        self._is_async = is_async

        # `async` is a reserved word on newer Pythons, so it can only arrive
        # through **kwargs.
        if 'async' in kwargs:
            self._is_async = kwargs['async']
            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)

        # override class attribute job_class if one was passed
        if job_class is not None:
            if isinstance(job_class, string_types):
                job_class = import_attribute(job_class)
            self.job_class = job_class

    def __len__(self):
        """Returns the number of messages in the queue (see `count`)."""
        return self.count

    def __nonzero__(self):
        """Python 2 truthiness: a Queue is always truthy, even when empty."""
        return True

    def __bool__(self):
        """Python 3 truthiness: a Queue is always truthy, even when empty."""
        return True

    def __iter__(self):
        """Iterating a Queue yields the queue itself exactly once."""
        yield self

    @property
    def key(self):
        """Returns the Redis key for this Queue."""
        return self._key

    @property
    def registry_cleaning_key(self):
        """Redis key used to indicate this queue has been cleaned."""
        return 'rq:clean_registries:%s' % self.name

    def acquire_cleaning_lock(self):
        """Returns a boolean indicating whether a lock to clean this queue
        is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
        """
        # SET with nx only succeeds when the key does not exist yet.
        return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)

    def empty(self):
        """Removes all messages on the queue.

        Runs a Lua script that pops every job ID off the queue and deletes
        the matching job key and its `:dependents` key.  Returns the value
        returned by the script, i.e. the number of job IDs popped.
        """
        script = """
            local prefix = "{0}"
            local q = KEYS[1]
            local count = 0
            while true do
                local job_id = redis.call("lpop", q)
                if job_id == false then
                    break
                end

                -- Delete the relevant keys
                redis.call("del", prefix..job_id)
                redis.call("del", prefix..job_id..":dependents")
                count = count + 1
            end
            return count
        """.format(self.job_class.redis_job_namespace_prefix).encode("utf-8")
        script = self.connection.register_script(script)
        return script(keys=[self.key])

    def delete(self, delete_jobs=True):
        """Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first."""
        if delete_jobs:
            self.empty()

        # Unregister the queue and drop its list in a single round trip.
        with self.connection.pipeline() as pipeline:
            pipeline.srem(self.redis_queues_keys, self._key)
            pipeline.delete(self._key)
            pipeline.execute()

    def is_empty(self):
        """Returns whether the current queue is empty."""
        return self.count == 0

    @property
    def is_async(self):
        """Returns whether the current queue is async."""
        return bool(self._is_async)

    def fetch_job(self, job_id):
        """Fetches the job with the given ID if it belongs to this queue.

        Returns None when the job does not exist (in which case `job_id` is
        also removed from this queue) or when the job's origin is a
        different queue.
        """
        try:
            job = self.job_class.fetch(job_id, connection=self.connection)
        except NoSuchJobError:
            # Stale ID: the job data is gone, so drop the ID from the queue.
            self.remove(job_id)
            return None

        if job.origin == self.name:
            return job
        return None

    def get_job_ids(self, offset=0, length=-1):
        """Returns a slice of job IDs in the queue.

        `offset` is the index of the first ID to return.  A non-negative
        `length` is the maximum number of IDs to return; a negative `length`
        is passed to LRANGE unchanged as the (inclusive) end index, so the
        default of -1 means "up to the last element".
        """
        if length == 0:
            # An empty slice.  Without this guard `offset=0` would compute
            # end == -1, which LRANGE interprets as "until the last element"
            # and the whole queue would be returned.
            return []

        start = offset
        if length > 0:
            # LRANGE's end index is inclusive.
            end = offset + (length - 1)
        else:
            end = length
        return [as_text(job_id) for job_id in
                self.connection.lrange(self.key, start, end)]

    def get_jobs(self, offset=0, length=-1):
        """Returns a slice of jobs in the queue.

        IDs for which `fetch_job` returns None are left out of the result.
        """
        job_ids = self.get_job_ids(offset, length)
        return compact([self.fetch_job(job_id) for job_id in job_ids])

    @property
    def job_ids(self):
        """Returns a list of all job IDS in the queue."""
        return self.get_job_ids()

    @property
    def jobs(self):
        """Returns a list of all (valid) jobs in the queue."""
        return self.get_jobs()

    @property
    def count(self):
        """Returns a count of all messages in the queue."""
        return self.connection.llen(self.key)

    # NOTE: the registry classes are imported inside each property rather
    # than at module level (presumably to avoid a circular import between
    # this module and rq.registry -- confirm before hoisting them).

    @property
    def failed_job_registry(self):
        """Returns this queue's FailedJobRegistry."""
        from rq.registry import FailedJobRegistry
        return FailedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def started_job_registry(self):
        """Returns this queue's StartedJobRegistry."""
        from rq.registry import StartedJobRegistry
        return StartedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def finished_job_registry(self):
        """Returns this queue's FinishedJobRegistry."""
        from rq.registry import FinishedJobRegistry
        # job_class is forwarded for consistency with the other registry
        # properties, so a custom job class is honoured here as well.
        return FinishedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def deferred_job_registry(self):
        """Returns this queue's DeferredJobRegistry."""
        from rq.registry import DeferredJobRegistry
        return DeferredJobRegistry(queue=self, job_class=self.job_class)

    @property
    def scheduled_job_registry(self):
        """Returns this queue's ScheduledJobRegistry."""
        from rq.registry import ScheduledJobRegistry
        return ScheduledJobRegistry(queue=self, job_class=self.job_class)

    def remove(self, job_or_id, pipeline=None):
        """Removes Job from queue, accepts either a Job instance or ID.

        When a `pipeline` is given the LREM is only queued on it and None is
        returned; otherwise the LREM is executed immediately and its result
        is returned.
        """
        job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id

        if pipeline is not None:
            pipeline.lrem(self.key, 1, job_id)
            return

        return self.connection.lrem(self.key, 1, job_id)

    def compact(self):
        """Removes all "dead" jobs from the queue by cycling through it, while
        guaranteeing FIFO semantics.
        """
        COMPACT_QUEUE = '{0}_compact:{1}'.format(
            self.redis_queue_namespace_prefix, uuid.uuid4())  # noqa

        # Move the whole list aside under a unique temporary key, then push
        # back only the IDs whose job still exists.
        # NOTE(review): RENAME presumably fails when the queue key does not
        # exist (i.e. the queue is empty) -- confirm and guard if needed.
        self.connection.rename(self.key, COMPACT_QUEUE)
        while True:
            job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
            if job_id is None:
                break
            if self.job_class.exists(job_id, self.connection):
                self.connection.rpush(self.key, job_id)

    def push_job_id(self, job_id, pipeline=None, at_front=False):
        """Pushes a job ID on the corresponding Redis queue.
        'at_front' allows you to push the job onto the front instead of the back of the queue"""
        connection = pipeline if pipeline is not None else self.connection
        if at_front:
            connection.lpush(self.key, job_id)
        else:
            connection.rpush(self.key, job_id)

    def create_job(self, func, args=None, kwargs=None, timeout=None,
                   result_ttl=None, ttl=None, failure_ttl=None,
                   description=None, depends_on=None, job_id=None,
                   meta=None, status=JobStatus.QUEUED):
        """Creates a job based on parameters given.

        `timeout`, `result_ttl`, `failure_ttl` and `ttl` are normalised with
        `parse_timeout`.  A missing timeout falls back to the queue default.
        The job is created via `job_class.create` with this queue's name as
        its origin; it is neither saved nor enqueued here.

        Raises ValueError for a timeout of 0 or a ttl that is not positive.
        """
        timeout = parse_timeout(timeout)

        if timeout is None:
            timeout = self._default_timeout
        elif timeout == 0:
            raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')

        result_ttl = parse_timeout(result_ttl)
        failure_ttl = parse_timeout(failure_ttl)

        ttl = parse_timeout(ttl)
        if ttl is not None and ttl <= 0:
            raise ValueError('Job ttl must be greater than 0')

        job = self.job_class.create(
            func, args=args, kwargs=kwargs, connection=self.connection,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            status=status, description=description,
            depends_on=depends_on, timeout=timeout, id=job_id,
            origin=self.name, meta=meta
        )

        return job

    def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                     result_ttl=None, ttl=None, failure_ttl=None,
                     description=None, depends_on=None, job_id=None,
                     at_front=False, meta=None):
        """Creates a job to represent the delayed function call and enqueues
        it.

        It is much like `.enqueue()`, except that it takes the function's args
        and kwargs as explicit arguments.  Any kwargs passed to this function
        contain options for RQ itself.

        Returns the job.  When `depends_on` is given and at least one
        dependency is not finished, the job is saved with status DEFERRED
        and registered as a dependent instead of being enqueued.
        """

        job = self.create_job(
            func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
            failure_ttl=failure_ttl, description=description, depends_on=depends_on,
            job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
        )

        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, that means
        # something else has modified either the set of dependencies or the
        # status of one of them. In this case, we simply retry.
        if depends_on is not None:
            with self.connection.pipeline() as pipe:
                while True:
                    try:

                        pipe.watch(job.dependencies_key)

                        dependencies = job.fetch_dependencies(
                            watch=True,
                            pipeline=pipe
                        )

                        pipe.multi()

                        for dependency in dependencies:
                            if dependency.get_status(refresh=False) != JobStatus.FINISHED:
                                # First unfinished dependency found: defer
                                # the job and stop here.
                                job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                                job.register_dependency(pipeline=pipe)
                                job.save(pipeline=pipe)
                                job.cleanup(ttl=job.ttl, pipeline=pipe)
                                pipe.execute()
                                return job

                        # Every dependency is finished; enqueue normally.
                        break
                    except WatchError:
                        continue

        job = self.enqueue_job(job, at_front=at_front)
        return job

    def run_job(self, job):
        """Performs `job` in the current process, marks it FINISHED, saves
        it (without meta) and applies `DEFAULT_RESULT_TTL` via `cleanup`.
        Returns the job.
        """
        job.perform()
        job.set_status(JobStatus.FINISHED)
        job.save(include_meta=False)
        job.cleanup(DEFAULT_RESULT_TTL)
        return job

    @classmethod
    def parse_args(cls, f, *args, **kwargs):
        """
        Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`

        The function argument `f` may be any of the following:

        * A reference to a function
        * A reference to an object's instance method
        * A string, representing the location of a function (must be
          meaningful to the import context of the workers)

        Returns the tuple `(f, timeout, description, result_ttl, ttl,
        failure_ttl, depends_on, job_id, at_front, meta, args, kwargs)`.

        Raises ValueError if `f` is defined in `__main__`, and
        AssertionError if positional arguments are combined with explicit
        `args=`/`kwargs=`.
        """
        if not isinstance(f, string_types) and f.__module__ == '__main__':
            raise ValueError('Functions from the __main__ module cannot be processed '
                             'by workers')

        # Detect explicit invocations, i.e. of the form:
        #     q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
        timeout = kwargs.pop('job_timeout', None)
        description = kwargs.pop('description', None)
        result_ttl = kwargs.pop('result_ttl', None)
        ttl = kwargs.pop('ttl', None)
        failure_ttl = kwargs.pop('failure_ttl', None)
        depends_on = kwargs.pop('depends_on', None)
        job_id = kwargs.pop('job_id', None)
        at_front = kwargs.pop('at_front', False)
        meta = kwargs.pop('meta', None)

        if 'args' in kwargs or 'kwargs' in kwargs:
            # Raised explicitly rather than with an `assert` statement so the
            # check still runs under `python -O`; the exception type callers
            # see is unchanged.
            if args != ():
                raise AssertionError('Extra positional arguments cannot be used when using explicit args and kwargs')  # noqa
            args = kwargs.pop('args', None)
            kwargs = kwargs.pop('kwargs', None)

        return (f, timeout, description, result_ttl, ttl, failure_ttl,
                depends_on, job_id, at_front, meta, args, kwargs)

    def enqueue(self, f, *args, **kwargs):
        """Creates a job to represent the delayed function call and enqueues it.

        RQ options (`job_timeout`, `description`, `result_ttl`, `ttl`,
        `failure_ttl`, `depends_on`, `job_id`, `at_front`, `meta`) are
        stripped from `kwargs` by `parse_args`; everything else is passed to
        `f` when the job runs.
        """

        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

        return self.enqueue_call(
            func=f, args=args, kwargs=kwargs, timeout=timeout,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            description=description, depends_on=depends_on, job_id=job_id,
            at_front=at_front, meta=meta
        )

    def enqueue_at(self, datetime, f, *args, **kwargs):
        """Schedules a job to be enqueued at specified time

        The job is created with status SCHEDULED, saved, and handed to this
        queue's ScheduledJobRegistry in a single pipeline.  Note that the
        parsed `at_front` option is not used here.
        """
        # NOTE: the `datetime` parameter shadows the module-level `datetime`
        # import inside this method; the name is kept because it is part of
        # the public signature.
        from .registry import ScheduledJobRegistry

        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
        job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
                              timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                              failure_ttl=failure_ttl, description=description,
                              depends_on=depends_on, job_id=job_id, meta=meta)

        registry = ScheduledJobRegistry(queue=self)
        with self.connection.pipeline() as pipeline:
            job.save(pipeline=pipeline)
            registry.schedule(job, datetime, pipeline=pipeline)
            pipeline.execute()

        return job

    def enqueue_in(self, time_delta, func, *args, **kwargs):
        """Schedules a job to be executed in a given `timedelta` object"""
        return self.enqueue_at(datetime.now(utc) + time_delta,
                               func, *args, **kwargs)

    def enqueue_job(self, job, pipeline=None, at_front=False):
        """Enqueues a job for delayed execution.

        If Queue is instantiated with is_async=False, job is executed immediately.

        When a `pipeline` is passed, the commands are only queued on it and
        the caller is responsible for executing it.  Returns the job.
        """
        pipe = pipeline if pipeline is not None else self.connection.pipeline()

        # Add Queue key set
        pipe.sadd(self.redis_queues_keys, self.key)
        job.set_status(JobStatus.QUEUED, pipeline=pipe)

        job.origin = self.name
        job.enqueued_at = utcnow()

        if job.timeout is None:
            job.timeout = self._default_timeout
        job.save(pipeline=pipe)
        job.cleanup(ttl=job.ttl, pipeline=pipe)

        # A synchronous queue never puts the ID on the Redis list; the job
        # is run in-process below instead.
        if self._is_async:
            self.push_job_id(job.id, pipeline=pipe, at_front=at_front)

        if pipeline is None:
            pipe.execute()

        if not self._is_async:
            job = self.run_job(job)

        return job

    def enqueue_dependents(self, job, pipeline=None):
        """Enqueues all jobs in the given job's dependents set and clears it.

        When called without a pipeline, this method uses WATCH/MULTI/EXEC.
        If you pass a pipeline, only MULTI is called. The rest is up to the
        caller.
        """
        from .registry import DeferredJobRegistry

        pipe = pipeline if pipeline is not None else self.connection.pipeline()
        dependents_key = job.dependents_key

        while True:
            try:
                # if a pipeline is passed, the caller is responsible for calling WATCH
                # to ensure all jobs are enqueued
                if pipeline is None:
                    pipe.watch(dependents_key)

                dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
                                  for job_id in pipe.smembers(dependents_key)]

                pipe.multi()

                for dependent in dependent_jobs:
                    registry = DeferredJobRegistry(dependent.origin,
                                                   self.connection,
                                                   job_class=self.job_class)
                    registry.remove(dependent, pipeline=pipe)
                    # Each dependent goes back onto the queue it originated
                    # from, which is not necessarily this one.
                    if dependent.origin == self.name:
                        self.enqueue_job(dependent, pipeline=pipe)
                    else:
                        queue = self.__class__(name=dependent.origin, connection=self.connection)
                        queue.enqueue_job(dependent, pipeline=pipe)

                pipe.delete(dependents_key)

                if pipeline is None:
                    pipe.execute()

                break
            except WatchError:
                if pipeline is None:
                    continue
                else:
                    # if the pipeline comes from the caller, we re-raise the
                    # exception as it is the responsibility of the caller to
                    # handle it
                    raise

    def pop_job_id(self):
        """Pops a given job ID from this Redis queue."""
        return as_text(self.connection.lpop(self.key))

    @classmethod
    def lpop(cls, queue_keys, timeout, connection=None):
        """Helper method.  Intermediate method to abstract away from some
        Redis API details, where LPOP accepts only a single key, whereas BLPOP
        accepts multiple.  So if we want the non-blocking LPOP, we need to
        iterate over all queues, do individual LPOPs, and return the result.

        Until Redis receives a specific method for this, we'll have to wrap it
        this way.

        The timeout parameter is interpreted as follows:
            None - non-blocking (return immediately)
             > 0 - maximum number of seconds to block

        Returns a `(queue_key, job_id)` tuple, or None in the non-blocking
        variant when every queue is empty.  Raises ValueError for a timeout
        of 0 and DequeueTimeout when the blocking pop times out.
        """
        connection = resolve_connection(connection)
        if timeout is not None:  # blocking variant
            if timeout == 0:
                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None

    @classmethod
    def dequeue_any(cls, queues, timeout, connection=None, job_class=None):
        """Class method returning the job_class instance at the front of the given
        set of Queues, where the order of the queues is important.

        When all of the Queues are empty, depending on the `timeout` argument,
        either blocks execution of this function for the duration of the
        timeout or until new messages arrive on any of the queues, or returns
        None.

        See the documentation of cls.lpop for the interpretation of timeout.

        Returns a `(job, queue)` tuple, or None.  An UnpickleError raised
        while fetching the job is re-raised with `job_id` and `queue`
        attributes attached.
        """
        job_class = backend_class(cls, 'job_class', override=job_class)
        # The keys do not change between retries, so build the list once.
        queue_keys = [q.key for q in queues]

        while True:
            result = cls.lpop(queue_keys, timeout, connection=connection)
            if result is None:
                return None
            queue_key, job_id = map(as_text, result)
            queue = cls.from_queue_key(queue_key,
                                       connection=connection,
                                       job_class=job_class)
            try:
                job = job_class.fetch(job_id, connection=connection)
            except NoSuchJobError:
                # Silently pass on jobs that don't exist (anymore),
                # and continue in the loop
                continue
            except UnpickleError as e:
                # Attach queue information on the exception for improved error
                # reporting
                e.job_id = job_id
                e.queue = queue
                # Bare raise keeps the original traceback intact.
                raise
            return job, queue

    # Total ordering definition (the rest of the required Python methods are
    # auto-generated by the @total_ordering decorator)
    def __eq__(self, other):  # noqa
        """Queues are equal when their names are equal.

        Raises TypeError when `other` is not a Queue.
        """
        if not isinstance(other, Queue):
            raise TypeError('Cannot compare queues to other objects')
        return self.name == other.name

    def __lt__(self, other):
        """Queues are ordered by name.

        Raises TypeError when `other` is not a Queue.
        """
        if not isinstance(other, Queue):
            raise TypeError('Cannot compare queues to other objects')
        return self.name < other.name

    def __hash__(self):  # pragma: no cover
        """Hashes by name, consistent with `__eq__`."""
        return hash(self.name)

    def __repr__(self):  # noqa  # pragma: no cover
        return '{0}({1!r})'.format(self.__class__.__name__, self.name)

    def __str__(self):
        return '<{0} {1}>'.format(self.__class__.__name__, self.name)