1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.regionserver;
20 
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.Comparator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.TreeSet;
30 
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.HBaseTestCase;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HColumnDescriptor;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HRegionInfo;
42 import org.apache.hadoop.hbase.KeyValue;
43 import org.apache.hadoop.hbase.KeyValueUtil;
44 import org.apache.hadoop.hbase.testclassification.SmallTests;
45 import org.apache.hadoop.hbase.TableName;
46 import org.apache.hadoop.hbase.client.Scan;
47 import org.apache.hadoop.hbase.io.HFileLink;
48 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
49 import org.apache.hadoop.hbase.io.hfile.BlockCache;
50 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
51 import org.apache.hadoop.hbase.io.hfile.CacheStats;
52 import org.apache.hadoop.hbase.io.hfile.HFileContext;
53 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
54 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
55 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
56 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
57 import org.apache.hadoop.hbase.util.BloomFilterFactory;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.ChecksumType;
60 import org.apache.hadoop.hbase.util.FSUtils;
61 import org.junit.experimental.categories.Category;
62 import org.mockito.Mockito;
63 
64 import com.google.common.base.Joiner;
65 import com.google.common.collect.Iterables;
66 import com.google.common.collect.Lists;
67 
68 import static org.mockito.Mockito.mock;
69 import static org.mockito.Mockito.spy;
70 import static org.mockito.Mockito.when;
71 
72 /**
73  * Test HStoreFile
74  */
75 @Category(SmallTests.class)
76 public class TestStoreFile extends HBaseTestCase {
77   private static final Log LOG = LogFactory.getLog(TestStoreFile.class);
78   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
79   private CacheConfig cacheConf =  new CacheConfig(TEST_UTIL.getConfiguration());
80   private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
81   private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
82   private static final int CKBYTES = 512;
83   private static String TEST_FAMILY = "cf";
84 
85   @Override
setUp()86   public void setUp() throws Exception {
87     super.setUp();
88   }
89 
90   @Override
tearDown()91   public void tearDown() throws Exception {
92     super.tearDown();
93   }
94 
95   /**
96    * Write a file and then assert that we can read from top and bottom halves
97    * using two HalfMapFiles.
98    * @throws Exception
99    */
testBasicHalfMapFile()100   public void testBasicHalfMapFile() throws Exception {
101     final HRegionInfo hri =
102         new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
103     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
104       conf, fs, new Path(this.testDir, hri.getTable().getNameAsString()), hri);
105 
106     HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
107     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
108             .withFilePath(regionFs.createTempName())
109             .withFileContext(meta)
110             .build();
111     writeStoreFile(writer);
112 
113     Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
114     StoreFile sf = new StoreFile(this.fs, sfPath, conf, cacheConf,
115       BloomType.NONE);
116     checkHalfHFile(regionFs, sf);
117   }
118 
writeStoreFile(final StoreFile.Writer writer)119   private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
120     writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
121   }
122 
123   // pick an split point (roughly halfway)
124   byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR)/2, FIRST_CHAR};
125 
126   /*
127    * Writes HStoreKey and ImmutableBytes data to passed writer and
128    * then closes it.
129    * @param writer
130    * @throws IOException
131    */
writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)132   public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)
133   throws IOException {
134     long now = System.currentTimeMillis();
135     try {
136       for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
137         for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
138           byte[] b = new byte[] { (byte) d, (byte) e };
139           writer.append(new KeyValue(b, fam, qualifier, now, b));
140         }
141       }
142     } finally {
143       writer.close();
144     }
145   }
146 
147   /**
148    * Test that our mechanism of writing store files in one region to reference
149    * store files in other regions works.
150    * @throws IOException
151    */
testReference()152   public void testReference() throws IOException {
153     final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testReferenceTb"));
154     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
155       conf, fs, new Path(this.testDir, hri.getTable().getNameAsString()), hri);
156 
157     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
158     // Make a store file and write data to it.
159     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
160             .withFilePath(regionFs.createTempName())
161             .withFileContext(meta)
162             .build();
163     writeStoreFile(writer);
164 
165     Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
166     StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf,
167       BloomType.NONE);
168     StoreFile.Reader reader = hsf.createReader();
169     // Split on a row, not in middle of row.  Midkey returned by reader
170     // may be in middle of row.  Create new one with empty column and
171     // timestamp.
172 
173     KeyValue kv = KeyValue.createKeyValueFromKey(reader.midkey());
174     byte [] midRow = kv.getRow();
175     kv = KeyValue.createKeyValueFromKey(reader.getLastKey());
176     byte [] finalRow = kv.getRow();
177     hsf.closeReader(true);
178 
179     // Make a reference
180     HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
181     Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
182     StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf,
183       BloomType.NONE);
184     // Now confirm that I can read from the reference and that it only gets
185     // keys from top half of the file.
186     HFileScanner s = refHsf.createReader().getScanner(false, false);
187     for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
188       ByteBuffer bb = s.getKey();
189       kv = KeyValue.createKeyValueFromKey(bb);
190       if (first) {
191         assertTrue(Bytes.equals(kv.getRow(), midRow));
192         first = false;
193       }
194     }
195     assertTrue(Bytes.equals(kv.getRow(), finalRow));
196   }
197 
testHFileLink()198   public void testHFileLink() throws IOException {
199     final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testHFileLinkTb"));
200     // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
201     Configuration testConf = new Configuration(this.conf);
202     FSUtils.setRootDir(testConf, this.testDir);
203     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
204       testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()), hri);
205     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
206 
207     // Make a store file and write data to it.
208     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
209             .withFilePath(regionFs.createTempName())
210             .withFileContext(meta)
211             .build();
212     writeStoreFile(writer);
213 
214     Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
215     Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", TEST_FAMILY));
216     HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
217     Path linkFilePath = new Path(dstPath,
218                   HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
219 
220     // Try to open store file from link
221     StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
222     StoreFile hsf = new StoreFile(this.fs, storeFileInfo, testConf, cacheConf,
223       BloomType.NONE);
224     assertTrue(storeFileInfo.isLink());
225 
226     // Now confirm that I can read from the link
227     int count = 1;
228     HFileScanner s = hsf.createReader().getScanner(false, false);
229     s.seekTo();
230     while (s.next()) {
231       count++;
232     }
233     assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
234   }
235 
testEmptyStoreFileRestrictKeyRanges()236   public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
237     StoreFile.Reader reader = mock(StoreFile.Reader.class);
238     Store store = mock(Store.class);
239     HColumnDescriptor hcd = mock(HColumnDescriptor.class);
240     byte[] cf = Bytes.toBytes("ty");
241     when(hcd.getName()).thenReturn(cf);
242     when(store.getFamily()).thenReturn(hcd);
243     StoreFileScanner scanner =
244         new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0);
245     Scan scan = new Scan();
246     scan.setColumnFamilyTimeRange(cf, 0, 1);
247     assertFalse(scanner.shouldUseScanner(scan, store, 0));
248   }
249 
250   /**
251    * This test creates an hfile and then the dir structures and files to verify that references
252    * to hfilelinks (created by snapshot clones) can be properly interpreted.
253    */
testReferenceToHFileLink()254   public void testReferenceToHFileLink() throws IOException {
255     // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
256     Configuration testConf = new Configuration(this.conf);
257     FSUtils.setRootDir(testConf, this.testDir);
258 
259     // adding legal table name chars to verify regex handles it.
260     HRegionInfo hri = new HRegionInfo(TableName.valueOf("_original-evil-name"));
261     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
262       testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()), hri);
263 
264     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
265     // Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
266     StoreFile.Writer writer = new StoreFile.WriterBuilder(testConf, cacheConf, this.fs)
267             .withFilePath(regionFs.createTempName())
268             .withFileContext(meta)
269             .build();
270     writeStoreFile(writer);
271     Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
272 
273     // create link to store file. <root>/clone/region/<cf>/<hfile>-<region>-<table>
274     HRegionInfo hriClone = new HRegionInfo(TableName.valueOf("clone"));
275     HRegionFileSystem cloneRegionFs = HRegionFileSystem.createRegionOnFileSystem(
276       testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()),
277         hriClone);
278     Path dstPath = cloneRegionFs.getStoreDir(TEST_FAMILY);
279     HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
280     Path linkFilePath = new Path(dstPath,
281                   HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
282 
283     // create splits of the link.
284     // <root>/clone/splitA/<cf>/<reftohfilelink>,
285     // <root>/clone/splitB/<cf>/<reftohfilelink>
286     HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
287     HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
288     StoreFile f = new StoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE);
289     f.createReader();
290     Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
291     Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
292     f.closeReader(true);
293     // OK test the thing
294     FSUtils.logFileSystemState(fs, this.testDir, LOG);
295 
296     // There is a case where a file with the hfilelink pattern is actually a daughter
297     // reference to a hfile link.  This code in StoreFile that handles this case.
298 
299     // Try to open store file from link
300     StoreFile hsfA = new StoreFile(this.fs, pathA, testConf, cacheConf,
301       BloomType.NONE);
302 
303     // Now confirm that I can read from the ref to link
304     int count = 1;
305     HFileScanner s = hsfA.createReader().getScanner(false, false);
306     s.seekTo();
307     while (s.next()) {
308       count++;
309     }
310     assertTrue(count > 0); // read some rows here
311 
312     // Try to open store file from link
313     StoreFile hsfB = new StoreFile(this.fs, pathB, testConf, cacheConf,
314       BloomType.NONE);
315 
316     // Now confirm that I can read from the ref to link
317     HFileScanner sB = hsfB.createReader().getScanner(false, false);
318     sB.seekTo();
319 
320     //count++ as seekTo() will advance the scanner
321     count++;
322     while (sB.next()) {
323       count++;
324     }
325 
326     // read the rest of the rows
327     assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
328   }
329 
checkHalfHFile(final HRegionFileSystem regionFs, final StoreFile f)330   private void checkHalfHFile(final HRegionFileSystem regionFs, final StoreFile f)
331       throws IOException {
332     byte [] midkey = f.createReader().midkey();
333     KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);
334     byte [] midRow = midKV.getRow();
335     // Create top split.
336     HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
337         null, midRow);
338     Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
339     // Create bottom split.
340     HRegionInfo bottomHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
341         midRow, null);
342     Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
343     // Make readers on top and bottom.
344     StoreFile.Reader top = new StoreFile(
345       this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
346     StoreFile.Reader bottom = new StoreFile(
347       this.fs, bottomPath, conf, cacheConf, BloomType.NONE).createReader();
348     ByteBuffer previous = null;
349     LOG.info("Midkey: " + midKV.toString());
350     ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midkey);
351     try {
352       // Now make two HalfMapFiles and assert they can read the full backing
353       // file, one from the top and the other from the bottom.
354       // Test bottom half first.
355       // Now test reading from the top.
356       boolean first = true;
357       ByteBuffer key = null;
358       HFileScanner topScanner = top.getScanner(false, false);
359       while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
360              (topScanner.isSeeked() && topScanner.next())) {
361         key = topScanner.getKey();
362 
363         if (topScanner.getReader().getComparator().compareFlatKey(key.array(),
364           key.arrayOffset(), key.limit(), midkey, 0, midkey.length) < 0) {
365           fail("key=" + Bytes.toStringBinary(key) + " < midkey=" +
366               Bytes.toStringBinary(midkey));
367         }
368         if (first) {
369           first = false;
370           LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
371         }
372       }
373       LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));
374 
375       first = true;
376       HFileScanner bottomScanner = bottom.getScanner(false, false);
377       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
378           bottomScanner.next()) {
379         previous = bottomScanner.getKey();
380         key = bottomScanner.getKey();
381         if (first) {
382           first = false;
383           LOG.info("First in bottom: " +
384             Bytes.toString(Bytes.toBytes(previous)));
385         }
386         assertTrue(key.compareTo(bbMidkeyBytes) < 0);
387       }
388       if (previous != null) {
389         LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
390       }
391       // Remove references.
392       regionFs.cleanupDaughterRegion(topHri);
393       regionFs.cleanupDaughterRegion(bottomHri);
394 
395       // Next test using a midkey that does not exist in the file.
396       // First, do a key that is < than first key. Ensure splits behave
397       // properly.
398       byte [] badmidkey = Bytes.toBytes("  .");
399       assertTrue(fs.exists(f.getPath()));
400       topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
401       bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
402 
403       assertNull(bottomPath);
404 
405       top = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
406       // Now read from the top.
407       first = true;
408       topScanner = top.getScanner(false, false);
409       while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
410           topScanner.next()) {
411         key = topScanner.getKey();
412         assertTrue(topScanner.getReader().getComparator().compareFlatKey(key.array(),
413             key.arrayOffset(), key.limit(), badmidkey, 0, badmidkey.length) >= 0);
414         if (first) {
415           first = false;
416           KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
417           LOG.info("First top when key < bottom: " + keyKV);
418           String tmp = Bytes.toString(keyKV.getRow());
419           for (int i = 0; i < tmp.length(); i++) {
420             assertTrue(tmp.charAt(i) == 'a');
421           }
422         }
423       }
424       KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
425       LOG.info("Last top when key < bottom: " + keyKV);
426       String tmp = Bytes.toString(keyKV.getRow());
427       for (int i = 0; i < tmp.length(); i++) {
428         assertTrue(tmp.charAt(i) == 'z');
429       }
430       // Remove references.
431       regionFs.cleanupDaughterRegion(topHri);
432       regionFs.cleanupDaughterRegion(bottomHri);
433 
434       // Test when badkey is > than last key in file ('||' > 'zz').
435       badmidkey = Bytes.toBytes("|||");
436       topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
437       bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
438       assertNull(topPath);
439       bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
440         BloomType.NONE).createReader();
441       first = true;
442       bottomScanner = bottom.getScanner(false, false);
443       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
444           bottomScanner.next()) {
445         key = bottomScanner.getKey();
446         if (first) {
447           first = false;
448           keyKV = KeyValue.createKeyValueFromKey(key);
449           LOG.info("First bottom when key > top: " + keyKV);
450           tmp = Bytes.toString(keyKV.getRow());
451           for (int i = 0; i < tmp.length(); i++) {
452             assertTrue(tmp.charAt(i) == 'a');
453           }
454         }
455       }
456       keyKV = KeyValue.createKeyValueFromKey(key);
457       LOG.info("Last bottom when key > top: " + keyKV);
458       for (int i = 0; i < tmp.length(); i++) {
459         assertTrue(Bytes.toString(keyKV.getRow()).charAt(i) == 'z');
460       }
461     } finally {
462       if (top != null) {
463         top.close(true); // evict since we are about to delete the file
464       }
465       if (bottom != null) {
466         bottom.close(true); // evict since we are about to delete the file
467       }
468       fs.delete(f.getPath(), true);
469     }
470   }
471 
472   private static final String localFormatter = "%010d";
473 
bloomWriteRead(StoreFile.Writer writer, FileSystem fs)474   private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) throws Exception {
475     float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
476     Path f = writer.getPath();
477     long now = System.currentTimeMillis();
478     for (int i = 0; i < 2000; i += 2) {
479       String row = String.format(localFormatter, i);
480       KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
481         "col".getBytes(), now, "value".getBytes());
482       writer.append(kv);
483     }
484     writer.close();
485 
486     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
487     reader.loadFileInfo();
488     reader.loadBloomfilter();
489     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
490 
491     // check false positives rate
492     int falsePos = 0;
493     int falseNeg = 0;
494     for (int i = 0; i < 2000; i++) {
495       String row = String.format(localFormatter, i);
496       TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
497       columns.add("family:col".getBytes());
498 
499       Scan scan = new Scan(row.getBytes(),row.getBytes());
500       scan.addColumn("family".getBytes(), "family:col".getBytes());
501       Store store = mock(Store.class);
502       HColumnDescriptor hcd = mock(HColumnDescriptor.class);
503       when(hcd.getName()).thenReturn(Bytes.toBytes("family"));
504       when(store.getFamily()).thenReturn(hcd);
505       boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
506       if (i % 2 == 0) {
507         if (!exists) falseNeg++;
508       } else {
509         if (exists) falsePos++;
510       }
511     }
512     reader.close(true); // evict because we are about to delete the file
513     fs.delete(f, true);
514     assertEquals("False negatives: " + falseNeg, 0, falseNeg);
515     int maxFalsePos = (int) (2 * 2000 * err);
516     assertTrue("Too many false positives: " + falsePos + " (err=" + err
517         + ", expected no more than " + maxFalsePos + ")",
518         falsePos <= maxFalsePos);
519   }
520 
521   private static final int BLOCKSIZE_SMALL = 8192;
522 
testBloomFilter()523   public void testBloomFilter() throws Exception {
524     FileSystem fs = FileSystem.getLocal(conf);
525     conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
526     conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
527 
528     // write the file
529     Path f = new Path(ROOT_DIR, getName());
530     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
531                         .withChecksumType(CKTYPE)
532                         .withBytesPerCheckSum(CKBYTES).build();
533     // Make a store file and write data to it.
534     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
535             .withFilePath(f)
536             .withBloomType(BloomType.ROW)
537             .withMaxKeyCount(2000)
538             .withFileContext(meta)
539             .build();
540     bloomWriteRead(writer, fs);
541   }
542 
testDeleteFamilyBloomFilter()543   public void testDeleteFamilyBloomFilter() throws Exception {
544     FileSystem fs = FileSystem.getLocal(conf);
545     conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
546     conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
547     float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
548 
549     // write the file
550     Path f = new Path(ROOT_DIR, getName());
551 
552     HFileContext meta = new HFileContextBuilder()
553                         .withBlockSize(BLOCKSIZE_SMALL)
554                         .withChecksumType(CKTYPE)
555                         .withBytesPerCheckSum(CKBYTES).build();
556     // Make a store file and write data to it.
557     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
558             .withFilePath(f)
559             .withMaxKeyCount(2000)
560             .withFileContext(meta)
561             .build();
562 
563     // add delete family
564     long now = System.currentTimeMillis();
565     for (int i = 0; i < 2000; i += 2) {
566       String row = String.format(localFormatter, i);
567       KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
568           "col".getBytes(), now, KeyValue.Type.DeleteFamily, "value".getBytes());
569       writer.append(kv);
570     }
571     writer.close();
572 
573     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
574     reader.loadFileInfo();
575     reader.loadBloomfilter();
576 
577     // check false positives rate
578     int falsePos = 0;
579     int falseNeg = 0;
580     for (int i = 0; i < 2000; i++) {
581       String row = String.format(localFormatter, i);
582       byte[] rowKey = Bytes.toBytes(row);
583       boolean exists = reader.passesDeleteFamilyBloomFilter(rowKey, 0,
584           rowKey.length);
585       if (i % 2 == 0) {
586         if (!exists)
587           falseNeg++;
588       } else {
589         if (exists)
590           falsePos++;
591       }
592     }
593     assertEquals(1000, reader.getDeleteFamilyCnt());
594     reader.close(true); // evict because we are about to delete the file
595     fs.delete(f, true);
596     assertEquals("False negatives: " + falseNeg, 0, falseNeg);
597     int maxFalsePos = (int) (2 * 2000 * err);
598     assertTrue("Too many false positives: " + falsePos + " (err=" + err
599         + ", expected no more than " + maxFalsePos, falsePos <= maxFalsePos);
600   }
601 
602   /**
603    * Test for HBASE-8012
604    */
testReseek()605   public void testReseek() throws Exception {
606     // write the file
607     Path f = new Path(ROOT_DIR, getName());
608     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
609     // Make a store file and write data to it.
610     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
611             .withFilePath(f)
612             .withFileContext(meta)
613             .build();
614 
615     writeStoreFile(writer);
616     writer.close();
617 
618     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
619 
620     // Now do reseek with empty KV to position to the beginning of the file
621 
622     KeyValue k = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
623     StoreFileScanner s = reader.getStoreFileScanner(false, false);
624     s.reseek(k);
625 
626     assertNotNull("Intial reseek should position at the beginning of the file", s.peek());
627   }
628 
testBloomTypes()629   public void testBloomTypes() throws Exception {
630     float err = (float) 0.01;
631     FileSystem fs = FileSystem.getLocal(conf);
632     conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
633     conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
634 
635     int rowCount = 50;
636     int colCount = 10;
637     int versions = 2;
638 
639     // run once using columns and once using rows
640     BloomType[] bt = {BloomType.ROWCOL, BloomType.ROW};
641     int[] expKeys  = {rowCount*colCount, rowCount};
642     // below line deserves commentary.  it is expected bloom false positives
643     //  column = rowCount*2*colCount inserts
644     //  row-level = only rowCount*2 inserts, but failures will be magnified by
645     //              2nd for loop for every column (2*colCount)
646     float[] expErr   = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};
647 
648     for (int x : new int[]{0,1}) {
649       // write the file
650       Path f = new Path(ROOT_DIR, getName() + x);
651       HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
652           .withChecksumType(CKTYPE)
653           .withBytesPerCheckSum(CKBYTES).build();
654       // Make a store file and write data to it.
655       StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
656               .withFilePath(f)
657               .withBloomType(bt[x])
658               .withMaxKeyCount(expKeys[x])
659               .withFileContext(meta)
660               .build();
661 
662       long now = System.currentTimeMillis();
663       for (int i = 0; i < rowCount*2; i += 2) { // rows
664         for (int j = 0; j < colCount*2; j += 2) {   // column qualifiers
665           String row = String.format(localFormatter, i);
666           String col = String.format(localFormatter, j);
667           for (int k= 0; k < versions; ++k) { // versions
668             KeyValue kv = new KeyValue(row.getBytes(),
669               "family".getBytes(), ("col" + col).getBytes(),
670                 now-k, Bytes.toBytes((long)-1));
671             writer.append(kv);
672           }
673         }
674       }
675       writer.close();
676 
677       StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
678       reader.loadFileInfo();
679       reader.loadBloomfilter();
680       StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
681       assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount());
682 
683       Store store = mock(Store.class);
684       HColumnDescriptor hcd = mock(HColumnDescriptor.class);
685       when(hcd.getName()).thenReturn(Bytes.toBytes("family"));
686       when(store.getFamily()).thenReturn(hcd);
687       // check false positives rate
688       int falsePos = 0;
689       int falseNeg = 0;
690       for (int i = 0; i < rowCount*2; ++i) { // rows
691         for (int j = 0; j < colCount*2; ++j) {   // column qualifiers
692           String row = String.format(localFormatter, i);
693           String col = String.format(localFormatter, j);
694           TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
695           columns.add(("col" + col).getBytes());
696 
697           Scan scan = new Scan(row.getBytes(),row.getBytes());
698           scan.addColumn("family".getBytes(), ("col"+col).getBytes());
699           boolean exists =
700               scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
701           boolean shouldRowExist = i % 2 == 0;
702           boolean shouldColExist = j % 2 == 0;
703           shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
704           if (shouldRowExist && shouldColExist) {
705             if (!exists) falseNeg++;
706           } else {
707             if (exists) falsePos++;
708           }
709         }
710       }
711       reader.close(true); // evict because we are about to delete the file
712       fs.delete(f, true);
713       System.out.println(bt[x].toString());
714       System.out.println("  False negatives: " + falseNeg);
715       System.out.println("  False positives: " + falsePos);
716       assertEquals(0, falseNeg);
717       assertTrue(falsePos < 2*expErr[x]);
718     }
719   }
720 
721   public void testSeqIdComparator() {
722     assertOrdering(StoreFile.Comparators.SEQ_ID, mockStoreFile(true, 100, 1000, -1, "/foo/123"),
723         mockStoreFile(true, 100, 1000, -1, "/foo/124"),
724         mockStoreFile(true, 99, 1000, -1, "/foo/126"),
725         mockStoreFile(true, 98, 2000, -1, "/foo/126"), mockStoreFile(false, 3453, -1, 1, "/foo/1"),
726         mockStoreFile(false, 2, -1, 3, "/foo/2"), mockStoreFile(false, 1000, -1, 5, "/foo/2"),
727         mockStoreFile(false, 76, -1, 5, "/foo/3"));
728   }
729 
730   /**
731    * Assert that the given comparator orders the given storefiles in the
732    * same way that they're passed.
733    */
734   private void assertOrdering(Comparator<StoreFile> comparator, StoreFile ... sfs) {
735     ArrayList<StoreFile> sorted = Lists.newArrayList(sfs);
736     Collections.shuffle(sorted);
737     Collections.sort(sorted, comparator);
738     LOG.debug("sfs: " + Joiner.on(",").join(sfs));
739     LOG.debug("sorted: " + Joiner.on(",").join(sorted));
740     assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
741   }
742 
743   /**
744    * Create a mock StoreFile with the given attributes.
745    */
746   private StoreFile mockStoreFile(boolean bulkLoad,
747                                   long size,
748                                   long bulkTimestamp,
749                                   long seqId,
750                                   String path) {
751     StoreFile mock = Mockito.mock(StoreFile.class);
752     StoreFile.Reader reader = Mockito.mock(StoreFile.Reader.class);
753 
754     Mockito.doReturn(size).when(reader).length();
755 
756     Mockito.doReturn(reader).when(mock).getReader();
757     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
758     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
759     Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
760     Mockito.doReturn(new Path(path)).when(mock).getPath();
761     String name = "mock storefile, bulkLoad=" + bulkLoad +
762       " bulkTimestamp=" + bulkTimestamp +
763       " seqId=" + seqId +
764       " path=" + path;
765     Mockito.doReturn(name).when(mock).toString();
766     return mock;
767   }
768 
769   /**
770    * Generate a list of KeyValues for testing based on given parameters
771    * @param timestamps
772    * @param numRows
773    * @param qualifier
774    * @param family
775    * @return
776    */
777   List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
778       byte[] qualifier, byte[] family) {
779     List<KeyValue> kvList = new ArrayList<KeyValue>();
780     for (int i=1;i<=numRows;i++) {
781       byte[] b = Bytes.toBytes(i) ;
782       LOG.info(Bytes.toString(b));
783       LOG.info(Bytes.toString(b));
784       for (long timestamp: timestamps)
785       {
786         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
787       }
788     }
789     return kvList;
790   }
791 
792   /**
793    * Test to ensure correctness when using StoreFile with multiple timestamps
794    * @throws IOException
795    */
796   public void testMultipleTimestamps() throws IOException {
797     byte[] family = Bytes.toBytes("familyname");
798     byte[] qualifier = Bytes.toBytes("qualifier");
799     int numRows = 10;
800     long[] timestamps = new long[] {20,10,5,1};
801     Scan scan = new Scan();
802 
803     // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
804     Path storedir = new Path(new Path(this.testDir, "7e0102"), "familyname");
805     Path dir = new Path(storedir, "1234567890");
806     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
807     // Make a store file and write data to it.
808     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
809             .withOutputDir(dir)
810             .withFileContext(meta)
811             .build();
812 
813     List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
814         qualifier, family);
815 
816     for (KeyValue kv : kvList) {
817       writer.append(kv);
818     }
819     writer.appendMetadata(0, false);
820     writer.close();
821 
822     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
823       BloomType.NONE);
824     Store store = mock(Store.class);
825     HColumnDescriptor hcd = mock(HColumnDescriptor.class);
826     when(hcd.getName()).thenReturn(family);
827     when(store.getFamily()).thenReturn(hcd);
828     StoreFile.Reader reader = hsf.createReader();
829     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
830     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
831     columns.add(qualifier);
832 
833     scan.setTimeRange(20, 100);
834     assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
835 
836     scan.setTimeRange(1, 2);
837     // lets make sure it still works with column family time ranges
838     scan.setColumnFamilyTimeRange(family, 7, 50);
839     assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
840 
841     scan.setTimeRange(8, 10);
842     assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
843 
844     scan.setTimeRange(7, 50);
845     assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
846 
847     // This test relies on the timestamp range optimization
848     scan = new Scan();
849     scan.setTimeRange(27, 50);
850     assertTrue(!scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
851 
852     // should still use the scanner because we override the family time range
853     scan = new Scan();
854     scan.setTimeRange(27, 50);
855     scan.setColumnFamilyTimeRange(family, 7, 50);
856     assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
857 
858   }
859 
860   public void testCacheOnWriteEvictOnClose() throws Exception {
861     Configuration conf = this.conf;
862 
863     // Find a home for our files (regiondir ("7e0102") and familyname).
864     Path baseDir = new Path(new Path(this.testDir, "7e0102"),"twoCOWEOC");
865 
866     // Grab the block cache and get the initial hit/miss counts
867     BlockCache bc = new CacheConfig(conf).getBlockCache();
868     assertNotNull(bc);
869     CacheStats cs = bc.getStats();
870     long startHit = cs.getHitCount();
871     long startMiss = cs.getMissCount();
872     long startEvicted = cs.getEvictedCount();
873 
874     // Let's write a StoreFile with three blocks, with cache on write off
875     conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
876     CacheConfig cacheConf = new CacheConfig(conf);
877     Path pathCowOff = new Path(baseDir, "123456789");
878     StoreFile.Writer writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
879     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
880       BloomType.NONE);
881     LOG.debug(hsf.getPath().toString());
882 
883     // Read this file, we should see 3 misses
884     StoreFile.Reader reader = hsf.createReader();
885     reader.loadFileInfo();
886     StoreFileScanner scanner = reader.getStoreFileScanner(true, true);
887     scanner.seek(KeyValue.LOWESTKEY);
888     while (scanner.next() != null);
889     assertEquals(startHit, cs.getHitCount());
890     assertEquals(startMiss + 3, cs.getMissCount());
891     assertEquals(startEvicted, cs.getEvictedCount());
892     startMiss += 3;
893     scanner.close();
894     reader.close(cacheConf.shouldEvictOnClose());
895 
896     // Now write a StoreFile with three blocks, with cache on write on
897     conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
898     cacheConf = new CacheConfig(conf);
899     Path pathCowOn = new Path(baseDir, "123456788");
900     writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
901     hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
902       BloomType.NONE);
903 
904     // Read this file, we should see 3 hits
905     reader = hsf.createReader();
906     scanner = reader.getStoreFileScanner(true, true);
907     scanner.seek(KeyValue.LOWESTKEY);
908     while (scanner.next() != null);
909     assertEquals(startHit + 3, cs.getHitCount());
910     assertEquals(startMiss, cs.getMissCount());
911     assertEquals(startEvicted, cs.getEvictedCount());
912     startHit += 3;
913     scanner.close();
914     reader.close(cacheConf.shouldEvictOnClose());
915 
916     // Let's read back the two files to ensure the blocks exactly match
917     hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
918       BloomType.NONE);
919     StoreFile.Reader readerOne = hsf.createReader();
920     readerOne.loadFileInfo();
921     StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
922     scannerOne.seek(KeyValue.LOWESTKEY);
923     hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
924       BloomType.NONE);
925     StoreFile.Reader readerTwo = hsf.createReader();
926     readerTwo.loadFileInfo();
927     StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
928     scannerTwo.seek(KeyValue.LOWESTKEY);
929     Cell kv1 = null;
930     Cell kv2 = null;
931     while ((kv1 = scannerOne.next()) != null) {
932       kv2 = scannerTwo.next();
933       assertTrue(kv1.equals(kv2));
934       KeyValue keyv1 = KeyValueUtil.ensureKeyValue(kv1);
935       KeyValue keyv2 = KeyValueUtil.ensureKeyValue(kv2);
936       assertTrue(Bytes.compareTo(
937           keyv1.getBuffer(), keyv1.getKeyOffset(), keyv1.getKeyLength(),
938           keyv2.getBuffer(), keyv2.getKeyOffset(), keyv2.getKeyLength()) == 0);
939       assertTrue(Bytes.compareTo(
940           kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength(),
941           kv2.getValueArray(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
942     }
943     assertNull(scannerTwo.next());
944     assertEquals(startHit + 6, cs.getHitCount());
945     assertEquals(startMiss, cs.getMissCount());
946     assertEquals(startEvicted, cs.getEvictedCount());
947     startHit += 6;
948     scannerOne.close();
949     readerOne.close(cacheConf.shouldEvictOnClose());
950     scannerTwo.close();
951     readerTwo.close(cacheConf.shouldEvictOnClose());
952 
953     // Let's close the first file with evict on close turned on
954     conf.setBoolean("hbase.rs.evictblocksonclose", true);
955     cacheConf = new CacheConfig(conf);
956     hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
957       BloomType.NONE);
958     reader = hsf.createReader();
959     reader.close(cacheConf.shouldEvictOnClose());
960 
961     // We should have 3 new evictions
962     assertEquals(startHit, cs.getHitCount());
963     assertEquals(startMiss, cs.getMissCount());
964     assertEquals(startEvicted + 3, cs.getEvictedCount());
965     startEvicted += 3;
966 
967     // Let's close the second file with evict on close turned off
968     conf.setBoolean("hbase.rs.evictblocksonclose", false);
969     cacheConf = new CacheConfig(conf);
970     hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
971       BloomType.NONE);
972     reader = hsf.createReader();
973     reader.close(cacheConf.shouldEvictOnClose());
974 
975     // We expect no changes
976     assertEquals(startHit, cs.getHitCount());
977     assertEquals(startMiss, cs.getMissCount());
978     assertEquals(startEvicted, cs.getEvictedCount());
979   }
980 
981   private Path splitStoreFile(final HRegionFileSystem regionFs, final HRegionInfo hri,
982       final String family, final StoreFile sf, final byte[] splitKey, boolean isTopRef)
983       throws IOException {
984     FileSystem fs = regionFs.getFileSystem();
985     Path path = regionFs.splitStoreFile(hri, family, sf, splitKey, isTopRef, null);
986     if (null == path) {
987       return null;
988     }
989     Path regionDir = regionFs.commitDaughterRegion(hri);
990     return new Path(new Path(regionDir, family), path.getName());
991   }
992 
993   private StoreFile.Writer writeStoreFile(Configuration conf,
994       CacheConfig cacheConf, Path path, int numBlocks)
995   throws IOException {
996     // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
997     int numKVs = 5 * numBlocks;
998     List<KeyValue> kvs = new ArrayList<KeyValue>(numKVs);
999     byte [] b = Bytes.toBytes("x");
1000     int totalSize = 0;
1001     for (int i=numKVs;i>0;i--) {
1002       KeyValue kv = new KeyValue(b, b, b, i, b);
1003       kvs.add(kv);
1004       // kv has memstoreTS 0, which takes 1 byte to store.
1005       totalSize += kv.getLength() + 1;
1006     }
1007     int blockSize = totalSize / numBlocks;
1008     HFileContext meta = new HFileContextBuilder().withBlockSize(blockSize)
1009                         .withChecksumType(CKTYPE)
1010                         .withBytesPerCheckSum(CKBYTES)
1011                         .build();
1012     // Make a store file and write data to it.
1013     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
1014             .withFilePath(path)
1015             .withMaxKeyCount(2000)
1016             .withFileContext(meta)
1017             .build();
1018     // We'll write N-1 KVs to ensure we don't write an extra block
1019     kvs.remove(kvs.size()-1);
1020     for (KeyValue kv : kvs) {
1021       writer.append(kv);
1022     }
1023     writer.appendMetadata(0, false);
1024     writer.close();
1025     return writer;
1026   }
1027 
1028   /**
1029    * Check if data block encoding information is saved correctly in HFile's
1030    * file info.
1031    */
1032   public void testDataBlockEncodingMetaData() throws IOException {
1033     // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
1034     Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
1035     Path path = new Path(dir, "1234567890");
1036 
1037     DataBlockEncoding dataBlockEncoderAlgo =
1038         DataBlockEncoding.FAST_DIFF;
1039     HFileDataBlockEncoder dataBlockEncoder =
1040         new HFileDataBlockEncoderImpl(
1041             dataBlockEncoderAlgo);
1042     cacheConf = new CacheConfig(conf);
1043     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
1044         .withChecksumType(CKTYPE)
1045         .withBytesPerCheckSum(CKBYTES)
1046         .withDataBlockEncoding(dataBlockEncoderAlgo)
1047         .build();
1048     // Make a store file and write data to it.
1049     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
1050             .withFilePath(path)
1051             .withMaxKeyCount(2000)
1052             .withFileContext(meta)
1053             .build();
1054     writer.close();
1055 
1056     StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,
1057       cacheConf, BloomType.NONE);
1058     StoreFile.Reader reader = storeFile.createReader();
1059 
1060     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1061     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
1062     assertEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
1063   }
1064 }
1065 
1066