1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. See the AUTHORS file for names of contributors.
5 #ifndef ROCKSDB_LITE
6 
7 #include "utilities/ttl/db_ttl_impl.h"
8 
9 #include "db/write_batch_internal.h"
10 #include "file/filename.h"
11 #include "rocksdb/convenience.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/iterator.h"
14 #include "rocksdb/system_clock.h"
15 #include "rocksdb/utilities/db_ttl.h"
16 #include "util/coding.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
SanitizeOptions(int32_t ttl,ColumnFamilyOptions * options,SystemClock * clock)20 void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
21                                     SystemClock* clock) {
22   if (options->compaction_filter) {
23     options->compaction_filter =
24         new TtlCompactionFilter(ttl, clock, options->compaction_filter);
25   } else {
26     options->compaction_filter_factory =
27         std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
28             ttl, clock, options->compaction_filter_factory));
29   }
30 
31   if (options->merge_operator) {
32     options->merge_operator.reset(
33         new TtlMergeOperator(options->merge_operator, clock));
34   }
35 }
36 
37 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
DBWithTTLImpl(DB * db)38 DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
39 
~DBWithTTLImpl()40 DBWithTTLImpl::~DBWithTTLImpl() {
41   if (!closed_) {
42     Close().PermitUncheckedError();
43   }
44 }
45 
Close()46 Status DBWithTTLImpl::Close() {
47   Status ret = Status::OK();
48   if (!closed_) {
49     Options default_options = GetOptions();
50     // Need to stop background compaction before getting rid of the filter
51     CancelAllBackgroundWork(db_, /* wait = */ true);
52     ret = db_->Close();
53     delete default_options.compaction_filter;
54     closed_ = true;
55   }
56   return ret;
57 }
58 
OpenTtlDB(const Options & options,const std::string & dbname,StackableDB ** dbptr,int32_t ttl,bool read_only)59 Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
60                             StackableDB** dbptr, int32_t ttl, bool read_only) {
61   DBWithTTL* db;
62   Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
63   if (s.ok()) {
64     *dbptr = db;
65   } else {
66     *dbptr = nullptr;
67   }
68   return s;
69 }
70 
Open(const Options & options,const std::string & dbname,DBWithTTL ** dbptr,int32_t ttl,bool read_only)71 Status DBWithTTL::Open(const Options& options, const std::string& dbname,
72                        DBWithTTL** dbptr, int32_t ttl, bool read_only) {
73 
74   DBOptions db_options(options);
75   ColumnFamilyOptions cf_options(options);
76   std::vector<ColumnFamilyDescriptor> column_families;
77   column_families.push_back(
78       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
79   std::vector<ColumnFamilyHandle*> handles;
80   Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
81                              dbptr, {ttl}, read_only);
82   if (s.ok()) {
83     assert(handles.size() == 1);
84     // i can delete the handle since DBImpl is always holding a reference to
85     // default column family
86     delete handles[0];
87   }
88   return s;
89 }
90 
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DBWithTTL ** dbptr,const std::vector<int32_t> & ttls,bool read_only)91 Status DBWithTTL::Open(
92     const DBOptions& db_options, const std::string& dbname,
93     const std::vector<ColumnFamilyDescriptor>& column_families,
94     std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
95     const std::vector<int32_t>& ttls, bool read_only) {
96   if (ttls.size() != column_families.size()) {
97     return Status::InvalidArgument(
98         "ttls size has to be the same as number of column families");
99   }
100 
101   SystemClock* clock = (db_options.env == nullptr)
102                            ? SystemClock::Default().get()
103                            : db_options.env->GetSystemClock().get();
104 
105   std::vector<ColumnFamilyDescriptor> column_families_sanitized =
106       column_families;
107   for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
108     DBWithTTLImpl::SanitizeOptions(
109         ttls[i], &column_families_sanitized[i].options, clock);
110   }
111   DB* db;
112 
113   Status st;
114   if (read_only) {
115     st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
116                              handles, &db);
117   } else {
118     st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
119   }
120   if (st.ok()) {
121     *dbptr = new DBWithTTLImpl(db);
122   } else {
123     *dbptr = nullptr;
124   }
125   return st;
126 }
127 
CreateColumnFamilyWithTtl(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle,int ttl)128 Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
129     const ColumnFamilyOptions& options, const std::string& column_family_name,
130     ColumnFamilyHandle** handle, int ttl) {
131   ColumnFamilyOptions sanitized_options = options;
132   DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
133                                  GetEnv()->GetSystemClock().get());
134 
135   return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
136                                        handle);
137 }
138 
CreateColumnFamily(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle)139 Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
140                                          const std::string& column_family_name,
141                                          ColumnFamilyHandle** handle) {
142   return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
143 }
144 
145 // Appends the current timestamp to the string.
146 // Returns false if could not get the current_time, true if append succeeds
AppendTS(const Slice & val,std::string * val_with_ts,SystemClock * clock)147 Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
148                                SystemClock* clock) {
149   val_with_ts->reserve(kTSLength + val.size());
150   char ts_string[kTSLength];
151   int64_t curtime;
152   Status st = clock->GetCurrentTime(&curtime);
153   if (!st.ok()) {
154     return st;
155   }
156   EncodeFixed32(ts_string, (int32_t)curtime);
157   val_with_ts->append(val.data(), val.size());
158   val_with_ts->append(ts_string, kTSLength);
159   return st;
160 }
161 
162 // Returns corruption if the length of the string is lesser than timestamp, or
163 // timestamp refers to a time lesser than ttl-feature release time
SanityCheckTimestamp(const Slice & str)164 Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
165   if (str.size() < kTSLength) {
166     return Status::Corruption("Error: value's length less than timestamp's\n");
167   }
168   // Checks that TS is not lesser than kMinTimestamp
169   // Gaurds against corruption & normal database opened incorrectly in ttl mode
170   int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
171   if (timestamp_value < kMinTimestamp) {
172     return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
173   }
174   return Status::OK();
175 }
176 
177 // Checks if the string is stale or not according to TTl provided
IsStale(const Slice & value,int32_t ttl,SystemClock * clock)178 bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
179                             SystemClock* clock) {
180   if (ttl <= 0) {  // Data is fresh if TTL is non-positive
181     return false;
182   }
183   int64_t curtime;
184   if (!clock->GetCurrentTime(&curtime).ok()) {
185     return false;  // Treat the data as fresh if could not get current time
186   }
187   int32_t timestamp_value =
188       DecodeFixed32(value.data() + value.size() - kTSLength);
189   return (timestamp_value + ttl) < curtime;
190 }
191 
192 // Strips the TS from the end of the slice
StripTS(PinnableSlice * pinnable_val)193 Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
194   if (pinnable_val->size() < kTSLength) {
195     return Status::Corruption("Bad timestamp in key-value");
196   }
197   // Erasing characters which hold the TS
198   pinnable_val->remove_suffix(kTSLength);
199   return Status::OK();
200 }
201 
202 // Strips the TS from the end of the string
StripTS(std::string * str)203 Status DBWithTTLImpl::StripTS(std::string* str) {
204   if (str->length() < kTSLength) {
205     return Status::Corruption("Bad timestamp in key-value");
206   }
207   // Erasing characters which hold the TS
208   str->erase(str->length() - kTSLength, kTSLength);
209   return Status::OK();
210 }
211 
Put(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & val)212 Status DBWithTTLImpl::Put(const WriteOptions& options,
213                           ColumnFamilyHandle* column_family, const Slice& key,
214                           const Slice& val) {
215   WriteBatch batch;
216   Status st = batch.Put(column_family, key, val);
217   if (st.ok()) {
218     st = Write(options, &batch);
219   }
220   return st;
221 }
222 
Get(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)223 Status DBWithTTLImpl::Get(const ReadOptions& options,
224                           ColumnFamilyHandle* column_family, const Slice& key,
225                           PinnableSlice* value) {
226   Status st = db_->Get(options, column_family, key, value);
227   if (!st.ok()) {
228     return st;
229   }
230   st = SanityCheckTimestamp(*value);
231   if (!st.ok()) {
232     return st;
233   }
234   return StripTS(value);
235 }
236 
MultiGet(const ReadOptions & options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)237 std::vector<Status> DBWithTTLImpl::MultiGet(
238     const ReadOptions& options,
239     const std::vector<ColumnFamilyHandle*>& column_family,
240     const std::vector<Slice>& keys, std::vector<std::string>* values) {
241   auto statuses = db_->MultiGet(options, column_family, keys, values);
242   for (size_t i = 0; i < keys.size(); ++i) {
243     if (!statuses[i].ok()) {
244       continue;
245     }
246     statuses[i] = SanityCheckTimestamp((*values)[i]);
247     if (!statuses[i].ok()) {
248       continue;
249     }
250     statuses[i] = StripTS(&(*values)[i]);
251   }
252   return statuses;
253 }
254 
KeyMayExist(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,bool * value_found)255 bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
256                                 ColumnFamilyHandle* column_family,
257                                 const Slice& key, std::string* value,
258                                 bool* value_found) {
259   bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
260   if (ret && value != nullptr && value_found != nullptr && *value_found) {
261     if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
262       return false;
263     }
264   }
265   return ret;
266 }
267 
Merge(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)268 Status DBWithTTLImpl::Merge(const WriteOptions& options,
269                             ColumnFamilyHandle* column_family, const Slice& key,
270                             const Slice& value) {
271   WriteBatch batch;
272   Status st = batch.Merge(column_family, key, value);
273   if (st.ok()) {
274     st = Write(options, &batch);
275   }
276   return st;
277 }
278 
Write(const WriteOptions & opts,WriteBatch * updates)279 Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
280   class Handler : public WriteBatch::Handler {
281    public:
282     explicit Handler(SystemClock* clock) : clock_(clock) {}
283     WriteBatch updates_ttl;
284     Status PutCF(uint32_t column_family_id, const Slice& key,
285                  const Slice& value) override {
286       std::string value_with_ts;
287       Status st = AppendTS(value, &value_with_ts, clock_);
288       if (!st.ok()) {
289         return st;
290       }
291       return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
292                                      value_with_ts);
293     }
294     Status MergeCF(uint32_t column_family_id, const Slice& key,
295                    const Slice& value) override {
296       std::string value_with_ts;
297       Status st = AppendTS(value, &value_with_ts, clock_);
298       if (!st.ok()) {
299         return st;
300       }
301       return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
302                                        value_with_ts);
303     }
304     Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
305       return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
306     }
307     void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
308 
309    private:
310     SystemClock* clock_;
311   };
312   Handler handler(GetEnv()->GetSystemClock().get());
313   Status st = updates->Iterate(&handler);
314   if (!st.ok()) {
315     return st;
316   } else {
317     return db_->Write(opts, &(handler.updates_ttl));
318   }
319 }
320 
NewIterator(const ReadOptions & opts,ColumnFamilyHandle * column_family)321 Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
322                                      ColumnFamilyHandle* column_family) {
323   return new TtlIterator(db_->NewIterator(opts, column_family));
324 }
325 
SetTtl(ColumnFamilyHandle * h,int32_t ttl)326 void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
327   std::shared_ptr<TtlCompactionFilterFactory> filter;
328   Options opts;
329   opts = GetOptions(h);
330   filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
331                                        opts.compaction_filter_factory);
332   if (!filter)
333     return;
334   filter->SetTtl(ttl);
335 }
336 
337 }  // namespace ROCKSDB_NAMESPACE
338 #endif  // ROCKSDB_LITE
339