1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "rocksdb/db.h"
11 
12 #include <memory>
13 #include "db/column_family.h"
14 #include "db/memtable.h"
15 #include "db/write_batch_internal.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/memtablerep.h"
18 #include "rocksdb/utilities/write_batch_with_index.h"
19 #include "rocksdb/write_buffer_manager.h"
20 #include "table/scoped_arena_iterator.h"
21 #include "test_util/testharness.h"
22 #include "util/string_util.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 
PrintContents(WriteBatch * b)26 static std::string PrintContents(WriteBatch* b) {
27   InternalKeyComparator cmp(BytewiseComparator());
28   auto factory = std::make_shared<SkipListFactory>();
29   Options options;
30   options.memtable_factory = factory;
31   ImmutableCFOptions ioptions(options);
32   WriteBufferManager wb(options.db_write_buffer_size);
33   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
34                                kMaxSequenceNumber, 0 /* column_family_id */);
35   mem->Ref();
36   std::string state;
37   ColumnFamilyMemTablesDefault cf_mems_default(mem);
38   Status s =
39       WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr);
40   uint32_t count = 0;
41   int put_count = 0;
42   int delete_count = 0;
43   int single_delete_count = 0;
44   int delete_range_count = 0;
45   int merge_count = 0;
46   for (int i = 0; i < 2; ++i) {
47     Arena arena;
48     ScopedArenaIterator arena_iter_guard;
49     std::unique_ptr<InternalIterator> iter_guard;
50     InternalIterator* iter;
51     if (i == 0) {
52       iter = mem->NewIterator(ReadOptions(), &arena);
53       arena_iter_guard.set(iter);
54     } else {
55       iter = mem->NewRangeTombstoneIterator(ReadOptions(),
56                                             kMaxSequenceNumber /* read_seq */);
57       iter_guard.reset(iter);
58     }
59     if (iter == nullptr) {
60       continue;
61     }
62     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
63       ParsedInternalKey ikey;
64       ikey.clear();
65       EXPECT_TRUE(ParseInternalKey(iter->key(), &ikey));
66       switch (ikey.type) {
67         case kTypeValue:
68           state.append("Put(");
69           state.append(ikey.user_key.ToString());
70           state.append(", ");
71           state.append(iter->value().ToString());
72           state.append(")");
73           count++;
74           put_count++;
75           break;
76         case kTypeDeletion:
77           state.append("Delete(");
78           state.append(ikey.user_key.ToString());
79           state.append(")");
80           count++;
81           delete_count++;
82           break;
83         case kTypeSingleDeletion:
84           state.append("SingleDelete(");
85           state.append(ikey.user_key.ToString());
86           state.append(")");
87           count++;
88           single_delete_count++;
89           break;
90         case kTypeRangeDeletion:
91           state.append("DeleteRange(");
92           state.append(ikey.user_key.ToString());
93           state.append(", ");
94           state.append(iter->value().ToString());
95           state.append(")");
96           count++;
97           delete_range_count++;
98           break;
99         case kTypeMerge:
100           state.append("Merge(");
101           state.append(ikey.user_key.ToString());
102           state.append(", ");
103           state.append(iter->value().ToString());
104           state.append(")");
105           count++;
106           merge_count++;
107           break;
108         default:
109           assert(false);
110           break;
111       }
112       state.append("@");
113       state.append(NumberToString(ikey.sequence));
114     }
115   }
116   EXPECT_EQ(b->HasPut(), put_count > 0);
117   EXPECT_EQ(b->HasDelete(), delete_count > 0);
118   EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
119   EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0);
120   EXPECT_EQ(b->HasMerge(), merge_count > 0);
121   if (!s.ok()) {
122     state.append(s.ToString());
123   } else if (count != WriteBatchInternal::Count(b)) {
124     state.append("CountMismatch()");
125   }
126   delete mem->Unref();
127   return state;
128 }
129 
130 class WriteBatchTest : public testing::Test {};
131 
// A default-constructed batch renders as the empty string and reports a count
// of zero through both the internal and the public accessor.
TEST_F(WriteBatchTest, Empty) {
  WriteBatch empty_batch;
  ASSERT_EQ("", PrintContents(&empty_batch));
  ASSERT_EQ(0u, WriteBatchInternal::Count(&empty_batch));
  ASSERT_EQ(0u, empty_batch.Count());
}
138 
// Mixes Put, Delete and DeleteRange in one batch and checks that consecutive
// sequence numbers are assigned starting from the one set on the batch.
TEST_F(WriteBatchTest, Multiple) {
  WriteBatch wb;
  wb.Put(Slice("foo"), Slice("bar"));
  wb.Delete(Slice("box"));
  wb.DeleteRange(Slice("bar"), Slice("foo"));
  wb.Put(Slice("baz"), Slice("boo"));

  WriteBatchInternal::SetSequence(&wb, 100);
  ASSERT_EQ(100U, WriteBatchInternal::Sequence(&wb));
  ASSERT_EQ(4u, WriteBatchInternal::Count(&wb));

  // PrintContents() lists point entries in memtable iteration order and the
  // range tombstone last, so the text below is not in insertion order.
  const std::string rendered = PrintContents(&wb);
  ASSERT_EQ(
      "Put(baz, boo)@103"
      "Delete(box)@101"
      "Put(foo, bar)@100"
      "DeleteRange(bar, foo)@102",
      rendered);
  ASSERT_EQ(4u, wb.Count());
}
156 
// Chops the final byte off a two-record batch: the intact Put must still be
// applied, and the truncated Delete must surface as a Corruption status.
TEST_F(WriteBatchTest, Corruption) {
  WriteBatch wb;
  wb.Put(Slice("foo"), Slice("bar"));
  wb.Delete(Slice("box"));
  WriteBatchInternal::SetSequence(&wb, 200);

  Slice full = WriteBatchInternal::Contents(&wb);
  const Slice truncated(full.data(), full.size() - 1);
  WriteBatchInternal::SetContents(&wb, truncated);

  ASSERT_EQ(
      "Put(foo, bar)@200"
      "Corruption: bad WriteBatch Delete",
      PrintContents(&wb));
}
169 
// Exercises WriteBatchInternal::Append(): the destination keeps its own base
// sequence number (200) and appended records are numbered after the ones
// already present, regardless of the source batch's sequence (300).
TEST_F(WriteBatchTest, Append) {
  WriteBatch dst, src;
  WriteBatchInternal::SetSequence(&dst, 200);
  WriteBatchInternal::SetSequence(&src, 300);

  // Appending an empty batch changes nothing.
  WriteBatchInternal::Append(&dst, &src);
  ASSERT_EQ("", PrintContents(&dst));
  ASSERT_EQ(0u, dst.Count());

  src.Put("a", "va");
  WriteBatchInternal::Append(&dst, &src);
  ASSERT_EQ("Put(a, va)@200", PrintContents(&dst));
  ASSERT_EQ(1u, dst.Count());

  src.Clear();
  src.Put("b", "vb");
  WriteBatchInternal::Append(&dst, &src);
  ASSERT_EQ(
      "Put(a, va)@200"
      "Put(b, vb)@201",
      PrintContents(&dst));
  ASSERT_EQ(2u, dst.Count());

  // `src` is not cleared here, so its Put of "b" is appended a second time
  // together with the new Delete.
  src.Delete("foo");
  WriteBatchInternal::Append(&dst, &src);
  ASSERT_EQ(
      "Put(a, va)@200"
      "Put(b, vb)@202"
      "Put(b, vb)@201"
      "Delete(foo)@203",
      PrintContents(&dst));
  ASSERT_EQ(4u, dst.Count());

  // With the wal-only flag set, the expected output contains just the records
  // written before the WAL termination point ("c" and "d", but not "e").
  src.Clear();
  src.Put("c", "cc");
  src.Put("d", "dd");
  src.MarkWalTerminationPoint();
  src.Put("e", "ee");
  WriteBatchInternal::Append(&dst, &src, /*wal only*/ true);
  ASSERT_EQ(
      "Put(a, va)@200"
      "Put(b, vb)@202"
      "Put(b, vb)@201"
      "Put(c, cc)@204"
      "Put(d, dd)@205"
      "Delete(foo)@203",
      PrintContents(&dst));
  ASSERT_EQ(6u, dst.Count());

  // The source batch itself still holds all three records.
  ASSERT_EQ(
      "Put(c, cc)@0"
      "Put(d, dd)@1"
      "Put(e, ee)@2",
      PrintContents(&src));
  ASSERT_EQ(3u, src.Count());
}
220 
// Builds a batch step by step (empty -> Put -> SingleDelete of the same key)
// and checks the rendered contents and the count after every step.
TEST_F(WriteBatchTest, SingleDeletion) {
  WriteBatch wb;
  WriteBatchInternal::SetSequence(&wb, 100);
  ASSERT_EQ("", PrintContents(&wb));
  ASSERT_EQ(0u, wb.Count());

  wb.Put("a", "va");
  ASSERT_EQ("Put(a, va)@100", PrintContents(&wb));
  ASSERT_EQ(1u, wb.Count());

  wb.SingleDelete("a");
  const std::string after_single_delete = PrintContents(&wb);
  ASSERT_EQ(
      "SingleDelete(a)@101"
      "Put(a, va)@100",
      after_single_delete);
  ASSERT_EQ(2u, wb.Count());
}
236 
237 namespace {
238   struct TestHandler : public WriteBatch::Handler {
239     std::string seen;
PutCFROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler240     Status PutCF(uint32_t column_family_id, const Slice& key,
241                  const Slice& value) override {
242       if (column_family_id == 0) {
243         seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
244       } else {
245         seen += "PutCF(" + ToString(column_family_id) + ", " +
246                 key.ToString() + ", " + value.ToString() + ")";
247       }
248       return Status::OK();
249     }
DeleteCFROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler250     Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
251       if (column_family_id == 0) {
252         seen += "Delete(" + key.ToString() + ")";
253       } else {
254         seen += "DeleteCF(" + ToString(column_family_id) + ", " +
255                 key.ToString() + ")";
256       }
257       return Status::OK();
258     }
SingleDeleteCFROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler259     Status SingleDeleteCF(uint32_t column_family_id,
260                           const Slice& key) override {
261       if (column_family_id == 0) {
262         seen += "SingleDelete(" + key.ToString() + ")";
263       } else {
264         seen += "SingleDeleteCF(" + ToString(column_family_id) + ", " +
265                 key.ToString() + ")";
266       }
267       return Status::OK();
268     }
DeleteRangeCFROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler269     Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
270                          const Slice& end_key) override {
271       if (column_family_id == 0) {
272         seen += "DeleteRange(" + begin_key.ToString() + ", " +
273                 end_key.ToString() + ")";
274       } else {
275         seen += "DeleteRangeCF(" + ToString(column_family_id) + ", " +
276                 begin_key.ToString() + ", " + end_key.ToString() + ")";
277       }
278       return Status::OK();
279     }
MergeCFROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler280     Status MergeCF(uint32_t column_family_id, const Slice& key,
281                    const Slice& value) override {
282       if (column_family_id == 0) {
283         seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
284       } else {
285         seen += "MergeCF(" + ToString(column_family_id) + ", " +
286                 key.ToString() + ", " + value.ToString() + ")";
287       }
288       return Status::OK();
289     }
LogDataROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler290     void LogData(const Slice& blob) override {
291       seen += "LogData(" + blob.ToString() + ")";
292     }
MarkBeginPrepareROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler293     Status MarkBeginPrepare(bool unprepare) override {
294       seen +=
295           "MarkBeginPrepare(" + std::string(unprepare ? "true" : "false") + ")";
296       return Status::OK();
297     }
MarkEndPrepareROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler298     Status MarkEndPrepare(const Slice& xid) override {
299       seen += "MarkEndPrepare(" + xid.ToString() + ")";
300       return Status::OK();
301     }
MarkNoopROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler302     Status MarkNoop(bool empty_batch) override {
303       seen += "MarkNoop(" + std::string(empty_batch ? "true" : "false") + ")";
304       return Status::OK();
305     }
MarkCommitROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler306     Status MarkCommit(const Slice& xid) override {
307       seen += "MarkCommit(" + xid.ToString() + ")";
308       return Status::OK();
309     }
MarkRollbackROCKSDB_NAMESPACE::__anon0f00522c0111::TestHandler310     Status MarkRollback(const Slice& xid) override {
311       seen += "MarkRollback(" + xid.ToString() + ")";
312       return Status::OK();
313     }
314   };
315 }
316 
// Iterating a batch that holds a Put with the un-overridden base
// WriteBatch::Handler must still report OK.
TEST_F(WriteBatchTest, PutNotImplemented) {
  WriteBatch wb;
  wb.Put(Slice("k1"), Slice("v1"));
  ASSERT_EQ(1u, wb.Count());
  ASSERT_EQ("Put(k1, v1)@0", PrintContents(&wb));

  WriteBatch::Handler base_handler;
  ASSERT_OK(wb.Iterate(&base_handler));
}
326 
// Iterating a batch that holds a Delete with the un-overridden base
// WriteBatch::Handler must still report OK.
TEST_F(WriteBatchTest, DeleteNotImplemented) {
  WriteBatch wb;
  wb.Delete(Slice("k2"));
  ASSERT_EQ(1u, wb.Count());
  ASSERT_EQ("Delete(k2)@0", PrintContents(&wb));

  WriteBatch::Handler base_handler;
  ASSERT_OK(wb.Iterate(&base_handler));
}
336 
// Iterating a batch that holds a SingleDelete with the un-overridden base
// WriteBatch::Handler must still report OK.
TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
  WriteBatch wb;
  wb.SingleDelete(Slice("k2"));
  ASSERT_EQ(1u, wb.Count());
  ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&wb));

  WriteBatch::Handler base_handler;
  ASSERT_OK(wb.Iterate(&base_handler));
}
346 
// Iterating a batch that holds a Merge with the un-overridden base
// WriteBatch::Handler must still report OK.
TEST_F(WriteBatchTest, MergeNotImplemented) {
  WriteBatch wb;
  wb.Merge(Slice("foo"), Slice("bar"));
  ASSERT_EQ(1u, wb.Count());
  ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&wb));

  WriteBatch::Handler base_handler;
  ASSERT_OK(wb.Iterate(&base_handler));
}
356 
// Interleaves PutLogData() blobs with regular updates. The blobs must not be
// counted as updates or reach the memtable, but a handler iterating the batch
// must see them in their original positions.
TEST_F(WriteBatchTest, Blob) {
  WriteBatch batch;
  batch.Put(Slice("k1"), Slice("v1"));
  batch.Put(Slice("k2"), Slice("v2"));
  batch.Put(Slice("k3"), Slice("v3"));
  batch.PutLogData(Slice("blob1"));
  batch.Delete(Slice("k2"));
  batch.SingleDelete(Slice("k3"));
  batch.PutLogData(Slice("blob2"));
  batch.Merge(Slice("foo"), Slice("bar"));

  // Six updates; the two log-data blobs are not part of the count.
  ASSERT_EQ(6u, batch.Count());
  ASSERT_EQ(
      "Merge(foo, bar)@5"
      "Put(k1, v1)@0"
      "Delete(k2)@3"
      "Put(k2, v2)@1"
      "SingleDelete(k3)@4"
      "Put(k3, v3)@2",
      PrintContents(&batch));

  TestHandler handler;
  // Check the iteration status instead of dropping it, so a failure is
  // reported directly rather than only as a mismatch in `seen`.
  ASSERT_OK(batch.Iterate(&handler));
  ASSERT_EQ(
      "Put(k1, v1)"
      "Put(k2, v2)"
      "Put(k3, v3)"
      "LogData(blob1)"
      "Delete(k2)"
      "SingleDelete(k3)"
      "LogData(blob2)"
      "Merge(foo, bar)",
      handler.seen);
}
390 
// Builds a batch carrying prepare/commit/rollback markers and checks the
// order in which a handler observes them.
TEST_F(WriteBatchTest, PrepareCommit) {
  WriteBatch batch;
  WriteBatchInternal::InsertNoop(&batch);
  batch.Put(Slice("k1"), Slice("v1"));
  batch.Put(Slice("k2"), Slice("v2"));
  batch.SetSavePoint();
  WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1"));

  // The save point set before MarkEndPrepare() is expected to be gone, so
  // rolling back to it reports NotFound.
  Status s = batch.RollbackToSavePoint();
  ASSERT_EQ(s, Status::NotFound());

  WriteBatchInternal::MarkCommit(&batch, Slice("xid1"));
  WriteBatchInternal::MarkRollback(&batch, Slice("xid1"));
  // Only the two Puts are counted; the markers are not.
  ASSERT_EQ(2u, batch.Count());

  TestHandler handler;
  // Check the iteration status instead of dropping it.
  ASSERT_OK(batch.Iterate(&handler));
  ASSERT_EQ(
      "MarkBeginPrepare(false)"
      "Put(k1, v1)"
      "Put(k2, v2)"
      "MarkEndPrepare(xid1)"
      "MarkCommit(xid1)"
      "MarkRollback(xid1)",
      handler.seen);
}
415 
416 // It requires more than 30GB of memory to run the test. With single memory
417 // allocation of more than 30GB.
418 // Not all platform can run it. Also it runs a long time. So disable it.
TEST_F(WriteBatchTest,DISABLED_ManyUpdates)419 TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
420   // Insert key and value of 3GB and push total batch size to 12GB.
421   static const size_t kKeyValueSize = 4u;
422   static const uint32_t kNumUpdates = uint32_t(3 << 30);
423   std::string raw(kKeyValueSize, 'A');
424   WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u);
425   char c = 'A';
426   for (uint32_t i = 0; i < kNumUpdates; i++) {
427     if (c > 'Z') {
428       c = 'A';
429     }
430     raw[0] = c;
431     raw[raw.length() - 1] = c;
432     c++;
433     batch.Put(raw, raw);
434   }
435 
436   ASSERT_EQ(kNumUpdates, batch.Count());
437 
438   struct NoopHandler : public WriteBatch::Handler {
439     uint32_t num_seen = 0;
440     char expected_char = 'A';
441     Status PutCF(uint32_t /*column_family_id*/, const Slice& key,
442                  const Slice& value) override {
443       EXPECT_EQ(kKeyValueSize, key.size());
444       EXPECT_EQ(kKeyValueSize, value.size());
445       EXPECT_EQ(expected_char, key[0]);
446       EXPECT_EQ(expected_char, value[0]);
447       EXPECT_EQ(expected_char, key[kKeyValueSize - 1]);
448       EXPECT_EQ(expected_char, value[kKeyValueSize - 1]);
449       expected_char++;
450       if (expected_char > 'Z') {
451         expected_char = 'A';
452       }
453       ++num_seen;
454       return Status::OK();
455     }
456     Status DeleteCF(uint32_t /*column_family_id*/,
457                     const Slice& /*key*/) override {
458       ADD_FAILURE();
459       return Status::OK();
460     }
461     Status SingleDeleteCF(uint32_t /*column_family_id*/,
462                           const Slice& /*key*/) override {
463       ADD_FAILURE();
464       return Status::OK();
465     }
466     Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
467                    const Slice& /*value*/) override {
468       ADD_FAILURE();
469       return Status::OK();
470     }
471     void LogData(const Slice& /*blob*/) override { ADD_FAILURE(); }
472     bool Continue() override { return num_seen < kNumUpdates; }
473   } handler;
474 
475   batch.Iterate(&handler);
476   ASSERT_EQ(kNumUpdates, handler.num_seen);
477 }
478 
479 // The test requires more than 18GB memory to run it, with single memory
480 // allocation of more than 12GB. Not all the platform can run it. So disable it.
TEST_F(WriteBatchTest,DISABLED_LargeKeyValue)481 TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
482   // Insert key and value of 3GB and push total batch size to 12GB.
483   static const size_t kKeyValueSize = 3221225472u;
484   std::string raw(kKeyValueSize, 'A');
485   WriteBatch batch(size_t(12884901888ull + 1024u));
486   for (char i = 0; i < 2; i++) {
487     raw[0] = 'A' + i;
488     raw[raw.length() - 1] = 'A' - i;
489     batch.Put(raw, raw);
490   }
491 
492   ASSERT_EQ(2u, batch.Count());
493 
494   struct NoopHandler : public WriteBatch::Handler {
495     int num_seen = 0;
496     Status PutCF(uint32_t /*column_family_id*/, const Slice& key,
497                  const Slice& value) override {
498       EXPECT_EQ(kKeyValueSize, key.size());
499       EXPECT_EQ(kKeyValueSize, value.size());
500       EXPECT_EQ('A' + num_seen, key[0]);
501       EXPECT_EQ('A' + num_seen, value[0]);
502       EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]);
503       EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]);
504       ++num_seen;
505       return Status::OK();
506     }
507     Status DeleteCF(uint32_t /*column_family_id*/,
508                     const Slice& /*key*/) override {
509       ADD_FAILURE();
510       return Status::OK();
511     }
512     Status SingleDeleteCF(uint32_t /*column_family_id*/,
513                           const Slice& /*key*/) override {
514       ADD_FAILURE();
515       return Status::OK();
516     }
517     Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
518                    const Slice& /*value*/) override {
519       ADD_FAILURE();
520       return Status::OK();
521     }
522     void LogData(const Slice& /*blob*/) override { ADD_FAILURE(); }
523     bool Continue() override { return num_seen < 2; }
524   } handler;
525 
526   batch.Iterate(&handler);
527   ASSERT_EQ(2, handler.num_seen);
528 }
529 
// Checks that iteration stops once Handler::Continue() returns false: the
// handler below gives up after five callbacks, so the last two records of the
// seven-record batch must never be reported.
TEST_F(WriteBatchTest, Continue) {
  WriteBatch wb;

  // TestHandler that additionally counts the callbacks it receives and asks
  // the iteration to stop after the fifth one.
  struct Handler : public TestHandler {
    int num_seen = 0;

    Status PutCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
      num_seen += 1;
      return TestHandler::PutCF(column_family_id, key, value);
    }
    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
      num_seen += 1;
      return TestHandler::DeleteCF(column_family_id, key);
    }
    Status SingleDeleteCF(uint32_t column_family_id,
                          const Slice& key) override {
      num_seen += 1;
      return TestHandler::SingleDeleteCF(column_family_id, key);
    }
    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& value) override {
      num_seen += 1;
      return TestHandler::MergeCF(column_family_id, key, value);
    }
    void LogData(const Slice& blob) override {
      num_seen += 1;
      TestHandler::LogData(blob);
    }
    bool Continue() override { return num_seen < 5; }
  } stopping_handler;

  wb.Put(Slice("k1"), Slice("v1"));
  wb.Put(Slice("k2"), Slice("v2"));
  wb.PutLogData(Slice("blob1"));
  wb.Delete(Slice("k1"));
  wb.SingleDelete(Slice("k2"));
  wb.PutLogData(Slice("blob2"));
  wb.Merge(Slice("foo"), Slice("bar"));

  // The returned Status is not inspected here; the test pins only which
  // records the handler observed before it asked to stop.
  wb.Iterate(&stopping_handler);
  ASSERT_EQ(
      "Put(k1, v1)"
      "Put(k2, v2)"
      "LogData(blob1)"
      "Delete(k1)"
      "SingleDelete(k2)",
      stopping_handler.seen);
}
577 
// Checks the SliceParts overload of Put(): the parts of a key or value are
// concatenated into a single key/value in the batch.
TEST_F(WriteBatchTest, PutGatherSlices) {
  WriteBatch wb;
  wb.Put(Slice("foo"), Slice("bar"));

  {
    // Single-slice key, value assembled from two slices.
    Slice whole_key("baz");
    Slice value_parts[2] = {Slice("header"), Slice("payload")};
    const SliceParts key_arg(&whole_key, 1);
    const SliceParts value_arg(value_parts, 2);
    wb.Put(key_arg, value_arg);
  }

  {
    // Key assembled from three slices, single-slice value.
    Slice key_parts[3] = {Slice("key"), Slice("part2"), Slice("part3")};
    Slice whole_value("value");
    const SliceParts key_arg(key_parts, 3);
    const SliceParts value_arg(&whole_value, 1);
    wb.Put(key_arg, value_arg);
  }

  WriteBatchInternal::SetSequence(&wb, 100);
  ASSERT_EQ(
      "Put(baz, headerpayload)@101"
      "Put(foo, bar)@100"
      "Put(keypart2part3, value)@102",
      PrintContents(&wb));
  ASSERT_EQ(3u, wb.Count());
}
605 
606 namespace {
607 class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
608  public:
ColumnFamilyHandleImplDummy(int id)609   explicit ColumnFamilyHandleImplDummy(int id)
610       : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
GetID() const611   uint32_t GetID() const override { return id_; }
GetComparator() const612   const Comparator* GetComparator() const override {
613     return BytewiseComparator();
614   }
615 
616  private:
617   uint32_t id_;
618 };
619 }  // namespace anonymous
620 
// Writes to several column families through dummy handles and checks that a
// handler sees every operation with the right column family id, in insertion
// order. Operations on column family 0 are reported in the short form.
TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
  WriteBatch batch;
  ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
  batch.Put(&zero, Slice("foo"), Slice("bar"));
  batch.Put(&two, Slice("twofoo"), Slice("bar2"));
  batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
  batch.Delete(&eight, Slice("eightfoo"));
  batch.SingleDelete(&two, Slice("twofoo"));
  batch.DeleteRange(&two, Slice("3foo"), Slice("4foo"));
  batch.Merge(&three, Slice("threethree"), Slice("3three"));
  batch.Put(&zero, Slice("foo"), Slice("bar"));
  // No handle given: reported like column family 0.
  batch.Merge(Slice("omom"), Slice("nom"));

  TestHandler handler;
  // Check the iteration status instead of dropping it.
  ASSERT_OK(batch.Iterate(&handler));
  ASSERT_EQ(
      "Put(foo, bar)"
      "PutCF(2, twofoo, bar2)"
      "PutCF(8, eightfoo, bar8)"
      "DeleteCF(8, eightfoo)"
      "SingleDeleteCF(2, twofoo)"
      "DeleteRangeCF(2, 3foo, 4foo)"
      "MergeCF(3, threethree, 3three)"
      "Put(foo, bar)"
      "Merge(omom, nom)",
      handler.seen);
}
648 
649 #ifndef ROCKSDB_LITE
TEST_F(WriteBatchTest,ColumnFamiliesBatchWithIndexTest)650 TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
651   WriteBatchWithIndex batch;
652   ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
653   batch.Put(&zero, Slice("foo"), Slice("bar"));
654   batch.Put(&two, Slice("twofoo"), Slice("bar2"));
655   batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
656   batch.Delete(&eight, Slice("eightfoo"));
657   batch.SingleDelete(&two, Slice("twofoo"));
658   batch.Merge(&three, Slice("threethree"), Slice("3three"));
659   batch.Put(&zero, Slice("foo"), Slice("bar"));
660   batch.Merge(Slice("omom"), Slice("nom"));
661 
662   std::unique_ptr<WBWIIterator> iter;
663 
664   iter.reset(batch.NewIterator(&eight));
665   iter->Seek("eightfoo");
666   ASSERT_OK(iter->status());
667   ASSERT_TRUE(iter->Valid());
668   ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
669   ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
670   ASSERT_EQ("bar8", iter->Entry().value.ToString());
671 
672   iter->Next();
673   ASSERT_OK(iter->status());
674   ASSERT_TRUE(iter->Valid());
675   ASSERT_EQ(WriteType::kDeleteRecord, iter->Entry().type);
676   ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
677 
678   iter->Next();
679   ASSERT_OK(iter->status());
680   ASSERT_TRUE(!iter->Valid());
681 
682   iter.reset(batch.NewIterator(&two));
683   iter->Seek("twofoo");
684   ASSERT_OK(iter->status());
685   ASSERT_TRUE(iter->Valid());
686   ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
687   ASSERT_EQ("twofoo", iter->Entry().key.ToString());
688   ASSERT_EQ("bar2", iter->Entry().value.ToString());
689 
690   iter->Next();
691   ASSERT_OK(iter->status());
692   ASSERT_TRUE(iter->Valid());
693   ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type);
694   ASSERT_EQ("twofoo", iter->Entry().key.ToString());
695 
696   iter->Next();
697   ASSERT_OK(iter->status());
698   ASSERT_TRUE(!iter->Valid());
699 
700   iter.reset(batch.NewIterator());
701   iter->Seek("gggg");
702   ASSERT_OK(iter->status());
703   ASSERT_TRUE(iter->Valid());
704   ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
705   ASSERT_EQ("omom", iter->Entry().key.ToString());
706   ASSERT_EQ("nom", iter->Entry().value.ToString());
707 
708   iter->Next();
709   ASSERT_OK(iter->status());
710   ASSERT_TRUE(!iter->Valid());
711 
712   iter.reset(batch.NewIterator(&zero));
713   iter->Seek("foo");
714   ASSERT_OK(iter->status());
715   ASSERT_TRUE(iter->Valid());
716   ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
717   ASSERT_EQ("foo", iter->Entry().key.ToString());
718   ASSERT_EQ("bar", iter->Entry().value.ToString());
719 
720   iter->Next();
721   ASSERT_OK(iter->status());
722   ASSERT_TRUE(iter->Valid());
723   ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
724   ASSERT_EQ("foo", iter->Entry().key.ToString());
725   ASSERT_EQ("bar", iter->Entry().value.ToString());
726 
727   iter->Next();
728   ASSERT_OK(iter->status());
729   ASSERT_TRUE(iter->Valid());
730   ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
731   ASSERT_EQ("omom", iter->Entry().key.ToString());
732   ASSERT_EQ("nom", iter->Entry().value.ToString());
733 
734   iter->Next();
735   ASSERT_OK(iter->status());
736   ASSERT_TRUE(!iter->Valid());
737 
738   TestHandler handler;
739   batch.GetWriteBatch()->Iterate(&handler);
740   ASSERT_EQ(
741       "Put(foo, bar)"
742       "PutCF(2, twofoo, bar2)"
743       "PutCF(8, eightfoo, bar8)"
744       "DeleteCF(8, eightfoo)"
745       "SingleDeleteCF(2, twofoo)"
746       "MergeCF(3, threethree, 3three)"
747       "Put(foo, bar)"
748       "Merge(omom, nom)",
749       handler.seen);
750 }
751 #endif  // !ROCKSDB_LITE
752 
TEST_F(WriteBatchTest,SavePointTest)753 TEST_F(WriteBatchTest, SavePointTest) {
754   Status s;
755   WriteBatch batch;
756   batch.SetSavePoint();
757 
758   batch.Put("A", "a");
759   batch.Put("B", "b");
760   batch.SetSavePoint();
761 
762   batch.Put("C", "c");
763   batch.Delete("A");
764   batch.SetSavePoint();
765   batch.SetSavePoint();
766 
767   ASSERT_OK(batch.RollbackToSavePoint());
768   ASSERT_EQ(
769       "Delete(A)@3"
770       "Put(A, a)@0"
771       "Put(B, b)@1"
772       "Put(C, c)@2",
773       PrintContents(&batch));
774 
775   ASSERT_OK(batch.RollbackToSavePoint());
776   ASSERT_OK(batch.RollbackToSavePoint());
777   ASSERT_EQ(
778       "Put(A, a)@0"
779       "Put(B, b)@1",
780       PrintContents(&batch));
781 
782   batch.Delete("A");
783   batch.Put("B", "bb");
784 
785   ASSERT_OK(batch.RollbackToSavePoint());
786   ASSERT_EQ("", PrintContents(&batch));
787 
788   s = batch.RollbackToSavePoint();
789   ASSERT_TRUE(s.IsNotFound());
790   ASSERT_EQ("", PrintContents(&batch));
791 
792   batch.Put("D", "d");
793   batch.Delete("A");
794 
795   batch.SetSavePoint();
796 
797   batch.Put("A", "aaa");
798 
799   ASSERT_OK(batch.RollbackToSavePoint());
800   ASSERT_EQ(
801       "Delete(A)@1"
802       "Put(D, d)@0",
803       PrintContents(&batch));
804 
805   batch.SetSavePoint();
806 
807   batch.Put("D", "d");
808   batch.Delete("A");
809 
810   ASSERT_OK(batch.RollbackToSavePoint());
811   ASSERT_EQ(
812       "Delete(A)@1"
813       "Put(D, d)@0",
814       PrintContents(&batch));
815 
816   s = batch.RollbackToSavePoint();
817   ASSERT_TRUE(s.IsNotFound());
818   ASSERT_EQ(
819       "Delete(A)@1"
820       "Put(D, d)@0",
821       PrintContents(&batch));
822 
823   WriteBatch batch2;
824 
825   s = batch2.RollbackToSavePoint();
826   ASSERT_TRUE(s.IsNotFound());
827   ASSERT_EQ("", PrintContents(&batch2));
828 
829   batch2.Delete("A");
830   batch2.SetSavePoint();
831 
832   s = batch2.RollbackToSavePoint();
833   ASSERT_OK(s);
834   ASSERT_EQ("Delete(A)@0", PrintContents(&batch2));
835 
836   batch2.Clear();
837   ASSERT_EQ("", PrintContents(&batch2));
838 
839   batch2.SetSavePoint();
840 
841   batch2.Delete("B");
842   ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
843 
844   batch2.SetSavePoint();
845   s = batch2.RollbackToSavePoint();
846   ASSERT_OK(s);
847   ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
848 
849   s = batch2.RollbackToSavePoint();
850   ASSERT_OK(s);
851   ASSERT_EQ("", PrintContents(&batch2));
852 
853   s = batch2.RollbackToSavePoint();
854   ASSERT_TRUE(s.IsNotFound());
855   ASSERT_EQ("", PrintContents(&batch2));
856 
857   WriteBatch batch3;
858 
859   s = batch3.PopSavePoint();
860   ASSERT_TRUE(s.IsNotFound());
861   ASSERT_EQ("", PrintContents(&batch3));
862 
863   batch3.SetSavePoint();
864   batch3.Delete("A");
865 
866   s = batch3.PopSavePoint();
867   ASSERT_OK(s);
868   ASSERT_EQ("Delete(A)@0", PrintContents(&batch3));
869 }
870 
// A batch built with a 28-byte size cap accepts exactly two small Puts and
// rejects the third with a MemoryLimit status.
TEST_F(WriteBatchTest, MemoryLimitTest) {
  // The header takes 12 bytes and each of the Puts below takes 8 bytes, so
  // two of them fill the batch exactly: 12 + 8 * 2 = 28 bytes.
  WriteBatch capped(0, 28);

  ASSERT_OK(capped.Put("a", "...."));
  ASSERT_OK(capped.Put("b", "...."));

  const Status third_put = capped.Put("c", "....");
  ASSERT_TRUE(third_put.IsMemoryLimit());
}
882 
883 }  // namespace ROCKSDB_NAMESPACE
884 
main(int argc,char ** argv)885 int main(int argc, char** argv) {
886   ::testing::InitGoogleTest(&argc, argv);
887   return RUN_ALL_TESTS();
888 }
889