1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include <functional>
11 #include <string>
12 #include <utility>
13 #include <vector>
14 
15 #include "db/arena_wrapped_db_iter.h"
16 #include "db/column_family.h"
17 #include "db/db_iter.h"
18 #include "db/db_test_util.h"
19 #include "db/dbformat.h"
20 #include "db/write_batch_internal.h"
21 #include "port/port.h"
22 #include "port/stack_trace.h"
23 #include "util/string_util.h"
24 #include "utilities/merge_operators.h"
25 
26 namespace ROCKSDB_NAMESPACE {
27 
28 // kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb
29 // should accept the value type on write, and report not supported value
30 // for reads, unless caller request for it explicitly. The base rocksdb
31 // doesn't understand format of actual blob index (the value).
32 class DBBlobIndexTest : public DBTestBase {
33  public:
34   enum Tier {
35     kMemtable = 0,
36     kImmutableMemtables = 1,
37     kL0SstFile = 2,
38     kLnSstFile = 3,
39   };
40   const std::vector<Tier> kAllTiers = {Tier::kMemtable,
41                                        Tier::kImmutableMemtables,
42                                        Tier::kL0SstFile, Tier::kLnSstFile};
43 
DBBlobIndexTest()44   DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {}
45 
cfh()46   ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }
47 
cfd()48   ColumnFamilyData* cfd() {
49     return reinterpret_cast<ColumnFamilyHandleImpl*>(cfh())->cfd();
50   }
51 
PutBlobIndex(WriteBatch * batch,const Slice & key,const Slice & blob_index)52   Status PutBlobIndex(WriteBatch* batch, const Slice& key,
53                       const Slice& blob_index) {
54     return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key,
55                                             blob_index);
56   }
57 
Write(WriteBatch * batch)58   Status Write(WriteBatch* batch) {
59     return dbfull()->Write(WriteOptions(), batch);
60   }
61 
GetImpl(const Slice & key,bool * is_blob_index=nullptr,const Snapshot * snapshot=nullptr)62   std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr,
63                       const Snapshot* snapshot = nullptr) {
64     ReadOptions read_options;
65     read_options.snapshot = snapshot;
66     PinnableSlice value;
67     DBImpl::GetImplOptions get_impl_options;
68     get_impl_options.column_family = cfh();
69     get_impl_options.value = &value;
70     get_impl_options.is_blob_index = is_blob_index;
71     auto s = dbfull()->GetImpl(read_options, key, get_impl_options);
72     if (s.IsNotFound()) {
73       return "NOT_FOUND";
74     }
75     if (s.IsNotSupported()) {
76       return "NOT_SUPPORTED";
77     }
78     if (!s.ok()) {
79       return s.ToString();
80     }
81     return value.ToString();
82   }
83 
GetBlobIndex(const Slice & key,const Snapshot * snapshot=nullptr)84   std::string GetBlobIndex(const Slice& key,
85                            const Snapshot* snapshot = nullptr) {
86     bool is_blob_index = false;
87     std::string value = GetImpl(key, &is_blob_index, snapshot);
88     if (!is_blob_index) {
89       return "NOT_BLOB";
90     }
91     return value;
92   }
93 
GetBlobIterator()94   ArenaWrappedDBIter* GetBlobIterator() {
95     return dbfull()->NewIteratorImpl(
96         ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
97         nullptr /*read_callback*/, true /*allow_blob*/);
98   }
99 
GetTestOptions()100   Options GetTestOptions() {
101     Options options;
102     options.create_if_missing = true;
103     options.num_levels = 2;
104     options.disable_auto_compactions = true;
105     // Disable auto flushes.
106     options.max_write_buffer_number = 10;
107     options.min_write_buffer_number_to_merge = 10;
108     options.merge_operator = MergeOperators::CreateStringAppendOperator();
109     return options;
110   }
111 
MoveDataTo(Tier tier)112   void MoveDataTo(Tier tier) {
113     switch (tier) {
114       case Tier::kMemtable:
115         break;
116       case Tier::kImmutableMemtables:
117         ASSERT_OK(dbfull()->TEST_SwitchMemtable());
118         break;
119       case Tier::kL0SstFile:
120         ASSERT_OK(Flush());
121         break;
122       case Tier::kLnSstFile:
123         ASSERT_OK(Flush());
124         ASSERT_OK(Put("a", "dummy"));
125         ASSERT_OK(Put("z", "dummy"));
126         ASSERT_OK(Flush());
127         ASSERT_OK(
128             dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
129 #ifndef ROCKSDB_LITE
130         ASSERT_EQ("0,1", FilesPerLevel());
131 #endif  // !ROCKSDB_LITE
132         break;
133     }
134   }
135 };
136 
137 // Should be able to write kTypeBlobIndex to memtables and SST files.
TEST_F(DBBlobIndexTest,Write)138 TEST_F(DBBlobIndexTest, Write) {
139   for (auto tier : kAllTiers) {
140     DestroyAndReopen(GetTestOptions());
141     for (int i = 1; i <= 5; i++) {
142       std::string index = ToString(i);
143       WriteBatch batch;
144       ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index));
145       ASSERT_OK(Write(&batch));
146     }
147     MoveDataTo(tier);
148     for (int i = 1; i <= 5; i++) {
149       std::string index = ToString(i);
150       ASSERT_EQ("blob" + index, GetBlobIndex("key" + index));
151     }
152   }
153 }
154 
155 // Get should be able to return blob index if is_blob_index is provided,
156 // otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest,Get)157 TEST_F(DBBlobIndexTest, Get) {
158   for (auto tier : kAllTiers) {
159     DestroyAndReopen(GetTestOptions());
160     WriteBatch batch;
161     ASSERT_OK(batch.Put("key", "value"));
162     ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index"));
163     ASSERT_OK(Write(&batch));
164     MoveDataTo(tier);
165     // Verify normal value
166     bool is_blob_index = false;
167     PinnableSlice value;
168     ASSERT_EQ("value", Get("key"));
169     ASSERT_EQ("value", GetImpl("key"));
170     ASSERT_EQ("value", GetImpl("key", &is_blob_index));
171     ASSERT_FALSE(is_blob_index);
172     // Verify blob index
173     ASSERT_TRUE(Get("blob_key", &value).IsNotSupported());
174     ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key"));
175     ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index));
176     ASSERT_TRUE(is_blob_index);
177   }
178 }
179 
180 // Get should NOT return Status::NotSupported if blob index is updated with
181 // a normal value.
TEST_F(DBBlobIndexTest,Updated)182 TEST_F(DBBlobIndexTest, Updated) {
183   for (auto tier : kAllTiers) {
184     DestroyAndReopen(GetTestOptions());
185     WriteBatch batch;
186     for (int i = 0; i < 10; i++) {
187       ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index"));
188     }
189     ASSERT_OK(Write(&batch));
190     // Avoid blob values from being purged.
191     const Snapshot* snapshot = dbfull()->GetSnapshot();
192     ASSERT_OK(Put("key1", "new_value"));
193     ASSERT_OK(Merge("key2", "a"));
194     ASSERT_OK(Merge("key2", "b"));
195     ASSERT_OK(Merge("key2", "c"));
196     ASSERT_OK(Delete("key3"));
197     ASSERT_OK(SingleDelete("key4"));
198     ASSERT_OK(Delete("key5"));
199     ASSERT_OK(Merge("key5", "a"));
200     ASSERT_OK(Merge("key5", "b"));
201     ASSERT_OK(Merge("key5", "c"));
202     ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9"));
203     MoveDataTo(tier);
204     for (int i = 0; i < 10; i++) {
205       ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot));
206     }
207     ASSERT_EQ("new_value", Get("key1"));
208     ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2"));
209     ASSERT_EQ("NOT_FOUND", Get("key3"));
210     ASSERT_EQ("NOT_FOUND", Get("key4"));
211     ASSERT_EQ("a,b,c", GetImpl("key5"));
212     for (int i = 6; i < 9; i++) {
213       ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
214     }
215     ASSERT_EQ("blob_index", GetBlobIndex("key9"));
216     dbfull()->ReleaseSnapshot(snapshot);
217   }
218 }
219 
220 // Iterator should get blob value if allow_blob flag is set,
221 // otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest,Iterate)222 TEST_F(DBBlobIndexTest, Iterate) {
223   const std::vector<std::vector<ValueType>> data = {
224       /*00*/ {kTypeValue},
225       /*01*/ {kTypeBlobIndex},
226       /*02*/ {kTypeValue},
227       /*03*/ {kTypeBlobIndex, kTypeValue},
228       /*04*/ {kTypeValue},
229       /*05*/ {kTypeValue, kTypeBlobIndex},
230       /*06*/ {kTypeValue},
231       /*07*/ {kTypeDeletion, kTypeBlobIndex},
232       /*08*/ {kTypeValue},
233       /*09*/ {kTypeSingleDeletion, kTypeBlobIndex},
234       /*10*/ {kTypeValue},
235       /*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex},
236       /*12*/ {kTypeValue},
237       /*13*/
238       {kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex},
239       /*14*/ {kTypeValue},
240       /*15*/ {kTypeBlobIndex},
241       /*16*/ {kTypeValue},
242   };
243 
244   auto get_key = [](int index) {
245     char buf[20];
246     snprintf(buf, sizeof(buf), "%02d", index);
247     return "key" + std::string(buf);
248   };
249 
250   auto get_value = [&](int index, int version) {
251     return get_key(index) + "_value" + ToString(version);
252   };
253 
254   auto check_iterator = [&](Iterator* iterator, Status::Code expected_status,
255                             const Slice& expected_value) {
256     ASSERT_EQ(expected_status, iterator->status().code());
257     if (expected_status == Status::kOk) {
258       ASSERT_TRUE(iterator->Valid());
259       ASSERT_EQ(expected_value, iterator->value());
260     } else {
261       ASSERT_FALSE(iterator->Valid());
262     }
263   };
264 
265   auto create_normal_iterator = [&]() -> Iterator* {
266     return dbfull()->NewIterator(ReadOptions());
267   };
268 
269   auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); };
270 
271   auto check_is_blob = [&](bool is_blob) {
272     return [is_blob](Iterator* iterator) {
273       ASSERT_EQ(is_blob,
274                 reinterpret_cast<ArenaWrappedDBIter*>(iterator)->IsBlob());
275     };
276   };
277 
278   auto verify = [&](int index, Status::Code expected_status,
279                     const Slice& forward_value, const Slice& backward_value,
280                     std::function<Iterator*()> create_iterator,
281                     std::function<void(Iterator*)> extra_check = nullptr) {
282     // Seek
283     auto* iterator = create_iterator();
284     ASSERT_OK(iterator->Refresh());
285     iterator->Seek(get_key(index));
286     check_iterator(iterator, expected_status, forward_value);
287     if (extra_check) {
288       extra_check(iterator);
289     }
290     delete iterator;
291 
292     // Next
293     iterator = create_iterator();
294     ASSERT_OK(iterator->Refresh());
295     iterator->Seek(get_key(index - 1));
296     ASSERT_TRUE(iterator->Valid());
297     iterator->Next();
298     check_iterator(iterator, expected_status, forward_value);
299     if (extra_check) {
300       extra_check(iterator);
301     }
302     delete iterator;
303 
304     // SeekForPrev
305     iterator = create_iterator();
306     ASSERT_OK(iterator->Refresh());
307     iterator->SeekForPrev(get_key(index));
308     check_iterator(iterator, expected_status, backward_value);
309     if (extra_check) {
310       extra_check(iterator);
311     }
312     delete iterator;
313 
314     // Prev
315     iterator = create_iterator();
316     iterator->Seek(get_key(index + 1));
317     ASSERT_TRUE(iterator->Valid());
318     iterator->Prev();
319     check_iterator(iterator, expected_status, backward_value);
320     if (extra_check) {
321       extra_check(iterator);
322     }
323     delete iterator;
324   };
325 
326   for (auto tier : {Tier::kMemtable} /*kAllTiers*/) {
327     // Avoid values from being purged.
328     std::vector<const Snapshot*> snapshots;
329     DestroyAndReopen(GetTestOptions());
330 
331     // fill data
332     for (int i = 0; i < static_cast<int>(data.size()); i++) {
333       for (int j = static_cast<int>(data[i].size()) - 1; j >= 0; j--) {
334         std::string key = get_key(i);
335         std::string value = get_value(i, j);
336         WriteBatch batch;
337         switch (data[i][j]) {
338           case kTypeValue:
339             ASSERT_OK(Put(key, value));
340             break;
341           case kTypeDeletion:
342             ASSERT_OK(Delete(key));
343             break;
344           case kTypeSingleDeletion:
345             ASSERT_OK(SingleDelete(key));
346             break;
347           case kTypeMerge:
348             ASSERT_OK(Merge(key, value));
349             break;
350           case kTypeBlobIndex:
351             ASSERT_OK(PutBlobIndex(&batch, key, value));
352             ASSERT_OK(Write(&batch));
353             break;
354           default:
355             assert(false);
356         };
357       }
358       snapshots.push_back(dbfull()->GetSnapshot());
359     }
360     ASSERT_OK(
361         dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16)));
362     snapshots.push_back(dbfull()->GetSnapshot());
363     MoveDataTo(tier);
364 
365     // Normal iterator
366     verify(1, Status::kNotSupported, "", "", create_normal_iterator);
367     verify(3, Status::kNotSupported, "", "", create_normal_iterator);
368     verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
369            create_normal_iterator);
370     verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
371            create_normal_iterator);
372     verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
373            create_normal_iterator);
374     verify(11, Status::kNotSupported, "", "", create_normal_iterator);
375     verify(13, Status::kOk,
376            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
377            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
378            create_normal_iterator);
379     verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
380            create_normal_iterator);
381 
382     // Iterator with blob support
383     verify(1, Status::kOk, get_value(1, 0), get_value(1, 0),
384            create_blob_iterator, check_is_blob(true));
385     verify(3, Status::kOk, get_value(3, 0), get_value(3, 0),
386            create_blob_iterator, check_is_blob(true));
387     verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
388            create_blob_iterator, check_is_blob(false));
389     verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
390            create_blob_iterator, check_is_blob(false));
391     verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
392            create_blob_iterator, check_is_blob(false));
393     verify(11, Status::kNotSupported, "", "", create_blob_iterator);
394     verify(13, Status::kOk,
395            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
396            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
397            create_blob_iterator, check_is_blob(false));
398     verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
399            create_blob_iterator, check_is_blob(false));
400 
401 #ifndef ROCKSDB_LITE
402     // Iterator with blob support and using seek.
403     ASSERT_OK(dbfull()->SetOptions(
404         cfh(), {{"max_sequential_skip_in_iterations", "0"}}));
405     verify(1, Status::kOk, get_value(1, 0), get_value(1, 0),
406            create_blob_iterator, check_is_blob(true));
407     verify(3, Status::kOk, get_value(3, 0), get_value(3, 0),
408            create_blob_iterator, check_is_blob(true));
409     verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
410            create_blob_iterator, check_is_blob(false));
411     verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
412            create_blob_iterator, check_is_blob(false));
413     verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
414            create_blob_iterator, check_is_blob(false));
415     verify(11, Status::kNotSupported, "", "", create_blob_iterator);
416     verify(13, Status::kOk,
417            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
418            get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
419            create_blob_iterator, check_is_blob(false));
420     verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
421            create_blob_iterator, check_is_blob(false));
422 #endif  // !ROCKSDB_LITE
423 
424     for (auto* snapshot : snapshots) {
425       dbfull()->ReleaseSnapshot(snapshot);
426     }
427   }
428 }
429 
430 }  // namespace ROCKSDB_NAMESPACE
431 
main(int argc,char ** argv)432 int main(int argc, char** argv) {
433   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
434   ::testing::InitGoogleTest(&argc, argv);
435   return RUN_ALL_TESTS();
436 }
437