1# -*- coding: utf-8 -*-
2#
3#    Copyright (C) 2016 Red Hat, Inc.
4#    Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
5#
6#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7#    not use this file except in compliance with the License. You may obtain
8#    a copy of the License at
9#
10#         http://www.apache.org/licenses/LICENSE-2.0
11#
12#    Unless required by applicable law or agreed to in writing, software
13#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15#    License for the specific language governing permissions and limitations
16#    under the License.
17
18import abc
19import collections
20from concurrent import futures
21import enum
22import logging
23import threading
24import urllib
25
26from oslo_utils import encodeutils
27from oslo_utils import netutils
28from oslo_utils import timeutils
29from stevedore import driver
30import tenacity
31
32import tooz
33from tooz import _retry
34from tooz import partitioner
35from tooz import utils
36
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Namespace handed to stevedore by get_coordinator() when it looks up the
# driver matching the scheme of the backend URL.
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"
41
42
class Characteristics(enum.Enum):
    """Describes the characteristics that a driver supports.

    Drivers list the members that apply to them in their
    ``CHARACTERISTICS`` attribute and :py:func:`get_coordinator` compares
    that collection against the characteristics requested by the caller.
    Each member's value is its own name as a string.
    """

    DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS'
    """Coordinator components when used by multiple **threads** work
       the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES'
    """Coordinator components when used by multiple **processes** work
       the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS'
    """Coordinator components when used by multiple **hosts** work
       the same as if those components were only used by a single thread."""

    NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED'
    """The driver has the following property:

    * Its operations are not based on the timeout of other clients, but on some
      other more robust mechanisms.
    """

    LINEARIZABLE = 'LINEARIZABLE'
    """The driver has the following properties:

    * Ensures each operation must take place before its
      completion time.
    * Any operation invoked subsequently must take place
      after the invocation and by extension, after the original operation
      itself.
    """

    SEQUENTIAL = 'SEQUENTIAL'
    """The driver has the following properties:

    * Operations can take effect before or after completion – but all
      operations retain the constraint that operations from any given process
      must take place in that processes order.
    """

    CAUSAL = 'CAUSAL'
    """The driver has the following properties:

    * Does **not** have to enforce the order of every
      operation from a process, perhaps, only causally related operations
      must occur in order.
    """

    SERIALIZABLE = 'SERIALIZABLE'
    """The driver has the following properties:

    * The history of **all** operations is equivalent to
      one that took place in some single atomic order but with unknown
      invocation and completion times - it places no bounds on
      time or order.
    """

    SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS'
    """When a client is connected to a server and that server is partitioned
    from a group of other servers it will (somehow) have the same view of
    data as a client connected to a server on the other side of the
    partition (typically this is accomplished by write availability being
    lost and therefore nothing can change).
    """

    SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS'
    """A client connected to one server will *always* have the same view
    every other client will have (no matter what server those other
    clients are connected to). Typically this is a sacrifice in
    write availability because before a write can be acknowledged it must
    be acknowledged by *all* servers in a cluster (so that all clients
    that are connected to those servers read the exact *same* thing).
    """
116
117
class Hooks(list):
    """A list of callbacks that can all be invoked with one call."""

    def run(self, *args, **kwargs):
        """Call every registered callback with the given arguments.

        :returns: list of the callbacks' return values, in registration
                  order
        """
        return [callback(*args, **kwargs) for callback in self]
121
122
class Event:
    """Common ancestor of every event handed to watch callbacks."""
125
126
class MemberJoinedGroup(Event):
    """Event describing a member that joined a group.

    :param group_id: the id of the group that was joined
    :param member_id: the id of the member that joined
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        fields = (type(self).__name__, self.group_id, self.member_id)
        return "<%s: group %s: +member %s>" % fields
138
139
class MemberLeftGroup(Event):
    """Event describing a member that left a group.

    :param group_id: the id of the group that was left
    :param member_id: the id of the member that left
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        fields = (type(self).__name__, self.group_id, self.member_id)
        return "<%s: group %s: -member %s>" % fields
151
152
class LeaderElected(Event):
    """A leader has been elected.

    :param group_id: the id of the group the election happened in
    :param member_id: the id of the member that was elected
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id

    def __repr__(self):
        # Same layout as the other group events so that logs read uniformly.
        return "<%s: group %s: leader %s>" % (self.__class__.__name__,
                                              self.group_id,
                                              self.member_id)
159
160
class Heart(object):
    """Coordination drivers main liveness pump (its heart).

    Calls ``driver.heartbeat()`` over and over from a background daemon
    thread until :py:meth:`stop` is requested.

    :param driver: object whose ``heartbeat()`` is called; the value it
                   returns is used as the number of seconds until the next
                   beat
    :param thread_cls: callable used to build the beating thread (called
                       with a ``target`` keyword argument)
    :param event_cls: callable used to build the event that signals the
                      beating loop to stop
    """

    def __init__(self, driver, thread_cls=threading.Thread,
                 event_cls=threading.Event):
        self._thread_cls = thread_cls
        self._dead = event_cls()
        self._runner = None
        self._driver = driver
        self._beats = 0

    @property
    def beats(self):
        """How many times the heart has beaten."""
        return self._beats

    def is_alive(self):
        """Returns if the heart is beating."""
        runner = self._runner
        return runner is not None and bool(runner.is_alive())

    def _beat_forever_until_stopped(self):
        """Inner beating loop."""
        # A failing heartbeat is retried after a fixed one second wait, a
        # warning being logged before each sleep; no stop condition is
        # configured here.
        retry = tenacity.Retrying(
            wait=tenacity.wait_fixed(1),
            before_sleep=tenacity.before_sleep_log(LOG, logging.WARNING),
        )
        while not self._dead.is_set():
            with timeutils.StopWatch() as w:
                wait_until_next_beat = retry(self._driver.heartbeat)
            ran_for = w.elapsed()
            has_to_sleep_for = wait_until_next_beat - ran_for
            if has_to_sleep_for < 0:
                LOG.warning(
                    "Heartbeating took too long to execute (it ran for"
                    " %0.2f seconds which is %0.2f seconds longer than"
                    " the next heartbeat idle time). This may cause"
                    " timeouts (in locks, leadership, ...) to"
                    " happen (which will not end well).", ran_for,
                    ran_for - wait_until_next_beat)
            self._beats += 1
            # NOTE(harlowja): use the event object for waiting and
            # not a sleep function since doing that will allow this code
            # to terminate early if stopped via the stop() method vs
            # having to wait until the sleep function returns.
            # NOTE(jd): Wait for only the half time of what we should.
            # This is a measure of safety, better be too soon than too late.
            # The value is clamped so that a late heartbeat never hands a
            # negative timeout to a custom event class.
            self._dead.wait(max(0.0, has_to_sleep_for) / 2.0)

    def start(self, thread_cls=None):
        """Starts the heart beating thread (noop if already started).

        :param thread_cls: optional thread factory overriding the one given
                           at construction time for this start only
        """
        if not self.is_alive():
            self._dead.clear()
            self._beats = 0
            if thread_cls is None:
                thread_cls = self._thread_cls
            self._runner = thread_cls(target=self._beat_forever_until_stopped)
            self._runner.daemon = True
            self._runner.start()

    def stop(self):
        """Requests the heart beating thread to stop beating."""
        self._dead.set()

    def wait(self, timeout=None):
        """Wait up to given timeout for the heart beating thread to stop.

        :param timeout: maximum time to wait, passed to the thread's
                        ``join()``
        :returns: whether the beating thread is still alive after the wait;
                  ``False`` when the heart was never started
        """
        runner = self._runner
        if runner is None:
            # Never started: there is no thread to join.
            return False
        runner.join(timeout)
        return runner.is_alive()
229
230
class CoordinationDriver(object):
    """Base class that coordination drivers derive from.

    It stores the watch callbacks registered per group, owns the
    :py:class:`Heart` used for background heartbeating and implements the
    start/stop life cycle. Backend specific operations are declared here as
    placeholders raising ``tooz.NotImplemented``.

    :param member_id: the id of the member this driver acts as
    :param parsed_url: the parsed backend URL (unused by this base class)
    :param options: the driver options (unused by this base class)
    """

    requires_beating = False
    """
    Usage requirement that if true requires that the :py:meth:`~.heartbeat`
    be called periodically (at a given rate) to avoid locks, sessions and
    other from being automatically closed/discarded by the coordinators
    backing store.
    """

    CHARACTERISTICS = ()
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    def __init__(self, member_id, parsed_url, options):
        super(CoordinationDriver, self).__init__()
        self._member_id = member_id
        self._started = False
        self._hooks_join_group = collections.defaultdict(Hooks)
        self._hooks_leave_group = collections.defaultdict(Hooks)
        self._hooks_elected_leader = collections.defaultdict(Hooks)
        # Beating is required as soon as a subclass overrides heartbeat().
        self.requires_beating = (
            CoordinationDriver.heartbeat != self.__class__.heartbeat
        )
        self.heart = Heart(self)

    def _has_hooks_for_group(self, group_id):
        """Tell whether join or leave hooks are stored for ``group_id``."""
        return (group_id in self._hooks_join_group or
                group_id in self._hooks_leave_group)

    def join_partitioned_group(
            self, group_id,
            weight=1,
            partitions=partitioner.Partitioner.DEFAULT_PARTITION_NUMBER):
        """Join a group and get a partitioner.

        A partitioner allows to distribute a bunch of objects across several
        members using a consistent hash ring. Each object gets assigned (at
        least) one member responsible for it. It's then possible to check which
        object is owned by any member of the group.

        This method also creates if necessary, and joins the group with the
        selected weight.

        :param group_id: The group to create a partitioner for.
        :param weight: The weight to use in the hashring for this node.
        :param partitions: The number of partitions to create.
        :return: A :py:class:`~tooz.partitioner.Partitioner` object.

        """
        self.join_group_create(group_id, capabilities={'weight': weight})
        return partitioner.Partitioner(self, group_id, partitions=partitions)

    def leave_partitioned_group(self, partitioner):
        """Leave a partitioned group.

        This leaves the partitioned group and stops the partitioner.

        :param partitioner: The partitioner whose group has to be left.
        :return: The result of the leave request.
        """
        leave = self.leave_group(partitioner.group_id)
        partitioner.stop()
        return leave.get()

    @staticmethod
    def run_watchers(timeout=None):
        """Run the watchers callback.

        This may also activate :py:meth:`.run_elect_coordinator` (depending
        on driver implementation).

        :param timeout: time limit; its handling is left to the driver
        """
        raise tooz.NotImplemented

    @staticmethod
    def run_elect_coordinator():
        """Try to leader elect this coordinator & activate hooks on success."""
        raise tooz.NotImplemented

    def watch_join_group(self, group_id, callback):
        """Call a function when group_id sees a new member joined.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member joins this group

        """
        self._hooks_join_group[group_id].append(callback)

    def unwatch_join_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member joined.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member joined
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered for
                                       this group
        """
        try:
            # Check if group_id is in hooks to avoid creating a default empty
            # entry in hooks list.
            if group_id not in self._hooks_join_group:
                raise ValueError
            self._hooks_join_group[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if not self._hooks_join_group[group_id]:
            del self._hooks_join_group[group_id]

    def watch_leave_group(self, group_id, callback):
        """Call a function when group_id sees a new member leaving.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group

        """
        self._hooks_leave_group[group_id].append(callback)

    def unwatch_leave_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member leaving.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member left
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered for
                                       this group
        """
        try:
            # Check if group_id is in hooks to avoid creating a default empty
            # entry in hooks list.
            if group_id not in self._hooks_leave_group:
                raise ValueError
            self._hooks_leave_group[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if not self._hooks_leave_group[group_id]:
            del self._hooks_leave_group[group_id]

    def watch_elected_as_leader(self, group_id, callback):
        """Call a function when member gets elected as leader.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when the member gets
                         elected as leader of this group

        """
        self._hooks_elected_leader[group_id].append(callback)

    def unwatch_elected_as_leader(self, group_id, callback):
        """Stop executing a function when member gets elected as leader.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when the member got
                         elected as leader of this group
        :raises WatchCallbackNotFound: if the callback is not registered for
                                       this group
        """
        try:
            # Check if group_id is in hooks to avoid creating a default empty
            # entry in hooks list.
            if group_id not in self._hooks_elected_leader:
                raise ValueError
            self._hooks_elected_leader[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if not self._hooks_elected_leader[group_id]:
            del self._hooks_elected_leader[group_id]

    @staticmethod
    def stand_down_group_leader(group_id):
        """Stand down as the group leader if we are.

        :param group_id: The group where we don't want to be a leader anymore
        """
        raise tooz.NotImplemented

    @property
    def is_started(self):
        """Whether :py:meth:`start` succeeded and no stop happened since."""
        return self._started

    def start(self, start_heart=False):
        """Start the service engine.

        If needed, the establishment of a connection to the servers
        is initiated.

        :param start_heart: when true and the driver requires beating, also
                            start the heart beating thread
        :raises tooz.ToozError: if the driver is already started
        """
        if self._started:
            raise tooz.ToozError(
                "Can not start a driver which has not been stopped")
        self._start()
        if self.requires_beating and start_heart:
            self.heart.start()
        self._started = True
        # Tracks which group are joined
        self._joined_groups = set()

    def _start(self):
        """Driver specific start logic (does nothing by default)."""
        pass

    def stop(self):
        """Stop the service engine.

        If needed, the connection to servers is closed and the client will
        disappear from all joined groups.

        :raises tooz.ToozError: if the driver is not started
        """
        if not self._started:
            raise tooz.ToozError(
                "Can not stop a driver which has not been started")
        if self.heart.is_alive():
            self.heart.stop()
            self.heart.wait()
        # Some of the drivers modify joined_groups when being called to leave
        # so clone it so that we aren't modifying something while iterating.
        joined_groups = self._joined_groups.copy()
        leaving = [self.leave_group(group) for group in joined_groups]
        for fut in leaving:
            try:
                fut.get()
            except tooz.ToozError:
                # Whatever happens, ignore. Maybe we got booted out/never
                # existed in the first place, or something is down, but we just
                # want to call _stop after whatever happens to not leak any
                # connection.
                pass
        self._stop()
        self._started = False

    def _stop(self):
        """Driver specific stop logic (does nothing by default)."""
        pass

    @staticmethod
    def create_group(group_id):
        """Request the creation of a group asynchronously.

        :param group_id: the id of the group to create
        :type group_id: ascii bytes
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_groups():
        """Return the list composed by all groups ids asynchronously.

        :returns: the list of all created group ids
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def join_group(group_id, capabilities=b""):
        """Join a group and establish group membership asynchronously.

        :param group_id: the id of the group to join
        :type group_id: ascii bytes
        :param capabilities: the capabilities of the joined member
        :type capabilities: object
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @_retry.retry()
    def join_group_create(self, group_id, capabilities=b""):
        """Join a group and create it if necessary.

        If the group cannot be joined because it does not exist, it is created
        before being joined.

        This function will keep retrying until it can create the group and join
        it. Since nothing is transactional, it may have to retry several times
        if another member is creating/deleting the group at the same time.

        :param group_id: Identifier of the group to join and create
        :param capabilities: the capabilities of the joined member
        """
        req = self.join_group(group_id, capabilities)
        try:
            req.get()
        except GroupNotCreated:
            req = self.create_group(group_id)
            try:
                req.get()
            except GroupAlreadyExist:
                # The group might have been created in the meantime, ignore
                pass
            # Now retry to join the group
            raise _retry.TryAgain

    @staticmethod
    def leave_group(group_id):
        """Leave a group asynchronously.

        :param group_id: the id of the group to leave
        :type group_id: ascii bytes
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def delete_group(group_id):
        """Delete a group asynchronously.

        :param group_id: the id of the group to delete
        :type group_id: ascii bytes
        :returns: Result
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_members(group_id):
        """Return the set of all members ids of the specified group.

        :param group_id: the id of the group to list the members of
        :returns: set of all member ids of the group
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_capabilities(group_id, member_id):
        """Return the capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: ascii bytes
        :param member_id: the id of the member
        :type member_id: ascii bytes
        :returns: capabilities of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_info(group_id, member_id):
        """Return the statistics and capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: ascii bytes
        :param member_id: the id of the member
        :type member_id: ascii bytes
        :returns: capabilities and statistics of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def update_capabilities(group_id, capabilities):
        """Update member capabilities in the specified group.

        :param group_id: the id of the group of the current member
        :type group_id: ascii bytes
        :param capabilities: the capabilities of the updated member
        :type capabilities: object
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_leader(group_id):
        """Return the leader for a group.

        :param group_id: the id of the group
        :returns: the leader
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_lock(name):
        """Return a distributed lock.

        This is an exclusive lock, a second call to acquire() will block or
        return False.

        :param name: The lock name that is used to identify it across all
                     nodes.

        """
        raise tooz.NotImplemented

    @staticmethod
    def heartbeat():
        """Update member status to indicate it is still alive.

        Method to run once in a while to be sure that the member is not dead
        and is still an active member of a group.

        This base implementation does nothing and returns ``None``.

        :return: The number of seconds to wait before sending a new heartbeat.
        """
        pass
630
631
class CoordAsyncResult(metaclass=abc.ABCMeta):
    """Abstract handle on a task that runs asynchronously.

    Every API call hands back a CoordAsyncResult from which the outcome or
    the completion status of the task can be queried.

    """

    @abc.abstractmethod
    def get(self, timeout=None):
        """Fetch the outcome of the matching asynchronous call.

        :param timeout: block until the timeout expires.
        :type timeout: float
        """

    @abc.abstractmethod
    def done(self):
        """Tell whether the task has finished (True) or not (False)."""
651
652
class CoordinatorResult(CoordAsyncResult):
    """Asynchronous result backed by a future.

    :param fut: the future to wait on
    :param failure_translator: optional callable returning a context
                               manager; when set, the wait on the future
                               happens inside that context
    """

    def __init__(self, fut, failure_translator=None):
        self._fut = fut
        self._failure_translator = failure_translator

    def get(self, timeout=None):
        """Return the result of the future, waiting up to ``timeout``.

        A ``futures.TimeoutError`` is handed to ``utils.raise_with_cause``
        together with :py:class:`OperationTimedOut`.
        """
        translator = self._failure_translator
        try:
            if not translator:
                return self._fut.result(timeout=timeout)
            with translator():
                return self._fut.result(timeout=timeout)
        except futures.TimeoutError as exc:
            utils.raise_with_cause(OperationTimedOut,
                                   encodeutils.exception_to_unicode(exc),
                                   cause=exc)

    def done(self):
        """Tell whether the underlying future is done."""
        return self._fut.done()
674
675
class CoordinationDriverWithExecutor(CoordinationDriver):
    """Coordination driver that owns an executor.

    The executor is built from the collapsed options at construction time,
    started right before the driver starts and stopped right after the
    driver has stopped.
    """

    # Forwarded as ``exclude`` to utils.collapse() when collapsing options.
    EXCLUDE_OPTIONS = None

    def __init__(self, member_id, parsed_url, options):
        collapsed = utils.collapse(options, exclude=self.EXCLUDE_OPTIONS)
        self._options = collapsed
        self._executor = utils.ProxyExecutor.build(
            type(self).__name__, collapsed)
        super().__init__(member_id, parsed_url, options)

    def start(self, start_heart=False):
        """Start the executor, then the driver itself."""
        self._executor.start()
        super().start(start_heart)

    def stop(self):
        """Stop the driver, then its executor."""
        super().stop()
        self._executor.stop()
694
695
class CoordinationDriverCachedRunWatchers(CoordinationDriver):
    """Coordination driver with a `run_watchers` implementation.

    This implementation of `run_watchers` is based on a cache of the group
    members between each run of `run_watchers` that is being updated between
    each run.

    """

    def __init__(self, member_id, parsed_url, options):
        super(CoordinationDriverCachedRunWatchers, self).__init__(
            member_id, parsed_url, options)
        # A cache for group members
        self._group_members = collections.defaultdict(set)
        self._joined_groups = set()

    def _init_watch_group(self, group_id):
        """Seed the member cache of ``group_id`` if it is not cached yet."""
        if group_id not in self._group_members:
            members = self.get_members(group_id)
            self._group_members[group_id] = members.get()

    def watch_join_group(self, group_id, callback):
        """Register ``callback`` for joins, seeding the member cache first."""
        self._init_watch_group(group_id)
        super(CoordinationDriverCachedRunWatchers, self).watch_join_group(
            group_id, callback)

    def unwatch_join_group(self, group_id, callback):
        """Unregister ``callback`` for joins.

        The member cache of the group is dropped once the group has neither
        join nor leave hooks left.
        """
        super(CoordinationDriverCachedRunWatchers, self).unwatch_join_group(
            group_id, callback)

        if (not self._has_hooks_for_group(group_id) and
           group_id in self._group_members):
            del self._group_members[group_id]

    def watch_leave_group(self, group_id, callback):
        """Register ``callback`` for leaves, seeding the member cache first."""
        self._init_watch_group(group_id)
        super(CoordinationDriverCachedRunWatchers, self).watch_leave_group(
            group_id, callback)

    def unwatch_leave_group(self, group_id, callback):
        """Unregister ``callback`` for leaves.

        The member cache of the group is dropped once the group has neither
        join nor leave hooks left.
        """
        super(CoordinationDriverCachedRunWatchers, self).unwatch_leave_group(
            group_id, callback)

        if (not self._has_hooks_for_group(group_id) and
           group_id in self._group_members):
            del self._group_members[group_id]

    def run_watchers(self, timeout=None):
        """Compare current and cached members and run the matching hooks.

        For every group having join or leave hooks, the current members are
        fetched and compared with the cached ones: join hooks run for each
        new member, leave hooks for each missing one, then the cache is
        refreshed. A group that does not exist is treated as empty.

        :param timeout: duration given to the stop watch whose leftover time
                        bounds each wait on ``get_members``
        :returns: list of the values returned by the executed callbacks
        """
        with timeutils.StopWatch(duration=timeout) as w:
            result = []
            group_with_hooks = set(self._hooks_join_group.keys()).union(
                set(self._hooks_leave_group.keys()))
            for group_id in group_with_hooks:
                try:
                    group_members = self.get_members(group_id).get(
                        timeout=w.leftover(return_none=True))
                except GroupNotCreated:
                    group_members = set()
                if (group_id in self._joined_groups and
                        self._member_id not in group_members):
                    self._joined_groups.discard(group_id)
                old_group_members = self._group_members.get(group_id, set())
                # NOTE: the hooks are looked up with get() and not by
                # indexing: indexing the defaultdict would create an empty
                # entry, which makes _has_hooks_for_group() report hooks
                # that were never registered and keeps the group watched
                # forever.
                join_hooks = self._hooks_join_group.get(group_id)
                if join_hooks:
                    for member_id in (group_members - old_group_members):
                        result.extend(
                            join_hooks.run(
                                MemberJoinedGroup(group_id, member_id)))
                leave_hooks = self._hooks_leave_group.get(group_id)
                if leave_hooks:
                    for member_id in (old_group_members - group_members):
                        result.extend(
                            leave_hooks.run(
                                MemberLeftGroup(group_id, member_id)))
                self._group_members[group_id] = group_members
            return result
768
769
def get_coordinator(backend_url, member_id,
                    characteristics=frozenset(), **kwargs):
    """Initialize and load the backend.

    The driver is looked up by the scheme of ``backend_url`` and is
    instantiated with the member id, the parsed URL and the merged options.

    :param backend_url: the backend URL to use
    :type backend_url: str
    :param member_id: the id of the member
    :type member_id: ascii bytes
    :param characteristics: the characteristics that will be matched to the
                            requested driver (this **will** become a
                            **required** parameter in a future tooz version)
    :type characteristics: set of :py:class:`.Characteristics`
    :param kwargs: additional coordinator options (these take precedence over
                   options of the **same** name found in the ``backend_url``
                   arguments query string)
    :returns: the loaded driver
    :raises ToozDriverChosenPoorly: if the driver lacks at least one of the
                                    requested characteristics
    """
    url_parts = netutils.urlsplit(backend_url)
    # Keyword options come first and are wrapped in a list so that they look
    # like the values produced by parse_qs(); query string options only fill
    # in the names that were not given as keywords.
    options = {name: [value] for name, value in kwargs.items()}
    for name, values in urllib.parse.parse_qs(url_parts.query).items():
        options.setdefault(name, values)
    manager = driver.DriverManager(
        namespace=TOOZ_BACKENDS_NAMESPACE,
        name=url_parts.scheme,
        invoke_on_load=True,
        invoke_args=(member_id, url_parts, options))
    loaded = manager.driver
    wanted = set(characteristics)
    available = set(getattr(loaded, 'CHARACTERISTICS', set()))
    missing = wanted - available
    if not missing:
        return loaded
    raise ToozDriverChosenPoorly("Desired characteristics %s"
                                 " is not a strict subset of driver"
                                 " characteristics %s, %s"
                                 " characteristics were not found"
                                 % (wanted, available, missing))
815
816
# Backward-compatibility alias: the exception class itself is defined in the
# top-level ``tooz`` package.
# TODO(harlowja): We'll have to figure out a way to remove this 'alias' at
# some point in the future (when we have a better way to tell people it has
# moved without messing up their exception catching hierarchy).
ToozError = tooz.ToozError
821
822
class ToozDriverChosenPoorly(tooz.ToozError):
    """Raised when a driver does not match desired characteristics.

    :py:func:`get_coordinator` raises it when at least one requested
    characteristic is missing from the characteristics of the loaded driver.
    """
825
826
class ToozConnectionError(tooz.ToozError):
    """Raised when the client cannot connect to the server."""
829
830
class OperationTimedOut(tooz.ToozError):
    """Raised when an operation times out.

    :py:meth:`CoordinatorResult.get` uses it when waiting on the underlying
    future ends with a ``futures.TimeoutError``.
    """
833
834
class LockAcquireFailed(tooz.ToozError):
    """Raised when acquiring a lock fails in a context manager."""
837
838
class GroupNotCreated(tooz.ToozError):
    """Raised when the caller requests a nonexistent group.

    :param group_id: the id of the missing group, also kept on the
                     exception as ``group_id``
    """

    def __init__(self, group_id):
        message = "Group %s does not exist" % group_id
        self.group_id = group_id
        super().__init__(message)
845
846
class GroupAlreadyExist(tooz.ToozError):
    """Raised when trying to create a group that already exists.

    :param group_id: the id of the existing group, also kept on the
                     exception as ``group_id``
    """

    def __init__(self, group_id):
        message = "Group %s already exists" % group_id
        self.group_id = group_id
        super().__init__(message)
853
854
class MemberAlreadyExist(tooz.ToozError):
    """Raised when trying to join a group that was already joined.

    :param group_id: the id of the group, kept as ``group_id``
    :param member_id: the id of the member, kept as ``member_id``
    """

    def __init__(self, group_id, member_id):
        message = "Member %s has already joined %s" % (member_id, group_id)
        self.group_id = group_id
        self.member_id = member_id
        super().__init__(message)
863
864
class MemberNotJoined(tooz.ToozError):
    """Raised when trying to access a member that is not in a group.

    :param group_id: the id of the group, kept as ``group_id``
    :param member_id: the id of the member, kept as ``member_id``
    """

    def __init__(self, group_id, member_id):
        message = "Member %s has not joined %s" % (member_id, group_id)
        self.group_id = group_id
        self.member_id = member_id
        super().__init__(message)
872
873
class GroupNotEmpty(tooz.ToozError):
    """Raised when the caller tries to delete a group that has members.

    :param group_id: the id of the non-empty group, also kept on the
                     exception as ``group_id``
    """

    def __init__(self, group_id):
        message = "Group %s is not empty" % group_id
        self.group_id = group_id
        super().__init__(message)
879
880
class WatchCallbackNotFound(tooz.ToozError):
    """Exception raised when unwatching a group.

    Raised when the caller tries to unwatch a group with a callback that
    does not exist.

    :param group_id: the id of the group that was being unwatched
    :param callback: the callback that is not registered on that group
    """
    def __init__(self, group_id, callback):
        self.group_id = group_id
        self.callback = callback
        # Not every callable carries a __name__ (functools.partial objects
        # and instances defining __call__ do not); fall back to the repr so
        # that building this exception never fails with AttributeError.
        callback_name = getattr(callback, '__name__', None)
        if callback_name is None:
            callback_name = repr(callback)
        super(WatchCallbackNotFound, self).__init__(
            'Callback %s is not registered on group %s' %
            (callback_name, group_id))
894
895
# Backward-compatibility alias: the exception class itself is defined in
# ``tooz.utils``.
# TODO(harlowja,jd): We'll have to figure out a way to remove this 'alias' at
# some point in the future (when we have a better way to tell people it has
# moved without messing up their exception catching hierarchy).
SerializationError = utils.SerializationError
900