1 /*
2    Portions Copyright (c) 2016-Present, Facebook, Inc.
3    Portions Copyright (c) 2012, Monty Program Ab
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
17 #pragma once
18 
19 #ifdef USE_PRAGMA_IMPLEMENTATION
20 #pragma implementation  // gcc: Class implementation
21 #endif
22 
23 /* C++ system header files */
24 #include <time.h>
25 #include <string>
26 #include <ctime>
27 
28 /* RocksDB includes */
29 #include "rocksdb/compaction_filter.h"
30 
31 /* MyRocks includes */
32 #include "./ha_rocksdb_proto.h"
33 #include "./rdb_datadic.h"
34 
35 namespace myrocks {
36 
37 class Rdb_compact_filter : public rocksdb::CompactionFilter {
38  public:
39   Rdb_compact_filter(const Rdb_compact_filter &) = delete;
40   Rdb_compact_filter &operator=(const Rdb_compact_filter &) = delete;
41 
Rdb_compact_filter(uint32_t _cf_id)42   explicit Rdb_compact_filter(uint32_t _cf_id) : m_cf_id(_cf_id) {}
~Rdb_compact_filter()43   ~Rdb_compact_filter() {
44     // Increment stats by num expired at the end of compaction
45     rdb_update_global_stats(ROWS_EXPIRED, m_num_expired);
46   }
47 
48   // keys are passed in sorted order within the same sst.
49   // V1 Filter is thread safe on our usage (creating from Factory).
50   // Make sure to protect instance variables when switching to thread
51   // unsafe in the future.
Filter(int level,const rocksdb::Slice & key,const rocksdb::Slice & existing_value,std::string * new_value,bool * value_changed)52   virtual bool Filter(int level, const rocksdb::Slice &key,
53                       const rocksdb::Slice &existing_value,
54                       std::string *new_value,
55                       bool *value_changed) const override {
56     DBUG_ASSERT(key.size() >= sizeof(uint32));
57 
58     GL_INDEX_ID gl_index_id;
59     gl_index_id.cf_id = m_cf_id;
60     gl_index_id.index_id = rdb_netbuf_to_uint32((const uchar *)key.data());
61     DBUG_ASSERT(gl_index_id.index_id >= 1);
62 
63     if (gl_index_id != m_prev_index) {
64       m_should_delete =
65           rdb_get_dict_manager()->is_drop_index_ongoing(gl_index_id);
66 
67       if (!m_should_delete) {
68         get_ttl_duration_and_offset(gl_index_id, &m_ttl_duration,
69                                     &m_ttl_offset);
70 
71         if (m_ttl_duration != 0 && m_snapshot_timestamp == 0) {
72           /*
73             For efficiency reasons, we lazily call GetIntProperty to get the
74             oldest snapshot time (occurs once per compaction).
75           */
76           rocksdb::DB *const rdb = rdb_get_rocksdb_db();
77           if (!rdb->GetIntProperty(rocksdb::DB::Properties::kOldestSnapshotTime,
78                                    &m_snapshot_timestamp) ||
79               m_snapshot_timestamp == 0) {
80             m_snapshot_timestamp = static_cast<uint64_t>(std::time(nullptr));
81           }
82 
83 #ifndef DBUG_OFF
84           int snapshot_ts = rdb_dbug_set_ttl_snapshot_ts();
85           if (snapshot_ts) {
86             m_snapshot_timestamp =
87                 static_cast<uint64_t>(std::time(nullptr)) + snapshot_ts;
88           }
89 #endif
90         }
91       }
92 
93       m_prev_index = gl_index_id;
94     }
95 
96     if (m_should_delete) {
97       m_num_deleted++;
98       return true;
99     } else if (m_ttl_duration > 0 &&
100                should_filter_ttl_rec(key, existing_value)) {
101       m_num_expired++;
102       return true;
103     }
104 
105     return false;
106   }
107 
IgnoreSnapshots()108   virtual bool IgnoreSnapshots() const override { return true; }
109 
Name()110   virtual const char *Name() const override { return "Rdb_compact_filter"; }
111 
get_ttl_duration_and_offset(const GL_INDEX_ID & gl_index_id,uint64 * ttl_duration,uint32 * ttl_offset)112   void get_ttl_duration_and_offset(const GL_INDEX_ID &gl_index_id,
113                                    uint64 *ttl_duration,
114                                    uint32 *ttl_offset) const {
115     DBUG_ASSERT(ttl_duration != nullptr);
116     /*
117       If TTL is disabled set ttl_duration to 0.  This prevents the compaction
118       filter from dropping expired records.
119     */
120     if (!rdb_is_ttl_enabled()) {
121       *ttl_duration = 0;
122       return;
123     }
124 
125     /*
126       If key is part of system column family, it's definitely not a TTL key.
127     */
128     rocksdb::ColumnFamilyHandle *s_cf = rdb_get_dict_manager()->get_system_cf();
129     if (s_cf == nullptr || gl_index_id.cf_id == s_cf->GetID()) {
130       *ttl_duration = 0;
131       return;
132     }
133 
134     struct Rdb_index_info index_info;
135     if (!rdb_get_dict_manager()->get_index_info(gl_index_id, &index_info)) {
136       // NO_LINT_DEBUG
137       sql_print_error(
138           "RocksDB: Could not get index information "
139           "for Index Number (%u,%u)",
140           gl_index_id.cf_id, gl_index_id.index_id);
141     }
142 
143 #ifndef DBUG_OFF
144     if (rdb_dbug_set_ttl_ignore_pk() &&
145         index_info.m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY) {
146       *ttl_duration = 0;
147       return;
148     }
149 #endif
150 
151     *ttl_duration = index_info.m_ttl_duration;
152     if (Rdb_key_def::has_index_flag(index_info.m_index_flags,
153                                     Rdb_key_def::TTL_FLAG)) {
154       *ttl_offset = Rdb_key_def::calculate_index_flag_offset(
155           index_info.m_index_flags, Rdb_key_def::TTL_FLAG);
156     }
157   }
158 
should_filter_ttl_rec(const rocksdb::Slice & key,const rocksdb::Slice & existing_value)159   bool should_filter_ttl_rec(const rocksdb::Slice &key,
160                              const rocksdb::Slice &existing_value) const {
161     uint64 ttl_timestamp;
162     Rdb_string_reader reader(&existing_value);
163     if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) {
164       std::string buf;
165       buf = rdb_hexdump(existing_value.data(), existing_value.size(),
166                         RDB_MAX_HEXDUMP_LEN);
167       // NO_LINT_DEBUG
168       sql_print_error(
169           "Decoding ttl from PK value failed in compaction filter, "
170           "for index (%u,%u), val: %s",
171           m_prev_index.cf_id, m_prev_index.index_id, buf.c_str());
172       abort();
173     }
174 
175     /*
176       Filter out the record only if it is older than the oldest snapshot
177       timestamp.  This prevents any rows from expiring in the middle of
178       long-running transactions.
179     */
180     return ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp;
181   }
182 
183  private:
184   // Column family for this compaction filter
185   const uint32_t m_cf_id;
186   // Index id of the previous record
187   mutable GL_INDEX_ID m_prev_index = {0, 0};
188   // Number of rows deleted for the same index id
189   mutable uint64 m_num_deleted = 0;
190   // Number of rows expired for the TTL index
191   mutable uint64 m_num_expired = 0;
192   // Current index id should be deleted or not (should be deleted if true)
193   mutable bool m_should_delete = false;
194   // TTL duration for the current index if TTL is enabled
195   mutable uint64 m_ttl_duration = 0;
196   // TTL offset for all records in the current index
197   mutable uint32 m_ttl_offset = 0;
198   // Oldest snapshot timestamp at the time a TTL index is discovered
199   mutable uint64_t m_snapshot_timestamp = 0;
200 };
201 
202 class Rdb_compact_filter_factory : public rocksdb::CompactionFilterFactory {
203  public:
204   Rdb_compact_filter_factory(const Rdb_compact_filter_factory &) = delete;
205   Rdb_compact_filter_factory &operator=(const Rdb_compact_filter_factory &) =
206       delete;
Rdb_compact_filter_factory()207   Rdb_compact_filter_factory() {}
208 
~Rdb_compact_filter_factory()209   ~Rdb_compact_filter_factory() {}
210 
Name()211   const char *Name() const override { return "Rdb_compact_filter_factory"; }
212 
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & context)213   std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
214       const rocksdb::CompactionFilter::Context &context) override {
215     return std::unique_ptr<rocksdb::CompactionFilter>(
216         new Rdb_compact_filter(context.column_family_id));
217   }
218 };
219 
220 }  // namespace myrocks
221