/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved.
 *
 */

package com.sleepycat.je.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryAssociation;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryCursor;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
import com.sleepycat.util.test.SharedTestUtils;
import com.sleepycat.util.test.TestBase;

/**
 * Tests SecondaryAssociation with complex associations.
 *
 * SecondaryTest tests SecondaryAssociation in the simple case where each
 * secondary is associated with a single primary. It performs a more exhaustive
 * API test.
 *
 * This test is focused on complex associations and concurrent operations. It
 * includes:
 *  - Multiple primary DBs per index
 *  - Multiple "tables" per primary DB
 *  - Incremental primary key deletion
 *
 * This test is intended to be run either as part of the unit test suite, or as
 * a longer running stress test when -Dlongtest=true is specified. In the
 * default mode, it runs in less than one minute but still exercises concurrent
 * operations to some degree. When -Dlongtest=true is specified, it takes
 * around 15 minutes.
 *
 * For simplicity and speed of execution, this is not a DualTestCase because
 * SecondaryAssociation-with-HA testing is done by SecondaryTest. TxnTestCase
 * is also not used to vary txn type; all operations are transactional.
 *
 * In this test, a many-many mapping between primaries and secondaries is
 * implemented as follows:
 *  - Each primary key is 4 bytes long.
 *  - A logical "table" is labeled by a primary key prefix Tn in the first two
 *    bytes of the key: T0, T1, T2, etc.
 *  - The next 2 bytes of the primary key are a randomly generated
 *    discriminator, meaning that there are 64K maximum records per table.
 *  - Primary records for all tables are spread among m primary DBs, and a
 *    primary key is hashed to determine the primary DB ID.
 *  - Each table labeled Tn has n secondaries, e.g., T0 has no secondaries, and
 *    T5 has 5 secondaries.
 *  - The secondaries have integer IDs from 0 to n-1, which are locally unique
 *    for each table.
 *  - Each secondary key is one byte long. It is extracted from the primary
 *    data at index N, where N is the secondary ID.
 *
 * It is the application's responsibility to guarantee that a primary or
 * secondary DB is not accessed after it is closed. This test uses a "clean
 * cycle" mechanism to ensure that all in-progress operations on a DB are
 * completed after it is removed from the association, and before it is closed.
 * A clean cycle is defined as a complete operation based on current
 * information derived from the association.
 *
 * Limitations
 * ===========
 * Secondary addition/removal is not tested concurrently with primary
 * addition/removal, although these combinations should work in principle.
 */
public class SecondaryAssociationTest extends TestBase {

    /* Test dimensions; the values depend on whether long tests are enabled. */
    private static final int N_TABLES;
    private static final int N_PRIMARIES;
    private static final int N_KEY_DISCRIMINATOR_BYTES = 2;
    private static final int SLEEP_MS_BETWEEN_PHASES;
    private static final boolean VERBOSE;

    static {
        if (SharedTestUtils.runLongTests()) {
            N_TABLES = 20;
            N_PRIMARIES = 50;
            SLEEP_MS_BETWEEN_PHASES = 60 * 1000;
            VERBOSE = true;
        } else {
            N_TABLES = 3;
            N_PRIMARIES = 20;
            SLEEP_MS_BETWEEN_PHASES = 1000;
            VERBOSE = false;
        }
    }

    private final Random rnd;
    /* Set by shutdown() to make all background tasks exit. */
    private final AtomicBoolean shutdownFlag;
    /* First failure noted by any thread; null means no failure so far. */
    private final AtomicReference<Throwable> failureException;
    private final AtomicInteger nWrites;
    private final AtomicInteger nInserts;
    private final AtomicInteger nUpdates;
    private final AtomicInteger nDeletes;
    private final MyAssociation assoc;
    private final File envHome = SharedTestUtils.getTestDir();
    private Environment env;
    private ExecutorService executor;
    /* ID of the primary currently removed from the association, or -1. */
    private volatile int removedPriId = -1;
    /* ID of the primary opened but not yet added to the association, or -1. */
    private volatile int addedPriId = -1;
    /* Handle for addedPriId; null when no primary is being added. */
    private volatile Database addedPriDb;
    /* Selects which populateSecondaries variant addSecondaries uses. */
    private boolean useBatchMethod;

    public SecondaryAssociationTest() {
        /* Fixed seed, so the generated keys and data are repeatable. */
        rnd = new Random(123);
        shutdownFlag = new AtomicBoolean(false);
        failureException = new AtomicReference<Throwable>(null);
        nWrites = new AtomicInteger(0);
        nInserts = new AtomicInteger(0);
        nUpdates = new AtomicInteger(0);
        nDeletes = new AtomicInteger(0);
        assoc = new MyAssociation();
        executor = Executors.newCachedThreadPool();
    }

    /**
     * Opens a transactional environment in the test directory.
     */
    @Before
    public void setUp()
        throws Exception {

        super.setUp();

        final EnvironmentConfig config = new EnvironmentConfig();
        config.setAllowCreate(true);
        config.setTransactional(true);
        config.setDurability(Durability.COMMIT_NO_SYNC);

        /* Avoid lock timeouts on slow test machines. */
        config.setLockTimeout(5, TimeUnit.SECONDS);

        env = new Environment(envHome, config);
    }

    /**
     * Stops the executor and closes the environment, even when an earlier
     * cleanup step throws.
     */
    @After
    public void tearDown()
        throws Exception {

        /* Ensure resources are released for the sake of tests that follow. */
        try {
            if (executor != null) {
                executor.shutdownNow();
            }
        } finally {
            executor = null;
            try {
                if (env != null) {
                    env.close();
                }
            } finally {
                env = null;
                /* Always call superclass method. */
                super.tearDown();
            }
        }
    }

    /**
     * Runs concurrentTests with secondaries populated by the batch form of
     * populateSecondaries.
     */
    @Test
    public void concurrentTestsWithBatchMethod()
        throws InterruptedException, ExecutionException, TimeoutException {

        useBatchMethod = true;
        concurrentTests();
    }

    /**
     * Runs the full scenario while write and verify tasks are active: add
     * secondaries, remove one primary, add it back, then remove all
     * secondaries.
     */
    @Test
    public void concurrentTests()
        throws InterruptedException, ExecutionException, TimeoutException {

        /* Sleep calls are to let writes/verify run between stages. */
        createAllTables();
        final TaskMonitor writeMonitor = startPrimaryWrites();
        final TaskMonitor verifyMonitor = startVerify();
        waitForFullPrimaries();
        addSecondaries();
        Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
        removeOnePrimary(writeMonitor, verifyMonitor);
        Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
        addOnePrimary(writeMonitor, verifyMonitor);
        Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
        removeSecondaries(writeMonitor, verifyMonitor);
        Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
        writeMonitor.stop();
        verifyMonitor.stop();
        shutdown();
        closeAllTables();
        checkFailure();
    }

    /**
     * Registers all tables (initially without secondaries) and opens and
     * registers all primary DBs.
     */
    private void createAllTables() {
        for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
            assoc.addTable(tableId);
        }
        for (int priId = 0; priId < N_PRIMARIES; priId += 1) {
            final Database db = openPrimaryDatabase(priId);
            assoc.addPrimary(priId, db);
        }
    }

    /**
     * Creates and opens the primary DB named "P" + priId, configured with the
     * test's association. The DB must not already exist (exclusive create).
     */
    private Database openPrimaryDatabase(final int priId) {
        final DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        dbConfig.setAllowCreate(true);
        dbConfig.setExclusiveCreate(true);
        dbConfig.setSecondaryAssociation(assoc);
        return env.openDatabase(null, "P" + priId, dbConfig);
    }

    /**
     * Closes every primary and secondary DB still in the association.
     */
    private void closeAllTables() {
        for (final Database db : assoc.getAllPrimaries()) {
            db.close();
        }
        for (final SecondaryDatabase secDb : assoc.getAllSecondaries()) {
            secDb.close();
        }
    }

    /**
     * Adds all secondaries, one secondary ID at a time, populating each new
     * group from the existing primary records using incremental population.
     */
    private void addSecondaries() {
        if (VERBOSE) {
            System.out.println("Start adding secondaries");
        }
        for (int secId = 0; secId < N_TABLES; secId += 1) {
            /* Add one secondary (at most) to each table. */
            final Collection<SecondaryDatabase> dbsAdded =
                new ArrayList<SecondaryDatabase>();
            for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
                /* Table Tn only has secondaries with IDs 0 to n-1. */
                if (secId >= tableId) {
                    continue;
                }
                final SecondaryConfig dbConfig = new SecondaryConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                dbConfig.setExclusiveCreate(true);
                dbConfig.setSecondaryAssociation(assoc);
                dbConfig.setKeyCreator(new MyKeyCreator(secId));
                dbConfig.setSortedDuplicates(true);
                final SecondaryDatabase db = env.openSecondaryDatabase(
                    null, "T" + tableId + "S" + secId, null, dbConfig);
                /* Enable incremental mode BEFORE adding to association. */
                db.startIncrementalPopulation();
                assoc.addSecondary(tableId, secId, db);
                dbsAdded.add(db);
                checkFailure();
            }
            /* Populate the secondaries we just created. */
            for (final Database db : assoc.getAllPrimaries()) {
                if (useBatchMethod) {
                    final DatabaseEntry keyEntry = new DatabaseEntry();
                    while (db.populateSecondaries(keyEntry, 100)) {
                        checkFailure();
                    }
                } else {
                    final DatabaseEntry keyEntry = new DatabaseEntry();
                    final DatabaseEntry dataEntry = new DatabaseEntry();
                    final Cursor cursor = db.openCursor(
                        null, CursorConfig.READ_COMMITTED);
                    /* Close the cursor even if population or a check fails. */
                    try {
                        while (cursor.getNext(keyEntry, dataEntry, null) ==
                               OperationStatus.SUCCESS) {
                            db.populateSecondaries(null, keyEntry, dataEntry);
                            checkFailure();
                        }
                    } finally {
                        cursor.close();
                    }
                }
            }
            /* Disable incremental mode now that population is complete. */
            for (final SecondaryDatabase db : dbsAdded) {
                db.endIncrementalPopulation();
            }
            if (VERBOSE) {
                System.out.format("Added %d secondaries after %,d writes\n",
                                  dbsAdded.size(), nWrites.get());
            }
        }
        if (VERBOSE) {
            System.out.println("Done adding secondaries");
        }
    }

    /**
     * Removes every secondary of every table. Each one is removed from the
     * association, then closed and deleted after a clean cycle of the write
     * and verify tasks.
     */
    private void removeSecondaries(final TaskMonitor writeMonitor,
                                   final TaskMonitor verifyMonitor)
        throws InterruptedException {

        if (VERBOSE) {
            System.out.println("Start removing secondaries");
        }
        for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
            for (int secId = 0; secId < tableId; secId += 1) {
                /* 1. Remove from association. */
                final SecondaryDatabase db =
                    assoc.removeSecondary(tableId, secId);
                /* 2. Wait for in-progress operations to complete. */
                writeMonitor.waitForCleanCycle();
                verifyMonitor.waitForCleanCycle();
                /* 3. Close/remove database. */
                final String dbName = db.getDatabaseName();
                db.close();
                env.removeDatabase(null, dbName);
                checkFailure();
            }
            assertEquals(0, assoc.getSecondaries(tableId).size());
        }
        if (VERBOSE) {
            System.out.println("Done removing secondaries");
        }
    }

    /**
     * Removes the last primary DB from the association, closes and deletes it
     * after a clean cycle, then deletes the obsolete primary keys from all
     * secondaries in batches.
     */
    private void removeOnePrimary(final TaskMonitor writeMonitor,
                                  final TaskMonitor verifyMonitor)
        throws InterruptedException {

        if (VERBOSE) {
            System.out.println("Start removing primary");
        }

        /*
         * 1. Remove from association.
         *
         * Remove last primary, as it has the most secondaries. removedPriId is
         * set as an indicator that this DB should no longer be used for
         * verify/writes.
         */
        removedPriId = N_PRIMARIES - 1;
        final Database db = assoc.removePrimary(removedPriId);
        final long recCount = db.count();

        if (VERBOSE) {
            System.out.println("Wait for removed primary operations to stop");
        }

        /* 2. Wait for in-progress operations to complete. */
        writeMonitor.waitForCleanCycle();
        verifyMonitor.waitForCleanCycle();

        if (VERBOSE) {
            System.out.format("Close and remove primary DB with %,d records\n",
                              recCount);
        }

        /* 3. Close/remove database. */
        final String dbName = db.getDatabaseName();
        db.close();
        env.removeDatabase(null, dbName);

        if (VERBOSE) {
            System.out.println("Delete obsolete primary keys");
        }
        for (final SecondaryDatabase secDb : assoc.getAllSecondaries()) {
            final DatabaseEntry keyEntry = new DatabaseEntry();
            final DatabaseEntry dataEntry = new DatabaseEntry();
            while (secDb.deleteObsoletePrimaryKeys(keyEntry, dataEntry, 100)) {
                checkFailure();
            }
        }
        if (VERBOSE) {
            System.out.println("Done removing primary");
        }
    }

    /**
     * Re-creates the primary DB removed by removeOnePrimary under the same
     * ID. The writer tasks use it via addedPriDb for 100,000 writes before it
     * is added to the association.
     */
    private void addOnePrimary(final TaskMonitor writeMonitor,
                               final TaskMonitor verifyMonitor)
        throws InterruptedException {

        if (VERBOSE) {
            System.out.println("Start adding primary");
        }

        assertTrue(removedPriId >= 0);
        assertTrue(addedPriId < 0);
        assertNull(addedPriDb);

        /*
         * Open the DB under the removed primary's ID; addedPriId is still -1
         * at this point. addedPriDb is set before addedPriId so that a writer
         * that sees the new addedPriId also sees a non-null addedPriDb.
         */
        addedPriDb = openPrimaryDatabase(removedPriId);
        addedPriId = removedPriId;

        /* Let the writers populate the new DB before it is associated. */
        final int initialWrites = nWrites.get();
        while (nWrites.get() - initialWrites < 100000) {
            Thread.sleep(10);
            checkFailure();
        }

        final long recCount = addedPriDb.count();

        assoc.addPrimary(addedPriId, addedPriDb);

        removedPriId = -1;
        addedPriId = -1;
        addedPriDb = null;

        if (VERBOSE) {
            System.out.format("Done adding primary, wrote %,d\n", recCount);
        }
    }

    /**
     * Starts two threads to do writes.
     *
     * Waits for at least 500 writes before returning, to ensure the next step
     * is done concurrently with writing.
     */
    private TaskMonitor startPrimaryWrites()
        throws InterruptedException {

        final AtomicBoolean stopTaskFlag = new AtomicBoolean(false);

        class WriteTask extends Task {
            private final AtomicInteger cleanCycle;
            private final String label;

            WriteTask(final AtomicInteger cleanCycle, final String label) {
                this.cleanCycle = cleanCycle;
                this.label = label;
            }

            @Override
            public void execute() {
                runPrimaryWrites(stopTaskFlag, cleanCycle, label);
            }
        }

        final AtomicInteger cleanCycle1 = new AtomicInteger(0);
        final AtomicInteger cleanCycle2 = new AtomicInteger(0);
        final Runnable task1 = new WriteTask(cleanCycle1, "t1");
        final Runnable task2 = new WriteTask(cleanCycle2, "t2");
        final Future<?> future1 = executor.submit(task1);
        final Future<?> future2 = executor.submit(task2);

        final int initialWrites = nWrites.get();
        while (nWrites.get() - initialWrites < 500) {
            Thread.sleep(10);
            checkFailure();
        }

        final TaskMonitor taskMonitor = new TaskMonitor(stopTaskFlag);
        taskMonitor.add(future1, cleanCycle1);
        taskMonitor.add(future2, cleanCycle2);
        return taskMonitor;
    }

    /**
     * Writes randomly generated primary records until shutdown/stop.
     *
     * Since the keyspace is small (64K maximum keys per table), this will
     * eventually do updates as well as inserts. For 1/5 records, they are
     * immediately deleted after being written.
     */
    private void runPrimaryWrites(final AtomicBoolean stopTaskFlag,
                                  final AtomicInteger cleanCycle,
                                  final String label) {
        /* Key and data are fixed length. */
        final byte[] keyBytes =
            new byte[2 + N_KEY_DISCRIMINATOR_BYTES];
        final byte[] dataBytes = new byte[N_TABLES];
        final DatabaseEntry keyEntry = new DatabaseEntry();
        final DatabaseEntry dataEntry = new DatabaseEntry();
        /* First byte of key is fixed. */
        keyBytes[0] = 'T';
        /* Write until shutdown or stopped. */
        while (true) {
            for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
                if (shutdownFlag.get() || stopTaskFlag.get()) {
                    return;
                }
                /* Second byte of key is table ID. */
                keyBytes[1] = (byte) tableId;
                /* Rest of key is random. */
                for (int j = 2; j < keyBytes.length; j += 1) {
                    keyBytes[j] = (byte) rnd.nextInt(256);
                }
                /* Insert or update with random data. */
                keyEntry.setData(keyBytes);
                dataEntry.setData(dataBytes);
                Database priDb = assoc.getPrimary(keyEntry);
                if (priDb == null) {
                    /*
                     * The key's primary is not in the association: either it
                     * is being added (write to it directly) or it was removed
                     * (skip the key, which still counts as a clean cycle).
                     */
                    final int priId = getPrimaryId(keyEntry);
                    if (priId == addedPriId) {
                        priDb = addedPriDb;
                    } else {
                        assertEquals(removedPriId, priId);
                        cleanCycle.incrementAndGet();
                        continue;
                    }
                }
                rnd.nextBytes(dataBytes);
                if (priDb.putNoOverwrite(null, keyEntry, dataEntry) ==
                    OperationStatus.SUCCESS) {
                    nInserts.incrementAndGet();
                } else {
                    priDb.put(null, keyEntry, dataEntry);
                    nUpdates.incrementAndGet();
                }
                /* Delete 1/5 records written. */
                if (rnd.nextInt(5) == 1) {
                    priDb.delete(null, keyEntry);
                    nDeletes.incrementAndGet();
                }
                nWrites.incrementAndGet();
                if (VERBOSE && (nWrites.get() % 100000 == 0)) {
                    printWriteTotals(label);
                }
                cleanCycle.incrementAndGet();
            }
        }
    }

    /**
     * Waits for updates to be at least 1/5 of all writes, meaning that the
     * keyspace for the primaries has been populated.
     */
    private void waitForFullPrimaries()
        throws InterruptedException {

        /* updates >= (inserts + updates) / 5  <=>  4 * updates >= inserts. */
        while (4.0 * nUpdates.get() < nInserts.get()) {
            Thread.sleep(10);
            checkFailure();
        }
        if (VERBOSE) {
            printWriteTotals("");
        }
    }

    /**
     * Starts one thread to do verification.
     */
    private TaskMonitor startVerify() {

        final AtomicBoolean stopTaskFlag = new AtomicBoolean(false);
        final AtomicInteger nPriVerified = new AtomicInteger(0);
        final AtomicInteger nSecVerified = new AtomicInteger(0);
        final AtomicInteger cleanCycles = new AtomicInteger(0);
        final Runnable task = new Task() {
            @Override
            public void execute() {
                while (!shutdownFlag.get() && !stopTaskFlag.get()) {
                    runVerify(stopTaskFlag, cleanCycles,
                              nPriVerified, nSecVerified);
                }
            }
        };

        final Future<?> future = executor.submit(task);
        final TaskMonitor taskMonitor = new TaskMonitor(stopTaskFlag);
        taskMonitor.add(future, cleanCycles);
        return taskMonitor;
    }

    /**
     * Checks primary-secondary linkages/integrity, namely that a primary
     * record contains secondary keys matching the records present in the
     * secondary databases.
     *
     * One call makes a single pass over all primaries and then over all
     * secondaries, returning early on shutdown/stop. cleanCycles is
     * incremented after each DB is scanned.
     */
    private void runVerify(final AtomicBoolean stopTaskFlag,
                           final AtomicInteger cleanCycles,
                           final AtomicInteger nPriVerified,
                           final AtomicInteger nSecVerified) {
        final DatabaseEntry keyEntry = new DatabaseEntry();
        final DatabaseEntry dataEntry = new DatabaseEntry();
        final DatabaseEntry secKeyEntry = new DatabaseEntry();
        /* Zero-length partial entry: no data is returned for the lookup. */
        final DatabaseEntry noReturnData = new DatabaseEntry();
        noReturnData.setPartial(0, 0, true);

        for (int priId = 0; priId < N_PRIMARIES; priId += 1) {
            final Database db = assoc.getPrimary(priId);
            if (db == null) {
                assertEquals(removedPriId, priId);
                continue;
            }
            final Cursor c = db.openCursor(null, CursorConfig.READ_COMMITTED);
            try {
                while (c.getNext(keyEntry, dataEntry, null) ==
                       OperationStatus.SUCCESS) {
                    /* Stop scanning a primary that has just been removed. */
                    if (assoc.getPrimary(priId) == null) {
                        break;
                    }
                    final int tableId = keyEntry.getData()[1];
                    final byte[] dataBytes = dataEntry.getData();
                    for (int secId = 0; secId < tableId; secId += 1) {
                        if (shutdownFlag.get() || stopTaskFlag.get()) {
                            return;
                        }
                        /* Skip secondaries that are absent or not complete. */
                        final SecondaryDatabase secDb =
                            assoc.getSecondary(tableId, secId);
                        if (secDb == null ||
                            secDb.isIncrementalPopulationEnabled()) {
                            continue;
                        }
                        secKeyEntry.setData(new byte[] {dataBytes[secId]});
                        final OperationStatus status = secDb.getSearchBoth(
                            null, secKeyEntry, keyEntry, noReturnData,
                            LockMode.READ_UNCOMMITTED);
                        if (OperationStatus.SUCCESS != status) {
                            /* A miss is expected if the primary was removed. */
                            if (assoc.getPrimary(priId) == null) {
                                break;
                            }
                            fail("Sec key missing " + status + ' ' +
                                 secDb.getDatabaseName() + ' ' + priId + ' ' +
                                 secKeyEntry + ' ' + keyEntry);
                        }
                    }
                    nPriVerified.incrementAndGet();
                    if (VERBOSE && nPriVerified.get() % 500000 == 0) {
                        System.out.format("nPriVerified %,d\n",
                                          nPriVerified.get());
                    }
                }
            } finally {
                c.close();
            }
            cleanCycles.incrementAndGet();
        }

        /*
         * TODO: Perform with normal locking rather than dirty-read, once the
         * deadlock-free secondary feature is implemented.
         */
        for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
            for (int secId = 0; secId < tableId; secId += 1) {
                final SecondaryDatabase secDb =
                    assoc.getSecondary(tableId, secId);
                if (secDb == null ||
                    secDb.isIncrementalPopulationEnabled()) {
                    continue;
                }
                final SecondaryCursor c =
                    secDb.openCursor(null, CursorConfig.READ_UNCOMMITTED);
                try {
                    while (c.getNext(secKeyEntry, keyEntry, dataEntry, null) ==
                           OperationStatus.SUCCESS) {
                        if (shutdownFlag.get() || stopTaskFlag.get()) {
                            return;
                        }
                        assertEquals(tableId, keyEntry.getData()[1]);
                        assertEquals(dataEntry.getData()[secId],
                                     secKeyEntry.getData()[0]);
                        nSecVerified.incrementAndGet();
                        if (VERBOSE && nSecVerified.get() % 500000 == 0) {
                            System.out.format("nSecVerified %,d\n",
                                              nSecVerified.get());
                        }
                    }
                } finally {
                    c.close();
                }
                cleanCycles.incrementAndGet();
            }
        }
    }

    /**
     * Tracks a group of background tasks that share one stop flag, along with
     * the clean cycle counter of each task.
     */
    private class TaskMonitor {
        private final AtomicBoolean stopFlag;
        private final List<Future<?>> futures;
        private final List<AtomicInteger> cleanCycles;

        TaskMonitor(final AtomicBoolean stopFlag) {
            this.stopFlag = stopFlag;
            futures = new ArrayList<Future<?>>();
            cleanCycles = new ArrayList<AtomicInteger>();
        }

        /**
         * Registers a task's future together with its clean cycle counter.
         */
        void add(final Future<?> future, final AtomicInteger cleanCycle) {
            futures.add(future);
            cleanCycles.add(cleanCycle);
        }

        /**
         * Blocks until every registered counter has advanced beyond the value
         * it had when this method was called. Polls every 10 ms and throws if
         * a failure has been noted.
         */
        void waitForCleanCycle()
            throws InterruptedException {

            final int[] prevCleanCycles = new int[cleanCycles.size()];
            for (int i = 0; i < prevCleanCycles.length; i += 1) {
                prevCleanCycles[i] = cleanCycles.get(i).get();
            }
            while (true) {
                boolean allDone = true;
                for (int i = 0; i < prevCleanCycles.length; i += 1) {
                    if (prevCleanCycles[i] >= cleanCycles.get(i).get()) {
                        allDone = false;
                        break;
                    }
                }
                if (allDone) {
                    break;
                }
                Thread.sleep(10);
                checkFailure();
            }
        }

        /**
         * Sets the stop flag and waits up to 10 seconds for each task to
         * finish.
         */
        void stop()
            throws InterruptedException, ExecutionException, TimeoutException {

            stopFlag.set(true);
            for (final Future<?> future : futures) {
                future.get(10, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Saves first exception encountered, which also serves as a failure
     * indicator -- non-null means failure.
     */
    private void noteFailure(Throwable t) {

        t.printStackTrace(System.out);
        failureException.compareAndSet(null, t);
    }

    /**
     * If an exception caused a failure, throw it so it appears as the cause of
     * the JUnit test failure. This method is meant to be called from the
     * main thread, i.e., the one running the JUnit test.
     */
    private void checkFailure() {
        final Throwable t = failureException.get();
        if (t == null) {
            return;
        }
        throw new IllegalStateException(
            "See cause exception. Other exceptions in output may also be " +
            "related.", t);
    }

    /**
     * A Runnable that calls an execute() method, that is implemented by the
     * caller, and handles exceptions.
     */
    private abstract class Task implements Runnable {

        @Override
        public void run() {
            try {
                execute();
            } catch (Throwable t) {
                noteFailure(t);
            }
        }

        abstract void execute() throws Throwable;
    }

    /**
     * Sets the shutdown flag and waits up to 20 seconds for the executor to
     * terminate.
     *
     * @throws IllegalStateException if the executor does not terminate in
     * time, or if a failure was noted by any task.
     */
    private void shutdown()
        throws InterruptedException {

        shutdownFlag.set(true);
        executor.shutdown();
        if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            throw new IllegalStateException(
                "Could not terminate executor normally");
        }
        if (VERBOSE) {
            printWriteTotals("final");
        }
        checkFailure();
    }

    /**
     * Prints the write, insert, update and delete counters, prefixed by the
     * given label.
     */
    private void printWriteTotals(final String label) {
        System.out.format(
            "%s nWrites %,d nInserts %,d, nUpdates %,d nDeletes %,d\n", label,
            nWrites.get(), nInserts.get(), nUpdates.get(), nDeletes.get());
    }

    /**
     * Performs a simplistic (not very evenly distributed) hash of the primary
     * key to get a primary DB ID between zero and (N_PRIMARIES - 1). For
     * this to work best, the primary key should contain some randomly
     * generated values.
     */
    private static int getPrimaryId(final DatabaseEntry primaryKey) {
        int sum = 0;
        final byte[] data = primaryKey.getData();
        for (int i = 0; i < data.length; i += 1) {
            /* Bytes are signed, so the sum may be negative. */
            sum += data[i];
        }
        return Math.abs(sum % N_PRIMARIES);
    }

    /**
     * Creates a secondary key from the Nth byte of the primary data, where
     * N is the secondary ID passed to the constructor.
     *
     * TODO replace with new SecondaryKeyExtractor when available.
     */
    private static class MyKeyCreator implements SecondaryKeyCreator {
        private final int secId;

        MyKeyCreator(int secId) {
            this.secId = secId;
        }

        public boolean createSecondaryKey(SecondaryDatabase secondary,
                                          DatabaseEntry key,
                                          DatabaseEntry data,
                                          DatabaseEntry result) {
            result.setData(new byte[] {data.getData()[secId]});
            return true;
        }
    }

    /**
     * This class implements a SecondaryAssociation in a semi-realistic manner,
     * simulating an app that maintains associations per logical table.
     *
     * However, in a real app, it is expected that the association metadata
     * would be maintained separately and accessed in a read-only manner via
     * this class. In other words, this class might not contain methods for
     * adding and removing members in the association.
     *
     * Non-blocking data structures are used to hold association info, to avoid
     * blocking on the methods in SecondaryAssociation, which are frequently
     * called by many threads.
     */
    private static class MyAssociation implements SecondaryAssociation {

        /* Maps a primary DB ID to the primary DB. */
        private final Map<Integer, Database> primaries =
            new ConcurrentHashMap<Integer, Database>();

        /* Maps a table ID to its associated secondaries. */
        private final Map<Integer, Map<Integer, SecondaryDatabase>> tables =
            new ConcurrentHashMap<Integer, Map<Integer, SecondaryDatabase>>();

        /* Cheap-to-read indicator that any secondary DBs are present. */
        private final AtomicInteger nSecondaries = new AtomicInteger(0);

        public boolean isEmpty() {
            return (nSecondaries.get() == 0);
        }

        /* Returns the primary for the key's hashed ID, or null if absent. */
        public Database getPrimary(final DatabaseEntry primaryKey) {
            final int priId = getPrimaryId(primaryKey);
            return getPrimary(priId);
        }

        /* The table ID is the second byte of the primary key. */
        public Collection<SecondaryDatabase> getSecondaries(
            final DatabaseEntry primaryKey) {

            final int tableId = primaryKey.getData()[1];
            return getSecondaries(tableId);
        }

        /* Returns the secondaries of a table, which must be registered. */
        Collection<SecondaryDatabase> getSecondaries(final int tableId) {
            final Map<Integer, SecondaryDatabase> secondaries =
                tables.get(tableId);
            assertNotNull(secondaries);
            return secondaries.values();
        }

        /* Returns the primary with the given ID, or null if absent. */
        Database getPrimary(final int priId) {
            assertTrue(String.valueOf(priId), priId >= 0);
            assertTrue(String.valueOf(priId), priId < N_PRIMARIES);
            return primaries.get(priId);
        }

        /* Registers a primary; the ID must not already be registered. */
        void addPrimary(final int priId, final Database priDb) {
            final Object oldVal = primaries.put(priId, priDb);
            assertNull(oldVal);
        }

        /* Unregisters and returns a primary, which must be registered. */
        Database removePrimary(final int priId) {
            final Database db = primaries.remove(priId);
            assertNotNull(db);
            return db;
        }

        /* Registers a table with an initially empty set of secondaries. */
        void addTable(final int tableId) {
            final Map<Integer, SecondaryDatabase> secondaries =
                new ConcurrentHashMap<Integer, SecondaryDatabase>();
            final Object oldVal = tables.put(tableId, secondaries);
            assertNull(oldVal);
        }

        /* Returns a table's secondary with the given ID, or null if absent. */
        SecondaryDatabase getSecondary(final int tableId, final int secId) {
            final Map<Integer, SecondaryDatabase> secondaries =
                tables.get(tableId);
            assertNotNull(secondaries);
            final SecondaryDatabase secDb = secondaries.get(secId);
            return secDb;
        }

        /* Registers a secondary for a table; the ID must be unused. */
        void addSecondary(final int tableId,
                          final int secId,
                          final SecondaryDatabase secDb) {
            final Map<Integer, SecondaryDatabase> secondaries =
                tables.get(tableId);
            assertNotNull(secondaries);
            final Object oldVal = secondaries.put(secId, secDb);
            assertNull(oldVal);
            nSecondaries.incrementAndGet();
        }

        /* Unregisters and returns a secondary, which must be registered. */
        SecondaryDatabase removeSecondary(final int tableId, final int secId) {
            final Map<Integer, SecondaryDatabase> secondaries =
                tables.get(tableId);
            assertNotNull(secondaries);
            final SecondaryDatabase secDb = secondaries.remove(secId);
            assertNotNull(secDb);
            nSecondaries.decrementAndGet();
            return secDb;
        }

        /* Returns a view of the currently registered primaries. */
        Collection<Database> getAllPrimaries() {
            return primaries.values();
        }

        /* Returns a new collection holding the secondaries of all tables. */
        Collection<SecondaryDatabase> getAllSecondaries() {
            final Collection<SecondaryDatabase> dbs =
                new ArrayList<SecondaryDatabase>();
            for (final Map<Integer, SecondaryDatabase> secondaries :
                 tables.values()) {
                dbs.addAll(secondaries.values());
            }
            return dbs;
        }
    }
}