/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestCacheOnWrite {

  private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

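  /**
   * Ties each cache-on-write configuration key to the block types that are
   * expected in the cache when that key, and only that key, is enabled.
   */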
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

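  /**
   * Returns the block cache implementations exercised by this test: the
   * default cache built by {@link CacheConfig}, a standalone
   * {@link LruBlockCache}, and an off-heap {@link BucketCache}.
   */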
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<BlockCache>();
    // default
    blockcaches.add(new CacheConfig(conf).getBlockCache());

    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

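  /**
   * Builds the full cross product of block cache implementation,
   * cache-on-write type, compression algorithm, and the
   * cache-compressed-data flag.
   */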
  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> params = new ArrayList<Object[]>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
          }
        }
      }
    }
    return params;
  }

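  /**
   * Evicts every block from the given cache. BucketCache is cleared in a
   * retry loop because iterating it may miss blocks that are still in its
   * write queue.
   */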
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not return all cached blocks (e.g. blocks still in its
      // write queue), so keep retrying until the count reaches zero.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // A combined cache (CombinedBlockCache) may need to evict the same key twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
        BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    fs = HFileSystem.get(conf);
    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
    cacheConf = new CacheConfig(blockCache, true, true,
        cowType.shouldBeCached(BlockType.DATA),
        cowType.shouldBeCached(BlockType.LEAF_INDEX),
        cowType.shouldBeCached(BlockType.BLOOM_CHUNK),
        false, cacheCompressedData, false, false, false);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

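  /**
   * Reads the store file back block by block and asserts that exactly the
   * block types selected by the current cache-on-write type were cached at
   * write time, and that cached blocks match the blocks read from disk.
   */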
  private void readStoreFile(boolean useTags) throws IOException {
    AbstractHFileReader reader;
    if (useTags) {
      reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    } else {
      reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    }
    LOG.info("HFile information: " + reader);
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
      .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
      .withBlockSize(DATA_BLOCK_SIZE)
      .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
      .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    HFileBlock prevBlock = null;
    EnumMap<BlockType, Integer> blockCountByType =
        new EnumMap<BlockType, Integer>(BlockType.class);

    DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long onDiskSize = -1;
      if (prevBlock != null) {
        onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
      }
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
        false, true, null, encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue("shouldBeCached: " + shouldBeCached + "\n" +
          "isCached: " + isCached + "\n" +
          "Test description: " + testDescription + "\n" +
          "block: " + block + "\n" +
          "encodingInCache: " + encodingInCache + "\n" +
          "blockCacheKey: " + blockCacheKey,
        shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // The block cached at write time and the block read back from the file
        // should be identical.
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        assertNotEquals(block.getBlockType(), BlockType.ENCODED_DATA);
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(
          block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
      }
      prevBlock = block;
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
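    // The expected counts below are deterministic: the KeyValue generator uses
    // a fixed Random seed, so the same blocks are written on every run.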
    String countByType = blockCountByType.toString();
    if (useTags) {
      assertEquals("{" + BlockType.DATA
          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=34}", countByType);
    } else {
      assertEquals("{" + BlockType.DATA
          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
    }

    // Iterate over all remaining KeyValues to make sure the scanner can read
    // the whole file.
    while (scanner.next()) {
      scanner.getKeyValue();
    }
    reader.close();
  }

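  /**
   * Returns a random KeyValue type: half Puts, the rest chosen randomly from
   * the remaining valid types (excluding Minimum and Maximum).
   */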
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

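  /**
   * Writes NUM_KV random KeyValues to a new store file (HFile format version 3
   * with tags, version 2 without) and records the resulting path in
   * {@link #storeFilePath}.
   */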
  private void writeStoreFile(boolean useTags) throws IOException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
    } else {
      TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 2);
    }
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
      byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
      byte[] value = TestHFileWriterV2.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new Tag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<Tag>();
        tagList.add(t);
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
            rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
            rand.nextLong(), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

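  /**
   * Fills a region with several store files, compacts it, and verifies that
   * the compaction did not add any data blocks to the block cache.
   */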
  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException, InterruptedException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
    } else {
      TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 2);
    }
    // TODO: need to change this test if we add a cache size threshold for
    // compactions, or if we implement some other kind of intelligent logic for
    // deciding what blocks to cache-on-write on compaction.
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    Region region = TEST_UTIL.createTestRegion(table,
        new HColumnDescriptor(cf)
            .setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE)
            .setMaxVersions(maxVersions)
            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
    );
    int rowIdx = 0;
    long ts = EnvironmentEdgeManager.currentTime();
    for (int iFile = 0; iFile < 5; ++iFile) {
      for (int iRow = 0; iRow < 500; ++iRow) {
        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
            iRow;
        Put p = new Put(Bytes.toBytes(rowStr));
        ++rowIdx;
        for (int iCol = 0; iCol < 10; ++iCol) {
          String qualStr = "col" + iCol;
          String valueStr = "value_" + rowStr + "_" + qualStr;
          for (int iTS = 0; iTS < 5; ++iTS) {
            if (useTags) {
              Tag t = new Tag((byte) 1, "visibility");
              Tag[] tags = new Tag[] { t };
              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
              p.add(kv);
            } else {
              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
            }
          }
        }
        p.setDurability(Durability.ASYNC_WAL);
        region.put(p);
      }
      region.flush(true);
    }
    clearBlockCache(blockCache);
    assertEquals(0, blockCache.getBlockCount());
    region.compact(false);
    LOG.debug("compact() returned");

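    // The compaction should not have added any data blocks (encoded or not)
    // to the cache.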
    for (CachedBlock block : blockCache) {
      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
      assertNotEquals(BlockType.DATA, block.getBlockType());
    }
    ((HRegion) region).close();
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }
}