1 /*
2 Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <NDBT.hpp>
26 #include <NDBT_Test.hpp>
27
/*
 * Report a failed NDB API call and terminate the process.
 * Prints msg, the message and code of obj->getNdbError() and the source
 * location to stderr, then calls exit(-1).
 * obj: pointer to an object providing getNdbError()
 * msg: text describing the failed step
 * NOTE(review): the expansion already ends with ';', so a call written as
 * "ERR_EXIT(o, m);" yields an extra empty statement and cannot be used as
 * the unbraced body of an if that has an else branch.
 */
#define ERR_EXIT(obj, msg) \
do \
{ \
  fprintf(stderr, "%s: %s (%d) in %s:%d\n", \
  msg, obj->getNdbError().message, obj->getNdbError().code, __FILE__, __LINE__); \
  exit(-1); \
} \
while (0);

/*
 * Print an error code and message together with the source location
 * to stderr. Does not terminate the process.
 * NOTE(review): like ERR_EXIT, the expansion ends with its own ';'.
 */
#define PRINT_ERROR(code,msg) \
do \
{ \
  fprintf(stderr, "Error in %s, line: %d, code: %d, msg: %s.\n", __FILE__, __LINE__, code, msg); \
} \
while (0);

/*
 * Print the current error of a MYSQL handle (passed as an object, its
 * address is taken here) and exit(-1).
 */
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
/*
 * Print the code and message of an error object (anything with .code and
 * .message members, e.g. the result of getNdbError()) and exit(-1).
 */
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }
50
#define TEST_NAME "TestScanFilter"
/* Name of the table used by the random scan filter test */
#define TABLE_NAME "TABLE_SCAN"

/* Column names of TABLE_NAME: "id" followed by the six value columns */
const char *COL_NAME[] = {"id", "i", "j", "k", "l", "m", "n"};
/* Number of entries in COL_NAME */
const char COL_LEN = 7;
/*
 * Do not change TUPLE_NUM: the columns of TABLE_NAME are fixed.
 * There are six value columns, 'i', 'j', 'k', 'l', 'm', 'n', and each one
 * is equal to 0 or 1. Since each tuple should be unique in this case,
 * TUPLE_NUM = 2 to the power of 6 = 64.
 */
const int TUPLE_NUM = 1 << (COL_LEN - 1);

/*
 * The recursion level of the random scan filter. This parameter can be
 * modified, in the range from 1 to 100; a larger number consumes more
 * scan time.
 */
const int RECURSIVE_LEVEL = 10;

/* Size of the character buffers that hold a generated filter string */
const int MAX_STR_LEN = (RECURSIVE_LEVEL * (COL_LEN+1) * 4);

/*
 * Each iteration stands for one test: it produces a random filter
 * string, evaluates it once through the NDB API scan and once by direct
 * calculation on the tuples' data, and then compares the results.
 * If they are equal the test passed, otherwise it failed.
 * Only if all TEST_NUM tests pass can we believe that the suite of
 * test cases is okay.
 * Increasing TEST_NUM makes the test take more time.
 */
const int TEST_NUM = 5000;
82
83
/*
 * Table definition for TABLE_NAME: an Unsigned column "id" (created with
 * the extra arguments 1, true) and six Unsigned columns "i" .. "n".
 */
static
const
NDBT_Attribute MYTAB1Attribs[] = {
  NDBT_Attribute("id", NdbDictionary::Column::Unsigned, 1, true),
  NDBT_Attribute("i", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("j", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("k", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("l", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("m", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("n", NdbDictionary::Column::Unsigned),
};
/* Table object built from MYTAB1Attribs; the column count is derived from the array size */
static
const
NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
99
/*
 * Column definitions of the bitfield test table: an Unsigned "id" column
 * followed by pairs of Bit columns for a range of widths (1 .. 513 bits).
 * Each width appears twice, named "<width>bitnn" and "<width>bitnu"; the
 * two differ only in the last constructor argument (false / true), which
 * per the inline comment below is the nullable flag.
 */
static
const
NDBT_Attribute MYTAB2Attribs[] = {
  NDBT_Attribute("id", NdbDictionary::Column::Unsigned, 1, true),
  //                                                     _pk    _nullable
  NDBT_Attribute("1bitnn", NdbDictionary::Column::Bit, 1, false, false),
  NDBT_Attribute("1bitnu", NdbDictionary::Column::Bit, 1, false, true),
  NDBT_Attribute("2bitnn", NdbDictionary::Column::Bit, 2, false, false),
  NDBT_Attribute("2bitnu", NdbDictionary::Column::Bit, 2, false, true),
  NDBT_Attribute("7bitnn", NdbDictionary::Column::Bit, 7, false, false),
  NDBT_Attribute("7bitnu", NdbDictionary::Column::Bit, 7, false, true),
  NDBT_Attribute("8bitnn", NdbDictionary::Column::Bit, 8, false, false),
  NDBT_Attribute("8bitnu", NdbDictionary::Column::Bit, 8, false, true),
  NDBT_Attribute("15bitnn", NdbDictionary::Column::Bit, 15, false, false),
  NDBT_Attribute("15bitnu", NdbDictionary::Column::Bit, 15, false, true),
  NDBT_Attribute("31bitnn", NdbDictionary::Column::Bit, 31, false, false),
  NDBT_Attribute("31bitnu", NdbDictionary::Column::Bit, 31, false, true),
  NDBT_Attribute("32bitnn", NdbDictionary::Column::Bit, 32, false, false),
  NDBT_Attribute("32bitnu", NdbDictionary::Column::Bit, 32, false, true),
  NDBT_Attribute("33bitnn", NdbDictionary::Column::Bit, 33, false, false),
  NDBT_Attribute("33bitnu", NdbDictionary::Column::Bit, 33, false, true),
  NDBT_Attribute("63bitnn", NdbDictionary::Column::Bit, 63, false, false),
  NDBT_Attribute("63bitnu", NdbDictionary::Column::Bit, 63, false, true),
  NDBT_Attribute("64bitnn", NdbDictionary::Column::Bit, 64, false, false),
  NDBT_Attribute("64bitnu", NdbDictionary::Column::Bit, 64, false, true),
  NDBT_Attribute("65bitnn", NdbDictionary::Column::Bit, 65, false, false),
  NDBT_Attribute("65bitnu", NdbDictionary::Column::Bit, 65, false, true),
  NDBT_Attribute("127bitnn", NdbDictionary::Column::Bit, 127, false, false),
  NDBT_Attribute("127bitnu", NdbDictionary::Column::Bit, 127, false, true),
  NDBT_Attribute("513bitnn", NdbDictionary::Column::Bit, 513, false, false),
  NDBT_Attribute("513bitnu", NdbDictionary::Column::Bit, 513, false, true)
};

/* Name of the bitfield test table */
static const char* TABLE2_NAME= "MyTab2";

/* Table object built from MYTAB2Attribs; the column count is derived from the array size */
static
const
NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);

/* Number of columns in MYTAB2Attribs, including "id" */
static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
/* Width in bits of the widest Bit column defined above */
static const int MAX_BIT_WIDTH= 513;
/* One extra row for all bits == 0 */
static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
143
createTable(Ndb * pNdb,const NdbDictionary::Table * tab,bool _temp,bool existsOk,NDBT_CreateTableHook f)144 int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp,
145 bool existsOk, NDBT_CreateTableHook f)
146 {
147 int r = 0;
148 do{
149 NdbDictionary::Table tmpTab(* tab);
150 tmpTab.setStoredTable(_temp ? 0 : 1);
151 if(f != 0 && f(pNdb, tmpTab, 0, NULL))
152 {
153 ndbout << "Failed to create table" << endl;
154 return NDBT_FAILED;
155 }
156 r = pNdb->getDictionary()->createTable(tmpTab);
157 if(r == -1){
158 if(!existsOk){
159 ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
160 break;
161 }
162 if(pNdb->getDictionary()->getNdbError().code != 721){
163 ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
164 break;
165 }
166 r = 0;
167 }
168 }while(false);
169
170 return r;
171 }
172
173 /*
174 * Function to produce the tuples' data
175 */
runPopulate(NDBT_Context * ctx,NDBT_Step * step)176 int runPopulate(NDBT_Context* ctx, NDBT_Step* step)
177 {
178 Ndb *myNdb = GETNDB(step);
179 const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
180 const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
181 if(myTable == NULL)
182 APIERROR(myDict->getNdbError());
183
184 NdbTransaction* myTrans = myNdb->startTransaction();
185 if (myTrans == NULL)
186 APIERROR(myNdb->getNdbError());
187
188 for(int num = 0; num < TUPLE_NUM; num++)
189 {
190 NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
191 if(myNdbOperation == NULL)
192 {
193 APIERROR(myTrans->getNdbError());
194 }
195
196 /* the tuples' data in TABLE_NAME
197 +----+---+---+---+---+---+---+
198 | id | i | j | k | l | m | n |
199 +----+---+---+---+---+---+---+
200 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
201 | 1 | 0 | 0 | 0 | 0 | 0 | 1 |
202 | 2 | 0 | 0 | 0 | 0 | 1 | 0 |
203 | 3 | 0 | 0 | 0 | 0 | 1 | 1 |
204 | 4 | 0 | 0 | 0 | 1 | 0 | 0 |
205 | 5 | 0 | 0 | 0 | 1 | 0 | 1 |
206 | 6 | 0 | 0 | 0 | 1 | 1 | 0 |
207 | 7 | 0 | 0 | 0 | 1 | 1 | 1 |
208 | 8 | 0 | 0 | 1 | 0 | 0 | 0 |
209 | 9 | 0 | 0 | 1 | 0 | 0 | 1 |
210 | 10 | 0 | 0 | 1 | 0 | 1 | 0 |
211 | 11 | 0 | 0 | 1 | 0 | 1 | 1 |
212 | 12 | 0 | 0 | 1 | 1 | 0 | 0 |
213 | 13 | 0 | 0 | 1 | 1 | 0 | 1 |
214 | 14 | 0 | 0 | 1 | 1 | 1 | 0 |
215 | 15 | 0 | 0 | 1 | 1 | 1 | 1 |
216 | 16 | 0 | 1 | 0 | 0 | 0 | 0 |
217 | 17 | 0 | 1 | 0 | 0 | 0 | 1 |
218 | 18 | 0 | 1 | 0 | 0 | 1 | 0 |
219 | 19 | 0 | 1 | 0 | 0 | 1 | 1 |
220 | 20 | 0 | 1 | 0 | 1 | 0 | 0 |
221 | 21 | 0 | 1 | 0 | 1 | 0 | 1 |
222 | 22 | 0 | 1 | 0 | 1 | 1 | 0 |
223 | 23 | 0 | 1 | 0 | 1 | 1 | 1 |
224 | 24 | 0 | 1 | 1 | 0 | 0 | 0 |
225 | 25 | 0 | 1 | 1 | 0 | 0 | 1 |
226 | 26 | 0 | 1 | 1 | 0 | 1 | 0 |
227 | 27 | 0 | 1 | 1 | 0 | 1 | 1 |
228 | 28 | 0 | 1 | 1 | 1 | 0 | 0 |
229 | 29 | 0 | 1 | 1 | 1 | 0 | 1 |
230 | 30 | 0 | 1 | 1 | 1 | 1 | 0 |
231 | 31 | 0 | 1 | 1 | 1 | 1 | 1 |
232 | 32 | 1 | 0 | 0 | 0 | 0 | 0 |
233 | 33 | 1 | 0 | 0 | 0 | 0 | 1 |
234 | 34 | 1 | 0 | 0 | 0 | 1 | 0 |
235 | 35 | 1 | 0 | 0 | 0 | 1 | 1 |
236 | 36 | 1 | 0 | 0 | 1 | 0 | 0 |
237 | 37 | 1 | 0 | 0 | 1 | 0 | 1 |
238 | 38 | 1 | 0 | 0 | 1 | 1 | 0 |
239 | 39 | 1 | 0 | 0 | 1 | 1 | 1 |
240 | 40 | 1 | 0 | 1 | 0 | 0 | 0 |
241 | 41 | 1 | 0 | 1 | 0 | 0 | 1 |
242 | 42 | 1 | 0 | 1 | 0 | 1 | 0 |
243 | 43 | 1 | 0 | 1 | 0 | 1 | 1 |
244 | 44 | 1 | 0 | 1 | 1 | 0 | 0 |
245 | 45 | 1 | 0 | 1 | 1 | 0 | 1 |
246 | 46 | 1 | 0 | 1 | 1 | 1 | 0 |
247 | 47 | 1 | 0 | 1 | 1 | 1 | 1 |
248 | 48 | 1 | 1 | 0 | 0 | 0 | 0 |
249 | 49 | 1 | 1 | 0 | 0 | 0 | 1 |
250 | 50 | 1 | 1 | 0 | 0 | 1 | 0 |
251 | 51 | 1 | 1 | 0 | 0 | 1 | 1 |
252 | 52 | 1 | 1 | 0 | 1 | 0 | 0 |
253 | 53 | 1 | 1 | 0 | 1 | 0 | 1 |
254 | 54 | 1 | 1 | 0 | 1 | 1 | 0 |
255 | 55 | 1 | 1 | 0 | 1 | 1 | 1 |
256 | 56 | 1 | 1 | 1 | 0 | 0 | 0 |
257 | 57 | 1 | 1 | 1 | 0 | 0 | 1 |
258 | 58 | 1 | 1 | 1 | 0 | 1 | 0 |
259 | 59 | 1 | 1 | 1 | 0 | 1 | 1 |
260 | 60 | 1 | 1 | 1 | 1 | 0 | 0 |
261 | 61 | 1 | 1 | 1 | 1 | 0 | 1 |
262 | 62 | 1 | 1 | 1 | 1 | 1 | 0 |
263 | 63 | 1 | 1 | 1 | 1 | 1 | 1 |
264 +----+---+---+---+---+---+---+
265 */
266 myNdbOperation->insertTuple();
267 myNdbOperation->equal(COL_NAME[0], num);
268 for(int col = 1; col < COL_LEN; col++)
269 {
270 myNdbOperation->setValue(COL_NAME[col], (num>>(COL_LEN-1-col))&1);
271 }
272 }
273
274 int check = myTrans->execute(NdbTransaction::Commit);
275
276 myTrans->close();
277
278 if (check == -1)
279 return NDBT_FAILED;
280 else
281 return NDBT_OK;
282
283 }
284
285
286
/*
 * Letters that stand for the four filter groups:
 * a=AND, o=OR, A=NAND, O=NOR
 */
char op_string[] = "aoAO";
/*
 * The names of the six value columns of the test table. The order of the
 * letters in this array is shuffled in place by change_col_order().
 */
char col_string[] = "ijklmn";
/* Number of letters in op_string and col_string respectively */
const int op_len = strlen(op_string);
const int col_len = strlen(col_string);
297
298 /*
299 * get a random op from "aoAO"
300 */
get_rand_op_ch(char * ch)301 int get_rand_op_ch(char *ch)
302 {
303 static unsigned int num = 0;
304 if(++num == 0)
305 num = 1;
306 srand(num*(unsigned int)time(NULL));
307 *ch = op_string[rand() % op_len];
308 return 1;
309 }
310
311 /*
312 * get a random order form of "ijklmn" trough exchanging letter
313 */
change_col_order()314 void change_col_order()
315 {
316 int pos1,pos2;
317 char temp;
318 for (int i = 0; i < 10; i++) //exchange for 10 times
319 {
320 srand((unsigned int)time(NULL)/(i+1));
321 pos1 = rand() % col_len;
322 srand((i+1)*(unsigned int)time(NULL));
323 pos2 = rand() % col_len;
324 if (pos1 == pos2)
325 continue;
326 temp = col_string[pos1];
327 col_string[pos1] = col_string[pos2];
328 col_string[pos2] = temp;
329 }
330 }
331
332 /*
333 * get a random sub string of "ijklmn"
334 */
get_rand_col_str(char * str)335 int get_rand_col_str(char *str)
336 {
337 int len;
338 static unsigned int num = 0;
339 if(++num == 0)
340 num = 1;
341 srand(num*(unsigned int)time(NULL));
342 len = rand() % col_len + 1;
343 change_col_order();
344 BaseString::snprintf(str, len+1, "%s", col_string); //len+1, including '\0'
345 return len;
346 }
347
348 /*
349 * get a random string including operation and column
350 * eg, Alnikx
351 */
get_rand_op_str(char * str)352 int get_rand_op_str(char *str)
353 {
354 char temp[256];
355 int len1, len2, len;
356 len1 = get_rand_op_ch(temp);
357 len2 = get_rand_col_str(temp+len1);
358 len = len1 + len2;
359 temp[len] = 'x';
360 temp[len+1] = '\0'; // Add ending \0
361 BaseString::snprintf(str, len+1+1, "%s", temp); //len+1, including '\0'
362 return len+1;
363 }
364
365 /*
366 * replace a letter of source string with a new string
367 * e.g., source string: 'Aijkx', replace i with new string 'olmx'
368 * then source string is changed to 'Aolmxjkx'
369 * source: its format should be produced from get_rand_op_str()
370 * pos: range from 1 to strlen(source)-2
371 */
replace_a_to_str(char * source,int pos,char * newstr)372 int replace_a_to_str(char *source, int pos, char *newstr)
373 {
374 char temp[MAX_STR_LEN];
375 BaseString::snprintf(temp, pos+1, "%s", source);
376 BaseString::snprintf(temp+pos, strlen(newstr)+1, "%s", newstr);
377 BaseString::snprintf(temp+pos+strlen(newstr), strlen(source)-pos, "%s", source+pos+1);
378 BaseString::snprintf(source, strlen(temp)+1, "%s", temp);
379 return strlen(source);
380 }
381
/*
 * Tell whether the given character is one of the operator letters
 * 'a', 'A', 'o' or 'O'.
 */
bool check_op(char ch)
{
  switch (ch)
  {
  case 'a':
  case 'A':
  case 'o':
  case 'O':
    return true;
  default:
    return false;
  }
}
392
/*
 * Tell whether the given character is the end marker 'x' that closes
 * a group.
 */
bool check_end(char ch)
{
  if (ch != 'x')
    return false;
  return true;
}
400
/*
 * Tell whether the given character is one of the column letters
 * 'i', 'j', 'k', 'l', 'm' or 'n'.
 */
bool check_col(char ch)
{
  switch (ch)
  {
  case 'i':
  case 'j':
  case 'k':
  case 'l':
  case 'm':
  case 'n':
    return true;
  default:
    return false;
  }
}
412
413 /*
414 * To ensure we can get a random string with RECURSIVE_LEVEL,
415 * we need a position where can replace a letter with a new string.
416 */
get_rand_replace_pos(char * str,int len)417 int get_rand_replace_pos(char *str, int len)
418 {
419 int pos_op = 0;
420 int pos_x = 0;
421 int pos_col = 0;
422 int span = 0;
423 static int num = 0;
424 char temp;
425
426 for(int i = 0; i < len; i++)
427 {
428 temp = str[i];
429 if(! check_end(temp))
430 {
431 if(check_op(temp))
432 pos_op = i;
433 }
434 else
435 {
436 pos_x = i;
437 break;
438 }
439 }
440
441 if(++num == 0)
442 num = 1;
443
444 span = pos_x - pos_op - 1;
445 if(span <= 1)
446 {
447 pos_col = pos_op + 1;
448 }
449 else
450 {
451 srand(num*(unsigned int)time(NULL));
452 pos_col = pos_op + rand() % span + 1;
453 }
454 return pos_col;
455 }
456
457 /*
458 * Check whether the given random string is valid
459 * and applicable for this test case
460 */
check_random_str(char * str)461 bool check_random_str(char *str)
462 {
463 char *p;
464 int op_num = 0;
465 int end_num = 0;
466
467 for(p = str; *p; p++)
468 {
469 bool tmp1 = false, tmp2 = false;
470 if ((tmp1 = check_op(*p)))
471 op_num++;
472 if ((tmp2 = check_end(*p)))
473 end_num++;
474 if (!(tmp1 || tmp2 || check_col(*p))) //there are illegal letters
475 return false;
476 }
477
478 if(op_num != end_num) //begins are not equal to ends
479 return false;
480
481 return true;
482 }
483
484 /*
485 * Get a random string with RECURSIVE_LEVEL
486 */
get_rand_op_str_compound(char * str)487 void get_rand_op_str_compound(char *str)
488 {
489 char small_str[256];
490 int pos;
491 int tmp;
492 int level;
493 static int num = 0;
494
495 if(++num == 0)
496 num = 1;
497
498 srand(num*(unsigned int)time(NULL));
499 level = 1 + rand() % RECURSIVE_LEVEL;
500
501 get_rand_op_str(str);
502
503 for(int i = 0; i < level; i++)
504 {
505 get_rand_op_str(small_str);
506 tmp = strlen(small_str);
507 get_rand_op_str(small_str + tmp); //get two operations
508 pos = get_rand_replace_pos(str, strlen(str));
509 replace_a_to_str(str, pos, small_str);
510 }
511
512 //check the random string
513 if(!check_random_str(str))
514 {
515 fprintf(stderr, "Error random string! \n");
516 exit(-1);
517 }
518 }
519
/*
 * Map a column letter to its column id: 'i' gives 1, 'j' gives 2, ...
 * 'n' gives 6.
 */
int get_column_id(char ch)
{
  const int distance_from_i = ch - 'i';
  return distance_from_i + 1;
}
527
/*
 * Check whether a column of the given tuple holds the value 1.
 * The tuple number is read as a six-digit binary number in which
 * column id 1 is the most significant digit and column id 6 the least.
 * tuple_no: record number, range from 0 to 63
 * col_id: column id, range from 1 to 6
 */
bool check_col_equal_one(int tuple_no, int col_id)
{
  const int weight = 1 << (6 - col_id);
  return ((tuple_no / weight) % 2) != 0;
}
542
/*
 * Combine all elements of a bool array with AND.
 * value: pointer to a bool array
 * len: length of the bool array
 * return: true if no element is false (also for an empty array)
 */
bool AND_op(bool *value, int len)
{
  int idx = 0;
  while (idx < len && value[idx])
    idx++;
  /* the scan ran to the end only if nothing was false */
  return idx >= len;
}
557
/*
 * Combine all elements of a bool array with OR.
 * value: pointer to a bool array
 * len: length of the bool array
 * return: true if at least one element is true (false for an empty array)
 */
bool OR_op(bool *value, int len)
{
  int idx = 0;
  while (idx < len && !value[idx])
    idx++;
  /* the scan stopped early only if a true element was found */
  return idx < len;
}
572
573 /*
574 * get a result after all elements in the array with NAND
575 * value: pointer to a bool array
576 * len: length of the bool array
577 */
NAND_op(bool * value,int len)578 bool NAND_op(bool *value, int len)
579 {
580 return (! AND_op(value, len));
581 }
582
583 /*
584 * get a result after all elements in the array with NOR
585 * value: pointer to a bool array
586 * len: length of the bool array
587 */
NOR_op(bool * value,int len)588 bool NOR_op(bool *value, int len)
589 {
590 return (! OR_op(value, len));
591 }
592
593 /*
594 * AND/NAND/OR/NOR operation for a bool array
595 */
calculate_one_op(char op_type,bool * value,int len)596 bool calculate_one_op(char op_type, bool *value, int len)
597 {
598 switch(op_type)
599 {
600 case 'a':
601 return AND_op(value, len);
602 break;
603 case 'o':
604 return OR_op(value, len);
605 break;
606 case 'A':
607 return NAND_op(value, len);
608 break;
609 case 'O':
610 return NOR_op(value, len);
611 break;
612 }
613 return false; //make gcc happy
614 }
615
/*
 * One entry of the operator stack used by check_one_tuple().
 * type: operator letter ('a', 'o', 'A' or 'O')
 * num: number of operands collected so far for this operator
 */
typedef struct _stack_element
{
  char type;
  int num;
}stack_element;

/*
 * stack_op, stores info for AND, OR, NAND, NOR
 * stack_col, stores the values of the columns (i,j,k,l,m,n) and the
 * temporary result of an operation
 * Both are file-level work areas of fixed size that check_one_tuple()
 * fills from index 0 on every call.
 */
stack_element stack_op[RECURSIVE_LEVEL * COL_LEN];
bool stack_col[RECURSIVE_LEVEL * COL_LEN * 2];
628
629 /*
630 * check whether the given tuple is chosen by judgement condition
631 * tuple_no, the NO of tuple in TABLE_NAME, range from 0 to TUPLE_NUM
632 * str: a random string of scan opearation and condition
633 * len: length of str
634 */
check_one_tuple(int tuple_no,char * str,int len)635 bool check_one_tuple(int tuple_no, char *str, int len)
636 {
637 int pop_op = 0;
638 int pop_col = 0;
639 for(int i = 0; i < len; i++)
640 {
641 char letter = *(str + i);
642 if(check_op(letter)) //push
643 {
644 stack_op[pop_op].type = letter;
645 stack_op[pop_op].num = 0;
646 pop_op++;
647 }
648 if(check_col(letter)) //push
649 {
650 stack_col[pop_col] = check_col_equal_one(tuple_no, get_column_id(letter));
651 pop_col++;
652 stack_op[pop_op-1].num += 1;
653 }
654 if(check_end(letter))
655 {
656 if(pop_op <= 1)
657 {
658 return calculate_one_op(stack_op[pop_op-1].type,
659 stack_col,
660 stack_op[pop_op-1].num);
661 }
662 else
663 {
664 bool tmp1 = calculate_one_op(stack_op[pop_op-1].type,
665 stack_col + pop_col - stack_op[pop_op-1].num,
666 stack_op[pop_op-1].num);
667 pop_col -= stack_op[pop_op-1].num; //pop
668 pop_op--;
669 stack_col[pop_col] = tmp1; //push
670 pop_col++;
671 stack_op[pop_op-1].num += 1;
672 }
673 }
674 }
675 return false; //make gcc happy
676 }
677
678 /*
679 * get lists of tuples which match the scan condiction through calculating
680 * str: a random string of scan opearation and condition
681 */
check_all_tuples(char * str,bool * res)682 void check_all_tuples(char *str, bool *res)
683 {
684 for (int i = 0; i < TUPLE_NUM; i++)
685 {
686 if(check_one_tuple(i, str, strlen(str)))
687 res[i] = true;
688 }
689 }
690
691 /*
692 * convert a letter to group number what ndbapi need
693 */
get_api_group(char op_name)694 NdbScanFilter::Group get_api_group(char op_name)
695 {
696 switch (op_name) {
697 case 'a': return NdbScanFilter::AND;
698 case 'o': return NdbScanFilter::OR;
699 case 'A': return NdbScanFilter::NAND;
700 case 'O': return NdbScanFilter::NOR;
701 default:
702 fprintf(stderr, "Invalid group name %c !\n", op_name);
703 exit(3);
704 }
705 }
706
707 /*
708 * with ndbapi, call begin, eq/ne/lt/gt/le/ge..., end
709 */
call_ndbapi(char * str,NdbTransaction * transaction,NdbScanOperation * scan,NdbDictionary::Column const * col[])710 NdbScanFilter * call_ndbapi(char *str, NdbTransaction *transaction,
711 NdbScanOperation *scan, NdbDictionary::Column const *col[])
712 {
713 NdbScanFilter *scanfilter = new NdbScanFilter(scan);
714 char *p;
715
716 for (p = str; *p; p++)
717 {
718 if(check_op(*p))
719 {
720 if(scanfilter->begin(get_api_group(*p)))
721 ERR_EXIT(transaction, "filter begin() failed");
722 }
723 if(check_col(*p))
724 {
725 if(scanfilter->eq(col[*p-'i'+1]->getColumnNo(), (Uint32)1))
726 ERR_EXIT(transaction, "filter eq() failed");
727 }
728 if(check_end(*p))
729 {
730 if(scanfilter->end())
731 {
732 NdbError err= scanfilter->getNdbError();
733 printf("Problem closing ScanFilter= %d\n", err.code);
734 ERR_EXIT(transaction, "filter end() failed");
735 }
736 }
737 }
738
739 return scanfilter;
740 }
741
742 /*
743 * get the tuples through ndbapi, and save the tuples NO.
744 * str: a random string of scan opearation and condition
745 */
ndbapi_tuples(Ndb * ndb,char * str,bool * res)746 void ndbapi_tuples(Ndb *ndb, char *str, bool *res)
747 {
748 const NdbDictionary::Dictionary *dict = ndb->getDictionary();
749 if (!dict)
750 ERR_EXIT(ndb, "Can't get dict");
751
752 const NdbDictionary::Table *table = dict->getTable(TABLE_NAME);
753 if (!table)
754 ERR_EXIT(dict, "Can't get table"TABLE_NAME);
755
756 const NdbDictionary::Column *col[COL_LEN];
757
758 for(int ii = 0; ii < COL_LEN; ii++)
759 {
760 char tmp[128];
761 col[ii] = table->getColumn(COL_NAME[ii]);
762 if(!col[ii])
763 {
764 BaseString::snprintf(tmp, 128, "Can't get column %s", COL_NAME[ii]);
765 ERR_EXIT(dict, tmp);
766 }
767 }
768
769 NdbTransaction *transaction;
770 NdbScanOperation *scan;
771 NdbScanFilter *filter;
772
773 transaction = ndb->startTransaction();
774 if (!transaction)
775 ERR_EXIT(ndb, "Can't start transaction");
776
777 scan = transaction->getNdbScanOperation(table);
778 if (!scan)
779 ERR_EXIT(transaction, "Can't get scan op");
780
781 if (scan->readTuples(NdbOperation::LM_Exclusive))
782 ERR_EXIT(scan, "Can't set up read");
783
784 NdbRecAttr *rec[COL_LEN];
785 for(int ii = 0; ii < COL_LEN; ii++)
786 {
787 char tmp[128];
788 rec[ii] = scan->getValue(COL_NAME[ii]);
789 if(!rec[ii])
790 {
791 BaseString::snprintf(tmp, 128, "Can't get rec of %s", COL_NAME[ii]);
792 ERR_EXIT(scan, tmp);
793 }
794 }
795
796 filter = call_ndbapi(str, transaction, scan, col);
797
798 if (transaction->execute(NdbTransaction::NoCommit))
799 ERR_EXIT(transaction, "Can't execute");
800
801 int i,j,k,l,m,n;
802 while (scan->nextResult(true) == 0)
803 {
804 do
805 {
806 i = rec[1]->u_32_value();
807 j = rec[2]->u_32_value();
808 k = rec[3]->u_32_value();
809 l = rec[4]->u_32_value();
810 m = rec[5]->u_32_value();
811 n = rec[6]->u_32_value();
812 res[32*i+16*j+8*k+4*l+2*m+n] = true;
813 } while (scan->nextResult(false) == 0);
814 }
815
816 delete filter;
817 transaction->close();
818 }
819
820 /*
821 * compare the result between calculation and NDBAPI
822 * str: a random string of scan opearation and condition
823 * return: true stands for ndbapi ok, false stands for ndbapi failed
824 */
825 template class Vector<bool>;
compare_cal_ndb(char * str,Ndb * ndb)826 bool compare_cal_ndb(char *str, Ndb *ndb)
827 {
828 Vector<bool> res_cal;
829 Vector<bool> res_ndb;
830
831 for(int i = 0; i < TUPLE_NUM; i++)
832 {
833 res_cal.push_back(false);
834 res_ndb.push_back(false);
835 }
836
837 check_all_tuples(str, res_cal.getBase());
838 ndbapi_tuples(ndb, str, res_ndb.getBase());
839
840 for(int i = 0; i < TUPLE_NUM; i++)
841 {
842 if(res_cal[i] != res_ndb[i])
843 return false;
844 }
845 return true;
846 }
847
848
runCreateTables(NDBT_Context * ctx,NDBT_Step * step)849 int runCreateTables(NDBT_Context* ctx, NDBT_Step* step)
850 {
851 Ndb *pNdb = GETNDB(step);
852 pNdb->getDictionary()->dropTable(MYTAB1.getName());
853 int ret = createTable(pNdb, &MYTAB1, false, true, 0);
854 if(ret)
855 return ret;
856 return NDBT_OK;
857 }
858
859
runDropTables(NDBT_Context * ctx,NDBT_Step * step)860 int runDropTables(NDBT_Context* ctx, NDBT_Step* step)
861 {
862 int ret = GETNDB(step)->getDictionary()->dropTable(MYTAB1.getName());
863 if(ret == -1)
864 return NDBT_FAILED;
865
866 return NDBT_OK;
867 }
868
runScanRandomFilterTest(NDBT_Context * ctx,NDBT_Step * step)869 int runScanRandomFilterTest(NDBT_Context* ctx, NDBT_Step* step)
870 {
871 char random_str[MAX_STR_LEN];
872 Ndb *myNdb = GETNDB(step);
873
874 for(int i = 0; i < TEST_NUM; i++)
875 {
876 get_rand_op_str_compound(random_str);
877 if( !compare_cal_ndb(random_str, myNdb))
878 return NDBT_FAILED;
879 }
880
881 return NDBT_OK;
882 }
883
runMaxScanFilterSize(NDBT_Context * ctx,NDBT_Step * step)884 int runMaxScanFilterSize(NDBT_Context* ctx, NDBT_Step* step)
885 {
886 /* This testcase uses the ScanFilter methods to build a large
887 * scanFilter, checking that ScanFilter building fails
888 * at the expected point, with the correct error message
889 */
890 const Uint32 MaxLength= NDB_MAX_SCANFILTER_SIZE_IN_WORDS;
891
892 const Uint32 InstructionWordsPerEq= 3;
893
894 const Uint32 MaxEqsInScanFilter= MaxLength/InstructionWordsPerEq;
895
896 Ndb *myNdb = GETNDB(step);
897 const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
898 const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
899 if(myTable == NULL)
900 APIERROR(myDict->getNdbError());
901
902 NdbInterpretedCode ic(myTable);
903
904 NdbScanFilter sf(&ic);
905
906 if (sf.begin()) // And group
907 {
908 ndbout << "Bad rc from begin\n";
909 ndbout << sf.getNdbError() << "\n";
910 return NDBT_FAILED;
911 }
912
913 Uint32 loop=0;
914
915 for (;loop < MaxEqsInScanFilter; loop++)
916 {
917 if (sf.eq(0u, 10u))
918 {
919 ndbout << "Bad rc from eq at loop " << loop << "\n";
920 ndbout << sf.getNdbError() << "\n";
921 return NDBT_FAILED;
922 }
923 }
924
925 if (! sf.eq(0u, 10u))
926 {
927 ndbout << "Expected ScanFilter instruction addition to fail after"
928 << MaxEqsInScanFilter << "iterations, but it didn't\n";
929 return NDBT_FAILED;
930 }
931
932 NdbError err=sf.getNdbError();
933
934 if (err.code != 4294)
935 {
936 ndbout << "Expected to get error code 4294, but instead got " << err.code << "\n";
937 return NDBT_FAILED;
938 }
939
940 return NDBT_OK;
941 }
942
943
runScanFilterConstructorFail(NDBT_Context * ctx,NDBT_Step * step)944 int runScanFilterConstructorFail(NDBT_Context* ctx, NDBT_Step* step)
945 {
946 /* We test that failures in the ScanFilter constructor can be
947 * detected by the various ScanFilter methods without
948 * issues
949 */
950 Ndb *myNdb = GETNDB(step);
951 const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
952 const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
953 if(myTable == NULL)
954 APIERROR(myDict->getNdbError());
955
956 NdbTransaction* trans=myNdb->startTransaction();
957
958 if (trans == NULL)
959 {
960 APIERROR(trans->getNdbError());
961 return NDBT_FAILED;
962 }
963
964 /* Create an NdbRecord scan operation */
965 const NdbScanOperation* tabScan=
966 trans->scanTable(myTable->getDefaultRecord());
967
968 if (tabScan==NULL)
969 {
970 APIERROR(trans->getNdbError());
971 return NDBT_FAILED;
972 }
973
974 /* Now we hackily try to add a ScanFilter after the operation
975 * is defined. This will cause a failure within the
976 * constructor
977 */
978 NdbScanFilter brokenSf((NdbScanOperation*) tabScan);
979
980 /* Scan operation should have an error */
981 if (tabScan->getNdbError().code != 4536)
982 {
983 ndbout << "Expected error 4536, had error " <<
984 tabScan->getNdbError().code << " instead" << endl;
985 return NDBT_FAILED;
986 }
987
988 /* ScanFilter should have an error */
989 if (brokenSf.getNdbError().code != 4539)
990 {
991 ndbout << "Expected error 4539, had error " <<
992 brokenSf.getNdbError().code << " instead" << endl;
993 return NDBT_FAILED;
994 }
995
996 if (brokenSf.begin() != -1)
997 { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
998
999 if (brokenSf.istrue() != -1)
1000 { ndbout << "Bad rc from istrue" << endl; return NDBT_FAILED; }
1001
1002 if (brokenSf.isfalse() != -1)
1003 { ndbout << "Bad rc from isfalse" << endl; return NDBT_FAILED; }
1004
1005 if (brokenSf.isnull(0) != -1)
1006 { ndbout << "Bad rc from isnull" << endl; return NDBT_FAILED; }
1007
1008 if (brokenSf.isnotnull(0) != -1)
1009 { ndbout << "Bad rc from isnotnull" << endl; return NDBT_FAILED; }
1010
1011 if (brokenSf.cmp(NdbScanFilter::COND_EQ, 0, NULL, 0) != -1)
1012 { ndbout << "Bad rc from cmp" << endl; return NDBT_FAILED; }
1013
1014 if (brokenSf.end() != -1)
1015 { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
1016
1017 trans->close();
1018
1019 /* Now we check that we can define a ScanFilter before
1020 * calling readTuples() for a scan operation
1021 */
1022 trans= myNdb->startTransaction();
1023
1024 if (trans == NULL)
1025 {
1026 APIERROR(trans->getNdbError());
1027 return NDBT_FAILED;
1028 }
1029
1030 /* Get an old Api table scan operation */
1031 NdbScanOperation* tabScanOp=
1032 trans->getNdbScanOperation(myTable);
1033
1034 if (tabScanOp==NULL)
1035 {
1036 APIERROR(trans->getNdbError());
1037 return NDBT_FAILED;
1038 }
1039
1040 /* Attempt to define a ScanFilter before calling readTuples() */
1041 NdbScanFilter sf(tabScanOp);
1042
1043 /* Should be no problem ... */
1044 if (sf.getNdbError().code != 0)
1045 { APIERROR(sf.getNdbError()); return NDBT_FAILED; };
1046
1047
1048 /* Ok, now attempt to define a ScanFilter against a primary key op */
1049 NdbOperation* pkOp= trans->getNdbOperation(myTable);
1050
1051 if (pkOp == NULL)
1052 {
1053 APIERROR(trans->getNdbError());
1054 return NDBT_FAILED;
1055 }
1056
1057 NdbScanFilter sf2(pkOp);
1058
1059 if (sf2.getNdbError().code != 4539)
1060 {
1061 ndbout << "Error, expected 4539" << endl;
1062 APIERROR(sf2.getNdbError());
1063 return NDBT_FAILED;
1064 }
1065
1066 return NDBT_OK;
1067 }
1068
/*
 * Read one bit of a bitmap stored as an array of 32-bit words.
 * Bit bitNum lives in word bitNum / 32 at bit position bitNum % 32.
 * bitMap: the words of the bitmap
 * bitNum: number of the bit to read; the caller must keep it inside
 *         the bitmap, no range check is made here
 * return: true if the bit is set
 */
bool getBit(const Uint32* bitMap,
            int bitNum)
{
  /* unsigned 1 so that position 31 does not shift into a sign bit */
  return ((bitMap[ bitNum >> 5 ] & (1u << (bitNum & 31))) != 0);
}
1074
setBit(Uint32 * bitMap,int bitMapByteSize,int bitNum)1075 void setBit(Uint32* bitMap, int bitMapByteSize,
1076 int bitNum)
1077 {
1078 assert(bitNum >= 0 && bitNum < (bitMapByteSize * 8));
1079 bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
1080 }
1081
/*
 * Conditions for which isRowExpected() predicts whether a row matches.
 */
enum TestConditions {
  /* ordering and equality comparisons */
  COND_LE = 0,
  COND_LT = 1,
  COND_GE = 2,
  COND_GT = 3,
  COND_EQ = 4,
  COND_NE = 5,
  /* NULL tests */
  COND_NULL = 6,
  COND_NOTNULL = 7,
  /* column AND mask compared with the mask or with zero */
  COND_AND_EQ_MASK = 8,
  COND_AND_NE_MASK = 9,
  COND_AND_EQ_ZERO = 10,
  COND_AND_NE_ZERO = 11
};
1096
/*
 * Index of the highest bit set in a column of the given width for a row.
 * The values cycle with the row id through colBitWidth + 1 steps.
 * return: value from -1 to colBitWidth - 1
 *   -1 == no bits set
 *    0 == bit 0 set
 *    1 == bit 0 + bit 1 set
 *   ...
 */
int getExpectedBitsSet(int rowId,
                       int colBitWidth)
{
  const int cycle = colBitWidth + 1;
  const int step = rowId % cycle;
  return step - 1;
}
1108
/*
 * Decide whether a column of the given width holds NULL in the given row.
 * Occasionally we'll have a Null column: this is the case whenever
 * rowId + colBitWidth is a multiple of 13.
 */
bool isNullValue(int rowId,
                 int colBitWidth)
{
  const int sum = rowId + colBitWidth;
  return (sum % 13) == 0;
}
1115
/**
 * Choose how the filter mask for a given bit number is varied.
 *
 * @param bitNum  Bit number the variant is derived from.
 * @param offset  Out: number of bit positions the mask is shifted by;
 *                bitNum / 2 when (bitNum % 7) == 6, otherwise 0.
 * @param invert  Out: true when the mask is to be inverted, which is
 *                the case when (bitNum % 5) == 3.
 */
void getBitfieldVariants(int bitNum, int& offset, bool& invert)
{
  const bool invertMask= ((bitNum % 5) == 3);
  const bool shiftMask= ((bitNum % 7) == 6);

  invert= invertMask;
  offset= shiftMask ? (bitNum / 2) : 0;
}
1131
1132
isRowExpected(int rowId,TestConditions cond,int colBitWidth,int bitsSetInScanFilter,bool isNullable,const Uint32 * maskBuff)1133 bool isRowExpected(int rowId,
1134 TestConditions cond,
1135 int colBitWidth,
1136 int bitsSetInScanFilter,
1137 bool isNullable,
1138 const Uint32* maskBuff)
1139 {
1140 if (isNullable && isNullValue(rowId, colBitWidth))
1141 {
1142 switch (cond) {
1143 case COND_LE:
1144 return true; // null < any value
1145 case COND_LT:
1146 return true; // null < any value
1147 case COND_GE:
1148 return false; // null < any value
1149 case COND_GT:
1150 return false; // null < any value
1151 case COND_EQ:
1152 return false; // null != any value
1153 case COND_NE:
1154 return true; // null != any value
1155 case COND_NULL:
1156 return true; // null is null
1157 case COND_NOTNULL:
1158 return false;
1159 case COND_AND_EQ_MASK:
1160 return false; // NULL AND MASK != MASK
1161 case COND_AND_NE_MASK:
1162 return true; // NULL AND MASK != MASK
1163 case COND_AND_EQ_ZERO:
1164 return false; // NULL AND MASK != 0
1165 case COND_AND_NE_ZERO:
1166 return true; // NULL AND MASK != 0
1167 default:
1168 printf("isRowExpected given bad condition : %u\n",
1169 cond);
1170 return false;
1171 }
1172 }
1173 else
1174 {
1175 /* Not a null value */
1176 int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
1177
1178 int offset= 0;
1179 bool invert= false;
1180
1181 getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
1182
1183 switch (cond) {
1184 case COND_LE:
1185 return expectedBitsSet <= bitsSetInScanFilter;
1186 case COND_LT:
1187 return expectedBitsSet < bitsSetInScanFilter;
1188 case COND_GE:
1189 return expectedBitsSet >= bitsSetInScanFilter;
1190 case COND_GT:
1191 return expectedBitsSet > bitsSetInScanFilter;
1192 case COND_EQ:
1193 return expectedBitsSet == bitsSetInScanFilter;
1194 case COND_NE:
1195 return expectedBitsSet != bitsSetInScanFilter;
1196 case COND_NULL:
1197 return false;
1198 case COND_NOTNULL:
1199 return true;
1200 case COND_AND_EQ_MASK:
1201 case COND_AND_NE_MASK:
1202 {
1203 bool result= true;
1204 /* Compare data AND mask to the mask buff */
1205 for (int idx=0; idx < colBitWidth; idx++)
1206 {
1207 bool bitVal= (expectedBitsSet >= 0)?
1208 (expectedBitsSet >= idx): false;
1209 bool maskVal= getBit(maskBuff, idx);
1210 if ((bitVal & maskVal) != maskVal)
1211 {
1212 /* Difference to mask, know result */
1213 result= false;
1214 break;
1215 }
1216 }
1217
1218 /* Invert result for NE condition */
1219 return (result ^ (cond == COND_AND_NE_MASK));
1220 }
1221 case COND_AND_EQ_ZERO:
1222 case COND_AND_NE_ZERO:
1223 {
1224 bool result= true;
1225 /* Compare data AND mask to zero */
1226 for (int idx=0; idx < colBitWidth; idx++)
1227 {
1228 bool bitVal= (expectedBitsSet >= 0)?
1229 (expectedBitsSet >= idx): false;
1230 bool maskVal= getBit(maskBuff, idx);
1231 if ((bitVal & maskVal) != 0)
1232 {
1233 /* Difference to 0, know result */
1234 result= false;
1235 break;
1236 }
1237 }
1238 /* Invert result for NE condition */
1239 return (result ^ (cond == COND_AND_NE_ZERO));
1240 }
1241 default:
1242 printf("isRowExpected given bad condition : %u\n",
1243 cond);
1244 return false;
1245 }
1246 }
1247 }
1248
insertBitRows(Ndb * pNdb)1249 int insertBitRows(Ndb* pNdb)
1250 {
1251 const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1252 const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1253 if(myTable == NULL)
1254 APIERROR(myDict->getNdbError());
1255
1256 for (int i=0; i< TOTAL_ROWS; i++)
1257 {
1258 NdbTransaction* myTrans = pNdb->startTransaction();
1259 if (myTrans == NULL)
1260 APIERROR(pNdb->getNdbError());
1261
1262 NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
1263 if (insertOp == NULL)
1264 APIERROR(pNdb->getNdbError());
1265
1266 const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
1267 Uint32 buff[ buffSize ];
1268
1269 if (insertOp->insertTuple() != 0)
1270 APIERROR(insertOp->getNdbError());
1271
1272 if (insertOp->equal((Uint32)0, i) != 0) // Set id column
1273 APIERROR(insertOp->getNdbError());
1274
1275 for (int col=1; col < myTable->getNoOfColumns(); col++)
1276 {
1277 const NdbDictionary::Column* c= myTable->getColumn(col);
1278 int colBitWidth= c->getLength();
1279 bool isNullable= c->getNullable();
1280
1281 if (isNullable &&
1282 isNullValue(i, colBitWidth))
1283 {
1284 /* Set column value to NULL */
1285 if (insertOp->setValue(col, (char*) NULL) != 0)
1286 APIERROR(insertOp->getNdbError());
1287 }
1288 else
1289 {
1290 /* Set lowest bits in this column */
1291 memset(buff, 0, (4 * buffSize));
1292
1293 int bitsToSet= getExpectedBitsSet(i, colBitWidth);
1294
1295 if (bitsToSet >= 0)
1296 {
1297 for (int idx=0; idx <= bitsToSet; idx++)
1298 setBit(buff, sizeof(buff), idx);
1299 }
1300
1301 if (insertOp->setValue(col, (char *)buff) != 0)
1302 APIERROR(insertOp->getNdbError());
1303 }
1304 }
1305
1306 if (myTrans->execute(NdbTransaction::Commit) != 0)
1307 {
1308 APIERROR(myTrans->getNdbError());
1309 }
1310 myTrans->close();
1311 }
1312
1313 printf("Inserted %u rows\n", TOTAL_ROWS);
1314
1315 return NDBT_OK;
1316 }
1317
verifyBitData(Ndb * pNdb)1318 int verifyBitData(Ndb* pNdb)
1319 {
1320 const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1321 const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1322 if(myTable == NULL)
1323 APIERROR(myDict->getNdbError());
1324
1325 NdbTransaction* myTrans= pNdb->startTransaction();
1326 if (myTrans == NULL)
1327 APIERROR(pNdb->getNdbError());
1328
1329 NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1330 if (scanOp == NULL)
1331 APIERROR(pNdb->getNdbError());
1332
1333 if (scanOp->readTuples() != 0)
1334 APIERROR(scanOp->getNdbError());
1335
1336 NdbRecAttr* results[ NUM_COLS ];
1337
1338 for (int col=0; col< NUM_COLS; col++)
1339 {
1340 if ((results[col]= scanOp->getValue(col)) == NULL)
1341 APIERROR(scanOp->getNdbError());
1342 }
1343
1344 if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1345 APIERROR(myTrans->getNdbError());
1346
1347 for (int row=0; row < TOTAL_ROWS; row++)
1348 {
1349 if (scanOp->nextResult() != 0)
1350 APIERROR(scanOp->getNdbError());
1351
1352 int rowId= results[0]->int32_value();
1353
1354 for (int col=1; col < NUM_COLS; col++)
1355 {
1356 const NdbDictionary::Column* c= myTable->getColumn(col);
1357 bool isNullable= c->getNullable();
1358 int colBitWidth= c->getLength();
1359
1360 if (isNullable &&
1361 isNullValue(rowId, colBitWidth))
1362 {
1363 if (!results[ col ] ->isNULL())
1364 {
1365 printf("Mismatch at result %d row %d, column %d, expected NULL\n",
1366 row, rowId, col);
1367 myTrans->close();
1368 return NDBT_FAILED;
1369 }
1370 }
1371 else
1372 {
1373 /* Non null value, check it */
1374 int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
1375
1376 const Uint32* val= (const Uint32 *) results[ col ]->aRef();
1377
1378 for (int bitNum=0; bitNum < colBitWidth; bitNum++)
1379 {
1380 bool expectClear= (bitNum > expectedSetBits);
1381 bool isClear= ! getBit(val, bitNum);
1382 if (expectClear != isClear)
1383 {
1384 printf("Mismatch at result %d row %d, column %d, bit %d"
1385 " expected %d \n",
1386 row, rowId, col, bitNum, (expectClear)?0:1);
1387 myTrans->close();
1388 return NDBT_FAILED;
1389 }
1390 }
1391 }
1392 }
1393 }
1394
1395 if (scanOp->nextResult() != 1)
1396 {
1397 printf("Too many rows returned\n");
1398 return NDBT_FAILED;
1399 }
1400
1401 if (myTrans->execute(NdbTransaction::Commit) != 0)
1402 APIERROR(myTrans->getNdbError());
1403
1404 myTrans->close();
1405
1406 printf("Verified data for %u rows\n",
1407 TOTAL_ROWS);
1408
1409 return NDBT_OK;
1410 }
1411
verifyBitScanFilter(Ndb * pNdb)1412 int verifyBitScanFilter(Ndb* pNdb)
1413 {
1414 const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1415 const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1416 if(myTable == NULL)
1417 APIERROR(myDict->getNdbError());
1418
1419 /* Perform scan with scanfilter for :
1420 * - each column in the table
1421 * - each supported comparison type
1422 * - each potentially set bit in the column
1423 */
1424 int scanCount= 0;
1425 for (int col=1; col < NUM_COLS; col++)
1426 {
1427 const NdbDictionary::Column* c= myTable->getColumn(col);
1428 const int bitWidth= c->getLength();
1429 printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
1430 (c->getNullable())?"Nullable":"Non-null",
1431 col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
1432 for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
1433 {
1434 for (int bitNum=0; bitNum <= bitWidth; bitNum++)
1435 {
1436 /* Define scan */
1437 NdbTransaction* myTrans= pNdb->startTransaction();
1438 if (myTrans == NULL)
1439 APIERROR(pNdb->getNdbError());
1440
1441 NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1442 if (scanOp == NULL)
1443 APIERROR(pNdb->getNdbError());
1444
1445 if (scanOp->readTuples() != 0)
1446 APIERROR(scanOp->getNdbError());
1447
1448 NdbRecAttr* ra;
1449 if ((ra= scanOp->getValue((Uint32)0)) == NULL)
1450 APIERROR(scanOp->getNdbError());
1451
1452 /* Define ScanFilter */
1453 const int buffSize= (MAX_BIT_WIDTH + 31)/32;
1454 Uint32 buff[ buffSize ];
1455 memset(buff, 0, (4 * buffSize));
1456
1457 /* Define constant value, with some variants for bitwise operators */
1458 bool invert= false;
1459 int offset= 0;
1460
1461 switch (comp) {
1462 case COND_AND_EQ_MASK:
1463 case COND_AND_NE_MASK:
1464 case COND_AND_EQ_ZERO:
1465 case COND_AND_NE_ZERO:
1466 getBitfieldVariants(bitNum, offset, invert);
1467 default:
1468 ;
1469 }
1470
1471 /* Set lower bitNum -1 bits
1472 * If bitNum == 0, set none
1473 */
1474 int bitsSetInFilter= bitNum-1;
1475
1476 if ((bitsSetInFilter >= 0) ||
1477 invert)
1478 {
1479 for (int idx=0; idx < (32 * buffSize); idx++)
1480 {
1481 if ((idx >= offset) &&
1482 (idx <= (offset + bitsSetInFilter)))
1483 {
1484 if (!invert)
1485 setBit(buff, sizeof(buff), idx);
1486 }
1487 else
1488 {
1489 if (invert)
1490 setBit(buff, sizeof(buff), idx);
1491 }
1492 }
1493 }
1494
1495
1496
1497 NdbScanFilter sf(scanOp);
1498
1499 if (sf.begin(NdbScanFilter::AND) != 0)
1500 APIERROR(sf.getNdbError());
1501
1502 if ((comp != COND_NULL) &&
1503 (comp != COND_NOTNULL))
1504 {
1505 /* Operator with a constant */
1506 if (sf.cmp((NdbScanFilter::BinaryCondition)comp,
1507 col,
1508 (char*)buff) != 0)
1509 APIERROR(sf.getNdbError());
1510 }
1511 else
1512 {
1513 switch (comp) {
1514 case COND_NULL:
1515 if (sf.isnull(col) != 0)
1516 APIERROR(sf.getNdbError());
1517 break;
1518 case COND_NOTNULL:
1519 if (sf.isnotnull(col) != 0)
1520 APIERROR(sf.getNdbError());
1521 break;
1522 default:
1523 printf("Condition %u not supported\n", comp);
1524 return NDBT_FAILED;
1525 }
1526 }
1527
1528 if (sf.end() != 0)
1529 APIERROR(sf.getNdbError());
1530
1531 /* Calculate expected number of rows in result */
1532 const NdbDictionary::Column* c= myTable->getColumn(col);
1533 int colBitWidth= c->getLength();
1534 bool isNullable= c->getNullable();
1535
1536 // printf("Determining expected rows\n");
1537 int expectedResultCount= 0;
1538 for (int i=0; i< TOTAL_ROWS; i++)
1539 {
1540 if (isRowExpected(i,
1541 (TestConditions)comp,
1542 colBitWidth,
1543 bitsSetInFilter,
1544 isNullable,
1545 buff))
1546 expectedResultCount++;
1547 }
1548
1549
1550 /* Execute */
1551 if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1552 APIERROR(myTrans->getNdbError());
1553
1554
1555 /* Process results to ensure we got the expected rows back */
1556 int rc= 0;
1557 int count= 0;
1558 int matchCount= 0;
1559 // printf("Checking rows returned\n");
1560 while ((rc= scanOp->nextResult()) == 0)
1561 {
1562 int rowId= ra->int32_value();
1563 count++;
1564 /*
1565 * Check that this row was expected
1566 */
1567 if (isRowExpected(rowId,
1568 (TestConditions)comp,
1569 colBitWidth,
1570 bitsSetInFilter,
1571 isNullable,
1572 buff))
1573 {
1574 matchCount++;
1575 }
1576 else
1577 {
1578 printf("Col=%u Comp=%u BitNum=%u row=%u : "
1579 "Got row %u back which I did not expect\n",
1580 col, comp, bitNum, count, rowId);
1581 myTrans->close();
1582 return NDBT_FAILED;
1583 }
1584 }
1585
1586 if (rc != 1)
1587 {
1588 printf("Col=%u Comp=%u BitNum=%u :"
1589 "nextResult failure %d\n", col, comp, bitNum, rc);
1590 APIERROR(myTrans->getNdbError());
1591 }
1592
1593 //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
1594 // col, comp, bitNum, expectedResultCount, matchCount);
1595
1596 /* Check that we didn't miss any expected rows */
1597 if (matchCount != expectedResultCount)
1598 {
1599 printf("Col=%u Comp=%u BitNum=%u :"
1600 "Mismatch between expected(%u) and received(%u) result counts\n",
1601 col, comp, bitNum, expectedResultCount, matchCount);
1602 myTrans->close();
1603 return NDBT_FAILED;
1604 }
1605
1606 if (myTrans->execute(NdbTransaction::Commit) != 0)
1607 APIERROR(myTrans->getNdbError());
1608
1609 myTrans->close();
1610
1611 scanCount++;
1612
1613 } // for bitNum
1614 } // for comparison
1615 } // for column
1616
1617 printf("Verified %u scans with bitfield ScanFilter conditions\n",
1618 scanCount);
1619
1620 return NDBT_OK;
1621 }
1622
1623
runTestScanFilterBit(NDBT_Context * ctx,NDBT_Step * step)1624 int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
1625 {
1626 /* Create table */
1627 Ndb *pNdb = GETNDB(step);
1628 pNdb->getDictionary()->dropTable(MYTAB2.getName());
1629 int ret = createTable(pNdb, &MYTAB2, false, true, 0);
1630 if(ret)
1631 return ret;
1632
1633 /* Populate with data */
1634 if (insertBitRows(pNdb) != NDBT_OK)
1635 return NDBT_FAILED;
1636
1637 /* Initial data check via scan */
1638 if (verifyBitData(pNdb) != NDBT_OK)
1639 return NDBT_FAILED;
1640
1641 /* Verify Bit ScanFilter correctness */
1642 if (verifyBitScanFilter(pNdb) != NDBT_OK)
1643 return NDBT_FAILED;
1644
1645 /* Drop table */
1646 pNdb->getDictionary()->dropTable(MYTAB2.getName());
1647
1648 return NDBT_OK;
1649 }
1650
1651
/*
 * Definition of the testScanFilter test suite.
 *
 * Two test cases are registered :
 *  - TEST_NAME           : the logical AND/OR/NAND/NOR filter tests,
 *                          followed by the maximum filter size and
 *                          constructor failure checks
 *  - "TestScanFilterBit" : ScanFilter against bitfield columns
 */
NDBT_TESTSUITE(testScanFilter);
TESTCASE(TEST_NAME,
"Scan table TABLE_NAME for the records which accord with \
 conditions of logical scan operations: AND/OR/NAND/NOR")
{
  /* Steps are listed in the order create, populate, test */
  INITIALIZER(runCreateTables);
  INITIALIZER(runPopulate);
  INITIALIZER(runScanRandomFilterTest);
  INITIALIZER(runMaxScanFilterSize);
  INITIALIZER(runScanFilterConstructorFail);
  /* Remove the tables created by runCreateTables */
  FINALIZER(runDropTables);
}

TESTCASE("TestScanFilterBit",
         "Test ScanFilter with bitfield columns")
{
  /* This step creates and drops its own table */
  INITIALIZER(runTestScanFilterBit);
}

NDBT_TESTSUITE_END(testScanFilter);
1672
1673
main(int argc,const char ** argv)1674 int main(int argc, const char** argv)
1675 {
1676 ndb_init();
1677
1678 Ndb_cluster_connection con;
1679 if(con.connect(12, 5, 1))
1680 {
1681 return NDBT_ProgramExit(NDBT_FAILED);
1682 }
1683
1684 NDBT_TESTSUITE_INSTANCE(testScanFilter);
1685 return testScanFilter.execute(argc, argv);
1686 }
1687