1from __future__ import absolute_import
2
3from datetime import datetime
4
5from pytz import utc
6from kazoo.exceptions import NoNodeError, NodeExistsError
7
8from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
9from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
10from apscheduler.job import Job
11
12try:
13    import cPickle as pickle
14except ImportError:  # pragma: nocover
15    import pickle
16
17try:
18    from kazoo.client import KazooClient
19except ImportError:  # pragma: nocover
20    raise ImportError('ZooKeeperJobStore requires Kazoo installed')
21
22
class ZooKeeperJobStore(BaseJobStore):
    """
    Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
    kazoo's `KazooClient
    <http://kazoo.readthedocs.io/en/latest/api/client.html>`_.

    Plugin alias: ``zookeeper``

    :param str path: path to store jobs in
    :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
        providing connection arguments
    :param bool close_connection_on_exit: whether to stop and close the client when the
        scheduler shuts this store down
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(ZooKeeperJobStore, self).__init__()
        self.pickle_protocol = pickle_protocol
        self.close_connection_on_exit = close_connection_on_exit

        if not path:
            raise ValueError('The "path" parameter must not be empty')

        self.path = path

        # Use the externally supplied client (possibly given as a textual reference),
        # otherwise build our own from the leftover keyword arguments.
        if client:
            self.client = maybe_ref(client)
        else:
            self.client = KazooClient(**connect_args)
        self._ensured_path = False

    def _ensure_paths(self):
        """Lazily create the base znode (once); reset by :meth:`remove_all_jobs`."""
        if not self._ensured_path:
            self.client.ensure_path(self.path)
            self._ensured_path = True

    def start(self, scheduler, alias):
        """Attach to the scheduler and make sure the ZooKeeper connection is up."""
        super(ZooKeeperJobStore, self).start(scheduler, alias)
        if not self.client.connected:
            self.client.start()

    def lookup_job(self, job_id):
        """Return the job with the given id, or ``None`` if it cannot be found or restored."""
        self._ensure_paths()
        node_path = self.path + "/" + str(job_id)
        try:
            content, _ = self.client.get(node_path)
            doc = pickle.loads(content)
            return self._reconstitute_job(doc['job_state'])
        except Exception:
            # Best-effort lookup: a missing node and an unrestorable pickle both mean
            # "not found".  Was ``except BaseException``, which also swallowed
            # KeyboardInterrupt/SystemExit.
            return None

    def get_due_jobs(self, now):
        """Return all jobs whose next run time is at or before *now* (an aware datetime)."""
        timestamp = datetime_to_utc_timestamp(now)
        return [job_def['job'] for job_def in self._get_jobs()
                if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp]

    def get_next_run_time(self):
        """Return the earliest next run time among all jobs, or ``None`` if none are scheduled."""
        next_runs = [job_def['next_run_time'] for job_def in self._get_jobs()
                     if job_def['next_run_time'] is not None]
        return utc_timestamp_to_datetime(min(next_runs)) if next_runs else None

    def get_all_jobs(self):
        """Return all jobs, with paused jobs re-sorted to the end."""
        jobs = [job_def['job'] for job_def in self._get_jobs()]
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def add_job(self, job):
        """Store a new job as a child znode; raises ConflictingIdError on a duplicate id."""
        self._ensure_paths()
        node_path = self.path + "/" + str(job.id)
        value = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': job.__getstate__()
        }
        data = pickle.dumps(value, self.pickle_protocol)
        try:
            self.client.create(node_path, value=data)
        except NodeExistsError:
            raise ConflictingIdError(job.id)

    def update_job(self, job):
        """Overwrite an existing job's znode; raises JobLookupError if it does not exist."""
        self._ensure_paths()
        node_path = self.path + "/" + str(job.id)
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': job.__getstate__()
        }
        data = pickle.dumps(changes, self.pickle_protocol)
        try:
            self.client.set(node_path, value=data)
        except NoNodeError:
            raise JobLookupError(job.id)

    def remove_job(self, job_id):
        """Delete the job's znode; raises JobLookupError if it does not exist."""
        self._ensure_paths()
        node_path = self.path + "/" + str(job_id)
        try:
            self.client.delete(node_path)
        except NoNodeError:
            raise JobLookupError(job_id)

    def remove_all_jobs(self):
        """Recursively delete the base path (ignoring its absence) and force re-creation."""
        try:
            self.client.delete(self.path, recursive=True)
        except NoNodeError:
            pass
        self._ensured_path = False

    def shutdown(self):
        """Tear down the client connection, but only if this store owns its lifecycle."""
        if self.close_connection_on_exit:
            self.client.stop()
            self.client.close()

    def _reconstitute_job(self, job_state):
        """Rebuild a Job instance from its pickled state and bind it to this store."""
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _get_jobs(self):
        """
        Load every job under the base path, pruning any that fail to unpickle, and
        return them sorted by next run time (paused jobs last) then creation time.
        """
        self._ensure_paths()
        jobs = []
        failed_job_ids = []
        for node_name in self.client.get_children(self.path):
            node_path = self.path + "/" + node_name
            try:
                content, stat = self.client.get(node_path)
                doc = pickle.loads(content)
                jobs.append({
                    'job_id': node_name,
                    'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None,
                    'job_state': doc['job_state'],
                    'job': self._reconstitute_job(doc['job_state']),
                    # znode creation time (ms since epoch) breaks ties between jobs
                    # sharing the same next run time
                    'creation_time': stat.ctime
                })
            except Exception:
                # Narrowed from BaseException so Ctrl-C is not swallowed; any failure to
                # fetch/unpickle/reconstitute marks the job for removal below.
                self._logger.exception('Unable to restore job "%s" -- removing it' % node_name)
                failed_job_ids.append(node_name)

        # Remove all the jobs we failed to restore
        for failed_id in failed_job_ids:
            self.remove_job(failed_id)

        # Paused jobs have no next_run_time; sort them after everything else.
        paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
        return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key,
                                                 job_def['creation_time']))

    def __repr__(self):
        # NOTE: a previous revision also called self._logger.exception() here, which
        # emitted a spurious traceback to the log every time the store was repr()'d.
        return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
179