1 /*
2    Copyright (c) 2006, 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <algorithm>
26 
27 #include <ndb_global.h>
28 #include "m_ctype.h"
29 #include <ndb_opts.h>
30 #include <NdbApi.hpp>
31 #include <NdbIndexStat.hpp>
32 #include <NdbTest.hpp>
33 #include <ndb_version.h>
34 #include <NDBT_Stats.hpp>
35 #include <math.h>
36 #include <NdbHost.h>
37 
38 struct Opts {
39   int loglevel;
40   uint seed;
41   uint attrs;
42   uint loops;
43   uint rows;
44   uint ops;
45   uint nullkeys;
46   uint rpk;
47   uint rpkvar;
48   uint scanpct;
49   uint eqscans;
50   bool keeptable;
51   bool abort;
52   const char* dump;
OptsOpts53   Opts() :
54     loglevel(0),
55     seed(0),
56     attrs(3),
57     loops(1),
58     rows(10000),
59     ops(100),
60     nullkeys(10),
61     rpk(10),
62     rpkvar(10),
63     scanpct(10),
64     eqscans(30),
65     keeptable(false),
66     abort(false),
67     dump(0)
68   {}
69 };
70 
71 static Opts g_opts;
72 static uint g_loop = 0;
73 
74 static const char* g_tabname = "ts1";
75 static const char* g_indname = "ts1x1";
76 static const uint g_numattrs = 3;
77 static const uint g_charlen = 10;
78 static const char* g_csname = "latin1_swedish_ci";
79 static CHARSET_INFO* g_cs;
80 
81 // keys nullability
82 static const bool g_b_nullable = true;
83 static const bool g_c_nullable = true;
84 static const bool g_d_nullable = true;
85 
86 // value limits
87 struct Lim {
88   bool all_nullable;
89   uint b_min;
90   uint b_max;
91   const char* c_char;
92   uint d_min;
93   uint d_max;
94 };
95 
96 static Lim g_lim_val;
97 static Lim g_lim_bnd;
98 
99 static Ndb_cluster_connection* g_ncc = 0;
100 static Ndb* g_ndb = 0;
101 static Ndb* g_ndb_sys = 0;
102 static NdbDictionary::Dictionary* g_dic = 0;
103 static const NdbDictionary::Table* g_tab = 0;
104 static const NdbDictionary::Index* g_ind = 0;
105 static const NdbRecord* g_tab_rec = 0;
106 static const NdbRecord* g_ind_rec = 0;
107 
108 struct my_record
109 {
110   Uint8 m_null_bm;
111   Uint8 fill[3];
112   Uint32 m_a;
113   Uint32 m_b;
114   char m_c[1+g_charlen];
115   Uint16 m_d;
116 };
117 
118 static const Uint32 g_ndbrec_a_offset=offsetof(my_record, m_a);
119 static const Uint32 g_ndbrec_b_offset=offsetof(my_record, m_b);
120 static const Uint32 g_ndbrec_b_nb_offset=1;
121 static const Uint32 g_ndbrec_c_offset=offsetof(my_record, m_c);
122 static const Uint32 g_ndbrec_c_nb_offset=2;
123 static const Uint32 g_ndbrec_d_offset=offsetof(my_record, m_d);
124 static const Uint32 g_ndbrec_d_nb_offset=3;
125 
126 static NdbTransaction* g_con = 0;
127 static NdbOperation* g_op = 0;
128 static NdbScanOperation* g_scan_op = 0;
129 static NdbIndexScanOperation* g_rangescan_op = 0;
130 
131 static NdbIndexStat* g_is = 0;
132 static bool g_has_created_stat_tables = false;
133 static bool g_has_created_stat_events = false;
134 
135 static uint
urandom(uint m)136 urandom(uint m)
137 {
138   if (m == 0)
139     return 0;
140   uint r = (uint)rand();
141   r = r % m;
142   return r;
143 }
144 
145 static int& g_loglevel = g_opts.loglevel; // default log level
146 
147 #define chkdb(x) \
148   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; errdb(); if (g_opts.abort) abort(); return -1; } while (0)
149 
150 #define chker(x) \
151   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; ndbout << "errno: " << errno; if (g_opts.abort) abort(); return -1; } while (0)
152 
153 #define chkrc(x) \
154   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; if (g_opts.abort) abort(); return -1; } while (0)
155 
156 #define chkrc_break(x) \
157   if (unlikely(!(x))) { ndbout << "line " << __LINE__ << " FAIL " << #x << endl; break; }
158 
159 #define llx(n, x) \
160   do { if (likely(g_loglevel < n)) break; ndbout << x << endl; } while (0)
161 
162 #define ll0(x) llx(0, x)
163 #define ll1(x) llx(1, x)
164 #define ll2(x) llx(2, x)
165 #define ll3(x) llx(3, x)
166 
167 static void
errdb()168 errdb()
169 {
170   uint any = 0;
171   if (g_ncc != 0) {
172     NdbError e;
173     e.code = g_ncc->get_latest_error();
174     e.message = g_ncc->get_latest_error_msg();
175     if (e.code != 0)
176       ll0(++any << " ncc: error" << e);
177   }
178   if (g_ndb != 0) {
179     const NdbError& e = g_ndb->getNdbError();
180     if (e.code != 0)
181       ll0(++any << " ndb: error " << e);
182   }
183   if (g_dic != 0) {
184     const NdbError& e = g_dic->getNdbError();
185     if (e.code != 0)
186       ll0(++any << " dic: error " << e);
187   }
188   if (g_con != 0) {
189     const NdbError& e = g_con->getNdbError();
190     if (e.code != 0)
191       ll0(++any << " con: error " << e);
192   }
193   if (g_op != 0) {
194     const NdbError& e = g_op->getNdbError();
195     if (e.code != 0)
196       ll0(++any << " op: error " << e);
197   }
198   if (g_scan_op != 0) {
199     const NdbError& e = g_scan_op->getNdbError();
200     if (e.code != 0)
201       ll0(++any << " scan_op: error " << e);
202   }
203   if (g_rangescan_op != 0) {
204     const NdbError& e = g_rangescan_op->getNdbError();
205     if (e.code != 0)
206       ll0(++any << " rangescan_op: error " << e);
207   }
208   if (g_is != 0) {
209     const NdbIndexStat::Error& e = g_is->getNdbError();
210     if (e.code != 0)
211       ll0(++any << " stat: error " << e);
212   }
213   if (! any)
214     ll0("unknown db error");
215 }
216 
217 /* Methods to create NdbRecord structs for the table and index */
218 static int
createNdbRecords()219 createNdbRecords()
220 {
221   ll1("createNdbRecords");
222   const Uint32 numCols=4;
223   const Uint32 numIndexCols=3;
224   NdbDictionary::RecordSpecification recSpec[numCols];
225 
226   recSpec[0].column= g_tab->getColumn("a"); // 4 bytes
227   recSpec[0].offset= g_ndbrec_a_offset;
228   recSpec[0].nullbit_byte_offset= ~(Uint32)0;
229   recSpec[0].nullbit_bit_in_byte= ~(Uint32)0;
230 
231   recSpec[1].column= g_tab->getColumn("b"); // 4 bytes
232   recSpec[1].offset= g_ndbrec_b_offset;
233   if (g_b_nullable) {
234     recSpec[1].nullbit_byte_offset= 0;
235     recSpec[1].nullbit_bit_in_byte= g_ndbrec_b_nb_offset;
236   } else {
237     recSpec[1].nullbit_byte_offset= ~(Uint32)0;
238     recSpec[1].nullbit_bit_in_byte= ~(Uint32)0;
239   }
240 
241   recSpec[2].column= g_tab->getColumn("c"); // Varchar(10) -> ~12 bytes
242   recSpec[2].offset= g_ndbrec_c_offset;
243   if (g_c_nullable) {
244     recSpec[2].nullbit_byte_offset= 0;
245     recSpec[2].nullbit_bit_in_byte= g_ndbrec_c_nb_offset;
246   } else {
247     recSpec[2].nullbit_byte_offset= ~(Uint32)0;
248     recSpec[2].nullbit_bit_in_byte= ~(Uint32)0;
249   }
250 
251   recSpec[3].column= g_tab->getColumn("d"); // 2 bytes
252   recSpec[3].offset= g_ndbrec_d_offset;
253   if (g_d_nullable) {
254     recSpec[3].nullbit_byte_offset= 0;
255     recSpec[3].nullbit_bit_in_byte= g_ndbrec_d_nb_offset;
256   } else {
257     recSpec[3].nullbit_byte_offset= ~(Uint32)0;
258     recSpec[3].nullbit_bit_in_byte= ~(Uint32)0;
259   }
260 
261   g_dic = g_ndb->getDictionary();
262   g_tab_rec= g_dic->createRecord(g_tab,
263                                  &recSpec[0],
264                                  numCols,
265                                  sizeof(NdbDictionary::RecordSpecification),
266                                  0);
267 
268   chkdb(g_tab_rec != NULL);
269 
270   g_ind_rec= g_dic->createRecord(g_ind,
271                                  &recSpec[1],
272                                  numIndexCols,
273                                  sizeof(NdbDictionary::RecordSpecification),
274                                  0);
275 
276   chkdb(g_ind_rec != NULL);
277   g_dic = 0;
278 
279   return 0;
280 }
281 
282 // create table ts0 (
283 //   a int unsigned,
284 //   b int unsigned, c varchar(10), d smallint unsigned,
285 //   primary key using hash (a), index (b, c, d) )
286 
287 static int
createtable()288 createtable()
289 {
290   ll1("createtable");
291   NdbDictionary::Table tab(g_tabname);
292   tab.setLogging(false);
293   {
294     NdbDictionary::Column col("a");
295     col.setType(NdbDictionary::Column::Unsigned);
296     col.setPrimaryKey(true);
297     tab.addColumn(col);
298   }
299   {
300     NdbDictionary::Column col("b");
301     col.setType(NdbDictionary::Column::Unsigned);
302     col.setNullable(g_b_nullable);
303     tab.addColumn(col);
304   }
305   {
306     NdbDictionary::Column col("c");
307     col.setType(NdbDictionary::Column::Varchar);
308     col.setLength(g_charlen);
309     col.setCharset(g_cs);
310     col.setNullable(g_c_nullable);
311     tab.addColumn(col);
312   }
313   {
314     NdbDictionary::Column col("d");
315     col.setType(NdbDictionary::Column::Smallunsigned);
316     col.setNullable(g_d_nullable);
317     tab.addColumn(col);
318   }
319 
320   g_dic = g_ndb->getDictionary();
321   if (g_dic->getTable(g_tabname) != 0)
322     chkdb(g_dic->dropTable(g_tabname) == 0);
323   chkdb(g_dic->createTable(tab) == 0);
324   chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
325   g_dic = 0;
326   return 0;
327 }
328 
329 static int
createindex()330 createindex()
331 {
332   ll1("createindex");
333   NdbDictionary::Index ind(g_indname);
334   ind.setTable(g_tabname);
335   ind.setType(NdbDictionary::Index::OrderedIndex);
336   ind.setLogging(false);
337   ind.addColumnName("b");
338   ind.addColumnName("c");
339   ind.addColumnName("d");
340 
341   g_dic = g_ndb->getDictionary();
342   chkdb(g_dic->createIndex(ind) == 0);
343   chkdb((g_ind = g_dic->getIndex(g_indname, g_tabname)) != 0);
344   g_dic = 0;
345   return 0;
346 }
347 
348 static int
droptable()349 droptable()
350 {
351   ll1("droptable");
352   g_dic = g_ndb->getDictionary();
353   chkdb(g_dic->dropTable(g_tabname) == 0);
354   g_dic = 0;
355   return 0;
356 }
357 
358 // values for keys and bounds
359 
360 struct Val {
361   uint8 m_numattrs;
362   int8 b_null;
363   int8 c_null;
364   int8 d_null;
365   Uint32 b;
366   uchar c[1 + g_charlen];
367   Uint16 d;
368   Val();
369   void init();
370   void copy(const Val& val2);
371   void make(uint numattrs, const Lim& lim);
372   int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
373   void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
374 
375 private:
376   Val& operator=(const Val&);
377   Val(const Val&);
378 };
379 
380 static NdbOut&
operator <<(NdbOut & out,const Val & val)381 operator<<(NdbOut& out, const Val& val)
382 {
383   out << "[";
384   if (val.m_numattrs >= 1) {
385     if (val.b_null)
386       out << "NULL";
387     else
388       out << val.b;
389   }
390   if (val.m_numattrs >= 2) {
391     out << " ";
392     if (val.c_null)
393       out << "NULL";
394     else {
395       char buf[1 + g_charlen];
396       sprintf(buf, "%.*s", val.c[0], &val.c[1]);
397       out << "'" << buf << "'";
398     }
399   }
400   if (val.m_numattrs >= 3) {
401     out << " ";
402     if (val.d_null)
403       out <<" NULL";
404     else
405       out << val.d;
406   }
407   out << "]";
408   return out;
409 }
410 
Val()411 Val::Val()
412 {
413   init();
414 }
415 
416 void
init()417 Val::init()
418 {
419   m_numattrs = 0;
420   // junk rest
421   b_null = -1;
422   c_null = -1;
423   d_null = -1;
424   b = ~(Uint32)0;
425   memset(c, 0xff, sizeof(c));
426   d = ~(Uint16)0;
427 }
428 
429 void
copy(const Val & val2)430 Val::copy(const Val& val2)
431 {
432   require(this != &val2);
433   init();
434   m_numattrs = val2.m_numattrs;
435   if (m_numattrs >= 1) {
436     require(val2.b_null == 0 || val2.b_null == 1);
437     b_null = val2.b_null;
438     if (!b_null)
439       b = val2.b;
440   }
441   if (m_numattrs >= 2) {
442     require(val2.c_null == 0 || val2.c_null == 1);
443     c_null = val2.c_null;
444     if (!c_null)
445       memcpy(c, val2.c, sizeof(c));
446   }
447   if (m_numattrs >= 3) {
448     require(val2.d_null == 0 || val2.d_null == 1);
449     d_null = val2.d_null;
450     if (!d_null)
451       d = val2.d;
452   }
453 }
454 
455 void
make(uint numattrs,const Lim & lim)456 Val::make(uint numattrs, const Lim& lim)
457 {
458   require(numattrs <= g_numattrs);
459   if (numattrs >= 1) {
460     const bool nullable = g_b_nullable || lim.all_nullable;
461     if (nullable && urandom(100) < g_opts.nullkeys)
462       b_null = 1;
463     else {
464       require(lim.b_min <= lim.b_max);
465       b = lim.b_min + urandom(lim.b_max - lim.b_min + 1);
466       b_null = 0;
467     }
468   }
469   if (numattrs >= 2) {
470     const bool nullable = g_c_nullable || lim.all_nullable;
471     if (nullable && urandom(100) < g_opts.nullkeys)
472       c_null = 1;
473     else {
474       // prefer shorter
475       const uint len = urandom(urandom(g_charlen + 1) + 1);
476       c[0] = len;
477       for (uint j = 0; j < len; j++) {
478         uint k = urandom((uint)strlen(lim.c_char));
479         c[1 + j] = lim.c_char[k];
480       }
481       c_null = 0;
482     }
483   }
484   if (numattrs >= 3) {
485     const bool nullable = g_d_nullable || lim.all_nullable;
486     if (nullable && urandom(100) < g_opts.nullkeys)
487       d_null = 1;
488     else {
489       require(lim.d_min <= lim.d_max);
490       d = lim.d_min + urandom(lim.d_max - lim.d_min + 1);
491       d_null = 0;
492     }
493   }
494   m_numattrs = numattrs;
495 }
496 
497 int
cmp(const Val & val2,uint numattrs,uint * num_eq) const498 Val::cmp(const Val& val2, uint numattrs, uint* num_eq) const
499 {
500   require(numattrs <= m_numattrs);
501   require(numattrs <= val2.m_numattrs);
502   uint n = 0; // attr index where differs
503   uint k = 0;
504   if (k == 0 && numattrs >= 1) {
505     if (! b_null && ! val2.b_null) {
506       if (b < val2.b)
507         k = -1;
508       else if (b > val2.b)
509         k = +1;
510     } else if (! b_null) {
511       k = +1;
512     } else if (! val2.b_null) {
513       k = -1;
514     }
515     if (k == 0)
516       n++;
517   }
518   if (k == 0 && numattrs >= 2) {
519     if (! c_null && ! val2.c_null) {
520       const uchar* s1 = &c[1];
521       const uchar* s2 = &val2.c[1];
522       const uint l1 = (uint)c[0];
523       const uint l2 = (uint)val2.c[0];
524       require(l1 <= g_charlen && l2 <= g_charlen);
525       k = g_cs->coll->strnncollsp(g_cs, s1, l1, s2, l2);
526     } else if (! c_null) {
527       k = +1;
528     } else if (! val2.c_null) {
529       k = -1;
530     }
531     if (k == 0)
532       n++;
533   }
534   if (k == 0 && numattrs >= 3) {
535     if (! d_null && ! val2.d_null) {
536       if (d < val2.d)
537         k = -1;
538       else if (d > val2.d)
539         k = +1;
540     } else if (! d_null) {
541       k = +1;
542     } else if (! val2.d_null) {
543       k = -1;
544     }
545     if (k == 0)
546       n++;
547   }
548   require(n <= numattrs);
549   if (num_eq != 0)
550     *num_eq = n;
551   return k;
552 }
553 
554 void
fromib(const NdbIndexScanOperation::IndexBound & ib,uint j)555 Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
556 {
557   const char* key = (j == 0 ? ib.low_key : ib.high_key);
558   const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
559   const Uint8 nullbits = *(const Uint8*)key;
560   require(numattrs <= g_numattrs);
561   if (numattrs >= 1) {
562     if (nullbits & (1 << g_ndbrec_b_nb_offset))
563       b_null = 1;
564     else {
565       memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b));
566       b_null = 0;
567     }
568   }
569   if (numattrs >= 2) {
570     if (nullbits & (1 << g_ndbrec_c_nb_offset))
571       c_null = 1;
572     else {
573       memcpy(c, &key[g_ndbrec_c_offset], sizeof(c));
574       c_null = 0;
575     }
576   }
577   if (numattrs >= 3) {
578     if (nullbits & (1 << g_ndbrec_d_nb_offset))
579       d_null = 1;
580     else {
581       memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d));
582       d_null = 0;
583     }
584   }
585   m_numattrs = numattrs;
586 }
587 
588 // index keys
589 
590 struct Key {
591   Val m_val;
592   int8 m_flag; // temp use
593   Key();
594 
595 private:
596   Key& operator=(const Key&);
597   Key(const Key&);
598 };
599 
600 static NdbOut&
operator <<(NdbOut & out,const Key & key)601 operator<<(NdbOut& out, const Key& key)
602 {
603   out << key.m_val;
604   if (key.m_flag != -1)
605     out << " flag: " << key.m_flag;
606   return out;
607 }
608 
Key()609 Key::Key()
610 {
611   m_flag = -1;
612 }
613 
614 static Key* g_keys = 0;
615 static uint* g_sortkeys = 0;
616 
617 static void
freekeys()618 freekeys()
619 {
620   delete [] g_keys;
621   delete [] g_sortkeys;
622   g_keys = 0;
623   g_sortkeys = 0;
624 }
625 
626 static void
allockeys()627 allockeys()
628 {
629   freekeys();
630   g_keys = new Key [g_opts.rows];
631   g_sortkeys = new uint [g_opts.rows];
632   require(g_keys != 0 && g_sortkeys != 0);
633   memset(g_sortkeys, 0xff, sizeof(uint) * g_opts.rows);
634 }
635 
636 static int
cmpkeys(const void * p1,const void * p2)637 cmpkeys(const void* p1, const void* p2)
638 {
639   const uint i1 = *(const uint*)p1;
640   const uint i2 = *(const uint*)p2;
641   require(i1 < g_opts.rows && i2 < g_opts.rows);
642   const Key& key1 = g_keys[i1];
643   const Key& key2 = g_keys[i2];
644   const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
645   return k;
646 }
647 
648 static void
sortkeys()649 sortkeys()
650 {
651   ll2("sortkeys");
652   uint i;
653 
654   // sort
655   for (i = 0; i < g_opts.rows; i++)
656     g_sortkeys[i] = i;
657   qsort(g_sortkeys, g_opts.rows, sizeof(uint), cmpkeys);
658 
659   // verify
660   uint unique = 1;
661   for (i = 1; i < g_opts.rows; i++) {
662     const uint i1 = g_sortkeys[i - 1];
663     const uint i2 = g_sortkeys[i];
664     require(i1 < g_opts.rows && i2 < g_opts.rows);
665     const Key& key1 = g_keys[i1];
666     const Key& key2 = g_keys[i2];
667     const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
668     require(k <= 0);
669     if (k < 0)
670       unique++;
671   }
672 
673   // show min max key
674   ll1("minkey:" << g_keys[g_sortkeys[0]]);
675   ll1("maxkey:" << g_keys[g_sortkeys[g_opts.rows - 1]]);
676   ll1("unique:" << unique);
677 }
678 
679 static void
makekeys()680 makekeys()
681 {
682   ll1("makekeys");
683 
684   uint initrows = g_opts.rows / g_opts.rpk;
685   require(initrows != 0);
686 
687   // distinct keys
688   uint i = 0;
689   while (i < initrows) {
690     Key& key = g_keys[i];
691     key.m_val.make(g_numattrs, g_lim_val);
692     i++;
693   }
694 
695   // remaining keys
696   while (i < g_opts.rows) {
697     // if rpkvar is 10, multiply rpk by number between 0.1 and 10.0
698     double a = (double)(1 + urandom(g_opts.rpkvar * g_opts.rpkvar));
699     double b = a / (double)g_opts.rpkvar;
700     double c = b * (double)g_opts.rpk;
701     const uint n = (uint)(c + 0.5);
702     // select random key to duplicate from initrows
703     const uint k = urandom(initrows);
704     uint j = 0;
705     while (i < g_opts.rows && j < n) {
706       g_keys[i].m_val.copy(g_keys[k].m_val);
707       j++;
708       i++;
709     }
710   }
711 
712   // shuffle
713   i = 0;
714   while (i < g_opts.rows) {
715     uint j = urandom(g_opts.rows);
716     if (i != j) {
717       Key tmp;
718       tmp.m_val.copy(g_keys[i].m_val);
719       g_keys[i].m_val.copy(g_keys[j].m_val);
720       g_keys[j].m_val.copy(tmp.m_val);
721     }
722     i++;
723   }
724 
725   // sort
726   sortkeys();
727 }
728 
729 // data loading
730 
731 static int
verifydata()732 verifydata()
733 {
734   ll3("verifydata");
735   chkdb((g_con = g_ndb->startTransaction()) != 0);
736   chkdb((g_scan_op = g_con->getNdbScanOperation(g_tab)) != 0);
737   chkdb(g_scan_op->readTuples(NdbScanOperation::LM_CommittedRead) == 0);
738   Uint32 a;
739   Val val;
740   val.m_numattrs = g_numattrs;
741   char* a_addr = (char*)&a;
742   char* b_addr = (char*)&val.b;
743   char* c_addr = (char*)val.c;
744   char* d_addr = (char*)&val.d;
745   Uint32 no = 0;
746   NdbRecAttr* b_ra;
747   NdbRecAttr* c_ra;
748   NdbRecAttr* d_ra;
749   chkdb(g_scan_op->getValue(no++, a_addr) != 0);
750   chkdb((b_ra = g_scan_op->getValue(no++, b_addr)) != 0);
751   chkdb((c_ra = g_scan_op->getValue(no++, c_addr)) != 0);
752   chkdb((d_ra = g_scan_op->getValue(no++, d_addr)) != 0);
753   chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
754   uint count = 0;
755   uint i;
756   for (i = 0; i < g_opts.rows; i++) {
757     Key& key = g_keys[i];
758     key.m_flag = false; // not scanned
759   }
760   while (1) {
761     int ret;
762     a = ~(Uint32)0;
763     chkdb((ret = g_scan_op->nextResult()) == 0 || ret == 1);
764     if (ret == 1)
765       break;
766     val.b_null = b_ra->isNULL();
767     val.c_null = c_ra->isNULL();
768     val.d_null = d_ra->isNULL();
769     require(val.b_null == 0 || (g_b_nullable && val.b_null == 1));
770     require(val.c_null == 0 || (g_c_nullable && val.c_null == 1));
771     require(val.d_null == 0 || (g_d_nullable && val.d_null == 1));
772     i = (uint)a;
773     chkrc(i < g_opts.rows);
774     Key& key = g_keys[i];
775     chkrc(key.m_val.cmp(val) == 0);
776     chkrc(key.m_flag == false);
777     key.m_flag = true;
778     count++;
779   }
780   g_ndb->closeTransaction(g_con);
781   g_con = 0;
782   g_scan_op = 0;
783   for (i = 0; i < g_opts.rows; i++) {
784     Key& key = g_keys[i];
785     chkrc(key.m_flag == true);
786     key.m_flag = -1; // forget
787   }
788   require(count == g_opts.rows);
789   ll3("verifydata: " << g_opts.rows << " rows");
790   return 0;
791 }
792 
793 static int
loaddata(bool update)794 loaddata(bool update)
795 {
796   ll1("loaddata: update: " << update);
797   const uint batch = 512;
798   chkdb((g_con = g_ndb->startTransaction()) != 0);
799   uint i = 0;
800   while (i < g_opts.rows) {
801     chkdb((g_op = g_con->getNdbOperation(g_tab)) != 0);
802     if (!update)
803       chkdb(g_op->insertTuple() == 0);
804     else
805       chkdb(g_op->updateTuple() == 0);
806     Uint32 a = i;
807     const Val& val = g_keys[i].m_val;
808     const char* a_addr = (const char*)&a;
809     const char* b_addr = ! val.b_null ? (const char*)&val.b : 0;
810     const char* c_addr = ! val.c_null ? (const char*)val.c : 0;
811     const char* d_addr = ! val.d_null ? (const char*)&val.d : 0;
812     Uint32 no = 0;
813     chkdb(g_op->equal(no++, a_addr) == 0);
814     chkdb(g_op->setValue(no++, b_addr) == 0);
815     chkdb(g_op->setValue(no++, c_addr) == 0);
816     chkdb(g_op->setValue(no++, d_addr) == 0);
817     if (i++ % batch == 0) {
818       chkdb(g_con->execute(NdbTransaction::Commit) == 0);
819       g_ndb->closeTransaction(g_con);
820       g_con = 0;
821       g_op = 0;
822       chkdb((g_con = g_ndb->startTransaction()) != 0);
823     }
824   }
825   chkdb(g_con->execute(NdbTransaction::Commit) == 0);
826   g_ndb->closeTransaction(g_con);
827   g_con = 0;
828   g_op = 0;
829 
830   // check data and cmp routines
831   chkrc(verifydata() == 0);
832 
833   for (uint i = 0; i < g_opts.rows; i++)
834     ll3("load " << i << ": " << g_keys[i]);
835   ll0("loaddata: " << g_opts.rows << " rows");
836   return 0;
837 }
838 
839 // bounds
840 
841 struct Bnd {
842   Val m_val;
843   /*
844    * A bound is a partial key value (0 to g_numattrs attributes).
845    * It is not equal to any key value.  Instead, it has a "side".
846    *
847    * side = 0 if the bound is empty
848    * side = -1 if the bound is "just before" its value
849    * side = +1 if the bound is "just after" its value
850    *
851    * This is another way of looking at strictness of non-empty
852    * start and end keys in a range.
853    *
854    * start key is strict if side = +1
855    * end key is strict if side = -1
856    *
857    * NDB API specifies strictness in the bound type of the last
858    * index attribute which is part of the start/end key.
859    *
860    * LE (0) - strict: n - side: -1
861    * LT (1) - strict: y - side: +1
862    * GE (2) - strict: n - side: +1
863    * GT (3) - strict: y - side: -1
864    *
865    * A non-empty bound divides keys into 2 disjoint subsets:
866    * keys before (cmp() == -1) and keys after (cmp() == +1).
867    */
868   int8 m_side;
869   int8 m_lohi; // 0-lo 1-hi as part of Rng
870   Bnd();
871   bool isempty() const;
872   void copy(const Bnd& bnd2); // does not copy m_lohi
873   Bnd& make(uint minattrs);
874   Bnd& make(uint minattrs, const Val& theval);
875   int cmp(const Key& key) const;
876   int cmp(const Bnd& bnd2);
877   int type(uint colno) const; // for setBound
878   void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
879 
880 private:
881   Bnd& operator=(const Bnd&);
882   Bnd(const Bnd&);
883 };
884 
885 static NdbOut&
operator <<(NdbOut & out,const Bnd & bnd)886 operator<<(NdbOut& out, const Bnd& bnd)
887 {
888   if (bnd.m_lohi == 0)
889     out << "L";
890   else if (bnd.m_lohi == 1)
891     out << "H";
892   else
893     out << bnd.m_lohi << "?";
894   out << bnd.m_val;
895   if (bnd.m_side == 0)
896     ;
897   else if (bnd.m_side == -1)
898     out << "-";
899   else if (bnd.m_side == +1)
900     out << "+";
901   return out;
902 }
903 
Bnd()904 Bnd::Bnd()
905 {
906   m_side = 0;
907   m_lohi = -1;
908 }
909 
910 bool
isempty() const911 Bnd::isempty() const
912 {
913   return m_val.m_numattrs == 0;
914 }
915 
916 void
copy(const Bnd & bnd2)917 Bnd::copy(const Bnd& bnd2)
918 {
919   m_val.copy(bnd2.m_val);
920   m_side = bnd2.m_side;
921 }
922 
923 Bnd&
make(uint minattrs)924 Bnd::make(uint minattrs)
925 {
926   require(minattrs <= g_opts.attrs);
927   require(m_lohi == 0 || m_lohi == 1);
928   uint numattrs = minattrs + urandom(g_numattrs - minattrs + 1);
929   m_val.make(numattrs, g_lim_bnd);
930   m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
931   return *this;
932 }
933 
934 Bnd&
make(uint minattrs,const Val & theval)935 Bnd::make(uint minattrs, const Val& theval)
936 {
937   uint numattrs = minattrs + urandom(g_numattrs - minattrs);
938   m_val.copy(theval);
939   m_val.m_numattrs = numattrs;
940   m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
941   return *this;
942 }
943 
944 int
cmp(const Key & key) const945 Bnd::cmp(const Key& key) const
946 {
947   int place; // debug
948   int ret;
949   do {
950     int k = key.m_val.cmp(m_val, m_val.m_numattrs);
951     if (k != 0) {
952       place = 1;
953       ret = k;
954       break;
955     }
956     if (m_side != 0) {
957       place = 2;
958       ret = (-1) * m_side;
959       break;
960     }
961     place = 3;
962     ret = 0;
963     require(m_val.m_numattrs == 0);
964   } while (0);
965   ll3("bnd: " << *this << " cmp key: " << key
966       << " ret: " << ret << " place: " << place);
967   return ret;
968 }
969 
970 int
cmp(const Bnd & bnd2)971 Bnd::cmp(const Bnd& bnd2)
972 {
973   int place; // debug
974   int ret;
975   const Bnd& bnd1 = *this;
976   const Val& val1 = bnd1.m_val;
977   const Val& val2 = bnd2.m_val;
978   const uint numattrs1 = val1.m_numattrs;
979   const uint numattrs2 = val2.m_numattrs;
980   const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2);
981   do {
982     int k = val1.cmp(val2, n);
983     if (k != 0) {
984       place = 1;
985       ret = k;
986       break;
987     }
988     if (numattrs1 < numattrs2) {
989       place = 2;
990       ret = (+1) * bnd1.m_side;
991       break;
992     }
993     if (numattrs1 > numattrs2) {
994       place = 3;
995       ret = (-1) * bnd1.m_side;
996       break;
997     }
998     if (bnd1.m_side < bnd2.m_side) {
999       place = 4;
1000       ret = -1;
1001       break;
1002     }
1003     if (bnd1.m_side > bnd2.m_side) {
1004       place = 5;
1005       ret = +1;
1006       break;
1007     }
1008     place = 6;
1009     ret = 0;
1010   } while (0);
1011   ll3("bnd: " << *this << " cmp bnd: " << bnd2
1012       << " ret: " << ret << " place: " << place);
1013   return ret;
1014 }
1015 
1016 int
type(uint colno) const1017 Bnd::type(uint colno) const
1018 {
1019   int t;
1020   require(colno < m_val.m_numattrs && (m_side == -1 || m_side == +1));
1021   require(m_lohi == 0 || m_lohi == 1);
1022   if (m_lohi == 0) {
1023     if (colno + 1 < m_val.m_numattrs)
1024       t = 0; // LE
1025     else if (m_side == -1)
1026       t = 0; // LE
1027     else
1028       t = 1; // LT
1029   } else {
1030     if (colno + 1 < m_val.m_numattrs)
1031       t = 2; // GE
1032     else if (m_side == +1)
1033       t = 2; // GE
1034     else
1035       t = 3; // GT
1036   }
1037   return t;
1038 }
1039 
1040 void
fromib(const NdbIndexScanOperation::IndexBound & ib,uint j)1041 Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
1042 {
1043   Val& val = m_val;
1044   val.fromib(ib, j);
1045   const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
1046   const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive);
1047   if (numattrs == 0) {
1048     m_side = 0;
1049   } else {
1050     m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1));
1051   }
1052   m_lohi = j;
1053 }
1054 
1055 // stats values
1056 
1057 struct Stval {
1058   Uint32 rir_v2;
1059   double rir;
1060   double rpk[g_numattrs];
1061   bool empty;
1062   char rule[NdbIndexStat::RuleBufferBytes];
1063   Stval();
1064 };
1065 
1066 static NdbOut&
operator <<(NdbOut & out,const Stval & st)1067 operator<<(NdbOut& out, const Stval& st)
1068 {
1069   out << "rir_v2: " << st.rir_v2;
1070   out << " rir_v4: " << st.rir;
1071   out << " rpk:[ ";
1072   for (uint k = 0; k < g_opts.attrs; k++) {
1073     if (k != 0)
1074       out << " ";
1075     out << st.rpk[k];
1076   }
1077   out << " ]";
1078   out << " " << (st.empty ? "E" : "N");
1079   out << " " << st.rule;
1080   return out;
1081 }
1082 
Stval()1083 Stval::Stval()
1084 {
1085   rir_v2 = 0;
1086   rir = 0.0;
1087   for (uint k = 0; k < g_numattrs; k++)
1088     rpk[k] = 0.0;
1089   empty = false;
1090   strcpy(rule, "-");
1091 }
1092 
1093 // ranges
1094 
1095 struct Rng {
1096   Bnd m_bnd[2];
1097   Int32 m_rowcount;
1098   // stats v2
1099   double errpct;
1100   // stats v4
1101   Stval m_st_scan; // exact stats computed from keys in range
1102   Stval m_st_stat; // interpolated kernel stats via g_is
1103   Rng();
1104   uint minattrs() const;
1105   uint maxattrs() const;
1106   bool iseq() const;
1107   bool isempty() const;
1108   void copy(const Rng& rng2);
1109   int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
1110   uint rowcount() const;
1111   void fromib(const NdbIndexScanOperation::IndexBound& ib);
1112 
1113 private:
1114   Rng& operator=(const Rng&);
1115   Rng(const Rng&);
1116 };
1117 
1118 static NdbOut&
operator <<(NdbOut & out,const Rng & rng)1119 operator<<(NdbOut& out, const Rng& rng)
1120 {
1121   out << rng.m_bnd[0] << " " << rng.m_bnd[1];
1122   if (rng.m_rowcount != -1)
1123     out << " rows: " << rng.m_rowcount;
1124   return out;
1125 }
1126 
Rng()1127 Rng::Rng()
1128 {
1129   m_bnd[0].m_lohi = 0;
1130   m_bnd[1].m_lohi = 1;
1131   m_rowcount = -1;
1132 }
1133 
1134 uint
minattrs() const1135 Rng::minattrs() const
1136 {
1137   return std::min(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
1138 }
1139 
1140 uint
maxattrs() const1141 Rng::maxattrs() const
1142 {
1143   return std::max(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
1144 }
1145 
1146 bool
iseq() const1147 Rng::iseq() const
1148 {
1149   return
1150     minattrs() == maxattrs() &&
1151     m_bnd[0].m_val.cmp(m_bnd[1].m_val, minattrs()) == 0 &&
1152     m_bnd[0].m_side < m_bnd[1].m_side;
1153 }
1154 
1155 bool
isempty() const1156 Rng::isempty() const
1157 {
1158   return m_bnd[0].isempty() && m_bnd[1].isempty();
1159 }
1160 
1161 void
copy(const Rng & rng2)1162 Rng::copy(const Rng& rng2)
1163 {
1164   m_bnd[0].copy(rng2.m_bnd[0]);
1165   m_bnd[1].copy(rng2.m_bnd[1]);
1166   m_rowcount = rng2.m_rowcount;
1167 }
1168 
1169 int
cmp(const Key & key) const1170 Rng::cmp(const Key& key) const
1171 {
1172   int place; // debug
1173   int ret;
1174   do  {
1175     int k;
1176     k = m_bnd[0].cmp(key);
1177     if (k < 0) {
1178       place = 1;
1179       ret = -1;
1180       break;
1181     }
1182     k = m_bnd[1].cmp(key);
1183     if (k > 0) {
1184       place = 2;
1185       ret = +1;
1186       break;
1187     }
1188     place = 3;
1189     ret = 0;
1190   } while (0);
1191   ll3("rng: " << *this << " cmp key: " << key
1192       << " ret: " << ret << " place: " << place);
1193   return ret;
1194 }
1195 
1196 uint
rowcount() const1197 Rng::rowcount() const
1198 {
1199   ll3("rowcount: " << *this);
1200   int i;
1201   // binary search for first and last in range
1202   int lim[2];
1203   for (i = 0; i <= 1; i++) {
1204     ll3("search i=" << i);
1205     int lo = -1;
1206     int hi = (int)g_opts.rows;
1207     int ret;
1208     int j;
1209     do {
1210       j = (hi + lo) / 2;
1211       require(lo < j && j < hi);
1212       ret = cmp(g_keys[g_sortkeys[j]]);
1213       if (i == 0) {
1214         if (ret < 0)
1215           lo = j;
1216         else
1217           hi = j;
1218       } else {
1219         if (ret > 0)
1220           hi = j;
1221         else
1222           lo = j;
1223       }
1224     } while (hi - lo > 1);
1225     if (ret == 0)
1226       lim[i] = j;
1227     else if (i == 0)
1228       lim[i] = hi;
1229     else
1230       lim[i] = lo;
1231   }
1232 
1233   // verify is expensive due to makeranges() multiple tries
1234   const bool verify = (urandom(10) == 0);
1235   const int lo = std::max(lim[0], 0);
1236   const int hi = std::min(lim[1], (int)g_opts.rows - 1);
1237   if (verify) {
1238     int pos = -1; // before, within, after
1239     for (i = 0; i < (int)g_opts.rows; i++) {
1240       int k = cmp(g_keys[g_sortkeys[i]]);
1241       if (k < 0)
1242         require(i < lo);
1243       else if (k == 0)
1244         require(lo <= i && i <= hi);
1245       else
1246         require(i > hi);
1247       require(pos <= k);
1248       if (pos < k)
1249         pos = k;
1250     }
1251   }
1252 
1253   // result
1254   require(hi - lo + 1 >= 0);
1255   uint count = hi - lo + 1;
1256   ll3("rowcount: " << count << " lim: " << lim[0] << " " << lim[1]);
1257   return count;
1258 }
1259 
1260 void
fromib(const NdbIndexScanOperation::IndexBound & ib)1261 Rng::fromib(const NdbIndexScanOperation::IndexBound& ib)
1262 {
1263   for (uint j = 0; j <= 1; j++) {
1264     Bnd& bnd = m_bnd[j];
1265     bnd.fromib(ib, j);
1266   }
1267 }
1268 
1269 static Rng* g_rnglist = 0;
1270 
1271 static void
freeranges()1272 freeranges()
1273 {
1274   delete [] g_rnglist;
1275   g_rnglist = 0;
1276 }
1277 
1278 static void
allocranges()1279 allocranges()
1280 {
1281   freeranges();
1282   g_rnglist = new Rng [g_opts.ops];
1283   require(g_rnglist != 0);
1284 }
1285 
1286 static void
makeranges()1287 makeranges()
1288 {
1289   ll1("makeranges");
1290   const uint mintries = 20;
1291   const uint maxtries = 80;
1292   const uint fudgefac = 10;
1293 
1294   for (uint i = 0; i < g_opts.ops; i++) {
1295     const bool eqpart = (urandom(100) < g_opts.eqscans);
1296     const bool eqfull = eqpart && (urandom(100) < g_opts.eqscans);
1297     Rng rng; // candidate
1298     uint j;
1299     for (j = 0; j < maxtries; j++) {
1300       Rng rng2;
1301       if (!eqpart) {
1302         rng2.m_bnd[0].make(0);
1303         rng2.m_bnd[1].make(0);
1304       } else {
1305         const uint mincnt = eqfull ? g_opts.attrs : 1;
1306         rng2.m_bnd[0].make(mincnt);
1307         rng2.m_bnd[1].copy(rng2.m_bnd[0]);
1308         rng2.m_bnd[0].m_side = -1;
1309         rng2.m_bnd[1].m_side = +1;
1310         require(rng2.iseq());
1311       }
1312       rng2.m_rowcount = (Int32)rng2.rowcount();
1313       // 0-discard 1-replace or accept 2-accept
1314       int action = 0;
1315       do {
1316         // first candidate
1317         if (rng.m_rowcount == -1) {
1318           action = 1;
1319           break;
1320         }
1321         require(rng.m_rowcount != -1);
1322         // prefer some bounds
1323         if (rng2.isempty()) {
1324           if (urandom(fudgefac) != 0)
1325             action = 0;
1326           else
1327             action = 1;
1328           break;
1329         }
1330         // prefer some rows
1331         if (rng2.m_rowcount == 0) {
1332           action = 0;
1333           break;
1334         }
1335         // accept if row count under given pct
1336         require((uint)rng2.m_rowcount <= g_opts.rows);
1337         if (100 * (uint)rng2.m_rowcount <= g_opts.scanpct * g_opts.rows) {
1338           if (urandom(fudgefac) != 0) {
1339             action = 2;
1340             break;
1341           }
1342         }
1343         // replace if less rows
1344         if (rng2.m_rowcount < rng.m_rowcount) {
1345           if (urandom(fudgefac) != 0) {
1346             action = 1;
1347             break;
1348           }
1349         }
1350       } while (0);
1351       if (action != 0) {
1352         rng.copy(rng2);
1353         if (action == 2 || j >= mintries)
1354           break;
1355       }
1356     }
1357     g_rnglist[i].copy(rng);
1358     ll2("rng " << i << ": " << rng << " tries: " << j);
1359   }
1360 }
1361 
1362 // verify ranges via range scans
1363 
1364 static int
setbounds(const Rng & rng)1365 setbounds(const Rng& rng)
1366 {
1367   // currently must do each attr in order
1368   ll3("setbounds: " << rng);
1369   uint i;
1370   const Bnd (&bnd)[2] = rng.m_bnd;
1371   for (i = 0; i < g_numattrs; i++) {
1372     const Uint32 no = i; // index attribute number
1373     uint j;
1374     int type[2] = { -1, -1 };
1375     // determine inclusivity (boundtype) of upper+lower bounds on this col.
1376     // -1 == no bound on the col.
1377     for (j = 0; j <= 1; j++) {
1378       if (no < bnd[j].m_val.m_numattrs)
1379         type[j] = bnd[j].type(no);
1380     }
1381     for (j = 0; j <= 1; j++) {
1382       int t = type[j];
1383       if (t == -1)
1384         continue;
1385       if (no + 1 < bnd[j].m_val.m_numattrs)
1386         t &= ~(uint)1; // strict bit is set on last bound only
1387       const Val& val = bnd[j].m_val;
1388       const void* addr = 0;
1389       if (no == 0)
1390         addr = ! val.b_null ? (const void*)&val.b : 0;
1391       else if (no == 1)
1392         addr = ! val.c_null ? (const void*)val.c : 0;
1393       else if (no == 2)
1394         addr = ! val.d_null ? (const void*)&val.d : 0;
1395       else
1396         require(false);
1397       ll3("setBound attr:" << no << " type:" << t << " val: " << val);
1398       chkdb(g_rangescan_op->setBound(no, t, addr) == 0);
1399     }
1400   }
1401   return 0;
1402 }
1403 
1404 static int
scanrange(const Rng & rng)1405 scanrange(const Rng& rng)
1406 {
1407   ll3("scanrange: " << rng);
1408   chkdb((g_con = g_ndb->startTransaction()) != 0);
1409   chkdb((g_rangescan_op = g_con->getNdbIndexScanOperation(g_ind, g_tab)) != 0);
1410   chkdb(g_rangescan_op->readTuples() == 0);
1411   chkrc(setbounds(rng) == 0);
1412   Uint32 a;
1413   char* a_addr = (char*)&a;
1414   Uint32 no = 0;
1415   chkdb(g_rangescan_op->getValue(no++, a_addr) != 0);
1416   chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
1417   uint count = 0;
1418   uint i;
1419   for (i = 0; i < g_opts.rows; i++) {
1420     Key& key = g_keys[i];
1421     key.m_flag = false; // not scanned
1422   }
1423   while (1) {
1424     int ret;
1425     a = ~(Uint32)0;
1426     chkdb((ret = g_rangescan_op->nextResult()) == 0 || ret == 1);
1427     if (ret == 1)
1428       break;
1429     i = (uint)a;
1430     chkrc(i < g_opts.rows);
1431     Key& key = g_keys[i];
1432     ll3("scan: " << key);
1433     int k = rng.cmp(key);
1434     chkrc(k == 0);
1435     chkrc(key.m_flag == false);
1436     key.m_flag = true;
1437     count++;
1438   }
1439   g_ndb->closeTransaction(g_con);
1440   g_con = 0;
1441   g_rangescan_op = 0;
1442 
1443   for (i = 0; i < g_opts.rows; i++) {
1444     Key& key = g_keys[i];
1445     int k = rng.cmp(key);
1446     if (k != 0) // not in range
1447       chkrc(key.m_flag == false);
1448     else
1449       chkrc(key.m_flag == true);
1450     key.m_flag = -1; // forget
1451   }
1452   require((uint)rng.m_rowcount == count);
1453   return 0;
1454 }
1455 
1456 static int
scanranges()1457 scanranges()
1458 {
1459   ll1("scanranges");
1460   for (uint i = 0; i < g_opts.ops; i++) {
1461     const Rng& rng = g_rnglist[i];
1462     chkrc(scanrange(rng) == 0);
1463   }
1464   return 0;
1465 }
1466 
1467 // stats v4 update
1468 
1469 static int
definestat()1470 definestat()
1471 {
1472   ll1("definestat");
1473   require(g_is != 0 && g_ind != 0 && g_tab != 0);
1474   chkdb(g_is->set_index(*g_ind, *g_tab) == 0);
1475   return 0;
1476 }
1477 
1478 static int
updatestat()1479 updatestat()
1480 {
1481   ll1("updatestat");
1482   if (urandom(2) == 0) {
1483     g_dic = g_ndb->getDictionary();
1484     chkdb(g_dic->updateIndexStat(*g_ind, *g_tab) == 0);
1485     g_dic = 0;
1486   } else {
1487     chkdb(g_is->update_stat(g_ndb_sys) == 0);
1488   }
1489   return 0;
1490 }
1491 
1492 static int
readstat()1493 readstat()
1494 {
1495   ll1("readstat");
1496 
1497   NdbIndexStat::Head head;
1498   chkdb(g_is->read_head(g_ndb_sys) == 0);
1499   g_is->get_head(head);
1500   chkrc(head.m_found == true);
1501   chkrc(head.m_sampleVersion != 0);
1502   ll1("readstat:"
1503       << " sampleVersion: " << head.m_sampleVersion
1504       << " sampleCount: " << head.m_sampleCount);
1505 
1506   NdbIndexStat::CacheInfo infoQuery;
1507   chkdb(g_is->read_stat(g_ndb_sys) == 0);
1508   g_is->move_cache();
1509   g_is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
1510   ll1("readstat: cache bytes: " << infoQuery.m_totalBytes);
1511   return 0;
1512 }
1513 
1514 // test polling after updatestat
1515 
1516 static int
startlistener()1517 startlistener()
1518 {
1519   ll1("startlistener");
1520   chkdb(g_is->create_listener(g_ndb_sys) == 0);
1521   chkdb(g_is->execute_listener(g_ndb_sys) == 0);
1522   return 0;
1523 }
1524 
1525 static int
runlistener()1526 runlistener()
1527 {
1528   ll1("runlistener");
1529   int ret;
1530   chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1);
1531   chkrc(ret == 1);
1532   // one event is expected
1533   chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
1534   chkrc(ret == 1);
1535   chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
1536   chkrc(ret == 0);
1537   return 0;
1538 }
1539 
1540 static int
stoplistener()1541 stoplistener()
1542 {
1543   ll1("stoplistener");
1544   chkdb(g_is->drop_listener(g_ndb_sys) != -1);
1545   return 0;
1546 }
1547 
1548 // stats queries
1549 
1550 // exact stats from scan results
1551 static void
queryscan(Rng & rng)1552 queryscan(Rng& rng)
1553 {
1554   ll3("queryscan");
1555 
1556   uint rir;
1557   uint unq[g_numattrs];
1558   rir = 0;
1559   for (uint k = 0; k < g_opts.attrs; k++)
1560     unq[0] = 0;
1561   Key prevkey;
1562   for (uint i = 0; i < g_opts.rows; i++) {
1563     const Key& key = g_keys[g_sortkeys[i]];
1564     int res = rng.cmp(key);
1565     if (res != 0)
1566       continue;
1567     rir++;
1568     if (rir == 1) {
1569       for (uint k = 0; k < g_opts.attrs; k++)
1570         unq[k] = 1;
1571     } else {
1572       uint num_eq = ~0;
1573       int res = prevkey.m_val.cmp(key.m_val, g_opts.attrs, &num_eq);
1574       if (res == 0)
1575         require(num_eq == g_opts.attrs);
1576       else {
1577         require(res < 0);
1578         require(num_eq < g_opts.attrs);
1579         unq[num_eq]++;
1580         // propagate down
1581         for (uint k = num_eq + 1; k < g_opts.attrs; k++)
1582           unq[k]++;
1583       }
1584     }
1585     prevkey.m_val.copy(key.m_val);
1586   }
1587   require(rng.m_rowcount != -1);
1588   require((uint)rng.m_rowcount == rir);
1589 
1590   Stval& st = rng.m_st_scan;
1591   st.rir_v2 = rir;
1592   st.rir = rir == 0 ? 1.0 : (double)rir;
1593   for (uint k = 0; k < g_opts.attrs; k++) {
1594     if (rir == 0)
1595       st.rpk[k] = 1.0;
1596     else {
1597       require(rir >= unq[k]);
1598       require(unq[k] != 0);
1599       st.rpk[k] = (double)rir / (double)unq[k];
1600     }
1601   }
1602   st.empty = (rir == 0);
1603   ll2("queryscan: " << st);
1604 }
1605 
1606 /* This method initialises the passed in IndexBound
1607  * to represent the range passed in.
1608  * It assumes that the storage pointed to by low_key
1609  * and high_key in the passed IndexBound can be overwritten
1610  * and is long enough to store the data
1611  */
1612 static int
initialiseIndexBound(const Rng & rng,NdbIndexScanOperation::IndexBound & ib,my_record * low_key,my_record * high_key)1613 initialiseIndexBound(const Rng& rng,
1614                      NdbIndexScanOperation::IndexBound& ib,
1615                      my_record* low_key, my_record* high_key)
1616 {
1617   ll3("initialiseIndexBound: " << rng);
1618   uint i;
1619   const Bnd (&bnd)[2] = rng.m_bnd;
1620   Uint32 colsInBound[2]= {0, 0};
1621   bool boundInclusive[2]= {false, false};
1622 
1623   memset(&ib, 0xf1, sizeof(ib));
1624   memset(low_key, 0xf2, sizeof(*low_key));
1625   memset(high_key, 0xf3, sizeof(*high_key));
1626 
1627   // Clear nullbit storage
1628   low_key->m_null_bm = 0;
1629   high_key->m_null_bm = 0;
1630 
1631   for (i = 0; i < g_numattrs; i++) {
1632     const Uint32 no = i; // index attribute number
1633     uint j;
1634     int type[2] = { -1, -1 };
1635     // determine inclusivity (boundtype) of upper+lower bounds on this col.
1636     // -1 == no bound on the col.
1637     for (j = 0; j <= 1; j++) {
1638       if (no < bnd[j].m_val.m_numattrs)
1639         type[j] = bnd[j].type(no);
1640     }
1641     for (j = 0; j <= 1; j++) {
1642       /* Get ptr to key storage space for this bound */
1643       my_record* keyBuf= (j==0) ? low_key : high_key;
1644       int t = type[j];
1645       if (t == -1)
1646         continue;
1647       colsInBound[j]++;
1648 
1649       if (no + 1 >= bnd[j].m_val.m_numattrs)
1650         // Last column in bound, inclusive if GE or LE (or EQ)
1651         // i.e. bottom bit of boundtype is clear
1652         boundInclusive[j]= !(t & 1);
1653 
1654       const Val& val = bnd[j].m_val;
1655       if (no == 0)
1656       {
1657         if (! val.b_null)
1658           keyBuf->m_b= val.b;
1659 
1660         if (g_b_nullable)
1661           keyBuf->m_null_bm |= ((val.b_null?1:0) << g_ndbrec_b_nb_offset);
1662       }
1663       else if (no == 1)
1664       {
1665         if (! val.c_null)
1666           memcpy(&keyBuf->m_c[0], (const void*)&val.c, 1+ g_charlen);
1667 
1668         if (g_c_nullable)
1669           keyBuf->m_null_bm |= ((val.c_null?1:0) << g_ndbrec_c_nb_offset);
1670       }
1671       else if (no == 2)
1672       {
1673         if (! val.d_null)
1674           keyBuf->m_d= val.d;
1675 
1676         if (g_d_nullable)
1677           keyBuf->m_null_bm |= ((val.d_null?1:0) << g_ndbrec_d_nb_offset);
1678       }
1679       else
1680         require(false);
1681       ll3("initialiseIndexBound attr:" << no << " type:" << t << " val: " << val);
1682     }
1683   }
1684 
1685   /* Now have everything we need to initialise the IndexBound */
1686   ib.low_key = (char*)low_key;
1687   ib.low_key_count= colsInBound[0];
1688   ib.low_inclusive= boundInclusive[0];
1689   ib.high_key = (char*)high_key;
1690   ib.high_key_count= colsInBound[1];
1691   ib.high_inclusive= boundInclusive[1];
1692   ib.range_no= 0;
1693 
1694   ll3(" indexBound low_key_count=" << ib.low_key_count <<
1695       " low_inc=" << ib.low_inclusive <<
1696       " high_key_count=" << ib.high_key_count <<
1697       " high_inc=" << ib.high_inclusive);
1698   ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
1699       " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
1700       " first byte=" << ib.low_key[0]);
1701   ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
1702       " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
1703       " first byte=" << ib.high_key[0]);
1704 
1705   // verify by reverse
1706   {
1707     Rng rng;
1708     rng.fromib(ib);
1709     require(rng.m_bnd[0].cmp(bnd[0]) == 0);
1710     require(rng.m_bnd[1].cmp(bnd[1]) == 0);
1711   }
1712   return 0;
1713 }
1714 
1715 static int
querystat_v2(Rng & rng)1716 querystat_v2(Rng& rng)
1717 {
1718   ll3("querystat_v2");
1719 
1720   /* Create IndexBound and key storage space */
1721   NdbIndexScanOperation::IndexBound ib;
1722   my_record low_key;
1723   my_record high_key;
1724 
1725   chkdb((g_con = g_ndb->startTransaction()) != 0);
1726   chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
1727 
1728   Uint64 count = ~(Uint64)0;
1729   chkdb(g_is->records_in_range(g_ind,
1730                                  g_con,
1731                                  g_ind_rec,
1732                                  g_tab_rec,
1733                                  &ib,
1734                                  0,
1735                                  &count,
1736                                  0) == 0);
1737   g_ndb->closeTransaction(g_con);
1738   g_con = 0;
1739   g_rangescan_op = 0;
1740 
1741   Stval& st = rng.m_st_stat;
1742   chkrc(count < (1 << 30));
1743   st.rir_v2 = (Uint32)count;
1744   ll2("querystat_v2: " << st.rir_v2 << " rows");
1745   return 0;
1746 }
1747 
1748 static int
querystat(Rng & rng)1749 querystat(Rng& rng)
1750 {
1751   ll3("querystat");
1752 
1753   // set up range
1754   Uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
1755   Uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
1756   NdbIndexStat::Bound bound_lo(g_is, bound_lo_buffer);
1757   NdbIndexStat::Bound bound_hi(g_is, bound_hi_buffer);
1758   NdbIndexStat::Range range(bound_lo, bound_hi);
1759 
1760   // convert to IndexBound (like in mysqld)
1761   NdbIndexScanOperation::IndexBound ib;
1762   my_record low_key;
1763   my_record high_key;
1764   chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
1765   chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);
1766 
1767   // index stat query
1768   Uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
1769   NdbIndexStat::Stat stat(stat_buffer);
1770   chkdb(g_is->query_stat(range, stat) == 0);
1771 
1772   // save result
1773   Stval& st = rng.m_st_stat;
1774   g_is->get_rir(stat, &st.rir);
1775   for (uint k = 0; k < g_opts.attrs; k++) {
1776     g_is->get_rpk(stat, k, &st.rpk[k]);
1777   }
1778   g_is->get_empty(stat, &st.empty);
1779   g_is->get_rule(stat, st.rule);
1780 
1781   ll2("querystat: " << st);
1782   return 0;
1783 }
1784 
1785 static int
queryranges()1786 queryranges()
1787 {
1788   ll2("queryranges");
1789   for (uint i = 0; i < g_opts.ops; i++) {
1790     Rng& rng = g_rnglist[i];
1791     ll1("rng " << i << ": " << rng);
1792     // exact stats
1793     queryscan(rng);
1794     // interpolated stats
1795     chkrc(querystat_v2(rng) == 0);
1796     chkrc(querystat(rng) == 0);
1797     const Stval& st1 = rng.m_st_scan;
1798     const Stval& st2 = rng.m_st_stat;
1799     // if rir v2 is zero then it is exact
1800     chkrc(st2.rir_v2 != 0 || st1.rir_v2 == 0);
1801   }
1802   return 0;
1803 }
1804 
1805 // general statistics methods
1806 
1807 struct Stats : public NDBT_Stats {
1808   Stats();
1809   void add(double x2);
1810   void add(const Stats& sum2);
1811 };
1812 
1813 static NdbOut&
operator <<(NdbOut & out,const Stats & st)1814 operator<<(NdbOut& out, const Stats& st)
1815 {
1816   out << "count: " << st.getCount()
1817       << " min: " << st.getMin()
1818       << " max: " << st.getMax()
1819       << " mean: " << st.getMean()
1820       << " stddev: " << st.getStddev();
1821   return out;
1822 }
1823 
Stats()1824 Stats::Stats()
1825 {
1826 }
1827 
1828 void
add(double x2)1829 Stats::add(double x2)
1830 {
1831   addObservation(x2);
1832 }
1833 
1834 void
add(const Stats & st2)1835 Stats::add(const Stats& st2)
1836 {
1837   *this += st2;
1838 }
1839 
1840 // error statistics scan vs stat
1841 
1842 struct Sterr {
1843   Stats rir_v2;
1844   Stats rir;
1845   Stats rpk[g_numattrs];
1846   Sterr();
1847   void add(const Sterr& st2);
1848 };
1849 
1850 static NdbOut&
operator <<(NdbOut & out,const Sterr & st)1851 operator<<(NdbOut& out, const Sterr& st)
1852 {
1853   out << "rir_v2: " << st.rir_v2 << endl;
1854   out << "rir_v4: " << st.rir;
1855   for (uint k = 0; k < g_opts.attrs; k++) {
1856     out << endl << "rpk[" << k << "]: " << st.rpk[k];
1857   }
1858   return out;
1859 }
1860 
Sterr()1861 Sterr::Sterr()
1862 {
1863 }
1864 
1865 void
add(const Sterr & st2)1866 Sterr::add(const Sterr& st2)
1867 {
1868   rir_v2.add(st2.rir_v2);
1869   rir.add(st2.rir);
1870   for (uint k = 0; k < g_opts.attrs; k++) {
1871     rpk[k].add(st2.rpk[k]);
1872   }
1873 }
1874 
1875 static void
sumrange(const Rng & rng,Sterr & st)1876 sumrange(const Rng& rng, Sterr& st)
1877 {
1878   const Stval& st1 = rng.m_st_scan;
1879   const Stval& st2 = rng.m_st_stat;
1880 
1881   // rir_v2 error as pct of total rows
1882   {
1883     double rows = (double)g_opts.rows;
1884     double x1 = (double)st1.rir_v2;
1885     double x2 = (double)st2.rir_v2;
1886     double x3 = 100.0 * (x2 - x1) / rows;
1887     st.rir_v2.add(x3);
1888   }
1889 
1890   // rir error as pct of total rows
1891   {
1892     double rows = (double)g_opts.rows;
1893     double x1 = st1.rir;
1894     double x2 = st2.rir;
1895     double x3 = 100.0 * (x2 - x1) / rows;
1896     st.rir.add(x3);
1897   }
1898 
1899   // rpk errors as plain diff
1900   for (uint k = 0; k < g_opts.attrs; k++) {
1901     double x1 = st1.rpk[k];
1902     double x2 = st2.rpk[k];
1903     double x3 = (x2 - x1);
1904     st.rpk[k].add(x3);
1905   }
1906 }
1907 
1908 static void
sumranges(Sterr & st)1909 sumranges(Sterr& st)
1910 {
1911   for (uint i = 0; i < g_opts.ops; i++) {
1912     const Rng& rng = g_rnglist[i];
1913     sumrange(rng, st);
1914   }
1915 }
1916 
1917 // loop and final stats
1918 
1919 static Sterr g_sterr;
1920 
1921 static void
loopstats()1922 loopstats()
1923 {
1924   Sterr st;
1925   sumranges(st);
1926   if (g_opts.loops != 1) {
1927     ll0("=== loop " << g_loop << " summary ===");
1928     ll0(st);
1929   }
1930   // accumulate
1931   g_sterr.add(st);
1932 }
1933 
1934 static int
loopdumps()1935 loopdumps()
1936 {
1937   char file[200];
1938   if (g_opts.dump == 0)
1939     return 0;
1940   {
1941     BaseString::snprintf(file, sizeof(file),
1942                          "%s.key.%d", g_opts.dump, g_loop);
1943     FILE* f = 0;
1944     chker((f = fopen(file, "w")) != 0);
1945     fprintf(f, "a");
1946     for (uint k = 0; k < g_opts.attrs; k++) {
1947       if (k == 0)
1948         fprintf(f, ",b_null,b");
1949       else if (k == 1)
1950         fprintf(f, ",c_null,c");
1951       else if (k == 2)
1952         fprintf(f, ",d_null,d");
1953       else
1954         require(false);
1955     }
1956     fprintf(f, "\n");
1957     for (uint i = 0; i < g_opts.rows; i++) {
1958       const Key& key = g_keys[g_sortkeys[i]];
1959       const Val& val = key.m_val;
1960       fprintf(f, "%u", i);
1961       for (uint k = 0; k < g_opts.attrs; k++) {
1962         if (k == 0) {
1963           fprintf(f, ",%d,", val.b_null);
1964           if (!val.b_null)
1965             fprintf(f, "%u", val.b);
1966         } else if (k == 1) {
1967           fprintf(f, ",%d,", val.c_null);
1968           if (!val.c_null)
1969             fprintf(f, "%.*s", val.c[0], &val.c[1]);
1970         } else if (k == 2) {
1971           fprintf(f, ",%d,", val.d_null);
1972           if (!val.d_null)
1973             fprintf(f, "%u", val.d);
1974         } else {
1975           require(false);
1976         }
1977       }
1978       fprintf(f, "\n");
1979     }
1980     chker(fclose(f) == 0);
1981   }
1982   {
1983     BaseString::snprintf(file, sizeof(file),
1984                          "%s.range.%d", g_opts.dump, g_loop);
1985     FILE* f = 0;
1986     chker((f = fopen(file, "w")) != 0);
1987     fprintf(f, "op");
1988     for (uint j = 0; j <= 1; j++) {
1989       const char* suf = (j == 0 ? "_lo" : "_hi");
1990       fprintf(f, ",attrs%s", suf);
1991       for (uint k = 0; k < g_opts.attrs; k++) {
1992         if (k == 0)
1993           fprintf(f, ",b_null%s,b%s", suf, suf);
1994         else if (k == 1)
1995           fprintf(f, ",c_null%s,c%s", suf, suf);
1996         else if (k == 2)
1997           fprintf(f, ",d_null%s,d%s", suf, suf);
1998         else
1999           require(false);
2000       }
2001       fprintf(f, ",side%s", suf);
2002     }
2003     fprintf(f, "\n");
2004     for (uint i = 0; i < g_opts.ops; i++) {
2005       const Rng& rng = g_rnglist[i];
2006       fprintf(f, "%u", i);
2007       for (uint j = 0; j <= 1; j++) {
2008         const Bnd& bnd = rng.m_bnd[j];
2009         const Val& val = bnd.m_val;
2010         fprintf(f, ",%u", val.m_numattrs);
2011         for (uint k = 0; k < g_opts.attrs; k++) {
2012           if (k >= val.m_numattrs)
2013             fprintf(f, ",,");
2014           else if (k == 0) {
2015             fprintf(f, ",%d,", val.b_null);
2016             if (!val.b_null)
2017               fprintf(f, "%u", val.b);
2018           } else if (k == 1) {
2019             fprintf(f, ",%d,", val.c_null);
2020             if (!val.c_null)
2021               fprintf(f, "%.*s", val.c[0], &val.c[1]);
2022           } else if (k == 2) {
2023             fprintf(f, ",%d,", val.d_null);
2024             if (!val.d_null)
2025               fprintf(f, "%u", val.d);
2026           } else {
2027             require(false);
2028           }
2029         }
2030         fprintf(f, ",%d", bnd.m_side);
2031       }
2032       fprintf(f, "\n");
2033     }
2034     chker(fclose(f) == 0);
2035   }
2036   {
2037     BaseString::snprintf(file, sizeof(file),
2038                          "%s.stat.%d", g_opts.dump, g_loop);
2039     FILE* f = 0;
2040     chker((f = fopen(file, "w")) != 0);
2041     fprintf(f, "op");
2042     for (uint j = 0; j <= 1; j++) {
2043       const char* suf = (j == 0 ? "_scan" : "_stat");
2044       fprintf(f, ",rir_v2%s", suf);
2045       fprintf(f, ",rir%s", suf);
2046       for (uint k = 0; k < g_opts.attrs; k++) {
2047         fprintf(f, ",rpk_%u%s", k, suf);
2048       }
2049       fprintf(f, ",empty%s", suf);
2050       if (j == 1)
2051         fprintf(f, ",rule%s", suf);
2052     }
2053     fprintf(f, "\n");
2054     for (uint i = 0; i < g_opts.ops; i++) {
2055       const Rng& rng = g_rnglist[i];
2056       fprintf(f, "%u", i);
2057       for (uint j = 0; j <= 1; j++) {
2058         const Stval& st = (j == 0 ? rng.m_st_scan : rng.m_st_stat);
2059         fprintf(f, ",%u", st.rir_v2);
2060         fprintf(f, ",%.2f", st.rir);
2061         for (uint k = 0; k < g_opts.attrs; k++) {
2062           fprintf(f, ",%.2f", st.rpk[k]);
2063         }
2064         fprintf(f, ",%d", st.empty);
2065         if (j == 1)
2066           fprintf(f, ",%s", st.rule);
2067       }
2068       fprintf(f, "\n");
2069     }
2070     chker(fclose(f) == 0);
2071   }
2072   return 0;
2073 }
2074 
2075 static void
finalstats()2076 finalstats()
2077 {
2078   ll0("=== summary ===");
2079   ll0(g_sterr);
2080 }
2081 
2082 static int
runtest()2083 runtest()
2084 {
2085   ll1("sizeof Val: " << sizeof(Val));
2086   ll1("sizeof Key: " << sizeof(Key));
2087   ll1("sizeof Bnd: " << sizeof(Bnd));
2088   ll1("sizeof Rng: " << sizeof(Rng));
2089 
2090   uint seed = g_opts.seed;
2091   if (seed != 1) { // not loop number
2092     if (seed == 0) { // random
2093       seed = 2 + NdbHost_GetProcessId();
2094     }
2095     ll0("random seed is " << seed);
2096     srand(seed);
2097   } else {
2098     ll0("random seed is " << "loop number");
2099   }
2100   g_cs = get_charset_by_name(g_csname, MYF(0));
2101   if (g_cs == 0)
2102     g_cs = get_charset_by_csname(g_csname, MY_CS_PRIMARY, MYF(0));
2103   chkrc(g_cs != 0);
2104 
2105   allockeys();
2106   allocranges();
2107   chkrc(createtable() == 0);
2108   chkrc(createindex() == 0);
2109   chkrc(createNdbRecords() == 0);
2110   chkrc(definestat() == 0);
2111   chkrc(startlistener() == 0);
2112 
2113   for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
2114     ll0("=== loop " << g_loop << " ===");
2115     uint seed = g_opts.seed;
2116     if (seed == 1) { // loop number
2117       seed = g_loop;
2118       srand(seed);
2119     }
2120     makekeys();
2121     chkrc_break(loaddata(g_loop != 0) == 0);
2122     makeranges();
2123     chkrc_break(scanranges() == 0);
2124     chkrc_break(updatestat() == 0);
2125     chkrc_break(runlistener() == 0);
2126     chkrc_break(readstat() == 0);
2127     chkrc_break(queryranges() == 0);
2128     loopstats();
2129     chkrc_break(loopdumps() == 0);
2130   }
2131   finalstats();
2132 
2133   chkrc(stoplistener() == 0);
2134   if (!g_opts.keeptable)
2135     chkrc(droptable() == 0);
2136   freeranges();
2137   freekeys();
2138   return 0;
2139 }
2140 
2141 static int
doconnect()2142 doconnect()
2143 {
2144   g_ncc = new Ndb_cluster_connection();
2145   require(g_ncc != 0);
2146   chkdb(g_ncc->connect(30) == 0);
2147   g_ndb = new Ndb(g_ncc, "TEST_DB");
2148   require(g_ndb != 0);
2149   chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0);
2150   g_ndb_sys = new Ndb(g_ncc, "mysql");
2151   require(g_ndb_sys != 0);
2152   chkdb(g_ndb_sys->init() == 0 && g_ndb_sys->waitUntilReady(30) == 0);
2153   g_is = new NdbIndexStat;
2154   require(g_is != 0);
2155   return 0;
2156 }
2157 
2158 static void
dodisconnect()2159 dodisconnect()
2160 {
2161   delete g_is;
2162   delete g_ndb_sys;
2163   delete g_ndb;
2164   delete g_ncc;
2165 }
2166 
2167 static struct my_option
2168 my_long_options[] =
2169 {
2170   NDB_STD_OPTS("testIndexStat"),
2171   { "loglevel", NDB_OPT_NOSHORT,
2172     "Logging level in this program 0-3 (default 0)",
2173     (uchar **)&g_opts.loglevel, (uchar **)&g_opts.loglevel, 0,
2174     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
2175   { "seed", NDB_OPT_NOSHORT, "Random seed (default 0=random, 1=loop number)",
2176     (uchar **)&g_opts.seed, (uchar **)&g_opts.seed, 0,
2177     GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
2178   { "loops", NDB_OPT_NOSHORT, "Number of test loops (default 1, 0=forever)",
2179     (uchar **)&g_opts.loops, (uchar **)&g_opts.loops, 0,
2180     GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
2181   { "rows", NDB_OPT_NOSHORT, "Number of rows (default 10000)",
2182     (uchar **)&g_opts.rows, (uchar **)&g_opts.rows, 0,
2183     GET_UINT, REQUIRED_ARG, 100000, 0, 0, 0, 0, 0 },
2184   { "ops", NDB_OPT_NOSHORT,"Number of index scans per loop (default 100)",
2185     (uchar **)&g_opts.ops, (uchar **)&g_opts.ops, 0,
2186     GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
2187   { "nullkeys", NDB_OPT_NOSHORT, "Pct nulls in each key attribute (default 10)",
2188     (uchar **)&g_opts.nullkeys, (uchar **)&g_opts.nullkeys, 0,
2189     GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
2190   { "rpk", NDB_OPT_NOSHORT, "Avg records per full key (default 10)",
2191     (uchar **)&g_opts.rpk, (uchar **)&g_opts.rpk, 0,
2192     GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
2193   { "rpkvar", NDB_OPT_NOSHORT, "Vary rpk by factor (default 10, none 1)",
2194     (uchar **)&g_opts.rpkvar, (uchar **)&g_opts.rpkvar, 0,
2195     GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
2196   { "scanpct", NDB_OPT_NOSHORT,
2197     "Preferred max pct of total rows per scan (default 10)",
2198     (uchar **)&g_opts.scanpct, (uchar **)&g_opts.scanpct, 0,
2199     GET_UINT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
2200   { "eqscans", NDB_OPT_NOSHORT,
2201     "Pct scans for partial/full equality (default 30)",
2202     (uchar **)&g_opts.eqscans, (uchar **)&g_opts.eqscans, 0,
2203     GET_UINT, REQUIRED_ARG, 50, 0, 0, 0, 0, 0 },
2204   { "keeptable", NDB_OPT_NOSHORT,
2205     "Do not drop table at exit",
2206     (uchar **)&g_opts.keeptable, (uchar **)&g_opts.keeptable, 0,
2207     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
2208   { "abort", NDB_OPT_NOSHORT, "Dump core on any error",
2209     (uchar **)&g_opts.abort, (uchar **)&g_opts.abort, 0,
2210     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
2211   { "dump", NDB_OPT_NOSHORT, "Write CSV files name.* of keys,ranges,stats",
2212     (uchar **)&g_opts.dump, (uchar **)&g_opts.dump, 0,
2213     GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
2214   { 0, 0, 0,
2215     0, 0, 0,
2216     GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
2217 };
2218 
2219 static void
short_usage_sub()2220 short_usage_sub()
2221 {
2222   ndb_short_usage_sub(NULL);
2223 }
2224 
2225 static void
usage()2226 usage()
2227 {
2228   ndbout << my_progname << ": ordered index stats test" << endl;
2229 }
2230 
2231 static int
checkoptions()2232 checkoptions()
2233 {
2234   chkrc(g_opts.rows != 0);
2235   chkrc(g_opts.nullkeys <= 100);
2236   chkrc(g_opts.rpk != 0);
2237   g_opts.rpk = std::min(g_opts.rpk, g_opts.rows);
2238   chkrc(g_opts.rpkvar != 0);
2239   chkrc(g_opts.scanpct <= 100);
2240   chkrc(g_opts.eqscans <= 100);
2241   // set value limits
2242   g_lim_val.all_nullable = false;
2243   g_lim_bnd.all_nullable = true;
2244   g_lim_val.b_min = g_opts.rows;
2245   g_lim_val.b_max = 2 * g_opts.rows;
2246   g_lim_bnd.b_min = 90 * g_lim_val.b_min / 100;
2247   g_lim_bnd.b_max = 110 * g_lim_val.b_max / 100;
2248   g_lim_val.c_char = "bcd";
2249   g_lim_bnd.c_char = "abcde";
2250   g_lim_val.d_min = 100;
2251   g_lim_val.d_max = 200;
2252   g_lim_bnd.d_min = 0;
2253   g_lim_bnd.d_max = 300;
2254   return 0;
2255 }
2256 
2257 static
2258 int
docreate_stat_tables()2259 docreate_stat_tables()
2260 {
2261   if (g_is->check_systables(g_ndb_sys) == 0)
2262     return 0;
2263   ll1("check_systables: " << g_is->getNdbError());
2264 
2265   ll0("create stat tables");
2266   chkdb(g_is->create_systables(g_ndb_sys) == 0);
2267   g_has_created_stat_tables = true;
2268   return 0;
2269 }
2270 
2271 static
2272 int
dodrop_stat_tables()2273 dodrop_stat_tables()
2274 {
2275   if (g_has_created_stat_tables == false)
2276     return 0;
2277 
2278   ll0("drop stat tables");
2279   chkdb(g_is->drop_systables(g_ndb_sys) == 0);
2280   return 0;
2281 }
2282 
2283 static int
docreate_stat_events()2284 docreate_stat_events()
2285 {
2286   if (g_is->check_sysevents(g_ndb_sys) == 0)
2287     return 0;
2288   ll1("check_sysevents: " << g_is->getNdbError());
2289 
2290   ll0("create stat events");
2291   chkdb(g_is->create_sysevents(g_ndb_sys) == 0);
2292   g_has_created_stat_events = true;
2293   return 0;
2294 }
2295 
2296 static int
dodrop_stat_events()2297 dodrop_stat_events()
2298 {
2299   if (g_has_created_stat_events == false)
2300     return 0;
2301 
2302   ll0("drop stat events");
2303   chkdb(g_is->drop_sysevents(g_ndb_sys) == 0);
2304   return 0;
2305 }
2306 
2307 static int
docreate_sys_objects()2308 docreate_sys_objects()
2309 {
2310   require(g_is != 0 && g_ndb_sys != 0);
2311   chkrc(docreate_stat_tables() == 0);
2312   chkrc(docreate_stat_events() == 0);
2313   return 0;
2314 }
2315 
2316 static int
dodrop_sys_objects()2317 dodrop_sys_objects()
2318 {
2319   require(g_is != 0 && g_ndb_sys != 0);
2320   chkrc(dodrop_stat_events() == 0);
2321   chkrc(dodrop_stat_tables() == 0);
2322   return 0;
2323 }
2324 
2325 int
main(int argc,char ** argv)2326 main(int argc, char** argv)
2327 {
2328   ndb_init();
2329   my_progname = strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
2330   uint i;
2331   ndbout << my_progname;
2332   for (i = 1; i < (uint)argc; i++)
2333     ndbout << " " << argv[i];
2334   ndbout << endl;
2335   int ret;
2336   ndb_opt_set_usage_funcs(short_usage_sub, usage);
2337   ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
2338   if (ret != 0 || argc != 0) {
2339     ll0("wrong args");
2340     return NDBT_ProgramExit(NDBT_WRONGARGS);
2341   }
2342   if (checkoptions() == -1) {
2343     ll0("invalid args");
2344     return NDBT_ProgramExit(NDBT_WRONGARGS);
2345   }
2346   if (doconnect() == -1) {
2347     ll0("connect failed");
2348     return NDBT_ProgramExit(NDBT_FAILED);
2349   }
2350   if (docreate_sys_objects() == -1) {
2351     ll0("failed to check or create stat tables and events");
2352     goto failed;
2353   }
2354   if (runtest() == -1) {
2355     ll0("test failed");
2356     goto failed;
2357   }
2358   if (dodrop_sys_objects() == -1) {
2359     ll0("failed to drop created stat tables or events");
2360     goto failed;
2361   }
2362   dodisconnect();
2363   return NDBT_ProgramExit(NDBT_OK);
2364 failed:
2365   (void)dodrop_sys_objects();
2366   dodisconnect();
2367   return NDBT_ProgramExit(NDBT_FAILED);
2368 }
2369