1 /*
2  *  Copyright (c) 2010, 2021, Oracle and/or its affiliates.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License, version 2.0,
6  *  as published by the Free Software Foundation.
7  *
8  *  This program is also distributed with certain software (including
9  *  but not limited to OpenSSL) that is licensed under separate terms,
10  *  as designated in a particular file or component or in included license
11  *  documentation.  The authors of MySQL hereby grant you an additional
12  *  permission to link the program and your derivative works with the
13  *  separately licensed software that they have included with MySQL.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License, version 2.0, for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 package com.mysql.clusterj.tie;
26 
27 import java.util.IdentityHashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 
34 import com.mysql.ndbjtie.ndbapi.Ndb;
35 import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
36 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
37 
38 import com.mysql.clusterj.ClusterJDatastoreException;
39 import com.mysql.clusterj.ClusterJFatalInternalException;
40 import com.mysql.clusterj.ClusterJHelper;
41 
42 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
43 import com.mysql.clusterj.core.store.Db;
44 import com.mysql.clusterj.core.store.Index;
45 import com.mysql.clusterj.core.store.Table;
46 
47 import com.mysql.clusterj.core.util.I18NHelper;
48 import com.mysql.clusterj.core.util.Logger;
49 import com.mysql.clusterj.core.util.LoggerFactoryService;
50 
51 /**
52  *
53  */
54 public class ClusterConnectionImpl
55         implements com.mysql.clusterj.core.store.ClusterConnection {
56 
57     /** My message translator */
58     static final I18NHelper local = I18NHelper.getInstance(ClusterConnectionImpl.class);
59 
60     /** My logger */
61     static final Logger logger = LoggerFactoryService.getFactory()
62             .getInstance(ClusterConnectionImpl.class);
63 
64     /** Ndb_cluster_connection is wrapped by ClusterConnection */
65     protected Ndb_cluster_connection clusterConnection;
66 
67     /** The connection string for this connection */
68     final String connectString;
69 
70     /** The node id requested for this connection; 0 for default */
71     final int nodeId;
72 
73     /** The timeout value to connect to mgm */
74     final int connectTimeoutMgm;
75 
76     /** All regular dbs (not dbForNdbRecord) given out by this cluster connection */
77     private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();
78 
79     /** The DbImplForNdbRecord */
80     DbImplForNdbRecord dbForNdbRecord;
81 
82     /** The map of table name to NdbRecordImpl */
83     private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();
84 
85     /** The sizes of the byte buffer pool. Set from SessionFactoryImpl after construction, before connect. */
86     private int[] byteBufferPoolSizes;
87 
88     /** The byte buffer pool */
89     private VariableByteBufferPoolImpl byteBufferPool;
90 
91     /** The dictionary used to create NdbRecords */
92     Dictionary dictionaryForNdbRecord = null;
93 
94     private long[] autoIncrement;
95 
96     private static final String USE_SMART_VALUE_HANDLER_NAME = "com.mysql.clusterj.UseSmartValueHandler";
97 
98     private static final boolean USE_SMART_VALUE_HANDLER =
99             ClusterJHelper.getBooleanProperty(USE_SMART_VALUE_HANDLER_NAME, "true");
100 
101     /** Connect to the MySQL Cluster
102      *
103      * @param connectString the connect string
104      * @param nodeId the node id; node id of zero means "any node"
105      */
ClusterConnectionImpl(String connectString, int nodeId, int connectTimeoutMgm)106     public ClusterConnectionImpl(String connectString, int nodeId, int connectTimeoutMgm) {
107         this.connectString = connectString;
108         this.nodeId = nodeId;
109         this.connectTimeoutMgm = connectTimeoutMgm;
110         clusterConnection = Ndb_cluster_connection.create(connectString, nodeId);
111         handleError(clusterConnection, connectString, nodeId);
112         int timeoutError = clusterConnection.set_timeout(connectTimeoutMgm);
113         handleError(timeoutError, connectString, nodeId, connectTimeoutMgm);
114         logger.info(local.message("INFO_Create_Cluster_Connection", connectString, nodeId, connectTimeoutMgm));
115     }
116 
connect(int connectRetries, int connectDelay, boolean verbose)117     public void connect(int connectRetries, int connectDelay, boolean verbose) {
118         byteBufferPool = new VariableByteBufferPoolImpl(byteBufferPoolSizes);
119         checkConnection();
120         int returnCode = clusterConnection.connect(connectRetries, connectDelay, verbose?1:0);
121         handleError(returnCode, clusterConnection, connectString, nodeId);
122     }
123 
    /** Create a Db (wrapping a native Ndb) for the given database.
     * The first call also lazily creates a second, hidden Ndb and its
     * dictionary (dbForNdbRecord/dictionaryForNdbRecord), which is used only
     * for creating NdbRecords and is shared by all dbs of this connection.
     * @param database the database name
     * @param maxTransactions maximum concurrent transactions for the new db
     * @return the new Db
     */
    public Db createDb(String database, int maxTransactions) {
        checkConnection();
        Ndb ndb = null;
        // synchronize because create is not guaranteed thread-safe
        synchronized(this) {
            ndb = Ndb.create(clusterConnection, database, "def");
            handleError(ndb, clusterConnection, connectString, nodeId);
            if (dictionaryForNdbRecord == null) {
                // create a dictionary for NdbRecord
                Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
                handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
                dbForNdbRecord = new DbImplForNdbRecord(this, ndbForNdbRecord);
                dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
            }
        }
        DbImpl result = new DbImpl(this, ndb, maxTransactions);
        result.initializeAutoIncrement(autoIncrement);
        // NOTE(review): dbs is a non-thread-safe IdentityHashMap mutated here
        // outside the synchronized block and also in close(Db) -- confirm that
        // callers serialize createDb/close, or consider moving this put under the lock
        dbs.put(result, null);
        return result;
    }
144 
waitUntilReady(int connectTimeoutBefore, int connectTimeoutAfter)145     public void waitUntilReady(int connectTimeoutBefore, int connectTimeoutAfter) {
146         checkConnection();
147         int returnCode = clusterConnection.wait_until_ready(connectTimeoutBefore, connectTimeoutAfter);
148         handleError(returnCode, clusterConnection, connectString, nodeId);
149     }
150 
checkConnection()151     private void checkConnection() {
152         if (clusterConnection == null) {
153             throw new ClusterJFatalInternalException(local.message("ERR_Cluster_Connection_Must_Not_Be_Null"));
154         }
155     }
156 
handleError(int timeoutError, String connectString, int nodeId, int connectTimeoutMgm)157     protected static void handleError(int timeoutError, String connectString, int nodeId, int connectTimeoutMgm) {
158         if (timeoutError != 0) {
159             String message = local.message("ERR_Set_Timeout_Mgm", connectString, nodeId, connectTimeoutMgm, timeoutError);
160             logger.error(message);
161             throw new ClusterJDatastoreException(message);
162         }
163     }
164 
handleError(int returnCode, Ndb_cluster_connection clusterConnection, String connectString, int nodeId)165     protected static void handleError(int returnCode, Ndb_cluster_connection clusterConnection,
166             String connectString, int nodeId) {
167         if (returnCode >= 0) {
168             return;
169         } else {
170             try {
171                 throwError(returnCode, clusterConnection, connectString, nodeId);
172             } finally {
173                 // all errors on Ndb_cluster_connection are fatal
174                 Ndb_cluster_connection.delete(clusterConnection);
175             }
176         }
177     }
178 
handleError(Object object, Ndb_cluster_connection clusterConnection, String connectString, int nodeId)179     protected static void handleError(Object object, Ndb_cluster_connection clusterConnection,
180             String connectString, int nodeId) {
181         if (object != null) {
182             return;
183         } else {
184             throwError(null, clusterConnection, connectString, nodeId);
185         }
186     }
187 
handleError(Ndb_cluster_connection clusterConnection, String connectString, int nodeId)188     protected static void handleError(Ndb_cluster_connection clusterConnection, String connectString, int nodeId) {
189         if (clusterConnection == null) {
190             String message = local.message("ERR_Connect", connectString, nodeId);
191             logger.error(message);
192             throw new ClusterJDatastoreException(message);
193         }
194     }
195 
throwError(Object returnCode, Ndb_cluster_connection clusterConnection, String connectString, int nodeId)196     protected static void throwError(Object returnCode, Ndb_cluster_connection clusterConnection,
197             String connectString, int nodeId) {
198         String message = clusterConnection.get_latest_error_msg();
199         int errorCode = clusterConnection.get_latest_error();
200         String msg = local.message("ERR_NdbError", returnCode, errorCode, message, connectString, nodeId);
201         throw new ClusterJDatastoreException(msg);
202     }
203 
close()204     public void close() {
205         if (clusterConnection != null) {
206             logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId));
207             for (DbImpl db: dbs.keySet()) {
208                 // mark all dbs as closing so no more transactions will start
209                 db.closing();
210             }
211             dbForNdbRecord.closing();
212             if (dbs.size() != 0) {
213                 Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
214                 for (Db db: dbsToClose.keySet()) {
215                     db.close();
216                 }
217             }
218             for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) {
219                 ndbRecord.releaseNdbRecord();
220             }
221             if (dbForNdbRecord != null) {
222                 dbForNdbRecord.close();
223                 dbForNdbRecord = null;
224             }
225             ndbRecordImplMap.clear();
226             Ndb_cluster_connection.delete(clusterConnection);
227             clusterConnection = null;
228         }
229     }
230 
    /** Remove the given db from the set of open dbs; called back by DbImpl.close().
     * @param db the db that has been closed
     */
    public void close(Db db) {
        dbs.remove(db);
    }
234 
dbCount()235     public int dbCount() {
236         // one of the dbs is for the NdbRecord dictionary if it is not null
237         int dbForNdbRecord = (dictionaryForNdbRecord == null)?0:1;
238         return dbs.size() - dbForNdbRecord;
239     }
240 
241     /**
242      * Get the cached NdbRecord implementation for the table
243      * used with this cluster connection. All columns are included
244      * in the NdbRecord.
245      * Use a ConcurrentHashMap for best multithread performance.
246      * There are three possibilities:
247      * <ul><li>Case 1: return the already-cached NdbRecord
248      * </li><li>Case 2: return a new instance created by this method
249      * </li><li>Case 3: return the winner of a race with another thread
250      * </li></ul>
251      * @param storeTable the store table
252      * @return the NdbRecordImpl for the table
253      */
getCachedNdbRecordImpl(Table storeTable)254     protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
255         dbForNdbRecord.assertOpen("ClusterConnectionImpl.getCachedNdbRecordImpl for table");
256         String tableName = storeTable.getName();
257         // find the NdbRecordImpl in the global cache
258         NdbRecordImpl result = ndbRecordImplMap.get(tableName);
259         if (result != null) {
260             // case 1
261             if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + tableName);
262             return result;
263         } else {
264             // dictionary is single thread
265             NdbRecordImpl newNdbRecordImpl;
266             synchronized (dictionaryForNdbRecord) {
267                 // try again; another thread might have beat us
268                 result = ndbRecordImplMap.get(tableName);
269                 if (result != null) {
270                     return result;
271                 }
272                 newNdbRecordImpl = new NdbRecordImpl(storeTable, dictionaryForNdbRecord);
273             }
274             NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(tableName, newNdbRecordImpl);
275             if (winner == null) {
276                 // case 2: the previous value was null, so return the new (winning) value
277                 if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + tableName);
278                 return newNdbRecordImpl;
279             } else {
280                 // case 3: another thread beat us, so return the winner and garbage collect ours
281                 if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + tableName);
282                 newNdbRecordImpl.releaseNdbRecord();
283                 return winner;
284             }
285         }
286     }
287 
288     /**
289      * Get the cached NdbRecord implementation for the index and table
290      * used with this cluster connection.
291      * The NdbRecordImpl is cached under the name tableName+indexName.
292      * Only the key columns are included in the NdbRecord.
293      * Use a ConcurrentHashMap for best multithread performance.
294      * There are three possibilities:
295      * <ul><li>Case 1: return the already-cached NdbRecord
296      * </li><li>Case 2: return a new instance created by this method
297      * </li><li>Case 3: return the winner of a race with another thread
298      * </li></ul>
299      * @param storeTable the store table
300      * @param storeIndex the store index
301      * @return the NdbRecordImpl for the index
302      */
getCachedNdbRecordImpl(Index storeIndex, Table storeTable)303     protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
304         dbForNdbRecord.assertOpen("ClusterConnectionImpl.getCachedNdbRecordImpl for index");
305         String recordName = storeTable.getName() + "+" + storeIndex.getInternalName();
306         // find the NdbRecordImpl in the global cache
307         NdbRecordImpl result = ndbRecordImplMap.get(recordName);
308         if (result != null) {
309             // case 1
310             if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + recordName);
311             return result;
312         } else {
313             // dictionary is single thread
314             NdbRecordImpl newNdbRecordImpl;
315             synchronized (dictionaryForNdbRecord) {
316                 // try again; another thread might have beat us
317                 result = ndbRecordImplMap.get(recordName);
318                 if (result != null) {
319                     return result;
320                 }
321                 newNdbRecordImpl = new NdbRecordImpl(storeIndex, storeTable, dictionaryForNdbRecord);
322             }
323             NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(recordName, newNdbRecordImpl);
324             if (winner == null) {
325                 // case 2: the previous value was null, so return the new (winning) value
326                 if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + recordName);
327                 return newNdbRecordImpl;
328             } else {
329                 // case 3: another thread beat us, so return the winner and garbage collect ours
330                 if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + recordName);
331                 newNdbRecordImpl.releaseNdbRecord();
332                 return winner;
333             }
334         }
335     }
336 
337     /** Remove the cached NdbRecord(s) associated with this table. This allows schema change to work.
338      * All NdbRecords including any index NdbRecords will be removed. Index NdbRecords are named
339      * tableName+indexName.
340      * @param tableName the name of the table
341      */
unloadSchema(String tableName)342     public void unloadSchema(String tableName) {
343         // synchronize to avoid multiple threads unloading schema simultaneously
344         // it is possible although unlikely that another thread is adding an entry while
345         // we are removing entries; if this occurs an error will be signaled here
346         synchronized(ndbRecordImplMap) {
347             Iterator<Map.Entry<String, NdbRecordImpl>> iterator = ndbRecordImplMap.entrySet().iterator();
348             while (iterator.hasNext()) {
349                 Map.Entry<String, NdbRecordImpl> entry = iterator.next();
350                 String key = entry.getKey();
351                 if (key.startsWith(tableName)) {
352                     // remove all records whose key begins with the table name; this will remove index records also
353                     if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + key);
354                     NdbRecordImpl record = entry.getValue();
355                     iterator.remove();
356                     if (record != null) {
357                         record.releaseNdbRecord();
358                     }
359                 }
360             }
361             if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
362             dictionaryForNdbRecord.removeCachedTable(tableName);
363         }
364     }
365 
getSmartValueHandlerFactory()366     public ValueHandlerFactory getSmartValueHandlerFactory() {
367         ValueHandlerFactory result = null;
368         if (USE_SMART_VALUE_HANDLER) {
369             result = new NdbRecordSmartValueHandlerFactoryImpl();
370         }
371         return result;
372     }
373 
    /** Create a new NdbRecord-based operation for the given db and table.
     * @param db the db to operate on
     * @param storeTable the store table
     * @return the new operation
     */
    public NdbRecordOperationImpl newNdbRecordOperationImpl(DbImpl db, Table storeTable) {
        return new NdbRecordOperationImpl(this, db, storeTable);
    }
377 
    /** Set the auto-increment parameters propagated to each DbImpl subsequently
     * created by createDb.
     * @param autoIncrement the auto-increment parameters
     */
    public void initializeAutoIncrement(long[] autoIncrement) {
        this.autoIncrement = autoIncrement;
    }
381 
    /** Return the byte buffer pool; created by connect(), so null before then.
     * @return the byte buffer pool
     */
    public VariableByteBufferPoolImpl getByteBufferPool() {
        return byteBufferPool;
    }
385 
    /** Set the byte buffer pool sizes; must be called before connect(),
     * which uses them to construct the pool.
     * @param poolSizes the pool sizes
     */
    public void setByteBufferPoolSizes(int[] poolSizes) {
        this.byteBufferPoolSizes = poolSizes;
    }
389 
390 }
391