1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.replication; 19 20 import static org.junit.Assert.assertArrayEquals; 21 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.fail; 23 24 import java.io.Closeable; 25 import java.io.IOException; 26 import java.util.List; 27 import java.util.Random; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.hbase.Cell; 32 import org.apache.hadoop.hbase.HBaseConfiguration; 33 import org.apache.hadoop.hbase.HBaseTestingUtility; 34 import org.apache.hadoop.hbase.HColumnDescriptor; 35 import org.apache.hadoop.hbase.HConstants; 36 import org.apache.hadoop.hbase.HTableDescriptor; 37 import org.apache.hadoop.hbase.KeyValue; 38 import org.apache.hadoop.hbase.testclassification.LargeTests; 39 import org.apache.hadoop.hbase.TableName; 40 import org.apache.hadoop.hbase.client.Admin; 41 import org.apache.hadoop.hbase.client.Delete; 42 import org.apache.hadoop.hbase.client.Durability; 43 import org.apache.hadoop.hbase.client.Get; 44 import org.apache.hadoop.hbase.client.HBaseAdmin; 45 import org.apache.hadoop.hbase.client.HTable; 46 import org.apache.hadoop.hbase.client.Put; 47 import org.apache.hadoop.hbase.client.Result; 48 import org.apache.hadoop.hbase.client.Table; 49 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 50 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; 51 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 52 import org.apache.hadoop.hbase.coprocessor.ObserverContext; 53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 54 import org.apache.hadoop.hbase.regionserver.HRegionServer; 55 import org.apache.hadoop.hbase.regionserver.RSRpcServices; 56 import org.apache.hadoop.hbase.regionserver.wal.WALEdit; 57 import org.apache.hadoop.hbase.util.Bytes; 58 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 59 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; 60 import org.junit.After; 61 import org.junit.Before; 62 import org.junit.Test; 63 import org.junit.experimental.categories.Category; 64 65 import com.google.protobuf.ServiceException; 66 67 @Category(LargeTests.class) 68 public class TestMasterReplication { 69 70 private static final Log LOG = LogFactory.getLog(TestReplicationBase.class); 71 72 private Configuration baseConfiguration; 73 74 private HBaseTestingUtility[] utilities; 75 private Configuration[] configurations; 76 private MiniZooKeeperCluster miniZK; 77 78 private static final long SLEEP_TIME = 500; 79 private static final int NB_RETRIES = 10; 80 81 private static final TableName tableName = TableName.valueOf("test"); 82 private static final byte[] famName = Bytes.toBytes("f"); 83 private static final byte[] row = Bytes.toBytes("row"); 84 private static final byte[] row1 = Bytes.toBytes("row1"); 85 private static final byte[] row2 = Bytes.toBytes("row2"); 86 private static final byte[] row3 = Bytes.toBytes("row3"); 87 private static final byte[] row4 = Bytes.toBytes("row4"); 88 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 89 90 private static final byte[] count = Bytes.toBytes("count"); 91 private static final byte[] put = Bytes.toBytes("put"); 92 private static final byte[] delete = Bytes.toBytes("delete"); 93 94 private HTableDescriptor table; 95 96 @Before setUp()97 public void setUp() throws Exception { 98 baseConfiguration = HBaseConfiguration.create(); 99 // smaller block size and capacity to trigger more operations 100 // and test them 101 baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 102 baseConfiguration.setInt("replication.source.size.capacity", 1024); 103 baseConfiguration.setLong("replication.source.sleepforretries", 100); 104 baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); 105 baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); 106 baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, 107 HConstants.REPLICATION_ENABLE_DEFAULT); 108 baseConfiguration.setBoolean("dfs.support.append", true); 109 baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 110 baseConfiguration.setStrings( 111 CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 112 CoprocessorCounter.class.getName()); 113 114 table = new HTableDescriptor(tableName); 115 HColumnDescriptor fam = new HColumnDescriptor(famName); 116 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 117 table.addFamily(fam); 118 fam = new HColumnDescriptor(noRepfamName); 119 table.addFamily(fam); 120 } 121 122 /** 123 * It tests the replication scenario involving 0 -> 1 -> 0. It does it by 124 * adding and deleting a row to a table in each cluster, checking if it's 125 * replicated. It also tests that the puts and deletes are not replicated back 126 * to the originating cluster. 127 */ 128 @Test(timeout = 300000) testCyclicReplication1()129 public void testCyclicReplication1() throws Exception { 130 LOG.info("testSimplePutDelete"); 131 int numClusters = 2; 132 Table[] htables = null; 133 try { 134 startMiniClusters(numClusters); 135 createTableOnClusters(table); 136 137 htables = getHTablesOnClusters(tableName); 138 139 // Test the replication scenarios of 0 -> 1 -> 0 140 addPeer("1", 0, 1); 141 addPeer("1", 1, 0); 142 143 int[] expectedCounts = new int[] { 2, 2 }; 144 145 // add rows to both clusters, 146 // make sure they are both replication 147 putAndWait(row, famName, htables[0], htables[1]); 148 putAndWait(row1, famName, htables[1], htables[0]); 149 validateCounts(htables, put, expectedCounts); 150 151 deleteAndWait(row, htables[0], htables[1]); 152 deleteAndWait(row1, htables[1], htables[0]); 153 validateCounts(htables, delete, expectedCounts); 154 } finally { 155 close(htables); 156 shutDownMiniClusters(); 157 } 158 } 159 160 /** 161 * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and 162 * deleting rows to a table in each clusters and ensuring that the each of 163 * these clusters get the appropriate mutations. It also tests the grouping 164 * scenario where a cluster needs to replicate the edits originating from 165 * itself and also the edits that it received using replication from a 166 * different cluster. The scenario is explained in HBASE-9158 167 */ 168 @Test(timeout = 300000) testCyclicReplication2()169 public void testCyclicReplication2() throws Exception { 170 LOG.info("testCyclicReplication1"); 171 int numClusters = 3; 172 Table[] htables = null; 173 try { 174 startMiniClusters(numClusters); 175 createTableOnClusters(table); 176 177 // Test the replication scenario of 0 -> 1 -> 2 -> 0 178 addPeer("1", 0, 1); 179 addPeer("1", 1, 2); 180 addPeer("1", 2, 0); 181 182 htables = getHTablesOnClusters(tableName); 183 184 // put "row" and wait 'til it got around 185 putAndWait(row, famName, htables[0], htables[2]); 186 putAndWait(row1, famName, htables[1], htables[0]); 187 putAndWait(row2, famName, htables[2], htables[1]); 188 189 deleteAndWait(row, htables[0], htables[2]); 190 deleteAndWait(row1, htables[1], htables[0]); 191 deleteAndWait(row2, htables[2], htables[1]); 192 193 int[] expectedCounts = new int[] { 3, 3, 3 }; 194 validateCounts(htables, put, expectedCounts); 195 validateCounts(htables, delete, expectedCounts); 196 197 // Test HBASE-9158 198 disablePeer("1", 2); 199 // we now have an edit that was replicated into cluster originating from 200 // cluster 0 201 putAndWait(row3, famName, htables[0], htables[1]); 202 // now add a local edit to cluster 1 203 htables[1].put(new Put(row4).add(famName, row4, row4)); 204 // re-enable replication from cluster 2 to cluster 0 205 enablePeer("1", 2); 206 // without HBASE-9158 the edit for row4 would have been marked with 207 // cluster 0's id 208 // and hence not replicated to cluster 0 209 wait(row4, htables[0], true); 210 } finally { 211 close(htables); 212 shutDownMiniClusters(); 213 } 214 } 215 216 /** 217 * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1. 218 */ 219 @Test(timeout = 300000) testCyclicReplication3()220 public void testCyclicReplication3() throws Exception { 221 LOG.info("testCyclicReplication2"); 222 int numClusters = 3; 223 Table[] htables = null; 224 try { 225 startMiniClusters(numClusters); 226 createTableOnClusters(table); 227 228 // Test the replication scenario of 0 -> 1 -> 2 -> 1 229 addPeer("1", 0, 1); 230 addPeer("1", 1, 2); 231 addPeer("1", 2, 1); 232 233 htables = getHTablesOnClusters(tableName); 234 235 // put "row" and wait 'til it got around 236 putAndWait(row, famName, htables[0], htables[2]); 237 putAndWait(row1, famName, htables[1], htables[2]); 238 putAndWait(row2, famName, htables[2], htables[1]); 239 240 deleteAndWait(row, htables[0], htables[2]); 241 deleteAndWait(row1, htables[1], htables[2]); 242 deleteAndWait(row2, htables[2], htables[1]); 243 244 int[] expectedCounts = new int[] { 1, 3, 3 }; 245 validateCounts(htables, put, expectedCounts); 246 validateCounts(htables, delete, expectedCounts); 247 } finally { 248 close(htables); 249 shutDownMiniClusters(); 250 } 251 } 252 253 /* 254 * Test RSRpcServices#replicateWALEntry when replication is disabled. This is to simulate 255 * HBASE-14840 256 */ 257 @Test(timeout = 180000, expected = ServiceException.class) testReplicateWALEntryWhenReplicationIsDisabled()258 public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception { 259 LOG.info("testSimplePutDelete"); 260 baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false); 261 Table[] htables = null; 262 try { 263 startMiniClusters(1); 264 createTableOnClusters(table); 265 htables = getHTablesOnClusters(tableName); 266 267 HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName); 268 RSRpcServices rsrpc = new RSRpcServices(rs); 269 rsrpc.replicateWALEntry(null, null); 270 } finally { 271 close(htables); 272 shutDownMiniClusters(); 273 } 274 } 275 276 @After tearDown()277 public void tearDown() throws IOException { 278 configurations = null; 279 utilities = null; 280 } 281 282 @SuppressWarnings("resource") startMiniClusters(int numClusters)283 private void startMiniClusters(int numClusters) throws Exception { 284 Random random = new Random(); 285 utilities = new HBaseTestingUtility[numClusters]; 286 configurations = new Configuration[numClusters]; 287 for (int i = 0; i < numClusters; i++) { 288 Configuration conf = new Configuration(baseConfiguration); 289 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt()); 290 HBaseTestingUtility utility = new HBaseTestingUtility(conf); 291 if (i == 0) { 292 utility.startMiniZKCluster(); 293 miniZK = utility.getZkCluster(); 294 } else { 295 utility.setZkCluster(miniZK); 296 } 297 utility.startMiniCluster(); 298 utilities[i] = utility; 299 configurations[i] = conf; 300 new ZooKeeperWatcher(conf, "cluster" + i, null, true); 301 } 302 } 303 shutDownMiniClusters()304 private void shutDownMiniClusters() throws Exception { 305 int numClusters = utilities.length; 306 for (int i = numClusters - 1; i >= 0; i--) { 307 if (utilities[i] != null) { 308 utilities[i].shutdownMiniCluster(); 309 } 310 } 311 miniZK.shutdown(); 312 } 313 createTableOnClusters(HTableDescriptor table)314 private void createTableOnClusters(HTableDescriptor table) throws Exception { 315 int numClusters = configurations.length; 316 for (int i = 0; i < numClusters; i++) { 317 Admin hbaseAdmin = null; 318 try { 319 hbaseAdmin = new HBaseAdmin(configurations[i]); 320 hbaseAdmin.createTable(table); 321 } finally { 322 close(hbaseAdmin); 323 } 324 } 325 } 326 addPeer(String id, int masterClusterNumber, int slaveClusterNumber)327 private void addPeer(String id, int masterClusterNumber, 328 int slaveClusterNumber) throws Exception { 329 ReplicationAdmin replicationAdmin = null; 330 try { 331 replicationAdmin = new ReplicationAdmin( 332 configurations[masterClusterNumber]); 333 replicationAdmin.addPeer(id, 334 utilities[slaveClusterNumber].getClusterKey()); 335 } finally { 336 close(replicationAdmin); 337 } 338 } 339 disablePeer(String id, int masterClusterNumber)340 private void disablePeer(String id, int masterClusterNumber) throws Exception { 341 ReplicationAdmin replicationAdmin = null; 342 try { 343 replicationAdmin = new ReplicationAdmin( 344 configurations[masterClusterNumber]); 345 replicationAdmin.disablePeer(id); 346 } finally { 347 close(replicationAdmin); 348 } 349 } 350 enablePeer(String id, int masterClusterNumber)351 private void enablePeer(String id, int masterClusterNumber) throws Exception { 352 ReplicationAdmin replicationAdmin = null; 353 try { 354 replicationAdmin = new ReplicationAdmin( 355 configurations[masterClusterNumber]); 356 replicationAdmin.enablePeer(id); 357 } finally { 358 close(replicationAdmin); 359 } 360 } 361 close(Closeable... closeables)362 private void close(Closeable... closeables) { 363 try { 364 if (closeables != null) { 365 for (Closeable closeable : closeables) { 366 closeable.close(); 367 } 368 } 369 } catch (Exception e) { 370 LOG.warn("Exception occured while closing the object:", e); 371 } 372 } 373 374 @SuppressWarnings("resource") getHTablesOnClusters(TableName tableName)375 private Table[] getHTablesOnClusters(TableName tableName) throws Exception { 376 int numClusters = utilities.length; 377 Table[] htables = new Table[numClusters]; 378 for (int i = 0; i < numClusters; i++) { 379 Table htable = new HTable(configurations[i], tableName); 380 htable.setWriteBufferSize(1024); 381 htables[i] = htable; 382 } 383 return htables; 384 } 385 validateCounts(Table[] htables, byte[] type, int[] expectedCounts)386 private void validateCounts(Table[] htables, byte[] type, 387 int[] expectedCounts) throws IOException { 388 for (int i = 0; i < htables.length; i++) { 389 assertEquals(Bytes.toString(type) + " were replicated back ", 390 expectedCounts[i], getCount(htables[i], type)); 391 } 392 } 393 getCount(Table t, byte[] type)394 private int getCount(Table t, byte[] type) throws IOException { 395 Get test = new Get(row); 396 test.setAttribute("count", new byte[] {}); 397 Result res = t.get(test); 398 return Bytes.toInt(res.getValue(count, type)); 399 } 400 deleteAndWait(byte[] row, Table source, Table target)401 private void deleteAndWait(byte[] row, Table source, Table target) 402 throws Exception { 403 Delete del = new Delete(row); 404 source.delete(del); 405 wait(row, target, true); 406 } 407 putAndWait(byte[] row, byte[] fam, Table source, Table target)408 private void putAndWait(byte[] row, byte[] fam, Table source, Table target) 409 throws Exception { 410 Put put = new Put(row); 411 put.add(fam, row, row); 412 source.put(put); 413 wait(row, target, false); 414 } 415 wait(byte[] row, Table target, boolean isDeleted)416 private void wait(byte[] row, Table target, boolean isDeleted) 417 throws Exception { 418 Get get = new Get(row); 419 for (int i = 0; i < NB_RETRIES; i++) { 420 if (i == NB_RETRIES - 1) { 421 fail("Waited too much time for replication. Row:" + Bytes.toString(row) 422 + ". IsDeleteReplication:" + isDeleted); 423 } 424 Result res = target.get(get); 425 boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0; 426 if (sleep) { 427 LOG.info("Waiting for more time for replication. Row:" 428 + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted); 429 Thread.sleep(SLEEP_TIME); 430 } else { 431 if (!isDeleted) { 432 assertArrayEquals(res.value(), row); 433 } 434 LOG.info("Obtained row:" 435 + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted); 436 break; 437 } 438 } 439 } 440 441 /** 442 * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same 443 * timestamp there is otherwise no way to count them. 444 */ 445 public static class CoprocessorCounter extends BaseRegionObserver { 446 private int nCount = 0; 447 private int nDelete = 0; 448 449 @Override prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability)450 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 451 final WALEdit edit, final Durability durability) throws IOException { 452 nCount++; 453 } 454 455 @Override postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final Delete delete, final WALEdit edit, final Durability durability)456 public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, 457 final Delete delete, final WALEdit edit, final Durability durability) throws IOException { 458 nDelete++; 459 } 460 461 @Override preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get, final List<Cell> result)462 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, 463 final Get get, final List<Cell> result) throws IOException { 464 if (get.getAttribute("count") != null) { 465 result.clear(); 466 // order is important! 467 result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete))); 468 result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount))); 469 c.bypass(); 470 } 471 } 472 } 473 474 } 475