1# -*- coding: utf-8 -*-
2#
3# Copyright © 2014 eNovance
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16import logging
17
18from oslo_utils import encodeutils
19import pymysql
20
21import tooz
22from tooz import _retry
23from tooz import coordination
24from tooz import locking
25from tooz import utils
26
# Module-level logger, named after this module so its output can be
# configured/filtered through the standard logging hierarchy.
LOG = logging.getLogger(__name__)
28
29
class MySQLLock(locking.Lock):
    """A MySQL based lock.

    The lock is implemented with the MySQL ``GET_LOCK()`` and
    ``RELEASE_LOCK()`` functions. Every lock object creates its own
    connection (with ``defer_connect=True``); the connection is opened on
    demand by :meth:`acquire` and closed again by :meth:`release`.
    """

    MYSQL_DEFAULT_PORT = 3306

    def __init__(self, name, parsed_url, options):
        """Create the lock and its (not yet connected) MySQL connection.

        :param name: name of the lock, passed as-is to ``GET_LOCK()``
        :param parsed_url: parsed connection URL handed to
                           :meth:`MySQLDriver.get_connection`
        :param options: driver options handed to
                        :meth:`MySQLDriver.get_connection`
        """
        super(MySQLLock, self).__init__(name)
        # Local view of the lock state; set before the connection is created
        # so that it exists even if building the connection fails.
        self.acquired = False
        self._conn = MySQLDriver.get_connection(parsed_url, options, True)

    def acquire(self, blocking=True, shared=False):
        """Try to acquire the lock.

        :param blocking: when falsy a single attempt is made; when truthy a
                         failed attempt raises ``_retry.TryAgain`` so that the
                         ``_retry.retry`` wrapper can try again. The value is
                         also passed to ``_retry.retry`` as ``stop_max_delay``.
        :param shared: shared locks are not supported by this driver
        :returns: ``True`` if the lock was acquired, ``False`` if a
                  non-blocking attempt did not get it (including when this
                  object already holds the lock)
        :raises tooz.NotImplemented: if ``shared`` is requested
        :raises tooz.ToozError: if MySQL reports an error
        """
        if shared:
            raise tooz.NotImplemented

        @_retry.retry(stop_max_delay=blocking)
        def _lock():
            # NOTE(sileht): mysql-server (<5.7.5) allows only one lock per
            # connection at a time:
            #  select GET_LOCK("a", 0);
            #  select GET_LOCK("b", 0); <-- this release lock "a" ...
            # Or
            #  select GET_LOCK("a", 0);
            #  select GET_LOCK("a", 0); release and lock again "a"
            #
            # So, we track locally the lock status with self.acquired
            if self.acquired is True:
                if blocking:
                    raise _retry.TryAgain
                return False

            locked = False
            try:
                if not self._conn.open:
                    self._conn.connect()
                with self._conn.cursor() as cur:
                    cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
                    # Can return NULL on error
                    locked = cur.fetchone()[0] == 1
            except pymysql.MySQLError as e:
                utils.raise_with_cause(
                    tooz.ToozError,
                    encodeutils.exception_to_unicode(e),
                    cause=e)

            if locked:
                self.acquired = True
                return True
            if blocking:
                raise _retry.TryAgain
            # Non-blocking attempt failed: do not keep an idle connection.
            self._conn.close()
            return False

        try:
            return _lock()
        except Exception:
            # Close the connection if we tried too much and finally failed, or
            # anything else bad happened. If this object already holds the
            # lock the connection must stay open: closing it would end the
            # session that owns the lock while self.acquired is still True.
            if not self.acquired:
                self._conn.close()
            raise

    def release(self):
        """Release the lock and close the connection.

        :returns: ``True`` if the lock was released, ``False`` if this object
                  did not hold it
        :raises tooz.ToozError: if MySQL reports an error; in that case the
                                lock is still considered held locally
        """
        if not self.acquired:
            return False
        try:
            with self._conn.cursor() as cur:
                cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
                cur.fetchone()
            self.acquired = False
            self._conn.close()
            return True
        except pymysql.MySQLError as e:
            utils.raise_with_cause(tooz.ToozError,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)

    def __del__(self):
        """Warn when a lock that is still held gets garbage collected."""
        # __del__ may run on an instance whose __init__ did not complete, so
        # do not assume the attribute exists.
        if getattr(self, "acquired", False):
            LOG.warning("unreleased lock %s garbage collected", self.name)
107
108
class MySQLDriver(coordination.CoordinationDriver):
    """A `MySQL`_ based driver.

    This driver uses `MySQL`_ database tables to
    provide the coordination driver semantics and required API(s). It **is**
    missing some functionality but in the future these not implemented API(s)
    will be filled in.

    The MySQL driver connection URI should look like::

      mysql://USERNAME:PASSWORD@HOST[:PORT]/DBNAME[?unix_socket=SOCKET_PATH]

    If not specified, PORT defaults to 3306.

    .. _MySQL: http://dev.mysql.com/
    """

    CHARACTERISTICS = (
        coordination.Characteristics.NON_TIMEOUT_BASED,
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
        coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    def __init__(self, member_id, parsed_url, options):
        """Initialize the MySQL driver.

        The parsed URL and the collapsed options are remembered so that the
        driver connection and every lock connection can be built from them.
        """
        super(MySQLDriver, self).__init__(member_id, parsed_url, options)
        self._parsed_url = parsed_url
        self._options = utils.collapse(options)

    def _start(self):
        """Open the driver's own connection."""
        self._conn = MySQLDriver.get_connection(
            self._parsed_url, self._options)

    def _stop(self):
        """Close the driver's own connection."""
        self._conn.close()

    def get_lock(self, name):
        """Return a new :class:`MySQLLock` called ``name``."""
        return MySQLLock(name, self._parsed_url, self._options)

    # Group watching is not provided by this driver: each of the hooks below
    # simply raises tooz.NotImplemented.

    @staticmethod
    def watch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_join_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_leave_group(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def watch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_elected_as_leader(group_id, callback):
        """Not supported by this driver."""
        raise tooz.NotImplemented

    @staticmethod
    def get_connection(parsed_url, options, defer_connect=False):
        """Build a pymysql connection from a parsed URL and driver options.

        :param parsed_url: parsed connection URL; its hostname, port, path
                           (database name), username and password are used
        :param options: mapping of driver options; a truthy ``unix_socket``
                        entry makes the connection use that socket instead of
                        the URL host
        :param defer_connect: forwarded to ``pymysql.Connect``
        :returns: the object returned by ``pymysql.Connect``
        :raises coordination.ToozConnectionError: if pymysql raises an
            ``OperationalError`` or ``InternalError``
        """
        url_host = parsed_url.hostname
        connect_kwargs = {
            "port": parsed_url.port or MySQLLock.MYSQL_DEFAULT_PORT,
            "database": parsed_url.path[1:],
            "user": parsed_url.username,
            "passwd": parsed_url.password,
            "defer_connect": defer_connect,
        }
        socket_path = options.get("unix_socket")

        # A unix socket, when given, replaces the TCP host as the target.
        if socket_path:
            connect_kwargs["unix_socket"] = socket_path
        else:
            connect_kwargs["host"] = url_host

        try:
            return pymysql.Connect(**connect_kwargs)
        except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
            utils.raise_with_cause(coordination.ToozConnectionError,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)
205