1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "rocksdb/sst_file_writer.h"
7
8 #include <vector>
9
10 #include "db/dbformat.h"
11 #include "env/composite_env_wrapper.h"
12 #include "file/writable_file_writer.h"
13 #include "rocksdb/table.h"
14 #include "table/block_based/block_based_table_builder.h"
15 #include "table/sst_file_writer_collectors.h"
16 #include "test_util/sync_point.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 const std::string ExternalSstFilePropertyNames::kVersion =
21 "rocksdb.external_sst_file.version";
22 const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
23 "rocksdb.external_sst_file.global_seqno";
24
25 #ifndef ROCKSDB_LITE
26
// Bytes written between two page-cache invalidation hints (1 MiB).
constexpr size_t kFadviseTrigger = size_t{1} << 20;
28
29 struct SstFileWriter::Rep {
RepROCKSDB_NAMESPACE::SstFileWriter::Rep30 Rep(const EnvOptions& _env_options, const Options& options,
31 Env::IOPriority _io_priority, const Comparator* _user_comparator,
32 ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
33 : env_options(_env_options),
34 ioptions(options),
35 mutable_cf_options(options),
36 io_priority(_io_priority),
37 internal_comparator(_user_comparator),
38 cfh(_cfh),
39 invalidate_page_cache(_invalidate_page_cache),
40 last_fadvise_size(0),
41 skip_filters(_skip_filters) {}
42
43 std::unique_ptr<WritableFileWriter> file_writer;
44 std::unique_ptr<TableBuilder> builder;
45 EnvOptions env_options;
46 ImmutableCFOptions ioptions;
47 MutableCFOptions mutable_cf_options;
48 Env::IOPriority io_priority;
49 InternalKeyComparator internal_comparator;
50 ExternalSstFileInfo file_info;
51 InternalKey ikey;
52 std::string column_family_name;
53 ColumnFamilyHandle* cfh;
54 // If true, We will give the OS a hint that this file pages is not needed
55 // every time we write 1MB to the file.
56 bool invalidate_page_cache;
57 // The size of the file during the last time we called Fadvise to remove
58 // cached pages from page cache.
59 uint64_t last_fadvise_size;
60 bool skip_filters;
AddROCKSDB_NAMESPACE::SstFileWriter::Rep61 Status Add(const Slice& user_key, const Slice& value,
62 const ValueType value_type) {
63 if (!builder) {
64 return Status::InvalidArgument("File is not opened");
65 }
66
67 if (file_info.num_entries == 0) {
68 file_info.smallest_key.assign(user_key.data(), user_key.size());
69 } else {
70 if (internal_comparator.user_comparator()->Compare(
71 user_key, file_info.largest_key) <= 0) {
72 // Make sure that keys are added in order
73 return Status::InvalidArgument(
74 "Keys must be added in strict ascending order.");
75 }
76 }
77
78 // TODO(tec) : For external SST files we could omit the seqno and type.
79 switch (value_type) {
80 case ValueType::kTypeValue:
81 ikey.Set(user_key, 0 /* Sequence Number */,
82 ValueType::kTypeValue /* Put */);
83 break;
84 case ValueType::kTypeMerge:
85 ikey.Set(user_key, 0 /* Sequence Number */,
86 ValueType::kTypeMerge /* Merge */);
87 break;
88 case ValueType::kTypeDeletion:
89 ikey.Set(user_key, 0 /* Sequence Number */,
90 ValueType::kTypeDeletion /* Delete */);
91 break;
92 default:
93 return Status::InvalidArgument("Value type is not supported");
94 }
95 builder->Add(ikey.Encode(), value);
96
97 // update file info
98 file_info.num_entries++;
99 file_info.largest_key.assign(user_key.data(), user_key.size());
100 file_info.file_size = builder->FileSize();
101
102 InvalidatePageCache(false /* closing */);
103
104 return Status::OK();
105 }
106
DeleteRangeROCKSDB_NAMESPACE::SstFileWriter::Rep107 Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
108 if (!builder) {
109 return Status::InvalidArgument("File is not opened");
110 }
111
112 RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
113 if (file_info.num_range_del_entries == 0) {
114 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
115 tombstone.start_key_.size());
116 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
117 tombstone.end_key_.size());
118 } else {
119 if (internal_comparator.user_comparator()->Compare(
120 tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
121 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
122 tombstone.start_key_.size());
123 }
124 if (internal_comparator.user_comparator()->Compare(
125 tombstone.end_key_, file_info.largest_range_del_key) > 0) {
126 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
127 tombstone.end_key_.size());
128 }
129 }
130
131 auto ikey_and_end_key = tombstone.Serialize();
132 builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
133
134 // update file info
135 file_info.num_range_del_entries++;
136 file_info.file_size = builder->FileSize();
137
138 InvalidatePageCache(false /* closing */);
139
140 return Status::OK();
141 }
142
InvalidatePageCacheROCKSDB_NAMESPACE::SstFileWriter::Rep143 void InvalidatePageCache(bool closing) {
144 if (invalidate_page_cache == false) {
145 // Fadvise disabled
146 return;
147 }
148 uint64_t bytes_since_last_fadvise =
149 builder->FileSize() - last_fadvise_size;
150 if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
151 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
152 &(bytes_since_last_fadvise));
153 // Tell the OS that we dont need this file in page cache
154 file_writer->InvalidateCache(0, 0);
155 last_fadvise_size = builder->FileSize();
156 }
157 }
158
159 };
160
SstFileWriter(const EnvOptions & env_options,const Options & options,const Comparator * user_comparator,ColumnFamilyHandle * column_family,bool invalidate_page_cache,Env::IOPriority io_priority,bool skip_filters)161 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
162 const Options& options,
163 const Comparator* user_comparator,
164 ColumnFamilyHandle* column_family,
165 bool invalidate_page_cache,
166 Env::IOPriority io_priority, bool skip_filters)
167 : rep_(new Rep(env_options, options, io_priority, user_comparator,
168 column_family, invalidate_page_cache, skip_filters)) {
169 rep_->file_info.file_size = 0;
170 }
171
~SstFileWriter()172 SstFileWriter::~SstFileWriter() {
173 if (rep_->builder) {
174 // User did not call Finish() or Finish() failed, we need to
175 // abandon the builder.
176 rep_->builder->Abandon();
177 }
178 }
179
Open(const std::string & file_path)180 Status SstFileWriter::Open(const std::string& file_path) {
181 Rep* r = rep_.get();
182 Status s;
183 std::unique_ptr<WritableFile> sst_file;
184 s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
185 if (!s.ok()) {
186 return s;
187 }
188
189 sst_file->SetIOPriority(r->io_priority);
190
191 CompressionType compression_type;
192 CompressionOptions compression_opts;
193 if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
194 compression_type = r->ioptions.bottommost_compression;
195 if (r->ioptions.bottommost_compression_opts.enabled) {
196 compression_opts = r->ioptions.bottommost_compression_opts;
197 } else {
198 compression_opts = r->ioptions.compression_opts;
199 }
200 } else if (!r->ioptions.compression_per_level.empty()) {
201 // Use the compression of the last level if we have per level compression
202 compression_type = *(r->ioptions.compression_per_level.rbegin());
203 compression_opts = r->ioptions.compression_opts;
204 } else {
205 compression_type = r->mutable_cf_options.compression;
206 compression_opts = r->ioptions.compression_opts;
207 }
208 uint64_t sample_for_compression =
209 r->mutable_cf_options.sample_for_compression;
210
211 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
212 int_tbl_prop_collector_factories;
213
214 // SstFileWriter properties collector to add SstFileWriter version.
215 int_tbl_prop_collector_factories.emplace_back(
216 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
217 0 /* global_seqno*/));
218
219 // User collector factories
220 auto user_collector_factories =
221 r->ioptions.table_properties_collector_factories;
222 for (size_t i = 0; i < user_collector_factories.size(); i++) {
223 int_tbl_prop_collector_factories.emplace_back(
224 new UserKeyTablePropertiesCollectorFactory(
225 user_collector_factories[i]));
226 }
227 int unknown_level = -1;
228 uint32_t cf_id;
229
230 if (r->cfh != nullptr) {
231 // user explicitly specified that this file will be ingested into cfh,
232 // we can persist this information in the file.
233 cf_id = r->cfh->GetID();
234 r->column_family_name = r->cfh->GetName();
235 } else {
236 r->column_family_name = "";
237 cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
238 }
239
240 TableBuilderOptions table_builder_options(
241 r->ioptions, r->mutable_cf_options, r->internal_comparator,
242 &int_tbl_prop_collector_factories, compression_type,
243 sample_for_compression, compression_opts, r->skip_filters,
244 r->column_family_name, unknown_level);
245 r->file_writer.reset(
246 new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(sst_file)),
247 file_path, r->env_options, r->ioptions.env,
248 nullptr /* stats */, r->ioptions.listeners));
249
250 // TODO(tec) : If table_factory is using compressed block cache, we will
251 // be adding the external sst file blocks into it, which is wasteful.
252 r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
253 table_builder_options, cf_id, r->file_writer.get()));
254
255 r->file_info = ExternalSstFileInfo();
256 r->file_info.file_path = file_path;
257 r->file_info.version = 2;
258 return s;
259 }
260
Add(const Slice & user_key,const Slice & value)261 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
262 return rep_->Add(user_key, value, ValueType::kTypeValue);
263 }
264
Put(const Slice & user_key,const Slice & value)265 Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
266 return rep_->Add(user_key, value, ValueType::kTypeValue);
267 }
268
Merge(const Slice & user_key,const Slice & value)269 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
270 return rep_->Add(user_key, value, ValueType::kTypeMerge);
271 }
272
Delete(const Slice & user_key)273 Status SstFileWriter::Delete(const Slice& user_key) {
274 return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
275 }
276
DeleteRange(const Slice & begin_key,const Slice & end_key)277 Status SstFileWriter::DeleteRange(const Slice& begin_key,
278 const Slice& end_key) {
279 return rep_->DeleteRange(begin_key, end_key);
280 }
281
Finish(ExternalSstFileInfo * file_info)282 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
283 Rep* r = rep_.get();
284 if (!r->builder) {
285 return Status::InvalidArgument("File is not opened");
286 }
287 if (r->file_info.num_entries == 0 &&
288 r->file_info.num_range_del_entries == 0) {
289 return Status::InvalidArgument("Cannot create sst file with no entries");
290 }
291
292 Status s = r->builder->Finish();
293 r->file_info.file_size = r->builder->FileSize();
294
295 if (s.ok()) {
296 s = r->file_writer->Sync(r->ioptions.use_fsync);
297 r->InvalidatePageCache(true /* closing */);
298 if (s.ok()) {
299 s = r->file_writer->Close();
300 }
301 }
302 if (!s.ok()) {
303 r->ioptions.env->DeleteFile(r->file_info.file_path);
304 }
305
306 if (file_info != nullptr) {
307 *file_info = r->file_info;
308 }
309
310 r->builder.reset();
311 return s;
312 }
313
FileSize()314 uint64_t SstFileWriter::FileSize() {
315 return rep_->file_info.file_size;
316 }
317 #endif // !ROCKSDB_LITE
318
319 } // namespace ROCKSDB_NAMESPACE
320