/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
 *
 */

import java.io.File;
import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.utilint.RepTestUtils;
import com.sleepycat.je.rep.utilint.RepTestUtils.RepEnvInfo;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityIndex;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;

/**
 * Applications doing read operations on a replica may run into reader
 * transaction deadlocks, since the JE ReplayTxn will steal locks to make sure
 * it can finish its own work. This test simulates such an application,
 * measures how many retries the reader transactions need, and checks whether
 * log cleaning works as expected in HA.
 *
 * The test uses the DPL and is divided into two phases: a ramp-up stage and a
 * steady stage. It is not a fail-over test; all replicas stay alive during
 * the test.
 *
 * Configurations
 * ==========================================================================
 * envRoot:      environment home root for the whole replication group; the
 *               same as the one used in the HA unit tests.
 * repNodeNum:   size of the replication group, default is 2.
 * dbSize:       number of records in the database, default is 300.
 * roundPerSync: number of database traversals the master does before it
 *               syncs the whole group, default is 20.
 * totalRounds:  total number of database traversals in the whole test,
 *               default is 4000.
 * txnOps:       number of operations protected by a single transaction,
 *               default is 10.
 * nPriThreads:  number of threads reading the primary index, default is 2.
 * nSecThreads:  number of threads reading the secondary index, default is 2.
 *
 * Work Flow
 * ==========================================================================
 * During the ramp-up stage, the master does "dbSize" insertions and syncs the
 * whole replication group.
 *
 * During the steady stage, the test starts "nPriThreads + nSecThreads"
 * reading threads on a replica; by default all of them read backwards. The
 * reading threads fetch records through the primary index and the secondary
 * index and check data correctness.
 *
 * At the same time, the master does "totalRounds * dbSize" update operations.
 * In each traversal it first deletes the smallest "txnOps" records in the
 * database, then updates the remaining records (aborting some of those
 * transactions at random), and finally inserts "txnOps" new records at the
 * end of the database. After every "roundPerSync" traversals the test syncs
 * the whole group and checks node equality.
 *
 * After the "totalRounds" database traversals have finished, both the read
 * operations on the replica and the update operations on the master stop.
 * The test then closes all the replicas and checks whether log cleaning
 * worked during the test.
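 *
 * For example, with the default txnOps = 10 and dbSize = 300 (see the
 * UpdateRange class below), the first traversal deletes keys 1-10, updates
 * keys 11-300 and inserts keys 301-310; the next traversal shifts the whole
 * window by txnOps, deleting keys 11-20, updating keys 21-310 and inserting
 * keys 311-320, so the database always holds "dbSize" records.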
 *
 * How To Run This Test
 * ==========================================================================
 * All the test configurations have a default value except envRoot, so you
 * need to assign a directory to envRoot (via the -h flag) to start the test,
 * like:
 *    java ReplicaReading -h data
 *
 * If you want to change other configurations, please see the usage.
 */
public class ReplicaReading {
    /* Master of the replication group. */
    private ReplicatedEnvironment master;
    private RepEnvInfo[] repEnvInfo;
    private boolean runnable = true;
    /* These two variables save the minimum and maximum read retry counts. */
    private int minNum = 100;
    private int maxNum = 0;
    /* The smallest and largest key in the database. */
    private volatile int beginKey;
    private volatile int endKey;
    /* Number of files deleted by the Cleaner on each node. */
    private long[] fileDeletions;

    /* ---------------- Configurable params ---------------- */
    /* Environment home root for the whole replication group. */
    private File envRoot;
    /* Replication group size. */
    private int nNodes = 2;
    /* Database size. */
    private int dbSize = 300;
    /* Steady state finishes after traversing the database this many times. */
    private int totalRounds = 4000;
    /* Do a sync after traversing the database this many times. */
    private int roundPerSync = 20;
    /* A transaction commits after doing this number of operations. */
    private int txnOps = 10;
    /* Number of threads reading the PrimaryIndex on the replica. */
    private int nPriThreads = 2;
    /* Number of threads reading the SecondaryIndex on the replica. */
    private int nSecThreads = 2;
    /* Number of sub directories per environment, passed to Utils.setupGroup. */
    private int subDir = 3;
    /* True if the replica reading threads do reverse reads. */
    private boolean isReverseRead = true;
    /* Size of each JE log file. */
    private String logFileSize = "5000000";
    /* Checkpointer wakes up after JE writes checkpointBytes bytes. */
    private String checkpointBytes = "10000000";
    /* The latch used to start all the threads at the same time. */
    private CountDownLatch startSignal;
    /* The latch used to stop the threads. */
    private CountDownLatch endSignal;
    private final Random random = new Random();
    /* Database and PrimaryIndex used in this test. */
    private EntityStore dbStore;
    private PrimaryIndex<Integer, RepTestData> primaryIndex;

    /*
     * A map recording aborted transactions: it maps txnRounds (the
     * transaction counter) to the key written by that transaction, so that
     * entries can be removed once the corresponding records are deleted.
     */
    private final ConcurrentHashMap<Integer, Integer> abortMap =
        new ConcurrentHashMap<Integer, Integer>();
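
    /*
     * For example (hypothetical values): if the transaction for round 42 is
     * randomly aborted while updating key 250, doPutWork records the pair
     * (42 -> 250) here. A reader that later sees a record named "test42"
     * knows it has read aborted data and fails the test, and doDeleteWork
     * removes the entry once beginKey moves past key 250, because the
     * aborted record has been deleted from the database by then.
     */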

    public void doRampup()
        throws Exception {

        repEnvInfo = Utils.setupGroup
            (envRoot, nNodes, logFileSize, checkpointBytes, subDir);
        master = Utils.getMaster(repEnvInfo);
        fileDeletions = new long[nNodes];
        RepTestData.insertData
            (Utils.openStore(master, Utils.DB_NAME), dbSize, true);
        beginKey = 1;
        endKey = dbSize;
        Utils.doSyncAndCheck(repEnvInfo);
    }

    /*
     * TODO: when the replication mutable property is ready, we need to test
     * two-node replication.
     */
    public void doSteadyState()
        throws Exception {

        startSignal = new CountDownLatch(1);
        endSignal = new CountDownLatch(nPriThreads + nSecThreads);
        /* Start the threads. */
        startThreads(nPriThreads, false);
        startThreads(nSecThreads, true);
        /* Count down the latch, so that all threads start to work. */
        startSignal.countDown();
        /* Do the updates on the master. */
        doMasterUpdates();

        /* Print out the minimum and maximum retry numbers of this test. */
        if (Utils.VERBOSE) {
            System.out.println("The minimum retry number is: " + minNum);
            System.out.println("The maximum retry number is: " + maxNum);
        }

        /* Wait until the reading threads finish their work. */
        endSignal.await();

        RepTestUtils.shutdownRepEnvs(repEnvInfo);
    }

    /* Start the reading threads. */
    private void startThreads(int threadNum,
                              boolean secondary) {
        for (int i = 0; i < threadNum; i++) {
            Thread thread = new ReplicaReadingThread(repEnvInfo[1].getEnv(),
                                                     secondary);
            thread.start();
        }
    }

    /* Do the updates on the master. */
    private void doMasterUpdates()
        throws Exception {

        int txnRounds = 0;
        int tempRound = roundPerSync;

        UpdateRange range = new UpdateRange();

        openStore();

        Transaction txn = null;
        while (runnable) {
            if (Utils.VERBOSE) {
                System.out.println
                    ("Updating rounds left on Master: " + totalRounds);
            }

            /* Traverse the database and do updates. */
            for (int i = range.getStart(); i <= range.getEnd(); i++) {
                /* Create a new transaction for every txnOps operations. */
                if ((i - 1) % txnOps == 0) {
                    txn = master.beginTransaction(null, null);
                    txnRounds++;
                }

                /* Do updates. */
                if (range.doDelete(i)) {
                    doDeleteWork(primaryIndex, txn, i);
                } else {
                    /* Updates and inserts both actually put a record. */
                    doPutWork(primaryIndex, txn, range, i, txnRounds);
                }
            }

            /* Shift the range so that it can traverse the database again. */
            range.shift();

            /* Exit the loop if the updates have finished. */
            if (--totalRounds == 0) {
                runnable = false;
            }

            /* If a round of traversals finishes, sync the group. */
            if (--tempRound == 0 || !runnable) {
                dbStore.close();
                Utils.doSyncAndCheck(repEnvInfo);
                if (runnable) {
                    openStore();
                }
                tempRound = roundPerSync;
            }
        }
    }

    /* Open the EntityStore. */
    private void openStore()
        throws Exception {

        dbStore = Utils.openStore(master, Utils.DB_NAME);
        primaryIndex =
            dbStore.getPrimaryIndex(Integer.class, RepTestData.class);
    }

    /* Delete records from the database. */
    private void doDeleteWork(PrimaryIndex<Integer, RepTestData> pIndex,
                              Transaction txn,
                              int key)
        throws Exception {

        pIndex.delete(txn, key);
        if (key % txnOps == 0) {
            txn.commit();
            /* Increase the beginKey since the smallest key has changed. */
            beginKey += txnOps;
            /* Delete those entries whose values are smaller than beginKey. */
            if (abortMap.size() != 0) {
                ArrayList<Integer> keys = new ArrayList<Integer>();
                for (Map.Entry<Integer, Integer> entry : abortMap.entrySet()) {
                    if (entry.getValue() <= beginKey) {
                        keys.add(entry.getKey());
                    }
                }
                for (Integer abortKey : keys) {
                    abortMap.remove(abortKey);
                }
            }
        }
    }

    /* Put records into the database if the operations are updates or inserts. */
    private void doPutWork(PrimaryIndex<Integer, RepTestData> pIndex,
                           Transaction txn,
                           UpdateRange range,
                           int key,
                           int txnRounds)
        throws Exception {

        /*
         * Put a record into the database. If the key exists, it is an
         * update; if the key doesn't exist, it is an insert.
         */
        RepTestData data = new RepTestData();
        data.setKey(key);
        data.setData(key);
        data.setName("test" + txnRounds);
        pIndex.put(txn, data);

        if (key % txnOps == 0) {
            if (range.doUpdate(key)) {
                /* Randomly abort if it's an update operation. */
                if (random.nextBoolean()) {
                    txn.abort();
                    /* Record this aborted transaction in the abort map. */
                    abortMap.put(txnRounds, key);
                } else {
                    txn.commit();
                }
            } else {
                /* Increase the endKey when the insertion finishes. */
                txn.commit();
                endKey += txnOps;
            }
        }
    }

    protected void parseArgs(String args[])
        throws Exception {

        for (int i = 0; i < args.length; i++) {
            boolean moreArgs = i < args.length - 1;
            if (args[i].equals("-h") && moreArgs) {
                envRoot = new File(args[++i]);
            } else if (args[i].equals("-repNodeNum") && moreArgs) {
                nNodes = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-dbSize") && moreArgs) {
                dbSize = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-totalRounds") && moreArgs) {
                totalRounds = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-roundPerSync") && moreArgs) {
                roundPerSync = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-txnOps") && moreArgs) {
                txnOps = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-logFileSize") && moreArgs) {
                logFileSize = args[++i];
            } else if (args[i].equals("-checkpointBytes") && moreArgs) {
                checkpointBytes = args[++i];
            } else if (args[i].equals("-nPriThreads") && moreArgs) {
                nPriThreads = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-nSecThreads") && moreArgs) {
                nSecThreads = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-isReverseRead") && moreArgs) {
                isReverseRead = Boolean.parseBoolean(args[++i]);
            } else if (args[i].equals("-subDir") && moreArgs) {
                subDir = Integer.parseInt(args[++i]);
            } else {
                usage("Unknown arg: " + args[i]);
            }
        }

        if (nNodes < 2) {
            throw new IllegalArgumentException
                ("Replication group size should be >= 2!");
        }

        if (txnOps >= dbSize || dbSize % txnOps != 0) {
            throw new IllegalArgumentException
                ("dbSize should be larger than txnOps and an integral " +
                 "multiple of txnOps!");
        }
    }

    private void usage(String error) {
        if (error != null) {
            System.err.println(error);
        }
        System.err.println
            ("java " + getClass().getName() + "\n" +
             " [-h <replication group Environment home dir>]\n" +
             " [-repNodeNum <replication group size>]\n" +
             " [-dbSize <number of records in the tested database>]\n" +
             " [-totalRounds <total number of database traversals in " +
             "steady state>]\n" +
             " [-roundPerSync <do a sync in the replication group after " +
             "traversing the database this many times>]\n" +
             " [-txnOps <number of operations in each transaction>]\n" +
             " [-logFileSize <size of each log file>]\n" +
             " [-checkpointBytes <checkpointer wakes up after writing " +
             "these bytes into the on disk log>]\n" +
             " [-nPriThreads <number of threads reading PrimaryIndex " +
             "on replica>]\n" +
             " [-nSecThreads <number of threads reading SecondaryIndex " +
             "on replica>]\n" +
             " [-isReverseRead <true if replica reading threads read " +
             "backwards>]");
        System.exit(2);
    }

    public static void main(String args[]) {
        try {
            ReplicaReading test = new ReplicaReading();
            test.parseArgs(args);
            test.doRampup();
            test.doSteadyState();
        } catch (Throwable t) {
            t.printStackTrace(System.err);
            System.exit(1);
        }
    }

    /*
     * This test traverses the database multiple times. In each traversal,
     * - the first <txnOps> records are deleted,
     * - the rest of the records are updated,
     * - an additional <txnOps> worth of records are inserted, in order to
     *   keep the database the same size.
     * This class saves the ranges for delete, update and insert.
     */
    class UpdateRange {
        private int deleteStart;
        private int deleteEnd;
        private int updateStart;
        private int updateEnd;
        private int insertStart;
        private int insertEnd;

        public UpdateRange() {
            deleteStart = 1;
            deleteEnd = deleteStart + txnOps - 1;
            updateStart = deleteEnd + 1;
            updateEnd = dbSize;
            insertStart = updateEnd + 1;
            insertEnd = insertStart + txnOps - 1;
        }

        public int getStart() {
            return deleteStart;
        }

        public int getEnd() {
            return insertEnd;
        }

        /* Returns true if the key is in the scope of deletion. */
        public boolean doDelete(int index) {
            return (index >= deleteStart) && (index <= deleteEnd);
        }

        /* Returns true if the key is in the scope of updates. */
        public boolean doUpdate(int index) {
            return (index >= updateStart) && (index <= updateEnd);
        }

        /*
         * Adjust the traversal parameters for the next traversal of the
         * database, since some records have been deleted and some added.
         */
        public void shift() {
            deleteStart += txnOps;
            deleteEnd += txnOps;
            updateStart += txnOps;
            updateEnd += txnOps;
            insertStart += txnOps;
            insertEnd += txnOps;
        }
    }

    /* The reading thread on the replica. */
    class ReplicaReadingThread extends Thread {
        private final ReplicatedEnvironment repEnv;
        private boolean secondary;
        private final ArrayList<RepTestData> list;

        public ReplicaReadingThread(ReplicatedEnvironment repEnv,
                                    boolean secondary) {
            this.repEnv = repEnv;
            this.secondary = secondary;
            list = new ArrayList<RepTestData>();
        }

        @Override
        public void run() {
            try {
                startSignal.await();

                EntityStore repStore = Utils.openStore(repEnv, Utils.DB_NAME);
                EntityIndex<Integer, RepTestData> index =
                    repStore.getPrimaryIndex(Integer.class, RepTestData.class);
                if (secondary) {
                    index = repStore.getSecondaryIndex
                        ((PrimaryIndex<Integer, RepTestData>) index,
                         Integer.class, "data");
                }

                while (runnable) {
                    int numIters = (endKey - beginKey) / txnOps;
                    int startKey = beginKey;
                    /* Do txnOps read operations during each transaction. */
                    for (int i = 0; runnable && i < numIters; i++) {
                        int start = i * txnOps + startKey;
                        int end = start + txnOps - 1;

                        /*
                         * If there is no data between start and end, then
                         * break and get a new start and end.
                         */
                        if (doRetries(start, end, repEnv, index)) {
                            break;
                        }
                    }
                }
                repStore.close();
                endSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /* Retry if a lock conflict (deadlock) occurs. */
        private boolean doRetries(int start,
                                  int end,
                                  ReplicatedEnvironment env,
                                  EntityIndex<Integer, RepTestData> index) {
            boolean success = false;
            boolean noData = false;
            int maxTries = 100;

            for (int tries = 0; !success && tries < maxTries; tries++) {
                try {
                    Transaction txn = env.beginTransaction(null, null);
                    int realStart = 0;
                    EntityCursor<RepTestData> cursor = null;
                    try {
                        cursor = index.entities(txn, null);
                        realStart = cursor.first(null).getKey();
                        cursor.close();
                        cursor =
                            index.entities(txn, start, true, end, true, null);
                        noData = addRecordsToList(cursor);
                        success = true;
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                        }

                        if (success) {
                            if (noData) {
                                checkNoDataCorrectness(start, realStart, tries);
                            } else {
                                checkCorrectness(tries);
                            }
                            txn.commit();
                        } else {
                            txn.abort();
                        }
                        list.clear();
                    }
                } catch (LockConflictException e) {
                    success = false;
                }
            }

            return noData;
        }

        /*
         * If there is no data in this cursor, return true. If there is data
         * in the cursor, add the records to the list and return false.
         */
        private boolean addRecordsToList(EntityCursor<RepTestData> cursor)
            throws DatabaseException {

            if (isReverseRead) {
                RepTestData data = cursor.last(null);
                if (data == null) {
                    return true;
                } else {
                    list.add(data);
                    while ((data = cursor.prev(null)) != null) {
                        list.add(data);
                    }
                }
            } else {
                RepTestData data = cursor.first(null);
                if (data == null) {
                    return true;
                } else {
                    list.add(data);
                    while ((data = cursor.next(null)) != null) {
                        list.add(data);
                    }
                }
            }

            return false;
        }

        /* Check correctness when there is no data in the cursor. */
        private void checkNoDataCorrectness(int start,
                                            int realStart,
                                            int tries) {
            /* Expect the list size to be 0. */
            if (list.size() != 0) {
                System.err.println("The expected number of records should " +
                                   "be 0, but it is " + list.size() + "!");
                System.exit(-1);
            }

            /*
             * The actual start key should be no smaller than the requested
             * start key, and the distance between them should be an integral
             * multiple of txnOps.
             */
            if (realStart < start || ((realStart - start) % txnOps != 0)) {
                System.err.println("Some deleted keys still exist in the " +
                                   "database!");
                System.err.println("Expected start key is: " + start +
                                   ", real start key is: " + realStart);
                System.exit(-1);
            }
            updateRetries(tries);
        }

        private void checkCorrectness(int tries) {
            if (list.size() == txnOps) {
                int minus = isReverseRead ? 1 : -1;
                RepTestData firstData = list.get(0);

                /* Check whether firstData comes from an aborted txn. */
                if (!("").equals(firstData.getName().substring(4))) {
                    Integer txnRounds =
                        new Integer(firstData.getName().substring(4));
                    /* If this data is in the abort map, fail the test. */
                    if (abortMap.get(txnRounds) != null) {
                        System.err.println
                            ("The reading thread is reading aborted data.");
                        System.exit(-1);
                    }
                }

                /* Check that records in this list are valid. */
                for (int i = 0; i < list.size(); i++) {
                    if (!firstData.logicEquals(list.get(i), i * minus)) {
                        System.err.println("Reading data is wrong!" +
                                           " FirstData: " + firstData +
                                           " WrongData: " + list.get(i));
                        for (RepTestData each : list) {
                            System.err.println(each);
                        }
                        System.exit(-1);
                    }
                }
                updateRetries(tries);
            } else {
                System.err.println("The expected number of records should " +
                                   "be: " + txnOps + ", but it is " +
                                   list.size() + "!");
                System.exit(-1);
            }
        }

        private void updateRetries(int tries) {
            /*
             * Update maxNum and minNum. Synchronize on the enclosing
             * ReplicaReading instance, since these counters are shared by
             * all reading threads.
             */
            synchronized (ReplicaReading.this) {
                maxNum = maxNum < tries ? tries : maxNum;
                minNum = minNum < tries ? minNum : tries;
            }
            if (tries > 0 && Utils.VERBOSE) {
                System.err.println("Retries this round: " + tries);
            }
        }
    }
}