1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je;
9 
10 import java.io.Closeable;
11 import java.util.Arrays;
12 import java.util.Comparator;
13 import java.util.logging.Level;
14 
15 import com.sleepycat.je.dbi.GetMode;
16 import com.sleepycat.je.dbi.CursorImpl.SearchMode;
17 import com.sleepycat.je.dbi.RecordVersion;
18 import com.sleepycat.je.txn.Locker;
19 import com.sleepycat.je.utilint.DatabaseUtil;
20 import com.sleepycat.je.utilint.Pair;
21 
22 /**
23  * A specialized join cursor for use in performing equality or natural joins on
24  * secondary indices.
25  *
26  * <p>A join cursor is returned when calling {@link Database#join
27  * Database.join}.</p>
28  *
29  * <p>To open a join cursor using two secondary cursors:</p>
30  *
31  * <pre>
32  *     Transaction txn = ...
33  *     Database primaryDb = ...
34  *     SecondaryDatabase secondaryDb1 = ...
35  *     SecondaryDatabase secondaryDb2 = ...
36  *     <p>
37  *     SecondaryCursor cursor1 = null;
38  *     SecondaryCursor cursor2 = null;
39  *     JoinCursor joinCursor = null;
40  *     try {
41  *         DatabaseEntry key = new DatabaseEntry();
42  *         DatabaseEntry data = new DatabaseEntry();
43  *         <p>
44  *         cursor1 = secondaryDb1.openSecondaryCursor(txn, null);
45  *         cursor2 = secondaryDb2.openSecondaryCursor(txn, null);
46  *         <p>
47  *         key.setData(...); // initialize key for secondary index 1
48  *         OperationStatus status1 =
49  *         cursor1.getSearchKey(key, data, LockMode.DEFAULT);
50  *         key.setData(...); // initialize key for secondary index 2
51  *         OperationStatus status2 =
52  *         cursor2.getSearchKey(key, data, LockMode.DEFAULT);
53  *         <p>
54  *         if (status1 == OperationStatus.SUCCESS &amp;&amp;
55  *                 status2 == OperationStatus.SUCCESS) {
56  *             <p>
57  *             SecondaryCursor[] cursors = {cursor1, cursor2};
58  *             joinCursor = primaryDb.join(cursors, null);
59  *             <p>
60  *             while (true) {
61  *                 OperationStatus joinStatus = joinCursor.getNext(key, data,
62  *                     LockMode.DEFAULT);
63  *                 if (joinStatus == OperationStatus.SUCCESS) {
64  *                      // Do something with the key and data.
65  *                 } else {
66  *                     break;
67  *                 }
68  *             }
69  *         }
70  *     } finally {
71  *         if (cursor1 != null) {
72  *             cursor1.close();
73  *         }
74  *         if (cursor2 != null) {
75  *             cursor2.close();
76  *         }
77  *         if (joinCursor != null) {
78  *             joinCursor.close();
79  *         }
80  *     }
81  * </pre>
82  *
 * <p>The join algorithm is described here so that its cost can be estimated and
 * compared to other approaches for performing a query.  Say that n cursors are
 * provided for the join operation.  In the order in which the join processes
 * them (ascending by estimated duplicate count, or simply the order of the
 * array when {@link JoinConfig#getNoSort no-sort} is configured), the cursors
 * are labeled C(1) through C(n), and the keys at each cursor position are
 * labeled K(1) through K(n).</p>
88  *
89  * <ol>
90  *
91  * <li>Using C(1), the join algorithm iterates sequentially through all records
92  * having K(1).  This iteration is equivalent to a {@link Cursor#getNextDup
93  * Cursor.getNextDup} operation on the secondary index.  The primary key of a
94  * candidate record is determined in this manner.  The primary record itself is
95  * not retrieved and the primary database is not accessed.</li>
96  *
97  * <li>For each candidate primary key found in step 1, a Btree lookup is
98  * performed using C(2) through C(n), in that order.  The Btree lookups are
99  * exact searches to determine whether the candidate record also contains
100  * secondary keys K(2) through K(n).  The lookups are equivalent to a {@link
101  * Cursor#getSearchBoth Cursor.getSearchBoth} operation on the secondary index.
102  * The primary record itself is not retrieved and the primary database is not
103  * accessed.</li>
104  *
 * <li>If any lookup in step 2 fails, the algorithm advances to the next
 * candidate record using C(1).  Lookups are performed in the order C(2)
 * through C(n), and the algorithm proceeds to the next C(1) candidate key as
 * soon as a single lookup fails.</li>
109  *
110  * <li>If all lookups in step 2 succeed, then the matching key and/or data is
111  * returned by the {@code getNext} method.  If the {@link
112  * #getNext(DatabaseEntry,DatabaseEntry,LockMode)} method signature is used,
113  * then the primary database is read to obtain the record data, as if {@link
114  * Cursor#getSearchKey Cursor.getSearchKey} were called for the primary
115  * database.  If the {@link #getNext(DatabaseEntry,LockMode)} method signature
116  * is used, then only the primary key is returned and the primary database is
117  * not accessed.</li>
118  *
119  * <li>The algorithm ends when C(1) has no more candidate records with K(1),
120  * and the {@code getNext} method will then return {@link
121  * com.sleepycat.je.OperationStatus#NOTFOUND OperationStatus.NOTFOUND}.</li>
122  *
123  * </ol>
124  */
125 public class JoinCursor implements Closeable {
126 
127     private JoinConfig config;
128     private Database priDb;
129     private Cursor[] secCursors;
130     private DatabaseEntry[] cursorScratchEntries;
131     private DatabaseEntry scratchEntry;
132     private DatabaseEntry firstSecKey;
133     private boolean[] cursorFetchedFirst;
134 
135     /**
136      * Creates a join cursor without parameter checking.
137      */
JoinCursor(Locker locker, Database primaryDb, final Cursor[] cursors, JoinConfig configParam)138     JoinCursor(Locker locker,
139                Database primaryDb,
140                final Cursor[] cursors,
141                JoinConfig configParam)
142         throws DatabaseException {
143 
144         priDb = primaryDb;
145         config = (configParam != null) ? configParam.clone()
146                                        : JoinConfig.DEFAULT;
147         scratchEntry = new DatabaseEntry();
148         firstSecKey = new DatabaseEntry();
149         cursorScratchEntries = new DatabaseEntry[cursors.length];
150         for (int i = 0; i < cursors.length; i += 1) {
151             cursorScratchEntries[i] = new DatabaseEntry();
152         }
153         cursorFetchedFirst = new boolean[cursors.length];
154         Cursor[] sortedCursors = new Cursor[cursors.length];
155         System.arraycopy(cursors, 0, sortedCursors, 0, cursors.length);
156 
157         if (!config.getNoSort()) {
158 
159             /*
160              * Sort ascending by duplicate count.  Collect counts before
161              * sorting so that countEstimate is called only once per cursor.
162              */
163             final long[] counts = new long[cursors.length];
164             for (int i = 0; i < cursors.length; i += 1) {
165                 counts[i] = cursors[i].countEstimateInternal();
166                 assert counts[i] >= 0;
167             }
168             Arrays.sort(sortedCursors, new Comparator<Cursor>() {
169                 public int compare(Cursor o1, Cursor o2) {
170                     long count1 = -1;
171                     long count2 = -1;
172 
173                     /*
174                      * Scan for objects in cursors not sortedCursors since
175                      * sortedCursors is being sorted in place.
176                      */
177                     for (int i = 0; i < cursors.length &&
178                                     (count1 < 0 || count2 < 0); i += 1) {
179                         if (cursors[i] == o1) {
180                             count1 = counts[i];
181                         } else if (cursors[i] == o2) {
182                             count2 = counts[i];
183                         }
184                     }
185                     assert count1 >= 0 && count2 >= 0;
186                     long cmp = count1 - count2;
187                     return (cmp < 0) ? (-1) : ((cmp > 0) ? 1 : 0);
188                 }
189             });
190         }
191 
192         /*
193          * Dup cursors last.  If an error occurs before the constructor is
194          * complete, close them and ignore exceptions during close.
195          */
196         try {
197             secCursors = new Cursor[cursors.length];
198             for (int i = 0; i < cursors.length; i += 1) {
199                 secCursors[i] = sortedCursors[i].dup(true);
200             }
201         } catch (DatabaseException e) {
202             close(e); /* will throw e */
203         }
204     }
205 
206     /**
207      * Closes the cursors that have been opened by this join cursor.
208      *
209      * <p>The cursors passed to {@link Database#join Database.join} are not
210      * closed by this method, and should be closed by the caller.</p>
211      *
212      * <p>WARNING: To guard against memory leaks, the application should
213      * discard all references to the closed handle.  While BDB makes an effort
214      * to discard references from closed objects to the allocated memory for an
215      * environment, this behavior is not guaranteed.  The safe course of action
216      * for an application is to discard all references to closed BDB
217      * objects.</p>
218      *
219      * @throws EnvironmentFailureException if an unexpected, internal or
220      * environment-wide failure occurs.
221      */
close()222     public void close()
223         throws DatabaseException {
224 
225         if (priDb == null) {
226             return;
227         }
228         close(null);
229     }
230 
231     /**
232      * Close all cursors we own, throwing only the first exception that occurs.
233      *
234      * @param firstException an exception that has already occured, or null.
235      */
close(DatabaseException firstException)236     private void close(DatabaseException firstException)
237         throws DatabaseException {
238 
239         priDb = null;
240         for (int i = 0; i < secCursors.length; i += 1) {
241             if (secCursors[i] != null) {
242                 try {
243                     secCursors[i].close();
244                 } catch (DatabaseException e) {
245                     if (firstException == null) {
246                         firstException = e;
247                     }
248                 }
249                 secCursors[i] = null;
250             }
251         }
252         if (firstException != null) {
253             throw firstException;
254         }
255     }
256 
257     /**
258      * For unit testing.
259      */
getSortedCursors()260     Cursor[] getSortedCursors() {
261         return secCursors;
262     }
263 
264     /**
265      * Returns the primary database handle associated with this cursor.
266      *
267      * @return the primary database handle associated with this cursor.
268      */
getDatabase()269     public Database getDatabase() {
270 
271         return priDb;
272     }
273 
274     /**
275      * Returns this object's configuration.
276      *
277      * @return this object's configuration.
278      */
getConfig()279     public JoinConfig getConfig() {
280 
281         return config.clone();
282     }
283 
284     /**
285      * Returns the next primary key resulting from the join operation.
286      *
287      * <p>An entry is returned by the join cursor for each primary key/data
288      * pair having all secondary key values that were specified using the array
289      * of secondary cursors passed to {@link Database#join Database.join}.</p>
290      *
291      * <p>In a replicated environment, an explicit transaction must have been
292      * specified when opening each cursor, unless read-uncommitted isolation is
293      * specified via the {@code CursorConfig} or {@code LockMode}
294      * parameters.</p>
295      *
296      * @param key the primary key returned as output.  Its byte array does not
297      * need to be initialized by the caller.
298      *
299      * @return {@link com.sleepycat.je.OperationStatus#NOTFOUND
300      * OperationStatus.NOTFOUND} if no matching key/data pair is found;
301      * otherwise, {@link com.sleepycat.je.OperationStatus#SUCCESS
302      * OperationStatus.SUCCESS}.
303      *
304      * @param lockMode the locking attributes; if null, default attributes
305      * are used.
306      *
307      * @throws OperationFailureException if one of the <a
308      * href="OperationFailureException.html#readFailures">Read Operation
309      * Failures</a> occurs.
310      *
311      * @throws EnvironmentFailureException if an unexpected, internal or
312      * environment-wide failure occurs.
313      *
314      * @throws IllegalArgumentException if an invalid parameter is specified,
315      * for example, if a DatabaseEntry parameter is null or does not contain a
316      * required non-null byte array.
317      */
getNext(DatabaseEntry key, LockMode lockMode)318     public OperationStatus getNext(DatabaseEntry key,
319                                    LockMode lockMode)
320         throws DatabaseException {
321 
322         secCursors[0].checkEnv();
323         DatabaseUtil.checkForNullDbt(key, "key", false);
324         secCursors[0].trace(Level.FINEST, "JoinCursor.getNext(key): ",
325                             lockMode);
326 
327         return retrieveNext(key, null, lockMode);
328     }
329 
330     /**
331      * Returns the next primary key and data resulting from the join operation.
332      *
333      * <p>An entry is returned by the join cursor for each primary key/data
334      * pair having all secondary key values that were specified using the array
335      * of secondary cursors passed to {@link Database#join Database.join}.</p>
336      *
337      * <p>In a replicated environment, an explicit transaction must have been
338      * specified when opening each cursor, unless read-uncommitted isolation is
339      * specified via the {@code CursorConfig} or {@code LockMode}
340      * parameters.</p>
341      *
342      * @param key the primary key returned as output.  Its byte array does not
343      * need to be initialized by the caller.
344      *
345      * @param data the primary data returned as output.  Its byte array does
346      * not need to be initialized by the caller.
347      *
348      * @return {@link com.sleepycat.je.OperationStatus#NOTFOUND
349      * OperationStatus.NOTFOUND} if no matching key/data pair is found;
350      * otherwise, {@link com.sleepycat.je.OperationStatus#SUCCESS
351      * OperationStatus.SUCCESS}.
352      *
353      * @param lockMode the locking attributes; if null, default attributes
354      * are used.
355      *
356      * @throws OperationFailureException if one of the <a
357      * href="OperationFailureException.html#readFailures">Read Operation
358      * Failures</a> occurs.
359      *
360      * @throws EnvironmentFailureException if an unexpected, internal or
361      * environment-wide failure occurs.
362      *
363      * @throws IllegalArgumentException if an invalid parameter is specified,
364      * for example, if a DatabaseEntry parameter is null or does not contain a
365      * required non-null byte array.
366      */
getNext(DatabaseEntry key, DatabaseEntry data, LockMode lockMode)367     public OperationStatus getNext(DatabaseEntry key,
368                                    DatabaseEntry data,
369                                    LockMode lockMode)
370         throws DatabaseException {
371 
372         secCursors[0].checkEnv();
373         DatabaseUtil.checkForNullDbt(key, "key", false);
374         DatabaseUtil.checkForNullDbt(data, "data", false);
375         secCursors[0].trace(Level.FINEST, "JoinCursor.getNext(key,data): ",
376                             lockMode);
377 
378         return retrieveNext(key, data, lockMode);
379     }
380 
381     /**
382      * Internal version of getNext(), with an optional data param.
383      * <p>
384      * Since duplicates are always sorted and duplicate-duplicates are not
385      * allowed, a natural join can be implemented by simply traversing through
386      * the duplicates of the first cursor to find candidate keys, and then
387      * looking for each candidate key in the duplicate set of the other
388      * cursors, without ever reseting a cursor to the beginning of the
389      * duplicate set.
390      * <p>
391      * This only works when the same duplicate comparison method is used for
392      * all cursors.  We don't check for that, we just assume the user won't
393      * violate that rule.
394      * <p>
395      * A future optimization would be to add a SearchMode.BOTH_DUPS operation
396      * and use it instead of using SearchMode.BOTH.  This would be the
397      * equivalent of the undocumented DB_GET_BOTHC operation used by DB core's
398      * join() implementation.
399      */
retrieveNext(DatabaseEntry keyParam, DatabaseEntry dataParam, LockMode lockMode)400     private OperationStatus retrieveNext(DatabaseEntry keyParam,
401                                          DatabaseEntry dataParam,
402                                          LockMode lockMode)
403         throws DatabaseException {
404 
405         boolean readUncommitted =
406             secCursors[0].isReadUncommittedMode(lockMode);
407 
408         outerLoop: while (true) {
409 
410             /* Process the first cursor to get a candidate key. */
411             Cursor secCursor = secCursors[0];
412             DatabaseEntry candidateKey = cursorScratchEntries[0];
413             OperationStatus status;
414             if (!cursorFetchedFirst[0]) {
415                 /* Get first duplicate at initial cursor position. */
416                 status = secCursor.getCurrentInternal(
417                     firstSecKey, candidateKey, lockMode);
418                 if (readUncommitted && status == OperationStatus.KEYEMPTY) {
419                     /* Deleted underneath read-uncommitted cursor; skip it. */
420                     cursorFetchedFirst[0] = true;
421                     continue;
422                 }
423                 cursorFetchedFirst[0] = true;
424             } else {
425                 /* Already initialized, move to the next candidate key. */
426                 status = secCursor.retrieveNext(firstSecKey, candidateKey,
427                                                 lockMode, GetMode.NEXT_DUP);
428             }
429             if (status != OperationStatus.SUCCESS) {
430                 /* No more candidate keys. */
431                 return status;
432             }
433 
434             /* Process the second and following cursors. */
435             for (int i = 1; i < secCursors.length; i += 1) {
436                 secCursor = secCursors[i];
437                 DatabaseEntry secKey = cursorScratchEntries[i];
438                 if (!cursorFetchedFirst[i]) {
439                     status = secCursor.getCurrentInternal(
440                         secKey, scratchEntry, lockMode);
441                     if (readUncommitted &&
442                         status == OperationStatus.KEYEMPTY) {
443                         /* Deleted underneath read-uncommitted; skip it. */
444                         status = secCursor.retrieveNext(secKey, scratchEntry,
445                                                         lockMode,
446                                                         GetMode.NEXT_DUP);
447                         if (status != OperationStatus.SUCCESS) {
448                             /* All keys were deleted; no possible match. */
449                             return status;
450                         }
451                     }
452                     assert status == OperationStatus.SUCCESS;
453                     cursorFetchedFirst[i] = true;
454                 }
455                 scratchEntry.setData(secKey.getData(), secKey.getOffset(),
456                                      secKey.getSize());
457                 status = secCursor.search(scratchEntry, candidateKey, lockMode,
458                                           SearchMode.BOTH);
459                 if (status != OperationStatus.SUCCESS) {
460                     /* No match, get another candidate key. */
461                     continue outerLoop;
462                 }
463             }
464 
465             /* The candidate key was found for all cursors. */
466             if (dataParam != null) {
467                 final Pair<OperationStatus, RecordVersion> result =
468                     secCursors[0].readPrimaryAfterGet(
469                         priDb, firstSecKey, candidateKey, dataParam, lockMode,
470                         readUncommitted, false /*reverseLockOrder*/);
471                 status = result.first();
472                 if (status == OperationStatus.KEYEMPTY) {
473                     /* Deleted underneath read-uncommitted cursor; skip it. */
474                     continue;
475                 }
476                 assert status == OperationStatus.SUCCESS;
477 
478                 /*
479                  * Copy RecordVersion to all secondary cursors. This is only
480                  * possible if they are true secondary cursors, due to the
481                  * rules enforced by CursorImpl for primary/secondary cursors.
482                  */
483                 final RecordVersion version = result.second();
484                 for (int i = 0; i < secCursors.length; i += 1) {
485                     final Cursor c = secCursors[i];
486                     if (c.isSecondaryCursor()) {
487                         c.cursorImpl.setSecondaryCurrentVersion(version);
488                     }
489                 }
490             }
491             keyParam.setData(candidateKey.getData(), candidateKey.getOffset(),
492                              candidateKey.getSize());
493             return OperationStatus.SUCCESS;
494         }
495     }
496 }
497