1# -*- coding: utf-8 -*-
2#
3# Copyright © 2014 eNovance
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17import contextlib
18import errno
19import functools
20import logging
21import socket
22
23from oslo_utils import encodeutils
24from pymemcache import client as pymemcache_client
25
26import tooz
27from tooz import _retry
28from tooz import coordination
29from tooz import locking
30from tooz import utils
31
32
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
34
35
36@contextlib.contextmanager
37def _failure_translator():
38    """Translates common pymemcache exceptions into tooz exceptions.
39
40    https://github.com/pinterest/pymemcache/blob/d995/pymemcache/client.py#L202
41    """
42    try:
43        yield
44    except pymemcache_client.MemcacheUnexpectedCloseError as e:
45        utils.raise_with_cause(coordination.ToozConnectionError,
46                               encodeutils.exception_to_unicode(e),
47                               cause=e)
48    except (socket.timeout, socket.error,
49            socket.gaierror, socket.herror) as e:
50        # TODO(harlowja): get upstream pymemcache to produce a better
51        # exception for these, using socket (vs. a memcache specific
52        # error) seems sorta not right and/or the best approach...
53        msg = encodeutils.exception_to_unicode(e)
54        if e.errno is not None:
55            msg += " (with errno %s [%s])" % (errno.errorcode[e.errno],
56                                              e.errno)
57        utils.raise_with_cause(coordination.ToozConnectionError,
58                               msg, cause=e)
59    except pymemcache_client.MemcacheError as e:
60        utils.raise_with_cause(tooz.ToozError,
61                               encodeutils.exception_to_unicode(e),
62                               cause=e)
63
64
def _translate_failures(func):
    """Decorate ``func`` so that it always runs under _failure_translator.

    The returned callable forwards all positional and keyword arguments to
    ``func`` and hands back whatever ``func`` returns.
    """

    @functools.wraps(func)
    def _translated(*call_args, **call_kwargs):
        with _failure_translator():
            outcome = func(*call_args, **call_kwargs)
        return outcome

    return _translated
73
74
class MemcachedLock(locking.Lock):
    """Lock implemented on top of a single memcache key.

    The key is the lock name prefixed with ``_LOCK_PREFIX``; the stored value
    is the member id of the owning coordinator and the key is created with an
    expiry of ``timeout`` seconds (refreshed by :py:meth:`heartbeat`).
    """

    _LOCK_PREFIX = b'__TOOZ_LOCK_'

    def __init__(self, coord, name, timeout):
        """Build a lock bound to a coordinator.

        :param coord: the driver providing ``client``, ``_member_id`` and
                      the ``_acquired_locks`` bookkeeping list
        :param name: lock name, concatenated to the binary lock prefix
        :param timeout: expiry (in seconds) given to the memcache key
        """
        super(MemcachedLock, self).__init__(self._LOCK_PREFIX + name)
        self.coord = coord
        self.timeout = timeout

    def is_still_owner(self):
        """Tell whether the key still holds this coordinator's member id."""
        if self.acquired:
            current_owner = self.get_owner()
            if current_owner is not None:
                return current_owner == self.coord._member_id
        return False

    def acquire(self, blocking=True, shared=False):
        """Try to take the lock by adding its key to memcache.

        :param blocking: forwarded to the retry decorator as
                         ``stop_max_delay``; when it is exactly ``False`` a
                         failed attempt returns immediately
        :param shared: shared locks are not supported by this driver
        :returns: True when the key was added, False for a failed
                  non-blocking attempt
        :raises tooz.NotImplemented: when ``shared`` is requested
        """
        if shared:
            raise tooz.NotImplemented

        @_retry.retry(stop_max_delay=blocking)
        @_translate_failures
        def _attempt():
            stored = self.coord.client.add(self.name,
                                           self.coord._member_id,
                                           expire=self.timeout,
                                           noreply=False)
            if stored:
                # Remember the lock so the coordinator can heartbeat it.
                self.coord._acquired_locks.append(self)
                return True
            if blocking is False:
                return False
            raise _retry.TryAgain

        return _attempt()

    @_translate_failures
    def break_(self):
        """Delete the lock key unconditionally; True if it was deleted."""
        deleted = self.coord.client.delete(self.name, noreply=False)
        return bool(deleted)

    @_translate_failures
    def release(self):
        """Release the lock if this object still appears to own it.

        :returns: False when the lock is not held by this object or the
                  stored value is not this coordinator's member id,
                  otherwise the result of the delete call
        """
        held_locks = self.coord._acquired_locks
        if not self.acquired or self not in held_locks:
            return False
        # NOTE(harlowja): releasing may delete a lock that somebody else
        # now holds, in particular when our key expired before this call.
        #
        # A possible sequence of events:
        #
        # 1. App #1 (coordinator 'A') acquires lock "b".
        # 2. App #1 heartbeats every 10 seconds while the lock expires
        #    after, say, 11 seconds.
        # 3. App #2 (coordinator also named 'A') blocks on lock "b",
        #    retrying every 0.5 seconds or so.
        # 4. App #1 runs late, its heartbeat finds the key expired (a log
        #    message is written); it has lost the lock without knowing.
        # 5. App #2 retries, adds the key and believes it owns the lock.
        # 6. App #1, still convinced it owns the lock, releases it and so
        #    deletes the lock of app #2; app #2 has now lost the lock
        #    without knowing and app #(X + 1) can take it.
        # 7. App #2 releases (and step 6 repeats as often as desired).
        #
        # Memcache does not seem to offer a primitive that makes this safe;
        # redis can do it with a lua script that compares a session id
        # before deleting, memcache has no visible equivalent.
        #
        # The value comparison below is therefore only a best-effort guard:
        # it happens on the client side and is not atomic with the delete.
        client = self.coord.client
        if client.get(self.name) != self.coord._member_id:
            return False
        was_deleted = client.delete(self.name, noreply=False)
        if was_deleted:
            held_locks.remove(self)
        return was_deleted

    @_translate_failures
    def heartbeat(self):
        """Keep the lock alive by extending the expiry of its key.

        :returns: True when the key was touched, False when the lock is not
                  held or the touch did not succeed (a warning is logged in
                  the latter case)
        """
        if not self.acquired:
            return False
        touched = self.coord.client.touch(self.name,
                                          expire=self.timeout,
                                          noreply=False)
        if touched:
            return True
        LOG.warning("Unable to heartbeat by updating key '%s' with "
                    "extended expiry of %s seconds", self.name,
                    self.timeout)
        return False

    @_translate_failures
    def get_owner(self):
        """Return the value currently stored under the lock key."""
        return self.coord.client.get(self.name)

    @property
    def acquired(self):
        """Whether this lock is listed among the coordinator's held locks."""
        return self in self.coord._acquired_locks
181
182
class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
                      coordination.CoordinationDriverWithExecutor):
    """A `memcached`_ based driver.

    This driver uses `memcached`_ concepts to provide the coordination driver
    semantics and required API(s). It **is** fully functional and implements
    all of the coordination driver API(s). It stores data into memcache
    using expiries and `msgpack`_ encoded values.

    The Memcached driver connection URI should look like::

      memcached://[HOST[:PORT]][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]

    If not specified, HOST defaults to localhost and PORT defaults to 11211.
    Available options are:

    ==================  =======
    Name                Default
    ==================  =======
    timeout             30
    membership_timeout  30
    lock_timeout        30
    leader_timeout      30
    max_pool_size       None
    ==================  =======

    When ``timeout`` is given, ``membership_timeout``, ``lock_timeout`` and
    ``leader_timeout`` default to its value instead of 30.

    General recommendations/usage considerations:

    - Memcache (without different backend technology) is a **cache** enough
      said.

    .. _memcached: http://memcached.org/
    .. _msgpack: http://msgpack.org/
    """

    CHARACTERISTICS = (
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
        coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS,
        coordination.Characteristics.CAUSAL,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    #: Key prefix attached to groups (used in name-spacing keys)
    GROUP_PREFIX = b'_TOOZ_GROUP_'

    #: Key prefix attached to leaders of groups (used in name-spacing keys)
    GROUP_LEADER_PREFIX = b'_TOOZ_GROUP_LEADER_'

    #: Key prefix attached to members of groups (used in name-spacing keys)
    MEMBER_PREFIX = b'_TOOZ_MEMBER_'

    #: Key where all groups 'known' are stored.
    GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'

    #: Default socket/lock/member/leader timeout used when none is provided.
    DEFAULT_TIMEOUT = 30

    #: String used to keep a key/member alive (until it next expires).
    STILL_ALIVE = b"It's alive!"

    def __init__(self, member_id, parsed_url, options):
        """Read host, port and timeouts from the URL and options.

        No connection is made here; the client is created in ``_start``.
        """
        super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
        self.host = (parsed_url.hostname or "localhost",
                     parsed_url.port or 11211)
        default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
        self.timeout = int(default_timeout)
        self.membership_timeout = int(self._options.get(
            'membership_timeout', default_timeout))
        self.lock_timeout = int(self._options.get(
            'lock_timeout', default_timeout))
        self.leader_timeout = int(self._options.get(
            'leader_timeout', default_timeout))
        max_pool_size = self._options.get('max_pool_size', None)
        if max_pool_size is not None:
            self.max_pool_size = int(max_pool_size)
        else:
            self.max_pool_size = None
        # Locks currently held through this coordinator (see MemcachedLock).
        self._acquired_locks = []

    @staticmethod
    def _msgpack_serializer(key, value):
        """Serialize a value for storage; returns ``(data, flags)``.

        Bytes are stored untouched with flag 1, anything else is dumped
        with ``utils.dumps`` and stored with flag 2.
        """
        if isinstance(value, bytes):
            return value, 1
        return utils.dumps(value), 2

    @staticmethod
    def _msgpack_deserializer(key, value, flags):
        """Reverse ``_msgpack_serializer`` according to the stored flags.

        :raises coordination.SerializationError: on an unknown flag value
        """
        if flags == 1:
            return value
        if flags == 2:
            return utils.loads(value)
        raise coordination.SerializationError("Unknown serialization"
                                              " format '%s'" % flags)

    @_translate_failures
    def _start(self):
        """Create the pooled client and verify that the server answers."""
        super(MemcachedDriver, self)._start()
        self.client = pymemcache_client.PooledClient(
            self.host,
            serializer=self._msgpack_serializer,
            deserializer=self._msgpack_deserializer,
            timeout=self.timeout,
            connect_timeout=self.timeout,
            max_pool_size=self.max_pool_size)
        # Run heartbeat here because pymemcache use a lazy connection
        # method and only connect once you do an operation.
        self.heartbeat()

    @_translate_failures
    def _stop(self):
        """Release held locks, drop the member key and close the client."""
        super(MemcachedDriver, self)._stop()
        # Iterate over a copy: a successful release removes the lock from
        # the list being walked.
        for lock in list(self._acquired_locks):
            lock.release()
        self.client.delete(self._encode_member_id(self._member_id))
        self.client.close()

    def _encode_group_id(self, group_id):
        """Return the memcache key under which a group is stored."""
        return self.GROUP_PREFIX + utils.to_binary(group_id)

    def _encode_member_id(self, member_id):
        """Return the memcache key used as a member's liveness marker."""
        return self.MEMBER_PREFIX + utils.to_binary(member_id)

    def _encode_group_leader(self, group_id):
        """Return the (unprefixed) lock name used for a group's leader."""
        return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id)

    @_retry.retry()
    def _add_group_to_group_list(self, group_id):
        """Add group to the group list.

        :param group_id: The group id
        """
        group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
        if cas:
            group_list = set(group_list)
            group_list.add(group_id)
            if not self.client.cas(self.GROUP_LIST_KEY,
                                   list(group_list), cas):
                # Someone updated the group list before us, try again!
                raise _retry.TryAgain
        else:
            if not self.client.add(self.GROUP_LIST_KEY,
                                   [group_id], noreply=False):
                # Someone updated the group list before us, try again!
                raise _retry.TryAgain

    @_retry.retry()
    def _remove_from_group_list(self, group_id):
        """Remove group from the group list.

        Does nothing when the group list key is absent or when the group is
        not part of the list, so that a missing bookkeeping entry never
        turns an otherwise successful group deletion into an error.

        :param group_id: The group id
        """
        group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
        if group_list is None:
            # No list stored under the key: nothing to remove from.
            return
        group_list = set(group_list)
        if group_id not in group_list:
            return
        group_list.remove(group_id)
        if not self.client.cas(self.GROUP_LIST_KEY,
                               list(group_list), cas):
            # Someone updated the group list before us, try again!
            raise _retry.TryAgain

    def create_group(self, group_id):
        """Asynchronously create an empty group.

        The result raises ``coordination.GroupAlreadyExist`` when the group
        key is already present.
        """
        encoded_group = self._encode_group_id(group_id)

        @_translate_failures
        def _create_group():
            if not self.client.add(encoded_group, {}, noreply=False):
                raise coordination.GroupAlreadyExist(group_id)
            self._add_group_to_group_list(group_id)

        return MemcachedFutureResult(self._executor.submit(_create_group))

    def get_groups(self):
        """Asynchronously fetch the stored group list (empty if unset)."""

        @_translate_failures
        def _get_groups():
            return self.client.get(self.GROUP_LIST_KEY) or []

        return MemcachedFutureResult(self._executor.submit(_get_groups))

    def join_group(self, group_id, capabilities=b""):
        """Asynchronously add this member to a group.

        The result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberAlreadyExist`` as appropriate.
        """
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _join_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id in group_members:
                raise coordination.MemberAlreadyExist(group_id,
                                                      self._member_id)
            group_members[self._member_id] = {
                b"capabilities": capabilities,
            }
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
            self._joined_groups.add(group_id)

        return MemcachedFutureResult(self._executor.submit(_join_group))

    def leave_group(self, group_id):
        """Asynchronously remove this member from a group.

        The result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberNotJoined`` as appropriate.
        """
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _leave_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            del group_members[self._member_id]
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
            self._joined_groups.discard(group_id)

        return MemcachedFutureResult(self._executor.submit(_leave_group))

    def _destroy_group(self, group_id):
        """Delete the group key without any check."""
        self.client.delete(self._encode_group_id(group_id))

    def delete_group(self, group_id):
        """Asynchronously delete an empty group.

        The result raises ``coordination.GroupNotCreated`` or
        ``coordination.GroupNotEmpty`` as appropriate.
        """
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _delete_group():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if group_members != {}:
                raise coordination.GroupNotEmpty(group_id)
            # Delete is not atomic, so we first set the group to None
            # using CAS, and then we delete it, to avoid race conditions.
            if not self.client.cas(encoded_group, None, cas):
                raise _retry.TryAgain
            self.client.delete(encoded_group)
            self._remove_from_group_list(group_id)

        return MemcachedFutureResult(self._executor.submit(_delete_group))

    @_retry.retry()
    @_translate_failures
    def _get_members(self, group_id):
        """Return the members of a group that are still alive.

        Members whose liveness key is gone are dropped and the stored group
        is updated accordingly (retrying when it changed meanwhile).

        :returns: dict mapping member id to the stored member information
        :raises coordination.GroupNotCreated: when the group key is absent
        """
        encoded_group = self._encode_group_id(group_id)
        group_members, cas = self.client.gets(encoded_group)
        if group_members is None:
            raise coordination.GroupNotCreated(group_id)
        actual_group_members = {}
        for m, v in group_members.items():
            # Never kick self from the group, we know we're alive
            if (m == self._member_id or
               self.client.get(self._encode_member_id(m))):
                actual_group_members[m] = v
        if group_members != actual_group_members:
            # There are some dead members, update the group
            if not self.client.cas(encoded_group, actual_group_members, cas):
                # It changed, let's try again
                raise _retry.TryAgain
        return actual_group_members

    def get_members(self, group_id):
        """Asynchronously fetch the set of live member ids of a group."""

        def _get_members():
            return set(self._get_members(group_id).keys())

        return MemcachedFutureResult(self._executor.submit(_get_members))

    def get_member_capabilities(self, group_id, member_id):
        """Asynchronously fetch the capabilities stored for a member.

        The result raises ``coordination.MemberNotJoined`` when the member
        is not among the live members of the group.
        """

        def _get_member_capabilities():
            group_members = self._get_members(group_id)
            if member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, member_id)
            return group_members[member_id][b'capabilities']

        return MemcachedFutureResult(
            self._executor.submit(_get_member_capabilities))

    def update_capabilities(self, group_id, capabilities):
        """Asynchronously replace this member's capabilities in a group.

        The result raises ``coordination.GroupNotCreated`` or
        ``coordination.MemberNotJoined`` as appropriate.
        """
        encoded_group = self._encode_group_id(group_id)

        @_retry.retry()
        @_translate_failures
        def _update_capabilities():
            group_members, cas = self.client.gets(encoded_group)
            if group_members is None:
                raise coordination.GroupNotCreated(group_id)
            if self._member_id not in group_members:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            group_members[self._member_id][b'capabilities'] = capabilities
            if not self.client.cas(encoded_group, group_members, cas):
                # It changed, try again
                raise _retry.TryAgain

        return MemcachedFutureResult(
            self._executor.submit(_update_capabilities))

    def get_leader(self, group_id):
        """Asynchronously fetch the owner of the group's leader lock."""

        def _get_leader():
            return self._get_leader_lock(group_id).get_owner()

        return MemcachedFutureResult(self._executor.submit(_get_leader))

    @_translate_failures
    def heartbeat(self):
        """Refresh this member's liveness key and every held lock.

        :returns: the smallest of the membership, leader and lock timeouts
        """
        self.client.set(self._encode_member_id(self._member_id),
                        self.STILL_ALIVE,
                        expire=self.membership_timeout)
        # Reset the acquired locks
        for lock in self._acquired_locks:
            lock.heartbeat()
        return min(self.membership_timeout,
                   self.leader_timeout,
                   self.lock_timeout)

    def get_lock(self, name):
        """Return a :py:class:`MemcachedLock` using the lock timeout."""
        return MemcachedLock(self, name, self.lock_timeout)

    def _get_leader_lock(self, group_id):
        """Return the lock whose owner is the leader of ``group_id``."""
        return MemcachedLock(self, self._encode_group_leader(group_id),
                             self.leader_timeout)

    @_translate_failures
    def run_elect_coordinator(self):
        """Try to become leader of every group with leader-election hooks.

        The hooks of a group are run with a ``coordination.LeaderElected``
        event when its leader lock is obtained.
        """
        for group_id, hooks in self._hooks_elected_leader.items():
            # Try to grab the lock, if that fails, that means someone has it
            # already.
            leader_lock = self._get_leader_lock(group_id)
            if leader_lock.acquire(blocking=False):
                # We got the lock
                hooks.run(coordination.LeaderElected(
                    group_id,
                    self._member_id))

    def run_watchers(self, timeout=None):
        """Run the inherited watchers, then attempt leader elections."""
        result = super(MemcachedDriver, self).run_watchers(timeout=timeout)
        self.run_elect_coordinator()
        return result
528
529
# Result wrapper handed back by the asynchronous driver methods: a
# ``coordination.CoordinatorResult`` with this module's failure translator
# already bound as ``failure_translator``.
MemcachedFutureResult = functools.partial(
    coordination.CoordinatorResult, failure_translator=_failure_translator)
533