1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2020, 2022, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /*******************************************************************//**
21 @file rem/rem0cmp.cc
22 Comparison services for records
23 
24 Created 7/1/1994 Heikki Tuuri
25 ************************************************************************/
26 
27 #include "rem0cmp.h"
28 #include "rem0rec.h"
29 #include "page0page.h"
30 #include "dict0mem.h"
31 #include "handler0alter.h"
32 
33 /*		ALPHABETICAL ORDER
34 		==================
35 
36 The records are put into alphabetical order in the following
37 way: let F be the first field where two records disagree.
38 If there is a character in some position n where the
39 records disagree, the order is determined by comparison of
40 the characters at position n, possibly after
41 collating transformation. If there is no such character,
42 but the corresponding fields have different lengths, then
43 if the data type of the fields is paddable,
44 shorter field is padded with a padding character. If the
45 data type is not paddable, longer field is considered greater.
Finally, the SQL null is smaller than any other value.
47 
48 At the present, the comparison functions return 0 in the case,
49 where two records disagree only in the way that one
50 has more fields than the other. */
51 
52 #ifndef DBUG_OFF
53 /** @return whether a data type is compatible with strnncoll() functions */
is_strnncoll_compatible(ulint type)54 static bool is_strnncoll_compatible(ulint type)
55 {
56   switch (type) {
57   case MYSQL_TYPE_BIT:
58   case MYSQL_TYPE_STRING:
59   case MYSQL_TYPE_VAR_STRING:
60   case MYSQL_TYPE_TINY_BLOB:
61   case MYSQL_TYPE_MEDIUM_BLOB:
62   case MYSQL_TYPE_BLOB:
63   case MYSQL_TYPE_LONG_BLOB:
64   case MYSQL_TYPE_VARCHAR:
65     return true;
66   default:
67     return false;
68   }
69 }
70 #endif /* DBUG_OFF */
71 
72 /*************************************************************//**
73 Returns TRUE if two columns are equal for comparison purposes.
74 @return TRUE if the columns are considered equal in comparisons */
75 ibool
cmp_cols_are_equal(const dict_col_t * col1,const dict_col_t * col2,ibool check_charsets)76 cmp_cols_are_equal(
77 /*===============*/
78 	const dict_col_t*	col1,	/*!< in: column 1 */
79 	const dict_col_t*	col2,	/*!< in: column 2 */
80 	ibool			check_charsets)
81 					/*!< in: whether to check charsets */
82 {
83 	if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
84 	    && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
85 
86 		/* Both are non-binary string types: they can be compared if
87 		and only if the charset-collation is the same */
88 
89 		if (check_charsets) {
90 			return(dtype_get_charset_coll(col1->prtype)
91 			       == dtype_get_charset_coll(col2->prtype));
92 		} else {
93 			return(TRUE);
94 		}
95 	}
96 
97 	if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
98 	    && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
99 
100 		/* Both are binary string types: they can be compared */
101 
102 		return(TRUE);
103 	}
104 
105 	if (col1->mtype != col2->mtype) {
106 
107 		return(FALSE);
108 	}
109 
110 	if (col1->mtype == DATA_INT
111 	    && (col1->prtype & DATA_UNSIGNED)
112 	    != (col2->prtype & DATA_UNSIGNED)) {
113 
114 		/* The storage format of an unsigned integer is different
115 		from a signed integer: in a signed integer we OR
116 		0x8000... to the value of positive integers. */
117 
118 		return(FALSE);
119 	}
120 
121 	return(col1->mtype != DATA_INT || col1->len == col2->len);
122 }
123 
124 /** Compare two DATA_DECIMAL (MYSQL_TYPE_DECIMAL) fields.
125 TODO: Remove this function. Everything should use MYSQL_TYPE_NEWDECIMAL.
126 @param[in] a data field
127 @param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
128 @param[in] b data field
129 @param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
130 @return positive, 0, negative, if a is greater, equal, less than b,
131 respectively */
132 static ATTRIBUTE_COLD
133 int
cmp_decimal(const byte * a,ulint a_length,const byte * b,ulint b_length)134 cmp_decimal(const byte*	a, ulint a_length, const byte* b, ulint b_length)
135 {
136 	int	swap_flag;
137 
138 	/* Remove preceding spaces */
139 	for (; a_length && *a == ' '; a++, a_length--) { }
140 	for (; b_length && *b == ' '; b++, b_length--) { }
141 
142 	if (*a == '-') {
143 		swap_flag = -1;
144 
145 		if (*b != '-') {
146 			return(swap_flag);
147 		}
148 
149 		a++; b++;
150 		a_length--;
151 		b_length--;
152 	} else {
153 		swap_flag = 1;
154 
155 		if (*b == '-') {
156 			return(swap_flag);
157 		}
158 	}
159 
160 	while (a_length > 0 && (*a == '+' || *a == '0')) {
161 		a++; a_length--;
162 	}
163 
164 	while (b_length > 0 && (*b == '+' || *b == '0')) {
165 		b++; b_length--;
166 	}
167 
168 	if (a_length != b_length) {
169 		if (a_length < b_length) {
170 			return(-swap_flag);
171 		}
172 
173 		return(swap_flag);
174 	}
175 
176 	while (a_length > 0 && *a == *b) {
177 
178 		a++; b++; a_length--;
179 	}
180 
181 	if (a_length == 0) {
182 		return(0);
183 	}
184 
185 	if (*a <= *b) {
186 		swap_flag = -swap_flag;
187 	}
188 
189 	return(swap_flag);
190 }
191 
192 /** Compare two data fields.
193 @param mtype  main type
194 @param prtype precise type
195 @param data1  data field
196 @param len1   length of data1 in bytes, or UNIV_SQL_NULL
197 @param data2  data field
198 @param len2   length of data2 in bytes, or UNIV_SQL_NULL
199 @return the comparison result of data1 and data2
200 @retval 0 if data1 is equal to data2
201 @retval negative if data1 is less than data2
202 @retval positive if data1 is greater than data2 */
cmp_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)203 static int cmp_data(ulint mtype, ulint prtype, const byte *data1, ulint len1,
204                     const byte *data2, ulint len2)
205 {
206   ut_ad(len1 != UNIV_SQL_DEFAULT);
207   ut_ad(len2 != UNIV_SQL_DEFAULT);
208 
209   if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL)
210   {
211     if (len1 == len2)
212       return 0;
213 
214     /* We define the SQL null to be the smallest possible value of a field. */
215     return len1 == UNIV_SQL_NULL ? -1 : 1;
216   }
217 
218   switch (mtype) {
219   default:
220     ib::fatal() << "Unknown data type number " << mtype;
221   case DATA_DECIMAL:
222     return cmp_decimal(data1, len1, data2, len2);
223   case DATA_DOUBLE:
224     {
225       const double af= mach_double_read(data1), bf= mach_double_read(data2);
226       return af > bf ? 1 : bf > af ? -1 : 0;
227     }
228   case DATA_FLOAT:
229     {
230       const float af= mach_float_read(data1), bf= mach_float_read(data2);
231       return af > bf ? 1 : bf > af ? -1 : 0;
232     }
233   case DATA_FIXBINARY:
234   case DATA_BINARY:
235     if (dtype_get_charset_coll(prtype) != DATA_MYSQL_BINARY_CHARSET_COLL)
236     {
237       if (ulint len= std::min(len1, len2))
238       {
239         if (int cmp= memcmp(data1, data2, len))
240           return cmp;
241         data1+= len;
242         data2+= len;
243         len1-= len;
244         len2-= len;
245       }
246 
247       int cmp= 0;
248       if (len1)
249       {
250         const byte *end= &data1[len1];
251         do
252           cmp= static_cast<int>(*data1++ - byte{0x20});
253         while (cmp == 0 && data1 < end);
254       }
255       else if (len2)
256       {
257         const byte *end= &data2[len2];
258         do
259           cmp= static_cast<int>(byte{0x20} - *data2++);
260         while (cmp == 0 && data2 < end);
261       }
262       return cmp;
263     }
264     /* fall through */
265   case DATA_INT:
266   case DATA_SYS_CHILD:
267   case DATA_SYS:
268     break;
269   case DATA_GEOMETRY:
270     ut_ad(prtype & DATA_BINARY_TYPE);
271     if (prtype & DATA_GIS_MBR)
272     {
273       ut_ad(len1 == DATA_MBR_LEN);
274       ut_ad(len2 == DATA_MBR_LEN);
275       return cmp_geometry_field(data1, data2);
276     }
277     break;
278   case DATA_BLOB:
279     if (prtype & DATA_BINARY_TYPE)
280       break;
281     /* fall through */
282   case DATA_VARMYSQL:
283     DBUG_ASSERT(is_strnncoll_compatible(prtype & DATA_MYSQL_TYPE_MASK));
284     if (CHARSET_INFO *cs= get_charset(dtype_get_charset_coll(prtype),
285                                       MYF(MY_WME)))
286       return cs->coll->strnncollsp(cs, data1, len1, data2, len2);
287   no_collation:
288     ib::fatal() << "Unable to find charset-collation for " << prtype;
289   case DATA_MYSQL:
290     DBUG_ASSERT(is_strnncoll_compatible(prtype & DATA_MYSQL_TYPE_MASK));
291     if (CHARSET_INFO *cs= get_charset(dtype_get_charset_coll(prtype),
292                                       MYF(MY_WME)))
293       return cs->coll->strnncollsp_nchars(cs, data1, len1, data2, len2,
294                                           std::max(len1, len2));
295     goto no_collation;
296   case DATA_VARCHAR:
297   case DATA_CHAR:
298     /* latin1_swedish_ci is treated as a special case in InnoDB.
299     Because it is a fixed-length encoding (mbminlen=mbmaxlen=1),
300     non-NULL CHAR(n) values will always occupy n bytes and we
301     can invoke strnncollsp() instead of strnncollsp_nchars(). */
302     return my_charset_latin1.strnncollsp(data1, len1, data2, len2);
303   }
304 
305   if (ulint len= std::min(len1, len2))
306     if (int cmp= memcmp(data1, data2, len))
307       return cmp;
308 
309   return len1 > len2 ? 1 : len2 > len1 ? -1 : 0;
310 }
311 
312 /** Compare two data fields.
313 @param[in] mtype main type
314 @param[in] prtype precise type
315 @param[in] data1 data field
316 @param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
317 @param[in] data2 data field
318 @param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
319 @return the comparison result of data1 and data2
320 @retval 0 if data1 is equal to data2
321 @retval negative if data1 is less than data2
322 @retval positive if data1 is greater than data2 */
323 int
cmp_data_data(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)324 cmp_data_data(
325 	ulint		mtype,
326 	ulint		prtype,
327 	const byte*	data1,
328 	ulint		len1,
329 	const byte*	data2,
330 	ulint		len2)
331 {
332 	return(cmp_data(mtype, prtype, data1, len1, data2, len2));
333 }
334 
335 /** Compare a data tuple to a physical record.
336 @param[in] dtuple data tuple
337 @param[in] rec B-tree record
338 @param[in] offsets rec_get_offsets(rec)
339 @param[in] n_cmp number of fields to compare
340 @param[in,out] matched_fields number of completely matched fields
341 @return the comparison result of dtuple and rec
342 @retval 0 if dtuple is equal to rec
343 @retval negative if dtuple is less than rec
344 @retval positive if dtuple is greater than rec */
345 int
cmp_dtuple_rec_with_match_low(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets,ulint n_cmp,ulint * matched_fields)346 cmp_dtuple_rec_with_match_low(
347 	const dtuple_t*	dtuple,
348 	const rec_t*	rec,
349 	const rec_offs*	offsets,
350 	ulint		n_cmp,
351 	ulint*		matched_fields)
352 {
353 	ulint		cur_field;	/* current field number */
354 	int		ret;		/* return value */
355 
356 	ut_ad(dtuple_check_typed(dtuple));
357 	ut_ad(rec_offs_validate(rec, NULL, offsets));
358 
359 	cur_field = *matched_fields;
360 
361 	ut_ad(n_cmp > 0);
362 	ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
363 	ut_ad(cur_field <= n_cmp);
364 	ut_ad(cur_field <= rec_offs_n_fields(offsets));
365 
366 	if (cur_field == 0) {
367 		ulint	rec_info = rec_get_info_bits(rec,
368 						     rec_offs_comp(offsets));
369 		ulint	tup_info = dtuple_get_info_bits(dtuple);
370 
371 		if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
372 			ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
373 			goto order_resolved;
374 		} else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
375 			ret = -1;
376 			goto order_resolved;
377 		}
378 	}
379 
380 	/* Match fields in a loop */
381 
382 	for (; cur_field < n_cmp; cur_field++) {
383 		const byte*	rec_b_ptr;
384 		const dfield_t*	dtuple_field
385 			= dtuple_get_nth_field(dtuple, cur_field);
386 		const byte*	dtuple_b_ptr
387 			= static_cast<const byte*>(
388 				dfield_get_data(dtuple_field));
389 		const dtype_t*	type
390 			= dfield_get_type(dtuple_field);
391 		ulint		dtuple_f_len
392 			= dfield_get_len(dtuple_field);
393 		ulint		rec_f_len;
394 
395 		/* We should never compare against an externally
396 		stored field.  Only clustered index records can
397 		contain externally stored fields, and the first fields
398 		(primary key fields) should already differ. */
399 		ut_ad(!rec_offs_nth_extern(offsets, cur_field));
400 		/* We should never compare against instantly added columns.
401 		Columns can only be instantly added to clustered index
402 		leaf page records, and the first fields (primary key fields)
403 		should already differ. */
404 		ut_ad(!rec_offs_nth_default(offsets, cur_field));
405 
406 		rec_b_ptr = rec_get_nth_field(rec, offsets, cur_field,
407 					      &rec_f_len);
408 
409 		ut_ad(!dfield_is_ext(dtuple_field));
410 
411 		ret = cmp_data(type->mtype, type->prtype,
412 			       dtuple_b_ptr, dtuple_f_len,
413 			       rec_b_ptr, rec_f_len);
414 		if (ret) {
415 			goto order_resolved;
416 		}
417 	}
418 
419 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
420 			up to the common fields */
421 order_resolved:
422 	*matched_fields = cur_field;
423 	return(ret);
424 }
425 
426 /** Get the pad character code point for a type.
427 @param[in]	type
428 @return		pad character code point
429 @retval		ULINT_UNDEFINED if no padding is specified */
430 UNIV_INLINE
431 ulint
cmp_get_pad_char(const dtype_t * type)432 cmp_get_pad_char(
433 	const dtype_t*	type)
434 {
435 	switch (type->mtype) {
436 	case DATA_FIXBINARY:
437 	case DATA_BINARY:
438 		if (dtype_get_charset_coll(type->prtype)
439 		    == DATA_MYSQL_BINARY_CHARSET_COLL) {
440 			/* Starting from 5.0.18, do not pad
441 			VARBINARY or BINARY columns. */
442 			return(ULINT_UNDEFINED);
443 		}
444 		/* Fall through */
445 	case DATA_CHAR:
446 	case DATA_VARCHAR:
447 	case DATA_MYSQL:
448 	case DATA_VARMYSQL:
449 		/* Space is the padding character for all char and binary
450 		strings, and starting from 5.0.3, also for TEXT strings. */
451 		return(0x20);
452 	case DATA_GEOMETRY:
453                 /* DATA_GEOMETRY is binary data, not ASCII-based. */
454 	        return(ULINT_UNDEFINED);
455 	case DATA_BLOB:
456 		if (!(type->prtype & DATA_BINARY_TYPE)) {
457 			return(0x20);
458 		}
459 		/* Fall through */
460 	default:
461 		/* No padding specified */
462 		return(ULINT_UNDEFINED);
463 	}
464 }
465 
466 /** Compare a data tuple to a physical record.
467 @param[in]	dtuple		data tuple
468 @param[in]	rec		B-tree or R-tree index record
469 @param[in]	index		index tree
470 @param[in]	offsets		rec_get_offsets(rec)
471 @param[in,out]	matched_fields	number of completely matched fields
472 @param[in,out]	matched_bytes	number of matched bytes in the first
473 field that is not matched
474 @return the comparison result of dtuple and rec
475 @retval 0 if dtuple is equal to rec
476 @retval negative if dtuple is less than rec
477 @retval positive if dtuple is greater than rec */
478 int
cmp_dtuple_rec_with_match_bytes(const dtuple_t * dtuple,const rec_t * rec,const dict_index_t * index,const rec_offs * offsets,ulint * matched_fields,ulint * matched_bytes)479 cmp_dtuple_rec_with_match_bytes(
480 	const dtuple_t*		dtuple,
481 	const rec_t*		rec,
482 	const dict_index_t*	index,
483 	const rec_offs*		offsets,
484 	ulint*			matched_fields,
485 	ulint*			matched_bytes)
486 {
487 	ut_ad(dtuple_check_typed(dtuple));
488 	ut_ad(rec_offs_validate(rec, index, offsets));
489 	ut_ad(!(REC_INFO_MIN_REC_FLAG
490 		& dtuple_get_info_bits(dtuple)));
491 
492 	if (UNIV_UNLIKELY(REC_INFO_MIN_REC_FLAG
493 			  & rec_get_info_bits(rec, rec_offs_comp(offsets)))) {
494 		ut_ad(page_rec_is_first(rec, page_align(rec)));
495 		ut_ad(!page_has_prev(page_align(rec)));
496 		ut_ad(rec_is_metadata(rec, *index));
497 		return 1;
498 	}
499 
500 	ulint cur_field = *matched_fields;
501 	ulint cur_bytes = *matched_bytes;
502 	ulint n_cmp = dtuple_get_n_fields_cmp(dtuple);
503 	int ret;
504 
505 	ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
506 	ut_ad(cur_field <= n_cmp);
507 	ut_ad(cur_field + (cur_bytes > 0) <= rec_offs_n_fields(offsets));
508 
509 	/* Match fields in a loop; stop if we run out of fields in dtuple
510 	or find an externally stored field */
511 
512 	while (cur_field < n_cmp) {
513 		const dfield_t*	dfield		= dtuple_get_nth_field(
514 			dtuple, cur_field);
515 		const dtype_t*	type		= dfield_get_type(dfield);
516 		ulint		dtuple_f_len	= dfield_get_len(dfield);
517 		const byte*	dtuple_b_ptr;
518 		const byte*	rec_b_ptr;
519 		ulint		rec_f_len;
520 
521 		dtuple_b_ptr = static_cast<const byte*>(
522 			dfield_get_data(dfield));
523 
524 		ut_ad(!rec_offs_nth_default(offsets, cur_field));
525 		rec_b_ptr = rec_get_nth_field(rec, offsets,
526 					      cur_field, &rec_f_len);
527 		ut_ad(!rec_offs_nth_extern(offsets, cur_field));
528 
529 		/* If we have matched yet 0 bytes, it may be that one or
530 		both the fields are SQL null, or the record or dtuple may be
531 		the predefined minimum record. */
532 		if (cur_bytes == 0) {
533 			if (dtuple_f_len == UNIV_SQL_NULL) {
534 				if (rec_f_len == UNIV_SQL_NULL) {
535 
536 					goto next_field;
537 				}
538 
539 				ret = -1;
540 				goto order_resolved;
541 			} else if (rec_f_len == UNIV_SQL_NULL) {
542 				/* We define the SQL null to be the
543 				smallest possible value of a field
544 				in the alphabetical order */
545 
546 				ret = 1;
547 				goto order_resolved;
548 			}
549 		}
550 
551 		switch (type->mtype) {
552 		case DATA_FIXBINARY:
553 		case DATA_BINARY:
554 		case DATA_INT:
555 		case DATA_SYS_CHILD:
556 		case DATA_SYS:
557 			break;
558 		case DATA_BLOB:
559 			if (type->prtype & DATA_BINARY_TYPE) {
560 				break;
561 			}
562 			/* fall through */
563 		default:
564 			ret = cmp_data(type->mtype, type->prtype,
565 				       dtuple_b_ptr, dtuple_f_len,
566 				       rec_b_ptr, rec_f_len);
567 
568 			if (!ret) {
569 				goto next_field;
570 			}
571 
572 			cur_bytes = 0;
573 			goto order_resolved;
574 		}
575 
576 		/* Set the pointers at the current byte */
577 
578 		rec_b_ptr += cur_bytes;
579 		dtuple_b_ptr += cur_bytes;
580 		/* Compare then the fields */
581 
582 		for (const ulint pad = cmp_get_pad_char(type);;
583 		     cur_bytes++) {
584 			ulint	rec_byte = pad;
585 			ulint	dtuple_byte = pad;
586 
587 			if (rec_f_len <= cur_bytes) {
588 				if (dtuple_f_len <= cur_bytes) {
589 
590 					goto next_field;
591 				}
592 
593 				if (rec_byte == ULINT_UNDEFINED) {
594 					ret = 1;
595 
596 					goto order_resolved;
597 				}
598 			} else {
599 				rec_byte = *rec_b_ptr++;
600 			}
601 
602 			if (dtuple_f_len <= cur_bytes) {
603 				if (dtuple_byte == ULINT_UNDEFINED) {
604 					ret = -1;
605 
606 					goto order_resolved;
607 				}
608 			} else {
609 				dtuple_byte = *dtuple_b_ptr++;
610 			}
611 
612 			if (dtuple_byte < rec_byte) {
613 				ret = -1;
614 				goto order_resolved;
615 			} else if (dtuple_byte > rec_byte) {
616 				ret = 1;
617 				goto order_resolved;
618 			}
619 		}
620 
621 next_field:
622 		cur_field++;
623 		cur_bytes = 0;
624 	}
625 
626 	ut_ad(cur_bytes == 0);
627 
628 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
629 			up to the common fields */
630 order_resolved:
631 	*matched_fields = cur_field;
632 	*matched_bytes = cur_bytes;
633 
634 	return(ret);
635 }
636 
637 /** Compare a data tuple to a physical record.
638 @see cmp_dtuple_rec_with_match
639 @param[in] dtuple data tuple
640 @param[in] rec B-tree record
641 @param[in] offsets rec_get_offsets(rec); may be NULL
642 for ROW_FORMAT=REDUNDANT
643 @return the comparison result of dtuple and rec
644 @retval 0 if dtuple is equal to rec
645 @retval negative if dtuple is less than rec
646 @retval positive if dtuple is greater than rec */
647 int
cmp_dtuple_rec(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets)648 cmp_dtuple_rec(
649 	const dtuple_t*	dtuple,
650 	const rec_t*	rec,
651 	const rec_offs*	offsets)
652 {
653 	ulint	matched_fields	= 0;
654 
655 	ut_ad(rec_offs_validate(rec, NULL, offsets));
656 	return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
657 					 &matched_fields));
658 }
659 
660 /**************************************************************//**
661 Checks if a dtuple is a prefix of a record. The last field in dtuple
662 is allowed to be a prefix of the corresponding field in the record.
663 @return TRUE if prefix */
664 ibool
cmp_dtuple_is_prefix_of_rec(const dtuple_t * dtuple,const rec_t * rec,const rec_offs * offsets)665 cmp_dtuple_is_prefix_of_rec(
666 /*========================*/
667 	const dtuple_t*	dtuple,	/*!< in: data tuple */
668 	const rec_t*	rec,	/*!< in: physical record */
669 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
670 {
671 	ulint	n_fields;
672 	ulint	matched_fields	= 0;
673 
674 	ut_ad(rec_offs_validate(rec, NULL, offsets));
675 	n_fields = dtuple_get_n_fields(dtuple);
676 
677 	if (n_fields > rec_offs_n_fields(offsets)) {
678 		ut_ad(0);
679 		return(FALSE);
680 	}
681 
682 	cmp_dtuple_rec_with_match(dtuple, rec, offsets, &matched_fields);
683 	return(matched_fields == n_fields);
684 }
685 
686 /*************************************************************//**
687 Compare two physical record fields.
688 @retval positive if rec1 field is greater than rec2
689 @retval negative if rec1 field is less than rec2
690 @retval 0 if rec1 field equals to rec2 */
691 static MY_ATTRIBUTE((nonnull, warn_unused_result))
692 int
cmp_rec_rec_simple_field(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,ulint n)693 cmp_rec_rec_simple_field(
694 /*=====================*/
695 	const rec_t*		rec1,	/*!< in: physical record */
696 	const rec_t*		rec2,	/*!< in: physical record */
697 	const rec_offs*		offsets1,/*!< in: rec_get_offsets(rec1, ...) */
698 	const rec_offs*		offsets2,/*!< in: rec_get_offsets(rec2, ...) */
699 	const dict_index_t*	index,	/*!< in: data dictionary index */
700 	ulint			n)	/*!< in: field to compare */
701 {
702 	const byte*	rec1_b_ptr;
703 	const byte*	rec2_b_ptr;
704 	ulint		rec1_f_len;
705 	ulint		rec2_f_len;
706 	const dict_col_t*	col	= dict_index_get_nth_col(index, n);
707 
708 	ut_ad(!rec_offs_nth_extern(offsets1, n));
709 	ut_ad(!rec_offs_nth_extern(offsets2, n));
710 
711 	rec1_b_ptr = rec_get_nth_field(rec1, offsets1, n, &rec1_f_len);
712 	rec2_b_ptr = rec_get_nth_field(rec2, offsets2, n, &rec2_f_len);
713 
714 	return(cmp_data(col->mtype, col->prtype,
715 			rec1_b_ptr, rec1_f_len, rec2_b_ptr, rec2_f_len));
716 }
717 
718 /** Compare two physical records that contain the same number of columns,
719 none of which are stored externally.
720 @retval positive if rec1 (including non-ordering columns) is greater than rec2
721 @retval negative if rec1 (including non-ordering columns) is less than rec2
722 @retval 0 if rec1 is a duplicate of rec2 */
723 int
cmp_rec_rec_simple(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,struct TABLE * table)724 cmp_rec_rec_simple(
725 /*===============*/
726 	const rec_t*		rec1,	/*!< in: physical record */
727 	const rec_t*		rec2,	/*!< in: physical record */
728 	const rec_offs*		offsets1,/*!< in: rec_get_offsets(rec1, ...) */
729 	const rec_offs*		offsets2,/*!< in: rec_get_offsets(rec2, ...) */
730 	const dict_index_t*	index,	/*!< in: data dictionary index */
731 	struct TABLE*		table)	/*!< in: MySQL table, for reporting
732 					duplicate key value if applicable,
733 					or NULL */
734 {
735 	ulint		n;
736 	ulint		n_uniq	= dict_index_get_n_unique(index);
737 	bool		null_eq	= false;
738 
739 	ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
740 	ut_ad(rec_offs_n_fields(offsets2) == rec_offs_n_fields(offsets2));
741 
742 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
743 
744 	for (n = 0; n < n_uniq; n++) {
745 		int cmp = cmp_rec_rec_simple_field(
746 			rec1, rec2, offsets1, offsets2, index, n);
747 
748 		if (cmp) {
749 			return(cmp);
750 		}
751 
752 		/* If the fields are internally equal, they must both
753 		be NULL or non-NULL. */
754 		ut_ad(rec_offs_nth_sql_null(offsets1, n)
755 		      == rec_offs_nth_sql_null(offsets2, n));
756 
757 		if (rec_offs_nth_sql_null(offsets1, n)) {
758 			ut_ad(!(dict_index_get_nth_col(index, n)->prtype
759 				& DATA_NOT_NULL));
760 			null_eq = true;
761 		}
762 	}
763 
764 	/* If we ran out of fields, the ordering columns of rec1 were
765 	equal to rec2. Issue a duplicate key error if needed. */
766 
767 	if (!null_eq && table && dict_index_is_unique(index)) {
768 		/* Report erroneous row using new version of table. */
769 		innobase_rec_to_mysql(table, rec1, index, offsets1);
770 		return(0);
771 	}
772 
773 	/* Else, keep comparing so that we have the full internal
774 	order. */
775 	for (; n < dict_index_get_n_fields(index); n++) {
776 		int cmp = cmp_rec_rec_simple_field(
777 			rec1, rec2, offsets1, offsets2, index, n);
778 
779 		if (cmp) {
780 			return(cmp);
781 		}
782 
783 		/* If the fields are internally equal, they must both
784 		be NULL or non-NULL. */
785 		ut_ad(rec_offs_nth_sql_null(offsets1, n)
786 		      == rec_offs_nth_sql_null(offsets2, n));
787 	}
788 
789 	/* This should never be reached. Internally, an index must
790 	never contain duplicate entries. */
791 	ut_ad(0);
792 	return(0);
793 }
794 
795 /** Compare two B-tree or R-tree records.
796 Only the common first fields are compared, and externally stored field
797 are treated as equal.
798 @param[in]	rec1		record (possibly not on an index page)
799 @param[in]	rec2		B-tree or R-tree record in an index page
800 @param[in]	offsets1	rec_get_offsets(rec1, index)
801 @param[in]	offsets2	rec_get_offsets(rec2, index)
802 @param[in]	nulls_unequal	true if this is for index cardinality
803 				statistics estimation with
804 				innodb_stats_method=nulls_unequal
805 				or innodb_stats_method=nulls_ignored
806 @param[out]	matched_fields	number of completely matched fields
807 				within the first field not completely matched
808 @retval 0 if rec1 is equal to rec2
809 @retval negative if rec1 is less than rec2
810 @retval positive if rec1 is greater than rec2 */
811 int
cmp_rec_rec(const rec_t * rec1,const rec_t * rec2,const rec_offs * offsets1,const rec_offs * offsets2,const dict_index_t * index,bool nulls_unequal,ulint * matched_fields)812 cmp_rec_rec(
813 	const rec_t*		rec1,
814 	const rec_t*		rec2,
815 	const rec_offs*		offsets1,
816 	const rec_offs*		offsets2,
817 	const dict_index_t*	index,
818 	bool			nulls_unequal,
819 	ulint*			matched_fields)
820 {
821 	ulint		rec1_f_len;	/* length of current field in rec */
822 	const byte*	rec1_b_ptr;	/* pointer to the current byte
823 					in rec field */
824 	ulint		rec2_f_len;	/* length of current field in rec */
825 	const byte*	rec2_b_ptr;	/* pointer to the current byte
826 					in rec field */
827 	ulint		cur_field = 0;	/* current field number */
828 	int		ret = 0;	/* return value */
829 
830 	ut_ad(rec1 != NULL);
831 	ut_ad(rec2 != NULL);
832 	ut_ad(index != NULL);
833 	ut_ad(rec_offs_validate(rec1, index, offsets1));
834 	ut_ad(rec_offs_validate(rec2, index, offsets2));
835 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
836 	ut_ad(fil_page_index_page_check(page_align(rec2)));
837 	ut_ad(!!dict_index_is_spatial(index)
838 	      == (fil_page_get_type(page_align(rec2)) == FIL_PAGE_RTREE));
839 
840 	ulint comp = rec_offs_comp(offsets1);
841 	ulint n_fields;
842 
843 	/* Test if rec is the predefined minimum record */
844 	if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
845 			  & REC_INFO_MIN_REC_FLAG)) {
846 		ret = UNIV_UNLIKELY(rec_get_info_bits(rec2, comp)
847 				    & REC_INFO_MIN_REC_FLAG)
848 			? 0 : -1;
849 		goto order_resolved;
850 	} else if (UNIV_UNLIKELY
851 		   (rec_get_info_bits(rec2, comp)
852 		    & REC_INFO_MIN_REC_FLAG)) {
853 		ret = 1;
854 		goto order_resolved;
855 	}
856 
857 	/* For non-leaf spatial index records, the
858 	dict_index_get_n_unique_in_tree() does include the child page
859 	number, because spatial index node pointers only contain
860 	the MBR (minimum bounding rectangle) and the child page number.
861 
862 	For B-tree node pointers, the key alone (secondary index
863 	columns and PRIMARY KEY columns) must be unique, and there is
864 	no need to compare the child page number. */
865 	n_fields = std::min(rec_offs_n_fields(offsets1),
866 			    rec_offs_n_fields(offsets2));
867 	n_fields = std::min<ulint>(n_fields,
868 				   dict_index_get_n_unique_in_tree(index));
869 
870 	for (; cur_field < n_fields; cur_field++) {
871 		ulint	mtype;
872 		ulint	prtype;
873 
874 		if (UNIV_UNLIKELY(dict_index_is_ibuf(index))) {
875 			/* This is for the insert buffer B-tree. */
876 			mtype = DATA_BINARY;
877 			prtype = 0;
878 		} else {
879 			const dict_col_t* col = dict_index_get_nth_col(
880 				index, cur_field);
881 			mtype = col->mtype;
882 			prtype = col->prtype;
883 
884 			if (UNIV_LIKELY(!dict_index_is_spatial(index))) {
885 			} else if (cur_field == 0) {
886 				ut_ad(DATA_GEOMETRY_MTYPE(mtype));
887 				prtype |= DATA_GIS_MBR;
888 			} else if (!page_rec_is_leaf(rec2)) {
889 				/* Compare the child page number. */
890 				ut_ad(cur_field == 1);
891 				mtype = DATA_SYS_CHILD;
892 				prtype = 0;
893 			}
894 		}
895 
896 		/* We should never encounter an externally stored field.
897 		Externally stored fields only exist in clustered index
898 		leaf page records. These fields should already differ
899 		in the primary key columns already, before DB_TRX_ID,
900 		DB_ROLL_PTR, and any externally stored columns. */
901 		ut_ad(!rec_offs_nth_extern(offsets1, cur_field));
902 		ut_ad(!rec_offs_nth_extern(offsets2, cur_field));
903 		ut_ad(!rec_offs_nth_default(offsets1, cur_field));
904 		ut_ad(!rec_offs_nth_default(offsets2, cur_field));
905 
906 		rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
907 					       cur_field, &rec1_f_len);
908 		rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
909 					       cur_field, &rec2_f_len);
910 
911 		if (nulls_unequal
912 		    && rec1_f_len == UNIV_SQL_NULL
913 		    && rec2_f_len == UNIV_SQL_NULL) {
914 			ret = -1;
915 			goto order_resolved;
916 		}
917 
918 		ret = cmp_data(mtype, prtype,
919 			       rec1_b_ptr, rec1_f_len,
920 			       rec2_b_ptr, rec2_f_len);
921 		if (ret) {
922 			goto order_resolved;
923 		}
924 	}
925 
926 	/* If we ran out of fields, rec1 was equal to rec2 up
927 	to the common fields */
928 	ut_ad(ret == 0);
929 order_resolved:
930 	if (matched_fields) {
931 		*matched_fields = cur_field;
932 	}
933 	return ret;
934 }
935 
936 #ifdef UNIV_COMPILE_TEST_FUNCS
937 
938 #ifdef HAVE_UT_CHRONO_T
939 
940 void
test_cmp_data_data(ulint len)941 test_cmp_data_data(ulint len)
942 {
943 	int		i;
944 	static byte	zeros[64];
945 
946 	if (len > sizeof zeros) {
947 		len = sizeof zeros;
948 	}
949 
950 	ut_chrono_t	ch(__func__);
951 
952 	for (i = 1000000; i > 0; i--) {
953 		i += cmp_data(DATA_INT, 0, zeros, len, zeros, len);
954 	}
955 }
956 
957 #endif /* HAVE_UT_CHRONO_T */
958 
959 #endif /* UNIV_COMPILE_TEST_FUNCS */
960