1 /*
2    Copyright (c) 2015, Facebook, Inc.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16 
17 #ifdef _WIN32
18 #define _CRT_RAND_S
19 #endif
20 #include <my_global.h>
21 #ifdef _WIN32
22 #include <stdlib.h>
23 #define rand_r rand_s
24 #endif
25 /* This C++ file's header file */
26 #include "./properties_collector.h"
27 
28 /* Standard C++ header files */
29 #include <algorithm>
30 #include <map>
31 #include <string>
32 #include <vector>
33 
34 /* MySQL header files */
35 #include "./log.h"
36 #include "./my_stacktrace.h"
37 #include "./sql_array.h"
38 
39 /* MyRocks header files */
40 #include "./rdb_datadic.h"
41 #include "./rdb_utils.h"
42 
43 namespace myrocks {
44 
// Global per-server SST entry-type counters; accumulated once per SST file
// in Rdb_tbl_prop_coll::Finish() (guarded by m_recorded so a file is only
// counted once even if Finish-related paths run repeatedly).
std::atomic<uint64_t> rocksdb_num_sst_entry_put(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_delete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_singledelete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_merge(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_other(0);
// When true, SingleDelete entries also count toward the sliding-window
// deletion statistics (see Rdb_tbl_prop_coll::AdjustDeletedRows()).
my_bool rocksdb_compaction_sequential_deletes_count_sd = false;
51 
/*
  Collects per-index statistics for one SST file as it is written.

  @param ddl_manager              used to resolve index ids to Rdb_key_def
                                  objects; must not be nullptr
  @param params                   compaction trigger parameters (sliding
                                  window size, deletion threshold, file size)
  @param cf_id                    column family this collector operates on
  @param table_stats_sampling_pct percentage of keys sampled for cardinality
                                  statistics (0 or 100 disables sampling)
*/
Rdb_tbl_prop_coll::Rdb_tbl_prop_coll(Rdb_ddl_manager *const ddl_manager,
                                     const Rdb_compact_params &params,
                                     const uint32_t cf_id,
                                     const uint8_t table_stats_sampling_pct)
    : m_cf_id(cf_id),
      m_ddl_manager(ddl_manager),
      m_last_stats(nullptr),
      m_rows(0l),
      m_window_pos(0l),
      m_deleted_rows(0l),
      m_max_deleted_rows(0l),
      m_file_size(0),
      m_params(params),
      m_cardinality_collector(table_stats_sampling_pct),
      m_recorded(false) {
  DBUG_ASSERT(ddl_manager != nullptr);

  // Circular buffer of "is deleted" flags, initially all false.
  m_deleted_rows_window.resize(m_params.m_window, false);
}
71 
72 /*
73   This function is called by RocksDB for every key in the SST file
74 */
AddUserKey(const rocksdb::Slice & key,const rocksdb::Slice & value,rocksdb::EntryType type,rocksdb::SequenceNumber seq,uint64_t file_size)75 rocksdb::Status Rdb_tbl_prop_coll::AddUserKey(const rocksdb::Slice &key,
76                                               const rocksdb::Slice &value,
77                                               rocksdb::EntryType type,
78                                               rocksdb::SequenceNumber seq,
79                                               uint64_t file_size) {
80   if (key.size() >= 4) {
81     AdjustDeletedRows(type);
82 
83     m_rows++;
84 
85     CollectStatsForRow(key, value, type, file_size);
86   }
87 
88   return rocksdb::Status::OK();
89 }
90 
AdjustDeletedRows(rocksdb::EntryType type)91 void Rdb_tbl_prop_coll::AdjustDeletedRows(rocksdb::EntryType type) {
92   if (m_params.m_window > 0) {
93     // record the "is deleted" flag into the sliding window
94     // the sliding window is implemented as a circular buffer
95     // in m_deleted_rows_window vector
96     // the current position in the circular buffer is pointed at by
97     // m_rows % m_deleted_rows_window.size()
98     // m_deleted_rows is the current number of 1's in the vector
99     // --update the counter for the element which will be overridden
100     const bool is_delete = (type == rocksdb::kEntryDelete ||
101                             (type == rocksdb::kEntrySingleDelete &&
102                              rocksdb_compaction_sequential_deletes_count_sd));
103 
104     // Only make changes if the value at the current position needs to change
105     if (is_delete != m_deleted_rows_window[m_window_pos]) {
106       // Set or clear the flag at the current position as appropriate
107       m_deleted_rows_window[m_window_pos] = is_delete;
108       if (!is_delete) {
109         m_deleted_rows--;
110       } else if (++m_deleted_rows > m_max_deleted_rows) {
111         m_max_deleted_rows = m_deleted_rows;
112       }
113     }
114 
115     if (++m_window_pos == m_params.m_window) {
116       m_window_pos = 0;
117     }
118   }
119 }
120 
/*
  Returns the Rdb_index_stats entry for the index that 'key' belongs to,
  creating a new entry in m_stats when the key starts a new index.
  The index id is decoded from the first 4 bytes of the key; the caller
  (AddUserKey) guarantees key.size() >= 4.
*/
Rdb_index_stats *Rdb_tbl_prop_coll::AccessStats(const rocksdb::Slice &key) {
  GL_INDEX_ID gl_index_id;
  gl_index_id.cf_id = m_cf_id;
  gl_index_id.index_id = rdb_netbuf_to_uint32(reinterpret_cast<const uchar*>(key.data()));

  // Keys in an SST file are sorted, so all keys of one index arrive
  // consecutively; m_last_stats caches the entry for the current index.
  if (m_last_stats == nullptr || m_last_stats->m_gl_index_id != gl_index_id) {
    m_keydef = nullptr;

    // starting a new table
    // add the new element into m_stats
    // (m_last_stats must be re-taken after emplace_back: growth may
    // reallocate the vector and invalidate earlier element pointers)
    m_stats.emplace_back(gl_index_id);
    m_last_stats = &m_stats.back();

    if (m_ddl_manager) {
      // safe_find() returns a std::shared_ptr<Rdb_key_def> with the count
      // incremented (so it can't be deleted out from under us) and with
      // the mutex locked (if setup has not occurred yet).  We must make
      // sure to free the mutex (via unblock_setup()) when we are done
      // with this object.  Currently this happens earlier in this function
      // when we are switching to a new Rdb_key_def and when this object
      // is destructed.
      m_keydef = m_ddl_manager->safe_find(gl_index_id);
      if (m_keydef != nullptr) {
        // resize the array to the number of columns.
        // It will be initialized with zeroes
        m_last_stats->m_distinct_keys_per_prefix.resize(
            m_keydef->get_key_parts());
        m_last_stats->m_name = m_keydef->get_name();
      }
    }
    // New index: the cardinality sampler must forget the previous last key.
    m_cardinality_collector.Reset();
  }

  return m_last_stats;
}
156 
CollectStatsForRow(const rocksdb::Slice & key,const rocksdb::Slice & value,const rocksdb::EntryType & type,const uint64_t file_size)157 void Rdb_tbl_prop_coll::CollectStatsForRow(const rocksdb::Slice &key,
158                                            const rocksdb::Slice &value,
159                                            const rocksdb::EntryType &type,
160                                            const uint64_t file_size) {
161   auto stats = AccessStats(key);
162 
163   stats->m_data_size += key.size() + value.size();
164 
165   // Incrementing per-index entry-type statistics
166   switch (type) {
167     case rocksdb::kEntryPut:
168       stats->m_rows++;
169       break;
170     case rocksdb::kEntryDelete:
171       stats->m_entry_deletes++;
172       break;
173     case rocksdb::kEntrySingleDelete:
174       stats->m_entry_single_deletes++;
175       break;
176     case rocksdb::kEntryMerge:
177       stats->m_entry_merges++;
178       break;
179     case rocksdb::kEntryOther:
180       stats->m_entry_others++;
181       break;
182     default:
183       // NO_LINT_DEBUG
184       sql_print_error(
185           "RocksDB: Unexpected entry type found: %u. "
186           "This should not happen so aborting the system.",
187           type);
188       abort();
189       break;
190   }
191 
192   stats->m_actual_disk_size += file_size - m_file_size;
193   m_file_size = file_size;
194 
195   if (m_keydef != nullptr) {
196     m_cardinality_collector.ProcessKey(key, m_keydef.get(), stats);
197   }
198 }
199 
200 const char *Rdb_tbl_prop_coll::INDEXSTATS_KEY = "__indexstats__";
201 
202 /*
203   This function is called by RocksDB to compute properties to store in sst file
204 */
Finish(rocksdb::UserCollectedProperties * const properties)205 rocksdb::Status Rdb_tbl_prop_coll::Finish(
206     rocksdb::UserCollectedProperties *const properties) {
207   uint64_t num_sst_entry_put = 0;
208   uint64_t num_sst_entry_delete = 0;
209   uint64_t num_sst_entry_singledelete = 0;
210   uint64_t num_sst_entry_merge = 0;
211   uint64_t num_sst_entry_other = 0;
212 
213   DBUG_ASSERT(properties != nullptr);
214 
215   for (auto it = m_stats.begin(); it != m_stats.end(); it++) {
216     num_sst_entry_put += it->m_rows;
217     num_sst_entry_delete += it->m_entry_deletes;
218     num_sst_entry_singledelete += it->m_entry_single_deletes;
219     num_sst_entry_merge += it->m_entry_merges;
220     num_sst_entry_other += it->m_entry_others;
221   }
222 
223   if (!m_recorded) {
224     if (num_sst_entry_put > 0) {
225       rocksdb_num_sst_entry_put += num_sst_entry_put;
226     }
227 
228     if (num_sst_entry_delete > 0) {
229       rocksdb_num_sst_entry_delete += num_sst_entry_delete;
230     }
231 
232     if (num_sst_entry_singledelete > 0) {
233       rocksdb_num_sst_entry_singledelete += num_sst_entry_singledelete;
234     }
235 
236     if (num_sst_entry_merge > 0) {
237       rocksdb_num_sst_entry_merge += num_sst_entry_merge;
238     }
239 
240     if (num_sst_entry_other > 0) {
241       rocksdb_num_sst_entry_other += num_sst_entry_other;
242     }
243 
244     for (Rdb_index_stats &stat : m_stats) {
245       m_cardinality_collector.AdjustStats(&stat);
246     }
247     m_recorded = true;
248   }
249   properties->insert({INDEXSTATS_KEY, Rdb_index_stats::materialize(m_stats)});
250   return rocksdb::Status::OK();
251 }
252 
NeedCompact() const253 bool Rdb_tbl_prop_coll::NeedCompact() const {
254   return m_params.m_deletes && (m_params.m_window > 0) &&
255          (m_file_size > m_params.m_file_size) &&
256          (m_max_deleted_rows > m_params.m_deletes);
257 }
258 
259 /*
260   Returns the same as above, but in human-readable way for logging
261 */
GetReadableProperties() const262 rocksdb::UserCollectedProperties Rdb_tbl_prop_coll::GetReadableProperties()
263     const {
264   std::string s;
265 #ifdef DBUG_OFF
266   s.append("[...");
267   s.append(std::to_string(m_stats.size()));
268   s.append("  records...]");
269 #else
270   bool first = true;
271   for (auto it : m_stats) {
272     if (first) {
273       first = false;
274     } else {
275       s.append(",");
276     }
277     s.append(GetReadableStats(it));
278   }
279 #endif
280   return rocksdb::UserCollectedProperties{{INDEXSTATS_KEY, s}};
281 }
282 
GetReadableStats(const Rdb_index_stats & it)283 std::string Rdb_tbl_prop_coll::GetReadableStats(const Rdb_index_stats &it) {
284   std::string s;
285   s.append("(");
286   s.append(std::to_string(it.m_gl_index_id.cf_id));
287   s.append(", ");
288   s.append(std::to_string(it.m_gl_index_id.index_id));
289   s.append("):{name:");
290   s.append(it.m_name);
291   s.append(", size:");
292   s.append(std::to_string(it.m_data_size));
293   s.append(", m_rows:");
294   s.append(std::to_string(it.m_rows));
295   s.append(", m_actual_disk_size:");
296   s.append(std::to_string(it.m_actual_disk_size));
297   s.append(", deletes:");
298   s.append(std::to_string(it.m_entry_deletes));
299   s.append(", single_deletes:");
300   s.append(std::to_string(it.m_entry_single_deletes));
301   s.append(", merges:");
302   s.append(std::to_string(it.m_entry_merges));
303   s.append(", others:");
304   s.append(std::to_string(it.m_entry_others));
305   s.append(", distincts per prefix: [");
306   for (auto num : it.m_distinct_keys_per_prefix) {
307     s.append(std::to_string(num));
308     s.append(" ");
309   }
310   s.append("]}");
311   return s;
312 }
313 
314 /*
315   Given the properties of an SST file, reads the stats from it and returns it.
316 */
317 
read_stats_from_tbl_props(const std::shared_ptr<const rocksdb::TableProperties> & table_props,std::vector<Rdb_index_stats> * const out_stats_vector)318 void Rdb_tbl_prop_coll::read_stats_from_tbl_props(
319     const std::shared_ptr<const rocksdb::TableProperties> &table_props,
320     std::vector<Rdb_index_stats> *const out_stats_vector) {
321   DBUG_ASSERT(out_stats_vector != nullptr);
322   const auto &user_properties = table_props->user_collected_properties;
323   const auto it2 = user_properties.find(std::string(INDEXSTATS_KEY));
324   if (it2 != user_properties.end()) {
325     auto result MY_ATTRIBUTE((__unused__)) =
326         Rdb_index_stats::unmaterialize(it2->second, out_stats_vector);
327     DBUG_ASSERT(result == 0);
328   }
329 }
330 
/*
  Serializes an array of Rdb_index_stats into a network string.

  Wire format (big-endian, produced by the rdb_netstr_append_* helpers):
    version(2)
    then per index:
      cf_id(4) index_id(4) data_size(8) rows(8) actual_disk_size(8)
      num_prefixes(8) entry_deletes(8) entry_single_deletes(8)
      entry_merges(8) entry_others(8) distinct_keys_per_prefix[](8 each)
  Must stay in sync with unmaterialize().
*/
std::string Rdb_index_stats::materialize(
    const std::vector<Rdb_index_stats> &stats) {
  String ret;
  rdb_netstr_append_uint16(&ret, INDEX_STATS_VERSION_ENTRY_TYPES);
  for (const auto &i : stats) {
    rdb_netstr_append_uint32(&ret, i.m_gl_index_id.cf_id);
    rdb_netstr_append_uint32(&ret, i.m_gl_index_id.index_id);
    // Each field is written as 8 bytes; guard against a wider future type.
    DBUG_ASSERT(sizeof i.m_data_size <= 8);
    rdb_netstr_append_uint64(&ret, i.m_data_size);
    rdb_netstr_append_uint64(&ret, i.m_rows);
    rdb_netstr_append_uint64(&ret, i.m_actual_disk_size);
    rdb_netstr_append_uint64(&ret, i.m_distinct_keys_per_prefix.size());
    rdb_netstr_append_uint64(&ret, i.m_entry_deletes);
    rdb_netstr_append_uint64(&ret, i.m_entry_single_deletes);
    rdb_netstr_append_uint64(&ret, i.m_entry_merges);
    rdb_netstr_append_uint64(&ret, i.m_entry_others);
    for (const auto &num_keys : i.m_distinct_keys_per_prefix) {
      rdb_netstr_append_uint64(&ret, num_keys);
    }
  }

  return std::string((char *)ret.ptr(), ret.length());
}
357 
358 /**
359   @brief
360   Reads an array of Rdb_index_stats from a string.
361   @return HA_EXIT_FAILURE if it detects any inconsistency in the input
362   @return HA_EXIT_SUCCESS if completes successfully
363 */
unmaterialize(const std::string & s,std::vector<Rdb_index_stats> * const ret)364 int Rdb_index_stats::unmaterialize(const std::string &s,
365                                    std::vector<Rdb_index_stats> *const ret) {
366   const uchar *p = rdb_std_str_to_uchar_ptr(s);
367   const uchar *const p2 = p + s.size();
368 
369   DBUG_ASSERT(ret != nullptr);
370 
371   if (p + 2 > p2) {
372     return HA_EXIT_FAILURE;
373   }
374 
375   const int version = rdb_netbuf_read_uint16(&p);
376   Rdb_index_stats stats;
377   // Make sure version is within supported range.
378   if (version < INDEX_STATS_VERSION_INITIAL ||
379       version > INDEX_STATS_VERSION_ENTRY_TYPES) {
380     // NO_LINT_DEBUG
381     sql_print_error(
382         "Index stats version %d was outside of supported range. "
383         "This should not happen so aborting the system.",
384         version);
385     abort();
386   }
387 
388   size_t needed = sizeof(stats.m_gl_index_id.cf_id) +
389                   sizeof(stats.m_gl_index_id.index_id) +
390                   sizeof(stats.m_data_size) + sizeof(stats.m_rows) +
391                   sizeof(stats.m_actual_disk_size) + sizeof(uint64);
392   if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
393     needed += sizeof(stats.m_entry_deletes) +
394               sizeof(stats.m_entry_single_deletes) +
395               sizeof(stats.m_entry_merges) + sizeof(stats.m_entry_others);
396   }
397 
398   while (p < p2) {
399     if (p + needed > p2) {
400       return HA_EXIT_FAILURE;
401     }
402     rdb_netbuf_read_gl_index(&p, &stats.m_gl_index_id);
403     stats.m_data_size = rdb_netbuf_read_uint64(&p);
404     stats.m_rows = rdb_netbuf_read_uint64(&p);
405     stats.m_actual_disk_size = rdb_netbuf_read_uint64(&p);
406     stats.m_distinct_keys_per_prefix.resize(rdb_netbuf_read_uint64(&p));
407     if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
408       stats.m_entry_deletes = rdb_netbuf_read_uint64(&p);
409       stats.m_entry_single_deletes = rdb_netbuf_read_uint64(&p);
410       stats.m_entry_merges = rdb_netbuf_read_uint64(&p);
411       stats.m_entry_others = rdb_netbuf_read_uint64(&p);
412     }
413     if (p + stats.m_distinct_keys_per_prefix.size() *
414                 sizeof(stats.m_distinct_keys_per_prefix[0]) >
415         p2) {
416       return HA_EXIT_FAILURE;
417     }
418     for (std::size_t i = 0; i < stats.m_distinct_keys_per_prefix.size(); i++) {
419       stats.m_distinct_keys_per_prefix[i] = rdb_netbuf_read_uint64(&p);
420     }
421     ret->push_back(stats);
422   }
423   return HA_EXIT_SUCCESS;
424 }
425 
426 /*
427   Merges one Rdb_index_stats into another. Can be used to come up with the stats
428   for the index based on stats for each sst
429 */
merge(const Rdb_index_stats & s,const bool increment,const int64_t estimated_data_len)430 void Rdb_index_stats::merge(const Rdb_index_stats &s, const bool increment,
431                             const int64_t estimated_data_len) {
432   std::size_t i;
433 
434   DBUG_ASSERT(estimated_data_len >= 0);
435 
436   m_gl_index_id = s.m_gl_index_id;
437   if (m_distinct_keys_per_prefix.size() < s.m_distinct_keys_per_prefix.size()) {
438     m_distinct_keys_per_prefix.resize(s.m_distinct_keys_per_prefix.size());
439   }
440   if (increment) {
441     m_rows += s.m_rows;
442     m_data_size += s.m_data_size;
443 
444     /*
445       The Data_length and Avg_row_length are trailing statistics, meaning
446       they don't get updated for the current SST until the next SST is
447       written.  So, if rocksdb reports the data_length as 0,
448       we make a reasoned estimate for the data_file_length for the
449       index in the current SST.
450     */
451     m_actual_disk_size += s.m_actual_disk_size ? s.m_actual_disk_size
452                                                : estimated_data_len * s.m_rows;
453     m_entry_deletes += s.m_entry_deletes;
454     m_entry_single_deletes += s.m_entry_single_deletes;
455     m_entry_merges += s.m_entry_merges;
456     m_entry_others += s.m_entry_others;
457     for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
458       m_distinct_keys_per_prefix[i] += s.m_distinct_keys_per_prefix[i];
459     }
460   } else {
461     m_rows -= s.m_rows;
462     m_data_size -= s.m_data_size;
463     m_actual_disk_size -= s.m_actual_disk_size ? s.m_actual_disk_size
464                                                : estimated_data_len * s.m_rows;
465     m_entry_deletes -= s.m_entry_deletes;
466     m_entry_single_deletes -= s.m_entry_single_deletes;
467     m_entry_merges -= s.m_entry_merges;
468     m_entry_others -= s.m_entry_others;
469     for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
470       m_distinct_keys_per_prefix[i] -= s.m_distinct_keys_per_prefix[i];
471     }
472   }
473 }
474 
// Cardinality sampler: seeds the per-collector RNG state with the current
// time.  A sampling percentage of 0 or 100 means every key is collected
// (see IsSampingDisabled()).
Rdb_tbl_card_coll::Rdb_tbl_card_coll(const uint8_t table_stats_sampling_pct)
    : m_table_stats_sampling_pct(table_stats_sampling_pct),
      m_seed(time(nullptr)) {}
478 
IsSampingDisabled()479 bool Rdb_tbl_card_coll::IsSampingDisabled() {
480   // Zero means that we'll use all the keys to update statistics.
481   return m_table_stats_sampling_pct == 0 ||
482          RDB_TBL_STATS_SAMPLE_PCT_MAX == m_table_stats_sampling_pct;
483 }
484 
/*
  Decides whether the current key should be fed into the cardinality
  statistics.  Returns true unconditionally when sampling is disabled;
  otherwise draws a uniform value in
  [RDB_TBL_STATS_SAMPLE_PCT_MIN, RDB_TBL_STATS_SAMPLE_PCT_MAX] and collects
  the key when the draw falls at or below the configured percentage.
*/
bool Rdb_tbl_card_coll::ShouldCollectStats() {
  if (IsSampingDisabled()) {
    return true;  // collect every key
  }

  // rand_r keeps its RNG state in m_seed (mapped to rand_s on Windows via
  // the #define at the top of this file).
  const int val = rand_r(&m_seed) % (RDB_TBL_STATS_SAMPLE_PCT_MAX -
                                     RDB_TBL_STATS_SAMPLE_PCT_MIN + 1) +
                  RDB_TBL_STATS_SAMPLE_PCT_MIN;

  DBUG_ASSERT(val >= RDB_TBL_STATS_SAMPLE_PCT_MIN);
  DBUG_ASSERT(val <= RDB_TBL_STATS_SAMPLE_PCT_MAX);

  return val <= m_table_stats_sampling_pct;
}
499 
/*
  Feeds one (possibly sampled-out) key into the distinct-keys-per-prefix
  counters of 'stats', comparing it against the previously seen key of the
  same index.
*/
void Rdb_tbl_card_coll::ProcessKey(const rocksdb::Slice &key,
                                   const Rdb_key_def *keydef,
                                   Rdb_index_stats *stats) {
  if (ShouldCollectStats()) {
    std::size_t column = 0;
    bool new_key = true;

    // First key after Reset() is always counted as new (m_last_key empty).
    if (!m_last_key.empty()) {
      rocksdb::Slice last(m_last_key.data(), m_last_key.size());
      // NOTE(review): assumes compare_keys() returns 0 on a successful
      // comparison and sets 'column' to the first key part at which the
      // two keys differ — confirm against Rdb_key_def in rdb_datadic.
      new_key = (keydef->compare_keys(&last, &key, &column) == 0);
    }

    if (new_key) {
      DBUG_ASSERT(column <= stats->m_distinct_keys_per_prefix.size());

      // Every prefix from the first differing column onward has seen a new
      // distinct value.
      for (auto i = column; i < stats->m_distinct_keys_per_prefix.size(); i++) {
        stats->m_distinct_keys_per_prefix[i]++;
      }

      // assign new last_key for the next call
      // however, we only need to change the last key
      // if one of the first n-1 columns is different
      // If the n-1 prefix is the same, no sense in storing
      // the new key
      if (column < stats->m_distinct_keys_per_prefix.size()) {
        m_last_key.assign(key.data(), key.size());
      }
    }
  }
}
530 
Reset()531 void Rdb_tbl_card_coll::Reset() { m_last_key.clear(); }
532 
533 // We need to adjust the index cardinality numbers based on the sampling
534 // rate so that the output of "SHOW INDEX" command will reflect reality
535 // more closely. It will still be an approximation, just a better one.
AdjustStats(Rdb_index_stats * stats)536 void Rdb_tbl_card_coll::AdjustStats(Rdb_index_stats *stats) {
537   if (IsSampingDisabled()) {
538     // no sampling was done, return as stats is
539     return;
540   }
541   for (int64_t &num_keys : stats->m_distinct_keys_per_prefix) {
542     num_keys = num_keys * 100 / m_table_stats_sampling_pct;
543   }
544 }
545 
546 }  // namespace myrocks
547