1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/block_based/block_based_table_builder.h"
11
12 #include <assert.h>
13 #include <stdio.h>
14 #include <list>
15 #include <map>
16 #include <memory>
17 #include <string>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "db/dbformat.h"
22 #include "index_builder.h"
23
24 #include "rocksdb/cache.h"
25 #include "rocksdb/comparator.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/flush_block_policy.h"
28 #include "rocksdb/merge_operator.h"
29 #include "rocksdb/table.h"
30
31 #include "table/block_based/block.h"
32 #include "table/block_based/block_based_filter_block.h"
33 #include "table/block_based/block_based_table_factory.h"
34 #include "table/block_based/block_based_table_reader.h"
35 #include "table/block_based/block_builder.h"
36 #include "table/block_based/filter_block.h"
37 #include "table/block_based/filter_policy_internal.h"
38 #include "table/block_based/full_filter_block.h"
39 #include "table/block_based/partitioned_filter_block.h"
40 #include "table/format.h"
41 #include "table/table_builder.h"
42
43 #include "memory/memory_allocator.h"
44 #include "util/coding.h"
45 #include "util/compression.h"
46 #include "util/crc32c.h"
47 #include "util/stop_watch.h"
48 #include "util/string_util.h"
49 #include "util/xxhash.h"
50
51 namespace ROCKSDB_NAMESPACE {
52
53 extern const std::string kHashIndexPrefixesBlock;
54 extern const std::string kHashIndexPrefixesMetadataBlock;
55
56 typedef BlockBasedTableOptions::IndexType IndexType;
57
58 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
59 namespace {
60
61 // Create a filter block builder based on its type.
CreateFilterBlockBuilder(const ImmutableCFOptions &,const MutableCFOptions & mopt,const FilterBuildingContext & context,const bool use_delta_encoding_for_index_values,PartitionedIndexBuilder * const p_index_builder)62 FilterBlockBuilder* CreateFilterBlockBuilder(
63 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
64 const FilterBuildingContext& context,
65 const bool use_delta_encoding_for_index_values,
66 PartitionedIndexBuilder* const p_index_builder) {
67 const BlockBasedTableOptions& table_opt = context.table_options;
68 if (table_opt.filter_policy == nullptr) return nullptr;
69
70 FilterBitsBuilder* filter_bits_builder =
71 BloomFilterPolicy::GetBuilderFromContext(context);
72 if (filter_bits_builder == nullptr) {
73 return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
74 table_opt);
75 } else {
76 if (table_opt.partition_filters) {
77 assert(p_index_builder != nullptr);
78 // Since after partition cut request from filter builder it takes time
79 // until index builder actully cuts the partition, we take the lower bound
80 // as partition size.
81 assert(table_opt.block_size_deviation <= 100);
82 auto partition_size =
83 static_cast<uint32_t>(((table_opt.metadata_block_size *
84 (100 - table_opt.block_size_deviation)) +
85 99) /
86 100);
87 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
88 return new PartitionedFilterBlockBuilder(
89 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
90 filter_bits_builder, table_opt.index_block_restart_interval,
91 use_delta_encoding_for_index_values, p_index_builder, partition_size);
92 } else {
93 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
94 table_opt.whole_key_filtering,
95 filter_bits_builder);
96 }
97 }
98 }
99
// A compressed block is worth keeping only when it saves at least one
// eighth (12.5%) of the raw size.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  // Subtract rather than add so the comparison cannot overflow.
  const size_t max_acceptable_size = raw_size - (raw_size / 8u);
  return compressed_size < max_acceptable_size;
}
104
CompressBlockInternal(const Slice & raw,const CompressionInfo & compression_info,uint32_t format_version,std::string * compressed_output)105 bool CompressBlockInternal(const Slice& raw,
106 const CompressionInfo& compression_info,
107 uint32_t format_version,
108 std::string* compressed_output) {
109 // Will return compressed block contents if (1) the compression method is
110 // supported in this platform and (2) the compression rate is "good enough".
111 switch (compression_info.type()) {
112 case kSnappyCompression:
113 return Snappy_Compress(compression_info, raw.data(), raw.size(),
114 compressed_output);
115 case kZlibCompression:
116 return Zlib_Compress(
117 compression_info,
118 GetCompressFormatForVersion(kZlibCompression, format_version),
119 raw.data(), raw.size(), compressed_output);
120 case kBZip2Compression:
121 return BZip2_Compress(
122 compression_info,
123 GetCompressFormatForVersion(kBZip2Compression, format_version),
124 raw.data(), raw.size(), compressed_output);
125 case kLZ4Compression:
126 return LZ4_Compress(
127 compression_info,
128 GetCompressFormatForVersion(kLZ4Compression, format_version),
129 raw.data(), raw.size(), compressed_output);
130 case kLZ4HCCompression:
131 return LZ4HC_Compress(
132 compression_info,
133 GetCompressFormatForVersion(kLZ4HCCompression, format_version),
134 raw.data(), raw.size(), compressed_output);
135 case kXpressCompression:
136 return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
137 case kZSTD:
138 case kZSTDNotFinalCompression:
139 return ZSTD_Compress(compression_info, raw.data(), raw.size(),
140 compressed_output);
141 default:
142 // Do not recognize this compression type
143 return false;
144 }
145 }
146
147 } // namespace
148
// format_version is the block format as defined in include/rocksdb/table.h
//
// Returns the contents to persist for this block: either
// `*compressed_output` (with `*type` left at the requested compression
// type) or `raw` itself (with `*type` reset to kNoCompression) when
// compression is disabled, unsupported on this platform, or did not
// shrink the block enough to be worthwhile.
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
                    std::string* sampled_output_slow) {
  *type = info.type();

  // Fast path: nothing to compress and no sampling configured.
  if (info.type() == kNoCompression && !info.SampleForCompression()) {
    return raw;
  }

  // If requested, we sample one in every N block with a
  // fast and slow compression algorithm and report the stats.
  // The users can use these stats to decide if it is worthwhile
  // enabling compression and they also get a hint about which
  // compression algorithm will be beneficial.
  if (do_sample && info.SampleForCompression() &&
      Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
      sampled_output_fast && sampled_output_slow) {
    // Sampling with a fast compression algorithm
    if (LZ4_Supported() || Snappy_Supported()) {
      CompressionType c =
          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());

      // Sampling results are reported to collectors by the caller; failure
      // here is not an error, it just yields an empty sample.
      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
    }

    // Sampling with a slow but high-compression algorithm
    if (ZSTD_Supported() || Zlib_Supported()) {
      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());
      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
    }
  }

  // Actually compress the data
  if (*type != kNoCompression) {
    if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
        GoodCompressionRatio(compressed_output->size(), raw.size())) {
      return *compressed_output;
    }
  }

  // Compression method is not supported, or not good
  // compression ratio, so just fall back to uncompressed form.
  *type = kNoCompression;
  return raw;
}
207
208 // kBlockBasedTableMagicNumber was picked by running
209 // echo rocksdb.table.block_based | sha1sum
210 // and taking the leading 64 bits.
211 // Please note that kBlockBasedTableMagicNumber may also be accessed by other
212 // .cc files
213 // for that reason we declare it extern in the header but to get the space
214 // allocated
215 // it must be not extern in one place.
216 const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
217 // We also support reading and writing legacy block based table format (for
218 // backwards compatibility)
219 const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
220
221 // A collector that collects properties of interest to block-based table.
222 // For now this class looks heavy-weight since we only write one additional
223 // property.
224 // But in the foreseeable future, we will add more and more properties that are
225 // specific to block-based table.
226 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
227 : public IntTblPropCollector {
228 public:
BlockBasedTablePropertiesCollector(BlockBasedTableOptions::IndexType index_type,bool whole_key_filtering,bool prefix_filtering)229 explicit BlockBasedTablePropertiesCollector(
230 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
231 bool prefix_filtering)
232 : index_type_(index_type),
233 whole_key_filtering_(whole_key_filtering),
234 prefix_filtering_(prefix_filtering) {}
235
InternalAdd(const Slice &,const Slice &,uint64_t)236 Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
237 uint64_t /*file_size*/) override {
238 // Intentionally left blank. Have no interest in collecting stats for
239 // individual key/value pairs.
240 return Status::OK();
241 }
242
BlockAdd(uint64_t,uint64_t,uint64_t)243 virtual void BlockAdd(uint64_t /* blockRawBytes */,
244 uint64_t /* blockCompressedBytesFast */,
245 uint64_t /* blockCompressedBytesSlow */) override {
246 // Intentionally left blank. No interest in collecting stats for
247 // blocks.
248 return;
249 }
250
Finish(UserCollectedProperties * properties)251 Status Finish(UserCollectedProperties* properties) override {
252 std::string val;
253 PutFixed32(&val, static_cast<uint32_t>(index_type_));
254 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
255 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
256 whole_key_filtering_ ? kPropTrue : kPropFalse});
257 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
258 prefix_filtering_ ? kPropTrue : kPropFalse});
259 return Status::OK();
260 }
261
262 // The name of the properties collector can be used for debugging purpose.
Name() const263 const char* Name() const override {
264 return "BlockBasedTablePropertiesCollector";
265 }
266
GetReadableProperties() const267 UserCollectedProperties GetReadableProperties() const override {
268 // Intentionally left blank.
269 return UserCollectedProperties();
270 }
271
272 private:
273 BlockBasedTableOptions::IndexType index_type_;
274 bool whole_key_filtering_;
275 bool prefix_filtering_;
276 };
277
// All mutable builder state, pimpl-style, so the public header stays free
// of implementation-only members. Owned and deleted by the enclosing
// BlockBasedTableBuilder.
struct BlockBasedTableBuilder::Rep {
  const ImmutableCFOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  // Current write position in the output file; also the offset of the next
  // block to be written.
  uint64_t offset = 0;
  // First error encountered by any builder operation; sticky once set.
  Status status;
  // Alignment for data blocks when block_align is enabled; 0 otherwise.
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks and keys to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary before
  // compressing any data blocks.
  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
  // blocks as it's redundant, but it's easier to implement for now.
  std::vector<std::pair<std::string, std::vector<std::string>>>
      data_block_and_keys_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  // Non-owning alias of index_builder when two-level index search is in
  // use; nullptr otherwise.
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  // Most recently added key; used for index-entry separators and (in debug
  // builds) ordering assertions.
  std::string last_key;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  CompressionOptions compression_opts;
  std::unique_ptr<CompressionDict> compression_dict;
  CompressionContext compression_ctx;
  // Only populated when table_options.verify_compression is set.
  std::unique_ptr<UncompressionContext> verify_ctx;
  std::unique_ptr<UncompressionDict> verify_dict;

  // Total bytes of buffered (uncompressed) data blocks in kBuffered state.
  size_t data_begin_offset = 0;

  TableProperties props;

  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks are
  //   accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  //   state.
  //
  // - `kUnbuffered`: This is the state when compression dictionary is finalized
  //   either because it wasn't enabled in the first place or it's been created
  //   from sampling previously buffered data. In this state, blocks are simply
  //   compressed/written out as they fill up. From this state, call `Finish()`
  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  //   the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;

  const bool use_delta_encoding_for_index_values;
  // nullptr when filters are skipped or no filter policy is configured.
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  size_t compressed_cache_key_prefix_size;

  BlockHandle pending_handle;  // Handle to add to index block

  // Scratch buffer reused for each block's compressed output.
  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  int level_at_creation;
  uint32_t column_family_id;
  const std::string& column_family_name;
  uint64_t creation_time = 0;
  uint64_t oldest_key_time = 0;
  const uint64_t target_file_size;
  uint64_t file_creation_time = 0;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
      const BlockBasedTableOptions& table_opt,
      const InternalKeyComparator& icomparator,
      const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
          int_tbl_prop_collector_factories,
      uint32_t _column_family_id, WritableFileWriter* f,
      const CompressionType _compression_type,
      const uint64_t _sample_for_compression,
      const CompressionOptions& _compression_opts, const bool skip_filters,
      const int _level_at_creation, const std::string& _column_family_name,
      const uint64_t _creation_time, const uint64_t _oldest_key_time,
      const uint64_t _target_file_size, const uint64_t _file_creation_time)
      : ioptions(_ioptions),
        moptions(_moptions),
        table_options(table_opt),
        internal_comparator(icomparator),
        file(f),
        alignment(table_options.block_align
                      ? std::min(table_options.block_size, kDefaultPageSize)
                      : 0),
        // Hash-based data-block index only works for comparators where byte
        // equality implies key equality; otherwise fall back to binary search.
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   icomparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(_moptions.prefix_extractor.get()),
        compression_type(_compression_type),
        sample_for_compression(_sample_for_compression),
        compression_opts(_compression_opts),
        compression_dict(),
        compression_ctx(_compression_type),
        verify_dict(),
        // Buffering is only needed when a compression dictionary will be
        // trained from sampled data blocks.
        state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                     : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        compressed_cache_key_prefix_size(0),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        level_at_creation(_level_at_creation),
        column_family_id(_column_family_id),
        column_family_name(_column_family_name),
        creation_time(_creation_time),
        oldest_key_time(_oldest_key_time),
        target_file_size(_target_file_size),
        file_creation_time(_file_creation_time) {
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      // Keep a typed alias because the partitioned filter builder needs to
      // coordinate with the partitioned index builder.
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (skip_filters) {
      filter_builder = nullptr;
    } else {
      FilterBuildingContext context(table_options);
      context.column_family_name = column_family_name;
      context.compaction_style = ioptions.compaction_style;
      context.level_at_creation = level_at_creation;
      context.info_log = ioptions.info_log;
      filter_builder.reset(CreateFilterBlockBuilder(
          ioptions, moptions, context, use_delta_encoding_for_index_values,
          p_index_builder_));
    }

    // User-supplied collectors first, then our own block-based-table
    // property collector.
    for (auto& collector_factories : *int_tbl_prop_collector_factories) {
      table_properties_collectors.emplace_back(
          collector_factories->CreateIntTblPropCollector(column_family_id));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            _moptions.prefix_extractor != nullptr));
    if (table_options.verify_compression) {
      verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
                                                compression_type));
    }
  }

  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  ~Rep() {}
};
453
// Sanitizes table options, builds the Rep that holds all builder state,
// and initializes the filter builder and (optionally) the compressed
// block cache key prefix.
BlockBasedTableBuilder::BlockBasedTableBuilder(
    const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
    const BlockBasedTableOptions& table_options,
    const InternalKeyComparator& internal_comparator,
    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories,
    uint32_t column_family_id, WritableFileWriter* file,
    const CompressionType compression_type,
    const uint64_t sample_for_compression,
    const CompressionOptions& compression_opts, const bool skip_filters,
    const std::string& column_family_name, const int level_at_creation,
    const uint64_t creation_time, const uint64_t oldest_key_time,
    const uint64_t target_file_size, const uint64_t file_creation_time) {
  // format_version 0 predates configurable checksums, so a non-default
  // checksum requires at least format_version 1.
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    ROCKS_LOG_WARN(
        ioptions.info_log,
        "Silently converting format_version to 1 because checksum is "
        "non-default");
    // silently convert format_version to 1 to keep consistent with current
    // behavior
    sanitized_table_options.format_version = 1;
  }

  // rep_ is raw-owned here and released in the destructor (pimpl idiom).
  rep_ = new Rep(ioptions, moptions, sanitized_table_options,
                 internal_comparator, int_tbl_prop_collector_factories,
                 column_family_id, file, compression_type,
                 sample_for_compression, compression_opts, skip_filters,
                 level_at_creation, column_family_name, creation_time,
                 oldest_key_time, target_file_size, file_creation_time);

  if (rep_->filter_builder != nullptr) {
    // The first filter partition starts at file offset 0.
    rep_->filter_builder->StartBlock(0);
  }
  if (table_options.block_cache_compressed.get() != nullptr) {
    BlockBasedTable::GenerateCachePrefix(
        table_options.block_cache_compressed.get(), file->writable_file(),
        &rep_->compressed_cache_key_prefix[0],
        &rep_->compressed_cache_key_prefix_size);
  }
}
496
~BlockBasedTableBuilder()497 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
498 // Catch errors where caller forgot to call Finish()
499 assert(rep_->state == Rep::State::kClosed);
500 delete rep_;
501 }
502
// Adds one internal key/value pair to the table. Point entries (puts,
// deletes, merges) go into the data block (flushing and emitting index and
// filter entries as blocks fill up); range deletions go into the dedicated
// range-del meta block. Keys must arrive in ascending internal-key order.
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    // Point keys must be strictly ascending. Skip the check while only
    // range deletions have been added, since last_key is then unset.
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      Flush();

      // Once enough uncompressed data has been buffered for dictionary
      // training, finalize the dictionary and write out buffered blocks.
      if (r->state == Rep::State::kBuffered &&
          r->data_begin_offset > r->target_file_size) {
        EnterUnbuffered();
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block. This allows us to use shorter
      // keys in the index block. For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who". We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
      // Filters are built on user keys with any timestamp suffix removed.
      size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
      r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffer keys to be replayed during `Finish()` once compression
      // dictionary has been finalized.
      if (r->data_block_and_keys_buffers.empty() || should_flush) {
        r->data_block_and_keys_buffers.emplace_back();
      }
      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
    } else {
      r->index_builder->OnKeyAdded(key);
    }
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);
  } else {
    // Unknown value type: programming error upstream.
    assert(false);
  }

  // Update table properties regardless of entry kind.
  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}
582
Flush()583 void BlockBasedTableBuilder::Flush() {
584 Rep* r = rep_;
585 assert(rep_->state != Rep::State::kClosed);
586 if (!ok()) return;
587 if (r->data_block.empty()) return;
588 WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
589 }
590
WriteBlock(BlockBuilder * block,BlockHandle * handle,bool is_data_block)591 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
592 BlockHandle* handle,
593 bool is_data_block) {
594 WriteBlock(block->Finish(), handle, is_data_block);
595 block->Reset();
596 }
597
// Compresses (and optionally verifies) `raw_block_contents`, then writes
// the block plus its type/checksum trailer via WriteRawBlock. While in the
// kBuffered state, data blocks are stashed uncompressed instead of
// written. On output, `handle` holds the block's offset and size.
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;

  auto type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  Slice block_contents;
  bool abort_compression = false;

  StopWatchNano timer(
      r->ioptions.env,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

  if (r->state == Rep::State::kBuffered) {
    // Dictionary training phase: stash the uncompressed block and return;
    // it will be compressed and written during EnterUnbuffered().
    assert(is_data_block);
    assert(!r->data_block_and_keys_buffers.empty());
    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
    return;
  }

  if (raw_block_contents.size() < kCompressionSizeLimit) {
    // Only data blocks use the trained dictionary; all other blocks are
    // compressed with an empty dictionary.
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
                                     *compression_dict, type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    block_contents = CompressBlock(
        raw_block_contents, compression_info, &type,
        r->table_options.format_version, is_data_block /* do_sample */,
        &r->compressed_output, &sampled_output_fast, &sampled_output_slow);

    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
        r->table_properties_collectors, raw_block_contents.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
    // the verify_compression flag is set then try to de-compress the
    // compressed data and compare to the input.
    if (type != kNoCompression && r->table_options.verify_compression) {
      // Retrieve the uncompressed contents into a new buffer
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockContentsForCompressionType(
          uncompression_info, block_contents.data(), block_contents.size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
          ROCKS_LOG_ERROR(r->ioptions.info_log,
                          "Decompressed block did not match raw block");
          r->status =
              Status::Corruption("Decompressed block did not match raw block");
        }
      } else {
        // Decompression reported an error. abort.
        r->status = Status::Corruption("Could not decompress");
        abort_compression = true;
      }
    }
  } else {
    // Block is too big to be compressed.
    abort_compression = true;
  }

  // Abort compression if the block is too big, or did not pass
  // verification.
  if (abort_compression) {
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
    type = kNoCompression;
    block_contents = raw_block_contents;
  } else if (type != kNoCompression) {
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
      RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
                      raw_block_contents.size());
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
  } else if (type != r->compression_type) {
    // CompressBlock fell back to kNoCompression (unsupported algorithm or
    // poor ratio).
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
  }

  WriteRawBlock(block_contents, type, handle, is_data_block);
  r->compressed_output.clear();
  if (is_data_block) {
    if (r->filter_builder != nullptr) {
      // Let the filter builder know where the next data block begins.
      r->filter_builder->StartBlock(r->offset);
    }
    r->props.data_size = r->offset;
    ++r->props.num_data_blocks;
  }
}
717
// Appends `block_contents` followed by a 5-byte trailer (1 byte
// compression type + 4 bytes checksum) to the file, fills `handle` with
// the block's offset/size, optionally inserts the block into the
// compressed block cache, and pads data blocks to the configured
// alignment. Errors are recorded in rep_->status.
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                           CompressionType type,
                                           BlockHandle* handle,
                                           bool is_data_block) {
  Rep* r = rep_;
  StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  assert(r->status.ok());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    // Trailer layout: [0] = compression type, [1..4] = checksum.
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    char* trailer_without_type = trailer + 1;
    switch (r->table_options.checksum) {
      case kNoChecksum:
        EncodeFixed32(trailer_without_type, 0);
        break;
      case kCRC32c: {
        auto crc = crc32c::Value(block_contents.data(), block_contents.size());
        crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover block type
        // Masked so that CRCs of CRCs don't collide (see crc32c::Mask).
        EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
        break;
      }
      case kxxHash: {
        XXH32_state_t* const state = XXH32_createState();
        XXH32_reset(state, 0);
        XXH32_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH32_update(state, trailer, 1);  // Extend to cover block type
        EncodeFixed32(trailer_without_type, XXH32_digest(state));
        XXH32_freeState(state);
        break;
      }
      case kxxHash64: {
        XXH64_state_t* const state = XXH64_createState();
        XXH64_reset(state, 0);
        XXH64_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH64_update(state, trailer, 1);  // Extend to cover block type
        // Only 4 trailer bytes are available, so store the low 32 bits.
        EncodeFixed32(
            trailer_without_type,
            static_cast<uint32_t>(XXH64_digest(state) &  // lower 32 bits
                                  uint64_t{0xffffffff}));
        XXH64_freeState(state);
        break;
      }
    }

    assert(r->status.ok());
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
        static_cast<char*>(trailer));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      r->status = InsertBlockInCache(block_contents, type, handle);
    }
    if (r->status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
      if (r->table_options.block_align && is_data_block) {
        // Pad to the next alignment boundary so each data block starts
        // aligned (alignment is a power of two, hence the mask arithmetic).
        size_t pad_bytes =
            (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
                             (r->alignment - 1))) &
            (r->alignment - 1);
        r->status = r->file->Pad(pad_bytes);
        if (r->status.ok()) {
          r->offset += pad_bytes;
        }
      }
    }
  }
}
790
status() const791 Status BlockBasedTableBuilder::status() const { return rep_->status; }
792
DeleteCachedBlockContents(const Slice &,void * value)793 static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
794 BlockContents* bc = reinterpret_cast<BlockContents*>(value);
795 delete bc;
796 }
797
798 //
799 // Make a copy of the block contents and insert into compressed block cache
800 //
InsertBlockInCache(const Slice & block_contents,const CompressionType type,const BlockHandle * handle)801 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
802 const CompressionType type,
803 const BlockHandle* handle) {
804 Rep* r = rep_;
805 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
806
807 if (type != kNoCompression && block_cache_compressed != nullptr) {
808 size_t size = block_contents.size();
809
810 auto ubuf =
811 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
812 memcpy(ubuf.get(), block_contents.data(), size);
813 ubuf[size] = type;
814
815 BlockContents* block_contents_to_cache =
816 new BlockContents(std::move(ubuf), size);
817 #ifndef NDEBUG
818 block_contents_to_cache->is_raw_block = true;
819 #endif // NDEBUG
820
821 // make cache key by appending the file offset to the cache prefix id
822 char* end = EncodeVarint64(
823 r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
824 handle->offset());
825 Slice key(r->compressed_cache_key_prefix,
826 static_cast<size_t>(end - r->compressed_cache_key_prefix));
827
828 // Insert into compressed block cache.
829 block_cache_compressed->Insert(
830 key, block_contents_to_cache,
831 block_contents_to_cache->ApproximateMemoryUsage(),
832 &DeleteCachedBlockContents);
833
834 // Invalidate OS cache.
835 r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
836 }
837 return Status::OK();
838 }
839
WriteFilterBlock(MetaIndexBuilder * meta_index_builder)840 void BlockBasedTableBuilder::WriteFilterBlock(
841 MetaIndexBuilder* meta_index_builder) {
842 BlockHandle filter_block_handle;
843 bool empty_filter_block = (rep_->filter_builder == nullptr ||
844 rep_->filter_builder->NumAdded() == 0);
845 if (ok() && !empty_filter_block) {
846 Status s = Status::Incomplete();
847 while (ok() && s.IsIncomplete()) {
848 Slice filter_content =
849 rep_->filter_builder->Finish(filter_block_handle, &s);
850 assert(s.ok() || s.IsIncomplete());
851 rep_->props.filter_size += filter_content.size();
852 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
853 }
854 }
855 if (ok() && !empty_filter_block) {
856 // Add mapping from "<filter_block_prefix>.Name" to location
857 // of filter data.
858 std::string key;
859 if (rep_->filter_builder->IsBlockBased()) {
860 key = BlockBasedTable::kFilterBlockPrefix;
861 } else {
862 key = rep_->table_options.partition_filters
863 ? BlockBasedTable::kPartitionedFilterBlockPrefix
864 : BlockBasedTable::kFullFilterBlockPrefix;
865 }
866 key.append(rep_->table_options.filter_policy->Name());
867 meta_index_builder->Add(key, filter_block_handle);
868 }
869 }
870
WriteIndexBlock(MetaIndexBuilder * meta_index_builder,BlockHandle * index_block_handle)871 void BlockBasedTableBuilder::WriteIndexBlock(
872 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
873 IndexBuilder::IndexBlocks index_blocks;
874 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
875 if (index_builder_status.IsIncomplete()) {
876 // We we have more than one index partition then meta_blocks are not
877 // supported for the index. Currently meta_blocks are used only by
878 // HashIndexBuilder which is not multi-partition.
879 assert(index_blocks.meta_blocks.empty());
880 } else if (ok() && !index_builder_status.ok()) {
881 rep_->status = index_builder_status;
882 }
883 if (ok()) {
884 for (const auto& item : index_blocks.meta_blocks) {
885 BlockHandle block_handle;
886 WriteBlock(item.second, &block_handle, false /* is_data_block */);
887 if (!ok()) {
888 break;
889 }
890 meta_index_builder->Add(item.first, block_handle);
891 }
892 }
893 if (ok()) {
894 if (rep_->table_options.enable_index_compression) {
895 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
896 } else {
897 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
898 index_block_handle);
899 }
900 }
901 // If there are more index partitions, finish them and write them out
902 Status s = index_builder_status;
903 while (ok() && s.IsIncomplete()) {
904 s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
905 if (!s.ok() && !s.IsIncomplete()) {
906 rep_->status = s;
907 return;
908 }
909 if (rep_->table_options.enable_index_compression) {
910 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
911 } else {
912 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
913 index_block_handle);
914 }
915 // The last index_block_handle will be for the partition index block
916 }
917 }
918
WritePropertiesBlock(MetaIndexBuilder * meta_index_builder)919 void BlockBasedTableBuilder::WritePropertiesBlock(
920 MetaIndexBuilder* meta_index_builder) {
921 BlockHandle properties_block_handle;
922 if (ok()) {
923 PropertyBlockBuilder property_block_builder;
924 rep_->props.column_family_id = rep_->column_family_id;
925 rep_->props.column_family_name = rep_->column_family_name;
926 rep_->props.filter_policy_name =
927 rep_->table_options.filter_policy != nullptr
928 ? rep_->table_options.filter_policy->Name()
929 : "";
930 rep_->props.index_size =
931 rep_->index_builder->IndexSize() + kBlockTrailerSize;
932 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
933 ? rep_->ioptions.user_comparator->Name()
934 : "nullptr";
935 rep_->props.merge_operator_name =
936 rep_->ioptions.merge_operator != nullptr
937 ? rep_->ioptions.merge_operator->Name()
938 : "nullptr";
939 rep_->props.compression_name =
940 CompressionTypeToString(rep_->compression_type);
941 rep_->props.compression_options =
942 CompressionOptionsToString(rep_->compression_opts);
943 rep_->props.prefix_extractor_name =
944 rep_->moptions.prefix_extractor != nullptr
945 ? rep_->moptions.prefix_extractor->Name()
946 : "nullptr";
947
948 std::string property_collectors_names = "[";
949 for (size_t i = 0;
950 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
951 if (i != 0) {
952 property_collectors_names += ",";
953 }
954 property_collectors_names +=
955 rep_->ioptions.table_properties_collector_factories[i]->Name();
956 }
957 property_collectors_names += "]";
958 rep_->props.property_collectors_names = property_collectors_names;
959 if (rep_->table_options.index_type ==
960 BlockBasedTableOptions::kTwoLevelIndexSearch) {
961 assert(rep_->p_index_builder_ != nullptr);
962 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
963 rep_->props.top_level_index_size =
964 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
965 }
966 rep_->props.index_key_is_user_key =
967 !rep_->index_builder->seperator_is_key_plus_seq();
968 rep_->props.index_value_is_delta_encoded =
969 rep_->use_delta_encoding_for_index_values;
970 rep_->props.creation_time = rep_->creation_time;
971 rep_->props.oldest_key_time = rep_->oldest_key_time;
972 rep_->props.file_creation_time = rep_->file_creation_time;
973
974 // Add basic properties
975 property_block_builder.AddTableProperty(rep_->props);
976
977 // Add use collected properties
978 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
979 rep_->ioptions.info_log,
980 &property_block_builder);
981
982 WriteRawBlock(property_block_builder.Finish(), kNoCompression,
983 &properties_block_handle);
984 }
985 if (ok()) {
986 #ifndef NDEBUG
987 {
988 uint64_t props_block_offset = properties_block_handle.offset();
989 uint64_t props_block_size = properties_block_handle.size();
990 TEST_SYNC_POINT_CALLBACK(
991 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
992 &props_block_offset);
993 TEST_SYNC_POINT_CALLBACK(
994 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
995 &props_block_size);
996 }
997 #endif // !NDEBUG
998 meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
999 }
1000 }
1001
WriteCompressionDictBlock(MetaIndexBuilder * meta_index_builder)1002 void BlockBasedTableBuilder::WriteCompressionDictBlock(
1003 MetaIndexBuilder* meta_index_builder) {
1004 if (rep_->compression_dict != nullptr &&
1005 rep_->compression_dict->GetRawDict().size()) {
1006 BlockHandle compression_dict_block_handle;
1007 if (ok()) {
1008 WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
1009 &compression_dict_block_handle);
1010 #ifndef NDEBUG
1011 Slice compression_dict = rep_->compression_dict->GetRawDict();
1012 TEST_SYNC_POINT_CALLBACK(
1013 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1014 &compression_dict);
1015 #endif // NDEBUG
1016 }
1017 if (ok()) {
1018 meta_index_builder->Add(kCompressionDictBlock,
1019 compression_dict_block_handle);
1020 }
1021 }
1022 }
1023
WriteRangeDelBlock(MetaIndexBuilder * meta_index_builder)1024 void BlockBasedTableBuilder::WriteRangeDelBlock(
1025 MetaIndexBuilder* meta_index_builder) {
1026 if (ok() && !rep_->range_del_block.empty()) {
1027 BlockHandle range_del_block_handle;
1028 WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1029 &range_del_block_handle);
1030 meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1031 }
1032 }
1033
WriteFooter(BlockHandle & metaindex_block_handle,BlockHandle & index_block_handle)1034 void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1035 BlockHandle& index_block_handle) {
1036 Rep* r = rep_;
1037 // No need to write out new footer if we're using default checksum.
1038 // We're writing legacy magic number because we want old versions of RocksDB
1039 // be able to read files generated with new release (just in case if
1040 // somebody wants to roll back after an upgrade)
1041 // TODO(icanadi) at some point in the future, when we're absolutely sure
1042 // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
1043 // number and always write new table files with new magic number
1044 bool legacy = (r->table_options.format_version == 0);
1045 // this is guaranteed by BlockBasedTableBuilder's constructor
1046 assert(r->table_options.checksum == kCRC32c ||
1047 r->table_options.format_version != 0);
1048 Footer footer(
1049 legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
1050 r->table_options.format_version);
1051 footer.set_metaindex_handle(metaindex_block_handle);
1052 footer.set_index_handle(index_block_handle);
1053 footer.set_checksum(r->table_options.checksum);
1054 std::string footer_encoding;
1055 footer.EncodeTo(&footer_encoding);
1056 assert(r->status.ok());
1057 r->status = r->file->Append(footer_encoding);
1058 if (r->status.ok()) {
1059 r->offset += footer_encoding.size();
1060 }
1061 }
1062
// Transitions from the buffered state (blocks held in memory for dictionary
// sampling) to the unbuffered state: builds the compression dictionary from
// random samples of the buffered blocks, then replays every buffered block
// through the filter/index builders and writes it out.
void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  // Sample budget: prefer the ZSTD training budget when configured,
  // otherwise cap samples at the final dictionary size.
  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                  ? r->compression_opts.zstd_max_train_bytes
                                  : r->compression_opts.max_dict_bytes;
  // Seeded with creation_time, so sampling is deterministic for a given
  // file-creation timestamp.
  Random64 generator{r->creation_time};
  std::string compression_dict_samples;
  std::vector<size_t> compression_dict_sample_lens;
  if (!r->data_block_and_keys_buffers.empty()) {
    // Draw buffered blocks at random (with replacement) until the sample
    // budget is met; each sample is a prefix of the chosen block, truncated
    // to the remaining budget.
    while (compression_dict_samples.size() < kSampleBytes) {
      size_t rand_idx =
          static_cast<size_t>(
              generator.Uniform(r->data_block_and_keys_buffers.size()));
      size_t copy_len =
          std::min(kSampleBytes - compression_dict_samples.size(),
                   r->data_block_and_keys_buffers[rand_idx].first.size());
      compression_dict_samples.append(
          r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
      compression_dict_sample_lens.emplace_back(copy_len);
    }
  }

  // final data block flushed, now we can generate dictionary from the samples.
  // OK if compression_dict_samples is empty, we'll just get empty dictionary.
  std::string dict;
  if (r->compression_opts.zstd_max_train_bytes > 0) {
    dict = ZSTD_TrainDictionary(compression_dict_samples,
                                compression_dict_sample_lens,
                                r->compression_opts.max_dict_bytes);
  } else {
    // No ZSTD training requested: the raw concatenated samples serve as the
    // dictionary directly.
    dict = std::move(compression_dict_samples);
  }
  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
                                                r->compression_opts.level));
  // Matching uncompression dictionary, used to verify compressed output.
  r->verify_dict.reset(new UncompressionDict(
      dict, r->compression_type == kZSTD ||
                r->compression_type == kZSTDNotFinalCompression));

  // Replay every buffered block: feed its keys to the filter and index
  // builders, then write the block (compression dictionary now available).
  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
    const auto& data_block = r->data_block_and_keys_buffers[i].first;
    auto& keys = r->data_block_and_keys_buffers[i].second;
    assert(!data_block.empty());
    assert(!keys.empty());

    for (const auto& key : keys) {
      if (r->filter_builder != nullptr) {
        // Filters index user keys with any timestamp suffix removed.
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }
    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
      // Emit the index entry separating this block from the next one; the
      // last block's entry is added later by Finish().
      Slice first_key_in_next_block =
          r->data_block_and_keys_buffers[i + 1].second.front();
      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
                                      r->pending_handle);
    }
  }
  r->data_block_and_keys_buffers.clear();
}
1128
// Finalizes the table: flushes the last data block, writes all meta blocks,
// the metaindex, and the footer, then closes the builder. Returns the first
// error encountered at any stage (subsequent stages are skipped via ok()).
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  // Captured BEFORE Flush() clears the block: tells us whether a final
  // index entry for the last data block must be added below.
  bool empty_data_block = r->data_block.empty();
  Flush();
  if (r->state == Rep::State::kBuffered) {
    // Blocks were being buffered for dictionary compression; train the
    // dictionary and write everything out now.
    EnterUnbuffered();
  }
  // To make sure properties block is able to keep the accurate size of index
  // block, we will finish writing all index entries first.
  if (ok() && !empty_data_block) {
    r->index_builder->AddIndexEntry(
        &r->last_key, nullptr /* no next data block */, r->pending_handle);
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  if (r->file != nullptr) {
    // Snapshot the file checksum so it remains queryable after close.
    file_checksum_ = r->file->GetFileChecksum();
  }
  r->state = Rep::State::kClosed;
  return r->status;
}
1173
Abandon()1174 void BlockBasedTableBuilder::Abandon() {
1175 assert(rep_->state != Rep::State::kClosed);
1176 rep_->state = Rep::State::kClosed;
1177 }
1178
NumEntries() const1179 uint64_t BlockBasedTableBuilder::NumEntries() const {
1180 return rep_->props.num_entries;
1181 }
1182
FileSize() const1183 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
1184
NeedCompact() const1185 bool BlockBasedTableBuilder::NeedCompact() const {
1186 for (const auto& collector : rep_->table_properties_collectors) {
1187 if (collector->NeedCompact()) {
1188 return true;
1189 }
1190 }
1191 return false;
1192 }
1193
GetTableProperties() const1194 TableProperties BlockBasedTableBuilder::GetTableProperties() const {
1195 TableProperties ret = rep_->props;
1196 for (const auto& collector : rep_->table_properties_collectors) {
1197 for (const auto& prop : collector->GetReadableProperties()) {
1198 ret.readable_properties.insert(prop);
1199 }
1200 collector->Finish(&ret.user_collected_properties);
1201 }
1202 return ret;
1203 }
1204
GetFileChecksumFuncName() const1205 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
1206 if (rep_->file != nullptr) {
1207 return rep_->file->GetFileChecksumFuncName();
1208 } else {
1209 return kUnknownFileChecksumFuncName.c_str();
1210 }
1211 }
1212
// Metaindex key prefixes identifying the filter block flavor; the filter
// policy name is appended to one of these in WriteFilterBlock().
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";
1217 } // namespace ROCKSDB_NAMESPACE
1218