# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import uuid
import warnings

from datetime import datetime

from redis import WatchError

from .compat import as_text, string_types, total_ordering, utc
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError, UnpickleError
from .job import Job, JobStatus
from .utils import backend_class, import_attribute, parse_timeout, utcnow


def compact(lst):
    return [item for item in lst if item is not None]


@total_ordering
class Queue(object):
    job_class = Job
    DEFAULT_TIMEOUT = 180  # Default timeout in seconds.
    redis_queue_namespace_prefix = 'rq:queue:'
    redis_queues_keys = 'rq:queues'

    @classmethod
    def all(cls, connection=None, job_class=None):
        """Returns an iterable of all Queues."""
        connection = resolve_connection(connection)

        def to_queue(queue_key):
            return cls.from_queue_key(as_text(queue_key),
                                      connection=connection,
                                      job_class=job_class)
        return [to_queue(rq_key)
                for rq_key in connection.smembers(cls.redis_queues_keys)
                if rq_key]

    @classmethod
    def from_queue_key(cls, queue_key, connection=None, job_class=None):
        """Returns a Queue instance, based on the naming convention used for
        internal Redis keys.  Can be used to reverse-lookup Queues by their
        Redis keys.
        """
        prefix = cls.redis_queue_namespace_prefix
        if not queue_key.startswith(prefix):
            raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
        name = queue_key[len(prefix):]
        return cls(name, connection=connection, job_class=job_class)
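
    # Illustrative reverse lookup (not part of the original module; assumes a
    # default Redis connection has been configured):
    #
    #   q = Queue.from_queue_key('rq:queue:default')
    #   assert q.name == 'default'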

    def __init__(self, name='default', default_timeout=None, connection=None,
                 is_async=True, job_class=None, **kwargs):
        self.connection = resolve_connection(connection)
        prefix = self.redis_queue_namespace_prefix
        self.name = name
        self._key = '{0}{1}'.format(prefix, name)
        self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT
        self._is_async = is_async

        if 'async' in kwargs:
            self._is_async = kwargs['async']
            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)

        # override class attribute job_class if one was passed
        if job_class is not None:
            if isinstance(job_class, string_types):
                job_class = import_attribute(job_class)
            self.job_class = job_class

    def __len__(self):
        return self.count

    def __nonzero__(self):
        return True

    def __bool__(self):
        return True

    def __iter__(self):
        yield self

    @property
    def key(self):
        """Returns the Redis key for this Queue."""
        return self._key

    @property
    def registry_cleaning_key(self):
        """Redis key used to indicate this queue has been cleaned."""
        return 'rq:clean_registries:%s' % self.name

    def acquire_cleaning_lock(self):
        """Returns a boolean indicating whether a lock to clean this queue
        is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
        """
        return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)

    def empty(self):
        """Removes all messages on the queue."""
        script = """
            local prefix = "{0}"
            local q = KEYS[1]
            local count = 0
            while true do
                local job_id = redis.call("lpop", q)
                if job_id == false then
                    break
                end

                -- Delete the relevant keys
                redis.call("del", prefix..job_id)
                redis.call("del", prefix..job_id..":dependents")
                count = count + 1
            end
            return count
        """.format(self.job_class.redis_job_namespace_prefix).encode("utf-8")
        script = self.connection.register_script(script)
        return script(keys=[self.key])

    def delete(self, delete_jobs=True):
        """Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first."""
        if delete_jobs:
            self.empty()

        with self.connection.pipeline() as pipeline:
            pipeline.srem(self.redis_queues_keys, self._key)
            pipeline.delete(self._key)
            pipeline.execute()

    def is_empty(self):
        """Returns whether the current queue is empty."""
        return self.count == 0

    @property
    def is_async(self):
        """Returns whether the current queue is async."""
        return bool(self._is_async)

    def fetch_job(self, job_id):
        try:
            job = self.job_class.fetch(job_id, connection=self.connection)
        except NoSuchJobError:
            self.remove(job_id)
        else:
            if job.origin == self.name:
                return job

    def get_job_ids(self, offset=0, length=-1):
        """Returns a slice of job IDs in the queue."""
        start = offset
        if length >= 0:
            end = offset + (length - 1)
        else:
            end = length
        return [as_text(job_id) for job_id in
                self.connection.lrange(self.key, start, end)]
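
    # Index-mapping note (illustrative): Redis LRANGE treats its start/end
    # indices as inclusive, so get_job_ids(offset=2, length=3) issues
    # LRANGE <key> 2 4 and yields at most three IDs; the default length of -1
    # means "through the end of the list".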

    def get_jobs(self, offset=0, length=-1):
        """Returns a slice of jobs in the queue."""
        job_ids = self.get_job_ids(offset, length)
        return compact([self.fetch_job(job_id) for job_id in job_ids])

    @property
    def job_ids(self):
        """Returns a list of all job IDs in the queue."""
        return self.get_job_ids()

    @property
    def jobs(self):
        """Returns a list of all (valid) jobs in the queue."""
        return self.get_jobs()

    @property
    def count(self):
        """Returns a count of all messages in the queue."""
        return self.connection.llen(self.key)

    @property
    def failed_job_registry(self):
        """Returns this queue's FailedJobRegistry."""
        from rq.registry import FailedJobRegistry
        return FailedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def started_job_registry(self):
        """Returns this queue's StartedJobRegistry."""
        from rq.registry import StartedJobRegistry
        return StartedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def finished_job_registry(self):
        """Returns this queue's FinishedJobRegistry."""
        from rq.registry import FinishedJobRegistry
        return FinishedJobRegistry(queue=self, job_class=self.job_class)

    @property
    def deferred_job_registry(self):
        """Returns this queue's DeferredJobRegistry."""
        from rq.registry import DeferredJobRegistry
        return DeferredJobRegistry(queue=self, job_class=self.job_class)

    @property
    def scheduled_job_registry(self):
        """Returns this queue's ScheduledJobRegistry."""
        from rq.registry import ScheduledJobRegistry
        return ScheduledJobRegistry(queue=self, job_class=self.job_class)

    def remove(self, job_or_id, pipeline=None):
        """Removes Job from queue, accepts either a Job instance or ID."""
        job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id

        if pipeline is not None:
            pipeline.lrem(self.key, 1, job_id)
            return

        return self.connection.lrem(self.key, 1, job_id)

    def compact(self):
        """Removes all "dead" jobs from the queue by cycling through it, while
        guaranteeing FIFO semantics.
        """
        COMPACT_QUEUE = '{0}_compact:{1}'.format(
            self.redis_queue_namespace_prefix, uuid.uuid4())  # noqa

        self.connection.rename(self.key, COMPACT_QUEUE)
        while True:
            job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
            if job_id is None:
                break
            if self.job_class.exists(job_id, self.connection):
                self.connection.rpush(self.key, job_id)

    def push_job_id(self, job_id, pipeline=None, at_front=False):
        """Pushes a job ID on the corresponding Redis queue.
        'at_front' allows you to push the job onto the front instead of the back of the queue"""
        connection = pipeline if pipeline is not None else self.connection
        if at_front:
            connection.lpush(self.key, job_id)
        else:
            connection.rpush(self.key, job_id)

    def create_job(self, func, args=None, kwargs=None, timeout=None,
                   result_ttl=None, ttl=None, failure_ttl=None,
                   description=None, depends_on=None, job_id=None,
                   meta=None, status=JobStatus.QUEUED):
        """Creates a job based on parameters given."""
        timeout = parse_timeout(timeout)

        if timeout is None:
            timeout = self._default_timeout
        elif timeout == 0:
            raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')

        result_ttl = parse_timeout(result_ttl)
        failure_ttl = parse_timeout(failure_ttl)

        ttl = parse_timeout(ttl)
        if ttl is not None and ttl <= 0:
            raise ValueError('Job ttl must be greater than 0')

        job = self.job_class.create(
            func, args=args, kwargs=kwargs, connection=self.connection,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            status=status, description=description,
            depends_on=depends_on, timeout=timeout, id=job_id,
            origin=self.name, meta=meta
        )

        return job

    def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                     result_ttl=None, ttl=None, failure_ttl=None,
                     description=None, depends_on=None, job_id=None,
                     at_front=False, meta=None):
        """Creates a job to represent the delayed function call and enqueues
        it.

        It is much like `.enqueue()`, except that it takes the function's args
        and kwargs as explicit arguments.  Any kwargs passed to this function
        contain options for RQ itself.
        """

        job = self.create_job(
            func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
            failure_ttl=failure_ttl, description=description, depends_on=depends_on,
            job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
        )

        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, something else
        # has modified either the set of dependencies or the status of one of
        # them. In this case, we simply retry.
        if depends_on is not None:
            with self.connection.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch(job.dependencies_key)

                        dependencies = job.fetch_dependencies(
                            watch=True,
                            pipeline=pipe
                        )

                        pipe.multi()

                        for dependency in dependencies:
                            if dependency.get_status(refresh=False) != JobStatus.FINISHED:
                                job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                                job.register_dependency(pipeline=pipe)
                                job.save(pipeline=pipe)
                                job.cleanup(ttl=job.ttl, pipeline=pipe)
                                pipe.execute()
                                return job

                        break
                    except WatchError:
                        continue

        job = self.enqueue_job(job, at_front=at_front)
        return job
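
    # Illustrative dependency chain (not part of the original module; assumes
    # an importable callable `my_module.send_report`, a placeholder name):
    #
    #   q = Queue('default')
    #   first = q.enqueue_call(func='my_module.send_report', args=(1,))
    #   second = q.enqueue_call(func='my_module.send_report', args=(2,),
    #                           depends_on=first)
    #
    # `second` is saved with DEFERRED status instead of being enqueued;
    # enqueue_dependents() later moves it onto the queue once `first`
    # finishes, per the WATCH/MULTI loop above.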

    def run_job(self, job):
        job.perform()
        job.set_status(JobStatus.FINISHED)
        job.save(include_meta=False)
        job.cleanup(DEFAULT_RESULT_TTL)
        return job

    @classmethod
    def parse_args(cls, f, *args, **kwargs):
        """
        Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`

        The function argument `f` may be any of the following:

        * A reference to a function
        * A reference to an object's instance method
        * A string, representing the location of a function (must be
          meaningful to the import context of the workers)
        """
        if not isinstance(f, string_types) and f.__module__ == '__main__':
            raise ValueError('Functions from the __main__ module cannot be processed '
                             'by workers')

        # Detect explicit invocations, i.e. of the form:
        #     q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
        timeout = kwargs.pop('job_timeout', None)
        description = kwargs.pop('description', None)
        result_ttl = kwargs.pop('result_ttl', None)
        ttl = kwargs.pop('ttl', None)
        failure_ttl = kwargs.pop('failure_ttl', None)
        depends_on = kwargs.pop('depends_on', None)
        job_id = kwargs.pop('job_id', None)
        at_front = kwargs.pop('at_front', False)
        meta = kwargs.pop('meta', None)

        if 'args' in kwargs or 'kwargs' in kwargs:
            assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs'  # noqa
            args = kwargs.pop('args', None)
            kwargs = kwargs.pop('kwargs', None)

        return (f, timeout, description, result_ttl, ttl, failure_ttl,
                depends_on, job_id, at_front, meta, args, kwargs)

    def enqueue(self, f, *args, **kwargs):
        """Creates a job to represent the delayed function call and enqueues it."""

        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

        return self.enqueue_call(
            func=f, args=args, kwargs=kwargs, timeout=timeout,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            description=description, depends_on=depends_on, job_id=job_id,
            at_front=at_front, meta=meta
        )
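
    # Illustrative call (not part of the original module): RQ options share
    # the keyword namespace with the target function's own arguments, so the
    # explicit args/kwargs form avoids clashes (the callable name is a
    # placeholder):
    #
    #   q.enqueue('my_module.process', args=(1, 2), kwargs={'a': 1},
    #             job_timeout=30, result_ttl=600)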

    def enqueue_at(self, datetime, f, *args, **kwargs):
        """Schedules a job to be enqueued at the specified time."""
        from .registry import ScheduledJobRegistry

        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
        job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
                              timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                              failure_ttl=failure_ttl, description=description,
                              depends_on=depends_on, job_id=job_id, meta=meta)

        registry = ScheduledJobRegistry(queue=self)
        with self.connection.pipeline() as pipeline:
            job.save(pipeline=pipeline)
            registry.schedule(job, datetime, pipeline=pipeline)
            pipeline.execute()

        return job

    def enqueue_in(self, time_delta, func, *args, **kwargs):
        """Schedules a job to be executed after the given `timedelta`."""
        return self.enqueue_at(datetime.now(utc) + time_delta,
                               func, *args, **kwargs)
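
    # Illustrative scheduling sketch (callable name is a placeholder):
    #
    #   from datetime import datetime, timedelta
    #   q.enqueue_at(datetime(2021, 1, 1, 3, 0, tzinfo=utc), 'my_module.sync')
    #   q.enqueue_in(timedelta(minutes=10), 'my_module.sync')
    #
    # Both paths create the job with SCHEDULED status and place it in the
    # ScheduledJobRegistry rather than on the queue's Redis list.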

    def enqueue_job(self, job, pipeline=None, at_front=False):
        """Enqueues a job for delayed execution.

        If Queue is instantiated with is_async=False, job is executed immediately.
        """
        pipe = pipeline if pipeline is not None else self.connection.pipeline()

        # Add Queue key set
        pipe.sadd(self.redis_queues_keys, self.key)
        job.set_status(JobStatus.QUEUED, pipeline=pipe)

        job.origin = self.name
        job.enqueued_at = utcnow()

        if job.timeout is None:
            job.timeout = self._default_timeout
        job.save(pipeline=pipe)
        job.cleanup(ttl=job.ttl, pipeline=pipe)

        if self._is_async:
            self.push_job_id(job.id, pipeline=pipe, at_front=at_front)

        if pipeline is None:
            pipe.execute()

        if not self._is_async:
            job = self.run_job(job)

        return job
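
    # Synchronous-mode sketch (not part of the original module; assumes a
    # live Redis connection, and `my_module.add` is a placeholder that
    # returns the sum of its arguments): with is_async=False the job ID is
    # never pushed onto the Redis list and enqueue_job() falls through to
    # run_job() inline, which is convenient in tests:
    #
    #   q = Queue(is_async=False)
    #   job = q.enqueue('my_module.add', args=(2, 3))
    #   assert job.result == 5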

    def enqueue_dependents(self, job, pipeline=None):
        """Enqueues all jobs in the given job's dependents set and clears it.

        When called without a pipeline, this method uses WATCH/MULTI/EXEC.
        If you pass a pipeline, only MULTI is called. The rest is up to the
        caller.
        """
        from .registry import DeferredJobRegistry

        pipe = pipeline if pipeline is not None else self.connection.pipeline()
        dependents_key = job.dependents_key

        while True:
            try:
                # if a pipeline is passed, the caller is responsible for calling WATCH
                # to ensure all jobs are enqueued
                if pipeline is None:
                    pipe.watch(dependents_key)

                dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
                                  for job_id in pipe.smembers(dependents_key)]

                pipe.multi()

                for dependent in dependent_jobs:
                    registry = DeferredJobRegistry(dependent.origin,
                                                   self.connection,
                                                   job_class=self.job_class)
                    registry.remove(dependent, pipeline=pipe)
                    if dependent.origin == self.name:
                        self.enqueue_job(dependent, pipeline=pipe)
                    else:
                        queue = self.__class__(name=dependent.origin, connection=self.connection)
                        queue.enqueue_job(dependent, pipeline=pipe)

                pipe.delete(dependents_key)

                if pipeline is None:
                    pipe.execute()

                break
            except WatchError:
                if pipeline is None:
                    continue
                else:
                    # if the pipeline comes from the caller, we re-raise the
                    # exception, as it is the caller's responsibility to
                    # handle it
                    raise

    def pop_job_id(self):
        """Pops the next job ID off this Redis queue."""
        return as_text(self.connection.lpop(self.key))

    @classmethod
    def lpop(cls, queue_keys, timeout, connection=None):
        """Helper method to abstract away some Redis API details: LPOP accepts
        only a single key, whereas BLPOP accepts multiple.  So if we want the
        non-blocking LPOP, we need to iterate over all queues, do individual
        LPOPs, and return the result.

        Until Redis receives a specific method for this, we'll have to wrap it
        this way.

        The timeout parameter is interpreted as follows:
            None - non-blocking (return immediately)
             > 0 - maximum number of seconds to block
        """
        connection = resolve_connection(connection)
        if timeout is not None:  # blocking variant
            if timeout == 0:
                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None
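
    # Illustrative call patterns for the two variants described above
    # (queue keys are examples):
    #
    #   Queue.lpop(['rq:queue:high', 'rq:queue:default'], None)  # non-blocking
    #   Queue.lpop(['rq:queue:high', 'rq:queue:default'], 5)     # BLPOP, <= 5s
    #
    # The blocking form raises DequeueTimeout when nothing arrives in time.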

    @classmethod
    def dequeue_any(cls, queues, timeout, connection=None, job_class=None):
        """Class method returning the job_class instance at the front of the given
        set of Queues, where the order of the queues is important.

        When all of the Queues are empty, depending on the `timeout` argument,
        either blocks execution of this function for the duration of the
        timeout or until new messages arrive on any of the queues, or returns
        None.

        See the documentation of cls.lpop for the interpretation of timeout.
        """
        job_class = backend_class(cls, 'job_class', override=job_class)

        while True:
            queue_keys = [q.key for q in queues]
            result = cls.lpop(queue_keys, timeout, connection=connection)
            if result is None:
                return None
            queue_key, job_id = map(as_text, result)
            queue = cls.from_queue_key(queue_key,
                                       connection=connection,
                                       job_class=job_class)
            try:
                job = job_class.fetch(job_id, connection=connection)
            except NoSuchJobError:
                # Silently pass on jobs that don't exist (anymore),
                # and continue in the loop
                continue
            except UnpickleError as e:
                # Attach queue information on the exception for improved error
                # reporting
                e.job_id = job_id
                e.queue = queue
                raise e
            return job, queue
        return None, None
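
    # Worker-style usage sketch (queue names are examples):
    #
    #   result = Queue.dequeue_any([Queue('high'), Queue('default')], None)
    #   if result is not None:
    #       job, queue = result
    #
    # Queue order encodes priority: 'high' is drained before 'default'. With
    # a positive timeout the call blocks and raises DequeueTimeout on expiry.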

    # Total ordering definition (the rest of the required Python methods are
    # auto-generated by the @total_ordering decorator)
    def __eq__(self, other):  # noqa
        if not isinstance(other, Queue):
            raise TypeError('Cannot compare queues to other objects')
        return self.name == other.name

    def __lt__(self, other):
        if not isinstance(other, Queue):
            raise TypeError('Cannot compare queues to other objects')
        return self.name < other.name

    def __hash__(self):  # pragma: no cover
        return hash(self.name)

    def __repr__(self):  # noqa  # pragma: no cover
        return '{0}({1!r})'.format(self.__class__.__name__, self.name)

    def __str__(self):
        return '<{0} {1}>'.format(self.__class__.__name__, self.name)