1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     TokuDB is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 #include "tokudb_sysvars.h"
27 #include "toku_time.h"
28 
29 #include "my_check_opt.h"
30 
31 namespace tokudb {
32 namespace analyze {
33 
34 class recount_rows_t : public tokudb::background::job_manager_t::job_t {
35 public:
36     void* operator new(size_t sz);
37     void operator delete(void* p);
38 
39     recount_rows_t(
40         bool user_schedued,
41         THD* thd,
42         TOKUDB_SHARE* share,
43         DB_TXN* txn);
44 
45     virtual ~recount_rows_t();
46 
47     virtual const char* key();
48     virtual const char* database();
49     virtual const char* table();
50     virtual const char* type();
51     virtual const char* parameters();
52     virtual const char* status();
53 
54 protected:
55     virtual void on_run();
56 
57     virtual void on_destroy();
58 
59 private:
60     // to be provided by the initiator of recount rows
61     THD*            _thd;
62     TOKUDB_SHARE*   _share;
63     DB_TXN*         _txn;
64     ulonglong       _throttle;
65 
66     // for recount rows status reporting
67     char            _parameters[256];
68     char            _status[1024];
69     int             _result;
70     ulonglong       _recount_start; // in microseconds
71     ulonglong       _total_elapsed_time; // in microseconds
72 
73     bool            _local_txn;
74     ulonglong       _rows;
75     ulonglong       _deleted_rows;
76     ulonglong       _ticks;
77 
78     static int analyze_recount_rows_progress(
79         uint64_t count,
80         uint64_t deleted,
81         void* extra);
82     int analyze_recount_rows_progress(uint64_t count, uint64_t deleted);
83 };
84 
operator new(size_t sz)85 void* recount_rows_t::operator new(size_t sz) {
86     return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
87 }
operator delete(void * p)88 void recount_rows_t::operator delete(void* p) {
89     tokudb::memory::free(p);
90 }
recount_rows_t(bool user_scheduled,THD * thd,TOKUDB_SHARE * share,DB_TXN * txn)91 recount_rows_t::recount_rows_t(
92     bool user_scheduled,
93     THD* thd,
94     TOKUDB_SHARE* share,
95     DB_TXN* txn) :
96     tokudb::background::job_manager_t::job_t(user_scheduled),
97     _share(share),
98     _result(HA_ADMIN_OK),
99     _recount_start(0),
100     _total_elapsed_time(0),
101     _local_txn(false),
102     _rows(0),
103     _deleted_rows(0),
104     _ticks(0) {
105 
106     assert_debug(thd != NULL);
107     assert_debug(share != NULL);
108 
109     if (tokudb::sysvars::analyze_in_background(thd)) {
110         _thd = NULL;
111         _txn = NULL;
112     } else {
113         _thd = thd;
114         _txn = txn;
115     }
116 
117     _throttle = tokudb::sysvars::analyze_throttle(thd);
118 
119     snprintf(_parameters,
120              sizeof(_parameters),
121              "TOKUDB_ANALYZE_THROTTLE=%llu;",
122              _throttle);
123     _status[0] = '\0';
124 }
~recount_rows_t()125 recount_rows_t::~recount_rows_t() {
126 }
on_run()127 void recount_rows_t::on_run() {
128     const char* orig_proc_info = NULL;
129     if (_thd)
130         orig_proc_info = tokudb_thd_get_proc_info(_thd);
131     _recount_start = tokudb::time::microsec();
132     _total_elapsed_time = 0;
133 
134     if (_txn == NULL) {
135         _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);
136 
137         if (_result != 0) {
138             _txn = NULL;
139             _result = HA_ADMIN_FAILED;
140             goto error;
141         }
142         _local_txn = true;
143     } else {
144         _local_txn = false;
145     }
146 
147     _result =
148         _share->file->recount_rows(
149             _share->file,
150             analyze_recount_rows_progress,
151             this);
152 
153     if (_result != 0) {
154         if (_local_txn) {
155             _txn->abort(_txn);
156             _txn = NULL;
157         }
158         _result = HA_ADMIN_FAILED;
159         goto error;
160     }
161 
162     DB_BTREE_STAT64 dict_stats;
163     _result = _share->file->stat64(_share->file, _txn, &dict_stats);
164     if (_result == 0) {
165         _share->set_row_count(dict_stats.bt_ndata, false);
166     }
167     if (_result != 0)
168         _result = HA_ADMIN_FAILED;
169 
170     if (_local_txn) {
171         if (_result == HA_ADMIN_OK) {
172             _txn->commit(_txn, 0);
173         } else {
174             _txn->abort(_txn);
175         }
176         _txn = NULL;
177     }
178 
179     sql_print_information(
180         "tokudb analyze recount rows %d counted %lld",
181         _result,
182         _share->row_count());
183 error:
184     if(_thd)
185         tokudb_thd_set_proc_info(_thd, orig_proc_info);
186     return;
187 }
on_destroy()188 void recount_rows_t::on_destroy() {
189     _share->release();
190 }
key()191 const char* recount_rows_t::key() {
192     return _share->full_table_name();
193 }
database()194 const char* recount_rows_t::database() {
195     return _share->database_name();
196 }
table()197 const char* recount_rows_t::table() {
198     return _share->table_name();
199 }
type()200 const char* recount_rows_t::type() {
201     static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS";
202     return type;
203 }
parameters()204 const char* recount_rows_t::parameters() {
205     return _parameters;
206 }
status()207 const char* recount_rows_t::status() {
208     return _status;
209 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted,void * extra)210 int recount_rows_t::analyze_recount_rows_progress(
211     uint64_t count,
212     uint64_t deleted,
213     void* extra) {
214 
215     recount_rows_t* context = (recount_rows_t*)extra;
216     return context->analyze_recount_rows_progress(count, deleted);
217 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted)218 int recount_rows_t::analyze_recount_rows_progress(
219     uint64_t count,
220     uint64_t deleted) {
221 
222     _rows = count;
223     _deleted_rows += deleted;
224     deleted > 0 ? _ticks += deleted : _ticks++;
225 
226     if (_ticks > 1000) {
227         _ticks = 0;
228         uint64_t now = tokudb::time::microsec();
229         _total_elapsed_time = now - _recount_start;
230         if ((_thd && thd_killed(_thd)) || cancelled()) {
231             // client killed
232             return ER_ABORTING_CONNECTION;
233         }
234 
235         // rebuild status
236         // There is a slight race condition here,
237         // _status is used here for tokudb_thd_set_proc_info and it is also used
238         // for the status column in i_s.background_job_status.
239         // If someone happens to be querying/building the i_s table
240         // at the exact same time that the status is being rebuilt here,
241         // the i_s table could get some garbage status.
242         // This solution is a little heavy handed but it works, it prevents us
243         // from changing the status while someone might be immediately observing
244         // us and it prevents someone from observing us while we change the
245         // status
246         tokudb::background::_job_manager->lock();
247         snprintf(_status,
248                  sizeof(_status),
249                  "recount_rows %s.%s counted %llu rows and %llu deleted "
250                  "in %llu seconds.",
251                  _share->database_name(),
252                  _share->table_name(),
253                  _rows,
254                  _deleted_rows,
255                  _total_elapsed_time / tokudb::time::MICROSECONDS);
256         tokudb::background::_job_manager->unlock();
257 
258         // report
259         if (_thd)
260             tokudb_thd_set_proc_info(_thd, _status);
261 
262         // throttle
263         // given the throttle value, lets calculate the maximum number of rows
264         // we should have seen so far in a .1 sec resolution
265         if (_throttle > 0) {
266             uint64_t estimated_rows = _total_elapsed_time / 100000;
267             estimated_rows = estimated_rows * (_throttle / 10);
268             if (_rows + _deleted_rows > estimated_rows) {
269                 // sleep for 1/10 of a second
270                 tokudb::time::sleep_microsec(100000);
271             }
272         }
273     }
274     return 0;
275 }
276 
277 class standard_t : public tokudb::background::job_manager_t::job_t {
278 public:
279     void* operator new(size_t sz);
280     void operator delete(void* p);
281 
282     standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn);
283 
284     virtual ~standard_t();
285 
286     virtual const char* key(void);
287     virtual const char* database();
288     virtual const char* table();
289     virtual const char* type();
290     virtual const char* parameters();
291     virtual const char* status();
292 
293 protected:
294     virtual void on_run();
295 
296     virtual void on_destroy();
297 
298 private:
299     // to be provided by initiator of analyze
300     THD*            _thd;
301     TOKUDB_SHARE*   _share;
302     DB_TXN*         _txn;
303     ulonglong       _throttle;      // in microseconds
304     ulonglong       _time_limit;    // in microseconds
305     double          _delete_fraction;
306 
307     // for analyze status reporting, may also use other state
308     char            _parameters[256];
309     char            _status[1024];
310     int             _result;
311     ulonglong       _analyze_start; // in microseconds
312     ulonglong       _total_elapsed_time; // in microseconds
313 
314     // for analyze internal use, pretty much these are per-key/index
315     ulonglong       _current_key;
316     bool            _local_txn;
317     ulonglong       _half_time;
318     ulonglong       _half_rows;
319     ulonglong       _rows;
320     ulonglong       _deleted_rows;
321     ulonglong       _ticks;
322     ulonglong       _analyze_key_start; // in microseconds
323     ulonglong       _key_elapsed_time; // in microseconds
324     uint            _scan_direction;
325 
326     static bool analyze_standard_cursor_callback(
327         void* extra,
328         uint64_t deleted_rows);
329     bool analyze_standard_cursor_callback(uint64_t deleted_rows);
330 
331     int analyze_key_progress();
332     int analyze_key(uint64_t* rec_per_key_part);
333 };
334 
operator new(size_t sz)335 void* standard_t::operator new(size_t sz) {
336     return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
337 }
operator delete(void * p)338 void standard_t::operator delete(void* p) {
339     tokudb::memory::free(p);
340 }
standard_t(bool user_scheduled,THD * thd,TOKUDB_SHARE * share,DB_TXN * txn)341 standard_t::standard_t(
342     bool user_scheduled,
343     THD* thd,
344     TOKUDB_SHARE* share,
345     DB_TXN* txn) :
346     tokudb::background::job_manager_t::job_t(user_scheduled),
347     _share(share),
348     _result(HA_ADMIN_OK),
349     _analyze_start(0),
350     _total_elapsed_time(0),
351     _current_key(0),
352     _local_txn(false),
353     _half_time(0),
354     _half_rows(0),
355     _rows(0),
356     _deleted_rows(0),
357     _ticks(0),
358     _analyze_key_start(0),
359     _key_elapsed_time(0),
360     _scan_direction(0) {
361 
362     assert_debug(thd != NULL);
363     assert_debug(share != NULL);
364 
365     if (tokudb::sysvars::analyze_in_background(thd)) {
366        _thd = NULL;
367        _txn = NULL;
368     } else {
369        _thd = thd;
370        _txn = txn;
371     }
372     _throttle = tokudb::sysvars::analyze_throttle(thd);
373     _time_limit =
374         tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS;
375     _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd);
376 
377     snprintf(_parameters,
378              sizeof(_parameters),
379              "TOKUDB_ANALYZE_DELETE_FRACTION=%f; "
380              "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;",
381              _delete_fraction,
382              _time_limit / tokudb::time::MICROSECONDS,
383              _throttle);
384 
385     _status[0] = '\0';
386 }
~standard_t()387 standard_t::~standard_t() {
388 }
on_run()389 void standard_t::on_run() {
390     DB_BTREE_STAT64 stat64;
391     uint64_t rec_per_key_part[_share->_max_key_parts];
392     uint64_t total_key_parts = 0;
393     const char* orig_proc_info = NULL;
394     if (_thd)
395         orig_proc_info = tokudb_thd_get_proc_info(_thd);
396 
397     _analyze_start = tokudb::time::microsec();
398     _half_time = _time_limit > 0 ? _time_limit/2 : 0;
399 
400     if (_txn == NULL) {
401         _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);
402 
403         if (_result != 0) {
404             _txn = NULL;
405             _result = HA_ADMIN_FAILED;
406             goto error;
407         }
408         _local_txn = true;
409     } else {
410         _local_txn = false;
411     }
412 
413     assert_always(_share->key_file[0] != NULL);
414     _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64);
415     if (_result != 0) {
416         _result = HA_ADMIN_FAILED;
417         goto cleanup;
418     }
419     _half_rows = stat64.bt_ndata / 2;
420 
421     for (ulonglong current_key = 0;
422          _result == HA_ADMIN_OK && current_key < _share->_keys;
423          current_key++) {
424 
425         _current_key = current_key;
426         _rows = _deleted_rows = _ticks = 0;
427         _result = analyze_key(&rec_per_key_part[total_key_parts]);
428 
429         if ((_result != 0 && _result != ETIME) ||
430             (_result != 0 && _rows == 0 && _deleted_rows > 0)) {
431             _result = HA_ADMIN_FAILED;
432         }
433         if (_thd && (_result == HA_ADMIN_FAILED ||
434             static_cast<double>(_deleted_rows) >
435                 _delete_fraction * (_rows + _deleted_rows))) {
436 
437             char name[256]; int namelen;
438             namelen =
439                 snprintf(
440                     name,
441                     sizeof(name),
442                     "%s.%s.%s",
443                     _share->database_name(),
444                     _share->table_name(),
445                     _share->_key_descriptors[_current_key]._name);
446             _thd->get_protocol()->start_row();
447             _thd->get_protocol()->store(name, namelen,  system_charset_info);
448             _thd->get_protocol()->store("analyze", 7, system_charset_info);
449             _thd->get_protocol()->store("info", 4, system_charset_info);
450             char rowmsg[256];
451             int rowmsglen;
452             rowmsglen =
453                 snprintf(
454                     rowmsg,
455                     sizeof(rowmsg),
456                     "rows processed %llu rows deleted %llu",
457                     _rows,
458                     _deleted_rows);
459             _thd->get_protocol()->store(rowmsg, rowmsglen,
460                                         system_charset_info);
461             _thd->get_protocol()->end_row();
462 
463             sql_print_information(
464                 "tokudb analyze on %.*s %.*s",
465                 namelen,
466                 name,
467                 rowmsglen,
468                 rowmsg);
469         }
470 
471         total_key_parts += _share->_key_descriptors[_current_key]._parts;
472     }
473     if (_result == HA_ADMIN_OK) {
474         int error =
475             tokudb::set_card_in_status(
476                 _share->status_block,
477                 _txn,
478                 total_key_parts,
479                 rec_per_key_part);
480         if (error)
481             _result = HA_ADMIN_FAILED;
482 
483         _share->lock();
484         _share->update_cardinality_counts(total_key_parts, rec_per_key_part);
485         _share->allow_auto_analysis(true);
486         _share->unlock();
487     }
488 
489 cleanup:
490     if (_local_txn) {
491         if (_result == HA_ADMIN_OK) {
492             _txn->commit(_txn, 0);
493         } else {
494             _txn->abort(_txn);
495         }
496         _txn = NULL;
497     }
498 
499 error:
500     if (_thd)
501         tokudb_thd_set_proc_info(_thd, orig_proc_info);
502     return;
503 }
on_destroy()504 void standard_t::on_destroy() {
505     _share->lock();
506     _share->allow_auto_analysis(false);
507     _share->unlock();
508     _share->release();
509 }
key()510 const char* standard_t::key() {
511     return _share->full_table_name();
512 }
database()513 const char* standard_t::database() {
514     return _share->database_name();
515 }
table()516 const char* standard_t::table() {
517     return _share->table_name();
518 }
type()519 const char* standard_t::type() {
520     static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD";
521     return type;
522 }
parameters()523 const char* standard_t::parameters() {
524     return _parameters;
525 }
status()526 const char* standard_t::status() {
527     return _status;
528 }
analyze_standard_cursor_callback(void * extra,uint64_t deleted_rows)529 bool standard_t::analyze_standard_cursor_callback(
530     void* extra,
531     uint64_t deleted_rows) {
532     standard_t* context = (standard_t*)extra;
533     return context->analyze_standard_cursor_callback(deleted_rows);
534 }
analyze_standard_cursor_callback(uint64_t deleted_rows)535 bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) {
536     _deleted_rows += deleted_rows;
537     _ticks += deleted_rows;
538     return analyze_key_progress() != 0;
539 }
analyze_key_progress(void)540 int standard_t::analyze_key_progress(void) {
541     if (_ticks > 1000) {
542         _ticks = 0;
543         uint64_t now = tokudb::time::microsec();
544         _total_elapsed_time = now - _analyze_start;
545         _key_elapsed_time = now - _analyze_key_start;
546         if ((_thd && thd_killed(_thd)) || cancelled()) {
547             // client killed
548             return ER_ABORTING_CONNECTION;
549         } else if (_time_limit > 0 &&
550                    static_cast<uint64_t>(_key_elapsed_time) > _time_limit) {
551             // time limit reached
552             return ETIME;
553         }
554 
555         // rebuild status
556         // There is a slight race condition here,
557         // _status is used here for tokudb_thd_set_proc_info and it is also used
558         // for the status column in i_s.background_job_status.
559         // If someone happens to be querying/building the i_s table
560         // at the exact same time that the status is being rebuilt here,
561         // the i_s table could get some garbage status.
562         // This solution is a little heavy handed but it works, it prevents us
563         // from changing the status while someone might be immediately observing
564         // us and it prevents someone from observing us while we change the
565         // status.
566         static const char* scan_direction_str[] = {"not scanning",
567                                                    "scanning forward",
568                                                    "scanning backward",
569                                                    "scan unknown"};
570 
571         const char* scan_direction = NULL;
572         switch (_scan_direction) {
573             case 0:
574                 scan_direction = scan_direction_str[0];
575                 break;
576             case DB_NEXT:
577                 scan_direction = scan_direction_str[1];
578                 break;
579             case DB_PREV:
580                 scan_direction = scan_direction_str[2];
581                 break;
582             default:
583                 scan_direction = scan_direction_str[3];
584                 break;
585         }
586 
587         float progress_rows = 0.0;
588         if (_share->row_count() > 0)
589             progress_rows = static_cast<float>(_rows) /
590                             static_cast<float>(_share->row_count());
591         float progress_time = 0.0;
592         if (_time_limit > 0)
593             progress_time = static_cast<float>(_key_elapsed_time) /
594                             static_cast<float>(_time_limit);
595         tokudb::background::_job_manager->lock();
596         snprintf(
597             _status,
598             sizeof(_status),
599             "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% "
600             "time, %s",
601             _share->database_name(),
602             _share->table_name(),
603             _share->_key_descriptors[_current_key]._name,
604             _current_key,
605             _share->_keys,
606             progress_rows * 100.0,
607             progress_time * 100.0,
608             scan_direction);
609         tokudb::background::_job_manager->unlock();
610 
611         // report
612         if (_thd)
613             tokudb_thd_set_proc_info(_thd, _status);
614 
615         // throttle
616         // given the throttle value, lets calculate the maximum number of rows
617         // we should have seen so far in a .1 sec resolution
618         if (_throttle > 0) {
619             uint64_t estimated_rows = _key_elapsed_time / 100000;
620             estimated_rows = estimated_rows * (_throttle / 10);
621             if (_rows + _deleted_rows > estimated_rows) {
622                 // sleep for 1/10 of a second
623                 tokudb::time::sleep_microsec(100000);
624             }
625         }
626     }
627     return 0;
628 }
analyze_key(uint64_t * rec_per_key_part)629 int standard_t::analyze_key(uint64_t* rec_per_key_part) {
630     int error = 0;
631     DB* db = _share->key_file[_current_key];
632     assert_always(db != NULL);
633     uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts;
634     uint64_t unique_rows[num_key_parts];
635     bool is_unique = _share->_key_descriptors[_current_key]._is_unique;
636     DBC* cursor = NULL;
637     int close_error = 0;
638     DBT key, prev_key;
639     bool copy_key = false;
640 
641     _analyze_key_start = tokudb::time::microsec();
642     _key_elapsed_time = 0;
643     _scan_direction = DB_NEXT;
644 
645     if (is_unique && num_key_parts == 1) {
646         // don't compute for unique keys with a single part. we already know
647         // the answer.
648         _rows = unique_rows[0] = 1;
649         goto done;
650     }
651 
652     for (uint64_t i = 0; i < num_key_parts; i++)
653         unique_rows[i] = 1;
654 
655     // stop looking when the entire dictionary was analyzed, or a
656     // cap on execution time was reached, or the analyze was killed.
657     while (1) {
658         if (cursor == NULL) {
659             error = db->cursor(db, _txn, &cursor, 0);
660             if (error != 0)
661                 goto done;
662 
663             cursor->c_set_check_interrupt_callback(
664                 cursor,
665                 analyze_standard_cursor_callback,
666                 this);
667 
668             memset(&key, 0, sizeof(DBT));
669             memset(&prev_key, 0, sizeof(DBT));
670             copy_key = true;
671         }
672 
673         error = cursor->c_get(cursor, &key, 0, _scan_direction);
674         if (error != 0) {
675             if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED)
676                 error = 0; // not an error
677             break;
678         } else if (cancelled()) {
679             error = ER_ABORTING_CONNECTION;
680             break;
681         }
682 
683         _rows++;
684         _ticks++;
685 
686         // if copy_key is false at this pont, we have some value sitting in
687         // prev_key that we can compare to
688         // if the comparison reveals a unique key, we must set copy_key to true
689         // so the code following can copy he current key into prev_key for the
690         // next iteration
691         if (copy_key == false) {
692             // compare this key with the previous key. ignore
693             // appended PK for SK's.
694             // TODO if a prefix is different, then all larger keys
695             // that include the prefix are also different.
696             // TODO if we are comparing the entire primary key or
697             // the entire unique secondary key, then the cardinality
698             // must be 1, so we can avoid computing it.
699             for (uint64_t i = 0; i < num_key_parts; i++) {
700                 int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
701                 if (cmp != 0) {
702                     unique_rows[i]++;
703                     copy_key = true;
704                 }
705             }
706         }
707 
708         // prev_key = key or prev_key is NULL
709         if (copy_key) {
710             prev_key.data =
711                 tokudb::memory::realloc(
712                     prev_key.data,
713                     key.size,
714                     MYF(MY_WME|MY_ZEROFILL|MY_FAE));
715             assert_always(prev_key.data);
716             prev_key.size = key.size;
717             memcpy(prev_key.data, key.data, prev_key.size);
718             copy_key = false;
719         }
720 
721         error = analyze_key_progress();
722         if (error == ETIME) {
723             error = 0;
724             break;
725         } else if (error) {
726             break;
727         }
728 
729         // if we have a time limit, are scanning forward and have exceed the
730         // _half_time and not passed the _half_rows number of the rows in the
731         // index: clean up the keys, close the cursor and reverse direction.
732         if (TOKUDB_UNLIKELY(_half_time > 0 &&
733             _scan_direction == DB_NEXT &&
734             _key_elapsed_time >= _half_time &&
735             _rows < _half_rows)) {
736 
737             tokudb::memory::free(prev_key.data); prev_key.data = NULL;
738             close_error = cursor->c_close(cursor);
739             assert_always(close_error == 0);
740             cursor = NULL;
741             _scan_direction = DB_PREV;
742         }
743     }
744     // cleanup
745     if (prev_key.data) tokudb::memory::free(prev_key.data);
746     if (cursor) close_error = cursor->c_close(cursor);
747     assert_always(close_error == 0);
748 
749 done:
750     // in case we timed out (bunch of deleted records) without hitting a
751     // single row
752     if (_rows == 0)
753         _rows = 1;
754 
755     // return cardinality
756     for (uint64_t i = 0; i < num_key_parts; i++) {
757         rec_per_key_part[i] = _rows / unique_rows[i];
758     }
759     return error;
760 }
761 
762 } // namespace analyze
763 } // namespace tokudb
764 
765 
analyze(THD * thd,TOKUDB_UNUSED (HA_CHECK_OPT * check_opt))766 int ha_tokudb::analyze(THD *thd, TOKUDB_UNUSED(HA_CHECK_OPT *check_opt)) {
767     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
768     int result = HA_ADMIN_OK;
769     tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd);
770 
771     switch (mode) {
772     case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS:
773         result = share->analyze_recount_rows(thd, transaction);
774         break;
775     case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD:
776         share->lock();
777         result = share->analyze_standard(thd, transaction);
778         share->unlock();
779         break;
780     case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL:
781         share->cancel_background_jobs();
782         break;
783     default:
784         break; // no-op
785     }
786     TOKUDB_HANDLER_DBUG_RETURN(result);
787 }
788 
analyze_recount_rows(THD * thd,DB_TXN * txn)789 int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) {
790     TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());
791 
792     assert_always(thd != NULL);
793 
794     int result = HA_ADMIN_OK;
795 
796     tokudb::analyze::recount_rows_t* job
797         = new tokudb::analyze::recount_rows_t(true, thd, this, txn);
798     assert_always(job != NULL);
799 
800     // job->destroy will drop the ref
801     addref();
802     unlock();
803 
804     bool ret = tokudb::background::_job_manager->
805         run_job(job, tokudb::sysvars::analyze_in_background(thd));
806 
807     if (!ret) {
808         job->destroy();
809         delete job;
810         result = HA_ADMIN_FAILED;
811     }
812 
813     TOKUDB_HANDLER_DBUG_RETURN(result);
814 }
815 
816 // on entry, if txn is !NULL, it is a user session invoking ANALYZE directly
817 // and no lock will be held on 'this', else if txn is NULL it is an auto and
818 // 'this' will be locked.
analyze_standard(THD * thd,DB_TXN * txn)819 int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) {
820     TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());
821 
822     assert_always(thd != NULL);
823     assert_debug(_mutex.is_owned_by_me() == true);
824 
825     int result = HA_ADMIN_OK;
826 
827     // stub out analyze if optimize is remapped to alter recreate + analyze
828     // when not auto analyze or if this is an alter
829     if ((txn &&
830          thd_sql_command(thd) != SQLCOM_ANALYZE &&
831          thd_sql_command(thd) != SQLCOM_ALTER_TABLE) ||
832         thd_sql_command(thd) == SQLCOM_ALTER_TABLE) {
833         TOKUDB_HANDLER_DBUG_RETURN(result);
834     }
835 
836     tokudb::analyze::standard_t* job
837         = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd,
838                                           this, txn);
839     assert_always(job != NULL);
840 
841     // akin to calling addref, but we know, right here, right now, everything
842     // in the share is set up, files open, etc...
843     // job->destroy will drop the ref
844     _use_count++;
845 
846     // don't want any autos kicking off while we are analyzing
847     disallow_auto_analysis();
848 
849     unlock();
850 
851     bool ret =
852         tokudb::background::_job_manager->run_job(
853             job,
854             tokudb::sysvars::analyze_in_background(thd));
855 
856     if (!ret) {
857         job->destroy();
858         delete job;
859         result = HA_ADMIN_FAILED;
860     }
861 
862     lock();
863 
864     TOKUDB_HANDLER_DBUG_RETURN(result);
865 }
866 
867 
868 typedef struct hot_optimize_context {
869     THD* thd;
870     char* write_status_msg;
871     ha_tokudb* ha;
872     uint progress_stage;
873     uint current_table;
874     uint num_tables;
875     float progress_limit;
876     uint64_t progress_last_time;
877     uint64_t throttle;
878 } *HOT_OPTIMIZE_CONTEXT;
879 
hot_optimize_progress_fun(void * extra,float progress)880 static int hot_optimize_progress_fun(void *extra, float progress) {
881     HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
882     if (thd_killed(context->thd)) {
883         sprintf(
884             context->write_status_msg,
885             "The process has been killed, aborting hot optimize.");
886         return ER_ABORTING_CONNECTION;
887     }
888     float percentage = progress * 100;
889     sprintf(
890         context->write_status_msg,
891         "Optimization of index %u of %u about %.lf%% done",
892         context->current_table + 1,
893         context->num_tables,
894         percentage);
895     thd_proc_info(context->thd, context->write_status_msg);
896 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
897     if (context->progress_stage < context->current_table) {
898         // the progress stage is behind the current table, so move up
899         // to the next stage and set the progress stage to current.
900         thd_progress_next_stage(context->thd);
901         context->progress_stage = context->current_table;
902     }
903     // the percentage we report here is for the current stage/db
904     thd_progress_report(context->thd, (unsigned long long) percentage, 100);
905 #endif
906 
907     // throttle the optimize table
908     if (context->throttle) {
909         uint64_t time_now = toku_current_time_microsec();
910         uint64_t dt = time_now - context->progress_last_time;
911         uint64_t throttle_time = 1000000ULL / context->throttle;
912         if (throttle_time > dt) {
913             usleep(throttle_time - dt);
914         }
915         context->progress_last_time = toku_current_time_microsec();
916     }
917 
918     // return 1 if progress has reach the progress limit
919     return progress >= context->progress_limit;
920 }
921 
922 // flatten all DB's in this table, to do so, peform hot optimize on each db
do_optimize(THD * thd)923 int ha_tokudb::do_optimize(THD* thd) {
924     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
925     int error = 0;
926     const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
927     uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
928 
929 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
930     // each DB is its own stage. as HOT goes through each db, we'll
931     // move on to the next stage.
932     thd_progress_init(thd, curr_num_DBs);
933 #endif
934 
935     // for each DB, run optimize and hot_optimize
936     for (uint i = 0; i < curr_num_DBs; i++) {
937         // only optimize the index if it matches the optimize_index_name
938         // session variable
939         const char* optimize_index_name =
940             tokudb::sysvars::optimize_index_name(thd);
941         if (optimize_index_name) {
942             const char* this_index_name =
943                 i >= table_share->keys ?
944                     "primary" :
945                     table_share->key_info[i].name;
946             if (strcasecmp(optimize_index_name, this_index_name) != 0) {
947                 continue;
948             }
949         }
950 
951         DB* db = share->key_file[i];
952         assert_always(db != NULL);
953         error = db->optimize(db);
954         if (error) {
955             goto cleanup;
956         }
957 
958         struct hot_optimize_context hc;
959         memset(&hc, 0, sizeof hc);
960         hc.thd = thd;
961         hc.write_status_msg = this->write_status_msg;
962         hc.ha = this;
963         hc.current_table = i;
964         hc.num_tables = curr_num_DBs;
965         hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd);
966         hc.progress_last_time = toku_current_time_microsec();
967         hc.throttle = tokudb::sysvars::optimize_throttle(thd);
968         uint64_t loops_run;
969         error =
970             db->hot_optimize(
971                 db,
972                 NULL,
973                 NULL,
974                 hot_optimize_progress_fun,
975                 &hc,
976                 &loops_run);
977         if (error) {
978             goto cleanup;
979         }
980     }
981     error = 0;
982 
983 cleanup:
984 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
985     thd_progress_end(thd);
986 #endif
987     thd_proc_info(thd, orig_proc_info);
988     TOKUDB_HANDLER_DBUG_RETURN(error);
989 }
990 
optimize(TOKUDB_UNUSED (THD * thd),TOKUDB_UNUSED (HA_CHECK_OPT * check_opt))991 int ha_tokudb::optimize(TOKUDB_UNUSED(THD* thd),
992                         TOKUDB_UNUSED(HA_CHECK_OPT* check_opt)) {
993     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
994     int error;
995 #if TOKU_OPTIMIZE_WITH_RECREATE
996     error = HA_ADMIN_TRY_ALTER;
997 #else
998     error = do_optimize(thd);
999 #endif
1000     TOKUDB_HANDLER_DBUG_RETURN(error);
1001 }
1002 
// State handed to ha_tokudb_check_progress through its opaque `extra`
// argument; ha_tokudb::check builds one per verified DB.
struct check_context {
    // session that the progress callback polls with thd_killed()
    THD* thd;
};
1006 
ha_tokudb_check_progress(void * extra,TOKUDB_UNUSED (float progress))1007 static int ha_tokudb_check_progress(void* extra,
1008                                     TOKUDB_UNUSED(float progress)) {
1009     struct check_context* context = (struct check_context*)extra;
1010     int result = 0;
1011     if (thd_killed(context->thd))
1012         result = ER_ABORTING_CONNECTION;
1013     return result;
1014 }
1015 
ha_tokudb_check_info(THD * thd,TABLE * table,const char * msg)1016 static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) {
1017     assert(thd->active_vio);
1018 
1019     char tablename[
1020         table->s->db.length + 1 +
1021         table->s->table_name.length + 1];
1022     snprintf(
1023         tablename,
1024         sizeof(tablename),
1025         "%.*s.%.*s",
1026         (int)table->s->db.length,
1027         table->s->db.str,
1028         (int)table->s->table_name.length,
1029         table->s->table_name.str);
1030     thd->get_protocol()->start_row();
1031     thd->get_protocol()->store(tablename, strlen(tablename),
1032                                system_charset_info);
1033     thd->get_protocol()->store("check", 5, system_charset_info);
1034     thd->get_protocol()->store("info", 4, system_charset_info);
1035     thd->get_protocol()->store(msg, strlen(msg), system_charset_info);
1036     thd->get_protocol()->end_row();
1037 }
1038 
check(THD * thd,HA_CHECK_OPT * check_opt)1039 int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) {
1040     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
1041     const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
1042     int result = HA_ADMIN_OK;
1043     int r;
1044 
1045     int keep_going = 1;
1046     if (check_opt->flags & T_QUICK) {
1047         keep_going = 0;
1048     }
1049     if (check_opt->flags & T_EXTEND) {
1050         keep_going = 1;
1051     }
1052 
1053     r = acquire_table_lock(transaction, lock_write);
1054     if (r != 0)
1055         result = HA_ADMIN_INTERNAL_ERROR;
1056     if (result == HA_ADMIN_OK) {
1057         uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key);
1058         snprintf(
1059             write_status_msg,
1060             sizeof(write_status_msg),
1061             "%s primary=%d num=%d",
1062             share->table_name(),
1063             primary_key,
1064             num_DBs);
1065         if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1066             ha_tokudb_check_info(thd, table, write_status_msg);
1067             time_t now = time(0);
1068             char timebuf[32];
1069             TOKUDB_HANDLER_TRACE(
1070                 "%.24s %s",
1071                 ctime_r(&now, timebuf),
1072                 write_status_msg);
1073         }
1074         for (uint i = 0; i < num_DBs; i++) {
1075             DB* db = share->key_file[i];
1076             assert_always(db != NULL);
1077             const char* kname =
1078                 i == primary_key ? "primary" : table_share->key_info[i].name;
1079             snprintf(
1080                 write_status_msg,
1081                 sizeof(write_status_msg),
1082                 "%s key=%s %u",
1083                 share->table_name(),
1084                 kname,
1085                 i);
1086             thd_proc_info(thd, write_status_msg);
1087             if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1088                 ha_tokudb_check_info(thd, table, write_status_msg);
1089                 time_t now = time(0);
1090                 char timebuf[32];
1091                 TOKUDB_HANDLER_TRACE(
1092                     "%.24s %s",
1093                     ctime_r(&now, timebuf),
1094                     write_status_msg);
1095             }
1096             struct check_context check_context = { thd };
1097             r = db->verify_with_progress(
1098                 db,
1099                 ha_tokudb_check_progress,
1100                 &check_context,
1101                 (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0,
1102                 keep_going);
1103             if (r != 0) {
1104                 char msg[32 + strlen(kname)];
1105                 sprintf(msg, "Corrupt %s", kname);
1106                 ha_tokudb_check_info(thd, table, msg);
1107             }
1108             snprintf(
1109                 write_status_msg,
1110                 sizeof(write_status_msg),
1111                 "%s key=%s %u result=%d",
1112                 share->full_table_name(),
1113                 kname,
1114                 i,
1115                 r);
1116             thd_proc_info(thd, write_status_msg);
1117             if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1118                 ha_tokudb_check_info(thd, table, write_status_msg);
1119                 time_t now = time(0);
1120                 char timebuf[32];
1121                 TOKUDB_HANDLER_TRACE(
1122                     "%.24s %s",
1123                     ctime_r(&now, timebuf),
1124                     write_status_msg);
1125             }
1126             if (result == HA_ADMIN_OK && r != 0) {
1127                 result = HA_ADMIN_CORRUPT;
1128                 if (!keep_going)
1129                     break;
1130             }
1131         }
1132     }
1133     thd_proc_info(thd, orig_proc_info);
1134     TOKUDB_HANDLER_DBUG_RETURN(result);
1135 }
1136