1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "leveldb/table_builder.h"
6 
7 #include <assert.h>
8 
9 #include "leveldb/comparator.h"
10 #include "leveldb/env.h"
11 #include "leveldb/filter_policy.h"
12 #include "leveldb/options.h"
13 #include "table/block_builder.h"
14 #include "table/filter_block.h"
15 #include "table/format.h"
16 #include "util/coding.h"
17 #include "util/crc32c.h"
18 
19 namespace leveldb {
20 
21 struct TableBuilder::Rep {
Repleveldb::TableBuilder::Rep22   Rep(const Options& opt, WritableFile* f)
23       : options(opt),
24         index_block_options(opt),
25         file(f),
26         offset(0),
27         data_block(&options),
28         index_block(&index_block_options),
29         num_entries(0),
30         closed(false),
31         filter_block(opt.filter_policy == nullptr
32                          ? nullptr
33                          : new FilterBlockBuilder(opt.filter_policy)),
34         pending_index_entry(false) {
35     index_block_options.block_restart_interval = 1;
36   }
37 
38   Options options;
39   Options index_block_options;
40   WritableFile* file;
41   uint64_t offset;
42   Status status;
43   BlockBuilder data_block;
44   BlockBuilder index_block;
45   std::string last_key;
46   int64_t num_entries;
47   bool closed;  // Either Finish() or Abandon() has been called.
48   FilterBlockBuilder* filter_block;
49 
50   // We do not emit the index entry for a block until we have seen the
51   // first key for the next data block.  This allows us to use shorter
52   // keys in the index block.  For example, consider a block boundary
53   // between the keys "the quick brown fox" and "the who".  We can use
54   // "the r" as the key for the index block entry since it is >= all
55   // entries in the first block and < all entries in subsequent
56   // blocks.
57   //
58   // Invariant: r->pending_index_entry is true only if data_block is empty.
59   bool pending_index_entry;
60   BlockHandle pending_handle;  // Handle to add to index block
61 
62   std::string compressed_output;
63 };
64 
TableBuilder(const Options & options,WritableFile * file)65 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
66     : rep_(new Rep(options, file)) {
67   if (rep_->filter_block != nullptr) {
68     rep_->filter_block->StartBlock(0);
69   }
70 }
71 
~TableBuilder()72 TableBuilder::~TableBuilder() {
73   assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
74   delete rep_->filter_block;
75   delete rep_;
76 }
77 
ChangeOptions(const Options & options)78 Status TableBuilder::ChangeOptions(const Options& options) {
79   // Note: if more fields are added to Options, update
80   // this function to catch changes that should not be allowed to
81   // change in the middle of building a Table.
82   if (options.comparator != rep_->options.comparator) {
83     return Status::InvalidArgument("changing comparator while building table");
84   }
85 
86   // Note that any live BlockBuilders point to rep_->options and therefore
87   // will automatically pick up the updated options.
88   rep_->options = options;
89   rep_->index_block_options = options;
90   rep_->index_block_options.block_restart_interval = 1;
91   return Status::OK();
92 }
93 
Add(const Slice & key,const Slice & value)94 void TableBuilder::Add(const Slice& key, const Slice& value) {
95   Rep* r = rep_;
96   assert(!r->closed);
97   if (!ok()) return;
98   if (r->num_entries > 0) {
99     assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
100   }
101 
102   if (r->pending_index_entry) {
103     assert(r->data_block.empty());
104     r->options.comparator->FindShortestSeparator(&r->last_key, key);
105     std::string handle_encoding;
106     r->pending_handle.EncodeTo(&handle_encoding);
107     r->index_block.Add(r->last_key, Slice(handle_encoding));
108     r->pending_index_entry = false;
109   }
110 
111   if (r->filter_block != nullptr) {
112     r->filter_block->AddKey(key);
113   }
114 
115   r->last_key.assign(key.data(), key.size());
116   r->num_entries++;
117   r->data_block.Add(key, value);
118 
119   const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
120   if (estimated_block_size >= r->options.block_size) {
121     Flush();
122   }
123 }
124 
Flush()125 void TableBuilder::Flush() {
126   Rep* r = rep_;
127   assert(!r->closed);
128   if (!ok()) return;
129   if (r->data_block.empty()) return;
130   assert(!r->pending_index_entry);
131   WriteBlock(&r->data_block, &r->pending_handle);
132   if (ok()) {
133     r->pending_index_entry = true;
134     r->status = r->file->Flush();
135   }
136   if (r->filter_block != nullptr) {
137     r->filter_block->StartBlock(r->offset);
138   }
139 }
140 
WriteBlock(BlockBuilder * block,BlockHandle * handle)141 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
142   // File format contains a sequence of blocks where each block has:
143   //    block_data: uint8[n]
144   //    type: uint8
145   //    crc: uint32
146   assert(ok());
147   Rep* r = rep_;
148   Slice raw = block->Finish();
149 
150   Slice block_contents;
151   CompressionType type = r->options.compression;
152   // TODO(postrelease): Support more compression options: zlib?
153   switch (type) {
154     case kNoCompression:
155       block_contents = raw;
156       break;
157 
158     case kSnappyCompression: {
159       std::string* compressed = &r->compressed_output;
160       if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
161           compressed->size() < raw.size() - (raw.size() / 8u)) {
162         block_contents = *compressed;
163       } else {
164         // Snappy not supported, or compressed less than 12.5%, so just
165         // store uncompressed form
166         block_contents = raw;
167         type = kNoCompression;
168       }
169       break;
170     }
171   }
172   WriteRawBlock(block_contents, type, handle);
173   r->compressed_output.clear();
174   block->Reset();
175 }
176 
WriteRawBlock(const Slice & block_contents,CompressionType type,BlockHandle * handle)177 void TableBuilder::WriteRawBlock(const Slice& block_contents,
178                                  CompressionType type, BlockHandle* handle) {
179   Rep* r = rep_;
180   handle->set_offset(r->offset);
181   handle->set_size(block_contents.size());
182   r->status = r->file->Append(block_contents);
183   if (r->status.ok()) {
184     char trailer[kBlockTrailerSize];
185     trailer[0] = type;
186     uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
187     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
188     EncodeFixed32(trailer + 1, crc32c::Mask(crc));
189     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
190     if (r->status.ok()) {
191       r->offset += block_contents.size() + kBlockTrailerSize;
192     }
193   }
194 }
195 
status() const196 Status TableBuilder::status() const { return rep_->status; }
197 
Finish()198 Status TableBuilder::Finish() {
199   Rep* r = rep_;
200   Flush();
201   assert(!r->closed);
202   r->closed = true;
203 
204   BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
205 
206   // Write filter block
207   if (ok() && r->filter_block != nullptr) {
208     WriteRawBlock(r->filter_block->Finish(), kNoCompression,
209                   &filter_block_handle);
210   }
211 
212   // Write metaindex block
213   if (ok()) {
214     BlockBuilder meta_index_block(&r->options);
215     if (r->filter_block != nullptr) {
216       // Add mapping from "filter.Name" to location of filter data
217       std::string key = "filter.";
218       key.append(r->options.filter_policy->Name());
219       std::string handle_encoding;
220       filter_block_handle.EncodeTo(&handle_encoding);
221       meta_index_block.Add(key, handle_encoding);
222     }
223 
224     // TODO(postrelease): Add stats and other meta blocks
225     WriteBlock(&meta_index_block, &metaindex_block_handle);
226   }
227 
228   // Write index block
229   if (ok()) {
230     if (r->pending_index_entry) {
231       r->options.comparator->FindShortSuccessor(&r->last_key);
232       std::string handle_encoding;
233       r->pending_handle.EncodeTo(&handle_encoding);
234       r->index_block.Add(r->last_key, Slice(handle_encoding));
235       r->pending_index_entry = false;
236     }
237     WriteBlock(&r->index_block, &index_block_handle);
238   }
239 
240   // Write footer
241   if (ok()) {
242     Footer footer;
243     footer.set_metaindex_handle(metaindex_block_handle);
244     footer.set_index_handle(index_block_handle);
245     std::string footer_encoding;
246     footer.EncodeTo(&footer_encoding);
247     r->status = r->file->Append(footer_encoding);
248     if (r->status.ok()) {
249       r->offset += footer_encoding.size();
250     }
251   }
252   return r->status;
253 }
254 
Abandon()255 void TableBuilder::Abandon() {
256   Rep* r = rep_;
257   assert(!r->closed);
258   r->closed = true;
259 }
260 
NumEntries() const261 uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
262 
FileSize() const263 uint64_t TableBuilder::FileSize() const { return rep_->offset; }
264 
265 }  // namespace leveldb
266