/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.ServiceException;

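/**
 * Tests master-master (cyclic) replication across multiple mini HBase clusters that share a
 * single mini ZooKeeper cluster, using a region coprocessor to count the puts and deletes
 * applied on each cluster.
 */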
@Category(LargeTests.class)
public class TestMasterReplication {

  private static final Log LOG = LogFactory.getLog(TestMasterReplication.class);

  private Configuration baseConfiguration;

  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  private MiniZooKeeperCluster miniZK;

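  // Replication checks poll up to NB_RETRIES times, sleeping SLEEP_TIME ms between attempts
  // (roughly five seconds in total before a check fails).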
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 10;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  private HTableDescriptor table;

  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
    // smaller block size and capacity to trigger more operations
    // and test them
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    baseConfiguration.setBoolean("dfs.support.append", true);
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
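    // Register the counting coprocessor (defined at the bottom of this class) on every user
    // region so that the number of puts and deletes applied on each cluster can be verified.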
    baseConfiguration.setStrings(
        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        CoprocessorCounter.class.getName());

    table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
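    // The second family keeps the default (local) replication scope, so its edits should never
    // be replicated to other clusters.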
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

  /**
   * Tests the replication scenario 0 -> 1 -> 0 by adding and deleting a row in the table on each
   * cluster and checking that it is replicated to the other cluster. It also verifies that the
   * puts and deletes are not replicated back to the originating cluster.
   */
  @Test(timeout = 300000)
  public void testCyclicReplication1() throws Exception {
    LOG.info("testCyclicReplication1");
    int numClusters = 2;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      htables = getHTablesOnClusters(tableName);

      // Test the replication scenario of 0 -> 1 -> 0
      addPeer("1", 0, 1);
      addPeer("1", 1, 0);

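      // Each cluster applies its own local mutation plus the one replicated from its peer, so
      // the coprocessor should count exactly two of each type per cluster; a higher count would
      // mean edits were replicated back to their origin.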
      int[] expectedCounts = new int[] { 2, 2 };

      // add rows to both clusters and make sure they are both replicated
      putAndWait(row, famName, htables[0], htables[1]);
      putAndWait(row1, famName, htables[1], htables[0]);
      validateCounts(htables, put, expectedCounts);

      deleteAndWait(row, htables[0], htables[1]);
      deleteAndWait(row1, htables[1], htables[0]);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and deleting rows in a
   * table on each cluster and ensuring that each cluster receives the appropriate mutations. It
   * also tests the grouping scenario where a cluster needs to replicate the edits originating
   * from itself as well as the edits that it received via replication from a different cluster.
   * The scenario is explained in HBASE-9158.
   */
  @Test(timeout = 300000)
  public void testCyclicReplication2() throws Exception {
    LOG.info("testCyclicReplication2");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 0
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 0);

      htables = getHTablesOnClusters(tableName);

      // put a row on each cluster and wait until it has made it all the way around the ring
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[0]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[0]);
      deleteAndWait(row2, htables[2], htables[1]);

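      // With the 0 -> 1 -> 2 -> 0 ring, every cluster ends up applying its own mutation plus
      // the two replicated from the other clusters, hence three puts and three deletes each.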
      int[] expectedCounts = new int[] { 3, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);

      // Test HBASE-9158
      disablePeer("1", 2);
      // cluster 1 now has an edit that was replicated to it from cluster 0
      putAndWait(row3, famName, htables[0], htables[1]);
      // now add a local edit to cluster 1
      htables[1].put(new Put(row4).add(famName, row4, row4));
      // re-enable replication from cluster 2 to cluster 0
      enablePeer("1", 2);
      // without HBASE-9158 the edit for row4 would have been marked with cluster 0's id
      // and hence not replicated to cluster 0
      wait(row4, htables[0], false);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
   */
  @Test(timeout = 300000)
  public void testCyclicReplication3() throws Exception {
    LOG.info("testCyclicReplication3");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 1
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 1);

      htables = getHTablesOnClusters(tableName);

      // put a row on each cluster and wait until it has been replicated to its final destination
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[2]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[2]);
      deleteAndWait(row2, htables[2], htables[1]);

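      // In the 0 -> 1 -> 2 -> 1 topology nothing replicates back to cluster 0, so it only
      // counts its own mutation, while clusters 1 and 2 each also receive the other two
      // clusters' edits.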
      int[] expectedCounts = new int[] { 1, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /*
   * Tests RSRpcServices#replicateWALEntry when replication is disabled. This is to simulate
   * HBASE-14840.
   */
  @Test(timeout = 180000, expected = ServiceException.class)
  public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception {
    LOG.info("testReplicateWALEntryWhenReplicationIsDisabled");
    baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
    Table[] htables = null;
    try {
      startMiniClusters(1);
      createTableOnClusters(table);
      htables = getHTablesOnClusters(tableName);

      HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName);
      RSRpcServices rsrpc = new RSRpcServices(rs);
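      // With replication disabled on the cluster, the replication RPC should be rejected; the
      // test expects the resulting ServiceException.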
      rsrpc.replicateWALEntry(null, null);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  @After
  public void tearDown() throws IOException {
    configurations = null;
    utilities = null;
  }

  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
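      // Give each cluster its own ZooKeeper znode parent so that all of the mini clusters can
      // share the single MiniZooKeeperCluster started by the first one.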
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      new ZooKeeperWatcher(conf, "cluster" + i, null, true);
    }
  }

  private void shutDownMiniClusters() throws Exception {
    int numClusters = utilities.length;
    for (int i = numClusters - 1; i >= 0; i--) {
      if (utilities[i] != null) {
        utilities[i].shutdownMiniCluster();
      }
    }
    miniZK.shutdown();
  }

  private void createTableOnClusters(HTableDescriptor table) throws Exception {
    int numClusters = configurations.length;
    for (int i = 0; i < numClusters; i++) {
      Admin hbaseAdmin = null;
      try {
        hbaseAdmin = new HBaseAdmin(configurations[i]);
        hbaseAdmin.createTable(table);
      } finally {
        close(hbaseAdmin);
      }
    }
  }

  private void addPeer(String id, int masterClusterNumber,
      int slaveClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(
          configurations[masterClusterNumber]);
      replicationAdmin.addPeer(id,
          utilities[slaveClusterNumber].getClusterKey());
    } finally {
      close(replicationAdmin);
    }
  }

  private void disablePeer(String id, int masterClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(
          configurations[masterClusterNumber]);
      replicationAdmin.disablePeer(id);
    } finally {
      close(replicationAdmin);
    }
  }

  private void enablePeer(String id, int masterClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(
          configurations[masterClusterNumber]);
      replicationAdmin.enablePeer(id);
    } finally {
      close(replicationAdmin);
    }
  }

  private void close(Closeable... closeables) {
    try {
      if (closeables != null) {
        for (Closeable closeable : closeables) {
          closeable.close();
        }
      }
    } catch (Exception e) {
      LOG.warn("Exception occurred while closing the object:", e);
    }
  }

  @SuppressWarnings("resource")
  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
    int numClusters = utilities.length;
    Table[] htables = new Table[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Table htable = new HTable(configurations[i], tableName);
      htable.setWriteBufferSize(1024);
      htables[i] = htable;
    }
    return htables;
  }

  private void validateCounts(Table[] htables, byte[] type,
      int[] expectedCounts) throws IOException {
    for (int i = 0; i < htables.length; i++) {
      assertEquals(Bytes.toString(type) + " were replicated back ",
          expectedCounts[i], getCount(htables[i], type));
    }
  }

  private int getCount(Table t, byte[] type) throws IOException {
    Get test = new Get(row);
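    // The "count" attribute is intercepted by CoprocessorCounter#preGetOp, which replaces the
    // normal result with the accumulated put/delete counters.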
    test.setAttribute("count", new byte[] {});
    Result res = t.get(test);
    return Bytes.toInt(res.getValue(count, type));
  }

  private void deleteAndWait(byte[] row, Table source, Table target)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);
    wait(row, target, true);
  }

  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
      throws Exception {
    Put put = new Put(row);
    put.add(fam, row, row);
    source.put(put);
    wait(row, target, false);
  }

  private void wait(byte[] row, Table target, boolean isDeleted)
      throws Exception {
    Get get = new Get(row);
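    // Poll the target table until the row appears (for puts) or disappears (for deletes),
    // failing the test once the retry budget is exhausted.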
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
      }
      Result res = target.get(get);
      boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
      if (sleep) {
        LOG.info("Waiting some more time for replication. Row:"
            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        Thread.sleep(SLEEP_TIME);
      } else {
        if (!isDeleted) {
          assertArrayEquals(row, res.value());
        }
        LOG.info("Obtained row:"
            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        break;
      }
    }
  }


  /**
   * Uses a coprocessor to count puts and deletes. As KVs would be replicated back with the same
   * timestamp, there is otherwise no way to count them.
   */
  public static class CoprocessorCounter extends BaseRegionObserver {
    private int nCount = 0;
    private int nDelete = 0;

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      nCount++;
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete++;
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Get get, final List<Cell> result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // order is important: cells in the result must be sorted, and "delete" sorts before "put"
        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
        c.bypass();
      }
    }
  }

}