/*
 *  Copyright (c) 2010, 2021, Oracle and/or its affiliates.
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License, version 2.0,
 *  as published by the Free Software Foundation.
 *
 *  This program is also distributed with certain software (including
 *  but not limited to OpenSSL) that is licensed under separate terms,
 *  as designated in a particular file or component or in included license
 *  documentation.  The authors of MySQL hereby grant you an additional
 *  permission to link the program and your derivative works with the
 *  separately licensed software that they have included with MySQL.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License, version 2.0, for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 */

package com.mysql.clusterj.tie;

import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.mysql.ndbjtie.ndbapi.Ndb;
import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;

import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.ClusterJFatalInternalException;
import com.mysql.clusterj.ClusterJHelper;

import com.mysql.clusterj.core.spi.ValueHandlerFactory;
import com.mysql.clusterj.core.store.Db;
import com.mysql.clusterj.core.store.Index;
import com.mysql.clusterj.core.store.Table;

import com.mysql.clusterj.core.util.I18NHelper;
import com.mysql.clusterj.core.util.Logger;
import com.mysql.clusterj.core.util.LoggerFactoryService;

/**
 * Wraps an Ndb_cluster_connection. This class creates the Db instances
 * that use the connection, keeps a cache of NdbRecordImpl keyed by table name
 * (and by tableName+indexName for index records), and releases all of these
 * when the connection is closed.
 */
public class ClusterConnectionImpl
        implements com.mysql.clusterj.core.store.ClusterConnection {

    /** My message translator */
    static final I18NHelper local = I18NHelper.getInstance(ClusterConnectionImpl.class);

    /** My logger */
    static final Logger logger = LoggerFactoryService.getFactory()
            .getInstance(ClusterConnectionImpl.class);

    /** Ndb_cluster_connection is wrapped by ClusterConnection */
    protected Ndb_cluster_connection clusterConnection;

    /** The connection string for this connection */
    final String connectString;

    /** The node id requested for this connection; 0 for default */
    final int nodeId;

    /** The timeout value to connect to mgm */
    final int connectTimeoutMgm;

    /** All regular dbs (not dbForNdbRecord) given out by this cluster connection */
    private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();

    /** The DbImplForNdbRecord; null until the first call to createDb and again after close */
    DbImplForNdbRecord dbForNdbRecord;

    /** The map of table name to NdbRecordImpl */
    private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();

    /** The sizes of the byte buffer pool. Set from SessionFactoryImpl after construction, before connect. */
    private int[] byteBufferPoolSizes;

    /** The byte buffer pool; created by connect */
    private VariableByteBufferPoolImpl byteBufferPool;

    /** The dictionary used to create NdbRecords; null until the first call to createDb */
    Dictionary dictionaryForNdbRecord = null;

    /** The auto increment settings handed to every DbImpl created by this connection */
    private long[] autoIncrement;

    private static final String USE_SMART_VALUE_HANDLER_NAME = "com.mysql.clusterj.UseSmartValueHandler";

    private static final boolean USE_SMART_VALUE_HANDLER =
            ClusterJHelper.getBooleanProperty(USE_SMART_VALUE_HANDLER_NAME, "true");

    /** Connect to the MySQL Cluster
     * 
     * @param connectString the connect string
     * @param nodeId the node id; node id of zero means "any node"
     * @param connectTimeoutMgm the timeout value passed to set_timeout on the new connection
     * @throws ClusterJDatastoreException if the connection cannot be created
     * or set_timeout returns a non-zero value
     */
    public ClusterConnectionImpl(String connectString, int nodeId, int connectTimeoutMgm) {
        this.connectString = connectString;
        this.nodeId = nodeId;
        this.connectTimeoutMgm = connectTimeoutMgm;
        clusterConnection = Ndb_cluster_connection.create(connectString, nodeId);
        handleError(clusterConnection, connectString, nodeId);
        int timeoutError = clusterConnection.set_timeout(connectTimeoutMgm);
        handleError(timeoutError, connectString, nodeId, connectTimeoutMgm);
        logger.info(local.message("INFO_Create_Cluster_Connection", connectString, nodeId, connectTimeoutMgm));
    }

    /** Create the byte buffer pool from the configured sizes and connect
     * the wrapped Ndb_cluster_connection.
     * 
     * @param connectRetries passed through to Ndb_cluster_connection.connect
     * @param connectDelay passed through to Ndb_cluster_connection.connect
     * @param verbose passed to Ndb_cluster_connection.connect as 1 if true, 0 if false
     * @throws ClusterJFatalInternalException if this connection has been closed
     * @throws ClusterJDatastoreException if connect returns a negative value
     */
    public void connect(int connectRetries, int connectDelay, boolean verbose) {
        byteBufferPool = new VariableByteBufferPoolImpl(byteBufferPoolSizes);
        checkConnection();
        int returnCode = clusterConnection.connect(connectRetries, connectDelay, verbose?1:0);
        handleError(returnCode, clusterConnection, connectString, nodeId);
    }

    /** Create a Db for the database. The first call also creates
     * the DbImplForNdbRecord and the dictionary used to create NdbRecords.
     * 
     * @param database the name of the database
     * @param maxTransactions passed through to the DbImpl constructor
     * @return the new Db, which is also registered in the list of dbs of this connection
     * @throws ClusterJFatalInternalException if this connection has been closed
     * @throws ClusterJDatastoreException if an Ndb cannot be created
     */
    public Db createDb(String database, int maxTransactions) {
        checkConnection();
        Ndb ndb = null;
        // synchronize because create is not guaranteed thread-safe
        synchronized(this) {
            ndb = Ndb.create(clusterConnection, database, "def");
            handleError(ndb, clusterConnection, connectString, nodeId);
            if (dictionaryForNdbRecord == null) {
                // create a dictionary for NdbRecord
                Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
                handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
                dbForNdbRecord = new DbImplForNdbRecord(this, ndbForNdbRecord);
                dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
            }
        }
        DbImpl result = new DbImpl(this, ndb, maxTransactions);
        result.initializeAutoIncrement(autoIncrement);
        // NOTE(review): dbs is a plain IdentityHashMap and is updated here and in close(Db)
        // outside the synchronized block -- confirm callers serialize these calls
        dbs.put(result, null);
        return result;
    }

    /** Delegate to wait_until_ready on the wrapped Ndb_cluster_connection.
     * 
     * @param connectTimeoutBefore passed through as the first argument of wait_until_ready
     * @param connectTimeoutAfter passed through as the second argument of wait_until_ready
     * @throws ClusterJFatalInternalException if this connection has been closed
     * @throws ClusterJDatastoreException if wait_until_ready returns a negative value
     */
    public void waitUntilReady(int connectTimeoutBefore, int connectTimeoutAfter) {
        checkConnection();
        int returnCode = clusterConnection.wait_until_ready(connectTimeoutBefore, connectTimeoutAfter);
        handleError(returnCode, clusterConnection, connectString, nodeId);
    }

    /** Verify that the wrapped Ndb_cluster_connection has not been closed.
     * 
     * @throws ClusterJFatalInternalException if the wrapped connection is null
     */
    private void checkConnection() {
        if (clusterConnection == null) {
            throw new ClusterJFatalInternalException(local.message("ERR_Cluster_Connection_Must_Not_Be_Null"));
        }
    }

    /** Handle the result of set_timeout: any non-zero value is logged and thrown.
     * 
     * @param timeoutError the value returned by set_timeout
     * @param connectString the connect string, used in the error message
     * @param nodeId the node id, used in the error message
     * @param connectTimeoutMgm the requested timeout, used in the error message
     * @throws ClusterJDatastoreException if timeoutError is not zero
     */
    protected static void handleError(int timeoutError, String connectString, int nodeId, int connectTimeoutMgm) {
        if (timeoutError != 0) {
            String message = local.message("ERR_Set_Timeout_Mgm", connectString, nodeId, connectTimeoutMgm, timeoutError);
            logger.error(message);
            throw new ClusterJDatastoreException(message);
        }
    }

    /** Handle an int return code from the Ndb_cluster_connection: a negative
     * value is reported via throwError and the connection is deleted.
     * 
     * @param returnCode the return code; zero and positive values are success
     * @param clusterConnection the connection that returned the code
     * @param connectString the connect string, used in the error message
     * @param nodeId the node id, used in the error message
     * @throws ClusterJDatastoreException if returnCode is negative
     */
    protected static void handleError(int returnCode, Ndb_cluster_connection clusterConnection,
            String connectString, int nodeId) {
        if (returnCode >= 0) {
            return;
        }
        try {
            throwError(returnCode, clusterConnection, connectString, nodeId);
        } finally {
            // all errors on Ndb_cluster_connection are fatal
            Ndb_cluster_connection.delete(clusterConnection);
        }
    }

    /** Handle an object returned from a create call: a null object
     * is reported via throwError.
     * 
     * @param object the object returned by the create call
     * @param clusterConnection the connection from which the error is read
     * @param connectString the connect string, used in the error message
     * @param nodeId the node id, used in the error message
     * @throws ClusterJDatastoreException if object is null
     */
    protected static void handleError(Object object, Ndb_cluster_connection clusterConnection,
            String connectString, int nodeId) {
        if (object == null) {
            throwError(null, clusterConnection, connectString, nodeId);
        }
    }

    /** Handle the result of creating the Ndb_cluster_connection itself:
     * a null connection is logged and thrown.
     * 
     * @param clusterConnection the newly created connection
     * @param connectString the connect string, used in the error message
     * @param nodeId the node id, used in the error message
     * @throws ClusterJDatastoreException if clusterConnection is null
     */
    protected static void handleError(Ndb_cluster_connection clusterConnection, String connectString, int nodeId) {
        if (clusterConnection == null) {
            String message = local.message("ERR_Connect", connectString, nodeId);
            logger.error(message);
            throw new ClusterJDatastoreException(message);
        }
    }

    /** Throw an exception that carries the latest error code and message
     * of the cluster connection. This method always throws.
     * 
     * @param returnCode the return code that signaled the error; may be null
     * @param clusterConnection the connection from which the latest error is read
     * @param connectString the connect string, used in the error message
     * @param nodeId the node id, used in the error message
     * @throws ClusterJDatastoreException always
     */
    protected static void throwError(Object returnCode, Ndb_cluster_connection clusterConnection,
            String connectString, int nodeId) {
        String message = clusterConnection.get_latest_error_msg();
        int errorCode = clusterConnection.get_latest_error();
        String msg = local.message("ERR_NdbError", returnCode, errorCode, message, connectString, nodeId);
        throw new ClusterJDatastoreException(msg);
    }

    /** Close this cluster connection: mark all dbs as closing, close them,
     * release all cached NdbRecords, close the db used for NdbRecord,
     * and delete the wrapped Ndb_cluster_connection.
     * Calling close on a connection that is already closed does nothing.
     */
    public void close() {
        if (clusterConnection != null) {
            logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId));
            for (DbImpl db: dbs.keySet()) {
                // mark all dbs as closing so no more transactions will start
                db.closing();
            }
            // dbForNdbRecord is only created by the first createDb; it is null
            // if this connection is closed before any db was created
            if (dbForNdbRecord != null) {
                dbForNdbRecord.closing();
            }
            if (dbs.size() != 0) {
                // iterate over a copy because close(Db) removes entries from dbs
                Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
                for (Db db: dbsToClose.keySet()) {
                    db.close();
                }
            }
            for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) {
                ndbRecord.releaseNdbRecord();
            }
            if (dbForNdbRecord != null) {
                dbForNdbRecord.close();
                dbForNdbRecord = null;
            }
            ndbRecordImplMap.clear();
            Ndb_cluster_connection.delete(clusterConnection);
            clusterConnection = null;
        }
    }

    /** Remove the db from the list of dbs given out by this cluster connection.
     * 
     * @param db the db that is being closed
     */
    public void close(Db db) {
        dbs.remove(db);
    }

    /** Get the number of dbs, reduced by one once the dictionary for NdbRecord exists.
     * NOTE(review): createDb in this class never adds dbForNdbRecord to dbs, so the
     * subtraction looks like it under-counts by one -- confirm against callers before changing.
     * 
     * @return the size of the db list minus one if the NdbRecord dictionary has been created
     */
    public int dbCount() {
        // one of the dbs is for the NdbRecord dictionary if it is not null
        int dbForNdbRecord = (dictionaryForNdbRecord == null)?0:1;
        return dbs.size() - dbForNdbRecord;
    }

    /** 
     * Get the cached NdbRecord implementation for the table
     * used with this cluster connection. All columns are included
     * in the NdbRecord.
     * Use a ConcurrentHashMap for best multithread performance.
     * There are three possibilities:
     * <ul><li>Case 1: return the already-cached NdbRecord
     * </li><li>Case 2: return a new instance created by this method
     * </li><li>Case 3: return the winner of a race with another thread
     * </li></ul>
     * @param storeTable the store table
     * @return the NdbRecordImpl for the table
     */
    protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
        dbForNdbRecord.assertOpen("ClusterConnectionImpl.getCachedNdbRecordImpl for table");
        String tableName = storeTable.getName();
        // find the NdbRecordImpl in the global cache
        NdbRecordImpl result = ndbRecordImplMap.get(tableName);
        if (result != null) {
            // case 1
            if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + tableName);
            return result;
        } else {
            // dictionary is single thread
            NdbRecordImpl newNdbRecordImpl;
            synchronized (dictionaryForNdbRecord) {
                // try again; another thread might have beat us
                result = ndbRecordImplMap.get(tableName);
                if (result != null) {
                    return result;
                }
                newNdbRecordImpl = new NdbRecordImpl(storeTable, dictionaryForNdbRecord);
            }
            NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(tableName, newNdbRecordImpl);
            if (winner == null) {
                // case 2: the previous value was null, so return the new (winning) value
                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + tableName);
                return newNdbRecordImpl;
            } else {
                // case 3: another thread beat us, so return the winner and garbage collect ours
                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + tableName);
                newNdbRecordImpl.releaseNdbRecord();
                return winner;
            }
        }
    }

    /** 
     * Get the cached NdbRecord implementation for the index and table
     * used with this cluster connection.
     * The NdbRecordImpl is cached under the name tableName+indexName.
     * Only the key columns are included in the NdbRecord.
     * Use a ConcurrentHashMap for best multithread performance.
     * There are three possibilities:
     * <ul><li>Case 1: return the already-cached NdbRecord
     * </li><li>Case 2: return a new instance created by this method
     * </li><li>Case 3: return the winner of a race with another thread
     * </li></ul>
     * @param storeTable the store table
     * @param storeIndex the store index
     * @return the NdbRecordImpl for the index
     */
    protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
        dbForNdbRecord.assertOpen("ClusterConnectionImpl.getCachedNdbRecordImpl for index");
        String recordName = storeTable.getName() + "+" + storeIndex.getInternalName();
        // find the NdbRecordImpl in the global cache
        NdbRecordImpl result = ndbRecordImplMap.get(recordName);
        if (result != null) {
            // case 1
            if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + recordName);
            return result;
        } else {
            // dictionary is single thread
            NdbRecordImpl newNdbRecordImpl;
            synchronized (dictionaryForNdbRecord) {
                // try again; another thread might have beat us
                result = ndbRecordImplMap.get(recordName);
                if (result != null) {
                    return result;
                }
                newNdbRecordImpl = new NdbRecordImpl(storeIndex, storeTable, dictionaryForNdbRecord);
            }
            NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(recordName, newNdbRecordImpl);
            if (winner == null) {
                // case 2: the previous value was null, so return the new (winning) value
                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + recordName);
                return newNdbRecordImpl;
            } else {
                // case 3: another thread beat us, so return the winner and garbage collect ours
                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + recordName);
                newNdbRecordImpl.releaseNdbRecord();
                return winner;
            }
        }
    }

    /** Remove the cached NdbRecord(s) associated with this table. This allows schema change to work.
     * All NdbRecords including any index NdbRecords will be removed. Index NdbRecords are named
     * tableName+indexName. Cached records of other tables whose names merely begin with
     * tableName are left in place.
     * @param tableName the name of the table
     */
    public void unloadSchema(String tableName) {
        // the table record is cached under tableName; index records under tableName+indexName
        String indexRecordPrefix = tableName + "+";
        // synchronize to avoid multiple threads unloading schema simultaneously;
        // another thread may still add an entry while we are removing entries because
        // getCachedNdbRecordImpl does not take this lock; the ConcurrentHashMap iterator
        // tolerates that but is not guaranteed to see the new entry
        synchronized(ndbRecordImplMap) {
            Iterator<Map.Entry<String, NdbRecordImpl>> iterator = ndbRecordImplMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, NdbRecordImpl> entry = iterator.next();
                String key = entry.getKey();
                if (key.equals(tableName) || key.startsWith(indexRecordPrefix)) {
                    // remove the table record and all of its index records
                    if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + key);
                    NdbRecordImpl record = entry.getValue();
                    iterator.remove();
                    if (record != null) {
                        record.releaseNdbRecord();
                    }
                }
            }
            // the dictionary does not exist until the first db has been created
            if (dictionaryForNdbRecord != null) {
                if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
                dictionaryForNdbRecord.removeCachedTable(tableName);
            }
        }
    }

    /** Get a new smart value handler factory.
     * 
     * @return a new NdbRecordSmartValueHandlerFactoryImpl, or null if the property
     * com.mysql.clusterj.UseSmartValueHandler is false
     */
    public ValueHandlerFactory getSmartValueHandlerFactory() {
        ValueHandlerFactory result = null;
        if (USE_SMART_VALUE_HANDLER) {
            result = new NdbRecordSmartValueHandlerFactoryImpl();
        }
        return result;
    }

    /** Create a new NdbRecordOperationImpl bound to this cluster connection.
     * 
     * @param db the db passed to the operation
     * @param storeTable the store table passed to the operation
     * @return the new NdbRecordOperationImpl
     */
    public NdbRecordOperationImpl newNdbRecordOperationImpl(DbImpl db, Table storeTable) {
        return new NdbRecordOperationImpl(this, db, storeTable);
    }

    /** Set the auto increment settings that createDb passes to every new DbImpl.
     * The array is stored by reference, not copied.
     * 
     * @param autoIncrement the auto increment settings
     */
    public void initializeAutoIncrement(long[] autoIncrement) {
        this.autoIncrement = autoIncrement;
    }

    /** Get the byte buffer pool.
     * 
     * @return the byte buffer pool created by connect; null if connect has not been called
     */
    public VariableByteBufferPoolImpl getByteBufferPool() {
        return byteBufferPool;
    }

    /** Set the sizes used to create the byte buffer pool. This takes effect
     * only if called before connect. The array is stored by reference, not copied.
     * 
     * @param poolSizes the sizes of the byte buffer pool
     */
    public void setByteBufferPoolSizes(int[] poolSizes) {
        this.byteBufferPoolSizes = poolSizes;
    }

}