1import logging
2import signal
3import time
4import os
5import socket
6from uuid import uuid4
7
8from datetime import datetime
9from itertools import repeat
10
11from rq.exceptions import NoSuchJobError
12from rq.job import Job
13from rq.queue import Queue
14from rq.utils import backend_class, import_attribute
15from rq.compat import string_types
16
17from redis import WatchError
18
19from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until
20
21logger = logging.getLogger(__name__)
22
23
class Scheduler(object):
    """Polls a Redis sorted set of scheduled jobs and moves due jobs onto
    their RQ queues.

    Scheduled jobs live in the sorted set ``scheduled_jobs_key``, scored by
    their scheduled execution time as a unix timestamp.  A best-effort
    distributed lock (``scheduler_lock_key``) keeps multiple scheduler
    processes from enqueueing the same jobs concurrently.
    """

    redis_scheduler_namespace_prefix = 'rq:scheduler_instance:'
    scheduler_key = 'rq:scheduler'
    scheduler_lock_key = 'rq:scheduler_lock'
    scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
    queue_class = Queue
    job_class = Job

    def __init__(self, queue_name='default', queue=None, interval=60, connection=None,
                 job_class=None, queue_class=None, name=None):
        """
        :param queue_name: name of the queue jobs are enqueued to by default
        :param queue: optional Queue instance; its name overrides queue_name
        :param interval: polling interval in seconds
        :param connection: Redis connection (resolved via RQ if None)
        :param job_class: Job class or dotted path; falls back to cls.job_class
        :param queue_class: Queue class or dotted path; falls back to cls.queue_class
        :param name: unique scheduler instance name (random hex if omitted)
        """
        from rq.connections import resolve_connection
        self.connection = resolve_connection(connection)
        self._queue = queue
        # An explicit Queue instance wins over a bare queue name.
        if self._queue is None:
            self.queue_name = queue_name
        else:
            self.queue_name = self._queue.name
        self._interval = interval
        self.log = logger
        self._lock_acquired = False
        self.job_class = backend_class(self, 'job_class', override=job_class)
        # Remember the dotted-path string (if one was given) so jobs can
        # record which queue class to instantiate when they are enqueued.
        self.queue_class_name = None
        if isinstance(queue_class, string_types):
            self.queue_class_name = queue_class
        self.queue_class = backend_class(self, 'queue_class', override=queue_class)
        self.name = name or uuid4().hex

    @property
    def key(self):
        """Returns the scheduler's Redis hash key."""
        return self.redis_scheduler_namespace_prefix + self.name

    @property
    def pid(self):
        """The current process ID."""
        return os.getpid()

    def register_birth(self):
        """Register this scheduler instance in Redis.

        Raises ValueError if another live scheduler with the same name is
        already registered (its key exists and has no 'death' field).
        """
        self.log.info('Registering birth')
        if self.connection.exists(self.key) and \
                not self.connection.hexists(self.key, 'death'):
            raise ValueError("There's already an active RQ scheduler named: {0!r}".format(self.name))

        key = self.key
        now = time.time()

        with self.connection.pipeline() as p:
            p.delete(key)
            p.hset(key, 'birth', now)
            # Set scheduler key to expire a few seconds after polling interval
            # This way, the key will automatically expire if scheduler
            # quits unexpectedly
            p.expire(key, int(self._interval) + 10)
            p.execute()

    def register_death(self):
        """Registers its own death."""
        self.log.info('Registering death')
        with self.connection.pipeline() as p:
            p.hset(self.key, 'death', time.time())
            # Keep the death record around briefly for observability,
            # then let Redis reap the key.
            p.expire(self.key, 60)
            p.execute()

    def acquire_lock(self):
        """
        Acquire lock before scheduling jobs to prevent another scheduler
        from scheduling jobs at the same time.

        This function returns True if a lock is acquired. False otherwise.
        """
        key = self.scheduler_lock_key
        now = time.time()
        # Lock expires shortly after the polling interval so a crashed
        # scheduler cannot hold it forever.
        expires = int(self._interval) + 10
        # SET ... NX returns None (not False) when the key already exists;
        # normalize to a real boolean as the docstring promises.
        self._lock_acquired = bool(self.connection.set(
                key, now, ex=expires, nx=True))
        return self._lock_acquired

    def remove_lock(self):
        """
        Remove acquired lock.

        Only deletes the lock key if this instance believes it holds the
        lock; a no-op otherwise.
        """
        key = self.scheduler_lock_key

        if self._lock_acquired:
            # NOTE(review): if our lock TTL already expired and another
            # scheduler re-acquired it, this delete can remove the other
            # instance's lock — pre-existing race, unchanged here.
            self.connection.delete(key)
            self._lock_acquired = False
            self.log.debug('{}: Lock Removed'.format(self.key))

    def _install_signal_handlers(self):
        """
        Installs signal handlers for handling SIGINT and SIGTERM
        gracefully.
        """

        def stop(signum, frame):
            """
            Register scheduler's death and exit
            and remove previously acquired lock and exit.
            """
            self.log.info('Shutting down RQ scheduler...')
            self.register_death()
            self.remove_lock()
            raise SystemExit()

        signal.signal(signal.SIGINT, stop)
        signal.signal(signal.SIGTERM, stop)

    def _create_job(self, func, args=None, kwargs=None, commit=True,
                    result_ttl=None, ttl=None, id=None, description=None,
                    queue_name=None, timeout=None, meta=None, depends_on=None):
        """
        Creates an RQ job and saves it to Redis. The job is assigned to the
        given queue name if not None else it is assigned to scheduler queue by
        default.

        :param commit: when False the job is built but not saved to Redis,
                       letting callers mutate job.meta before saving.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        job = self.job_class.create(
                func, args=args, connection=self.connection,
                kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id,
                description=description, timeout=timeout, meta=meta, depends_on=depends_on)
        # origin records which queue the job will eventually be pushed to.
        if queue_name:
            job.origin = queue_name
        else:
            job.origin = self.queue_name

        # Record the queue class so get_queue_for_job() can reconstruct it.
        if self.queue_class_name:
            job.meta["queue_class_name"] = self.queue_class_name

        if commit:
            job.save()
        return job

    def enqueue_at(self, scheduled_time, func, *args, **kwargs):
        """
        Pushes a job to the scheduler queue. The scheduled queue is a Redis sorted
        set ordered by timestamp - which in this case is job's scheduled execution time.

        All args and kwargs are passed onto the job, except for the following kwarg
        keys (which affect the job creation itself):
        - timeout
        - job_id
        - job_ttl
        - job_result_ttl
        - job_description
        - depends_on
        - meta
        - queue_name

        Usage:

        from datetime import datetime
        from redis import Redis
        from rq.scheduler import Scheduler

        from foo import func

        redis = Redis()
        scheduler = Scheduler(queue_name='default', connection=redis)
        scheduler.enqueue_at(datetime(2020, 1, 1), func, 'argument', keyword='argument')
        """
        # Pop scheduler-specific kwargs so the remainder is passed to func.
        timeout = kwargs.pop('timeout', None)
        job_id = kwargs.pop('job_id', None)
        job_ttl = kwargs.pop('job_ttl', None)
        job_result_ttl = kwargs.pop('job_result_ttl', None)
        job_description = kwargs.pop('job_description', None)
        depends_on = kwargs.pop('depends_on', None)
        meta = kwargs.pop('meta', None)
        queue_name = kwargs.pop('queue_name', None)

        job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
                               id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
                               description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on)
        self.connection.zadd(self.scheduled_jobs_key,
                              {job.id: to_unix(scheduled_time)})
        return job

    def enqueue_in(self, time_delta, func, *args, **kwargs):
        """
        Similar to ``enqueue_at``, but accepts a timedelta instead of datetime object.
        The job's scheduled execution time will be calculated by adding the timedelta
        to datetime.utcnow().
        """
        timeout = kwargs.pop('timeout', None)
        job_id = kwargs.pop('job_id', None)
        job_ttl = kwargs.pop('job_ttl', None)
        job_result_ttl = kwargs.pop('job_result_ttl', None)
        job_description = kwargs.pop('job_description', None)
        depends_on = kwargs.pop('depends_on', None)
        meta = kwargs.pop('meta', None)
        queue_name = kwargs.pop('queue_name', None)

        job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
                               id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
                               description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on)
        self.connection.zadd(self.scheduled_jobs_key,
                              {job.id: to_unix(datetime.utcnow() + time_delta)})
        return job

    def schedule(self, scheduled_time, func, args=None, kwargs=None,
                 interval=None, repeat=None, result_ttl=None, ttl=None,
                 timeout=None, id=None, description=None,
                 queue_name=None, meta=None, depends_on=None):
        """
        Schedule a job to be periodically executed, at a certain interval.

        :param interval: seconds between executions; None means run once
        :param repeat: number of repetitions; None means repeat forever
        :raises ValueError: if repeat is given without an interval
        """
        # Validate before creating/mutating anything so a bad call has
        # no side effects.
        if repeat and interval is None:
            raise ValueError("Can't repeat a job without interval argument")

        # Set result_ttl to -1 for periodic jobs, if result_ttl not specified
        if interval is not None and result_ttl is None:
            result_ttl = -1
        job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
                               result_ttl=result_ttl, ttl=ttl, id=id,
                               description=description, queue_name=queue_name,
                               timeout=timeout, meta=meta, depends_on=depends_on)

        if interval is not None:
            job.meta['interval'] = int(interval)
        if repeat is not None:
            job.meta['repeat'] = int(repeat)
        job.save()
        self.connection.zadd(self.scheduled_jobs_key,
                              {job.id: to_unix(scheduled_time)})
        return job

    def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
             queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, depends_on=None):
        """
        Schedule a cronjob

        :param cron_string: crontab-style schedule expression
        :param repeat: number of repetitions; None means repeat forever
        :param use_local_timezone: interpret the cron schedule in local time
        """
        scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone)

        # Set result_ttl to -1, as jobs scheduled via cron are periodic ones.
        # Otherwise the job would expire after 500 sec.
        job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
                               result_ttl=-1, id=id, queue_name=queue_name,
                               description=description, timeout=timeout, meta=meta, depends_on=depends_on)

        # enqueue_job() reads these back to reschedule the next occurrence.
        job.meta['cron_string'] = cron_string
        job.meta['use_local_timezone'] = use_local_timezone

        if repeat is not None:
            job.meta['repeat'] = int(repeat)

        job.save()

        self.connection.zadd(self.scheduled_jobs_key,
                              {job.id: to_unix(scheduled_time)})
        return job

    def cancel(self, job):
        """
        Pulls a job from the scheduler queue. This function accepts either a
        job_id or a job instance.
        """
        if isinstance(job, self.job_class):
            self.connection.zrem(self.scheduled_jobs_key, job.id)
        else:
            self.connection.zrem(self.scheduled_jobs_key, job)

    def __contains__(self, item):
        """
        Returns a boolean indicating whether the given job instance or job id
        is scheduled for execution.
        """
        job_id = item
        if isinstance(item, self.job_class):
            job_id = item.id
        return self.connection.zscore(self.scheduled_jobs_key, job_id) is not None

    def change_execution_time(self, job, date_time):
        """
        Change a job's execution time.

        :raises ValueError: if the job is no longer in the scheduled set
        """
        with self.connection.pipeline() as pipe:
            while 1:
                try:
                    # WATCH puts the pipeline in immediate-execution mode;
                    # the zscore/zadd below run right away, guarded against
                    # concurrent modification of the sorted set.
                    pipe.watch(self.scheduled_jobs_key)
                    if pipe.zscore(self.scheduled_jobs_key, job.id) is None:
                        raise ValueError('Job not in scheduled jobs queue')
                    pipe.zadd(self.scheduled_jobs_key, {job.id: to_unix(date_time)})
                    break
                except WatchError:
                    # If job is still in the queue, retry otherwise job is already executed
                    # so we raise an error
                    if pipe.zscore(self.scheduled_jobs_key, job.id) is None:
                        raise ValueError('Job not in scheduled jobs queue')
                    continue

    def count(self, until=None):
        """
        Returns the total number of jobs that are scheduled for all queues.
        This function accepts datetime, timedelta instances as well as
        integers representing epoch values.
        """

        until = rationalize_until(until)
        return self.connection.zcount(self.scheduled_jobs_key, 0, until)

    def get_jobs(self, until=None, with_times=False, offset=None, length=None):
        """
        Returns a iterator of job instances that will be queued until the given
        time. If no 'until' argument is given all jobs are returned.

        If with_times is True, a list of tuples consisting of the job instance
        and it's scheduled execution time is returned.

        If offset and length are specified, a slice of the list starting at the
        specified zero-based offset of the specified length will be returned.

        If either of offset or length is specified, then both must be, or
        an exception will be raised.
        """
        def epoch_to_datetime(epoch):
            return from_unix(float(epoch))

        until = rationalize_until(until)
        job_ids = self.connection.zrangebyscore(self.scheduled_jobs_key, 0,
                                                until, withscores=with_times,
                                                score_cast_func=epoch_to_datetime,
                                                start=offset, num=length)
        if not with_times:
            # Pair each id with a dummy time so the loop below is uniform.
            job_ids = zip(job_ids, repeat(None))
        for job_id, sched_time in job_ids:
            # Ids arrive as bytes unless the connection was created with
            # decode_responses=True, in which case they are already str.
            if isinstance(job_id, bytes):
                job_id = job_id.decode('utf-8')
            try:
                job = self.job_class.fetch(job_id, connection=self.connection)
            except NoSuchJobError:
                # Delete jobs that aren't there from scheduler
                self.cancel(job_id)
                continue
            if with_times:
                yield (job, sched_time)
            else:
                yield job

    def get_jobs_to_queue(self, with_times=False):
        """
        Returns a list of job instances that should be queued
        (score lower than current timestamp).
        If with_times is True a list of tuples consisting of the job instance and
        it's scheduled execution time is returned.
        """
        return self.get_jobs(to_unix(datetime.utcnow()), with_times=with_times)

    def get_queue_for_job(self, job):
        """
        Returns a queue to put job into.
        """
        key = '{0}{1}'.format(self.queue_class.redis_queue_namespace_prefix,
                              job.origin)
        # Honor a per-job queue class recorded at scheduling time.
        if job.meta.get('queue_class_name'):
            queue_class = import_attribute(job.meta['queue_class_name'])
        else:
            queue_class = self.queue_class
        return queue_class.from_queue_key(key, connection=self.connection, job_class=self.job_class)

    def enqueue_job(self, job):
        """
        Move a scheduled job to a queue. In addition, it also does puts the job
        back into the scheduler if needed.
        """
        self.log.debug('Pushing {0}({1}) to {2}'.format(job.func_name, job.id, job.origin))

        interval = job.meta.get('interval', None)
        repeat = job.meta.get('repeat', None)
        cron_string = job.meta.get('cron_string', None)
        use_local_timezone = job.meta.get('use_local_timezone', None)

        # If job is a repeated job, decrement counter
        if repeat:
            job.meta['repeat'] = int(repeat) - 1

        queue = self.get_queue_for_job(job)
        queue.enqueue_job(job)
        self.connection.zrem(self.scheduled_jobs_key, job.id)

        if interval:
            # If this is a repeat job and counter has reached 0, don't repeat
            if repeat is not None:
                if job.meta['repeat'] == 0:
                    return
            self.connection.zadd(self.scheduled_jobs_key,
                                  {job.id: to_unix(datetime.utcnow()) + int(interval)})
        elif cron_string:
            # If this is a repeat job and counter has reached 0, don't repeat
            if repeat is not None:
                if job.meta['repeat'] == 0:
                    return
            self.connection.zadd(self.scheduled_jobs_key,
                                  {job.id: to_unix(get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone))})

    def enqueue_jobs(self):
        """
        Move scheduled jobs into queues.

        Returns the list of jobs that were enqueued.  The jobs are
        materialized into a list first: get_jobs_to_queue() yields a
        generator, which the loop below would otherwise exhaust, leaving
        callers with an empty iterator.
        """
        self.log.debug('Checking for scheduled jobs')

        jobs = list(self.get_jobs_to_queue())
        for job in jobs:
            self.enqueue_job(job)

        return jobs

    def heartbeat(self):
        """Refreshes schedulers key, typically by extending the
        expiration time of the scheduler, effectively making this a "heartbeat"
        to not expire the scheduler until the timeout passes.
        """
        self.log.debug('{}: Sending a HeartBeat'.format(self.key))
        self.connection.expire(self.key, int(self._interval) + 10)

    def run(self, burst=False):
        """
        Periodically check whether there's any job that should be put in the queue (score
        lower than current time).

        :param burst: when True, enqueue due jobs once and exit instead of
                      looping forever.
        """

        self.register_birth()
        self._install_signal_handlers()

        try:
            while True:
                self.log.debug("Entering run loop")
                self.heartbeat()

                start_time = time.time()
                if self.acquire_lock():
                    self.log.debug('{}: Acquired Lock'.format(self.key))
                    self.enqueue_jobs()
                    self.heartbeat()
                    self.remove_lock()

                    if burst:
                        self.log.info('RQ scheduler done, quitting')
                        break
                else:
                    self.log.warning('Lock already taken - skipping run')

                # Time has already elapsed while enqueuing jobs, so don't wait too long.
                seconds_elapsed_since_start = time.time() - start_time
                seconds_until_next_scheduled_run = self._interval - seconds_elapsed_since_start
                # ensure we have a non-negative number
                if seconds_until_next_scheduled_run > 0:
                    self.log.debug("Sleeping %.2f seconds" % seconds_until_next_scheduled_run)
                    time.sleep(seconds_until_next_scheduled_run)
        finally:
            # Always release the lock and record death, even on an
            # unexpected exception or SystemExit.
            self.remove_lock()
            self.register_death()
475