1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.utilint;
9 
10 import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY;
11 
12 import java.io.ByteArrayOutputStream;
13 import java.io.File;
14 import java.io.FileInputStream;
15 import java.io.FileNotFoundException;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.util.Arrays;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.Set;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32 
33 import org.junit.Assert;
34 
35 import com.sleepycat.bind.tuple.StringBinding;
36 import com.sleepycat.bind.tuple.TupleInput;
37 import com.sleepycat.je.CommitToken;
38 import com.sleepycat.je.Cursor;
39 import com.sleepycat.je.CursorConfig;
40 import com.sleepycat.je.Database;
41 import com.sleepycat.je.DatabaseConfig;
42 import com.sleepycat.je.DatabaseEntry;
43 import com.sleepycat.je.DatabaseException;
44 import com.sleepycat.je.DbInternal;
45 import com.sleepycat.je.Durability;
46 import com.sleepycat.je.Durability.ReplicaAckPolicy;
47 import com.sleepycat.je.Durability.SyncPolicy;
48 import com.sleepycat.je.Environment;
49 import com.sleepycat.je.EnvironmentConfig;
50 import com.sleepycat.je.LockMode;
51 import com.sleepycat.je.OperationStatus;
52 import com.sleepycat.je.ReplicaConsistencyPolicy;
53 import com.sleepycat.je.Transaction;
54 import com.sleepycat.je.TransactionConfig;
55 import com.sleepycat.je.cleaner.VerifyUtils;
56 import com.sleepycat.je.config.ConfigParam;
57 import com.sleepycat.je.dbi.DbType;
58 import com.sleepycat.je.dbi.EnvironmentImpl;
59 import com.sleepycat.je.log.FileManager;
60 import com.sleepycat.je.rep.CommitPointConsistencyPolicy;
61 import com.sleepycat.je.rep.GroupShutdownException;
62 import com.sleepycat.je.rep.InsufficientLogException;
63 import com.sleepycat.je.rep.InsufficientReplicasException;
64 import com.sleepycat.je.rep.NetworkRestore;
65 import com.sleepycat.je.rep.NetworkRestoreConfig;
66 import com.sleepycat.je.rep.NodeType;
67 import com.sleepycat.je.rep.QuorumPolicy;
68 import com.sleepycat.je.rep.RepInternal;
69 import com.sleepycat.je.rep.ReplicaConsistencyException;
70 import com.sleepycat.je.rep.ReplicatedEnvironment;
71 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
72 import com.sleepycat.je.rep.ReplicationConfig;
73 import com.sleepycat.je.rep.ReplicationNetworkConfig;
74 import com.sleepycat.je.rep.UnknownMasterException;
75 import com.sleepycat.je.rep.elections.Acceptor;
76 import com.sleepycat.je.rep.elections.Learner;
77 import com.sleepycat.je.rep.impl.PointConsistencyPolicy;
78 import com.sleepycat.je.rep.impl.RepGroupDB;
79 import com.sleepycat.je.rep.impl.RepGroupDB.GroupBinding;
80 import com.sleepycat.je.rep.impl.RepGroupDB.NodeBinding;
81 import com.sleepycat.je.rep.impl.RepGroupImpl;
82 import com.sleepycat.je.rep.impl.RepImpl;
83 import com.sleepycat.je.rep.impl.RepNodeImpl;
84 import com.sleepycat.je.rep.impl.RepParams;
85 import com.sleepycat.je.rep.impl.RepTestBase;
86 import com.sleepycat.je.rep.impl.node.FeederManager;
87 import com.sleepycat.je.rep.impl.node.NameIdPair;
88 import com.sleepycat.je.rep.impl.node.RepNode;
89 import com.sleepycat.je.rep.stream.FeederReader;
90 import com.sleepycat.je.rep.stream.OutputWireRecord;
91 import com.sleepycat.je.rep.stream.ReplicaFeederSyncup.TestHook;
92 import com.sleepycat.je.rep.utilint.RepUtils.ConsistencyPolicyFormat;
93 import com.sleepycat.je.rep.vlsn.VLSNIndex;
94 import com.sleepycat.je.rep.vlsn.VLSNRange;
95 import com.sleepycat.je.util.DbBackup;
96 import com.sleepycat.je.util.TestUtils;
97 import com.sleepycat.je.utilint.DbLsn;
98 import com.sleepycat.je.utilint.VLSN;
99 import com.sleepycat.util.test.SharedTestUtils;
100 
101 /**
102  * Static utility methods and instances for replication unit tests.
103  *
104  * Examples of useful constructs here are methods that:
105  * <ul>
106  * <li>Create multiple environment directories suitable for unit testing
107  * a set of replicated nodes.
108  * <li>Create a router config that is initialized with exception and event
109  * listeners that will dump asynchronous exceptions to stderr, and which
110  * can be conditionalized to ignore exceptions at certain points when the
111  * test expects a disconnected node or other error condition.
112  * <li>Methods that compare two environments to see if they have a common
113  * replication stream.
114  * <li>etc ...
115  * </ul>
116  */
117 public class RepTestUtils {
118 
119     public static final String TEST_HOST = "localhost";
120     private static final String REPDIR = "rep";
121     public static final String TEST_REP_GROUP_NAME = "UnitTestGroup";
122     private static final String[] BUP_SUFFIXES = { FileManager.BUP_SUFFIX };
123 
124     /*
125      * If -DoverridePort=<val> is set, then replication groups will be
126      * set up with this default port value.
127      */
128     public static final String OVERRIDE_PORT = "overridePort";
129 
130     /*
131      * If -DlongTimeout is true, then this test will run with very long
132      * timeouts, to make interactive debugging easier.
133      */
134     private static final boolean longTimeout =
135         Boolean.getBoolean("longTimeout");
136 
137     public static final int MINUTE_MS = 60*1000;
138 
139     /* Time to wait for each node to start up and join the group. */
140     private static final long JOIN_WAIT_TIME = 20000;
141 
142     /* The basis for varying log file size */
143     private static int envCount = 1;
144 
145     /* Convenient constants */
146 
147     public final static Durability SYNC_SYNC_ALL_DURABILITY =
148         new Durability(Durability.SyncPolicy.SYNC,
149                        Durability.SyncPolicy.SYNC,
150                        Durability.ReplicaAckPolicy.ALL);
151 
152     public final static Durability SYNC_SYNC_NONE_DURABILITY =
153         new Durability(Durability.SyncPolicy.SYNC,
154                        Durability.SyncPolicy.SYNC,
155                        Durability.ReplicaAckPolicy.NONE);
156 
157     public final static Durability WNSYNC_NONE_DURABILITY =
158         new Durability(Durability.SyncPolicy.WRITE_NO_SYNC,
159                        Durability.SyncPolicy.WRITE_NO_SYNC,
160                        Durability.ReplicaAckPolicy.NONE);
161 
162     public static final Durability DEFAULT_DURABILITY =
163         new Durability(Durability.SyncPolicy.WRITE_NO_SYNC,
164                        Durability.SyncPolicy.WRITE_NO_SYNC,
165                        Durability.ReplicaAckPolicy.SIMPLE_MAJORITY);
166 
167     public final static TransactionConfig SYNC_SYNC_ALL_TC =
168         new TransactionConfig().setDurability(SYNC_SYNC_ALL_DURABILITY);
169 
170     public final static TransactionConfig SYNC_SYNC_NONE_TC =
171         new TransactionConfig().setDurability(SYNC_SYNC_NONE_DURABILITY);
172 
173     public final static TransactionConfig WNSYNC_NONE_TC =
174         new TransactionConfig().setDurability(WNSYNC_NONE_DURABILITY);
175 
176     public static final TransactionConfig NO_CONSISTENCY_TC =
177         new TransactionConfig()
178         .setConsistencyPolicy(NO_CONSISTENCY)
179         .setDurability(SYNC_SYNC_NONE_DURABILITY);
180 
getRepEnvDirs(File envRoot, int nNodes)181     public static File[] getRepEnvDirs(File envRoot, int nNodes) {
182         File envDirs[] = new File[nNodes];
183         for (int i=0; i < nNodes; i++) {
184             envDirs[i] = new File(envRoot, RepTestUtils.REPDIR + i);
185         }
186         return envDirs;
187     }
188 
    /**
     * Create nNodes directories within the envRoot directory, for housing
     * a set of replicated environments. Each directory is named
     * <envRoot>/rep#, numbered from zero, i.e. <envRoot>/rep0,
     * <envRoot>/rep1, etc.
     */
makeRepEnvDirs(File envRoot, int nNodes)194     public static File[] makeRepEnvDirs(File envRoot, int nNodes)
195         throws IOException {
196 
197         File[] envHomes = new File[nNodes];
198         for (int i = 0; i < nNodes; i++) {
199             envHomes[i] = makeRepEnvDir(envRoot, i);
200         }
201         return envHomes;
202     }
203 
204     /**
205      * Create a directory within the envRoot directory nodes for housing a
206      * single replicated environment.  The directory will be named
207      * <envRoot>/rep<i>
208      */
makeRepEnvDir(File envRoot, int i)209     public static File makeRepEnvDir(File envRoot, int i)
210         throws IOException {
211 
212         File jeProperties = new File(envRoot, "je.properties");
213         File envHome = new File(envRoot, REPDIR + i);
214         envHome.mkdir();
215 
216         /* Copy the test je.properties into the new directories. */
217         File repProperties = new File(envHome, "je.properties");
218         FileInputStream from = null;
219         FileOutputStream to = null;
220         try {
221             try {
222                 from = new FileInputStream(jeProperties);
223             } catch (FileNotFoundException e) {
224                 jeProperties.createNewFile();
225 
226                 from = new FileInputStream(jeProperties);
227             }
228             to = new FileOutputStream(repProperties);
229             byte[] buffer = new byte[4096];
230             int bytesRead;
231 
232             while ((bytesRead = from.read(buffer)) != -1) {
233                 to.write(buffer, 0, bytesRead);
234             }
235         } finally {
236             if (from != null) {
237                 try {
238                     from.close();
239                 } catch (IOException ignore) {
240                 }
241             }
242             if (to != null) {
243                 try {
244                     to.close();
245                 } catch (IOException ignore) {
246                 }
247             }
248         }
249 
250         return envHome;
251     }
252 
253     /* Create the sub directories for replicated Environments. */
createRepSubDirs(RepEnvInfo[] repEnvInfo, int subDirNumber)254     public static void createRepSubDirs(RepEnvInfo[] repEnvInfo,
255                                         int subDirNumber) {
256         for (RepEnvInfo envInfo : repEnvInfo) {
257             if (envInfo != null) {
258                 TestUtils.createEnvHomeWithSubDir
259                     (envInfo.getEnvHome(), subDirNumber);
260             }
261         }
262     }
263 
264     /* Remove the sub directories inside the replicated Environment home. */
removeRepSubDirs(RepEnvInfo[] repEnvInfo)265     public static void removeRepSubDirs(RepEnvInfo[] repEnvInfo) {
266         for (RepEnvInfo envInfo : repEnvInfo) {
267             if (envInfo != null) {
268                 TestUtils.removeSubDirs(envInfo.getEnvHome());
269             }
270         }
271     }
272 
273     /** Convenience method to {@link #removeRepEnv} multiple nodes. */
removeRepDirs(RepEnvInfo... repEnvInfo)274     public static void removeRepDirs(RepEnvInfo... repEnvInfo) {
275         for (RepEnvInfo envInfo : repEnvInfo) {
276             if (envInfo != null) {
277                 removeRepEnv(envInfo.getEnvHome());
278             }
279         }
280     }
281 
    /**
     * Remove the log, lock and backup files from every subdirectory of
     * envRoot (normally the <envRoot>/rep* directories), and then from
     * envRoot itself.
     */
removeRepEnvironments(File envRoot)285     public static void removeRepEnvironments(File envRoot) {
286         File[] repEnvs = envRoot.listFiles();
287         for (File repEnv : repEnvs) {
288             if (repEnv.isDirectory()) {
289                 removeRepEnv(repEnv);
290             }
291         }
292         removeRepEnv(envRoot);
293     }
294 
295     /** Removes log/lck/bkp files from a single env home directory. */
removeRepEnv(File envHome)296     public static void removeRepEnv(File envHome) {
297         TestUtils.removeLogFiles("removeRepEnv", envHome, false);
298         new File(envHome, "je.lck").delete();
299         removeBackupFiles(envHome);
300     }
301 
removeBackupFiles(File repEnv)302     private static void removeBackupFiles(File repEnv) {
303         for (String fileName :
304              FileManager.listFiles(repEnv, BUP_SUFFIXES, false)) {
305             new File(repEnv, fileName).delete();
306         }
307     }
308 
309     /**
310      * Create an array of environments, with basically the same environment
311      * configuration.
312      */
313 
setupEnvInfos(File envRoot, int nNodes)314     public static RepEnvInfo[] setupEnvInfos(File envRoot, int nNodes)
315         throws IOException {
316 
317         return setupEnvInfos(envRoot, nNodes, DEFAULT_DURABILITY);
318     }
319 
    /**
     * Create an array of nNodes environments that all use the given
     * durability and otherwise basically the same environment
     * configuration. The first node created serves as the helper for the
     * nodes that follow it.
     *
     * @throws IOException if a node's environment directory cannot be set up
     */
setupEnvInfos(File envRoot, int nNodes, Durability envDurability)326     public static RepEnvInfo[] setupEnvInfos(File envRoot,
327                                              int nNodes,
328                                              Durability envDurability)
329         throws IOException {
330 
331         File[] envHomes = makeRepEnvDirs(envRoot, nNodes);
332         RepEnvInfo[] repEnvInfo = new RepEnvInfo[envHomes.length];
333 
334         for (int i = 0; i < repEnvInfo.length; i++) {
335             repEnvInfo[i] = setupEnvInfo(envHomes[i],
336                                          envDurability,
337                                          (short) (i + 1), // nodeId
338                                          repEnvInfo[0]);
339         }
340         return repEnvInfo;
341     }
342 
setupEnvInfos(File envRoot, int nNodes, EnvironmentConfig envConfig)343     public static RepEnvInfo[] setupEnvInfos(File envRoot,
344                                              int nNodes,
345                                              EnvironmentConfig envConfig)
346         throws IOException {
347 
348         return setupEnvInfos(envRoot, nNodes, envConfig, null);
349     }
350 
setupEnvInfos(File envRoot, int nNodes, EnvironmentConfig envConfig, ReplicationConfig repConfig)351     public static RepEnvInfo[] setupEnvInfos(File envRoot,
352                                              int nNodes,
353                                              EnvironmentConfig envConfig,
354                                              ReplicationConfig repConfig)
355         throws IOException {
356 
357         File[] envdirs = makeRepEnvDirs(envRoot, nNodes);
358         RepEnvInfo[] repEnvInfo  = new RepEnvInfo[envdirs.length];
359 
360         for (int i = 0; i < repEnvInfo.length; i++) {
361             repEnvInfo[i] = setupEnvInfo(envdirs[i],
362                                          envConfig.clone(),
363                                          createRepConfig(repConfig, i + 1),
364                                          repEnvInfo[0]);
365         }
366         return repEnvInfo;
367     }
368 
    /**
     * Adds nNodes additional replicated environments to the specified list
     * and returns the extended list.  Uses the ReplicationConfig from the
     * first initial node as the basis for each new node, and that first node
     * as the helper.
     */
setupExtendEnvInfo( final RepEnvInfo[] initialEnvInfo, final int nNodes)374     public static RepEnvInfo[] setupExtendEnvInfo(
375         final RepEnvInfo[] initialEnvInfo,
376         final int nNodes)
377         throws IOException {
378 
379         final int initialNodesCount = initialEnvInfo.length;
380         final RepEnvInfo[] result =
381             Arrays.copyOf(initialEnvInfo, initialNodesCount + nNodes);
382         final File envRoot = initialEnvInfo[0].getEnvHome().getParentFile();
383         final ReplicationConfig baseRepConfig =
384             initialEnvInfo[0].getRepConfig();
385         for (int i = 0; i < nNodes; i++) {
386             final int pos = initialNodesCount + i;
387             result[pos] = setupEnvInfo(makeRepEnvDir(envRoot, pos),
388                                        createEnvConfig(DEFAULT_DURABILITY),
389                                        createRepConfig(baseRepConfig, pos + 1),
390                                        initialEnvInfo[0]);
391         }
392         return result;
393     }
394 
395     /**
396      * Create info for a single replicated environment.
397      */
setupEnvInfo(File envHome, Durability envDurability, int nodeId, RepEnvInfo helper)398     public static RepEnvInfo setupEnvInfo(File envHome,
399                                           Durability envDurability,
400                                           int nodeId,
401                                           RepEnvInfo helper) {
402 
403         EnvironmentConfig envConfig = createEnvConfig(envDurability);
404         return setupEnvInfo(envHome, envConfig, nodeId, helper);
405     }
406 
407     /**
408      * Create info for a single replicated environment.
409      */
setupEnvInfo(File envHome, EnvironmentConfig envConfig, int nodeId, RepEnvInfo helper)410     public static RepEnvInfo setupEnvInfo(File envHome,
411                                           EnvironmentConfig envConfig,
412                                           int nodeId,
413                                           RepEnvInfo helper) {
414         return setupEnvInfo(envHome,
415                             envConfig,
416                             createRepConfig(nodeId),
417                             helper);
418     }
419 
420     /**
421      * Create info for a single replicated environment.
422      */
setupEnvInfo(File envHome, EnvironmentConfig envConfig, ReplicationConfig repConfig, RepEnvInfo helper)423     public static RepEnvInfo setupEnvInfo(File envHome,
424                                           EnvironmentConfig envConfig,
425                                           ReplicationConfig repConfig,
426                                           RepEnvInfo helper) {
427 
428         /*
429          * Give all the environments the same environment configuration.
430          *
431          * If the file size is not set by the calling test, stagger their log
432          * file length to give them slightly different logs and VLSNs. Disable
433          * parameter validation because we want to make the log file length
434          * smaller than the minimums, for testing.
435          */
436         if (!envConfig.isConfigParamSet(EnvironmentConfig.LOG_FILE_MAX)) {
437             DbInternal.disableParameterValidation(envConfig);
438             /*  Vary the file size */
439             long fileLen = ((envCount++ % 100)+1) * 10000;
440             envConfig.setConfigParam(EnvironmentConfig.LOG_FILE_MAX,
441                                      Long.toString(fileLen));
442         }
443 
444         repConfig.setHelperHosts((helper == null) ?
445                                  repConfig.getNodeHostPort() :
446                                  helper.getRepConfig().getNodeHostPort());
447 
448         /*
449          * If -DlongTimeout is true, then this test will run with very long
450          * timeouts, to make interactive debugging easier.
451          */
452         if (longTimeout) {
453             setLongTimeouts(repConfig);
454         }
455 
456         /*
457          * If -DlongAckTimeout is true, then the test will set the
458          * REPLICA_TIMEOUT to 50secs.
459          */
460         if (Boolean.getBoolean("longAckTimeout")) {
461             repConfig.setReplicaAckTimeout(50, TimeUnit.SECONDS);
462         }
463         return new RepEnvInfo(envHome, repConfig, envConfig);
464     }
465 
createEnvConfig(Durability envDurability)466     public static EnvironmentConfig createEnvConfig(Durability envDurability) {
467         EnvironmentConfig envConfig = new EnvironmentConfig();
468         envConfig.setAllowCreate(true);
469         envConfig.setTransactional(true);
470         envConfig.setDurability(envDurability);
471 
472         /*
473          * Replicated tests use multiple environments, configure shared cache
474          * to reduce the memory consumption.
475          */
476         envConfig.setSharedCache(true);
477 
478         return envConfig;
479     }
480 
481     /**
482      * Create a test RepConfig for the node with the specified id. Note that
483      * the helper is not configured.
484      */
createRepConfig(int nodeId)485     public static ReplicationConfig createRepConfig(int nodeId)
486         throws NumberFormatException, IllegalArgumentException {
487 
488         return createRepConfig(null, nodeId);
489     }
490 
getDefaultPort()491     private static int getDefaultPort() {
492         return Integer.getInteger
493             (OVERRIDE_PORT,
494              Integer.parseInt(RepParams.DEFAULT_PORT.getDefault()));
495     }
496 
497     /**
498      * Create a test RepConfig for the node with the specified id, using the
499      * specified repConfig. The repConfig may have other parameters set
500      * already. Note that the helper is not configured.
501      */
502     public static
createRepConfig(ReplicationConfig repConfig, int nodeId)503         ReplicationConfig createRepConfig(ReplicationConfig repConfig,
504                                           int nodeId)
505         throws NumberFormatException, IllegalArgumentException {
506 
507         ReplicationConfig filledInConfig =
508             repConfig == null ? new ReplicationConfig() : repConfig.clone();
509 
510         final int firstPort = getDefaultPort();
511         filledInConfig.setConfigParam
512             (RepParams.ENV_SETUP_TIMEOUT.getName(), "60 s");
513         filledInConfig.setConfigParam
514             (ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, "60 s");
515         filledInConfig.setGroupName(TEST_REP_GROUP_NAME);
516         filledInConfig.setNodeName("Node" + nodeId);
517         String nodeHost = TEST_HOST + ":" + (firstPort + (nodeId - 1));
518         filledInConfig.setNodeHostPort(nodeHost);
519 
520         /* Minimize socket bind exceptions in tests. */
521         filledInConfig.setConfigParam(RepParams.SO_REUSEADDR.getName(),
522                                       "true");
523         filledInConfig.setConfigParam(RepParams.SO_BIND_WAIT_MS.getName(),
524                                       "150000");
525         return filledInConfig;
526     }
527 
readRepNetConfig()528     public static ReplicationNetworkConfig readRepNetConfig() {
529         /* Call to force class loading and parameter registration */
530         ReplicationNetworkConfig.registerParams();
531         return ReplicationNetworkConfig.create(readNetProps());
532     }
533 
readNetProps()534     public static Properties readNetProps() {
535         final File propFile =
536             new File(SharedTestUtils.getTestDir(), "je.properties");
537         final Properties props = new Properties();
538         RepUtils.populateNetProps(props, propFile);
539         return props;
540     }
541 
542     /**
543      * Set timeouts to long intervals for debugging interactively
544      */
setLongTimeouts(ReplicationConfig repConfig)545     public static void setLongTimeouts(ReplicationConfig repConfig) {
546 
547         RepInternal.disableParameterValidation(repConfig);
548 
549         /* Wait an hour for this node to join the group.*/
550         repConfig.setConfigParam(RepParams.ENV_SETUP_TIMEOUT.getName(),
551                                  "1 h");
552         repConfig.setConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT,
553                                  "1 h");
554 
555         /* Wait an hour for replica acks. */
556         repConfig.setConfigParam(ReplicationConfig.REPLICA_ACK_TIMEOUT,
557                                  "1 h");
558 
559         /* Have a heartbeat every five minutes. */
560         repConfig.setConfigParam(RepParams.HEARTBEAT_INTERVAL.getName(),
561                                  "5 min");
562     }
563 
564     /**
565      * Shuts down the environments with a checkpoint at the end.
566      *
567      * @param repEnvInfo the environments to be shutdown
568      */
shutdownRepEnvs(RepEnvInfo[] repEnvInfo)569     public static void shutdownRepEnvs(RepEnvInfo[] repEnvInfo) {
570 
571         shutdownRepEnvs(repEnvInfo, true);
572     }
573 
574     /**
575      * Shut down the environment, with an optional checkpoint. It sequences the
576      * shutdown so that all replicas are shutdown before the master.  This
577      * sequencing avoids side-effects in the tests where shutting down the
578      * master first results in elections and one of the "to be shutdown"
579      * replicas becomes a master and so on.
580      * <p>
581      * If an exception occurs for any reason while closing one env, other envs
582      * may be left open.  TODO: Determine if this behavior is really desired.
583      *
584      * @param repEnvInfo the environments to be shutdown
585      *
586      * @param doCheckpoint whether do a checkpoint at the end of close
587      */
shutdownRepEnvs(RepEnvInfo[] repEnvInfo, boolean doCheckpoint)588     public static void shutdownRepEnvs(RepEnvInfo[] repEnvInfo,
589                                        boolean doCheckpoint) {
590 
591         if (repEnvInfo == null) {
592             return;
593         }
594 
595         RepEnvInfo master = null;
596         for (RepEnvInfo ri : repEnvInfo) {
597             if ((ri.repEnv == null) || RepInternal.isClosed(ri.repEnv)) {
598                 ri.repEnv = null;
599                 continue;
600             }
601             if (ri.repEnv.getState().isMaster()) {
602                 if (master != null) {
603                     throw new IllegalStateException
604                         ("Multiple masters: " + master.getEnv().getNodeName() +
605                          " and " + ri.repEnv.getNodeName() +
606                          " are both masters.");
607                 }
608                 master = ri;
609             } else {
610                 try {
611                     if (doCheckpoint) {
612                         RepImpl repImpl = RepInternal.getRepImpl(ri.repEnv);
613                         ri.repEnv.close();
614                         if (!repImpl.isClosed()) {
615                             throw new IllegalStateException
616                                 ("Environment: " + ri.getEnvHome() +
617                                  " not released");
618                         }
619                     } else {
620                         RepInternal.getRepImpl(ri.repEnv).close(false);
621                     }
622                 } finally {
623                     ri.repEnv = null;
624                 }
625             }
626         }
627 
628         if (master != null) {
629             try {
630                 if (doCheckpoint) {
631                     master.getEnv().close();
632                 } else {
633                     RepInternal.getRepImpl(master.getEnv()).close(false);
634                 }
635             } finally {
636                 master.repEnv = null;
637             }
638         }
639     }
640 
641     /**
642      * All the non-closed, non-null environments in the array join the group.
643      * @return the replicator who is the master.
644      */
645     public static ReplicatedEnvironment
openRepEnvsJoin(RepEnvInfo[] repEnvInfo)646         openRepEnvsJoin(RepEnvInfo[] repEnvInfo) {
647 
648         return joinGroup(getOpenRepEnvs(repEnvInfo));
649     }
650 
651     /* Get open replicated environments from an array. */
getOpenRepEnvs(RepEnvInfo[] repEnvInfo)652     public static RepEnvInfo[] getOpenRepEnvs(RepEnvInfo[] repEnvInfo) {
653         Set<RepEnvInfo> repSet = new HashSet<RepEnvInfo>();
654         for (RepEnvInfo ri : repEnvInfo) {
655             if ((ri != null) &&
656                 (ri.getEnv() != null) &&
657                 !RepInternal.isClosed(ri.getEnv())) {
658                 repSet.add(ri);
659             }
660         }
661 
662         return repSet.toArray(new RepEnvInfo[repSet.size()]);
663     }
664 
665     /**
666      * Environment handles are created using the config information in
667      * repEnvInfo. Note that since this method causes handles to be created
668      * serially, it cannot be used to restart an existing group from scratch.
669      * It can only be used to start a new group, or have nodes join a group
670      * that is already active.
671      *
672      * @return the replicated environment associated with the master.
673      */
674     public static
joinGroup(RepEnvInfo .... repEnvInfo)675         ReplicatedEnvironment joinGroup(RepEnvInfo ... repEnvInfo) {
676 
677         int retries = 10;
678         final int retryWaitMillis = 5000;
679         ReplicatedEnvironment master = null;
680         List<RepEnvInfo> joinNotFinished =
681             new LinkedList<RepEnvInfo>(Arrays.asList(repEnvInfo));
682 
683         while (joinNotFinished.size() != 0) {
684             for (RepEnvInfo ri : joinNotFinished) {
685                 try {
686                     ReplicatedEnvironment.State joinState;
687                     if (ri.getEnv() != null) {
688 
689                         /*
690                          * Handle exists, make sure it's not in UNKNOWN state.
691                          */
692                         RepImpl rimpl = RepInternal.getRepImpl(ri.getEnv());
693                         joinState = rimpl.getState();
694                         Assert.assertFalse(
695                             "Node " + ri.getEnv().getNodeName() +
696                             " was detached",
697                             joinState.equals(State.DETACHED));
698                     } else {
699                         joinState = ri.openEnv().getState();
700                     }
701 
702                     if (joinState.equals(State.MASTER)) {
703                         if (master != null) {
704                             if (--retries > 0) {
705                                 Thread.sleep(retryWaitMillis);
706 
707                                 /*
708                                  * Start over. The master is slow making its
709                                  * transition, one of them has not realized
710                                  * that they are no longer the master.
711                                  */
712                                 joinNotFinished = new LinkedList<RepEnvInfo>
713                                     (Arrays.asList(repEnvInfo));
714                                 master = null;
715                                 break;
716                             }
717                             throw new RuntimeException
718                                 ("Dual masters: " + ri.getEnv().getNodeName() +
719                                  " and " +
720                                  master.getNodeName() + " despite retries");
721                         }
722                         master = ri.getEnv();
723                     }
724                     joinNotFinished.remove(ri);
725                     if ((joinNotFinished.size() == 0) && (master == null)) {
726                         if (--retries == 0) {
727                             throw new RuntimeException
728                             ("No master established despite retries");
729                         }
730                         Thread.sleep(retryWaitMillis);
731                         /* Start over, an election is still in progress. */
732                         joinNotFinished = new LinkedList<RepEnvInfo>
733                         (Arrays.asList(repEnvInfo));
734                     }
735                     break;
736                 } catch (UnknownMasterException retry) {
737                     /* Just retry. */
738                 } catch (InterruptedException e) {
739                     throw new RuntimeException(e);
740                 }
741             }
742         }
743         return master;
744     }
745 
746     /**
747      * Used to ensure that the entire group is in sync, that is, all replicas
748      * are consistent with the master's last commit. Note that it requires all
749      * the nodes in the replication group to be available.
750      *
751      * @param repEnvInfo the array holding the environments
752      * @param numSyncNodes the expected number of nodes to be synced; includes
753      * the master
754      * @throws InterruptedException
755      */
syncGroupToLastCommit(RepEnvInfo[] repEnvInfo, int numSyncNodes)756     public static VLSN syncGroupToLastCommit(RepEnvInfo[] repEnvInfo,
757                                              int numSyncNodes)
758         throws InterruptedException {
759 
760         CommitToken masterCommitToken = null;
761 
762         /*
763          * Create a transaction just to make sure all the replicas are awake
764          * and connected.
765          */
766         for (RepEnvInfo repi : repEnvInfo) {
767             ReplicatedEnvironment rep = repi.getEnv();
768             if (rep.getState().isMaster()) {
769                 try {
770                     Transaction txn=
771                         rep.
772                         beginTransaction(null, RepTestUtils.SYNC_SYNC_ALL_TC);
773                     txn.commit();
774                 } catch (InsufficientReplicasException e) {
775                     if (e.getAvailableReplicas().size() != (numSyncNodes-1)) {
776                         throw new IllegalStateException
777                             ("Expected replicas: " + (numSyncNodes - 1) +
778                              "available replicas: " +
779                              e.getAvailableReplicas());
780                     }
781                 }
782 
783                 /*
784                  * Handshakes with all replicas are now completed, if they were
785                  * not before. Now get a token to represent the last committed
786                  * point in the replication stream, from the master's
787                  * perspective.
788                  */
789                 RepNode repNode = RepInternal.getRepImpl(rep).getRepNode();
790                 masterCommitToken = new CommitToken
791                     (repNode.getUUID(),
792                      repNode.getCurrentTxnEndVLSN().getSequence());
793                 break;
794             }
795         }
796 
797         if (masterCommitToken  == null) {
798             throw new IllegalStateException("No current master");
799         }
800 
801         CommitPointConsistencyPolicy policy =
802             new CommitPointConsistencyPolicy(masterCommitToken, MINUTE_MS,
803                                              TimeUnit.MILLISECONDS);
804 
805         /*
806          * Check that the environments are caught up with the last master
807          * commit at the time of the call to this method.
808          */
809         for (RepEnvInfo repi : repEnvInfo) {
810             ReplicatedEnvironment rep = repi.getEnv();
811             if ((rep == null) ||
812                 RepInternal.isClosed(rep) ||
813                 rep.getState().isMaster() ||
814                 rep.getState().isDetached()) {
815                 continue;
816             }
817             policy.ensureConsistency(RepInternal.getRepImpl(rep));
818         }
819         return new VLSN(masterCommitToken.getVLSN());
820     }
821 
822     /**
823      * Used to ensure that the group is in sync with respect to a given
824      * VLSN. If numSyncNodes == repEnvInfo.length, all the nodes in the
825      * replication group must be alive and available. If numSyncNodes is less
826      * than the size of the group, a quorum will need to be alive and
827      * available.
828      *
829      * @param repEnvInfo the array holding the environments
830      * @param numSyncNodes the expected number of nodes to be synced; includes
831      * the master
832      * @throws InterruptedException
833      */
syncGroupToVLSN(RepEnvInfo[] repEnvInfo, int numSyncNodes, VLSN targetVLSN)834     public static void syncGroupToVLSN(RepEnvInfo[] repEnvInfo,
835                                        int numSyncNodes,
836                                        VLSN targetVLSN)
837         throws InterruptedException {
838 
839         /*
840          * Create a transaction just to make sure all the replicas are awake
841          * and connected.
842          */
843         for (RepEnvInfo repi : repEnvInfo) {
844             ReplicatedEnvironment rep = repi.getEnv();
845             if (rep == null) {
846                 continue;
847             }
848 
849             if (rep.getState().isMaster()) {
850                 TransactionConfig txnConfig = null;
851                 if (numSyncNodes == repEnvInfo.length) {
852                     txnConfig = RepTestUtils.SYNC_SYNC_ALL_TC;
853                 } else {
854                     txnConfig = new TransactionConfig();
855                     txnConfig.setDurability
856                         (new Durability(SyncPolicy.SYNC,
857                                         SyncPolicy.SYNC,
858                                         ReplicaAckPolicy.SIMPLE_MAJORITY));
859                 }
860 
861                 try {
862                     Transaction txn = rep.beginTransaction(null, txnConfig);
863                     txn.commit();
864                 } catch (InsufficientReplicasException e) {
865                     if (e.getAvailableReplicas().size() !=
866                         (numSyncNodes - 1)) {
867                         throw new IllegalStateException
868                             ("Expected replicas: " + (numSyncNodes - 1) +
869                              ", available replicas: " +
870                              e.getAvailableReplicas());
871                     }
872                 }
873             }
874         }
875 
876         syncGroup(repEnvInfo, targetVLSN);
877     }
878 
879     /* Syncs the group to the specific VLSN. */
syncGroup(RepEnvInfo[] repEnvInfo, VLSN targetVLSN)880     private static void syncGroup(RepEnvInfo[] repEnvInfo, VLSN targetVLSN)
881         throws InterruptedException {
882         PointConsistencyPolicy policy = new PointConsistencyPolicy(targetVLSN);
883 
884         /* Check that the environments are caught up with this VLSN. */
885         for (RepEnvInfo repi : repEnvInfo) {
886             ReplicatedEnvironment rep = repi.getEnv();
887             if (rep == null ||
888                 RepInternal.isClosed(rep) ||
889                 rep.getState().isMaster()) {
890                 continue;
891             }
892             policy.ensureConsistency(RepInternal.getRepImpl(rep));
893         }
894     }
895 
896     /**
897      * Synchronizes the group to the current vlsn on the master. Used to ensure
898      * that application level changes, even mid-transaction changes have been
899      * replicated to all the nodes before the method returns.
900      *
901      * Note that since CBVLSN updates are asynchronous the vlsn may continue
902      * moving forward, but the application level changes will have been
903      * propagated.
904      */
syncGroup(RepEnvInfo[] repEnvInfo)905     public static VLSN syncGroup(RepEnvInfo[] repEnvInfo) {
906         RepEnvInfo master = RepTestBase.findMaster(repEnvInfo);
907         if (master == null) {
908             throw new IllegalStateException("no master");
909         }
910         VLSN vlsn = master.getRepImpl().getVLSNIndex().getRange().getLast();
911         try {
912             syncGroup(repEnvInfo, vlsn);
913         } catch (Exception e) {
914             throw new IllegalStateException("unexpected exception");
915         }
916         return vlsn;
917     }
918 
checkUtilizationProfile(RepEnvInfo .... repEnvInfo)919     public static void checkUtilizationProfile(RepEnvInfo ... repEnvInfo) {
920         checkUtilizationProfile(null, repEnvInfo);
921     }
922 
923     /**
924      * Run utilization profile checking on all databases in the set of
925      * RepEnvInfo. The environment must be quiescent. The utility will lock
926      * out any cleaning by using DbBackup, during the check.
927      */
checkUtilizationProfile(PrintStream out, RepEnvInfo ... repEnvInfo)928     public static void checkUtilizationProfile(PrintStream out,
929                                                RepEnvInfo ... repEnvInfo) {
930         for (RepEnvInfo info : repEnvInfo) {
931             if (out != null) {
932                 out.println("checking " + info.getEnvHome());
933             }
934 
935             Environment env = info.getEnv();
936 
937             /* Use DbBackup to prevent log file deletion. */
938             DbBackup backup = new DbBackup(env);
939             backup.startBackup();
940 
941             try {
942                 List<String> dbNames = env.getDatabaseNames();
943 
944                 for (String dbName : dbNames) {
945                     if (out != null) {
946                         out.println("\tchecking " + dbName);
947                     }
948                     DatabaseConfig dbConfig = new DatabaseConfig();
949                     DbInternal.setUseExistingConfig(dbConfig, true);
950                     dbConfig.setTransactional(true);
951                     Database db = env.openDatabase(null, dbName, dbConfig);
952 
953                     try {
954                         VerifyUtils.checkLsns(db);
955                     } finally {
956                         db.close();
957                     }
958                 }
959             } finally {
960                 backup.endBackup();
961             }
962         }
963     }
964 
965     /**
966      * Confirm that all the nodes in this group match. Check number of
967      * databases, names of databases, per-database count, per-database
968      * records. Use the master node as the reference if it exists, else use the
969      * first replicator.
970      *
971      * @param limit The replication stream portion of the equality check is
972      * bounded at the upper end by this value. Limit is usually the commit sync
973      * or vlsn sync point explicitly called by a test before calling
974      * checkNodeEquality.  Each node must contain VLSNs up to and including the
975      * limit, and may also include additional VSLNs due to heartbeats, etc.
976      *
977      * @throws InterruptedException
978      *
979      * @throws RuntimeException if there is an incompatibility
980      */
checkNodeEquality(VLSN limit, boolean verbose, RepEnvInfo ... repEnvInfo)981     public static void checkNodeEquality(VLSN limit,
982                                          boolean verbose,
983                                          RepEnvInfo ... repEnvInfo)
984         throws InterruptedException {
985 
986         int referenceIndex = -1;
987         assert repEnvInfo.length > 0;
988         for (int i = 0; i < repEnvInfo.length; i++) {
989             if ((repEnvInfo[i] == null) ||
990                 (repEnvInfo[i].getEnv() == null)) {
991                 continue;
992             }
993             ReplicatedEnvironment repEnv = repEnvInfo[i].getEnv();
994             if (!RepInternal.isClosed(repEnv) && repEnv.getState().isMaster()) {
995                 referenceIndex = i;
996                 break;
997             }
998         }
999         assert referenceIndex != -1;
1000 
1001         ReplicatedEnvironment reference = repEnvInfo[referenceIndex].getEnv();
1002         for (int i = 0; i < repEnvInfo.length; i++) {
1003             if (i != referenceIndex) {
1004                 if ((repEnvInfo[i] == null) ||
1005                     (repEnvInfo[i].getEnv() == null)) {
1006                     continue;
1007                 }
1008 
1009                 ReplicatedEnvironment repEnv = repEnvInfo[i].getEnv();
1010                 if (verbose) {
1011                     System.out.println("Comparing master node " +
1012                                        reference.getNodeName() +
1013                                        " to node " +
1014                                        repEnv.getNodeName());
1015                 }
1016 
1017                 if (!RepInternal.isClosed(repEnv)) {
1018                     checkNodeEquality(reference, repEnv, limit, verbose);
1019                 }
1020             }
1021         }
1022     }
1023 
1024     /* Enable or disable the log cleaning on a replica. */
runOrPauseCleaners(ReplicatedEnvironment repEnv, boolean isPaused)1025     private static void runOrPauseCleaners(ReplicatedEnvironment repEnv,
1026                                            boolean isPaused)
1027         throws InterruptedException {
1028 
1029         if (!RepInternal.isClosed(repEnv)) {
1030             RepImpl repImpl = RepInternal.getRepImpl(repEnv);
1031             if (isPaused) {
1032                 repImpl.getCleaner().addProtectedFileRange(0L);
1033             } else {
1034                 repImpl.getCleaner().removeProtectedFileRange(0L);
1035             }
1036             Thread.sleep(100);
1037         }
1038     }
1039 
1040     /**
1041      * Confirm that the contents of these two nodes match. Check number of
1042      * databases, names of databases, per-database count, per-database records.
1043      *
1044      * @throws InterruptedException
1045      * @throws RuntimeException if there is an incompatiblity
1046      */
checkNodeEquality(ReplicatedEnvironment replicatorA, ReplicatedEnvironment replicatorB, VLSN limit, boolean verbose)1047     public static void checkNodeEquality(ReplicatedEnvironment replicatorA,
1048                                          ReplicatedEnvironment replicatorB,
1049                                          VLSN limit,
1050                                          boolean verbose)
1051         throws InterruptedException {
1052 
1053         runOrPauseCleaners(replicatorA, true);
1054         runOrPauseCleaners(replicatorB, true);
1055 
1056         String nodeA = replicatorA.getNodeName();
1057         String nodeB = replicatorB.getNodeName();
1058 
1059         Environment envA = replicatorA;
1060         Environment envB = replicatorB;
1061 
1062         RepImpl repImplA = RepInternal.getRepImpl(replicatorA);
1063         RepImpl repImplB = RepInternal.getRepImpl(replicatorB);
1064 
1065         try {
1066 
1067             /* Compare the replication related sequences. */
1068             if (verbose) {
1069                 System.out.println("Comparing sequences");
1070             }
1071 
1072             /* replicated node id sequence. */
1073             /*
1074               long nodeIdA =
1075               envImplA.getNodeSequence().getLastReplicatedNodeId();
1076               long nodeIdB =
1077               envImplB.getNodeSequence().getLastReplicatedNodeId();
1078 
1079               // TEMPORARILY DISABLED: sequences not synced up. This may
1080               // actually apply right now to database and txn ids too,
1081               // but it's less likely to manifest itself.
1082               if (nodeIdA != nodeIdB) {
1083               throw new RuntimeException
1084               ("NodeId mismatch. " + nodeA +
1085               " lastRepNodeId=" + nodeIdA + " " + nodeB +
1086               " lastRepNodeId=" + nodeIdB);
1087               }
1088             */
1089 
1090             /* replicated txn id sequence. */
1091             /*
1092               long txnIdA = repImplA.getTxnManager().getLastReplicatedTxnId();
1093               long txnIdB = repmplB.getTxnManager().getLastReplicatedTxnId();
1094               if (txnIdA != txnIdB) {
1095               throw new RuntimeException
1096               ("TxnId mismatch. A.lastRepTxnId=" + txnIdA +
1097               " B.lastRepTxnId=" + txnIdB);
1098               }
1099             */
1100 
1101             /* Replicated database id sequence. */
1102             long dbIdA = repImplA.getDbTree().getLastReplicatedDbId();
1103             long dbIdB = repImplB.getDbTree().getLastReplicatedDbId();
1104             if (dbIdA != dbIdB) {
1105                 throw new RuntimeException
1106                     ("DbId mismatch. A.lastRepDbId=" + dbIdA +
1107                      " B.lastRepDbId=" + dbIdB);
1108             }
1109 
1110             /* Check name and number of application databases first. */
1111             List<String> dbListA = envA.getDatabaseNames();
1112             List<String> dbListB = envB.getDatabaseNames();
1113 
1114             if (verbose) {
1115                 System.out.println("envEquals: check db list: " + nodeA +
1116                                    "=" + dbListA + " " + nodeB + "=" +
1117                                    dbListB);
1118             }
1119 
1120             if (!dbListA.equals(dbListB)) {
1121                 throw new RuntimeException("Mismatch: dbNameList " + nodeA +
1122                                            " =" + dbListA + " " +
1123                                            nodeB + " =" + dbListB);
1124             }
1125 
1126             /* Check record count and contents of each database. */
1127             DatabaseConfig checkConfig = new DatabaseConfig();
1128             checkConfig.setReadOnly(true);
1129             checkConfig.setTransactional(true);
1130             DbInternal.setUseExistingConfig(checkConfig, true);
1131             for (String dbName : dbListA) {
1132 
1133                 Database dbA = null;
1134                 Database dbB = null;
1135                 try {
1136                     dbA = envA.openDatabase(null, dbName, checkConfig);
1137                     dbB = envB.openDatabase(null, dbName, checkConfig);
1138 
1139                     int count = checkDbContents(dbA, dbB);
1140 
1141                     if (verbose) {
1142                         System.out.println("compared " + count + " records");
1143                     }
1144                 } finally {
1145                     if (dbA != null) {
1146                         dbA.close();
1147                     }
1148                     if (dbB != null) {
1149                         dbB.close();
1150                     }
1151                 }
1152             }
1153 
1154             /*
1155              * Check the replication stream of each environment. The subset of
1156              * VLSN entries common to both nodes should match.
1157              */
1158             checkStreamIntersection(nodeA,
1159                                     nodeB,
1160                                     RepInternal.getRepImpl(replicatorA),
1161                                     RepInternal.getRepImpl(replicatorB),
1162                                     limit,
1163                                     verbose);
1164         } catch (DatabaseException e) {
1165             throw new RuntimeException(e);
1166         } catch (IOException e) {
1167             throw new RuntimeException(e);
1168         }
1169 
1170         runOrPauseCleaners(replicatorA, false);
1171         runOrPauseCleaners(replicatorB, false);
1172     }
1173 
1174     /**
1175      * @throws RuntimeException if dbA and dbB don't have the same contents.
1176      */
checkDbContents(Database dbA, Database dbB)1177     private static int checkDbContents(Database dbA, Database dbB) {
1178 
1179         Cursor cursorA = null;
1180         Cursor cursorB = null;
1181         Transaction txnA = null;
1182         Transaction txnB = null;
1183         int debugCount = 0;
1184         boolean isGroupDB =
1185             dbA.getDatabaseName().equals(DbType.REP_GROUP.getInternalName());
1186 
1187         try {
1188             txnA = dbA.getEnvironment().beginTransaction(null, null);
1189             txnB = dbB.getEnvironment().beginTransaction(null, null);
1190             cursorA = dbA.openCursor(txnA, CursorConfig.READ_UNCOMMITTED);
1191             cursorB = dbB.openCursor(txnB, CursorConfig.READ_UNCOMMITTED);
1192             DatabaseEntry keyA = new DatabaseEntry();
1193             DatabaseEntry keyB = new DatabaseEntry();
1194             DatabaseEntry dataA = new DatabaseEntry();
1195             DatabaseEntry dataB = new DatabaseEntry();
1196             NodeBinding nodeBinding = null;
1197             while (cursorA.getNext(keyA, dataA, LockMode.DEFAULT) ==
1198                    OperationStatus.SUCCESS) {
1199                 debugCount++;
1200                 OperationStatus statusB = cursorB.getNext(keyB, dataB,
1201                                                           LockMode.DEFAULT);
1202                 if (statusB != OperationStatus.SUCCESS) {
1203                     throw new RuntimeException("Mismatch: debugCount=" +
1204                                                debugCount + "bad statusB = " +
1205                                                statusB);
1206                 }
1207                 if (!Arrays.equals(keyA.getData(), keyB.getData())) {
1208                     throw new RuntimeException("Mismatch: debugCount=" +
1209                                                debugCount + " keyA=" +
1210                                                keyA.getData() + " keyB=" +
1211                                                keyB.getData());
1212 
1213                 }
1214                 if (!Arrays.equals(dataA.getData(), dataB.getData())) {
1215                     if (isGroupDB &&
1216                         equalsNode(dataA.getData(), dataB.getData(),
1217                                    nodeBinding)) {
1218                         continue;
1219                     }
1220                     throw new RuntimeException("Mismatch: debugCount=" +
1221                                                debugCount + " dataA=" +
1222                                                dataA.getData() + " dataB=" +
1223                                                dataB.getData());
1224                 }
1225                 if (isGroupDB &&
1226                     (nodeBinding == null) &&
1227                     RepGroupDB.GROUP_KEY.equals(
1228                         StringBinding.entryToString(keyA))) {
1229                     final RepGroupImpl group =
1230                         new GroupBinding().entryToObject(dataA);
1231                     nodeBinding = new NodeBinding(group.getFormatVersion());
1232                 }
1233             }
1234             if (cursorB.getNext(keyB, dataB, LockMode.DEFAULT) ==
1235                 OperationStatus.SUCCESS) {
1236                 throw new RuntimeException("Mismatch: debugCount=" +
1237                                            debugCount + " keyA is missing" +
1238                                            " keyB=" + keyB.getData() +
1239                                            " dataB=" + dataB.getData());
1240             }
1241             return debugCount;
1242         } catch (DatabaseException e) {
1243             throw new RuntimeException(e);
1244         } finally {
1245             try {
1246                 if (cursorA != null) {
1247                     cursorA.close();
1248                 }
1249                 if (cursorB != null) {
1250                     cursorB.close();
1251                 }
1252                 if (txnA != null) {
1253                     txnA.commit();
1254                 }
1255                 if (txnB != null) {
1256                     txnB.commit();
1257                 }
1258             } catch (DatabaseException e) {
1259                 throw new RuntimeException(e);
1260             }
1261         }
1262     }
1263 
1264     /*
1265      * Implements a special check for group nodes which skips the syncup field.
1266      */
equalsNode(byte[] data1, byte[] data2, NodeBinding nodeBinding)1267     private static boolean equalsNode(byte[] data1, byte[] data2,
1268                                       NodeBinding nodeBinding) {
1269         Assert.assertNotNull("Node binding", nodeBinding);
1270         RepNodeImpl n1 = nodeBinding.entryToObject(new TupleInput(data1));
1271         RepNodeImpl n2 = nodeBinding.entryToObject(new TupleInput(data2));
1272         return n1.equivalent(n2);
1273     }
1274 
1275     /**
1276      * @throws InterruptedException
1277      * @throws IOException
1278      * @throws RuntimeException if envA and envB don't have the same set of
1279      * VLSN mappings, VLSN-tagged log entries, and replication sequences.
1280      */
1281     @SuppressWarnings("unused")
checkStreamIntersection(String nodeA, String nodeB, RepImpl repA, RepImpl repB, VLSN limit, boolean verbose)1282     private static void checkStreamIntersection(String nodeA,
1283                                                 String nodeB,
1284                                                 RepImpl repA,
1285                                                 RepImpl repB,
1286                                                 VLSN limit,
1287                                                 boolean verbose)
1288         throws IOException, InterruptedException {
1289 
1290         if (verbose) {
1291             System.out.println("Check intersection for " + nodeA +
1292                                " and " + nodeB);
1293         }
1294 
1295         VLSNIndex repAMap = repA.getVLSNIndex();
1296         VLSNRange repARange = repAMap.getRange();
1297         VLSNIndex repBMap = repB.getVLSNIndex();
1298         VLSNRange repBRange = repBMap.getRange();
1299 
1300         /*
1301          * Compare the vlsn ranges held on each environment and find the subset
1302          * common to both replicas.
1303          */
1304         VLSN firstA = repARange.getFirst();
1305         VLSN lastA = repARange.getLast();
1306         VLSN firstB = repBRange.getFirst();
1307         VLSN lastB = repBRange.getLast();
1308         VLSN lastSyncA = repARange.getLastSync();
1309 
1310         if (lastA.compareTo(limit) < 0) {
1311             throw new RuntimeException
1312                 ("CheckRepStream error: repA (" + repA.getNameIdPair() +
1313                  ") lastVLSN = " + lastA +
1314                  " < limit = " + limit);
1315         }
1316 
1317         if (lastB.compareTo(limit) < 0) {
1318             throw new RuntimeException
1319                 ("CheckRepStream error: repB (" + repB.getNameIdPair() +
1320                  ") lastVLSN = " + lastB +
1321                  " < limit = " + limit + ")");
1322         }
1323 
1324         /*
1325          * Calculate the largest VLSN range starting point and the smallest
1326          * VLSN range ending point for these two Replicators.
1327          */
1328         VLSN firstLarger = (firstA.compareTo(firstB) > 0) ? firstA : firstB;
1329         VLSN lastSmaller = (lastA.compareTo(lastB) < 0) ? lastA : lastB;
1330 
1331         try {
1332             /* The two replicas can read from the larger of the first VLSNs. */
1333             FeederReader readerA = new FeederReader(repA,
1334                                                     repAMap,
1335                                                     DbLsn.NULL_LSN,
1336                                                     100000,
1337                                                     repA.getNameIdPair());
1338             readerA.initScan(firstLarger);
1339 
1340             FeederReader readerB = new FeederReader(repB,
1341                                                     repBMap,
1342                                                     DbLsn.NULL_LSN,
1343                                                     100000,
1344                                                     repB.getNameIdPair());
1345             readerB.initScan(firstLarger);
1346 
1347             /* They should both find the smaller of the last VLSNs. */
1348             for (long vlsnVal = firstLarger.getSequence();
1349                  vlsnVal <= lastSmaller.getSequence();
1350                  vlsnVal++) {
1351 
1352                 OutputWireRecord wireRecordA =
1353                     readerA.scanForwards(new VLSN(vlsnVal), 0);
1354                 OutputWireRecord wireRecordB =
1355                     readerB.scanForwards(new VLSN(vlsnVal), 0);
1356 
1357                 if (!(wireRecordA.match(wireRecordB))) {
1358                     throw new RuntimeException(nodeA + " at vlsn " + vlsnVal +
1359                                                " has " + wireRecordA + " " +
1360                                                nodeB  + " has " + wireRecordB);
1361                 }
1362 
1363                 /* Check that db id, node id, txn id are negative. */
1364                 if (!repA.isRepConverted()) {
1365                     wireRecordA.verifyNegativeSequences(nodeA);
1366                 }
1367                 if (!repB.isRepConverted()) {
1368                     wireRecordB.verifyNegativeSequences(nodeB);
1369                 }
1370             }
1371 
1372             if (verbose) {
1373                 System.out.println("Checked from vlsn " + firstLarger +
1374                                    " to " + lastSmaller);
1375             }
1376         } catch (Exception e) {
1377             e.printStackTrace();
1378 
1379             System.err.println(nodeA + " vlsnMap=");
1380             repAMap.dumpDb(true);
1381             System.err.println(nodeB + " vlsnMap=");
1382             repBMap.dumpDb(true);
1383 
1384             throw new RuntimeException(e);
1385         }
1386     }
1387 
1388     /**
1389      * Return the number of nodes that constitute a quorum for this size
1390      * group. This should be replaced by ReplicaAckPolicy.requiredNodes;
1391      */
getQuorumSize(int groupSize)1392     public static int getQuorumSize(int groupSize) {
1393         assert groupSize > 0 : "groupSize = " + groupSize;
1394         if (groupSize == 1) {
1395             return 1;
1396         } else if (groupSize == 2) {
1397             return 1;
1398         } else {
1399             return (groupSize/2) + 1;
1400         }
1401     }
1402 
1403     /**
1404      * Create a rep group of a specified size on the local host, using the
1405      * default port configuration.
1406      *
1407      * @param electableNodes number of electable nodes in test group
1408      * @param monitorNodes number of monitor nodes in test group
1409      *
1410      * @return the simulated test RepGroup
1411      *
1412      * @throws UnknownHostException
1413      */
createTestRepGroup(int electableNodes, int monitorNodes)1414     public static RepGroupImpl createTestRepGroup(int electableNodes,
1415                                                   int monitorNodes)
1416         throws UnknownHostException {
1417 
1418         return createTestRepGroup(electableNodes, monitorNodes, 0);
1419     }
1420 
1421     /**
1422      * Create a rep group of a specified size on the local host, using the
1423      * default port configuration.
1424      *
1425      * @param electableNodes number of electable nodes in test group
1426      * @param monitorNodes number of monitor nodes in test group
1427      * @param secondaryNodes number of secondary nodes in the test group
1428      *
1429      * @return the simulated test RepGroup
1430      *
1431      * @throws UnknownHostException
1432      */
createTestRepGroup(int electableNodes, int monitorNodes, int secondaryNodes)1433     public static RepGroupImpl createTestRepGroup(int electableNodes,
1434                                                   int monitorNodes,
1435                                                   int secondaryNodes)
1436         throws UnknownHostException {
1437 
1438         Map<Integer, RepNodeImpl> allNodeInfo =
1439             new HashMap<Integer, RepNodeImpl>();
1440         final InetAddress ia = InetAddress.getLocalHost();
1441         int port = getDefaultPort();
1442         RepGroupImpl repGroup = new RepGroupImpl("TestGroup", null);
1443 
1444         for (int i=1; i <= electableNodes; i++) {
1445             allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("node"+i,i),
1446                                                NodeType.ELECTABLE,
1447                                                true,
1448                                                false,
1449                                                ia.getHostName(),
1450                                                port,
1451                                                repGroup.getChangeVersion(),
1452                                                null));
1453             port++;
1454         }
1455         for (int i= (electableNodes+1);
1456              i <= (electableNodes+monitorNodes);
1457              i++) {
1458             allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("mon"+i,i),
1459                                                NodeType.MONITOR,
1460                                                true,
1461                                                false,
1462                                                ia.getHostName(),
1463                                                port,
1464                                                repGroup.getChangeVersion(),
1465                                                null));
1466             port++;
1467         }
1468         for (int i = electableNodes + monitorNodes + 1;
1469              i <= electableNodes + monitorNodes + secondaryNodes;
1470              i++) {
1471             allNodeInfo.put(i, new RepNodeImpl(new NameIdPair("sec" + i, i),
1472                                                NodeType.SECONDARY,
1473                                                true,
1474                                                false,
1475                                                ia.getHostName(),
1476                                                port,
1477                                                repGroup.getChangeVersion(),
1478                                                null));
1479             port++;
1480         }
1481         repGroup.setNodes(allNodeInfo);
1482         return repGroup;
1483     }
1484 
1485     public static class RepEnvInfo {
1486         private final File envHome;
1487         private final ReplicationConfig repConfig;
1488         private EnvironmentConfig envConfig;
1489         private QuorumPolicy initialElectionPolicy =
1490             QuorumPolicy.SIMPLE_MAJORITY;
1491 
1492         private ReplicatedEnvironment repEnv = null;
1493 
RepEnvInfo(File envHome, ReplicationConfig repConfig, EnvironmentConfig envConfig)1494         public RepEnvInfo(File envHome,
1495                           ReplicationConfig repConfig,
1496                           EnvironmentConfig envConfig) {
1497             super();
1498             this.envHome = envHome;
1499             this.repConfig = repConfig;
1500             this.envConfig = envConfig;
1501         }
1502 
openEnv()1503         public ReplicatedEnvironment openEnv() {
1504             if (repEnv != null) {
1505                 throw new IllegalStateException("rep env already exists");
1506             }
1507 
1508             repEnv = new ReplicatedEnvironment(envHome,
1509                                                getRepConfig(),
1510                                                envConfig,
1511                                                null,
1512                                                initialElectionPolicy);
1513             return repEnv;
1514         }
1515 
openEnv(ReplicaConsistencyPolicy cp)1516         public ReplicatedEnvironment openEnv(ReplicaConsistencyPolicy cp) {
1517 
1518             if (repEnv != null) {
1519                 throw new IllegalStateException("rep env already exists");
1520             }
1521             repEnv = new ReplicatedEnvironment
1522                 (envHome, getRepConfig(), envConfig, cp,
1523                  initialElectionPolicy);
1524             return repEnv;
1525         }
1526 
openEnv(RepEnvInfo helper)1527         public ReplicatedEnvironment openEnv(RepEnvInfo helper) {
1528 
1529             repConfig.setHelperHosts((helper == null) ?
1530                                      repConfig.getNodeHostPort() :
1531                                      helper.getRepConfig().getNodeHostPort());
1532             return openEnv();
1533         }
1534 
getEnv()1535         public ReplicatedEnvironment getEnv() {
1536             return repEnv;
1537         }
1538 
getRepImpl()1539         public RepImpl getRepImpl() {
1540             return RepInternal.getRepImpl(repEnv);
1541         }
1542 
getRepNode()1543         public RepNode getRepNode() {
1544             return getRepImpl().getRepNode();
1545         }
1546 
getRepConfig()1547         public ReplicationConfig getRepConfig() {
1548             return repConfig;
1549         }
1550 
getEnvHome()1551         public File getEnvHome() {
1552             return envHome;
1553         }
1554 
setEnvConfig(final EnvironmentConfig envConfig)1555         public void setEnvConfig(final EnvironmentConfig envConfig) {
1556             this.envConfig = envConfig;
1557         }
1558 
getEnvConfig()1559         public EnvironmentConfig getEnvConfig() {
1560             return envConfig;
1561         }
1562 
getInitialElectionPolicy()1563         public QuorumPolicy getInitialElectionPolicy() {
1564             return initialElectionPolicy;
1565         }
1566 
setInitialElectionPolicy( final QuorumPolicy initialElectionPolicy)1567         public void setInitialElectionPolicy(
1568             final QuorumPolicy initialElectionPolicy) {
1569 
1570             this.initialElectionPolicy = initialElectionPolicy;
1571         }
1572 
closeEnv()1573         public void closeEnv()  {
1574             try {
1575                 if (repEnv != null) {
1576                    repEnv.close();
1577                 }
1578             } finally {
1579                 repEnv = null;
1580             }
1581         }
1582 
1583         /**
1584          * Convenience method that guards against a NPE when checking whether
1585          * the state of a node is MASTER.
1586          */
isMaster()1587         public boolean isMaster() {
1588             return (repEnv != null) && repEnv.getState().isMaster();
1589         }
1590 
1591         /**
1592          * Convenience method that guards against a NPE when checking whether
1593          * the state of a node is REPLICA.
1594          */
isReplica()1595         public boolean isReplica() {
1596             return (repEnv != null) && repEnv.getState().isReplica();
1597         }
1598 
1599         /**
1600          * Convenience method that guards against a NPE when checking whether
1601          * the state of a node is UNKNOWN.
1602          */
isUnknown()1603         public boolean isUnknown() {
1604             return (repEnv != null) && repEnv.getState().isUnknown();
1605         }
1606 
1607         /**
1608          * Simulate a crash of the environment, don't do a graceful close.
1609          */
abnormalCloseEnv()1610         public void abnormalCloseEnv() {
1611             try {
1612                 if (repEnv.isValid()) {
1613 
1614                     /*
1615                      * Although we want an abnormal close, we do want to flush.
1616                      * And if the env is valid, we expect it to work; so avoid
1617                      * ignoring exceptions from this call.
1618                      */
1619                     RepInternal.getRepImpl(repEnv).getLogManager().
1620                         flushNoSync();
1621                 }
1622                 try {
1623                     RepInternal.getRepImpl(repEnv).abnormalClose();
1624                 } catch (DatabaseException ignore) {
1625 
1626                     /*
1627                      * The close will face problems like unclosed txns, ignore.
1628                      * We're trying to simulate a crash.
1629                      */
1630                 }
1631             } finally {
1632                 repEnv = null;
1633             }
1634         }
1635 
1636         @Override
toString()1637         public String toString() {
1638             return (repEnv == null) ?
1639                     envHome.toString() : repEnv.getNodeName();
1640         }
1641     }
1642 
stackTraceString(final Throwable exception)1643     public static String stackTraceString(final Throwable exception) {
1644         ByteArrayOutputStream bao = new ByteArrayOutputStream();
1645         PrintStream printStream = new PrintStream(bao);
1646         exception.printStackTrace(printStream);
1647         String stackTraceString = bao.toString();
1648         return stackTraceString;
1649     }
1650 
1651     /**
1652      * Restarts a group associated with an existing environment on disk.
1653      * Returns the environment associated with the master.
1654      */
1655     public static ReplicatedEnvironment
restartGroup(RepEnvInfo .... repEnvInfo)1656         restartGroup(RepEnvInfo ... repEnvInfo) {
1657 
1658         return restartGroup(false /*replicasOnly*/, false, repEnvInfo);
1659     }
1660 
1661     public static ReplicatedEnvironment
restartGroup(boolean allowILE, RepEnvInfo ... repEnvInfo)1662         restartGroup(boolean allowILE, RepEnvInfo ... repEnvInfo) {
1663 
1664         return restartGroup(false, allowILE, repEnvInfo);
1665     }
1666 
1667     /**
1668      * Restarts a group of replicas associated with an existing environment on
1669      * disk.
1670      */
restartReplicas(RepEnvInfo .... repEnvInfo)1671     public static void restartReplicas(RepEnvInfo ... repEnvInfo) {
1672 
1673         restartGroup(true /*replicasOnly*/, false, repEnvInfo);
1674     }
1675 
1676     /**
1677      * Restarts a group associated with an existing environment on disk.
1678      * Returns the environment associated with the master.
1679      */
1680     private static ReplicatedEnvironment
restartGroup(boolean replicasOnly, boolean allowILE, RepEnvInfo ... repEnvInfo)1681         restartGroup(boolean replicasOnly,
1682                      boolean allowILE,
1683                      RepEnvInfo ... repEnvInfo) {
1684 
1685         /* To avoid the jdk bug: NullPointerException in Selector.open(). The
1686          * bug report can be found in
1687          * http://bugs.sun.com/view_bug.do?bug_id=6427854
1688          */
1689         System.setProperty("sun.nio.ch.bugLevel",
1690                            System.getProperty("sun.nio.ch.bugLevel",""));
1691 
1692         /* Restart the group, a thread for each node. */
1693         JoinThread threads[] = new JoinThread[repEnvInfo.length];
1694         for (int i=0; i < repEnvInfo.length; i++) {
1695             threads[i]= new JoinThread(repEnvInfo[i], allowILE);
1696             threads[i].start();
1697         }
1698 
1699         /*
1700          * Wait for each thread to have joined the group. The group must be
1701          * re-started in parallel to ensure that all nodes are up and elections
1702          * can be held.
1703          */
1704         for (int i=0; i < repEnvInfo.length; i++) {
1705             JoinThread jt = threads[i];
1706             try {
1707                 jt.join(JOIN_WAIT_TIME);
1708             } catch (InterruptedException e) {
1709                 throw new RuntimeException(e);
1710             }
1711             final Throwable exception = jt.testException;
1712             if (exception != null) {
1713                 throw new RuntimeException(
1714                     "Join thread exception for " + repEnvInfo[i] +
1715                     " still alive = " + jt.isAlive() + "\n" +
1716                     RepTestUtils.stackTraceString(exception));
1717             }
1718             if (jt.isAlive()) {
1719                 throw new IllegalStateException(
1720                     "Join thread for " + repEnvInfo[i] +
1721                     " still alive after " + JOIN_WAIT_TIME + "ms," +
1722                     " and testException is null.");
1723             }
1724         }
1725 
1726         /* All join threads are quiescent, now pick the master. */
1727         if (replicasOnly) {
1728             return null;
1729         }
1730 
1731         return getMaster(repEnvInfo, false /*openIfNeeded*/);
1732     }
1733 
1734     /**
1735      * Find the authoritative master (wait for election to quiesce).
1736      */
getMaster(RepEnvInfo[] repEnvInfo, boolean openIfNeeded)1737     public static ReplicatedEnvironment getMaster(RepEnvInfo[] repEnvInfo,
1738                                                   boolean openIfNeeded) {
1739 
1740         final int maxRetries = 100;
1741         int retries = maxRetries;
1742         while (true) {
1743             int masterId = -1;
1744             boolean multipleMasters = false;
1745             boolean nonAuthoritativeMaster = false;
1746             for (int i = 0; i < repEnvInfo.length; i++) {
1747                 if (openIfNeeded && repEnvInfo[i].getEnv() == null) {
1748                     final boolean VERBOSE = false;
1749                     if (VERBOSE) {
1750                         System.out.println("Opening node " + (i + 1));
1751                     }
1752                     repEnvInfo[i].openEnv();
1753                 }
1754                 if (repEnvInfo[i].getEnv().getState().isMaster()) {
1755                     if (!repEnvInfo[i].getRepImpl().getRepNode().
1756                         isAuthoritativeMaster()) {
1757                         nonAuthoritativeMaster = true;
1758                     }
1759                     if (masterId >= 0) {
1760                         multipleMasters = true;
1761                     } else {
1762                         masterId = i;
1763                     }
1764                 }
1765             }
1766             if (masterId >= 0 &&
1767                 !multipleMasters &&
1768                 !nonAuthoritativeMaster) {
1769                 return repEnvInfo[masterId].getEnv();
1770             }
1771             if (--retries >= 0) {
1772                 try {
1773                     Thread.sleep(1000);
1774                 } catch (InterruptedException e) {
1775                     throw new RuntimeException(e);
1776                 }
1777                 continue;
1778             }
1779             if (nonAuthoritativeMaster) {
1780                 throw new IllegalStateException(
1781                     "Non-authoritative master after " +
1782                     maxRetries + " retries.");
1783             }
1784             if (multipleMasters) {
1785                 throw new IllegalStateException(
1786                     "More than one master in group after " +
1787                     maxRetries + " retries.");
1788             }
1789             if (masterId < 0) {
1790                 throw new IllegalStateException
1791                     ("Node id of the elected master is invalid.");
1792             }
1793         }
1794     }
1795 
1796     /**
1797      * Threads used to simulate a parallel join group when multiple replication
1798      * nodes are first brought up for an existing environment.
1799      */
1800     private static class JoinThread extends Thread {
1801 
1802         final RepEnvInfo repEnvInfo;
1803         final boolean allowILE;
1804 
1805         /*
1806          * Captures any exception encountered in the process of joining.  The
1807          * presence of a non-null testException field indicates to the caller
1808          * that the join failed.
1809          */
1810         volatile Throwable testException = null;
1811         private static final int NUM_RETRIES=100;
1812 
1813         /* The state of the node at the time of joining the group. */
1814         @SuppressWarnings("unused")
1815         ReplicatedEnvironment.State state =
1816             ReplicatedEnvironment.State.UNKNOWN;
1817 
JoinThread(RepEnvInfo repEnvInfo, boolean allowILE)1818         JoinThread(RepEnvInfo repEnvInfo, boolean allowILE) {
1819             this.repEnvInfo = repEnvInfo;
1820             this.allowILE = allowILE;
1821         }
1822 
1823         @Override
run()1824         public void run() {
1825             /*
1826              * The open of this environment may fail due to timing mishaps if
1827              * the environment has just been shutdown, as can happen in a
1828              * number of tests that repeatedly open and close
1829              * environments. Retry a few time to give the node a chance to
1830              * settle down.
1831              */
1832             int numRetries = 0;
1833             while (numRetries < NUM_RETRIES) {
1834                 try {
1835                     state = repEnvInfo.openEnv().getState();
1836                     testException = null;
1837                     break;
1838                 } catch (InsufficientLogException ile) {
1839                     if (allowILE) {
1840                         NetworkRestore restore = new NetworkRestore();
1841                         NetworkRestoreConfig nrc = new NetworkRestoreConfig();
1842                         nrc.setRetainLogFiles(false);
1843                         restore.execute(ile, nrc);
1844                         state = repEnvInfo.openEnv().getState();
1845                         testException = null;
1846                     } else {
1847                         testException = ile;
1848                     }
1849                     break;
1850                 } catch (GroupShutdownException ge) {
1851                     /* Retry, this node is still shutting down. */
1852                     numRetries++;
1853                     testException = ge;
1854                     try {
1855                         Thread.sleep(100);
1856                     } catch (InterruptedException ignore) {
1857                     }
1858                 } catch (Throwable e) {
1859                     testException = e;
1860                     break;
1861                 }
1862             }
1863         }
1864     }
1865 
1866     /**
1867      * Issue DbSync on a group. All nodes are presumed to be closed.
1868      *
1869      * @param timeoutMs is the DbSync timeout (max time for replica to catch up
1870      * with master) as well as the join timeout for each thread calling DbSync.
1871      */
syncupGroup(long timeoutMs, RepEnvInfo ... repEnvInfo)1872     public static void syncupGroup(long timeoutMs, RepEnvInfo ... repEnvInfo) {
1873 
1874         /*
1875          * The call to DbSync blocks until the sync is done, so it must
1876          * be executed concurrently by a set of threads.
1877          */
1878         SyncThread threads[] = new SyncThread[repEnvInfo.length];
1879         String helperHost = repEnvInfo[0].getRepConfig().getNodeHostPort();
1880         for (int i=0; i < repEnvInfo.length; i++) {
1881             threads[i]= new SyncThread(timeoutMs, repEnvInfo[i], helperHost);
1882             threads[i].start();
1883         }
1884 
1885         /*
1886          * Wait for each thread to open, sync, and close the node.
1887          */
1888         for (int i=0; i < repEnvInfo.length; i++) {
1889             SyncThread t = threads[i];
1890             try {
1891                 t.join(timeoutMs);
1892             } catch (InterruptedException e) {
1893                 throw new RuntimeException(e);
1894             }
1895 
1896             if (t.isAlive()) {
1897                 throw new IllegalStateException("Expect SyncThread " + i +
1898                                                 " dead, but it's alive.");
1899             }
1900             final Throwable exception = t.testException;
1901             if (exception != null) {
1902                 throw new RuntimeException
1903                     ("Join thread exception.\n" +
1904                      RepTestUtils.stackTraceString(exception));
1905             }
1906         }
1907     }
1908 
1909     /**
     * Threads used to run DbSync concurrently, one thread per node, because
     * each sync call blocks until that node's sync is done.
1912      */
1913     private static class SyncThread extends Thread {
1914 
1915         final RepEnvInfo repEnvInfo;
1916         final String helperHost;
1917         final long timeoutMs;
1918 
1919         /* Captures any exception encountered in the process of joining. */
1920         Throwable testException = null;
1921 
SyncThread(long timeoutMs, RepEnvInfo repEnvInfo, String helperHost)1922         SyncThread(long timeoutMs, RepEnvInfo repEnvInfo, String helperHost) {
1923             this.timeoutMs = timeoutMs;
1924             this.repEnvInfo = repEnvInfo;
1925             this.helperHost = helperHost;
1926         }
1927 
1928         @Override
run()1929         public void run() {
1930             try {
1931                 ReplicationConfig config = repEnvInfo.getRepConfig();
1932                 DbSync syncAgent =
1933                     new DbSync(repEnvInfo.getEnvHome().toString(),
1934                                repEnvInfo.getEnvConfig(),
1935                                config,
1936                                helperHost,
1937                                timeoutMs);
1938                 syncAgent.sync();
1939             } catch (Throwable e) {
1940                 testException = e;
1941             }
1942         }
1943     }
1944 
1945     /**
1946      * Disables network listening services, as a way of simulating a network partition
1947      * for testing.
1948      */
disableServices(final RepEnvInfo repEnvInfo)1949     public static void disableServices(final RepEnvInfo repEnvInfo) {
1950         final ServiceDispatcher sd1 = repEnvInfo.getRepNode().getServiceDispatcher();
1951         sd1.setSimulateIOException(Learner.SERVICE_NAME, true);
1952         sd1.setSimulateIOException(Acceptor.SERVICE_NAME, true);
1953         sd1.setSimulateIOException(FeederManager.FEEDER_SERVICE, true);
1954     }
1955 
1956     /**
1957      * Re-enables network services, to reverse the effect of a simulated
1958      * network partition.
1959      * @see #disableServices
1960      */
reenableServices(final RepEnvInfo repEnvInfo)1961     public static void reenableServices(final RepEnvInfo repEnvInfo) {
1962         final ServiceDispatcher sd1 = repEnvInfo.getRepNode().getServiceDispatcher();
1963         sd1.setSimulateIOException(Learner.SERVICE_NAME, false);
1964         sd1.setSimulateIOException(Acceptor.SERVICE_NAME, false);
1965         sd1.setSimulateIOException(FeederManager.FEEDER_SERVICE, false);
1966     }
1967 
awaitCondition(Callable<Boolean> predicate)1968     public static void awaitCondition(Callable<Boolean> predicate)
1969         throws Exception {
1970 
1971         awaitCondition(predicate, 5000);
1972     }
1973 
awaitCondition(Callable<Boolean> predicate, long timeout)1974     public static void awaitCondition(Callable<Boolean> predicate,
1975                                       long timeout)
1976         throws Exception {
1977 
1978         boolean done = false;
1979         long deadline = System.currentTimeMillis() + timeout;
1980         while (System.currentTimeMillis() < deadline) {
1981             if (predicate.call()) {
1982                 done = true;
1983                 break;
1984             }
1985             Thread.sleep(100);
1986         }
1987         Assert.assertTrue(done);
1988     }
1989 
1990     /**
1991      * Used for testing to force consistency checks to fail.
1992      */
1993     public static class AlwaysFail implements ReplicaConsistencyPolicy {
1994 
1995         public static final String NAME = "AlwaysFailConsistency";
1996 
AlwaysFail()1997         public AlwaysFail() {
1998         }
1999 
2000         @Override
ensureConsistency(EnvironmentImpl repInstance)2001         public void ensureConsistency(EnvironmentImpl repInstance)
2002             throws InterruptedException {
2003 
2004             throw new ReplicaConsistencyException("Always fails for testing",
2005                                                   this);
2006         }
2007 
2008         /**
2009          * Always returns 0, no timeout is needed for this policy.
2010          */
2011         @Override
getTimeout(TimeUnit unit)2012         public long getTimeout(TimeUnit unit) {
2013             return 1;
2014         }
2015 
2016         @Override
getName()2017         public String getName() {
2018             return NAME;
2019         }
2020     }
2021 
2022     /**
2023      * Set the basic SSL properties.  These rely on the build.xml configuration
2024      * that copies keystore and truststore files to the test environment.
2025      */
setUnitTestSSLProperties(Properties props)2026     public static void setUnitTestSSLProperties(Properties props) {
2027         File destDir = SharedTestUtils.getDestDir();
2028         String sslPath = new File(destDir.getPath(), "ssl").getPath();
2029 
2030         props.put("je.rep.channelType", "ssl");
2031         props.put("je.rep.ssl.keyStoreFile",
2032                   new File(sslPath, "keys.store").getPath());
2033         props.put("je.rep.ssl.keyStorePassword", "unittest");
2034         props.put("je.rep.ssl.trustStoreFile",
2035                   new File(sslPath, "trust.store").getPath());
2036         props.put("je.rep.ssl.clientKeyAlias", "mykey");
2037         props.put("je.rep.ssl.serverKeyAlias", "mykey");
2038     }
2039 
2040     /**
2041      * Used for testing to force consistency checks to fail.  Register the
2042      * format at the beginning of the test as follows:
2043      *
2044      *  // Register custom consistency policy format while quiescent.
2045      *  RepUtils.addConsistencyPolicyFormat
2046      *      (RepTestUtils.AlwaysFail.NAME,
2047      *       new RepTestUtils.AlwaysFailFormat());
2048      */
2049     public static class AlwaysFailFormat
2050         implements ConsistencyPolicyFormat<AlwaysFail> {
2051 
2052         @Override
policyToString(final AlwaysFail policy)2053         public String policyToString(final AlwaysFail policy) {
2054             return AlwaysFail.NAME;
2055         }
2056 
2057         @Override
stringToPolicy(final String string)2058         public AlwaysFail stringToPolicy(final String string) {
2059             return new AlwaysFail();
2060         }
2061     }
2062 
2063     /**
2064      * Wait until a replica/feeder syncup has been tried numSyncupAttempt times
2065      * on this node.
2066      */
setupWaitForSyncup(final ReplicatedEnvironment node, int numSyncupAttempts)2067     public static CountDownLatch setupWaitForSyncup
2068         (final ReplicatedEnvironment node, int numSyncupAttempts) {
2069         final CountDownLatch  waiter = new CountDownLatch(numSyncupAttempts);
2070 
2071         TestHook<Object> syncupFinished = new TestHook<Object>() {
2072             @Override
2073             public void doHook() throws InterruptedException {
2074                 waiter.countDown();
2075             }
2076         };
2077 
2078         RepInternal.getRepImpl(node).getRepNode().
2079             replica().setReplicaFeederSyncupHook(syncupFinished);
2080         return waiter;
2081     }
2082 
2083 
2084     /**
2085      * Modify the existing rep configuration with the new parameter value pair.
2086      */
setConfigParam(ConfigParam param, String value, RepEnvInfo repEnvInfo[])2087     public static void setConfigParam(ConfigParam param,
2088                                       String value,
2089                                       RepEnvInfo repEnvInfo[]) {
2090 
2091         for (RepEnvInfo info : repEnvInfo) {
2092             info.getRepConfig().setConfigParam(param.getName(), value);
2093         }
2094     }
2095 }
2096