1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/block_based/block_based_table_builder.h"
11 
12 #include <assert.h>
13 #include <stdio.h>
14 #include <list>
15 #include <map>
16 #include <memory>
17 #include <string>
18 #include <unordered_map>
19 #include <utility>
20 
21 #include "db/dbformat.h"
22 #include "index_builder.h"
23 
24 #include "rocksdb/cache.h"
25 #include "rocksdb/comparator.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/flush_block_policy.h"
28 #include "rocksdb/merge_operator.h"
29 #include "rocksdb/table.h"
30 
31 #include "table/block_based/block.h"
32 #include "table/block_based/block_based_filter_block.h"
33 #include "table/block_based/block_based_table_factory.h"
34 #include "table/block_based/block_based_table_reader.h"
35 #include "table/block_based/block_builder.h"
36 #include "table/block_based/filter_block.h"
37 #include "table/block_based/filter_policy_internal.h"
38 #include "table/block_based/full_filter_block.h"
39 #include "table/block_based/partitioned_filter_block.h"
40 #include "table/format.h"
41 #include "table/table_builder.h"
42 
43 #include "memory/memory_allocator.h"
44 #include "util/coding.h"
45 #include "util/compression.h"
46 #include "util/crc32c.h"
47 #include "util/stop_watch.h"
48 #include "util/string_util.h"
49 #include "util/xxhash.h"
50 
51 namespace ROCKSDB_NAMESPACE {
52 
53 extern const std::string kHashIndexPrefixesBlock;
54 extern const std::string kHashIndexPrefixesMetadataBlock;
55 
56 typedef BlockBasedTableOptions::IndexType IndexType;
57 
58 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
59 namespace {
60 
61 // Create a filter block builder based on its type.
CreateFilterBlockBuilder(const ImmutableCFOptions &,const MutableCFOptions & mopt,const FilterBuildingContext & context,const bool use_delta_encoding_for_index_values,PartitionedIndexBuilder * const p_index_builder)62 FilterBlockBuilder* CreateFilterBlockBuilder(
63     const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
64     const FilterBuildingContext& context,
65     const bool use_delta_encoding_for_index_values,
66     PartitionedIndexBuilder* const p_index_builder) {
67   const BlockBasedTableOptions& table_opt = context.table_options;
68   if (table_opt.filter_policy == nullptr) return nullptr;
69 
70   FilterBitsBuilder* filter_bits_builder =
71       BloomFilterPolicy::GetBuilderFromContext(context);
72   if (filter_bits_builder == nullptr) {
73     return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
74                                             table_opt);
75   } else {
76     if (table_opt.partition_filters) {
77       assert(p_index_builder != nullptr);
78       // Since after partition cut request from filter builder it takes time
79       // until index builder actully cuts the partition, we take the lower bound
80       // as partition size.
81       assert(table_opt.block_size_deviation <= 100);
82       auto partition_size =
83           static_cast<uint32_t>(((table_opt.metadata_block_size *
84                                   (100 - table_opt.block_size_deviation)) +
85                                  99) /
86                                 100);
87       partition_size = std::max(partition_size, static_cast<uint32_t>(1));
88       return new PartitionedFilterBlockBuilder(
89           mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
90           filter_bits_builder, table_opt.index_block_restart_interval,
91           use_delta_encoding_for_index_values, p_index_builder, partition_size);
92     } else {
93       return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
94                                         table_opt.whole_key_filtering,
95                                         filter_bits_builder);
96     }
97   }
98 }
99 
// Heuristic: keeping the compressed form is only worthwhile when it saves
// at least 1/8 (12.5%) of the raw block size.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t max_acceptable_size = raw_size - (raw_size / 8u);
  return compressed_size < max_acceptable_size;
}
104 
CompressBlockInternal(const Slice & raw,const CompressionInfo & compression_info,uint32_t format_version,std::string * compressed_output)105 bool CompressBlockInternal(const Slice& raw,
106                            const CompressionInfo& compression_info,
107                            uint32_t format_version,
108                            std::string* compressed_output) {
109   // Will return compressed block contents if (1) the compression method is
110   // supported in this platform and (2) the compression rate is "good enough".
111   switch (compression_info.type()) {
112     case kSnappyCompression:
113       return Snappy_Compress(compression_info, raw.data(), raw.size(),
114                              compressed_output);
115     case kZlibCompression:
116       return Zlib_Compress(
117           compression_info,
118           GetCompressFormatForVersion(kZlibCompression, format_version),
119           raw.data(), raw.size(), compressed_output);
120     case kBZip2Compression:
121       return BZip2_Compress(
122           compression_info,
123           GetCompressFormatForVersion(kBZip2Compression, format_version),
124           raw.data(), raw.size(), compressed_output);
125     case kLZ4Compression:
126       return LZ4_Compress(
127           compression_info,
128           GetCompressFormatForVersion(kLZ4Compression, format_version),
129           raw.data(), raw.size(), compressed_output);
130     case kLZ4HCCompression:
131       return LZ4HC_Compress(
132           compression_info,
133           GetCompressFormatForVersion(kLZ4HCCompression, format_version),
134           raw.data(), raw.size(), compressed_output);
135     case kXpressCompression:
136       return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
137     case kZSTD:
138     case kZSTDNotFinalCompression:
139       return ZSTD_Compress(compression_info, raw.data(), raw.size(),
140                            compressed_output);
141     default:
142       // Do not recognize this compression type
143       return false;
144   }
145 }
146 
147 }  // namespace
148 
149 // format_version is the block format as defined in include/rocksdb/table.h
CompressBlock(const Slice & raw,const CompressionInfo & info,CompressionType * type,uint32_t format_version,bool do_sample,std::string * compressed_output,std::string * sampled_output_fast,std::string * sampled_output_slow)150 Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
151                     CompressionType* type, uint32_t format_version,
152                     bool do_sample, std::string* compressed_output,
153                     std::string* sampled_output_fast,
154                     std::string* sampled_output_slow) {
155   *type = info.type();
156 
157   if (info.type() == kNoCompression && !info.SampleForCompression()) {
158     return raw;
159   }
160 
161   // If requested, we sample one in every N block with a
162   // fast and slow compression algorithm and report the stats.
163   // The users can use these stats to decide if it is worthwhile
164   // enabling compression and they also get a hint about which
165   // compression algorithm wil be beneficial.
166   if (do_sample && info.SampleForCompression() &&
167       Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
168       sampled_output_fast && sampled_output_slow) {
169     // Sampling with a fast compression algorithm
170     if (LZ4_Supported() || Snappy_Supported()) {
171       CompressionType c =
172           LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
173       CompressionContext context(c);
174       CompressionOptions options;
175       CompressionInfo info_tmp(options, context,
176                                CompressionDict::GetEmptyDict(), c,
177                                info.SampleForCompression());
178 
179       CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
180     }
181 
182     // Sampling with a slow but high-compression algorithm
183     if (ZSTD_Supported() || Zlib_Supported()) {
184       CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
185       CompressionContext context(c);
186       CompressionOptions options;
187       CompressionInfo info_tmp(options, context,
188                                CompressionDict::GetEmptyDict(), c,
189                                info.SampleForCompression());
190       CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
191     }
192   }
193 
194   // Actually compress the data
195   if (*type != kNoCompression) {
196     if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
197         GoodCompressionRatio(compressed_output->size(), raw.size())) {
198       return *compressed_output;
199     }
200   }
201 
202   // Compression method is not supported, or not good
203   // compression ratio, so just fall back to uncompressed form.
204   *type = kNoCompression;
205   return raw;
206 }
207 
208 // kBlockBasedTableMagicNumber was picked by running
209 //    echo rocksdb.table.block_based | sha1sum
210 // and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files. For that reason we declare it extern in the header, but to get
// the storage allocated it must be defined (non-extern) in exactly one
// translation unit, which is this one.
216 const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
217 // We also support reading and writing legacy block based table format (for
218 // backwards compatibility)
219 const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
220 
221 // A collector that collects properties of interest to block-based table.
222 // For now this class looks heavy-weight since we only write one additional
223 // property.
224 // But in the foreseeable future, we will add more and more properties that are
225 // specific to block-based table.
226 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
227     : public IntTblPropCollector {
228  public:
BlockBasedTablePropertiesCollector(BlockBasedTableOptions::IndexType index_type,bool whole_key_filtering,bool prefix_filtering)229   explicit BlockBasedTablePropertiesCollector(
230       BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
231       bool prefix_filtering)
232       : index_type_(index_type),
233         whole_key_filtering_(whole_key_filtering),
234         prefix_filtering_(prefix_filtering) {}
235 
InternalAdd(const Slice &,const Slice &,uint64_t)236   Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
237                      uint64_t /*file_size*/) override {
238     // Intentionally left blank. Have no interest in collecting stats for
239     // individual key/value pairs.
240     return Status::OK();
241   }
242 
BlockAdd(uint64_t,uint64_t,uint64_t)243   virtual void BlockAdd(uint64_t /* blockRawBytes */,
244                         uint64_t /* blockCompressedBytesFast */,
245                         uint64_t /* blockCompressedBytesSlow */) override {
246     // Intentionally left blank. No interest in collecting stats for
247     // blocks.
248     return;
249   }
250 
Finish(UserCollectedProperties * properties)251   Status Finish(UserCollectedProperties* properties) override {
252     std::string val;
253     PutFixed32(&val, static_cast<uint32_t>(index_type_));
254     properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
255     properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
256                         whole_key_filtering_ ? kPropTrue : kPropFalse});
257     properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
258                         prefix_filtering_ ? kPropTrue : kPropFalse});
259     return Status::OK();
260   }
261 
262   // The name of the properties collector can be used for debugging purpose.
Name() const263   const char* Name() const override {
264     return "BlockBasedTablePropertiesCollector";
265   }
266 
GetReadableProperties() const267   UserCollectedProperties GetReadableProperties() const override {
268     // Intentionally left blank.
269     return UserCollectedProperties();
270   }
271 
272  private:
273   BlockBasedTableOptions::IndexType index_type_;
274   bool whole_key_filtering_;
275   bool prefix_filtering_;
276 };
277 
// All construction-time options and mutable build state of the table builder
// (PImpl). Created in the BlockBasedTableBuilder constructor and deleted in
// its destructor.
struct BlockBasedTableBuilder::Rep {
  const ImmutableCFOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  // Bytes written to `file` so far; doubles as the offset of the next block.
  uint64_t offset = 0;
  // First error encountered while building, if any.
  Status status;
  // Non-zero only when table_options.block_align is set (see initializer).
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks and keys to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary before
  // compressing any data blocks.
  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
  // blocks as it's redundant, but it's easier to implement for now.
  std::vector<std::pair<std::string, std::vector<std::string>>>
      data_block_and_keys_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  // Non-null only for the two-level (partitioned) index; aliases the object
  // owned by `index_builder`.
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  // Most recently added key; used for index entries and ordering assertions.
  std::string last_key;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  CompressionOptions compression_opts;
  std::unique_ptr<CompressionDict> compression_dict;
  CompressionContext compression_ctx;
  // Set only when table_options.verify_compression is true (see constructor).
  std::unique_ptr<UncompressionContext> verify_ctx;
  std::unique_ptr<UncompressionDict> verify_dict;

  // Total size of data blocks buffered so far while in the kBuffered state.
  size_t data_begin_offset = 0;

  TableProperties props;

  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks are
  //   accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  //   state.
  //
  // - `kUnbuffered`: This is the state when compression dictionary is finalized
  //   either because it wasn't enabled in the first place or it's been created
  //   from sampling previously buffered data. In this state, blocks are simply
  //   compressed/written out as they fill up. From this state, call `Finish()`
  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  //   the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;

  const bool use_delta_encoding_for_index_values;
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  size_t compressed_cache_key_prefix_size;

  BlockHandle pending_handle;  // Handle to add to index block

  // Scratch buffer reused for each block's compressed output.
  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  int level_at_creation;
  uint32_t column_family_id;
  const std::string& column_family_name;
  uint64_t creation_time = 0;
  uint64_t oldest_key_time = 0;
  const uint64_t target_file_size;
  uint64_t file_creation_time = 0;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
      const BlockBasedTableOptions& table_opt,
      const InternalKeyComparator& icomparator,
      const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
          int_tbl_prop_collector_factories,
      uint32_t _column_family_id, WritableFileWriter* f,
      const CompressionType _compression_type,
      const uint64_t _sample_for_compression,
      const CompressionOptions& _compression_opts, const bool skip_filters,
      const int _level_at_creation, const std::string& _column_family_name,
      const uint64_t _creation_time, const uint64_t _oldest_key_time,
      const uint64_t _target_file_size, const uint64_t _file_creation_time)
      : ioptions(_ioptions),
        moptions(_moptions),
        table_options(table_opt),
        internal_comparator(icomparator),
        file(f),
        alignment(table_options.block_align
                      ? std::min(table_options.block_size, kDefaultPageSize)
                      : 0),
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   icomparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(_moptions.prefix_extractor.get()),
        compression_type(_compression_type),
        sample_for_compression(_sample_for_compression),
        compression_opts(_compression_opts),
        compression_dict(),
        compression_ctx(_compression_type),
        verify_dict(),
        state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                     : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        compressed_cache_key_prefix_size(0),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        level_at_creation(_level_at_creation),
        column_family_id(_column_family_id),
        column_family_name(_column_family_name),
        creation_time(_creation_time),
        oldest_key_time(_oldest_key_time),
        target_file_size(_target_file_size),
        file_creation_time(_file_creation_time) {
    // Choose the partitioned (two-level) or single-level index builder.
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (skip_filters) {
      filter_builder = nullptr;
    } else {
      FilterBuildingContext context(table_options);
      context.column_family_name = column_family_name;
      context.compaction_style = ioptions.compaction_style;
      context.level_at_creation = level_at_creation;
      context.info_log = ioptions.info_log;
      filter_builder.reset(CreateFilterBlockBuilder(
          ioptions, moptions, context, use_delta_encoding_for_index_values,
          p_index_builder_));
    }

    // Instantiate user-supplied collectors plus the built-in block-based
    // table property collector.
    for (auto& collector_factories : *int_tbl_prop_collector_factories) {
      table_properties_collectors.emplace_back(
          collector_factories->CreateIntTblPropCollector(column_family_id));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            _moptions.prefix_extractor != nullptr));
    if (table_options.verify_compression) {
      verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
                                                compression_type));
    }
  }

  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  ~Rep() {}
};
453 
BlockBasedTableBuilder(const ImmutableCFOptions & ioptions,const MutableCFOptions & moptions,const BlockBasedTableOptions & table_options,const InternalKeyComparator & internal_comparator,const std::vector<std::unique_ptr<IntTblPropCollectorFactory>> * int_tbl_prop_collector_factories,uint32_t column_family_id,WritableFileWriter * file,const CompressionType compression_type,const uint64_t sample_for_compression,const CompressionOptions & compression_opts,const bool skip_filters,const std::string & column_family_name,const int level_at_creation,const uint64_t creation_time,const uint64_t oldest_key_time,const uint64_t target_file_size,const uint64_t file_creation_time)454 BlockBasedTableBuilder::BlockBasedTableBuilder(
455     const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
456     const BlockBasedTableOptions& table_options,
457     const InternalKeyComparator& internal_comparator,
458     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
459         int_tbl_prop_collector_factories,
460     uint32_t column_family_id, WritableFileWriter* file,
461     const CompressionType compression_type,
462     const uint64_t sample_for_compression,
463     const CompressionOptions& compression_opts, const bool skip_filters,
464     const std::string& column_family_name, const int level_at_creation,
465     const uint64_t creation_time, const uint64_t oldest_key_time,
466     const uint64_t target_file_size, const uint64_t file_creation_time) {
467   BlockBasedTableOptions sanitized_table_options(table_options);
468   if (sanitized_table_options.format_version == 0 &&
469       sanitized_table_options.checksum != kCRC32c) {
470     ROCKS_LOG_WARN(
471         ioptions.info_log,
472         "Silently converting format_version to 1 because checksum is "
473         "non-default");
474     // silently convert format_version to 1 to keep consistent with current
475     // behavior
476     sanitized_table_options.format_version = 1;
477   }
478 
479   rep_ = new Rep(ioptions, moptions, sanitized_table_options,
480                  internal_comparator, int_tbl_prop_collector_factories,
481                  column_family_id, file, compression_type,
482                  sample_for_compression, compression_opts, skip_filters,
483                  level_at_creation, column_family_name, creation_time,
484                  oldest_key_time, target_file_size, file_creation_time);
485 
486   if (rep_->filter_builder != nullptr) {
487     rep_->filter_builder->StartBlock(0);
488   }
489   if (table_options.block_cache_compressed.get() != nullptr) {
490     BlockBasedTable::GenerateCachePrefix(
491         table_options.block_cache_compressed.get(), file->writable_file(),
492         &rep_->compressed_cache_key_prefix[0],
493         &rep_->compressed_cache_key_prefix_size);
494   }
495 }
496 
// Destructor: releases the Rep. The builder must already be closed via
// Finish() or Abandon(); destroying an open builder is a caller bug.
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  // Catch errors where caller forgot to call Finish()
  assert(rep_->state == Rep::State::kClosed);
  delete rep_;
}
502 
// Adds one internal key/value to the table. Point entries must arrive in
// strictly increasing internal-key order; range deletions are routed to a
// dedicated block. Also updates table properties and notifies collectors.
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    // Enforce key ordering for point entries; range deletions are exempt
    // since they live in a separate block.
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      Flush();

      // While buffering blocks for a compression dictionary, leave the
      // buffered state once enough data has accumulated.
      if (r->state == Rep::State::kBuffered &&
          r->data_begin_offset > r->target_file_size) {
        EnterUnbuffered();
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block.  This allows us to use shorter
      // keys in the index block.  For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who".  We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
      size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
      r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffer keys to be replayed during `Finish()` once compression
      // dictionary has been finalized.
      if (r->data_block_and_keys_buffers.empty() || should_flush) {
        r->data_block_and_keys_buffers.emplace_back();
      }
      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
    } else {
      r->index_builder->OnKeyAdded(key);
    }
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);
  } else {
    // Unknown value type: indicates a bug in the caller.
    assert(false);
  }

  // Update per-table statistics.
  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}
582 
Flush()583 void BlockBasedTableBuilder::Flush() {
584   Rep* r = rep_;
585   assert(rep_->state != Rep::State::kClosed);
586   if (!ok()) return;
587   if (r->data_block.empty()) return;
588   WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
589 }
590 
WriteBlock(BlockBuilder * block,BlockHandle * handle,bool is_data_block)591 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
592                                         BlockHandle* handle,
593                                         bool is_data_block) {
594   WriteBlock(block->Finish(), handle, is_data_block);
595   block->Reset();
596 }
597 
// Compresses (and optionally verifies) `raw_block_contents`, then writes the
// block to the file via WriteRawBlock. In the kBuffered state the raw block
// is stashed instead of written. On return, `*handle` identifies the block.
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;

  auto type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  Slice block_contents;
  bool abort_compression = false;

  // Times the compression below when detailed stats reporting is enabled.
  StopWatchNano timer(
      r->ioptions.env,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

  // In the buffered state, just record the uncompressed block for later
  // replay (after the compression dictionary is finalized).
  if (r->state == Rep::State::kBuffered) {
    assert(is_data_block);
    assert(!r->data_block_and_keys_buffers.empty());
    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
    return;
  }

  if (raw_block_contents.size() < kCompressionSizeLimit) {
    // Data blocks may use the finalized compression dictionary; all other
    // blocks use the empty dictionary.
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
                                     *compression_dict, type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    block_contents = CompressBlock(
        raw_block_contents, compression_info, &type,
        r->table_options.format_version, is_data_block /* do_sample */,
        &r->compressed_output, &sampled_output_fast, &sampled_output_slow);

    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
        r->table_properties_collectors, raw_block_contents.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
    // the verify_compression flag is set then try to de-compress the
    // compressed data and compare to the input.
    if (type != kNoCompression && r->table_options.verify_compression) {
      // Retrieve the uncompressed contents into a new buffer
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockContentsForCompressionType(
          uncompression_info, block_contents.data(), block_contents.size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
          ROCKS_LOG_ERROR(r->ioptions.info_log,
                          "Decompressed block did not match raw block");
          r->status =
              Status::Corruption("Decompressed block did not match raw block");
        }
      } else {
        // Decompression reported an error. abort.
        r->status = Status::Corruption("Could not decompress");
        abort_compression = true;
      }
    }
  } else {
    // Block is too big to be compressed.
    abort_compression = true;
  }

  // Abort compression if the block is too big, or did not pass
  // verification.
  if (abort_compression) {
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
    type = kNoCompression;
    block_contents = raw_block_contents;
  } else if (type != kNoCompression) {
    // Compression happened and was kept; record timing and size stats.
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
      RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
                      raw_block_contents.size());
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
  } else if (type != r->compression_type) {
    // Compression was requested but fell back to uncompressed (bad ratio or
    // unsupported algorithm).
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
  }

  WriteRawBlock(block_contents, type, handle, is_data_block);
  r->compressed_output.clear();
  if (is_data_block) {
    // Begin a new filter partition at the next block's offset and update
    // data-size/block-count properties.
    if (r->filter_builder != nullptr) {
      r->filter_builder->StartBlock(r->offset);
    }
    r->props.data_size = r->offset;
    ++r->props.num_data_blocks;
  }
}
717 
// Appends `block_contents` verbatim (no compression performed here),
// followed by the block trailer: one byte holding the compression `type`
// plus a 4-byte checksum covering the contents and that type byte.
// Records the block's position/size in `*handle`, makes a best-effort
// insertion into the compressed block cache, advances r->offset, and pads
// aligned data blocks. Any failure is stored in r->status.
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                           CompressionType type,
                                           BlockHandle* handle,
                                           bool is_data_block) {
  Rep* r = rep_;
  StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  assert(r->status.ok());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    char* trailer_without_type = trailer + 1;
    switch (r->table_options.checksum) {
      case kNoChecksum:
        EncodeFixed32(trailer_without_type, 0);
        break;
      case kCRC32c: {
        auto crc = crc32c::Value(block_contents.data(), block_contents.size());
        crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover block type
        EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
        break;
      }
      case kxxHash: {
        XXH32_state_t* const state = XXH32_createState();
        XXH32_reset(state, 0);
        XXH32_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH32_update(state, trailer, 1);  // Extend to cover block type
        EncodeFixed32(trailer_without_type, XXH32_digest(state));
        XXH32_freeState(state);
        break;
      }
      case kxxHash64: {
        XXH64_state_t* const state = XXH64_createState();
        XXH64_reset(state, 0);
        XXH64_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH64_update(state, trailer, 1);  // Extend to cover block type
        EncodeFixed32(
            trailer_without_type,
            static_cast<uint32_t>(XXH64_digest(state) &  // lower 32 bits
                                  uint64_t{0xffffffff}));
        XXH64_freeState(state);
        break;
      }
    }

    assert(r->status.ok());
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
        static_cast<char*>(trailer));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      r->status = InsertBlockInCache(block_contents, type, handle);
    }
    if (r->status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
      if (r->table_options.block_align && is_data_block) {
        // Pad so the next block starts on an r->alignment boundary; the
        // mask arithmetic assumes r->alignment is a power of two.
        size_t pad_bytes =
            (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
                             (r->alignment - 1))) &
            (r->alignment - 1);
        r->status = r->file->Pad(pad_bytes);
        if (r->status.ok()) {
          r->offset += pad_bytes;
        }
      }
    }
  }
}
790 
// Returns the builder's current status; errors from earlier writes are
// recorded in rep_->status and surfaced here.
Status BlockBasedTableBuilder::status() const { return rep_->status; }
792 
DeleteCachedBlockContents(const Slice &,void * value)793 static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
794   BlockContents* bc = reinterpret_cast<BlockContents*>(value);
795   delete bc;
796 }
797 
798 //
799 // Make a copy of the block contents and insert into compressed block cache
800 //
InsertBlockInCache(const Slice & block_contents,const CompressionType type,const BlockHandle * handle)801 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
802                                                   const CompressionType type,
803                                                   const BlockHandle* handle) {
804   Rep* r = rep_;
805   Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
806 
807   if (type != kNoCompression && block_cache_compressed != nullptr) {
808     size_t size = block_contents.size();
809 
810     auto ubuf =
811         AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
812     memcpy(ubuf.get(), block_contents.data(), size);
813     ubuf[size] = type;
814 
815     BlockContents* block_contents_to_cache =
816         new BlockContents(std::move(ubuf), size);
817 #ifndef NDEBUG
818     block_contents_to_cache->is_raw_block = true;
819 #endif  // NDEBUG
820 
821     // make cache key by appending the file offset to the cache prefix id
822     char* end = EncodeVarint64(
823         r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
824         handle->offset());
825     Slice key(r->compressed_cache_key_prefix,
826               static_cast<size_t>(end - r->compressed_cache_key_prefix));
827 
828     // Insert into compressed block cache.
829     block_cache_compressed->Insert(
830         key, block_contents_to_cache,
831         block_contents_to_cache->ApproximateMemoryUsage(),
832         &DeleteCachedBlockContents);
833 
834     // Invalidate OS cache.
835     r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
836   }
837   return Status::OK();
838 }
839 
// Writes the filter block(s) and registers the result in the metaindex.
// Partitioned filter builders return Incomplete from Finish() until the
// last partition has been produced, so this may emit multiple raw blocks.
void BlockBasedTableBuilder::WriteFilterBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle filter_block_handle;
  bool empty_filter_block = (rep_->filter_builder == nullptr ||
                             rep_->filter_builder->NumAdded() == 0);
  if (ok() && !empty_filter_block) {
    Status s = Status::Incomplete();
    while (ok() && s.IsIncomplete()) {
      // Each iteration yields one chunk; filter_block_handle passes the
      // location of the previously written chunk back to the builder.
      Slice filter_content =
          rep_->filter_builder->Finish(filter_block_handle, &s);
      assert(s.ok() || s.IsIncomplete());
      rep_->props.filter_size += filter_content.size();
      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
    }
  }
  if (ok() && !empty_filter_block) {
    // Add mapping from "<filter_block_prefix>.Name" to location
    // of filter data.
    std::string key;
    if (rep_->filter_builder->IsBlockBased()) {
      key = BlockBasedTable::kFilterBlockPrefix;
    } else {
      key = rep_->table_options.partition_filters
                ? BlockBasedTable::kPartitionedFilterBlockPrefix
                : BlockBasedTable::kFullFilterBlockPrefix;
    }
    key.append(rep_->table_options.filter_policy->Name());
    // The metaindex entry points at the last block written above.
    meta_index_builder->Add(key, filter_block_handle);
  }
}
870 
// Writes the index block(s), plus any index meta blocks, and leaves the
// handle of the last index block written in *index_block_handle.
void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (index_builder_status.IsIncomplete()) {
    // If we have more than one index partition then meta_blocks are not
    // supported for the index. Currently meta_blocks are used only by
    // HashIndexBuilder which is not multi-partition.
    assert(index_blocks.meta_blocks.empty());
  } else if (ok() && !index_builder_status.ok()) {
    rep_->status = index_builder_status;
  }
  if (ok()) {
    // Write index meta blocks and register each one in the metaindex.
    for (const auto& item : index_blocks.meta_blocks) {
      BlockHandle block_handle;
      WriteBlock(item.second, &block_handle, false /* is_data_block */);
      if (!ok()) {
        break;
      }
      meta_index_builder->Add(item.first, block_handle);
    }
  }
  if (ok()) {
    // Index blocks are compressed only when enable_index_compression is on.
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
  }
  // If there are more index partitions, finish them and write them out
  Status s = index_builder_status;
  while (ok() && s.IsIncomplete()) {
    s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
    if (!s.ok() && !s.IsIncomplete()) {
      rep_->status = s;
      return;
    }
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
    // The last index_block_handle will be for the partition index block
  }
}
918 
// Populates the remaining fields of rep_->props, serializes them (together
// with the user property collectors' output) into the properties block,
// writes it uncompressed, and registers it in the metaindex.
void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (ok()) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.column_family_id = rep_->column_family_id;
    rep_->props.column_family_name = rep_->column_family_name;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    // Index entries were all finished before this point (see Finish()), so
    // IndexSize() is final; add the trailer the index block carries on disk.
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.compression_name =
        CompressionTypeToString(rep_->compression_type);
    rep_->props.compression_options =
        CompressionOptionsToString(rep_->compression_opts);
    rep_->props.prefix_extractor_name =
        rep_->moptions.prefix_extractor != nullptr
            ? rep_->moptions.prefix_extractor->Name()
            : "nullptr";

    // Record the names of all registered property collector factories,
    // formatted as a bracketed, comma-separated list.
    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;
    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->seperator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
    rep_->props.creation_time = rep_->creation_time;
    rep_->props.oldest_key_time = rep_->oldest_key_time;
    rep_->props.file_creation_time = rep_->file_creation_time;

    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user collected properties
    NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
                                         rep_->ioptions.info_log,
                                         &property_block_builder);

    // Properties blocks are always written uncompressed.
    WriteRawBlock(property_block_builder.Finish(), kNoCompression,
                  &properties_block_handle);
  }
  if (ok()) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG
    meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
  }
}
1001 
WriteCompressionDictBlock(MetaIndexBuilder * meta_index_builder)1002 void BlockBasedTableBuilder::WriteCompressionDictBlock(
1003     MetaIndexBuilder* meta_index_builder) {
1004   if (rep_->compression_dict != nullptr &&
1005       rep_->compression_dict->GetRawDict().size()) {
1006     BlockHandle compression_dict_block_handle;
1007     if (ok()) {
1008       WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
1009                     &compression_dict_block_handle);
1010 #ifndef NDEBUG
1011       Slice compression_dict = rep_->compression_dict->GetRawDict();
1012       TEST_SYNC_POINT_CALLBACK(
1013           "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1014           &compression_dict);
1015 #endif  // NDEBUG
1016     }
1017     if (ok()) {
1018       meta_index_builder->Add(kCompressionDictBlock,
1019                               compression_dict_block_handle);
1020     }
1021   }
1022 }
1023 
WriteRangeDelBlock(MetaIndexBuilder * meta_index_builder)1024 void BlockBasedTableBuilder::WriteRangeDelBlock(
1025     MetaIndexBuilder* meta_index_builder) {
1026   if (ok() && !rep_->range_del_block.empty()) {
1027     BlockHandle range_del_block_handle;
1028     WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1029                   &range_del_block_handle);
1030     meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1031   }
1032 }
1033 
// Encodes and appends the table footer, which records the metaindex and
// index block handles, the checksum type, and a magic number.
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  Rep* r = rep_;
  // No need to write out new footer if we're using default checksum.
  // We're writing legacy magic number because we want old versions of RocksDB
  // be able to read files generated with new release (just in case if
  // somebody wants to roll back after an upgrade)
  // TODO(icanadi) at some point in the future, when we're absolutely sure
  // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
  // number and always write new table files with new magic number
  bool legacy = (r->table_options.format_version == 0);
  // this is guaranteed by BlockBasedTableBuilder's constructor
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  Footer footer(
      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
      r->table_options.format_version);
  footer.set_metaindex_handle(metaindex_block_handle);
  footer.set_index_handle(index_block_handle);
  footer.set_checksum(r->table_options.checksum);
  std::string footer_encoding;
  footer.EncodeTo(&footer_encoding);
  assert(r->status.ok());
  r->status = r->file->Append(footer_encoding);
  if (r->status.ok()) {
    r->offset += footer_encoding.size();
  }
}
1062 
// Transitions the builder from kBuffered to kUnbuffered state: samples the
// buffered data blocks to produce a compression dictionary, then replays
// every buffered block (keys into the filter/index builders, contents
// through WriteBlock) and releases the buffers.
void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  // With ZSTD training enabled, gather up to zstd_max_train_bytes of raw
  // samples to train on; otherwise the samples themselves (capped at
  // max_dict_bytes) are used directly as the dictionary.
  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                  ? r->compression_opts.zstd_max_train_bytes
                                  : r->compression_opts.max_dict_bytes;
  // Seeded with creation_time, so sampling is deterministic for this file.
  // A block may be sampled more than once.
  Random64 generator{r->creation_time};
  std::string compression_dict_samples;
  std::vector<size_t> compression_dict_sample_lens;
  if (!r->data_block_and_keys_buffers.empty()) {
    while (compression_dict_samples.size() < kSampleBytes) {
      size_t rand_idx =
          static_cast<size_t>(
              generator.Uniform(r->data_block_and_keys_buffers.size()));
      // Take a prefix of the chosen block, clipped to the remaining budget.
      size_t copy_len =
          std::min(kSampleBytes - compression_dict_samples.size(),
                   r->data_block_and_keys_buffers[rand_idx].first.size());
      compression_dict_samples.append(
          r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
      compression_dict_sample_lens.emplace_back(copy_len);
    }
  }

  // final data block flushed, now we can generate dictionary from the samples.
  // OK if compression_dict_samples is empty, we'll just get empty dictionary.
  std::string dict;
  if (r->compression_opts.zstd_max_train_bytes > 0) {
    dict = ZSTD_TrainDictionary(compression_dict_samples,
                                compression_dict_sample_lens,
                                r->compression_opts.max_dict_bytes);
  } else {
    dict = std::move(compression_dict_samples);
  }
  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
                                                r->compression_opts.level));
  // verify_dict is the decompression-side counterpart used by the
  // verify_compression check in WriteBlock.
  r->verify_dict.reset(new UncompressionDict(
      dict, r->compression_type == kZSTD ||
                r->compression_type == kZSTDNotFinalCompression));

  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
    const auto& data_block = r->data_block_and_keys_buffers[i].first;
    auto& keys = r->data_block_and_keys_buffers[i].second;
    assert(!data_block.empty());
    assert(!keys.empty());

    // Replay the buffered keys into the filter and index builders.
    for (const auto& key : keys) {
      if (r->filter_builder != nullptr) {
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }
    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
      // Index entry separating this block from the next; the final block's
      // entry is added later in Finish().
      Slice first_key_in_next_block =
          r->data_block_and_keys_buffers[i + 1].second.front();
      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
                                      r->pending_handle);
    }
  }
  r->data_block_and_keys_buffers.clear();
}
1128 
// Finalizes the table: flushes the last data block, drains any buffered
// blocks, then writes the meta blocks, metaindex, and footer in the order
// documented below. Marks the builder closed and returns its final status.
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  Flush();
  if (r->state == Rep::State::kBuffered) {
    // Still buffering for dictionary compression: emit everything now.
    EnterUnbuffered();
  }
  // To make sure properties block is able to keep the accurate size of index
  // block, we will finish writing all index entries first.
  if (ok() && !empty_data_block) {
    r->index_builder->AddIndexEntry(
        &r->last_key, nullptr /* no next data block */, r->pending_handle);
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  if (r->file != nullptr) {
    // Snapshot the file checksum so it remains queryable after close.
    file_checksum_ = r->file->GetFileChecksum();
  }
  r->state = Rep::State::kClosed;
  return r->status;
}
1173 
// Marks the builder closed without writing out remaining blocks or the
// footer; the partially written file is left as-is.
void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  rep_->state = Rep::State::kClosed;
}
1178 
// Number of entries recorded so far in the table properties.
uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}
1182 
// Current size of the output file: the offset of the next byte to write.
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
1184 
NeedCompact() const1185 bool BlockBasedTableBuilder::NeedCompact() const {
1186   for (const auto& collector : rep_->table_properties_collectors) {
1187     if (collector->NeedCompact()) {
1188       return true;
1189     }
1190   }
1191   return false;
1192 }
1193 
// Returns a copy of the accumulated table properties, augmented with each
// collector's readable properties and finalized user-collected properties.
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  TableProperties ret = rep_->props;
  for (const auto& collector : rep_->table_properties_collectors) {
    for (const auto& prop : collector->GetReadableProperties()) {
      ret.readable_properties.insert(prop);
    }
    // Finish() writes into the copy; its return status is ignored here.
    collector->Finish(&ret.user_collected_properties);
  }
  return ret;
}
1204 
GetFileChecksumFuncName() const1205 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
1206   if (rep_->file != nullptr) {
1207     return rep_->file->GetFileChecksumFuncName();
1208   } else {
1209     return kUnknownFileChecksumFuncName.c_str();
1210   }
1211 }
1212 
// Metaindex key prefixes for the three filter flavors; WriteFilterBlock
// appends the filter policy's name to form the full metaindex key.
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";
1217 }  // namespace ROCKSDB_NAMESPACE
1218