/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 *
 * <p>The test is parameterized over the cache-on-write mode, the compression
 * algorithm, whether compressed data blocks are cached, and the
 * {@link BlockCache} implementation under test (see {@link #getParameters()}).
 */
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestCacheOnWrite {

  private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  /** Shared testing-utility configuration; (re)initialised in {@link #setUp()}. */
  private Configuration conf;
  /** Cache configuration matching the current parameters; built in {@link #setUp()}. */
  private CacheConfig cacheConf;
  private FileSystem fs;
  /**
   * Fixed-seed generator. The expected block counts asserted in
   * {@link #readStoreFile(boolean)} depend on the exact sequence of values drawn
   * from it, so the order of calls in {@link #writeStoreFile(boolean)} must not change.
   */
  private final Random rand = new Random(12983177L);
  /** Path of the store file most recently produced by {@link #writeStoreFile(boolean)}. */
  private Path storeFilePath;
  private final BlockCache blockCache;
  /** Human-readable summary of the current parameters, used in log and assertion messages. */
  private final String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  /**
   * The number of valid key types possible in a store file.
   * NOTE(review): the "- 2" presumably discounts {@code Minimum} and {@code Maximum},
   * which {@link #generateKeyType(Random)} explicitly rejects -- confirm against
   * {@code KeyValue.Type}.
   */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

  /**
   * The cache-on-write modes under test. Each mode is tied to the configuration
   * key that switches it on and to the one or two block types that are expected
   * to be found in the cache when that mode is active.
   */
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    /** Mode that caches a single block type. */
    CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    /** Mode that caches either of two block types. */
    CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    /**
     * @param blockType the type of a block read back from the file
     * @return true if blocks of this type are expected in the cache under this mode
     */
    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    /**
     * Enables this mode's configuration key and disables the key of every other mode.
     *
     * @param conf the configuration to modify in place
     */
    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

  /**
   * Invoked by the {@link Parameterized} runner with one row of {@link #getParameters()}.
   *
   * @param cowType which kind of block is cached on write
   * @param compress compression algorithm used for the files written by the test
   * @param cacheCompressedData whether data blocks are kept compressed in the cache
   * @param blockCache the block cache implementation under test
   */
  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

  /**
   * Builds the block cache instances the test is run against: the one produced by a
   * default {@link CacheConfig}, a standalone {@link LruBlockCache}, and a
   * {@link BucketCache}.
   *
   * @return the caches, in the order listed above
   * @throws IOException if the test data directory or the bucket cache cannot be created
   */
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<BlockCache>();
    // default
    blockcaches.add(new CacheConfig(conf).getBlockCache());

    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, conf);
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

  /**
   * @return the cross product of block caches, cache-on-write modes, compression
   *         algorithms and the two {@code cacheCompressedData} settings, each row in
   *         the argument order expected by the constructor
   * @throws IOException if the block caches cannot be created
   */
  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> params = new ArrayList<Object[]>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
          }
        }
      }
    }
    return params;
  }

  /**
   * Empties the given cache. An {@link LruBlockCache} is cleared directly; any other
   * implementation is drained by evicting every block it iterates over, repeating
   * (with a short sleep between rounds) until the reported block count reaches zero.
   *
   * @param blockCache the cache to empty
   * @throws InterruptedException if interrupted while waiting between eviction rounds
   */
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not return all cached blocks(blocks in write queue), so check it here.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        // Iterate over a snapshot so that evicting does not disturb the iteration.
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // CombinedBucketCache may need evict two times.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  /**
   * Applies the current parameters to the shared configuration, installs the block
   * cache under test as the global instance and builds the matching {@link CacheConfig}.
   */
  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    this.conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
        BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    fs = HFileSystem.get(conf);
    // NOTE(review): presumably needed so that regions created through TEST_UTIL pick up
    // the cache under test -- confirm against CacheConfig.
    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
    // NOTE(review): the three shouldBeCached(...) arguments presumably map to the
    // data/index/bloom cache-on-write flags of CacheConfig -- confirm against its constructor.
    cacheConf =
        new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
            cowType.shouldBeCached(BlockType.LEAF_INDEX),
            cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
            false, false, false);
  }

  /** Empties the block cache so that the next test starts from a clean state. */
  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  /** Removes the test data directory once all parameterized runs have finished. */
  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Sets {@code hfile.format.version} on the shared test configuration: 3 when tags
   * are in use, 2 otherwise.
   */
  private static void setHFileFormatVersion(boolean useTags) {
    TEST_UTIL.getConfiguration().setInt("hfile.format.version", useTags ? 3 : 2);
  }

  /**
   * Builds the {@link HFileContext} shared by the write and the read side of the
   * store-file test, so that both always agree on compression, checksum, block size,
   * encoding and tag settings.
   */
  private HFileContext createFileContext(boolean useTags) {
    return new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
  }

  /** Writes a store file and then verifies which of its blocks ended up in the cache. */
  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

  /**
   * Walks every block of the store file written by {@link #writeStoreFile(boolean)}
   * up to the load-on-open section and checks that a block is present in the cache
   * exactly when the current cache-on-write mode says it should be. Cached blocks are
   * compared against the block read from the file, and the number of blocks per type
   * is checked against the expected totals.
   *
   * @param useTags whether the file was written with tags
   * @throws IOException if the file cannot be opened or read
   */
  private void readStoreFile(boolean useTags) throws IOException {
    AbstractHFileReader reader;
    // The casts double as a check that the reader implementation matches the version written.
    if (useTags) {
      reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    } else {
      reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    }
    // Close the reader even when an assertion below fails, so that a failing
    // parameter combination does not leak the open file into later runs.
    try {
      LOG.info("HFile information: " + reader);
      HFileContext meta = createFileContext(useTags);
      final boolean cacheBlocks = false;
      final boolean pread = false;
      HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
      assertTrue(testDescription, scanner.seekTo());

      long offset = 0;
      HFileBlock prevBlock = null;
      EnumMap<BlockType, Integer> blockCountByType =
          new EnumMap<BlockType, Integer>(BlockType.class);

      DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        long onDiskSize = -1;
        if (prevBlock != null) {
          onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
        }
        // Flags: don't cache the block, use pread, this is not a compaction.
        // Also, pass null for expected block type to avoid checking it.
        HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
            false, true, null, encodingInCache);
        BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
            offset);
        HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
        boolean isCached = fromCache != null;
        boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
        assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
            "isCached: " + isCached + "\n" +
            "Test description: " + testDescription + "\n" +
            "block: " + block + "\n" +
            "encodingInCache: " + encodingInCache + "\n" +
            "blockCacheKey: " + blockCacheKey,
            shouldBeCached == isCached);
        if (isCached) {
          if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
            if (compress != Compression.Algorithm.NONE) {
              assertFalse(fromCache.isUnpacked());
            }
            fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
          } else {
            assertTrue(fromCache.isUnpacked());
          }
          // block we cached at write-time and block read from file should be identical
          assertEquals(block.getChecksumType(), fromCache.getChecksumType());
          assertEquals(block.getBlockType(), fromCache.getBlockType());
          assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
          assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
          assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
          assertEquals(
              block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
        }
        prevBlock = block;
        offset += block.getOnDiskSizeWithHeader();
        BlockType bt = block.getBlockType();
        Integer count = blockCountByType.get(bt);
        blockCountByType.put(bt, (count == null ? 0 : count) + 1);
      }

      LOG.info("Block count by type: " + blockCountByType);
      // The expected totals are tied to the fixed random seed and the constants above.
      String countByType = blockCountByType.toString();
      if (useTags) {
        assertEquals("{" + BlockType.DATA
            + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=34}", countByType);
      } else {
        assertEquals("{" + BlockType.DATA
            + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
      }

      // iterate all the keyvalue from hfile
      while (scanner.next()) {
        scanner.getKeyValue();
      }
    } finally {
      reader.close();
    }
  }

  /**
   * Picks a key type for a generated cell: {@code Put} half of the time, otherwise one
   * of the {@link #NUM_VALID_KEY_TYPES} values of {@link KeyValue.Type} that follow the
   * first enum constant.
   *
   * @param rand source of randomness
   * @return the chosen key type
   * @throws RuntimeException if {@code Minimum} or {@code Maximum} is selected, which
   *         indicates that the layout of {@link KeyValue.Type} no longer matches the
   *         index arithmetic used here
   */
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  /**
   * Writes {@link #NUM_KV} randomly generated cells into a new store file under the
   * test data directory and records its path in {@link #storeFilePath}.
   *
   * @param useTags whether each cell carries a tag (and HFile format version 3 is used)
   * @throws IOException if the file cannot be written
   */
  private void writeStoreFile(boolean useTags) throws IOException {
    setHFileFormatVersion(useTags);
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = createFileContext(useTags);
    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      // The order of the draws from rand below determines the file contents and hence
      // the block counts asserted in readStoreFile; do not reorder.
      byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
      byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
      byte[] value = TestHFileWriterV2.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new Tag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<Tag>();
        tagList.add(t);
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

  /**
   * Flushes five files into a test region, empties the block cache, runs a compaction
   * and then checks that no data block (encoded or not) is present in the cache.
   *
   * @param useTags whether the cells carry a tag (and HFile format version 3 is used)
   * @throws IOException if the region cannot be created, written, flushed or compacted
   * @throws InterruptedException if interrupted while clearing the block cache
   */
  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException, InterruptedException {
    setHFileFormatVersion(useTags);
    // TODO: need to change this test if we add a cache size threshold for
    // compactions, or if we implement some other kind of intelligent logic for
    // deciding what blocks to cache-on-write on compaction.
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    Region region = TEST_UTIL.createTestRegion(table,
        new HColumnDescriptor(cf)
            .setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE)
            .setMaxVersions(maxVersions)
            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
    );
    // Close the region even when a put, flush, compaction or assertion fails, so that
    // a failing parameter combination does not leak it into later runs.
    try {
      int rowIdx = 0;
      long ts = EnvironmentEdgeManager.currentTime();
      for (int iFile = 0; iFile < 5; ++iFile) {
        for (int iRow = 0; iRow < 500; ++iRow) {
          // rowIdx reaches 2499, whose cube does not fit in an int; compute it as a long
          // so the row-key prefix does not silently wrap around.
          String rowStr = "" + ((long) rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
              iRow;
          Put p = new Put(Bytes.toBytes(rowStr));
          ++rowIdx;
          for (int iCol = 0; iCol < 10; ++iCol) {
            String qualStr = "col" + iCol;
            String valueStr = "value_" + rowStr + "_" + qualStr;
            for (int iTS = 0; iTS < 5; ++iTS) {
              if (useTags) {
                Tag t = new Tag((byte) 1, "visibility");
                Tag[] tags = new Tag[1];
                tags[0] = t;
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                    HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
                p.add(kv);
              } else {
                p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
              }
            }
          }
          p.setDurability(Durability.ASYNC_WAL);
          region.put(p);
        }
        region.flush(true);
      }
      // Start from an empty cache so that anything found afterwards was put there
      // by the compaction itself.
      clearBlockCache(blockCache);
      assertEquals(0, blockCache.getBlockCount());
      region.compact(false);
      LOG.debug("compactStores() returned");

      for (CachedBlock block : blockCache) {
        assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
        assertNotEquals(BlockType.DATA, block.getBlockType());
      }
    } finally {
      ((HRegion) region).close();
    }
  }

  /** Runs the store-file cache-on-write check first without and then with tags. */
  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  /** Runs the compaction check first without and then with tags. */
  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }
}