1# -*- coding: utf-8 -*- 2# 3# Copyright © 2014 eNovance 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16import logging 17 18from oslo_utils import encodeutils 19import pymysql 20 21import tooz 22from tooz import _retry 23from tooz import coordination 24from tooz import locking 25from tooz import utils 26 27LOG = logging.getLogger(__name__) 28 29 30class MySQLLock(locking.Lock): 31 """A MySQL based lock.""" 32 33 MYSQL_DEFAULT_PORT = 3306 34 35 def __init__(self, name, parsed_url, options): 36 super(MySQLLock, self).__init__(name) 37 self.acquired = False 38 self._conn = MySQLDriver.get_connection(parsed_url, options, True) 39 40 def acquire(self, blocking=True, shared=False): 41 42 if shared: 43 raise tooz.NotImplemented 44 45 @_retry.retry(stop_max_delay=blocking) 46 def _lock(): 47 # NOTE(sileht): mysql-server (<5.7.5) allows only one lock per 48 # connection at a time: 49 # select GET_LOCK("a", 0); 50 # select GET_LOCK("b", 0); <-- this release lock "a" ... 51 # Or 52 # select GET_LOCK("a", 0); 53 # select GET_LOCK("a", 0); release and lock again "a" 54 # 55 # So, we track locally the lock status with self.acquired 56 if self.acquired is True: 57 if blocking: 58 raise _retry.TryAgain 59 return False 60 61 try: 62 if not self._conn.open: 63 self._conn.connect() 64 cur = self._conn.cursor() 65 cur.execute("SELECT GET_LOCK(%s, 0);", self.name) 66 # Can return NULL on error 67 if cur.fetchone()[0] == 1: 68 self.acquired = True 69 return True 70 except pymysql.MySQLError as e: 71 utils.raise_with_cause( 72 tooz.ToozError, 73 encodeutils.exception_to_unicode(e), 74 cause=e) 75 76 if blocking: 77 raise _retry.TryAgain 78 self._conn.close() 79 return False 80 81 try: 82 return _lock() 83 except Exception: 84 # Close the connection if we tried too much and finally failed, or 85 # anything else bad happened. 86 self._conn.close() 87 raise 88 89 def release(self): 90 if not self.acquired: 91 return False 92 try: 93 cur = self._conn.cursor() 94 cur.execute("SELECT RELEASE_LOCK(%s);", self.name) 95 cur.fetchone() 96 self.acquired = False 97 self._conn.close() 98 return True 99 except pymysql.MySQLError as e: 100 utils.raise_with_cause(tooz.ToozError, 101 encodeutils.exception_to_unicode(e), 102 cause=e) 103 104 def __del__(self): 105 if self.acquired: 106 LOG.warning("unreleased lock %s garbage collected", self.name) 107 108 109class MySQLDriver(coordination.CoordinationDriver): 110 """A `MySQL`_ based driver. 111 112 This driver users `MySQL`_ database tables to 113 provide the coordination driver semantics and required API(s). It **is** 114 missing some functionality but in the future these not implemented API(s) 115 will be filled in. 116 117 The MySQL driver connection URI should look like:: 118 119 mysql://USERNAME:PASSWORD@HOST[:PORT]/DBNAME[?unix_socket=SOCKET_PATH] 120 121 If not specified, PORT defaults to 3306. 122 123 .. _MySQL: http://dev.mysql.com/ 124 """ 125 126 CHARACTERISTICS = ( 127 coordination.Characteristics.NON_TIMEOUT_BASED, 128 coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS, 129 coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES, 130 coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS, 131 ) 132 """ 133 Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable 134 enum member(s) that can be used to interogate how this driver works. 135 """ 136 137 def __init__(self, member_id, parsed_url, options): 138 """Initialize the MySQL driver.""" 139 super(MySQLDriver, self).__init__(member_id, parsed_url, options) 140 self._parsed_url = parsed_url 141 self._options = utils.collapse(options) 142 143 def _start(self): 144 self._conn = MySQLDriver.get_connection(self._parsed_url, 145 self._options) 146 147 def _stop(self): 148 self._conn.close() 149 150 def get_lock(self, name): 151 return MySQLLock(name, self._parsed_url, self._options) 152 153 @staticmethod 154 def watch_join_group(group_id, callback): 155 raise tooz.NotImplemented 156 157 @staticmethod 158 def unwatch_join_group(group_id, callback): 159 raise tooz.NotImplemented 160 161 @staticmethod 162 def watch_leave_group(group_id, callback): 163 raise tooz.NotImplemented 164 165 @staticmethod 166 def unwatch_leave_group(group_id, callback): 167 raise tooz.NotImplemented 168 169 @staticmethod 170 def watch_elected_as_leader(group_id, callback): 171 raise tooz.NotImplemented 172 173 @staticmethod 174 def unwatch_elected_as_leader(group_id, callback): 175 raise tooz.NotImplemented 176 177 @staticmethod 178 def get_connection(parsed_url, options, defer_connect=False): 179 host = parsed_url.hostname 180 port = parsed_url.port or MySQLLock.MYSQL_DEFAULT_PORT 181 dbname = parsed_url.path[1:] 182 username = parsed_url.username 183 password = parsed_url.password 184 unix_socket = options.get("unix_socket") 185 186 try: 187 if unix_socket: 188 return pymysql.Connect(unix_socket=unix_socket, 189 port=port, 190 user=username, 191 passwd=password, 192 database=dbname, 193 defer_connect=defer_connect) 194 else: 195 return pymysql.Connect(host=host, 196 port=port, 197 user=username, 198 passwd=password, 199 database=dbname, 200 defer_connect=defer_connect) 201 except (pymysql.err.OperationalError, pymysql.err.InternalError) as e: 202 utils.raise_with_cause(coordination.ToozConnectionError, 203 encodeutils.exception_to_unicode(e), 204 cause=e) 205