1 /* 2 Portions Copyright (c) 2016-Present, Facebook, Inc. 3 Portions Copyright (c) 2012, Monty Program Ab 4 5 This program is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published by 7 the Free Software Foundation; version 2 of the License. 8 9 This program is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License 15 along with this program; if not, write to the Free Software 16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ 17 #pragma once 18 19 #ifdef USE_PRAGMA_IMPLEMENTATION 20 #pragma implementation // gcc: Class implementation 21 #endif 22 23 /* C++ system header files */ 24 #include <time.h> 25 #include <string> 26 #include <ctime> 27 28 /* RocksDB includes */ 29 #include "rocksdb/compaction_filter.h" 30 31 /* MyRocks includes */ 32 #include "./ha_rocksdb_proto.h" 33 #include "./rdb_datadic.h" 34 35 namespace myrocks { 36 37 class Rdb_compact_filter : public rocksdb::CompactionFilter { 38 public: 39 Rdb_compact_filter(const Rdb_compact_filter &) = delete; 40 Rdb_compact_filter &operator=(const Rdb_compact_filter &) = delete; 41 Rdb_compact_filter(uint32_t _cf_id)42 explicit Rdb_compact_filter(uint32_t _cf_id) : m_cf_id(_cf_id) {} ~Rdb_compact_filter()43 ~Rdb_compact_filter() { 44 // Increment stats by num expired at the end of compaction 45 rdb_update_global_stats(ROWS_EXPIRED, m_num_expired); 46 } 47 48 // keys are passed in sorted order within the same sst. 49 // V1 Filter is thread safe on our usage (creating from Factory). 50 // Make sure to protect instance variables when switching to thread 51 // unsafe in the future. Filter(int level,const rocksdb::Slice & key,const rocksdb::Slice & existing_value,std::string * new_value,bool * value_changed)52 virtual bool Filter(int level, const rocksdb::Slice &key, 53 const rocksdb::Slice &existing_value, 54 std::string *new_value, 55 bool *value_changed) const override { 56 DBUG_ASSERT(key.size() >= sizeof(uint32)); 57 58 GL_INDEX_ID gl_index_id; 59 gl_index_id.cf_id = m_cf_id; 60 gl_index_id.index_id = rdb_netbuf_to_uint32((const uchar *)key.data()); 61 DBUG_ASSERT(gl_index_id.index_id >= 1); 62 63 if (gl_index_id != m_prev_index) { 64 m_should_delete = 65 rdb_get_dict_manager()->is_drop_index_ongoing(gl_index_id); 66 67 if (!m_should_delete) { 68 get_ttl_duration_and_offset(gl_index_id, &m_ttl_duration, 69 &m_ttl_offset); 70 71 if (m_ttl_duration != 0 && m_snapshot_timestamp == 0) { 72 /* 73 For efficiency reasons, we lazily call GetIntProperty to get the 74 oldest snapshot time (occurs once per compaction). 75 */ 76 rocksdb::DB *const rdb = rdb_get_rocksdb_db(); 77 if (!rdb->GetIntProperty(rocksdb::DB::Properties::kOldestSnapshotTime, 78 &m_snapshot_timestamp) || 79 m_snapshot_timestamp == 0) { 80 m_snapshot_timestamp = static_cast<uint64_t>(std::time(nullptr)); 81 } 82 83 #ifndef DBUG_OFF 84 int snapshot_ts = rdb_dbug_set_ttl_snapshot_ts(); 85 if (snapshot_ts) { 86 m_snapshot_timestamp = 87 static_cast<uint64_t>(std::time(nullptr)) + snapshot_ts; 88 } 89 #endif 90 } 91 } 92 93 m_prev_index = gl_index_id; 94 } 95 96 if (m_should_delete) { 97 m_num_deleted++; 98 return true; 99 } else if (m_ttl_duration > 0 && 100 should_filter_ttl_rec(key, existing_value)) { 101 m_num_expired++; 102 return true; 103 } 104 105 return false; 106 } 107 IgnoreSnapshots()108 virtual bool IgnoreSnapshots() const override { return true; } 109 Name()110 virtual const char *Name() const override { return "Rdb_compact_filter"; } 111 get_ttl_duration_and_offset(const GL_INDEX_ID & gl_index_id,uint64 * ttl_duration,uint32 * ttl_offset)112 void get_ttl_duration_and_offset(const GL_INDEX_ID &gl_index_id, 113 uint64 *ttl_duration, 114 uint32 *ttl_offset) const { 115 DBUG_ASSERT(ttl_duration != nullptr); 116 /* 117 If TTL is disabled set ttl_duration to 0. This prevents the compaction 118 filter from dropping expired records. 119 */ 120 if (!rdb_is_ttl_enabled()) { 121 *ttl_duration = 0; 122 return; 123 } 124 125 /* 126 If key is part of system column family, it's definitely not a TTL key. 127 */ 128 rocksdb::ColumnFamilyHandle *s_cf = rdb_get_dict_manager()->get_system_cf(); 129 if (s_cf == nullptr || gl_index_id.cf_id == s_cf->GetID()) { 130 *ttl_duration = 0; 131 return; 132 } 133 134 struct Rdb_index_info index_info; 135 if (!rdb_get_dict_manager()->get_index_info(gl_index_id, &index_info)) { 136 // NO_LINT_DEBUG 137 sql_print_error( 138 "RocksDB: Could not get index information " 139 "for Index Number (%u,%u)", 140 gl_index_id.cf_id, gl_index_id.index_id); 141 } 142 143 #ifndef DBUG_OFF 144 if (rdb_dbug_set_ttl_ignore_pk() && 145 index_info.m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY) { 146 *ttl_duration = 0; 147 return; 148 } 149 #endif 150 151 *ttl_duration = index_info.m_ttl_duration; 152 if (Rdb_key_def::has_index_flag(index_info.m_index_flags, 153 Rdb_key_def::TTL_FLAG)) { 154 *ttl_offset = Rdb_key_def::calculate_index_flag_offset( 155 index_info.m_index_flags, Rdb_key_def::TTL_FLAG); 156 } 157 } 158 should_filter_ttl_rec(const rocksdb::Slice & key,const rocksdb::Slice & existing_value)159 bool should_filter_ttl_rec(const rocksdb::Slice &key, 160 const rocksdb::Slice &existing_value) const { 161 uint64 ttl_timestamp; 162 Rdb_string_reader reader(&existing_value); 163 if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) { 164 std::string buf; 165 buf = rdb_hexdump(existing_value.data(), existing_value.size(), 166 RDB_MAX_HEXDUMP_LEN); 167 // NO_LINT_DEBUG 168 sql_print_error( 169 "Decoding ttl from PK value failed in compaction filter, " 170 "for index (%u,%u), val: %s", 171 m_prev_index.cf_id, m_prev_index.index_id, buf.c_str()); 172 abort(); 173 } 174 175 /* 176 Filter out the record only if it is older than the oldest snapshot 177 timestamp. This prevents any rows from expiring in the middle of 178 long-running transactions. 179 */ 180 return ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp; 181 } 182 183 private: 184 // Column family for this compaction filter 185 const uint32_t m_cf_id; 186 // Index id of the previous record 187 mutable GL_INDEX_ID m_prev_index = {0, 0}; 188 // Number of rows deleted for the same index id 189 mutable uint64 m_num_deleted = 0; 190 // Number of rows expired for the TTL index 191 mutable uint64 m_num_expired = 0; 192 // Current index id should be deleted or not (should be deleted if true) 193 mutable bool m_should_delete = false; 194 // TTL duration for the current index if TTL is enabled 195 mutable uint64 m_ttl_duration = 0; 196 // TTL offset for all records in the current index 197 mutable uint32 m_ttl_offset = 0; 198 // Oldest snapshot timestamp at the time a TTL index is discovered 199 mutable uint64_t m_snapshot_timestamp = 0; 200 }; 201 202 class Rdb_compact_filter_factory : public rocksdb::CompactionFilterFactory { 203 public: 204 Rdb_compact_filter_factory(const Rdb_compact_filter_factory &) = delete; 205 Rdb_compact_filter_factory &operator=(const Rdb_compact_filter_factory &) = 206 delete; Rdb_compact_filter_factory()207 Rdb_compact_filter_factory() {} 208 ~Rdb_compact_filter_factory()209 ~Rdb_compact_filter_factory() {} 210 Name()211 const char *Name() const override { return "Rdb_compact_filter_factory"; } 212 CreateCompactionFilter(const rocksdb::CompactionFilter::Context & context)213 std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter( 214 const rocksdb::CompactionFilter::Context &context) override { 215 return std::unique_ptr<rocksdb::CompactionFilter>( 216 new Rdb_compact_filter(context.column_family_id)); 217 } 218 }; 219 220 } // namespace myrocks 221