1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.regionserver;
20 
21 import static org.junit.Assert.*;
22 
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicReference;
29 
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.commons.logging.impl.Log4JLogger;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.Waiter.Predicate;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Consistency;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
50 import org.apache.hadoop.hbase.util.Threads;
51 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
52 import org.apache.log4j.Level;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import org.junit.rules.TestName;
59 import org.junit.runner.RunWith;
60 import org.junit.runners.Parameterized;
61 import org.junit.runners.Parameterized.Parameters;
62 
63 /**
64  * Tests failover of secondary region replicas.
65  */
66 @RunWith(Parameterized.class)
67 @Category(LargeTests.class)
68 public class TestRegionReplicaFailover {
69 
70   private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
71 
72   static {
73     ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
74   }
75 
76   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
77 
78   private static final int NB_SERVERS = 3;
79 
80   protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
81       HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
82   protected final byte[] fam = HBaseTestingUtility.fam1;
83   protected final byte[] qual1 = Bytes.toBytes("qual1");
84   protected final byte[] value1 = Bytes.toBytes("value1");
85   protected final byte[] row = Bytes.toBytes("rowA");
86   protected final byte[] row2 = Bytes.toBytes("rowB");
87 
88   @Rule public TestName name = new TestName();
89 
90   private HTableDescriptor htd;
91 
92   /*
93    * We are testing with dist log split and dist log replay separately
94    */
95   @Parameters
getParameters()96   public static Collection<Object[]> getParameters() {
97     Object[][] params =
98         new Boolean[][] { /*{true}, Disable DLR!!! It is going to be removed*/ {false} };
99     return Arrays.asList(params);
100   }
101 
102   @Parameterized.Parameter(0)
103   public boolean distributedLogReplay;
104 
105   @Before
before()106   public void before() throws Exception {
107     Configuration conf = HTU.getConfiguration();
108    // Up the handlers; this test needs more than usual.
109     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
110     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
111     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
112     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
113     conf.setInt("replication.stats.thread.period.seconds", 5);
114     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
115     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
116 
117     HTU.startMiniCluster(NB_SERVERS);
118     htd = HTU.createTableDescriptor(
119       name.getMethodName().substring(0, name.getMethodName().length()-3));
120     htd.setRegionReplication(3);
121     HTU.getHBaseAdmin().createTable(htd);
122   }
123 
124   @After
after()125   public void after() throws Exception {
126     HTU.deleteTableIfAny(htd.getTableName());
127     HTU.shutdownMiniCluster();
128   }
129 
130   /**
131    * Tests the case where a newly created table with region replicas and no data, the secondary
132    * region replicas are available to read immediately.
133    */
134   @Test(timeout = 60000)
testSecondaryRegionWithEmptyRegion()135   public void testSecondaryRegionWithEmptyRegion() throws IOException {
136     // Create a new table with region replication, don't put any data. Test that the secondary
137     // region replica is available to read.
138     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
139         Table table = connection.getTable(htd.getTableName())) {
140 
141       Get get = new Get(row);
142       get.setConsistency(Consistency.TIMELINE);
143       get.setReplicaId(1);
144       table.get(get); // this should not block
145     }
146   }
147 
148   /**
149    * Tests the case where if there is some data in the primary region, reopening the region replicas
150    * (enable/disable table, etc) makes the region replicas readable.
151    * @throws IOException
152    */
153   @Test(timeout = 60000)
testSecondaryRegionWithNonEmptyRegion()154   public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
155     // Create a new table with region replication and load some data
156     // than disable and enable the table again and verify the data from secondary
157     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
158         Table table = connection.getTable(htd.getTableName())) {
159 
160       HTU.loadNumericRows(table, fam, 0, 1000);
161 
162       HTU.getHBaseAdmin().disableTable(htd.getTableName());
163       HTU.getHBaseAdmin().enableTable(htd.getTableName());
164 
165       HTU.verifyNumericRows(table, fam, 0, 1000, 1);
166     }
167   }
168 
169   /**
170    * Tests the case where killing a primary region with unflushed data recovers
171    */
172   @Test (timeout = 120000)
testPrimaryRegionKill()173   public void testPrimaryRegionKill() throws Exception {
174     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
175         Table table = connection.getTable(htd.getTableName())) {
176 
177       HTU.loadNumericRows(table, fam, 0, 1000);
178 
179       // wal replication is async, we have to wait until the replication catches up, or we timeout
180       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
181       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
182 
183       // we should not have flushed files now, but data in memstores of primary and secondary
184       // kill the primary region replica now, and ensure that when it comes back up, we can still
185       // read from it the same data from primary and secondaries
186       boolean aborted = false;
187       for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
188         for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
189           if (r.getRegionInfo().getReplicaId() == 0) {
190             LOG.info("Aborting region server hosting primary region replica");
191             rs.getRegionServer().abort("for test");
192             aborted = true;
193           }
194         }
195       }
196       assertTrue(aborted);
197 
198       // wal replication is async, we have to wait until the replication catches up, or we timeout
199       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
200       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
201       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
202     }
203 
204     // restart the region server
205     HTU.getMiniHBaseCluster().startRegionServer();
206   }
207 
208   /** wal replication is async, we have to wait until the replication catches up, or we timeout
209    */
verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow, final int endRow, final int replicaId, final long timeout)210   private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
211       final int endRow, final int replicaId, final long timeout) throws Exception {
212     try {
213       HTU.waitFor(timeout, new Predicate<Exception>() {
214         @Override
215         public boolean evaluate() throws Exception {
216           try {
217             HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
218             return true;
219           } catch (AssertionError ae) {
220             return false;
221           }
222         }
223       });
224     } catch (Throwable t) {
225       // ignore this, but redo the verify do get the actual exception
226       HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
227     }
228   }
229 
230   /**
231    * Tests the case where killing a secondary region with unflushed data recovers, and the replica
232    * becomes available to read again shortly.
233    */
234   @Test (timeout = 120000)
testSecondaryRegionKill()235   public void testSecondaryRegionKill() throws Exception {
236     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
237         Table table = connection.getTable(htd.getTableName())) {
238       HTU.loadNumericRows(table, fam, 0, 1000);
239 
240       // wait for some time to ensure that async wal replication does it's magic
241       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
242       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
243 
244       // we should not have flushed files now, but data in memstores of primary and secondary
245       // kill the secondary region replica now, and ensure that when it comes back up, we can still
246       // read from it the same data
247       boolean aborted = false;
248       for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
249         for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
250           if (r.getRegionInfo().getReplicaId() == 1) {
251             LOG.info("Aborting region server hosting secondary region replica");
252             rs.getRegionServer().abort("for test");
253             aborted = true;
254           }
255         }
256       }
257       assertTrue(aborted);
258 
259       Threads.sleep(5000);
260 
261       HTU.verifyNumericRows(table, fam, 0, 1000, 1);
262       HTU.verifyNumericRows(table, fam, 0, 1000, 2);
263     }
264 
265     // restart the region server
266     HTU.getMiniHBaseCluster().startRegionServer();
267   }
268 
269   /**
270    * Tests the case where there are 3 region replicas and the primary is continuously accepting
271    * new writes while one of the secondaries is killed. Verification is done for both of the
272    * secondary replicas.
273    */
274   @Test (timeout = 120000)
testSecondaryRegionKillWhilePrimaryIsAcceptingWrites()275   public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
276     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
277         Table table = connection.getTable(htd.getTableName());
278         Admin admin = connection.getAdmin()) {
279       // start a thread to do the loading of primary
280       HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
281       admin.flush(table.getName());
282       HTU.loadNumericRows(table, fam, 1000, 2000);
283 
284       final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
285       final AtomicBoolean done = new AtomicBoolean(false);
286       final AtomicInteger key = new AtomicInteger(2000);
287 
288       Thread loader = new Thread() {
289         @Override
290         public void run() {
291           while (!done.get()) {
292             try {
293               HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
294               key.addAndGet(1000);
295             } catch (Throwable e) {
296               ex.compareAndSet(null, e);
297             }
298           }
299         }
300       };
301       loader.start();
302 
303       Thread aborter = new Thread() {
304         @Override
305         public void run() {
306           try {
307             boolean aborted = false;
308             for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
309               for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
310                 if (r.getRegionInfo().getReplicaId() == 1) {
311                   LOG.info("Aborting region server hosting secondary region replica");
312                   rs.getRegionServer().abort("for test");
313                   aborted = true;
314                 }
315               }
316             }
317             assertTrue(aborted);
318           } catch (Throwable e) {
319             ex.compareAndSet(null, e);
320           }
321         };
322       };
323 
324       aborter.start();
325       aborter.join();
326       done.set(true);
327       loader.join();
328 
329       assertNull(ex.get());
330 
331       assertTrue(key.get() > 1000); // assert that the test is working as designed
332       LOG.info("Loaded up to key :" + key.get());
333       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
334       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
335       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
336     }
337 
338     // restart the region server
339     HTU.getMiniHBaseCluster().startRegionServer();
340   }
341 
342   /**
343    * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
344    * replicas should not block handlers on RS indefinitely.
345    */
346   @Test (timeout = 120000)
testLotsOfRegionReplicas()347   public void testLotsOfRegionReplicas() throws IOException {
348     int numRegions = NB_SERVERS * 20;
349     int regionReplication = 10;
350     String tableName = htd.getTableName().getNameAsString() + "2";
351     htd = HTU.createTableDescriptor(tableName);
352     htd.setRegionReplication(regionReplication);
353 
354     // dont care about splits themselves too much
355     byte[] startKey = Bytes.toBytes("aaa");
356     byte[] endKey = Bytes.toBytes("zzz");
357     byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
358     HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
359 
360     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
361         Table table = connection.getTable(htd.getTableName())) {
362 
363       for (int i = 1; i < splits.length; i++) {
364         for (int j = 0; j < regionReplication; j++) {
365           Get get = new Get(splits[i]);
366           get.setConsistency(Consistency.TIMELINE);
367           get.setReplicaId(j);
368           table.get(get); // this should not block. Regions should be coming online
369         }
370       }
371     }
372 
373     HTU.deleteTableIfAny(TableName.valueOf(tableName));
374   }
375 }
376