from __future__ import absolute_import
from datetime import datetime

from pytz import utc
import six

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

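# cPickle is the C-accelerated pickler on Python 2; on Python 3 the plain pickle
# module already uses the C implementation when available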
try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from redis import Redis
except ImportError:  # pragma: nocover
    raise ImportError('RedisJobStore requires redis installed')


class RedisJobStore(BaseJobStore):
    """
    Stores jobs in a Redis database. Any leftover keyword arguments are passed directly to the
    :class:`~redis.Redis` constructor.

    Plugin alias: ``redis``

    :param int db: the database number to store jobs in
    :param str jobs_key: key to store jobs in
    :param str run_times_key: key to store the jobs' run times in
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
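
    Example (a minimal sketch; assumes a Redis server reachable on localhost at the
    default port)::

        from apscheduler.schedulers.background import BackgroundScheduler

        scheduler = BackgroundScheduler()
        scheduler.add_jobstore('redis', db=1)  # uses the plugin alias above
        scheduler.start()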
    """

    def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times',
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(RedisJobStore, self).__init__()

        if db is None:
            raise ValueError('The "db" parameter must not be empty')
        if not jobs_key:
            raise ValueError('The "jobs_key" parameter must not be empty')
        if not run_times_key:
            raise ValueError('The "run_times_key" parameter must not be empty')

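        # Job states are pickled into a Redis hash (jobs_key); next run times live in a
        # sorted set (run_times_key) scored by UTC timestamp, so due jobs can be range-queried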
        self.pickle_protocol = pickle_protocol
        self.jobs_key = jobs_key
        self.run_times_key = run_times_key
        self.redis = Redis(db=int(db), **connect_args)

    def lookup_job(self, job_id):
        job_state = self.redis.hget(self.jobs_key, job_id)
        return self._reconstitute_job(job_state) if job_state else None

    def get_due_jobs(self, now):
        timestamp = datetime_to_utc_timestamp(now)
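        # Fetch the IDs of all jobs whose score (next run time) is at or before "now"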
        job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
        if job_ids:
            job_states = self.redis.hmget(self.jobs_key, *job_ids)
            return self._reconstitute_jobs(six.moves.zip(job_ids, job_states))
        return []

    def get_next_run_time(self):
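        # The first element of the sorted set holds the earliest run time; returns None if empty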
        next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True)
        if next_run_time:
            return utc_timestamp_to_datetime(next_run_time[0][1])

    def get_all_jobs(self):
        job_states = self.redis.hgetall(self.jobs_key)
        jobs = self._reconstitute_jobs(six.iteritems(job_states))
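        # Paused jobs have no next_run_time; sort them last using a far-future placeholder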
        paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
        return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key)

    def add_job(self, job):
        if self.redis.hexists(self.jobs_key, job.id):
            raise ConflictingIdError(job.id)

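        # Write the job state and its run time in one transaction so the hash and the
        # sorted set cannot go out of sync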
        with self.redis.pipeline() as pipe:
            pipe.multi()
            pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
                                                          self.pickle_protocol))
            if job.next_run_time:
                pipe.zadd(self.run_times_key,
                          {job.id: datetime_to_utc_timestamp(job.next_run_time)})

            pipe.execute()

    def update_job(self, job):
        if not self.redis.hexists(self.jobs_key, job.id):
            raise JobLookupError(job.id)

        with self.redis.pipeline() as pipe:
            pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
                                                          self.pickle_protocol))
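            # Keep the run-times index in sync: paused jobs (no next_run_time) are
            # removed from the sorted set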
            if job.next_run_time:
                pipe.zadd(self.run_times_key,
                          {job.id: datetime_to_utc_timestamp(job.next_run_time)})
            else:
                pipe.zrem(self.run_times_key, job.id)

            pipe.execute()

    def remove_job(self, job_id):
        if not self.redis.hexists(self.jobs_key, job_id):
            raise JobLookupError(job_id)

        with self.redis.pipeline() as pipe:
            pipe.hdel(self.jobs_key, job_id)
            pipe.zrem(self.run_times_key, job_id)
            pipe.execute()

    def remove_all_jobs(self):
        with self.redis.pipeline() as pipe:
            pipe.delete(self.jobs_key)
            pipe.delete(self.run_times_key)
            pipe.execute()

    def shutdown(self):
        self.redis.connection_pool.disconnect()

    def _reconstitute_job(self, job_state):
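        # Unpickle the stored state and rebuild the Job without invoking __init__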
        job_state = pickle.loads(job_state)
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _reconstitute_jobs(self, job_states):
        jobs = []
        failed_job_ids = []
        for job_id, job_state in job_states:
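            # Unpickling can fail if the job's callable or trigger is no longer importable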
            try:
                jobs.append(self._reconstitute_job(job_state))
            except BaseException:
                self._logger.exception('Unable to restore job "%s" -- removing it', job_id)
                failed_job_ids.append(job_id)

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            with self.redis.pipeline() as pipe:
                pipe.hdel(self.jobs_key, *failed_job_ids)
                pipe.zrem(self.run_times_key, *failed_job_ids)
                pipe.execute()

        return jobs

    def __repr__(self):
        return '<%s>' % self.__class__.__name__