1 /*
2 Copyright (c) 2015, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17 #ifdef _WIN32
18 #define _CRT_RAND_S
19 #endif
20 #include <my_global.h>
21 #ifdef _WIN32
22 #include <stdlib.h>
23 #define rand_r rand_s
24 #endif
25 /* This C++ file's header file */
26 #include "./properties_collector.h"
27
28 /* Standard C++ header files */
29 #include <algorithm>
30 #include <map>
31 #include <string>
32 #include <vector>
33
34 /* MySQL header files */
35 #include "./log.h"
36 #include "./my_stacktrace.h"
37 #include "./sql_array.h"
38
39 /* MyRocks header files */
40 #include "./rdb_datadic.h"
41 #include "./rdb_utils.h"
42
43 namespace myrocks {
44
// Global per-entry-type counters, aggregated across all SST files.
// Incremented once per finished SST file in Rdb_tbl_prop_coll::Finish().
std::atomic<uint64_t> rocksdb_num_sst_entry_put(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_delete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_singledelete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_merge(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_other(0);
// When true, SingleDelete entries also count towards the sliding-window
// deletion statistics used by AdjustDeletedRows()/NeedCompact().
my_bool rocksdb_compaction_sequential_deletes_count_sd = false;
51
/*
  Table-properties collector: RocksDB calls this object back for every key
  written to an SST file so MyRocks can gather per-index statistics.

  @param ddl_manager              used to look up Rdb_key_def objects by
                                  index id; must not be nullptr
  @param params                   compaction-trigger parameters (sliding
                                  window size and deletion thresholds)
  @param cf_id                    column family this collector is attached to
  @param table_stats_sampling_pct percentage of keys sampled for cardinality
                                  statistics (0 means sample every key)
*/
Rdb_tbl_prop_coll::Rdb_tbl_prop_coll(Rdb_ddl_manager *const ddl_manager,
                                     const Rdb_compact_params &params,
                                     const uint32_t cf_id,
                                     const uint8_t table_stats_sampling_pct)
    : m_cf_id(cf_id),
      m_ddl_manager(ddl_manager),
      m_last_stats(nullptr),
      m_rows(0l),
      m_window_pos(0l),
      m_deleted_rows(0l),
      m_max_deleted_rows(0l),
      m_file_size(0),
      m_params(params),
      m_cardinality_collector(table_stats_sampling_pct),
      m_recorded(false) {
  DBUG_ASSERT(ddl_manager != nullptr);

  // Pre-size the circular deletion window; all slots start as "not deleted".
  m_deleted_rows_window.resize(m_params.m_window, false);
}
71
72 /*
73 This function is called by RocksDB for every key in the SST file
74 */
/*
  Entry point called by RocksDB for every key written to the SST file.
  Updates the deletion window and the per-index statistics.
*/
rocksdb::Status Rdb_tbl_prop_coll::AddUserKey(const rocksdb::Slice &key,
                                              const rocksdb::Slice &value,
                                              rocksdb::EntryType type,
                                              rocksdb::SequenceNumber seq,
                                              uint64_t file_size) {
  // A MyRocks key carries a 4-byte index number prefix; shorter keys
  // cannot be attributed to an index and are skipped.
  if (key.size() < 4) {
    return rocksdb::Status::OK();
  }

  AdjustDeletedRows(type);
  ++m_rows;
  CollectStatsForRow(key, value, type, file_size);

  return rocksdb::Status::OK();
}
90
AdjustDeletedRows(rocksdb::EntryType type)91 void Rdb_tbl_prop_coll::AdjustDeletedRows(rocksdb::EntryType type) {
92 if (m_params.m_window > 0) {
93 // record the "is deleted" flag into the sliding window
94 // the sliding window is implemented as a circular buffer
95 // in m_deleted_rows_window vector
96 // the current position in the circular buffer is pointed at by
97 // m_rows % m_deleted_rows_window.size()
98 // m_deleted_rows is the current number of 1's in the vector
99 // --update the counter for the element which will be overridden
100 const bool is_delete = (type == rocksdb::kEntryDelete ||
101 (type == rocksdb::kEntrySingleDelete &&
102 rocksdb_compaction_sequential_deletes_count_sd));
103
104 // Only make changes if the value at the current position needs to change
105 if (is_delete != m_deleted_rows_window[m_window_pos]) {
106 // Set or clear the flag at the current position as appropriate
107 m_deleted_rows_window[m_window_pos] = is_delete;
108 if (!is_delete) {
109 m_deleted_rows--;
110 } else if (++m_deleted_rows > m_max_deleted_rows) {
111 m_max_deleted_rows = m_deleted_rows;
112 }
113 }
114
115 if (++m_window_pos == m_params.m_window) {
116 m_window_pos = 0;
117 }
118 }
119 }
120
/*
  Returns the Rdb_index_stats object into which statistics for the given
  key should be accumulated, based on the 4-byte index number at the start
  of the key (AddUserKey() guarantees key.size() >= 4). Consecutive keys
  of the same index hit the cached m_last_stats pointer; the first key of
  a new index appends a fresh entry to m_stats and looks up the matching
  Rdb_key_def so cardinality can be collected.
*/
Rdb_index_stats *Rdb_tbl_prop_coll::AccessStats(const rocksdb::Slice &key) {
  GL_INDEX_ID gl_index_id;
  gl_index_id.cf_id = m_cf_id;
  gl_index_id.index_id = rdb_netbuf_to_uint32(reinterpret_cast<const uchar*>(key.data()));

  if (m_last_stats == nullptr || m_last_stats->m_gl_index_id != gl_index_id) {
    // Drop the previous index's key definition before looking up a new one.
    m_keydef = nullptr;

    // starting a new table
    // add the new element into m_stats
    m_stats.emplace_back(gl_index_id);
    m_last_stats = &m_stats.back();

    if (m_ddl_manager) {
      // safe_find() returns a std::shared_ptr<Rdb_key_def> with the count
      // incremented (so it can't be deleted out from under us) and with
      // the mutex locked (if setup has not occurred yet). We must make
      // sure to free the mutex (via unblock_setup()) when we are done
      // with this object. Currently this happens earlier in this function
      // when we are switching to a new Rdb_key_def and when this object
      // is destructed.
      m_keydef = m_ddl_manager->safe_find(gl_index_id);
      if (m_keydef != nullptr) {
        // resize the array to the number of columns.
        // It will be initialized with zeroes
        m_last_stats->m_distinct_keys_per_prefix.resize(
            m_keydef->get_key_parts());
        m_last_stats->m_name = m_keydef->get_name();
      }
    }
    // Forget the previous index's last key so distinct-key counting
    // restarts for the new index.
    m_cardinality_collector.Reset();
  }

  return m_last_stats;
}
156
CollectStatsForRow(const rocksdb::Slice & key,const rocksdb::Slice & value,const rocksdb::EntryType & type,const uint64_t file_size)157 void Rdb_tbl_prop_coll::CollectStatsForRow(const rocksdb::Slice &key,
158 const rocksdb::Slice &value,
159 const rocksdb::EntryType &type,
160 const uint64_t file_size) {
161 auto stats = AccessStats(key);
162
163 stats->m_data_size += key.size() + value.size();
164
165 // Incrementing per-index entry-type statistics
166 switch (type) {
167 case rocksdb::kEntryPut:
168 stats->m_rows++;
169 break;
170 case rocksdb::kEntryDelete:
171 stats->m_entry_deletes++;
172 break;
173 case rocksdb::kEntrySingleDelete:
174 stats->m_entry_single_deletes++;
175 break;
176 case rocksdb::kEntryMerge:
177 stats->m_entry_merges++;
178 break;
179 case rocksdb::kEntryOther:
180 stats->m_entry_others++;
181 break;
182 default:
183 // NO_LINT_DEBUG
184 sql_print_error(
185 "RocksDB: Unexpected entry type found: %u. "
186 "This should not happen so aborting the system.",
187 type);
188 abort();
189 break;
190 }
191
192 stats->m_actual_disk_size += file_size - m_file_size;
193 m_file_size = file_size;
194
195 if (m_keydef != nullptr) {
196 m_cardinality_collector.ProcessKey(key, m_keydef.get(), stats);
197 }
198 }
199
// Property name under which the serialized index statistics are stored in
// each SST file's user-collected properties.
const char *Rdb_tbl_prop_coll::INDEXSTATS_KEY = "__indexstats__";
201
202 /*
203 This function is called by RocksDB to compute properties to store in sst file
204 */
Finish(rocksdb::UserCollectedProperties * const properties)205 rocksdb::Status Rdb_tbl_prop_coll::Finish(
206 rocksdb::UserCollectedProperties *const properties) {
207 uint64_t num_sst_entry_put = 0;
208 uint64_t num_sst_entry_delete = 0;
209 uint64_t num_sst_entry_singledelete = 0;
210 uint64_t num_sst_entry_merge = 0;
211 uint64_t num_sst_entry_other = 0;
212
213 DBUG_ASSERT(properties != nullptr);
214
215 for (auto it = m_stats.begin(); it != m_stats.end(); it++) {
216 num_sst_entry_put += it->m_rows;
217 num_sst_entry_delete += it->m_entry_deletes;
218 num_sst_entry_singledelete += it->m_entry_single_deletes;
219 num_sst_entry_merge += it->m_entry_merges;
220 num_sst_entry_other += it->m_entry_others;
221 }
222
223 if (!m_recorded) {
224 if (num_sst_entry_put > 0) {
225 rocksdb_num_sst_entry_put += num_sst_entry_put;
226 }
227
228 if (num_sst_entry_delete > 0) {
229 rocksdb_num_sst_entry_delete += num_sst_entry_delete;
230 }
231
232 if (num_sst_entry_singledelete > 0) {
233 rocksdb_num_sst_entry_singledelete += num_sst_entry_singledelete;
234 }
235
236 if (num_sst_entry_merge > 0) {
237 rocksdb_num_sst_entry_merge += num_sst_entry_merge;
238 }
239
240 if (num_sst_entry_other > 0) {
241 rocksdb_num_sst_entry_other += num_sst_entry_other;
242 }
243
244 for (Rdb_index_stats &stat : m_stats) {
245 m_cardinality_collector.AdjustStats(&stat);
246 }
247 m_recorded = true;
248 }
249 properties->insert({INDEXSTATS_KEY, Rdb_index_stats::materialize(m_stats)});
250 return rocksdb::Status::OK();
251 }
252
NeedCompact() const253 bool Rdb_tbl_prop_coll::NeedCompact() const {
254 return m_params.m_deletes && (m_params.m_window > 0) &&
255 (m_file_size > m_params.m_file_size) &&
256 (m_max_deleted_rows > m_params.m_deletes);
257 }
258
259 /*
260 Returns the same as above, but in human-readable way for logging
261 */
GetReadableProperties() const262 rocksdb::UserCollectedProperties Rdb_tbl_prop_coll::GetReadableProperties()
263 const {
264 std::string s;
265 #ifdef DBUG_OFF
266 s.append("[...");
267 s.append(std::to_string(m_stats.size()));
268 s.append(" records...]");
269 #else
270 bool first = true;
271 for (auto it : m_stats) {
272 if (first) {
273 first = false;
274 } else {
275 s.append(",");
276 }
277 s.append(GetReadableStats(it));
278 }
279 #endif
280 return rocksdb::UserCollectedProperties{{INDEXSTATS_KEY, s}};
281 }
282
GetReadableStats(const Rdb_index_stats & it)283 std::string Rdb_tbl_prop_coll::GetReadableStats(const Rdb_index_stats &it) {
284 std::string s;
285 s.append("(");
286 s.append(std::to_string(it.m_gl_index_id.cf_id));
287 s.append(", ");
288 s.append(std::to_string(it.m_gl_index_id.index_id));
289 s.append("):{name:");
290 s.append(it.m_name);
291 s.append(", size:");
292 s.append(std::to_string(it.m_data_size));
293 s.append(", m_rows:");
294 s.append(std::to_string(it.m_rows));
295 s.append(", m_actual_disk_size:");
296 s.append(std::to_string(it.m_actual_disk_size));
297 s.append(", deletes:");
298 s.append(std::to_string(it.m_entry_deletes));
299 s.append(", single_deletes:");
300 s.append(std::to_string(it.m_entry_single_deletes));
301 s.append(", merges:");
302 s.append(std::to_string(it.m_entry_merges));
303 s.append(", others:");
304 s.append(std::to_string(it.m_entry_others));
305 s.append(", distincts per prefix: [");
306 for (auto num : it.m_distinct_keys_per_prefix) {
307 s.append(std::to_string(num));
308 s.append(" ");
309 }
310 s.append("]}");
311 return s;
312 }
313
/*
  Given the properties of an SST file, reads the index statistics stored in
  them and appends the result to the caller-supplied output vector.
*/
317
read_stats_from_tbl_props(const std::shared_ptr<const rocksdb::TableProperties> & table_props,std::vector<Rdb_index_stats> * const out_stats_vector)318 void Rdb_tbl_prop_coll::read_stats_from_tbl_props(
319 const std::shared_ptr<const rocksdb::TableProperties> &table_props,
320 std::vector<Rdb_index_stats> *const out_stats_vector) {
321 DBUG_ASSERT(out_stats_vector != nullptr);
322 const auto &user_properties = table_props->user_collected_properties;
323 const auto it2 = user_properties.find(std::string(INDEXSTATS_KEY));
324 if (it2 != user_properties.end()) {
325 auto result MY_ATTRIBUTE((__unused__)) =
326 Rdb_index_stats::unmaterialize(it2->second, out_stats_vector);
327 DBUG_ASSERT(result == 0);
328 }
329 }
330
331 /*
332 Serializes an array of Rdb_index_stats into a network string.
333 */
/*
  Serializes an array of Rdb_index_stats into a network-byte-order string.
  Layout: a uint16 format version, then per index: cf_id and index_id
  (uint32 each), data size, row count, actual disk size and the number of
  per-prefix cardinality entries (uint64 each), the four entry-type
  counters (uint64 each), and finally the per-prefix cardinality values
  (uint64 each). unmaterialize() is the inverse operation.
*/
std::string Rdb_index_stats::materialize(
    const std::vector<Rdb_index_stats> &stats) {
  String ret;
  rdb_netstr_append_uint16(&ret, INDEX_STATS_VERSION_ENTRY_TYPES);
  for (const auto &i : stats) {
    rdb_netstr_append_uint32(&ret, i.m_gl_index_id.cf_id);
    rdb_netstr_append_uint32(&ret, i.m_gl_index_id.index_id);
    // m_data_size must fit in the 8-byte wire field.
    DBUG_ASSERT(sizeof i.m_data_size <= 8);
    rdb_netstr_append_uint64(&ret, i.m_data_size);
    rdb_netstr_append_uint64(&ret, i.m_rows);
    rdb_netstr_append_uint64(&ret, i.m_actual_disk_size);
    // Element count precedes the per-prefix values appended below.
    rdb_netstr_append_uint64(&ret, i.m_distinct_keys_per_prefix.size());
    rdb_netstr_append_uint64(&ret, i.m_entry_deletes);
    rdb_netstr_append_uint64(&ret, i.m_entry_single_deletes);
    rdb_netstr_append_uint64(&ret, i.m_entry_merges);
    rdb_netstr_append_uint64(&ret, i.m_entry_others);
    for (const auto &num_keys : i.m_distinct_keys_per_prefix) {
      rdb_netstr_append_uint64(&ret, num_keys);
    }
  }

  return std::string((char *)ret.ptr(), ret.length());
}
357
358 /**
359 @brief
360 Reads an array of Rdb_index_stats from a string.
361 @return HA_EXIT_FAILURE if it detects any inconsistency in the input
362 @return HA_EXIT_SUCCESS if completes successfully
363 */
int Rdb_index_stats::unmaterialize(const std::string &s,
                                   std::vector<Rdb_index_stats> *const ret) {
  const uchar *p = rdb_std_str_to_uchar_ptr(s);
  const uchar *const p2 = p + s.size();  // one past the end of the input

  DBUG_ASSERT(ret != nullptr);

  // Need at least the 2-byte version header.
  if (p + 2 > p2) {
    return HA_EXIT_FAILURE;
  }

  const int version = rdb_netbuf_read_uint16(&p);
  Rdb_index_stats stats;
  // Make sure version is within supported range.
  if (version < INDEX_STATS_VERSION_INITIAL ||
      version > INDEX_STATS_VERSION_ENTRY_TYPES) {
    // NO_LINT_DEBUG
    sql_print_error(
        "Index stats version %d was outside of supported range. "
        "This should not happen so aborting the system.",
        version);
    abort();
  }

  // Fixed-size portion of one record (see materialize() for the layout);
  // the trailing sizeof(uint64) is the per-prefix element count.
  size_t needed = sizeof(stats.m_gl_index_id.cf_id) +
                  sizeof(stats.m_gl_index_id.index_id) +
                  sizeof(stats.m_data_size) + sizeof(stats.m_rows) +
                  sizeof(stats.m_actual_disk_size) + sizeof(uint64);
  if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
    needed += sizeof(stats.m_entry_deletes) +
              sizeof(stats.m_entry_single_deletes) +
              sizeof(stats.m_entry_merges) + sizeof(stats.m_entry_others);
  }

  while (p < p2) {
    // Bail out rather than read past the end of a truncated record.
    if (p + needed > p2) {
      return HA_EXIT_FAILURE;
    }
    rdb_netbuf_read_gl_index(&p, &stats.m_gl_index_id);
    stats.m_data_size = rdb_netbuf_read_uint64(&p);
    stats.m_rows = rdb_netbuf_read_uint64(&p);
    stats.m_actual_disk_size = rdb_netbuf_read_uint64(&p);
    // NOTE(review): this element count comes straight from the stored
    // input; a corrupt value could make resize() attempt a huge
    // allocation before the bounds check below — consider validating the
    // count against the remaining buffer size first.
    stats.m_distinct_keys_per_prefix.resize(rdb_netbuf_read_uint64(&p));
    if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
      stats.m_entry_deletes = rdb_netbuf_read_uint64(&p);
      stats.m_entry_single_deletes = rdb_netbuf_read_uint64(&p);
      stats.m_entry_merges = rdb_netbuf_read_uint64(&p);
      stats.m_entry_others = rdb_netbuf_read_uint64(&p);
    }
    // Ensure the variable-length per-prefix array is fully present.
    if (p + stats.m_distinct_keys_per_prefix.size() *
                sizeof(stats.m_distinct_keys_per_prefix[0]) >
        p2) {
      return HA_EXIT_FAILURE;
    }
    for (std::size_t i = 0; i < stats.m_distinct_keys_per_prefix.size(); i++) {
      stats.m_distinct_keys_per_prefix[i] = rdb_netbuf_read_uint64(&p);
    }
    ret->push_back(stats);
  }
  return HA_EXIT_SUCCESS;
}
425
426 /*
427 Merges one Rdb_index_stats into another. Can be used to come up with the stats
428 for the index based on stats for each sst
429 */
/*
  Folds the statistics of one SST file (s) into this accumulated object.

  @param increment          true to add s's numbers, false to subtract them
                            (e.g. when an SST file is removed)
  @param estimated_data_len per-row size estimate used when s carries no
                            actual disk size yet (see comment below)
*/
void Rdb_index_stats::merge(const Rdb_index_stats &s, const bool increment,
                            const int64_t estimated_data_len) {
  std::size_t i;

  DBUG_ASSERT(estimated_data_len >= 0);

  m_gl_index_id = s.m_gl_index_id;
  // Grow the per-prefix array if the incoming stats cover more key parts.
  if (m_distinct_keys_per_prefix.size() < s.m_distinct_keys_per_prefix.size()) {
    m_distinct_keys_per_prefix.resize(s.m_distinct_keys_per_prefix.size());
  }
  if (increment) {
    m_rows += s.m_rows;
    m_data_size += s.m_data_size;

    /*
      The Data_length and Avg_row_length are trailing statistics, meaning
      they don't get updated for the current SST until the next SST is
      written. So, if rocksdb reports the data_length as 0,
      we make a reasoned estimate for the data_file_length for the
      index in the current SST.
    */
    m_actual_disk_size += s.m_actual_disk_size ? s.m_actual_disk_size
                                               : estimated_data_len * s.m_rows;
    m_entry_deletes += s.m_entry_deletes;
    m_entry_single_deletes += s.m_entry_single_deletes;
    m_entry_merges += s.m_entry_merges;
    m_entry_others += s.m_entry_others;
    for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
      m_distinct_keys_per_prefix[i] += s.m_distinct_keys_per_prefix[i];
    }
  } else {
    // Mirror of the branch above with subtraction, including the same
    // estimated-size fallback for files that reported no disk size.
    m_rows -= s.m_rows;
    m_data_size -= s.m_data_size;
    m_actual_disk_size -= s.m_actual_disk_size ? s.m_actual_disk_size
                                               : estimated_data_len * s.m_rows;
    m_entry_deletes -= s.m_entry_deletes;
    m_entry_single_deletes -= s.m_entry_single_deletes;
    m_entry_merges -= s.m_entry_merges;
    m_entry_others -= s.m_entry_others;
    for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
      m_distinct_keys_per_prefix[i] -= s.m_distinct_keys_per_prefix[i];
    }
  }
}
474
/*
  Cardinality collector: samples keys at table_stats_sampling_pct percent
  (0 means every key is used). The PRNG seed comes from the current time
  so different collector instances sample different key subsets.
*/
Rdb_tbl_card_coll::Rdb_tbl_card_coll(const uint8_t table_stats_sampling_pct)
    : m_table_stats_sampling_pct(table_stats_sampling_pct),
      m_seed(time(nullptr)) {}
478
IsSampingDisabled()479 bool Rdb_tbl_card_coll::IsSampingDisabled() {
480 // Zero means that we'll use all the keys to update statistics.
481 return m_table_stats_sampling_pct == 0 ||
482 RDB_TBL_STATS_SAMPLE_PCT_MAX == m_table_stats_sampling_pct;
483 }
484
/*
  Decides whether the current key should contribute to cardinality
  statistics. With sampling disabled every key is collected; otherwise a
  value in [RDB_TBL_STATS_SAMPLE_PCT_MIN, RDB_TBL_STATS_SAMPLE_PCT_MAX]
  is drawn and compared against the configured sampling percentage.
*/
bool Rdb_tbl_card_coll::ShouldCollectStats() {
  if (IsSampingDisabled()) {
    return true;  // collect every key
  }

  // NOTE(review): on Windows rand_r is #defined to rand_s at the top of
  // this file; rand_s returns an errno_t (0 on success) and delivers the
  // random value through its pointer argument, so this expression would
  // presumably always evaluate to RDB_TBL_STATS_SAMPLE_PCT_MIN there —
  // confirm the intended Windows behavior.
  const int val = rand_r(&m_seed) % (RDB_TBL_STATS_SAMPLE_PCT_MAX -
                                     RDB_TBL_STATS_SAMPLE_PCT_MIN + 1) +
                  RDB_TBL_STATS_SAMPLE_PCT_MIN;

  DBUG_ASSERT(val >= RDB_TBL_STATS_SAMPLE_PCT_MIN);
  DBUG_ASSERT(val <= RDB_TBL_STATS_SAMPLE_PCT_MAX);

  return val <= m_table_stats_sampling_pct;
}
499
/*
  Updates per-prefix distinct-key counters for one (possibly sampled) key.
  Keys arrive in sorted order, so comparing against the previously seen
  key is enough to detect a new distinct value for each key-part prefix.
*/
void Rdb_tbl_card_coll::ProcessKey(const rocksdb::Slice &key,
                                   const Rdb_key_def *keydef,
                                   Rdb_index_stats *stats) {
  if (ShouldCollectStats()) {
    std::size_t column = 0;
    bool new_key = true;

    // The very first key of an index (after Reset()) always counts as new.
    if (!m_last_key.empty()) {
      rocksdb::Slice last(m_last_key.data(), m_last_key.size());
      // NOTE(review): compare_keys() appears to return 0 on success and
      // report, via 'column', the first key part where the two keys
      // differ — confirm against Rdb_key_def's contract.
      new_key = (keydef->compare_keys(&last, &key, &column) == 0);
    }

    if (new_key) {
      DBUG_ASSERT(column <= stats->m_distinct_keys_per_prefix.size());

      // Every prefix from the first differing key part onwards gained a
      // new distinct value.
      for (auto i = column; i < stats->m_distinct_keys_per_prefix.size(); i++) {
        stats->m_distinct_keys_per_prefix[i]++;
      }

      // assign new last_key for the next call
      // however, we only need to change the last key
      // if one of the first n-1 columns is different
      // If the n-1 prefix is the same, no sense in storing
      // the new key
      if (column < stats->m_distinct_keys_per_prefix.size()) {
        m_last_key.assign(key.data(), key.size());
      }
    }
  }
}
530
Reset()531 void Rdb_tbl_card_coll::Reset() { m_last_key.clear(); }
532
533 // We need to adjust the index cardinality numbers based on the sampling
534 // rate so that the output of "SHOW INDEX" command will reflect reality
535 // more closely. It will still be an approximation, just a better one.
AdjustStats(Rdb_index_stats * stats)536 void Rdb_tbl_card_coll::AdjustStats(Rdb_index_stats *stats) {
537 if (IsSampingDisabled()) {
538 // no sampling was done, return as stats is
539 return;
540 }
541 for (int64_t &num_keys : stats->m_distinct_keys_per_prefix) {
542 num_keys = num_keys * 100 / m_table_stats_sampling_pct;
543 }
544 }
545
546 } // namespace myrocks
547