// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_based/block_based_table_builder.h"

#include <assert.h>
#include <stdio.h>

#include <atomic>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>

#include "db/dbformat.h"
#include "index_builder.h"
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/work_queue.h"
#include "util/xxhash.h"

namespace ROCKSDB_NAMESPACE {

extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;

// Without an anonymous namespace here, we fail the -Wmissing-prototypes
// warning.
namespace {

// Create a filter block builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(
    const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
    const FilterBuildingContext& context,
    const bool use_delta_encoding_for_index_values,
    PartitionedIndexBuilder* const p_index_builder) {
  const BlockBasedTableOptions& table_opt = context.table_options;
  assert(table_opt.filter_policy);  // precondition

  FilterBitsBuilder* filter_bits_builder =
      BloomFilterPolicy::GetBuilderFromContext(context);
  if (filter_bits_builder == nullptr) {
    return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
                                            table_opt);
  } else {
    if (table_opt.partition_filters) {
      assert(p_index_builder != nullptr);
      // After the filter builder requests a partition cut, it takes time
      // until the index builder actually cuts the partition: the cut only
      // happens at the end of a data block, which may hold many keys. We
      // therefore use the lower bound as the partition size.
      assert(table_opt.block_size_deviation <= 100);
      auto partition_size =
          static_cast<uint32_t>(((table_opt.metadata_block_size *
                                  (100 - table_opt.block_size_deviation)) +
                                 99) /
                                100);
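      // Worked example (assuming the documented defaults of
      // metadata_block_size = 4096 and block_size_deviation = 10): the
      // expression above rounds 4096 * 90 / 100 up, giving a lower-bound
      // partition size of 3687 bytes.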
      partition_size = std::max(partition_size, static_cast<uint32_t>(1));
      return new PartitionedFilterBlockBuilder(
          mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
          filter_bits_builder, table_opt.index_block_restart_interval,
          use_delta_encoding_for_index_values, p_index_builder, partition_size);
    } else {
      return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
                                        table_opt.whole_key_filtering,
                                        filter_bits_builder);
    }
  }
}

bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  // Require that compression save more than 12.5% of the raw size;
  // otherwise it is not considered worthwhile.
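  // Example: a 4096-byte raw block is kept compressed only if the compressed
  // form is smaller than 4096 - 4096 / 8 = 3584 bytes; otherwise the block
  // is written uncompressed.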
  return compressed_size < raw_size - (raw_size / 8u);
}

}  // namespace

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
                    std::string* sampled_output_slow) {
  assert(type);
  assert(compressed_output);
  assert(compressed_output->empty());

  // If requested, we sample one in every N blocks with a
  // fast and slow compression algorithm and report the stats.
  // Users can use these stats to decide whether it is worthwhile
  // enabling compression, and they also get a hint about which
  // compression algorithm will be beneficial.
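  // For instance, if SampleForCompression() returns 10, roughly one in ten
  // blocks is sampled with both algorithm families below.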
  if (do_sample && info.SampleForCompression() &&
      Random::GetTLSInstance()->OneIn(
          static_cast<int>(info.SampleForCompression()))) {
    // Sampling with a fast compression algorithm
    if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
      CompressionType c =
          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());

      CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
                   sampled_output_fast);
    }

    // Sampling with a slow but high-compression algorithm
    if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());

      CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
                   sampled_output_slow);
    }
  }

  if (info.type() == kNoCompression) {
    *type = kNoCompression;
    return raw;
  }

  // Actually compress the data; if the compression method is not supported,
  // or the compression fails etc., just fall back to uncompressed
  if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
                    compressed_output)) {
    *type = kNoCompression;
    return raw;
  }

  // Check the compression ratio; if it's not good enough, just fall back to
  // uncompressed
  if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
    *type = kNoCompression;
    return raw;
  }

  *type = info.type();
  return *compressed_output;
}

// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Note that kBlockBasedTableMagicNumber may also be accessed by other .cc
// files, so it is declared extern in the header; to get the space allocated,
// it must be non-extern in exactly one place, which is here.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing legacy block based table format (for
// backwards compatibility)
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;

// A collector that collects properties of interest to the block-based table.
// For now this class looks heavy-weight since we only write one additional
// property. But in the foreseeable future, we will add more and more
// properties that are specific to the block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    : public IntTblPropCollector {
 public:
  explicit BlockBasedTablePropertiesCollector(
      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
      bool prefix_filtering)
      : index_type_(index_type),
        whole_key_filtering_(whole_key_filtering),
        prefix_filtering_(prefix_filtering) {}

  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    // Intentionally left blank. We have no interest in collecting stats for
    // individual key/value pairs.
    return Status::OK();
  }

  virtual void BlockAdd(uint64_t /* block_raw_bytes */,
                        uint64_t /* block_compressed_bytes_fast */,
                        uint64_t /* block_compressed_bytes_slow */) override {
    // Intentionally left blank. No interest in collecting stats for
    // blocks.
    return;
  }

  Status Finish(UserCollectedProperties* properties) override {
    std::string val;
    PutFixed32(&val, static_cast<uint32_t>(index_type_));
    properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
    properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
                        whole_key_filtering_ ? kPropTrue : kPropFalse});
    properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
                        prefix_filtering_ ? kPropTrue : kPropFalse});
    return Status::OK();
  }

  // The name of the properties collector can be used for debugging purposes.
  const char* Name() const override {
    return "BlockBasedTablePropertiesCollector";
  }

  UserCollectedProperties GetReadableProperties() const override {
    // Intentionally left blank.
    return UserCollectedProperties();
  }

 private:
  BlockBasedTableOptions::IndexType index_type_;
  bool whole_key_filtering_;
  bool prefix_filtering_;
};

struct BlockBasedTableBuilder::Rep {
  const ImmutableOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  std::atomic<uint64_t> offset;
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary
  // before compressing any data blocks.
  std::vector<std::string> data_block_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  std::string last_key;
  const Slice* first_key_in_next_block = nullptr;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  std::atomic<uint64_t> compressible_input_data_bytes;
  std::atomic<uint64_t> uncompressible_input_data_bytes;
  std::atomic<uint64_t> sampled_input_data_bytes;
  std::atomic<uint64_t> sampled_output_slow_data_bytes;
  std::atomic<uint64_t> sampled_output_fast_data_bytes;
  CompressionOptions compression_opts;
  std::unique_ptr<CompressionDict> compression_dict;
  std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
  std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
  std::unique_ptr<UncompressionDict> verify_dict;

  size_t data_begin_offset = 0;

  TableProperties props;

  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks
  //   are accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the
  //   `kUnbuffered` state.
  //
  // - `kUnbuffered`: This is the state when the compression dictionary is
  //   finalized, either because it wasn't enabled in the first place or it
  //   has been created from sampling previously buffered data. In this state,
  //   blocks are simply compressed/written out as they fill up. From this
  //   state, call `Finish()` to complete the file (write meta-blocks, etc.),
  //   or `Abandon()` to delete the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
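  //
  // State transitions, in summary:
  //   kBuffered --EnterUnbuffered()--> kUnbuffered
  //   kUnbuffered --Finish()/Abandon()--> kClosed
  // (When no compression dictionary is configured, the builder starts
  // directly in kUnbuffered; see the constructor below.)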
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;
  // `kBuffered` state is allowed only as long as the buffering of
  // uncompressed data blocks (see `data_block_buffers`) does not exceed
  // `buffer_limit`.
  uint64_t buffer_limit;

  const bool use_delta_encoding_for_index_values;
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  size_t compressed_cache_key_prefix_size;

  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  uint32_t column_family_id;
  std::string column_family_name;
  uint64_t creation_time = 0;
  uint64_t oldest_key_time = 0;
  uint64_t file_creation_time = 0;

  // DB IDs
  const std::string db_id;
  const std::string db_session_id;
  std::string db_host_id;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  std::unique_ptr<ParallelCompressionRep> pc_rep;

  uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
  void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }

  bool IsParallelCompressionEnabled() const {
    return compression_opts.parallel_threads > 1;
  }

  Status GetStatus() {
    // We need to make modifications of status visible when status_ok is set
    // to false, and this is ensured by status_mutex, so no special memory
    // order for status_ok is required.
    if (status_ok.load(std::memory_order_relaxed)) {
      return Status::OK();
    } else {
      return CopyStatus();
    }
  }

  Status CopyStatus() {
    std::lock_guard<std::mutex> lock(status_mutex);
    return status;
  }

  IOStatus GetIOStatus() {
    // We need to make modifications of io_status visible when io_status_ok
    // is set to false, and this is ensured by io_status_mutex, so no special
    // memory order for io_status_ok is required.
    if (io_status_ok.load(std::memory_order_relaxed)) {
      return IOStatus::OK();
    } else {
      return CopyIOStatus();
    }
  }

  IOStatus CopyIOStatus() {
    std::lock_guard<std::mutex> lock(io_status_mutex);
    return io_status;
  }

  // Never erase an existing status that is not OK.
  void SetStatus(Status s) {
    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
      // Locking is overkill for the case without parallel compression
      // (compression_opts.parallel_threads == 1), but since it's unlikely
      // that s is not OK, we accept this cost for simplicity.
      std::lock_guard<std::mutex> lock(status_mutex);
      status = s;
      status_ok.store(false, std::memory_order_relaxed);
    }
  }

  // Never erase an existing I/O status that is not OK.
  void SetIOStatus(IOStatus ios) {
    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
      // Locking is overkill for the case without parallel compression
      // (compression_opts.parallel_threads == 1), but since it's unlikely
      // that ios is not OK, we accept this cost for simplicity.
      std::lock_guard<std::mutex> lock(io_status_mutex);
      io_status = ios;
      io_status_ok.store(false, std::memory_order_relaxed);
    }
  }

  Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
      WritableFileWriter* f)
      : ioptions(tbo.ioptions),
        moptions(tbo.moptions),
        table_options(table_opt),
        internal_comparator(tbo.internal_comparator),
        file(f),
        offset(0),
        alignment(table_options.block_align
                      ? std::min(table_options.block_size, kDefaultPageSize)
                      : 0),
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   tbo.internal_comparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
        compression_type(tbo.compression_type),
        sample_for_compression(tbo.moptions.sample_for_compression),
        compressible_input_data_bytes(0),
        uncompressible_input_data_bytes(0),
        sampled_input_data_bytes(0),
        sampled_output_slow_data_bytes(0),
        sampled_output_fast_data_bytes(0),
        compression_opts(tbo.compression_opts),
        compression_dict(),
        compression_ctxs(tbo.compression_opts.parallel_threads),
        verify_ctxs(tbo.compression_opts.parallel_threads),
        verify_dict(),
        state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                        : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        compressed_cache_key_prefix_size(0),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        column_family_id(tbo.column_family_id),
        column_family_name(tbo.column_family_name),
        creation_time(tbo.creation_time),
        oldest_key_time(tbo.oldest_key_time),
        file_creation_time(tbo.file_creation_time),
        db_id(tbo.db_id),
        db_session_id(tbo.db_session_id),
        db_host_id(ioptions.db_host_id),
        status_ok(true),
        io_status_ok(true) {
    if (tbo.target_file_size == 0) {
      buffer_limit = compression_opts.max_dict_buffer_bytes;
    } else if (compression_opts.max_dict_buffer_bytes == 0) {
      buffer_limit = tbo.target_file_size;
    } else {
      buffer_limit = std::min(tbo.target_file_size,
                              compression_opts.max_dict_buffer_bytes);
    }
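    // In effect, buffer_limit = min(target_file_size, max_dict_buffer_bytes),
    // where 0 on either side means "no limit from that source" (and a
    // resulting 0 means unlimited buffering; see the buffer_limit check in
    // Add()).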
    for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
      compression_ctxs[i].reset(new CompressionContext(compression_type));
    }
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
      // Apply optimize_filters_for_hits setting here when applicable by
      // skipping filter generation
      filter_builder.reset();
    } else if (tbo.skip_filters) {
      // For SstFileWriter skip_filters
      filter_builder.reset();
    } else if (!table_options.filter_policy) {
      // Null filter_policy -> no filter
      filter_builder.reset();
    } else {
      FilterBuildingContext filter_context(table_options);

      filter_context.info_log = ioptions.logger;
      filter_context.column_family_name = tbo.column_family_name;
      filter_context.reason = tbo.reason;

      // Only populate other fields if known to be in LSM rather than
      // generating external SST file
      if (tbo.reason != TableFileCreationReason::kMisc) {
        filter_context.compaction_style = ioptions.compaction_style;
        filter_context.num_levels = ioptions.num_levels;
        filter_context.level_at_creation = tbo.level_at_creation;
        filter_context.is_bottommost = tbo.is_bottommost;
        assert(filter_context.level_at_creation < filter_context.num_levels);
      }

      filter_builder.reset(CreateFilterBlockBuilder(
          ioptions, moptions, filter_context,
          use_delta_encoding_for_index_values, p_index_builder_));
    }

    const auto& factory_range = tbo.int_tbl_prop_collector_factories;
    for (auto it = factory_range.first; it != factory_range.second; ++it) {
      assert(*it);

      table_properties_collectors.emplace_back(
          (*it)->CreateIntTblPropCollector(column_family_id));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            moptions.prefix_extractor != nullptr));
    if (table_options.verify_compression) {
      for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
        verify_ctxs[i].reset(new UncompressionContext(compression_type));
      }
    }

    if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) {
      ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
    }
  }

  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

 private:
  // Synchronize status & io_status accesses across threads from main thread,
  // compression thread and write thread in parallel compression.
  std::mutex status_mutex;
  std::atomic<bool> status_ok;
  Status status;
  std::mutex io_status_mutex;
  std::atomic<bool> io_status_ok;
  IOStatus io_status;
};

struct BlockBasedTableBuilder::ParallelCompressionRep {
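  // The parallel compression pipeline, roughly:
  //
  //   Flush() --PrepareBlock()--> compress_queue --BGWorkCompression()-->
  //   per-block slot --BGWorkWriteRawBlock()--> file
  //
  // Block order is preserved because EmitBlock() pushes each block's slot
  // onto write_queue before handing the block to the compressors, and the
  // write thread consumes those slots in FIFO order.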
  // Keys is a wrapper around a vector of strings that avoids releasing the
  // strings' memory during Clear(), in order to save memory allocation
  // overhead across blocks.
  class Keys {
   public:
    Keys() : keys_(kKeysInitSize), size_(0) {}
    void PushBack(const Slice& key) {
      if (size_ == keys_.size()) {
        keys_.emplace_back(key.data(), key.size());
      } else {
        keys_[size_].assign(key.data(), key.size());
      }
      size_++;
    }
    void SwapAssign(std::vector<std::string>& keys) {
      size_ = keys.size();
      std::swap(keys_, keys);
    }
    void Clear() { size_ = 0; }
    size_t Size() { return size_; }
    std::string& Back() { return keys_[size_ - 1]; }
    std::string& operator[](size_t idx) {
      assert(idx < size_);
      return keys_[idx];
    }

   private:
    const size_t kKeysInitSize = 32;
    std::vector<std::string> keys_;
    size_t size_;
  };
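  // Note: PushBack() reuses the already-allocated string slots up to
  // keys_.size() and only appends (allocates) when the logical size catches
  // up with the vector's capacity in use, which is what makes the
  // Clear()-without-release scheme pay off.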
  std::unique_ptr<Keys> curr_block_keys;

  class BlockRepSlot;

  // BlockRep instances are fetched from and recycled to
  // block_rep_pool during parallel compression.
  struct BlockRep {
    Slice contents;
    Slice compressed_contents;
    std::unique_ptr<std::string> data;
    std::unique_ptr<std::string> compressed_data;
    CompressionType compression_type;
    std::unique_ptr<std::string> first_key_in_next_block;
    std::unique_ptr<Keys> keys;
    std::unique_ptr<BlockRepSlot> slot;
    Status status;
  };
  // Use a vector of BlockRep as a buffer for a fixed number
  // of BlockRep structures. All data referenced by pointers in
  // BlockRep will be freed when this vector is destructed.
  typedef std::vector<BlockRep> BlockRepBuffer;
  BlockRepBuffer block_rep_buf;
  // Use a thread-safe queue for concurrent access from block
  // building thread and writer thread.
  typedef WorkQueue<BlockRep*> BlockRepPool;
  BlockRepPool block_rep_pool;

  // Use BlockRepSlot to keep block order in the write thread.
  // slot_ will pass references to BlockRep.
  class BlockRepSlot {
   public:
    BlockRepSlot() : slot_(1) {}
    template <typename T>
    void Fill(T&& rep) {
      slot_.push(std::forward<T>(rep));
    };
    void Take(BlockRep*& rep) { slot_.pop(rep); }

   private:
    // slot_ will pass references to BlockRep in block_rep_buf,
    // and those references are always valid before the destruction of
    // block_rep_buf.
    WorkQueue<BlockRep*> slot_;
  };

  // Compression queue will pass references to BlockRep in block_rep_buf,
  // and those references are always valid before the destruction of
  // block_rep_buf.
  typedef WorkQueue<BlockRep*> CompressQueue;
  CompressQueue compress_queue;
  std::vector<port::Thread> compress_thread_pool;

  // Write queue will pass references to BlockRep::slot in block_rep_buf,
  // and those references are always valid before the corresponding
  // BlockRep::slot is destructed, which is before the destruction of
  // block_rep_buf.
  typedef WorkQueue<BlockRepSlot*> WriteQueue;
  WriteQueue write_queue;
  std::unique_ptr<port::Thread> write_thread;

  // Estimate output file size when parallel compression is enabled. This is
  // necessary because compression & flush are no longer synchronized,
  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
  // memory_order_relaxed suffices because accurate statistics are not
  // required.
  class FileSizeEstimator {
   public:
    explicit FileSizeEstimator()
        : raw_bytes_compressed(0),
          raw_bytes_curr_block(0),
          raw_bytes_curr_block_set(false),
          raw_bytes_inflight(0),
          blocks_inflight(0),
          curr_compression_ratio(0),
          estimated_file_size(0) {}

    // Estimate file size when a block is about to be emitted to
    // a compression thread.
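    // The estimate maintained below is:
    //   curr_file_size
    //   + (raw bytes in flight) * (current compression ratio)
    //   + (blocks in flight) * kBlockTrailerSize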
    void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
      uint64_t new_raw_bytes_inflight =
          raw_bytes_inflight.fetch_add(raw_block_size,
                                       std::memory_order_relaxed) +
          raw_block_size;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;

      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
                  static_cast<double>(new_raw_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);
    }

    // Estimate file size when a block has been reaped from
    // a compression thread.
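    // The compression ratio is updated as a running average weighted by raw
    // bytes:
    //   new_ratio = (old_ratio * raw_bytes_compressed + compressed_block_size)
    //               / (raw_bytes_compressed + raw_bytes_curr_block)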
    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
      assert(raw_bytes_curr_block_set);

      uint64_t new_raw_bytes_compressed =
          raw_bytes_compressed + raw_bytes_curr_block;
      assert(new_raw_bytes_compressed > 0);

      curr_compression_ratio.store(
          (curr_compression_ratio.load(std::memory_order_relaxed) *
               raw_bytes_compressed +
           compressed_block_size) /
              static_cast<double>(new_raw_bytes_compressed),
          std::memory_order_relaxed);
      raw_bytes_compressed = new_raw_bytes_compressed;

      uint64_t new_raw_bytes_inflight =
          raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
                                       std::memory_order_relaxed) -
          raw_bytes_curr_block;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;

      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
                  static_cast<double>(new_raw_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);

      raw_bytes_curr_block_set = false;
    }

    void SetEstimatedFileSize(uint64_t size) {
      estimated_file_size.store(size, std::memory_order_relaxed);
    }

    uint64_t GetEstimatedFileSize() {
      return estimated_file_size.load(std::memory_order_relaxed);
    }

    void SetCurrBlockRawSize(uint64_t size) {
      raw_bytes_curr_block = size;
      raw_bytes_curr_block_set = true;
    }

   private:
    // Raw bytes compressed so far.
    uint64_t raw_bytes_compressed;
    // Size of the current block being appended.
    uint64_t raw_bytes_curr_block;
    // Whether raw_bytes_curr_block has been set for the next
    // ReapBlock call.
    bool raw_bytes_curr_block_set;
    // Raw bytes under compression and not appended yet.
    std::atomic<uint64_t> raw_bytes_inflight;
    // Number of blocks under compression and not appended yet.
    std::atomic<uint64_t> blocks_inflight;
    // Current compression ratio, maintained by BGWorkWriteRawBlock.
    std::atomic<double> curr_compression_ratio;
    // Estimated SST file size.
    std::atomic<uint64_t> estimated_file_size;
  };
  FileSizeEstimator file_size_estimator;

  // Facilities for waiting on the completion of the first block. We need to
  // wait for the first block to be compressed and flushed in order to get a
  // non-zero compression ratio for the estimates above.
  std::atomic<bool> first_block_processed;
  std::condition_variable first_block_cond;
  std::mutex first_block_mutex;

  explicit ParallelCompressionRep(uint32_t parallel_threads)
      : curr_block_keys(new Keys()),
        block_rep_buf(parallel_threads),
        block_rep_pool(parallel_threads),
        compress_queue(parallel_threads),
        write_queue(parallel_threads),
        first_block_processed(false) {
    for (uint32_t i = 0; i < parallel_threads; i++) {
      block_rep_buf[i].contents = Slice();
      block_rep_buf[i].compressed_contents = Slice();
      block_rep_buf[i].data.reset(new std::string());
      block_rep_buf[i].compressed_data.reset(new std::string());
      block_rep_buf[i].compression_type = CompressionType();
      block_rep_buf[i].first_key_in_next_block.reset(new std::string());
      block_rep_buf[i].keys.reset(new Keys());
      block_rep_buf[i].slot.reset(new BlockRepSlot());
      block_rep_buf[i].status = Status::OK();
      block_rep_pool.push(&block_rep_buf[i]);
    }
  }

  ~ParallelCompressionRep() { block_rep_pool.finish(); }

  // Prepare a block to be emitted to a compression thread.
  // Used in non-buffered mode.
  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         BlockBuilder* data_block) {
    BlockRep* block_rep =
        PrepareBlockInternal(compression_type, first_key_in_next_block);
    assert(block_rep != nullptr);
    data_block->SwapAndReset(*(block_rep->data));
    block_rep->contents = *(block_rep->data);
    std::swap(block_rep->keys, curr_block_keys);
    curr_block_keys->Clear();
    return block_rep;
  }

  // Used in EnterUnbuffered
  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         std::string* data_block,
                         std::vector<std::string>* keys) {
    BlockRep* block_rep =
        PrepareBlockInternal(compression_type, first_key_in_next_block);
    assert(block_rep != nullptr);
    std::swap(*(block_rep->data), *data_block);
    block_rep->contents = *(block_rep->data);
    block_rep->keys->SwapAssign(*keys);
    return block_rep;
  }

  // Emit a block to a compression thread
  void EmitBlock(BlockRep* block_rep) {
    assert(block_rep != nullptr);
    assert(block_rep->status.ok());
    if (!write_queue.push(block_rep->slot.get())) {
      return;
    }
    if (!compress_queue.push(block_rep)) {
      return;
    }

    if (!first_block_processed.load(std::memory_order_relaxed)) {
      std::unique_lock<std::mutex> lock(first_block_mutex);
      first_block_cond.wait(lock, [this] {
        return first_block_processed.load(std::memory_order_relaxed);
      });
    }
  }

  // Reap a block from a compression thread
  void ReapBlock(BlockRep* block_rep) {
    assert(block_rep != nullptr);
    block_rep->compressed_data->clear();
    block_rep_pool.push(block_rep);

    if (!first_block_processed.load(std::memory_order_relaxed)) {
      std::lock_guard<std::mutex> lock(first_block_mutex);
      first_block_processed.store(true, std::memory_order_relaxed);
      first_block_cond.notify_one();
    }
  }

 private:
  BlockRep* PrepareBlockInternal(CompressionType compression_type,
                                 const Slice* first_key_in_next_block) {
    BlockRep* block_rep = nullptr;
    block_rep_pool.pop(block_rep);
    assert(block_rep != nullptr);

    assert(block_rep->data);

    block_rep->compression_type = compression_type;

    if (first_key_in_next_block == nullptr) {
      block_rep->first_key_in_next_block.reset(nullptr);
    } else {
      block_rep->first_key_in_next_block->assign(
          first_key_in_next_block->data(), first_key_in_next_block->size());
    }

    return block_rep;
  }
};

BlockBasedTableBuilder::BlockBasedTableBuilder(
    const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
    WritableFileWriter* file) {
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    ROCKS_LOG_WARN(
        tbo.ioptions.logger,
        "Silently converting format_version to 1 because checksum is "
        "non-default");
    // silently convert format_version to 1 to keep consistent with current
    // behavior
    sanitized_table_options.format_version = 1;
  }

  rep_ = new Rep(sanitized_table_options, tbo, file);

  if (rep_->filter_builder != nullptr) {
    rep_->filter_builder->StartBlock(0);
  }
  if (table_options.block_cache_compressed.get() != nullptr) {
    BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
        table_options.block_cache_compressed.get(), file->writable_file(),
        &rep_->compressed_cache_key_prefix[0],
        &rep_->compressed_cache_key_prefix_size);
  }

  if (rep_->IsParallelCompressionEnabled()) {
    StartParallelCompression();
  }
}

BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  // Catch errors where caller forgot to call Finish()
  assert(rep_->state == Rep::State::kClosed);
  delete rep_;
}

void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // !NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      r->first_key_in_next_block = &key;
      Flush();

      if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
          r->data_begin_offset > r->buffer_limit) {
        EnterUnbuffered();
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block. This allows us to use shorter
      // keys in the index block. For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who". We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        if (r->IsParallelCompressionEnabled()) {
          r->pc_rep->curr_block_keys->Clear();
        } else {
          r->index_builder->AddIndexEntry(&r->last_key, &key,
                                          r->pending_handle);
        }
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered) {
      if (r->IsParallelCompressionEnabled()) {
        r->pc_rep->curr_block_keys->PushBack(key);
      } else {
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
      }
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffered keys will be replayed from data_block_buffers during
      // `Finish()` once the compression dictionary has been finalized.
    } else {
      if (!r->IsParallelCompressionEnabled()) {
        r->index_builder->OnKeyAdded(key);
      }
    }
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else {
    assert(false);
  }

  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}

void BlockBasedTableBuilder::Flush() {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  if (r->IsParallelCompressionEnabled() &&
      r->state == Rep::State::kUnbuffered) {
    r->data_block.Finish();
    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
        r->compression_type, r->first_key_in_next_block, &(r->data_block));
    assert(block_rep != nullptr);
    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                             r->get_offset());
    r->pc_rep->EmitBlock(block_rep);
  } else {
    WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
  }
}

void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  block->Finish();
  std::string raw_block_contents;
  block->SwapAndReset(raw_block_contents);
  if (rep_->state == Rep::State::kBuffered) {
    assert(is_data_block);
    rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
    return;
  }
  WriteBlock(raw_block_contents, handle, is_data_block);
}

void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  Rep* r = rep_;
  assert(r->state == Rep::State::kUnbuffered);
  Slice block_contents;
  CompressionType type;
  Status compress_status;
  CompressAndVerifyBlock(raw_block_contents, is_data_block,
                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                         &(r->compressed_output), &(block_contents), &type,
                         &compress_status);
  r->SetStatus(compress_status);
  if (!ok()) {
    return;
  }
  WriteRawBlock(block_contents, type, handle, is_data_block);
  r->compressed_output.clear();
  if (is_data_block) {
    if (r->filter_builder != nullptr) {
      r->filter_builder->StartBlock(r->get_offset());
    }
    r->props.data_size = r->get_offset();
    ++r->props.num_data_blocks;
  }
}

void BlockBasedTableBuilder::BGWorkCompression(
    const CompressionContext& compression_ctx,
    UncompressionContext* verify_ctx) {
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
  while (rep_->pc_rep->compress_queue.pop(block_rep)) {
    assert(block_rep != nullptr);
    CompressAndVerifyBlock(block_rep->contents, true /* is_data_block */,
                           compression_ctx, verify_ctx,
                           block_rep->compressed_data.get(),
                           &block_rep->compressed_contents,
                           &(block_rep->compression_type), &block_rep->status);
    block_rep->slot->Fill(block_rep);
  }
}

void BlockBasedTableBuilder::CompressAndVerifyBlock(
    const Slice& raw_block_contents, bool is_data_block,
    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
    std::string* compressed_output, Slice* block_contents,
    CompressionType* type, Status* out_status) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  Rep* r = rep_;
  bool is_status_ok = ok();
  if (!r->IsParallelCompressionEnabled()) {
    assert(is_status_ok);
  }

  *type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  bool abort_compression = false;

  StopWatchNano timer(
      r->ioptions.clock,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
    if (is_data_block) {
      r->compressible_input_data_bytes.fetch_add(raw_block_contents.size(),
                                                 std::memory_order_relaxed);
    }
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, compression_ctx,
                                     *compression_dict, *type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    *block_contents = CompressBlock(
        raw_block_contents, compression_info, type,
        r->table_options.format_version, is_data_block /* do_sample */,
        compressed_output, &sampled_output_fast, &sampled_output_slow);

    if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
      // Currently compression sampling is only enabled for data blocks.
      assert(is_data_block);
      r->sampled_input_data_bytes.fetch_add(raw_block_contents.size(),
                                            std::memory_order_relaxed);
      r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
                                                  std::memory_order_relaxed);
      r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
                                                  std::memory_order_relaxed);
    }
    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
        r->table_properties_collectors, raw_block_contents.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
    // the verify_compression flag is set then try to de-compress the
    // compressed data and compare to the input.
    if (*type != kNoCompression && r->table_options.verify_compression) {
      // Retrieve the uncompressed contents into a new buffer
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockContentsForCompressionType(
          uncompression_info, block_contents->data(), block_contents->size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
          ROCKS_LOG_ERROR(r->ioptions.logger,
                          "Decompressed block did not match raw block");
          *out_status =
              Status::Corruption("Decompressed block did not match raw block");
        }
      } else {
        // Decompression reported an error. abort.
        *out_status = Status::Corruption(
            std::string("Could not decompress: ") + stat.getState());
        abort_compression = true;
      }
    }
  } else {
    // Block is too big to be compressed.
    if (is_data_block) {
      r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
                                                   std::memory_order_relaxed);
    }
    abort_compression = true;
  }
  if (is_data_block) {
    r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
                                                 std::memory_order_relaxed);
  }

  // Abort compression if the block is too big, or did not pass
  // verification.
  if (abort_compression) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
    *type = kNoCompression;
    *block_contents = raw_block_contents;
  } else if (*type != kNoCompression) {
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
      RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
                      raw_block_contents.size());
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
  } else if (*type != r->compression_type) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
  }
}

void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                           CompressionType type,
                                           BlockHandle* handle,
                                           bool is_data_block) {
  Rep* r = rep_;
  Status s = Status::OK();
  IOStatus io_s = IOStatus::OK();
  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->get_offset());
  handle->set_size(block_contents.size());
  assert(status().ok());
  assert(io_status().ok());
  io_s = r->file->Append(block_contents);
  if (io_s.ok()) {
    char trailer[kBlockTrailerSize];
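    // Trailer layout (kBlockTrailerSize bytes total):
    //   trailer[0]    compression type
    //   trailer[1..4] checksum, encoded as a fixed32 below; the checksum
    //                 covers the block contents plus the type byte.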
    trailer[0] = type;
    uint32_t checksum = 0;
    switch (r->table_options.checksum) {
      case kNoChecksum:
        break;
      case kCRC32c: {
        uint32_t crc =
            crc32c::Value(block_contents.data(), block_contents.size());
        // Extend to cover compression type
        crc = crc32c::Extend(crc, trailer, 1);
        checksum = crc32c::Mask(crc);
        break;
      }
      case kxxHash: {
        XXH32_state_t* const state = XXH32_createState();
        XXH32_reset(state, 0);
        XXH32_update(state, block_contents.data(), block_contents.size());
        // Extend to cover compression type
        XXH32_update(state, trailer, 1);
        checksum = XXH32_digest(state);
        XXH32_freeState(state);
        break;
      }
      case kxxHash64: {
        XXH64_state_t* const state = XXH64_createState();
        XXH64_reset(state, 0);
        XXH64_update(state, block_contents.data(), block_contents.size());
        // Extend to cover compression type
        XXH64_update(state, trailer, 1);
        checksum = Lower32of64(XXH64_digest(state));
        XXH64_freeState(state);
        break;
      }
      default:
        assert(false);
        break;
    }
    EncodeFixed32(trailer + 1, checksum);
    assert(io_s.ok());
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
        static_cast<char*>(trailer));
    io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (io_s.ok()) {
      assert(s.ok());
      s = InsertBlockInCache(block_contents, type, handle);
      if (!s.ok()) {
        r->SetStatus(s);
      }
    } else {
      r->SetIOStatus(io_s);
    }
    if (s.ok() && io_s.ok()) {
      r->set_offset(r->get_offset() + block_contents.size() +
                    kBlockTrailerSize);
      if (r->table_options.block_align && is_data_block) {
        size_t pad_bytes =
            (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
                             (r->alignment - 1))) &
            (r->alignment - 1);
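        // Since alignment is a power of two, the bitmask form above is
        // equivalent to (alignment - total_size % alignment) % alignment.
        // Example (assuming a 4096-byte alignment): a 1000-byte block plus
        // the 5-byte trailer yields pad_bytes = 4096 - 1005 = 3091, so the
        // next block starts on an alignment boundary.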
1263 io_s = r->file->Pad(pad_bytes);
1264 if (io_s.ok()) {
1265 r->set_offset(r->get_offset() + pad_bytes);
1266 } else {
1267 r->SetIOStatus(io_s);
1268 }
1269 }
1270 if (r->IsParallelCompressionEnabled()) {
1271 if (is_data_block) {
1272 r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
1273 r->get_offset());
1274 } else {
1275 r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
1276 }
1277 }
1278 }
1279 } else {
1280 r->SetIOStatus(io_s);
1281 }
1282 if (!io_s.ok() && s.ok()) {
1283 r->SetStatus(io_s);
1284 }
1285 }
1286
BGWorkWriteRawBlock()1287 void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
1288 Rep* r = rep_;
1289 ParallelCompressionRep::BlockRepSlot* slot = nullptr;
1290 ParallelCompressionRep::BlockRep* block_rep = nullptr;
1291 while (r->pc_rep->write_queue.pop(slot)) {
1292 assert(slot != nullptr);
1293 slot->Take(block_rep);
1294 assert(block_rep != nullptr);
1295 if (!block_rep->status.ok()) {
1296 r->SetStatus(block_rep->status);
1297 // Reap block so that blocked Flush() can finish
1298 // if there is one, and Flush() will notice !ok() next time.
1299 block_rep->status = Status::OK();
1300 r->pc_rep->ReapBlock(block_rep);
1301 continue;
1302 }
1303
1304 for (size_t i = 0; i < block_rep->keys->Size(); i++) {
1305 auto& key = (*block_rep->keys)[i];
1306 if (r->filter_builder != nullptr) {
1307 size_t ts_sz =
1308 r->internal_comparator.user_comparator()->timestamp_size();
1309 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1310 }
1311 r->index_builder->OnKeyAdded(key);
1312 }
1313
1314 r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
1315 WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
1316 &r->pending_handle, true /* is_data_block*/);
1317 if (!ok()) {
1318 break;
1319 }
1320
1321 if (r->filter_builder != nullptr) {
1322 r->filter_builder->StartBlock(r->get_offset());
1323 }
1324 r->props.data_size = r->get_offset();
1325 ++r->props.num_data_blocks;
1326
1327 if (block_rep->first_key_in_next_block == nullptr) {
1328 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
1329 r->pending_handle);
1330 } else {
1331 Slice first_key_in_next_block =
1332 Slice(*block_rep->first_key_in_next_block);
1333 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
1334 &first_key_in_next_block,
1335 r->pending_handle);
1336 }
1337
1338 r->pc_rep->ReapBlock(block_rep);
1339 }
1340 }
1341
StartParallelCompression()1342 void BlockBasedTableBuilder::StartParallelCompression() {
1343 rep_->pc_rep.reset(
1344 new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
1345 rep_->pc_rep->compress_thread_pool.reserve(
1346 rep_->compression_opts.parallel_threads);
1347 for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
1348 rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
1349 BGWorkCompression(*(rep_->compression_ctxs[i]),
1350 rep_->verify_ctxs[i].get());
1351 });
1352 }
1353 rep_->pc_rep->write_thread.reset(
1354 new port::Thread([this] { BGWorkWriteRawBlock(); }));
1355 }
1356
StopParallelCompression()1357 void BlockBasedTableBuilder::StopParallelCompression() {
1358 rep_->pc_rep->compress_queue.finish();
1359 for (auto& thread : rep_->pc_rep->compress_thread_pool) {
1360 thread.join();
1361 }
1362 rep_->pc_rep->write_queue.finish();
1363 rep_->pc_rep->write_thread->join();
1364 }
1365
status() const1366 Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
1367
io_status() const1368 IOStatus BlockBasedTableBuilder::io_status() const {
1369 return rep_->GetIOStatus();
1370 }
1371
DeleteCachedBlockContents(const Slice &,void * value)1372 static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
1373 BlockContents* bc = reinterpret_cast<BlockContents*>(value);
1374 delete bc;
1375 }
1376
1377 //
1378 // Make a copy of the block contents and insert into compressed block cache
1379 //
InsertBlockInCache(const Slice & block_contents,const CompressionType type,const BlockHandle * handle)1380 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
1381 const CompressionType type,
1382 const BlockHandle* handle) {
1383 Rep* r = rep_;
1384 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
1385
1386 if (type != kNoCompression && block_cache_compressed != nullptr) {
1387 size_t size = block_contents.size();
1388
1389 auto ubuf =
1390 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
1391 memcpy(ubuf.get(), block_contents.data(), size);
1392 ubuf[size] = type;
1393
1394 BlockContents* block_contents_to_cache =
1395 new BlockContents(std::move(ubuf), size);
1396 #ifndef NDEBUG
1397 block_contents_to_cache->is_raw_block = true;
1398 #endif // NDEBUG
1399
1400 // make cache key by appending the file offset to the cache prefix id
1401 char* end = EncodeVarint64(
1402 r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
1403 handle->offset());
1404 Slice key(r->compressed_cache_key_prefix,
1405 static_cast<size_t>(end - r->compressed_cache_key_prefix));
1406
1407 // Insert into compressed block cache.
1408 // How should we deal with compressed cache full?
1409 block_cache_compressed
1410 ->Insert(key, block_contents_to_cache,
1411 block_contents_to_cache->ApproximateMemoryUsage(),
1412 &DeleteCachedBlockContents)
1413 .PermitUncheckedError();
1414
1415 // Invalidate OS cache.
1416 r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
1417 .PermitUncheckedError();
1418 }
1419 return Status::OK();
1420 }
1421
WriteFilterBlock(MetaIndexBuilder * meta_index_builder)1422 void BlockBasedTableBuilder::WriteFilterBlock(
1423 MetaIndexBuilder* meta_index_builder) {
1424 BlockHandle filter_block_handle;
1425 bool empty_filter_block =
1426 (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty());
1427 if (ok() && !empty_filter_block) {
1428 rep_->props.num_filter_entries +=
1429 rep_->filter_builder->EstimateEntriesAdded();
1430 Status s = Status::Incomplete();
1431 while (ok() && s.IsIncomplete()) {
1432 Slice filter_content =
1433 rep_->filter_builder->Finish(filter_block_handle, &s);
1434 assert(s.ok() || s.IsIncomplete());
1435 rep_->props.filter_size += filter_content.size();
1436 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
1437 }
1438 }
1439 if (ok() && !empty_filter_block) {
1440 // Add mapping from "<filter_block_prefix>.Name" to location
1441 // of filter data.
1442 std::string key;
1443 if (rep_->filter_builder->IsBlockBased()) {
1444 key = BlockBasedTable::kFilterBlockPrefix;
1445 } else {
1446 key = rep_->table_options.partition_filters
1447 ? BlockBasedTable::kPartitionedFilterBlockPrefix
1448 : BlockBasedTable::kFullFilterBlockPrefix;
1449 }
1450 key.append(rep_->table_options.filter_policy->Name());
1451 meta_index_builder->Add(key, filter_block_handle);
1452 }
1453 }
1454
void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (index_builder_status.IsIncomplete()) {
    // If we have more than one index partition, then meta_blocks are not
    // supported for the index. Currently meta_blocks are used only by
    // HashIndexBuilder, which is not multi-partition.
    assert(index_blocks.meta_blocks.empty());
  } else if (ok() && !index_builder_status.ok()) {
    rep_->SetStatus(index_builder_status);
  }
  if (ok()) {
    for (const auto& item : index_blocks.meta_blocks) {
      BlockHandle block_handle;
      WriteBlock(item.second, &block_handle, false /* is_data_block */);
      if (!ok()) {
        break;
      }
      meta_index_builder->Add(item.first, block_handle);
    }
  }
  if (ok()) {
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
  }
  // If there are more index partitions, finish them and write them out
  if (index_builder_status.IsIncomplete()) {
    Status s = Status::Incomplete();
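    // Each Finish() call below emits one remaining index partition; the
    // final call returns OK and (per the comment at the end of the loop) its
    // contents are the top-level partition index block, whose handle is left
    // in *index_block_handle for the footer.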
    while (ok() && s.IsIncomplete()) {
      s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
      if (!s.ok() && !s.IsIncomplete()) {
        rep_->SetStatus(s);
        return;
      }
      if (rep_->table_options.enable_index_compression) {
        WriteBlock(index_blocks.index_block_contents, index_block_handle,
                   false);
      } else {
        WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                      index_block_handle);
      }
      // The last index_block_handle will be for the partition index block
    }
  }
}

void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (ok()) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.column_family_id = rep_->column_family_id;
    rep_->props.column_family_name = rep_->column_family_name;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.compression_name =
        CompressionTypeToString(rep_->compression_type);
    rep_->props.compression_options =
        CompressionOptionsToString(rep_->compression_opts);
    rep_->props.prefix_extractor_name =
        rep_->moptions.prefix_extractor != nullptr
            ? rep_->moptions.prefix_extractor->Name()
            : "nullptr";

    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;
    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->seperator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
    rep_->props.creation_time = rep_->creation_time;
    rep_->props.oldest_key_time = rep_->oldest_key_time;
    rep_->props.file_creation_time = rep_->file_creation_time;
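    // The estimates below scale the sampled compression ratio up to the full
    // compressible input and add the uncompressible input unchanged:
    //   estimate = sampled_output / sampled_input * compressible_input
    //              + uncompressible_input   (the +0.5 rounds to nearest)
    // For instance, if sampling compressed 1 MB of input down to 0.4 MB and
    // the file saw 100 MB compressible plus 5 MB uncompressible input, the
    // estimate is 0.4 * 100 MB + 5 MB = 45 MB. (Illustrative numbers only.)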
    if (rep_->sampled_input_data_bytes > 0) {
      rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_slow_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
      rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_fast_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
    } else if (rep_->sample_for_compression > 0) {
      // We tried to sample, but no samples were found. Assume the worst case
      // (compression ratio 1.0) so the data is complete and aggregatable.
      rep_->props.slow_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
      rep_->props.fast_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
    }
    rep_->props.db_id = rep_->db_id;
    rep_->props.db_session_id = rep_->db_session_id;
    rep_->props.db_host_id = rep_->db_host_id;

    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user collected properties
    NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
                                         rep_->ioptions.logger,
                                         &property_block_builder);

    WriteRawBlock(property_block_builder.Finish(), kNoCompression,
                  &properties_block_handle);
  }
  if (ok()) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG
    meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
  }
}

void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (rep_->compression_dict != nullptr &&
      rep_->compression_dict->GetRawDict().size()) {
    BlockHandle compression_dict_block_handle;
    if (ok()) {
      WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
                    &compression_dict_block_handle);
#ifndef NDEBUG
      Slice compression_dict = rep_->compression_dict->GetRawDict();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
          &compression_dict);
#endif  // NDEBUG
    }
    if (ok()) {
      meta_index_builder->Add(kCompressionDictBlock,
                              compression_dict_block_handle);
    }
  }
}

void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (ok() && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
    WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
                  &range_del_block_handle);
    meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
  }
}

void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  Rep* r = rep_;
  // No need to write out a new footer if we're using the default checksum.
  // We write the legacy magic number so that old versions of RocksDB can
  // read files generated with a new release (just in case somebody wants to
  // roll back after an upgrade).
  // TODO(icanadi) at some point in the future, when we're absolutely sure
  // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
  // number and always write new table files with the new magic number.
  bool legacy = (r->table_options.format_version == 0);
  // This is guaranteed by BlockBasedTableBuilder's constructor.
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
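  // The footer records (via the setters below) the metaindex and index block
  // handles and the checksum type, alongside the magic number and format
  // version chosen above; readers bootstrap the rest of the file from it.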
  Footer footer(
      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
      r->table_options.format_version);
  footer.set_metaindex_handle(metaindex_block_handle);
  footer.set_index_handle(index_block_handle);
  footer.set_checksum(r->table_options.checksum);
  std::string footer_encoding;
  footer.EncodeTo(&footer_encoding);
  assert(ok());
  IOStatus ios = r->file->Append(footer_encoding);
  if (ios.ok()) {
    r->set_offset(r->get_offset() + footer_encoding.size());
  } else {
    r->SetIOStatus(ios);
    r->SetStatus(ios);
  }
}

void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                  ? r->compression_opts.zstd_max_train_bytes
                                  : r->compression_opts.max_dict_bytes;
  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
  if (kNumBlocksBuffered == 0) {
    // The below code is neither safe nor necessary for handling zero data
    // blocks.
    return;
  }

  // Abstract algebra teaches us that a finite cyclic group (such as the
  // additive group of integers modulo N) is generated by any element coprime
  // with N. Since N is variable (the number of buffered data blocks), we
  // pick a prime number to guarantee coprimeness with any N.
  //
  // One downside of this approach is that the spread will be poor when
  // `kPrimeGeneratorRemainder` is close to zero or close to
  // `kNumBlocksBuffered`.
  //
  // Picked a random number between one and one trillion and then chose the
  // next prime number greater than or equal to it.
  const uint64_t kPrimeGenerator = 545055921143ull;
  // Can avoid repeated division by just adding the remainder repeatedly.
  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
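  // Worked example: with kNumBlocksBuffered == 10, kPrimeGeneratorRemainder
  // is 545055921143 % 10 == 3 and kInitSampleIdx is 5, so sampling visits
  // blocks 5, 8, 1, 4, 7, 0, 3, 6, 9, 2 in that order, touching each block
  // exactly once, since gcd(3, 10) == 1.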

  std::string compression_dict_samples;
  std::vector<size_t> compression_dict_sample_lens;
  size_t buffer_idx = kInitSampleIdx;
  for (size_t i = 0;
       i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
       ++i) {
    size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
                               r->data_block_buffers[buffer_idx].size());
    compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
                                    copy_len);
    compression_dict_sample_lens.emplace_back(copy_len);

    buffer_idx += kPrimeGeneratorRemainder;
    if (buffer_idx >= kNumBlocksBuffered) {
      buffer_idx -= kNumBlocksBuffered;
    }
  }

  // The final data block has been flushed, so now we can generate a
  // dictionary from the samples. It's OK if compression_dict_samples is
  // empty; we'll just get an empty dictionary.
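  // With zstd training enabled, the trainer condenses the samples into a
  // dictionary of at most max_dict_bytes; otherwise the concatenated raw
  // samples serve as the dictionary directly.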
  std::string dict;
  if (r->compression_opts.zstd_max_train_bytes > 0) {
    dict = ZSTD_TrainDictionary(compression_dict_samples,
                                compression_dict_sample_lens,
                                r->compression_opts.max_dict_bytes);
  } else {
    dict = std::move(compression_dict_samples);
  }
  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
                                                r->compression_opts.level));
  r->verify_dict.reset(new UncompressionDict(
      dict, r->compression_type == kZSTD ||
                r->compression_type == kZSTDNotFinalCompression));

  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());

    Block reader{BlockContents{data_block}};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);

    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };

  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;

  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    }

    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
    }

    auto& data_block = r->data_block_buffers[i];

    if (r->IsParallelCompressionEnabled()) {
      Slice first_key_in_next_block;
      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      if (i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        first_key_in_next_block = next_block_iter->key();
      } else {
        first_key_in_next_block_ptr = r->first_key_in_next_block;
      }

      std::vector<std::string> keys;
      for (; iter->Valid(); iter->Next()) {
        keys.emplace_back(iter->key().ToString());
      }

      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);

      assert(block_rep != nullptr);
      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                               r->get_offset());
      r->pc_rep->EmitBlock(block_rep);
    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
        r->index_builder->OnKeyAdded(key);
      }
      WriteBlock(Slice(data_block), &r->pending_handle,
                 true /* is_data_block */);
      if (ok() && i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        Slice first_key_in_next_block = next_block_iter->key();

        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;

        iter->SeekToLast();
        std::string last_key = iter->key().ToString();
        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
                                        r->pending_handle);
      }
    }

    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
}

Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
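  // Capture whether a data block is still pending before Flush(), which
  // writes out and empties r->data_block; the flag decides below whether a
  // final index entry is still needed.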
  r->first_key_in_next_block = nullptr;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  if (r->IsParallelCompressionEnabled()) {
    StopParallelCompression();
#ifndef NDEBUG
    for (const auto& br : r->pc_rep->block_rep_buf) {
      assert(br.status.ok());
    }
#endif  // !NDEBUG
  } else {
    // To make sure the properties block records the accurate size of the
    // index block, we finish writing all index entries first.
    if (ok() && !empty_data_block) {
      r->index_builder->AddIndexEntry(
          &r->last_key, nullptr /* no next data block */, r->pending_handle);
    }
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->SetStatus(r->CopyIOStatus());
  Status ret_status = r->CopyStatus();
  assert(!ret_status.ok() || io_status().ok());
  return ret_status;
}

void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  if (rep_->IsParallelCompressionEnabled()) {
    StopParallelCompression();
  }
  rep_->state = Rep::State::kClosed;
  rep_->CopyStatus().PermitUncheckedError();
  rep_->CopyIOStatus().PermitUncheckedError();
}

uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}

bool BlockBasedTableBuilder::IsEmpty() const {
  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
}

uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  if (rep_->IsParallelCompressionEnabled()) {
    // Use the compression ratio so far and the in-flight raw bytes to
    // estimate the final SST size.
    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
  } else {
    return FileSize();
  }
}

bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}

TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  TableProperties ret = rep_->props;
  for (const auto& collector : rep_->table_properties_collectors) {
    for (const auto& prop : collector->GetReadableProperties()) {
      ret.readable_properties.insert(prop);
    }
    collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
  }
  return ret;
}

std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksumFuncName();
  } else {
    return kUnknownFileChecksumFuncName;
  }
}

const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";
}  // namespace ROCKSDB_NAMESPACE