1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifndef ROCKSDB_LITE
11 
12 #include "rocksdb/utilities/write_batch_with_index.h"
13 #include <map>
14 #include <memory>
15 #include "db/column_family.h"
16 #include "port/stack_trace.h"
17 #include "test_util/testharness.h"
18 #include "util/random.h"
19 #include "util/string_util.h"
20 #include "utilities/merge_operators.h"
21 #include "utilities/merge_operators/string_append/stringappend.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
25 namespace {
26 class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
27  public:
28   explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator)
29       : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
30         id_(id),
31         comparator_(comparator) {}
32   uint32_t GetID() const override { return id_; }
33   const Comparator* GetComparator() const override { return comparator_; }
34 
35  private:
36   uint32_t id_;
37   const Comparator* comparator_;
38 };
39 
40 struct Entry {
41   std::string key;
42   std::string value;
43   WriteType type;
44 };
45 
46 struct TestHandler : public WriteBatch::Handler {
47   std::map<uint32_t, std::vector<Entry>> seen;
48   Status PutCF(uint32_t column_family_id, const Slice& key,
49                const Slice& value) override {
50     Entry e;
51     e.key = key.ToString();
52     e.value = value.ToString();
53     e.type = kPutRecord;
54     seen[column_family_id].push_back(e);
55     return Status::OK();
56   }
57   Status MergeCF(uint32_t column_family_id, const Slice& key,
58                  const Slice& value) override {
59     Entry e;
60     e.key = key.ToString();
61     e.value = value.ToString();
62     e.type = kMergeRecord;
63     seen[column_family_id].push_back(e);
64     return Status::OK();
65   }
66   void LogData(const Slice& /*blob*/) override {}
67   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
68     Entry e;
69     e.key = key.ToString();
70     e.value = "";
71     e.type = kDeleteRecord;
72     seen[column_family_id].push_back(e);
73     return Status::OK();
74   }
75 };
76 }  // namespace anonymous
77 
78 class WriteBatchWithIndexTest : public testing::Test {};
79 
80 void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
81                                      WriteBatchWithIndex* batch) {
82   // In this test, we insert <key, value> to column family `data`, and
83   // <value, key> to column family `index`. Then iterator them in order
84   // and seek them by key.
85 
86   // Sort entries by key
87   std::map<std::string, std::vector<Entry*>> data_map;
88   // Sort entries by value
89   std::map<std::string, std::vector<Entry*>> index_map;
90   for (auto& e : entries) {
91     data_map[e.key].push_back(&e);
92     index_map[e.value].push_back(&e);
93   }
94 
95   ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
96   ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
97   for (auto& e : entries) {
98     if (e.type == kPutRecord) {
99       batch->Put(&data, e.key, e.value);
100       batch->Put(&index, e.value, e.key);
101     } else if (e.type == kMergeRecord) {
102       batch->Merge(&data, e.key, e.value);
103       batch->Put(&index, e.value, e.key);
104     } else {
105       assert(e.type == kDeleteRecord);
106       std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
107       iter->Seek(e.key);
108       ASSERT_OK(iter->status());
109       auto write_entry = iter->Entry();
110       ASSERT_EQ(e.key, write_entry.key.ToString());
111       ASSERT_EQ(e.value, write_entry.value.ToString());
112       batch->Delete(&data, e.key);
113       batch->Put(&index, e.value, "");
114     }
115   }
116 
117   // Iterator all keys
118   {
119     std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
120     for (int seek_to_first : {0, 1}) {
121       if (seek_to_first) {
122         iter->SeekToFirst();
123       } else {
124         iter->Seek("");
125       }
126       for (auto pair : data_map) {
127         for (auto v : pair.second) {
128           ASSERT_OK(iter->status());
129           ASSERT_TRUE(iter->Valid());
130           auto write_entry = iter->Entry();
131           ASSERT_EQ(pair.first, write_entry.key.ToString());
132           ASSERT_EQ(v->type, write_entry.type);
133           if (write_entry.type != kDeleteRecord) {
134             ASSERT_EQ(v->value, write_entry.value.ToString());
135           }
136           iter->Next();
137         }
138       }
139       ASSERT_TRUE(!iter->Valid());
140     }
141     iter->SeekToLast();
142     for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
143       for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
144         ASSERT_OK(iter->status());
145         ASSERT_TRUE(iter->Valid());
146         auto write_entry = iter->Entry();
147         ASSERT_EQ(pair->first, write_entry.key.ToString());
148         ASSERT_EQ((*v)->type, write_entry.type);
149         if (write_entry.type != kDeleteRecord) {
150           ASSERT_EQ((*v)->value, write_entry.value.ToString());
151         }
152         iter->Prev();
153       }
154     }
155     ASSERT_TRUE(!iter->Valid());
156   }
157 
158   // Iterator all indexes
159   {
160     std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
161     for (int seek_to_first : {0, 1}) {
162       if (seek_to_first) {
163         iter->SeekToFirst();
164       } else {
165         iter->Seek("");
166       }
167       for (auto pair : index_map) {
168         for (auto v : pair.second) {
169           ASSERT_OK(iter->status());
170           ASSERT_TRUE(iter->Valid());
171           auto write_entry = iter->Entry();
172           ASSERT_EQ(pair.first, write_entry.key.ToString());
173           if (v->type != kDeleteRecord) {
174             ASSERT_EQ(v->key, write_entry.value.ToString());
175             ASSERT_EQ(v->value, write_entry.key.ToString());
176           }
177           iter->Next();
178         }
179       }
180       ASSERT_TRUE(!iter->Valid());
181     }
182 
183     iter->SeekToLast();
184     for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
185       for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
186         ASSERT_OK(iter->status());
187         ASSERT_TRUE(iter->Valid());
188         auto write_entry = iter->Entry();
189         ASSERT_EQ(pair->first, write_entry.key.ToString());
190         if ((*v)->type != kDeleteRecord) {
191           ASSERT_EQ((*v)->key, write_entry.value.ToString());
192           ASSERT_EQ((*v)->value, write_entry.key.ToString());
193         }
194         iter->Prev();
195       }
196     }
197     ASSERT_TRUE(!iter->Valid());
198   }
199 
200   // Seek to every key
201   {
202     std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
203 
204     // Seek the keys one by one in reverse order
205     for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
206       iter->Seek(pair->first);
207       ASSERT_OK(iter->status());
208       for (auto v : pair->second) {
209         ASSERT_TRUE(iter->Valid());
210         auto write_entry = iter->Entry();
211         ASSERT_EQ(pair->first, write_entry.key.ToString());
212         ASSERT_EQ(v->type, write_entry.type);
213         if (write_entry.type != kDeleteRecord) {
214           ASSERT_EQ(v->value, write_entry.value.ToString());
215         }
216         iter->Next();
217         ASSERT_OK(iter->status());
218       }
219     }
220   }
221 
222   // Seek to every index
223   {
224     std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
225 
226     // Seek the keys one by one in reverse order
227     for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
228       iter->Seek(pair->first);
229       ASSERT_OK(iter->status());
230       for (auto v : pair->second) {
231         ASSERT_TRUE(iter->Valid());
232         auto write_entry = iter->Entry();
233         ASSERT_EQ(pair->first, write_entry.key.ToString());
234         ASSERT_EQ(v->value, write_entry.key.ToString());
235         if (v->type != kDeleteRecord) {
236           ASSERT_EQ(v->key, write_entry.value.ToString());
237         }
238         iter->Next();
239         ASSERT_OK(iter->status());
240       }
241     }
242   }
243 
244   // Verify WriteBatch can be iterated
245   TestHandler handler;
246   batch->GetWriteBatch()->Iterate(&handler);
247 
248   // Verify data column family
249   {
250     ASSERT_EQ(entries.size(), handler.seen[data.GetID()].size());
251     size_t i = 0;
252     for (auto e : handler.seen[data.GetID()]) {
253       auto write_entry = entries[i++];
254       ASSERT_EQ(e.type, write_entry.type);
255       ASSERT_EQ(e.key, write_entry.key);
256       if (e.type != kDeleteRecord) {
257         ASSERT_EQ(e.value, write_entry.value);
258       }
259     }
260   }
261 
262   // Verify index column family
263   {
264     ASSERT_EQ(entries.size(), handler.seen[index.GetID()].size());
265     size_t i = 0;
266     for (auto e : handler.seen[index.GetID()]) {
267       auto write_entry = entries[i++];
268       ASSERT_EQ(e.key, write_entry.value);
269       if (write_entry.type != kDeleteRecord) {
270         ASSERT_EQ(e.value, write_entry.key);
271       }
272     }
273   }
274 }
275 
276 TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
277   Entry entries[] = {
278       {"aaa", "0005", kPutRecord},
279       {"b", "0002", kPutRecord},
280       {"cdd", "0002", kMergeRecord},
281       {"aab", "00001", kPutRecord},
282       {"cc", "00005", kPutRecord},
283       {"cdd", "0002", kPutRecord},
284       {"aab", "0003", kPutRecord},
285       {"cc", "00005", kDeleteRecord},
286   };
287   std::vector<Entry> entries_list(entries, entries + 8);
288 
289   WriteBatchWithIndex batch(nullptr, 20);
290 
291   TestValueAsSecondaryIndexHelper(entries_list, &batch);
292 
293   // Clear batch and re-run test with new values
294   batch.Clear();
295 
296   Entry new_entries[] = {
297       {"aaa", "0005", kPutRecord},
298       {"e", "0002", kPutRecord},
299       {"add", "0002", kMergeRecord},
300       {"aab", "00001", kPutRecord},
301       {"zz", "00005", kPutRecord},
302       {"add", "0002", kPutRecord},
303       {"aab", "0003", kPutRecord},
304       {"zz", "00005", kDeleteRecord},
305   };
306 
307   entries_list = std::vector<Entry>(new_entries, new_entries + 8);
308 
309   TestValueAsSecondaryIndexHelper(entries_list, &batch);
310 }
311 
312 TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) {
313   ColumnFamilyHandleImplDummy cf1(6, nullptr);
314   ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
315   ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
316   WriteBatchWithIndex batch(BytewiseComparator(), 20);
317 
318   batch.Put(&cf1, "ddd", "");
319   batch.Put(&cf2, "aaa", "");
320   batch.Put(&cf2, "eee", "");
321   batch.Put(&cf1, "ccc", "");
322   batch.Put(&reverse_cf, "a11", "");
323   batch.Put(&cf1, "bbb", "");
324 
325   Slice key_slices[] = {"a", "3", "3"};
326   Slice value_slice = "";
327   batch.Put(&reverse_cf, SliceParts(key_slices, 3),
328             SliceParts(&value_slice, 1));
329   batch.Put(&reverse_cf, "a22", "");
330 
331   {
332     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
333     iter->Seek("");
334     ASSERT_OK(iter->status());
335     ASSERT_TRUE(iter->Valid());
336     ASSERT_EQ("bbb", iter->Entry().key.ToString());
337     iter->Next();
338     ASSERT_OK(iter->status());
339     ASSERT_TRUE(iter->Valid());
340     ASSERT_EQ("ccc", iter->Entry().key.ToString());
341     iter->Next();
342     ASSERT_OK(iter->status());
343     ASSERT_TRUE(iter->Valid());
344     ASSERT_EQ("ddd", iter->Entry().key.ToString());
345     iter->Next();
346     ASSERT_OK(iter->status());
347     ASSERT_TRUE(!iter->Valid());
348   }
349 
350   {
351     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
352     iter->Seek("");
353     ASSERT_OK(iter->status());
354     ASSERT_TRUE(iter->Valid());
355     ASSERT_EQ("aaa", iter->Entry().key.ToString());
356     iter->Next();
357     ASSERT_OK(iter->status());
358     ASSERT_TRUE(iter->Valid());
359     ASSERT_EQ("eee", iter->Entry().key.ToString());
360     iter->Next();
361     ASSERT_OK(iter->status());
362     ASSERT_TRUE(!iter->Valid());
363   }
364 
365   {
366     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
367     iter->Seek("");
368     ASSERT_OK(iter->status());
369     ASSERT_TRUE(!iter->Valid());
370 
371     iter->Seek("z");
372     ASSERT_OK(iter->status());
373     ASSERT_TRUE(iter->Valid());
374     ASSERT_EQ("a33", iter->Entry().key.ToString());
375     iter->Next();
376     ASSERT_OK(iter->status());
377     ASSERT_TRUE(iter->Valid());
378     ASSERT_EQ("a22", iter->Entry().key.ToString());
379     iter->Next();
380     ASSERT_OK(iter->status());
381     ASSERT_TRUE(iter->Valid());
382     ASSERT_EQ("a11", iter->Entry().key.ToString());
383     iter->Next();
384     ASSERT_OK(iter->status());
385     ASSERT_TRUE(!iter->Valid());
386 
387     iter->Seek("a22");
388     ASSERT_OK(iter->status());
389     ASSERT_TRUE(iter->Valid());
390     ASSERT_EQ("a22", iter->Entry().key.ToString());
391 
392     iter->Seek("a13");
393     ASSERT_OK(iter->status());
394     ASSERT_TRUE(iter->Valid());
395     ASSERT_EQ("a11", iter->Entry().key.ToString());
396   }
397 }
398 
399 TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) {
400   ColumnFamilyHandleImplDummy cf1(6, nullptr);
401   ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
402   ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
403   WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
404 
405   batch.Put(&cf1, "ddd", "");
406   batch.Merge(&cf1, "ddd", "");
407   batch.Delete(&cf1, "ddd");
408   batch.Put(&cf2, "aaa", "");
409   batch.Delete(&cf2, "aaa");
410   batch.Put(&cf2, "aaa", "aaa");
411   batch.Put(&cf2, "eee", "eee");
412   batch.Put(&cf1, "ccc", "");
413   batch.Put(&reverse_cf, "a11", "");
414   batch.Delete(&cf1, "ccc");
415   batch.Put(&reverse_cf, "a33", "a33");
416   batch.Put(&reverse_cf, "a11", "a11");
417   Slice slices[] = {"a", "3", "3"};
418   batch.Delete(&reverse_cf, SliceParts(slices, 3));
419 
420   {
421     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
422     iter->Seek("");
423     ASSERT_OK(iter->status());
424     ASSERT_TRUE(iter->Valid());
425     ASSERT_EQ("ccc", iter->Entry().key.ToString());
426     ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
427     iter->Next();
428     ASSERT_OK(iter->status());
429     ASSERT_TRUE(iter->Valid());
430     ASSERT_EQ("ddd", iter->Entry().key.ToString());
431     ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
432     iter->Next();
433     ASSERT_OK(iter->status());
434     ASSERT_TRUE(!iter->Valid());
435   }
436 
437   {
438     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
439     iter->SeekToLast();
440     ASSERT_OK(iter->status());
441     ASSERT_TRUE(iter->Valid());
442     ASSERT_EQ("eee", iter->Entry().key.ToString());
443     ASSERT_EQ("eee", iter->Entry().value.ToString());
444     iter->Prev();
445     ASSERT_OK(iter->status());
446     ASSERT_TRUE(iter->Valid());
447     ASSERT_EQ("aaa", iter->Entry().key.ToString());
448     ASSERT_EQ("aaa", iter->Entry().value.ToString());
449     iter->Prev();
450     ASSERT_OK(iter->status());
451     ASSERT_TRUE(!iter->Valid());
452 
453     iter->SeekToFirst();
454     ASSERT_OK(iter->status());
455     ASSERT_TRUE(iter->Valid());
456     ASSERT_EQ("aaa", iter->Entry().key.ToString());
457     ASSERT_EQ("aaa", iter->Entry().value.ToString());
458     iter->Next();
459     ASSERT_OK(iter->status());
460     ASSERT_TRUE(iter->Valid());
461     ASSERT_EQ("eee", iter->Entry().key.ToString());
462     ASSERT_EQ("eee", iter->Entry().value.ToString());
463     iter->Next();
464     ASSERT_OK(iter->status());
465     ASSERT_TRUE(!iter->Valid());
466   }
467 
468   {
469     std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
470     iter->Seek("");
471     ASSERT_OK(iter->status());
472     ASSERT_TRUE(!iter->Valid());
473 
474     iter->Seek("z");
475     ASSERT_OK(iter->status());
476     ASSERT_TRUE(iter->Valid());
477     ASSERT_EQ("a33", iter->Entry().key.ToString());
478     ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
479     iter->Next();
480     ASSERT_OK(iter->status());
481     ASSERT_TRUE(iter->Valid());
482     ASSERT_EQ("a11", iter->Entry().key.ToString());
483     ASSERT_EQ("a11", iter->Entry().value.ToString());
484     iter->Next();
485     ASSERT_OK(iter->status());
486     ASSERT_TRUE(!iter->Valid());
487 
488     iter->SeekToLast();
489     ASSERT_TRUE(iter->Valid());
490     ASSERT_EQ("a11", iter->Entry().key.ToString());
491     ASSERT_EQ("a11", iter->Entry().value.ToString());
492     iter->Prev();
493 
494     ASSERT_OK(iter->status());
495     ASSERT_TRUE(iter->Valid());
496     ASSERT_EQ("a33", iter->Entry().key.ToString());
497     ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
498     iter->Prev();
499     ASSERT_TRUE(!iter->Valid());
500   }
501 }
502 
503 namespace {
504 typedef std::map<std::string, std::string> KVMap;
505 
506 class KVIter : public Iterator {
507  public:
508   explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {}
509   bool Valid() const override { return iter_ != map_->end(); }
510   void SeekToFirst() override { iter_ = map_->begin(); }
511   void SeekToLast() override {
512     if (map_->empty()) {
513       iter_ = map_->end();
514     } else {
515       iter_ = map_->find(map_->rbegin()->first);
516     }
517   }
518   void Seek(const Slice& k) override {
519     iter_ = map_->lower_bound(k.ToString());
520   }
521   void SeekForPrev(const Slice& k) override {
522     iter_ = map_->upper_bound(k.ToString());
523     Prev();
524   }
525   void Next() override { ++iter_; }
526   void Prev() override {
527     if (iter_ == map_->begin()) {
528       iter_ = map_->end();
529       return;
530     }
531     --iter_;
532   }
533 
534   Slice key() const override { return iter_->first; }
535   Slice value() const override { return iter_->second; }
536   Status status() const override { return Status::OK(); }
537 
538  private:
539   const KVMap* const map_;
540   KVMap::const_iterator iter_;
541 };
542 
543 void AssertIter(Iterator* iter, const std::string& key,
544                 const std::string& value) {
545   ASSERT_OK(iter->status());
546   ASSERT_TRUE(iter->Valid());
547   ASSERT_EQ(key, iter->key().ToString());
548   ASSERT_EQ(value, iter->value().ToString());
549 }
550 
551 void AssertItersEqual(Iterator* iter1, Iterator* iter2) {
552   ASSERT_EQ(iter1->Valid(), iter2->Valid());
553   if (iter1->Valid()) {
554     ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString());
555     ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString());
556   }
557 }
558 }  // namespace
559 
560 TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
561   std::vector<std::string> source_strings = {"a", "b", "c", "d", "e",
562                                              "f", "g", "h", "i", "j"};
563   for (int rand_seed = 301; rand_seed < 366; rand_seed++) {
564     Random rnd(rand_seed);
565 
566     ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
567     ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
568     ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator());
569 
570     WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
571 
572     if (rand_seed % 2 == 0) {
573       batch.Put(&cf2, "zoo", "bar");
574     }
575     if (rand_seed % 4 == 1) {
576       batch.Put(&cf3, "zoo", "bar");
577     }
578 
579     KVMap map;
580     KVMap merged_map;
581     for (auto key : source_strings) {
582       std::string value = key + key;
583       int type = rnd.Uniform(6);
584       switch (type) {
585         case 0:
586           // only base has it
587           map[key] = value;
588           merged_map[key] = value;
589           break;
590         case 1:
591           // only delta has it
592           batch.Put(&cf1, key, value);
593           map[key] = value;
594           merged_map[key] = value;
595           break;
596         case 2:
597           // both has it. Delta should win
598           batch.Put(&cf1, key, value);
599           map[key] = "wrong_value";
600           merged_map[key] = value;
601           break;
602         case 3:
603           // both has it. Delta is delete
604           batch.Delete(&cf1, key);
605           map[key] = "wrong_value";
606           break;
607         case 4:
608           // only delta has it. Delta is delete
609           batch.Delete(&cf1, key);
610           map[key] = "wrong_value";
611           break;
612         default:
613           // Neither iterator has it.
614           break;
615       }
616     }
617 
618     std::unique_ptr<Iterator> iter(
619         batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
620     std::unique_ptr<Iterator> result_iter(new KVIter(&merged_map));
621 
622     bool is_valid = false;
623     for (int i = 0; i < 128; i++) {
624       // Random walk and make sure iter and result_iter returns the
625       // same key and value
626       int type = rnd.Uniform(6);
627       ASSERT_OK(iter->status());
628       switch (type) {
629         case 0:
630           // Seek to First
631           iter->SeekToFirst();
632           result_iter->SeekToFirst();
633           break;
634         case 1:
635           // Seek to last
636           iter->SeekToLast();
637           result_iter->SeekToLast();
638           break;
639         case 2: {
640           // Seek to random key
641           auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
642           auto key = source_strings[key_idx];
643           iter->Seek(key);
644           result_iter->Seek(key);
645           break;
646         }
647         case 3: {
648           // SeekForPrev to random key
649           auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
650           auto key = source_strings[key_idx];
651           iter->SeekForPrev(key);
652           result_iter->SeekForPrev(key);
653           break;
654         }
655         case 4:
656           // Next
657           if (is_valid) {
658             iter->Next();
659             result_iter->Next();
660           } else {
661             continue;
662           }
663           break;
664         default:
665           assert(type == 5);
666           // Prev
667           if (is_valid) {
668             iter->Prev();
669             result_iter->Prev();
670           } else {
671             continue;
672           }
673           break;
674       }
675       AssertItersEqual(iter.get(), result_iter.get());
676       is_valid = iter->Valid();
677     }
678   }
679 }
680 
681 TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) {
682   ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
683   ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
684   WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
685 
686   {
687     KVMap map;
688     map["a"] = "aa";
689     map["c"] = "cc";
690     map["e"] = "ee";
691     std::unique_ptr<Iterator> iter(
692         batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
693 
694     iter->SeekToFirst();
695     AssertIter(iter.get(), "a", "aa");
696     iter->Next();
697     AssertIter(iter.get(), "c", "cc");
698     iter->Next();
699     AssertIter(iter.get(), "e", "ee");
700     iter->Next();
701     ASSERT_OK(iter->status());
702     ASSERT_TRUE(!iter->Valid());
703 
704     iter->SeekToLast();
705     AssertIter(iter.get(), "e", "ee");
706     iter->Prev();
707     AssertIter(iter.get(), "c", "cc");
708     iter->Prev();
709     AssertIter(iter.get(), "a", "aa");
710     iter->Prev();
711     ASSERT_OK(iter->status());
712     ASSERT_TRUE(!iter->Valid());
713 
714     iter->Seek("b");
715     AssertIter(iter.get(), "c", "cc");
716 
717     iter->Prev();
718     AssertIter(iter.get(), "a", "aa");
719 
720     iter->Seek("a");
721     AssertIter(iter.get(), "a", "aa");
722   }
723 
724   // Test the case that there is one element in the write batch
725   batch.Put(&cf2, "zoo", "bar");
726   batch.Put(&cf1, "a", "aa");
727   {
728     KVMap empty_map;
729     std::unique_ptr<Iterator> iter(
730         batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
731 
732     iter->SeekToFirst();
733     AssertIter(iter.get(), "a", "aa");
734     iter->Next();
735     ASSERT_OK(iter->status());
736     ASSERT_TRUE(!iter->Valid());
737   }
738 
739   batch.Delete(&cf1, "b");
740   batch.Put(&cf1, "c", "cc");
741   batch.Put(&cf1, "d", "dd");
742   batch.Delete(&cf1, "e");
743 
744   {
745     KVMap map;
746     map["b"] = "";
747     map["cc"] = "cccc";
748     map["f"] = "ff";
749     std::unique_ptr<Iterator> iter(
750         batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
751 
752     iter->SeekToFirst();
753     AssertIter(iter.get(), "a", "aa");
754     iter->Next();
755     AssertIter(iter.get(), "c", "cc");
756     iter->Next();
757     AssertIter(iter.get(), "cc", "cccc");
758     iter->Next();
759     AssertIter(iter.get(), "d", "dd");
760     iter->Next();
761     AssertIter(iter.get(), "f", "ff");
762     iter->Next();
763     ASSERT_OK(iter->status());
764     ASSERT_TRUE(!iter->Valid());
765 
766     iter->SeekToLast();
767     AssertIter(iter.get(), "f", "ff");
768     iter->Prev();
769     AssertIter(iter.get(), "d", "dd");
770     iter->Prev();
771     AssertIter(iter.get(), "cc", "cccc");
772     iter->Prev();
773     AssertIter(iter.get(), "c", "cc");
774     iter->Next();
775     AssertIter(iter.get(), "cc", "cccc");
776     iter->Prev();
777     AssertIter(iter.get(), "c", "cc");
778     iter->Prev();
779     AssertIter(iter.get(), "a", "aa");
780     iter->Prev();
781     ASSERT_OK(iter->status());
782     ASSERT_TRUE(!iter->Valid());
783 
784     iter->Seek("c");
785     AssertIter(iter.get(), "c", "cc");
786 
787     iter->Seek("cb");
788     AssertIter(iter.get(), "cc", "cccc");
789 
790     iter->Seek("cc");
791     AssertIter(iter.get(), "cc", "cccc");
792     iter->Next();
793     AssertIter(iter.get(), "d", "dd");
794 
795     iter->Seek("e");
796     AssertIter(iter.get(), "f", "ff");
797 
798     iter->Prev();
799     AssertIter(iter.get(), "d", "dd");
800 
801     iter->Next();
802     AssertIter(iter.get(), "f", "ff");
803   }
804 
805   {
806     KVMap empty_map;
807     std::unique_ptr<Iterator> iter(
808         batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
809 
810     iter->SeekToFirst();
811     AssertIter(iter.get(), "a", "aa");
812     iter->Next();
813     AssertIter(iter.get(), "c", "cc");
814     iter->Next();
815     AssertIter(iter.get(), "d", "dd");
816     iter->Next();
817     ASSERT_OK(iter->status());
818     ASSERT_TRUE(!iter->Valid());
819 
820     iter->SeekToLast();
821     AssertIter(iter.get(), "d", "dd");
822     iter->Prev();
823     AssertIter(iter.get(), "c", "cc");
824     iter->Prev();
825     AssertIter(iter.get(), "a", "aa");
826 
827     iter->Prev();
828     ASSERT_OK(iter->status());
829     ASSERT_TRUE(!iter->Valid());
830 
831     iter->Seek("aa");
832     AssertIter(iter.get(), "c", "cc");
833     iter->Next();
834     AssertIter(iter.get(), "d", "dd");
835 
836     iter->Seek("ca");
837     AssertIter(iter.get(), "d", "dd");
838 
839     iter->Prev();
840     AssertIter(iter.get(), "c", "cc");
841   }
842 }
843 
844 TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {
845   ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator());
846   ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator());
847   WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
848 
849   // Test the case that there is one element in the write batch
850   batch.Put(&cf2, "zoo", "bar");
851   batch.Put(&cf1, "a", "aa");
852   {
853     KVMap empty_map;
854     std::unique_ptr<Iterator> iter(
855         batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
856 
857     iter->SeekToFirst();
858     AssertIter(iter.get(), "a", "aa");
859     iter->Next();
860     ASSERT_OK(iter->status());
861     ASSERT_TRUE(!iter->Valid());
862   }
863 
864   batch.Put(&cf1, "c", "cc");
865   {
866     KVMap map;
867     std::unique_ptr<Iterator> iter(
868         batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
869 
870     iter->SeekToFirst();
871     AssertIter(iter.get(), "c", "cc");
872     iter->Next();
873     AssertIter(iter.get(), "a", "aa");
874     iter->Next();
875     ASSERT_OK(iter->status());
876     ASSERT_TRUE(!iter->Valid());
877 
878     iter->SeekToLast();
879     AssertIter(iter.get(), "a", "aa");
880     iter->Prev();
881     AssertIter(iter.get(), "c", "cc");
882     iter->Prev();
883     ASSERT_OK(iter->status());
884     ASSERT_TRUE(!iter->Valid());
885 
886     iter->Seek("b");
887     AssertIter(iter.get(), "a", "aa");
888 
889     iter->Prev();
890     AssertIter(iter.get(), "c", "cc");
891 
892     iter->Seek("a");
893     AssertIter(iter.get(), "a", "aa");
894   }
895 
896   // default column family
897   batch.Put("a", "b");
898   {
899     KVMap map;
900     map["b"] = "";
901     std::unique_ptr<Iterator> iter(batch.NewIteratorWithBase(new KVIter(&map)));
902 
903     iter->SeekToFirst();
904     AssertIter(iter.get(), "a", "b");
905     iter->Next();
906     AssertIter(iter.get(), "b", "");
907     iter->Next();
908     ASSERT_OK(iter->status());
909     ASSERT_TRUE(!iter->Valid());
910 
911     iter->SeekToLast();
912     AssertIter(iter.get(), "b", "");
913     iter->Prev();
914     AssertIter(iter.get(), "a", "b");
915     iter->Prev();
916     ASSERT_OK(iter->status());
917     ASSERT_TRUE(!iter->Valid());
918 
919     iter->Seek("b");
920     AssertIter(iter.get(), "b", "");
921 
922     iter->Prev();
923     AssertIter(iter.get(), "a", "b");
924 
925     iter->Seek("0");
926     AssertIter(iter.get(), "a", "b");
927   }
928 }
929 
930 TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) {
931   Options options;
932   WriteBatchWithIndex batch;
933   Status s;
934   std::string value;
935 
936   s = batch.GetFromBatch(options, "b", &value);
937   ASSERT_TRUE(s.IsNotFound());
938 
939   batch.Put("a", "a");
940   batch.Put("b", "b");
941   batch.Put("c", "c");
942   batch.Put("a", "z");
943   batch.Delete("c");
944   batch.Delete("d");
945   batch.Delete("e");
946   batch.Put("e", "e");
947 
948   s = batch.GetFromBatch(options, "b", &value);
949   ASSERT_OK(s);
950   ASSERT_EQ("b", value);
951 
952   s = batch.GetFromBatch(options, "a", &value);
953   ASSERT_OK(s);
954   ASSERT_EQ("z", value);
955 
956   s = batch.GetFromBatch(options, "c", &value);
957   ASSERT_TRUE(s.IsNotFound());
958 
959   s = batch.GetFromBatch(options, "d", &value);
960   ASSERT_TRUE(s.IsNotFound());
961 
962   s = batch.GetFromBatch(options, "x", &value);
963   ASSERT_TRUE(s.IsNotFound());
964 
965   s = batch.GetFromBatch(options, "e", &value);
966   ASSERT_OK(s);
967   ASSERT_EQ("e", value);
968 
969   batch.Merge("z", "z");
970 
971   s = batch.GetFromBatch(options, "z", &value);
972   ASSERT_NOK(s);  // No merge operator specified.
973 
974   s = batch.GetFromBatch(options, "b", &value);
975   ASSERT_OK(s);
976   ASSERT_EQ("b", value);
977 }
978 
979 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
980   DB* db;
981   Options options;
982   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
983   options.create_if_missing = true;
984 
985   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
986 
987   DestroyDB(dbname, options);
988   Status s = DB::Open(options, dbname, &db);
989   ASSERT_OK(s);
990 
991   ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
992   WriteBatchWithIndex batch;
993   std::string value;
994 
995   s = batch.GetFromBatch(options, "x", &value);
996   ASSERT_TRUE(s.IsNotFound());
997 
998   batch.Put("x", "X");
999   std::string expected = "X";
1000 
1001   for (int i = 0; i < 5; i++) {
1002     batch.Merge("x", ToString(i));
1003     expected = expected + "," + ToString(i);
1004 
1005     if (i % 2 == 0) {
1006       batch.Put("y", ToString(i / 2));
1007     }
1008 
1009     batch.Merge("z", "z");
1010 
1011     s = batch.GetFromBatch(column_family, options, "x", &value);
1012     ASSERT_OK(s);
1013     ASSERT_EQ(expected, value);
1014 
1015     s = batch.GetFromBatch(column_family, options, "y", &value);
1016     ASSERT_OK(s);
1017     ASSERT_EQ(ToString(i / 2), value);
1018 
1019     s = batch.GetFromBatch(column_family, options, "z", &value);
1020     ASSERT_TRUE(s.IsMergeInProgress());
1021   }
1022 
1023   delete db;
1024   DestroyDB(dbname, options);
1025 }
1026 
1027 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) {
1028   DB* db;
1029   Options options;
1030   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
1031   options.create_if_missing = true;
1032 
1033   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
1034 
1035   DestroyDB(dbname, options);
1036   Status s = DB::Open(options, dbname, &db);
1037   ASSERT_OK(s);
1038 
1039   ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
1040 
1041   // Test batch with overwrite_key=true
1042   WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
1043   std::string value;
1044 
1045   s = batch.GetFromBatch(column_family, options, "X", &value);
1046   ASSERT_TRUE(s.IsNotFound());
1047 
1048   batch.Put(column_family, "X", "x");
1049   s = batch.GetFromBatch(column_family, options, "X", &value);
1050   ASSERT_OK(s);
1051   ASSERT_EQ("x", value);
1052 
1053   batch.Put(column_family, "X", "x2");
1054   s = batch.GetFromBatch(column_family, options, "X", &value);
1055   ASSERT_OK(s);
1056   ASSERT_EQ("x2", value);
1057 
1058   batch.Merge(column_family, "X", "aaa");
1059   s = batch.GetFromBatch(column_family, options, "X", &value);
1060   ASSERT_TRUE(s.IsMergeInProgress());
1061 
1062   batch.Merge(column_family, "X", "bbb");
1063   s = batch.GetFromBatch(column_family, options, "X", &value);
1064   ASSERT_TRUE(s.IsMergeInProgress());
1065 
1066   batch.Put(column_family, "X", "x3");
1067   s = batch.GetFromBatch(column_family, options, "X", &value);
1068   ASSERT_OK(s);
1069   ASSERT_EQ("x3", value);
1070 
1071   batch.Merge(column_family, "X", "ccc");
1072   s = batch.GetFromBatch(column_family, options, "X", &value);
1073   ASSERT_TRUE(s.IsMergeInProgress());
1074 
1075   batch.Delete(column_family, "X");
1076   s = batch.GetFromBatch(column_family, options, "X", &value);
1077   ASSERT_TRUE(s.IsNotFound());
1078 
1079   batch.Merge(column_family, "X", "ddd");
1080   s = batch.GetFromBatch(column_family, options, "X", &value);
1081   ASSERT_TRUE(s.IsMergeInProgress());
1082 
1083   delete db;
1084   DestroyDB(dbname, options);
1085 }
1086 
1087 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
1088   DB* db;
1089   Options options;
1090   options.create_if_missing = true;
1091   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
1092 
1093   DestroyDB(dbname, options);
1094   Status s = DB::Open(options, dbname, &db);
1095   ASSERT_OK(s);
1096 
1097   WriteBatchWithIndex batch;
1098   ReadOptions read_options;
1099   WriteOptions write_options;
1100   std::string value;
1101 
1102   s = db->Put(write_options, "a", "a");
1103   ASSERT_OK(s);
1104 
1105   s = db->Put(write_options, "b", "b");
1106   ASSERT_OK(s);
1107 
1108   s = db->Put(write_options, "c", "c");
1109   ASSERT_OK(s);
1110 
1111   batch.Put("a", "batch.a");
1112   batch.Delete("b");
1113 
1114   s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
1115   ASSERT_OK(s);
1116   ASSERT_EQ("batch.a", value);
1117 
1118   s = batch.GetFromBatchAndDB(db, read_options, "b", &value);
1119   ASSERT_TRUE(s.IsNotFound());
1120 
1121   s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
1122   ASSERT_OK(s);
1123   ASSERT_EQ("c", value);
1124 
1125   s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
1126   ASSERT_TRUE(s.IsNotFound());
1127 
1128   db->Delete(write_options, "x");
1129 
1130   s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
1131   ASSERT_TRUE(s.IsNotFound());
1132 
1133   delete db;
1134   DestroyDB(dbname, options);
1135 }
1136 
1137 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) {
1138   DB* db;
1139   Options options;
1140 
1141   options.create_if_missing = true;
1142   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
1143 
1144   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
1145 
1146   DestroyDB(dbname, options);
1147   Status s = DB::Open(options, dbname, &db);
1148   assert(s.ok());
1149 
1150   WriteBatchWithIndex batch;
1151   ReadOptions read_options;
1152   WriteOptions write_options;
1153   std::string value;
1154 
1155   s = db->Put(write_options, "a", "a0");
1156   ASSERT_OK(s);
1157 
1158   s = db->Put(write_options, "b", "b0");
1159   ASSERT_OK(s);
1160 
1161   s = db->Merge(write_options, "b", "b1");
1162   ASSERT_OK(s);
1163 
1164   s = db->Merge(write_options, "c", "c0");
1165   ASSERT_OK(s);
1166 
1167   s = db->Merge(write_options, "d", "d0");
1168   ASSERT_OK(s);
1169 
1170   batch.Merge("a", "a1");
1171   batch.Merge("a", "a2");
1172   batch.Merge("b", "b2");
1173   batch.Merge("d", "d1");
1174   batch.Merge("e", "e0");
1175 
1176   s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
1177   ASSERT_OK(s);
1178   ASSERT_EQ("a0,a1,a2", value);
1179 
1180   s = batch.GetFromBatchAndDB(db, read_options, "b", &value);
1181   ASSERT_OK(s);
1182   ASSERT_EQ("b0,b1,b2", value);
1183 
1184   s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
1185   ASSERT_OK(s);
1186   ASSERT_EQ("c0", value);
1187 
1188   s = batch.GetFromBatchAndDB(db, read_options, "d", &value);
1189   ASSERT_OK(s);
1190   ASSERT_EQ("d0,d1", value);
1191 
1192   s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
1193   ASSERT_OK(s);
1194   ASSERT_EQ("e0", value);
1195 
1196   s = db->Delete(write_options, "x");
1197   ASSERT_OK(s);
1198 
1199   s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
1200   ASSERT_TRUE(s.IsNotFound());
1201 
1202   const Snapshot* snapshot = db->GetSnapshot();
1203   ReadOptions snapshot_read_options;
1204   snapshot_read_options.snapshot = snapshot;
1205 
1206   s = db->Delete(write_options, "a");
1207   ASSERT_OK(s);
1208 
1209   s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
1210   ASSERT_OK(s);
1211   ASSERT_EQ("a1,a2", value);
1212 
1213   s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value);
1214   ASSERT_OK(s);
1215   ASSERT_EQ("a0,a1,a2", value);
1216 
1217   batch.Delete("a");
1218 
1219   s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
1220   ASSERT_TRUE(s.IsNotFound());
1221 
1222   s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value);
1223   ASSERT_TRUE(s.IsNotFound());
1224 
1225   s = db->Merge(write_options, "c", "c1");
1226   ASSERT_OK(s);
1227 
1228   s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
1229   ASSERT_OK(s);
1230   ASSERT_EQ("c0,c1", value);
1231 
1232   s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value);
1233   ASSERT_OK(s);
1234   ASSERT_EQ("c0", value);
1235 
1236   s = db->Put(write_options, "e", "e1");
1237   ASSERT_OK(s);
1238 
1239   s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
1240   ASSERT_OK(s);
1241   ASSERT_EQ("e1,e0", value);
1242 
1243   s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value);
1244   ASSERT_OK(s);
1245   ASSERT_EQ("e0", value);
1246 
1247   s = db->Delete(write_options, "e");
1248   ASSERT_OK(s);
1249 
1250   s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
1251   ASSERT_OK(s);
1252   ASSERT_EQ("e0", value);
1253 
1254   s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value);
1255   ASSERT_OK(s);
1256   ASSERT_EQ("e0", value);
1257 
1258   db->ReleaseSnapshot(snapshot);
1259   delete db;
1260   DestroyDB(dbname, options);
1261 }
1262 
1263 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) {
1264   DB* db;
1265   Options options;
1266 
1267   options.create_if_missing = true;
1268   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
1269 
1270   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
1271 
1272   DestroyDB(dbname, options);
1273   Status s = DB::Open(options, dbname, &db);
1274   assert(s.ok());
1275 
1276   // Test batch with overwrite_key=true
1277   WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
1278 
1279   ReadOptions read_options;
1280   WriteOptions write_options;
1281   std::string value;
1282 
1283   s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
1284   ASSERT_TRUE(s.IsNotFound());
1285 
1286   batch.Merge("A", "xxx");
1287 
1288   s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
1289   ASSERT_TRUE(s.IsMergeInProgress());
1290 
1291   batch.Merge("A", "yyy");
1292 
1293   s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
1294   ASSERT_TRUE(s.IsMergeInProgress());
1295 
1296   s = db->Put(write_options, "A", "a0");
1297   ASSERT_OK(s);
1298 
1299   s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
1300   ASSERT_TRUE(s.IsMergeInProgress());
1301 
1302   batch.Delete("A");
1303 
1304   s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
1305   ASSERT_TRUE(s.IsNotFound());
1306 
1307   delete db;
1308   DestroyDB(dbname, options);
1309 }
1310 
1311 TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) {
1312   DB* db;
1313   Options options;
1314 
1315   options.create_if_missing = true;
1316   std::string dbname = test::PerThreadDBPath("write_batch_with_index_test");
1317 
1318   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
1319 
1320   DestroyDB(dbname, options);
1321   Status s = DB::Open(options, dbname, &db);
1322   assert(s.ok());
1323 
1324   ReadOptions read_options;
1325   WriteOptions write_options;
1326   FlushOptions flush_options;
1327   std::string value;
1328 
1329   WriteBatchWithIndex batch;
1330 
1331   ASSERT_OK(db->Put(write_options, "A", "1"));
1332   ASSERT_OK(db->Flush(flush_options, db->DefaultColumnFamily()));
1333   ASSERT_OK(batch.Merge("A", "2"));
1334 
1335   ASSERT_OK(batch.GetFromBatchAndDB(db, read_options, "A", &value));
1336   ASSERT_EQ(value, "1,2");
1337 
1338   delete db;
1339   DestroyDB(dbname, options);
1340 }
1341 
1342 void AssertKey(std::string key, WBWIIterator* iter) {
1343   ASSERT_TRUE(iter->Valid());
1344   ASSERT_EQ(key, iter->Entry().key.ToString());
1345 }
1346 
1347 void AssertValue(std::string value, WBWIIterator* iter) {
1348   ASSERT_TRUE(iter->Valid());
1349   ASSERT_EQ(value, iter->Entry().value.ToString());
1350 }
1351 
1352 // Tests that we can write to the WBWI while we iterate (from a single thread).
1353 // iteration should see the newest writes
1354 TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) {
1355   WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
1356   for (char c = 'a'; c <= 'z'; ++c) {
1357     batch.Put(std::string(1, c), std::string(1, c));
1358   }
1359 
1360   std::unique_ptr<WBWIIterator> iter(batch.NewIterator());
1361   iter->Seek("k");
1362   AssertKey("k", iter.get());
1363   iter->Next();
1364   AssertKey("l", iter.get());
1365   batch.Put("ab", "cc");
1366   iter->Next();
1367   AssertKey("m", iter.get());
1368   batch.Put("mm", "kk");
1369   iter->Next();
1370   AssertKey("mm", iter.get());
1371   AssertValue("kk", iter.get());
1372   batch.Delete("mm");
1373 
1374   iter->Next();
1375   AssertKey("n", iter.get());
1376   iter->Prev();
1377   AssertKey("mm", iter.get());
1378   ASSERT_EQ(kDeleteRecord, iter->Entry().type);
1379 
1380   iter->Seek("ab");
1381   AssertKey("ab", iter.get());
1382   batch.Delete("x");
1383   iter->Seek("x");
1384   AssertKey("x", iter.get());
1385   ASSERT_EQ(kDeleteRecord, iter->Entry().type);
1386   iter->Prev();
1387   AssertKey("w", iter.get());
1388 }
1389 
1390 void AssertIterKey(std::string key, Iterator* iter) {
1391   ASSERT_TRUE(iter->Valid());
1392   ASSERT_EQ(key, iter->key().ToString());
1393 }
1394 
1395 void AssertIterValue(std::string value, Iterator* iter) {
1396   ASSERT_TRUE(iter->Valid());
1397   ASSERT_EQ(value, iter->value().ToString());
1398 }
1399 
1400 // same thing as above, but testing IteratorWithBase
1401 TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) {
1402   WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
1403   for (char c = 'a'; c <= 'z'; ++c) {
1404     batch.Put(std::string(1, c), std::string(1, c));
1405   }
1406 
1407   KVMap map;
1408   map["aa"] = "aa";
1409   map["cc"] = "cc";
1410   map["ee"] = "ee";
1411   map["em"] = "me";
1412 
1413   std::unique_ptr<Iterator> iter(
1414       batch.NewIteratorWithBase(new KVIter(&map)));
1415   iter->Seek("k");
1416   AssertIterKey("k", iter.get());
1417   iter->Next();
1418   AssertIterKey("l", iter.get());
1419   batch.Put("ab", "cc");
1420   iter->Next();
1421   AssertIterKey("m", iter.get());
1422   batch.Put("mm", "kk");
1423   iter->Next();
1424   AssertIterKey("mm", iter.get());
1425   AssertIterValue("kk", iter.get());
1426   batch.Delete("mm");
1427   iter->Next();
1428   AssertIterKey("n", iter.get());
1429   iter->Prev();
1430   // "mm" is deleted, so we're back at "m"
1431   AssertIterKey("m", iter.get());
1432 
1433   iter->Seek("ab");
1434   AssertIterKey("ab", iter.get());
1435   iter->Prev();
1436   AssertIterKey("aa", iter.get());
1437   iter->Prev();
1438   AssertIterKey("a", iter.get());
1439   batch.Delete("aa");
1440   iter->Next();
1441   AssertIterKey("ab", iter.get());
1442   iter->Prev();
1443   AssertIterKey("a", iter.get());
1444 
1445   batch.Delete("x");
1446   iter->Seek("x");
1447   AssertIterKey("y", iter.get());
1448   iter->Next();
1449   AssertIterKey("z", iter.get());
1450   iter->Prev();
1451   iter->Prev();
1452   AssertIterKey("w", iter.get());
1453 
1454   batch.Delete("e");
1455   iter->Seek("e");
1456   AssertIterKey("ee", iter.get());
1457   AssertIterValue("ee", iter.get());
1458   batch.Put("ee", "xx");
1459   // still the same value
1460   AssertIterValue("ee", iter.get());
1461   iter->Next();
1462   AssertIterKey("em", iter.get());
1463   iter->Prev();
1464   // new value
1465   AssertIterValue("xx", iter.get());
1466 }
1467 
1468 // stress testing mutations with IteratorWithBase
1469 TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) {
1470   WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
1471   for (char c = 'a'; c <= 'z'; ++c) {
1472     batch.Put(std::string(1, c), std::string(1, c));
1473   }
1474 
1475   KVMap map;
1476   for (char c = 'a'; c <= 'z'; ++c) {
1477     map[std::string(2, c)] = std::string(2, c);
1478   }
1479 
1480   std::unique_ptr<Iterator> iter(
1481       batch.NewIteratorWithBase(new KVIter(&map)));
1482 
1483   Random rnd(301);
1484   for (int i = 0; i < 1000000; ++i) {
1485     int random = rnd.Uniform(8);
1486     char c = static_cast<char>(rnd.Uniform(26) + 'a');
1487     switch (random) {
1488       case 0:
1489         batch.Put(std::string(1, c), "xxx");
1490         break;
1491       case 1:
1492         batch.Put(std::string(2, c), "xxx");
1493         break;
1494       case 2:
1495         batch.Delete(std::string(1, c));
1496         break;
1497       case 3:
1498         batch.Delete(std::string(2, c));
1499         break;
1500       case 4:
1501         iter->Seek(std::string(1, c));
1502         break;
1503       case 5:
1504         iter->Seek(std::string(2, c));
1505         break;
1506       case 6:
1507         if (iter->Valid()) {
1508           iter->Next();
1509         }
1510         break;
1511       case 7:
1512         if (iter->Valid()) {
1513           iter->Prev();
1514         }
1515         break;
1516       default:
1517         assert(false);
1518     }
1519   }
1520 }
1521 
1522 static std::string PrintContents(WriteBatchWithIndex* batch,
1523                                  ColumnFamilyHandle* column_family) {
1524   std::string result;
1525 
1526   WBWIIterator* iter;
1527   if (column_family == nullptr) {
1528     iter = batch->NewIterator();
1529   } else {
1530     iter = batch->NewIterator(column_family);
1531   }
1532 
1533   iter->SeekToFirst();
1534   while (iter->Valid()) {
1535     WriteEntry e = iter->Entry();
1536 
1537     if (e.type == kPutRecord) {
1538       result.append("PUT(");
1539       result.append(e.key.ToString());
1540       result.append("):");
1541       result.append(e.value.ToString());
1542     } else if (e.type == kMergeRecord) {
1543       result.append("MERGE(");
1544       result.append(e.key.ToString());
1545       result.append("):");
1546       result.append(e.value.ToString());
1547     } else if (e.type == kSingleDeleteRecord) {
1548       result.append("SINGLE-DEL(");
1549       result.append(e.key.ToString());
1550       result.append(")");
1551     } else {
1552       assert(e.type == kDeleteRecord);
1553       result.append("DEL(");
1554       result.append(e.key.ToString());
1555       result.append(")");
1556     }
1557 
1558     result.append(",");
1559     iter->Next();
1560   }
1561 
1562   delete iter;
1563   return result;
1564 }
1565 
1566 static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map,
1567                                  ColumnFamilyHandle* column_family) {
1568   std::string result;
1569 
1570   Iterator* iter;
1571   if (column_family == nullptr) {
1572     iter = batch->NewIteratorWithBase(new KVIter(base_map));
1573   } else {
1574     iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map));
1575   }
1576 
1577   iter->SeekToFirst();
1578   while (iter->Valid()) {
1579     assert(iter->status().ok());
1580 
1581     Slice key = iter->key();
1582     Slice value = iter->value();
1583 
1584     result.append(key.ToString());
1585     result.append(":");
1586     result.append(value.ToString());
1587     result.append(",");
1588 
1589     iter->Next();
1590   }
1591 
1592   delete iter;
1593   return result;
1594 }
1595 
1596 TEST_F(WriteBatchWithIndexTest, SavePointTest) {
1597   WriteBatchWithIndex batch;
1598   ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
1599   Status s;
1600 
1601   batch.Put("A", "a");
1602   batch.Put("B", "b");
1603   batch.Put("A", "aa");
1604   batch.Put(&cf1, "A", "a1");
1605   batch.Delete(&cf1, "B");
1606   batch.Put(&cf1, "C", "c1");
1607   batch.Put(&cf1, "E", "e1");
1608 
1609   batch.SetSavePoint();  // 1
1610 
1611   batch.Put("C", "cc");
1612   batch.Put("B", "bb");
1613   batch.Delete("A");
1614   batch.Put(&cf1, "B", "b1");
1615   batch.Delete(&cf1, "A");
1616   batch.SingleDelete(&cf1, "E");
1617   batch.SetSavePoint();  // 2
1618 
1619   batch.Put("A", "aaa");
1620   batch.Put("A", "xxx");
1621   batch.Delete("B");
1622   batch.Put(&cf1, "B", "b2");
1623   batch.Delete(&cf1, "C");
1624   batch.SetSavePoint();  // 3
1625   batch.SetSavePoint();  // 4
1626   batch.SingleDelete("D");
1627   batch.Delete(&cf1, "D");
1628   batch.Delete(&cf1, "E");
1629 
1630   ASSERT_EQ(
1631       "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
1632       "B)"
1633       ",PUT(C):cc,SINGLE-DEL(D),",
1634       PrintContents(&batch, nullptr));
1635 
1636   ASSERT_EQ(
1637       "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),"
1638       "DEL(D),PUT(E):e1,SINGLE-DEL(E),DEL(E),",
1639       PrintContents(&batch, &cf1));
1640 
1641   ASSERT_OK(batch.RollbackToSavePoint());  // rollback to 4
1642   ASSERT_EQ(
1643       "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
1644       "B)"
1645       ",PUT(C):cc,",
1646       PrintContents(&batch, nullptr));
1647 
1648   ASSERT_EQ(
1649       "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),"
1650       "PUT(E):e1,SINGLE-DEL(E),",
1651       PrintContents(&batch, &cf1));
1652 
1653   ASSERT_OK(batch.RollbackToSavePoint());  // rollback to 3
1654   ASSERT_EQ(
1655       "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
1656       "B)"
1657       ",PUT(C):cc,",
1658       PrintContents(&batch, nullptr));
1659 
1660   ASSERT_EQ(
1661       "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),"
1662       "PUT(E):e1,SINGLE-DEL(E),",
1663       PrintContents(&batch, &cf1));
1664 
1665   ASSERT_OK(batch.RollbackToSavePoint());  // rollback to 2
1666   ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,",
1667             PrintContents(&batch, nullptr));
1668 
1669   ASSERT_EQ(
1670       "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,"
1671       "PUT(E):e1,SINGLE-DEL(E),",
1672       PrintContents(&batch, &cf1));
1673 
1674   batch.SetSavePoint();  // 5
1675   batch.Put("X", "x");
1676 
1677   ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,",
1678             PrintContents(&batch, nullptr));
1679 
1680   ASSERT_OK(batch.RollbackToSavePoint());  // rollback to 5
1681   ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,",
1682             PrintContents(&batch, nullptr));
1683 
1684   ASSERT_EQ(
1685       "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,"
1686       "PUT(E):e1,SINGLE-DEL(E),",
1687       PrintContents(&batch, &cf1));
1688 
1689   ASSERT_OK(batch.RollbackToSavePoint());  // rollback to 1
1690   ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr));
1691 
1692   ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,",
1693             PrintContents(&batch, &cf1));
1694 
1695   s = batch.RollbackToSavePoint();  // no savepoint found
1696   ASSERT_TRUE(s.IsNotFound());
1697   ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr));
1698 
1699   ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,",
1700             PrintContents(&batch, &cf1));
1701 
1702   batch.SetSavePoint();  // 6
1703 
1704   batch.Clear();
1705   ASSERT_EQ("", PrintContents(&batch, nullptr));
1706   ASSERT_EQ("", PrintContents(&batch, &cf1));
1707 
1708   s = batch.RollbackToSavePoint();  // rollback to 6
1709   ASSERT_TRUE(s.IsNotFound());
1710 }
1711 
1712 TEST_F(WriteBatchWithIndexTest, SingleDeleteTest) {
1713   WriteBatchWithIndex batch;
1714   Status s;
1715   std::string value;
1716   DBOptions db_options;
1717 
1718   batch.SingleDelete("A");
1719 
1720   s = batch.GetFromBatch(db_options, "A", &value);
1721   ASSERT_TRUE(s.IsNotFound());
1722   s = batch.GetFromBatch(db_options, "B", &value);
1723   ASSERT_TRUE(s.IsNotFound());
1724   value = PrintContents(&batch, nullptr);
1725   ASSERT_EQ("SINGLE-DEL(A),", value);
1726 
1727   batch.Clear();
1728   batch.Put("A", "a");
1729   batch.Put("A", "a2");
1730   batch.Put("B", "b");
1731   batch.SingleDelete("A");
1732 
1733   s = batch.GetFromBatch(db_options, "A", &value);
1734   ASSERT_TRUE(s.IsNotFound());
1735   s = batch.GetFromBatch(db_options, "B", &value);
1736   ASSERT_OK(s);
1737   ASSERT_EQ("b", value);
1738 
1739   value = PrintContents(&batch, nullptr);
1740   ASSERT_EQ("PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(B):b,", value);
1741 
1742   batch.Put("C", "c");
1743   batch.Put("A", "a3");
1744   batch.Delete("B");
1745   batch.SingleDelete("B");
1746   batch.SingleDelete("C");
1747 
1748   s = batch.GetFromBatch(db_options, "A", &value);
1749   ASSERT_OK(s);
1750   ASSERT_EQ("a3", value);
1751   s = batch.GetFromBatch(db_options, "B", &value);
1752   ASSERT_TRUE(s.IsNotFound());
1753   s = batch.GetFromBatch(db_options, "C", &value);
1754   ASSERT_TRUE(s.IsNotFound());
1755   s = batch.GetFromBatch(db_options, "D", &value);
1756   ASSERT_TRUE(s.IsNotFound());
1757 
1758   value = PrintContents(&batch, nullptr);
1759   ASSERT_EQ(
1760       "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,PUT(B):b,DEL(B),SINGLE-DEL(B)"
1761       ",PUT(C):c,SINGLE-DEL(C),",
1762       value);
1763 
1764   batch.Put("B", "b4");
1765   batch.Put("C", "c4");
1766   batch.Put("D", "d4");
1767   batch.SingleDelete("D");
1768   batch.SingleDelete("D");
1769   batch.Delete("A");
1770 
1771   s = batch.GetFromBatch(db_options, "A", &value);
1772   ASSERT_TRUE(s.IsNotFound());
1773   s = batch.GetFromBatch(db_options, "B", &value);
1774   ASSERT_OK(s);
1775   ASSERT_EQ("b4", value);
1776   s = batch.GetFromBatch(db_options, "C", &value);
1777   ASSERT_OK(s);
1778   ASSERT_EQ("c4", value);
1779   s = batch.GetFromBatch(db_options, "D", &value);
1780   ASSERT_TRUE(s.IsNotFound());
1781 
1782   value = PrintContents(&batch, nullptr);
1783   ASSERT_EQ(
1784       "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,DEL(A),PUT(B):b,DEL(B),"
1785       "SINGLE-DEL(B),PUT(B):b4,PUT(C):c,SINGLE-DEL(C),PUT(C):c4,PUT(D):d4,"
1786       "SINGLE-DEL(D),SINGLE-DEL(D),",
1787       value);
1788 }
1789 
1790 TEST_F(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) {
1791   Status s;
1792   std::string value;
1793   DBOptions db_options;
1794   WriteBatchWithIndex batch(BytewiseComparator(), 20, true /* overwrite_key */);
1795   batch.Put("A", "a");
1796   batch.Put("A", "a2");
1797   batch.Put("B", "b");
1798   batch.SingleDelete("A");
1799   batch.Delete("B");
1800 
1801   KVMap map;
1802   value = PrintContents(&batch, &map, nullptr);
1803   ASSERT_EQ("", value);
1804 
1805   map["A"] = "aa";
1806   map["C"] = "cc";
1807   map["D"] = "dd";
1808 
1809   batch.SingleDelete("B");
1810   batch.SingleDelete("C");
1811   batch.SingleDelete("Z");
1812 
1813   value = PrintContents(&batch, &map, nullptr);
1814   ASSERT_EQ("D:dd,", value);
1815 
1816   batch.Put("A", "a3");
1817   batch.Put("B", "b3");
1818   batch.SingleDelete("A");
1819   batch.SingleDelete("A");
1820   batch.SingleDelete("D");
1821   batch.SingleDelete("D");
1822   batch.Delete("D");
1823 
1824   map["E"] = "ee";
1825 
1826   value = PrintContents(&batch, &map, nullptr);
1827   ASSERT_EQ("B:b3,E:ee,", value);
1828 }
1829 
1830 }  // namespace ROCKSDB_NAMESPACE
1831 
1832 int main(int argc, char** argv) {
1833   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1834   ::testing::InitGoogleTest(&argc, argv);
1835   return RUN_ALL_TESTS();
1836 }
1837 
1838 #else
1839 #include <stdio.h>
1840 
1841 int main() {
1842   fprintf(stderr, "SKIPPED\n");
1843   return 0;
1844 }
1845 
1846 #endif  // !ROCKSDB_LITE
1847