1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /*
26  * testBlobs
27  */
28 
29 #include <ndb_global.h>
30 #include <NdbMain.h>
31 #include <NdbOut.hpp>
32 #include <OutputStream.hpp>
33 #include <NdbTest.hpp>
34 #include <NdbTick.h>
35 #include <my_sys.h>
36 #include <NdbRestarter.hpp>
37 
38 #include <ndb_rand.h>
39 
40 struct Chr {
41   NdbDictionary::Column::Type m_type;
42   bool m_fixed;
43   bool m_binary;
44   uint m_len; // native
45   uint m_bytelen; // in bytes
46   uint m_totlen; // plus length bytes
47   const char* m_cs;
48   CHARSET_INFO* m_csinfo;
49   uint m_mblen;
50   bool m_caseins; // for latin letters
ChrChr51   Chr() :
52     m_type(NdbDictionary::Column::Varchar),
53     m_fixed(false),
54     m_binary(false),
55     m_len(55),
56     m_bytelen(0),
57     m_totlen(0),
58     m_cs("latin1"),
59     m_csinfo(0),
60     m_caseins(true)
61   {}
62 };
63 
64 struct Opt {
65   unsigned m_batch;
66   bool m_core;
67   bool m_dbg;
68   const char* m_debug;
69   bool m_fac;
70   bool m_full;
71   unsigned m_loop;
72   bool m_min;
73   unsigned m_parts;
74   unsigned m_rows;
75   int m_seed;
76   const char* m_skip;
77   const char* m_test;
78   int m_timeout_retries;
79   int m_blob_version;
80   // metadata
81   const char* m_tname;
82   const char* m_x1name;  // hash index
83   const char* m_x2name;  // ordered index
84   unsigned m_pk1off;
85   Chr m_pk2chr;
86   bool m_pk2part;
87   bool m_oneblob;
88 
89   int m_rbatch;
90   int m_wbatch;
91   // perf
92   const char* m_tnameperf;
93   unsigned m_rowsperf;
94   // bugs
95   int m_bug;
96   int (*m_bugtest)();
OptOpt97   Opt() :
98     m_batch(7),
99     m_core(false),
100     m_dbg(false),
101     m_debug(0),
102     m_fac(false),
103     m_full(false),
104     m_loop(1),
105     m_min(false),
106     m_parts(10),
107     m_rows(100),
108     m_seed(-1),
109     m_skip(0),
110     m_test(0),
111     m_timeout_retries(10),
112     m_blob_version(2),
113     // metadata
114     m_tname("TB1"),
115     m_x1name("TB1X1"),
116     m_x2name("TB1X2"),
117     m_pk1off(0x12340000),
118     m_pk2chr(),
119     m_pk2part(false),
120     m_oneblob(false),
121     m_rbatch(-1),
122     m_wbatch(-1),
123     // perf
124     m_tnameperf("TB2"),
125     m_rowsperf(10000),
126     // bugs
127     m_bug(0),
128     m_bugtest(0)
129   {}
130 };
131 
132 static void
printusage()133 printusage()
134 {
135   Opt d;
136   ndbout
137     << "usage: testBlobs options [default/max]" << endl
138     << "  -batch N    number of pk ops in batch [" << d.m_batch << "]" << endl
139     << "  -core       dump core on error" << endl
140     << "  -dbg        print program debug" << endl
141     << "  -debug opt  also ndb api DBUG (if no ':' becomes d:t:F:L:o,opt)" << endl
142     << "  -fac        fetch across commit in scan delete" << endl
143     << "  -full       read/write only full blob values" << endl
144     << "  -loop N     loop N times 0=forever [" << d.m_loop << "]" << endl
145     << "  -min        small blob sizes" << endl
146     << "  -parts N    max parts in blob value [" << d.m_parts << "]" << endl
147     << "  -rows N     number of rows [" << d.m_rows << "]" << endl
148     << "  -rowsperf N rows for performace test [" << d.m_rowsperf << "]" << endl
149     << "  -seed N     random seed 0=loop number -1=random [" << d.m_seed << "]" << endl
150     << "  -skip xxx   skip given tests (see list) [no tests]" << endl
151     << "  -test xxx   only given tests (see list) [all tests]" << endl
152     << "  -timeoutretries N Number of times to retry in deadlock situations ["
153     << d.m_timeout_retries << "]" << endl
154     << "  -version N  blob version 1 or 2 [" << d.m_blob_version << "]" << endl
155     << "metadata" << endl
156     << "  -pk2len N   native length of PK2, zero omits PK2,PK3 [" << d.m_pk2chr.m_len << "]" << endl
157     << "  -pk2fixed   PK2 is Char [default Varchar]" << endl
158     << "  -pk2binary  PK2 is Binary or Varbinary" << endl
159     << "  -pk2cs      PK2 charset or collation [" << d.m_pk2chr.m_cs << "]" << endl
160     << "  -pk2part    partition primary table by PK2" << endl
161     << "  -oneblob    only 1 blob attribute [default 2]" << endl
162     << "  -rbatch     N Read parts batchsize (bytes) [default -1] -1=random" << endl
163     << "  -wbatch     N Write parts batchsize (bytes) [default -1] -1=random" << endl
164     << "disk or memory storage for blobs.  Don't apply to performance test" << endl
165     << "  m           Blob columns stored in memory" << endl
166     << "  h           Blob columns stored on disk" << endl
167     << "api styles for test/skip.  Don't apply to performance test" << endl
168     << "  a           NdbRecAttr(old) interface" << endl
169     << "  b           NdbRecord interface" << endl
170     << "test cases for test/skip" << endl
171     << "  k           primary key ops" << endl
172     << "  i           hash index ops" << endl
173     << "  s           table scans" << endl
174     << "  r           ordered index scans" << endl
175     << "  p           performance test" << endl
176     << "operations for test/skip" << endl
177     << "  u           update existing blob value" << endl
178     << "  n           normal insert and update" << endl
179     << "  w           insert and update using writeTuple" << endl
180     << "  d           delete, can skip only for one subtest" << endl
181     << "  l           read with lock and unlock" << endl
182     << "blob operation styles for test/skip" << endl
183     << "  0           getValue / setValue" << endl
184     << "  1           setActiveHook" << endl
185     << "  2           readData / writeData" << endl
186     << "example: -test makn0 (need all 4 parts)" << endl
187     << "example: -test mhabkisrunwd012 (Everything except performance tests" << endl
188     << "bug tests" << endl
189     << "  -bug 4088   ndb api hang with mixed ops on index table" << endl
190     << "  -bug 27018  middle partial part write clobbers rest of part" << endl
191     << "  -bug 27370  Potential inconsistent blob reads for ReadCommitted reads" << endl
192     << "  -bug 36756  Handling execute(.., abortOption) and Blobs " << endl
193     << "  -bug 45768  execute(Commit) after failing blob batch " << endl
194     << "  -bug 62321  Blob obscures ignored error codes in batch" << endl
195     ;
196 }
197 
198 static Opt g_opt;
199 
200 static bool
testcase(char x)201 testcase(char x)
202 {
203   if (x < 10)
204     x += '0';
205 
206   return
207     (g_opt.m_test == 0 || strchr(g_opt.m_test, x) != 0) &&
208     (g_opt.m_skip == 0 || strchr(g_opt.m_skip, x) == 0);
209 }
210 
211 static Ndb_cluster_connection* g_ncc = 0;
212 static Ndb* g_ndb = 0;
213 static NdbDictionary::Dictionary* g_dic = 0;
214 static NdbConnection* g_con = 0;
215 static NdbOperation* g_opr = 0;
216 static const NdbOperation* g_const_opr = 0;
217 static NdbIndexOperation* g_opx = 0;
218 static NdbScanOperation* g_ops = 0;
219 static NdbBlob* g_bh1 = 0;
220 static NdbBlob* g_bh2 = 0;
221 static bool g_printerror = true;
222 static unsigned g_loop = 0;
223 static NdbRecord *g_key_record= 0;
224 static NdbRecord *g_blob_record= 0;
225 static NdbRecord *g_full_record= 0;
226 static NdbRecord *g_idx_record= 0;
227 static NdbRecord *g_ord_record= 0;
228 static unsigned g_pk1_offset= 0;
229 static unsigned g_pk2_offset= 0;
230 static unsigned g_pk3_offset= 0;
231 static unsigned g_blob1_offset= 0;
232 static unsigned g_blob1_null_offset= 0;
233 static unsigned g_blob2_offset= 0;
234 static unsigned g_blob2_null_offset= 0;
235 static unsigned g_rowsize= 0;
236 static const char* g_tsName= "DEFAULT-TS";
237 static Uint32 g_batchSize= 0;
238 static Uint32 g_scanFlags= 0;
239 static Uint32 g_parallel= 0;
240 static Uint32 g_usingDisk= false;
241 static const Uint32 MAX_FRAGS=48 * 8 * 4; // e.g. 48 nodes, 8 frags/node, 4 replicas
242 static Uint32 frag_ng_mappings[MAX_FRAGS];
243 
244 
245 static const char* stylename[3] = {
246   "style=getValue/setValue",
247   "style=setActiveHook",
248   "style=readData/writeData"
249 };
250 
251 // Blob API variants
252 static const char* apiName[2] = {
253   "api=NdbRecAttr",
254   "api=NdbRecord"
255 };
256 
257 static const char apiSymbol[2] = {
258   'a',  // RecAttr
259   'b'   // NdbRecord
260 };
261 
262 static const int API_RECATTR=0;
263 static const int API_NDBRECORD=1;
264 
265 static const char* storageName[2] = {
266   "storage=memory",
267   "storage=disk"
268 };
269 
270 static const char storageSymbol[2] = {
271   'm',  // Memory storage
272   'h'   // Disk storage
273 };
274 
275 static const int STORAGE_MEM=0;
276 static const int STORAGE_DISK=1;
277 
278 static void
printerror(int line,const char * msg)279 printerror(int line, const char* msg)
280 {
281   ndbout << "line " << line << " FAIL " << msg << endl;
282   if (! g_printerror) {
283     return;
284   }
285   if (g_ndb != 0 && g_ndb->getNdbError().code != 0) {
286     ndbout << "ndb: " << g_ndb->getNdbError() << endl;
287   }
288   if (g_dic != 0 && g_dic->getNdbError().code != 0) {
289     ndbout << "dic: " << g_dic->getNdbError() << endl;
290   }
291   if (g_con != 0 && g_con->getNdbError().code != 0) {
292     ndbout << "con: " << g_con->getNdbError() << endl;
293     if (g_opr != 0 && g_opr->getNdbError().code != 0) {
294       ndbout << "opr: table=" << g_opr->getTableName() << " " << g_opr->getNdbError() << endl;
295     }
296     if (g_const_opr != 0 && g_const_opr->getNdbError().code !=0) {
297       ndbout << "const_opr: table=" << g_const_opr->getTableName() << " " << g_const_opr->getNdbError() << endl;
298     }
299     if (g_opx != 0 && g_opx->getNdbError().code != 0) {
300       ndbout << "opx: table=" << g_opx->getTableName() << " " << g_opx->getNdbError() << endl;
301     }
302     if (g_ops != 0 && g_ops->getNdbError().code != 0) {
303       ndbout << "ops: table=" << g_ops->getTableName() << " " << g_ops->getNdbError() << endl;
304     }
305     NdbOperation* ope = g_con->getNdbErrorOperation();
306     if (ope != 0 && ope->getNdbError().code != 0) {
307       if (ope != g_opr && ope != g_const_opr && ope != g_opx && ope != g_ops)
308         ndbout << "ope: ptr=" << ope << " table=" << ope->getTableName() << " type= "<< ope->getType() << " " << ope->getNdbError() << endl;
309     }
310   }
311   if (g_bh1 != 0 && g_bh1->getNdbError().code != 0) {
312     ndbout << "bh1: " << g_bh1->getNdbError() << endl;
313   }
314   if (g_bh2 != 0 && g_bh2->getNdbError().code != 0) {
315     ndbout << "bh2: " << g_bh2->getNdbError() << endl;
316   }
317   if (g_opt.m_core) {
318     abort();
319   }
320   g_printerror = false;
321 }
322 
323 #define CHK(x) \
324   do { \
325     if (x) break; \
326     printerror(__LINE__, #x); return -1; \
327   } while (0)
328 #define DBG(x) \
329   do { \
330     if (! g_opt.m_dbg) break; \
331     ndbout << "line " << __LINE__ << " " << x << endl; \
332   } while (0)
333 #define DISP(x) \
334   do { \
335     ndbout << "line " << __LINE__ << " " << x << endl; \
336   } while (0)
337 
338 struct Bcol {
339   int m_type;
340   int m_version;
341   bool m_nullable;
342   uint m_inline;
343   uint m_partsize;
344   uint m_stripe;
345   char m_btname[200];
BcolBcol346   Bcol() { memset(this, 0, sizeof(*this)); }
347 };
348 
349 static Bcol g_blob1;
350 static Bcol g_blob2;
351 
352 enum OpState {Normal, Retrying};
353 
354 static void
initblobs()355 initblobs()
356 {
357   {
358     Bcol& b = g_blob1;
359     b.m_type = NdbDictionary::Column::Text;
360     b.m_version = g_opt.m_blob_version;
361     b.m_nullable = false;
362     b.m_inline = g_opt.m_min ? 8 : 240;
363     b.m_partsize = g_opt.m_min ? 8 : 2000;
364     b.m_stripe = b.m_version == 1 ? 4 : 0;
365   }
366   {
367     Bcol& b = g_blob2;
368     b.m_type = NdbDictionary::Column::Blob;
369     b.m_version = g_opt.m_blob_version;
370     b.m_nullable = true;
371     b.m_inline = g_opt.m_min ? 9 : 99;
372     b.m_partsize = g_opt.m_min ? 5 : 55;
373     b.m_stripe = 3;
374   }
375 }
376 
377 static void
initConstants()378 initConstants()
379 {
380   g_pk1_offset= 0;
381   g_pk2_offset= g_pk1_offset + 4;
382   g_pk3_offset= g_pk2_offset + g_opt.m_pk2chr.m_totlen;
383   g_blob1_offset= g_pk3_offset + 2;
384   g_blob2_offset= g_blob1_offset + sizeof(NdbBlob *);
385   g_blob1_null_offset= g_blob2_offset + sizeof(NdbBlob *);
386   g_blob2_null_offset= g_blob1_null_offset + 1;
387   g_rowsize= g_blob2_null_offset + 1;
388 }
389 
390 static int
createDefaultTableSpace()391 createDefaultTableSpace()
392 {
393   /* 'Inspired' by NDBT_Tables::create_default_tablespace */
394   int res;
395   NdbDictionary::LogfileGroup lg = g_dic->getLogfileGroup("DEFAULT-LG");
396   if (strcmp(lg.getName(), "DEFAULT-LG") != 0)
397   {
398     lg.setName("DEFAULT-LG");
399     lg.setUndoBufferSize(8*1024*1024);
400     res = g_dic->createLogfileGroup(lg);
401     if(res != 0){
402       DBG("Failed to create logfilegroup:"
403           << endl << g_dic->getNdbError() << endl);
404       return -1;
405     }
406   }
407   {
408     NdbDictionary::Undofile uf = g_dic->getUndofile(0, "undofile01.dat");
409     if (strcmp(uf.getPath(), "undofile01.dat") != 0)
410     {
411       uf.setPath("undofile01.dat");
412       uf.setSize(32*1024*1024);
413       uf.setLogfileGroup("DEFAULT-LG");
414 
415       res = g_dic->createUndofile(uf, true);
416       if(res != 0){
417 	DBG("Failed to create undofile:"
418             << endl << g_dic->getNdbError() << endl);
419 	return -1;
420       }
421     }
422   }
423   {
424     NdbDictionary::Undofile uf = g_dic->getUndofile(0, "undofile02.dat");
425     if (strcmp(uf.getPath(), "undofile02.dat") != 0)
426     {
427       uf.setPath("undofile02.dat");
428       uf.setSize(32*1024*1024);
429       uf.setLogfileGroup("DEFAULT-LG");
430 
431       res = g_dic->createUndofile(uf, true);
432       if(res != 0){
433 	DBG("Failed to create undofile:"
434             << endl << g_dic->getNdbError() << endl);
435 	return -1;
436       }
437     }
438   }
439   NdbDictionary::Tablespace ts = g_dic->getTablespace(g_tsName);
440   if (strcmp(ts.getName(), g_tsName) != 0)
441   {
442     ts.setName(g_tsName);
443     ts.setExtentSize(1024*1024);
444     ts.setDefaultLogfileGroup("DEFAULT-LG");
445 
446     res = g_dic->createTablespace(ts);
447     if(res != 0){
448       DBG("Failed to create tablespace:"
449           << endl << g_dic->getNdbError() << endl);
450       return -1;
451     }
452   }
453 
454   {
455     NdbDictionary::Datafile df = g_dic->getDatafile(0, "datafile01.dat");
456     if (strcmp(df.getPath(), "datafile01.dat") != 0)
457     {
458       df.setPath("datafile01.dat");
459       df.setSize(64*1024*1024);
460       df.setTablespace(g_tsName);
461 
462       res = g_dic->createDatafile(df, true);
463       if(res != 0){
464 	DBG("Failed to create datafile:"
465             << endl << g_dic->getNdbError() << endl);
466 	return -1;
467       }
468     }
469   }
470 
471   {
472     NdbDictionary::Datafile df = g_dic->getDatafile(0, "datafile02.dat");
473     if (strcmp(df.getPath(), "datafile02.dat") != 0)
474     {
475       df.setPath("datafile02.dat");
476       df.setSize(64*1024*1024);
477       df.setTablespace(g_tsName);
478 
479       res = g_dic->createDatafile(df, true);
480       if(res != 0){
481 	DBG("Failed to create datafile:"
482             << endl << g_dic->getNdbError() << endl);
483 	return -1;
484       }
485     }
486   }
487 
488   return 0;
489 }
490 
491 static int
dropTable()492 dropTable()
493 {
494   NdbDictionary::Table tab(g_opt.m_tname);
495   if (g_dic->getTable(g_opt.m_tname) != 0)
496     CHK(g_dic->dropTable(g_opt.m_tname) == 0);
497 
498   if (g_key_record != NULL)
499     g_dic->releaseRecord(g_key_record);
500   if (g_blob_record != NULL)
501     g_dic->releaseRecord(g_blob_record);
502   if (g_full_record != NULL)
503     g_dic->releaseRecord(g_full_record);
504 
505   if (g_opt.m_pk2chr.m_len != 0)
506   {
507     if (g_idx_record != NULL)
508       g_dic->releaseRecord(g_idx_record);
509     if (g_ord_record != NULL)
510       g_dic->releaseRecord(g_ord_record);
511   }
512 
513   g_key_record= NULL;
514   g_blob_record= NULL;
515   g_full_record= NULL;
516   g_idx_record= NULL;
517   g_ord_record= NULL;
518 
519   return 0;
520 }
521 
522 static unsigned
urandom(unsigned n)523 urandom(unsigned n)
524 {
525   return n == 0 ? 0 : ndb_rand() % n;
526 }
527 
528 static int
createTable(int storageType)529 createTable(int storageType)
530 {
531   /* No logging for memory tables */
532   bool loggingRequired=(storageType == STORAGE_DISK);
533   NdbDictionary::Column::StorageType blobStorageType=
534     (storageType == STORAGE_MEM)?
535     NdbDictionary::Column::StorageTypeMemory :
536     NdbDictionary::Column::StorageTypeDisk;
537 
538   NdbDictionary::Table tab(g_opt.m_tname);
539   if (storageType == STORAGE_DISK)
540     tab.setTablespaceName(g_tsName);
541   tab.setLogging(loggingRequired);
542 
543   /* Choose from the interesting fragmentation types :
544    * DistrKeyHash, DistrKeyLin, UserDefined, HashMapPartitioned
545    * Others are obsolete fragment-count setting variants
546    * of DistrKeyLin
547    * For UserDefined partitioning, we need to set the partition
548    * id for all PK operations.
549    */
550   Uint32 fragTypeRange= 1 + (NdbDictionary::Object::HashMapPartition -
551                              NdbDictionary::Object::DistrKeyHash);
552   Uint32 fragType= NdbDictionary::Object::DistrKeyHash + urandom(fragTypeRange);
553 
554   /* Value 8 is unused currently, map it to something else */
555   if (fragType == 8)
556     fragType= NdbDictionary::Object::UserDefined;
557 
558   tab.setFragmentType((NdbDictionary::Object::FragmentType)fragType);
559 
560   if (fragType == NdbDictionary::Object::UserDefined)
561   {
562     /* Need to set the FragmentCount and fragment to NG mapping
563      * for this partitioning type
564      */
565     const Uint32 numNodes= g_ncc->no_db_nodes();
566     const Uint32 numReplicas= 2; // Assumption
567     const Uint32 guessNumNgs= numNodes/2;
568     const Uint32 numNgs= guessNumNgs?guessNumNgs : 1;
569     const Uint32 numFragsPerNode= 2 + (rand() % 3);
570     const Uint32 numPartitions= numReplicas * numNgs * numFragsPerNode;
571 
572     tab.setFragmentCount(numPartitions);
573     for (Uint32 i=0; i<numPartitions; i++)
574     {
575       frag_ng_mappings[i]= i % numNgs;
576     }
577     tab.setFragmentData(frag_ng_mappings, numPartitions);
578   }
579   const Chr& pk2chr = g_opt.m_pk2chr;
580   // col PK1 - Uint32
581   { NdbDictionary::Column col("PK1");
582     col.setType(NdbDictionary::Column::Unsigned);
583     col.setPrimaryKey(true);
584     tab.addColumn(col);
585   }
586   // col BL1 - Text not-nullable
587   { NdbDictionary::Column col("BL1");
588     const Bcol& b = g_blob1;
589     col.setType((NdbDictionary::Column::Type)b.m_type);
590     col.setBlobVersion(b.m_version);
591     col.setNullable(b.m_nullable);
592     col.setInlineSize(b.m_inline);
593     col.setPartSize(b.m_partsize);
594     col.setStripeSize(b.m_stripe);
595     col.setStorageType(blobStorageType);
596     tab.addColumn(col);
597   }
598   // col PK2 - Char or Varchar
599   if (pk2chr.m_len != 0)
600   { NdbDictionary::Column col("PK2");
601     col.setType(pk2chr.m_type);
602     col.setPrimaryKey(true);
603     col.setLength(pk2chr.m_bytelen);
604     if (pk2chr.m_csinfo != 0)
605       col.setCharset(pk2chr.m_csinfo);
606     if (g_opt.m_pk2part)
607       col.setPartitionKey(true);
608     tab.addColumn(col);
609   }
610   // col BL2 - Blob nullable
611   if (! g_opt.m_oneblob)
612   { NdbDictionary::Column col("BL2");
613     const Bcol& b = g_blob2;
614     col.setType((NdbDictionary::Column::Type)b.m_type);
615     col.setBlobVersion(b.m_version);
616     col.setNullable(b.m_nullable);
617     col.setInlineSize(b.m_inline);
618     col.setPartSize(b.m_partsize);
619     col.setStripeSize(b.m_stripe);
620     col.setStorageType(blobStorageType);
621     tab.addColumn(col);
622   }
623   // col PK3 - puts the Var* key PK2 between PK1 and PK3
624   if (pk2chr.m_len != 0)
625   { NdbDictionary::Column col("PK3");
626     col.setType(NdbDictionary::Column::Smallunsigned);
627     col.setPrimaryKey(true);
628 
629     tab.addColumn(col);
630   }
631   // create table
632   CHK(g_dic->createTable(tab) == 0);
633   // unique hash index on PK2,PK3
634   if (g_opt.m_pk2chr.m_len != 0)
635   { NdbDictionary::Index idx(g_opt.m_x1name);
636     idx.setType(NdbDictionary::Index::UniqueHashIndex);
637     idx.setLogging(loggingRequired);
638     idx.setTable(g_opt.m_tname);
639     idx.addColumnName("PK2");
640     idx.addColumnName("PK3");
641     CHK(g_dic->createIndex(idx) == 0);
642   }
643   // ordered index on PK2
644   if (g_opt.m_pk2chr.m_len != 0)
645   { NdbDictionary::Index idx(g_opt.m_x2name);
646     idx.setType(NdbDictionary::Index::OrderedIndex);
647     idx.setLogging(false);
648     idx.setTable(g_opt.m_tname);
649     idx.addColumnName("PK2");
650     CHK(g_dic->createIndex(idx) == 0);
651   }
652 
653   NdbDictionary::RecordSpecification spec[5];
654   unsigned numpks= g_opt.m_pk2chr.m_len == 0 ? 1 : 3;
655   unsigned numblobs= g_opt.m_oneblob ? 1 : 2;
656 
657   const NdbDictionary::Table *dict_table;
658   CHK((dict_table= g_dic->getTable(g_opt.m_tname)) != 0);
659   memset(spec, 0, sizeof(spec));
660   spec[0].column= dict_table->getColumn("PK1");
661   spec[0].offset= g_pk1_offset;
662   spec[numpks].column= dict_table->getColumn("BL1");
663   spec[numpks].offset= g_blob1_offset;
664   spec[numpks].nullbit_byte_offset= g_blob1_null_offset;
665   spec[numpks].nullbit_bit_in_byte= 0;
666   if (g_opt.m_pk2chr.m_len != 0)
667   {
668     spec[1].column= dict_table->getColumn("PK2");
669     spec[1].offset= g_pk2_offset;
670     spec[2].column= dict_table->getColumn("PK3");
671     spec[2].offset= g_pk3_offset;
672   }
673   if (! g_opt.m_oneblob)
674   {
675     spec[numpks+1].column= dict_table->getColumn("BL2");
676     spec[numpks+1].offset= g_blob2_offset;
677     spec[numpks+1].nullbit_byte_offset= g_blob2_null_offset;
678     spec[numpks+1].nullbit_bit_in_byte= 0;
679   }
680   CHK((g_key_record= g_dic->createRecord(dict_table, &spec[0], numpks,
681                                          sizeof(spec[0]))) != 0);
682   CHK((g_blob_record= g_dic->createRecord(dict_table, &spec[numpks], numblobs,
683                                          sizeof(spec[0]))) != 0);
684   CHK((g_full_record= g_dic->createRecord(dict_table, &spec[0], numpks+numblobs,
685                                          sizeof(spec[0]))) != 0);
686 
687   if (g_opt.m_pk2chr.m_len != 0)
688   {
689     const NdbDictionary::Index *dict_index;
690     CHK((dict_index= g_dic->getIndex(g_opt.m_x1name, g_opt.m_tname)) != 0);
691     CHK((g_idx_record= g_dic->createRecord(dict_index, &spec[1], 2,
692                                            sizeof(spec[0]))) != 0);
693     CHK((dict_index= g_dic->getIndex(g_opt.m_x2name, g_opt.m_tname)) != 0);
694     CHK((g_ord_record= g_dic->createRecord(dict_index, &spec[1], 1,
695                                            sizeof(spec[0]))) != 0);
696   }
697 
698   return 0;
699 }
700 
701 // tuples
702 
703 struct Bval {
704   const Bcol& m_bcol;
705   char* m_val;
706   unsigned m_len;
707   char* m_buf; // read/write buffer
708   unsigned m_buflen;
709   int m_error_code; // for testing expected error code
BvalBval710   Bval(const Bcol& bcol) :
711     m_bcol(bcol),
712     m_val(0),
713     m_len(0),
714     m_buf(0),
715     m_buflen(0),
716     m_error_code(0)
717     {}
~BvalBval718   ~Bval() { delete [] m_val; delete [] m_buf; }
allocBval719   void alloc() {
720     alloc(m_bcol.m_inline + m_bcol.m_partsize * g_opt.m_parts);
721   }
allocBval722   void alloc(unsigned buflen) {
723     m_buflen = buflen;
724     delete [] m_buf;
725     m_buf = new char [m_buflen];
726     trash();
727   }
copyfromBval728   void copyfrom(const Bval& v) {
729     m_len = v.m_len;
730     delete [] m_val;
731     if (v.m_val == 0)
732       m_val = 0;
733     else
734       m_val = (char*)memcpy(new char [m_len], v.m_val, m_len);
735   }
trashBval736   void trash() const {
737     assert(m_buf != 0);
738     memset(m_buf, 'x', m_buflen);
739   }
740 private:
741   Bval(const Bval&);
742   Bval& operator=(const Bval&);
743 };
744 
745 NdbOut&
operator <<(NdbOut & out,const Bval & v)746 operator<<(NdbOut& out, const Bval& v)
747 {
748   if (g_opt.m_min && v.m_val != 0) {
749     out << "[" << v.m_len << "]";
750     for (uint i = 0; i < v.m_len; i++) {
751       const Bcol& b = v.m_bcol;
752       if (i == b.m_inline ||
753           (i > b.m_inline && (i - b.m_inline) % b.m_partsize == 0))
754         out.print("|");
755       out.print("%c", v.m_val[i]);
756     }
757   }
758   return out;
759 }
760 
761 struct Tup {
762   bool m_exists;        // exists in table
763   Uint32 m_pk1;         // in V1 primary keys concatenated like keyinfo
764   char* m_pk2;
765   char* m_pk2eq;        // equivalent (if case independent)
766   Uint16 m_pk3;
767   Bval m_bval1;
768   Bval m_bval2;
769   char *m_key_row;
770   char *m_row;
771   Uint32 m_frag;
TupTup772   Tup() :
773     m_exists(false),
774     m_pk2(new char [g_opt.m_pk2chr.m_totlen + 1]), // nullterm for convenience
775     m_pk2eq(new char [g_opt.m_pk2chr.m_totlen + 1]),
776     m_bval1(g_blob1),
777     m_bval2(g_blob2),
778     m_key_row(new char[g_rowsize]),
779     m_row(new char[g_rowsize]),
780     m_frag(~(Uint32)0)
781     {}
~TupTup782   ~Tup() {
783     delete [] m_pk2;
784     m_pk2 = 0;
785     delete [] m_pk2eq;
786     m_pk2eq = 0;
787     delete [] m_key_row;
788     m_key_row= 0;
789     delete [] m_row;
790     m_row= 0;
791   }
792   // alloc buffers of max size
allocTup793   void alloc() {
794     m_bval1.alloc();
795     m_bval2.alloc();
796   }
copyfromTup797   void copyfrom(const Tup& tup) {
798     assert(m_pk1 == tup.m_pk1);
799     m_bval1.copyfrom(tup.m_bval1);
800     m_bval2.copyfrom(tup.m_bval2);
801   }
802   /*
803    * in V2 return pk2 or pk2eq at random
804    * in V1 mixed cases do not work in general due to key packing
805    * luckily they do work via mysql
806    */
pk2Tup807   char* pk2() {
808     if (g_opt.m_blob_version == 1)
809       return m_pk2;
810     return urandom(2) == 0 ? m_pk2 : m_pk2eq;
811   }
getPartitionIdTup812   Uint32 getPartitionId(Uint32 numParts) const {
813     /* Only for UserDefined tables really */
814     return m_pk1 % numParts; // MySQLD hash(PK1) style partitioning
815   }
816 
817 private:
818   Tup(const Tup&);
819   Tup& operator=(const Tup&);
820 };
821 
822 static Tup* g_tups;
823 
824 static void
setUDpartId(const Tup & tup,NdbOperation * op)825 setUDpartId(const Tup& tup, NdbOperation* op)
826 {
827   const NdbDictionary::Table* tab= op->getTable();
828   if (tab->getFragmentType() == NdbDictionary::Object::UserDefined)
829   {
830     Uint32 partId= tup.getPartitionId(tab->getFragmentCount());
831     DBG("Setting partition id to " << partId << " out of " <<
832         tab->getFragmentCount());
833     op->setPartitionId(partId);
834   }
835 }
836 
837 static void
setUDpartIdNdbRecord(const Tup & tup,const NdbDictionary::Table * tab,NdbOperation::OperationOptions & opts)838 setUDpartIdNdbRecord(const Tup& tup,
839                      const NdbDictionary::Table* tab,
840                      NdbOperation::OperationOptions& opts)
841 {
842   opts.optionsPresent= 0;
843   if (tab->getFragmentType() == NdbDictionary::Object::UserDefined)
844   {
845     opts.optionsPresent= NdbOperation::OperationOptions::OO_PARTITION_ID;
846     opts.partitionId= tup.getPartitionId(tab->getFragmentCount());
847   }
848 }
849 
850 static void
calcBval(const Bcol & b,Bval & v,bool keepsize)851 calcBval(const Bcol& b, Bval& v, bool keepsize)
852 {
853   if (b.m_nullable && urandom(10) == 0) {
854     v.m_len = 0;
855     delete [] v.m_val;
856     v.m_val = 0;
857     v.m_buf = new char [1];
858   } else {
859     if (keepsize && v.m_val != 0)
860       ;
861     else if (urandom(10) == 0)
862       v.m_len = urandom(b.m_inline);
863     else
864       v.m_len = urandom(b.m_inline + g_opt.m_parts * b.m_partsize + 1);
865     delete [] v.m_val;
866     v.m_val = new char [v.m_len + 1];
867     for (unsigned i = 0; i < v.m_len; i++)
868       v.m_val[i] = 'a' + urandom(26);
869     v.m_val[v.m_len] = 0;
870     v.m_buf = new char [v.m_len];
871   }
872   v.m_buflen = v.m_len;
873   v.trash();
874 }
875 
876 static bool
conHasTimeoutError()877 conHasTimeoutError()
878 {
879   Uint32 code= g_con->getNdbError().code;
880   /* Indicate timeout for cases where LQH too slow responding
881    * (As can happen for disk based tuples with batching or
882    *  lots of parts)
883    */
884   // 296 == Application timeout waiting for SCAN_NEXTREQ from API
885   // 297 == Error code in response to SCAN_NEXTREQ for timed-out scan
886   bool isTimeout= ((code == 274) || // General TC connection timeout
887                    (code == 266));  // TC Scan frag timeout
888   if (!isTimeout)
889     ndbout << "Connection error is not timeout, but is "
890            << code << endl;
891 
892   return isTimeout;
893 }
894 
895 static
conError()896 Uint32 conError()
897 {
898   return g_con->getNdbError().code;
899 }
900 
901 static void
calcBval(Tup & tup,bool keepsize)902 calcBval(Tup& tup, bool keepsize)
903 {
904   calcBval(g_blob1, tup.m_bval1, keepsize);
905   if (! g_opt.m_oneblob)
906     calcBval(g_blob2, tup.m_bval2, keepsize);
907 }
908 
909 // dont remember what the keepsize was for..
910 static void
calcTups(bool keys,bool keepsize=false)911 calcTups(bool keys, bool keepsize = false)
912 {
913   for (uint k = 0; k < g_opt.m_rows; k++) {
914     Tup& tup = g_tups[k];
915     if (keys) {
916       tup.m_pk1 = g_opt.m_pk1off + k;
917       {
918         const Chr& c = g_opt.m_pk2chr;
919         char* const p = tup.m_pk2;
920         char* const q = tup.m_pk2eq;
921         uint len = urandom(c.m_len + 1);
922         uint i = 0;
923         if (! c.m_fixed) {
924           *(uchar*)&p[0] = *(uchar*)&q[0] = len;
925           i++;
926         }
927         uint j = 0;
928         while (j < len) {
929           // mixed case for distribution check
930           if (urandom(3) == 0) {
931             uint u = urandom(26);
932             p[i] = 'A' + u;
933             q[i] = c.m_caseins ? 'a' + u : 'A' + u;
934           } else {
935             uint u = urandom(26);
936             p[i] = 'a' + u;
937             q[i] = c.m_caseins ? 'A' + u : 'a' + u;
938           }
939           i++;
940           j++;
941         }
942         while (j < c.m_bytelen) {
943           if (c.m_fixed)
944             p[i] = q[i] = 0x20;
945           else
946             p[i] = q[i] = '#'; // garbage
947           i++;
948           j++;
949         }
950         assert(i == c.m_totlen);
951         p[i] = q[i] = 0; // convenience
952       }
953       tup.m_pk3 = (Uint16)k;
954     }
955     calcBval(tup, keepsize);
956   }
957 }
958 
setBatchSizes()959 static void setBatchSizes()
960 {
961   if (g_opt.m_rbatch != 0)
962   {
963     Uint32 byteSize = (g_opt.m_rbatch == -1) ?
964       urandom(~Uint32(0)) :
965       g_opt.m_rbatch;
966 
967     DBG("Setting read batch size to " << byteSize
968         << " bytes.");
969     g_con->setMaxPendingBlobReadBytes(byteSize);
970   }
971 
972   if (g_opt.m_wbatch != 0)
973   {
974     Uint32 byteSize = (g_opt.m_wbatch == -1) ?
975       urandom(~Uint32(0)) :
976       g_opt.m_wbatch;
977 
978     DBG("Setting write batch size to " << byteSize
979         << " bytes.");
980     g_con->setMaxPendingBlobWriteBytes(byteSize);
981   }
982 }
983 
984 
985 // blob handle ops
986 // const version for NdbRecord defined operations
987 static int
getBlobHandles(const NdbOperation * opr)988 getBlobHandles(const NdbOperation* opr)
989 {
990   CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
991   if (! g_opt.m_oneblob)
992     CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
993 
994   setBatchSizes();
995   return 0;
996 }
997 
998 // non-const version for NdbRecAttr defined operations
999 // and scans
1000 static int
getBlobHandles(NdbOperation * opr)1001 getBlobHandles(NdbOperation* opr)
1002 {
1003   CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
1004   if (! g_opt.m_oneblob)
1005     CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
1006   setBatchSizes();
1007   return 0;
1008 }
1009 
1010 
1011 static int
getBlobHandles(NdbScanOperation * ops)1012 getBlobHandles(NdbScanOperation* ops)
1013 {
1014   CHK((g_bh1 = ops->getBlobHandle("BL1")) != 0);
1015   if (! g_opt.m_oneblob)
1016     CHK((g_bh2 = ops->getBlobHandle("BL2")) != 0);
1017   setBatchSizes();
1018   return 0;
1019 }
1020 
1021 static int
getBlobLength(NdbBlob * h,unsigned & len)1022 getBlobLength(NdbBlob* h, unsigned& len)
1023 {
1024   Uint64 len2 = (unsigned)-1;
1025   CHK(h->getLength(len2) == 0);
1026   len = (unsigned)len2;
1027   assert(len == len2);
1028   bool isNull;
1029   CHK(h->getNull(isNull) == 0);
1030   DBG("getBlobLength " << h->getColumn()->getName() << " len=" << len << " null=" << isNull);
1031   return 0;
1032 }
1033 
1034 // setValue / getValue
1035 
1036 static int
setBlobValue(NdbBlob * h,const Bval & v,int error_code=0)1037 setBlobValue(NdbBlob* h, const Bval& v, int error_code = 0)
1038 {
1039   bool null = (v.m_val == 0);
1040   bool isNull;
1041   unsigned len;
1042   DBG("setValue " <<  h->getColumn()->getName() << " len=" << v.m_len << " null=" << null << " " << v);
1043   if (null) {
1044     CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
1045     if (error_code)
1046       return 0;
1047     isNull = false;
1048     CHK(h->getNull(isNull) == 0 && isNull == true);
1049     CHK(getBlobLength(h, len) == 0 && len == 0);
1050   } else {
1051     CHK(h->setValue(v.m_val, v.m_len) == 0 || h->getNdbError().code == error_code);
1052     if (error_code)
1053       return 0;
1054     CHK(h->getNull(isNull) == 0 && isNull == false);
1055     CHK(getBlobLength(h, len) == 0 && len == v.m_len);
1056   }
1057   return 0;
1058 }
1059 
1060 static int
setBlobValue(const Tup & tup,int error_code=0)1061 setBlobValue(const Tup& tup, int error_code = 0)
1062 {
1063   CHK(setBlobValue(g_bh1, tup.m_bval1, error_code) == 0);
1064   if (! g_opt.m_oneblob)
1065     CHK(setBlobValue(g_bh2, tup.m_bval2, error_code) == 0);
1066   return 0;
1067 }
1068 
1069 static int
getBlobValue(NdbBlob * h,const Bval & v)1070 getBlobValue(NdbBlob* h, const Bval& v)
1071 {
1072   DBG("getValue " <<  h->getColumn()->getName() << " buflen=" << v.m_buflen);
1073   CHK(h->getValue(v.m_buf, v.m_buflen) == 0);
1074   return 0;
1075 }
1076 
1077 static int
getBlobValue(const Tup & tup)1078 getBlobValue(const Tup& tup)
1079 {
1080   CHK(getBlobValue(g_bh1, tup.m_bval1) == 0);
1081   if (! g_opt.m_oneblob)
1082     CHK(getBlobValue(g_bh2, tup.m_bval2) == 0);
1083   return 0;
1084 }
1085 
1086 /*
1087  * presetBH1
1088  * This method controls how BL1 is pre-set (using setValue()) for
1089  * inserts and writes that later use writeData to set the correct
1090  * value.
1091  * Sometimes it is set to length zero, other times to the value
1092  * for some other row in the dataset.  This tests that the writeData()
1093  * functionality correctly overwrites values written in the
1094  * prepare phase.
1095  */
presetBH1(int rowNumber)1096 static int presetBH1(int rowNumber)
1097 {
1098   unsigned int variant = urandom(2);
1099   DBG("presetBH1 - Variant=" << variant);
1100   if (variant==0)
1101     CHK(g_bh1->setValue("", 0) == 0);
1102   else
1103   {
1104     CHK(setBlobValue(g_tups[(rowNumber+1) % g_opt.m_rows]) == 0); // Pre-set to something else
1105   };
1106   return 0;
1107 }
1108 
1109 static int
verifyBlobValue(NdbBlob * h,const Bval & v)1110 verifyBlobValue(NdbBlob* h, const Bval& v)
1111 {
1112   bool null = (v.m_val == 0);
1113   bool isNull;
1114   unsigned len;
1115   if (null) {
1116     isNull = false;
1117     CHK(h->getNull(isNull) == 0 && isNull == true);
1118     CHK(getBlobLength(h, len) == 0 && len == 0);
1119   } else {
1120     isNull = true;
1121     CHK(h->getNull(isNull) == 0 && isNull == false);
1122     CHK(getBlobLength(h, len) == 0 && len == v.m_len);
1123     for (unsigned i = 0; i < v.m_len; i++)
1124       CHK(v.m_val[i] == v.m_buf[i]);
1125   }
1126   return 0;
1127 }
1128 
1129 static int
verifyBlobValue(const Tup & tup)1130 verifyBlobValue(const Tup& tup)
1131 {
1132   CHK(verifyBlobValue(g_bh1, tup.m_bval1) == 0);
1133   if (! g_opt.m_oneblob)
1134     CHK(verifyBlobValue(g_bh2, tup.m_bval2) == 0);
1135   return 0;
1136 }
1137 
1138 // readData / writeData
1139 
1140 static int
writeBlobData(NdbBlob * h,const Bval & v)1141 writeBlobData(NdbBlob* h, const Bval& v)
1142 {
1143   bool null = (v.m_val == 0);
1144   bool isNull;
1145   unsigned len;
1146   DBG("write " <<  h->getColumn()->getName() << " len=" << v.m_len << " null=" << null << " " << v);
1147   int error_code = v.m_error_code;
1148   if (null) {
1149     CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
1150     if (error_code)
1151       return 0;
1152     isNull = false;
1153     CHK(h->getNull(isNull) == 0 && isNull == true);
1154     CHK(getBlobLength(h, len) == 0 && len == 0);
1155   } else {
1156     CHK(h->truncate(v.m_len) == 0 || h->getNdbError().code == error_code);
1157     if (error_code)
1158       return 0;
1159     CHK(h->setPos(0) == 0); // Reset write pointer in case there was a previous write.
1160     unsigned n = 0;
1161     do {
1162       unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
1163       if (m > v.m_len - n)
1164         m = v.m_len - n;
1165       DBG("write pos=" << n << " cnt=" << m);
1166       CHK(h->writeData(v.m_val + n, m) == 0);
1167       n += m;
1168     } while (n < v.m_len);
1169     assert(n == v.m_len);
1170     isNull = true;
1171     CHK(h->getNull(isNull) == 0 && isNull == false);
1172     CHK(getBlobLength(h, len) == 0 && len == v.m_len);
1173   }
1174   return 0;
1175 }
1176 
1177 static int
writeBlobData(Tup & tup,int error_code=0)1178 writeBlobData(Tup& tup, int error_code = 0)
1179 {
1180   tup.m_bval1.m_error_code = error_code;
1181   CHK(writeBlobData(g_bh1, tup.m_bval1) == 0);
1182   if (! g_opt.m_oneblob) {
1183     tup.m_bval2.m_error_code = error_code;
1184     CHK(writeBlobData(g_bh2, tup.m_bval2) == 0);
1185   }
1186   return 0;
1187 }
1188 
1189 static int
readBlobData(NdbBlob * h,const Bval & v)1190 readBlobData(NdbBlob* h, const Bval& v)
1191 {
1192   bool null = (v.m_val == 0);
1193   bool isNull;
1194   unsigned len;
1195   DBG("read " <<  h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
1196   if (null) {
1197     isNull = false;
1198     CHK(h->getNull(isNull) == 0 && isNull == true);
1199     CHK(getBlobLength(h, len) == 0 && len == 0);
1200   } else {
1201     isNull = true;
1202     CHK(h->getNull(isNull) == 0 && isNull == false);
1203     CHK(getBlobLength(h, len) == 0 && len == v.m_len);
1204     v.trash();
1205     unsigned n = 0;
1206     while (n < v.m_len) {
1207       unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
1208       if (m > v.m_len - n)
1209         m = v.m_len - n;
1210       DBG("read pos=" << n << " cnt=" << m);
1211       const unsigned m2 = m;
1212       CHK(h->readData(v.m_buf + n, m) == 0);
1213       CHK(m2 == m);
1214       n += m;
1215     }
1216     assert(n == v.m_len);
1217     // need to execute to see the data
1218     CHK(g_con->execute(NoCommit) == 0);
1219     for (unsigned i = 0; i < v.m_len; i++)
1220       CHK(v.m_val[i] == v.m_buf[i]);
1221   }
1222   return 0;
1223 }
1224 
1225 static int
readBlobData(const Tup & tup)1226 readBlobData(const Tup& tup)
1227 {
1228   CHK(readBlobData(g_bh1, tup.m_bval1) == 0);
1229   if (! g_opt.m_oneblob)
1230     CHK(readBlobData(g_bh2, tup.m_bval2) == 0);
1231   return 0;
1232 }
1233 
1234 // hooks
1235 
1236 static NdbBlob::ActiveHook blobWriteHook;
1237 
1238 static int
blobWriteHook(NdbBlob * h,void * arg)1239 blobWriteHook(NdbBlob* h, void* arg)
1240 {
1241   DBG("blobWriteHook");
1242   Bval& v = *(Bval*)arg;
1243   CHK(writeBlobData(h, v) == 0);
1244   return 0;
1245 }
1246 
1247 
1248 static int
setBlobWriteHook(NdbBlob * h,Bval & v,int error_code=0)1249 setBlobWriteHook(NdbBlob* h, Bval& v, int error_code = 0)
1250 {
1251   DBG("setBlobWriteHook");
1252   v.m_error_code = error_code;
1253   CHK(h->setActiveHook(blobWriteHook, &v) == 0);
1254   return 0;
1255 }
1256 
1257 static int
setBlobWriteHook(Tup & tup,int error_code=0)1258 setBlobWriteHook(Tup& tup, int error_code = 0)
1259 {
1260   CHK(setBlobWriteHook(g_bh1, tup.m_bval1, error_code) == 0);
1261   if (! g_opt.m_oneblob)
1262     CHK(setBlobWriteHook(g_bh2, tup.m_bval2, error_code) == 0);
1263   return 0;
1264 }
1265 
1266 static NdbBlob::ActiveHook blobReadHook;
1267 
1268 // no PK yet to identify tuple so just read the value
1269 static int
blobReadHook(NdbBlob * h,void * arg)1270 blobReadHook(NdbBlob* h, void* arg)
1271 {
1272   DBG("blobReadHook");
1273   Bval& v = *(Bval*)arg;
1274   unsigned len;
1275   CHK(getBlobLength(h, len) == 0);
1276   v.alloc(len);
1277   Uint32 maxlen = 0xffffffff;
1278   CHK(h->readData(v.m_buf, maxlen) == 0);
1279   DBG("read " << maxlen << " bytes");
1280   CHK(len == maxlen);
1281   return 0;
1282 }
1283 
1284 static int
setBlobReadHook(NdbBlob * h,Bval & v)1285 setBlobReadHook(NdbBlob* h, Bval& v)
1286 {
1287   DBG("setBlobReadHook");
1288   CHK(h->setActiveHook(blobReadHook, &v) == 0);
1289   return 0;
1290 }
1291 
1292 static int
setBlobReadHook(Tup & tup)1293 setBlobReadHook(Tup& tup)
1294 {
1295   CHK(setBlobReadHook(g_bh1, tup.m_bval1) == 0);
1296   if (! g_opt.m_oneblob)
1297     CHK(setBlobReadHook(g_bh2, tup.m_bval2) == 0);
1298   return 0;
1299 }
1300 
1301 static int
tryRowLock(Tup & tup,bool exclusive)1302 tryRowLock(Tup& tup, bool exclusive)
1303 {
1304   NdbTransaction* testTrans;
1305   NdbOperation* testOp;
1306   CHK((testTrans = g_ndb->startTransaction()) != NULL);
1307   CHK((testOp = testTrans->getNdbOperation(g_opt.m_tname)) != 0);
1308   CHK(testOp->readTuple(exclusive?
1309                         NdbOperation::LM_Exclusive:
1310                         NdbOperation::LM_Read) == 0);
1311   CHK(testOp->equal("PK1", tup.m_pk1) == 0);
1312   if (g_opt.m_pk2chr.m_len != 0) {
1313     CHK(testOp->equal("PK2", tup.m_pk2) == 0);
1314     CHK(testOp->equal("PK3", tup.m_pk3) == 0);
1315   }
1316   setUDpartId(tup, testOp);
1317 
1318   if (testTrans->execute(Commit, AbortOnError) == 0)
1319   {
1320     /* Successfully claimed lock */
1321     testTrans->close();
1322     return 0;
1323   }
1324   else
1325   {
1326     if (testTrans->getNdbError().code == 266)
1327     {
1328       /* Error as expected for lock already claimed */
1329       testTrans->close();
1330       return -2;
1331     }
1332     else
1333     {
1334       DBG("Error on tryRowLock, exclusive = " << exclusive
1335           << endl << testTrans->getNdbError() << endl);
1336       testTrans->close();
1337       return -1;
1338     }
1339   }
1340 }
1341 
1342 
1343 static int
verifyRowLocked(Tup & tup)1344 verifyRowLocked(Tup& tup)
1345 {
1346   CHK(tryRowLock(tup, true) == -2);
1347   return 0;
1348 }
1349 
1350 static int
verifyRowNotLocked(Tup & tup)1351 verifyRowNotLocked(Tup& tup)
1352 {
1353   CHK(tryRowLock(tup, true) == 0);
1354   return 0;
1355 }
1356 
1357 // verify blob data
1358 
1359 static int
verifyHeadInline(const Bcol & b,const Bval & v,NdbRecAttr * ra)1360 verifyHeadInline(const Bcol& b, const Bval& v, NdbRecAttr* ra)
1361 {
1362   if (v.m_val == 0) {
1363     CHK(ra->isNULL() == 1);
1364   } else {
1365     CHK(ra->isNULL() == 0);
1366     NdbBlob::Head head;
1367     NdbBlob::unpackBlobHead(head, ra->aRef(), b.m_version);
1368     CHK(head.length == v.m_len);
1369     const char* data = ra->aRef() + head.headsize;
1370     for (unsigned i = 0; i < head.length && i < b.m_inline; i++)
1371       CHK(data[i] == v.m_val[i]);
1372   }
1373   return 0;
1374 }
1375 
1376 static int
verifyHeadInline(Tup & tup)1377 verifyHeadInline(Tup& tup)
1378 {
1379   DBG("verifyHeadInline pk1=" << hex << tup.m_pk1);
1380   CHK((g_con = g_ndb->startTransaction()) != 0);
1381   CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
1382   CHK(g_opr->readTuple() == 0);
1383   CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
1384   if (g_opt.m_pk2chr.m_len != 0) {
1385     CHK(g_opr->equal("PK2", tup.pk2()) == 0);
1386     CHK(g_opr->equal("PK3", (char*)&tup.m_pk3) == 0);
1387   }
1388   setUDpartId(tup, g_opr);
1389   NdbRecAttr* ra1;
1390   NdbRecAttr* ra2;
1391   NdbRecAttr* ra_frag;
1392   CHK((ra1 = g_opr->getValue("BL1")) != 0);
1393   if (! g_opt.m_oneblob)
1394     CHK((ra2 = g_opr->getValue("BL2")) != 0);
1395   CHK((ra_frag = g_opr->getValue(NdbDictionary::Column::FRAGMENT)) != 0);
1396   if (tup.m_exists) {
1397     CHK(g_con->execute(Commit, AbortOnError) == 0);
1398     tup.m_frag = ra_frag->u_32_value();
1399     DBG("fragment id: " << tup.m_frag);
1400     DBG("verifyHeadInline BL1");
1401     CHK(verifyHeadInline(g_blob1, tup.m_bval1, ra1) == 0);
1402     if (! g_opt.m_oneblob) {
1403       DBG("verifyHeadInline BL2");
1404       CHK(verifyHeadInline(g_blob2, tup.m_bval2, ra2) == 0);
1405     }
1406   } else {
1407     CHK(g_con->execute(Commit, AbortOnError) == -1 &&
1408 	g_con->getNdbError().code == 626);
1409   }
1410   g_ndb->closeTransaction(g_con);
1411   g_opr = 0;
1412   g_con = 0;
1413   return 0;
1414 }
1415 
1416 static unsigned
getvarsize(const char * buf)1417 getvarsize(const char* buf)
1418 {
1419   const unsigned char* p = (const unsigned char*)buf;
1420   return p[0] + (p[1] << 8);
1421 }
1422 
1423 static int
verifyBlobTable(const Bval & v,Uint32 pk1,Uint32 frag,bool exists)1424 verifyBlobTable(const Bval& v, Uint32 pk1, Uint32 frag, bool exists)
1425 {
1426   const Bcol& b = v.m_bcol;
1427   DBG("verify " << b.m_btname << " pk1=" << hex << pk1);
1428   NdbRecAttr* ra_pk = 0; // V1
1429   NdbRecAttr* ra_pk1 = 0; // V2
1430   NdbRecAttr* ra_pk2 = 0; // V2
1431   NdbRecAttr* ra_pk3 = 0; // V2
1432   NdbRecAttr* ra_part = 0;
1433   NdbRecAttr* ra_data = 0;
1434   NdbRecAttr* ra_frag = 0;
1435   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
1436   enum OpState opState;
1437 
1438   do
1439   {
1440     opState= Normal;
1441     CHK((g_con = g_ndb->startTransaction()) != 0);
1442     CHK((g_ops = g_con->getNdbScanOperation(b.m_btname)) != 0);
1443     CHK(g_ops->readTuples(NdbScanOperation::LM_Read,
1444                           g_scanFlags,
1445                           g_batchSize,
1446                           g_parallel) == 0);
1447     if (b.m_version == 1) {
1448       CHK((ra_pk = g_ops->getValue("PK")) != 0);
1449       CHK((ra_part = g_ops->getValue("PART")) != 0);
1450       CHK((ra_data = g_ops->getValue("DATA")) != 0);
1451     } else {
1452       CHK((ra_pk1 = g_ops->getValue("PK1")) != 0);
1453       if (g_opt.m_pk2chr.m_len != 0) {
1454         CHK((ra_pk2 = g_ops->getValue("PK2")) != 0);
1455         CHK((ra_pk3 = g_ops->getValue("PK3")) != 0);
1456       }
1457       CHK((ra_part = g_ops->getValue("NDB$PART")) != 0);
1458       CHK((ra_data = g_ops->getValue("NDB$DATA")) != 0);
1459     }
1460 
1461     /* No partition id set on Blob part table scan so that we
1462      * find any misplaced parts in other partitions
1463      */
1464 
1465     CHK((ra_frag = g_ops->getValue(NdbDictionary::Column::FRAGMENT)) != 0);
1466     CHK(g_con->execute(NoCommit) == 0);
1467     unsigned partcount;
1468     if (! exists || v.m_len <= b.m_inline)
1469       partcount = 0;
1470     else
1471       partcount = (v.m_len - b.m_inline + b.m_partsize - 1) / b.m_partsize;
1472     char* seen = new char [partcount];
1473     memset(seen, 0, partcount);
1474     while (1) {
1475       int ret= g_ops->nextResult();
1476       if (ret == -1)
1477       {
1478         /* Timeout? */
1479         CHK(conHasTimeoutError());
1480 
1481         /* Break out and restart scan unless we've
1482          * run out of attempts
1483          */
1484         DISP("Parts table scan failed due to timeout("
1485              << conError() <<").  Retries left : "
1486              << opTimeoutRetries -1);
1487         CHK(--opTimeoutRetries);
1488 
1489         opState= Retrying;
1490         sleep(1);
1491         break;
1492       }
1493       CHK(opState == Normal);
1494       CHK((ret == 0) || (ret == 1));
1495       if (ret == 1)
1496         break;
1497       if (b.m_version == 1) {
1498         if (pk1 != ra_pk->u_32_value())
1499           continue;
1500       } else {
1501         if (pk1 != ra_pk1->u_32_value())
1502           continue;
1503       }
1504       Uint32 part = ra_part->u_32_value();
1505       Uint32 frag2 = ra_frag->u_32_value();
1506       DBG("part " << part << " of " << partcount << " from fragment " << frag2);
1507       CHK(part < partcount && ! seen[part]);
1508       seen[part] = 1;
1509       unsigned n = b.m_inline + part * b.m_partsize;
1510       assert(exists && v.m_val != 0 && n < v.m_len);
1511       unsigned m = v.m_len - n;
1512       if (m > b.m_partsize)
1513         m = b.m_partsize;
1514       const char* data = ra_data->aRef();
1515       if (b.m_version == 1)
1516         ;
1517       else {
1518         // Blob v2 stored on disk is currently fixed
1519         // size, so we skip these tests.
1520         if (!g_usingDisk)
1521         {
1522           unsigned sz = getvarsize(data);
1523           DBG("varsize " << sz);
1524           DBG("b.m_partsize " << b.m_partsize);
1525           CHK(sz <= b.m_partsize);
1526           data += 2;
1527           if (part + 1 < partcount)
1528             CHK(sz == b.m_partsize);
1529           else
1530             CHK(sz == m);
1531         }
1532       }
1533       CHK(memcmp(data, v.m_val + n, m) == 0);
1534       if (b.m_version == 1 ||
1535           g_usingDisk ) { // Blob v2 stored on disk is currently
1536         // fixed size, so we do these tests.
1537         char fillchr;
1538         if (b.m_type == NdbDictionary::Column::Text)
1539           fillchr = 0x20;
1540         else
1541           fillchr = 0x0;
1542         uint i = m;
1543         while (i < b.m_partsize) {
1544           CHK(data[i] == fillchr);
1545           i++;
1546         }
1547       }
1548       DBG("frags main=" << frag << " blob=" << frag2 << " stripe=" << b.m_stripe);
1549       if (b.m_stripe == 0)
1550         CHK(frag == frag2);
1551     }
1552 
1553     if (opState == Normal)
1554     {
1555       for (unsigned i = 0; i < partcount; i++)
1556         CHK(seen[i] == 1);
1557     }
1558     delete [] seen;
1559     g_ops->close();
1560     g_ndb->closeTransaction(g_con);
1561   } while (opState == Retrying);
1562 
1563   g_ops = 0;
1564   g_con = 0;
1565   return 0;
1566 }
1567 
1568 static int
verifyBlobTable(const Tup & tup)1569 verifyBlobTable(const Tup& tup)
1570 {
1571   CHK(verifyBlobTable(tup.m_bval1, tup.m_pk1, tup.m_frag, tup.m_exists) == 0);
1572   if (! g_opt.m_oneblob)
1573     CHK(verifyBlobTable(tup.m_bval2, tup.m_pk1, tup.m_frag, tup.m_exists) == 0);
1574   return 0;
1575 }
1576 
1577 static int
verifyBlob()1578 verifyBlob()
1579 {
1580   for (unsigned k = 0; k < g_opt.m_rows; k++) {
1581     Tup& tup = g_tups[k];
1582     DBG("verifyBlob pk1=" << hex << tup.m_pk1);
1583     CHK(verifyHeadInline(tup) == 0);
1584     CHK(verifyBlobTable(tup) == 0);
1585   }
1586   return 0;
1587 }
1588 
1589 static int
rowIsLocked(Tup & tup)1590 rowIsLocked(Tup& tup)
1591 {
1592   NdbTransaction* testTrans;
1593   CHK((testTrans = g_ndb->startTransaction()) != 0);
1594 
1595   NdbOperation* testOp;
1596   CHK((testOp = testTrans->getNdbOperation(g_opt.m_tname)) != 0);
1597 
1598   CHK(testOp->readTuple(NdbOperation::LM_Exclusive) == 0);
1599   CHK(testOp->equal("PK1", tup.m_pk1) == 0);
1600   if (g_opt.m_pk2chr.m_len != 0)
1601   {
1602     CHK(testOp->equal("PK2", tup.m_pk2) == 0);
1603     CHK(testOp->equal("PK3", tup.m_pk3) == 0);
1604   }
1605   setUDpartId(tup, testOp);
1606   CHK(testOp->getValue("PK1") != 0);
1607 
1608   CHK(testTrans->execute(Commit) == -1);
1609   CHK(testTrans->getNdbError().code == 266);
1610 
1611   testTrans->close();
1612 
1613   return 0;
1614 }
1615 
1616 // operations
1617 
1618 // pk ops
1619 
1620 static int
insertPk(int style,int api)1621 insertPk(int style, int api)
1622 {
1623   DBG("--- insertPk " << stylename[style] << " " << apiName[api] << " ---");
1624   unsigned n = 0;
1625   unsigned k = 0;
1626   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
1627   enum OpState opState;
1628 
1629   do
1630   {
1631     opState= Normal;
1632     CHK((g_con = g_ndb->startTransaction()) != 0);
1633     for (; k < g_opt.m_rows; k++) {
1634       Tup& tup = g_tups[k];
1635       DBG("insertPk pk1=" << hex << tup.m_pk1);
1636       if (api == API_RECATTR)
1637       {
1638         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
1639         CHK(g_opr->insertTuple() ==0);
1640         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
1641         if (g_opt.m_pk2chr.m_len != 0)
1642         {
1643           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
1644           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
1645         }
1646         setUDpartId(tup, g_opr);
1647         CHK(getBlobHandles(g_opr) == 0);
1648       }
1649       else
1650       {
1651         memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
1652         if (g_opt.m_pk2chr.m_len != 0) {
1653           memcpy(&tup.m_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
1654           memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
1655         }
1656         NdbOperation::OperationOptions opts;
1657         setUDpartIdNdbRecord(tup,
1658                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
1659                              opts);
1660         CHK((g_const_opr = g_con->insertTuple(g_full_record,
1661                                               tup.m_row,
1662                                               NULL,
1663                                               &opts,
1664                                               sizeof(opts))) != 0);
1665         CHK(getBlobHandles(g_const_opr) == 0);
1666       }
1667       bool timeout= false;
1668       if (style == 0) {
1669         CHK(setBlobValue(tup) == 0);
1670       } else if (style == 1) {
1671         CHK(presetBH1(k) == 0);
1672         CHK(setBlobWriteHook(tup) == 0);
1673       } else {
1674         CHK(presetBH1(k) == 0);
1675         CHK(g_con->execute(NoCommit) == 0);
1676         if (writeBlobData(tup) == -1)
1677           CHK((timeout= conHasTimeoutError()) == true);
1678       }
1679 
1680       if (!timeout &&
1681           (++n == g_opt.m_batch)) {
1682         if (g_con->execute(Commit) == 0)
1683         {
1684           g_ndb->closeTransaction(g_con);
1685           CHK((g_con = g_ndb->startTransaction()) != 0);
1686           n = 0;
1687         }
1688         else
1689         {
1690           CHK((timeout = conHasTimeoutError()) == true);
1691           n-= 1;
1692         }
1693       }
1694 
1695       if (timeout)
1696       {
1697         /* Timeout */
1698         DISP("Insert failed due to timeout("
1699              << conError() <<")  "
1700              << " Operations lost : " << n - 1
1701              << " Retries left : "
1702              << opTimeoutRetries -1);
1703         CHK(--opTimeoutRetries);
1704 
1705         k = k - n;
1706         n = 0;
1707         opState= Retrying;
1708         sleep(1);
1709         break;
1710       }
1711 
1712       g_const_opr = 0;
1713       g_opr = 0;
1714       tup.m_exists = true;
1715     }
1716     if (opState == Normal)
1717     {
1718       if (n != 0) {
1719         CHK(g_con->execute(Commit) == 0);
1720         n = 0;
1721       }
1722     }
1723     g_ndb->closeTransaction(g_con);
1724   } while (opState == Retrying);
1725   g_con = 0;
1726   return 0;
1727 }
1728 
1729 static int
readPk(int style,int api)1730 readPk(int style, int api)
1731 {
1732   DBG("--- readPk " << stylename[style] <<" " << apiName[api] << " ---");
1733   for (unsigned k = 0; k < g_opt.m_rows; k++) {
1734     Tup& tup = g_tups[k];
1735     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
1736     OpState opState;
1737 
1738     do
1739     {
1740       opState= Normal;
1741       DBG("readPk pk1=" << hex << tup.m_pk1);
1742       CHK((g_con = g_ndb->startTransaction()) != 0);
1743       NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead;
1744       switch(urandom(3))
1745       {
1746       case 0:
1747         lm = NdbOperation::LM_Read;
1748         break;
1749       case 1:
1750         lm = NdbOperation::LM_SimpleRead;
1751         break;
1752       default:
1753         break;
1754       }
1755       if (api == API_RECATTR)
1756       {
1757         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
1758         CHK(g_opr->readTuple(lm) == 0);
1759         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
1760         if (g_opt.m_pk2chr.m_len != 0)
1761         {
1762           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
1763           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
1764         }
1765         setUDpartId(tup, g_opr);
1766         CHK(getBlobHandles(g_opr) == 0);
1767       }
1768       else
1769       { // NdbRecord
1770         memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
1771         if (g_opt.m_pk2chr.m_len != 0) {
1772           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
1773           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
1774         }
1775         NdbOperation::OperationOptions opts;
1776         setUDpartIdNdbRecord(tup,
1777                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
1778                              opts);
1779         CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
1780                                             g_blob_record, tup.m_row,
1781                                             lm,
1782                                             NULL,
1783                                             &opts,
1784                                             sizeof(opts))) != 0);
1785 
1786         CHK(getBlobHandles(g_const_opr) == 0);
1787       }
1788       bool timeout= false;
1789       if (style == 0) {
1790         CHK(getBlobValue(tup) == 0);
1791       } else if (style == 1) {
1792         CHK(setBlobReadHook(tup) == 0);
1793       } else {
1794         CHK(g_con->execute(NoCommit) == 0);
1795         if (readBlobData(tup) == -1)
1796           CHK((timeout= conHasTimeoutError()) == true);
1797       }
1798       if (!timeout)
1799       {
1800         if (urandom(200) == 0)
1801         {
1802           if (g_con->execute(NoCommit) == 0)
1803           {
1804             /* Verify row is locked */
1805             //ndbout << "Checking row is locked for lm "
1806             //       << lm << endl;
1807             CHK(rowIsLocked(tup) == 0);
1808             CHK(g_con->execute(Commit) == 0);
1809           }
1810           else
1811           {
1812             CHK((timeout= conHasTimeoutError()) == true);
1813           }
1814         }
1815         else
1816         {
1817           if (g_con->execute(Commit) != 0)
1818           {
1819             CHK((timeout= conHasTimeoutError()) == true);
1820           }
1821         }
1822       }
1823       if (timeout)
1824       {
1825         DISP("ReadPk failed due to timeout("
1826              << conError() <<")  Retries left : "
1827              << opTimeoutRetries -1);
1828         CHK(--opTimeoutRetries);
1829         opState= Retrying;
1830         sleep(1);
1831       }
1832       else
1833       {
1834         // verify lock mode upgrade
1835         CHK((g_opr?g_opr:g_const_opr)->getLockMode() == NdbOperation::LM_Read);
1836 
1837         if (style == 0 || style == 1) {
1838           CHK(verifyBlobValue(tup) == 0);
1839         }
1840       }
1841       g_ndb->closeTransaction(g_con);
1842     } while (opState == Retrying);
1843     g_opr = 0;
1844     g_const_opr = 0;
1845     g_con = 0;
1846   }
1847   return 0;
1848 }
1849 
1850 static int
readLockPk(int style,int api)1851 readLockPk(int style, int api)
1852 {
1853   DBG("--- readLockPk " << stylename[style] <<" " << apiName[api] << " ---");
1854   for (unsigned k = 0; k < g_opt.m_rows; k++) {
1855     Tup& tup = g_tups[k];
1856     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
1857     OpState opState;
1858 
1859     do
1860     {
1861       opState= Normal;
1862       DBG("readLockPk pk1=" << hex << tup.m_pk1);
1863       CHK((g_con = g_ndb->startTransaction()) != 0);
1864       NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead;
1865       switch(urandom(4))
1866       {
1867       case 0:
1868         lm = NdbOperation::LM_Exclusive;
1869         break;
1870       case 1:
1871         lm = NdbOperation::LM_Read;
1872         break;
1873       case 2:
1874         lm = NdbOperation::LM_SimpleRead;
1875       default:
1876         break;
1877       }
1878 
1879       bool manualUnlock = ( (lm == NdbOperation::LM_Read) ||
1880                             (lm == NdbOperation::LM_Exclusive));
1881 
1882       if (api == API_RECATTR)
1883       {
1884         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
1885         CHK(g_opr->readTuple(lm) == 0);
1886 
1887         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
1888         if (g_opt.m_pk2chr.m_len != 0)
1889         {
1890           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
1891           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
1892         }
1893         setUDpartId(tup, g_opr);
1894         CHK(getBlobHandles(g_opr) == 0);
1895         if (manualUnlock)
1896         {
1897           CHK(g_opr->getLockHandle() != NULL);
1898         }
1899       }
1900       else
1901       { // NdbRecord
1902         memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
1903         if (g_opt.m_pk2chr.m_len != 0) {
1904           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
1905           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
1906         }
1907         NdbOperation::OperationOptions opts;
1908         setUDpartIdNdbRecord(tup,
1909                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
1910                              opts);
1911         if (manualUnlock)
1912         {
1913           opts.optionsPresent |= NdbOperation::OperationOptions::OO_LOCKHANDLE;
1914         }
1915         CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
1916                                             g_blob_record, tup.m_row,
1917                                             lm,
1918                                             NULL,
1919                                             &opts,
1920                                             sizeof(opts))) != 0);
1921         CHK(getBlobHandles(g_const_opr) == 0);
1922       }
1923       bool timeout= false;
1924       if (style == 0) {
1925         CHK(getBlobValue(tup) == 0);
1926       } else if (style == 1) {
1927         CHK(setBlobReadHook(tup) == 0);
1928       } else {
1929         CHK(g_con->execute(NoCommit) == 0);
1930         if (readBlobData(tup) == -1)
1931           CHK((timeout= conHasTimeoutError()) == true);
1932       }
1933       if (!timeout)
1934       {
1935         if (g_con->execute(NoCommit) == 0)
1936         {
1937           /* Ok, read executed ok, now
1938            * - Verify the Blob data
1939            * - Verify the row is locked
1940            * - Close the Blob handles
1941            * - Attempt to unlock
1942            */
1943           NdbOperation::LockMode lmused = (g_opr?g_opr:g_const_opr)->getLockMode();
1944           CHK((lmused == NdbOperation::LM_Read) ||
1945               (lmused == NdbOperation::LM_Exclusive));
1946 
1947           if (style == 0 || style == 1) {
1948             CHK(verifyBlobValue(tup) == 0);
1949           }
1950 
1951           /* Occasionally check that we are locked */
1952           if (urandom(200) == 0)
1953             CHK(verifyRowLocked(tup) == 0);
1954 
1955           /* Close Blob handles */
1956           CHK(g_bh1->close() == 0);
1957           CHK(g_bh1->getState() == NdbBlob::Closed);
1958           if (! g_opt.m_oneblob)
1959           {
1960             CHK(g_bh2->close() == 0);
1961             CHK(g_bh2->getState() == NdbBlob::Closed);
1962           }
1963 
1964           /* Check Blob handle is closed */
1965           char byte;
1966           Uint32 len = 1;
1967           CHK(g_bh1->readData(&byte, len) != 0);
1968           CHK(g_bh1->getNdbError().code == 4265);
1969           CHK(g_bh1->close() != 0);
1970           CHK(g_bh1->getNdbError().code == 4554);
1971           if(! g_opt.m_oneblob)
1972           {
1973             CHK(g_bh2->readData(&byte, len) != 0);
1974             CHK(g_bh2->getNdbError().code == 4265);
1975             CHK(g_bh2->close() != 0);
1976             CHK(g_bh2->getNdbError().code == 4554);
1977           }
1978 
1979 
1980           if (manualUnlock)
1981           {
1982             /* All Blob handles closed, now we can issue an
1983              * unlock operation and the main row should be
1984              * unlocked
1985              */
1986             const NdbOperation* readOp = (g_opr?g_opr:g_const_opr);
1987             const NdbLockHandle* lh = readOp->getLockHandle();
1988             CHK(lh != NULL);
1989             const NdbOperation* unlockOp = g_con->unlock(lh);
1990             CHK(unlockOp != NULL);
1991           }
1992 
1993           /* All Blob handles closed - manual or automatic
1994            * unlock op has been enqueued.  Now execute and
1995            * check that the row is unlocked.
1996            */
1997           CHK(g_con->execute(NoCommit) == 0);
1998           CHK(verifyRowNotLocked(tup) == 0);
1999 
2000           if (g_con->execute(Commit) != 0)
2001           {
2002             CHK((timeout= conHasTimeoutError()) == true);
2003           }
2004         }
2005         else
2006         {
2007           CHK((timeout= conHasTimeoutError()) == true);
2008         }
2009       }
2010       if (timeout)
2011       {
2012         DISP("ReadLockPk failed due to timeout on read("
2013              << conError() <<")  Retries left : "
2014              << opTimeoutRetries -1);
2015         CHK(--opTimeoutRetries);
2016         opState= Retrying;
2017         sleep(1);
2018       }
2019 
2020       g_ndb->closeTransaction(g_con);
2021     } while (opState == Retrying);
2022     g_opr = 0;
2023     g_const_opr = 0;
2024     g_con = 0;
2025   }
2026   return 0;
2027 }
2028 
2029 static int
updatePk(int style,int api)2030 updatePk(int style, int api)
2031 {
2032   DBG("--- updatePk " << stylename[style] << " " << apiName[api] << " ---");
2033   for (unsigned k = 0; k < g_opt.m_rows; k++) {
2034     Tup& tup = g_tups[k];
2035     DBG("updatePk pk1=" << hex << tup.m_pk1);
2036     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2037     OpState opState;
2038 
2039     do
2040     {
2041       opState= Normal;
2042       int mode = urandom(3);
2043       int error_code = mode == 0 ? 0 : 4275;
2044       CHK((g_con = g_ndb->startTransaction()) != 0);
2045       if (api == API_RECATTR)
2046       {
2047         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
2048         if (mode == 0) {
2049           DBG("using updateTuple");
2050           CHK(g_opr->updateTuple() == 0);
2051         } else if (mode == 1) {
2052           DBG("using readTuple exclusive");
2053           CHK(g_opr->readTuple(NdbOperation::LM_Exclusive) == 0);
2054         } else {
2055           DBG("using readTuple - will fail and retry");
2056           CHK(g_opr->readTuple() == 0);
2057         }
2058         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
2059         if (g_opt.m_pk2chr.m_len != 0)
2060         {
2061           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
2062           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
2063         }
2064         setUDpartId(tup, g_opr);
2065         CHK(getBlobHandles(g_opr) == 0);
2066       }
2067       else
2068       {
2069         memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
2070         if (g_opt.m_pk2chr.m_len != 0) {
2071           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2072           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2073         }
2074         NdbOperation::OperationOptions opts;
2075         setUDpartIdNdbRecord(tup,
2076                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
2077                              opts);
2078         if (mode == 0) {
2079           DBG("using updateTuple");
2080           CHK((g_const_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
2081                                                g_blob_record, tup.m_row,
2082                                                NULL, &opts, sizeof(opts))) != 0);
2083         } else if (mode == 1) {
2084           DBG("using readTuple exclusive");
2085           CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
2086                                              g_blob_record, tup.m_row,
2087                                              NdbOperation::LM_Exclusive,
2088                                              NULL, &opts, sizeof(opts))) != 0);
2089         } else {
2090           DBG("using readTuple - will fail and retry");
2091           CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
2092                                              g_blob_record, tup.m_row,
2093                                              NdbOperation::LM_Read,
2094                                              NULL, &opts, sizeof(opts))) != 0);
2095         }
2096         CHK(getBlobHandles(g_const_opr) == 0);
2097       }
2098 
2099       bool timeout= false;
2100       if (style == 0) {
2101         CHK(setBlobValue(tup, error_code) == 0);
2102       } else if (style == 1) {
2103         CHK(setBlobWriteHook(tup, error_code) == 0);
2104       } else {
2105         CHK(g_con->execute(NoCommit) == 0);
2106         if (writeBlobData(tup, error_code) != 0)
2107           CHK((timeout= conHasTimeoutError()) == true);
2108       }
2109       if (!timeout &&
2110           (error_code == 0)) {
2111         /* Normal success case, try execute commit */
2112         if (g_con->execute(Commit) != 0)
2113           CHK((timeout= conHasTimeoutError()) == true);
2114         else
2115         {
2116           g_ndb->closeTransaction(g_con);
2117           break;
2118         }
2119       }
2120       if (timeout)
2121       {
2122         DISP("UpdatePk failed due to timeout("
2123              << conError() <<")  Retries left : "
2124              << opTimeoutRetries -1);
2125         CHK(--opTimeoutRetries);
2126 
2127         opState= Retrying;
2128         sleep(1);
2129       }
2130       if (error_code)
2131         opState= Retrying;
2132 
2133       g_ndb->closeTransaction(g_con);
2134     } while (opState == Retrying);
2135     g_const_opr = 0;
2136     g_opr = 0;
2137     g_con = 0;
2138     tup.m_exists = true;
2139   }
2140   return 0;
2141 }
2142 
2143 static int
writePk(int style,int api)2144 writePk(int style, int api)
2145 {
2146   DBG("--- writePk " << stylename[style] << " " << apiName[api] << " ---");
2147   for (unsigned k = 0; k < g_opt.m_rows; k++) {
2148     Tup& tup = g_tups[k];
2149     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2150     enum OpState opState;
2151 
2152     do
2153     {
2154       opState= Normal;
2155       DBG("writePk pk1=" << hex << tup.m_pk1);
2156       CHK((g_con = g_ndb->startTransaction()) != 0);
2157       if (api == API_RECATTR)
2158       {
2159         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
2160         CHK(g_opr->writeTuple() == 0);
2161         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
2162         if (g_opt.m_pk2chr.m_len != 0)
2163         {
2164           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
2165           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
2166         }
2167         setUDpartId(tup, g_opr);
2168         CHK(getBlobHandles(g_opr) == 0);
2169       }
2170       else
2171       {
2172         memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
2173         memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
2174         if (g_opt.m_pk2chr.m_len != 0) {
2175           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2176           memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2177           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2178           memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2179         }
2180         NdbOperation::OperationOptions opts;
2181         setUDpartIdNdbRecord(tup,
2182                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
2183                              opts);
2184         CHK((g_const_opr= g_con->writeTuple(g_key_record, tup.m_key_row,
2185                                             g_full_record, tup.m_row,
2186                                             NULL, &opts, sizeof(opts))) != 0);
2187         CHK(getBlobHandles(g_const_opr) == 0);
2188       }
2189       bool timeout= false;
2190       if (style == 0) {
2191         CHK(setBlobValue(tup) == 0);
2192       } else if (style == 1) {
2193         CHK(presetBH1(k) == 0);
2194         CHK(setBlobWriteHook(tup) == 0);
2195       } else {
2196         CHK(presetBH1(k) == 0);
2197         CHK(g_con->execute(NoCommit) == 0);
2198         if (writeBlobData(tup) != 0)
2199           CHK((timeout= conHasTimeoutError()) == true);
2200       }
2201 
2202       if (!timeout)
2203       {
2204         if (g_con->execute(Commit) != 0)
2205           CHK((timeout= conHasTimeoutError()) == true);
2206       }
2207       if (timeout)
2208       {
2209         DISP("WritePk failed due to timeout("
2210              << conError() <<")  Retries left : "
2211              << opTimeoutRetries -1);
2212         CHK(--opTimeoutRetries);
2213 
2214         opState= Retrying;
2215         sleep(1);
2216       }
2217       g_ndb->closeTransaction(g_con);
2218     } while (opState == Retrying);
2219 
2220     g_const_opr = 0;
2221     g_opr = 0;
2222     g_con = 0;
2223     tup.m_exists = true;
2224   }
2225   return 0;
2226 }
2227 
2228 static int
deletePk(int api)2229 deletePk(int api)
2230 {
2231   DBG("--- deletePk " << apiName[api] << " ---");
2232   unsigned n = 0;
2233   unsigned k = 0;
2234   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2235   enum OpState opState;
2236 
2237   do
2238   {
2239     opState= Normal;
2240     CHK((g_con = g_ndb->startTransaction()) != 0);
2241     for (; k < g_opt.m_rows; k++) {
2242       Tup& tup = g_tups[k];
2243       DBG("deletePk pk1=" << hex << tup.m_pk1);
2244       if (api == API_RECATTR)
2245       {
2246         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
2247         CHK(g_opr->deleteTuple() == 0);
2248         /* Must set explicit partitionId before equal() calls as that's
2249          * where implicit Blob handles are created which need the
2250          * partitioning info
2251          */
2252         setUDpartId(tup, g_opr);
2253         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
2254         if (g_opt.m_pk2chr.m_len != 0)
2255         {
2256           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
2257           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
2258         }
2259       }
2260       else
2261       {
2262         memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
2263         if (g_opt.m_pk2chr.m_len != 0) {
2264           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2265           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2266         }
2267         NdbOperation::OperationOptions opts;
2268         setUDpartIdNdbRecord(tup,
2269                              g_ndb->getDictionary()->getTable(g_opt.m_tname),
2270                              opts);
2271         CHK((g_const_opr= g_con->deleteTuple(g_key_record, tup.m_key_row,
2272                                              g_full_record, NULL,
2273                                              NULL, &opts, sizeof(opts))) != 0);
2274       }
2275       if (++n == g_opt.m_batch) {
2276         if (g_con->execute(Commit) != 0)
2277         {
2278           CHK(conHasTimeoutError());
2279           DISP("DeletePk failed due to timeout("
2280                << conError() <<")  Retries left : "
2281                << opTimeoutRetries -1);
2282           CHK(--opTimeoutRetries);
2283 
2284           opState= Retrying;
2285           k= k - (n-1);
2286           n= 0;
2287           sleep(1);
2288           break; // Out of for
2289         }
2290 
2291         g_ndb->closeTransaction(g_con);
2292         CHK((g_con = g_ndb->startTransaction()) != 0);
2293         n = 0;
2294       }
2295       g_const_opr = 0;
2296       g_opr = 0;
2297       tup.m_exists = false;
2298     } // for(
2299     if (opState == Normal)
2300     {
2301       if (n != 0) {
2302         if (g_con->execute(Commit) != 0)
2303         {
2304           CHK(conHasTimeoutError());
2305           DISP("DeletePk failed on last batch ("
2306                << conError() <<")  Retries left : "
2307                << opTimeoutRetries -1);
2308           CHK(--opTimeoutRetries);
2309           sleep(1);
2310           opState= Retrying;
2311           k= k- (n-1);
2312         }
2313         n = 0;
2314       }
2315     }
2316     g_ndb->closeTransaction(g_con);
2317     g_con = 0;
2318   } while (opState == Retrying);
2319 
2320   return 0;
2321 }
2322 
2323 static int
deleteNoPk()2324 deleteNoPk()
2325 {
2326   DBG("--- deleteNoPk ---");
2327   Tup no_tup; // bug#24028
2328   no_tup.m_pk1 = 0xb1ff;
2329   const Chr& pk2chr = g_opt.m_pk2chr;
2330   if (pk2chr.m_len != 0) {
2331     char* const p = no_tup.m_pk2;
2332     uint len = urandom(pk2chr.m_len + 1);
2333     uint i = 0;
2334     if (! pk2chr.m_fixed) {
2335       *(uchar*)&p[0] = len;
2336       i++;
2337     }
2338     uint j = 0;
2339     while (j < len) {
2340       p[i] = "b1ff"[j % 4];
2341       i++;
2342       j++;
2343     }
2344   }
2345   no_tup.m_pk3 = 0xb1ff;
2346   CHK((g_con = g_ndb->startTransaction()) != 0);
2347   Tup& tup =  no_tup;
2348   DBG("deletePk pk1=" << hex << tup.m_pk1);
2349   CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
2350   CHK(g_opr->deleteTuple() == 0);
2351   setUDpartId(tup, g_opr);
2352   CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
2353   if (pk2chr.m_len != 0) {
2354     CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
2355     CHK(g_opr->equal("PK3", (char*)&tup.m_pk2) == 0);
2356   }
2357   CHK(g_con->execute(Commit) == -1); // fail
2358   // BUG: error should be on op but is on con now
2359   DBG("con: " << g_con->getNdbError());
2360   DBG("opr: " << g_opr->getNdbError());
2361   CHK(g_con->getNdbError().code == 626 || g_opr->getNdbError().code == 626);
2362   g_ndb->closeTransaction(g_con);
2363   g_opr = 0;
2364   g_con = 0;
2365   return 0;
2366 }
2367 
2368 // hash index ops
2369 
2370 static int
readIdx(int style,int api)2371 readIdx(int style, int api)
2372 {
2373   DBG("--- readIdx " << stylename[style] << " " << apiName[api] << " ---");
2374   for (unsigned k = 0; k < g_opt.m_rows; k++) {
2375     Tup& tup = g_tups[k];
2376     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2377     enum OpState opState;
2378 
2379     do
2380     {
2381       opState= Normal;
2382       DBG("readIdx pk1=" << hex << tup.m_pk1);
2383       CHK((g_con = g_ndb->startTransaction()) != 0);
2384       NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead;
2385       switch(urandom(3))
2386       {
2387       case 0:
2388         lm = NdbOperation::LM_Read;
2389         break;
2390       case 1:
2391         lm = NdbOperation::LM_SimpleRead;
2392         break;
2393       default:
2394         break;
2395       }
2396       if (api == API_RECATTR)
2397       {
2398         CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
2399         CHK(g_opx->readTuple(lm) == 0);
2400         CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
2401         CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
2402         /* No need to set partition Id for unique indexes */
2403         CHK(getBlobHandles(g_opx) == 0);
2404       }
2405       else
2406       {
2407         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2408         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2409         /* No need to set partition Id for unique indexes */
2410         CHK((g_const_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
2411                                            g_blob_record, tup.m_row,
2412                                            lm)) != 0);
2413         CHK(getBlobHandles(g_const_opr) == 0);
2414       }
2415 
2416       bool timeout= false;
2417       if (style == 0) {
2418         CHK(getBlobValue(tup) == 0);
2419       } else if (style == 1) {
2420         CHK(setBlobReadHook(tup) == 0);
2421       } else {
2422         if(g_con->execute(NoCommit) ||
2423            readBlobData(tup))
2424           CHK((timeout= conHasTimeoutError()) == true);
2425       }
2426       if (!timeout)
2427       {
2428         if (g_con->execute(Commit) != 0)
2429         {
2430           CHK((timeout= conHasTimeoutError()) == true);
2431         }
2432       }
2433       if (!timeout)
2434       {
2435         // verify lock mode upgrade (already done by NdbIndexOperation)
2436         CHK((g_opx?g_opx:g_const_opr)->getLockMode() == NdbOperation::LM_Read);
2437         if (style == 0 || style == 1) {
2438           CHK(verifyBlobValue(tup) == 0);
2439         }
2440       }
2441       else
2442       {
2443         DISP("Timeout while reading via index ("
2444              << conError() <<")  Retries left : "
2445              << opTimeoutRetries -1);
2446         CHK(--opTimeoutRetries);
2447 
2448         opState= Retrying;
2449         sleep(1);
2450       }
2451       g_ndb->closeTransaction(g_con);
2452     } while (opState == Retrying);
2453     g_const_opr = 0;
2454     g_opx = 0;
2455     g_con = 0;
2456   }
2457   return 0;
2458 }
2459 
2460 static int
updateIdx(int style,int api)2461 updateIdx(int style, int api)
2462 {
2463   DBG("--- updateIdx " << stylename[style] << " " << apiName[api] << " ---");
2464   for (unsigned k = 0; k < g_opt.m_rows; k++) {
2465     Tup& tup = g_tups[k];
2466     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2467     enum OpState opState;
2468 
2469     do
2470     {
2471       opState= Normal;
2472       DBG("updateIdx pk1=" << hex << tup.m_pk1);
2473       // skip 4275 testing
2474       CHK((g_con = g_ndb->startTransaction()) != 0);
2475       if (api == API_RECATTR)
2476       {
2477         CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
2478         CHK(g_opx->updateTuple() == 0);
2479         CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
2480         CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
2481         /* No need to set partition Id for unique indexes */
2482         CHK(getBlobHandles(g_opx) == 0);
2483       }
2484       else
2485       {
2486         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2487         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2488         /* No need to set partition Id for unique indexes */
2489         CHK((g_const_opr= g_con->updateTuple(g_idx_record, tup.m_key_row,
2490                                              g_blob_record, tup.m_row)) != 0);
2491         CHK(getBlobHandles(g_const_opr) == 0);
2492       }
2493       bool timeout= false;
2494       if (style == 0) {
2495         CHK(setBlobValue(tup) == 0);
2496       } else if (style == 1) {
2497         CHK(setBlobWriteHook(tup) == 0);
2498       } else {
2499         if (g_con->execute(NoCommit) ||
2500             writeBlobData(tup))
2501           CHK((timeout= conHasTimeoutError()) == true);
2502       }
2503       if (!timeout)
2504       {
2505         if (g_con->execute(Commit) != 0)
2506           CHK((timeout= conHasTimeoutError()) == true);
2507       }
2508       if (timeout)
2509       {
2510         DISP("Timeout in Index Update ("
2511              << conError() <<")  Retries left : "
2512              << opTimeoutRetries-1);
2513         CHK(--opTimeoutRetries);
2514         opState= Retrying;
2515         sleep(1);
2516       }
2517       g_ndb->closeTransaction(g_con);
2518     } while (opState == Retrying);
2519     g_const_opr = 0;
2520     g_opx = 0;
2521     g_con = 0;
2522     tup.m_exists = true;
2523   }
2524   return 0;
2525 }
2526 
2527 static int
writeIdx(int style,int api)2528 writeIdx(int style, int api)
2529 {
2530   DBG("--- writeIdx " << stylename[style] << " " << apiName[api] << " ---");
2531   for (unsigned k = 0; k < g_opt.m_rows; k++) {
2532     Tup& tup = g_tups[k];
2533     Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2534     enum OpState opState;
2535 
2536     do
2537     {
2538       opState= Normal;
2539       DBG("writeIdx pk1=" << hex << tup.m_pk1);
2540       CHK((g_con = g_ndb->startTransaction()) != 0);
2541       if (api == API_RECATTR)
2542       {
2543         CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
2544         CHK(g_opx->writeTuple() == 0);
2545         CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
2546         CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
2547         /* No need to set partition Id for unique indexes */
2548         CHK(getBlobHandles(g_opx) == 0);
2549       }
2550       else
2551       {
2552         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2553         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2554         memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
2555         memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2556         memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2557         /* No need to set partition Id for unique indexes */
2558         CHK((g_const_opr= g_con->writeTuple(g_idx_record, tup.m_key_row,
2559                                             g_full_record, tup.m_row)) != 0);
2560         CHK(getBlobHandles(g_const_opr) == 0);
2561       }
2562       bool timeout= false;
2563       if (style == 0) {
2564         CHK(setBlobValue(tup) == 0);
2565       } else if (style == 1) {
2566         // non-nullable must be set
2567         CHK(g_bh1->setValue("", 0) == 0);
2568         CHK(setBlobWriteHook(tup) == 0);
2569       } else {
2570         // non-nullable must be set
2571         CHK(g_bh1->setValue("", 0) == 0);
2572         if (g_con->execute(NoCommit) ||
2573             writeBlobData(tup))
2574           CHK((timeout= conHasTimeoutError()) == true);
2575       }
2576       if (!timeout)
2577       {
2578         if (g_con->execute(Commit))
2579           CHK((timeout= conHasTimeoutError()) == true);
2580       }
2581       if (timeout)
2582       {
2583         DISP("Timeout in Index Write ("
2584              << conError() <<")  Retries left : "
2585              << opTimeoutRetries-1);
2586         CHK(--opTimeoutRetries);
2587         opState= Retrying;
2588         sleep(1);
2589       }
2590       g_ndb->closeTransaction(g_con);
2591     } while (opState == Retrying);
2592     g_const_opr = 0;
2593     g_opx = 0;
2594     g_con = 0;
2595     tup.m_exists = true;
2596   }
2597   return 0;
2598 }
2599 
2600 static int
deleteIdx(int api)2601 deleteIdx(int api)
2602 {
2603   DBG("--- deleteIdx " << apiName[api] << " ---");
2604   unsigned n = 0;
2605   unsigned k = 0;
2606   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2607   enum OpState opState;
2608 
2609   do
2610   {
2611     opState= Normal;
2612     CHK((g_con = g_ndb->startTransaction()) != 0);
2613     for (; k < g_opt.m_rows; k++) {
2614       Tup& tup = g_tups[k];
2615       DBG("deleteIdx pk1=" << hex << tup.m_pk1);
2616       if (api == API_RECATTR)
2617       {
2618         CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
2619         CHK(g_opx->deleteTuple() == 0);
2620         CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
2621         CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
2622         /* No need to set partition Id for unique indexes */
2623       }
2624       else
2625       {
2626         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
2627         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
2628         /* No need to set partition Id for unique indexes */
2629         CHK((g_const_opr= g_con->deleteTuple(g_idx_record, tup.m_key_row,
2630                                              g_full_record)) != 0);
2631       }
2632       if (++n == g_opt.m_batch) {
2633         if (g_con->execute(Commit))
2634         {
2635           CHK(conHasTimeoutError());
2636           DISP("Timeout deleteing via index ("
2637                << conError() <<")  Retries left :"
2638                << opTimeoutRetries-1);
2639           CHK(--opTimeoutRetries);
2640           opState= Retrying;
2641           k= k- (n-1);
2642           n= 0;
2643           sleep(1);
2644           break;
2645         }
2646 
2647         g_ndb->closeTransaction(g_con);
2648         CHK((g_con = g_ndb->startTransaction()) != 0);
2649         n = 0;
2650       }
2651 
2652       g_const_opr = 0;
2653       g_opx = 0;
2654       tup.m_exists = false;
2655     }
2656     if ((opState == Normal) &&
2657         (n != 0)) {
2658       if(g_con->execute(Commit))
2659       {
2660         CHK(conHasTimeoutError());
2661         DISP("Timeout on last idx delete batch ("
2662              << conError() <<")  Retries left :"
2663              << opTimeoutRetries-1);
2664         CHK(--opTimeoutRetries);
2665         opState= Retrying;
2666         k= k-(n-1);
2667         sleep(1);
2668       }
2669       n = 0;
2670     }
2671     g_ndb->closeTransaction(g_con);
2672   } while (opState == Retrying);
2673   g_con= 0;
2674   g_opx= 0;
2675   g_const_opr= 0;
2676   return 0;
2677 }
2678 
2679 // scan ops table and index
2680 
2681 static int
readScan(int style,int api,bool idx)2682 readScan(int style, int api, bool idx)
2683 {
2684   DBG("--- " << "readScan" << (idx ? "Idx" : "") << " " << stylename[style] << " " << apiName[api] << " ---");
2685   Tup tup;
2686   tup.alloc();  // allocate buffers
2687 
2688   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2689   enum OpState opState;
2690 
2691   do
2692   {
2693     opState= Normal;
2694     CHK((g_con = g_ndb->startTransaction()) != 0);
2695     NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead;
2696     switch(urandom(3))
2697     {
2698     case 0:
2699       lm = NdbOperation::LM_Read;
2700       break;
2701     case 1:
2702       lm = NdbOperation::LM_SimpleRead;
2703       break;
2704     default:
2705       break;
2706     }
2707     if (api == API_RECATTR)
2708     {
2709       if (! idx) {
2710         CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
2711       } else {
2712         CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
2713       }
2714       CHK(g_ops->readTuples(lm,
2715                             g_scanFlags,
2716                             g_batchSize,
2717                             g_parallel) == 0);
2718       CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
2719       if (g_opt.m_pk2chr.m_len != 0)
2720       {
2721         CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
2722         CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
2723       }
2724       /* Don't bother setting UserDefined partitions for scan tests */
2725       CHK(getBlobHandles(g_ops) == 0);
2726     }
2727     else
2728     {
2729       /* Don't bother setting UserDefined partitions for scan tests */
2730       if (! idx)
2731         CHK((g_ops= g_con->scanTable(g_full_record,
2732                                      lm)) != 0);
2733       else
2734         CHK((g_ops= g_con->scanIndex(g_ord_record, g_full_record,
2735                                      lm)) != 0);
2736       CHK(getBlobHandles(g_ops) == 0);
2737     }
2738 
2739     if (style == 0) {
2740       CHK(getBlobValue(tup) == 0);
2741     } else if (style == 1) {
2742       CHK(setBlobReadHook(tup) == 0);
2743     }
2744     if (g_con->execute(NoCommit))
2745     {
2746       CHK(conHasTimeoutError());
2747       DISP("Timeout scan read ("
2748            << conError()
2749            << ").  Retries left : "
2750            <<  opTimeoutRetries - 1);
2751       CHK(--opTimeoutRetries);
2752       opState= Retrying;
2753       g_ndb->closeTransaction(g_con);
2754       continue;
2755     }
2756 
2757     // verify lock mode upgrade
2758     CHK(g_ops->getLockMode() == NdbOperation::LM_Read);
2759     unsigned rows = 0;
2760     while (1) {
2761       int ret;
2762 
2763       if (api == API_RECATTR)
2764       {
2765         tup.m_pk1 = (Uint32)-1;
2766         memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
2767         tup.m_pk3 = -1;
2768         ret = g_ops->nextResult(true);
2769       }
2770       else
2771       {
2772         const char *out_row= NULL;
2773 
2774         if (0 == (ret = g_ops->nextResult(&out_row, true, false)))
2775         {
2776           memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
2777           if (g_opt.m_pk2chr.m_len != 0)
2778           {
2779             memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
2780             memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
2781           }
2782         }
2783       }
2784 
2785       if (ret == -1)
2786       {
2787         /* Timeout? */
2788         if (conHasTimeoutError())
2789         {
2790           /* Break out and restart scan unless we've
2791            * run out of attempts
2792            */
2793           DISP("Scan read failed due to deadlock timeout ("
2794                << conError() <<") retries left :"
2795                << opTimeoutRetries -1);
2796           CHK(--opTimeoutRetries);
2797 
2798           opState= Retrying;
2799           sleep(1);
2800           break;
2801         }
2802       }
2803       CHK(opState == Normal);
2804       CHK((ret == 0) || (ret == 1));
2805       if (ret == 1)
2806         break;
2807 
2808       DBG("readScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
2809       Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
2810       CHK(k < g_opt.m_rows && g_tups[k].m_exists);
2811       tup.copyfrom(g_tups[k]);
2812       if (style == 0) {
2813         CHK(verifyBlobValue(tup) == 0);
2814       } else if (style == 1) {
2815         // execute ops generated by callbacks, if any
2816         CHK(verifyBlobValue(tup) == 0);
2817       } else {
2818         if (readBlobData(tup))
2819         {
2820           CHK(conHasTimeoutError());
2821           DISP("Timeout in readScan("
2822                << conError()
2823                << ") Retries left : "
2824                << opTimeoutRetries - 1);
2825           CHK(--opTimeoutRetries);
2826           opState= Retrying;
2827           sleep(1);
2828           continue;
2829         }
2830       }
2831       rows++;
2832     }
2833     g_ndb->closeTransaction(g_con);
2834 
2835     if (opState == Normal)
2836       CHK(g_opt.m_rows == rows);
2837 
2838   } while (opState == Retrying);
2839 
2840   g_con = 0;
2841   g_ops = 0;
2842   return 0;
2843 }
2844 
2845 static int
updateScan(int style,int api,bool idx)2846 updateScan(int style, int api, bool idx)
2847 {
2848   DBG("--- " << "updateScan" << (idx ? "Idx" : "") << " " << stylename[style] << " " << apiName[api] << " ---");
2849   Tup tup;
2850   tup.alloc();  // allocate buffers
2851 
2852   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
2853   enum OpState opState;
2854 
2855   do
2856   {
2857     opState= Normal;
2858     CHK((g_con = g_ndb->startTransaction()) != 0);
2859     if (api == API_RECATTR)
2860     {
2861       if (! idx) {
2862         CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
2863       } else {
2864         CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
2865       }
2866       CHK(g_ops->readTuples(NdbOperation::LM_Exclusive,
2867                             g_scanFlags,
2868                             g_batchSize,
2869                             g_parallel) == 0);
2870       CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
2871       if (g_opt.m_pk2chr.m_len != 0)
2872       {
2873         CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
2874         CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
2875       }
2876       /* Don't bother setting UserDefined partitions for scan tests */
2877     }
2878     else
2879     {
2880       /* Don't bother setting UserDefined partitions for scan tests */
2881       if (! idx)
2882         CHK((g_ops= g_con->scanTable(g_key_record,
2883                                      NdbOperation::LM_Exclusive)) != 0);
2884       else
2885         CHK((g_ops= g_con->scanIndex(g_ord_record, g_key_record,
2886                                      NdbOperation::LM_Exclusive)) != 0);
2887     }
2888     CHK(g_con->execute(NoCommit) == 0);
2889     unsigned rows = 0;
2890     while (1) {
2891       const char *out_row= NULL;
2892       int ret;
2893 
2894       if (api == API_RECATTR)
2895       {
2896         tup.m_pk1 = (Uint32)-1;
2897         memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_totlen);
2898         tup.m_pk3 = -1;
2899 
2900         ret = g_ops->nextResult(true);
2901       }
2902       else
2903       {
2904         if(0 == (ret = g_ops->nextResult(&out_row, true, false)))
2905         {
2906           memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
2907           if (g_opt.m_pk2chr.m_len != 0) {
2908             memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
2909             memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
2910           }
2911         }
2912       }
2913 
2914       if (ret == -1)
2915       {
2916         /* Timeout? */
2917         if (conHasTimeoutError())
2918         {
2919           /* Break out and restart scan unless we've
2920            * run out of attempts
2921            */
2922           DISP("Scan update failed due to deadlock timeout ("
2923                << conError() <<"), retries left :"
2924                << opTimeoutRetries -1);
2925           CHK(--opTimeoutRetries);
2926 
2927           opState= Retrying;
2928           sleep(1);
2929           break;
2930         }
2931       }
2932       CHK(opState == Normal);
2933       CHK((ret == 0) || (ret == 1));
2934       if (ret == 1)
2935         break;
2936 
2937       DBG("updateScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
2938       Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
2939       CHK(k < g_opt.m_rows && g_tups[k].m_exists);
2940       // calculate new blob values
2941       calcBval(g_tups[k], false);
2942       tup.copyfrom(g_tups[k]);
2943       // cannot do 4275 testing, scan op error code controls execution
2944       if (api == API_RECATTR)
2945       {
2946         CHK((g_opr = g_ops->updateCurrentTuple()) != 0);
2947         CHK(getBlobHandles(g_opr) == 0);
2948       }
2949       else
2950       {
2951         CHK((g_const_opr = g_ops->updateCurrentTuple(g_con, g_blob_record, tup.m_row)) != 0);
2952         CHK(getBlobHandles(g_const_opr) == 0);
2953       }
2954       bool timeout= false;
2955       if (style == 0) {
2956         CHK(setBlobValue(tup) == 0);
2957       } else if (style == 1) {
2958         CHK(setBlobWriteHook(tup) == 0);
2959       } else {
2960         CHK(g_con->execute(NoCommit) == 0);
2961         if (writeBlobData(tup))
2962           CHK((timeout= conHasTimeoutError()) == true);
2963       }
2964       if (!timeout &&
2965           (g_con->execute(NoCommit)))
2966         CHK((timeout= conHasTimeoutError()) == true);
2967 
2968       if (timeout)
2969       {
2970         DISP("Scan update timeout("
2971              << conError()
2972              << ") Retries left : "
2973              << opTimeoutRetries-1);
2974         CHK(opTimeoutRetries--);
2975         opState= Retrying;
2976         sleep(1);
2977         break;
2978       }
2979 
2980       g_const_opr = 0;
2981       g_opr = 0;
2982       rows++;
2983     }
2984     if (opState == Normal)
2985     {
2986       CHK(g_con->execute(Commit) == 0);
2987       CHK(g_opt.m_rows == rows);
2988     }
2989     g_ndb->closeTransaction(g_con);
2990   } while (opState == Retrying);
2991   g_con = 0;
2992   g_ops = 0;
2993   return 0;
2994 }
2995 
2996 static int
lockUnlockScan(int style,int api,bool idx)2997 lockUnlockScan(int style, int api, bool idx)
2998 {
2999   DBG("--- " << "lockUnlockScan" << (idx ? "Idx" : "") << " " << stylename[style] << " " << apiName[api] << " ---");
3000   Tup tup;
3001   tup.alloc();  // allocate buffers
3002 
3003   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
3004   enum OpState opState;
3005 
3006   do
3007   {
3008     opState= Normal;
3009     CHK((g_con = g_ndb->startTransaction()) != 0);
3010     NdbOperation::LockMode lm = NdbOperation::LM_Read;
3011     if (urandom(2) == 0)
3012       lm = NdbOperation::LM_Exclusive;
3013 
3014     Uint32 scanFlags = g_scanFlags | NdbScanOperation::SF_KeyInfo;
3015 
3016     if (api == API_RECATTR)
3017     {
3018       if (! idx) {
3019         CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
3020       } else {
3021         CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
3022       }
3023       CHK(g_ops->readTuples(lm,
3024                             scanFlags,
3025                             g_batchSize,
3026                             g_parallel) == 0);
3027       CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
3028       if (g_opt.m_pk2chr.m_len != 0)
3029       {
3030         CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
3031         CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
3032       }
3033       /* Don't bother setting UserDefined partitions for scan tests */
3034     }
3035     else
3036     {
3037       NdbScanOperation::ScanOptions opts;
3038       opts.optionsPresent = NdbScanOperation::ScanOptions::SO_SCANFLAGS;
3039       opts.scan_flags = scanFlags;
3040 
3041       /* Don't bother setting UserDefined partitions for scan tests */
3042       if (! idx)
3043         CHK((g_ops= g_con->scanTable(g_key_record,
3044                                      lm, 0, &opts, sizeof(opts))) != 0);
3045       else
3046         CHK((g_ops= g_con->scanIndex(g_ord_record, g_key_record,
3047                                      lm, 0, 0, &opts, sizeof(opts))) != 0);
3048     }
3049     CHK(g_con->execute(NoCommit) == 0);
3050     unsigned rows = 0;
3051     while (1) {
3052       const char *out_row= NULL;
3053       int ret;
3054 
3055       if (api == API_RECATTR)
3056       {
3057         tup.m_pk1 = (Uint32)-1;
3058         memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_totlen);
3059         tup.m_pk3 = -1;
3060 
3061         ret = g_ops->nextResult(true);
3062       }
3063       else
3064       {
3065         if(0 == (ret = g_ops->nextResult(&out_row, true, false)))
3066         {
3067           memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
3068           if (g_opt.m_pk2chr.m_len != 0) {
3069             memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
3070             memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
3071           }
3072         }
3073       }
3074 
3075       if (ret == -1)
3076       {
3077         /* Timeout? */
3078         if (conHasTimeoutError())
3079         {
3080           /* Break out and restart scan unless we've
3081            * run out of attempts
3082            */
3083           DISP("Scan failed due to deadlock timeout ("
3084                << conError() <<"), retries left :"
3085                << opTimeoutRetries -1);
3086           CHK(--opTimeoutRetries);
3087 
3088           opState= Retrying;
3089           sleep(1);
3090           break;
3091         }
3092       }
3093       CHK(opState == Normal);
3094       CHK((ret == 0) || (ret == 1));
3095       if (ret == 1)
3096         break;
3097 
3098       DBG("lockUnlockScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
3099       /* Get tuple info for current row */
3100       Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
3101       CHK(k < g_opt.m_rows && g_tups[k].m_exists);
3102       tup.copyfrom(g_tups[k]);
3103 
3104       if (api == API_RECATTR)
3105       {
3106         CHK((g_opr = g_ops->lockCurrentTuple()) != 0);
3107         CHK(g_opr->getLockHandle() != NULL);
3108         CHK(getBlobHandles(g_opr) == 0);
3109       }
3110       else
3111       {
3112         NdbOperation::OperationOptions opts;
3113         opts.optionsPresent = NdbOperation::OperationOptions::OO_LOCKHANDLE;
3114         CHK((g_const_opr = g_ops->lockCurrentTuple(g_con, g_blob_record, tup.m_row,
3115                                                    0, &opts, sizeof(opts))) != 0);
3116         CHK(getBlobHandles(g_const_opr) == 0);
3117       }
3118       bool timeout= false;
3119       if (style == 0) {
3120         CHK(getBlobValue(tup) == 0);
3121       } else if (style == 1) {
3122         CHK(setBlobReadHook(tup) == 0);
3123       } else {
3124         CHK(g_con->execute(NoCommit) == 0);
3125         if (readBlobData(tup))
3126           CHK((timeout= conHasTimeoutError()) == true);
3127       }
3128       if (!timeout)
3129       {
3130         if (g_con->execute(NoCommit) == 0)
3131         {
3132           /* Read executed successfully,
3133            * - Verify the Blob data
3134            * - Verify the row is locked
3135            * - Close the Blob handles
3136            * - Attempt to unlock
3137            */
3138           NdbOperation::LockMode lmused = g_ops->getLockMode();
3139           CHK((lmused == NdbOperation::LM_Read) ||
3140               (lmused == NdbOperation::LM_Exclusive));
3141 
3142           if (style == 0 || style == 1) {
3143             CHK(verifyBlobValue(tup) == 0);
3144           }
3145 
3146           /* Occasionally check that we are locked */
3147           if (urandom(200) == 0)
3148             CHK(verifyRowLocked(tup) == 0);
3149 
3150           /* Close Blob handles */
3151           CHK(g_bh1->close() == 0);
3152           if (! g_opt.m_oneblob)
3153             CHK(g_bh2->close() == 0);
3154 
3155           if (lm != NdbOperation::LM_CommittedRead)
3156           {
3157             /* All Blob handles closed, now we can issue an
3158              * unlock operation and the main row should be
3159              * unlocked
3160              */
3161             const NdbOperation* readOp = (g_opr?g_opr:g_const_opr);
3162             const NdbLockHandle* lh = readOp->getLockHandle();
3163             CHK(lh != NULL);
3164             const NdbOperation* unlockOp = g_con->unlock(lh);
3165             CHK(unlockOp != NULL);
3166           }
3167 
3168           /* All Blob handles closed - manual or automatic
3169            * unlock op has been enqueued.  Now execute
3170            */
3171           CHK(g_con->execute(NoCommit) == 0);
3172         }
3173         else
3174         {
3175           CHK((timeout= conHasTimeoutError()) == true);
3176         }
3177       }
3178 
3179       if (timeout)
3180       {
3181         DISP("Scan read lock unlock timeout("
3182              << conError()
3183              << ") Retries left : "
3184              << opTimeoutRetries-1);
3185         CHK(opTimeoutRetries--);
3186         opState= Retrying;
3187         sleep(1);
3188         break;
3189       }
3190 
3191       g_const_opr = 0;
3192       g_opr = 0;
3193       rows++;
3194     }
3195     if (opState == Normal)
3196     {
3197       /* We've scanned all rows, locked them and then unlocked them
3198        * All rows should now be unlocked despite the transaction
3199        * not being committed.
3200        */
3201       for (unsigned k = 0; k < g_opt.m_rows; k++) {
3202         CHK(verifyRowNotLocked(g_tups[k]) == 0);
3203       }
3204 
3205       CHK(g_con->execute(Commit) == 0);
3206       CHK(g_opt.m_rows == rows);
3207     }
3208     g_ndb->closeTransaction(g_con);
3209   } while (opState == Retrying);
3210   g_con = 0;
3211   g_ops = 0;
3212   return 0;
3213 }
3214 
3215 static int
deleteScan(int api,bool idx)3216 deleteScan(int api, bool idx)
3217 {
3218   DBG("--- " << "deleteScan" << (idx ? "Idx" : "") << apiName[api] << " ---");
3219   Tup tup;
3220   Uint32 opTimeoutRetries= g_opt.m_timeout_retries;
3221   enum OpState opState;
3222   unsigned rows = 0;
3223 
3224   do
3225   {
3226     opState= Normal;
3227 
3228     CHK((g_con = g_ndb->startTransaction()) != 0);
3229 
3230     if (api == API_RECATTR)
3231     {
3232       if (! idx) {
3233         CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
3234       } else {
3235         CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
3236       }
3237       CHK(g_ops->readTuples(NdbOperation::LM_Exclusive,
3238                             g_scanFlags,
3239                             g_batchSize,
3240                             g_parallel) == 0);
3241       CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
3242       if (g_opt.m_pk2chr.m_len != 0)
3243       {
3244         CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
3245         CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
3246       }
3247       /* Don't bother setting UserDefined partitions for scan tests */
3248     }
3249     else
3250     {
3251       /* Don't bother setting UserDefined partitions for scan tests */
3252       if (! idx)
3253         CHK((g_ops= g_con->scanTable(g_key_record,
3254                                      NdbOperation::LM_Exclusive)) != 0);
3255       else
3256         CHK((g_ops= g_con->scanIndex(g_ord_record, g_key_record,
3257                                      NdbOperation::LM_Exclusive)) != 0);
3258     }
3259     CHK(g_con->execute(NoCommit) == 0);
3260     unsigned n = 0;
3261     while (1) {
3262       int ret;
3263 
3264       if (api == API_RECATTR)
3265       {
3266         tup.m_pk1 = (Uint32)-1;
3267         memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
3268         tup.m_pk3 = -1;
3269         ret = g_ops->nextResult(true);
3270       }
3271       else
3272       {
3273         const char *out_row= NULL;
3274 
3275         if (0 == (ret = g_ops->nextResult(&out_row, true, false)))
3276         {
3277           memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
3278           if (g_opt.m_pk2chr.m_len != 0)
3279           {
3280             memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
3281             memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
3282           }
3283         }
3284       }
3285 
3286       if (ret == -1)
3287       {
3288         /* Timeout? */
3289         if (conHasTimeoutError())
3290         {
3291           /* Break out and restart scan unless we've
3292            * run out of attempts
3293            */
3294           DISP("Scan delete failed due to deadlock timeout ("
3295                << conError() <<") retries left :"
3296                << opTimeoutRetries -1);
3297           CHK(--opTimeoutRetries);
3298 
3299           opState= Retrying;
3300           sleep(1);
3301           break;
3302         }
3303       }
3304       CHK(opState == Normal);
3305       CHK((ret == 0) || (ret == 1));
3306       if (ret == 1)
3307         break;
3308 
3309       while (1) {
3310         DBG("deleteScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
3311         Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
3312         CHK(k < g_opt.m_rows && g_tups[k].m_exists);
3313         g_tups[k].m_exists = false;
3314         if (api == API_RECATTR)
3315           CHK(g_ops->deleteCurrentTuple() == 0);
3316         else
3317           CHK(g_ops->deleteCurrentTuple(g_con, g_key_record) != NULL);
3318         tup.m_pk1 = (Uint32)-1;
3319         memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
3320         tup.m_pk3 = -1;
3321         if (api == API_RECATTR)
3322           ret = g_ops->nextResult(false);
3323         else
3324         {
3325           const char *out_row= NULL;
3326           ret = g_ops->nextResult(&out_row, false, false);
3327           if (ret == 0)
3328           {
3329             memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
3330             if (g_opt.m_pk2chr.m_len != 0)
3331             {
3332               memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
3333               memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
3334             }
3335           }
3336         }
3337 
3338         if (ret == -1)
3339         {
3340           /* Timeout? */
3341           if (conHasTimeoutError())
3342           {
3343             /* Break out and restart scan unless we've
3344              * run out of attempts
3345              */
3346             DISP("Scan delete failed due to deadlock timeout ("
3347                  << conError() <<") retries left :"
3348                  << opTimeoutRetries -1);
3349             CHK(--opTimeoutRetries);
3350 
3351             opState= Retrying;
3352             sleep(1);
3353             break;
3354           }
3355         }
3356         CHK(opState == Normal);
3357         CHK((ret == 0) || (ret == 1) || (ret == 2));
3358 
3359         if (++n == g_opt.m_batch || ret == 2) {
3360           DBG("execute batch: n=" << n << " ret=" << ret);
3361           if (! g_opt.m_fac) {
3362             CHK(g_con->execute(NoCommit) == 0);
3363           } else {
3364             CHK(g_con->execute(Commit) == 0);
3365             CHK(g_con->restart() == 0);
3366           }
3367           rows+= n;
3368           n = 0;
3369         }
3370         if (ret == 2)
3371           break;
3372       }
3373       if (opState == Retrying)
3374         break;
3375     }
3376     if (opState == Normal)
3377     {
3378       rows+= n;
3379       CHK(g_con->execute(Commit) == 0);
3380       CHK(g_opt.m_rows == rows);
3381     }
3382     g_ndb->closeTransaction(g_con);
3383 
3384   } while (opState == Retrying);
3385   g_con = 0;
3386   g_ops = 0;
3387   return 0;
3388 }
3389 
3390 
3391 enum OpTypes {
3392   PkRead,
3393   PkInsert,
3394   PkUpdate,
3395   PkWrite,
3396   PkDelete,
3397   UkRead,
3398   UkUpdate,
3399   UkWrite,
3400   UkDelete};
3401 
3402 static const char*
operationName(OpTypes optype)3403 operationName(OpTypes optype)
3404 {
3405   switch(optype){
3406   case PkRead:
3407     return "Pk Read";
3408   case PkInsert:
3409     return "Pk Insert";
3410   case PkUpdate:
3411     return "Pk Update";
3412   case PkWrite:
3413     return "Pk Write";
3414   case PkDelete:
3415     return "Pk Delete";
3416   case UkRead:
3417     return "Uk Read";
3418   case UkUpdate:
3419     return "Uk Update";
3420   case UkWrite:
3421     return "Uk Write";
3422   case UkDelete:
3423     return "Uk Delete";
3424   default:
3425     return "Bad operation type";
3426   }
3427 }
3428 
3429 static const char*
aoName(int abortOption)3430 aoName(int abortOption)
3431 {
3432   if (abortOption == 0)
3433     return "AbortOnError";
3434   return "IgnoreError";
3435 }
3436 
3437 static int
setupOperation(NdbOperation * & op,OpTypes optype,Tup & tup)3438 setupOperation(NdbOperation*& op, OpTypes optype, Tup& tup)
3439 {
3440   bool pkop;
3441   switch(optype){
3442   case PkRead: case PkInsert : case PkUpdate:
3443   case PkWrite : case PkDelete :
3444     pkop=true;
3445     break;
3446   default:
3447     pkop= false;
3448   }
3449 
3450   if (pkop)
3451     CHK((op= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3452   else
3453     CHK((op = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
3454 
3455   switch(optype){
3456   case PkRead:
3457   case UkRead:
3458     CHK(op->readTuple() == 0);
3459     break;
3460   case PkInsert:
3461     CHK(op->insertTuple() == 0);
3462     break;
3463   case PkUpdate:
3464   case UkUpdate:
3465     CHK(op->updateTuple() == 0);
3466     break;
3467   case PkWrite:
3468   case UkWrite:
3469     CHK(op->writeTuple() == 0);
3470     break;
3471   case PkDelete:
3472   case UkDelete:
3473     CHK(op->deleteTuple() == 0);
3474     break;
3475   default:
3476     CHK(false);
3477     return -1;
3478   }
3479 
3480   if (pkop)
3481   {
3482     setUDpartId(tup, op);
3483     CHK(op->equal("PK1", tup.m_pk1) == 0);
3484     if (g_opt.m_pk2chr.m_len != 0)
3485     {
3486       CHK(op->equal("PK2", tup.m_pk2) == 0);
3487       CHK(op->equal("PK3", tup.m_pk3) == 0);
3488     }
3489   }
3490   else
3491   {
3492     CHK(op->equal("PK2", tup.m_pk2) == 0);
3493     CHK(op->equal("PK3", tup.m_pk3) == 0);
3494   }
3495 
3496   CHK(getBlobHandles(op) == 0);
3497 
3498   switch(optype){
3499   case PkRead:
3500   case UkRead:
3501     CHK(getBlobValue(tup) == 0);
3502     break;
3503   case PkInsert:
3504   case PkUpdate:
3505   case UkUpdate:
3506     /* Fall through */
3507   case PkWrite:
3508   case UkWrite:
3509     CHK(setBlobValue(tup) == 0);
3510     break;
3511   case PkDelete:
3512   case UkDelete:
3513     /* Nothing */
3514     break;
3515   default:
3516     CHK(false);
3517     return -1;
3518   }
3519 
3520   return 0;
3521 }
3522 
3523 static int
bugtest_36756()3524 bugtest_36756()
3525 {
3526   /* Transaction which had accessed a Blob table was ignoring
3527    * abortOption passed in the execute() call.
3528    * Check that option passed in execute() call overrides
3529    * default / manually set operation abortOption, even in the
3530    * presence of Blobs in the transaction
3531    */
3532 
3533   /* Operation         AbortOnError             IgnoreError
3534    * PkRead            NoDataFound*             NoDataFound
3535    * PkInsert          Duplicate key            Duplicate key*
3536    * PkUpdate          NoDataFound              NoDataFound*
3537    * PkWrite           NoDataFound              NoDataFound*
3538    * PkDelete          NoDataFound              NoDataFound*
3539    * UkRead            NoDataFound*             NoDataFound
3540    * UkUpdate          NoDataFound              NoDataFound*
3541    * UkWrite           NoDataFound              NoDataFound*
3542    * UkDelete          NoDataFound              NoDataFound*
3543    *
3544    * * Are interesting, where non-default behaviour is requested.
3545    */
3546 
3547   struct ExpectedOutcome
3548   {
3549     int executeRc;
3550     int transactionErrorCode;
3551     int opr1ErrorCode;
3552     int opr2ErrorCode;
3553     int commitStatus;
3554   };
3555 
3556   /* Generally, AbortOnError sets the transaction error
3557    * but not the Operation error codes
3558    * IgnoreError sets the transaction error and the
3559    * failing operation error code(s)
3560    * Odd cases :
3561    *   Pk Write : Can't fail due to key presence, just
3562    *              incorrect NULLs etc.
3563    *   Uk Write : Key must exist, so not really different
3564    *              to Update?
3565    */
3566   ExpectedOutcome outcomes[9][2]=
3567   {
3568     // PkRead
3569     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3570      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3571     // PkInsert
3572     // Note operation order reversed for insert
3573     {{-1, 630, 0, 0, NdbTransaction::Aborted},   // AE
3574      {0, 630, 0, 630, NdbTransaction::Started}}, // IE
3575     // PkUpdate
3576     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3577      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3578     // PkWrite
3579     {{0, 0, 0, 0, NdbTransaction::Started},      // AE
3580      {0, 0, 0, 0, NdbTransaction::Started}},     // IE
3581     // PkDelete
3582     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3583      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3584     // UkRead
3585     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3586      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3587     // UkUpdate
3588     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3589      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3590     // UkWrite
3591     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3592      {0, 626, 0, 626, NdbTransaction::Started}}, // IE
3593     // UkDelete
3594     {{-1, 626, 0, 0, NdbTransaction::Aborted},   // AE
3595      {0, 626, 0, 626, NdbTransaction::Started}}  // IE
3596   };
3597 
3598   DBG("bugtest_36756 : IgnoreError Delete of nonexisting tuple aborts");
3599   DBG("                Also 36851 : Insert IgnoreError of existing tuple aborts");
3600 
3601   for (int iterations=0; iterations < 50; iterations++)
3602   {
3603     /* Recalculate and insert different tuple every time to
3604      * get different keys(and therefore nodes), and
3605      * different length Blobs, including zero length
3606      * and NULL
3607      */
3608     calcTups(true);
3609 
3610     Tup& tupExists = g_tups[0];
3611     Tup& tupDoesNotExist = g_tups[1];
3612 
3613     /* Setup table with just 1 row present */
3614     CHK((g_con= g_ndb->startTransaction()) != 0);
3615     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3616     CHK(g_opr->insertTuple() == 0);
3617     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3618     if (g_opt.m_pk2chr.m_len != 0)
3619     {
3620       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3621       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3622     }
3623     setUDpartId(tupExists, g_opr);
3624     CHK(getBlobHandles(g_opr) == 0);
3625 
3626     CHK(setBlobValue(tupExists) == 0);
3627 
3628     CHK(g_con->execute(Commit) == 0);
3629     g_con->close();
3630 
3631     DBG("Iteration : " << iterations);
3632     for (int optype=PkRead; optype <= UkDelete; optype++)
3633     {
3634       DBG("  " << operationName((OpTypes)optype));
3635 
3636       Tup* tup1= &tupExists;
3637       Tup* tup2= &tupDoesNotExist;
3638 
3639       if (optype == PkInsert)
3640       {
3641         /* Inserts - we want the failing operation to be second
3642          * rather than first to avoid hitting bugs with IgnoreError
3643          * and the first DML in a transaction
3644          * So we swap them
3645          */
3646         tup1= &tupDoesNotExist; // (Insert succeeds)
3647         tup2= &tupExists; //(Insert fails)
3648       }
3649 
3650       for (int abortOption=0; abortOption < 2; abortOption++)
3651       {
3652         DBG("    " << aoName(abortOption));
3653         NdbOperation *opr1, *opr2;
3654         NdbOperation::AbortOption ao= (abortOption==0)?
3655           NdbOperation::AbortOnError :
3656           NdbOperation::AO_IgnoreError;
3657 
3658         CHK((g_con= g_ndb->startTransaction()) != 0);
3659 
3660         /* Operation 1 */
3661         CHK(setupOperation(opr1, (OpTypes)optype, *tup1) == 0);
3662 
3663         /* Operation2 */
3664         CHK(setupOperation(opr2, (OpTypes)optype, *tup2) == 0);
3665 
3666         ExpectedOutcome eo= outcomes[optype][abortOption];
3667 
3668         int rc = g_con->execute(NdbTransaction::NoCommit, ao);
3669 
3670         DBG("execute returned " << rc <<
3671             " Trans err " << g_con->getNdbError().code <<
3672             " Opr1 err " << opr1->getNdbError().code <<
3673             " Opr2 err " << opr2->getNdbError().code <<
3674             " CommitStatus " << g_con->commitStatus());
3675 
3676         CHK(rc == eo.executeRc);
3677         CHK(g_con->getNdbError().code == eo.transactionErrorCode);
3678         CHK(opr1->getNdbError().code == eo.opr1ErrorCode);
3679         CHK(opr2->getNdbError().code == eo.opr2ErrorCode);
3680         CHK(g_con->commitStatus() == eo.commitStatus);
3681 
3682         g_con->close();
3683       }
3684     }
3685 
3686     /* Now delete the 'existing'row */
3687     CHK((g_con= g_ndb->startTransaction()) != 0);
3688     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3689     CHK(g_opr->deleteTuple() == 0);
3690     setUDpartId(tupExists, g_opr);
3691     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3692     if (g_opt.m_pk2chr.m_len != 0)
3693     {
3694       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3695       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3696     }
3697 
3698     CHK(g_con->execute(Commit) == 0);
3699     g_con->close();
3700   }
3701 
3702   g_opr= 0;
3703   g_con= 0;
3704   g_bh1= 0;
3705 
3706   return 0;
3707 }
3708 
3709 
3710 static int
bugtest_45768()3711 bugtest_45768()
3712 {
3713   /* Transaction inserting using blobs has an early error
3714      resulting in kernel-originated rollback.
3715      Api then calls execute(Commit) which chokes on Blob
3716      objects
3717 
3718    */
3719   DBG("bugtest_45768 : Batched blob transaction with abort followed by commit");
3720 
3721   const int numIterations = 5;
3722 
3723   for (int iteration=0; iteration < numIterations; iteration++)
3724   {
3725     /* Recalculate and insert different tuple every time to
3726      * get different keys(and therefore nodes), and
3727      * different length Blobs, including zero length
3728      * and NULL
3729      */
3730     calcTups(true);
3731 
3732     const Uint32 totalRows = 100;
3733     const Uint32 preExistingTupNum =  totalRows / 2;
3734 
3735     Tup& tupExists = g_tups[ preExistingTupNum ];
3736 
3737     /* Setup table with just 1 row present */
3738     CHK((g_con= g_ndb->startTransaction()) != 0);
3739     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3740     CHK(g_opr->insertTuple() == 0);
3741     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3742     if (g_opt.m_pk2chr.m_len != 0)
3743     {
3744       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3745       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3746     }
3747     setUDpartId(tupExists, g_opr);
3748     CHK(getBlobHandles(g_opr) == 0);
3749 
3750     CHK(setBlobValue(tupExists) == 0);
3751 
3752     CHK(g_con->execute(Commit) == 0);
3753     g_con->close();
3754 
3755     DBG("Iteration : " << iteration);
3756 
3757     /* Now do batched insert, including a TUP which already
3758      * exists
3759      */
3760     int rc = 0;
3761     int retries = 10;
3762 
3763     do
3764     {
3765       CHK((g_con = g_ndb->startTransaction()) != 0);
3766 
3767       for (Uint32 tupNum = 0; tupNum < totalRows ; tupNum++)
3768       {
3769         Tup& tup = g_tups[ tupNum ];
3770         CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
3771         CHK(g_opr->insertTuple() == 0);
3772         CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
3773         if (g_opt.m_pk2chr.m_len != 0)
3774         {
3775           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
3776           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
3777         }
3778         setUDpartId(tup, g_opr);
3779 
3780         CHK(getBlobHandles(g_opr) == 0);
3781         CHK(setBlobValue(tup) == 0);
3782       }
3783 
3784       /* Now execute NoCommit */
3785       int rc = g_con->execute(NdbTransaction::NoCommit);
3786 
3787       CHK(rc == -1);
3788 
3789       if (g_con->getNdbError().code == 630)
3790         break; /* Expected */
3791 
3792       CHK(g_con->getNdbError().code == 1218); // Send buffers overloaded
3793 
3794       DBG("Send Buffers overloaded, retrying");
3795       sleep(1);
3796       g_con->close();
3797     } while (retries--);
3798 
3799     CHK(g_con->getNdbError().code == 630);
3800 
3801     /* Now execute Commit */
3802     rc = g_con->execute(NdbTransaction::Commit);
3803 
3804     CHK(rc == -1);
3805     /* Transaction aborted already */
3806     CHK(g_con->getNdbError().code == 4350);
3807 
3808     g_con->close();
3809 
3810     /* Now delete the 'existing'row */
3811     CHK((g_con= g_ndb->startTransaction()) != 0);
3812     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3813     CHK(g_opr->deleteTuple() == 0);
3814     setUDpartId(tupExists, g_opr);
3815     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3816     if (g_opt.m_pk2chr.m_len != 0)
3817     {
3818       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3819       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3820     }
3821 
3822     CHK(g_con->execute(Commit) == 0);
3823     g_con->close();
3824   }
3825 
3826   g_opr= 0;
3827   g_con= 0;
3828   g_bh1= 0;
3829 
3830   return 0;
3831 }
3832 
bugtest_48040()3833 static int bugtest_48040()
3834 {
3835   /* When batch of operations triggers unique index
3836    * maint triggers (which fire back to TC) and
3837    * TC is still receiving ops in batch from the API
3838    * TC uses ContinueB to self to defer trigger
3839    * processing until all operations have been
3840    * received.
3841    * If the transaction starts aborting (due to some
3842    * problem in the original operations) while the
3843    * ContinueB is 'in-flight', the ContinueB never
3844    * terminates and causes excessive CPU consumption
3845    *
3846    * This testcase sets an ERROR INSERT to detect
3847    * the excessive ContinueB use in 1 transaction,
3848    * and runs bugtest_bug45768 to generate the
3849    * scenario
3850    */
3851   NdbRestarter restarter;
3852 
3853   DBG("bugtest 48040 - Infinite ContinueB loop in TC abort + unique");
3854 
3855   restarter.waitConnected();
3856 
3857   int rc = restarter.insertErrorInAllNodes(8082);
3858 
3859   DBG(" Initial error insert rc" << rc << endl);
3860 
3861   rc = bugtest_45768();
3862 
3863   /* Give time for infinite loop to build */
3864   sleep(10);
3865   restarter.insertErrorInAllNodes(0);
3866 
3867   return rc;
3868 }
3869 
3870 
bugtest_62321()3871 static int bugtest_62321()
3872 {
3873   /* Having a Blob operation in a batch with other operations
3874    * causes the other operation's ignored error not to be
3875    * set as the transaction error code after execution.
3876    * This is used (e.g in MySQLD) to check for conflicts
3877    */
3878   DBG("bugtest_62321 : Error code from other ops in batch obscured");
3879 
3880   /*
3881      1) Setup table : 1 row exists, another doesnt
3882      2) Start transaction
3883      3) Define failing before op
3884      4) Define Blob op with/without post-exec part
3885      5) Define failing after op
3886      6) Execute
3887      7) Check results
3888   */
3889   calcTups(true);
3890 
3891   /* Setup table */
3892   Tup& tupExists = g_tups[0];
3893   Tup& notExists = g_tups[1];
3894   {
3895     CHK((g_con= g_ndb->startTransaction()) != 0);
3896     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3897     CHK(g_opr->insertTuple() == 0);
3898     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3899     if (g_opt.m_pk2chr.m_len != 0)
3900     {
3901       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3902       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3903     }
3904     setUDpartId(tupExists, g_opr);
3905     CHK(getBlobHandles(g_opr) == 0);
3906 
3907     CHK(setBlobValue(tupExists) == 0);
3908 
3909     CHK(g_con->execute(Commit) == 0);
3910     g_con->close();
3911   }
3912 
3913   for (int scenario = 0; scenario < 4; scenario++)
3914   {
3915     DBG(" Scenario : " << scenario);
3916     CHK((g_con= g_ndb->startTransaction()) != 0);
3917     NdbOperation* failOp = NULL;
3918     if ((scenario & 0x1) == 0)
3919     {
3920       DBG("  Fail op before");
3921       /* Define failing op in batch before Blob op */
3922       failOp= g_con->getNdbOperation(g_opt.m_tname);
3923       CHK(failOp != 0);
3924       CHK(failOp->readTuple() == 0);
3925       CHK(failOp->equal("PK1", notExists.m_pk1) == 0);
3926       if (g_opt.m_pk2chr.m_len != 0)
3927       {
3928         CHK(failOp->equal("PK2", notExists.m_pk2) == 0);
3929         CHK(failOp->equal("PK3", notExists.m_pk3) == 0);
3930       }
3931       setUDpartId(notExists, failOp);
3932       CHK(failOp->getValue("PK1") != 0);
3933       CHK(failOp->setAbortOption(NdbOperation::AO_IgnoreError) == 0);
3934     }
3935 
3936     /* Now define successful Blob op */
3937     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
3938     CHK(g_opr->readTuple() == 0);
3939     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
3940     if (g_opt.m_pk2chr.m_len != 0)
3941     {
3942       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
3943       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
3944     }
3945     setUDpartId(tupExists, g_opr);
3946     CHK(getBlobHandles(g_opr) == 0);
3947 
3948     CHK(getBlobValue(tupExists) == 0);
3949 
3950 
3951     /* Define failing batch op after Blob op if not defined before */
3952     if (failOp == 0)
3953     {
3954       DBG("  Fail op after");
3955       failOp= g_con->getNdbOperation(g_opt.m_tname);
3956       CHK(failOp != 0);
3957       CHK(failOp->readTuple() == 0);
3958       CHK(failOp->equal("PK1", notExists.m_pk1) == 0);
3959       if (g_opt.m_pk2chr.m_len != 0)
3960       {
3961         CHK(failOp->equal("PK2", notExists.m_pk2) == 0);
3962         CHK(failOp->equal("PK3", notExists.m_pk3) == 0);
3963       }
3964       setUDpartId(notExists, failOp);
3965       CHK(failOp->getValue("PK1") != 0);
3966       CHK(failOp->setAbortOption(NdbOperation::AO_IgnoreError) == 0);
3967     }
3968 
3969     /* Now execute and check rc etc */
3970     NdbTransaction::ExecType et = (scenario & 0x2) ?
3971       NdbTransaction::NoCommit:
3972       NdbTransaction::Commit;
3973 
3974     DBG("  Executing with execType = " << ((et == NdbTransaction::NoCommit)?
3975                                            "NoCommit":"Commit"));
3976     int rc = g_con->execute(NdbTransaction::NoCommit);
3977 
3978     CHK(rc == 0);
3979     CHK(g_con->getNdbError().code == 626);
3980     CHK(failOp->getNdbError().code == 626);
3981     CHK(g_opr->getNdbError().code == 0);
3982     DBG("  Error code on transaction as expected");
3983 
3984     g_con->close();
3985   }
3986 
3987   return 0;
3988 }
3989 
3990 // main
3991 
3992 // from here on print always
3993 #undef DBG
3994 #define DBG(x) \
3995   do { \
3996     ndbout << "line " << __LINE__ << " " << x << endl; \
3997   } while (0)
3998 
3999 static int
testmain()4000 testmain()
4001 {
4002   g_ndb = new Ndb(g_ncc, "TEST_DB");
4003   CHK(g_ndb->init(20) == 0);
4004   CHK(g_ndb->waitUntilReady() == 0);
4005   g_dic = g_ndb->getDictionary();
4006   initblobs();
4007   initConstants();
4008   g_tups = new Tup [g_opt.m_rows];
4009 
4010   // Create tablespace if we're going to use disk based data
4011   if (testcase('h'))
4012     createDefaultTableSpace();
4013 
4014   if (g_opt.m_seed == -1)
4015     g_opt.m_seed = getpid();
4016   if (g_opt.m_seed != 0) {
4017     DBG("random seed = " << g_opt.m_seed);
4018     ndb_srand(g_opt.m_seed);
4019   }
4020   for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) {
4021     for (int storage= 0; storage < 2; storage++) {
4022       if (!testcase(storageSymbol[storage]))
4023         continue;
4024 
4025       DBG("Create table " << storageName[storage]);
4026       CHK(dropTable() == 0);
4027       CHK(createTable(storage) == 0);
4028       { /* Dump created table information */
4029         Bcol& b1 = g_blob1;
4030         DBG("FragType: " << g_dic->getTable(g_opt.m_tname)->getFragmentType());
4031         CHK(NdbBlob::getBlobTableName(b1.m_btname, g_ndb, g_opt.m_tname, "BL1") == 0);
4032         DBG("BL1: inline=" << b1.m_inline << " part=" << b1.m_partsize << " table=" << b1.m_btname);
4033         if (! g_opt.m_oneblob) {
4034           Bcol& b2 = g_blob2;
4035           CHK(NdbBlob::getBlobTableName(b2.m_btname, g_ndb, g_opt.m_tname, "BL2") == 0);
4036           DBG("BL2: inline=" << b2.m_inline << " part=" << b2.m_partsize << " table=" << b2.m_btname);
4037         }
4038       }
4039 
4040       /* Capability to adjust disk scan parameters to avoid scan
4041        * timeouts with disk based Blobs (Error 274)
4042        */
4043       if (storage == STORAGE_DISK)
4044       {
4045         g_usingDisk= true;
4046         // TODO : Resolve whether we need to adjust these for disk data
4047         // Currently the scans are passing ok without this.
4048         g_batchSize= 0;
4049         g_parallel= 0;
4050         g_scanFlags= 0; //NdbScanOperation::SF_DiskScan;
4051       }
4052       else
4053       {
4054         g_usingDisk= false;
4055         g_batchSize= 0;
4056         g_parallel= 0;
4057         g_scanFlags= 0;
4058       }
4059 
4060       // TODO Remove/resolve
4061       DBG("Settings : usingdisk " << g_usingDisk
4062           << " batchSize " << g_batchSize
4063           << " parallel " << g_parallel
4064           << " scanFlags " << g_scanFlags);
4065 
4066       int style;
4067       int api;
4068       DBG("=== loop " << g_loop << " ===");
4069       if (g_opt.m_seed == 0)
4070         ndb_srand(g_loop);
4071       if (g_opt.m_bugtest != 0) {
4072         // test some bug# instead
4073         CHK((*g_opt.m_bugtest)() == 0);
4074         continue;
4075       }
4076       /* Loop over API styles */
4077       for (api = 0; api <=1; api++) {
4078         // pk
4079         if (! testcase(apiSymbol[api]))
4080           continue;
4081         for (style = 0; style <= 2; style++) {
4082           if (! testcase('k') || ! testcase(style) )
4083             continue;
4084           DBG("--- pk ops " << stylename[style] << " " << apiName[api] << " ---");
4085           if (testcase('n')) {
4086             calcTups(true);
4087             CHK(insertPk(style, api) == 0);
4088             CHK(verifyBlob() == 0);
4089             CHK(readPk(style, api) == 0);
4090             if (testcase('u')) {
4091               calcTups(false);
4092               CHK(updatePk(style, api) == 0);
4093               CHK(verifyBlob() == 0);
4094               CHK(readPk(style, api) == 0);
4095             }
4096             if (testcase('l')) {
4097               CHK(readLockPk(style,api) == 0);
4098             }
4099             if (testcase('d')) {
4100               CHK(deletePk(api) == 0);
4101               CHK(deleteNoPk() == 0);
4102               CHK(verifyBlob() == 0);
4103             }
4104           }
4105           if (testcase('w')) {
4106             calcTups(true);
4107             CHK(writePk(style, api) == 0);
4108             CHK(verifyBlob() == 0);
4109             CHK(readPk(style, api) == 0);
4110             if (testcase('u')) {
4111               calcTups(false);
4112               CHK(writePk(style, api) == 0);
4113               CHK(verifyBlob() == 0);
4114               CHK(readPk(style, api) == 0);
4115             }
4116             if (testcase('l')) {
4117               CHK(readLockPk(style,api) == 0);
4118             }
4119             if (testcase('d')) {
4120               CHK(deletePk(api) == 0);
4121               CHK(deleteNoPk() == 0);
4122               CHK(verifyBlob() == 0);
4123             }
4124           }
4125         }
4126 
4127         // hash index
4128         for (style = 0; style <= 2; style++) {
4129           if (! testcase('i') || ! testcase(style))
4130             continue;
4131           DBG("--- idx ops " << stylename[style] << " " << apiName[api] << " ---");
4132           if (testcase('n')) {
4133             calcTups(true);
4134             CHK(insertPk(style, api) == 0);
4135             CHK(verifyBlob() == 0);
4136             CHK(readIdx(style, api) == 0);
4137             if (testcase('u')) {
4138               calcTups(false);
4139               CHK(updateIdx(style, api) == 0);
4140               CHK(verifyBlob() == 0);
4141               CHK(readIdx(style, api) == 0);
4142             }
4143             if (testcase('d')) {
4144               CHK(deleteIdx(api) == 0);
4145               CHK(verifyBlob() == 0);
4146             }
4147           }
4148           if (testcase('w')) {
4149             calcTups(false);
4150             CHK(writePk(style, api) == 0);
4151             CHK(verifyBlob() == 0);
4152             CHK(readIdx(style, api) == 0);
4153             if (testcase('u')) {
4154               calcTups(false);
4155               CHK(writeIdx(style, api) == 0);
4156               CHK(verifyBlob() == 0);
4157               CHK(readIdx(style, api) == 0);
4158             }
4159             if (testcase('d')) {
4160               CHK(deleteIdx(api) == 0);
4161               CHK(verifyBlob() == 0);
4162             }
4163           }
4164         }
4165         // scan table
4166         for (style = 0; style <= 2; style++) {
4167           if (! testcase('s') || ! testcase(style))
4168             continue;
4169           DBG("--- table scan " << stylename[style] << " " << apiName[api] << " ---");
4170           calcTups(true);
4171           CHK(insertPk(style, api) == 0);
4172           CHK(verifyBlob() == 0);
4173           CHK(readScan(style, api, false) == 0);
4174           if (testcase('u')) {
4175             CHK(updateScan(style, api, false) == 0);
4176             CHK(verifyBlob() == 0);
4177           }
4178           if (testcase('l')) {
4179             CHK(lockUnlockScan(style, api, false) == 0);
4180           }
4181           if (testcase('d')) {
4182             CHK(deleteScan(api, false) == 0);
4183             CHK(verifyBlob() == 0);
4184           }
4185         }
4186         // scan index
4187         for (style = 0; style <= 2; style++) {
4188           if (! testcase('r') || ! testcase(style))
4189             continue;
4190           DBG("--- index scan " << stylename[style] << " " << apiName[api] << " ---");
4191           calcTups(true);
4192           CHK(insertPk(style, api) == 0);
4193           CHK(verifyBlob() == 0);
4194           CHK(readScan(style, api, true) == 0);
4195           if (testcase('u')) {
4196             CHK(updateScan(style, api, true) == 0);
4197             CHK(verifyBlob() == 0);
4198           }
4199           if (testcase('l')) {
4200             CHK(lockUnlockScan(style, api, true) == 0);
4201           }
4202           if (testcase('d')) {
4203             CHK(deleteScan(api, true) == 0);
4204             CHK(verifyBlob() == 0);
4205           }
4206         }
4207       } // for (api
4208     } // for (storage
4209   } // for (loop
4210   delete g_ndb;
4211   return 0;
4212 }
4213 
4214 // separate performance test
4215 
4216 struct Tmr {    // stolen from testOIBasic
TmrTmr4217   Tmr() {
4218     clr();
4219   }
clrTmr4220   void clr() {
4221     m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
4222   }
onTmr4223   void on() {
4224     assert(m_on == 0);
4225     m_on = NdbTick_CurrentMillisecond();
4226   }
offTmr4227   void off(unsigned cnt = 0) {
4228     NDB_TICKS off = NdbTick_CurrentMillisecond();
4229     assert(m_on != 0 && off >= m_on);
4230     m_ms += off - m_on;
4231     m_cnt += cnt;
4232     m_on = 0;
4233   }
timeTmr4234   const char* time() {
4235     if (m_cnt == 0)
4236       sprintf(m_time, "%u ms", (Uint32)m_ms);
4237     else
4238       sprintf(m_time, "%u ms per %u ( %llu ms per 1000 )", (Uint32)m_ms, m_cnt, (1000 * m_ms) / m_cnt);
4239     return m_time;
4240   }
pctTmr4241   const char* pct (const Tmr& t1) {
4242     if (0 < t1.m_ms)
4243       sprintf(m_text, "%llu pct", (100 * m_ms) / t1.m_ms);
4244     else
4245       sprintf(m_text, "[cannot measure]");
4246     return m_text;
4247   }
overTmr4248   const char* over(const Tmr& t1) {
4249     if (0 < t1.m_ms) {
4250       if (t1.m_ms <= m_ms)
4251         sprintf(m_text, "%llu pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
4252       else
4253         sprintf(m_text, "-%llu pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
4254     } else
4255       sprintf(m_text, "[cannot measure]");
4256     return m_text;
4257   }
4258   NDB_TICKS m_on;
4259   NDB_TICKS m_ms;
4260   unsigned m_cnt;
4261   char m_time[100];
4262   char m_text[100];
4263 };
4264 
4265 static int
testperf()4266 testperf()
4267 {
4268   if (! testcase('p'))
4269     return 0;
4270   DBG("=== perf test ===");
4271   g_bh1 = g_bh2 = 0;
4272   g_ndb = new Ndb(g_ncc, "TEST_DB");
4273   CHK(g_ndb->init() == 0);
4274   CHK(g_ndb->waitUntilReady() == 0);
4275   g_dic = g_ndb->getDictionary();
4276   NdbDictionary::Table tab(g_opt.m_tnameperf);
4277   if (g_dic->getTable(tab.getName()) != 0)
4278     CHK(g_dic->dropTable(tab.getName()) == 0);
4279   // col A - pk
4280   { NdbDictionary::Column col("A");
4281     col.setType(NdbDictionary::Column::Unsigned);
4282     col.setPrimaryKey(true);
4283     tab.addColumn(col);
4284   }
4285   // col B - char 20
4286   { NdbDictionary::Column col("B");
4287     col.setType(NdbDictionary::Column::Char);
4288     col.setLength(20);
4289     col.setNullable(true);
4290     tab.addColumn(col);
4291   }
4292   // col C - text
4293   { NdbDictionary::Column col("C");
4294     col.setType(NdbDictionary::Column::Text);
4295     col.setBlobVersion(g_opt.m_blob_version);
4296     col.setInlineSize(20);
4297     col.setPartSize(512);
4298     col.setStripeSize(1);
4299     col.setNullable(true);
4300     tab.addColumn(col);
4301   }
4302   // create
4303   CHK(g_dic->createTable(tab) == 0);
4304   Uint32 cA = 0, cB = 1, cC = 2;
4305   // timers
4306   Tmr t1;
4307   Tmr t2;
4308   // insert char (one trans)
4309   {
4310     DBG("--- insert char ---");
4311     char b[20];
4312     t1.on();
4313     CHK((g_con = g_ndb->startTransaction()) != 0);
4314     for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
4315       CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
4316       CHK(g_opr->insertTuple() == 0);
4317       CHK(g_opr->equal(cA, (char*)&k) == 0);
4318       memset(b, 0x20, sizeof(b));
4319       b[0] = 'b';
4320       CHK(g_opr->setValue(cB, b) == 0);
4321       CHK(g_con->execute(NoCommit) == 0);
4322     }
4323     t1.off(g_opt.m_rowsperf);
4324     CHK(g_con->execute(Rollback) == 0);
4325     DBG(t1.time());
4326     g_opr = 0;
4327     g_con = 0;
4328   }
4329   // insert text (one trans)
4330   {
4331     DBG("--- insert text ---");
4332     t2.on();
4333     CHK((g_con = g_ndb->startTransaction()) != 0);
4334     for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
4335       CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
4336       CHK(g_opr->insertTuple() == 0);
4337       CHK(g_opr->equal(cA, (char*)&k) == 0);
4338       CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
4339       CHK((g_bh1->setValue("c", 1) == 0));
4340       CHK(g_con->execute(NoCommit) == 0);
4341     }
4342     t2.off(g_opt.m_rowsperf);
4343     CHK(g_con->execute(Rollback) == 0);
4344     DBG(t2.time());
4345     g_bh1 = 0;
4346     g_opr = 0;
4347     g_con = 0;
4348   }
4349   // insert overhead
4350   DBG("insert overhead: " << t2.over(t1));
4351   t1.clr();
4352   t2.clr();
4353   // insert
4354   {
4355     DBG("--- insert for read test ---");
4356     unsigned n = 0;
4357     char b[20];
4358     CHK((g_con = g_ndb->startTransaction()) != 0);
4359     for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
4360       CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
4361       CHK(g_opr->insertTuple() == 0);
4362       CHK(g_opr->equal(cA, (char*)&k) == 0);
4363       memset(b, 0x20, sizeof(b));
4364       b[0] = 'b';
4365       CHK(g_opr->setValue(cB, b) == 0);
4366       CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
4367       CHK((g_bh1->setValue("c", 1) == 0));
4368       if (++n == g_opt.m_batch) {
4369         CHK(g_con->execute(Commit) == 0);
4370         g_ndb->closeTransaction(g_con);
4371         CHK((g_con = g_ndb->startTransaction()) != 0);
4372         n = 0;
4373       }
4374     }
4375     if (n != 0) {
4376       CHK(g_con->execute(Commit) == 0);
4377       g_ndb->closeTransaction(g_con); g_con = 0;
4378       n = 0;
4379     }
4380     g_bh1 = 0;
4381     g_opr = 0;
4382   }
4383   // pk read char (one trans)
4384   {
4385     DBG("--- pk read char ---");
4386     CHK((g_con = g_ndb->startTransaction()) != 0);
4387     Uint32 a;
4388     char b[20];
4389     t1.on();
4390     for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
4391       CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
4392       CHK(g_opr->readTuple() == 0);
4393       CHK(g_opr->equal(cA, (char*)&k) == 0);
4394       CHK(g_opr->getValue(cA, (char*)&a) != 0);
4395       CHK(g_opr->getValue(cB, b) != 0);
4396       a = (Uint32)-1;
4397       b[0] = 0;
4398       CHK(g_con->execute(NoCommit) == 0);
4399       CHK(a == k && b[0] == 'b');
4400     }
4401     CHK(g_con->execute(Commit) == 0);
4402     t1.off(g_opt.m_rowsperf);
4403     DBG(t1.time());
4404     g_opr = 0;
4405     g_ndb->closeTransaction(g_con); g_con = 0;
4406   }
4407   // pk read text (one trans)
4408   {
4409     DBG("--- pk read text ---");
4410     CHK((g_con = g_ndb->startTransaction()) != 0);
4411     Uint32 a;
4412     char c[20];
4413     t2.on();
4414     for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
4415       CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
4416       CHK(g_opr->readTuple() == 0);
4417       CHK(g_opr->equal(cA, (char*)&k) == 0);
4418       CHK(g_opr->getValue(cA, (char*)&a) != 0);
4419       CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
4420       a = (Uint32)-1;
4421       c[0] = 0;
4422       CHK(g_con->execute(NoCommit) == 0);
4423       Uint32 m = 20;
4424       CHK(g_bh1->readData(c, m) == 0);
4425       CHK(a == k && m == 1 && c[0] == 'c');
4426     }
4427     CHK(g_con->execute(Commit) == 0);
4428     t2.off(g_opt.m_rowsperf);
4429     DBG(t2.time());
4430     g_ndb->closeTransaction(g_con); g_opr = 0;
4431     g_con = 0;
4432   }
4433   // pk read overhead
4434   DBG("pk read overhead: " << t2.over(t1));
4435   t1.clr();
4436   t2.clr();
4437   // scan read char
4438   const uint scan_loops = 10;
4439   {
4440     DBG("--- scan read char ---");
4441     Uint32 a;
4442     char b[20];
4443     uint i;
4444     for (i = 0; i < scan_loops; i++) {
4445       CHK((g_con = g_ndb->startTransaction()) != 0);
4446       CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
4447       CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
4448       CHK(g_ops->getValue(cA, (char*)&a) != 0);
4449       CHK(g_ops->getValue(cB, b) != 0);
4450       CHK(g_con->execute(NoCommit) == 0);
4451       unsigned n = 0;
4452       t1.on();
4453       while (1) {
4454         a = (Uint32)-1;
4455         b[0] = 0;
4456         int ret;
4457         CHK((ret = g_ops->nextResult(true)) == 0 || ret == 1);
4458         if (ret == 1)
4459           break;
4460         CHK(a < g_opt.m_rowsperf && b[0] == 'b');
4461         n++;
4462       }
4463       CHK(n == g_opt.m_rowsperf);
4464       t1.off(g_opt.m_rowsperf);
4465       g_ndb->closeTransaction(g_con); g_ops = 0;
4466       g_con = 0;
4467     }
4468     DBG(t1.time());
4469   }
4470   // scan read text
4471   {
4472     DBG("--- read text ---");
4473     Uint32 a;
4474     char c[20];
4475     uint i;
4476     for (i = 0; i < scan_loops; i++) {
4477       CHK((g_con = g_ndb->startTransaction()) != 0);
4478       CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
4479       CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
4480       CHK(g_ops->getValue(cA, (char*)&a) != 0);
4481       CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0);
4482       CHK(g_con->execute(NoCommit) == 0);
4483       unsigned n = 0;
4484       t2.on();
4485       while (1) {
4486         a = (Uint32)-1;
4487         c[0] = 0;
4488         int ret;
4489         CHK((ret = g_ops->nextResult(true)) == 0 || ret == 1);
4490         if (ret == 1)
4491           break;
4492         Uint32 m = 20;
4493         CHK(g_bh1->readData(c, m) == 0);
4494         CHK(a < g_opt.m_rowsperf && m == 1 && c[0] == 'c');
4495         n++;
4496       }
4497       CHK(n == g_opt.m_rowsperf);
4498       t2.off(g_opt.m_rowsperf);
4499       g_bh1 = 0;
4500       g_ops = 0;
4501       g_ndb->closeTransaction(g_con); g_con = 0;
4502     }
4503     DBG(t2.time());
4504   }
4505   // scan read overhead
4506   DBG("scan read overhead: " << t2.over(t1));
4507   t1.clr();
4508   t2.clr();
4509   delete g_ndb;
4510   return 0;
4511 }
4512 
4513 // bug tests
4514 
4515 static int
bugtest_4088()4516 bugtest_4088()
4517 {
4518   unsigned i;
4519   DBG("bug test 4088 - ndb api hang with mixed ops on index table");
4520   // insert rows
4521   calcTups(true);
4522   CHK(insertPk(0, API_NDBRECORD) == 0);
4523   // new trans
4524   CHK((g_con = g_ndb->startTransaction()) != 0);
4525   for (unsigned k = 0; k < g_opt.m_rows; k++) {
4526     Tup& tup = g_tups[k];
4527     // read table pk via index as a table
4528     const unsigned pkcnt = 2;
4529     Tup pktup[pkcnt];
4530     for (i = 0; i < pkcnt; i++) {
4531       char name[20];
4532       // XXX guess table id
4533       sprintf(name, "%d/%s", 4, g_opt.m_x1name);
4534       CHK((g_opr = g_con->getNdbOperation(name)) != 0);
4535       CHK(g_opr->readTuple() == 0);
4536       CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
4537       setUDpartId(tup, g_opr);
4538       CHK(g_opr->getValue("NDB$PK", (char*)&pktup[i].m_pk1) != 0);
4539     }
4540     // read blob inline via index as an index
4541     CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
4542     CHK(g_opx->readTuple() == 0);
4543     CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
4544     assert(tup.m_bval1.m_buf != 0);
4545     CHK(g_opx->getValue("BL1", (char*)tup.m_bval1.m_buf) != 0);
4546     // execute
4547     // BUG 4088: gets 1 tckeyconf, 1 tcindxconf, then hangs
4548     CHK(g_con->execute(Commit) == 0);
4549     // verify
4550     for (i = 0; i < pkcnt; i++) {
4551       CHK(pktup[i].m_pk1 == tup.m_pk1);
4552       CHK(memcmp(pktup[i].m_pk2, tup.m_pk2, g_opt.m_pk2chr.m_len) == 0);
4553     }
4554     CHK(memcmp(tup.m_bval1.m_val, tup.m_bval1.m_buf, 8 + g_blob1.m_inline) == 0);
4555   }
4556   return 0;
4557 }
4558 
4559 static int
bugtest_27018()4560 bugtest_27018()
4561 {
4562   DBG("bug test 27018 - middle partial part write clobbers rest of part");
4563 
4564   // insert rows
4565   calcTups(true);
4566   CHK(insertPk(0, API_NDBRECORD) == 0);
4567   // new trans
4568   for (unsigned k= 0; k < g_opt.m_rows; k++)
4569   {
4570     Tup& tup= g_tups[k];
4571 
4572     /* Update one byte in random position. */
4573     Uint32 offset= urandom(tup.m_bval1.m_len + 1);
4574     if (offset == tup.m_bval1.m_len) {
4575       // testing write at end is another problem..
4576       continue;
4577     }
4578     //DBG("len=" << tup.m_bval1.m_len << " offset=" << offset);
4579 
4580     CHK((g_con= g_ndb->startTransaction()) != 0);
4581     memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
4582     if (g_opt.m_pk2chr.m_len != 0) {
4583       memcpy(&tup.m_key_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
4584       memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
4585     }
4586     NdbOperation::OperationOptions opts;
4587     setUDpartIdNdbRecord(tup,
4588                          g_ndb->getDictionary()->getTable(g_opt.m_tname),
4589                          opts);
4590     CHK((g_const_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
4591                                          g_blob_record, tup.m_row,
4592                                          NULL, // mask
4593                                          &opts,
4594                                          sizeof(opts))) != 0);
4595     CHK(getBlobHandles(g_const_opr) == 0);
4596     CHK(g_con->execute(NoCommit) == 0);
4597 
4598     tup.m_bval1.m_buf[0]= 0xff ^ tup.m_bval1.m_val[offset];
4599     CHK(g_bh1->setPos(offset) == 0);
4600     CHK(g_bh1->writeData(&(tup.m_bval1.m_buf[0]), 1) == 0);
4601     CHK(g_con->execute(Commit) == 0);
4602     g_ndb->closeTransaction(g_con);
4603 
4604     CHK((g_con= g_ndb->startTransaction()) != 0);
4605     CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
4606                                        g_blob_record, tup.m_row,
4607                                        NdbOperation::LM_Read,
4608                                        NULL, // mask
4609                                        &opts,
4610                                        sizeof(opts))) != 0);
4611     CHK(getBlobHandles(g_const_opr) == 0);
4612 
4613     CHK(g_bh1->getValue(tup.m_bval1.m_buf, tup.m_bval1.m_len) == 0);
4614     CHK(g_con->execute(Commit) == 0);
4615 
4616     Uint64 len= ~0;
4617     CHK(g_bh1->getLength(len) == 0 && len == tup.m_bval1.m_len);
4618     tup.m_bval1.m_buf[offset]^= 0xff;
4619     //CHK(memcmp(tup.m_bval1.m_buf, tup.m_bval1.m_val, tup.m_bval1.m_len) == 0);
4620     Uint32 i = 0;
4621     while (i < tup.m_bval1.m_len) {
4622       CHK(tup.m_bval1.m_buf[i] == tup.m_bval1.m_val[i]);
4623       i++;
4624     }
4625 
4626     g_ndb->closeTransaction(g_con);
4627     g_con=0;
4628     g_const_opr=0;
4629   }
4630   CHK(deletePk(API_NDBRECORD) == 0);
4631 
4632   return 0;
4633 }
4634 
4635 
4636 struct bug27370_data {
4637   Ndb *m_ndb;
4638   char m_current_write_value;
4639   char *m_writebuf;
4640   Uint32 m_blob1_size;
4641   char *m_key_row;
4642   char *m_read_row;
4643   char *m_write_row;
4644   bool m_thread_stop;
4645   NdbOperation::OperationOptions* opts;
4646 };
4647 
bugtest_27370_thread(void * arg)4648 void *bugtest_27370_thread(void *arg)
4649 {
4650   bug27370_data *data= (bug27370_data *)arg;
4651 
4652   while (!data->m_thread_stop)
4653   {
4654     memset(data->m_writebuf, data->m_current_write_value, data->m_blob1_size);
4655     data->m_current_write_value++;
4656 
4657     NdbConnection *con;
4658     if ((con= data->m_ndb->startTransaction()) == 0)
4659       return (void *)"Failed to create transaction";
4660     const NdbOperation *opr;
4661     memcpy(data->m_write_row, data->m_key_row, g_rowsize);
4662     if ((opr= con->writeTuple(g_key_record, data->m_key_row,
4663                               g_full_record, data->m_write_row,
4664                               NULL, //mask
4665                               data->opts,
4666                               sizeof(NdbOperation::OperationOptions))) == 0)
4667       return (void *)"Failed to create operation";
4668     NdbBlob *bh;
4669     if ((bh= opr->getBlobHandle("BL1")) == 0)
4670       return (void *)"getBlobHandle() failed";
4671     if (bh->setValue(data->m_writebuf, data->m_blob1_size) != 0)
4672       return (void *)"setValue() failed";
4673     if (con->execute(Commit, AbortOnError, 1) != 0)
4674       return (void *)"execute() failed";
4675     data->m_ndb->closeTransaction(con);
4676   }
4677 
4678   return NULL;                                  // Success
4679 }
4680 
4681 static int
bugtest_27370()4682 bugtest_27370()
4683 {
4684   DBG("bug test 27370 - Potential inconsistent blob reads for ReadCommitted reads");
4685 
4686   bug27370_data data;
4687 
4688   CHK((data.m_key_row= new char[g_rowsize*3]) != 0);
4689   data.m_read_row= data.m_key_row + g_rowsize;
4690   data.m_write_row= data.m_read_row + g_rowsize;
4691 
4692   data.m_ndb= new Ndb(g_ncc, "TEST_DB");
4693   CHK(data.m_ndb->init(20) == 0);
4694   CHK(data.m_ndb->waitUntilReady() == 0);
4695 
4696   data.m_current_write_value= 0;
4697   data.m_blob1_size= g_blob1.m_inline + 10 * g_blob1.m_partsize;
4698   CHK((data.m_writebuf= new char [data.m_blob1_size]) != 0);
4699   Uint32 pk1_value= 27370;
4700 
4701   const NdbDictionary::Table* t= g_ndb->getDictionary()->getTable(g_opt.m_tname);
4702   bool isUserDefined= (t->getFragmentType() == NdbDictionary::Object::UserDefined);
4703   Uint32 partCount= t->getFragmentCount();
4704   Uint32 udPartId= pk1_value % partCount;
4705   NdbOperation::OperationOptions opts;
4706   opts.optionsPresent= 0;
4707   data.opts= &opts;
4708   if (isUserDefined)
4709   {
4710     opts.optionsPresent= NdbOperation::OperationOptions::OO_PARTITION_ID;
4711     opts.partitionId= udPartId;
4712   }
4713   memcpy(&data.m_key_row[g_pk1_offset], &pk1_value, sizeof(pk1_value));
4714   if (g_opt.m_pk2chr.m_len != 0)
4715   {
4716     memset(&data.m_key_row[g_pk2_offset], 'x', g_opt.m_pk2chr.m_totlen);
4717     if (!g_opt.m_pk2chr.m_fixed)
4718       data.m_key_row[g_pk2_offset]= urandom(g_opt.m_pk2chr.m_len + 1);
4719     Uint16 pk3_value= 27370;
4720     memcpy(&data.m_key_row[g_pk3_offset], &pk3_value, sizeof(pk3_value));
4721   }
4722   data.m_thread_stop= false;
4723 
4724   memset(data.m_writebuf, data.m_current_write_value, data.m_blob1_size);
4725   data.m_current_write_value++;
4726 
4727   CHK((g_con= g_ndb->startTransaction()) != 0);
4728   memcpy(data.m_write_row, data.m_key_row, g_rowsize);
4729   CHK((g_const_opr= g_con->writeTuple(g_key_record, data.m_key_row,
4730                                       g_full_record, data.m_write_row,
4731                                       NULL, // mask
4732                                       &opts,
4733                                       sizeof(opts))) != 0);
4734   CHK((g_bh1= g_const_opr->getBlobHandle("BL1")) != 0);
4735   CHK(g_bh1->setValue(data.m_writebuf, data.m_blob1_size) == 0);
4736   CHK(g_con->execute(Commit) == 0);
4737   g_ndb->closeTransaction(g_con);
4738   g_con= NULL;
4739 
4740   pthread_t thread_handle;
4741   CHK(pthread_create(&thread_handle, NULL, bugtest_27370_thread, &data) == 0);
4742 
4743   DBG("bug test 27370 - PK blob reads");
4744   Uint32 seen_updates= 0;
4745   while (seen_updates < 50)
4746   {
4747     CHK((g_con= g_ndb->startTransaction()) != 0);
4748     CHK((g_const_opr= g_con->readTuple(g_key_record, data.m_key_row,
4749                                        g_blob_record, data.m_read_row,
4750                                        NdbOperation::LM_CommittedRead,
4751                                        NULL, // mask
4752                                        &opts,
4753                                        sizeof(opts))) != 0);
4754     CHK((g_bh1= g_const_opr->getBlobHandle("BL1")) != 0);
4755     CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
4756 
4757     const Uint32 loop_max= 10;
4758     char read_char;
4759     char original_read_char= 0;
4760     Uint32 readloop;
4761     for (readloop= 0;; readloop++)
4762     {
4763       if (readloop > 0)
4764       {
4765         if (readloop > 1)
4766         {
4767           /* Compare against first read. */
4768           CHK(read_char == original_read_char);
4769         }
4770         else
4771         {
4772           /*
4773             We count the number of times we see the other thread had the
4774             chance to update, so that we can be sure it had the opportunity
4775             to run a reasonable number of times before we stop.
4776           */
4777           if (original_read_char != read_char)
4778             seen_updates++;
4779           original_read_char= read_char;
4780         }
4781       }
4782       if (readloop > loop_max)
4783         break;
4784       Uint32 readSize= 1;
4785       CHK(g_bh1->setPos(urandom(data.m_blob1_size)) == 0);
4786       CHK(g_bh1->readData(&read_char, readSize) == 0);
4787       CHK(readSize == 1);
4788       ExecType commitType= readloop == loop_max ? Commit : NoCommit;
4789       CHK(g_con->execute(commitType, AbortOnError, 1) == 0);
4790     }
4791     g_ndb->closeTransaction(g_con);
4792     g_con= NULL;
4793   }
4794 
4795   DBG("bug test 27370 - table scan blob reads");
4796   seen_updates= 0;
4797   while (seen_updates < 50)
4798   {
4799     CHK((g_con= g_ndb->startTransaction()) != 0);
4800     CHK((g_ops= g_con->scanTable(g_full_record,
4801                                  NdbOperation::LM_CommittedRead)) != 0);
4802     CHK((g_bh1= g_ops->getBlobHandle("BL1")) != 0);
4803     CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
4804     const char *out_row= NULL;
4805     CHK(g_ops->nextResult(&out_row, true, false) == 0);
4806 
4807     const Uint32 loop_max= 10;
4808     char read_char;
4809     char original_read_char= 0;
4810     Uint32 readloop;
4811     for (readloop= 0;; readloop++)
4812     {
4813       if (readloop > 0)
4814       {
4815         if (readloop > 1)
4816         {
4817           /* Compare against first read. */
4818           CHK(read_char == original_read_char);
4819         }
4820         else
4821         {
4822           /*
4823             We count the number of times we see the other thread had the
4824             chance to update, so that we can be sure it had the opportunity
4825             to run a reasonable number of times before we stop.
4826           */
4827           if (original_read_char != read_char)
4828             seen_updates++;
4829           original_read_char= read_char;
4830         }
4831       }
4832       if (readloop > loop_max)
4833         break;
4834       Uint32 readSize= 1;
4835       CHK(g_bh1->setPos(urandom(data.m_blob1_size)) == 0);
4836       CHK(g_bh1->readData(&read_char, readSize) == 0);
4837       CHK(readSize == 1);
4838       CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
4839     }
4840 
4841     CHK(g_ops->nextResult(&out_row, true, false) == 1);
4842     g_ndb->closeTransaction(g_con);
4843     g_con= NULL;
4844   }
4845 
4846   data.m_thread_stop= true;
4847   void *thread_return;
4848   CHK(pthread_join(thread_handle, &thread_return) == 0);
4849   DBG("bug 27370 - thread return status: " <<
4850       (thread_return ? (char *)thread_return : "<null>"));
4851   CHK(thread_return == 0);
4852 
4853   delete [] data.m_key_row;
4854   g_con= NULL;
4855   g_const_opr= NULL;
4856   g_bh1= NULL;
4857   return 0;
4858 }
4859 
4860 static int
bugtest_28116()4861 bugtest_28116()
4862 {
4863   DBG("bug test 28116 - Crash in getBlobHandle() when called without full key");
4864 
4865   if (g_opt.m_pk2chr.m_len == 0)
4866   {
4867     DBG("  ... skipped, requires multi-column primary key.");
4868     return 0;
4869   }
4870 
4871   calcTups(true);
4872 
4873   for (unsigned k = 0; k < g_opt.m_rows; k++) {
4874     Tup& tup = g_tups[k];
4875     CHK((g_con = g_ndb->startTransaction()) != 0);
4876     CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
4877     int reqType = urandom(4);
4878     switch(reqType) {
4879     case 0:
4880     {
4881       DBG("Read");
4882       CHK(g_opr->readTuple() == 0);
4883       break;
4884     }
4885     case 1:
4886     {
4887       DBG("Insert");
4888       CHK(g_opr->insertTuple() == 0);
4889       break;
4890     }
4891     case 2:
4892     {
4893       DBG("Update");
4894       CHK(g_opr->updateTuple() == 0);
4895       break;
4896     }
4897     case 3:
4898     default:
4899     {
4900       DBG("Delete");
4901       CHK(g_opr->deleteTuple() == 0);
4902       break;
4903     }
4904     }
4905     switch (urandom(3)) {
4906     case 0:
4907     {
4908       DBG("  No keys");
4909       break;
4910     }
4911     case 1:
4912     {
4913       DBG("  Pk1 only");
4914       CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
4915       break;
4916     }
4917     case 2:
4918     default:
4919     {
4920       DBG("  Pk2/3 only");
4921       if (g_opt.m_pk2chr.m_len != 0)
4922       {
4923         CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
4924         CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
4925       }
4926       break;
4927     }
4928     }
4929     /* Deliberately no equal() on rest of primary key, to provoke error. */
4930     CHK(g_opr->getBlobHandle("BL1") == 0);
4931 
4932     /* 4264 - Invalid usage of Blob attribute */
4933     CHK(g_con->getNdbError().code == 4264);
4934     CHK(g_opr->getNdbError().code == 4264);
4935 
4936     g_ndb->closeTransaction(g_con);
4937     g_opr = 0;
4938     g_con = 0;
4939   }
4940   return 0;
4941 }
4942 
4943 static struct {
4944   int m_bug;
4945   int (*m_test)();
4946 } g_bugtest[] = {
4947   { 4088, bugtest_4088 },
4948   { 27018, bugtest_27018 },
4949   { 27370, bugtest_27370 },
4950   { 36756, bugtest_36756 },
4951   { 45768, bugtest_45768 },
4952   { 48040, bugtest_48040 },
4953   { 28116, bugtest_28116 },
4954   { 62321, bugtest_62321 }
4955 };
4956 
4957 NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
4958 {
4959   ndb_init();
4960   // log the invocation
4961   char cmdline[512];
4962   {
4963     const char* progname =
4964       strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
4965     strcpy(cmdline, progname);
4966     for (int i = 1; i < argc; i++) {
4967       strcat(cmdline, " ");
4968       strcat(cmdline, argv[i]);
4969     }
4970   }
4971   Chr& pk2chr = g_opt.m_pk2chr;
4972   while (++argv, --argc > 0) {
4973     const char* arg = argv[0];
4974     if (strcmp(arg, "-batch") == 0) {
4975       if (++argv, --argc > 0) {
4976 	g_opt.m_batch = atoi(argv[0]);
4977         continue;
4978       }
4979     }
4980     if (strcmp(arg, "-core") == 0) {
4981       g_opt.m_core = true;
4982       continue;
4983     }
4984     if (strcmp(arg, "-dbg") == 0) {
4985       g_opt.m_dbg = true;
4986       continue;
4987     }
4988     if (strcmp(arg, "-debug") == 0) {
4989       if (++argv, --argc > 0) {
4990         g_opt.m_dbg = true;
4991         g_opt.m_debug = strdup(argv[0]);
4992 	continue;
4993       }
4994     }
4995     if (strcmp(arg, "-fac") == 0) {
4996       g_opt.m_fac = true;
4997       continue;
4998     }
4999     if (strcmp(arg, "-full") == 0) {
5000       g_opt.m_full = true;
5001       continue;
5002     }
5003     if (strcmp(arg, "-loop") == 0) {
5004       if (++argv, --argc > 0) {
5005 	g_opt.m_loop = atoi(argv[0]);
5006 	continue;
5007       }
5008     }
5009     if (strcmp(arg, "-min") == 0) {
5010       g_opt.m_min = true;
5011       continue;
5012     }
5013     if (strcmp(arg, "-parts") == 0) {
5014       if (++argv, --argc > 0) {
5015 	g_opt.m_parts = atoi(argv[0]);
5016 	continue;
5017       }
5018     }
5019     if (strcmp(arg, "-rows") == 0) {
5020       if (++argv, --argc > 0) {
5021 	g_opt.m_rows = atoi(argv[0]);
5022 	continue;
5023       }
5024     }
5025     if (strcmp(arg, "-rowsperf") == 0) {
5026       if (++argv, --argc > 0) {
5027 	g_opt.m_rowsperf = atoi(argv[0]);
5028 	continue;
5029       }
5030     }
5031     if (strcmp(arg, "-seed") == 0) {
5032       if (++argv, --argc > 0) {
5033 	g_opt.m_seed = atoi(argv[0]);
5034 	continue;
5035       }
5036     }
5037     if (strcmp(arg, "-skip") == 0) {
5038       if (++argv, --argc > 0) {
5039         g_opt.m_skip = strdup(argv[0]);
5040 	continue;
5041       }
5042     }
5043     if (strcmp(arg, "-test") == 0) {
5044       if (++argv, --argc > 0) {
5045         g_opt.m_test = strdup(argv[0]);
5046 	continue;
5047       }
5048     }
5049     if (strcmp(arg, "-timeoutretries") == 0) {
5050       if (++argv, --argc > 0) {
5051         g_opt.m_timeout_retries = atoi(argv[0]);
5052         continue;
5053       }
5054     }
5055     if (strcmp(arg, "-version") == 0) {
5056       if (++argv, --argc > 0) {
5057 	g_opt.m_blob_version = atoi(argv[0]);
5058         if (g_opt.m_blob_version == 1 || g_opt.m_blob_version == 2)
5059           continue;
5060       }
5061     }
5062     // metadata
5063     if (strcmp(arg, "-pk2len") == 0) {
5064       if (++argv, --argc > 0) {
5065 	pk2chr.m_len = atoi(argv[0]);
5066         continue;
5067       }
5068     }
5069     if (strcmp(arg, "-pk2fixed") == 0) {
5070       pk2chr.m_fixed = true;
5071       continue;
5072     }
5073     if (strcmp(arg, "-pk2binary") == 0) {
5074       pk2chr.m_binary = true;
5075       continue;
5076     }
5077     if (strcmp(arg, "-pk2cs") == 0) {
5078       if (++argv, --argc > 0) {
5079         pk2chr.m_cs = strdup(argv[0]);
5080 	continue;
5081       }
5082     }
5083     if (strcmp(arg, "-pk2part") == 0) {
5084       g_opt.m_pk2part = true;
5085       continue;
5086     }
5087     if (strcmp(arg, "-oneblob") == 0) {
5088       g_opt.m_oneblob = true;
5089       continue;
5090     }
5091     if (strcmp(arg, "-rbatch") == 0) {
5092       if (++argv, --argc > 0) {
5093         g_opt.m_rbatch = atoi(argv[0]);
5094         continue;
5095       }
5096     }
5097     if (strcmp(arg, "-wbatch") == 0) {
5098       if (++argv, --argc > 0) {
5099         g_opt.m_wbatch = atoi(argv[0]);
5100         continue;
5101       }
5102     }
5103     // bugs
5104     if (strcmp(arg, "-bug") == 0) {
5105       if (++argv, --argc > 0) {
5106 	g_opt.m_bug = atoi(argv[0]);
5107         for (unsigned i = 0; i < sizeof(g_bugtest)/sizeof(g_bugtest[0]); i++) {
5108           if (g_opt.m_bug == g_bugtest[i].m_bug) {
5109             g_opt.m_bugtest = g_bugtest[i].m_test;
5110             break;
5111           }
5112         }
5113         if (g_opt.m_bugtest != 0)
5114           continue;
5115       }
5116     }
5117     if (strcmp(arg, "-?") == 0 || strcmp(arg, "-h") == 0) {
5118       printusage();
5119       goto success;
5120     }
5121     ndbout << "unknown option " << arg << endl;
5122     goto wrongargs;
5123   }
5124   if (g_opt.m_debug != 0) {
5125     if (strchr(g_opt.m_debug, ':') == 0) {
5126       const char* s = "d:t:F:L:o,";
5127       char* t = new char [strlen(s) + strlen(g_opt.m_debug) + 1];
5128       strcpy(t, s);
5129       strcat(t, g_opt.m_debug);
5130       g_opt.m_debug = t;
5131     }
5132     DBUG_PUSH(g_opt.m_debug);
5133     ndbout.m_out = new FileOutputStream(DBUG_FILE);
5134   }
5135   if (pk2chr.m_len == 0) {
5136     char b[100];
5137     b[0] = 0;
5138     if (g_opt.m_skip != 0)
5139       strcpy(b, g_opt.m_skip);
5140     strcat(b, "i");
5141     strcat(b, "r");
5142     g_opt.m_skip = strdup(b);
5143   }
5144   if (pk2chr.m_len != 0) {
5145     Chr& c = pk2chr;
5146     if (c.m_binary) {
5147       if (c.m_fixed)
5148         c.m_type = NdbDictionary::Column::Binary;
5149       else
5150         c.m_type = NdbDictionary::Column::Varbinary;
5151       c.m_mblen = 1;
5152       c.m_cs = 0;
5153     } else {
5154       assert(c.m_cs != 0);
5155       if (c.m_fixed)
5156         c.m_type = NdbDictionary::Column::Char;
5157       else
5158         c.m_type = NdbDictionary::Column::Varchar;
5159       c.m_csinfo = get_charset_by_name(c.m_cs, MYF(0));
5160       if (c.m_csinfo == 0)
5161         c.m_csinfo = get_charset_by_csname(c.m_cs, MY_CS_PRIMARY, MYF(0));
5162       if (c.m_csinfo == 0) {
5163         ndbout << "unknown charset " << c.m_cs << endl;
5164         goto wrongargs;
5165       }
5166       c.m_mblen = c.m_csinfo->mbmaxlen;
5167       if (c.m_mblen == 0)
5168         c.m_mblen = 1;
5169     }
5170     c.m_bytelen = c.m_len * c.m_mblen;
5171     if (c.m_bytelen > 255) {
5172       ndbout << "length of pk2 in bytes exceeds 255" << endl;
5173       goto wrongargs;
5174     }
5175     if (c.m_fixed)
5176       c.m_totlen = c.m_bytelen;
5177     else
5178       c.m_totlen = 1 + c.m_bytelen;
5179     c.m_caseins = false;
5180     if (c.m_cs != 0) {
5181       CHARSET_INFO* info = c.m_csinfo;
5182       const char* p = "ABCxyz";
5183       const char* q = "abcXYZ";
5184       int e;
5185       if ((*info->cset->well_formed_len)(info, p, p + 6, 999, &e) != 6) {
5186         ndbout << "charset does not contain ascii" << endl;
5187         goto wrongargs;
5188       }
5189       if ((*info->coll->strcasecmp)(info, p, q) == 0) {
5190         c.m_caseins = true;
5191       }
5192       ndbout << "charset: " << c.m_cs << " caseins: " << c.m_caseins << endl;
5193     }
5194   }
5195   ndbout << cmdline << endl;
5196   g_ncc = new Ndb_cluster_connection();
5197   if (g_ncc->connect(30) != 0 || testmain() == -1 || testperf() == -1) {
5198     ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl;
5199     return NDBT_ProgramExit(NDBT_FAILED);
5200   }
5201   delete g_ncc;
5202   g_ncc = 0;
5203 success:
5204   return NDBT_ProgramExit(NDBT_OK);
5205 wrongargs:
5206   return NDBT_ProgramExit(NDBT_WRONGARGS);
5207 }
5208 
5209 // vim: set sw=2 et:
5210