1 // Copyright (c) 2017-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <iostream>
7 #include "db/db_impl/db_impl.h"
8 #include "rocksdb/db.h"
9 #include "rocksdb/merge_operator.h"
10 #include "rocksdb/utilities/db_ttl.h"
11 #include "test_util/testharness.h"
12 #include "util/random.h"
13 #include "utilities/cassandra/cassandra_compaction_filter.h"
14 #include "utilities/cassandra/merge_operator.h"
15 #include "utilities/cassandra/test_utils.h"
16 #include "utilities/merge_operators.h"
17 
18 using namespace rocksdb;
19 
20 namespace rocksdb {
21 namespace cassandra {
22 
// Path to the database on the file system. The fixture below destroys any DB
// at this path before each test and then opens a new one there. The value
// comes from test::PerThreadDBPath() -- presumably unique per thread, verify
// against that helper.
const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
25 
26 class CassandraStore {
27  public:
CassandraStore(std::shared_ptr<DB> db)28   explicit CassandraStore(std::shared_ptr<DB> db)
29       : db_(db), write_option_(), get_option_() {
30     assert(db);
31   }
32 
Append(const std::string & key,const RowValue & val)33   bool Append(const std::string& key, const RowValue& val){
34     std::string result;
35     val.Serialize(&result);
36     Slice valSlice(result.data(), result.size());
37     auto s = db_->Merge(write_option_, key, valSlice);
38 
39     if (s.ok()) {
40       return true;
41     } else {
42       std::cerr << "ERROR " << s.ToString() << std::endl;
43       return false;
44     }
45   }
46 
Put(const std::string & key,const RowValue & val)47   bool Put(const std::string& key, const RowValue& val) {
48     std::string result;
49     val.Serialize(&result);
50     Slice valSlice(result.data(), result.size());
51     auto s = db_->Put(write_option_, key, valSlice);
52     if (s.ok()) {
53       return true;
54     } else {
55       std::cerr << "ERROR " << s.ToString() << std::endl;
56       return false;
57     }
58   }
59 
Flush()60   void Flush() {
61     dbfull()->TEST_FlushMemTable();
62     dbfull()->TEST_WaitForCompact();
63   }
64 
Compact()65   void Compact() {
66     dbfull()->TEST_CompactRange(
67       0, nullptr, nullptr, db_->DefaultColumnFamily());
68   }
69 
Get(const std::string & key)70   std::tuple<bool, RowValue> Get(const std::string& key){
71     std::string result;
72     auto s = db_->Get(get_option_, key, &result);
73 
74     if (s.ok()) {
75       return std::make_tuple(true,
76                              RowValue::Deserialize(result.data(),
77                                                    result.size()));
78     }
79 
80     if (!s.IsNotFound()) {
81       std::cerr << "ERROR " << s.ToString() << std::endl;
82     }
83 
84     return std::make_tuple(false, RowValue(0, 0));
85   }
86 
87  private:
88   std::shared_ptr<DB> db_;
89   WriteOptions write_option_;
90   ReadOptions get_option_;
91 
dbfull()92   DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
93 };
94 
95 class TestCompactionFilterFactory : public CompactionFilterFactory {
96 public:
TestCompactionFilterFactory(bool purge_ttl_on_expiration,int32_t gc_grace_period_in_seconds)97  explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
98                                       int32_t gc_grace_period_in_seconds)
99      : purge_ttl_on_expiration_(purge_ttl_on_expiration),
100        gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
101 
CreateCompactionFilter(const CompactionFilter::Context &)102  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
103      const CompactionFilter::Context& /*context*/) override {
104    return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
105        purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
106  }
107 
Name() const108  const char* Name() const override { return "TestCompactionFilterFactory"; }
109 
110 private:
111   bool purge_ttl_on_expiration_;
112   int32_t gc_grace_period_in_seconds_;
113 };
114 
115 
116 // The class for unit-testing
117 class CassandraFunctionalTest : public testing::Test {
118 public:
CassandraFunctionalTest()119   CassandraFunctionalTest() {
120     DestroyDB(kDbName, Options());    // Start each test with a fresh DB
121   }
122 
OpenDb()123   std::shared_ptr<DB> OpenDb() {
124     DB* db;
125     Options options;
126     options.create_if_missing = true;
127     options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
128     auto* cf_factory = new TestCompactionFilterFactory(
129         purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
130     options.compaction_filter_factory.reset(cf_factory);
131     EXPECT_OK(DB::Open(options, kDbName, &db));
132     return std::shared_ptr<DB>(db);
133   }
134 
135   bool purge_ttl_on_expiration_ = false;
136   int32_t gc_grace_period_in_seconds_ = 100;
137 };
138 
139 // THE TEST CASES BEGIN HERE
140 
// Appends three rows to the same key and reads the merged result back. The
// expectations below keep, for every column index, the entry carrying the
// largest timestamp among the three appends.
TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // Every write is asserted so that a failed merge stops the test at its
  // cause instead of surfacing later as a confusing read mismatch.
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
          CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
          CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
      })));
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
          CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
          CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
          CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
      })));
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
          CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
          CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
          CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
      })));

  auto ret = store.Get("k1");

  ASSERT_TRUE(std::get<0>(ret));
  RowValue& merged = std::get<1>(ret);
  EXPECT_EQ(merged.columns_.size(), 5);
  VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0,
                        ToMicroSeconds(now + 6));
  VerifyRowValueColumns(merged.columns_, 1, kColumn, 1,
                        ToMicroSeconds(now + 8));
  VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2,
                        ToMicroSeconds(now + 7));
  VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7,
                        ToMicroSeconds(now + 17));
  VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11,
                        ToMicroSeconds(now + 11));
}
174 
// With purge_ttl_on_expiration_ left at its default (false), an expired
// expiring column is expected to come back from compaction as a tombstone,
// while the unexpired columns are returned unchanged.
// NOTE(review): the expected tombstone timestamp (now - 10) equals the newest
// expired write (now - kTtl - 10) plus kTtl -- presumably kTtl is the TTL
// that CreateTestColumnSpec assigns to expiring columns; confirm in
// test_utils.
TEST_F(CassandraFunctionalTest,
       CompactionShouldConvertExpiredColumnsToTombstone) {
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // Writes are asserted so a failed merge stops the test at its cause.
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          // expired
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 20)),
          // not expired
          CreateTestColumnSpec(kExpiringColumn, 1,
                               ToMicroSeconds(now - kTtl + 10)),
          CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)),
      })));

  store.Flush();

  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          // expired
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 10)),
          CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)),
      })));

  store.Flush();
  store.Compact();

  auto ret = store.Get("k1");
  ASSERT_TRUE(std::get<0>(ret));
  RowValue& merged = std::get<1>(ret);
  EXPECT_EQ(merged.columns_.size(), 4);
  VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0,
                        ToMicroSeconds(now - 10));
  VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1,
                        ToMicroSeconds(now - kTtl + 10));
  VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
  VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3,
                        ToMicroSeconds(now));
}
205 
206 
// With purge_ttl_on_expiration_ enabled, the expired expiring column at
// index 0 is expected to be dropped by compaction, leaving only the three
// remaining columns.
TEST_F(CassandraFunctionalTest,
       CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
  purge_ttl_on_expiration_ = true;
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // Writes are asserted so a failed merge stops the test at its cause.
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          // expired
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 20)),
          // not expired
          CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)),
          CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)),
      })));

  store.Flush();

  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          // expired
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 10)),
          CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)),
      })));

  store.Flush();
  store.Compact();

  auto ret = store.Get("k1");
  ASSERT_TRUE(std::get<0>(ret));
  RowValue& merged = std::get<1>(ret);
  EXPECT_EQ(merged.columns_.size(), 3);
  VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1,
                        ToMicroSeconds(now));
  VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
  VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3,
                        ToMicroSeconds(now));
}
237 
// With purge_ttl_on_expiration_ enabled and every column of the row written
// with a timestamp older than now - kTtl, the key is expected to be absent
// after compaction.
TEST_F(CassandraFunctionalTest,
       CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
  purge_ttl_on_expiration_ = true;
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // Writes are asserted: the final "key not found" check would otherwise
  // also pass if nothing had been written at all.
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 20)),
          CreateTestColumnSpec(kExpiringColumn, 1,
                               ToMicroSeconds(now - kTtl - 20)),
      })));

  store.Flush();

  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(kExpiringColumn, 0,
                               ToMicroSeconds(now - kTtl - 10)),
      })));

  store.Flush();
  store.Compact();
  ASSERT_FALSE(std::get<0>(store.Get("k1")));
}
259 
// A tombstone whose timestamp is more than gc_grace_period_in_seconds_ in the
// past is expected to be removed by compaction, so only the regular column at
// index 1 remains for "k1".
TEST_F(CassandraFunctionalTest,
       CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
  purge_ttl_on_expiration_ = true;
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // Writes are asserted so a failed merge stops the test at its cause.
  ASSERT_TRUE(store.Append(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(
              kTombstone, 0,
              ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
          CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
      })));

  // "k2" is written but never read back in this test.
  ASSERT_TRUE(store.Append(
      "k2", CreateTestRowValue({
                CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
            })));

  store.Flush();

  ASSERT_TRUE(store.Append(
      "k1", CreateTestRowValue({
                CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
            })));

  store.Flush();
  store.Compact();

  auto ret = store.Get("k1");
  ASSERT_TRUE(std::get<0>(ret));
  RowValue& gced = std::get<1>(ret);
  EXPECT_EQ(gced.columns_.size(), 1);
  VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
}
290 
// Same grace-period expectation as for merged rows, but for a row written
// with Put: a lone tombstone older than gc_grace_period_in_seconds_ should
// leave no entry for the key after compaction.
TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
  purge_ttl_on_expiration_ = true;
  CassandraStore store(OpenDb());
  int64_t now = time(nullptr);

  // The write is asserted: the final "key not found" check would otherwise
  // also pass if the Put itself had failed.
  ASSERT_TRUE(store.Put(
      "k1",
      CreateTestRowValue({
          CreateTestColumnSpec(
              kTombstone, 0,
              ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
      })));

  store.Flush();
  store.Compact();
  ASSERT_FALSE(std::get<0>(store.Get("k1")));
}
304 
305 } // namespace cassandra
306 } // namespace rocksdb
307 
main(int argc,char ** argv)308 int main(int argc, char** argv) {
309   ::testing::InitGoogleTest(&argc, argv);
310   return RUN_ALL_TESTS();
311 }
312