1 /*
2  *  Copyright (c) 2009, 2015, Oracle and/or its affiliates. All rights reserved.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License, version 2.0,
6  *  as published by the Free Software Foundation.
7  *
8  *  This program is also distributed with certain software (including
9  *  but not limited to OpenSSL) that is licensed under separate terms,
10  *  as designated in a particular file or component or in included license
11  *  documentation.  The authors of MySQL hereby grant you an additional
12  *  permission to link the program and your derivative works with the
13  *  separately licensed software that they have included with MySQL.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License, version 2.0, for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 package com.mysql.clusterj.tie;
26 
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.List;
30 
31 import com.mysql.clusterj.ClusterJDatastoreException;
32 import com.mysql.clusterj.ClusterJFatalInternalException;
33 import com.mysql.clusterj.ClusterJHelper;
34 import com.mysql.clusterj.LockMode;
35 
36 import com.mysql.clusterj.core.store.ClusterTransaction;
37 import com.mysql.clusterj.core.store.Index;
38 import com.mysql.clusterj.core.store.IndexOperation;
39 import com.mysql.clusterj.core.store.IndexScanOperation;
40 import com.mysql.clusterj.core.store.Operation;
41 import com.mysql.clusterj.core.store.PartitionKey;
42 import com.mysql.clusterj.core.store.ScanOperation;
43 import com.mysql.clusterj.core.store.Table;
44 
45 import com.mysql.clusterj.core.util.I18NHelper;
46 import com.mysql.clusterj.core.util.Logger;
47 import com.mysql.clusterj.core.util.LoggerFactoryService;
48 import com.mysql.clusterj.tie.DbImpl.BufferManager;
49 
50 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
51 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
52 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
53 import com.mysql.ndbjtie.ndbapi.NdbOperation;
54 import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
55 import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
56 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
57 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
58 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
59 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
60 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
61 import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
62 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
63 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
64 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
65 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst;
66 
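/**
 * ClusterTransaction implementation backed by an NdbTransaction.
 * The NdbTransaction is not started when this object is created; it is
 * obtained on demand the first time an operation is requested.
 */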
70 class ClusterTransactionImpl implements ClusterTransaction {
71 
72     /** My message translator */
73     static final I18NHelper local = I18NHelper
74             .getInstance(ClusterTransactionImpl.class);
75 
76     /** My logger */
77     static final Logger logger = LoggerFactoryService.getFactory()
78             .getInstance(ClusterTransactionImpl.class);
79 
80     protected final static String USE_NDBRECORD_NAME = "com.mysql.clusterj.UseNdbRecord";
81     private static boolean USE_NDBRECORD = ClusterJHelper.getBooleanProperty(USE_NDBRECORD_NAME, "true");
82 
83     protected NdbTransaction ndbTransaction;
84     private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
85 
86     /** The cluster connection for this transaction */
87     protected ClusterConnectionImpl clusterConnectionImpl;
88 
89     /** The DbImpl associated with this NdbTransaction */
90     protected DbImpl db;
91 
92     /** The partition key; by default it doesn't do anything */
93     protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance();
94 
95     /** The NdbDictionary */
96     private Dictionary ndbDictionary;
97 
98     /** The coordinated transaction identifier */
99     private String coordinatedTransactionId = null;
100 
101     /** Is getCoordinatedTransactionId supported? True until proven false. */
102     private static boolean supportsGetCoordinatedTransactionId = true;
103 
104     /** Lock mode for find operations */
105     private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
106 
107     /** Lock mode for index lookup operations */
108     private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
109 
110     /** Lock mode for index scan operations */
111     private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
112 
113     /** Lock mode for table scan operations */
114     private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
115 
116     /** Autocommit flag if we are in an autocommit transaction */
117     private boolean autocommit = false;
118 
119     /** Autocommitted flag if we autocommitted early */
120     private boolean autocommitted = false;
121 
122     /** The transaction id to join this transaction to */
123     private String joinTransactionId;
124 
125     private BufferManager bufferManager;
126 
127     private List<Operation> operationsToCheck = new ArrayList<Operation>();
128 
/**
 * Create a transaction bound to the given connection and DbImpl.
 * The ndb transaction itself is not started here.
 *
 * @param clusterConnectionImpl the cluster connection for this transaction
 * @param db the DbImpl; also the source of the buffer manager
 * @param ndbDictionary the NdbDictionary used to look up tables and indexes
 * @param joinTransactionId the transaction id to join this transaction to
 */
public ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl,
        DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
    this.clusterConnectionImpl = clusterConnectionImpl;
    this.ndbDictionary = ndbDictionary;
    this.joinTransactionId = joinTransactionId;
    this.db = db;
    this.bufferManager = db.getBufferManager();
}
137 
getClusterConnection()138     public ClusterConnectionImpl getClusterConnection() {
139         return this.clusterConnectionImpl;
140     }
141 
close()142     public void close() {
143         if (ndbTransaction != null) {
144             ndbTransaction.close();
145             ndbTransaction = null;
146         }
147     }
148 
executeCommit()149     public void executeCommit() {
150         executeCommit(true, true);
151     }
152 
isEnlisted()153     public boolean isEnlisted() {
154         return ndbTransaction != null;
155     }
156 
157     /**
158      * Enlist the ndb transaction if not already enlisted.
159      * If the coordinated transaction id is set, join an existing transaction.
160      * Otherwise, use the partition key to enlist the transaction.
161      */
enlist()162     private void enlist() {
163         db.assertOpen("ClusterTransactionImpl.enlist");
164         if (logger.isTraceEnabled()) logger.trace("ndbTransaction: " + ndbTransaction
165                 + " with joinTransactionId: " + joinTransactionId);
166         if (ndbTransaction == null) {
167             if (coordinatedTransactionId != null) {
168                 ndbTransaction = db.joinTransaction(coordinatedTransactionId);
169             } else {
170                 ndbTransaction = partitionKey.enlist(db);
171                 getCoordinatedTransactionId(db);
172             }
173         }
174     }
175 
executeCommit(boolean abort, boolean force)176     public void executeCommit(boolean abort, boolean force) {
177         db.assertOpen("ClusterTransactionImpl.executeCommit");
178         if (logger.isTraceEnabled()) logger.trace("");
179         // nothing to do if no ndbTransaction was ever enlisted or already autocommitted
180         if (isEnlisted() && !autocommitted) {
181             handlePendingPostExecuteCallbacks();
182             int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
183             int forceOption = force?1:0;
184             int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Commit,
185                     abortOption, forceOption);
186             handleError(returnCode, ndbTransaction);
187         }
188         autocommitted = false;
189         autocommit = false;
190     }
191 
executeNoCommit()192     public void executeNoCommit() {
193         executeNoCommit(true, true);
194     }
195 
executeNoCommit(boolean abort, boolean force)196     public void executeNoCommit(boolean abort, boolean force) {
197         db.assertOpen("ClusterTransactionImpl.executeNoCommit");
198         if (logger.isTraceEnabled()) logger.trace("");
199         if (!isEnlisted()) {
200             // nothing to do if no ndbTransaction was ever enlisted
201             return;
202         }
203         if (autocommit && postExecuteCallbacks.size() == 0) {
204             // optimization to commit now because no blob columns
205             executeCommit(abort, force);
206             autocommitted = true;
207             return;
208         }
209         int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
210         int forceOption = force?1:0;
211         int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.NoCommit,
212                 abortOption, forceOption);
213         handleError(returnCode, ndbTransaction);
214         performPostExecuteCallbacks();
215     }
216 
executeRollback()217     public void executeRollback() {
218         db.assertOpen("ClusterTransactionImpl.executeRollback");
219         if (!isEnlisted()) {
220             // nothing to do if no ndbTransaction was ever enlisted
221             return;
222         }
223         int abortOption = AbortOption.AO_IgnoreError;
224         int forceOption = 1;
225         int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Rollback,
226                 abortOption, forceOption);
227         handleError(returnCode, ndbTransaction);
228     }
229 
getDeleteOperation(Table storeTable)230     public Operation getDeleteOperation(Table storeTable) {
231         enlist();
232         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
233         if (USE_NDBRECORD) {
234             return new NdbRecordDeleteOperationImpl(this, storeTable);
235         }
236         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
237         handleError(ndbTable, ndbDictionary);
238         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
239         handleError(ndbOperation, ndbTransaction);
240         int returnCode = ndbOperation.deleteTuple();
241         handleError(returnCode, ndbTransaction);
242         return new OperationImpl(ndbOperation, this);
243     }
244 
getInsertOperation(Table storeTable)245     public Operation getInsertOperation(Table storeTable) {
246         enlist();
247         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
248         if (USE_NDBRECORD) {
249             return new NdbRecordInsertOperationImpl(this, storeTable);
250         }
251         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
252         handleError(ndbTable, ndbDictionary);
253         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
254         handleError(ndbOperation, ndbTransaction);
255         int returnCode = ndbOperation.insertTuple();
256         handleError(returnCode, ndbTransaction);
257         return new OperationImpl(ndbOperation, this);
258     }
259 
getIndexScanOperation(Index storeIndex, Table storeTable)260     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
261         enlist();
262         if (USE_NDBRECORD) {
263             return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, indexScanLockMode);
264         }
265         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
266         handleError(ndbIndex, ndbDictionary);
267         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
268         handleError(ndbOperation, ndbTransaction);
269         int scanFlags = 0;
270         int lockMode = indexScanLockMode;
271         if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
272             scanFlags = ScanFlag.SF_KeyInfo;
273         }
274         int parallel = 0;
275         int batch = 0;
276         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
277         handleError(returnCode, ndbTransaction);
278         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
279         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
280     }
281 
getIndexScanOperationMultiRange(Index storeIndex, Table storeTable)282     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
283         enlist();
284         if (USE_NDBRECORD) {
285             return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, true, indexScanLockMode);
286         }
287         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
288         handleError(ndbIndex, ndbDictionary);
289         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
290         handleError(ndbOperation, ndbTransaction);
291         int scanFlags = ScanFlag.SF_OrderBy;
292         int lockMode = indexScanLockMode;
293         if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
294             scanFlags |= ScanFlag.SF_KeyInfo;
295         }
296         scanFlags |= ScanFlag.SF_MultiRange;
297         int parallel = 0;
298         int batch = 0;
299         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
300         handleError(returnCode, ndbTransaction);
301         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
302         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
303     }
304 
getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable)305     public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) {
306         enlist();
307         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
308         handleError(ndbIndex, ndbDictionary);
309         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
310         handleError(ndbOperation, ndbTransaction);
311         int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
312         int scanFlags = ScanFlag.SF_KeyInfo;
313         int parallel = 0;
314         int batch = 0;
315         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
316         handleError(returnCode, ndbTransaction);
317         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
318         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
319     }
320 
getSelectOperation(Table storeTable)321     public Operation getSelectOperation(Table storeTable) {
322         enlist();
323         if (USE_NDBRECORD) {
324             return new NdbRecordKeyOperationImpl(this, storeTable);
325         }
326         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
327         handleError(ndbTable, ndbDictionary);
328         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
329         handleError(ndbOperation, ndbTransaction);
330         int lockMode = findLockMode;
331         int returnCode = ndbOperation.readTuple(lockMode);
332         handleError(returnCode, ndbTransaction);
333         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
334         return new OperationImpl(storeTable, ndbOperation, this);
335     }
336 
getTableScanOperation(Table storeTable)337     public ScanOperation getTableScanOperation(Table storeTable) {
338         enlist();
339         if (USE_NDBRECORD) {
340             return new NdbRecordTableScanOperationImpl(this, storeTable, tableScanLockMode);
341         }
342         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
343         handleError(ndbTable, ndbDictionary);
344         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
345         handleError(ndbScanOperation, ndbTransaction);
346         int lockMode = tableScanLockMode;
347         int scanFlags = 0;
348         if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
349             scanFlags = ScanFlag.SF_KeyInfo;
350         }
351         int parallel = 0;
352         int batch = 0;
353         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
354         handleError(returnCode, ndbTransaction);
355         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
356         return new ScanOperationImpl(storeTable, ndbScanOperation, this);
357     }
358 
getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable)359     public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) {
360         enlist();
361         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
362         handleError(ndbTable, ndbDictionary);
363         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
364         handleError(ndbScanOperation, ndbTransaction);
365         int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
366         int scanFlags = ScanFlag.SF_KeyInfo;
367         int parallel = 0;
368         int batch = 0;
369         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
370         handleError(returnCode, ndbTransaction);
371         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
372         return new ScanOperationImpl(storeTable, ndbScanOperation, this);
373     }
374 
getUniqueIndexOperation(Index storeIndex, Table storeTable)375     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
376         enlist();
377         if (USE_NDBRECORD) {
378             return new NdbRecordUniqueKeyOperationImpl(this, storeIndex, storeTable);
379         }
380         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
381         handleError(ndbIndex, ndbDictionary);
382         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
383         handleError(ndbIndexOperation, ndbTransaction);
384         int lockMode = lookupLockMode;
385         int returnCode = ndbIndexOperation.readTuple(lockMode);
386         handleError(returnCode, ndbTransaction);
387         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
388         return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
389     }
390 
getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable)391     public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
392         enlist();
393         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
394         handleError(ndbIndex, ndbDictionary);
395         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
396         handleError(ndbIndexOperation, ndbTransaction);
397         int returnCode = ndbIndexOperation.deleteTuple();
398         handleError(returnCode, ndbTransaction);
399         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
400         return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
401     }
402 
getUpdateOperation(Table storeTable)403     public Operation getUpdateOperation(Table storeTable) {
404         enlist();
405         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
406         handleError(ndbTable, ndbDictionary);
407         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
408         handleError(ndbOperation, ndbTransaction);
409         int returnCode = ndbOperation.updateTuple();
410         handleError(returnCode, ndbTransaction);
411         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
412         return new OperationImpl(storeTable, ndbOperation, this);
413     }
414 
getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable)415     public IndexOperation getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable) {
416         enlist();
417         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
418         handleError(ndbIndex, ndbDictionary);
419         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
420         handleError(ndbIndexOperation, ndbTransaction);
421         int returnCode = ndbIndexOperation.updateTuple();
422         handleError(returnCode, ndbTransaction);
423         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
424         return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
425     }
426 
getWriteOperation(Table storeTable)427     public Operation getWriteOperation(Table storeTable) {
428         enlist();
429         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
430         handleError(ndbTable, ndbDictionary);
431         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
432         handleError(ndbOperation, ndbTransaction);
433         int returnCode = ndbOperation.writeTuple();
434         handleError(returnCode, ndbTransaction);
435         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
436         return new OperationImpl(storeTable, ndbOperation, this);
437     }
438 
439     /** Create an NdbOperation for insert using NdbRecord.
440      *
441      * @param ndbRecord the NdbRecord
442      * @param buffer the buffer with data for the operation
443      * @param mask the mask of column values already set in the buffer
444      * @param options the OperationOptions for this operation
445      * @return the insert operation
446      */
insertTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)447     public NdbOperationConst insertTuple(NdbRecordConst ndbRecord,
448             ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
449         enlist();
450         NdbOperationConst operation = ndbTransaction.insertTuple(ndbRecord, buffer, mask, options, 0);
451         handleError(operation, ndbTransaction);
452         return operation;
453     }
454 
455     /** Create a table scan operation using NdbRecord.
456      *
457      * @param ndbRecord the NdbRecord for the result
458      * @param mask the columns to read
459      * @param options the scan options
460      * @return
461      */
scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options)462     public NdbScanOperation scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options) {
463         enlist();
464         int lockMode = tableScanLockMode;
465         NdbScanOperation operation = ndbTransaction.scanTable(ndbRecord, lockMode, mask, options, 0);
466         handleError(operation, ndbTransaction);
467         return operation;
468     }
469 
470     /** Create a scan operation on the index using NdbRecord.
471      *
472      * @param ndbRecord the ndb record
473      * @param mask the mask that specifies which columns to read
474      * @param object scan options // TODO change this
475      * @return
476      */
scanIndex(NdbRecordConst key_record, NdbRecordConst result_record, byte[] result_mask, ScanOptions scanOptions)477     public NdbIndexScanOperation scanIndex(NdbRecordConst key_record, NdbRecordConst result_record,
478             byte[] result_mask, ScanOptions scanOptions) {
479         enlist();
480         return ndbTransaction.scanIndex(key_record, result_record, indexScanLockMode, result_mask, null, scanOptions, 0);
481     }
482 
483     /** Create an NdbOperation for delete using NdbRecord.
484      *
485      * @param ndbRecord the NdbRecord
486      * @param buffer the buffer with data for the operation
487      * @param mask the mask of column values already set in the buffer
488      * @param options the OperationOptions for this operation
489      * @return the delete operation
490      */
deleteTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)491     public NdbOperationConst deleteTuple(NdbRecordConst ndbRecord,
492             ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
493         enlist();
494         NdbOperationConst operation = ndbTransaction.deleteTuple(ndbRecord, buffer, ndbRecord, null, mask, options, 0);
495         handleError(operation, ndbTransaction);
496         return operation;
497     }
498 
499     /** Create an NdbOperation for update using NdbRecord.
500      *
501      * @param ndbRecord the NdbRecord
502      * @param buffer the buffer with data for the operation
503      * @param mask the mask of column values already set in the buffer
504      * @param options the OperationOptions for this operation
505      * @return the update operation
506      */
updateTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)507     public NdbOperationConst updateTuple(NdbRecordConst ndbRecord,
508             ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
509         enlist();
510         NdbOperationConst operation = ndbTransaction.updateTuple(ndbRecord, buffer, ndbRecord, buffer, mask, options, 0);
511         handleError(operation, ndbTransaction);
512         return operation;
513     }
514 
515     /** Create an NdbOperation for write using NdbRecord.
516      *
517      * @param ndbRecord the NdbRecord
518      * @param buffer the buffer with data for the operation
519      * @param mask the mask of column values already set in the buffer
520      * @param options the OperationOptions for this operation
521      * @return the update operation
522      */
writeTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)523     public NdbOperationConst writeTuple(NdbRecordConst ndbRecord,
524             ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
525         enlist();
526         NdbOperationConst operation = ndbTransaction.writeTuple(ndbRecord, buffer, ndbRecord, buffer, mask, options, 0);
527         handleError(operation, ndbTransaction);
528         return operation;
529     }
530 
531     /** Create an NdbOperation for key read using NdbRecord. The 'find' lock mode is used.
532      *
533      * @param ndbRecordKeys the NdbRecord for the key
534      * @param keyBuffer the buffer with the key for the operation
535      * @param ndbRecordValues the NdbRecord for the value
536      * @param valueBuffer the buffer with the value returned by the operation
537      * @param mask the mask of column values to be read
538      * @param options the OperationOptions for this operation
539      * @return the ndb operation for key read
540      */
readTuple(NdbRecordConst ndbRecordKeys, ByteBuffer keyBuffer, NdbRecordConst ndbRecordValues, ByteBuffer valueBuffer, byte[] mask, OperationOptionsConst options)541     public NdbOperationConst readTuple(NdbRecordConst ndbRecordKeys, ByteBuffer keyBuffer,
542             NdbRecordConst ndbRecordValues, ByteBuffer valueBuffer,
543             byte[] mask, OperationOptionsConst options) {
544         enlist();
545         NdbOperationConst operation = ndbTransaction.readTuple(ndbRecordKeys, keyBuffer,
546                 ndbRecordValues, valueBuffer, findLockMode, mask, options, 0);
547         handleError(operation, ndbTransaction);
548         return operation;
549     }
550 
postExecuteCallback(Runnable callback)551     public void postExecuteCallback(Runnable callback) {
552         postExecuteCallbacks.add(callback);
553     }
554 
clearPostExecuteCallbacks()555     private void clearPostExecuteCallbacks() {
556         postExecuteCallbacks.clear();
557     }
558 
handlePendingPostExecuteCallbacks()559     private void handlePendingPostExecuteCallbacks() {
560         // if any pending postExecuteCallbacks, flush via executeNoCommit
561         if (!postExecuteCallbacks.isEmpty()) {
562             executeNoCommit(false, true);
563         }
564     }
565 
performPostExecuteCallbacks()566     private void performPostExecuteCallbacks() {
567         // check completed operations
568         StringBuilder exceptionMessages = new StringBuilder();
569         for (Operation op: operationsToCheck) {
570             int code = op.getErrorCode();
571             if (code != 0) {
572                 int mysqlCode = op.getMysqlCode();
573                 int status = op.getStatus();
574                 int classification = op.getClassification();
575                 String message = local.message("ERR_Datastore", -1, code, mysqlCode, status, classification,
576                         op.toString());
577                 exceptionMessages.append(message);
578                 exceptionMessages.append('\n');
579             }
580         }
581         operationsToCheck.clear();
582         // TODO should this set rollback only?
583         try {
584             for (Runnable runnable: postExecuteCallbacks) {
585                 try {
586                     runnable.run();
587                 } catch (Throwable t) {
588                     t.printStackTrace();
589                     exceptionMessages.append(t.getMessage());
590                     exceptionMessages.append('\n');
591                 }
592             }
593         } finally {
594             clearPostExecuteCallbacks();
595         }
596         if (exceptionMessages.length() > 0) {
597             throw new ClusterJDatastoreException(exceptionMessages.toString());
598         }
599     }
600 
601     /** Handle errors from ScanOperation where the error returnCode is -1.
602      *
603      * @param returnCode the return code from the nextResult operation
604      */
handleError(int returnCode)605     protected void handleError(int returnCode) {
606         if (returnCode == -1) {
607             NdbErrorConst ndbError = ndbTransaction.getNdbError();
608             String detail = db.getNdbErrorDetail(ndbError);
609             Utility.throwError(returnCode, ndbError, detail);
610         }
611     }
612 
handleError(int returnCode, NdbTransaction ndbTransaction)613     protected void handleError(int returnCode, NdbTransaction ndbTransaction) {
614         if (returnCode == 0) {
615             return;
616         } else {
617             NdbErrorConst ndbError = ndbTransaction.getNdbError();
618             if (ndbError.code() == 0) {
619                 return;
620             }
621             String detail = db.getNdbErrorDetail(ndbError);
622             Utility.throwError(returnCode, ndbError, detail);
623         }
624     }
625 
handleError(Object object, NdbTransaction ndbTransaction)626     protected void handleError(Object object, NdbTransaction ndbTransaction) {
627         if (object != null) {
628             return;
629         } else {
630             NdbErrorConst ndbError = ndbTransaction.getNdbError();
631             String detail = db.getNdbErrorDetail(ndbError);
632             Utility.throwError(null, ndbError, detail);
633         }
634     }
635 
handleError(Object object, Dictionary ndbDictionary)636     protected void handleError(Object object, Dictionary ndbDictionary) {
637         if (object != null) {
638             return;
639         } else {
640             NdbErrorConst ndbError = ndbDictionary.getNdbError();
641             String detail = db.getNdbErrorDetail(ndbError);
642             Utility.throwError(null, ndbError, detail);
643         }
644     }
645 
setPartitionKey(PartitionKey partitionKey)646     public void setPartitionKey(PartitionKey partitionKey) {
647         if (partitionKey == null) {
648             throw new ClusterJFatalInternalException(
649                     local.message("ERR_Partition_Key_Null"));
650         }
651         this.partitionKey = (PartitionKeyImpl)partitionKey;
652     }
653 
getCoordinatedTransactionId()654     public String getCoordinatedTransactionId() {
655         return coordinatedTransactionId;
656     }
657 
658     /** Get the coordinated transaction id if possible and update the field with
659      * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie
660      * and native library) the ndbTransaction.getCoordinatedTransactionId() method
661      * will throw an Error of some kind (java.lang.NoSuchMethodError or
662      * java.lang.UnsatisfiedLinkError) and this will cause this instance
663      * (and any other instance with access to the new value of the static variable
664      * supportsGetCoordinatedTransactionId) to never try again.
665      * @param db the DbImpl instance
666      */
getCoordinatedTransactionId(DbImpl db)667     private void getCoordinatedTransactionId(DbImpl db) {
668         try {
669             if (supportsGetCoordinatedTransactionId) {
670 // not implemented quite yet...
671 //                ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
672 //                coordinatedTransactionId = ndbTransaction.
673 //                        getCoordinatedTransactionId(buffer, buffer.capacity());
674                 if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
675                         + coordinatedTransactionId);
676                 throw new ClusterJFatalInternalException("Not Implemented");
677             }
678         } catch (Throwable t) {
679             // oops, don't do this again
680             supportsGetCoordinatedTransactionId = false;
681         }
682     }
683 
setCoordinatedTransactionId(String coordinatedTransactionId)684     public void setCoordinatedTransactionId(String coordinatedTransactionId) {
685         this.coordinatedTransactionId = coordinatedTransactionId;
686     }
687 
setLockMode(LockMode lockmode)688     public void setLockMode(LockMode lockmode) {
689         findLockMode = translateLockMode(lockmode);
690         lookupLockMode = findLockMode;
691         indexScanLockMode = findLockMode;
692         tableScanLockMode = findLockMode;
693     }
694 
translateLockMode(LockMode lockmode)695     private int translateLockMode(LockMode lockmode) {
696         switch(lockmode) {
697             case READ_COMMITTED:
698                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
699             case SHARED:
700                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read;
701             case EXCLUSIVE:
702                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
703             default:
704                 throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Lock_Mode", lockmode));
705         }
706     }
707 
setAutocommit(boolean autocommit)708     public void setAutocommit(boolean autocommit) {
709         this.autocommit = autocommit;
710     }
711 
getBufferManager()712     public BufferManager getBufferManager() {
713         return bufferManager;
714     }
715 
716     /** Get the cached NdbRecordImpl for this table. The NdbRecordImpl is cached in the
717      * cluster connection.
718      * @param storeTable the table
719      * @return
720      */
getCachedNdbRecordImpl(Table storeTable)721     protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
722         return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
723     }
724 
725     /** Get the cached NdbRecordImpl for this index and table. The NdbRecordImpl is cached in the
726      * cluster connection.
727      * @param storeTable the table
728      * @param storeIndex the index
729      * @return
730      */
getCachedNdbRecordImpl(Index storeIndex, Table storeTable)731     protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
732         return clusterConnectionImpl.getCachedNdbRecordImpl(storeIndex, storeTable);
733     }
734 
735     /**
736      * Add an operation to check for errors after execute.
737      * @param op the operation to check
738      */
addOperationToCheck(Operation op)739     public void addOperationToCheck(Operation op) {
740         operationsToCheck.add(op);
741     }
742 
743 }
744