/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002-2014 Oracle. All rights reserved.
 *
 */

package com.sleepycat.je.rep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import com.sleepycat.je.Database;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.impl.RepTestBase;
import com.sleepycat.je.rep.utilint.RepTestUtils;

/**
 * Tests for the replica group commit functionality. The tests drive
 * concurrent durable writes on the master and then inspect the replay
 * statistics reported by the replica.
 */
public class GroupCommitTest extends RepTestBase {

    /* Upper bound on how long doWrites waits for each write thread. */
    private static final long WRITE_THREAD_JOIN_TIMEOUT_MS = 60000;

    /* Used as the basis for producing unique db names. */
    private static final AtomicInteger dbId = new AtomicInteger(0);

    @Override
    public void setUp()
        throws Exception {

        /* Need just one replica for this test. */
        groupSize = 2;

        super.setUp();
    }

    /**
     * Verify that group commits can be initiated by either exceeding the
     * time interval, or the group commit size.
     */
    @Test
    public void testBasic()
        throws InterruptedException {

        /* Use a very generous full second for the group commit interval. */
        final long intervalNs = TimeUnit.SECONDS.toNanos(1);
        final int maxGroupCommit = 4;

        initGroupCommitConfig(intervalNs, maxGroupCommit);

        createGroup();
        final State state = repEnvInfo[0].getEnv().getState();
        assertEquals(State.MASTER, state);
        final ReplicatedEnvironment menv = repEnvInfo[0].getEnv();
        final ReplicatedEnvironment renv = repEnvInfo[1].getEnv();

        final long startNs = System.nanoTime();
        final StatsConfig statsConfig = new StatsConfig().setClear(true);

        /* Clear and discard stats. */
        renv.getRepStats(statsConfig);

        /* Just a single write. */
        doWrites(menv, 1);

        ReplicatedEnvironmentStats rstats = renv.getRepStats(statsConfig);

        /* Verify that the group commit was the result of a timeout. */
        assertTrue((System.nanoTime() - startNs) > intervalNs);

        assertEquals(1, rstats.getNReplayGroupCommitTxns());
        assertEquals(1, rstats.getNReplayGroupCommits());
        assertEquals(1, rstats.getNReplayGroupCommitTimeouts());
        assertEquals(0, rstats.getNReplayCommitSyncs());
        assertEquals(1, rstats.getNReplayCommitNoSyncs());

        /* Now force an exact group commit size overflow. */
        doWrites(menv, maxGroupCommit);
        rstats = renv.getRepStats(statsConfig);

        assertEquals(maxGroupCommit, rstats.getNReplayGroupCommitTxns());
        assertEquals(1, rstats.getNReplayGroupCommits());
        assertEquals(0, rstats.getNReplayGroupCommitTimeouts());
        assertEquals(0, rstats.getNReplayCommitSyncs());
        assertEquals(maxGroupCommit, rstats.getNReplayCommitNoSyncs());

        /* Group commit size + 1 timeout txn */
        doWrites(menv, maxGroupCommit + 1);
        rstats = renv.getRepStats(statsConfig);

        assertEquals(maxGroupCommit + 1, rstats.getNReplayGroupCommitTxns());
        assertEquals(2, rstats.getNReplayGroupCommits());
        assertEquals(1, rstats.getNReplayGroupCommitTimeouts());
        assertEquals(0, rstats.getNReplayCommitSyncs());
        assertEquals(maxGroupCommit + 1, rstats.getNReplayCommitNoSyncs());
    }

    /**
     * Applies the group commit parameters to the replication configuration
     * of every node in the group. Must be called before the group is
     * created so that the environments are opened with these settings.
     *
     * @param intervalNs the value for
     * {@link ReplicationConfig#REPLICA_GROUP_COMMIT_INTERVAL}, expressed in
     * nanoseconds
     * @param maxGroupCommit the value for
     * {@link ReplicationConfig#REPLICA_MAX_GROUP_COMMIT}
     */
    private void initGroupCommitConfig(final long intervalNs,
                                       final int maxGroupCommit)
        throws IllegalArgumentException {

        for (int i = 0; i < groupSize; i++) {
            final ReplicationConfig repConfig = repEnvInfo[i].getRepConfig();
            repConfig.setConfigParam(
                ReplicationConfig.REPLICA_GROUP_COMMIT_INTERVAL,
                intervalNs + " ns");
            repConfig.setConfigParam(
                ReplicationConfig.REPLICA_MAX_GROUP_COMMIT,
                Integer.toString(maxGroupCommit));
        }
    }

    /**
     * Verify that group commits can be turned off.
     */
    @Test
    public void testGroupCommitOff()
        throws InterruptedException {

        /* Configure every node with a max group commit size of zero. */
        initGroupCommitConfig(Integer.MAX_VALUE, 0);

        createGroup();
        /* The first node is expected to be the master. */
        final State state = repEnvInfo[0].getEnv().getState();
        assertEquals(State.MASTER, state);
        final ReplicatedEnvironment menv = repEnvInfo[0].getEnv();
        final ReplicatedEnvironment renv = repEnvInfo[1].getEnv();

        final StatsConfig statsConfig = new StatsConfig().setClear(true);

        /* Clear and discard stats. */
        renv.getRepStats(statsConfig);

        /* Just a single write. */
        doWrites(menv, 1);

        final ReplicatedEnvironmentStats rstats =
            renv.getRepStats(statsConfig);

        /* The single commit must be synced on its own, not as a group. */
        assertEquals(0, rstats.getNReplayGroupCommitTxns());
        assertEquals(0, rstats.getNReplayGroupCommits());
        assertEquals(0, rstats.getNReplayGroupCommitTimeouts());
        assertEquals(1, rstats.getNReplayCommitSyncs());
        assertEquals(0, rstats.getNReplayCommitNoSyncs());
    }

    /**
     * Starts {@code count} concurrent write threads against the master and
     * waits for all of them to finish.
     *
     * A thread that is still running after the join timeout, or that
     * terminated with an exception, fails the test here rather than
     * surfacing later as an unexplained statistics mismatch.
     *
     * @param menv the master environment the writes are issued against
     * @param count the number of concurrent write threads to run
     * @throws InterruptedException if interrupted while waiting for a
     * write thread
     */
    void doWrites(ReplicatedEnvironment menv, int count)
        throws InterruptedException {

        final WriteThread[] writers = new WriteThread[count];

        /* Start them all first so that the commits overlap. */
        for (int i = 0; i < count; i++) {
            writers[i] = new WriteThread(menv);
            writers[i].start();
        }

        for (int i = 0; i < count; i++) {
            writers[i].join(WRITE_THREAD_JOIN_TIMEOUT_MS);

            /* join(timeout) returns silently on timeout, so check. */
            assertTrue("Write thread " + i + " did not finish within " +
                       WRITE_THREAD_JOIN_TIMEOUT_MS + " ms",
                       !writers[i].isAlive());

            final Throwable failure = writers[i].failure;
            if (failure != null) {
                final AssertionError error = new AssertionError(
                    "Write thread " + i + " failed: " + failure);
                error.initCause(failure);
                throw error;
            }
        }
    }

    /**
     * Thread used to create concurrent updates amenable to group commits.
     * Each thread creates one uniquely named database in its own
     * transaction, committed with
     * {@link RepTestUtils#SYNC_SYNC_ALL_DURABILITY}.
     */
    private class WriteThread extends Thread {
        final ReplicatedEnvironment menv;

        /*
         * Set if run() terminated abnormally; read by doWrites after the
         * thread has been joined.
         */
        private volatile Throwable failure;

        WriteThread(ReplicatedEnvironment menv) {
            super();
            this.menv = menv;
        }

        @Override
        public void run() {
            try {
                final TransactionConfig mtc = new TransactionConfig();
                mtc.setDurability(RepTestUtils.SYNC_SYNC_ALL_DURABILITY);
                final Transaction mt = menv.beginTransaction(null, mtc);
                final Database db =
                    menv.openDatabase(mt,
                                      "testDB" + dbId.incrementAndGet(),
                                      dbconfig);
                mt.commit();
                db.close();
            } catch (Throwable t) {
                /* Hand the failure to the test thread instead of losing it. */
                failure = t;
            }
        }
    }
}