1from __future__ import absolute_import 2from datetime import datetime 3 4from pytz import utc 5import six 6 7from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError 8from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime 9from apscheduler.job import Job 10 11try: 12 import cPickle as pickle 13except ImportError: # pragma: nocover 14 import pickle 15 16try: 17 from redis import Redis 18except ImportError: # pragma: nocover 19 raise ImportError('RedisJobStore requires redis installed') 20 21 22class RedisJobStore(BaseJobStore): 23 """ 24 Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's 25 :class:`~redis.StrictRedis`. 26 27 Plugin alias: ``redis`` 28 29 :param int db: the database number to store jobs in 30 :param str jobs_key: key to store jobs in 31 :param str run_times_key: key to store the jobs' run times in 32 :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the 33 highest available 34 """ 35 36 def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', 37 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): 38 super(RedisJobStore, self).__init__() 39 40 if db is None: 41 raise ValueError('The "db" parameter must not be empty') 42 if not jobs_key: 43 raise ValueError('The "jobs_key" parameter must not be empty') 44 if not run_times_key: 45 raise ValueError('The "run_times_key" parameter must not be empty') 46 47 self.pickle_protocol = pickle_protocol 48 self.jobs_key = jobs_key 49 self.run_times_key = run_times_key 50 self.redis = Redis(db=int(db), **connect_args) 51 52 def lookup_job(self, job_id): 53 job_state = self.redis.hget(self.jobs_key, job_id) 54 return self._reconstitute_job(job_state) if job_state else None 55 56 def get_due_jobs(self, now): 57 timestamp = datetime_to_utc_timestamp(now) 58 job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp) 59 if job_ids: 60 job_states = self.redis.hmget(self.jobs_key, *job_ids) 61 return self._reconstitute_jobs(six.moves.zip(job_ids, job_states)) 62 return [] 63 64 def get_next_run_time(self): 65 next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True) 66 if next_run_time: 67 return utc_timestamp_to_datetime(next_run_time[0][1]) 68 69 def get_all_jobs(self): 70 job_states = self.redis.hgetall(self.jobs_key) 71 jobs = self._reconstitute_jobs(six.iteritems(job_states)) 72 paused_sort_key = datetime(9999, 12, 31, tzinfo=utc) 73 return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key) 74 75 def add_job(self, job): 76 if self.redis.hexists(self.jobs_key, job.id): 77 raise ConflictingIdError(job.id) 78 79 with self.redis.pipeline() as pipe: 80 pipe.multi() 81 pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), 82 self.pickle_protocol)) 83 if job.next_run_time: 84 pipe.zadd(self.run_times_key, 85 {job.id: datetime_to_utc_timestamp(job.next_run_time)}) 86 87 pipe.execute() 88 89 def update_job(self, job): 90 if not self.redis.hexists(self.jobs_key, job.id): 91 raise JobLookupError(job.id) 92 93 with self.redis.pipeline() as pipe: 94 pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), 95 self.pickle_protocol)) 96 if job.next_run_time: 97 pipe.zadd(self.run_times_key, 98 {job.id: datetime_to_utc_timestamp(job.next_run_time)}) 99 else: 100 pipe.zrem(self.run_times_key, job.id) 101 102 pipe.execute() 103 104 def remove_job(self, job_id): 105 if not self.redis.hexists(self.jobs_key, job_id): 106 raise JobLookupError(job_id) 107 108 with self.redis.pipeline() as pipe: 109 pipe.hdel(self.jobs_key, job_id) 110 pipe.zrem(self.run_times_key, job_id) 111 pipe.execute() 112 113 def remove_all_jobs(self): 114 with self.redis.pipeline() as pipe: 115 pipe.delete(self.jobs_key) 116 pipe.delete(self.run_times_key) 117 pipe.execute() 118 119 def shutdown(self): 120 self.redis.connection_pool.disconnect() 121 122 def _reconstitute_job(self, job_state): 123 job_state = pickle.loads(job_state) 124 job = Job.__new__(Job) 125 job.__setstate__(job_state) 126 job._scheduler = self._scheduler 127 job._jobstore_alias = self._alias 128 return job 129 130 def _reconstitute_jobs(self, job_states): 131 jobs = [] 132 failed_job_ids = [] 133 for job_id, job_state in job_states: 134 try: 135 jobs.append(self._reconstitute_job(job_state)) 136 except BaseException: 137 self._logger.exception('Unable to restore job "%s" -- removing it', job_id) 138 failed_job_ids.append(job_id) 139 140 # Remove all the jobs we failed to restore 141 if failed_job_ids: 142 with self.redis.pipeline() as pipe: 143 pipe.hdel(self.jobs_key, *failed_job_ids) 144 pipe.zrem(self.run_times_key, *failed_job_ids) 145 pipe.execute() 146 147 return jobs 148 149 def __repr__(self): 150 return '<%s>' % self.__class__.__name__ 151