1 /*
2    Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NDBT.hpp>
26 #include <NDBT_Test.hpp>
27 
/*
* Print msg, the NDB error message and code of obj, and the source
* location to stderr, then terminate the process with exit(-1).
* obj: pointer to an object that provides getNdbError()
* msg: C string naming the step that failed
* There is deliberately no ';' after while (0): the caller's own ';'
* completes the statement, so the macro also works in an unbraced if/else.
*/
#define ERR_EXIT(obj, msg) \
do \
{ \
fprintf(stderr, "%s: %s (%d) in %s:%d\n", \
(msg), (obj)->getNdbError().message, (obj)->getNdbError().code, __FILE__, __LINE__); \
exit(-1); \
} \
while (0)
36 
/*
* Print an error code and message to stderr together with the source
* file and line of the call site. Does not terminate the process.
* code: integer error code (printed with %d)
* msg: C string error text (printed with %s)
* There is deliberately no ';' after while (0): the caller's own ';'
* completes the statement, so the macro also works in an unbraced if/else.
*/
#define PRINT_ERROR(code,msg) \
do \
{ \
fprintf(stderr, "Error in %s, line: %d, code: %d, msg: %s.\n", __FILE__, __LINE__, (code), (msg)); \
} \
while (0)
43 
/*
* Print the error number and error text of the MySQL handle `mysql`
* through PRINT_ERROR, then terminate the process with exit(-1).
* `mysql` must be an lvalue: the macro takes its address.
*/
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
/*
* Print error.code and error.message through PRINT_ERROR, then terminate
* the process with exit(-1). Code placed after an APIERROR call is
* therefore never reached.
*/
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

/* Name of this test program */
#define TEST_NAME "TestScanFilter"
/* Name of the table populated by runPopulate() and scanned by the filter tests */
#define TABLE_NAME "TABLE_SCAN"
53 
/* Column names of TABLE_NAME: the key column "id" followed by the six value columns */
const char *COL_NAME[] = {"id", "i", "j", "k", "l", "m", "n"};
/* Number of entries in COL_NAME (key column included) */
const char COL_LEN = 7;
/*
* Do not change TUPLE_NUM: the columns of TABLE_NAME are fixed.
* There are six value columns, 'i', 'j', 'k', 'l', 'm', 'n', and each one is equal to 0 or 1.
* Since each tuple should be unique in this case, TUPLE_NUM = 2 to the power of 6 = 64
*/
const int TUPLE_NUM = 1 << (COL_LEN - 1);

/*
* The maximum recursion level of a random scan filter. This parameter
* can be raised or lowered within the range 1 to 100; a larger
* value consumes more scan time.
*/
const int RECURSIVE_LEVEL = 10;

/* Size in bytes of the buffers that hold a random filter string */
const int MAX_STR_LEN = (RECURSIVE_LEVEL * (COL_LEN+1) * 4);

/*
* Each iteration is one test: it produces a random filter string,
* evaluates it once through a scan with the ndb api and once by
* calculation on the tuples' data, then compares the two results.
* If they are equal the test passed, otherwise it failed.
* Only when all TEST_NUM tests have passed can we believe
* the suite of test cases is okay.
* A larger TEST_NUM needs more time to test.
*/
const int TEST_NUM = 5000;
82 
83 
/*
* Table definition for TABLE_NAME: the Unsigned column "id" (the only
* entry constructed with the extra arguments 1, true) followed by the
* six Unsigned columns "i" .. "n". The names match COL_NAME.
*/
static
const
NDBT_Attribute MYTAB1Attribs[] = {
  NDBT_Attribute("id", NdbDictionary::Column::Unsigned, 1, true),
  NDBT_Attribute("i", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("j", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("k", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("l", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("m", NdbDictionary::Column::Unsigned),
  NDBT_Attribute("n", NdbDictionary::Column::Unsigned),
};
/* Table object for TABLE_NAME; the attribute count is derived from the array size */
static
const
NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
99 
/*
* Table definition for the bit-column table: the Unsigned column "id"
* followed by pairs of Bit columns of widths 1, 2, 7, 8, 15, 31, 32, 33,
* 63, 64, 65, 127 and 513. In each pair the "...bitnn" column passes
* false and the "...bitnu" column passes true as the last (_nullable)
* argument.
*/
static
const
NDBT_Attribute MYTAB2Attribs[] = {
  NDBT_Attribute("id",        NdbDictionary::Column::Unsigned, 1, true),
  //                                                         _pk    _nullable
  NDBT_Attribute("1bitnn",    NdbDictionary::Column::Bit, 1, false, false),
  NDBT_Attribute("1bitnu",    NdbDictionary::Column::Bit, 1, false, true),
  NDBT_Attribute("2bitnn",    NdbDictionary::Column::Bit, 2, false, false),
  NDBT_Attribute("2bitnu",    NdbDictionary::Column::Bit, 2, false, true),
  NDBT_Attribute("7bitnn",    NdbDictionary::Column::Bit, 7, false, false),
  NDBT_Attribute("7bitnu",    NdbDictionary::Column::Bit, 7, false, true),
  NDBT_Attribute("8bitnn",    NdbDictionary::Column::Bit, 8, false, false),
  NDBT_Attribute("8bitnu",    NdbDictionary::Column::Bit, 8, false, true),
  NDBT_Attribute("15bitnn",   NdbDictionary::Column::Bit, 15, false, false),
  NDBT_Attribute("15bitnu",   NdbDictionary::Column::Bit, 15, false, true),
  NDBT_Attribute("31bitnn",   NdbDictionary::Column::Bit, 31, false, false),
  NDBT_Attribute("31bitnu",   NdbDictionary::Column::Bit, 31, false, true),
  NDBT_Attribute("32bitnn",   NdbDictionary::Column::Bit, 32, false, false),
  NDBT_Attribute("32bitnu",   NdbDictionary::Column::Bit, 32, false, true),
  NDBT_Attribute("33bitnn",   NdbDictionary::Column::Bit, 33, false, false),
  NDBT_Attribute("33bitnu",   NdbDictionary::Column::Bit, 33, false, true),
  NDBT_Attribute("63bitnn",   NdbDictionary::Column::Bit, 63, false, false),
  NDBT_Attribute("63bitnu",   NdbDictionary::Column::Bit, 63, false, true),
  NDBT_Attribute("64bitnn",   NdbDictionary::Column::Bit, 64, false, false),
  NDBT_Attribute("64bitnu",   NdbDictionary::Column::Bit, 64, false, true),
  NDBT_Attribute("65bitnn",   NdbDictionary::Column::Bit, 65, false, false),
  NDBT_Attribute("65bitnu",   NdbDictionary::Column::Bit, 65, false, true),
  NDBT_Attribute("127bitnn",  NdbDictionary::Column::Bit, 127, false, false),
  NDBT_Attribute("127bitnu",  NdbDictionary::Column::Bit, 127, false, true),
  NDBT_Attribute("513bitnn",   NdbDictionary::Column::Bit, 513, false, false),
  NDBT_Attribute("513bitnu",   NdbDictionary::Column::Bit, 513, false, true)
};

/* Name of the bit-column table */
static const char* TABLE2_NAME= "MyTab2";

/* Table object for TABLE2_NAME; the attribute count is derived from the array size */
static
const
NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);

/* Number of columns in MYTAB2, the "id" column included */
static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
/* Width in bits of the widest Bit column defined above */
static const int MAX_BIT_WIDTH= 513;
/* One extra row for all bits == 0 */
static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
143 
createTable(Ndb * pNdb,const NdbDictionary::Table * tab,bool _temp,bool existsOk,NDBT_CreateTableHook f)144 int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp,
145 			 bool existsOk, NDBT_CreateTableHook f)
146 {
147   int r = 0;
148   do{
149     NdbDictionary::Table tmpTab(* tab);
150     tmpTab.setStoredTable(_temp ? 0 : 1);
151     if(f != 0 && f(pNdb, tmpTab, 0, NULL))
152     {
153       ndbout << "Failed to create table" << endl;
154       return NDBT_FAILED;
155     }
156     r = pNdb->getDictionary()->createTable(tmpTab);
157     if(r == -1){
158       if(!existsOk){
159 	ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
160 	break;
161       }
162       if(pNdb->getDictionary()->getNdbError().code != 721){
163 	ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
164 	break;
165       }
166       r = 0;
167     }
168   }while(false);
169 
170   return r;
171 }
172 
173 /*
174 * Function to produce the tuples' data
175 */
runPopulate(NDBT_Context * ctx,NDBT_Step * step)176 int runPopulate(NDBT_Context* ctx, NDBT_Step* step)
177 {
178   Ndb *myNdb = GETNDB(step);
179   const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
180   const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
181   if(myTable == NULL)
182     APIERROR(myDict->getNdbError());
183 
184   NdbTransaction* myTrans = myNdb->startTransaction();
185   if (myTrans == NULL)
186     APIERROR(myNdb->getNdbError());
187 
188   for(int num = 0; num < TUPLE_NUM; num++)
189   {
190     NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
191     if(myNdbOperation == NULL)
192     {
193       APIERROR(myTrans->getNdbError());
194     }
195 
196 /* the tuples' data in TABLE_NAME
197 +----+---+---+---+---+---+---+
198 | id | i | j | k | l | m | n |
199 +----+---+---+---+---+---+---+
200 |  0 | 0 | 0 | 0 | 0 | 0 | 0 |
201 |  1 | 0 | 0 | 0 | 0 | 0 | 1 |
202 |  2 | 0 | 0 | 0 | 0 | 1 | 0 |
203 |  3 | 0 | 0 | 0 | 0 | 1 | 1 |
204 |  4 | 0 | 0 | 0 | 1 | 0 | 0 |
205 |  5 | 0 | 0 | 0 | 1 | 0 | 1 |
206 |  6 | 0 | 0 | 0 | 1 | 1 | 0 |
207 |  7 | 0 | 0 | 0 | 1 | 1 | 1 |
208 |  8 | 0 | 0 | 1 | 0 | 0 | 0 |
209 |  9 | 0 | 0 | 1 | 0 | 0 | 1 |
210 | 10 | 0 | 0 | 1 | 0 | 1 | 0 |
211 | 11 | 0 | 0 | 1 | 0 | 1 | 1 |
212 | 12 | 0 | 0 | 1 | 1 | 0 | 0 |
213 | 13 | 0 | 0 | 1 | 1 | 0 | 1 |
214 | 14 | 0 | 0 | 1 | 1 | 1 | 0 |
215 | 15 | 0 | 0 | 1 | 1 | 1 | 1 |
216 | 16 | 0 | 1 | 0 | 0 | 0 | 0 |
217 | 17 | 0 | 1 | 0 | 0 | 0 | 1 |
218 | 18 | 0 | 1 | 0 | 0 | 1 | 0 |
219 | 19 | 0 | 1 | 0 | 0 | 1 | 1 |
220 | 20 | 0 | 1 | 0 | 1 | 0 | 0 |
221 | 21 | 0 | 1 | 0 | 1 | 0 | 1 |
222 | 22 | 0 | 1 | 0 | 1 | 1 | 0 |
223 | 23 | 0 | 1 | 0 | 1 | 1 | 1 |
224 | 24 | 0 | 1 | 1 | 0 | 0 | 0 |
225 | 25 | 0 | 1 | 1 | 0 | 0 | 1 |
226 | 26 | 0 | 1 | 1 | 0 | 1 | 0 |
227 | 27 | 0 | 1 | 1 | 0 | 1 | 1 |
228 | 28 | 0 | 1 | 1 | 1 | 0 | 0 |
229 | 29 | 0 | 1 | 1 | 1 | 0 | 1 |
230 | 30 | 0 | 1 | 1 | 1 | 1 | 0 |
231 | 31 | 0 | 1 | 1 | 1 | 1 | 1 |
232 | 32 | 1 | 0 | 0 | 0 | 0 | 0 |
233 | 33 | 1 | 0 | 0 | 0 | 0 | 1 |
234 | 34 | 1 | 0 | 0 | 0 | 1 | 0 |
235 | 35 | 1 | 0 | 0 | 0 | 1 | 1 |
236 | 36 | 1 | 0 | 0 | 1 | 0 | 0 |
237 | 37 | 1 | 0 | 0 | 1 | 0 | 1 |
238 | 38 | 1 | 0 | 0 | 1 | 1 | 0 |
239 | 39 | 1 | 0 | 0 | 1 | 1 | 1 |
240 | 40 | 1 | 0 | 1 | 0 | 0 | 0 |
241 | 41 | 1 | 0 | 1 | 0 | 0 | 1 |
242 | 42 | 1 | 0 | 1 | 0 | 1 | 0 |
243 | 43 | 1 | 0 | 1 | 0 | 1 | 1 |
244 | 44 | 1 | 0 | 1 | 1 | 0 | 0 |
245 | 45 | 1 | 0 | 1 | 1 | 0 | 1 |
246 | 46 | 1 | 0 | 1 | 1 | 1 | 0 |
247 | 47 | 1 | 0 | 1 | 1 | 1 | 1 |
248 | 48 | 1 | 1 | 0 | 0 | 0 | 0 |
249 | 49 | 1 | 1 | 0 | 0 | 0 | 1 |
250 | 50 | 1 | 1 | 0 | 0 | 1 | 0 |
251 | 51 | 1 | 1 | 0 | 0 | 1 | 1 |
252 | 52 | 1 | 1 | 0 | 1 | 0 | 0 |
253 | 53 | 1 | 1 | 0 | 1 | 0 | 1 |
254 | 54 | 1 | 1 | 0 | 1 | 1 | 0 |
255 | 55 | 1 | 1 | 0 | 1 | 1 | 1 |
256 | 56 | 1 | 1 | 1 | 0 | 0 | 0 |
257 | 57 | 1 | 1 | 1 | 0 | 0 | 1 |
258 | 58 | 1 | 1 | 1 | 0 | 1 | 0 |
259 | 59 | 1 | 1 | 1 | 0 | 1 | 1 |
260 | 60 | 1 | 1 | 1 | 1 | 0 | 0 |
261 | 61 | 1 | 1 | 1 | 1 | 0 | 1 |
262 | 62 | 1 | 1 | 1 | 1 | 1 | 0 |
263 | 63 | 1 | 1 | 1 | 1 | 1 | 1 |
264 +----+---+---+---+---+---+---+
265 */
266     myNdbOperation->insertTuple();
267     myNdbOperation->equal(COL_NAME[0], num);
268     for(int col = 1; col < COL_LEN; col++)
269     {
270       myNdbOperation->setValue(COL_NAME[col], (num>>(COL_LEN-1-col))&1);
271     }
272   }
273 
274   int check = myTrans->execute(NdbTransaction::Commit);
275 
276   myTrans->close();
277 
278   if (check == -1)
279     return NDBT_FAILED;
280   else
281     return NDBT_OK;
282 
283 }
284 
285 
286 
/*
* The operation letters used in random filter strings:
* a=AND, o=OR, A=NAND, O=NOR
*/
char op_string[] = "aoAO";
/*
* The names of the six value columns of the test table.
* change_col_order() permutes this array in place.
*/
char col_string[] = "ijklmn";
/* Number of operation letters in op_string */
const int op_len = strlen(op_string);
/* Number of column letters in col_string */
const int col_len = strlen(col_string);
297 
298 /*
299 * get a random op from "aoAO"
300 */
get_rand_op_ch(char * ch)301 int get_rand_op_ch(char *ch)
302 {
303   static unsigned int num = 0;
304   if(++num == 0)
305     num = 1;
306   srand(num*(unsigned int)time(NULL));
307   *ch = op_string[rand() % op_len];
308   return 1;
309 }
310 
311 /*
312 * get a random order form of "ijklmn" trough exchanging letter
313 */
change_col_order()314 void change_col_order()
315 {
316   int pos1,pos2;
317   char temp;
318   for (int i = 0; i < 10; i++)  //exchange for 10 times
319   {
320     srand((unsigned int)time(NULL)/(i+1));
321     pos1 = rand() % col_len;
322     srand((i+1)*(unsigned int)time(NULL));
323     pos2 = rand() % col_len;
324     if (pos1 == pos2)
325       continue;
326     temp = col_string[pos1];
327     col_string[pos1] = col_string[pos2];
328     col_string[pos2] = temp;
329   }
330 }
331 
332 /*
333 * get a random sub string of "ijklmn"
334 */
get_rand_col_str(char * str)335 int get_rand_col_str(char *str)
336 {
337   int len;
338   static unsigned int num = 0;
339   if(++num == 0)
340     num = 1;
341   srand(num*(unsigned int)time(NULL));
342   len = rand() % col_len + 1;
343   change_col_order();
344   BaseString::snprintf(str, len+1, "%s", col_string);  //len+1, including '\0'
345   return len;
346 }
347 
348 /*
349 * get a random string including operation and column
350 * eg, Alnikx
351 */
get_rand_op_str(char * str)352 int get_rand_op_str(char *str)
353 {
354   char temp[256];
355   int len1, len2, len;
356   len1 = get_rand_op_ch(temp);
357   len2 = get_rand_col_str(temp+len1);
358   len = len1 + len2;
359   temp[len] = 'x';
360   temp[len+1] = '\0'; // Add ending \0
361   BaseString::snprintf(str, len+1+1, "%s", temp);  //len+1, including '\0'
362   return len+1;
363 }
364 
365 /*
366 * replace a letter of source string with a new string
367 * e.g., source string: 'Aijkx', replace i with new string 'olmx'
368 * then source string is changed to 'Aolmxjkx'
369 * source: its format should be produced from get_rand_op_str()
370 * pos: range from 1 to strlen(source)-2
371 */
replace_a_to_str(char * source,int pos,char * newstr)372 int replace_a_to_str(char *source, int pos, char *newstr)
373 {
374   char temp[MAX_STR_LEN];
375   BaseString::snprintf(temp, pos+1, "%s", source);
376   BaseString::snprintf(temp+pos, strlen(newstr)+1, "%s", newstr);
377   BaseString::snprintf(temp+pos+strlen(newstr), strlen(source)-pos, "%s", source+pos+1);
378   BaseString::snprintf(source, strlen(temp)+1, "%s", temp);
379   return strlen(source);
380 }
381 
/*
* Tell whether ch is one of the operation letters 'a', 'A', 'o', 'O'.
*/
bool check_op(char ch)
{
  switch (ch)
  {
    case 'a':
    case 'A':
    case 'o':
    case 'O':
      return true;
    default:
      return false;
  }
}
392 
/*
* Tell whether ch is the end flag 'x' that closes a filter group.
*/
bool check_end(char ch)
{
  const char end_flag = 'x';
  return ch == end_flag;
}
400 
/*
* Tell whether ch is one of the column letters
* 'i', 'j', 'k', 'l', 'm', 'n'.
*/
bool check_col(char ch)
{
  static const char letters[6] = { 'i', 'j', 'k', 'l', 'm', 'n' };

  for (int idx = 0; idx < 6; idx++)
  {
    if (letters[idx] == ch)
      return true;
  }
  return false;
}
412 
413 /*
414 * To ensure we can get a random string with RECURSIVE_LEVEL,
415 * we need a position where can replace a letter with a new string.
416 */
get_rand_replace_pos(char * str,int len)417 int get_rand_replace_pos(char *str, int len)
418 {
419   int pos_op = 0;
420   int pos_x = 0;
421   int pos_col = 0;
422   int span = 0;
423   static int num = 0;
424   char temp;
425 
426   for(int i = 0; i < len; i++)
427   {
428     temp = str[i];
429     if(! check_end(temp))
430     {
431       if(check_op(temp))
432         pos_op = i;
433     }
434     else
435     {
436       pos_x = i;
437       break;
438     }
439   }
440 
441   if(++num == 0)
442     num = 1;
443 
444   span = pos_x - pos_op - 1;
445   if(span <= 1)
446   {
447     pos_col = pos_op + 1;
448   }
449   else
450   {
451     srand(num*(unsigned int)time(NULL));
452     pos_col = pos_op + rand() % span + 1;
453   }
454   return pos_col;
455 }
456 
457 /*
458 * Check whether the given random string is valid
459 * and applicable for this test case
460 */
check_random_str(char * str)461 bool check_random_str(char *str)
462 {
463   char *p;
464   int op_num = 0;
465   int end_num = 0;
466 
467   for(p = str; *p; p++)
468   {
469     bool tmp1 = false, tmp2 = false;
470     if ((tmp1 = check_op(*p)))
471       op_num++;
472     if ((tmp2 = check_end(*p)))
473       end_num++;
474     if (!(tmp1 || tmp2 || check_col(*p)))   //there are illegal letters
475       return false;
476   }
477 
478   if(op_num != end_num)     //begins are not equal to ends
479     return false;
480 
481   return true;
482 }
483 
484 /*
485 * Get a random string with RECURSIVE_LEVEL
486 */
get_rand_op_str_compound(char * str)487 void get_rand_op_str_compound(char *str)
488 {
489   char small_str[256];
490   int pos;
491   int tmp;
492   int level;
493   static int num = 0;
494 
495   if(++num == 0)
496     num = 1;
497 
498   srand(num*(unsigned int)time(NULL));
499   level = 1 + rand() % RECURSIVE_LEVEL;
500 
501   get_rand_op_str(str);
502 
503   for(int i = 0; i < level; i++)
504   {
505     get_rand_op_str(small_str);
506     tmp = strlen(small_str);
507     get_rand_op_str(small_str + tmp);   //get two operations
508     pos = get_rand_replace_pos(str, strlen(str));
509     replace_a_to_str(str, pos, small_str);
510   }
511 
512   //check the random string
513   if(!check_random_str(str))
514   {
515     fprintf(stderr, "Error random string! \n");
516     exit(-1);
517   }
518 }
519 
/*
* Map a column letter to its column id: 'i' -> 1, 'j' -> 2, ... 'n' -> 6.
*/
int get_column_id(char ch)
{
  const int offset_from_i = ch - 'i';
  return offset_from_i + 1;
}
527 
/*
* Check whether the given column of the given tuple is equal to 1.
* tuple_no: record NO., range from 0 to 63
* col_id: column id, range from 1 to 6
*/
bool check_col_equal_one(int tuple_no, int col_id)
{
  // Column col_id carries the bit of weight 2^(6 - col_id) of tuple_no.
  const int weight = 1 << (6 - col_id);
  return ((tuple_no / weight) % 2) != 0;
}
542 
/*
* Combine all elements of a bool array with AND.
* value: pointer to a bool array
* len: length of the bool array
* return: true when no element among the first len is false
*/
bool AND_op(bool *value, int len)
{
  int idx = 0;
  while (idx < len && value[idx])
    idx++;
  return idx >= len;
}
557 
/*
* Combine all elements of a bool array with OR.
* value: pointer to a bool array
* len: length of the bool array
* return: true when at least one of the first len elements is true
*/
bool OR_op(bool *value, int len)
{
  bool any_set = false;
  for (int idx = 0; idx < len && !any_set; idx++)
    any_set = value[idx];
  return any_set;
}
572 
573 /*
574 * get a result after all elements in the array with NAND
575 * value: pointer to a bool array
576 * len: length of the bool array
577 */
NAND_op(bool * value,int len)578 bool NAND_op(bool *value, int len)
579 {
580   return (! AND_op(value, len));
581 }
582 
583 /*
584 * get a result after all elements in the array with NOR
585 * value: pointer to a bool array
586 * len: length of the bool array
587 */
NOR_op(bool * value,int len)588 bool NOR_op(bool *value, int len)
589 {
590   return (! OR_op(value, len));
591 }
592 
593 /*
594 * AND/NAND/OR/NOR operation for a bool array
595 */
calculate_one_op(char op_type,bool * value,int len)596 bool calculate_one_op(char op_type, bool *value, int len)
597 {
598   switch(op_type)
599   {
600     case 'a':
601       return AND_op(value, len);
602       break;
603     case 'o':
604       return OR_op(value, len);
605       break;
606     case 'A':
607       return NAND_op(value, len);
608       break;
609     case 'O':
610       return NOR_op(value, len);
611       break;
612   }
613   return false;   //make gcc happy
614 }
615 
/*
* One entry of the operation stack used by check_one_tuple().
*/
typedef struct _stack_element
{
  char type;  /* operation letter: 'a', 'o', 'A' or 'O' */
  int num;    /* number of operands collected so far for this operation */
}stack_element;

/*
* stack_op, store info for AND,OR,NAND,NOR
* stack_col, store value of column(i,j,k,l,m,n) and temporary result for an operation
* Both are file-level scratch areas that check_one_tuple() refills on every call.
*/
stack_element stack_op[RECURSIVE_LEVEL * COL_LEN];
bool stack_col[RECURSIVE_LEVEL * COL_LEN * 2];
628 
629 /*
630 * check whether the given tuple is chosen by judgement condition
631 * tuple_no, the NO of tuple in TABLE_NAME, range from 0 to TUPLE_NUM
632 * str: a random string of scan opearation and condition
633 * len: length of str
634 */
check_one_tuple(int tuple_no,char * str,int len)635 bool check_one_tuple(int tuple_no, char *str, int len)
636 {
637   int pop_op = 0;
638   int pop_col = 0;
639   for(int i = 0; i < len; i++)
640   {
641     char letter = *(str + i);
642     if(check_op(letter))    //push
643     {
644       stack_op[pop_op].type = letter;
645       stack_op[pop_op].num = 0;
646       pop_op++;
647     }
648     if(check_col(letter))   //push
649     {
650       stack_col[pop_col] = check_col_equal_one(tuple_no, get_column_id(letter));
651       pop_col++;
652       stack_op[pop_op-1].num += 1;
653     }
654     if(check_end(letter))
655     {
656       if(pop_op <= 1)
657       {
658         return calculate_one_op(stack_op[pop_op-1].type,
659                                 stack_col,
660                                 stack_op[pop_op-1].num);
661       }
662       else
663       {
664         bool tmp1 = calculate_one_op(stack_op[pop_op-1].type,
665                                     stack_col + pop_col - stack_op[pop_op-1].num,
666                                     stack_op[pop_op-1].num);
667         pop_col -= stack_op[pop_op-1].num;    //pop
668         pop_op--;
669         stack_col[pop_col] = tmp1;    //push
670         pop_col++;
671         stack_op[pop_op-1].num += 1;
672       }
673     }
674   }
675   return false;   //make gcc happy
676 }
677 
678 /*
679 * get lists of tuples which match the scan condiction through calculating
680 * str: a random string of scan opearation and condition
681 */
check_all_tuples(char * str,bool * res)682 void check_all_tuples(char *str, bool *res)
683 {
684   for (int i = 0; i < TUPLE_NUM; i++)
685   {
686     if(check_one_tuple(i, str, strlen(str)))
687       res[i] = true;
688   }
689 }
690 
691 /*
692 * convert a letter to group number what ndbapi need
693 */
get_api_group(char op_name)694 NdbScanFilter::Group get_api_group(char op_name)
695 {
696   switch (op_name) {
697   case 'a': return NdbScanFilter::AND;
698   case 'o': return NdbScanFilter::OR;
699   case 'A': return NdbScanFilter::NAND;
700   case 'O': return NdbScanFilter::NOR;
701   default:
702     fprintf(stderr, "Invalid group name %c !\n", op_name);
703     exit(3);
704   }
705 }
706 
707 /*
708 * with ndbapi, call begin, eq/ne/lt/gt/le/ge..., end
709 */
call_ndbapi(char * str,NdbTransaction * transaction,NdbScanOperation * scan,NdbDictionary::Column const * col[])710 NdbScanFilter * call_ndbapi(char *str, NdbTransaction *transaction,
711   NdbScanOperation *scan, NdbDictionary::Column const *col[])
712 {
713   NdbScanFilter *scanfilter = new NdbScanFilter(scan);
714   char *p;
715 
716   for (p = str; *p; p++)
717   {
718     if(check_op(*p))
719     {
720        if(scanfilter->begin(get_api_group(*p)))
721           ERR_EXIT(transaction, "filter begin() failed");
722     }
723     if(check_col(*p))
724     {
725        if(scanfilter->eq(col[*p-'i'+1]->getColumnNo(), (Uint32)1))
726           ERR_EXIT(transaction, "filter eq() failed");
727     }
728     if(check_end(*p))
729     {
730       if(scanfilter->end())
731       {
732         NdbError err= scanfilter->getNdbError();
733         printf("Problem closing ScanFilter= %d\n", err.code);
734         ERR_EXIT(transaction, "filter end() failed");
735       }
736     }
737   }
738 
739   return scanfilter;
740 }
741 
742 /*
743 * get the tuples through ndbapi, and save the tuples NO.
744 * str: a random string of scan opearation and condition
745 */
ndbapi_tuples(Ndb * ndb,char * str,bool * res)746 void ndbapi_tuples(Ndb *ndb, char *str, bool *res)
747 {
748 	const NdbDictionary::Dictionary *dict  = ndb->getDictionary();
749 	if (!dict)
750     ERR_EXIT(ndb, "Can't get dict");
751 
752 	const NdbDictionary::Table *table = dict->getTable(TABLE_NAME);
753 	if (!table)
754     ERR_EXIT(dict, "Can't get table"TABLE_NAME);
755 
756 	const NdbDictionary::Column *col[COL_LEN];
757 
758   for(int ii = 0; ii < COL_LEN; ii++)
759   {
760     char tmp[128];
761     col[ii] = table->getColumn(COL_NAME[ii]);
762 	  if(!col[ii])
763     {
764       BaseString::snprintf(tmp, 128, "Can't get column %s", COL_NAME[ii]);
765       ERR_EXIT(dict, tmp);
766     }
767   }
768 
769   NdbTransaction *transaction;
770   NdbScanOperation *scan;
771   NdbScanFilter *filter;
772 
773   transaction = ndb->startTransaction();
774   if (!transaction)
775     ERR_EXIT(ndb, "Can't start transaction");
776 
777   scan = transaction->getNdbScanOperation(table);
778   if (!scan)
779     ERR_EXIT(transaction, "Can't get scan op");
780 
781   if (scan->readTuples(NdbOperation::LM_Exclusive))
782     ERR_EXIT(scan, "Can't set up read");
783 
784   NdbRecAttr *rec[COL_LEN];
785   for(int ii = 0; ii < COL_LEN; ii++)
786   {
787     char tmp[128];
788     rec[ii] = scan->getValue(COL_NAME[ii]);
789 	  if(!rec[ii])
790     {
791       BaseString::snprintf(tmp, 128, "Can't get rec of %s", COL_NAME[ii]);
792       ERR_EXIT(scan, tmp);
793     }
794   }
795 
796   filter = call_ndbapi(str, transaction, scan, col);
797 
798   if (transaction->execute(NdbTransaction::NoCommit))
799     ERR_EXIT(transaction, "Can't execute");
800 
801   int i,j,k,l,m,n;
802   while (scan->nextResult(true) == 0)
803   {
804     do
805     {
806       i = rec[1]->u_32_value();
807       j = rec[2]->u_32_value();
808       k = rec[3]->u_32_value();
809       l = rec[4]->u_32_value();
810       m = rec[5]->u_32_value();
811       n = rec[6]->u_32_value();
812       res[32*i+16*j+8*k+4*l+2*m+n] = true;
813     } while (scan->nextResult(false) == 0);
814   }
815 
816   delete filter;
817   transaction->close();
818 }
819 
820 /*
821 * compare the result between calculation and NDBAPI
822 * str: a random string of scan opearation and condition
823 * return: true stands for ndbapi ok, false stands for ndbapi failed
824 */
825 template class Vector<bool>;
compare_cal_ndb(char * str,Ndb * ndb)826 bool compare_cal_ndb(char *str, Ndb *ndb)
827 {
828   Vector<bool> res_cal;
829   Vector<bool> res_ndb;
830 
831   for(int i = 0; i < TUPLE_NUM; i++)
832   {
833     res_cal.push_back(false);
834     res_ndb.push_back(false);
835   }
836 
837   check_all_tuples(str, res_cal.getBase());
838   ndbapi_tuples(ndb, str, res_ndb.getBase());
839 
840   for(int i = 0; i < TUPLE_NUM; i++)
841   {
842     if(res_cal[i] != res_ndb[i])
843       return false;
844   }
845   return true;
846 }
847 
848 
runCreateTables(NDBT_Context * ctx,NDBT_Step * step)849 int runCreateTables(NDBT_Context* ctx, NDBT_Step* step)
850 {
851   Ndb *pNdb = GETNDB(step);
852   pNdb->getDictionary()->dropTable(MYTAB1.getName());
853   int ret = createTable(pNdb, &MYTAB1, false, true, 0);
854   if(ret)
855     return ret;
856   return NDBT_OK;
857 }
858 
859 
runDropTables(NDBT_Context * ctx,NDBT_Step * step)860 int runDropTables(NDBT_Context* ctx, NDBT_Step* step)
861 {
862  int ret = GETNDB(step)->getDictionary()->dropTable(MYTAB1.getName());
863  if(ret == -1)
864    return NDBT_FAILED;
865 
866   return NDBT_OK;
867 }
868 
runScanRandomFilterTest(NDBT_Context * ctx,NDBT_Step * step)869 int runScanRandomFilterTest(NDBT_Context* ctx, NDBT_Step* step)
870 {
871   char random_str[MAX_STR_LEN];
872   Ndb *myNdb = GETNDB(step);
873 
874   for(int i = 0; i < TEST_NUM; i++)
875   {
876     get_rand_op_str_compound(random_str);
877     if( !compare_cal_ndb(random_str, myNdb))
878       return NDBT_FAILED;
879   }
880 
881   return NDBT_OK;
882 }
883 
runMaxScanFilterSize(NDBT_Context * ctx,NDBT_Step * step)884 int runMaxScanFilterSize(NDBT_Context* ctx, NDBT_Step* step)
885 {
886   /* This testcase uses the ScanFilter methods to build a large
887    * scanFilter, checking that ScanFilter building fails
888    * at the expected point, with the correct error message
889    */
890   const Uint32 MaxLength= NDB_MAX_SCANFILTER_SIZE_IN_WORDS;
891 
892   const Uint32 InstructionWordsPerEq= 3;
893 
894   const Uint32 MaxEqsInScanFilter= MaxLength/InstructionWordsPerEq;
895 
896   Ndb *myNdb = GETNDB(step);
897   const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
898   const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
899   if(myTable == NULL)
900     APIERROR(myDict->getNdbError());
901 
902   NdbInterpretedCode ic(myTable);
903 
904   NdbScanFilter sf(&ic);
905 
906   if (sf.begin()) // And group
907   {
908     ndbout << "Bad rc from begin\n";
909     ndbout << sf.getNdbError() << "\n";
910     return NDBT_FAILED;
911   }
912 
913   Uint32 loop=0;
914 
915   for (;loop < MaxEqsInScanFilter; loop++)
916   {
917     if (sf.eq(0u, 10u))
918     {
919       ndbout << "Bad rc from eq at loop " << loop << "\n";
920       ndbout << sf.getNdbError() << "\n";
921       return NDBT_FAILED;
922     }
923   }
924 
925   if (! sf.eq(0u, 10u))
926   {
927     ndbout << "Expected ScanFilter instruction addition to fail after"
928            << MaxEqsInScanFilter << "iterations, but it didn't\n";
929     return NDBT_FAILED;
930   }
931 
932   NdbError err=sf.getNdbError();
933 
934   if (err.code != 4294)
935   {
936     ndbout << "Expected to get error code 4294, but instead got " << err.code << "\n";
937     return NDBT_FAILED;
938   }
939 
940   return NDBT_OK;
941 }
942 
943 
runScanFilterConstructorFail(NDBT_Context * ctx,NDBT_Step * step)944 int runScanFilterConstructorFail(NDBT_Context* ctx, NDBT_Step* step)
945 {
946   /* We test that failures in the ScanFilter constructor can be
947    * detected by the various ScanFilter methods without
948    * issues
949    */
950   Ndb *myNdb = GETNDB(step);
951   const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
952   const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
953   if(myTable == NULL)
954     APIERROR(myDict->getNdbError());
955 
956   NdbTransaction* trans=myNdb->startTransaction();
957 
958   if (trans == NULL)
959   {
960     APIERROR(trans->getNdbError());
961     return NDBT_FAILED;
962   }
963 
964   /* Create an NdbRecord scan operation */
965   const NdbScanOperation* tabScan=
966     trans->scanTable(myTable->getDefaultRecord());
967 
968   if (tabScan==NULL)
969   {
970     APIERROR(trans->getNdbError());
971     return NDBT_FAILED;
972   }
973 
974   /* Now we hackily try to add a ScanFilter after the operation
975    * is defined.  This will cause a failure within the
976    * constructor
977    */
978   NdbScanFilter brokenSf((NdbScanOperation*) tabScan);
979 
980   /* Scan operation should have an error */
981   if (tabScan->getNdbError().code != 4536)
982   {
983     ndbout << "Expected error 4536, had error " <<
984       tabScan->getNdbError().code << " instead" << endl;
985     return NDBT_FAILED;
986   }
987 
988   /* ScanFilter should have an error */
989   if (brokenSf.getNdbError().code != 4539)
990   {
991     ndbout  << "Expected error 4539, had error " <<
992       brokenSf.getNdbError().code << " instead" << endl;
993     return NDBT_FAILED;
994   }
995 
996   if (brokenSf.begin() != -1)
997   { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
998 
999   if (brokenSf.istrue() != -1)
1000   { ndbout << "Bad rc from istrue" << endl; return NDBT_FAILED; }
1001 
1002   if (brokenSf.isfalse() != -1)
1003   { ndbout << "Bad rc from isfalse" << endl; return NDBT_FAILED; }
1004 
1005   if (brokenSf.isnull(0) != -1)
1006   { ndbout << "Bad rc from isnull" << endl; return NDBT_FAILED; }
1007 
1008   if (brokenSf.isnotnull(0) != -1)
1009   { ndbout << "Bad rc from isnotnull" << endl; return NDBT_FAILED; }
1010 
1011   if (brokenSf.cmp(NdbScanFilter::COND_EQ, 0, NULL, 0) != -1)
1012   { ndbout << "Bad rc from cmp" << endl; return NDBT_FAILED; }
1013 
1014   if (brokenSf.end() != -1)
1015   { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
1016 
1017   trans->close();
1018 
1019   /* Now we check that we can define a ScanFilter before
1020    * calling readTuples() for a scan operation
1021    */
1022   trans= myNdb->startTransaction();
1023 
1024   if (trans == NULL)
1025   {
1026     APIERROR(trans->getNdbError());
1027     return NDBT_FAILED;
1028   }
1029 
1030   /* Get an old Api table scan operation */
1031   NdbScanOperation* tabScanOp=
1032     trans->getNdbScanOperation(myTable);
1033 
1034   if (tabScanOp==NULL)
1035   {
1036     APIERROR(trans->getNdbError());
1037     return NDBT_FAILED;
1038   }
1039 
1040   /* Attempt to define a ScanFilter before calling readTuples() */
1041   NdbScanFilter sf(tabScanOp);
1042 
1043   /* Should be no problem ... */
1044   if (sf.getNdbError().code != 0)
1045   { APIERROR(sf.getNdbError()); return NDBT_FAILED; };
1046 
1047 
1048   /* Ok, now attempt to define a ScanFilter against a primary key op */
1049   NdbOperation* pkOp= trans->getNdbOperation(myTable);
1050 
1051   if (pkOp == NULL)
1052   {
1053     APIERROR(trans->getNdbError());
1054     return NDBT_FAILED;
1055   }
1056 
1057   NdbScanFilter sf2(pkOp);
1058 
1059   if (sf2.getNdbError().code != 4539)
1060   {
1061     ndbout << "Error, expected 4539" << endl;
1062     APIERROR(sf2.getNdbError());
1063     return NDBT_FAILED;
1064   }
1065 
1066   return NDBT_OK;
1067 }
1068 
getBit(const Uint32 * bitMap,int bitNum)1069 bool getBit(const Uint32* bitMap,
1070             int bitNum)
1071 {
1072   return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0);
1073 }
1074 
setBit(Uint32 * bitMap,int bitMapByteSize,int bitNum)1075 void setBit(Uint32* bitMap, int bitMapByteSize,
1076             int bitNum)
1077 {
1078   assert(bitNum >= 0 && bitNum < (bitMapByteSize * 8));
1079   bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
1080 }
1081 
/* Conditions exercised by the bitfield ScanFilter test.
 *
 * verifyBitScanFilter() iterates over these by number and, for every
 * condition except COND_NULL and COND_NOTNULL, casts the number
 * straight to NdbScanFilter::BinaryCondition.  The explicit values
 * are therefore significant and must not be renumbered.
 */
enum TestConditions {
  /* Ordering and equality comparisons against a constant */
  COND_LE          = 0,
  COND_LT          = 1,
  COND_GE          = 2,
  COND_GT          = 3,
  COND_EQ          = 4,
  COND_NE          = 5,

  /* Null checks, applied via isnull() / isnotnull() */
  COND_NULL        = 6,
  COND_NOTNULL     = 7,

  /* Bitwise comparisons : (column AND mask) against mask or zero */
  COND_AND_EQ_MASK = 8,
  COND_AND_NE_MASK = 9,
  COND_AND_EQ_ZERO = 10,
  COND_AND_NE_ZERO = 11
};
1096 
/* Index of the highest bit that should be set in a column of the
 * given width for the given row.  All lower bits are set as well.
 *
 * The result cycles through (colBitWidth + 1) patterns as rowId
 * increases :
 *   -1                : no bits set
 *    0                : bit 0 set
 *    1                : bits 0 and 1 set
 *    ...
 *    colBitWidth - 1  : every bit set
 */
int getExpectedBitsSet(int rowId,
                       int colBitWidth)
{
  const int numPatterns= colBitWidth + 1;
  const int patternIdx= rowId % numPatterns;
  return patternIdx - 1;
}
1108 
/* Decide whether a nullable column of the given width holds NULL in
 * the given row.  One in every thirteen (rowId + colBitWidth) values
 * is NULL, so columns of different widths are NULL in different rows.
 */
bool isNullValue(int rowId,
                 int colBitWidth)
{
  const int nullPeriod= 13;
  const int phase= (rowId + colBitWidth) % nullPeriod;
  return (phase == 0);
}
1115 
/* Choose how the filter mask for a given bitNum is varied.
 *
 * invert : set to true when (bitNum % 5) == 3, false otherwise
 * offset : set to bitNum / 2 when (bitNum % 7) == 6, zero otherwise
 *
 * Both outputs are always written.
 */
void getBitfieldVariants(int bitNum, int& offset, bool& invert)
{
  const bool invertMask= ((bitNum % 5) == 3);
  const bool shiftMask= ((bitNum % 7) == 6);

  invert= invertMask;
  offset= shiftMask ? (bitNum / 2) : 0;
}
1131 
1132 
isRowExpected(int rowId,TestConditions cond,int colBitWidth,int bitsSetInScanFilter,bool isNullable,const Uint32 * maskBuff)1133 bool isRowExpected(int rowId,
1134                    TestConditions cond,
1135                    int colBitWidth,
1136                    int bitsSetInScanFilter,
1137                    bool isNullable,
1138                    const Uint32* maskBuff)
1139 {
1140   if (isNullable && isNullValue(rowId, colBitWidth))
1141   {
1142     switch (cond) {
1143     case COND_LE:
1144       return true; // null < any value
1145     case COND_LT:
1146       return true; // null < any value
1147     case COND_GE:
1148       return false; // null < any value
1149     case COND_GT:
1150       return false; // null < any value
1151     case COND_EQ:
1152       return false; // null != any value
1153     case COND_NE:
1154       return true; // null != any value
1155     case COND_NULL:
1156       return true; // null is null
1157     case COND_NOTNULL:
1158       return false;
1159     case COND_AND_EQ_MASK:
1160       return false; // NULL AND MASK != MASK
1161     case COND_AND_NE_MASK:
1162       return true; // NULL AND MASK != MASK
1163     case COND_AND_EQ_ZERO:
1164       return false; // NULL AND MASK != 0
1165     case COND_AND_NE_ZERO:
1166       return true; // NULL AND MASK != 0
1167     default:
1168       printf("isRowExpected given bad condition : %u\n",
1169              cond);
1170       return false;
1171     }
1172   }
1173   else
1174   {
1175     /* Not a null value */
1176     int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
1177 
1178     int offset= 0;
1179     bool invert= false;
1180 
1181     getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
1182 
1183     switch (cond) {
1184     case COND_LE:
1185       return expectedBitsSet <= bitsSetInScanFilter;
1186     case COND_LT:
1187       return expectedBitsSet <  bitsSetInScanFilter;
1188     case COND_GE:
1189       return expectedBitsSet >= bitsSetInScanFilter;
1190     case COND_GT:
1191       return expectedBitsSet >  bitsSetInScanFilter;
1192     case COND_EQ:
1193       return expectedBitsSet == bitsSetInScanFilter;
1194     case COND_NE:
1195       return expectedBitsSet != bitsSetInScanFilter;
1196     case COND_NULL:
1197       return false;
1198     case COND_NOTNULL:
1199       return true;
1200     case COND_AND_EQ_MASK:
1201     case COND_AND_NE_MASK:
1202     {
1203       bool result= true;
1204       /* Compare data AND mask to the mask buff */
1205       for (int idx=0; idx < colBitWidth; idx++)
1206       {
1207         bool bitVal= (expectedBitsSet >= 0)?
1208           (expectedBitsSet >= idx): false;
1209         bool maskVal= getBit(maskBuff, idx);
1210         if ((bitVal & maskVal) != maskVal)
1211         {
1212           /* Difference to mask, know result */
1213           result= false;
1214           break;
1215         }
1216       }
1217 
1218       /* Invert result for NE condition */
1219       return (result ^ (cond == COND_AND_NE_MASK));
1220     }
1221     case COND_AND_EQ_ZERO:
1222     case COND_AND_NE_ZERO:
1223     {
1224       bool result= true;
1225       /* Compare data AND mask to zero */
1226       for (int idx=0; idx < colBitWidth; idx++)
1227       {
1228         bool bitVal= (expectedBitsSet >= 0)?
1229           (expectedBitsSet >= idx): false;
1230         bool maskVal= getBit(maskBuff, idx);
1231         if ((bitVal & maskVal) != 0)
1232         {
1233           /* Difference to 0, know result */
1234           result= false;
1235           break;
1236         }
1237       }
1238       /* Invert result for NE condition */
1239       return (result ^ (cond == COND_AND_NE_ZERO));
1240     }
1241     default:
1242       printf("isRowExpected given bad condition : %u\n",
1243              cond);
1244       return false;
1245     }
1246   }
1247 }
1248 
insertBitRows(Ndb * pNdb)1249 int insertBitRows(Ndb* pNdb)
1250 {
1251   const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1252   const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1253   if(myTable == NULL)
1254     APIERROR(myDict->getNdbError());
1255 
1256   for (int i=0; i< TOTAL_ROWS; i++)
1257   {
1258     NdbTransaction* myTrans = pNdb->startTransaction();
1259     if (myTrans == NULL)
1260       APIERROR(pNdb->getNdbError());
1261 
1262     NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
1263     if (insertOp == NULL)
1264       APIERROR(pNdb->getNdbError());
1265 
1266     const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
1267     Uint32 buff[ buffSize ];
1268 
1269     if (insertOp->insertTuple() != 0)
1270       APIERROR(insertOp->getNdbError());
1271 
1272     if (insertOp->equal((Uint32)0, i) != 0) // Set id column
1273       APIERROR(insertOp->getNdbError());
1274 
1275     for (int col=1; col < myTable->getNoOfColumns(); col++)
1276     {
1277       const NdbDictionary::Column* c= myTable->getColumn(col);
1278       int colBitWidth= c->getLength();
1279       bool isNullable= c->getNullable();
1280 
1281       if (isNullable &&
1282           isNullValue(i, colBitWidth))
1283       {
1284         /* Set column value to NULL */
1285         if (insertOp->setValue(col, (char*) NULL) != 0)
1286           APIERROR(insertOp->getNdbError());
1287       }
1288       else
1289       {
1290         /* Set lowest bits in this column */
1291         memset(buff, 0, (4 * buffSize));
1292 
1293         int bitsToSet= getExpectedBitsSet(i, colBitWidth);
1294 
1295         if (bitsToSet >= 0)
1296         {
1297           for (int idx=0; idx <= bitsToSet; idx++)
1298             setBit(buff, sizeof(buff), idx);
1299         }
1300 
1301         if (insertOp->setValue(col, (char *)buff) != 0)
1302           APIERROR(insertOp->getNdbError());
1303       }
1304     }
1305 
1306     if (myTrans->execute(NdbTransaction::Commit) != 0)
1307     {
1308       APIERROR(myTrans->getNdbError());
1309     }
1310     myTrans->close();
1311   }
1312 
1313   printf("Inserted %u rows\n", TOTAL_ROWS);
1314 
1315   return NDBT_OK;
1316 }
1317 
verifyBitData(Ndb * pNdb)1318 int verifyBitData(Ndb* pNdb)
1319 {
1320   const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1321   const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1322   if(myTable == NULL)
1323     APIERROR(myDict->getNdbError());
1324 
1325   NdbTransaction* myTrans= pNdb->startTransaction();
1326   if (myTrans == NULL)
1327     APIERROR(pNdb->getNdbError());
1328 
1329   NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1330   if (scanOp == NULL)
1331     APIERROR(pNdb->getNdbError());
1332 
1333   if (scanOp->readTuples() != 0)
1334     APIERROR(scanOp->getNdbError());
1335 
1336   NdbRecAttr* results[ NUM_COLS ];
1337 
1338   for (int col=0; col< NUM_COLS; col++)
1339   {
1340     if ((results[col]= scanOp->getValue(col)) == NULL)
1341       APIERROR(scanOp->getNdbError());
1342   }
1343 
1344   if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1345     APIERROR(myTrans->getNdbError());
1346 
1347   for (int row=0; row < TOTAL_ROWS; row++)
1348   {
1349     if (scanOp->nextResult() != 0)
1350       APIERROR(scanOp->getNdbError());
1351 
1352     int rowId= results[0]->int32_value();
1353 
1354     for (int col=1; col < NUM_COLS; col++)
1355     {
1356       const NdbDictionary::Column* c= myTable->getColumn(col);
1357       bool isNullable= c->getNullable();
1358       int colBitWidth= c->getLength();
1359 
1360       if (isNullable &&
1361           isNullValue(rowId, colBitWidth))
1362       {
1363         if (!results[ col ] ->isNULL())
1364         {
1365           printf("Mismatch at result %d row %d, column %d, expected NULL\n",
1366                  row, rowId, col);
1367           myTrans->close();
1368           return NDBT_FAILED;
1369         }
1370       }
1371       else
1372       {
1373         /* Non null value, check it */
1374         int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
1375 
1376         const Uint32* val= (const Uint32 *) results[ col ]->aRef();
1377 
1378         for (int bitNum=0; bitNum < colBitWidth; bitNum++)
1379         {
1380           bool expectClear= (bitNum > expectedSetBits);
1381           bool isClear= ! getBit(val, bitNum);
1382           if (expectClear != isClear)
1383           {
1384             printf("Mismatch at result %d row %d, column %d, bit %d"
1385                    " expected %d \n",
1386                    row, rowId, col, bitNum, (expectClear)?0:1);
1387             myTrans->close();
1388             return NDBT_FAILED;
1389           }
1390         }
1391       }
1392     }
1393   }
1394 
1395   if (scanOp->nextResult() != 1)
1396   {
1397     printf("Too many rows returned\n");
1398     return NDBT_FAILED;
1399   }
1400 
1401   if (myTrans->execute(NdbTransaction::Commit) != 0)
1402     APIERROR(myTrans->getNdbError());
1403 
1404   myTrans->close();
1405 
1406   printf("Verified data for %u rows\n",
1407          TOTAL_ROWS);
1408 
1409   return NDBT_OK;
1410 }
1411 
verifyBitScanFilter(Ndb * pNdb)1412 int verifyBitScanFilter(Ndb* pNdb)
1413 {
1414   const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1415   const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1416   if(myTable == NULL)
1417     APIERROR(myDict->getNdbError());
1418 
1419   /* Perform scan with scanfilter for :
1420    *   - each column in the table
1421    *   - each supported comparison type
1422    *   - each potentially set bit in the column
1423    */
1424   int scanCount= 0;
1425   for (int col=1; col < NUM_COLS; col++)
1426   {
1427     const NdbDictionary::Column* c= myTable->getColumn(col);
1428     const int bitWidth= c->getLength();
1429     printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
1430            (c->getNullable())?"Nullable":"Non-null",
1431            col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
1432     for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
1433     {
1434       for (int bitNum=0; bitNum <= bitWidth; bitNum++)
1435       {
1436         /* Define scan */
1437         NdbTransaction* myTrans= pNdb->startTransaction();
1438         if (myTrans == NULL)
1439           APIERROR(pNdb->getNdbError());
1440 
1441         NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1442         if (scanOp == NULL)
1443           APIERROR(pNdb->getNdbError());
1444 
1445         if (scanOp->readTuples() != 0)
1446           APIERROR(scanOp->getNdbError());
1447 
1448         NdbRecAttr* ra;
1449         if ((ra= scanOp->getValue((Uint32)0)) == NULL)
1450           APIERROR(scanOp->getNdbError());
1451 
1452         /* Define ScanFilter */
1453         const int buffSize= (MAX_BIT_WIDTH + 31)/32;
1454         Uint32 buff[ buffSize ];
1455         memset(buff, 0, (4 * buffSize));
1456 
1457         /* Define constant value, with some variants for bitwise operators */
1458         bool invert= false;
1459         int offset= 0;
1460 
1461         switch (comp) {
1462         case COND_AND_EQ_MASK:
1463         case COND_AND_NE_MASK:
1464         case COND_AND_EQ_ZERO:
1465         case COND_AND_NE_ZERO:
1466           getBitfieldVariants(bitNum, offset, invert);
1467         default:
1468           ;
1469         }
1470 
1471         /* Set lower bitNum -1 bits
1472          * If bitNum == 0, set none
1473          */
1474         int bitsSetInFilter= bitNum-1;
1475 
1476         if ((bitsSetInFilter >= 0) ||
1477             invert)
1478         {
1479           for (int idx=0; idx < (32 * buffSize); idx++)
1480           {
1481             if ((idx >= offset) &&
1482                 (idx <= (offset + bitsSetInFilter)))
1483             {
1484               if (!invert)
1485                 setBit(buff, sizeof(buff), idx);
1486             }
1487             else
1488             {
1489               if (invert)
1490                 setBit(buff, sizeof(buff), idx);
1491             }
1492           }
1493         }
1494 
1495 
1496 
1497         NdbScanFilter sf(scanOp);
1498 
1499         if (sf.begin(NdbScanFilter::AND) != 0)
1500           APIERROR(sf.getNdbError());
1501 
1502         if ((comp != COND_NULL) &&
1503             (comp != COND_NOTNULL))
1504         {
1505           /* Operator with a constant */
1506           if (sf.cmp((NdbScanFilter::BinaryCondition)comp,
1507                      col,
1508                      (char*)buff) != 0)
1509             APIERROR(sf.getNdbError());
1510         }
1511         else
1512         {
1513           switch (comp) {
1514           case COND_NULL:
1515             if (sf.isnull(col) != 0)
1516               APIERROR(sf.getNdbError());
1517             break;
1518           case COND_NOTNULL:
1519             if (sf.isnotnull(col) != 0)
1520               APIERROR(sf.getNdbError());
1521             break;
1522           default:
1523             printf("Condition %u not supported\n", comp);
1524             return NDBT_FAILED;
1525           }
1526         }
1527 
1528         if (sf.end() != 0)
1529           APIERROR(sf.getNdbError());
1530 
1531         /* Calculate expected number of rows in result */
1532         const NdbDictionary::Column* c= myTable->getColumn(col);
1533         int colBitWidth= c->getLength();
1534         bool isNullable= c->getNullable();
1535 
1536         // printf("Determining expected rows\n");
1537         int expectedResultCount= 0;
1538         for (int i=0; i< TOTAL_ROWS; i++)
1539         {
1540           if (isRowExpected(i,
1541                             (TestConditions)comp,
1542                             colBitWidth,
1543                             bitsSetInFilter,
1544                             isNullable,
1545                             buff))
1546             expectedResultCount++;
1547         }
1548 
1549 
1550         /* Execute */
1551         if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1552           APIERROR(myTrans->getNdbError());
1553 
1554 
1555         /* Process results to ensure we got the expected rows back */
1556         int rc= 0;
1557         int count= 0;
1558         int matchCount= 0;
1559         // printf("Checking rows returned\n");
1560         while ((rc= scanOp->nextResult()) == 0)
1561         {
1562           int rowId= ra->int32_value();
1563           count++;
1564           /*
1565            * Check that this row was expected
1566            */
1567           if (isRowExpected(rowId,
1568                             (TestConditions)comp,
1569                             colBitWidth,
1570                             bitsSetInFilter,
1571                             isNullable,
1572                             buff))
1573           {
1574             matchCount++;
1575           }
1576           else
1577           {
1578             printf("Col=%u Comp=%u BitNum=%u row=%u : "
1579                    "Got row %u back which I did not expect\n",
1580                    col, comp, bitNum, count, rowId);
1581             myTrans->close();
1582             return NDBT_FAILED;
1583           }
1584         }
1585 
1586         if (rc != 1)
1587         {
1588           printf("Col=%u Comp=%u BitNum=%u :"
1589                  "nextResult failure %d\n", col, comp, bitNum, rc);
1590           APIERROR(myTrans->getNdbError());
1591         }
1592 
1593         //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
1594         //       col, comp, bitNum, expectedResultCount, matchCount);
1595 
1596         /* Check that we didn't miss any expected rows */
1597         if (matchCount != expectedResultCount)
1598         {
1599           printf("Col=%u Comp=%u BitNum=%u :"
1600                  "Mismatch between expected(%u) and received(%u) result counts\n",
1601                  col, comp, bitNum, expectedResultCount, matchCount);
1602           myTrans->close();
1603           return NDBT_FAILED;
1604         }
1605 
1606         if (myTrans->execute(NdbTransaction::Commit) != 0)
1607           APIERROR(myTrans->getNdbError());
1608 
1609         myTrans->close();
1610 
1611         scanCount++;
1612 
1613       } // for bitNum
1614     } // for comparison
1615   } // for column
1616 
1617   printf("Verified %u scans with bitfield ScanFilter conditions\n",
1618          scanCount);
1619 
1620   return NDBT_OK;
1621 }
1622 
1623 
runTestScanFilterBit(NDBT_Context * ctx,NDBT_Step * step)1624 int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
1625 {
1626   /* Create table */
1627   Ndb *pNdb = GETNDB(step);
1628   pNdb->getDictionary()->dropTable(MYTAB2.getName());
1629   int ret = createTable(pNdb, &MYTAB2, false, true, 0);
1630   if(ret)
1631     return ret;
1632 
1633   /* Populate with data */
1634   if (insertBitRows(pNdb) != NDBT_OK)
1635     return NDBT_FAILED;
1636 
1637   /* Initial data check via scan */
1638   if (verifyBitData(pNdb) != NDBT_OK)
1639     return NDBT_FAILED;
1640 
1641   /* Verify Bit ScanFilter correctness */
1642   if (verifyBitScanFilter(pNdb) != NDBT_OK)
1643     return NDBT_FAILED;
1644 
1645   /* Drop table */
1646   pNdb->getDictionary()->dropTable(MYTAB2.getName());
1647 
1648   return NDBT_OK;
1649 }
1650 
1651 
1652 NDBT_TESTSUITE(testScanFilter);
1653 TESTCASE(TEST_NAME,
1654 	 "Scan table TABLE_NAME for the records which accord with \
1655    conditions of logical scan operations: AND/OR/NAND/NOR")
1656 {
1657   INITIALIZER(runCreateTables);
1658   INITIALIZER(runPopulate);
1659   INITIALIZER(runScanRandomFilterTest);
1660   INITIALIZER(runMaxScanFilterSize);
1661   INITIALIZER(runScanFilterConstructorFail);
1662   FINALIZER(runDropTables);
1663 }
1664 
1665 TESTCASE("TestScanFilterBit",
1666          "Test ScanFilter with bitfield columns")
1667 {
1668   INITIALIZER(runTestScanFilterBit);
1669 }
1670 
1671 NDBT_TESTSUITE_END(testScanFilter);
1672 
1673 
main(int argc,const char ** argv)1674 int main(int argc, const char** argv)
1675 {
1676   ndb_init();
1677 
1678   Ndb_cluster_connection con;
1679   if(con.connect(12, 5, 1))
1680   {
1681     return NDBT_ProgramExit(NDBT_FAILED);
1682   }
1683 
1684   NDBT_TESTSUITE_INSTANCE(testScanFilter);
1685   return testScanFilter.execute(argc, argv);
1686 }
1687