1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.utilint; 9 10 import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY; 11 12 import java.io.ByteArrayOutputStream; 13 import java.io.File; 14 import java.io.FileInputStream; 15 import java.io.FileNotFoundException; 16 import java.io.FileOutputStream; 17 import java.io.IOException; 18 import java.io.PrintStream; 19 import java.net.InetAddress; 20 import java.net.UnknownHostException; 21 import java.util.Arrays; 22 import java.util.HashMap; 23 import java.util.HashSet; 24 import java.util.LinkedList; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Properties; 28 import java.util.Set; 29 import java.util.concurrent.Callable; 30 import java.util.concurrent.CountDownLatch; 31 import java.util.concurrent.TimeUnit; 32 33 import org.junit.Assert; 34 35 import com.sleepycat.bind.tuple.StringBinding; 36 import com.sleepycat.bind.tuple.TupleInput; 37 import com.sleepycat.je.CommitToken; 38 import com.sleepycat.je.Cursor; 39 import com.sleepycat.je.CursorConfig; 40 import com.sleepycat.je.Database; 41 import com.sleepycat.je.DatabaseConfig; 42 import com.sleepycat.je.DatabaseEntry; 43 import com.sleepycat.je.DatabaseException; 44 import com.sleepycat.je.DbInternal; 45 import com.sleepycat.je.Durability; 46 import com.sleepycat.je.Durability.ReplicaAckPolicy; 47 import com.sleepycat.je.Durability.SyncPolicy; 48 import com.sleepycat.je.Environment; 49 import com.sleepycat.je.EnvironmentConfig; 50 import com.sleepycat.je.LockMode; 51 import com.sleepycat.je.OperationStatus; 52 import com.sleepycat.je.ReplicaConsistencyPolicy; 53 import com.sleepycat.je.Transaction; 54 import com.sleepycat.je.TransactionConfig; 55 import com.sleepycat.je.cleaner.VerifyUtils; 56 import com.sleepycat.je.config.ConfigParam; 57 import com.sleepycat.je.dbi.DbType; 58 import com.sleepycat.je.dbi.EnvironmentImpl; 59 import com.sleepycat.je.log.FileManager; 60 import com.sleepycat.je.rep.CommitPointConsistencyPolicy; 61 import com.sleepycat.je.rep.GroupShutdownException; 62 import com.sleepycat.je.rep.InsufficientLogException; 63 import com.sleepycat.je.rep.InsufficientReplicasException; 64 import com.sleepycat.je.rep.NetworkRestore; 65 import com.sleepycat.je.rep.NetworkRestoreConfig; 66 import com.sleepycat.je.rep.NodeType; 67 import com.sleepycat.je.rep.QuorumPolicy; 68 import com.sleepycat.je.rep.RepInternal; 69 import com.sleepycat.je.rep.ReplicaConsistencyException; 70 import com.sleepycat.je.rep.ReplicatedEnvironment; 71 import com.sleepycat.je.rep.ReplicatedEnvironment.State; 72 import com.sleepycat.je.rep.ReplicationConfig; 73 import com.sleepycat.je.rep.ReplicationNetworkConfig; 74 import com.sleepycat.je.rep.UnknownMasterException; 75 import com.sleepycat.je.rep.elections.Acceptor; 76 import com.sleepycat.je.rep.elections.Learner; 77 import com.sleepycat.je.rep.impl.PointConsistencyPolicy; 78 import com.sleepycat.je.rep.impl.RepGroupDB; 79 import com.sleepycat.je.rep.impl.RepGroupDB.GroupBinding; 80 import com.sleepycat.je.rep.impl.RepGroupDB.NodeBinding; 81 import com.sleepycat.je.rep.impl.RepGroupImpl; 82 import com.sleepycat.je.rep.impl.RepImpl; 83 import com.sleepycat.je.rep.impl.RepNodeImpl; 84 import com.sleepycat.je.rep.impl.RepParams; 85 import com.sleepycat.je.rep.impl.RepTestBase; 86 import com.sleepycat.je.rep.impl.node.FeederManager; 87 import com.sleepycat.je.rep.impl.node.NameIdPair; 88 import com.sleepycat.je.rep.impl.node.RepNode; 89 import com.sleepycat.je.rep.stream.FeederReader; 90 import com.sleepycat.je.rep.stream.OutputWireRecord; 91 import com.sleepycat.je.rep.stream.ReplicaFeederSyncup.TestHook; 92 import com.sleepycat.je.rep.utilint.RepUtils.ConsistencyPolicyFormat; 93 import com.sleepycat.je.rep.vlsn.VLSNIndex; 94 import com.sleepycat.je.rep.vlsn.VLSNRange; 95 import com.sleepycat.je.util.DbBackup; 96 import com.sleepycat.je.util.TestUtils; 97 import com.sleepycat.je.utilint.DbLsn; 98 import com.sleepycat.je.utilint.VLSN; 99 import com.sleepycat.util.test.SharedTestUtils; 100 101 /** 102 * Static utility methods and instances for replication unit tests. 103 * 104 * Examples of useful constructs here are methods that: 105 * <ul> 106 * <li>Create multiple environment directories suitable for unit testing 107 * a set of replicated nodes. 108 * <li>Create a router config that is initialized with exception and event 109 * listeners that will dump asynchronous exceptions to stderr, and which 110 * can be conditionalized to ignore exceptions at certain points when the 111 * test expects a disconnected node or other error condition. 112 * <li>Methods that compare two environments to see if they have a common 113 * replication stream. 114 * <li>etc ... 115 * </ul> 116 */ 117 public class RepTestUtils { 118 119 public static final String TEST_HOST = "localhost"; 120 private static final String REPDIR = "rep"; 121 public static final String TEST_REP_GROUP_NAME = "UnitTestGroup"; 122 private static final String[] BUP_SUFFIXES = { FileManager.BUP_SUFFIX }; 123 124 /* 125 * If -DoverridePort=<val> is set, then replication groups will be 126 * set up with this default port value. 127 */ 128 public static final String OVERRIDE_PORT = "overridePort"; 129 130 /* 131 * If -DlongTimeout is true, then this test will run with very long 132 * timeouts, to make interactive debugging easier. 133 */ 134 private static final boolean longTimeout = 135 Boolean.getBoolean("longTimeout"); 136 137 public static final int MINUTE_MS = 60*1000; 138 139 /* Time to wait for each node to start up and join the group. */ 140 private static final long JOIN_WAIT_TIME = 20000; 141 142 /* The basis for varying log file size */ 143 private static int envCount = 1; 144 145 /* Convenient constants */ 146 147 public final static Durability SYNC_SYNC_ALL_DURABILITY = 148 new Durability(Durability.SyncPolicy.SYNC, 149 Durability.SyncPolicy.SYNC, 150 Durability.ReplicaAckPolicy.ALL); 151 152 public final static Durability SYNC_SYNC_NONE_DURABILITY = 153 new Durability(Durability.SyncPolicy.SYNC, 154 Durability.SyncPolicy.SYNC, 155 Durability.ReplicaAckPolicy.NONE); 156 157 public final static Durability WNSYNC_NONE_DURABILITY = 158 new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, 159 Durability.SyncPolicy.WRITE_NO_SYNC, 160 Durability.ReplicaAckPolicy.NONE); 161 162 public static final Durability DEFAULT_DURABILITY = 163 new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, 164 Durability.SyncPolicy.WRITE_NO_SYNC, 165 Durability.ReplicaAckPolicy.SIMPLE_MAJORITY); 166 167 public final static TransactionConfig SYNC_SYNC_ALL_TC = 168 new TransactionConfig().setDurability(SYNC_SYNC_ALL_DURABILITY); 169 170 public final static TransactionConfig SYNC_SYNC_NONE_TC = 171 new TransactionConfig().setDurability(SYNC_SYNC_NONE_DURABILITY); 172 173 public final static TransactionConfig WNSYNC_NONE_TC = 174 new TransactionConfig().setDurability(WNSYNC_NONE_DURABILITY); 175 176 public static final TransactionConfig NO_CONSISTENCY_TC = 177 new TransactionConfig() 178 .setConsistencyPolicy(NO_CONSISTENCY) 179 .setDurability(SYNC_SYNC_NONE_DURABILITY); 180 getRepEnvDirs(File envRoot, int nNodes)181 public static File[] getRepEnvDirs(File envRoot, int nNodes) { 182 File envDirs[] = new File[nNodes]; 183 for (int i=0; i < nNodes; i++) { 184 envDirs[i] = new File(envRoot, RepTestUtils.REPDIR + i); 185 } 186 return envDirs; 187 } 188 189 /** 190 * Create nNode directories within the envRoot directory nodes, for housing 191 * a set of replicated environments. Each directory will be named 192 * <envRoot>/rep#, i.e <envRoot>/rep1, <envRoot>/rep2, etc. 193 */ makeRepEnvDirs(File envRoot, int nNodes)194 public static File[] makeRepEnvDirs(File envRoot, int nNodes) 195 throws IOException { 196 197 File[] envHomes = new File[nNodes]; 198 for (int i = 0; i < nNodes; i++) { 199 envHomes[i] = makeRepEnvDir(envRoot, i); 200 } 201 return envHomes; 202 } 203 204 /** 205 * Create a directory within the envRoot directory nodes for housing a 206 * single replicated environment. The directory will be named 207 * <envRoot>/rep<i> 208 */ makeRepEnvDir(File envRoot, int i)209 public static File makeRepEnvDir(File envRoot, int i) 210 throws IOException { 211 212 File jeProperties = new File(envRoot, "je.properties"); 213 File envHome = new File(envRoot, REPDIR + i); 214 envHome.mkdir(); 215 216 /* Copy the test je.properties into the new directories. */ 217 File repProperties = new File(envHome, "je.properties"); 218 FileInputStream from = null; 219 FileOutputStream to = null; 220 try { 221 try { 222 from = new FileInputStream(jeProperties); 223 } catch (FileNotFoundException e) { 224 jeProperties.createNewFile(); 225 226 from = new FileInputStream(jeProperties); 227 } 228 to = new FileOutputStream(repProperties); 229 byte[] buffer = new byte[4096]; 230 int bytesRead; 231 232 while ((bytesRead = from.read(buffer)) != -1) { 233 to.write(buffer, 0, bytesRead); 234 } 235 } finally { 236 if (from != null) { 237 try { 238 from.close(); 239 } catch (IOException ignore) { 240 } 241 } 242 if (to != null) { 243 try { 244 to.close(); 245 } catch (IOException ignore) { 246 } 247 } 248 } 249 250 return envHome; 251 } 252 253 /* Create the sub directories for replicated Environments. */ createRepSubDirs(RepEnvInfo[] repEnvInfo, int subDirNumber)254 public static void createRepSubDirs(RepEnvInfo[] repEnvInfo, 255 int subDirNumber) { 256 for (RepEnvInfo envInfo : repEnvInfo) { 257 if (envInfo != null) { 258 TestUtils.createEnvHomeWithSubDir 259 (envInfo.getEnvHome(), subDirNumber); 260 } 261 } 262 } 263 264 /* Remove the sub directories inside the replicated Environment home. */ removeRepSubDirs(RepEnvInfo[] repEnvInfo)265 public static void removeRepSubDirs(RepEnvInfo[] repEnvInfo) { 266 for (RepEnvInfo envInfo : repEnvInfo) { 267 if (envInfo != null) { 268 TestUtils.removeSubDirs(envInfo.getEnvHome()); 269 } 270 } 271 } 272 273 /** Convenience method to {@link #removeRepEnv} multiple nodes. */ removeRepDirs(RepEnvInfo... repEnvInfo)274 public static void removeRepDirs(RepEnvInfo... repEnvInfo) { 275 for (RepEnvInfo envInfo : repEnvInfo) { 276 if (envInfo != null) { 277 removeRepEnv(envInfo.getEnvHome()); 278 } 279 } 280 } 281 282 /** 283 * Remove all the log files in the <envRoot>/rep* directories directory. 284 */ removeRepEnvironments(File envRoot)285 public static void removeRepEnvironments(File envRoot) { 286 File[] repEnvs = envRoot.listFiles(); 287 for (File repEnv : repEnvs) { 288 if (repEnv.isDirectory()) { 289 removeRepEnv(repEnv); 290 } 291 } 292 removeRepEnv(envRoot); 293 } 294 295 /** Removes log/lck/bkp files from a single env home directory. */ removeRepEnv(File envHome)296 public static void removeRepEnv(File envHome) { 297 TestUtils.removeLogFiles("removeRepEnv", envHome, false); 298 new File(envHome, "je.lck").delete(); 299 removeBackupFiles(envHome); 300 } 301 removeBackupFiles(File repEnv)302 private static void removeBackupFiles(File repEnv) { 303 for (String fileName : 304 FileManager.listFiles(repEnv, BUP_SUFFIXES, false)) { 305 new File(repEnv, fileName).delete(); 306 } 307 } 308 309 /** 310 * Create an array of environments, with basically the same environment 311 * configuration. 312 */ 313 setupEnvInfos(File envRoot, int nNodes)314 public static RepEnvInfo[] setupEnvInfos(File envRoot, int nNodes) 315 throws IOException { 316 317 return setupEnvInfos(envRoot, nNodes, DEFAULT_DURABILITY); 318 } 319 320 /** 321 * Fill in an array of environments, with basically the same environment 322 * configuration. Only fill in the array slots which are null. Used to 323 * initialize semi-populated set of environments. 324 * @throws IOException 325 */ setupEnvInfos(File envRoot, int nNodes, Durability envDurability)326 public static RepEnvInfo[] setupEnvInfos(File envRoot, 327 int nNodes, 328 Durability envDurability) 329 throws IOException { 330 331 File[] envHomes = makeRepEnvDirs(envRoot, nNodes); 332 RepEnvInfo[] repEnvInfo = new RepEnvInfo[envHomes.length]; 333 334 for (int i = 0; i < repEnvInfo.length; i++) { 335 repEnvInfo[i] = setupEnvInfo(envHomes[i], 336 envDurability, 337 (short) (i + 1), // nodeId 338 repEnvInfo[0]); 339 } 340 return repEnvInfo; 341 } 342 setupEnvInfos(File envRoot, int nNodes, EnvironmentConfig envConfig)343 public static RepEnvInfo[] setupEnvInfos(File envRoot, 344 int nNodes, 345 EnvironmentConfig envConfig) 346 throws IOException { 347 348 return setupEnvInfos(envRoot, nNodes, envConfig, null); 349 } 350 setupEnvInfos(File envRoot, int nNodes, EnvironmentConfig envConfig, ReplicationConfig repConfig)351 public static RepEnvInfo[] setupEnvInfos(File envRoot, 352 int nNodes, 353 EnvironmentConfig envConfig, 354 ReplicationConfig repConfig) 355 throws IOException { 356 357 File[] envdirs = makeRepEnvDirs(envRoot, nNodes); 358 RepEnvInfo[] repEnvInfo = new RepEnvInfo[envdirs.length]; 359 360 for (int i = 0; i < repEnvInfo.length; i++) { 361 repEnvInfo[i] = setupEnvInfo(envdirs[i], 362 envConfig.clone(), 363 createRepConfig(repConfig, i + 1), 364 repEnvInfo[0]); 365 } 366 return repEnvInfo; 367 } 368 369 /** 370 * Adds an additional replicated environment to the specified list and 371 * returns the extended list. Uses the ReplicationConfig from the first 372 * initial node as the basis for the new node. 373 */ setupExtendEnvInfo( final RepEnvInfo[] initialEnvInfo, final int nNodes)374 public static RepEnvInfo[] setupExtendEnvInfo( 375 final RepEnvInfo[] initialEnvInfo, 376 final int nNodes) 377 throws IOException { 378 379 final int initialNodesCount = initialEnvInfo.length; 380 final RepEnvInfo[] result = 381 Arrays.copyOf(initialEnvInfo, initialNodesCount + nNodes); 382 final File envRoot = initialEnvInfo[0].getEnvHome().getParentFile(); 383 final ReplicationConfig baseRepConfig = 384 initialEnvInfo[0].getRepConfig(); 385 for (int i = 0; i < nNodes; i++) { 386 final int pos = initialNodesCount + i; 387 result[pos] = setupEnvInfo(makeRepEnvDir(envRoot, pos), 388 createEnvConfig(DEFAULT_DURABILITY), 389 createRepConfig(baseRepConfig, pos + 1), 390 initialEnvInfo[0]); 391 } 392 return result; 393 } 394 395 /** 396 * Create info for a single replicated environment. 397 */ setupEnvInfo(File envHome, Durability envDurability, int nodeId, RepEnvInfo helper)398 public static RepEnvInfo setupEnvInfo(File envHome, 399 Durability envDurability, 400 int nodeId, 401 RepEnvInfo helper) { 402 403 EnvironmentConfig envConfig = createEnvConfig(envDurability); 404 return setupEnvInfo(envHome, envConfig, nodeId, helper); 405 } 406 407 /** 408 * Create info for a single replicated environment. 409 */ setupEnvInfo(File envHome, EnvironmentConfig envConfig, int nodeId, RepEnvInfo helper)410 public static RepEnvInfo setupEnvInfo(File envHome, 411 EnvironmentConfig envConfig, 412 int nodeId, 413 RepEnvInfo helper) { 414 return setupEnvInfo(envHome, 415 envConfig, 416 createRepConfig(nodeId), 417 helper); 418 } 419 420 /** 421 * Create info for a single replicated environment. 422 */ setupEnvInfo(File envHome, EnvironmentConfig envConfig, ReplicationConfig repConfig, RepEnvInfo helper)423 public static RepEnvInfo setupEnvInfo(File envHome, 424 EnvironmentConfig envConfig, 425 ReplicationConfig repConfig, 426 RepEnvInfo helper) { 427 428 /* 429 * Give all the environments the same environment configuration. 430 * 431 * If the file size is not set by the calling test, stagger their log 432 * file length to give them slightly different logs and VLSNs. Disable 433 * parameter validation because we want to make the log file length 434 * smaller than the minimums, for testing. 435 */ 436 if (!envConfig.isConfigParamSet(EnvironmentConfig.LOG_FILE_MAX)) { 437 DbInternal.disableParameterValidation(envConfig); 438 /* Vary the file size */ 439 long fileLen = ((envCount++ % 100)+1) * 10000; 440 envConfig.setConfigParam(EnvironmentConfig.LOG_FILE_MAX, 441 Long.toString(fileLen)); 442 } 443 444 repConfig.setHelperHosts((helper == null) ? 445 repConfig.getNodeHostPort() : 446 helper.getRepConfig().getNodeHostPort()); 447 448 /* 449 * If -DlongTimeout is true, then this test will run with very long 450 * timeouts, to make interactive debugging easier. 451 */ 452 if (longTimeout) { 453 setLongTimeouts(repConfig); 454 } 455 456 /* 457 * If -DlongAckTimeout is true, then the test will set the 458 * REPLICA_TIMEOUT to 50secs. 459 */ 460 if (Boolean.getBoolean("longAckTimeout")) { 461 repConfig.setReplicaAckTimeout(50, TimeUnit.SECONDS); 462 } 463 return new RepEnvInfo(envHome, repConfig, envConfig); 464 } 465 createEnvConfig(Durability envDurability)466 public static EnvironmentConfig createEnvConfig(Durability envDurability) { 467 EnvironmentConfig envConfig = new EnvironmentConfig(); 468 envConfig.setAllowCreate(true); 469 envConfig.setTransactional(true); 470 envConfig.setDurability(envDurability); 471 472 /* 473 * Replicated tests use multiple environments, configure shared cache 474 * to reduce the memory consumption. 475 */ 476 envConfig.setSharedCache(true); 477 478 return envConfig; 479 } 480 481 /** 482 * Create a test RepConfig for the node with the specified id. Note that 483 * the helper is not configured. 484 */ createRepConfig(int nodeId)485 public static ReplicationConfig createRepConfig(int nodeId) 486 throws NumberFormatException, IllegalArgumentException { 487 488 return createRepConfig(null, nodeId); 489 } 490 getDefaultPort()491 private static int getDefaultPort() { 492 return Integer.getInteger 493 (OVERRIDE_PORT, 494 Integer.parseInt(RepParams.DEFAULT_PORT.getDefault())); 495 } 496 497 /** 498 * Create a test RepConfig for the node with the specified id, using the 499 * specified repConfig. The repConfig may have other parameters set 500 * already. Note that the helper is not configured. 501 */ 502 public static createRepConfig(ReplicationConfig repConfig, int nodeId)503 ReplicationConfig createRepConfig(ReplicationConfig repConfig, 504 int nodeId) 505 throws NumberFormatException, IllegalArgumentException { 506 507 ReplicationConfig filledInConfig = 508 repConfig == null ? new ReplicationConfig() : repConfig.clone(); 509 510 final int firstPort = getDefaultPort(); 511 filledInConfig.setConfigParam 512 (RepParams.ENV_SETUP_TIMEOUT.getName(), "60 s"); 513 filledInConfig.setConfigParam 514 (ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, "60 s"); 515 filledInConfig.setGroupName(TEST_REP_GROUP_NAME); 516 filledInConfig.setNodeName("Node" + nodeId); 517 String nodeHost = TEST_HOST + ":" + (firstPort + (nodeId - 1)); 518 filledInConfig.setNodeHostPort(nodeHost); 519 520 /* Minimize socket bind exceptions in tests. */ 521 filledInConfig.setConfigParam(RepParams.SO_REUSEADDR.getName(), 522 "true"); 523 filledInConfig.setConfigParam(RepParams.SO_BIND_WAIT_MS.getName(), 524 "150000"); 525 return filledInConfig; 526 } 527 readRepNetConfig()528 public static ReplicationNetworkConfig readRepNetConfig() { 529 /* Call to force class loading and parameter registration */ 530 ReplicationNetworkConfig.registerParams(); 531 return ReplicationNetworkConfig.create(readNetProps()); 532 } 533 readNetProps()534 public static Properties readNetProps() { 535 final File propFile = 536 new File(SharedTestUtils.getTestDir(), "je.properties"); 537 final Properties props = new Properties(); 538 RepUtils.populateNetProps(props, propFile); 539 return props; 540 } 541 542 /** 543 * Set timeouts to long intervals for debugging interactively 544 */ setLongTimeouts(ReplicationConfig repConfig)545 public static void setLongTimeouts(ReplicationConfig repConfig) { 546 547 RepInternal.disableParameterValidation(repConfig); 548 549 /* Wait an hour for this node to join the group.*/ 550 repConfig.setConfigParam(RepParams.ENV_SETUP_TIMEOUT.getName(), 551 "1 h"); 552 repConfig.setConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, 553 "1 h"); 554 555 /* Wait an hour for replica acks. */ 556 repConfig.setConfigParam(ReplicationConfig.REPLICA_ACK_TIMEOUT, 557 "1 h"); 558 559 /* Have a heartbeat every five minutes. */ 560 repConfig.setConfigParam(RepParams.HEARTBEAT_INTERVAL.getName(), 561 "5 min"); 562 } 563 564 /** 565 * Shuts down the environments with a checkpoint at the end. 566 * 567 * @param repEnvInfo the environments to be shutdown 568 */ shutdownRepEnvs(RepEnvInfo[] repEnvInfo)569 public static void shutdownRepEnvs(RepEnvInfo[] repEnvInfo) { 570 571 shutdownRepEnvs(repEnvInfo, true); 572 } 573 574 /** 575 * Shut down the environment, with an optional checkpoint. It sequences the 576 * shutdown so that all replicas are shutdown before the master. This 577 * sequencing avoids side-effects in the tests where shutting down the 578 * master first results in elections and one of the "to be shutdown" 579 * replicas becomes a master and so on. 580 * <p> 581 * If an exception occurs for any reason while closing one env, other envs 582 * may be left open. TODO: Determine if this behavior is really desired. 583 * 584 * @param repEnvInfo the environments to be shutdown 585 * 586 * @param doCheckpoint whether do a checkpoint at the end of close 587 */ shutdownRepEnvs(RepEnvInfo[] repEnvInfo, boolean doCheckpoint)588 public static void shutdownRepEnvs(RepEnvInfo[] repEnvInfo, 589 boolean doCheckpoint) { 590 591 if (repEnvInfo == null) { 592 return; 593 } 594 595 RepEnvInfo master = null; 596 for (RepEnvInfo ri : repEnvInfo) { 597 if ((ri.repEnv == null) || RepInternal.isClosed(ri.repEnv)) { 598 ri.repEnv = null; 599 continue; 600 } 601 if (ri.repEnv.getState().isMaster()) { 602 if (master != null) { 603 throw new IllegalStateException 604 ("Multiple masters: " + master.getEnv().getNodeName() + 605 " and " + ri.repEnv.getNodeName() + 606 " are both masters."); 607 } 608 master = ri; 609 } else { 610 try { 611 if (doCheckpoint) { 612 RepImpl repImpl = RepInternal.getRepImpl(ri.repEnv); 613 ri.repEnv.close(); 614 if (!repImpl.isClosed()) { 615 throw new IllegalStateException 616 ("Environment: " + ri.getEnvHome() + 617 " not released"); 618 } 619 } else { 620 RepInternal.getRepImpl(ri.repEnv).close(false); 621 } 622 } finally { 623 ri.repEnv = null; 624 } 625 } 626 } 627 628 if (master != null) { 629 try { 630 if (doCheckpoint) { 631 master.getEnv().close(); 632 } else { 633 RepInternal.getRepImpl(master.getEnv()).close(false); 634 } 635 } finally { 636 master.repEnv = null; 637 } 638 } 639 } 640 641 /** 642 * All the non-closed, non-null environments in the array join the group. 643 * @return the replicator who is the master. 644 */ 645 public static ReplicatedEnvironment openRepEnvsJoin(RepEnvInfo[] repEnvInfo)646 openRepEnvsJoin(RepEnvInfo[] repEnvInfo) { 647 648 return joinGroup(getOpenRepEnvs(repEnvInfo)); 649 } 650 651 /* Get open replicated environments from an array. */ getOpenRepEnvs(RepEnvInfo[] repEnvInfo)652 public static RepEnvInfo[] getOpenRepEnvs(RepEnvInfo[] repEnvInfo) { 653 Set<RepEnvInfo> repSet = new HashSet<RepEnvInfo>(); 654 for (RepEnvInfo ri : repEnvInfo) { 655 if ((ri != null) && 656 (ri.getEnv() != null) && 657 !RepInternal.isClosed(ri.getEnv())) { 658 repSet.add(ri); 659 } 660 } 661 662 return repSet.toArray(new RepEnvInfo[repSet.size()]); 663 } 664 665 /** 666 * Environment handles are created using the config information in 667 * repEnvInfo. Note that since this method causes handles to be created 668 * serially, it cannot be used to restart an existing group from scratch. 669 * It can only be used to start a new group, or have nodes join a group 670 * that is already active. 671 * 672 * @return the replicated environment associated with the master. 673 */ 674 public static joinGroup(RepEnvInfo .... repEnvInfo)675 ReplicatedEnvironment joinGroup(RepEnvInfo ... repEnvInfo) { 676 677 int retries = 10; 678 final int retryWaitMillis = 5000; 679 ReplicatedEnvironment master = null; 680 List<RepEnvInfo> joinNotFinished = 681 new LinkedList<RepEnvInfo>(Arrays.asList(repEnvInfo)); 682 683 while (joinNotFinished.size() != 0) { 684 for (RepEnvInfo ri : joinNotFinished) { 685 try { 686 ReplicatedEnvironment.State joinState; 687 if (ri.getEnv() != null) { 688 689 /* 690 * Handle exists, make sure it's not in UNKNOWN state. 691 */ 692 RepImpl rimpl = RepInternal.getRepImpl(ri.getEnv()); 693 joinState = rimpl.getState(); 694 Assert.assertFalse( 695 "Node " + ri.getEnv().getNodeName() + 696 " was detached", 697 joinState.equals(State.DETACHED)); 698 } else { 699 joinState = ri.openEnv().getState(); 700 } 701 702 if (joinState.equals(State.MASTER)) { 703 if (master != null) { 704 if (--retries > 0) { 705 Thread.sleep(retryWaitMillis); 706 707 /* 708 * Start over. The master is slow making its 709 * transition, one of them has not realized 710 * that they are no longer the master. 711 */ 712 joinNotFinished = new LinkedList<RepEnvInfo> 713 (Arrays.asList(repEnvInfo)); 714 master = null; 715 break; 716 } 717 throw new RuntimeException 718 ("Dual masters: " + ri.getEnv().getNodeName() + 719 " and " + 720 master.getNodeName() + " despite retries"); 721 } 722 master = ri.getEnv(); 723 } 724 joinNotFinished.remove(ri); 725 if ((joinNotFinished.size() == 0) && (master == null)) { 726 if (--retries == 0) { 727 throw new RuntimeException 728 ("No master established despite retries"); 729 } 730 Thread.sleep(retryWaitMillis); 731 /* Start over, an election is still in progress. */ 732 joinNotFinished = new LinkedList<RepEnvInfo> 733 (Arrays.asList(repEnvInfo)); 734 } 735 break; 736 } catch (UnknownMasterException retry) { 737 /* Just retry. */ 738 } catch (InterruptedException e) { 739 throw new RuntimeException(e); 740 } 741 } 742 } 743 return master; 744 } 745 746 /** 747 * Used to ensure that the entire group is in sync, that is, all replicas 748 * are consistent with the master's last commit. Note that it requires all 749 * the nodes in the replication group to be available. 750 * 751 * @param repEnvInfo the array holding the environments 752 * @param numSyncNodes the expected number of nodes to be synced; includes 753 * the master 754 * @throws InterruptedException 755 */ syncGroupToLastCommit(RepEnvInfo[] repEnvInfo, int numSyncNodes)756 public static VLSN syncGroupToLastCommit(RepEnvInfo[] repEnvInfo, 757 int numSyncNodes) 758 throws InterruptedException { 759 760 CommitToken masterCommitToken = null; 761 762 /* 763 * Create a transaction just to make sure all the replicas are awake 764 * and connected. 765 */ 766 for (RepEnvInfo repi : repEnvInfo) { 767 ReplicatedEnvironment rep = repi.getEnv(); 768 if (rep.getState().isMaster()) { 769 try { 770 Transaction txn= 771 rep. 772 beginTransaction(null, RepTestUtils.SYNC_SYNC_ALL_TC); 773 txn.commit(); 774 } catch (InsufficientReplicasException e) { 775 if (e.getAvailableReplicas().size() != (numSyncNodes-1)) { 776 throw new IllegalStateException 777 ("Expected replicas: " + (numSyncNodes - 1) + 778 "available replicas: " + 779 e.getAvailableReplicas()); 780 } 781 } 782 783 /* 784 * Handshakes with all replicas are now completed, if they were 785 * not before. Now get a token to represent the last committed 786 * point in the replication stream, from the master's 787 * perspective. 788 */ 789 RepNode repNode = RepInternal.getRepImpl(rep).getRepNode(); 790 masterCommitToken = new CommitToken 791 (repNode.getUUID(), 792 repNode.getCurrentTxnEndVLSN().getSequence()); 793 break; 794 } 795 } 796 797 if (masterCommitToken == null) { 798 throw new IllegalStateException("No current master"); 799 } 800 801 CommitPointConsistencyPolicy policy = 802 new CommitPointConsistencyPolicy(masterCommitToken, MINUTE_MS, 803 TimeUnit.MILLISECONDS); 804 805 /* 806 * Check that the environments are caught up with the last master 807 * commit at the time of the call to this method. 808 */ 809 for (RepEnvInfo repi : repEnvInfo) { 810 ReplicatedEnvironment rep = repi.getEnv(); 811 if ((rep == null) || 812 RepInternal.isClosed(rep) || 813 rep.getState().isMaster() || 814 rep.getState().isDetached()) { 815 continue; 816 } 817 policy.ensureConsistency(RepInternal.getRepImpl(rep)); 818 } 819 return new VLSN(masterCommitToken.getVLSN()); 820 } 821 822 /** 823 * Used to ensure that the group is in sync with respect to a given 824 * VLSN. If numSyncNodes == repEnvInfo.length, all the nodes in the 825 * replication group must be alive and available. If numSyncNodes is less 826 * than the size of the group, a quorum will need to be alive and 827 * available. 828 * 829 * @param repEnvInfo the array holding the environments 830 * @param numSyncNodes the expected number of nodes to be synced; includes 831 * the master 832 * @throws InterruptedException 833 */ syncGroupToVLSN(RepEnvInfo[] repEnvInfo, int numSyncNodes, VLSN targetVLSN)834 public static void syncGroupToVLSN(RepEnvInfo[] repEnvInfo, 835 int numSyncNodes, 836 VLSN targetVLSN) 837 throws InterruptedException { 838 839 /* 840 * Create a transaction just to make sure all the replicas are awake 841 * and connected. 842 */ 843 for (RepEnvInfo repi : repEnvInfo) { 844 ReplicatedEnvironment rep = repi.getEnv(); 845 if (rep == null) { 846 continue; 847 } 848 849 if (rep.getState().isMaster()) { 850 TransactionConfig txnConfig = null; 851 if (numSyncNodes == repEnvInfo.length) { 852 txnConfig = RepTestUtils.SYNC_SYNC_ALL_TC; 853 } else { 854 txnConfig = new TransactionConfig(); 855 txnConfig.setDurability 856 (new Durability(SyncPolicy.SYNC, 857 SyncPolicy.SYNC, 858 ReplicaAckPolicy.SIMPLE_MAJORITY)); 859 } 860 861 try { 862 Transaction txn = rep.beginTransaction(null, txnConfig); 863 txn.commit(); 864 } catch (InsufficientReplicasException e) { 865 if (e.getAvailableReplicas().size() != 866 (numSyncNodes - 1)) { 867 throw new IllegalStateException 868 ("Expected replicas: " + (numSyncNodes - 1) + 869 ", available replicas: " + 870 e.getAvailableReplicas()); 871 } 872 } 873 } 874 } 875 876 syncGroup(repEnvInfo, targetVLSN); 877 } 878 879 /* Syncs the group to the specific VLSN. */ syncGroup(RepEnvInfo[] repEnvInfo, VLSN targetVLSN)880 private static void syncGroup(RepEnvInfo[] repEnvInfo, VLSN targetVLSN) 881 throws InterruptedException { 882 PointConsistencyPolicy policy = new PointConsistencyPolicy(targetVLSN); 883 884 /* Check that the environments are caught up with this VLSN. */ 885 for (RepEnvInfo repi : repEnvInfo) { 886 ReplicatedEnvironment rep = repi.getEnv(); 887 if (rep == null || 888 RepInternal.isClosed(rep) || 889 rep.getState().isMaster()) { 890 continue; 891 } 892 policy.ensureConsistency(RepInternal.getRepImpl(rep)); 893 } 894 } 895 896 /** 897 * Synchronizes the group to the current vlsn on the master. Used to ensure 898 * that application level changes, even mid-transaction changes have been 899 * replicated to all the nodes before the method returns. 900 * 901 * Note that since CBVLSN updates are asynchronous the vlsn may continue 902 * moving forward, but the application level changes will have been 903 * propagated. 904 */ syncGroup(RepEnvInfo[] repEnvInfo)905 public static VLSN syncGroup(RepEnvInfo[] repEnvInfo) { 906 RepEnvInfo master = RepTestBase.findMaster(repEnvInfo); 907 if (master == null) { 908 throw new IllegalStateException("no master"); 909 } 910 VLSN vlsn = master.getRepImpl().getVLSNIndex().getRange().getLast(); 911 try { 912 syncGroup(repEnvInfo, vlsn); 913 } catch (Exception e) { 914 throw new IllegalStateException("unexpected exception"); 915 } 916 return vlsn; 917 } 918 checkUtilizationProfile(RepEnvInfo .... repEnvInfo)919 public static void checkUtilizationProfile(RepEnvInfo ... repEnvInfo) { 920 checkUtilizationProfile(null, repEnvInfo); 921 } 922 923 /** 924 * Run utilization profile checking on all databases in the set of 925 * RepEnvInfo. The environment must be quiescent. The utility will lock 926 * out any cleaning by using DbBackup, during the check. 927 */ checkUtilizationProfile(PrintStream out, RepEnvInfo ... repEnvInfo)928 public static void checkUtilizationProfile(PrintStream out, 929 RepEnvInfo ... repEnvInfo) { 930 for (RepEnvInfo info : repEnvInfo) { 931 if (out != null) { 932 out.println("checking " + info.getEnvHome()); 933 } 934 935 Environment env = info.getEnv(); 936 937 /* Use DbBackup to prevent log file deletion. */ 938 DbBackup backup = new DbBackup(env); 939 backup.startBackup(); 940 941 try { 942 List<String> dbNames = env.getDatabaseNames(); 943 944 for (String dbName : dbNames) { 945 if (out != null) { 946 out.println("\tchecking " + dbName); 947 } 948 DatabaseConfig dbConfig = new DatabaseConfig(); 949 DbInternal.setUseExistingConfig(dbConfig, true); 950 dbConfig.setTransactional(true); 951 Database db = env.openDatabase(null, dbName, dbConfig); 952 953 try { 954 VerifyUtils.checkLsns(db); 955 } finally { 956 db.close(); 957 } 958 } 959 } finally { 960 backup.endBackup(); 961 } 962 } 963 } 964 965 /** 966 * Confirm that all the nodes in this group match. Check number of 967 * databases, names of databases, per-database count, per-database 968 * records. Use the master node as the reference if it exists, else use the 969 * first replicator. 970 * 971 * @param limit The replication stream portion of the equality check is 972 * bounded at the upper end by this value. Limit is usually the commit sync 973 * or vlsn sync point explicitly called by a test before calling 974 * checkNodeEquality. Each node must contain VLSNs up to and including the 975 * limit, and may also include additional VSLNs due to heartbeats, etc. 976 * 977 * @throws InterruptedException 978 * 979 * @throws RuntimeException if there is an incompatibility 980 */ checkNodeEquality(VLSN limit, boolean verbose, RepEnvInfo ... repEnvInfo)981 public static void checkNodeEquality(VLSN limit, 982 boolean verbose, 983 RepEnvInfo ... repEnvInfo) 984 throws InterruptedException { 985 986 int referenceIndex = -1; 987 assert repEnvInfo.length > 0; 988 for (int i = 0; i < repEnvInfo.length; i++) { 989 if ((repEnvInfo[i] == null) || 990 (repEnvInfo[i].getEnv() == null)) { 991 continue; 992 } 993 ReplicatedEnvironment repEnv = repEnvInfo[i].getEnv(); 994 if (!RepInternal.isClosed(repEnv) && repEnv.getState().isMaster()) { 995 referenceIndex = i; 996 break; 997 } 998 } 999 assert referenceIndex != -1; 1000 1001 ReplicatedEnvironment reference = repEnvInfo[referenceIndex].getEnv(); 1002 for (int i = 0; i < repEnvInfo.length; i++) { 1003 if (i != referenceIndex) { 1004 if ((repEnvInfo[i] == null) || 1005 (repEnvInfo[i].getEnv() == null)) { 1006 continue; 1007 } 1008 1009 ReplicatedEnvironment repEnv = repEnvInfo[i].getEnv(); 1010 if (verbose) { 1011 System.out.println("Comparing master node " + 1012 reference.getNodeName() + 1013 " to node " + 1014 repEnv.getNodeName()); 1015 } 1016 1017 if (!RepInternal.isClosed(repEnv)) { 1018 checkNodeEquality(reference, repEnv, limit, verbose); 1019 } 1020 } 1021 } 1022 } 1023 1024 /* Enable or disable the log cleaning on a replica. */ runOrPauseCleaners(ReplicatedEnvironment repEnv, boolean isPaused)1025 private static void runOrPauseCleaners(ReplicatedEnvironment repEnv, 1026 boolean isPaused) 1027 throws InterruptedException { 1028 1029 if (!RepInternal.isClosed(repEnv)) { 1030 RepImpl repImpl = RepInternal.getRepImpl(repEnv); 1031 if (isPaused) { 1032 repImpl.getCleaner().addProtectedFileRange(0L); 1033 } else { 1034 repImpl.getCleaner().removeProtectedFileRange(0L); 1035 } 1036 Thread.sleep(100); 1037 } 1038 } 1039 1040 /** 1041 * Confirm that the contents of these two nodes match. Check number of 1042 * databases, names of databases, per-database count, per-database records. 1043 * 1044 * @throws InterruptedException 1045 * @throws RuntimeException if there is an incompatiblity 1046 */ checkNodeEquality(ReplicatedEnvironment replicatorA, ReplicatedEnvironment replicatorB, VLSN limit, boolean verbose)1047 public static void checkNodeEquality(ReplicatedEnvironment replicatorA, 1048 ReplicatedEnvironment replicatorB, 1049 VLSN limit, 1050 boolean verbose) 1051 throws InterruptedException { 1052 1053 runOrPauseCleaners(replicatorA, true); 1054 runOrPauseCleaners(replicatorB, true); 1055 1056 String nodeA = replicatorA.getNodeName(); 1057 String nodeB = replicatorB.getNodeName(); 1058 1059 Environment envA = replicatorA; 1060 Environment envB = replicatorB; 1061 1062 RepImpl repImplA = RepInternal.getRepImpl(replicatorA); 1063 RepImpl repImplB = RepInternal.getRepImpl(replicatorB); 1064 1065 try { 1066 1067 /* Compare the replication related sequences. */ 1068 if (verbose) { 1069 System.out.println("Comparing sequences"); 1070 } 1071 1072 /* replicated node id sequence. */ 1073 /* 1074 long nodeIdA = 1075 envImplA.getNodeSequence().getLastReplicatedNodeId(); 1076 long nodeIdB = 1077 envImplB.getNodeSequence().getLastReplicatedNodeId(); 1078 1079 // TEMPORARILY DISABLED: sequences not synced up. This may 1080 // actually apply right now to database and txn ids too, 1081 // but it's less likely to manifest itself. 1082 if (nodeIdA != nodeIdB) { 1083 throw new RuntimeException 1084 ("NodeId mismatch. " + nodeA + 1085 " lastRepNodeId=" + nodeIdA + " " + nodeB + 1086 " lastRepNodeId=" + nodeIdB); 1087 } 1088 */ 1089 1090 /* replicated txn id sequence. */ 1091 /* 1092 long txnIdA = repImplA.getTxnManager().getLastReplicatedTxnId(); 1093 long txnIdB = repmplB.getTxnManager().getLastReplicatedTxnId(); 1094 if (txnIdA != txnIdB) { 1095 throw new RuntimeException 1096 ("TxnId mismatch. A.lastRepTxnId=" + txnIdA + 1097 " B.lastRepTxnId=" + txnIdB); 1098 } 1099 */ 1100 1101 /* Replicated database id sequence. */ 1102 long dbIdA = repImplA.getDbTree().getLastReplicatedDbId(); 1103 long dbIdB = repImplB.getDbTree().getLastReplicatedDbId(); 1104 if (dbIdA != dbIdB) { 1105 throw new RuntimeException 1106 ("DbId mismatch. A.lastRepDbId=" + dbIdA + 1107 " B.lastRepDbId=" + dbIdB); 1108 } 1109 1110 /* Check name and number of application databases first. */ 1111 List<String> dbListA = envA.getDatabaseNames(); 1112 List<String> dbListB = envB.getDatabaseNames(); 1113 1114 if (verbose) { 1115 System.out.println("envEquals: check db list: " + nodeA + 1116 "=" + dbListA + " " + nodeB + "=" + 1117 dbListB); 1118 } 1119 1120 if (!dbListA.equals(dbListB)) { 1121 throw new RuntimeException("Mismatch: dbNameList " + nodeA + 1122 " =" + dbListA + " " + 1123 nodeB + " =" + dbListB); 1124 } 1125 1126 /* Check record count and contents of each database. */ 1127 DatabaseConfig checkConfig = new DatabaseConfig(); 1128 checkConfig.setReadOnly(true); 1129 checkConfig.setTransactional(true); 1130 DbInternal.setUseExistingConfig(checkConfig, true); 1131 for (String dbName : dbListA) { 1132 1133 Database dbA = null; 1134 Database dbB = null; 1135 try { 1136 dbA = envA.openDatabase(null, dbName, checkConfig); 1137 dbB = envB.openDatabase(null, dbName, checkConfig); 1138 1139 int count = checkDbContents(dbA, dbB); 1140 1141 if (verbose) { 1142 System.out.println("compared " + count + " records"); 1143 } 1144 } finally { 1145 if (dbA != null) { 1146 dbA.close(); 1147 } 1148 if (dbB != null) { 1149 dbB.close(); 1150 } 1151 } 1152 } 1153 1154 /* 1155 * Check the replication stream of each environment. The subset of 1156 * VLSN entries common to both nodes should match. 1157 */ 1158 checkStreamIntersection(nodeA, 1159 nodeB, 1160 RepInternal.getRepImpl(replicatorA), 1161 RepInternal.getRepImpl(replicatorB), 1162 limit, 1163 verbose); 1164 } catch (DatabaseException e) { 1165 throw new RuntimeException(e); 1166 } catch (IOException e) { 1167 throw new RuntimeException(e); 1168 } 1169 1170 runOrPauseCleaners(replicatorA, false); 1171 runOrPauseCleaners(replicatorB, false); 1172 } 1173 1174 /** 1175 * @throws RuntimeException if dbA and dbB don't have the same contents. 1176 */ checkDbContents(Database dbA, Database dbB)1177 private static int checkDbContents(Database dbA, Database dbB) { 1178 1179 Cursor cursorA = null; 1180 Cursor cursorB = null; 1181 Transaction txnA = null; 1182 Transaction txnB = null; 1183 int debugCount = 0; 1184 boolean isGroupDB = 1185 dbA.getDatabaseName().equals(DbType.REP_GROUP.getInternalName()); 1186 1187 try { 1188 txnA = dbA.getEnvironment().beginTransaction(null, null); 1189 txnB = dbB.getEnvironment().beginTransaction(null, null); 1190 cursorA = dbA.openCursor(txnA, CursorConfig.READ_UNCOMMITTED); 1191 cursorB = dbB.openCursor(txnB, CursorConfig.READ_UNCOMMITTED); 1192 DatabaseEntry keyA = new DatabaseEntry(); 1193 DatabaseEntry keyB = new DatabaseEntry(); 1194 DatabaseEntry dataA = new DatabaseEntry(); 1195 DatabaseEntry dataB = new DatabaseEntry(); 1196 NodeBinding nodeBinding = null; 1197 while (cursorA.getNext(keyA, dataA, LockMode.DEFAULT) == 1198 OperationStatus.SUCCESS) { 1199 debugCount++; 1200 OperationStatus statusB = cursorB.getNext(keyB, dataB, 1201 LockMode.DEFAULT); 1202 if (statusB != OperationStatus.SUCCESS) { 1203 throw new RuntimeException("Mismatch: debugCount=" + 1204 debugCount + "bad statusB = " + 1205 statusB); 1206 } 1207 if (!Arrays.equals(keyA.getData(), keyB.getData())) { 1208 throw new RuntimeException("Mismatch: debugCount=" + 1209 debugCount + " keyA=" + 1210 keyA.getData() + " keyB=" + 1211 keyB.getData()); 1212 1213 } 1214 if (!Arrays.equals(dataA.getData(), dataB.getData())) { 1215 if (isGroupDB && 1216 equalsNode(dataA.getData(), dataB.getData(), 1217 nodeBinding)) { 1218 continue; 1219 } 1220 throw new RuntimeException("Mismatch: debugCount=" + 1221 debugCount + " dataA=" + 1222 dataA.getData() + " dataB=" + 1223 dataB.getData()); 1224 } 1225 if (isGroupDB && 1226 (nodeBinding == null) && 1227 RepGroupDB.GROUP_KEY.equals( 1228 StringBinding.entryToString(keyA))) { 1229 final RepGroupImpl group = 1230 new GroupBinding().entryToObject(dataA); 1231 nodeBinding = new NodeBinding(group.getFormatVersion()); 1232 } 1233 } 1234 if (cursorB.getNext(keyB, dataB, LockMode.DEFAULT) == 1235 OperationStatus.SUCCESS) { 1236 throw new RuntimeException("Mismatch: debugCount=" + 1237 debugCount + " keyA is missing" + 1238 " keyB=" + keyB.getData() + 1239 " dataB=" + dataB.getData()); 1240 } 1241 return debugCount; 1242 } catch (DatabaseException e) { 1243 throw new RuntimeException(e); 1244 } finally { 1245 try { 1246 if (cursorA != null) { 1247 cursorA.close(); 1248 } 1249 if (cursorB != null) { 1250 cursorB.close(); 1251 } 1252 if (txnA != null) { 1253 txnA.commit(); 1254 } 1255 if (txnB != null) { 1256 txnB.commit(); 1257 } 1258 } catch (DatabaseException e) { 1259 throw new RuntimeException(e); 1260 } 1261 } 1262 } 1263 1264 /* 1265 * Implements a special check for group nodes which skips the syncup field. 1266 */ equalsNode(byte[] data1, byte[] data2, NodeBinding nodeBinding)1267 private static boolean equalsNode(byte[] data1, byte[] data2, 1268 NodeBinding nodeBinding) { 1269 Assert.assertNotNull("Node binding", nodeBinding); 1270 RepNodeImpl n1 = nodeBinding.entryToObject(new TupleInput(data1)); 1271 RepNodeImpl n2 = nodeBinding.entryToObject(new TupleInput(data2)); 1272 return n1.equivalent(n2); 1273 } 1274 1275 /** 1276 * @throws InterruptedException 1277 * @throws IOException 1278 * @throws RuntimeException if envA and envB don't have the same set of 1279 * VLSN mappings, VLSN-tagged log entries, and replication sequences. 1280 */ 1281 @SuppressWarnings("unused") checkStreamIntersection(String nodeA, String nodeB, RepImpl repA, RepImpl repB, VLSN limit, boolean verbose)1282 private static void checkStreamIntersection(String nodeA, 1283 String nodeB, 1284 RepImpl repA, 1285 RepImpl repB, 1286 VLSN limit, 1287 boolean verbose) 1288 throws IOException, InterruptedException { 1289 1290 if (verbose) { 1291 System.out.println("Check intersection for " + nodeA + 1292 " and " + nodeB); 1293 } 1294 1295 VLSNIndex repAMap = repA.getVLSNIndex(); 1296 VLSNRange repARange = repAMap.getRange(); 1297 VLSNIndex repBMap = repB.getVLSNIndex(); 1298 VLSNRange repBRange = repBMap.getRange(); 1299 1300 /* 1301 * Compare the vlsn ranges held on each environment and find the subset 1302 * common to both replicas. 1303 */ 1304 VLSN firstA = repARange.getFirst(); 1305 VLSN lastA = repARange.getLast(); 1306 VLSN firstB = repBRange.getFirst(); 1307 VLSN lastB = repBRange.getLast(); 1308 VLSN lastSyncA = repARange.getLastSync(); 1309 1310 if (lastA.compareTo(limit) < 0) { 1311 throw new RuntimeException 1312 ("CheckRepStream error: repA (" + repA.getNameIdPair() + 1313 ") lastVLSN = " + lastA + 1314 " < limit = " + limit); 1315 } 1316 1317 if (lastB.compareTo(limit) < 0) { 1318 throw new RuntimeException 1319 ("CheckRepStream error: repB (" + repB.getNameIdPair() + 1320 ") lastVLSN = " + lastB + 1321 " < limit = " + limit + ")"); 1322 } 1323 1324 /* 1325 * Calculate the largest VLSN range starting point and the smallest 1326 * VLSN range ending point for these two Replicators. 1327 */ 1328 VLSN firstLarger = (firstA.compareTo(firstB) > 0) ? firstA : firstB; 1329 VLSN lastSmaller = (lastA.compareTo(lastB) < 0) ? lastA : lastB; 1330 1331 try { 1332 /* The two replicas can read from the larger of the first VLSNs. */ 1333 FeederReader readerA = new FeederReader(repA, 1334 repAMap, 1335 DbLsn.NULL_LSN, 1336 100000, 1337 repA.getNameIdPair()); 1338 readerA.initScan(firstLarger); 1339 1340 FeederReader readerB = new FeederReader(repB, 1341 repBMap, 1342 DbLsn.NULL_LSN, 1343 100000, 1344 repB.getNameIdPair()); 1345 readerB.initScan(firstLarger); 1346 1347 /* They should both find the smaller of the last VLSNs. */ 1348 for (long vlsnVal = firstLarger.getSequence(); 1349 vlsnVal <= lastSmaller.getSequence(); 1350 vlsnVal++) { 1351 1352 OutputWireRecord wireRecordA = 1353 readerA.scanForwards(new VLSN(vlsnVal), 0); 1354 OutputWireRecord wireRecordB = 1355 readerB.scanForwards(new VLSN(vlsnVal), 0); 1356 1357 if (!(wireRecordA.match(wireRecordB))) { 1358 throw new RuntimeException(nodeA + " at vlsn " + vlsnVal + 1359 " has " + wireRecordA + " " + 1360 nodeB + " has " + wireRecordB); 1361 } 1362 1363 /* Check that db id, node id, txn id are negative. */ 1364 if (!repA.isRepConverted()) { 1365 wireRecordA.verifyNegativeSequences(nodeA); 1366 } 1367 if (!repB.isRepConverted()) { 1368 wireRecordB.verifyNegativeSequences(nodeB); 1369 } 1370 } 1371 1372 if (verbose) { 1373 System.out.println("Checked from vlsn " + firstLarger + 1374 " to " + lastSmaller); 1375 } 1376 } catch (Exception e) { 1377 e.printStackTrace(); 1378 1379 System.err.println(nodeA + " vlsnMap="); 1380 repAMap.dumpDb(true); 1381 System.err.println(nodeB + " vlsnMap="); 1382 repBMap.dumpDb(true); 1383 1384 throw new RuntimeException(e); 1385 } 1386 } 1387 1388 /** 1389 * Return the number of nodes that constitute a quorum for this size 1390 * group. This should be replaced by ReplicaAckPolicy.requiredNodes; 1391 */ getQuorumSize(int groupSize)1392 public static int getQuorumSize(int groupSize) { 1393 assert groupSize > 0 : "groupSize = " + groupSize; 1394 if (groupSize == 1) { 1395 return 1; 1396 } else if (groupSize == 2) { 1397 return 1; 1398 } else { 1399 return (groupSize/2) + 1; 1400 } 1401 } 1402 1403 /** 1404 * Create a rep group of a specified size on the local host, using the 1405 * default port configuration. 1406 * 1407 * @param electableNodes number of electable nodes in test group 1408 * @param monitorNodes number of monitor nodes in test group 1409 * 1410 * @return the simulated test RepGroup 1411 * 1412 * @throws UnknownHostException 1413 */ createTestRepGroup(int electableNodes, int monitorNodes)1414 public static RepGroupImpl createTestRepGroup(int electableNodes, 1415 int monitorNodes) 1416 throws UnknownHostException { 1417 1418 return createTestRepGroup(electableNodes, monitorNodes, 0); 1419 } 1420 1421 /** 1422 * Create a rep group of a specified size on the local host, using the 1423 * default port configuration. 1424 * 1425 * @param electableNodes number of electable nodes in test group 1426 * @param monitorNodes number of monitor nodes in test group 1427 * @param secondaryNodes number of secondary nodes in the test group 1428 * 1429 * @return the simulated test RepGroup 1430 * 1431 * @throws UnknownHostException 1432 */ createTestRepGroup(int electableNodes, int monitorNodes, int secondaryNodes)1433 public static RepGroupImpl createTestRepGroup(int electableNodes, 1434 int monitorNodes, 1435 int secondaryNodes) 1436 throws UnknownHostException { 1437 1438 Map<Integer, RepNodeImpl> allNodeInfo = 1439 new HashMap<Integer, RepNodeImpl>(); 1440 final InetAddress ia = InetAddress.getLocalHost(); 1441 int port = getDefaultPort(); 1442 RepGroupImpl repGroup = new RepGroupImpl("TestGroup", null); 1443 1444 for (int i=1; i <= electableNodes; i++) { 1445 allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("node"+i,i), 1446 NodeType.ELECTABLE, 1447 true, 1448 false, 1449 ia.getHostName(), 1450 port, 1451 repGroup.getChangeVersion(), 1452 null)); 1453 port++; 1454 } 1455 for (int i= (electableNodes+1); 1456 i <= (electableNodes+monitorNodes); 1457 i++) { 1458 allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("mon"+i,i), 1459 NodeType.MONITOR, 1460 true, 1461 false, 1462 ia.getHostName(), 1463 port, 1464 repGroup.getChangeVersion(), 1465 null)); 1466 port++; 1467 } 1468 for (int i = electableNodes + monitorNodes + 1; 1469 i <= electableNodes + monitorNodes + secondaryNodes; 1470 i++) { 1471 allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("sec" + i, i), 1472 NodeType.SECONDARY, 1473 true, 1474 false, 1475 ia.getHostName(), 1476 port, 1477 repGroup.getChangeVersion(), 1478 null)); 1479 port++; 1480 } 1481 repGroup.setNodes(allNodeInfo); 1482 return repGroup; 1483 } 1484 1485 public static class RepEnvInfo { 1486 private final File envHome; 1487 private final ReplicationConfig repConfig; 1488 private EnvironmentConfig envConfig; 1489 private QuorumPolicy initialElectionPolicy = 1490 QuorumPolicy.SIMPLE_MAJORITY; 1491 1492 private ReplicatedEnvironment repEnv = null; 1493 RepEnvInfo(File envHome, ReplicationConfig repConfig, EnvironmentConfig envConfig)1494 public RepEnvInfo(File envHome, 1495 ReplicationConfig repConfig, 1496 EnvironmentConfig envConfig) { 1497 super(); 1498 this.envHome = envHome; 1499 this.repConfig = repConfig; 1500 this.envConfig = envConfig; 1501 } 1502 openEnv()1503 public ReplicatedEnvironment openEnv() { 1504 if (repEnv != null) { 1505 throw new IllegalStateException("rep env already exists"); 1506 } 1507 1508 repEnv = new ReplicatedEnvironment(envHome, 1509 getRepConfig(), 1510 envConfig, 1511 null, 1512 initialElectionPolicy); 1513 return repEnv; 1514 } 1515 openEnv(ReplicaConsistencyPolicy cp)1516 public ReplicatedEnvironment openEnv(ReplicaConsistencyPolicy cp) { 1517 1518 if (repEnv != null) { 1519 throw new IllegalStateException("rep env already exists"); 1520 } 1521 repEnv = new ReplicatedEnvironment 1522 (envHome, getRepConfig(), envConfig, cp, 1523 initialElectionPolicy); 1524 return repEnv; 1525 } 1526 openEnv(RepEnvInfo helper)1527 public ReplicatedEnvironment openEnv(RepEnvInfo helper) { 1528 1529 repConfig.setHelperHosts((helper == null) ? 1530 repConfig.getNodeHostPort() : 1531 helper.getRepConfig().getNodeHostPort()); 1532 return openEnv(); 1533 } 1534 getEnv()1535 public ReplicatedEnvironment getEnv() { 1536 return repEnv; 1537 } 1538 getRepImpl()1539 public RepImpl getRepImpl() { 1540 return RepInternal.getRepImpl(repEnv); 1541 } 1542 getRepNode()1543 public RepNode getRepNode() { 1544 return getRepImpl().getRepNode(); 1545 } 1546 getRepConfig()1547 public ReplicationConfig getRepConfig() { 1548 return repConfig; 1549 } 1550 getEnvHome()1551 public File getEnvHome() { 1552 return envHome; 1553 } 1554 setEnvConfig(final EnvironmentConfig envConfig)1555 public void setEnvConfig(final EnvironmentConfig envConfig) { 1556 this.envConfig = envConfig; 1557 } 1558 getEnvConfig()1559 public EnvironmentConfig getEnvConfig() { 1560 return envConfig; 1561 } 1562 getInitialElectionPolicy()1563 public QuorumPolicy getInitialElectionPolicy() { 1564 return initialElectionPolicy; 1565 } 1566 setInitialElectionPolicy( final QuorumPolicy initialElectionPolicy)1567 public void setInitialElectionPolicy( 1568 final QuorumPolicy initialElectionPolicy) { 1569 1570 this.initialElectionPolicy = initialElectionPolicy; 1571 } 1572 closeEnv()1573 public void closeEnv() { 1574 try { 1575 if (repEnv != null) { 1576 repEnv.close(); 1577 } 1578 } finally { 1579 repEnv = null; 1580 } 1581 } 1582 1583 /** 1584 * Convenience method that guards against a NPE when checking whether 1585 * the state of a node is MASTER. 1586 */ isMaster()1587 public boolean isMaster() { 1588 return (repEnv != null) && repEnv.getState().isMaster(); 1589 } 1590 1591 /** 1592 * Convenience method that guards against a NPE when checking whether 1593 * the state of a node is REPLICA. 1594 */ isReplica()1595 public boolean isReplica() { 1596 return (repEnv != null) && repEnv.getState().isReplica(); 1597 } 1598 1599 /** 1600 * Convenience method that guards against a NPE when checking whether 1601 * the state of a node is UNKNOWN. 1602 */ isUnknown()1603 public boolean isUnknown() { 1604 return (repEnv != null) && repEnv.getState().isUnknown(); 1605 } 1606 1607 /** 1608 * Simulate a crash of the environment, don't do a graceful close. 1609 */ abnormalCloseEnv()1610 public void abnormalCloseEnv() { 1611 try { 1612 if (repEnv.isValid()) { 1613 1614 /* 1615 * Although we want an abnormal close, we do want to flush. 1616 * And if the env is valid, we expect it to work; so avoid 1617 * ignoring exceptions from this call. 1618 */ 1619 RepInternal.getRepImpl(repEnv).getLogManager(). 1620 flushNoSync(); 1621 } 1622 try { 1623 RepInternal.getRepImpl(repEnv).abnormalClose(); 1624 } catch (DatabaseException ignore) { 1625 1626 /* 1627 * The close will face problems like unclosed txns, ignore. 1628 * We're trying to simulate a crash. 1629 */ 1630 } 1631 } finally { 1632 repEnv = null; 1633 } 1634 } 1635 1636 @Override toString()1637 public String toString() { 1638 return (repEnv == null) ? 1639 envHome.toString() : repEnv.getNodeName(); 1640 } 1641 } 1642 stackTraceString(final Throwable exception)1643 public static String stackTraceString(final Throwable exception) { 1644 ByteArrayOutputStream bao = new ByteArrayOutputStream(); 1645 PrintStream printStream = new PrintStream(bao); 1646 exception.printStackTrace(printStream); 1647 String stackTraceString = bao.toString(); 1648 return stackTraceString; 1649 } 1650 1651 /** 1652 * Restarts a group associated with an existing environment on disk. 1653 * Returns the environment associated with the master. 1654 */ 1655 public static ReplicatedEnvironment restartGroup(RepEnvInfo .... repEnvInfo)1656 restartGroup(RepEnvInfo ... repEnvInfo) { 1657 1658 return restartGroup(false /*replicasOnly*/, false, repEnvInfo); 1659 } 1660 1661 public static ReplicatedEnvironment restartGroup(boolean allowILE, RepEnvInfo ... repEnvInfo)1662 restartGroup(boolean allowILE, RepEnvInfo ... repEnvInfo) { 1663 1664 return restartGroup(false, allowILE, repEnvInfo); 1665 } 1666 1667 /** 1668 * Restarts a group of replicas associated with an existing environment on 1669 * disk. 1670 */ restartReplicas(RepEnvInfo .... repEnvInfo)1671 public static void restartReplicas(RepEnvInfo ... repEnvInfo) { 1672 1673 restartGroup(true /*replicasOnly*/, false, repEnvInfo); 1674 } 1675 1676 /** 1677 * Restarts a group associated with an existing environment on disk. 1678 * Returns the environment associated with the master. 1679 */ 1680 private static ReplicatedEnvironment restartGroup(boolean replicasOnly, boolean allowILE, RepEnvInfo ... repEnvInfo)1681 restartGroup(boolean replicasOnly, 1682 boolean allowILE, 1683 RepEnvInfo ... repEnvInfo) { 1684 1685 /* To avoid the jdk bug: NullPointerException in Selector.open(). The 1686 * bug report can be found in 1687 * http://bugs.sun.com/view_bug.do?bug_id=6427854 1688 */ 1689 System.setProperty("sun.nio.ch.bugLevel", 1690 System.getProperty("sun.nio.ch.bugLevel","")); 1691 1692 /* Restart the group, a thread for each node. */ 1693 JoinThread threads[] = new JoinThread[repEnvInfo.length]; 1694 for (int i=0; i < repEnvInfo.length; i++) { 1695 threads[i]= new JoinThread(repEnvInfo[i], allowILE); 1696 threads[i].start(); 1697 } 1698 1699 /* 1700 * Wait for each thread to have joined the group. The group must be 1701 * re-started in parallel to ensure that all nodes are up and elections 1702 * can be held. 1703 */ 1704 for (int i=0; i < repEnvInfo.length; i++) { 1705 JoinThread jt = threads[i]; 1706 try { 1707 jt.join(JOIN_WAIT_TIME); 1708 } catch (InterruptedException e) { 1709 throw new RuntimeException(e); 1710 } 1711 final Throwable exception = jt.testException; 1712 if (exception != null) { 1713 throw new RuntimeException( 1714 "Join thread exception for " + repEnvInfo[i] + 1715 " still alive = " + jt.isAlive() + "\n" + 1716 RepTestUtils.stackTraceString(exception)); 1717 } 1718 if (jt.isAlive()) { 1719 throw new IllegalStateException( 1720 "Join thread for " + repEnvInfo[i] + 1721 " still alive after " + JOIN_WAIT_TIME + "ms," + 1722 " and testException is null."); 1723 } 1724 } 1725 1726 /* All join threads are quiescent, now pick the master. */ 1727 if (replicasOnly) { 1728 return null; 1729 } 1730 1731 return getMaster(repEnvInfo, false /*openIfNeeded*/); 1732 } 1733 1734 /** 1735 * Find the authoritative master (wait for election to quiesce). 1736 */ getMaster(RepEnvInfo[] repEnvInfo, boolean openIfNeeded)1737 public static ReplicatedEnvironment getMaster(RepEnvInfo[] repEnvInfo, 1738 boolean openIfNeeded) { 1739 1740 final int maxRetries = 100; 1741 int retries = maxRetries; 1742 while (true) { 1743 int masterId = -1; 1744 boolean multipleMasters = false; 1745 boolean nonAuthoritativeMaster = false; 1746 for (int i = 0; i < repEnvInfo.length; i++) { 1747 if (openIfNeeded && repEnvInfo[i].getEnv() == null) { 1748 final boolean VERBOSE = false; 1749 if (VERBOSE) { 1750 System.out.println("Opening node " + (i + 1)); 1751 } 1752 repEnvInfo[i].openEnv(); 1753 } 1754 if (repEnvInfo[i].getEnv().getState().isMaster()) { 1755 if (!repEnvInfo[i].getRepImpl().getRepNode(). 1756 isAuthoritativeMaster()) { 1757 nonAuthoritativeMaster = true; 1758 } 1759 if (masterId >= 0) { 1760 multipleMasters = true; 1761 } else { 1762 masterId = i; 1763 } 1764 } 1765 } 1766 if (masterId >= 0 && 1767 !multipleMasters && 1768 !nonAuthoritativeMaster) { 1769 return repEnvInfo[masterId].getEnv(); 1770 } 1771 if (--retries >= 0) { 1772 try { 1773 Thread.sleep(1000); 1774 } catch (InterruptedException e) { 1775 throw new RuntimeException(e); 1776 } 1777 continue; 1778 } 1779 if (nonAuthoritativeMaster) { 1780 throw new IllegalStateException( 1781 "Non-authoritative master after " + 1782 maxRetries + " retries."); 1783 } 1784 if (multipleMasters) { 1785 throw new IllegalStateException( 1786 "More than one master in group after " + 1787 maxRetries + " retries."); 1788 } 1789 if (masterId < 0) { 1790 throw new IllegalStateException 1791 ("Node id of the elected master is invalid."); 1792 } 1793 } 1794 } 1795 1796 /** 1797 * Threads used to simulate a parallel join group when multiple replication 1798 * nodes are first brought up for an existing environment. 1799 */ 1800 private static class JoinThread extends Thread { 1801 1802 final RepEnvInfo repEnvInfo; 1803 final boolean allowILE; 1804 1805 /* 1806 * Captures any exception encountered in the process of joining. The 1807 * presence of a non-null testException field indicates to the caller 1808 * that the join failed. 1809 */ 1810 volatile Throwable testException = null; 1811 private static final int NUM_RETRIES=100; 1812 1813 /* The state of the node at the time of joining the group. */ 1814 @SuppressWarnings("unused") 1815 ReplicatedEnvironment.State state = 1816 ReplicatedEnvironment.State.UNKNOWN; 1817 JoinThread(RepEnvInfo repEnvInfo, boolean allowILE)1818 JoinThread(RepEnvInfo repEnvInfo, boolean allowILE) { 1819 this.repEnvInfo = repEnvInfo; 1820 this.allowILE = allowILE; 1821 } 1822 1823 @Override run()1824 public void run() { 1825 /* 1826 * The open of this environment may fail due to timing mishaps if 1827 * the environment has just been shutdown, as can happen in a 1828 * number of tests that repeatedly open and close 1829 * environments. Retry a few time to give the node a chance to 1830 * settle down. 1831 */ 1832 int numRetries = 0; 1833 while (numRetries < NUM_RETRIES) { 1834 try { 1835 state = repEnvInfo.openEnv().getState(); 1836 testException = null; 1837 break; 1838 } catch (InsufficientLogException ile) { 1839 if (allowILE) { 1840 NetworkRestore restore = new NetworkRestore(); 1841 NetworkRestoreConfig nrc = new NetworkRestoreConfig(); 1842 nrc.setRetainLogFiles(false); 1843 restore.execute(ile, nrc); 1844 state = repEnvInfo.openEnv().getState(); 1845 testException = null; 1846 } else { 1847 testException = ile; 1848 } 1849 break; 1850 } catch (GroupShutdownException ge) { 1851 /* Retry, this node is still shutting down. */ 1852 numRetries++; 1853 testException = ge; 1854 try { 1855 Thread.sleep(100); 1856 } catch (InterruptedException ignore) { 1857 } 1858 } catch (Throwable e) { 1859 testException = e; 1860 break; 1861 } 1862 } 1863 } 1864 } 1865 1866 /** 1867 * Issue DbSync on a group. All nodes are presumed to be closed. 1868 * 1869 * @param timeoutMs is the DbSync timeout (max time for replica to catch up 1870 * with master) as well as the join timeout for each thread calling DbSync. 1871 */ syncupGroup(long timeoutMs, RepEnvInfo ... repEnvInfo)1872 public static void syncupGroup(long timeoutMs, RepEnvInfo ... repEnvInfo) { 1873 1874 /* 1875 * The call to DbSync blocks until the sync is done, so it must 1876 * be executed concurrently by a set of threads. 1877 */ 1878 SyncThread threads[] = new SyncThread[repEnvInfo.length]; 1879 String helperHost = repEnvInfo[0].getRepConfig().getNodeHostPort(); 1880 for (int i=0; i < repEnvInfo.length; i++) { 1881 threads[i]= new SyncThread(timeoutMs, repEnvInfo[i], helperHost); 1882 threads[i].start(); 1883 } 1884 1885 /* 1886 * Wait for each thread to open, sync, and close the node. 1887 */ 1888 for (int i=0; i < repEnvInfo.length; i++) { 1889 SyncThread t = threads[i]; 1890 try { 1891 t.join(timeoutMs); 1892 } catch (InterruptedException e) { 1893 throw new RuntimeException(e); 1894 } 1895 1896 if (t.isAlive()) { 1897 throw new IllegalStateException("Expect SyncThread " + i + 1898 " dead, but it's alive."); 1899 } 1900 final Throwable exception = t.testException; 1901 if (exception != null) { 1902 throw new RuntimeException 1903 ("Join thread exception.\n" + 1904 RepTestUtils.stackTraceString(exception)); 1905 } 1906 } 1907 } 1908 1909 /** 1910 * Threads used to simulate a parallel join group when multiple replication 1911 * nodes are first brought up for an existing environment. 1912 */ 1913 private static class SyncThread extends Thread { 1914 1915 final RepEnvInfo repEnvInfo; 1916 final String helperHost; 1917 final long timeoutMs; 1918 1919 /* Captures any exception encountered in the process of joining. */ 1920 Throwable testException = null; 1921 SyncThread(long timeoutMs, RepEnvInfo repEnvInfo, String helperHost)1922 SyncThread(long timeoutMs, RepEnvInfo repEnvInfo, String helperHost) { 1923 this.timeoutMs = timeoutMs; 1924 this.repEnvInfo = repEnvInfo; 1925 this.helperHost = helperHost; 1926 } 1927 1928 @Override run()1929 public void run() { 1930 try { 1931 ReplicationConfig config = repEnvInfo.getRepConfig(); 1932 DbSync syncAgent = 1933 new DbSync(repEnvInfo.getEnvHome().toString(), 1934 repEnvInfo.getEnvConfig(), 1935 config, 1936 helperHost, 1937 timeoutMs); 1938 syncAgent.sync(); 1939 } catch (Throwable e) { 1940 testException = e; 1941 } 1942 } 1943 } 1944 1945 /** 1946 * Disables network listening services, as a way of simulating a network partition 1947 * for testing. 1948 */ disableServices(final RepEnvInfo repEnvInfo)1949 public static void disableServices(final RepEnvInfo repEnvInfo) { 1950 final ServiceDispatcher sd1 = repEnvInfo.getRepNode().getServiceDispatcher(); 1951 sd1.setSimulateIOException(Learner.SERVICE_NAME, true); 1952 sd1.setSimulateIOException(Acceptor.SERVICE_NAME, true); 1953 sd1.setSimulateIOException(FeederManager.FEEDER_SERVICE, true); 1954 } 1955 1956 /** 1957 * Re-enables network services, to reverse the effect of a simulated 1958 * network partition. 1959 * @see #disableServices 1960 */ reenableServices(final RepEnvInfo repEnvInfo)1961 public static void reenableServices(final RepEnvInfo repEnvInfo) { 1962 final ServiceDispatcher sd1 = repEnvInfo.getRepNode().getServiceDispatcher(); 1963 sd1.setSimulateIOException(Learner.SERVICE_NAME, false); 1964 sd1.setSimulateIOException(Acceptor.SERVICE_NAME, false); 1965 sd1.setSimulateIOException(FeederManager.FEEDER_SERVICE, false); 1966 } 1967 awaitCondition(Callable<Boolean> predicate)1968 public static void awaitCondition(Callable<Boolean> predicate) 1969 throws Exception { 1970 1971 awaitCondition(predicate, 5000); 1972 } 1973 awaitCondition(Callable<Boolean> predicate, long timeout)1974 public static void awaitCondition(Callable<Boolean> predicate, 1975 long timeout) 1976 throws Exception { 1977 1978 boolean done = false; 1979 long deadline = System.currentTimeMillis() + timeout; 1980 while (System.currentTimeMillis() < deadline) { 1981 if (predicate.call()) { 1982 done = true; 1983 break; 1984 } 1985 Thread.sleep(100); 1986 } 1987 Assert.assertTrue(done); 1988 } 1989 1990 /** 1991 * Used for testing to force consistency checks to fail. 1992 */ 1993 public static class AlwaysFail implements ReplicaConsistencyPolicy { 1994 1995 public static final String NAME = "AlwaysFailConsistency"; 1996 AlwaysFail()1997 public AlwaysFail() { 1998 } 1999 2000 @Override ensureConsistency(EnvironmentImpl repInstance)2001 public void ensureConsistency(EnvironmentImpl repInstance) 2002 throws InterruptedException { 2003 2004 throw new ReplicaConsistencyException("Always fails for testing", 2005 this); 2006 } 2007 2008 /** 2009 * Always returns 0, no timeout is needed for this policy. 2010 */ 2011 @Override getTimeout(TimeUnit unit)2012 public long getTimeout(TimeUnit unit) { 2013 return 1; 2014 } 2015 2016 @Override getName()2017 public String getName() { 2018 return NAME; 2019 } 2020 } 2021 2022 /** 2023 * Set the basic SSL properties. These rely on the build.xml configuration 2024 * that copies keystore and truststore files to the test environment. 2025 */ setUnitTestSSLProperties(Properties props)2026 public static void setUnitTestSSLProperties(Properties props) { 2027 File destDir = SharedTestUtils.getDestDir(); 2028 String sslPath = new File(destDir.getPath(), "ssl").getPath(); 2029 2030 props.put("je.rep.channelType", "ssl"); 2031 props.put("je.rep.ssl.keyStoreFile", 2032 new File(sslPath, "keys.store").getPath()); 2033 props.put("je.rep.ssl.keyStorePassword", "unittest"); 2034 props.put("je.rep.ssl.trustStoreFile", 2035 new File(sslPath, "trust.store").getPath()); 2036 props.put("je.rep.ssl.clientKeyAlias", "mykey"); 2037 props.put("je.rep.ssl.serverKeyAlias", "mykey"); 2038 } 2039 2040 /** 2041 * Used for testing to force consistency checks to fail. Register the 2042 * format at the beginning of the test as follows: 2043 * 2044 * // Register custom consistency policy format while quiescent. 2045 * RepUtils.addConsistencyPolicyFormat 2046 * (RepTestUtils.AlwaysFail.NAME, 2047 * new RepTestUtils.AlwaysFailFormat()); 2048 */ 2049 public static class AlwaysFailFormat 2050 implements ConsistencyPolicyFormat<AlwaysFail> { 2051 2052 @Override policyToString(final AlwaysFail policy)2053 public String policyToString(final AlwaysFail policy) { 2054 return AlwaysFail.NAME; 2055 } 2056 2057 @Override stringToPolicy(final String string)2058 public AlwaysFail stringToPolicy(final String string) { 2059 return new AlwaysFail(); 2060 } 2061 } 2062 2063 /** 2064 * Wait until a replica/feeder syncup has been tried numSyncupAttempt times 2065 * on this node. 2066 */ setupWaitForSyncup(final ReplicatedEnvironment node, int numSyncupAttempts)2067 public static CountDownLatch setupWaitForSyncup 2068 (final ReplicatedEnvironment node, int numSyncupAttempts) { 2069 final CountDownLatch waiter = new CountDownLatch(numSyncupAttempts); 2070 2071 TestHook<Object> syncupFinished = new TestHook<Object>() { 2072 @Override 2073 public void doHook() throws InterruptedException { 2074 waiter.countDown(); 2075 } 2076 }; 2077 2078 RepInternal.getRepImpl(node).getRepNode(). 2079 replica().setReplicaFeederSyncupHook(syncupFinished); 2080 return waiter; 2081 } 2082 2083 2084 /** 2085 * Modify the existing rep configuration with the new parameter value pair. 2086 */ setConfigParam(ConfigParam param, String value, RepEnvInfo repEnvInfo[])2087 public static void setConfigParam(ConfigParam param, 2088 String value, 2089 RepEnvInfo repEnvInfo[]) { 2090 2091 for (RepEnvInfo info : repEnvInfo) { 2092 info.getRepConfig().setConfigParam(param.getName(), value); 2093 } 2094 } 2095 } 2096