from __future__ import absolute_import

from datetime import datetime

from pytz import utc
from kazoo.exceptions import NoNodeError, NodeExistsError

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from kazoo.client import KazooClient
except ImportError:  # pragma: nocover
    raise ImportError('ZooKeeperJobStore requires Kazoo installed')


class ZooKeeperJobStore(BaseJobStore):
    """
    Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
    kazoo's `KazooClient
    <http://kazoo.readthedocs.io/en/latest/api/client.html>`_.

    Plugin alias: ``zookeeper``

    :param str path: path to store jobs in
    :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
        providing connection arguments
    :param bool close_connection_on_exit: whether ``shutdown()`` should stop and close the
        ZooKeeper connection (set this when the store owns the client; leave ``False`` when
        a shared client was passed in)
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(ZooKeeperJobStore, self).__init__()
        self.pickle_protocol = pickle_protocol
        self.close_connection_on_exit = close_connection_on_exit

        if not path:
            raise ValueError('The "path" parameter must not be empty')

        self.path = path

        # Use the externally supplied client if one was given; otherwise build our own
        # from the leftover keyword arguments.
        if client:
            self.client = maybe_ref(client)
        else:
            self.client = KazooClient(**connect_args)
        # Tracks whether the base znode is known to exist; reset by remove_all_jobs().
        self._ensured_path = False

    def _ensure_paths(self):
        # Lazily create the base znode once, instead of on every operation.
        if not self._ensured_path:
            self.client.ensure_path(self.path)
            self._ensured_path = True

    def start(self, scheduler, alias):
        super(ZooKeeperJobStore, self).start(scheduler, alias)
        # The client may already be connected if it was supplied by the caller.
        if not self.client.connected:
            self.client.start()
64 65 def lookup_job(self, job_id): 66 self._ensure_paths() 67 node_path = self.path + "/" + str(job_id) 68 try: 69 content, _ = self.client.get(node_path) 70 doc = pickle.loads(content) 71 job = self._reconstitute_job(doc['job_state']) 72 return job 73 except BaseException: 74 return None 75 76 def get_due_jobs(self, now): 77 timestamp = datetime_to_utc_timestamp(now) 78 jobs = [job_def['job'] for job_def in self._get_jobs() 79 if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp] 80 return jobs 81 82 def get_next_run_time(self): 83 next_runs = [job_def['next_run_time'] for job_def in self._get_jobs() 84 if job_def['next_run_time'] is not None] 85 return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None 86 87 def get_all_jobs(self): 88 jobs = [job_def['job'] for job_def in self._get_jobs()] 89 self._fix_paused_jobs_sorting(jobs) 90 return jobs 91 92 def add_job(self, job): 93 self._ensure_paths() 94 node_path = self.path + "/" + str(job.id) 95 value = { 96 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 97 'job_state': job.__getstate__() 98 } 99 data = pickle.dumps(value, self.pickle_protocol) 100 try: 101 self.client.create(node_path, value=data) 102 except NodeExistsError: 103 raise ConflictingIdError(job.id) 104 105 def update_job(self, job): 106 self._ensure_paths() 107 node_path = self.path + "/" + str(job.id) 108 changes = { 109 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 110 'job_state': job.__getstate__() 111 } 112 data = pickle.dumps(changes, self.pickle_protocol) 113 try: 114 self.client.set(node_path, value=data) 115 except NoNodeError: 116 raise JobLookupError(job.id) 117 118 def remove_job(self, job_id): 119 self._ensure_paths() 120 node_path = self.path + "/" + str(job_id) 121 try: 122 self.client.delete(node_path) 123 except NoNodeError: 124 raise JobLookupError(job_id) 125 126 def remove_all_jobs(self): 127 try: 128 self.client.delete(self.path, recursive=True) 
129 except NoNodeError: 130 pass 131 self._ensured_path = False 132 133 def shutdown(self): 134 if self.close_connection_on_exit: 135 self.client.stop() 136 self.client.close() 137 138 def _reconstitute_job(self, job_state): 139 job_state = job_state 140 job = Job.__new__(Job) 141 job.__setstate__(job_state) 142 job._scheduler = self._scheduler 143 job._jobstore_alias = self._alias 144 return job 145 146 def _get_jobs(self): 147 self._ensure_paths() 148 jobs = [] 149 failed_job_ids = [] 150 all_ids = self.client.get_children(self.path) 151 for node_name in all_ids: 152 try: 153 node_path = self.path + "/" + node_name 154 content, _ = self.client.get(node_path) 155 doc = pickle.loads(content) 156 job_def = { 157 'job_id': node_name, 158 'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None, 159 'job_state': doc['job_state'], 160 'job': self._reconstitute_job(doc['job_state']), 161 'creation_time': _.ctime 162 } 163 jobs.append(job_def) 164 except BaseException: 165 self._logger.exception('Unable to restore job "%s" -- removing it' % node_name) 166 failed_job_ids.append(node_name) 167 168 # Remove all the jobs we failed to restore 169 if failed_job_ids: 170 for failed_id in failed_job_ids: 171 self.remove_job(failed_id) 172 paused_sort_key = datetime(9999, 12, 31, tzinfo=utc) 173 return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key, 174 job_def['creation_time'])) 175 176 def __repr__(self): 177 self._logger.exception('<%s (client=%s)>' % (self.__class__.__name__, self.client)) 178 return '<%s (client=%s)>' % (self.__class__.__name__, self.client) 179