1 /* Copyright (c) 2011, Oracle and/or its affiliates.
2    Copyright (C) 2009-2011 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA */
16 
17 #include <my_global.h>
18 #include <m_ctype.h>
19 #include <my_base.h>
20 #include <my_compare.h>
21 #include <my_sys.h>
22 
ha_compare_text(CHARSET_INFO * charset_info,const uchar * a,size_t a_length,const uchar * b,size_t b_length,my_bool part_key)23 int ha_compare_text(CHARSET_INFO *charset_info, const uchar *a, size_t a_length,
24 		    const uchar *b, size_t b_length, my_bool part_key)
25 {
26   if (!part_key)
27     return charset_info->coll->strnncollsp(charset_info, a, a_length,
28                                                          b, b_length);
29   return charset_info->coll->strnncoll(charset_info, a, a_length,
30                                        b, b_length, part_key);
31 }
32 
33 
compare_bin(const uchar * a,uint a_length,const uchar * b,uint b_length,my_bool part_key,my_bool skip_end_space)34 static int compare_bin(const uchar *a, uint a_length,
35                        const uchar *b, uint b_length,
36                        my_bool part_key, my_bool skip_end_space)
37 {
38   uint length= MY_MIN(a_length,b_length);
39   const uchar *end= a+ length;
40   int flag;
41 
42   while (a < end)
43     if ((flag= (int) *a++ - (int) *b++))
44       return flag;
45   if (part_key && b_length < a_length)
46     return 0;
47   if (skip_end_space && a_length != b_length)
48   {
49     int swap= 1;
50     /*
51       We are using space compression. We have to check if longer key
52       has next character < ' ', in which case it's less than the shorter
53       key that has an implicite space afterwards.
54 
55       This code is identical to the one in
56       strings/ctype-simple.c:my_strnncollsp_simple
57     */
58     if (a_length < b_length)
59     {
60       /* put shorter key in a */
61       a_length= b_length;
62       a= b;
63       swap= -1;					/* swap sign of result */
64     }
65     for (end= a + a_length-length; a < end ; a++)
66     {
67       if (*a != ' ')
68 	return (*a < ' ') ? -swap : swap;
69     }
70     return 0;
71   }
72   return (int) (a_length-b_length);
73 }
74 
75 
76 /*
77   Compare two keys
78 
79   SYNOPSIS
80     ha_key_cmp()
81     keyseg	Array of key segments of key to compare
82     a		First key to compare, in format from _mi_pack_key()
83 		This is always from the row
84     b		Second key to compare.  This is from the row or the user
85     key_length	Length of key to compare, based on key b.  This can be shorter
86 		than b to just compare sub keys
87     next_flag	How keys should be compared
88 		If bit SEARCH_FIND is not set the keys includes the row
89 		position and this should also be compared
90                 If SEARCH_PAGE_KEY_HAS_TRANSID is set then 'a' has transid
91                 If SEARCH_USER_KEY_HAS_TRANSID is set then 'b' has transid
92     diff_pos    OUT Number of first keypart where values differ, counting
93                 from one.
94     diff_pos[1] OUT  (b + diff_pos[1]) points to first value in tuple b
95                       that is different from corresponding value in tuple a.
96 
97   EXAMPLES
98    Example1: if the function is called for tuples
99      ('aaa','bbb') and ('eee','fff'), then
100      diff_pos[0] = 1 (as 'aaa' != 'eee')
101      diff_pos[1] = 0 (offset from beginning of tuple b to 'eee' keypart).
102 
103    Example2: if the index function is called for tuples
104      ('aaa','bbb') and ('aaa','fff'),
105      diff_pos[0] = 2 (as 'aaa' != 'eee')
106      diff_pos[1] = 3 (offset from beginning of tuple b to 'fff' keypart,
107                       here we assume that first key part is CHAR(3) NOT NULL)
108 
109   NOTES
110     Number-keys can't be splited
111 
112   RETURN VALUES
113     <0	If a < b
114     0	If a == b
115     >0	If a > b
116 */
117 
118 #define FCMP(A,B) ((int) (A) - (int) (B))
119 
ha_key_cmp(HA_KEYSEG * keyseg,const uchar * a,const uchar * b,uint key_length,uint32 nextflag,uint * diff_pos)120 int ha_key_cmp(HA_KEYSEG *keyseg, const uchar *a,
121                const uchar *b, uint key_length, uint32 nextflag,
122 	       uint *diff_pos)
123 {
124   int flag;
125   int16 s_1,s_2;
126   int32 l_1,l_2;
127   uint32 u_1,u_2;
128   float f_1,f_2;
129   double d_1,d_2;
130   uint next_key_length;
131   const uchar *orig_b= b;
132 
133   *diff_pos=0;
134   for ( ; (int) key_length >0 ; key_length=next_key_length, keyseg++)
135   {
136     const uchar *end;
137     uint piks=! (keyseg->flag & HA_NO_SORT);
138     (*diff_pos)++;
139     diff_pos[1]= (uint)(b - orig_b);
140 
141     /* Handle NULL part */
142     if (keyseg->null_bit)
143     {
144       key_length--;
145       if (*a != *b && piks)
146       {
147         flag = (int) *a - (int) *b;
148         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
149       }
150       b++;
151       if (!*a++)                                /* If key was NULL */
152       {
153         if ((nextflag & (SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT |
154                          SEARCH_NULL_ARE_EQUAL)) ==
155             (SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT))
156         {
157           /* Allow duplicate keys */
158           nextflag= (nextflag & ~(SEARCH_FIND | SEARCH_UPDATE)) | SEARCH_SAME;
159         }
160   	else if (nextflag & SEARCH_NULL_ARE_NOT_EQUAL)
161 	{
162 	  /*
163 	    This is only used from mi_check() to calculate cardinality.
164 	    It can't be used when searching for a key as this would cause
165 	    compare of (a,b) and (b,a) to return the same value.
166 	  */
167 	  return -1;
168 	}
169         next_key_length=key_length;
170         continue;                               /* To next key part */
171       }
172     }
173     end= a+ MY_MIN(keyseg->length,key_length);
174     next_key_length=key_length-keyseg->length;
175 
176     switch ((enum ha_base_keytype) keyseg->type) {
177     case HA_KEYTYPE_TEXT:                       /* Ascii; Key is converted */
178       if (keyseg->flag & HA_SPACE_PACK)
179       {
180         int a_length,b_length,pack_length;
181         get_key_length(a_length,a);
182         get_key_pack_length(b_length,pack_length,b);
183         next_key_length=key_length-b_length-pack_length;
184 
185         if (piks &&
186             (flag=ha_compare_text(keyseg->charset,a,a_length,b,b_length,
187 				  (my_bool) ((nextflag & SEARCH_PREFIX) &&
188 					     next_key_length <= 0))))
189           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
190         a+=a_length;
191         b+=b_length;
192         break;
193       }
194       else
195       {
196 	uint length=(uint) (end-a), a_length=length, b_length=length;
197         if (piks &&
198             (flag= ha_compare_text(keyseg->charset, a, a_length, b, b_length,
199 				   (my_bool) ((nextflag & SEARCH_PREFIX) &&
200 					      next_key_length <= 0))))
201           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
202         a=end;
203         b+=length;
204       }
205       break;
206     case HA_KEYTYPE_BINARY:
207     case HA_KEYTYPE_BIT:
208       if (keyseg->flag & HA_SPACE_PACK)
209       {
210         int a_length,b_length,pack_length;
211         get_key_length(a_length,a);
212         get_key_pack_length(b_length,pack_length,b);
213         next_key_length=key_length-b_length-pack_length;
214 
215         if (piks &&
216 	    (flag=compare_bin(a,a_length,b,b_length,
217                               (my_bool) ((nextflag & SEARCH_PREFIX) &&
218                                          next_key_length <= 0),1)))
219           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
220         a+=a_length;
221         b+=b_length;
222         break;
223       }
224       else
225       {
226         uint length=keyseg->length;
227         if (piks &&
228 	    (flag=compare_bin(a,length,b,length,
229                               (my_bool) ((nextflag & SEARCH_PREFIX) &&
230                                          next_key_length <= 0),0)))
231           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
232         a+=length;
233         b+=length;
234       }
235       break;
236     case HA_KEYTYPE_VARTEXT1:
237     case HA_KEYTYPE_VARTEXT2:
238       {
239         int a_length,b_length,pack_length;
240         get_key_length(a_length,a);
241         get_key_pack_length(b_length,pack_length,b);
242         next_key_length=key_length-b_length-pack_length;
243 
244         if (piks &&
245 	    (flag= ha_compare_text(keyseg->charset,a,a_length,b,b_length,
246                                    (my_bool) ((nextflag & SEARCH_PREFIX) &&
247                                               next_key_length <= 0))))
248           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
249         a+= a_length;
250         b+= b_length;
251         break;
252       }
253       break;
254     case HA_KEYTYPE_VARBINARY1:
255     case HA_KEYTYPE_VARBINARY2:
256       {
257         int a_length,b_length,pack_length;
258         get_key_length(a_length,a);
259         get_key_pack_length(b_length,pack_length,b);
260         next_key_length=key_length-b_length-pack_length;
261 
262         if (piks &&
263 	    (flag=compare_bin(a,a_length,b,b_length,
264                               (my_bool) ((nextflag & SEARCH_PREFIX) &&
265                                          next_key_length <= 0), 0)))
266           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
267         a+=a_length;
268         b+=b_length;
269       }
270       break;
271     case HA_KEYTYPE_INT8:
272     {
273       int i_1= (int) *((signed char*) a);
274       int i_2= (int) *((signed char*) b);
275       if (piks && (flag = CMP_NUM(i_1,i_2)))
276         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
277       a= end;
278       b++;
279       break;
280     }
281     case HA_KEYTYPE_SHORT_INT:
282       s_1= mi_sint2korr(a);
283       s_2= mi_sint2korr(b);
284       if (piks && (flag = CMP_NUM(s_1,s_2)))
285         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
286       a=  end;
287       b+= 2; /* sizeof(short int); */
288       break;
289     case HA_KEYTYPE_USHORT_INT:
290       {
291         uint16 us_1,us_2;
292         us_1= mi_sint2korr(a);
293         us_2= mi_sint2korr(b);
294         if (piks && (flag = CMP_NUM(us_1,us_2)))
295           return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
296         a=  end;
297         b+=2; /* sizeof(short int); */
298         break;
299       }
300     case HA_KEYTYPE_LONG_INT:
301       l_1= mi_sint4korr(a);
302       l_2= mi_sint4korr(b);
303       if (piks && (flag = CMP_NUM(l_1,l_2)))
304         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
305       a=  end;
306       b+= 4; /* sizeof(long int); */
307       break;
308     case HA_KEYTYPE_ULONG_INT:
309       u_1= mi_sint4korr(a);
310       u_2= mi_sint4korr(b);
311       if (piks && (flag = CMP_NUM(u_1,u_2)))
312         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
313       a=  end;
314       b+= 4; /* sizeof(long int); */
315       break;
316     case HA_KEYTYPE_INT24:
317       l_1=mi_sint3korr(a);
318       l_2=mi_sint3korr(b);
319       if (piks && (flag = CMP_NUM(l_1,l_2)))
320         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
321       a=  end;
322       b+= 3;
323       break;
324     case HA_KEYTYPE_UINT24:
325       l_1=mi_uint3korr(a);
326       l_2=mi_uint3korr(b);
327       if (piks && (flag = CMP_NUM(l_1,l_2)))
328         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
329       a=  end;
330       b+= 3;
331       break;
332     case HA_KEYTYPE_FLOAT:
333       mi_float4get(f_1,a);
334       mi_float4get(f_2,b);
335       /*
336         The following may give a compiler warning about floating point
337         comparison not being safe, but this is ok in this context as
338         we are bascily doing sorting
339       */
340       if (piks && (flag = CMP_NUM(f_1,f_2)))
341         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
342       a=  end;
343       b+= 4; /* sizeof(float); */
344       break;
345     case HA_KEYTYPE_DOUBLE:
346       mi_float8get(d_1,a);
347       mi_float8get(d_2,b);
348       /*
349         The following may give a compiler warning about floating point
350         comparison not being safe, but this is ok in this context as
351         we are bascily doing sorting
352       */
353       if (piks && (flag = CMP_NUM(d_1,d_2)))
354         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
355       a=  end;
356       b+= 8;  /* sizeof(double); */
357       break;
358     case HA_KEYTYPE_NUM:                                /* Numeric key */
359     {
360       int swap_flag= 0;
361       int alength,blength;
362 
363       if (keyseg->flag & HA_REVERSE_SORT)
364       {
365         swap_variables(const uchar*, a, b);
366         swap_flag=1;                            /* Remember swap of a & b */
367         end= a+ (int) (end-b);
368       }
369       if (keyseg->flag & HA_SPACE_PACK)
370       {
371         alength= *a++; blength= *b++;
372         end=a+alength;
373         next_key_length=key_length-blength-1;
374       }
375       else
376       {
377         alength= (int) (end-a);
378         blength=keyseg->length;
379         /* remove pre space from keys */
380         for ( ; alength && *a == ' ' ; a++, alength--) ;
381         for ( ; blength && *b == ' ' ; b++, blength--) ;
382       }
383       if (piks)
384       {
385 	if (*a == '-')
386 	{
387 	  if (*b != '-')
388 	    return -1;
389 	  a++; b++;
390 	  swap_variables(const uchar*, a, b);
391 	  swap_variables(int, alength, blength);
392 	  swap_flag=1-swap_flag;
393 	  alength--; blength--;
394 	  end=a+alength;
395 	}
396 	else if (*b == '-')
397 	  return 1;
398 	while (alength && (*a == '+' || *a == '0'))
399 	{
400 	  a++; alength--;
401 	}
402 	while (blength && (*b == '+' || *b == '0'))
403 	{
404 	  b++; blength--;
405 	}
406 	if (alength != blength)
407 	  return (alength < blength) ? -1 : 1;
408 	while (a < end)
409 	  if (*a++ !=  *b++)
410 	    return ((int) a[-1] - (int) b[-1]);
411       }
412       else
413       {
414         b+=(end-a);
415         a=end;
416       }
417 
418       if (swap_flag)                            /* Restore pointers */
419         swap_variables(const uchar*, a, b);
420       break;
421     }
422 #ifdef HAVE_LONG_LONG
423     case HA_KEYTYPE_LONGLONG:
424     {
425       longlong ll_a,ll_b;
426       ll_a= mi_sint8korr(a);
427       ll_b= mi_sint8korr(b);
428       if (piks && (flag = CMP_NUM(ll_a,ll_b)))
429         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
430       a=  end;
431       b+= 8;
432       break;
433     }
434     case HA_KEYTYPE_ULONGLONG:
435     {
436       ulonglong ll_a,ll_b;
437       ll_a= mi_uint8korr(a);
438       ll_b= mi_uint8korr(b);
439       if (piks && (flag = CMP_NUM(ll_a,ll_b)))
440         return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
441       a=  end;
442       b+= 8;
443       break;
444     }
445 #endif
446     case HA_KEYTYPE_END:                        /* Ready */
447       goto end;                                 /* diff_pos is incremented */
448     }
449   }
450   (*diff_pos)++;
451 end:
452   if (!(nextflag & SEARCH_FIND))
453   {
454     /*
455       Compare rowid and possible transid
456       This happens in the following case:
457       - INSERT, UPDATE, DELETE when we have not unique keys or
458         are using versioning
459       - SEARCH_NEXT, SEARCH_PREVIOUS when we need to restart search
460 
461       The logic for comparing transid are as follows:
462       Keys with have a transid have lowest bit in the rowidt. This means that
463       if we are comparing a key with a transid with another key that doesn't
464       have a tranid, we must reset the lowest bit for both keys.
465 
466       When we have transid, the keys are compared in transid order.
467       A key without a transid is regared to be smaller than a key with
468       a transid.
469     */
470 
471     uint i;
472     uchar key_mask, tmp_a, tmp_b;
473 
474     if (nextflag & (SEARCH_NO_FIND | SEARCH_LAST)) /* Find record after key */
475       return (nextflag & (SEARCH_BIGGER | SEARCH_LAST)) ? -1 : 1;
476     key_mask= (uchar) 255;
477 
478     if (!(nextflag & (SEARCH_USER_KEY_HAS_TRANSID |
479                       SEARCH_PAGE_KEY_HAS_TRANSID)))
480     {
481       /*
482         Neither key has a trid.  Only compare row id's and don't
483         try to store rows in trid order
484       */
485       key_length= keyseg->length;
486       nextflag&= ~SEARCH_INSERT;
487     }
488     else
489     {
490       /*
491         Set key_mask so that we reset the last bit in the rowid before
492         we compare it. This is needed as the lowest bit in the rowid is
493         used to mark if the key has a transid or not.
494       */
495       key_mask= (uchar) 254;
496       if (!test_all_bits(nextflag, (SEARCH_USER_KEY_HAS_TRANSID |
497                                     SEARCH_PAGE_KEY_HAS_TRANSID)))
498       {
499         /*
500           No transaction id for user key or for key on page
501           Ignore transid as at least one of the keys are visible for all
502         */
503         key_length= keyseg->length;
504       }
505       else
506       {
507         /*
508           Both keys have trids. No need of special handling of incomplete
509           trids below.
510         */
511         nextflag&= ~SEARCH_INSERT;
512       }
513     }
514     DBUG_ASSERT(key_length > 0);
515 
516     for (i= key_length-1 ; (int) i-- > 0 ; )
517     {
518       if (*a++ != *b++)
519       {
520         flag= FCMP(a[-1],b[-1]);
521         goto found;
522       }
523     }
524     tmp_a= *a & key_mask;
525     tmp_b= *b & key_mask;
526     flag= FCMP(tmp_a, tmp_b);
527 
528     if (flag == 0 && (nextflag & SEARCH_INSERT))
529     {
530       /*
531         Ensure that on insert we get rows stored in trid order.
532         If one of the parts doesn't have a trid, this should be regarded
533         as smaller than the other
534       */
535         return (nextflag & SEARCH_USER_KEY_HAS_TRANSID) ? -1 : 1;
536     }
537 found:
538     if (nextflag & SEARCH_SAME)
539       return (flag);                            /* read same */
540     if (nextflag & SEARCH_BIGGER)
541       return (flag <= 0 ? -1 : 1);              /* read next */
542     return (flag < 0 ? -1 : 1);                 /* read previous */
543   }
544   return 0;
545 } /* ha_key_cmp */
546 
547 /*
548   Find the first NULL value in index-suffix values tuple
549 
550   SYNOPSIS
551     ha_find_null()
552       keyseg     Array of keyparts for key suffix
553       a          Key suffix value tuple
554 
555   DESCRIPTION
556     Find the first NULL value in index-suffix values tuple.
557 
558   TODO
559     Consider optimizing this function or its use so we don't search for
560     NULL values in completely NOT NULL index suffixes.
561 
562   RETURN
563     First key part that has NULL as value in values tuple, or the last key
564     part (with keyseg->type==HA_TYPE_END) if values tuple doesn't contain
565     NULLs.
566 */
567 
ha_find_null(HA_KEYSEG * keyseg,const uchar * a)568 HA_KEYSEG *ha_find_null(HA_KEYSEG *keyseg, const uchar *a)
569 {
570   for (; (enum ha_base_keytype) keyseg->type != HA_KEYTYPE_END; keyseg++)
571   {
572     const uchar *end;
573     if (keyseg->null_bit)
574     {
575       if (!*a++)
576         return keyseg;
577     }
578     end= a+ keyseg->length;
579 
580     switch ((enum ha_base_keytype) keyseg->type) {
581     case HA_KEYTYPE_TEXT:
582     case HA_KEYTYPE_BINARY:
583     case HA_KEYTYPE_BIT:
584       if (keyseg->flag & HA_SPACE_PACK)
585       {
586         int a_length;
587         get_key_length(a_length, a);
588         a += a_length;
589         break;
590       }
591       else
592         a= end;
593       break;
594     case HA_KEYTYPE_VARTEXT1:
595     case HA_KEYTYPE_VARTEXT2:
596     case HA_KEYTYPE_VARBINARY1:
597     case HA_KEYTYPE_VARBINARY2:
598       {
599         int a_length;
600         get_key_length(a_length, a);
601         a+= a_length;
602         break;
603       }
604     case HA_KEYTYPE_NUM:
605       if (keyseg->flag & HA_SPACE_PACK)
606       {
607         int alength= *a++;
608         end= a+alength;
609       }
610       a= end;
611       break;
612     case HA_KEYTYPE_INT8:
613     case HA_KEYTYPE_SHORT_INT:
614     case HA_KEYTYPE_USHORT_INT:
615     case HA_KEYTYPE_LONG_INT:
616     case HA_KEYTYPE_ULONG_INT:
617     case HA_KEYTYPE_INT24:
618     case HA_KEYTYPE_UINT24:
619 #ifdef HAVE_LONG_LONG
620     case HA_KEYTYPE_LONGLONG:
621     case HA_KEYTYPE_ULONGLONG:
622 #endif
623     case HA_KEYTYPE_FLOAT:
624     case HA_KEYTYPE_DOUBLE:
625       a= end;
626       break;
627     case HA_KEYTYPE_END:                        /* purecov: inspected */
628       /* keep compiler happy */
629       DBUG_ASSERT(0);
630       break;
631     }
632   }
633   return keyseg;
634 }
635 
636