1 2# Licensed to the Apache Software Foundation (ASF) under one 3# or more contributor license agreements. See the NOTICE file 4# distributed with this work for additional information 5# regarding copyright ownership. The ASF licenses this file 6# to you under the Apache License, Version 2.0 (the 7# "License"); you may not use this file except in compliance 8# with the License. You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import urllib2 19import urllib 20import simplejson 21 22from contextlib import contextmanager 23 24class RequestWithMethod(urllib2.Request): 25 """ Request class that know how to set the method name """ 26 def __init__(self, *args, **kwargs): 27 urllib2.Request.__init__(self, *args, **kwargs) 28 self._method = None 29 30 def get_method(self): 31 return self._method or \ 32 urllib2.Request.get_method(self) 33 34 def set_method(self, method): 35 self._method = method 36 37class ZooKeeper(object): 38 39 class Error(Exception): pass 40 41 class NotFound(Error): pass 42 43 class ZNodeExists(Error): pass 44 45 class InvalidSession(Error): pass 46 47 class WrongVersion(Error): pass 48 49 def __init__(self, uri = 'http://localhost:9998'): 50 self._base = uri 51 self._session = None 52 53 def start_session(self, expire=5, id=None): 54 """ Create a session and return the ID """ 55 if id is None: 56 url = "%s/sessions/v1/?op=create&expire=%d" % (self._base, expire) 57 self._session = self._do_post(url)['id'] 58 else: 59 self._session = id 60 return self._session 61 62 def close_session(self): 63 """ Close the session on the server """ 64 if self._session is not None: 65 url = '%s/sessions/v1/%s' % (self._base, self._session) 66 self._do_delete(url) 67 self._session = None 68 69 def heartbeat(self): 70 """ Send a heartbeat request. This is needed in order to keep a session alive """ 71 if self._session is not None: 72 url = '%s/sessions/v1/%s' % (self._base, self._session) 73 self._do_put(url, '') 74 75 @contextmanager 76 def session(self, *args, **kwargs): 77 """ Session handling using a context manager """ 78 yield self.start_session(*args, **kwargs) 79 self.close_session() 80 81 def get(self, path): 82 """ Get a node """ 83 url = "%s/znodes/v1%s" % (self._base, path) 84 return self._do_get(url) 85 86 def get_children(self, path): 87 """ Get all the children for a given path. This function creates a generator """ 88 url = "%s/znodes/v1%s?view=children" % (self._base, path) 89 resp = self._do_get(url) 90 for child in resp.get('children', []): 91 try: 92 yield self._do_get(resp['child_uri_template']\ 93 .replace('{child}', urllib2.quote(child))) 94 except ZooKeeper.NotFound: 95 continue 96 97 def create(self, path, data=None, sequence=False, ephemeral=False): 98 """ Create a new node. By default this call creates a persistent znode. 99 100 You can also create an ephemeral or a sequential znode. 101 """ 102 ri = path.rindex('/') 103 head, name = path[:ri+1], path[ri+1:] 104 if head != '/': head = head[:-1] 105 106 flags = { 107 'null': 'true' if data is None else 'false', 108 'ephemeral': 'true' if ephemeral else 'false', 109 'sequence': 'true' if sequence else 'false' 110 } 111 if ephemeral: 112 if self._session: 113 flags['session'] = self._session 114 else: 115 raise ZooKeeper.Error, 'You need a session '\ 116 'to create an ephemeral node' 117 flags = urllib.urlencode(flags) 118 119 url = "%s/znodes/v1%s?op=create&name=%s&%s" % \ 120 (self._base, head, name, flags) 121 122 return self._do_post(url, data) 123 124 def set(self, path, data=None, version=-1, null=False): 125 """ Set the value of node """ 126 url = "%s/znodes/v1%s?%s" % (self._base, path, \ 127 urllib.urlencode({ 128 'version': version, 129 'null': 'true' if null else 'false' 130 })) 131 return self._do_put(url, data) 132 133 def delete(self, path, version=-1): 134 """ Delete a znode """ 135 if type(path) is list: 136 map(lambda el: self.delete(el, version), path) 137 return 138 139 url = '%s/znodes/v1%s?%s' % (self._base, path, \ 140 urllib.urlencode({ 141 'version':version 142 })) 143 try: 144 return self._do_delete(url) 145 except urllib2.HTTPError, e: 146 if e.code == 412: 147 raise ZooKeeper.WrongVersion(path) 148 elif e.code == 404: 149 raise ZooKeeper.NotFound(path) 150 raise 151 152 def exists(self, path): 153 """ Do a znode exists """ 154 try: 155 self.get(path) 156 return True 157 except ZooKeeper.NotFound: 158 return False 159 160 def _do_get(self, uri): 161 """ Send a GET request and convert errors to exceptions """ 162 try: 163 req = urllib2.urlopen(uri) 164 resp = simplejson.load(req) 165 166 if 'Error' in resp: 167 raise ZooKeeper.Error(resp['Error']) 168 169 return resp 170 except urllib2.HTTPError, e: 171 if e.code == 404: 172 raise ZooKeeper.NotFound(uri) 173 raise 174 175 def _do_post(self, uri, data=None): 176 """ Send a POST request and convert errors to exceptions """ 177 try: 178 req = urllib2.Request(uri, {}) 179 req.add_header('Content-Type', 'application/octet-stream') 180 if data is not None: 181 req.add_data(data) 182 183 resp = simplejson.load(urllib2.urlopen(req)) 184 if 'Error' in resp: 185 raise ZooKeeper.Error(resp['Error']) 186 return resp 187 188 except urllib2.HTTPError, e: 189 if e.code == 201: 190 return True 191 elif e.code == 409: 192 raise ZooKeeper.ZNodeExists(uri) 193 elif e.code == 401: 194 raise ZooKeeper.InvalidSession(uri) 195 raise 196 197 def _do_delete(self, uri): 198 """ Send a DELETE request """ 199 req = RequestWithMethod(uri) 200 req.set_method('DELETE') 201 req.add_header('Content-Type', 'application/octet-stream') 202 return urllib2.urlopen(req).read() 203 204 def _do_put(self, uri, data): 205 """ Send a PUT request """ 206 try: 207 req = RequestWithMethod(uri) 208 req.set_method('PUT') 209 req.add_header('Content-Type', 'application/octet-stream') 210 if data is not None: 211 req.add_data(data) 212 213 return urllib2.urlopen(req).read() 214 except urllib2.HTTPError, e: 215 if e.code == 412: # precondition failed 216 raise ZooKeeper.WrongVersion(uri) 217 raise 218 219