# -*- coding: utf-8 -*-
#
#    Copyright (C) 2016 Red Hat, Inc.
#    Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import collections
from concurrent import futures
import enum
import logging
import threading
# NOTE: import the submodule explicitly; a bare ``import urllib`` does not
# guarantee that ``urllib.parse`` is loaded.
import urllib.parse

from oslo_utils import encodeutils
from oslo_utils import netutils
from oslo_utils import timeutils
from stevedore import driver
import tenacity

import tooz
from tooz import _retry
from tooz import partitioner
from tooz import utils

LOG = logging.getLogger(__name__)


TOOZ_BACKENDS_NAMESPACE = "tooz.backends"


class Characteristics(enum.Enum):
    """Attempts to describe the characteristic that a driver supports."""

    DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS'
    """Coordinator components when used by multiple **threads** work
    the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES'
    """Coordinator components when used by multiple **processes** work
    the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS'
    """Coordinator components when used by multiple **hosts** work
    the same as if those components were only used by a single thread."""

    NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED'
    """The driver has the following property:

    * Its operations are not based on the timeout of other clients, but on some
      other more robust mechanisms.
    """

    LINEARIZABLE = 'LINEARIZABLE'
    """The driver has the following properties:

    * Ensures each operation must take place before its
      completion time.
    * Any operation invoked subsequently must take place
      after the invocation and by extension, after the original operation
      itself.
    """

    SEQUENTIAL = 'SEQUENTIAL'
    """The driver has the following properties:

    * Operations can take effect before or after completion – but all
      operations retain the constraint that operations from any given process
      must take place in that processes order.
    """

    CAUSAL = 'CAUSAL'
    """The driver has the following properties:

    * Does **not** have to enforce the order of every
      operation from a process, perhaps, only causally related operations
      must occur in order.
    """

    SERIALIZABLE = 'SERIALIZABLE'
    """The driver has the following properties:

    * The history of **all** operations is equivalent to
      one that took place in some single atomic order but with unknown
      invocation and completion times - it places no bounds on
      time or order.
    """

    SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS'
    """When a client is connected to a server and that server is partitioned
    from a group of other servers it will (somehow) have the same view of
    data as a client connected to a server on the other side of the
    partition (typically this is accomplished by write availability being
    lost and therefore nothing can change).
    """

    SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS'
    """A client connected to one server will *always* have the same view
    every other client will have (no matter what server those other
    clients are connected to). Typically this is a sacrifice in
    write availability because before a write can be acknowledged it must
    be acknowledged by *all* servers in a cluster (so that all clients
    that are connected to those servers read the exact *same* thing).
    """


class Hooks(list):
    """A list of callbacks that can all be invoked in one call."""

    def run(self, *args, **kwargs):
        """Call every registered callback with the given arguments.

        :returns: list of the callbacks' return values, in registration
                  order
        """
        return [cb(*args, **kwargs) for cb in self]


class Event(object):
    """Base class for events."""


class MemberJoinedGroup(Event):
    """A member joined a group event."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        return "<%s: group %s: +member %s>" % (self.__class__.__name__,
                                               self.group_id,
                                               self.member_id)


class MemberLeftGroup(Event):
    """A member left a group event."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        return "<%s: group %s: -member %s>" % (self.__class__.__name__,
                                               self.group_id,
                                               self.member_id)


class LeaderElected(Event):
    """A leader has been elected."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        # Mirrors the representation of the other membership events so that
        # logged events are uniformly readable.
        return "<%s: group %s: leader %s>" % (self.__class__.__name__,
                                              self.group_id,
                                              self.member_id)


class Heart(object):
    """Coordination drivers main liveness pump (its heart)."""

    def __init__(self, driver, thread_cls=threading.Thread,
                 event_cls=threading.Event):
        """Create a (stopped) heart for the given driver.

        :param driver: object whose ``heartbeat()`` method is called by the
                       beating thread
        :param thread_cls: factory used to build the beating thread
        :param event_cls: factory used to build the stop event
        """
        self._thread_cls = thread_cls
        self._dead = event_cls()
        self._runner = None
        self._driver = driver
        self._beats = 0

    @property
    def beats(self):
        """How many times the heart has beaten."""
        return self._beats

    def is_alive(self):
        """Returns if the heart is beating."""
        return self._runner is not None and self._runner.is_alive()

    def _beat_forever_until_stopped(self):
        """Inner beating loop."""
        # Failing heartbeats are retried (one second apart) and logged at
        # WARNING level before each retry sleep.
        retry = tenacity.Retrying(
            wait=tenacity.wait_fixed(1),
            before_sleep=tenacity.before_sleep_log(LOG, logging.WARNING),
        )
        while not self._dead.is_set():
            with timeutils.StopWatch() as w:
                wait_until_next_beat = retry(self._driver.heartbeat)
            ran_for = w.elapsed()
            has_to_sleep_for = wait_until_next_beat - ran_for
            if has_to_sleep_for < 0:
                LOG.warning(
                    "Heartbeating took too long to execute (it ran for"
                    " %0.2f seconds which is %0.2f seconds longer than"
                    " the next heartbeat idle time). This may cause"
                    " timeouts (in locks, leadership, ...) to"
                    " happen (which will not end well).", ran_for,
                    ran_for - wait_until_next_beat)
            self._beats += 1
            # NOTE(harlowja): use the event object for waiting and
            # not a sleep function since doing that will allow this code
            # to terminate early if stopped via the stop() method vs
            # having to wait until the sleep function returns.
            # NOTE(jd): Wait for only the half time of what we should.
            # This is a measure of safety, better be too soon than too late.
            self._dead.wait(has_to_sleep_for / 2.0)

    def start(self, thread_cls=None):
        """Starts the heart beating thread (noop if already started).

        :param thread_cls: optional thread factory overriding the one given
                           at construction time
        """
        if not self.is_alive():
            self._dead.clear()
            self._beats = 0
            if thread_cls is None:
                thread_cls = self._thread_cls
            self._runner = thread_cls(target=self._beat_forever_until_stopped)
            self._runner.daemon = True
            self._runner.start()

    def stop(self):
        """Requests the heart beating thread to stop beating."""
        self._dead.set()

    def wait(self, timeout=None):
        """Wait up to given timeout for the heart beating thread to stop.

        :param timeout: maximum number of seconds to wait (``None`` waits
                        without limit)
        :returns: ``True`` if the thread is still alive after waiting,
                  ``False`` otherwise (including when the heart was never
                  started)
        """
        if self._runner is None:
            # Never started: there is nothing to join.
            return False
        self._runner.join(timeout)
        return self._runner.is_alive()


class CoordinationDriver(object):
    """Base class of coordination drivers.

    Most operations are placeholders raising :py:class:`tooz.NotImplemented`
    that concrete drivers are expected to override.
    """

    requires_beating = False
    """
    Usage requirement that if true requires that the :py:meth:`~.heartbeat`
    be called periodically (at a given rate) to avoid locks, sessions and
    other from being automatically closed/discarded by the coordinators
    backing store.
    """

    CHARACTERISTICS = ()
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    def __init__(self, member_id, parsed_url, options):
        """Initialize the common driver state.

        :param member_id: the id of the member
        :param parsed_url: the parsed backend URL (unused by the base class)
        :param options: the driver options (unused by the base class)
        """
        super().__init__()
        self._member_id = member_id
        self._started = False
        # Tracks which group are joined (reset on every start()); defined
        # here too so the attribute exists before the driver is started.
        self._joined_groups = set()
        self._hooks_join_group = collections.defaultdict(Hooks)
        self._hooks_leave_group = collections.defaultdict(Hooks)
        self._hooks_elected_leader = collections.defaultdict(Hooks)
        # A driver needs beating as soon as it overrides heartbeat().
        self.requires_beating = (
            CoordinationDriver.heartbeat != self.__class__.heartbeat
        )
        self.heart = Heart(self)

    def _has_hooks_for_group(self, group_id):
        """Tell if join or leave hooks are registered for a group."""
        return (group_id in self._hooks_join_group or
                group_id in self._hooks_leave_group)

    @staticmethod
    def _remove_hook(hooks, group_id, callback):
        """Remove a callback from a per-group hooks mapping.

        :param hooks: the ``defaultdict`` mapping group ids to
                      :py:class:`Hooks`
        :param group_id: the group id the callback was registered on
        :param callback: the callback to remove
        :raises WatchCallbackNotFound: if the callback is not registered on
                                       the group
        """
        # Check if group_id is in hooks to avoid creating a default empty
        # entry in hooks list.
        if group_id not in hooks:
            raise WatchCallbackNotFound(group_id, callback)
        try:
            hooks[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if not hooks[group_id]:
            del hooks[group_id]

    def join_partitioned_group(
            self, group_id,
            weight=1,
            partitions=partitioner.Partitioner.DEFAULT_PARTITION_NUMBER):
        """Join a group and get a partitioner.

        A partitioner allows to distribute a bunch of objects across several
        members using a consistent hash ring. Each object gets assigned (at
        least) one member responsible for it. It's then possible to check which
        object is owned by any member of the group.

        This method also creates if necessary, and joins the group with the
        selected weight.

        :param group_id: The group to create a partitioner for.
        :param weight: The weight to use in the hashring for this node.
        :param partitions: The number of partitions to create.
        :return: A :py:class:`~tooz.partitioner.Partitioner` object.

        """
        self.join_group_create(group_id, capabilities={'weight': weight})
        return partitioner.Partitioner(self, group_id, partitions=partitions)

    def leave_partitioned_group(self, partitioner):
        """Leave a partitioned group.

        This leaves the partitioned group and stop the partitioner.

        :param partitioner: The partitioner of the group to leave.
        :returns: the result of the leave request
        """
        leave = self.leave_group(partitioner.group_id)
        partitioner.stop()
        return leave.get()

    @staticmethod
    def run_watchers(timeout=None):
        """Run the watchers callback.

        This may also activate :py:meth:`.run_elect_coordinator` (depending
        on driver implementation).
        """
        raise tooz.NotImplemented

    @staticmethod
    def run_elect_coordinator():
        """Try to leader elect this coordinator & activate hooks on success."""
        raise tooz.NotImplemented

    def watch_join_group(self, group_id, callback):
        """Call a function when group_id sees a new member joined.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member joins this group

        """
        self._hooks_join_group[group_id].append(callback)

    def unwatch_join_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member joined.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member joined
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered
        """
        self._remove_hook(self._hooks_join_group, group_id, callback)

    def watch_leave_group(self, group_id, callback):
        """Call a function when group_id sees a new member leaving.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group

        """
        self._hooks_leave_group[group_id].append(callback)

    def unwatch_leave_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member leaving.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member left
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered
        """
        self._remove_hook(self._hooks_leave_group, group_id, callback)

    def watch_elected_as_leader(self, group_id, callback):
        """Call a function when member gets elected as leader.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member is elected
                         as leader of this group

        """
        self._hooks_elected_leader[group_id].append(callback)

    def unwatch_elected_as_leader(self, group_id, callback):
        """Stop executing a function when member gets elected as leader.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member was
                         elected as leader of this group
        :raises WatchCallbackNotFound: if the callback is not registered
        """
        self._remove_hook(self._hooks_elected_leader, group_id, callback)

    @staticmethod
    def stand_down_group_leader(group_id):
        """Stand down as the group leader if we are.

        :param group_id: The group where we don't want to be a leader anymore
        """
        raise tooz.NotImplemented

    @property
    def is_started(self):
        """Whether :py:meth:`start` completed and :py:meth:`stop` did not."""
        return self._started

    def start(self, start_heart=False):
        """Start the service engine.

        If needed, the establishment of a connection to the servers
        is initiated.

        :param start_heart: also start the heart beating thread (only
                            honored when the driver requires beating)
        :raises tooz.ToozError: if the driver is already started
        """
        if self._started:
            raise tooz.ToozError(
                "Can not start a driver which has not been stopped")
        self._start()
        if self.requires_beating and start_heart:
            self.heart.start()
        self._started = True
        # Tracks which group are joined
        self._joined_groups = set()

    def _start(self):
        """Driver specific start hook (noop by default)."""
        pass

    def stop(self):
        """Stop the service engine.

        If needed, the connection to servers is closed and the client will
        disappear from all joined groups.

        :raises tooz.ToozError: if the driver is not started
        """
        if not self._started:
            raise tooz.ToozError(
                "Can not stop a driver which has not been started")
        if self.heart.is_alive():
            self.heart.stop()
            self.heart.wait()
        # Some of the drivers modify joined_groups when being called to leave
        # so clone it so that we aren't modifying something while iterating.
        joined_groups = self._joined_groups.copy()
        leaving = [self.leave_group(group) for group in joined_groups]
        for fut in leaving:
            try:
                fut.get()
            except tooz.ToozError:
                # Whatever happens, ignore. Maybe we got booted out/never
                # existed in the first place, or something is down, but we just
                # want to call _stop after whatever happens to not leak any
                # connection.
                pass
        self._stop()
        self._started = False

    def _stop(self):
        """Driver specific stop hook (noop by default)."""
        pass

    @staticmethod
    def create_group(group_id):
        """Request the creation of a group asynchronously.

        :param group_id: the id of the group to create
        :type group_id: ascii bytes
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_groups():
        """Return the list composed by all groups ids asynchronously.

        :returns: the list of all created group ids
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def join_group(group_id, capabilities=b""):
        """Join a group and establish group membership asynchronously.

        :param group_id: the id of the group to join
        :type group_id: ascii bytes
        :param capabilities: the capabilities of the joined member
        :type capabilities: object
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @_retry.retry()
    def join_group_create(self, group_id, capabilities=b""):
        """Join a group and create it if necessary.

        If the group cannot be joined because it does not exist, it is created
        before being joined.

        This function will keep retrying until it can create the group and join
        it. Since nothing is transactional, it may have to retry several times
        if another member is creating/deleting the group at the same time.

        :param group_id: Identifier of the group to join and create
        :param capabilities: the capabilities of the joined member
        """
        req = self.join_group(group_id, capabilities)
        try:
            req.get()
        except GroupNotCreated:
            req = self.create_group(group_id)
            try:
                req.get()
            except GroupAlreadyExist:
                # The group might have been created in the meantime, ignore
                pass
            # Now retry to join the group
            raise _retry.TryAgain

    @staticmethod
    def leave_group(group_id):
        """Leave a group asynchronously.

        :param group_id: the id of the group to leave
        :type group_id: ascii bytes
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def delete_group(group_id):
        """Delete a group asynchronously.

        :param group_id: the id of the group to delete
        :type group_id: ascii bytes
        :returns: Result
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_members(group_id):
        """Return the set of all members ids of the specified group.

        :param group_id: the id of the group
        :returns: set of all the members ids of the group
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_capabilities(group_id, member_id):
        """Return the capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: ascii bytes
        :param member_id: the id of the member
        :type member_id: ascii bytes
        :returns: capabilities of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_info(group_id, member_id):
        """Return the statistics and capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: ascii bytes
        :param member_id: the id of the member
        :type member_id: ascii bytes
        :returns: capabilities and statistics of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def update_capabilities(group_id, capabilities):
        """Update member capabilities in the specified group.

        :param group_id: the id of the group of the current member
        :type group_id: ascii bytes
        :param capabilities: the capabilities of the updated member
        :type capabilities: object
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_leader(group_id):
        """Return the leader for a group.

        :param group_id: the id of the group:
        :returns: the leader
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_lock(name):
        """Return a distributed lock.

        This is a exclusive lock, a second call to acquire() will block or
        return False.

        :param name: The lock name that is used to identify it across all
                     nodes.

        """
        raise tooz.NotImplemented

    @staticmethod
    def heartbeat():
        """Update member status to indicate it is still alive.

        Method to run once in a while to be sure that the member is not dead
        and is still an active member of a group.

        :return: The number of seconds to wait before sending a new heartbeat.
        """
        pass


class CoordAsyncResult(object, metaclass=abc.ABCMeta):
    """Representation of an asynchronous task.

    Every call API returns an CoordAsyncResult object on which the result or
    the status of the task can be requested.

    """

    @abc.abstractmethod
    def get(self, timeout=None):
        """Retrieve the result of the corresponding asynchronous call.

        :param timeout: block until the timeout expire.
        :type timeout: float
        """

    @abc.abstractmethod
    def done(self):
        """Returns True if the task is done, False otherwise."""


class CoordinatorResult(CoordAsyncResult):
    """Asynchronous result that references a future."""

    def __init__(self, fut, failure_translator=None):
        """Wrap a future.

        :param fut: the future holding the result of the call
        :param failure_translator: optional callable returning a context
                                   manager under which the result of the
                                   future is retrieved
        """
        self._fut = fut
        self._failure_translator = failure_translator

    def get(self, timeout=None):
        """Retrieve the result of the wrapped future.

        :param timeout: block until the timeout expire.
        :type timeout: float
        :raises OperationTimedOut: if the future times out
        """
        try:
            if self._failure_translator:
                with self._failure_translator():
                    return self._fut.result(timeout=timeout)
            else:
                return self._fut.result(timeout=timeout)
        except futures.TimeoutError as e:
            utils.raise_with_cause(OperationTimedOut,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)

    def done(self):
        """Returns True if the wrapped future is done, False otherwise."""
        return self._fut.done()


class CoordinationDriverWithExecutor(CoordinationDriver):
    """Coordination driver owning an executor.

    The executor is started before the driver and stopped after it.
    """

    EXCLUDE_OPTIONS = None

    def __init__(self, member_id, parsed_url, options):
        self._options = utils.collapse(options, exclude=self.EXCLUDE_OPTIONS)
        self._executor = utils.ProxyExecutor.build(
            self.__class__.__name__, self._options)
        super().__init__(member_id, parsed_url, options)

    def start(self, start_heart=False):
        """Start the executor, then the driver."""
        self._executor.start()
        super().start(start_heart)

    def stop(self):
        """Stop the driver, then the executor."""
        super().stop()
        self._executor.stop()


class CoordinationDriverCachedRunWatchers(CoordinationDriver):
    """Coordination driver with a `run_watchers` implementation.

    This implementation of `run_watchers` is based on a cache of the group
    members between each run of `run_watchers` that is being updated between
    each run.

    """

    def __init__(self, member_id, parsed_url, options):
        super().__init__(member_id, parsed_url, options)
        # A cache for group members
        self._group_members = collections.defaultdict(set)
        self._joined_groups = set()

    def _init_watch_group(self, group_id):
        """Seed the members cache of a group that is not cached yet."""
        if group_id not in self._group_members:
            members = self.get_members(group_id)
            self._group_members[group_id] = members.get()

    def _drop_cache_if_unwatched(self, group_id):
        """Forget the cached members of a group without any hook left."""
        if (not self._has_hooks_for_group(group_id) and
                group_id in self._group_members):
            del self._group_members[group_id]

    @staticmethod
    def _run_group_hooks(hooks, group_id, event):
        """Run the hooks registered for a group, if any.

        The mapping is read with ``get()`` so that no empty entry is created
        for groups watched in the other direction only; such an entry would
        make the group look watched forever.

        :returns: list of the callbacks' return values
        """
        group_hooks = hooks.get(group_id)
        if not group_hooks:
            return []
        return group_hooks.run(event)

    def watch_join_group(self, group_id, callback):
        self._init_watch_group(group_id)
        super().watch_join_group(group_id, callback)

    def unwatch_join_group(self, group_id, callback):
        super().unwatch_join_group(group_id, callback)
        self._drop_cache_if_unwatched(group_id)

    def watch_leave_group(self, group_id, callback):
        self._init_watch_group(group_id)
        super().watch_leave_group(group_id, callback)

    def unwatch_leave_group(self, group_id, callback):
        super().unwatch_leave_group(group_id, callback)
        self._drop_cache_if_unwatched(group_id)

    def run_watchers(self, timeout=None):
        """Run join/leave callbacks for membership changes since last run.

        For every watched group the current members are compared with the
        cached ones; join hooks run for each new member and leave hooks for
        each missing one, then the cache is updated.

        :param timeout: overall time budget given to the member lookups
        :returns: list of the values returned by the executed callbacks
        """
        with timeutils.StopWatch(duration=timeout) as w:
            result = []
            # Snapshot of the watched groups: hooks may change while
            # callbacks are running.
            group_with_hooks = set(self._hooks_join_group).union(
                self._hooks_leave_group)
            for group_id in group_with_hooks:
                try:
                    group_members = self.get_members(group_id).get(
                        timeout=w.leftover(return_none=True))
                except GroupNotCreated:
                    group_members = set()
                if (group_id in self._joined_groups and
                        self._member_id not in group_members):
                    self._joined_groups.discard(group_id)
                old_group_members = self._group_members.get(group_id, set())
                for member_id in (group_members - old_group_members):
                    result.extend(self._run_group_hooks(
                        self._hooks_join_group, group_id,
                        MemberJoinedGroup(group_id, member_id)))
                for member_id in (old_group_members - group_members):
                    result.extend(self._run_group_hooks(
                        self._hooks_leave_group, group_id,
                        MemberLeftGroup(group_id, member_id)))
                self._group_members[group_id] = group_members
            return result


def get_coordinator(backend_url, member_id,
                    characteristics=frozenset(), **kwargs):
    """Initialize and load the backend.

    :param backend_url: the backend URL to use
    :type backend_url: str
    :param member_id: the id of the member
    :type member_id: ascii bytes
    :param characteristics: characteristics that will be matched to the
                            requested driver (this **will** become a
                            **required** parameter in a future tooz
                            version)
    :type characteristics: set of :py:class:`.Characteristics`
    :param kwargs: additional coordinator options (these take precedence over
                   options of the **same** name found in the ``backend_url``
                   arguments query string)
    :raises ToozDriverChosenPoorly: if the loaded driver lacks some of the
                                    desired characteristics
    :returns: the loaded driver
    """
    parsed_url = netutils.urlsplit(backend_url)
    parsed_qs = urllib.parse.parse_qs(parsed_url.query)
    if kwargs:
        # Keyword options are wrapped in a list to match the shape of the
        # parsed query string values; query string options only fill in the
        # names that were not given as keyword.
        options = {k: [v] for k, v in kwargs.items()}
        for k, v in parsed_qs.items():
            options.setdefault(k, v)
    else:
        options = parsed_qs
    d = driver.DriverManager(
        namespace=TOOZ_BACKENDS_NAMESPACE,
        name=parsed_url.scheme,
        invoke_on_load=True,
        invoke_args=(member_id, parsed_url, options)).driver
    characteristics = set(characteristics)
    driver_characteristics = set(getattr(d, 'CHARACTERISTICS', set()))
    missing_characteristics = characteristics - driver_characteristics
    if missing_characteristics:
        raise ToozDriverChosenPoorly("Desired characteristics %s"
                                     " is not a strict subset of driver"
                                     " characteristics %s, %s"
                                     " characteristics were not found"
                                     % (characteristics,
                                        driver_characteristics,
                                        missing_characteristics))
    return d


# TODO(harlowja): We'll have to figure out a way to remove this 'alias' at
# some point in the future (when we have a better way to tell people it has
# moved without messing up their exception catching hierarchy).
ToozError = tooz.ToozError


class ToozDriverChosenPoorly(tooz.ToozError):
    """Raised when a driver does not match desired characteristics."""


class ToozConnectionError(tooz.ToozError):
    """Exception raised when the client cannot connect to the server."""


class OperationTimedOut(tooz.ToozError):
    """Exception raised when an operation times out."""


class LockAcquireFailed(tooz.ToozError):
    """Exception raised when a lock acquire fails in a context manager."""


class GroupNotCreated(tooz.ToozError):
    """Exception raised when the caller request an nonexistent group."""
    def __init__(self, group_id):
        self.group_id = group_id
        super().__init__("Group %s does not exist" % group_id)


class GroupAlreadyExist(tooz.ToozError):
    """Exception raised trying to create an already existing group."""
    def __init__(self, group_id):
        self.group_id = group_id
        super().__init__("Group %s already exists" % group_id)


class MemberAlreadyExist(tooz.ToozError):
    """Exception raised trying to join a group already joined."""
    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id
        super().__init__("Member %s has already joined %s" %
                         (member_id, group_id))


class MemberNotJoined(tooz.ToozError):
    """Exception raised trying to access a member not in a group."""
    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id
        super().__init__("Member %s has not joined %s" %
                         (member_id, group_id))


class GroupNotEmpty(tooz.ToozError):
    """Exception raised when the caller try to delete a group with members."""
    def __init__(self, group_id):
        self.group_id = group_id
        super().__init__("Group %s is not empty" % group_id)


class WatchCallbackNotFound(tooz.ToozError):
    """Exception raised when unwatching a group.

    Raised when the caller tries to unwatch a group with a callback that
    does not exist.

    """
    def __init__(self, group_id, callback):
        self.group_id = group_id
        self.callback = callback
        # Not every callable has a __name__ (functools.partial objects and
        # callable instances do not); fall back to its repr so that building
        # this exception never fails with an AttributeError.
        callback_name = getattr(callback, '__name__', repr(callback))
        super().__init__(
            'Callback %s is not registered on group %s' %
            (callback_name, group_id))


# TODO(harlowja,jd): We'll have to figure out a way to remove this 'alias' at
# some point in the future (when we have a better way to tell people it has
# moved without messing up their exception catching hierarchy).
SerializationError = utils.SerializationError