1import calendar 2import time 3from datetime import datetime, timedelta 4 5from .compat import as_text, utc 6from .connections import resolve_connection 7from .defaults import DEFAULT_FAILURE_TTL 8from .exceptions import InvalidJobOperation, NoSuchJobError 9from .job import Job, JobStatus 10from .queue import Queue 11from .utils import backend_class, current_timestamp 12 13 14class BaseRegistry(object): 15 """ 16 Base implementation of a job registry, implemented in Redis sorted set. 17 Each job is stored as a key in the registry, scored by expiration time 18 (unix timestamp). 19 """ 20 job_class = Job 21 key_template = 'rq:registry:{0}' 22 23 def __init__(self, name='default', connection=None, job_class=None, 24 queue=None): 25 if queue: 26 self.name = queue.name 27 self.connection = resolve_connection(queue.connection) 28 else: 29 self.name = name 30 self.connection = resolve_connection(connection) 31 32 self.key = self.key_template.format(self.name) 33 self.job_class = backend_class(self, 'job_class', override=job_class) 34 35 def __len__(self): 36 """Returns the number of jobs in this registry""" 37 return self.count 38 39 def __eq__(self, other): 40 return (self.name == other.name and self.connection == other.connection) 41 42 def __contains__(self, item): 43 """ 44 Returns a boolean indicating registry contains the given 45 job instance or job id. 46 """ 47 job_id = item 48 if isinstance(item, self.job_class): 49 job_id = item.id 50 return self.connection.zscore(self.key, job_id) is not None 51 52 @property 53 def count(self): 54 """Returns the number of jobs in this registry""" 55 self.cleanup() 56 return self.connection.zcard(self.key) 57 58 def add(self, job, ttl=0, pipeline=None): 59 """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf""" 60 score = ttl if ttl < 0 else current_timestamp() + ttl 61 if score == -1: 62 score = '+inf' 63 if pipeline is not None: 64 return pipeline.zadd(self.key, {job.id: score}) 65 66 return self.connection.zadd(self.key, {job.id: score}) 67 68 def remove(self, job, pipeline=None, delete_job=False): 69 """Removes job from registry and deletes it if `delete_job == True`""" 70 connection = pipeline if pipeline is not None else self.connection 71 job_id = job.id if isinstance(job, self.job_class) else job 72 result = connection.zrem(self.key, job_id) 73 if delete_job: 74 if isinstance(job, self.job_class): 75 job_instance = job 76 else: 77 job_instance = Job.fetch(job_id, connection=connection) 78 job_instance.delete() 79 return result 80 81 def get_expired_job_ids(self, timestamp=None): 82 """Returns job ids whose score are less than current timestamp. 83 84 Returns ids for jobs with an expiry time earlier than timestamp, 85 specified as seconds since the Unix epoch. timestamp defaults to call 86 time if unspecified. 87 """ 88 score = timestamp if timestamp is not None else current_timestamp() 89 return [as_text(job_id) for job_id in 90 self.connection.zrangebyscore(self.key, 0, score)] 91 92 def get_job_ids(self, start=0, end=-1): 93 """Returns list of all job ids.""" 94 self.cleanup() 95 return [as_text(job_id) for job_id in 96 self.connection.zrange(self.key, start, end)] 97 98 def get_queue(self): 99 """Returns Queue object associated with this registry.""" 100 return Queue(self.name, connection=self.connection) 101 102 def get_expiration_time(self, job): 103 """Returns job's expiration time.""" 104 score = self.connection.zscore(self.key, job.id) 105 return datetime.utcfromtimestamp(score) 106 107 108class StartedJobRegistry(BaseRegistry): 109 """ 110 Registry of currently executing jobs. Each queue maintains a 111 StartedJobRegistry. Jobs in this registry are ones that are currently 112 being executed. 113 114 Jobs are added to registry right before they are executed and removed 115 right after completion (success or failure). 116 """ 117 key_template = 'rq:wip:{0}' 118 119 def cleanup(self, timestamp=None): 120 """Remove expired jobs from registry and add them to FailedJobRegistry. 121 122 Removes jobs with an expiry time earlier than timestamp, specified as 123 seconds since the Unix epoch. timestamp defaults to call time if 124 unspecified. Removed jobs are added to the global failed job queue. 125 """ 126 score = timestamp if timestamp is not None else current_timestamp() 127 job_ids = self.get_expired_job_ids(score) 128 129 if job_ids: 130 failed_job_registry = FailedJobRegistry(self.name, self.connection) 131 132 with self.connection.pipeline() as pipeline: 133 for job_id in job_ids: 134 try: 135 job = self.job_class.fetch(job_id, 136 connection=self.connection) 137 job.set_status(JobStatus.FAILED) 138 job.save(pipeline=pipeline, include_meta=False) 139 job.cleanup(ttl=-1, pipeline=pipeline) 140 failed_job_registry.add(job, job.failure_ttl) 141 except NoSuchJobError: 142 pass 143 144 pipeline.zremrangebyscore(self.key, 0, score) 145 pipeline.execute() 146 147 return job_ids 148 149 150class FinishedJobRegistry(BaseRegistry): 151 """ 152 Registry of jobs that have been completed. Jobs are added to this 153 registry after they have successfully completed for monitoring purposes. 154 """ 155 key_template = 'rq:finished:{0}' 156 157 def cleanup(self, timestamp=None): 158 """Remove expired jobs from registry. 159 160 Removes jobs with an expiry time earlier than timestamp, specified as 161 seconds since the Unix epoch. timestamp defaults to call time if 162 unspecified. 163 """ 164 score = timestamp if timestamp is not None else current_timestamp() 165 self.connection.zremrangebyscore(self.key, 0, score) 166 167 168class FailedJobRegistry(BaseRegistry): 169 """ 170 Registry of containing failed jobs. 171 """ 172 key_template = 'rq:failed:{0}' 173 174 def cleanup(self, timestamp=None): 175 """Remove expired jobs from registry. 176 177 Removes jobs with an expiry time earlier than timestamp, specified as 178 seconds since the Unix epoch. timestamp defaults to call time if 179 unspecified. 180 """ 181 score = timestamp if timestamp is not None else current_timestamp() 182 self.connection.zremrangebyscore(self.key, 0, score) 183 184 def add(self, job, ttl=None, exc_string='', pipeline=None): 185 """ 186 Adds a job to a registry with expiry time of now + ttl. 187 `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. 188 """ 189 if ttl is None: 190 ttl = DEFAULT_FAILURE_TTL 191 score = ttl if ttl < 0 else current_timestamp() + ttl 192 193 if pipeline: 194 p = pipeline 195 else: 196 p = self.connection.pipeline() 197 198 job.exc_info = exc_string 199 job.save(pipeline=p, include_meta=False) 200 job.cleanup(ttl=ttl, pipeline=p) 201 p.zadd(self.key, {job.id: score}) 202 203 if not pipeline: 204 p.execute() 205 206 def requeue(self, job_or_id): 207 """Requeues the job with the given job ID.""" 208 if isinstance(job_or_id, self.job_class): 209 job = job_or_id 210 else: 211 job = self.job_class.fetch(job_or_id, connection=self.connection) 212 213 result = self.connection.zrem(self.key, job.id) 214 if not result: 215 raise InvalidJobOperation 216 217 queue = Queue(job.origin, connection=self.connection, 218 job_class=self.job_class) 219 220 return queue.enqueue_job(job) 221 222 223class DeferredJobRegistry(BaseRegistry): 224 """ 225 Registry of deferred jobs (waiting for another job to finish). 226 """ 227 key_template = 'rq:deferred:{0}' 228 229 def cleanup(self): 230 """This method is only here to prevent errors because this method is 231 automatically called by `count()` and `get_job_ids()` methods 232 implemented in BaseRegistry.""" 233 pass 234 235 236class ScheduledJobRegistry(BaseRegistry): 237 """ 238 Registry of scheduled jobs. 239 """ 240 key_template = 'rq:scheduled:{0}' 241 242 def __init__(self, *args, **kwargs): 243 super(ScheduledJobRegistry, self).__init__(*args, **kwargs) 244 # The underlying implementation of get_jobs_to_enqueue() is 245 # the same as get_expired_job_ids, but get_expired_job_ids() doesn't 246 # make sense in this context 247 self.get_jobs_to_enqueue = self.get_expired_job_ids 248 249 def schedule(self, job, scheduled_datetime, pipeline=None): 250 """ 251 Adds job to registry, scored by its execution time (in UTC). 252 If datetime has no tzinfo, it will assume localtimezone. 253 """ 254 # If datetime has no timezone, assume server's local timezone 255 # if we're on Python 3. If we're on Python 2.7, raise an 256 # exception since Python < 3.2 has no builtin `timezone` class 257 if not scheduled_datetime.tzinfo: 258 try: 259 from datetime import timezone 260 except ImportError: 261 raise ValueError('datetime object with no timezone') 262 tz = timezone(timedelta(seconds=-time.timezone)) 263 scheduled_datetime = scheduled_datetime.replace(tzinfo=tz) 264 265 timestamp = calendar.timegm(scheduled_datetime.utctimetuple()) 266 return self.connection.zadd(self.key, {job.id: timestamp}) 267 268 def cleanup(self): 269 """This method is only here to prevent errors because this method is 270 automatically called by `count()` and `get_job_ids()` methods 271 implemented in BaseRegistry.""" 272 pass 273 274 def remove_jobs(self, timestamp=None, pipeline=None): 275 """Remove jobs whose timestamp is in the past from registry.""" 276 connection = pipeline if pipeline is not None else self.connection 277 score = timestamp if timestamp is not None else current_timestamp() 278 return connection.zremrangebyscore(self.key, 0, score) 279 280 def get_jobs_to_schedule(self, timestamp=None): 281 """Remove jobs whose timestamp is in the past from registry.""" 282 score = timestamp if timestamp is not None else current_timestamp() 283 return [as_text(job_id) for job_id in 284 self.connection.zrangebyscore(self.key, 0, score)] 285 286 def get_scheduled_time(self, job_or_id): 287 """Returns datetime (UTC) at which job is scheduled to be enqueued""" 288 if isinstance(job_or_id, self.job_class): 289 job_id = job_or_id.id 290 else: 291 job_id = job_or_id 292 293 score = self.connection.zscore(self.key, job_id) 294 if not score: 295 raise NoSuchJobError 296 297 return datetime.fromtimestamp(score, tz=utc) 298 299 300def clean_registries(queue): 301 """Cleans StartedJobRegistry and FinishedJobRegistry of a queue.""" 302 registry = FinishedJobRegistry(name=queue.name, 303 connection=queue.connection, 304 job_class=queue.job_class) 305 registry.cleanup() 306 registry = StartedJobRegistry(name=queue.name, 307 connection=queue.connection, 308 job_class=queue.job_class) 309 registry.cleanup() 310 311 registry = FailedJobRegistry(name=queue.name, 312 connection=queue.connection, 313 job_class=queue.job_class) 314 registry.cleanup() 315