1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. See the AUTHORS file for names of contributors.
5 #ifndef ROCKSDB_LITE
6
7 #include "utilities/ttl/db_ttl_impl.h"
8
9 #include "db/write_batch_internal.h"
10 #include "file/filename.h"
11 #include "rocksdb/convenience.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/iterator.h"
14 #include "rocksdb/system_clock.h"
15 #include "rocksdb/utilities/db_ttl.h"
16 #include "util/coding.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
SanitizeOptions(int32_t ttl,ColumnFamilyOptions * options,SystemClock * clock)20 void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
21 SystemClock* clock) {
22 if (options->compaction_filter) {
23 options->compaction_filter =
24 new TtlCompactionFilter(ttl, clock, options->compaction_filter);
25 } else {
26 options->compaction_filter_factory =
27 std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
28 ttl, clock, options->compaction_filter_factory));
29 }
30
31 if (options->merge_operator) {
32 options->merge_operator.reset(
33 new TtlMergeOperator(options->merge_operator, clock));
34 }
35 }
36
37 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
DBWithTTLImpl(DB * db)38 DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
39
~DBWithTTLImpl()40 DBWithTTLImpl::~DBWithTTLImpl() {
41 if (!closed_) {
42 Close().PermitUncheckedError();
43 }
44 }
45
Close()46 Status DBWithTTLImpl::Close() {
47 Status ret = Status::OK();
48 if (!closed_) {
49 Options default_options = GetOptions();
50 // Need to stop background compaction before getting rid of the filter
51 CancelAllBackgroundWork(db_, /* wait = */ true);
52 ret = db_->Close();
53 delete default_options.compaction_filter;
54 closed_ = true;
55 }
56 return ret;
57 }
58
OpenTtlDB(const Options & options,const std::string & dbname,StackableDB ** dbptr,int32_t ttl,bool read_only)59 Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
60 StackableDB** dbptr, int32_t ttl, bool read_only) {
61 DBWithTTL* db;
62 Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
63 if (s.ok()) {
64 *dbptr = db;
65 } else {
66 *dbptr = nullptr;
67 }
68 return s;
69 }
70
Open(const Options & options,const std::string & dbname,DBWithTTL ** dbptr,int32_t ttl,bool read_only)71 Status DBWithTTL::Open(const Options& options, const std::string& dbname,
72 DBWithTTL** dbptr, int32_t ttl, bool read_only) {
73
74 DBOptions db_options(options);
75 ColumnFamilyOptions cf_options(options);
76 std::vector<ColumnFamilyDescriptor> column_families;
77 column_families.push_back(
78 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
79 std::vector<ColumnFamilyHandle*> handles;
80 Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
81 dbptr, {ttl}, read_only);
82 if (s.ok()) {
83 assert(handles.size() == 1);
84 // i can delete the handle since DBImpl is always holding a reference to
85 // default column family
86 delete handles[0];
87 }
88 return s;
89 }
90
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DBWithTTL ** dbptr,const std::vector<int32_t> & ttls,bool read_only)91 Status DBWithTTL::Open(
92 const DBOptions& db_options, const std::string& dbname,
93 const std::vector<ColumnFamilyDescriptor>& column_families,
94 std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
95 const std::vector<int32_t>& ttls, bool read_only) {
96 if (ttls.size() != column_families.size()) {
97 return Status::InvalidArgument(
98 "ttls size has to be the same as number of column families");
99 }
100
101 SystemClock* clock = (db_options.env == nullptr)
102 ? SystemClock::Default().get()
103 : db_options.env->GetSystemClock().get();
104
105 std::vector<ColumnFamilyDescriptor> column_families_sanitized =
106 column_families;
107 for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
108 DBWithTTLImpl::SanitizeOptions(
109 ttls[i], &column_families_sanitized[i].options, clock);
110 }
111 DB* db;
112
113 Status st;
114 if (read_only) {
115 st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
116 handles, &db);
117 } else {
118 st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
119 }
120 if (st.ok()) {
121 *dbptr = new DBWithTTLImpl(db);
122 } else {
123 *dbptr = nullptr;
124 }
125 return st;
126 }
127
CreateColumnFamilyWithTtl(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle,int ttl)128 Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
129 const ColumnFamilyOptions& options, const std::string& column_family_name,
130 ColumnFamilyHandle** handle, int ttl) {
131 ColumnFamilyOptions sanitized_options = options;
132 DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
133 GetEnv()->GetSystemClock().get());
134
135 return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
136 handle);
137 }
138
CreateColumnFamily(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle)139 Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
140 const std::string& column_family_name,
141 ColumnFamilyHandle** handle) {
142 return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
143 }
144
145 // Appends the current timestamp to the string.
146 // Returns false if could not get the current_time, true if append succeeds
AppendTS(const Slice & val,std::string * val_with_ts,SystemClock * clock)147 Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
148 SystemClock* clock) {
149 val_with_ts->reserve(kTSLength + val.size());
150 char ts_string[kTSLength];
151 int64_t curtime;
152 Status st = clock->GetCurrentTime(&curtime);
153 if (!st.ok()) {
154 return st;
155 }
156 EncodeFixed32(ts_string, (int32_t)curtime);
157 val_with_ts->append(val.data(), val.size());
158 val_with_ts->append(ts_string, kTSLength);
159 return st;
160 }
161
162 // Returns corruption if the length of the string is lesser than timestamp, or
163 // timestamp refers to a time lesser than ttl-feature release time
SanityCheckTimestamp(const Slice & str)164 Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
165 if (str.size() < kTSLength) {
166 return Status::Corruption("Error: value's length less than timestamp's\n");
167 }
168 // Checks that TS is not lesser than kMinTimestamp
169 // Gaurds against corruption & normal database opened incorrectly in ttl mode
170 int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
171 if (timestamp_value < kMinTimestamp) {
172 return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
173 }
174 return Status::OK();
175 }
176
177 // Checks if the string is stale or not according to TTl provided
IsStale(const Slice & value,int32_t ttl,SystemClock * clock)178 bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
179 SystemClock* clock) {
180 if (ttl <= 0) { // Data is fresh if TTL is non-positive
181 return false;
182 }
183 int64_t curtime;
184 if (!clock->GetCurrentTime(&curtime).ok()) {
185 return false; // Treat the data as fresh if could not get current time
186 }
187 int32_t timestamp_value =
188 DecodeFixed32(value.data() + value.size() - kTSLength);
189 return (timestamp_value + ttl) < curtime;
190 }
191
192 // Strips the TS from the end of the slice
StripTS(PinnableSlice * pinnable_val)193 Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
194 if (pinnable_val->size() < kTSLength) {
195 return Status::Corruption("Bad timestamp in key-value");
196 }
197 // Erasing characters which hold the TS
198 pinnable_val->remove_suffix(kTSLength);
199 return Status::OK();
200 }
201
202 // Strips the TS from the end of the string
StripTS(std::string * str)203 Status DBWithTTLImpl::StripTS(std::string* str) {
204 if (str->length() < kTSLength) {
205 return Status::Corruption("Bad timestamp in key-value");
206 }
207 // Erasing characters which hold the TS
208 str->erase(str->length() - kTSLength, kTSLength);
209 return Status::OK();
210 }
211
Put(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & val)212 Status DBWithTTLImpl::Put(const WriteOptions& options,
213 ColumnFamilyHandle* column_family, const Slice& key,
214 const Slice& val) {
215 WriteBatch batch;
216 Status st = batch.Put(column_family, key, val);
217 if (st.ok()) {
218 st = Write(options, &batch);
219 }
220 return st;
221 }
222
Get(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)223 Status DBWithTTLImpl::Get(const ReadOptions& options,
224 ColumnFamilyHandle* column_family, const Slice& key,
225 PinnableSlice* value) {
226 Status st = db_->Get(options, column_family, key, value);
227 if (!st.ok()) {
228 return st;
229 }
230 st = SanityCheckTimestamp(*value);
231 if (!st.ok()) {
232 return st;
233 }
234 return StripTS(value);
235 }
236
MultiGet(const ReadOptions & options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)237 std::vector<Status> DBWithTTLImpl::MultiGet(
238 const ReadOptions& options,
239 const std::vector<ColumnFamilyHandle*>& column_family,
240 const std::vector<Slice>& keys, std::vector<std::string>* values) {
241 auto statuses = db_->MultiGet(options, column_family, keys, values);
242 for (size_t i = 0; i < keys.size(); ++i) {
243 if (!statuses[i].ok()) {
244 continue;
245 }
246 statuses[i] = SanityCheckTimestamp((*values)[i]);
247 if (!statuses[i].ok()) {
248 continue;
249 }
250 statuses[i] = StripTS(&(*values)[i]);
251 }
252 return statuses;
253 }
254
KeyMayExist(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,bool * value_found)255 bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
256 ColumnFamilyHandle* column_family,
257 const Slice& key, std::string* value,
258 bool* value_found) {
259 bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
260 if (ret && value != nullptr && value_found != nullptr && *value_found) {
261 if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
262 return false;
263 }
264 }
265 return ret;
266 }
267
Merge(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)268 Status DBWithTTLImpl::Merge(const WriteOptions& options,
269 ColumnFamilyHandle* column_family, const Slice& key,
270 const Slice& value) {
271 WriteBatch batch;
272 Status st = batch.Merge(column_family, key, value);
273 if (st.ok()) {
274 st = Write(options, &batch);
275 }
276 return st;
277 }
278
Write(const WriteOptions & opts,WriteBatch * updates)279 Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
280 class Handler : public WriteBatch::Handler {
281 public:
282 explicit Handler(SystemClock* clock) : clock_(clock) {}
283 WriteBatch updates_ttl;
284 Status PutCF(uint32_t column_family_id, const Slice& key,
285 const Slice& value) override {
286 std::string value_with_ts;
287 Status st = AppendTS(value, &value_with_ts, clock_);
288 if (!st.ok()) {
289 return st;
290 }
291 return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
292 value_with_ts);
293 }
294 Status MergeCF(uint32_t column_family_id, const Slice& key,
295 const Slice& value) override {
296 std::string value_with_ts;
297 Status st = AppendTS(value, &value_with_ts, clock_);
298 if (!st.ok()) {
299 return st;
300 }
301 return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
302 value_with_ts);
303 }
304 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
305 return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
306 }
307 void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
308
309 private:
310 SystemClock* clock_;
311 };
312 Handler handler(GetEnv()->GetSystemClock().get());
313 Status st = updates->Iterate(&handler);
314 if (!st.ok()) {
315 return st;
316 } else {
317 return db_->Write(opts, &(handler.updates_ttl));
318 }
319 }
320
NewIterator(const ReadOptions & opts,ColumnFamilyHandle * column_family)321 Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
322 ColumnFamilyHandle* column_family) {
323 return new TtlIterator(db_->NewIterator(opts, column_family));
324 }
325
SetTtl(ColumnFamilyHandle * h,int32_t ttl)326 void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
327 std::shared_ptr<TtlCompactionFilterFactory> filter;
328 Options opts;
329 opts = GetOptions(h);
330 filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
331 opts.compaction_filter_factory);
332 if (!filter)
333 return;
334 filter->SetTtl(ttl);
335 }
336
337 } // namespace ROCKSDB_NAMESPACE
338 #endif // ROCKSDB_LITE
339