1 /*
2    Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbcluster.h"
27 #include "ha_ndb_index_stat.h"
28 #include <mysql/plugin.h>
29 #include <ctype.h>
30 
31 /* from other files */
32 extern struct st_ndb_status g_ndb_status;
33 extern native_mutex_t ndbcluster_mutex;
34 
35 // Implementation still uses its own instance
36 extern Ndb_index_stat_thread ndb_index_stat_thread;
37 
38 /* Implemented in ha_ndbcluster.cc */
39 extern bool ndb_index_stat_get_enable(THD *thd);
40 extern const char* g_ndb_status_index_stat_status;
41 extern long g_ndb_status_index_stat_cache_query;
42 extern long g_ndb_status_index_stat_cache_clean;
43 
44 // Typedefs for long names
45 typedef NdbDictionary::Table NDBTAB;
46 typedef NdbDictionary::Index NDBINDEX;
47 
48 /** ndb_index_stat_thread */
Ndb_index_stat_thread()49 Ndb_index_stat_thread::Ndb_index_stat_thread()
50   : Ndb_component("Index Stat"),
51     client_waiting(false)
52 {
53   native_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
54   native_cond_init(&COND);
55 
56   native_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
57   native_cond_init(&stat_cond);
58 }
59 
~Ndb_index_stat_thread()60 Ndb_index_stat_thread::~Ndb_index_stat_thread()
61 {
62   native_mutex_destroy(&LOCK);
63   native_cond_destroy(&COND);
64 
65   native_mutex_destroy(&stat_mutex);
66   native_cond_destroy(&stat_cond);
67 }
68 
do_wakeup()69 void Ndb_index_stat_thread::do_wakeup()
70 {
71   // Wakeup from potential wait
72   log_info("Wakeup");
73 
74   wakeup();
75 }
76 
wakeup()77 void Ndb_index_stat_thread::wakeup()
78 {
79   native_mutex_lock(&LOCK);
80   client_waiting= true;
81   native_cond_signal(&COND);
82   native_mutex_unlock(&LOCK);
83 }
84 
85 struct Ndb_index_stat {
86   enum {
87     LT_Undef= 0,
88     LT_New= 1,          /* new entry added by a table handler */
89     LT_Update = 2,      /* force kernel update from analyze table */
90     LT_Read= 3,         /* read or reread stats into new query cache */
91     LT_Idle= 4,         /* stats exist */
92     LT_Check= 5,        /* check for new stats */
93     LT_Delete= 6,       /* delete the entry */
94     LT_Error= 7,        /* error, on hold for a while */
95     LT_Count= 8
96   };
97   NdbIndexStat* is;
98   int index_id;
99   int index_version;
100 #ifndef DBUG_OFF
101   char id[32];
102 #endif
103   time_t access_time;   /* by any table handler */
104   time_t update_time;   /* latest successful update by us */
105   time_t load_time;     /* when stats were created by kernel */
106   time_t read_time;     /* when stats were read by us (>= load_time) */
107   uint sample_version;  /* goes with read_time */
108   time_t check_time;    /* when checked for updated stats (>= read_time) */
109   uint query_bytes;     /* cache query bytes in use */
110   uint clean_bytes;     /* cache clean bytes waiting to be deleted */
111   uint drop_bytes;      /* cache bytes waiting for drop */
112   uint evict_bytes;     /* cache bytes waiting for evict */
113   bool force_update;    /* one-time force update from analyze table */
114   bool no_stats;        /* have detected that no stats exist */
115   NdbIndexStat::Error error;
116   NdbIndexStat::Error client_error;
117   time_t error_time;
118   uint error_count;     /* forever increasing */
119   struct Ndb_index_stat *share_next; /* per-share list */
120   int lt;
121   int lt_old;     /* for info only */
122   struct Ndb_index_stat *list_next;
123   struct Ndb_index_stat *list_prev;
124   struct NDB_SHARE *share;
125   uint ref_count;       /* from client requests */
126   bool to_delete;       /* detached from share and marked for delete */
127   bool abort_request;   /* abort all requests and allow no more */
128   Ndb_index_stat();
129 };
130 
131 struct Ndb_index_stat_list {
132   const char *name;
133   int lt;
134   struct Ndb_index_stat *head;
135   struct Ndb_index_stat *tail;
136   uint count;
137   Ndb_index_stat_list(int the_lt, const char* the_name);
138 };
139 
140 extern Ndb_index_stat_list ndb_index_stat_list[];
141 
142 static time_t ndb_index_stat_time_now= 0;
143 
144 static time_t
ndb_index_stat_time()145 ndb_index_stat_time()
146 {
147   time_t now= time(0);
148 
149   if (unlikely(ndb_index_stat_time_now == 0))
150     ndb_index_stat_time_now= now;
151 
152   if (unlikely(now < ndb_index_stat_time_now))
153   {
154     DBUG_PRINT("index_stat", ("time moved backwards %d seconds",
155                               int(ndb_index_stat_time_now - now)));
156     now= ndb_index_stat_time_now;
157   }
158 
159   ndb_index_stat_time_now= now;
160   return now;
161 }
162 
163 /*
164   Return error on stats queries before stats thread starts
165   and after it exits.  This is only a pre-caution since mysqld
166   should not allow clients at these times.
167 */
168 static bool ndb_index_stat_allow_flag= false;
169 
170 static bool
ndb_index_stat_allow(int flag=-1)171 ndb_index_stat_allow(int flag= -1)
172 {
173   if (flag != -1)
174     ndb_index_stat_allow_flag= (bool)flag;
175   return ndb_index_stat_allow_flag;
176 }
177 
178 /* Options */
179 
180 /* Options in string format buffer size */
181 static const uint ndb_index_stat_option_sz= 512;
182 static void ndb_index_stat_opt2str(const struct Ndb_index_stat_opt&, char*);
183 
184 struct Ndb_index_stat_opt {
185   enum Unit {
186     Ubool = 1,
187     Usize = 2,
188     Utime = 3,
189     Umsec = 4
190   };
191   enum Flag {
192     Freadonly = (1 << 0),
193     Fcontrol = (1 << 1)
194   };
195   struct Val {
196     const char* name;
197     uint val;
198     uint minval;
199     uint maxval;
200     Unit unit;
201     uint flag;
202   };
203   enum Idx {
204     Iloop_enable = 0,
205     Iloop_idle = 1,
206     Iloop_busy = 2,
207     Iupdate_batch = 3,
208     Iread_batch = 4,
209     Iidle_batch = 5,
210     Icheck_batch = 6,
211     Icheck_delay = 7,
212     Idelete_batch = 8,
213     Iclean_delay = 9,
214     Ierror_batch = 10,
215     Ierror_delay = 11,
216     Ievict_batch = 12,
217     Ievict_delay = 13,
218     Icache_limit = 14,
219     Icache_lowpct = 15,
220     Izero_total = 16,
221     Imax = 17
222   };
223   Val val[Imax];
224   /* Options in string format (SYSVAR ndb_index_stat_option) */
225   char *option;
226   Ndb_index_stat_opt(char* buf);
getNdb_index_stat_opt227   uint get(Idx i) const {
228     assert(i < Imax);
229     return val[i].val;
230   }
setNdb_index_stat_opt231   void set(Idx i, uint the_val) {
232     assert(i < Imax);
233     val[i].val = the_val;
234   }
235 };
236 
Ndb_index_stat_opt(char * buf)237 Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
238   option(buf)
239 {
240 #define ival(aname, aval, aminval, amaxval, aunit, aflag) \
241   val[I##aname].name = #aname; \
242   val[I##aname].val = aval; \
243   val[I##aname].minval = aminval; \
244   val[I##aname].maxval = amaxval; \
245   val[I##aname].unit = aunit; \
246   val[I##aname].flag = aflag
247   ival(loop_enable, 1000, 0, ~(uint)0, Umsec, 0);
248   ival(loop_idle, 1000, 0, ~(uint)0, Umsec, 0);
249   ival(loop_busy, 100, 0, ~(uint)0, Umsec, 0);
250   ival(update_batch, 1, 1, ~(uint)0, Usize, 0);
251   ival(read_batch, 4, 1, ~(uint)0, Usize, 0);
252   ival(idle_batch, 32, 1, ~(uint)0, Usize, 0);
253   ival(check_batch, 8, 1, ~(uint)0, Usize, 0);
254   ival(check_delay, 600, 0, ~(uint)0, Utime, 0);
255   ival(clean_delay, 60, 0, ~(uint)0, Utime, 0);
256   ival(delete_batch, 8, 1, ~(uint)0, Usize, 0);
257   ival(error_batch, 4, 1, ~(uint)0, Usize, 0);
258   ival(error_delay, 60, 0, ~(uint)0, Utime, 0);
259   ival(evict_batch, 8, 1, ~(uint)0, Usize, 0);
260   ival(evict_delay, 60, 0, ~(uint)0, Utime, 0);
261   ival(cache_limit, 32*1024*1024, 0, ~(uint)0, Usize, 0);
262   ival(cache_lowpct, 90, 0, 100, Usize, 0);
263   ival(zero_total, 0, 0, 1, Ubool, Fcontrol);
264 #undef ival
265 
266   ndb_index_stat_opt2str(*this, option);
267 }
268 
269 /* Hard limits */
270 static const uint ndb_index_stat_max_evict_batch = 32;
271 
272 char ndb_index_stat_option_buf[ndb_index_stat_option_sz];
273 static Ndb_index_stat_opt ndb_index_stat_opt(ndb_index_stat_option_buf);
274 
275 /* Copy option struct to string buffer */
276 static void
ndb_index_stat_opt2str(const Ndb_index_stat_opt & opt,char * str)277 ndb_index_stat_opt2str(const Ndb_index_stat_opt& opt, char* str)
278 {
279   DBUG_ENTER("ndb_index_stat_opt2str");
280 
281   char buf[ndb_index_stat_option_sz];
282   char *const end= &buf[sizeof(buf)];
283   char* ptr= buf;
284   *ptr= 0;
285 
286   const uint imax= Ndb_index_stat_opt::Imax;
287   for (uint i= 0; i < imax; i++)
288   {
289     const Ndb_index_stat_opt::Val& v= opt.val[i];
290     ptr+= strlen(ptr);
291     const char* sep= (ptr == buf ? "" : ",");
292     const uint sz= ptr < end ? (uint)(end - ptr) : 0;
293 
294     switch (v.unit) {
295     case Ndb_index_stat_opt::Ubool:
296       {
297         DBUG_ASSERT(v.val == 0 || v.val == 1);
298         if (v.val == 0)
299           my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
300         else
301           my_snprintf(ptr, sz, "%s%s=1", sep, v.name);
302       }
303       break;
304 
305     case Ndb_index_stat_opt::Usize:
306       {
307         uint m;
308         if (v.val == 0)
309           my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
310         else if (v.val % (m= 1024*1024*1024) == 0)
311           my_snprintf(ptr, sz, "%s%s=%uG", sep, v.name, v.val / m);
312         else if (v.val % (m= 1024*1024) == 0)
313           my_snprintf(ptr, sz, "%s%s=%uM", sep, v.name, v.val / m);
314         else if (v.val % (m= 1024) == 0)
315           my_snprintf(ptr, sz, "%s%s=%uK", sep, v.name, v.val / m);
316         else
317           my_snprintf(ptr, sz, "%s%s=%u", sep, v.name, v.val);
318       }
319       break;
320 
321     case Ndb_index_stat_opt::Utime:
322       {
323         uint m;
324         if (v.val == 0)
325           my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
326         else if (v.val % (m= 60*60*24) == 0)
327           my_snprintf(ptr, sz, "%s%s=%ud", sep, v.name, v.val / m);
328         else if (v.val % (m= 60*60) == 0)
329           my_snprintf(ptr, sz, "%s%s=%uh", sep, v.name, v.val / m);
330         else if (v.val % (m= 60) == 0)
331           my_snprintf(ptr, sz, "%s%s=%um", sep, v.name, v.val / m);
332         else
333           my_snprintf(ptr, sz, "%s%s=%us", sep, v.name, v.val);
334       }
335       break;
336 
337     case Ndb_index_stat_opt::Umsec:
338       {
339         if (v.val == 0)
340           my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
341         else
342           my_snprintf(ptr, sz, "%s%s=%ums", sep, v.name, v.val);
343       }
344       break;
345 
346     default:
347       DBUG_ASSERT(false);
348       break;
349     }
350   }
351 
352   memset(str, 0, ndb_index_stat_option_sz);
353   strcpy(str, buf);
354   DBUG_PRINT("index_stat", ("str: \"%s\"", str));
355   DBUG_VOID_RETURN;
356 }
357 
358 static int
ndb_index_stat_option_parse(char * p,Ndb_index_stat_opt & opt)359 ndb_index_stat_option_parse(char* p, Ndb_index_stat_opt& opt)
360 {
361   DBUG_ENTER("ndb_index_stat_option_parse");
362 
363   char *r= strchr(p, '=');
364   if (r == 0)
365     DBUG_RETURN(-1);
366   *r++= 0;
367 
368   while (isspace(*r))
369     *r++= 0;
370   if (*r == 0)
371     DBUG_RETURN(-1);
372 
373   bool found= false;
374   const uint imax= Ndb_index_stat_opt::Imax;
375   for (uint i= 0; i < imax; i++)
376   {
377     Ndb_index_stat_opt::Val& v= opt.val[i];
378     if (strcmp(p, v.name) != 0)
379       continue;
380     found= true;
381 
382     char *s;
383     for (s= r; *s != 0; s++)
384       *s= tolower(*s);
385     ulonglong val= my_strtoull(r, &s, 10);
386 
387     switch (v.unit) {
388     case Ndb_index_stat_opt::Ubool:
389       {
390         if ((s > r && *s == 0 && val == 0) ||
391             strcmp(r, "off") == 0 ||
392             strcmp(r, "false") == 0)
393           val= 0;
394         else if ((s > r && *s == 0 && val == 1) ||
395             strcmp(r, "on") == 0 ||
396             strcmp(r, "true") == 0)
397           val= 1;
398         else
399           DBUG_RETURN(-1);
400         v.val= (uint)val;
401       }
402       break;
403 
404     case Ndb_index_stat_opt::Usize:
405       {
406         if (s == r)
407           DBUG_RETURN(-1);
408         if (strcmp(s, "") == 0)
409           ;
410         else if (strcmp(s, "k") == 0)
411           val*= 1024;
412         else if (strcmp(s, "m") == 0)
413           val*= 1024*1024;
414         else if (strcmp(s, "g") == 0)
415           val*= 1024*1024*1024;
416         else
417           DBUG_RETURN(-1);
418         if (val < v.minval || val > v.maxval)
419           DBUG_RETURN(-1);
420         v.val= (uint)val;
421       }
422       break;
423 
424     case Ndb_index_stat_opt::Utime:
425       {
426         if (s == r)
427           DBUG_RETURN(-1);
428         if (strcmp(s, "") == 0)
429           ;
430         else if (strcmp(s, "s") == 0)
431           ;
432         else if (strcmp(s, "m") == 0)
433           val*= 60;
434         else if (strcmp(s, "h") == 0)
435           val*= 60*60;
436         else if (strcmp(s, "d") == 0)
437           val*= 24*60*60;
438         else
439           DBUG_RETURN(-1);
440         if (val < v.minval || val > v.maxval)
441           DBUG_RETURN(-1);
442         v.val= (uint)val;
443       }
444       break;
445 
446     case Ndb_index_stat_opt::Umsec:
447       {
448         if (s == r)
449           DBUG_RETURN(-1);
450         if (strcmp(s, "") == 0)
451           ;
452         else if (strcmp(s, "ms") == 0)
453           ;
454         else
455           DBUG_RETURN(-1);
456         if (val < v.minval || val > v.maxval)
457           DBUG_RETURN(-1);
458         v.val= (uint)val;
459       }
460       break;
461 
462     default:
463       DBUG_ASSERT(false);
464       break;
465     }
466   }
467 
468   if (!found)
469     DBUG_RETURN(-1);
470   DBUG_RETURN(0);
471 }
472 
473 /* Copy option string to option struct */
474 static int
ndb_index_stat_str2opt(const char * str,Ndb_index_stat_opt & opt)475 ndb_index_stat_str2opt(const char *str, Ndb_index_stat_opt& opt)
476 {
477   DBUG_ENTER("ndb_index_stat_str2opt");
478   DBUG_PRINT("index_stat", ("str: \"%s\"", str));
479 
480   char buf[ndb_index_stat_option_sz];
481 
482   assert(str != 0);
483   if (strlen(str) >= sizeof(buf))
484     DBUG_RETURN(-1);
485   strcpy(buf, str);
486 
487   char *p= buf;
488   while (1)
489   {
490     while (isspace(*p))
491       p++;
492     if (*p == 0)
493       break;
494 
495     char *q= strchr(p, ',');
496     if (q == p)
497       DBUG_RETURN(-1);
498     if (q != 0)
499       *q= 0;
500 
501     DBUG_PRINT("index_stat", ("parse: %s", p));
502     if (ndb_index_stat_option_parse(p, opt) == -1)
503       DBUG_RETURN(-1);
504 
505     if (q == 0)
506       break;
507     p= q + 1;
508   }
509 
510   ndb_index_stat_opt2str(opt, opt.option);
511   DBUG_RETURN(0);
512 }
513 
514 /* Thanks to ha_innodb.cc */
515 
516 /* Need storage between check and update (assume locked) */
517 static char ndb_index_stat_option_tmp[ndb_index_stat_option_sz];
518 
519 int
ndb_index_stat_option_check(MYSQL_THD,struct st_mysql_sys_var * var,void * save,struct st_mysql_value * value)520 ndb_index_stat_option_check(MYSQL_THD,
521                             struct st_mysql_sys_var *var,
522                             void *save,
523                             struct st_mysql_value *value)
524 {
525   DBUG_ENTER("ndb_index_stat_option_check");
526   char buf[ndb_index_stat_option_sz];
527   int len= sizeof(buf);
528   const char *str= value->val_str(value, buf, &len);
529   if (str != 0)
530   {
531     /* Seems to be nothing in buf */
532     DBUG_PRINT("index_stat", ("str: %s len: %d", str, len));
533     char buf2[ndb_index_stat_option_sz];
534     Ndb_index_stat_opt opt(buf2);
535     if (ndb_index_stat_str2opt(str, opt) == 0)
536     {
537       /* Passed to update */
538       strcpy(ndb_index_stat_option_tmp, str);
539       *(const char**)save= ndb_index_stat_option_tmp;
540       DBUG_RETURN(0);
541     }
542   }
543   DBUG_RETURN(1);
544 }
545 
546 void
ndb_index_stat_option_update(MYSQL_THD,struct st_mysql_sys_var * var,void * var_ptr,const void * save)547 ndb_index_stat_option_update(MYSQL_THD,
548                              struct st_mysql_sys_var *var,
549                              void *var_ptr,
550                              const void *save)
551 {
552   DBUG_ENTER("ndb_index_stat_option_update");
553   const char *str= *(const char**)save;
554   DBUG_PRINT("index_stat", ("str: %s", str));
555   Ndb_index_stat_opt& opt= ndb_index_stat_opt;
556   int ret= ndb_index_stat_str2opt(str, opt);
557   assert(ret == 0); NDB_IGNORE_VALUE(ret);
558   *(const char**)var_ptr= ndb_index_stat_opt.option;
559   DBUG_VOID_RETURN;
560 }
561 
562 /* Global stuff */
563 
564 struct Ndb_index_stat_glob {
565   bool th_allow;          /* Queries allowed */
566   bool th_enable;         /* Stats thread idea of ndb_index_stat_enable */
567   bool th_busy;           /* Stats thread is busy-looping */
568   uint th_loop;           /* Stats thread current loop wait in ms */
569   uint force_update;
570   uint wait_update;
571   uint no_stats;
572   uint wait_stats;
573   /* Accumulating counters */
574   uint analyze_count;     /* Client counters */
575   uint analyze_error;
576   uint query_count;
577   uint query_no_stats;
578   uint query_error;
579   uint event_act;         /* Events acted on */
580   uint event_skip;        /* Events skipped (likely event-to-self) */
581   uint event_miss;        /* Events received for unknown index */
582   uint refresh_count;     /* Successful cache refreshes */
583   uint clean_count;       /* Times old caches (1 or more) cleaned */
584   uint pinned_count;      /* Times not cleaned due to old cache ref count */
585   uint drop_count;        /* From index drop */
586   uint evict_count;       /* From LRU cleanup */
587   /* Cache */
588   uint cache_query_bytes; /* In use */
589   uint cache_clean_bytes; /* Obsolete versions not yet removed */
590   uint cache_high_bytes;  /* Max ever of above */
591   uint cache_drop_bytes;  /* Part of above waiting to be evicted */
592   uint cache_evict_bytes; /* Part of above waiting to be evicted */
593   char status[2][1024];
594   uint status_i;
595 
596   Ndb_index_stat_glob();
597   void set_status();
598   void zero_total();
599 };
600 
Ndb_index_stat_glob()601 Ndb_index_stat_glob::Ndb_index_stat_glob()
602 {
603   th_allow= false;
604   th_enable= false;
605   th_busy= false;
606   th_loop= 0;
607   force_update= 0;
608   wait_update= 0;
609   no_stats= 0;
610   wait_stats= 0;
611   analyze_count= 0;
612   analyze_error= 0;
613   query_count= 0;
614   query_no_stats= 0;
615   query_error= 0;
616   event_act= 0;
617   event_skip= 0;
618   event_miss= 0;
619   refresh_count= 0;
620   clean_count= 0;
621   pinned_count= 0;
622   drop_count= 0;
623   evict_count= 0;
624   cache_query_bytes= 0;
625   cache_clean_bytes= 0;
626   cache_high_bytes= 0;
627   cache_drop_bytes= 0;
628   cache_evict_bytes= 0;
629   memset(status, 0, sizeof(status));
630   status_i= 0;
631 }
632 
633 /* Update status variable (must hold stat_mutex) */
634 void
set_status()635 Ndb_index_stat_glob::set_status()
636 {
637   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
638   char* p= status[status_i];
639 
640   // stats thread
641   th_allow= ndb_index_stat_allow();
642   sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%u",
643              th_allow, th_enable, th_busy, th_loop);
644   p+= strlen(p);
645 
646   // entry lists
647   strcpy(p, ",list:(");
648   p+= strlen(p);
649   uint list_count= 0;
650   for (int lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
651   {
652     const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
653     sprintf(p, "%s:%u,", list.name, list.count);
654     p+= strlen(p);
655     list_count+= list.count;
656   }
657   sprintf(p, "%s:%u)", "total", list_count);
658   p+= strlen(p);
659 
660   // special counters
661   sprintf(p, ",analyze:(queue:%u,wait:%u)", force_update, wait_update);
662   p+= strlen(p);
663   sprintf(p, ",stats:(nostats:%u,wait:%u)", no_stats, wait_stats);
664   p+= strlen(p);
665 
666   // accumulating counters
667   sprintf(p, ",total:(");
668   p+= strlen(p);
669   sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
670   p+= strlen(p);
671   sprintf(p, ",query:(all:%u,nostats:%u,error:%u)",
672              query_count, query_no_stats, query_error);
673   p+= strlen(p);
674   sprintf(p, ",event:(act:%u,skip:%u,miss:%u)",
675              event_act, event_skip, event_miss);
676   p+= strlen(p);
677   sprintf(p, ",cache:(refresh:%u,clean:%u,pinned:%u,drop:%u,evict:%u)",
678              refresh_count, clean_count, pinned_count, drop_count, evict_count);
679   p+= strlen(p);
680   sprintf(p, ")");
681   p+= strlen(p);
682 
683   // cache size
684   const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
685   const uint cache_total= cache_query_bytes + cache_clean_bytes;
686   double cache_pct= (double)0.0;
687   double cache_high_pct= (double)0.0;
688   if (cache_limit != 0)
689   {
690     cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
691     cache_high_pct= (double)100.0 * (double)cache_high_bytes / (double)cache_limit;
692   }
693   sprintf(p, ",cache:(query:%u,clean:%u"
694              ",drop:%u,evict:%u"
695              ",usedpct:%.2f,highpct:%.2f)",
696              cache_query_bytes, cache_clean_bytes,
697              cache_drop_bytes, cache_evict_bytes,
698              cache_pct, cache_high_pct);
699   p+= strlen(p);
700 
701   // alternating status buffers to keep this lock short
702   mysql_mutex_lock(&LOCK_global_system_variables);
703   g_ndb_status_index_stat_status= status[status_i];
704   status_i= (status_i + 1) % 2;
705   g_ndb_status_index_stat_cache_query= cache_query_bytes;
706   g_ndb_status_index_stat_cache_clean= cache_clean_bytes;
707   mysql_mutex_unlock(&LOCK_global_system_variables);
708 }
709 
710 /* Zero accumulating counters */
711 void
zero_total()712 Ndb_index_stat_glob::zero_total()
713 {
714   analyze_count= 0;
715   analyze_error= 0;
716   query_count= 0;
717   query_no_stats= 0;
718   query_error= 0;
719   event_act= 0;
720   event_skip= 0;
721   event_miss= 0;
722   refresh_count= 0;
723   clean_count= 0;
724   pinned_count= 0;
725   drop_count= 0;
726   evict_count= 0;
727   /* Reset highest use seen to current */
728   cache_high_bytes= cache_query_bytes + cache_clean_bytes;
729 }
730 
731 static Ndb_index_stat_glob ndb_index_stat_glob;
732 
733 /* Shared index entries */
734 
Ndb_index_stat()735 Ndb_index_stat::Ndb_index_stat()
736 {
737   is= 0;
738   index_id= 0;
739   index_version= 0;
740 #ifndef DBUG_OFF
741   memset(id, 0, sizeof(id));
742 #endif
743   access_time= 0;
744   update_time= 0;
745   load_time= 0;
746   read_time= 0;
747   sample_version= 0;
748   check_time= 0;
749   query_bytes= 0;
750   clean_bytes= 0;
751   drop_bytes= 0;
752   evict_bytes= 0;
753   force_update= false;
754   no_stats= false;
755   error_time= 0;
756   error_count= 0;
757   share_next= 0;
758   lt= 0;
759   lt_old= 0;
760   list_next= 0;
761   list_prev= 0;
762   share= 0;
763   ref_count= 0;
764   to_delete= false;
765   abort_request= false;
766 }
767 
768 /*
769   Called by stats thread and (rarely) by client.  Caller must hold
770   stat_mutex.  Client errors currently have no effect on execution
771   since they are probably local e.g. bad range (internal error).
772   Argument "from" is 0=stats thread 1=client.
773 */
774 static void
ndb_index_stat_error(Ndb_index_stat * st,int from,const char * place,int line)775 ndb_index_stat_error(Ndb_index_stat *st,
776                      int from, const char* place, int line)
777 {
778   time_t now= ndb_index_stat_time();
779   NdbIndexStat::Error error= st->is->getNdbError();
780   if (error.code == 0)
781   {
782     /* Make sure code is not 0 */
783     NdbIndexStat::Error error2;
784     error= error2;
785     error.code= NdbIndexStat::InternalError;
786     error.status= NdbError::TemporaryError;
787   }
788   if (from == 0)
789   {
790     st->error= error;
791     st->error_time= now; /* Controls proc_error */
792   }
793   else
794     st->client_error= error;
795   st->error_count++;
796 
797   DBUG_PRINT("index_stat", ("%s line %d: error %d line %d extra %d",
798                             place, line, error.code, error.line, error.extra));
799 }
800 
801 static void
ndb_index_stat_clear_error(Ndb_index_stat * st)802 ndb_index_stat_clear_error(Ndb_index_stat *st)
803 {
804   st->error.code= 0;
805   st->error.status= NdbError::Success;
806 }
807 
808 /* Lists across shares */
809 
Ndb_index_stat_list(int the_lt,const char * the_name)810 Ndb_index_stat_list::Ndb_index_stat_list(int the_lt, const char* the_name)
811 {
812   lt= the_lt;
813   name= the_name;
814   head= 0;
815   tail= 0;
816   count= 0;
817 }
818 
819 Ndb_index_stat_list ndb_index_stat_list[Ndb_index_stat::LT_Count] = {
820   Ndb_index_stat_list(0, 0),
821   Ndb_index_stat_list(Ndb_index_stat::LT_New,    "new"),
822   Ndb_index_stat_list(Ndb_index_stat::LT_Update, "update"),
823   Ndb_index_stat_list(Ndb_index_stat::LT_Read,   "read"),
824   Ndb_index_stat_list(Ndb_index_stat::LT_Idle,   "idle"),
825   Ndb_index_stat_list(Ndb_index_stat::LT_Check,  "check"),
826   Ndb_index_stat_list(Ndb_index_stat::LT_Delete, "delete"),
827   Ndb_index_stat_list(Ndb_index_stat::LT_Error,  "error")
828 };
829 
830 static void
ndb_index_stat_list_add(Ndb_index_stat * st,int lt)831 ndb_index_stat_list_add(Ndb_index_stat* st, int lt)
832 {
833   assert(st != 0 && st->lt == 0);
834   assert(st->list_next == 0 && st->list_prev == 0);
835   assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
836   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
837 
838   DBUG_PRINT("index_stat", ("st %s -> %s", st->id, list.name));
839 
840   if (list.count == 0)
841   {
842     assert(list.head == 0 && list.tail == 0);
843     list.head= st;
844     list.tail= st;
845   }
846   else
847   {
848     assert(list.tail != 0 && list.tail->list_next == 0);
849     st->list_prev= list.tail;
850     list.tail->list_next= st;
851     list.tail= st;
852   }
853   list.count++;
854 
855   st->lt= lt;
856 }
857 
858 static void
ndb_index_stat_list_remove(Ndb_index_stat * st)859 ndb_index_stat_list_remove(Ndb_index_stat* st)
860 {
861   assert(st != 0);
862   int lt= st->lt;
863   assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
864   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
865 
866   DBUG_PRINT("index_stat", ("st %s <- %s", st->id, list.name));
867 
868   Ndb_index_stat* next= st->list_next;
869   Ndb_index_stat* prev= st->list_prev;
870 
871   if (list.head == st)
872     list.head= next;
873   if (list.tail == st)
874     list.tail= prev;
875   assert(list.count != 0);
876   list.count--;
877 
878   if (next != 0)
879     next->list_prev= prev;
880   if (prev != 0)
881     prev->list_next= next;
882 
883   st->lt= 0;
884   st->lt_old= 0;
885   st->list_next= 0;
886   st->list_prev= 0;
887 }
888 
889 static void
ndb_index_stat_list_move(Ndb_index_stat * st,int lt)890 ndb_index_stat_list_move(Ndb_index_stat *st, int lt)
891 {
892   assert(st != 0);
893   ndb_index_stat_list_remove(st);
894   ndb_index_stat_list_add(st, lt);
895 }
896 
897 /* Stats entry changes (must hold stat_mutex) */
898 
899 static void
ndb_index_stat_force_update(Ndb_index_stat * st,bool onoff)900 ndb_index_stat_force_update(Ndb_index_stat *st, bool onoff)
901 {
902   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
903   if (onoff)
904   {
905     if (!st->force_update)
906     {
907       glob.force_update++;
908       st->force_update= true;
909       glob.set_status();
910     }
911   }
912   else
913   {
914     if (st->force_update)
915     {
916       assert(glob.force_update != 0);
917       glob.force_update--;
918       st->force_update= false;
919       glob.set_status();
920     }
921   }
922 }
923 
924 static void
ndb_index_stat_no_stats(Ndb_index_stat * st,bool flag)925 ndb_index_stat_no_stats(Ndb_index_stat *st, bool flag)
926 {
927   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
928   if (st->no_stats != flag)
929   {
930     if (flag)
931     {
932       glob.no_stats++;
933       st->no_stats= true;
934     }
935     else
936     {
937       assert(glob.no_stats >= 1);
938       glob.no_stats-= 1;
939       st->no_stats= false;
940     }
941     glob.set_status();
942   }
943 }
944 
945 static void
ndb_index_stat_ref_count(Ndb_index_stat * st,bool flag)946 ndb_index_stat_ref_count(Ndb_index_stat *st, bool flag)
947 {
948   uint old_count= st->ref_count;
949   (void)old_count; // USED
950   if (flag)
951   {
952     st->ref_count++;
953   }
954   else
955   {
956     assert(st->ref_count != 0);
957     st->ref_count--;
958   }
959   DBUG_PRINT("index_stat", ("st %s ref_count:%u->%u",
960                             st->id, old_count, st->ref_count));
961 }
962 
963 /* Find or add entry under the share */
964 
965 /* Saved in get_share() under stat_mutex */
966 struct Ndb_index_stat_snap {
967   time_t load_time;
968   uint sample_version;
969   uint error_count;
Ndb_index_stat_snapNdb_index_stat_snap970   Ndb_index_stat_snap() { load_time= 0; sample_version= 0; error_count= 0; }
971 };
972 
973 /* Subroutine, have lock */
974 static Ndb_index_stat*
ndb_index_stat_alloc(const NDBINDEX * index,const NDBTAB * table,int & err_out)975 ndb_index_stat_alloc(const NDBINDEX *index,
976                      const NDBTAB *table,
977                      int &err_out)
978 {
979   err_out= 0;
980   Ndb_index_stat *st= new Ndb_index_stat;
981   NdbIndexStat *is= new NdbIndexStat;
982   if (st != 0 && is != 0)
983   {
984     st->is= is;
985     st->index_id= index->getObjectId();
986     st->index_version= index->getObjectVersion();
987 #ifndef DBUG_OFF
988     my_snprintf(st->id, sizeof(st->id), "%d.%d", st->index_id, st->index_version);
989 #endif
990     if (is->set_index(*index, *table) == 0)
991       return st;
992     ndb_index_stat_error(st, 1, "set_index", __LINE__);
993     err_out= st->client_error.code;
994   }
995   else
996   {
997     err_out= NdbIndexStat::NoMemError;
998   }
999   if (is != 0)
1000     delete is;
1001   if (st != 0)
1002     delete st;
1003   return 0;
1004 }
1005 
1006 /* Subroutine, have lock */
1007 static Ndb_index_stat*
ndb_index_stat_find_share(NDB_SHARE * share,const NDBINDEX * index,Ndb_index_stat * & st_last)1008 ndb_index_stat_find_share(NDB_SHARE *share,
1009                           const NDBINDEX *index,
1010                           Ndb_index_stat *&st_last)
1011 {
1012   struct Ndb_index_stat *st= share->index_stat_list;
1013   st_last= 0;
1014   while (st != 0)
1015   {
1016     assert(st->share == share);
1017     assert(st->is != 0);
1018     NdbIndexStat::Head head;
1019     st->is->get_head(head);
1020     if (head.m_indexId == (uint)index->getObjectId() &&
1021         head.m_indexVersion == (uint)index->getObjectVersion())
1022       break;
1023     st_last= st;
1024     st= st->share_next;
1025   }
1026   return st;
1027 }
1028 
1029 /* Subroutine, have lock */
1030 static void
ndb_index_stat_add_share(NDB_SHARE * share,Ndb_index_stat * st,Ndb_index_stat * st_last)1031 ndb_index_stat_add_share(NDB_SHARE *share,
1032                          Ndb_index_stat *st,
1033                          Ndb_index_stat *st_last)
1034 {
1035   st->share= share;
1036   if (st_last == 0)
1037     share->index_stat_list= st;
1038   else
1039     st_last->share_next= st;
1040 }
1041 
1042 static Ndb_index_stat*
ndb_index_stat_get_share(NDB_SHARE * share,const NDBINDEX * index,const NDBTAB * table,Ndb_index_stat_snap & snap,int & err_out,bool allow_add,bool force_update)1043 ndb_index_stat_get_share(NDB_SHARE *share,
1044                          const NDBINDEX *index,
1045                          const NDBTAB *table,
1046                          Ndb_index_stat_snap &snap,
1047                          int &err_out,
1048                          bool allow_add,
1049                          bool force_update)
1050 {
1051   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1052 
1053   native_mutex_lock(&share->mutex);
1054   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1055   time_t now= ndb_index_stat_time();
1056   err_out= 0;
1057 
1058   struct Ndb_index_stat *st= 0;
1059   struct Ndb_index_stat *st_last= 0;
1060   do
1061   {
1062     if (unlikely(!ndb_index_stat_allow()))
1063     {
1064       err_out= NdbIndexStat::MyNotAllow;
1065       break;
1066     }
1067     st= ndb_index_stat_find_share(share, index, st_last);
1068     if (st == 0)
1069     {
1070       if (!allow_add)
1071       {
1072         err_out= NdbIndexStat::MyNotFound;
1073         break;
1074       }
1075       st= ndb_index_stat_alloc(index, table, err_out);
1076       if (st == 0)
1077       {
1078         assert(err_out != 0);
1079         break;
1080       }
1081       ndb_index_stat_add_share(share, st, st_last);
1082       ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
1083       glob.set_status();
1084     }
1085     else if (unlikely(st->abort_request))
1086     {
1087       err_out= NdbIndexStat::MyAbortReq;
1088       break;
1089     }
1090     if (force_update)
1091       ndb_index_stat_force_update(st, true);
1092     snap.load_time= st->load_time;
1093     snap.sample_version= st->sample_version;
1094     snap.error_count= st->error_count;
1095     st->access_time= now;
1096   }
1097   while (0);
1098 
1099   if (err_out == 0)
1100   {
1101     assert(st != 0);
1102     ndb_index_stat_ref_count(st, true);
1103   }
1104   else
1105     st= 0;
1106 
1107   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1108   native_mutex_unlock(&share->mutex);
1109   return st;
1110 }
1111 
1112 /*
1113   Prepare to delete index stat entry.  Remove it from per-share
1114   list and set "to_delete" flag.  Stats thread does real delete.
1115 */
1116 
1117 /* caller must hold stat_mutex */
1118 static void
ndb_index_stat_free(Ndb_index_stat * st)1119 ndb_index_stat_free(Ndb_index_stat *st)
1120 {
1121   DBUG_ENTER("ndb_index_stat_free");
1122   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1123   NDB_SHARE *share= st->share;
1124   assert(share != 0);
1125 
1126   Ndb_index_stat *st_head= 0;
1127   Ndb_index_stat *st_tail= 0;
1128   Ndb_index_stat *st_loop= share->index_stat_list;
1129   uint found= 0;
1130   while (st_loop != 0)
1131   {
1132     if (st == st_loop)
1133     {
1134       DBUG_PRINT("index_stat", ("st %s stat free one", st->id));
1135       st_loop= st_loop->share_next;
1136       st->share_next= 0;
1137       st->share= 0;
1138       assert(st->lt != 0);
1139       assert(st->lt != Ndb_index_stat::LT_Delete);
1140       assert(!st->to_delete);
1141       st->to_delete= true;
1142       st->abort_request= true;
1143       found++;
1144     }
1145     else
1146     {
1147       if (st_head == 0)
1148         st_head= st_loop;
1149       else
1150         st_tail->share_next= st_loop;
1151       st_tail= st_loop;
1152       st_loop= st_loop->share_next;
1153       st_tail->share_next= 0;
1154     }
1155   }
1156   assert(found == 1);
1157   share->index_stat_list= st_head;
1158 
1159   glob.set_status();
1160   DBUG_VOID_RETURN;
1161 }
1162 
1163 /* Interface to online drop index */
1164 void
ndb_index_stat_free(NDB_SHARE * share,int index_id,int index_version)1165 ndb_index_stat_free(NDB_SHARE *share, int index_id, int index_version)
1166 {
1167   DBUG_ENTER("ndb_index_stat_free");
1168   DBUG_PRINT("index_stat", ("(index_id:%d index_version:%d",
1169                             index_id, index_version));
1170   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1171   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1172 
1173   uint found= 0;
1174   Ndb_index_stat *st= share->index_stat_list;
1175   while (st != 0)
1176   {
1177     if (st->index_id == index_id &&
1178         st->index_version == index_version)
1179     {
1180       ndb_index_stat_free(st);
1181       found++;
1182       glob.drop_count++;
1183       assert(st->drop_bytes == 0);
1184       st->drop_bytes= st->query_bytes + st->clean_bytes;
1185       glob.cache_drop_bytes+= st->drop_bytes;
1186       break;
1187     }
1188     st= st->share_next;
1189   }
1190 
1191   glob.set_status();
1192   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1193   DBUG_VOID_RETURN;
1194 }
1195 
1196 void
ndb_index_stat_free(NDB_SHARE * share)1197 ndb_index_stat_free(NDB_SHARE *share)
1198 {
1199   DBUG_ENTER("ndb_index_stat_free");
1200   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1201   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1202 
1203   uint found= 0;
1204   Ndb_index_stat *st;
1205   while ((st= share->index_stat_list) != 0)
1206   {
1207     DBUG_PRINT("index_stat", ("st %s stat free all", st->id));
1208     share->index_stat_list= st->share_next;
1209     st->share_next= 0;
1210     st->share= 0;
1211     assert(st->lt != 0);
1212     assert(st->lt != Ndb_index_stat::LT_Delete);
1213     assert(!st->to_delete);
1214     st->to_delete= true;
1215     st->abort_request= true;
1216     found++;
1217     glob.drop_count++;
1218     assert(st->drop_bytes == 0);
1219     st->drop_bytes+= st->query_bytes + st->clean_bytes;
1220     glob.cache_drop_bytes+= st->drop_bytes;
1221   }
1222 
1223   glob.set_status();
1224   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1225   DBUG_VOID_RETURN;
1226 }
1227 
1228 /* Find entry across shares */
1229 /* wl4124_todo mutex overkill, hash table, can we find table share */
1230 static Ndb_index_stat*
ndb_index_stat_find_entry(int index_id,int index_version,int table_id)1231 ndb_index_stat_find_entry(int index_id, int index_version, int table_id)
1232 {
1233   DBUG_ENTER("ndb_index_stat_find_entry");
1234   native_mutex_lock(&ndbcluster_mutex);
1235   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1236   DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
1237                             index_id, index_version, table_id));
1238 
1239   int lt;
1240   for (lt=1; lt < Ndb_index_stat::LT_Count; lt++)
1241   {
1242     Ndb_index_stat *st=ndb_index_stat_list[lt].head;
1243     while (st != 0)
1244     {
1245       if (st->index_id == index_id &&
1246           st->index_version == index_version)
1247       {
1248         native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1249         native_mutex_unlock(&ndbcluster_mutex);
1250         DBUG_RETURN(st);
1251       }
1252       st= st->list_next;
1253     }
1254   }
1255 
1256   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1257   native_mutex_unlock(&ndbcluster_mutex);
1258   DBUG_RETURN(0);
1259 }
1260 
1261 /* Statistics thread sub-routines */
1262 
1263 static void
ndb_index_stat_cache_move(Ndb_index_stat * st)1264 ndb_index_stat_cache_move(Ndb_index_stat *st)
1265 {
1266   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1267   NdbIndexStat::CacheInfo infoBuild;
1268   NdbIndexStat::CacheInfo infoQuery;
1269 
1270   st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
1271   st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
1272   const uint new_query_bytes= infoBuild.m_totalBytes;
1273   const uint old_query_bytes= infoQuery.m_totalBytes;
1274   DBUG_PRINT("index_stat", ("st %s cache move: query:%u clean:%u",
1275                             st->id, new_query_bytes, old_query_bytes));
1276   st->is->move_cache();
1277   st->query_bytes= new_query_bytes;
1278   st->clean_bytes+= old_query_bytes;
1279   assert(glob.cache_query_bytes >= old_query_bytes);
1280   glob.cache_query_bytes-= old_query_bytes;
1281   glob.cache_query_bytes+= new_query_bytes;
1282   glob.cache_clean_bytes+= old_query_bytes;
1283   const uint cache_total= glob.cache_query_bytes + glob.cache_clean_bytes;
1284   if (glob.cache_high_bytes < cache_total)
1285     glob.cache_high_bytes= cache_total;
1286 }
1287 
1288 static bool
ndb_index_stat_cache_clean(Ndb_index_stat * st)1289 ndb_index_stat_cache_clean(Ndb_index_stat *st)
1290 {
1291   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1292   NdbIndexStat::CacheInfo infoClean;
1293 
1294   st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
1295   const uint old_clean_bytes= infoClean.m_totalBytes;
1296   const uint ref_count= infoClean.m_ref_count;
1297   DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u ref_count:%u",
1298                             st->id, old_clean_bytes, ref_count));
1299   if (ref_count != 0)
1300     return false;
1301   st->is->clean_cache();
1302   st->clean_bytes= 0;
1303   assert(glob.cache_clean_bytes >= old_clean_bytes);
1304   glob.cache_clean_bytes-= old_clean_bytes;
1305   return true;
1306 }
1307 
1308 static void
ndb_index_stat_cache_evict(Ndb_index_stat * st)1309 ndb_index_stat_cache_evict(Ndb_index_stat *st)
1310 {
1311   NdbIndexStat::Head head;
1312   NdbIndexStat::CacheInfo infoBuild;
1313   NdbIndexStat::CacheInfo infoQuery;
1314   NdbIndexStat::CacheInfo infoClean;
1315   st->is->get_head(head);
1316   st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
1317   st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
1318   st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
1319 
1320   DBUG_PRINT("index_stat",
1321              ("evict table: %u index: %u version: %u"
1322               " sample version: %u"
1323               " cache bytes build:%u query:%u clean:%u",
1324               head.m_tableId, head.m_indexId, head.m_indexVersion,
1325               head.m_sampleVersion,
1326               infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
1327 
1328   /* Twice to move all caches to clean */
1329   ndb_index_stat_cache_move(st);
1330   ndb_index_stat_cache_move(st);
1331   /* Unused variable release vs debug nonsense */
1332   bool ok= false;
1333   (void)ok; // USED
1334   ok= ndb_index_stat_cache_clean(st);
1335   assert(ok);
1336 }
1337 
1338 /* Misc in/out parameters for process steps */
1339 struct Ndb_index_stat_proc {
1340   NdbIndexStat* is_util; // For metadata and polling
1341   Ndb *ndb;
1342   time_t start; // start of current processing slice
1343   time_t now;
1344   int lt;
1345   bool busy;
1346   bool end;
1347 #ifndef DBUG_OFF
1348   uint cache_query_bytes;
1349   uint cache_clean_bytes;
1350 #endif
Ndb_index_stat_procNdb_index_stat_proc1351   Ndb_index_stat_proc() :
1352     is_util(0),
1353     ndb(0),
1354     now(0),
1355     lt(0),
1356     busy(false),
1357     end(false)
1358   {}
1359 
~Ndb_index_stat_procNdb_index_stat_proc1360   ~Ndb_index_stat_proc()
1361   {
1362     assert(ndb == NULL);
1363   }
1364 
init_ndbNdb_index_stat_proc1365   bool init_ndb(Ndb_cluster_connection* connection)
1366   {
1367     assert(ndb == NULL); // Should not have been created yet
1368     assert(connection);
1369 
1370     ndb= new Ndb(connection, "");
1371     if (!ndb)
1372       return false;
1373 
1374     if (ndb->setNdbObjectName("Ndb Index Statistics monitoring"))
1375     {
1376       sql_print_error("ndb_index_stat_proc: Failed to set object name, "
1377                       "error code %d", ndb->getNdbError().code);
1378     }
1379 
1380     if (ndb->init() != 0)
1381     {
1382       sql_print_error("ndb_index_stat_proc: Failed to init Ndb object");
1383       return false;
1384     }
1385 
1386     if (ndb->setDatabaseName(NDB_INDEX_STAT_DB) != 0)
1387     {
1388       sql_print_error("ndb_index_stat_proc: Failed to change database to %s",
1389                       NDB_INDEX_STAT_DB);
1390       return false;
1391     }
1392 
1393     sql_print_information("ndb_index_stat_proc: Created Ndb object, "
1394                           "reference: 0x%x, name: '%s'",
1395 			  ndb->getReference(), ndb->getNdbObjectName());
1396     return true;
1397   }
1398 
destroyNdb_index_stat_proc1399   void destroy(void)
1400   {
1401     delete ndb;
1402     ndb= NULL;
1403   }
1404 };
1405 
1406 static void
ndb_index_stat_proc_new(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1407 ndb_index_stat_proc_new(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1408 {
1409   assert(st->error.code == 0);
1410   if (st->force_update)
1411     pr.lt= Ndb_index_stat::LT_Update;
1412   else
1413     pr.lt= Ndb_index_stat::LT_Read;
1414 }
1415 
1416 static void
ndb_index_stat_proc_new(Ndb_index_stat_proc & pr)1417 ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
1418 {
1419   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1420   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1421   const int lt= Ndb_index_stat::LT_New;
1422   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1423 
1424   Ndb_index_stat *st_loop= list.head;
1425   while (st_loop != 0)
1426   {
1427     Ndb_index_stat *st= st_loop;
1428     st_loop= st_loop->list_next;
1429     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1430     ndb_index_stat_proc_new(pr, st);
1431     assert(pr.lt != lt);
1432     ndb_index_stat_list_move(st, pr.lt);
1433   }
1434   glob.set_status();
1435   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1436 }
1437 
1438 static void
ndb_index_stat_proc_update(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1439 ndb_index_stat_proc_update(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1440 {
1441   if (st->is->update_stat(pr.ndb) == -1)
1442   {
1443     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1444     ndb_index_stat_error(st, 0, "update_stat", __LINE__);
1445 
1446     /*
1447       Turn off force update or else proc_error() thinks
1448       it is a new analyze request.
1449     */
1450     ndb_index_stat_force_update(st, false);
1451 
1452     native_cond_broadcast(&ndb_index_stat_thread.stat_cond);
1453     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1454 
1455     pr.lt= Ndb_index_stat::LT_Error;
1456     return;
1457   }
1458 
1459   pr.now= ndb_index_stat_time();
1460   st->update_time= pr.now;
1461   pr.lt= Ndb_index_stat::LT_Read;
1462 }
1463 
1464 static void
ndb_index_stat_proc_update(Ndb_index_stat_proc & pr)1465 ndb_index_stat_proc_update(Ndb_index_stat_proc &pr)
1466 {
1467   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1468   const int lt= Ndb_index_stat::LT_Update;
1469   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1470   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1471   const uint batch= opt.get(Ndb_index_stat_opt::Iupdate_batch);
1472 
1473   Ndb_index_stat *st_loop= list.head;
1474   uint cnt= 0;
1475   while (st_loop != 0 && cnt < batch)
1476   {
1477     Ndb_index_stat *st= st_loop;
1478     st_loop= st_loop->list_next;
1479     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1480     ndb_index_stat_proc_update(pr, st);
1481     assert(pr.lt != lt);
1482     ndb_index_stat_list_move(st, pr.lt);
1483     // db op so update status after each
1484     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1485     glob.set_status();
1486     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1487     cnt++;
1488   }
1489   if (cnt == batch)
1490     pr.busy= true;
1491 }
1492 
1493 static void
ndb_index_stat_proc_read(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1494 ndb_index_stat_proc_read(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1495 {
1496   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1497   NdbIndexStat::Head head;
1498   if (st->is->read_stat(pr.ndb) == -1)
1499   {
1500     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1501     ndb_index_stat_error(st, 0, "read_stat", __LINE__);
1502     const bool force_update= st->force_update;
1503     ndb_index_stat_force_update(st, false);
1504 
1505     /* no stats is not unexpected error, unless analyze was done */
1506     if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats &&
1507         !force_update)
1508     {
1509       ndb_index_stat_no_stats(st, true);
1510       pr.lt= Ndb_index_stat::LT_Idle;
1511     }
1512     else
1513     {
1514       pr.lt= Ndb_index_stat::LT_Error;
1515     }
1516 
1517     native_cond_broadcast(&ndb_index_stat_thread.stat_cond);
1518     pr.now= ndb_index_stat_time();
1519     st->check_time= pr.now;
1520     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1521     return;
1522   }
1523 
1524   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1525   pr.now= ndb_index_stat_time();
1526   st->is->get_head(head);
1527   st->load_time= (time_t)head.m_loadTime;
1528   st->read_time= pr.now;
1529   st->sample_version= head.m_sampleVersion;
1530   st->check_time= pr.now;
1531 
1532   ndb_index_stat_force_update(st, false);
1533   ndb_index_stat_no_stats(st, false);
1534 
1535   ndb_index_stat_cache_move(st);
1536   pr.lt= Ndb_index_stat::LT_Idle;
1537   glob.refresh_count++;
1538   native_cond_broadcast(&ndb_index_stat_thread.stat_cond);
1539   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1540 }
1541 
1542 static void
ndb_index_stat_proc_read(Ndb_index_stat_proc & pr)1543 ndb_index_stat_proc_read(Ndb_index_stat_proc &pr)
1544 {
1545   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1546   const int lt= Ndb_index_stat::LT_Read;
1547   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1548   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1549   const uint batch= opt.get(Ndb_index_stat_opt::Iread_batch);
1550 
1551   Ndb_index_stat *st_loop= list.head;
1552   uint cnt= 0;
1553   while (st_loop != 0 && cnt < batch)
1554   {
1555     Ndb_index_stat *st= st_loop;
1556     st_loop= st_loop->list_next;
1557     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1558     ndb_index_stat_proc_read(pr, st);
1559     assert(pr.lt != lt);
1560     ndb_index_stat_list_move(st, pr.lt);
1561     // db op so update status after each
1562     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1563     glob.set_status();
1564     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1565     cnt++;
1566   }
1567   if (cnt == batch)
1568     pr.busy= true;
1569 }
1570 
1571 static void
ndb_index_stat_proc_idle(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1572 ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1573 {
1574   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1575   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1576   const longlong clean_delay= opt.get(Ndb_index_stat_opt::Iclean_delay);
1577   const longlong check_delay= opt.get(Ndb_index_stat_opt::Icheck_delay);
1578 
1579   const longlong pr_now= (longlong)pr.now;
1580   const longlong st_read_time= (longlong)st->read_time;
1581   const longlong st_check_time= (longlong)st->check_time;
1582 
1583   const longlong clean_wait= st_read_time + clean_delay - pr_now;
1584   const longlong check_wait= st_check_time + check_delay - pr_now;
1585 
1586   DBUG_PRINT("index_stat", ("st %s clean_wait:%lld check_wait:%lld"
1587                             " force_update:%d to_delete:%d",
1588                             st->id, clean_wait, check_wait,
1589                             st->force_update, st->to_delete));
1590 
1591   if (st->to_delete)
1592   {
1593     pr.lt= Ndb_index_stat::LT_Delete;
1594     return;
1595   }
1596 
1597   if (st->clean_bytes != 0 && clean_wait <= 0)
1598   {
1599     if (ndb_index_stat_cache_clean(st))
1600       glob.clean_count++;
1601     else
1602       glob.pinned_count++;
1603   }
1604   if (st->force_update)
1605   {
1606     pr.lt= Ndb_index_stat::LT_Update;
1607     pr.busy= true;
1608     return;
1609   }
1610   if (check_wait <= 0)
1611   {
1612     // avoid creating "idle" entries on Check list
1613     const int lt_check= Ndb_index_stat::LT_Check;
1614     const Ndb_index_stat_list &list_check= ndb_index_stat_list[lt_check];
1615     const uint check_batch= opt.get(Ndb_index_stat_opt::Icheck_batch);
1616     if (list_check.count < check_batch)
1617     {
1618       pr.lt= Ndb_index_stat::LT_Check;
1619       return;
1620     }
1621   }
1622   pr.lt= Ndb_index_stat::LT_Idle;
1623 }
1624 
1625 static void
ndb_index_stat_proc_idle(Ndb_index_stat_proc & pr)1626 ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr)
1627 {
1628   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1629   const int lt= Ndb_index_stat::LT_Idle;
1630   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1631   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1632   uint batch= opt.get(Ndb_index_stat_opt::Iidle_batch);
1633   {
1634     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1635     const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1636     const int lt_update= Ndb_index_stat::LT_Update;
1637     const Ndb_index_stat_list &list_update= ndb_index_stat_list[lt_update];
1638     if (glob.force_update > list_update.count)
1639     {
1640       // probably there is a force update waiting on Idle list
1641       batch= ~(uint)0;
1642     }
1643     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1644   }
1645   // entry may be moved to end of this list
1646   if (batch > list.count)
1647     batch= list.count;
1648   pr.now= ndb_index_stat_time();
1649 
1650   Ndb_index_stat *st_loop= list.head;
1651   uint cnt= 0;
1652   while (st_loop != 0 && cnt < batch)
1653   {
1654     Ndb_index_stat *st= st_loop;
1655     st_loop= st_loop->list_next;
1656     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1657     ndb_index_stat_proc_idle(pr, st);
1658     // rotates list if entry remains LT_Idle
1659     ndb_index_stat_list_move(st, pr.lt);
1660     cnt++;
1661   }
1662   // full batch does not set pr.busy
1663   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1664   glob.set_status();
1665   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1666 }
1667 
1668 static void
ndb_index_stat_proc_check(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1669 ndb_index_stat_proc_check(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1670 {
1671   pr.now= ndb_index_stat_time();
1672   st->check_time= pr.now;
1673   NdbIndexStat::Head head;
1674   if (st->is->read_head(pr.ndb) == -1)
1675   {
1676     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1677     ndb_index_stat_error(st, 0, "read_head", __LINE__);
1678     /* no stats is not unexpected error */
1679     if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats)
1680     {
1681       ndb_index_stat_no_stats(st, true);
1682       pr.lt= Ndb_index_stat::LT_Idle;
1683     }
1684     else
1685     {
1686       pr.lt= Ndb_index_stat::LT_Error;
1687     }
1688     native_cond_broadcast(&ndb_index_stat_thread.stat_cond);
1689     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1690     return;
1691   }
1692   st->is->get_head(head);
1693   const uint version_old= st->sample_version;
1694   const uint version_new= head.m_sampleVersion;
1695   if (version_old != version_new)
1696   {
1697     DBUG_PRINT("index_stat", ("st %s sample version old:%u new:%u",
1698                               st->id, version_old, version_new));
1699     pr.lt= Ndb_index_stat::LT_Read;
1700     return;
1701   }
1702   pr.lt= Ndb_index_stat::LT_Idle;
1703 }
1704 
1705 static void
ndb_index_stat_proc_check(Ndb_index_stat_proc & pr)1706 ndb_index_stat_proc_check(Ndb_index_stat_proc &pr)
1707 {
1708   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1709   const int lt= Ndb_index_stat::LT_Check;
1710   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1711   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1712   const uint batch= opt.get(Ndb_index_stat_opt::Icheck_batch);
1713 
1714   Ndb_index_stat *st_loop= list.head;
1715   uint cnt= 0;
1716   while (st_loop != 0 && cnt < batch)
1717   {
1718     Ndb_index_stat *st= st_loop;
1719     st_loop= st_loop->list_next;
1720     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1721     ndb_index_stat_proc_check(pr, st);
1722     assert(pr.lt != lt);
1723     ndb_index_stat_list_move(st, pr.lt);
1724     // db op so update status after each
1725     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1726     glob.set_status();
1727     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1728     cnt++;
1729   }
1730   if (cnt == batch)
1731     pr.busy= true;
1732 }
1733 
1734 /* Check if need to evict more */
1735 static bool
ndb_index_stat_proc_evict()1736 ndb_index_stat_proc_evict()
1737 {
1738   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1739   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1740   uint curr_size= glob.cache_query_bytes + glob.cache_clean_bytes;
1741 
1742   /* Subtract bytes already scheduled for evict */
1743   assert(curr_size >= glob.cache_evict_bytes);
1744   curr_size-= glob.cache_evict_bytes;
1745 
1746   const uint cache_lowpct= opt.get(Ndb_index_stat_opt::Icache_lowpct);
1747   const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
1748   if (100 * curr_size <= cache_lowpct * cache_limit)
1749     return false;
1750   return true;
1751 }
1752 
1753 /* Check if st1 is better or as good to evict than st2 */
1754 static bool
ndb_index_stat_evict(const Ndb_index_stat * st1,const Ndb_index_stat * st2)1755 ndb_index_stat_evict(const Ndb_index_stat *st1,
1756                      const Ndb_index_stat *st2)
1757 {
1758   if (st1->access_time < st2->access_time)
1759     return true;
1760   if (st1->access_time == st2->access_time &&
1761       st1->query_bytes + st1->clean_bytes >=
1762       st2->query_bytes + st2->clean_bytes)
1763     return true;
1764   return false;
1765 }
1766 
1767 static void
ndb_index_stat_proc_evict(Ndb_index_stat_proc & pr,int lt)1768 ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr, int lt)
1769 {
1770   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1771   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1772   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1773   const uint batch= opt.get(Ndb_index_stat_opt::Ievict_batch);
1774   const longlong evict_delay= opt.get(Ndb_index_stat_opt::Ievict_delay);
1775   pr.now= ndb_index_stat_time();
1776   const longlong pr_now= (longlong)pr.now;
1777 
1778   if (!ndb_index_stat_proc_evict())
1779     return;
1780 
1781   /* Mutex entire routine (protect access_time) */
1782   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1783 
1784   /* Create a LRU batch */
1785   Ndb_index_stat* st_lru_arr[ndb_index_stat_max_evict_batch + 1];
1786   uint st_lru_cnt= 0;
1787   Ndb_index_stat *st_loop= list.head;
1788   while (st_loop != 0 && st_lru_cnt < batch)
1789   {
1790     Ndb_index_stat *st= st_loop;
1791     st_loop= st_loop->list_next;
1792     const longlong st_read_time= (longlong)st->read_time;
1793     if (st_read_time + evict_delay <= pr_now &&
1794         st->query_bytes + st->clean_bytes != 0 &&
1795         !st->to_delete)
1796     {
1797       /* Insertion sort into the batch from the end */
1798       if (st_lru_cnt == 0)
1799         st_lru_arr[st_lru_cnt++]= st;
1800       else
1801       {
1802         uint i= st_lru_cnt;
1803         while (i != 0)
1804         {
1805           const Ndb_index_stat *st1= st_lru_arr[i-1];
1806           if (ndb_index_stat_evict(st1, st))
1807           {
1808             /*
1809               The old entry at i-1 is preferred over st.
1810               Stop at first such entry.  Therefore entries
1811               after it (>= i) are less preferred than st.
1812             */
1813             break;
1814           }
1815           i--;
1816         }
1817         if (i < st_lru_cnt)
1818         {
1819           /*
1820             Some old entry is less preferred than st.  If this is
1821             true for all then i is 0 and st becomes new first entry.
1822             Otherwise st is inserted after i-1.  In both case entries
1823             >= i are shifted up.  The extra position at the end of
1824             st_lru_arr avoids a special case when the array is full.
1825           */
1826           uint j= st_lru_cnt;
1827           while (j > i)
1828           {
1829             st_lru_arr[j]= st_lru_arr[j-1];
1830             j--;
1831           }
1832           st_lru_arr[i]= st;
1833           if (st_lru_cnt < batch)
1834             st_lru_cnt++;
1835         }
1836       }
1837     }
1838   }
1839 
1840 #ifndef DBUG_OFF
1841   for (uint i=0; i < st_lru_cnt; i++)
1842   {
1843     Ndb_index_stat* st1= st_lru_arr[i];
1844     assert(!st1->to_delete && st1->share != 0);
1845     if (i + 1 < st_lru_cnt)
1846     {
1847       Ndb_index_stat* st2= st_lru_arr[i+1];
1848       assert(ndb_index_stat_evict(st1, st2));
1849     }
1850   }
1851 #endif
1852 
1853   /* Process the LRU batch */
1854   uint cnt= 0;
1855   while (cnt < st_lru_cnt)
1856   {
1857     if (!ndb_index_stat_proc_evict())
1858       break;
1859 
1860     Ndb_index_stat *st= st_lru_arr[cnt];
1861     DBUG_PRINT("index_stat", ("st %s proc evict %s", st->id, list.name));
1862 
1863     /* Entry may have requests.  Cache is evicted at delete. */
1864     ndb_index_stat_free(st);
1865     assert(st->evict_bytes == 0);
1866     st->evict_bytes= st->query_bytes + st->clean_bytes;
1867     glob.cache_evict_bytes+= st->evict_bytes;
1868     cnt++;
1869   }
1870   if (cnt == batch)
1871     pr.busy= true;
1872 
1873   glob.evict_count+= cnt;
1874   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1875 }
1876 
1877 static void
ndb_index_stat_proc_evict(Ndb_index_stat_proc & pr)1878 ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr)
1879 {
1880   ndb_index_stat_proc_evict(pr, Ndb_index_stat::LT_Error);
1881   ndb_index_stat_proc_evict(pr, Ndb_index_stat::LT_Idle);
1882 }
1883 
1884 static void
ndb_index_stat_proc_delete(Ndb_index_stat_proc & pr)1885 ndb_index_stat_proc_delete(Ndb_index_stat_proc &pr)
1886 {
1887   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1888   const int lt= Ndb_index_stat::LT_Delete;
1889   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1890   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1891   const uint delete_batch= opt.get(Ndb_index_stat_opt::Idelete_batch);
1892   const uint batch= !pr.end ? delete_batch : ~(uint)0;
1893 
1894   /* Mutex entire routine */
1895   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
1896 
1897   Ndb_index_stat *st_loop= list.head;
1898   uint cnt= 0;
1899   while (st_loop != 0 && cnt < batch)
1900   {
1901     Ndb_index_stat *st= st_loop;
1902     st_loop= st_loop->list_next;
1903     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1904 
1905     // adjust global counters at drop
1906     ndb_index_stat_force_update(st, false);
1907     ndb_index_stat_no_stats(st, false);
1908 
1909     /*
1910       Do not wait for requests to terminate since this could
1911       risk stats thread hanging.  Instead try again next time.
1912       Presumably clients will eventually notice abort_request.
1913     */
1914     if (st->ref_count != 0)
1915     {
1916       DBUG_PRINT("index_stat", ("st %s proc %s: ref_count:%u",
1917                  st->id, list.name, st->ref_count));
1918       continue;
1919     }
1920 
1921     ndb_index_stat_cache_evict(st);
1922     assert(glob.cache_drop_bytes >= st->drop_bytes);
1923     glob.cache_drop_bytes-= st->drop_bytes;
1924     assert(glob.cache_evict_bytes >= st->evict_bytes);
1925     glob.cache_evict_bytes-= st->evict_bytes;
1926     ndb_index_stat_list_remove(st);
1927     delete st->is;
1928     delete st;
1929     cnt++;
1930   }
1931   if (cnt == batch)
1932     pr.busy= true;
1933 
1934   glob.set_status();
1935   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
1936 }
1937 
1938 static void
ndb_index_stat_proc_error(Ndb_index_stat_proc & pr,Ndb_index_stat * st)1939 ndb_index_stat_proc_error(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
1940 {
1941   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1942   const longlong error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
1943 
1944   const longlong pr_now= (longlong)pr.now;
1945   const longlong st_error_time= (longlong)st->error_time;
1946   const longlong error_wait= st_error_time + error_delay - pr_now;
1947 
1948   DBUG_PRINT("index_stat", ("st %s error_wait:%lld error_count:%u"
1949                             " force_update:%d to_delete:%d",
1950                             st->id, error_wait, st->error_count,
1951                             st->force_update, st->to_delete));
1952 
1953   if (st->to_delete)
1954   {
1955     pr.lt= Ndb_index_stat::LT_Delete;
1956     return;
1957   }
1958 
1959   if (error_wait <= 0 ||
1960       /* Analyze issued after previous error */
1961       st->force_update)
1962   {
1963     ndb_index_stat_clear_error(st);
1964     if (st->force_update)
1965       pr.lt= Ndb_index_stat::LT_Update;
1966     else
1967       pr.lt= Ndb_index_stat::LT_Read;
1968     return;
1969   }
1970   pr.lt= Ndb_index_stat::LT_Error;
1971 }
1972 
1973 static void
ndb_index_stat_proc_error(Ndb_index_stat_proc & pr)1974 ndb_index_stat_proc_error(Ndb_index_stat_proc &pr)
1975 {
1976   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
1977   const int lt= Ndb_index_stat::LT_Error;
1978   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
1979   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
1980   uint batch= opt.get(Ndb_index_stat_opt::Ierror_batch);
1981   // entry may be moved to end of this list
1982   if (batch > list.count)
1983     batch= list.count;
1984   pr.now= ndb_index_stat_time();
1985 
1986   Ndb_index_stat *st_loop= list.head;
1987   uint cnt= 0;
1988   while (st_loop != 0 && cnt < batch)
1989   {
1990     Ndb_index_stat *st= st_loop;
1991     st_loop= st_loop->list_next;
1992     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
1993     ndb_index_stat_proc_error(pr, st);
1994     // rotates list if entry remains LT_Error
1995     ndb_index_stat_list_move(st, pr.lt);
1996     cnt++;
1997   }
1998   // full batch does not set pr.busy
1999   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2000   glob.set_status();
2001   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2002 }
2003 
2004 static void
ndb_index_stat_proc_event(Ndb_index_stat_proc & pr,Ndb_index_stat * st)2005 ndb_index_stat_proc_event(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
2006 {
2007   /*
2008     Put on Check list if idle.
2009     We get event also for our own analyze but this should not matter.
2010 
2011     bug#13524696
2012     The useless event-to-self makes an immediate second analyze wait
2013     for loop_idle time since the entry moves to LT_Check temporarily.
2014     Ignore the event if an update was done near this processing slice.
2015    */
2016   pr.lt= st->lt;
2017   if (st->lt == Ndb_index_stat::LT_Idle ||
2018       st->lt == Ndb_index_stat::LT_Error)
2019   {
2020     if (st->update_time < pr.start)
2021     {
2022       DBUG_PRINT("index_stat", ("st %s accept event for check", st->id));
2023       pr.lt= Ndb_index_stat::LT_Check;
2024     }
2025     else
2026     {
2027       DBUG_PRINT("index_stat", ("st %s ignore likely event to self", st->id));
2028     }
2029   }
2030   else
2031   {
2032     DBUG_PRINT("index_stat", ("st %s ignore event on lt=%d", st->id, st->lt));
2033   }
2034 }
2035 
2036 static void
ndb_index_stat_proc_event(Ndb_index_stat_proc & pr)2037 ndb_index_stat_proc_event(Ndb_index_stat_proc &pr)
2038 {
2039   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2040   NdbIndexStat *is= pr.is_util;
2041   Ndb *ndb= pr.ndb;
2042   int ret;
2043   ret= is->poll_listener(ndb, 0);
2044   DBUG_PRINT("index_stat", ("poll_listener ret: %d", ret));
2045   if (ret == -1)
2046   {
2047     // wl4124_todo report error
2048     DBUG_ASSERT(false);
2049     return;
2050   }
2051   if (ret == 0)
2052     return;
2053 
2054   while (1)
2055   {
2056     ret= is->next_listener(ndb);
2057     DBUG_PRINT("index_stat", ("next_listener ret: %d", ret));
2058     if (ret == -1)
2059     {
2060       // wl4124_todo report error
2061       DBUG_ASSERT(false);
2062       return;
2063     }
2064     if (ret == 0)
2065       break;
2066 
2067     NdbIndexStat::Head head;
2068     is->get_head(head);
2069     DBUG_PRINT("index_stat", ("next_listener eventType: %d indexId: %u",
2070                               head.m_eventType, head.m_indexId));
2071 
2072     Ndb_index_stat *st= ndb_index_stat_find_entry(head.m_indexId,
2073                                                   head.m_indexVersion,
2074                                                   head.m_tableId);
2075     /*
2076       Another process can update stats for an index which is not found
2077       in this mysqld.  Ignore it.
2078      */
2079     if (st != 0)
2080     {
2081       DBUG_PRINT("index_stat", ("st %s proc %s", st->id, "event"));
2082       ndb_index_stat_proc_event(pr, st);
2083       if (pr.lt != st->lt)
2084       {
2085         ndb_index_stat_list_move(st, pr.lt);
2086         glob.event_act++;
2087       }
2088       else
2089         glob.event_skip++;
2090     }
2091     else
2092     {
2093       DBUG_PRINT("index_stat", ("entry not found in this mysqld"));
2094       glob.event_miss++;
2095     }
2096   }
2097   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2098   glob.set_status();
2099   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2100 }
2101 
2102 /* Control options */
2103 
2104 static void
ndb_index_stat_proc_control(Ndb_index_stat_proc & pr)2105 ndb_index_stat_proc_control(Ndb_index_stat_proc &pr)
2106 {
2107   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2108   Ndb_index_stat_opt &opt= ndb_index_stat_opt;
2109 
2110   /* Request to zero accumulating counters */
2111   if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
2112   {
2113     native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2114     glob.zero_total();
2115     glob.set_status();
2116     opt.set(Ndb_index_stat_opt::Izero_total, false);
2117     native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2118   }
2119 }
2120 
2121 #ifndef DBUG_OFF
2122 static void
ndb_index_stat_entry_verify(Ndb_index_stat_proc & pr,const Ndb_index_stat * st)2123 ndb_index_stat_entry_verify(Ndb_index_stat_proc &pr, const Ndb_index_stat *st)
2124 {
2125   const NDB_SHARE *share= st->share;
2126   if (st->to_delete)
2127   {
2128     assert(st->share_next == 0);
2129     assert(share == 0);
2130   }
2131   else
2132   {
2133     assert(share != 0);
2134     const Ndb_index_stat *st2= share->index_stat_list;
2135     assert(st2 != 0);
2136     uint found= 0;
2137     while (st2 != 0)
2138     {
2139       assert(st2->share == share);
2140       const Ndb_index_stat *st3= st2->share_next;
2141       uint guard= 0;
2142       while (st3 != 0)
2143       {
2144         assert(st2 != st3);
2145         guard++;
2146         assert(guard <= 1000); // MAX_INDEXES
2147         st3= st3->share_next;
2148       }
2149       if (st == st2)
2150         found++;
2151       st2= st2->share_next;
2152     }
2153     assert(found == 1);
2154   }
2155   assert(st->read_time <= st->check_time);
2156   pr.cache_query_bytes+= st->query_bytes;
2157   pr.cache_clean_bytes+= st->clean_bytes;
2158 }
2159 
2160 static void
ndb_index_stat_list_verify(Ndb_index_stat_proc & pr,int lt)2161 ndb_index_stat_list_verify(Ndb_index_stat_proc &pr, int lt)
2162 {
2163   const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
2164   const Ndb_index_stat *st= list.head;
2165   uint count= 0;
2166   while (st != 0)
2167   {
2168     count++;
2169     assert(count <= list.count);
2170     if (st->list_prev != 0)
2171     {
2172       assert(st->list_prev->list_next == st);
2173     }
2174     if (st->list_next != 0)
2175     {
2176       assert(st->list_next->list_prev == st);
2177     }
2178     if (count == 1)
2179     {
2180       assert(st == list.head);
2181     }
2182     if (count == list.count)
2183     {
2184       assert(st == list.tail);
2185     }
2186     if (st == list.head)
2187     {
2188       assert(count == 1);
2189       assert(st->list_prev == 0);
2190     }
2191     if (st == list.tail)
2192     {
2193       assert(count == list.count);
2194       assert(st->list_next == 0);
2195     }
2196     const Ndb_index_stat *st2= st->list_next;
2197     uint guard= 0;
2198     while (st2 != 0)
2199     {
2200       assert(st != st2);
2201       guard++;
2202       assert(guard <= list.count);
2203       st2= st2->list_next;
2204     }
2205     ndb_index_stat_entry_verify(pr, st);
2206     st= st->list_next;
2207   }
2208   assert(count == list.count);
2209 }
2210 
2211 static void
ndb_index_stat_list_verify(Ndb_index_stat_proc & pr)2212 ndb_index_stat_list_verify(Ndb_index_stat_proc &pr)
2213 {
2214   const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2215   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2216   pr.cache_query_bytes= 0;
2217   pr.cache_clean_bytes= 0;
2218 
2219   for (int lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
2220     ndb_index_stat_list_verify(pr, lt);
2221 
2222   assert(glob.cache_query_bytes == pr.cache_query_bytes);
2223   assert(glob.cache_clean_bytes == pr.cache_clean_bytes);
2224   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2225 }
2226 
2227 static void
ndb_index_stat_report(const Ndb_index_stat_glob & old_glob)2228 ndb_index_stat_report(const Ndb_index_stat_glob& old_glob)
2229 {
2230   const Ndb_index_stat_glob &new_glob= ndb_index_stat_glob;
2231   const char *old_status= old_glob.status[old_glob.status_i];
2232   const char *new_status= new_glob.status[new_glob.status_i];
2233 
2234   if (strcmp(old_status, new_status) != 0)
2235   {
2236     DBUG_PRINT("index_stat", ("old_status: %s", old_status));
2237     DBUG_PRINT("index_stat", ("new_status: %s", new_status));
2238   }
2239 }
2240 #endif
2241 
2242 static void
ndb_index_stat_proc(Ndb_index_stat_proc & pr)2243 ndb_index_stat_proc(Ndb_index_stat_proc &pr)
2244 {
2245   DBUG_ENTER("ndb_index_stat_proc");
2246 
2247   ndb_index_stat_proc_control(pr);
2248 
2249 #ifndef DBUG_OFF
2250   ndb_index_stat_list_verify(pr);
2251   Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
2252 #endif
2253 
2254   pr.start= pr.now= ndb_index_stat_time();
2255 
2256   ndb_index_stat_proc_new(pr);
2257   ndb_index_stat_proc_update(pr);
2258   ndb_index_stat_proc_read(pr);
2259   ndb_index_stat_proc_idle(pr);
2260   ndb_index_stat_proc_check(pr);
2261   ndb_index_stat_proc_evict(pr);
2262   ndb_index_stat_proc_delete(pr);
2263   ndb_index_stat_proc_error(pr);
2264   ndb_index_stat_proc_event(pr);
2265 
2266 #ifndef DBUG_OFF
2267   ndb_index_stat_list_verify(pr);
2268   ndb_index_stat_report(old_glob);
2269 #endif
2270   DBUG_VOID_RETURN;
2271 }
2272 
2273 /*
2274   Runs after stats thread exits and needs no locks.
2275 */
2276 void
ndb_index_stat_end()2277 ndb_index_stat_end()
2278 {
2279   DBUG_ENTER("ndb_index_stat_end");
2280   Ndb_index_stat_proc pr;
2281   pr.end= true;
2282 
2283   /*
2284    * Shares have been freed so any index stat entries left should be
2285    * in LT_Delete.  The first two steps here should be unnecessary.
2286    */
2287 
2288   int lt;
2289   for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
2290   {
2291     if (lt == (int)Ndb_index_stat::LT_Delete)
2292       continue;
2293     Ndb_index_stat_list &list= ndb_index_stat_list[lt];
2294     Ndb_index_stat *st_loop= list.head;
2295     while (st_loop != 0)
2296     {
2297       Ndb_index_stat *st= st_loop;
2298       st_loop= st_loop->list_next;
2299       DBUG_PRINT("index_stat", ("st %s end %s", st->id, list.name));
2300       pr.lt= Ndb_index_stat::LT_Delete;
2301       ndb_index_stat_list_move(st, pr.lt);
2302     }
2303   }
2304 
2305   /* Real free */
2306   ndb_index_stat_proc_delete(pr);
2307   DBUG_VOID_RETURN;
2308 }
2309 
2310 /* Index stats thread */
2311 
2312 static int
ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc & pr)2313 ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc &pr)
2314 {
2315   DBUG_ENTER("ndb_index_stat_check_or_create_systables");
2316 
2317   NdbIndexStat *is= pr.is_util;
2318   Ndb *ndb= pr.ndb;
2319 
2320   if (is->check_systables(ndb) == 0)
2321   {
2322     DBUG_PRINT("index_stat", ("using existing index stats tables"));
2323     DBUG_RETURN(0);
2324   }
2325 
2326   if (is->create_systables(ndb) == 0)
2327   {
2328     DBUG_PRINT("index_stat", ("created index stats tables"));
2329     DBUG_RETURN(0);
2330   }
2331 
2332   if (is->getNdbError().code == 721 ||
2333       is->getNdbError().code == 4244 ||
2334       is->getNdbError().code == 4009) // no connection
2335   {
2336     // race between mysqlds, maybe
2337     DBUG_PRINT("index_stat", ("create index stats tables failed: error %d line %d",
2338                               is->getNdbError().code, is->getNdbError().line));
2339     DBUG_RETURN(-1);
2340   }
2341 
2342   sql_print_warning("create index stats tables failed: error %d line %d",
2343                     is->getNdbError().code, is->getNdbError().line);
2344   DBUG_RETURN(-1);
2345 }
2346 
2347 static int
ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc & pr)2348 ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc &pr)
2349 {
2350   DBUG_ENTER("ndb_index_stat_check_or_create_sysevents");
2351 
2352   NdbIndexStat *is= pr.is_util;
2353   Ndb *ndb= pr.ndb;
2354 
2355   if (is->check_sysevents(ndb) == 0)
2356   {
2357     DBUG_PRINT("index_stat", ("using existing index stats events"));
2358     DBUG_RETURN(0);
2359   }
2360 
2361   if (is->create_sysevents(ndb) == 0)
2362   {
2363     DBUG_PRINT("index_stat", ("created index stats events"));
2364     DBUG_RETURN(0);
2365   }
2366 
2367   if (is->getNdbError().code == 746)
2368   {
2369     // race between mysqlds, maybe
2370     DBUG_PRINT("index_stat", ("create index stats events failed: error %d line %d",
2371                               is->getNdbError().code, is->getNdbError().line));
2372     DBUG_RETURN(-1);
2373   }
2374 
2375   sql_print_warning("create index stats events failed: error %d line %d",
2376                     is->getNdbError().code, is->getNdbError().line);
2377   DBUG_RETURN(-1);
2378 }
2379 
2380 static int
ndb_index_stat_start_listener(Ndb_index_stat_proc & pr)2381 ndb_index_stat_start_listener(Ndb_index_stat_proc &pr)
2382 {
2383   DBUG_ENTER("ndb_index_stat_start_listener");
2384 
2385   NdbIndexStat *is= pr.is_util;
2386   Ndb *ndb= pr.ndb;
2387 
2388   if (is->create_listener(ndb) == -1)
2389   {
2390     sql_print_warning("create index stats listener failed: error %d line %d",
2391                       is->getNdbError().code, is->getNdbError().line);
2392     DBUG_RETURN(-1);
2393   }
2394 
2395   if (is->execute_listener(ndb) == -1)
2396   {
2397     sql_print_warning("execute index stats listener failed: error %d line %d",
2398                       is->getNdbError().code, is->getNdbError().line);
2399     // Drop the created listener
2400     (void)is->drop_listener(ndb);
2401     DBUG_RETURN(-1);
2402   }
2403 
2404   DBUG_RETURN(0);
2405 }
2406 
2407 static int
ndb_index_stat_stop_listener(Ndb_index_stat_proc & pr)2408 ndb_index_stat_stop_listener(Ndb_index_stat_proc &pr)
2409 {
2410   DBUG_ENTER("ndb_index_stat_stop_listener");
2411 
2412   NdbIndexStat *is= pr.is_util;
2413   Ndb *ndb= pr.ndb;
2414 
2415   if (is->drop_listener(ndb) == -1)
2416   {
2417     sql_print_warning("drop index stats listener failed: error %d line %d",
2418                       is->getNdbError().code, is->getNdbError().line);
2419     DBUG_RETURN(-1);
2420   }
2421 
2422   DBUG_RETURN(0);
2423 }
2424 
2425 /* Restart things after system restart */
2426 
2427 static bool ndb_index_stat_restart_flag= false;
2428 
2429 void
ndb_index_stat_restart()2430 ndb_index_stat_restart()
2431 {
2432   DBUG_ENTER("ndb_index_stat_restart");
2433   ndb_index_stat_restart_flag= true;
2434   DBUG_VOID_RETURN;
2435 }
2436 
2437 bool
is_setup_complete()2438 Ndb_index_stat_thread::is_setup_complete()
2439 {
2440   if (ndb_index_stat_get_enable(NULL))
2441   {
2442     return ndb_index_stat_allow();
2443   }
2444   return true;
2445 }
2446 
2447 extern Ndb_cluster_connection* g_ndb_cluster_connection;
2448 extern handlerton *ndbcluster_hton;
2449 
2450 void
do_run()2451 Ndb_index_stat_thread::do_run()
2452 {
2453   struct timespec abstime;
2454   DBUG_ENTER("ndb_index_stat_thread_func");
2455 
2456   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2457   Ndb_index_stat_proc pr;
2458 
2459   bool have_listener;
2460   have_listener= false;
2461 
2462   log_info("Starting...");
2463 
2464   log_verbose(1, "Wait for server start completed");
2465   /*
2466     wait for mysql server to start
2467   */
2468   mysql_mutex_lock(&LOCK_server_started);
2469   while (!mysqld_server_started)
2470   {
2471     set_timespec(&abstime, 1);
2472     mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
2473 	                 &abstime);
2474     if (is_stop_requested())
2475     {
2476       mysql_mutex_unlock(&LOCK_server_started);
2477       native_mutex_lock(&LOCK);
2478       goto ndb_index_stat_thread_end;
2479     }
2480   }
2481   mysql_mutex_unlock(&LOCK_server_started);
2482 
2483   log_verbose(1, "Wait for cluster to start");
2484   /*
2485     Wait for cluster to start
2486   */
2487   native_mutex_lock(&ndb_util_thread.LOCK);
2488   while (!is_stop_requested() && !g_ndb_status.cluster_node_id &&
2489          (ndbcluster_hton->slot != ~(uint)0))
2490   {
2491     /* ndb not connected yet */
2492     native_cond_wait(&ndb_util_thread.COND, &ndb_util_thread.LOCK);
2493   }
2494   native_mutex_unlock(&ndb_util_thread.LOCK);
2495 
2496   if (is_stop_requested())
2497   {
2498     native_mutex_lock(&LOCK);
2499     goto ndb_index_stat_thread_end;
2500   }
2501 
2502   /* Get instance used for sys objects check and create */
2503   if (!(pr.is_util= new NdbIndexStat))
2504   {
2505     sql_print_error("Could not allocate NdbIndexStat is_util object");
2506     native_mutex_lock(&LOCK);
2507     goto ndb_index_stat_thread_end;
2508   }
2509 
2510   if (!pr.init_ndb(g_ndb_cluster_connection))
2511   {
2512     // Error already printed
2513     native_mutex_lock(&LOCK);
2514     goto ndb_index_stat_thread_end;
2515   }
2516 
2517   /* Allow clients */
2518   ndb_index_stat_allow(1);
2519 
2520   /* Fill in initial status variable */
2521   native_mutex_lock(&stat_mutex);
2522   glob.set_status();
2523   native_mutex_unlock(&stat_mutex);
2524 
2525   log_info("Started");
2526 
2527   bool enable_ok;
2528   enable_ok= false;
2529 
2530   set_timespec(&abstime, 0);
2531   for (;;)
2532   {
2533     native_mutex_lock(&LOCK);
2534     if (!is_stop_requested() && client_waiting == false) {
2535       int ret= native_cond_timedwait(&COND,
2536                                      &LOCK,
2537                                      &abstime);
2538       const char* reason= ret == ETIMEDOUT ? "timed out" : "wake up";
2539       (void)reason; // USED
2540       DBUG_PRINT("index_stat", ("loop: %s", reason));
2541     }
2542     if (is_stop_requested()) /* Shutting down server */
2543       goto ndb_index_stat_thread_end;
2544     client_waiting= false;
2545     native_mutex_unlock(&LOCK);
2546 
2547     if (ndb_index_stat_restart_flag)
2548     {
2549       ndb_index_stat_restart_flag= false;
2550       enable_ok= false;
2551       if (have_listener)
2552       {
2553         if (ndb_index_stat_stop_listener(pr) == 0)
2554           have_listener= false;
2555       }
2556     }
2557 
2558     /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
2559     const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
2560 
2561     do
2562     {
2563       if (enable_ok != enable_ok_new)
2564       {
2565         DBUG_PRINT("index_stat", ("global enable: %d -> %d",
2566                                   enable_ok, enable_ok_new));
2567 
2568         if (enable_ok_new)
2569         {
2570           // at enable check or create stats tables and events
2571           if (ndb_index_stat_check_or_create_systables(pr) == -1 ||
2572               ndb_index_stat_check_or_create_sysevents(pr) == -1 ||
2573               ndb_index_stat_start_listener(pr) == -1)
2574           {
2575             // try again in next loop
2576             break;
2577           }
2578           have_listener= true;
2579         }
2580         else
2581         {
2582           // not a normal use-case
2583           if (have_listener)
2584           {
2585             if (ndb_index_stat_stop_listener(pr) == 0)
2586               have_listener= false;
2587           }
2588         }
2589         enable_ok= enable_ok_new;
2590       }
2591 
2592       if (!enable_ok)
2593         break;
2594 
2595       pr.busy= false;
2596       ndb_index_stat_proc(pr);
2597     } while (0);
2598 
2599     /* Calculate new time to wake up */
2600 
2601     const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
2602     uint msecs= 0;
2603     if (!enable_ok)
2604       msecs= opt.get(Ndb_index_stat_opt::Iloop_enable);
2605     else if (!pr.busy)
2606       msecs= opt.get(Ndb_index_stat_opt::Iloop_idle);
2607     else
2608       msecs= opt.get(Ndb_index_stat_opt::Iloop_busy);
2609     DBUG_PRINT("index_stat", ("sleep %dms", msecs));
2610 
2611     set_timespec_nsec(&abstime, msecs * 1000000ULL);
2612 
2613     /* Update status variable */
2614     glob.th_enable= enable_ok;
2615     glob.th_busy= pr.busy;
2616     glob.th_loop= msecs;
2617     native_mutex_lock(&stat_mutex);
2618     glob.set_status();
2619     native_mutex_unlock(&stat_mutex);
2620   }
2621 
2622 ndb_index_stat_thread_end:
2623   log_info("Stopping...");
2624 
2625   /* Prevent clients */
2626   ndb_index_stat_allow(0);
2627 
2628   if (have_listener)
2629   {
2630     if (ndb_index_stat_stop_listener(pr) == 0)
2631       have_listener= false;
2632   }
2633   if (pr.is_util)
2634   {
2635     delete pr.is_util;
2636     pr.is_util= 0;
2637   }
2638 
2639   pr.destroy();
2640 
2641   native_mutex_unlock(&LOCK);
2642   DBUG_PRINT("exit", ("ndb_index_stat_thread"));
2643 
2644   log_info("Stopped");
2645 
2646   DBUG_VOID_RETURN;
2647 }
2648 
2649 /* Optimizer queries */
2650 
2651 static ulonglong
ndb_index_stat_round(double x)2652 ndb_index_stat_round(double x)
2653 {
2654   char buf[100];
2655   if (x < 0.0)
2656     x= 0.0;
2657   // my_snprintf has no float and windows has no snprintf
2658   sprintf(buf, "%.0f", x);
2659   /* mysql provides my_strtoull */
2660   ulonglong n= my_strtoull(buf, 0, 10);
2661   return n;
2662 }
2663 
2664 /*
2665   Client waits for query or analyze.  The routines are
2666   similar but separated for clarity.
2667 */
2668 
2669 static int
ndb_index_stat_wait_query(Ndb_index_stat * st,const Ndb_index_stat_snap & snap)2670 ndb_index_stat_wait_query(Ndb_index_stat *st,
2671                           const Ndb_index_stat_snap &snap)
2672 {
2673   DBUG_ENTER("ndb_index_stat_wait_query");
2674 
2675   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2676   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2677   int err= 0;
2678   uint count= 0;
2679   struct timespec abstime;
2680   glob.wait_stats++;
2681   glob.query_count++;
2682   while (true)
2683   {
2684     int ret= 0;
2685     /* Query waits for any samples */
2686     if (st->sample_version > 0)
2687       break;
2688     if (st->no_stats)
2689     {
2690       /* Have detected no stats now or before */
2691       err= NdbIndexStat::NoIndexStats;
2692       glob.query_no_stats++;
2693       break;
2694     }
2695     if (st->error.code != 0)
2696     {
2697       /* An error has accured now or before */
2698       err= NdbIndexStat::MyHasError;
2699       glob.query_error++;
2700       break;
2701     }
2702     /*
2703       Try to detect changes behind our backs.  Should really not
2704       happen but make sure.
2705     */
2706     if (st->load_time != snap.load_time ||
2707         st->sample_version != snap.sample_version)
2708     {
2709       DBUG_ASSERT(false);
2710       err= NdbIndexStat::NoIndexStats;
2711       break;
2712     }
2713     if (st->abort_request)
2714     {
2715       err= NdbIndexStat::MyAbortReq;
2716       break;
2717     }
2718     count++;
2719     DBUG_PRINT("index_stat", ("st %s wait_query count:%u",
2720                               st->id, count));
2721     ndb_index_stat_thread.wakeup();
2722 
2723     set_timespec(&abstime, 1);
2724     ret= native_cond_timedwait(&ndb_index_stat_thread.stat_cond,
2725                                &ndb_index_stat_thread.stat_mutex,
2726                                &abstime);
2727     if (ret != 0 && ret != ETIMEDOUT)
2728     {
2729       err= ret;
2730       break;
2731     }
2732   }
2733   assert(glob.wait_stats != 0);
2734   glob.wait_stats--;
2735   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2736   if (err != 0)
2737   {
2738     DBUG_PRINT("index_stat", ("st %s wait_query error: %d",
2739                                st->id, err));
2740     DBUG_RETURN(err);
2741   }
2742   DBUG_PRINT("index_stat", ("st %s wait_query ok: sample_version %u -> %u",
2743                             st->id, snap.sample_version, st->sample_version));
2744   DBUG_RETURN(0);
2745 }
2746 
2747 static int
ndb_index_stat_wait_analyze(Ndb_index_stat * st,const Ndb_index_stat_snap & snap)2748 ndb_index_stat_wait_analyze(Ndb_index_stat *st,
2749                             const Ndb_index_stat_snap &snap)
2750 {
2751   DBUG_ENTER("ndb_index_stat_wait_analyze");
2752 
2753   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
2754   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2755   int err= 0;
2756   uint count= 0;
2757   struct timespec abstime;
2758   glob.wait_update++;
2759   glob.analyze_count++;
2760   while (true)
2761   {
2762     int ret= 0;
2763     /* Analyze waits for newer samples */
2764     if (st->sample_version > snap.sample_version)
2765       break;
2766     if (st->error_count != snap.error_count)
2767     {
2768       /* A new error has occured */
2769       DBUG_ASSERT(st->error_count > snap.error_count);
2770       err= st->error.code;
2771       glob.analyze_error++;
2772       break;
2773     }
2774     /*
2775       Try to detect changes behind our backs.  If another process
2776       deleted stats, an analyze here could wait forever.
2777     */
2778     if (st->load_time != snap.load_time ||
2779         st->sample_version != snap.sample_version)
2780     {
2781       DBUG_ASSERT(false);
2782       err= NdbIndexStat::AlienUpdate;
2783       break;
2784     }
2785     if (st->abort_request)
2786     {
2787       err= NdbIndexStat::MyAbortReq;
2788       break;
2789     }
2790     count++;
2791     DBUG_PRINT("index_stat", ("st %s wait_analyze count:%u",
2792                               st->id, count));
2793     ndb_index_stat_thread.wakeup();
2794 
2795     set_timespec(&abstime, 1);
2796     ret= native_cond_timedwait(&ndb_index_stat_thread.stat_cond,
2797                                &ndb_index_stat_thread.stat_mutex,
2798                                &abstime);
2799     if (ret != 0 && ret != ETIMEDOUT)
2800     {
2801       err= ret;
2802       break;
2803     }
2804   }
2805   assert(glob.wait_update != 0);
2806   glob.wait_update--;
2807   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2808   if (err != 0)
2809   {
2810     DBUG_PRINT("index_stat", ("st %s wait_analyze error: %d",
2811                                st->id, err));
2812     DBUG_RETURN(err);
2813   }
2814   DBUG_PRINT("index_stat", ("st %s wait_analyze ok: sample_version %u -> %u",
2815                             st->id, snap.sample_version, st->sample_version));
2816   DBUG_RETURN(0);
2817 }
2818 
2819 int
ndb_index_stat_query(uint inx,const key_range * min_key,const key_range * max_key,NdbIndexStat::Stat & stat,int from)2820 ha_ndbcluster::ndb_index_stat_query(uint inx,
2821                                     const key_range *min_key,
2822                                     const key_range *max_key,
2823                                     NdbIndexStat::Stat& stat,
2824                                     int from)
2825 {
2826   DBUG_ENTER("ha_ndbcluster::ndb_index_stat_query");
2827 
2828   const KEY *key_info= table->key_info + inx;
2829   const NDB_INDEX_DATA &data= m_index[inx];
2830   const NDBINDEX *index= data.index;
2831   DBUG_PRINT("index_stat", ("index: %u name: %s", inx, index->getName()));
2832 
2833   int err= 0;
2834 
2835   /* Create an IndexBound struct for the keys */
2836   NdbIndexScanOperation::IndexBound ib;
2837   compute_index_bounds(ib, key_info, min_key, max_key, from);
2838   ib.range_no= 0;
2839 
2840   Ndb_index_stat_snap snap;
2841   Ndb_index_stat *st=
2842     ndb_index_stat_get_share(m_share, index, m_table, snap, err, true, false);
2843   if (st == 0)
2844     DBUG_RETURN(err);
2845   /* Now holding reference to st */
2846 
2847   do
2848   {
2849     err= ndb_index_stat_wait_query(st, snap);
2850     if (err != 0)
2851       break;
2852     assert(st->sample_version != 0);
2853     uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
2854     uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
2855     NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
2856     NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
2857     NdbIndexStat::Range range(bound_lo, bound_hi);
2858 
2859     const NdbRecord* key_record= data.ndb_record_key;
2860     if (st->is->convert_range(range, key_record, &ib) == -1)
2861     {
2862       native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2863       ndb_index_stat_error(st, 1, "convert_range", __LINE__);
2864       err= st->client_error.code;
2865       native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2866       break;
2867     }
2868     if (st->is->query_stat(range, stat) == -1)
2869     {
2870       /* Invalid cache - should remove the entry */
2871       native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2872       ndb_index_stat_error(st, 1, "query_stat", __LINE__);
2873       err= st->client_error.code;
2874       native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2875       break;
2876     }
2877   }
2878   while (0);
2879 
2880   /* Release reference to st */
2881   native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2882   ndb_index_stat_ref_count(st, false);
2883   native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2884   DBUG_RETURN(err);
2885 }
2886 
2887 int
ndb_index_stat_get_rir(uint inx,key_range * min_key,key_range * max_key,ha_rows * rows_out)2888 ha_ndbcluster::ndb_index_stat_get_rir(uint inx,
2889                                       key_range *min_key,
2890                                       key_range *max_key,
2891                                       ha_rows *rows_out)
2892 {
2893   DBUG_ENTER("ha_ndbcluster::ndb_index_stat_get_rir");
2894   uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
2895   NdbIndexStat::Stat stat(stat_buffer);
2896   int err= ndb_index_stat_query(inx, min_key, max_key, stat, 1);
2897   if (err == 0)
2898   {
2899     double rir= -1.0;
2900     NdbIndexStat::get_rir(stat, &rir);
2901     ha_rows rows= ndb_index_stat_round(rir);
2902     /* Estimate only so cannot return exact zero */
2903     if (rows == 0)
2904       rows= 1;
2905     *rows_out= rows;
2906 #ifndef DBUG_OFF
2907     char rule[NdbIndexStat::RuleBufferBytes];
2908     NdbIndexStat::get_rule(stat, rule);
2909 #endif
2910     DBUG_PRINT("index_stat", ("rir: %u rule: %s", (uint)rows, rule));
2911     DBUG_RETURN(0);
2912   }
2913   DBUG_RETURN(err);
2914 }
2915 
2916 int
ndb_index_stat_set_rpk(uint inx)2917 ha_ndbcluster::ndb_index_stat_set_rpk(uint inx)
2918 {
2919   DBUG_ENTER("ha_ndbcluster::ndb_index_stat_set_rpk");
2920 
2921   KEY *key_info= table->key_info + inx;
2922   int err= 0;
2923 
2924   uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
2925   NdbIndexStat::Stat stat(stat_buffer);
2926   const key_range *min_key= 0;
2927   const key_range *max_key= 0;
2928   err= ndb_index_stat_query(inx, min_key, max_key, stat, 2);
2929   if (err == 0)
2930   {
2931     uint k;
2932     for (k= 0; k < key_info->user_defined_key_parts; k++)
2933     {
2934       double rpk= -1.0;
2935       NdbIndexStat::get_rpk(stat, k, &rpk);
2936       key_info->set_records_per_key(k, static_cast<rec_per_key_t>(rpk));
2937 #ifndef DBUG_OFF
2938       char rule[NdbIndexStat::RuleBufferBytes];
2939       NdbIndexStat::get_rule(stat, rule);
2940 #endif
2941       DBUG_PRINT("index_stat", ("rpk[%u]: %f rule: %s", k, rpk, rule));
2942     }
2943     DBUG_RETURN(0);
2944   }
2945   DBUG_RETURN(err);
2946 }
2947 
2948 int
ndb_index_stat_analyze(Ndb * ndb,uint * inx_list,uint inx_count)2949 ha_ndbcluster::ndb_index_stat_analyze(Ndb *ndb,
2950                                       uint *inx_list,
2951                                       uint inx_count)
2952 {
2953   DBUG_ENTER("ha_ndbcluster::ndb_index_stat_analyze");
2954 
2955   struct Req {
2956     Ndb_index_stat *st;
2957     Ndb_index_stat_snap snap;
2958     int err;
2959     Req() { st= 0; err= 0; }
2960   };
2961   Req req[MAX_INDEXES];
2962 
2963   /* Force stats update on each index */
2964   for (uint i= 0; i < inx_count; i++)
2965   {
2966     Req &r= req[i];
2967     uint inx= inx_list[i];
2968     const NDB_INDEX_DATA &data= m_index[inx];
2969     const NDBINDEX *index= data.index;
2970     DBUG_PRINT("index_stat", ("force update: %s", index->getName()));
2971 
2972     r.st=
2973       ndb_index_stat_get_share(m_share, index, m_table, r.snap, r.err, true, true);
2974     assert((r.st != 0) == (r.err == 0));
2975     /* Now holding reference to r.st if r.err == 0 */
2976   }
2977 
2978   /* Wait for each update */
2979   for (uint i = 0; i < inx_count; i++)
2980   {
2981     Req &r= req[i];
2982     uint inx= inx_list[i];
2983     const NDB_INDEX_DATA &data= m_index[inx];
2984     const NDBINDEX *index= data.index;
2985     (void)index; // USED
2986 
2987     if (r.err == 0)
2988     {
2989       DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
2990       r.err=ndb_index_stat_wait_analyze(r.st, r.snap);
2991       /* Release reference to r.st */
2992       native_mutex_lock(&ndb_index_stat_thread.stat_mutex);
2993       ndb_index_stat_ref_count(r.st, false);
2994       native_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
2995     }
2996   }
2997 
2998   /* Return first error if any */
2999   int err= 0;
3000   for (uint i= 0; i < inx_count; i++)
3001   {
3002     Req &r= req[i];
3003     if (r.err != 0)
3004     {
3005       err= r.err;
3006       break;
3007     }
3008   }
3009 
3010   DBUG_RETURN(err);
3011 }
3012