1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002-2014 Oracle.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep;
9 import static org.junit.Assert.assertEquals;
10 import static org.junit.Assert.assertTrue;
11 
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.atomic.AtomicInteger;
14 
15 import org.junit.Test;
16 
17 import com.sleepycat.je.Database;
18 import com.sleepycat.je.StatsConfig;
19 import com.sleepycat.je.Transaction;
20 import com.sleepycat.je.TransactionConfig;
21 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
22 import com.sleepycat.je.rep.impl.RepTestBase;
23 import com.sleepycat.je.rep.utilint.RepTestUtils;
24 
/**
 * Tests for group commit functionality.
 */
28 public class GroupCommitTest extends RepTestBase {
29 
30     @Override
setUp()31     public void setUp()
32         throws Exception {
33 
34         /* need just one replica for this test. */
35         groupSize = 2;
36 
37         super.setUp();
38     }
39 
40     /**
41      * Verify that group commits can be initiated by either exceeding the
42      * time interval, or the group commit size.
43      */
44     @Test
testBasic()45     public void testBasic()
46         throws InterruptedException {
47 
48         /* Use a very generous full second for the group commit interval. */
49         final long intervalNs = TimeUnit.SECONDS.toNanos(1);
50         final int maxGroupCommit = 4;
51 
52         initGroupCommitConfig(intervalNs, maxGroupCommit);
53 
54         createGroup();
55         State state = repEnvInfo[0].getEnv().getState();
56         assertEquals(State.MASTER, state);
57         ReplicatedEnvironment menv = repEnvInfo[0].getEnv();
58         ReplicatedEnvironment renv = repEnvInfo[1].getEnv();
59 
60         long startNs = System.nanoTime();
61         final StatsConfig statsConfig = new StatsConfig().setClear(true);
62 
63         /* Clear and discard stats. */
64         renv.getRepStats(statsConfig);
65 
66         /* Just a single write. */
67         doWrites(menv, 1);
68 
69         ReplicatedEnvironmentStats rstats = renv.getRepStats(statsConfig);
70 
71         /* Verify that the group commit was the result of a timeout. */
72         assertTrue((System.nanoTime() - startNs) > intervalNs);
73 
74         assertEquals(1, rstats.getNReplayGroupCommitTxns());
75         assertEquals(1, rstats.getNReplayGroupCommits());
76         assertEquals(1, rstats.getNReplayGroupCommitTimeouts());
77         assertEquals(0, rstats.getNReplayCommitSyncs());
78         assertEquals(1, rstats.getNReplayCommitNoSyncs());
79 
80         /* Now force an exact group commit size overflow. */
81         doWrites(menv, maxGroupCommit);
82         rstats = renv.getRepStats(statsConfig);
83 
84         assertEquals(maxGroupCommit, rstats.getNReplayGroupCommitTxns());
85         assertEquals(1, rstats.getNReplayGroupCommits());
86         assertEquals(0, rstats.getNReplayGroupCommitTimeouts());
87         assertEquals(0, rstats.getNReplayCommitSyncs());
88         assertEquals(maxGroupCommit, rstats.getNReplayCommitNoSyncs());
89 
90         /* Group commit size + 1 timeout txn */
91         doWrites(menv, maxGroupCommit + 1);
92         rstats = renv.getRepStats(statsConfig);
93 
94         assertEquals(maxGroupCommit + 1, rstats.getNReplayGroupCommitTxns());
95         assertEquals(2, rstats.getNReplayGroupCommits());
96         assertEquals(1, rstats.getNReplayGroupCommitTimeouts());
97         assertEquals(0, rstats.getNReplayCommitSyncs());
98         assertEquals(maxGroupCommit + 1, rstats.getNReplayCommitNoSyncs());
99     }
100 
initGroupCommitConfig(final long intervalMs, final int maxGroupCommit)101     private void initGroupCommitConfig(final long intervalMs,
102                                        final int maxGroupCommit)
103         throws IllegalArgumentException {
104 
105         for (int i=0; i < groupSize; i++) {
106             repEnvInfo[i].getRepConfig().
107                 setConfigParam(ReplicationConfig.REPLICA_GROUP_COMMIT_INTERVAL,
108                                intervalMs + " ns");
109             repEnvInfo[i].getRepConfig().
110                 setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT,
111                                Integer.toString(maxGroupCommit));
112         }
113     }
114 
115     /**
116      * Verify that group commits can be turned off.
117      */
118     @Test
testGroupCommitOff()119     public void testGroupCommitOff()
120         throws InterruptedException {
121 
122         /* Now turn off group commits on the replica */
123         initGroupCommitConfig(Integer.MAX_VALUE, 0);
124 
125         createGroup();
126         /* Already joined, rejoin master. */
127         State state = repEnvInfo[0].getEnv().getState();
128         assertEquals(State.MASTER, state);
129         ReplicatedEnvironment menv = repEnvInfo[0].getEnv();
130         ReplicatedEnvironment renv = repEnvInfo[1].getEnv();
131 
132         final StatsConfig statsConfig = new StatsConfig().setClear(true);
133 
134         /* Clear and discard stats. */
135         renv.getRepStats(statsConfig);
136 
137         /* Just a single write. */
138         doWrites(menv, 1);
139 
140         ReplicatedEnvironmentStats rstats = renv.getRepStats(statsConfig);
141 
142         assertEquals(0, rstats.getNReplayGroupCommitTxns());
143         assertEquals(0, rstats.getNReplayGroupCommits());
144         assertEquals(0, rstats.getNReplayGroupCommitTimeouts());
145         assertEquals(1, rstats.getNReplayCommitSyncs());
146         assertEquals(0, rstats.getNReplayCommitNoSyncs());
147     }
148 
doWrites(ReplicatedEnvironment menv, int count)149     void doWrites(ReplicatedEnvironment menv, int count)
150         throws InterruptedException {
151 
152         final WriteThread wt[] = new WriteThread[count];
153 
154         for (int i=0; i < count; i++) {
155             wt[i] = new WriteThread(menv);
156             wt[i].start();
157         }
158 
159         for (int i=0; i < count; i++) {
160             wt[i].join(60000);
161         }
162     }
163 
164     /* Used as the basis for producing unique db names. */
165     private static AtomicInteger dbId = new AtomicInteger(0);
166 
167     /**
168      * Thread used to create concurrent updates amenable to group commits.
169      */
170     private class WriteThread extends Thread {
171         ReplicatedEnvironment menv;
172 
WriteThread(ReplicatedEnvironment menv)173         WriteThread(ReplicatedEnvironment menv) {
174             super();
175             this.menv = menv;
176         }
177 
178         @Override
run()179         public void run() {
180             final TransactionConfig mtc = new TransactionConfig();
181             mtc.setDurability(RepTestUtils.SYNC_SYNC_ALL_DURABILITY);
182             Transaction mt = menv.beginTransaction(null, mtc);
183             Database db = menv.openDatabase(mt,
184                                             "testDB" + dbId.incrementAndGet(),
185                                             dbconfig);
186             mt.commit();
187             db.close();
188         }
189     }
190 }
191