1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 import java.io.File;
9 import java.util.ArrayList;
10 import java.util.Map;
11 import java.util.Random;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.CountDownLatch;
14 
15 import com.sleepycat.je.DatabaseException;
16 import com.sleepycat.je.LockConflictException;
17 import com.sleepycat.je.Transaction;
18 import com.sleepycat.je.rep.ReplicatedEnvironment;
19 import com.sleepycat.je.rep.utilint.RepTestUtils;
20 import com.sleepycat.je.rep.utilint.RepTestUtils.RepEnvInfo;
21 import com.sleepycat.persist.EntityCursor;
22 import com.sleepycat.persist.EntityIndex;
23 import com.sleepycat.persist.EntityStore;
24 import com.sleepycat.persist.PrimaryIndex;
25 
26 /**
 * Applications that perform read operations on a replica may run into reader
 * transaction deadlocks, since the JE ReplayTxn steals locks to make sure it
 * can finish its own work. This test simulates such an application, measures
 * how many retries the reader transactions need, and checks whether log
 * cleaning works as expected in HA.
32  *
33  * This test uses DPL and is divided into two phases: ramp up stage and steady
34  * stage. It's not a fail-over test, all replicas are alive during the test.
35  *
36  * Configurations
37  * ==========================================================================
 * envRoot:      environment home root for the whole replication group, it's
 *               the same as we used in HA unit tests. It is set with the
 *               "-h" command line option.
 * repNodeNum:   size of the replication group, default is 2.
 * dbSize:       number of records in the database, default is 300.
 * roundPerSync: master would traverse the database for this number of
 *               rounds before it does a sync for the whole group, default
 *               is 20.
 * totalRounds:  total number of traversals of the database in the whole
 *               test, default is 4000.
 * txnOps:       number of operations the test wants to do protected by a
 *               transaction, default is 10.
 * nPriThreads:  number of threads reading primary index, default is 2.
 * nSecThreads:  number of threads reading secondary index, default is 2.
50  *
51  * Work Flow
52  * ==========================================================================
53  * During the ramp up stage, master will do "dbSize" insertions and sync whole
54  * replication group.
55  *
 * During the steady stage, the test starts "nPriThreads + nSecThreads"
 * reading threads on a replica; by default all of them read backwards (see
 * the "-isReverseRead" option). The reading threads get records from the
 * primary index and the secondary index, and check the data correctness.
60  *
61  * At the same time, the master would do "totalRounds * dbSize" updating
62  * operations. It would first delete the smallest "txnOps" records in the
63  * database. Next, it will do updates, and the transaction would abort
64  * randomly. At last, insert "txnOps" new records at the end of the database.
65  * After it traverses "roundPerSync" time on the database, the test would sync
66  * the whole group and check node equality.
67  *
68  * After finishing the "totalRounds" database traverse, both the reading
69  * operations on the replica and the update operations on master would stop.
70  * Then the test would close all the replicas and check whether log cleaning
71  * does work in this test.
72  *
73  * How To Run This Test
74  * ==========================================================================
 * All the test configurations have a default value, except the envRoot, so
 * you need to assign a directory to "envRoot" with the "-h" option to start
 * the test, like:
 *    java ReplicaReading -h data
78  *
79  * If you want to specify some configurations, please see the usage.
80  */
81 public class ReplicaReading {
82     /* Master of the replication group. */
83     private ReplicatedEnvironment master;
84     private RepEnvInfo[] repEnvInfo;
85     private boolean runnable = true;
86     /* The two variables saves the maximum and minimum reading retry number. */
87     private int minNum = 100;
88     private int maxNum = 0;
89     /* The smallest and largest key in the database. */
90     private volatile int beginKey;
91     private volatile int endKey;
92     /* Number of files deleted by Cleaner on each node. */
93     private long[] fileDeletions;
94 
95     /* ----------------Configurable params-----------------*/
96     /* Environment home root for whole replication group. */
97     private File envRoot;
98     /* Replication group size. */
99     private int nNodes = 2;
100     /* Database size. */
101     private int dbSize = 300;
102     /* Steady state would finish after traversing these rounds of database. */
103     private int totalRounds = 4000;
104     /* Do a sync after traversing the database for these rounds. */
105     private int roundPerSync= 20;
106     /* Transaction commits after doing this number of operations. */
107     private int txnOps = 10;
108     /* Thread number of reading PrimaryIndex on replica. */
109     private int nPriThreads = 2;
110     /* Thread number of reading SecondaryIndex on replica. */
111     private int nSecThreads = 2;
112     private int subDir = 3;
113     /* True if replica reading thread doing reverse reads. */
114     private boolean isReverseRead = true;
115     /* Size of each JE log file. */
116     private String logFileSize = "5000000";
117     /* Checkpointer wakes up when JE writes checkpointBytes bytes. */
118     private String checkpointBytes = "10000000";
119     /* The latch used to start all the threads at the same time. */
120     private CountDownLatch startSignal;
121     /* The latch used to stop the threads. */
122     private CountDownLatch endSignal;
123     private final Random random = new Random();
124     /* Database and PrimaryIndex used in this test.*/
125     private EntityStore dbStore;
126     private PrimaryIndex<Integer, RepTestData> primaryIndex;
127 
128     /*
129      * A map saves abort transactions, represented by txnRounds, maps from
130      * txnRounds to Key, so that we can remove those deleted transactions.
131      */
132     private final ConcurrentHashMap<Integer, Integer> abortMap =
133         new ConcurrentHashMap<Integer, Integer>();
134 
doRampup()135     public void doRampup()
136         throws Exception {
137 
138         repEnvInfo = Utils.setupGroup
139             (envRoot, nNodes, logFileSize, checkpointBytes, subDir);
140         master = Utils.getMaster(repEnvInfo);
141         fileDeletions = new long[nNodes];
142         RepTestData.insertData
143             (Utils.openStore(master, Utils.DB_NAME), dbSize, true);
144         beginKey = 1;
145         endKey = dbSize;
146         Utils.doSyncAndCheck(repEnvInfo);
147     }
148 
149     /*
150      * TODO: when replication mutable property is ready, need to test the two
151      * nodes replication.
152      */
doSteadyState()153     public void doSteadyState()
154         throws Exception {
155 
156         startSignal = new CountDownLatch(1);
157         endSignal = new CountDownLatch(nPriThreads + nSecThreads);
158         /* Start the threads. */
159         startThreads(nPriThreads, false);
160         startThreads(nSecThreads, true);
161         /* Count down the latch, so that all threads start to work. */
162         startSignal.countDown();
163         /* Doing the updates. */
164         doMasterUpdates();
165 
166         /* Print out the minimum and maximum retry number of this test. */
167         if (Utils.VERBOSE) {
168             System.out.println("The minimum retry number is: " + minNum);
169             System.out.println("The maximum retry number is: " + maxNum);
170         }
171 
172         /* Do the sync until the reading threads finish their work. */
173         endSignal.await();
174 
175         RepTestUtils.shutdownRepEnvs(repEnvInfo);
176     }
177 
178     /* Start the reading threads. */
startThreads(int threadNum, boolean secondary)179     private void startThreads(int threadNum,
180                               boolean secondary) {
181         for (int i = 0; i < threadNum; i++) {
182             Thread thread = new ReplicaReadingThread(repEnvInfo[1].getEnv(),
183                                                      secondary);
184             thread.start();
185         }
186     }
187 
188     /* Do the updates on master. */
doMasterUpdates()189     private void doMasterUpdates()
190         throws Exception {
191 
192         int txnRounds = 0;
193         int tempRound = roundPerSync;
194 
195         UpdateRange range = new UpdateRange();
196 
197         openStore();
198 
199         Transaction txn = null;
200         while (runnable) {
201             if (Utils.VERBOSE) {
202                 System.out.println
203                     ("Updating rounds left on Master: " + totalRounds);
204             }
205 
206             /* Traverse the database and do updates. */
207             for (int i = range.getStart(); i <= range.getEnd(); i++) {
208                 /* Create a new transaction for every txnOps operations. */
209                 if ((i - 1) % txnOps == 0) {
210                     txn = master.beginTransaction(null, null);
211                     txnRounds++;
212                 }
213 
214                 /* Do updates. */
215                 if (range.doDelete(i)) {
216                     doDeleteWork(primaryIndex, txn, i);
217                 } else {
218                     /* Updates and inserts are actually putting a record. */
219                     doPutWork(primaryIndex, txn, range, i, txnRounds);
220                 }
221             }
222 
223             /* Shift the range so that it can traverse the database again. */
224             range.shift();
225 
226             /* Exit the loop if the updates have been finished. */
227             if (--totalRounds == 0) {
228                 runnable = false;
229             }
230 
231             /* If a round of traverses finishes, synch the group. */
232             if (--tempRound == 0 || !runnable) {
233                 dbStore.close();
234                 Utils.doSyncAndCheck(repEnvInfo);
235                 if (runnable) {
236                     openStore();
237                 }
238                 tempRound = roundPerSync;
239             }
240         }
241     }
242 
243     /* Open the EntityStore. */
openStore()244     private void openStore()
245         throws Exception {
246 
247         dbStore = Utils.openStore(master, Utils.DB_NAME);
248         primaryIndex =
249             dbStore.getPrimaryIndex(Integer.class, RepTestData.class);
250     }
251 
252     /* Delete records on the database. */
doDeleteWork(PrimaryIndex<Integer, RepTestData> pIndex, Transaction txn, int key)253     private void doDeleteWork(PrimaryIndex<Integer, RepTestData> pIndex,
254                               Transaction txn,
255                               int key)
256         throws Exception {
257 
258         pIndex.delete(txn, key);
259         if (key % txnOps == 0) {
260             txn.commit();
261             /* Increase the beginKey since the smallest key has changed. */
262             beginKey += txnOps;
263             /* Delete those entries whose values are smaller than beginKey. */
264             if (abortMap.size() != 0) {
265                 ArrayList<Integer> keys = new ArrayList<Integer>();
266                 for (Map.Entry<Integer, Integer> entry : abortMap.entrySet()) {
267                     if (entry.getValue() <= beginKey) {
268                         keys.add(entry.getKey());
269                     }
270                 }
271                 for (Integer abortKey : keys) {
272                     abortMap.remove(abortKey);
273                 }
274             }
275         }
276     }
277 
278     /* Put records into database if the operations are updates or inserts. */
doPutWork(PrimaryIndex<Integer, RepTestData> pIndex, Transaction txn, UpdateRange range, int key, int txnRounds)279     private void doPutWork(PrimaryIndex<Integer, RepTestData> pIndex,
280                            Transaction txn,
281                            UpdateRange range,
282                            int key,
283                            int txnRounds)
284         throws Exception {
285 
286         /*
287          * Put a record into the database. If the key exists, doing updates.
288          * If the key doesn't exist, doing inserts.
289          */
290         RepTestData data = new RepTestData();
291         data.setKey(key);
292         data.setData(key);
293         data.setName("test" + txnRounds);
294         pIndex.put(txn, data);
295 
296         if (key % txnOps == 0) {
297             if (range.doUpdate(key)) {
298                 /* Random abort if it's an update operation. */
299                 if (random.nextBoolean()) {
300                     txn.abort();
301                     /* Put this abort data to the abort map. */
302                     abortMap.put(txnRounds, key);
303                 } else {
304                     txn.commit();
305                 }
306             } else {
307                 /* Increase the endKey if the insertion finishes. */
308                 txn.commit();
309                 endKey += txnOps;
310             }
311         }
312     }
313 
parseArgs(String args[])314     protected void parseArgs(String args[])
315         throws Exception {
316 
317         for (int i = 0; i < args.length; i++) {
318             boolean moreArgs = i < args.length - 1;
319             if (args[i].equals("-h") && moreArgs) {
320                 envRoot = new File(args[++i]);
321             } else if (args[i].equals("-repNodeNum") && moreArgs) {
322                 nNodes = Integer.parseInt(args[++i]);
323             } else if (args[i].equals("-dbSize") && moreArgs) {
324                 dbSize = Integer.parseInt(args[++i]);
325             } else if (args[i].equals("-totalRounds") && moreArgs) {
326                 totalRounds = Integer.parseInt(args[++i]);
327             } else if (args[i].equals("-roundPerSync") && moreArgs) {
328                 roundPerSync = Integer.parseInt(args[++i]);
329             } else if (args[i].equals("-txnOps") && moreArgs) {
330                 txnOps = Integer.parseInt(args[++i]);
331             } else if (args[i].equals("-logFileSize") && moreArgs) {
332                 logFileSize = args[++i];
333             } else if (args[i].equals("-checkpointBytes") && moreArgs) {
334                 checkpointBytes = args[++i];
335             } else if (args[i].equals("-nPriThreads") && moreArgs) {
336                 nPriThreads = Integer.parseInt(args[++i]);
337             } else if (args[i].equals("-nSecThreads") && moreArgs) {
338                 nSecThreads = Integer.parseInt(args[++i]);
339             } else if (args[i].equals("-isReverseRead") && moreArgs) {
340                 isReverseRead = Boolean.parseBoolean(args[++i]);
341             } else if (args[i].equals("-subDir") && moreArgs) {
342                 subDir = Integer.parseInt(args[++i]);
343             } else {
344                 usage("Unknown arg: " + args[i]);
345             }
346         }
347 
348         if (nNodes < 2) {
349             throw new IllegalArgumentException
350                 ("Replication group size should > 2!");
351         }
352 
353         if (txnOps >= dbSize || dbSize % txnOps != 0) {
354             throw new IllegalArgumentException
355                 ("dbSize should be larger and integral multiple of txnOps!");
356         }
357     }
358 
359     private void usage(String error) {
360         if (error != null) {
361             System.err.println(error);
362         }
363         System.err.println
364             ("java " + getClass().getName() + "\n" +
365              "     [-h <replication group Environment home dir>]\n" +
366              "     [-repNodeNum <replication group size>]\n" +
367              "     [-dbSize <records' number of the tested database>]\n" +
368              "     [-totalRounds <the total number of traversing the " +
369              "database insteady state>]\n" +
370              "     [-roundPerSync <do a sync in the replication group " +
371              "after traversing the database of this number]\n" +
372              "     [-txnOps <number of operations in each transaction>]\n" +
373              "     [-logFileSize <size of each log file>]\n" +
374              "     [-checkpointBytes <checkpointer wakes up after writing " +
375              "these bytes into the on disk log>]\n" +
376              "     [-nPriThreads <number of threads reading PrimaryIndex " +
377              "on replica>]\n" +
378              "     [-nSecThreads <number of threads reading SecondaryIndex " +
379              "on replica>]\n" +
380              "     [-isReverseRead <true if replica reading threads read " +
381              "backwards>]");
382         System.exit(2);
383     }
384 
385     public static void main(String args[]) {
386         try {
387             ReplicaReading test = new ReplicaReading();
388             test.parseArgs(args);
389             test.doRampup();
390             test.doSteadyState();
391         } catch (Throwable t) {
392             t.printStackTrace(System.err);
393             System.exit(1);
394         }
395     }
396 
397     /*
398      * This test traverses the database multiple times. In each traversal,
399      *  - the first <txnOps> records are deleted
400      *  - the rest of the records are updated,
401      *  - an additional <txnOps> worth of records are inserted, in order to
402      * keep the database the same size. The class saves the range for delete,
403      * update and insert.
404      */
405     class UpdateRange {
406         private int deleteStart;
407         private int deleteEnd;
408         private int updateStart;
409         private int updateEnd;
410         private int insertStart;
411         private int insertEnd;
412 
413         public UpdateRange() {
414             deleteStart = 1;
415             deleteEnd = deleteStart + txnOps - 1;
416             updateStart = deleteEnd + 1;
417             updateEnd = dbSize;
418             insertStart = updateEnd + 1;
419             insertEnd = insertStart + txnOps - 1;
420         }
421 
422         public int getStart() {
423             return deleteStart;
424         }
425 
426         public int getEnd() {
427             return insertEnd;
428         }
429 
430         /* Returns true if the key is in the scope of deletion. */
431         public boolean doDelete(int index) {
432             return (index >= deleteStart) && (index <= deleteEnd);
433         }
434 
435         /* Returns true if the key is in the scope of updates. */
doUpdate(int index)436         public boolean doUpdate(int index) {
437             return (index >= updateStart) && (index <= updateEnd);
438         }
439 
440         /*
441          * Adjust the traversal parameters for the next traverse of the
442          * database, since some records have been deleted and some added.
443          */
shift()444         public void shift() {
445             deleteStart += txnOps;
446             deleteEnd += txnOps;
447             updateStart += txnOps;
448             updateEnd += txnOps;
449             insertStart += txnOps;
450             insertEnd += txnOps;
451         }
452     }
453 
454     /* The reading thread on replica. */
455     class ReplicaReadingThread extends Thread {
456         private final ReplicatedEnvironment repEnv;
457         private boolean secondary;
458         private final ArrayList<RepTestData> list;
459 
ReplicaReadingThread(ReplicatedEnvironment repEnv, boolean secondary)460         public ReplicaReadingThread(ReplicatedEnvironment repEnv,
461                                     boolean secondary) {
462             this.repEnv = repEnv;
463             this.secondary = secondary;
464             list = new ArrayList<RepTestData>();
465         }
466 
467         @Override
run()468         public void run() {
469             try {
470                 startSignal.await();
471 
472                 EntityStore repStore = Utils.openStore(repEnv, Utils.DB_NAME);
473                 EntityIndex<Integer, RepTestData> index =
474                     repStore.getPrimaryIndex(Integer.class, RepTestData.class);
475                 if (secondary) {
476                     index = repStore.getSecondaryIndex
477                        ((PrimaryIndex<Integer, RepTestData>) index,
478                          Integer.class, "data");
479                 }
480 
481                 while (runnable) {
482                     int numIters = (endKey - beginKey) / txnOps;
483                     int startKey = beginKey;
484                     /* Do txnOps read operations during each transaction. */
485                     for (int i = 0; runnable && i < numIters; i++) {
486                         int start = i * txnOps + startKey;
487                         int end = start + txnOps - 1;
488 
489                         /*
490                          * If there is no data between start and end, then
491                          * break and get a new start and end.
492                          */
493                         if (doRetries(start, end, repEnv, index)) {
494                             break;
495                         }
496                     }
497                 }
498                 repStore.close();
499                 endSignal.countDown();
500             } catch (InterruptedException e) {
501                 e.printStackTrace();
502             }
503         }
504 
505         /* Retry if there exists deadlock. */
doRetries(int start, int end, ReplicatedEnvironment env, EntityIndex<Integer, RepTestData> index)506         private boolean doRetries(int start,
507                                   int end,
508                                   ReplicatedEnvironment env,
509                                    EntityIndex<Integer, RepTestData> index) {
510             boolean success = false;
511             boolean noData = false;
512             int maxTries = 100;
513 
514             for (int tries = 0; !success && tries < maxTries; tries++) {
515                 try {
516                     Transaction txn = env.beginTransaction(null, null);
517                     int realStart = 0;
518                     EntityCursor<RepTestData> cursor = null;
519                     try {
520                         cursor = index.entities(txn, null);
521                         realStart = cursor.first(null).getKey();
522                         cursor.close();
523                         cursor =
524                             index.entities(txn, start, true, end, true, null);
525                         noData = addRecordsToList(cursor);
526                         success = true;
527                     } finally {
528                         if (cursor != null) {
529                             cursor.close();
530                         }
531 
532                         if (success) {
533                             if (noData) {
534                                 checkNoDataCorrectness(start, realStart, tries);
535                             } else {
536                                 checkCorrectness(tries);
537                             }
538                             txn.commit();
539                         } else {
540                             txn.abort();
541                         }
542                         list.clear();
543                     }
544                 } catch (LockConflictException e) {
545                     success = false;
546                 }
547             }
548 
549             return noData;
550         }
551 
552         /*
553          * If there is no data in this cursor, return true. If there exists
554          * data in the cursor, add the datas into the list and return false.
555          */
addRecordsToList(EntityCursor<RepTestData> cursor)556         private boolean addRecordsToList(EntityCursor<RepTestData> cursor)
557             throws DatabaseException {
558 
559             if (isReverseRead) {
560                 RepTestData data = cursor.last(null);
561                 if (data == null) {
562                     return true;
563                 } else {
564                     list.add(data);
565                     while ((data = cursor.prev(null)) != null) {
566                         list.add(data);
567                     }
568                 }
569             } else {
570                 RepTestData data = cursor.first(null);
571                 if (data == null) {
572                     return true;
573                 } else {
574                     list.add(data);
575                     while ((data = cursor.next(null)) != null) {
576                         list.add(data);
577                     }
578                 }
579             }
580 
581             return false;
582         }
583 
584         /* Check the correctness if there is no data in the cursor. */
checkNoDataCorrectness(int start, int realStart, int tries)585         private void checkNoDataCorrectness(int start,
586                                             int realStart,
587                                             int tries) {
588             /* Expect the list size to 0. */
589             if (list.size() != 0) {
590                 System.err.println("The expected number of records should " +
591                                    "be 0, but it is " + list.size() + "!");
592                 System.exit(-1);
593             }
594 
595             /*
596              * The actual beginKey should be larger than the specified
597              * beginKey, and the distance between them should be integral
598              * multiple of txnOps.
599              */
600             if (realStart < start || ((realStart - start) % txnOps != 0)) {
601                 System.err.println("There are some deleted key exists in " +
602                                    "database!");
603                 System.err.println("Expected start key is: " + start +
604                                    ", real start key is: " + realStart);
605                 System.exit(-1);
606             }
607             updateRetries(tries);
608         }
609 
checkCorrectness(int tries)610         private void checkCorrectness(int tries) {
611             if (list.size() == txnOps) {
612                 int minus = isReverseRead ? 1 : -1;
613                 RepTestData firstData = list.get(0);
614 
615                 /* Check if the firstData is an abort data. */
616                 if (!("").equals(firstData.getName().substring(4))) {
617                     Integer txnRounds =
618                         new Integer(firstData.getName().substring(4));
619                     /* If this data is in the abort map, fail the test. */
620                     if (abortMap.get(txnRounds) != null) {
621                         System.err.println
622                             ("The reading thread is reading an abort data.");
623                         System.exit(-1);
624                     }
625                 }
626 
627                 /* Check that records in this list are valid. */
628                 for (int i = 0; i < list.size(); i++) {
629                     if (!firstData.logicEquals(list.get(i), i * minus)) {
630                         System.err.println("Reading data is wrong!" +
631                                            "FirstData: " + firstData +
632                                            "WrontData: " + list.get(i));
633                         for (RepTestData each : list) {
634                             System.err.println(each);
635                         }
636                         System.exit(-1);
637                     }
638                 }
639                 updateRetries(tries);
640             } else {
641                 System.err.println("The expected number of records should " +
642                                    "be: " + txnOps + ", but it is " +
643                                    list.size() + "!");
644                 System.exit(-1);
645             }
646         }
647 
updateRetries(int tries)648         private void updateRetries(int tries) {
649             /* Assign the value to maxNum and minNum. */
650             synchronized(this) {
651                 maxNum = maxNum < tries ? tries : maxNum;
652                 minNum = minNum < tries ? minNum : tries;
653             }
654             if (tries > 0 && Utils.VERBOSE) {
655                 System.err.println("Retries this round: " + tries);
656             }
657         }
658     }
659 }
660