1 2# Licensed to the Apache Software Foundation (ASF) under one 3# or more contributor license agreements. See the NOTICE file 4# distributed with this work for additional information 5# regarding copyright ownership. The ASF licenses this file 6# to you under the Apache License, Version 2.0 (the 7# "License"); you may not use this file except in compliance 8# with the License. You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import urllib2 19import urllib 20import simplejson 21 22from contextlib import contextmanager 23 24class RequestWithMethod(urllib2.Request): 25 """ Request class that know how to set the method name """ 26 def __init__(self, *args, **kwargs): 27 urllib2.Request.__init__(self, *args, **kwargs) 28 self._method = None 29 30 def get_method(self): 31 return self._method or \ 32 urllib2.Request.get_method(self) 33 34 def set_method(self, method): 35 self._method = method 36 37class ZooKeeper(object): 38 39 class Error(Exception): pass 40 41 class NotFound(Error): pass 42 43 class ZNodeExists(Error): pass 44 45 class InvalidSession(Error): pass 46 47 class WrongVersion(Error): pass 48 49 def __init__(self, uri = 'http://localhost:9998'): 50 self._base = uri 51 self._session = None 52 53 def start_session(self, expire=5, id=None): 54 """ Create a session and return the ID """ 55 if id is None: 56 url = "%s/sessions/v1/?op=create&expire=%d" % (self._base, expire) 57 self._session = self._do_post(url)['id'] 58 else: 59 self._session = id 60 return self._session 61 62 def close_session(self): 63 """ Close the session on the server """ 64 if self._session is not None: 65 url = '%s/sessions/v1/%s' % (self._base, self._session) 66 self._do_delete(url) 67 self._session = None 68 69 def heartbeat(self): 70 """ Send a heartbeat request. This is needed in order to keep a session alive """ 71 if self._session is not None: 72 url = '%s/sessions/v1/%s' % (self._base, self._session) 73 self._do_put(url, '') 74 75 @contextmanager 76 def session(self, *args, **kwargs): 77 """ Session handling using a context manager """ 78 yield self.start_session(*args, **kwargs) 79 self.close_session() 80 81 def get(self, path): 82 """ Get a node """ 83 url = "%s/znodes/v1%s" % (self._base, path) 84 return self._do_get(url) 85 86 def get_children(self, path): 87 """ Get all the children for a given path. This function creates a generator """ 88 for child_path in self.get_children_paths(path, uris=True): 89 try: 90 yield self._do_get(child_path) 91 except ZooKeeper.NotFound: 92 continue 93 94 def get_children_paths(self, path, uris=False): 95 """ Get the paths for children nodes """ 96 url = "%s/znodes/v1%s?view=children" % (self._base, path) 97 resp = self._do_get(url) 98 for child in resp.get('children', []): 99 yield child if not uris else resp['child_uri_template']\ 100 .replace('{child}', urllib2.quote(child)) 101 102 def create(self, path, data=None, sequence=False, ephemeral=False): 103 """ Create a new node. By default this call creates a persistent znode. 104 105 You can also create an ephemeral or a sequential znode. 106 """ 107 ri = path.rindex('/') 108 head, name = path[:ri+1], path[ri+1:] 109 if head != '/': head = head[:-1] 110 111 flags = { 112 'null': 'true' if data is None else 'false', 113 'ephemeral': 'true' if ephemeral else 'false', 114 'sequence': 'true' if sequence else 'false' 115 } 116 if ephemeral: 117 if self._session: 118 flags['session'] = self._session 119 else: 120 raise ZooKeeper.Error, 'You need a session '\ 121 'to create an ephemeral node' 122 flags = urllib.urlencode(flags) 123 124 url = "%s/znodes/v1%s?op=create&name=%s&%s" % \ 125 (self._base, head, name, flags) 126 127 return self._do_post(url, data) 128 129 def set(self, path, data=None, version=-1, null=False): 130 """ Set the value of node """ 131 url = "%s/znodes/v1%s?%s" % (self._base, path, \ 132 urllib.urlencode({ 133 'version': version, 134 'null': 'true' if null else 'false' 135 })) 136 return self._do_put(url, data) 137 138 def delete(self, path, version=-1): 139 """ Delete a znode """ 140 if type(path) is list: 141 map(lambda el: self.delete(el, version), path) 142 return 143 144 url = '%s/znodes/v1%s?%s' % (self._base, path, \ 145 urllib.urlencode({ 146 'version':version 147 })) 148 try: 149 return self._do_delete(url) 150 except urllib2.HTTPError, e: 151 if e.code == 412: 152 raise ZooKeeper.WrongVersion(path) 153 elif e.code == 404: 154 raise ZooKeeper.NotFound(path) 155 raise 156 157 def recursive_delete(self, path): 158 """ Delete all the nodes from the tree """ 159 for child in self.get_children_paths(path): 160 fp = ("%s/%s" % (path, child)).replace('//', '/') 161 self.recursive_delete(fp) 162 self.delete(path) 163 164 def exists(self, path): 165 """ Do a znode exists """ 166 try: 167 self.get(path) 168 return True 169 except ZooKeeper.NotFound: 170 return False 171 172 def _do_get(self, uri): 173 """ Send a GET request and convert errors to exceptions """ 174 try: 175 req = urllib2.urlopen(uri) 176 resp = simplejson.load(req) 177 178 if 'Error' in resp: 179 raise ZooKeeper.Error(resp['Error']) 180 181 return resp 182 except urllib2.HTTPError, e: 183 if e.code == 404: 184 raise ZooKeeper.NotFound(uri) 185 raise 186 187 def _do_post(self, uri, data=None): 188 """ Send a POST request and convert errors to exceptions """ 189 try: 190 req = urllib2.Request(uri, {}) 191 req.add_header('Content-Type', 'application/octet-stream') 192 if data is not None: 193 req.add_data(data) 194 195 resp = simplejson.load(urllib2.urlopen(req)) 196 if 'Error' in resp: 197 raise ZooKeeper.Error(resp['Error']) 198 return resp 199 200 except urllib2.HTTPError, e: 201 if e.code == 201: 202 return True 203 elif e.code == 409: 204 raise ZooKeeper.ZNodeExists(uri) 205 elif e.code == 401: 206 raise ZooKeeper.InvalidSession(uri) 207 raise 208 209 def _do_delete(self, uri): 210 """ Send a DELETE request """ 211 req = RequestWithMethod(uri) 212 req.set_method('DELETE') 213 req.add_header('Content-Type', 'application/octet-stream') 214 return urllib2.urlopen(req).read() 215 216 def _do_put(self, uri, data): 217 """ Send a PUT request """ 218 try: 219 req = RequestWithMethod(uri) 220 req.set_method('PUT') 221 req.add_header('Content-Type', 'application/octet-stream') 222 if data is not None: 223 req.add_data(data) 224 225 return urllib2.urlopen(req).read() 226 except urllib2.HTTPError, e: 227 if e.code == 412: # precondition failed 228 raise ZooKeeper.WrongVersion(uri) 229 raise 230 231