1 /*
2    Copyright (c) 2009, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 package com.mysql.clusterj.core;
26 
27 import com.mysql.clusterj.ClusterJException;
28 import com.mysql.clusterj.ClusterJFatalInternalException;
29 import com.mysql.clusterj.ClusterJUserException;
30 import com.mysql.clusterj.DynamicObject;
31 import com.mysql.clusterj.DynamicObjectDelegate;
32 import com.mysql.clusterj.LockMode;
33 import com.mysql.clusterj.Query;
34 import com.mysql.clusterj.Transaction;
35 
36 import com.mysql.clusterj.core.spi.DomainTypeHandler;
37 import com.mysql.clusterj.core.spi.SmartValueHandler;
38 import com.mysql.clusterj.core.spi.ValueHandler;
39 
40 import com.mysql.clusterj.core.query.QueryDomainTypeImpl;
41 import com.mysql.clusterj.core.query.QueryBuilderImpl;
42 import com.mysql.clusterj.core.query.QueryImpl;
43 
44 import com.mysql.clusterj.core.spi.SessionSPI;
45 
46 import com.mysql.clusterj.core.store.ClusterTransaction;
47 import com.mysql.clusterj.core.store.Db;
48 import com.mysql.clusterj.core.store.Dictionary;
49 import com.mysql.clusterj.core.store.Index;
50 import com.mysql.clusterj.core.store.IndexOperation;
51 import com.mysql.clusterj.core.store.IndexScanOperation;
52 import com.mysql.clusterj.core.store.Operation;
53 import com.mysql.clusterj.core.store.PartitionKey;
54 import com.mysql.clusterj.core.store.ResultData;
55 import com.mysql.clusterj.core.store.ScanOperation;
56 import com.mysql.clusterj.core.store.Table;
57 
58 import com.mysql.clusterj.core.util.I18NHelper;
59 import com.mysql.clusterj.core.util.Logger;
60 import com.mysql.clusterj.core.util.LoggerFactoryService;
61 
62 import com.mysql.clusterj.query.QueryBuilder;
63 import com.mysql.clusterj.query.QueryDefinition;
64 import com.mysql.clusterj.query.QueryDomainType;
65 
66 import java.lang.reflect.Proxy;
67 import java.lang.reflect.InvocationHandler;
68 
69 import java.util.ArrayList;
70 import java.util.BitSet;
71 import java.util.Collections;
72 import java.util.Iterator;
73 import java.util.List;
74 import java.util.Map;
75 
76 /**
77  * This class implements Session, the main user interface to ClusterJ.
78  * It also implements SessionSPI, the main component interface.
79  */
80 public class SessionImpl implements SessionSPI, CacheManager, StoreManager {
81 
82     /** My message translator */
83     static final I18NHelper local = I18NHelper.getInstance(SessionImpl.class);
84 
85     /** My logger */
86     static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionImpl.class);
87 
88     /** My Factory. */
89     protected SessionFactoryImpl factory;
90 
91     /** Db: one per session. */
92     protected Db db;
93 
94     /** Dictionary: one per session. */
95     protected Dictionary dictionary;
96 
97     /** One transaction at a time. */
98     protected TransactionImpl transactionImpl;
99 
100     /** The partition key */
101     protected PartitionKey partitionKey = null;
102 
103     /** Rollback only status */
104     protected boolean rollbackOnly = false;
105 
106     /** The underlying ClusterTransaction */
107     protected ClusterTransaction clusterTransaction;
108 
109     /** The transaction id to join */
110     protected String joinTransactionId = null;
111 
112     /** The properties for this session */
113     protected Map properties;
114 
115     /** Flags for iterating a scan */
116     protected final int RESULT_READY = 0;
117     protected final int SCAN_FINISHED = 1;
118     protected final int CACHE_EMPTY = 2;
119 
120     /** The list of objects changed since the last flush */
121     protected List<StateManager> changeList = new ArrayList<StateManager>();
122 
    /** The list of pending operations, such as load operations, that need to be
     * processed after the operation is sent to the database via
     * {@link #executeNoCommit()}.
     */
126     protected List<Runnable> postExecuteOperations = new ArrayList<Runnable>();
127 
128     /** The transaction state of this session. */
129     protected TransactionState transactionState;
130 
131     /** The exception state of an internal transaction. */
132     private ClusterJException transactionException;
133 
134     /** Nested auto transaction counter. */
135     protected int nestedAutoTransactionCounter = 0;
136 
137     /** Number of retries for retriable exceptions */
138     // TODO get this from properties
139     protected int numberOfRetries = 5;
140 
141     /** The lock mode for read operations */
142     private LockMode lockmode = LockMode.READ_COMMITTED;
143 
144     /** Create a SessionImpl with factory, properties, Db, and dictionary
145      */
SessionImpl(SessionFactoryImpl factory, Map properties, Db db, Dictionary dictionary)146     SessionImpl(SessionFactoryImpl factory, Map properties, Db db, Dictionary dictionary) {
147         this.factory = factory;
148         this.db = db;
149         this.dictionary = dictionary;
150         this.properties = properties;
151         transactionImpl = new TransactionImpl(this);
152         transactionState = transactionStateNotActive;
153     }
154 
155     /** Create a query from a query definition.
156      *
157      * @param qd the query definition
158      * @return the query
159      */
createQuery(QueryDefinition<T> qd)160     public <T> Query<T> createQuery(QueryDefinition<T> qd) {
161         if (!(qd instanceof QueryDomainTypeImpl)) {
162             throw new ClusterJUserException(
163                     local.message("ERR_Exception_On_Method", "createQuery"));
164         }
165         return new QueryImpl<T>(this, (QueryDomainTypeImpl<T>)qd);
166     }
167 
168     /** Find an instance by its class and primary key.
169      * If there is a compound primary key, the key is an Object[] containing
170      * all of the primary key fields in order of declaration in annotations.
171      *
172      * @param cls the class
173      * @param key the primary key
174      * @return the instance
175      */
find(Class<T> cls, Object key)176     public <T> T find(Class<T> cls, Object key) {
177         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
178         ValueHandler keyHandler = domainTypeHandler.createKeyValueHandler(key, db);
179         // initialize from the database using the key
180         return initializeFromDatabase(domainTypeHandler, null, null, keyHandler);
181     }
182 
183     /** Initialize fields from the database. The keyHandler must
184      * contain the primary keys, none of which can be null.
185      * The instanceHandler, which may be null, manages the values
186      * of the instance. If it is null, both the instanceHandler and the
187      * instance are created if the instance exists in the database.
188      * The instance, which may be null, is the domain instance that is
189      * returned after loading the values from the database.
190      *
191      * @param domainTypeHandler domain type handler for the class
192      * @param keyHandler the primary key handler
193      * @param instanceHandler the handler for the instance
194      * (may be null if not yet initialized)
195      * @param instance the instance (may be null)
196      * @return the instance with fields initialized from the database
197      */
initializeFromDatabase(final DomainTypeHandler<T> domainTypeHandler, T instance, ValueHandler instanceHandler, ValueHandler keyHandler)198     public <T> T initializeFromDatabase(final DomainTypeHandler<T> domainTypeHandler,
199             T instance,
200             ValueHandler instanceHandler, ValueHandler keyHandler) {
201         startAutoTransaction();
202         if (keyHandler instanceof SmartValueHandler) {
203             try {
204                 final SmartValueHandler smartValueHandler = (SmartValueHandler)keyHandler;
205                 setPartitionKey(domainTypeHandler, smartValueHandler);
206                 // load the values from the database into the smart value handler
207                 @SuppressWarnings("unused")
208                 Operation operation = smartValueHandler.load(clusterTransaction);
209                 endAutoTransaction();
210                 if (isActive()) {
211                     // if this is a continuing transaction, flush the operation to get the result
212                     clusterTransaction.executeNoCommit(false, true);
213                 }
214                 if (smartValueHandler.found()) {
215                     // create a new proxy (or dynamic instance) with the smart value handler
216                     return domainTypeHandler.newInstance(smartValueHandler);
217                 } else {
218                     // not found
219                     return null;
220                 }
221             } catch (ClusterJException ex) {
222             failAutoTransaction();
223             throw ex;
224             }
225         }
226         try {
227             ResultData rs = selectUnique(domainTypeHandler, keyHandler, null);
228             if (rs.next()) {
229                 // we have a result; initialize the instance
230                 if (instanceHandler == null) {
231                     if (logger.isDetailEnabled()) logger.detail("Creating instanceHandler for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
232                     // we need both a new instance and its handler
233                     instance = domainTypeHandler.newInstance(db);
234                     instanceHandler = domainTypeHandler.getValueHandler(instance);
235                 } else if (instance == null) {
236                 if (logger.isDetailEnabled()) logger.detail("Creating instance for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
237                     // we have a handler but no instance
238                     instance = domainTypeHandler.getInstance(instanceHandler);
239                 }
240                 // found the instance in the datastore
241                 instanceHandler.found(Boolean.TRUE);
242                 // put the results into the instance
243                 domainTypeHandler.objectSetValues(rs, instanceHandler);
244                 // set the cache manager to track updates
245                 domainTypeHandler.objectSetCacheManager(this, instanceHandler);
246                 // reset modified bits in instance
247                 domainTypeHandler.objectResetModified(instanceHandler);
248             } else {
249                 if (logger.isDetailEnabled()) logger.detail("No instance found in database for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
250                 // no instance found in database
251                 if (instanceHandler != null) {
252                     // mark the handler as not found
253                     instanceHandler.found(Boolean.FALSE);
254                 }
255                 endAutoTransaction();
256                 return null;
257             }
258         } catch (ClusterJException ex) {
259             failAutoTransaction();
260             throw ex;
261         }
262         endAutoTransaction();
263         return instance;
264     }
265 
266     /** If a transaction is already enlisted, ignore. Otherwise, set
267      * the partition key based on the key handler.
268      * @param domainTypeHandler the domain type handler
269      * @param keyHandler the value handler that holds the key values
270      */
setPartitionKey(DomainTypeHandler<?> domainTypeHandler, ValueHandler keyHandler)271     private void setPartitionKey(DomainTypeHandler<?> domainTypeHandler,
272             ValueHandler keyHandler) {
273         if (!isEnlisted()) {
274             // there is still time to set the partition key
275             PartitionKey partitionKey =
276                 domainTypeHandler.createPartitionKey(keyHandler);
277             clusterTransaction.setPartitionKey(partitionKey);
278         }
279     }
280 
281     /** Create an instance of a class to be persisted.
282      *
283      * @param cls the class
284      * @return a new instance that can be used with makePersistent
285      */
newInstance(Class<T> cls)286     public <T> T newInstance(Class<T> cls) {
287         return factory.newInstance(cls, dictionary, db);
288     }
289 
    /** Create an instance of a class to be persisted and set the primary key.
     *
     * @param cls the class
     * @param key the primary key value(s) to set on the new instance
     * @return a new instance that can be used with makePersistent,
     * savePersistent, writePersistent, updatePersistent, or deletePersistent
     */
newInstance(Class<T> cls, Object key)296     public <T> T newInstance(Class<T> cls, Object key) {
297         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
298         T instance = factory.newInstance(cls, dictionary, db);
299         domainTypeHandler.objectSetKeys(key, instance);
300         return instance;
301     }
302 
303     /** Create an instance from a result data row.
304      * @param resultData the result of a query
305      * @param domainTypeHandler the domain type handler
306      * @return the instance
307      */
newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler)308     public <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler) {
309         T result = domainTypeHandler.newInstance(resultData, db);
310         return result;
311     }
312 
313     /** Load the instance from the database into memory. Loading
314      * is asynchronous and will be executed when an operation requiring
315      * database access is executed: find, flush, or query. The instance must
316      * have been returned from find or query; or
317      * created via session.newInstance and its primary key initialized.
318      * @param object the instance to load
319      * @return the instance
320      * @see #found(Object)
321      */
load(final T object)322     public <T> T load(final T object) {
323         if (object == null) {
324             return null;
325         }
326         if (Iterable.class.isAssignableFrom(object.getClass())) {
327             Iterable<?> instances = (Iterable<?>)object;
328             for (Object instance:instances) {
329                 load(instance);
330             }
331             return object;
332         }
333         if (object.getClass().isArray()) {
334             Object[] instances = (Object[])object;
335             for (Object instance:instances) {
336                 load(instance);
337             }
338             return object;
339         }
340         // a transaction must already be active (autocommit is not supported)
341         assertActive();
342         final DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object);
343         final ValueHandler instanceHandler = domainTypeHandler.getValueHandler(object);
344         setPartitionKey(domainTypeHandler, instanceHandler);
345         if (instanceHandler instanceof SmartValueHandler) {
346             @SuppressWarnings("unused")
347             Operation operation = ((SmartValueHandler)instanceHandler).load(clusterTransaction);
348             return object;
349         }
350         Table storeTable = domainTypeHandler.getStoreTable();
351         // perform a primary key operation
352         final Operation op = clusterTransaction.getSelectOperation(storeTable);
353         op.beginDefinition();
354         // set the keys into the operation
355         domainTypeHandler.operationSetKeys(instanceHandler, op);
356         // set the expected columns into the operation
357         domainTypeHandler.operationGetValues(op);
358         op.endDefinition();
359         final ResultData rs = op.resultData(false);
360         final SessionImpl cacheManager = this;
361         // defer execution of the key operation until the next find, flush, or query
362         Runnable postExecuteOperation = new Runnable() {
363             public void run() {
364                 if (rs.next()) {
365                     // found row in database
366                     instanceHandler.found(Boolean.TRUE);
367                    // put the results into the instance
368                     domainTypeHandler.objectSetValues(rs, instanceHandler);
369                     // set the cache manager to track updates
370                     domainTypeHandler.objectSetCacheManager(cacheManager, instanceHandler);
371                     // reset modified bits in instance
372                     domainTypeHandler.objectResetModified(instanceHandler);
373                 } else {
374                     // mark instance as not found
375                     instanceHandler.found(Boolean.FALSE);
376                 }
377 
378             }
379         };
380         clusterTransaction.postExecuteCallback(postExecuteOperation);
381         return object;
382     }
383 
384     /** Was this instance found in the database?
385      * @param instance the instance
386      * @return <ul><li>null if the instance is null or was created via newInstance and never loaded;
387      * </li><li>true if the instance was returned from a find or query
388      * or created via newInstance and successfully loaded;
389      * </li><li>false if the instance was created via newInstance and not found.
390      * </li></ul>
391      */
found(Object instance)392     public Boolean found(Object instance) {
393         if (instance == null) {
394             return null;
395         }
396         if (instance instanceof DynamicObject) {
397             return ((DynamicObject)instance).found();
398         }
399         // make sure the instance is a persistent type
400         getDomainTypeHandler(instance);
401         return true;
402     }
403 
404     /** Make an instance persistent. Also recursively make an iterable collection or array persistent.
405      *
406      * @param object the instance or array or iterable collection of instances
407      * @return the instance
408      */
makePersistent(T object)409     public <T> T makePersistent(T object) {
410         if (object == null) {
411             return null;
412         }
413         if (Iterable.class.isAssignableFrom(object.getClass())) {
414             startAutoTransaction();
415             Iterable<?> instances = (Iterable<?>)object;
416             for (Object instance:instances) {
417                 makePersistent(instance);
418             }
419             endAutoTransaction();
420             return object;
421         }
422         if (object.getClass().isArray()) {
423             startAutoTransaction();
424             Object[] instances = (Object[])object;
425             for (Object instance:instances) {
426                 makePersistent(instance);
427             }
428             endAutoTransaction();
429             return object;
430         }
431         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(object);
432         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
433         insert(domainTypeHandler, valueHandler);
434         return object;
435     }
436 
insert( DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler)437     public Operation insert( DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) {
438         startAutoTransaction();
439         setPartitionKey(domainTypeHandler, valueHandler);
440         if (valueHandler instanceof SmartValueHandler) {
441             try {
442             SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler;
443             Operation result = smartValueHandler.insert(clusterTransaction);
444             valueHandler.resetModified();
445             endAutoTransaction();
446             return result;
447             } catch (ClusterJException cjex) {
448                 failAutoTransaction();
449                 throw cjex;
450             }
451         }
452         Operation op = null;
453         Table storeTable = null;
454         try {
455             storeTable = domainTypeHandler.getStoreTable();
456             op = clusterTransaction.getInsertOperation(storeTable);
457             // set all values in the operation, keys first
458             op.beginDefinition();
459             domainTypeHandler.operationSetKeys(valueHandler, op);
460             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
461             op.endDefinition();
462             // reset modified bits in instance
463             domainTypeHandler.objectResetModified(valueHandler);
464         } catch (ClusterJUserException cjuex) {
465             failAutoTransaction();
466             throw cjuex;
467         } catch (ClusterJException cjex) {
468             failAutoTransaction();
469             logger.error(local.message("ERR_Insert", storeTable.getName()));
470             throw new ClusterJException(
471                     local.message("ERR_Insert", storeTable.getName()), cjex);
472         } catch (RuntimeException rtex) {
473             failAutoTransaction();
474             logger.error(local.message("ERR_Insert", storeTable.getName()));
475             throw new ClusterJException(
476                     local.message("ERR_Insert", storeTable.getName()), rtex);
477         }
478         endAutoTransaction();
479         return op;
480     }
481 
    /** Make a number of instances persistent.
     *
     * @param instances a Collection of objects to persist
     * @return a new List holding the result of makePersistent for each
     * element, in the same order of iteration
     */
makePersistentAll(Iterable instances)487     public Iterable makePersistentAll(Iterable instances) {
488         startAutoTransaction();
489         List<Object> result = new ArrayList<Object>();
490         for (Object instance:instances) {
491             result.add(makePersistent(instance));
492             }
493         endAutoTransaction();
494         return result;
495     }
496 
497     /** Delete an instance of a class from the database given its primary key.
498      * For single-column keys, the key parameter is a wrapper (e.g. Integer).
499      * For multi-column keys, the key parameter is an Object[] in which
500      * elements correspond to the primary keys in order as defined in the schema.
501      * @param cls the class
502      * @param key the primary key
503      */
deletePersistent(Class<T> cls, Object key)504     public <T> void deletePersistent(Class<T> cls, Object key) {
505         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
506         ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(key, db);
507         delete(domainTypeHandler, keyValueHandler);
508     }
509 
510     /** Remove an instance from the database. Only the key field(s)
511      * are used to identify the instance.
512      *
513      * @param object the instance to remove from the database
514      */
deletePersistent(Object object)515     public void deletePersistent(Object object) {
516         if (object == null) {
517             return;
518         }
519         DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
520         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
521         delete(domainTypeHandler, valueHandler);
522     }
523 
delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler)524     public Operation delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
525         startAutoTransaction();
526         Table storeTable = domainTypeHandler.getStoreTable();
527         setPartitionKey(domainTypeHandler, valueHandler);
528         if (valueHandler instanceof SmartValueHandler) {
529             try {
530             SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler;
531             Operation result = smartValueHandler.delete(clusterTransaction);
532             endAutoTransaction();
533             return result;
534             } catch (ClusterJException cjex) {
535                 failAutoTransaction();
536                 throw cjex;
537             }
538         }
539         Operation op = null;
540         try {
541             op = clusterTransaction.getDeleteOperation(storeTable);
542             op.beginDefinition();
543             domainTypeHandler.operationSetKeys(valueHandler, op);
544             op.endDefinition();
545         } catch (ClusterJException ex) {
546             failAutoTransaction();
547             throw new ClusterJException(
548                     local.message("ERR_Delete", storeTable.getName()), ex);
549         }
550         endAutoTransaction();
551         return op;
552     }
553 
554     /** Delete the instances corresponding to the parameters.
555      * @param objects the objects to delete
556      */
deletePersistentAll(Iterable objects)557     public void deletePersistentAll(Iterable objects) {
558         startAutoTransaction();
559         for (Iterator it = objects.iterator(); it.hasNext();) {
560             deletePersistent(it.next());
561         }
562         endAutoTransaction();
563     }
564 
    /** Delete all instances of the parameter class.
     * @param cls the class of instances to delete
     * @return the number of instances deleted
     */
deletePersistentAll(Class<T> cls)568     public <T> int deletePersistentAll(Class<T> cls) {
569         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
570         return deletePersistentAll(domainTypeHandler);
571     }
572 
573     /** Delete all instances of the parameter domainTypeHandler.
574      * @param domainTypeHandler the domainTypeHandler of instances to delete
575      * @return the number of instances deleted
576      */
deletePersistentAll(DomainTypeHandler<?> domainTypeHandler)577     public int deletePersistentAll(DomainTypeHandler<?> domainTypeHandler) {
578         startAutoTransaction();
579         Table storeTable = domainTypeHandler.getStoreTable();
580         String tableName = storeTable.getName();
581         ScanOperation op = null;
582         int count = 0;
583         try {
584             op = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
585             count = deletePersistentAll(op, true);
586         } catch (ClusterJException ex) {
587             failAutoTransaction();
588             // TODO add table name to the error message
589             throw new ClusterJException(
590                     local.message("ERR_Delete_All", tableName), ex);
591         }
592         endAutoTransaction();
593         return count;
594     }
595 
596     /** Delete all instances retrieved by the operation. The operation must have exclusive
597      * access to the instances and have the ScanFlag.KEY_INFO flag set.
598      * @param op the scan operation
599      * @return the number of instances deleted
600      */
deletePersistentAll(ScanOperation op, boolean abort)601     public int deletePersistentAll(ScanOperation op, boolean abort) {
602         int cacheCount = 0;
603         int count = 0;
604         boolean done = false;
605         boolean fetch = true;
606         // cannot use early autocommit optimization here
607         clusterTransaction.setAutocommit(false);
608         // execute the operation
609         clusterTransaction.executeNoCommit(true, true);
610         while (!done ) {
611             int result = op.nextResult(fetch);
612             switch (result) {
613                 case RESULT_READY:
614                     op.deleteCurrentTuple();
615                     ++count;
616                     ++cacheCount;
617                     fetch = false;
618                     break;
619                 case SCAN_FINISHED:
620                     done = true;
621                     if (cacheCount != 0) {
622                         clusterTransaction.executeNoCommit(abort, true);
623                     }
624                     op.close();
625                     break;
626                 case CACHE_EMPTY:
627                     clusterTransaction.executeNoCommit(abort, true);
628                     cacheCount = 0;
629                     fetch = true;
630                     break;
631                 default:
632                     throw new ClusterJException(
633                             local.message("ERR_Next_Result_Illegal", result));
634             }
635         }
636         return count;
637     }
638 
639     /** Select a single row from the database. Only the fields requested
640      * will be selected. A transaction must be active (either via begin
641      * or startAutoTransaction).
642      *
643      * @param domainTypeHandler the domainTypeHandler to be selected
644      * @param keyHandler the key supplier for the select
645      * @param fields the fields to select; null to select all fields
646      * @return the ResultData from the database
647      */
selectUnique(DomainTypeHandler<?> domainTypeHandler, ValueHandler keyHandler, BitSet fields)648     public ResultData selectUnique(DomainTypeHandler<?> domainTypeHandler,
649             ValueHandler keyHandler, BitSet fields) {
650         assertActive();
651         setPartitionKey(domainTypeHandler, keyHandler);
652         Table storeTable = domainTypeHandler.getStoreTable();
653         // perform a single select by key operation
654         Operation op = clusterTransaction.getSelectOperation(storeTable);
655         op.beginDefinition();
656         // set the keys into the operation
657         domainTypeHandler.operationSetKeys(keyHandler, op);
658         // set the expected columns into the operation
659         domainTypeHandler.operationGetValues(op);
660         op.endDefinition();
661         // execute the select and get results
662         ResultData rs = op.resultData();
663         return rs;
664     }
665 
666     /** Update an instance in the database. The key field(s)
667      * are used to identify the instance; modified fields change the
668      * values in the database.
669      *
670      * @param object the instance to update in the database
671      */
updatePersistent(Object object)672     public void updatePersistent(Object object) {
673         if (object == null) {
674             return;
675         }
676         DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object);
677         if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + object);
678         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
679         update(domainTypeHandler, valueHandler);
680     }
681 
update(DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler)682     public Operation update(DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) {
683         startAutoTransaction();
684         setPartitionKey(domainTypeHandler, valueHandler);
685         if (valueHandler instanceof SmartValueHandler) {
686             try {
687             SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler;
688             Operation result = smartValueHandler.update(clusterTransaction);
689             endAutoTransaction();
690             return result;
691             } catch (ClusterJException cjex) {
692                 failAutoTransaction();
693                 throw cjex;
694             }
695         }
696         Table storeTable = null;
697         Operation op = null;
698         try {
699             storeTable = domainTypeHandler.getStoreTable();
700             op = clusterTransaction.getUpdateOperation(storeTable);
701             domainTypeHandler.operationSetKeys(valueHandler, op);
702             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
703             if (logger.isDetailEnabled()) logger.detail("Updated object " +
704                     valueHandler);
705         } catch (ClusterJException ex) {
706             failAutoTransaction();
707             throw new ClusterJException(
708                     local.message("ERR_Update", storeTable.getName()) ,ex);
709         }
710         endAutoTransaction();
711         return op;
712     }
713 
714     /** Update the instances corresponding to the parameters.
715      * @param objects the objects to update
716      */
updatePersistentAll(Iterable objects)717     public void updatePersistentAll(Iterable objects) {
718         startAutoTransaction();
719         for (Iterator it = objects.iterator(); it.hasNext();) {
720             updatePersistent(it.next());
721         }
722         endAutoTransaction();
723     }
724 
725     /** Save the instance even if it does not exist.
726      * @param instance the instance to save
727      */
savePersistent(T instance)728     public <T> T savePersistent(T instance) {
729         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(instance);
730         if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + instance);
731         ValueHandler valueHandler = domainTypeHandler.getValueHandler(instance);
732         startAutoTransaction();
733         setPartitionKey(domainTypeHandler, valueHandler);
734         if (valueHandler instanceof SmartValueHandler) {
735             try {
736             SmartValueHandler smartValueHandler = (SmartValueHandler)valueHandler;
737             smartValueHandler.write(clusterTransaction);
738             valueHandler.resetModified();
739             endAutoTransaction();
740             return instance;
741             } catch (ClusterJException cjex) {
742                 failAutoTransaction();
743                 throw cjex;
744             }
745         }
746         Table storeTable = null;
747         try {
748             storeTable = domainTypeHandler.getStoreTable();
749             Operation op = null;
750             op = clusterTransaction.getWriteOperation(storeTable);
751             domainTypeHandler.operationSetKeys(valueHandler, op);
752             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
753         } catch (ClusterJException ex) {
754             failAutoTransaction();
755             throw new ClusterJException(
756                     local.message("ERR_Write", storeTable.getName()) ,ex);
757         }
758         endAutoTransaction();
759         return instance;
760     }
761 
762     /** Save the instances even if they do not exist.
763      * @param instances
764      */
savePersistentAll(Iterable instances)765     public Iterable savePersistentAll(Iterable instances) {
766         List<Object> result = new ArrayList<Object>();
767         startAutoTransaction();
768         for (Iterator it = instances.iterator(); it.hasNext();) {
769             result.add(savePersistent(it.next()));
770         }
771         endAutoTransaction();
772         return result;
773     }
774 
775     /** Get the current transaction.
776      *
777      * @return the transaction
778      */
currentTransaction()779     public Transaction currentTransaction() {
780         return transactionImpl;
781     }
782 
783     /** Close this session and deallocate all resources.
784      *
785      */
close()786     public void close() {
787         if (clusterTransaction != null) {
788             clusterTransaction.close();
789             clusterTransaction = null;
790         }
791         if (db != null) {
792             db.close();
793             db = null;
794         }
795     }
796 
isClosed()797     public boolean isClosed() {
798         return db==null;
799     }
800 
801     /** Assert this session is not yet closed. */
assertNotClosed()802     protected void assertNotClosed() {
803         if (isClosed()) {
804             throw new ClusterJUserException(
805                     local.message("ERR_Session_Closed"));
806         }
807     }
808 
809     /** Begin the current transaction.
810      *
811      */
begin()812     public void begin() {
813         if (logger.isDebugEnabled()) logger.debug("begin transaction.");
814         transactionState = transactionState.begin();
815         handleTransactionException();
816     }
817 
818     /** Internally begin the transaction.
819      * Called by transactionState.begin().
820      */
internalBegin()821     protected void internalBegin() {
822         try {
823             clusterTransaction = db.startTransaction(joinTransactionId);
824             clusterTransaction.setLockMode(lockmode);
825             // if a transaction has already begun, tell the cluster transaction about the key
826             if (partitionKey != null) {
827                 clusterTransaction.setPartitionKey(partitionKey);
828             }
829         } catch (ClusterJException ex) {
830             throw new ClusterJException(
831                     local.message("ERR_Ndb_Start"), ex);
832         }
833     }
834 
835     /** Commit the current transaction.
836      *
837      */
commit()838     public void commit() {
839         if (logger.isDebugEnabled()) logger.debug("commit transaction.");
840         transactionState = transactionState.commit();
841         handleTransactionException();
842     }
843 
844     /** Internally commit the transaction.
845      * Called by transactionState.commit().
846      */
internalCommit()847     protected void internalCommit() {
848         if (rollbackOnly) {
849             try {
850                 internalRollback();
851                 throw new ClusterJException(
852                         local.message("ERR_Transaction_Rollback_Only"));
853             } catch (ClusterJException ex) {
854                 throw new ClusterJException(
855                         local.message("ERR_Transaction_Rollback_Only"), ex);
856             }
857         }
858         try {
859             clusterTransaction.executeCommit();
860         } finally {
861             // always close the transaction
862             clusterTransaction.close();
863             clusterTransaction = null;
864             partitionKey = null;
865         }
866     }
867 
868     /** Roll back the current transaction.
869      *
870      */
rollback()871     public void rollback() {
872         if (logger.isDebugEnabled()) logger.debug("roll back transaction.");
873         transactionState = transactionState.rollback();
874         handleTransactionException();
875     }
876 
877     /** Internally roll back the transaction.
878      * Called by transactionState.rollback() and
879      * transactionState.commit() if the transaction is marked for rollback.
880      *
881      */
internalRollback()882     protected void internalRollback() {
883         try {
884                 clusterTransaction.executeRollback();
885         } catch (ClusterJException ex) {
886             throw new ClusterJException(
887                     local.message("ERR_Transaction_Execute", "rollback"), ex);
888         } finally {
889             if (clusterTransaction != null) {
890                 clusterTransaction.close();
891             }
892             clusterTransaction = null;
893             partitionKey = null;
894         }
895     }
896 
897     /** Start a transaction if there is not already an active transaction.
898      * Throw a ClusterJException if there is any problem.
899      */
startAutoTransaction()900     public void startAutoTransaction() {
901         if (logger.isDebugEnabled()) logger.debug("start AutoTransaction");
902         transactionState = transactionState.start();
903         handleTransactionException();
904     }
905 
906     /** End an auto transaction if it was started.
907      * Throw a ClusterJException if there is any problem.
908      */
endAutoTransaction()909     public void endAutoTransaction() {
910         if (logger.isDebugEnabled()) logger.debug("end AutoTransaction");
911         transactionState = transactionState.end();
912         handleTransactionException();
913     }
914 
915     /** Fail an auto transaction if it was started.
916      * Throw a ClusterJException if there is any problem.
917      */
failAutoTransaction()918     public void failAutoTransaction() {
919         if (logger.isDebugEnabled()) logger.debug("fail AutoTransaction");
920         transactionState = transactionState.fail();
921     }
922 
handleTransactionException()923     protected void handleTransactionException() {
924         if (transactionException == null) {
925             return;
926         } else {
927             ClusterJException ex = transactionException;
928             transactionException = null;
929             throw ex;
930         }
931     }
932 
933     /** Mark the current transaction as rollback only.
934      *
935      */
setRollbackOnly()936     public void setRollbackOnly() {
937         rollbackOnly = true;
938     }
939 
940     /** Is the current transaction marked for rollback only?
941      * @return true if the current transaction is marked for rollback only
942      */
getRollbackOnly()943     public boolean getRollbackOnly() {
944         return rollbackOnly;
945     }
946 
    /** Manage the state of the transaction associated with this
     * session. Every transition method returns the state that is current
     * after the call; the session stores the returned value in
     * transactionState.
     */
    protected interface TransactionState {
        /** Is a transaction in progress in this state? */
        boolean isActive();

        // transitions driven by the session's begin(), commit() and rollback()
        TransactionState begin();
        TransactionState commit();
        TransactionState rollback();

        // transitions driven by startAutoTransaction(), endAutoTransaction()
        // and failAutoTransaction()
        TransactionState start();
        TransactionState end();
        TransactionState fail();
    }
961 
962     /** This represents the state of Transaction Not Active. */
963     protected TransactionState transactionStateNotActive =
964             new TransactionState() {
965 
966         public boolean isActive() {
967             return false;
968         }
969 
970         public TransactionState begin() {
971             try {
972                 internalBegin();
973                 return transactionStateActive;
974             } catch (ClusterJException ex) {
975                 transactionException = ex;
976                 return transactionStateNotActive;
977             }
978         }
979 
980         public TransactionState commit() {
981             transactionException = new ClusterJUserException(
982                     local.message("ERR_Transaction_Must_Be_Active_For_Method",
983                     "commit"));
984             return transactionStateNotActive;
985         }
986 
987         public TransactionState rollback() {
988             transactionException = new ClusterJUserException(
989                     local.message("ERR_Transaction_Must_Be_Active_For_Method",
990                     "rollback"));
991             return transactionStateNotActive;
992         }
993 
994         public TransactionState start() {
995             try {
996                 internalBegin();
997                 clusterTransaction.setAutocommit(true);
998                 nestedAutoTransactionCounter = 1;
999                 return transactionStateAutocommit;
1000             } catch (ClusterJException ex) {
1001                 transactionException = ex;
1002                 return transactionStateNotActive;
1003             }
1004         }
1005 
1006         public TransactionState end() {
1007             throw new ClusterJFatalInternalException(
1008                     local.message("ERR_Transaction_Auto_Start", "end"));
1009         }
1010 
1011         public TransactionState fail() {
1012             return transactionStateNotActive;
1013         }
1014 
1015     };
1016 
1017     /** This represents the state of Transaction Active. */
1018     protected TransactionState transactionStateActive =
1019             new TransactionState() {
1020 
1021         public boolean isActive() {
1022             return true;
1023         }
1024 
1025         public TransactionState begin() {
1026             transactionException = new ClusterJUserException(
1027                     local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
1028                     "begin"));
1029             return transactionStateActive;
1030         }
1031 
1032         public TransactionState commit() {
1033             try {
1034                 // flush unwritten changes
1035                 flush(true);
1036             } catch (ClusterJException ex) {
1037                 transactionException = ex;
1038             }
1039             return transactionStateNotActive;
1040         }
1041 
1042         public TransactionState rollback() {
1043             try {
1044                 internalRollback();
1045                 return transactionStateNotActive;
1046             } catch (ClusterJException ex) {
1047                 transactionException = ex;
1048                 return transactionStateNotActive;
1049             }
1050         }
1051 
1052         public TransactionState start() {
1053             // nothing to do
1054             return transactionStateActive;
1055         }
1056 
1057         public TransactionState end() {
1058             // nothing to do
1059             return transactionStateActive;
1060         }
1061 
1062         public TransactionState fail() {
1063             // nothing to do
1064             return transactionStateActive;
1065         }
1066 
1067     };
1068 
1069     protected TransactionState transactionStateAutocommit =
1070             new TransactionState() {
1071 
1072         public boolean isActive() {
1073             return true;
1074         }
1075 
1076         public TransactionState begin() {
1077             throw new ClusterJFatalInternalException(
1078                     local.message("ERR_Transaction_Auto_End", "begin"));
1079         }
1080 
1081         public TransactionState commit() {
1082             throw new ClusterJFatalInternalException(
1083                     local.message("ERR_Transaction_Auto_End", "commit"));
1084         }
1085 
1086         public TransactionState rollback() {
1087             throw new ClusterJFatalInternalException(
1088                     local.message("ERR_Transaction_Auto_End", "rollback"));
1089         }
1090 
1091         public TransactionState start() {
1092             // nested start; increment counter
1093             nestedAutoTransactionCounter++;
1094             return transactionStateAutocommit;
1095         }
1096 
1097         public TransactionState end() {
1098             if (--nestedAutoTransactionCounter > 0) {
1099                 return transactionStateAutocommit;
1100             } else if (nestedAutoTransactionCounter == 0) {
1101                 try {
1102                     internalCommit();
1103                 } catch (ClusterJException ex) {
1104                     transactionException = ex;
1105                 }
1106                 return transactionStateNotActive;
1107             } else {
1108             throw new ClusterJFatalInternalException(
1109                     local.message("ERR_Transaction_Auto_Start", "end"));
1110             }
1111         }
1112 
1113         public TransactionState fail() {
1114             try {
1115                 nestedAutoTransactionCounter = 0;
1116                 internalRollback();
1117                 return transactionStateNotActive;
1118             } catch (ClusterJException ex) {
1119                 // ignore failures caused by internal rollback
1120                 return transactionStateNotActive;
1121             }
1122         }
1123 
1124     };
1125 
1126     /** Get the domain type handler for an instance.
1127      *
1128      * @param object the instance for which to get the domain type handler
1129      * @return the domain type handler
1130      */
getDomainTypeHandler(T object)1131     protected synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(T object) {
1132         DomainTypeHandler<T> domainTypeHandler =
1133                 factory.getDomainTypeHandler(object, dictionary);
1134         return domainTypeHandler;
1135     }
1136 
1137     /** Get the domain type handler for a class.
1138      *
1139      * @param cls the class
1140      * @return the domain type handler
1141      */
getDomainTypeHandler(Class<T> cls)1142     public synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
1143         DomainTypeHandler<T> domainTypeHandler =
1144                 factory.getDomainTypeHandler(cls, dictionary);
1145         return domainTypeHandler;
1146     }
1147 
getDictionary()1148     public Dictionary getDictionary() {
1149         return dictionary;
1150     }
1151 
1152     /** Is there an active transaction.
1153      *
1154      * @return true if there is an active transaction
1155      */
isActive()1156     boolean isActive() {
1157         return transactionState.isActive();
1158     }
1159 
1160     /** Is the transaction enlisted. A transaction is enlisted if and only if
1161      * an operation has been defined that requires an ndb transaction to be
1162      * started.
1163      * @return true if the transaction is enlisted
1164      */
isEnlisted()1165     public boolean isEnlisted() {
1166         return clusterTransaction==null?false:clusterTransaction.isEnlisted();
1167     }
1168 
1169     /** Assert that there is an active transaction (the user has called begin
1170      * or an autotransaction has begun).
1171      * Throw a user exception if not.
1172      */
assertActive()1173     private void assertActive() {
1174         if (!transactionState.isActive()) {
1175             throw new ClusterJUserException(
1176                     local.message("ERR_Transaction_Must_Be_Active"));
1177         }
1178     }
1179 
1180     /** Assert that there is not an active transaction.
1181      * Throw a user exception if there is an active transaction.
1182      * @param methodName the name of the method
1183      */
assertNotActive(String methodName)1184     private void assertNotActive(String methodName) {
1185         if (transactionState.isActive()) {
1186             throw new ClusterJUserException(
1187                     local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
1188                     methodName));
1189         }
1190     }
1191 
1192     /** Create a query from a class.
1193      *
1194      * @param cls the class
1195      * @return the query
1196      */
createQuery(Class cls)1197     public Query createQuery(Class cls) {
1198         throw new UnsupportedOperationException(
1199                 local.message("ERR_NotImplemented"));
1200     }
1201 
1202     /** Get a query builder.
1203      *
1204      * @return the query builder
1205      */
getQueryBuilder()1206     public QueryBuilder getQueryBuilder() {
1207         return new QueryBuilderImpl(this);
1208     }
1209 
1210     /** Create an index scan operation for an index and table.
1211      *
1212      * @param storeIndex the index
1213      * @param storeTable the table
1214      * @return the index scan operation
1215      */
getIndexScanOperation(Index storeIndex, Table storeTable)1216     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
1217         assertActive();
1218         try {
1219             IndexScanOperation result = clusterTransaction.getIndexScanOperation(storeIndex, storeTable);
1220             return result;
1221         } catch (ClusterJException ex) {
1222             throw new ClusterJException(
1223                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1224         }
1225     }
1226 
1227     /** Create an index scan operation for an index and table to be used for a multi-range scan.
1228      *
1229      * @param storeIndex the index
1230      * @param storeTable the table
1231      * @return the index scan operation
1232      */
getIndexScanOperationMultiRange(Index storeIndex, Table storeTable)1233     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
1234         assertActive();
1235         try {
1236             IndexScanOperation result = clusterTransaction.getIndexScanOperationMultiRange(storeIndex, storeTable);
1237             return result;
1238         } catch (ClusterJException ex) {
1239             throw new ClusterJException(
1240                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1241         }
1242     }
1243 
1244     /** Create an index scan delete operation for an index and table.
1245      *
1246      * @param storeIndex the index
1247      * @param storeTable the table
1248      * @return the index scan operation
1249      */
getIndexScanDeleteOperation(Index storeIndex, Table storeTable)1250     public IndexScanOperation getIndexScanDeleteOperation(Index storeIndex, Table storeTable) {
1251         assertActive();
1252         try {
1253             IndexScanOperation result = clusterTransaction.getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(storeIndex, storeTable);
1254             return result;
1255         } catch (ClusterJException ex) {
1256             throw new ClusterJException(
1257                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1258         }
1259     }
1260 
1261     /** Create a table scan operation for a table.
1262      *
1263      * @param storeTable the table
1264      * @return the table scan operation
1265      */
getTableScanOperation(Table storeTable)1266     public ScanOperation getTableScanOperation(Table storeTable) {
1267         assertActive();
1268         try {
1269             ScanOperation result = clusterTransaction.getTableScanOperation(storeTable);
1270             return result;
1271         } catch (ClusterJException ex) {
1272             throw new ClusterJException(
1273                     local.message("ERR_Table_Scan", storeTable.getName()), ex);
1274         }
1275     }
1276 
1277     /** Create a table scan delete operation for a table.
1278     *
1279     * @param storeTable the table
1280     * @return the table scan operation
1281     */
getTableScanDeleteOperation(Table storeTable)1282    public ScanOperation getTableScanDeleteOperation(Table storeTable) {
1283        assertActive();
1284        try {
1285            ScanOperation result = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
1286            return result;
1287        } catch (ClusterJException ex) {
1288            throw new ClusterJException(
1289                    local.message("ERR_Table_Scan", storeTable.getName()), ex);
1290        }
1291    }
1292 
1293     /** Create a unique index operation for an index and table.
1294      *
1295      * @param storeIndex the index
1296      * @param storeTable the table
1297      * @return the index operation
1298      */
getUniqueIndexOperation(Index storeIndex, Table storeTable)1299     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
1300         assertActive();
1301         try {
1302             IndexOperation result = clusterTransaction.getUniqueIndexOperation(storeIndex, storeTable);
1303             return result;
1304         } catch (ClusterJException ex) {
1305             throw new ClusterJException(
1306                     local.message("ERR_Unique_Index", storeTable.getName(), storeIndex.getName()), ex);
1307         }
1308     }
1309 
1310     /** Create a unique index update operation for an index and table.
1311      * @param storeIndex the index
1312      * @param storeTable the table
1313      * @return the index operation
1314      */
getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable)1315     public IndexOperation getUniqueIndexUpdateOperation(Index storeIndex, Table storeTable) {
1316         assertActive();
1317         try {
1318             IndexOperation result = clusterTransaction.getUniqueIndexUpdateOperation(storeIndex, storeTable);
1319             return result;
1320         } catch (ClusterJException ex) {
1321             throw new ClusterJException(
1322                     local.message("ERR_Unique_Index_Update", storeTable.getName(), storeIndex.getName()), ex);
1323         }
1324     }
1325 
1326     /** Create a select operation for a table.
1327      *
1328      * @param storeTable the table
1329      * @return the operation
1330      */
getSelectOperation(Table storeTable)1331     public Operation getSelectOperation(Table storeTable) {
1332         assertActive();
1333         try {
1334             Operation result = clusterTransaction.getSelectOperation(storeTable);
1335             return result;
1336         } catch (ClusterJException ex) {
1337             throw new ClusterJException(
1338                     local.message("ERR_Select", storeTable), ex);
1339         }
1340     }
1341 
1342     /** Create a delete operation for a table.
1343      *
1344      * @param storeTable the table
1345      * @return the operation
1346      */
getDeleteOperation(Table storeTable)1347     public Operation getDeleteOperation(Table storeTable) {
1348         assertActive();
1349         try {
1350             Operation result = clusterTransaction.getDeleteOperation(storeTable);
1351             return result;
1352         } catch (ClusterJException ex) {
1353             throw new ClusterJException(
1354                     local.message("ERR_Delete", storeTable), ex);
1355         }
1356     }
1357 
1358     /** Create an update operation for a table.
1359      *
1360      * @param storeTable the table
1361      * @return the operation
1362      */
getUpdateOperation(Table storeTable)1363     public Operation getUpdateOperation(Table storeTable) {
1364         assertActive();
1365         try {
1366             Operation result = clusterTransaction.getUpdateOperation(storeTable);
1367             return result;
1368         } catch (ClusterJException ex) {
1369             throw new ClusterJException(
1370                     local.message("ERR_Update", storeTable), ex);
1371         }
1372     }
1373 
1374     /** Create an index delete operation for an index and table.
1375     *
1376     * @param storeIndex the index
1377     * @param storeTable the table
1378     * @return the index operation
1379     */
getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable)1380    public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
1381        assertActive();
1382        try {
1383            IndexOperation result = clusterTransaction.getUniqueIndexDeleteOperation(storeIndex, storeTable);
1384            return result;
1385        } catch (ClusterJException ex) {
1386            throw new ClusterJException(
1387                    local.message("ERR_Unique_Index_Delete", storeTable.getName(), storeIndex.getName()), ex);
1388        }
1389    }
1390 
flush(boolean commit)1391     public void flush(boolean commit) {
1392         if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
1393         if (!changeList.isEmpty()) {
1394             for (StateManager sm: changeList) {
1395                 sm.flush(this);
1396             }
1397             changeList.clear();
1398         }
1399         // now flush changes to the back end
1400         if (clusterTransaction != null) {
1401             if (commit) {
1402                 internalCommit();
1403             } else {
1404                 executeNoCommit();
1405                 handleTransactionException();
1406             }
1407         }
1408     }
1409 
flush()1410     public void flush() {
1411         flush(false);
1412     }
1413 
getChangeList()1414     public List getChangeList() {
1415         return Collections.unmodifiableList(changeList);
1416     }
1417 
persist(Object instance)1418     public void persist(Object instance) {
1419         makePersistent(instance);
1420     }
1421 
remove(Object instance)1422     public void remove(Object instance) {
1423         deletePersistent(instance);
1424     }
1425 
markModified(StateManager instance)1426     public void markModified(StateManager instance) {
1427         changeList.add(instance);
1428     }
1429 
setPartitionKey(Class<?> domainClass, Object key)1430     public void setPartitionKey(Class<?> domainClass, Object key) {
1431         DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(domainClass);
1432         String tableName = domainTypeHandler.getTableName();
1433         // if transaction is enlisted, throw a user exception
1434         if (isEnlisted()) {
1435             throw new ClusterJUserException(
1436                     local.message("ERR_Set_Partition_Key_After_Enlistment", tableName));
1437         }
1438         // if a partition key has already been set, throw a user exception
1439         if (this.partitionKey != null) {
1440             throw new ClusterJUserException(
1441                     local.message("ERR_Set_Partition_Key_Twice", tableName));
1442         }
1443         ValueHandler handler = domainTypeHandler.createKeyValueHandler(key, db);
1444         this.partitionKey= domainTypeHandler.createPartitionKey(handler);
1445         // if a transaction has already begun, tell the cluster transaction about the key
1446         if (clusterTransaction != null) {
1447             clusterTransaction.setPartitionKey(partitionKey);
1448         }
1449     }
1450 
1451     /** Mark the field in the instance as modified so it is flushed.
1452      *
1453      * @param instance the persistent instance
1454      * @param fieldName the field to mark as modified
1455      */
markModified(Object instance, String fieldName)1456     public void markModified(Object instance, String fieldName) {
1457         DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(instance);
1458         ValueHandler handler = domainTypeHandler.getValueHandler(instance);
1459         domainTypeHandler.objectMarkModified(handler, fieldName);
1460     }
1461 
1462     /** Execute any pending operations (insert, delete, update, load)
1463      * and then perform post-execute operations (for load) via
1464      * clusterTransaction.postExecuteCallback().
1465      * @param abort abort this transaction on error
1466      * @param force force the operation to be sent immediately
1467      */
executeNoCommit(boolean abort, boolean force)1468     public void executeNoCommit(boolean abort, boolean force) {
1469         if (clusterTransaction != null) {
1470             try {
1471             clusterTransaction.executeNoCommit(abort, force);
1472             } catch (ClusterJException cjex) {
1473                 transactionException = cjex;
1474             }
1475         }
1476     }
1477 
1478     /** Execute any pending operations (insert, delete, update, load)
1479      * and then perform post-execute operations (for load) via
1480      * clusterTransaction.postExecuteCallback().
1481      * Do not abort the transaction on error. Force the operation to be sent immediately.
1482      */
executeNoCommit()1483     public void executeNoCommit() {
1484         executeNoCommit(false, true);
1485     }
1486 
createQueryDomainType(DomainTypeHandler<T> domainTypeHandler)1487     public <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> domainTypeHandler) {
1488         QueryBuilderImpl builder = (QueryBuilderImpl)getQueryBuilder();
1489         return builder.createQueryDefinition(domainTypeHandler);
1490     }
1491 
1492     /** Return the coordinatedTransactionId of the current transaction.
1493      * The transaction might not have been enlisted.
1494      * @return the coordinatedTransactionId
1495      */
getCoordinatedTransactionId()1496     public String getCoordinatedTransactionId() {
1497         return clusterTransaction.getCoordinatedTransactionId();
1498     }
1499 
1500     /** Set the coordinatedTransactionId for the next transaction. This
1501      * will take effect as soon as the transaction is enlisted.
1502      * @param coordinatedTransactionId the coordinatedTransactionId
1503      */
setCoordinatedTransactionId(String coordinatedTransactionId)1504     public void setCoordinatedTransactionId(String coordinatedTransactionId) {
1505         clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
1506     }
1507 
1508     /** Set the lock mode for subsequent operations. The lock mode takes effect immediately
1509      * and continues until set again.
1510      * @param lockmode the lock mode
1511      */
setLockMode(LockMode lockmode)1512     public void setLockMode(LockMode lockmode) {
1513         this.lockmode = lockmode;
1514         if (clusterTransaction != null) {
1515             clusterTransaction.setLockMode(lockmode);
1516         }
1517     }
1518 
1519     /** Unload the schema associated with the domain class. This allows schema changes to work.
1520      * @param cls the class for which to unload the schema
1521      */
unloadSchema(Class<?> cls)1522     public String unloadSchema(Class<?> cls) {
1523         return factory.unloadSchema(cls, dictionary);
1524     }
1525 
1526     /** Release resources associated with an instance. The instance must be a domain object obtained via
1527      * session.newInstance(T.class), find(T.class), or query; or Iterable<T>, or array T[].
1528      * Resources released can include direct buffers used to hold instance data.
1529      * Released resourced may be returned to a pool.
1530      * @throws ClusterJUserException if the instance is not a domain object T, Iterable<T>, or array T[],
1531      * or if the object is used after calling this method.
1532      */
release(T param)1533     public <T> T release(T param) {
1534         // is the parameter an Iterable?
1535         if (Iterable.class.isAssignableFrom(param.getClass())) {
1536             Iterable<?> instances = (Iterable<?>)param;
1537             for (Object instance:instances) {
1538                 release(instance);
1539             }
1540         } else
1541         // is the parameter an array?
1542         if (param.getClass().isArray()) {
1543             Object[] instances = (Object[])param;
1544             for (Object instance:instances) {
1545                 release(instance);
1546             }
1547         } else
1548         // is the parameter a Dynamic Object?
1549         if (DynamicObject.class.isAssignableFrom(param.getClass())) {
1550             DynamicObject dynamicObject = (DynamicObject)param;
1551             DynamicObjectDelegate delegate = dynamicObject.delegate();
1552             if (delegate != null) {
1553                 delegate.release();
1554             }
1555         // it must be a Proxy with a clusterj InvocationHandler
1556         } else {
1557             try {
1558                 InvocationHandler handler = Proxy.getInvocationHandler(param);
1559                 if (!ValueHandler.class.isAssignableFrom(handler.getClass())) {
1560                     throw new ClusterJUserException(local.message("ERR_Release_Parameter"));
1561                 }
1562                 ValueHandler valueHandler = (ValueHandler)handler;
1563                 valueHandler.release();
1564             } catch (Throwable t) {
1565                 throw new ClusterJUserException(local.message("ERR_Release_Parameter"), t);
1566             }
1567         }
1568         return param;
1569     }
1570 
1571 }
1572