1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je; 9 10 import java.io.Closeable; 11 import java.util.Arrays; 12 import java.util.Comparator; 13 import java.util.logging.Level; 14 15 import com.sleepycat.je.dbi.GetMode; 16 import com.sleepycat.je.dbi.CursorImpl.SearchMode; 17 import com.sleepycat.je.dbi.RecordVersion; 18 import com.sleepycat.je.txn.Locker; 19 import com.sleepycat.je.utilint.DatabaseUtil; 20 import com.sleepycat.je.utilint.Pair; 21 22 /** 23 * A specialized join cursor for use in performing equality or natural joins on 24 * secondary indices. 25 * 26 * <p>A join cursor is returned when calling {@link Database#join 27 * Database.join}.</p> 28 * 29 * <p>To open a join cursor using two secondary cursors:</p> 30 * 31 * <pre> 32 * Transaction txn = ... 33 * Database primaryDb = ... 34 * SecondaryDatabase secondaryDb1 = ... 35 * SecondaryDatabase secondaryDb2 = ... 36 * <p> 37 * SecondaryCursor cursor1 = null; 38 * SecondaryCursor cursor2 = null; 39 * JoinCursor joinCursor = null; 40 * try { 41 * DatabaseEntry key = new DatabaseEntry(); 42 * DatabaseEntry data = new DatabaseEntry(); 43 * <p> 44 * cursor1 = secondaryDb1.openSecondaryCursor(txn, null); 45 * cursor2 = secondaryDb2.openSecondaryCursor(txn, null); 46 * <p> 47 * key.setData(...); // initialize key for secondary index 1 48 * OperationStatus status1 = 49 * cursor1.getSearchKey(key, data, LockMode.DEFAULT); 50 * key.setData(...); // initialize key for secondary index 2 51 * OperationStatus status2 = 52 * cursor2.getSearchKey(key, data, LockMode.DEFAULT); 53 * <p> 54 * if (status1 == OperationStatus.SUCCESS && 55 * status2 == OperationStatus.SUCCESS) { 56 * <p> 57 * SecondaryCursor[] cursors = {cursor1, cursor2}; 58 * joinCursor = primaryDb.join(cursors, null); 59 * <p> 60 * while (true) { 61 * OperationStatus joinStatus = joinCursor.getNext(key, data, 62 * LockMode.DEFAULT); 63 * if (joinStatus == OperationStatus.SUCCESS) { 64 * // Do something with the key and data. 65 * } else { 66 * break; 67 * } 68 * } 69 * } 70 * } finally { 71 * if (cursor1 != null) { 72 * cursor1.close(); 73 * } 74 * if (cursor2 != null) { 75 * cursor2.close(); 76 * } 77 * if (joinCursor != null) { 78 * joinCursor.close(); 79 * } 80 * } 81 * </pre> 82 * 83 * <p>The join algorithm is described here so that its cost can be estimated and 84 * compared to other approaches for performing a query. Say that N cursors are 85 * provided for the join operation. According to the order they appear in the 86 * array the cursors are labeled C(1) through C(n), and the keys at each cursor 87 * position are labeled K(1) through K(n).</p> 88 * 89 * <ol> 90 * 91 * <li>Using C(1), the join algorithm iterates sequentially through all records 92 * having K(1). This iteration is equivalent to a {@link Cursor#getNextDup 93 * Cursor.getNextDup} operation on the secondary index. The primary key of a 94 * candidate record is determined in this manner. The primary record itself is 95 * not retrieved and the primary database is not accessed.</li> 96 * 97 * <li>For each candidate primary key found in step 1, a Btree lookup is 98 * performed using C(2) through C(n), in that order. The Btree lookups are 99 * exact searches to determine whether the candidate record also contains 100 * secondary keys K(2) through K(n). The lookups are equivalent to a {@link 101 * Cursor#getSearchBoth Cursor.getSearchBoth} operation on the secondary index. 102 * The primary record itself is not retrieved and the primary database is not 103 * accessed.</li> 104 * 105 * <li>If any lookup in step 2 fails, the algorithm advances to the next 106 * candidate record using C(1). Lookups are performed in the order of the 107 * cursor array, and the algorithm proceeds to the next C(1) candidate key as 108 * soon as a single lookup fails.</li> 109 * 110 * <li>If all lookups in step 2 succeed, then the matching key and/or data is 111 * returned by the {@code getNext} method. If the {@link 112 * #getNext(DatabaseEntry,DatabaseEntry,LockMode)} method signature is used, 113 * then the primary database is read to obtain the record data, as if {@link 114 * Cursor#getSearchKey Cursor.getSearchKey} were called for the primary 115 * database. If the {@link #getNext(DatabaseEntry,LockMode)} method signature 116 * is used, then only the primary key is returned and the primary database is 117 * not accessed.</li> 118 * 119 * <li>The algorithm ends when C(1) has no more candidate records with K(1), 120 * and the {@code getNext} method will then return {@link 121 * com.sleepycat.je.OperationStatus#NOTFOUND OperationStatus.NOTFOUND}.</li> 122 * 123 * </ol> 124 */ 125 public class JoinCursor implements Closeable { 126 127 private JoinConfig config; 128 private Database priDb; 129 private Cursor[] secCursors; 130 private DatabaseEntry[] cursorScratchEntries; 131 private DatabaseEntry scratchEntry; 132 private DatabaseEntry firstSecKey; 133 private boolean[] cursorFetchedFirst; 134 135 /** 136 * Creates a join cursor without parameter checking. 137 */ JoinCursor(Locker locker, Database primaryDb, final Cursor[] cursors, JoinConfig configParam)138 JoinCursor(Locker locker, 139 Database primaryDb, 140 final Cursor[] cursors, 141 JoinConfig configParam) 142 throws DatabaseException { 143 144 priDb = primaryDb; 145 config = (configParam != null) ? configParam.clone() 146 : JoinConfig.DEFAULT; 147 scratchEntry = new DatabaseEntry(); 148 firstSecKey = new DatabaseEntry(); 149 cursorScratchEntries = new DatabaseEntry[cursors.length]; 150 for (int i = 0; i < cursors.length; i += 1) { 151 cursorScratchEntries[i] = new DatabaseEntry(); 152 } 153 cursorFetchedFirst = new boolean[cursors.length]; 154 Cursor[] sortedCursors = new Cursor[cursors.length]; 155 System.arraycopy(cursors, 0, sortedCursors, 0, cursors.length); 156 157 if (!config.getNoSort()) { 158 159 /* 160 * Sort ascending by duplicate count. Collect counts before 161 * sorting so that countEstimate is called only once per cursor. 162 */ 163 final long[] counts = new long[cursors.length]; 164 for (int i = 0; i < cursors.length; i += 1) { 165 counts[i] = cursors[i].countEstimateInternal(); 166 assert counts[i] >= 0; 167 } 168 Arrays.sort(sortedCursors, new Comparator<Cursor>() { 169 public int compare(Cursor o1, Cursor o2) { 170 long count1 = -1; 171 long count2 = -1; 172 173 /* 174 * Scan for objects in cursors not sortedCursors since 175 * sortedCursors is being sorted in place. 176 */ 177 for (int i = 0; i < cursors.length && 178 (count1 < 0 || count2 < 0); i += 1) { 179 if (cursors[i] == o1) { 180 count1 = counts[i]; 181 } else if (cursors[i] == o2) { 182 count2 = counts[i]; 183 } 184 } 185 assert count1 >= 0 && count2 >= 0; 186 long cmp = count1 - count2; 187 return (cmp < 0) ? (-1) : ((cmp > 0) ? 1 : 0); 188 } 189 }); 190 } 191 192 /* 193 * Dup cursors last. If an error occurs before the constructor is 194 * complete, close them and ignore exceptions during close. 195 */ 196 try { 197 secCursors = new Cursor[cursors.length]; 198 for (int i = 0; i < cursors.length; i += 1) { 199 secCursors[i] = sortedCursors[i].dup(true); 200 } 201 } catch (DatabaseException e) { 202 close(e); /* will throw e */ 203 } 204 } 205 206 /** 207 * Closes the cursors that have been opened by this join cursor. 208 * 209 * <p>The cursors passed to {@link Database#join Database.join} are not 210 * closed by this method, and should be closed by the caller.</p> 211 * 212 * <p>WARNING: To guard against memory leaks, the application should 213 * discard all references to the closed handle. While BDB makes an effort 214 * to discard references from closed objects to the allocated memory for an 215 * environment, this behavior is not guaranteed. The safe course of action 216 * for an application is to discard all references to closed BDB 217 * objects.</p> 218 * 219 * @throws EnvironmentFailureException if an unexpected, internal or 220 * environment-wide failure occurs. 221 */ close()222 public void close() 223 throws DatabaseException { 224 225 if (priDb == null) { 226 return; 227 } 228 close(null); 229 } 230 231 /** 232 * Close all cursors we own, throwing only the first exception that occurs. 233 * 234 * @param firstException an exception that has already occured, or null. 235 */ close(DatabaseException firstException)236 private void close(DatabaseException firstException) 237 throws DatabaseException { 238 239 priDb = null; 240 for (int i = 0; i < secCursors.length; i += 1) { 241 if (secCursors[i] != null) { 242 try { 243 secCursors[i].close(); 244 } catch (DatabaseException e) { 245 if (firstException == null) { 246 firstException = e; 247 } 248 } 249 secCursors[i] = null; 250 } 251 } 252 if (firstException != null) { 253 throw firstException; 254 } 255 } 256 257 /** 258 * For unit testing. 259 */ getSortedCursors()260 Cursor[] getSortedCursors() { 261 return secCursors; 262 } 263 264 /** 265 * Returns the primary database handle associated with this cursor. 266 * 267 * @return the primary database handle associated with this cursor. 268 */ getDatabase()269 public Database getDatabase() { 270 271 return priDb; 272 } 273 274 /** 275 * Returns this object's configuration. 276 * 277 * @return this object's configuration. 278 */ getConfig()279 public JoinConfig getConfig() { 280 281 return config.clone(); 282 } 283 284 /** 285 * Returns the next primary key resulting from the join operation. 286 * 287 * <p>An entry is returned by the join cursor for each primary key/data 288 * pair having all secondary key values that were specified using the array 289 * of secondary cursors passed to {@link Database#join Database.join}.</p> 290 * 291 * <p>In a replicated environment, an explicit transaction must have been 292 * specified when opening each cursor, unless read-uncommitted isolation is 293 * specified via the {@code CursorConfig} or {@code LockMode} 294 * parameters.</p> 295 * 296 * @param key the primary key returned as output. Its byte array does not 297 * need to be initialized by the caller. 298 * 299 * @return {@link com.sleepycat.je.OperationStatus#NOTFOUND 300 * OperationStatus.NOTFOUND} if no matching key/data pair is found; 301 * otherwise, {@link com.sleepycat.je.OperationStatus#SUCCESS 302 * OperationStatus.SUCCESS}. 303 * 304 * @param lockMode the locking attributes; if null, default attributes 305 * are used. 306 * 307 * @throws OperationFailureException if one of the <a 308 * href="OperationFailureException.html#readFailures">Read Operation 309 * Failures</a> occurs. 310 * 311 * @throws EnvironmentFailureException if an unexpected, internal or 312 * environment-wide failure occurs. 313 * 314 * @throws IllegalArgumentException if an invalid parameter is specified, 315 * for example, if a DatabaseEntry parameter is null or does not contain a 316 * required non-null byte array. 317 */ getNext(DatabaseEntry key, LockMode lockMode)318 public OperationStatus getNext(DatabaseEntry key, 319 LockMode lockMode) 320 throws DatabaseException { 321 322 secCursors[0].checkEnv(); 323 DatabaseUtil.checkForNullDbt(key, "key", false); 324 secCursors[0].trace(Level.FINEST, "JoinCursor.getNext(key): ", 325 lockMode); 326 327 return retrieveNext(key, null, lockMode); 328 } 329 330 /** 331 * Returns the next primary key and data resulting from the join operation. 332 * 333 * <p>An entry is returned by the join cursor for each primary key/data 334 * pair having all secondary key values that were specified using the array 335 * of secondary cursors passed to {@link Database#join Database.join}.</p> 336 * 337 * <p>In a replicated environment, an explicit transaction must have been 338 * specified when opening each cursor, unless read-uncommitted isolation is 339 * specified via the {@code CursorConfig} or {@code LockMode} 340 * parameters.</p> 341 * 342 * @param key the primary key returned as output. Its byte array does not 343 * need to be initialized by the caller. 344 * 345 * @param data the primary data returned as output. Its byte array does 346 * not need to be initialized by the caller. 347 * 348 * @return {@link com.sleepycat.je.OperationStatus#NOTFOUND 349 * OperationStatus.NOTFOUND} if no matching key/data pair is found; 350 * otherwise, {@link com.sleepycat.je.OperationStatus#SUCCESS 351 * OperationStatus.SUCCESS}. 352 * 353 * @param lockMode the locking attributes; if null, default attributes 354 * are used. 355 * 356 * @throws OperationFailureException if one of the <a 357 * href="OperationFailureException.html#readFailures">Read Operation 358 * Failures</a> occurs. 359 * 360 * @throws EnvironmentFailureException if an unexpected, internal or 361 * environment-wide failure occurs. 362 * 363 * @throws IllegalArgumentException if an invalid parameter is specified, 364 * for example, if a DatabaseEntry parameter is null or does not contain a 365 * required non-null byte array. 366 */ getNext(DatabaseEntry key, DatabaseEntry data, LockMode lockMode)367 public OperationStatus getNext(DatabaseEntry key, 368 DatabaseEntry data, 369 LockMode lockMode) 370 throws DatabaseException { 371 372 secCursors[0].checkEnv(); 373 DatabaseUtil.checkForNullDbt(key, "key", false); 374 DatabaseUtil.checkForNullDbt(data, "data", false); 375 secCursors[0].trace(Level.FINEST, "JoinCursor.getNext(key,data): ", 376 lockMode); 377 378 return retrieveNext(key, data, lockMode); 379 } 380 381 /** 382 * Internal version of getNext(), with an optional data param. 383 * <p> 384 * Since duplicates are always sorted and duplicate-duplicates are not 385 * allowed, a natural join can be implemented by simply traversing through 386 * the duplicates of the first cursor to find candidate keys, and then 387 * looking for each candidate key in the duplicate set of the other 388 * cursors, without ever reseting a cursor to the beginning of the 389 * duplicate set. 390 * <p> 391 * This only works when the same duplicate comparison method is used for 392 * all cursors. We don't check for that, we just assume the user won't 393 * violate that rule. 394 * <p> 395 * A future optimization would be to add a SearchMode.BOTH_DUPS operation 396 * and use it instead of using SearchMode.BOTH. This would be the 397 * equivalent of the undocumented DB_GET_BOTHC operation used by DB core's 398 * join() implementation. 399 */ retrieveNext(DatabaseEntry keyParam, DatabaseEntry dataParam, LockMode lockMode)400 private OperationStatus retrieveNext(DatabaseEntry keyParam, 401 DatabaseEntry dataParam, 402 LockMode lockMode) 403 throws DatabaseException { 404 405 boolean readUncommitted = 406 secCursors[0].isReadUncommittedMode(lockMode); 407 408 outerLoop: while (true) { 409 410 /* Process the first cursor to get a candidate key. */ 411 Cursor secCursor = secCursors[0]; 412 DatabaseEntry candidateKey = cursorScratchEntries[0]; 413 OperationStatus status; 414 if (!cursorFetchedFirst[0]) { 415 /* Get first duplicate at initial cursor position. */ 416 status = secCursor.getCurrentInternal( 417 firstSecKey, candidateKey, lockMode); 418 if (readUncommitted && status == OperationStatus.KEYEMPTY) { 419 /* Deleted underneath read-uncommitted cursor; skip it. */ 420 cursorFetchedFirst[0] = true; 421 continue; 422 } 423 cursorFetchedFirst[0] = true; 424 } else { 425 /* Already initialized, move to the next candidate key. */ 426 status = secCursor.retrieveNext(firstSecKey, candidateKey, 427 lockMode, GetMode.NEXT_DUP); 428 } 429 if (status != OperationStatus.SUCCESS) { 430 /* No more candidate keys. */ 431 return status; 432 } 433 434 /* Process the second and following cursors. */ 435 for (int i = 1; i < secCursors.length; i += 1) { 436 secCursor = secCursors[i]; 437 DatabaseEntry secKey = cursorScratchEntries[i]; 438 if (!cursorFetchedFirst[i]) { 439 status = secCursor.getCurrentInternal( 440 secKey, scratchEntry, lockMode); 441 if (readUncommitted && 442 status == OperationStatus.KEYEMPTY) { 443 /* Deleted underneath read-uncommitted; skip it. */ 444 status = secCursor.retrieveNext(secKey, scratchEntry, 445 lockMode, 446 GetMode.NEXT_DUP); 447 if (status != OperationStatus.SUCCESS) { 448 /* All keys were deleted; no possible match. */ 449 return status; 450 } 451 } 452 assert status == OperationStatus.SUCCESS; 453 cursorFetchedFirst[i] = true; 454 } 455 scratchEntry.setData(secKey.getData(), secKey.getOffset(), 456 secKey.getSize()); 457 status = secCursor.search(scratchEntry, candidateKey, lockMode, 458 SearchMode.BOTH); 459 if (status != OperationStatus.SUCCESS) { 460 /* No match, get another candidate key. */ 461 continue outerLoop; 462 } 463 } 464 465 /* The candidate key was found for all cursors. */ 466 if (dataParam != null) { 467 final Pair<OperationStatus, RecordVersion> result = 468 secCursors[0].readPrimaryAfterGet( 469 priDb, firstSecKey, candidateKey, dataParam, lockMode, 470 readUncommitted, false /*reverseLockOrder*/); 471 status = result.first(); 472 if (status == OperationStatus.KEYEMPTY) { 473 /* Deleted underneath read-uncommitted cursor; skip it. */ 474 continue; 475 } 476 assert status == OperationStatus.SUCCESS; 477 478 /* 479 * Copy RecordVersion to all secondary cursors. This is only 480 * possible if they are true secondary cursors, due to the 481 * rules enforced by CursorImpl for primary/secondary cursors. 482 */ 483 final RecordVersion version = result.second(); 484 for (int i = 0; i < secCursors.length; i += 1) { 485 final Cursor c = secCursors[i]; 486 if (c.isSecondaryCursor()) { 487 c.cursorImpl.setSecondaryCurrentVersion(version); 488 } 489 } 490 } 491 keyParam.setData(candidateKey.getData(), candidateKey.getOffset(), 492 candidateKey.getSize()); 493 return OperationStatus.SUCCESS; 494 } 495 } 496 } 497