1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.regionserver;
20 
21 import java.io.DataInput;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.Map;
30 import java.util.SortedSet;
31 import java.util.UUID;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
43 import org.apache.hadoop.hbase.KeyValue;
44 import org.apache.hadoop.hbase.KeyValue.KVComparator;
45 import org.apache.hadoop.hbase.KeyValueUtil;
46 import org.apache.hadoop.hbase.classification.InterfaceAudience;
47 import org.apache.hadoop.hbase.client.Scan;
48 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
49 import org.apache.hadoop.hbase.io.TimeRange;
50 import org.apache.hadoop.hbase.io.hfile.BlockType;
51 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
52 import org.apache.hadoop.hbase.io.hfile.HFile;
53 import org.apache.hadoop.hbase.io.hfile.HFileContext;
54 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
56 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
57 import org.apache.hadoop.hbase.util.BloomFilter;
58 import org.apache.hadoop.hbase.util.BloomFilterFactory;
59 import org.apache.hadoop.hbase.util.BloomFilterWriter;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.apache.hadoop.hbase.util.Writables;
62 import org.apache.hadoop.io.WritableUtils;
63 
64 import com.google.common.base.Function;
65 import com.google.common.base.Preconditions;
66 import com.google.common.collect.ImmutableList;
67 import com.google.common.collect.Ordering;
68 
69 /**
70  * A Store data file.  Stores usually have one or more of these files.  They
71  * are produced by flushing the memstore to disk.  To
72  * create, instantiate a writer using {@link StoreFile.WriterBuilder}
73  * and append data. Be sure to add any metadata before calling close on the
74  * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
75  * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
76  * passing filesystem and path.  To read, call {@link #createReader()}.
77  * <p>StoreFiles may also reference store files in another Store.
78  *
79  * The reason for this weird pattern where you use a different instance for the
80  * writer and a reader is that we write once but read a lot more.
81  */
82 @InterfaceAudience.LimitedPrivate("Coprocessor")
83 public class StoreFile {
84   private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
85 
86   // Keys for fileinfo values in HFile
87 
88   /** Max Sequence ID in FileInfo */
89   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
90 
91   /** Major compaction flag in FileInfo */
92   public static final byte[] MAJOR_COMPACTION_KEY =
93       Bytes.toBytes("MAJOR_COMPACTION_KEY");
94 
95   /** Minor compaction flag in FileInfo */
96   public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
97       Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
98 
99   /** Bloom filter Type in FileInfo */
100   public static final byte[] BLOOM_FILTER_TYPE_KEY =
101       Bytes.toBytes("BLOOM_FILTER_TYPE");
102 
103   /** Delete Family Count in FileInfo */
104   public static final byte[] DELETE_FAMILY_COUNT =
105       Bytes.toBytes("DELETE_FAMILY_COUNT");
106 
107   /** Last Bloom filter key in FileInfo */
108   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
109 
110   /** Key for Timerange information in metadata*/
111   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
112 
113   /** Key for timestamp of earliest-put in metadata*/
114   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
115 
116   private final StoreFileInfo fileInfo;
117   private final FileSystem fs;
118 
119   // Block cache configuration and reference.
120   private final CacheConfig cacheConf;
121 
122   // Keys for metadata stored in backing HFile.
123   // Set when we obtain a Reader.
124   private long sequenceid = -1;
125 
126   // max of the MemstoreTS in the KV's in this store
127   // Set when we obtain a Reader.
128   private long maxMemstoreTS = -1;
129 
130   // firstKey, lastkey and cellComparator will be set when openReader.
131   private byte[] firstKey;
132 
133   private byte[] lastKey;
134 
135   private KVComparator comparator;
136 
getCacheConf()137   CacheConfig getCacheConf() {
138     return cacheConf;
139   }
140 
getFirstKey()141   public byte[] getFirstKey() {
142     return firstKey;
143   }
144 
getLastKey()145   public byte[] getLastKey() {
146     return lastKey;
147   }
148 
getComparator()149   public KVComparator getComparator() {
150     return comparator;
151   }
152 
getMaxMemstoreTS()153   public long getMaxMemstoreTS() {
154     return maxMemstoreTS;
155   }
156 
setMaxMemstoreTS(long maxMemstoreTS)157   public void setMaxMemstoreTS(long maxMemstoreTS) {
158     this.maxMemstoreTS = maxMemstoreTS;
159   }
160 
161   // If true, this file was product of a major compaction.  Its then set
162   // whenever you get a Reader.
163   private AtomicBoolean majorCompaction = null;
164 
165   // If true, this file should not be included in minor compactions.
166   // It's set whenever you get a Reader.
167   private boolean excludeFromMinorCompaction = false;
168 
169   /** Meta key set when store file is a result of a bulk load */
170   public static final byte[] BULKLOAD_TASK_KEY =
171     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
172   public static final byte[] BULKLOAD_TIME_KEY =
173     Bytes.toBytes("BULKLOAD_TIMESTAMP");
174 
175   /**
176    * Map of the metadata entries in the corresponding HFile
177    */
178   private Map<byte[], byte[]> metadataMap;
179 
180   // StoreFile.Reader
181   private volatile Reader reader;
182 
183   /**
184    * Bloom filter type specified in column family configuration. Does not
185    * necessarily correspond to the Bloom filter type present in the HFile.
186    */
187   private final BloomType cfBloomType;
188 
189   /**
190    * Constructor, loads a reader and it's indices, etc. May allocate a
191    * substantial amount of ram depending on the underlying files (10-20MB?).
192    *
193    * @param fs  The current file system to use.
194    * @param p  The path of the file.
195    * @param conf  The current configuration.
196    * @param cacheConf  The cache configuration and block cache reference.
197    * @param cfBloomType The bloom type to use for this store file as specified
198    *          by column family configuration. This may or may not be the same
199    *          as the Bloom filter type actually present in the HFile, because
200    *          column family configuration might change. If this is
201    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
202    * @throws IOException When opening the reader fails.
203    */
StoreFile(final FileSystem fs, final Path p, final Configuration conf, final CacheConfig cacheConf, final BloomType cfBloomType)204   public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
205         final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
206     this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
207   }
208 
209 
210   /**
211    * Constructor, loads a reader and it's indices, etc. May allocate a
212    * substantial amount of ram depending on the underlying files (10-20MB?).
213    *
214    * @param fs  The current file system to use.
215    * @param fileInfo  The store file information.
216    * @param conf  The current configuration.
217    * @param cacheConf  The cache configuration and block cache reference.
218    * @param cfBloomType The bloom type to use for this store file as specified
219    *          by column family configuration. This may or may not be the same
220    *          as the Bloom filter type actually present in the HFile, because
221    *          column family configuration might change. If this is
222    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
223    * @throws IOException When opening the reader fails.
224    */
StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf, final CacheConfig cacheConf, final BloomType cfBloomType)225   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
226       final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
227     this.fs = fs;
228     this.fileInfo = fileInfo;
229     this.cacheConf = cacheConf;
230 
231     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
232       this.cfBloomType = cfBloomType;
233     } else {
234       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
235           "cfBloomType=" + cfBloomType + " (disabled in config)");
236       this.cfBloomType = BloomType.NONE;
237     }
238   }
239 
240   /**
241    * Clone
242    * @param other The StoreFile to clone from
243    */
StoreFile(final StoreFile other)244   public StoreFile(final StoreFile other) {
245     this.fs = other.fs;
246     this.fileInfo = other.fileInfo;
247     this.cacheConf = other.cacheConf;
248     this.cfBloomType = other.cfBloomType;
249   }
250 
251   /**
252    * @return the StoreFile object associated to this StoreFile.
253    *         null if the StoreFile is not a reference.
254    */
getFileInfo()255   public StoreFileInfo getFileInfo() {
256     return this.fileInfo;
257   }
258 
259   /**
260    * @return Path or null if this StoreFile was made with a Stream.
261    */
getPath()262   public Path getPath() {
263     return this.fileInfo.getPath();
264   }
265 
266   /**
267    * @return Returns the qualified path of this StoreFile
268    */
getQualifiedPath()269   public Path getQualifiedPath() {
270     return this.fileInfo.getPath().makeQualified(fs);
271   }
272 
273   /**
274    * @return True if this is a StoreFile Reference; call
275    * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
276    */
isReference()277   public boolean isReference() {
278     return this.fileInfo.isReference();
279   }
280 
281   /**
282    * @return True if this file was made by a major compaction.
283    */
isMajorCompaction()284   public boolean isMajorCompaction() {
285     if (this.majorCompaction == null) {
286       throw new NullPointerException("This has not been set yet");
287     }
288     return this.majorCompaction.get();
289   }
290 
291   /**
292    * @return True if this file should not be part of a minor compaction.
293    */
excludeFromMinorCompaction()294   public boolean excludeFromMinorCompaction() {
295     return this.excludeFromMinorCompaction;
296   }
297 
298   /**
299    * @return This files maximum edit sequence id.
300    */
getMaxSequenceId()301   public long getMaxSequenceId() {
302     return this.sequenceid;
303   }
304 
getModificationTimeStamp()305   public long getModificationTimeStamp() throws IOException {
306     return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
307   }
308 
309   /**
310    * Only used by the Striped Compaction Policy
311    * @param key
312    * @return value associated with the metadata key
313    */
getMetadataValue(byte[] key)314   public byte[] getMetadataValue(byte[] key) {
315     return metadataMap.get(key);
316   }
317 
318   /**
319    * Return the largest memstoreTS found across all storefiles in
320    * the given list. Store files that were created by a mapreduce
321    * bulk load are ignored, as they do not correspond to any specific
322    * put operation, and thus do not have a memstoreTS associated with them.
323    * @return 0 if no non-bulk-load files are provided or, this is Store that
324    * does not yet have any store files.
325    */
getMaxMemstoreTSInList(Collection<StoreFile> sfs)326   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
327     long max = 0;
328     for (StoreFile sf : sfs) {
329       if (!sf.isBulkLoadResult()) {
330         max = Math.max(max, sf.getMaxMemstoreTS());
331       }
332     }
333     return max;
334   }
335 
336   /**
337    * Return the highest sequence ID found across all storefiles in
338    * the given list.
339    * @param sfs
340    * @return 0 if no non-bulk-load files are provided or, this is Store that
341    * does not yet have any store files.
342    */
getMaxSequenceIdInList(Collection<StoreFile> sfs)343   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
344     long max = 0;
345     for (StoreFile sf : sfs) {
346       max = Math.max(max, sf.getMaxSequenceId());
347     }
348     return max;
349   }
350 
351   /**
352    * Check if this storefile was created by bulk load.
353    * When a hfile is bulk loaded into HBase, we append
354    * '_SeqId_<id-when-loaded>' to the hfile name, unless
355    * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
356    * explicitly turned off.
357    * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
358    * is turned off, fall back to BULKLOAD_TIME_KEY.
359    * @return true if this storefile was created by bulk load.
360    */
isBulkLoadResult()361   boolean isBulkLoadResult() {
362     boolean bulkLoadedHFile = false;
363     String fileName = this.getPath().getName();
364     int startPos = fileName.indexOf("SeqId_");
365     if (startPos != -1) {
366       bulkLoadedHFile = true;
367     }
368     return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
369   }
370 
371   /**
372    * Return the timestamp at which this bulk load file was generated.
373    */
getBulkLoadTimestamp()374   public long getBulkLoadTimestamp() {
375     byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
376     return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
377   }
378 
379   /**
380    * @return the cached value of HDFS blocks distribution. The cached value is
381    * calculated when store file is opened.
382    */
getHDFSBlockDistribution()383   public HDFSBlocksDistribution getHDFSBlockDistribution() {
384     return this.fileInfo.getHDFSBlockDistribution();
385   }
386 
387   /**
388    * Opens reader on this store file.  Called by Constructor.
389    * @return Reader for the store file.
390    * @throws IOException
391    * @see #closeReader(boolean)
392    */
open(boolean canUseDropBehind)393   private Reader open(boolean canUseDropBehind) throws IOException {
394     if (this.reader != null) {
395       throw new IllegalAccessError("Already open");
396     }
397 
398     // Open the StoreFile.Reader
399     this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
400 
401     // Load up indices and fileinfo. This also loads Bloom filter type.
402     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
403 
404     // Read in our metadata.
405     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
406     if (b != null) {
407       // By convention, if halfhfile, top half has a sequence number > bottom
408       // half. Thats why we add one in below. Its done for case the two halves
409       // are ever merged back together --rare.  Without it, on open of store,
410       // since store files are distinguished by sequence id, the one half would
411       // subsume the other.
412       this.sequenceid = Bytes.toLong(b);
413       if (fileInfo.isTopReference()) {
414         this.sequenceid += 1;
415       }
416     }
417 
418     if (isBulkLoadResult()){
419       // generate the sequenceId from the fileName
420       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
421       String fileName = this.getPath().getName();
422       // Use lastIndexOf() to get the last, most recent bulk load seqId.
423       int startPos = fileName.lastIndexOf("SeqId_");
424       if (startPos != -1) {
425         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
426             fileName.indexOf('_', startPos + 6)));
427         // Handle reference files as done above.
428         if (fileInfo.isTopReference()) {
429           this.sequenceid += 1;
430         }
431       }
432       this.reader.setBulkLoaded(true);
433     }
434     this.reader.setSequenceID(this.sequenceid);
435 
436     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
437     if (b != null) {
438       this.maxMemstoreTS = Bytes.toLong(b);
439     }
440 
441     b = metadataMap.get(MAJOR_COMPACTION_KEY);
442     if (b != null) {
443       boolean mc = Bytes.toBoolean(b);
444       if (this.majorCompaction == null) {
445         this.majorCompaction = new AtomicBoolean(mc);
446       } else {
447         this.majorCompaction.set(mc);
448       }
449     } else {
450       // Presume it is not major compacted if it doesn't explicity say so
451       // HFileOutputFormat explicitly sets the major compacted key.
452       this.majorCompaction = new AtomicBoolean(false);
453     }
454 
455     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
456     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
457 
458     BloomType hfileBloomType = reader.getBloomFilterType();
459     if (cfBloomType != BloomType.NONE) {
460       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
461       if (hfileBloomType != cfBloomType) {
462         LOG.info("HFile Bloom filter type for "
463             + reader.getHFileReader().getName() + ": " + hfileBloomType
464             + ", but " + cfBloomType + " specified in column family "
465             + "configuration");
466       }
467     } else if (hfileBloomType != BloomType.NONE) {
468       LOG.info("Bloom filter turned off by CF config for "
469           + reader.getHFileReader().getName());
470     }
471 
472     // load delete family bloom filter
473     reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
474 
475     try {
476       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
477       if (timerangeBytes != null) {
478         this.reader.timeRangeTracker = new TimeRangeTracker();
479         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
480       }
481     } catch (IllegalArgumentException e) {
482       LOG.error("Error reading timestamp range data from meta -- " +
483           "proceeding without", e);
484       this.reader.timeRangeTracker = null;
485     }
486     // initialize so we can reuse them after reader closed.
487     firstKey = reader.getFirstKey();
488     lastKey = reader.getLastKey();
489     comparator = reader.getComparator();
490     return this.reader;
491   }
492 
createReader()493   public Reader createReader() throws IOException {
494     return createReader(false);
495   }
496 
497   /**
498    * @return Reader for StoreFile. creates if necessary
499    * @throws IOException
500    */
createReader(boolean canUseDropBehind)501   public Reader createReader(boolean canUseDropBehind) throws IOException {
502     if (this.reader == null) {
503       try {
504         this.reader = open(canUseDropBehind);
505       } catch (IOException e) {
506         try {
507           boolean evictOnClose =
508               cacheConf != null? cacheConf.shouldEvictOnClose(): true;
509           this.closeReader(evictOnClose);
510         } catch (IOException ee) {
511         }
512         throw e;
513       }
514 
515     }
516     return this.reader;
517   }
518 
519   /**
520    * @return Current reader.  Must call createReader first else returns null.
521    * @see #createReader()
522    */
getReader()523   public Reader getReader() {
524     return this.reader;
525   }
526 
527   /**
528    * @param evictOnClose whether to evict blocks belonging to this file
529    * @throws IOException
530    */
closeReader(boolean evictOnClose)531   public synchronized void closeReader(boolean evictOnClose)
532       throws IOException {
533     if (this.reader != null) {
534       this.reader.close(evictOnClose);
535       this.reader = null;
536     }
537   }
538 
539   /**
540    * Delete this file
541    * @throws IOException
542    */
deleteReader()543   public void deleteReader() throws IOException {
544     boolean evictOnClose =
545         cacheConf != null? cacheConf.shouldEvictOnClose(): true;
546     closeReader(evictOnClose);
547     this.fs.delete(getPath(), true);
548   }
549 
550   @Override
toString()551   public String toString() {
552     return this.fileInfo.toString();
553   }
554 
555   /**
556    * @return a length description of this StoreFile, suitable for debug output
557    */
toStringDetailed()558   public String toStringDetailed() {
559     StringBuilder sb = new StringBuilder();
560     sb.append(this.getPath().toString());
561     sb.append(", isReference=").append(isReference());
562     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
563     if (isBulkLoadResult()) {
564       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
565     } else {
566       sb.append(", seqid=").append(getMaxSequenceId());
567     }
568     sb.append(", majorCompaction=").append(isMajorCompaction());
569 
570     return sb.toString();
571   }
572 
573   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
574       justification="Will not overflow")
575   public static class WriterBuilder {
576     private final Configuration conf;
577     private final CacheConfig cacheConf;
578     private final FileSystem fs;
579 
580     private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
581     private BloomType bloomType = BloomType.NONE;
582     private long maxKeyCount = 0;
583     private Path dir;
584     private Path filePath;
585     private InetSocketAddress[] favoredNodes;
586     private HFileContext fileContext;
587 
WriterBuilder(Configuration conf, CacheConfig cacheConf, FileSystem fs)588     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
589         FileSystem fs) {
590       this.conf = conf;
591       this.cacheConf = cacheConf;
592       this.fs = fs;
593     }
594 
595     /**
596      * Use either this method or {@link #withFilePath}, but not both.
597      * @param dir Path to column family directory. The directory is created if
598      *          does not exist. The file is given a unique name within this
599      *          directory.
600      * @return this (for chained invocation)
601      */
withOutputDir(Path dir)602     public WriterBuilder withOutputDir(Path dir) {
603       Preconditions.checkNotNull(dir);
604       this.dir = dir;
605       return this;
606     }
607 
608     /**
609      * Use either this method or {@link #withOutputDir}, but not both.
610      * @param filePath the StoreFile path to write
611      * @return this (for chained invocation)
612      */
withFilePath(Path filePath)613     public WriterBuilder withFilePath(Path filePath) {
614       Preconditions.checkNotNull(filePath);
615       this.filePath = filePath;
616       return this;
617     }
618 
619     /**
620      * @param favoredNodes an array of favored nodes or possibly null
621      * @return this (for chained invocation)
622      */
withFavoredNodes(InetSocketAddress[] favoredNodes)623     public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
624       this.favoredNodes = favoredNodes;
625       return this;
626     }
627 
withComparator(KeyValue.KVComparator comparator)628     public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
629       Preconditions.checkNotNull(comparator);
630       this.comparator = comparator;
631       return this;
632     }
633 
withBloomType(BloomType bloomType)634     public WriterBuilder withBloomType(BloomType bloomType) {
635       Preconditions.checkNotNull(bloomType);
636       this.bloomType = bloomType;
637       return this;
638     }
639 
640     /**
641      * @param maxKeyCount estimated maximum number of keys we expect to add
642      * @return this (for chained invocation)
643      */
withMaxKeyCount(long maxKeyCount)644     public WriterBuilder withMaxKeyCount(long maxKeyCount) {
645       this.maxKeyCount = maxKeyCount;
646       return this;
647     }
648 
withFileContext(HFileContext fileContext)649     public WriterBuilder withFileContext(HFileContext fileContext) {
650       this.fileContext = fileContext;
651       return this;
652     }
653 
withShouldDropCacheBehind(boolean shouldDropCacheBehind )654     public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind/*NOT USED!!*/) {
655       // TODO: HAS NO EFFECT!!! FIX!!
656       return this;
657     }
658     /**
659      * Create a store file writer. Client is responsible for closing file when
660      * done. If metadata, add BEFORE closing using
661      * {@link Writer#appendMetadata}.
662      */
build()663     public Writer build() throws IOException {
664       if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
665         throw new IllegalArgumentException("Either specify parent directory " +
666             "or file path");
667       }
668 
669       if (dir == null) {
670         dir = filePath.getParent();
671       }
672 
673       if (!fs.exists(dir)) {
674         fs.mkdirs(dir);
675       }
676 
677       if (filePath == null) {
678         filePath = getUniqueFile(fs, dir);
679         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
680           bloomType = BloomType.NONE;
681         }
682       }
683 
684       if (comparator == null) {
685         comparator = KeyValue.COMPARATOR;
686       }
687       return new Writer(fs, filePath,
688           conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
689     }
690   }
691 
692   /**
693    * @param fs
694    * @param dir Directory to create file in.
695    * @return random filename inside passed <code>dir</code>
696    */
getUniqueFile(final FileSystem fs, final Path dir)697   public static Path getUniqueFile(final FileSystem fs, final Path dir)
698       throws IOException {
699     if (!fs.getFileStatus(dir).isDirectory()) {
700       throw new IOException("Expecting " + dir.toString() +
701         " to be a directory");
702     }
703     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
704   }
705 
getMinimumTimestamp()706   public Long getMinimumTimestamp() {
707     return (getReader().timeRangeTracker == null) ?
708         null :
709         getReader().timeRangeTracker.getMinimumTimestamp();
710   }
711 
712   /**
713    * Gets the approximate mid-point of this file that is optimal for use in splitting it.
714    * @param comparator Comparator used to compare KVs.
715    * @return The split point row, or null if splitting is not possible, or reader is null.
716    */
717   @SuppressWarnings("deprecation")
getFileSplitPoint(KVComparator comparator)718   byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
719     if (this.reader == null) {
720       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
721       return null;
722     }
723     // Get first, last, and mid keys.  Midkey is the key that starts block
724     // in middle of hfile.  Has column and timestamp.  Need to return just
725     // the row we want to split on as midkey.
726     byte [] midkey = this.reader.midkey();
727     if (midkey != null) {
728       KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
729       byte [] fk = this.reader.getFirstKey();
730       KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
731       byte [] lk = this.reader.getLastKey();
732       KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
733       // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
734       if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
735         if (LOG.isDebugEnabled()) {
736           LOG.debug("cannot split because midkey is the same as first or last row");
737         }
738         return null;
739       }
740       return mk.getRow();
741     }
742     return null;
743   }
744 
745   /**
746    * A StoreFile writer.  Use this to read/write HBase Store Files. It is package
747    * local because it is an implementation detail of the HBase regionserver.
748    */
749   public static class Writer implements Compactor.CellSink {
750     private final BloomFilterWriter generalBloomFilterWriter;
751     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
752     private final BloomType bloomType;
753     private byte[] lastBloomKey;
754     private int lastBloomKeyOffset, lastBloomKeyLen;
755     private KVComparator kvComparator;
756     private Cell lastCell = null;
757     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
758     private Cell lastDeleteFamilyCell = null;
759     private long deleteFamilyCnt = 0;
760 
761     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
762     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
763      * When flushing a memstore, we set TimeRange and use this variable to
764      * indicate that it doesn't need to be calculated again while
765      * appending KeyValues.
766      * It is not set in cases of compactions when it is recalculated using only
767      * the appended KeyValues*/
768     boolean isTimeRangeTrackerSet = false;
769 
770     protected HFile.Writer writer;
771 
772     /**
773      * Creates an HFile.Writer that also write helpful meta data.
774      * @param fs file system to write to
775      * @param path file name to create
776      * @param conf user configuration
777      * @param comparator key comparator
778      * @param bloomType bloom filter setting
779      * @param maxKeys the expected maximum number of keys to be added. Was used
780      *        for Bloom filter size in {@link HFile} format version 1.
781      * @param favoredNodes
782      * @param fileContext - The HFile context
783      * @throws IOException problem writing to FS
784      */
Writer(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf, final KVComparator comparator, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext)785     private Writer(FileSystem fs, Path path,
786         final Configuration conf,
787         CacheConfig cacheConf,
788         final KVComparator comparator, BloomType bloomType, long maxKeys,
789         InetSocketAddress[] favoredNodes, HFileContext fileContext)
790             throws IOException {
791       writer = HFile.getWriterFactory(conf, cacheConf)
792           .withPath(fs, path)
793           .withComparator(comparator)
794           .withFavoredNodes(favoredNodes)
795           .withFileContext(fileContext)
796           .create();
797 
798       this.kvComparator = comparator;
799 
800       generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
801           conf, cacheConf, bloomType,
802           (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
803 
804       if (generalBloomFilterWriter != null) {
805         this.bloomType = bloomType;
806         if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " +
807           this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName());
808       } else {
809         // Not using Bloom filters.
810         this.bloomType = BloomType.NONE;
811       }
812 
813       // initialize delete family Bloom filter when there is NO RowCol Bloom
814       // filter
815       if (this.bloomType != BloomType.ROWCOL) {
816         this.deleteFamilyBloomFilterWriter = BloomFilterFactory
817             .createDeleteBloomAtWrite(conf, cacheConf,
818                 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
819       } else {
820         deleteFamilyBloomFilterWriter = null;
821       }
822       if (deleteFamilyBloomFilterWriter != null) {
823         if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
824             + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
825       }
826     }
827 
828     /**
829      * Writes meta data.
830      * Call before {@link #close()} since its written as meta data to this file.
831      * @param maxSequenceId Maximum sequence id.
832      * @param majorCompaction True if this file is product of a major compaction
833      * @throws IOException problem writing to FS
834      */
appendMetadata(final long maxSequenceId, final boolean majorCompaction)835     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
836     throws IOException {
837       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
838       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
839           Bytes.toBytes(majorCompaction));
840       appendTrackedTimestampsToMetadata();
841     }
842 
843     /**
844      * Add TimestampRange and earliest put timestamp to Metadata
845      */
appendTrackedTimestampsToMetadata()846     public void appendTrackedTimestampsToMetadata() throws IOException {
847       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
848       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
849     }
850 
851     /**
852      * Set TimeRangeTracker
853      * @param trt
854      */
setTimeRangeTracker(final TimeRangeTracker trt)855     public void setTimeRangeTracker(final TimeRangeTracker trt) {
856       this.timeRangeTracker = trt;
857       isTimeRangeTrackerSet = true;
858     }
859 
860     /**
861      * Record the earlest Put timestamp.
862      *
863      * If the timeRangeTracker is not set,
864      * update TimeRangeTracker to include the timestamp of this key
865      * @param cell
866      */
trackTimestamps(final Cell cell)867     public void trackTimestamps(final Cell cell) {
868       if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
869         earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
870       }
871       if (!isTimeRangeTrackerSet) {
872         timeRangeTracker.includeTimestamp(cell);
873       }
874     }
875 
appendGeneralBloomfilter(final Cell cell)876     private void appendGeneralBloomfilter(final Cell cell) throws IOException {
877       if (this.generalBloomFilterWriter != null) {
878         // only add to the bloom filter on a new, unique key
879         boolean newKey = true;
880         if (this.lastCell != null) {
881           switch(bloomType) {
882           case ROW:
883             newKey = ! kvComparator.matchingRows(cell, lastCell);
884             break;
885           case ROWCOL:
886             newKey = ! kvComparator.matchingRowColumn(cell, lastCell);
887             break;
888           case NONE:
889             newKey = false;
890             break;
891           default:
892             throw new IOException("Invalid Bloom filter type: " + bloomType +
893                 " (ROW or ROWCOL expected)");
894           }
895         }
896         if (newKey) {
897           /*
898            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
899            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
900            *
901            * 2 Types of Filtering:
902            *  1. Row = Row
903            *  2. RowCol = Row + Qualifier
904            */
905           byte[] bloomKey;
906           int bloomKeyOffset, bloomKeyLen;
907 
908           switch (bloomType) {
909           case ROW:
910             bloomKey = cell.getRowArray();
911             bloomKeyOffset = cell.getRowOffset();
912             bloomKeyLen = cell.getRowLength();
913             break;
914           case ROWCOL:
915             // merge(row, qualifier)
916             // TODO: could save one buffer copy in case of compound Bloom
917             // filters when this involves creating a KeyValue
918             bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(),
919                 cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(),
920                 cell.getQualifierOffset(), cell.getQualifierLength());
921             bloomKeyOffset = 0;
922             bloomKeyLen = bloomKey.length;
923             break;
924           default:
925             throw new IOException("Invalid Bloom filter type: " + bloomType +
926                 " (ROW or ROWCOL expected)");
927           }
928           generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
929           if (lastBloomKey != null
930               && generalBloomFilterWriter.getComparator().compareFlatKey(bloomKey,
931                   bloomKeyOffset, bloomKeyLen, lastBloomKey,
932                   lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
933             throw new IOException("Non-increasing Bloom keys: "
934                 + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
935                 + " after "
936                 + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
937                     lastBloomKeyLen));
938           }
939           lastBloomKey = bloomKey;
940           lastBloomKeyOffset = bloomKeyOffset;
941           lastBloomKeyLen = bloomKeyLen;
942           this.lastCell = cell;
943         }
944       }
945     }
946 
appendDeleteFamilyBloomFilter(final Cell cell)947     private void appendDeleteFamilyBloomFilter(final Cell cell)
948         throws IOException {
949       if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
950         return;
951       }
952 
953       // increase the number of delete family in the store file
954       deleteFamilyCnt++;
955       if (null != this.deleteFamilyBloomFilterWriter) {
956         boolean newKey = true;
957         if (lastDeleteFamilyCell != null) {
958           newKey = !kvComparator.matchingRows(cell, lastDeleteFamilyCell);
959         }
960         if (newKey) {
961           this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
962               cell.getRowOffset(), cell.getRowLength());
963           this.lastDeleteFamilyCell = cell;
964         }
965       }
966     }
967 
append(final Cell cell)968     public void append(final Cell cell) throws IOException {
969       appendGeneralBloomfilter(cell);
970       appendDeleteFamilyBloomFilter(cell);
971       writer.append(cell);
972       trackTimestamps(cell);
973     }
974 
getPath()975     public Path getPath() {
976       return this.writer.getPath();
977     }
978 
hasGeneralBloom()979     boolean hasGeneralBloom() {
980       return this.generalBloomFilterWriter != null;
981     }
982 
983     /**
984      * For unit testing only.
985      *
986      * @return the Bloom filter used by this writer.
987      */
getGeneralBloomWriter()988     BloomFilterWriter getGeneralBloomWriter() {
989       return generalBloomFilterWriter;
990     }
991 
closeBloomFilter(BloomFilterWriter bfw)992     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
993       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
994       if (haveBloom) {
995         bfw.compactBloom();
996       }
997       return haveBloom;
998     }
999 
closeGeneralBloomFilter()1000     private boolean closeGeneralBloomFilter() throws IOException {
1001       boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
1002 
1003       // add the general Bloom filter writer and append file info
1004       if (hasGeneralBloom) {
1005         writer.addGeneralBloomFilter(generalBloomFilterWriter);
1006         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
1007             Bytes.toBytes(bloomType.toString()));
1008         if (lastBloomKey != null) {
1009           writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
1010               lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
1011                   + lastBloomKeyLen));
1012         }
1013       }
1014       return hasGeneralBloom;
1015     }
1016 
closeDeleteFamilyBloomFilter()1017     private boolean closeDeleteFamilyBloomFilter() throws IOException {
1018       boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
1019 
1020       // add the delete family Bloom filter writer
1021       if (hasDeleteFamilyBloom) {
1022         writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
1023       }
1024 
1025       // append file info about the number of delete family kvs
1026       // even if there is no delete family Bloom.
1027       writer.appendFileInfo(DELETE_FAMILY_COUNT,
1028           Bytes.toBytes(this.deleteFamilyCnt));
1029 
1030       return hasDeleteFamilyBloom;
1031     }
1032 
close()1033     public void close() throws IOException {
1034       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
1035       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
1036 
1037       writer.close();
1038 
1039       // Log final Bloom filter statistics. This needs to be done after close()
1040       // because compound Bloom filters might be finalized as part of closing.
1041       if (StoreFile.LOG.isTraceEnabled()) {
1042         StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
1043           (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
1044           getPath());
1045       }
1046 
1047     }
1048 
appendFileInfo(byte[] key, byte[] value)1049     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1050       writer.appendFileInfo(key, value);
1051     }
1052 
1053     /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
1054      */
getHFileWriter()1055     HFile.Writer getHFileWriter() {
1056       return writer;
1057     }
1058   }
1059 
1060   /**
1061    * Reader for a StoreFile.
1062    */
1063   public static class Reader {
1064     private static final Log LOG = LogFactory.getLog(Reader.class.getName());
1065 
1066     protected BloomFilter generalBloomFilter = null;
1067     protected BloomFilter deleteFamilyBloomFilter = null;
1068     protected BloomType bloomFilterType;
1069     private final HFile.Reader reader;
1070     protected TimeRangeTracker timeRangeTracker = null;
1071     protected long sequenceID = -1;
1072     private byte[] lastBloomKey;
1073     private long deleteFamilyCnt = -1;
1074     private boolean bulkLoadResult = false;
1075 
Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)1076     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
1077         throws IOException {
1078       reader = HFile.createReader(fs, path, cacheConf, conf);
1079       bloomFilterType = BloomType.NONE;
1080     }
1081 
Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Configuration conf)1082     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
1083         CacheConfig cacheConf, Configuration conf) throws IOException {
1084       reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
1085       bloomFilterType = BloomType.NONE;
1086     }
1087 
setReplicaStoreFile(boolean isPrimaryReplicaStoreFile)1088     public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
1089       reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
1090     }
isPrimaryReplicaReader()1091     public boolean isPrimaryReplicaReader() {
1092       return reader.isPrimaryReplicaReader();
1093     }
1094 
1095     /**
1096      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
1097      */
Reader()1098     Reader() {
1099       this.reader = null;
1100     }
1101 
getComparator()1102     public KVComparator getComparator() {
1103       return reader.getComparator();
1104     }
1105 
1106     /**
1107      * Get a scanner to scan over this StoreFile. Do not use
1108      * this overload if using this scanner for compactions.
1109      *
1110      * @param cacheBlocks should this scanner cache blocks?
1111      * @param pread use pread (for highly concurrent small readers)
1112      * @return a scanner
1113      */
getStoreFileScanner(boolean cacheBlocks, boolean pread)1114     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1115                                                boolean pread) {
1116       return getStoreFileScanner(cacheBlocks, pread, false,
1117         // 0 is passed as readpoint because this method is only used by test
1118         // where StoreFile is directly operated upon
1119         0);
1120     }
1121 
1122     /**
1123      * Get a scanner to scan over this StoreFile.
1124      *
1125      * @param cacheBlocks should this scanner cache blocks?
1126      * @param pread use pread (for highly concurrent small readers)
1127      * @param isCompaction is scanner being used for compaction?
1128      * @return a scanner
1129      */
getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt)1130     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1131                                                boolean pread,
1132                                                boolean isCompaction, long readPt) {
1133       return new StoreFileScanner(this,
1134                                  getScanner(cacheBlocks, pread, isCompaction),
1135                                  !isCompaction, reader.hasMVCCInfo(), readPt);
1136     }
1137 
1138     /**
1139      * Warning: Do not write further code which depends on this call. Instead
1140      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1141      * which is the preferred way to scan a store with higher level concepts.
1142      *
1143      * @param cacheBlocks should we cache the blocks?
1144      * @param pread use pread (for concurrent small readers)
1145      * @return the underlying HFileScanner
1146      */
1147     @Deprecated
getScanner(boolean cacheBlocks, boolean pread)1148     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1149       return getScanner(cacheBlocks, pread, false);
1150     }
1151 
1152     /**
1153      * Warning: Do not write further code which depends on this call. Instead
1154      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1155      * which is the preferred way to scan a store with higher level concepts.
1156      *
1157      * @param cacheBlocks
1158      *          should we cache the blocks?
1159      * @param pread
1160      *          use pread (for concurrent small readers)
1161      * @param isCompaction
1162      *          is scanner being used for compaction?
1163      * @return the underlying HFileScanner
1164      */
1165     @Deprecated
getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction)1166     public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1167         boolean isCompaction) {
1168       return reader.getScanner(cacheBlocks, pread, isCompaction);
1169     }
1170 
close(boolean evictOnClose)1171     public void close(boolean evictOnClose) throws IOException {
1172       reader.close(evictOnClose);
1173     }
1174 
1175     /**
1176      * Check if this storeFile may contain keys within the TimeRange that
1177      * have not expired (i.e. not older than oldestUnexpiredTS).
1178      * @param timeRange the timeRange to restrict
1179      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
1180      *          determined by the column family's TTL
1181      * @return false if queried keys definitely don't exist in this StoreFile
1182      */
passesTimerangeFilter(TimeRange timeRange, long oldestUnexpiredTS)1183     boolean passesTimerangeFilter(TimeRange timeRange, long oldestUnexpiredTS) {
1184       if (timeRangeTracker == null) {
1185         return true;
1186       } else {
1187         return timeRangeTracker.includesTimeRange(timeRange) &&
1188             timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1189       }
1190     }
1191 
1192     /**
1193      * Checks whether the given scan passes the Bloom filter (if present). Only
1194      * checks Bloom filters for single-row or single-row-column scans. Bloom
1195      * filter checking for multi-gets is implemented as part of the store
1196      * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
1197      * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
1198      * int, int)}.
1199      *
1200      * @param scan the scan specification. Used to determine the row, and to
1201      *          check whether this is a single-row ("get") scan.
1202      * @param columns the set of columns. Only used for row-column Bloom
1203      *          filters.
1204      * @return true if the scan with the given column set passes the Bloom
1205      *         filter, or if the Bloom filter is not applicable for the scan.
1206      *         False if the Bloom filter is applicable and the scan fails it.
1207      */
passesBloomFilter(Scan scan, final SortedSet<byte[]> columns)1208      boolean passesBloomFilter(Scan scan,
1209         final SortedSet<byte[]> columns) {
1210       // Multi-column non-get scans will use Bloom filters through the
1211       // lower-level API function that this function calls.
1212       if (!scan.isGetScan()) {
1213         return true;
1214       }
1215 
1216       byte[] row = scan.getStartRow();
1217       switch (this.bloomFilterType) {
1218         case ROW:
1219           return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);
1220 
1221         case ROWCOL:
1222           if (columns != null && columns.size() == 1) {
1223             byte[] column = columns.first();
1224             return passesGeneralBloomFilter(row, 0, row.length, column, 0,
1225                 column.length);
1226           }
1227 
1228           // For multi-column queries the Bloom filter is checked from the
1229           // seekExact operation.
1230           return true;
1231 
1232         default:
1233           return true;
1234       }
1235     }
1236 
passesDeleteFamilyBloomFilter(byte[] row, int rowOffset, int rowLen)1237     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1238         int rowLen) {
1239       // Cache Bloom filter as a local variable in case it is set to null by
1240       // another thread on an IO error.
1241       BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1242 
1243       // Empty file or there is no delete family at all
1244       if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1245         return false;
1246       }
1247 
1248       if (bloomFilter == null) {
1249         return true;
1250       }
1251 
1252       try {
1253         if (!bloomFilter.supportsAutoLoading()) {
1254           return true;
1255         }
1256         return bloomFilter.contains(row, rowOffset, rowLen, null);
1257       } catch (IllegalArgumentException e) {
1258         LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1259             e);
1260         setDeleteFamilyBloomFilterFaulty();
1261       }
1262 
1263       return true;
1264     }
1265 
1266     /**
1267      * A method for checking Bloom filters. Called directly from
1268      * StoreFileScanner in case of a multi-column query.
1269      *
1270      * @param row
1271      * @param rowOffset
1272      * @param rowLen
1273      * @param col
1274      * @param colOffset
1275      * @param colLen
1276      * @return True if passes
1277      */
passesGeneralBloomFilter(byte[] row, int rowOffset, int rowLen, byte[] col, int colOffset, int colLen)1278     public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
1279         int rowLen, byte[] col, int colOffset, int colLen) {
1280       // Cache Bloom filter as a local variable in case it is set to null by
1281       // another thread on an IO error.
1282       BloomFilter bloomFilter = this.generalBloomFilter;
1283       if (bloomFilter == null) {
1284         return true;
1285       }
1286 
1287       byte[] key;
1288       switch (bloomFilterType) {
1289         case ROW:
1290           if (col != null) {
1291             throw new RuntimeException("Row-only Bloom filter called with " +
1292                 "column specified");
1293           }
1294           if (rowOffset != 0 || rowLen != row.length) {
1295               throw new AssertionError("For row-only Bloom filters the row "
1296                   + "must occupy the whole array");
1297           }
1298           key = row;
1299           break;
1300 
1301         case ROWCOL:
1302           key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
1303               colOffset, colLen);
1304 
1305           break;
1306 
1307         default:
1308           return true;
1309       }
1310 
1311       // Empty file
1312       if (reader.getTrailer().getEntryCount() == 0)
1313         return false;
1314 
1315       try {
1316         boolean shouldCheckBloom;
1317         ByteBuffer bloom;
1318         if (bloomFilter.supportsAutoLoading()) {
1319           bloom = null;
1320           shouldCheckBloom = true;
1321         } else {
1322           bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
1323               true);
1324           shouldCheckBloom = bloom != null;
1325         }
1326 
1327         if (shouldCheckBloom) {
1328           boolean exists;
1329 
1330           // Whether the primary Bloom key is greater than the last Bloom key
1331           // from the file info. For row-column Bloom filters this is not yet
1332           // a sufficient condition to return false.
1333           boolean keyIsAfterLast = lastBloomKey != null
1334               && bloomFilter.getComparator().compareFlatKey(key, lastBloomKey) > 0;
1335 
1336           if (bloomFilterType == BloomType.ROWCOL) {
1337             // Since a Row Delete is essentially a DeleteFamily applied to all
1338             // columns, a file might be skipped if using row+col Bloom filter.
1339             // In order to ensure this file is included an additional check is
1340             // required looking only for a row bloom.
1341             byte[] rowBloomKey = bloomFilter.createBloomKey(row, rowOffset, rowLen,
1342                 null, 0, 0);
1343 
1344             if (keyIsAfterLast
1345                 && bloomFilter.getComparator().compareFlatKey(rowBloomKey,
1346                     lastBloomKey) > 0) {
1347               exists = false;
1348             } else {
1349               exists =
1350                   bloomFilter.contains(key, 0, key.length, bloom) ||
1351                   bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
1352                       bloom);
1353             }
1354           } else {
1355             exists = !keyIsAfterLast
1356                 && bloomFilter.contains(key, 0, key.length, bloom);
1357           }
1358 
1359           return exists;
1360         }
1361       } catch (IOException e) {
1362         LOG.error("Error reading bloom filter data -- proceeding without",
1363             e);
1364         setGeneralBloomFilterFaulty();
1365       } catch (IllegalArgumentException e) {
1366         LOG.error("Bad bloom filter data -- proceeding without", e);
1367         setGeneralBloomFilterFaulty();
1368       }
1369 
1370       return true;
1371     }
1372 
1373     /**
1374      * Checks whether the given scan rowkey range overlaps with the current storefile's
1375      * @param scan the scan specification. Used to determine the rowkey range.
1376      * @return true if there is overlap, false otherwise
1377      */
passesKeyRangeFilter(Scan scan)1378     public boolean passesKeyRangeFilter(Scan scan) {
1379       if (this.getFirstKey() == null || this.getLastKey() == null) {
1380         // the file is empty
1381         return false;
1382       }
1383       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1384           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1385         return true;
1386       }
1387       KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValueUtil
1388           .createFirstOnRow(scan.getStopRow()) : KeyValueUtil.createFirstOnRow(scan
1389           .getStartRow());
1390       KeyValue largestScanKeyValue = scan.isReversed() ? KeyValueUtil
1391           .createLastOnRow(scan.getStartRow()) : KeyValueUtil.createLastOnRow(scan
1392           .getStopRow());
1393       boolean nonOverLapping = (getComparator().compareFlatKey(
1394           this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
1395           .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
1396               HConstants.EMPTY_END_ROW))
1397           || getComparator().compareFlatKey(this.getLastKey(),
1398               smallestScanKeyValue.getKey()) < 0;
1399       return !nonOverLapping;
1400     }
1401 
1402     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1403       Map<byte [], byte []> fi = reader.loadFileInfo();
1404 
1405       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1406       if (b != null) {
1407         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1408       }
1409 
1410       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1411       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1412       if (cnt != null) {
1413         deleteFamilyCnt = Bytes.toLong(cnt);
1414       }
1415 
1416       return fi;
1417     }
1418 
1419     public void loadBloomfilter() {
1420       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1421       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1422     }
1423 
1424     private void loadBloomfilter(BlockType blockType) {
1425       try {
1426         if (blockType == BlockType.GENERAL_BLOOM_META) {
1427           if (this.generalBloomFilter != null)
1428             return; // Bloom has been loaded
1429 
1430           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1431           if (bloomMeta != null) {
1432             // sanity check for NONE Bloom filter
1433             if (bloomFilterType == BloomType.NONE) {
1434               throw new IOException(
1435                   "valid bloom filter type not found in FileInfo");
1436             } else {
1437               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1438                   reader);
1439               if (LOG.isTraceEnabled()) {
1440                 LOG.trace("Loaded " + bloomFilterType.toString() + " "
1441                   + generalBloomFilter.getClass().getSimpleName()
1442                   + " metadata for " + reader.getName());
1443               }
1444             }
1445           }
1446         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1447           if (this.deleteFamilyBloomFilter != null)
1448             return; // Bloom has been loaded
1449 
1450           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1451           if (bloomMeta != null) {
1452             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1453                 bloomMeta, reader);
1454             LOG.info("Loaded Delete Family Bloom ("
1455                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1456                 + ") metadata for " + reader.getName());
1457           }
1458         } else {
1459           throw new RuntimeException("Block Type: " + blockType.toString()
1460               + "is not supported for Bloom filter");
1461         }
1462       } catch (IOException e) {
1463         LOG.error("Error reading bloom filter meta for " + blockType
1464             + " -- proceeding without", e);
1465         setBloomFilterFaulty(blockType);
1466       } catch (IllegalArgumentException e) {
1467         LOG.error("Bad bloom filter meta " + blockType
1468             + " -- proceeding without", e);
1469         setBloomFilterFaulty(blockType);
1470       }
1471     }
1472 
1473     private void setBloomFilterFaulty(BlockType blockType) {
1474       if (blockType == BlockType.GENERAL_BLOOM_META) {
1475         setGeneralBloomFilterFaulty();
1476       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1477         setDeleteFamilyBloomFilterFaulty();
1478       }
1479     }
1480 
1481     /**
1482      * The number of Bloom filter entries in this store file, or an estimate
1483      * thereof, if the Bloom filter is not loaded. This always returns an upper
1484      * bound of the number of Bloom filter entries.
1485      *
1486      * @return an estimate of the number of Bloom filter entries in this file
1487      */
1488     public long getFilterEntries() {
1489       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1490           : reader.getEntries();
1491     }
1492 
1493     public void setGeneralBloomFilterFaulty() {
1494       generalBloomFilter = null;
1495     }
1496 
1497     public void setDeleteFamilyBloomFilterFaulty() {
1498       this.deleteFamilyBloomFilter = null;
1499     }
1500 
1501     public byte[] getLastKey() {
1502       return reader.getLastKey();
1503     }
1504 
1505     public byte[] getLastRowKey() {
1506       return reader.getLastRowKey();
1507     }
1508 
1509     public byte[] midkey() throws IOException {
1510       return reader.midkey();
1511     }
1512 
1513     public long length() {
1514       return reader.length();
1515     }
1516 
1517     public long getTotalUncompressedBytes() {
1518       return reader.getTrailer().getTotalUncompressedBytes();
1519     }
1520 
1521     public long getEntries() {
1522       return reader.getEntries();
1523     }
1524 
1525     public long getDeleteFamilyCnt() {
1526       return deleteFamilyCnt;
1527     }
1528 
1529     public byte[] getFirstKey() {
1530       return reader.getFirstKey();
1531     }
1532 
1533     public long indexSize() {
1534       return reader.indexSize();
1535     }
1536 
1537     public BloomType getBloomFilterType() {
1538       return this.bloomFilterType;
1539     }
1540 
1541     public long getSequenceID() {
1542       return sequenceID;
1543     }
1544 
1545     public void setSequenceID(long sequenceID) {
1546       this.sequenceID = sequenceID;
1547     }
1548 
1549     public void setBulkLoaded(boolean bulkLoadResult) {
1550       this.bulkLoadResult = bulkLoadResult;
1551     }
1552 
1553     public boolean isBulkLoaded() {
1554       return this.bulkLoadResult;
1555     }
1556 
1557     BloomFilter getGeneralBloomFilter() {
1558       return generalBloomFilter;
1559     }
1560 
1561     long getUncompressedDataIndexSize() {
1562       return reader.getTrailer().getUncompressedDataIndexSize();
1563     }
1564 
1565     public long getTotalBloomSize() {
1566       if (generalBloomFilter == null)
1567         return 0;
1568       return generalBloomFilter.getByteSize();
1569     }
1570 
1571     public int getHFileVersion() {
1572       return reader.getTrailer().getMajorVersion();
1573     }
1574 
1575     public int getHFileMinorVersion() {
1576       return reader.getTrailer().getMinorVersion();
1577     }
1578 
1579     public HFile.Reader getHFileReader() {
1580       return reader;
1581     }
1582 
1583     void disableBloomFilterForTesting() {
1584       generalBloomFilter = null;
1585       this.deleteFamilyBloomFilter = null;
1586     }
1587 
1588     public long getMaxTimestamp() {
1589       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1590     }
1591   }
1592 
1593   /**
1594    * Useful comparators for comparing StoreFiles.
1595    */
1596   public abstract static class Comparators {
1597     /**
1598      * Comparator that compares based on the Sequence Ids of the
1599      * the StoreFiles. Bulk loads that did not request a seq ID
1600      * are given a seq id of -1; thus, they are placed before all non-
1601      * bulk loads, and bulk loads with sequence Id. Among these files,
1602      * the size is used to determine the ordering, then bulkLoadTime.
1603      * If there are ties, the path name is used as a tie-breaker.
1604      */
1605     public static final Comparator<StoreFile> SEQ_ID =
1606       Ordering.compound(ImmutableList.of(
1607           Ordering.natural().onResultOf(new GetSeqId()),
1608           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1609           Ordering.natural().onResultOf(new GetBulkTime()),
1610           Ordering.natural().onResultOf(new GetPathName())
1611       ));
1612 
1613     private static class GetSeqId implements Function<StoreFile, Long> {
1614       @Override
1615       public Long apply(StoreFile sf) {
1616         return sf.getMaxSequenceId();
1617       }
1618     }
1619 
1620     private static class GetFileSize implements Function<StoreFile, Long> {
1621       @Override
1622       public Long apply(StoreFile sf) {
1623         return sf.getReader().length();
1624       }
1625     }
1626 
1627     private static class GetBulkTime implements Function<StoreFile, Long> {
1628       @Override
1629       public Long apply(StoreFile sf) {
1630         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1631         return sf.getBulkLoadTimestamp();
1632       }
1633     }
1634 
1635     private static class GetPathName implements Function<StoreFile, String> {
1636       @Override
1637       public String apply(StoreFile sf) {
1638         return sf.getPath().getName();
1639       }
1640     }
1641   }
1642 }
1643