1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22 ======= */
23
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
26 #include "tokudb_sysvars.h"
27 #include "toku_time.h"
28
29 #include "my_check_opt.h"
30
31 namespace tokudb {
32 namespace analyze {
33
34 class recount_rows_t : public tokudb::background::job_manager_t::job_t {
35 public:
36 void* operator new(size_t sz);
37 void operator delete(void* p);
38
39 recount_rows_t(
40 bool user_schedued,
41 THD* thd,
42 TOKUDB_SHARE* share,
43 DB_TXN* txn);
44
45 virtual ~recount_rows_t();
46
47 virtual const char* key();
48 virtual const char* database();
49 virtual const char* table();
50 virtual const char* type();
51 virtual const char* parameters();
52 virtual const char* status();
53
54 protected:
55 virtual void on_run();
56
57 virtual void on_destroy();
58
59 private:
60 // to be provided by the initiator of recount rows
61 THD* _thd;
62 TOKUDB_SHARE* _share;
63 DB_TXN* _txn;
64 ulonglong _throttle;
65
66 // for recount rows status reporting
67 char _parameters[256];
68 char _status[1024];
69 int _result;
70 ulonglong _recount_start; // in microseconds
71 ulonglong _total_elapsed_time; // in microseconds
72
73 bool _local_txn;
74 ulonglong _rows;
75 ulonglong _deleted_rows;
76 ulonglong _ticks;
77
78 static int analyze_recount_rows_progress(
79 uint64_t count,
80 uint64_t deleted,
81 void* extra);
82 int analyze_recount_rows_progress(uint64_t count, uint64_t deleted);
83 };
84
operator new(size_t sz)85 void* recount_rows_t::operator new(size_t sz) {
86 return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
87 }
operator delete(void * p)88 void recount_rows_t::operator delete(void* p) {
89 tokudb::memory::free(p);
90 }
// Capture recount parameters from the initiating session. When the job is
// configured to run in the background, the session THD and transaction
// must not be retained (the session may be gone by the time the job
// runs), so both are dropped and on_run() opens a local transaction.
recount_rows_t::recount_rows_t(
    bool user_scheduled,
    THD* thd,
    TOKUDB_SHARE* share,
    DB_TXN* txn) :
    tokudb::background::job_manager_t::job_t(user_scheduled),
    _share(share),
    _result(HA_ADMIN_OK),
    _recount_start(0),
    _total_elapsed_time(0),
    _local_txn(false),
    _rows(0),
    _deleted_rows(0),
    _ticks(0) {

    assert_debug(thd != NULL);
    assert_debug(share != NULL);

    if (tokudb::sysvars::analyze_in_background(thd)) {
        _thd = NULL;
        _txn = NULL;
    } else {
        _thd = thd;
        _txn = txn;
    }

    // snapshot the session's throttle setting; reported via parameters()
    _throttle = tokudb::sysvars::analyze_throttle(thd);

    snprintf(_parameters,
        sizeof(_parameters),
        "TOKUDB_ANALYZE_THROTTLE=%llu;",
        _throttle);
    _status[0] = '\0';
}
// Nothing to tear down here; cleanup happens in on_run()/on_destroy().
recount_rows_t::~recount_rows_t() {
}
// Execute the row recount. Runs either synchronously in the initiating
// session (foreground) or on the background job thread; in the latter
// case _thd/_txn are NULL and a local DB_READ_UNCOMMITTED transaction is
// opened for the duration of the job.
void recount_rows_t::on_run() {
    const char* orig_proc_info = NULL;
    if (_thd)
        orig_proc_info = tokudb_thd_get_proc_info(_thd);
    _recount_start = tokudb::time::microsec();
    _total_elapsed_time = 0;

    if (_txn == NULL) {
        _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);

        if (_result != 0) {
            _txn = NULL;
            _result = HA_ADMIN_FAILED;
            goto error;
        }
        _local_txn = true;
    } else {
        _local_txn = false;
    }

    // walk the main dictionary, recounting logical rows; progress/abort
    // handling happens in analyze_recount_rows_progress()
    _result =
        _share->file->recount_rows(
            _share->file,
            analyze_recount_rows_progress,
            this);

    if (_result != 0) {
        if (_local_txn) {
            _txn->abort(_txn);
            _txn = NULL;
        }
        _result = HA_ADMIN_FAILED;
        goto error;
    }

    // pick up the refreshed row estimate and publish it on the share
    DB_BTREE_STAT64 dict_stats;
    _result = _share->file->stat64(_share->file, _txn, &dict_stats);
    if (_result == 0) {
        _share->set_row_count(dict_stats.bt_ndata, false);
    }
    if (_result != 0)
        _result = HA_ADMIN_FAILED;

    if (_local_txn) {
        if (_result == HA_ADMIN_OK) {
            _txn->commit(_txn, 0);
        } else {
            _txn->abort(_txn);
        }
        _txn = NULL;
    }

    sql_print_information(
        "tokudb analyze recount rows %d counted %lld",
        _result,
        _share->row_count());
error:
    // restore whatever the session was reporting before we started
    if(_thd)
        tokudb_thd_set_proc_info(_thd, orig_proc_info);
    return;
}
// Job retirement hook: drop the share reference taken by the initiator
// (see the addref() in TOKUDB_SHARE::analyze_recount_rows).
void recount_rows_t::on_destroy() {
    _share->release();
}
key()191 const char* recount_rows_t::key() {
192 return _share->full_table_name();
193 }
database()194 const char* recount_rows_t::database() {
195 return _share->database_name();
196 }
table()197 const char* recount_rows_t::table() {
198 return _share->table_name();
199 }
type()200 const char* recount_rows_t::type() {
201 static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS";
202 return type;
203 }
parameters()204 const char* recount_rows_t::parameters() {
205 return _parameters;
206 }
status()207 const char* recount_rows_t::status() {
208 return _status;
209 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted,void * extra)210 int recount_rows_t::analyze_recount_rows_progress(
211 uint64_t count,
212 uint64_t deleted,
213 void* extra) {
214
215 recount_rows_t* context = (recount_rows_t*)extra;
216 return context->analyze_recount_rows_progress(count, deleted);
217 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted)218 int recount_rows_t::analyze_recount_rows_progress(
219 uint64_t count,
220 uint64_t deleted) {
221
222 _rows = count;
223 _deleted_rows += deleted;
224 deleted > 0 ? _ticks += deleted : _ticks++;
225
226 if (_ticks > 1000) {
227 _ticks = 0;
228 uint64_t now = tokudb::time::microsec();
229 _total_elapsed_time = now - _recount_start;
230 if ((_thd && thd_killed(_thd)) || cancelled()) {
231 // client killed
232 return ER_ABORTING_CONNECTION;
233 }
234
235 // rebuild status
236 // There is a slight race condition here,
237 // _status is used here for tokudb_thd_set_proc_info and it is also used
238 // for the status column in i_s.background_job_status.
239 // If someone happens to be querying/building the i_s table
240 // at the exact same time that the status is being rebuilt here,
241 // the i_s table could get some garbage status.
242 // This solution is a little heavy handed but it works, it prevents us
243 // from changing the status while someone might be immediately observing
244 // us and it prevents someone from observing us while we change the
245 // status
246 tokudb::background::_job_manager->lock();
247 snprintf(_status,
248 sizeof(_status),
249 "recount_rows %s.%s counted %llu rows and %llu deleted "
250 "in %llu seconds.",
251 _share->database_name(),
252 _share->table_name(),
253 _rows,
254 _deleted_rows,
255 _total_elapsed_time / tokudb::time::MICROSECONDS);
256 tokudb::background::_job_manager->unlock();
257
258 // report
259 if (_thd)
260 tokudb_thd_set_proc_info(_thd, _status);
261
262 // throttle
263 // given the throttle value, lets calculate the maximum number of rows
264 // we should have seen so far in a .1 sec resolution
265 if (_throttle > 0) {
266 uint64_t estimated_rows = _total_elapsed_time / 100000;
267 estimated_rows = estimated_rows * (_throttle / 10);
268 if (_rows + _deleted_rows > estimated_rows) {
269 // sleep for 1/10 of a second
270 tokudb::time::sleep_microsec(100000);
271 }
272 }
273 }
274 return 0;
275 }
276
// Background/foreground job implementing ANALYZE TABLE in standard mode:
// walks each index to estimate per-key-part cardinality and stores the
// results in the status dictionary and on the share.
class standard_t : public tokudb::background::job_manager_t::job_t {
public:
    void* operator new(size_t sz);
    void operator delete(void* p);

    standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn);

    virtual ~standard_t();

    // job identification/reporting used by the job manager and i_s
    virtual const char* key(void);
    virtual const char* database();
    virtual const char* table();
    virtual const char* type();
    virtual const char* parameters();
    virtual const char* status();

protected:
    virtual void on_run();

    virtual void on_destroy();

private:
    // to be provided by initiator of analyze
    THD* _thd;
    TOKUDB_SHARE* _share;
    DB_TXN* _txn;
    // rows-per-second cap; 0 = unthrottled (see analyze_key_progress)
    ulonglong _throttle;
    ulonglong _time_limit; // in microseconds
    double _delete_fraction;

    // for analyze status reporting, may also use other state
    char _parameters[256];
    char _status[1024];
    int _result;
    ulonglong _analyze_start; // in microseconds
    ulonglong _total_elapsed_time; // in microseconds

    // for analyze internal use, pretty much these are per-key/index
    ulonglong _current_key; // index currently being analyzed
    bool _local_txn; // true when on_run() opened its own transaction
    ulonglong _half_time; // half of _time_limit, in microseconds
    ulonglong _half_rows; // half of the primary dict's row estimate
    ulonglong _rows;
    ulonglong _deleted_rows;
    ulonglong _ticks;
    ulonglong _analyze_key_start; // in microseconds
    ulonglong _key_elapsed_time; // in microseconds
    uint _scan_direction; // 0 (idle), DB_NEXT or DB_PREV

    // cursor interrupt callback: C trampoline + member implementation;
    // returning true interrupts the scan
    static bool analyze_standard_cursor_callback(
        void* extra,
        uint64_t deleted_rows);
    bool analyze_standard_cursor_callback(uint64_t deleted_rows);

    int analyze_key_progress();
    int analyze_key(uint64_t* rec_per_key_part);
};
334
operator new(size_t sz)335 void* standard_t::operator new(size_t sz) {
336 return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
337 }
operator delete(void * p)338 void standard_t::operator delete(void* p) {
339 tokudb::memory::free(p);
340 }
// Capture analyze parameters from the initiating session. When the job is
// configured to run in the background, the session THD and transaction
// must not be retained (the session may be gone by the time the job
// runs), so both are dropped and on_run() opens a local transaction.
standard_t::standard_t(
    bool user_scheduled,
    THD* thd,
    TOKUDB_SHARE* share,
    DB_TXN* txn) :
    tokudb::background::job_manager_t::job_t(user_scheduled),
    _share(share),
    _result(HA_ADMIN_OK),
    _analyze_start(0),
    _total_elapsed_time(0),
    _current_key(0),
    _local_txn(false),
    _half_time(0),
    _half_rows(0),
    _rows(0),
    _deleted_rows(0),
    _ticks(0),
    _analyze_key_start(0),
    _key_elapsed_time(0),
    _scan_direction(0) {

    assert_debug(thd != NULL);
    assert_debug(share != NULL);

    if (tokudb::sysvars::analyze_in_background(thd)) {
        _thd = NULL;
        _txn = NULL;
    } else {
        _thd = thd;
        _txn = txn;
    }
    // snapshot session settings; reported back via parameters()
    _throttle = tokudb::sysvars::analyze_throttle(thd);
    _time_limit =
        tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS;
    _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd);

    snprintf(_parameters,
        sizeof(_parameters),
        "TOKUDB_ANALYZE_DELETE_FRACTION=%f; "
        "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;",
        _delete_fraction,
        _time_limit / tokudb::time::MICROSECONDS,
        _throttle);

    _status[0] = '\0';
}
// Nothing to tear down here; cleanup happens in on_run()/on_destroy().
standard_t::~standard_t() {
}
// Run the standard (cardinality) analyze: for every key in the table,
// walk the index estimating distinct values per leading key-part prefix,
// then store the resulting rec_per_key data both in the status dictionary
// and in the share's in-memory cardinality counts.
void standard_t::on_run() {
    DB_BTREE_STAT64 stat64;
    // stack VLA sized for the widest key; filled key by key below
    uint64_t rec_per_key_part[_share->_max_key_parts];
    uint64_t total_key_parts = 0;
    const char* orig_proc_info = NULL;
    if (_thd)
        orig_proc_info = tokudb_thd_get_proc_info(_thd);

    _analyze_start = tokudb::time::microsec();
    // midpoint of the time budget; analyze_key() reverses scan direction
    // if half the time passed but less than half the rows were covered
    _half_time = _time_limit > 0 ? _time_limit/2 : 0;

    if (_txn == NULL) {
        _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);

        if (_result != 0) {
            _txn = NULL;
            _result = HA_ADMIN_FAILED;
            goto error;
        }
        _local_txn = true;
    } else {
        _local_txn = false;
    }

    // row estimate of the primary dictionary drives the half-way check
    assert_always(_share->key_file[0] != NULL);
    _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64);
    if (_result != 0) {
        _result = HA_ADMIN_FAILED;
        goto cleanup;
    }
    _half_rows = stat64.bt_ndata / 2;

    for (ulonglong current_key = 0;
         _result == HA_ADMIN_OK && current_key < _share->_keys;
         current_key++) {

        _current_key = current_key;
        _rows = _deleted_rows = _ticks = 0;
        _result = analyze_key(&rec_per_key_part[total_key_parts]);

        // a hard error, or a scan that saw only deleted rows, fails the
        // analyze; a bare ETIME (time limit reached) is tolerated
        if ((_result != 0 && _result != ETIME) ||
            (_result != 0 && _rows == 0 && _deleted_rows > 0)) {
            _result = HA_ADMIN_FAILED;
        }
        // when a session is attached, report per-key results (or an
        // excessive deleted-row fraction) back as an extra result row
        if (_thd && (_result == HA_ADMIN_FAILED ||
            static_cast<double>(_deleted_rows) >
                _delete_fraction * (_rows + _deleted_rows))) {

            char name[256]; int namelen;
            namelen =
                snprintf(
                    name,
                    sizeof(name),
                    "%s.%s.%s",
                    _share->database_name(),
                    _share->table_name(),
                    _share->_key_descriptors[_current_key]._name);
            _thd->get_protocol()->start_row();
            _thd->get_protocol()->store(name, namelen, system_charset_info);
            _thd->get_protocol()->store("analyze", 7, system_charset_info);
            _thd->get_protocol()->store("info", 4, system_charset_info);
            char rowmsg[256];
            int rowmsglen;
            rowmsglen =
                snprintf(
                    rowmsg,
                    sizeof(rowmsg),
                    "rows processed %llu rows deleted %llu",
                    _rows,
                    _deleted_rows);
            _thd->get_protocol()->store(rowmsg, rowmsglen,
                                        system_charset_info);
            _thd->get_protocol()->end_row();

            sql_print_information(
                "tokudb analyze on %.*s %.*s",
                namelen,
                name,
                rowmsglen,
                rowmsg);
        }

        total_key_parts += _share->_key_descriptors[_current_key]._parts;
    }
    if (_result == HA_ADMIN_OK) {
        // persist the cardinality data in the status dictionary ...
        int error =
            tokudb::set_card_in_status(
                _share->status_block,
                _txn,
                total_key_parts,
                rec_per_key_part);
        if (error)
            _result = HA_ADMIN_FAILED;

        // ... and publish it on the share; re-enable auto analysis that
        // was disabled while this job was pending (see analyze_standard)
        _share->lock();
        _share->update_cardinality_counts(total_key_parts, rec_per_key_part);
        _share->allow_auto_analysis(true);
        _share->unlock();
    }

cleanup:
    if (_local_txn) {
        if (_result == HA_ADMIN_OK) {
            _txn->commit(_txn, 0);
        } else {
            _txn->abort(_txn);
        }
        _txn = NULL;
    }

error:
    // restore whatever the session was reporting before we started
    if (_thd)
        tokudb_thd_set_proc_info(_thd, orig_proc_info);
    return;
}
// Job retirement hook: re-enable automatic analysis on the share and drop
// the reference taken by the initiator (see analyze_standard).
void standard_t::on_destroy() {
    _share->lock();
    _share->allow_auto_analysis(false);
    _share->unlock();
    _share->release();
}
key()510 const char* standard_t::key() {
511 return _share->full_table_name();
512 }
database()513 const char* standard_t::database() {
514 return _share->database_name();
515 }
table()516 const char* standard_t::table() {
517 return _share->table_name();
518 }
type()519 const char* standard_t::type() {
520 static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD";
521 return type;
522 }
parameters()523 const char* standard_t::parameters() {
524 return _parameters;
525 }
status()526 const char* standard_t::status() {
527 return _status;
528 }
analyze_standard_cursor_callback(void * extra,uint64_t deleted_rows)529 bool standard_t::analyze_standard_cursor_callback(
530 void* extra,
531 uint64_t deleted_rows) {
532 standard_t* context = (standard_t*)extra;
533 return context->analyze_standard_cursor_callback(deleted_rows);
534 }
analyze_standard_cursor_callback(uint64_t deleted_rows)535 bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) {
536 _deleted_rows += deleted_rows;
537 _ticks += deleted_rows;
538 return analyze_key_progress() != 0;
539 }
// Periodic bookkeeping for an index scan: roughly every 1000 progress
// ticks check for cancellation and the time limit, rebuild the status
// string, report it to the session and apply the row throttle.
// Returns 0 to keep scanning, ER_ABORTING_CONNECTION if the client was
// killed or the job cancelled, ETIME when the per-key time limit is hit.
int standard_t::analyze_key_progress(void) {
    if (_ticks > 1000) {
        _ticks = 0;
        uint64_t now = tokudb::time::microsec();
        _total_elapsed_time = now - _analyze_start;
        _key_elapsed_time = now - _analyze_key_start;
        if ((_thd && thd_killed(_thd)) || cancelled()) {
            // client killed
            return ER_ABORTING_CONNECTION;
        } else if (_time_limit > 0 &&
                   static_cast<uint64_t>(_key_elapsed_time) > _time_limit) {
            // time limit reached
            return ETIME;
        }

        // Rebuild the status string under the job manager lock.
        // _status is used here for tokudb_thd_set_proc_info and it is also
        // used for the status column in i_s.background_job_status.
        // If someone happens to be querying/building the i_s table
        // at the exact same time that the status is being rebuilt here,
        // the i_s table could get some garbage status.
        // This solution is a little heavy handed but it works: it prevents
        // us from changing the status while someone might be immediately
        // observing us and it prevents someone from observing us while we
        // change the status.
        static const char* scan_direction_str[] = {"not scanning",
                                                   "scanning forward",
                                                   "scanning backward",
                                                   "scan unknown"};

        const char* scan_direction = NULL;
        switch (_scan_direction) {
        case 0:
            scan_direction = scan_direction_str[0];
            break;
        case DB_NEXT:
            scan_direction = scan_direction_str[1];
            break;
        case DB_PREV:
            scan_direction = scan_direction_str[2];
            break;
        default:
            scan_direction = scan_direction_str[3];
            break;
        }

        // progress as a fraction of rows seen and of the time budget used
        float progress_rows = 0.0;
        if (_share->row_count() > 0)
            progress_rows = static_cast<float>(_rows) /
                static_cast<float>(_share->row_count());
        float progress_time = 0.0;
        if (_time_limit > 0)
            progress_time = static_cast<float>(_key_elapsed_time) /
                static_cast<float>(_time_limit);
        tokudb::background::_job_manager->lock();
        snprintf(
            _status,
            sizeof(_status),
            "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% "
            "time, %s",
            _share->database_name(),
            _share->table_name(),
            _share->_key_descriptors[_current_key]._name,
            _current_key,
            _share->_keys,
            progress_rows * 100.0,
            progress_time * 100.0,
            scan_direction);
        tokudb::background::_job_manager->unlock();

        // report through the processlist as well
        if (_thd)
            tokudb_thd_set_proc_info(_thd, _status);

        // throttle
        // given the throttle value, lets calculate the maximum number of
        // rows we should have seen so far in a .1 sec resolution
        if (_throttle > 0) {
            uint64_t estimated_rows = _key_elapsed_time / 100000;
            estimated_rows = estimated_rows * (_throttle / 10);
            if (_rows + _deleted_rows > estimated_rows) {
                // sleep for 1/10 of a second
                tokudb::time::sleep_microsec(100000);
            }
        }
    }
    return 0;
}
// Compute cardinality estimates for one index (_current_key). Scans the
// index with a cursor, counting for each leading key-part prefix how many
// adjacent keys differed; rec_per_key_part[i] receives
// _rows / unique_rows[i]. Returns 0 on success (including when the scan
// was merely cut short by the time limit); other errors from
// analyze_key_progress() or the cursor are propagated.
int standard_t::analyze_key(uint64_t* rec_per_key_part) {
    int error = 0;
    DB* db = _share->key_file[_current_key];
    assert_always(db != NULL);
    uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts;
    // stack VLA: per-prefix distinct-value counters
    uint64_t unique_rows[num_key_parts];
    bool is_unique = _share->_key_descriptors[_current_key]._is_unique;
    DBC* cursor = NULL;
    int close_error = 0;
    DBT key, prev_key;
    bool copy_key = false;

    _analyze_key_start = tokudb::time::microsec();
    _key_elapsed_time = 0;
    _scan_direction = DB_NEXT;

    if (is_unique && num_key_parts == 1) {
        // don't compute for unique keys with a single part. we already know
        // the answer.
        _rows = unique_rows[0] = 1;
        goto done;
    }

    for (uint64_t i = 0; i < num_key_parts; i++)
        unique_rows[i] = 1;

    // stop looking when the entire dictionary was analyzed, or a
    // cap on execution time was reached, or the analyze was killed.
    while (1) {
        // (re)open the cursor; this is also re-entered after a mid-scan
        // direction reversal closed the previous cursor
        if (cursor == NULL) {
            error = db->cursor(db, _txn, &cursor, 0);
            if (error != 0)
                goto done;

            // have the engine report rows it skips as deleted so they
            // count against progress/throttling/cancellation checks
            cursor->c_set_check_interrupt_callback(
                cursor,
                analyze_standard_cursor_callback,
                this);

            memset(&key, 0, sizeof(DBT));
            memset(&prev_key, 0, sizeof(DBT));
            copy_key = true;
        }

        error = cursor->c_get(cursor, &key, 0, _scan_direction);
        if (error != 0) {
            if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED)
                error = 0; // not an error
            break;
        } else if (cancelled()) {
            error = ER_ABORTING_CONNECTION;
            break;
        }

        _rows++;
        _ticks++;

        // if copy_key is false at this point, we have some value sitting in
        // prev_key that we can compare to
        // if the comparison reveals a unique key, we must set copy_key to
        // true so the code following can copy the current key into prev_key
        // for the next iteration
        if (copy_key == false) {
            // compare this key with the previous key. ignore
            // appended PK for SK's.
            // TODO if a prefix is different, then all larger keys
            // that include the prefix are also different.
            // TODO if we are comparing the entire primary key or
            // the entire unique secondary key, then the cardinality
            // must be 1, so we can avoid computing it.
            for (uint64_t i = 0; i < num_key_parts; i++) {
                int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
                if (cmp != 0) {
                    unique_rows[i]++;
                    copy_key = true;
                }
            }
        }

        // prev_key = key or prev_key is NULL
        if (copy_key) {
            prev_key.data =
                tokudb::memory::realloc(
                    prev_key.data,
                    key.size,
                    MYF(MY_WME|MY_ZEROFILL|MY_FAE));
            assert_always(prev_key.data);
            prev_key.size = key.size;
            memcpy(prev_key.data, key.data, prev_key.size);
            copy_key = false;
        }

        error = analyze_key_progress();
        if (error == ETIME) {
            // time limit reached: keep what we have; not an error
            error = 0;
            break;
        } else if (error) {
            break;
        }

        // if we have a time limit, are scanning forward and have exceeded
        // the _half_time and not passed the _half_rows number of the rows
        // in the index: clean up the keys, close the cursor and reverse
        // direction.
        if (TOKUDB_UNLIKELY(_half_time > 0 &&
            _scan_direction == DB_NEXT &&
            _key_elapsed_time >= _half_time &&
            _rows < _half_rows)) {

            tokudb::memory::free(prev_key.data); prev_key.data = NULL;
            close_error = cursor->c_close(cursor);
            assert_always(close_error == 0);
            cursor = NULL;
            _scan_direction = DB_PREV;
        }
    }
    // cleanup
    if (prev_key.data) tokudb::memory::free(prev_key.data);
    if (cursor) close_error = cursor->c_close(cursor);
    assert_always(close_error == 0);

done:
    // in case we timed out (bunch of deleted records) without hitting a
    // single row
    if (_rows == 0)
        _rows = 1;

    // return cardinality
    for (uint64_t i = 0; i < num_key_parts; i++) {
        rec_per_key_part[i] = _rows / unique_rows[i];
    }
    return error;
}
761
762 } // namespace analyze
763 } // namespace tokudb
764
765
analyze(THD * thd,TOKUDB_UNUSED (HA_CHECK_OPT * check_opt))766 int ha_tokudb::analyze(THD *thd, TOKUDB_UNUSED(HA_CHECK_OPT *check_opt)) {
767 TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
768 int result = HA_ADMIN_OK;
769 tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd);
770
771 switch (mode) {
772 case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS:
773 result = share->analyze_recount_rows(thd, transaction);
774 break;
775 case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD:
776 share->lock();
777 result = share->analyze_standard(thd, transaction);
778 share->unlock();
779 break;
780 case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL:
781 share->cancel_background_jobs();
782 break;
783 default:
784 break; // no-op
785 }
786 TOKUDB_HANDLER_DBUG_RETURN(result);
787 }
788
// Build and hand a recount-rows job to the background job manager; when
// tokudb_analyze_in_background is off the job runs synchronously inside
// run_job(). Returns HA_ADMIN_OK or HA_ADMIN_FAILED.
int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());

    assert_always(thd != NULL);

    int result = HA_ADMIN_OK;

    tokudb::analyze::recount_rows_t* job
        = new tokudb::analyze::recount_rows_t(true, thd, this, txn);
    assert_always(job != NULL);

    // job->destroy will drop the ref
    // NOTE(review): addref() appears to return with the share mutex held,
    // which the unlock() below releases — confirm against
    // TOKUDB_SHARE::addref() before changing this sequence.
    addref();
    unlock();

    bool ret = tokudb::background::_job_manager->
        run_job(job, tokudb::sysvars::analyze_in_background(thd));

    if (!ret) {
        // the job was never accepted/run: destroy it ourselves and fail
        job->destroy();
        delete job;
        result = HA_ADMIN_FAILED;
    }

    TOKUDB_HANDLER_DBUG_RETURN(result);
}
815
816 // on entry, if txn is !NULL, it is a user session invoking ANALYZE directly
817 // and no lock will be held on 'this', else if txn is NULL it is an auto and
818 // 'this' will be locked.
// Build and hand a standard analyze job to the background job manager;
// when tokudb_analyze_in_background is off the job runs synchronously
// inside run_job(). The share mutex must be held on entry and is held
// again on return (it is released around run_job()).
// Returns HA_ADMIN_OK or HA_ADMIN_FAILED.
int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());

    assert_always(thd != NULL);
    assert_debug(_mutex.is_owned_by_me() == true);

    int result = HA_ADMIN_OK;

    // stub out analyze if optimize is remapped to alter recreate + analyze
    // when not auto analyze or if this is an alter
    if ((txn &&
         thd_sql_command(thd) != SQLCOM_ANALYZE &&
         thd_sql_command(thd) != SQLCOM_ALTER_TABLE) ||
        thd_sql_command(thd) == SQLCOM_ALTER_TABLE) {
        TOKUDB_HANDLER_DBUG_RETURN(result);
    }

    // user-scheduled iff a session transaction was supplied
    tokudb::analyze::standard_t* job
        = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd,
                                          this, txn);
    assert_always(job != NULL);

    // akin to calling addref, but we know, right here, right now, everything
    // in the share is set up, files open, etc...
    // job->destroy will drop the ref
    _use_count++;

    // don't want any autos kicking off while we are analyzing
    disallow_auto_analysis();

    unlock();

    bool ret =
        tokudb::background::_job_manager->run_job(
            job,
            tokudb::sysvars::analyze_in_background(thd));

    if (!ret) {
        // the job was never accepted/run: destroy it ourselves and fail
        job->destroy();
        delete job;
        result = HA_ADMIN_FAILED;
    }

    // re-acquire the share lock: callers expect it held on return
    lock();

    TOKUDB_HANDLER_DBUG_RETURN(result);
}
866
867
// State shared with hot_optimize_progress_fun() across HOT callbacks.
typedef struct hot_optimize_context {
    THD* thd;
    char* write_status_msg; // handler's status buffer, shown via proc_info
    ha_tokudb* ha;
    uint progress_stage; // last stage reported to the thd progress API
    uint current_table; // index (DB) currently being optimized
    uint num_tables; // total number of DBs being optimized
    float progress_limit; // stop once progress reaches this fraction
    uint64_t progress_last_time; // microseconds; used for throttling
    uint64_t throttle; // max progress callbacks per second; 0 = none
} *HOT_OPTIMIZE_CONTEXT;
879
hot_optimize_progress_fun(void * extra,float progress)880 static int hot_optimize_progress_fun(void *extra, float progress) {
881 HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
882 if (thd_killed(context->thd)) {
883 sprintf(
884 context->write_status_msg,
885 "The process has been killed, aborting hot optimize.");
886 return ER_ABORTING_CONNECTION;
887 }
888 float percentage = progress * 100;
889 sprintf(
890 context->write_status_msg,
891 "Optimization of index %u of %u about %.lf%% done",
892 context->current_table + 1,
893 context->num_tables,
894 percentage);
895 thd_proc_info(context->thd, context->write_status_msg);
896 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
897 if (context->progress_stage < context->current_table) {
898 // the progress stage is behind the current table, so move up
899 // to the next stage and set the progress stage to current.
900 thd_progress_next_stage(context->thd);
901 context->progress_stage = context->current_table;
902 }
903 // the percentage we report here is for the current stage/db
904 thd_progress_report(context->thd, (unsigned long long) percentage, 100);
905 #endif
906
907 // throttle the optimize table
908 if (context->throttle) {
909 uint64_t time_now = toku_current_time_microsec();
910 uint64_t dt = time_now - context->progress_last_time;
911 uint64_t throttle_time = 1000000ULL / context->throttle;
912 if (throttle_time > dt) {
913 usleep(throttle_time - dt);
914 }
915 context->progress_last_time = toku_current_time_microsec();
916 }
917
918 // return 1 if progress has reach the progress limit
919 return progress >= context->progress_limit;
920 }
921
922 // flatten all DB's in this table, to do so, peform hot optimize on each db
// flatten all DB's in this table, to do so, perform hot optimize on each db
// Honors the tokudb_optimize_index_name, tokudb_optimize_index_fraction
// and tokudb_optimize_throttle session variables. Returns 0 on success or
// the first engine error encountered.
int ha_tokudb::do_optimize(THD* thd) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    int error = 0;
    const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
    // one DB per index, plus one when a hidden primary key is in use
    uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);

#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    // each DB is its own stage. as HOT goes through each db, we'll
    // move on to the next stage.
    thd_progress_init(thd, curr_num_DBs);
#endif

    // for each DB, run optimize and hot_optimize
    for (uint i = 0; i < curr_num_DBs; i++) {
        // only optimize the index if it matches the optimize_index_name
        // session variable
        const char* optimize_index_name =
            tokudb::sysvars::optimize_index_name(thd);
        if (optimize_index_name) {
            const char* this_index_name =
                i >= table_share->keys ?
                    "primary" :
                    table_share->key_info[i].name;
            if (strcasecmp(optimize_index_name, this_index_name) != 0) {
                continue;
            }
        }

        DB* db = share->key_file[i];
        assert_always(db != NULL);
        error = db->optimize(db);
        if (error) {
            goto cleanup;
        }

        // run HOT on the dictionary, reporting progress and honoring the
        // throttle/fraction settings via hot_optimize_progress_fun
        struct hot_optimize_context hc;
        memset(&hc, 0, sizeof hc);
        hc.thd = thd;
        hc.write_status_msg = this->write_status_msg;
        hc.ha = this;
        hc.current_table = i;
        hc.num_tables = curr_num_DBs;
        hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd);
        hc.progress_last_time = toku_current_time_microsec();
        hc.throttle = tokudb::sysvars::optimize_throttle(thd);
        uint64_t loops_run;
        error =
            db->hot_optimize(
                db,
                NULL,
                NULL,
                hot_optimize_progress_fun,
                &hc,
                &loops_run);
        if (error) {
            goto cleanup;
        }
    }
    error = 0;

cleanup:
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    thd_progress_end(thd);
#endif
    // restore whatever the session was reporting before we started
    thd_proc_info(thd, orig_proc_info);
    TOKUDB_HANDLER_DBUG_RETURN(error);
}
990
// OPTIMIZE TABLE entry point. When TOKU_OPTIMIZE_WITH_RECREATE is set,
// return HA_ADMIN_TRY_ALTER so the server remaps OPTIMIZE to an ALTER
// (recreate + analyze); otherwise run the in-place hot optimize.
int ha_tokudb::optimize(TOKUDB_UNUSED(THD* thd),
                        TOKUDB_UNUSED(HA_CHECK_OPT* check_opt)) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    int error;
#if TOKU_OPTIMIZE_WITH_RECREATE
    error = HA_ADMIN_TRY_ALTER;
#else
    error = do_optimize(thd);
#endif
    TOKUDB_HANDLER_DBUG_RETURN(error);
}
1002
// State passed to ha_tokudb_check_progress(): just the session, so the
// verify callback can detect a killed client.
struct check_context {
    THD* thd;
};
1006
ha_tokudb_check_progress(void * extra,TOKUDB_UNUSED (float progress))1007 static int ha_tokudb_check_progress(void* extra,
1008 TOKUDB_UNUSED(float progress)) {
1009 struct check_context* context = (struct check_context*)extra;
1010 int result = 0;
1011 if (thd_killed(context->thd))
1012 result = ER_ABORTING_CONNECTION;
1013 return result;
1014 }
1015
ha_tokudb_check_info(THD * thd,TABLE * table,const char * msg)1016 static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) {
1017 assert(thd->active_vio);
1018
1019 char tablename[
1020 table->s->db.length + 1 +
1021 table->s->table_name.length + 1];
1022 snprintf(
1023 tablename,
1024 sizeof(tablename),
1025 "%.*s.%.*s",
1026 (int)table->s->db.length,
1027 table->s->db.str,
1028 (int)table->s->table_name.length,
1029 table->s->table_name.str);
1030 thd->get_protocol()->start_row();
1031 thd->get_protocol()->store(tablename, strlen(tablename),
1032 system_charset_info);
1033 thd->get_protocol()->store("check", 5, system_charset_info);
1034 thd->get_protocol()->store("info", 4, system_charset_info);
1035 thd->get_protocol()->store(msg, strlen(msg), system_charset_info);
1036 thd->get_protocol()->end_row();
1037 }
1038
// CHECK TABLE entry point: verifies every dictionary (primary plus
// secondary indexes) with the engine's verify_with_progress. T_QUICK
// stops at the first corruption, T_EXTEND verifies everything; returns
// HA_ADMIN_OK, HA_ADMIN_CORRUPT or HA_ADMIN_INTERNAL_ERROR.
int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
    int result = HA_ADMIN_OK;
    int r;

    // keep_going also controls how deep verify_with_progress digs
    int keep_going = 1;
    if (check_opt->flags & T_QUICK) {
        keep_going = 0;
    }
    if (check_opt->flags & T_EXTEND) {
        keep_going = 1;
    }

    // take a write table lock so the dictionaries are stable while verifying
    r = acquire_table_lock(transaction, lock_write);
    if (r != 0)
        result = HA_ADMIN_INTERNAL_ERROR;
    if (result == HA_ADMIN_OK) {
        uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key);
        snprintf(
            write_status_msg,
            sizeof(write_status_msg),
            "%s primary=%d num=%d",
            share->table_name(),
            primary_key,
            num_DBs);
        if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
            // with check tracing on, echo status to the client and the log
            ha_tokudb_check_info(thd, table, write_status_msg);
            time_t now = time(0);
            char timebuf[32];
            TOKUDB_HANDLER_TRACE(
                "%.24s %s",
                ctime_r(&now, timebuf),
                write_status_msg);
        }
        // verify each dictionary in turn
        for (uint i = 0; i < num_DBs; i++) {
            DB* db = share->key_file[i];
            assert_always(db != NULL);
            const char* kname =
                i == primary_key ? "primary" : table_share->key_info[i].name;
            snprintf(
                write_status_msg,
                sizeof(write_status_msg),
                "%s key=%s %u",
                share->table_name(),
                kname,
                i);
            thd_proc_info(thd, write_status_msg);
            if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
                ha_tokudb_check_info(thd, table, write_status_msg);
                time_t now = time(0);
                char timebuf[32];
                TOKUDB_HANDLER_TRACE(
                    "%.24s %s",
                    ctime_r(&now, timebuf),
                    write_status_msg);
            }
            struct check_context check_context = { thd };
            r = db->verify_with_progress(
                db,
                ha_tokudb_check_progress,
                &check_context,
                (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0,
                keep_going);
            if (r != 0) {
                // report the corrupted index back to the client
                char msg[32 + strlen(kname)];
                sprintf(msg, "Corrupt %s", kname);
                ha_tokudb_check_info(thd, table, msg);
            }
            snprintf(
                write_status_msg,
                sizeof(write_status_msg),
                "%s key=%s %u result=%d",
                share->full_table_name(),
                kname,
                i,
                r);
            thd_proc_info(thd, write_status_msg);
            if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
                ha_tokudb_check_info(thd, table, write_status_msg);
                time_t now = time(0);
                char timebuf[32];
                TOKUDB_HANDLER_TRACE(
                    "%.24s %s",
                    ctime_r(&now, timebuf),
                    write_status_msg);
            }
            // remember the first corruption; T_QUICK stops immediately
            if (result == HA_ADMIN_OK && r != 0) {
                result = HA_ADMIN_CORRUPT;
                if (!keep_going)
                    break;
            }
        }
    }
    // restore whatever the session was reporting before we started
    thd_proc_info(thd, orig_proc_info);
    TOKUDB_HANDLER_DBUG_RETURN(result);
}
1136