1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.test;
9 
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.fail;
15 
16 import java.io.File;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Random;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicReference;
32 
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 
37 import com.sleepycat.je.Cursor;
38 import com.sleepycat.je.CursorConfig;
39 import com.sleepycat.je.Database;
40 import com.sleepycat.je.DatabaseConfig;
41 import com.sleepycat.je.DatabaseEntry;
42 import com.sleepycat.je.Durability;
43 import com.sleepycat.je.Environment;
44 import com.sleepycat.je.EnvironmentConfig;
45 import com.sleepycat.je.LockMode;
46 import com.sleepycat.je.OperationStatus;
47 import com.sleepycat.je.SecondaryAssociation;
48 import com.sleepycat.je.SecondaryConfig;
49 import com.sleepycat.je.SecondaryCursor;
50 import com.sleepycat.je.SecondaryDatabase;
51 import com.sleepycat.je.SecondaryKeyCreator;
52 import com.sleepycat.util.test.SharedTestUtils;
53 import com.sleepycat.util.test.TestBase;
54 
55 /**
56  * Tests SecondaryAssociation with complex associations.
57  *
58  * SecondaryTest tests SecondaryAssociation in the simple case where each
59  * secondary is associated with a single primary. It performs a more exhaustive
60  * API test.
61  *
62  * This test is focused on complex associations and concurrent operations.  It
63  * includes:
64  * - Multiple primary DBs per index
65  * - Multiple "tables" per primary DB
66  * - Incremental primary key deletion
67  *
68  * This test is intended to be run either as part of the unit test suite, or as
69  * a longer running stress test when -Dlongtest=true is specified. In the
70  * default mode, it runs in less than one minute but still exercises concurrent
71  * operations to some degree.  When -Dlongtest=true is specified, it takes
72  * around 15 minutes.
73  *
74  * For simplicity and speed of execution, this is not a DualTestCase because
75  * SecondaryAssociation-with-HA testing is done by SecondaryTest.  TxnTestCase
76  * is also not used to vary txn type; all operations are transactional.
77  *
78  * In this test, a many-many mapping between primaries and secondaries is
79  * implemented as follows:
80  * - Each primary key is 4 bytes long.
81  * - A logical "table" is labeled by a primary key prefix Tn in the first two
82  *   bytes of the key:  T0, T1, T2, etc.
83  * - The next 2 bytes of the primary key are a randomly generated
84  *   discriminator,  meaning that there are 64K maximum records per table.
85  * - Primary records for all tables are spread among m primary DBs, and a
86  *   primary key is hashed to determine the primary DB ID.
 * - Each table labeled Tn has n secondaries, e.g., T0 has no secondaries, and
 *   T5 has 5 secondaries.
89  * - The secondaries have integer IDs from 0 to n-1, which are locally unique
90  *   for each table.
91  * - Each secondary key is one byte long.  It is extracted from the primary
92  *   data at index N, where N is the secondary ID.
93  *
94  * It is the application's responsibility to guarantee that a primary or
95  * secondary DB is not accessed after it is closed.  This test uses a "clean
96  * cycle" mechanism to ensure that all in-progress operations on a DB are
97  * completed after it is removed from the association, and before it is closed.
98  * A clean cycle is defined as a complete operation based on current
99  * information derived from the association.
100  *
101  * Limitations
102  * ===========
103  * Secondary addition/removal is not tested concurrently with primary
104  * addition/removal, although these combinations should work in principle.
105  */
106 public class SecondaryAssociationTest extends TestBase {
107     private static final int N_TABLES;
108     private static final int N_PRIMARIES;
109     private static final int N_KEY_DISCRIMINATOR_BYTES = 2;
110     private static final int SLEEP_MS_BETWEEN_PHASES;
111     private static final boolean VERBOSE;
112 
113     static {
114         if (SharedTestUtils.runLongTests()) {
115             N_TABLES = 20;
116             N_PRIMARIES = 50;
117             SLEEP_MS_BETWEEN_PHASES = 60 * 1000;
118             VERBOSE = true;
119         } else {
120             N_TABLES = 3;
121             N_PRIMARIES = 20;
122             SLEEP_MS_BETWEEN_PHASES = 1000;
123             VERBOSE = false;
124         }
125     }
126 
    /* Source of random key bytes, data bytes and delete decisions. */
    private final Random rnd;
    /* Set by shutdown(); tells every background task to exit. */
    private final AtomicBoolean shutdownFlag;
    /* First Throwable noted by a background task; null means no failure. */
    private final AtomicReference<Throwable> failureException;
    /* Counters maintained by the write tasks. */
    private final AtomicInteger nWrites;
    private final AtomicInteger nInserts;
    private final AtomicInteger nUpdates;
    private final AtomicInteger nDeletes;
    /* The association shared by all primary and secondary DBs in the test. */
    private final MyAssociation assoc;
    private final File envHome = SharedTestUtils.getTestDir();
    private Environment env;
    private ExecutorService executor;
    /* ID of the primary removed by removeOnePrimary, or -1 when none is. */
    private volatile int removedPriId = -1;
    /*
     * ID and handle of a primary that has been opened by addOnePrimary but is
     * not yet in the association; -1/null otherwise.  Writers that get no
     * primary from the association fall back to these.
     */
    private volatile int addedPriId = -1;
    private volatile Database addedPriDb;
    /* When true, addSecondaries uses the batch form of populateSecondaries. */
    private boolean useBatchMethod;
142 
SecondaryAssociationTest()143     public SecondaryAssociationTest() {
144         rnd = new Random(123);
145         shutdownFlag = new AtomicBoolean(false);
146         failureException = new AtomicReference<Throwable>(null);
147         nWrites = new AtomicInteger(0);
148         nInserts = new AtomicInteger(0);
149         nUpdates = new AtomicInteger(0);
150         nDeletes = new AtomicInteger(0);
151         assoc = new MyAssociation();
152         executor = Executors.newCachedThreadPool();
153     }
154 
155     @Before
setUp()156     public void setUp()
157         throws Exception {
158 
159         super.setUp();
160 
161         final EnvironmentConfig config = new EnvironmentConfig();
162         config.setAllowCreate(true);
163         config.setTransactional(true);
164         config.setDurability(Durability.COMMIT_NO_SYNC);
165 
166         /* Avoid lock timeouts on slow test machines. */
167         config.setLockTimeout(5, TimeUnit.SECONDS);
168 
169         env = new Environment(envHome, config);
170     }
171 
    /**
     * Releases the executor and the environment after each test.
     *
     * The nested try/finally blocks make sure that each later cleanup step
     * (closing the environment, calling the superclass) is still attempted
     * when an earlier step throws.
     */
    @After
    public void tearDown()
        throws Exception {

        /* Ensure resources are released for the sake of tests that follow. */
        try {
            if (executor != null) {
                /* No-wait shutdown; normal shutdown is done by shutdown(). */
                executor.shutdownNow();
            }
        } finally {
            executor = null;
            try {
                if (env != null) {
                    env.close();
                }
            } finally {
                env = null;
                /* Always call superclass method. */
                super.tearDown();
            }
        }
    }
194 
195     @Test
concurrentTestsWithBatchMethod()196     public void concurrentTestsWithBatchMethod()
197         throws InterruptedException, ExecutionException, TimeoutException {
198 
199         useBatchMethod = true;
200         concurrentTests();
201     }
202 
203     @Test
concurrentTests()204     public void concurrentTests()
205         throws InterruptedException, ExecutionException, TimeoutException {
206 
207         /* Sleep calls are to let writes/verify run between stages. */
208         createAllTables();
209         final TaskMonitor writeMonitor = startPrimaryWrites();
210         final TaskMonitor verifyMonitor = startVerify();
211         waitForFullPrimaries();
212         addSecondaries();
213         Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
214         removeOnePrimary(writeMonitor, verifyMonitor);
215         Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
216         addOnePrimary(writeMonitor, verifyMonitor);
217         Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
218         removeSecondaries(writeMonitor, verifyMonitor);
219         Thread.sleep(SLEEP_MS_BETWEEN_PHASES);
220         writeMonitor.stop();
221         verifyMonitor.stop();
222         shutdown();
223         closeAllTables();
224         checkFailure();
225     }
226 
createAllTables()227     private void createAllTables() {
228         for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
229             assoc.addTable(tableId);
230         }
231         for (int priId = 0; priId < N_PRIMARIES; priId += 1) {
232             final Database db = openPrimaryDatabase(priId);
233             assoc.addPrimary(priId, db);
234         }
235     }
236 
openPrimaryDatabase(final int priId)237     private Database openPrimaryDatabase(final int priId) {
238         final DatabaseConfig dbConfig = new DatabaseConfig();
239         dbConfig.setTransactional(true);
240         dbConfig.setAllowCreate(true);
241         dbConfig.setExclusiveCreate(true);
242         dbConfig.setSecondaryAssociation(assoc);
243         return env.openDatabase(null, "P" + priId, dbConfig);
244     }
245 
closeAllTables()246     private void closeAllTables() {
247         for (final Database db : assoc.getAllPrimaries()) {
248             db.close();
249         }
250         for (final SecondaryDatabase secDb : assoc.getAllSecondaries()) {
251             secDb.close();
252         }
253     }
254 
addSecondaries()255     private void addSecondaries() {
256         if (VERBOSE) {
257             System.out.println("Start adding secondaries");
258         }
259         for (int secId = 0; secId < N_TABLES; secId += 1) {
260             /* Add one secondary (at most) to each table. */
261             final Collection<SecondaryDatabase> dbsAdded =
262                 new ArrayList<SecondaryDatabase>();
263             for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
264                 if (secId >= tableId) {
265                     continue;
266                 }
267                 final SecondaryConfig dbConfig = new SecondaryConfig();
268                 dbConfig.setTransactional(true);
269                 dbConfig.setAllowCreate(true);
270                 dbConfig.setExclusiveCreate(true);
271                 dbConfig.setSecondaryAssociation(assoc);
272                 dbConfig.setKeyCreator(new MyKeyCreator(secId));
273                 dbConfig.setSortedDuplicates(true);
274                 final SecondaryDatabase db = env.openSecondaryDatabase(
275                     null, "T" + tableId + "S" + secId, null, dbConfig);
276                 /* Enable incremental mode BEFORE adding to association. */
277                 db.startIncrementalPopulation();
278                 assoc.addSecondary(tableId, secId, db);
279                 dbsAdded.add(db);
280                 checkFailure();
281             }
282             /* Populate the secondaries we just created. */
283             for (final Database db : assoc.getAllPrimaries()) {
284                 if (useBatchMethod) {
285                     final DatabaseEntry keyEntry = new DatabaseEntry();
286                     while (db.populateSecondaries(keyEntry, 100)) {
287                         checkFailure();
288                     }
289                 } else {
290                     final DatabaseEntry keyEntry = new DatabaseEntry();
291                     final DatabaseEntry dataEntry = new DatabaseEntry();
292                     final Cursor cursor = db.openCursor(
293                         null, CursorConfig.READ_COMMITTED);
294                     while (cursor.getNext(keyEntry, dataEntry, null) ==
295                         OperationStatus.SUCCESS) {
296                         db.populateSecondaries(null, keyEntry, dataEntry);
297                     }
298                     cursor.close();
299                 }
300             }
301             /* Disable incremental mode now that population is complete. */
302             for (final SecondaryDatabase db : dbsAdded) {
303                 db.endIncrementalPopulation();
304             }
305             if (VERBOSE) {
306                 System.out.format("Added %d secondaries after %,d writes\n",
307                                   dbsAdded.size(), nWrites.get());
308             }
309         }
310         if (VERBOSE) {
311             System.out.println("Done adding secondaries");
312         }
313     }
314 
removeSecondaries(final TaskMonitor writeMonitor, final TaskMonitor verifyMonitor)315     private void removeSecondaries(final TaskMonitor writeMonitor,
316                                    final TaskMonitor verifyMonitor)
317         throws InterruptedException {
318 
319         if (VERBOSE) {
320             System.out.println("Start removing secondaries");
321         }
322         for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
323             for (int secId = 0; secId < tableId; secId += 1) {
324                 /* 1. Remove from association. */
325                 final SecondaryDatabase db =
326                     assoc.removeSecondary(tableId, secId);
327                 /* 2. Wait for in-progress operations to complete. */
328                 writeMonitor.waitForCleanCycle();
329                 verifyMonitor.waitForCleanCycle();
330                 /* 3. Close/remove database. */
331                 final String dbName = db.getDatabaseName();
332                 db.close();
333                 env.removeDatabase(null, dbName);
334                 checkFailure();
335             }
336             assertEquals(0, assoc.getSecondaries(tableId).size());
337         }
338         if (VERBOSE) {
339             System.out.println("Done removing secondaries");
340         }
341     }
342 
removeOnePrimary(final TaskMonitor writeMonitor, final TaskMonitor verifyMonitor)343     private void removeOnePrimary(final TaskMonitor writeMonitor,
344                                   final TaskMonitor verifyMonitor)
345         throws InterruptedException {
346 
347         if (VERBOSE) {
348             System.out.println("Start removing primary");
349         }
350 
351         /*
352          * 1. Remove from association.
353          *
354          * Remove last primary, as it has the most secondaries. removedPriId is
355          * set as an indicator that this DB should not longer be used for
356          * verify/writes.
357          */
358         removedPriId = N_PRIMARIES - 1;
359         final Database db = assoc.removePrimary(removedPriId);
360         final long recCount = db.count();
361 
362         if (VERBOSE) {
363             System.out.println("Wait for removed primary operations to stop");
364         }
365 
366         /* 2. Wait for in-progress operations to complete. */
367         writeMonitor.waitForCleanCycle();
368         verifyMonitor.waitForCleanCycle();
369 
370         if (VERBOSE) {
371             System.out.format("Close and remove primary DB with %,d records\n",
372                               recCount);
373         }
374 
375         /* 3. Close/remove database. */
376         final String dbName = db.getDatabaseName();
377         db.close();
378         env.removeDatabase(null, dbName);
379         if (VERBOSE) {
380             System.out.println("Delete obsolete primary keys");
381         }
382         for (final SecondaryDatabase secDb : assoc.getAllSecondaries()) {
383             final DatabaseEntry keyEntry = new DatabaseEntry();
384             final DatabaseEntry dataEntry = new DatabaseEntry();
385             while (secDb.deleteObsoletePrimaryKeys(keyEntry, dataEntry, 100)) {
386                 checkFailure();
387             }
388         }
389         if (VERBOSE) {
390             System.out.println("Done removing primary");
391         }
392     }
393 
addOnePrimary(final TaskMonitor writeMonitor, final TaskMonitor verifyMonitor)394     private void addOnePrimary(final TaskMonitor writeMonitor,
395                                final TaskMonitor verifyMonitor)
396         throws InterruptedException {
397 
398         if (VERBOSE) {
399             System.out.println("Start adding primary");
400         }
401 
402         assertTrue(removedPriId >= 0);
403         assertTrue(addedPriId < 0);
404         assertNull(addedPriDb);
405 
406         addedPriDb = openPrimaryDatabase(addedPriId);
407         addedPriId = removedPriId;
408 
409         final int initialWrites = nWrites.get();
410         while (nWrites.get() - initialWrites < 100000) {
411             Thread.sleep(10);
412             checkFailure();
413         }
414 
415         final long recCount = addedPriDb.count();
416 
417         assoc.addPrimary(addedPriId, addedPriDb);
418 
419         removedPriId = -1;
420         addedPriId = -1;
421         addedPriDb = null;
422 
423         if (VERBOSE) {
424             System.out.format("Done adding primary, wrote %,d\n", recCount);
425         }
426     }
427 
428     /**
429      * Starts two threads to do writes.
430      *
431      * Waits for at least 500 writes before returning, to ensure the next step
432      * is done concurrently with writing.
433      */
434     private TaskMonitor startPrimaryWrites()
435         throws InterruptedException {
436 
437         final AtomicBoolean stopTaskFlag = new AtomicBoolean(false);
438 
439         class WriteTask extends Task {
440             private final AtomicInteger cleanCycle;
441             private final String label;
442 
443             WriteTask(final AtomicInteger cleanCycle, final String label) {
444                 this.cleanCycle = cleanCycle;
445                 this.label = label;
446             }
447 
448             public void execute() {
449                 runPrimaryWrites(stopTaskFlag, cleanCycle, label);
450             }
451         }
452 
453         final AtomicInteger cleanCycle1 = new AtomicInteger(0);
454         final AtomicInteger cleanCycle2 = new AtomicInteger(0);
455         final Runnable task1 = new WriteTask(cleanCycle1, "t1");
456         final Runnable task2 = new WriteTask(cleanCycle2, "t2");
457         final Future<?> future1 = executor.submit(task1);
458         final Future<?> future2 = executor.submit(task2);
459 
460         final int initialWrites = nWrites.get();
461         while (nWrites.get() - initialWrites < 500) {
462             Thread.sleep(10);
463             checkFailure();
464         }
465 
466         final TaskMonitor taskMonitor = new TaskMonitor(stopTaskFlag);
467         taskMonitor.add(future1, cleanCycle1);
468         taskMonitor.add(future2, cleanCycle2);
469         return taskMonitor;
470     }
471 
    /**
     * Writes randomly generated primary records until shutdown/stop.
     *
     * Since the keyspace is small (64K maximum keys per table), this will
     * eventually do updates as well as inserts.  For 1/5 records, they are
     * immediately deleted after being written.
     *
     * Each pass of the inner loop handles one table and ends by incrementing
     * cleanCycle, whether or not a record was written.
     *
     * @param stopTaskFlag when set, the method returns at the next iteration.
     * @param cleanCycle counter incremented after each completed iteration.
     * @param label prefix used when printing write totals in verbose mode.
     */
    private void runPrimaryWrites(final AtomicBoolean stopTaskFlag,
                                  final AtomicInteger cleanCycle,
                                  final String label) {
        /* Key and data are fixed length. */
        final byte[] keyBytes =
            new byte[2 + N_KEY_DISCRIMINATOR_BYTES];
        final byte[] dataBytes = new byte[N_TABLES];
        final DatabaseEntry keyEntry = new DatabaseEntry();
        final DatabaseEntry dataEntry = new DatabaseEntry();
        /* First byte of key is fixed. */
        keyBytes[0] = 'T';
        /* Write until shutdown or stopped. */
        while (true) {
            for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
                if (shutdownFlag.get() || stopTaskFlag.get()) {
                    return;
                }
                /* Second byte of key is table ID. */
                keyBytes[1] = (byte) tableId;
                /* Rest of key is random. */
                for (int j = 2; j < keyBytes.length; j += 1) {
                    keyBytes[j] = (byte) rnd.nextInt(256);
                }
                /* Insert or update with random data. */
                keyEntry.setData(keyBytes);
                /*
                 * NOTE(review): dataBytes is filled with random bytes only
                 * after this call, so this presumably relies on the entry
                 * referring to the array rather than copying it -- confirm.
                 */
                dataEntry.setData(dataBytes);
                Database priDb = assoc.getPrimary(keyEntry);
                if (priDb == null) {
                    /*
                     * The association has no primary for this key.  Either
                     * the key belongs to a primary that is being added back
                     * (write to it directly), or to the removed primary
                     * (skip the write, but still count a clean cycle).
                     */
                    final int priId = getPrimaryId(keyEntry);
                    if (priId == addedPriId) {
                        priDb = addedPriDb;
                    } else {
                        assertEquals(removedPriId, priId);
                        cleanCycle.incrementAndGet();
                        continue;
                    }
                }
                rnd.nextBytes(dataBytes);
                /* Try an insert first; fall back to an update if needed. */
                if (priDb.putNoOverwrite(null, keyEntry, dataEntry) ==
                    OperationStatus.SUCCESS) {
                    nInserts.incrementAndGet();
                } else {
                    priDb.put(null, keyEntry, dataEntry);
                    nUpdates.incrementAndGet();
                }
                /* Delete 1/5 records written. */
                if (rnd.nextInt(5) == 1) {
                    priDb.delete(null, keyEntry);
                    nDeletes.incrementAndGet();
                }
                nWrites.incrementAndGet();
                if (VERBOSE && (nWrites.get() % 100000 == 0)) {
                    printWriteTotals(label);
                }
                /* The operation is complete; record a clean cycle. */
                cleanCycle.incrementAndGet();
            }
        }
    }
537 
538     /**
539      * Waits for updates to be at least 1/5 of all writes, meaning that the
540      * keyspace for the primaries has been populated.
541      */
542     private void waitForFullPrimaries()
543         throws InterruptedException {
544 
545         while (4.0 * nUpdates.get() < nInserts.get()) {
546             Thread.sleep(10);
547             checkFailure();
548         }
549         if (VERBOSE) {
550             printWriteTotals("");
551         }
552     }
553 
554     /**
555      * Starts one thread to do verification.
556      */
557     private TaskMonitor startVerify() {
558 
559         final AtomicBoolean stopTaskFlag = new AtomicBoolean(false);
560         final AtomicInteger nPriVerified = new AtomicInteger(0);
561         final AtomicInteger nSecVerified = new AtomicInteger(0);
562         final AtomicInteger cleanCycles = new AtomicInteger(0);
563         final Runnable task = new Task() {
564             public void execute() {
565                 while (!shutdownFlag.get() && !stopTaskFlag.get()) {
566                     runVerify(stopTaskFlag, cleanCycles,
567                               nPriVerified, nSecVerified);
568                 }
569             }
570         };
571 
572         final Future<?> future = executor.submit(task);
573         final TaskMonitor taskMonitor = new TaskMonitor(stopTaskFlag);
574         taskMonitor.add(future, cleanCycles);
575         return taskMonitor;
576     }
577 
    /**
     * Checks primary-secondary linkages/integrity, namely that a primary
     * record contains secondary keys matching the records present in the
     * secondary databases.
     *
     * One call makes a single pass: first over every primary DB, looking up
     * the expected entry in each applicable secondary, and then over every
     * secondary DB, comparing each entry with the primary key and data
     * returned for it.  cleanCycles is incremented after each primary DB and
     * after each secondary DB has been scanned.  The method returns early
     * when the shutdown flag or stopTaskFlag is set.
     *
     * @param stopTaskFlag when set, verification returns as soon as checked.
     * @param cleanCycles counter incremented after each DB is scanned.
     * @param nPriVerified running count of primary records verified.
     * @param nSecVerified running count of secondary records verified.
     */
    private void runVerify(final AtomicBoolean stopTaskFlag,
                           final AtomicInteger cleanCycles,
                           final AtomicInteger nPriVerified,
                           final AtomicInteger nSecVerified) {
        final DatabaseEntry keyEntry = new DatabaseEntry();
        final DatabaseEntry dataEntry = new DatabaseEntry();
        final DatabaseEntry secKeyEntry = new DatabaseEntry();
        /* Zero-length partial entry; presumably so no data is returned. */
        final DatabaseEntry noReturnData = new DatabaseEntry();
        noReturnData.setPartial(0, 0, true);

        /* Pass 1: each primary record must be present in its secondaries. */
        for (int priId = 0; priId < N_PRIMARIES; priId += 1) {
            final Database db = assoc.getPrimary(priId);
            if (db == null) {
                /* Only the removed primary may be missing. */
                assertEquals(removedPriId, priId);
                continue;
            }
            final Cursor c = db.openCursor(null, CursorConfig.READ_COMMITTED);
            try {
                while (c.getNext(keyEntry, dataEntry, null) ==
                       OperationStatus.SUCCESS) {
                    /* Stop scanning a primary that has just been removed. */
                    if (assoc.getPrimary(priId) == null) {
                        break;
                    }
                    /* The table ID is the second byte of the primary key. */
                    final int tableId = keyEntry.getData()[1];
                    final byte[] dataBytes = dataEntry.getData();
                    for (int secId = 0; secId < tableId; secId += 1) {
                        if (shutdownFlag.get() || stopTaskFlag.get()) {
                            return;
                        }
                        final SecondaryDatabase secDb =
                            assoc.getSecondary(tableId, secId);
                        /* Skip secondaries that are absent or incomplete. */
                        if (secDb == null ||
                            secDb.isIncrementalPopulationEnabled()) {
                            continue;
                        }
                        /* Expected secondary key: data byte at index secId. */
                        secKeyEntry.setData(new byte[] {dataBytes[secId]});
                        final OperationStatus status = secDb.getSearchBoth(
                            null, secKeyEntry, keyEntry, noReturnData,
                            LockMode.READ_UNCOMMITTED);
                        if (OperationStatus.SUCCESS != status) {
                            /*
                             * A miss is not reported when the primary has
                             * been removed meanwhile.  This break leaves only
                             * the secId loop; the check at the top of the
                             * while loop then ends the scan of this primary.
                             */
                            if (assoc.getPrimary(priId) == null) {
                                break;
                            }
                            fail("Sec key missing " + status + ' ' +
                                 secDb.getDatabaseName() + ' ' + priId + ' ' +
                                 secKeyEntry + ' ' + keyEntry);
                        }
                    }
                    nPriVerified.incrementAndGet();
                    if (VERBOSE && nPriVerified.get() % 500000 == 0) {
                        System.out.format("nPriVerified %,d\n",
                                          nPriVerified.get());
                    }
                }
            } finally {
                c.close();
            }
            cleanCycles.incrementAndGet();
        }

        /*
         * TODO: Perform with normal locking rather than dirty-read, once the
         * deadlock-free secondary feature is implemented.
         */
        /* Pass 2: each secondary record must agree with its primary record. */
        for (int tableId = 0; tableId < N_TABLES; tableId += 1) {
            for (int secId = 0; secId < tableId; secId += 1) {
                final SecondaryDatabase secDb =
                    assoc.getSecondary(tableId, secId);
                /* Skip secondaries that are absent or incomplete. */
                if (secDb == null ||
                    secDb.isIncrementalPopulationEnabled()) {
                    continue;
                }
                final SecondaryCursor c =
                    secDb.openCursor(null, CursorConfig.READ_UNCOMMITTED);
                try {
                    while (c.getNext(secKeyEntry, keyEntry, dataEntry, null) ==
                           OperationStatus.SUCCESS) {
                        if (shutdownFlag.get() || stopTaskFlag.get()) {
                            return;
                        }
                        /* Primary key must belong to this table ... */
                        assertEquals(tableId, keyEntry.getData()[1]);
                        /* ... and the data byte must equal the sec key. */
                        assertEquals(dataEntry.getData()[secId],
                                     secKeyEntry.getData()[0]);
                        nSecVerified.incrementAndGet();
                        if (VERBOSE && nSecVerified.get() % 500000 == 0) {
                            System.out.format("nSecVerified %,d\n",
                                              nSecVerified.get());
                        }
                    }
                } finally {
                    c.close();
                }
                cleanCycles.incrementAndGet();
            }
        }
    }
679 
680     private class TaskMonitor {
681         private final AtomicBoolean stopFlag;
682         private final List<Future<?>> futures;
683         private final List<AtomicInteger> cleanCycles;
684 
685         TaskMonitor(final AtomicBoolean stopFlag) {
686             this.stopFlag = stopFlag;
687             futures = new ArrayList<Future<?>>();
688             cleanCycles = new ArrayList<AtomicInteger>();
689         }
690 
691         void add(final Future<?> future, final AtomicInteger cleanCycle) {
692             futures.add(future);
693             cleanCycles.add(cleanCycle);
694         }
695 
696         void waitForCleanCycle()
697             throws InterruptedException {
698 
699             final int[] prevCleanCycles = new int[cleanCycles.size()];
700             for (int i = 0; i < prevCleanCycles.length; i += 1) {
701                 prevCleanCycles[i] = cleanCycles.get(i).get();
702             }
703             while (true) {
704                 boolean allDone = true;
705                 for (int i = 0; i < prevCleanCycles.length; i += 1) {
706                     if (prevCleanCycles[i] >= cleanCycles.get(i).get()) {
707                         allDone = false;
708                         break;
709                     }
710                 }
711                 if (allDone) {
712                     break;
713                 }
714                 Thread.sleep(10);
715                 checkFailure();
716             }
717         }
718 
719         void stop()
720             throws InterruptedException, ExecutionException, TimeoutException {
721 
722             stopFlag.set(true);
723             for (final Future<?> future : futures) {
724                 future.get(10, TimeUnit.SECONDS);
725             }
726         }
727     }
728 
729     /**
730      * Saves first exception encountered, which also serves as a failure
731      * indicator -- non-null means failure.
732      */
733     private void noteFailure(Throwable t) {
734 
735         t.printStackTrace(System.out);
736         failureException.compareAndSet(null, t);
737     }
738 
739     /**
740      * If an exception caused a failure, throw it so it appears as the cause of
741      * the JUnit test failure.  This method is meant to be called from the
742      * main thread, i.e., the one running the JUnit test.
743      */
744     private void checkFailure() {
745         final Throwable t = failureException.get();
746         if (t == null) {
747             return;
748         }
749         throw new IllegalStateException(
750             "See cause exception. Other exceptions in output may also be " +
751             "related.", t);
752     }
753 
754     /**
755      * A Runnable that calls an execute() method, that is implemented by the
756      * caller, and handles exceptions.
757      */
758     private abstract class Task implements Runnable {
759 
760         public void run() {
761             try {
762                 execute();
763             } catch (Throwable t) {
764                 noteFailure(t);
765             }
766         }
767 
768         abstract void execute() throws Throwable;
769     }
770 
771     private void shutdown()
772         throws InterruptedException {
773 
774         shutdownFlag.set(true);
775         executor.shutdown();
776         if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
777             executor.shutdownNow();
778             throw new IllegalStateException(
779                 "Could not terminate executor normally");
780         }
781         if (VERBOSE) {
782             printWriteTotals("final");
783         }
784         checkFailure();
785     }
786 
787     private void printWriteTotals(final String label) {
788         System.out.format(
789             "%s nWrites %,d nInserts %,d, nUpdates %,d nDeletes %,d\n", label,
790             nWrites.get(), nInserts.get(), nUpdates.get(), nDeletes.get());
791     }
792 
793     /**
794      * Performs a simplistic (not very evenly distributed) hash of the primary
795      * key to get a primary DB ID between zero and (N_PRIMARIES - 1).  For
796      * this to work best, the primary key should contain some randomly
797      * generated values.
798      */
799     private static int getPrimaryId(final DatabaseEntry primaryKey) {
800         int sum = 0;
801         final byte[] data = primaryKey.getData();
802         for (int i = 0; i < data.length; i += 1) {
803             sum += data[i];
804         }
805         return Math.abs(sum % N_PRIMARIES);
806     }
807 
808     /**
809      * Creates a secondary key from the Nth byte of the primary data, where
810      * N is the secondary ID passed to the constructor.
811      *
812      * TODO replace with new SecondaryKeyExtractor when available.
813      */
814     private static class MyKeyCreator implements SecondaryKeyCreator {
815         private final int secId;
816 
817         MyKeyCreator(int secId) {
818             this.secId = secId;
819         }
820 
821         public boolean createSecondaryKey(SecondaryDatabase secondary,
822                                           DatabaseEntry key,
823                                           DatabaseEntry data,
824                                           DatabaseEntry result) {
825             result.setData(new byte[] {data.getData()[secId]});
826             return true;
827         }
828     }
829 
830     /**
831      * This class implements a SecondaryAssociation in a semi-realistic manner,
832      * simulating an app that maintains associations per logical table.
833      *
834      * However, in a real app, it is expected that the association metadata
835      * would be maintained separately and accessed in a read-only manner via
836      * this class. In other words, this class might not contain methods for
837      * adding and removing members in the association.
838      *
839      * Non-blocking data structures are used to hold association info, to avoid
840      * blocking on the methods in SecondaryAssociation, which are frequently
841      * called by many threads.
842      */
843     private static class MyAssociation implements SecondaryAssociation {
844 
845         /* Maps a primary DB ID to the primary DB. */
846         private final Map<Integer, Database> primaries =
847             new ConcurrentHashMap<Integer, Database>();
848 
849         /* Maps a table ID to its associated secondaries. */
850         private final Map<Integer, Map<Integer, SecondaryDatabase>> tables =
851             new ConcurrentHashMap<Integer, Map<Integer, SecondaryDatabase>>();
852 
853         /* Cheap-to-read indicator that any secondary DBs are present. */
854         private final AtomicInteger nSecondaries = new AtomicInteger(0);
855 
856         public boolean isEmpty() {
857             return (nSecondaries.get() == 0);
858         }
859 
860         public Database getPrimary(final DatabaseEntry primaryKey) {
861             final int priId = getPrimaryId(primaryKey);
862             return getPrimary(priId);
863         }
864 
865         public Collection<SecondaryDatabase> getSecondaries(
866             final DatabaseEntry primaryKey) {
867 
868             final int tableId = primaryKey.getData()[1];
869             return getSecondaries(tableId);
870         }
871 
872         Collection<SecondaryDatabase> getSecondaries(final int tableId) {
873             final Map<Integer, SecondaryDatabase> secondaries =
874                 tables.get(tableId);
875             assertNotNull(secondaries);
876             return secondaries.values();
877         }
878 
879         Database getPrimary(final int priId) {
880             assertTrue(String.valueOf(priId), priId >= 0);
881             assertTrue(String.valueOf(priId), priId < N_PRIMARIES);
882             return primaries.get(priId);
883         }
884 
885         void addPrimary(final int priId, final Database priDb) {
886             final Object oldVal = primaries.put(priId, priDb);
887             assertNull(oldVal);
888         }
889 
890         Database removePrimary(final int priId) {
891             final Database db = primaries.remove(priId);
892             assertNotNull(db);
893             return db;
894         }
895 
896         void addTable(final int tableId) {
897             final Map<Integer, SecondaryDatabase> secondaries =
898                 new ConcurrentHashMap<Integer, SecondaryDatabase>();
899             final Object oldVal = tables.put(tableId, secondaries);
900             assertNull(oldVal);
901         }
902 
903         SecondaryDatabase getSecondary(final int tableId, final int secId) {
904             final Map<Integer, SecondaryDatabase> secondaries =
905                 tables.get(tableId);
906             assertNotNull(secondaries);
907             final SecondaryDatabase secDb = secondaries.get(secId);
908             return secDb;
909         }
910 
911         void addSecondary(final int tableId,
912                           final int secId,
913                           final SecondaryDatabase secDb) {
914             final Map<Integer, SecondaryDatabase> secondaries =
915                 tables.get(tableId);
916             assertNotNull(secondaries);
917             final Object oldVal = secondaries.put(secId, secDb);
918             assertNull(oldVal);
919             nSecondaries.incrementAndGet();
920         }
921 
922         SecondaryDatabase removeSecondary(final int tableId, final int secId) {
923             final Map<Integer, SecondaryDatabase> secondaries =
924                 tables.get(tableId);
925             assertNotNull(secondaries);
926             final SecondaryDatabase secDb = secondaries.remove(secId);
927             assertNotNull(secDb);
928             nSecondaries.decrementAndGet();
929             return secDb;
930         }
931 
932         Collection<Database> getAllPrimaries() {
933             return primaries.values();
934         }
935 
936         Collection<SecondaryDatabase> getAllSecondaries() {
937             final Collection<SecondaryDatabase> dbs =
938                 new ArrayList<SecondaryDatabase>();
939             for (final Map<Integer, SecondaryDatabase> secondaries :
940                  tables.values()) {
941                 dbs.addAll(secondaries.values());
942             }
943             return dbs;
944         }
945     }
946 }
947