import logging
import signal
import time
import os
import socket  # NOTE(review): appears unused in this module — confirm before removing
from uuid import uuid4

from datetime import datetime
from itertools import repeat

from rq.exceptions import NoSuchJobError
from rq.job import Job
from rq.queue import Queue
from rq.utils import backend_class, import_attribute
from rq.compat import string_types

from redis import WatchError

from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until

logger = logging.getLogger(__name__)


class Scheduler(object):
    """
    Polls a Redis sorted set of scheduled jobs and moves any job whose
    scheduled timestamp has passed onto its RQ queue.

    Scheduled jobs live in a single sorted set (``scheduled_jobs_key``)
    whose members are job ids and whose scores are UNIX timestamps of the
    next execution time.  Multiple scheduler processes may run at once;
    a Redis ``SET NX EX`` lock (``scheduler_lock_key``) ensures only one
    of them enqueues jobs per polling cycle.
    """

    # Prefix for the per-instance status hash (birth/death bookkeeping).
    redis_scheduler_namespace_prefix = 'rq:scheduler_instance:'
    scheduler_key = 'rq:scheduler'
    # Key of the inter-process mutex acquired before each enqueue cycle.
    scheduler_lock_key = 'rq:scheduler_lock'
    # Sorted set mapping job id -> UNIX timestamp of next run.
    scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
    queue_class = Queue
    job_class = Job

    def __init__(self, queue_name='default', queue=None, interval=60, connection=None,
                 job_class=None, queue_class=None, name=None):
        """
        :param queue_name: default queue name for scheduled jobs; ignored
            when an explicit ``queue`` object is given.
        :param queue: optional Queue instance whose name overrides ``queue_name``.
        :param interval: polling interval in seconds.
        :param connection: Redis connection; resolved via RQ's
            ``resolve_connection`` when None.
        :param job_class: Job class override (class or dotted path).
        :param queue_class: Queue class override (class or dotted path).
            When given as a string, the dotted path is also stored so it can
            be stamped onto each job's meta (see ``_create_job``).
        :param name: unique instance name; a random hex UUID when omitted.
        """
        from rq.connections import resolve_connection
        self.connection = resolve_connection(connection)
        self._queue = queue
        if self._queue is None:
            self.queue_name = queue_name
        else:
            self.queue_name = self._queue.name
        self._interval = interval
        self.log = logger
        self._lock_acquired = False
        self.job_class = backend_class(self, 'job_class', override=job_class)
        self.queue_class_name = None
        if isinstance(queue_class, string_types):
            self.queue_class_name = queue_class
        self.queue_class = backend_class(self, 'queue_class', override=queue_class)
        self.name = name or uuid4().hex

    @property
    def key(self):
        """Returns the schedulers Redis hash key."""
        return self.redis_scheduler_namespace_prefix + self.name

    @property
    def pid(self):
        """The current process ID."""
        return os.getpid()

    def register_birth(self):
        """
        Records this scheduler instance as alive in Redis.

        Raises ValueError if another live instance with the same name
        already has a status hash without a 'death' field.  The hash is
        given a TTL slightly longer than the polling interval so it
        expires automatically if the process dies without cleanup.
        """
        self.log.info('Registering birth')
        if self.connection.exists(self.key) and \
                not self.connection.hexists(self.key, 'death'):
            raise ValueError("There's already an active RQ scheduler named: {0!r}".format(self.name))

        key = self.key
        now = time.time()

        with self.connection.pipeline() as p:
            p.delete(key)
            p.hset(key, 'birth', now)
            # Set scheduler key to expire a few seconds after polling interval
            # This way, the key will automatically expire if scheduler
            # quits unexpectedly
            p.expire(key, int(self._interval) + 10)
            p.execute()

    def register_death(self):
        """Registers its own death."""
        self.log.info('Registering death')
        with self.connection.pipeline() as p:
            # Keep the hash around for one minute so the 'death' marker can
            # be observed, then let it expire.
            p.hset(self.key, 'death', time.time())
            p.expire(self.key, 60)
            p.execute()

    def acquire_lock(self):
        """
        Acquire lock before scheduling jobs to prevent another scheduler
        from scheduling jobs at the same time.

        This function returns True if a lock is acquired. False otherwise.

        NOTE(review): the return value is the raw redis-py ``SET NX``
        result, which is truthy only when this call created the key —
        presumably True/None rather than True/False; verify if callers
        ever compare against False explicitly.
        """
        key = self.scheduler_lock_key
        now = time.time()
        # Lock auto-expires shortly after one polling interval, so a crashed
        # holder cannot deadlock other schedulers.
        expires = int(self._interval) + 10
        self._lock_acquired = self.connection.set(
            key, now, ex=expires, nx=True)
        return self._lock_acquired

    def remove_lock(self):
        """
        Remove acquired lock.

        Only deletes the lock key when this instance believes it holds the
        lock, so a scheduler that lost the race does not delete another
        instance's lock.
        """
        key = self.scheduler_lock_key

        if self._lock_acquired:
            self.connection.delete(key)
            self._lock_acquired = False
            self.log.debug('{}: Lock Removed'.format(self.key))

    def _install_signal_handlers(self):
        """
        Installs signal handlers for handling SIGINT and SIGTERM
        gracefully.
        """

        def stop(signum, frame):
            """
            Register scheduler's death and exit
            and remove previously acquired lock and exit.
            """
            self.log.info('Shutting down RQ scheduler...')
            self.register_death()
            self.remove_lock()
            # SystemExit unwinds out of run()'s loop; its finally block will
            # call remove_lock/register_death again (both are idempotent-ish).
            raise SystemExit()

        signal.signal(signal.SIGINT, stop)
        signal.signal(signal.SIGTERM, stop)

    def _create_job(self, func, args=None, kwargs=None, commit=True,
                    result_ttl=None, ttl=None, id=None, description=None,
                    queue_name=None, timeout=None, meta=None, depends_on=None):
        """
        Creates an RQ job and saves it to Redis. The job is assigned to the
        given queue name if not None else it is assigned to scheduler queue by
        default.

        :param commit: when False the job is built but not saved, letting the
            caller mutate ``job.meta`` before persisting (see ``schedule`` /
            ``cron``).
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        job = self.job_class.create(
            func, args=args, connection=self.connection,
            kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id,
            description=description, timeout=timeout, meta=meta, depends_on=depends_on)
        if queue_name:
            job.origin = queue_name
        else:
            job.origin = self.queue_name

        # Record the dotted queue-class path so enqueue-time code
        # (get_queue_for_job) can import the same custom Queue class.
        if self.queue_class_name:
            job.meta["queue_class_name"] = self.queue_class_name

        if commit:
            job.save()
        return job

    def enqueue_at(self, scheduled_time, func, *args, **kwargs):
        """
        Pushes a job to the scheduler queue. The scheduled queue is a Redis sorted
        set ordered by timestamp - which in this case is job's scheduled execution time.

        All args and kwargs are passed onto the job, except for the following kwarg
        keys (which affect the job creation itself):
        - timeout
        - job_id
        - job_ttl
        - job_result_ttl
        - job_description
        - depends_on
        - meta
        - queue_name

        Usage:

        from datetime import datetime
        from redis import Redis
        from rq.scheduler import Scheduler

        from foo import func

        redis = Redis()
        scheduler = Scheduler(queue_name='default', connection=redis)
        scheduler.enqueue_at(datetime(2020, 1, 1), func, 'argument', keyword='argument')
        """
        # Pop scheduler-control kwargs so only the job's own kwargs remain.
        timeout = kwargs.pop('timeout', None)
        job_id = kwargs.pop('job_id', None)
        job_ttl = kwargs.pop('job_ttl', None)
        job_result_ttl = kwargs.pop('job_result_ttl', None)
        job_description = kwargs.pop('job_description', None)
        depends_on = kwargs.pop('depends_on', None)
        meta = kwargs.pop('meta', None)
        queue_name = kwargs.pop('queue_name', None)

        job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
                               id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
                               description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on)
        # Score = scheduled execution time as a UNIX timestamp.
        self.connection.zadd(self.scheduled_jobs_key,
                             {job.id: to_unix(scheduled_time)})
        return job

    def enqueue_in(self, time_delta, func, *args, **kwargs):
        """
        Similar to ``enqueue_at``, but accepts a timedelta instead of datetime object.
        The job's scheduled execution time will be calculated by adding the timedelta
        to datetime.utcnow().
        """
        timeout = kwargs.pop('timeout', None)
        job_id = kwargs.pop('job_id', None)
        job_ttl = kwargs.pop('job_ttl', None)
        job_result_ttl = kwargs.pop('job_result_ttl', None)
        job_description = kwargs.pop('job_description', None)
        depends_on = kwargs.pop('depends_on', None)
        meta = kwargs.pop('meta', None)
        queue_name = kwargs.pop('queue_name', None)

        job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
                               id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
                               description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on)
        self.connection.zadd(self.scheduled_jobs_key,
                             {job.id: to_unix(datetime.utcnow() + time_delta)})
        return job

    def schedule(self, scheduled_time, func, args=None, kwargs=None,
                 interval=None, repeat=None, result_ttl=None, ttl=None,
                 timeout=None, id=None, description=None,
                 queue_name=None, meta=None, depends_on=None):
        """
        Schedule a job to be periodically executed, at a certain interval.

        :param scheduled_time: datetime of the first run.
        :param interval: seconds between runs; required when ``repeat`` is set.
        :param repeat: number of additional runs; None means repeat forever
            (when ``interval`` is given).
        :raises ValueError: if ``repeat`` is given without ``interval``.
        """
        # Set result_ttl to -1 for periodic jobs, if result_ttl not specified
        if interval is not None and result_ttl is None:
            result_ttl = -1
        # commit=False: meta must be populated before the job is saved.
        job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
                               result_ttl=result_ttl, ttl=ttl, id=id,
                               description=description, queue_name=queue_name,
                               timeout=timeout, meta=meta, depends_on=depends_on)

        if interval is not None:
            job.meta['interval'] = int(interval)
        if repeat is not None:
            job.meta['repeat'] = int(repeat)
        if repeat and interval is None:
            raise ValueError("Can't repeat a job without interval argument")
        job.save()
        self.connection.zadd(self.scheduled_jobs_key,
                             {job.id: to_unix(scheduled_time)})
        return job

    def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
             queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, depends_on=None):
        """
        Schedule a cronjob

        :param cron_string: crontab-style schedule expression; the next run
            time is computed via ``get_next_scheduled_time``.
        :param use_local_timezone: interpret the cron expression in local
            time instead of UTC; stored in meta so re-scheduling uses the
            same setting.
        """
        scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone)

        # Set result_ttl to -1, as jobs scheduled via cron are periodic ones.
        # Otherwise the job would expire after 500 sec.
        job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
                               result_ttl=-1, id=id, queue_name=queue_name,
                               description=description, timeout=timeout, meta=meta, depends_on=depends_on)

        job.meta['cron_string'] = cron_string
        job.meta['use_local_timezone'] = use_local_timezone

        if repeat is not None:
            job.meta['repeat'] = int(repeat)

        job.save()

        self.connection.zadd(self.scheduled_jobs_key,
                             {job.id: to_unix(scheduled_time)})
        return job

    def cancel(self, job):
        """
        Pulls a job from the scheduler queue. This function accepts either a
        job_id or a job instance.
        """
        if isinstance(job, self.job_class):
            self.connection.zrem(self.scheduled_jobs_key, job.id)
        else:
            self.connection.zrem(self.scheduled_jobs_key, job)

    def __contains__(self, item):
        """
        Returns a boolean indicating whether the given job instance or job id
        is scheduled for execution.
        """
        job_id = item
        if isinstance(item, self.job_class):
            job_id = item.id
        return self.connection.zscore(self.scheduled_jobs_key, job_id) is not None

    def change_execution_time(self, job, date_time):
        """
        Change a job's execution time.

        Uses WATCH on the scheduled-jobs sorted set so the existence check
        and the score update are consistent against concurrent schedulers.

        :raises ValueError: if the job is not (or no longer) scheduled.
        """
        with self.connection.pipeline() as pipe:
            while 1:
                try:
                    pipe.watch(self.scheduled_jobs_key)
                    if pipe.zscore(self.scheduled_jobs_key, job.id) is None:
                        raise ValueError('Job not in scheduled jobs queue')
                    pipe.zadd(self.scheduled_jobs_key, {job.id: to_unix(date_time)})
                    break
                except WatchError:
                    # If job is still in the queue, retry otherwise job is already executed
                    # so we raise an error
                    if pipe.zscore(self.scheduled_jobs_key, job.id) is None:
                        raise ValueError('Job not in scheduled jobs queue')
                    continue

    def count(self, until=None):
        """
        Returns the total number of jobs that are scheduled for all queues.
        This function accepts datetime, timedelta instances as well as
        integers representing epoch values.
        """

        until = rationalize_until(until)
        return self.connection.zcount(self.scheduled_jobs_key, 0, until)

    def get_jobs(self, until=None, with_times=False, offset=None, length=None):
        """
        Returns a iterator of job instances that will be queued until the given
        time. If no 'until' argument is given all jobs are returned.

        If with_times is True, a list of tuples consisting of the job instance
        and it's scheduled execution time is returned.

        If offset and length are specified, a slice of the list starting at the
        specified zero-based offset of the specified length will be returned.

        If either of offset or length is specified, then both must be, or
        an exception will be raised.
        """
        def epoch_to_datetime(epoch):
            # Converts a sorted-set score back into a datetime (only applied
            # when withscores=True).
            return from_unix(float(epoch))

        until = rationalize_until(until)
        job_ids = self.connection.zrangebyscore(self.scheduled_jobs_key, 0,
                                                until, withscores=with_times,
                                                score_cast_func=epoch_to_datetime,
                                                start=offset, num=length)
        if not with_times:
            # Normalize to (job_id, None) pairs so one loop handles both modes.
            job_ids = zip(job_ids, repeat(None))
        for job_id, sched_time in job_ids:
            job_id = job_id.decode('utf-8')
            try:
                job = self.job_class.fetch(job_id, connection=self.connection)
            except NoSuchJobError:
                # Delete jobs that aren't there from scheduler
                self.cancel(job_id)
                continue
            if with_times:
                yield (job, sched_time)
            else:
                yield job

    def get_jobs_to_queue(self, with_times=False):
        """
        Returns a list of job instances that should be queued
        (score lower than current timestamp).
        If with_times is True a list of tuples consisting of the job instance and
        it's scheduled execution time is returned.
        """
        return self.get_jobs(to_unix(datetime.utcnow()), with_times=with_times)

    def get_queue_for_job(self, job):
        """
        Returns a queue to put job into.

        Honors a custom queue class recorded in ``job.meta['queue_class_name']``
        (set by ``_create_job``); falls back to this scheduler's queue class.
        """
        key = '{0}{1}'.format(self.queue_class.redis_queue_namespace_prefix,
                              job.origin)
        if job.meta.get('queue_class_name'):
            queue_class = import_attribute(job.meta['queue_class_name'])
        else:
            queue_class = self.queue_class
        return queue_class.from_queue_key(key, connection=self.connection, job_class=self.job_class)

    def enqueue_job(self, job):
        """
        Move a scheduled job to a queue. In addition, it also does puts the job
        back into the scheduler if needed.

        Interval jobs are re-scored at now + interval; cron jobs at the next
        cron occurrence.  A finite ``repeat`` counter is decremented each run
        and the job is dropped from the schedule once it reaches zero.
        """
        self.log.debug('Pushing {0}({1}) to {2}'.format(job.func_name, job.id, job.origin))

        interval = job.meta.get('interval', None)
        repeat = job.meta.get('repeat', None)
        cron_string = job.meta.get('cron_string', None)
        use_local_timezone = job.meta.get('use_local_timezone', None)

        # If job is a repeated job, decrement counter
        if repeat:
            job.meta['repeat'] = int(repeat) - 1

        queue = self.get_queue_for_job(job)
        queue.enqueue_job(job)
        self.connection.zrem(self.scheduled_jobs_key, job.id)

        if interval:
            # If this is a repeat job and counter has reached 0, don't repeat
            if repeat is not None:
                if job.meta['repeat'] == 0:
                    return
            self.connection.zadd(self.scheduled_jobs_key,
                                 {job.id: to_unix(datetime.utcnow()) + int(interval)})
        elif cron_string:
            # If this is a repeat job and counter has reached 0, don't repeat
            if repeat is not None:
                if job.meta['repeat'] == 0:
                    return
            self.connection.zadd(self.scheduled_jobs_key,
                                 {job.id: to_unix(get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone))})

    def enqueue_jobs(self):
        """
        Move scheduled jobs into queues.

        NOTE(review): ``get_jobs_to_queue`` is a generator (``get_jobs`` uses
        ``yield``), so the loop below exhausts it and the returned value is
        an already-consumed iterator — callers cannot re-iterate the jobs.
        """
        self.log.debug('Checking for scheduled jobs')

        jobs = self.get_jobs_to_queue()
        for job in jobs:
            self.enqueue_job(job)

        return jobs

    def heartbeat(self):
        """Refreshes schedulers key, typically by extending the
        expiration time of the scheduler, effectively making this a "heartbeat"
        to not expire the scheduler until the timeout passes.
        """
        self.log.debug('{}: Sending a HeartBeat'.format(self.key))
        self.connection.expire(self.key, int(self._interval) + 10)

    def run(self, burst=False):
        """
        Periodically check whether there's any job that should be put in the queue (score
        lower than current time).

        :param burst: when True, perform a single enqueue pass (if the lock
            is acquired) and exit instead of looping forever.
        """

        self.register_birth()
        self._install_signal_handlers()

        try:
            while True:
                self.log.debug("Entering run loop")
                self.heartbeat()

                start_time = time.time()
                if self.acquire_lock():
                    self.log.debug('{}: Acquired Lock'.format(self.key))
                    self.enqueue_jobs()
                    # Second heartbeat: enqueuing may have consumed a
                    # significant part of the key's TTL.
                    self.heartbeat()
                    self.remove_lock()

                    if burst:
                        self.log.info('RQ scheduler done, quitting')
                        break
                else:
                    self.log.warning('Lock already taken - skipping run')

                # Time has already elapsed while enqueuing jobs, so don't wait too long.
                seconds_elapsed_since_start = time.time() - start_time
                seconds_until_next_scheduled_run = self._interval - seconds_elapsed_since_start
                # ensure we have a non-negative number
                if seconds_until_next_scheduled_run > 0:
                    self.log.debug("Sleeping %.2f seconds" % seconds_until_next_scheduled_run)
                    time.sleep(seconds_until_next_scheduled_run)
        finally:
            # Always release the lock and record death, whether we exit via
            # burst, SystemExit from a signal handler, or an exception.
            self.remove_lock()
            self.register_death()