1 /*
2    Copyright (c) 2006, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 #include <ndb_opts.h>
27 #include <NdbApi.hpp>
28 #include <NdbIndexStat.hpp>
29 #include <NdbTest.hpp>
30 #include <ndb_version.h>
31 #include <NDBT_Stats.hpp>
32 #include <math.h>
33 
34 #undef min
35 #undef max
36 #define min(a, b) ((a) <= (b) ? (a) : (b))
37 #define max(a, b) ((a) >= (b) ? (a) : (b))
38 
39 struct Opts {
40   int loglevel;
41   uint seed;
42   uint attrs;
43   uint loops;
44   uint rows;
45   uint ops;
46   uint nullkeys;
47   uint rpk;
48   uint rpkvar;
49   uint scanpct;
50   uint eqscans;
51   my_bool keeptable;
52   my_bool abort;
53   const char* dump;
OptsOpts54   Opts() :
55     loglevel(0),
56     seed(0),
57     attrs(3),
58     loops(1),
59     rows(10000),
60     ops(100),
61     nullkeys(10),
62     rpk(10),
63     rpkvar(10),
64     scanpct(10),
65     eqscans(30),
66     keeptable(false),
67     abort(false),
68     dump(0)
69   {}
70 };
71 
72 static Opts g_opts;
73 static uint g_loop = 0;
74 
75 static const char* g_tabname = "ts1";
76 static const char* g_indname = "ts1x1";
77 static const uint g_numattrs = 3;
78 static const uint g_charlen = 10;
79 static const char* g_csname = "latin1_swedish_ci";
80 static CHARSET_INFO* g_cs;
81 
// nullability of the key columns b, c and d
static const bool g_b_nullable = true,
                  g_c_nullable = true,
                  g_d_nullable = true;
86 
87 // value limits
88 struct Lim {
89   bool all_nullable;
90   uint b_min;
91   uint b_max;
92   const char* c_char;
93   uint d_min;
94   uint d_max;
95 };
96 
97 static Lim g_lim_val;
98 static Lim g_lim_bnd;
99 
100 static Ndb_cluster_connection* g_ncc = 0;
101 static Ndb* g_ndb = 0;
102 static Ndb* g_ndb_sys = 0;
103 static NdbDictionary::Dictionary* g_dic = 0;
104 static const NdbDictionary::Table* g_tab = 0;
105 static const NdbDictionary::Index* g_ind = 0;
106 static const NdbRecord* g_tab_rec = 0;
107 static const NdbRecord* g_ind_rec = 0;
108 
109 struct my_record
110 {
111   Uint8 m_null_bm;
112   Uint8 fill[3];
113   Uint32 m_a;
114   Uint32 m_b;
115   char m_c[1+g_charlen];
116   Uint16 m_d;
117 };
118 
119 static const Uint32 g_ndbrec_a_offset=offsetof(my_record, m_a);
120 static const Uint32 g_ndbrec_b_offset=offsetof(my_record, m_b);
121 static const Uint32 g_ndbrec_b_nb_offset=1;
122 static const Uint32 g_ndbrec_c_offset=offsetof(my_record, m_c);
123 static const Uint32 g_ndbrec_c_nb_offset=2;
124 static const Uint32 g_ndbrec_d_offset=offsetof(my_record, m_d);
125 static const Uint32 g_ndbrec_d_nb_offset=3;
126 static const Uint32 g_ndbrecord_bytes=sizeof(my_record);
127 
128 static NdbTransaction* g_con = 0;
129 static NdbOperation* g_op = 0;
130 static NdbScanOperation* g_scan_op = 0;
131 static NdbIndexScanOperation* g_rangescan_op = 0;
132 
133 static NdbIndexStat* g_is = 0;
134 static bool g_has_created_stat_tables = false;
135 static bool g_has_created_stat_events = false;
136 
// Return a pseudo-random number in [0, m), or 0 when m is 0.
// No random number is consumed for m == 0.
static uint
urandom(uint m)
{
  return m != 0 ? (uint)rand() % m : 0;
}
146 
147 static int& g_loglevel = g_opts.loglevel; // default log level
148 
149 #define chkdb(x) \
150   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; errdb(); if (g_opts.abort) abort(); return -1; } while (0)
151 
152 #define chker(x) \
153   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; ndbout << "errno: " << errno; if (g_opts.abort) abort(); return -1; } while (0)
154 
155 #define chkrc(x) \
156   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; if (g_opts.abort) abort(); return -1; } while (0)
157 
158 #define llx(n, x) \
159   do { if (likely(g_loglevel < n)) break; ndbout << x << endl; } while (0)
160 
161 #define ll0(x) llx(0, x)
162 #define ll1(x) llx(1, x)
163 #define ll2(x) llx(2, x)
164 #define ll3(x) llx(3, x)
165 
166 static void
errdb()167 errdb()
168 {
169   uint any = 0;
170   if (g_ncc != 0) {
171     NdbError e;
172     e.code = g_ncc->get_latest_error();
173     e.message = g_ncc->get_latest_error_msg();
174     if (e.code != 0)
175       ll0(++any << " ncc: error" << e);
176   }
177   if (g_ndb != 0) {
178     const NdbError& e = g_ndb->getNdbError();
179     if (e.code != 0)
180       ll0(++any << " ndb: error " << e);
181   }
182   if (g_dic != 0) {
183     const NdbError& e = g_dic->getNdbError();
184     if (e.code != 0)
185       ll0(++any << " dic: error " << e);
186   }
187   if (g_con != 0) {
188     const NdbError& e = g_con->getNdbError();
189     if (e.code != 0)
190       ll0(++any << " con: error " << e);
191   }
192   if (g_op != 0) {
193     const NdbError& e = g_op->getNdbError();
194     if (e.code != 0)
195       ll0(++any << " op: error " << e);
196   }
197   if (g_scan_op != 0) {
198     const NdbError& e = g_scan_op->getNdbError();
199     if (e.code != 0)
200       ll0(++any << " scan_op: error " << e);
201   }
202   if (g_rangescan_op != 0) {
203     const NdbError& e = g_rangescan_op->getNdbError();
204     if (e.code != 0)
205       ll0(++any << " rangescan_op: error " << e);
206   }
207   if (g_is != 0) {
208     const NdbIndexStat::Error& e = g_is->getNdbError();
209     if (e.code != 0)
210       ll0(++any << " stat: error " << e);
211   }
212   if (! any)
213     ll0("unknown db error");
214 }
215 
216 /* Methods to create NdbRecord structs for the table and index */
217 static int
createNdbRecords()218 createNdbRecords()
219 {
220   ll1("createNdbRecords");
221   const Uint32 numCols=4;
222   const Uint32 numIndexCols=3;
223   NdbDictionary::RecordSpecification recSpec[numCols];
224 
225   recSpec[0].column= g_tab->getColumn("a"); // 4 bytes
226   recSpec[0].offset= g_ndbrec_a_offset;
227   recSpec[0].nullbit_byte_offset= ~(Uint32)0;
228   recSpec[0].nullbit_bit_in_byte= ~(Uint32)0;
229 
230   recSpec[1].column= g_tab->getColumn("b"); // 4 bytes
231   recSpec[1].offset= g_ndbrec_b_offset;
232   if (g_b_nullable) {
233     recSpec[1].nullbit_byte_offset= 0;
234     recSpec[1].nullbit_bit_in_byte= g_ndbrec_b_nb_offset;
235   } else {
236     recSpec[1].nullbit_byte_offset= ~(Uint32)0;
237     recSpec[1].nullbit_bit_in_byte= ~(Uint32)0;
238   }
239 
240   recSpec[2].column= g_tab->getColumn("c"); // Varchar(10) -> ~12 bytes
241   recSpec[2].offset= g_ndbrec_c_offset;
242   if (g_c_nullable) {
243     recSpec[2].nullbit_byte_offset= 0;
244     recSpec[2].nullbit_bit_in_byte= g_ndbrec_c_nb_offset;
245   } else {
246     recSpec[2].nullbit_byte_offset= ~(Uint32)0;
247     recSpec[2].nullbit_bit_in_byte= ~(Uint32)0;
248   }
249 
250   recSpec[3].column= g_tab->getColumn("d"); // 2 bytes
251   recSpec[3].offset= g_ndbrec_d_offset;
252   if (g_d_nullable) {
253     recSpec[3].nullbit_byte_offset= 0;
254     recSpec[3].nullbit_bit_in_byte= g_ndbrec_d_nb_offset;
255   } else {
256     recSpec[3].nullbit_byte_offset= ~(Uint32)0;
257     recSpec[3].nullbit_bit_in_byte= ~(Uint32)0;
258   }
259 
260   g_dic = g_ndb->getDictionary();
261   g_tab_rec= g_dic->createRecord(g_tab,
262                                  &recSpec[0],
263                                  numCols,
264                                  sizeof(NdbDictionary::RecordSpecification),
265                                  0);
266 
267   chkdb(g_tab_rec != NULL);
268 
269   g_ind_rec= g_dic->createRecord(g_ind,
270                                  &recSpec[1],
271                                  numIndexCols,
272                                  sizeof(NdbDictionary::RecordSpecification),
273                                  0);
274 
275   chkdb(g_ind_rec != NULL);
276   g_dic = 0;
277 
278   return 0;
279 }
280 
281 // create table ts0 (
282 //   a int unsigned,
283 //   b int unsigned, c varchar(10), d smallint unsigned,
284 //   primary key using hash (a), index (b, c, d) )
285 
286 static int
createtable()287 createtable()
288 {
289   ll1("createtable");
290   NdbDictionary::Table tab(g_tabname);
291   tab.setLogging(false);
292   {
293     NdbDictionary::Column col("a");
294     col.setType(NdbDictionary::Column::Unsigned);
295     col.setPrimaryKey(true);
296     tab.addColumn(col);
297   }
298   {
299     NdbDictionary::Column col("b");
300     col.setType(NdbDictionary::Column::Unsigned);
301     col.setNullable(g_b_nullable);
302     tab.addColumn(col);
303   }
304   {
305     NdbDictionary::Column col("c");
306     col.setType(NdbDictionary::Column::Varchar);
307     col.setLength(g_charlen);
308     col.setCharset(g_cs);
309     col.setNullable(g_c_nullable);
310     tab.addColumn(col);
311   }
312   {
313     NdbDictionary::Column col("d");
314     col.setType(NdbDictionary::Column::Smallunsigned);
315     col.setNullable(g_d_nullable);
316     tab.addColumn(col);
317   }
318 
319   g_dic = g_ndb->getDictionary();
320   if (g_dic->getTable(g_tabname) != 0)
321     chkdb(g_dic->dropTable(g_tabname) == 0);
322   chkdb(g_dic->createTable(tab) == 0);
323   chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
324   g_dic = 0;
325   return 0;
326 }
327 
328 static int
createindex()329 createindex()
330 {
331   ll1("createindex");
332   NdbDictionary::Index ind(g_indname);
333   ind.setTable(g_tabname);
334   ind.setType(NdbDictionary::Index::OrderedIndex);
335   ind.setLogging(false);
336   ind.addColumnName("b");
337   ind.addColumnName("c");
338   ind.addColumnName("d");
339 
340   g_dic = g_ndb->getDictionary();
341   chkdb(g_dic->createIndex(ind) == 0);
342   chkdb((g_ind = g_dic->getIndex(g_indname, g_tabname)) != 0);
343   g_dic = 0;
344   return 0;
345 }
346 
347 static int
droptable()348 droptable()
349 {
350   ll1("droptable");
351   g_dic = g_ndb->getDictionary();
352   chkdb(g_dic->dropTable(g_tabname) == 0);
353   g_dic = 0;
354   return 0;
355 }
356 
357 // values for keys and bounds
358 
359 struct Val {
360   uint8 m_numattrs;
361   int8 b_null;
362   int8 c_null;
363   int8 d_null;
364   Uint32 b;
365   uchar c[1 + g_charlen];
366   Uint16 d;
367   Val();
368   void init();
369   void copy(const Val& val2);
370   void make(uint numattrs, const Lim& lim);
371   int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
372   void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
373 
374 private:
375   Val& operator=(const Val&);
376   Val(const Val&);
377 };
378 
379 static NdbOut&
operator <<(NdbOut & out,const Val & val)380 operator<<(NdbOut& out, const Val& val)
381 {
382   out << "[";
383   if (val.m_numattrs >= 1) {
384     if (val.b_null)
385       out << "NULL";
386     else
387       out << val.b;
388   }
389   if (val.m_numattrs >= 2) {
390     out << " ";
391     if (val.c_null)
392       out << "NULL";
393     else {
394       char buf[1 + g_charlen];
395       sprintf(buf, "%.*s", val.c[0], &val.c[1]);
396       out << "'" << buf << "'";
397     }
398   }
399   if (val.m_numattrs >= 3) {
400     out << " ";
401     if (val.d_null)
402       out <<" NULL";
403     else
404       out << val.d;
405   }
406   out << "]";
407   return out;
408 }
409 
Val()410 Val::Val()
411 {
412   init();
413 }
414 
415 void
init()416 Val::init()
417 {
418   m_numattrs = 0;
419   // junk rest
420   b_null = -1;
421   c_null = -1;
422   d_null = -1;
423   b = ~(Uint32)0;
424   memset(c, 0xff, sizeof(c));
425   d = ~(Uint16)0;
426 }
427 
428 void
copy(const Val & val2)429 Val::copy(const Val& val2)
430 {
431   require(this != &val2);
432   init();
433   m_numattrs = val2.m_numattrs;
434   if (m_numattrs >= 1) {
435     require(val2.b_null == 0 || val2.b_null == 1);
436     b_null = val2.b_null;
437     if (!b_null)
438       b = val2.b;
439   }
440   if (m_numattrs >= 2) {
441     require(val2.c_null == 0 || val2.c_null == 1);
442     c_null = val2.c_null;
443     if (!c_null)
444       memcpy(c, val2.c, sizeof(c));
445   }
446   if (m_numattrs >= 3) {
447     require(val2.d_null == 0 || val2.d_null == 1);
448     d_null = val2.d_null;
449     if (!d_null)
450       d = val2.d;
451   }
452 }
453 
454 void
make(uint numattrs,const Lim & lim)455 Val::make(uint numattrs, const Lim& lim)
456 {
457   require(numattrs <= g_numattrs);
458   if (numattrs >= 1) {
459     const bool nullable = g_b_nullable || lim.all_nullable;
460     if (nullable && urandom(100) < g_opts.nullkeys)
461       b_null = 1;
462     else {
463       require(lim.b_min <= lim.b_max);
464       b = lim.b_min + urandom(lim.b_max - lim.b_min + 1);
465       b_null = 0;
466     }
467   }
468   if (numattrs >= 2) {
469     const bool nullable = g_c_nullable || lim.all_nullable;
470     if (nullable && urandom(100) < g_opts.nullkeys)
471       c_null = 1;
472     else {
473       // prefer shorter
474       const uint len = urandom(urandom(g_charlen + 1) + 1);
475       c[0] = len;
476       for (uint j = 0; j < len; j++) {
477         uint k = urandom((uint)strlen(lim.c_char));
478         c[1 + j] = lim.c_char[k];
479       }
480       c_null = 0;
481     }
482   }
483   if (numattrs >= 3) {
484     const bool nullable = g_d_nullable || lim.all_nullable;
485     if (nullable && urandom(100) < g_opts.nullkeys)
486       d_null = 1;
487     else {
488       require(lim.d_min <= lim.d_max);
489       d = lim.d_min + urandom(lim.d_max - lim.d_min + 1);
490       d_null = 0;
491     }
492   }
493   m_numattrs = numattrs;
494 }
495 
496 int
cmp(const Val & val2,uint numattrs,uint * num_eq) const497 Val::cmp(const Val& val2, uint numattrs, uint* num_eq) const
498 {
499   require(numattrs <= m_numattrs);
500   require(numattrs <= val2.m_numattrs);
501   uint n = 0; // attr index where differs
502   uint k = 0;
503   if (k == 0 && numattrs >= 1) {
504     if (! b_null && ! val2.b_null) {
505       if (b < val2.b)
506         k = -1;
507       else if (b > val2.b)
508         k = +1;
509     } else if (! b_null) {
510       k = +1;
511     } else if (! val2.b_null) {
512       k = -1;
513     }
514     if (k == 0)
515       n++;
516   }
517   if (k == 0 && numattrs >= 2) {
518     if (! c_null && ! val2.c_null) {
519       const uchar* s1 = &c[1];
520       const uchar* s2 = &val2.c[1];
521       const uint l1 = (uint)c[0];
522       const uint l2 = (uint)val2.c[0];
523       require(l1 <= g_charlen && l2 <= g_charlen);
524       k = g_cs->coll->strnncollsp(g_cs, s1, l1, s2, l2, 0);
525     } else if (! c_null) {
526       k = +1;
527     } else if (! val2.c_null) {
528       k = -1;
529     }
530     if (k == 0)
531       n++;
532   }
533   if (k == 0 && numattrs >= 3) {
534     if (! d_null && ! val2.d_null) {
535       if (d < val2.d)
536         k = -1;
537       else if (d > val2.d)
538         k = +1;
539     } else if (! d_null) {
540       k = +1;
541     } else if (! val2.d_null) {
542       k = -1;
543     }
544     if (k == 0)
545       n++;
546   }
547   require(n <= numattrs);
548   if (num_eq != 0)
549     *num_eq = n;
550   return k;
551 }
552 
553 void
fromib(const NdbIndexScanOperation::IndexBound & ib,uint j)554 Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
555 {
556   const char* key = (j == 0 ? ib.low_key : ib.high_key);
557   const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
558   const Uint8 nullbits = *(const Uint8*)key;
559   require(numattrs <= g_numattrs);
560   if (numattrs >= 1) {
561     if (nullbits & (1 << g_ndbrec_b_nb_offset))
562       b_null = 1;
563     else {
564       memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b));
565       b_null = 0;
566     }
567   }
568   if (numattrs >= 2) {
569     if (nullbits & (1 << g_ndbrec_c_nb_offset))
570       c_null = 1;
571     else {
572       memcpy(c, &key[g_ndbrec_c_offset], sizeof(c));
573       c_null = 0;
574     }
575   }
576   if (numattrs >= 3) {
577     if (nullbits & (1 << g_ndbrec_d_nb_offset))
578       d_null = 1;
579     else {
580       memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d));
581       d_null = 0;
582     }
583   }
584   m_numattrs = numattrs;
585 }
586 
587 // index keys
588 
589 struct Key {
590   Val m_val;
591   int8 m_flag; // temp use
592   Key();
593 
594 private:
595   Key& operator=(const Key&);
596   Key(const Key&);
597 };
598 
599 static NdbOut&
operator <<(NdbOut & out,const Key & key)600 operator<<(NdbOut& out, const Key& key)
601 {
602   out << key.m_val;
603   if (key.m_flag != -1)
604     out << " flag: " << key.m_flag;
605   return out;
606 }
607 
Key()608 Key::Key()
609 {
610   m_flag = -1;
611 }
612 
613 static Key* g_keys = 0;
614 static uint* g_sortkeys = 0;
615 
616 static void
freekeys()617 freekeys()
618 {
619   delete [] g_keys;
620   delete [] g_sortkeys;
621   g_keys = 0;
622   g_sortkeys = 0;
623 }
624 
625 static void
allockeys()626 allockeys()
627 {
628   freekeys();
629   g_keys = new Key [g_opts.rows];
630   g_sortkeys = new uint [g_opts.rows];
631   require(g_keys != 0 && g_sortkeys != 0);
632   memset(g_sortkeys, 0xff, sizeof(uint) * g_opts.rows);
633 }
634 
635 static int
cmpkeys(const void * p1,const void * p2)636 cmpkeys(const void* p1, const void* p2)
637 {
638   const uint i1 = *(const uint*)p1;
639   const uint i2 = *(const uint*)p2;
640   require(i1 < g_opts.rows && i2 < g_opts.rows);
641   const Key& key1 = g_keys[i1];
642   const Key& key2 = g_keys[i2];
643   const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
644   return k;
645 }
646 
647 static void
sortkeys()648 sortkeys()
649 {
650   ll2("sortkeys");
651   uint i;
652 
653   // sort
654   for (i = 0; i < g_opts.rows; i++)
655     g_sortkeys[i] = i;
656   qsort(g_sortkeys, g_opts.rows, sizeof(uint), cmpkeys);
657 
658   // verify
659   uint unique = 1;
660   for (i = 1; i < g_opts.rows; i++) {
661     const uint i1 = g_sortkeys[i - 1];
662     const uint i2 = g_sortkeys[i];
663     require(i1 < g_opts.rows && i2 < g_opts.rows);
664     const Key& key1 = g_keys[i1];
665     const Key& key2 = g_keys[i2];
666     const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
667     require(k <= 0);
668     if (k < 0)
669       unique++;
670   }
671 
672   // show min max key
673   ll1("minkey:" << g_keys[g_sortkeys[0]]);
674   ll1("maxkey:" << g_keys[g_sortkeys[g_opts.rows - 1]]);
675   ll1("unique:" << unique);
676 }
677 
678 static void
makekeys()679 makekeys()
680 {
681   ll1("makekeys");
682 
683   uint initrows = g_opts.rows / g_opts.rpk;
684   require(initrows != 0);
685 
686   // distinct keys
687   uint i = 0;
688   while (i < initrows) {
689     Key& key = g_keys[i];
690     key.m_val.make(g_numattrs, g_lim_val);
691     i++;
692   }
693 
694   // remaining keys
695   while (i < g_opts.rows) {
696     // if rpkvar is 10, multiply rpk by number between 0.1 and 10.0
697     double a = (double)(1 + urandom(g_opts.rpkvar * g_opts.rpkvar));
698     double b = a / (double)g_opts.rpkvar;
699     double c = b * (double)g_opts.rpk;
700     const uint n = (uint)(c + 0.5);
701     // select random key to duplicate from initrows
702     const uint k = urandom(initrows);
703     uint j = 0;
704     while (i < g_opts.rows && j < n) {
705       g_keys[i].m_val.copy(g_keys[k].m_val);
706       j++;
707       i++;
708     }
709   }
710 
711   // shuffle
712   i = 0;
713   while (i < g_opts.rows) {
714     uint j = urandom(g_opts.rows);
715     if (i != j) {
716       Key tmp;
717       tmp.m_val.copy(g_keys[i].m_val);
718       g_keys[i].m_val.copy(g_keys[j].m_val);
719       g_keys[j].m_val.copy(tmp.m_val);
720     }
721     i++;
722   }
723 
724   // sort
725   sortkeys();
726 }
727 
728 // data loading
729 
730 static int
verifydata()731 verifydata()
732 {
733   ll3("verifydata");
734   chkdb((g_con = g_ndb->startTransaction()) != 0);
735   chkdb((g_scan_op = g_con->getNdbScanOperation(g_tab)) != 0);
736   chkdb(g_scan_op->readTuples(NdbScanOperation::LM_CommittedRead) == 0);
737   Uint32 a;
738   Val val;
739   val.m_numattrs = g_numattrs;
740   char* a_addr = (char*)&a;
741   char* b_addr = (char*)&val.b;
742   char* c_addr = (char*)val.c;
743   char* d_addr = (char*)&val.d;
744   Uint32 no = 0;
745   NdbRecAttr* b_ra;
746   NdbRecAttr* c_ra;
747   NdbRecAttr* d_ra;
748   chkdb(g_scan_op->getValue(no++, a_addr) != 0);
749   chkdb((b_ra = g_scan_op->getValue(no++, b_addr)) != 0);
750   chkdb((c_ra = g_scan_op->getValue(no++, c_addr)) != 0);
751   chkdb((d_ra = g_scan_op->getValue(no++, d_addr)) != 0);
752   chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
753   uint count = 0;
754   uint i;
755   for (i = 0; i < g_opts.rows; i++) {
756     Key& key = g_keys[i];
757     key.m_flag = false; // not scanned
758   }
759   while (1) {
760     int ret;
761     a = ~(Uint32)0;
762     chkdb((ret = g_scan_op->nextResult()) == 0 || ret == 1);
763     if (ret == 1)
764       break;
765     val.b_null = b_ra->isNULL();
766     val.c_null = c_ra->isNULL();
767     val.d_null = d_ra->isNULL();
768     require(val.b_null == 0 || (g_b_nullable && val.b_null == 1));
769     require(val.c_null == 0 || (g_c_nullable && val.c_null == 1));
770     require(val.d_null == 0 || (g_d_nullable && val.d_null == 1));
771     i = (uint)a;
772     chkrc(i < g_opts.rows);
773     Key& key = g_keys[i];
774     chkrc(key.m_val.cmp(val) == 0);
775     chkrc(key.m_flag == false);
776     key.m_flag = true;
777     count++;
778   }
779   g_ndb->closeTransaction(g_con);
780   g_con = 0;
781   g_scan_op = 0;
782   for (i = 0; i < g_opts.rows; i++) {
783     Key& key = g_keys[i];
784     chkrc(key.m_flag == true);
785     key.m_flag = -1; // forget
786   }
787   require(count == g_opts.rows);
788   ll3("verifydata: " << g_opts.rows << " rows");
789   return 0;
790 }
791 
792 static int
loaddata(bool update)793 loaddata(bool update)
794 {
795   ll1("loaddata: update: " << update);
796   const uint batch = 512;
797   chkdb((g_con = g_ndb->startTransaction()) != 0);
798   uint i = 0;
799   while (i < g_opts.rows) {
800     chkdb((g_op = g_con->getNdbOperation(g_tab)) != 0);
801     if (!update)
802       chkdb(g_op->insertTuple() == 0);
803     else
804       chkdb(g_op->updateTuple() == 0);
805     Uint32 a = i;
806     const Val& val = g_keys[i].m_val;
807     const char* a_addr = (const char*)&a;
808     const char* b_addr = ! val.b_null ? (const char*)&val.b : 0;
809     const char* c_addr = ! val.c_null ? (const char*)val.c : 0;
810     const char* d_addr = ! val.d_null ? (const char*)&val.d : 0;
811     Uint32 no = 0;
812     chkdb(g_op->equal(no++, a_addr) == 0);
813     chkdb(g_op->setValue(no++, b_addr) == 0);
814     chkdb(g_op->setValue(no++, c_addr) == 0);
815     chkdb(g_op->setValue(no++, d_addr) == 0);
816     if (i++ % batch == 0) {
817       chkdb(g_con->execute(NdbTransaction::Commit) == 0);
818       g_ndb->closeTransaction(g_con);
819       g_con = 0;
820       g_op = 0;
821       chkdb((g_con = g_ndb->startTransaction()) != 0);
822     }
823   }
824   chkdb(g_con->execute(NdbTransaction::Commit) == 0);
825   g_ndb->closeTransaction(g_con);
826   g_con = 0;
827   g_op = 0;
828 
829   // check data and cmp routines
830   chkrc(verifydata() == 0);
831 
832   for (uint i = 0; i < g_opts.rows; i++)
833     ll3("load " << i << ": " << g_keys[i]);
834   ll0("loaddata: " << g_opts.rows << " rows");
835   return 0;
836 }
837 
838 // bounds
839 
840 struct Bnd {
841   Val m_val;
842   /*
843    * A bound is a partial key value (0 to g_numattrs attributes).
844    * It is not equal to any key value.  Instead, it has a "side".
845    *
846    * side = 0 if the bound is empty
847    * side = -1 if the bound is "just before" its value
848    * side = +1 if the bound is "just after" its value
849    *
850    * This is another way of looking at strictness of non-empty
851    * start and end keys in a range.
852    *
853    * start key is strict if side = +1
854    * end key is strict if side = -1
855    *
856    * NDB API specifies strictness in the bound type of the last
857    * index attribute which is part of the start/end key.
858    *
859    * LE (0) - strict: n - side: -1
860    * LT (1) - strict: y - side: +1
861    * GE (2) - strict: n - side: +1
862    * GT (3) - strict: y - side: -1
863    *
864    * A non-empty bound divides keys into 2 disjoint subsets:
865    * keys before (cmp() == -1) and keys after (cmp() == +1).
866    */
867   int8 m_side;
868   int8 m_lohi; // 0-lo 1-hi as part of Rng
869   Bnd();
870   bool isempty() const;
871   void copy(const Bnd& bnd2); // does not copy m_lohi
872   Bnd& make(uint minattrs);
873   Bnd& make(uint minattrs, const Val& theval);
874   int cmp(const Key& key) const;
875   int cmp(const Bnd& bnd2);
876   int type(uint colno) const; // for setBound
877   void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
878 
879 private:
880   Bnd& operator=(const Bnd&);
881   Bnd(const Bnd&);
882 };
883 
884 static NdbOut&
operator <<(NdbOut & out,const Bnd & bnd)885 operator<<(NdbOut& out, const Bnd& bnd)
886 {
887   if (bnd.m_lohi == 0)
888     out << "L";
889   else if (bnd.m_lohi == 1)
890     out << "H";
891   else
892     out << bnd.m_lohi << "?";
893   out << bnd.m_val;
894   if (bnd.m_side == 0)
895     ;
896   else if (bnd.m_side == -1)
897     out << "-";
898   else if (bnd.m_side == +1)
899     out << "+";
900   return out;
901 }
902 
Bnd()903 Bnd::Bnd()
904 {
905   m_side = 0;
906   m_lohi = -1;
907 }
908 
909 bool
isempty() const910 Bnd::isempty() const
911 {
912   return m_val.m_numattrs == 0;
913 }
914 
915 void
copy(const Bnd & bnd2)916 Bnd::copy(const Bnd& bnd2)
917 {
918   m_val.copy(bnd2.m_val);
919   m_side = bnd2.m_side;
920 }
921 
922 Bnd&
make(uint minattrs)923 Bnd::make(uint minattrs)
924 {
925   require(minattrs <= g_opts.attrs);
926   require(m_lohi == 0 || m_lohi == 1);
927   uint numattrs = minattrs + urandom(g_numattrs - minattrs + 1);
928   m_val.make(numattrs, g_lim_bnd);
929   m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
930   return *this;
931 }
932 
933 Bnd&
make(uint minattrs,const Val & theval)934 Bnd::make(uint minattrs, const Val& theval)
935 {
936   uint numattrs = minattrs + urandom(g_numattrs - minattrs);
937   m_val.copy(theval);
938   m_val.m_numattrs = numattrs;
939   m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
940   return *this;
941 }
942 
943 int
cmp(const Key & key) const944 Bnd::cmp(const Key& key) const
945 {
946   int place; // debug
947   int ret;
948   do {
949     int k = key.m_val.cmp(m_val, m_val.m_numattrs);
950     if (k != 0) {
951       place = 1;
952       ret = k;
953       break;
954     }
955     if (m_side != 0) {
956       place = 2;
957       ret = (-1) * m_side;
958       break;
959     }
960     place = 3;
961     ret = 0;
962     require(m_val.m_numattrs == 0);
963   } while (0);
964   ll3("bnd: " << *this << " cmp key: " << key
965       << " ret: " << ret << " place: " << place);
966   return ret;
967 }
968 
969 int
cmp(const Bnd & bnd2)970 Bnd::cmp(const Bnd& bnd2)
971 {
972   int place; // debug
973   int ret;
974   const Bnd& bnd1 = *this;
975   const Val& val1 = bnd1.m_val;
976   const Val& val2 = bnd2.m_val;
977   const uint numattrs1 = val1.m_numattrs;
978   const uint numattrs2 = val2.m_numattrs;
979   const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2);
980   do {
981     int k = val1.cmp(val2, n);
982     if (k != 0) {
983       place = 1;
984       ret = k;
985       break;
986     }
987     if (numattrs1 < numattrs2) {
988       place = 2;
989       ret = (+1) * bnd1.m_side;
990       break;
991     }
992     if (numattrs1 > numattrs2) {
993       place = 3;
994       ret = (-1) * bnd1.m_side;
995       break;
996     }
997     if (bnd1.m_side < bnd2.m_side) {
998       place = 4;
999       ret = -1;
1000       break;
1001     }
1002     if (bnd1.m_side > bnd2.m_side) {
1003       place = 5;
1004       ret = +1;
1005       break;
1006     }
1007     place = 6;
1008     ret = 0;
1009   } while (0);
1010   ll3("bnd: " << *this << " cmp bnd: " << bnd2
1011       << " ret: " << ret << " place: " << place);
1012   return ret;
1013 }
1014 
1015 int
type(uint colno) const1016 Bnd::type(uint colno) const
1017 {
1018   int t;
1019   require(colno < m_val.m_numattrs && (m_side == -1 || m_side == +1));
1020   require(m_lohi == 0 || m_lohi == 1);
1021   if (m_lohi == 0) {
1022     if (colno + 1 < m_val.m_numattrs)
1023       t = 0; // LE
1024     else if (m_side == -1)
1025       t = 0; // LE
1026     else
1027       t = 1; // LT
1028   } else {
1029     if (colno + 1 < m_val.m_numattrs)
1030       t = 2; // GE
1031     else if (m_side == +1)
1032       t = 2; // GE
1033     else
1034       t = 3; // GT
1035   }
1036   return t;
1037 }
1038 
1039 void
fromib(const NdbIndexScanOperation::IndexBound & ib,uint j)1040 Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
1041 {
1042   Val& val = m_val;
1043   val.fromib(ib, j);
1044   const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
1045   const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive);
1046   if (numattrs == 0) {
1047     m_side = 0;
1048   } else {
1049     m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1));
1050   }
1051   m_lohi = j;
1052 }
1053 
1054 // stats values
1055 
1056 struct Stval {
1057   Uint32 rir_v2;
1058   double rir;
1059   double rpk[g_numattrs];
1060   bool empty;
1061   char rule[NdbIndexStat::RuleBufferBytes];
1062   Stval();
1063 };
1064 
1065 static NdbOut&
operator <<(NdbOut & out,const Stval & st)1066 operator<<(NdbOut& out, const Stval& st)
1067 {
1068   out << "rir_v2: " << st.rir_v2;
1069   out << " rir_v4: " << st.rir;
1070   out << " rpk:[ ";
1071   for (uint k = 0; k < g_opts.attrs; k++) {
1072     if (k != 0)
1073       out << " ";
1074     out << st.rpk[k];
1075   }
1076   out << " ]";
1077   out << " " << (st.empty ? "E" : "N");
1078   out << " " << st.rule;
1079   return out;
1080 }
1081 
Stval()1082 Stval::Stval()
1083 {
1084   rir_v2 = 0;
1085   rir = 0.0;
1086   for (uint k = 0; k < g_numattrs; k++)
1087     rpk[k] = 0.0;
1088   empty = false;
1089   strcpy(rule, "-");
1090 }
1091 
1092 // ranges
1093 
1094 struct Rng {
1095   Bnd m_bnd[2];
1096   Int32 m_rowcount;
1097   // stats v2
1098   double errpct;
1099   // stats v4
1100   Stval m_st_scan; // exact stats computed from keys in range
1101   Stval m_st_stat; // interpolated kernel stats via g_is
1102   Rng();
1103   uint minattrs() const;
1104   uint maxattrs() const;
1105   bool iseq() const;
1106   bool isempty() const;
1107   void copy(const Rng& rng2);
1108   int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
1109   uint rowcount() const;
1110   void fromib(const NdbIndexScanOperation::IndexBound& ib);
1111 
1112 private:
1113   Rng& operator=(const Rng&);
1114   Rng(const Rng&);
1115 };
1116 
1117 static NdbOut&
operator <<(NdbOut & out,const Rng & rng)1118 operator<<(NdbOut& out, const Rng& rng)
1119 {
1120   out << rng.m_bnd[0] << " " << rng.m_bnd[1];
1121   if (rng.m_rowcount != -1)
1122     out << " rows: " << rng.m_rowcount;
1123   return out;
1124 }
1125 
Rng()1126 Rng::Rng()
1127 {
1128   m_bnd[0].m_lohi = 0;
1129   m_bnd[1].m_lohi = 1;
1130   m_rowcount = -1;
1131 }
1132 
1133 uint
minattrs() const1134 Rng::minattrs() const
1135 {
1136   return min(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
1137 }
1138 
1139 uint
maxattrs() const1140 Rng::maxattrs() const
1141 {
1142   return max(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
1143 }
1144 
1145 bool
iseq() const1146 Rng::iseq() const
1147 {
1148   return
1149     minattrs() == maxattrs() &&
1150     m_bnd[0].m_val.cmp(m_bnd[1].m_val, minattrs()) == 0 &&
1151     m_bnd[0].m_side < m_bnd[1].m_side;
1152 }
1153 
1154 bool
isempty() const1155 Rng::isempty() const
1156 {
1157   return m_bnd[0].isempty() && m_bnd[1].isempty();
1158 }
1159 
1160 void
copy(const Rng & rng2)1161 Rng::copy(const Rng& rng2)
1162 {
1163   m_bnd[0].copy(rng2.m_bnd[0]);
1164   m_bnd[1].copy(rng2.m_bnd[1]);
1165   m_rowcount = rng2.m_rowcount;
1166 }
1167 
1168 int
cmp(const Key & key) const1169 Rng::cmp(const Key& key) const
1170 {
1171   int place; // debug
1172   int ret;
1173   do  {
1174     int k;
1175     k = m_bnd[0].cmp(key);
1176     if (k < 0) {
1177       place = 1;
1178       ret = -1;
1179       break;
1180     }
1181     k = m_bnd[1].cmp(key);
1182     if (k > 0) {
1183       place = 2;
1184       ret = +1;
1185       break;
1186     }
1187     place = 3;
1188     ret = 0;
1189   } while (0);
1190   ll3("rng: " << *this << " cmp key: " << key
1191       << " ret: " << ret << " place: " << place);
1192   return ret;
1193 }
1194 
// Count the rows that fall inside this range.
//
// Two binary searches over g_sortkeys (i=0: first row not before the
// range, i=1: last row not after the range) give the index limits; the
// count is the size of the index interval between them.  In roughly one
// call out of ten the result is cross-checked with a linear pass over
// all rows.
uint
Rng::rowcount() const
{
  ll3("rowcount: " << *this);
  int i;
  // binary search for first and last in range
  int lim[2];
  for (i = 0; i <= 1; i++) {
    ll3("search i=" << i);
    // search window is the open interval (lo, hi)
    int lo = -1;
    int hi = (int)g_opts.rows;
    int ret;
    int j;
    do {
      j = (hi + lo) / 2;
      require(lo < j && j < hi);
      ret = cmp(g_keys[g_sortkeys[j]]);
      if (i == 0) {
        // rows before the range stay at or below lo
        if (ret < 0)
          lo = j;
        else
          hi = j;
      } else {
        // rows after the range stay at or above hi
        if (ret > 0)
          hi = j;
        else
          lo = j;
      }
    } while (hi - lo > 1);
    // ret belongs to the last probed row j
    if (ret == 0)
      lim[i] = j;
    else if (i == 0)
      lim[i] = hi;
    else
      lim[i] = lo;
  }

  // verify is expensive due to makeranges() multiple tries
  const bool verify = (urandom(10) == 0);
  const int lo = max(lim[0], 0);
  const int hi = min(lim[1], (int)g_opts.rows - 1);
  if (verify) {
    // rows in sorted order must go before -> within -> after, never back
    int pos = -1; // before, within, after
    for (i = 0; i < (int)g_opts.rows; i++) {
      int k = cmp(g_keys[g_sortkeys[i]]);
      if (k < 0)
        require(i < lo);
      else if (k == 0)
        require(lo <= i && i <= hi);
      else
        require(i > hi);
      require(pos <= k);
      if (pos < k)
        pos = k;
    }
  }

  // result
  require(hi - lo + 1 >= 0);
  uint count = hi - lo + 1;
  ll3("rowcount: " << count << " lim: " << lim[0] << " " << lim[1]);
  return count;
}
1258 
1259 void
fromib(const NdbIndexScanOperation::IndexBound & ib)1260 Rng::fromib(const NdbIndexScanOperation::IndexBound& ib)
1261 {
1262   for (uint j = 0; j <= 1; j++) {
1263     Bnd& bnd = m_bnd[j];
1264     bnd.fromib(ib, j);
1265   }
1266 }
1267 
1268 static Rng* g_rnglist = 0;
1269 
1270 static void
freeranges()1271 freeranges()
1272 {
1273   delete [] g_rnglist;
1274   g_rnglist = 0;
1275 }
1276 
1277 static void
allocranges()1278 allocranges()
1279 {
1280   freeranges();
1281   g_rnglist = new Rng [g_opts.ops];
1282   require(g_rnglist != 0);
1283 }
1284 
// Fill g_rnglist with g_opts.ops random ranges.
//
// For each op up to maxtries candidate ranges are generated and one of
// them is kept.  Candidates with some bounds, some rows, and a row count
// within g_opts.scanpct percent of all rows are preferred; fudgefac adds
// randomness to each preference so that other ranges still occur.
static void
makeranges()
{
  ll1("makeranges");
  const uint mintries = 20;
  const uint maxtries = 80;
  const uint fudgefac = 10;

  for (uint i = 0; i < g_opts.ops; i++) {
    // eqscans percent of the ranges are equality ranges, and the same
    // percentage of those requests all attributes
    const bool eqpart = (urandom(100) < g_opts.eqscans);
    const bool eqfull = eqpart && (urandom(100) < g_opts.eqscans);
    Rng rng; // candidate
    uint j;
    for (j = 0; j < maxtries; j++) {
      Rng rng2;
      if (!eqpart) {
        rng2.m_bnd[0].make(0);
        rng2.m_bnd[1].make(0);
      } else {
        // equality: same value on both sides, lo side -1 and hi side +1
        const uint mincnt = eqfull ? g_opts.attrs : 1;
        rng2.m_bnd[0].make(mincnt);
        rng2.m_bnd[1].copy(rng2.m_bnd[0]);
        rng2.m_bnd[0].m_side = -1;
        rng2.m_bnd[1].m_side = +1;
        require(rng2.iseq());
      }
      rng2.m_rowcount = (Int32)rng2.rowcount();
      // 0-discard 1-replace or accept 2-accept
      int action = 0;
      do {
        // first candidate
        if (rng.m_rowcount == -1) {
          action = 1;
          break;
        }
        require(rng.m_rowcount != -1);
        // prefer some bounds
        if (rng2.isempty()) {
          if (urandom(fudgefac) != 0)
            action = 0;
          else
            action = 1;
          break;
        }
        // prefer some rows
        if (rng2.m_rowcount == 0) {
          action = 0;
          break;
        }
        // accept if row count under given pct
        require((uint)rng2.m_rowcount <= g_opts.rows);
        if (100 * (uint)rng2.m_rowcount <= g_opts.scanpct * g_opts.rows) {
          if (urandom(fudgefac) != 0) {
            action = 2;
            break;
          }
        }
        // replace if less rows
        if (rng2.m_rowcount < rng.m_rowcount) {
          if (urandom(fudgefac) != 0) {
            action = 1;
            break;
          }
        }
      } while (0);
      if (action != 0) {
        rng.copy(rng2);
        // stop on a definite accept, or on any replacement from try
        // number mintries onwards
        if (action == 2 || j >= mintries)
          break;
      }
    }
    g_rnglist[i].copy(rng);
    ll2("rng " << i << ": " << rng << " tries: " << j);
  }
}
1360 
1361 // verify ranges via range scans
1362 
1363 static int
setbounds(const Rng & rng)1364 setbounds(const Rng& rng)
1365 {
1366   // currently must do each attr in order
1367   ll3("setbounds: " << rng);
1368   uint i;
1369   const Bnd (&bnd)[2] = rng.m_bnd;
1370   for (i = 0; i < g_numattrs; i++) {
1371     const Uint32 no = i; // index attribute number
1372     uint j;
1373     int type[2] = { -1, -1 };
1374     // determine inclusivity (boundtype) of upper+lower bounds on this col.
1375     // -1 == no bound on the col.
1376     for (j = 0; j <= 1; j++) {
1377       if (no < bnd[j].m_val.m_numattrs)
1378         type[j] = bnd[j].type(no);
1379     }
1380     for (j = 0; j <= 1; j++) {
1381       int t = type[j];
1382       if (t == -1)
1383         continue;
1384       if (no + 1 < bnd[j].m_val.m_numattrs)
1385         t &= ~(uint)1; // strict bit is set on last bound only
1386       const Val& val = bnd[j].m_val;
1387       const void* addr = 0;
1388       if (no == 0)
1389         addr = ! val.b_null ? (const void*)&val.b : 0;
1390       else if (no == 1)
1391         addr = ! val.c_null ? (const void*)val.c : 0;
1392       else if (no == 2)
1393         addr = ! val.d_null ? (const void*)&val.d : 0;
1394       else
1395         require(false);
1396       ll3("setBound attr:" << no << " type:" << t << " val: " << val);
1397       chkdb(g_rangescan_op->setBound(no, t, addr) == 0);
1398     }
1399   }
1400   return 0;
1401 }
1402 
// Run a real index range scan for rng and check it against the keys held
// by the test: every returned row must lie inside the range and be
// returned only once, every key inside the range must have been returned,
// and the number of rows must equal rng.m_rowcount.  Returns 0.
// NOTE(review): chkdb/chkrc are defined outside this view; if they return
// early on failure the transaction started here is not closed - confirm.
static int
scanrange(const Rng& rng)
{
  ll3("scanrange: " << rng);
  chkdb((g_con = g_ndb->startTransaction()) != 0);
  chkdb((g_rangescan_op = g_con->getNdbIndexScanOperation(g_ind, g_tab)) != 0);
  chkdb(g_rangescan_op->readTuples() == 0);
  chkrc(setbounds(rng) == 0);
  // attribute 0 is fetched into a; its value is used as index into g_keys
  Uint32 a;
  char* a_addr = (char*)&a;
  Uint32 no = 0;
  chkdb(g_rangescan_op->getValue(no++, a_addr) != 0);
  chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
  uint count = 0;
  uint i;
  for (i = 0; i < g_opts.rows; i++) {
    Key& key = g_keys[i];
    key.m_flag = false; // not scanned
  }
  while (1) {
    int ret;
    // poison value, so a row that does not set a fails the range check below
    a = ~(Uint32)0;
    chkdb((ret = g_rangescan_op->nextResult()) == 0 || ret == 1);
    if (ret == 1)
      break;
    i = (uint)a;
    chkrc(i < g_opts.rows);
    Key& key = g_keys[i];
    ll3("scan: " << key);
    int k = rng.cmp(key);
    chkrc(k == 0);
    chkrc(key.m_flag == false);
    key.m_flag = true;
    count++;
  }
  g_ndb->closeTransaction(g_con);
  g_con = 0;
  g_rangescan_op = 0;

  // every key must have been scanned exactly when it is inside the range
  for (i = 0; i < g_opts.rows; i++) {
    Key& key = g_keys[i];
    int k = rng.cmp(key);
    if (k != 0) // not in range
      chkrc(key.m_flag == false);
    else
      chkrc(key.m_flag == true);
    key.m_flag = -1; // forget
  }
  require((uint)rng.m_rowcount == count);
  return 0;
}
1454 
1455 static int
scanranges()1456 scanranges()
1457 {
1458   ll1("scanranges");
1459   for (uint i = 0; i < g_opts.ops; i++) {
1460     const Rng& rng = g_rnglist[i];
1461     chkrc(scanrange(rng) == 0);
1462   }
1463   return 0;
1464 }
1465 
1466 // stats v4 update
1467 
// Bind the index stat object g_is to the test index and table.  All
// three must exist already.  Returns 0.
static int
definestat()
{
  ll1("definestat");
  require(g_is != 0 && g_ind != 0 && g_tab != 0);
  chkdb(g_is->set_index(*g_ind, *g_tab) == 0);
  return 0;
}
1476 
1477 static int
updatestat()1478 updatestat()
1479 {
1480   ll1("updatestat");
1481   if (urandom(2) == 0) {
1482     g_dic = g_ndb->getDictionary();
1483     chkdb(g_dic->updateIndexStat(*g_ind, *g_tab) == 0);
1484     g_dic = 0;
1485   } else {
1486     chkdb(g_is->update_stat(g_ndb_sys) == 0);
1487   }
1488   return 0;
1489 }
1490 
// Read the stats head and the stats for the index via g_is and check
// that a sample was found.  Logs the sample version, the sample count
// and the byte size of the query cache.  Returns 0.
static int
readstat()
{
  ll1("readstat");

  NdbIndexStat::Head head;
  chkdb(g_is->read_head(g_ndb_sys) == 0);
  g_is->get_head(head);
  chkrc(head.m_found == true);
  chkrc(head.m_sampleVersion != 0);
  ll1("readstat:"
      << " sampleVersion: " << head.m_sampleVersion
      << " sampleCount: " << head.m_sampleCount);

  NdbIndexStat::CacheInfo infoQuery;
  chkdb(g_is->read_stat(g_ndb_sys) == 0);
  // presumably promotes the cache just read to the query cache - confirm
  // against the NdbIndexStat documentation
  g_is->move_cache();
  g_is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
  ll1("readstat: cache bytes: " << infoQuery.m_totalBytes);
  return 0;
}
1512 
1513 // test polling after updatestat
1514 
// Create the listener on g_is and execute it, so that a later stats
// update can be polled for.  Returns 0.
static int
startlistener()
{
  ll1("startlistener");
  chkdb(g_is->create_listener(g_ndb_sys) == 0);
  chkdb(g_is->execute_listener(g_ndb_sys) == 0);
  return 0;
}
1523 
// Poll the listener after a stats update and check that exactly one
// event is delivered.  Returns 0.
static int
runlistener()
{
  ll1("runlistener");
  int ret;
  // 10000 is presumably a timeout in milliseconds - confirm
  chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1);
  chkrc(ret == 1);
  // one event is expected
  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
  chkrc(ret == 1);
  // and no second one
  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
  chkrc(ret == 0);
  return 0;
}
1538 
// Drop the listener created by startlistener().  Any result other than
// -1 from drop_listener is accepted.  Returns 0.
static int
stoplistener()
{
  ll1("stoplistener");
  chkdb(g_is->drop_listener(g_ndb_sys) != -1);
  return 0;
}
1546 
1547 // stats queries
1548 
1549 // exact stats from scan results
1550 static void
queryscan(Rng & rng)1551 queryscan(Rng& rng)
1552 {
1553   ll3("queryscan");
1554 
1555   uint rir;
1556   uint unq[g_numattrs];
1557   rir = 0;
1558   for (uint k = 0; k < g_opts.attrs; k++)
1559     unq[0] = 0;
1560   Key prevkey;
1561   for (uint i = 0; i < g_opts.rows; i++) {
1562     const Key& key = g_keys[g_sortkeys[i]];
1563     int res = rng.cmp(key);
1564     if (res != 0)
1565       continue;
1566     rir++;
1567     if (rir == 1) {
1568       for (uint k = 0; k < g_opts.attrs; k++)
1569         unq[k] = 1;
1570     } else {
1571       uint num_eq = ~0;
1572       int res = prevkey.m_val.cmp(key.m_val, g_opts.attrs, &num_eq);
1573       if (res == 0)
1574         require(num_eq == g_opts.attrs);
1575       else {
1576         require(res < 0);
1577         require(num_eq < g_opts.attrs);
1578         unq[num_eq]++;
1579         // propagate down
1580         for (uint k = num_eq + 1; k < g_opts.attrs; k++)
1581           unq[k]++;
1582       }
1583     }
1584     prevkey.m_val.copy(key.m_val);
1585   }
1586   require(rng.m_rowcount != -1);
1587   require((uint)rng.m_rowcount == rir);
1588 
1589   Stval& st = rng.m_st_scan;
1590   st.rir_v2 = rir;
1591   st.rir = rir == 0 ? 1.0 : (double)rir;
1592   for (uint k = 0; k < g_opts.attrs; k++) {
1593     if (rir == 0)
1594       st.rpk[k] = 1.0;
1595     else {
1596       require(rir >= unq[k]);
1597       require(unq[k] != 0);
1598       st.rpk[k] = (double)rir / (double)unq[k];
1599     }
1600   }
1601   st.empty = (rir == 0);
1602   ll2("queryscan: " << st);
1603 }
1604 
/* This method initialises the passed in IndexBound
 * to represent the range passed in.
 * It assumes that the storage pointed to by low_key
 * and high_key in the passed IndexBound can be overwritten
 * and is long enough to store the data
 *
 * The ib, low_key and high_key storage is first filled with marker
 * bytes, then the bound columns are written.  At the end the IndexBound
 * is converted back to a range and required to compare equal to the
 * input bounds.  Returns 0.
 */
static int
initialiseIndexBound(const Rng& rng,
                     NdbIndexScanOperation::IndexBound& ib,
                     my_record* low_key, my_record* high_key)
{
  ll3("initialiseIndexBound: " << rng);
  uint i;
  const Bnd (&bnd)[2] = rng.m_bnd;
  Uint32 colsInBound[2]= {0, 0};
  bool boundInclusive[2]= {false, false};

  // distinct fill patterns make untouched bytes recognisable
  memset(&ib, 0xf1, sizeof(ib));
  memset(low_key, 0xf2, sizeof(*low_key));
  memset(high_key, 0xf3, sizeof(*high_key));

  // Clear nullbit storage
  low_key->m_null_bm = 0;
  high_key->m_null_bm = 0;

  for (i = 0; i < g_numattrs; i++) {
    const Uint32 no = i; // index attribute number
    uint j;
    int type[2] = { -1, -1 };
    // determine inclusivity (boundtype) of upper+lower bounds on this col.
    // -1 == no bound on the col.
    for (j = 0; j <= 1; j++) {
      if (no < bnd[j].m_val.m_numattrs)
        type[j] = bnd[j].type(no);
    }
    for (j = 0; j <= 1; j++) {
      /* Get ptr to key storage space for this bound */
      my_record* keyBuf= (j==0) ? low_key : high_key;
      int t = type[j];
      if (t == -1)
        continue;
      colsInBound[j]++;

      if (no + 1 >= bnd[j].m_val.m_numattrs)
        // Last column in bound, inclusive if GE or LE (or EQ)
        // i.e. bottom bit of boundtype is clear
        boundInclusive[j]= !(t & 1);

      // copy the value if not null, and record nullness in the null
      // bitmap when the column is nullable
      const Val& val = bnd[j].m_val;
      if (no == 0)
      {
        if (! val.b_null)
          keyBuf->m_b= val.b;

        if (g_b_nullable)
          keyBuf->m_null_bm |= ((val.b_null?1:0) << g_ndbrec_b_nb_offset);
      }
      else if (no == 1)
      {
        // copies 1 + g_charlen bytes of the c value
        if (! val.c_null)
          memcpy(&keyBuf->m_c[0], (const void*)&val.c, 1+ g_charlen);

        if (g_c_nullable)
          keyBuf->m_null_bm |= ((val.c_null?1:0) << g_ndbrec_c_nb_offset);
      }
      else if (no == 2)
      {
        if (! val.d_null)
          keyBuf->m_d= val.d;

        if (g_d_nullable)
          keyBuf->m_null_bm |= ((val.d_null?1:0) << g_ndbrec_d_nb_offset);
      }
      else
        require(false);
      ll3("initialiseIndexBound attr:" << no << " type:" << t << " val: " << val);
    }
  }

  /* Now have everything we need to initialise the IndexBound */
  ib.low_key = (char*)low_key;
  ib.low_key_count= colsInBound[0];
  ib.low_inclusive= boundInclusive[0];
  ib.high_key = (char*)high_key;
  ib.high_key_count= colsInBound[1];
  ib.high_inclusive= boundInclusive[1];
  ib.range_no= 0;

  ll3(" indexBound low_key_count=" << ib.low_key_count <<
      " low_inc=" << ib.low_inclusive <<
      " high_key_count=" << ib.high_key_count <<
      " high_inc=" << ib.high_inclusive);
  ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
      " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
      " first byte=" << ib.low_key[0]);
  ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
      " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
      " first byte=" << ib.high_key[0]);

  // verify by reverse
  {
    // note: this local rng hides the function parameter of the same name
    Rng rng;
    rng.fromib(ib);
    require(rng.m_bnd[0].cmp(bnd[0]) == 0);
    require(rng.m_bnd[1].cmp(bnd[1]) == 0);
  }
  return 0;
}
1713 
// Ask the index stat object for the records-in-range estimate of rng
// through records_in_range() and store it in rng.m_st_stat.rir_v2.
// Returns 0.
static int
querystat_v2(Rng& rng)
{
  ll3("querystat_v2");

  /* Create IndexBound and key storage space */
  NdbIndexScanOperation::IndexBound ib;
  my_record low_key;
  my_record high_key;

  chkdb((g_con = g_ndb->startTransaction()) != 0);
  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);

  // all ones, so an answer that was never written fails the check below
  Uint64 count = ~(Uint64)0;
  chkdb(g_is->records_in_range(g_ind,
                                 g_con,
                                 g_ind_rec,
                                 g_tab_rec,
                                 &ib,
                                 0,
                                 &count,
                                 0) == 0);
  g_ndb->closeTransaction(g_con);
  g_con = 0;
  g_rangescan_op = 0;

  Stval& st = rng.m_st_stat;
  // sanity limit before narrowing to 32 bits
  chkrc(count < (1 << 30));
  st.rir_v2 = (Uint32)count;
  ll2("querystat_v2: " << st.rir_v2 << " rows");
  return 0;
}
1746 
// Query the index statistics for rng and store the answer (rir, rpk for
// each attribute in use, empty flag and rule text) in rng.m_st_stat.
// Returns 0.
static int
querystat(Rng& rng)
{
  ll3("querystat");

  // set up range
  Uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
  Uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
  NdbIndexStat::Bound bound_lo(g_is, bound_lo_buffer);
  NdbIndexStat::Bound bound_hi(g_is, bound_hi_buffer);
  NdbIndexStat::Range range(bound_lo, bound_hi);

  // convert to IndexBound (like in mysqld)
  NdbIndexScanOperation::IndexBound ib;
  my_record low_key;
  my_record high_key;
  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
  chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);

  // index stat query
  Uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
  NdbIndexStat::Stat stat(stat_buffer);
  chkdb(g_is->query_stat(range, stat) == 0);

  // save result
  Stval& st = rng.m_st_stat;
  g_is->get_rir(stat, &st.rir);
  for (uint k = 0; k < g_opts.attrs; k++) {
    g_is->get_rpk(stat, k, &st.rpk[k]);
  }
  g_is->get_empty(stat, &st.empty);
  g_is->get_rule(stat, st.rule);

  ll2("querystat: " << st);
  return 0;
}
1783 
// For every range of the current loop compute the exact stats from the
// test's own keys and fetch the estimated stats (v2 and v4), then check
// that a zero v2 estimate only occurs for a range with no rows.
// Returns 0.
static int
queryranges()
{
  ll2("queryranges");
  for (uint i = 0; i < g_opts.ops; i++) {
    Rng& rng = g_rnglist[i];
    ll1("rng " << i << ": " << rng);
    // exact stats
    queryscan(rng);
    // interpolated stats
    chkrc(querystat_v2(rng) == 0);
    chkrc(querystat(rng) == 0);
    const Stval& st1 = rng.m_st_scan;
    const Stval& st2 = rng.m_st_stat;
    // if rir v2 is zero then it is exact
    chkrc(st2.rir_v2 != 0 || st1.rir_v2 == 0);
  }
  return 0;
}
1803 
1804 // general statistics methods
1805 
// Thin wrapper around NDBT_Stats that adds add() overloads for a single
// observation and for another Stats.
struct Stats : public NDBT_Stats {
  Stats();
  void add(double x2);          // add one observation
  void add(const Stats& sum2);  // merge another Stats into this one
};
1811 
1812 static NdbOut&
operator <<(NdbOut & out,const Stats & st)1813 operator<<(NdbOut& out, const Stats& st)
1814 {
1815   out << "count: " << st.getCount()
1816       << " min: " << st.getMin()
1817       << " max: " << st.getMax()
1818       << " mean: " << st.getMean()
1819       << " stddev: " << st.getStddev();
1820   return out;
1821 }
1822 
// Nothing to do here; the NDBT_Stats base class is default-constructed.
Stats::Stats()
{
}
1826 
1827 void
add(double x2)1828 Stats::add(double x2)
1829 {
1830   addObservation(x2);
1831 }
1832 
// Merge the observations summarised in st2 into this object, using the
// += operator available for Stats.
void
Stats::add(const Stats& st2)
{
  *this += st2;
}
1838 
1839 // error statistics scan vs stat
1840 
// Error statistics (estimated value minus exact value) accumulated over
// many ranges.
struct Sterr {
  Stats rir_v2;           // rir v2 error as pct of total rows
  Stats rir;              // rir (v4) error as pct of total rows
  Stats rpk[g_numattrs];  // rpk error as plain difference, per attribute
  Sterr();
  void add(const Sterr& st2); // merge another Sterr into this one
};
1848 
1849 static NdbOut&
operator <<(NdbOut & out,const Sterr & st)1850 operator<<(NdbOut& out, const Sterr& st)
1851 {
1852   out << "rir_v2: " << st.rir_v2 << endl;
1853   out << "rir_v4: " << st.rir;
1854   for (uint k = 0; k < g_opts.attrs; k++) {
1855     out << endl << "rpk[" << k << "]: " << st.rpk[k];
1856   }
1857   return out;
1858 }
1859 
// Nothing to do here; all Stats members are default-constructed.
Sterr::Sterr()
{
}
1863 
1864 void
add(const Sterr & st2)1865 Sterr::add(const Sterr& st2)
1866 {
1867   rir_v2.add(st2.rir_v2);
1868   rir.add(st2.rir);
1869   for (uint k = 0; k < g_opts.attrs; k++) {
1870     rpk[k].add(st2.rpk[k]);
1871   }
1872 }
1873 
1874 static void
sumrange(const Rng & rng,Sterr & st)1875 sumrange(const Rng& rng, Sterr& st)
1876 {
1877   const Stval& st1 = rng.m_st_scan;
1878   const Stval& st2 = rng.m_st_stat;
1879 
1880   // rir_v2 error as pct of total rows
1881   {
1882     double rows = (double)g_opts.rows;
1883     double x1 = (double)st1.rir_v2;
1884     double x2 = (double)st2.rir_v2;
1885     double x3 = 100.0 * (x2 - x1) / rows;
1886     st.rir_v2.add(x3);
1887   }
1888 
1889   // rir error as pct of total rows
1890   {
1891     double rows = (double)g_opts.rows;
1892     double x1 = st1.rir;
1893     double x2 = st2.rir;
1894     double x3 = 100.0 * (x2 - x1) / rows;
1895     st.rir.add(x3);
1896   }
1897 
1898   // rpk errors as plain diff
1899   for (uint k = 0; k < g_opts.attrs; k++) {
1900     double x1 = st1.rpk[k];
1901     double x2 = st2.rpk[k];
1902     double x3 = (x2 - x1);
1903     st.rpk[k].add(x3);
1904   }
1905 }
1906 
1907 static void
sumranges(Sterr & st)1908 sumranges(Sterr& st)
1909 {
1910   for (uint i = 0; i < g_opts.ops; i++) {
1911     const Rng& rng = g_rnglist[i];
1912     sumrange(rng, st);
1913   }
1914 }
1915 
1916 // loop and final stats
1917 
1918 static Sterr g_sterr;
1919 
1920 static void
loopstats()1921 loopstats()
1922 {
1923   Sterr st;
1924   sumranges(st);
1925   if (g_opts.loops != 1) {
1926     ll0("=== loop " << g_loop << " summary ===");
1927     ll0(st);
1928   }
1929   // accumulate
1930   g_sterr.add(st);
1931 }
1932 
1933 static int
loopdumps()1934 loopdumps()
1935 {
1936   char file[200];
1937   if (g_opts.dump == 0)
1938     return 0;
1939   {
1940     BaseString::snprintf(file, sizeof(file),
1941                          "%s.key.%d", g_opts.dump, g_loop);
1942     FILE* f = 0;
1943     chker((f = fopen(file, "w")) != 0);
1944     fprintf(f, "a");
1945     for (uint k = 0; k < g_opts.attrs; k++) {
1946       if (k == 0)
1947         fprintf(f, ",b_null,b");
1948       else if (k == 1)
1949         fprintf(f, ",c_null,c");
1950       else if (k == 2)
1951         fprintf(f, ",d_null,d");
1952       else
1953         require(false);
1954     }
1955     fprintf(f, "\n");
1956     for (uint i = 0; i < g_opts.rows; i++) {
1957       const Key& key = g_keys[g_sortkeys[i]];
1958       const Val& val = key.m_val;
1959       fprintf(f, "%u", i);
1960       for (uint k = 0; k < g_opts.attrs; k++) {
1961         if (k == 0) {
1962           fprintf(f, ",%d,", val.b_null);
1963           if (!val.b_null)
1964             fprintf(f, "%u", val.b);
1965         } else if (k == 1) {
1966           fprintf(f, ",%d,", val.c_null);
1967           if (!val.c_null)
1968             fprintf(f, "%.*s", val.c[0], &val.c[1]);
1969         } else if (k == 2) {
1970           fprintf(f, ",%d,", val.d_null);
1971           if (!val.d_null)
1972             fprintf(f, "%u", val.d);
1973         } else {
1974           require(false);
1975         }
1976       }
1977       fprintf(f, "\n");
1978     }
1979     chker(fclose(f) == 0);
1980   }
1981   {
1982     BaseString::snprintf(file, sizeof(file),
1983                          "%s.range.%d", g_opts.dump, g_loop);
1984     FILE* f = 0;
1985     chker((f = fopen(file, "w")) != 0);
1986     fprintf(f, "op");
1987     for (uint j = 0; j <= 1; j++) {
1988       const char* suf = (j == 0 ? "_lo" : "_hi");
1989       fprintf(f, ",attrs%s", suf);
1990       for (uint k = 0; k < g_opts.attrs; k++) {
1991         if (k == 0)
1992           fprintf(f, ",b_null%s,b%s", suf, suf);
1993         else if (k == 1)
1994           fprintf(f, ",c_null%s,c%s", suf, suf);
1995         else if (k == 2)
1996           fprintf(f, ",d_null%s,d%s", suf, suf);
1997         else
1998           require(false);
1999       }
2000       fprintf(f, ",side%s", suf);
2001     }
2002     fprintf(f, "\n");
2003     for (uint i = 0; i < g_opts.ops; i++) {
2004       const Rng& rng = g_rnglist[i];
2005       fprintf(f, "%u", i);
2006       for (uint j = 0; j <= 1; j++) {
2007         const Bnd& bnd = rng.m_bnd[j];
2008         const Val& val = bnd.m_val;
2009         fprintf(f, ",%u", val.m_numattrs);
2010         for (uint k = 0; k < g_opts.attrs; k++) {
2011           if (k >= val.m_numattrs)
2012             fprintf(f, ",,");
2013           else if (k == 0) {
2014             fprintf(f, ",%d,", val.b_null);
2015             if (!val.b_null)
2016               fprintf(f, "%u", val.b);
2017           } else if (k == 1) {
2018             fprintf(f, ",%d,", val.c_null);
2019             if (!val.c_null)
2020               fprintf(f, "%.*s", val.c[0], &val.c[1]);
2021           } else if (k == 2) {
2022             fprintf(f, ",%d,", val.d_null);
2023             if (!val.d_null)
2024               fprintf(f, "%u", val.d);
2025           } else {
2026             require(false);
2027           }
2028         }
2029         fprintf(f, ",%d", bnd.m_side);
2030       }
2031       fprintf(f, "\n");
2032     }
2033     chker(fclose(f) == 0);
2034   }
2035   {
2036     BaseString::snprintf(file, sizeof(file),
2037                          "%s.stat.%d", g_opts.dump, g_loop);
2038     FILE* f = 0;
2039     chker((f = fopen(file, "w")) != 0);
2040     fprintf(f, "op");
2041     for (uint j = 0; j <= 1; j++) {
2042       const char* suf = (j == 0 ? "_scan" : "_stat");
2043       fprintf(f, ",rir_v2%s", suf);
2044       fprintf(f, ",rir%s", suf);
2045       for (uint k = 0; k < g_opts.attrs; k++) {
2046         fprintf(f, ",rpk_%u%s", k, suf);
2047       }
2048       fprintf(f, ",empty%s", suf);
2049       if (j == 1)
2050         fprintf(f, ",rule%s", suf);
2051     }
2052     fprintf(f, "\n");
2053     for (uint i = 0; i < g_opts.ops; i++) {
2054       const Rng& rng = g_rnglist[i];
2055       fprintf(f, "%u", i);
2056       for (uint j = 0; j <= 1; j++) {
2057         const Stval& st = (j == 0 ? rng.m_st_scan : rng.m_st_stat);
2058         fprintf(f, ",%u", st.rir_v2);
2059         fprintf(f, ",%.2f", st.rir);
2060         for (uint k = 0; k < g_opts.attrs; k++) {
2061           fprintf(f, ",%.2f", st.rpk[k]);
2062         }
2063         fprintf(f, ",%d", st.empty);
2064         if (j == 1)
2065           fprintf(f, ",%s", st.rule);
2066       }
2067       fprintf(f, "\n");
2068     }
2069     chker(fclose(f) == 0);
2070   }
2071   return 0;
2072 }
2073 
// Print the error statistics accumulated over all loops (g_sterr).
static void
finalstats()
{
  ll0("=== summary ===");
  ll0(g_sterr);
}
2080 
// Main test driver: seeds the random generator, creates table, index,
// records and listener, runs the requested number of test loops
// (g_opts.loops, 0 = no limit) and finally prints the summary and
// cleans up.  Returns 0.
static int
runtest()
{
  ll1("sizeof Val: " << sizeof(Val));
  ll1("sizeof Key: " << sizeof(Key));
  ll1("sizeof Bnd: " << sizeof(Bnd));
  ll1("sizeof Rng: " << sizeof(Rng));

  // seed option: 0 = derive a seed from the pid, 1 = reseed with the loop
  // number at the start of every loop, anything else = use as given
  uint seed = g_opts.seed;
  if (seed != 1) { // not loop number
    if (seed == 0) { // random
      // the offset 2 keeps the derived seed away from the special
      // values 0 and 1
      seed = 2 + (ushort)getpid();
    }
    ll0("random seed is " << seed);
    srand(seed);
  } else {
    ll0("random seed is " << "loop number");
  }
  // look up the charset by name first, then fall back to csname lookup
  g_cs = get_charset_by_name(g_csname, MYF(0));
  if (g_cs == 0)
    g_cs = get_charset_by_csname(g_csname, MY_CS_PRIMARY, MYF(0));
  chkrc(g_cs != 0);

  allockeys();
  allocranges();
  chkrc(createtable() == 0);
  chkrc(createindex() == 0);
  chkrc(createNdbRecords() == 0);
  chkrc(definestat() == 0);
  chkrc(startlistener() == 0);

  for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
    ll0("=== loop " << g_loop << " ===");
    // this seed hides the outer variable of the same name
    uint seed = g_opts.seed;
    if (seed == 1) { // loop number
      seed = g_loop;
      srand(seed);
    }
    makekeys();
    chkrc(loaddata(g_loop != 0) == 0);
    makeranges();
    chkrc(scanranges() == 0);
    chkrc(updatestat() == 0);
    chkrc(runlistener() == 0);
    chkrc(readstat() == 0);
    chkrc(queryranges() == 0);
    loopstats();
    chkrc(loopdumps() == 0);
  }
  finalstats();

  chkrc(stoplistener() == 0);
  if (!g_opts.keeptable)
    chkrc(droptable() == 0);
  freeranges();
  freekeys();
  return 0;
}
2139 
// Create the cluster connection, one Ndb object for the test database
// ("TEST_DB") and one for the "mysql" database, and the index stat
// object.  Returns 0.
static int
doconnect()
{
  g_ncc = new Ndb_cluster_connection();
  require(g_ncc != 0);
  // NOTE(review): the arguments 30 below are presumably retry counts or
  // timeouts in seconds - confirm against the NDB API documentation
  chkdb(g_ncc->connect(30) == 0);
  g_ndb = new Ndb(g_ncc, "TEST_DB");
  require(g_ndb != 0);
  chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0);
  g_ndb_sys = new Ndb(g_ncc, "mysql");
  require(g_ndb_sys != 0);
  chkdb(g_ndb_sys->init() == 0 && g_ndb_sys->waitUntilReady(30) == 0);
  g_is = new NdbIndexStat;
  require(g_is != 0);
  return 0;
}
2156 
2157 static void
dodisconnect()2158 dodisconnect()
2159 {
2160   delete g_is;
2161   delete g_ndb_sys;
2162   delete g_ndb;
2163   delete g_ncc;
2164 }
2165 
// Command line options of the test program.  Every entry stores its
// value directly in the matching g_opts member.
// NOTE(review): for several entries the default quoted in the help text
// differs from the first number after the argument type (presumably the
// my_option default value - confirm): rows 10000 vs 100000, ops 100 vs
// 1000, scanpct 10 vs 5, eqscans 30 vs 50.  "loops" is declared GET_INT
// while its help text and use suggest an unsigned value.
static struct my_option
my_long_options[] =
{
  NDB_STD_OPTS("testIndexStat"),
  { "loglevel", NDB_OPT_NOSHORT,
    "Logging level in this program 0-3 (default 0)",
    (uchar **)&g_opts.loglevel, (uchar **)&g_opts.loglevel, 0,
    GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "seed", NDB_OPT_NOSHORT, "Random seed (default 0=random, 1=loop number)",
    (uchar **)&g_opts.seed, (uchar **)&g_opts.seed, 0,
    GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "loops", NDB_OPT_NOSHORT, "Number of test loops (default 1, 0=forever)",
    (uchar **)&g_opts.loops, (uchar **)&g_opts.loops, 0,
    GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
  { "rows", NDB_OPT_NOSHORT, "Number of rows (default 10000)",
    (uchar **)&g_opts.rows, (uchar **)&g_opts.rows, 0,
    GET_UINT, REQUIRED_ARG, 100000, 0, 0, 0, 0, 0 },
  { "ops", NDB_OPT_NOSHORT,"Number of index scans per loop (default 100)",
    (uchar **)&g_opts.ops, (uchar **)&g_opts.ops, 0,
    GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
  { "nullkeys", NDB_OPT_NOSHORT, "Pct nulls in each key attribute (default 10)",
    (uchar **)&g_opts.nullkeys, (uchar **)&g_opts.nullkeys, 0,
    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
  { "rpk", NDB_OPT_NOSHORT, "Avg records per full key (default 10)",
    (uchar **)&g_opts.rpk, (uchar **)&g_opts.rpk, 0,
    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
  { "rpkvar", NDB_OPT_NOSHORT, "Vary rpk by factor (default 10, none 1)",
    (uchar **)&g_opts.rpkvar, (uchar **)&g_opts.rpkvar, 0,
    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
  { "scanpct", NDB_OPT_NOSHORT,
    "Preferred max pct of total rows per scan (default 10)",
    (uchar **)&g_opts.scanpct, (uchar **)&g_opts.scanpct, 0,
    GET_UINT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
  { "eqscans", NDB_OPT_NOSHORT,
    "Pct scans for partial/full equality (default 30)",
    (uchar **)&g_opts.eqscans, (uchar **)&g_opts.eqscans, 0,
    GET_UINT, REQUIRED_ARG, 50, 0, 0, 0, 0, 0 },
  { "keeptable", NDB_OPT_NOSHORT,
    "Do not drop table at exit",
    (uchar **)&g_opts.keeptable, (uchar **)&g_opts.keeptable, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "abort", NDB_OPT_NOSHORT, "Dump core on any error",
    (uchar **)&g_opts.abort, (uchar **)&g_opts.abort, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "dump", NDB_OPT_NOSHORT, "Write CSV files name.* of keys,ranges,stats",
    (uchar **)&g_opts.dump, (uchar **)&g_opts.dump, 0,
    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  // all-zero entry terminates the table
  { 0, 0, 0,
    0, 0, 0,
    GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
};
2217 
// Null-terminated list of option group names; handed to ndb_usage() below.
const char* load_default_groups[] = { "mysql_cluster", NULL };
2220 
2221 static void
short_usage_sub()2222 short_usage_sub()
2223 {
2224   ndb_short_usage_sub(NULL);
2225 }
2226 
2227 static void
usage()2228 usage()
2229 {
2230   ndbout << my_progname << ": ordered index stats test" << endl;
2231   ndb_usage(short_usage_sub, load_default_groups, my_long_options);
2232 }
2233 
2234 static int
checkoptions()2235 checkoptions()
2236 {
2237   chkrc(g_opts.rows != 0);
2238   chkrc(g_opts.nullkeys <= 100);
2239   chkrc(g_opts.rpk != 0);
2240   g_opts.rpk = min(g_opts.rpk, g_opts.rows);
2241   chkrc(g_opts.rpkvar != 0);
2242   chkrc(g_opts.scanpct <= 100);
2243   chkrc(g_opts.eqscans <= 100);
2244   // set value limits
2245   g_lim_val.all_nullable = false;
2246   g_lim_bnd.all_nullable = true;
2247   g_lim_val.b_min = g_opts.rows;
2248   g_lim_val.b_max = 2 * g_opts.rows;
2249   g_lim_bnd.b_min = 90 * g_lim_val.b_min / 100;
2250   g_lim_bnd.b_max = 110 * g_lim_val.b_max / 100;
2251   g_lim_val.c_char = "bcd";
2252   g_lim_bnd.c_char = "abcde";
2253   g_lim_val.d_min = 100;
2254   g_lim_val.d_max = 200;
2255   g_lim_bnd.d_min = 0;
2256   g_lim_bnd.d_max = 300;
2257   return 0;
2258 }
2259 
2260 static
2261 int
docreate_stat_tables()2262 docreate_stat_tables()
2263 {
2264   if (g_is->check_systables(g_ndb_sys) == 0)
2265     return 0;
2266   ll1("check_systables: " << g_is->getNdbError());
2267 
2268   ll0("create stat tables");
2269   chkdb(g_is->create_systables(g_ndb_sys) == 0);
2270   g_has_created_stat_tables = true;
2271   return 0;
2272 }
2273 
2274 static
2275 int
dodrop_stat_tables()2276 dodrop_stat_tables()
2277 {
2278   if (g_has_created_stat_tables == false)
2279     return 0;
2280 
2281   ll0("drop stat tables");
2282   chkdb(g_is->drop_systables(g_ndb_sys) == 0);
2283   return 0;
2284 }
2285 
2286 static int
docreate_stat_events()2287 docreate_stat_events()
2288 {
2289   if (g_is->check_sysevents(g_ndb_sys) == 0)
2290     return 0;
2291   ll1("check_sysevents: " << g_is->getNdbError());
2292 
2293   ll0("create stat events");
2294   chkdb(g_is->create_sysevents(g_ndb_sys) == 0);
2295   g_has_created_stat_events = true;
2296   return 0;
2297 }
2298 
2299 static int
dodrop_stat_events()2300 dodrop_stat_events()
2301 {
2302   if (g_has_created_stat_events == false)
2303     return 0;
2304 
2305   ll0("drop stat events");
2306   chkdb(g_is->drop_sysevents(g_ndb_sys) == 0);
2307   return 0;
2308 }
2309 
2310 static int
docreate_sys_objects()2311 docreate_sys_objects()
2312 {
2313   require(g_is != 0 && g_ndb_sys != 0);
2314   chkrc(docreate_stat_tables() == 0);
2315   chkrc(docreate_stat_events() == 0);
2316   return 0;
2317 }
2318 
2319 static int
dodrop_sys_objects()2320 dodrop_sys_objects()
2321 {
2322   require(g_is != 0 && g_ndb_sys != 0);
2323   chkrc(dodrop_stat_events() == 0);
2324   chkrc(dodrop_stat_tables() == 0);
2325   return 0;
2326 }
2327 
2328 int
main(int argc,char ** argv)2329 main(int argc, char** argv)
2330 {
2331   ndb_init();
2332   my_progname = strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
2333   uint i;
2334   ndbout << my_progname;
2335   for (i = 1; i < (uint)argc; i++)
2336     ndbout << " " << argv[i];
2337   ndbout << endl;
2338   int ret;
2339   ndb_opt_set_usage_funcs(short_usage_sub, usage);
2340   ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
2341   if (ret != 0 || argc != 0) {
2342     ll0("wrong args");
2343     return NDBT_ProgramExit(NDBT_WRONGARGS);
2344   }
2345   if (checkoptions() == -1) {
2346     ll0("invalid args");
2347     return NDBT_ProgramExit(NDBT_WRONGARGS);
2348   }
2349   if (doconnect() == -1) {
2350     ll0("connect failed");
2351     return NDBT_ProgramExit(NDBT_FAILED);
2352   }
2353   if (docreate_sys_objects() == -1) {
2354     ll0("failed to check or create stat tables and events");
2355     goto failed;
2356   }
2357   if (runtest() == -1) {
2358     ll0("test failed");
2359     goto failed;
2360   }
2361   if (dodrop_sys_objects() == -1) {
2362     ll0("failed to drop created stat tables or events");
2363     goto failed;
2364   }
2365   dodisconnect();
2366   return NDBT_ProgramExit(NDBT_OK);
2367 failed:
2368   (void)dodrop_sys_objects();
2369   dodisconnect();
2370   return NDBT_ProgramExit(NDBT_FAILED);
2371 }
2372