/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests failover of secondary region replicas.
 *
 * <p>Every test method runs against a fresh mini cluster of {@link #NB_SERVERS} region servers
 * (started in {@link #before()}, stopped in {@link #after()}) and a table created with a region
 * replication of 3.
 */
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestRegionReplicaFailover {

  // Log under this test's own category (previously logged as TestRegionReplicaReplicationEndpoint).
  private static final Log LOG = LogFactory.getLog(TestRegionReplicaFailover.class);

  static {
    // Turn on full logging for the retrying caller to help diagnose failover problems.
    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
  }

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  private static final int NB_SERVERS = 3;

  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
  protected final byte[] fam = HBaseTestingUtility.fam1;
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  @Rule public TestName name = new TestName();

  /** Descriptor of the table under test; (re)assigned per test method. */
  private HTableDescriptor htd;

  /*
   * We are testing with dist log split and dist log replay separately
   */
  @Parameters
  public static Collection<Object[]> getParameters() {
    Object[][] params =
        new Boolean[][] { /*{true}, Disable DLR!!! It is going to be removed*/ {false} };
    return Arrays.asList(params);
  }

  /** Injected by the Parameterized runner; must stay public for field injection. */
  @Parameterized.Parameter(0)
  public boolean distributedLogReplay;

  /**
   * Starts the mini cluster with region replica replication enabled and creates the test table
   * with 3 region replicas.
   */
  @Before
  public void before() throws Exception {
    Configuration conf = HTU.getConfiguration();
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);

    HTU.startMiniCluster(NB_SERVERS);

    // Drop the last three characters of the method name: the Parameterized runner appends an
    // index suffix such as "[0]", which is not usable in a table name.
    String methodName = name.getMethodName();
    htd = HTU.createTableDescriptor(methodName.substring(0, methodName.length() - 3));
    htd.setRegionReplication(3);
    HTU.getHBaseAdmin().createTable(htd);
  }

  /** Drops the table referenced by {@code htd} (if it exists) and shuts the mini cluster down. */
  @After
  public void after() throws Exception {
    HTU.deleteTableIfAny(htd.getTableName());
    HTU.shutdownMiniCluster();
  }

  /**
   * Tests the case where a newly created table with region replicas and no data, the secondary
   * region replicas are available to read immediately.
   */
  @Test(timeout = 60000)
  public void testSecondaryRegionWithEmptyRegion() throws IOException {
    // Create a new table with region replication, don't put any data. Test that the secondary
    // region replica is available to read.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      table.get(get); // this should not block
    }
  }

  /**
   * Tests the case where if there is some data in the primary region, reopening the region replicas
   * (enable/disable table, etc) makes the region replicas readable.
   * @throws IOException if loading, disabling/enabling the table, or reading fails
   */
  @Test(timeout = 60000)
  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
    // Create a new table with region replication and load some data,
    // then disable and enable the table again and verify the data from the secondary
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      HTU.getHBaseAdmin().disableTable(htd.getTableName());
      HTU.getHBaseAdmin().enableTable(htd.getTableName());

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
    }
  }

  /**
   * Tests the case where killing a primary region with unflushed data recovers
   */
  @Test (timeout = 120000)
  public void testPrimaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have flushed files now, but data in memstores of primary and secondary
      // kill the primary region replica now, and ensure that when it comes back up, we can still
      // read from it the same data from primary and secondaries
      assertTrue(abortRegionServersHostingReplica(0));

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Verifies the numeric rows in {@code [startRow, endRow)} against the given replica, retrying
   * until the verification passes or the timeout elapses. Needed because wal replication is
   * async: we have to wait until the replication catches up, or we timeout.
   *
   * @param table     table to read from
   * @param f         column family holding the rows
   * @param startRow  first row key (inclusive)
   * @param endRow    last row key (exclusive)
   * @param replicaId replica to read from
   * @param timeout   maximum time to wait, in milliseconds
   * @throws Exception the failure of the final verification attempt, if the rows never showed up
   */
  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
      final int endRow, final int replicaId, final long timeout) throws Exception {
    try {
      HTU.waitFor(timeout, new Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          try {
            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
            return true;
          } catch (AssertionError ae) {
            // Not caught up yet; report "not done" so that waitFor polls again.
            return false;
          }
        }
      });
    } catch (Throwable ignored) {
      // Deliberately ignored: redo the verify to get the actual exception, which is more useful
      // than a generic wait failure.
      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
    }
  }

  /**
   * Aborts each region server that currently hosts a region of the test table whose replica id
   * equals {@code replicaId}.
   *
   * @param replicaId replica id to look for; 0 denotes the primary replica
   * @return {@code true} if at least one region server was aborted
   * @throws IOException if the online regions of a region server cannot be listed
   */
  private boolean abortRegionServersHostingReplica(int replicaId) throws IOException {
    String role = (replicaId == 0) ? "primary" : "secondary";
    boolean aborted = false;
    for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
      for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
        if (r.getRegionInfo().getReplicaId() == replicaId) {
          LOG.info("Aborting region server hosting " + role + " region replica");
          rs.getRegionServer().abort("for test");
          aborted = true;
          break; // this server is going down; no need to inspect its remaining regions
        }
      }
    }
    return aborted;
  }

  /**
   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
   * becomes available to read again shortly.
   */
  @Test (timeout = 120000)
  public void testSecondaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {
      HTU.loadNumericRows(table, fam, 0, 1000);

      // wait for some time to ensure that async wal replication does its magic
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have flushed files now, but data in memstores of primary and secondary
      // kill the secondary region replica now, and ensure that when it comes back up, we can still
      // read from it the same data
      assertTrue(abortRegionServersHostingReplica(1));

      // Fixed grace period (rather than a polling wait): the replica is expected to become
      // readable again "shortly".
      Threads.sleep(5000);

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where there are 3 region replicas and the primary is continuously accepting
   * new writes while one of the secondaries is killed. Verification is done for both of the
   * secondary replicas.
   */
  @Test (timeout = 120000)
  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName());
        Admin admin = connection.getAdmin()) {
      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
      admin.flush(table.getName());
      HTU.loadNumericRows(table, fam, 1000, 2000);

      // First failure seen by either background thread; checked once both have finished.
      final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
      final AtomicBoolean done = new AtomicBoolean(false);
      // Next row key to load; only the loader thread advances it.
      final AtomicInteger key = new AtomicInteger(2000);

      // start a thread to do the loading of primary
      Thread loader = new Thread() {
        @Override
        public void run() {
          while (!done.get()) {
            try {
              HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
              key.addAndGet(1000);
            } catch (Throwable e) {
              ex.compareAndSet(null, e);
            }
          }
        }
      };
      loader.start();

      // abort the secondary's region server while the loader keeps writing
      Thread aborter = new Thread() {
        @Override
        public void run() {
          try {
            assertTrue(abortRegionServersHostingReplica(1));
          } catch (Throwable e) {
            ex.compareAndSet(null, e);
          }
        }
      };

      aborter.start();
      aborter.join();
      done.set(true);
      loader.join();

      assertNull(ex.get());

      // NOTE(review): key starts at 2000, so this check cannot fail as written; it does not
      // prove that the loader made progress during the abort.
      assertTrue(key.get() > 1000); // assert that the test is working as designed
      LOG.info("Loaded up to key :" + key.get());
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
   * replicas should not block handlers on RS indefinitely.
   */
  @Test (timeout = 120000)
  public void testLotsOfRegionReplicas() throws IOException {
    int numRegions = NB_SERVERS * 20;
    int regionReplication = 10;
    // A second table, named after the one created in before() with a "2" suffix. htd now points
    // at this new table, so after() will target it as well.
    String tableName = htd.getTableName().getNameAsString() + "2";
    htd = HTU.createTableDescriptor(tableName);
    htd.setRegionReplication(regionReplication);

    // don't care about splits themselves too much
    byte[] startKey = Bytes.toBytes("aaa");
    byte[] endKey = Bytes.toBytes("zzz");
    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
    HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      // Read one key per split point (skipping index 0) from every replica.
      for (int i = 1; i < splits.length; i++) {
        for (int j = 0; j < regionReplication; j++) {
          Get get = new Get(splits[i]);
          get.setConsistency(Consistency.TIMELINE);
          get.setReplicaId(j);
          table.get(get); // this should not block. Regions should be coming online
        }
      }
    }

    HTU.deleteTableIfAny(TableName.valueOf(tableName));
  }
}