1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     TokuDB is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 #include "tokudb_sysvars.h"
27 #include "toku_time.h"
28 
29 namespace tokudb {
30 namespace analyze {
31 
32 class recount_rows_t : public tokudb::background::job_manager_t::job_t {
33 public:
34     void* operator new(size_t sz);
35     void operator delete(void* p);
36 
37     recount_rows_t(
38         bool user_schedued,
39         THD* thd,
40         TOKUDB_SHARE* share,
41         DB_TXN* txn);
42 
43     virtual ~recount_rows_t();
44 
45     virtual const char* key();
46     virtual const char* database();
47     virtual const char* table();
48     virtual const char* type();
49     virtual const char* parameters();
50     virtual const char* status();
51 
52 protected:
53     virtual void on_run();
54 
55     virtual void on_destroy();
56 
57 private:
58     // to be provided by the initiator of recount rows
59     THD*            _thd;
60     TOKUDB_SHARE*   _share;
61     DB_TXN*         _txn;
62     ulonglong       _throttle;
63 
64     // for recount rows status reporting
65     char            _parameters[256];
66     char            _status[1024];
67     int             _result;
68     ulonglong       _recount_start; // in microseconds
69     ulonglong       _total_elapsed_time; // in microseconds
70 
71     bool            _local_txn;
72     ulonglong       _rows;
73     ulonglong       _deleted_rows;
74     ulonglong       _ticks;
75 
76     static int analyze_recount_rows_progress(
77         uint64_t count,
78         uint64_t deleted,
79         void* extra);
80     int analyze_recount_rows_progress(uint64_t count, uint64_t deleted);
81 };
82 
operator new(size_t sz)83 void* recount_rows_t::operator new(size_t sz) {
84     return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
85 }
operator delete(void * p)86 void recount_rows_t::operator delete(void* p) {
87     tokudb::memory::free(p);
88 }
recount_rows_t(bool user_scheduled,THD * thd,TOKUDB_SHARE * share,DB_TXN * txn)89 recount_rows_t::recount_rows_t(
90     bool user_scheduled,
91     THD* thd,
92     TOKUDB_SHARE* share,
93     DB_TXN* txn) :
94     tokudb::background::job_manager_t::job_t(user_scheduled),
95     _share(share),
96     _result(HA_ADMIN_OK),
97     _recount_start(0),
98     _total_elapsed_time(0),
99     _local_txn(false),
100     _rows(0),
101     _deleted_rows(0),
102     _ticks(0) {
103 
104     assert_debug(thd != NULL);
105     assert_debug(share != NULL);
106 
107     if (tokudb::sysvars::analyze_in_background(thd)) {
108         _thd = NULL;
109         _txn = NULL;
110     } else {
111         _thd = thd;
112         _txn = txn;
113     }
114 
115     _throttle = tokudb::sysvars::analyze_throttle(thd);
116 
117     snprintf(_parameters,
118              sizeof(_parameters),
119              "TOKUDB_ANALYZE_THROTTLE=%llu;",
120              _throttle);
121     _status[0] = '\0';
122 }
~recount_rows_t()123 recount_rows_t::~recount_rows_t() {
124 }
on_run()125 void recount_rows_t::on_run() {
126     const char* orig_proc_info = NULL;
127     if (_thd)
128         orig_proc_info = tokudb_thd_get_proc_info(_thd);
129     _recount_start = tokudb::time::microsec();
130     _total_elapsed_time = 0;
131 
132     if (_txn == NULL) {
133         _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);
134 
135         if (_result != 0) {
136             _txn = NULL;
137             _result = HA_ADMIN_FAILED;
138             goto error;
139         }
140         _local_txn = true;
141     } else {
142         _local_txn = false;
143     }
144 
145     _result =
146         _share->file->recount_rows(
147             _share->file,
148             analyze_recount_rows_progress,
149             this);
150 
151     if (_result != 0) {
152         if (_local_txn) {
153             _txn->abort(_txn);
154             _txn = NULL;
155         }
156         _result = HA_ADMIN_FAILED;
157         goto error;
158     }
159 
160     DB_BTREE_STAT64 dict_stats;
161     _result = _share->file->stat64(_share->file, _txn, &dict_stats);
162     if (_result == 0) {
163         _share->set_row_count(dict_stats.bt_ndata, false);
164     }
165     if (_result != 0)
166         _result = HA_ADMIN_FAILED;
167 
168     if (_local_txn) {
169         if (_result == HA_ADMIN_OK) {
170             _txn->commit(_txn, 0);
171         } else {
172             _txn->abort(_txn);
173         }
174         _txn = NULL;
175     }
176 
177     sql_print_information(
178         "tokudb analyze recount rows %d counted %lld",
179         _result,
180         _share->row_count());
181 error:
182     if(_thd)
183         tokudb_thd_set_proc_info(_thd, orig_proc_info);
184     return;
185 }
on_destroy()186 void recount_rows_t::on_destroy() {
187     _share->release();
188 }
key()189 const char* recount_rows_t::key() {
190     return _share->full_table_name();
191 }
database()192 const char* recount_rows_t::database() {
193     return _share->database_name();
194 }
table()195 const char* recount_rows_t::table() {
196     return _share->table_name();
197 }
type()198 const char* recount_rows_t::type() {
199     static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS";
200     return type;
201 }
parameters()202 const char* recount_rows_t::parameters() {
203     return _parameters;
204 }
status()205 const char* recount_rows_t::status() {
206     return _status;
207 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted,void * extra)208 int recount_rows_t::analyze_recount_rows_progress(
209     uint64_t count,
210     uint64_t deleted,
211     void* extra) {
212 
213     recount_rows_t* context = (recount_rows_t*)extra;
214     return context->analyze_recount_rows_progress(count, deleted);
215 }
analyze_recount_rows_progress(uint64_t count,uint64_t deleted)216 int recount_rows_t::analyze_recount_rows_progress(
217     uint64_t count,
218     uint64_t deleted) {
219 
220     _rows = count;
221     _deleted_rows += deleted;
222     deleted > 0 ? _ticks += deleted : _ticks++;
223 
224     if (_ticks > 1000) {
225         _ticks = 0;
226         uint64_t now = tokudb::time::microsec();
227         _total_elapsed_time = now - _recount_start;
228         if ((_thd && thd_kill_level(_thd)) || cancelled()) {
229             // client killed
230             return ER_ABORTING_CONNECTION;
231         }
232 
233         // rebuild status
234         // There is a slight race condition here,
235         // _status is used here for tokudb_thd_set_proc_info and it is also used
236         // for the status column in i_s.background_job_status.
237         // If someone happens to be querying/building the i_s table
238         // at the exact same time that the status is being rebuilt here,
239         // the i_s table could get some garbage status.
240         // This solution is a little heavy handed but it works, it prevents us
241         // from changing the status while someone might be immediately observing
242         // us and it prevents someone from observing us while we change the
243         // status
244         tokudb::background::_job_manager->lock();
245         snprintf(_status,
246                  sizeof(_status),
247                  "recount_rows %s.%s counted %llu rows and %llu deleted "
248                  "in %llu seconds.",
249                  _share->database_name(),
250                  _share->table_name(),
251                  _rows,
252                  _deleted_rows,
253                  _total_elapsed_time / tokudb::time::MICROSECONDS);
254         tokudb::background::_job_manager->unlock();
255 
256         // report
257         if (_thd)
258             tokudb_thd_set_proc_info(_thd, _status);
259 
260         // throttle
261         // given the throttle value, lets calculate the maximum number of rows
262         // we should have seen so far in a .1 sec resolution
263         if (_throttle > 0) {
264             uint64_t estimated_rows = _total_elapsed_time / 100000;
265             estimated_rows = estimated_rows * (_throttle / 10);
266             if (_rows + _deleted_rows > estimated_rows) {
267                 // sleep for 1/10 of a second
268                 tokudb::time::sleep_microsec(100000);
269             }
270         }
271     }
272     return 0;
273 }
274 
275 class standard_t : public tokudb::background::job_manager_t::job_t {
276 public:
277     void* operator new(size_t sz);
278     void operator delete(void* p);
279 
280     standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn);
281 
282     virtual ~standard_t();
283 
284     virtual const char* key(void);
285     virtual const char* database();
286     virtual const char* table();
287     virtual const char* type();
288     virtual const char* parameters();
289     virtual const char* status();
290 
291 protected:
292     virtual void on_run();
293 
294     virtual void on_destroy();
295 
296 private:
297     // to be provided by initiator of analyze
298     THD*            _thd;
299     TOKUDB_SHARE*   _share;
300     DB_TXN*         _txn;
301     ulonglong       _throttle;      // in microseconds
302     ulonglong       _time_limit;    // in microseconds
303     double          _delete_fraction;
304 
305     // for analyze status reporting, may also use other state
306     char            _parameters[256];
307     char            _status[1024];
308     int             _result;
309     ulonglong       _analyze_start; // in microseconds
310     ulonglong       _total_elapsed_time; // in microseconds
311 
312     // for analyze internal use, pretty much these are per-key/index
313     ulonglong       _current_key;
314     bool            _local_txn;
315     ulonglong       _half_time;
316     ulonglong       _half_rows;
317     ulonglong       _rows;
318     ulonglong       _deleted_rows;
319     ulonglong       _ticks;
320     ulonglong       _analyze_key_start; // in microseconds
321     ulonglong       _key_elapsed_time; // in microseconds
322     uint            _scan_direction;
323 
324     static bool analyze_standard_cursor_callback(
325         void* extra,
326         uint64_t deleted_rows);
327     bool analyze_standard_cursor_callback(uint64_t deleted_rows);
328 
329     int analyze_key_progress();
330     int analyze_key(uint64_t* rec_per_key_part);
331 };
332 
operator new(size_t sz)333 void* standard_t::operator new(size_t sz) {
334     return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
335 }
operator delete(void * p)336 void standard_t::operator delete(void* p) {
337     tokudb::memory::free(p);
338 }
standard_t(bool user_scheduled,THD * thd,TOKUDB_SHARE * share,DB_TXN * txn)339 standard_t::standard_t(
340     bool user_scheduled,
341     THD* thd,
342     TOKUDB_SHARE* share,
343     DB_TXN* txn) :
344     tokudb::background::job_manager_t::job_t(user_scheduled),
345     _share(share),
346     _result(HA_ADMIN_OK),
347     _analyze_start(0),
348     _total_elapsed_time(0),
349     _current_key(0),
350     _local_txn(false),
351     _half_time(0),
352     _half_rows(0),
353     _rows(0),
354     _deleted_rows(0),
355     _ticks(0),
356     _analyze_key_start(0),
357     _key_elapsed_time(0),
358     _scan_direction(0) {
359 
360     assert_debug(thd != NULL);
361     assert_debug(share != NULL);
362 
363     if (tokudb::sysvars::analyze_in_background(thd)) {
364        _thd = NULL;
365        _txn = NULL;
366     } else {
367        _thd = thd;
368        _txn = txn;
369     }
370     _throttle = tokudb::sysvars::analyze_throttle(thd);
371     _time_limit =
372         tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS;
373     _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd);
374 
375     snprintf(_parameters,
376              sizeof(_parameters),
377              "TOKUDB_ANALYZE_DELETE_FRACTION=%f; "
378              "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;",
379              _delete_fraction,
380              _time_limit / tokudb::time::MICROSECONDS,
381              _throttle);
382 
383     _status[0] = '\0';
384 }
~standard_t()385 standard_t::~standard_t() {
386 }
on_run()387 void standard_t::on_run() {
388     DB_BTREE_STAT64 stat64;
389     uint64_t rec_per_key_part[_share->_max_key_parts];
390     uint64_t total_key_parts = 0;
391     const char* orig_proc_info = NULL;
392     if (_thd)
393         orig_proc_info = tokudb_thd_get_proc_info(_thd);
394 
395     _analyze_start = tokudb::time::microsec();
396     _half_time = _time_limit > 0 ? _time_limit/2 : 0;
397 
398     if (_txn == NULL) {
399         _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);
400 
401         if (_result != 0) {
402             _txn = NULL;
403             _result = HA_ADMIN_FAILED;
404             goto error;
405         }
406         _local_txn = true;
407     } else {
408         _local_txn = false;
409     }
410 
411     assert_always(_share->key_file[0] != NULL);
412     _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64);
413     if (_result != 0) {
414         _result = HA_ADMIN_FAILED;
415         goto cleanup;
416     }
417     _half_rows = stat64.bt_ndata / 2;
418 
419     for (ulonglong current_key = 0;
420          _result == HA_ADMIN_OK && current_key < _share->_keys;
421          current_key++) {
422 
423         _current_key = current_key;
424         _rows = _deleted_rows = _ticks = 0;
425         _result = analyze_key(&rec_per_key_part[total_key_parts]);
426 
427         if ((_result != 0 && _result != ETIME) ||
428             (_result != 0 && _rows == 0 && _deleted_rows > 0)) {
429             _result = HA_ADMIN_FAILED;
430         }
431         if (_thd && (_result == HA_ADMIN_FAILED ||
432             static_cast<double>(_deleted_rows) >
433                 _delete_fraction * (_rows + _deleted_rows))) {
434 
435             char name[256]; int namelen;
436             namelen =
437                 snprintf(
438                     name,
439                     sizeof(name),
440                     "%s.%s.%s",
441                     _share->database_name(),
442                     _share->table_name(),
443                     _share->_key_descriptors[_current_key]._name);
444             _thd->protocol->prepare_for_resend();
445             _thd->protocol->store(name, namelen,  system_charset_info);
446             _thd->protocol->store("analyze", 7, system_charset_info);
447             _thd->protocol->store("info", 4, system_charset_info);
448             char rowmsg[256];
449             int rowmsglen;
450             rowmsglen =
451                 snprintf(
452                     rowmsg,
453                     sizeof(rowmsg),
454                     "rows processed %llu rows deleted %llu",
455                     _rows,
456                     _deleted_rows);
457             _thd->protocol->store(rowmsg, rowmsglen, system_charset_info);
458             _thd->protocol->write();
459 
460             sql_print_information(
461                 "tokudb analyze on %.*s %.*s",
462                 namelen,
463                 name,
464                 rowmsglen,
465                 rowmsg);
466         }
467 
468         total_key_parts += _share->_key_descriptors[_current_key]._parts;
469     }
470     if (_result == HA_ADMIN_OK) {
471         int error =
472             tokudb::set_card_in_status(
473                 _share->status_block,
474                 _txn,
475                 total_key_parts,
476                 rec_per_key_part);
477         if (error)
478             _result = HA_ADMIN_FAILED;
479 
480         _share->lock();
481         _share->update_cardinality_counts(total_key_parts, rec_per_key_part);
482         _share->allow_auto_analysis(true);
483         _share->unlock();
484     }
485 
486 cleanup:
487     if (_local_txn) {
488         if (_result == HA_ADMIN_OK) {
489             _txn->commit(_txn, 0);
490         } else {
491             _txn->abort(_txn);
492         }
493         _txn = NULL;
494     }
495 
496 error:
497     if (_thd)
498         tokudb_thd_set_proc_info(_thd, orig_proc_info);
499     return;
500 }
on_destroy()501 void standard_t::on_destroy() {
502     _share->lock();
503     _share->allow_auto_analysis(false);
504     _share->unlock();
505     _share->release();
506 }
key()507 const char* standard_t::key() {
508     return _share->full_table_name();
509 }
database()510 const char* standard_t::database() {
511     return _share->database_name();
512 }
table()513 const char* standard_t::table() {
514     return _share->table_name();
515 }
type()516 const char* standard_t::type() {
517     static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD";
518     return type;
519 }
parameters()520 const char* standard_t::parameters() {
521     return _parameters;
522 }
status()523 const char* standard_t::status() {
524     return _status;
525 }
analyze_standard_cursor_callback(void * extra,uint64_t deleted_rows)526 bool standard_t::analyze_standard_cursor_callback(
527     void* extra,
528     uint64_t deleted_rows) {
529     standard_t* context = (standard_t*)extra;
530     return context->analyze_standard_cursor_callback(deleted_rows);
531 }
analyze_standard_cursor_callback(uint64_t deleted_rows)532 bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) {
533     _deleted_rows += deleted_rows;
534     _ticks += deleted_rows;
535     return analyze_key_progress() != 0;
536 }
analyze_key_progress(void)537 int standard_t::analyze_key_progress(void) {
538     if (_ticks > 1000) {
539         _ticks = 0;
540         uint64_t now = tokudb::time::microsec();
541         _total_elapsed_time = now - _analyze_start;
542         _key_elapsed_time = now - _analyze_key_start;
543         if ((_thd && thd_kill_level(_thd)) || cancelled()) {
544             // client killed
545             return ER_ABORTING_CONNECTION;
546         } else if (_time_limit > 0 &&
547                    static_cast<uint64_t>(_key_elapsed_time) > _time_limit) {
548             // time limit reached
549             return ETIME;
550         }
551 
552         // rebuild status
553         // There is a slight race condition here,
554         // _status is used here for tokudb_thd_set_proc_info and it is also used
555         // for the status column in i_s.background_job_status.
556         // If someone happens to be querying/building the i_s table
557         // at the exact same time that the status is being rebuilt here,
558         // the i_s table could get some garbage status.
559         // This solution is a little heavy handed but it works, it prevents us
560         // from changing the status while someone might be immediately observing
561         // us and it prevents someone from observing us while we change the
562         // status.
563         static const char* scan_direction_str[] = {"not scanning",
564                                                    "scanning forward",
565                                                    "scanning backward",
566                                                    "scan unknown"};
567 
568         const char* scan_direction = NULL;
569         switch (_scan_direction) {
570             case 0:
571                 scan_direction = scan_direction_str[0];
572                 break;
573             case DB_NEXT:
574                 scan_direction = scan_direction_str[1];
575                 break;
576             case DB_PREV:
577                 scan_direction = scan_direction_str[2];
578                 break;
579             default:
580                 scan_direction = scan_direction_str[3];
581                 break;
582         }
583 
584         float progress_rows = 0.0;
585         if (_share->row_count() > 0)
586             progress_rows = static_cast<float>(_rows) /
587                             static_cast<float>(_share->row_count());
588         float progress_time = 0.0;
589         if (_time_limit > 0)
590             progress_time = static_cast<float>(_key_elapsed_time) /
591                             static_cast<float>(_time_limit);
592         tokudb::background::_job_manager->lock();
593         snprintf(
594             _status,
595             sizeof(_status),
596             "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% "
597             "time, %s",
598             _share->database_name(),
599             _share->table_name(),
600             _share->_key_descriptors[_current_key]._name,
601             _current_key,
602             _share->_keys,
603             progress_rows * 100.0,
604             progress_time * 100.0,
605             scan_direction);
606         tokudb::background::_job_manager->unlock();
607 
608         // report
609         if (_thd)
610             tokudb_thd_set_proc_info(_thd, _status);
611 
612         // throttle
613         // given the throttle value, lets calculate the maximum number of rows
614         // we should have seen so far in a .1 sec resolution
615         if (_throttle > 0) {
616             uint64_t estimated_rows = _key_elapsed_time / 100000;
617             estimated_rows = estimated_rows * (_throttle / 10);
618             if (_rows + _deleted_rows > estimated_rows) {
619                 // sleep for 1/10 of a second
620                 tokudb::time::sleep_microsec(100000);
621             }
622         }
623     }
624     return 0;
625 }
analyze_key(uint64_t * rec_per_key_part)626 int standard_t::analyze_key(uint64_t* rec_per_key_part) {
627     int error = 0;
628     DB* db = _share->key_file[_current_key];
629     assert_always(db != NULL);
630     uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts;
631     uint64_t unique_rows[num_key_parts];
632     bool is_unique = _share->_key_descriptors[_current_key]._is_unique;
633     DBC* cursor = NULL;
634     int close_error = 0;
635     DBT key, prev_key;
636     bool copy_key = false;
637 
638     _analyze_key_start = tokudb::time::microsec();
639     _key_elapsed_time = 0;
640     _scan_direction = DB_NEXT;
641 
642     if (is_unique && num_key_parts == 1) {
643         // don't compute for unique keys with a single part. we already know
644         // the answer.
645         _rows = unique_rows[0] = 1;
646         goto done;
647     }
648 
649     for (uint64_t i = 0; i < num_key_parts; i++)
650         unique_rows[i] = 1;
651 
652     // stop looking when the entire dictionary was analyzed, or a
653     // cap on execution time was reached, or the analyze was killed.
654     while (1) {
655         if (cursor == NULL) {
656             error = db->cursor(db, _txn, &cursor, 0);
657             if (error != 0)
658                 goto done;
659 
660             cursor->c_set_check_interrupt_callback(
661                 cursor,
662                 analyze_standard_cursor_callback,
663                 this);
664 
665             memset(&key, 0, sizeof(DBT));
666             memset(&prev_key, 0, sizeof(DBT));
667             copy_key = true;
668         }
669 
670         error = cursor->c_get(cursor, &key, 0, _scan_direction);
671         if (error != 0) {
672             if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED)
673                 error = 0; // not an error
674             break;
675         } else if (cancelled()) {
676             error = ER_ABORTING_CONNECTION;
677             break;
678         }
679 
680         _rows++;
681         _ticks++;
682 
683         // if copy_key is false at this pont, we have some value sitting in
684         // prev_key that we can compare to
685         // if the comparison reveals a unique key, we must set copy_key to true
686         // so the code following can copy he current key into prev_key for the
687         // next iteration
688         if (copy_key == false) {
689             // compare this key with the previous key. ignore
690             // appended PK for SK's.
691             // TODO if a prefix is different, then all larger keys
692             // that include the prefix are also different.
693             // TODO if we are comparing the entire primary key or
694             // the entire unique secondary key, then the cardinality
695             // must be 1, so we can avoid computing it.
696             for (uint64_t i = 0; i < num_key_parts; i++) {
697                 int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
698                 if (cmp != 0) {
699                     unique_rows[i]++;
700                     copy_key = true;
701                 }
702             }
703         }
704 
705         // prev_key = key or prev_key is NULL
706         if (copy_key) {
707             prev_key.data =
708                 tokudb::memory::realloc(
709                     prev_key.data,
710                     key.size,
711                     MYF(MY_WME|MY_ZEROFILL|MY_FAE));
712             assert_always(prev_key.data);
713             prev_key.size = key.size;
714             memcpy(prev_key.data, key.data, prev_key.size);
715             copy_key = false;
716         }
717 
718         error = analyze_key_progress();
719         if (error == ETIME) {
720             error = 0;
721             break;
722         } else if (error) {
723             break;
724         }
725 
726         // if we have a time limit, are scanning forward and have exceed the
727         // _half_time and not passed the _half_rows number of the rows in the
728         // index: clean up the keys, close the cursor and reverse direction.
729         if (TOKUDB_UNLIKELY(_half_time > 0 &&
730             _scan_direction == DB_NEXT &&
731             _key_elapsed_time >= _half_time &&
732             _rows < _half_rows)) {
733 
734             tokudb::memory::free(prev_key.data); prev_key.data = NULL;
735             close_error = cursor->c_close(cursor);
736             assert_always(close_error == 0);
737             cursor = NULL;
738             _scan_direction = DB_PREV;
739         }
740     }
741     // cleanup
742     if (prev_key.data) tokudb::memory::free(prev_key.data);
743     if (cursor) close_error = cursor->c_close(cursor);
744     assert_always(close_error == 0);
745 
746 done:
747     // in case we timed out (bunch of deleted records) without hitting a
748     // single row
749     if (_rows == 0)
750         _rows = 1;
751 
752     // return cardinality
753     for (uint64_t i = 0; i < num_key_parts; i++) {
754         rec_per_key_part[i] = _rows / unique_rows[i];
755     }
756     return error;
757 }
758 
759 } // namespace analyze
760 } // namespace tokudb
761 
762 
analyze(THD * thd,TOKUDB_UNUSED (HA_CHECK_OPT * check_opt))763 int ha_tokudb::analyze(THD *thd, TOKUDB_UNUSED(HA_CHECK_OPT *check_opt)) {
764     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
765     int result = HA_ADMIN_OK;
766     tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd);
767 
768     switch (mode) {
769     case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS:
770         result = share->analyze_recount_rows(thd, transaction);
771         break;
772     case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD:
773         share->lock();
774         result = share->analyze_standard(thd, transaction);
775         share->unlock();
776         break;
777     case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL:
778         share->cancel_background_jobs();
779         break;
780     default:
781         break; // no-op
782     }
783     TOKUDB_HANDLER_DBUG_RETURN(result);
784 }
785 
analyze_recount_rows(THD * thd,DB_TXN * txn)786 int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) {
787     TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());
788 
789     assert_always(thd != NULL);
790 
791     int result = HA_ADMIN_OK;
792 
793     tokudb::analyze::recount_rows_t* job
794         = new tokudb::analyze::recount_rows_t(true, thd, this, txn);
795     assert_always(job != NULL);
796 
797     // job->destroy will drop the ref
798     addref();
799     unlock();
800 
801     bool ret = tokudb::background::_job_manager->
802         run_job(job, tokudb::sysvars::analyze_in_background(thd));
803 
804     if (!ret) {
805         job->destroy();
806         delete job;
807         result = HA_ADMIN_FAILED;
808     }
809 
810     TOKUDB_HANDLER_DBUG_RETURN(result);
811 }
812 
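// on entry, 'this' must already be locked by the caller (see the ownership
// assert below). if txn is !NULL, it is a user session invoking ANALYZE
// directly, else if txn is NULL it is an auto analysis. the lock is dropped
// while the job is handed to the job manager and re-acquired before return.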
analyze_standard(THD * thd,DB_TXN * txn)816 int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) {
817     TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());
818 
819     assert_always(thd != NULL);
820     assert_debug(_mutex.is_owned_by_me() == true);
821 
822     int result = HA_ADMIN_OK;
823 
824     // stub out analyze if optimize is remapped to alter recreate + analyze
825     // when not auto analyze or if this is an alter
826     if ((txn &&
827          thd_sql_command(thd) != SQLCOM_ANALYZE &&
828          thd_sql_command(thd) != SQLCOM_ALTER_TABLE) ||
829         thd_sql_command(thd) == SQLCOM_ALTER_TABLE) {
830         TOKUDB_HANDLER_DBUG_RETURN(result);
831     }
832 
833     tokudb::analyze::standard_t* job
834         = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd,
835                                           this, txn);
836     assert_always(job != NULL);
837 
838     // akin to calling addref, but we know, right here, right now, everything
839     // in the share is set up, files open, etc...
840     // job->destroy will drop the ref
841     _use_count++;
842 
843     // don't want any autos kicking off while we are analyzing
844     disallow_auto_analysis();
845 
846     unlock();
847 
848     bool ret =
849         tokudb::background::_job_manager->run_job(
850             job,
851             tokudb::sysvars::analyze_in_background(thd));
852 
853     if (!ret) {
854         job->destroy();
855         delete job;
856         result = HA_ADMIN_FAILED;
857     }
858 
859     lock();
860 
861     TOKUDB_HANDLER_DBUG_RETURN(result);
862 }
863 
864 
865 typedef struct hot_optimize_context {
866     THD* thd;
867     char* write_status_msg;
868     ha_tokudb* ha;
869     uint progress_stage;
870     uint current_table;
871     uint num_tables;
872     float progress_limit;
873     uint64_t progress_last_time;
874     uint64_t throttle;
875 } *HOT_OPTIMIZE_CONTEXT;
876 
hot_optimize_progress_fun(void * extra,float progress)877 static int hot_optimize_progress_fun(void *extra, float progress) {
878     HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
879     if (thd_kill_level(context->thd)) {
880         sprintf(
881             context->write_status_msg,
882             "The process has been killed, aborting hot optimize.");
883         return ER_ABORTING_CONNECTION;
884     }
885     float percentage = progress * 100;
886     sprintf(
887         context->write_status_msg,
888         "Optimization of index %u of %u about %.lf%% done",
889         context->current_table + 1,
890         context->num_tables,
891         percentage);
892     thd_proc_info(context->thd, context->write_status_msg);
893 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
894     if (context->progress_stage < context->current_table) {
895         // the progress stage is behind the current table, so move up
896         // to the next stage and set the progress stage to current.
897         thd_progress_next_stage(context->thd);
898         context->progress_stage = context->current_table;
899     }
900     // the percentage we report here is for the current stage/db
901     thd_progress_report(context->thd, (unsigned long long) percentage, 100);
902 #endif
903 
904     // throttle the optimize table
905     if (context->throttle) {
906         uint64_t time_now = toku_current_time_microsec();
907         uint64_t dt = time_now - context->progress_last_time;
908         uint64_t throttle_time = 1000000ULL / context->throttle;
909         if (throttle_time > dt) {
910             usleep(throttle_time - dt);
911         }
912         context->progress_last_time = toku_current_time_microsec();
913     }
914 
915     // return 1 if progress has reach the progress limit
916     return progress >= context->progress_limit;
917 }
918 
919 // flatten all DB's in this table, to do so, peform hot optimize on each db
do_optimize(THD * thd)920 int ha_tokudb::do_optimize(THD* thd) {
921     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
922     int error = 0;
923     const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
924     uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
925 
926 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
927     // each DB is its own stage. as HOT goes through each db, we'll
928     // move on to the next stage.
929     thd_progress_init(thd, curr_num_DBs);
930 #endif
931 
932     // for each DB, run optimize and hot_optimize
933     for (uint i = 0; i < curr_num_DBs; i++) {
934         // only optimize the index if it matches the optimize_index_name
935         // session variable
936         const char* optimize_index_name =
937             tokudb::sysvars::optimize_index_name(thd);
938         if (optimize_index_name) {
939             const char* this_index_name =
940                 i >= table_share->keys ?
941                     "primary" :
942                     table_share->key_info[i].name.str;
943             if (strcasecmp(optimize_index_name, this_index_name) != 0) {
944                 continue;
945             }
946         }
947 
948         DB* db = share->key_file[i];
949         assert_always(db != NULL);
950         error = db->optimize(db);
951         if (error) {
952             goto cleanup;
953         }
954 
955         struct hot_optimize_context hc;
956         memset(&hc, 0, sizeof hc);
957         hc.thd = thd;
958         hc.write_status_msg = this->write_status_msg;
959         hc.ha = this;
960         hc.current_table = i;
961         hc.num_tables = curr_num_DBs;
962         hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd);
963         hc.progress_last_time = toku_current_time_microsec();
964         hc.throttle = tokudb::sysvars::optimize_throttle(thd);
965         uint64_t loops_run;
966         error =
967             db->hot_optimize(
968                 db,
969                 NULL,
970                 NULL,
971                 hot_optimize_progress_fun,
972                 &hc,
973                 &loops_run);
974         if (error) {
975             goto cleanup;
976         }
977     }
978     error = 0;
979 
980 cleanup:
981 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
982     thd_progress_end(thd);
983 #endif
984     thd_proc_info(thd, orig_proc_info);
985     TOKUDB_HANDLER_DBUG_RETURN(error);
986 }
987 
optimize(TOKUDB_UNUSED (THD * thd),TOKUDB_UNUSED (HA_CHECK_OPT * check_opt))988 int ha_tokudb::optimize(TOKUDB_UNUSED(THD* thd),
989                         TOKUDB_UNUSED(HA_CHECK_OPT* check_opt)) {
990     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
991     int error;
992 #if TOKU_OPTIMIZE_WITH_RECREATE
993     error = HA_ADMIN_TRY_ALTER;
994 #else
995     error = do_optimize(thd);
996 #endif
997     TOKUDB_HANDLER_DBUG_RETURN(error);
998 }
999 
1000 struct check_context {
1001     THD* thd;
1002 };
1003 
ha_tokudb_check_progress(void * extra,TOKUDB_UNUSED (float progress))1004 static int ha_tokudb_check_progress(void* extra,
1005                                     TOKUDB_UNUSED(float progress)) {
1006     struct check_context* context = (struct check_context*)extra;
1007     int result = 0;
1008     if (thd_kill_level(context->thd))
1009         result = ER_ABORTING_CONNECTION;
1010     return result;
1011 }
1012 
ha_tokudb_check_info(THD * thd,TABLE * table,const char * msg)1013 static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) {
1014     if (thd->vio_ok()) {
1015         char tablename[
1016             table->s->db.length + 1 +
1017             table->s->table_name.length + 1];
1018         snprintf(
1019             tablename,
1020             sizeof(tablename),
1021             "%.*s.%.*s",
1022             (int)table->s->db.length,
1023             table->s->db.str,
1024             (int)table->s->table_name.length,
1025             table->s->table_name.str);
1026         thd->protocol->prepare_for_resend();
1027         thd->protocol->store(tablename, strlen(tablename), system_charset_info);
1028         thd->protocol->store("check", 5, system_charset_info);
1029         thd->protocol->store("info", 4, system_charset_info);
1030         thd->protocol->store(msg, strlen(msg), system_charset_info);
1031         thd->protocol->write();
1032     }
1033 }
1034 
check(THD * thd,HA_CHECK_OPT * check_opt)1035 int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) {
1036     TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
1037     const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
1038     int result = HA_ADMIN_OK;
1039     int r;
1040 
1041     int keep_going = 1;
1042     if (check_opt->flags & T_QUICK) {
1043         keep_going = 0;
1044     }
1045     if (check_opt->flags & T_EXTEND) {
1046         keep_going = 1;
1047     }
1048 
1049     r = acquire_table_lock(transaction, lock_write);
1050     if (r != 0)
1051         result = HA_ADMIN_INTERNAL_ERROR;
1052     if (result == HA_ADMIN_OK) {
1053         uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key);
1054         snprintf(
1055             write_status_msg,
1056             sizeof(write_status_msg),
1057             "%s primary=%d num=%d",
1058             share->table_name(),
1059             primary_key,
1060             num_DBs);
1061         if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1062             ha_tokudb_check_info(thd, table, write_status_msg);
1063             time_t now = time(0);
1064             char timebuf[32];
1065             TOKUDB_HANDLER_TRACE(
1066                 "%.24s %s",
1067                 ctime_r(&now, timebuf),
1068                 write_status_msg);
1069         }
1070         for (uint i = 0; i < num_DBs; i++) {
1071             DB* db = share->key_file[i];
1072             assert_always(db != NULL);
1073             const char* kname =
1074                 i == primary_key ? "primary" : table_share->key_info[i].name.str;
1075             snprintf(
1076                 write_status_msg,
1077                 sizeof(write_status_msg),
1078                 "%s key=%s %u",
1079                 share->table_name(),
1080                 kname,
1081                 i);
1082             thd_proc_info(thd, write_status_msg);
1083             if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1084                 ha_tokudb_check_info(thd, table, write_status_msg);
1085                 time_t now = time(0);
1086                 char timebuf[32];
1087                 TOKUDB_HANDLER_TRACE(
1088                     "%.24s %s",
1089                     ctime_r(&now, timebuf),
1090                     write_status_msg);
1091             }
1092             struct check_context check_context = { thd };
1093             r = db->verify_with_progress(
1094                 db,
1095                 ha_tokudb_check_progress,
1096                 &check_context,
1097                 (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0,
1098                 keep_going);
1099             if (r != 0) {
1100                 char msg[32 + strlen(kname)];
1101                 sprintf(msg, "Corrupt %s", kname);
1102                 ha_tokudb_check_info(thd, table, msg);
1103             }
1104             snprintf(
1105                 write_status_msg,
1106                 sizeof(write_status_msg),
1107                 "%s key=%s %u result=%d",
1108                 share->full_table_name(),
1109                 kname,
1110                 i,
1111                 r);
1112             thd_proc_info(thd, write_status_msg);
1113             if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
1114                 ha_tokudb_check_info(thd, table, write_status_msg);
1115                 time_t now = time(0);
1116                 char timebuf[32];
1117                 TOKUDB_HANDLER_TRACE(
1118                     "%.24s %s",
1119                     ctime_r(&now, timebuf),
1120                     write_status_msg);
1121             }
1122             if (result == HA_ADMIN_OK && r != 0) {
1123                 result = HA_ADMIN_CORRUPT;
1124                 if (!keep_going)
1125                     break;
1126             }
1127         }
1128     }
1129     thd_proc_info(thd, orig_proc_info);
1130     TOKUDB_HANDLER_DBUG_RETURN(result);
1131 }
1132