1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /*
26  * testOIBasic - ordered index test
27  *
28  * dummy push to re-created mysql-5.1-telco ...
29  */
30 
31 #include <ndb_global.h>
32 
33 #include <NdbMain.h>
34 #include <NdbOut.hpp>
35 #include <NdbApi.hpp>
36 #include <NdbTest.hpp>
37 #include <NdbMutex.h>
38 #include <NdbCondition.h>
39 #include <NdbThread.h>
40 #include <NdbTick.h>
41 #include <NdbSleep.h>
42 #include <my_sys.h>
43 #include <NdbSqlUtil.hpp>
44 #include <ndb_version.h>
45 
46 // options
47 
48 struct Opt {
49   // common options
50   uint m_batch;
51   const char* m_bound;
52   const char* m_case;
53   bool m_collsp;
54   bool m_cont;
55   bool m_core;
56   const char* m_csname;
57   CHARSET_INFO* m_cs;
58   int m_die;
59   bool m_dups;
60   NdbDictionary::Object::FragmentType m_fragtype;
61   const char* m_index;
62   uint m_loop;
63   uint m_mrrmaxrng;
64   bool m_msglock;
65   bool m_nologging;
66   bool m_noverify;
67   uint m_pctmrr;
68   uint m_pctnull;
69   uint m_rows;
70   uint m_samples;
71   uint m_scanbatch;
72   uint m_scanpar;
73   uint m_scanstop;
74   int m_seed;
75   const char* m_skip;
76   uint m_sloop;
77   uint m_ssloop;
78   const char* m_table;
79   uint m_threads;
80   int m_v;      // int for lint
OptOpt81   Opt() :
82     m_batch(32),
83     m_bound("01234"),
84     m_case(0),
85     m_collsp(false),
86     m_cont(false),
87     m_core(false),
88     m_csname("random"),
89     m_cs(0),
90     m_die(0),
91     m_dups(false),
92     m_fragtype(NdbDictionary::Object::FragUndefined),
93     m_index(0),
94     m_loop(1),
95     m_mrrmaxrng(10),
96     m_msglock(true),
97     m_nologging(false),
98     m_noverify(false),
99     m_pctmrr(50),
100     m_pctnull(10),
101     m_rows(1000),
102     m_samples(0),
103     m_scanbatch(0),
104     m_scanpar(0),
105     m_scanstop(0),
106     m_seed(-1),
107     m_skip(0),
108     m_sloop(4),
109     m_ssloop(4),
110     m_table(0),
111     m_threads(4),
112     m_v(1) {
113   }
114 };
115 
// Global option set, default-constructed here; read by the log and check
// macros below (m_v, m_msglock, m_core).
static Opt g_opt;

// Defined elsewhere in this file; printhelp() calls both after the option list.
static void printcases();
static void printtables();
120 
121 static void
printhelp()122 printhelp()
123 {
124   Opt d;
125   ndbout
126     << "usage: testOIbasic [options]" << endl
127     << "  -batch N      pk operations in batch [" << d.m_batch << "]" << endl
128     << "  -bound xyz    use only these bound types 0-4 [" << d.m_bound << "]" << endl
129     << "  -case abc     only given test cases (letters a-z)" << endl
130     << "  -collsp       use strnncollsp instead of strnxfrm" << endl
131     << "  -cont         on error continue to next test case [" << d.m_cont << "]" << endl
132     << "  -core         core dump on error [" << d.m_core << "]" << endl
133     << "  -csname S     charset or collation [" << d.m_csname << "]" << endl
134     << "  -die nnn      exit immediately on NDB error code nnn" << endl
135     << "  -dups         allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
136     << "  -fragtype T   fragment type single/small/medium/large" << endl
137     << "  -index xyz    only given index numbers (digits 0-9)" << endl
138     << "  -loop N       loop count full suite 0=forever [" << d.m_loop << "]" << endl
139     << "  -mrrmaxrng N  max ranges to supply for MRR scan [" << d.m_mrrmaxrng << "]" << endl
140     << "  -nologging    create tables in no-logging mode" << endl
141     << "  -noverify     skip index verifications" << endl
142     << "  -pctmrr N     pct of index scans to use MRR [" << d.m_pctmrr << "]" << endl
143     << "  -pctnull N    pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
144     << "  -rows N       rows per thread [" << d.m_rows << "]" << endl
145     << "  -samples N    samples for some timings (0=all) [" << d.m_samples << "]" << endl
146     << "  -scanbatch N  scan batch 0=default [" << d.m_scanbatch << "]" << endl
147     << "  -scanpar N    scan parallel 0=default [" << d.m_scanpar << "]" << endl
148     << "  -seed N       srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
149     << "  -skip abc     skip given test cases (letters a-z)" << endl
150     << "  -sloop N      level 2 (sub)loop count [" << d.m_sloop << "]" << endl
151     << "  -ssloop N     level 3 (sub)loop count [" << d.m_ssloop << "]" << endl
152     << "  -table xyz    only given table numbers (digits 0-9)" << endl
153     << "  -threads N    number of threads [" << d.m_threads << "]" << endl
154     << "  -vN           verbosity [" << d.m_v << "]" << endl
155     << "  -h or -help   print this help text" << endl
156     ;
157   printcases();
158   printtables();
159 }
160 
// Compile-time switches; not yet configurable from the command line.
static const bool g_store_null_key = true;

// compare NULL like normal value (NULL < not NULL, NULL == NULL)
static const bool g_compare_null = true;

// Hex digit lookup table (its users are outside this section of the file).
static const char* hexstr = "0123456789abcdef";

// random ints
// On NDB_WIN builds random()/srandom() are mapped onto rand()/srand().
#ifdef NDB_WIN
#define random() rand()
#define srandom(SEED) srand(SEED)
#endif
174 
// Return a random value in [0, n); returns 0 without consuming a random
// number when n is 0.
static uint
urandom(uint n)
{
  return n != 0 ? (uint)(random() % n) : 0;
}
183 
// Return a random value in (-n, n): a magnitude below n whose sign is chosen
// by a second random draw.  Returns 0 when n is 0.
static int
irandom(uint n)
{
  if (n == 0)
    return 0;
  int magnitude = random() % n;
  // the low bit of a second draw decides the sign
  const bool negate = (random() & 0x1) != 0;
  return negate ? -magnitude : magnitude;
}
194 
195 static bool
randompct(uint pct)196 randompct(uint pct)
197 {
198   if (pct == 0)
199     return false;
200   if (pct >= 100)
201     return true;
202   return urandom(100) < pct;
203 }
204 
205 static uint
random_coprime(uint n)206 random_coprime(uint n)
207 {
208     uint prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
209     uint count = sizeof(prime) / sizeof(prime[0]);
210     if (n == 0)
211       return 0;
212     while (1) {
213       uint i = urandom(count);
214       if (n % prime[i] != 0)
215         return prime[i];
216     }
217 }
218 
219 // random re-sequence of 0...(n-1)
220 
// Generator for the values (m_start + i * m_prime) % n, i = 0, 1, 2, ...
// where m_start is random and m_prime is a prime not dividing n.
struct Rsq {
  // Set up a sequence over n values.
  Rsq(uint n);
  // Return the next value in the sequence; asserts that n was not 0.
  uint next();
private:
  uint m_n;       // sequence length (modulus)
  uint m_i;       // number of values handed out so far
  uint m_start;   // random starting offset from urandom(n)
  uint m_prime;   // step from random_coprime(n)
};
230 
Rsq(uint n)231 Rsq::Rsq(uint n)
232 {
233   m_n = n;
234   m_i = 0;
235   m_start = urandom(n);
236   m_prime = random_coprime(n);
237 }
238 
239 uint
next()240 Rsq::next()
241 {
242   assert(m_n != 0);
243   return (m_start + m_i++ * m_prime) % m_n;
244 }
245 
// log and error macros

// Mutex taken around each log line when g_opt.m_msglock is set.
static NdbMutex *ndbout_mutex = NULL;
// Returns the prefix printed at the start of every log line (defined elsewhere).
static const char* getthrprefix();

// LLN(n, s): print stream expression s at verbosity level n.  Nothing is
// printed when n exceeds g_opt.m_v.  Levels above 2 also print the source
// line number.  s is deliberately left unparenthesized so that callers can
// pass a chain such as "text " << value.
#define LLN(n, s) \
  do { \
    if ((n) > g_opt.m_v) break; \
    if (g_opt.m_msglock) NdbMutex_Lock(ndbout_mutex); \
    ndbout << getthrprefix(); \
    if ((n) > 2) \
      ndbout << "line " << __LINE__ << ": "; \
    ndbout << s << endl; \
    if (g_opt.m_msglock) NdbMutex_Unlock(ndbout_mutex); \
  } while(0)

// Shorthands for the fixed verbosity levels 0-5.
#define LL0(s) LLN(0, s)
#define LL1(s) LLN(1, s)
#define LL2(s) LLN(2, s)
#define LL3(s) LLN(3, s)
#define LL4(s) LLN(4, s)
#define LL5(s) LLN(5, s)

// Stream x in hex, then switch the stream back to decimal.
#define HEX(x)  hex << (x) << dec
270 
// following check a condition and return -1 on failure
// (they expand to a "return -1", so they can only be used inside functions
// returning an integer type)

#undef CHK      // simple check
#undef CHKTRY   // check with action on fail
#undef CHKCON   // print NDB API errors on failure

// CHK(x): CHKTRY with an empty action.
#define CHK(x)  CHKTRY(x, ;)

// CHKTRY(x, act): if x is false, log the failed expression with its line
// number at level 0, abort() when g_opt.m_core is set, otherwise run act
// and return -1 from the enclosing function.
#define CHKTRY(x, act) \
  do { \
    if (x) break; \
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
    if (g_opt.m_core) abort(); \
    act; \
    return -1; \
  } while (0)

// CHKCON(x, con): like CHK, but additionally calls (con).printerror(ndbout)
// before the optional abort() and the return -1.
#define CHKCON(x, con) \
  do { \
    if (x) break; \
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
    (con).printerror(ndbout); \
    if (g_opt.m_core) abort(); \
    return -1; \
  } while (0)
296 
297 // method parameters
298 
299 struct Thr;
300 struct Con;
301 struct Tab;
302 struct ITab;
303 struct Set;
304 struct Tmr;
305 
306 struct Par : public Opt {
307   uint m_no;
308   Con* m_con;
conPar309   Con& con() const { assert(m_con != 0); return *m_con; }
310   const Tab* m_tab;
tabPar311   const Tab& tab() const { assert(m_tab != 0); return *m_tab; }
312   const ITab* m_itab;
itabPar313   const ITab& itab() const { assert(m_itab != 0); return *m_itab; }
314   Set* m_set;
setPar315   Set& set() const { assert(m_set != 0); return *m_set; }
316   Tmr* m_tmr;
tmrPar317   Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
318   char m_currcase[2];
319   uint m_lno;
320   uint m_slno;
321   uint m_totrows;
322   // value calculation
323   uint m_range;
324   uint m_pctrange;
325   uint m_pctbrange;
326   int m_bdir;
327   bool m_noindexkeyupdate;
328   // choice of key
329   bool m_randomkey;
330   // do verify after read
331   bool m_verify;
332   // errors to catch (see Con)
333   uint m_catcherr;
334   // abort percentage
335   uint m_abortpct;
336   NdbOperation::LockMode m_lockmode;
337   // scan options
338   bool m_tupscan;
339   bool m_ordered;
340   bool m_descending;
341   bool m_multiRange;
342   // threads used by current test case
343   uint m_usedthreads;
ParPar344   Par(const Opt& opt) :
345     Opt(opt),
346     m_no(0),
347     m_con(0),
348     m_tab(0),
349     m_itab(0),
350     m_set(0),
351     m_tmr(0),
352     m_lno(0),
353     m_slno(0),
354     m_totrows(0),
355     m_range(m_rows),
356     m_pctrange(40),
357     m_pctbrange(80),
358     m_bdir(0),
359     m_noindexkeyupdate(false),
360     m_randomkey(false),
361     m_verify(false),
362     m_catcherr(0),
363     m_abortpct(0),
364     m_lockmode(NdbOperation::LM_Read),
365     m_tupscan(false),
366     m_ordered(false),
367     m_descending(false),
368     m_multiRange(false),
369     m_usedthreads(0)
370   {
371     m_currcase[0] = 0;
372   }
373 };
374 
375 static bool
usetable(Par par,uint i)376 usetable(Par par, uint i)
377 {
378   return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
379 }
380 
381 static bool
useindex(Par par,uint i)382 useindex(Par par, uint i)
383 {
384   return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
385 }
386 
387 static uint
thrrow(Par par,uint j)388 thrrow(Par par, uint j)
389 {
390   return par.m_usedthreads * j + par.m_no;
391 }
392 
393 #if 0
394 static bool
395 isthrrow(Par par, uint i)
396 {
397   return i % par.m_usedthreads == par.m_no;
398 }
399 #endif
400 
401 // timer
402 
// Accumulating millisecond timer.  on()/off() bracket a measured interval;
// the elapsed time of each interval is added to m_ms.
struct Tmr {
  // Reset all counters and text buffers.
  void clr();
  // Start an interval (asserts the timer is not already running).
  void on();
  // Stop the interval, add its duration to m_ms and cnt to m_cnt.
  void off(uint cnt = 0);
  // Format the accumulated time into m_time and return it.
  const char* time();
  // Format this timer as a percentage of t1 into m_text and return it.
  const char* pct(const Tmr& t1);
  // Format the overhead of this timer relative to t1 into m_text and return it.
  const char* over(const Tmr& t1);
  NDB_TICKS m_on;     // start tick of the running interval, 0 when stopped
  NDB_TICKS m_ms;     // accumulated milliseconds
  uint m_cnt;         // accumulated count passed to off()
  char m_time[100];   // result buffer of time()
  char m_text[100];   // result buffer of pct() and over()
  Tmr() { clr(); }
};
417 
418 void
clr()419 Tmr::clr()
420 {
421   m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
422 }
423 
424 void
on()425 Tmr::on()
426 {
427   assert(m_on == 0);
428   m_on = NdbTick_CurrentMillisecond();
429 }
430 
431 void
off(uint cnt)432 Tmr::off(uint cnt)
433 {
434   NDB_TICKS off = NdbTick_CurrentMillisecond();
435   assert(m_on != 0 && off >= m_on);
436   m_ms += off - m_on;
437   m_cnt += cnt;
438   m_on = 0;
439 }
440 
441 const char*
time()442 Tmr::time()
443 {
444   if (m_cnt == 0) {
445     sprintf(m_time, "%u ms", (unsigned)m_ms);
446   } else {
447     sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", (unsigned)m_ms, m_cnt, (unsigned)((1000 * m_ms) / m_cnt));
448   }
449   return m_time;
450 }
451 
452 const char*
pct(const Tmr & t1)453 Tmr::pct(const Tmr& t1)
454 {
455   if (0 < t1.m_ms) {
456     sprintf(m_text, "%u pct", (unsigned)((100 * m_ms) / t1.m_ms));
457   } else {
458     sprintf(m_text, "[cannot measure]");
459   }
460   return m_text;
461 }
462 
463 const char*
over(const Tmr & t1)464 Tmr::over(const Tmr& t1)
465 {
466   if (0 < t1.m_ms) {
467     if (t1.m_ms <= m_ms)
468       sprintf(m_text, "%u pct", (unsigned)((100 * (m_ms - t1.m_ms)) / t1.m_ms));
469     else
470       sprintf(m_text, "-%u pct", (unsigned)((100 * (t1.m_ms - m_ms)) / t1.m_ms));
471   } else {
472     sprintf(m_text, "[cannot measure]");
473   }
474   return m_text;
475 }
476 
477 // character sets
478 
// Size of the cslist cache and exclusive upper bound of the random charset
// numbers tried by getcs().
static const uint maxcsnumber = 512;
// Number of random characters generated per charset (size of Chs::m_chr).
static const uint maxcharcount = 32;
// Capacity in bytes of one character (Chr::m_bytes).
static const uint maxcharsize = 4;
// Largest strxfrm multiplier supported (asserted in Chs::Chs); also sizes
// Chr::m_xbytes.
static const uint maxxmulsize = 8;
483 
484 // single mb char
// One (possibly multi-byte) character together with its normalized form.
struct Chr {
  uchar m_bytes[maxcharsize];                 // raw character bytes
  uchar m_xbytes[maxxmulsize * maxcharsize];  // normalized (strnxfrm) bytes
  uint m_size;                                // number of bytes used in m_bytes
  // Zero-fill both buffers and the size.
  Chr();
};
491 
Chr()492 Chr::Chr()
493 {
494   memset(m_bytes, 0, sizeof(m_bytes));
495   memset(m_xbytes, 0, sizeof(m_xbytes));
496   m_size = 0;
497 }
498 
499 // charset and random valid chars to use
// charset and random valid chars to use
struct Chs {
  CHARSET_INFO* m_cs;   // the charset described
  uint m_xmul;          // strxfrm multiplier of m_cs (at least 1)
  Chr* m_chr;           // array of maxcharcount characters, owned (delete [] in dtor)
  // Generate and sort the random character set for cs.
  Chs(CHARSET_INFO* cs);
  ~Chs();
};
507 
508 static NdbOut&
509 operator<<(NdbOut& out, const Chs& chs);
510 
Chs(CHARSET_INFO * cs)511 Chs::Chs(CHARSET_INFO* cs) :
512   m_cs(cs)
513 {
514   m_xmul = m_cs->strxfrm_multiply;
515   if (m_xmul == 0)
516     m_xmul = 1;
517   assert(m_xmul <= maxxmulsize);
518   m_chr = new Chr [maxcharcount];
519   uint i = 0;
520   uint miss1 = 0;
521   uint miss2 = 0;
522   uint miss3 = 0;
523   uint miss4 = 0;
524   while (i < maxcharcount) {
525     uchar* bytes = m_chr[i].m_bytes;
526     uchar* xbytes = m_chr[i].m_xbytes;
527     uint& size = m_chr[i].m_size;
528     bool ok;
529     size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
530     assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
531     // prefer longer chars
532     if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
533       continue;
534     for (uint j = 0; j < size; j++) {
535       bytes[j] = urandom(256);
536     }
537     int not_used;
538     // check wellformed
539     const char* sbytes = (const char*)bytes;
540     if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1, &not_used) != size) {
541       miss1++;
542       continue;
543     }
544     // check no proper prefix wellformed
545     ok = true;
546     for (uint j = 1; j < size; j++) {
547       if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1, &not_used) == j) {
548         ok = false;
549         break;
550       }
551     }
552     if (!ok) {
553       miss2++;
554       continue;
555     }
556     // normalize
557     memset(xbytes, 0, sizeof(xbytes));
558     // currently returns buffer size always
559     int xlen = NdbSqlUtil::ndb_strnxfrm(cs, xbytes, m_xmul * size, bytes, size);
560     // check we got something
561     ok = false;
562     for (uint j = 0; j < (uint)xlen; j++) {
563       if (xbytes[j] != 0) {
564         ok = true;
565         break;
566       }
567     }
568     if (!ok) {
569       miss3++;
570       continue;
571     }
572     // check for duplicate (before normalize)
573     ok = true;
574     for (uint j = 0; j < i; j++) {
575       const Chr& chr = m_chr[j];
576       if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
577         ok = false;
578         break;
579       }
580     }
581     if (!ok) {
582       miss4++;
583       continue;
584     }
585     i++;
586   }
587   bool disorder = true;
588   uint bubbles = 0;
589   while (disorder) {
590     disorder = false;
591     for (uint i = 1; i < maxcharcount; i++) {
592       uint len = sizeof(m_chr[i].m_xbytes);
593       if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
594         Chr chr = m_chr[i];
595         m_chr[i] = m_chr[i-1];
596         m_chr[i-1] = chr;
597         disorder = true;
598         bubbles++;
599       }
600     }
601   }
602   LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
603 }
604 
// Release the character array allocated by the constructor.
Chs::~Chs()
{
  delete [] m_chr;
}
609 
610 static NdbOut&
operator <<(NdbOut & out,const Chs & chs)611 operator<<(NdbOut& out, const Chs& chs)
612 {
613   CHARSET_INFO* cs = chs.m_cs;
614   out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
615   return out;
616 }
617 
618 static Chs* cslist[maxcsnumber];
619 
620 static void
resetcslist()621 resetcslist()
622 {
623   for (uint i = 0; i < maxcsnumber; i++) {
624     delete cslist[i];
625     cslist[i] = 0;
626   }
627 }
628 
629 static Chs*
getcs(Par par)630 getcs(Par par)
631 {
632   CHARSET_INFO* cs;
633   if (par.m_cs != 0) {
634     cs = par.m_cs;
635   } else {
636     while (1) {
637       uint n = urandom(maxcsnumber);
638       cs = get_charset(n, MYF(0));
639       if (cs != 0) {
640         // avoid dodgy internal character sets
641         // see bug# 37554
642         if (cs->state & MY_CS_HIDDEN)
643           continue;
644         // prefer complex charsets
645         if (cs->mbmaxlen != 1 || urandom(5) == 0)
646           break;
647       }
648     }
649   }
650   if (cslist[cs->number] == 0)
651     cslist[cs->number] = new Chs(cs);
652   return cslist[cs->number];
653 }
654 
655 // tables and indexes
656 
657 // Col - table column
658 
// Description of one table column.  The size members are derived from type,
// length and charset in the constructor.
struct Col {
  // Supported column types, numerically equal to the NdbDictionary types.
  enum Type {
    Unsigned = NdbDictionary::Column::Unsigned,
    Char = NdbDictionary::Column::Char,
    Varchar = NdbDictionary::Column::Varchar,
    Longvarchar = NdbDictionary::Column::Longvarchar
  };
  const struct Tab& m_tab;  // owning table
  uint m_num;               // column number within the table
  const char* m_name;       // heap copy of the name, freed in the destructor
  bool m_pk;                // part of the primary key
  Type m_type;
  uint m_length;            // declared length (characters for char types)
  uint m_bytelength;        // multiplied by char width
  uint m_attrsize;          // base type size
  uint m_headsize;          // length bytes
  uint m_bytesize;          // full value size
  bool m_nullable;
  const Chs* m_chs;         // charset; passed as 0 for the Unsigned columns built in this file
  Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs);
  ~Col();
  // True if type, length and charset match those of col2.
  bool equal(const Col& col2) const;
  // Assert that the value at addr is well formed for this column.
  void wellformed(const void* addr) const;
};
683 
// Construct a column description.
//
// name is copied to the heap.  m_bytelength is length times the charset's
// mbmaxlen (times 1 without a charset).  m_attrsize and m_headsize are
// looked up per type (~0 for an unknown type), and m_bytesize is the head
// plus m_attrsize * m_bytelength.  The m_bytesize initializer reads
// m_headsize, m_attrsize and m_bytelength, so it depends on those members
// being declared (and therefore initialized) before it.
Col::Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs) :
  m_tab(tab),
  m_num(num),
  m_name(strcpy(new char [strlen(name) + 1], name)),
  m_pk(pk),
  m_type(type),
  m_length(length),
  m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
  m_attrsize(
      type == Unsigned ? sizeof(Uint32) :
      type == Char ? sizeof(char) :
      type == Varchar ? sizeof(char) :
      type == Longvarchar ? sizeof(char) : ~0),
  m_headsize(
      type == Unsigned ? 0 :
      type == Char ? 0 :
      type == Varchar ? 1 :
      type == Longvarchar ? 2 : ~0),
  m_bytesize(m_headsize + m_attrsize * m_bytelength),
  m_nullable(nullable),
  m_chs(chs)
{
  // fix long varchar
  // a Varchar longer than 255 bytes cannot use a 1-byte length head, so it
  // is promoted to Longvarchar with one extra head byte
  if (type == Varchar && m_bytelength > 255) {
    m_type = Longvarchar;
    m_headsize += 1;
    m_bytesize += 1;
  }
}
713 
// Free the name copy made by the constructor.
Col::~Col()
{
  delete [] m_name;
}
718 
719 bool
equal(const Col & col2) const720 Col::equal(const Col& col2) const
721 {
722   return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
723 }
724 
725 void
wellformed(const void * addr) const726 Col::wellformed(const void* addr) const
727 {
728   switch (m_type) {
729   case Col::Unsigned:
730     break;
731   case Col::Char:
732     {
733       CHARSET_INFO* cs = m_chs->m_cs;
734       const char* src = (const char*)addr;
735       uint len = m_bytelength;
736       int not_used;
737 	  (void)not_used; /* squash warning when assert is #defined to nothing */
738       assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff, &not_used) == len);
739     }
740     break;
741   case Col::Varchar:
742     {
743       CHARSET_INFO* cs = m_chs->m_cs;
744       const uchar* src = (const uchar*)addr;
745       const char* ssrc = (const char*)src;
746       uint len = src[0];
747       int not_used;
748 	  (void)not_used; /* squash warning when assert is #defined to nothing */
749       assert(len <= m_bytelength);
750       assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff, &not_used) == len);
751     }
752     break;
753   case Col::Longvarchar:
754     {
755       CHARSET_INFO* cs = m_chs->m_cs;
756       const uchar* src = (const uchar*)addr;
757       const char* ssrc = (const char*)src;
758       uint len = src[0] + (src[1] << 8);
759       int not_used;
760 	  (void)not_used; /* squash warning when assert is #defined to nothing */
761       assert(len <= m_bytelength);
762       assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff, &not_used) == len);
763     }
764     break;
765   default:
766     assert(false);
767     break;
768   }
769 }
770 
771 static NdbOut&
operator <<(NdbOut & out,const Col & col)772 operator<<(NdbOut& out, const Col& col)
773 {
774   out << "col[" << col.m_num << "] " << col.m_name;
775   switch (col.m_type) {
776   case Col::Unsigned:
777     out << " uint";
778     break;
779   case Col::Char:
780     {
781       CHARSET_INFO* cs = col.m_chs->m_cs;
782       out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
783     }
784     break;
785   case Col::Varchar:
786     {
787       CHARSET_INFO* cs = col.m_chs->m_cs;
788       out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
789     }
790     break;
791   case Col::Longvarchar:
792     {
793       CHARSET_INFO* cs = col.m_chs->m_cs;
794       out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
795     }
796     break;
797   default:
798     out << "type" << (int)col.m_type;
799     assert(false);
800     break;
801   }
802   out << (col.m_pk ? " pk" : "");
803   out << (col.m_nullable ? " nullable" : "");
804   return out;
805 }
806 
807 // ICol - index column
808 
// One column of an index: its position in the index and the table column
// it refers to.
struct ICol {
  const struct ITab& m_itab;  // owning index
  uint m_num;                 // position within the index
  const Col& m_col;           // referenced table column (not owned)
  ICol(const struct ITab& itab, uint num, const Col& col);
  ~ICol();
};
816 
// Bind position num of index itab to table column col; only references are
// stored.
ICol::ICol(const struct ITab& itab, uint num, const Col& col) :
  m_itab(itab),
  m_num(num),
  m_col(col)
{
}
823 
// Nothing to release: all members are references or plain values.
ICol::~ICol()
{
}
827 
828 static NdbOut&
operator <<(NdbOut & out,const ICol & icol)829 operator<<(NdbOut& out, const ICol& icol)
830 {
831   out << "icol[" << icol.m_num << "] " << icol.m_col;
832   return out;
833 }
834 
835 // ITab - index
836 
// Description of one index on a table.
struct ITab {
  // Index kinds, numerically equal to the NdbDictionary index types.
  enum Type {
    OrderedIndex = NdbDictionary::Index::OrderedIndex,
    UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
  };
  const struct Tab& m_tab;  // table the index belongs to
  const char* m_name;       // heap copy of the name, freed in the destructor
  Type m_type;
  uint m_icols;             // number of index columns
  const ICol** m_icol;      // m_icols entries plus a null terminator; entries owned
  uint m_keymask;           // bit per table column number used by the index
  ITab(const struct Tab& tab, const char* name, Type type, uint icols);
  ~ITab();
  // Install icolptr at position k and record its table column in m_keymask.
  void icoladd(uint k, const ICol* icolptr);
};
852 
ITab(const struct Tab & tab,const char * name,Type type,uint icols)853 ITab::ITab(const struct Tab& tab, const char* name, Type type, uint icols) :
854   m_tab(tab),
855   m_name(strcpy(new char [strlen(name) + 1], name)),
856   m_type(type),
857   m_icols(icols),
858   m_icol(new const ICol* [icols + 1]),
859   m_keymask(0)
860 {
861   for (uint k = 0; k <= m_icols; k++)
862     m_icol[k] = 0;
863 }
864 
~ITab()865 ITab::~ITab()
866 {
867   delete [] m_name;
868   for (uint i = 0; i < m_icols; i++)
869     delete m_icol[i];
870   delete [] m_icol;
871 }
872 
873 void
icoladd(uint k,const ICol * icolptr)874 ITab::icoladd(uint k, const ICol* icolptr)
875 {
876   assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
877   m_icol[k] = icolptr;
878   m_keymask |= (1 << icolptr->m_col.m_num);
879 }
880 
881 static NdbOut&
operator <<(NdbOut & out,const ITab & itab)882 operator<<(NdbOut& out, const ITab& itab)
883 {
884   out << "itab " << itab.m_name << " icols=" << itab.m_icols;
885   for (uint k = 0; k < itab.m_icols; k++) {
886     const ICol& icol = *itab.m_icol[k];
887     out << endl << icol;
888   }
889   return out;
890 }
891 
892 // Tab - table
893 
// Description of one table with its columns and indexes.
struct Tab {
  const char* m_name;       // heap copy of the name, freed in the destructor
  uint m_cols;              // number of columns
  const Col** m_col;        // m_cols entries plus a null terminator; entries owned
  uint m_pkmask;            // bit per primary key column number
  uint m_itabs;             // number of index slots
  const ITab** m_itab;      // m_itabs slots plus a null terminator; slots may be empty
  uint m_orderedindexes;    // count of ordered indexes added
  uint m_hashindexes;       // count of other (hash) indexes added
  // pk must contain an Unsigned column
  uint m_keycol;
  // Install column colptr at position k.
  void coladd(uint k, Col* colptr);
  // Install index itab in slot j.
  void itabadd(uint j, ITab* itab);
  Tab(const char* name, uint cols, uint itabs, uint keycol);
  ~Tab();
};
910 
Tab(const char * name,uint cols,uint itabs,uint keycol)911 Tab::Tab(const char* name, uint cols, uint itabs, uint keycol) :
912   m_name(strcpy(new char [strlen(name) + 1], name)),
913   m_cols(cols),
914   m_col(new const Col* [cols + 1]),
915   m_pkmask(0),
916   m_itabs(itabs),
917   m_itab(new const ITab* [itabs + 1]),
918   m_orderedindexes(0),
919   m_hashindexes(0),
920   m_keycol(keycol)
921 {
922   for (uint k = 0; k <= cols; k++)
923     m_col[k] = 0;
924   for (uint j = 0; j <= itabs; j++)
925     m_itab[j] = 0;
926 }
927 
~Tab()928 Tab::~Tab()
929 {
930   delete [] m_name;
931   for (uint i = 0; i < m_cols; i++)
932     delete m_col[i];
933   delete [] m_col;
934   for (uint i = 0; i < m_itabs; i++)
935     delete m_itab[i];
936   delete [] m_itab;
937 }
938 
939 void
coladd(uint k,Col * colptr)940 Tab::coladd(uint k, Col* colptr)
941 {
942   assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
943   m_col[k] = colptr;
944   if (colptr->m_pk)
945     m_pkmask |= (1 << k);
946 }
947 
948 void
itabadd(uint j,ITab * itabptr)949 Tab::itabadd(uint j, ITab* itabptr)
950 {
951   assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
952   m_itab[j] = itabptr;
953   if (itabptr->m_type == ITab::OrderedIndex)
954     m_orderedindexes++;
955   else
956     m_hashindexes++;
957 }
958 
959 static NdbOut&
operator <<(NdbOut & out,const Tab & tab)960 operator<<(NdbOut& out, const Tab& tab)
961 {
962   out << "tab " << tab.m_name << " cols=" << tab.m_cols;
963   for (uint k = 0; k < tab.m_cols; k++) {
964     const Col& col =  *tab.m_col[k];
965     out << endl << col;
966   }
967   for (uint i = 0; i < tab.m_itabs; i++) {
968     if (tab.m_itab[i] == 0)
969       continue;
970     const ITab& itab = *tab.m_itab[i];
971     out << endl << itab;
972   }
973   return out;
974 }
975 
976 // make table structs
977 
// Array of tabcount table descriptions; entries may be null (see the null
// checks in verifytables()).
static const Tab** tablist = 0;
static uint tabcount = 0;
980 
981 static void
verifytables()982 verifytables()
983 {
984   for (uint j = 0; j < tabcount; j++) {
985     const Tab* t = tablist[j];
986     if (t == 0)
987       continue;
988     assert(t->m_cols != 0 && t->m_col != 0);
989     for (uint k = 0; k < t->m_cols; k++) {
990       const Col* c = t->m_col[k];
991       assert(c != 0 && c->m_num == k);
992       assert(!(c->m_pk && c->m_nullable));
993     }
994     assert(t->m_col[t->m_cols] == 0);
995     {
996       assert(t->m_keycol < t->m_cols);
997       const Col* c = t->m_col[t->m_keycol];
998       assert(c->m_pk && c->m_type == Col::Unsigned);
999     }
1000     assert(t->m_itabs != 0 && t->m_itab != 0);
1001     for (uint i = 0; i < t->m_itabs; i++) {
1002       const ITab* x = t->m_itab[i];
1003       if (x == 0)
1004         continue;
1005       assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
1006       for (uint k = 0; k < x->m_icols; k++) {
1007         const ICol* c = x->m_icol[k];
1008         assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
1009         if (x->m_type == ITab::UniqueHashIndex) {
1010           assert(!c->m_col.m_nullable);
1011         }
1012       }
1013     }
1014     assert(t->m_itab[t->m_itabs] == 0);
1015   }
1016 }
1017 
1018 static void
makebuiltintables(Par par)1019 makebuiltintables(Par par)
1020 {
1021   LL2("makebuiltintables");
1022   resetcslist();
1023   tabcount = 3;
1024   if (tablist == 0) {
1025     tablist = new const Tab* [tabcount];
1026     for (uint j = 0; j < tabcount; j++) {
1027       tablist[j] = 0;
1028     }
1029   } else {
1030     for (uint j = 0; j < tabcount; j++) {
1031       delete tablist[j];
1032       tablist[j] = 0;
1033     }
1034   }
1035   // ti0 - basic
1036   if (usetable(par, 0)) {
1037     Tab* t = new Tab("ti0", 5, 7, 0);
1038     // name - pk - type - length - nullable - cs
1039     t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
1040     t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
1041     t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
1042     t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
1043     t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
1044     if (useindex(par, 0)) {
1045       // a
1046       ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
1047       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1048       t->itabadd(0, x);
1049     }
1050     if (useindex(par, 1)) {
1051       // b
1052       ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
1053       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1054       t->itabadd(1, x);
1055     }
1056     if (useindex(par, 2)) {
1057       // b, c
1058       ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
1059       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1060       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1061       t->itabadd(2, x);
1062     }
1063     if (useindex(par, 3)) {
1064       // b, e, c, d
1065       ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4);
1066       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1067       x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
1068       x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1069       x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
1070       t->itabadd(3, x);
1071     }
1072     if (useindex(par, 4)) {
1073       // a, c
1074       ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2);
1075       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1076       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1077       t->itabadd(4, x);
1078     }
1079     if (useindex(par, 5)) {
1080       // a, e
1081       ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
1082       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1083       x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
1084       t->itabadd(5, x);
1085     }
1086     tablist[0] = t;
1087   }
1088   // ti1 - simple char fields
1089   if (usetable(par, 1)) {
1090     Tab* t = new Tab("ti1", 5, 7, 1);
1091     // name - pk - type - length - nullable - cs
1092     t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
1093     t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
1094     t->coladd(2, new Col(*t, 2, "c", 0, Col::Varchar, 20, 0, getcs(par)));
1095     t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
1096     t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par)));
1097     if (useindex(par, 0)) {
1098       // b
1099       ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
1100       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1101       t->itabadd(0, x);
1102     }
1103     if (useindex(par, 1)) {
1104       // c, a
1105       ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
1106       x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
1107       x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
1108       t->itabadd(1, x);
1109     }
1110     if (useindex(par, 2)) {
1111       // d
1112       ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1);
1113       x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
1114       t->itabadd(2, x);
1115     }
1116     if (useindex(par, 3)) {
1117       // e, d, c, b
1118       ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4);
1119       x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
1120       x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
1121       x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1122       x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
1123       t->itabadd(3, x);
1124     }
1125     if (useindex(par, 4)) {
1126       // a, b
1127       ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2);
1128       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1129       x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
1130       t->itabadd(4, x);
1131     }
1132     if (useindex(par, 5)) {
1133       // b, c, d
1134       ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3);
1135       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1136       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1137       x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1138       t->itabadd(5, x);
1139     }
1140     tablist[1] = t;
1141   }
1142   // ti2 - complex char fields
1143   if (usetable(par, 2)) {
1144     Tab* t = new Tab("ti2", 5, 7, 2);
1145     // name - pk - type - length - nullable - cs
1146     t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
1147     t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
1148     t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
1149     t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par)));
1150     t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par)));
1151     if (useindex(par, 0)) {
1152       // a, c, d
1153       ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
1154       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1155       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1156       x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1157       t->itabadd(0, x);
1158     }
1159     if (useindex(par, 1)) {
1160       // e, d, c, b, a
1161       ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
1162       x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
1163       x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
1164       x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1165       x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
1166       x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
1167       t->itabadd(1, x);
1168     }
1169     if (useindex(par, 2)) {
1170       // d
1171       ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
1172       x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
1173       t->itabadd(2, x);
1174     }
1175     if (useindex(par, 3)) {
1176       // b
1177       ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
1178       x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1179       t->itabadd(3, x);
1180     }
1181     if (useindex(par, 4)) {
1182       // a, c
1183       ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2);
1184       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1185       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1186       t->itabadd(4, x);
1187     }
1188     if (useindex(par, 5)) {
1189       // a, c, d, e
1190       ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4);
1191       x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1192       x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1193       x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1194       x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
1195       t->itabadd(5, x);
1196     }
1197     tablist[2] = t;
1198   }
1199   verifytables();
1200 }
1201 
1202 // connections
1203 
// cluster connection on which Con::connect() creates its Ndb object
static Ndb_cluster_connection* g_ncc = 0;
1205 
1206 struct Con {
1207   Ndb* m_ndb;
1208   NdbDictionary::Dictionary* m_dic;
1209   NdbTransaction* m_tx;
1210   Uint64 m_txid;
1211   NdbOperation* m_op;
1212   NdbIndexOperation* m_indexop;
1213   NdbScanOperation* m_scanop;
1214   NdbIndexScanOperation* m_indexscanop;
1215   NdbScanFilter* m_scanfilter;
1216   enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
1217   ScanMode m_scanmode;
1218   enum ErrType {
1219     ErrNone = 0,
1220     ErrDeadlock = 1,
1221     ErrNospace = 2,
1222     ErrOther = 4
1223   };
1224   ErrType m_errtype;
1225   char m_errname[100];
ConCon1226   Con() :
1227     m_ndb(0), m_dic(0), m_tx(0), m_txid(0), m_op(0), m_indexop(0),
1228     m_scanop(0), m_indexscanop(0), m_scanfilter(0),
1229     m_scanmode(ScanNo), m_errtype(ErrNone) {}
~ConCon1230   ~Con() {
1231     if (m_tx != 0)
1232       closeTransaction();
1233   }
1234   int connect();
1235   void connect(const Con& con);
1236   void disconnect();
1237   int startTransaction();
1238   int getNdbOperation(const Tab& tab);
1239   int getNdbIndexOperation1(const ITab& itab, const Tab& tab);
1240   int getNdbIndexOperation(const ITab& itab, const Tab& tab);
1241   int getNdbScanOperation(const Tab& tab);
1242   int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
1243   int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
1244   int getNdbScanFilter();
1245   int equal(int num, const char* addr);
1246   int getValue(int num, NdbRecAttr*& rec);
1247   int setValue(int num, const char* addr);
1248   int setBound(int num, int type, const void* value);
1249   int beginFilter(int group);
1250   int endFilter();
1251   int setFilter(int num, int cond, const void* value, uint len);
1252   int execute(ExecType et);
1253   int execute(ExecType et, uint& err);
1254   int readTuple(Par par);
1255   int readTuples(Par par);
1256   int readIndexTuples(Par par);
1257   int executeScan();
1258   int nextScanResult(bool fetchAllowed);
1259   int nextScanResult(bool fetchAllowed, uint& err);
1260   int updateScanTuple(Con& con2);
1261   int deleteScanTuple(Con& con2);
1262   void closeScan();
1263   void closeTransaction();
1264   const char* errname(uint err);
1265   void printerror(NdbOut& out);
1266 };
1267 
1268 int
connect()1269 Con::connect()
1270 {
1271   assert(m_ndb == 0);
1272   m_ndb = new Ndb(g_ncc, "TEST_DB");
1273   CHKCON(m_ndb->init() == 0, *this);
1274   CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
1275   m_tx = 0, m_txid = 0, m_op = 0;
1276   return 0;
1277 }
1278 
1279 void
connect(const Con & con)1280 Con::connect(const Con& con)
1281 {
1282   assert(m_ndb == 0);
1283   m_ndb = con.m_ndb;
1284 }
1285 
1286 void
disconnect()1287 Con::disconnect()
1288 {
1289   delete m_ndb;
1290   m_ndb = 0, m_dic = 0, m_tx = 0, m_txid = 0, m_op = 0;
1291 }
1292 
1293 int
startTransaction()1294 Con::startTransaction()
1295 {
1296   assert(m_ndb != 0);
1297   if (m_tx != 0)
1298     closeTransaction();
1299   CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
1300   m_txid = m_tx->getTransactionId();
1301   return 0;
1302 }
1303 
1304 int
getNdbOperation(const Tab & tab)1305 Con::getNdbOperation(const Tab& tab)
1306 {
1307   assert(m_tx != 0);
1308   CHKCON((m_op = m_tx->getNdbOperation(tab.m_name)) != 0, *this);
1309   return 0;
1310 }
1311 
1312 int
getNdbIndexOperation1(const ITab & itab,const Tab & tab)1313 Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab)
1314 {
1315   assert(m_tx != 0);
1316   CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
1317   return 0;
1318 }
1319 
1320 int
getNdbIndexOperation(const ITab & itab,const Tab & tab)1321 Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
1322 {
1323   assert(m_tx != 0);
1324   uint tries = 0;
1325   while (1) {
1326     if (getNdbIndexOperation1(itab, tab) == 0)
1327       break;
1328     CHK(++tries < 10);
1329     NdbSleep_MilliSleep(100);
1330   }
1331   return 0;
1332 }
1333 
1334 int
getNdbScanOperation(const Tab & tab)1335 Con::getNdbScanOperation(const Tab& tab)
1336 {
1337   assert(m_tx != 0);
1338   CHKCON((m_op = m_scanop = m_tx->getNdbScanOperation(tab.m_name)) != 0, *this);
1339   return 0;
1340 }
1341 
1342 int
getNdbIndexScanOperation1(const ITab & itab,const Tab & tab)1343 Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab)
1344 {
1345   assert(m_tx != 0);
1346   CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this);
1347   return 0;
1348 }
1349 
1350 int
getNdbIndexScanOperation(const ITab & itab,const Tab & tab)1351 Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
1352 {
1353   assert(m_tx != 0);
1354   uint tries = 0;
1355   while (1) {
1356     if (getNdbIndexScanOperation1(itab, tab) == 0)
1357       break;
1358     CHK(++tries < 10);
1359     NdbSleep_MilliSleep(100);
1360   }
1361   return 0;
1362 }
1363 
1364 int
getNdbScanFilter()1365 Con::getNdbScanFilter()
1366 {
1367   assert(m_tx != 0 && m_scanop != 0);
1368   delete m_scanfilter;
1369   m_scanfilter = new NdbScanFilter(m_scanop);
1370   return 0;
1371 }
1372 
1373 int
equal(int num,const char * addr)1374 Con::equal(int num, const char* addr)
1375 {
1376   assert(m_tx != 0 && m_op != 0);
1377   CHKCON(m_op->equal(num, addr) == 0, *this);
1378   return 0;
1379 }
1380 
1381 int
getValue(int num,NdbRecAttr * & rec)1382 Con::getValue(int num, NdbRecAttr*& rec)
1383 {
1384   assert(m_tx != 0 && m_op != 0);
1385   CHKCON((rec = m_op->getValue(num, 0)) != 0, *this);
1386   return 0;
1387 }
1388 
1389 int
setValue(int num,const char * addr)1390 Con::setValue(int num, const char* addr)
1391 {
1392   assert(m_tx != 0 && m_op != 0);
1393   CHKCON(m_op->setValue(num, addr) == 0, *this);
1394   return 0;
1395 }
1396 
1397 int
setBound(int num,int type,const void * value)1398 Con::setBound(int num, int type, const void* value)
1399 {
1400   assert(m_tx != 0 && m_indexscanop != 0);
1401   CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
1402   return 0;
1403 }
1404 
1405 int
beginFilter(int group)1406 Con::beginFilter(int group)
1407 {
1408   assert(m_tx != 0 && m_scanfilter != 0);
1409   CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
1410   return 0;
1411 }
1412 
1413 int
endFilter()1414 Con::endFilter()
1415 {
1416   assert(m_tx != 0 && m_scanfilter != 0);
1417   CHKCON(m_scanfilter->end() == 0, *this);
1418   return 0;
1419 }
1420 
1421 int
setFilter(int num,int cond,const void * value,uint len)1422 Con::setFilter(int num, int cond, const void* value, uint len)
1423 {
1424   assert(m_tx != 0 && m_scanfilter != 0);
1425   CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
1426   return 0;
1427 }
1428 
1429 int
execute(ExecType et)1430 Con::execute(ExecType et)
1431 {
1432   assert(m_tx != 0);
1433   CHKCON(m_tx->execute(et) == 0, *this);
1434   return 0;
1435 }
1436 
1437 int
execute(ExecType et,uint & err)1438 Con::execute(ExecType et, uint& err)
1439 {
1440   int ret = execute(et);
1441   // err in: errors to catch, out: error caught
1442   const uint errin = err;
1443   err = 0;
1444   if (ret == -1) {
1445     if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1446       LL3("caught deadlock");
1447       err = ErrDeadlock;
1448       ret = 0;
1449     }
1450     if (m_errtype == ErrNospace && (errin & ErrNospace)) {
1451       LL3("caught nospace");
1452       err = ErrNospace;
1453       ret = 0;
1454     }
1455   }
1456   CHK(ret == 0);
1457   return 0;
1458 }
1459 
1460 int
readTuple(Par par)1461 Con::readTuple(Par par)
1462 {
1463   assert(m_tx != 0 && m_op != 0);
1464   NdbOperation::LockMode lm = par.m_lockmode;
1465   CHKCON(m_op->readTuple(lm) == 0, *this);
1466   return 0;
1467 }
1468 
1469 int
readTuples(Par par)1470 Con::readTuples(Par par)
1471 {
1472   assert(m_tx != 0 && m_scanop != 0);
1473   int scan_flags = 0;
1474   if (par.m_tupscan)
1475     scan_flags |= NdbScanOperation::SF_TupScan;
1476   CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1477   return 0;
1478 }
1479 
1480 int
readIndexTuples(Par par)1481 Con::readIndexTuples(Par par)
1482 {
1483   assert(m_tx != 0 && m_indexscanop != 0);
1484   int scan_flags = 0;
1485   if (par.m_ordered)
1486     scan_flags |= NdbScanOperation::SF_OrderBy;
1487   if (par.m_descending)
1488     scan_flags |= NdbScanOperation::SF_Descending;
1489   if (par.m_multiRange)
1490   {
1491     scan_flags |= NdbScanOperation::SF_MultiRange;
1492     scan_flags |= NdbScanOperation::SF_ReadRangeNo;
1493   }
1494   CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1495   return 0;
1496 }
1497 
1498 int
executeScan()1499 Con::executeScan()
1500 {
1501   CHKCON(m_tx->execute(NoCommit) == 0, *this);
1502   return 0;
1503 }
1504 
1505 int
nextScanResult(bool fetchAllowed)1506 Con::nextScanResult(bool fetchAllowed)
1507 {
1508   int ret;
1509   assert(m_scanop != 0);
1510   CHKCON((ret = m_scanop->nextResult(fetchAllowed)) != -1, *this);
1511   assert(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1512   return ret;
1513 }
1514 
1515 int
nextScanResult(bool fetchAllowed,uint & err)1516 Con::nextScanResult(bool fetchAllowed, uint& err)
1517 {
1518   int ret = nextScanResult(fetchAllowed);
1519   // err in: errors to catch, out: error caught
1520   const uint errin = err;
1521   err = 0;
1522   if (ret == -1) {
1523     if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1524       LL3("caught deadlock");
1525       err = ErrDeadlock;
1526       ret = 0;
1527     }
1528   }
1529   CHK(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1530   return ret;
1531 }
1532 
1533 int
updateScanTuple(Con & con2)1534 Con::updateScanTuple(Con& con2)
1535 {
1536   assert(con2.m_tx != 0);
1537   CHKCON((con2.m_op = m_scanop->updateCurrentTuple(con2.m_tx)) != 0, *this);
1538   con2.m_txid = m_txid; // in the kernel
1539   return 0;
1540 }
1541 
1542 int
deleteScanTuple(Con & con2)1543 Con::deleteScanTuple(Con& con2)
1544 {
1545   assert(con2.m_tx != 0);
1546   CHKCON(m_scanop->deleteCurrentTuple(con2.m_tx) == 0, *this);
1547   con2.m_txid = m_txid; // in the kernel
1548   return 0;
1549 }
1550 
1551 void
closeScan()1552 Con::closeScan()
1553 {
1554   assert(m_scanop != 0);
1555   m_scanop->close();
1556   m_scanop = 0, m_indexscanop = 0;
1557 
1558 }
1559 
1560 void
closeTransaction()1561 Con::closeTransaction()
1562 {
1563   assert(m_ndb != 0 && m_tx != 0);
1564   m_ndb->closeTransaction(m_tx);
1565   m_tx = 0, m_txid = 0, m_op = 0;
1566   m_scanop = 0, m_indexscanop = 0;
1567 }
1568 
1569 const char*
errname(uint err)1570 Con::errname(uint err)
1571 {
1572   sprintf(m_errname, "0x%x", err);
1573   if (err & ErrDeadlock)
1574     strcat(m_errname, ",deadlock");
1575   if (err & ErrNospace)
1576     strcat(m_errname, ",nospace");
1577   return m_errname;
1578 }
1579 
1580 void
printerror(NdbOut & out)1581 Con::printerror(NdbOut& out)
1582 {
1583   m_errtype = ErrOther;
1584   uint any = 0;
1585   int code;
1586   int die = 0;
1587   if (m_ndb) {
1588     if ((code = m_ndb->getNdbError().code) != 0) {
1589       LL0(++any << " ndb: error " << m_ndb->getNdbError());
1590       die += (code == g_opt.m_die);
1591     }
1592     if (m_dic && (code = m_dic->getNdbError().code) != 0) {
1593       LL0(++any << " dic: error " << m_dic->getNdbError());
1594       die += (code == g_opt.m_die);
1595     }
1596     if (m_tx) {
1597       if ((code = m_tx->getNdbError().code) != 0) {
1598         LL0(++any << " con: error " << m_tx->getNdbError());
1599         die += (code == g_opt.m_die);
1600         // 631 is new, occurs only on 4 db nodes, needs to be checked out
1601         if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
1602           m_errtype = ErrDeadlock;
1603         if (code == 826 || code == 827 || code == 902)
1604           m_errtype = ErrNospace;
1605       }
1606       if (m_op && m_op->getNdbError().code != 0) {
1607         LL0(++any << " op : error " << m_op->getNdbError());
1608         die += (code == g_opt.m_die);
1609       }
1610     }
1611   }
1612   if (!any) {
1613     LL0("failed but no NDB error code");
1614   }
1615   if (die) {
1616     if (g_opt.m_core)
1617       abort();
1618     exit(1);
1619   }
1620 }
1621 
1622 // dictionary operations
1623 
1624 static int
invalidateindex(Par par,const ITab & itab)1625 invalidateindex(Par par, const ITab& itab)
1626 {
1627   Con& con = par.con();
1628   const Tab& tab = par.tab();
1629   con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
1630   return 0;
1631 }
1632 
1633 static int
invalidateindex(Par par)1634 invalidateindex(Par par)
1635 {
1636   const Tab& tab = par.tab();
1637   for (uint i = 0; i < tab.m_itabs; i++) {
1638     if (tab.m_itab[i] == 0)
1639       continue;
1640     const ITab& itab = *tab.m_itab[i];
1641     invalidateindex(par, itab);
1642   }
1643   return 0;
1644 }
1645 
1646 static int
invalidatetable(Par par)1647 invalidatetable(Par par)
1648 {
1649   Con& con = par.con();
1650   const Tab& tab = par.tab();
1651   invalidateindex(par);
1652   con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
1653   return 0;
1654 }
1655 
1656 static int
droptable(Par par)1657 droptable(Par par)
1658 {
1659   Con& con = par.con();
1660   const Tab& tab = par.tab();
1661   con.m_dic = con.m_ndb->getDictionary();
1662   if (con.m_dic->getTable(tab.m_name) == 0) {
1663     // how to check for error
1664     LL4("no table " << tab.m_name);
1665   } else {
1666     LL3("drop table " << tab.m_name);
1667     CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
1668   }
1669   con.m_dic = 0;
1670   return 0;
1671 }
1672 
1673 static int
createtable(Par par)1674 createtable(Par par)
1675 {
1676   Con& con = par.con();
1677   const Tab& tab = par.tab();
1678   LL3("create table " << tab.m_name);
1679   LL4(tab);
1680   NdbDictionary::Table t(tab.m_name);
1681   if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
1682     t.setFragmentType(par.m_fragtype);
1683   }
1684   if (par.m_nologging) {
1685     t.setLogging(false);
1686   }
1687   for (uint k = 0; k < tab.m_cols; k++) {
1688     const Col& col = *tab.m_col[k];
1689     NdbDictionary::Column c(col.m_name);
1690     c.setType((NdbDictionary::Column::Type)col.m_type);
1691     c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
1692     c.setPrimaryKey(col.m_pk);
1693     c.setNullable(col.m_nullable);
1694     if (col.m_chs != 0)
1695         c.setCharset(col.m_chs->m_cs);
1696     t.addColumn(c);
1697   }
1698   con.m_dic = con.m_ndb->getDictionary();
1699   CHKCON(con.m_dic->createTable(t) == 0, con);
1700   con.m_dic = 0;
1701   return 0;
1702 }
1703 
1704 static int
dropindex(Par par,const ITab & itab)1705 dropindex(Par par, const ITab& itab)
1706 {
1707   Con& con = par.con();
1708   const Tab& tab = par.tab();
1709   con.m_dic = con.m_ndb->getDictionary();
1710   if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
1711     // how to check for error
1712     LL4("no index " << itab.m_name);
1713   } else {
1714     LL3("drop index " << itab.m_name);
1715     CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
1716   }
1717   con.m_dic = 0;
1718   return 0;
1719 }
1720 
1721 static int
dropindex(Par par)1722 dropindex(Par par)
1723 {
1724   const Tab& tab = par.tab();
1725   for (uint i = 0; i < tab.m_itabs; i++) {
1726     if (tab.m_itab[i] == 0)
1727       continue;
1728     const ITab& itab = *tab.m_itab[i];
1729     CHK(dropindex(par, itab) == 0);
1730   }
1731   return 0;
1732 }
1733 
1734 static int
createindex(Par par,const ITab & itab)1735 createindex(Par par, const ITab& itab)
1736 {
1737   Con& con = par.con();
1738   const Tab& tab = par.tab();
1739   LL3("create index " << itab.m_name);
1740   LL4(itab);
1741   NdbDictionary::Index x(itab.m_name);
1742   x.setTable(tab.m_name);
1743   x.setType((NdbDictionary::Index::Type)itab.m_type);
1744   if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
1745     x.setLogging(false);
1746   }
1747   for (uint k = 0; k < itab.m_icols; k++) {
1748     const ICol& icol = *itab.m_icol[k];
1749     const Col& col = icol.m_col;
1750     x.addColumnName(col.m_name);
1751   }
1752   con.m_dic = con.m_ndb->getDictionary();
1753   CHKCON(con.m_dic->createIndex(x) == 0, con);
1754   con.m_dic = 0;
1755   return 0;
1756 }
1757 
1758 static int
createindex(Par par)1759 createindex(Par par)
1760 {
1761   const Tab& tab = par.tab();
1762   for (uint i = 0; i < tab.m_itabs; i++) {
1763     if (tab.m_itab[i] == 0)
1764       continue;
1765     const ITab& itab = *tab.m_itab[i];
1766     CHK(createindex(par, itab) == 0);
1767   }
1768   return 0;
1769 }
1770 
1771 // data sets
1772 
1773 // Val - typed column value
1774 
1775 struct Val {
1776   const Col& m_col;
1777   union {
1778   Uint32 m_uint32;
1779   uchar* m_char;
1780   uchar* m_varchar;
1781   uchar* m_longvarchar;
1782   };
1783   bool m_null;
1784   // construct
1785   Val(const Col& col);
1786   ~Val();
1787   void copy(const Val& val2);
1788   void copy(const void* addr);
1789   const void* dataaddr() const;
1790   void calc(Par par, uint i);
1791   void calckey(Par par, uint i);
1792   void calckeychars(Par par, uint i, uint& n, uchar* buf);
1793   void calcnokey(Par par);
1794   void calcnokeychars(Par par, uint& n, uchar* buf);
1795   // operations
1796   int setval(Par par) const;
1797   int setval(Par par, const ICol& icol) const;
1798   // compare
1799   int cmp(Par par, const Val& val2) const;
1800   int cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const;
1801   int verify(Par par, const Val& val2) const;
1802 private:
1803   Val& operator=(const Val& val2);
1804 };
1805 
1806 static NdbOut&
1807 operator<<(NdbOut& out, const Val& val);
1808 
1809 // construct
1810 
Val(const Col & col)1811 Val::Val(const Col& col) :
1812   m_col(col)
1813 {
1814   switch (col.m_type) {
1815   case Col::Unsigned:
1816     m_uint32 = 0x7e7e7e7e;
1817     break;
1818   case Col::Char:
1819     m_char = new uchar [col.m_bytelength];
1820     memset(m_char, 0x7e, col.m_bytelength);
1821     break;
1822   case Col::Varchar:
1823     m_varchar = new uchar [1 + col.m_bytelength];
1824     memset(m_char, 0x7e, 1 + col.m_bytelength);
1825     break;
1826   case Col::Longvarchar:
1827     m_longvarchar = new uchar [2 + col.m_bytelength];
1828     memset(m_char, 0x7e, 2 + col.m_bytelength);
1829     break;
1830   default:
1831     assert(false);
1832     break;
1833   }
1834 }
1835 
~Val()1836 Val::~Val()
1837 {
1838   const Col& col = m_col;
1839   switch (col.m_type) {
1840   case Col::Unsigned:
1841     break;
1842   case Col::Char:
1843     delete [] m_char;
1844     break;
1845   case Col::Varchar:
1846     delete [] m_varchar;
1847     break;
1848   case Col::Longvarchar:
1849     delete [] m_longvarchar;
1850     break;
1851   default:
1852     assert(false);
1853     break;
1854   }
1855 }
1856 
1857 void
copy(const Val & val2)1858 Val::copy(const Val& val2)
1859 {
1860   const Col& col = m_col;
1861   const Col& col2 = val2.m_col;
1862   assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
1863   if (val2.m_null) {
1864     m_null = true;
1865     return;
1866   }
1867   copy(val2.dataaddr());
1868 }
1869 
1870 void
copy(const void * addr)1871 Val::copy(const void* addr)
1872 {
1873   const Col& col = m_col;
1874   switch (col.m_type) {
1875   case Col::Unsigned:
1876     m_uint32 = *(const Uint32*)addr;
1877     break;
1878   case Col::Char:
1879     memcpy(m_char, addr, col.m_bytelength);
1880     break;
1881   case Col::Varchar:
1882     memcpy(m_varchar, addr, 1 + col.m_bytelength);
1883     break;
1884   case Col::Longvarchar:
1885     memcpy(m_longvarchar, addr, 2 + col.m_bytelength);
1886     break;
1887   default:
1888     assert(false);
1889     break;
1890   }
1891   m_null = false;
1892 }
1893 
1894 const void*
dataaddr() const1895 Val::dataaddr() const
1896 {
1897   const Col& col = m_col;
1898   switch (col.m_type) {
1899   case Col::Unsigned:
1900     return &m_uint32;
1901   case Col::Char:
1902     return m_char;
1903   case Col::Varchar:
1904     return m_varchar;
1905   case Col::Longvarchar:
1906     return m_longvarchar;
1907   default:
1908     break;
1909   }
1910   assert(false);
1911   return 0;
1912 }
1913 
1914 void
calc(Par par,uint i)1915 Val::calc(Par par, uint i)
1916 {
1917   const Col& col = m_col;
1918   col.m_pk ? calckey(par, i) : calcnokey(par);
1919   if (!m_null)
1920     col.wellformed(dataaddr());
1921 }
1922 
1923 void
calckey(Par par,uint i)1924 Val::calckey(Par par, uint i)
1925 {
1926   const Col& col = m_col;
1927   m_null = false;
1928   switch (col.m_type) {
1929   case Col::Unsigned:
1930     m_uint32 = i;
1931     break;
1932   case Col::Char:
1933     {
1934       const Chs* chs = col.m_chs;
1935       CHARSET_INFO* cs = chs->m_cs;
1936       uint n = 0;
1937       calckeychars(par, i, n, m_char);
1938       // extend by appropriate space
1939       (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
1940     }
1941     break;
1942   case Col::Varchar:
1943     {
1944       uint n = 0;
1945       calckeychars(par, i, n, m_varchar + 1);
1946       // set length and pad with nulls
1947       m_varchar[0] = n;
1948       memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
1949     }
1950     break;
1951   case Col::Longvarchar:
1952     {
1953       uint n = 0;
1954       calckeychars(par, i, n, m_longvarchar + 2);
1955       // set length and pad with nulls
1956       m_longvarchar[0] = (n & 0xff);
1957       m_longvarchar[1] = (n >> 8);
1958       memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
1959     }
1960     break;
1961   default:
1962     assert(false);
1963     break;
1964   }
1965 }
1966 
1967 void
calckeychars(Par par,uint i,uint & n,uchar * buf)1968 Val::calckeychars(Par par, uint i, uint& n, uchar* buf)
1969 {
1970   const Col& col = m_col;
1971   const Chs* chs = col.m_chs;
1972   n = 0;
1973   uint len = 0;
1974   while (len < col.m_length) {
1975     if (i % (1 + n) == 0) {
1976       break;
1977     }
1978     const Chr& chr = chs->m_chr[i % maxcharcount];
1979     assert(n + chr.m_size <= col.m_bytelength);
1980     memcpy(buf + n, chr.m_bytes, chr.m_size);
1981     n += chr.m_size;
1982     len++;
1983   }
1984 }
1985 
1986 void
calcnokey(Par par)1987 Val::calcnokey(Par par)
1988 {
1989   const Col& col = m_col;
1990   m_null = false;
1991   if (col.m_nullable && urandom(100) < par.m_pctnull) {
1992     m_null = true;
1993     return;
1994   }
1995   int r = irandom((par.m_pctrange * par.m_range) / 100);
1996   if (par.m_bdir != 0 && urandom(10) != 0) {
1997     if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
1998       r = -r;
1999   }
2000   uint v = par.m_range + r;
2001   switch (col.m_type) {
2002   case Col::Unsigned:
2003     m_uint32 = v;
2004     break;
2005   case Col::Char:
2006     {
2007       const Chs* chs = col.m_chs;
2008       CHARSET_INFO* cs = chs->m_cs;
2009       uint n = 0;
2010       calcnokeychars(par, n, m_char);
2011       // extend by appropriate space
2012       (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
2013     }
2014     break;
2015   case Col::Varchar:
2016     {
2017       uint n = 0;
2018       calcnokeychars(par, n, m_varchar + 1);
2019       // set length and pad with nulls
2020       m_varchar[0] = n;
2021       memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
2022     }
2023     break;
2024   case Col::Longvarchar:
2025     {
2026       uint n = 0;
2027       calcnokeychars(par, n, m_longvarchar + 2);
2028       // set length and pad with nulls
2029       m_longvarchar[0] = (n & 0xff);
2030       m_longvarchar[1] = (n >> 8);
2031       memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
2032     }
2033     break;
2034   default:
2035     assert(false);
2036     break;
2037   }
2038 }
2039 
2040 void
calcnokeychars(Par par,uint & n,uchar * buf)2041 Val::calcnokeychars(Par par, uint& n, uchar* buf)
2042 {
2043   const Col& col = m_col;
2044   const Chs* chs = col.m_chs;
2045   n = 0;
2046   uint len = 0;
2047   while (len < col.m_length) {
2048     if (urandom(1 + col.m_bytelength) == 0) {
2049       break;
2050     }
2051     uint half = maxcharcount / 2;
2052     int r = irandom((par.m_pctrange * half) / 100);
2053     if (par.m_bdir != 0 && urandom(10) != 0) {
2054       if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
2055         r = -r;
2056     }
2057     uint i = half + r;
2058     assert(i < maxcharcount);
2059     const Chr& chr = chs->m_chr[i];
2060     assert(n + chr.m_size <= col.m_bytelength);
2061     memcpy(buf + n, chr.m_bytes, chr.m_size);
2062     n += chr.m_size;
2063     len++;
2064   }
2065 }
2066 
2067 // operations
2068 
2069 int
setval(Par par) const2070 Val::setval(Par par) const
2071 {
2072   Con& con = par.con();
2073   const Col& col = m_col;
2074   if (col.m_pk) {
2075     assert(!m_null);
2076     const char* addr = (const char*)dataaddr();
2077     LL5("setval pk [" << col << "] " << *this);
2078     CHK(con.equal(col.m_num, addr) == 0);
2079   } else {
2080     const char* addr = !m_null ? (const char*)dataaddr() : 0;
2081     LL5("setval non-pk [" << col << "] " << *this);
2082     CHK(con.setValue(col.m_num, addr) == 0);
2083   }
2084   return 0;
2085 }
2086 
2087 int
setval(Par par,const ICol & icol) const2088 Val::setval(Par par, const ICol& icol) const
2089 {
2090   Con& con = par.con();
2091   assert(!m_null);
2092   const char* addr = (const char*)dataaddr();
2093   LL5("setval key [" << icol << "] " << *this);
2094   CHK(con.equal(icol.m_num, addr) == 0);
2095   return 0;
2096 }
2097 
2098 // compare
2099 
2100 int
cmp(Par par,const Val & val2) const2101 Val::cmp(Par par, const Val& val2) const
2102 {
2103   const Col& col = m_col;
2104   const Col& col2 = val2.m_col;
2105   assert(col.equal(col2));
2106   if (m_null || val2.m_null) {
2107     if (!m_null)
2108       return +1;
2109     if (!val2.m_null)
2110       return -1;
2111     return 0;
2112   }
2113   // verify data formats
2114   col.wellformed(dataaddr());
2115   col.wellformed(val2.dataaddr());
2116   // compare
2117   switch (col.m_type) {
2118   case Col::Unsigned:
2119     {
2120       if (m_uint32 < val2.m_uint32)
2121         return -1;
2122       if (m_uint32 > val2.m_uint32)
2123         return +1;
2124       return 0;
2125     }
2126     break;
2127   case Col::Char:
2128     {
2129       uint len = col.m_bytelength;
2130       return cmpchars(par, m_char, len, val2.m_char, len);
2131     }
2132     break;
2133   case Col::Varchar:
2134     {
2135       uint len1 = m_varchar[0];
2136       uint len2 = val2.m_varchar[0];
2137       return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
2138     }
2139     break;
2140   case Col::Longvarchar:
2141     {
2142       uint len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
2143       uint len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
2144       return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
2145     }
2146     break;
2147   default:
2148     break;
2149   }
2150   assert(false);
2151   return 0;
2152 }
2153 
2154 int
cmpchars(Par par,const uchar * buf1,uint len1,const uchar * buf2,uint len2) const2155 Val::cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const
2156 {
2157   const Col& col = m_col;
2158   const Chs* chs = col.m_chs;
2159   CHARSET_INFO* cs = chs->m_cs;
2160   int k;
2161   if (!par.m_collsp) {
2162     uchar x1[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2163     uchar x2[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2164     // make strxfrm pad both to same length
2165     uint len = maxxmulsize * col.m_bytelength;
2166     int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
2167     int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
2168     assert(n1 != -1 && n1 == n2);
2169     k = memcmp(x1, x2, n1);
2170   } else {
2171     k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false);
2172   }
2173   return k < 0 ? -1 : k > 0 ? +1 : 0;
2174 }
2175 
2176 int
verify(Par par,const Val & val2) const2177 Val::verify(Par par, const Val& val2) const
2178 {
2179   CHK(cmp(par, val2) == 0);
2180   return 0;
2181 }
2182 
2183 // print
2184 
2185 static void
printstring(NdbOut & out,const uchar * str,uint len,bool showlen)2186 printstring(NdbOut& out, const uchar* str, uint len, bool showlen)
2187 {
2188   char buf[4 * NDB_MAX_TUPLE_SIZE];
2189   char *p = buf;
2190   *p++ = '[';
2191   if (showlen) {
2192     sprintf(p, "%u:", len);
2193     p += strlen(p);
2194   }
2195   for (uint i = 0; i < len; i++) {
2196     uchar c = str[i];
2197     if (c == '\\') {
2198       *p++ = '\\';
2199       *p++ = c;
2200     } else if (0x20 <= c && c <= 0x7e) {
2201       *p++ = c;
2202     } else {
2203       *p++ = '\\';
2204       *p++ = hexstr[c >> 4];
2205       *p++ = hexstr[c & 15];
2206     }
2207   }
2208   *p++ = ']';
2209   *p = 0;
2210   out << buf;
2211 }
2212 
2213 static NdbOut&
operator <<(NdbOut & out,const Val & val)2214 operator<<(NdbOut& out, const Val& val)
2215 {
2216   const Col& col = val.m_col;
2217   if (val.m_null) {
2218     out << "NULL";
2219     return out;
2220   }
2221   switch (col.m_type) {
2222   case Col::Unsigned:
2223     out << val.m_uint32;
2224     break;
2225   case Col::Char:
2226     {
2227       uint len = col.m_bytelength;
2228       printstring(out, val.m_char, len, false);
2229     }
2230     break;
2231   case Col::Varchar:
2232     {
2233       uint len = val.m_varchar[0];
2234       printstring(out, val.m_varchar + 1, len, true);
2235     }
2236     break;
2237   case Col::Longvarchar:
2238     {
2239       uint len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
2240       printstring(out, val.m_longvarchar + 2, len, true);
2241     }
2242     break;
2243   default:
2244     out << "type" << col.m_type;
2245     assert(false);
2246     break;
2247   }
2248   return out;
2249 }
2250 
2251 // Row - table tuple
2252 
2253 struct Row {
2254   const Tab& m_tab;
2255   Val** m_val;
2256   enum St {
2257     StUndef = 0,
2258     StDefine = 1,
2259     StPrepare = 2,
2260     StCommit = 3
2261   };
2262   enum Op {
2263     OpNone = 0,
2264     OpIns = 2,
2265     OpUpd = 4,
2266     OpDel = 8,
2267     OpRead = 16,
2268     OpReadEx = 32,
2269     OpReadCom = 64,
2270     OpDML = 2 | 4 | 8,
2271     OpREAD = 16 | 32 | 64
2272   };
2273   St m_st;
2274   Op m_op;
2275   Uint64 m_txid;
2276   Row* m_bi;
2277   // construct
2278   Row(const Tab& tab);
2279   ~Row();
2280   void copy(const Row& row2, bool copy_bi);
2281   void copyval(const Row& row2, uint colmask = ~0);
2282   void calc(Par par, uint i, uint colmask = ~0);
2283   // operations
2284   int setval(Par par, uint colmask = ~0);
2285   int setval(Par par, const ITab& itab);
2286   int insrow(Par par);
2287   int updrow(Par par);
2288   int updrow(Par par, const ITab& itab);
2289   int delrow(Par par);
2290   int delrow(Par par, const ITab& itab);
2291   int selrow(Par par);
2292   int selrow(Par par, const ITab& itab);
2293   int setrow(Par par);
2294   // compare
2295   int cmp(Par par, const Row& row2) const;
2296   int cmp(Par par, const Row& row2, const ITab& itab) const;
2297   int verify(Par par, const Row& row2, bool pkonly) const;
2298 private:
2299   Row& operator=(const Row& row2);
2300 };
2301 
2302 static NdbOut&
2303 operator<<(NdbOut& out, const Row* rowp);
2304 
2305 static NdbOut&
2306 operator<<(NdbOut& out, const Row& row);
2307 
2308 // construct
2309 
Row(const Tab & tab)2310 Row::Row(const Tab& tab) :
2311   m_tab(tab)
2312 {
2313   m_val = new Val* [tab.m_cols];
2314   for (uint k = 0; k < tab.m_cols; k++) {
2315     const Col& col = *tab.m_col[k];
2316     m_val[k] = new Val(col);
2317   }
2318   m_st = StUndef;
2319   m_op = OpNone;
2320   m_txid = 0;
2321   m_bi = 0;
2322 }
2323 
~Row()2324 Row::~Row()
2325 {
2326   const Tab& tab = m_tab;
2327   for (uint k = 0; k < tab.m_cols; k++) {
2328     delete m_val[k];
2329   }
2330   delete [] m_val;
2331   delete m_bi;
2332 }
2333 
2334 void
copy(const Row & row2,bool copy_bi)2335 Row::copy(const Row& row2, bool copy_bi)
2336 {
2337   const Tab& tab = m_tab;
2338   copyval(row2);
2339   m_st = row2.m_st;
2340   m_op = row2.m_op;
2341   m_txid = row2.m_txid;
2342   assert(m_bi == 0);
2343   if (copy_bi && row2.m_bi != 0) {
2344     m_bi = new Row(tab);
2345     m_bi->copy(*row2.m_bi, copy_bi);
2346   }
2347 }
2348 
2349 void
copyval(const Row & row2,uint colmask)2350 Row::copyval(const Row& row2, uint colmask)
2351 {
2352   const Tab& tab = m_tab;
2353   assert(&tab == &row2.m_tab);
2354   for (uint k = 0; k < tab.m_cols; k++) {
2355     Val& val = *m_val[k];
2356     const Val& val2 = *row2.m_val[k];
2357     if ((1 << k) & colmask)
2358       val.copy(val2);
2359   }
2360 }
2361 
2362 void
calc(Par par,uint i,uint colmask)2363 Row::calc(Par par, uint i, uint colmask)
2364 {
2365   const Tab& tab = m_tab;
2366   for (uint k = 0; k < tab.m_cols; k++) {
2367     if ((1 << k) & colmask) {
2368       Val& val = *m_val[k];
2369       val.calc(par, i);
2370     }
2371   }
2372 }
2373 
2374 // operations
2375 
2376 int
setval(Par par,uint colmask)2377 Row::setval(Par par, uint colmask)
2378 {
2379   const Tab& tab = m_tab;
2380   Rsq rsq(tab.m_cols);
2381   for (uint k = 0; k < tab.m_cols; k++) {
2382     uint k2 = rsq.next();
2383     if ((1 << k2) & colmask) {
2384       const Val& val = *m_val[k2];
2385       CHK(val.setval(par) == 0);
2386     }
2387   }
2388   return 0;
2389 }
2390 
2391 int
setval(Par par,const ITab & itab)2392 Row::setval(Par par, const ITab& itab)
2393 {
2394   Rsq rsq(itab.m_icols);
2395   for (uint k = 0; k < itab.m_icols; k++) {
2396     uint k2 = rsq.next();
2397     const ICol& icol = *itab.m_icol[k2];
2398     const Col& col = icol.m_col;
2399     uint m = col.m_num;
2400     const Val& val = *m_val[m];
2401     CHK(val.setval(par, icol) == 0);
2402   }
2403   return 0;
2404 }
2405 
2406 int
insrow(Par par)2407 Row::insrow(Par par)
2408 {
2409   Con& con = par.con();
2410   const Tab& tab = m_tab;
2411   CHK(con.getNdbOperation(tab) == 0);
2412   CHKCON(con.m_op->insertTuple() == 0, con);
2413   CHK(setval(par, tab.m_pkmask) == 0);
2414   CHK(setval(par, ~tab.m_pkmask) == 0);
2415   assert(m_st == StUndef);
2416   m_st = StDefine;
2417   m_op = OpIns;
2418   m_txid = con.m_txid;
2419   return 0;
2420 }
2421 
2422 int
updrow(Par par)2423 Row::updrow(Par par)
2424 {
2425   Con& con = par.con();
2426   const Tab& tab = m_tab;
2427   CHK(con.getNdbOperation(tab) == 0);
2428   CHKCON(con.m_op->updateTuple() == 0, con);
2429   CHK(setval(par, tab.m_pkmask) == 0);
2430   CHK(setval(par, ~tab.m_pkmask) == 0);
2431   assert(m_st == StUndef);
2432   m_st = StDefine;
2433   m_op = OpUpd;
2434   m_txid = con.m_txid;
2435   return 0;
2436 }
2437 
2438 int
updrow(Par par,const ITab & itab)2439 Row::updrow(Par par, const ITab& itab)
2440 {
2441   Con& con = par.con();
2442   const Tab& tab = m_tab;
2443   assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2444   CHK(con.getNdbIndexOperation(itab, tab) == 0);
2445   CHKCON(con.m_op->updateTuple() == 0, con);
2446   CHK(setval(par, itab) == 0);
2447   CHK(setval(par, ~tab.m_pkmask) == 0);
2448   assert(m_st == StUndef);
2449   m_st = StDefine;
2450   m_op = OpUpd;
2451   m_txid = con.m_txid;
2452   return 0;
2453 }
2454 
2455 int
delrow(Par par)2456 Row::delrow(Par par)
2457 {
2458   Con& con = par.con();
2459   const Tab& tab = m_tab;
2460   CHK(con.getNdbOperation(m_tab) == 0);
2461   CHKCON(con.m_op->deleteTuple() == 0, con);
2462   CHK(setval(par, tab.m_pkmask) == 0);
2463   assert(m_st == StUndef);
2464   m_st = StDefine;
2465   m_op = OpDel;
2466   m_txid = con.m_txid;
2467   return 0;
2468 }
2469 
2470 int
delrow(Par par,const ITab & itab)2471 Row::delrow(Par par, const ITab& itab)
2472 {
2473   Con& con = par.con();
2474   const Tab& tab = m_tab;
2475   assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2476   CHK(con.getNdbIndexOperation(itab, tab) == 0);
2477   CHKCON(con.m_op->deleteTuple() == 0, con);
2478   CHK(setval(par, itab) == 0);
2479   assert(m_st == StUndef);
2480   m_st = StDefine;
2481   m_op = OpDel;
2482   m_txid = con.m_txid;
2483   return 0;
2484 }
2485 
2486 int
selrow(Par par)2487 Row::selrow(Par par)
2488 {
2489   Con& con = par.con();
2490   const Tab& tab = m_tab;
2491   CHK(con.getNdbOperation(m_tab) == 0);
2492   CHKCON(con.readTuple(par) == 0, con);
2493   CHK(setval(par, tab.m_pkmask) == 0);
2494   // TODO state
2495   return 0;
2496 }
2497 
2498 int
selrow(Par par,const ITab & itab)2499 Row::selrow(Par par, const ITab& itab)
2500 {
2501   Con& con = par.con();
2502   const Tab& tab = m_tab;
2503   assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2504   CHK(con.getNdbIndexOperation(itab, tab) == 0);
2505   CHKCON(con.readTuple(par) == 0, con);
2506   CHK(setval(par, itab) == 0);
2507   // TODO state
2508   return 0;
2509 }
2510 
2511 int
setrow(Par par)2512 Row::setrow(Par par)
2513 {
2514   Con& con = par.con();
2515   const Tab& tab = m_tab;
2516   CHK(setval(par, ~tab.m_pkmask) == 0);
2517   assert(m_st == StUndef);
2518   m_st = StDefine;
2519   m_op = OpUpd;
2520   m_txid = con.m_txid;
2521   return 0;
2522 }
2523 
2524 // compare
2525 
2526 int
cmp(Par par,const Row & row2) const2527 Row::cmp(Par par, const Row& row2) const
2528 {
2529   const Tab& tab = m_tab;
2530   assert(&tab == &row2.m_tab);
2531   int c = 0;
2532   for (uint k = 0; k < tab.m_cols; k++) {
2533     const Val& val = *m_val[k];
2534     const Val& val2 = *row2.m_val[k];
2535     if ((c = val.cmp(par, val2)) != 0)
2536       break;
2537   }
2538   return c;
2539 }
2540 
2541 int
cmp(Par par,const Row & row2,const ITab & itab) const2542 Row::cmp(Par par, const Row& row2, const ITab& itab) const
2543 {
2544   const Tab& tab = m_tab;
2545   int c = 0;
2546   for (uint i = 0; i < itab.m_icols; i++) {
2547     const ICol& icol = *itab.m_icol[i];
2548     const Col& col = icol.m_col;
2549     uint k = col.m_num;
2550     assert(k < tab.m_cols);
2551     const Val& val = *m_val[k];
2552     const Val& val2 = *row2.m_val[k];
2553     if ((c = val.cmp(par, val2)) != 0)
2554       break;
2555   }
2556   return c;
2557 }
2558 
2559 int
verify(Par par,const Row & row2,bool pkonly) const2560 Row::verify(Par par, const Row& row2, bool pkonly) const
2561 {
2562   const Tab& tab = m_tab;
2563   const Row& row1 = *this;
2564   assert(&row1.m_tab == &row2.m_tab);
2565   for (uint k = 0; k < tab.m_cols; k++) {
2566     const Col& col = row1.m_val[k]->m_col;
2567     if (!pkonly || col.m_pk) {
2568       const Val& val1 = *row1.m_val[k];
2569       const Val& val2 = *row2.m_val[k];
2570       CHK(val1.verify(par, val2) == 0);
2571     }
2572   }
2573   return 0;
2574 }
2575 
2576 // print
2577 
2578 static NdbOut&
operator <<(NdbOut & out,const Row::St st)2579 operator<<(NdbOut& out, const Row::St st)
2580 {
2581   if (st == Row::StUndef)
2582     out << "StUndef";
2583   else if (st == Row::StDefine)
2584     out << "StDefine";
2585   else if (st == Row::StPrepare)
2586     out << "StPrepare";
2587   else if (st == Row::StCommit)
2588     out << "StCommit";
2589   else
2590     out << "st=" << st;
2591   return out;
2592 }
2593 
2594 static NdbOut&
operator <<(NdbOut & out,const Row::Op op)2595 operator<<(NdbOut& out, const Row::Op op)
2596 {
2597   if (op == Row::OpNone)
2598     out << "OpNone";
2599   else if (op == Row::OpIns)
2600     out << "OpIns";
2601   else if (op == Row::OpUpd)
2602     out << "OpUpd";
2603   else if (op == Row::OpDel)
2604     out << "OpDel";
2605   else if (op == Row::OpRead)
2606     out << "OpRead";
2607   else if (op == Row::OpReadEx)
2608     out << "OpReadEx";
2609   else if (op == Row::OpReadCom)
2610     out << "OpReadCom";
2611   else
2612     out << "op=" << op;
2613   return out;
2614 }
2615 
2616 static NdbOut&
operator <<(NdbOut & out,const Row * rowp)2617 operator<<(NdbOut& out, const Row* rowp)
2618 {
2619   if (rowp == 0)
2620     out << "[null]";
2621   else
2622     out << *rowp;
2623   return out;
2624 }
2625 
2626 static NdbOut&
operator <<(NdbOut & out,const Row & row)2627 operator<<(NdbOut& out, const Row& row)
2628 {
2629   const Tab& tab = row.m_tab;
2630   out << "[";
2631   for (uint i = 0; i < tab.m_cols; i++) {
2632     if (i > 0)
2633       out << " ";
2634     out << *row.m_val[i];
2635   }
2636   out << " " << row.m_st;
2637   out << " " << row.m_op;
2638   out << " " << HEX(row.m_txid);
2639   if (row.m_bi != 0)
2640     out << " " << row.m_bi;
2641   out << "]";
2642   return out;
2643 }
2644 
2645 // Set - set of table tuples
2646 
2647 struct Set {
2648   const Tab& m_tab;
2649   uint m_rows;
2650   Row** m_row;
2651   uint* m_rowkey; // maps row number (from 0) in scan to tuple key
2652   Row* m_keyrow;
2653   NdbRecAttr** m_rec;
2654   // construct
2655   Set(const Tab& tab, uint rows);
2656   ~Set();
2657   void reset();
2658   bool compat(Par par, uint i, const Row::Op op) const;
2659   void push(uint i);
2660   void copyval(uint i, uint colmask = ~0); // from bi
2661   void calc(Par par, uint i, uint colmask = ~0);
2662   uint count() const;
2663   const Row* getrow(uint i, bool dirty = false) const;
2664   int setrow(uint i, const Row* src, bool force=false);
2665   // transaction
2666   void post(Par par, ExecType et);
2667   // operations
2668   int insrow(Par par, uint i);
2669   int updrow(Par par, uint i);
2670   int updrow(Par par, const ITab& itab, uint i);
2671   int delrow(Par par, uint i);
2672   int delrow(Par par, const ITab& itab, uint i);
2673   int selrow(Par par, const Row& keyrow);
2674   int selrow(Par par, const ITab& itab, const Row& keyrow);
2675   int setrow(Par par, uint i);
2676   int getval(Par par);
2677   int getkey(Par par, uint* i);
2678   int putval(uint i, bool force, uint n = ~0);
2679   // compare
2680   void sort(Par par, const ITab& itab);
2681   int verify(Par par, const Set& set2, bool pkonly, bool dirty = false) const;
2682   int verifyorder(Par par, const ITab& itab, bool descending) const;
2683   // protect structure
2684   NdbMutex* m_mutex;
lockSet2685   void lock() const {
2686     NdbMutex_Lock(m_mutex);
2687   }
unlockSet2688   void unlock() const {
2689     NdbMutex_Unlock(m_mutex);
2690   }
2691 private:
2692   void sort(Par par, const ITab& itab, uint lo, uint hi);
2693   Set& operator=(const Set& set2);
2694 };
2695 
2696 // construct
2697 
Set(const Tab & tab,uint rows)2698 Set::Set(const Tab& tab, uint rows) :
2699   m_tab(tab)
2700 {
2701   m_rows = rows;
2702   m_row = new Row* [m_rows];
2703   for (uint i = 0; i < m_rows; i++) {
2704     m_row[i] = 0;
2705   }
2706   m_rowkey = new uint [m_rows];
2707   for (uint n = 0; n < m_rows; n++) {
2708     m_rowkey[n] = ~0;
2709   }
2710   m_keyrow = new Row(tab);
2711   m_rec = new NdbRecAttr* [tab.m_cols];
2712   for (uint k = 0; k < tab.m_cols; k++) {
2713     m_rec[k] = 0;
2714   }
2715   m_mutex = NdbMutex_Create();
2716   assert(m_mutex != 0);
2717 }
2718 
~Set()2719 Set::~Set()
2720 {
2721   for (uint i = 0; i < m_rows; i++) {
2722     delete m_row[i];
2723   }
2724   delete [] m_row;
2725   delete [] m_rowkey;
2726   delete m_keyrow;
2727   delete [] m_rec;
2728   NdbMutex_Destroy(m_mutex);
2729 }
2730 
2731 void
reset()2732 Set::reset()
2733 {
2734   for (uint i = 0; i < m_rows; i++) {
2735     m_row[i] = 0;
2736   }
2737 }
2738 
2739 // this sucks
2740 bool
compat(Par par,uint i,const Row::Op op) const2741 Set::compat(Par par, uint i, const Row::Op op) const
2742 {
2743   Con& con = par.con();
2744   int ret = -1;
2745   int place = 0;
2746   do {
2747     const Row* rowp = getrow(i);
2748     if (rowp == 0) {
2749       ret = op == Row::OpIns;
2750       place = 1;
2751       break;
2752     }
2753     const Row& row = *rowp;
2754     if (!(op & Row::OpREAD)) {
2755       if (row.m_st == Row::StDefine || row.m_st == Row::StPrepare) {
2756         assert(row.m_op & Row::OpDML);
2757         assert(row.m_txid != 0);
2758         if (con.m_txid != row.m_txid) {
2759           ret = false;
2760           place = 2;
2761           break;
2762         }
2763         if (row.m_op != Row::OpDel) {
2764           ret = op == Row::OpUpd || op == Row::OpDel;
2765           place = 3;
2766           break;
2767         }
2768         ret = op == Row::OpIns;
2769         place = 4;
2770         break;
2771       }
2772       if (row.m_st == Row::StCommit) {
2773         assert(row.m_op == Row::OpNone);
2774         assert(row.m_txid == 0);
2775         ret = op == Row::OpUpd || op == Row::OpDel;
2776         place = 5;
2777         break;
2778       }
2779     }
2780     if (op & Row::OpREAD) {
2781       bool dirty =
2782         con.m_txid != row.m_txid &&
2783         par.m_lockmode == NdbOperation::LM_CommittedRead;
2784       const Row* rowp2 = getrow(i, dirty);
2785       if (rowp2 == 0 || rowp2->m_op == Row::OpDel) {
2786         ret = false;
2787         place = 6;
2788         break;
2789       }
2790       ret = true;
2791       place = 7;
2792       break;
2793     }
2794   } while (0);
2795   LL4("compat ret=" << ret << " place=" << place);
2796   assert(ret == false || ret == true);
2797   return ret;
2798 }
2799 
2800 void
push(uint i)2801 Set::push(uint i)
2802 {
2803   const Tab& tab = m_tab;
2804   assert(i < m_rows);
2805   Row* bi = m_row[i];
2806   m_row[i] = new Row(tab);
2807   Row& row = *m_row[i];
2808   row.m_bi = bi;
2809   if (bi != 0)
2810     row.copyval(*bi);
2811 }
2812 
2813 void
copyval(uint i,uint colmask)2814 Set::copyval(uint i, uint colmask)
2815 {
2816   assert(m_row[i] != 0);
2817   Row& row = *m_row[i];
2818   assert(row.m_bi != 0);
2819   row.copyval(*row.m_bi, colmask);
2820 }
2821 
2822 void
calc(Par par,uint i,uint colmask)2823 Set::calc(Par par, uint i, uint colmask)
2824 {
2825   assert(m_row[i] != 0);
2826   Row& row = *m_row[i];
2827   row.calc(par, i, colmask);
2828 }
2829 
2830 uint
count() const2831 Set::count() const
2832 {
2833   uint count = 0;
2834   for (uint i = 0; i < m_rows; i++) {
2835     if (m_row[i] != 0)
2836       count++;
2837   }
2838   return count;
2839 }
2840 
2841 const Row*
getrow(uint i,bool dirty) const2842 Set::getrow(uint i, bool dirty) const
2843 {
2844   assert(i < m_rows);
2845   const Row* rowp = m_row[i];
2846   if (dirty) {
2847     while (rowp != 0) {
2848       bool b1 = rowp->m_op == Row::OpNone;
2849       bool b2 = rowp->m_st == Row::StCommit;
2850       assert(b1 == b2);
2851       if (b1) {
2852         assert(rowp->m_bi == 0);
2853         break;
2854       }
2855       rowp = rowp->m_bi;
2856     }
2857   }
2858   return rowp;
2859 }
2860 
2861 int
setrow(uint i,const Row * src,bool force)2862 Set::setrow(uint i, const Row* src, bool force)
2863 {
2864   assert(i < m_rows);
2865   if (m_row[i] != 0)
2866     if (!force)
2867       return -1;
2868 
2869   Row* newRow= new Row(src->m_tab);
2870   newRow->copy(*src, true);
2871   return 0;
2872 }
2873 
2874 
2875 // transaction
2876 
2877 void
post(Par par,ExecType et)2878 Set::post(Par par, ExecType et)
2879 {
2880   LL4("post");
2881   Con& con = par.con();
2882   assert(con.m_txid != 0);
2883   uint i;
2884   for (i = 0; i < m_rows; i++) {
2885     Row* rowp = m_row[i];
2886     if (rowp == 0) {
2887       LL5("skip " << i << " " << rowp);
2888       continue;
2889     }
2890     if (rowp->m_st == Row::StCommit) {
2891       assert(rowp->m_op == Row::OpNone);
2892       assert(rowp->m_txid == 0);
2893       assert(rowp->m_bi == 0);
2894       LL5("skip committed " << i << " " << rowp);
2895       continue;
2896     }
2897     assert(rowp->m_st == Row::StDefine || rowp->m_st == Row::StPrepare);
2898     assert(rowp->m_txid != 0);
2899     if (con.m_txid != rowp->m_txid) {
2900       LL5("skip txid " << i << " " << HEX(con.m_txid) << " " << rowp);
2901       continue;
2902     }
2903     // TODO read ops
2904     assert(rowp->m_op & Row::OpDML);
2905     LL4("post BEFORE " << rowp);
2906     if (et == NoCommit) {
2907       if (rowp->m_st == Row::StDefine) {
2908         rowp->m_st = Row::StPrepare;
2909         Row* bi = rowp->m_bi;
2910         while (bi != 0 && bi->m_st == Row::StDefine) {
2911           bi->m_st = Row::StPrepare;
2912           bi = bi->m_bi;
2913         }
2914       }
2915     } else if (et == Commit) {
2916       if (rowp->m_op != Row::OpDel) {
2917         rowp->m_st = Row::StCommit;
2918         rowp->m_op = Row::OpNone;
2919         rowp->m_txid = 0;
2920         delete rowp->m_bi;
2921         rowp->m_bi = 0;
2922       } else {
2923         delete rowp;
2924         rowp = 0;
2925       }
2926     } else if (et == Rollback) {
2927       while (rowp != 0 && rowp->m_st != Row::StCommit) {
2928         Row* tmp = rowp;
2929         rowp = rowp->m_bi;
2930         tmp->m_bi = 0;
2931         delete tmp;
2932       }
2933     } else {
2934       assert(false);
2935     }
2936     m_row[i] = rowp;
2937     LL4("post AFTER " << rowp);
2938   }
2939 }
2940 
2941 // operations
2942 
2943 int
insrow(Par par,uint i)2944 Set::insrow(Par par, uint i)
2945 {
2946   assert(m_row[i] != 0);
2947   Row& row = *m_row[i];
2948   CHK(row.insrow(par) == 0);
2949   return 0;
2950 }
2951 
2952 int
updrow(Par par,uint i)2953 Set::updrow(Par par, uint i)
2954 {
2955   assert(m_row[i] != 0);
2956   Row& row = *m_row[i];
2957   CHK(row.updrow(par) == 0);
2958   return 0;
2959 }
2960 
2961 int
updrow(Par par,const ITab & itab,uint i)2962 Set::updrow(Par par, const ITab& itab, uint i)
2963 {
2964   assert(m_row[i] != 0);
2965   Row& row = *m_row[i];
2966   CHK(row.updrow(par, itab) == 0);
2967   return 0;
2968 }
2969 
2970 int
delrow(Par par,uint i)2971 Set::delrow(Par par, uint i)
2972 {
2973   assert(m_row[i] != 0);
2974   Row& row = *m_row[i];
2975   CHK(row.delrow(par) == 0);
2976   return 0;
2977 }
2978 
2979 int
delrow(Par par,const ITab & itab,uint i)2980 Set::delrow(Par par, const ITab& itab, uint i)
2981 {
2982   assert(m_row[i] != 0);
2983   Row& row = *m_row[i];
2984   CHK(row.delrow(par, itab) == 0);
2985   return 0;
2986 }
2987 
2988 int
selrow(Par par,const Row & keyrow)2989 Set::selrow(Par par, const Row& keyrow)
2990 {
2991   const Tab& tab = par.tab();
2992   LL5("selrow " << tab.m_name << " keyrow " << keyrow);
2993   m_keyrow->copyval(keyrow, tab.m_pkmask);
2994   CHK(m_keyrow->selrow(par) == 0);
2995   CHK(getval(par) == 0);
2996   return 0;
2997 }
2998 
2999 int
selrow(Par par,const ITab & itab,const Row & keyrow)3000 Set::selrow(Par par, const ITab& itab, const Row& keyrow)
3001 {
3002   LL5("selrow " << itab.m_name << " keyrow " << keyrow);
3003   m_keyrow->copyval(keyrow, itab.m_keymask);
3004   CHK(m_keyrow->selrow(par, itab) == 0);
3005   CHK(getval(par) == 0);
3006   return 0;
3007 }
3008 
3009 int
setrow(Par par,uint i)3010 Set::setrow(Par par, uint i)
3011 {
3012   assert(m_row[i] != 0);
3013   CHK(m_row[i]->setrow(par) == 0);
3014   return 0;
3015 }
3016 
3017 int
getval(Par par)3018 Set::getval(Par par)
3019 {
3020   Con& con = par.con();
3021   const Tab& tab = m_tab;
3022   Rsq rsq1(tab.m_cols);
3023   for (uint k = 0; k < tab.m_cols; k++) {
3024     uint k2 = rsq1.next();
3025     CHK(con.getValue(k2, m_rec[k2]) == 0);
3026   }
3027   return 0;
3028 }
3029 
3030 int
getkey(Par par,uint * i)3031 Set::getkey(Par par, uint* i)
3032 {
3033   const Tab& tab = m_tab;
3034   uint k = tab.m_keycol;
3035   assert(m_rec[k] != 0);
3036   const char* aRef = m_rec[k]->aRef();
3037   Uint32 key = *(const Uint32*)aRef;
3038   LL5("getkey: " << key);
3039   CHK(key < m_rows);
3040   *i = key;
3041   return 0;
3042 }
3043 
3044 int
putval(uint i,bool force,uint n)3045 Set::putval(uint i, bool force, uint n)
3046 {
3047   const Tab& tab = m_tab;
3048   LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
3049   CHK( i<m_rows );
3050   if (m_row[i] != 0) {
3051     assert(force);
3052     delete m_row[i];
3053     m_row[i] = 0;
3054   }
3055   m_row[i] = new Row(tab);
3056   Row& row = *m_row[i];
3057   for (uint k = 0; k < tab.m_cols; k++) {
3058     Val& val = *row.m_val[k];
3059     NdbRecAttr* rec = m_rec[k];
3060     assert(rec != 0);
3061     if (rec->isNULL()) {
3062       val.m_null = true;
3063       continue;
3064     }
3065     const char* aRef = m_rec[k]->aRef();
3066     val.copy(aRef);
3067     val.m_null = false;
3068   }
3069   if (n != (uint) ~0)
3070   {
3071     CHK(n < m_rows);
3072     m_rowkey[n] = i;
3073   }
3074   return 0;
3075 }
3076 
3077 // compare
3078 
3079 void
sort(Par par,const ITab & itab)3080 Set::sort(Par par, const ITab& itab)
3081 {
3082   if (m_rows != 0)
3083     sort(par, itab, 0, m_rows - 1);
3084 }
3085 
3086 void
sort(Par par,const ITab & itab,uint lo,uint hi)3087 Set::sort(Par par, const ITab& itab, uint lo, uint hi)
3088 {
3089   assert(lo < m_rows && hi < m_rows && lo <= hi);
3090   Row* const p = m_row[lo];
3091   uint i = lo;
3092   uint j = hi;
3093   while (i < j) {
3094     while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
3095       j--;
3096     if (i < j) {
3097       m_row[i] = m_row[j];
3098       i++;
3099     }
3100     while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
3101       i++;
3102     if (i < j) {
3103       m_row[j] = m_row[i];
3104       j--;
3105     }
3106   }
3107   m_row[i] = p;
3108   if (lo < i)
3109     sort(par, itab, lo, i - 1);
3110   if (hi > i)
3111     sort(par, itab, i + 1, hi);
3112 }
3113 
3114 /*
3115  * set1 (self) is from dml and can contain un-committed operations.
3116  * set2 is from read and contains no operations.  "dirty" applies
3117  * to set1: false = use latest row, true = use committed row.
3118  */
3119 int
verify(Par par,const Set & set2,bool pkonly,bool dirty) const3120 Set::verify(Par par, const Set& set2, bool pkonly, bool dirty) const
3121 {
3122   const Set& set1 = *this;
3123   assert(&set1.m_tab == &set2.m_tab && set1.m_rows == set2.m_rows);
3124   LL3("verify dirty:" << dirty);
3125   for (uint i = 0; i < set1.m_rows; i++) {
3126     // the row versions we actually compare
3127     const Row* row1p = set1.getrow(i, dirty);
3128     const Row* row2p = set2.getrow(i);
3129     bool ok = true;
3130     int place = 0;
3131     if (row1p == 0) {
3132       if (row2p != 0) {
3133         ok = false;
3134         place = 1;
3135       }
3136     } else {
3137       Row::Op op1 = row1p->m_op;
3138       if (op1 != Row::OpDel) {
3139         if (row2p == 0) {
3140           ok = false;
3141           place = 2;
3142         } else if (row1p->verify(par, *row2p, pkonly) == -1) {
3143           ok = false;
3144           place = 3;
3145         }
3146       } else if (row2p != 0) {
3147         ok = false;
3148         place = 4;
3149       }
3150     }
3151     if (!ok) {
3152       LL1("verify " << i << " failed at " << place);
3153       LL1("row1 " << row1p);
3154       LL1("row2 " << row2p);
3155       CHK(false);
3156     }
3157   }
3158   return 0;
3159 }
3160 
3161 int
verifyorder(Par par,const ITab & itab,bool descending) const3162 Set::verifyorder(Par par, const ITab& itab, bool descending) const
3163 {
3164   for (uint n = 0; n < m_rows; n++) {
3165     uint i2 = m_rowkey[n];
3166     if (i2 == (uint) ~0)
3167       break;
3168     if (n == 0)
3169       continue;
3170     uint i1 = m_rowkey[n - 1];
3171     assert(m_row[i1] != 0 && m_row[i2] != 0);
3172     const Row& row1 = *m_row[i1];
3173     const Row& row2 = *m_row[i2];
3174     bool ok;
3175     if (!descending)
3176       ok= (row1.cmp(par, row2, itab) <= 0);
3177     else
3178       ok= (row1.cmp(par, row2, itab) >= 0);
3179 
3180     if (!ok)
3181     {
3182       LL1("verifyorder " << n << " failed");
3183       LL1("row1 " << row1);
3184       LL1("row2 " << row2);
3185       CHK(false);
3186     }
3187   }
3188   return 0;
3189 }
3190 
3191 // print
3192 
#if 0
// Print all row slots of a set, one per line.  Currently compiled out.
static NdbOut&
operator<<(NdbOut& out, const Set& set)
{
  for (uint i = 0; i < set.m_rows; i++) {
    if (i > 0)
      out << endl;
    // use the pointer overload so that an empty slot prints as "[null]"
    // instead of being dereferenced
    const Row* rowp = set.m_row[i];
    out << rowp;
  }
  return out;
}
#endif
3206 
3207 // BVal - range scan bound
3208 
3209 struct BVal : public Val {
3210   const ICol& m_icol;
3211   int m_type;
3212   BVal(const ICol& icol);
3213   int setbnd(Par par) const;
3214   int setflt(Par par) const;
3215 };
3216 
BVal(const ICol & icol)3217 BVal::BVal(const ICol& icol) :
3218   Val(icol.m_col),
3219   m_icol(icol)
3220 {
3221 }
3222 
3223 int
setbnd(Par par) const3224 BVal::setbnd(Par par) const
3225 {
3226   Con& con = par.con();
3227   assert(g_compare_null || !m_null);
3228   const char* addr = !m_null ? (const char*)dataaddr() : 0;
3229   const ICol& icol = m_icol;
3230   CHK(con.setBound(icol.m_num, m_type, addr) == 0);
3231   return 0;
3232 }
3233 
3234 int
setflt(Par par) const3235 BVal::setflt(Par par) const
3236 {
3237   static uint index_bound_to_filter_bound[5] = {
3238     NdbScanFilter::COND_GE,
3239     NdbScanFilter::COND_GT,
3240     NdbScanFilter::COND_LE,
3241     NdbScanFilter::COND_LT,
3242     NdbScanFilter::COND_EQ
3243   };
3244   Con& con = par.con();
3245   assert(g_compare_null || !m_null);
3246   const char* addr = !m_null ? (const char*)dataaddr() : 0;
3247   const ICol& icol = m_icol;
3248   const Col& col = icol.m_col;
3249   uint length = col.m_bytesize;
3250   uint cond = index_bound_to_filter_bound[m_type];
3251   CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
3252   return 0;
3253 }
3254 
3255 static NdbOut&
operator <<(NdbOut & out,const BVal & bval)3256 operator<<(NdbOut& out, const BVal& bval)
3257 {
3258   const ICol& icol = bval.m_icol;
3259   const Col& col = icol.m_col;
3260   const Val& val = bval;
3261   out << "type=" << bval.m_type;
3262   out << " icol=" << icol.m_num;
3263   out << " col=" << col.m_num << "," << col.m_name;
3264   out << " value=" << val;
3265   return out;
3266 }
3267 
3268 // BSet - set of bounds
3269 
3270 struct BSet {
3271   const Tab& m_tab;
3272   const ITab& m_itab;
3273   uint m_alloc;
3274   uint m_bvals;
3275   BVal** m_bval;
3276   BSet(const Tab& tab, const ITab& itab);
3277   ~BSet();
3278   void reset();
3279   void calc(Par par);
3280   void calcpk(Par par, uint i);
3281   int setbnd(Par par) const;
3282   int setflt(Par par) const;
3283   void filter(Par par, const Set& set, Set& set2) const;
3284 };
3285 
BSet(const Tab & tab,const ITab & itab)3286 BSet::BSet(const Tab& tab, const ITab& itab) :
3287   m_tab(tab),
3288   m_itab(itab),
3289   m_alloc(2 * itab.m_icols),
3290   m_bvals(0)
3291 {
3292   m_bval = new BVal* [m_alloc];
3293   for (uint i = 0; i < m_alloc; i++) {
3294     m_bval[i] = 0;
3295   }
3296 }
3297 
~BSet()3298 BSet::~BSet()
3299 {
3300   delete [] m_bval;
3301 }
3302 
3303 void
reset()3304 BSet::reset()
3305 {
3306   while (m_bvals > 0) {
3307     uint i = --m_bvals;
3308     delete m_bval[i];
3309     m_bval[i] = 0;
3310   }
3311 }
3312 
3313 void
calc(Par par)3314 BSet::calc(Par par)
3315 {
3316   const ITab& itab = m_itab;
3317   par.m_pctrange = par.m_pctbrange;
3318   reset();
3319   for (uint k = 0; k < itab.m_icols; k++) {
3320     const ICol& icol = *itab.m_icol[k];
3321     for (uint i = 0; i <= 1; i++) {
3322       if (m_bvals == 0 && urandom(100) == 0)
3323         return;
3324       if (m_bvals != 0 && urandom(3) == 0)
3325         return;
3326       assert(m_bvals < m_alloc);
3327       BVal& bval = *new BVal(icol);
3328       m_bval[m_bvals++] = &bval;
3329       bval.m_null = false;
3330       uint sel;
3331       do {
3332         // equality bound only on i==0
3333         sel = urandom(5 - i);
3334       } while (strchr(par.m_bound, '0' + sel) == 0);
3335       if (sel < 2)
3336         bval.m_type = 0 | (1 << i);
3337       else if (sel < 4)
3338         bval.m_type = 1 | (1 << i);
3339       else
3340         bval.m_type = 4;
3341       if (k + 1 < itab.m_icols)
3342         bval.m_type = 4;
3343       if (!g_compare_null)
3344         par.m_pctnull = 0;
3345       if (bval.m_type == 0 || bval.m_type == 1)
3346         par.m_bdir = -1;
3347       if (bval.m_type == 2 || bval.m_type == 3)
3348         par.m_bdir = +1;
3349       do {
3350         bval.calcnokey(par);
3351         if (i == 1) {
3352           assert(m_bvals >= 2);
3353           const BVal& bv1 = *m_bval[m_bvals - 2];
3354           const BVal& bv2 = *m_bval[m_bvals - 1];
3355           if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0)
3356             continue;
3357         }
3358       } while (0);
3359       // equality bound only once
3360       if (bval.m_type == 4)
3361         break;
3362     }
3363   }
3364 }
3365 
3366 void
calcpk(Par par,uint i)3367 BSet::calcpk(Par par, uint i)
3368 {
3369   const ITab& itab = m_itab;
3370   reset();
3371   for (uint k = 0; k < itab.m_icols; k++) {
3372     const ICol& icol = *itab.m_icol[k];
3373     const Col& col = icol.m_col;
3374     assert(col.m_pk);
3375     assert(m_bvals < m_alloc);
3376     BVal& bval = *new BVal(icol);
3377     m_bval[m_bvals++] = &bval;
3378     bval.m_type = 4;
3379     bval.calc(par, i);
3380   }
3381 }
3382 
3383 int
setbnd(Par par) const3384 BSet::setbnd(Par par) const
3385 {
3386   if (m_bvals != 0) {
3387     Rsq rsq1(m_bvals);
3388     for (uint j = 0; j < m_bvals; j++) {
3389       uint j2 = rsq1.next();
3390       const BVal& bval = *m_bval[j2];
3391       CHK(bval.setbnd(par) == 0);
3392     }
3393   }
3394   return 0;
3395 }
3396 
3397 int
setflt(Par par) const3398 BSet::setflt(Par par) const
3399 {
3400   Con& con = par.con();
3401   CHK(con.getNdbScanFilter() == 0);
3402   CHK(con.beginFilter(NdbScanFilter::AND) == 0);
3403   if (m_bvals != 0) {
3404     Rsq rsq1(m_bvals);
3405     for (uint j = 0; j < m_bvals; j++) {
3406       uint j2 = rsq1.next();
3407       const BVal& bval = *m_bval[j2];
3408       CHK(bval.setflt(par) == 0);
3409     }
3410     // duplicate
3411     if (urandom(5) == 0) {
3412       uint j3 = urandom(m_bvals);
3413       const BVal& bval = *m_bval[j3];
3414       CHK(bval.setflt(par) == 0);
3415     }
3416   }
3417   CHK(con.endFilter() == 0);
3418   return 0;
3419 }
3420 
3421 void
filter(Par par,const Set & set,Set & set2) const3422 BSet::filter(Par par, const Set& set, Set& set2) const
3423 {
3424   const Tab& tab = m_tab;
3425   const ITab& itab = m_itab;
3426   assert(&tab == &set2.m_tab && set.m_rows == set2.m_rows);
3427   assert(set2.count() == 0);
3428   for (uint i = 0; i < set.m_rows; i++) {
3429     set.lock();
3430     do {
3431       if (set.m_row[i] == 0) {
3432         break;
3433       }
3434       const Row& row = *set.m_row[i];
3435       if (!g_store_null_key) {
3436         bool ok1 = false;
3437         for (uint k = 0; k < itab.m_icols; k++) {
3438           const ICol& icol = *itab.m_icol[k];
3439           const Col& col = icol.m_col;
3440           const Val& val = *row.m_val[col.m_num];
3441           if (!val.m_null) {
3442             ok1 = true;
3443             break;
3444           }
3445         }
3446         if (!ok1)
3447           break;
3448       }
3449       bool ok2 = true;
3450       for (uint j = 0; j < m_bvals; j++) {
3451         const BVal& bval = *m_bval[j];
3452         const ICol& icol = bval.m_icol;
3453         const Col& col = icol.m_col;
3454         const Val& val = *row.m_val[col.m_num];
3455         int ret = bval.cmp(par, val);
3456         LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
3457         if (bval.m_type == 0)
3458           ok2 = (ret <= 0);
3459         else if (bval.m_type == 1)
3460           ok2 = (ret < 0);
3461         else if (bval.m_type == 2)
3462           ok2 = (ret >= 0);
3463         else if (bval.m_type == 3)
3464           ok2 = (ret > 0);
3465         else if (bval.m_type == 4)
3466           ok2 = (ret == 0);
3467         else {
3468           assert(false);
3469         }
3470         if (!ok2)
3471           break;
3472       }
3473       if (!ok2)
3474         break;
3475       assert(set2.m_row[i] == 0);
3476       set2.m_row[i] = new Row(tab);
3477       Row& row2 = *set2.m_row[i];
3478       row2.copy(row, true);
3479     } while (0);
3480     set.unlock();
3481   }
3482 }
3483 
3484 static NdbOut&
operator <<(NdbOut & out,const BSet & bset)3485 operator<<(NdbOut& out, const BSet& bset)
3486 {
3487   out << "bounds=" << bset.m_bvals;
3488   for (uint j = 0; j < bset.m_bvals; j++) {
3489     const BVal& bval = *bset.m_bval[j];
3490     out << " [bound " << j << ": " << bval << "]";
3491   }
3492   return out;
3493 }
3494 
3495 // pk operations
3496 
3497 static int
pkinsert(Par par)3498 pkinsert(Par par)
3499 {
3500   Con& con = par.con();
3501   const Tab& tab = par.tab();
3502   Set& set = par.set();
3503   LL3("pkinsert " << tab.m_name);
3504   CHK(con.startTransaction() == 0);
3505   uint batch = 0;
3506   for (uint j = 0; j < par.m_rows; j++) {
3507     uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3508     uint i = thrrow(par, j2);
3509     set.lock();
3510     if (!set.compat(par, i, Row::OpIns)) {
3511       LL3("pkinsert SKIP " << i << " " << set.getrow(i));
3512       set.unlock();
3513     } else {
3514       set.push(i);
3515       set.calc(par, i);
3516       CHK(set.insrow(par, i) == 0);
3517       set.unlock();
3518       LL4("pkinsert key=" << i << " " << set.getrow(i));
3519       batch++;
3520     }
3521     bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3522     if (batch == par.m_batch || lastbatch) {
3523       uint err = par.m_catcherr;
3524       ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3525       CHK(con.execute(et, err) == 0);
3526       set.lock();
3527       set.post(par, !err ? et : Rollback);
3528       set.unlock();
3529       if (err) {
3530         LL1("pkinsert key=" << i << " stop on " << con.errname(err));
3531         break;
3532       }
3533       batch = 0;
3534       if (!lastbatch) {
3535         con.closeTransaction();
3536         CHK(con.startTransaction() == 0);
3537       }
3538     }
3539   }
3540   con.closeTransaction();
3541   return 0;
3542 }
3543 
3544 static int
pkupdate(Par par)3545 pkupdate(Par par)
3546 {
3547   Con& con = par.con();
3548   const Tab& tab = par.tab();
3549   Set& set = par.set();
3550   LL3("pkupdate " << tab.m_name);
3551   CHK(con.startTransaction() == 0);
3552   uint batch = 0;
3553   for (uint j = 0; j < par.m_rows; j++) {
3554     uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3555     uint i = thrrow(par, j2);
3556     set.lock();
3557     if (!set.compat(par, i, Row::OpUpd)) {
3558       LL3("pkupdate SKIP " << i << " " << set.getrow(i));
3559       set.unlock();
3560     } else {
3561       set.push(i);
3562       set.copyval(i, tab.m_pkmask);
3563       set.calc(par, i, ~tab.m_pkmask);
3564       CHK(set.updrow(par, i) == 0);
3565       set.unlock();
3566       LL4("pkupdate key=" << i << " " << set.getrow(i));
3567       batch++;
3568     }
3569     bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3570     if (batch == par.m_batch || lastbatch) {
3571       uint err = par.m_catcherr;
3572       ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3573       CHK(con.execute(et, err) == 0);
3574       set.lock();
3575       set.post(par, !err ? et : Rollback);
3576       set.unlock();
3577       if (err) {
3578         LL1("pkupdate key=" << i << ": stop on " << con.errname(err));
3579         break;
3580       }
3581       batch = 0;
3582       if (!lastbatch) {
3583         con.closeTransaction();
3584         CHK(con.startTransaction() == 0);
3585       }
3586     }
3587   }
3588   con.closeTransaction();
3589   return 0;
3590 }
3591 
3592 static int
pkdelete(Par par)3593 pkdelete(Par par)
3594 {
3595   Con& con = par.con();
3596   const Tab& tab = par.tab();
3597   Set& set = par.set();
3598   LL3("pkdelete " << tab.m_name);
3599   CHK(con.startTransaction() == 0);
3600   uint batch = 0;
3601   for (uint j = 0; j < par.m_rows; j++) {
3602     uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3603     uint i = thrrow(par, j2);
3604     set.lock();
3605     if (!set.compat(par, i, Row::OpDel)) {
3606       LL3("pkdelete SKIP " << i << " " << set.getrow(i));
3607       set.unlock();
3608     } else {
3609       set.push(i);
3610       set.copyval(i, tab.m_pkmask);
3611       CHK(set.delrow(par, i) == 0);
3612       set.unlock();
3613       LL4("pkdelete key=" << i << " " << set.getrow(i));
3614       batch++;
3615     }
3616     bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3617     if (batch == par.m_batch || lastbatch) {
3618       uint err = par.m_catcherr;
3619       ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3620       CHK(con.execute(et, err) == 0);
3621       set.lock();
3622       set.post(par, !err ? et : Rollback);
3623       set.unlock();
3624       if (err) {
3625         LL1("pkdelete key=" << i << " stop on " << con.errname(err));
3626         break;
3627       }
3628       batch = 0;
3629       if (!lastbatch) {
3630         con.closeTransaction();
3631         CHK(con.startTransaction() == 0);
3632       }
3633     }
3634   }
3635   con.closeTransaction();
3636   return 0;
3637 }
3638 
// Disabled (compiled out by #if 0): read every readable row by primary
// key, one transaction per row, collect the results in set2 and, when
// par.m_verify is set, compare them with the expected set.
#if 0
static int
pkread(Par par)
{
  Con& con = par.con();
  const Tab& tab = par.tab();
  Set& set = par.set();
  LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
  // expected
  const Set& set1 = set;
  Set set2(tab, set.m_rows);
  for (uint i = 0; i < set.m_rows; i++) {
    set.lock();
    // TODO lock mode
    if (!set.compat(par, i, Row::OpREAD)) {
      LL3("pkread SKIP " << i << " " << set.getrow(i));
      set.unlock();
      continue;
    }
    set.unlock();
    CHK(con.startTransaction() == 0);
    CHK(set2.selrow(par, *set1.m_row[i]) == 0);
    CHK(con.execute(Commit) == 0);
    // the key read back must identify the row that was requested
    uint i2 = (uint)-1;
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
    CHK(set2.putval(i, false) == 0);
    LL4("row " << set2.count() << " " << set2.getrow(i));
    con.closeTransaction();
  }
  if (par.m_verify)
    CHK(set1.verify(par, set2, false) == 0);
  return 0;
}
#endif
3673 
3674 static int
pkreadfast(Par par,uint count)3675 pkreadfast(Par par, uint count)
3676 {
3677   Con& con = par.con();
3678   const Tab& tab = par.tab();
3679   const Set& set = par.set();
3680   LL3("pkfast " << tab.m_name);
3681   Row keyrow(tab);
3682   // not batched on purpose
3683   for (uint j = 0; j < count; j++) {
3684     uint i = urandom(set.m_rows);
3685     assert(set.compat(par, i, Row::OpREAD));
3686     CHK(con.startTransaction() == 0);
3687     // define key
3688     keyrow.calc(par, i);
3689     CHK(keyrow.selrow(par) == 0);
3690     NdbRecAttr* rec;
3691     // get 1st column
3692     CHK(con.getValue((Uint32)0, rec) == 0);
3693     CHK(con.execute(Commit) == 0);
3694     con.closeTransaction();
3695   }
3696   return 0;
3697 }
3698 
3699 // hash index operations
3700 
3701 static int
hashindexupdate(Par par,const ITab & itab)3702 hashindexupdate(Par par, const ITab& itab)
3703 {
3704   Con& con = par.con();
3705   const Tab& tab = par.tab();
3706   Set& set = par.set();
3707   LL3("hashindexupdate " << itab.m_name);
3708   CHK(con.startTransaction() == 0);
3709   uint batch = 0;
3710   for (uint j = 0; j < par.m_rows; j++) {
3711     uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3712     uint i = thrrow(par, j2);
3713     set.lock();
3714     if (!set.compat(par, i, Row::OpUpd)) {
3715       LL3("hashindexupdate SKIP " << i << " " << set.getrow(i));
3716       set.unlock();
3717     } else {
3718       // table pk and index key are not updated
3719       set.push(i);
3720       uint keymask = tab.m_pkmask | itab.m_keymask;
3721       set.copyval(i, keymask);
3722       set.calc(par, i, ~keymask);
3723       CHK(set.updrow(par, itab, i) == 0);
3724       set.unlock();
3725       LL4("hashindexupdate " << i << " " << set.getrow(i));
3726       batch++;
3727     }
3728     bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3729     if (batch == par.m_batch || lastbatch) {
3730       uint err = par.m_catcherr;
3731       ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3732       CHK(con.execute(et, err) == 0);
3733       set.lock();
3734       set.post(par, !err ? et : Rollback);
3735       set.unlock();
3736       if (err) {
3737         LL1("hashindexupdate " << i << " stop on " << con.errname(err));
3738         break;
3739       }
3740       batch = 0;
3741       if (!lastbatch) {
3742         con.closeTransaction();
3743         CHK(con.startTransaction() == 0);
3744       }
3745     }
3746   }
3747   con.closeTransaction();
3748   return 0;
3749 }
3750 
3751 static int
hashindexdelete(Par par,const ITab & itab)3752 hashindexdelete(Par par, const ITab& itab)
3753 {
3754   Con& con = par.con();
3755   Set& set = par.set();
3756   LL3("hashindexdelete " << itab.m_name);
3757   CHK(con.startTransaction() == 0);
3758   uint batch = 0;
3759   for (uint j = 0; j < par.m_rows; j++) {
3760     uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3761     uint i = thrrow(par, j2);
3762     set.lock();
3763     if (!set.compat(par, i, Row::OpDel)) {
3764       LL3("hashindexdelete SKIP " << i << " " << set.getrow(i));
3765       set.unlock();
3766     } else {
3767       set.push(i);
3768       set.copyval(i, itab.m_keymask);
3769       CHK(set.delrow(par, itab, i) == 0);
3770       set.unlock();
3771       LL4("hashindexdelete " << i << " " << set.getrow(i));
3772       batch++;
3773     }
3774     bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3775     if (batch == par.m_batch || lastbatch) {
3776       uint err = par.m_catcherr;
3777       ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3778       CHK(con.execute(et, err) == 0);
3779       set.lock();
3780       set.post(par, !err ? et : Rollback);
3781       set.unlock();
3782       if (err) {
3783         LL1("hashindexdelete " << i << " stop on " << con.errname(err));
3784         break;
3785       }
3786       batch = 0;
3787       if (!lastbatch) {
3788         con.closeTransaction();
3789         CHK(con.startTransaction() == 0);
3790       }
3791     }
3792   }
3793   con.closeTransaction();
3794   return 0;
3795 }
3796 
3797 static int
hashindexread(Par par,const ITab & itab)3798 hashindexread(Par par, const ITab& itab)
3799 {
3800   Con& con = par.con();
3801   const Tab& tab = par.tab();
3802   Set& set = par.set();
3803   LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
3804   // expected
3805   const Set& set1 = set;
3806   Set set2(tab, set.m_rows);
3807   for (uint i = 0; i < set.m_rows; i++) {
3808     set.lock();
3809     // TODO lock mode
3810     if (!set.compat(par, i, Row::OpREAD)) {
3811       LL3("hashindexread SKIP " << i << " " << set.getrow(i));
3812       set.unlock();
3813       continue;
3814     }
3815     set.unlock();
3816     CHK(con.startTransaction() == 0);
3817     CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
3818     CHK(con.execute(Commit) == 0);
3819     uint i2 = (uint)-1;
3820     CHK(set2.getkey(par, &i2) == 0 && i == i2);
3821     CHK(set2.putval(i, false) == 0);
3822     LL4("row " << set2.count() << " " << *set2.m_row[i]);
3823     con.closeTransaction();
3824   }
3825   if (par.m_verify)
3826     CHK(set1.verify(par, set2, false) == 0);
3827   return 0;
3828 }
3829 
3830 // scan read
3831 
3832 static int
scanreadtable(Par par)3833 scanreadtable(Par par)
3834 {
3835   Con& con = par.con();
3836   const Tab& tab = par.tab();
3837   const Set& set = par.set();
3838   // expected
3839   const Set& set1 = set;
3840   LL3("scanreadtable " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
3841   Set set2(tab, set.m_rows);
3842   CHK(con.startTransaction() == 0);
3843   CHK(con.getNdbScanOperation(tab) == 0);
3844   CHK(con.readTuples(par) == 0);
3845   set2.getval(par);
3846   CHK(con.executeScan() == 0);
3847   uint n = 0;
3848   while (1) {
3849     int ret;
3850     uint err = par.m_catcherr;
3851     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3852     if (ret == 1)
3853       break;
3854     if (err) {
3855       LL1("scanreadtable stop on " << con.errname(err));
3856       break;
3857     }
3858     uint i = (uint)-1;
3859     CHK(set2.getkey(par, &i) == 0);
3860     CHK(set2.putval(i, false, n) == 0);
3861     LL4("row " << n << " " << *set2.m_row[i]);
3862     n++;
3863   }
3864   con.closeTransaction();
3865   if (par.m_verify)
3866     CHK(set1.verify(par, set2, false) == 0);
3867   LL3("scanreadtable " << tab.m_name << " done rows=" << n);
3868   return 0;
3869 }
3870 
3871 static int
scanreadtablefast(Par par,uint countcheck)3872 scanreadtablefast(Par par, uint countcheck)
3873 {
3874   Con& con = par.con();
3875   const Tab& tab = par.tab();
3876   LL3("scanfast " << tab.m_name);
3877   CHK(con.startTransaction() == 0);
3878   CHK(con.getNdbScanOperation(tab) == 0);
3879   CHK(con.readTuples(par) == 0);
3880   // get 1st column
3881   NdbRecAttr* rec;
3882   CHK(con.getValue((Uint32)0, rec) == 0);
3883   CHK(con.executeScan() == 0);
3884   uint count = 0;
3885   while (1) {
3886     int ret;
3887     CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
3888     if (ret == 1)
3889       break;
3890     count++;
3891   }
3892   con.closeTransaction();
3893   CHK(count == countcheck);
3894   return 0;
3895 }
3896 
3897 // try to get interesting bounds
3898 static void
calcscanbounds(Par par,const ITab & itab,BSet & bset,const Set & set,Set & set1)3899 calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
3900 {
3901   while (true) {
3902     bset.calc(par);
3903     bset.filter(par, set, set1);
3904     uint n = set1.count();
3905     // prefer proper subset
3906     if (0 < n && n < set.m_rows)
3907       break;
3908     if (urandom(5) == 0)
3909       break;
3910     set1.reset();
3911   }
3912 }
3913 
3914 static int
scanreadindex(Par par,const ITab & itab,BSet & bset,bool calc)3915 scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
3916 {
3917   Con& con = par.con();
3918   const Tab& tab = par.tab();
3919   const Set& set = par.set();
3920   Set set1(tab, set.m_rows);
3921   if (calc) {
3922     calcscanbounds(par, itab, bset, set, set1);
3923   } else {
3924     bset.filter(par, set, set1);
3925   }
3926   LL3("scanreadindex " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
3927   Set set2(tab, set.m_rows);
3928   CHK(con.startTransaction() == 0);
3929   CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
3930   CHK(con.readIndexTuples(par) == 0);
3931   CHK(bset.setbnd(par) == 0);
3932   set2.getval(par);
3933   CHK(con.executeScan() == 0);
3934   uint n = 0;
3935   while (1) {
3936     int ret;
3937     uint err = par.m_catcherr;
3938     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3939     if (ret == 1)
3940       break;
3941     if (err) {
3942       LL1("scanreadindex stop on " << con.errname(err));
3943       break;
3944     }
3945     uint i = (uint)-1;
3946     CHK(set2.getkey(par, &i) == 0);
3947     CHK(set2.putval(i, par.m_dups, n) == 0);
3948     LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
3949     n++;
3950   }
3951   con.closeTransaction();
3952   if (par.m_verify) {
3953     CHK(set1.verify(par, set2, false) == 0);
3954     if (par.m_ordered)
3955       CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
3956   }
3957   LL3("scanreadindex " << itab.m_name << " done rows=" << n);
3958   return 0;
3959 }
3960 
3961 
3962 static int
scanreadindexmrr(Par par,const ITab & itab,int numBsets)3963 scanreadindexmrr(Par par, const ITab& itab, int numBsets)
3964 {
3965   Con& con = par.con();
3966   const Tab& tab = par.tab();
3967   const Set& set = par.set();
3968 
3969   /* Create space for different sets of bounds, expected results and
3970    * results
3971    * Calculate bounds and the sets of rows which would result
3972    */
3973   BSet** boundSets;
3974   Set** expectedResults;
3975   Set** actualResults;
3976   uint* setSizes;
3977 
3978   CHK((boundSets= (BSet**) malloc(numBsets * sizeof(BSet*))) != 0);
3979   CHK((expectedResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3980   CHK((actualResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3981   CHK((setSizes= (uint*) malloc(numBsets * sizeof(uint))) != 0);
3982 
3983   for (int n=0; n < numBsets; n++)
3984   {
3985     CHK((boundSets[n]= new BSet(tab, itab)) != NULL );
3986     CHK((expectedResults[n]= new Set(tab, set.m_rows)) != NULL);
3987     CHK((actualResults[n]= new Set(tab, set.m_rows)) != NULL);
3988     setSizes[n]= 0;
3989 
3990     Set& results= *expectedResults[n];
3991     /* Calculate some scan bounds which are selective */
3992     do {
3993       results.reset();
3994       calcscanbounds(par, itab, *boundSets[n], set, results);
3995     } while ((*boundSets[n]).m_bvals == 0);
3996   }
3997 
3998   /* Define scan with bounds */
3999   LL3("scanreadindexmrr " << itab.m_name << " ranges= " << numBsets << " lockmode=" << par.m_lockmode << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
4000   Set set2(tab, set.m_rows);
4001   /* Multirange + Read range number for this scan */
4002   par.m_multiRange= true;
4003   CHK(con.startTransaction() == 0);
4004   CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4005   CHK(con.readIndexTuples(par) == 0);
4006   /* Set the bounds */
4007   for (int n=0; n < numBsets; n++)
4008   {
4009     CHK(boundSets[n]->setbnd(par) == 0);
4010     int res= con.m_indexscanop->end_of_bound(n);
4011     if (res != 0)
4012     {
4013       LL1("end_of_bound error : " << con.m_indexscanop->getNdbError().code);
4014       CHK (false);
4015     }
4016   }
4017   set2.getval(par);
4018   CHK(con.executeScan() == 0);
4019   int rows_received= 0;
4020   while (1) {
4021     int ret;
4022     uint err = par.m_catcherr;
4023     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4024     if (ret == 1)
4025       break;
4026     if (err) {
4027       LL1("scanreadindexmrr stop on " << con.errname(err));
4028       break;
4029     }
4030     uint i = (uint)-1;
4031     /* Put value into set2 temporarily */
4032     CHK(set2.getkey(par, &i) == 0);
4033     CHK(set2.putval(i, false, -1) == 0);
4034 
4035     /* Now move it to the correct set, based on the range no */
4036     int rangeNum= con.m_indexscanop->get_range_no();
4037     CHK(rangeNum < numBsets);
4038     CHK(set2.m_row[i] != NULL);
4039     /* Get rowNum based on what's in the set already (slow) */
4040     CHK(setSizes[rangeNum] == actualResults[rangeNum]->count());
4041     int rowNum= setSizes[rangeNum];
4042     setSizes[rangeNum] ++;
4043     CHK((uint) rowNum < set2.m_rows);
4044     actualResults[rangeNum]->m_row[i]= set2.m_row[i];
4045     actualResults[rangeNum]->m_rowkey[rowNum]= i;
4046     set2.m_row[i]= 0;
4047     LL4("range " << rangeNum << " key " << i << " row " << rowNum << " " << *set2.m_row[i]);
4048     rows_received++;
4049   }
4050   con.closeTransaction();
4051 
4052   /* Verify that each set has the expected rows, and optionally, that
4053    * they're ordered
4054    */
4055   if (par.m_verify)
4056   {
4057     LL4("Verifying " << numBsets << " sets, " << rows_received << " rows");
4058     for (int n=0; n < numBsets; n++)
4059     {
4060       LL5("Set " << n << " of " << expectedResults[n]->count() << " rows");
4061       CHK(expectedResults[n]->verify(par, *actualResults[n], false) == 0);
4062       if (par.m_ordered)
4063       {
4064         LL5("Verifying ordering");
4065         CHK(actualResults[n]->verifyorder(par, itab, par.m_descending) == 0);
4066       }
4067     }
4068   }
4069 
4070   /* Cleanup */
4071   for (int n=0; n < numBsets; n++)
4072   {
4073     boundSets[n]->reset();
4074     delete boundSets[n];
4075     delete expectedResults[n];
4076     delete actualResults[n];
4077   }
4078 
4079   free(boundSets);
4080   free(expectedResults);
4081   free(actualResults);
4082   free(setSizes);
4083 
4084   LL3("scanreadindexmrr " << itab.m_name << " done rows=" << rows_received);
4085   return 0;
4086 }
4087 
4088 static int
scanreadindexfast(Par par,const ITab & itab,const BSet & bset,uint countcheck)4089 scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
4090 {
4091   Con& con = par.con();
4092   const Tab& tab = par.tab();
4093   LL3("scanfast " << itab.m_name << " " << bset);
4094   LL4(bset);
4095   CHK(con.startTransaction() == 0);
4096   CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4097   CHK(con.readIndexTuples(par) == 0);
4098   CHK(bset.setbnd(par) == 0);
4099   // get 1st column
4100   NdbRecAttr* rec;
4101   CHK(con.getValue((Uint32)0, rec) == 0);
4102   CHK(con.executeScan() == 0);
4103   uint count = 0;
4104   while (1) {
4105     int ret;
4106     CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
4107     if (ret == 1)
4108       break;
4109     count++;
4110   }
4111   con.closeTransaction();
4112   CHK(count == countcheck);
4113   return 0;
4114 }
4115 
4116 static int
scanreadfilter(Par par,const ITab & itab,BSet & bset,bool calc)4117 scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
4118 {
4119   Con& con = par.con();
4120   const Tab& tab = par.tab();
4121   const Set& set = par.set();
4122   Set set1(tab, set.m_rows);
4123   if (calc) {
4124     calcscanbounds(par, itab, bset, set, set1);
4125   } else {
4126     bset.filter(par, set, set1);
4127   }
4128   LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
4129   Set set2(tab, set.m_rows);
4130   CHK(con.startTransaction() == 0);
4131   CHK(con.getNdbScanOperation(tab) == 0);
4132   CHK(con.readTuples(par) == 0);
4133   CHK(bset.setflt(par) == 0);
4134   set2.getval(par);
4135   CHK(con.executeScan() == 0);
4136   uint n = 0;
4137   while (1) {
4138     int ret;
4139     uint err = par.m_catcherr;
4140     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4141     if (ret == 1)
4142       break;
4143     if (err) {
4144       LL1("scanfilter stop on " << con.errname(err));
4145       break;
4146     }
4147     uint i = (uint)-1;
4148     CHK(set2.getkey(par, &i) == 0);
4149     CHK(set2.putval(i, par.m_dups, n) == 0);
4150     LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
4151     n++;
4152   }
4153   con.closeTransaction();
4154   if (par.m_verify) {
4155     CHK(set1.verify(par, set2, false) == 0);
4156   }
4157   LL3("scanfilter " << itab.m_name << " done rows=" << n);
4158   return 0;
4159 }
4160 
4161 static int
scanreadindex(Par par,const ITab & itab)4162 scanreadindex(Par par, const ITab& itab)
4163 {
4164   const Tab& tab = par.tab();
4165   for (uint i = 0; i < par.m_ssloop; i++) {
4166     if (itab.m_type == ITab::OrderedIndex) {
4167       BSet bset(tab, itab);
4168       CHK(scanreadfilter(par, itab, bset, true) == 0);
4169       /* Single range or Multi range scan */
4170       if (randompct(g_opt.m_pctmrr))
4171         CHK(scanreadindexmrr(par,
4172                              itab,
4173                              1+urandom(g_opt.m_mrrmaxrng-1)) == 0);
4174       else
4175         CHK(scanreadindex(par, itab, bset, true) == 0);
4176     }
4177   }
4178   return 0;
4179 }
4180 
4181 static int
scanreadindex(Par par)4182 scanreadindex(Par par)
4183 {
4184   const Tab& tab = par.tab();
4185   for (uint i = 0; i < tab.m_itabs; i++) {
4186     if (tab.m_itab[i] == 0)
4187       continue;
4188     const ITab& itab = *tab.m_itab[i];
4189     if (itab.m_type == ITab::OrderedIndex) {
4190       CHK(scanreadindex(par, itab) == 0);
4191     } else {
4192       CHK(hashindexread(par, itab) == 0);
4193     }
4194   }
4195   return 0;
4196 }
4197 
// Disabled (compiled out by #if 0): table scan followed by the reads
// through every index.
#if 0
static int
scanreadall(Par par)
{
  CHK(scanreadtable(par) == 0);
  CHK(scanreadindex(par) == 0);
  return 0;
}
#endif
4207 
4208 // timing scans
4209 
4210 static int
timescantable(Par par)4211 timescantable(Par par)
4212 {
4213   par.tmr().on();
4214   CHK(scanreadtablefast(par, par.m_totrows) == 0);
4215   par.tmr().off(par.set().m_rows);
4216   return 0;
4217 }
4218 
4219 static int
timescanpkindex(Par par)4220 timescanpkindex(Par par)
4221 {
4222   const Tab& tab = par.tab();
4223   const ITab& itab = *tab.m_itab[0];    // 1st index is on PK
4224   BSet bset(tab, itab);
4225   par.tmr().on();
4226   CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
4227   par.tmr().off(par.set().m_rows);
4228   return 0;
4229 }
4230 
4231 static int
timepkreadtable(Par par)4232 timepkreadtable(Par par)
4233 {
4234   par.tmr().on();
4235   uint count = par.m_samples;
4236   if (count == 0)
4237     count = par.m_totrows;
4238   CHK(pkreadfast(par, count) == 0);
4239   par.tmr().off(count);
4240   return 0;
4241 }
4242 
4243 static int
timepkreadindex(Par par)4244 timepkreadindex(Par par)
4245 {
4246   const Tab& tab = par.tab();
4247   const ITab& itab = *tab.m_itab[0];    // 1st index is on PK
4248   BSet bset(tab, itab);
4249   uint count = par.m_samples;
4250   if (count == 0)
4251     count = par.m_totrows;
4252   par.tmr().on();
4253   for (uint j = 0; j < count; j++) {
4254     uint i = urandom(par.m_totrows);
4255     bset.calcpk(par, i);
4256     CHK(scanreadindexfast(par, itab, bset, 1) == 0);
4257   }
4258   par.tmr().off(count);
4259   return 0;
4260 }
4261 
4262 // scan update
4263 
// Scan the table with exclusive lock mode and update the rows found.
//
// The scan runs on par's connection (con); the updates are defined via
// updateScanTuple on a second connection object (con2) with its own
// transaction.  For each scanned row accepted by set.compat(OpUpd) the
// columns outside tab.m_pkmask are recalculated and set.  Updates are
// committed when the batch reaches par.m_batch or when the inner
// nextScanResult(false) returns non-zero with updates pending.
//
// The function stops when the scan ends, when either the scan or an
// update execute reports a caught error in err, or at random when
// par.m_scanstop is non-zero and urandom(par.m_scanstop) == 0.
// Returns 0 on all of these paths; a failed CHK/CHKTRY ends the
// function early.
static int
scanupdatetable(Par par)
{
  Con& con = par.con();
  const Tab& tab = par.tab();
  Set& set = par.set();
  LL3("scanupdatetable " << tab.m_name);
  // scratch set receiving the scanned rows
  Set set2(tab, set.m_rows);
  par.m_lockmode = NdbOperation::LM_Exclusive;
  CHK(con.startTransaction() == 0);
  CHK(con.getNdbScanOperation(tab) == 0);
  CHK(con.readTuples(par) == 0);
  set2.getval(par);
  CHK(con.executeScan() == 0);
  // number of rows in committed batches
  uint count = 0;
  // updating trans
  Con con2;
  con2.connect(con);
  CHK(con2.startTransaction() == 0);
  // number of updates defined but not yet executed
  uint batch = 0;
  // outer loop: one nextScanResult(true, ...) call per iteration
  while (1) {
    int ret;
    uint32 err = par.m_catcherr;
    CHK((ret = con.nextScanResult(true, err)) != -1);
    if (ret != 0)
      break;
    if (err) {
      LL1("scanupdatetable [scan] stop on " << con.errname(err));
      break;
    }
    // optional random early stop of the scan
    if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
      con.closeScan();
      break;
    }
    // inner loop: process rows until nextScanResult(false) is non-zero
    while (1) {
      uint i = (uint)-1;
      CHK(set2.getkey(par, &i) == 0);
      set.lock();
      if (!set.compat(par, i, Row::OpUpd)) {
        LL3("scanupdatetable SKIP " << i << " " << set.getrow(i));
      } else {
        // CHKTRY is given set.unlock() as the action for its failure path
        CHKTRY(set2.putval(i, false) == 0, set.unlock());
        CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
        // copy of par which refers to the updating connection
        Par par2 = par;
        par2.m_con = &con2;
        set.push(i);
        set.calc(par, i, ~tab.m_pkmask);
        CHKTRY(set.setrow(par2, i) == 0, set.unlock());
        LL4("scanupdatetable " << i << " " << set.getrow(i));
        batch++;
      }
      set.unlock();
      CHK((ret = con.nextScanResult(false)) != -1);
      // pending updates must be executed before leaving the inner loop
      bool lastbatch = (batch != 0 && ret != 0);
      if (batch == par.m_batch || lastbatch) {
        uint err = par.m_catcherr;
        ExecType et = Commit;
        CHK(con2.execute(et, err) == 0);
        set.lock();
        // a caught error is treated like a rollback
        set.post(par, !err ? et : Rollback);
        set.unlock();
        if (err) {
          LL1("scanupdatetable [update] stop on " << con2.errname(err));
          goto out;
        }
        LL4("scanupdatetable committed batch");
        count += batch;
        batch = 0;
        // start a fresh updating transaction for the next batch
        con2.closeTransaction();
        CHK(con2.startTransaction() == 0);
      }
      if (ret != 0)
        break;
    }
  }
out:
  con2.closeTransaction();
  LL3("scanupdatetable " << tab.m_name << " rows updated=" << count);
  con.closeTransaction();
  return 0;
}
4345 
/*
 * Scan an index with the bounds in bset and update every row the scan
 * delivers.
 *
 * calc == true  : calcscanbounds() is called with bset and set1.
 * calc == false : bset is used as given and set1 is filled with
 *                 bset.filter().
 * set1 is the expected result; it is compared with the scanned rows
 * (set2) at the end when par.m_verify is set.
 *
 * As in scanupdatetable(), the scan uses par.con() with LM_Exclusive
 * and the updates go through a second Con (con2) whose transaction is
 * committed every par.m_batch rows and for a trailing partial batch.
 *
 * Returns 0 when the scan ends, is randomly closed (par.m_scanstop), or
 * stops on an error reported back through "err".  A failed CHK/CHKTRY
 * leaves the function early (macros defined elsewhere in this file).
 */
static int
scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
  Con& con = par.con();
  const Tab& tab = par.tab();
  Set& set = par.set();
  // expected
  Set set1(tab, set.m_rows);
  if (calc) {
    calcscanbounds(par, itab, bset, set, set1);
  } else {
    bset.filter(par, set, set1);
  }
  LL3("scanupdateindex " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
  // set2 receives the values fetched by the scan
  Set set2(tab, set.m_rows);
  par.m_lockmode = NdbOperation::LM_Exclusive;
  CHK(con.startTransaction() == 0);
  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
  CHK(con.readTuples(par) == 0);
  CHK(bset.setbnd(par) == 0);
  set2.getval(par);
  CHK(con.executeScan() == 0);
  // total rows updated in committed batches (only used for the log line)
  uint count = 0;
  // updating trans
  Con con2;
  con2.connect(con);
  CHK(con2.startTransaction() == 0);
  // rows updated in con2's current, not yet committed transaction
  uint batch = 0;
  while (1) {
    int ret;
    // err starts as par.m_catcherr; judging by the check below it comes
    // back non-zero when the scan hit one of those errors
    uint err = par.m_catcherr;
    CHK((ret = con.nextScanResult(true, err)) != -1);
    if (ret != 0)
      break;
    if (err) {
      LL1("scanupdateindex [scan] stop on " << con.errname(err));
      break;
    }
    // with m_scanstop set, close the scan early at random
    if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
      con.closeScan();
      break;
    }
    // inner loop: handle rows until nextScanResult(false) returns non-zero
    while (1) {
      uint i = (uint)-1;
      CHK(set2.getkey(par, &i) == 0);
      set.lock();
      if (!set.compat(par, i, Row::OpUpd)) {
        LL4("scanupdateindex SKIP " << set.getrow(i));
      } else {
        // every CHKTRY below releases the set lock before bailing out
        CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
        CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
        // par2 is par redirected to the updating connection
        Par par2 = par;
        par2.m_con = &con2;
        set.push(i);
        // with m_noindexkeyupdate the columns in the index key mask
        // are excluded from the recalculation
        uint colmask = !par.m_noindexkeyupdate ? ~0 : ~itab.m_keymask;
        set.calc(par, i, colmask);
        CHKTRY(set.setrow(par2, i) == 0, set.unlock());
        LL4("scanupdateindex " << i << " " << set.getrow(i));
        batch++;
      }
      set.unlock();
      CHK((ret = con.nextScanResult(false)) != -1);
      // ret != 0 with pending updates: flush the partial batch
      bool lastbatch = (batch != 0 && ret != 0);
      if (batch == par.m_batch || lastbatch) {
        uint err = par.m_catcherr;
        ExecType et = Commit;
        CHK(con2.execute(et, err) == 0);
        set.lock();
        // on a caught error the pending rows are posted as rolled back
        set.post(par, !err ? et : Rollback);
        set.unlock();
        if (err) {
          LL1("scanupdateindex [update] stop on " << con2.errname(err));
          goto out;
        }
        LL4("scanupdateindex committed batch");
        count += batch;
        batch = 0;
        // start a fresh updating transaction for the next batch
        con2.closeTransaction();
        CHK(con2.startTransaction() == 0);
      }
      if (ret != 0)
        break;
    }
  }
out:
  con2.closeTransaction();
  if (par.m_verify) {
    // compare expected rows with scanned rows
    // NOTE(review): meaning of the third verify() argument is not
    // visible here - confirm against Set::verify
    CHK(set1.verify(par, set2, true) == 0);
    if (par.m_ordered)
      CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
  }
  LL3("scanupdateindex " << itab.m_name << " rows updated=" << count);
  con.closeTransaction();
  return 0;
}
4441 
4442 static int
scanupdateindex(Par par,const ITab & itab)4443 scanupdateindex(Par par, const ITab& itab)
4444 {
4445   const Tab& tab = par.tab();
4446   for (uint i = 0; i < par.m_ssloop; i++) {
4447     if (itab.m_type == ITab::OrderedIndex) {
4448       BSet bset(tab, itab);
4449       CHK(scanupdateindex(par, itab, bset, true) == 0);
4450     } else {
4451       CHK(hashindexupdate(par, itab) == 0);
4452     }
4453   }
4454   return 0;
4455 }
4456 
4457 static int
scanupdateindex(Par par)4458 scanupdateindex(Par par)
4459 {
4460   const Tab& tab = par.tab();
4461   for (uint i = 0; i < tab.m_itabs; i++) {
4462     if (tab.m_itab[i] == 0)
4463       continue;
4464     const ITab& itab = *tab.m_itab[i];
4465     CHK(scanupdateindex(par, itab) == 0);
4466   }
4467   return 0;
4468 }
4469 
// Disabled by "#if 0" (never compiled): scan update of the table
// followed by scan update of all its indexes.
#if 0
static int
scanupdateall(Par par)
{
  CHK(scanupdatetable(par) == 0);
  CHK(scanupdateindex(par) == 0);
  return 0;
}
#endif
4479 
4480 // medium level routines
4481 
4482 static int
readverifyfull(Par par)4483 readverifyfull(Par par)
4484 {
4485   if (par.m_noverify)
4486     return 0;
4487   par.m_verify = true;
4488   if (par.m_abortpct != 0) {
4489     LL2("skip verify in this version"); // implement in 5.0 version
4490     par.m_verify = false;
4491   }
4492   par.m_lockmode = NdbOperation::LM_CommittedRead;
4493   const Tab& tab = par.tab();
4494   if (par.m_no == 0) {
4495     // thread 0 scans table
4496     CHK(scanreadtable(par) == 0);
4497     // once more via tup scan
4498     par.m_tupscan = true;
4499     CHK(scanreadtable(par) == 0);
4500   }
4501   // each thread scans different indexes
4502   for (uint i = 0; i < tab.m_itabs; i++) {
4503     if (i % par.m_usedthreads != par.m_no)
4504       continue;
4505     if (tab.m_itab[i] == 0)
4506       continue;
4507     const ITab& itab = *tab.m_itab[i];
4508     if (itab.m_type == ITab::OrderedIndex) {
4509       BSet bset(tab, itab);
4510       CHK(scanreadindex(par, itab, bset, false) == 0);
4511     } else {
4512       CHK(hashindexread(par, itab) == 0);
4513     }
4514   }
4515   return 0;
4516 }
4517 
4518 static int
readverifyindex(Par par)4519 readverifyindex(Par par)
4520 {
4521   if (par.m_noverify)
4522     return 0;
4523   par.m_verify = true;
4524   par.m_lockmode = NdbOperation::LM_CommittedRead;
4525   uint sel = urandom(10);
4526   if (sel < 9) {
4527     par.m_ordered = true;
4528     par.m_descending = (sel < 5);
4529   }
4530   CHK(scanreadindex(par) == 0);
4531   return 0;
4532 }
4533 
4534 static int
pkops(Par par)4535 pkops(Par par)
4536 {
4537   const Tab& tab = par.tab();
4538   par.m_randomkey = true;
4539   for (uint i = 0; i < par.m_ssloop; i++) {
4540     uint j = 0;
4541     while (j < tab.m_itabs) {
4542       if (tab.m_itab[j] != 0) {
4543         const ITab& itab = *tab.m_itab[j];
4544         if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
4545           break;
4546       }
4547       j++;
4548     }
4549     uint sel = urandom(10);
4550     if (par.m_slno % 2 == 0) {
4551       // favor insert
4552       if (sel < 8) {
4553         CHK(pkinsert(par) == 0);
4554       } else if (sel < 9) {
4555         if (j == tab.m_itabs)
4556           CHK(pkupdate(par) == 0);
4557         else {
4558           const ITab& itab = *tab.m_itab[j];
4559           CHK(hashindexupdate(par, itab) == 0);
4560         }
4561       } else {
4562         if (j == tab.m_itabs)
4563           CHK(pkdelete(par) == 0);
4564         else {
4565           const ITab& itab = *tab.m_itab[j];
4566           CHK(hashindexdelete(par, itab) == 0);
4567         }
4568       }
4569     } else {
4570       // favor delete
4571       if (sel < 1) {
4572         CHK(pkinsert(par) == 0);
4573       } else if (sel < 2) {
4574         if (j == tab.m_itabs)
4575           CHK(pkupdate(par) == 0);
4576         else {
4577           const ITab& itab = *tab.m_itab[j];
4578           CHK(hashindexupdate(par, itab) == 0);
4579         }
4580       } else {
4581         if (j == tab.m_itabs)
4582           CHK(pkdelete(par) == 0);
4583         else {
4584           const ITab& itab = *tab.m_itab[j];
4585           CHK(hashindexdelete(par, itab) == 0);
4586         }
4587       }
4588     }
4589   }
4590   return 0;
4591 }
4592 
4593 static int
pkupdatescanread(Par par)4594 pkupdatescanread(Par par)
4595 {
4596   par.m_dups = true;
4597   par.m_catcherr |= Con::ErrDeadlock;
4598   uint sel = urandom(10);
4599   if (sel < 5) {
4600     CHK(pkupdate(par) == 0);
4601   } else if (sel < 6) {
4602     par.m_verify = false;
4603     CHK(scanreadtable(par) == 0);
4604   } else {
4605     par.m_verify = false;
4606     if (sel < 8) {
4607       par.m_ordered = true;
4608       par.m_descending = (sel < 7);
4609     }
4610     CHK(scanreadindex(par) == 0);
4611   }
4612   return 0;
4613 }
4614 
4615 static int
mixedoperations(Par par)4616 mixedoperations(Par par)
4617 {
4618   par.m_dups = true;
4619   par.m_catcherr |= Con::ErrDeadlock;
4620   par.m_scanstop = par.m_totrows;       // randomly close scans
4621   uint sel = urandom(10);
4622   if (sel < 2) {
4623     CHK(pkdelete(par) == 0);
4624   } else if (sel < 4) {
4625     CHK(pkupdate(par) == 0);
4626   } else if (sel < 6) {
4627     CHK(scanupdatetable(par) == 0);
4628   } else {
4629     if (sel < 8) {
4630       par.m_ordered = true;
4631       par.m_descending = (sel < 7);
4632     }
4633     CHK(scanupdateindex(par) == 0);
4634   }
4635   return 0;
4636 }
4637 
4638 static int
parallelorderedupdate(Par par)4639 parallelorderedupdate(Par par)
4640 {
4641   const Tab& tab = par.tab();
4642   uint k = 0;
4643   for (uint i = 0; i < tab.m_itabs; i++) {
4644     if (tab.m_itab[i] == 0)
4645       continue;
4646     const ITab& itab = *tab.m_itab[i];
4647     if (itab.m_type != ITab::OrderedIndex)
4648       continue;
4649     // cannot sync threads yet except via subloop
4650     if (k++ == par.m_slno % tab.m_orderedindexes) {
4651       LL3("parallelorderedupdate: " << itab.m_name);
4652       par.m_noindexkeyupdate = true;
4653       par.m_ordered = true;
4654       par.m_descending = (par.m_slno != 0);
4655       par.m_dups = false;
4656       par.m_verify = true;
4657       BSet bset(tab, itab); // empty bounds
4658       // prefer empty bounds
4659       uint sel = urandom(10);
4660       CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
4661     }
4662   }
4663   return 0;
4664 }
4665 
4666 static int
pkupdateindexbuild(Par par)4667 pkupdateindexbuild(Par par)
4668 {
4669   if (par.m_no == 0) {
4670     NdbSleep_MilliSleep(10 + urandom(100));
4671     CHK(createindex(par) == 0);
4672   } else {
4673     NdbSleep_MilliSleep(10 + urandom(100));
4674     par.m_randomkey = true;
4675     CHK(pkupdate(par) == 0);
4676   }
4677   return 0;
4678 }
4679 
4680 // savepoint tests (single thread for now)
4681 
4682 struct Spt {
4683   enum Res { Committed, Latest, Deadlock };
4684   bool m_same; // same transaction
4685   NdbOperation::LockMode m_lm;
4686   Res m_res;
4687 };
4688 
4689 static Spt sptlist[] = {
4690   { 1, NdbOperation::LM_Read, Spt::Latest },
4691   { 1, NdbOperation::LM_Exclusive, Spt::Latest },
4692   { 1, NdbOperation::LM_CommittedRead, Spt::Latest },
4693   { 0, NdbOperation::LM_Read, Spt::Deadlock },
4694   { 0, NdbOperation::LM_Exclusive, Spt::Deadlock },
4695   { 0, NdbOperation::LM_CommittedRead, Spt::Committed }
4696 };
4697 static uint sptcount = sizeof(sptlist)/sizeof(sptlist[0]);
4698 
4699 static int
savepointreadpk(Par par,Spt spt)4700 savepointreadpk(Par par, Spt spt)
4701 {
4702   LL3("savepointreadpk");
4703   Con& con = par.con();
4704   const Tab& tab = par.tab();
4705   Set& set = par.set();
4706   const Set& set1 = set;
4707   Set set2(tab, set.m_rows);
4708   uint n = 0;
4709   for (uint i = 0; i < set.m_rows; i++) {
4710     set.lock();
4711     if (!set.compat(par, i, Row::OpREAD)) {
4712       LL4("savepointreadpk SKIP " << i << " " << set.getrow(i));
4713       set.unlock();
4714       continue;
4715     }
4716     set.unlock();
4717     CHK(set2.selrow(par, *set1.m_row[i]) == 0);
4718     uint err = par.m_catcherr | Con::ErrDeadlock;
4719     ExecType et = NoCommit;
4720     CHK(con.execute(et, err) == 0);
4721     if (err) {
4722       if (err & Con::ErrDeadlock) {
4723         CHK(spt.m_res == Spt::Deadlock);
4724         // all rows have same behaviour
4725         CHK(n == 0);
4726       }
4727       LL1("savepointreadpk stop on " << con.errname(err));
4728       break;
4729     }
4730     uint i2 = (uint)-1;
4731     CHK(set2.getkey(par, &i2) == 0 && i == i2);
4732     CHK(set2.putval(i, false) == 0);
4733     LL4("row " << set2.count() << " " << set2.getrow(i));
4734     n++;
4735   }
4736   bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4737   if (spt.m_res != Spt::Deadlock)
4738     CHK(set1.verify(par, set2, false, dirty) == 0);
4739   return 0;
4740 }
4741 
4742 static int
savepointreadhashindex(Par par,Spt spt)4743 savepointreadhashindex(Par par, Spt spt)
4744 {
4745   if (spt.m_lm == NdbOperation::LM_CommittedRead && !spt.m_same) {
4746     LL1("skip hash index dirty read");
4747     return 0;
4748   }
4749   LL3("savepointreadhashindex");
4750   Con& con = par.con();
4751   const Tab& tab = par.tab();
4752   const ITab& itab = par.itab();
4753   Set& set = par.set();
4754   const Set& set1 = set;
4755   Set set2(tab, set.m_rows);
4756   uint n = 0;
4757   for (uint i = 0; i < set.m_rows; i++) {
4758     set.lock();
4759     if (!set.compat(par, i, Row::OpREAD)) {
4760       LL3("savepointreadhashindex SKIP " << i << " " << set.getrow(i));
4761       set.unlock();
4762       continue;
4763     }
4764     set.unlock();
4765     CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
4766     uint err = par.m_catcherr | Con::ErrDeadlock;
4767     ExecType et = NoCommit;
4768     CHK(con.execute(et, err) == 0);
4769     if (err) {
4770       if (err & Con::ErrDeadlock) {
4771         CHK(spt.m_res == Spt::Deadlock);
4772         // all rows have same behaviour
4773         CHK(n == 0);
4774       }
4775       LL1("savepointreadhashindex stop on " << con.errname(err));
4776       break;
4777     }
4778     uint i2 = (uint)-1;
4779     CHK(set2.getkey(par, &i2) == 0 && i == i2);
4780     CHK(set2.putval(i, false) == 0);
4781     LL4("row " << set2.count() << " " << *set2.m_row[i]);
4782     n++;
4783   }
4784   bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4785   if (spt.m_res != Spt::Deadlock)
4786     CHK(set1.verify(par, set2, false, dirty) == 0);
4787   return 0;
4788 }
4789 
4790 static int
savepointscantable(Par par,Spt spt)4791 savepointscantable(Par par, Spt spt)
4792 {
4793   LL3("savepointscantable");
4794   Con& con = par.con();
4795   const Tab& tab = par.tab();
4796   const Set& set = par.set();
4797   const Set& set1 = set; // not modifying current set
4798   Set set2(tab, set.m_rows); // scan result
4799   CHK(con.getNdbScanOperation(tab) == 0);
4800   CHK(con.readTuples(par) == 0);
4801   set2.getval(par); // getValue all columns
4802   CHK(con.executeScan() == 0);
4803   bool deadlock = false;
4804   uint n = 0;
4805   while (1) {
4806     int ret;
4807     uint err = par.m_catcherr | Con::ErrDeadlock;
4808     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4809     if (ret == 1)
4810       break;
4811     if (err) {
4812       if (err & Con::ErrDeadlock) {
4813         CHK(spt.m_res == Spt::Deadlock);
4814         // all rows have same behaviour
4815         CHK(n == 0);
4816         deadlock = true;
4817       }
4818       LL1("savepointscantable stop on " << con.errname(err));
4819       break;
4820     }
4821     CHK(spt.m_res != Spt::Deadlock);
4822     uint i = (uint)-1;
4823     CHK(set2.getkey(par, &i) == 0);
4824     CHK(set2.putval(i, false, n) == 0);
4825     LL4("row " << n << " key " << i << " " << set2.getrow(i));
4826     n++;
4827   }
4828   if (set1.m_rows > 0) {
4829     if (!deadlock)
4830       CHK(spt.m_res != Spt::Deadlock);
4831     else
4832       CHK(spt.m_res == Spt::Deadlock);
4833   }
4834   LL2("savepointscantable " << n << " rows");
4835   bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4836   if (spt.m_res != Spt::Deadlock)
4837     CHK(set1.verify(par, set2, false, dirty) == 0);
4838   return 0;
4839 }
4840 
4841 static int
savepointscanindex(Par par,Spt spt)4842 savepointscanindex(Par par, Spt spt)
4843 {
4844   LL3("savepointscanindex");
4845   Con& con = par.con();
4846   const Tab& tab = par.tab();
4847   const ITab& itab = par.itab();
4848   const Set& set = par.set();
4849   const Set& set1 = set;
4850   Set set2(tab, set.m_rows);
4851   CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4852   CHK(con.readIndexTuples(par) == 0);
4853   set2.getval(par);
4854   CHK(con.executeScan() == 0);
4855   bool deadlock = false;
4856   uint n = 0;
4857   while (1) {
4858     int ret;
4859     uint err = par.m_catcherr | Con::ErrDeadlock;
4860     CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4861     if (ret == 1)
4862       break;
4863     if (err) {
4864       if (err & Con::ErrDeadlock) {
4865         CHK(spt.m_res == Spt::Deadlock);
4866         // all rows have same behaviour
4867         CHK(n == 0);
4868         deadlock = true;
4869       }
4870       LL1("savepointscanindex stop on " << con.errname(err));
4871       break;
4872     }
4873     CHK(spt.m_res != Spt::Deadlock);
4874     uint i = (uint)-1;
4875     CHK(set2.getkey(par, &i) == 0);
4876     CHK(set2.putval(i, par.m_dups, n) == 0);
4877     LL4("row " << n << " key " << i << " " << set2.getrow(i));
4878     n++;
4879   }
4880   if (set1.m_rows > 0) {
4881     if (!deadlock)
4882       CHK(spt.m_res != Spt::Deadlock);
4883     else
4884       CHK(spt.m_res == Spt::Deadlock);
4885   }
4886   LL2("savepointscanindex " << n << " rows");
4887   bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4888   if (spt.m_res != Spt::Deadlock)
4889     CHK(set1.verify(par, set2, false, dirty) == 0);
4890   return 0;
4891 }
4892 
4893 typedef int (*SptFun)(Par, Spt);
4894 
4895 static int
savepointtest(Par par,Spt spt,SptFun fun)4896 savepointtest(Par par, Spt spt, SptFun fun)
4897 {
4898   Con& con = par.con();
4899   Par par2 = par;
4900   Con con2;
4901   if (!spt.m_same) {
4902     con2.connect(con); // copy ndb reference
4903     par2.m_con = &con2;
4904     CHK(con2.startTransaction() == 0);
4905   }
4906   par2.m_lockmode = spt.m_lm;
4907   CHK((*fun)(par2, spt) == 0);
4908   if (!spt.m_same) {
4909     con2.closeTransaction();
4910   }
4911   return 0;
4912 }
4913 
4914 static int
savepointtest(Par par,const char * op)4915 savepointtest(Par par, const char* op)
4916 {
4917   Con& con = par.con();
4918   const Tab& tab = par.tab();
4919   Set& set = par.set();
4920   LL2("savepointtest op=\"" << op << "\"");
4921   CHK(con.startTransaction() == 0);
4922   const char* p = op;
4923   char c;
4924   while ((c = *p++) != 0) {
4925     uint j;
4926     for (j = 0; j < par.m_rows; j++) {
4927       uint i = thrrow(par, j);
4928       if (c == 'c') {
4929         ExecType et = Commit;
4930         CHK(con.execute(et) == 0);
4931         set.lock();
4932         set.post(par, et);
4933         set.unlock();
4934         CHK(con.startTransaction() == 0);
4935       } else {
4936         set.lock();
4937         set.push(i);
4938         if (c == 'i') {
4939           set.calc(par, i);
4940           CHK(set.insrow(par, i) == 0);
4941         } else if (c == 'u') {
4942           set.copyval(i, tab.m_pkmask);
4943           set.calc(par, i, ~tab.m_pkmask);
4944           CHK(set.updrow(par, i) == 0);
4945         } else if (c == 'd') {
4946           set.copyval(i, tab.m_pkmask);
4947           CHK(set.delrow(par, i) == 0);
4948         } else {
4949           assert(false);
4950         }
4951         set.unlock();
4952       }
4953     }
4954   }
4955   {
4956     ExecType et = NoCommit;
4957     CHK(con.execute(et) == 0);
4958     set.lock();
4959     set.post(par, et);
4960     set.unlock();
4961   }
4962   for (uint k = 0; k < sptcount; k++) {
4963     Spt spt = sptlist[k];
4964     LL2("spt lm=" << spt.m_lm << " same=" << spt.m_same);
4965     CHK(savepointtest(par, spt, &savepointreadpk) == 0);
4966     CHK(savepointtest(par, spt, &savepointscantable) == 0);
4967     for (uint i = 0; i < tab.m_itabs; i++) {
4968       if (tab.m_itab[i] == 0)
4969         continue;
4970       const ITab& itab = *tab.m_itab[i];
4971       par.m_itab = &itab;
4972       if (itab.m_type == ITab::OrderedIndex)
4973         CHK(savepointtest(par, spt, &savepointscanindex) == 0);
4974       else
4975         CHK(savepointtest(par, spt, &savepointreadhashindex) == 0);
4976       par.m_itab = 0;
4977     }
4978   }
4979   {
4980     ExecType et = Rollback;
4981     CHK(con.execute(et) == 0);
4982     set.lock();
4983     set.post(par, et);
4984     set.unlock();
4985   }
4986   con.closeTransaction();
4987   return 0;
4988 }
4989 
4990 static int
savepointtest(Par par)4991 savepointtest(Par par)
4992 {
4993   assert(par.m_usedthreads == 1);
4994   const char* oplist[] = {
4995     // each based on previous and "c" not last
4996     "i",
4997     "icu",
4998     "uuuuu",
4999     "d",
5000     "dciuuuuud",
5001     0
5002   };
5003   int i;
5004   for (i = 0; oplist[i] != 0; i++) {
5005     CHK(savepointtest(par, oplist[i]) == 0);
5006   }
5007   return 0;
5008 }
5009 
/*
 * Check that an index scan does not return rows inserted by the same
 * transaction after the scan was started.
 *
 * Within one transaction: insert row 0, then repeatedly start a scan on
 * itab and, for every row the scan returns, insert one more row
 * (NoCommit).  Every key returned by a scan must be <= "savepoint", the
 * highest row number that existed when that scan was executed.  The
 * loop ends once set.m_rows rows have been inserted; the transaction is
 * then committed and the rows are deleted again via pkdelete().
 */
static int
halloweentest(Par par, const ITab& itab)
{
  LL2("halloweentest " << itab.m_name);
  Con& con = par.con();
  const Tab& tab = par.tab();
  Set& set = par.set();
  CHK(con.startTransaction() == 0);
  // insert 1 row
  uint i = 0;
  set.push(i);
  set.calc(par, i);
  CHK(set.insrow(par, i) == 0);
  CHK(con.execute(NoCommit) == 0);
  // scan via index until Set m_rows reached
  uint scancount = 0;
  bool stop = false;
  while (!stop) {
    // alternate the lock mode between consecutive scans
    par.m_lockmode = // makes no difference
      scancount % 2 == 0 ? NdbOperation::LM_CommittedRead :
                           NdbOperation::LM_Read;
    Set set1(tab, set.m_rows); // expected scan result
    Set set2(tab, set.m_rows); // actual scan result
    BSet bset(tab, itab);
    calcscanbounds(par, itab, bset, set, set1);
    CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
    CHK(con.readIndexTuples(par) == 0);
    CHK(bset.setbnd(par) == 0);
    set2.getval(par);
    CHK(con.executeScan() == 0);
    // highest row number inserted before this scan was executed
    const uint savepoint = i;
    LL3("scancount=" << scancount << " savepoint=" << savepoint);
    uint n = 0;
    while (1) {
      int ret;
      CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
      if (ret == 1)
        break;
      uint k = (uint)-1;
      CHK(set2.getkey(par, &k) == 0);
      CHK(set2.putval(k, false, n) == 0);
      LL3("row=" << n << " key=" << k);
      // the scan must not see rows inserted after it started
      CHK(k <= savepoint);
      // all rows inserted: leave both loops
      if (++i == set.m_rows) {
        stop = true;
        break;
      }
      // insert the next row while the scan is still open
      set.push(i);
      set.calc(par, i);
      CHK(set.insrow(par, i) == 0);
      CHK(con.execute(NoCommit) == 0);
      n++;
    }
    con.closeScan();
    LL3("scanrows=" << n);
    // a scan that was cut short by "stop" is not compared
    if (!stop) {
      CHK(set1.verify(par, set2, false) == 0);
    }
    scancount++;
  }
  CHK(con.execute(Commit) == 0);
  set.post(par, Commit);
  assert(set.count() == set.m_rows);
  // clean up the inserted rows
  CHK(pkdelete(par) == 0);
  return 0;
}
5076 
5077 static int
halloweentest(Par par)5078 halloweentest(Par par)
5079 {
5080   assert(par.m_usedthreads == 1);
5081   const Tab& tab = par.tab();
5082   for (uint i = 0; i < tab.m_itabs; i++) {
5083     if (tab.m_itab[i] == 0)
5084       continue;
5085     const ITab& itab = *tab.m_itab[i];
5086     if (itab.m_type == ITab::OrderedIndex)
5087       CHK(halloweentest(par, itab) == 0);
5088   }
5089   return 0;
5090 }
5091 
5092 // threads
5093 
5094 typedef int (*TFunc)(Par par);
5095 enum TMode { ST = 1, MT = 2 };
5096 
5097 extern "C" { static void* runthread(void* arg); }
5098 
5099 struct Thr {
5100   enum State { Wait, Start, Stop, Exit };
5101   State m_state;
5102   Par m_par;
5103   pthread_t m_id;
5104   NdbThread* m_thread;
5105   NdbMutex* m_mutex;
5106   NdbCondition* m_cond;
5107   TFunc m_func;
5108   int m_ret;
5109   void* m_status;
5110   char m_tmp[20]; // used for debug msg prefix
5111   Thr(Par par, uint n);
5112   ~Thr();
5113   int run();
5114   void start();
5115   void stop();
5116   void exit();
5117   //
lockThr5118   void lock() {
5119     NdbMutex_Lock(m_mutex);
5120   }
unlockThr5121   void unlock() {
5122     NdbMutex_Unlock(m_mutex);
5123   }
waitThr5124   void wait() {
5125     NdbCondition_Wait(m_cond, m_mutex);
5126   }
signalThr5127   void signal() {
5128     NdbCondition_Signal(m_cond);
5129   }
joinThr5130   void join() {
5131     NdbThread_WaitFor(m_thread, &m_status);
5132     m_thread = 0;
5133   }
5134 };
5135 
Thr(Par par,uint n)5136 Thr::Thr(Par par, uint n) :
5137   m_state(Wait),
5138   m_par(par),
5139   m_thread(0),
5140   m_mutex(0),
5141   m_cond(0),
5142   m_func(0),
5143   m_ret(0),
5144   m_status(0)
5145 {
5146   m_par.m_no = n;
5147   char buf[10];
5148   sprintf(buf, "thr%03u", par.m_no);
5149   const char* name = strcpy(new char[10], buf);
5150   // mutex
5151   m_mutex = NdbMutex_Create();
5152   m_cond = NdbCondition_Create();
5153   assert(m_mutex != 0 && m_cond != 0);
5154   // run
5155   const uint stacksize = 256 * 1024;
5156   const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
5157   m_thread = NdbThread_Create(runthread, (void**)this, stacksize, name, prio);
5158 }
5159 
~Thr()5160 Thr::~Thr()
5161 {
5162   if (m_thread != 0) {
5163     NdbThread_Destroy(&m_thread);
5164     m_thread = 0;
5165   }
5166   if (m_cond != 0) {
5167     NdbCondition_Destroy(m_cond);
5168     m_cond = 0;
5169   }
5170   if (m_mutex != 0) {
5171     NdbMutex_Destroy(m_mutex);
5172     m_mutex = 0;
5173   }
5174 }
5175 
5176 static void*
runthread(void * arg)5177 runthread(void* arg)
5178 {
5179   Thr& thr = *(Thr*)arg;
5180   thr.m_id = pthread_self();
5181   if (thr.run() < 0) {
5182     LL1("exit on error");
5183   } else {
5184     LL4("exit ok");
5185   }
5186   return 0;
5187 }
5188 
/*
 * Body of a test thread: connect, then serve steps until told to exit.
 *
 * Each round waits (under the mutex) until m_state is Start or Exit.
 * On Start the step function m_func is called with m_par while the
 * mutex is still held, the result is stored in m_ret, m_state becomes
 * Stop and the waiter in stop() is signalled.
 *
 * Returns 0 after a regular exit, -1 when a step returned -1 and
 * m_par.m_cont is not set.
 * NOTE(review): the -1 return path does not call con.disconnect() and
 * leaves m_par.m_con pointing at the local "con" - confirm this is
 * acceptable for a failing run.
 */
int
Thr::run()
{
  LL4("run");
  Con con;
  CHK(con.connect() == 0);
  // steps reach the connection through m_par
  m_par.m_con = &con;
  LL4("connected");
  while (1) {
    lock();
    // loop around wait() so that other state changes do not start a step
    while (m_state != Start && m_state != Exit) {
      LL4("wait");
      wait();
    }
    if (m_state == Exit) {
      LL4("exit");
      unlock();
      break;
    }
    LL4("start");
    assert(m_state == Start);
    m_ret = (*m_func)(m_par);
    m_state = Stop;
    LL4("stop");
    // wake up stop(), which waits for m_state == Stop
    signal();
    unlock();
    if (m_ret == -1) {
      if (m_par.m_cont)
        LL1("continue running due to -cont");
      else
        return -1;
    }
  }
  con.disconnect();
  return 0;
}
5225 
// Called by the controlling thread: mark a step as ready (m_func and
// m_par must have been set before) and wake the thread waiting in run().
void
Thr::start()
{
  lock();
  m_state = Start;
  signal();
  unlock();
}
5234 
// Called by the controlling thread: block until run() has finished the
// current step (m_state == Stop), then put the thread back to Wait.
void
Thr::stop()
{
  lock();
  while (m_state != Stop)
    wait();
  m_state = Wait;
  unlock();
}
5244 
// Called by the controlling thread: ask the thread to leave its loop in
// run() and wake it up.  Does not wait for the thread to finish.
void
Thr::exit()
{
  lock();
  m_state = Exit;
  signal();
  unlock();
}
5253 
5254 // test run
5255 
5256 static Thr** g_thrlist = 0;
5257 
5258 static Thr*
getthr()5259 getthr()
5260 {
5261   if (g_thrlist != 0) {
5262     pthread_t id = pthread_self();
5263     for (uint n = 0; n < g_opt.m_threads; n++) {
5264       if (g_thrlist[n] != 0) {
5265         Thr& thr = *g_thrlist[n];
5266         if (pthread_equal(thr.m_id, id))
5267           return &thr;
5268       }
5269     }
5270   }
5271   return 0;
5272 }
5273 
5274 // for debug messages (par.m_no not available)
5275 static const char*
getthrprefix()5276 getthrprefix()
5277 {
5278   Thr* thrp = getthr();
5279   if (thrp != 0) {
5280     Thr& thr = *thrp;
5281     uint n = thr.m_par.m_no;
5282     uint m =
5283       g_opt.m_threads < 10 ? 1 :
5284       g_opt.m_threads < 100 ? 2 : 3;
5285     sprintf(thr.m_tmp, "[%0*u] ", m, n);
5286     return thr.m_tmp;
5287   }
5288   return "";
5289 }
5290 
5291 static int
runstep(Par par,const char * fname,TFunc func,uint mode)5292 runstep(Par par, const char* fname, TFunc func, uint mode)
5293 {
5294   LL2("step: " << fname);
5295   const int threads = (mode & ST ? 1 : par.m_usedthreads);
5296   int n;
5297   for (n = 0; n < threads; n++) {
5298     LL4("start " << n);
5299     Thr& thr = *g_thrlist[n];
5300     Par oldpar = thr.m_par;
5301     // update parameters
5302     thr.m_par = par;
5303     thr.m_par.m_no = oldpar.m_no;
5304     thr.m_par.m_con = oldpar.m_con;
5305     thr.m_func = func;
5306     thr.start();
5307   }
5308   uint errs = 0;
5309   for (n = threads - 1; n >= 0; n--) {
5310     LL4("stop " << n);
5311     Thr& thr = *g_thrlist[n];
5312     thr.stop();
5313     if (thr.m_ret != 0)
5314       errs++;
5315   }
5316   CHK(errs == 0);
5317   return 0;
5318 }
5319 
// Run step "func" via runstep() and check the result; the function name
// is also passed as a string for logging.  The expansion is left as it
// was because CHK may print its argument text.
#define RUNSTEP(par, func, mode) \
  CHK(runstep(par, #func, func, mode) == 0)

// Stream fragment identifying the current sub-loop:
// loop number / case / table name / sub-loop number.
// The argument is parenthesized so that any expression of type Par can
// be passed, not just a plain variable name.
#define SUBLOOP(par) \
  "sloop: " << (par).m_lno << "/" << (par).m_currcase << "/" << \
  (par).m_tab->m_name << "/" << (par).m_slno
5326 
5327 static int
tbuild(Par par)5328 tbuild(Par par)
5329 {
5330   RUNSTEP(par, droptable, ST);
5331   RUNSTEP(par, createtable, ST);
5332   RUNSTEP(par, invalidatetable, MT);
5333   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5334     LL1(SUBLOOP(par));
5335     if (par.m_slno % 3 == 0) {
5336       RUNSTEP(par, createindex, ST);
5337       RUNSTEP(par, invalidateindex, MT);
5338       RUNSTEP(par, pkinsert, MT);
5339       RUNSTEP(par, pkupdate, MT);
5340     } else if (par.m_slno % 3 == 1) {
5341       RUNSTEP(par, pkinsert, MT);
5342       RUNSTEP(par, createindex, ST);
5343       RUNSTEP(par, invalidateindex, MT);
5344       RUNSTEP(par, pkupdate, MT);
5345     } else {
5346       RUNSTEP(par, pkinsert, MT);
5347       RUNSTEP(par, pkupdate, MT);
5348       RUNSTEP(par, createindex, ST);
5349       RUNSTEP(par, invalidateindex, MT);
5350     }
5351     RUNSTEP(par, readverifyfull, MT);
5352     // leave last one
5353     if (par.m_slno + 1 < par.m_sloop) {
5354       RUNSTEP(par, pkdelete, MT);
5355       RUNSTEP(par, readverifyfull, MT);
5356       RUNSTEP(par, dropindex, ST);
5357     }
5358   }
5359   return 0;
5360 }
5361 
5362 static int
tindexscan(Par par)5363 tindexscan(Par par)
5364 {
5365   RUNSTEP(par, droptable, ST);
5366   RUNSTEP(par, createtable, ST);
5367   RUNSTEP(par, invalidatetable, MT);
5368   RUNSTEP(par, createindex, ST);
5369   RUNSTEP(par, invalidateindex, MT);
5370   RUNSTEP(par, pkinsert, MT);
5371   RUNSTEP(par, readverifyfull, MT);
5372   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5373     LL1(SUBLOOP(par));
5374     RUNSTEP(par, readverifyindex, MT);
5375   }
5376   return 0;
5377 }
5378 
5379 
5380 static int
tpkops(Par par)5381 tpkops(Par par)
5382 {
5383   RUNSTEP(par, droptable, ST);
5384   RUNSTEP(par, createtable, ST);
5385   RUNSTEP(par, invalidatetable, MT);
5386   RUNSTEP(par, createindex, ST);
5387   RUNSTEP(par, invalidateindex, MT);
5388   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5389     LL1(SUBLOOP(par));
5390     RUNSTEP(par, pkops, MT);
5391     LL2("rows=" << par.set().count());
5392     RUNSTEP(par, readverifyfull, MT);
5393   }
5394   return 0;
5395 }
5396 
5397 static int
tpkopsread(Par par)5398 tpkopsread(Par par)
5399 {
5400   RUNSTEP(par, droptable, ST);
5401   RUNSTEP(par, createtable, ST);
5402   RUNSTEP(par, invalidatetable, MT);
5403   RUNSTEP(par, pkinsert, MT);
5404   RUNSTEP(par, createindex, ST);
5405   RUNSTEP(par, invalidateindex, MT);
5406   RUNSTEP(par, readverifyfull, MT);
5407   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5408     LL1(SUBLOOP(par));
5409     RUNSTEP(par, pkupdatescanread, MT);
5410     RUNSTEP(par, readverifyfull, MT);
5411   }
5412   RUNSTEP(par, pkdelete, MT);
5413   RUNSTEP(par, readverifyfull, MT);
5414   return 0;
5415 }
5416 
5417 static int
tmixedops(Par par)5418 tmixedops(Par par)
5419 {
5420   RUNSTEP(par, droptable, ST);
5421   RUNSTEP(par, createtable, ST);
5422   RUNSTEP(par, invalidatetable, MT);
5423   RUNSTEP(par, pkinsert, MT);
5424   RUNSTEP(par, createindex, ST);
5425   RUNSTEP(par, invalidateindex, MT);
5426   RUNSTEP(par, readverifyfull, MT);
5427   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5428     LL1(SUBLOOP(par));
5429     RUNSTEP(par, mixedoperations, MT);
5430     RUNSTEP(par, readverifyfull, MT);
5431   }
5432   return 0;
5433 }
5434 
5435 static int
tbusybuild(Par par)5436 tbusybuild(Par par)
5437 {
5438   RUNSTEP(par, droptable, ST);
5439   RUNSTEP(par, createtable, ST);
5440   RUNSTEP(par, invalidatetable, MT);
5441   RUNSTEP(par, pkinsert, MT);
5442   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5443     LL1(SUBLOOP(par));
5444     RUNSTEP(par, pkupdateindexbuild, MT);
5445     RUNSTEP(par, invalidateindex, MT);
5446     RUNSTEP(par, readverifyfull, MT);
5447     RUNSTEP(par, dropindex, ST);
5448   }
5449   return 0;
5450 }
5451 
5452 static int
trollback(Par par)5453 trollback(Par par)
5454 {
5455   par.m_abortpct = 50;
5456   RUNSTEP(par, droptable, ST);
5457   RUNSTEP(par, createtable, ST);
5458   RUNSTEP(par, invalidatetable, MT);
5459   RUNSTEP(par, pkinsert, MT);
5460   RUNSTEP(par, createindex, ST);
5461   RUNSTEP(par, invalidateindex, MT);
5462   RUNSTEP(par, readverifyfull, MT);
5463   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5464     LL1(SUBLOOP(par));
5465     RUNSTEP(par, mixedoperations, MT);
5466     RUNSTEP(par, readverifyfull, MT);
5467   }
5468   return 0;
5469 }
5470 
5471 static int
tparupdate(Par par)5472 tparupdate(Par par)
5473 {
5474   RUNSTEP(par, droptable, ST);
5475   RUNSTEP(par, createtable, ST);
5476   RUNSTEP(par, invalidatetable, MT);
5477   RUNSTEP(par, pkinsert, MT);
5478   RUNSTEP(par, createindex, ST);
5479   RUNSTEP(par, invalidateindex, MT);
5480   RUNSTEP(par, readverifyfull, MT);
5481   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5482     LL1(SUBLOOP(par));
5483     RUNSTEP(par, parallelorderedupdate, MT);
5484     RUNSTEP(par, readverifyfull, MT);
5485   }
5486   return 0;
5487 }
5488 
5489 static int
tsavepoint(Par par)5490 tsavepoint(Par par)
5491 {
5492   RUNSTEP(par, droptable, ST);
5493   RUNSTEP(par, createtable, ST);
5494   RUNSTEP(par, invalidatetable, MT);
5495   RUNSTEP(par, createindex, ST);
5496   RUNSTEP(par, invalidateindex, MT);
5497   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5498     LL1(SUBLOOP(par));
5499     RUNSTEP(par, savepointtest, MT);
5500     RUNSTEP(par, readverifyfull, MT);
5501   }
5502   return 0;
5503 }
5504 
5505 static int
thalloween(Par par)5506 thalloween(Par par)
5507 {
5508   RUNSTEP(par, droptable, ST);
5509   RUNSTEP(par, createtable, ST);
5510   RUNSTEP(par, invalidatetable, MT);
5511   RUNSTEP(par, createindex, ST);
5512   RUNSTEP(par, invalidateindex, MT);
5513   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5514     LL1(SUBLOOP(par));
5515     RUNSTEP(par, halloweentest, MT);
5516   }
5517   return 0;
5518 }
5519 
5520 static int
ttimebuild(Par par)5521 ttimebuild(Par par)
5522 {
5523   Tmr t1;
5524   RUNSTEP(par, droptable, ST);
5525   RUNSTEP(par, createtable, ST);
5526   RUNSTEP(par, invalidatetable, MT);
5527   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5528     LL1(SUBLOOP(par));
5529     RUNSTEP(par, pkinsert, MT);
5530     t1.on();
5531     RUNSTEP(par, createindex, ST);
5532     t1.off(par.m_totrows);
5533     RUNSTEP(par, invalidateindex, MT);
5534     RUNSTEP(par, dropindex, ST);
5535   }
5536   LL1("build index - " << t1.time());
5537   return 0;
5538 }
5539 
5540 static int
ttimemaint(Par par)5541 ttimemaint(Par par)
5542 {
5543   Tmr t1, t2;
5544   RUNSTEP(par, droptable, ST);
5545   RUNSTEP(par, createtable, ST);
5546   RUNSTEP(par, invalidatetable, MT);
5547   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5548     LL1(SUBLOOP(par));
5549     RUNSTEP(par, pkinsert, MT);
5550     t1.on();
5551     RUNSTEP(par, pkupdate, MT);
5552     t1.off(par.m_totrows);
5553     RUNSTEP(par, createindex, ST);
5554     RUNSTEP(par, invalidateindex, MT);
5555     t2.on();
5556     RUNSTEP(par, pkupdate, MT);
5557     t2.off(par.m_totrows);
5558     RUNSTEP(par, dropindex, ST);
5559   }
5560   LL1("update - " << t1.time());
5561   LL1("update indexed - " << t2.time());
5562   LL1("overhead - " << t2.over(t1));
5563   return 0;
5564 }
5565 
5566 static int
ttimescan(Par par)5567 ttimescan(Par par)
5568 {
5569   if (par.tab().m_itab[0] == 0) {
5570     LL1("ttimescan - no index 0, skipped");
5571     return 0;
5572   }
5573   Tmr t1, t2;
5574   RUNSTEP(par, droptable, ST);
5575   RUNSTEP(par, createtable, ST);
5576   RUNSTEP(par, invalidatetable, MT);
5577   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5578     LL1(SUBLOOP(par));
5579     RUNSTEP(par, pkinsert, MT);
5580     RUNSTEP(par, createindex, ST);
5581     par.m_tmr = &t1;
5582     RUNSTEP(par, timescantable, ST);
5583     par.m_tmr = &t2;
5584     RUNSTEP(par, timescanpkindex, ST);
5585     RUNSTEP(par, dropindex, ST);
5586   }
5587   LL1("full scan table - " << t1.time());
5588   LL1("full scan PK index - " << t2.time());
5589   LL1("overhead - " << t2.over(t1));
5590   return 0;
5591 }
5592 
5593 static int
ttimepkread(Par par)5594 ttimepkread(Par par)
5595 {
5596   if (par.tab().m_itab[0] == 0) {
5597     LL1("ttimescan - no index 0, skipped");
5598     return 0;
5599   }
5600   Tmr t1, t2;
5601   RUNSTEP(par, droptable, ST);
5602   RUNSTEP(par, createtable, ST);
5603   RUNSTEP(par, invalidatetable, MT);
5604   for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5605     LL1(SUBLOOP(par));
5606     RUNSTEP(par, pkinsert, MT);
5607     RUNSTEP(par, createindex, ST);
5608     par.m_tmr = &t1;
5609     RUNSTEP(par, timepkreadtable, ST);
5610     par.m_tmr = &t2;
5611     RUNSTEP(par, timepkreadindex, ST);
5612     RUNSTEP(par, dropindex, ST);
5613   }
5614   LL1("pk read table - " << t1.time());
5615   LL1("pk read PK index - " << t2.time());
5616   LL1("overhead - " << t2.over(t1));
5617   return 0;
5618 }
5619 
5620 static int
tdrop(Par par)5621 tdrop(Par par)
5622 {
5623   RUNSTEP(par, droptable, ST);
5624   return 0;
5625 }
5626 
5627 struct TCase {
5628   const char* m_name;
5629   TFunc m_func;
5630   const char* m_desc;
TCaseTCase5631   TCase(const char* name, TFunc func, const char* desc) :
5632     m_name(name),
5633     m_func(func),
5634     m_desc(desc) {
5635   }
5636 };
5637 
5638 static const TCase
5639 tcaselist[] = {
5640   TCase("a", tbuild, "index build"),
5641   TCase("b", tindexscan, "index scans"),
5642   TCase("c", tpkops, "pk operations"),
5643   TCase("d", tpkopsread, "pk operations and scan reads"),
5644   TCase("e", tmixedops, "pk operations and scan operations"),
5645   TCase("f", tbusybuild, "pk operations and index build"),
5646   TCase("g", trollback, "operations with random rollbacks"),
5647   TCase("h", tparupdate, "parallel ordered update bug#20446"),
5648   TCase("i", tsavepoint, "savepoint test locking bug#31477"),
5649   TCase("j", thalloween, "savepoint test halloween problem"),
5650   TCase("t", ttimebuild, "time index build"),
5651   TCase("u", ttimemaint, "time index maintenance"),
5652   TCase("v", ttimescan, "time full scan table vs index on pk"),
5653   TCase("w", ttimepkread, "time pk read table vs index on pk"),
5654   TCase("z", tdrop, "drop test tables")
5655 };
5656 
5657 static const uint
5658 tcasecount = sizeof(tcaselist) / sizeof(tcaselist[0]);
5659 
5660 static void
printcases()5661 printcases()
5662 {
5663   ndbout << "test cases:" << endl;
5664   for (uint i = 0; i < tcasecount; i++) {
5665     const TCase& tcase = tcaselist[i];
5666     ndbout << "  " << tcase.m_name << " - " << tcase.m_desc << endl;
5667   }
5668 }
5669 
5670 static void
printtables()5671 printtables()
5672 {
5673   Par par(g_opt);
5674   makebuiltintables(par);
5675   ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
5676   for (uint j = 0; j < tabcount; j++) {
5677     if (tablist[j] == 0)
5678       continue;
5679     const Tab& tab = *tablist[j];
5680     const char* tname = tab.m_name;
5681     ndbout << "  " << tname;
5682     for (uint i = 0; i < tab.m_itabs; i++) {
5683       if (tab.m_itab[i] == 0)
5684         continue;
5685       const ITab& itab = *tab.m_itab[i];
5686       const char* iname = itab.m_name;
5687       if (strncmp(tname, iname, strlen(tname)) == 0)
5688         iname += strlen(tname);
5689       ndbout << " " << iname;
5690       ndbout << "(";
5691       for (uint k = 0; k < itab.m_icols; k++) {
5692         if (k != 0)
5693           ndbout << ",";
5694         const ICol& icol = *itab.m_icol[k];
5695         const Col& col = icol.m_col;
5696         ndbout << col.m_name;
5697       }
5698       ndbout << ")";
5699     }
5700     ndbout << endl;
5701   }
5702 }
5703 
5704 static bool
setcasepar(Par & par)5705 setcasepar(Par& par)
5706 {
5707   Opt d;
5708   const char* c = par.m_currcase;
5709   switch (c[0]) {
5710   case 'i':
5711     {
5712       if (par.m_usedthreads > 1) {
5713         par.m_usedthreads = 1;
5714         LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5715       }
5716       const uint rows = 100;
5717       if (par.m_rows > rows) {
5718         par.m_rows = rows;
5719         LL1("case " << c << " reduce rows to " << rows);
5720       }
5721     }
5722     break;
5723   case 'j':
5724     {
5725       if (par.m_usedthreads > 1) {
5726         par.m_usedthreads = 1;
5727         LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5728       }
5729     }
5730     break;
5731   default:
5732     break;
5733   }
5734   return true;
5735 }
5736 
5737 static int
runtest(Par par)5738 runtest(Par par)
5739 {
5740   int totret = 0;
5741   if (par.m_seed == -1) {
5742     // good enough for daily run
5743     ushort seed = (ushort)getpid();
5744     LL0("random seed: " << seed);
5745     srandom((uint)seed);
5746   } else if (par.m_seed != 0) {
5747     LL0("random seed: " << par.m_seed);
5748     srandom(par.m_seed);
5749   } else {
5750     LL0("random seed: loop number");
5751   }
5752   // cs
5753   assert(par.m_csname != 0);
5754   if (strcmp(par.m_csname, "random") != 0) {
5755     CHARSET_INFO* cs;
5756     CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
5757     par.m_cs = cs;
5758   }
5759   // con
5760   Con con;
5761   CHK(con.connect() == 0);
5762   par.m_con = &con;
5763   par.m_catcherr |= Con::ErrNospace;
5764   // threads
5765   g_thrlist = new Thr* [par.m_threads];
5766   uint n;
5767   for (n = 0; n < par.m_threads; n++) {
5768     g_thrlist[n] = 0;
5769   }
5770   for (n = 0; n < par.m_threads; n++) {
5771     g_thrlist[n] = new Thr(par, n);
5772     Thr& thr = *g_thrlist[n];
5773     assert(thr.m_thread != 0);
5774   }
5775   for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
5776     LL1("loop: " << par.m_lno);
5777     if (par.m_seed == 0) {
5778       LL1("random seed: " << par.m_lno);
5779       srandom(par.m_lno);
5780     }
5781     for (uint i = 0; i < tcasecount; i++) {
5782       const TCase& tcase = tcaselist[i];
5783       if ((par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) ||
5784           (par.m_skip != 0 && strchr(par.m_skip, tcase.m_name[0]) != 0)) {
5785         continue;
5786       }
5787       sprintf(par.m_currcase, "%c", tcase.m_name[0]);
5788       par.m_usedthreads = par.m_threads;
5789       if (!setcasepar(par)) {
5790         LL1("case " << tcase.m_name << " cannot run with given options");
5791         continue;
5792       }
5793       par.m_totrows = par.m_usedthreads * par.m_rows;
5794       makebuiltintables(par);
5795       LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
5796       for (uint j = 0; j < tabcount; j++) {
5797         if (tablist[j] == 0)
5798           continue;
5799         const Tab& tab = *tablist[j];
5800         par.m_tab = &tab;
5801         par.m_set = new Set(tab, par.m_totrows);
5802         LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
5803         int ret = tcase.m_func(par);
5804         delete par.m_set;
5805         par.m_set = 0;
5806         if (ret == -1) {
5807           if (!par.m_cont)
5808             return -1;
5809           totret = -1;
5810           LL1("continue to next case due to -cont");
5811           break;
5812         }
5813       }
5814     }
5815   }
5816   for (n = 0; n < par.m_threads; n++) {
5817     Thr& thr = *g_thrlist[n];
5818     thr.exit();
5819   }
5820   for (n = 0; n < par.m_threads; n++) {
5821     Thr& thr = *g_thrlist[n];
5822     thr.join();
5823     delete &thr;
5824   }
5825   delete [] g_thrlist;
5826   g_thrlist = 0;
5827   con.disconnect();
5828   return totret;
5829 }
5830 
5831 static const char* g_progname = "testOIBasic";
5832 
5833 int
main(int argc,char ** argv)5834 main(int argc,  char** argv)
5835 {
5836   ndb_init();
5837   uint i;
5838   ndbout << g_progname;
5839   for (i = 1; i < (uint) argc; i++)
5840     ndbout << " " << argv[i];
5841   ndbout << endl;
5842   ndbout_mutex = NdbMutex_Create();
5843   while (++argv, --argc > 0) {
5844     const char* arg = argv[0];
5845     if (*arg != '-') {
5846       ndbout << "testOIBasic: unknown argument " << arg;
5847       goto usage;
5848     }
5849     if (strcmp(arg, "-batch") == 0) {
5850       if (++argv, --argc > 0) {
5851         g_opt.m_batch = atoi(argv[0]);
5852         continue;
5853       }
5854     }
5855     if (strcmp(arg, "-bound") == 0) {
5856       if (++argv, --argc > 0) {
5857         const char* p = argv[0];
5858         if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) {
5859           g_opt.m_bound = strdup(p);
5860           continue;
5861         }
5862       }
5863     }
5864     if (strcmp(arg, "-case") == 0) {
5865       if (++argv, --argc > 0) {
5866         g_opt.m_case = strdup(argv[0]);
5867         continue;
5868       }
5869     }
5870     if (strcmp(arg, "-collsp") == 0) {
5871       g_opt.m_collsp = true;
5872       continue;
5873     }
5874     if (strcmp(arg, "-cont") == 0) {
5875       g_opt.m_cont = true;
5876       continue;
5877     }
5878     if (strcmp(arg, "-core") == 0) {
5879       g_opt.m_core = true;
5880       continue;
5881     }
5882     if (strcmp(arg, "-csname") == 0) {
5883       if (++argv, --argc > 0) {
5884         g_opt.m_csname = strdup(argv[0]);
5885         continue;
5886       }
5887     }
5888     if (strcmp(arg, "-die") == 0) {
5889       if (++argv, --argc > 0) {
5890         g_opt.m_die = atoi(argv[0]);
5891         continue;
5892       }
5893     }
5894     if (strcmp(arg, "-dups") == 0) {
5895       g_opt.m_dups = true;
5896       continue;
5897     }
5898     if (strcmp(arg, "-fragtype") == 0) {
5899       if (++argv, --argc > 0) {
5900         if (strcmp(argv[0], "single") == 0) {
5901           g_opt.m_fragtype = NdbDictionary::Object::FragSingle;
5902           continue;
5903         }
5904         if (strcmp(argv[0], "small") == 0) {
5905           g_opt.m_fragtype = NdbDictionary::Object::FragAllSmall;
5906           continue;
5907         }
5908         if (strcmp(argv[0], "medium") == 0) {
5909           g_opt.m_fragtype = NdbDictionary::Object::FragAllMedium;
5910           continue;
5911         }
5912         if (strcmp(argv[0], "large") == 0) {
5913           g_opt.m_fragtype = NdbDictionary::Object::FragAllLarge;
5914           continue;
5915         }
5916       }
5917     }
5918     if (strcmp(arg, "-index") == 0) {
5919       if (++argv, --argc > 0) {
5920         g_opt.m_index = strdup(argv[0]);
5921         continue;
5922       }
5923     }
5924     if (strcmp(arg, "-loop") == 0) {
5925       if (++argv, --argc > 0) {
5926         g_opt.m_loop = atoi(argv[0]);
5927         continue;
5928       }
5929     }
5930     if (strcmp(arg, "-mrrmaxrng") == 0) {
5931       if (++argv, --argc > 0) {
5932         g_opt.m_mrrmaxrng = atoi(argv[0]);
5933         continue;
5934       }
5935     }
5936     if (strcmp(arg, "-nologging") == 0) {
5937       g_opt.m_nologging = true;
5938       continue;
5939     }
5940     if (strcmp(arg, "-noverify") == 0) {
5941       g_opt.m_noverify = true;
5942       continue;
5943     }
5944     if (strcmp(arg, "-pctmrr") == 0) {
5945       if (++argv, --argc > 0) {
5946         g_opt.m_pctmrr = atoi(argv[0]);
5947         continue;
5948       }
5949     }
5950     if (strcmp(arg, "-pctnull") == 0) {
5951       if (++argv, --argc > 0) {
5952         g_opt.m_pctnull = atoi(argv[0]);
5953         continue;
5954       }
5955     }
5956     if (strcmp(arg, "-rows") == 0) {
5957       if (++argv, --argc > 0) {
5958         g_opt.m_rows = atoi(argv[0]);
5959         continue;
5960       }
5961     }
5962     if (strcmp(arg, "-samples") == 0) {
5963       if (++argv, --argc > 0) {
5964         g_opt.m_samples = atoi(argv[0]);
5965         continue;
5966       }
5967     }
5968     if (strcmp(arg, "-scanbatch") == 0) {
5969       if (++argv, --argc > 0) {
5970         g_opt.m_scanbatch = atoi(argv[0]);
5971         continue;
5972       }
5973     }
5974     if (strcmp(arg, "-scanpar") == 0) {
5975       if (++argv, --argc > 0) {
5976         g_opt.m_scanpar = atoi(argv[0]);
5977         continue;
5978       }
5979     }
5980     if (strcmp(arg, "-seed") == 0) {
5981       if (++argv, --argc > 0) {
5982         g_opt.m_seed = atoi(argv[0]);
5983         continue;
5984       }
5985     }
5986     if (strcmp(arg, "-skip") == 0) {
5987       if (++argv, --argc > 0) {
5988         g_opt.m_skip = strdup(argv[0]);
5989         continue;
5990       }
5991     }
5992     if (strcmp(arg, "-sloop") == 0) {
5993       if (++argv, --argc > 0) {
5994         g_opt.m_sloop = atoi(argv[0]);
5995         continue;
5996       }
5997     }
5998     if (strcmp(arg, "-ssloop") == 0) {
5999       if (++argv, --argc > 0) {
6000         g_opt.m_ssloop = atoi(argv[0]);
6001         continue;
6002       }
6003     }
6004     if (strcmp(arg, "-table") == 0) {
6005       if (++argv, --argc > 0) {
6006         g_opt.m_table = strdup(argv[0]);
6007         continue;
6008       }
6009     }
6010     if (strcmp(arg, "-threads") == 0) {
6011       if (++argv, --argc > 0) {
6012         g_opt.m_threads = atoi(argv[0]);
6013         if (1 <= g_opt.m_threads)
6014           continue;
6015       }
6016     }
6017     if (strcmp(arg, "-v") == 0) {
6018       if (++argv, --argc > 0) {
6019         g_opt.m_v = atoi(argv[0]);
6020         continue;
6021       }
6022     }
6023     if (strncmp(arg, "-v", 2) == 0 && isdigit(arg[2])) {
6024       g_opt.m_v = atoi(&arg[2]);
6025       continue;
6026     }
6027     if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
6028       printhelp();
6029       goto wrongargs;
6030     }
6031     ndbout << "testOIBasic: bad or unknown option " << arg;
6032     goto usage;
6033   }
6034   {
6035     Par par(g_opt);
6036     g_ncc = new Ndb_cluster_connection();
6037     if (g_ncc->connect(30) != 0 || runtest(par) < 0)
6038       goto failed;
6039     delete g_ncc;
6040     g_ncc = 0;
6041   }
6042 // ok
6043   return NDBT_ProgramExit(NDBT_OK);
6044 failed:
6045   return NDBT_ProgramExit(NDBT_FAILED);
6046 usage:
6047   ndbout << " (use -h for help)" << endl;
6048 wrongargs:
6049   return NDBT_ProgramExit(NDBT_WRONGARGS);
6050 }
6051 
6052 // vim: set sw=2 et:
6053