1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <NdbSqlUtil.hpp>
18 #include <NdbOut.hpp>
19 #include <my_sys.h>
20 
/*
 * Data types.  The entries must be in numerical order of type id,
 * because getType() uses the type id directly as the array index.
 */
24 
25 const NdbSqlUtil::Type
26 NdbSqlUtil::m_typeList[] = {
27   { // 0
28     Type::Undefined,
29     NULL,
30     NULL
31   },
32   { // 1
33     Type::Tinyint,
34     cmpTinyint,
35     NULL
36   },
37   { // 2
38     Type::Tinyunsigned,
39     cmpTinyunsigned,
40     NULL
41   },
42   { // 3
43     Type::Smallint,
44     cmpSmallint,
45     NULL
46   },
47   { // 4
48     Type::Smallunsigned,
49     cmpSmallunsigned,
50     NULL
51   },
52   { // 5
53     Type::Mediumint,
54     cmpMediumint,
55     NULL
56   },
57   { // 6
58     Type::Mediumunsigned,
59     cmpMediumunsigned,
60     NULL
61   },
62   { // 7
63     Type::Int,
64     cmpInt,
65     NULL
66   },
67   { // 8
68     Type::Unsigned,
69     cmpUnsigned,
70     NULL
71   },
72   { // 9
73     Type::Bigint,
74     cmpBigint,
75     NULL
76   },
77   { // 10
78     Type::Bigunsigned,
79     cmpBigunsigned,
80     NULL
81   },
82   { // 11
83     Type::Float,
84     cmpFloat,
85     NULL
86   },
87   { // 12
88     Type::Double,
89     cmpDouble,
90     NULL
91   },
92   { // 13
93     Type::Olddecimal,
94     cmpOlddecimal,
95     NULL
96   },
97   { // 14
98     Type::Char,
99     cmpChar,
100     likeChar
101   },
102   { // 15
103     Type::Varchar,
104     cmpVarchar,
105     likeVarchar
106   },
107   { // 16
108     Type::Binary,
109     cmpBinary,
110     likeBinary
111   },
112   { // 17
113     Type::Varbinary,
114     cmpVarbinary,
115     likeVarbinary
116   },
117   { // 18
118     Type::Datetime,
119     cmpDatetime,
120     NULL
121   },
122   { // 19
123     Type::Date,
124     cmpDate,
125     NULL
126   },
127   { // 20
128     Type::Blob,
129     NULL,
130     NULL
131   },
132   { // 21
133     Type::Text,
134     NULL,
135     NULL
136   },
137   { // 22
138     Type::Bit,
139     cmpBit,
140     NULL
141   },
142   { // 23
143     Type::Longvarchar,
144     cmpLongvarchar,
145     likeLongvarchar
146   },
147   { // 24
148     Type::Longvarbinary,
149     cmpLongvarbinary,
150     likeLongvarbinary
151   },
152   { // 25
153     Type::Time,
154     cmpTime,
155     NULL
156   },
157   { // 26
158     Type::Year,
159     cmpYear,
160     NULL
161   },
162   { // 27
163     Type::Timestamp,
164     cmpTimestamp,
165     NULL
166   },
167   { // 28
168     Type::Olddecimalunsigned,
169     cmpOlddecimalunsigned,
170     NULL
171   },
172   { // 29
173     Type::Decimal,
174     cmpDecimal,
175     NULL
176   },
177   { // 30
178     Type::Decimalunsigned,
179     cmpDecimalunsigned,
180     NULL
181   }
182 };
183 
184 const NdbSqlUtil::Type&
getType(Uint32 typeId)185 NdbSqlUtil::getType(Uint32 typeId)
186 {
187   if (typeId < sizeof(m_typeList) / sizeof(m_typeList[0]) &&
188       m_typeList[typeId].m_typeId != Type::Undefined) {
189     return m_typeList[typeId];
190   }
191   return m_typeList[Type::Undefined];
192 }
193 
194 const NdbSqlUtil::Type&
getTypeBinary(Uint32 typeId)195 NdbSqlUtil::getTypeBinary(Uint32 typeId)
196 {
197   switch (typeId) {
198   case Type::Char:
199   case Type::Varchar:
200   case Type::Binary:
201   case Type::Varbinary:
202   case Type::Longvarchar:
203   case Type::Longvarbinary:
204     typeId = Type::Binary;
205     break;
206   case Type::Text:
207     typeId = Type::Blob;
208     break;
209   default:
210     break;
211   }
212   return getType(typeId);
213 }
214 
215 /*
216  * Comparison functions.
217  */
218 
219 int
cmpTinyint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)220 NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
221 {
222   if (n2 >= sizeof(Int8)) {
223     Int8 v1, v2;
224     memcpy(&v1, p1, sizeof(Int8));
225     memcpy(&v2, p2, sizeof(Int8));
226     if (v1 < v2)
227       return -1;
228     if (v1 > v2)
229       return +1;
230     return 0;
231   }
232   assert(! full);
233   return CmpUnknown;
234 }
235 
236 int
cmpTinyunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)237 NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
238 {
239   if (n2 >= sizeof(Uint8)) {
240     Uint8 v1, v2;
241     memcpy(&v1, p1, sizeof(Uint8));
242     memcpy(&v2, p2, sizeof(Uint8));
243     if (v1 < v2)
244       return -1;
245     if (v1 > v2)
246       return +1;
247     return 0;
248   }
249   assert(! full);
250   return CmpUnknown;
251 }
252 
253 int
cmpSmallint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)254 NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
255 {
256   if (n2 >= sizeof(Int16)) {
257     Int16 v1, v2;
258     memcpy(&v1, p1, sizeof(Int16));
259     memcpy(&v2, p2, sizeof(Int16));
260     if (v1 < v2)
261       return -1;
262     if (v1 > v2)
263       return +1;
264     return 0;
265   }
266   assert(! full);
267   return CmpUnknown;
268 }
269 
270 int
cmpSmallunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)271 NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
272 {
273   if (n2 >= sizeof(Uint16)) {
274     Uint16 v1, v2;
275     memcpy(&v1, p1, sizeof(Uint16));
276     memcpy(&v2, p2, sizeof(Uint16));
277     if (v1 < v2)
278       return -1;
279     if (v1 > v2)
280       return +1;
281     return 0;
282   }
283   assert(! full);
284   return CmpUnknown;
285 }
286 
287 int
cmpMediumint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)288 NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
289 {
290   if (n2 >= 3) {
291     Int32 v1, v2;
292     v1 = sint3korr((const uchar*)p1);
293     v2 = sint3korr((const uchar*)p2);
294     if (v1 < v2)
295       return -1;
296     if (v1 > v2)
297       return +1;
298     return 0;
299   }
300   assert(! full);
301   return CmpUnknown;
302 }
303 
304 int
cmpMediumunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)305 NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
306 {
307   if (n2 >= 3) {
308     Uint32 v1, v2;
309     v1 = uint3korr((const uchar*)p1);
310     v2 = uint3korr((const uchar*)p2);
311     if (v1 < v2)
312       return -1;
313     if (v1 > v2)
314       return +1;
315     return 0;
316   }
317   assert(! full);
318   return CmpUnknown;
319 }
320 
321 int
cmpInt(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)322 NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
323 {
324   if (n2 >= sizeof(Int32)) {
325     Int32 v1, v2;
326     memcpy(&v1, p1, sizeof(Int32));
327     memcpy(&v2, p2, sizeof(Int32));
328     if (v1 < v2)
329       return -1;
330     if (v1 > v2)
331       return +1;
332     return 0;
333   }
334   assert(! full);
335   return CmpUnknown;
336 }
337 
338 int
cmpUnsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)339 NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
340 {
341   if (n2 >= sizeof(Uint32)) {
342     Uint32 v1, v2;
343     memcpy(&v1, p1, sizeof(Uint32));
344     memcpy(&v2, p2, sizeof(Uint32));
345     if (v1 < v2)
346       return -1;
347     if (v1 > v2)
348       return +1;
349     return 0;
350   }
351   assert(! full);
352   return CmpUnknown;
353 }
354 
355 int
cmpBigint(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)356 NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
357 {
358   if (n2 >= sizeof(Int64)) {
359     Int64 v1, v2;
360     memcpy(&v1, p1, sizeof(Int64));
361     memcpy(&v2, p2, sizeof(Int64));
362     if (v1 < v2)
363       return -1;
364     if (v1 > v2)
365       return +1;
366     return 0;
367   }
368   assert(! full);
369   return CmpUnknown;
370 }
371 
372 int
cmpBigunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)373 NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
374 {
375   if (n2 >= sizeof(Uint64)) {
376     Uint64 v1, v2;
377     memcpy(&v1, p1, sizeof(Uint64));
378     memcpy(&v2, p2, sizeof(Uint64));
379     if (v1 < v2)
380       return -1;
381     if (v1 > v2)
382       return +1;
383     return 0;
384   }
385   assert(! full);
386   return CmpUnknown;
387 }
388 
389 int
cmpFloat(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)390 NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
391 {
392   if (n2 >= sizeof(float)) {
393     float v1, v2;
394     memcpy(&v1, p1, sizeof(float));
395     memcpy(&v2, p2, sizeof(float));
396     if (v1 < v2)
397       return -1;
398     if (v1 > v2)
399       return +1;
400     return 0;
401   }
402   assert(! full);
403   return CmpUnknown;
404 }
405 
406 int
cmpDouble(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)407 NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
408 {
409   if (n2 >= sizeof(double)) {
410     double v1, v2;
411     memcpy(&v1, p1, sizeof(double));
412     memcpy(&v2, p2, sizeof(double));
413     if (v1 < v2)
414       return -1;
415     if (v1 > v2)
416       return +1;
417     return 0;
418   }
419   assert(! full);
420   return CmpUnknown;
421 }
422 
423 int
cmp_olddecimal(const uchar * s1,const uchar * s2,unsigned n)424 NdbSqlUtil::cmp_olddecimal(const uchar* s1, const uchar* s2, unsigned n)
425 {
426   int sgn = +1;
427   unsigned i = 0;
428   while (i < n) {
429     int c1 = s1[i];
430     int c2 = s2[i];
431     if (c1 == c2) {
432       if (c1 == '-')
433         sgn = -1;
434     } else if (c1 == '-') {
435       return -1;
436     } else if (c2 == '-') {
437       return +1;
438     } else if (c1 < c2) {
439       return -1 * sgn;
440     } else {
441       return +1 * sgn;
442     }
443     i++;
444   }
445   return 0;
446 }
447 
448 int
cmpOlddecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)449 NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
450 {
451   if (full) {
452     assert(n1 == n2);
453     const uchar* v1 = (const uchar*)p1;
454     const uchar* v2 = (const uchar*)p2;
455     return cmp_olddecimal(v1, v2, n1);
456   }
457   return CmpUnknown;
458 }
459 
460 int
cmpOlddecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)461 NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
462 {
463   if (full) {
464     assert(n1 == n2);
465     const uchar* v1 = (const uchar*)p1;
466     const uchar* v2 = (const uchar*)p2;
467     return cmp_olddecimal(v1, v2, n1);
468   }
469   return CmpUnknown;
470 }
471 
472 int
cmpDecimal(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)473 NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
474 {
475   const uchar* v1 = (const uchar*)p1;
476   const uchar* v2 = (const uchar*)p2;
477   // compare as binary strings
478   unsigned n = (n1 <= n2 ? n1 : n2);
479   int k = memcmp(v1, v2, n);
480   if (k == 0) {
481     k = (full ? n1 : n) - n2;
482   }
483   return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
484 }
485 
486 int
cmpDecimalunsigned(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)487 NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
488 {
489   const uchar* v1 = (const uchar*)p1;
490   const uchar* v2 = (const uchar*)p2;
491   // compare as binary strings
492   unsigned n = (n1 <= n2 ? n1 : n2);
493   int k = memcmp(v1, v2, n);
494   if (k == 0) {
495     k = (full ? n1 : n) - n2;
496   }
497   return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
498 }
499 
500 int
cmpChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)501 NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
502 {
503   // collation does not work on prefix for some charsets
504   assert(full);
505   const uchar* v1 = (const uchar*)p1;
506   const uchar* v2 = (const uchar*)p2;
507   // not const in MySQL
508   CHARSET_INFO* cs = (CHARSET_INFO*)(info);
509   // compare with space padding
510   int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
511   return k < 0 ? -1 : k > 0 ? +1 : 0;
512 }
513 
514 int
cmpVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)515 NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
516 {
517   const unsigned lb = 1;
518   // collation does not work on prefix for some charsets
519   assert(full && n1 >= lb && n2 >= lb);
520   const uchar* v1 = (const uchar*)p1;
521   const uchar* v2 = (const uchar*)p2;
522   unsigned m1 = *v1;
523   unsigned m2 = *v2;
524   if (m1 <= n1 - lb && m2 <= n2 - lb) {
525     CHARSET_INFO* cs = (CHARSET_INFO*)(info);
526     // compare with space padding
527     int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
528     return k < 0 ? -1 : k > 0 ? +1 : 0;
529   }
530   // treat bad data as NULL
531   if (m1 > n1 - lb && m2 <= n2 - lb)
532     return -1;
533   if (m1 <= n1 - lb && m2 > n2 - lb)
534     return +1;
535   return 0;
536 }
537 
538 int
cmpBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)539 NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
540 {
541   const uchar* v1 = (const uchar*)p1;
542   const uchar* v2 = (const uchar*)p2;
543   // compare as binary strings
544   unsigned n = (n1 <= n2 ? n1 : n2);
545   int k = memcmp(v1, v2, n);
546   if (k == 0) {
547     k = (full ? n1 : n) - n2;
548   }
549   return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
550 }
551 
552 int
cmpVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)553 NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
554 {
555   const unsigned lb = 1;
556   if (n2 >= lb) {
557     assert(n1 >= lb);
558     const uchar* v1 = (const uchar*)p1;
559     const uchar* v2 = (const uchar*)p2;
560     unsigned m1 = *v1;
561     unsigned m2 = *v2;
562     if (m1 <= n1 - lb && m2 <= n2 - lb) {
563       // compare as binary strings
564       unsigned m = (m1 <= m2 ? m1 : m2);
565       int k = memcmp(v1 + lb, v2 + lb, m);
566       if (k == 0) {
567         k = (full ? m1 : m) - m2;
568       }
569       return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
570     }
571     // treat bad data as NULL
572     if (m1 > n1 - lb && m2 <= n2 - lb)
573       return -1;
574     if (m1 <= n1 - lb && m2 > n2 - lb)
575       return +1;
576     return 0;
577   }
578   assert(! full);
579   return CmpUnknown;
580 }
581 
582 int
cmpDatetime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)583 NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
584 {
585   if (n2 >= sizeof(Int64)) {
586     Int64 v1, v2;
587     memcpy(&v1, p1, sizeof(Int64));
588     memcpy(&v2, p2, sizeof(Int64));
589     if (v1 < v2)
590       return -1;
591     if (v1 > v2)
592       return +1;
593     return 0;
594   }
595   assert(! full);
596   return CmpUnknown;
597 }
598 
599 int
cmpDate(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)600 NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
601 {
602 #ifdef ndb_date_is_4_byte_native_int
603   if (n2 >= sizeof(Int32)) {
604     Int32 v1, v2;
605     memcpy(&v1, p1, sizeof(Int32));
606     memcpy(&v2, p2, sizeof(Int32));
607     if (v1 < v2)
608       return -1;
609     if (v1 > v2)
610       return +1;
611     return 0;
612   }
613 #else
614 #ifdef ndb_date_sol9x86_cc_xO3_madness
615   if (n2 >= 3) {
616     const uchar* v1 = (const uchar*)p1;
617     const uchar* v2 = (const uchar*)p2;
618     // from Field_newdate::val_int
619     Uint64 j1 = uint3korr(v1);
620     Uint64 j2 = uint3korr(v2);
621     j1 = (j1 % 32L)+(j1 / 32L % 16L)*100L + (j1/(16L*32L))*10000L;
622     j2 = (j2 % 32L)+(j2 / 32L % 16L)*100L + (j2/(16L*32L))*10000L;
623     if (j1 < j2)
624       return -1;
625     if (j1 > j2)
626       return +1;
627     return 0;
628   }
629 #else
630   if (n2 >= 3) {
631     const uchar* v1 = (const uchar*)p1;
632     const uchar* v2 = (const uchar*)p2;
633     uint j1 = uint3korr(v1);
634     uint j2 = uint3korr(v2);
635     uint d1 = (j1 & 31);
636     uint d2 = (j2 & 31);
637     j1 = (j1 >> 5);
638     j2 = (j2 >> 5);
639     uint m1 = (j1 & 15);
640     uint m2 = (j2 & 15);
641     j1 = (j1 >> 4);
642     j2 = (j2 >> 4);
643     uint y1 = j1;
644     uint y2 = j2;
645     if (y1 < y2)
646       return -1;
647     if (y1 > y2)
648       return +1;
649     if (m1 < m2)
650       return -1;
651     if (m1 > m2)
652       return +1;
653     if (d1 < d2)
654       return -1;
655     if (d1 > d2)
656       return +1;
657     return 0;
658   }
659 #endif
660 #endif
661   assert(! full);
662   return CmpUnknown;
663 }
664 
665 // not supported
666 int
cmpBlob(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)667 NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
668 {
669   assert(false);
670   return 0;
671 }
672 
673 // not supported
674 int
cmpText(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)675 NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
676 {
677   assert(false);
678   return 0;
679 }
680 
681 int
cmpBit(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)682 NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
683 {
684   Uint32 n = (n1 < n2) ? n1 : n2;
685   int ret = memcmp(p1, p2, n);
686   return ret;
687 }
688 
689 
690 int
cmpTime(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)691 NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
692 {
693   if (n2 >= 3) {
694     const uchar* v1 = (const uchar*)p1;
695     const uchar* v2 = (const uchar*)p2;
696     // from Field_time::val_int
697     Int32 j1 = sint3korr(v1);
698     Int32 j2 = sint3korr(v2);
699     if (j1 < j2)
700       return -1;
701     if (j1 > j2)
702       return +1;
703     return 0;
704   }
705   assert(! full);
706   return CmpUnknown;
707 }
708 
// long variable-length types (2 length bytes, read with uint2korr)
710 
711 int
cmpLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)712 NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
713 {
714   const unsigned lb = 2;
715   // collation does not work on prefix for some charsets
716   assert(full && n1 >= lb && n2 >= lb);
717   const uchar* v1 = (const uchar*)p1;
718   const uchar* v2 = (const uchar*)p2;
719   unsigned m1 = uint2korr(v1);
720   unsigned m2 = uint2korr(v2);
721   if (m1 <= n1 - lb && m2 <= n2 - lb) {
722     CHARSET_INFO* cs = (CHARSET_INFO*)(info);
723     // compare with space padding
724     int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
725     return k < 0 ? -1 : k > 0 ? +1 : 0;
726   }
727   // treat bad data as NULL
728   if (m1 > n1 - lb && m2 <= n2 - lb)
729     return -1;
730   if (m1 <= n1 - lb && m2 > n2 - lb)
731     return +1;
732   return 0;
733 }
734 
735 int
cmpLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)736 NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
737 {
738   const unsigned lb = 2;
739   if (n2 >= lb) {
740     assert(n1 >= lb);
741     const uchar* v1 = (const uchar*)p1;
742     const uchar* v2 = (const uchar*)p2;
743     unsigned m1 = uint2korr(v1);
744     unsigned m2 = uint2korr(v2);
745     if (m1 <= n1 - lb && m2 <= n2 - lb) {
746       // compare as binary strings
747       unsigned m = (m1 <= m2 ? m1 : m2);
748       int k = memcmp(v1 + lb, v2 + lb, m);
749       if (k == 0) {
750         k = (full ? m1 : m) - m2;
751       }
752       return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
753     }
754     // treat bad data as NULL
755     if (m1 > n1 - lb && m2 <= n2 - lb)
756       return -1;
757     if (m1 <= n1 - lb && m2 > n2 - lb)
758       return +1;
759     return 0;
760   }
761   assert(! full);
762   return CmpUnknown;
763 }
764 
765 int
cmpYear(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)766 NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
767 {
768   if (n2 >= sizeof(Uint8)) {
769     Uint8 v1, v2;
770     memcpy(&v1, p1, sizeof(Uint8));
771     memcpy(&v2, p2, sizeof(Uint8));
772     if (v1 < v2)
773       return -1;
774     if (v1 > v2)
775       return +1;
776     return 0;
777   }
778   assert(! full);
779   return CmpUnknown;
780 }
781 
782 int
cmpTimestamp(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2,bool full)783 NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
784 {
785   if (n2 >= sizeof(Uint32)) {
786     Uint32 v1, v2;
787     memcpy(&v1, p1, sizeof(Uint32));
788     memcpy(&v2, p2, sizeof(Uint32));
789     if (v1 < v2)
790       return -1;
791     if (v1 > v2)
792       return +1;
793     return 0;
794   }
795   assert(! full);
796   return CmpUnknown;
797 }
798 
799 // like
800 
801 static const int ndb_wild_prefix = '\\';
802 static const int ndb_wild_one = '_';
803 static const int ndb_wild_many = '%';
804 
805 int
likeChar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)806 NdbSqlUtil::likeChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
807 {
808   const char* v1 = (const char*)p1;
809   const char* v2 = (const char*)p2;
810   CHARSET_INFO* cs = (CHARSET_INFO*)(info);
811   // strip end spaces to match (incorrect) MySQL behaviour
812   n1 = (*cs->cset->lengthsp)(cs, v1, n1);
813   int k = (*cs->coll->wildcmp)(cs, v1, v1 + n1, v2, v2 + n2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
814   return k == 0 ? 0 : +1;
815 }
816 
817 int
likeBinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)818 NdbSqlUtil::likeBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
819 {
820   assert(info == 0);
821   return likeChar(&my_charset_bin, p1, n1, p2, n2);
822 }
823 
824 int
likeVarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)825 NdbSqlUtil::likeVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
826 {
827   const unsigned lb = 1;
828   if (n1 >= lb) {
829     const uchar* v1 = (const uchar*)p1;
830     const uchar* v2 = (const uchar*)p2;
831     unsigned m1 = *v1;
832     unsigned m2 = n2;
833     if (lb + m1 <= n1) {
834       const char* w1 = (const char*)v1 + lb;
835       const char* w2 = (const char*)v2;
836       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
837       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
838       return k == 0 ? 0 : +1;
839     }
840   }
841   return -1;
842 }
843 
844 int
likeVarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)845 NdbSqlUtil::likeVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
846 {
847   assert(info == 0);
848   return likeVarchar(&my_charset_bin, p1, n1, p2, n2);
849 }
850 
851 int
likeLongvarchar(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)852 NdbSqlUtil::likeLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
853 {
854   const unsigned lb = 2;
855   if (n1 >= lb) {
856     const uchar* v1 = (const uchar*)p1;
857     const uchar* v2 = (const uchar*)p2;
858     unsigned m1 = uint2korr(v1);
859     unsigned m2 = n2;
860     if (lb + m1 <= n1) {
861       const char* w1 = (const char*)v1 + lb;
862       const char* w2 = (const char*)v2;
863       CHARSET_INFO* cs = (CHARSET_INFO*)(info);
864       int k = (*cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, ndb_wild_prefix, ndb_wild_one, ndb_wild_many);
865       return k == 0 ? 0 : +1;
866     }
867   }
868   return -1;
869 }
870 
871 int
likeLongvarbinary(const void * info,const void * p1,unsigned n1,const void * p2,unsigned n2)872 NdbSqlUtil::likeLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
873 {
874   assert(info == 0);
875   return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
876 }
877 
878 // check charset
879 
880 uint
check_column_for_pk(Uint32 typeId,const void * info)881 NdbSqlUtil::check_column_for_pk(Uint32 typeId, const void* info)
882 {
883   const Type& type = getType(typeId);
884   switch (type.m_typeId) {
885   case Type::Char:
886   case Type::Varchar:
887   case Type::Longvarchar:
888     {
889       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
890       if(cs != 0 &&
891          cs->cset != 0 &&
892          cs->coll != 0 &&
893          cs->coll->strnxfrm != 0 &&
894          cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
895         return 0;
896       else
897         return 743;
898     }
899     break;
900   case Type::Undefined:
901   case Type::Blob:
902   case Type::Text:
903   case Type::Bit:
904     break;
905   default:
906     return 0;
907   }
908   return 906;
909 }
910 
911 uint
check_column_for_hash_index(Uint32 typeId,const void * info)912 NdbSqlUtil::check_column_for_hash_index(Uint32 typeId, const void* info)
913 {
914   return check_column_for_pk(typeId, info);
915 }
916 
917 uint
check_column_for_ordered_index(Uint32 typeId,const void * info)918 NdbSqlUtil::check_column_for_ordered_index(Uint32 typeId, const void* info)
919 {
920   const Type& type = getType(typeId);
921   if (type.m_cmp == NULL)
922     return false;
923   switch (type.m_typeId) {
924   case Type::Char:
925   case Type::Varchar:
926   case Type::Longvarchar:
927     {
928       const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
929       if (cs != 0 &&
930           cs->cset != 0 &&
931           cs->coll != 0 &&
932           cs->coll->strnxfrm != 0 &&
933           cs->coll->strnncollsp != 0 &&
934           cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY)
935         return 0;
936       else
937         return 743;
938     }
939     break;
940   case Type::Undefined:
941   case Type::Blob:
942   case Type::Text:
943   case Type::Bit:       // can be fixed
944     break;
945   default:
946     return 0;
947   }
948   return 906;
949 }
950 
951 // utilities
952 
953 bool
get_var_length(Uint32 typeId,const void * p,unsigned attrlen,Uint32 & lb,Uint32 & len)954 NdbSqlUtil::get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len)
955 {
956   const unsigned char* const src = (const unsigned char*)p;
957   switch (typeId) {
958   case NdbSqlUtil::Type::Varchar:
959   case NdbSqlUtil::Type::Varbinary:
960     lb = 1;
961     if (attrlen >= lb) {
962       len = src[0];
963       if (attrlen >= lb + len)
964         return true;
965     }
966     break;
967   case NdbSqlUtil::Type::Longvarchar:
968   case NdbSqlUtil::Type::Longvarbinary:
969     lb = 2;
970     if (attrlen >= lb) {
971       len = src[0] + (src[1] << 8);
972       if (attrlen >= lb + len)
973         return true;
974     }
975     break;
976   default:
977     lb = 0;
978     len = attrlen;
979     return true;
980     break;
981   }
982   return false;
983 }
984 
985 // workaround
986 
987 int
strnxfrm_bug7284(CHARSET_INFO * cs,unsigned char * dst,unsigned dstLen,const unsigned char * src,unsigned srcLen)988 NdbSqlUtil::strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen)
989 {
990   unsigned char nsp[20]; // native space char
991   unsigned char xsp[20]; // strxfrm-ed space char
992 #ifdef VM_TRACE
993   memset(nsp, 0x1f, sizeof(nsp));
994   memset(xsp, 0x1f, sizeof(xsp));
995 #endif
996   // convert from unicode codepoint for space
997   int n1 = (*cs->cset->wc_mb)(cs, (my_wc_t)0x20, nsp, nsp + sizeof(nsp));
998   if (n1 <= 0)
999     return -1;
1000   // strxfrm to binary
1001   int n2 = (*cs->coll->strnxfrm)(cs, xsp, sizeof(xsp), nsp, n1);
1002   if (n2 <= 0)
1003     return -1;
1004   // XXX bug workaround - strnxfrm may not write full string
1005   memset(dst, 0x0, dstLen);
1006   // strxfrm argument string - returns no error indication
1007   int n3 = (*cs->coll->strnxfrm)(cs, dst, dstLen, src, srcLen);
1008   // pad with strxfrm-ed space chars
1009   int n4 = n3;
1010   while (n4 < (int)dstLen) {
1011     dst[n4] = xsp[(n4 - n3) % n2];
1012     n4++;
1013   }
1014   // no check for partial last
1015   return dstLen;
1016 }
1017