# -*- coding: utf-8 -*-
#
# Copyright © 2014 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import errno
import functools
import logging
import socket

from oslo_utils import encodeutils
from pymemcache import client as pymemcache_client

import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils


LOG = logging.getLogger(__name__)


@contextlib.contextmanager
def _failure_translator():
    """Translates common pymemcache exceptions into tooz exceptions.

    https://github.com/pinterest/pymemcache/blob/d995/pymemcache/client.py#L202
    """
    try:
        yield
    except pymemcache_client.MemcacheUnexpectedCloseError as e:
        # Server dropped the connection unexpectedly -> connection error.
        utils.raise_with_cause(coordination.ToozConnectionError,
                               encodeutils.exception_to_unicode(e),
                               cause=e)
    except (socket.timeout, socket.error,
            socket.gaierror, socket.herror) as e:
        # TODO(harlowja): get upstream pymemcache to produce a better
        # exception for these, using socket (vs. a memcache specific
        # error) seems sorta not right and/or the best approach...
        msg = encodeutils.exception_to_unicode(e)
        if e.errno is not None:
            # NOTE(review): errno.errorcode lookup may raise KeyError for
            # an errno value without a symbolic name — confirm upstream.
            msg += " (with errno %s [%s])" % (errno.errorcode[e.errno],
                                              e.errno)
        utils.raise_with_cause(coordination.ToozConnectionError,
                               msg, cause=e)
    except pymemcache_client.MemcacheError as e:
        # Any other memcache-specific error becomes a generic tooz error.
        utils.raise_with_cause(tooz.ToozError,
                               encodeutils.exception_to_unicode(e),
                               cause=e)


def _translate_failures(func):
    """Decorator that runs ``func`` under :func:`_failure_translator`."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with _failure_translator():
            return func(*args, **kwargs)

    return wrapper


class MemcachedLock(locking.Lock):
    """A distributed lock stored as a single (expiring) memcached key.

    The lock is held by whichever coordinator's member id is stored under
    the key; expiry (``timeout``) bounds how long a dead owner can block
    others. See the long note in :meth:`release` for the known races.
    """

    #: Prefix name-spacing lock keys away from other tooz keys.
    _LOCK_PREFIX = b'__TOOZ_LOCK_'

    def __init__(self, coord, name, timeout):
        super(MemcachedLock, self).__init__(self._LOCK_PREFIX + name)
        self.coord = coord
        self.timeout = timeout

    def is_still_owner(self):
        """Return True if this coordinator still holds the lock key."""
        if not self.acquired:
            return False
        else:
            owner = self.get_owner()
            if owner is None:
                # Key expired (or was deleted) server-side.
                return False
            return owner == self.coord._member_id

    def acquire(self, blocking=True, shared=False):
        """Try to acquire the lock via memcached ``add`` (create-if-absent).

        :param blocking: False for a single attempt; True to retry forever;
                         a number to retry for (at most) that long
                         (passed through as the retry stop delay).
        :param shared: shared locks are unsupported by this driver.
        """
        if shared:
            raise tooz.NotImplemented

        @_retry.retry(stop_max_delay=blocking)
        @_translate_failures
        def _acquire():
            # ``add`` only succeeds when the key does not already exist,
            # which is what makes this a mutual-exclusion primitive.
            if self.coord.client.add(
                    self.name,
                    self.coord._member_id,
                    expire=self.timeout,
                    noreply=False):
                self.coord._acquired_locks.append(self)
                return True
            if blocking is False:
                return False
            raise _retry.TryAgain

        return _acquire()

    @_translate_failures
    def break_(self):
        """Forcibly delete the lock key regardless of owner."""
        return bool(self.coord.client.delete(self.name, noreply=False))

    @_translate_failures
    def release(self):
        """Release the lock if (as far as we can tell) we still own it."""
        if not self.acquired:
            return False
        # NOTE(harlowja): this has the potential to delete others locks
        # especially if this key expired before the delete/release call is
        # triggered.
        #
        # For example:
        #
        # 1. App #1 with coordinator 'A' acquires lock "b"
        # 2. App #1 heartbeats every 10 seconds, expiry for lock let's
        #    say is 11 seconds.
        # 3. App #2 with coordinator also named 'A' blocks trying to get
        #    lock "b" (let's say it retries attempts every 0.5 seconds)
        # 4. App #1 is running behind a little bit, tries to heartbeat but
        #    key has expired (log message is written); at this point app #1
        #    doesn't own the lock anymore but it doesn't know that.
        # 5. App #2 now retries and adds the key, and now it believes it
        #    has the lock.
        # 6. App #1 (still believing it has the lock) calls release, and
        #    deletes app #2 lock, app #2 now doesn't own the lock anymore
        #    but it doesn't know that and now app #(X + 1) can get it.
        # 7. App #2 calls release (repeat #6 as many times as desired)
        #
        # Sadly I don't think memcache has the primitives to actually make
        # this work, redis does because it has lua which can check a session
        # id and then do the delete and bail out if the session id is not
        # as expected but memcache doesn't seem to have any equivalent
        # capability.
        if self not in self.coord._acquired_locks:
            return False
        # Do a ghetto test to see what the value is... (see above note),
        # and how this really can't be done safely with memcache due to
        # it being done in the client side (non-atomic).
        value = self.coord.client.get(self.name)
        if value != self.coord._member_id:
            # Someone else (re)acquired it after our key expired.
            return False
        else:
            was_deleted = self.coord.client.delete(self.name, noreply=False)
            if was_deleted:
                self.coord._acquired_locks.remove(self)
            return was_deleted

    @_translate_failures
    def heartbeat(self):
        """Keep the lock alive."""
        if self.acquired:
            # ``touch`` resets the key's expiry without changing its value.
            poked = self.coord.client.touch(self.name,
                                            expire=self.timeout,
                                            noreply=False)
            if poked:
                return True
            LOG.warning("Unable to heartbeat by updating key '%s' with "
                        "extended expiry of %s seconds", self.name,
                        self.timeout)
        return False

    @_translate_failures
    def get_owner(self):
        """Return the member id stored in the lock key (None if absent)."""
        return self.coord.client.get(self.name)

    @property
    def acquired(self):
        # Local view only: membership in the coordinator's acquired-lock
        # list, which may lag behind server-side expiry (see release()).
        return self in self.coord._acquired_locks


class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
                      coordination.CoordinationDriverWithExecutor):
    """A `memcached`_ based driver.

    This driver users `memcached`_ concepts to provide the coordination driver
    semantics and required API(s). It **is** fully functional and implements
    all of the coordination driver API(s). It stores data into memcache
    using expiries and `msgpack`_ encoded values.

    The Memcached driver connection URI should look like::

      memcached://[HOST[:PORT]][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]

    If not specified, HOST defaults to localhost and PORT defaults to 11211.
    Available options are:

    ==================  =======
    Name                Default
    ==================  =======
    timeout             30
    membership_timeout  30
    lock_timeout        30
    leader_timeout      30
    max_pool_size       None
    ==================  =======

    General recommendations/usage considerations:

    - Memcache (without different backend technology) is a **cache** enough
      said.

    .. _memcached: http://memcached.org/
    .. _msgpack: http://msgpack.org/
    """

    CHARACTERISTICS = (
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
        coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS,
        coordination.Characteristics.CAUSAL,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    #: Key prefix attached to groups (used in name-spacing keys)
    GROUP_PREFIX = b'_TOOZ_GROUP_'

    #: Key prefix attached to leaders of groups (used in name-spacing keys)
    GROUP_LEADER_PREFIX = b'_TOOZ_GROUP_LEADER_'

    #: Key prefix attached to members of groups (used in name-spacing keys)
    MEMBER_PREFIX = b'_TOOZ_MEMBER_'

    #: Key where all groups 'known' are stored.
    GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'

    #: Default socket/lock/member/leader timeout used when none is provided.
    DEFAULT_TIMEOUT = 30

    #: String used to keep a key/member alive (until it next expires).
    STILL_ALIVE = b"It's alive!"

    def __init__(self, member_id, parsed_url, options):
        """Initialize the driver from a parsed ``memcached://`` URI.

        :param member_id: this coordinator's member identifier
        :param parsed_url: parsed connection URI (host/port taken from it)
        :param options: URI query options (see the class docstring table)
        """
        super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
        self.host = (parsed_url.hostname or "localhost",
                     parsed_url.port or 11211)
        default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
        self.timeout = int(default_timeout)
        # Each specialized timeout falls back to the general 'timeout'.
        self.membership_timeout = int(self._options.get(
            'membership_timeout', default_timeout))
        self.lock_timeout = int(self._options.get(
            'lock_timeout', default_timeout))
        self.leader_timeout = int(self._options.get(
            'leader_timeout', default_timeout))
        max_pool_size = self._options.get('max_pool_size', None)
        if max_pool_size is not None:
            self.max_pool_size = int(max_pool_size)
        else:
            self.max_pool_size = None
        # Locks this coordinator believes it currently holds (see
        # MemcachedLock.acquired / release for caveats).
        self._acquired_locks = []

    @staticmethod
    def _msgpack_serializer(key, value):
        # Flag 1 = raw bytes passed through; flag 2 = msgpack-encoded.
        if isinstance(value, bytes):
            return value, 1
        return utils.dumps(value), 2

    @staticmethod
    def _msgpack_deserializer(key, value, flags):
        # Inverse of _msgpack_serializer; unknown flags are an error.
        if flags == 1:
            return value
        if flags == 2:
            return utils.loads(value)
        raise coordination.SerializationError("Unknown serialization"
                                              " format '%s'" % flags)

    @_translate_failures
    def _start(self):
        """Create the pooled client and announce this member (heartbeat)."""
        super(MemcachedDriver, self)._start()
        self.client = pymemcache_client.PooledClient(
            self.host,
            serializer=self._msgpack_serializer,
            deserializer=self._msgpack_deserializer,
            timeout=self.timeout,
            connect_timeout=self.timeout,
            max_pool_size=self.max_pool_size)
        # Run heartbeat here because pymemcache use a lazy connection
        # method and only connect once you do an operation.
        self.heartbeat()

    @_translate_failures
    def _stop(self):
        """Release held locks, remove our liveness key and close the pool."""
        super(MemcachedDriver, self)._stop()
        # Copy the list since release() mutates _acquired_locks.
        for lock in list(self._acquired_locks):
            lock.release()
        self.client.delete(self._encode_member_id(self._member_id))
        self.client.close()

    def _encode_group_id(self, group_id):
        """Return the memcached key for a group's member mapping."""
        return self.GROUP_PREFIX + utils.to_binary(group_id)

    def _encode_member_id(self, member_id):
        """Return the memcached key for a member's liveness entry."""
        return self.MEMBER_PREFIX + utils.to_binary(member_id)

    def _encode_group_leader(self, group_id):
        """Return the memcached key for a group's leader lock."""
        return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id)

    @_retry.retry()
    def _add_group_to_group_list(self, group_id):
        """Add group to the group list.

        :param group_id: The group id
        """
        group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
        if cas:
            # List exists: update it with compare-and-set to avoid
            # clobbering concurrent updates.
            group_list = set(group_list)
            group_list.add(group_id)
            if not self.client.cas(self.GROUP_LIST_KEY,
                                   list(group_list), cas):
                # Someone updated the group list before us, try again!
                raise _retry.TryAgain
        else:
            # List does not exist yet: create it atomically with add().
            if not self.client.add(self.GROUP_LIST_KEY,
                                   [group_id], noreply=False):
                # Someone updated the group list before us, try again!
                raise _retry.TryAgain

    @_retry.retry()
    def _remove_from_group_list(self, group_id):
        """Remove group from the group list.

        :param group_id: The group id
        """
        group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
        group_list = set(group_list)
        group_list.remove(group_id)
        if not self.client.cas(self.GROUP_LIST_KEY,
                               list(group_list), cas):
            # Someone updated the group list before us, try again!
            raise _retry.TryAgain

    def create_group(self, group_id):
        """Asynchronously create a group (fails if it already exists)."""
        encoded_group = self._encode_group_id(group_id)

        @_translate_failures
        def _create_group():
            # add() fails if the key already exists -> group exists.
            if not self.client.add(encoded_group, {}, noreply=False):
                raise coordination.GroupAlreadyExist(group_id)
            self._add_group_to_group_list(group_id)

        return MemcachedFutureResult(self._executor.submit(_create_group))

    def get_groups(self):
        """Asynchronously fetch the list of known groups."""

        @_translate_failures
        def _get_groups():
            return self.client.get(self.GROUP_LIST_KEY) or []

        return MemcachedFutureResult(self._executor.submit(_get_groups))

    def join_group(self, group_id, capabilities=b""):
        """Asynchronously join a group with the given capabilities."""
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _join_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id in group_members:
                raise coordination.MemberAlreadyExist(group_id,
                                                      self._member_id)
            group_members[self._member_id] = {
                b"capabilities": capabilities,
            }
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
            self._joined_groups.add(group_id)

        return MemcachedFutureResult(self._executor.submit(_join_group))

    def leave_group(self, group_id):
        """Asynchronously leave a group previously joined."""
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _leave_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            del group_members[self._member_id]
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
            self._joined_groups.discard(group_id)

        return MemcachedFutureResult(self._executor.submit(_leave_group))

    def _destroy_group(self, group_id):
        """Unconditionally delete a group's key (no group-list cleanup)."""
        self.client.delete(self._encode_group_id(group_id))

    def delete_group(self, group_id):
        """Asynchronously delete an (empty) group."""
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _delete_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if group_members != {}:
                raise coordination.GroupNotEmpty(group_id)
            # Delete is not atomic, so we first set the group to None
            # using CAS, and then we delete it, to avoid race conditions.
            if not self.client.cas(encoded_group, None, cas):
                raise _retry.TryAgain
            self.client.delete(encoded_group)
            self._remove_from_group_list(group_id)

        return MemcachedFutureResult(self._executor.submit(_delete_group))

    @_retry.retry()
    @_translate_failures
    def _get_members(self, group_id):
        """Return the group's members, pruning ones whose liveness key died.

        :param group_id: The group id
        :returns: dict mapping member id to its stored entry
        """
        encoded_group = self._encode_group_id(group_id)
        group_members, cas = self.client.gets(encoded_group)
        if group_members is None:
            raise coordination.GroupNotCreated(group_id)
        actual_group_members = {}
        for m, v in group_members.items():
            # Never kick self from the group, we know we're alive
            if (m == self._member_id or
                    self.client.get(self._encode_member_id(m))):
                actual_group_members[m] = v
        if group_members != actual_group_members:
            # There are some dead members, update the group
            if not self.client.cas(encoded_group, actual_group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
        return actual_group_members

    def get_members(self, group_id):
        """Asynchronously fetch the set of live member ids of a group."""

        def _get_members():
            return set(self._get_members(group_id).keys())

        return MemcachedFutureResult(self._executor.submit(_get_members))

    def get_member_capabilities(self, group_id, member_id):
        """Asynchronously fetch one member's capabilities in a group."""

        def _get_member_capabilities():
            group_members = self._get_members(group_id)
            if member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, member_id)
            return group_members[member_id][b'capabilities']

        return MemcachedFutureResult(
            self._executor.submit(_get_member_capabilities))

    def update_capabilities(self, group_id, capabilities):
        """Asynchronously replace this member's capabilities in a group."""
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _update_capabilities():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            group_members[self._member_id][b'capabilities'] = capabilities
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, try again
                raise _retry.TryAgain

        return MemcachedFutureResult(
            self._executor.submit(_update_capabilities))

    def get_leader(self, group_id):
        """Asynchronously fetch the current leader of a group (if any)."""

        def _get_leader():
            return self._get_leader_lock(group_id).get_owner()

        return MemcachedFutureResult(self._executor.submit(_get_leader))

    @_translate_failures
    def heartbeat(self):
        """Refresh our liveness key and every held lock's expiry.

        :returns: the smallest configured timeout, i.e. an upper bound on
                  how long the caller should wait before heartbeating again
        """
        self.client.set(self._encode_member_id(self._member_id),
                        self.STILL_ALIVE,
                        expire=self.membership_timeout)
        # Reset the acquired locks
        for lock in self._acquired_locks:
            lock.heartbeat()
        return min(self.membership_timeout,
                   self.leader_timeout,
                   self.lock_timeout)

    def get_lock(self, name):
        """Return a :class:`MemcachedLock` for the given name."""
        return MemcachedLock(self, name, self.lock_timeout)

    def _get_leader_lock(self, group_id):
        """Return the lock whose ownership designates a group's leader."""
        return MemcachedLock(self, self._encode_group_leader(group_id),
                             self.leader_timeout)

    @_translate_failures
    def run_elect_coordinator(self):
        """Attempt leader election for every group with elected-leader hooks."""
        for group_id, hooks in self._hooks_elected_leader.items():
            # Try to grab the lock, if that fails, that means someone has it
            # already.
            leader_lock = self._get_leader_lock(group_id)
            if leader_lock.acquire(blocking=False):
                # We got the lock
                hooks.run(coordination.LeaderElected(
                    group_id,
                    self._member_id))

    def run_watchers(self, timeout=None):
        """Run the inherited watchers, then attempt leader election."""
        result = super(MemcachedDriver, self).run_watchers(timeout=timeout)
        self.run_elect_coordinator()
        return result


#: Future-result wrapper that translates pymemcache failures on access.
MemcachedFutureResult = functools.partial(
    coordination.CoordinatorResult,
    failure_translator=_failure_translator)