1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 *****************************************************************************/
18 
19 /*******************************************************************//**
20 @file rem/rem0cmp.c
21 Comparison services for records
22 
23 Created 7/1/1994 Heikki Tuuri
24 ************************************************************************/
25 
26 #include "rem0cmp.h"
27 
28 #ifdef UNIV_NONINL
29 #include "rem0cmp.ic"
30 #endif
31 
32 #include "srv0srv.h"
33 
34 /*		ALPHABETICAL ORDER
35 		==================
36 
37 The records are put into alphabetical order in the following
38 way: let F be the first field where two records disagree.
39 If there is a character in some position n where the
40 records disagree, the order is determined by comparison of
41 the characters at position n, possibly after
42 collating transformation. If there is no such character,
43 but the corresponding fields have different lengths, then
44 if the data type of the fields is paddable,
45 shorter field is padded with a padding character. If the
46 data type is not paddable, longer field is considered greater.
47 Finally, the SQL null is smaller than any other value.
48 
49 At the present, the comparison functions return 0 in the case,
50 where two records disagree only in the way that one
51 has more fields than the other. */
52 
#ifdef UNIV_DEBUG
/*************************************************************//**
Debug-build reference comparison of a data tuple against a physical
record. cmp_dtuple_rec_with_match() asserts that its own result agrees
with the value returned by this function. If dtuple has n fields then
rec must have either m >= n fields, or it must differ from dtuple in
some of the m fields rec has.
@return 1, 0, -1, if dtuple is greater, equal, less than rec,
respectively, when only the common first fields are compared */
static
int
cmp_debug_dtuple_rec_with_match(
/*============================*/
	const dtuple_t*	dtuple,		/*!< in: data tuple */
	const rec_t*	rec,		/*!< in: physical record which
					differs from dtuple in some of the
					common fields, or which has an equal
					number or more fields than dtuple */
	const ulint*	offsets,	/*!< in: array returned by
					rec_get_offsets() */
	ulint*		matched_fields);/*!< in/out: number of already
					completely matched fields; when the
					function returns, contains the value
					for the current comparison */
#endif /* UNIV_DEBUG */
76 /*************************************************************//**
77 This function is used to compare two data fields for which the data type
78 is such that we must use MySQL code to compare them. The prototype here
79 must be a copy of the one in ha_innobase.cc!
80 @return	1, 0, -1, if a is greater, equal, less than b, respectively */
81 extern
82 int
83 innobase_mysql_cmp(
84 /*===============*/
85 	int		mysql_type,	/*!< in: MySQL type */
86 	uint		charset_number,	/*!< in: number of the charset */
87 	const unsigned char* a,		/*!< in: data field */
88 	unsigned int	a_length,	/*!< in: data field length,
89 					not UNIV_SQL_NULL */
90 	const unsigned char* b,		/*!< in: data field */
91 	unsigned int	b_length);	/*!< in: data field length,
92 					not UNIV_SQL_NULL */
93 /*********************************************************************//**
94 Transforms the character code so that it is ordered appropriately for the
95 language. This is only used for the latin1 char set. MySQL does the
96 comparisons for other char sets.
97 @return	collation order position */
98 UNIV_INLINE
99 ulint
cmp_collate(ulint code)100 cmp_collate(
101 /*========*/
102 	ulint	code)	/*!< in: code of a character stored in database record */
103 {
104 	return((ulint) srv_latin1_ordering[code]);
105 }
106 
107 /*************************************************************//**
108 Returns TRUE if two columns are equal for comparison purposes.
109 @return	TRUE if the columns are considered equal in comparisons */
110 UNIV_INTERN
111 ibool
cmp_cols_are_equal(const dict_col_t * col1,const dict_col_t * col2,ibool check_charsets)112 cmp_cols_are_equal(
113 /*===============*/
114 	const dict_col_t*	col1,	/*!< in: column 1 */
115 	const dict_col_t*	col2,	/*!< in: column 2 */
116 	ibool			check_charsets)
117 					/*!< in: whether to check charsets */
118 {
119 	if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
120 	    && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
121 
122 		/* Both are non-binary string types: they can be compared if
123 		and only if the charset-collation is the same */
124 
125 		if (check_charsets) {
126 			return(dtype_get_charset_coll(col1->prtype)
127 			       == dtype_get_charset_coll(col2->prtype));
128 		} else {
129 			return(TRUE);
130 		}
131 	}
132 
133 	if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
134 	    && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
135 
136 		/* Both are binary string types: they can be compared */
137 
138 		return(TRUE);
139 	}
140 
141 	if (col1->mtype != col2->mtype) {
142 
143 		return(FALSE);
144 	}
145 
146 	if (col1->mtype == DATA_INT
147 	    && (col1->prtype & DATA_UNSIGNED)
148 	    != (col2->prtype & DATA_UNSIGNED)) {
149 
150 		/* The storage format of an unsigned integer is different
151 		from a signed integer: in a signed integer we OR
152 		0x8000... to the value of positive integers. */
153 
154 		return(FALSE);
155 	}
156 
157 	return(col1->mtype != DATA_INT || col1->len == col2->len);
158 }
159 
160 /*************************************************************//**
161 Innobase uses this function to compare two data fields for which the data type
162 is such that we must compare whole fields or call MySQL to do the comparison
163 @return	1, 0, -1, if a is greater, equal, less than b, respectively */
164 static
165 int
cmp_whole_field(ulint mtype,ulint prtype,const byte * a,unsigned int a_length,const byte * b,unsigned int b_length)166 cmp_whole_field(
167 /*============*/
168 	ulint		mtype,		/*!< in: main type */
169 	ulint		prtype,		/*!< in: precise type */
170 	const byte*	a,		/*!< in: data field */
171 	unsigned int	a_length,	/*!< in: data field length,
172 					not UNIV_SQL_NULL */
173 	const byte*	b,		/*!< in: data field */
174 	unsigned int	b_length)	/*!< in: data field length,
175 					not UNIV_SQL_NULL */
176 {
177 	float		f_1;
178 	float		f_2;
179 	double		d_1;
180 	double		d_2;
181 	int		swap_flag	= 1;
182 
183 	switch (mtype) {
184 
185 	case DATA_DECIMAL:
186 		/* Remove preceding spaces */
187 		for (; a_length && *a == ' '; a++, a_length--);
188 		for (; b_length && *b == ' '; b++, b_length--);
189 
190 		if (*a == '-') {
191 			if (*b != '-') {
192 				return(-1);
193 			}
194 
195 			a++; b++;
196 			a_length--;
197 			b_length--;
198 
199 			swap_flag = -1;
200 
201 		} else if (*b == '-') {
202 
203 			return(1);
204 		}
205 
206 		while (a_length > 0 && (*a == '+' || *a == '0')) {
207 			a++; a_length--;
208 		}
209 
210 		while (b_length > 0 && (*b == '+' || *b == '0')) {
211 			b++; b_length--;
212 		}
213 
214 		if (a_length != b_length) {
215 			if (a_length < b_length) {
216 				return(-swap_flag);
217 			}
218 
219 			return(swap_flag);
220 		}
221 
222 		while (a_length > 0 && *a == *b) {
223 
224 			a++; b++; a_length--;
225 		}
226 
227 		if (a_length == 0) {
228 
229 			return(0);
230 		}
231 
232 		if (*a > *b) {
233 			return(swap_flag);
234 		}
235 
236 		return(-swap_flag);
237 	case DATA_DOUBLE:
238 		d_1 = mach_double_read(a);
239 		d_2 = mach_double_read(b);
240 
241 		if (d_1 > d_2) {
242 			return(1);
243 		} else if (d_2 > d_1) {
244 			return(-1);
245 		}
246 
247 		return(0);
248 
249 	case DATA_FLOAT:
250 		f_1 = mach_float_read(a);
251 		f_2 = mach_float_read(b);
252 
253 		if (f_1 > f_2) {
254 			return(1);
255 		} else if (f_2 > f_1) {
256 			return(-1);
257 		}
258 
259 		return(0);
260 	case DATA_BLOB:
261 		if (prtype & DATA_BINARY_TYPE) {
262 
263 			ut_print_timestamp(stderr);
264 			fprintf(stderr,
265 				"  InnoDB: Error: comparing a binary BLOB"
266 				" with a character set sensitive\n"
267 				"InnoDB: comparison!\n");
268 		}
269 		/* fall through */
270 	case DATA_VARMYSQL:
271 	case DATA_MYSQL:
272 		return(innobase_mysql_cmp(
273 			       (int)(prtype & DATA_MYSQL_TYPE_MASK),
274 			       (uint)dtype_get_charset_coll(prtype),
275 			       a, a_length, b, b_length));
276 	default:
277 		fprintf(stderr,
278 			"InnoDB: unknown type number %lu\n",
279 			(ulong) mtype);
280 		ut_error;
281 	}
282 
283 	return(0);
284 }
285 
286 /*************************************************************//**
287 This function is used to compare two data fields for which we know the
288 data type.
289 @return	1, 0, -1, if data1 is greater, equal, less than data2, respectively */
290 UNIV_INTERN
291 int
cmp_data_data_slow(ulint mtype,ulint prtype,const byte * data1,ulint len1,const byte * data2,ulint len2)292 cmp_data_data_slow(
293 /*===============*/
294 	ulint		mtype,	/*!< in: main type */
295 	ulint		prtype,	/*!< in: precise type */
296 	const byte*	data1,	/*!< in: data field (== a pointer to a memory
297 				buffer) */
298 	ulint		len1,	/*!< in: data field length or UNIV_SQL_NULL */
299 	const byte*	data2,	/*!< in: data field (== a pointer to a memory
300 				buffer) */
301 	ulint		len2)	/*!< in: data field length or UNIV_SQL_NULL */
302 {
303 	ulint	data1_byte;
304 	ulint	data2_byte;
305 	ulint	cur_bytes;
306 
307 	if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL) {
308 
309 		if (len1 == len2) {
310 
311 			return(0);
312 		}
313 
314 		if (len1 == UNIV_SQL_NULL) {
315 			/* We define the SQL null to be the smallest possible
316 			value of a field in the alphabetical order */
317 
318 			return(-1);
319 		}
320 
321 		return(1);
322 	}
323 
324 	if (mtype >= DATA_FLOAT
325 	    || (mtype == DATA_BLOB
326 		&& 0 == (prtype & DATA_BINARY_TYPE)
327 		&& dtype_get_charset_coll(prtype)
328 		!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
329 
330 		return(cmp_whole_field(mtype, prtype,
331 				       data1, (unsigned) len1,
332 				       data2, (unsigned) len2));
333 	}
334 
335 	/* Compare then the fields */
336 
337 	cur_bytes = 0;
338 
339 	for (;;) {
340 		if (len1 <= cur_bytes) {
341 			if (len2 <= cur_bytes) {
342 
343 				return(0);
344 			}
345 
346 			data1_byte = dtype_get_pad_char(mtype, prtype);
347 
348 			if (data1_byte == ULINT_UNDEFINED) {
349 
350 				return(-1);
351 			}
352 		} else {
353 			data1_byte = *data1;
354 		}
355 
356 		if (len2 <= cur_bytes) {
357 			data2_byte = dtype_get_pad_char(mtype, prtype);
358 
359 			if (data2_byte == ULINT_UNDEFINED) {
360 
361 				return(1);
362 			}
363 		} else {
364 			data2_byte = *data2;
365 		}
366 
367 		if (data1_byte == data2_byte) {
368 			/* If the bytes are equal, they will remain such even
369 			after the collation transformation below */
370 
371 			goto next_byte;
372 		}
373 
374 		if (mtype <= DATA_CHAR
375 		    || (mtype == DATA_BLOB
376 			&& 0 == (prtype & DATA_BINARY_TYPE))) {
377 
378 			data1_byte = cmp_collate(data1_byte);
379 			data2_byte = cmp_collate(data2_byte);
380 		}
381 
382 		if (data1_byte > data2_byte) {
383 
384 			return(1);
385 		} else if (data1_byte < data2_byte) {
386 
387 			return(-1);
388 		}
389 next_byte:
390 		/* Next byte */
391 		cur_bytes++;
392 		data1++;
393 		data2++;
394 	}
395 
396 	return(0);		/* Not reached */
397 }
398 
399 /*************************************************************//**
400 This function is used to compare a data tuple to a physical record.
401 Only dtuple->n_fields_cmp first fields are taken into account for
402 the data tuple! If we denote by n = n_fields_cmp, then rec must
403 have either m >= n fields, or it must differ from dtuple in some of
404 the m fields rec has. If rec has an externally stored field we do not
405 compare it but return with value 0 if such a comparison should be
406 made.
407 @return 1, 0, -1, if dtuple is greater, equal, less than rec,
408 respectively, when only the common first fields are compared, or until
409 the first externally stored field in rec */
410 UNIV_INTERN
411 int
cmp_dtuple_rec_with_match(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,ulint * matched_fields,ulint * matched_bytes)412 cmp_dtuple_rec_with_match(
413 /*======================*/
414 	const dtuple_t*	dtuple,	/*!< in: data tuple */
415 	const rec_t*	rec,	/*!< in: physical record which differs from
416 				dtuple in some of the common fields, or which
417 				has an equal number or more fields than
418 				dtuple */
419 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
420 	ulint*		matched_fields, /*!< in/out: number of already completely
421 				matched fields; when function returns,
422 				contains the value for current comparison */
423 	ulint*		matched_bytes) /*!< in/out: number of already matched
424 				bytes within the first field not completely
425 				matched; when function returns, contains the
426 				value for current comparison */
427 {
428 	const dfield_t*	dtuple_field;	/* current field in logical record */
429 	ulint		dtuple_f_len;	/* the length of the current field
430 					in the logical record */
431 	const byte*	dtuple_b_ptr;	/* pointer to the current byte in
432 					logical field data */
433 	ulint		dtuple_byte;	/* value of current byte to be compared
434 					in dtuple*/
435 	ulint		rec_f_len;	/* length of current field in rec */
436 	const byte*	rec_b_ptr;	/* pointer to the current byte in
437 					rec field */
438 	ulint		rec_byte;	/* value of current byte to be
439 					compared in rec */
440 	ulint		cur_field;	/* current field number */
441 	ulint		cur_bytes;	/* number of already matched bytes
442 					in current field */
443 	int		ret = 3333;	/* return value */
444 
445 	ut_ad(dtuple && rec && matched_fields && matched_bytes);
446 	ut_ad(dtuple_check_typed(dtuple));
447 	ut_ad(rec_offs_validate(rec, NULL, offsets));
448 
449 	cur_field = *matched_fields;
450 	cur_bytes = *matched_bytes;
451 
452 	ut_ad(cur_field <= dtuple_get_n_fields_cmp(dtuple));
453 	ut_ad(cur_field <= rec_offs_n_fields(offsets));
454 
455 	if (cur_bytes == 0 && cur_field == 0) {
456 		ulint	rec_info = rec_get_info_bits(rec,
457 						     rec_offs_comp(offsets));
458 		ulint	tup_info = dtuple_get_info_bits(dtuple);
459 
460 		if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
461 			ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
462 			goto order_resolved;
463 		} else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
464 			ret = -1;
465 			goto order_resolved;
466 		}
467 	}
468 
469 	/* Match fields in a loop; stop if we run out of fields in dtuple
470 	or find an externally stored field */
471 
472 	while (cur_field < dtuple_get_n_fields_cmp(dtuple)) {
473 
474 		ulint	mtype;
475 		ulint	prtype;
476 
477 		dtuple_field = dtuple_get_nth_field(dtuple, cur_field);
478 		{
479 			const dtype_t*	type
480 				= dfield_get_type(dtuple_field);
481 
482 			mtype = type->mtype;
483 			prtype = type->prtype;
484 		}
485 
486 		dtuple_f_len = dfield_get_len(dtuple_field);
487 
488 		rec_b_ptr = rec_get_nth_field(rec, offsets,
489 					      cur_field, &rec_f_len);
490 
491 		/* If we have matched yet 0 bytes, it may be that one or
492 		both the fields are SQL null, or the record or dtuple may be
493 		the predefined minimum record, or the field is externally
494 		stored */
495 
496 		if (UNIV_LIKELY(cur_bytes == 0)) {
497 			if (rec_offs_nth_extern(offsets, cur_field)) {
498 				/* We do not compare to an externally
499 				stored field */
500 
501 				ret = 0;
502 
503 				goto order_resolved;
504 			}
505 
506 			if (dtuple_f_len == UNIV_SQL_NULL) {
507 				if (rec_f_len == UNIV_SQL_NULL) {
508 
509 					goto next_field;
510 				}
511 
512 				ret = -1;
513 				goto order_resolved;
514 			} else if (rec_f_len == UNIV_SQL_NULL) {
515 				/* We define the SQL null to be the
516 				smallest possible value of a field
517 				in the alphabetical order */
518 
519 				ret = 1;
520 				goto order_resolved;
521 			}
522 		}
523 
524 		if (mtype >= DATA_FLOAT
525 		    || (mtype == DATA_BLOB
526 			&& 0 == (prtype & DATA_BINARY_TYPE)
527 			&& dtype_get_charset_coll(prtype)
528 			!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
529 
530 			ret = cmp_whole_field(mtype, prtype,
531 					      dfield_get_data(dtuple_field),
532 					      (unsigned) dtuple_f_len,
533 					      rec_b_ptr, (unsigned) rec_f_len);
534 
535 			if (ret != 0) {
536 				cur_bytes = 0;
537 
538 				goto order_resolved;
539 			} else {
540 				goto next_field;
541 			}
542 		}
543 
544 		/* Set the pointers at the current byte */
545 
546 		rec_b_ptr = rec_b_ptr + cur_bytes;
547 		dtuple_b_ptr = (byte*)dfield_get_data(dtuple_field)
548 			+ cur_bytes;
549 		/* Compare then the fields */
550 
551 		for (;;) {
552 			if (UNIV_UNLIKELY(rec_f_len <= cur_bytes)) {
553 				if (dtuple_f_len <= cur_bytes) {
554 
555 					goto next_field;
556 				}
557 
558 				rec_byte = dtype_get_pad_char(mtype, prtype);
559 
560 				if (rec_byte == ULINT_UNDEFINED) {
561 					ret = 1;
562 
563 					goto order_resolved;
564 				}
565 			} else {
566 				rec_byte = *rec_b_ptr;
567 			}
568 
569 			if (UNIV_UNLIKELY(dtuple_f_len <= cur_bytes)) {
570 				dtuple_byte = dtype_get_pad_char(mtype,
571 								 prtype);
572 
573 				if (dtuple_byte == ULINT_UNDEFINED) {
574 					ret = -1;
575 
576 					goto order_resolved;
577 				}
578 			} else {
579 				dtuple_byte = *dtuple_b_ptr;
580 			}
581 
582 			if (dtuple_byte == rec_byte) {
583 				/* If the bytes are equal, they will
584 				remain such even after the collation
585 				transformation below */
586 
587 				goto next_byte;
588 			}
589 
590 			if (mtype <= DATA_CHAR
591 			    || (mtype == DATA_BLOB
592 				&& !(prtype & DATA_BINARY_TYPE))) {
593 
594 				rec_byte = cmp_collate(rec_byte);
595 				dtuple_byte = cmp_collate(dtuple_byte);
596 			}
597 
598 			ret = (int) (dtuple_byte - rec_byte);
599 			if (UNIV_LIKELY(ret)) {
600 				if (ret < 0) {
601 					ret = -1;
602 					goto order_resolved;
603 				} else {
604 					ret = 1;
605 					goto order_resolved;
606 				}
607 			}
608 next_byte:
609 			/* Next byte */
610 			cur_bytes++;
611 			rec_b_ptr++;
612 			dtuple_b_ptr++;
613 		}
614 
615 next_field:
616 		cur_field++;
617 		cur_bytes = 0;
618 	}
619 
620 	ut_ad(cur_bytes == 0);
621 
622 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
623 			up to the common fields */
624 order_resolved:
625 	ut_ad((ret >= - 1) && (ret <= 1));
626 	ut_ad(ret == cmp_debug_dtuple_rec_with_match(dtuple, rec, offsets,
627 						     matched_fields));
628 	ut_ad(*matched_fields == cur_field); /* In the debug version, the
629 					     above cmp_debug_... sets
630 					     *matched_fields to a value */
631 	*matched_fields = cur_field;
632 	*matched_bytes = cur_bytes;
633 
634 	return(ret);
635 }
636 
637 /**************************************************************//**
638 Compares a data tuple to a physical record.
639 @see cmp_dtuple_rec_with_match
640 @return 1, 0, -1, if dtuple is greater, equal, less than rec, respectively */
641 UNIV_INTERN
642 int
cmp_dtuple_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)643 cmp_dtuple_rec(
644 /*===========*/
645 	const dtuple_t*	dtuple,	/*!< in: data tuple */
646 	const rec_t*	rec,	/*!< in: physical record */
647 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
648 {
649 	ulint	matched_fields	= 0;
650 	ulint	matched_bytes	= 0;
651 
652 	ut_ad(rec_offs_validate(rec, NULL, offsets));
653 	return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
654 					 &matched_fields, &matched_bytes));
655 }
656 
657 /**************************************************************//**
658 Checks if a dtuple is a prefix of a record. The last field in dtuple
659 is allowed to be a prefix of the corresponding field in the record.
660 @return	TRUE if prefix */
661 UNIV_INTERN
662 ibool
cmp_dtuple_is_prefix_of_rec(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets)663 cmp_dtuple_is_prefix_of_rec(
664 /*========================*/
665 	const dtuple_t*	dtuple,	/*!< in: data tuple */
666 	const rec_t*	rec,	/*!< in: physical record */
667 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
668 {
669 	ulint	n_fields;
670 	ulint	matched_fields	= 0;
671 	ulint	matched_bytes	= 0;
672 
673 	ut_ad(rec_offs_validate(rec, NULL, offsets));
674 	n_fields = dtuple_get_n_fields(dtuple);
675 
676 	if (n_fields > rec_offs_n_fields(offsets)) {
677 
678 		return(FALSE);
679 	}
680 
681 	cmp_dtuple_rec_with_match(dtuple, rec, offsets,
682 				  &matched_fields, &matched_bytes);
683 	if (matched_fields == n_fields) {
684 
685 		return(TRUE);
686 	}
687 
688 	if (matched_fields == n_fields - 1
689 	    && matched_bytes == dfield_get_len(
690 		    dtuple_get_nth_field(dtuple, n_fields - 1))) {
691 		return(TRUE);
692 	}
693 
694 	return(FALSE);
695 }
696 
697 /*************************************************************//**
698 Compare two physical records that contain the same number of columns,
699 none of which are stored externally.
700 @return	1, 0, -1 if rec1 is greater, equal, less, respectively, than rec2 */
701 UNIV_INTERN
702 int
cmp_rec_rec_simple(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,const dict_index_t * index,ibool * null_eq)703 cmp_rec_rec_simple(
704 /*===============*/
705 	const rec_t*		rec1,	/*!< in: physical record */
706 	const rec_t*		rec2,	/*!< in: physical record */
707 	const ulint*		offsets1,/*!< in: rec_get_offsets(rec1, ...) */
708 	const ulint*		offsets2,/*!< in: rec_get_offsets(rec2, ...) */
709 	const dict_index_t*	index,	/*!< in: data dictionary index */
710 	ibool*			null_eq)/*!< out: set to TRUE if
711 					found matching null values */
712 {
713 	ulint		rec1_f_len;	/*!< length of current field in rec1 */
714 	const byte*	rec1_b_ptr;	/*!< pointer to the current byte
715 					in rec1 field */
716 	ulint		rec1_byte;	/*!< value of current byte to be
717 					compared in rec1 */
718 	ulint		rec2_f_len;	/*!< length of current field in rec2 */
719 	const byte*	rec2_b_ptr;	/*!< pointer to the current byte
720 					in rec2 field */
721 	ulint		rec2_byte;	/*!< value of current byte to be
722 					compared in rec2 */
723 	ulint		cur_field;	/*!< current field number */
724 	ulint		n_uniq;
725 
726 	n_uniq = dict_index_get_n_unique(index);
727 	ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
728 	ut_ad(rec_offs_n_fields(offsets2) >= n_uniq);
729 
730 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
731 
732 	for (cur_field = 0; cur_field < n_uniq; cur_field++) {
733 
734 		ulint	cur_bytes;
735 		ulint	mtype;
736 		ulint	prtype;
737 
738 		{
739 			const dict_col_t*	col
740 				= dict_index_get_nth_col(index, cur_field);
741 
742 			mtype = col->mtype;
743 			prtype = col->prtype;
744 		}
745 
746 		ut_ad(!rec_offs_nth_extern(offsets1, cur_field));
747 		ut_ad(!rec_offs_nth_extern(offsets2, cur_field));
748 
749 		rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
750 					       cur_field, &rec1_f_len);
751 		rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
752 					       cur_field, &rec2_f_len);
753 
754 		if (rec1_f_len == UNIV_SQL_NULL
755 		    || rec2_f_len == UNIV_SQL_NULL) {
756 
757 			if (rec1_f_len == rec2_f_len) {
758 				if (null_eq) {
759 					*null_eq = TRUE;
760 				}
761 
762 				goto next_field;
763 
764 			} else if (rec2_f_len == UNIV_SQL_NULL) {
765 
766 				/* We define the SQL null to be the
767 				smallest possible value of a field
768 				in the alphabetical order */
769 
770 				return(1);
771 			} else {
772 				return(-1);
773 			}
774 		}
775 
776 		if (mtype >= DATA_FLOAT
777 		    || (mtype == DATA_BLOB
778 			&& 0 == (prtype & DATA_BINARY_TYPE)
779 			&& dtype_get_charset_coll(prtype)
780 			!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
781 			int ret = cmp_whole_field(mtype, prtype,
782 						  rec1_b_ptr,
783 						  (unsigned) rec1_f_len,
784 						  rec2_b_ptr,
785 						  (unsigned) rec2_f_len);
786 			if (ret) {
787 				return(ret);
788 			}
789 
790 			goto next_field;
791 		}
792 
793 		/* Compare the fields */
794 		for (cur_bytes = 0;; cur_bytes++, rec1_b_ptr++, rec2_b_ptr++) {
795 			if (rec2_f_len <= cur_bytes) {
796 
797 				if (rec1_f_len <= cur_bytes) {
798 
799 					goto next_field;
800 				}
801 
802 				rec2_byte = dtype_get_pad_char(mtype, prtype);
803 
804 				if (rec2_byte == ULINT_UNDEFINED) {
805 					return(1);
806 				}
807 			} else {
808 				rec2_byte = *rec2_b_ptr;
809 			}
810 
811 			if (rec1_f_len <= cur_bytes) {
812 				rec1_byte = dtype_get_pad_char(mtype, prtype);
813 
814 				if (rec1_byte == ULINT_UNDEFINED) {
815 					return(-1);
816 				}
817 			} else {
818 				rec1_byte = *rec1_b_ptr;
819 			}
820 
821 			if (rec1_byte == rec2_byte) {
822 				/* If the bytes are equal, they will remain
823 				such even after the collation transformation
824 				below */
825 
826 				continue;
827 			}
828 
829 			if (mtype <= DATA_CHAR
830 			    || (mtype == DATA_BLOB
831 				&& !(prtype & DATA_BINARY_TYPE))) {
832 
833 				rec1_byte = cmp_collate(rec1_byte);
834 				rec2_byte = cmp_collate(rec2_byte);
835 			}
836 
837 			if (rec1_byte < rec2_byte) {
838 				return(-1);
839 			} else if (rec1_byte > rec2_byte) {
840 				return(1);
841 			}
842 		}
843 next_field:
844 		continue;
845 	}
846 
847 	/* If we ran out of fields, rec1 was equal to rec2. */
848 	return(0);
849 }
850 
851 /*************************************************************//**
852 This function is used to compare two physical records. Only the common
853 first fields are compared, and if an externally stored field is
854 encountered, then 0 is returned.
855 @return 1, 0, -1 if rec1 is greater, equal, less, respectively */
856 UNIV_INTERN
857 int
cmp_rec_rec_with_match(const rec_t * rec1,const rec_t * rec2,const ulint * offsets1,const ulint * offsets2,dict_index_t * index,ibool nulls_unequal,ulint * matched_fields,ulint * matched_bytes)858 cmp_rec_rec_with_match(
859 /*===================*/
860 	const rec_t*	rec1,	/*!< in: physical record */
861 	const rec_t*	rec2,	/*!< in: physical record */
862 	const ulint*	offsets1,/*!< in: rec_get_offsets(rec1, index) */
863 	const ulint*	offsets2,/*!< in: rec_get_offsets(rec2, index) */
864 	dict_index_t*	index,	/*!< in: data dictionary index */
865 	ibool		nulls_unequal,
866 				/* in: TRUE if this is for index statistics
867 				cardinality estimation, and innodb_stats_method
868 				is "nulls_unequal" or "nulls_ignored" */
869 	ulint*		matched_fields, /*!< in/out: number of already completely
870 				matched fields; when the function returns,
871 				contains the value the for current
872 				comparison */
873 	ulint*		matched_bytes) /*!< in/out: number of already matched
874 				bytes within the first field not completely
875 				matched; when the function returns, contains
876 				the value for the current comparison */
877 {
878 	ulint		rec1_n_fields;	/* the number of fields in rec */
879 	ulint		rec1_f_len;	/* length of current field in rec */
880 	const byte*	rec1_b_ptr;	/* pointer to the current byte
881 					in rec field */
882 	ulint		rec1_byte;	/* value of current byte to be
883 					compared in rec */
884 	ulint		rec2_n_fields;	/* the number of fields in rec */
885 	ulint		rec2_f_len;	/* length of current field in rec */
886 	const byte*	rec2_b_ptr;	/* pointer to the current byte
887 					in rec field */
888 	ulint		rec2_byte;	/* value of current byte to be
889 					compared in rec */
890 	ulint		cur_field;	/* current field number */
891 	ulint		cur_bytes;	/* number of already matched
892 					bytes in current field */
893 	int		ret = 0;	/* return value */
894 	ulint		comp;
895 
896 	ut_ad(rec1 && rec2 && index);
897 	ut_ad(rec_offs_validate(rec1, index, offsets1));
898 	ut_ad(rec_offs_validate(rec2, index, offsets2));
899 	ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
900 
901 	comp = rec_offs_comp(offsets1);
902 	rec1_n_fields = rec_offs_n_fields(offsets1);
903 	rec2_n_fields = rec_offs_n_fields(offsets2);
904 
905 	cur_field = *matched_fields;
906 	cur_bytes = *matched_bytes;
907 
908 	/* Match fields in a loop */
909 
910 	while ((cur_field < rec1_n_fields) && (cur_field < rec2_n_fields)) {
911 
912 		ulint	mtype;
913 		ulint	prtype;
914 
915 		if (UNIV_UNLIKELY(index->type & DICT_UNIVERSAL)) {
916 			/* This is for the insert buffer B-tree. */
917 			mtype = DATA_BINARY;
918 			prtype = 0;
919 		} else {
920 			const dict_col_t*	col
921 				= dict_index_get_nth_col(index, cur_field);
922 
923 			mtype = col->mtype;
924 			prtype = col->prtype;
925 		}
926 
927 		rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
928 					       cur_field, &rec1_f_len);
929 		rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
930 					       cur_field, &rec2_f_len);
931 
932 		if (cur_bytes == 0) {
933 			if (cur_field == 0) {
934 				/* Test if rec is the predefined minimum
935 				record */
936 				if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
937 						  & REC_INFO_MIN_REC_FLAG)) {
938 
939 					if (!(rec_get_info_bits(rec2, comp)
940 					      & REC_INFO_MIN_REC_FLAG)) {
941 						ret = -1;
942 					}
943 
944 					goto order_resolved;
945 
946 				} else if (UNIV_UNLIKELY
947 					   (rec_get_info_bits(rec2, comp)
948 					    & REC_INFO_MIN_REC_FLAG)) {
949 
950 					ret = 1;
951 
952 					goto order_resolved;
953 				}
954 			}
955 
956 			if (rec_offs_nth_extern(offsets1, cur_field)
957 			    || rec_offs_nth_extern(offsets2, cur_field)) {
958 				/* We do not compare to an externally
959 				stored field */
960 
961 				goto order_resolved;
962 			}
963 
964 			if (rec1_f_len == UNIV_SQL_NULL
965 			    || rec2_f_len == UNIV_SQL_NULL) {
966 
967 				if (rec1_f_len == rec2_f_len) {
968 					/* This is limited to stats collection,
969 					cannot use it for regular search */
970 					if (nulls_unequal) {
971 						ret = -1;
972 					} else {
973 						goto next_field;
974 					}
975 				} else if (rec2_f_len == UNIV_SQL_NULL) {
976 
977 					/* We define the SQL null to be the
978 					smallest possible value of a field
979 					in the alphabetical order */
980 
981 					ret = 1;
982 				} else {
983 					ret = -1;
984 				}
985 
986 				goto order_resolved;
987 			}
988 		}
989 
990 		if (mtype >= DATA_FLOAT
991 		    || (mtype == DATA_BLOB
992 			&& 0 == (prtype & DATA_BINARY_TYPE)
993 			&& dtype_get_charset_coll(prtype)
994 			!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
995 
996 			ret = cmp_whole_field(mtype, prtype,
997 					      rec1_b_ptr,
998 					      (unsigned) rec1_f_len,
999 					      rec2_b_ptr,
1000 					      (unsigned) rec2_f_len);
1001 			if (ret != 0) {
1002 				cur_bytes = 0;
1003 
1004 				goto order_resolved;
1005 			} else {
1006 				goto next_field;
1007 			}
1008 		}
1009 
1010 		/* Set the pointers at the current byte */
1011 		rec1_b_ptr = rec1_b_ptr + cur_bytes;
1012 		rec2_b_ptr = rec2_b_ptr + cur_bytes;
1013 
1014 		/* Compare then the fields */
1015 		for (;;) {
1016 			if (rec2_f_len <= cur_bytes) {
1017 
1018 				if (rec1_f_len <= cur_bytes) {
1019 
1020 					goto next_field;
1021 				}
1022 
1023 				rec2_byte = dtype_get_pad_char(mtype, prtype);
1024 
1025 				if (rec2_byte == ULINT_UNDEFINED) {
1026 					ret = 1;
1027 
1028 					goto order_resolved;
1029 				}
1030 			} else {
1031 				rec2_byte = *rec2_b_ptr;
1032 			}
1033 
1034 			if (rec1_f_len <= cur_bytes) {
1035 				rec1_byte = dtype_get_pad_char(mtype, prtype);
1036 
1037 				if (rec1_byte == ULINT_UNDEFINED) {
1038 					ret = -1;
1039 
1040 					goto order_resolved;
1041 				}
1042 			} else {
1043 				rec1_byte = *rec1_b_ptr;
1044 			}
1045 
1046 			if (rec1_byte == rec2_byte) {
1047 				/* If the bytes are equal, they will remain
1048 				such even after the collation transformation
1049 				below */
1050 
1051 				goto next_byte;
1052 			}
1053 
1054 			if (mtype <= DATA_CHAR
1055 			    || (mtype == DATA_BLOB
1056 				&& !(prtype & DATA_BINARY_TYPE))) {
1057 
1058 				rec1_byte = cmp_collate(rec1_byte);
1059 				rec2_byte = cmp_collate(rec2_byte);
1060 			}
1061 
1062 			if (rec1_byte < rec2_byte) {
1063 				ret = -1;
1064 				goto order_resolved;
1065 			} else if (rec1_byte > rec2_byte) {
1066 				ret = 1;
1067 				goto order_resolved;
1068 			}
1069 next_byte:
1070 			/* Next byte */
1071 
1072 			cur_bytes++;
1073 			rec1_b_ptr++;
1074 			rec2_b_ptr++;
1075 		}
1076 
1077 next_field:
1078 		cur_field++;
1079 		cur_bytes = 0;
1080 	}
1081 
1082 	ut_ad(cur_bytes == 0);
1083 
1084 	/* If we ran out of fields, rec1 was equal to rec2 up
1085 	to the common fields */
1086 	ut_ad(ret == 0);
1087 order_resolved:
1088 
1089 	ut_ad((ret >= - 1) && (ret <= 1));
1090 
1091 	*matched_fields = cur_field;
1092 	*matched_bytes = cur_bytes;
1093 
1094 	return(ret);
1095 }
1096 
1097 #ifdef UNIV_DEBUG
1098 /*************************************************************//**
1099 Used in debug checking of cmp_dtuple_... .
1100 This function is used to compare a data tuple to a physical record. If
1101 dtuple has n fields then rec must have either m >= n fields, or it must
1102 differ from dtuple in some of the m fields rec has. If encounters an
1103 externally stored field, returns 0.
1104 @return 1, 0, -1, if dtuple is greater, equal, less than rec,
1105 respectively, when only the common first fields are compared */
1106 static
1107 int
cmp_debug_dtuple_rec_with_match(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,ulint * matched_fields)1108 cmp_debug_dtuple_rec_with_match(
1109 /*============================*/
1110 	const dtuple_t*	dtuple,	/*!< in: data tuple */
1111 	const rec_t*	rec,	/*!< in: physical record which differs from
1112 				dtuple in some of the common fields, or which
1113 				has an equal number or more fields than
1114 				dtuple */
1115 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1116 	ulint*		matched_fields) /*!< in/out: number of already
1117 				completely matched fields; when function
1118 				returns, contains the value for current
1119 				comparison */
1120 {
1121 	const dfield_t*	dtuple_field;	/* current field in logical record */
1122 	ulint		dtuple_f_len;	/* the length of the current field
1123 					in the logical record */
1124 	const byte*	dtuple_f_data;	/* pointer to the current logical
1125 					field data */
1126 	ulint		rec_f_len;	/* length of current field in rec */
1127 	const byte*	rec_f_data;	/* pointer to the current rec field */
1128 	int		ret = 3333;	/* return value */
1129 	ulint		cur_field;	/* current field number */
1130 
1131 	ut_ad(dtuple && rec && matched_fields);
1132 	ut_ad(dtuple_check_typed(dtuple));
1133 	ut_ad(rec_offs_validate(rec, NULL, offsets));
1134 
1135 	ut_ad(*matched_fields <= dtuple_get_n_fields_cmp(dtuple));
1136 	ut_ad(*matched_fields <= rec_offs_n_fields(offsets));
1137 
1138 	cur_field = *matched_fields;
1139 
1140 	if (cur_field == 0) {
1141 		if (UNIV_UNLIKELY
1142 		    (rec_get_info_bits(rec, rec_offs_comp(offsets))
1143 		     & REC_INFO_MIN_REC_FLAG)) {
1144 
1145 			ret = !(dtuple_get_info_bits(dtuple)
1146 				& REC_INFO_MIN_REC_FLAG);
1147 
1148 			goto order_resolved;
1149 		}
1150 
1151 		if (UNIV_UNLIKELY
1152 		    (dtuple_get_info_bits(dtuple) & REC_INFO_MIN_REC_FLAG)) {
1153 			ret = -1;
1154 
1155 			goto order_resolved;
1156 		}
1157 	}
1158 
1159 	/* Match fields in a loop; stop if we run out of fields in dtuple */
1160 
1161 	while (cur_field < dtuple_get_n_fields_cmp(dtuple)) {
1162 
1163 		ulint	mtype;
1164 		ulint	prtype;
1165 
1166 		dtuple_field = dtuple_get_nth_field(dtuple, cur_field);
1167 		{
1168 			const dtype_t*	type
1169 				= dfield_get_type(dtuple_field);
1170 
1171 			mtype = type->mtype;
1172 			prtype = type->prtype;
1173 		}
1174 
1175 		dtuple_f_data = dfield_get_data(dtuple_field);
1176 		dtuple_f_len = dfield_get_len(dtuple_field);
1177 
1178 		rec_f_data = rec_get_nth_field(rec, offsets,
1179 					       cur_field, &rec_f_len);
1180 
1181 		if (rec_offs_nth_extern(offsets, cur_field)) {
1182 			/* We do not compare to an externally stored field */
1183 
1184 			ret = 0;
1185 
1186 			goto order_resolved;
1187 		}
1188 
1189 		ret = cmp_data_data(mtype, prtype, dtuple_f_data, dtuple_f_len,
1190 				    rec_f_data, rec_f_len);
1191 		if (ret != 0) {
1192 			goto order_resolved;
1193 		}
1194 
1195 		cur_field++;
1196 	}
1197 
1198 	ret = 0;	/* If we ran out of fields, dtuple was equal to rec
1199 			up to the common fields */
1200 order_resolved:
1201 	ut_ad((ret >= - 1) && (ret <= 1));
1202 
1203 	*matched_fields = cur_field;
1204 
1205 	return(ret);
1206 }
1207 #endif /* UNIV_DEBUG */
1208