/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.ServiceException;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
 */
@Category(MediumTests.class)
public class TestRegionReplicas {
  private static final Log LOG = LogFactory.getLog(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static HTable table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    hriPrimary = table.getRegionLocation(row, false).getRegionInfo();

    // mock a secondary region info (replicaId 1) to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    table.close();
    HTU.shutdownMiniCluster();
  }

  @After
  public void after() throws Exception {
    // Clean up the znode state if the test failed before doing so itself.
    // This does not handle every failure mode, so when there are multiple
    // failures only the first one is worth looking at.
    ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

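  /** Tests that a secondary replica can be opened while rows are loaded into and read back from
   * the primary. */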
  @Test(timeout = 60000)
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test(timeout = 60000)
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) meta.close();
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

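  /** Tests that, once the primary has flushed, the secondary replica serves Gets both through
   * the Region API and through a direct RPC to the region server. */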
  @Test(timeout = 60000)
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

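  /** Tests a client-level Get aimed at the secondary replica by setting Consistency.TIMELINE
   * and an explicit replica id on the request. */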
  @Test(timeout = 60000)
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try a Get directly against the region replica
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Asserts that the row for {@code value} is or is not readable through the given region,
   * depending on {@code expect}. */
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // Build the Get RPC by hand and issue it straight against the region server's RPC services.
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
      throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

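  /** Restarts the region server by shutting down and restarting the whole mini cluster, so that
   * configuration changes made by a test take effect. */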
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

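  /** Tests that the store file refresher chore makes the primary's flushed and compacted files
   * visible to the secondary replica. */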
  @Test(timeout = 300000)
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that the chore has run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some more data to primary
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that the chore has run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
      while (System.currentTimeMillis() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // ensure that we see only the compacted file
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

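  /** Stress test: one thread writes to the primary, one flushes/compacts it, and one reads from
   * the secondary replica (occasionally closing and reopening it); asserts that none of the
   * three threads fails. */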
  @Test(timeout = 300000)
  public void testFlushAndCompactionsInPrimary() throws Exception {
    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that the reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that the chore has run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      // one exception slot per worker thread: writer, flusher/compactor, reader
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<Exception>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;
        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.add(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) key = startKey;
            }
          } catch (Exception ex) {
            LOG.warn(ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // randomly flush, or compact (possibly major)
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();
        @Override
        public void run() {
          try {
            while (running.get()) {
              // occasionally close and reopen the secondary region
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
              + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader");
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // let the threads run, then stop them and wait for termination
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

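  /** Tests that the secondary replica can still read its store files (through FileLinks) after
   * the primary's major compaction moves them to the archive. */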
  @Test(timeout = 300000)
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      Region primaryRegion = getRS().getFromOnlineRegions(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // refresh store files on the secondary
      Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force major compaction
      LOG.info("Force major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());

      // Scan all the hfiles on the secondary. Since there are no reads on the secondary, when we
      // ask the NN for block locations a FileNotFoundException will be returned, and the FileLink
      // should be able to deal with it, giving us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (StoreFile sf : secondaryRegion.getStore(f).getStorefiles()) {
        // Our file no longer exists: it was moved to the archive by the compaction above.
        LOG.debug("Does " + sf.getPath() + " exist? "
          + getRS().getFileSystem().exists(sf.getPath()));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getKeyValue();
          sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
            cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      // 3000 rows with keys 0..2999; their sum is 2999 * 3000 / 2 = 4498500
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      // clean up all of the rows loaded above (three ranges of 1000)
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}