1 /*****************************************************************************
2
3 Copyright (c) 1994, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /*******************************************************************//**
28 @file rem/rem0cmp.cc
29 Comparison services for records
30
31 Created 7/1/1994 Heikki Tuuri
32 ************************************************************************/
33
34 #include "ha_prototypes.h"
35
36 #include "rem0cmp.h"
37
38 #ifdef UNIV_NONINL
39 #include "rem0cmp.ic"
40 #endif
41
42 #include "handler0alter.h"
43 #include "srv0srv.h"
44
45 #include <gstream.h>
46 #include <spatial.h>
47 #include <gis0geo.h>
48 #include <page0cur.h>
49 #include <algorithm>
50
/*		ALPHABETICAL ORDER
		==================

The records are put into alphabetical order in the following
way: let F be the first field where two records disagree.
If there is a character in some position n where the
records disagree, the order is determined by comparison of
the characters at position n, possibly after a
collating transformation. If there is no such character,
but the corresponding fields have different lengths, then
if the data type of the fields is paddable, the
shorter field is padded with a padding character. If the
data type is not paddable, the longer field is considered greater.
Finally, the SQL null is smaller than any other value.

At present, the comparison functions return 0 in the case
where two records disagree only in the way that one
has more fields than the other. */
69
70 /** Compare two data fields.
71 @param[in] prtype precise type
72 @param[in] a data field
73 @param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
74 @param[in] b data field
75 @param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
76 @return positive, 0, negative, if a is greater, equal, less than b,
77 respectively */
78 UNIV_INLINE
79 int
innobase_mysql_cmp(ulint prtype,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)80 innobase_mysql_cmp(
81 ulint prtype,
82 const byte* a,
83 unsigned int a_length,
84 const byte* b,
85 unsigned int b_length)
86 {
87 #ifdef UNIV_DEBUG
88 switch (prtype & DATA_MYSQL_TYPE_MASK) {
89 case MYSQL_TYPE_BIT:
90 case MYSQL_TYPE_STRING:
91 case MYSQL_TYPE_VAR_STRING:
92 case MYSQL_TYPE_TINY_BLOB:
93 case MYSQL_TYPE_MEDIUM_BLOB:
94 case MYSQL_TYPE_BLOB:
95 case MYSQL_TYPE_LONG_BLOB:
96 case MYSQL_TYPE_VARCHAR:
97 break;
98 default:
99 ut_error;
100 }
101 #endif /* UNIV_DEBUG */
102
103 uint cs_num = (uint) dtype_get_charset_coll(prtype);
104
105 if (CHARSET_INFO* cs = get_charset(cs_num, MYF(MY_WME))) {
106 return(cs->coll->strnncollsp(
107 cs, a, a_length, b, b_length, 0));
108 }
109
110 ib::fatal() << "Unable to find charset-collation " << cs_num;
111 return(0);
112 }
113
114 /*************************************************************//**
115 Returns TRUE if two columns are equal for comparison purposes.
116 @return TRUE if the columns are considered equal in comparisons */
117 ibool
cmp_cols_are_equal(const dict_col_t * col1,const dict_col_t * col2,ibool check_charsets)118 cmp_cols_are_equal(
119 /*===============*/
120 const dict_col_t* col1, /*!< in: column 1 */
121 const dict_col_t* col2, /*!< in: column 2 */
122 ibool check_charsets)
123 /*!< in: whether to check charsets */
124 {
125 if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
126 && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
127
128 /* Both are non-binary string types: they can be compared if
129 and only if the charset-collation is the same */
130
131 if (check_charsets) {
132 return(dtype_get_charset_coll(col1->prtype)
133 == dtype_get_charset_coll(col2->prtype));
134 } else {
135 return(TRUE);
136 }
137 }
138
139 if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
140 && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
141
142 /* Both are binary string types: they can be compared */
143
144 return(TRUE);
145 }
146
147 if (col1->mtype != col2->mtype) {
148
149 return(FALSE);
150 }
151
152 if (col1->mtype == DATA_INT
153 && (col1->prtype & DATA_UNSIGNED)
154 != (col2->prtype & DATA_UNSIGNED)) {
155
156 /* The storage format of an unsigned integer is different
157 from a signed integer: in a signed integer we OR
158 0x8000... to the value of positive integers. */
159
160 return(FALSE);
161 }
162
163 return(col1->mtype != DATA_INT || col1->len == col2->len);
164 }
165
166 /** Compare two DATA_DECIMAL (MYSQL_TYPE_DECIMAL) fields.
167 TODO: Remove this function. Everything should use MYSQL_TYPE_NEWDECIMAL.
168 @param[in] a data field
169 @param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
170 @param[in] b data field
171 @param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
172 @return positive, 0, negative, if a is greater, equal, less than b,
173 respectively */
174 static UNIV_COLD
175 int
cmp_decimal(const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)176 cmp_decimal(
177 const byte* a,
178 unsigned int a_length,
179 const byte* b,
180 unsigned int b_length)
181 {
182 int swap_flag;
183
184 /* Remove preceding spaces */
185 for (; a_length && *a == ' '; a++, a_length--) { }
186 for (; b_length && *b == ' '; b++, b_length--) { }
187
188 if (*a == '-') {
189 swap_flag = -1;
190
191 if (*b != '-') {
192 return(swap_flag);
193 }
194
195 a++; b++;
196 a_length--;
197 b_length--;
198 } else {
199 swap_flag = 1;
200
201 if (*b == '-') {
202 return(swap_flag);
203 }
204 }
205
206 while (a_length > 0 && (*a == '+' || *a == '0')) {
207 a++; a_length--;
208 }
209
210 while (b_length > 0 && (*b == '+' || *b == '0')) {
211 b++; b_length--;
212 }
213
214 if (a_length != b_length) {
215 if (a_length < b_length) {
216 return(-swap_flag);
217 }
218
219 return(swap_flag);
220 }
221
222 while (a_length > 0 && *a == *b) {
223
224 a++; b++; a_length--;
225 }
226
227 if (a_length == 0) {
228 return(0);
229 }
230
231 if (*a <= *b) {
232 swap_flag = -swap_flag;
233 }
234
235 return(swap_flag);
236 }
237
238 /*************************************************************//**
239 Innobase uses this function to compare two geometry data fields
240 @return 1, 0, -1, if a is greater, equal, less than b, respectively */
241 static
242 int
cmp_geometry_field(ulint mtype,ulint prtype,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)243 cmp_geometry_field(
244 /*===============*/
245 ulint mtype, /*!< in: main type */
246 ulint prtype, /*!< in: precise type */
247 const byte* a, /*!< in: data field */
248 unsigned int a_length, /*!< in: data field length,
249 not UNIV_SQL_NULL */
250 const byte* b, /*!< in: data field */
251 unsigned int b_length) /*!< in: data field length,
252 not UNIV_SQL_NULL */
253 {
254 double x1, x2;
255 double y1, y2;
256
257 ut_ad(prtype & DATA_GIS_MBR);
258
259 if (a_length < sizeof(double) || b_length < sizeof(double)) {
260 return(0);
261 }
262
263 /* Try to compare mbr left lower corner (xmin, ymin) */
264 x1 = mach_double_read(a);
265 x2 = mach_double_read(b);
266 y1 = mach_double_read(a + sizeof(double) * SPDIMS);
267 y2 = mach_double_read(b + sizeof(double) * SPDIMS);
268
269 if (x1 > x2) {
270 return(1);
271 } else if (x2 > x1) {
272 return(-1);
273 }
274
275 if (y1 > y2) {
276 return(1);
277 } else if (y2 > y1) {
278 return(-1);
279 }
280
281 /* left lower corner (xmin, ymin) overlaps, now right upper corner */
282 x1 = mach_double_read(a + sizeof(double));
283 x2 = mach_double_read(b + sizeof(double));
284 y1 = mach_double_read(a + sizeof(double) * SPDIMS + sizeof(double));
285 y2 = mach_double_read(b + sizeof(double) * SPDIMS + sizeof(double));
286
287 if (x1 > x2) {
288 return(1);
289 } else if (x2 > x1) {
290 return(-1);
291 }
292
293 if (y1 > y2) {
294 return(1);
295 } else if (y2 > y1) {
296 return(-1);
297 }
298
299 return(0);
300 }
301 /*************************************************************//**
302 Innobase uses this function to compare two gis data fields
303 @return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return
304 1, 0 for rest compare modes, depends on a and b qualifies the
305 relationship (CONTAINT, WITHIN etc.) */
306 static
307 int
cmp_gis_field(page_cur_mode_t mode,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)308 cmp_gis_field(
309 /*============*/
310 page_cur_mode_t mode, /*!< in: compare mode */
311 const byte* a, /*!< in: data field */
312 unsigned int a_length, /*!< in: data field length,
313 not UNIV_SQL_NULL */
314 const byte* b, /*!< in: data field */
315 unsigned int b_length) /*!< in: data field length,
316 not UNIV_SQL_NULL */
317 {
318 if (mode == PAGE_CUR_MBR_EQUAL) {
319 /* TODO: Since the DATA_GEOMETRY is not used in compare
320 function, we could pass it instead of a specific type now */
321 return(cmp_geometry_field(DATA_GEOMETRY, DATA_GIS_MBR,
322 a, a_length, b, b_length));
323 } else {
324 return(rtree_key_cmp(mode, a, a_length, b, b_length));
325 }
326 }
327
328 /** Compare two data fields.
329 @param[in] mtype main type
330 @param[in] prtype precise type
331 @param[in] a data field
332 @param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
333 @param[in] b data field
334 @param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
335 @return positive, 0, negative, if a is greater, equal, less than b,
336 respectively */
337 static
338 int
cmp_whole_field(ulint mtype,ulint prtype,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)339 cmp_whole_field(
340 ulint mtype,
341 ulint prtype,
342 const byte* a,
343 unsigned int a_length,
344 const byte* b,
345 unsigned int b_length)
346 {
347 float f_1;
348 float f_2;
349 double d_1;
350 double d_2;
351
352 switch (mtype) {
353 case DATA_DECIMAL:
354 return(cmp_decimal(a, a_length, b, b_length));
355 case DATA_DOUBLE:
356 d_1 = mach_double_read(a);
357 d_2 = mach_double_read(b);
358
359 if (d_1 > d_2) {
360 return(1);
361 } else if (d_2 > d_1) {
362 return(-1);
363 }
364
365 return(0);
366
367 case DATA_FLOAT:
368 f_1 = mach_float_read(a);
369 f_2 = mach_float_read(b);
370
371 if (f_1 > f_2) {
372 return(1);
373 } else if (f_2 > f_1) {
374 return(-1);
375 }
376
377 return(0);
378 case DATA_VARCHAR:
379 case DATA_CHAR:
380 return(my_charset_latin1.coll->strnncollsp(
381 &my_charset_latin1,
382 a, a_length, b, b_length, 0));
383 case DATA_BLOB:
384 if (prtype & DATA_BINARY_TYPE) {
385 ib::error() << "Comparing a binary BLOB"
386 " using a character set collation!";
387 ut_ad(0);
388 }
389 /* fall through */
390 case DATA_VARMYSQL:
391 case DATA_MYSQL:
392 return(innobase_mysql_cmp(prtype,
393 a, a_length, b, b_length));
394 case DATA_POINT:
395 case DATA_VAR_POINT:
396 case DATA_GEOMETRY:
397 return(cmp_geometry_field(mtype, prtype, a, a_length, b,
398 b_length));
399 default:
400 ib::fatal() << "Unknown data type number " << mtype;
401 }
402
403 return(0);
404 }
405
406 /** Compare two data fields.
407 @param[in] mtype main type
408 @param[in] prtype precise type
409 @param[in] data1 data field
410 @param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
411 @param[in] data2 data field
412 @param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
413 @return the comparison result of data1 and data2
414 @retval 0 if data1 is equal to data2
415 @retval negative if data1 is less than data2
416 @retval positive if data1 is greater than data2 */
417 inline
418 int
cmp_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)419 cmp_data(
420 ulint mtype,
421 ulint prtype,
422 const byte* data1,
423 ulint len1,
424 const byte* data2,
425 ulint len2)
426 {
427 if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL) {
428 if (len1 == len2) {
429 return(0);
430 }
431
432 /* We define the SQL null to be the smallest possible
433 value of a field. */
434 return(len1 == UNIV_SQL_NULL ? -1 : 1);
435 }
436
437 ulint pad;
438
439 switch (mtype) {
440 case DATA_FIXBINARY:
441 case DATA_BINARY:
442 if (dtype_get_charset_coll(prtype)
443 != DATA_MYSQL_BINARY_CHARSET_COLL) {
444 pad = 0x20;
445 break;
446 }
447 /* fall through */
448 case DATA_INT:
449 case DATA_SYS_CHILD:
450 case DATA_SYS:
451 pad = ULINT_UNDEFINED;
452 break;
453 case DATA_POINT:
454 case DATA_VAR_POINT:
455 /* Since DATA_POINT has a fixed length of DATA_POINT_LEN,
456 currently, pad is not needed. Meanwhile, DATA_VAR_POINT acts
457 the same as DATA_GEOMETRY */
458 case DATA_GEOMETRY:
459 ut_ad(prtype & DATA_BINARY_TYPE);
460 pad = ULINT_UNDEFINED;
461 if (prtype & DATA_GIS_MBR) {
462 return(cmp_whole_field(mtype, prtype,
463 data1, (unsigned) len1,
464 data2, (unsigned) len2));
465 }
466 break;
467 case DATA_BLOB:
468 if (prtype & DATA_BINARY_TYPE) {
469 pad = ULINT_UNDEFINED;
470 break;
471 }
472 /* fall through */
473 default:
474 return(cmp_whole_field(mtype, prtype,
475 data1, (unsigned) len1,
476 data2, (unsigned) len2));
477 }
478
479 ulint len;
480 int cmp;
481
482 if (len1 < len2) {
483 len = len1;
484 len2 -= len;
485 len1 = 0;
486 } else {
487 len = len2;
488 len1 -= len;
489 len2 = 0;
490 }
491
492 if (len) {
493 #if defined __i386__ || defined __x86_64__ || defined _M_IX86 || defined _M_X64
494 /* Compare the first bytes with a loop to avoid the call
495 overhead of memcmp(). On x86 and x86-64, the GCC built-in
496 (repz cmpsb) seems to be very slow, so we will be calling the
497 libc version. http://gcc.gnu.org/bugzilla/show_bug.cgi?id=43052
498 tracks the slowness of the GCC built-in memcmp().
499
500 We compare up to the first 4..7 bytes with the loop.
501 The (len & 3) is used for "normalizing" or
502 "quantizing" the len parameter for the memcmp() call,
503 in case the whole prefix is equal. On x86 and x86-64,
504 the GNU libc memcmp() of equal strings is faster with
505 len=4 than with len=3.
506
507 On other architectures than the IA32 or AMD64, there could
508 be a built-in memcmp() that is faster than the loop.
509 We only use the loop where we know that it can improve
510 the performance. */
511 for (ulint i = 4 + (len & 3); i > 0; i--) {
512 cmp = int(*data1++) - int(*data2++);
513 if (cmp) {
514 return(cmp);
515 }
516
517 if (!--len) {
518 break;
519 }
520 }
521
522 if (len) {
523 #endif /* IA32 or AMD64 */
524 cmp = memcmp(data1, data2, len);
525
526 if (cmp) {
527 return(cmp);
528 }
529
530 data1 += len;
531 data2 += len;
532 #if defined __i386__ || defined __x86_64__ || defined _M_IX86 || defined _M_X64
533 }
534 #endif /* IA32 or AMD64 */
535 }
536
537 cmp = (int) (len1 - len2);
538
539 if (!cmp || pad == ULINT_UNDEFINED) {
540 return(cmp);
541 }
542
543 len = 0;
544
545 if (len1) {
546 do {
547 cmp = static_cast<int>(
548 mach_read_from_1(&data1[len++]) - pad);
549 } while (cmp == 0 && len < len1);
550 } else {
551 ut_ad(len2 > 0);
552
553 do {
554 cmp = static_cast<int>(
555 pad - mach_read_from_1(&data2[len++]));
556 } while (cmp == 0 && len < len2);
557 }
558
559 return(cmp);
560 }
561
562 /** Compare a GIS data tuple to a physical record.
563 @param[in] dtuple data tuple
564 @param[in] rec B-tree record
565 @param[in] offsets rec_get_offsets(rec)
566 @param[in] mode compare mode
567 @retval negative if dtuple is less than rec */
568 int
cmp_dtuple_rec_with_gis(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,page_cur_mode_t mode)569 cmp_dtuple_rec_with_gis(
570 /*====================*/
571 const dtuple_t* dtuple, /*!< in: data tuple */
572 const rec_t* rec, /*!< in: physical record which differs from
573 dtuple in some of the common fields, or which
574 has an equal number or more fields than
575 dtuple */
576 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
577 page_cur_mode_t mode) /*!< in: compare mode */
578 {
579 const dfield_t* dtuple_field; /* current field in logical record */
580 ulint dtuple_f_len; /* the length of the current field
581 in the logical record */
582 ulint rec_f_len; /* length of current field in rec */
583 const byte* rec_b_ptr; /* pointer to the current byte in
584 rec field */
585 int ret = 0; /* return value */
586
587 dtuple_field = dtuple_get_nth_field(dtuple, 0);
588 dtuple_f_len = dfield_get_len(dtuple_field);
589
590 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
591 ret = cmp_gis_field(
592 mode, static_cast<const byte*>(dfield_get_data(dtuple_field)),
593 (unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
594
595 return(ret);
596 }
597
598 /** Compare a GIS data tuple to a physical record in rtree non-leaf node.
599 We need to check the page number field, since we don't store pk field in
600 rtree non-leaf node.
601 @param[in] dtuple data tuple
602 @param[in] rec R-tree record
603 @param[in] offsets rec_get_offsets(rec)
604 @retval negative if dtuple is less than rec */
605 int
cmp_dtuple_rec_with_gis_internal(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)606 cmp_dtuple_rec_with_gis_internal(
607 const dtuple_t* dtuple,
608 const rec_t* rec,
609 const ulint* offsets)
610 {
611 const dfield_t* dtuple_field; /* current field in logical record */
612 ulint dtuple_f_len; /* the length of the current field
613 in the logical record */
614 ulint rec_f_len; /* length of current field in rec */
615 const byte* rec_b_ptr; /* pointer to the current byte in
616 rec field */
617 int ret = 0; /* return value */
618
619 dtuple_field = dtuple_get_nth_field(dtuple, 0);
620 dtuple_f_len = dfield_get_len(dtuple_field);
621
622 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
623 ret = cmp_gis_field(
624 PAGE_CUR_WITHIN,
625 static_cast<const byte*>(dfield_get_data(dtuple_field)),
626 (unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
627 if (ret != 0) {
628 return(ret);
629 }
630
631 dtuple_field = dtuple_get_nth_field(dtuple, 1);
632 dtuple_f_len = dfield_get_len(dtuple_field);
633 rec_b_ptr = rec_get_nth_field(rec, offsets, 1, &rec_f_len);
634
635 return(cmp_data(dtuple_field->type.mtype,
636 dtuple_field->type.prtype,
637 static_cast<const byte*>(dtuple_field->data),
638 dtuple_f_len,
639 rec_b_ptr,
640 rec_f_len));
641 }
642
643 /** Compare two data fields.
644 @param[in] mtype main type
645 @param[in] prtype precise type
646 @param[in] data1 data field
647 @param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
648 @param[in] data2 data field
649 @param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
650 @return the comparison result of data1 and data2
651 @retval 0 if data1 is equal to data2
652 @retval negative if data1 is less than data2
653 @retval positive if data1 is greater than data2 */
654 int
cmp_data_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)655 cmp_data_data(
656 ulint mtype,
657 ulint prtype,
658 const byte* data1,
659 ulint len1,
660 const byte* data2,
661 ulint len2)
662 {
663 return(cmp_data(mtype, prtype, data1, len1, data2, len2));
664 }
665
666 /** Compare a data tuple to a physical record.
667 @param[in] dtuple data tuple
668 @param[in] rec B-tree record
669 @param[in] offsets rec_get_offsets(rec)
670 @param[in] n_cmp number of fields to compare
671 @param[in,out] matched_fields number of completely matched fields
672 @return the comparison result of dtuple and rec
673 @retval 0 if dtuple is equal to rec
674 @retval negative if dtuple is less than rec
675 @retval positive if dtuple is greater than rec */
676 int
cmp_dtuple_rec_with_match_low(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,ulint n_cmp,ulint * matched_fields)677 cmp_dtuple_rec_with_match_low(
678 const dtuple_t* dtuple,
679 const rec_t* rec,
680 const ulint* offsets,
681 ulint n_cmp,
682 ulint* matched_fields)
683 {
684 ulint cur_field; /* current field number */
685 int ret; /* return value */
686
687 ut_ad(dtuple_check_typed(dtuple));
688 ut_ad(rec_offs_validate(rec, NULL, offsets));
689
690 cur_field = *matched_fields;
691
692 ut_ad(n_cmp > 0);
693 ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
694 ut_ad(cur_field <= n_cmp);
695 ut_ad(cur_field <= rec_offs_n_fields(offsets));
696
697 if (cur_field == 0) {
698 ulint rec_info = rec_get_info_bits(rec,
699 rec_offs_comp(offsets));
700 ulint tup_info = dtuple_get_info_bits(dtuple);
701
702 if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
703 ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
704 goto order_resolved;
705 } else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
706 ret = -1;
707 goto order_resolved;
708 }
709 }
710
711 /* Match fields in a loop */
712
713 for (; cur_field < n_cmp; cur_field++) {
714 const byte* rec_b_ptr;
715 const dfield_t* dtuple_field
716 = dtuple_get_nth_field(dtuple, cur_field);
717 const byte* dtuple_b_ptr
718 = static_cast<const byte*>(
719 dfield_get_data(dtuple_field));
720 const dtype_t* type
721 = dfield_get_type(dtuple_field);
722 ulint dtuple_f_len
723 = dfield_get_len(dtuple_field);
724 ulint rec_f_len;
725
726 /* We should never compare against an externally
727 stored field. Only clustered index records can
728 contain externally stored fields, and the first fields
729 (primary key fields) should already differ. */
730 ut_ad(!rec_offs_nth_extern(offsets, cur_field));
731
732 rec_b_ptr = rec_get_nth_field(rec, offsets, cur_field,
733 &rec_f_len);
734
735 ut_ad(!dfield_is_ext(dtuple_field));
736
737 ret = cmp_data(type->mtype, type->prtype,
738 dtuple_b_ptr, dtuple_f_len,
739 rec_b_ptr, rec_f_len);
740 if (ret) {
741 goto order_resolved;
742 }
743 }
744
745 ret = 0; /* If we ran out of fields, dtuple was equal to rec
746 up to the common fields */
747 order_resolved:
748 *matched_fields = cur_field;
749 return(ret);
750 }
751
752 /** Get the pad character code point for a type.
753 @param[in] type
754 @return pad character code point
755 @retval ULINT_UNDEFINED if no padding is specified */
756 UNIV_INLINE
757 ulint
cmp_get_pad_char(const dtype_t * type)758 cmp_get_pad_char(
759 const dtype_t* type)
760 {
761 switch (type->mtype) {
762 case DATA_FIXBINARY:
763 case DATA_BINARY:
764 if (dtype_get_charset_coll(type->prtype)
765 == DATA_MYSQL_BINARY_CHARSET_COLL) {
766 /* Starting from 5.0.18, do not pad
767 VARBINARY or BINARY columns. */
768 return(ULINT_UNDEFINED);
769 }
770 /* Fall through */
771 case DATA_CHAR:
772 case DATA_VARCHAR:
773 case DATA_MYSQL:
774 case DATA_VARMYSQL:
775 /* Space is the padding character for all char and binary
776 strings, and starting from 5.0.3, also for TEXT strings. */
777 return(0x20);
778 case DATA_GEOMETRY:
779 /* DATA_GEOMETRY is binary data, not ASCII-based. */
780 return(ULINT_UNDEFINED);
781 case DATA_BLOB:
782 if (!(type->prtype & DATA_BINARY_TYPE)) {
783 return(0x20);
784 }
785 /* Fall through */
786 default:
787 /* No padding specified */
788 return(ULINT_UNDEFINED);
789 }
790 }
791
792 /** Compare a data tuple to a physical record.
793 @param[in] dtuple data tuple
794 @param[in] rec B-tree or R-tree index record
795 @param[in] index index tree
796 @param[in] offsets rec_get_offsets(rec)
797 @param[in,out] matched_fields number of completely matched fields
798 @param[in,out] matched_bytes number of matched bytes in the first
799 field that is not matched
800 @return the comparison result of dtuple and rec
801 @retval 0 if dtuple is equal to rec
802 @retval negative if dtuple is less than rec
803 @retval positive if dtuple is greater than rec */
804 int
cmp_dtuple_rec_with_match_bytes(const dtuple_t * dtuple,const rec_t * rec,const dict_index_t * index,const ulint * offsets,ulint * matched_fields,ulint * matched_bytes)805 cmp_dtuple_rec_with_match_bytes(
806 const dtuple_t* dtuple,
807 const rec_t* rec,
808 const dict_index_t* index,
809 const ulint* offsets,
810 ulint* matched_fields,
811 ulint* matched_bytes)
812 {
813 ulint n_cmp = dtuple_get_n_fields_cmp(dtuple);
814 ulint cur_field; /* current field number */
815 ulint cur_bytes;
816 int ret; /* return value */
817
818 ut_ad(dtuple_check_typed(dtuple));
819 ut_ad(rec_offs_validate(rec, index, offsets));
820 //ut_ad(page_is_leaf(page_align(rec)));
821 ut_ad(!(REC_INFO_MIN_REC_FLAG
822 & dtuple_get_info_bits(dtuple)));
823 ut_ad(!(REC_INFO_MIN_REC_FLAG
824 & rec_get_info_bits(rec, rec_offs_comp(offsets))));
825
826 cur_field = *matched_fields;
827 cur_bytes = *matched_bytes;
828
829 ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
830 ut_ad(cur_field <= n_cmp);
831 ut_ad(cur_field + (cur_bytes > 0) <= rec_offs_n_fields(offsets));
832
833 /* Match fields in a loop; stop if we run out of fields in dtuple
834 or find an externally stored field */
835
836 while (cur_field < n_cmp) {
837 const dfield_t* dfield = dtuple_get_nth_field(
838 dtuple, cur_field);
839 const dtype_t* type = dfield_get_type(dfield);
840 ulint dtuple_f_len = dfield_get_len(dfield);
841 const byte* dtuple_b_ptr;
842 const byte* rec_b_ptr;
843 ulint rec_f_len;
844
845 dtuple_b_ptr = static_cast<const byte*>(
846 dfield_get_data(dfield));
847 rec_b_ptr = rec_get_nth_field(rec, offsets,
848 cur_field, &rec_f_len);
849 ut_ad(!rec_offs_nth_extern(offsets, cur_field));
850
851 /* If we have matched yet 0 bytes, it may be that one or
852 both the fields are SQL null, or the record or dtuple may be
853 the predefined minimum record. */
854 if (cur_bytes == 0) {
855 if (dtuple_f_len == UNIV_SQL_NULL) {
856 if (rec_f_len == UNIV_SQL_NULL) {
857
858 goto next_field;
859 }
860
861 ret = -1;
862 goto order_resolved;
863 } else if (rec_f_len == UNIV_SQL_NULL) {
864 /* We define the SQL null to be the
865 smallest possible value of a field
866 in the alphabetical order */
867
868 ret = 1;
869 goto order_resolved;
870 }
871 }
872
873 switch (type->mtype) {
874 case DATA_FIXBINARY:
875 case DATA_BINARY:
876 case DATA_INT:
877 case DATA_SYS_CHILD:
878 case DATA_SYS:
879 break;
880 case DATA_BLOB:
881 if (type->prtype & DATA_BINARY_TYPE) {
882 break;
883 }
884 /* fall through */
885 default:
886 ret = cmp_data(type->mtype, type->prtype,
887 dtuple_b_ptr, dtuple_f_len,
888 rec_b_ptr, rec_f_len);
889
890 if (!ret) {
891 goto next_field;
892 }
893
894 cur_bytes = 0;
895 goto order_resolved;
896 }
897
898 /* Set the pointers at the current byte */
899
900 rec_b_ptr += cur_bytes;
901 dtuple_b_ptr += cur_bytes;
902 /* Compare then the fields */
903
904 for (const ulint pad = cmp_get_pad_char(type);;
905 cur_bytes++) {
906 ulint rec_byte = pad;
907 ulint dtuple_byte = pad;
908
909 if (rec_f_len <= cur_bytes) {
910 if (dtuple_f_len <= cur_bytes) {
911
912 goto next_field;
913 }
914
915 if (rec_byte == ULINT_UNDEFINED) {
916 ret = 1;
917
918 goto order_resolved;
919 }
920 } else {
921 rec_byte = *rec_b_ptr++;
922 }
923
924 if (dtuple_f_len <= cur_bytes) {
925 if (dtuple_byte == ULINT_UNDEFINED) {
926 ret = -1;
927
928 goto order_resolved;
929 }
930 } else {
931 dtuple_byte = *dtuple_b_ptr++;
932 }
933
934 if (dtuple_byte < rec_byte) {
935 ret = -1;
936 goto order_resolved;
937 } else if (dtuple_byte > rec_byte) {
938 ret = 1;
939 goto order_resolved;
940 }
941 }
942
943 next_field:
944 cur_field++;
945 cur_bytes = 0;
946 }
947
948 ut_ad(cur_bytes == 0);
949
950 ret = 0; /* If we ran out of fields, dtuple was equal to rec
951 up to the common fields */
952 order_resolved:
953 *matched_fields = cur_field;
954 *matched_bytes = cur_bytes;
955
956 return(ret);
957 }
958
959 /** Compare a data tuple to a physical record.
960 @see cmp_dtuple_rec_with_match
961 @param[in] dtuple data tuple
962 @param[in] rec B-tree record
963 @param[in] offsets rec_get_offsets(rec); may be NULL
964 for ROW_FORMAT=REDUNDANT
965 @return the comparison result of dtuple and rec
966 @retval 0 if dtuple is equal to rec
967 @retval negative if dtuple is less than rec
968 @retval positive if dtuple is greater than rec */
969 int
cmp_dtuple_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)970 cmp_dtuple_rec(
971 const dtuple_t* dtuple,
972 const rec_t* rec,
973 const ulint* offsets)
974 {
975 ulint matched_fields = 0;
976
977 ut_ad(rec_offs_validate(rec, NULL, offsets));
978 return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
979 &matched_fields));
980 }
981
982 /**************************************************************//**
983 Checks if a dtuple is a prefix of a record. The last field in dtuple
984 is allowed to be a prefix of the corresponding field in the record.
985 @return TRUE if prefix */
986 ibool
cmp_dtuple_is_prefix_of_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)987 cmp_dtuple_is_prefix_of_rec(
988 /*========================*/
989 const dtuple_t* dtuple, /*!< in: data tuple */
990 const rec_t* rec, /*!< in: physical record */
991 const ulint* offsets)/*!< in: array returned by rec_get_offsets() */
992 {
993 ulint n_fields;
994 ulint matched_fields = 0;
995
996 ut_ad(rec_offs_validate(rec, NULL, offsets));
997 n_fields = dtuple_get_n_fields(dtuple);
998
999 if (n_fields > rec_offs_n_fields(offsets)) {
1000 ut_ad(0);
1001 return(FALSE);
1002 }
1003
1004 cmp_dtuple_rec_with_match(dtuple, rec, offsets, &matched_fields);
1005 return(matched_fields == n_fields);
1006 }
1007
1008 /*************************************************************//**
1009 Compare two physical record fields.
1010 @retval positive if rec1 field is greater than rec2
1011 @retval negative if rec1 field is less than rec2
1012 @retval 0 if rec1 field equals to rec2 */
1013 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1014 int
cmp_rec_rec_simple_field(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,ulint n)1015 cmp_rec_rec_simple_field(
1016 /*=====================*/
1017 const rec_t* rec1, /*!< in: physical record */
1018 const rec_t* rec2, /*!< in: physical record */
1019 const ulint* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
1020 const ulint* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
1021 const dict_index_t* index, /*!< in: data dictionary index */
1022 ulint n) /*!< in: field to compare */
1023 {
1024 const byte* rec1_b_ptr;
1025 const byte* rec2_b_ptr;
1026 ulint rec1_f_len;
1027 ulint rec2_f_len;
1028 const dict_col_t* col = dict_index_get_nth_col(index, n);
1029
1030 ut_ad(!rec_offs_nth_extern(offsets1, n));
1031 ut_ad(!rec_offs_nth_extern(offsets2, n));
1032
1033 rec1_b_ptr = rec_get_nth_field(rec1, offsets1, n, &rec1_f_len);
1034 rec2_b_ptr = rec_get_nth_field(rec2, offsets2, n, &rec2_f_len);
1035
1036 return(cmp_data(col->mtype, col->prtype,
1037 rec1_b_ptr, rec1_f_len, rec2_b_ptr, rec2_f_len));
1038 }
1039
1040 /** Compare two physical records that contain the same number of columns,
1041 none of which are stored externally.
1042 @retval positive if rec1 (including non-ordering columns) is greater than rec2
1043 @retval negative if rec1 (including non-ordering columns) is less than rec2
1044 @retval 0 if rec1 is a duplicate of rec2 */
1045 int
cmp_rec_rec_simple(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,struct TABLE * table)1046 cmp_rec_rec_simple(
1047 /*===============*/
1048 const rec_t* rec1, /*!< in: physical record */
1049 const rec_t* rec2, /*!< in: physical record */
1050 const ulint* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
1051 const ulint* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
1052 const dict_index_t* index, /*!< in: data dictionary index */
1053 struct TABLE* table) /*!< in: MySQL table, for reporting
1054 duplicate key value if applicable,
1055 or NULL */
1056 {
1057 ulint n;
1058 ulint n_uniq = dict_index_get_n_unique(index);
1059 bool null_eq = false;
1060
1061 ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
1062 ut_ad(rec_offs_n_fields(offsets2) == rec_offs_n_fields(offsets2));
1063
1064 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1065
1066 for (n = 0; n < n_uniq; n++) {
1067 int cmp = cmp_rec_rec_simple_field(
1068 rec1, rec2, offsets1, offsets2, index, n);
1069
1070 if (cmp) {
1071 return(cmp);
1072 }
1073
1074 /* If the fields are internally equal, they must both
1075 be NULL or non-NULL. */
1076 ut_ad(rec_offs_nth_sql_null(offsets1, n)
1077 == rec_offs_nth_sql_null(offsets2, n));
1078
1079 if (rec_offs_nth_sql_null(offsets1, n)) {
1080 ut_ad(!(dict_index_get_nth_col(index, n)->prtype
1081 & DATA_NOT_NULL));
1082 null_eq = true;
1083 }
1084 }
1085
1086 /* If we ran out of fields, the ordering columns of rec1 were
1087 equal to rec2. Issue a duplicate key error if needed. */
1088
1089 if (!null_eq && table && dict_index_is_unique(index)) {
1090 /* Report erroneous row using new version of table. */
1091 innobase_rec_to_mysql(table, rec1, index, offsets1);
1092 return(0);
1093 }
1094
1095 /* Else, keep comparing so that we have the full internal
1096 order. */
1097 for (; n < dict_index_get_n_fields(index); n++) {
1098 int cmp = cmp_rec_rec_simple_field(
1099 rec1, rec2, offsets1, offsets2, index, n);
1100
1101 if (cmp) {
1102 return(cmp);
1103 }
1104
1105 /* If the fields are internally equal, they must both
1106 be NULL or non-NULL. */
1107 ut_ad(rec_offs_nth_sql_null(offsets1, n)
1108 == rec_offs_nth_sql_null(offsets2, n));
1109 }
1110
1111 /* This should never be reached. Internally, an index must
1112 never contain duplicate entries. */
1113 ut_ad(0);
1114 return(0);
1115 }
1116
1117 /** Compare two B-tree records.
1118 @param[in] rec1 B-tree record
1119 @param[in] rec2 B-tree record
1120 @param[in] offsets1 rec_get_offsets(rec1, index)
1121 @param[in] offsets2 rec_get_offsets(rec2, index)
1122 @param[in] index B-tree index
1123 @param[in] spatial_index_non_leaf true if record present in spatial index non leaf
1124 @param[in] nulls_unequal true if this is for index cardinality
1125 statistics estimation, and innodb_stats_method=nulls_unequal
1126 or innodb_stats_method=nulls_ignored
1127 @param[out] matched_fields number of completely matched fields
1128 within the first field not completely matched
1129 @return the comparison result
1130 @retval 0 if rec1 is equal to rec2
1131 @retval negative if rec1 is less than rec2
1132 @retval positive if rec2 is greater than rec2 */
1133 int
cmp_rec_rec_with_match(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,bool spatial_index_non_leaf,bool nulls_unequal,ulint * matched_fields)1134 cmp_rec_rec_with_match(
1135 const rec_t* rec1,
1136 const rec_t* rec2,
1137 const ulint* offsets1,
1138 const ulint* offsets2,
1139 const dict_index_t* index,
1140 bool spatial_index_non_leaf,
1141 bool nulls_unequal,
1142 ulint* matched_fields)
1143 {
1144 ulint rec1_n_fields; /* the number of fields in rec */
1145 ulint rec1_f_len; /* length of current field in rec */
1146 const byte* rec1_b_ptr; /* pointer to the current byte
1147 in rec field */
1148 ulint rec2_n_fields; /* the number of fields in rec */
1149 ulint rec2_f_len; /* length of current field in rec */
1150 const byte* rec2_b_ptr; /* pointer to the current byte
1151 in rec field */
1152 ulint cur_field = 0; /* current field number */
1153 int ret = 0; /* return value */
1154 ulint comp;
1155
1156 ut_ad(rec1 != NULL);
1157 ut_ad(rec2 != NULL);
1158 ut_ad(index != NULL);
1159 ut_ad(rec_offs_validate(rec1, index, offsets1));
1160 ut_ad(rec_offs_validate(rec2, index, offsets2));
1161 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1162
1163 comp = rec_offs_comp(offsets1);
1164 rec1_n_fields = rec_offs_n_fields(offsets1);
1165 rec2_n_fields = rec_offs_n_fields(offsets2);
1166
1167 /* Test if rec is the predefined minimum record */
1168 if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
1169 & REC_INFO_MIN_REC_FLAG)) {
1170 /* There should only be one such record. */
1171 ut_ad(!(rec_get_info_bits(rec2, comp)
1172 & REC_INFO_MIN_REC_FLAG));
1173 ret = -1;
1174 goto order_resolved;
1175 } else if (UNIV_UNLIKELY
1176 (rec_get_info_bits(rec2, comp)
1177 & REC_INFO_MIN_REC_FLAG)) {
1178 ret = 1;
1179 goto order_resolved;
1180 }
1181
1182 /* Match fields in a loop */
1183
1184 for (; cur_field < rec1_n_fields && cur_field < rec2_n_fields;
1185 cur_field++) {
1186
1187 ulint mtype;
1188 ulint prtype;
1189
1190 /* If this is node-ptr records then avoid comparing node-ptr
1191 field. Only key field needs to be compared. */
1192 if (cur_field == dict_index_get_n_unique_in_tree(index)) {
1193 break;
1194 }
1195
1196 if (dict_index_is_ibuf(index)) {
1197 /* This is for the insert buffer B-tree. */
1198 mtype = DATA_BINARY;
1199 prtype = 0;
1200 } else {
1201 /* When the page is non-leaf spatial index page
1202 we should not depend upon the dictionary information because the
1203 page doesnt hold any primary key information. The non leaf node
1204 of a spatial index has only two fields 1)MBR 2)page number of
1205 child node. */
1206
1207 if ((cur_field == 1) && spatial_index_non_leaf) {
1208 ut_ad(dict_index_is_spatial(index));
1209 mtype = DATA_SYS_CHILD;
1210 prtype = 0;
1211 } else {
1212 const dict_col_t* col;
1213 col= dict_index_get_nth_col(index, cur_field);
1214 mtype = col->mtype;
1215 prtype = col->prtype;
1216 }
1217
1218 /* If the index is spatial index, we mark the
1219 prtype of the first field as MBR field. */
1220 if (cur_field == 0 && dict_index_is_spatial(index)) {
1221 ut_ad(DATA_GEOMETRY_MTYPE(mtype));
1222 prtype |= DATA_GIS_MBR;
1223 }
1224 }
1225
1226 /* We should never encounter an externally stored field.
1227 Externally stored fields only exist in clustered index
1228 leaf page records. These fields should already differ
1229 in the primary key columns already, before DB_TRX_ID,
1230 DB_ROLL_PTR, and any externally stored columns. */
1231 ut_ad(!rec_offs_nth_extern(offsets1, cur_field));
1232 ut_ad(!rec_offs_nth_extern(offsets2, cur_field));
1233
1234 rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
1235 cur_field, &rec1_f_len);
1236 rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
1237 cur_field, &rec2_f_len);
1238
1239 if (nulls_unequal
1240 && rec1_f_len == UNIV_SQL_NULL
1241 && rec2_f_len == UNIV_SQL_NULL) {
1242 ret = -1;
1243 goto order_resolved;
1244 }
1245
1246 ret = cmp_data(mtype, prtype,
1247 rec1_b_ptr, rec1_f_len,
1248 rec2_b_ptr, rec2_f_len);
1249 if (ret) {
1250 goto order_resolved;
1251 }
1252 }
1253
1254 /* If we ran out of fields, rec1 was equal to rec2 up
1255 to the common fields */
1256 ut_ad(ret == 0);
1257 order_resolved:
1258 *matched_fields = cur_field;
1259 return(ret);
1260 }
1261
1262 #ifdef UNIV_COMPILE_TEST_FUNCS
1263
1264 #ifdef HAVE_UT_CHRONO_T
1265
1266 void
test_cmp_data_data(ulint len)1267 test_cmp_data_data(ulint len)
1268 {
1269 int i;
1270 static byte zeros[64];
1271
1272 if (len > sizeof zeros) {
1273 len = sizeof zeros;
1274 }
1275
1276 ut_chrono_t ch(__func__);
1277
1278 for (i = 1000000; i > 0; i--) {
1279 i += cmp_data(DATA_INT, 0, zeros, len, zeros, len);
1280 }
1281 }
1282
1283 #endif /* HAVE_UT_CHRONO_T */
1284
1285 #endif /* UNIV_COMPILE_TEST_FUNCS */
1286