1 /* 2 Copyright (c) 2009, 2021, Oracle and/or its affiliates. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 package com.mysql.clusterj.core; 26 27 import com.mysql.clusterj.ClusterJException; 28 import com.mysql.clusterj.ClusterJFatalInternalException; 29 import com.mysql.clusterj.ClusterJUserException; 30 import com.mysql.clusterj.DynamicObject; 31 import com.mysql.clusterj.DynamicObjectDelegate; 32 import com.mysql.clusterj.LockMode; 33 import com.mysql.clusterj.Query; 34 import com.mysql.clusterj.Transaction; 35 36 import com.mysql.clusterj.core.spi.DomainTypeHandler; 37 import com.mysql.clusterj.core.spi.SmartValueHandler; 38 import com.mysql.clusterj.core.spi.ValueHandler; 39 40 import com.mysql.clusterj.core.query.QueryDomainTypeImpl; 41 import com.mysql.clusterj.core.query.QueryBuilderImpl; 42 import com.mysql.clusterj.core.query.QueryImpl; 43 44 import com.mysql.clusterj.core.spi.SessionSPI; 45 46 import com.mysql.clusterj.core.store.ClusterTransaction; 47 import com.mysql.clusterj.core.store.Db; 48 import com.mysql.clusterj.core.store.Dictionary; 49 import com.mysql.clusterj.core.store.Index; 50 import com.mysql.clusterj.core.store.IndexOperation; 51 import com.mysql.clusterj.core.store.IndexScanOperation; 52 import com.mysql.clusterj.core.store.Operation; 53 import com.mysql.clusterj.core.store.PartitionKey; 54 import com.mysql.clusterj.core.store.ResultData; 55 import com.mysql.clusterj.core.store.ScanOperation; 56 import com.mysql.clusterj.core.store.Table; 57 58 import com.mysql.clusterj.core.util.I18NHelper; 59 import com.mysql.clusterj.core.util.Logger; 60 import com.mysql.clusterj.core.util.LoggerFactoryService; 61 62 import com.mysql.clusterj.query.QueryBuilder; 63 import com.mysql.clusterj.query.QueryDefinition; 64 import com.mysql.clusterj.query.QueryDomainType; 65 66 import java.lang.reflect.Proxy; 67 import java.lang.reflect.InvocationHandler; 68 69 import java.util.ArrayList; 70 import java.util.BitSet; 71 import java.util.Collections; 72 import java.util.Iterator; 73 import java.util.List; 74 import java.util.Map; 75 76 /** 77 * This class implements Session, the main user interface to ClusterJ. 78 * It also implements SessionSPI, the main component interface. 79 */ 80 public class SessionImpl implements SessionSPI, CacheManager, StoreManager { 81 82 /** My message translator */ 83 static final I18NHelper local = I18NHelper.getInstance(SessionImpl.class); 84 85 /** My logger */ 86 static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionImpl.class); 87 88 /** My Factory. */ 89 protected SessionFactoryImpl factory; 90 91 /** Db: one per session. */ 92 protected Db db; 93 94 /** Dictionary: one per session. */ 95 protected Dictionary dictionary; 96 97 /** One transaction at a time. */ 98 protected TransactionImpl transactionImpl; 99 100 /** The partition key */ 101 protected PartitionKey partitionKey = null; 102 103 /** Rollback only status */ 104 protected boolean rollbackOnly = false; 105 106 /** The underlying ClusterTransaction */ 107 protected ClusterTransaction clusterTransaction; 108 109 /** The transaction id to join */ 110 protected String joinTransactionId = null; 111 112 /** The properties for this session */ 113 protected Map properties; 114 115 /** Flags for iterating a scan */ 116 protected final int RESULT_READY = 0; 117 protected final int SCAN_FINISHED = 1; 118 protected final int CACHE_EMPTY = 2; 119 120 /** The list of objects changed since the last flush */ 121 protected List<StateManager> changeList = new ArrayList<StateManager>(); 122 123 /** The list of pending operations, such as load operations, that need to be 124 * processed after the operation is sent to the database via @see #executeNoCommit(). 125 */ 126 protected List<Runnable> postExecuteOperations = new ArrayList<Runnable>(); 127 128 /** The transaction state of this session. */ 129 protected TransactionState transactionState; 130 131 /** The exception state of an internal transaction. */ 132 private ClusterJException transactionException; 133 134 /** Nested auto transaction counter. */ 135 protected int nestedAutoTransactionCounter = 0; 136 137 /** Number of retries for retriable exceptions */ 138 // TODO get this from properties 139 protected int numberOfRetries = 5; 140 141 /** The lock mode for read operations */ 142 private LockMode lockmode = LockMode.READ_COMMITTED; 143 144 /** Create a SessionImpl with factory, properties, Db, and dictionary 145 */ SessionImpl(SessionFactoryImpl factory, Map properties, Db db, Dictionary dictionary)146 SessionImpl(SessionFactoryImpl factory, Map properties, Db db, Dictionary dictionary) { 147 this.factory = factory; 148 this.db = db; 149 this.dictionary = dictionary; 150 this.properties = properties; 151 transactionImpl = new TransactionImpl(this); 152 transactionState = transactionStateNotActive; 153 } 154 155 /** Create a query from a query definition. 156 * 157 * @param qd the query definition 158 * @return the query 159 */ createQuery(QueryDefinition<T> qd)160 public <T> Query<T> createQuery(QueryDefinition<T> qd) { 161 if (!(qd instanceof QueryDomainTypeImpl)) { 162 throw new ClusterJUserException( 163 local.message("ERR_Exception_On_Method", "createQuery")); 164 } 165 return new QueryImpl<T>(this, (QueryDomainTypeImpl<T>)qd); 166 } 167 168 /** Find an instance by its class and primary key. 169 * If there is a compound primary key, the key is an Object[] containing 170 * all of the primary key fields in order of declaration in annotations. 171 * 172 * @param cls the class 173 * @param key the primary key 174 * @return the instance 175 */ find(Class<T> cls, Object key)176 public <T> T find(Class<T> cls, Object key) { 177 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls); 178 ValueHandler keyHandler = domainTypeHandler.createKeyValueHandler(key, db); 179 // initialize from the database using the key 180 return initializeFromDatabase(domainTypeHandler, null, null, keyHandler); 181 } 182 183 /** Initialize fields from the database. The keyHandler must 184 * contain the primary keys, none of which can be null. 185 * The instanceHandler, which may be null, manages the values 186 * of the instance. If it is null, both the instanceHandler and the 187 * instance are created if the instance exists in the database. 188 * The instance, which may be null, is the domain instance that is 189 * returned after loading the values from the database. 190 * 191 * @param domainTypeHandler domain type handler for the class 192 * @param keyHandler the primary key handler 193 * @param instanceHandler the handler for the instance 194 * (may be null if not yet initialized) 195 * @param instance the instance (may be null) 196 * @return the instance with fields initialized from the database 197 */ initializeFromDatabase(final DomainTypeHandler<T> domainTypeHandler, T instance, ValueHandler instanceHandler, ValueHandler keyHandler)198 public <T> T initializeFromDatabase(final DomainTypeHandler<T> domainTypeHandler, 199 T instance, 200 ValueHandler instanceHandler, ValueHandler keyHandler) { 201 startAutoTransaction(); 202 if (keyHandler instanceof SmartValueHandler) { 203 try { 204 final SmartValueHandler smartValueHandler = (SmartValueHandler)keyHandler; 205 setPartitionKey(domainTypeHandler, smartValueHandler); 206 // load the values from the database into the smart value handler 207 @SuppressWarnings("unused") 208 Operation operation = smartValueHandler.load(clusterTransaction); 209 endAutoTransaction(); 210 if (isActive()) { 211 // if this is a continuing transaction, flush the operation to get the result 212 clusterTransaction.executeNoCommit(false, true); 213 } 214 if (smartValueHandler.found()) { 215 // create a new proxy (or dynamic instance) with the smart value handler 216 return domainTypeHandler.newInstance(smartValueHandler); 217 } else { 218 // not found 219 return null; 220 } 221 } catch (ClusterJException ex) { 222 failAutoTransaction(); 223 throw ex; 224 } 225 } 226 try { 227 ResultData rs = selectUnique(domainTypeHandler, keyHandler, null); 228 if (rs.next()) { 229 // we have a result; initialize the instance 230 if (instanceHandler == null) { 231 if (logger.isDetailEnabled()) logger.detail("Creating instanceHandler for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler)); 232 // we need both a new instance and its handler 233 instance = domainTypeHandler.newInstance(db); 234 instanceHandler = domainTypeHandler.getValueHandler(instance); 235 } else if (instance == null) { 236 if (logger.isDetailEnabled()) logger.detail("Creating instance for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler)); 237 // we have a handler but no instance 238 instance = domainTypeHandler.getInstance(instanceHandler); 239 } 240 // found the instance in the datastore 241 instanceHandler.found(Boolean.TRUE); 242 // put the results into the instance 243 domainTypeHandler.objectSetValues(rs, instanceHandler); 244 // set the cache manager to track updates 245 domainTypeHandler.objectSetCacheManager(this, instanceHandler); 246 // reset modified bits in instance 247 domainTypeHandler.objectResetModified(instanceHandler); 248 } else { 249 if (logger.isDetailEnabled()) logger.detail("No instance found in database for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler)); 250 // no instance found in database 251 if (instanceHandler != null) { 252 // mark the handler as not found 253 instanceHandler.found(Boolean.FALSE); 254 } 255 endAutoTransaction(); 256 return null; 257 } 258 } catch (ClusterJException ex) { 259 failAutoTransaction(); 260 throw ex; 261 } 262 endAutoTransaction(); 263 return instance; 264 } 265 266 /** If a transaction is already enlisted, ignore. Otherwise, set 267 * the partition key based on the key handler. 268 * @param domainTypeHandler the domain type handler 269 * @param keyHandler the value handler that holds the key values 270 */ setPartitionKey(DomainTypeHandler<?> domainTypeHandler, ValueHandler keyHandler)271 private void setPartitionKey(DomainTypeHandler<?> domainTypeHandler, 272 ValueHandler keyHandler) { 273 if (!isEnlisted()) { 274 // there is still time to set the partition key 275 PartitionKey partitionKey = 276 domainTypeHandler.createPartitionKey(keyHandler); 277 clusterTransaction.setPartitionKey(partitionKey); 278 } 279 } 280 281 /** Create an instance of a class to be persisted. 282 * 283 * @param cls the class 284 * @return a new instance that can be used with makePersistent 285 */ newInstance(Class<T> cls)286 public <T> T newInstance(Class<T> cls) { 287 return factory.newInstance(cls, dictionary, db); 288 } 289 290 /** Create an instance of a class to be persisted and set the primary key. 291 * 292 * @param cls the class 293 * @return a new instance that can be used with makePersistent, 294 * savePersistent, writePersistent, updatePersistent, or deletePersistent 295 */ newInstance(Class<T> cls, Object key)296 public <T> T newInstance(Class<T> cls, Object key) { 297 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls); 298 T instance = factory.newInstance(cls, dictionary, db); 299 domainTypeHandler.objectSetKeys(key, instance); 300 return instance; 301 } 302 303 /** Create an instance from a result data row. 304 * @param resultData the result of a query 305 * @param domainTypeHandler the domain type handler 306 * @return the instance 307 */ newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler)308 public <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler) { 309 T result = domainTypeHandler.newInstance(resultData, db); 310 return result; 311 } 312 313 /** Load the instance from the database into memory. Loading 314 * is asynchronous and will be executed when an operation requiring 315 * database access is executed: find, flush, or query. The instance must 316 * have been returned from find or query; or 317 * created via session.newInstance and its primary key initialized. 318 * @param object the instance to load 319 * @return the instance 320 * @see #found(Object) 321 */ load(final T object)322 public <T> T load(final T object) { 323 if (object == null) { 324 return null; 325 } 326 if (Iterable.class.isAssignableFrom(object.getClass())) { 327 Iterable<?> instances = (Iterable<?>)object; 328 for (Object instance:instances) { 329 load(instance); 330 } 331 return object; 332 } 333 if (object.getClass().isArray()) { 334 Object[] instances = (Object[])object; 335 for (Object instance:instances) { 336 load(instance); 337 } 338 return object; 339 } 340 // a transaction must already be active (autocommit is not supported) 341 assertActive(); 342 final DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object); 343 final ValueHandler instanceHandler = domainTypeHandler.getValueHandler(object); 344 setPartitionKey(domainTypeHandler, instanceHandler); 345 if (instanceHandler instanceof SmartValueHandler) { 346 @SuppressWarnings("unused") 347 Operation operation = ((SmartValueHandler)instanceHandler).load(clusterTransaction); 348 return object; 349 } 350 Table storeTable = domainTypeHandler.getStoreTable(); 351 // perform a primary key operation 352 final Operation op = clusterTransaction.getSelectOperation(storeTable); 353 op.beginDefinition(); 354 // set the keys into the operation 355 domainTypeHandler.operationSetKeys(instanceHandler, op); 356 // set the expected columns into the operation 357 domainTypeHandler.operationGetValues(op); 358 op.endDefinition(); 359 final ResultData rs = op.resultData(false); 360 final SessionImpl cacheManager = this; 361 // defer execution of the key operation until the next find, flush, or query 362 Runnable postExecuteOperation = new Runnable() { 363 public void run() { 364 if (rs.next()) { 365 // found row in database 366 instanceHandler.found(Boolean.TRUE); 367 // put the results into the instance 368 domainTypeHandler.objectSetValues(rs, instanceHandler); 369 // set the cache manager to track updates 370 domainTypeHandler.objectSetCacheManager(cacheManager, instanceHandler); 371 // reset modified bits in instance 372 domainTypeHandler.objectResetModified(instanceHandler); 373 } else { 374 // mark instance as not found 375 instanceHandler.found(Boolean.FALSE); 376 } 377 378 } 379 }; 380 clusterTransaction.postExecuteCallback(postExecuteOperation); 381 return object; 382 } 383 384 /** Was this instance found in the database? 385 * @param instance the instance 386 * @return <ul><li>null if the instance is null or was created via newInstance and never loaded; 387 * </li><li>true if the instance was returned from a find or query 388 * or created via newInstance and successfully loaded; 389 * </li><li>false if the instance was created via newInstance and not found. 390 * </li></ul> 391 */ found(Object instance)392 public Boolean found(Object instance) { 393 if (instance == null) { 394 return null; 395 } 396 if (instance instanceof DynamicObject) { 397 return ((DynamicObject)instance).found(); 398 } 399 // make sure the instance is a persistent type 400 getDomainTypeHandler(instance); 401 return true; 402 } 403 404 /** Make an instance persistent. Also recursively make an iterable collection or array persistent. 405 * 406 * @param object the instance or array or iterable collection of instances 407 * @return the instance 408 */ makePersistent(T object)409 public <T> T makePersistent(T object) { 410 if (object == null) { 411 return null; 412 } 413 if (Iterable.class.isAssignableFrom(object.getClass())) { 414 startAutoTransaction(); 415 Iterable<?> instances = (Iterable<?>)object; 416 for (Object instance:instances) { 417 makePersistent(instance); 418 } 419 endAutoTransaction(); 420 return object; 421 } 422 if (object.getClass().isArray()) { 423 startAutoTransaction(); 424 Object[] instances = (Object[])object; 425 for (Object instance:instances) { 426 makePersistent(instance); 427 } 428 endAutoTransaction(); 429 return object; 430 } 431 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(object); 432 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object); 433 insert(domainTypeHandler, valueHandler); 434 return object; 435 } 436 insert( DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler)437 public Operation insert( DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) { 438 startAutoTransaction(); 439 setPartitionKey(domainTypeHandler, valueHandler); 440 if (valueHandler instanceof SmartValueHandler) { 441 try { 442 SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler; 443 Operation result = smartValueHandler.insert(clusterTransaction); 444 valueHandler.resetModified(); 445 endAutoTransaction(); 446 return result; 447 } catch (ClusterJException cjex) { 448 failAutoTransaction(); 449 throw cjex; 450 } 451 } 452 Operation op = null; 453 Table storeTable = null; 454 try { 455 storeTable = domainTypeHandler.getStoreTable(); 456 op = clusterTransaction.getInsertOperation(storeTable); 457 // set all values in the operation, keys first 458 op.beginDefinition(); 459 domainTypeHandler.operationSetKeys(valueHandler, op); 460 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op); 461 op.endDefinition(); 462 // reset modified bits in instance 463 domainTypeHandler.objectResetModified(valueHandler); 464 } catch (ClusterJUserException cjuex) { 465 failAutoTransaction(); 466 throw cjuex; 467 } catch (ClusterJException cjex) { 468 failAutoTransaction(); 469 logger.error(local.message("ERR_Insert", storeTable.getName())); 470 throw new ClusterJException( 471 local.message("ERR_Insert", storeTable.getName()), cjex); 472 } catch (RuntimeException rtex) { 473 failAutoTransaction(); 474 logger.error(local.message("ERR_Insert", storeTable.getName())); 475 throw new ClusterJException( 476 local.message("ERR_Insert", storeTable.getName()), rtex); 477 } 478 endAutoTransaction(); 479 return op; 480 } 481 482 /** Make a number of instances persistent. 483 * 484 * @param instances a Collection or array of objects to persist 485 * @return a Collection or array with the same order of iteration 486 */ makePersistentAll(Iterable instances)487 public Iterable makePersistentAll(Iterable instances) { 488 startAutoTransaction(); 489 List<Object> result = new ArrayList<Object>(); 490 for (Object instance:instances) { 491 result.add(makePersistent(instance)); 492 } 493 endAutoTransaction(); 494 return result; 495 } 496 497 /** Delete an instance of a class from the database given its primary key. 498 * For single-column keys, the key parameter is a wrapper (e.g. Integer). 499 * For multi-column keys, the key parameter is an Object[] in which 500 * elements correspond to the primary keys in order as defined in the schema. 501 * @param cls the class 502 * @param key the primary key 503 */ deletePersistent(Class<T> cls, Object key)504 public <T> void deletePersistent(Class<T> cls, Object key) { 505 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls); 506 ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(key, db); 507 delete(domainTypeHandler, keyValueHandler); 508 } 509 510 /** Remove an instance from the database. Only the key field(s) 511 * are used to identify the instance. 512 * 513 * @param object the instance to remove from the database 514 */ deletePersistent(Object object)515 public void deletePersistent(Object object) { 516 if (object == null) { 517 return; 518 } 519 DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object); 520 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object); 521 delete(domainTypeHandler, valueHandler); 522 } 523 delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler)524 public Operation delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) { 525 startAutoTransaction(); 526 Table storeTable = domainTypeHandler.getStoreTable(); 527 setPartitionKey(domainTypeHandler, valueHandler); 528 if (valueHandler instanceof SmartValueHandler) { 529 try { 530 SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler; 531 Operation result = smartValueHandler.delete(clusterTransaction); 532 endAutoTransaction(); 533 return result; 534 } catch (ClusterJException cjex) { 535 failAutoTransaction(); 536 throw cjex; 537 } 538 } 539 Operation op = null; 540 try { 541 op = clusterTransaction.getDeleteOperation(storeTable); 542 op.beginDefinition(); 543 domainTypeHandler.operationSetKeys(valueHandler, op); 544 op.endDefinition(); 545 } catch (ClusterJException ex) { 546 failAutoTransaction(); 547 throw new ClusterJException( 548 local.message("ERR_Delete", storeTable.getName()), ex); 549 } 550 endAutoTransaction(); 551 return op; 552 } 553 554 /** Delete the instances corresponding to the parameters. 555 * @param objects the objects to delete 556 */ deletePersistentAll(Iterable objects)557 public void deletePersistentAll(Iterable objects) { 558 startAutoTransaction(); 559 for (Iterator it = objects.iterator(); it.hasNext();) { 560 deletePersistent(it.next()); 561 } 562 endAutoTransaction(); 563 } 564 565 /** Delete all instances of the parameter class. 566 * @param cls the class of instances to delete 567 */ deletePersistentAll(Class<T> cls)568 public <T> int deletePersistentAll(Class<T> cls) { 569 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls); 570 return deletePersistentAll(domainTypeHandler); 571 } 572 573 /** Delete all instances of the parameter domainTypeHandler. 574 * @param domainTypeHandler the domainTypeHandler of instances to delete 575 * @return the number of instances deleted 576 */ deletePersistentAll(DomainTypeHandler<?> domainTypeHandler)577 public int deletePersistentAll(DomainTypeHandler<?> domainTypeHandler) { 578 startAutoTransaction(); 579 Table storeTable = domainTypeHandler.getStoreTable(); 580 String tableName = storeTable.getName(); 581 ScanOperation op = null; 582 int count = 0; 583 try { 584 op = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable); 585 count = deletePersistentAll(op, true); 586 } catch (ClusterJException ex) { 587 failAutoTransaction(); 588 // TODO add table name to the error message 589 throw new ClusterJException( 590 local.message("ERR_Delete_All", tableName), ex); 591 } 592 endAutoTransaction(); 593 return count; 594 } 595 596 /** Delete all instances retrieved by the operation. The operation must have exclusive 597 * access to the instances and have the ScanFlag.KEY_INFO flag set. 598 * @param op the scan operation 599 * @return the number of instances deleted 600 */ deletePersistentAll(ScanOperation op, boolean abort)601 public int deletePersistentAll(ScanOperation op, boolean abort) { 602 int cacheCount = 0; 603 int count = 0; 604 boolean done = false; 605 boolean fetch = true; 606 // cannot use early autocommit optimization here 607 clusterTransaction.setAutocommit(false); 608 // execute the operation 609 clusterTransaction.executeNoCommit(true, true); 610 while (!done ) { 611 int result = op.nextResult(fetch); 612 switch (result) { 613 case RESULT_READY: 614 op.deleteCurrentTuple(); 615 ++count; 616 ++cacheCount; 617 fetch = false; 618 break; 619 case SCAN_FINISHED: 620 done = true; 621 if (cacheCount != 0) { 622 clusterTransaction.executeNoCommit(abort, true); 623 } 624 op.close(); 625 break; 626 case CACHE_EMPTY: 627 clusterTransaction.executeNoCommit(abort, true); 628 cacheCount = 0; 629 fetch = true; 630 break; 631 default: 632 throw new ClusterJException( 633 local.message("ERR_Next_Result_Illegal", result)); 634 } 635 } 636 return count; 637 } 638 639 /** Select a single row from the database. Only the fields requested 640 * will be selected. A transaction must be active (either via begin 641 * or startAutoTransaction). 642 * 643 * @param domainTypeHandler the domainTypeHandler to be selected 644 * @param keyHandler the key supplier for the select 645 * @param fields the fields to select; null to select all fields 646 * @return the ResultData from the database 647 */ selectUnique(DomainTypeHandler<?> domainTypeHandler, ValueHandler keyHandler, BitSet fields)648 public ResultData selectUnique(DomainTypeHandler<?> domainTypeHandler, 649 ValueHandler keyHandler, BitSet fields) { 650 assertActive(); 651 setPartitionKey(domainTypeHandler, keyHandler); 652 Table storeTable = domainTypeHandler.getStoreTable(); 653 // perform a single select by key operation 654 Operation op = clusterTransaction.getSelectOperation(storeTable); 655 op.beginDefinition(); 656 // set the keys into the operation 657 domainTypeHandler.operationSetKeys(keyHandler, op); 658 // set the expected columns into the operation 659 domainTypeHandler.operationGetValues(op); 660 op.endDefinition(); 661 // execute the select and get results 662 ResultData rs = op.resultData(); 663 return rs; 664 } 665 666 /** Update an instance in the database. The key field(s) 667 * are used to identify the instance; modified fields change the 668 * values in the database. 669 * 670 * @param object the instance to update in the database 671 */ updatePersistent(Object object)672 public void updatePersistent(Object object) { 673 if (object == null) { 674 return; 675 } 676 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object); 677 if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + object); 678 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object); 679 update(domainTypeHandler, valueHandler); 680 } 681 update(DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler)682 public Operation update(DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) { 683 startAutoTransaction(); 684 setPartitionKey(domainTypeHandler, valueHandler); 685 if (valueHandler instanceof SmartValueHandler) { 686 try { 687 SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler; 688 Operation result = smartValueHandler.update(clusterTransaction); 689 endAutoTransaction(); 690 return result; 691 } catch (ClusterJException cjex) { 692 failAutoTransaction(); 693 throw cjex; 694 } 695 } 696 Table storeTable = null; 697 Operation op = null; 698 try { 699 storeTable = domainTypeHandler.getStoreTable(); 700 op = clusterTransaction.getUpdateOperation(storeTable); 701 domainTypeHandler.operationSetKeys(valueHandler, op); 702 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op); 703 if (logger.isDetailEnabled()) logger.detail("Updated object " + 704 valueHandler); 705 } catch (ClusterJException ex) { 706 failAutoTransaction(); 707 throw new ClusterJException( 708 local.message("ERR_Update", storeTable.getName()) ,ex); 709 } 710 endAutoTransaction(); 711 return op; 712 } 713 714 /** Update the instances corresponding to the parameters. 715 * @param objects the objects to update 716 */ updatePersistentAll(Iterable objects)717 public void updatePersistentAll(Iterable objects) { 718 startAutoTransaction(); 719 for (Iterator it = objects.iterator(); it.hasNext();) { 720 updatePersistent(it.next()); 721 } 722 endAutoTransaction(); 723 } 724 725 /** Save the instance even if it does not exist. 726 * @param instance the instance to save 727 */ savePersistent(T instance)728 public <T> T savePersistent(T instance) { 729 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(instance); 730 if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + instance); 731 ValueHandler valueHandler = domainTypeHandler.getValueHandler(instance); 732 startAutoTransaction(); 733 setPartitionKey(domainTypeHandler, valueHandler); 734 if (valueHandler instanceof SmartValueHandler) { 735 try { 736 SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler; 737 smartValueHandler.write(clusterTransaction); 738 valueHandler.resetModified(); 739 endAutoTransaction(); 740 return instance; 741 } catch (ClusterJException cjex) { 742 failAutoTransaction(); 743 throw cjex; 744 } 745 } 746 Table storeTable = null; 747 try { 748 storeTable = domainTypeHandler.getStoreTable(); 749 Operation op = null; 750 op = clusterTransaction.getWriteOperation(storeTable); 751 domainTypeHandler.operationSetKeys(valueHandler, op); 752 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op); 753 } catch (ClusterJException ex) { 754 failAutoTransaction(); 755 throw new ClusterJException( 756 local.message("ERR_Write", storeTable.getName()) ,ex); 757 } 758 endAutoTransaction(); 759 return instance; 760 } 761 762 /** Save the instances even if they do not exist. 763 * @param instances 764 */ savePersistentAll(Iterable instances)765 public Iterable savePersistentAll(Iterable instances) { 766 List<Object> result = new ArrayList<Object>(); 767 startAutoTransaction(); 768 for (Iterator it = instances.iterator(); it.hasNext();) { 769 result.add(savePersistent(it.next())); 770 } 771 endAutoTransaction(); 772 return result; 773 } 774 775 /** Get the current transaction. 776 * 777 * @return the transaction 778 */ currentTransaction()779 public Transaction currentTransaction() { 780 return transactionImpl; 781 } 782 783 /** Close this session and deallocate all resources. 784 * 785 */ close()786 public void close() { 787 if (clusterTransaction != null) { 788 clusterTransaction.close(); 789 clusterTransaction = null; 790 } 791 if (db != null) { 792 db.close(); 793 db = null; 794 } 795 } 796 isClosed()797 public boolean isClosed() { 798 return db==null; 799 } 800 801 /** Assert this session is not yet closed. */ assertNotClosed()802 protected void assertNotClosed() { 803 if (isClosed()) { 804 throw new ClusterJUserException( 805 local.message("ERR_Session_Closed")); 806 } 807 } 808 809 /** Begin the current transaction. 810 * 811 */ begin()812 public void begin() { 813 if (logger.isDebugEnabled()) logger.debug("begin transaction."); 814 transactionState = transactionState.begin(); 815 handleTransactionException(); 816 } 817 818 /** Internally begin the transaction. 819 * Called by transactionState.begin(). 820 */ internalBegin()821 protected void internalBegin() { 822 try { 823 clusterTransaction = db.startTransaction(joinTransactionId); 824 clusterTransaction.setLockMode(lockmode); 825 // if a transaction has already begun, tell the cluster transaction about the key 826 if (partitionKey != null) { 827 clusterTransaction.setPartitionKey(partitionKey); 828 } 829 } catch (ClusterJException ex) { 830 throw new ClusterJException( 831 local.message("ERR_Ndb_Start"), ex); 832 } 833 } 834 835 /** Commit the current transaction. 836 * 837 */ commit()838 public void commit() { 839 if (logger.isDebugEnabled()) logger.debug("commit transaction."); 840 transactionState = transactionState.commit(); 841 handleTransactionException(); 842 } 843 844 /** Internally commit the transaction. 845 * Called by transactionState.commit(). 846 */ internalCommit()847 protected void internalCommit() { 848 if (rollbackOnly) { 849 try { 850 internalRollback(); 851 throw new ClusterJException( 852 local.message("ERR_Transaction_Rollback_Only")); 853 } catch (ClusterJException ex) { 854 throw new ClusterJException( 855 local.message("ERR_Transaction_Rollback_Only"), ex); 856 } 857 } 858 try { 859 clusterTransaction.executeCommit(); 860 } finally { 861 // always close the transaction 862 clusterTransaction.close(); 863 clusterTransaction = null; 864 partitionKey = null; 865 } 866 } 867 868 /** Roll back the current transaction. 869 * 870 */ rollback()871 public void rollback() { 872 if (logger.isDebugEnabled()) logger.debug("roll back transaction."); 873 transactionState = transactionState.rollback(); 874 handleTransactionException(); 875 } 876 877 /** Internally roll back the transaction. 878 * Called by transactionState.rollback() and 879 * transactionState.commit() if the transaction is marked for rollback. 880 * 881 */ internalRollback()882 protected void internalRollback() { 883 try { 884 clusterTransaction.executeRollback(); 885 } catch (ClusterJException ex) { 886 throw new ClusterJException( 887 local.message("ERR_Transaction_Execute", "rollback"), ex); 888 } finally { 889 if (clusterTransaction != null) { 890 clusterTransaction.close(); 891 } 892 clusterTransaction = null; 893 partitionKey = null; 894 } 895 } 896 897 /** Start a transaction if there is not already an active transaction. 898 * Throw a ClusterJException if there is any problem. 899 */ startAutoTransaction()900 public void startAutoTransaction() { 901 if (logger.isDebugEnabled()) logger.debug("start AutoTransaction"); 902 transactionState = transactionState.start(); 903 handleTransactionException(); 904 } 905 906 /** End an auto transaction if it was started. 907 * Throw a ClusterJException if there is any problem. 908 */ endAutoTransaction()909 public void endAutoTransaction() { 910 if (logger.isDebugEnabled()) logger.debug("end AutoTransaction"); 911 transactionState = transactionState.end(); 912 handleTransactionException(); 913 } 914 915 /** Fail an auto transaction if it was started. 916 * Throw a ClusterJException if there is any problem. 917 */ failAutoTransaction()918 public void failAutoTransaction() { 919 if (logger.isDebugEnabled()) logger.debug("fail AutoTransaction"); 920 transactionState = transactionState.fail(); 921 } 922 handleTransactionException()923 protected void handleTransactionException() { 924 if (transactionException == null) { 925 return; 926 } else { 927 ClusterJException ex = transactionException; 928 transactionException = null; 929 throw ex; 930 } 931 } 932 933 /** Mark the current transaction as rollback only. 934 * 935 */ setRollbackOnly()936 public void setRollbackOnly() { 937 rollbackOnly = true; 938 } 939 940 /** Is the current transaction marked for rollback only? 941 * @return true if the current transaction is marked for rollback only 942 */ getRollbackOnly()943 public boolean getRollbackOnly() { 944 return rollbackOnly; 945 } 946 947 /** Manage the state of the transaction associated with this 948 * StateManager. 949 */ 950 protected interface TransactionState { isActive()951 boolean isActive(); 952 begin()953 TransactionState begin(); commit()954 TransactionState commit(); rollback()955 TransactionState rollback(); 956 start()957 TransactionState start(); end()958 TransactionState end(); fail()959 TransactionState fail(); 960 } 961 962 /** This represents the state of Transaction Not Active. */ 963 protected TransactionState transactionStateNotActive = 964 new TransactionState() { 965 966 public boolean isActive() { 967 return false; 968 } 969 970 public TransactionState begin() { 971 try { 972 internalBegin(); 973 return transactionStateActive; 974 } catch (ClusterJException ex) { 975 transactionException = ex; 976 return transactionStateNotActive; 977 } 978 } 979 980 public TransactionState commit() { 981 transactionException = new ClusterJUserException( 982 local.message("ERR_Transaction_Must_Be_Active_For_Method", 983 "commit")); 984 return transactionStateNotActive; 985 } 986 987 public TransactionState rollback() { 988 transactionException = new ClusterJUserException( 989 local.message("ERR_Transaction_Must_Be_Active_For_Method", 990 "rollback")); 991 return transactionStateNotActive; 992 } 993 994 public TransactionState start() { 995 try { 996 internalBegin(); 997 clusterTransaction.setAutocommit(true); 998 nestedAutoTransactionCounter = 1; 999 return transactionStateAutocommit; 1000 } catch (ClusterJException ex) { 1001 transactionException = ex; 1002 return transactionStateNotActive; 1003 } 1004 } 1005 1006 public TransactionState end() { 1007 throw new ClusterJFatalInternalException( 1008 local.message("ERR_Transaction_Auto_Start", "end")); 1009 } 1010 1011 public TransactionState fail() { 1012 return transactionStateNotActive; 1013 } 1014 1015 }; 1016 1017 /** This represents the state of Transaction Active. */ 1018 protected TransactionState transactionStateActive = 1019 new TransactionState() { 1020 1021 public boolean isActive() { 1022 return true; 1023 } 1024 1025 public TransactionState begin() { 1026 transactionException = new ClusterJUserException( 1027 local.message("ERR_Transaction_Must_Not_Be_Active_For_Method", 1028 "begin")); 1029 return transactionStateActive; 1030 } 1031 1032 public TransactionState commit() { 1033 try { 1034 // flush unwritten changes 1035 flush(true); 1036 } catch (ClusterJException ex) { 1037 transactionException = ex; 1038 } 1039 return transactionStateNotActive; 1040 } 1041 1042 public TransactionState rollback() { 1043 try { 1044 internalRollback(); 1045 return transactionStateNotActive; 1046 } catch (ClusterJException ex) { 1047 transactionException = ex; 1048 return transactionStateNotActive; 1049 } 1050 } 1051 1052 public TransactionState start() { 1053 // nothing to do 1054 return transactionStateActive; 1055 } 1056 1057 public TransactionState end() { 1058 // nothing to do 1059 return transactionStateActive; 1060 } 1061 1062 public TransactionState fail() { 1063 // nothing to do 1064 return transactionStateActive; 1065 } 1066 1067 }; 1068 1069 protected TransactionState transactionStateAutocommit = 1070 new TransactionState() { 1071 1072 public boolean isActive() { 1073 return true; 1074 } 1075 1076 public TransactionState begin() { 1077 throw new ClusterJFatalInternalException( 1078 local.message("ERR_Transaction_Auto_End", "begin")); 1079 } 1080 1081 public TransactionState commit() { 1082 throw new ClusterJFatalInternalException( 1083 local.message("ERR_Transaction_Auto_End", "commit")); 1084 } 1085 1086 public TransactionState rollback() { 1087 throw new ClusterJFatalInternalException( 1088 local.message("ERR_Transaction_Auto_End", "rollback")); 1089 } 1090 1091 public TransactionState start() { 1092 // nested start; increment counter 1093 nestedAutoTransactionCounter++; 1094 return transactionStateAutocommit; 1095 } 1096 1097 public TransactionState end() { 1098 if (--nestedAutoTransactionCounter > 0) { 1099 return transactionStateAutocommit; 1100 } else if (nestedAutoTransactionCounter == 0) { 1101 try { 1102 internalCommit(); 1103 } catch (ClusterJException ex) { 1104 transactionException = ex; 1105 } 1106 return transactionStateNotActive; 1107 } else { 1108 throw new ClusterJFatalInternalException( 1109 local.message("ERR_Transaction_Auto_Start", "end")); 1110 } 1111 } 1112 1113 public TransactionState fail() { 1114 try { 1115 nestedAutoTransactionCounter = 0; 1116 internalRollback(); 1117 return transactionStateNotActive; 1118 } catch (ClusterJException ex) { 1119 // ignore failures caused by internal rollback 1120 return transactionStateNotActive; 1121 } 1122 } 1123 1124 }; 1125 1126 /** Get the domain type handler for an instance. 1127 * 1128 * @param object the instance for which to get the domain type handler 1129 * @return the domain type handler 1130 */ getDomainTypeHandler(T object)1131 protected synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(T object) { 1132 DomainTypeHandler<T> domainTypeHandler = 1133 factory.getDomainTypeHandler(object, dictionary); 1134 return domainTypeHandler; 1135 } 1136 1137 /** Get the domain type handler for a class. 1138 * 1139 * @param cls the class 1140 * @return the domain type handler 1141 */ getDomainTypeHandler(Class<T> cls)1142 public synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) { 1143 DomainTypeHandler<T> domainTypeHandler = 1144 factory.getDomainTypeHandler(cls, dictionary); 1145 return domainTypeHandler; 1146 } 1147 getDictionary()1148 public Dictionary getDictionary() { 1149 return dictionary; 1150 } 1151 1152 /** Is there an active transaction. 1153 * 1154 * @return true if there is an active transaction 1155 */ isActive()1156 boolean isActive() { 1157 return transactionState.isActive(); 1158 } 1159 1160 /** Is the transaction enlisted. A transaction is enlisted if and only if 1161 * an operation has been defined that requires an ndb transaction to be 1162 * started. 1163 * @return true if the transaction is enlisted 1164 */ isEnlisted()1165 public boolean isEnlisted() { 1166 return clusterTransaction==null?false:clusterTransaction.isEnlisted(); 1167 } 1168 1169 /** Assert that there is an active transaction (the user has called begin 1170 * or an autotransaction has begun). 1171 * Throw a user exception if not. 1172 */ assertActive()1173 private void assertActive() { 1174 if (!transactionState.isActive()) { 1175 throw new ClusterJUserException( 1176 local.message("ERR_Transaction_Must_Be_Active")); 1177 } 1178 } 1179 1180 /** Assert that there is not an active transaction. 1181 * Throw a user exception if there is an active transaction. 1182 * @param methodName the name of the method 1183 */ assertNotActive(String methodName)1184 private void assertNotActive(String methodName) { 1185 if (transactionState.isActive()) { 1186 throw new ClusterJUserException( 1187 local.message("ERR_Transaction_Must_Not_Be_Active_For_Method", 1188 methodName)); 1189 } 1190 } 1191 1192 /** Create a query from a class. 1193 * 1194 * @param cls the class 1195 * @return the query 1196 */ createQuery(Class cls)1197 public Query createQuery(Class cls) { 1198 throw new UnsupportedOperationException( 1199 local.message("ERR_NotImplemented")); 1200 } 1201 1202 /** Get a query builder. 1203 * 1204 * @return the query builder 1205 */ getQueryBuilder()1206 public QueryBuilder getQueryBuilder() { 1207 return new QueryBuilderImpl(this); 1208 } 1209 1210 /** Create an index scan operation for an index and table. 1211 * 1212 * @param storeIndex the index 1213 * @param storeTable the table 1214 * @return the index scan operation 1215 */ getIndexScanOperation(Index storeIndex, Table storeTable)1216 public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) { 1217 assertActive(); 1218 try { 1219 IndexScanOperation result = clusterTransaction.getIndexScanOperation(storeIndex, storeTable); 1220 return result; 1221 } catch (ClusterJException ex) { 1222 throw new ClusterJException( 1223 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex); 1224 } 1225 } 1226 1227 /** Create an index scan operation for an index and table to be used for a multi-range scan. 1228 * 1229 * @param storeIndex the index 1230 * @param storeTable the table 1231 * @return the index scan operation 1232 */ getIndexScanOperationMultiRange(Index storeIndex, Table storeTable)1233 public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) { 1234 assertActive(); 1235 try { 1236 IndexScanOperation result = clusterTransaction.getIndexScanOperationMultiRange(storeIndex, storeTable); 1237 return result; 1238 } catch (ClusterJException ex) { 1239 throw new ClusterJException( 1240 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex); 1241 } 1242 } 1243 1244 /** Create an index scan delete operation for an index and table. 1245 * 1246 * @param storeIndex the index 1247 * @param storeTable the table 1248 * @return the index scan operation 1249 */ getIndexScanDeleteOperation(Index storeIndex, Table storeTable)1250 public IndexScanOperation getIndexScanDeleteOperation(Index storeIndex, Table storeTable) { 1251 assertActive(); 1252 try { 1253 IndexScanOperation result = clusterTransaction.getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(storeIndex, storeTable); 1254 return result; 1255 } catch (ClusterJException ex) { 1256 throw new ClusterJException( 1257 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex); 1258 } 1259 } 1260 1261 /** Create a table scan operation for a table. 1262 * 1263 * @param storeTable the table 1264 * @return the table scan operation 1265 */ getTableScanOperation(Table storeTable)1266 public ScanOperation getTableScanOperation(Table storeTable) { 1267 assertActive(); 1268 try { 1269 ScanOperation result = clusterTransaction.getTableScanOperation(storeTable); 1270 return result; 1271 } catch (ClusterJException ex) { 1272 throw new ClusterJException( 1273 local.message("ERR_Table_Scan", storeTable.getName()), ex); 1274 } 1275 } 1276 1277 /** Create a table scan delete operation for a table. 1278 * 1279 * @param storeTable the table 1280 * @return the table scan operation 1281 */ getTableScanDeleteOperation(Table storeTable)1282 public ScanOperation getTableScanDeleteOperation(Table storeTable) { 1283 assertActive(); 1284 try { 1285 ScanOperation result = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable); 1286 return result; 1287 } catch (ClusterJException ex) { 1288 throw new ClusterJException( 1289 local.message("ERR_Table_Scan", storeTable.getName()), ex); 1290 } 1291 } 1292 1293 /** Create a unique index operation for an index and table. 1294 * 1295 * @param storeIndex the index 1296 * @param storeTable the table 1297 * @return the index operation 1298 */ getUniqueIndexOperation(Index storeIndex, Table storeTable)1299 public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) { 1300 assertActive(); 1301 try { 1302 IndexOperation result = clusterTransaction.getUniqueIndexOperation(storeIndex, storeTable); 1303 return result; 1304 } catch (ClusterJException ex) { 1305 throw new ClusterJException( 1306 local.message("ERR_Unique_Index", storeTable.getName(), storeIndex.getName()), ex); 1307 } 1308 } 1309 1310 /** Create a unique index update operation for an index and table. 1311 * @param storeIndex the index 1312 * @param storeTable the table 1313 * @return the index operation 1314 */ getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable)1315 public IndexOperation getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable) { 1316 assertActive(); 1317 try { 1318 IndexOperation result = clusterTransaction.getUniqueIndexUpdateOperation(storeIndex, storeTable); 1319 return result; 1320 } catch (ClusterJException ex) { 1321 throw new ClusterJException( 1322 local.message("ERR_Unique_Index_Update", storeTable.getName(), storeIndex.getName()), ex); 1323 } 1324 } 1325 1326 /** Create a select operation for a table. 1327 * 1328 * @param storeTable the table 1329 * @return the operation 1330 */ getSelectOperation(Table storeTable)1331 public Operation getSelectOperation(Table storeTable) { 1332 assertActive(); 1333 try { 1334 Operation result = clusterTransaction.getSelectOperation(storeTable); 1335 return result; 1336 } catch (ClusterJException ex) { 1337 throw new ClusterJException( 1338 local.message("ERR_Select", storeTable), ex); 1339 } 1340 } 1341 1342 /** Create a delete operation for a table. 1343 * 1344 * @param storeTable the table 1345 * @return the operation 1346 */ getDeleteOperation(Table storeTable)1347 public Operation getDeleteOperation(Table storeTable) { 1348 assertActive(); 1349 try { 1350 Operation result = clusterTransaction.getDeleteOperation(storeTable); 1351 return result; 1352 } catch (ClusterJException ex) { 1353 throw new ClusterJException( 1354 local.message("ERR_Delete", storeTable), ex); 1355 } 1356 } 1357 1358 /** Create an update operation for a table. 1359 * 1360 * @param storeTable the table 1361 * @return the operation 1362 */ getUpdateOperation(Table storeTable)1363 public Operation getUpdateOperation(Table storeTable) { 1364 assertActive(); 1365 try { 1366 Operation result = clusterTransaction.getUpdateOperation(storeTable); 1367 return result; 1368 } catch (ClusterJException ex) { 1369 throw new ClusterJException( 1370 local.message("ERR_Update", storeTable), ex); 1371 } 1372 } 1373 1374 /** Create an index delete operation for an index and table. 1375 * 1376 * @param storeIndex the index 1377 * @param storeTable the table 1378 * @return the index operation 1379 */ getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable)1380 public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) { 1381 assertActive(); 1382 try { 1383 IndexOperation result = clusterTransaction.getUniqueIndexDeleteOperation(storeIndex, storeTable); 1384 return result; 1385 } catch (ClusterJException ex) { 1386 throw new ClusterJException( 1387 local.message("ERR_Unique_Index_Delete", storeTable.getName(), storeIndex.getName()), ex); 1388 } 1389 } 1390 flush(boolean commit)1391 public void flush(boolean commit) { 1392 if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size()); 1393 if (!changeList.isEmpty()) { 1394 for (StateManager sm: changeList) { 1395 sm.flush(this); 1396 } 1397 changeList.clear(); 1398 } 1399 // now flush changes to the back end 1400 if (clusterTransaction != null) { 1401 if (commit) { 1402 internalCommit(); 1403 } else { 1404 executeNoCommit(); 1405 handleTransactionException(); 1406 } 1407 } 1408 } 1409 flush()1410 public void flush() { 1411 flush(false); 1412 } 1413 getChangeList()1414 public List getChangeList() { 1415 return Collections.unmodifiableList(changeList); 1416 } 1417 persist(Object instance)1418 public void persist(Object instance) { 1419 makePersistent(instance); 1420 } 1421 remove(Object instance)1422 public void remove(Object instance) { 1423 deletePersistent(instance); 1424 } 1425 markModified(StateManager instance)1426 public void markModified(StateManager instance) { 1427 changeList.add(instance); 1428 } 1429 setPartitionKey(Class<?> domainClass, Object key)1430 public void setPartitionKey(Class<?> domainClass, Object key) { 1431 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(domainClass); 1432 String tableName = domainTypeHandler.getTableName(); 1433 // if transaction is enlisted, throw a user exception 1434 if (isEnlisted()) { 1435 throw new ClusterJUserException( 1436 local.message("ERR_Set_Partition_Key_After_Enlistment", tableName)); 1437 } 1438 // if a partition key has already been set, throw a user exception 1439 if (this.partitionKey != null) { 1440 throw new ClusterJUserException( 1441 local.message("ERR_Set_Partition_Key_Twice", tableName)); 1442 } 1443 ValueHandler handler = domainTypeHandler.createKeyValueHandler(key, db); 1444 this.partitionKey= domainTypeHandler.createPartitionKey(handler); 1445 // if a transaction has already begun, tell the cluster transaction about the key 1446 if (clusterTransaction != null) { 1447 clusterTransaction.setPartitionKey(partitionKey); 1448 } 1449 } 1450 1451 /** Mark the field in the instance as modified so it is flushed. 1452 * 1453 * @param instance the persistent instance 1454 * @param fieldName the field to mark as modified 1455 */ markModified(Object instance, String fieldName)1456 public void markModified(Object instance, String fieldName) { 1457 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(instance); 1458 ValueHandler handler = domainTypeHandler.getValueHandler(instance); 1459 domainTypeHandler.objectMarkModified(handler, fieldName); 1460 } 1461 1462 /** Execute any pending operations (insert, delete, update, load) 1463 * and then perform post-execute operations (for load) via 1464 * clusterTransaction.postExecuteCallback(). 1465 * @param abort abort this transaction on error 1466 * @param force force the operation to be sent immediately 1467 */ executeNoCommit(boolean abort, boolean force)1468 public void executeNoCommit(boolean abort, boolean force) { 1469 if (clusterTransaction != null) { 1470 try { 1471 clusterTransaction.executeNoCommit(abort, force); 1472 } catch (ClusterJException cjex) { 1473 transactionException = cjex; 1474 } 1475 } 1476 } 1477 1478 /** Execute any pending operations (insert, delete, update, load) 1479 * and then perform post-execute operations (for load) via 1480 * clusterTransaction.postExecuteCallback(). 1481 * Do not abort the transaction on error. Force the operation to be sent immediately. 1482 */ executeNoCommit()1483 public void executeNoCommit() { 1484 executeNoCommit(false, true); 1485 } 1486 createQueryDomainType(DomainTypeHandler<T> domainTypeHandler)1487 public <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> domainTypeHandler) { 1488 QueryBuilderImpl builder = (QueryBuilderImpl)getQueryBuilder(); 1489 return builder.createQueryDefinition(domainTypeHandler); 1490 } 1491 1492 /** Return the coordinatedTransactionId of the current transaction. 1493 * The transaction might not have been enlisted. 1494 * @return the coordinatedTransactionId 1495 */ getCoordinatedTransactionId()1496 public String getCoordinatedTransactionId() { 1497 return clusterTransaction.getCoordinatedTransactionId(); 1498 } 1499 1500 /** Set the coordinatedTransactionId for the next transaction. This 1501 * will take effect as soon as the transaction is enlisted. 1502 * @param coordinatedTransactionId the coordinatedTransactionId 1503 */ setCoordinatedTransactionId(String coordinatedTransactionId)1504 public void setCoordinatedTransactionId(String coordinatedTransactionId) { 1505 clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId); 1506 } 1507 1508 /** Set the lock mode for subsequent operations. The lock mode takes effect immediately 1509 * and continues until set again. 1510 * @param lockmode the lock mode 1511 */ setLockMode(LockMode lockmode)1512 public void setLockMode(LockMode lockmode) { 1513 this.lockmode = lockmode; 1514 if (clusterTransaction != null) { 1515 clusterTransaction.setLockMode(lockmode); 1516 } 1517 } 1518 1519 /** Unload the schema associated with the domain class. This allows schema changes to work. 1520 * @param cls the class for which to unload the schema 1521 */ unloadSchema(Class<?> cls)1522 public String unloadSchema(Class<?> cls) { 1523 return factory.unloadSchema(cls, dictionary); 1524 } 1525 1526 /** Release resources associated with an instance. The instance must be a domain object obtained via 1527 * session.newInstance(T.class), find(T.class), or query; or Iterable<T>, or array T[]. 1528 * Resources released can include direct buffers used to hold instance data. 1529 * Released resourced may be returned to a pool. 1530 * @throws ClusterJUserException if the instance is not a domain object T, Iterable<T>, or array T[], 1531 * or if the object is used after calling this method. 1532 */ release(T param)1533 public <T> T release(T param) { 1534 // is the parameter an Iterable? 1535 if (Iterable.class.isAssignableFrom(param.getClass())) { 1536 Iterable<?> instances = (Iterable<?>)param; 1537 for (Object instance:instances) { 1538 release(instance); 1539 } 1540 } else 1541 // is the parameter an array? 1542 if (param.getClass().isArray()) { 1543 Object[] instances = (Object[])param; 1544 for (Object instance:instances) { 1545 release(instance); 1546 } 1547 } else 1548 // is the parameter a Dynamic Object? 1549 if (DynamicObject.class.isAssignableFrom(param.getClass())) { 1550 DynamicObject dynamicObject = (DynamicObject)param; 1551 DynamicObjectDelegate delegate = dynamicObject.delegate(); 1552 if (delegate != null) { 1553 delegate.release(); 1554 } 1555 // it must be a Proxy with a clusterj InvocationHandler 1556 } else { 1557 try { 1558 InvocationHandler handler = Proxy.getInvocationHandler(param); 1559 if (!ValueHandler.class.isAssignableFrom(handler.getClass())) { 1560 throw new ClusterJUserException(local.message("ERR_Release_Parameter")); 1561 } 1562 ValueHandler valueHandler = (ValueHandler)handler; 1563 valueHandler.release(); 1564 } catch (Throwable t) { 1565 throw new ClusterJUserException(local.message("ERR_Release_Parameter"), t); 1566 } 1567 } 1568 return param; 1569 } 1570 1571 } 1572