1import calendar
2import time
3from datetime import datetime, timedelta
4
5from .compat import as_text, utc
6from .connections import resolve_connection
7from .defaults import DEFAULT_FAILURE_TTL
8from .exceptions import InvalidJobOperation, NoSuchJobError
9from .job import Job, JobStatus
10from .queue import Queue
11from .utils import backend_class, current_timestamp
12
13
class BaseRegistry(object):
    """
    Base implementation of a job registry, implemented in Redis sorted set.
    Each job is stored as a key in the registry, scored by expiration time
    (unix timestamp).

    Subclasses are expected to provide a `cleanup()` method: both `count`
    and `get_job_ids()` call `self.cleanup()` before reading the sorted set,
    and this base class does not define it.
    """
    job_class = Job
    key_template = 'rq:registry:{0}'

    def __init__(self, name='default', connection=None, job_class=None,
                 queue=None):
        """
        Binds the registry to a name and a connection.

        If `queue` is given, its name and connection are used and the `name`
        and `connection` arguments are ignored. `job_class` optionally
        overrides the class-level `job_class`.
        """
        if queue:
            self.name = queue.name
            self.connection = resolve_connection(queue.connection)
        else:
            self.name = name
            self.connection = resolve_connection(connection)

        self.key = self.key_template.format(self.name)
        self.job_class = backend_class(self, 'job_class', override=job_class)

    def __len__(self):
        """Returns the number of jobs in this registry"""
        return self.count

    def __eq__(self, other):
        """Two registries are equal when their name and connection match."""
        return (self.name == other.name and self.connection == other.connection)

    def __contains__(self, item):
        """
        Returns a boolean indicating registry contains the given
        job instance or job id.
        """
        job_id = item
        if isinstance(item, self.job_class):
            job_id = item.id
        return self.connection.zscore(self.key, job_id) is not None

    @property
    def count(self):
        """Returns the number of jobs in this registry"""
        # Run the subclass cleanup first so the cardinality is read after it.
        self.cleanup()
        return self.connection.zcard(self.key)

    def add(self, job, ttl=0, pipeline=None):
        """Adds a job to a registry with expiry time of now + ttl.

        A ttl of -1 is stored with a score of `+inf`; any other negative ttl
        is stored unchanged as the score. The command is issued on `pipeline`
        when one is given, otherwise on the registry's connection, and the
        result of that `zadd` call is returned.
        """
        score = ttl if ttl < 0 else current_timestamp() + ttl
        if score == -1:
            score = '+inf'
        if pipeline is not None:
            return pipeline.zadd(self.key, {job.id: score})

        return self.connection.zadd(self.key, {job.id: score})

    def remove(self, job, pipeline=None, delete_job=False):
        """Removes job from registry and deletes it if `delete_job == True`.

        `job` may be a job instance or a job id. The `zrem` is issued on
        `pipeline` when one is given, otherwise on the registry's connection,
        and its result is returned.
        """
        connection = pipeline if pipeline is not None else self.connection
        job_id = job.id if isinstance(job, self.job_class) else job
        result = connection.zrem(self.key, job_id)
        if delete_job:
            if isinstance(job, self.job_class):
                job_instance = job
            else:
                # Look the job up with the registry's own job class and real
                # connection: `connection` may be the caller's pipeline here,
                # which is not meant for reading the job back.
                job_instance = self.job_class.fetch(
                    job_id, connection=self.connection)
            job_instance.delete()
        return result

    def get_expired_job_ids(self, timestamp=None):
        """Returns job ids whose score are less than current timestamp.

        Returns ids for jobs with an expiry time earlier than timestamp,
        specified as seconds since the Unix epoch. timestamp defaults to call
        time if unspecified.
        """
        score = timestamp if timestamp is not None else current_timestamp()
        # The range is 0..score, so entries with a negative score are never
        # reported here.
        return [as_text(job_id) for job_id in
                self.connection.zrangebyscore(self.key, 0, score)]

    def get_job_ids(self, start=0, end=-1):
        """Returns job ids in the index range `start`..`end` (all by default).

        `self.cleanup()` is called before the ids are read.
        """
        self.cleanup()
        return [as_text(job_id) for job_id in
                self.connection.zrange(self.key, start, end)]

    def get_queue(self):
        """Returns Queue object associated with this registry."""
        return Queue(self.name, connection=self.connection)

    def get_expiration_time(self, job):
        """Returns job's expiration time as a naive datetime in UTC.

        The job's score is passed straight to `datetime.utcfromtimestamp`,
        so the job must be present in the registry with a finite score.
        """
        score = self.connection.zscore(self.key, job.id)
        return datetime.utcfromtimestamp(score)
106
107
class StartedJobRegistry(BaseRegistry):
    """
    Registry of currently executing jobs. Each queue maintains a
    StartedJobRegistry. Jobs in this registry are ones that are currently
    being executed.

    Jobs are added to registry right before they are executed and removed
    right after completion (success or failure).
    """
    key_template = 'rq:wip:{0}'

    def cleanup(self, timestamp=None):
        """Remove expired jobs from registry and add them to FailedJobRegistry.

        Removes jobs with an expiry time earlier than timestamp, specified as
        seconds since the Unix epoch. timestamp defaults to call time if
        unspecified. Each expired job that can still be fetched is marked as
        failed and added to the FailedJobRegistry of the same name. Ids whose
        job no longer exists are only dropped from this registry.

        Returns the list of expired job ids.
        """
        score = timestamp if timestamp is not None else current_timestamp()
        job_ids = self.get_expired_job_ids(score)

        if job_ids:
            failed_job_registry = FailedJobRegistry(self.name, self.connection,
                                                    job_class=self.job_class)

            with self.connection.pipeline() as pipeline:
                for job_id in job_ids:
                    try:
                        job = self.job_class.fetch(job_id,
                                                   connection=self.connection)
                        job.set_status(JobStatus.FAILED)
                        job.save(pipeline=pipeline, include_meta=False)
                        # NOTE(review): ttl=-1 presumably means "keep without
                        # expiry" -- confirm against Job.cleanup.
                        job.cleanup(ttl=-1, pipeline=pipeline)
                        # Queue the move on the same pipeline so it is sent
                        # together with the removal below instead of running
                        # as a separate transaction per job.
                        failed_job_registry.add(job, job.failure_ttl,
                                                pipeline=pipeline)
                    except NoSuchJobError:
                        # Nothing to move for a missing job; its id is still
                        # removed from this registry below.
                        pass

                pipeline.zremrangebyscore(self.key, 0, score)
                pipeline.execute()

        return job_ids
148
149
class FinishedJobRegistry(BaseRegistry):
    """
    Registry of jobs that have finished successfully. Jobs are kept here
    after completion so they can be monitored.
    """
    key_template = 'rq:finished:{0}'

    def cleanup(self, timestamp=None):
        """Drop expired entries from this registry.

        Every entry scored between 0 and `timestamp` (seconds since the Unix
        epoch) is removed. When `timestamp` is not given, the current
        timestamp is used as the upper bound.
        """
        if timestamp is None:
            cutoff = current_timestamp()
        else:
            cutoff = timestamp
        self.connection.zremrangebyscore(self.key, 0, cutoff)
166
167
class FailedJobRegistry(BaseRegistry):
    """
    Registry containing failed jobs.
    """
    key_template = 'rq:failed:{0}'

    def cleanup(self, timestamp=None):
        """Remove expired jobs from registry.

        Removes jobs with an expiry time earlier than timestamp, specified as
        seconds since the Unix epoch. timestamp defaults to call time if
        unspecified.
        """
        score = timestamp if timestamp is not None else current_timestamp()
        self.connection.zremrangebyscore(self.key, 0, score)

    def add(self, job, ttl=None, exc_string='', pipeline=None):
        """
        Adds a job to a registry with expiry time of now + ttl.
        `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. A negative
        `ttl` is stored unchanged as the score.

        `exc_string` is stored on the job as `exc_info` and the job is saved.
        When `pipeline` is given the commands are only queued on it and the
        caller is responsible for executing it; otherwise a private pipeline
        is created and executed before returning.
        """
        if ttl is None:
            ttl = DEFAULT_FAILURE_TTL
        score = ttl if ttl < 0 else current_timestamp() + ttl

        # Compare against None explicitly rather than relying on truthiness:
        # a caller-supplied pipeline object may evaluate as falsy (for
        # instance one that defines __len__ and has nothing queued yet),
        # which would silently bypass the caller's pipeline.
        owns_pipeline = pipeline is None
        p = self.connection.pipeline() if owns_pipeline else pipeline

        job.exc_info = exc_string
        job.save(pipeline=p, include_meta=False)
        job.cleanup(ttl=ttl, pipeline=p)
        p.zadd(self.key, {job.id: score})

        if owns_pipeline:
            p.execute()

    def requeue(self, job_or_id):
        """Requeues the given job (instance or job ID) on its origin queue.

        Raises InvalidJobOperation when the job is not in this registry.
        Returns the result of `Queue.enqueue_job`.
        """
        if isinstance(job_or_id, self.job_class):
            job = job_or_id
        else:
            job = self.job_class.fetch(job_or_id, connection=self.connection)

        # zrem reports whether the id was actually present in this registry.
        result = self.connection.zrem(self.key, job.id)
        if not result:
            raise InvalidJobOperation

        queue = Queue(job.origin, connection=self.connection,
                      job_class=self.job_class)

        return queue.enqueue_job(job)
221
222
class DeferredJobRegistry(BaseRegistry):
    """
    Registry of deferred jobs (waiting for another job to finish).
    """
    key_template = 'rq:deferred:{0}'

    def cleanup(self, timestamp=None):
        """Does nothing; deferred jobs are never expired by this registry.

        This method is only here to prevent errors because it is
        automatically called by the `count` property and the `get_job_ids()`
        method implemented in BaseRegistry. `timestamp` is accepted (and
        ignored) so the signature matches the `cleanup()` of the other
        registries.
        """
        pass
234
235
class ScheduledJobRegistry(BaseRegistry):
    """
    Registry of scheduled jobs.
    """
    key_template = 'rq:scheduled:{0}'

    def __init__(self, *args, **kwargs):
        super(ScheduledJobRegistry, self).__init__(*args, **kwargs)
        # The underlying implementation of get_jobs_to_enqueue() is
        # the same as get_expired_job_ids, but get_expired_job_ids() doesn't
        # make sense in this context
        self.get_jobs_to_enqueue = self.get_expired_job_ids

    def schedule(self, job, scheduled_datetime, pipeline=None):
        """
        Adds job to registry, scored by its execution time (in UTC).
        If datetime has no tzinfo, it will assume localtimezone.

        The `zadd` is issued on `pipeline` when one is given, otherwise on
        the registry's connection, and its result is returned. Raises
        ValueError for a naive datetime when `datetime.timezone` cannot be
        imported.
        """
        # If datetime has no timezone, assume server's local timezone
        # if we're on Python 3. If we're on Python 2.7, raise an
        # exception since Python < 3.2 has no builtin `timezone` class
        if not scheduled_datetime.tzinfo:
            try:
                from datetime import timezone
            except ImportError:
                raise ValueError('datetime object with no timezone')
            # Use the UTC offset in effect at that local wall-clock time:
            # time.timezone alone is the non-DST offset and would be off
            # while daylight saving time applies.
            utc_offset = -time.timezone
            try:
                local_time = time.localtime(
                    time.mktime(scheduled_datetime.timetuple()))
            except (OverflowError, ValueError, OSError):
                # Date outside the platform's supported range: keep the
                # standard (non-DST) offset.
                pass
            else:
                if local_time.tm_isdst > 0:
                    utc_offset = -time.altzone
            tz = timezone(timedelta(seconds=utc_offset))
            scheduled_datetime = scheduled_datetime.replace(tzinfo=tz)

        timestamp = calendar.timegm(scheduled_datetime.utctimetuple())
        # Honour the caller's pipeline instead of always writing directly.
        connection = pipeline if pipeline is not None else self.connection
        return connection.zadd(self.key, {job.id: timestamp})

    def cleanup(self, timestamp=None):
        """Does nothing; scheduled jobs are not expired by this registry.

        This method is only here to prevent errors because it is
        automatically called by the `count` property and the `get_job_ids()`
        method implemented in BaseRegistry. `timestamp` is accepted (and
        ignored) so the signature matches the `cleanup()` of the other
        registries.
        """
        pass

    def remove_jobs(self, timestamp=None, pipeline=None):
        """Remove jobs whose timestamp is in the past from registry.

        `timestamp` defaults to the current timestamp. The command is issued
        on `pipeline` when one is given, otherwise on the registry's
        connection.
        """
        connection = pipeline if pipeline is not None else self.connection
        score = timestamp if timestamp is not None else current_timestamp()
        return connection.zremrangebyscore(self.key, 0, score)

    def get_jobs_to_schedule(self, timestamp=None):
        """Returns ids of jobs whose scheduled time is not after `timestamp`.

        `timestamp` defaults to the current timestamp. Nothing is removed
        from the registry.
        """
        score = timestamp if timestamp is not None else current_timestamp()
        return [as_text(job_id) for job_id in
                self.connection.zrangebyscore(self.key, 0, score)]

    def get_scheduled_time(self, job_or_id):
        """Returns datetime (UTC) at which job is scheduled to be enqueued.

        Raises NoSuchJobError when the job is not in this registry.
        """
        if isinstance(job_or_id, self.job_class):
            job_id = job_or_id.id
        else:
            job_id = job_or_id

        score = self.connection.zscore(self.key, job_id)
        # Only a missing member yields None; a score of 0 is a valid
        # timestamp and must not be reported as a missing job.
        if score is None:
            raise NoSuchJobError

        return datetime.fromtimestamp(score, tz=utc)
298
299
def clean_registries(queue):
    """Runs cleanup() on the finished, started and failed job registries
    of a queue, in that order."""
    registry_classes = (
        FinishedJobRegistry,
        StartedJobRegistry,
        FailedJobRegistry,
    )
    for registry_class in registry_classes:
        # Each registry is built from the queue's own name, connection and
        # job class.
        registry_class(name=queue.name,
                       connection=queue.connection,
                       job_class=queue.job_class).cleanup()
315