1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
14
15 #include "db/arena_wrapped_db_iter.h"
16 #include "db/column_family.h"
17 #include "db/db_iter.h"
18 #include "db/db_test_util.h"
19 #include "db/dbformat.h"
20 #include "db/write_batch_internal.h"
21 #include "port/port.h"
22 #include "port/stack_trace.h"
23 #include "util/string_util.h"
24 #include "utilities/merge_operators.h"
25
26 namespace ROCKSDB_NAMESPACE {
27
28 // kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb
29 // should accept the value type on write, and report not supported value
30 // for reads, unless caller request for it explicitly. The base rocksdb
31 // doesn't understand format of actual blob index (the value).
32 class DBBlobIndexTest : public DBTestBase {
33 public:
34 enum Tier {
35 kMemtable = 0,
36 kImmutableMemtables = 1,
37 kL0SstFile = 2,
38 kLnSstFile = 3,
39 };
40 const std::vector<Tier> kAllTiers = {Tier::kMemtable,
41 Tier::kImmutableMemtables,
42 Tier::kL0SstFile, Tier::kLnSstFile};
43
DBBlobIndexTest()44 DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {}
45
cfh()46 ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }
47
cfd()48 ColumnFamilyData* cfd() {
49 return reinterpret_cast<ColumnFamilyHandleImpl*>(cfh())->cfd();
50 }
51
PutBlobIndex(WriteBatch * batch,const Slice & key,const Slice & blob_index)52 Status PutBlobIndex(WriteBatch* batch, const Slice& key,
53 const Slice& blob_index) {
54 return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key,
55 blob_index);
56 }
57
Write(WriteBatch * batch)58 Status Write(WriteBatch* batch) {
59 return dbfull()->Write(WriteOptions(), batch);
60 }
61
GetImpl(const Slice & key,bool * is_blob_index=nullptr,const Snapshot * snapshot=nullptr)62 std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr,
63 const Snapshot* snapshot = nullptr) {
64 ReadOptions read_options;
65 read_options.snapshot = snapshot;
66 PinnableSlice value;
67 DBImpl::GetImplOptions get_impl_options;
68 get_impl_options.column_family = cfh();
69 get_impl_options.value = &value;
70 get_impl_options.is_blob_index = is_blob_index;
71 auto s = dbfull()->GetImpl(read_options, key, get_impl_options);
72 if (s.IsNotFound()) {
73 return "NOT_FOUND";
74 }
75 if (s.IsNotSupported()) {
76 return "NOT_SUPPORTED";
77 }
78 if (!s.ok()) {
79 return s.ToString();
80 }
81 return value.ToString();
82 }
83
GetBlobIndex(const Slice & key,const Snapshot * snapshot=nullptr)84 std::string GetBlobIndex(const Slice& key,
85 const Snapshot* snapshot = nullptr) {
86 bool is_blob_index = false;
87 std::string value = GetImpl(key, &is_blob_index, snapshot);
88 if (!is_blob_index) {
89 return "NOT_BLOB";
90 }
91 return value;
92 }
93
GetBlobIterator()94 ArenaWrappedDBIter* GetBlobIterator() {
95 return dbfull()->NewIteratorImpl(
96 ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
97 nullptr /*read_callback*/, true /*allow_blob*/);
98 }
99
GetTestOptions()100 Options GetTestOptions() {
101 Options options;
102 options.create_if_missing = true;
103 options.num_levels = 2;
104 options.disable_auto_compactions = true;
105 // Disable auto flushes.
106 options.max_write_buffer_number = 10;
107 options.min_write_buffer_number_to_merge = 10;
108 options.merge_operator = MergeOperators::CreateStringAppendOperator();
109 return options;
110 }
111
MoveDataTo(Tier tier)112 void MoveDataTo(Tier tier) {
113 switch (tier) {
114 case Tier::kMemtable:
115 break;
116 case Tier::kImmutableMemtables:
117 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
118 break;
119 case Tier::kL0SstFile:
120 ASSERT_OK(Flush());
121 break;
122 case Tier::kLnSstFile:
123 ASSERT_OK(Flush());
124 ASSERT_OK(Put("a", "dummy"));
125 ASSERT_OK(Put("z", "dummy"));
126 ASSERT_OK(Flush());
127 ASSERT_OK(
128 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
129 #ifndef ROCKSDB_LITE
130 ASSERT_EQ("0,1", FilesPerLevel());
131 #endif // !ROCKSDB_LITE
132 break;
133 }
134 }
135 };
136
137 // Should be able to write kTypeBlobIndex to memtables and SST files.
TEST_F(DBBlobIndexTest,Write)138 TEST_F(DBBlobIndexTest, Write) {
139 for (auto tier : kAllTiers) {
140 DestroyAndReopen(GetTestOptions());
141 for (int i = 1; i <= 5; i++) {
142 std::string index = ToString(i);
143 WriteBatch batch;
144 ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index));
145 ASSERT_OK(Write(&batch));
146 }
147 MoveDataTo(tier);
148 for (int i = 1; i <= 5; i++) {
149 std::string index = ToString(i);
150 ASSERT_EQ("blob" + index, GetBlobIndex("key" + index));
151 }
152 }
153 }
154
155 // Get should be able to return blob index if is_blob_index is provided,
156 // otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest,Get)157 TEST_F(DBBlobIndexTest, Get) {
158 for (auto tier : kAllTiers) {
159 DestroyAndReopen(GetTestOptions());
160 WriteBatch batch;
161 ASSERT_OK(batch.Put("key", "value"));
162 ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index"));
163 ASSERT_OK(Write(&batch));
164 MoveDataTo(tier);
165 // Verify normal value
166 bool is_blob_index = false;
167 PinnableSlice value;
168 ASSERT_EQ("value", Get("key"));
169 ASSERT_EQ("value", GetImpl("key"));
170 ASSERT_EQ("value", GetImpl("key", &is_blob_index));
171 ASSERT_FALSE(is_blob_index);
172 // Verify blob index
173 ASSERT_TRUE(Get("blob_key", &value).IsNotSupported());
174 ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key"));
175 ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index));
176 ASSERT_TRUE(is_blob_index);
177 }
178 }
179
180 // Get should NOT return Status::NotSupported if blob index is updated with
181 // a normal value.
TEST_F(DBBlobIndexTest,Updated)182 TEST_F(DBBlobIndexTest, Updated) {
183 for (auto tier : kAllTiers) {
184 DestroyAndReopen(GetTestOptions());
185 WriteBatch batch;
186 for (int i = 0; i < 10; i++) {
187 ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index"));
188 }
189 ASSERT_OK(Write(&batch));
190 // Avoid blob values from being purged.
191 const Snapshot* snapshot = dbfull()->GetSnapshot();
192 ASSERT_OK(Put("key1", "new_value"));
193 ASSERT_OK(Merge("key2", "a"));
194 ASSERT_OK(Merge("key2", "b"));
195 ASSERT_OK(Merge("key2", "c"));
196 ASSERT_OK(Delete("key3"));
197 ASSERT_OK(SingleDelete("key4"));
198 ASSERT_OK(Delete("key5"));
199 ASSERT_OK(Merge("key5", "a"));
200 ASSERT_OK(Merge("key5", "b"));
201 ASSERT_OK(Merge("key5", "c"));
202 ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9"));
203 MoveDataTo(tier);
204 for (int i = 0; i < 10; i++) {
205 ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot));
206 }
207 ASSERT_EQ("new_value", Get("key1"));
208 ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2"));
209 ASSERT_EQ("NOT_FOUND", Get("key3"));
210 ASSERT_EQ("NOT_FOUND", Get("key4"));
211 ASSERT_EQ("a,b,c", GetImpl("key5"));
212 for (int i = 6; i < 9; i++) {
213 ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
214 }
215 ASSERT_EQ("blob_index", GetBlobIndex("key9"));
216 dbfull()->ReleaseSnapshot(snapshot);
217 }
218 }
219
220 // Iterator should get blob value if allow_blob flag is set,
221 // otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest,Iterate)222 TEST_F(DBBlobIndexTest, Iterate) {
223 const std::vector<std::vector<ValueType>> data = {
224 /*00*/ {kTypeValue},
225 /*01*/ {kTypeBlobIndex},
226 /*02*/ {kTypeValue},
227 /*03*/ {kTypeBlobIndex, kTypeValue},
228 /*04*/ {kTypeValue},
229 /*05*/ {kTypeValue, kTypeBlobIndex},
230 /*06*/ {kTypeValue},
231 /*07*/ {kTypeDeletion, kTypeBlobIndex},
232 /*08*/ {kTypeValue},
233 /*09*/ {kTypeSingleDeletion, kTypeBlobIndex},
234 /*10*/ {kTypeValue},
235 /*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex},
236 /*12*/ {kTypeValue},
237 /*13*/
238 {kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex},
239 /*14*/ {kTypeValue},
240 /*15*/ {kTypeBlobIndex},
241 /*16*/ {kTypeValue},
242 };
243
244 auto get_key = [](int index) {
245 char buf[20];
246 snprintf(buf, sizeof(buf), "%02d", index);
247 return "key" + std::string(buf);
248 };
249
250 auto get_value = [&](int index, int version) {
251 return get_key(index) + "_value" + ToString(version);
252 };
253
254 auto check_iterator = [&](Iterator* iterator, Status::Code expected_status,
255 const Slice& expected_value) {
256 ASSERT_EQ(expected_status, iterator->status().code());
257 if (expected_status == Status::kOk) {
258 ASSERT_TRUE(iterator->Valid());
259 ASSERT_EQ(expected_value, iterator->value());
260 } else {
261 ASSERT_FALSE(iterator->Valid());
262 }
263 };
264
265 auto create_normal_iterator = [&]() -> Iterator* {
266 return dbfull()->NewIterator(ReadOptions());
267 };
268
269 auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); };
270
271 auto check_is_blob = [&](bool is_blob) {
272 return [is_blob](Iterator* iterator) {
273 ASSERT_EQ(is_blob,
274 reinterpret_cast<ArenaWrappedDBIter*>(iterator)->IsBlob());
275 };
276 };
277
278 auto verify = [&](int index, Status::Code expected_status,
279 const Slice& forward_value, const Slice& backward_value,
280 std::function<Iterator*()> create_iterator,
281 std::function<void(Iterator*)> extra_check = nullptr) {
282 // Seek
283 auto* iterator = create_iterator();
284 ASSERT_OK(iterator->Refresh());
285 iterator->Seek(get_key(index));
286 check_iterator(iterator, expected_status, forward_value);
287 if (extra_check) {
288 extra_check(iterator);
289 }
290 delete iterator;
291
292 // Next
293 iterator = create_iterator();
294 ASSERT_OK(iterator->Refresh());
295 iterator->Seek(get_key(index - 1));
296 ASSERT_TRUE(iterator->Valid());
297 iterator->Next();
298 check_iterator(iterator, expected_status, forward_value);
299 if (extra_check) {
300 extra_check(iterator);
301 }
302 delete iterator;
303
304 // SeekForPrev
305 iterator = create_iterator();
306 ASSERT_OK(iterator->Refresh());
307 iterator->SeekForPrev(get_key(index));
308 check_iterator(iterator, expected_status, backward_value);
309 if (extra_check) {
310 extra_check(iterator);
311 }
312 delete iterator;
313
314 // Prev
315 iterator = create_iterator();
316 iterator->Seek(get_key(index + 1));
317 ASSERT_TRUE(iterator->Valid());
318 iterator->Prev();
319 check_iterator(iterator, expected_status, backward_value);
320 if (extra_check) {
321 extra_check(iterator);
322 }
323 delete iterator;
324 };
325
326 for (auto tier : {Tier::kMemtable} /*kAllTiers*/) {
327 // Avoid values from being purged.
328 std::vector<const Snapshot*> snapshots;
329 DestroyAndReopen(GetTestOptions());
330
331 // fill data
332 for (int i = 0; i < static_cast<int>(data.size()); i++) {
333 for (int j = static_cast<int>(data[i].size()) - 1; j >= 0; j--) {
334 std::string key = get_key(i);
335 std::string value = get_value(i, j);
336 WriteBatch batch;
337 switch (data[i][j]) {
338 case kTypeValue:
339 ASSERT_OK(Put(key, value));
340 break;
341 case kTypeDeletion:
342 ASSERT_OK(Delete(key));
343 break;
344 case kTypeSingleDeletion:
345 ASSERT_OK(SingleDelete(key));
346 break;
347 case kTypeMerge:
348 ASSERT_OK(Merge(key, value));
349 break;
350 case kTypeBlobIndex:
351 ASSERT_OK(PutBlobIndex(&batch, key, value));
352 ASSERT_OK(Write(&batch));
353 break;
354 default:
355 assert(false);
356 };
357 }
358 snapshots.push_back(dbfull()->GetSnapshot());
359 }
360 ASSERT_OK(
361 dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16)));
362 snapshots.push_back(dbfull()->GetSnapshot());
363 MoveDataTo(tier);
364
365 // Normal iterator
366 verify(1, Status::kNotSupported, "", "", create_normal_iterator);
367 verify(3, Status::kNotSupported, "", "", create_normal_iterator);
368 verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
369 create_normal_iterator);
370 verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
371 create_normal_iterator);
372 verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
373 create_normal_iterator);
374 verify(11, Status::kNotSupported, "", "", create_normal_iterator);
375 verify(13, Status::kOk,
376 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
377 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
378 create_normal_iterator);
379 verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
380 create_normal_iterator);
381
382 // Iterator with blob support
383 verify(1, Status::kOk, get_value(1, 0), get_value(1, 0),
384 create_blob_iterator, check_is_blob(true));
385 verify(3, Status::kOk, get_value(3, 0), get_value(3, 0),
386 create_blob_iterator, check_is_blob(true));
387 verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
388 create_blob_iterator, check_is_blob(false));
389 verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
390 create_blob_iterator, check_is_blob(false));
391 verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
392 create_blob_iterator, check_is_blob(false));
393 verify(11, Status::kNotSupported, "", "", create_blob_iterator);
394 verify(13, Status::kOk,
395 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
396 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
397 create_blob_iterator, check_is_blob(false));
398 verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
399 create_blob_iterator, check_is_blob(false));
400
401 #ifndef ROCKSDB_LITE
402 // Iterator with blob support and using seek.
403 ASSERT_OK(dbfull()->SetOptions(
404 cfh(), {{"max_sequential_skip_in_iterations", "0"}}));
405 verify(1, Status::kOk, get_value(1, 0), get_value(1, 0),
406 create_blob_iterator, check_is_blob(true));
407 verify(3, Status::kOk, get_value(3, 0), get_value(3, 0),
408 create_blob_iterator, check_is_blob(true));
409 verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
410 create_blob_iterator, check_is_blob(false));
411 verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
412 create_blob_iterator, check_is_blob(false));
413 verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
414 create_blob_iterator, check_is_blob(false));
415 verify(11, Status::kNotSupported, "", "", create_blob_iterator);
416 verify(13, Status::kOk,
417 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
418 get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
419 create_blob_iterator, check_is_blob(false));
420 verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
421 create_blob_iterator, check_is_blob(false));
422 #endif // !ROCKSDB_LITE
423
424 for (auto* snapshot : snapshots) {
425 dbfull()->ReleaseSnapshot(snapshot);
426 }
427 }
428 }
429
430 } // namespace ROCKSDB_NAMESPACE
431
main(int argc,char ** argv)432 int main(int argc, char** argv) {
433 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
434 ::testing::InitGoogleTest(&argc, argv);
435 return RUN_ALL_TESTS();
436 }
437