1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /*
26 * testOIBasic - ordered index test
27 *
28 * dummy push to re-created mysql-5.1-telco ...
29 */
30
31 #include <ndb_global.h>
32
33 #include <NdbMain.h>
34 #include <NdbOut.hpp>
35 #include <NdbApi.hpp>
36 #include <NdbTest.hpp>
37 #include <NdbMutex.h>
38 #include <NdbCondition.h>
39 #include <NdbThread.h>
40 #include <NdbTick.h>
41 #include <NdbSleep.h>
42 #include <my_sys.h>
43 #include <NdbSqlUtil.hpp>
44 #include <ndb_version.h>
45
46 // options
47
48 struct Opt {
49 // common options
50 uint m_batch;
51 const char* m_bound;
52 const char* m_case;
53 bool m_collsp;
54 bool m_cont;
55 bool m_core;
56 const char* m_csname;
57 CHARSET_INFO* m_cs;
58 int m_die;
59 bool m_dups;
60 NdbDictionary::Object::FragmentType m_fragtype;
61 const char* m_index;
62 uint m_loop;
63 uint m_mrrmaxrng;
64 bool m_msglock;
65 bool m_nologging;
66 bool m_noverify;
67 uint m_pctmrr;
68 uint m_pctnull;
69 uint m_rows;
70 uint m_samples;
71 uint m_scanbatch;
72 uint m_scanpar;
73 uint m_scanstop;
74 int m_seed;
75 const char* m_skip;
76 uint m_sloop;
77 uint m_ssloop;
78 const char* m_table;
79 uint m_threads;
80 int m_v; // int for lint
OptOpt81 Opt() :
82 m_batch(32),
83 m_bound("01234"),
84 m_case(0),
85 m_collsp(false),
86 m_cont(false),
87 m_core(false),
88 m_csname("random"),
89 m_cs(0),
90 m_die(0),
91 m_dups(false),
92 m_fragtype(NdbDictionary::Object::FragUndefined),
93 m_index(0),
94 m_loop(1),
95 m_mrrmaxrng(10),
96 m_msglock(true),
97 m_nologging(false),
98 m_noverify(false),
99 m_pctmrr(50),
100 m_pctnull(10),
101 m_rows(1000),
102 m_samples(0),
103 m_scanbatch(0),
104 m_scanpar(0),
105 m_scanstop(0),
106 m_seed(-1),
107 m_skip(0),
108 m_sloop(4),
109 m_ssloop(4),
110 m_table(0),
111 m_threads(4),
112 m_v(1) {
113 }
114 };
115
// Process-wide options; starts out holding the defaults from Opt().
static Opt g_opt;

// Defined later in the file; called at the end of printhelp().
static void printcases();
static void printtables();
120
// Print the usage text to ndbout.  Default values shown in brackets are
// read from a freshly constructed Opt, so they always match the real
// defaults.  Ends by listing the test cases and tables.
static void
printhelp()
{
  Opt d;  // default-constructed: source of the bracketed default values
  ndbout
    << "usage: testOIbasic [options]" << endl
    << " -batch N pk operations in batch [" << d.m_batch << "]" << endl
    << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
    << " -case abc only given test cases (letters a-z)" << endl
    << " -collsp use strnncollsp instead of strnxfrm" << endl
    << " -cont on error continue to next test case [" << d.m_cont << "]" << endl
    << " -core core dump on error [" << d.m_core << "]" << endl
    << " -csname S charset or collation [" << d.m_csname << "]" << endl
    << " -die nnn exit immediately on NDB error code nnn" << endl
    << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
    << " -fragtype T fragment type single/small/medium/large" << endl
    << " -index xyz only given index numbers (digits 0-9)" << endl
    << " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
    << " -mrrmaxrng N max ranges to supply for MRR scan [" << d.m_mrrmaxrng << "]" << endl
    << " -nologging create tables in no-logging mode" << endl
    << " -noverify skip index verifications" << endl
    << " -pctmrr N pct of index scans to use MRR [" << d.m_pctmrr << "]" << endl
    << " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
    << " -rows N rows per thread [" << d.m_rows << "]" << endl
    << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
    << " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
    << " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
    << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
    << " -skip abc skip given test cases (letters a-z)" << endl
    << " -sloop N level 2 (sub)loop count [" << d.m_sloop << "]" << endl
    << " -ssloop N level 3 (sub)loop count [" << d.m_ssloop << "]" << endl
    << " -table xyz only given table numbers (digits 0-9)" << endl
    << " -threads N number of threads [" << d.m_threads << "]" << endl
    << " -vN verbosity [" << d.m_v << "]" << endl
    << " -h or -help print this help text" << endl
    ;
  printcases();
  printtables();
}
160
// not yet configurable
// NOTE(review): the use sites are outside this view; presumably this
// selects whether NULL keys are stored in the index -- confirm.
static const bool g_store_null_key = true;

// compare NULL like normal value (NULL < not NULL, NULL == NULL)
static const bool g_compare_null = true;

// lower-case hexadecimal digits in value order
static const char* hexstr = "0123456789abcdef";

// random ints
// On NDB_WIN builds random()/srandom() are mapped onto rand()/srand().
#ifdef NDB_WIN
#define random() rand()
#define srandom(SEED) srand(SEED)
#endif
174
// Pseudo-random value in [0, n).  Returns 0 for n == 0 without consuming
// a random number.
static uint
urandom(uint n)
{
  return n == 0 ? 0 : (uint)(random() % n);
}
183
// Pseudo-random value in (-n, n).  Draws the magnitude first and the
// sign second; returns 0 for n == 0 without consuming a random number.
static int
irandom(uint n)
{
  if (n == 0)
    return 0;
  const int magnitude = random() % n;
  const bool negate = (random() & 0x1) != 0;
  return negate ? -magnitude : magnitude;
}
194
195 static bool
randompct(uint pct)196 randompct(uint pct)
197 {
198 if (pct == 0)
199 return false;
200 if (pct >= 100)
201 return true;
202 return urandom(100) < pct;
203 }
204
205 static uint
random_coprime(uint n)206 random_coprime(uint n)
207 {
208 uint prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
209 uint count = sizeof(prime) / sizeof(prime[0]);
210 if (n == 0)
211 return 0;
212 while (1) {
213 uint i = urandom(count);
214 if (n % prime[i] != 0)
215 return prime[i];
216 }
217 }
218
219 // random re-sequence of 0...(n-1)
220
221 struct Rsq {
222 Rsq(uint n);
223 uint next();
224 private:
225 uint m_n;
226 uint m_i;
227 uint m_start;
228 uint m_prime;
229 };
230
Rsq(uint n)231 Rsq::Rsq(uint n)
232 {
233 m_n = n;
234 m_i = 0;
235 m_start = urandom(n);
236 m_prime = random_coprime(n);
237 }
238
239 uint
next()240 Rsq::next()
241 {
242 assert(m_n != 0);
243 return (m_start + m_i++ * m_prime) % m_n;
244 }
245
246 // log and error macros
247
// Mutex taken around each log line when g_opt.m_msglock is set.
static NdbMutex *ndbout_mutex = NULL;
// Prefix written at the start of every log line (defined later).
static const char* getthrprefix();

// LLN(n, s): write stream expression s as one log line if level n does
// not exceed the verbosity g_opt.m_v.  Levels above 2 also print the
// source line number.
#define LLN(n, s) \
  do { \
    if ((n) > g_opt.m_v) break; \
    if (g_opt.m_msglock) NdbMutex_Lock(ndbout_mutex); \
    ndbout << getthrprefix(); \
    if ((n) > 2) \
      ndbout << "line " << __LINE__ << ": "; \
    ndbout << s << endl; \
    if (g_opt.m_msglock) NdbMutex_Unlock(ndbout_mutex); \
  } while(0)

// Shorthands for fixed log levels 0..5.
#define LL0(s) LLN(0, s)
#define LL1(s) LLN(1, s)
#define LL2(s) LLN(2, s)
#define LL3(s) LLN(3, s)
#define LL4(s) LLN(4, s)
#define LL5(s) LLN(5, s)

// Stream x in hexadecimal, then switch the stream back to decimal.
#define HEX(x) hex << (x) << dec
270
271 // following check a condition and return -1 on failure
272
#undef CHK // simple check
#undef CHKTRY // check with action on fail
#undef CHKCON // print NDB API errors on failure

// CHK(x): CHKTRY with an empty failure action.
#define CHK(x) CHKTRY(x, ;)

// CHKTRY(x, act): if x is false, log the failed expression at level 0,
// abort() when g_opt.m_core is set, otherwise run act and return -1 from
// the enclosing function.
#define CHKTRY(x, act) \
  do { \
    if (x) break; \
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
    if (g_opt.m_core) abort(); \
    act; \
    return -1; \
  } while (0)

// CHKCON(x, con): like CHK, but first calls (con).printerror(ndbout),
// before the optional abort().
#define CHKCON(x, con) \
  do { \
    if (x) break; \
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
    (con).printerror(ndbout); \
    if (g_opt.m_core) abort(); \
    return -1; \
  } while (0)
296
297 // method parameters
298
299 struct Thr;
300 struct Con;
301 struct Tab;
302 struct ITab;
303 struct Set;
304 struct Tmr;
305
306 struct Par : public Opt {
307 uint m_no;
308 Con* m_con;
conPar309 Con& con() const { assert(m_con != 0); return *m_con; }
310 const Tab* m_tab;
tabPar311 const Tab& tab() const { assert(m_tab != 0); return *m_tab; }
312 const ITab* m_itab;
itabPar313 const ITab& itab() const { assert(m_itab != 0); return *m_itab; }
314 Set* m_set;
setPar315 Set& set() const { assert(m_set != 0); return *m_set; }
316 Tmr* m_tmr;
tmrPar317 Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
318 char m_currcase[2];
319 uint m_lno;
320 uint m_slno;
321 uint m_totrows;
322 // value calculation
323 uint m_range;
324 uint m_pctrange;
325 uint m_pctbrange;
326 int m_bdir;
327 bool m_noindexkeyupdate;
328 // choice of key
329 bool m_randomkey;
330 // do verify after read
331 bool m_verify;
332 // errors to catch (see Con)
333 uint m_catcherr;
334 // abort percentage
335 uint m_abortpct;
336 NdbOperation::LockMode m_lockmode;
337 // scan options
338 bool m_tupscan;
339 bool m_ordered;
340 bool m_descending;
341 bool m_multiRange;
342 // threads used by current test case
343 uint m_usedthreads;
ParPar344 Par(const Opt& opt) :
345 Opt(opt),
346 m_no(0),
347 m_con(0),
348 m_tab(0),
349 m_itab(0),
350 m_set(0),
351 m_tmr(0),
352 m_lno(0),
353 m_slno(0),
354 m_totrows(0),
355 m_range(m_rows),
356 m_pctrange(40),
357 m_pctbrange(80),
358 m_bdir(0),
359 m_noindexkeyupdate(false),
360 m_randomkey(false),
361 m_verify(false),
362 m_catcherr(0),
363 m_abortpct(0),
364 m_lockmode(NdbOperation::LM_Read),
365 m_tupscan(false),
366 m_ordered(false),
367 m_descending(false),
368 m_multiRange(false),
369 m_usedthreads(0)
370 {
371 m_currcase[0] = 0;
372 }
373 };
374
375 static bool
usetable(Par par,uint i)376 usetable(Par par, uint i)
377 {
378 return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
379 }
380
381 static bool
useindex(Par par,uint i)382 useindex(Par par, uint i)
383 {
384 return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
385 }
386
387 static uint
thrrow(Par par,uint j)388 thrrow(Par par, uint j)
389 {
390 return par.m_usedthreads * j + par.m_no;
391 }
392
// Disabled (compiled out): tests whether global row i belongs to thread
// par.m_no, i.e. the inverse check of thrrow().
#if 0
static bool
isthrrow(Par par, uint i)
{
  return i % par.m_usedthreads == par.m_no;
}
#endif
400
401 // timer
402
403 struct Tmr {
404 void clr();
405 void on();
406 void off(uint cnt = 0);
407 const char* time();
408 const char* pct(const Tmr& t1);
409 const char* over(const Tmr& t1);
410 NDB_TICKS m_on;
411 NDB_TICKS m_ms;
412 uint m_cnt;
413 char m_time[100];
414 char m_text[100];
TmrTmr415 Tmr() { clr(); }
416 };
417
418 void
clr()419 Tmr::clr()
420 {
421 m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
422 }
423
424 void
on()425 Tmr::on()
426 {
427 assert(m_on == 0);
428 m_on = NdbTick_CurrentMillisecond();
429 }
430
431 void
off(uint cnt)432 Tmr::off(uint cnt)
433 {
434 NDB_TICKS off = NdbTick_CurrentMillisecond();
435 assert(m_on != 0 && off >= m_on);
436 m_ms += off - m_on;
437 m_cnt += cnt;
438 m_on = 0;
439 }
440
441 const char*
time()442 Tmr::time()
443 {
444 if (m_cnt == 0) {
445 sprintf(m_time, "%u ms", (unsigned)m_ms);
446 } else {
447 sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", (unsigned)m_ms, m_cnt, (unsigned)((1000 * m_ms) / m_cnt));
448 }
449 return m_time;
450 }
451
452 const char*
pct(const Tmr & t1)453 Tmr::pct(const Tmr& t1)
454 {
455 if (0 < t1.m_ms) {
456 sprintf(m_text, "%u pct", (unsigned)((100 * m_ms) / t1.m_ms));
457 } else {
458 sprintf(m_text, "[cannot measure]");
459 }
460 return m_text;
461 }
462
463 const char*
over(const Tmr & t1)464 Tmr::over(const Tmr& t1)
465 {
466 if (0 < t1.m_ms) {
467 if (t1.m_ms <= m_ms)
468 sprintf(m_text, "%u pct", (unsigned)((100 * (m_ms - t1.m_ms)) / t1.m_ms));
469 else
470 sprintf(m_text, "-%u pct", (unsigned)((100 * (t1.m_ms - m_ms)) / t1.m_ms));
471 } else {
472 sprintf(m_text, "[cannot measure]");
473 }
474 return m_text;
475 }
476
477 // character sets
478
static const uint maxcsnumber = 512;  // size of cslist; exclusive bound of the random charset numbers tried
static const uint maxcharcount = 32;  // random chars generated per charset
static const uint maxcharsize = 4;    // bytes reserved for one multi-byte char
static const uint maxxmulsize = 8;    // largest strxfrm multiplier accepted
483
484 // single mb char
485 struct Chr {
486 uchar m_bytes[maxcharsize];
487 uchar m_xbytes[maxxmulsize * maxcharsize];
488 uint m_size;
489 Chr();
490 };
491
Chr()492 Chr::Chr()
493 {
494 memset(m_bytes, 0, sizeof(m_bytes));
495 memset(m_xbytes, 0, sizeof(m_xbytes));
496 m_size = 0;
497 }
498
499 // charset and random valid chars to use
500 struct Chs {
501 CHARSET_INFO* m_cs;
502 uint m_xmul;
503 Chr* m_chr;
504 Chs(CHARSET_INFO* cs);
505 ~Chs();
506 };
507
508 static NdbOut&
509 operator<<(NdbOut& out, const Chs& chs);
510
Chs(CHARSET_INFO * cs)511 Chs::Chs(CHARSET_INFO* cs) :
512 m_cs(cs)
513 {
514 m_xmul = m_cs->strxfrm_multiply;
515 if (m_xmul == 0)
516 m_xmul = 1;
517 assert(m_xmul <= maxxmulsize);
518 m_chr = new Chr [maxcharcount];
519 uint i = 0;
520 uint miss1 = 0;
521 uint miss2 = 0;
522 uint miss3 = 0;
523 uint miss4 = 0;
524 while (i < maxcharcount) {
525 uchar* bytes = m_chr[i].m_bytes;
526 uchar* xbytes = m_chr[i].m_xbytes;
527 uint& size = m_chr[i].m_size;
528 bool ok;
529 size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
530 assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
531 // prefer longer chars
532 if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
533 continue;
534 for (uint j = 0; j < size; j++) {
535 bytes[j] = urandom(256);
536 }
537 int not_used;
538 // check wellformed
539 const char* sbytes = (const char*)bytes;
540 if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1, ¬_used) != size) {
541 miss1++;
542 continue;
543 }
544 // check no proper prefix wellformed
545 ok = true;
546 for (uint j = 1; j < size; j++) {
547 if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1, ¬_used) == j) {
548 ok = false;
549 break;
550 }
551 }
552 if (!ok) {
553 miss2++;
554 continue;
555 }
556 // normalize
557 memset(xbytes, 0, sizeof(xbytes));
558 // currently returns buffer size always
559 int xlen = NdbSqlUtil::ndb_strnxfrm(cs, xbytes, m_xmul * size, bytes, size);
560 // check we got something
561 ok = false;
562 for (uint j = 0; j < (uint)xlen; j++) {
563 if (xbytes[j] != 0) {
564 ok = true;
565 break;
566 }
567 }
568 if (!ok) {
569 miss3++;
570 continue;
571 }
572 // check for duplicate (before normalize)
573 ok = true;
574 for (uint j = 0; j < i; j++) {
575 const Chr& chr = m_chr[j];
576 if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
577 ok = false;
578 break;
579 }
580 }
581 if (!ok) {
582 miss4++;
583 continue;
584 }
585 i++;
586 }
587 bool disorder = true;
588 uint bubbles = 0;
589 while (disorder) {
590 disorder = false;
591 for (uint i = 1; i < maxcharcount; i++) {
592 uint len = sizeof(m_chr[i].m_xbytes);
593 if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
594 Chr chr = m_chr[i];
595 m_chr[i] = m_chr[i-1];
596 m_chr[i-1] = chr;
597 disorder = true;
598 bubbles++;
599 }
600 }
601 }
602 LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
603 }
604
~Chs()605 Chs::~Chs()
606 {
607 delete [] m_chr;
608 }
609
610 static NdbOut&
operator <<(NdbOut & out,const Chs & chs)611 operator<<(NdbOut& out, const Chs& chs)
612 {
613 CHARSET_INFO* cs = chs.m_cs;
614 out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
615 return out;
616 }
617
618 static Chs* cslist[maxcsnumber];
619
620 static void
resetcslist()621 resetcslist()
622 {
623 for (uint i = 0; i < maxcsnumber; i++) {
624 delete cslist[i];
625 cslist[i] = 0;
626 }
627 }
628
629 static Chs*
getcs(Par par)630 getcs(Par par)
631 {
632 CHARSET_INFO* cs;
633 if (par.m_cs != 0) {
634 cs = par.m_cs;
635 } else {
636 while (1) {
637 uint n = urandom(maxcsnumber);
638 cs = get_charset(n, MYF(0));
639 if (cs != 0) {
640 // avoid dodgy internal character sets
641 // see bug# 37554
642 if (cs->state & MY_CS_HIDDEN)
643 continue;
644 // prefer complex charsets
645 if (cs->mbmaxlen != 1 || urandom(5) == 0)
646 break;
647 }
648 }
649 }
650 if (cslist[cs->number] == 0)
651 cslist[cs->number] = new Chs(cs);
652 return cslist[cs->number];
653 }
654
655 // tables and indexes
656
657 // Col - table column
658
659 struct Col {
660 enum Type {
661 Unsigned = NdbDictionary::Column::Unsigned,
662 Char = NdbDictionary::Column::Char,
663 Varchar = NdbDictionary::Column::Varchar,
664 Longvarchar = NdbDictionary::Column::Longvarchar
665 };
666 const struct Tab& m_tab;
667 uint m_num;
668 const char* m_name;
669 bool m_pk;
670 Type m_type;
671 uint m_length;
672 uint m_bytelength; // multiplied by char width
673 uint m_attrsize; // base type size
674 uint m_headsize; // length bytes
675 uint m_bytesize; // full value size
676 bool m_nullable;
677 const Chs* m_chs;
678 Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs);
679 ~Col();
680 bool equal(const Col& col2) const;
681 void wellformed(const void* addr) const;
682 };
683
Col(const struct Tab & tab,uint num,const char * name,bool pk,Type type,uint length,bool nullable,const Chs * chs)684 Col::Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs) :
685 m_tab(tab),
686 m_num(num),
687 m_name(strcpy(new char [strlen(name) + 1], name)),
688 m_pk(pk),
689 m_type(type),
690 m_length(length),
691 m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
692 m_attrsize(
693 type == Unsigned ? sizeof(Uint32) :
694 type == Char ? sizeof(char) :
695 type == Varchar ? sizeof(char) :
696 type == Longvarchar ? sizeof(char) : ~0),
697 m_headsize(
698 type == Unsigned ? 0 :
699 type == Char ? 0 :
700 type == Varchar ? 1 :
701 type == Longvarchar ? 2 : ~0),
702 m_bytesize(m_headsize + m_attrsize * m_bytelength),
703 m_nullable(nullable),
704 m_chs(chs)
705 {
706 // fix long varchar
707 if (type == Varchar && m_bytelength > 255) {
708 m_type = Longvarchar;
709 m_headsize += 1;
710 m_bytesize += 1;
711 }
712 }
713
~Col()714 Col::~Col()
715 {
716 delete [] m_name;
717 }
718
719 bool
equal(const Col & col2) const720 Col::equal(const Col& col2) const
721 {
722 return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
723 }
724
725 void
wellformed(const void * addr) const726 Col::wellformed(const void* addr) const
727 {
728 switch (m_type) {
729 case Col::Unsigned:
730 break;
731 case Col::Char:
732 {
733 CHARSET_INFO* cs = m_chs->m_cs;
734 const char* src = (const char*)addr;
735 uint len = m_bytelength;
736 int not_used;
737 (void)not_used; /* squash warning when assert is #defined to nothing */
738 assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff, ¬_used) == len);
739 }
740 break;
741 case Col::Varchar:
742 {
743 CHARSET_INFO* cs = m_chs->m_cs;
744 const uchar* src = (const uchar*)addr;
745 const char* ssrc = (const char*)src;
746 uint len = src[0];
747 int not_used;
748 (void)not_used; /* squash warning when assert is #defined to nothing */
749 assert(len <= m_bytelength);
750 assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff, ¬_used) == len);
751 }
752 break;
753 case Col::Longvarchar:
754 {
755 CHARSET_INFO* cs = m_chs->m_cs;
756 const uchar* src = (const uchar*)addr;
757 const char* ssrc = (const char*)src;
758 uint len = src[0] + (src[1] << 8);
759 int not_used;
760 (void)not_used; /* squash warning when assert is #defined to nothing */
761 assert(len <= m_bytelength);
762 assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff, ¬_used) == len);
763 }
764 break;
765 default:
766 assert(false);
767 break;
768 }
769 }
770
771 static NdbOut&
operator <<(NdbOut & out,const Col & col)772 operator<<(NdbOut& out, const Col& col)
773 {
774 out << "col[" << col.m_num << "] " << col.m_name;
775 switch (col.m_type) {
776 case Col::Unsigned:
777 out << " uint";
778 break;
779 case Col::Char:
780 {
781 CHARSET_INFO* cs = col.m_chs->m_cs;
782 out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
783 }
784 break;
785 case Col::Varchar:
786 {
787 CHARSET_INFO* cs = col.m_chs->m_cs;
788 out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
789 }
790 break;
791 case Col::Longvarchar:
792 {
793 CHARSET_INFO* cs = col.m_chs->m_cs;
794 out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
795 }
796 break;
797 default:
798 out << "type" << (int)col.m_type;
799 assert(false);
800 break;
801 }
802 out << (col.m_pk ? " pk" : "");
803 out << (col.m_nullable ? " nullable" : "");
804 return out;
805 }
806
807 // ICol - index column
808
809 struct ICol {
810 const struct ITab& m_itab;
811 uint m_num;
812 const Col& m_col;
813 ICol(const struct ITab& itab, uint num, const Col& col);
814 ~ICol();
815 };
816
ICol(const struct ITab & itab,uint num,const Col & col)817 ICol::ICol(const struct ITab& itab, uint num, const Col& col) :
818 m_itab(itab),
819 m_num(num),
820 m_col(col)
821 {
822 }
823
~ICol()824 ICol::~ICol()
825 {
826 }
827
828 static NdbOut&
operator <<(NdbOut & out,const ICol & icol)829 operator<<(NdbOut& out, const ICol& icol)
830 {
831 out << "icol[" << icol.m_num << "] " << icol.m_col;
832 return out;
833 }
834
835 // ITab - index
836
837 struct ITab {
838 enum Type {
839 OrderedIndex = NdbDictionary::Index::OrderedIndex,
840 UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
841 };
842 const struct Tab& m_tab;
843 const char* m_name;
844 Type m_type;
845 uint m_icols;
846 const ICol** m_icol;
847 uint m_keymask;
848 ITab(const struct Tab& tab, const char* name, Type type, uint icols);
849 ~ITab();
850 void icoladd(uint k, const ICol* icolptr);
851 };
852
ITab(const struct Tab & tab,const char * name,Type type,uint icols)853 ITab::ITab(const struct Tab& tab, const char* name, Type type, uint icols) :
854 m_tab(tab),
855 m_name(strcpy(new char [strlen(name) + 1], name)),
856 m_type(type),
857 m_icols(icols),
858 m_icol(new const ICol* [icols + 1]),
859 m_keymask(0)
860 {
861 for (uint k = 0; k <= m_icols; k++)
862 m_icol[k] = 0;
863 }
864
~ITab()865 ITab::~ITab()
866 {
867 delete [] m_name;
868 for (uint i = 0; i < m_icols; i++)
869 delete m_icol[i];
870 delete [] m_icol;
871 }
872
873 void
icoladd(uint k,const ICol * icolptr)874 ITab::icoladd(uint k, const ICol* icolptr)
875 {
876 assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
877 m_icol[k] = icolptr;
878 m_keymask |= (1 << icolptr->m_col.m_num);
879 }
880
881 static NdbOut&
operator <<(NdbOut & out,const ITab & itab)882 operator<<(NdbOut& out, const ITab& itab)
883 {
884 out << "itab " << itab.m_name << " icols=" << itab.m_icols;
885 for (uint k = 0; k < itab.m_icols; k++) {
886 const ICol& icol = *itab.m_icol[k];
887 out << endl << icol;
888 }
889 return out;
890 }
891
892 // Tab - table
893
894 struct Tab {
895 const char* m_name;
896 uint m_cols;
897 const Col** m_col;
898 uint m_pkmask;
899 uint m_itabs;
900 const ITab** m_itab;
901 uint m_orderedindexes;
902 uint m_hashindexes;
903 // pk must contain an Unsigned column
904 uint m_keycol;
905 void coladd(uint k, Col* colptr);
906 void itabadd(uint j, ITab* itab);
907 Tab(const char* name, uint cols, uint itabs, uint keycol);
908 ~Tab();
909 };
910
Tab(const char * name,uint cols,uint itabs,uint keycol)911 Tab::Tab(const char* name, uint cols, uint itabs, uint keycol) :
912 m_name(strcpy(new char [strlen(name) + 1], name)),
913 m_cols(cols),
914 m_col(new const Col* [cols + 1]),
915 m_pkmask(0),
916 m_itabs(itabs),
917 m_itab(new const ITab* [itabs + 1]),
918 m_orderedindexes(0),
919 m_hashindexes(0),
920 m_keycol(keycol)
921 {
922 for (uint k = 0; k <= cols; k++)
923 m_col[k] = 0;
924 for (uint j = 0; j <= itabs; j++)
925 m_itab[j] = 0;
926 }
927
~Tab()928 Tab::~Tab()
929 {
930 delete [] m_name;
931 for (uint i = 0; i < m_cols; i++)
932 delete m_col[i];
933 delete [] m_col;
934 for (uint i = 0; i < m_itabs; i++)
935 delete m_itab[i];
936 delete [] m_itab;
937 }
938
939 void
coladd(uint k,Col * colptr)940 Tab::coladd(uint k, Col* colptr)
941 {
942 assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
943 m_col[k] = colptr;
944 if (colptr->m_pk)
945 m_pkmask |= (1 << k);
946 }
947
948 void
itabadd(uint j,ITab * itabptr)949 Tab::itabadd(uint j, ITab* itabptr)
950 {
951 assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
952 m_itab[j] = itabptr;
953 if (itabptr->m_type == ITab::OrderedIndex)
954 m_orderedindexes++;
955 else
956 m_hashindexes++;
957 }
958
959 static NdbOut&
operator <<(NdbOut & out,const Tab & tab)960 operator<<(NdbOut& out, const Tab& tab)
961 {
962 out << "tab " << tab.m_name << " cols=" << tab.m_cols;
963 for (uint k = 0; k < tab.m_cols; k++) {
964 const Col& col = *tab.m_col[k];
965 out << endl << col;
966 }
967 for (uint i = 0; i < tab.m_itabs; i++) {
968 if (tab.m_itab[i] == 0)
969 continue;
970 const ITab& itab = *tab.m_itab[i];
971 out << endl << itab;
972 }
973 return out;
974 }
975
976 // make table structs
977
978 static const Tab** tablist = 0;
979 static uint tabcount = 0;
980
981 static void
verifytables()982 verifytables()
983 {
984 for (uint j = 0; j < tabcount; j++) {
985 const Tab* t = tablist[j];
986 if (t == 0)
987 continue;
988 assert(t->m_cols != 0 && t->m_col != 0);
989 for (uint k = 0; k < t->m_cols; k++) {
990 const Col* c = t->m_col[k];
991 assert(c != 0 && c->m_num == k);
992 assert(!(c->m_pk && c->m_nullable));
993 }
994 assert(t->m_col[t->m_cols] == 0);
995 {
996 assert(t->m_keycol < t->m_cols);
997 const Col* c = t->m_col[t->m_keycol];
998 assert(c->m_pk && c->m_type == Col::Unsigned);
999 }
1000 assert(t->m_itabs != 0 && t->m_itab != 0);
1001 for (uint i = 0; i < t->m_itabs; i++) {
1002 const ITab* x = t->m_itab[i];
1003 if (x == 0)
1004 continue;
1005 assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
1006 for (uint k = 0; k < x->m_icols; k++) {
1007 const ICol* c = x->m_icol[k];
1008 assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
1009 if (x->m_type == ITab::UniqueHashIndex) {
1010 assert(!c->m_col.m_nullable);
1011 }
1012 }
1013 }
1014 assert(t->m_itab[t->m_itabs] == 0);
1015 }
1016 }
1017
// (Re)build the global tablist with the three built-in tables ti0, ti1
// and ti2.  The charset cache is reset first, so the charsets of the
// character columns are chosen again through getcs().  A table is only
// built if usetable() selects it and an index only if useindex() selects
// it; unselected entries stay 0.  Finishes with verifytables().
//
// Index names use "x" for ordered indexes and "z" for unique hash
// indexes.  Every table reserves 7 index slots, of which slot 6 is never
// filled here.
static void
makebuiltintables(Par par)
{
  LL2("makebuiltintables");
  resetcslist();
  tabcount = 3;
  if (tablist == 0) {
    // first call: allocate the list
    tablist = new const Tab* [tabcount];
    for (uint j = 0; j < tabcount; j++) {
      tablist[j] = 0;
    }
  } else {
    // later calls: destroy the previous definitions
    for (uint j = 0; j < tabcount; j++) {
      delete tablist[j];
      tablist[j] = 0;
    }
  }
  // ti0 - basic
  if (usetable(par, 0)) {
    // Tab arguments: name, column count, index slots, key column
    Tab* t = new Tab("ti0", 5, 7, 0);
    // name - pk - type - length - nullable - cs
    t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
    t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
    t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
    t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
    if (useindex(par, 0)) {
      // a
      ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      t->itabadd(0, x);
    }
    if (useindex(par, 1)) {
      // b
      ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      t->itabadd(1, x);
    }
    if (useindex(par, 2)) {
      // b, c
      ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      t->itabadd(2, x);
    }
    if (useindex(par, 3)) {
      // b, e, c, d
      ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
      x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
      t->itabadd(3, x);
    }
    if (useindex(par, 4)) {
      // a, c
      ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      t->itabadd(4, x);
    }
    if (useindex(par, 5)) {
      // a, e
      ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
      t->itabadd(5, x);
    }
    tablist[0] = t;
  }
  // ti1 - simple char fields
  if (usetable(par, 1)) {
    Tab* t = new Tab("ti1", 5, 7, 1);
    // name - pk - type - length - nullable - cs
    t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
    t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
    t->coladd(2, new Col(*t, 2, "c", 0, Col::Varchar, 20, 0, getcs(par)));
    t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par)));
    if (useindex(par, 0)) {
      // b
      ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      t->itabadd(0, x);
    }
    if (useindex(par, 1)) {
      // c, a
      ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
      t->itabadd(1, x);
    }
    if (useindex(par, 2)) {
      // d
      ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
      t->itabadd(2, x);
    }
    if (useindex(par, 3)) {
      // e, d, c, b
      ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
      x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
      t->itabadd(3, x);
    }
    if (useindex(par, 4)) {
      // a, b
      ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
      t->itabadd(4, x);
    }
    if (useindex(par, 5)) {
      // b, c, d
      ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
      t->itabadd(5, x);
    }
    tablist[1] = t;
  }
  // ti2 - complex char fields
  if (usetable(par, 2)) {
    Tab* t = new Tab("ti2", 5, 7, 2);
    // name - pk - type - length - nullable - cs
    t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
    t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
    t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
    t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par)));
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par)));
    if (useindex(par, 0)) {
      // a, c, d
      ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
      t->itabadd(0, x);
    }
    if (useindex(par, 1)) {
      // e, d, c, b, a
      ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
      x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
      x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
      t->itabadd(1, x);
    }
    if (useindex(par, 2)) {
      // d
      ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
      t->itabadd(2, x);
    }
    if (useindex(par, 3)) {
      // b
      ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
      t->itabadd(3, x);
    }
    if (useindex(par, 4)) {
      // a, c
      ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      t->itabadd(4, x);
    }
    if (useindex(par, 5)) {
      // a, c, d, e
      ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4);
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
      x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
      t->itabadd(5, x);
    }
    tablist[2] = t;
  }
  verifytables();
}
1201
1202 // connections
1203
// cluster connection shared by all Con objects; Con::connect() hands it
// to every Ndb object it creates
static Ndb_cluster_connection* g_ncc = 0;
1205
1206 struct Con {
1207 Ndb* m_ndb;
1208 NdbDictionary::Dictionary* m_dic;
1209 NdbTransaction* m_tx;
1210 Uint64 m_txid;
1211 NdbOperation* m_op;
1212 NdbIndexOperation* m_indexop;
1213 NdbScanOperation* m_scanop;
1214 NdbIndexScanOperation* m_indexscanop;
1215 NdbScanFilter* m_scanfilter;
1216 enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
1217 ScanMode m_scanmode;
1218 enum ErrType {
1219 ErrNone = 0,
1220 ErrDeadlock = 1,
1221 ErrNospace = 2,
1222 ErrOther = 4
1223 };
1224 ErrType m_errtype;
1225 char m_errname[100];
ConCon1226 Con() :
1227 m_ndb(0), m_dic(0), m_tx(0), m_txid(0), m_op(0), m_indexop(0),
1228 m_scanop(0), m_indexscanop(0), m_scanfilter(0),
1229 m_scanmode(ScanNo), m_errtype(ErrNone) {}
~ConCon1230 ~Con() {
1231 if (m_tx != 0)
1232 closeTransaction();
1233 }
1234 int connect();
1235 void connect(const Con& con);
1236 void disconnect();
1237 int startTransaction();
1238 int getNdbOperation(const Tab& tab);
1239 int getNdbIndexOperation1(const ITab& itab, const Tab& tab);
1240 int getNdbIndexOperation(const ITab& itab, const Tab& tab);
1241 int getNdbScanOperation(const Tab& tab);
1242 int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
1243 int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
1244 int getNdbScanFilter();
1245 int equal(int num, const char* addr);
1246 int getValue(int num, NdbRecAttr*& rec);
1247 int setValue(int num, const char* addr);
1248 int setBound(int num, int type, const void* value);
1249 int beginFilter(int group);
1250 int endFilter();
1251 int setFilter(int num, int cond, const void* value, uint len);
1252 int execute(ExecType et);
1253 int execute(ExecType et, uint& err);
1254 int readTuple(Par par);
1255 int readTuples(Par par);
1256 int readIndexTuples(Par par);
1257 int executeScan();
1258 int nextScanResult(bool fetchAllowed);
1259 int nextScanResult(bool fetchAllowed, uint& err);
1260 int updateScanTuple(Con& con2);
1261 int deleteScanTuple(Con& con2);
1262 void closeScan();
1263 void closeTransaction();
1264 const char* errname(uint err);
1265 void printerror(NdbOut& out);
1266 };
1267
1268 int
connect()1269 Con::connect()
1270 {
1271 assert(m_ndb == 0);
1272 m_ndb = new Ndb(g_ncc, "TEST_DB");
1273 CHKCON(m_ndb->init() == 0, *this);
1274 CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
1275 m_tx = 0, m_txid = 0, m_op = 0;
1276 return 0;
1277 }
1278
1279 void
connect(const Con & con)1280 Con::connect(const Con& con)
1281 {
1282 assert(m_ndb == 0);
1283 m_ndb = con.m_ndb;
1284 }
1285
1286 void
disconnect()1287 Con::disconnect()
1288 {
1289 delete m_ndb;
1290 m_ndb = 0, m_dic = 0, m_tx = 0, m_txid = 0, m_op = 0;
1291 }
1292
1293 int
startTransaction()1294 Con::startTransaction()
1295 {
1296 assert(m_ndb != 0);
1297 if (m_tx != 0)
1298 closeTransaction();
1299 CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
1300 m_txid = m_tx->getTransactionId();
1301 return 0;
1302 }
1303
1304 int
getNdbOperation(const Tab & tab)1305 Con::getNdbOperation(const Tab& tab)
1306 {
1307 assert(m_tx != 0);
1308 CHKCON((m_op = m_tx->getNdbOperation(tab.m_name)) != 0, *this);
1309 return 0;
1310 }
1311
1312 int
getNdbIndexOperation1(const ITab & itab,const Tab & tab)1313 Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab)
1314 {
1315 assert(m_tx != 0);
1316 CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
1317 return 0;
1318 }
1319
1320 int
getNdbIndexOperation(const ITab & itab,const Tab & tab)1321 Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
1322 {
1323 assert(m_tx != 0);
1324 uint tries = 0;
1325 while (1) {
1326 if (getNdbIndexOperation1(itab, tab) == 0)
1327 break;
1328 CHK(++tries < 10);
1329 NdbSleep_MilliSleep(100);
1330 }
1331 return 0;
1332 }
1333
1334 int
getNdbScanOperation(const Tab & tab)1335 Con::getNdbScanOperation(const Tab& tab)
1336 {
1337 assert(m_tx != 0);
1338 CHKCON((m_op = m_scanop = m_tx->getNdbScanOperation(tab.m_name)) != 0, *this);
1339 return 0;
1340 }
1341
1342 int
getNdbIndexScanOperation1(const ITab & itab,const Tab & tab)1343 Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab)
1344 {
1345 assert(m_tx != 0);
1346 CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this);
1347 return 0;
1348 }
1349
1350 int
getNdbIndexScanOperation(const ITab & itab,const Tab & tab)1351 Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
1352 {
1353 assert(m_tx != 0);
1354 uint tries = 0;
1355 while (1) {
1356 if (getNdbIndexScanOperation1(itab, tab) == 0)
1357 break;
1358 CHK(++tries < 10);
1359 NdbSleep_MilliSleep(100);
1360 }
1361 return 0;
1362 }
1363
1364 int
getNdbScanFilter()1365 Con::getNdbScanFilter()
1366 {
1367 assert(m_tx != 0 && m_scanop != 0);
1368 delete m_scanfilter;
1369 m_scanfilter = new NdbScanFilter(m_scanop);
1370 return 0;
1371 }
1372
1373 int
equal(int num,const char * addr)1374 Con::equal(int num, const char* addr)
1375 {
1376 assert(m_tx != 0 && m_op != 0);
1377 CHKCON(m_op->equal(num, addr) == 0, *this);
1378 return 0;
1379 }
1380
1381 int
getValue(int num,NdbRecAttr * & rec)1382 Con::getValue(int num, NdbRecAttr*& rec)
1383 {
1384 assert(m_tx != 0 && m_op != 0);
1385 CHKCON((rec = m_op->getValue(num, 0)) != 0, *this);
1386 return 0;
1387 }
1388
1389 int
setValue(int num,const char * addr)1390 Con::setValue(int num, const char* addr)
1391 {
1392 assert(m_tx != 0 && m_op != 0);
1393 CHKCON(m_op->setValue(num, addr) == 0, *this);
1394 return 0;
1395 }
1396
1397 int
setBound(int num,int type,const void * value)1398 Con::setBound(int num, int type, const void* value)
1399 {
1400 assert(m_tx != 0 && m_indexscanop != 0);
1401 CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
1402 return 0;
1403 }
1404
1405 int
beginFilter(int group)1406 Con::beginFilter(int group)
1407 {
1408 assert(m_tx != 0 && m_scanfilter != 0);
1409 CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
1410 return 0;
1411 }
1412
1413 int
endFilter()1414 Con::endFilter()
1415 {
1416 assert(m_tx != 0 && m_scanfilter != 0);
1417 CHKCON(m_scanfilter->end() == 0, *this);
1418 return 0;
1419 }
1420
1421 int
setFilter(int num,int cond,const void * value,uint len)1422 Con::setFilter(int num, int cond, const void* value, uint len)
1423 {
1424 assert(m_tx != 0 && m_scanfilter != 0);
1425 CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
1426 return 0;
1427 }
1428
1429 int
execute(ExecType et)1430 Con::execute(ExecType et)
1431 {
1432 assert(m_tx != 0);
1433 CHKCON(m_tx->execute(et) == 0, *this);
1434 return 0;
1435 }
1436
1437 int
execute(ExecType et,uint & err)1438 Con::execute(ExecType et, uint& err)
1439 {
1440 int ret = execute(et);
1441 // err in: errors to catch, out: error caught
1442 const uint errin = err;
1443 err = 0;
1444 if (ret == -1) {
1445 if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1446 LL3("caught deadlock");
1447 err = ErrDeadlock;
1448 ret = 0;
1449 }
1450 if (m_errtype == ErrNospace && (errin & ErrNospace)) {
1451 LL3("caught nospace");
1452 err = ErrNospace;
1453 ret = 0;
1454 }
1455 }
1456 CHK(ret == 0);
1457 return 0;
1458 }
1459
1460 int
readTuple(Par par)1461 Con::readTuple(Par par)
1462 {
1463 assert(m_tx != 0 && m_op != 0);
1464 NdbOperation::LockMode lm = par.m_lockmode;
1465 CHKCON(m_op->readTuple(lm) == 0, *this);
1466 return 0;
1467 }
1468
1469 int
readTuples(Par par)1470 Con::readTuples(Par par)
1471 {
1472 assert(m_tx != 0 && m_scanop != 0);
1473 int scan_flags = 0;
1474 if (par.m_tupscan)
1475 scan_flags |= NdbScanOperation::SF_TupScan;
1476 CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1477 return 0;
1478 }
1479
1480 int
readIndexTuples(Par par)1481 Con::readIndexTuples(Par par)
1482 {
1483 assert(m_tx != 0 && m_indexscanop != 0);
1484 int scan_flags = 0;
1485 if (par.m_ordered)
1486 scan_flags |= NdbScanOperation::SF_OrderBy;
1487 if (par.m_descending)
1488 scan_flags |= NdbScanOperation::SF_Descending;
1489 if (par.m_multiRange)
1490 {
1491 scan_flags |= NdbScanOperation::SF_MultiRange;
1492 scan_flags |= NdbScanOperation::SF_ReadRangeNo;
1493 }
1494 CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1495 return 0;
1496 }
1497
1498 int
executeScan()1499 Con::executeScan()
1500 {
1501 CHKCON(m_tx->execute(NoCommit) == 0, *this);
1502 return 0;
1503 }
1504
1505 int
nextScanResult(bool fetchAllowed)1506 Con::nextScanResult(bool fetchAllowed)
1507 {
1508 int ret;
1509 assert(m_scanop != 0);
1510 CHKCON((ret = m_scanop->nextResult(fetchAllowed)) != -1, *this);
1511 assert(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1512 return ret;
1513 }
1514
1515 int
nextScanResult(bool fetchAllowed,uint & err)1516 Con::nextScanResult(bool fetchAllowed, uint& err)
1517 {
1518 int ret = nextScanResult(fetchAllowed);
1519 // err in: errors to catch, out: error caught
1520 const uint errin = err;
1521 err = 0;
1522 if (ret == -1) {
1523 if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1524 LL3("caught deadlock");
1525 err = ErrDeadlock;
1526 ret = 0;
1527 }
1528 }
1529 CHK(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1530 return ret;
1531 }
1532
1533 int
updateScanTuple(Con & con2)1534 Con::updateScanTuple(Con& con2)
1535 {
1536 assert(con2.m_tx != 0);
1537 CHKCON((con2.m_op = m_scanop->updateCurrentTuple(con2.m_tx)) != 0, *this);
1538 con2.m_txid = m_txid; // in the kernel
1539 return 0;
1540 }
1541
1542 int
deleteScanTuple(Con & con2)1543 Con::deleteScanTuple(Con& con2)
1544 {
1545 assert(con2.m_tx != 0);
1546 CHKCON(m_scanop->deleteCurrentTuple(con2.m_tx) == 0, *this);
1547 con2.m_txid = m_txid; // in the kernel
1548 return 0;
1549 }
1550
1551 void
closeScan()1552 Con::closeScan()
1553 {
1554 assert(m_scanop != 0);
1555 m_scanop->close();
1556 m_scanop = 0, m_indexscanop = 0;
1557
1558 }
1559
1560 void
closeTransaction()1561 Con::closeTransaction()
1562 {
1563 assert(m_ndb != 0 && m_tx != 0);
1564 m_ndb->closeTransaction(m_tx);
1565 m_tx = 0, m_txid = 0, m_op = 0;
1566 m_scanop = 0, m_indexscanop = 0;
1567 }
1568
1569 const char*
errname(uint err)1570 Con::errname(uint err)
1571 {
1572 sprintf(m_errname, "0x%x", err);
1573 if (err & ErrDeadlock)
1574 strcat(m_errname, ",deadlock");
1575 if (err & ErrNospace)
1576 strcat(m_errname, ",nospace");
1577 return m_errname;
1578 }
1579
1580 void
printerror(NdbOut & out)1581 Con::printerror(NdbOut& out)
1582 {
1583 m_errtype = ErrOther;
1584 uint any = 0;
1585 int code;
1586 int die = 0;
1587 if (m_ndb) {
1588 if ((code = m_ndb->getNdbError().code) != 0) {
1589 LL0(++any << " ndb: error " << m_ndb->getNdbError());
1590 die += (code == g_opt.m_die);
1591 }
1592 if (m_dic && (code = m_dic->getNdbError().code) != 0) {
1593 LL0(++any << " dic: error " << m_dic->getNdbError());
1594 die += (code == g_opt.m_die);
1595 }
1596 if (m_tx) {
1597 if ((code = m_tx->getNdbError().code) != 0) {
1598 LL0(++any << " con: error " << m_tx->getNdbError());
1599 die += (code == g_opt.m_die);
1600 // 631 is new, occurs only on 4 db nodes, needs to be checked out
1601 if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
1602 m_errtype = ErrDeadlock;
1603 if (code == 826 || code == 827 || code == 902)
1604 m_errtype = ErrNospace;
1605 }
1606 if (m_op && m_op->getNdbError().code != 0) {
1607 LL0(++any << " op : error " << m_op->getNdbError());
1608 die += (code == g_opt.m_die);
1609 }
1610 }
1611 }
1612 if (!any) {
1613 LL0("failed but no NDB error code");
1614 }
1615 if (die) {
1616 if (g_opt.m_core)
1617 abort();
1618 exit(1);
1619 }
1620 }
1621
1622 // dictionary operations
1623
1624 static int
invalidateindex(Par par,const ITab & itab)1625 invalidateindex(Par par, const ITab& itab)
1626 {
1627 Con& con = par.con();
1628 const Tab& tab = par.tab();
1629 con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
1630 return 0;
1631 }
1632
1633 static int
invalidateindex(Par par)1634 invalidateindex(Par par)
1635 {
1636 const Tab& tab = par.tab();
1637 for (uint i = 0; i < tab.m_itabs; i++) {
1638 if (tab.m_itab[i] == 0)
1639 continue;
1640 const ITab& itab = *tab.m_itab[i];
1641 invalidateindex(par, itab);
1642 }
1643 return 0;
1644 }
1645
1646 static int
invalidatetable(Par par)1647 invalidatetable(Par par)
1648 {
1649 Con& con = par.con();
1650 const Tab& tab = par.tab();
1651 invalidateindex(par);
1652 con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
1653 return 0;
1654 }
1655
1656 static int
droptable(Par par)1657 droptable(Par par)
1658 {
1659 Con& con = par.con();
1660 const Tab& tab = par.tab();
1661 con.m_dic = con.m_ndb->getDictionary();
1662 if (con.m_dic->getTable(tab.m_name) == 0) {
1663 // how to check for error
1664 LL4("no table " << tab.m_name);
1665 } else {
1666 LL3("drop table " << tab.m_name);
1667 CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
1668 }
1669 con.m_dic = 0;
1670 return 0;
1671 }
1672
1673 static int
createtable(Par par)1674 createtable(Par par)
1675 {
1676 Con& con = par.con();
1677 const Tab& tab = par.tab();
1678 LL3("create table " << tab.m_name);
1679 LL4(tab);
1680 NdbDictionary::Table t(tab.m_name);
1681 if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
1682 t.setFragmentType(par.m_fragtype);
1683 }
1684 if (par.m_nologging) {
1685 t.setLogging(false);
1686 }
1687 for (uint k = 0; k < tab.m_cols; k++) {
1688 const Col& col = *tab.m_col[k];
1689 NdbDictionary::Column c(col.m_name);
1690 c.setType((NdbDictionary::Column::Type)col.m_type);
1691 c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
1692 c.setPrimaryKey(col.m_pk);
1693 c.setNullable(col.m_nullable);
1694 if (col.m_chs != 0)
1695 c.setCharset(col.m_chs->m_cs);
1696 t.addColumn(c);
1697 }
1698 con.m_dic = con.m_ndb->getDictionary();
1699 CHKCON(con.m_dic->createTable(t) == 0, con);
1700 con.m_dic = 0;
1701 return 0;
1702 }
1703
1704 static int
dropindex(Par par,const ITab & itab)1705 dropindex(Par par, const ITab& itab)
1706 {
1707 Con& con = par.con();
1708 const Tab& tab = par.tab();
1709 con.m_dic = con.m_ndb->getDictionary();
1710 if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
1711 // how to check for error
1712 LL4("no index " << itab.m_name);
1713 } else {
1714 LL3("drop index " << itab.m_name);
1715 CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
1716 }
1717 con.m_dic = 0;
1718 return 0;
1719 }
1720
1721 static int
dropindex(Par par)1722 dropindex(Par par)
1723 {
1724 const Tab& tab = par.tab();
1725 for (uint i = 0; i < tab.m_itabs; i++) {
1726 if (tab.m_itab[i] == 0)
1727 continue;
1728 const ITab& itab = *tab.m_itab[i];
1729 CHK(dropindex(par, itab) == 0);
1730 }
1731 return 0;
1732 }
1733
1734 static int
createindex(Par par,const ITab & itab)1735 createindex(Par par, const ITab& itab)
1736 {
1737 Con& con = par.con();
1738 const Tab& tab = par.tab();
1739 LL3("create index " << itab.m_name);
1740 LL4(itab);
1741 NdbDictionary::Index x(itab.m_name);
1742 x.setTable(tab.m_name);
1743 x.setType((NdbDictionary::Index::Type)itab.m_type);
1744 if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
1745 x.setLogging(false);
1746 }
1747 for (uint k = 0; k < itab.m_icols; k++) {
1748 const ICol& icol = *itab.m_icol[k];
1749 const Col& col = icol.m_col;
1750 x.addColumnName(col.m_name);
1751 }
1752 con.m_dic = con.m_ndb->getDictionary();
1753 CHKCON(con.m_dic->createIndex(x) == 0, con);
1754 con.m_dic = 0;
1755 return 0;
1756 }
1757
1758 static int
createindex(Par par)1759 createindex(Par par)
1760 {
1761 const Tab& tab = par.tab();
1762 for (uint i = 0; i < tab.m_itabs; i++) {
1763 if (tab.m_itab[i] == 0)
1764 continue;
1765 const ITab& itab = *tab.m_itab[i];
1766 CHK(createindex(par, itab) == 0);
1767 }
1768 return 0;
1769 }
1770
1771 // data sets
1772
1773 // Val - typed column value
1774
1775 struct Val {
1776 const Col& m_col;
1777 union {
1778 Uint32 m_uint32;
1779 uchar* m_char;
1780 uchar* m_varchar;
1781 uchar* m_longvarchar;
1782 };
1783 bool m_null;
1784 // construct
1785 Val(const Col& col);
1786 ~Val();
1787 void copy(const Val& val2);
1788 void copy(const void* addr);
1789 const void* dataaddr() const;
1790 void calc(Par par, uint i);
1791 void calckey(Par par, uint i);
1792 void calckeychars(Par par, uint i, uint& n, uchar* buf);
1793 void calcnokey(Par par);
1794 void calcnokeychars(Par par, uint& n, uchar* buf);
1795 // operations
1796 int setval(Par par) const;
1797 int setval(Par par, const ICol& icol) const;
1798 // compare
1799 int cmp(Par par, const Val& val2) const;
1800 int cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const;
1801 int verify(Par par, const Val& val2) const;
1802 private:
1803 Val& operator=(const Val& val2);
1804 };
1805
1806 static NdbOut&
1807 operator<<(NdbOut& out, const Val& val);
1808
1809 // construct
1810
Val(const Col & col)1811 Val::Val(const Col& col) :
1812 m_col(col)
1813 {
1814 switch (col.m_type) {
1815 case Col::Unsigned:
1816 m_uint32 = 0x7e7e7e7e;
1817 break;
1818 case Col::Char:
1819 m_char = new uchar [col.m_bytelength];
1820 memset(m_char, 0x7e, col.m_bytelength);
1821 break;
1822 case Col::Varchar:
1823 m_varchar = new uchar [1 + col.m_bytelength];
1824 memset(m_char, 0x7e, 1 + col.m_bytelength);
1825 break;
1826 case Col::Longvarchar:
1827 m_longvarchar = new uchar [2 + col.m_bytelength];
1828 memset(m_char, 0x7e, 2 + col.m_bytelength);
1829 break;
1830 default:
1831 assert(false);
1832 break;
1833 }
1834 }
1835
~Val()1836 Val::~Val()
1837 {
1838 const Col& col = m_col;
1839 switch (col.m_type) {
1840 case Col::Unsigned:
1841 break;
1842 case Col::Char:
1843 delete [] m_char;
1844 break;
1845 case Col::Varchar:
1846 delete [] m_varchar;
1847 break;
1848 case Col::Longvarchar:
1849 delete [] m_longvarchar;
1850 break;
1851 default:
1852 assert(false);
1853 break;
1854 }
1855 }
1856
1857 void
copy(const Val & val2)1858 Val::copy(const Val& val2)
1859 {
1860 const Col& col = m_col;
1861 const Col& col2 = val2.m_col;
1862 assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
1863 if (val2.m_null) {
1864 m_null = true;
1865 return;
1866 }
1867 copy(val2.dataaddr());
1868 }
1869
1870 void
copy(const void * addr)1871 Val::copy(const void* addr)
1872 {
1873 const Col& col = m_col;
1874 switch (col.m_type) {
1875 case Col::Unsigned:
1876 m_uint32 = *(const Uint32*)addr;
1877 break;
1878 case Col::Char:
1879 memcpy(m_char, addr, col.m_bytelength);
1880 break;
1881 case Col::Varchar:
1882 memcpy(m_varchar, addr, 1 + col.m_bytelength);
1883 break;
1884 case Col::Longvarchar:
1885 memcpy(m_longvarchar, addr, 2 + col.m_bytelength);
1886 break;
1887 default:
1888 assert(false);
1889 break;
1890 }
1891 m_null = false;
1892 }
1893
1894 const void*
dataaddr() const1895 Val::dataaddr() const
1896 {
1897 const Col& col = m_col;
1898 switch (col.m_type) {
1899 case Col::Unsigned:
1900 return &m_uint32;
1901 case Col::Char:
1902 return m_char;
1903 case Col::Varchar:
1904 return m_varchar;
1905 case Col::Longvarchar:
1906 return m_longvarchar;
1907 default:
1908 break;
1909 }
1910 assert(false);
1911 return 0;
1912 }
1913
1914 void
calc(Par par,uint i)1915 Val::calc(Par par, uint i)
1916 {
1917 const Col& col = m_col;
1918 col.m_pk ? calckey(par, i) : calcnokey(par);
1919 if (!m_null)
1920 col.wellformed(dataaddr());
1921 }
1922
1923 void
calckey(Par par,uint i)1924 Val::calckey(Par par, uint i)
1925 {
1926 const Col& col = m_col;
1927 m_null = false;
1928 switch (col.m_type) {
1929 case Col::Unsigned:
1930 m_uint32 = i;
1931 break;
1932 case Col::Char:
1933 {
1934 const Chs* chs = col.m_chs;
1935 CHARSET_INFO* cs = chs->m_cs;
1936 uint n = 0;
1937 calckeychars(par, i, n, m_char);
1938 // extend by appropriate space
1939 (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
1940 }
1941 break;
1942 case Col::Varchar:
1943 {
1944 uint n = 0;
1945 calckeychars(par, i, n, m_varchar + 1);
1946 // set length and pad with nulls
1947 m_varchar[0] = n;
1948 memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
1949 }
1950 break;
1951 case Col::Longvarchar:
1952 {
1953 uint n = 0;
1954 calckeychars(par, i, n, m_longvarchar + 2);
1955 // set length and pad with nulls
1956 m_longvarchar[0] = (n & 0xff);
1957 m_longvarchar[1] = (n >> 8);
1958 memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
1959 }
1960 break;
1961 default:
1962 assert(false);
1963 break;
1964 }
1965 }
1966
1967 void
calckeychars(Par par,uint i,uint & n,uchar * buf)1968 Val::calckeychars(Par par, uint i, uint& n, uchar* buf)
1969 {
1970 const Col& col = m_col;
1971 const Chs* chs = col.m_chs;
1972 n = 0;
1973 uint len = 0;
1974 while (len < col.m_length) {
1975 if (i % (1 + n) == 0) {
1976 break;
1977 }
1978 const Chr& chr = chs->m_chr[i % maxcharcount];
1979 assert(n + chr.m_size <= col.m_bytelength);
1980 memcpy(buf + n, chr.m_bytes, chr.m_size);
1981 n += chr.m_size;
1982 len++;
1983 }
1984 }
1985
1986 void
calcnokey(Par par)1987 Val::calcnokey(Par par)
1988 {
1989 const Col& col = m_col;
1990 m_null = false;
1991 if (col.m_nullable && urandom(100) < par.m_pctnull) {
1992 m_null = true;
1993 return;
1994 }
1995 int r = irandom((par.m_pctrange * par.m_range) / 100);
1996 if (par.m_bdir != 0 && urandom(10) != 0) {
1997 if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
1998 r = -r;
1999 }
2000 uint v = par.m_range + r;
2001 switch (col.m_type) {
2002 case Col::Unsigned:
2003 m_uint32 = v;
2004 break;
2005 case Col::Char:
2006 {
2007 const Chs* chs = col.m_chs;
2008 CHARSET_INFO* cs = chs->m_cs;
2009 uint n = 0;
2010 calcnokeychars(par, n, m_char);
2011 // extend by appropriate space
2012 (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
2013 }
2014 break;
2015 case Col::Varchar:
2016 {
2017 uint n = 0;
2018 calcnokeychars(par, n, m_varchar + 1);
2019 // set length and pad with nulls
2020 m_varchar[0] = n;
2021 memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
2022 }
2023 break;
2024 case Col::Longvarchar:
2025 {
2026 uint n = 0;
2027 calcnokeychars(par, n, m_longvarchar + 2);
2028 // set length and pad with nulls
2029 m_longvarchar[0] = (n & 0xff);
2030 m_longvarchar[1] = (n >> 8);
2031 memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
2032 }
2033 break;
2034 default:
2035 assert(false);
2036 break;
2037 }
2038 }
2039
2040 void
calcnokeychars(Par par,uint & n,uchar * buf)2041 Val::calcnokeychars(Par par, uint& n, uchar* buf)
2042 {
2043 const Col& col = m_col;
2044 const Chs* chs = col.m_chs;
2045 n = 0;
2046 uint len = 0;
2047 while (len < col.m_length) {
2048 if (urandom(1 + col.m_bytelength) == 0) {
2049 break;
2050 }
2051 uint half = maxcharcount / 2;
2052 int r = irandom((par.m_pctrange * half) / 100);
2053 if (par.m_bdir != 0 && urandom(10) != 0) {
2054 if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
2055 r = -r;
2056 }
2057 uint i = half + r;
2058 assert(i < maxcharcount);
2059 const Chr& chr = chs->m_chr[i];
2060 assert(n + chr.m_size <= col.m_bytelength);
2061 memcpy(buf + n, chr.m_bytes, chr.m_size);
2062 n += chr.m_size;
2063 len++;
2064 }
2065 }
2066
2067 // operations
2068
2069 int
setval(Par par) const2070 Val::setval(Par par) const
2071 {
2072 Con& con = par.con();
2073 const Col& col = m_col;
2074 if (col.m_pk) {
2075 assert(!m_null);
2076 const char* addr = (const char*)dataaddr();
2077 LL5("setval pk [" << col << "] " << *this);
2078 CHK(con.equal(col.m_num, addr) == 0);
2079 } else {
2080 const char* addr = !m_null ? (const char*)dataaddr() : 0;
2081 LL5("setval non-pk [" << col << "] " << *this);
2082 CHK(con.setValue(col.m_num, addr) == 0);
2083 }
2084 return 0;
2085 }
2086
2087 int
setval(Par par,const ICol & icol) const2088 Val::setval(Par par, const ICol& icol) const
2089 {
2090 Con& con = par.con();
2091 assert(!m_null);
2092 const char* addr = (const char*)dataaddr();
2093 LL5("setval key [" << icol << "] " << *this);
2094 CHK(con.equal(icol.m_num, addr) == 0);
2095 return 0;
2096 }
2097
2098 // compare
2099
2100 int
cmp(Par par,const Val & val2) const2101 Val::cmp(Par par, const Val& val2) const
2102 {
2103 const Col& col = m_col;
2104 const Col& col2 = val2.m_col;
2105 assert(col.equal(col2));
2106 if (m_null || val2.m_null) {
2107 if (!m_null)
2108 return +1;
2109 if (!val2.m_null)
2110 return -1;
2111 return 0;
2112 }
2113 // verify data formats
2114 col.wellformed(dataaddr());
2115 col.wellformed(val2.dataaddr());
2116 // compare
2117 switch (col.m_type) {
2118 case Col::Unsigned:
2119 {
2120 if (m_uint32 < val2.m_uint32)
2121 return -1;
2122 if (m_uint32 > val2.m_uint32)
2123 return +1;
2124 return 0;
2125 }
2126 break;
2127 case Col::Char:
2128 {
2129 uint len = col.m_bytelength;
2130 return cmpchars(par, m_char, len, val2.m_char, len);
2131 }
2132 break;
2133 case Col::Varchar:
2134 {
2135 uint len1 = m_varchar[0];
2136 uint len2 = val2.m_varchar[0];
2137 return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
2138 }
2139 break;
2140 case Col::Longvarchar:
2141 {
2142 uint len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
2143 uint len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
2144 return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
2145 }
2146 break;
2147 default:
2148 break;
2149 }
2150 assert(false);
2151 return 0;
2152 }
2153
2154 int
cmpchars(Par par,const uchar * buf1,uint len1,const uchar * buf2,uint len2) const2155 Val::cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const
2156 {
2157 const Col& col = m_col;
2158 const Chs* chs = col.m_chs;
2159 CHARSET_INFO* cs = chs->m_cs;
2160 int k;
2161 if (!par.m_collsp) {
2162 uchar x1[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2163 uchar x2[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2164 // make strxfrm pad both to same length
2165 uint len = maxxmulsize * col.m_bytelength;
2166 int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
2167 int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
2168 assert(n1 != -1 && n1 == n2);
2169 k = memcmp(x1, x2, n1);
2170 } else {
2171 k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false);
2172 }
2173 return k < 0 ? -1 : k > 0 ? +1 : 0;
2174 }
2175
2176 int
verify(Par par,const Val & val2) const2177 Val::verify(Par par, const Val& val2) const
2178 {
2179 CHK(cmp(par, val2) == 0);
2180 return 0;
2181 }
2182
2183 // print
2184
2185 static void
printstring(NdbOut & out,const uchar * str,uint len,bool showlen)2186 printstring(NdbOut& out, const uchar* str, uint len, bool showlen)
2187 {
2188 char buf[4 * NDB_MAX_TUPLE_SIZE];
2189 char *p = buf;
2190 *p++ = '[';
2191 if (showlen) {
2192 sprintf(p, "%u:", len);
2193 p += strlen(p);
2194 }
2195 for (uint i = 0; i < len; i++) {
2196 uchar c = str[i];
2197 if (c == '\\') {
2198 *p++ = '\\';
2199 *p++ = c;
2200 } else if (0x20 <= c && c <= 0x7e) {
2201 *p++ = c;
2202 } else {
2203 *p++ = '\\';
2204 *p++ = hexstr[c >> 4];
2205 *p++ = hexstr[c & 15];
2206 }
2207 }
2208 *p++ = ']';
2209 *p = 0;
2210 out << buf;
2211 }
2212
2213 static NdbOut&
operator <<(NdbOut & out,const Val & val)2214 operator<<(NdbOut& out, const Val& val)
2215 {
2216 const Col& col = val.m_col;
2217 if (val.m_null) {
2218 out << "NULL";
2219 return out;
2220 }
2221 switch (col.m_type) {
2222 case Col::Unsigned:
2223 out << val.m_uint32;
2224 break;
2225 case Col::Char:
2226 {
2227 uint len = col.m_bytelength;
2228 printstring(out, val.m_char, len, false);
2229 }
2230 break;
2231 case Col::Varchar:
2232 {
2233 uint len = val.m_varchar[0];
2234 printstring(out, val.m_varchar + 1, len, true);
2235 }
2236 break;
2237 case Col::Longvarchar:
2238 {
2239 uint len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
2240 printstring(out, val.m_longvarchar + 2, len, true);
2241 }
2242 break;
2243 default:
2244 out << "type" << col.m_type;
2245 assert(false);
2246 break;
2247 }
2248 return out;
2249 }
2250
2251 // Row - table tuple
2252
2253 struct Row {
2254 const Tab& m_tab;
2255 Val** m_val;
2256 enum St {
2257 StUndef = 0,
2258 StDefine = 1,
2259 StPrepare = 2,
2260 StCommit = 3
2261 };
2262 enum Op {
2263 OpNone = 0,
2264 OpIns = 2,
2265 OpUpd = 4,
2266 OpDel = 8,
2267 OpRead = 16,
2268 OpReadEx = 32,
2269 OpReadCom = 64,
2270 OpDML = 2 | 4 | 8,
2271 OpREAD = 16 | 32 | 64
2272 };
2273 St m_st;
2274 Op m_op;
2275 Uint64 m_txid;
2276 Row* m_bi;
2277 // construct
2278 Row(const Tab& tab);
2279 ~Row();
2280 void copy(const Row& row2, bool copy_bi);
2281 void copyval(const Row& row2, uint colmask = ~0);
2282 void calc(Par par, uint i, uint colmask = ~0);
2283 // operations
2284 int setval(Par par, uint colmask = ~0);
2285 int setval(Par par, const ITab& itab);
2286 int insrow(Par par);
2287 int updrow(Par par);
2288 int updrow(Par par, const ITab& itab);
2289 int delrow(Par par);
2290 int delrow(Par par, const ITab& itab);
2291 int selrow(Par par);
2292 int selrow(Par par, const ITab& itab);
2293 int setrow(Par par);
2294 // compare
2295 int cmp(Par par, const Row& row2) const;
2296 int cmp(Par par, const Row& row2, const ITab& itab) const;
2297 int verify(Par par, const Row& row2, bool pkonly) const;
2298 private:
2299 Row& operator=(const Row& row2);
2300 };
2301
2302 static NdbOut&
2303 operator<<(NdbOut& out, const Row* rowp);
2304
2305 static NdbOut&
2306 operator<<(NdbOut& out, const Row& row);
2307
2308 // construct
2309
Row(const Tab & tab)2310 Row::Row(const Tab& tab) :
2311 m_tab(tab)
2312 {
2313 m_val = new Val* [tab.m_cols];
2314 for (uint k = 0; k < tab.m_cols; k++) {
2315 const Col& col = *tab.m_col[k];
2316 m_val[k] = new Val(col);
2317 }
2318 m_st = StUndef;
2319 m_op = OpNone;
2320 m_txid = 0;
2321 m_bi = 0;
2322 }
2323
~Row()2324 Row::~Row()
2325 {
2326 const Tab& tab = m_tab;
2327 for (uint k = 0; k < tab.m_cols; k++) {
2328 delete m_val[k];
2329 }
2330 delete [] m_val;
2331 delete m_bi;
2332 }
2333
2334 void
copy(const Row & row2,bool copy_bi)2335 Row::copy(const Row& row2, bool copy_bi)
2336 {
2337 const Tab& tab = m_tab;
2338 copyval(row2);
2339 m_st = row2.m_st;
2340 m_op = row2.m_op;
2341 m_txid = row2.m_txid;
2342 assert(m_bi == 0);
2343 if (copy_bi && row2.m_bi != 0) {
2344 m_bi = new Row(tab);
2345 m_bi->copy(*row2.m_bi, copy_bi);
2346 }
2347 }
2348
2349 void
copyval(const Row & row2,uint colmask)2350 Row::copyval(const Row& row2, uint colmask)
2351 {
2352 const Tab& tab = m_tab;
2353 assert(&tab == &row2.m_tab);
2354 for (uint k = 0; k < tab.m_cols; k++) {
2355 Val& val = *m_val[k];
2356 const Val& val2 = *row2.m_val[k];
2357 if ((1 << k) & colmask)
2358 val.copy(val2);
2359 }
2360 }
2361
2362 void
calc(Par par,uint i,uint colmask)2363 Row::calc(Par par, uint i, uint colmask)
2364 {
2365 const Tab& tab = m_tab;
2366 for (uint k = 0; k < tab.m_cols; k++) {
2367 if ((1 << k) & colmask) {
2368 Val& val = *m_val[k];
2369 val.calc(par, i);
2370 }
2371 }
2372 }
2373
2374 // operations
2375
2376 int
setval(Par par,uint colmask)2377 Row::setval(Par par, uint colmask)
2378 {
2379 const Tab& tab = m_tab;
2380 Rsq rsq(tab.m_cols);
2381 for (uint k = 0; k < tab.m_cols; k++) {
2382 uint k2 = rsq.next();
2383 if ((1 << k2) & colmask) {
2384 const Val& val = *m_val[k2];
2385 CHK(val.setval(par) == 0);
2386 }
2387 }
2388 return 0;
2389 }
2390
2391 int
setval(Par par,const ITab & itab)2392 Row::setval(Par par, const ITab& itab)
2393 {
2394 Rsq rsq(itab.m_icols);
2395 for (uint k = 0; k < itab.m_icols; k++) {
2396 uint k2 = rsq.next();
2397 const ICol& icol = *itab.m_icol[k2];
2398 const Col& col = icol.m_col;
2399 uint m = col.m_num;
2400 const Val& val = *m_val[m];
2401 CHK(val.setval(par, icol) == 0);
2402 }
2403 return 0;
2404 }
2405
2406 int
insrow(Par par)2407 Row::insrow(Par par)
2408 {
2409 Con& con = par.con();
2410 const Tab& tab = m_tab;
2411 CHK(con.getNdbOperation(tab) == 0);
2412 CHKCON(con.m_op->insertTuple() == 0, con);
2413 CHK(setval(par, tab.m_pkmask) == 0);
2414 CHK(setval(par, ~tab.m_pkmask) == 0);
2415 assert(m_st == StUndef);
2416 m_st = StDefine;
2417 m_op = OpIns;
2418 m_txid = con.m_txid;
2419 return 0;
2420 }
2421
2422 int
updrow(Par par)2423 Row::updrow(Par par)
2424 {
2425 Con& con = par.con();
2426 const Tab& tab = m_tab;
2427 CHK(con.getNdbOperation(tab) == 0);
2428 CHKCON(con.m_op->updateTuple() == 0, con);
2429 CHK(setval(par, tab.m_pkmask) == 0);
2430 CHK(setval(par, ~tab.m_pkmask) == 0);
2431 assert(m_st == StUndef);
2432 m_st = StDefine;
2433 m_op = OpUpd;
2434 m_txid = con.m_txid;
2435 return 0;
2436 }
2437
2438 int
updrow(Par par,const ITab & itab)2439 Row::updrow(Par par, const ITab& itab)
2440 {
2441 Con& con = par.con();
2442 const Tab& tab = m_tab;
2443 assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2444 CHK(con.getNdbIndexOperation(itab, tab) == 0);
2445 CHKCON(con.m_op->updateTuple() == 0, con);
2446 CHK(setval(par, itab) == 0);
2447 CHK(setval(par, ~tab.m_pkmask) == 0);
2448 assert(m_st == StUndef);
2449 m_st = StDefine;
2450 m_op = OpUpd;
2451 m_txid = con.m_txid;
2452 return 0;
2453 }
2454
2455 int
delrow(Par par)2456 Row::delrow(Par par)
2457 {
2458 Con& con = par.con();
2459 const Tab& tab = m_tab;
2460 CHK(con.getNdbOperation(m_tab) == 0);
2461 CHKCON(con.m_op->deleteTuple() == 0, con);
2462 CHK(setval(par, tab.m_pkmask) == 0);
2463 assert(m_st == StUndef);
2464 m_st = StDefine;
2465 m_op = OpDel;
2466 m_txid = con.m_txid;
2467 return 0;
2468 }
2469
2470 int
delrow(Par par,const ITab & itab)2471 Row::delrow(Par par, const ITab& itab)
2472 {
2473 Con& con = par.con();
2474 const Tab& tab = m_tab;
2475 assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2476 CHK(con.getNdbIndexOperation(itab, tab) == 0);
2477 CHKCON(con.m_op->deleteTuple() == 0, con);
2478 CHK(setval(par, itab) == 0);
2479 assert(m_st == StUndef);
2480 m_st = StDefine;
2481 m_op = OpDel;
2482 m_txid = con.m_txid;
2483 return 0;
2484 }
2485
2486 int
selrow(Par par)2487 Row::selrow(Par par)
2488 {
2489 Con& con = par.con();
2490 const Tab& tab = m_tab;
2491 CHK(con.getNdbOperation(m_tab) == 0);
2492 CHKCON(con.readTuple(par) == 0, con);
2493 CHK(setval(par, tab.m_pkmask) == 0);
2494 // TODO state
2495 return 0;
2496 }
2497
2498 int
selrow(Par par,const ITab & itab)2499 Row::selrow(Par par, const ITab& itab)
2500 {
2501 Con& con = par.con();
2502 const Tab& tab = m_tab;
2503 assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2504 CHK(con.getNdbIndexOperation(itab, tab) == 0);
2505 CHKCON(con.readTuple(par) == 0, con);
2506 CHK(setval(par, itab) == 0);
2507 // TODO state
2508 return 0;
2509 }
2510
2511 int
setrow(Par par)2512 Row::setrow(Par par)
2513 {
2514 Con& con = par.con();
2515 const Tab& tab = m_tab;
2516 CHK(setval(par, ~tab.m_pkmask) == 0);
2517 assert(m_st == StUndef);
2518 m_st = StDefine;
2519 m_op = OpUpd;
2520 m_txid = con.m_txid;
2521 return 0;
2522 }
2523
2524 // compare
2525
2526 int
cmp(Par par,const Row & row2) const2527 Row::cmp(Par par, const Row& row2) const
2528 {
2529 const Tab& tab = m_tab;
2530 assert(&tab == &row2.m_tab);
2531 int c = 0;
2532 for (uint k = 0; k < tab.m_cols; k++) {
2533 const Val& val = *m_val[k];
2534 const Val& val2 = *row2.m_val[k];
2535 if ((c = val.cmp(par, val2)) != 0)
2536 break;
2537 }
2538 return c;
2539 }
2540
2541 int
cmp(Par par,const Row & row2,const ITab & itab) const2542 Row::cmp(Par par, const Row& row2, const ITab& itab) const
2543 {
2544 const Tab& tab = m_tab;
2545 int c = 0;
2546 for (uint i = 0; i < itab.m_icols; i++) {
2547 const ICol& icol = *itab.m_icol[i];
2548 const Col& col = icol.m_col;
2549 uint k = col.m_num;
2550 assert(k < tab.m_cols);
2551 const Val& val = *m_val[k];
2552 const Val& val2 = *row2.m_val[k];
2553 if ((c = val.cmp(par, val2)) != 0)
2554 break;
2555 }
2556 return c;
2557 }
2558
2559 int
verify(Par par,const Row & row2,bool pkonly) const2560 Row::verify(Par par, const Row& row2, bool pkonly) const
2561 {
2562 const Tab& tab = m_tab;
2563 const Row& row1 = *this;
2564 assert(&row1.m_tab == &row2.m_tab);
2565 for (uint k = 0; k < tab.m_cols; k++) {
2566 const Col& col = row1.m_val[k]->m_col;
2567 if (!pkonly || col.m_pk) {
2568 const Val& val1 = *row1.m_val[k];
2569 const Val& val2 = *row2.m_val[k];
2570 CHK(val1.verify(par, val2) == 0);
2571 }
2572 }
2573 return 0;
2574 }
2575
2576 // print
2577
2578 static NdbOut&
operator <<(NdbOut & out,const Row::St st)2579 operator<<(NdbOut& out, const Row::St st)
2580 {
2581 if (st == Row::StUndef)
2582 out << "StUndef";
2583 else if (st == Row::StDefine)
2584 out << "StDefine";
2585 else if (st == Row::StPrepare)
2586 out << "StPrepare";
2587 else if (st == Row::StCommit)
2588 out << "StCommit";
2589 else
2590 out << "st=" << st;
2591 return out;
2592 }
2593
2594 static NdbOut&
operator <<(NdbOut & out,const Row::Op op)2595 operator<<(NdbOut& out, const Row::Op op)
2596 {
2597 if (op == Row::OpNone)
2598 out << "OpNone";
2599 else if (op == Row::OpIns)
2600 out << "OpIns";
2601 else if (op == Row::OpUpd)
2602 out << "OpUpd";
2603 else if (op == Row::OpDel)
2604 out << "OpDel";
2605 else if (op == Row::OpRead)
2606 out << "OpRead";
2607 else if (op == Row::OpReadEx)
2608 out << "OpReadEx";
2609 else if (op == Row::OpReadCom)
2610 out << "OpReadCom";
2611 else
2612 out << "op=" << op;
2613 return out;
2614 }
2615
2616 static NdbOut&
operator <<(NdbOut & out,const Row * rowp)2617 operator<<(NdbOut& out, const Row* rowp)
2618 {
2619 if (rowp == 0)
2620 out << "[null]";
2621 else
2622 out << *rowp;
2623 return out;
2624 }
2625
2626 static NdbOut&
operator <<(NdbOut & out,const Row & row)2627 operator<<(NdbOut& out, const Row& row)
2628 {
2629 const Tab& tab = row.m_tab;
2630 out << "[";
2631 for (uint i = 0; i < tab.m_cols; i++) {
2632 if (i > 0)
2633 out << " ";
2634 out << *row.m_val[i];
2635 }
2636 out << " " << row.m_st;
2637 out << " " << row.m_op;
2638 out << " " << HEX(row.m_txid);
2639 if (row.m_bi != 0)
2640 out << " " << row.m_bi;
2641 out << "]";
2642 return out;
2643 }
2644
2645 // Set - set of table tuples
2646
2647 struct Set {
2648 const Tab& m_tab;
2649 uint m_rows;
2650 Row** m_row;
2651 uint* m_rowkey; // maps row number (from 0) in scan to tuple key
2652 Row* m_keyrow;
2653 NdbRecAttr** m_rec;
2654 // construct
2655 Set(const Tab& tab, uint rows);
2656 ~Set();
2657 void reset();
2658 bool compat(Par par, uint i, const Row::Op op) const;
2659 void push(uint i);
2660 void copyval(uint i, uint colmask = ~0); // from bi
2661 void calc(Par par, uint i, uint colmask = ~0);
2662 uint count() const;
2663 const Row* getrow(uint i, bool dirty = false) const;
2664 int setrow(uint i, const Row* src, bool force=false);
2665 // transaction
2666 void post(Par par, ExecType et);
2667 // operations
2668 int insrow(Par par, uint i);
2669 int updrow(Par par, uint i);
2670 int updrow(Par par, const ITab& itab, uint i);
2671 int delrow(Par par, uint i);
2672 int delrow(Par par, const ITab& itab, uint i);
2673 int selrow(Par par, const Row& keyrow);
2674 int selrow(Par par, const ITab& itab, const Row& keyrow);
2675 int setrow(Par par, uint i);
2676 int getval(Par par);
2677 int getkey(Par par, uint* i);
2678 int putval(uint i, bool force, uint n = ~0);
2679 // compare
2680 void sort(Par par, const ITab& itab);
2681 int verify(Par par, const Set& set2, bool pkonly, bool dirty = false) const;
2682 int verifyorder(Par par, const ITab& itab, bool descending) const;
2683 // protect structure
2684 NdbMutex* m_mutex;
lockSet2685 void lock() const {
2686 NdbMutex_Lock(m_mutex);
2687 }
unlockSet2688 void unlock() const {
2689 NdbMutex_Unlock(m_mutex);
2690 }
2691 private:
2692 void sort(Par par, const ITab& itab, uint lo, uint hi);
2693 Set& operator=(const Set& set2);
2694 };
2695
2696 // construct
2697
Set(const Tab & tab,uint rows)2698 Set::Set(const Tab& tab, uint rows) :
2699 m_tab(tab)
2700 {
2701 m_rows = rows;
2702 m_row = new Row* [m_rows];
2703 for (uint i = 0; i < m_rows; i++) {
2704 m_row[i] = 0;
2705 }
2706 m_rowkey = new uint [m_rows];
2707 for (uint n = 0; n < m_rows; n++) {
2708 m_rowkey[n] = ~0;
2709 }
2710 m_keyrow = new Row(tab);
2711 m_rec = new NdbRecAttr* [tab.m_cols];
2712 for (uint k = 0; k < tab.m_cols; k++) {
2713 m_rec[k] = 0;
2714 }
2715 m_mutex = NdbMutex_Create();
2716 assert(m_mutex != 0);
2717 }
2718
~Set()2719 Set::~Set()
2720 {
2721 for (uint i = 0; i < m_rows; i++) {
2722 delete m_row[i];
2723 }
2724 delete [] m_row;
2725 delete [] m_rowkey;
2726 delete m_keyrow;
2727 delete [] m_rec;
2728 NdbMutex_Destroy(m_mutex);
2729 }
2730
2731 void
reset()2732 Set::reset()
2733 {
2734 for (uint i = 0; i < m_rows; i++) {
2735 m_row[i] = 0;
2736 }
2737 }
2738
2739 // this sucks
2740 bool
compat(Par par,uint i,const Row::Op op) const2741 Set::compat(Par par, uint i, const Row::Op op) const
2742 {
2743 Con& con = par.con();
2744 int ret = -1;
2745 int place = 0;
2746 do {
2747 const Row* rowp = getrow(i);
2748 if (rowp == 0) {
2749 ret = op == Row::OpIns;
2750 place = 1;
2751 break;
2752 }
2753 const Row& row = *rowp;
2754 if (!(op & Row::OpREAD)) {
2755 if (row.m_st == Row::StDefine || row.m_st == Row::StPrepare) {
2756 assert(row.m_op & Row::OpDML);
2757 assert(row.m_txid != 0);
2758 if (con.m_txid != row.m_txid) {
2759 ret = false;
2760 place = 2;
2761 break;
2762 }
2763 if (row.m_op != Row::OpDel) {
2764 ret = op == Row::OpUpd || op == Row::OpDel;
2765 place = 3;
2766 break;
2767 }
2768 ret = op == Row::OpIns;
2769 place = 4;
2770 break;
2771 }
2772 if (row.m_st == Row::StCommit) {
2773 assert(row.m_op == Row::OpNone);
2774 assert(row.m_txid == 0);
2775 ret = op == Row::OpUpd || op == Row::OpDel;
2776 place = 5;
2777 break;
2778 }
2779 }
2780 if (op & Row::OpREAD) {
2781 bool dirty =
2782 con.m_txid != row.m_txid &&
2783 par.m_lockmode == NdbOperation::LM_CommittedRead;
2784 const Row* rowp2 = getrow(i, dirty);
2785 if (rowp2 == 0 || rowp2->m_op == Row::OpDel) {
2786 ret = false;
2787 place = 6;
2788 break;
2789 }
2790 ret = true;
2791 place = 7;
2792 break;
2793 }
2794 } while (0);
2795 LL4("compat ret=" << ret << " place=" << place);
2796 assert(ret == false || ret == true);
2797 return ret;
2798 }
2799
2800 void
push(uint i)2801 Set::push(uint i)
2802 {
2803 const Tab& tab = m_tab;
2804 assert(i < m_rows);
2805 Row* bi = m_row[i];
2806 m_row[i] = new Row(tab);
2807 Row& row = *m_row[i];
2808 row.m_bi = bi;
2809 if (bi != 0)
2810 row.copyval(*bi);
2811 }
2812
2813 void
copyval(uint i,uint colmask)2814 Set::copyval(uint i, uint colmask)
2815 {
2816 assert(m_row[i] != 0);
2817 Row& row = *m_row[i];
2818 assert(row.m_bi != 0);
2819 row.copyval(*row.m_bi, colmask);
2820 }
2821
2822 void
calc(Par par,uint i,uint colmask)2823 Set::calc(Par par, uint i, uint colmask)
2824 {
2825 assert(m_row[i] != 0);
2826 Row& row = *m_row[i];
2827 row.calc(par, i, colmask);
2828 }
2829
2830 uint
count() const2831 Set::count() const
2832 {
2833 uint count = 0;
2834 for (uint i = 0; i < m_rows; i++) {
2835 if (m_row[i] != 0)
2836 count++;
2837 }
2838 return count;
2839 }
2840
2841 const Row*
getrow(uint i,bool dirty) const2842 Set::getrow(uint i, bool dirty) const
2843 {
2844 assert(i < m_rows);
2845 const Row* rowp = m_row[i];
2846 if (dirty) {
2847 while (rowp != 0) {
2848 bool b1 = rowp->m_op == Row::OpNone;
2849 bool b2 = rowp->m_st == Row::StCommit;
2850 assert(b1 == b2);
2851 if (b1) {
2852 assert(rowp->m_bi == 0);
2853 break;
2854 }
2855 rowp = rowp->m_bi;
2856 }
2857 }
2858 return rowp;
2859 }
2860
2861 int
setrow(uint i,const Row * src,bool force)2862 Set::setrow(uint i, const Row* src, bool force)
2863 {
2864 assert(i < m_rows);
2865 if (m_row[i] != 0)
2866 if (!force)
2867 return -1;
2868
2869 Row* newRow= new Row(src->m_tab);
2870 newRow->copy(*src, true);
2871 return 0;
2872 }
2873
2874
2875 // transaction
2876
2877 void
post(Par par,ExecType et)2878 Set::post(Par par, ExecType et)
2879 {
2880 LL4("post");
2881 Con& con = par.con();
2882 assert(con.m_txid != 0);
2883 uint i;
2884 for (i = 0; i < m_rows; i++) {
2885 Row* rowp = m_row[i];
2886 if (rowp == 0) {
2887 LL5("skip " << i << " " << rowp);
2888 continue;
2889 }
2890 if (rowp->m_st == Row::StCommit) {
2891 assert(rowp->m_op == Row::OpNone);
2892 assert(rowp->m_txid == 0);
2893 assert(rowp->m_bi == 0);
2894 LL5("skip committed " << i << " " << rowp);
2895 continue;
2896 }
2897 assert(rowp->m_st == Row::StDefine || rowp->m_st == Row::StPrepare);
2898 assert(rowp->m_txid != 0);
2899 if (con.m_txid != rowp->m_txid) {
2900 LL5("skip txid " << i << " " << HEX(con.m_txid) << " " << rowp);
2901 continue;
2902 }
2903 // TODO read ops
2904 assert(rowp->m_op & Row::OpDML);
2905 LL4("post BEFORE " << rowp);
2906 if (et == NoCommit) {
2907 if (rowp->m_st == Row::StDefine) {
2908 rowp->m_st = Row::StPrepare;
2909 Row* bi = rowp->m_bi;
2910 while (bi != 0 && bi->m_st == Row::StDefine) {
2911 bi->m_st = Row::StPrepare;
2912 bi = bi->m_bi;
2913 }
2914 }
2915 } else if (et == Commit) {
2916 if (rowp->m_op != Row::OpDel) {
2917 rowp->m_st = Row::StCommit;
2918 rowp->m_op = Row::OpNone;
2919 rowp->m_txid = 0;
2920 delete rowp->m_bi;
2921 rowp->m_bi = 0;
2922 } else {
2923 delete rowp;
2924 rowp = 0;
2925 }
2926 } else if (et == Rollback) {
2927 while (rowp != 0 && rowp->m_st != Row::StCommit) {
2928 Row* tmp = rowp;
2929 rowp = rowp->m_bi;
2930 tmp->m_bi = 0;
2931 delete tmp;
2932 }
2933 } else {
2934 assert(false);
2935 }
2936 m_row[i] = rowp;
2937 LL4("post AFTER " << rowp);
2938 }
2939 }
2940
2941 // operations
2942
2943 int
insrow(Par par,uint i)2944 Set::insrow(Par par, uint i)
2945 {
2946 assert(m_row[i] != 0);
2947 Row& row = *m_row[i];
2948 CHK(row.insrow(par) == 0);
2949 return 0;
2950 }
2951
2952 int
updrow(Par par,uint i)2953 Set::updrow(Par par, uint i)
2954 {
2955 assert(m_row[i] != 0);
2956 Row& row = *m_row[i];
2957 CHK(row.updrow(par) == 0);
2958 return 0;
2959 }
2960
2961 int
updrow(Par par,const ITab & itab,uint i)2962 Set::updrow(Par par, const ITab& itab, uint i)
2963 {
2964 assert(m_row[i] != 0);
2965 Row& row = *m_row[i];
2966 CHK(row.updrow(par, itab) == 0);
2967 return 0;
2968 }
2969
2970 int
delrow(Par par,uint i)2971 Set::delrow(Par par, uint i)
2972 {
2973 assert(m_row[i] != 0);
2974 Row& row = *m_row[i];
2975 CHK(row.delrow(par) == 0);
2976 return 0;
2977 }
2978
2979 int
delrow(Par par,const ITab & itab,uint i)2980 Set::delrow(Par par, const ITab& itab, uint i)
2981 {
2982 assert(m_row[i] != 0);
2983 Row& row = *m_row[i];
2984 CHK(row.delrow(par, itab) == 0);
2985 return 0;
2986 }
2987
2988 int
selrow(Par par,const Row & keyrow)2989 Set::selrow(Par par, const Row& keyrow)
2990 {
2991 const Tab& tab = par.tab();
2992 LL5("selrow " << tab.m_name << " keyrow " << keyrow);
2993 m_keyrow->copyval(keyrow, tab.m_pkmask);
2994 CHK(m_keyrow->selrow(par) == 0);
2995 CHK(getval(par) == 0);
2996 return 0;
2997 }
2998
2999 int
selrow(Par par,const ITab & itab,const Row & keyrow)3000 Set::selrow(Par par, const ITab& itab, const Row& keyrow)
3001 {
3002 LL5("selrow " << itab.m_name << " keyrow " << keyrow);
3003 m_keyrow->copyval(keyrow, itab.m_keymask);
3004 CHK(m_keyrow->selrow(par, itab) == 0);
3005 CHK(getval(par) == 0);
3006 return 0;
3007 }
3008
3009 int
setrow(Par par,uint i)3010 Set::setrow(Par par, uint i)
3011 {
3012 assert(m_row[i] != 0);
3013 CHK(m_row[i]->setrow(par) == 0);
3014 return 0;
3015 }
3016
3017 int
getval(Par par)3018 Set::getval(Par par)
3019 {
3020 Con& con = par.con();
3021 const Tab& tab = m_tab;
3022 Rsq rsq1(tab.m_cols);
3023 for (uint k = 0; k < tab.m_cols; k++) {
3024 uint k2 = rsq1.next();
3025 CHK(con.getValue(k2, m_rec[k2]) == 0);
3026 }
3027 return 0;
3028 }
3029
3030 int
getkey(Par par,uint * i)3031 Set::getkey(Par par, uint* i)
3032 {
3033 const Tab& tab = m_tab;
3034 uint k = tab.m_keycol;
3035 assert(m_rec[k] != 0);
3036 const char* aRef = m_rec[k]->aRef();
3037 Uint32 key = *(const Uint32*)aRef;
3038 LL5("getkey: " << key);
3039 CHK(key < m_rows);
3040 *i = key;
3041 return 0;
3042 }
3043
3044 int
putval(uint i,bool force,uint n)3045 Set::putval(uint i, bool force, uint n)
3046 {
3047 const Tab& tab = m_tab;
3048 LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
3049 CHK( i<m_rows );
3050 if (m_row[i] != 0) {
3051 assert(force);
3052 delete m_row[i];
3053 m_row[i] = 0;
3054 }
3055 m_row[i] = new Row(tab);
3056 Row& row = *m_row[i];
3057 for (uint k = 0; k < tab.m_cols; k++) {
3058 Val& val = *row.m_val[k];
3059 NdbRecAttr* rec = m_rec[k];
3060 assert(rec != 0);
3061 if (rec->isNULL()) {
3062 val.m_null = true;
3063 continue;
3064 }
3065 const char* aRef = m_rec[k]->aRef();
3066 val.copy(aRef);
3067 val.m_null = false;
3068 }
3069 if (n != (uint) ~0)
3070 {
3071 CHK(n < m_rows);
3072 m_rowkey[n] = i;
3073 }
3074 return 0;
3075 }
3076
3077 // compare
3078
3079 void
sort(Par par,const ITab & itab)3080 Set::sort(Par par, const ITab& itab)
3081 {
3082 if (m_rows != 0)
3083 sort(par, itab, 0, m_rows - 1);
3084 }
3085
3086 void
sort(Par par,const ITab & itab,uint lo,uint hi)3087 Set::sort(Par par, const ITab& itab, uint lo, uint hi)
3088 {
3089 assert(lo < m_rows && hi < m_rows && lo <= hi);
3090 Row* const p = m_row[lo];
3091 uint i = lo;
3092 uint j = hi;
3093 while (i < j) {
3094 while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
3095 j--;
3096 if (i < j) {
3097 m_row[i] = m_row[j];
3098 i++;
3099 }
3100 while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
3101 i++;
3102 if (i < j) {
3103 m_row[j] = m_row[i];
3104 j--;
3105 }
3106 }
3107 m_row[i] = p;
3108 if (lo < i)
3109 sort(par, itab, lo, i - 1);
3110 if (hi > i)
3111 sort(par, itab, i + 1, hi);
3112 }
3113
3114 /*
3115 * set1 (self) is from dml and can contain un-committed operations.
3116 * set2 is from read and contains no operations. "dirty" applies
3117 * to set1: false = use latest row, true = use committed row.
3118 */
3119 int
verify(Par par,const Set & set2,bool pkonly,bool dirty) const3120 Set::verify(Par par, const Set& set2, bool pkonly, bool dirty) const
3121 {
3122 const Set& set1 = *this;
3123 assert(&set1.m_tab == &set2.m_tab && set1.m_rows == set2.m_rows);
3124 LL3("verify dirty:" << dirty);
3125 for (uint i = 0; i < set1.m_rows; i++) {
3126 // the row versions we actually compare
3127 const Row* row1p = set1.getrow(i, dirty);
3128 const Row* row2p = set2.getrow(i);
3129 bool ok = true;
3130 int place = 0;
3131 if (row1p == 0) {
3132 if (row2p != 0) {
3133 ok = false;
3134 place = 1;
3135 }
3136 } else {
3137 Row::Op op1 = row1p->m_op;
3138 if (op1 != Row::OpDel) {
3139 if (row2p == 0) {
3140 ok = false;
3141 place = 2;
3142 } else if (row1p->verify(par, *row2p, pkonly) == -1) {
3143 ok = false;
3144 place = 3;
3145 }
3146 } else if (row2p != 0) {
3147 ok = false;
3148 place = 4;
3149 }
3150 }
3151 if (!ok) {
3152 LL1("verify " << i << " failed at " << place);
3153 LL1("row1 " << row1p);
3154 LL1("row2 " << row2p);
3155 CHK(false);
3156 }
3157 }
3158 return 0;
3159 }
3160
3161 int
verifyorder(Par par,const ITab & itab,bool descending) const3162 Set::verifyorder(Par par, const ITab& itab, bool descending) const
3163 {
3164 for (uint n = 0; n < m_rows; n++) {
3165 uint i2 = m_rowkey[n];
3166 if (i2 == (uint) ~0)
3167 break;
3168 if (n == 0)
3169 continue;
3170 uint i1 = m_rowkey[n - 1];
3171 assert(m_row[i1] != 0 && m_row[i2] != 0);
3172 const Row& row1 = *m_row[i1];
3173 const Row& row2 = *m_row[i2];
3174 bool ok;
3175 if (!descending)
3176 ok= (row1.cmp(par, row2, itab) <= 0);
3177 else
3178 ok= (row1.cmp(par, row2, itab) >= 0);
3179
3180 if (!ok)
3181 {
3182 LL1("verifyorder " << n << " failed");
3183 LL1("row1 " << row1);
3184 LL1("row2 " << row2);
3185 CHK(false);
3186 }
3187 }
3188 return 0;
3189 }
3190
3191 // print
3192
#if 0
// Disabled: print every row of a set, one per line.  Note that each
// slot is dereferenced, so this would need all slots to hold a row.
static NdbOut&
operator<<(NdbOut& out, const Set& set)
{
  uint i = 0;
  while (i < set.m_rows) {
    const Row& row = *set.m_row[i];
    if (i != 0)
      out << endl;
    out << row;
    i++;
  }
  return out;
}
#endif
3206
3207 // BVal - range scan bound
3208
3209 struct BVal : public Val {
3210 const ICol& m_icol;
3211 int m_type;
3212 BVal(const ICol& icol);
3213 int setbnd(Par par) const;
3214 int setflt(Par par) const;
3215 };
3216
BVal(const ICol & icol)3217 BVal::BVal(const ICol& icol) :
3218 Val(icol.m_col),
3219 m_icol(icol)
3220 {
3221 }
3222
3223 int
setbnd(Par par) const3224 BVal::setbnd(Par par) const
3225 {
3226 Con& con = par.con();
3227 assert(g_compare_null || !m_null);
3228 const char* addr = !m_null ? (const char*)dataaddr() : 0;
3229 const ICol& icol = m_icol;
3230 CHK(con.setBound(icol.m_num, m_type, addr) == 0);
3231 return 0;
3232 }
3233
3234 int
setflt(Par par) const3235 BVal::setflt(Par par) const
3236 {
3237 static uint index_bound_to_filter_bound[5] = {
3238 NdbScanFilter::COND_GE,
3239 NdbScanFilter::COND_GT,
3240 NdbScanFilter::COND_LE,
3241 NdbScanFilter::COND_LT,
3242 NdbScanFilter::COND_EQ
3243 };
3244 Con& con = par.con();
3245 assert(g_compare_null || !m_null);
3246 const char* addr = !m_null ? (const char*)dataaddr() : 0;
3247 const ICol& icol = m_icol;
3248 const Col& col = icol.m_col;
3249 uint length = col.m_bytesize;
3250 uint cond = index_bound_to_filter_bound[m_type];
3251 CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
3252 return 0;
3253 }
3254
3255 static NdbOut&
operator <<(NdbOut & out,const BVal & bval)3256 operator<<(NdbOut& out, const BVal& bval)
3257 {
3258 const ICol& icol = bval.m_icol;
3259 const Col& col = icol.m_col;
3260 const Val& val = bval;
3261 out << "type=" << bval.m_type;
3262 out << " icol=" << icol.m_num;
3263 out << " col=" << col.m_num << "," << col.m_name;
3264 out << " value=" << val;
3265 return out;
3266 }
3267
3268 // BSet - set of bounds
3269
3270 struct BSet {
3271 const Tab& m_tab;
3272 const ITab& m_itab;
3273 uint m_alloc;
3274 uint m_bvals;
3275 BVal** m_bval;
3276 BSet(const Tab& tab, const ITab& itab);
3277 ~BSet();
3278 void reset();
3279 void calc(Par par);
3280 void calcpk(Par par, uint i);
3281 int setbnd(Par par) const;
3282 int setflt(Par par) const;
3283 void filter(Par par, const Set& set, Set& set2) const;
3284 };
3285
BSet(const Tab & tab,const ITab & itab)3286 BSet::BSet(const Tab& tab, const ITab& itab) :
3287 m_tab(tab),
3288 m_itab(itab),
3289 m_alloc(2 * itab.m_icols),
3290 m_bvals(0)
3291 {
3292 m_bval = new BVal* [m_alloc];
3293 for (uint i = 0; i < m_alloc; i++) {
3294 m_bval[i] = 0;
3295 }
3296 }
3297
~BSet()3298 BSet::~BSet()
3299 {
3300 delete [] m_bval;
3301 }
3302
3303 void
reset()3304 BSet::reset()
3305 {
3306 while (m_bvals > 0) {
3307 uint i = --m_bvals;
3308 delete m_bval[i];
3309 m_bval[i] = 0;
3310 }
3311 }
3312
3313 void
calc(Par par)3314 BSet::calc(Par par)
3315 {
3316 const ITab& itab = m_itab;
3317 par.m_pctrange = par.m_pctbrange;
3318 reset();
3319 for (uint k = 0; k < itab.m_icols; k++) {
3320 const ICol& icol = *itab.m_icol[k];
3321 for (uint i = 0; i <= 1; i++) {
3322 if (m_bvals == 0 && urandom(100) == 0)
3323 return;
3324 if (m_bvals != 0 && urandom(3) == 0)
3325 return;
3326 assert(m_bvals < m_alloc);
3327 BVal& bval = *new BVal(icol);
3328 m_bval[m_bvals++] = &bval;
3329 bval.m_null = false;
3330 uint sel;
3331 do {
3332 // equality bound only on i==0
3333 sel = urandom(5 - i);
3334 } while (strchr(par.m_bound, '0' + sel) == 0);
3335 if (sel < 2)
3336 bval.m_type = 0 | (1 << i);
3337 else if (sel < 4)
3338 bval.m_type = 1 | (1 << i);
3339 else
3340 bval.m_type = 4;
3341 if (k + 1 < itab.m_icols)
3342 bval.m_type = 4;
3343 if (!g_compare_null)
3344 par.m_pctnull = 0;
3345 if (bval.m_type == 0 || bval.m_type == 1)
3346 par.m_bdir = -1;
3347 if (bval.m_type == 2 || bval.m_type == 3)
3348 par.m_bdir = +1;
3349 do {
3350 bval.calcnokey(par);
3351 if (i == 1) {
3352 assert(m_bvals >= 2);
3353 const BVal& bv1 = *m_bval[m_bvals - 2];
3354 const BVal& bv2 = *m_bval[m_bvals - 1];
3355 if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0)
3356 continue;
3357 }
3358 } while (0);
3359 // equality bound only once
3360 if (bval.m_type == 4)
3361 break;
3362 }
3363 }
3364 }
3365
3366 void
calcpk(Par par,uint i)3367 BSet::calcpk(Par par, uint i)
3368 {
3369 const ITab& itab = m_itab;
3370 reset();
3371 for (uint k = 0; k < itab.m_icols; k++) {
3372 const ICol& icol = *itab.m_icol[k];
3373 const Col& col = icol.m_col;
3374 assert(col.m_pk);
3375 assert(m_bvals < m_alloc);
3376 BVal& bval = *new BVal(icol);
3377 m_bval[m_bvals++] = &bval;
3378 bval.m_type = 4;
3379 bval.calc(par, i);
3380 }
3381 }
3382
3383 int
setbnd(Par par) const3384 BSet::setbnd(Par par) const
3385 {
3386 if (m_bvals != 0) {
3387 Rsq rsq1(m_bvals);
3388 for (uint j = 0; j < m_bvals; j++) {
3389 uint j2 = rsq1.next();
3390 const BVal& bval = *m_bval[j2];
3391 CHK(bval.setbnd(par) == 0);
3392 }
3393 }
3394 return 0;
3395 }
3396
3397 int
setflt(Par par) const3398 BSet::setflt(Par par) const
3399 {
3400 Con& con = par.con();
3401 CHK(con.getNdbScanFilter() == 0);
3402 CHK(con.beginFilter(NdbScanFilter::AND) == 0);
3403 if (m_bvals != 0) {
3404 Rsq rsq1(m_bvals);
3405 for (uint j = 0; j < m_bvals; j++) {
3406 uint j2 = rsq1.next();
3407 const BVal& bval = *m_bval[j2];
3408 CHK(bval.setflt(par) == 0);
3409 }
3410 // duplicate
3411 if (urandom(5) == 0) {
3412 uint j3 = urandom(m_bvals);
3413 const BVal& bval = *m_bval[j3];
3414 CHK(bval.setflt(par) == 0);
3415 }
3416 }
3417 CHK(con.endFilter() == 0);
3418 return 0;
3419 }
3420
3421 void
filter(Par par,const Set & set,Set & set2) const3422 BSet::filter(Par par, const Set& set, Set& set2) const
3423 {
3424 const Tab& tab = m_tab;
3425 const ITab& itab = m_itab;
3426 assert(&tab == &set2.m_tab && set.m_rows == set2.m_rows);
3427 assert(set2.count() == 0);
3428 for (uint i = 0; i < set.m_rows; i++) {
3429 set.lock();
3430 do {
3431 if (set.m_row[i] == 0) {
3432 break;
3433 }
3434 const Row& row = *set.m_row[i];
3435 if (!g_store_null_key) {
3436 bool ok1 = false;
3437 for (uint k = 0; k < itab.m_icols; k++) {
3438 const ICol& icol = *itab.m_icol[k];
3439 const Col& col = icol.m_col;
3440 const Val& val = *row.m_val[col.m_num];
3441 if (!val.m_null) {
3442 ok1 = true;
3443 break;
3444 }
3445 }
3446 if (!ok1)
3447 break;
3448 }
3449 bool ok2 = true;
3450 for (uint j = 0; j < m_bvals; j++) {
3451 const BVal& bval = *m_bval[j];
3452 const ICol& icol = bval.m_icol;
3453 const Col& col = icol.m_col;
3454 const Val& val = *row.m_val[col.m_num];
3455 int ret = bval.cmp(par, val);
3456 LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
3457 if (bval.m_type == 0)
3458 ok2 = (ret <= 0);
3459 else if (bval.m_type == 1)
3460 ok2 = (ret < 0);
3461 else if (bval.m_type == 2)
3462 ok2 = (ret >= 0);
3463 else if (bval.m_type == 3)
3464 ok2 = (ret > 0);
3465 else if (bval.m_type == 4)
3466 ok2 = (ret == 0);
3467 else {
3468 assert(false);
3469 }
3470 if (!ok2)
3471 break;
3472 }
3473 if (!ok2)
3474 break;
3475 assert(set2.m_row[i] == 0);
3476 set2.m_row[i] = new Row(tab);
3477 Row& row2 = *set2.m_row[i];
3478 row2.copy(row, true);
3479 } while (0);
3480 set.unlock();
3481 }
3482 }
3483
3484 static NdbOut&
operator <<(NdbOut & out,const BSet & bset)3485 operator<<(NdbOut& out, const BSet& bset)
3486 {
3487 out << "bounds=" << bset.m_bvals;
3488 for (uint j = 0; j < bset.m_bvals; j++) {
3489 const BVal& bval = *bset.m_bval[j];
3490 out << " [bound " << j << ": " << bval << "]";
3491 }
3492 return out;
3493 }
3494
3495 // pk operations
3496
3497 static int
pkinsert(Par par)3498 pkinsert(Par par)
3499 {
3500 Con& con = par.con();
3501 const Tab& tab = par.tab();
3502 Set& set = par.set();
3503 LL3("pkinsert " << tab.m_name);
3504 CHK(con.startTransaction() == 0);
3505 uint batch = 0;
3506 for (uint j = 0; j < par.m_rows; j++) {
3507 uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3508 uint i = thrrow(par, j2);
3509 set.lock();
3510 if (!set.compat(par, i, Row::OpIns)) {
3511 LL3("pkinsert SKIP " << i << " " << set.getrow(i));
3512 set.unlock();
3513 } else {
3514 set.push(i);
3515 set.calc(par, i);
3516 CHK(set.insrow(par, i) == 0);
3517 set.unlock();
3518 LL4("pkinsert key=" << i << " " << set.getrow(i));
3519 batch++;
3520 }
3521 bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3522 if (batch == par.m_batch || lastbatch) {
3523 uint err = par.m_catcherr;
3524 ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3525 CHK(con.execute(et, err) == 0);
3526 set.lock();
3527 set.post(par, !err ? et : Rollback);
3528 set.unlock();
3529 if (err) {
3530 LL1("pkinsert key=" << i << " stop on " << con.errname(err));
3531 break;
3532 }
3533 batch = 0;
3534 if (!lastbatch) {
3535 con.closeTransaction();
3536 CHK(con.startTransaction() == 0);
3537 }
3538 }
3539 }
3540 con.closeTransaction();
3541 return 0;
3542 }
3543
3544 static int
pkupdate(Par par)3545 pkupdate(Par par)
3546 {
3547 Con& con = par.con();
3548 const Tab& tab = par.tab();
3549 Set& set = par.set();
3550 LL3("pkupdate " << tab.m_name);
3551 CHK(con.startTransaction() == 0);
3552 uint batch = 0;
3553 for (uint j = 0; j < par.m_rows; j++) {
3554 uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3555 uint i = thrrow(par, j2);
3556 set.lock();
3557 if (!set.compat(par, i, Row::OpUpd)) {
3558 LL3("pkupdate SKIP " << i << " " << set.getrow(i));
3559 set.unlock();
3560 } else {
3561 set.push(i);
3562 set.copyval(i, tab.m_pkmask);
3563 set.calc(par, i, ~tab.m_pkmask);
3564 CHK(set.updrow(par, i) == 0);
3565 set.unlock();
3566 LL4("pkupdate key=" << i << " " << set.getrow(i));
3567 batch++;
3568 }
3569 bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3570 if (batch == par.m_batch || lastbatch) {
3571 uint err = par.m_catcherr;
3572 ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3573 CHK(con.execute(et, err) == 0);
3574 set.lock();
3575 set.post(par, !err ? et : Rollback);
3576 set.unlock();
3577 if (err) {
3578 LL1("pkupdate key=" << i << ": stop on " << con.errname(err));
3579 break;
3580 }
3581 batch = 0;
3582 if (!lastbatch) {
3583 con.closeTransaction();
3584 CHK(con.startTransaction() == 0);
3585 }
3586 }
3587 }
3588 con.closeTransaction();
3589 return 0;
3590 }
3591
3592 static int
pkdelete(Par par)3593 pkdelete(Par par)
3594 {
3595 Con& con = par.con();
3596 const Tab& tab = par.tab();
3597 Set& set = par.set();
3598 LL3("pkdelete " << tab.m_name);
3599 CHK(con.startTransaction() == 0);
3600 uint batch = 0;
3601 for (uint j = 0; j < par.m_rows; j++) {
3602 uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3603 uint i = thrrow(par, j2);
3604 set.lock();
3605 if (!set.compat(par, i, Row::OpDel)) {
3606 LL3("pkdelete SKIP " << i << " " << set.getrow(i));
3607 set.unlock();
3608 } else {
3609 set.push(i);
3610 set.copyval(i, tab.m_pkmask);
3611 CHK(set.delrow(par, i) == 0);
3612 set.unlock();
3613 LL4("pkdelete key=" << i << " " << set.getrow(i));
3614 batch++;
3615 }
3616 bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3617 if (batch == par.m_batch || lastbatch) {
3618 uint err = par.m_catcherr;
3619 ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3620 CHK(con.execute(et, err) == 0);
3621 set.lock();
3622 set.post(par, !err ? et : Rollback);
3623 set.unlock();
3624 if (err) {
3625 LL1("pkdelete key=" << i << " stop on " << con.errname(err));
3626 break;
3627 }
3628 batch = 0;
3629 if (!lastbatch) {
3630 con.closeTransaction();
3631 CHK(con.startTransaction() == 0);
3632 }
3633 }
3634 }
3635 con.closeTransaction();
3636 return 0;
3637 }
3638
#if 0
// Disabled code (compiled out by the surrounding "#if 0").
// Reads every row of the set for which set.compat() allows Row::OpREAD
// by primary key, one committed transaction per row, collecting the
// values in set2.  With par.m_verify set, the expected set (set1) is
// verified against set2 at the end.
static int
pkread(Par par)
{
  Con& con = par.con();
  const Tab& tab = par.tab();
  Set& set = par.set();
  LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
  // expected
  const Set& set1 = set;
  Set set2(tab, set.m_rows);
  for (uint i = 0; i < set.m_rows; i++) {
    set.lock();
    // TODO lock mode
    if (!set.compat(par, i, Row::OpREAD)) {
      LL3("pkread SKIP " << i << " " << set.getrow(i));
      set.unlock();
      continue;
    }
    set.unlock();
    CHK(con.startTransaction() == 0);
    CHK(set2.selrow(par, *set1.m_row[i]) == 0);
    CHK(con.execute(Commit) == 0);
    // the key read back must be the key that was asked for
    uint i2 = (uint)-1;
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
    CHK(set2.putval(i, false) == 0);
    LL4("row " << set2.count() << " " << set2.getrow(i));
    con.closeTransaction();
  }
  if (par.m_verify)
    CHK(set1.verify(par, set2, false) == 0);
  return 0;
}
#endif
3673
3674 static int
pkreadfast(Par par,uint count)3675 pkreadfast(Par par, uint count)
3676 {
3677 Con& con = par.con();
3678 const Tab& tab = par.tab();
3679 const Set& set = par.set();
3680 LL3("pkfast " << tab.m_name);
3681 Row keyrow(tab);
3682 // not batched on purpose
3683 for (uint j = 0; j < count; j++) {
3684 uint i = urandom(set.m_rows);
3685 assert(set.compat(par, i, Row::OpREAD));
3686 CHK(con.startTransaction() == 0);
3687 // define key
3688 keyrow.calc(par, i);
3689 CHK(keyrow.selrow(par) == 0);
3690 NdbRecAttr* rec;
3691 // get 1st column
3692 CHK(con.getValue((Uint32)0, rec) == 0);
3693 CHK(con.execute(Commit) == 0);
3694 con.closeTransaction();
3695 }
3696 return 0;
3697 }
3698
3699 // hash index operations
3700
3701 static int
hashindexupdate(Par par,const ITab & itab)3702 hashindexupdate(Par par, const ITab& itab)
3703 {
3704 Con& con = par.con();
3705 const Tab& tab = par.tab();
3706 Set& set = par.set();
3707 LL3("hashindexupdate " << itab.m_name);
3708 CHK(con.startTransaction() == 0);
3709 uint batch = 0;
3710 for (uint j = 0; j < par.m_rows; j++) {
3711 uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3712 uint i = thrrow(par, j2);
3713 set.lock();
3714 if (!set.compat(par, i, Row::OpUpd)) {
3715 LL3("hashindexupdate SKIP " << i << " " << set.getrow(i));
3716 set.unlock();
3717 } else {
3718 // table pk and index key are not updated
3719 set.push(i);
3720 uint keymask = tab.m_pkmask | itab.m_keymask;
3721 set.copyval(i, keymask);
3722 set.calc(par, i, ~keymask);
3723 CHK(set.updrow(par, itab, i) == 0);
3724 set.unlock();
3725 LL4("hashindexupdate " << i << " " << set.getrow(i));
3726 batch++;
3727 }
3728 bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3729 if (batch == par.m_batch || lastbatch) {
3730 uint err = par.m_catcherr;
3731 ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3732 CHK(con.execute(et, err) == 0);
3733 set.lock();
3734 set.post(par, !err ? et : Rollback);
3735 set.unlock();
3736 if (err) {
3737 LL1("hashindexupdate " << i << " stop on " << con.errname(err));
3738 break;
3739 }
3740 batch = 0;
3741 if (!lastbatch) {
3742 con.closeTransaction();
3743 CHK(con.startTransaction() == 0);
3744 }
3745 }
3746 }
3747 con.closeTransaction();
3748 return 0;
3749 }
3750
3751 static int
hashindexdelete(Par par,const ITab & itab)3752 hashindexdelete(Par par, const ITab& itab)
3753 {
3754 Con& con = par.con();
3755 Set& set = par.set();
3756 LL3("hashindexdelete " << itab.m_name);
3757 CHK(con.startTransaction() == 0);
3758 uint batch = 0;
3759 for (uint j = 0; j < par.m_rows; j++) {
3760 uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3761 uint i = thrrow(par, j2);
3762 set.lock();
3763 if (!set.compat(par, i, Row::OpDel)) {
3764 LL3("hashindexdelete SKIP " << i << " " << set.getrow(i));
3765 set.unlock();
3766 } else {
3767 set.push(i);
3768 set.copyval(i, itab.m_keymask);
3769 CHK(set.delrow(par, itab, i) == 0);
3770 set.unlock();
3771 LL4("hashindexdelete " << i << " " << set.getrow(i));
3772 batch++;
3773 }
3774 bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3775 if (batch == par.m_batch || lastbatch) {
3776 uint err = par.m_catcherr;
3777 ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3778 CHK(con.execute(et, err) == 0);
3779 set.lock();
3780 set.post(par, !err ? et : Rollback);
3781 set.unlock();
3782 if (err) {
3783 LL1("hashindexdelete " << i << " stop on " << con.errname(err));
3784 break;
3785 }
3786 batch = 0;
3787 if (!lastbatch) {
3788 con.closeTransaction();
3789 CHK(con.startTransaction() == 0);
3790 }
3791 }
3792 }
3793 con.closeTransaction();
3794 return 0;
3795 }
3796
3797 static int
hashindexread(Par par,const ITab & itab)3798 hashindexread(Par par, const ITab& itab)
3799 {
3800 Con& con = par.con();
3801 const Tab& tab = par.tab();
3802 Set& set = par.set();
3803 LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
3804 // expected
3805 const Set& set1 = set;
3806 Set set2(tab, set.m_rows);
3807 for (uint i = 0; i < set.m_rows; i++) {
3808 set.lock();
3809 // TODO lock mode
3810 if (!set.compat(par, i, Row::OpREAD)) {
3811 LL3("hashindexread SKIP " << i << " " << set.getrow(i));
3812 set.unlock();
3813 continue;
3814 }
3815 set.unlock();
3816 CHK(con.startTransaction() == 0);
3817 CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
3818 CHK(con.execute(Commit) == 0);
3819 uint i2 = (uint)-1;
3820 CHK(set2.getkey(par, &i2) == 0 && i == i2);
3821 CHK(set2.putval(i, false) == 0);
3822 LL4("row " << set2.count() << " " << *set2.m_row[i]);
3823 con.closeTransaction();
3824 }
3825 if (par.m_verify)
3826 CHK(set1.verify(par, set2, false) == 0);
3827 return 0;
3828 }
3829
3830 // scan read
3831
3832 static int
scanreadtable(Par par)3833 scanreadtable(Par par)
3834 {
3835 Con& con = par.con();
3836 const Tab& tab = par.tab();
3837 const Set& set = par.set();
3838 // expected
3839 const Set& set1 = set;
3840 LL3("scanreadtable " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
3841 Set set2(tab, set.m_rows);
3842 CHK(con.startTransaction() == 0);
3843 CHK(con.getNdbScanOperation(tab) == 0);
3844 CHK(con.readTuples(par) == 0);
3845 set2.getval(par);
3846 CHK(con.executeScan() == 0);
3847 uint n = 0;
3848 while (1) {
3849 int ret;
3850 uint err = par.m_catcherr;
3851 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3852 if (ret == 1)
3853 break;
3854 if (err) {
3855 LL1("scanreadtable stop on " << con.errname(err));
3856 break;
3857 }
3858 uint i = (uint)-1;
3859 CHK(set2.getkey(par, &i) == 0);
3860 CHK(set2.putval(i, false, n) == 0);
3861 LL4("row " << n << " " << *set2.m_row[i]);
3862 n++;
3863 }
3864 con.closeTransaction();
3865 if (par.m_verify)
3866 CHK(set1.verify(par, set2, false) == 0);
3867 LL3("scanreadtable " << tab.m_name << " done rows=" << n);
3868 return 0;
3869 }
3870
3871 static int
scanreadtablefast(Par par,uint countcheck)3872 scanreadtablefast(Par par, uint countcheck)
3873 {
3874 Con& con = par.con();
3875 const Tab& tab = par.tab();
3876 LL3("scanfast " << tab.m_name);
3877 CHK(con.startTransaction() == 0);
3878 CHK(con.getNdbScanOperation(tab) == 0);
3879 CHK(con.readTuples(par) == 0);
3880 // get 1st column
3881 NdbRecAttr* rec;
3882 CHK(con.getValue((Uint32)0, rec) == 0);
3883 CHK(con.executeScan() == 0);
3884 uint count = 0;
3885 while (1) {
3886 int ret;
3887 CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
3888 if (ret == 1)
3889 break;
3890 count++;
3891 }
3892 con.closeTransaction();
3893 CHK(count == countcheck);
3894 return 0;
3895 }
3896
3897 // try to get interesting bounds
3898 static void
calcscanbounds(Par par,const ITab & itab,BSet & bset,const Set & set,Set & set1)3899 calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
3900 {
3901 while (true) {
3902 bset.calc(par);
3903 bset.filter(par, set, set1);
3904 uint n = set1.count();
3905 // prefer proper subset
3906 if (0 < n && n < set.m_rows)
3907 break;
3908 if (urandom(5) == 0)
3909 break;
3910 set1.reset();
3911 }
3912 }
3913
3914 static int
scanreadindex(Par par,const ITab & itab,BSet & bset,bool calc)3915 scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
3916 {
3917 Con& con = par.con();
3918 const Tab& tab = par.tab();
3919 const Set& set = par.set();
3920 Set set1(tab, set.m_rows);
3921 if (calc) {
3922 calcscanbounds(par, itab, bset, set, set1);
3923 } else {
3924 bset.filter(par, set, set1);
3925 }
3926 LL3("scanreadindex " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
3927 Set set2(tab, set.m_rows);
3928 CHK(con.startTransaction() == 0);
3929 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
3930 CHK(con.readIndexTuples(par) == 0);
3931 CHK(bset.setbnd(par) == 0);
3932 set2.getval(par);
3933 CHK(con.executeScan() == 0);
3934 uint n = 0;
3935 while (1) {
3936 int ret;
3937 uint err = par.m_catcherr;
3938 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3939 if (ret == 1)
3940 break;
3941 if (err) {
3942 LL1("scanreadindex stop on " << con.errname(err));
3943 break;
3944 }
3945 uint i = (uint)-1;
3946 CHK(set2.getkey(par, &i) == 0);
3947 CHK(set2.putval(i, par.m_dups, n) == 0);
3948 LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
3949 n++;
3950 }
3951 con.closeTransaction();
3952 if (par.m_verify) {
3953 CHK(set1.verify(par, set2, false) == 0);
3954 if (par.m_ordered)
3955 CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
3956 }
3957 LL3("scanreadindex " << itab.m_name << " done rows=" << n);
3958 return 0;
3959 }
3960
3961
3962 static int
scanreadindexmrr(Par par,const ITab & itab,int numBsets)3963 scanreadindexmrr(Par par, const ITab& itab, int numBsets)
3964 {
3965 Con& con = par.con();
3966 const Tab& tab = par.tab();
3967 const Set& set = par.set();
3968
3969 /* Create space for different sets of bounds, expected results and
3970 * results
3971 * Calculate bounds and the sets of rows which would result
3972 */
3973 BSet** boundSets;
3974 Set** expectedResults;
3975 Set** actualResults;
3976 uint* setSizes;
3977
3978 CHK((boundSets= (BSet**) malloc(numBsets * sizeof(BSet*))) != 0);
3979 CHK((expectedResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3980 CHK((actualResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3981 CHK((setSizes= (uint*) malloc(numBsets * sizeof(uint))) != 0);
3982
3983 for (int n=0; n < numBsets; n++)
3984 {
3985 CHK((boundSets[n]= new BSet(tab, itab)) != NULL );
3986 CHK((expectedResults[n]= new Set(tab, set.m_rows)) != NULL);
3987 CHK((actualResults[n]= new Set(tab, set.m_rows)) != NULL);
3988 setSizes[n]= 0;
3989
3990 Set& results= *expectedResults[n];
3991 /* Calculate some scan bounds which are selective */
3992 do {
3993 results.reset();
3994 calcscanbounds(par, itab, *boundSets[n], set, results);
3995 } while ((*boundSets[n]).m_bvals == 0);
3996 }
3997
3998 /* Define scan with bounds */
3999 LL3("scanreadindexmrr " << itab.m_name << " ranges= " << numBsets << " lockmode=" << par.m_lockmode << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
4000 Set set2(tab, set.m_rows);
4001 /* Multirange + Read range number for this scan */
4002 par.m_multiRange= true;
4003 CHK(con.startTransaction() == 0);
4004 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4005 CHK(con.readIndexTuples(par) == 0);
4006 /* Set the bounds */
4007 for (int n=0; n < numBsets; n++)
4008 {
4009 CHK(boundSets[n]->setbnd(par) == 0);
4010 int res= con.m_indexscanop->end_of_bound(n);
4011 if (res != 0)
4012 {
4013 LL1("end_of_bound error : " << con.m_indexscanop->getNdbError().code);
4014 CHK (false);
4015 }
4016 }
4017 set2.getval(par);
4018 CHK(con.executeScan() == 0);
4019 int rows_received= 0;
4020 while (1) {
4021 int ret;
4022 uint err = par.m_catcherr;
4023 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4024 if (ret == 1)
4025 break;
4026 if (err) {
4027 LL1("scanreadindexmrr stop on " << con.errname(err));
4028 break;
4029 }
4030 uint i = (uint)-1;
4031 /* Put value into set2 temporarily */
4032 CHK(set2.getkey(par, &i) == 0);
4033 CHK(set2.putval(i, false, -1) == 0);
4034
4035 /* Now move it to the correct set, based on the range no */
4036 int rangeNum= con.m_indexscanop->get_range_no();
4037 CHK(rangeNum < numBsets);
4038 CHK(set2.m_row[i] != NULL);
4039 /* Get rowNum based on what's in the set already (slow) */
4040 CHK(setSizes[rangeNum] == actualResults[rangeNum]->count());
4041 int rowNum= setSizes[rangeNum];
4042 setSizes[rangeNum] ++;
4043 CHK((uint) rowNum < set2.m_rows);
4044 actualResults[rangeNum]->m_row[i]= set2.m_row[i];
4045 actualResults[rangeNum]->m_rowkey[rowNum]= i;
4046 set2.m_row[i]= 0;
4047 LL4("range " << rangeNum << " key " << i << " row " << rowNum << " " << *set2.m_row[i]);
4048 rows_received++;
4049 }
4050 con.closeTransaction();
4051
4052 /* Verify that each set has the expected rows, and optionally, that
4053 * they're ordered
4054 */
4055 if (par.m_verify)
4056 {
4057 LL4("Verifying " << numBsets << " sets, " << rows_received << " rows");
4058 for (int n=0; n < numBsets; n++)
4059 {
4060 LL5("Set " << n << " of " << expectedResults[n]->count() << " rows");
4061 CHK(expectedResults[n]->verify(par, *actualResults[n], false) == 0);
4062 if (par.m_ordered)
4063 {
4064 LL5("Verifying ordering");
4065 CHK(actualResults[n]->verifyorder(par, itab, par.m_descending) == 0);
4066 }
4067 }
4068 }
4069
4070 /* Cleanup */
4071 for (int n=0; n < numBsets; n++)
4072 {
4073 boundSets[n]->reset();
4074 delete boundSets[n];
4075 delete expectedResults[n];
4076 delete actualResults[n];
4077 }
4078
4079 free(boundSets);
4080 free(expectedResults);
4081 free(actualResults);
4082 free(setSizes);
4083
4084 LL3("scanreadindexmrr " << itab.m_name << " done rows=" << rows_received);
4085 return 0;
4086 }
4087
4088 static int
scanreadindexfast(Par par,const ITab & itab,const BSet & bset,uint countcheck)4089 scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
4090 {
4091 Con& con = par.con();
4092 const Tab& tab = par.tab();
4093 LL3("scanfast " << itab.m_name << " " << bset);
4094 LL4(bset);
4095 CHK(con.startTransaction() == 0);
4096 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4097 CHK(con.readIndexTuples(par) == 0);
4098 CHK(bset.setbnd(par) == 0);
4099 // get 1st column
4100 NdbRecAttr* rec;
4101 CHK(con.getValue((Uint32)0, rec) == 0);
4102 CHK(con.executeScan() == 0);
4103 uint count = 0;
4104 while (1) {
4105 int ret;
4106 CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
4107 if (ret == 1)
4108 break;
4109 count++;
4110 }
4111 con.closeTransaction();
4112 CHK(count == countcheck);
4113 return 0;
4114 }
4115
4116 static int
scanreadfilter(Par par,const ITab & itab,BSet & bset,bool calc)4117 scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
4118 {
4119 Con& con = par.con();
4120 const Tab& tab = par.tab();
4121 const Set& set = par.set();
4122 Set set1(tab, set.m_rows);
4123 if (calc) {
4124 calcscanbounds(par, itab, bset, set, set1);
4125 } else {
4126 bset.filter(par, set, set1);
4127 }
4128 LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
4129 Set set2(tab, set.m_rows);
4130 CHK(con.startTransaction() == 0);
4131 CHK(con.getNdbScanOperation(tab) == 0);
4132 CHK(con.readTuples(par) == 0);
4133 CHK(bset.setflt(par) == 0);
4134 set2.getval(par);
4135 CHK(con.executeScan() == 0);
4136 uint n = 0;
4137 while (1) {
4138 int ret;
4139 uint err = par.m_catcherr;
4140 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4141 if (ret == 1)
4142 break;
4143 if (err) {
4144 LL1("scanfilter stop on " << con.errname(err));
4145 break;
4146 }
4147 uint i = (uint)-1;
4148 CHK(set2.getkey(par, &i) == 0);
4149 CHK(set2.putval(i, par.m_dups, n) == 0);
4150 LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
4151 n++;
4152 }
4153 con.closeTransaction();
4154 if (par.m_verify) {
4155 CHK(set1.verify(par, set2, false) == 0);
4156 }
4157 LL3("scanfilter " << itab.m_name << " done rows=" << n);
4158 return 0;
4159 }
4160
4161 static int
scanreadindex(Par par,const ITab & itab)4162 scanreadindex(Par par, const ITab& itab)
4163 {
4164 const Tab& tab = par.tab();
4165 for (uint i = 0; i < par.m_ssloop; i++) {
4166 if (itab.m_type == ITab::OrderedIndex) {
4167 BSet bset(tab, itab);
4168 CHK(scanreadfilter(par, itab, bset, true) == 0);
4169 /* Single range or Multi range scan */
4170 if (randompct(g_opt.m_pctmrr))
4171 CHK(scanreadindexmrr(par,
4172 itab,
4173 1+urandom(g_opt.m_mrrmaxrng-1)) == 0);
4174 else
4175 CHK(scanreadindex(par, itab, bset, true) == 0);
4176 }
4177 }
4178 return 0;
4179 }
4180
4181 static int
scanreadindex(Par par)4182 scanreadindex(Par par)
4183 {
4184 const Tab& tab = par.tab();
4185 for (uint i = 0; i < tab.m_itabs; i++) {
4186 if (tab.m_itab[i] == 0)
4187 continue;
4188 const ITab& itab = *tab.m_itab[i];
4189 if (itab.m_type == ITab::OrderedIndex) {
4190 CHK(scanreadindex(par, itab) == 0);
4191 } else {
4192 CHK(hashindexread(par, itab) == 0);
4193 }
4194 }
4195 return 0;
4196 }
4197
#if 0
// Disabled code (compiled out by the surrounding "#if 0").
// Scan read of the table followed by scan reads of all its indexes.
static int
scanreadall(Par par)
{
  CHK(scanreadtable(par) == 0);
  CHK(scanreadindex(par) == 0);
  return 0;
}
#endif
4207
4208 // timing scans
4209
4210 static int
timescantable(Par par)4211 timescantable(Par par)
4212 {
4213 par.tmr().on();
4214 CHK(scanreadtablefast(par, par.m_totrows) == 0);
4215 par.tmr().off(par.set().m_rows);
4216 return 0;
4217 }
4218
4219 static int
timescanpkindex(Par par)4220 timescanpkindex(Par par)
4221 {
4222 const Tab& tab = par.tab();
4223 const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
4224 BSet bset(tab, itab);
4225 par.tmr().on();
4226 CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
4227 par.tmr().off(par.set().m_rows);
4228 return 0;
4229 }
4230
4231 static int
timepkreadtable(Par par)4232 timepkreadtable(Par par)
4233 {
4234 par.tmr().on();
4235 uint count = par.m_samples;
4236 if (count == 0)
4237 count = par.m_totrows;
4238 CHK(pkreadfast(par, count) == 0);
4239 par.tmr().off(count);
4240 return 0;
4241 }
4242
4243 static int
timepkreadindex(Par par)4244 timepkreadindex(Par par)
4245 {
4246 const Tab& tab = par.tab();
4247 const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
4248 BSet bset(tab, itab);
4249 uint count = par.m_samples;
4250 if (count == 0)
4251 count = par.m_totrows;
4252 par.tmr().on();
4253 for (uint j = 0; j < count; j++) {
4254 uint i = urandom(par.m_totrows);
4255 bset.calcpk(par, i);
4256 CHK(scanreadindexfast(par, itab, bset, 1) == 0);
4257 }
4258 par.tmr().off(count);
4259 return 0;
4260 }
4261
4262 // scan update
4263
4264 static int
scanupdatetable(Par par)4265 scanupdatetable(Par par)
4266 {
4267 Con& con = par.con();
4268 const Tab& tab = par.tab();
4269 Set& set = par.set();
4270 LL3("scanupdatetable " << tab.m_name);
4271 Set set2(tab, set.m_rows);
4272 par.m_lockmode = NdbOperation::LM_Exclusive;
4273 CHK(con.startTransaction() == 0);
4274 CHK(con.getNdbScanOperation(tab) == 0);
4275 CHK(con.readTuples(par) == 0);
4276 set2.getval(par);
4277 CHK(con.executeScan() == 0);
4278 uint count = 0;
4279 // updating trans
4280 Con con2;
4281 con2.connect(con);
4282 CHK(con2.startTransaction() == 0);
4283 uint batch = 0;
4284 while (1) {
4285 int ret;
4286 uint32 err = par.m_catcherr;
4287 CHK((ret = con.nextScanResult(true, err)) != -1);
4288 if (ret != 0)
4289 break;
4290 if (err) {
4291 LL1("scanupdatetable [scan] stop on " << con.errname(err));
4292 break;
4293 }
4294 if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
4295 con.closeScan();
4296 break;
4297 }
4298 while (1) {
4299 uint i = (uint)-1;
4300 CHK(set2.getkey(par, &i) == 0);
4301 set.lock();
4302 if (!set.compat(par, i, Row::OpUpd)) {
4303 LL3("scanupdatetable SKIP " << i << " " << set.getrow(i));
4304 } else {
4305 CHKTRY(set2.putval(i, false) == 0, set.unlock());
4306 CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
4307 Par par2 = par;
4308 par2.m_con = &con2;
4309 set.push(i);
4310 set.calc(par, i, ~tab.m_pkmask);
4311 CHKTRY(set.setrow(par2, i) == 0, set.unlock());
4312 LL4("scanupdatetable " << i << " " << set.getrow(i));
4313 batch++;
4314 }
4315 set.unlock();
4316 CHK((ret = con.nextScanResult(false)) != -1);
4317 bool lastbatch = (batch != 0 && ret != 0);
4318 if (batch == par.m_batch || lastbatch) {
4319 uint err = par.m_catcherr;
4320 ExecType et = Commit;
4321 CHK(con2.execute(et, err) == 0);
4322 set.lock();
4323 set.post(par, !err ? et : Rollback);
4324 set.unlock();
4325 if (err) {
4326 LL1("scanupdatetable [update] stop on " << con2.errname(err));
4327 goto out;
4328 }
4329 LL4("scanupdatetable committed batch");
4330 count += batch;
4331 batch = 0;
4332 con2.closeTransaction();
4333 CHK(con2.startTransaction() == 0);
4334 }
4335 if (ret != 0)
4336 break;
4337 }
4338 }
4339 out:
4340 con2.closeTransaction();
4341 LL3("scanupdatetable " << tab.m_name << " rows updated=" << count);
4342 con.closeTransaction();
4343 return 0;
4344 }
4345
4346 static int
scanupdateindex(Par par,const ITab & itab,BSet & bset,bool calc)4347 scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
4348 {
4349 Con& con = par.con();
4350 const Tab& tab = par.tab();
4351 Set& set = par.set();
4352 // expected
4353 Set set1(tab, set.m_rows);
4354 if (calc) {
4355 calcscanbounds(par, itab, bset, set, set1);
4356 } else {
4357 bset.filter(par, set, set1);
4358 }
4359 LL3("scanupdateindex " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
4360 Set set2(tab, set.m_rows);
4361 par.m_lockmode = NdbOperation::LM_Exclusive;
4362 CHK(con.startTransaction() == 0);
4363 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4364 CHK(con.readTuples(par) == 0);
4365 CHK(bset.setbnd(par) == 0);
4366 set2.getval(par);
4367 CHK(con.executeScan() == 0);
4368 uint count = 0;
4369 // updating trans
4370 Con con2;
4371 con2.connect(con);
4372 CHK(con2.startTransaction() == 0);
4373 uint batch = 0;
4374 while (1) {
4375 int ret;
4376 uint err = par.m_catcherr;
4377 CHK((ret = con.nextScanResult(true, err)) != -1);
4378 if (ret != 0)
4379 break;
4380 if (err) {
4381 LL1("scanupdateindex [scan] stop on " << con.errname(err));
4382 break;
4383 }
4384 if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
4385 con.closeScan();
4386 break;
4387 }
4388 while (1) {
4389 uint i = (uint)-1;
4390 CHK(set2.getkey(par, &i) == 0);
4391 set.lock();
4392 if (!set.compat(par, i, Row::OpUpd)) {
4393 LL4("scanupdateindex SKIP " << set.getrow(i));
4394 } else {
4395 CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
4396 CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
4397 Par par2 = par;
4398 par2.m_con = &con2;
4399 set.push(i);
4400 uint colmask = !par.m_noindexkeyupdate ? ~0 : ~itab.m_keymask;
4401 set.calc(par, i, colmask);
4402 CHKTRY(set.setrow(par2, i) == 0, set.unlock());
4403 LL4("scanupdateindex " << i << " " << set.getrow(i));
4404 batch++;
4405 }
4406 set.unlock();
4407 CHK((ret = con.nextScanResult(false)) != -1);
4408 bool lastbatch = (batch != 0 && ret != 0);
4409 if (batch == par.m_batch || lastbatch) {
4410 uint err = par.m_catcherr;
4411 ExecType et = Commit;
4412 CHK(con2.execute(et, err) == 0);
4413 set.lock();
4414 set.post(par, !err ? et : Rollback);
4415 set.unlock();
4416 if (err) {
4417 LL1("scanupdateindex [update] stop on " << con2.errname(err));
4418 goto out;
4419 }
4420 LL4("scanupdateindex committed batch");
4421 count += batch;
4422 batch = 0;
4423 con2.closeTransaction();
4424 CHK(con2.startTransaction() == 0);
4425 }
4426 if (ret != 0)
4427 break;
4428 }
4429 }
4430 out:
4431 con2.closeTransaction();
4432 if (par.m_verify) {
4433 CHK(set1.verify(par, set2, true) == 0);
4434 if (par.m_ordered)
4435 CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
4436 }
4437 LL3("scanupdateindex " << itab.m_name << " rows updated=" << count);
4438 con.closeTransaction();
4439 return 0;
4440 }
4441
4442 static int
scanupdateindex(Par par,const ITab & itab)4443 scanupdateindex(Par par, const ITab& itab)
4444 {
4445 const Tab& tab = par.tab();
4446 for (uint i = 0; i < par.m_ssloop; i++) {
4447 if (itab.m_type == ITab::OrderedIndex) {
4448 BSet bset(tab, itab);
4449 CHK(scanupdateindex(par, itab, bset, true) == 0);
4450 } else {
4451 CHK(hashindexupdate(par, itab) == 0);
4452 }
4453 }
4454 return 0;
4455 }
4456
4457 static int
scanupdateindex(Par par)4458 scanupdateindex(Par par)
4459 {
4460 const Tab& tab = par.tab();
4461 for (uint i = 0; i < tab.m_itabs; i++) {
4462 if (tab.m_itab[i] == 0)
4463 continue;
4464 const ITab& itab = *tab.m_itab[i];
4465 CHK(scanupdateindex(par, itab) == 0);
4466 }
4467 return 0;
4468 }
4469
#if 0
// Disabled code (compiled out by the surrounding "#if 0").
// Scan update of the table followed by scan updates via all indexes.
static int
scanupdateall(Par par)
{
  CHK(scanupdatetable(par) == 0);
  CHK(scanupdateindex(par) == 0);
  return 0;
}
#endif
4479
4480 // medium level routines
4481
4482 static int
readverifyfull(Par par)4483 readverifyfull(Par par)
4484 {
4485 if (par.m_noverify)
4486 return 0;
4487 par.m_verify = true;
4488 if (par.m_abortpct != 0) {
4489 LL2("skip verify in this version"); // implement in 5.0 version
4490 par.m_verify = false;
4491 }
4492 par.m_lockmode = NdbOperation::LM_CommittedRead;
4493 const Tab& tab = par.tab();
4494 if (par.m_no == 0) {
4495 // thread 0 scans table
4496 CHK(scanreadtable(par) == 0);
4497 // once more via tup scan
4498 par.m_tupscan = true;
4499 CHK(scanreadtable(par) == 0);
4500 }
4501 // each thread scans different indexes
4502 for (uint i = 0; i < tab.m_itabs; i++) {
4503 if (i % par.m_usedthreads != par.m_no)
4504 continue;
4505 if (tab.m_itab[i] == 0)
4506 continue;
4507 const ITab& itab = *tab.m_itab[i];
4508 if (itab.m_type == ITab::OrderedIndex) {
4509 BSet bset(tab, itab);
4510 CHK(scanreadindex(par, itab, bset, false) == 0);
4511 } else {
4512 CHK(hashindexread(par, itab) == 0);
4513 }
4514 }
4515 return 0;
4516 }
4517
4518 static int
readverifyindex(Par par)4519 readverifyindex(Par par)
4520 {
4521 if (par.m_noverify)
4522 return 0;
4523 par.m_verify = true;
4524 par.m_lockmode = NdbOperation::LM_CommittedRead;
4525 uint sel = urandom(10);
4526 if (sel < 9) {
4527 par.m_ordered = true;
4528 par.m_descending = (sel < 5);
4529 }
4530 CHK(scanreadindex(par) == 0);
4531 return 0;
4532 }
4533
4534 static int
pkops(Par par)4535 pkops(Par par)
4536 {
4537 const Tab& tab = par.tab();
4538 par.m_randomkey = true;
4539 for (uint i = 0; i < par.m_ssloop; i++) {
4540 uint j = 0;
4541 while (j < tab.m_itabs) {
4542 if (tab.m_itab[j] != 0) {
4543 const ITab& itab = *tab.m_itab[j];
4544 if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
4545 break;
4546 }
4547 j++;
4548 }
4549 uint sel = urandom(10);
4550 if (par.m_slno % 2 == 0) {
4551 // favor insert
4552 if (sel < 8) {
4553 CHK(pkinsert(par) == 0);
4554 } else if (sel < 9) {
4555 if (j == tab.m_itabs)
4556 CHK(pkupdate(par) == 0);
4557 else {
4558 const ITab& itab = *tab.m_itab[j];
4559 CHK(hashindexupdate(par, itab) == 0);
4560 }
4561 } else {
4562 if (j == tab.m_itabs)
4563 CHK(pkdelete(par) == 0);
4564 else {
4565 const ITab& itab = *tab.m_itab[j];
4566 CHK(hashindexdelete(par, itab) == 0);
4567 }
4568 }
4569 } else {
4570 // favor delete
4571 if (sel < 1) {
4572 CHK(pkinsert(par) == 0);
4573 } else if (sel < 2) {
4574 if (j == tab.m_itabs)
4575 CHK(pkupdate(par) == 0);
4576 else {
4577 const ITab& itab = *tab.m_itab[j];
4578 CHK(hashindexupdate(par, itab) == 0);
4579 }
4580 } else {
4581 if (j == tab.m_itabs)
4582 CHK(pkdelete(par) == 0);
4583 else {
4584 const ITab& itab = *tab.m_itab[j];
4585 CHK(hashindexdelete(par, itab) == 0);
4586 }
4587 }
4588 }
4589 }
4590 return 0;
4591 }
4592
4593 static int
pkupdatescanread(Par par)4594 pkupdatescanread(Par par)
4595 {
4596 par.m_dups = true;
4597 par.m_catcherr |= Con::ErrDeadlock;
4598 uint sel = urandom(10);
4599 if (sel < 5) {
4600 CHK(pkupdate(par) == 0);
4601 } else if (sel < 6) {
4602 par.m_verify = false;
4603 CHK(scanreadtable(par) == 0);
4604 } else {
4605 par.m_verify = false;
4606 if (sel < 8) {
4607 par.m_ordered = true;
4608 par.m_descending = (sel < 7);
4609 }
4610 CHK(scanreadindex(par) == 0);
4611 }
4612 return 0;
4613 }
4614
4615 static int
mixedoperations(Par par)4616 mixedoperations(Par par)
4617 {
4618 par.m_dups = true;
4619 par.m_catcherr |= Con::ErrDeadlock;
4620 par.m_scanstop = par.m_totrows; // randomly close scans
4621 uint sel = urandom(10);
4622 if (sel < 2) {
4623 CHK(pkdelete(par) == 0);
4624 } else if (sel < 4) {
4625 CHK(pkupdate(par) == 0);
4626 } else if (sel < 6) {
4627 CHK(scanupdatetable(par) == 0);
4628 } else {
4629 if (sel < 8) {
4630 par.m_ordered = true;
4631 par.m_descending = (sel < 7);
4632 }
4633 CHK(scanupdateindex(par) == 0);
4634 }
4635 return 0;
4636 }
4637
4638 static int
parallelorderedupdate(Par par)4639 parallelorderedupdate(Par par)
4640 {
4641 const Tab& tab = par.tab();
4642 uint k = 0;
4643 for (uint i = 0; i < tab.m_itabs; i++) {
4644 if (tab.m_itab[i] == 0)
4645 continue;
4646 const ITab& itab = *tab.m_itab[i];
4647 if (itab.m_type != ITab::OrderedIndex)
4648 continue;
4649 // cannot sync threads yet except via subloop
4650 if (k++ == par.m_slno % tab.m_orderedindexes) {
4651 LL3("parallelorderedupdate: " << itab.m_name);
4652 par.m_noindexkeyupdate = true;
4653 par.m_ordered = true;
4654 par.m_descending = (par.m_slno != 0);
4655 par.m_dups = false;
4656 par.m_verify = true;
4657 BSet bset(tab, itab); // empty bounds
4658 // prefer empty bounds
4659 uint sel = urandom(10);
4660 CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
4661 }
4662 }
4663 return 0;
4664 }
4665
4666 static int
pkupdateindexbuild(Par par)4667 pkupdateindexbuild(Par par)
4668 {
4669 if (par.m_no == 0) {
4670 NdbSleep_MilliSleep(10 + urandom(100));
4671 CHK(createindex(par) == 0);
4672 } else {
4673 NdbSleep_MilliSleep(10 + urandom(100));
4674 par.m_randomkey = true;
4675 CHK(pkupdate(par) == 0);
4676 }
4677 return 0;
4678 }
4679
4680 // savepoint tests (single thread for now)
4681
// One savepoint read case: how the read is done and what it must see.
// The member order matters: sptlist[] initializes it positionally.
struct Spt {
  // possible outcomes; Deadlock means the read is expected to fail
  // with Con::ErrDeadlock (checked by the savepointread* functions)
  enum Res { Committed, Latest, Deadlock };
  bool m_same; // same transaction
  // lock mode of the case
  NdbOperation::LockMode m_lm;
  // expected outcome
  Res m_res;
};
4688
4689 static Spt sptlist[] = {
4690 { 1, NdbOperation::LM_Read, Spt::Latest },
4691 { 1, NdbOperation::LM_Exclusive, Spt::Latest },
4692 { 1, NdbOperation::LM_CommittedRead, Spt::Latest },
4693 { 0, NdbOperation::LM_Read, Spt::Deadlock },
4694 { 0, NdbOperation::LM_Exclusive, Spt::Deadlock },
4695 { 0, NdbOperation::LM_CommittedRead, Spt::Committed }
4696 };
4697 static uint sptcount = sizeof(sptlist)/sizeof(sptlist[0]);
4698
4699 static int
savepointreadpk(Par par,Spt spt)4700 savepointreadpk(Par par, Spt spt)
4701 {
4702 LL3("savepointreadpk");
4703 Con& con = par.con();
4704 const Tab& tab = par.tab();
4705 Set& set = par.set();
4706 const Set& set1 = set;
4707 Set set2(tab, set.m_rows);
4708 uint n = 0;
4709 for (uint i = 0; i < set.m_rows; i++) {
4710 set.lock();
4711 if (!set.compat(par, i, Row::OpREAD)) {
4712 LL4("savepointreadpk SKIP " << i << " " << set.getrow(i));
4713 set.unlock();
4714 continue;
4715 }
4716 set.unlock();
4717 CHK(set2.selrow(par, *set1.m_row[i]) == 0);
4718 uint err = par.m_catcherr | Con::ErrDeadlock;
4719 ExecType et = NoCommit;
4720 CHK(con.execute(et, err) == 0);
4721 if (err) {
4722 if (err & Con::ErrDeadlock) {
4723 CHK(spt.m_res == Spt::Deadlock);
4724 // all rows have same behaviour
4725 CHK(n == 0);
4726 }
4727 LL1("savepointreadpk stop on " << con.errname(err));
4728 break;
4729 }
4730 uint i2 = (uint)-1;
4731 CHK(set2.getkey(par, &i2) == 0 && i == i2);
4732 CHK(set2.putval(i, false) == 0);
4733 LL4("row " << set2.count() << " " << set2.getrow(i));
4734 n++;
4735 }
4736 bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4737 if (spt.m_res != Spt::Deadlock)
4738 CHK(set1.verify(par, set2, false, dirty) == 0);
4739 return 0;
4740 }
4741
4742 static int
savepointreadhashindex(Par par,Spt spt)4743 savepointreadhashindex(Par par, Spt spt)
4744 {
4745 if (spt.m_lm == NdbOperation::LM_CommittedRead && !spt.m_same) {
4746 LL1("skip hash index dirty read");
4747 return 0;
4748 }
4749 LL3("savepointreadhashindex");
4750 Con& con = par.con();
4751 const Tab& tab = par.tab();
4752 const ITab& itab = par.itab();
4753 Set& set = par.set();
4754 const Set& set1 = set;
4755 Set set2(tab, set.m_rows);
4756 uint n = 0;
4757 for (uint i = 0; i < set.m_rows; i++) {
4758 set.lock();
4759 if (!set.compat(par, i, Row::OpREAD)) {
4760 LL3("savepointreadhashindex SKIP " << i << " " << set.getrow(i));
4761 set.unlock();
4762 continue;
4763 }
4764 set.unlock();
4765 CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
4766 uint err = par.m_catcherr | Con::ErrDeadlock;
4767 ExecType et = NoCommit;
4768 CHK(con.execute(et, err) == 0);
4769 if (err) {
4770 if (err & Con::ErrDeadlock) {
4771 CHK(spt.m_res == Spt::Deadlock);
4772 // all rows have same behaviour
4773 CHK(n == 0);
4774 }
4775 LL1("savepointreadhashindex stop on " << con.errname(err));
4776 break;
4777 }
4778 uint i2 = (uint)-1;
4779 CHK(set2.getkey(par, &i2) == 0 && i == i2);
4780 CHK(set2.putval(i, false) == 0);
4781 LL4("row " << set2.count() << " " << *set2.m_row[i]);
4782 n++;
4783 }
4784 bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4785 if (spt.m_res != Spt::Deadlock)
4786 CHK(set1.verify(par, set2, false, dirty) == 0);
4787 return 0;
4788 }
4789
4790 static int
savepointscantable(Par par,Spt spt)4791 savepointscantable(Par par, Spt spt)
4792 {
4793 LL3("savepointscantable");
4794 Con& con = par.con();
4795 const Tab& tab = par.tab();
4796 const Set& set = par.set();
4797 const Set& set1 = set; // not modifying current set
4798 Set set2(tab, set.m_rows); // scan result
4799 CHK(con.getNdbScanOperation(tab) == 0);
4800 CHK(con.readTuples(par) == 0);
4801 set2.getval(par); // getValue all columns
4802 CHK(con.executeScan() == 0);
4803 bool deadlock = false;
4804 uint n = 0;
4805 while (1) {
4806 int ret;
4807 uint err = par.m_catcherr | Con::ErrDeadlock;
4808 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4809 if (ret == 1)
4810 break;
4811 if (err) {
4812 if (err & Con::ErrDeadlock) {
4813 CHK(spt.m_res == Spt::Deadlock);
4814 // all rows have same behaviour
4815 CHK(n == 0);
4816 deadlock = true;
4817 }
4818 LL1("savepointscantable stop on " << con.errname(err));
4819 break;
4820 }
4821 CHK(spt.m_res != Spt::Deadlock);
4822 uint i = (uint)-1;
4823 CHK(set2.getkey(par, &i) == 0);
4824 CHK(set2.putval(i, false, n) == 0);
4825 LL4("row " << n << " key " << i << " " << set2.getrow(i));
4826 n++;
4827 }
4828 if (set1.m_rows > 0) {
4829 if (!deadlock)
4830 CHK(spt.m_res != Spt::Deadlock);
4831 else
4832 CHK(spt.m_res == Spt::Deadlock);
4833 }
4834 LL2("savepointscantable " << n << " rows");
4835 bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4836 if (spt.m_res != Spt::Deadlock)
4837 CHK(set1.verify(par, set2, false, dirty) == 0);
4838 return 0;
4839 }
4840
4841 static int
savepointscanindex(Par par,Spt spt)4842 savepointscanindex(Par par, Spt spt)
4843 {
4844 LL3("savepointscanindex");
4845 Con& con = par.con();
4846 const Tab& tab = par.tab();
4847 const ITab& itab = par.itab();
4848 const Set& set = par.set();
4849 const Set& set1 = set;
4850 Set set2(tab, set.m_rows);
4851 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4852 CHK(con.readIndexTuples(par) == 0);
4853 set2.getval(par);
4854 CHK(con.executeScan() == 0);
4855 bool deadlock = false;
4856 uint n = 0;
4857 while (1) {
4858 int ret;
4859 uint err = par.m_catcherr | Con::ErrDeadlock;
4860 CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4861 if (ret == 1)
4862 break;
4863 if (err) {
4864 if (err & Con::ErrDeadlock) {
4865 CHK(spt.m_res == Spt::Deadlock);
4866 // all rows have same behaviour
4867 CHK(n == 0);
4868 deadlock = true;
4869 }
4870 LL1("savepointscanindex stop on " << con.errname(err));
4871 break;
4872 }
4873 CHK(spt.m_res != Spt::Deadlock);
4874 uint i = (uint)-1;
4875 CHK(set2.getkey(par, &i) == 0);
4876 CHK(set2.putval(i, par.m_dups, n) == 0);
4877 LL4("row " << n << " key " << i << " " << set2.getrow(i));
4878 n++;
4879 }
4880 if (set1.m_rows > 0) {
4881 if (!deadlock)
4882 CHK(spt.m_res != Spt::Deadlock);
4883 else
4884 CHK(spt.m_res == Spt::Deadlock);
4885 }
4886 LL2("savepointscanindex " << n << " rows");
4887 bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4888 if (spt.m_res != Spt::Deadlock)
4889 CHK(set1.verify(par, set2, false, dirty) == 0);
4890 return 0;
4891 }
4892
4893 typedef int (*SptFun)(Par, Spt);
4894
4895 static int
savepointtest(Par par,Spt spt,SptFun fun)4896 savepointtest(Par par, Spt spt, SptFun fun)
4897 {
4898 Con& con = par.con();
4899 Par par2 = par;
4900 Con con2;
4901 if (!spt.m_same) {
4902 con2.connect(con); // copy ndb reference
4903 par2.m_con = &con2;
4904 CHK(con2.startTransaction() == 0);
4905 }
4906 par2.m_lockmode = spt.m_lm;
4907 CHK((*fun)(par2, spt) == 0);
4908 if (!spt.m_same) {
4909 con2.closeTransaction();
4910 }
4911 return 0;
4912 }
4913
4914 static int
savepointtest(Par par,const char * op)4915 savepointtest(Par par, const char* op)
4916 {
4917 Con& con = par.con();
4918 const Tab& tab = par.tab();
4919 Set& set = par.set();
4920 LL2("savepointtest op=\"" << op << "\"");
4921 CHK(con.startTransaction() == 0);
4922 const char* p = op;
4923 char c;
4924 while ((c = *p++) != 0) {
4925 uint j;
4926 for (j = 0; j < par.m_rows; j++) {
4927 uint i = thrrow(par, j);
4928 if (c == 'c') {
4929 ExecType et = Commit;
4930 CHK(con.execute(et) == 0);
4931 set.lock();
4932 set.post(par, et);
4933 set.unlock();
4934 CHK(con.startTransaction() == 0);
4935 } else {
4936 set.lock();
4937 set.push(i);
4938 if (c == 'i') {
4939 set.calc(par, i);
4940 CHK(set.insrow(par, i) == 0);
4941 } else if (c == 'u') {
4942 set.copyval(i, tab.m_pkmask);
4943 set.calc(par, i, ~tab.m_pkmask);
4944 CHK(set.updrow(par, i) == 0);
4945 } else if (c == 'd') {
4946 set.copyval(i, tab.m_pkmask);
4947 CHK(set.delrow(par, i) == 0);
4948 } else {
4949 assert(false);
4950 }
4951 set.unlock();
4952 }
4953 }
4954 }
4955 {
4956 ExecType et = NoCommit;
4957 CHK(con.execute(et) == 0);
4958 set.lock();
4959 set.post(par, et);
4960 set.unlock();
4961 }
4962 for (uint k = 0; k < sptcount; k++) {
4963 Spt spt = sptlist[k];
4964 LL2("spt lm=" << spt.m_lm << " same=" << spt.m_same);
4965 CHK(savepointtest(par, spt, &savepointreadpk) == 0);
4966 CHK(savepointtest(par, spt, &savepointscantable) == 0);
4967 for (uint i = 0; i < tab.m_itabs; i++) {
4968 if (tab.m_itab[i] == 0)
4969 continue;
4970 const ITab& itab = *tab.m_itab[i];
4971 par.m_itab = &itab;
4972 if (itab.m_type == ITab::OrderedIndex)
4973 CHK(savepointtest(par, spt, &savepointscanindex) == 0);
4974 else
4975 CHK(savepointtest(par, spt, &savepointreadhashindex) == 0);
4976 par.m_itab = 0;
4977 }
4978 }
4979 {
4980 ExecType et = Rollback;
4981 CHK(con.execute(et) == 0);
4982 set.lock();
4983 set.post(par, et);
4984 set.unlock();
4985 }
4986 con.closeTransaction();
4987 return 0;
4988 }
4989
4990 static int
savepointtest(Par par)4991 savepointtest(Par par)
4992 {
4993 assert(par.m_usedthreads == 1);
4994 const char* oplist[] = {
4995 // each based on previous and "c" not last
4996 "i",
4997 "icu",
4998 "uuuuu",
4999 "d",
5000 "dciuuuuud",
5001 0
5002 };
5003 int i;
5004 for (i = 0; oplist[i] != 0; i++) {
5005 CHK(savepointtest(par, oplist[i]) == 0);
5006 }
5007 return 0;
5008 }
5009
5010 static int
halloweentest(Par par,const ITab & itab)5011 halloweentest(Par par, const ITab& itab)
5012 {
5013 LL2("halloweentest " << itab.m_name);
5014 Con& con = par.con();
5015 const Tab& tab = par.tab();
5016 Set& set = par.set();
5017 CHK(con.startTransaction() == 0);
5018 // insert 1 row
5019 uint i = 0;
5020 set.push(i);
5021 set.calc(par, i);
5022 CHK(set.insrow(par, i) == 0);
5023 CHK(con.execute(NoCommit) == 0);
5024 // scan via index until Set m_rows reached
5025 uint scancount = 0;
5026 bool stop = false;
5027 while (!stop) {
5028 par.m_lockmode = // makes no difference
5029 scancount % 2 == 0 ? NdbOperation::LM_CommittedRead :
5030 NdbOperation::LM_Read;
5031 Set set1(tab, set.m_rows); // expected scan result
5032 Set set2(tab, set.m_rows); // actual scan result
5033 BSet bset(tab, itab);
5034 calcscanbounds(par, itab, bset, set, set1);
5035 CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
5036 CHK(con.readIndexTuples(par) == 0);
5037 CHK(bset.setbnd(par) == 0);
5038 set2.getval(par);
5039 CHK(con.executeScan() == 0);
5040 const uint savepoint = i;
5041 LL3("scancount=" << scancount << " savepoint=" << savepoint);
5042 uint n = 0;
5043 while (1) {
5044 int ret;
5045 CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
5046 if (ret == 1)
5047 break;
5048 uint k = (uint)-1;
5049 CHK(set2.getkey(par, &k) == 0);
5050 CHK(set2.putval(k, false, n) == 0);
5051 LL3("row=" << n << " key=" << k);
5052 CHK(k <= savepoint);
5053 if (++i == set.m_rows) {
5054 stop = true;
5055 break;
5056 }
5057 set.push(i);
5058 set.calc(par, i);
5059 CHK(set.insrow(par, i) == 0);
5060 CHK(con.execute(NoCommit) == 0);
5061 n++;
5062 }
5063 con.closeScan();
5064 LL3("scanrows=" << n);
5065 if (!stop) {
5066 CHK(set1.verify(par, set2, false) == 0);
5067 }
5068 scancount++;
5069 }
5070 CHK(con.execute(Commit) == 0);
5071 set.post(par, Commit);
5072 assert(set.count() == set.m_rows);
5073 CHK(pkdelete(par) == 0);
5074 return 0;
5075 }
5076
5077 static int
halloweentest(Par par)5078 halloweentest(Par par)
5079 {
5080 assert(par.m_usedthreads == 1);
5081 const Tab& tab = par.tab();
5082 for (uint i = 0; i < tab.m_itabs; i++) {
5083 if (tab.m_itab[i] == 0)
5084 continue;
5085 const ITab& itab = *tab.m_itab[i];
5086 if (itab.m_type == ITab::OrderedIndex)
5087 CHK(halloweentest(par, itab) == 0);
5088 }
5089 return 0;
5090 }
5091
5092 // threads
5093
5094 typedef int (*TFunc)(Par par);
5095 enum TMode { ST = 1, MT = 2 };
5096
5097 extern "C" { static void* runthread(void* arg); }
5098
5099 struct Thr {
5100 enum State { Wait, Start, Stop, Exit };
5101 State m_state;
5102 Par m_par;
5103 pthread_t m_id;
5104 NdbThread* m_thread;
5105 NdbMutex* m_mutex;
5106 NdbCondition* m_cond;
5107 TFunc m_func;
5108 int m_ret;
5109 void* m_status;
5110 char m_tmp[20]; // used for debug msg prefix
5111 Thr(Par par, uint n);
5112 ~Thr();
5113 int run();
5114 void start();
5115 void stop();
5116 void exit();
5117 //
lockThr5118 void lock() {
5119 NdbMutex_Lock(m_mutex);
5120 }
unlockThr5121 void unlock() {
5122 NdbMutex_Unlock(m_mutex);
5123 }
waitThr5124 void wait() {
5125 NdbCondition_Wait(m_cond, m_mutex);
5126 }
signalThr5127 void signal() {
5128 NdbCondition_Signal(m_cond);
5129 }
joinThr5130 void join() {
5131 NdbThread_WaitFor(m_thread, &m_status);
5132 m_thread = 0;
5133 }
5134 };
5135
Thr(Par par,uint n)5136 Thr::Thr(Par par, uint n) :
5137 m_state(Wait),
5138 m_par(par),
5139 m_thread(0),
5140 m_mutex(0),
5141 m_cond(0),
5142 m_func(0),
5143 m_ret(0),
5144 m_status(0)
5145 {
5146 m_par.m_no = n;
5147 char buf[10];
5148 sprintf(buf, "thr%03u", par.m_no);
5149 const char* name = strcpy(new char[10], buf);
5150 // mutex
5151 m_mutex = NdbMutex_Create();
5152 m_cond = NdbCondition_Create();
5153 assert(m_mutex != 0 && m_cond != 0);
5154 // run
5155 const uint stacksize = 256 * 1024;
5156 const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
5157 m_thread = NdbThread_Create(runthread, (void**)this, stacksize, name, prio);
5158 }
5159
~Thr()5160 Thr::~Thr()
5161 {
5162 if (m_thread != 0) {
5163 NdbThread_Destroy(&m_thread);
5164 m_thread = 0;
5165 }
5166 if (m_cond != 0) {
5167 NdbCondition_Destroy(m_cond);
5168 m_cond = 0;
5169 }
5170 if (m_mutex != 0) {
5171 NdbMutex_Destroy(m_mutex);
5172 m_mutex = 0;
5173 }
5174 }
5175
5176 static void*
runthread(void * arg)5177 runthread(void* arg)
5178 {
5179 Thr& thr = *(Thr*)arg;
5180 thr.m_id = pthread_self();
5181 if (thr.run() < 0) {
5182 LL1("exit on error");
5183 } else {
5184 LL4("exit ok");
5185 }
5186 return 0;
5187 }
5188
5189 int
run()5190 Thr::run()
5191 {
5192 LL4("run");
5193 Con con;
5194 CHK(con.connect() == 0);
5195 m_par.m_con = &con;
5196 LL4("connected");
5197 while (1) {
5198 lock();
5199 while (m_state != Start && m_state != Exit) {
5200 LL4("wait");
5201 wait();
5202 }
5203 if (m_state == Exit) {
5204 LL4("exit");
5205 unlock();
5206 break;
5207 }
5208 LL4("start");
5209 assert(m_state == Start);
5210 m_ret = (*m_func)(m_par);
5211 m_state = Stop;
5212 LL4("stop");
5213 signal();
5214 unlock();
5215 if (m_ret == -1) {
5216 if (m_par.m_cont)
5217 LL1("continue running due to -cont");
5218 else
5219 return -1;
5220 }
5221 }
5222 con.disconnect();
5223 return 0;
5224 }
5225
5226 void
start()5227 Thr::start()
5228 {
5229 lock();
5230 m_state = Start;
5231 signal();
5232 unlock();
5233 }
5234
5235 void
stop()5236 Thr::stop()
5237 {
5238 lock();
5239 while (m_state != Stop)
5240 wait();
5241 m_state = Wait;
5242 unlock();
5243 }
5244
5245 void
exit()5246 Thr::exit()
5247 {
5248 lock();
5249 m_state = Exit;
5250 signal();
5251 unlock();
5252 }
5253
5254 // test run
5255
5256 static Thr** g_thrlist = 0;
5257
5258 static Thr*
getthr()5259 getthr()
5260 {
5261 if (g_thrlist != 0) {
5262 pthread_t id = pthread_self();
5263 for (uint n = 0; n < g_opt.m_threads; n++) {
5264 if (g_thrlist[n] != 0) {
5265 Thr& thr = *g_thrlist[n];
5266 if (pthread_equal(thr.m_id, id))
5267 return &thr;
5268 }
5269 }
5270 }
5271 return 0;
5272 }
5273
5274 // for debug messages (par.m_no not available)
5275 static const char*
getthrprefix()5276 getthrprefix()
5277 {
5278 Thr* thrp = getthr();
5279 if (thrp != 0) {
5280 Thr& thr = *thrp;
5281 uint n = thr.m_par.m_no;
5282 uint m =
5283 g_opt.m_threads < 10 ? 1 :
5284 g_opt.m_threads < 100 ? 2 : 3;
5285 sprintf(thr.m_tmp, "[%0*u] ", m, n);
5286 return thr.m_tmp;
5287 }
5288 return "";
5289 }
5290
5291 static int
runstep(Par par,const char * fname,TFunc func,uint mode)5292 runstep(Par par, const char* fname, TFunc func, uint mode)
5293 {
5294 LL2("step: " << fname);
5295 const int threads = (mode & ST ? 1 : par.m_usedthreads);
5296 int n;
5297 for (n = 0; n < threads; n++) {
5298 LL4("start " << n);
5299 Thr& thr = *g_thrlist[n];
5300 Par oldpar = thr.m_par;
5301 // update parameters
5302 thr.m_par = par;
5303 thr.m_par.m_no = oldpar.m_no;
5304 thr.m_par.m_con = oldpar.m_con;
5305 thr.m_func = func;
5306 thr.start();
5307 }
5308 uint errs = 0;
5309 for (n = threads - 1; n >= 0; n--) {
5310 LL4("stop " << n);
5311 Thr& thr = *g_thrlist[n];
5312 thr.stop();
5313 if (thr.m_ret != 0)
5314 errs++;
5315 }
5316 CHK(errs == 0);
5317 return 0;
5318 }
5319
// Run step func through runstep(), passing the function's own name as the
// step name; wrapped in CHK like the other checked calls in this file.
#define RUNSTEP(par, func, mode) \
  CHK(runstep(par, #func, func, mode) == 0)

// Stream fragment for log lines: "sloop: <loop>/<case>/<table>/<subloop>".
#define SUBLOOP(par) \
  "sloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
  par.m_tab->m_name << "/" << par.m_slno
5326
5327 static int
tbuild(Par par)5328 tbuild(Par par)
5329 {
5330 RUNSTEP(par, droptable, ST);
5331 RUNSTEP(par, createtable, ST);
5332 RUNSTEP(par, invalidatetable, MT);
5333 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5334 LL1(SUBLOOP(par));
5335 if (par.m_slno % 3 == 0) {
5336 RUNSTEP(par, createindex, ST);
5337 RUNSTEP(par, invalidateindex, MT);
5338 RUNSTEP(par, pkinsert, MT);
5339 RUNSTEP(par, pkupdate, MT);
5340 } else if (par.m_slno % 3 == 1) {
5341 RUNSTEP(par, pkinsert, MT);
5342 RUNSTEP(par, createindex, ST);
5343 RUNSTEP(par, invalidateindex, MT);
5344 RUNSTEP(par, pkupdate, MT);
5345 } else {
5346 RUNSTEP(par, pkinsert, MT);
5347 RUNSTEP(par, pkupdate, MT);
5348 RUNSTEP(par, createindex, ST);
5349 RUNSTEP(par, invalidateindex, MT);
5350 }
5351 RUNSTEP(par, readverifyfull, MT);
5352 // leave last one
5353 if (par.m_slno + 1 < par.m_sloop) {
5354 RUNSTEP(par, pkdelete, MT);
5355 RUNSTEP(par, readverifyfull, MT);
5356 RUNSTEP(par, dropindex, ST);
5357 }
5358 }
5359 return 0;
5360 }
5361
5362 static int
tindexscan(Par par)5363 tindexscan(Par par)
5364 {
5365 RUNSTEP(par, droptable, ST);
5366 RUNSTEP(par, createtable, ST);
5367 RUNSTEP(par, invalidatetable, MT);
5368 RUNSTEP(par, createindex, ST);
5369 RUNSTEP(par, invalidateindex, MT);
5370 RUNSTEP(par, pkinsert, MT);
5371 RUNSTEP(par, readverifyfull, MT);
5372 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5373 LL1(SUBLOOP(par));
5374 RUNSTEP(par, readverifyindex, MT);
5375 }
5376 return 0;
5377 }
5378
5379
5380 static int
tpkops(Par par)5381 tpkops(Par par)
5382 {
5383 RUNSTEP(par, droptable, ST);
5384 RUNSTEP(par, createtable, ST);
5385 RUNSTEP(par, invalidatetable, MT);
5386 RUNSTEP(par, createindex, ST);
5387 RUNSTEP(par, invalidateindex, MT);
5388 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5389 LL1(SUBLOOP(par));
5390 RUNSTEP(par, pkops, MT);
5391 LL2("rows=" << par.set().count());
5392 RUNSTEP(par, readverifyfull, MT);
5393 }
5394 return 0;
5395 }
5396
5397 static int
tpkopsread(Par par)5398 tpkopsread(Par par)
5399 {
5400 RUNSTEP(par, droptable, ST);
5401 RUNSTEP(par, createtable, ST);
5402 RUNSTEP(par, invalidatetable, MT);
5403 RUNSTEP(par, pkinsert, MT);
5404 RUNSTEP(par, createindex, ST);
5405 RUNSTEP(par, invalidateindex, MT);
5406 RUNSTEP(par, readverifyfull, MT);
5407 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5408 LL1(SUBLOOP(par));
5409 RUNSTEP(par, pkupdatescanread, MT);
5410 RUNSTEP(par, readverifyfull, MT);
5411 }
5412 RUNSTEP(par, pkdelete, MT);
5413 RUNSTEP(par, readverifyfull, MT);
5414 return 0;
5415 }
5416
5417 static int
tmixedops(Par par)5418 tmixedops(Par par)
5419 {
5420 RUNSTEP(par, droptable, ST);
5421 RUNSTEP(par, createtable, ST);
5422 RUNSTEP(par, invalidatetable, MT);
5423 RUNSTEP(par, pkinsert, MT);
5424 RUNSTEP(par, createindex, ST);
5425 RUNSTEP(par, invalidateindex, MT);
5426 RUNSTEP(par, readverifyfull, MT);
5427 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5428 LL1(SUBLOOP(par));
5429 RUNSTEP(par, mixedoperations, MT);
5430 RUNSTEP(par, readverifyfull, MT);
5431 }
5432 return 0;
5433 }
5434
5435 static int
tbusybuild(Par par)5436 tbusybuild(Par par)
5437 {
5438 RUNSTEP(par, droptable, ST);
5439 RUNSTEP(par, createtable, ST);
5440 RUNSTEP(par, invalidatetable, MT);
5441 RUNSTEP(par, pkinsert, MT);
5442 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5443 LL1(SUBLOOP(par));
5444 RUNSTEP(par, pkupdateindexbuild, MT);
5445 RUNSTEP(par, invalidateindex, MT);
5446 RUNSTEP(par, readverifyfull, MT);
5447 RUNSTEP(par, dropindex, ST);
5448 }
5449 return 0;
5450 }
5451
5452 static int
trollback(Par par)5453 trollback(Par par)
5454 {
5455 par.m_abortpct = 50;
5456 RUNSTEP(par, droptable, ST);
5457 RUNSTEP(par, createtable, ST);
5458 RUNSTEP(par, invalidatetable, MT);
5459 RUNSTEP(par, pkinsert, MT);
5460 RUNSTEP(par, createindex, ST);
5461 RUNSTEP(par, invalidateindex, MT);
5462 RUNSTEP(par, readverifyfull, MT);
5463 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5464 LL1(SUBLOOP(par));
5465 RUNSTEP(par, mixedoperations, MT);
5466 RUNSTEP(par, readverifyfull, MT);
5467 }
5468 return 0;
5469 }
5470
5471 static int
tparupdate(Par par)5472 tparupdate(Par par)
5473 {
5474 RUNSTEP(par, droptable, ST);
5475 RUNSTEP(par, createtable, ST);
5476 RUNSTEP(par, invalidatetable, MT);
5477 RUNSTEP(par, pkinsert, MT);
5478 RUNSTEP(par, createindex, ST);
5479 RUNSTEP(par, invalidateindex, MT);
5480 RUNSTEP(par, readverifyfull, MT);
5481 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5482 LL1(SUBLOOP(par));
5483 RUNSTEP(par, parallelorderedupdate, MT);
5484 RUNSTEP(par, readverifyfull, MT);
5485 }
5486 return 0;
5487 }
5488
5489 static int
tsavepoint(Par par)5490 tsavepoint(Par par)
5491 {
5492 RUNSTEP(par, droptable, ST);
5493 RUNSTEP(par, createtable, ST);
5494 RUNSTEP(par, invalidatetable, MT);
5495 RUNSTEP(par, createindex, ST);
5496 RUNSTEP(par, invalidateindex, MT);
5497 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5498 LL1(SUBLOOP(par));
5499 RUNSTEP(par, savepointtest, MT);
5500 RUNSTEP(par, readverifyfull, MT);
5501 }
5502 return 0;
5503 }
5504
5505 static int
thalloween(Par par)5506 thalloween(Par par)
5507 {
5508 RUNSTEP(par, droptable, ST);
5509 RUNSTEP(par, createtable, ST);
5510 RUNSTEP(par, invalidatetable, MT);
5511 RUNSTEP(par, createindex, ST);
5512 RUNSTEP(par, invalidateindex, MT);
5513 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5514 LL1(SUBLOOP(par));
5515 RUNSTEP(par, halloweentest, MT);
5516 }
5517 return 0;
5518 }
5519
5520 static int
ttimebuild(Par par)5521 ttimebuild(Par par)
5522 {
5523 Tmr t1;
5524 RUNSTEP(par, droptable, ST);
5525 RUNSTEP(par, createtable, ST);
5526 RUNSTEP(par, invalidatetable, MT);
5527 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5528 LL1(SUBLOOP(par));
5529 RUNSTEP(par, pkinsert, MT);
5530 t1.on();
5531 RUNSTEP(par, createindex, ST);
5532 t1.off(par.m_totrows);
5533 RUNSTEP(par, invalidateindex, MT);
5534 RUNSTEP(par, dropindex, ST);
5535 }
5536 LL1("build index - " << t1.time());
5537 return 0;
5538 }
5539
5540 static int
ttimemaint(Par par)5541 ttimemaint(Par par)
5542 {
5543 Tmr t1, t2;
5544 RUNSTEP(par, droptable, ST);
5545 RUNSTEP(par, createtable, ST);
5546 RUNSTEP(par, invalidatetable, MT);
5547 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5548 LL1(SUBLOOP(par));
5549 RUNSTEP(par, pkinsert, MT);
5550 t1.on();
5551 RUNSTEP(par, pkupdate, MT);
5552 t1.off(par.m_totrows);
5553 RUNSTEP(par, createindex, ST);
5554 RUNSTEP(par, invalidateindex, MT);
5555 t2.on();
5556 RUNSTEP(par, pkupdate, MT);
5557 t2.off(par.m_totrows);
5558 RUNSTEP(par, dropindex, ST);
5559 }
5560 LL1("update - " << t1.time());
5561 LL1("update indexed - " << t2.time());
5562 LL1("overhead - " << t2.over(t1));
5563 return 0;
5564 }
5565
5566 static int
ttimescan(Par par)5567 ttimescan(Par par)
5568 {
5569 if (par.tab().m_itab[0] == 0) {
5570 LL1("ttimescan - no index 0, skipped");
5571 return 0;
5572 }
5573 Tmr t1, t2;
5574 RUNSTEP(par, droptable, ST);
5575 RUNSTEP(par, createtable, ST);
5576 RUNSTEP(par, invalidatetable, MT);
5577 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5578 LL1(SUBLOOP(par));
5579 RUNSTEP(par, pkinsert, MT);
5580 RUNSTEP(par, createindex, ST);
5581 par.m_tmr = &t1;
5582 RUNSTEP(par, timescantable, ST);
5583 par.m_tmr = &t2;
5584 RUNSTEP(par, timescanpkindex, ST);
5585 RUNSTEP(par, dropindex, ST);
5586 }
5587 LL1("full scan table - " << t1.time());
5588 LL1("full scan PK index - " << t2.time());
5589 LL1("overhead - " << t2.over(t1));
5590 return 0;
5591 }
5592
5593 static int
ttimepkread(Par par)5594 ttimepkread(Par par)
5595 {
5596 if (par.tab().m_itab[0] == 0) {
5597 LL1("ttimescan - no index 0, skipped");
5598 return 0;
5599 }
5600 Tmr t1, t2;
5601 RUNSTEP(par, droptable, ST);
5602 RUNSTEP(par, createtable, ST);
5603 RUNSTEP(par, invalidatetable, MT);
5604 for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5605 LL1(SUBLOOP(par));
5606 RUNSTEP(par, pkinsert, MT);
5607 RUNSTEP(par, createindex, ST);
5608 par.m_tmr = &t1;
5609 RUNSTEP(par, timepkreadtable, ST);
5610 par.m_tmr = &t2;
5611 RUNSTEP(par, timepkreadindex, ST);
5612 RUNSTEP(par, dropindex, ST);
5613 }
5614 LL1("pk read table - " << t1.time());
5615 LL1("pk read PK index - " << t2.time());
5616 LL1("overhead - " << t2.over(t1));
5617 return 0;
5618 }
5619
5620 static int
tdrop(Par par)5621 tdrop(Par par)
5622 {
5623 RUNSTEP(par, droptable, ST);
5624 return 0;
5625 }
5626
5627 struct TCase {
5628 const char* m_name;
5629 TFunc m_func;
5630 const char* m_desc;
TCaseTCase5631 TCase(const char* name, TFunc func, const char* desc) :
5632 m_name(name),
5633 m_func(func),
5634 m_desc(desc) {
5635 }
5636 };
5637
5638 static const TCase
5639 tcaselist[] = {
5640 TCase("a", tbuild, "index build"),
5641 TCase("b", tindexscan, "index scans"),
5642 TCase("c", tpkops, "pk operations"),
5643 TCase("d", tpkopsread, "pk operations and scan reads"),
5644 TCase("e", tmixedops, "pk operations and scan operations"),
5645 TCase("f", tbusybuild, "pk operations and index build"),
5646 TCase("g", trollback, "operations with random rollbacks"),
5647 TCase("h", tparupdate, "parallel ordered update bug#20446"),
5648 TCase("i", tsavepoint, "savepoint test locking bug#31477"),
5649 TCase("j", thalloween, "savepoint test halloween problem"),
5650 TCase("t", ttimebuild, "time index build"),
5651 TCase("u", ttimemaint, "time index maintenance"),
5652 TCase("v", ttimescan, "time full scan table vs index on pk"),
5653 TCase("w", ttimepkread, "time pk read table vs index on pk"),
5654 TCase("z", tdrop, "drop test tables")
5655 };
5656
5657 static const uint
5658 tcasecount = sizeof(tcaselist) / sizeof(tcaselist[0]);
5659
5660 static void
printcases()5661 printcases()
5662 {
5663 ndbout << "test cases:" << endl;
5664 for (uint i = 0; i < tcasecount; i++) {
5665 const TCase& tcase = tcaselist[i];
5666 ndbout << " " << tcase.m_name << " - " << tcase.m_desc << endl;
5667 }
5668 }
5669
5670 static void
printtables()5671 printtables()
5672 {
5673 Par par(g_opt);
5674 makebuiltintables(par);
5675 ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
5676 for (uint j = 0; j < tabcount; j++) {
5677 if (tablist[j] == 0)
5678 continue;
5679 const Tab& tab = *tablist[j];
5680 const char* tname = tab.m_name;
5681 ndbout << " " << tname;
5682 for (uint i = 0; i < tab.m_itabs; i++) {
5683 if (tab.m_itab[i] == 0)
5684 continue;
5685 const ITab& itab = *tab.m_itab[i];
5686 const char* iname = itab.m_name;
5687 if (strncmp(tname, iname, strlen(tname)) == 0)
5688 iname += strlen(tname);
5689 ndbout << " " << iname;
5690 ndbout << "(";
5691 for (uint k = 0; k < itab.m_icols; k++) {
5692 if (k != 0)
5693 ndbout << ",";
5694 const ICol& icol = *itab.m_icol[k];
5695 const Col& col = icol.m_col;
5696 ndbout << col.m_name;
5697 }
5698 ndbout << ")";
5699 }
5700 ndbout << endl;
5701 }
5702 }
5703
5704 static bool
setcasepar(Par & par)5705 setcasepar(Par& par)
5706 {
5707 Opt d;
5708 const char* c = par.m_currcase;
5709 switch (c[0]) {
5710 case 'i':
5711 {
5712 if (par.m_usedthreads > 1) {
5713 par.m_usedthreads = 1;
5714 LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5715 }
5716 const uint rows = 100;
5717 if (par.m_rows > rows) {
5718 par.m_rows = rows;
5719 LL1("case " << c << " reduce rows to " << rows);
5720 }
5721 }
5722 break;
5723 case 'j':
5724 {
5725 if (par.m_usedthreads > 1) {
5726 par.m_usedthreads = 1;
5727 LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5728 }
5729 }
5730 break;
5731 default:
5732 break;
5733 }
5734 return true;
5735 }
5736
5737 static int
runtest(Par par)5738 runtest(Par par)
5739 {
5740 int totret = 0;
5741 if (par.m_seed == -1) {
5742 // good enough for daily run
5743 ushort seed = (ushort)getpid();
5744 LL0("random seed: " << seed);
5745 srandom((uint)seed);
5746 } else if (par.m_seed != 0) {
5747 LL0("random seed: " << par.m_seed);
5748 srandom(par.m_seed);
5749 } else {
5750 LL0("random seed: loop number");
5751 }
5752 // cs
5753 assert(par.m_csname != 0);
5754 if (strcmp(par.m_csname, "random") != 0) {
5755 CHARSET_INFO* cs;
5756 CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
5757 par.m_cs = cs;
5758 }
5759 // con
5760 Con con;
5761 CHK(con.connect() == 0);
5762 par.m_con = &con;
5763 par.m_catcherr |= Con::ErrNospace;
5764 // threads
5765 g_thrlist = new Thr* [par.m_threads];
5766 uint n;
5767 for (n = 0; n < par.m_threads; n++) {
5768 g_thrlist[n] = 0;
5769 }
5770 for (n = 0; n < par.m_threads; n++) {
5771 g_thrlist[n] = new Thr(par, n);
5772 Thr& thr = *g_thrlist[n];
5773 assert(thr.m_thread != 0);
5774 }
5775 for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
5776 LL1("loop: " << par.m_lno);
5777 if (par.m_seed == 0) {
5778 LL1("random seed: " << par.m_lno);
5779 srandom(par.m_lno);
5780 }
5781 for (uint i = 0; i < tcasecount; i++) {
5782 const TCase& tcase = tcaselist[i];
5783 if ((par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) ||
5784 (par.m_skip != 0 && strchr(par.m_skip, tcase.m_name[0]) != 0)) {
5785 continue;
5786 }
5787 sprintf(par.m_currcase, "%c", tcase.m_name[0]);
5788 par.m_usedthreads = par.m_threads;
5789 if (!setcasepar(par)) {
5790 LL1("case " << tcase.m_name << " cannot run with given options");
5791 continue;
5792 }
5793 par.m_totrows = par.m_usedthreads * par.m_rows;
5794 makebuiltintables(par);
5795 LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
5796 for (uint j = 0; j < tabcount; j++) {
5797 if (tablist[j] == 0)
5798 continue;
5799 const Tab& tab = *tablist[j];
5800 par.m_tab = &tab;
5801 par.m_set = new Set(tab, par.m_totrows);
5802 LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
5803 int ret = tcase.m_func(par);
5804 delete par.m_set;
5805 par.m_set = 0;
5806 if (ret == -1) {
5807 if (!par.m_cont)
5808 return -1;
5809 totret = -1;
5810 LL1("continue to next case due to -cont");
5811 break;
5812 }
5813 }
5814 }
5815 }
5816 for (n = 0; n < par.m_threads; n++) {
5817 Thr& thr = *g_thrlist[n];
5818 thr.exit();
5819 }
5820 for (n = 0; n < par.m_threads; n++) {
5821 Thr& thr = *g_thrlist[n];
5822 thr.join();
5823 delete &thr;
5824 }
5825 delete [] g_thrlist;
5826 g_thrlist = 0;
5827 con.disconnect();
5828 return totret;
5829 }
5830
// program name, echoed by main() ahead of the command line arguments
static const char* g_progname = "testOIBasic";
5832
5833 int
main(int argc,char ** argv)5834 main(int argc, char** argv)
5835 {
5836 ndb_init();
5837 uint i;
5838 ndbout << g_progname;
5839 for (i = 1; i < (uint) argc; i++)
5840 ndbout << " " << argv[i];
5841 ndbout << endl;
5842 ndbout_mutex = NdbMutex_Create();
5843 while (++argv, --argc > 0) {
5844 const char* arg = argv[0];
5845 if (*arg != '-') {
5846 ndbout << "testOIBasic: unknown argument " << arg;
5847 goto usage;
5848 }
5849 if (strcmp(arg, "-batch") == 0) {
5850 if (++argv, --argc > 0) {
5851 g_opt.m_batch = atoi(argv[0]);
5852 continue;
5853 }
5854 }
5855 if (strcmp(arg, "-bound") == 0) {
5856 if (++argv, --argc > 0) {
5857 const char* p = argv[0];
5858 if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) {
5859 g_opt.m_bound = strdup(p);
5860 continue;
5861 }
5862 }
5863 }
5864 if (strcmp(arg, "-case") == 0) {
5865 if (++argv, --argc > 0) {
5866 g_opt.m_case = strdup(argv[0]);
5867 continue;
5868 }
5869 }
5870 if (strcmp(arg, "-collsp") == 0) {
5871 g_opt.m_collsp = true;
5872 continue;
5873 }
5874 if (strcmp(arg, "-cont") == 0) {
5875 g_opt.m_cont = true;
5876 continue;
5877 }
5878 if (strcmp(arg, "-core") == 0) {
5879 g_opt.m_core = true;
5880 continue;
5881 }
5882 if (strcmp(arg, "-csname") == 0) {
5883 if (++argv, --argc > 0) {
5884 g_opt.m_csname = strdup(argv[0]);
5885 continue;
5886 }
5887 }
5888 if (strcmp(arg, "-die") == 0) {
5889 if (++argv, --argc > 0) {
5890 g_opt.m_die = atoi(argv[0]);
5891 continue;
5892 }
5893 }
5894 if (strcmp(arg, "-dups") == 0) {
5895 g_opt.m_dups = true;
5896 continue;
5897 }
5898 if (strcmp(arg, "-fragtype") == 0) {
5899 if (++argv, --argc > 0) {
5900 if (strcmp(argv[0], "single") == 0) {
5901 g_opt.m_fragtype = NdbDictionary::Object::FragSingle;
5902 continue;
5903 }
5904 if (strcmp(argv[0], "small") == 0) {
5905 g_opt.m_fragtype = NdbDictionary::Object::FragAllSmall;
5906 continue;
5907 }
5908 if (strcmp(argv[0], "medium") == 0) {
5909 g_opt.m_fragtype = NdbDictionary::Object::FragAllMedium;
5910 continue;
5911 }
5912 if (strcmp(argv[0], "large") == 0) {
5913 g_opt.m_fragtype = NdbDictionary::Object::FragAllLarge;
5914 continue;
5915 }
5916 }
5917 }
5918 if (strcmp(arg, "-index") == 0) {
5919 if (++argv, --argc > 0) {
5920 g_opt.m_index = strdup(argv[0]);
5921 continue;
5922 }
5923 }
5924 if (strcmp(arg, "-loop") == 0) {
5925 if (++argv, --argc > 0) {
5926 g_opt.m_loop = atoi(argv[0]);
5927 continue;
5928 }
5929 }
5930 if (strcmp(arg, "-mrrmaxrng") == 0) {
5931 if (++argv, --argc > 0) {
5932 g_opt.m_mrrmaxrng = atoi(argv[0]);
5933 continue;
5934 }
5935 }
5936 if (strcmp(arg, "-nologging") == 0) {
5937 g_opt.m_nologging = true;
5938 continue;
5939 }
5940 if (strcmp(arg, "-noverify") == 0) {
5941 g_opt.m_noverify = true;
5942 continue;
5943 }
5944 if (strcmp(arg, "-pctmrr") == 0) {
5945 if (++argv, --argc > 0) {
5946 g_opt.m_pctmrr = atoi(argv[0]);
5947 continue;
5948 }
5949 }
5950 if (strcmp(arg, "-pctnull") == 0) {
5951 if (++argv, --argc > 0) {
5952 g_opt.m_pctnull = atoi(argv[0]);
5953 continue;
5954 }
5955 }
5956 if (strcmp(arg, "-rows") == 0) {
5957 if (++argv, --argc > 0) {
5958 g_opt.m_rows = atoi(argv[0]);
5959 continue;
5960 }
5961 }
5962 if (strcmp(arg, "-samples") == 0) {
5963 if (++argv, --argc > 0) {
5964 g_opt.m_samples = atoi(argv[0]);
5965 continue;
5966 }
5967 }
5968 if (strcmp(arg, "-scanbatch") == 0) {
5969 if (++argv, --argc > 0) {
5970 g_opt.m_scanbatch = atoi(argv[0]);
5971 continue;
5972 }
5973 }
5974 if (strcmp(arg, "-scanpar") == 0) {
5975 if (++argv, --argc > 0) {
5976 g_opt.m_scanpar = atoi(argv[0]);
5977 continue;
5978 }
5979 }
5980 if (strcmp(arg, "-seed") == 0) {
5981 if (++argv, --argc > 0) {
5982 g_opt.m_seed = atoi(argv[0]);
5983 continue;
5984 }
5985 }
5986 if (strcmp(arg, "-skip") == 0) {
5987 if (++argv, --argc > 0) {
5988 g_opt.m_skip = strdup(argv[0]);
5989 continue;
5990 }
5991 }
5992 if (strcmp(arg, "-sloop") == 0) {
5993 if (++argv, --argc > 0) {
5994 g_opt.m_sloop = atoi(argv[0]);
5995 continue;
5996 }
5997 }
5998 if (strcmp(arg, "-ssloop") == 0) {
5999 if (++argv, --argc > 0) {
6000 g_opt.m_ssloop = atoi(argv[0]);
6001 continue;
6002 }
6003 }
6004 if (strcmp(arg, "-table") == 0) {
6005 if (++argv, --argc > 0) {
6006 g_opt.m_table = strdup(argv[0]);
6007 continue;
6008 }
6009 }
6010 if (strcmp(arg, "-threads") == 0) {
6011 if (++argv, --argc > 0) {
6012 g_opt.m_threads = atoi(argv[0]);
6013 if (1 <= g_opt.m_threads)
6014 continue;
6015 }
6016 }
6017 if (strcmp(arg, "-v") == 0) {
6018 if (++argv, --argc > 0) {
6019 g_opt.m_v = atoi(argv[0]);
6020 continue;
6021 }
6022 }
6023 if (strncmp(arg, "-v", 2) == 0 && isdigit(arg[2])) {
6024 g_opt.m_v = atoi(&arg[2]);
6025 continue;
6026 }
6027 if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
6028 printhelp();
6029 goto wrongargs;
6030 }
6031 ndbout << "testOIBasic: bad or unknown option " << arg;
6032 goto usage;
6033 }
6034 {
6035 Par par(g_opt);
6036 g_ncc = new Ndb_cluster_connection();
6037 if (g_ncc->connect(30) != 0 || runtest(par) < 0)
6038 goto failed;
6039 delete g_ncc;
6040 g_ncc = 0;
6041 }
6042 // ok
6043 return NDBT_ProgramExit(NDBT_OK);
6044 failed:
6045 return NDBT_ProgramExit(NDBT_FAILED);
6046 usage:
6047 ndbout << " (use -h for help)" << endl;
6048 wrongargs:
6049 return NDBT_ProgramExit(NDBT_WRONGARGS);
6050 }
6051
6052 // vim: set sw=2 et:
6053