1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /*******************************************************************//**
28 @file rem/rem0cmp.cc
29 Comparison services for records
30 
31 Created 7/1/1994 Heikki Tuuri
32 ************************************************************************/
33 
34 #include "rem0cmp.h"
35 
36 #ifdef UNIV_NONINL
37 #include "rem0cmp.ic"
38 #endif
39 
40 #include "ha_prototypes.h"
41 #include "handler0alter.h"
42 #include "srv0srv.h"
43 
44 /*		ALPHABETICAL ORDER
45 		==================
46 
47 The records are put into alphabetical order in the following
48 way: let F be the first field where two records disagree.
49 If there is a character in some position n where the
50 records disagree, the order is determined by comparison of
51 the characters at position n, possibly after
52 collating transformation. If there is no such character,
53 but the corresponding fields have different lengths, then
54 if the data type of the fields is paddable,
55 shorter field is padded with a padding character. If the
56 data type is not paddable, longer field is considered greater.
57 Finally, the SQL null is bigger than any other value.
58 
59 At the present, the comparison functions return 0 in the case,
60 where two records disagree only in the way that one
61 has more fields than the other. */
62 
63 #ifdef UNIV_DEBUG
64 /*************************************************************//**
65 Used in debug checking of cmp_dtuple_... .
66 This function is used to compare a data tuple to a physical record. If
67 dtuple has n fields then rec must have either m >= n fields, or it must
68 differ from dtuple in some of the m fields rec has.
69 @return 1, 0, -1, if dtuple is greater, equal, less than rec,
70 respectively, when only the common first fields are compared */
71 static
72 int
73 cmp_debug_dtuple_rec_with_match(
74 /*============================*/
75 	const dtuple_t*	dtuple,	/*!< in: data tuple */
76 	const rec_t*	rec,	/*!< in: physical record which differs from
77 				dtuple in some of the common fields, or which
78 				has an equal number or more fields than
79 				dtuple */
80 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
81 	ulint		n_cmp,	/*!< in: number of fields to compare */
82 	ulint*		matched_fields)/*!< in/out: number of already
83 				completely  matched fields; when function
84 				returns, contains the value for current
85 				comparison */
86 	MY_ATTRIBUTE((nonnull, warn_unused_result));
87 #endif /* UNIV_DEBUG */
88 /*************************************************************//**
89 This function is used to compare two data fields for which the data type
90 is such that we must use MySQL code to compare them. The prototype here
91 must be a copy of the one in ha_innobase.cc!
92 @return	1, 0, -1, if a is greater, equal, less than b, respectively */
93 extern
94 int
95 innobase_mysql_cmp(
96 /*===============*/
97 	int		mysql_type,	/*!< in: MySQL type */
98 	uint		charset_number,	/*!< in: number of the charset */
99 	const unsigned char* a,		/*!< in: data field */
100 	unsigned int	a_length,	/*!< in: data field length,
101 					not UNIV_SQL_NULL */
102 	const unsigned char* b,		/*!< in: data field */
103 	unsigned int	b_length);	/*!< in: data field length,
104 					not UNIV_SQL_NULL */
105 /*************************************************************//**
106 This function is used to compare two data fields for which the data type
107 is such that we must use MySQL code to compare them. The prototype here
108 must be a copy of the one in ha_innobase.cc!
109 @return	1, 0, -1, if a is greater, equal, less than b, respectively */
110 extern
111 int
112 innobase_mysql_cmp_prefix(
113 /*======================*/
114 	int		mysql_type,	/*!< in: MySQL type */
115 	uint		charset_number,	/*!< in: number of the charset */
116 	const unsigned char* a,		/*!< in: data field */
117 	unsigned int	a_length,	/*!< in: data field length,
118 					not UNIV_SQL_NULL */
119 	const unsigned char* b,		/*!< in: data field */
120 	unsigned int	b_length);	/*!< in: data field length,
121 					not UNIV_SQL_NULL */
122 /*********************************************************************//**
123 Transforms the character code so that it is ordered appropriately for the
124 language. This is only used for the latin1 char set. MySQL does the
125 comparisons for other char sets.
126 @return	collation order position */
127 UNIV_INLINE
128 ulint
cmp_collate(ulint code)129 cmp_collate(
130 /*========*/
131 	ulint	code)	/*!< in: code of a character stored in database record */
132 {
133 	return((ulint) srv_latin1_ordering[code]);
134 }
135 
136 /*************************************************************//**
137 Returns TRUE if two columns are equal for comparison purposes.
138 @return	TRUE if the columns are considered equal in comparisons */
139 UNIV_INTERN
140 ibool
cmp_cols_are_equal(const dict_col_t * col1,const dict_col_t * col2,ibool check_charsets)141 cmp_cols_are_equal(
142 /*===============*/
143 	const dict_col_t*	col1,	/*!< in: column 1 */
144 	const dict_col_t*	col2,	/*!< in: column 2 */
145 	ibool			check_charsets)
146 					/*!< in: whether to check charsets */
147 {
148 	if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
149 	    && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
150 
151 		/* Both are non-binary string types: they can be compared if
152 		and only if the charset-collation is the same */
153 
154 		if (check_charsets) {
155 			return(dtype_get_charset_coll(col1->prtype)
156 			       == dtype_get_charset_coll(col2->prtype));
157 		} else {
158 			return(TRUE);
159 		}
160 	}
161 
162 	if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
163 	    && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
164 
165 		/* Both are binary string types: they can be compared */
166 
167 		return(TRUE);
168 	}
169 
170 	if (col1->mtype != col2->mtype) {
171 
172 		return(FALSE);
173 	}
174 
175 	if (col1->mtype == DATA_INT
176 	    && (col1->prtype & DATA_UNSIGNED)
177 	    != (col2->prtype & DATA_UNSIGNED)) {
178 
179 		/* The storage format of an unsigned integer is different
180 		from a signed integer: in a signed integer we OR
181 		0x8000... to the value of positive integers. */
182 
183 		return(FALSE);
184 	}
185 
186 	return(col1->mtype != DATA_INT || col1->len == col2->len);
187 }
188 
189 /*************************************************************//**
190 Innobase uses this function to compare two data fields for which the data type
191 is such that we must compare whole fields or call MySQL to do the comparison
192 @return	1, 0, -1, if a is greater, equal, less than b, respectively */
193 static
194 int
cmp_whole_field(ulint mtype,ulint prtype,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)195 cmp_whole_field(
196 /*============*/
197 	ulint		mtype,		/*!< in: main type */
198 	ulint		prtype,		/*!< in: precise type */
199 	const byte*	a,		/*!< in: data field */
200 	unsigned int	a_length,	/*!< in: data field length,
201 					not UNIV_SQL_NULL */
202 	const byte*	b,		/*!< in: data field */
203 	unsigned int	b_length)	/*!< in: data field length,
204 					not UNIV_SQL_NULL */
205 {
206 	float		f_1;
207 	float		f_2;
208 	double		d_1;
209 	double		d_2;
210 	int		swap_flag	= 1;
211 
212 	switch (mtype) {
213 
214 	case DATA_DECIMAL:
215 		/* Remove preceding spaces */
216 		for (; a_length && *a == ' '; a++, a_length--) { }
217 		for (; b_length && *b == ' '; b++, b_length--) { }
218 
219 		if (*a == '-') {
220 			if (*b != '-') {
221 				return(-1);
222 			}
223 
224 			a++; b++;
225 			a_length--;
226 			b_length--;
227 
228 			swap_flag = -1;
229 
230 		} else if (*b == '-') {
231 
232 			return(1);
233 		}
234 
235 		while (a_length > 0 && (*a == '+' || *a == '0')) {
236 			a++; a_length--;
237 		}
238 
239 		while (b_length > 0 && (*b == '+' || *b == '0')) {
240 			b++; b_length--;
241 		}
242 
243 		if (a_length != b_length) {
244 			if (a_length < b_length) {
245 				return(-swap_flag);
246 			}
247 
248 			return(swap_flag);
249 		}
250 
251 		while (a_length > 0 && *a == *b) {
252 
253 			a++; b++; a_length--;
254 		}
255 
256 		if (a_length == 0) {
257 
258 			return(0);
259 		}
260 
261 		if (*a > *b) {
262 			return(swap_flag);
263 		}
264 
265 		return(-swap_flag);
266 	case DATA_DOUBLE:
267 		d_1 = mach_double_read(a);
268 		d_2 = mach_double_read(b);
269 
270 		if (d_1 > d_2) {
271 			return(1);
272 		} else if (d_2 > d_1) {
273 			return(-1);
274 		}
275 
276 		return(0);
277 
278 	case DATA_FLOAT:
279 		f_1 = mach_float_read(a);
280 		f_2 = mach_float_read(b);
281 
282 		if (f_1 > f_2) {
283 			return(1);
284 		} else if (f_2 > f_1) {
285 			return(-1);
286 		}
287 
288 		return(0);
289 	case DATA_BLOB:
290 		if (prtype & DATA_BINARY_TYPE) {
291 
292 			ut_print_timestamp(stderr);
293 			fprintf(stderr,
294 				"  InnoDB: Error: comparing a binary BLOB"
295 				" with a character set sensitive\n"
296 				"InnoDB: comparison!\n");
297 		}
298 		/* fall through */
299 	case DATA_VARMYSQL:
300 	case DATA_MYSQL:
301 		return(innobase_mysql_cmp(
302 			       (int)(prtype & DATA_MYSQL_TYPE_MASK),
303 			       (uint) dtype_get_charset_coll(prtype),
304 			       a, a_length, b, b_length));
305 	default:
306 		fprintf(stderr,
307 			"InnoDB: unknown type number %lu\n",
308 			(ulong) mtype);
309 		ut_error;
310 	}
311 
312 	return(0);
313 }
314 
315 /*****************************************************************
316 This function is used to compare two dfields where at least the first
317 has its data type field set. */
318 UNIV_INTERN
319 int
cmp_dfield_dfield_like_prefix(dfield_t * dfield1,dfield_t * dfield2)320 cmp_dfield_dfield_like_prefix(
321 /*==========================*/
322 				/* out: 1, 0, -1, if dfield1 is greater, equal,
323 				less than dfield2, respectively */
324 	dfield_t*	dfield1,/* in: data field; must have type field set */
325 	dfield_t*	dfield2)/* in: data field */
326 {
327 	const dtype_t*	type;
328 	int		ret;
329 
330 	ut_ad(dfield_check_typed(dfield1));
331 
332 	type = dfield_get_type(dfield1);
333 
334 	if (type->mtype >= DATA_FLOAT) {
335 		ret = innobase_mysql_cmp_prefix(
336 			static_cast<int>(type->prtype & DATA_MYSQL_TYPE_MASK),
337 			static_cast<uint>(dtype_get_charset_coll(type->prtype)),
338 			static_cast<byte*>(dfield_get_data(dfield1)),
339 			static_cast<uint>(dfield_get_len(dfield1)),
340 			static_cast<byte*>(dfield_get_data(dfield2)),
341 			static_cast<uint>(dfield_get_len(dfield2)));
342 	} else {
343 		ret = (cmp_data_data_like_prefix(
344 			static_cast<byte*>(dfield_get_data(dfield1)),
345 			dfield_get_len(dfield1),
346 			static_cast<byte*>(dfield_get_data(dfield2)),
347 			dfield_get_len(dfield2)));
348 	}
349 
350 	return(ret);
351 }
352 
353 /*************************************************************//**
354 This function is used to compare two data fields for which we know the
355 data type.
356 @return	1, 0, -1, if data1 is greater, equal, less than data2, respectively */
357 UNIV_INTERN
358 int
cmp_data_data_slow(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)359 cmp_data_data_slow(
360 /*===============*/
361 	ulint		mtype,	/*!< in: main type */
362 	ulint		prtype,	/*!< in: precise type */
363 	const byte*	data1,	/*!< in: data field (== a pointer to a memory
364 				buffer) */
365 	ulint		len1,	/*!< in: data field length or UNIV_SQL_NULL */
366 	const byte*	data2,	/*!< in: data field (== a pointer to a memory
367 				buffer) */
368 	ulint		len2)	/*!< in: data field length or UNIV_SQL_NULL */
369 {
370 	ulint	data1_byte;
371 	ulint	data2_byte;
372 	ulint	cur_bytes;
373 
374 	if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL) {
375 
376 		if (len1 == len2) {
377 
378 			return(0);
379 		}
380 
381 		if (len1 == UNIV_SQL_NULL) {
382 			/* We define the SQL null to be the smallest possible
383 			value of a field in the alphabetical order */
384 
385 			return(-1);
386 		}
387 
388 		return(1);
389 	}
390 
391 	if (mtype >= DATA_FLOAT
392 	    || (mtype == DATA_BLOB
393 		&& 0 == (prtype & DATA_BINARY_TYPE)
394 		&& dtype_get_charset_coll(prtype)
395 		!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
396 
397 		return(cmp_whole_field(mtype, prtype,
398 				       data1, (unsigned) len1,
399 				       data2, (unsigned) len2));
400 	}
401 
402 	/* Compare then the fields */
403 
404 	cur_bytes = 0;
405 
406 	for (;;) {
407 		if (len1 <= cur_bytes) {
408 			if (len2 <= cur_bytes) {
409 
410 				return(0);
411 			}
412 
413 			data1_byte = dtype_get_pad_char(mtype, prtype);
414 
415 			if (data1_byte == ULINT_UNDEFINED) {
416 
417 				return(-1);
418 			}
419 		} else {
420 			data1_byte = *data1;
421 		}
422 
423 		if (len2 <= cur_bytes) {
424 			data2_byte = dtype_get_pad_char(mtype, prtype);
425 
426 			if (data2_byte == ULINT_UNDEFINED) {
427 
428 				return(1);
429 			}
430 		} else {
431 			data2_byte = *data2;
432 		}
433 
434 		if (data1_byte == data2_byte) {
435 			/* If the bytes are equal, they will remain such even
436 			after the collation transformation below */
437 
438 			goto next_byte;
439 		}
440 
441 		if (mtype <= DATA_CHAR
442 		    || (mtype == DATA_BLOB
443 			&& 0 == (prtype & DATA_BINARY_TYPE))) {
444 
445 			data1_byte = cmp_collate(data1_byte);
446 			data2_byte = cmp_collate(data2_byte);
447 		}
448 
449 		if (data1_byte > data2_byte) {
450 
451 			return(1);
452 		} else if (data1_byte < data2_byte) {
453 
454 			return(-1);
455 		}
456 next_byte:
457 		/* Next byte */
458 		cur_bytes++;
459 		data1++;
460 		data2++;
461 	}
462 
463 	return(0);		/* Not reached */
464 }
465 
466 /*****************************************************************
467 This function is used to compare two data fields for which we know the
468 data type to be VARCHAR */
469 
470 int
cmp_data_data_slow_varchar(const byte * lhs,ulint lhs_len,const byte * rhs,ulint rhs_len)471 cmp_data_data_slow_varchar(
472 /*=======================*/
473 				/* out: 1, 0, -1, if lhs is greater, equal,
474 				less than rhs, respectively */
475 	const byte*	lhs,	/* in: data field (== a pointer to a memory
476 				buffer) */
477 	ulint		lhs_len,/* in: data field length or UNIV_SQL_NULL */
478 	const byte*	rhs,	/* in: data field (== a pointer to a memory
479 				buffer) */
480 	ulint		rhs_len)/* in: data field length or UNIV_SQL_NULL */
481 {
482 	ulint	i;
483 
484 	ut_a(rhs_len != UNIV_SQL_NULL);
485 
486 	if (lhs_len == UNIV_SQL_NULL) {
487 
488 		/* We define the SQL null to be the smallest possible
489 		value of a field in the alphabetical order */
490 
491 		return(-1);
492 	}
493 
494 	/* Compare the values.*/
495 
496 	for (i = 0; i < lhs_len && i < rhs_len; ++i, ++rhs, ++lhs) {
497 		ulint	lhs_byte = *lhs;
498 		ulint	rhs_byte = *rhs;
499 
500 		if (lhs_byte != rhs_byte) {
501 			/* If the bytes are equal, they will remain such even
502 			after the collation transformation below */
503 
504 			lhs_byte = cmp_collate(lhs_byte);
505 			rhs_byte = cmp_collate(rhs_byte);
506 
507 			if (lhs_byte > rhs_byte) {
508 
509 				return(1);
510 			} else if (lhs_byte < rhs_byte) {
511 
512 				return(-1);
513 			}
514 		}
515 	}
516 
517 	return((i == lhs_len && i == rhs_len) ? 0 :
518 		static_cast<int>(rhs_len - lhs_len));
519 }
520 
521 /*****************************************************************
522 This function is used to compare two data fields for which we know the
523 data type. The comparison is done for the LIKE operator.*/
524 
525 int
cmp_data_data_slow_like_prefix(const byte * lhs,ulint len1,const byte * rhs,ulint len2)526 cmp_data_data_slow_like_prefix(
527 /*===========================*/
528 				/* out: 1, 0, -1, if lhs is greater, equal,
529 				less than rhs, respectively */
530 	const byte*	lhs,	/* in: data field (== a pointer to a memory
531 				buffer) */
532 	ulint		len1,	/* in: data field length or UNIV_SQL_NULL */
533 	const byte*	rhs,	/* in: data field (== a pointer to a memory
534 				buffer) */
535 	ulint		len2)	/* in: data field length or UNIV_SQL_NULL */
536 {
537 	ulint	i;
538 
539 	ut_a(len2 != UNIV_SQL_NULL);
540 
541 	if (len1 == UNIV_SQL_NULL) {
542 
543 		/* We define the SQL null to be the smallest possible
544 		value of a field in the alphabetical order */
545 
546 		return(-1);
547 	}
548 
549 	/* Compare the values.*/
550 
551 	for (i = 0; i < len1 && i < len2; ++i, ++rhs, ++lhs) {
552 		ulint	lhs_byte = *lhs;
553 		ulint	rhs_byte = *rhs;
554 
555 		if (lhs_byte != rhs_byte) {
556 			/* If the bytes are equal, they will remain such even
557 			after the collation transformation below */
558 
559 			lhs_byte = cmp_collate(lhs_byte);
560 			rhs_byte = cmp_collate(rhs_byte);
561 
562 			if (lhs_byte > rhs_byte) {
563 
564 				return(1);
565 			} else if (lhs_byte < rhs_byte) {
566 
567 				return(-1);
568 			}
569 		}
570 	}
571 
572 	return(i == len2 ? 0 : 1);
573 }
574 
575 /*****************************************************************
576 This function is used to compare two data fields for which we know the
577 data type. The comparison is done for the LIKE operator.*/
578 
579 int
cmp_data_data_slow_like_suffix(const byte * data1 UNIV_UNUSED,ulint len1 UNIV_UNUSED,const byte * data2 UNIV_UNUSED,ulint len2 UNIV_UNUSED)580 cmp_data_data_slow_like_suffix(
581 /*===========================*/
582 				/* out: 1, 0, -1, if data1 is greater, equal,
583 				less than data2, respectively */
584 				/* in: data field (== a pointer to a
585 				memory buffer) */
586 	const byte*	data1 UNIV_UNUSED,
587 				/* in: data field length or UNIV_SQL_NULL */
588 	ulint		len1 UNIV_UNUSED,
589 				/* in: data field (== a pointer to a memory
590 				buffer) */
591 	const byte*	data2 UNIV_UNUSED,
592 				/* in: data field length or UNIV_SQL_NULL */
593 	ulint		len2 UNIV_UNUSED)
594 
595 {
596 	ut_error;	// FIXME:
597 	return(1);
598 }
599 
600 /*****************************************************************
601 This function is used to compare two data fields for which we know the
602 data type. The comparison is done for the LIKE operator.*/
603 
604 int
cmp_data_data_slow_like_substr(const byte * data1 UNIV_UNUSED,ulint len1 UNIV_UNUSED,const byte * data2 UNIV_UNUSED,ulint len2 UNIV_UNUSED)605 cmp_data_data_slow_like_substr(
606 /*===========================*/
607 				/* out: 1, 0, -1, if data1 is greater, equal,
608 				less than data2, respectively */
609 				/* in: data field (== a pointer to a
610 				memory buffer) */
611 	const byte*	data1 UNIV_UNUSED,
612 				/* in: data field length or UNIV_SQL_NULL */
613 	ulint		len1 UNIV_UNUSED,
614 				/* in: data field (== a pointer to a memory
615 				buffer) */
616 	const byte*	data2 UNIV_UNUSED,
617 				/* in: data field length or UNIV_SQL_NULL */
618 	ulint		len2 UNIV_UNUSED)
619 {
620 	ut_error;	// FIXME:
621 	return(1);
622 }
623 /*************************************************************//**
624 This function is used to compare a data tuple to a physical record.
625 Only dtuple->n_fields_cmp first fields are taken into account for
626 the data tuple! If we denote by n = n_fields_cmp, then rec must
627 have either m >= n fields, or it must differ from dtuple in some of
628 the m fields rec has. If rec has an externally stored field we do not
629 compare it but return with value 0 if such a comparison should be
630 made.
631 @return 1, 0, -1, if dtuple is greater, equal, less than rec,
632 respectively, when only the common first fields are compared, or until
633 the first externally stored field in rec */
634 UNIV_INTERN
635 int
cmp_dtuple_rec_with_match_low(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,ulint n_cmp,ulint * matched_fields,ulint * matched_bytes)636 cmp_dtuple_rec_with_match_low(
637 /*==========================*/
638 	const dtuple_t*	dtuple,	/*!< in: data tuple */
639 	const rec_t*	rec,	/*!< in: physical record which differs from
640 				dtuple in some of the common fields, or which
641 				has an equal number or more fields than
642 				dtuple */
643 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
644 	ulint		n_cmp,	/*!< in: number of fields to compare */
645 	ulint*		matched_fields, /*!< in/out: number of already completely
646 				matched fields; when function returns,
647 				contains the value for current comparison */
648 	ulint*		matched_bytes) /*!< in/out: number of already matched
649 				bytes within the first field not completely
650 				matched; when function returns, contains the
651 				value for current comparison */
652 {
653 	const dfield_t*	dtuple_field;	/* current field in logical record */
654 	ulint		dtuple_f_len;	/* the length of the current field
655 					in the logical record */
656 	const byte*	dtuple_b_ptr;	/* pointer to the current byte in
657 					logical field data */
658 	ulint		dtuple_byte;	/* value of current byte to be compared
659 					in dtuple*/
660 	ulint		rec_f_len;	/* length of current field in rec */
661 	const byte*	rec_b_ptr;	/* pointer to the current byte in
662 					rec field */
663 	ulint		rec_byte;	/* value of current byte to be
664 					compared in rec */
665 	ulint		cur_field;	/* current field number */
666 	ulint		cur_bytes;	/* number of already matched bytes
667 					in current field */
668 	int		ret;		/* return value */
669 
670 	ut_ad(dtuple != NULL);
671 	ut_ad(rec != NULL);
672 	ut_ad(matched_fields != NULL);
673 	ut_ad(matched_bytes != NULL);
674 	ut_ad(dtuple_check_typed(dtuple));
675 	ut_ad(rec_offs_validate(rec, NULL, offsets));
676 
677 	cur_field = *matched_fields;
678 	cur_bytes = *matched_bytes;
679 
680 	ut_ad(n_cmp > 0);
681 	ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
682 	ut_ad(cur_field <= n_cmp);
683 	ut_ad(cur_field <= rec_offs_n_fields(offsets));
684 
685 	if (cur_bytes == 0 && cur_field == 0) {
686 		ulint	rec_info = rec_get_info_bits(rec,
687 						     rec_offs_comp(offsets));
688 		ulint	tup_info = dtuple_get_info_bits(dtuple);
689 
690 		if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
691 			ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
692 			goto order_resolved;
693 		} else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
694 			ret = -1;
695 			goto order_resolved;
696 		}
697 	}
698 
699 	/* Match fields in a loop; stop if we run out of fields in dtuple
700 	or find an externally stored field */
701 
702 	while (cur_field < n_cmp) {
703 
704 		ulint	mtype;
705 		ulint	prtype;
706 
707 		dtuple_field = dtuple_get_nth_field(dtuple, cur_field);
708 		{
709 			const dtype_t*	type
710 				= dfield_get_type(dtuple_field);
711 
712 			mtype = type->mtype;
713 			prtype = type->prtype;
714 		}
715 
716 		dtuple_f_len = dfield_get_len(dtuple_field);
717 
718 		rec_b_ptr = rec_get_nth_field(rec, offsets,
719 					      cur_field, &rec_f_len);
720 
721 		/* If we have matched yet 0 bytes, it may be that one or
722 		both the fields are SQL null, or the record or dtuple may be
723 		the predefined minimum record, or the field is externally
724 		stored */
725 
726 		if (UNIV_LIKELY(cur_bytes == 0)) {
727 			if (rec_offs_nth_extern(offsets, cur_field)) {
728 				/* We do not compare to an externally
729 				stored field */
730 
731 				ret = 0;
732 
733 				goto order_resolved;
734 			}
735 
736 			if (dtuple_f_len == UNIV_SQL_NULL) {
737 				if (rec_f_len == UNIV_SQL_NULL) {
738 
739 					goto next_field;
740 				}
741 
742 				ret = -1;
743 				goto order_resolved;
744 			} else if (rec_f_len == UNIV_SQL_NULL) {
745 				/* We define the SQL null to be the
746 				smallest possible value of a field
747 				in the alphabetical order */
748 
749 				ret = 1;
750 				goto order_resolved;
751 			}
752 		}
753 
754 		if (mtype >= DATA_FLOAT
755 		    || (mtype == DATA_BLOB
756 			&& 0 == (prtype & DATA_BINARY_TYPE)
757 			&& dtype_get_charset_coll(prtype)
758 			!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
759 
760 			ret = cmp_whole_field(
761 				mtype, prtype,
762 				static_cast<const byte*>(
763 					dfield_get_data(dtuple_field)),
764 				(unsigned) dtuple_f_len,
765 				rec_b_ptr, (unsigned) rec_f_len);
766 
767 			if (ret != 0) {
768 				cur_bytes = 0;
769 
770 				goto order_resolved;
771 			} else {
772 				goto next_field;
773 			}
774 		}
775 
776 		/* Set the pointers at the current byte */
777 
778 		rec_b_ptr = rec_b_ptr + cur_bytes;
779 		dtuple_b_ptr = (byte*) dfield_get_data(dtuple_field)
780 			+ cur_bytes;
781 		/* Compare then the fields */
782 
783 		for (;;) {
784 			if (UNIV_UNLIKELY(rec_f_len <= cur_bytes)) {
785 				if (dtuple_f_len <= cur_bytes) {
786 
787 					goto next_field;
788 				}
789 
790 				rec_byte = dtype_get_pad_char(mtype, prtype);
791 
792 				if (rec_byte == ULINT_UNDEFINED) {
793 					ret = 1;
794 
795 					goto order_resolved;
796 				}
797 			} else {
798 				rec_byte = *rec_b_ptr;
799 			}
800 
801 			if (UNIV_UNLIKELY(dtuple_f_len <= cur_bytes)) {
802 				dtuple_byte = dtype_get_pad_char(mtype,
803 								 prtype);
804 
805 				if (dtuple_byte == ULINT_UNDEFINED) {
806 					ret = -1;
807 
808 					goto order_resolved;
809 				}
810 			} else {
811 				dtuple_byte = *dtuple_b_ptr;
812 			}
813 
814 			if (dtuple_byte == rec_byte) {
815 				/* If the bytes are equal, they will
816 				remain such even after the collation
817 				transformation below */
818 
819 				goto next_byte;
820 			}
821 
822 			if (mtype <= DATA_CHAR
823 			    || (mtype == DATA_BLOB
824 				&& !(prtype & DATA_BINARY_TYPE))) {
825 
826 				rec_byte = cmp_collate(rec_byte);
827 				dtuple_byte = cmp_collate(dtuple_byte);
828 			}
829 
830 			ret = (int) (dtuple_byte - rec_byte);
831 			if (UNIV_LIKELY(ret)) {
832 				if (ret < 0) {
833 					ret = -1;
834 					goto order_resolved;
835 				} else {
836 					ret = 1;
837 					goto order_resolved;
838 				}
839 			}
840 next_byte:
841 			/* Next byte */
842 			cur_bytes++;
843 			rec_b_ptr++;
844 			dtuple_b_ptr++;
845 		}
846 
847 next_field:
848 		cur_field++;
849 		cur_bytes = 0;
850 	}
851 
852 	ut_ad(cur_bytes == 0);
853 
854 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
855 			up to the common fields */
856 order_resolved:
857 	ut_ad((ret >= - 1) && (ret <= 1));
858 	ut_ad(ret == cmp_debug_dtuple_rec_with_match(dtuple, rec, offsets,
859 						     n_cmp, matched_fields));
860 	ut_ad(*matched_fields == cur_field); /* In the debug version, the
861 					     above cmp_debug_... sets
862 					     *matched_fields to a value */
863 	*matched_fields = cur_field;
864 	*matched_bytes = cur_bytes;
865 
866 	return(ret);
867 }
868 
869 /**************************************************************//**
870 Compares a data tuple to a physical record.
871 @see cmp_dtuple_rec_with_match
872 @return 1, 0, -1, if dtuple is greater, equal, less than rec, respectively */
873 UNIV_INTERN
874 int
cmp_dtuple_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)875 cmp_dtuple_rec(
876 /*===========*/
877 	const dtuple_t*	dtuple,	/*!< in: data tuple */
878 	const rec_t*	rec,	/*!< in: physical record */
879 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
880 {
881 	ulint	matched_fields	= 0;
882 	ulint	matched_bytes	= 0;
883 
884 	ut_ad(rec_offs_validate(rec, NULL, offsets));
885 	return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
886 					 &matched_fields, &matched_bytes));
887 }
888 
889 /**************************************************************//**
890 Checks if a dtuple is a prefix of a record. The last field in dtuple
891 is allowed to be a prefix of the corresponding field in the record.
892 @return	TRUE if prefix */
893 UNIV_INTERN
894 ibool
cmp_dtuple_is_prefix_of_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)895 cmp_dtuple_is_prefix_of_rec(
896 /*========================*/
897 	const dtuple_t*	dtuple,	/*!< in: data tuple */
898 	const rec_t*	rec,	/*!< in: physical record */
899 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
900 {
901 	ulint	n_fields;
902 	ulint	matched_fields	= 0;
903 	ulint	matched_bytes	= 0;
904 
905 	ut_ad(rec_offs_validate(rec, NULL, offsets));
906 	n_fields = dtuple_get_n_fields(dtuple);
907 
908 	if (n_fields > rec_offs_n_fields(offsets)) {
909 
910 		return(FALSE);
911 	}
912 
913 	cmp_dtuple_rec_with_match(dtuple, rec, offsets,
914 				  &matched_fields, &matched_bytes);
915 	if (matched_fields == n_fields) {
916 
917 		return(TRUE);
918 	}
919 
920 	if (matched_fields == n_fields - 1
921 	    && matched_bytes == dfield_get_len(
922 		    dtuple_get_nth_field(dtuple, n_fields - 1))) {
923 		return(TRUE);
924 	}
925 
926 	return(FALSE);
927 }
928 
929 /*************************************************************//**
930 Compare two physical record fields.
931 @retval 1 if rec1 field is greater than rec2
932 @retval -1 if rec1 field is less than rec2
933 @retval 0 if rec1 field equals to rec2 */
934 static MY_ATTRIBUTE((nonnull, warn_unused_result))
935 int
cmp_rec_rec_simple_field(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,ulint n)936 cmp_rec_rec_simple_field(
937 /*=====================*/
938 	const rec_t*		rec1,	/*!< in: physical record */
939 	const rec_t*		rec2,	/*!< in: physical record */
940 	const ulint*		offsets1,/*!< in: rec_get_offsets(rec1, ...) */
941 	const ulint*		offsets2,/*!< in: rec_get_offsets(rec2, ...) */
942 	const dict_index_t*	index,	/*!< in: data dictionary index */
943 	ulint			n)	/*!< in: field to compare */
944 {
945 	const byte*	rec1_b_ptr;
946 	const byte*	rec2_b_ptr;
947 	ulint		rec1_f_len;
948 	ulint		rec2_f_len;
949 	const dict_col_t*	col	= dict_index_get_nth_col(index, n);
950 
951 	ut_ad(!rec_offs_nth_extern(offsets1, n));
952 	ut_ad(!rec_offs_nth_extern(offsets2, n));
953 
954 	rec1_b_ptr = rec_get_nth_field(rec1, offsets1, n, &rec1_f_len);
955 	rec2_b_ptr = rec_get_nth_field(rec2, offsets2, n, &rec2_f_len);
956 
957 	if (rec1_f_len == UNIV_SQL_NULL || rec2_f_len == UNIV_SQL_NULL) {
958 		if (rec1_f_len == rec2_f_len) {
959 			return(0);
960 		}
961 		/* We define the SQL null to be the smallest possible
962 		value of a field in the alphabetical order */
963 		return(rec1_f_len == UNIV_SQL_NULL ? -1 : 1);
964 	}
965 
966 	if (col->mtype >= DATA_FLOAT
967 	    || (col->mtype == DATA_BLOB
968 		&& !(col->prtype & DATA_BINARY_TYPE)
969 		&& dtype_get_charset_coll(col->prtype)
970 		!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
971 		return(cmp_whole_field(col->mtype, col->prtype,
972 				       rec1_b_ptr, (unsigned) rec1_f_len,
973 				       rec2_b_ptr, (unsigned) rec2_f_len));
974 	}
975 
976 	/* Compare the fields */
977 	for (ulint cur_bytes = 0;; cur_bytes++, rec1_b_ptr++, rec2_b_ptr++) {
978 		ulint		rec1_byte;
979 		ulint		rec2_byte;
980 
981 		if (rec2_f_len <= cur_bytes) {
982 			if (rec1_f_len <= cur_bytes) {
983 				return(0);
984 			}
985 
986 			rec2_byte = dtype_get_pad_char(
987 				col->mtype, col->prtype);
988 
989 			if (rec2_byte == ULINT_UNDEFINED) {
990 				return(1);
991 			}
992 		} else {
993 			rec2_byte = *rec2_b_ptr;
994 		}
995 
996 		if (rec1_f_len <= cur_bytes) {
997 			rec1_byte = dtype_get_pad_char(
998 				col->mtype, col->prtype);
999 
1000 			if (rec1_byte == ULINT_UNDEFINED) {
1001 				return(-1);
1002 			}
1003 		} else {
1004 			rec1_byte = *rec1_b_ptr;
1005 		}
1006 
1007 		if (rec1_byte == rec2_byte) {
1008 			/* If the bytes are equal, they will remain such
1009 			even after the collation transformation below */
1010 			continue;
1011 		}
1012 
1013 		if (col->mtype <= DATA_CHAR
1014 		    || (col->mtype == DATA_BLOB
1015 			&& !(col->prtype & DATA_BINARY_TYPE))) {
1016 
1017 			rec1_byte = cmp_collate(rec1_byte);
1018 			rec2_byte = cmp_collate(rec2_byte);
1019 		}
1020 
1021 		if (rec1_byte < rec2_byte) {
1022 			return(-1);
1023 		} else if (rec1_byte > rec2_byte) {
1024 			return(1);
1025 		}
1026 	}
1027 }
1028 
1029 /*************************************************************//**
1030 Compare two physical records that contain the same number of columns,
1031 none of which are stored externally.
1032 @retval 1 if rec1 (including non-ordering columns) is greater than rec2
1033 @retval -1 if rec1 (including non-ordering columns) is less than rec2
1034 @retval 0 if rec1 is a duplicate of rec2 */
1035 UNIV_INTERN
1036 int
cmp_rec_rec_simple(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,struct TABLE * table)1037 cmp_rec_rec_simple(
1038 /*===============*/
1039 	const rec_t*		rec1,	/*!< in: physical record */
1040 	const rec_t*		rec2,	/*!< in: physical record */
1041 	const ulint*		offsets1,/*!< in: rec_get_offsets(rec1, ...) */
1042 	const ulint*		offsets2,/*!< in: rec_get_offsets(rec2, ...) */
1043 	const dict_index_t*	index,	/*!< in: data dictionary index */
1044 	struct TABLE*		table)	/*!< in: MySQL table, for reporting
1045 					duplicate key value if applicable,
1046 					or NULL */
1047 {
1048 	ulint		n;
1049 	ulint		n_uniq	= dict_index_get_n_unique(index);
1050 	bool		null_eq	= false;
1051 
1052 	ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
1053 	ut_ad(rec_offs_n_fields(offsets2) == rec_offs_n_fields(offsets2));
1054 
1055 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1056 
1057 	for (n = 0; n < n_uniq; n++) {
1058 		int cmp = cmp_rec_rec_simple_field(
1059 			rec1, rec2, offsets1, offsets2, index, n);
1060 
1061 		if (cmp) {
1062 			return(cmp);
1063 		}
1064 
1065 		/* If the fields are internally equal, they must both
1066 		be NULL or non-NULL. */
1067 		ut_ad(rec_offs_nth_sql_null(offsets1, n)
1068 		      == rec_offs_nth_sql_null(offsets2, n));
1069 
1070 		if (rec_offs_nth_sql_null(offsets1, n)) {
1071 			ut_ad(!(dict_index_get_nth_col(index, n)->prtype
1072 				& DATA_NOT_NULL));
1073 			null_eq = true;
1074 		}
1075 	}
1076 
1077 	/* If we ran out of fields, the ordering columns of rec1 were
1078 	equal to rec2. Issue a duplicate key error if needed. */
1079 
1080 	if (!null_eq && table && dict_index_is_unique(index)) {
1081 		/* Report erroneous row using new version of table. */
1082 		innobase_rec_to_mysql(table, rec1, index, offsets1);
1083 		return(0);
1084 	}
1085 
1086 	/* Else, keep comparing so that we have the full internal
1087 	order. */
1088 	for (; n < dict_index_get_n_fields(index); n++) {
1089 		int cmp = cmp_rec_rec_simple_field(
1090 			rec1, rec2, offsets1, offsets2, index, n);
1091 
1092 		if (cmp) {
1093 			return(cmp);
1094 		}
1095 
1096 		/* If the fields are internally equal, they must both
1097 		be NULL or non-NULL. */
1098 		ut_ad(rec_offs_nth_sql_null(offsets1, n)
1099 		      == rec_offs_nth_sql_null(offsets2, n));
1100 	}
1101 
1102 	/* This should never be reached. Internally, an index must
1103 	never contain duplicate entries. */
1104 	ut_ad(0);
1105 	return(0);
1106 }
1107 
1108 /*************************************************************//**
1109 This function is used to compare two physical records. Only the common
1110 first fields are compared, and if an externally stored field is
1111 encountered, then 0 is returned.
1112 @return 1, 0, -1 if rec1 is greater, equal, less, respectively */
1113 UNIV_INTERN
1114 int
cmp_rec_rec_with_match(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,dict_index_t * index,ibool nulls_unequal,ulint * matched_fields,ulint * matched_bytes)1115 cmp_rec_rec_with_match(
1116 /*===================*/
1117 	const rec_t*	rec1,	/*!< in: physical record */
1118 	const rec_t*	rec2,	/*!< in: physical record */
1119 	const ulint*	offsets1,/*!< in: rec_get_offsets(rec1, index) */
1120 	const ulint*	offsets2,/*!< in: rec_get_offsets(rec2, index) */
1121 	dict_index_t*	index,	/*!< in: data dictionary index */
1122 	ibool		nulls_unequal,
1123 				/* in: TRUE if this is for index statistics
1124 				cardinality estimation, and innodb_stats_method
1125 				is "nulls_unequal" or "nulls_ignored" */
1126 	ulint*		matched_fields, /*!< in/out: number of already completely
1127 				matched fields; when the function returns,
1128 				contains the value the for current
1129 				comparison */
1130 	ulint*		matched_bytes) /*!< in/out: number of already matched
1131 				bytes within the first field not completely
1132 				matched; when the function returns, contains
1133 				the value for the current comparison */
1134 {
1135 	ulint		rec1_n_fields;	/* the number of fields in rec */
1136 	ulint		rec1_f_len;	/* length of current field in rec */
1137 	const byte*	rec1_b_ptr;	/* pointer to the current byte
1138 					in rec field */
1139 	ulint		rec1_byte;	/* value of current byte to be
1140 					compared in rec */
1141 	ulint		rec2_n_fields;	/* the number of fields in rec */
1142 	ulint		rec2_f_len;	/* length of current field in rec */
1143 	const byte*	rec2_b_ptr;	/* pointer to the current byte
1144 					in rec field */
1145 	ulint		rec2_byte;	/* value of current byte to be
1146 					compared in rec */
1147 	ulint		cur_field;	/* current field number */
1148 	ulint		cur_bytes;	/* number of already matched
1149 					bytes in current field */
1150 	int		ret = 0;	/* return value */
1151 	ulint		comp;
1152 
1153 	ut_ad(rec1 != NULL);
1154 	ut_ad(rec2 != NULL);
1155 	ut_ad(index != NULL);
1156 	ut_ad(rec_offs_validate(rec1, index, offsets1));
1157 	ut_ad(rec_offs_validate(rec2, index, offsets2));
1158 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1159 
1160 	comp = rec_offs_comp(offsets1);
1161 	rec1_n_fields = rec_offs_n_fields(offsets1);
1162 	rec2_n_fields = rec_offs_n_fields(offsets2);
1163 
1164 	cur_field = *matched_fields;
1165 	cur_bytes = *matched_bytes;
1166 
1167 	/* Match fields in a loop */
1168 
1169 	while ((cur_field < rec1_n_fields) && (cur_field < rec2_n_fields)) {
1170 
1171 		ulint	mtype;
1172 		ulint	prtype;
1173 
1174 		if (dict_index_is_univ(index)) {
1175 			/* This is for the insert buffer B-tree. */
1176 			mtype = DATA_BINARY;
1177 			prtype = 0;
1178 		} else {
1179 			const dict_col_t*	col
1180 				= dict_index_get_nth_col(index, cur_field);
1181 
1182 			mtype = col->mtype;
1183 			prtype = col->prtype;
1184 		}
1185 
1186 		rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
1187 					       cur_field, &rec1_f_len);
1188 		rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
1189 					       cur_field, &rec2_f_len);
1190 
1191 		if (cur_bytes == 0) {
1192 			if (cur_field == 0) {
1193 				/* Test if rec is the predefined minimum
1194 				record */
1195 				if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
1196 						  & REC_INFO_MIN_REC_FLAG)) {
1197 
1198 					if (!(rec_get_info_bits(rec2, comp)
1199 					      & REC_INFO_MIN_REC_FLAG)) {
1200 						ret = -1;
1201 					}
1202 
1203 					goto order_resolved;
1204 
1205 				} else if (UNIV_UNLIKELY
1206 					   (rec_get_info_bits(rec2, comp)
1207 					    & REC_INFO_MIN_REC_FLAG)) {
1208 
1209 					ret = 1;
1210 
1211 					goto order_resolved;
1212 				}
1213 			}
1214 
1215 			if (rec_offs_nth_extern(offsets1, cur_field)
1216 			    || rec_offs_nth_extern(offsets2, cur_field)) {
1217 				/* We do not compare to an externally
1218 				stored field */
1219 
1220 				goto order_resolved;
1221 			}
1222 
1223 			if (rec1_f_len == UNIV_SQL_NULL
1224 			    || rec2_f_len == UNIV_SQL_NULL) {
1225 
1226 				if (rec1_f_len == rec2_f_len) {
1227 					/* This is limited to stats collection,
1228 					cannot use it for regular search */
1229 					if (nulls_unequal) {
1230 						ret = -1;
1231 					} else {
1232 						goto next_field;
1233 					}
1234 				} else if (rec2_f_len == UNIV_SQL_NULL) {
1235 
1236 					/* We define the SQL null to be the
1237 					smallest possible value of a field
1238 					in the alphabetical order */
1239 
1240 					ret = 1;
1241 				} else {
1242 					ret = -1;
1243 				}
1244 
1245 				goto order_resolved;
1246 			}
1247 		}
1248 
1249 		if (mtype >= DATA_FLOAT
1250 		    || (mtype == DATA_BLOB
1251 			&& 0 == (prtype & DATA_BINARY_TYPE)
1252 			&& dtype_get_charset_coll(prtype)
1253 			!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
1254 
1255 			ret = cmp_whole_field(mtype, prtype,
1256 					      rec1_b_ptr,
1257 					      (unsigned) rec1_f_len,
1258 					      rec2_b_ptr,
1259 					      (unsigned) rec2_f_len);
1260 			if (ret != 0) {
1261 				cur_bytes = 0;
1262 
1263 				goto order_resolved;
1264 			} else {
1265 				goto next_field;
1266 			}
1267 		}
1268 
1269 		/* Set the pointers at the current byte */
1270 		rec1_b_ptr = rec1_b_ptr + cur_bytes;
1271 		rec2_b_ptr = rec2_b_ptr + cur_bytes;
1272 
1273 		/* Compare then the fields */
1274 		for (;;) {
1275 			if (rec2_f_len <= cur_bytes) {
1276 
1277 				if (rec1_f_len <= cur_bytes) {
1278 
1279 					goto next_field;
1280 				}
1281 
1282 				rec2_byte = dtype_get_pad_char(mtype, prtype);
1283 
1284 				if (rec2_byte == ULINT_UNDEFINED) {
1285 					ret = 1;
1286 
1287 					goto order_resolved;
1288 				}
1289 			} else {
1290 				rec2_byte = *rec2_b_ptr;
1291 			}
1292 
1293 			if (rec1_f_len <= cur_bytes) {
1294 				rec1_byte = dtype_get_pad_char(mtype, prtype);
1295 
1296 				if (rec1_byte == ULINT_UNDEFINED) {
1297 					ret = -1;
1298 
1299 					goto order_resolved;
1300 				}
1301 			} else {
1302 				rec1_byte = *rec1_b_ptr;
1303 			}
1304 
1305 			if (rec1_byte == rec2_byte) {
1306 				/* If the bytes are equal, they will remain
1307 				such even after the collation transformation
1308 				below */
1309 
1310 				goto next_byte;
1311 			}
1312 
1313 			if (mtype <= DATA_CHAR
1314 			    || (mtype == DATA_BLOB
1315 				&& !(prtype & DATA_BINARY_TYPE))) {
1316 
1317 				rec1_byte = cmp_collate(rec1_byte);
1318 				rec2_byte = cmp_collate(rec2_byte);
1319 			}
1320 
1321 			if (rec1_byte < rec2_byte) {
1322 				ret = -1;
1323 				goto order_resolved;
1324 			} else if (rec1_byte > rec2_byte) {
1325 				ret = 1;
1326 				goto order_resolved;
1327 			}
1328 next_byte:
1329 			/* Next byte */
1330 
1331 			cur_bytes++;
1332 			rec1_b_ptr++;
1333 			rec2_b_ptr++;
1334 		}
1335 
1336 next_field:
1337 		cur_field++;
1338 		cur_bytes = 0;
1339 	}
1340 
1341 	ut_ad(cur_bytes == 0);
1342 
1343 	/* If we ran out of fields, rec1 was equal to rec2 up
1344 	to the common fields */
1345 	ut_ad(ret == 0);
1346 order_resolved:
1347 
1348 	ut_ad((ret >= - 1) && (ret <= 1));
1349 
1350 	*matched_fields = cur_field;
1351 	*matched_bytes = cur_bytes;
1352 
1353 	return(ret);
1354 }
1355 
1356 #ifdef UNIV_DEBUG
1357 /*************************************************************//**
1358 Used in debug checking of cmp_dtuple_... .
1359 This function is used to compare a data tuple to a physical record. If
1360 dtuple has n fields then rec must have either m >= n fields, or it must
1361 differ from dtuple in some of the m fields rec has. If encounters an
1362 externally stored field, returns 0.
1363 @return 1, 0, -1, if dtuple is greater, equal, less than rec,
1364 respectively, when only the common first fields are compared */
1365 static
1366 int
cmp_debug_dtuple_rec_with_match(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,ulint n_cmp,ulint * matched_fields)1367 cmp_debug_dtuple_rec_with_match(
1368 /*============================*/
1369 	const dtuple_t*	dtuple,	/*!< in: data tuple */
1370 	const rec_t*	rec,	/*!< in: physical record which differs from
1371 				dtuple in some of the common fields, or which
1372 				has an equal number or more fields than
1373 				dtuple */
1374 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1375 	ulint		n_cmp,	/*!< in: number of fields to compare */
1376 	ulint*		matched_fields) /*!< in/out: number of already
1377 				completely matched fields; when function
1378 				returns, contains the value for current
1379 				comparison */
1380 {
1381 	const dfield_t*	dtuple_field;	/* current field in logical record */
1382 	ulint		dtuple_f_len;	/* the length of the current field
1383 					in the logical record */
1384 	const byte*	dtuple_f_data;	/* pointer to the current logical
1385 					field data */
1386 	ulint		rec_f_len;	/* length of current field in rec */
1387 	const byte*	rec_f_data;	/* pointer to the current rec field */
1388 	int		ret;		/* return value */
1389 	ulint		cur_field;	/* current field number */
1390 
1391 	ut_ad(dtuple != NULL);
1392 	ut_ad(rec != NULL);
1393 	ut_ad(matched_fields != NULL);
1394 	ut_ad(dtuple_check_typed(dtuple));
1395 	ut_ad(rec_offs_validate(rec, NULL, offsets));
1396 
1397 	ut_ad(n_cmp > 0);
1398 	ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
1399 	ut_ad(*matched_fields <= n_cmp);
1400 	ut_ad(*matched_fields <= rec_offs_n_fields(offsets));
1401 
1402 	cur_field = *matched_fields;
1403 
1404 	if (cur_field == 0) {
1405 		if (UNIV_UNLIKELY
1406 		    (rec_get_info_bits(rec, rec_offs_comp(offsets))
1407 		     & REC_INFO_MIN_REC_FLAG)) {
1408 
1409 			ret = !(dtuple_get_info_bits(dtuple)
1410 				& REC_INFO_MIN_REC_FLAG);
1411 
1412 			goto order_resolved;
1413 		}
1414 
1415 		if (UNIV_UNLIKELY
1416 		    (dtuple_get_info_bits(dtuple) & REC_INFO_MIN_REC_FLAG)) {
1417 			ret = -1;
1418 
1419 			goto order_resolved;
1420 		}
1421 	}
1422 
1423 	/* Match fields in a loop; stop if we run out of fields in dtuple */
1424 
1425 	while (cur_field < n_cmp) {
1426 
1427 		ulint	mtype;
1428 		ulint	prtype;
1429 
1430 		dtuple_field = dtuple_get_nth_field(dtuple, cur_field);
1431 		{
1432 			const dtype_t*	type
1433 				= dfield_get_type(dtuple_field);
1434 
1435 			mtype = type->mtype;
1436 			prtype = type->prtype;
1437 		}
1438 
1439 		dtuple_f_data = static_cast<const byte*>(
1440 			dfield_get_data(dtuple_field));
1441 
1442 		dtuple_f_len = dfield_get_len(dtuple_field);
1443 
1444 		rec_f_data = rec_get_nth_field(rec, offsets,
1445 					       cur_field, &rec_f_len);
1446 
1447 		if (rec_offs_nth_extern(offsets, cur_field)) {
1448 			/* We do not compare to an externally stored field */
1449 
1450 			ret = 0;
1451 
1452 			goto order_resolved;
1453 		}
1454 
1455 		ret = cmp_data_data(mtype, prtype, dtuple_f_data, dtuple_f_len,
1456 				    rec_f_data, rec_f_len);
1457 		if (ret != 0) {
1458 			goto order_resolved;
1459 		}
1460 
1461 		cur_field++;
1462 	}
1463 
1464 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
1465 			up to the common fields */
1466 order_resolved:
1467 	ut_ad((ret >= - 1) && (ret <= 1));
1468 
1469 	*matched_fields = cur_field;
1470 
1471 	return(ret);
1472 }
1473 #endif /* UNIV_DEBUG */
1474