/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.ServiceException;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
 *
 * <p>All tests share one mini cluster, one table and one primary/secondary region pair held in
 * static fields. Each test is therefore responsible for deleting every row it loaded and for
 * closing the secondary region it opened, so that the tests stay independent of execution order.
 */
@Category(MediumTests.class)
public class TestRegionReplicas {
  private static final Log LOG = LogFactory.getLog(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static HTable table;
  // Bytes.toBytes encodes explicitly instead of relying on the platform default charset.
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  /**
   * Starts the mini cluster, creates the test table, records the primary region and builds the
   * region info of the secondary replica (replica id 1), then stops the master.
   *
   * <p>Also invoked by {@link #restartRegionServer()} to bring the cluster back up after a
   * configuration change.
   */
  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    hriPrimary = table.getRegionLocation(row, false).getRegionInfo();

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
  }

  /** Closes the shared table and shuts the mini cluster down. */
  @AfterClass
  public static void afterClass() throws Exception {
    table.close();
    HTU.shutdownMiniCluster();
  }

  /** Removes the primary region's znode after every test. */
  @After
  public void after() throws Exception {
    // Clean the state if the test failed before cleaning the znode
    // It does not manage all bad failures, so if there are multiple failures, only
    // the first one should be looked at.
    ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
  }

  /** Returns the first (and, with {@code NB_SERVERS == 1}, only) region server of the cluster. */
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  /** Opens the secondary replica and checks that the primary still serves 1000 loaded rows. */
  @Test(timeout = 60000)
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test(timeout = 60000)
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
          getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) {
        meta.close();
      }
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Loads and flushes data on the primary, opens the secondary and reads row 42 from it, both
   * directly against the region object and through the region server's RPC service.
   */
  @Test(timeout = 60000)
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Loads and flushes data on the primary, opens the secondary and issues a client
   * {@link Get} with {@link Consistency#TIMELINE} that explicitly targets replica id 1.
   */
  @Test(timeout = 60000)
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try directly Get against region replica
      byte[] rowKey = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(rowKey);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(rowKey, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Reads the row keyed by the decimal string of {@code value} directly from {@code region}.
   *
   * @param region region to read from
   * @param value numeric row key; the stored cell value is expected to equal the row key bytes
   * @param expect {@code true} to assert the row is present with the expected value,
   *     {@code false} to assert that the result is empty
   * @throws IOException if the get fails
   */
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] rowKey = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(rowKey);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(rowKey, result.getValue(f, null));
    } else {
      // The original only called result.isEmpty() and dropped the value, asserting nothing.
      Assert.assertTrue("Expected no result for row " + value, result.isEmpty());
    }
  }

  /**
   * Same check as {@link #assertGet(Region, int, boolean)} but issued as a protobuf get request
   * against the region server's RPC service for the region described by {@code info}.
   *
   * @param info region to address in the request
   * @param value numeric row key; the stored cell value is expected to equal the row key bytes
   * @param expect {@code true} to assert the row is present with the expected value,
   *     {@code false} to assert that the result is empty
   * @throws IOException if building the request or converting the response fails
   * @throws ServiceException if the RPC service call fails
   */
  // build a mock rpc
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
      throws IOException, ServiceException {
    byte[] rowKey = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(rowKey);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(rowKey, result.getValue(f, null));
    } else {
      // The original only called result.isEmpty() and dropped the value, asserting nothing.
      Assert.assertTrue("Expected no result for row " + value, result.isEmpty());
    }
  }

  /**
   * Tears the whole mini cluster down and starts it again so that configuration changed on
   * {@code HTU} since the last start is picked up. The table and region infos are recreated.
   */
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

  /**
   * With the store file refresher enabled, checks that the secondary picks up newly flushed
   * files of the primary and, after a major compaction, ends up with the single compacted file
   * while reads keep succeeding throughout.
   */
  @Test(timeout = 300000)
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some data to primary
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      // keep reading from the secondary while the refresher has time to notice the compaction
      long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
      while (System.currentTimeMillis() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // ensure that we see the compacted file only
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

    } finally {
      // Remove every range loaded above (0-1000, 1000-1100 and 2000-2100); leaving rows behind
      // would break row-count assertions in tests that run later on the same table.
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1100);
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 2000, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Runs a writer, a flusher/compactor and a reader concurrently for 30 seconds. The reader
   * reads random keys from the secondary and occasionally closes and reopens it. The test fails
   * if any of the three workers recorded a failure.
   */
  @Test(timeout = 300000)
  public void testFlushAndCompactionsInPrimary() throws Exception {

    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      // One slot per worker: [0] writer, [1] flusher/compactor, [2] reader.
      // Typed as Throwable so that assertion failures (AssertionError) can be recorded too.
      @SuppressWarnings("unchecked")
      final AtomicReference<Throwable>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<Throwable>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.add(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // whether to do a close and open
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Throwable ex) {
            // Catch Throwable, not Exception: a failed assertGetRpc throws AssertionError, which
            // would otherwise be captured by the Future returned from submit() and never
            // reported, letting the test pass despite a failed read.
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
                + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader");
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // wait for threads
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Throwable> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * With the refresher chore disabled, refreshes the secondary's store files by hand, major
   * compacts the primary and then scans the store files still referenced by the secondary,
   * checking that all 3000 keys can be read although the files no longer exist at their
   * original path.
   */
  @Test(timeout = 300000)
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      Region primaryRegion = getRS().getFromOnlineRegions(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // Refresh store files on the secondary
      Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      LOG.info("Force Major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());

      // scan all the hfiles on the secondary.
      // since there are no read on the secondary when we ask locations to
      // the NN a FileNotFound exception will be returned and the FileLink
      // should be able to deal with it giving us all the result we expect.
      int keys = 0;
      int sum = 0;
      for (StoreFile sf : secondaryRegion.getStore(f).getStorefiles()) {
        // Our file does not exist anymore. was moved by the compaction above.
        LOG.debug(getRS().getFileSystem().exists(sf.getPath()));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getKeyValue();
          sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
              cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      // 3000 rows keyed 0..2999; their sum is 2999 * 3000 / 2 = 4498500
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      // Remove all three loaded batches (0-3000), not just the first one; leaving rows behind
      // would break row-count assertions in tests that run later on the same table.
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}