1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/block_based/block_based_table_builder.h"
11 
12 #include <assert.h>
13 #include <stdio.h>
14 
15 #include <atomic>
16 #include <list>
17 #include <map>
18 #include <memory>
19 #include <numeric>
20 #include <string>
21 #include <unordered_map>
22 #include <utility>
23 
24 #include "db/dbformat.h"
25 #include "index_builder.h"
26 #include "memory/memory_allocator.h"
27 #include "rocksdb/cache.h"
28 #include "rocksdb/comparator.h"
29 #include "rocksdb/env.h"
30 #include "rocksdb/filter_policy.h"
31 #include "rocksdb/flush_block_policy.h"
32 #include "rocksdb/merge_operator.h"
33 #include "rocksdb/table.h"
34 #include "table/block_based/block.h"
35 #include "table/block_based/block_based_filter_block.h"
36 #include "table/block_based/block_based_table_factory.h"
37 #include "table/block_based/block_based_table_reader.h"
38 #include "table/block_based/block_builder.h"
39 #include "table/block_based/filter_block.h"
40 #include "table/block_based/filter_policy_internal.h"
41 #include "table/block_based/full_filter_block.h"
42 #include "table/block_based/partitioned_filter_block.h"
43 #include "table/format.h"
44 #include "table/table_builder.h"
45 #include "util/coding.h"
46 #include "util/compression.h"
47 #include "util/crc32c.h"
48 #include "util/stop_watch.h"
49 #include "util/string_util.h"
50 #include "util/work_queue.h"
51 #include "util/xxhash.h"
52 
53 namespace ROCKSDB_NAMESPACE {
54 
55 extern const std::string kHashIndexPrefixesBlock;
56 extern const std::string kHashIndexPrefixesMetadataBlock;
57 
58 
59 // Without an anonymous namespace here, we would trigger the -Wmissing-prototypes warning.
60 namespace {
61 
62 // Create a filter block builder based on its type.
63 FilterBlockBuilder* CreateFilterBlockBuilder(
64     const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
65     const FilterBuildingContext& context,
66     const bool use_delta_encoding_for_index_values,
67     PartitionedIndexBuilder* const p_index_builder) {
68   const BlockBasedTableOptions& table_opt = context.table_options;
69   assert(table_opt.filter_policy);  // precondition
70 
71   FilterBitsBuilder* filter_bits_builder =
72       BloomFilterPolicy::GetBuilderFromContext(context);
73   if (filter_bits_builder == nullptr) {
74     return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
75                                             table_opt);
76   } else {
77     if (table_opt.partition_filters) {
78       assert(p_index_builder != nullptr);
79       // Since, after a partition-cut request from the filter builder, it
80       // takes time until the index builder actually cuts the partition
81       // (potentially not until the end of a data block with many keys), we
82       // take the lower bound as the partition size.
83       assert(table_opt.block_size_deviation <= 100);
84       auto partition_size =
85           static_cast<uint32_t>(((table_opt.metadata_block_size *
86                                   (100 - table_opt.block_size_deviation)) +
87                                  99) /
88                                 100);
89       partition_size = std::max(partition_size, static_cast<uint32_t>(1));
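      // Illustrative example (assumed option values, not taken from a real
      // run): with metadata_block_size = 4096 and block_size_deviation = 10,
      // the computation above is ceil(4096 * 90 / 100):
      //   (4096 * 90 + 99) / 100 = 368739 / 100 = 3687
      // so filter partitions are cut at roughly 3687 bytes rather than 4096.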
90       return new PartitionedFilterBlockBuilder(
91           mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
92           filter_bits_builder, table_opt.index_block_restart_interval,
93           use_delta_encoding_for_index_values, p_index_builder, partition_size);
94     } else {
95       return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
96                                         table_opt.whole_key_filtering,
97                                         filter_bits_builder);
98     }
99   }
100 }
101 
102 bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
103   // Check whether compression saved at least 12.5% of the raw size.
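  // For example (illustrative sizes): a 4096-byte raw block passes this check
  // only if its compressed form is at most 3583 bytes, since
  // 4096 - 4096 / 8 = 3584 and the comparison below is strict.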
104   return compressed_size < raw_size - (raw_size / 8u);
105 }
106 
107 }  // namespace
108 
109 // format_version is the block format as defined in include/rocksdb/table.h
110 Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
111                     CompressionType* type, uint32_t format_version,
112                     bool do_sample, std::string* compressed_output,
113                     std::string* sampled_output_fast,
114                     std::string* sampled_output_slow) {
115   assert(type);
116   assert(compressed_output);
117   assert(compressed_output->empty());
118 
119   // If requested, we sample one in every N blocks with both a fast and a
120   // slow compression algorithm and report the stats. Users can use these
121   // stats to decide whether it is worthwhile enabling compression, and
122   // they also get a hint about which compression algorithm will be
123   // beneficial.
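  // For example (illustrative value): with SampleForCompression() == 10,
  // roughly one in every ten data blocks is additionally compressed with a
  // fast algorithm (LZ4 or Snappy) and a slow one (ZSTD or Zlib), and the
  // resulting output sizes feed the sampled_* byte counters in Rep.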
124   if (do_sample && info.SampleForCompression() &&
125       Random::GetTLSInstance()->OneIn(
126           static_cast<int>(info.SampleForCompression()))) {
127     // Sampling with a fast compression algorithm
128     if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
129       CompressionType c =
130           LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
131       CompressionContext context(c);
132       CompressionOptions options;
133       CompressionInfo info_tmp(options, context,
134                                CompressionDict::GetEmptyDict(), c,
135                                info.SampleForCompression());
136 
137       CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
138                    sampled_output_fast);
139     }
140 
141     // Sampling with a slow but high-compression algorithm
142     if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
143       CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
144       CompressionContext context(c);
145       CompressionOptions options;
146       CompressionInfo info_tmp(options, context,
147                                CompressionDict::GetEmptyDict(), c,
148                                info.SampleForCompression());
149 
150       CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
151                    sampled_output_slow);
152     }
153   }
154 
155   if (info.type() == kNoCompression) {
156     *type = kNoCompression;
157     return raw;
158   }
159 
160   // Actually compress the data; if the compression method is not supported,
161   // or the compression fails etc., just fall back to uncompressed
162   if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
163                     compressed_output)) {
164     *type = kNoCompression;
165     return raw;
166   }
167 
168   // Check the compression ratio; if it's not good enough, just fall back to
169   // uncompressed
170   if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
171     *type = kNoCompression;
172     return raw;
173   }
174 
175   *type = info.type();
176   return *compressed_output;
177 }
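
// A minimal usage sketch (illustrative only; assumes `raw`, `compression_info`
// and `table_options` are in scope, as in CompressAndVerifyBlock() below):
//
//   CompressionType type;
//   std::string compressed;
//   Slice out = CompressBlock(raw, compression_info, &type,
//                             table_options.format_version,
//                             false /* do_sample */, &compressed,
//                             nullptr /* sampled_output_fast */,
//                             nullptr /* sampled_output_slow */);
//   // `out` aliases either `raw` (when type == kNoCompression) or the
//   // contents of `compressed`.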
178 
179 // kBlockBasedTableMagicNumber was picked by running
180 //    echo rocksdb.table.block_based | sha1sum
181 // and taking the leading 64 bits.
182 // Please note that kBlockBasedTableMagicNumber may also be accessed by other
183 // .cc files. For that reason we declare it extern in the header, but to get
184 // the storage allocated, it must be defined (non-extern) in exactly one
185 // place.
187 const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
188 // We also support reading and writing legacy block based table format (for
189 // backwards compatibility)
190 const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
191 
192 // A collector that collects properties of interest to the block-based table.
193 // For now this class looks heavy-weight since we only write one additional
194 // property.
195 // But in the foreseeable future, we will add more and more properties that
196 // are specific to the block-based table.
197 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
198     : public IntTblPropCollector {
199  public:
200   explicit BlockBasedTablePropertiesCollector(
201       BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
202       bool prefix_filtering)
203       : index_type_(index_type),
204         whole_key_filtering_(whole_key_filtering),
205         prefix_filtering_(prefix_filtering) {}
206 
207   Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
208                      uint64_t /*file_size*/) override {
209     // Intentionally left blank. Have no interest in collecting stats for
210     // individual key/value pairs.
211     return Status::OK();
212   }
213 
214   virtual void BlockAdd(uint64_t /* block_raw_bytes */,
215                         uint64_t /* block_compressed_bytes_fast */,
216                         uint64_t /* block_compressed_bytes_slow */) override {
217     // Intentionally left blank. No interest in collecting stats for
218     // blocks.
219     return;
220   }
221 
222   Status Finish(UserCollectedProperties* properties) override {
223     std::string val;
224     PutFixed32(&val, static_cast<uint32_t>(index_type_));
225     properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
226     properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
227                         whole_key_filtering_ ? kPropTrue : kPropFalse});
228     properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
229                         prefix_filtering_ ? kPropTrue : kPropFalse});
230     return Status::OK();
231   }
232 
233   // The name of the properties collector can be used for debugging purpose.
234   const char* Name() const override {
235     return "BlockBasedTablePropertiesCollector";
236   }
237 
238   UserCollectedProperties GetReadableProperties() const override {
239     // Intentionally left blank.
240     return UserCollectedProperties();
241   }
242 
243  private:
244   BlockBasedTableOptions::IndexType index_type_;
245   bool whole_key_filtering_;
246   bool prefix_filtering_;
247 };
248 
249 struct BlockBasedTableBuilder::Rep {
250   const ImmutableOptions ioptions;
251   const MutableCFOptions moptions;
252   const BlockBasedTableOptions table_options;
253   const InternalKeyComparator& internal_comparator;
254   WritableFileWriter* file;
255   std::atomic<uint64_t> offset;
256   size_t alignment;
257   BlockBuilder data_block;
258   // Buffers uncompressed data blocks to replay later. Needed when
259   // compression dictionary is enabled so we can finalize the dictionary before
260   // compressing any data blocks.
261   std::vector<std::string> data_block_buffers;
262   BlockBuilder range_del_block;
263 
264   InternalKeySliceTransform internal_prefix_transform;
265   std::unique_ptr<IndexBuilder> index_builder;
266   PartitionedIndexBuilder* p_index_builder_ = nullptr;
267 
268   std::string last_key;
269   const Slice* first_key_in_next_block = nullptr;
270   CompressionType compression_type;
271   uint64_t sample_for_compression;
272   std::atomic<uint64_t> compressible_input_data_bytes;
273   std::atomic<uint64_t> uncompressible_input_data_bytes;
274   std::atomic<uint64_t> sampled_input_data_bytes;
275   std::atomic<uint64_t> sampled_output_slow_data_bytes;
276   std::atomic<uint64_t> sampled_output_fast_data_bytes;
277   CompressionOptions compression_opts;
278   std::unique_ptr<CompressionDict> compression_dict;
279   std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
280   std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
281   std::unique_ptr<UncompressionDict> verify_dict;
282 
283   size_t data_begin_offset = 0;
284 
285   TableProperties props;
286 
287   // States of the builder.
288   //
289   // - `kBuffered`: This is the initial state where zero or more data blocks are
290   //   accumulated uncompressed in-memory. From this state, call
291   //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
292   //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
293   //   state.
294   //
295   // - `kUnbuffered`: This is the state when compression dictionary is finalized
296   //   either because it wasn't enabled in the first place or it's been created
297   //   from sampling previously buffered data. In this state, blocks are simply
298   //   compressed/written out as they fill up. From this state, call `Finish()`
299   //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
300   //   the partially created file.
301   //
302   // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
303   //   called, so the table builder is no longer usable. We must be in this
304   //   state by the time the destructor runs.
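  //
  // In other words (sketch): kBuffered --EnterUnbuffered()--> kUnbuffered
  // --Finish()/Abandon()--> kClosed. Builders without a compression
  // dictionary (max_dict_bytes == 0) start directly in `kUnbuffered` (see the
  // Rep constructor below).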
305   enum class State {
306     kBuffered,
307     kUnbuffered,
308     kClosed,
309   };
310   State state;
311   // `kBuffered` state is allowed only as long as the buffering of uncompressed
312   // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
313   uint64_t buffer_limit;
314 
315   const bool use_delta_encoding_for_index_values;
316   std::unique_ptr<FilterBlockBuilder> filter_builder;
317   char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
318   size_t compressed_cache_key_prefix_size;
319 
320   BlockHandle pending_handle;  // Handle to add to index block
321 
322   std::string compressed_output;
323   std::unique_ptr<FlushBlockPolicy> flush_block_policy;
324   uint32_t column_family_id;
325   std::string column_family_name;
326   uint64_t creation_time = 0;
327   uint64_t oldest_key_time = 0;
328   uint64_t file_creation_time = 0;
329 
330   // DB IDs
331   const std::string db_id;
332   const std::string db_session_id;
333   std::string db_host_id;
334 
335   std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
336 
337   std::unique_ptr<ParallelCompressionRep> pc_rep;
338 
339   uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
340   void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
341 
342   bool IsParallelCompressionEnabled() const {
343     return compression_opts.parallel_threads > 1;
344   }
345 
346   Status GetStatus() {
347     // We need to make modifications of status visible when status_ok is set
348     // to false, and this is ensured by status_mutex, so no special memory
349     // order for status_ok is required.
350     if (status_ok.load(std::memory_order_relaxed)) {
351       return Status::OK();
352     } else {
353       return CopyStatus();
354     }
355   }
356 
357   Status CopyStatus() {
358     std::lock_guard<std::mutex> lock(status_mutex);
359     return status;
360   }
361 
362   IOStatus GetIOStatus() {
363     // We need to make modifications of io_status visible when status_ok is set
364     // to false, and this is ensured by io_status_mutex, so no special memory
365     // order for io_status_ok is required.
366     if (io_status_ok.load(std::memory_order_relaxed)) {
367       return IOStatus::OK();
368     } else {
369       return CopyIOStatus();
370     }
371   }
372 
373   IOStatus CopyIOStatus() {
374     std::lock_guard<std::mutex> lock(io_status_mutex);
375     return io_status;
376   }
377 
378   // Never erase an existing status that is not OK.
379   void SetStatus(Status s) {
380     if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
381       // Locking is overkill when compression_opts.parallel_threads == 1,
382       // but since it is unlikely that s is not OK, we accept this cost
383       // for the sake of simplicity.
384       std::lock_guard<std::mutex> lock(status_mutex);
385       status = s;
386       status_ok.store(false, std::memory_order_relaxed);
387     }
388   }
389 
390   // Never erase an existing I/O status that is not OK.
391   void SetIOStatus(IOStatus ios) {
392     if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
393       // Locking is overkill when compression_opts.parallel_threads == 1,
394       // but since it is unlikely that ios is not OK, we accept this cost
395       // for the sake of simplicity.
396       std::lock_guard<std::mutex> lock(io_status_mutex);
397       io_status = ios;
398       io_status_ok.store(false, std::memory_order_relaxed);
399     }
400   }
401 
402   Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
403       WritableFileWriter* f)
404       : ioptions(tbo.ioptions),
405         moptions(tbo.moptions),
406         table_options(table_opt),
407         internal_comparator(tbo.internal_comparator),
408         file(f),
409         offset(0),
410         alignment(table_options.block_align
411                       ? std::min(table_options.block_size, kDefaultPageSize)
412                       : 0),
413         data_block(table_options.block_restart_interval,
414                    table_options.use_delta_encoding,
415                    false /* use_value_delta_encoding */,
416                    tbo.internal_comparator.user_comparator()
417                            ->CanKeysWithDifferentByteContentsBeEqual()
418                        ? BlockBasedTableOptions::kDataBlockBinarySearch
419                        : table_options.data_block_index_type,
420                    table_options.data_block_hash_table_util_ratio),
421         range_del_block(1 /* block_restart_interval */),
422         internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
423         compression_type(tbo.compression_type),
424         sample_for_compression(tbo.moptions.sample_for_compression),
425         compressible_input_data_bytes(0),
426         uncompressible_input_data_bytes(0),
427         sampled_input_data_bytes(0),
428         sampled_output_slow_data_bytes(0),
429         sampled_output_fast_data_bytes(0),
430         compression_opts(tbo.compression_opts),
431         compression_dict(),
432         compression_ctxs(tbo.compression_opts.parallel_threads),
433         verify_ctxs(tbo.compression_opts.parallel_threads),
434         verify_dict(),
435         state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
436                                                         : State::kUnbuffered),
437         use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
438                                             !table_opt.block_align),
439         compressed_cache_key_prefix_size(0),
440         flush_block_policy(
441             table_options.flush_block_policy_factory->NewFlushBlockPolicy(
442                 table_options, data_block)),
443         column_family_id(tbo.column_family_id),
444         column_family_name(tbo.column_family_name),
445         creation_time(tbo.creation_time),
446         oldest_key_time(tbo.oldest_key_time),
447         file_creation_time(tbo.file_creation_time),
448         db_id(tbo.db_id),
449         db_session_id(tbo.db_session_id),
450         db_host_id(ioptions.db_host_id),
451         status_ok(true),
452         io_status_ok(true) {
453     if (tbo.target_file_size == 0) {
454       buffer_limit = compression_opts.max_dict_buffer_bytes;
455     } else if (compression_opts.max_dict_buffer_bytes == 0) {
456       buffer_limit = tbo.target_file_size;
457     } else {
458       buffer_limit = std::min(tbo.target_file_size,
459                               compression_opts.max_dict_buffer_bytes);
460     }
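    // For example (hypothetical sizes): with target_file_size = 64 MB and
    // max_dict_buffer_bytes = 16 MB, buffer_limit becomes 16 MB. A zero on
    // either side falls back to the other value, and a resulting limit of 0
    // disables the size-based switch to the unbuffered state (see Add()).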
461     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
462       compression_ctxs[i].reset(new CompressionContext(compression_type));
463     }
464     if (table_options.index_type ==
465         BlockBasedTableOptions::kTwoLevelIndexSearch) {
466       p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
467           &internal_comparator, use_delta_encoding_for_index_values,
468           table_options);
469       index_builder.reset(p_index_builder_);
470     } else {
471       index_builder.reset(IndexBuilder::CreateIndexBuilder(
472           table_options.index_type, &internal_comparator,
473           &this->internal_prefix_transform, use_delta_encoding_for_index_values,
474           table_options));
475     }
476     if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
477       // Apply optimize_filters_for_hits setting here when applicable by
478       // skipping filter generation
479       filter_builder.reset();
480     } else if (tbo.skip_filters) {
481       // For SstFileWriter skip_filters
482       filter_builder.reset();
483     } else if (!table_options.filter_policy) {
484       // Null filter_policy -> no filter
485       filter_builder.reset();
486     } else {
487       FilterBuildingContext filter_context(table_options);
488 
489       filter_context.info_log = ioptions.logger;
490       filter_context.column_family_name = tbo.column_family_name;
491       filter_context.reason = tbo.reason;
492 
493       // Only populate other fields if known to be in LSM rather than
494       // generating external SST file
495       if (tbo.reason != TableFileCreationReason::kMisc) {
496         filter_context.compaction_style = ioptions.compaction_style;
497         filter_context.num_levels = ioptions.num_levels;
498         filter_context.level_at_creation = tbo.level_at_creation;
499         filter_context.is_bottommost = tbo.is_bottommost;
500         assert(filter_context.level_at_creation < filter_context.num_levels);
501       }
502 
503       filter_builder.reset(CreateFilterBlockBuilder(
504           ioptions, moptions, filter_context,
505           use_delta_encoding_for_index_values, p_index_builder_));
506     }
507 
508     const auto& factory_range = tbo.int_tbl_prop_collector_factories;
509     for (auto it = factory_range.first; it != factory_range.second; ++it) {
510       assert(*it);
511 
512       table_properties_collectors.emplace_back(
513           (*it)->CreateIntTblPropCollector(column_family_id));
514     }
515     table_properties_collectors.emplace_back(
516         new BlockBasedTablePropertiesCollector(
517             table_options.index_type, table_options.whole_key_filtering,
518             moptions.prefix_extractor != nullptr));
519     if (table_options.verify_compression) {
520       for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
521         verify_ctxs[i].reset(new UncompressionContext(compression_type));
522       }
523     }
524 
525     if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) {
526       ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
527     }
528   }
529 
530   Rep(const Rep&) = delete;
531   Rep& operator=(const Rep&) = delete;
532 
533  private:
534   // Synchronize status & io_status accesses among the main thread, the
535   // compression threads and the write thread during parallel compression.
536   std::mutex status_mutex;
537   std::atomic<bool> status_ok;
538   Status status;
539   std::mutex io_status_mutex;
540   std::atomic<bool> io_status_ok;
541   IOStatus io_status;
542 };
543 
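// High-level flow of parallel compression, as implemented by the pieces below
// (a sketch, not additional behavior): Flush() packages the finished data
// block into a BlockRep via PrepareBlock() and hands it to EmitBlock(), which
// pushes the block's slot onto write_queue and the BlockRep itself onto
// compress_queue. A BGWorkCompression() thread pops the BlockRep, compresses
// it and fills the slot. The single BGWorkWriteRawBlock() thread pops slots in
// order from write_queue, writes each block to the file and returns the
// BlockRep to block_rep_pool via ReapBlock().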
544 struct BlockBasedTableBuilder::ParallelCompressionRep {
545   // Keys is a wrapper around a vector of strings that avoids freeing the
546   // string buffers on Clear() (unlike vector::clear()), in order to save
547   // memory allocation overhead.
548   class Keys {
549    public:
550     Keys() : keys_(kKeysInitSize), size_(0) {}
551     void PushBack(const Slice& key) {
552       if (size_ == keys_.size()) {
553         keys_.emplace_back(key.data(), key.size());
554       } else {
555         keys_[size_].assign(key.data(), key.size());
556       }
557       size_++;
558     }
559     void SwapAssign(std::vector<std::string>& keys) {
560       size_ = keys.size();
561       std::swap(keys_, keys);
562     }
563     void Clear() { size_ = 0; }
564     size_t Size() { return size_; }
565     std::string& Back() { return keys_[size_ - 1]; }
566     std::string& operator[](size_t idx) {
567       assert(idx < size_);
568       return keys_[idx];
569     }
570 
571    private:
572     const size_t kKeysInitSize = 32;
573     std::vector<std::string> keys_;
574     size_t size_;
575   };
576   std::unique_ptr<Keys> curr_block_keys;
577 
578   class BlockRepSlot;
579 
580   // BlockRep instances are fetched from and recycled to
581   // block_rep_pool during parallel compression.
582   struct BlockRep {
583     Slice contents;
584     Slice compressed_contents;
585     std::unique_ptr<std::string> data;
586     std::unique_ptr<std::string> compressed_data;
587     CompressionType compression_type;
588     std::unique_ptr<std::string> first_key_in_next_block;
589     std::unique_ptr<Keys> keys;
590     std::unique_ptr<BlockRepSlot> slot;
591     Status status;
592   };
593   // Use a vector of BlockRep as a buffer for a fixed number of BlockRep
594   // structures. All data referenced by pointers in BlockRep will be freed
595   // when this vector is destructed.
596   typedef std::vector<BlockRep> BlockRepBuffer;
597   BlockRepBuffer block_rep_buf;
598   // Use a thread-safe queue for concurrent access from block
599   // building thread and writer thread.
600   typedef WorkQueue<BlockRep*> BlockRepPool;
601   BlockRepPool block_rep_pool;
602 
603   // Use BlockRepSlot to keep blocks in order in the write thread.
604   // slot_ will pass references to BlockRep.
605   class BlockRepSlot {
606    public:
607     BlockRepSlot() : slot_(1) {}
608     template <typename T>
609     void Fill(T&& rep) {
610       slot_.push(std::forward<T>(rep));
611     };
612     void Take(BlockRep*& rep) { slot_.pop(rep); }
613 
614    private:
615     // slot_ will pass references to BlockRep in block_rep_buf,
616     // and those references are always valid before the destruction of
617     // block_rep_buf.
618     WorkQueue<BlockRep*> slot_;
619   };
620 
621   // Compression queue will pass references to BlockRep in block_rep_buf,
622   // and those references are always valid before the destruction of
623   // block_rep_buf.
624   typedef WorkQueue<BlockRep*> CompressQueue;
625   CompressQueue compress_queue;
626   std::vector<port::Thread> compress_thread_pool;
627 
628   // Write queue will pass references to BlockRep::slot in block_rep_buf,
629   // and those references are always valid before the corresponding
630   // BlockRep::slot is destructed, which is before the destruction of
631   // block_rep_buf.
632   typedef WorkQueue<BlockRepSlot*> WriteQueue;
633   WriteQueue write_queue;
634   std::unique_ptr<port::Thread> write_thread;
635 
636   // Estimate output file size when parallel compression is enabled. This is
637   // necessary because compression & flush are no longer synchronized,
638   // and BlockBasedTableBuilder::FileSize() is no longer accurate.
639   // memory_order_relaxed suffices because accurate statistics are not required.
640   class FileSizeEstimator {
641    public:
642     explicit FileSizeEstimator()
643         : raw_bytes_compressed(0),
644           raw_bytes_curr_block(0),
645           raw_bytes_curr_block_set(false),
646           raw_bytes_inflight(0),
647           blocks_inflight(0),
648           curr_compression_ratio(0),
649           estimated_file_size(0) {}
650 
651     // Estimate file size when a block is about to be emitted to
652     // compression thread
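    // The estimate maintained here is, in effect:
    //   estimated_file_size = bytes already written to the file
    //                         + raw bytes in flight * current compression ratio
    //                         + blocks in flight * kBlockTrailerSize
    // where the compression ratio is refreshed in ReapBlock() as blocks come
    // back from the compression threads.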
653     void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
654       uint64_t new_raw_bytes_inflight =
655           raw_bytes_inflight.fetch_add(raw_block_size,
656                                        std::memory_order_relaxed) +
657           raw_block_size;
658 
659       uint64_t new_blocks_inflight =
660           blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
661 
662       estimated_file_size.store(
663           curr_file_size +
664               static_cast<uint64_t>(
665                   static_cast<double>(new_raw_bytes_inflight) *
666                   curr_compression_ratio.load(std::memory_order_relaxed)) +
667               new_blocks_inflight * kBlockTrailerSize,
668           std::memory_order_relaxed);
669     }
670 
671     // Estimate file size when a block is already reaped from
672     // compression thread
673     void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
674       assert(raw_bytes_curr_block_set);
675 
676       uint64_t new_raw_bytes_compressed =
677           raw_bytes_compressed + raw_bytes_curr_block;
678       assert(new_raw_bytes_compressed > 0);
679 
680       curr_compression_ratio.store(
681           (curr_compression_ratio.load(std::memory_order_relaxed) *
682                raw_bytes_compressed +
683            compressed_block_size) /
684               static_cast<double>(new_raw_bytes_compressed),
685           std::memory_order_relaxed);
686       raw_bytes_compressed = new_raw_bytes_compressed;
687 
688       uint64_t new_raw_bytes_inflight =
689           raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
690                                        std::memory_order_relaxed) -
691           raw_bytes_curr_block;
692 
693       uint64_t new_blocks_inflight =
694           blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
695 
696       estimated_file_size.store(
697           curr_file_size +
698               static_cast<uint64_t>(
699                   static_cast<double>(new_raw_bytes_inflight) *
700                   curr_compression_ratio.load(std::memory_order_relaxed)) +
701               new_blocks_inflight * kBlockTrailerSize,
702           std::memory_order_relaxed);
703 
704       raw_bytes_curr_block_set = false;
705     }
706 
707     void SetEstimatedFileSize(uint64_t size) {
708       estimated_file_size.store(size, std::memory_order_relaxed);
709     }
710 
711     uint64_t GetEstimatedFileSize() {
712       return estimated_file_size.load(std::memory_order_relaxed);
713     }
714 
715     void SetCurrBlockRawSize(uint64_t size) {
716       raw_bytes_curr_block = size;
717       raw_bytes_curr_block_set = true;
718     }
719 
720    private:
721     // Raw bytes compressed so far.
722     uint64_t raw_bytes_compressed;
723     // Size of current block being appended.
724     uint64_t raw_bytes_curr_block;
725     // Whether raw_bytes_curr_block has been set for next
726     // ReapBlock call.
727     bool raw_bytes_curr_block_set;
728     // Raw bytes under compression and not appended yet.
729     std::atomic<uint64_t> raw_bytes_inflight;
730     // Number of blocks under compression and not appended yet.
731     std::atomic<uint64_t> blocks_inflight;
732     // Current compression ratio, maintained by BGWorkWriteRawBlock.
733     std::atomic<double> curr_compression_ratio;
734     // Estimated SST file size.
735     std::atomic<uint64_t> estimated_file_size;
736   };
737   FileSizeEstimator file_size_estimator;
738 
739   // Facilities used to wait for the completion of the first block. We need
740   // to wait until the first block has been compressed and flushed to get a
741   // non-zero compression ratio.
742   std::atomic<bool> first_block_processed;
743   std::condition_variable first_block_cond;
744   std::mutex first_block_mutex;
745 
746   explicit ParallelCompressionRep(uint32_t parallel_threads)
747       : curr_block_keys(new Keys()),
748         block_rep_buf(parallel_threads),
749         block_rep_pool(parallel_threads),
750         compress_queue(parallel_threads),
751         write_queue(parallel_threads),
752         first_block_processed(false) {
753     for (uint32_t i = 0; i < parallel_threads; i++) {
754       block_rep_buf[i].contents = Slice();
755       block_rep_buf[i].compressed_contents = Slice();
756       block_rep_buf[i].data.reset(new std::string());
757       block_rep_buf[i].compressed_data.reset(new std::string());
758       block_rep_buf[i].compression_type = CompressionType();
759       block_rep_buf[i].first_key_in_next_block.reset(new std::string());
760       block_rep_buf[i].keys.reset(new Keys());
761       block_rep_buf[i].slot.reset(new BlockRepSlot());
762       block_rep_buf[i].status = Status::OK();
763       block_rep_pool.push(&block_rep_buf[i]);
764     }
765   }
766 
767   ~ParallelCompressionRep() { block_rep_pool.finish(); }
768 
769   // Prepare a block to be emitted to a compression thread.
770   // Used in non-buffered mode.
771   BlockRep* PrepareBlock(CompressionType compression_type,
772                          const Slice* first_key_in_next_block,
773                          BlockBuilder* data_block) {
774     BlockRep* block_rep =
775         PrepareBlockInternal(compression_type, first_key_in_next_block);
776     assert(block_rep != nullptr);
777     data_block->SwapAndReset(*(block_rep->data));
778     block_rep->contents = *(block_rep->data);
779     std::swap(block_rep->keys, curr_block_keys);
780     curr_block_keys->Clear();
781     return block_rep;
782   }
783 
784   // Used in EnterUnbuffered
785   BlockRep* PrepareBlock(CompressionType compression_type,
786                          const Slice* first_key_in_next_block,
787                          std::string* data_block,
788                          std::vector<std::string>* keys) {
789     BlockRep* block_rep =
790         PrepareBlockInternal(compression_type, first_key_in_next_block);
791     assert(block_rep != nullptr);
792     std::swap(*(block_rep->data), *data_block);
793     block_rep->contents = *(block_rep->data);
794     block_rep->keys->SwapAssign(*keys);
795     return block_rep;
796   }
797 
798   // Emit a block to compression thread
799   void EmitBlock(BlockRep* block_rep) {
800     assert(block_rep != nullptr);
801     assert(block_rep->status.ok());
802     if (!write_queue.push(block_rep->slot.get())) {
803       return;
804     }
805     if (!compress_queue.push(block_rep)) {
806       return;
807     }
808 
809     if (!first_block_processed.load(std::memory_order_relaxed)) {
810       std::unique_lock<std::mutex> lock(first_block_mutex);
811       first_block_cond.wait(lock, [this] {
812         return first_block_processed.load(std::memory_order_relaxed);
813       });
814     }
815   }
816 
817   // Reap a block from compression thread
818   void ReapBlock(BlockRep* block_rep) {
819     assert(block_rep != nullptr);
820     block_rep->compressed_data->clear();
821     block_rep_pool.push(block_rep);
822 
823     if (!first_block_processed.load(std::memory_order_relaxed)) {
824       std::lock_guard<std::mutex> lock(first_block_mutex);
825       first_block_processed.store(true, std::memory_order_relaxed);
826       first_block_cond.notify_one();
827     }
828   }
829 
830  private:
831   BlockRep* PrepareBlockInternal(CompressionType compression_type,
832                                  const Slice* first_key_in_next_block) {
833     BlockRep* block_rep = nullptr;
834     block_rep_pool.pop(block_rep);
835     assert(block_rep != nullptr);
836 
837     assert(block_rep->data);
838 
839     block_rep->compression_type = compression_type;
840 
841     if (first_key_in_next_block == nullptr) {
842       block_rep->first_key_in_next_block.reset(nullptr);
843     } else {
844       block_rep->first_key_in_next_block->assign(
845           first_key_in_next_block->data(), first_key_in_next_block->size());
846     }
847 
848     return block_rep;
849   }
850 };
851 
852 BlockBasedTableBuilder::BlockBasedTableBuilder(
853     const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
854     WritableFileWriter* file) {
855   BlockBasedTableOptions sanitized_table_options(table_options);
856   if (sanitized_table_options.format_version == 0 &&
857       sanitized_table_options.checksum != kCRC32c) {
858     ROCKS_LOG_WARN(
859         tbo.ioptions.logger,
860         "Silently converting format_version to 1 because checksum is "
861         "non-default");
862     // silently convert format_version to 1 to keep consistent with current
863     // behavior
864     sanitized_table_options.format_version = 1;
865   }
866 
867   rep_ = new Rep(sanitized_table_options, tbo, file);
868 
869   if (rep_->filter_builder != nullptr) {
870     rep_->filter_builder->StartBlock(0);
871   }
872   if (table_options.block_cache_compressed.get() != nullptr) {
873     BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
874         table_options.block_cache_compressed.get(), file->writable_file(),
875         &rep_->compressed_cache_key_prefix[0],
876         &rep_->compressed_cache_key_prefix_size);
877   }
878 
879   if (rep_->IsParallelCompressionEnabled()) {
880     StartParallelCompression();
881   }
882 }
883 
884 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
885   // Catch errors where caller forgot to call Finish()
886   assert(rep_->state == Rep::State::kClosed);
887   delete rep_;
888 }
889 
890 void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
891   Rep* r = rep_;
892   assert(rep_->state != Rep::State::kClosed);
893   if (!ok()) return;
894   ValueType value_type = ExtractValueType(key);
895   if (IsValueType(value_type)) {
896 #ifndef NDEBUG
897     if (r->props.num_entries > r->props.num_range_deletions) {
898       assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
899     }
900 #endif  // !NDEBUG
901 
902     auto should_flush = r->flush_block_policy->Update(key, value);
903     if (should_flush) {
904       assert(!r->data_block.empty());
905       r->first_key_in_next_block = &key;
906       Flush();
907 
908       if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
909           r->data_begin_offset > r->buffer_limit) {
910         EnterUnbuffered();
911       }
912 
913       // Add item to index block.
914       // We do not emit the index entry for a block until we have seen the
915       // first key for the next data block.  This allows us to use shorter
916       // keys in the index block.  For example, consider a block boundary
917       // between the keys "the quick brown fox" and "the who".  We can use
918       // "the r" as the key for the index block entry since it is >= all
919       // entries in the first block and < all entries in subsequent
920       // blocks.
921       if (ok() && r->state == Rep::State::kUnbuffered) {
922         if (r->IsParallelCompressionEnabled()) {
923           r->pc_rep->curr_block_keys->Clear();
924         } else {
925           r->index_builder->AddIndexEntry(&r->last_key, &key,
926                                           r->pending_handle);
927         }
928       }
929     }
930 
931     // Note: PartitionedFilterBlockBuilder requires keys to be added to the
932     // filter builder after they have been added to the index builder.
933     if (r->state == Rep::State::kUnbuffered) {
934       if (r->IsParallelCompressionEnabled()) {
935         r->pc_rep->curr_block_keys->PushBack(key);
936       } else {
937         if (r->filter_builder != nullptr) {
938           size_t ts_sz =
939               r->internal_comparator.user_comparator()->timestamp_size();
940           r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
941         }
942       }
943     }
944 
945     r->last_key.assign(key.data(), key.size());
946     r->data_block.Add(key, value);
947     if (r->state == Rep::State::kBuffered) {
948       // Buffered keys will be replayed from data_block_buffers during
949       // `Finish()` once compression dictionary has been finalized.
950     } else {
951       if (!r->IsParallelCompressionEnabled()) {
952         r->index_builder->OnKeyAdded(key);
953       }
954     }
955     // TODO offset passed in is not accurate for parallel compression case
956     NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
957                                       r->table_properties_collectors,
958                                       r->ioptions.logger);
959 
960   } else if (value_type == kTypeRangeDeletion) {
961     r->range_del_block.Add(key, value);
962     // TODO offset passed in is not accurate for parallel compression case
963     NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
964                                       r->table_properties_collectors,
965                                       r->ioptions.logger);
966   } else {
967     assert(false);
968   }
969 
970   r->props.num_entries++;
971   r->props.raw_key_size += key.size();
972   r->props.raw_value_size += value.size();
973   if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
974     r->props.num_deletions++;
975   } else if (value_type == kTypeRangeDeletion) {
976     r->props.num_deletions++;
977     r->props.num_range_deletions++;
978   } else if (value_type == kTypeMerge) {
979     r->props.num_merge_operands++;
980   }
981 }
982 
983 void BlockBasedTableBuilder::Flush() {
984   Rep* r = rep_;
985   assert(rep_->state != Rep::State::kClosed);
986   if (!ok()) return;
987   if (r->data_block.empty()) return;
988   if (r->IsParallelCompressionEnabled() &&
989       r->state == Rep::State::kUnbuffered) {
990     r->data_block.Finish();
991     ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
992         r->compression_type, r->first_key_in_next_block, &(r->data_block));
993     assert(block_rep != nullptr);
994     r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
995                                              r->get_offset());
996     r->pc_rep->EmitBlock(block_rep);
997   } else {
998     WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
999   }
1000 }
1001 
1002 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
1003                                         BlockHandle* handle,
1004                                         bool is_data_block) {
1005   block->Finish();
1006   std::string raw_block_contents;
1007   block->SwapAndReset(raw_block_contents);
1008   if (rep_->state == Rep::State::kBuffered) {
1009     assert(is_data_block);
1010     rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
1011     rep_->data_begin_offset += rep_->data_block_buffers.back().size();
1012     return;
1013   }
1014   WriteBlock(raw_block_contents, handle, is_data_block);
1015 }
1016 
1017 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
1018                                         BlockHandle* handle,
1019                                         bool is_data_block) {
1020   Rep* r = rep_;
1021   assert(r->state == Rep::State::kUnbuffered);
1022   Slice block_contents;
1023   CompressionType type;
1024   Status compress_status;
1025   CompressAndVerifyBlock(raw_block_contents, is_data_block,
1026                          *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
1027                          &(r->compressed_output), &(block_contents), &type,
1028                          &compress_status);
1029   r->SetStatus(compress_status);
1030   if (!ok()) {
1031     return;
1032   }
1033   WriteRawBlock(block_contents, type, handle, is_data_block);
1034   r->compressed_output.clear();
1035   if (is_data_block) {
1036     if (r->filter_builder != nullptr) {
1037       r->filter_builder->StartBlock(r->get_offset());
1038     }
1039     r->props.data_size = r->get_offset();
1040     ++r->props.num_data_blocks;
1041   }
1042 }
1043 
1044 void BlockBasedTableBuilder::BGWorkCompression(
1045     const CompressionContext& compression_ctx,
1046     UncompressionContext* verify_ctx) {
1047   ParallelCompressionRep::BlockRep* block_rep = nullptr;
1048   while (rep_->pc_rep->compress_queue.pop(block_rep)) {
1049     assert(block_rep != nullptr);
1050     CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
1051                            compression_ctx, verify_ctx,
1052                            block_rep->compressed_data.get(),
1053                            &block_rep->compressed_contents,
1054                            &(block_rep->compression_type), &block_rep->status);
1055     block_rep->slot->Fill(block_rep);
1056   }
1057 }
1058 
1059 void BlockBasedTableBuilder::CompressAndVerifyBlock(
1060     const Slice& raw_block_contents, bool is_data_block,
1061     const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
1062     std::string* compressed_output, Slice* block_contents,
1063     CompressionType* type, Status* out_status) {
1064   // File format contains a sequence of blocks where each block has:
1065   //    block_data: uint8[n]
1066   //    type: uint8
1067   //    crc: uint32
1068   Rep* r = rep_;
1069   bool is_status_ok = ok();
1070   if (!r->IsParallelCompressionEnabled()) {
1071     assert(is_status_ok);
1072   }
1073 
1074   *type = r->compression_type;
1075   uint64_t sample_for_compression = r->sample_for_compression;
1076   bool abort_compression = false;
1077 
1078   StopWatchNano timer(
1079       r->ioptions.clock,
1080       ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
1081 
1082   if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
1083     if (is_data_block) {
1084       r->compressible_input_data_bytes.fetch_add(raw_block_contents.size(),
1085                                                  std::memory_order_relaxed);
1086     }
1087     const CompressionDict* compression_dict;
1088     if (!is_data_block || r->compression_dict == nullptr) {
1089       compression_dict = &CompressionDict::GetEmptyDict();
1090     } else {
1091       compression_dict = r->compression_dict.get();
1092     }
1093     assert(compression_dict != nullptr);
1094     CompressionInfo compression_info(r->compression_opts, compression_ctx,
1095                                      *compression_dict, *type,
1096                                      sample_for_compression);
1097 
1098     std::string sampled_output_fast;
1099     std::string sampled_output_slow;
1100     *block_contents = CompressBlock(
1101         raw_block_contents, compression_info, type,
1102         r->table_options.format_version, is_data_block /* do_sample */,
1103         compressed_output, &sampled_output_fast, &sampled_output_slow);
1104 
1105     if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
1106       // Currently compression sampling is only enabled for data block.
1107       assert(is_data_block);
1108       r->sampled_input_data_bytes.fetch_add(raw_block_contents.size(),
1109                                             std::memory_order_relaxed);
1110       r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
1111                                                   std::memory_order_relaxed);
1112       r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
1113                                                   std::memory_order_relaxed);
1114     }
1115     // notify collectors on block add
1116     NotifyCollectTableCollectorsOnBlockAdd(
1117         r->table_properties_collectors, raw_block_contents.size(),
1118         sampled_output_fast.size(), sampled_output_slow.size());
1119 
1120     // Some of the compression algorithms are known to be unreliable. If
1121     // the verify_compression flag is set then try to de-compress the
1122     // compressed data and compare to the input.
1123     if (*type != kNoCompression && r->table_options.verify_compression) {
1124       // Retrieve the uncompressed contents into a new buffer
1125       const UncompressionDict* verify_dict;
1126       if (!is_data_block || r->verify_dict == nullptr) {
1127         verify_dict = &UncompressionDict::GetEmptyDict();
1128       } else {
1129         verify_dict = r->verify_dict.get();
1130       }
1131       assert(verify_dict != nullptr);
1132       BlockContents contents;
1133       UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
1134                                            r->compression_type);
1135       Status stat = UncompressBlockContentsForCompressionType(
1136           uncompression_info, block_contents->data(), block_contents->size(),
1137           &contents, r->table_options.format_version, r->ioptions);
1138 
1139       if (stat.ok()) {
1140         bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
1141         if (!compressed_ok) {
1142           // The result of the compression was invalid. abort.
1143           abort_compression = true;
1144           ROCKS_LOG_ERROR(r->ioptions.logger,
1145                           "Decompressed block did not match raw block");
1146           *out_status =
1147               Status::Corruption("Decompressed block did not match raw block");
1148         }
1149       } else {
1150         // Decompression reported an error. abort.
1151         *out_status = Status::Corruption(std::string("Could not decompress: ") +
1152                                          stat.getState());
1153         abort_compression = true;
1154       }
1155     }
1156   } else {
1157     // Block is too big to be compressed.
1158     if (is_data_block) {
1159       r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
1160                                                    std::memory_order_relaxed);
1161     }
1162     abort_compression = true;
1163   }
1164   if (is_data_block) {
1165     r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
1166                                                  std::memory_order_relaxed);
1167   }
1168 
1169   // Abort compression if the block is too big, or did not pass
1170   // verification.
1171   if (abort_compression) {
1172     RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
1173     *type = kNoCompression;
1174     *block_contents = raw_block_contents;
1175   } else if (*type != kNoCompression) {
1176     if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
1177       RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
1178                             timer.ElapsedNanos());
1179     }
1180     RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
1181                       raw_block_contents.size());
1182     RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
1183   } else if (*type != r->compression_type) {
1184     RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
1185   }
1186 }
1187 
1188 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
1189                                            CompressionType type,
1190                                            BlockHandle* handle,
1191                                            bool is_data_block) {
1192   Rep* r = rep_;
1193   Status s = Status::OK();
1194   IOStatus io_s = IOStatus::OK();
1195   StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
1196   handle->set_offset(r->get_offset());
1197   handle->set_size(block_contents.size());
1198   assert(status().ok());
1199   assert(io_status().ok());
1200   io_s = r->file->Append(block_contents);
1201   if (io_s.ok()) {
1202     char trailer[kBlockTrailerSize];
1203     trailer[0] = type;
1204     uint32_t checksum = 0;
1205     switch (r->table_options.checksum) {
1206       case kNoChecksum:
1207         break;
1208       case kCRC32c: {
1209         uint32_t crc =
1210             crc32c::Value(block_contents.data(), block_contents.size());
1211         // Extend to cover compression type
1212         crc = crc32c::Extend(crc, trailer, 1);
1213         checksum = crc32c::Mask(crc);
1214         break;
1215       }
1216       case kxxHash: {
1217         XXH32_state_t* const state = XXH32_createState();
1218         XXH32_reset(state, 0);
1219         XXH32_update(state, block_contents.data(), block_contents.size());
1220         // Extend to cover compression type
1221         XXH32_update(state, trailer, 1);
1222         checksum = XXH32_digest(state);
1223         XXH32_freeState(state);
1224         break;
1225       }
1226       case kxxHash64: {
1227         XXH64_state_t* const state = XXH64_createState();
1228         XXH64_reset(state, 0);
1229         XXH64_update(state, block_contents.data(), block_contents.size());
1230         // Extend to cover compression type
1231         XXH64_update(state, trailer, 1);
1232         checksum = Lower32of64(XXH64_digest(state));
1233         XXH64_freeState(state);
1234         break;
1235       }
1236       default:
1237         assert(false);
1238         break;
1239     }
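    // The trailer written below is kBlockTrailerSize (5) bytes: one
    // compression-type byte followed by a 4-byte checksum (encoded
    // little-endian) that covers the block contents plus the type byte.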
1240     EncodeFixed32(trailer + 1, checksum);
1241     assert(io_s.ok());
1242     TEST_SYNC_POINT_CALLBACK(
1243         "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
1244         static_cast<char*>(trailer));
1245     io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
1246     if (io_s.ok()) {
1247       assert(s.ok());
1248       s = InsertBlockInCache(block_contents, type, handle);
1249       if (!s.ok()) {
1250         r->SetStatus(s);
1251       }
1252     } else {
1253       r->SetIOStatus(io_s);
1254     }
1255     if (s.ok() && io_s.ok()) {
1256       r->set_offset(r->get_offset() + block_contents.size() +
1257                     kBlockTrailerSize);
1258       if (r->table_options.block_align && is_data_block) {
1259         size_t pad_bytes =
1260             (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
1261                              (r->alignment - 1))) &
1262             (r->alignment - 1);
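        // For example (illustrative sizes): with alignment = 4096 and a
        // block-plus-trailer size of 5000 bytes,
        //   pad_bytes = (4096 - (5000 & 4095)) & 4095 = (4096 - 904) & 4095
        //             = 3192,
        // so the next block starts on a 4096-byte boundary.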
1263         io_s = r->file->Pad(pad_bytes);
1264         if (io_s.ok()) {
1265           r->set_offset(r->get_offset() + pad_bytes);
1266         } else {
1267           r->SetIOStatus(io_s);
1268         }
1269       }
1270       if (r->IsParallelCompressionEnabled()) {
1271         if (is_data_block) {
1272           r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
1273                                                    r->get_offset());
1274         } else {
1275           r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
1276         }
1277       }
1278     }
1279   } else {
1280     r->SetIOStatus(io_s);
1281   }
1282   if (!io_s.ok() && s.ok()) {
1283     r->SetStatus(io_s);
1284   }
1285 }
1286 
1287 void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
1288   Rep* r = rep_;
1289   ParallelCompressionRep::BlockRepSlot* slot = nullptr;
1290   ParallelCompressionRep::BlockRep* block_rep = nullptr;
1291   while (r->pc_rep->write_queue.pop(slot)) {
1292     assert(slot != nullptr);
1293     slot->Take(block_rep);
1294     assert(block_rep != nullptr);
1295     if (!block_rep->status.ok()) {
1296       r->SetStatus(block_rep->status);
1297       // Reap the block so that a blocked Flush(), if any, can finish;
1298       // Flush() will notice !ok() the next time it checks.
1299       block_rep->status = Status::OK();
1300       r->pc_rep->ReapBlock(block_rep);
1301       continue;
1302     }
1303 
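    // Replay the block's keys on this single writer thread so that the filter
    // and index builders see them in file order, even though compression ran
    // on a worker thread.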
1304     for (size_t i = 0; i < block_rep->keys->Size(); i++) {
1305       auto& key = (*block_rep->keys)[i];
1306       if (r->filter_builder != nullptr) {
1307         size_t ts_sz =
1308             r->internal_comparator.user_comparator()->timestamp_size();
1309         r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1310       }
1311       r->index_builder->OnKeyAdded(key);
1312     }
1313 
1314     r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
1315     WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
1316                   &r->pending_handle, true /* is_data_block*/);
1317     if (!ok()) {
1318       break;
1319     }
1320 
1321     if (r->filter_builder != nullptr) {
1322       r->filter_builder->StartBlock(r->get_offset());
1323     }
1324     r->props.data_size = r->get_offset();
1325     ++r->props.num_data_blocks;
1326 
1327     if (block_rep->first_key_in_next_block == nullptr) {
1328       r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
1329                                       r->pending_handle);
1330     } else {
1331       Slice first_key_in_next_block =
1332           Slice(*block_rep->first_key_in_next_block);
1333       r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
1334                                       &first_key_in_next_block,
1335                                       r->pending_handle);
1336     }
1337 
1338     r->pc_rep->ReapBlock(block_rep);
1339   }
1340 }
1341 
1342 void BlockBasedTableBuilder::StartParallelCompression() {
1343   rep_->pc_rep.reset(
1344       new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
1345   rep_->pc_rep->compress_thread_pool.reserve(
1346       rep_->compression_opts.parallel_threads);
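  // Pipeline layout: `parallel_threads` compression workers feed a single
  // writer thread, which consumes compressed blocks in submission order.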
1347   for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
1348     rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
1349       BGWorkCompression(*(rep_->compression_ctxs[i]),
1350                         rep_->verify_ctxs[i].get());
1351     });
1352   }
1353   rep_->pc_rep->write_thread.reset(
1354       new port::Thread([this] { BGWorkWriteRawBlock(); }));
1355 }
1356 
1357 void BlockBasedTableBuilder::StopParallelCompression() {
1358   rep_->pc_rep->compress_queue.finish();
1359   for (auto& thread : rep_->pc_rep->compress_thread_pool) {
1360     thread.join();
1361   }
1362   rep_->pc_rep->write_queue.finish();
1363   rep_->pc_rep->write_thread->join();
1364 }
1365 
1366 Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
1367 
1368 IOStatus BlockBasedTableBuilder::io_status() const {
1369   return rep_->GetIOStatus();
1370 }
1371 
1372 static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
1373   BlockContents* bc = reinterpret_cast<BlockContents*>(value);
1374   delete bc;
1375 }
1376 
1377 //
1378 // Make a copy of the block contents and insert into compressed block cache
1379 //
1380 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
1381                                                   const CompressionType type,
1382                                                   const BlockHandle* handle) {
1383   Rep* r = rep_;
1384   Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
1385 
1386   if (type != kNoCompression && block_cache_compressed != nullptr) {
1387     size_t size = block_contents.size();
1388 
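    // Allocate size + 1 bytes so the compression type byte can be appended
    // after the payload; the cached entry then carries the same payload plus
    // type-byte layout as the raw block on disk, without the 4-byte checksum.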
1389     auto ubuf =
1390         AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
1391     memcpy(ubuf.get(), block_contents.data(), size);
1392     ubuf[size] = type;
1393 
1394     BlockContents* block_contents_to_cache =
1395         new BlockContents(std::move(ubuf), size);
1396 #ifndef NDEBUG
1397     block_contents_to_cache->is_raw_block = true;
1398 #endif  // NDEBUG
1399 
1400     // Make the cache key by appending the file offset to the compressed
    // cache key prefix.
1401     char* end = EncodeVarint64(
1402         r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
1403         handle->offset());
1404     Slice key(r->compressed_cache_key_prefix,
1405               static_cast<size_t>(end - r->compressed_cache_key_prefix));
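    // For example, with a 5-byte prefix (illustrative only), a block at file
    // offset 300 yields a 7-byte key, since EncodeVarint64(300) takes two
    // bytes.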
1406 
1407     // Insert into compressed block cache.
1408     // How should we deal with compressed cache full?
1409     block_cache_compressed
1410         ->Insert(key, block_contents_to_cache,
1411                  block_contents_to_cache->ApproximateMemoryUsage(),
1412                  &DeleteCachedBlockContents)
1413         .PermitUncheckedError();
1414 
1415     // Invalidate OS cache.
1416     r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
1417         .PermitUncheckedError();
1418   }
1419   return Status::OK();
1420 }
1421 
1422 void BlockBasedTableBuilder::WriteFilterBlock(
1423     MetaIndexBuilder* meta_index_builder) {
1424   BlockHandle filter_block_handle;
1425   bool empty_filter_block =
1426       (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty());
1427   if (ok() && !empty_filter_block) {
1428     rep_->props.num_filter_entries +=
1429         rep_->filter_builder->EstimateEntriesAdded();
1430     Status s = Status::Incomplete();
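    // For partitioned filters, Finish() keeps returning Status::Incomplete
    // while more partitions remain; each loop iteration writes one partition
    // and the final call returns OK. Non-partitioned filters finish in a
    // single pass.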
1431     while (ok() && s.IsIncomplete()) {
1432       Slice filter_content =
1433           rep_->filter_builder->Finish(filter_block_handle, &s);
1434       assert(s.ok() || s.IsIncomplete());
1435       rep_->props.filter_size += filter_content.size();
1436       WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
1437     }
1438   }
1439   if (ok() && !empty_filter_block) {
1440     // Add mapping from "<filter_block_prefix>.Name" to location
1441     // of filter data.
1442     std::string key;
1443     if (rep_->filter_builder->IsBlockBased()) {
1444       key = BlockBasedTable::kFilterBlockPrefix;
1445     } else {
1446       key = rep_->table_options.partition_filters
1447                 ? BlockBasedTable::kPartitionedFilterBlockPrefix
1448                 : BlockBasedTable::kFullFilterBlockPrefix;
1449     }
1450     key.append(rep_->table_options.filter_policy->Name());
1451     meta_index_builder->Add(key, filter_block_handle);
1452   }
1453 }
1454 
1455 void BlockBasedTableBuilder::WriteIndexBlock(
1456     MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
1457   IndexBuilder::IndexBlocks index_blocks;
1458   auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
1459   if (index_builder_status.IsIncomplete()) {
1460     // If we have more than one index partition, then meta_blocks are not
1461     // supported for the index. Currently meta_blocks are used only by
1462     // HashIndexBuilder which is not multi-partition.
1463     assert(index_blocks.meta_blocks.empty());
1464   } else if (ok() && !index_builder_status.ok()) {
1465     rep_->SetStatus(index_builder_status);
1466   }
1467   if (ok()) {
1468     for (const auto& item : index_blocks.meta_blocks) {
1469       BlockHandle block_handle;
1470       WriteBlock(item.second, &block_handle, false /* is_data_block */);
1471       if (!ok()) {
1472         break;
1473       }
1474       meta_index_builder->Add(item.first, block_handle);
1475     }
1476   }
1477   if (ok()) {
1478     if (rep_->table_options.enable_index_compression) {
1479       WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
1480     } else {
1481       WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
1482                     index_block_handle);
1483     }
1484   }
1485   // If there are more index partitions, finish them and write them out
1486   if (index_builder_status.IsIncomplete()) {
1487     Status s = Status::Incomplete();
1488     while (ok() && s.IsIncomplete()) {
1489       s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
1490       if (!s.ok() && !s.IsIncomplete()) {
1491         rep_->SetStatus(s);
1492         return;
1493       }
1494       if (rep_->table_options.enable_index_compression) {
1495         WriteBlock(index_blocks.index_block_contents, index_block_handle,
1496                    false);
1497       } else {
1498         WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
1499                       index_block_handle);
1500       }
1501       // The last index_block_handle will be for the partition index block
1502     }
1503   }
1504 }
1505 
1506 void BlockBasedTableBuilder::WritePropertiesBlock(
1507     MetaIndexBuilder* meta_index_builder) {
1508   BlockHandle properties_block_handle;
1509   if (ok()) {
1510     PropertyBlockBuilder property_block_builder;
1511     rep_->props.column_family_id = rep_->column_family_id;
1512     rep_->props.column_family_name = rep_->column_family_name;
1513     rep_->props.filter_policy_name =
1514         rep_->table_options.filter_policy != nullptr
1515             ? rep_->table_options.filter_policy->Name()
1516             : "";
1517     rep_->props.index_size =
1518         rep_->index_builder->IndexSize() + kBlockTrailerSize;
1519     rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
1520                                       ? rep_->ioptions.user_comparator->Name()
1521                                       : "nullptr";
1522     rep_->props.merge_operator_name =
1523         rep_->ioptions.merge_operator != nullptr
1524             ? rep_->ioptions.merge_operator->Name()
1525             : "nullptr";
1526     rep_->props.compression_name =
1527         CompressionTypeToString(rep_->compression_type);
1528     rep_->props.compression_options =
1529         CompressionOptionsToString(rep_->compression_opts);
1530     rep_->props.prefix_extractor_name =
1531         rep_->moptions.prefix_extractor != nullptr
1532             ? rep_->moptions.prefix_extractor->Name()
1533             : "nullptr";
1534 
1535     std::string property_collectors_names = "[";
1536     for (size_t i = 0;
1537          i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
1538       if (i != 0) {
1539         property_collectors_names += ",";
1540       }
1541       property_collectors_names +=
1542           rep_->ioptions.table_properties_collector_factories[i]->Name();
1543     }
1544     property_collectors_names += "]";
1545     rep_->props.property_collectors_names = property_collectors_names;
1546     if (rep_->table_options.index_type ==
1547         BlockBasedTableOptions::kTwoLevelIndexSearch) {
1548       assert(rep_->p_index_builder_ != nullptr);
1549       rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
1550       rep_->props.top_level_index_size =
1551           rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
1552     }
1553     rep_->props.index_key_is_user_key =
1554         !rep_->index_builder->seperator_is_key_plus_seq();
1555     rep_->props.index_value_is_delta_encoded =
1556         rep_->use_delta_encoding_for_index_values;
1557     rep_->props.creation_time = rep_->creation_time;
1558     rep_->props.oldest_key_time = rep_->oldest_key_time;
1559     rep_->props.file_creation_time = rep_->file_creation_time;
1560     if (rep_->sampled_input_data_bytes > 0) {
1561       rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
1562           static_cast<double>(rep_->sampled_output_slow_data_bytes) /
1563               rep_->sampled_input_data_bytes *
1564               rep_->compressible_input_data_bytes +
1565           rep_->uncompressible_input_data_bytes + 0.5);
1566       rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
1567           static_cast<double>(rep_->sampled_output_fast_data_bytes) /
1568               rep_->sampled_input_data_bytes *
1569               rep_->compressible_input_data_bytes +
1570           rep_->uncompressible_input_data_bytes + 0.5);
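      // Example with illustrative numbers: if sampling compressed 1 MB of
      // input to 400 KB (ratio 0.4), then 100 MB of compressible input plus
      // 5 MB of incompressible input gives an estimate of roughly
      // 0.4 * 100 MB + 5 MB = 45 MB.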
1571     } else if (rep_->sample_for_compression > 0) {
1572       // We tried to sample but none were found. Assume worst-case (compression
1573       // ratio 1.0) so data is complete and aggregatable.
1574       rep_->props.slow_compression_estimated_data_size =
1575           rep_->compressible_input_data_bytes +
1576           rep_->uncompressible_input_data_bytes;
1577       rep_->props.fast_compression_estimated_data_size =
1578           rep_->compressible_input_data_bytes +
1579           rep_->uncompressible_input_data_bytes;
1580     }
1581     rep_->props.db_id = rep_->db_id;
1582     rep_->props.db_session_id = rep_->db_session_id;
1583     rep_->props.db_host_id = rep_->db_host_id;
1584 
1585     // Add basic properties
1586     property_block_builder.AddTableProperty(rep_->props);
1587 
1588     // Add user collected properties
1589     NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
1590                                          rep_->ioptions.logger,
1591                                          &property_block_builder);
1592 
1593     WriteRawBlock(property_block_builder.Finish(), kNoCompression,
1594                   &properties_block_handle);
1595   }
1596   if (ok()) {
1597 #ifndef NDEBUG
1598     {
1599       uint64_t props_block_offset = properties_block_handle.offset();
1600       uint64_t props_block_size = properties_block_handle.size();
1601       TEST_SYNC_POINT_CALLBACK(
1602           "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
1603           &props_block_offset);
1604       TEST_SYNC_POINT_CALLBACK(
1605           "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
1606           &props_block_size);
1607     }
1608 #endif  // !NDEBUG
1609     meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
1610   }
1611 }
1612 
1613 void BlockBasedTableBuilder::WriteCompressionDictBlock(
1614     MetaIndexBuilder* meta_index_builder) {
1615   if (rep_->compression_dict != nullptr &&
1616       rep_->compression_dict->GetRawDict().size()) {
1617     BlockHandle compression_dict_block_handle;
1618     if (ok()) {
1619       WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
1620                     &compression_dict_block_handle);
1621 #ifndef NDEBUG
1622       Slice compression_dict = rep_->compression_dict->GetRawDict();
1623       TEST_SYNC_POINT_CALLBACK(
1624           "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1625           &compression_dict);
1626 #endif  // NDEBUG
1627     }
1628     if (ok()) {
1629       meta_index_builder->Add(kCompressionDictBlock,
1630                               compression_dict_block_handle);
1631     }
1632   }
1633 }
1634 
1635 void BlockBasedTableBuilder::WriteRangeDelBlock(
1636     MetaIndexBuilder* meta_index_builder) {
1637   if (ok() && !rep_->range_del_block.empty()) {
1638     BlockHandle range_del_block_handle;
1639     WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1640                   &range_del_block_handle);
1641     meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1642   }
1643 }
1644 
1645 void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1646                                          BlockHandle& index_block_handle) {
1647   Rep* r = rep_;
1648   // No need to write out new footer if we're using default checksum.
1649   // We're writing the legacy magic number because we want old versions of
1650   // RocksDB to be able to read files generated with a new release (just in
1651   // case somebody wants to roll back after an upgrade).
1652   // TODO(icanadi) at some point in the future, when we're absolutely sure
1653   // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
1654   // number and always write new table files with new magic number
1655   bool legacy = (r->table_options.format_version == 0);
1656   // this is guaranteed by BlockBasedTableBuilder's constructor
1657   assert(r->table_options.checksum == kCRC32c ||
1658          r->table_options.format_version != 0);
1659   Footer footer(
1660       legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
1661       r->table_options.format_version);
1662   footer.set_metaindex_handle(metaindex_block_handle);
1663   footer.set_index_handle(index_block_handle);
1664   footer.set_checksum(r->table_options.checksum);
1665   std::string footer_encoding;
1666   footer.EncodeTo(&footer_encoding);
1667   assert(ok());
1668   IOStatus ios = r->file->Append(footer_encoding);
1669   if (ios.ok()) {
1670     r->set_offset(r->get_offset() + footer_encoding.size());
1671   } else {
1672     r->SetIOStatus(ios);
1673     r->SetStatus(ios);
1674   }
1675 }
1676 
1677 void BlockBasedTableBuilder::EnterUnbuffered() {
1678   Rep* r = rep_;
1679   assert(r->state == Rep::State::kBuffered);
1680   r->state = Rep::State::kUnbuffered;
1681   const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
1682                                   ? r->compression_opts.zstd_max_train_bytes
1683                                   : r->compression_opts.max_dict_bytes;
1684   const size_t kNumBlocksBuffered = r->data_block_buffers.size();
1685   if (kNumBlocksBuffered == 0) {
1686     // The below code is neither safe nor necessary for handling zero data
1687     // blocks.
1688     return;
1689   }
1690 
1691   // Abstract algebra teaches us that a finite cyclic group (such as the
1692   // additive group of integers modulo N) can be generated by a number that is
1693   // coprime with N. Since N is variable (number of buffered data blocks), we
1694   // must then pick a prime number in order to guarantee coprimeness with any N.
1695   //
1696   // One downside of this approach is the spread will be poor when
1697   // `kPrimeGeneratorRemainder` is close to zero or close to
1698   // `kNumBlocksBuffered`.
1699   //
1700   // Picked a random number between one and one trillion and then chose the
1701   // next prime number greater than or equal to it.
1702   const uint64_t kPrimeGenerator = 545055921143ull;
1703   // Can avoid repeated division by just adding the remainder repeatedly.
1704   const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
1705       kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
1706   const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
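  // Worked example: with kNumBlocksBuffered == 10, the remainder is
  // 545055921143 % 10 == 3; starting from index 5 the visit order is
  // 5, 8, 1, 4, 7, 0, 3, 6, 9, 2 -- each buffered block exactly once, since
  // gcd(3, 10) == 1.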
1707 
1708   std::string compression_dict_samples;
1709   std::vector<size_t> compression_dict_sample_lens;
1710   size_t buffer_idx = kInitSampleIdx;
1711   for (size_t i = 0;
1712        i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
1713        ++i) {
1714     size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
1715                                r->data_block_buffers[buffer_idx].size());
1716     compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
1717                                     copy_len);
1718     compression_dict_sample_lens.emplace_back(copy_len);
1719 
1720     buffer_idx += kPrimeGeneratorRemainder;
1721     if (buffer_idx >= kNumBlocksBuffered) {
1722       buffer_idx -= kNumBlocksBuffered;
1723     }
1724   }
1725 
1726   // The final data block has been flushed, so we can now generate a dictionary
1727   // from the samples. If compression_dict_samples is empty, we just get an
  // empty dictionary.
1728   std::string dict;
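  // With zstd_max_train_bytes > 0 the samples are fed to ZSTD's dictionary
  // trainer, capped at max_dict_bytes; otherwise the concatenated samples are
  // used directly as the dictionary.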
1729   if (r->compression_opts.zstd_max_train_bytes > 0) {
1730     dict = ZSTD_TrainDictionary(compression_dict_samples,
1731                                 compression_dict_sample_lens,
1732                                 r->compression_opts.max_dict_bytes);
1733   } else {
1734     dict = std::move(compression_dict_samples);
1735   }
1736   r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
1737                                                 r->compression_opts.level));
1738   r->verify_dict.reset(new UncompressionDict(
1739       dict, r->compression_type == kZSTD ||
1740                 r->compression_type == kZSTDNotFinalCompression));
1741 
1742   auto get_iterator_for_block = [&r](size_t i) {
1743     auto& data_block = r->data_block_buffers[i];
1744     assert(!data_block.empty());
1745 
1746     Block reader{BlockContents{data_block}};
1747     DataBlockIter* iter = reader.NewDataIterator(
1748         r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
1749 
1750     iter->SeekToFirst();
1751     assert(iter->Valid());
1752     return std::unique_ptr<DataBlockIter>(iter);
1753   };
1754 
1755   std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
1756 
1757   for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
1758     if (iter == nullptr) {
1759       iter = get_iterator_for_block(i);
1760       assert(iter != nullptr);
1761     }
1762 
1763     if (i + 1 < r->data_block_buffers.size()) {
1764       next_block_iter = get_iterator_for_block(i + 1);
1765     }
1766 
1767     auto& data_block = r->data_block_buffers[i];
1768 
1769     if (r->IsParallelCompressionEnabled()) {
1770       Slice first_key_in_next_block;
1771       const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1772       if (i + 1 < r->data_block_buffers.size()) {
1773         assert(next_block_iter != nullptr);
1774         first_key_in_next_block = next_block_iter->key();
1775       } else {
1776         first_key_in_next_block_ptr = r->first_key_in_next_block;
1777       }
1778 
1779       std::vector<std::string> keys;
1780       for (; iter->Valid(); iter->Next()) {
1781         keys.emplace_back(iter->key().ToString());
1782       }
1783 
1784       ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
1785           r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
1786 
1787       assert(block_rep != nullptr);
1788       r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
1789                                                r->get_offset());
1790       r->pc_rep->EmitBlock(block_rep);
1791     } else {
1792       for (; iter->Valid(); iter->Next()) {
1793         Slice key = iter->key();
1794         if (r->filter_builder != nullptr) {
1795           size_t ts_sz =
1796               r->internal_comparator.user_comparator()->timestamp_size();
1797           r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1798         }
1799         r->index_builder->OnKeyAdded(key);
1800       }
1801       WriteBlock(Slice(data_block), &r->pending_handle,
1802                  true /* is_data_block */);
1803       if (ok() && i + 1 < r->data_block_buffers.size()) {
1804         assert(next_block_iter != nullptr);
1805         Slice first_key_in_next_block = next_block_iter->key();
1806 
1807         Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1808 
1809         iter->SeekToLast();
1810         std::string last_key = iter->key().ToString();
1811         r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
1812                                         r->pending_handle);
1813       }
1814     }
1815 
1816     std::swap(iter, next_block_iter);
1817   }
1818   r->data_block_buffers.clear();
1819 }
1820 
1821 Status BlockBasedTableBuilder::Finish() {
1822   Rep* r = rep_;
1823   assert(r->state != Rep::State::kClosed);
1824   bool empty_data_block = r->data_block.empty();
1825   r->first_key_in_next_block = nullptr;
1826   Flush();
1827   if (r->state == Rep::State::kBuffered) {
1828     EnterUnbuffered();
1829   }
1830   if (r->IsParallelCompressionEnabled()) {
1831     StopParallelCompression();
1832 #ifndef NDEBUG
1833     for (const auto& br : r->pc_rep->block_rep_buf) {
1834       assert(br.status.ok());
1835     }
1836 #endif  // !NDEBUG
1837   } else {
1838     // To make sure the properties block records the accurate size of the
1839     // index block, we finish writing all index entries first.
1840     if (ok() && !empty_data_block) {
1841       r->index_builder->AddIndexEntry(
1842           &r->last_key, nullptr /* no next data block */, r->pending_handle);
1843     }
1844   }
1845 
1846   // Write meta blocks, metaindex block and footer in the following order.
1847   //    1. [meta block: filter]
1848   //    2. [meta block: index]
1849   //    3. [meta block: compression dictionary]
1850   //    4. [meta block: range deletion tombstone]
1851   //    5. [meta block: properties]
1852   //    6. [metaindex block]
1853   //    7. Footer
1854   BlockHandle metaindex_block_handle, index_block_handle;
1855   MetaIndexBuilder meta_index_builder;
1856   WriteFilterBlock(&meta_index_builder);
1857   WriteIndexBlock(&meta_index_builder, &index_block_handle);
1858   WriteCompressionDictBlock(&meta_index_builder);
1859   WriteRangeDelBlock(&meta_index_builder);
1860   WritePropertiesBlock(&meta_index_builder);
1861   if (ok()) {
1862     // flush the meta index block
1863     WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
1864                   &metaindex_block_handle);
1865   }
1866   if (ok()) {
1867     WriteFooter(metaindex_block_handle, index_block_handle);
1868   }
1869   r->state = Rep::State::kClosed;
1870   r->SetStatus(r->CopyIOStatus());
1871   Status ret_status = r->CopyStatus();
1872   assert(!ret_status.ok() || io_status().ok());
1873   return ret_status;
1874 }
1875 
1876 void BlockBasedTableBuilder::Abandon() {
1877   assert(rep_->state != Rep::State::kClosed);
1878   if (rep_->IsParallelCompressionEnabled()) {
1879     StopParallelCompression();
1880   }
1881   rep_->state = Rep::State::kClosed;
1882   rep_->CopyStatus().PermitUncheckedError();
1883   rep_->CopyIOStatus().PermitUncheckedError();
1884 }
1885 
1886 uint64_t BlockBasedTableBuilder::NumEntries() const {
1887   return rep_->props.num_entries;
1888 }
1889 
1890 bool BlockBasedTableBuilder::IsEmpty() const {
1891   return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
1892 }
1893 
1894 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
1895 
1896 uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
1897   if (rep_->IsParallelCompressionEnabled()) {
1898     // Use compression ratio so far and inflight raw bytes to estimate
1899     // final SST size.
1900     return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
1901   } else {
1902     return FileSize();
1903   }
1904 }
1905 
1906 bool BlockBasedTableBuilder::NeedCompact() const {
1907   for (const auto& collector : rep_->table_properties_collectors) {
1908     if (collector->NeedCompact()) {
1909       return true;
1910     }
1911   }
1912   return false;
1913 }
1914 
1915 TableProperties BlockBasedTableBuilder::GetTableProperties() const {
1916   TableProperties ret = rep_->props;
1917   for (const auto& collector : rep_->table_properties_collectors) {
1918     for (const auto& prop : collector->GetReadableProperties()) {
1919       ret.readable_properties.insert(prop);
1920     }
1921     collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
1922   }
1923   return ret;
1924 }
1925 
1926 std::string BlockBasedTableBuilder::GetFileChecksum() const {
1927   if (rep_->file != nullptr) {
1928     return rep_->file->GetFileChecksum();
1929   } else {
1930     return kUnknownFileChecksum;
1931   }
1932 }
1933 
1934 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
1935   if (rep_->file != nullptr) {
1936     return rep_->file->GetFileChecksumFuncName();
1937   } else {
1938     return kUnknownFileChecksumFuncName;
1939   }
1940 }
1941 
1942 const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
1943 const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
1944 const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
1945     "partitionedfilter.";
1946 }  // namespace ROCKSDB_NAMESPACE
1947