1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbSqlUtil.hpp>
26 #include <ndb_version.h>
27 #include <math.h>
28 
29 /*
30  * Data types.  The entries must be in the numerical order.
31  */
32 
33 const NdbSqlUtil::Type
34 NdbSqlUtil::m_typeList[] = {
35   { // 0
36     Type::Undefined,
37     NULL,
38     NULL,
39     NULL
40   },
41   { // 1
42     Type::Tinyint,
43     cmpTinyint,
44     NULL,
45     NULL
46   },
47   { // 2
48     Type::Tinyunsigned,
49     cmpTinyunsigned,
50     NULL,
51     NULL
52   },
53   { // 3
54     Type::Smallint,
55     cmpSmallint,
56     NULL,
57     NULL
58   },
59   { // 4
60     Type::Smallunsigned,
61     cmpSmallunsigned,
62     NULL,
63     NULL
64   },
65   { // 5
66     Type::Mediumint,
67     cmpMediumint,
68     NULL,
69     NULL
70   },
71   { // 6
72     Type::Mediumunsigned,
73     cmpMediumunsigned,
74     NULL,
75     NULL
76   },
77   { // 7
78     Type::Int,
79     cmpInt,
80     NULL,
81     NULL
82   },
83   { // 8
84     Type::Unsigned,
85     cmpUnsigned,
86     NULL,
87     NULL
88   },
89   { // 9
90     Type::Bigint,
91     cmpBigint,
92     NULL,
93     NULL
94   },
95   { // 10
96     Type::Bigunsigned,
97     cmpBigunsigned,
98     NULL,
99     NULL
100   },
101   { // 11
102     Type::Float,
103     cmpFloat,
104     NULL,
105     NULL
106   },
107   { // 12
108     Type::Double,
109     cmpDouble,
110     NULL,
111     NULL
112   },
113   { // 13
114     Type::Olddecimal,
115     cmpOlddecimal,
116     NULL,
117     NULL
118   },
119   { // 14
120     Type::Char,
121     cmpChar,
122     likeChar,
123     NULL
124   },
125   { // 15
126     Type::Varchar,
127     cmpVarchar,
128     likeVarchar,
129     NULL
130   },
131   { // 16
132     Type::Binary,
133     cmpBinary,
134     likeBinary,
135     NULL
136   },
137   { // 17
138     Type::Varbinary,
139     cmpVarbinary,
140     likeVarbinary,
141     NULL
142   },
143   { // 18
144     Type::Datetime,
145     cmpDatetime,
146     NULL,
147     NULL
148   },
149   { // 19
150     Type::Date,
151     cmpDate,
152     NULL,
153     NULL
154   },
155   { // 20
156     Type::Blob,
157     NULL,
158     NULL,
159     NULL
160   },
161   { // 21
162     Type::Text,
163     NULL,
164     NULL,
165     NULL
166   },
167   { // 22
168     Type::Bit,
169     cmpBit,
170     NULL,
171     maskBit
172   },
173   { // 23
174     Type::Longvarchar,
175     cmpLongvarchar,
176     likeLongvarchar,
177     NULL
178   },
179   { // 24
180     Type::Longvarbinary,
181     cmpLongvarbinary,
182     likeLongvarbinary,
183     NULL
184   },
185   { // 25
186     Type::Time,
187     cmpTime,
188     NULL,
189     NULL
190   },
191   { // 26
192     Type::Year,
193     cmpYear,
194     NULL,
195     NULL
196   },
197   { // 27
198     Type::Timestamp,
199     cmpTimestamp,
200     NULL,
201     NULL
202   },
203   { // 28
204     Type::Olddecimalunsigned,
205     cmpOlddecimalunsigned,
206     NULL,
207     NULL
208   },
209   { // 29
210     Type::Decimal,
211     cmpDecimal,
212     NULL,
213     NULL
214   },
215   { // 30
216     Type::Decimalunsigned,
217     cmpDecimalunsigned,
218     NULL,
219     NULL
220   },
221   { // 31
222     Type::Time2,
223     cmpTime2,
224     NULL,
225     NULL
226   },
227   { // 32
228     Type::Datetime2,
229     cmpDatetime2,
230     NULL,
231     NULL
232   },
233   { // 33
234     Type::Timestamp2,
235     cmpTimestamp2,
236     NULL,
237     NULL
238   },
239 };
240 
241 const NdbSqlUtil::Type&
getType(Uint32 typeId)242 NdbSqlUtil::getType(Uint32 typeId)
243 {
244   if (typeId < sizeof(m_typeList) / sizeof(m_typeList[0]) &&
245       m_typeList[typeId].m_typeId != Type::Undefined) {
246     return m_typeList[typeId];
247   }
248   return m_typeList[Type::Undefined];
249 }
250 
251 /*
252  * Comparison functions.
253  */
254 
255 int
cmpTinyint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)256 NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
257 {
258   assert(info == 0 && n1 == 1 && n2 == 1);
259   Int8 v1, v2;
260   memcpy(&v1, p1, 1);
261   memcpy(&v2, p2, 1);
262   int w1 = (int)v1;
263   int w2 = (int)v2;
264   return w1 - w2;
265 }
266 
267 int
cmpTinyunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)268 NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
269 {
270   assert(info == 0 && n1 == 1 && n2 == 1);
271   Uint8 v1, v2;
272   memcpy(&v1, p1, 1);
273   memcpy(&v2, p2, 1);
274   int w1 = (int)v1;
275   int w2 = (int)v2;
276   return w1 - w2;
277 }
278 
279 int
cmpSmallint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)280 NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
281 {
282   assert(info == 0 && n1 == 2 && n2 == 2);
283   Int16 v1, v2;
284   memcpy(&v1, p1, 2);
285   memcpy(&v2, p2, 2);
286   int w1 = (int)v1;
287   int w2 = (int)v2;
288   return w1 - w2;
289 }
290 
291 int
cmpSmallunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)292 NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
293 {
294   assert(info == 0 && n1 == 2 && n2 == 2);
295   Uint16 v1, v2;
296   memcpy(&v1, p1, 2);
297   memcpy(&v2, p2, 2);
298   int w1 = (int)v1;
299   int w2 = (int)v2;
300   return w1 - w2;
301 }
302 
303 int
cmpMediumint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)304 NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
305 {
306   assert(info == 0 && n1 == 3 && n2 == 3);
307   uchar b1[4];
308   uchar b2[4];
309   memcpy(b1, p1, 3);
310   b1[3] = 0;
311   memcpy(b2, p2, 3);
312   b2[3] = 0;
313   int w1 = (int)sint3korr(b1);
314   int w2 = (int)sint3korr(b2);
315   return w1 - w2;
316 }
317 
318 int
cmpMediumunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)319 NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
320 {
321   assert(info == 0 && n1 == 3 && n2 == 3);
322   uchar b1[4];
323   uchar b2[4];
324   memcpy(b1, p1, 3);
325   b1[3] = 0;
326   memcpy(b2, p2, 3);
327   b2[3] = 0;
328   int w1 = (int)uint3korr(b1);
329   int w2 = (int)uint3korr(b2);
330   return w1 - w2;
331 }
332 
333 int
cmpInt(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)334 NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
335 {
336   assert(info == 0 && n1 == 4 && n2 == 4);
337   Int32 v1, v2;
338   memcpy(&v1, p1, 4);
339   memcpy(&v2, p2, 4);
340   if (v1 < v2)
341     return -1;
342   if (v1 > v2)
343     return +1;
344   return 0;
345 }
346 
347 int
cmpUnsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)348 NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
349 {
350   assert(info == 0 && n1 == 4 && n2 == 4);
351   Uint32 v1, v2;
352   memcpy(&v1, p1, 4);
353   memcpy(&v2, p2, 4);
354   if (v1 < v2)
355     return -1;
356   if (v1 > v2)
357     return +1;
358   return 0;
359 }
360 
361 int
cmpBigint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)362 NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
363 {
364   assert(info == 0 && n1 == 8 && n2 == 8);
365   Int64 v1, v2;
366   memcpy(&v1, p1, 8);
367   memcpy(&v2, p2, 8);
368   if (v1 < v2)
369     return -1;
370   if (v1 > v2)
371     return +1;
372   return 0;
373 }
374 
375 int
cmpBigunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)376 NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
377 {
378   assert(info == 0 && n1 == 8 && n2 == 8);
379   Uint64 v1, v2;
380   memcpy(&v1, p1, 8);
381   memcpy(&v2, p2, 8);
382   if (v1 < v2)
383     return -1;
384   if (v1 > v2)
385     return +1;
386   return 0;
387 }
388 
389 int
cmpFloat(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)390 NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
391 {
392   assert(info == 0 && n1 == 4 && n2 == 4);
393   float v1, v2;
394   memcpy(&v1, p1, 4);
395   memcpy(&v2, p2, 4);
396   require(!isnan(v1) && !isnan(v2));
397   if (v1 < v2)
398     return -1;
399   if (v1 > v2)
400     return +1;
401   return 0;
402 }
403 
404 int
cmpDouble(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)405 NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
406 {
407   assert(info == 0 && n1 == 8 && n2 == 8);
408   double v1, v2;
409   memcpy(&v1, p1, 8);
410   memcpy(&v2, p2, 8);
411   require(!isnan(v1) && !isnan(v2));
412   if (v1 < v2)
413     return -1;
414   if (v1 > v2)
415     return +1;
416   return 0;
417 }
418 
419 int
cmpOlddecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)420 NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
421 {
422   assert(info == 0 && n1 == n2);
423   const uchar* v1 = (const uchar*)p1;
424   const uchar* v2 = (const uchar*)p2;
425   int sgn = +1;
426   unsigned i = 0;
427   while (i < n1) {
428     int c1 = v1[i];
429     int c2 = v2[i];
430     if (c1 == c2) {
431       if (c1 == '-')
432         sgn = -1;
433     } else if (c1 == '-') {
434       return -1;
435     } else if (c2 == '-') {
436       return +1;
437     } else if (c1 < c2) {
438       return -1 * sgn;
439     } else {
440       return +1 * sgn;
441     }
442     i++;
443   }
444   return 0;
445 }
446 
447 int
cmpOlddecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)448 NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
449 {
450   return cmpOlddecimal(info, p1, n1, p2, n2);
451 }
452 
453 int
cmpDecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)454 NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
455 {
456   return cmpBinary(info, p1, n1, p2, n2);
457 }
458 
459 int
cmpDecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)460 NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
461 {
462   return cmpBinary(info, p1, n1, p2, n2);
463 }
464 
465 int
cmpChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)466 NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
467 {
468   // allow different lengths
469   assert(info != 0);
470   const uchar* v1 = (const uchar*)p1;
471   const uchar* v2 = (const uchar*)p2;
472   CHARSET_INFO* cs = (CHARSET_INFO*)info;
473   // compare with space padding
474   int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
475   return k;
476 }
477 
478 int
cmpVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)479 NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
480 {
481   assert(info != 0);
482   const uint lb = 1;
483   const uchar* v1 = (const uchar*)p1;
484   const uchar* v2 = (const uchar*)p2;
485   uint m1 = v1[0];
486   uint m2 = v2[0];
487   require(lb + m1 <= n1 && lb + m2 <= n2);
488   CHARSET_INFO* cs = (CHARSET_INFO*)info;
489   // compare with space padding
490   int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
491   return k;
492 }
493 
494 int
cmpBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)495 NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
496 {
497   // allow different lengths
498   assert(info == 0);
499   const uchar* v1 = (const uchar*)p1;
500   const uchar* v2 = (const uchar*)p2;
501   int k = 0;
502   if (n1 < n2) {
503     k = memcmp(v1, v2, n1);
504     if (k == 0)
505       k = -1;
506   } else if (n1 > n2) {
507     k = memcmp(v1, v2, n2);
508     if (k == 0)
509       k = +1;
510   } else {
511     k = memcmp(v1, v2, n1);
512   }
513   return k;
514 }
515 
516 int
cmpVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)517 NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
518 {
519   assert(info == 0);
520   const uint lb = 1;
521   const uchar* v1 = (const uchar*)p1;
522   const uchar* v2 = (const uchar*)p2;
523   uint m1 = v1[0];
524   uint m2 = v2[0];
525   require(lb + m1 <= n1 && lb + m2 <= n2);
526   int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
527   return k;
528 }
529 
530 int
cmpDatetime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)531 NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
532 {
533   assert(info == 0 && n1 == 8 && n2 == 8);
534   Int64 v1, v2;
535   memcpy(&v1, p1, sizeof(Int64));
536   memcpy(&v2, p2, sizeof(Int64));
537   if (v1 < v2)
538     return -1;
539   if (v1 > v2)
540     return +1;
541   return 0;
542 }
543 
544 int
cmpDate(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)545 NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
546 {
547   assert(info == 0 && n1 == 3 && n2 == 3);
548   uchar b1[4];
549   uchar b2[4];
550   memcpy(b1, p1, 3);
551   b1[3] = 0;
552   memcpy(b2, p2, 3);
553   b2[3] = 0;
554   // from Field_newdate::val_int
555   int w1 = (int)uint3korr(b1);
556   int w2 = (int)uint3korr(b2);
557   return w1 - w2;
558 }
559 
560 // not supported
561 int
cmpBlob(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)562 NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
563 {
564   assert(false);
565   return 0;
566 }
567 
568 // not supported
569 int
cmpText(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)570 NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
571 {
572   assert(false);
573   return 0;
574 }
575 
576 int
cmpBit(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)577 NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
578 {
579   /* Bitfields are stored as 32-bit words
580    * This means that a byte-by-byte comparison will not work on all platforms
581    * We do a word-wise comparison of the significant bytes.
582    * It is assumed that insignificant bits (but not bytes) are zeroed in the
583    * passed values.
584    */
585   const Uint32 bytes= MIN(n1, n2);
586   Uint32 words= (bytes + 3) >> 2;
587 
588   /* Don't expect either value to be length zero */
589   assert(words);
590 
591   /* Check ptr alignment */
592   if (unlikely(((((UintPtr)p1) & 3) != 0) ||
593                ((((UintPtr)p2) & 3) != 0)))
594   {
595     Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ];
596     Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ];
597     memcpy(copyP1, p1, words << 2);
598     memcpy(copyP2, p2, words << 2);
599 
600     return cmpBit(info, copyP1, bytes, copyP2, bytes);
601   }
602 
603   const Uint32* wp1= (const Uint32*) p1;
604   const Uint32* wp2= (const Uint32*) p2;
605   while (--words)
606   {
607     if (*wp1 < *wp2)
608       return -1;
609     if (*(wp1++) > *(wp2++))
610       return 1;
611   }
612 
613   /* For the last word, we mask out any insignificant bytes */
614   const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
615   const Uint32 mask= sigBytes?
616     (1 << (sigBytes *8)) -1 :
617     ~0;
618   const Uint32 lastWord1= *wp1 & mask;
619   const Uint32 lastWord2= *wp2 & mask;
620 
621   if (lastWord1 < lastWord2)
622     return -1;
623   if (lastWord1 > lastWord2)
624     return 1;
625 
626   return 0;
627 }
628 
629 
630 int
cmpTime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)631 NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
632 {
633   assert(info == 0 && n1 == 3 && n2 == 3);
634   uchar b1[4];
635   uchar b2[4];
636   memcpy(b1, p1, 3);
637   b1[3] = 0;
638   memcpy(b2, p2, 3);
639   b2[3] = 0;
640   // from Field_time::val_int
641   int j1 = (int)sint3korr(b1);
642   int j2 = (int)sint3korr(b2);
643   if (j1 < j2)
644     return -1;
645   if (j1 > j2)
646     return +1;
647   return 0;
648 }
649 
650 // not yet
651 
652 int
cmpLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)653 NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
654 {
655   assert(info != 0);
656   const uint lb = 2;
657   const uchar* v1 = (const uchar*)p1;
658   const uchar* v2 = (const uchar*)p2;
659   uint m1 = v1[0] | (v1[1] << 8);
660   uint m2 = v2[0] | (v2[1] << 8);
661   require(lb + m1 <= n1 && lb + m2 <= n2);
662   CHARSET_INFO* cs = (CHARSET_INFO*)info;
663   // compare with space padding
664   int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
665   return k;
666 }
667 
668 int
cmpLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)669 NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
670 {
671   assert(info == 0);
672   const uint lb = 2;
673   const uchar* v1 = (const uchar*)p1;
674   const uchar* v2 = (const uchar*)p2;
675   uint m1 = v1[0] | (v1[1] << 8);
676   uint m2 = v2[0] | (v2[1] << 8);
677   require(lb + m1 <= n1 && lb + m2 <= n2);
678   int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
679   return k;
680 }
681 
682 int
cmpYear(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)683 NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
684 {
685   assert(info == 0 && n1 == 1 && n2 == 1);
686   Uint8 v1, v2;
687   memcpy(&v1, p1, 1);
688   memcpy(&v2, p2, 1);
689   int w1 = (int)v1;
690   int w2 = (int)v2;
691   return w1 - w2;
692 }
693 
694 int
cmpTimestamp(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)695 NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
696 {
697   assert(info == 0 && n1 == 4 && n2 == 4);
698   Uint32 v1, v2;
699   memcpy(&v1, p1, 4);
700   memcpy(&v2, p2, 4);
701   if (v1 < v2)
702     return -1;
703   if (v1 > v2)
704     return +1;
705   return 0;
706 }
707 
708 // times with fractional seconds are big-endian binary-comparable
709 
710 int
cmpTime2(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)711 NdbSqlUtil::cmpTime2(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
712 {
713   return cmpBinary(info, p1, n1, p2, n2);
714 }
715 
716 int
cmpDatetime2(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)717 NdbSqlUtil::cmpDatetime2(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
718 {
719   return cmpBinary(info, p1, n1, p2, n2);
720 }
721 
722 int
cmpTimestamp2(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)723 NdbSqlUtil::cmpTimestamp2(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
724 {
725   return cmpBinary(info, p1, n1, p2, n2);
726 }
727 
728 // like
729 
730 static const int ndb_wild_prefix = '\\';
731 static const int ndb_wild_one = '_';
732 static const int ndb_wild_many = '%';
733 
734 int
likeChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)735 NdbSqlUtil::likeChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
736 {
737   const char* v1 = (const char*)p1;
738   const char* v2 = (const char*)p2;
739   CHARSET_INFO* cs = (CHARSET_INFO*)(info);
740   // strip end spaces to match (incorrect) MySQL behaviour
741   n1 = (unsigned)(*cs->cset->lengthsp)(cs, v1, n1);
742   int k = (*cs->coll->wildcmp)(cs, v1, v1 + n1, v2, v2 + n2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
743   return k == 0 ? 0 : +1;
744 }
745 
746 int
likeBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)747 NdbSqlUtil::likeBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
748 {
749   assert(info == 0);
750   return likeChar(&my_charset_bin, p1, n1, p2, n2);
751 }
752 
753 int
likeVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)754 NdbSqlUtil::likeVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
755 {
756   const unsigned lb = 1;
757   if (n1 >= lb) {
758     const uchar* v1 = (const uchar*)p1;
759     const uchar* v2 = (const uchar*)p2;
760     unsigned m1 = *v1;
761     unsigned m2 = n2;
762     if (lb + m1 <= n1) {
763       const char* w1 = (const char*)v1 + lb;
764       const char* w2 = (const char*)v2;
765       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
766       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
767       return k == 0 ? 0 : +1;
768     }
769   }
770   return -1;
771 }
772 
773 int
likeVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)774 NdbSqlUtil::likeVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
775 {
776   assert(info == 0);
777   return likeVarchar(&my_charset_bin, p1, n1, p2, n2);
778 }
779 
780 int
likeLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)781 NdbSqlUtil::likeLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
782 {
783   const unsigned lb = 2;
784   if (n1 >= lb) {
785     const uchar* v1 = (const uchar*)p1;
786     const uchar* v2 = (const uchar*)p2;
787     unsigned m1 = uint2korr(v1);
788     unsigned m2 = n2;
789     if (lb + m1 <= n1) {
790       const char* w1 = (const char*)v1 + lb;
791       const char* w2 = (const char*)v2;
792       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
793       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
794       return k == 0 ? 0 : +1;
795     }
796   }
797   return -1;
798 }
799 
800 int
likeLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)801 NdbSqlUtil::likeLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
802 {
803   assert(info == 0);
804   return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
805 }
806 
807 
808 // mask Functions
809 
810 int
maskBit(const void * data,unsigned dataLen,const void * mask,unsigned maskLen,bool cmpZero)811 NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero)
812 {
813   /* Bitfields are stored in word oriented form, so we must compare them in that
814    * style as well
815    * It is assumed that insignificant bits (but not bytes) in the passed values
816    * are zeroed
817    */
818   const Uint32 bytes = MIN(dataLen, maskLen);
819   Uint32 words = (bytes + 3) >> 2;
820 
821   /* Don't expect either value to be length zero */
822   assert(words);
823 
824   /* Check ptr alignment */
825   if (unlikely(((((UintPtr)data) & 3) != 0) ||
826                ((((UintPtr)mask) & 3) != 0)))
827   {
828     Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ];
829     Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ];
830     memcpy(copydata, data, words << 2);
831     memcpy(copymask, mask, words << 2);
832 
833     return maskBit(data, bytes, mask, bytes, cmpZero);
834   }
835 
836   const Uint32* wdata= (const Uint32*) data;
837   const Uint32* wmask= (const Uint32*) mask;
838 
839   if (cmpZero)
840   {
841     while (--words)
842     {
843       if ((*(wdata++) & *(wmask++)) != 0)
844         return 1;
845     }
846 
847     /* For the last word, we mask out any insignificant bytes */
848     const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
849     const Uint32 comparisonMask= sigBytes?
850       (1 << (sigBytes *8)) -1 :
851       ~0;
852     const Uint32 lastDataWord= *wdata & comparisonMask;
853     const Uint32 lastMaskWord= *wmask & comparisonMask;
854 
855     if ((lastDataWord & lastMaskWord) != 0)
856       return 1;
857 
858     return 0;
859   }
860   else
861   {
862     while (--words)
863     {
864       if ((*(wdata++) & *wmask) != *wmask)
865         return 1;
866 
867       wmask++;
868     }
869 
870     /* For the last word, we mask out any insignificant bytes */
871     const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
872     const Uint32 comparisonMask= sigBytes?
873       (1 << (sigBytes *8)) -1 :
874       ~0;
875     const Uint32 lastDataWord= *wdata & comparisonMask;
876     const Uint32 lastMaskWord= *wmask & comparisonMask;
877 
878     if ((lastDataWord & lastMaskWord) != lastMaskWord)
879       return 1;
880 
881     return 0;
882   }
883 }
884 
885 
886 // check charset
887 
888 uint
check_column_for_pk(Uint32 typeId,const void * info)889 NdbSqlUtil::check_column_for_pk(Uint32 typeId, const void* info)
890 {
891   const Type& type = getType(typeId);
892   switch (type.m_typeId) {
893   case Type::Char:
894   case Type::Varchar:
895   case Type::Longvarchar:
896     {
897       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
898       if(cs != 0 &&
899          cs->cset != 0 &&
900          cs->coll != 0 &&
901          cs->coll->strnxfrm != 0 &&
902          cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
903         return 0;
904       else
905         return 743;
906     }
907     break;
908   case Type::Undefined:
909   case Type::Blob:
910   case Type::Text:
911   case Type::Bit:
912     break;
913   default:
914     return 0;
915   }
916   return 906;
917 }
918 
919 uint
check_column_for_hash_index(Uint32 typeId,const void * info)920 NdbSqlUtil::check_column_for_hash_index(Uint32 typeId, const void* info)
921 {
922   return check_column_for_pk(typeId, info);
923 }
924 
925 uint
check_column_for_ordered_index(Uint32 typeId,const void * info)926 NdbSqlUtil::check_column_for_ordered_index(Uint32 typeId, const void* info)
927 {
928   const Type& type = getType(typeId);
929   if (type.m_cmp == NULL)
930     return false;
931   switch (type.m_typeId) {
932   case Type::Char:
933   case Type::Varchar:
934   case Type::Longvarchar:
935     {
936       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
937       if (cs != 0 &&
938           cs->cset != 0 &&
939           cs->coll != 0 &&
940           cs->coll->strnxfrm != 0 &&
941           cs->coll->strnncollsp != 0 &&
942           cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
943         return 0;
944       else
945         return 743;
946     }
947     break;
948   case Type::Undefined:
949   case Type::Blob:
950   case Type::Text:
951   case Type::Bit:       // can be fixed
952     break;
953   default:
954     return 0;
955   }
956   return 906;
957 }
958 
959 // utilities
960 
961 bool
get_var_length(Uint32 typeId,const void * p,unsigned attrlen,Uint32 & lb,Uint32 & len)962 NdbSqlUtil::get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len)
963 {
964   const unsigned char* const src = (const unsigned char*)p;
965   switch (typeId) {
966   case NdbSqlUtil::Type::Varchar:
967   case NdbSqlUtil::Type::Varbinary:
968     lb = 1;
969     if (attrlen >= lb) {
970       len = src[0];
971       if (attrlen >= lb + len)
972         return true;
973     }
974     break;
975   case NdbSqlUtil::Type::Longvarchar:
976   case NdbSqlUtil::Type::Longvarbinary:
977     lb = 2;
978     if (attrlen >= lb) {
979       len = src[0] + (src[1] << 8);
980       if (attrlen >= lb + len)
981         return true;
982     }
983     break;
984   default:
985     lb = 0;
986     len = attrlen;
987     return true;
988     break;
989   }
990   return false;
991 }
992 
993 
994 size_t
ndb_strnxfrm(struct charset_info_st * cs,uchar * dst,size_t dstlen,const uchar * src,size_t srclen)995 NdbSqlUtil::ndb_strnxfrm(struct charset_info_st * cs,
996                          uchar *dst, size_t dstlen,
997                          const uchar *src, size_t srclen)
998 {
999 #if NDB_MYSQL_VERSION_D < NDB_MAKE_VERSION(5,6,0)
1000   return (*cs->coll->strnxfrm)(cs, dst, dstlen, src, srclen);
1001 #else
1002   /*
1003     strnxfrm has got two new parameters in 5.6, we are using the
1004     defaults for those and can thus easily calculate them from
1005     existing params
1006   */
1007   return  (*cs->coll->strnxfrm)(cs, dst, dstlen, (uint)dstlen,
1008                                 src, srclen, MY_STRXFRM_PAD_WITH_SPACE);
1009 #endif
1010 }
1011 
1012 // workaround
1013 
1014 int
strnxfrm_bug7284(CHARSET_INFO * cs,unsigned char * dst,unsigned dstLen,const unsigned char * src,unsigned srcLen)1015 NdbSqlUtil::strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen)
1016 {
1017   unsigned char nsp[20]; // native space char
1018   unsigned char xsp[20]; // strxfrm-ed space char
1019 #ifdef VM_TRACE
1020   memset(nsp, 0x1f, sizeof(nsp));
1021   memset(xsp, 0x1f, sizeof(xsp));
1022 #endif
1023   // convert from unicode codepoint for space
1024   int n1 = (*cs->cset->wc_mb)(cs, (my_wc_t)0x20, nsp, nsp + sizeof(nsp));
1025   if (n1 <= 0)
1026     return -1;
1027   // strxfrm to binary
1028   int n2 = (int)ndb_strnxfrm(cs, xsp, sizeof(xsp), nsp, n1);
1029   if (n2 <= 0)
1030     return -1;
1031   // XXX bug workaround - strnxfrm may not write full string
1032   memset(dst, 0x0, dstLen);
1033   // strxfrm argument string - returns no error indication
1034   int n3 = (int)ndb_strnxfrm(cs, dst, dstLen, src, srcLen);
1035   // pad with strxfrm-ed space chars
1036   int n4 = n3;
1037   while (n4 < (int)dstLen) {
1038     dst[n4] = xsp[(n4 - n3) % n2];
1039     n4++;
1040   }
1041   // no check for partial last
1042   return dstLen;
1043 }
1044 
1045 #if defined(WORDS_BIGENDIAN) || defined (VM_TRACE)
1046 
1047 static
determineParams(Uint32 typeId,Uint32 typeLog2Size,Uint32 arrayType,Uint32 arraySize,Uint32 dataByteSize,Uint32 & convSize,Uint32 & convLen)1048 void determineParams(Uint32 typeId,
1049                      Uint32 typeLog2Size,
1050                      Uint32 arrayType,
1051                      Uint32 arraySize,
1052                      Uint32 dataByteSize,
1053                      Uint32& convSize,
1054                      Uint32& convLen)
1055 {
1056   /* Some types need 'normal behaviour' over-ridden in
1057    * terms of endian-ness conversions
1058    */
1059   convSize = 0;
1060   convLen = 0;
1061   switch(typeId)
1062   {
1063   case NdbSqlUtil::Type::Datetime:
1064   {
1065     // Datetime is stored as 8x8, should be twiddled as 64 bit
1066     assert(typeLog2Size == 3);
1067     assert(arraySize == 8);
1068     assert(dataByteSize == 8);
1069     convSize = 64;
1070     convLen = 1;
1071     break;
1072   }
1073   case NdbSqlUtil::Type::Timestamp:
1074   {
1075     // Timestamp is stored as 4x8, should be twiddled as 32 bit
1076     assert(typeLog2Size == 3);
1077     assert(arraySize == 4);
1078     assert(dataByteSize == 4);
1079     convSize = 32;
1080     convLen = 1;
1081     break;
1082   }
1083   case NdbSqlUtil::Type::Bit:
1084   {
1085     // Bit is stored as bits, should be twiddled as 32 bit
1086     assert(typeLog2Size == 0);
1087     convSize = 32;
1088     convLen = (arraySize + 31)/32;
1089     break;
1090   }
1091   case NdbSqlUtil::Type::Blob:
1092   case NdbSqlUtil::Type::Text:
1093   {
1094     if (arrayType == NDB_ARRAYTYPE_FIXED)
1095     {
1096       // Length of fixed size blob which is stored in first 64 bit's
1097       // has to be twiddled, the remaining byte stream left as is
1098       assert(typeLog2Size == 3);
1099       assert(arraySize > 8);
1100       assert(dataByteSize > 8);
1101       convSize = 64;
1102       convLen = 1;
1103       break;
1104     }
1105     // Fall through for Blob v2
1106   }
1107   default:
1108     /* Default determined by meta-info */
1109     convSize = 1 << typeLog2Size;
1110     convLen = arraySize;
1111     break;
1112   }
1113 
1114   const Uint32 unitBytes = (convSize >> 3);
1115 
1116   if (dataByteSize < (unitBytes * convLen))
1117   {
1118     /* Actual data is shorter than expected, could
1119      * be VAR type or bad FIXED data (which some other
1120      * code should detect + handle)
1121      * We reduce convLen to avoid trampling
1122      */
1123     assert((dataByteSize % unitBytes) == 0);
1124     convLen = dataByteSize / unitBytes;
1125   }
1126 
1127   assert(convSize);
1128   assert(convLen);
1129   assert(dataByteSize >= (unitBytes * convLen));
1130 }
1131 
1132 static
doConvert(Uint32 convSize,Uint32 convLen,uchar * data)1133 void doConvert(Uint32 convSize,
1134                Uint32 convLen,
1135                uchar* data)
1136 {
1137   switch(convSize)
1138   {
1139   case 8:
1140     /* Nothing to swap */
1141     break;
1142   case 16:
1143   {
1144     Uint16* ptr = (Uint16*)data;
1145     for (Uint32 i = 0; i < convLen; i++){
1146       Uint16 val =
1147         ((*ptr & 0xFF00) >> 8) |
1148         ((*ptr & 0x00FF) << 8);
1149       *ptr = val;
1150       ptr++;
1151     }
1152     break;
1153   }
1154   case 32:
1155   {
1156     Uint32* ptr = (Uint32*)data;
1157     for (Uint32 i = 0; i < convLen; i++){
1158       Uint32 val =
1159         ((*ptr & 0xFF000000) >> 24) |
1160         ((*ptr & 0x00FF0000) >> 8)  |
1161         ((*ptr & 0x0000FF00) << 8)  |
1162         ((*ptr & 0x000000FF) << 24);
1163       *ptr = val;
1164       ptr++;
1165     }
1166     break;
1167   }
1168   case 64:
1169   {
1170     Uint64* ptr = (Uint64*)data;
1171     for (Uint32 i = 0; i < convLen; i++){
1172       Uint64 val =
1173         ((*ptr & (Uint64)0xFF00000000000000LL) >> 56) |
1174         ((*ptr & (Uint64)0x00FF000000000000LL) >> 40) |
1175         ((*ptr & (Uint64)0x0000FF0000000000LL) >> 24) |
1176         ((*ptr & (Uint64)0x000000FF00000000LL) >> 8)  |
1177         ((*ptr & (Uint64)0x00000000FF000000LL) << 8)  |
1178         ((*ptr & (Uint64)0x0000000000FF0000LL) << 24) |
1179         ((*ptr & (Uint64)0x000000000000FF00LL) << 40) |
1180         ((*ptr & (Uint64)0x00000000000000FFLL) << 56);
1181       *ptr = val;
1182       ptr++;
1183     }
1184     break;
1185   }
1186   default:
1187     abort();
1188     break;
1189   }
1190 }
1191 #endif
1192 
1193 /**
1194  * Convert attribute byte order if necessary
1195  */
1196 void
convertByteOrder(Uint32 typeId,Uint32 typeLog2Size,Uint32 arrayType,Uint32 arraySize,uchar * data,Uint32 dataByteSize)1197 NdbSqlUtil::convertByteOrder(Uint32 typeId,
1198                              Uint32 typeLog2Size,
1199                              Uint32 arrayType,
1200                              Uint32 arraySize,
1201                              uchar* data,
1202                              Uint32 dataByteSize)
1203 {
1204 #if defined(WORDS_BIGENDIAN) || defined (VM_TRACE)
1205   Uint32 convSize;
1206   Uint32 convLen;
1207   determineParams(typeId,
1208                   typeLog2Size,
1209                   arrayType,
1210                   arraySize,
1211                   dataByteSize,
1212                   convSize,
1213                   convLen);
1214 
1215   size_t mask = (((size_t) convSize) >> 3) -1; // Bottom 0,1,2,3 bits set
1216   bool aligned = (((size_t) data) & mask) == 0;
1217   uchar* dataPtr = data;
1218   const Uint32 bufSize = (MAX_TUPLE_SIZE_IN_WORDS + 1)/2;
1219   Uint64 alignedBuf[bufSize];
1220 
1221   if (!aligned)
1222   {
1223     assert(dataByteSize <= 4 * MAX_TUPLE_SIZE_IN_WORDS);
1224     memcpy(alignedBuf, data, dataByteSize);
1225     dataPtr = (uchar*) alignedBuf;
1226   }
1227 
1228   /* Now have convSize and convLen, do the conversion */
1229   doConvert(convSize,
1230             convLen,
1231             dataPtr);
1232 
1233 #ifdef VM_TRACE
1234 #ifndef WORDS_BIGENDIAN
1235   // VM trace on little-endian performs double-convert
1236   doConvert(convSize,
1237             convLen,
1238             dataPtr);
1239 #endif
1240 #endif
1241 
1242   if (!aligned)
1243   {
1244     memcpy(data, alignedBuf, dataByteSize);
1245   }
1246 #endif
1247 }
1248 
1249 // unpack and pack date/time types
1250 
1251 // Year
1252 
1253 void
unpack_year(Year & s,const uchar * d)1254 NdbSqlUtil::unpack_year(Year& s, const uchar* d)
1255 {
1256   s.year = (uint)(1900 + d[0]);
1257 }
1258 
1259 void
pack_year(const Year & s,uchar * d)1260 NdbSqlUtil::pack_year(const Year& s, uchar* d)
1261 {
1262   d[0] = (uchar)(s.year - 1900);
1263 }
1264 
1265 // Date
1266 
1267 void
unpack_date(Date & s,const uchar * d)1268 NdbSqlUtil::unpack_date(Date& s, const uchar* d)
1269 {
1270   uchar b[4];
1271   memcpy(b, d, 3);
1272   b[3] = 0;
1273   uint w = (uint)uint3korr(b);
1274   s.day = (w & 31);
1275   w >>= 5;
1276   s.month = (w & 15);
1277   w >>= 4;
1278   s.year = w;
1279 }
1280 
1281 void
pack_date(const Date & s,uchar * d)1282 NdbSqlUtil::pack_date(const Date& s, uchar* d)
1283 {
1284   uint w = 0;
1285   w |= s.year;
1286   w <<= 4;
1287   w |= s.month;
1288   w <<= 5;
1289   w |= s.day;
1290   int3store(d, w);
1291 }
1292 
1293 // Time
1294 
1295 void
unpack_time(Time & s,const uchar * d)1296 NdbSqlUtil::unpack_time(Time& s, const uchar* d)
1297 {
1298   uchar b[4];
1299   memcpy(b, d, 3);
1300   b[3] = 0;
1301   uint w = 0;
1302   int v = (int)sint3korr(b);
1303   if (v >= 0)
1304   {
1305     s.sign = 1;
1306     w = (uint)v;
1307   }
1308   else
1309   {
1310     s.sign = 0;
1311     w = (uint)(-v);
1312   }
1313   const uint f = (uint)100;
1314   s.second = (w % f);
1315   w /= f;
1316   s.minute = (w % f);
1317   w /= f;
1318   s.hour = w;
1319 }
1320 
1321 void
pack_time(const Time & s,uchar * d)1322 NdbSqlUtil::pack_time(const Time& s, uchar* d)
1323 {
1324   const uint f = (uint)100;
1325   uint w = 0;
1326   w += s.hour;
1327   w *= f;
1328   w += s.minute;
1329   w *= f;
1330   w += s.second;
1331   int v = 0;
1332   if (s.sign == 1)
1333   {
1334     v = (int)w;
1335   }
1336   else
1337   {
1338     v = (int)w;
1339     v = -v;
1340   }
1341   int3store(d, v);
1342 }
1343 
1344 // Datetime
1345 
1346 void
unpack_datetime(Datetime & s,const uchar * d)1347 NdbSqlUtil::unpack_datetime(Datetime& s, const uchar* d)
1348 {
1349   uint64 w;
1350   memcpy(&w, d, 8);
1351   const uint64 f = (uint64)100;
1352   s.second = (w % f);
1353   w /= f;
1354   s.minute = (w % f);
1355   w /= f;
1356   s.hour = (w % f);
1357   w /= f;
1358   s.day = (w % f);
1359   w /= f;
1360   s.month = (w % f);
1361   w /= f;
1362   s.year = (uint)w;
1363 }
1364 
1365 void
pack_datetime(const Datetime & s,uchar * d)1366 NdbSqlUtil::pack_datetime(const Datetime& s, uchar* d)
1367 {
1368   const uint64 f = (uint64)100;
1369   uint64 w = 0;
1370   w += s.year;
1371   w *= f;
1372   w += s.month;
1373   w *= f;
1374   w += s.day;
1375   w *= f;
1376   w += s.hour;
1377   w *= f;
1378   w += s.minute;
1379   w *= f;
1380   w += s.second;
1381   memcpy(d, &w, 8);
1382 }
1383 
1384 // Timestamp
1385 
1386 void
unpack_timestamp(Timestamp & s,const uchar * d)1387 NdbSqlUtil::unpack_timestamp(Timestamp& s, const uchar* d)
1388 {
1389   uint32 w;
1390   memcpy(&w, d, 4);
1391   s.second = (uint)w;
1392 }
1393 
1394 void
pack_timestamp(const Timestamp & s,uchar * d)1395 NdbSqlUtil::pack_timestamp(const Timestamp& s, uchar* d)
1396 {
1397   uint32 w = s.second;
1398   memcpy(d, &w, 4);
1399 }
1400 
1401 // types with fractional seconds
1402 
1403 static uint64
unpack_bigendian(const uchar * d,uint len)1404 unpack_bigendian(const uchar* d, uint len)
1405 {
1406   assert(len <= 8);
1407   uint64 val = 0;
1408   int i = (int)len;
1409   int s = 0;
1410   while (i != 0)
1411   {
1412     i--;
1413     uint64 v = d[i];
1414     val += (v << s);
1415     s += 8;
1416   }
1417   return val;
1418 }
1419 
1420 static void
pack_bigendian(uint64 val,uchar * d,uint len)1421 pack_bigendian(uint64 val, uchar* d, uint len)
1422 {
1423   uchar b[8];
1424   uint i = 0;
1425   while (i  < len)
1426   {
1427     b[i] = (uchar)(val & 255);
1428     val >>= 8;
1429     i++;
1430   }
1431   uint j = 0;
1432   while (i != 0)
1433   {
1434     i--;
1435     d[i] = b[j];
1436     j++;
1437   }
1438 }
1439 
1440 // Time2 : big-endian time(3 bytes).fraction(0-3 bytes)
1441 
1442 void
unpack_time2(Time2 & s,const uchar * d,uint prec)1443 NdbSqlUtil::unpack_time2(Time2& s, const uchar* d, uint prec)
1444 {
1445   const uint64 one = (uint64)1;
1446   uint flen = (1 + prec) / 2;
1447   uint fbit = 8 * flen;
1448   uint64 val = unpack_bigendian(&d[0], 3 + flen);
1449   uint spos = 23 + fbit;
1450   uint sign = (uint)((val & (one << spos)) >> spos);
1451   if (sign == 0) // negative
1452     val = (one << spos) - val;
1453   uint64 w = (val >> fbit);
1454   s.second = (uint)(w & 63);
1455   w >>= 6;
1456   s.minute = (uint)(w & 63);
1457   w >>= 6;
1458   s.hour = (uint)(w & 1023);
1459   w >>= 10;
1460   s.interval = (uint)(w & 1);
1461   w >>= 1;
1462   s.sign = sign;
1463   uint f = (uint)(val & ((one << fbit) - 1));
1464   if (prec % 2 != 0)
1465     f /= 10;
1466   s.fraction = f;
1467 }
1468 
1469 void
pack_time2(const Time2 & s,uchar * d,uint prec)1470 NdbSqlUtil::pack_time2(const Time2& s, uchar* d, uint prec)
1471 {
1472   const uint64 one = (uint64)1;
1473   uint flen = (1 + prec) / 2;
1474   uint fbit = 8 * flen;
1475   uint spos = 23 + fbit;
1476   uint64 w = 0;
1477   w |= s.sign;
1478   w <<= 1;
1479   w |= s.interval;
1480   w <<= 10;
1481   w |= s.hour;
1482   w <<= 6;
1483   w |= s.minute;
1484   w <<= 6;
1485   w |= s.second;
1486   uint f = s.fraction;
1487   if (prec % 2 != 0)
1488     f *= 10;
1489   uint64 val = (w << fbit) | f;
1490   if (s.sign == 0)
1491     val = (one << spos) - val;
1492   pack_bigendian(val, &d[0], 3 + flen);
1493 }
1494 
1495 // Datetime2 : big-endian date(5 bytes).fraction(0-3 bytes)
1496 
1497 void
unpack_datetime2(Datetime2 & s,const uchar * d,uint prec)1498 NdbSqlUtil::unpack_datetime2(Datetime2& s, const uchar* d, uint prec)
1499 {
1500   const uint64 one = (uint64)1;
1501   uint flen = (1 + prec) / 2;
1502   uint fbit = 8 * flen;
1503   uint64 val = unpack_bigendian(&d[0], 5 + flen);
1504   uint spos = 39 + fbit;
1505   uint sign = (uint)((val & (one << spos)) >> spos);
1506   if (sign == 0) // negative
1507     val = (one << spos) - val;
1508   uint64 w = (val >> fbit);
1509   s.second = (uint)(w & 63);
1510   w >>= 6;
1511   s.minute = (uint)(w & 63);
1512   w >>= 6;
1513   s.hour = (uint)(w & 31);
1514   w >>= 5;
1515   s.day = (uint)(w & 31);
1516   w >>= 5;
1517   uint year_month = (uint)(w & ((1 << 17) - 1));
1518   s.month = year_month % 13;
1519   s.year = year_month / 13;
1520   w >>= 17;
1521   s.sign = sign;
1522   uint f = (uint)(val & ((one << fbit) - 1));
1523   if (prec % 2 != 0)
1524     f /= 10;
1525   s.fraction = f;
1526 }
1527 
1528 void
pack_datetime2(const Datetime2 & s,uchar * d,uint prec)1529 NdbSqlUtil::pack_datetime2(const Datetime2& s, uchar* d, uint prec)
1530 {
1531   const uint64 one = (uint64)1;
1532   uint flen = (1 + prec) / 2;
1533   uint fbit = 8 * flen;
1534   uint spos = 39 + fbit;
1535   uint64 w = 0;
1536   w |= s.sign;
1537   w <<= 17;
1538   w |= (s.year * 13 + s.month);
1539   w <<= 5;
1540   w |= s.day;
1541   w <<= 5;
1542   w |= s.hour;
1543   w <<= 6;
1544   w |= s.minute;
1545   w <<= 6;
1546   w |= s.second;
1547   uint f = s.fraction;
1548   if (prec % 2 != 0)
1549     f *= 10;
1550   uint64 val = (w << fbit) | f;
1551   if (s.sign == 0)
1552     val = (one << spos) - val;
1553   pack_bigendian(val, &d[0], 5 + flen);
1554 }
1555 
1556 // Timestamp2 : big-endian non-negative unix time
1557 
1558 void
unpack_timestamp2(Timestamp2 & s,const uchar * d,uint prec)1559 NdbSqlUtil::unpack_timestamp2(Timestamp2& s, const uchar* d, uint prec)
1560 {
1561   uint flen = (1 + prec) / 2;
1562   uint w = (uint)unpack_bigendian(&d[0], 4);
1563   s.second = w;
1564   uint f = (uint)unpack_bigendian(&d[4], flen);
1565   if (prec % 2 != 0)
1566     f /= 10;
1567   s.fraction = f;
1568 }
1569 
1570 void
pack_timestamp2(const Timestamp2 & s,uchar * d,uint prec)1571 NdbSqlUtil::pack_timestamp2(const Timestamp2& s, uchar* d, uint prec)
1572 {
1573   uint flen = (1 + prec) / 2;
1574   uint w = s.second;
1575   pack_bigendian(w, &d[0], 4);
1576   uint f = s.fraction;
1577   if (prec % 2 != 0)
1578     f *= 10;
1579   pack_bigendian(f, &d[4], flen);
1580 }
1581 
1582 #ifdef TEST_NDB_SQL_UTIL
1583 
1584 /*
1585  * Before using the pack/unpack test one must verify correctness
1586  * of unpack methods via sql and ndb_select_all.  Otherwise we are
1587  * just testing pack/unpack of some fantasy formats.
1588  */
1589 
1590 #include <util/NdbTap.hpp>
1591 #include <ndb_rand.h>
1592 #include <NdbOut.hpp>
1593 #include <NdbEnv.h>
1594 
1595 #define chk1(x) \
1596   do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; \
1597        require(false); } while (0)
1598 
1599 #define lln(x, n) \
1600   do { if (verbose < n) break; ndbout << x << endl; } while (0)
1601 
1602 #define ll0(x) lln(x, 0)
1603 #define ll1(x) lln(x, 1)
1604 
1605 static int seed = -1; // random
1606 static int loops = 10;
1607 static int subloops = 100000;
1608 static int verbose = 0;
1609 
1610 static const uint maxprec = 6;
1611 
1612 static uint maxfrac[1 + maxprec] = {
1613   0, 9, 99, 999, 9999, 99999, 999999
1614 };
1615 
1616 static uint
getrand(uint m)1617 getrand(uint m)
1618 {
1619   assert(m != 0);
1620   uint n = ndb_rand();
1621   return n % m;
1622 }
1623 
1624 static uint
getrand(uint m1,uint m2,bool & nz)1625 getrand(uint m1, uint  m2, bool& nz)
1626 {
1627   assert(m1 <= m2);
1628   // test zero and min and max more often
1629   uint n = 0;
1630   switch (getrand(10)) {
1631   case 0:
1632     n = 0;
1633     break;
1634   case 1:
1635     n = m1;
1636     break;
1637   case 2:
1638     n = m2;
1639     break;
1640   default:
1641     n =  m1 + getrand(m2 - m1 + 1);
1642     break;
1643   }
1644   if (n != 0)
1645     nz = true;
1646   return n;
1647 }
1648 
1649 static void
cmpyear(const NdbSqlUtil::Year & s1,const NdbSqlUtil::Year & s2)1650 cmpyear(const NdbSqlUtil::Year& s1,
1651         const NdbSqlUtil::Year& s2)
1652 {
1653   chk1(s1.year == s2.year);
1654 }
1655 
1656 static void
testyear()1657 testyear()
1658 {
1659   NdbSqlUtil::Year s1;
1660   NdbSqlUtil::Year s2;
1661   uchar d1[20];
1662   uchar d2[20];
1663   memset(&s1, 0x1f, sizeof(s1));
1664   memset(&s2, 0x1f, sizeof(s2));
1665   memset(d1, 0x1f, sizeof(d1));
1666   memset(d2, 0x1f, sizeof(d2));
1667   bool nz = false;
1668   s1.year = getrand(1900, 2155, nz);
1669   if (!nz)
1670     s1.year = 1900;
1671   NdbSqlUtil::pack_year(s1, d1);
1672   NdbSqlUtil::unpack_year(s2, d1);
1673   cmpyear(s1, s2);
1674   NdbSqlUtil::pack_year(s2, d2);
1675   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1676 }
1677 
1678 static void
loopyear()1679 loopyear()
1680 {
1681   for (int i = 0; i < subloops; i++) {
1682     testyear();
1683   }
1684 }
1685 
1686 static void
cmpdate(const NdbSqlUtil::Date & s1,const NdbSqlUtil::Date & s2)1687 cmpdate(const NdbSqlUtil::Date& s1,
1688         const NdbSqlUtil::Date& s2)
1689 {
1690   chk1(s1.year == s2.year);
1691   chk1(s1.month == s2.month);
1692   chk1(s1.day == s2.day);
1693 }
1694 
1695 static void
testdate()1696 testdate()
1697 {
1698   NdbSqlUtil::Date s1;
1699   NdbSqlUtil::Date s2;
1700   uchar d1[20];
1701   uchar d2[20];
1702   memset(&s1, 0x1f, sizeof(s1));
1703   memset(&s2, 0x1f, sizeof(s2));
1704   memset(d1, 0x1f, sizeof(d1));
1705   memset(d2, 0x1f, sizeof(d2));
1706   bool nz = false;
1707   s1.year = getrand(1000, 9999, nz);
1708   s1.month = getrand(1, 12, nz);
1709   s1.day = getrand(1, 31, nz);
1710   if (!nz)
1711     s1.year = 1900;
1712   NdbSqlUtil::pack_date(s1, d1);
1713   NdbSqlUtil::unpack_date(s2, d1);
1714   cmpdate(s1, s2);
1715   NdbSqlUtil::pack_date(s2, d2);
1716   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1717 }
1718 
1719 static void
loopdate()1720 loopdate()
1721 {
1722   for (int i = 0; i < subloops; i++) {
1723     testdate();
1724   }
1725 }
1726 
1727 static void
cmptime(const NdbSqlUtil::Time & s1,const NdbSqlUtil::Time & s2)1728 cmptime(const NdbSqlUtil::Time& s1,
1729         const NdbSqlUtil::Time& s2)
1730 {
1731   chk1(s1.sign == s2.sign);
1732   chk1(s1.hour == s2.hour);
1733   chk1(s1.minute == s2.minute);
1734   chk1(s1.second == s2.second);
1735 }
1736 
1737 static void
testtime()1738 testtime()
1739 {
1740   NdbSqlUtil::Time s1;
1741   NdbSqlUtil::Time s2;
1742   uchar d1[20];
1743   uchar d2[20];
1744   memset(&s1, 0x1f, sizeof(s1));
1745   memset(&s2, 0x1f, sizeof(s2));
1746   memset(d1, 0x1f, sizeof(d1));
1747   memset(d2, 0x1f, sizeof(d2));
1748   bool nz = false;
1749   s1.sign = getrand(0, 1, nz);
1750   s1.hour = getrand(0, 838, nz);
1751   s1.minute = getrand(0, 59, nz);
1752   s1.second = getrand(0, 59, nz);
1753   if (!nz)
1754     s1.sign = 1;
1755   NdbSqlUtil::pack_time(s1, d1);
1756   NdbSqlUtil::unpack_time(s2, d1);
1757   cmptime(s1, s2);
1758   NdbSqlUtil::pack_time(s2, d2);
1759   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1760 }
1761 
1762 static void
looptime()1763 looptime()
1764 {
1765   for (int i = 0; i < subloops; i++) {
1766     testtime();
1767   }
1768 }
1769 
1770 static void
cmpdatetime(const NdbSqlUtil::Datetime & s1,const NdbSqlUtil::Datetime & s2)1771 cmpdatetime(const NdbSqlUtil::Datetime& s1,
1772             const NdbSqlUtil::Datetime& s2)
1773 {
1774   chk1(s1.year == s2.year);
1775   chk1(s1.month == s2.month);
1776   chk1(s1.day == s2.day);
1777   chk1(s1.hour == s2.hour);
1778   chk1(s1.minute == s2.minute);
1779   chk1(s1.second == s2.second);
1780 }
1781 
1782 static void
testdatetime()1783 testdatetime()
1784 {
1785   NdbSqlUtil::Datetime s1;
1786   NdbSqlUtil::Datetime s2;
1787   uchar d1[20];
1788   uchar d2[20];
1789   memset(&s1, 0x1f, sizeof(s1));
1790   memset(&s2, 0x1f, sizeof(s2));
1791   memset(d1, 0x1f, sizeof(d1));
1792   memset(d2, 0x1f, sizeof(d2));
1793   bool nz = false;
1794   s1.year = getrand(1000, 9999, nz);
1795   s1.month = getrand(1, 12, nz);
1796   s1.day = getrand(1, 31, nz);
1797   s1.hour = getrand(0, 23, nz);
1798   s1.minute = getrand(0, 59, nz);
1799   s1.second = getrand(0, 59, nz);
1800   NdbSqlUtil::pack_datetime(s1, d1);
1801   NdbSqlUtil::unpack_datetime(s2, d1);
1802   cmpdatetime(s1, s2);
1803   NdbSqlUtil::pack_datetime(s2, d2);
1804   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1805 }
1806 
1807 static void
loopdatetime()1808 loopdatetime()
1809 {
1810   for (int i = 0; i < subloops; i++) {
1811     testdatetime();
1812   }
1813 }
1814 
1815 static void
cmptimestamp(const NdbSqlUtil::Timestamp & s1,const NdbSqlUtil::Timestamp & s2)1816 cmptimestamp(const NdbSqlUtil::Timestamp& s1,
1817              const NdbSqlUtil::Timestamp& s2)
1818 {
1819   chk1(s1.second == s2.second);
1820 }
1821 
1822 static void
testtimestamp()1823 testtimestamp()
1824 {
1825   NdbSqlUtil::Timestamp s1;
1826   NdbSqlUtil::Timestamp s2;
1827   uchar d1[20];
1828   uchar d2[20];
1829   memset(&s1, 0x1f, sizeof(s1));
1830   memset(&s2, 0x1f, sizeof(s2));
1831   memset(d1, 0x1f, sizeof(d1));
1832   memset(d2, 0x1f, sizeof(d2));
1833   bool nz = false;
1834   s1.second = getrand(0, 59, nz);
1835   NdbSqlUtil::pack_timestamp(s1, d1);
1836   NdbSqlUtil::unpack_timestamp(s2, d1);
1837   cmptimestamp(s1, s2);
1838   NdbSqlUtil::pack_timestamp(s2, d2);
1839   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1840 }
1841 
1842 static void
looptimestamp()1843 looptimestamp()
1844 {
1845   for (int i = 0; i < subloops; i++) {
1846     testtimestamp();
1847   }
1848 }
1849 
1850 static void
cmptime2(const NdbSqlUtil::Time2 & s1,const NdbSqlUtil::Time2 & s2)1851 cmptime2(const NdbSqlUtil::Time2& s1,
1852          const NdbSqlUtil::Time2& s2)
1853 {
1854   chk1(s1.sign == s2.sign);
1855   chk1(s1.interval == s2.interval);
1856   chk1(s1.hour == s2.hour);
1857   chk1(s1.minute == s2.minute);
1858   chk1(s1.second == s2.second);
1859   chk1(s1.fraction == s2.fraction);
1860 }
1861 
1862 static void
testtime2(uint prec)1863 testtime2(uint prec)
1864 {
1865   NdbSqlUtil::Time2 s1;
1866   NdbSqlUtil::Time2 s2;
1867   uchar d1[20];
1868   uchar d2[20];
1869   memset(&s1, 0x1f, sizeof(s1));
1870   memset(&s2, 0x1f, sizeof(s2));
1871   memset(d1, 0x1f, sizeof(d1));
1872   memset(d2, 0x1f, sizeof(d2));
1873   bool nz = false;
1874   s1.sign = getrand(0, 1, nz);
1875   s1.interval = 0;
1876   s1.hour = getrand(0, 838, nz);
1877   s1.minute = getrand(0, 59, nz);
1878   s1.second = getrand(0, 59, nz);
1879   s1.fraction = getrand(0, maxfrac[prec], nz);
1880   if (!nz)
1881     s1.sign = 1;
1882   NdbSqlUtil::pack_time2(s1, d1, prec);
1883   NdbSqlUtil::unpack_time2(s2, d1, prec);
1884   cmptime2(s1, s2);
1885   NdbSqlUtil::pack_time2(s2, d2, prec);
1886   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1887 }
1888 
1889 static void
looptime2()1890 looptime2()
1891 {
1892   for (uint prec = 0; prec <= maxprec; prec++) {
1893     for (int i = 0; i < subloops; i++) {
1894       testtime2(prec);
1895     }
1896   }
1897 }
1898 
1899 static void
cmpdatetime2(const NdbSqlUtil::Datetime2 & s1,const NdbSqlUtil::Datetime2 & s2)1900 cmpdatetime2(const NdbSqlUtil::Datetime2& s1,
1901              const NdbSqlUtil::Datetime2& s2)
1902 {
1903   chk1(s1.sign == s2.sign);
1904   chk1(s1.year == s2.year);
1905   chk1(s1.month == s2.month);
1906   chk1(s1.day == s2.day);
1907   chk1(s1.hour == s2.hour);
1908   chk1(s1.minute == s2.minute);
1909   chk1(s1.second == s2.second);
1910   chk1(s1.fraction == s2.fraction);
1911 }
1912 
1913 static void
testdatetime2(uint prec)1914 testdatetime2(uint prec)
1915 {
1916   NdbSqlUtil::Datetime2 s1;
1917   NdbSqlUtil::Datetime2 s2;
1918   uchar d1[20];
1919   uchar d2[20];
1920   memset(&s1, 0x1f, sizeof(s1));
1921   memset(&s2, 0x1f, sizeof(s2));
1922   memset(d1, 0x1f, sizeof(d1));
1923   memset(d2, 0x1f, sizeof(d2));
1924   bool nz = false;
1925   s1.sign = getrand(0, 1, nz); // negative not yet in MySQL
1926   s1.year = getrand(0, 9999, nz);
1927   s1.month = getrand(1, 12, nz);
1928   s1.day = getrand(1, 31, nz);
1929   s1.hour = getrand(0, 23, nz);
1930   s1.minute = getrand(0, 59, nz);
1931   s1.second = getrand(0, 59, nz);
1932   s1.fraction = getrand(0, maxfrac[prec], nz);
1933   if (!nz)
1934     s1.sign = 1;
1935   NdbSqlUtil::pack_datetime2(s1, d1, prec);
1936   NdbSqlUtil::unpack_datetime2(s2, d1, prec);
1937   cmpdatetime2(s1, s2);
1938   NdbSqlUtil::pack_datetime2(s2, d2, prec);
1939   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1940 }
1941 
1942 static void
loopdatetime2()1943 loopdatetime2()
1944 {
1945   for (uint prec = 0; prec <= maxprec; prec++) {
1946     for (int i = 0; i < subloops; i++) {
1947       testdatetime2(prec);
1948     }
1949   }
1950 }
1951 
1952 static void
cmptimestamp2(const NdbSqlUtil::Timestamp2 & s1,const NdbSqlUtil::Timestamp2 & s2)1953 cmptimestamp2(const NdbSqlUtil::Timestamp2& s1,
1954               const NdbSqlUtil::Timestamp2& s2)
1955 {
1956   chk1(s1.second == s2.second);
1957   chk1(s1.fraction == s2.fraction);
1958 }
1959 
1960 static void
testtimestamp2(uint prec)1961 testtimestamp2(uint prec)
1962 {
1963   NdbSqlUtil::Timestamp2 s1;
1964   NdbSqlUtil::Timestamp2 s2;
1965   uchar d1[20];
1966   uchar d2[20];
1967   memset(&s1, 0x1f, sizeof(s1));
1968   memset(&s2, 0x1f, sizeof(s2));
1969   memset(d1, 0x1f, sizeof(d1));
1970   memset(d2, 0x1f, sizeof(d2));
1971   bool nz = false;
1972   s1.second = getrand(0, 59, nz);
1973   s1.fraction = getrand(0, maxfrac[prec], nz);
1974   NdbSqlUtil::pack_timestamp2(s1, d1, prec);
1975   NdbSqlUtil::unpack_timestamp2(s2, d1, prec);
1976   cmptimestamp2(s1, s2);
1977   NdbSqlUtil::pack_timestamp2(s2, d2, prec);
1978   chk1(memcmp(d1, d2, sizeof(d1)) == 0);
1979 }
1980 
1981 static void
looptimestamp2()1982 looptimestamp2()
1983 {
1984   for (uint prec = 0; prec <= maxprec; prec++) {
1985     for (int i = 0; i < subloops; i++) {
1986       testtimestamp2(prec);
1987     }
1988   }
1989 }
1990 
1991 static void
testrun()1992 testrun()
1993 {
1994   loopyear();
1995   loopdate();
1996   looptime();
1997   loopdatetime();
1998   looptimestamp();
1999   looptime2();
2000   loopdatetime2();
2001   looptimestamp2();
2002 }
2003 
2004 static int
testmain()2005 testmain()
2006 {
2007   ndb_init();
2008 #ifdef NDB_USE_GET_ENV
2009   struct { const char* env; int* val; } opt[] = {
2010     { "TEST_NDB_SQL_UTIL_SEED", &seed },
2011     { "TEST_NDB_SQL_UTIL_LOOPS", &loops },
2012     { "TEST_NDB_SQL_UTIL_VERBOSE", &verbose },
2013     { 0, 0 }
2014   };
2015   for (int i = 0; opt[i].env != 0; i++) {
2016     const char* p = NdbEnv_GetEnv(opt[i].env, (char*)0, 0);
2017     if (p != 0)
2018       *opt[i].val = atoi(p);
2019   }
2020 #endif
2021 #ifdef VM_TRACE
2022   signal(SIGABRT, SIG_DFL);
2023 #endif
2024   if (seed == 0)
2025     ll0("random seed: loop number");
2026   else {
2027     if (seed < 0)
2028       seed = getpid();
2029     ll0("random seed " << seed);
2030     ndb_srand(seed);
2031   }
2032   for (int i = 0; i < loops; i++) {
2033     ll0("loop:" << i << "/" << loops);
2034     if (seed == 0)
2035       ndb_srand(seed);
2036     testrun(); // aborts on any error
2037   }
2038   ll0("passed");
2039   return 0;
2040 }
2041 
TAPTEST(NdbSqlUtil)2042 TAPTEST(NdbSqlUtil)
2043 {
2044   int ret = testmain();
2045   return (ret == 0);
2046 }
2047 
2048 #endif
2049