1 /*****************************************************************************
2
3 Copyright (c) 1994, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2020, 2022, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /*******************************************************************//**
21 @file rem/rem0cmp.cc
22 Comparison services for records
23
24 Created 7/1/1994 Heikki Tuuri
25 ************************************************************************/
26
27 #include "rem0cmp.h"
28 #include "rem0rec.h"
29 #include "page0page.h"
30 #include "dict0mem.h"
31 #include "handler0alter.h"
32
33 /* ALPHABETICAL ORDER
34 ==================
35
36 The records are put into alphabetical order in the following
37 way: let F be the first field where two records disagree.
38 If there is a character in some position n where the
39 records disagree, the order is determined by comparison of
40 the characters at position n, possibly after
41 collating transformation. If there is no such character,
42 but the corresponding fields have different lengths, then
43 if the data type of the fields is paddable,
44 shorter field is padded with a padding character. If the
45 data type is not paddable, longer field is considered greater.
Finally, the SQL null is smaller than any other value.
47
48 At the present, the comparison functions return 0 in the case,
49 where two records disagree only in the way that one
50 has more fields than the other. */
51
52 #ifndef DBUG_OFF
53 /** @return whether a data type is compatible with strnncoll() functions */
is_strnncoll_compatible(ulint type)54 static bool is_strnncoll_compatible(ulint type)
55 {
56 switch (type) {
57 case MYSQL_TYPE_BIT:
58 case MYSQL_TYPE_STRING:
59 case MYSQL_TYPE_VAR_STRING:
60 case MYSQL_TYPE_TINY_BLOB:
61 case MYSQL_TYPE_MEDIUM_BLOB:
62 case MYSQL_TYPE_BLOB:
63 case MYSQL_TYPE_LONG_BLOB:
64 case MYSQL_TYPE_VARCHAR:
65 return true;
66 default:
67 return false;
68 }
69 }
70 #endif /* DBUG_OFF */
71
72 /*************************************************************//**
73 Returns TRUE if two columns are equal for comparison purposes.
74 @return TRUE if the columns are considered equal in comparisons */
75 ibool
cmp_cols_are_equal(const dict_col_t * col1,const dict_col_t * col2,ibool check_charsets)76 cmp_cols_are_equal(
77 /*===============*/
78 const dict_col_t* col1, /*!< in: column 1 */
79 const dict_col_t* col2, /*!< in: column 2 */
80 ibool check_charsets)
81 /*!< in: whether to check charsets */
82 {
83 if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
84 && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
85
86 /* Both are non-binary string types: they can be compared if
87 and only if the charset-collation is the same */
88
89 if (check_charsets) {
90 return(dtype_get_charset_coll(col1->prtype)
91 == dtype_get_charset_coll(col2->prtype));
92 } else {
93 return(TRUE);
94 }
95 }
96
97 if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
98 && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
99
100 /* Both are binary string types: they can be compared */
101
102 return(TRUE);
103 }
104
105 if (col1->mtype != col2->mtype) {
106
107 return(FALSE);
108 }
109
110 if (col1->mtype == DATA_INT
111 && (col1->prtype & DATA_UNSIGNED)
112 != (col2->prtype & DATA_UNSIGNED)) {
113
114 /* The storage format of an unsigned integer is different
115 from a signed integer: in a signed integer we OR
116 0x8000... to the value of positive integers. */
117
118 return(FALSE);
119 }
120
121 return(col1->mtype != DATA_INT || col1->len == col2->len);
122 }
123
124 /** Compare two DATA_DECIMAL (MYSQL_TYPE_DECIMAL) fields.
125 TODO: Remove this function. Everything should use MYSQL_TYPE_NEWDECIMAL.
126 @param[in] a data field
127 @param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
128 @param[in] b data field
129 @param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
130 @return positive, 0, negative, if a is greater, equal, less than b,
131 respectively */
132 static ATTRIBUTE_COLD
133 int
cmp_decimal(const byte * a,ulint a_length,const byte * b,ulint b_length)134 cmp_decimal(const byte* a, ulint a_length, const byte* b, ulint b_length)
135 {
136 int swap_flag;
137
138 /* Remove preceding spaces */
139 for (; a_length && *a == ' '; a++, a_length--) { }
140 for (; b_length && *b == ' '; b++, b_length--) { }
141
142 if (*a == '-') {
143 swap_flag = -1;
144
145 if (*b != '-') {
146 return(swap_flag);
147 }
148
149 a++; b++;
150 a_length--;
151 b_length--;
152 } else {
153 swap_flag = 1;
154
155 if (*b == '-') {
156 return(swap_flag);
157 }
158 }
159
160 while (a_length > 0 && (*a == '+' || *a == '0')) {
161 a++; a_length--;
162 }
163
164 while (b_length > 0 && (*b == '+' || *b == '0')) {
165 b++; b_length--;
166 }
167
168 if (a_length != b_length) {
169 if (a_length < b_length) {
170 return(-swap_flag);
171 }
172
173 return(swap_flag);
174 }
175
176 while (a_length > 0 && *a == *b) {
177
178 a++; b++; a_length--;
179 }
180
181 if (a_length == 0) {
182 return(0);
183 }
184
185 if (*a <= *b) {
186 swap_flag = -swap_flag;
187 }
188
189 return(swap_flag);
190 }
191
192 /** Compare two data fields.
193 @param mtype main type
194 @param prtype precise type
195 @param data1 data field
196 @param len1 length of data1 in bytes, or UNIV_SQL_NULL
197 @param data2 data field
198 @param len2 length of data2 in bytes, or UNIV_SQL_NULL
199 @return the comparison result of data1 and data2
200 @retval 0 if data1 is equal to data2
201 @retval negative if data1 is less than data2
202 @retval positive if data1 is greater than data2 */
cmp_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)203 static int cmp_data(ulint mtype, ulint prtype, const byte *data1, ulint len1,
204 const byte *data2, ulint len2)
205 {
206 ut_ad(len1 != UNIV_SQL_DEFAULT);
207 ut_ad(len2 != UNIV_SQL_DEFAULT);
208
209 if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL)
210 {
211 if (len1 == len2)
212 return 0;
213
214 /* We define the SQL null to be the smallest possible value of a field. */
215 return len1 == UNIV_SQL_NULL ? -1 : 1;
216 }
217
218 switch (mtype) {
219 default:
220 ib::fatal() << "Unknown data type number " << mtype;
221 case DATA_DECIMAL:
222 return cmp_decimal(data1, len1, data2, len2);
223 case DATA_DOUBLE:
224 {
225 const double af= mach_double_read(data1), bf= mach_double_read(data2);
226 return af > bf ? 1 : bf > af ? -1 : 0;
227 }
228 case DATA_FLOAT:
229 {
230 const float af= mach_float_read(data1), bf= mach_float_read(data2);
231 return af > bf ? 1 : bf > af ? -1 : 0;
232 }
233 case DATA_FIXBINARY:
234 case DATA_BINARY:
235 if (dtype_get_charset_coll(prtype) != DATA_MYSQL_BINARY_CHARSET_COLL)
236 {
237 if (ulint len= std::min(len1, len2))
238 {
239 if (int cmp= memcmp(data1, data2, len))
240 return cmp;
241 data1+= len;
242 data2+= len;
243 len1-= len;
244 len2-= len;
245 }
246
247 int cmp= 0;
248 if (len1)
249 {
250 const byte *end= &data1[len1];
251 do
252 cmp= static_cast<int>(*data1++ - byte{0x20});
253 while (cmp == 0 && data1 < end);
254 }
255 else if (len2)
256 {
257 const byte *end= &data2[len2];
258 do
259 cmp= static_cast<int>(byte{0x20} - *data2++);
260 while (cmp == 0 && data2 < end);
261 }
262 return cmp;
263 }
264 /* fall through */
265 case DATA_INT:
266 case DATA_SYS_CHILD:
267 case DATA_SYS:
268 break;
269 case DATA_GEOMETRY:
270 ut_ad(prtype & DATA_BINARY_TYPE);
271 if (prtype & DATA_GIS_MBR)
272 {
273 ut_ad(len1 == DATA_MBR_LEN);
274 ut_ad(len2 == DATA_MBR_LEN);
275 return cmp_geometry_field(data1, data2);
276 }
277 break;
278 case DATA_BLOB:
279 if (prtype & DATA_BINARY_TYPE)
280 break;
281 /* fall through */
282 case DATA_VARMYSQL:
283 DBUG_ASSERT(is_strnncoll_compatible(prtype & DATA_MYSQL_TYPE_MASK));
284 if (CHARSET_INFO *cs= get_charset(dtype_get_charset_coll(prtype),
285 MYF(MY_WME)))
286 return cs->coll->strnncollsp(cs, data1, len1, data2, len2);
287 no_collation:
288 ib::fatal() << "Unable to find charset-collation for " << prtype;
289 case DATA_MYSQL:
290 DBUG_ASSERT(is_strnncoll_compatible(prtype & DATA_MYSQL_TYPE_MASK));
291 if (CHARSET_INFO *cs= get_charset(dtype_get_charset_coll(prtype),
292 MYF(MY_WME)))
293 return cs->coll->strnncollsp_nchars(cs, data1, len1, data2, len2,
294 std::max(len1, len2));
295 goto no_collation;
296 case DATA_VARCHAR:
297 case DATA_CHAR:
298 /* latin1_swedish_ci is treated as a special case in InnoDB.
299 Because it is a fixed-length encoding (mbminlen=mbmaxlen=1),
300 non-NULL CHAR(n) values will always occupy n bytes and we
301 can invoke strnncollsp() instead of strnncollsp_nchars(). */
302 return my_charset_latin1.strnncollsp(data1, len1, data2, len2);
303 }
304
305 if (ulint len= std::min(len1, len2))
306 if (int cmp= memcmp(data1, data2, len))
307 return cmp;
308
309 return len1 > len2 ? 1 : len2 > len1 ? -1 : 0;
310 }
311
312 /** Compare two data fields.
313 @param[in] mtype main type
314 @param[in] prtype precise type
315 @param[in] data1 data field
316 @param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
317 @param[in] data2 data field
318 @param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
319 @return the comparison result of data1 and data2
320 @retval 0 if data1 is equal to data2
321 @retval negative if data1 is less than data2
322 @retval positive if data1 is greater than data2 */
323 int
cmp_data_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)324 cmp_data_data(
325 ulint mtype,
326 ulint prtype,
327 const byte* data1,
328 ulint len1,
329 const byte* data2,
330 ulint len2)
331 {
332 return(cmp_data(mtype, prtype, data1, len1, data2, len2));
333 }
334
335 /** Compare a data tuple to a physical record.
336 @param[in] dtuple data tuple
337 @param[in] rec B-tree record
338 @param[in] offsets rec_get_offsets(rec)
339 @param[in] n_cmp number of fields to compare
340 @param[in,out] matched_fields number of completely matched fields
341 @return the comparison result of dtuple and rec
342 @retval 0 if dtuple is equal to rec
343 @retval negative if dtuple is less than rec
344 @retval positive if dtuple is greater than rec */
345 int
cmp_dtuple_rec_with_match_low(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets,ulint n_cmp,ulint * matched_fields)346 cmp_dtuple_rec_with_match_low(
347 const dtuple_t* dtuple,
348 const rec_t* rec,
349 const rec_offs* offsets,
350 ulint n_cmp,
351 ulint* matched_fields)
352 {
353 ulint cur_field; /* current field number */
354 int ret; /* return value */
355
356 ut_ad(dtuple_check_typed(dtuple));
357 ut_ad(rec_offs_validate(rec, NULL, offsets));
358
359 cur_field = *matched_fields;
360
361 ut_ad(n_cmp > 0);
362 ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
363 ut_ad(cur_field <= n_cmp);
364 ut_ad(cur_field <= rec_offs_n_fields(offsets));
365
366 if (cur_field == 0) {
367 ulint rec_info = rec_get_info_bits(rec,
368 rec_offs_comp(offsets));
369 ulint tup_info = dtuple_get_info_bits(dtuple);
370
371 if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
372 ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
373 goto order_resolved;
374 } else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
375 ret = -1;
376 goto order_resolved;
377 }
378 }
379
380 /* Match fields in a loop */
381
382 for (; cur_field < n_cmp; cur_field++) {
383 const byte* rec_b_ptr;
384 const dfield_t* dtuple_field
385 = dtuple_get_nth_field(dtuple, cur_field);
386 const byte* dtuple_b_ptr
387 = static_cast<const byte*>(
388 dfield_get_data(dtuple_field));
389 const dtype_t* type
390 = dfield_get_type(dtuple_field);
391 ulint dtuple_f_len
392 = dfield_get_len(dtuple_field);
393 ulint rec_f_len;
394
395 /* We should never compare against an externally
396 stored field. Only clustered index records can
397 contain externally stored fields, and the first fields
398 (primary key fields) should already differ. */
399 ut_ad(!rec_offs_nth_extern(offsets, cur_field));
400 /* We should never compare against instantly added columns.
401 Columns can only be instantly added to clustered index
402 leaf page records, and the first fields (primary key fields)
403 should already differ. */
404 ut_ad(!rec_offs_nth_default(offsets, cur_field));
405
406 rec_b_ptr = rec_get_nth_field(rec, offsets, cur_field,
407 &rec_f_len);
408
409 ut_ad(!dfield_is_ext(dtuple_field));
410
411 ret = cmp_data(type->mtype, type->prtype,
412 dtuple_b_ptr, dtuple_f_len,
413 rec_b_ptr, rec_f_len);
414 if (ret) {
415 goto order_resolved;
416 }
417 }
418
419 ret = 0; /* If we ran out of fields, dtuple was equal to rec
420 up to the common fields */
421 order_resolved:
422 *matched_fields = cur_field;
423 return(ret);
424 }
425
426 /** Get the pad character code point for a type.
427 @param[in] type
428 @return pad character code point
429 @retval ULINT_UNDEFINED if no padding is specified */
430 UNIV_INLINE
431 ulint
cmp_get_pad_char(const dtype_t * type)432 cmp_get_pad_char(
433 const dtype_t* type)
434 {
435 switch (type->mtype) {
436 case DATA_FIXBINARY:
437 case DATA_BINARY:
438 if (dtype_get_charset_coll(type->prtype)
439 == DATA_MYSQL_BINARY_CHARSET_COLL) {
440 /* Starting from 5.0.18, do not pad
441 VARBINARY or BINARY columns. */
442 return(ULINT_UNDEFINED);
443 }
444 /* Fall through */
445 case DATA_CHAR:
446 case DATA_VARCHAR:
447 case DATA_MYSQL:
448 case DATA_VARMYSQL:
449 /* Space is the padding character for all char and binary
450 strings, and starting from 5.0.3, also for TEXT strings. */
451 return(0x20);
452 case DATA_GEOMETRY:
453 /* DATA_GEOMETRY is binary data, not ASCII-based. */
454 return(ULINT_UNDEFINED);
455 case DATA_BLOB:
456 if (!(type->prtype & DATA_BINARY_TYPE)) {
457 return(0x20);
458 }
459 /* Fall through */
460 default:
461 /* No padding specified */
462 return(ULINT_UNDEFINED);
463 }
464 }
465
/** Compare a data tuple to a physical record, tracking both the number
of completely matched fields and the number of matched bytes within the
first field that does not match completely.
The comparison resumes from the position given by *matched_fields and
*matched_bytes. If rec carries REC_INFO_MIN_REC_FLAG, the function
returns 1 immediately and leaves both output counters untouched.
@param[in]	dtuple		data tuple; must not carry
				REC_INFO_MIN_REC_FLAG
@param[in]	rec		B-tree or R-tree index record
@param[in]	index		index tree
@param[in]	offsets		rec_get_offsets(rec)
@param[in,out]	matched_fields	number of completely matched fields
@param[in,out]	matched_bytes	number of matched bytes in the first
field that is not matched
@return the comparison result of dtuple and rec
@retval 0 if dtuple is equal to rec
@retval negative if dtuple is less than rec
@retval positive if dtuple is greater than rec */
int
cmp_dtuple_rec_with_match_bytes(
	const dtuple_t*		dtuple,
	const rec_t*		rec,
	const dict_index_t*	index,
	const rec_offs*		offsets,
	ulint*			matched_fields,
	ulint*			matched_bytes)
{
	ut_ad(dtuple_check_typed(dtuple));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!(REC_INFO_MIN_REC_FLAG
		& dtuple_get_info_bits(dtuple)));

	/* A record flagged as the minimum record is smaller than any
	tuple; the debug checks document where such a record is expected
	to reside. */
	if (UNIV_UNLIKELY(REC_INFO_MIN_REC_FLAG
			  & rec_get_info_bits(rec, rec_offs_comp(offsets)))) {
		ut_ad(page_rec_is_first(rec, page_align(rec)));
		ut_ad(!page_has_prev(page_align(rec)));
		ut_ad(rec_is_metadata(rec, *index));
		return 1;
	}

	/* Resume from the position reported by the caller. */
	ulint		cur_field	= *matched_fields;
	ulint		cur_bytes	= *matched_bytes;
	ulint		n_cmp		= dtuple_get_n_fields_cmp(dtuple);
	int		ret;

	ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
	ut_ad(cur_field <= n_cmp);
	ut_ad(cur_field + (cur_bytes > 0) <= rec_offs_n_fields(offsets));

	/* Match fields in a loop; stop when a field differs or when we
	run out of fields to compare in dtuple */

	while (cur_field < n_cmp) {
		const dfield_t*	dfield		= dtuple_get_nth_field(
			dtuple, cur_field);
		const dtype_t*	type		= dfield_get_type(dfield);
		ulint		dtuple_f_len	= dfield_get_len(dfield);
		const byte*	dtuple_b_ptr;
		const byte*	rec_b_ptr;
		ulint		rec_f_len;

		dtuple_b_ptr = static_cast<const byte*>(
			dfield_get_data(dfield));

		/* Instantly added and externally stored columns are not
		expected among the compared fields. */
		ut_ad(!rec_offs_nth_default(offsets, cur_field));
		rec_b_ptr = rec_get_nth_field(rec, offsets,
					      cur_field, &rec_f_len);
		ut_ad(!rec_offs_nth_extern(offsets, cur_field));

		/* If no bytes of this field have been matched yet, it may
		be that one or both of the fields are SQL null. */
		if (cur_bytes == 0) {
			if (dtuple_f_len == UNIV_SQL_NULL) {
				if (rec_f_len == UNIV_SQL_NULL) {

					/* NULL equals NULL */
					goto next_field;
				}

				ret = -1;
				goto order_resolved;
			} else if (rec_f_len == UNIV_SQL_NULL) {
				/* We define the SQL null to be the
				smallest possible value of a field
				in the alphabetical order */

				ret = 1;
				goto order_resolved;
			}
		}

		/* Only binary-comparable types are compared byte by byte
		below; every other type is delegated to cmp_data(), which
		reports no matched-byte count. */
		switch (type->mtype) {
		case DATA_FIXBINARY:
		case DATA_BINARY:
		case DATA_INT:
		case DATA_SYS_CHILD:
		case DATA_SYS:
			break;
		case DATA_BLOB:
			if (type->prtype & DATA_BINARY_TYPE) {
				break;
			}
			/* fall through */
		default:
			ret = cmp_data(type->mtype, type->prtype,
				       dtuple_b_ptr, dtuple_f_len,
				       rec_b_ptr, rec_f_len);

			if (!ret) {
				goto next_field;
			}

			cur_bytes = 0;
			goto order_resolved;
		}

		/* Set the pointers at the current byte */

		rec_b_ptr += cur_bytes;
		dtuple_b_ptr += cur_bytes;
		/* Then compare the fields byte by byte. A field that has
		run out of bytes contributes the pad character, if the
		type has one. */

		for (const ulint pad = cmp_get_pad_char(type);;
		     cur_bytes++) {
			ulint	rec_byte = pad;
			ulint	dtuple_byte = pad;

			if (rec_f_len <= cur_bytes) {
				if (dtuple_f_len <= cur_bytes) {

					/* Both fields are exhausted:
					they are equal. */
					goto next_field;
				}

				if (rec_byte == ULINT_UNDEFINED) {
					/* No padding: the longer dtuple
					field is greater. */
					ret = 1;

					goto order_resolved;
				}
			} else {
				rec_byte = *rec_b_ptr++;
			}

			if (dtuple_f_len <= cur_bytes) {
				if (dtuple_byte == ULINT_UNDEFINED) {
					/* No padding: the shorter dtuple
					field is smaller. */
					ret = -1;

					goto order_resolved;
				}
			} else {
				dtuple_byte = *dtuple_b_ptr++;
			}

			if (dtuple_byte < rec_byte) {
				ret = -1;
				goto order_resolved;
			} else if (dtuple_byte > rec_byte) {
				ret = 1;
				goto order_resolved;
			}
		}

next_field:
		cur_field++;
		cur_bytes = 0;
	}

	ut_ad(cur_bytes == 0);

	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
			up to the common fields */
order_resolved:
	*matched_fields = cur_field;
	*matched_bytes = cur_bytes;

	return(ret);
}
636
637 /** Compare a data tuple to a physical record.
638 @see cmp_dtuple_rec_with_match
639 @param[in] dtuple data tuple
640 @param[in] rec B-tree record
641 @param[in] offsets rec_get_offsets(rec); may be NULL
642 for ROW_FORMAT=REDUNDANT
643 @return the comparison result of dtuple and rec
644 @retval 0 if dtuple is equal to rec
645 @retval negative if dtuple is less than rec
646 @retval positive if dtuple is greater than rec */
647 int
cmp_dtuple_rec(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets)648 cmp_dtuple_rec(
649 const dtuple_t* dtuple,
650 const rec_t* rec,
651 const rec_offs* offsets)
652 {
653 ulint matched_fields = 0;
654
655 ut_ad(rec_offs_validate(rec, NULL, offsets));
656 return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
657 &matched_fields));
658 }
659
660 /**************************************************************//**
661 Checks if a dtuple is a prefix of a record. The last field in dtuple
662 is allowed to be a prefix of the corresponding field in the record.
663 @return TRUE if prefix */
664 ibool
cmp_dtuple_is_prefix_of_rec(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets)665 cmp_dtuple_is_prefix_of_rec(
666 /*========================*/
667 const dtuple_t* dtuple, /*!< in: data tuple */
668 const rec_t* rec, /*!< in: physical record */
669 const rec_offs* offsets)/*!< in: array returned by rec_get_offsets() */
670 {
671 ulint n_fields;
672 ulint matched_fields = 0;
673
674 ut_ad(rec_offs_validate(rec, NULL, offsets));
675 n_fields = dtuple_get_n_fields(dtuple);
676
677 if (n_fields > rec_offs_n_fields(offsets)) {
678 ut_ad(0);
679 return(FALSE);
680 }
681
682 cmp_dtuple_rec_with_match(dtuple, rec, offsets, &matched_fields);
683 return(matched_fields == n_fields);
684 }
685
686 /*************************************************************//**
687 Compare two physical record fields.
688 @retval positive if rec1 field is greater than rec2
689 @retval negative if rec1 field is less than rec2
690 @retval 0 if rec1 field equals to rec2 */
691 static MY_ATTRIBUTE((nonnull, warn_unused_result))
692 int
cmp_rec_rec_simple_field(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,ulint n)693 cmp_rec_rec_simple_field(
694 /*=====================*/
695 const rec_t* rec1, /*!< in: physical record */
696 const rec_t* rec2, /*!< in: physical record */
697 const rec_offs* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
698 const rec_offs* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
699 const dict_index_t* index, /*!< in: data dictionary index */
700 ulint n) /*!< in: field to compare */
701 {
702 const byte* rec1_b_ptr;
703 const byte* rec2_b_ptr;
704 ulint rec1_f_len;
705 ulint rec2_f_len;
706 const dict_col_t* col = dict_index_get_nth_col(index, n);
707
708 ut_ad(!rec_offs_nth_extern(offsets1, n));
709 ut_ad(!rec_offs_nth_extern(offsets2, n));
710
711 rec1_b_ptr = rec_get_nth_field(rec1, offsets1, n, &rec1_f_len);
712 rec2_b_ptr = rec_get_nth_field(rec2, offsets2, n, &rec2_f_len);
713
714 return(cmp_data(col->mtype, col->prtype,
715 rec1_b_ptr, rec1_f_len, rec2_b_ptr, rec2_f_len));
716 }
717
718 /** Compare two physical records that contain the same number of columns,
719 none of which are stored externally.
720 @retval positive if rec1 (including non-ordering columns) is greater than rec2
721 @retval negative if rec1 (including non-ordering columns) is less than rec2
722 @retval 0 if rec1 is a duplicate of rec2 */
723 int
cmp_rec_rec_simple(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,struct TABLE * table)724 cmp_rec_rec_simple(
725 /*===============*/
726 const rec_t* rec1, /*!< in: physical record */
727 const rec_t* rec2, /*!< in: physical record */
728 const rec_offs* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
729 const rec_offs* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
730 const dict_index_t* index, /*!< in: data dictionary index */
731 struct TABLE* table) /*!< in: MySQL table, for reporting
732 duplicate key value if applicable,
733 or NULL */
734 {
735 ulint n;
736 ulint n_uniq = dict_index_get_n_unique(index);
737 bool null_eq = false;
738
739 ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
740 ut_ad(rec_offs_n_fields(offsets2) == rec_offs_n_fields(offsets2));
741
742 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
743
744 for (n = 0; n < n_uniq; n++) {
745 int cmp = cmp_rec_rec_simple_field(
746 rec1, rec2, offsets1, offsets2, index, n);
747
748 if (cmp) {
749 return(cmp);
750 }
751
752 /* If the fields are internally equal, they must both
753 be NULL or non-NULL. */
754 ut_ad(rec_offs_nth_sql_null(offsets1, n)
755 == rec_offs_nth_sql_null(offsets2, n));
756
757 if (rec_offs_nth_sql_null(offsets1, n)) {
758 ut_ad(!(dict_index_get_nth_col(index, n)->prtype
759 & DATA_NOT_NULL));
760 null_eq = true;
761 }
762 }
763
764 /* If we ran out of fields, the ordering columns of rec1 were
765 equal to rec2. Issue a duplicate key error if needed. */
766
767 if (!null_eq && table && dict_index_is_unique(index)) {
768 /* Report erroneous row using new version of table. */
769 innobase_rec_to_mysql(table, rec1, index, offsets1);
770 return(0);
771 }
772
773 /* Else, keep comparing so that we have the full internal
774 order. */
775 for (; n < dict_index_get_n_fields(index); n++) {
776 int cmp = cmp_rec_rec_simple_field(
777 rec1, rec2, offsets1, offsets2, index, n);
778
779 if (cmp) {
780 return(cmp);
781 }
782
783 /* If the fields are internally equal, they must both
784 be NULL or non-NULL. */
785 ut_ad(rec_offs_nth_sql_null(offsets1, n)
786 == rec_offs_nth_sql_null(offsets2, n));
787 }
788
789 /* This should never be reached. Internally, an index must
790 never contain duplicate entries. */
791 ut_ad(0);
792 return(0);
793 }
794
795 /** Compare two B-tree or R-tree records.
796 Only the common first fields are compared, and externally stored field
797 are treated as equal.
798 @param[in] rec1 record (possibly not on an index page)
799 @param[in] rec2 B-tree or R-tree record in an index page
800 @param[in] offsets1 rec_get_offsets(rec1, index)
801 @param[in] offsets2 rec_get_offsets(rec2, index)
802 @param[in] nulls_unequal true if this is for index cardinality
803 statistics estimation with
804 innodb_stats_method=nulls_unequal
805 or innodb_stats_method=nulls_ignored
806 @param[out] matched_fields number of completely matched fields
807 within the first field not completely matched
808 @retval 0 if rec1 is equal to rec2
809 @retval negative if rec1 is less than rec2
810 @retval positive if rec1 is greater than rec2 */
811 int
cmp_rec_rec(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,bool nulls_unequal,ulint * matched_fields)812 cmp_rec_rec(
813 const rec_t* rec1,
814 const rec_t* rec2,
815 const rec_offs* offsets1,
816 const rec_offs* offsets2,
817 const dict_index_t* index,
818 bool nulls_unequal,
819 ulint* matched_fields)
820 {
821 ulint rec1_f_len; /* length of current field in rec */
822 const byte* rec1_b_ptr; /* pointer to the current byte
823 in rec field */
824 ulint rec2_f_len; /* length of current field in rec */
825 const byte* rec2_b_ptr; /* pointer to the current byte
826 in rec field */
827 ulint cur_field = 0; /* current field number */
828 int ret = 0; /* return value */
829
830 ut_ad(rec1 != NULL);
831 ut_ad(rec2 != NULL);
832 ut_ad(index != NULL);
833 ut_ad(rec_offs_validate(rec1, index, offsets1));
834 ut_ad(rec_offs_validate(rec2, index, offsets2));
835 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
836 ut_ad(fil_page_index_page_check(page_align(rec2)));
837 ut_ad(!!dict_index_is_spatial(index)
838 == (fil_page_get_type(page_align(rec2)) == FIL_PAGE_RTREE));
839
840 ulint comp = rec_offs_comp(offsets1);
841 ulint n_fields;
842
843 /* Test if rec is the predefined minimum record */
844 if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
845 & REC_INFO_MIN_REC_FLAG)) {
846 ret = UNIV_UNLIKELY(rec_get_info_bits(rec2, comp)
847 & REC_INFO_MIN_REC_FLAG)
848 ? 0 : -1;
849 goto order_resolved;
850 } else if (UNIV_UNLIKELY
851 (rec_get_info_bits(rec2, comp)
852 & REC_INFO_MIN_REC_FLAG)) {
853 ret = 1;
854 goto order_resolved;
855 }
856
857 /* For non-leaf spatial index records, the
858 dict_index_get_n_unique_in_tree() does include the child page
859 number, because spatial index node pointers only contain
860 the MBR (minimum bounding rectangle) and the child page number.
861
862 For B-tree node pointers, the key alone (secondary index
863 columns and PRIMARY KEY columns) must be unique, and there is
864 no need to compare the child page number. */
865 n_fields = std::min(rec_offs_n_fields(offsets1),
866 rec_offs_n_fields(offsets2));
867 n_fields = std::min<ulint>(n_fields,
868 dict_index_get_n_unique_in_tree(index));
869
870 for (; cur_field < n_fields; cur_field++) {
871 ulint mtype;
872 ulint prtype;
873
874 if (UNIV_UNLIKELY(dict_index_is_ibuf(index))) {
875 /* This is for the insert buffer B-tree. */
876 mtype = DATA_BINARY;
877 prtype = 0;
878 } else {
879 const dict_col_t* col = dict_index_get_nth_col(
880 index, cur_field);
881 mtype = col->mtype;
882 prtype = col->prtype;
883
884 if (UNIV_LIKELY(!dict_index_is_spatial(index))) {
885 } else if (cur_field == 0) {
886 ut_ad(DATA_GEOMETRY_MTYPE(mtype));
887 prtype |= DATA_GIS_MBR;
888 } else if (!page_rec_is_leaf(rec2)) {
889 /* Compare the child page number. */
890 ut_ad(cur_field == 1);
891 mtype = DATA_SYS_CHILD;
892 prtype = 0;
893 }
894 }
895
896 /* We should never encounter an externally stored field.
897 Externally stored fields only exist in clustered index
898 leaf page records. These fields should already differ
899 in the primary key columns already, before DB_TRX_ID,
900 DB_ROLL_PTR, and any externally stored columns. */
901 ut_ad(!rec_offs_nth_extern(offsets1, cur_field));
902 ut_ad(!rec_offs_nth_extern(offsets2, cur_field));
903 ut_ad(!rec_offs_nth_default(offsets1, cur_field));
904 ut_ad(!rec_offs_nth_default(offsets2, cur_field));
905
906 rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
907 cur_field, &rec1_f_len);
908 rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
909 cur_field, &rec2_f_len);
910
911 if (nulls_unequal
912 && rec1_f_len == UNIV_SQL_NULL
913 && rec2_f_len == UNIV_SQL_NULL) {
914 ret = -1;
915 goto order_resolved;
916 }
917
918 ret = cmp_data(mtype, prtype,
919 rec1_b_ptr, rec1_f_len,
920 rec2_b_ptr, rec2_f_len);
921 if (ret) {
922 goto order_resolved;
923 }
924 }
925
926 /* If we ran out of fields, rec1 was equal to rec2 up
927 to the common fields */
928 ut_ad(ret == 0);
929 order_resolved:
930 if (matched_fields) {
931 *matched_fields = cur_field;
932 }
933 return ret;
934 }
935
936 #ifdef UNIV_COMPILE_TEST_FUNCS
937
938 #ifdef HAVE_UT_CHRONO_T
939
940 void
test_cmp_data_data(ulint len)941 test_cmp_data_data(ulint len)
942 {
943 int i;
944 static byte zeros[64];
945
946 if (len > sizeof zeros) {
947 len = sizeof zeros;
948 }
949
950 ut_chrono_t ch(__func__);
951
952 for (i = 1000000; i > 0; i--) {
953 i += cmp_data(DATA_INT, 0, zeros, len, zeros, len);
954 }
955 }
956
957 #endif /* HAVE_UT_CHRONO_T */
958
959 #endif /* UNIV_COMPILE_TEST_FUNCS */
960