1 /* 2 * Copyright (c) 2009, 2015, Oracle and/or its affiliates. All rights reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License, version 2.0, 6 * as published by the Free Software Foundation. 7 * 8 * This program is also distributed with certain software (including 9 * but not limited to OpenSSL) that is licensed under separate terms, 10 * as designated in a particular file or component or in included license 11 * documentation. The authors of MySQL hereby grant you an additional 12 * permission to link the program and your derivative works with the 13 * separately licensed software that they have included with MySQL. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU General Public License, version 2.0, for more details. 19 * 20 * You should have received a copy of the GNU General Public License 21 * along with this program; if not, write to the Free Software 22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 package com.mysql.clusterj.tie; 26 27 import java.nio.ByteBuffer; 28 import java.util.ArrayList; 29 import java.util.List; 30 31 import com.mysql.clusterj.ClusterJDatastoreException; 32 import com.mysql.clusterj.ClusterJFatalInternalException; 33 import com.mysql.clusterj.ClusterJHelper; 34 import com.mysql.clusterj.LockMode; 35 36 import com.mysql.clusterj.core.store.ClusterTransaction; 37 import com.mysql.clusterj.core.store.Index; 38 import com.mysql.clusterj.core.store.IndexOperation; 39 import com.mysql.clusterj.core.store.IndexScanOperation; 40 import com.mysql.clusterj.core.store.Operation; 41 import com.mysql.clusterj.core.store.PartitionKey; 42 import com.mysql.clusterj.core.store.ScanOperation; 43 import com.mysql.clusterj.core.store.Table; 44 45 import com.mysql.clusterj.core.util.I18NHelper; 46 import com.mysql.clusterj.core.util.Logger; 47 import com.mysql.clusterj.core.util.LoggerFactoryService; 48 import com.mysql.clusterj.tie.DbImpl.BufferManager; 49 50 import com.mysql.ndbjtie.ndbapi.NdbErrorConst; 51 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation; 52 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation; 53 import com.mysql.ndbjtie.ndbapi.NdbOperation; 54 import com.mysql.ndbjtie.ndbapi.NdbOperationConst; 55 import com.mysql.ndbjtie.ndbapi.NdbRecordConst; 56 import com.mysql.ndbjtie.ndbapi.NdbScanOperation; 57 import com.mysql.ndbjtie.ndbapi.NdbTransaction; 58 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; 59 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst; 60 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst; 61 import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst; 62 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption; 63 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag; 64 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions; 65 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst; 66 67 /** 68 * 69 */ 70 class ClusterTransactionImpl implements ClusterTransaction { 71 72 /** My message translator */ 73 static final I18NHelper local = I18NHelper 74 .getInstance(ClusterTransactionImpl.class); 75 76 /** My logger */ 77 static final Logger logger = LoggerFactoryService.getFactory() 78 .getInstance(ClusterTransactionImpl.class); 79 80 protected final static String USE_NDBRECORD_NAME = "com.mysql.clusterj.UseNdbRecord"; 81 private static boolean USE_NDBRECORD = ClusterJHelper.getBooleanProperty(USE_NDBRECORD_NAME, "true"); 82 83 protected NdbTransaction ndbTransaction; 84 private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>(); 85 86 /** The cluster connection for this transaction */ 87 protected ClusterConnectionImpl clusterConnectionImpl; 88 89 /** The DbImpl associated with this NdbTransaction */ 90 protected DbImpl db; 91 92 /** The partition key; by default it doesn't do anything */ 93 protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance(); 94 95 /** The NdbDictionary */ 96 private Dictionary ndbDictionary; 97 98 /** The coordinated transaction identifier */ 99 private String coordinatedTransactionId = null; 100 101 /** Is getCoordinatedTransactionId supported? True until proven false. */ 102 private static boolean supportsGetCoordinatedTransactionId = true; 103 104 /** Lock mode for find operations */ 105 private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead; 106 107 /** Lock mode for index lookup operations */ 108 private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead; 109 110 /** Lock mode for index scan operations */ 111 private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead; 112 113 /** Lock mode for table scan operations */ 114 private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead; 115 116 /** Autocommit flag if we are in an autocommit transaction */ 117 private boolean autocommit = false; 118 119 /** Autocommitted flag if we autocommitted early */ 120 private boolean autocommitted = false; 121 122 /** The transaction id to join this transaction to */ 123 private String joinTransactionId; 124 125 private BufferManager bufferManager; 126 127 private List<Operation> operationsToCheck = new ArrayList<Operation>(); 128 ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl, DbImpl db, Dictionary ndbDictionary, String joinTransactionId)129 public ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl, 130 DbImpl db, Dictionary ndbDictionary, String joinTransactionId) { 131 this.db = db; 132 this.clusterConnectionImpl = clusterConnectionImpl; 133 this.ndbDictionary = ndbDictionary; 134 this.joinTransactionId = joinTransactionId; 135 this.bufferManager = db.getBufferManager(); 136 } 137 getClusterConnection()138 public ClusterConnectionImpl getClusterConnection() { 139 return this.clusterConnectionImpl; 140 } 141 close()142 public void close() { 143 if (ndbTransaction != null) { 144 ndbTransaction.close(); 145 ndbTransaction = null; 146 } 147 } 148 executeCommit()149 public void executeCommit() { 150 executeCommit(true, true); 151 } 152 isEnlisted()153 public boolean isEnlisted() { 154 return ndbTransaction != null; 155 } 156 157 /** 158 * Enlist the ndb transaction if not already enlisted. 159 * If the coordinated transaction id is set, join an existing transaction. 160 * Otherwise, use the partition key to enlist the transaction. 161 */ enlist()162 private void enlist() { 163 db.assertOpen("ClusterTransactionImpl.enlist"); 164 if (logger.isTraceEnabled()) logger.trace("ndbTransaction: " + ndbTransaction 165 + " with joinTransactionId: " + joinTransactionId); 166 if (ndbTransaction == null) { 167 if (coordinatedTransactionId != null) { 168 ndbTransaction = db.joinTransaction(coordinatedTransactionId); 169 } else { 170 ndbTransaction = partitionKey.enlist(db); 171 getCoordinatedTransactionId(db); 172 } 173 } 174 } 175 executeCommit(boolean abort, boolean force)176 public void executeCommit(boolean abort, boolean force) { 177 db.assertOpen("ClusterTransactionImpl.executeCommit"); 178 if (logger.isTraceEnabled()) logger.trace(""); 179 // nothing to do if no ndbTransaction was ever enlisted or already autocommitted 180 if (isEnlisted() && !autocommitted) { 181 handlePendingPostExecuteCallbacks(); 182 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError; 183 int forceOption = force?1:0; 184 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Commit, 185 abortOption, forceOption); 186 handleError(returnCode, ndbTransaction); 187 } 188 autocommitted = false; 189 autocommit = false; 190 } 191 executeNoCommit()192 public void executeNoCommit() { 193 executeNoCommit(true, true); 194 } 195 executeNoCommit(boolean abort, boolean force)196 public void executeNoCommit(boolean abort, boolean force) { 197 db.assertOpen("ClusterTransactionImpl.executeNoCommit"); 198 if (logger.isTraceEnabled()) logger.trace(""); 199 if (!isEnlisted()) { 200 // nothing to do if no ndbTransaction was ever enlisted 201 return; 202 } 203 if (autocommit && postExecuteCallbacks.size() == 0) { 204 // optimization to commit now because no blob columns 205 executeCommit(abort, force); 206 autocommitted = true; 207 return; 208 } 209 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError; 210 int forceOption = force?1:0; 211 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.NoCommit, 212 abortOption, forceOption); 213 handleError(returnCode, ndbTransaction); 214 performPostExecuteCallbacks(); 215 } 216 executeRollback()217 public void executeRollback() { 218 db.assertOpen("ClusterTransactionImpl.executeRollback"); 219 if (!isEnlisted()) { 220 // nothing to do if no ndbTransaction was ever enlisted 221 return; 222 } 223 int abortOption = AbortOption.AO_IgnoreError; 224 int forceOption = 1; 225 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Rollback, 226 abortOption, forceOption); 227 handleError(returnCode, ndbTransaction); 228 } 229 getDeleteOperation(Table storeTable)230 public Operation getDeleteOperation(Table storeTable) { 231 enlist(); 232 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 233 if (USE_NDBRECORD) { 234 return new NdbRecordDeleteOperationImpl(this, storeTable); 235 } 236 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 237 handleError(ndbTable, ndbDictionary); 238 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); 239 handleError(ndbOperation, ndbTransaction); 240 int returnCode = ndbOperation.deleteTuple(); 241 handleError(returnCode, ndbTransaction); 242 return new OperationImpl(ndbOperation, this); 243 } 244 getInsertOperation(Table storeTable)245 public Operation getInsertOperation(Table storeTable) { 246 enlist(); 247 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 248 if (USE_NDBRECORD) { 249 return new NdbRecordInsertOperationImpl(this, storeTable); 250 } 251 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 252 handleError(ndbTable, ndbDictionary); 253 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); 254 handleError(ndbOperation, ndbTransaction); 255 int returnCode = ndbOperation.insertTuple(); 256 handleError(returnCode, ndbTransaction); 257 return new OperationImpl(ndbOperation, this); 258 } 259 getIndexScanOperation(Index storeIndex, Table storeTable)260 public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) { 261 enlist(); 262 if (USE_NDBRECORD) { 263 return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, indexScanLockMode); 264 } 265 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 266 handleError(ndbIndex, ndbDictionary); 267 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex); 268 handleError(ndbOperation, ndbTransaction); 269 int scanFlags = 0; 270 int lockMode = indexScanLockMode; 271 if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) { 272 scanFlags = ScanFlag.SF_KeyInfo; 273 } 274 int parallel = 0; 275 int batch = 0; 276 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch); 277 handleError(returnCode, ndbTransaction); 278 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 279 return new IndexScanOperationImpl(storeTable, ndbOperation, this); 280 } 281 getIndexScanOperationMultiRange(Index storeIndex, Table storeTable)282 public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) { 283 enlist(); 284 if (USE_NDBRECORD) { 285 return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, true, indexScanLockMode); 286 } 287 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 288 handleError(ndbIndex, ndbDictionary); 289 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex); 290 handleError(ndbOperation, ndbTransaction); 291 int scanFlags = ScanFlag.SF_OrderBy; 292 int lockMode = indexScanLockMode; 293 if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) { 294 scanFlags |= ScanFlag.SF_KeyInfo; 295 } 296 scanFlags |= ScanFlag.SF_MultiRange; 297 int parallel = 0; 298 int batch = 0; 299 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch); 300 handleError(returnCode, ndbTransaction); 301 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 302 return new IndexScanOperationImpl(storeTable, ndbOperation, this); 303 } 304 getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable)305 public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) { 306 enlist(); 307 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 308 handleError(ndbIndex, ndbDictionary); 309 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex); 310 handleError(ndbOperation, ndbTransaction); 311 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive; 312 int scanFlags = ScanFlag.SF_KeyInfo; 313 int parallel = 0; 314 int batch = 0; 315 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch); 316 handleError(returnCode, ndbTransaction); 317 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 318 return new IndexScanOperationImpl(storeTable, ndbOperation, this); 319 } 320 getSelectOperation(Table storeTable)321 public Operation getSelectOperation(Table storeTable) { 322 enlist(); 323 if (USE_NDBRECORD) { 324 return new NdbRecordKeyOperationImpl(this, storeTable); 325 } 326 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 327 handleError(ndbTable, ndbDictionary); 328 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); 329 handleError(ndbOperation, ndbTransaction); 330 int lockMode = findLockMode; 331 int returnCode = ndbOperation.readTuple(lockMode); 332 handleError(returnCode, ndbTransaction); 333 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 334 return new OperationImpl(storeTable, ndbOperation, this); 335 } 336 getTableScanOperation(Table storeTable)337 public ScanOperation getTableScanOperation(Table storeTable) { 338 enlist(); 339 if (USE_NDBRECORD) { 340 return new NdbRecordTableScanOperationImpl(this, storeTable, tableScanLockMode); 341 } 342 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 343 handleError(ndbTable, ndbDictionary); 344 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable); 345 handleError(ndbScanOperation, ndbTransaction); 346 int lockMode = tableScanLockMode; 347 int scanFlags = 0; 348 if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) { 349 scanFlags = ScanFlag.SF_KeyInfo; 350 } 351 int parallel = 0; 352 int batch = 0; 353 int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch); 354 handleError(returnCode, ndbTransaction); 355 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 356 return new ScanOperationImpl(storeTable, ndbScanOperation, this); 357 } 358 getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable)359 public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) { 360 enlist(); 361 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 362 handleError(ndbTable, ndbDictionary); 363 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable); 364 handleError(ndbScanOperation, ndbTransaction); 365 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive; 366 int scanFlags = ScanFlag.SF_KeyInfo; 367 int parallel = 0; 368 int batch = 0; 369 int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch); 370 handleError(returnCode, ndbTransaction); 371 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 372 return new ScanOperationImpl(storeTable, ndbScanOperation, this); 373 } 374 getUniqueIndexOperation(Index storeIndex, Table storeTable)375 public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) { 376 enlist(); 377 if (USE_NDBRECORD) { 378 return new NdbRecordUniqueKeyOperationImpl(this, storeIndex, storeTable); 379 } 380 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 381 handleError(ndbIndex, ndbDictionary); 382 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex); 383 handleError(ndbIndexOperation, ndbTransaction); 384 int lockMode = lookupLockMode; 385 int returnCode = ndbIndexOperation.readTuple(lockMode); 386 handleError(returnCode, ndbTransaction); 387 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 388 return new IndexOperationImpl(storeTable, ndbIndexOperation, this); 389 } 390 getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable)391 public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) { 392 enlist(); 393 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 394 handleError(ndbIndex, ndbDictionary); 395 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex); 396 handleError(ndbIndexOperation, ndbTransaction); 397 int returnCode = ndbIndexOperation.deleteTuple(); 398 handleError(returnCode, ndbTransaction); 399 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 400 return new IndexOperationImpl(storeTable, ndbIndexOperation, this); 401 } 402 getUpdateOperation(Table storeTable)403 public Operation getUpdateOperation(Table storeTable) { 404 enlist(); 405 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 406 handleError(ndbTable, ndbDictionary); 407 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); 408 handleError(ndbOperation, ndbTransaction); 409 int returnCode = ndbOperation.updateTuple(); 410 handleError(returnCode, ndbTransaction); 411 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 412 return new OperationImpl(storeTable, ndbOperation, this); 413 } 414 getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable)415 public IndexOperation getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable) { 416 enlist(); 417 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName()); 418 handleError(ndbIndex, ndbDictionary); 419 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex); 420 handleError(ndbIndexOperation, ndbTransaction); 421 int returnCode = ndbIndexOperation.updateTuple(); 422 handleError(returnCode, ndbTransaction); 423 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName()); 424 return new IndexOperationImpl(storeTable, ndbIndexOperation, this); 425 } 426 getWriteOperation(Table storeTable)427 public Operation getWriteOperation(Table storeTable) { 428 enlist(); 429 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); 430 handleError(ndbTable, ndbDictionary); 431 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); 432 handleError(ndbOperation, ndbTransaction); 433 int returnCode = ndbOperation.writeTuple(); 434 handleError(returnCode, ndbTransaction); 435 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); 436 return new OperationImpl(storeTable, ndbOperation, this); 437 } 438 439 /** Create an NdbOperation for insert using NdbRecord. 440 * 441 * @param ndbRecord the NdbRecord 442 * @param buffer the buffer with data for the operation 443 * @param mask the mask of column values already set in the buffer 444 * @param options the OperationOptions for this operation 445 * @return the insert operation 446 */ insertTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)447 public NdbOperationConst insertTuple(NdbRecordConst ndbRecord, 448 ByteBuffer buffer, byte[] mask, OperationOptionsConst options) { 449 enlist(); 450 NdbOperationConst operation = ndbTransaction.insertTuple(ndbRecord, buffer, mask, options, 0); 451 handleError(operation, ndbTransaction); 452 return operation; 453 } 454 455 /** Create a table scan operation using NdbRecord. 456 * 457 * @param ndbRecord the NdbRecord for the result 458 * @param mask the columns to read 459 * @param options the scan options 460 * @return 461 */ scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options)462 public NdbScanOperation scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options) { 463 enlist(); 464 int lockMode = tableScanLockMode; 465 NdbScanOperation operation = ndbTransaction.scanTable(ndbRecord, lockMode, mask, options, 0); 466 handleError(operation, ndbTransaction); 467 return operation; 468 } 469 470 /** Create a scan operation on the index using NdbRecord. 471 * 472 * @param ndbRecord the ndb record 473 * @param mask the mask that specifies which columns to read 474 * @param object scan options // TODO change this 475 * @return 476 */ scanIndex(NdbRecordConst key_record, NdbRecordConst result_record, byte[] result_mask, ScanOptions scanOptions)477 public NdbIndexScanOperation scanIndex(NdbRecordConst key_record, NdbRecordConst result_record, 478 byte[] result_mask, ScanOptions scanOptions) { 479 enlist(); 480 return ndbTransaction.scanIndex(key_record, result_record, indexScanLockMode, result_mask, null, scanOptions, 0); 481 } 482 483 /** Create an NdbOperation for delete using NdbRecord. 484 * 485 * @param ndbRecord the NdbRecord 486 * @param buffer the buffer with data for the operation 487 * @param mask the mask of column values already set in the buffer 488 * @param options the OperationOptions for this operation 489 * @return the delete operation 490 */ deleteTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)491 public NdbOperationConst deleteTuple(NdbRecordConst ndbRecord, 492 ByteBuffer buffer, byte[] mask, OperationOptionsConst options) { 493 enlist(); 494 NdbOperationConst operation = ndbTransaction.deleteTuple(ndbRecord, buffer, ndbRecord, null, mask, options, 0); 495 handleError(operation, ndbTransaction); 496 return operation; 497 } 498 499 /** Create an NdbOperation for update using NdbRecord. 500 * 501 * @param ndbRecord the NdbRecord 502 * @param buffer the buffer with data for the operation 503 * @param mask the mask of column values already set in the buffer 504 * @param options the OperationOptions for this operation 505 * @return the update operation 506 */ updateTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)507 public NdbOperationConst updateTuple(NdbRecordConst ndbRecord, 508 ByteBuffer buffer, byte[] mask, OperationOptionsConst options) { 509 enlist(); 510 NdbOperationConst operation = ndbTransaction.updateTuple(ndbRecord, buffer, ndbRecord, buffer, mask, options, 0); 511 handleError(operation, ndbTransaction); 512 return operation; 513 } 514 515 /** Create an NdbOperation for write using NdbRecord. 516 * 517 * @param ndbRecord the NdbRecord 518 * @param buffer the buffer with data for the operation 519 * @param mask the mask of column values already set in the buffer 520 * @param options the OperationOptions for this operation 521 * @return the update operation 522 */ writeTuple(NdbRecordConst ndbRecord, ByteBuffer buffer, byte[] mask, OperationOptionsConst options)523 public NdbOperationConst writeTuple(NdbRecordConst ndbRecord, 524 ByteBuffer buffer, byte[] mask, OperationOptionsConst options) { 525 enlist(); 526 NdbOperationConst operation = ndbTransaction.writeTuple(ndbRecord, buffer, ndbRecord, buffer, mask, options, 0); 527 handleError(operation, ndbTransaction); 528 return operation; 529 } 530 531 /** Create an NdbOperation for key read using NdbRecord. The 'find' lock mode is used. 532 * 533 * @param ndbRecordKeys the NdbRecord for the key 534 * @param keyBuffer the buffer with the key for the operation 535 * @param ndbRecordValues the NdbRecord for the value 536 * @param valueBuffer the buffer with the value returned by the operation 537 * @param mask the mask of column values to be read 538 * @param options the OperationOptions for this operation 539 * @return the ndb operation for key read 540 */ readTuple(NdbRecordConst ndbRecordKeys, ByteBuffer keyBuffer, NdbRecordConst ndbRecordValues, ByteBuffer valueBuffer, byte[] mask, OperationOptionsConst options)541 public NdbOperationConst readTuple(NdbRecordConst ndbRecordKeys, ByteBuffer keyBuffer, 542 NdbRecordConst ndbRecordValues, ByteBuffer valueBuffer, 543 byte[] mask, OperationOptionsConst options) { 544 enlist(); 545 NdbOperationConst operation = ndbTransaction.readTuple(ndbRecordKeys, keyBuffer, 546 ndbRecordValues, valueBuffer, findLockMode, mask, options, 0); 547 handleError(operation, ndbTransaction); 548 return operation; 549 } 550 postExecuteCallback(Runnable callback)551 public void postExecuteCallback(Runnable callback) { 552 postExecuteCallbacks.add(callback); 553 } 554 clearPostExecuteCallbacks()555 private void clearPostExecuteCallbacks() { 556 postExecuteCallbacks.clear(); 557 } 558 handlePendingPostExecuteCallbacks()559 private void handlePendingPostExecuteCallbacks() { 560 // if any pending postExecuteCallbacks, flush via executeNoCommit 561 if (!postExecuteCallbacks.isEmpty()) { 562 executeNoCommit(false, true); 563 } 564 } 565 performPostExecuteCallbacks()566 private void performPostExecuteCallbacks() { 567 // check completed operations 568 StringBuilder exceptionMessages = new StringBuilder(); 569 for (Operation op: operationsToCheck) { 570 int code = op.getErrorCode(); 571 if (code != 0) { 572 int mysqlCode = op.getMysqlCode(); 573 int status = op.getStatus(); 574 int classification = op.getClassification(); 575 String message = local.message("ERR_Datastore", -1, code, mysqlCode, status, classification, 576 op.toString()); 577 exceptionMessages.append(message); 578 exceptionMessages.append('\n'); 579 } 580 } 581 operationsToCheck.clear(); 582 // TODO should this set rollback only? 583 try { 584 for (Runnable runnable: postExecuteCallbacks) { 585 try { 586 runnable.run(); 587 } catch (Throwable t) { 588 t.printStackTrace(); 589 exceptionMessages.append(t.getMessage()); 590 exceptionMessages.append('\n'); 591 } 592 } 593 } finally { 594 clearPostExecuteCallbacks(); 595 } 596 if (exceptionMessages.length() > 0) { 597 throw new ClusterJDatastoreException(exceptionMessages.toString()); 598 } 599 } 600 601 /** Handle errors from ScanOperation where the error returnCode is -1. 602 * 603 * @param returnCode the return code from the nextResult operation 604 */ handleError(int returnCode)605 protected void handleError(int returnCode) { 606 if (returnCode == -1) { 607 NdbErrorConst ndbError = ndbTransaction.getNdbError(); 608 String detail = db.getNdbErrorDetail(ndbError); 609 Utility.throwError(returnCode, ndbError, detail); 610 } 611 } 612 handleError(int returnCode, NdbTransaction ndbTransaction)613 protected void handleError(int returnCode, NdbTransaction ndbTransaction) { 614 if (returnCode == 0) { 615 return; 616 } else { 617 NdbErrorConst ndbError = ndbTransaction.getNdbError(); 618 if (ndbError.code() == 0) { 619 return; 620 } 621 String detail = db.getNdbErrorDetail(ndbError); 622 Utility.throwError(returnCode, ndbError, detail); 623 } 624 } 625 handleError(Object object, NdbTransaction ndbTransaction)626 protected void handleError(Object object, NdbTransaction ndbTransaction) { 627 if (object != null) { 628 return; 629 } else { 630 NdbErrorConst ndbError = ndbTransaction.getNdbError(); 631 String detail = db.getNdbErrorDetail(ndbError); 632 Utility.throwError(null, ndbError, detail); 633 } 634 } 635 handleError(Object object, Dictionary ndbDictionary)636 protected void handleError(Object object, Dictionary ndbDictionary) { 637 if (object != null) { 638 return; 639 } else { 640 NdbErrorConst ndbError = ndbDictionary.getNdbError(); 641 String detail = db.getNdbErrorDetail(ndbError); 642 Utility.throwError(null, ndbError, detail); 643 } 644 } 645 setPartitionKey(PartitionKey partitionKey)646 public void setPartitionKey(PartitionKey partitionKey) { 647 if (partitionKey == null) { 648 throw new ClusterJFatalInternalException( 649 local.message("ERR_Partition_Key_Null")); 650 } 651 this.partitionKey = (PartitionKeyImpl)partitionKey; 652 } 653 getCoordinatedTransactionId()654 public String getCoordinatedTransactionId() { 655 return coordinatedTransactionId; 656 } 657 658 /** Get the coordinated transaction id if possible and update the field with 659 * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie 660 * and native library) the ndbTransaction.getCoordinatedTransactionId() method 661 * will throw an Error of some kind (java.lang.NoSuchMethodError or 662 * java.lang.UnsatisfiedLinkError) and this will cause this instance 663 * (and any other instance with access to the new value of the static variable 664 * supportsGetCoordinatedTransactionId) to never try again. 665 * @param db the DbImpl instance 666 */ getCoordinatedTransactionId(DbImpl db)667 private void getCoordinatedTransactionId(DbImpl db) { 668 try { 669 if (supportsGetCoordinatedTransactionId) { 670 // not implemented quite yet... 671 // ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer(); 672 // coordinatedTransactionId = ndbTransaction. 673 // getCoordinatedTransactionId(buffer, buffer.capacity()); 674 if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: " 675 + coordinatedTransactionId); 676 throw new ClusterJFatalInternalException("Not Implemented"); 677 } 678 } catch (Throwable t) { 679 // oops, don't do this again 680 supportsGetCoordinatedTransactionId = false; 681 } 682 } 683 setCoordinatedTransactionId(String coordinatedTransactionId)684 public void setCoordinatedTransactionId(String coordinatedTransactionId) { 685 this.coordinatedTransactionId = coordinatedTransactionId; 686 } 687 setLockMode(LockMode lockmode)688 public void setLockMode(LockMode lockmode) { 689 findLockMode = translateLockMode(lockmode); 690 lookupLockMode = findLockMode; 691 indexScanLockMode = findLockMode; 692 tableScanLockMode = findLockMode; 693 } 694 translateLockMode(LockMode lockmode)695 private int translateLockMode(LockMode lockmode) { 696 switch(lockmode) { 697 case READ_COMMITTED: 698 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead; 699 case SHARED: 700 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read; 701 case EXCLUSIVE: 702 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive; 703 default: 704 throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Lock_Mode", lockmode)); 705 } 706 } 707 setAutocommit(boolean autocommit)708 public void setAutocommit(boolean autocommit) { 709 this.autocommit = autocommit; 710 } 711 getBufferManager()712 public BufferManager getBufferManager() { 713 return bufferManager; 714 } 715 716 /** Get the cached NdbRecordImpl for this table. The NdbRecordImpl is cached in the 717 * cluster connection. 718 * @param storeTable the table 719 * @return 720 */ getCachedNdbRecordImpl(Table storeTable)721 protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) { 722 return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable); 723 } 724 725 /** Get the cached NdbRecordImpl for this index and table. The NdbRecordImpl is cached in the 726 * cluster connection. 727 * @param storeTable the table 728 * @param storeIndex the index 729 * @return 730 */ getCachedNdbRecordImpl(Index storeIndex, Table storeTable)731 protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) { 732 return clusterConnectionImpl.getCachedNdbRecordImpl(storeIndex, storeTable); 733 } 734 735 /** 736 * Add an operation to check for errors after execute. 737 * @param op the operation to check 738 */ addOperationToCheck(Operation op)739 public void addOperationToCheck(Operation op) { 740 operationsToCheck.add(op); 741 } 742 743 } 744