1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbSqlUtil.hpp>
26 #include <ndb_version.h>
27 #include <math.h>
28 
29 /*
30  * Data types.  The entries must be in the numerical order.
31  */
32 
33 const NdbSqlUtil::Type
34 NdbSqlUtil::m_typeList[] = {
35   { // 0
36     Type::Undefined,
37     NULL,
38     NULL,
39     NULL
40   },
41   { // 1
42     Type::Tinyint,
43     cmpTinyint,
44     NULL,
45     NULL
46   },
47   { // 2
48     Type::Tinyunsigned,
49     cmpTinyunsigned,
50     NULL,
51     NULL
52   },
53   { // 3
54     Type::Smallint,
55     cmpSmallint,
56     NULL,
57     NULL
58   },
59   { // 4
60     Type::Smallunsigned,
61     cmpSmallunsigned,
62     NULL,
63     NULL
64   },
65   { // 5
66     Type::Mediumint,
67     cmpMediumint,
68     NULL,
69     NULL
70   },
71   { // 6
72     Type::Mediumunsigned,
73     cmpMediumunsigned,
74     NULL,
75     NULL
76   },
77   { // 7
78     Type::Int,
79     cmpInt,
80     NULL,
81     NULL
82   },
83   { // 8
84     Type::Unsigned,
85     cmpUnsigned,
86     NULL,
87     NULL
88   },
89   { // 9
90     Type::Bigint,
91     cmpBigint,
92     NULL,
93     NULL
94   },
95   { // 10
96     Type::Bigunsigned,
97     cmpBigunsigned,
98     NULL,
99     NULL
100   },
101   { // 11
102     Type::Float,
103     cmpFloat,
104     NULL,
105     NULL
106   },
107   { // 12
108     Type::Double,
109     cmpDouble,
110     NULL,
111     NULL
112   },
113   { // 13
114     Type::Olddecimal,
115     cmpOlddecimal,
116     NULL,
117     NULL
118   },
119   { // 14
120     Type::Char,
121     cmpChar,
122     likeChar,
123     NULL
124   },
125   { // 15
126     Type::Varchar,
127     cmpVarchar,
128     likeVarchar,
129     NULL
130   },
131   { // 16
132     Type::Binary,
133     cmpBinary,
134     likeBinary,
135     NULL
136   },
137   { // 17
138     Type::Varbinary,
139     cmpVarbinary,
140     likeVarbinary,
141     NULL
142   },
143   { // 18
144     Type::Datetime,
145     cmpDatetime,
146     NULL,
147     NULL
148   },
149   { // 19
150     Type::Date,
151     cmpDate,
152     NULL,
153     NULL
154   },
155   { // 20
156     Type::Blob,
157     NULL,
158     NULL,
159     NULL
160   },
161   { // 21
162     Type::Text,
163     NULL,
164     NULL,
165     NULL
166   },
167   { // 22
168     Type::Bit,
169     cmpBit,
170     NULL,
171     maskBit
172   },
173   { // 23
174     Type::Longvarchar,
175     cmpLongvarchar,
176     likeLongvarchar,
177     NULL
178   },
179   { // 24
180     Type::Longvarbinary,
181     cmpLongvarbinary,
182     likeLongvarbinary,
183     NULL
184   },
185   { // 25
186     Type::Time,
187     cmpTime,
188     NULL,
189     NULL
190   },
191   { // 26
192     Type::Year,
193     cmpYear,
194     NULL,
195     NULL
196   },
197   { // 27
198     Type::Timestamp,
199     cmpTimestamp,
200     NULL,
201     NULL
202   },
203   { // 28
204     Type::Olddecimalunsigned,
205     cmpOlddecimalunsigned,
206     NULL,
207     NULL
208   },
209   { // 29
210     Type::Decimal,
211     cmpDecimal,
212     NULL,
213     NULL
214   },
215   { // 30
216     Type::Decimalunsigned,
217     cmpDecimalunsigned,
218     NULL,
219     NULL
220   }
221 };
222 
223 const NdbSqlUtil::Type&
getType(Uint32 typeId)224 NdbSqlUtil::getType(Uint32 typeId)
225 {
226   if (typeId < sizeof(m_typeList) / sizeof(m_typeList[0]) &&
227       m_typeList[typeId].m_typeId != Type::Undefined) {
228     return m_typeList[typeId];
229   }
230   return m_typeList[Type::Undefined];
231 }
232 
233 /*
234  * Comparison functions.
235  */
236 
237 int
cmpTinyint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)238 NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
239 {
240   assert(info == 0 && n1 == 1 && n2 == 1);
241   Int8 v1, v2;
242   memcpy(&v1, p1, 1);
243   memcpy(&v2, p2, 1);
244   int w1 = (int)v1;
245   int w2 = (int)v2;
246   return w1 - w2;
247 }
248 
249 int
cmpTinyunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)250 NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
251 {
252   assert(info == 0 && n1 == 1 && n2 == 1);
253   Uint8 v1, v2;
254   memcpy(&v1, p1, 1);
255   memcpy(&v2, p2, 1);
256   int w1 = (int)v1;
257   int w2 = (int)v2;
258   return w1 - w2;
259 }
260 
261 int
cmpSmallint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)262 NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
263 {
264   assert(info == 0 && n1 == 2 && n2 == 2);
265   Int16 v1, v2;
266   memcpy(&v1, p1, 2);
267   memcpy(&v2, p2, 2);
268   int w1 = (int)v1;
269   int w2 = (int)v2;
270   return w1 - w2;
271 }
272 
273 int
cmpSmallunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)274 NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
275 {
276   assert(info == 0 && n1 == 2 && n2 == 2);
277   Uint16 v1, v2;
278   memcpy(&v1, p1, 2);
279   memcpy(&v2, p2, 2);
280   int w1 = (int)v1;
281   int w2 = (int)v2;
282   return w1 - w2;
283 }
284 
285 int
cmpMediumint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)286 NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
287 {
288   assert(info == 0 && n1 == 3 && n2 == 3);
289   uchar b1[4];
290   uchar b2[4];
291   memcpy(b1, p1, 3);
292   b1[3] = 0;
293   memcpy(b2, p2, 3);
294   b2[3] = 0;
295   int w1 = (int)sint3korr(b1);
296   int w2 = (int)sint3korr(b2);
297   return w1 - w2;
298 }
299 
300 int
cmpMediumunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)301 NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
302 {
303   assert(info == 0 && n1 == 3 && n2 == 3);
304   uchar b1[4];
305   uchar b2[4];
306   memcpy(b1, p1, 3);
307   b1[3] = 0;
308   memcpy(b2, p2, 3);
309   b2[3] = 0;
310   int w1 = (int)uint3korr(b1);
311   int w2 = (int)uint3korr(b2);
312   return w1 - w2;
313 }
314 
315 int
cmpInt(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)316 NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
317 {
318   assert(info == 0 && n1 == 4 && n2 == 4);
319   Int32 v1, v2;
320   memcpy(&v1, p1, 4);
321   memcpy(&v2, p2, 4);
322   if (v1 < v2)
323     return -1;
324   if (v1 > v2)
325     return +1;
326   return 0;
327 }
328 
329 int
cmpUnsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)330 NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
331 {
332   assert(info == 0 && n1 == 4 && n2 == 4);
333   Uint32 v1, v2;
334   memcpy(&v1, p1, 4);
335   memcpy(&v2, p2, 4);
336   if (v1 < v2)
337     return -1;
338   if (v1 > v2)
339     return +1;
340   return 0;
341 }
342 
343 int
cmpBigint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)344 NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
345 {
346   assert(info == 0 && n1 == 8 && n2 == 8);
347   Int64 v1, v2;
348   memcpy(&v1, p1, 8);
349   memcpy(&v2, p2, 8);
350   if (v1 < v2)
351     return -1;
352   if (v1 > v2)
353     return +1;
354   return 0;
355 }
356 
357 int
cmpBigunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)358 NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
359 {
360   assert(info == 0 && n1 == 8 && n2 == 8);
361   Uint64 v1, v2;
362   memcpy(&v1, p1, 8);
363   memcpy(&v2, p2, 8);
364   if (v1 < v2)
365     return -1;
366   if (v1 > v2)
367     return +1;
368   return 0;
369 }
370 
371 int
cmpFloat(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)372 NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
373 {
374   assert(info == 0 && n1 == 4 && n2 == 4);
375   float v1, v2;
376   memcpy(&v1, p1, 4);
377   memcpy(&v2, p2, 4);
378   require(!isnan(v1) && !isnan(v2));
379   if (v1 < v2)
380     return -1;
381   if (v1 > v2)
382     return +1;
383   return 0;
384 }
385 
386 int
cmpDouble(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)387 NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
388 {
389   assert(info == 0 && n1 == 8 && n2 == 8);
390   double v1, v2;
391   memcpy(&v1, p1, 8);
392   memcpy(&v2, p2, 8);
393   require(!isnan(v1) && !isnan(v2));
394   if (v1 < v2)
395     return -1;
396   if (v1 > v2)
397     return +1;
398   return 0;
399 }
400 
401 int
cmpOlddecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)402 NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
403 {
404   assert(info == 0 && n1 == n2);
405   const uchar* v1 = (const uchar*)p1;
406   const uchar* v2 = (const uchar*)p2;
407   int sgn = +1;
408   unsigned i = 0;
409   while (i < n1) {
410     int c1 = v1[i];
411     int c2 = v2[i];
412     if (c1 == c2) {
413       if (c1 == '-')
414         sgn = -1;
415     } else if (c1 == '-') {
416       return -1;
417     } else if (c2 == '-') {
418       return +1;
419     } else if (c1 < c2) {
420       return -1 * sgn;
421     } else {
422       return +1 * sgn;
423     }
424     i++;
425   }
426   return 0;
427 }
428 
429 int
cmpOlddecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)430 NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
431 {
432   return cmpOlddecimal(info, p1, n1, p2, n2);
433 }
434 
435 int
cmpDecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)436 NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
437 {
438   return cmpBinary(info, p1, n1, p2, n2);
439 }
440 
441 int
cmpDecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)442 NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
443 {
444   return cmpBinary(info, p1, n1, p2, n2);
445 }
446 
447 int
cmpChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)448 NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
449 {
450   // allow different lengths
451   assert(info != 0);
452   const uchar* v1 = (const uchar*)p1;
453   const uchar* v2 = (const uchar*)p2;
454   CHARSET_INFO* cs = (CHARSET_INFO*)info;
455   // compare with space padding
456   int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
457   return k;
458 }
459 
460 int
cmpVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)461 NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
462 {
463   assert(info != 0);
464   const uint lb = 1;
465   const uchar* v1 = (const uchar*)p1;
466   const uchar* v2 = (const uchar*)p2;
467   uint m1 = v1[0];
468   uint m2 = v2[0];
469   require(lb + m1 <= n1 && lb + m2 <= n2);
470   CHARSET_INFO* cs = (CHARSET_INFO*)info;
471   // compare with space padding
472   int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
473   return k;
474 }
475 
476 int
cmpBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)477 NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
478 {
479   // allow different lengths
480   assert(info == 0);
481   const uchar* v1 = (const uchar*)p1;
482   const uchar* v2 = (const uchar*)p2;
483   int k = 0;
484   if (n1 < n2) {
485     k = memcmp(v1, v2, n1);
486     if (k == 0)
487       k = -1;
488   } else if (n1 > n2) {
489     k = memcmp(v1, v2, n2);
490     if (k == 0)
491       k = +1;
492   } else {
493     k = memcmp(v1, v2, n1);
494   }
495   return k;
496 }
497 
498 int
cmpVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)499 NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
500 {
501   assert(info == 0);
502   const uint lb = 1;
503   const uchar* v1 = (const uchar*)p1;
504   const uchar* v2 = (const uchar*)p2;
505   uint m1 = v1[0];
506   uint m2 = v2[0];
507   require(lb + m1 <= n1 && lb + m2 <= n2);
508   int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
509   return k;
510 }
511 
512 int
cmpDatetime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)513 NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
514 {
515   assert(info == 0 && n1 == 8 && n2 == 8);
516   Int64 v1, v2;
517   memcpy(&v1, p1, sizeof(Int64));
518   memcpy(&v2, p2, sizeof(Int64));
519   if (v1 < v2)
520     return -1;
521   if (v1 > v2)
522     return +1;
523   return 0;
524 }
525 
526 int
cmpDate(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)527 NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
528 {
529   assert(info == 0 && n1 == 3 && n2 == 3);
530   uchar b1[4];
531   uchar b2[4];
532   memcpy(b1, p1, 3);
533   b1[3] = 0;
534   memcpy(b2, p2, 3);
535   b2[3] = 0;
536   // from Field_newdate::val_int
537   int w1 = (int)uint3korr(b1);
538   int w2 = (int)uint3korr(b2);
539   return w1 - w2;
540 }
541 
542 // not supported
543 int
cmpBlob(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)544 NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
545 {
546   assert(false);
547   return 0;
548 }
549 
550 // not supported
551 int
cmpText(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)552 NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
553 {
554   assert(false);
555   return 0;
556 }
557 
558 int
cmpBit(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)559 NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
560 {
561   /* Bitfields are stored as 32-bit words
562    * This means that a byte-by-byte comparison will not work on all platforms
563    * We do a word-wise comparison of the significant bytes.
564    * It is assumed that insignificant bits (but not bytes) are zeroed in the
565    * passed values.
566    */
567   const Uint32 bytes= MIN(n1, n2);
568   Uint32 words= (bytes + 3) >> 2;
569 
570   /* Don't expect either value to be length zero */
571   assert(words);
572 
573   /* Check ptr alignment */
574   if (unlikely(((((UintPtr)p1) & 3) != 0) ||
575                ((((UintPtr)p2) & 3) != 0)))
576   {
577     Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ];
578     Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ];
579     memcpy(copyP1, p1, words << 2);
580     memcpy(copyP2, p2, words << 2);
581 
582     return cmpBit(info, copyP1, bytes, copyP2, bytes);
583   }
584 
585   const Uint32* wp1= (const Uint32*) p1;
586   const Uint32* wp2= (const Uint32*) p2;
587   while (--words)
588   {
589     if (*wp1 < *wp2)
590       return -1;
591     if (*(wp1++) > *(wp2++))
592       return 1;
593   }
594 
595   /* For the last word, we mask out any insignificant bytes */
596   const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
597   const Uint32 mask= sigBytes?
598     (1 << (sigBytes *8)) -1 :
599     ~0;
600   const Uint32 lastWord1= *wp1 & mask;
601   const Uint32 lastWord2= *wp2 & mask;
602 
603   if (lastWord1 < lastWord2)
604     return -1;
605   if (lastWord1 > lastWord2)
606     return 1;
607 
608   return 0;
609 }
610 
611 
612 int
cmpTime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)613 NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
614 {
615   assert(info == 0 && n1 == 3 && n2 == 3);
616   uchar b1[4];
617   uchar b2[4];
618   memcpy(b1, p1, 3);
619   b1[3] = 0;
620   memcpy(b2, p2, 3);
621   b2[3] = 0;
622   // from Field_time::val_int
623   int j1 = (int)sint3korr(b1);
624   int j2 = (int)sint3korr(b2);
625   if (j1 < j2)
626     return -1;
627   if (j1 > j2)
628     return +1;
629   return 0;
630 }
631 
632 // not yet
633 
634 int
cmpLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)635 NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
636 {
637   assert(info != 0);
638   const uint lb = 2;
639   const uchar* v1 = (const uchar*)p1;
640   const uchar* v2 = (const uchar*)p2;
641   uint m1 = v1[0] | (v1[1] << 8);
642   uint m2 = v2[0] | (v2[1] << 8);
643   require(lb + m1 <= n1 && lb + m2 <= n2);
644   CHARSET_INFO* cs = (CHARSET_INFO*)info;
645   // compare with space padding
646   int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
647   return k;
648 }
649 
650 int
cmpLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)651 NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
652 {
653   assert(info == 0);
654   const uint lb = 2;
655   const uchar* v1 = (const uchar*)p1;
656   const uchar* v2 = (const uchar*)p2;
657   uint m1 = v1[0] | (v1[1] << 8);
658   uint m2 = v2[0] | (v2[1] << 8);
659   require(lb + m1 <= n1 && lb + m2 <= n2);
660   int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
661   return k;
662 }
663 
664 int
cmpYear(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)665 NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
666 {
667   assert(info == 0 && n1 == 1 && n2 == 1);
668   Uint8 v1, v2;
669   memcpy(&v1, p1, 1);
670   memcpy(&v2, p2, 1);
671   int w1 = (int)v1;
672   int w2 = (int)v2;
673   return w1 - w2;
674 }
675 
676 int
cmpTimestamp(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)677 NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
678 {
679   assert(info == 0 && n1 == 4 && n2 == 4);
680   Uint32 v1, v2;
681   memcpy(&v1, p1, 4);
682   memcpy(&v2, p2, 4);
683   if (v1 < v2)
684     return -1;
685   if (v1 > v2)
686     return +1;
687   return 0;
688 }
689 
690 // like
691 
// Special characters handed to the collation's wildcmp routine by the
// like* functions below: escape prefix, single-character wildcard and
// multi-character wildcard, in that order.
static const int ndb_wild_prefix = '\\';
static const int ndb_wild_one = '_';
static const int ndb_wild_many = '%';
695 
696 int
likeChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)697 NdbSqlUtil::likeChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
698 {
699   const char* v1 = (const char*)p1;
700   const char* v2 = (const char*)p2;
701   CHARSET_INFO* cs = (CHARSET_INFO*)(info);
702   // strip end spaces to match (incorrect) MySQL behaviour
703   n1 = (*cs->cset->lengthsp)(cs, v1, n1);
704   int k = (*cs->coll->wildcmp)(cs, v1, v1 + n1, v2, v2 + n2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
705   return k == 0 ? 0 : +1;
706 }
707 
708 int
likeBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)709 NdbSqlUtil::likeBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
710 {
711   assert(info == 0);
712   return likeChar(&my_charset_bin, p1, n1, p2, n2);
713 }
714 
715 int
likeVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)716 NdbSqlUtil::likeVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
717 {
718   const unsigned lb = 1;
719   if (n1 >= lb) {
720     const uchar* v1 = (const uchar*)p1;
721     const uchar* v2 = (const uchar*)p2;
722     unsigned m1 = *v1;
723     unsigned m2 = n2;
724     if (lb + m1 <= n1) {
725       const char* w1 = (const char*)v1 + lb;
726       const char* w2 = (const char*)v2;
727       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
728       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
729       return k == 0 ? 0 : +1;
730     }
731   }
732   return -1;
733 }
734 
735 int
likeVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)736 NdbSqlUtil::likeVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
737 {
738   assert(info == 0);
739   return likeVarchar(&my_charset_bin, p1, n1, p2, n2);
740 }
741 
742 int
likeLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)743 NdbSqlUtil::likeLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
744 {
745   const unsigned lb = 2;
746   if (n1 >= lb) {
747     const uchar* v1 = (const uchar*)p1;
748     const uchar* v2 = (const uchar*)p2;
749     unsigned m1 = uint2korr(v1);
750     unsigned m2 = n2;
751     if (lb + m1 <= n1) {
752       const char* w1 = (const char*)v1 + lb;
753       const char* w2 = (const char*)v2;
754       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
755       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
756       return k == 0 ? 0 : +1;
757     }
758   }
759   return -1;
760 }
761 
762 int
likeLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)763 NdbSqlUtil::likeLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
764 {
765   assert(info == 0);
766   return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
767 }
768 
769 
770 // mask Functions
771 
772 int
maskBit(const void * data,unsigned dataLen,const void * mask,unsigned maskLen,bool cmpZero)773 NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero)
774 {
775   /* Bitfields are stored in word oriented form, so we must compare them in that
776    * style as well
777    * It is assumed that insignificant bits (but not bytes) in the passed values
778    * are zeroed
779    */
780   const Uint32 bytes = MIN(dataLen, maskLen);
781   Uint32 words = (bytes + 3) >> 2;
782 
783   /* Don't expect either value to be length zero */
784   assert(words);
785 
786   /* Check ptr alignment */
787   if (unlikely(((((UintPtr)data) & 3) != 0) ||
788                ((((UintPtr)mask) & 3) != 0)))
789   {
790     Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ];
791     Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ];
792     memcpy(copydata, data, words << 2);
793     memcpy(copymask, mask, words << 2);
794 
795     return maskBit(data, bytes, mask, bytes, cmpZero);
796   }
797 
798   const Uint32* wdata= (const Uint32*) data;
799   const Uint32* wmask= (const Uint32*) mask;
800 
801   if (cmpZero)
802   {
803     while (--words)
804     {
805       if ((*(wdata++) & *(wmask++)) != 0)
806         return 1;
807     }
808 
809     /* For the last word, we mask out any insignificant bytes */
810     const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
811     const Uint32 comparisonMask= sigBytes?
812       (1 << (sigBytes *8)) -1 :
813       ~0;
814     const Uint32 lastDataWord= *wdata & comparisonMask;
815     const Uint32 lastMaskWord= *wmask & comparisonMask;
816 
817     if ((lastDataWord & lastMaskWord) != 0)
818       return 1;
819 
820     return 0;
821   }
822   else
823   {
824     while (--words)
825     {
826       if ((*(wdata++) & *wmask) != *wmask)
827         return 1;
828 
829       wmask++;
830     }
831 
832     /* For the last word, we mask out any insignificant bytes */
833     const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
834     const Uint32 comparisonMask= sigBytes?
835       (1 << (sigBytes *8)) -1 :
836       ~0;
837     const Uint32 lastDataWord= *wdata & comparisonMask;
838     const Uint32 lastMaskWord= *wmask & comparisonMask;
839 
840     if ((lastDataWord & lastMaskWord) != lastMaskWord)
841       return 1;
842 
843     return 0;
844   }
845 }
846 
847 
848 // check charset
849 
850 uint
check_column_for_pk(Uint32 typeId,const void * info)851 NdbSqlUtil::check_column_for_pk(Uint32 typeId, const void* info)
852 {
853   const Type& type = getType(typeId);
854   switch (type.m_typeId) {
855   case Type::Char:
856   case Type::Varchar:
857   case Type::Longvarchar:
858     {
859       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
860       if(cs != 0 &&
861          cs->cset != 0 &&
862          cs->coll != 0 &&
863          cs->coll->strnxfrm != 0 &&
864          cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
865         return 0;
866       else
867         return 743;
868     }
869     break;
870   case Type::Undefined:
871   case Type::Blob:
872   case Type::Text:
873   case Type::Bit:
874     break;
875   default:
876     return 0;
877   }
878   return 906;
879 }
880 
881 uint
check_column_for_hash_index(Uint32 typeId,const void * info)882 NdbSqlUtil::check_column_for_hash_index(Uint32 typeId, const void* info)
883 {
884   return check_column_for_pk(typeId, info);
885 }
886 
887 uint
check_column_for_ordered_index(Uint32 typeId,const void * info)888 NdbSqlUtil::check_column_for_ordered_index(Uint32 typeId, const void* info)
889 {
890   const Type& type = getType(typeId);
891   if (type.m_cmp == NULL)
892     return false;
893   switch (type.m_typeId) {
894   case Type::Char:
895   case Type::Varchar:
896   case Type::Longvarchar:
897     {
898       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
899       if (cs != 0 &&
900           cs->cset != 0 &&
901           cs->coll != 0 &&
902           cs->coll->strnxfrm != 0 &&
903           cs->coll->strnncollsp != 0 &&
904           cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
905         return 0;
906       else
907         return 743;
908     }
909     break;
910   case Type::Undefined:
911   case Type::Blob:
912   case Type::Text:
913   case Type::Bit:       // can be fixed
914     break;
915   default:
916     return 0;
917   }
918   return 906;
919 }
920 
921 // utilities
922 
923 bool
get_var_length(Uint32 typeId,const void * p,unsigned attrlen,Uint32 & lb,Uint32 & len)924 NdbSqlUtil::get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len)
925 {
926   const unsigned char* const src = (const unsigned char*)p;
927   switch (typeId) {
928   case NdbSqlUtil::Type::Varchar:
929   case NdbSqlUtil::Type::Varbinary:
930     lb = 1;
931     if (attrlen >= lb) {
932       len = src[0];
933       if (attrlen >= lb + len)
934         return true;
935     }
936     break;
937   case NdbSqlUtil::Type::Longvarchar:
938   case NdbSqlUtil::Type::Longvarbinary:
939     lb = 2;
940     if (attrlen >= lb) {
941       len = src[0] + (src[1] << 8);
942       if (attrlen >= lb + len)
943         return true;
944     }
945     break;
946   default:
947     lb = 0;
948     len = attrlen;
949     return true;
950     break;
951   }
952   return false;
953 }
954 
955 
956 size_t
ndb_strnxfrm(struct charset_info_st * cs,uchar * dst,size_t dstlen,const uchar * src,size_t srclen)957 NdbSqlUtil::ndb_strnxfrm(struct charset_info_st * cs,
958                          uchar *dst, size_t dstlen,
959                          const uchar *src, size_t srclen)
960 {
961 #if NDB_MYSQL_VERSION_D < NDB_MAKE_VERSION(5,6,0)
962   return (*cs->coll->strnxfrm)(cs, dst, dstlen, src, srclen);
963 #else
964   /*
965     strnxfrm has got two new parameters in 5.6, we are using the
966     defaults for those and can thus easily calculate them from
967     existing params
968   */
969   return  (*cs->coll->strnxfrm)(cs, dst, dstlen, dstlen,
970                                 src, srclen, MY_STRXFRM_PAD_WITH_SPACE);
971 #endif
972 }
973 
974 // workaround
975 
976 int
strnxfrm_bug7284(CHARSET_INFO * cs,unsigned char * dst,unsigned dstLen,const unsigned char * src,unsigned srcLen)977 NdbSqlUtil::strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen)
978 {
979   unsigned char nsp[20]; // native space char
980   unsigned char xsp[20]; // strxfrm-ed space char
981 #ifdef VM_TRACE
982   memset(nsp, 0x1f, sizeof(nsp));
983   memset(xsp, 0x1f, sizeof(xsp));
984 #endif
985   // convert from unicode codepoint for space
986   int n1 = (*cs->cset->wc_mb)(cs, (my_wc_t)0x20, nsp, nsp + sizeof(nsp));
987   if (n1 <= 0)
988     return -1;
989   // strxfrm to binary
990   int n2 = ndb_strnxfrm(cs, xsp, sizeof(xsp), nsp, n1);
991   if (n2 <= 0)
992     return -1;
993   // XXX bug workaround - strnxfrm may not write full string
994   memset(dst, 0x0, dstLen);
995   // strxfrm argument string - returns no error indication
996   int n3 = ndb_strnxfrm(cs, dst, dstLen, src, srcLen);
997   // pad with strxfrm-ed space chars
998   int n4 = n3;
999   while (n4 < (int)dstLen) {
1000     dst[n4] = xsp[(n4 - n3) % n2];
1001     n4++;
1002   }
1003   // no check for partial last
1004   return dstLen;
1005 }
1006 
1007 #if defined(WORDS_BIGENDIAN) || defined (VM_TRACE)
1008 
1009 static
determineParams(Uint32 typeId,Uint32 typeLog2Size,Uint32 arrayType,Uint32 arraySize,Uint32 dataByteSize,Uint32 & convSize,Uint32 & convLen)1010 void determineParams(Uint32 typeId,
1011                      Uint32 typeLog2Size,
1012                      Uint32 arrayType,
1013                      Uint32 arraySize,
1014                      Uint32 dataByteSize,
1015                      Uint32& convSize,
1016                      Uint32& convLen)
1017 {
1018   /* Some types need 'normal behaviour' over-ridden in
1019    * terms of endian-ness conversions
1020    */
1021   convSize = 0;
1022   convLen = 0;
1023   switch(typeId)
1024   {
1025   case NdbSqlUtil::Type::Datetime:
1026   {
1027     // Datetime is stored as 8x8, should be twiddled as 64 bit
1028     assert(typeLog2Size == 3);
1029     assert(arraySize == 8);
1030     assert(dataByteSize == 8);
1031     convSize = 64;
1032     convLen = 1;
1033     break;
1034   }
1035   case NdbSqlUtil::Type::Timestamp:
1036   {
1037     // Timestamp is stored as 4x8, should be twiddled as 32 bit
1038     assert(typeLog2Size == 3);
1039     assert(arraySize == 4);
1040     assert(dataByteSize == 4);
1041     convSize = 32;
1042     convLen = 1;
1043     break;
1044   }
1045   case NdbSqlUtil::Type::Bit:
1046   {
1047     // Bit is stored as bits, should be twiddled as 32 bit
1048     assert(typeLog2Size == 0);
1049     convSize = 32;
1050     convLen = (arraySize + 31)/32;
1051     break;
1052   }
1053   case NdbSqlUtil::Type::Blob:
1054   case NdbSqlUtil::Type::Text:
1055   {
1056     if (arrayType == NDB_ARRAYTYPE_FIXED)
1057     {
1058       // Length of fixed size blob which is stored in first 64 bit's
1059       // has to be twiddled, the remaining byte stream left as is
1060       assert(typeLog2Size == 3);
1061       assert(arraySize > 8);
1062       assert(dataByteSize > 8);
1063       convSize = 64;
1064       convLen = 1;
1065       break;
1066     }
1067     // Fall through for Blob v2
1068   }
1069   default:
1070     /* Default determined by meta-info */
1071     convSize = 1 << typeLog2Size;
1072     convLen = arraySize;
1073     break;
1074   }
1075 
1076   const Uint32 unitBytes = (convSize >> 3);
1077 
1078   if (dataByteSize < (unitBytes * convLen))
1079   {
1080     /* Actual data is shorter than expected, could
1081      * be VAR type or bad FIXED data (which some other
1082      * code should detect + handle)
1083      * We reduce convLen to avoid trampling
1084      */
1085     assert((dataByteSize % unitBytes) == 0);
1086     convLen = dataByteSize / unitBytes;
1087   }
1088 
1089   assert(convSize);
1090   assert(convLen);
1091   assert(dataByteSize >= (unitBytes * convLen));
1092 }
1093 
1094 static
doConvert(Uint32 convSize,Uint32 convLen,uchar * data)1095 void doConvert(Uint32 convSize,
1096                Uint32 convLen,
1097                uchar* data)
1098 {
1099   switch(convSize)
1100   {
1101   case 8:
1102     /* Nothing to swap */
1103     break;
1104   case 16:
1105   {
1106     Uint16* ptr = (Uint16*)data;
1107     for (Uint32 i = 0; i < convLen; i++){
1108       Uint16 val =
1109         ((*ptr & 0xFF00) >> 8) |
1110         ((*ptr & 0x00FF) << 8);
1111       *ptr = val;
1112       ptr++;
1113     }
1114     break;
1115   }
1116   case 32:
1117   {
1118     Uint32* ptr = (Uint32*)data;
1119     for (Uint32 i = 0; i < convLen; i++){
1120       Uint32 val =
1121         ((*ptr & 0xFF000000) >> 24) |
1122         ((*ptr & 0x00FF0000) >> 8)  |
1123         ((*ptr & 0x0000FF00) << 8)  |
1124         ((*ptr & 0x000000FF) << 24);
1125       *ptr = val;
1126       ptr++;
1127     }
1128     break;
1129   }
1130   case 64:
1131   {
1132     Uint64* ptr = (Uint64*)data;
1133     for (Uint32 i = 0; i < convLen; i++){
1134       Uint64 val =
1135         ((*ptr & (Uint64)0xFF00000000000000LL) >> 56) |
1136         ((*ptr & (Uint64)0x00FF000000000000LL) >> 40) |
1137         ((*ptr & (Uint64)0x0000FF0000000000LL) >> 24) |
1138         ((*ptr & (Uint64)0x000000FF00000000LL) >> 8)  |
1139         ((*ptr & (Uint64)0x00000000FF000000LL) << 8)  |
1140         ((*ptr & (Uint64)0x0000000000FF0000LL) << 24) |
1141         ((*ptr & (Uint64)0x000000000000FF00LL) << 40) |
1142         ((*ptr & (Uint64)0x00000000000000FFLL) << 56);
1143       *ptr = val;
1144       ptr++;
1145     }
1146     break;
1147   }
1148   default:
1149     abort();
1150     break;
1151   }
1152 }
1153 #endif
1154 
1155 /**
1156  * Convert attribute byte order if necessary
1157  */
1158 void
convertByteOrder(Uint32 typeId,Uint32 typeLog2Size,Uint32 arrayType,Uint32 arraySize,uchar * data,Uint32 dataByteSize)1159 NdbSqlUtil::convertByteOrder(Uint32 typeId,
1160                              Uint32 typeLog2Size,
1161                              Uint32 arrayType,
1162                              Uint32 arraySize,
1163                              uchar* data,
1164                              Uint32 dataByteSize)
1165 {
1166 #if defined(WORDS_BIGENDIAN) || defined (VM_TRACE)
1167   Uint32 convSize;
1168   Uint32 convLen;
1169   determineParams(typeId,
1170                   typeLog2Size,
1171                   arrayType,
1172                   arraySize,
1173                   dataByteSize,
1174                   convSize,
1175                   convLen);
1176 
1177   size_t mask = (((size_t) convSize) >> 3) -1; // Bottom 0,1,2,3 bits set
1178   bool aligned = (((size_t) data) & mask) == 0;
1179   uchar* dataPtr = data;
1180   const Uint32 bufSize = (MAX_TUPLE_SIZE_IN_WORDS + 1)/2;
1181   Uint64 alignedBuf[bufSize];
1182 
1183   if (!aligned)
1184   {
1185     assert(dataByteSize <= 4 * MAX_TUPLE_SIZE_IN_WORDS);
1186     memcpy(alignedBuf, data, dataByteSize);
1187     dataPtr = (uchar*) alignedBuf;
1188   }
1189 
1190   /* Now have convSize and convLen, do the conversion */
1191   doConvert(convSize,
1192             convLen,
1193             dataPtr);
1194 
1195 #ifdef VM_TRACE
1196 #ifndef WORDS_BIGENDIAN
1197   // VM trace on little-endian performs double-convert
1198   doConvert(convSize,
1199             convLen,
1200             dataPtr);
1201 #endif
1202 #endif
1203 
1204   if (!aligned)
1205   {
1206     memcpy(data, alignedBuf, dataByteSize);
1207   }
1208 #endif
1209 }
1210 
1211