# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4

from rq.compat import as_text, decode_redis_hash, string_types, text_type

from .connections import resolve_connection
from .exceptions import InvalidJobDependency, NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                    utcformat, utcnow)

try:
    import cPickle as pickle
except ImportError:  # noqa  # pragma: no cover
    import pickle


# Serialize pickle dumps using the highest pickle protocol (binary); the
# default protocol is ASCII-based on Python 2.
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
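
# As a rough illustration of the round trip these helpers perform on a job
# tuple (func_name, instance, args, kwargs); `mymodule.myfunc` is a
# hypothetical import path:
#
#     payload = dumps(('mymodule.myfunc', None, (1, 2), {}))  # binary pickle
#     assert loads(payload) == ('mymodule.myfunc', None, (1, 2), {})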

JobStatus = enum(
    'JobStatus',
    QUEUED='queued',
    FINISHED='finished',
    FAILED='failed',
    STARTED='started',
    DEFERRED='deferred',
    SCHEDULED='scheduled',
)
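
# The `enum` helper builds a namespace of plain string constants; as a rough
# sketch, JobStatus.QUEUED == 'queued', so statuses compare equal to the raw
# strings stored in the job hash.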

# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()


def unpickle(pickled_string):
    """Unpickles a string, but raises a unified UnpickleError in case anything
    fails.

    This helper avoids having to deal with the many types of exceptions that
    `loads()` can potentially raise (e.g. AttributeError, IndexError,
    TypeError, KeyError, etc.).
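
    A minimal sketch of both outcomes (illustrative, not a doctest):

        unpickle(dumps({'answer': 42}))  # returns {'answer': 42}
        unpickle(b'garbage')             # raises UnpickleError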
    """
    try:
        obj = loads(pickled_string)
    except Exception as e:
        raise UnpickleError('Could not unpickle', pickled_string, e)
    return obj


def cancel_job(job_id, connection=None):
    """Cancels the job with the given job ID, preventing execution.  Discards
    any job info (i.e. it can't be requeued later).
    """
    Job.fetch(job_id, connection=connection).cancel()


def get_current_job(connection=None, job_class=None):
    """Returns the Job instance that is currently being executed.  If this
    function is invoked from outside a job context, None is returned.
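
    A minimal usage sketch (`report_progress` is a hypothetical job function):

        def report_progress():
            job = get_current_job()
            job.meta['progress'] = 100
            job.save_meta()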
    """
    if job_class:
        warnings.warn("job_class argument for get_current_job is deprecated.",
                      DeprecationWarning)
    return _job_stack.top


def requeue_job(job_id, connection):
    job = Job.fetch(job_id, connection=connection)
    return job.requeue()


class Job(object):
    """A Job is just a convenient data structure to pass around job (meta) data.
    """
    redis_job_namespace_prefix = 'rq:job:'

    # Job construction
    @classmethod
    def create(cls, func, args=None, kwargs=None, connection=None,
               result_ttl=None, ttl=None, status=None, description=None,
               depends_on=None, timeout=None, id=None, origin=None, meta=None,
               failure_ttl=None):
        """Creates a new Job instance for the given function, arguments, and
        keyword arguments.
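
        A minimal usage sketch (`count_words` is a hypothetical function; a
        default Redis connection is assumed):

            job = Job.create(count_words, args=('some text',),
                             result_ttl=500, timeout=180)
            job.save()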
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        if not isinstance(args, (tuple, list)):
            raise TypeError('{0!r} is not a valid args list'.format(args))
        if not isinstance(kwargs, dict):
            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))

        job = cls(connection=connection)
        if id is not None:
            job.set_id(id)

        if origin is not None:
            job.origin = origin

        # Set the core job tuple properties
        job._instance = None
        if inspect.ismethod(func):
            job._instance = func.__self__
            job._func_name = func.__name__
        elif inspect.isfunction(func) or inspect.isbuiltin(func):
            job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
        elif isinstance(func, string_types):
            job._func_name = as_text(func)
        elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
            job._instance = func
            job._func_name = '__call__'
        else:
            raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
        job._args = args
        job._kwargs = kwargs

        # Extra meta data
        job.description = description or job.get_call_string()
        job.result_ttl = parse_timeout(result_ttl)
        job.failure_ttl = parse_timeout(failure_ttl)
        job.ttl = parse_timeout(ttl)
        job.timeout = parse_timeout(timeout)
        job._status = status
        job.meta = meta or {}

        # dependency could be job instance or id
        if depends_on is not None:
            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
        return job

    def get_status(self, refresh=True):
        if refresh:
            self._status = as_text(self.connection.hget(self.key, 'status'))

        return self._status

    def set_status(self, status, pipeline=None):
        self._status = status
        connection = pipeline or self.connection
        connection.hset(self.key, 'status', self._status)

    @property
    def is_finished(self):
        return self.get_status() == JobStatus.FINISHED

    @property
    def is_queued(self):
        return self.get_status() == JobStatus.QUEUED

    @property
    def is_failed(self):
        return self.get_status() == JobStatus.FAILED

    @property
    def is_started(self):
        return self.get_status() == JobStatus.STARTED

    @property
    def is_deferred(self):
        return self.get_status() == JobStatus.DEFERRED

    @property
    def is_scheduled(self):
        return self.get_status() == JobStatus.SCHEDULED

    @property
    def _dependency_id(self):
        """Returns the first item in self._dependency_ids. Kept to
        preserve backwards compatibility with third-party packages.
        """
        if self._dependency_ids:
            return self._dependency_ids[0]

    @property
    def dependency(self):
        """Returns a job's dependency. To avoid repeated Redis fetches, we cache
        job.dependency as job._dependency.
        """
        if not self._dependency_ids:
            return None
        if hasattr(self, '_dependency'):
            return self._dependency
        job = self.fetch(self._dependency_ids[0], connection=self.connection)
        self._dependency = job
        return job

    @property
    def dependent_ids(self):
        """Returns a list of ids of jobs whose execution depends on this
        job's successful execution."""
        return list(map(as_text, self.connection.smembers(self.dependents_key)))

    @property
    def func(self):
        func_name = self.func_name
        if func_name is None:
            return None

        if self.instance:
            return getattr(self.instance, func_name)

        return import_attribute(self.func_name)

    def _unpickle_data(self):
        self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)

    @property
    def data(self):
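        # Lazily rebuild and pickle the job tuple; any field still left
        # UNEVALUATED falls back to an empty default before serialization.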
        if self._data is UNEVALUATED:
            if self._func_name is UNEVALUATED:
                raise ValueError('Cannot build the job data')

            if self._instance is UNEVALUATED:
                self._instance = None

            if self._args is UNEVALUATED:
                self._args = ()

            if self._kwargs is UNEVALUATED:
                self._kwargs = {}

            job_tuple = self._func_name, self._instance, self._args, self._kwargs
            self._data = dumps(job_tuple)
        return self._data

    @data.setter
    def data(self, value):
        self._data = value
        self._func_name = UNEVALUATED
        self._instance = UNEVALUATED
        self._args = UNEVALUATED
        self._kwargs = UNEVALUATED

    @property
    def func_name(self):
        if self._func_name is UNEVALUATED:
            self._unpickle_data()
        return self._func_name

    @func_name.setter
    def func_name(self, value):
        self._func_name = value
        self._data = UNEVALUATED

    @property
    def instance(self):
        if self._instance is UNEVALUATED:
            self._unpickle_data()
        return self._instance

    @instance.setter
    def instance(self, value):
        self._instance = value
        self._data = UNEVALUATED

    @property
    def args(self):
        if self._args is UNEVALUATED:
            self._unpickle_data()
        return self._args

    @args.setter
    def args(self, value):
        self._args = value
        self._data = UNEVALUATED

    @property
    def kwargs(self):
        if self._kwargs is UNEVALUATED:
            self._unpickle_data()
        return self._kwargs

    @kwargs.setter
    def kwargs(self, value):
        self._kwargs = value
        self._data = UNEVALUATED

    @classmethod
    def exists(cls, job_id, connection=None):
        """Returns whether a job hash exists for the given job ID."""
        conn = resolve_connection(connection)
        return conn.exists(cls.key_for(job_id))

    @classmethod
    def fetch(cls, id, connection=None):
        """Fetches a persisted job from its corresponding Redis key and
        instantiates it.
        """
        job = cls(id, connection=connection)
        job.refresh()
        return job

    @classmethod
    def fetch_many(cls, job_ids, connection):
        """
        Bulk version of Job.fetch

        For any job_id for which no job exists, the corresponding item in
        the returned list will be None.
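
        A minimal usage sketch (`redis` is a hypothetical connection; assumes
        'job-a' exists and 'job-b' was never saved):

            jobs = Job.fetch_many(['job-a', 'job-b'], connection=redis)
            # jobs[0] is a Job instance; jobs[1] is None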
        """
        with connection.pipeline() as pipeline:
            for job_id in job_ids:
                pipeline.hgetall(cls.key_for(job_id))
            results = pipeline.execute()

        jobs = []
        for i, job_id in enumerate(job_ids):
            if results[i]:
                job = cls(job_id, connection=connection)
                job.restore(results[i])
                jobs.append(job)
            else:
                jobs.append(None)

        return jobs

    def __init__(self, id=None, connection=None):
        self.connection = resolve_connection(connection)
        self._id = id
        self.created_at = utcnow()
        self._data = UNEVALUATED
        self._func_name = UNEVALUATED
        self._instance = UNEVALUATED
        self._args = UNEVALUATED
        self._kwargs = UNEVALUATED
        self.description = None
        self.origin = None
        self.enqueued_at = None
        self.started_at = None
        self.ended_at = None
        self._result = None
        self.exc_info = None
        self.timeout = None
        self.result_ttl = None
        self.failure_ttl = None
        self.ttl = None
        self._status = None
        self._dependency_ids = []
        self.meta = {}

    def __repr__(self):  # noqa  # pragma: no cover
        return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
                                                      self._id,
                                                      self.enqueued_at)

    def __str__(self):
        return '<{0} {1}: {2}>'.format(self.__class__.__name__,
                                       self.id,
                                       self.description)

    # Job equality
    def __eq__(self, other):  # noqa
        return isinstance(other, self.__class__) and self.id == other.id

    def __hash__(self):  # pragma: no cover
        return hash(self.id)

    # Data access
    def get_id(self):  # noqa
        """The job ID for this job instance. Generates an ID lazily the
        first time the ID is requested.
        """
        if self._id is None:
            self._id = text_type(uuid4())
        return self._id

    def set_id(self, value):
        """Sets a job ID for the given job."""
        if not isinstance(value, string_types):
            raise TypeError('id must be a string, not {0}'.format(type(value)))
        self._id = value

    id = property(get_id, set_id)

    @classmethod
    def key_for(cls, job_id):
        """The Redis key that is used to store the job hash under."""
        return (cls.redis_job_namespace_prefix + job_id).encode('utf-8')

    @classmethod
    def dependents_key_for(cls, job_id):
        """The Redis key that is used to store the job dependents hash under."""
        return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id)

    @property
    def key(self):
        """The Redis key that is used to store the job hash under."""
        return self.key_for(self.id)

    @property
    def dependents_key(self):
        """The Redis key that is used to store the job dependents hash under."""
        return self.dependents_key_for(self.id)

    @property
    def dependencies_key(self):
        return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id)

    def fetch_dependencies(self, watch=False, pipeline=None):
        """
        Fetch all of a job's dependencies. If a pipeline is supplied, and
        watch is true, then set WATCH on all the keys of all dependencies.

        Returned jobs will use self's connection, not the pipeline supplied.
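
        A minimal usage sketch (WATCH-guarding the dependency ids before a
        transaction):

            with job.connection.pipeline() as pipe:
                dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)
                # ... queue transactional writes, then pipe.execute()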
        """
        connection = pipeline if pipeline is not None else self.connection

        if watch and self._dependency_ids:
            connection.watch(*self._dependency_ids)

        jobs = self.fetch_many(self._dependency_ids, connection=self.connection)

        for i, job in enumerate(jobs):
            if not job:
                raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))

        return jobs

    @property
    def result(self):
        """Returns the return value of the job.

        Initially, right after enqueueing a job, the return value will be
        None.  But when the job has been executed, and had a return value or
        exception, this will return that value or exception.

        Note that, when the job has no return value (i.e. it returns None),
        this property is of little use, as the result won't be written back
        to Redis.

        Also note that you cannot draw the conclusion that a job has _not_
        been executed when its return value is None, since return values
        written back to Redis will expire after a given amount of time (500
        seconds by default).
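
        A minimal usage sketch (`job_id` and `redis` are hypothetical; assumes
        the job has already finished):

            job = Job.fetch(job_id, connection=redis)
            if job.result is not None:
                print('return value:', job.result)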
        """
        if self._result is None:
            rv = self.connection.hget(self.key, 'result')
            if rv is not None:
                # cache the result
                self._result = loads(rv)
        return self._result

    # Backwards-compatibility accessor property `return_value`.
    return_value = result

    def restore(self, raw_data):
        """Overwrite properties with the provided values stored in Redis."""
        obj = decode_redis_hash(raw_data)
        try:
            raw_data = obj['data']
        except KeyError:
            raise NoSuchJobError('Unexpected job format: {0}'.format(obj))

        try:
            self.data = zlib.decompress(raw_data)
        except zlib.error:
            # Fallback to uncompressed string
            self.data = raw_data

        self.created_at = str_to_date(obj.get('created_at'))
        self.origin = as_text(obj.get('origin'))
        self.description = as_text(obj.get('description'))
        self.enqueued_at = str_to_date(obj.get('enqueued_at'))
        self.started_at = str_to_date(obj.get('started_at'))
        self.ended_at = str_to_date(obj.get('ended_at'))
        result = obj.get('result')
        if result:
            try:
                self._result = unpickle(result)
            except UnpickleError:
                self._result = 'Unpickleable return value'
        self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
        self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
        self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
        self._status = as_text(obj.get('status')) if obj.get('status') else None

        dependency_id = obj.get('dependency_id', None)
        self._dependency_ids = [as_text(dependency_id)] if dependency_id else []

        self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
        self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}

        raw_exc_info = obj.get('exc_info')
        if raw_exc_info:
            try:
                self.exc_info = as_text(zlib.decompress(raw_exc_info))
            except zlib.error:
                # Fallback to uncompressed string
                self.exc_info = as_text(raw_exc_info)

    # Persistence
    def refresh(self):  # noqa
        """Overwrite the current instance's properties with the values in the
        corresponding Redis key.

        Will raise a NoSuchJobError if no corresponding Redis key exists.
        """
        data = self.connection.hgetall(self.key)
        if not data:
            raise NoSuchJobError('No such job: {0}'.format(self.key))
        self.restore(data)

    def to_dict(self, include_meta=True):
        """
        Returns a serialization of the current job instance.

        You can exclude serializing the `meta` dictionary by setting
        `include_meta=False`.
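
        A rough sketch of the resulting shape (illustrative):

            payload = job.to_dict(include_meta=False)
            # payload['data'] holds a zlib-compressed pickle of the job tuple,
            # and the 'meta' field is omitted entirely.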
        """
        obj = {}
        obj['created_at'] = utcformat(self.created_at or utcnow())
        obj['data'] = zlib.compress(self.data)

        if self.origin is not None:
            obj['origin'] = self.origin
        if self.description is not None:
            obj['description'] = self.description
        if self.enqueued_at is not None:
            obj['enqueued_at'] = utcformat(self.enqueued_at)
        if self.started_at is not None:
            obj['started_at'] = utcformat(self.started_at)
        if self.ended_at is not None:
            obj['ended_at'] = utcformat(self.ended_at)
        if self._result is not None:
            try:
                obj['result'] = dumps(self._result)
            except Exception:  # noqa
                obj['result'] = 'Unpickleable return value'
        if self.exc_info is not None:
            obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
        if self.timeout is not None:
            obj['timeout'] = self.timeout
        if self.result_ttl is not None:
            obj['result_ttl'] = self.result_ttl
        if self.failure_ttl is not None:
            obj['failure_ttl'] = self.failure_ttl
        if self._status is not None:
            obj['status'] = self._status
        if self._dependency_ids:
            obj['dependency_id'] = self._dependency_ids[0]
        if self.meta and include_meta:
            obj['meta'] = dumps(self.meta)
        if self.ttl:
            obj['ttl'] = self.ttl

        return obj

    def save(self, pipeline=None, include_meta=True):
        """
        Dumps the current job instance to its corresponding Redis key.

        Exclude saving the `meta` dictionary by setting
        `include_meta=False`. This is useful to prevent clobbering
        user metadata without an expensive `refresh()` call first.

        Redis key persistence may be altered by the `cleanup()` method.
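
        A minimal sketch of batching several saves into one round trip
        (`other_job` is hypothetical):

            with job.connection.pipeline() as pipe:
                job.save(pipeline=pipe)
                other_job.save(pipeline=pipe)
                pipe.execute()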
        """
        key = self.key
        connection = pipeline if pipeline is not None else self.connection

        connection.hmset(key, self.to_dict(include_meta=include_meta))

    def save_meta(self):
        """Stores job meta from the job instance to the corresponding Redis key."""
        meta = dumps(self.meta)
        self.connection.hset(self.key, 'meta', meta)

    def cancel(self, pipeline=None):
        """Cancels the given job, which will prevent the job from ever being
        run (or inspected).

        This method merely exists as a high-level API call to cancel jobs
        without worrying about the internals required to implement job
        cancellation.
        """
        from .queue import Queue
        pipeline = pipeline or self.connection.pipeline()
        if self.origin:
            q = Queue(name=self.origin, connection=self.connection)
            q.remove(self, pipeline=pipeline)
        pipeline.execute()

    def requeue(self):
        """Requeues job."""
        self.failed_job_registry.requeue(self)

    def delete(self, pipeline=None, remove_from_queue=True,
               delete_dependents=False):
        """Cancels the job and deletes the job hash from Redis. Jobs depending
        on this job can optionally be deleted as well."""
        if remove_from_queue:
            self.cancel(pipeline=pipeline)
        connection = pipeline if pipeline is not None else self.connection

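        # Remove the job from whichever status registry currently tracks it,
        # before the job hash and its dependency bookkeeping keys are deleted
        # below.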
        if self.is_finished:
            from .registry import FinishedJobRegistry
            registry = FinishedJobRegistry(self.origin,
                                           connection=self.connection,
                                           job_class=self.__class__)
            registry.remove(self, pipeline=pipeline)

        elif self.is_deferred:
            from .registry import DeferredJobRegistry
            registry = DeferredJobRegistry(self.origin,
                                           connection=self.connection,
                                           job_class=self.__class__)
            registry.remove(self, pipeline=pipeline)

        elif self.is_started:
            from .registry import StartedJobRegistry
            registry = StartedJobRegistry(self.origin,
                                          connection=self.connection,
                                          job_class=self.__class__)
            registry.remove(self, pipeline=pipeline)

        elif self.is_scheduled:
            from .registry import ScheduledJobRegistry
            registry = ScheduledJobRegistry(self.origin,
                                            connection=self.connection,
                                            job_class=self.__class__)
            registry.remove(self, pipeline=pipeline)

        elif self.is_failed:
            self.failed_job_registry.remove(self, pipeline=pipeline)

        if delete_dependents:
            self.delete_dependents(pipeline=pipeline)

        connection.delete(self.key, self.dependents_key, self.dependencies_key)

    def delete_dependents(self, pipeline=None):
        """Delete jobs depending on this job."""
        connection = pipeline if pipeline is not None else self.connection
        for dependent_id in self.dependent_ids:
            try:
                job = Job.fetch(dependent_id, connection=self.connection)
                job.delete(pipeline=pipeline,
                           remove_from_queue=False)
            except NoSuchJobError:
                # It could be that the dependent job was never saved to redis
                pass
        connection.delete(self.dependents_key)

    # Job execution
    def perform(self):  # noqa
        """Invokes the job function with the job arguments."""
        self.connection.persist(self.key)
        _job_stack.push(self)
        try:
            self._result = self._execute()
        finally:
            assert self is _job_stack.pop()
        return self._result

    def _execute(self):
        return self.func(*self.args, **self.kwargs)

    def get_ttl(self, default_ttl=None):
        """Returns the ttl for a job that determines how long a job will be
        persisted. In the future, this method will also be responsible
        for determining ttl for repeated jobs.
        """
        return default_ttl if self.ttl is None else self.ttl

    def get_result_ttl(self, default_ttl=None):
        """Returns the ttl for a job that determines how long a job's result
        will be persisted. In the future, this method will also be responsible
        for determining ttl for repeated jobs.
        """
        return default_ttl if self.result_ttl is None else self.result_ttl

    # Representation
    def get_call_string(self):  # noqa
        """Returns a string representation of the call, formatted as a regular
        Python function invocation statement.
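
        Illustrative output (`count_words` being a hypothetical job):

            "mymodule.count_words('some text', times=2)"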
        """
        if self.func_name is None:
            return None

        arg_list = [as_text(repr(arg)) for arg in self.args]

        kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()]
        # Sort here because Python 3.3 & 3.4 can produce a different call string
        arg_list += sorted(kwargs)
        args = ', '.join(arg_list)

        return '{0}({1})'.format(self.func_name, args)

    def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True):
        """Prepare job for eventual deletion (if needed). This method is usually
        called after successful execution. How long we persist the job and its
        result depends on the value of ttl:
        - If ttl is 0, clean up the job immediately.
        - If it's a positive number, set the job to expire in that many seconds.
        - If ttl is negative, don't set an expiry (persist the job forever).
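
        A minimal usage sketch (illustrative):

            job.cleanup(ttl=0)    # delete the job immediately
            job.cleanup(ttl=600)  # expire the job keys in ten minutes
            job.cleanup(ttl=-1)   # persist forever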
        """
        if ttl == 0:
            self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
        elif not ttl:
            return
        elif ttl > 0:
            connection = pipeline if pipeline is not None else self.connection
            connection.expire(self.key, ttl)
            connection.expire(self.dependents_key, ttl)
            connection.expire(self.dependencies_key, ttl)

    @property
    def failed_job_registry(self):
        from .registry import FailedJobRegistry
        return FailedJobRegistry(self.origin, connection=self.connection,
                                 job_class=self.__class__)

    def register_dependency(self, pipeline=None):
        """Jobs may have dependencies. Jobs are enqueued only if the jobs they
        depend on are successfully performed. We record this relation as
        a reverse dependency (a Redis set), with a key that looks something
        like:

            rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}

        This method adds the job to the dependents set of each of its
        dependencies and adds the job to DeferredJobRegistry.
        """
        from .registry import DeferredJobRegistry

        registry = DeferredJobRegistry(self.origin,
                                       connection=self.connection,
                                       job_class=self.__class__)
        registry.add(self, pipeline=pipeline)

        connection = pipeline if pipeline is not None else self.connection

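        # Record the relation in both directions: this job joins each
        # dependency's dependents set, and each dependency id joins this
        # job's own dependencies set.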
        for dependency_id in self._dependency_ids:
            dependents_key = self.dependents_key_for(dependency_id)
            connection.sadd(dependents_key, self.id)
            connection.sadd(self.dependencies_key, dependency_id)


_job_stack = LocalStack()