1
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements.  See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership.  The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License.  You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import urllib2
19import urllib
20import simplejson
21
22from contextlib import contextmanager
23
class RequestWithMethod(urllib2.Request):
    """urllib2 request whose HTTP verb can be overridden by the caller.

    A plain urllib2.Request chooses its verb itself; this subclass lets
    callers force another one (the client below uses PUT and DELETE).
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded untouched to urllib2.Request.
        urllib2.Request.__init__(self, *args, **kwargs)
        # No override yet: get_method() defers to urllib2 until
        # set_method() is called with a truthy value.
        self._method = None

    def get_method(self):
        """Return the overriding verb if one is set, else urllib2's choice."""
        if self._method:
            return self._method
        return urllib2.Request.get_method(self)

    def set_method(self, method):
        """Force `method` as the verb; a falsy value removes the override."""
        self._method = method
class ZooKeeper(object):
    """Small blocking client for the ZooKeeper REST gateway.

    Every public method issues one or more HTTP requests, built with
    urllib2, against the base URI given to the constructor. The instance
    remembers at most one session id, which is only needed to create
    ephemeral znodes.
    """

    class Error(Exception):
        """Base class of the client-specific exceptions below."""

    class NotFound(Error):
        """The server answered 404 for the requested resource."""

    class ZNodeExists(Error):
        """The server answered 409 to a create request."""

    class InvalidSession(Error):
        """The server answered 401 to a POST request."""

    class WrongVersion(Error):
        """The server answered 412 (precondition failed)."""

    def __init__(self, uri = 'http://localhost:9998'):
        """Remember the gateway base URI; no request is made here."""
        self._base = uri
        self._session = None

    def start_session(self, expire=5, id=None):
        """Create a session (or adopt an existing one) and return its id.

        expire -- integer sent to the server as the `expire` query
                  parameter when a new session is created.
        id     -- when not None, no request is made and this value is
                  stored as the current session id.
        """
        if id is None:
            url = "%s/sessions/v1/?op=create&expire=%d" % (self._base, expire)
            self._session = self._do_post(url)['id']
        else:
            self._session = id
        return self._session

    def close_session(self):
        """Close the current session on the server; no-op without one."""
        if self._session is not None:
            url = '%s/sessions/v1/%s' % (self._base, self._session)
            self._do_delete(url)
            self._session = None

    def heartbeat(self):
        """Send a heartbeat (an empty PUT on the session resource).

        This is needed in order to keep a session alive. Does nothing
        when no session is active.
        """
        if self._session is not None:
            url = '%s/sessions/v1/%s' % (self._base, self._session)
            self._do_put(url, '')

    @contextmanager
    def session(self, *args, **kwargs):
        """Context manager yielding a session id and closing it on exit.

        Arguments are forwarded to start_session().
        """
        session_id = self.start_session(*args, **kwargs)
        try:
            yield session_id
        finally:
            # Close even when the body raises, so the session is not
            # left open on the server.
            self.close_session()

    def get(self, path):
        """Return the decoded JSON description of the znode at `path`.

        Raises ZooKeeper.NotFound when the server answers 404.
        """
        url = "%s/znodes/v1%s" % (self._base, path)
        return self._do_get(url)

    def get_children(self, path):
        """Generate the decoded JSON description of each child of `path`.

        A child that answers 404 when fetched is skipped silently.
        """
        for child_uri in self.get_children_paths(path, uris=True):
            try:
                yield self._do_get(child_uri)
            except ZooKeeper.NotFound:
                continue

    def get_children_paths(self, path, uris=False):
        """Generate the children of `path`.

        With uris=False the entries of the response's `children` list are
        yielded as-is. With uris=True each entry is URL-quoted and
        substituted for `{child}` in the response's `child_uri_template`.
        """
        url = "%s/znodes/v1%s?view=children" % (self._base, path)
        resp = self._do_get(url)
        for child in resp.get('children', []):
            if uris:
                yield resp['child_uri_template'].replace(
                    '{child}', urllib.quote(child))
            else:
                yield child

    def create(self, path, data=None, sequence=False, ephemeral=False):
        """Create a new node. By default this creates a persistent znode.

        You can also create an ephemeral or a sequential znode.

        path      -- full path of the new znode; must contain a '/'
                     (ValueError otherwise).
        data      -- request body; None sets the `null` flag to 'true'.
        sequence  -- sets the `sequence` flag.
        ephemeral -- sets the `ephemeral` flag; requires an active
                     session, otherwise ZooKeeper.Error is raised.

        Returns whatever _do_post() returns for the request.
        """
        # Split into parent path and node name; a node directly under the
        # root keeps '/' as its parent.
        ri = path.rindex('/')
        head, name = path[:ri+1], path[ri+1:]
        if head != '/':
            head = head[:-1]

        params = {
            # The name goes through urlencode so that characters such as
            # '&', '#', '+' or spaces cannot corrupt the query string.
            'name': name,
            'null': 'true' if data is None else 'false',
            'ephemeral': 'true' if ephemeral else 'false',
            'sequence': 'true' if sequence else 'false'
        }
        if ephemeral:
            if not self._session:
                raise ZooKeeper.Error('You need a session '
                    'to create an ephemeral node')
            params['session'] = self._session

        url = "%s/znodes/v1%s?op=create&%s" % \
            (self._base, head, urllib.urlencode(params))

        return self._do_post(url, data)

    def set(self, path, data=None, version=-1, null=False):
        """Set the value of the znode at `path` with a PUT request.

        `version` and `null` are sent as query parameters. Raises
        ZooKeeper.WrongVersion when the server answers 412.
        """
        query = urllib.urlencode({
            'version': version,
            'null': 'true' if null else 'false'
        })
        url = "%s/znodes/v1%s?%s" % (self._base, path, query)
        return self._do_put(url, data)

    def delete(self, path, version=-1):
        """Delete a znode, or every znode in a list/tuple of paths.

        For a sequence, each element is deleted in order with the same
        `version` and None is returned. Raises ZooKeeper.WrongVersion on
        412 and ZooKeeper.NotFound on 404.
        """
        if isinstance(path, (list, tuple)):
            for el in path:
                self.delete(el, version)
            return

        url = '%s/znodes/v1%s?%s' % (self._base, path,
            urllib.urlencode({'version': version}))
        try:
            return self._do_delete(url)
        except urllib2.HTTPError as e:
            if e.code == 412:
                raise ZooKeeper.WrongVersion(path)
            elif e.code == 404:
                raise ZooKeeper.NotFound(path)
            raise

    def recursive_delete(self, path):
        """Delete `path` after recursively deleting all its descendants."""
        for child in self.get_children_paths(path):
            # Collapse the double slash produced when path is '/'.
            fp = ("%s/%s" % (path, child)).replace('//', '/')
            self.recursive_delete(fp)
        self.delete(path)

    def exists(self, path):
        """Return True if get(path) succeeds, False if it raises NotFound."""
        try:
            self.get(path)
            return True
        except ZooKeeper.NotFound:
            return False

    def _do_get(self, uri):
        """Send a GET request and convert errors to exceptions.

        Returns the decoded JSON body. Raises ZooKeeper.Error when the
        body contains an 'Error' key and ZooKeeper.NotFound on HTTP 404;
        other HTTP errors propagate unchanged.
        """
        try:
            req = urllib2.urlopen(uri)
            resp = simplejson.load(req)

            if 'Error' in resp:
                raise ZooKeeper.Error(resp['Error'])

            return resp
        except urllib2.HTTPError as e:
            if e.code == 404:
                raise ZooKeeper.NotFound(uri)
            raise

    def _do_post(self, uri, data=None):
        """Send a POST request and convert errors to exceptions.

        Returns the decoded JSON body, or True when urllib2 reports
        status 201 as an HTTPError. Raises ZooKeeper.Error when the body
        contains an 'Error' key, ZooKeeper.ZNodeExists on 409 and
        ZooKeeper.InvalidSession on 401; other HTTP errors propagate.
        """
        try:
            # An empty-string body (rather than no body) makes urllib2
            # issue a POST; a dict is not a valid request body.
            req = urllib2.Request(uri, '')
            req.add_header('Content-Type', 'application/octet-stream')
            if data is not None:
                req.add_data(data)

            resp = simplejson.load(urllib2.urlopen(req))
            if 'Error' in resp:
                raise ZooKeeper.Error(resp['Error'])
            return resp

        except urllib2.HTTPError as e:
            if e.code == 201:
                return True
            elif e.code == 409:
                raise ZooKeeper.ZNodeExists(uri)
            elif e.code == 401:
                raise ZooKeeper.InvalidSession(uri)
            raise

    def _do_delete(self, uri):
        """Send a DELETE request and return the raw response body.

        HTTP errors are not translated here; they propagate as
        urllib2.HTTPError.
        """
        req = RequestWithMethod(uri)
        req.set_method('DELETE')
        req.add_header('Content-Type', 'application/octet-stream')
        return urllib2.urlopen(req).read()

    def _do_put(self, uri, data):
        """Send a PUT request and return the raw response body.

        A None `data` sends no body. Raises ZooKeeper.WrongVersion on
        HTTP 412; other HTTP errors propagate unchanged.
        """
        try:
            req = RequestWithMethod(uri)
            req.set_method('PUT')
            req.add_header('Content-Type', 'application/octet-stream')
            if data is not None:
                req.add_data(data)

            return urllib2.urlopen(req).read()
        except urllib2.HTTPError as e:
            if e.code == 412: # precondition failed
                raise ZooKeeper.WrongVersion(uri)
            raise
230
231