1 // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <iostream>
7 #include "db/db_impl/db_impl.h"
8 #include "rocksdb/db.h"
9 #include "rocksdb/merge_operator.h"
10 #include "rocksdb/utilities/db_ttl.h"
11 #include "test_util/testharness.h"
12 #include "util/random.h"
13 #include "utilities/cassandra/cassandra_compaction_filter.h"
14 #include "utilities/cassandra/merge_operator.h"
15 #include "utilities/cassandra/test_utils.h"
16 #include "utilities/merge_operators.h"
17
18 using namespace rocksdb;
19
20 namespace rocksdb {
21 namespace cassandra {
22
23 // Path to the database on file system
24 const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
25
26 class CassandraStore {
27 public:
CassandraStore(std::shared_ptr<DB> db)28 explicit CassandraStore(std::shared_ptr<DB> db)
29 : db_(db), write_option_(), get_option_() {
30 assert(db);
31 }
32
Append(const std::string & key,const RowValue & val)33 bool Append(const std::string& key, const RowValue& val){
34 std::string result;
35 val.Serialize(&result);
36 Slice valSlice(result.data(), result.size());
37 auto s = db_->Merge(write_option_, key, valSlice);
38
39 if (s.ok()) {
40 return true;
41 } else {
42 std::cerr << "ERROR " << s.ToString() << std::endl;
43 return false;
44 }
45 }
46
Put(const std::string & key,const RowValue & val)47 bool Put(const std::string& key, const RowValue& val) {
48 std::string result;
49 val.Serialize(&result);
50 Slice valSlice(result.data(), result.size());
51 auto s = db_->Put(write_option_, key, valSlice);
52 if (s.ok()) {
53 return true;
54 } else {
55 std::cerr << "ERROR " << s.ToString() << std::endl;
56 return false;
57 }
58 }
59
Flush()60 void Flush() {
61 dbfull()->TEST_FlushMemTable();
62 dbfull()->TEST_WaitForCompact();
63 }
64
Compact()65 void Compact() {
66 dbfull()->TEST_CompactRange(
67 0, nullptr, nullptr, db_->DefaultColumnFamily());
68 }
69
Get(const std::string & key)70 std::tuple<bool, RowValue> Get(const std::string& key){
71 std::string result;
72 auto s = db_->Get(get_option_, key, &result);
73
74 if (s.ok()) {
75 return std::make_tuple(true,
76 RowValue::Deserialize(result.data(),
77 result.size()));
78 }
79
80 if (!s.IsNotFound()) {
81 std::cerr << "ERROR " << s.ToString() << std::endl;
82 }
83
84 return std::make_tuple(false, RowValue(0, 0));
85 }
86
87 private:
88 std::shared_ptr<DB> db_;
89 WriteOptions write_option_;
90 ReadOptions get_option_;
91
dbfull()92 DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
93 };
94
95 class TestCompactionFilterFactory : public CompactionFilterFactory {
96 public:
TestCompactionFilterFactory(bool purge_ttl_on_expiration,int32_t gc_grace_period_in_seconds)97 explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
98 int32_t gc_grace_period_in_seconds)
99 : purge_ttl_on_expiration_(purge_ttl_on_expiration),
100 gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
101
CreateCompactionFilter(const CompactionFilter::Context &)102 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
103 const CompactionFilter::Context& /*context*/) override {
104 return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
105 purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
106 }
107
Name() const108 const char* Name() const override { return "TestCompactionFilterFactory"; }
109
110 private:
111 bool purge_ttl_on_expiration_;
112 int32_t gc_grace_period_in_seconds_;
113 };
114
115
116 // The class for unit-testing
117 class CassandraFunctionalTest : public testing::Test {
118 public:
CassandraFunctionalTest()119 CassandraFunctionalTest() {
120 DestroyDB(kDbName, Options()); // Start each test with a fresh DB
121 }
122
OpenDb()123 std::shared_ptr<DB> OpenDb() {
124 DB* db;
125 Options options;
126 options.create_if_missing = true;
127 options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
128 auto* cf_factory = new TestCompactionFilterFactory(
129 purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
130 options.compaction_filter_factory.reset(cf_factory);
131 EXPECT_OK(DB::Open(options, kDbName, &db));
132 return std::shared_ptr<DB>(db);
133 }
134
135 bool purge_ttl_on_expiration_ = false;
136 int32_t gc_grace_period_in_seconds_ = 100;
137 };
138
139 // THE TEST CASES BEGIN HERE
140
TEST_F(CassandraFunctionalTest,SimpleMergeTest)141 TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
142 CassandraStore store(OpenDb());
143 int64_t now = time(nullptr);
144
145 store.Append("k1", CreateTestRowValue({
146 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
147 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
148 CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
149 }));
150 store.Append("k1",CreateTestRowValue({
151 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
152 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
153 CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
154 CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
155 }));
156 store.Append("k1", CreateTestRowValue({
157 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
158 CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
159 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
160 CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
161 }));
162
163 auto ret = store.Get("k1");
164
165 ASSERT_TRUE(std::get<0>(ret));
166 RowValue& merged = std::get<1>(ret);
167 EXPECT_EQ(merged.columns_.size(), 5);
168 VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));
169 VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
170 VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
171 VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
172 VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
173 }
174
TEST_F(CassandraFunctionalTest,CompactionShouldConvertExpiredColumnsToTombstone)175 TEST_F(CassandraFunctionalTest,
176 CompactionShouldConvertExpiredColumnsToTombstone) {
177 CassandraStore store(OpenDb());
178 int64_t now= time(nullptr);
179
180 store.Append("k1", CreateTestRowValue({
181 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
182 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
183 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
184 }));
185
186 store.Flush();
187
188 store.Append("k1",CreateTestRowValue({
189 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
190 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
191 }));
192
193 store.Flush();
194 store.Compact();
195
196 auto ret = store.Get("k1");
197 ASSERT_TRUE(std::get<0>(ret));
198 RowValue& merged = std::get<1>(ret);
199 EXPECT_EQ(merged.columns_.size(), 4);
200 VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));
201 VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
202 VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
203 VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
204 }
205
206
TEST_F(CassandraFunctionalTest,CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn)207 TEST_F(CassandraFunctionalTest,
208 CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
209 purge_ttl_on_expiration_ = true;
210 CassandraStore store(OpenDb());
211 int64_t now = time(nullptr);
212
213 store.Append("k1", CreateTestRowValue({
214 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
215 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
216 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
217 }));
218
219 store.Flush();
220
221 store.Append("k1",CreateTestRowValue({
222 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
223 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
224 }));
225
226 store.Flush();
227 store.Compact();
228
229 auto ret = store.Get("k1");
230 ASSERT_TRUE(std::get<0>(ret));
231 RowValue& merged = std::get<1>(ret);
232 EXPECT_EQ(merged.columns_.size(), 3);
233 VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
234 VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
235 VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
236 }
237
TEST_F(CassandraFunctionalTest,CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn)238 TEST_F(CassandraFunctionalTest,
239 CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
240 purge_ttl_on_expiration_ = true;
241 CassandraStore store(OpenDb());
242 int64_t now = time(nullptr);
243
244 store.Append("k1", CreateTestRowValue({
245 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
246 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
247 }));
248
249 store.Flush();
250
251 store.Append("k1",CreateTestRowValue({
252 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
253 }));
254
255 store.Flush();
256 store.Compact();
257 ASSERT_FALSE(std::get<0>(store.Get("k1")));
258 }
259
TEST_F(CassandraFunctionalTest,CompactionShouldRemoveTombstoneExceedingGCGracePeriod)260 TEST_F(CassandraFunctionalTest,
261 CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
262 purge_ttl_on_expiration_ = true;
263 CassandraStore store(OpenDb());
264 int64_t now = time(nullptr);
265
266 store.Append("k1", CreateTestRowValue({
267 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
268 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
269 }));
270
271 store.Append("k2", CreateTestRowValue({
272 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
273 }));
274
275 store.Flush();
276
277 store.Append("k1",CreateTestRowValue({
278 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
279 }));
280
281 store.Flush();
282 store.Compact();
283
284 auto ret = store.Get("k1");
285 ASSERT_TRUE(std::get<0>(ret));
286 RowValue& gced = std::get<1>(ret);
287 EXPECT_EQ(gced.columns_.size(), 1);
288 VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
289 }
290
TEST_F(CassandraFunctionalTest,CompactionShouldRemoveTombstoneFromPut)291 TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
292 purge_ttl_on_expiration_ = true;
293 CassandraStore store(OpenDb());
294 int64_t now = time(nullptr);
295
296 store.Put("k1", CreateTestRowValue({
297 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
298 }));
299
300 store.Flush();
301 store.Compact();
302 ASSERT_FALSE(std::get<0>(store.Get("k1")));
303 }
304
305 } // namespace cassandra
306 } // namespace rocksdb
307
main(int argc,char ** argv)308 int main(int argc, char** argv) {
309 ::testing::InitGoogleTest(&argc, argv);
310 return RUN_ALL_TESTS();
311 }
312