1
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements.  See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership.  The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License.  You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import urllib2
19import urllib
20import simplejson
21
22from contextlib import contextmanager
23
24class RequestWithMethod(urllib2.Request):
25    """ Request class that know how to set the method name """
26    def __init__(self, *args, **kwargs):
27        urllib2.Request.__init__(self, *args, **kwargs)
28        self._method = None
29
30    def get_method(self):
31        return self._method or \
32            urllib2.Request.get_method(self)
33
34    def set_method(self, method):
35        self._method = method
36
37class ZooKeeper(object):
38
39    class Error(Exception): pass
40
41    class NotFound(Error): pass
42
43    class ZNodeExists(Error): pass
44
45    class InvalidSession(Error): pass
46
47    class WrongVersion(Error): pass
48
49    def __init__(self, uri = 'http://localhost:9998'):
50        self._base = uri
51        self._session = None
52
53    def start_session(self, expire=5, id=None):
54        """ Create a session and return the ID """
55        if id is None:
56            url = "%s/sessions/v1/?op=create&expire=%d" % (self._base, expire)
57            self._session = self._do_post(url)['id']
58        else:
59            self._session = id
60        return self._session
61
62    def close_session(self):
63        """ Close the session on the server """
64        if self._session is not None:
65            url = '%s/sessions/v1/%s' % (self._base, self._session)
66            self._do_delete(url)
67            self._session = None
68
69    def heartbeat(self):
70        """ Send a heartbeat request. This is needed in order to keep a session alive """
71        if self._session is not None:
72            url = '%s/sessions/v1/%s' % (self._base, self._session)
73            self._do_put(url, '')
74
75    @contextmanager
76    def session(self, *args, **kwargs):
77        """ Session handling using a context manager """
78        yield self.start_session(*args, **kwargs)
79        self.close_session()
80
81    def get(self, path):
82        """ Get a node """
83        url = "%s/znodes/v1%s" % (self._base, path)
84        return self._do_get(url)
85
86    def get_children(self, path):
87        """ Get all the children for a given path. This function creates a generator """
88        url = "%s/znodes/v1%s?view=children" % (self._base, path)
89        resp = self._do_get(url)
90        for child in resp.get('children', []):
91            try:
92                yield self._do_get(resp['child_uri_template']\
93                    .replace('{child}', urllib2.quote(child)))
94            except ZooKeeper.NotFound:
95                continue
96
97    def create(self, path, data=None, sequence=False, ephemeral=False):
98        """ Create a new node. By default this call creates a persistent znode.
99
100        You can also create an ephemeral or a sequential znode.
101        """
102        ri = path.rindex('/')
103        head, name = path[:ri+1], path[ri+1:]
104        if head != '/': head = head[:-1]
105
106        flags = {
107            'null': 'true' if data is None else 'false',
108            'ephemeral': 'true' if ephemeral else 'false',
109            'sequence': 'true' if sequence else 'false'
110        }
111        if ephemeral:
112            if self._session:
113                flags['session'] = self._session
114            else:
115                raise ZooKeeper.Error, 'You need a session '\
116                    'to create an ephemeral node'
117        flags = urllib.urlencode(flags)
118
119        url = "%s/znodes/v1%s?op=create&name=%s&%s" % \
120            (self._base, head, name, flags)
121
122        return self._do_post(url, data)
123
124    def set(self, path, data=None, version=-1, null=False):
125        """ Set the value of node """
126        url = "%s/znodes/v1%s?%s" % (self._base, path, \
127            urllib.urlencode({
128                'version': version,
129                'null': 'true' if null else 'false'
130        }))
131        return self._do_put(url, data)
132
133    def delete(self, path, version=-1):
134        """ Delete a znode """
135        if type(path) is list:
136            map(lambda el: self.delete(el, version), path)
137            return
138
139        url = '%s/znodes/v1%s?%s' % (self._base, path, \
140            urllib.urlencode({
141                'version':version
142        }))
143        try:
144            return self._do_delete(url)
145        except urllib2.HTTPError, e:
146            if e.code == 412:
147                raise ZooKeeper.WrongVersion(path)
148            elif e.code == 404:
149                raise ZooKeeper.NotFound(path)
150            raise
151
152    def exists(self, path):
153        """ Do a znode exists """
154        try:
155            self.get(path)
156            return True
157        except ZooKeeper.NotFound:
158            return False
159
160    def _do_get(self, uri):
161        """ Send a GET request and convert errors to exceptions """
162        try:
163            req = urllib2.urlopen(uri)
164            resp = simplejson.load(req)
165
166            if 'Error' in resp:
167               raise ZooKeeper.Error(resp['Error'])
168
169            return resp
170        except urllib2.HTTPError, e:
171            if e.code == 404:
172                raise ZooKeeper.NotFound(uri)
173            raise
174
175    def _do_post(self, uri, data=None):
176        """ Send a POST request and convert errors to exceptions """
177        try:
178            req = urllib2.Request(uri, {})
179            req.add_header('Content-Type', 'application/octet-stream')
180            if data is not None:
181                req.add_data(data)
182
183            resp = simplejson.load(urllib2.urlopen(req))
184            if 'Error' in resp:
185                raise ZooKeeper.Error(resp['Error'])
186            return resp
187
188        except urllib2.HTTPError, e:
189            if e.code == 201:
190                return True
191            elif e.code == 409:
192                raise ZooKeeper.ZNodeExists(uri)
193            elif e.code == 401:
194                raise ZooKeeper.InvalidSession(uri)
195            raise
196
197    def _do_delete(self, uri):
198        """ Send a DELETE request """
199        req = RequestWithMethod(uri)
200        req.set_method('DELETE')
201        req.add_header('Content-Type', 'application/octet-stream')
202        return urllib2.urlopen(req).read()
203
204    def _do_put(self, uri, data):
205        """ Send a PUT request """
206        try:
207            req = RequestWithMethod(uri)
208            req.set_method('PUT')
209            req.add_header('Content-Type', 'application/octet-stream')
210            if data is not None:
211                req.add_data(data)
212
213            return urllib2.urlopen(req).read()
214        except urllib2.HTTPError, e:
215            if e.code == 412: # precondition failed
216                raise ZooKeeper.WrongVersion(uri)
217            raise
218
219