1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /********************************************************************//**
21 @file rem/rem0rec.cc
22 Record manager
23 
24 Created 5/30/1994 Heikki Tuuri
25 *************************************************************************/
26 
27 #include "rem0rec.h"
28 #include "page0page.h"
29 #include "mtr0log.h"
30 #include "fts0fts.h"
31 #include "trx0sys.h"
32 #include "row0log.h"
33 
34 /*			PHYSICAL RECORD (OLD STYLE)
35 			===========================
36 
37 The physical record, which is the data type of all the records
38 found in index pages of the database, has the following format
39 (lower addresses and more significant bits inside a byte are below
40 represented on a higher text line):
41 
42 | offset of the end of the last field of data, the most significant
43   bit is set to 1 if and only if the field is SQL-null,
44   if the offset is 2-byte, then the second most significant
45   bit is set to 1 if the field is stored on another page:
46   mostly this will occur in the case of big BLOB fields |
47 ...
48 | offset of the end of the first field of data + the SQL-null bit |
49 | 4 bits used to delete mark a record, and mark a predefined
50   minimum record in alphabetical order |
51 | 4 bits giving the number of records owned by this record
52   (this term is explained in page0page.h) |
53 | 13 bits giving the order number of this record in the
54   heap of the index page |
55 | 10 bits giving the number of fields in this record |
56 | 1 bit which is set to 1 if the offsets above are given in
57   one byte format, 0 if in two byte format |
58 | two bytes giving an absolute pointer to the next record in the page |
59 ORIGIN of the record
60 | first field of data |
61 ...
62 | last field of data |
63 
64 The origin of the record is the start address of the first field
65 of data. The offsets are given relative to the origin.
66 The offsets of the data fields are stored in an inverted
67 order because then the offset of the first fields are near the
68 origin, giving maybe a better processor cache hit rate in searches.
69 
70 The offsets of the data fields are given as one-byte
71 (if there are less than 127 bytes of data in the record)
72 or two-byte unsigned integers. The most significant bit
73 is not part of the offset, instead it indicates the SQL-null
74 if the bit is set to 1. */
75 
76 /*			PHYSICAL RECORD (NEW STYLE)
77 			===========================
78 
79 The physical record, which is the data type of all the records
80 found in index pages of the database, has the following format
81 (lower addresses and more significant bits inside a byte are below
82 represented on a higher text line):
83 
84 | length of the last non-null variable-length field of data:
85   if the maximum length is 255, one byte; otherwise,
86   0xxxxxxx (one byte, length=0..127), or 1exxxxxxxxxxxxxx (two bytes,
87   length=128..16383, extern storage flag) |
88 ...
89 | length of first variable-length field of data |
90 | SQL-null flags (1 bit per nullable field), padded to full bytes |
91 | 4 bits used to delete mark a record, and mark a predefined
92   minimum record in alphabetical order |
93 | 4 bits giving the number of records owned by this record
94   (this term is explained in page0page.h) |
95 | 13 bits giving the order number of this record in the
96   heap of the index page |
97 | 3 bits record type: 000=conventional, 001=node pointer (inside B-tree),
98   010=infimum, 011=supremum, 1xx=reserved |
99 | two bytes giving a relative pointer to the next record in the page |
100 ORIGIN of the record
101 | first field of data |
102 ...
103 | last field of data |
104 
105 The origin of the record is the start address of the first field
106 of data. Unlike the old-style format, this format stores lengths
107 rather than end offsets. The lengths are stored in an inverted
108 order because then the lengths of the first fields are near the
109 origin, giving maybe a better processor cache hit rate in searches.
110 
111 A length is stored only for variable-length fields that are not
112 SQL-null; it takes one or two bytes as described above. SQL-null
113 is indicated by the null flag bits, not by the most significant
114 bit of a length. In a two-byte length, the second most significant
115 bit is the extern storage flag. */
116 
117 /* CANONICAL COORDINATES. A record can be seen as a single
118 string of 'characters' in the following way: catenate the bytes
119 in each field, in the order of fields. An SQL-null field
120 is taken to be an empty sequence of bytes. Then after
121 the position of each field insert in the string
122 the 'character' <FIELD-END>, except that after an SQL-null field
123 insert <NULL-FIELD-END>. Now the ordinal position of each
124 byte in this canonical string is its canonical coordinate.
125 So, for the record ("AA", SQL-NULL, "BB", ""), the canonical
126 string is "AA<FIELD-END><NULL-FIELD-END>BB<FIELD-END><FIELD-END>".
127 We identify prefixes (= initial segments) of a record
128 with prefixes of the canonical string. The canonical
129 length of the prefix is the length of the corresponding
130 prefix of the canonical string. The canonical length of
131 a record is the length of its canonical string.
132 
133 For example, the maximal common prefix of records
134 ("AA", SQL-NULL, "BB", "C") and ("AA", SQL-NULL, "B", "C")
135 is "AA<FIELD-END><NULL-FIELD-END>B", and its canonical
136 length is 5.
137 
138 A complete-field prefix of a record is a prefix which ends at the
139 end of some field (containing also <FIELD-END>).
140 A record is a complete-field prefix of another record, if
141 the corresponding canonical strings have the same property. */
142 
143 /***************************************************************//**
144 Validates the consistency of an old-style physical record.
145 @return TRUE if ok */
146 static
147 ibool
148 rec_validate_old(
149 /*=============*/
150 	const rec_t*	rec);	/*!< in: physical record */
151 
152 /******************************************************//**
153 Determine how many of the first n columns in a compact
154 physical record are stored externally.
155 @return number of externally stored columns */
156 ulint
rec_get_n_extern_new(const rec_t * rec,const dict_index_t * index,ulint n)157 rec_get_n_extern_new(
158 /*=================*/
159 	const rec_t*		rec,	/*!< in: compact physical record */
160 	const dict_index_t*	index,	/*!< in: record descriptor */
161 	ulint			n)	/*!< in: number of columns to scan */
162 {
163 	const byte*	nulls;
164 	const byte*	lens;
165 	ulint		null_mask;
166 	ulint		n_extern;
167 	ulint		i;
168 
169 	ut_ad(dict_table_is_comp(index->table));
170 	ut_ad(!index->table->supports_instant() || index->is_dummy);
171 	ut_ad(!index->is_instant());
172 	ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY
173 	      || rec_get_status(rec) == REC_STATUS_COLUMNS_ADDED);
174 	ut_ad(n == ULINT_UNDEFINED || n <= dict_index_get_n_fields(index));
175 
176 	if (n == ULINT_UNDEFINED) {
177 		n = dict_index_get_n_fields(index);
178 	}
179 
180 	nulls = rec - (REC_N_NEW_EXTRA_BYTES + 1);
181 	lens = nulls - UT_BITS_IN_BYTES(index->n_nullable);
182 	null_mask = 1;
183 	n_extern = 0;
184 	i = 0;
185 
186 	/* read the lengths of fields 0..n */
187 	do {
188 		const dict_field_t*	field
189 			= dict_index_get_nth_field(index, i);
190 		const dict_col_t*	col
191 			= dict_field_get_col(field);
192 		ulint			len;
193 
194 		if (!(col->prtype & DATA_NOT_NULL)) {
195 			/* nullable field => read the null flag */
196 
197 			if (UNIV_UNLIKELY(!(byte) null_mask)) {
198 				nulls--;
199 				null_mask = 1;
200 			}
201 
202 			if (*nulls & null_mask) {
203 				null_mask <<= 1;
204 				/* No length is stored for NULL fields. */
205 				continue;
206 			}
207 			null_mask <<= 1;
208 		}
209 
210 		if (UNIV_UNLIKELY(!field->fixed_len)) {
211 			/* Variable-length field: read the length */
212 			len = *lens--;
213 			/* If the maximum length of the field is up
214 			to 255 bytes, the actual length is always
215 			stored in one byte. If the maximum length is
216 			more than 255 bytes, the actual length is
217 			stored in one byte for 0..127.  The length
218 			will be encoded in two bytes when it is 128 or
219 			more, or when the field is stored externally. */
220 			if (DATA_BIG_COL(col)) {
221 				if (len & 0x80) {
222 					/* 1exxxxxxx xxxxxxxx */
223 					if (len & 0x40) {
224 						n_extern++;
225 					}
226 					lens--;
227 				}
228 			}
229 		}
230 	} while (++i < n);
231 
232 	return(n_extern);
233 }
234 
235 /** Get the added field count in a REC_STATUS_COLUMNS_ADDED record.
236 @param[in,out]	header	variable header of a REC_STATUS_COLUMNS_ADDED record
237 @return	number of added fields */
rec_get_n_add_field(const byte * & header)238 static inline unsigned rec_get_n_add_field(const byte*& header)
239 {
240 	unsigned n_fields_add = *--header;
241 	if (n_fields_add < 0x80) {
242 		ut_ad(rec_get_n_add_field_len(n_fields_add) == 1);
243 		return n_fields_add;
244 	}
245 
246 	n_fields_add &= 0x7f;
247 	n_fields_add |= unsigned(*--header) << 7;
248 	ut_ad(n_fields_add < REC_MAX_N_FIELDS);
249 	ut_ad(rec_get_n_add_field_len(n_fields_add) == 2);
250 	return n_fields_add;
251 }
252 
/** Format of a leaf-page ROW_FORMAT!=REDUNDANT record */
enum rec_leaf_format {
	REC_LEAF_TEMP,			/*!< temporary file record */
	REC_LEAF_TEMP_COLUMNS_ADDED,	/*!< temporary file record, with
					added columns
					(REC_STATUS_COLUMNS_ADDED) */
	REC_LEAF_ORDINARY,		/*!< normal (REC_STATUS_ORDINARY) */
	REC_LEAF_COLUMNS_ADDED		/*!< with added columns
					(REC_STATUS_COLUMNS_ADDED) */
};
265 
266 /** Determine the offset to each field in a leaf-page record
267 in ROW_FORMAT=COMPACT,DYNAMIC,COMPRESSED.
268 This is a special case of rec_init_offsets() and rec_get_offsets_func().
269 @param[in]	rec	leaf-page record
270 @param[in]	index	the index that the record belongs in
271 @param[in]	n_core	number of core fields (index->n_core_fields)
272 @param[in]	def_val	default values for non-core fields, or
273 			NULL to refer to index->fields[].col->def_val
274 @param[in,out]	offsets	offsets, with valid rec_offs_n_fields(offsets)
275 @param[in]	format	record format */
276 static inline
277 void
rec_init_offsets_comp_ordinary(const rec_t * rec,const dict_index_t * index,rec_offs * offsets,ulint n_core,const dict_col_t::def_t * def_val,rec_leaf_format format)278 rec_init_offsets_comp_ordinary(
279 	const rec_t*		rec,
280 	const dict_index_t*	index,
281 	rec_offs*		offsets,
282 	ulint			n_core,
283 	const dict_col_t::def_t*def_val,
284 	rec_leaf_format		format)
285 {
	/* offs: running end offset of the fields parsed so far.
	any: REC_OFFS_* flags accumulated for offsets[0].
	nulls, lens: cursors that walk the record header towards lower
	addresses, over the SQL-null flag bytes and over the stored
	lengths of variable-length fields, respectively.
	n_fields: number of fields that are physically stored in rec. */
286 	rec_offs	offs		= 0;
287 	rec_offs	any		= 0;
288 	const byte*	nulls		= rec;
289 	const byte*	lens		= NULL;
290 	ulint		n_fields	= n_core;
291 	ulint		null_mask	= 1;
292 
293 	ut_ad(n_core > 0);
294 	ut_ad(index->n_core_fields >= n_core);
295 	ut_ad(index->n_fields >= index->n_core_fields);
296 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
297 	ut_ad(format == REC_LEAF_TEMP || format == REC_LEAF_TEMP_COLUMNS_ADDED
298 	      || dict_table_is_comp(index->table));
299 	ut_ad(format != REC_LEAF_TEMP_COLUMNS_ADDED
300 	      || index->n_fields == rec_offs_n_fields(offsets));
	/* Debug-only count of null flags that may still be consumed. */
301 	ut_d(ulint n_null= 0);
302 
	/* Number of null-flag bytes that cover the core fields. It is
	recomputed when the caller's n_core differs from
	index->n_core_fields. */
303 	const unsigned n_core_null_bytes = UNIV_UNLIKELY(index->n_core_fields
304 						      != n_core)
305 		? UT_BITS_IN_BYTES(unsigned(index->get_n_nullable(n_core)))
306 		: index->n_core_null_bytes;
307 
	/* Position nulls on the first null-flag byte and lens on the
	first length byte, according to the header layout of the format. */
308 	switch (format) {
309 	case REC_LEAF_TEMP:
310 		if (dict_table_is_comp(index->table)) {
311 			/* No need to do adjust fixed_len=0. We only need to
312 			adjust it for ROW_FORMAT=REDUNDANT. */
313 			format = REC_LEAF_ORDINARY;
314 		}
		/* The jump skips the REC_N_NEW_EXTRA_BYTES adjustment
		below, so for temporary file records nulls starts
		directly at rec. */
315 		goto ordinary;
316 	case REC_LEAF_ORDINARY:
317 		nulls -= REC_N_NEW_EXTRA_BYTES;
318 ordinary:
319 		lens = --nulls - n_core_null_bytes;
320 
321 		ut_d(n_null = std::min(n_core_null_bytes * 8U,
322 				       index->n_nullable));
323 		break;
324 	case REC_LEAF_COLUMNS_ADDED:
325 		/* We would have !index->is_instant() when rolling back
326 		an instant ADD COLUMN operation. */
327 		nulls -= REC_N_NEW_EXTRA_BYTES;
328 		ut_ad(index->is_instant());
329 		/* fall through */
330 	case REC_LEAF_TEMP_COLUMNS_ADDED:
		/* rec_get_n_add_field() decrements nulls past the 1 or 2
		bytes that encode the number of added fields. */
331 		n_fields = n_core + 1 + rec_get_n_add_field(nulls);
332 		ut_ad(n_fields <= index->n_fields);
333 		const ulint n_nullable = index->get_n_nullable(n_fields);
334 		const ulint n_null_bytes = UT_BITS_IN_BYTES(n_nullable);
335 		ut_d(n_null = n_nullable);
336 		ut_ad(n_null <= index->n_nullable);
337 		ut_ad(n_null_bytes >= n_core_null_bytes
338 		      || n_core < index->n_core_fields);
339 		lens = --nulls - n_null_bytes;
340 	}
341 
342 #ifdef UNIV_DEBUG
343 	/* We cannot invoke rec_offs_make_valid() if format==REC_LEAF_TEMP.
344 	Similarly, rec_offs_validate() will fail in that case, because
345 	it invokes rec_get_status(). */
346 	memcpy(&offsets[RECORD_OFFSET], &rec, sizeof(rec));
347 	memcpy(&offsets[INDEX_OFFSET], &index, sizeof(index));
348 #endif /* UNIV_DEBUG */
349 
350 	/* read the lengths of fields 0..n_fields */
351 	ulint i = 0;
352 	do {
353 		const dict_field_t*	field
354 			= dict_index_get_nth_field(index, i);
355 		const dict_col_t*	col
356 			= dict_field_get_col(field);
357 		rec_offs		len;
358 
359 		/* set default value flag */
360 		if (i < n_fields) {
			/* The field is stored in the record;
			it is parsed further below. */
361 		} else if (def_val) {
			/* The field is not stored in the record.
			def_val[] is indexed relative to n_core. */
362 			const dict_col_t::def_t& d = def_val[i - n_core];
363 			if (!d.data) {
364 				len = combine(offs, SQL_NULL);
365 				ut_ad(d.len == UNIV_SQL_NULL);
366 			} else {
367 				len = combine(offs, DEFAULT);
368 				any |= REC_OFFS_DEFAULT;
369 			}
370 
371 			goto resolved;
372 		} else {
			/* The field is not stored in the record and no
			def_val was passed: consult the index. */
373 			ulint dlen;
374 			if (!index->instant_field_value(i, &dlen)) {
375 				len = combine(offs, SQL_NULL);
376 				ut_ad(dlen == UNIV_SQL_NULL);
377 			} else {
378 				len = combine(offs, DEFAULT);
379 				any |= REC_OFFS_DEFAULT;
380 			}
381 
382 			goto resolved;
383 		}
384 
385 		if (!(col->prtype & DATA_NOT_NULL)) {
386 			/* nullable field => read the null flag */
387 			ut_ad(n_null--);
388 
			/* null_mask shifted out of the low byte: all 8
			flags of this byte are consumed, go to the next. */
389 			if (UNIV_UNLIKELY(!(byte) null_mask)) {
390 				nulls--;
391 				null_mask = 1;
392 			}
393 
394 			if (*nulls & null_mask) {
395 				null_mask <<= 1;
396 				/* No length is stored for NULL fields.
397 				We do not advance offs, and we set
398 				the length to zero and enable the
399 				SQL NULL flag in offsets[]. */
400 				len = combine(offs, SQL_NULL);
401 				goto resolved;
402 			}
403 			null_mask <<= 1;
404 		}
405 
		/* For format == REC_LEAF_TEMP (which at this point implies
		!dict_table_is_comp(), see the switch above), a field is
		also treated as variable-length when
		dict_col_get_fixed_size(col, true) is 0. */
406 		if (!field->fixed_len
407 		    || (format == REC_LEAF_TEMP
408 			&& !dict_col_get_fixed_size(col, true))) {
409 			/* Variable-length field: read the length */
410 			len = *lens--;
411 			/* If the maximum length of the field is up
412 			to 255 bytes, the actual length is always
413 			stored in one byte. If the maximum length is
414 			more than 255 bytes, the actual length is
415 			stored in one byte for 0..127.  The length
416 			will be encoded in two bytes when it is 128 or
417 			more, or when the field is stored externally. */
418 			if ((len & 0x80) && DATA_BIG_COL(col)) {
419 				/* 1exxxxxxx xxxxxxxx */
420 				len <<= 8;
421 				len |= *lens--;
422 
423 				offs += get_value(len);
				/* 0x4000 is the "e" (extern storage) bit
				of the two-byte length. */
424 				if (UNIV_UNLIKELY(len & 0x4000)) {
425 					ut_ad(dict_index_is_clust(index));
426 					any |= REC_OFFS_EXTERNAL;
427 					len = combine(offs, STORED_OFFPAGE);
428 				} else {
429 					len = offs;
430 				}
431 
432 				goto resolved;
433 			}
434 
435 			len = offs += len;
436 		} else {
437 			len = offs += field->fixed_len;
438 		}
439 resolved:
		/* Entry i + 1 describes field i; entry 0 is set after
		the loop. */
440 		rec_offs_base(offsets)[i + 1] = len;
441 	} while (++i < rec_offs_n_fields(offsets));
442 
	/* Entry 0: the number of header bytes between the lowest header
	byte consumed above and the record origin, combined with the
	format flags. */
443 	*rec_offs_base(offsets)
444 		= static_cast<rec_offs>(rec - (lens + 1)) | REC_OFFS_COMPACT | any;
445 }
446 
447 #ifdef UNIV_DEBUG
448 /** Update debug data in offsets, in order to tame rec_offs_validate().
449 @param[in]	rec	record
450 @param[in]	index	the index that the record belongs in
451 @param[in]	leaf	whether the record resides in a leaf page
452 @param[in,out]	offsets	offsets from rec_get_offsets() to adjust */
453 void
rec_offs_make_valid(const rec_t * rec,const dict_index_t * index,bool leaf,rec_offs * offsets)454 rec_offs_make_valid(
455 	const rec_t*		rec,
456 	const dict_index_t*	index,
457 	bool			leaf,
458 	rec_offs*		offsets)
459 {
460 	ut_ad(rec_offs_n_fields(offsets)
461 	      <= (leaf
462 		  ? dict_index_get_n_fields(index)
463 		  : dict_index_get_n_unique_in_tree_nonleaf(index) + 1)
464 	      || index->is_dummy || dict_index_is_ibuf(index));
465 	const bool is_user_rec = (dict_table_is_comp(index->table)
466 				  ? rec_get_heap_no_new(rec)
467 				  : rec_get_heap_no_old(rec))
468 		>= PAGE_HEAP_NO_USER_LOW;
469 	ulint n = rec_get_n_fields(rec, index);
470 	/* The infimum and supremum records carry 1 field. */
471 	ut_ad(is_user_rec || n == 1);
472 	ut_ad(is_user_rec || rec_offs_n_fields(offsets) == 1);
473 	ut_ad(!is_user_rec
474 	      || (n + (index->id == DICT_INDEXES_ID)) >= index->n_core_fields
475 	      || n >= rec_offs_n_fields(offsets));
476 	for (; n < rec_offs_n_fields(offsets); n++) {
477 		ut_ad(leaf);
478 		ut_ad(get_type(rec_offs_base(offsets)[1 + n]) == DEFAULT);
479 	}
480 	memcpy(&offsets[RECORD_OFFSET], &rec, sizeof(rec));
481 	memcpy(&offsets[INDEX_OFFSET], &index, sizeof(index));
482 }
483 
484 /** Validate offsets returned by rec_get_offsets().
485 @param[in]	rec	record, or NULL
486 @param[in]	index	the index that the record belongs in, or NULL
487 @param[in,out]	offsets	the offsets of the record
488 @return true */
489 bool
rec_offs_validate(const rec_t * rec,const dict_index_t * index,const rec_offs * offsets)490 rec_offs_validate(
491 	const rec_t*		rec,
492 	const dict_index_t*	index,
493 	const rec_offs*		offsets)
494 {
495 	ulint	i	= rec_offs_n_fields(offsets);
496 	ulint	last	= ULINT_MAX;
497 	ulint	comp	= *rec_offs_base(offsets) & REC_OFFS_COMPACT;
498 
499 	if (rec) {
500 		ut_ad(!memcmp(&rec, &offsets[RECORD_OFFSET], sizeof(rec)));
501 		if (!comp) {
502 			const bool is_user_rec = rec_get_heap_no_old(rec)
503 				>= PAGE_HEAP_NO_USER_LOW;
504 			ulint n = rec_get_n_fields_old(rec);
505 			/* The infimum and supremum records carry 1 field. */
506 			ut_ad(is_user_rec || n == 1);
507 			ut_ad(is_user_rec || i == 1);
508 			ut_ad(!is_user_rec || n >= i || !index
509 			      || (n + (index->id == DICT_INDEXES_ID))
510 			      >= index->n_core_fields);
511 			for (; n < i; n++) {
512 				ut_ad(get_type(rec_offs_base(offsets)[1 + n])
513 				      == DEFAULT);
514 			}
515 		}
516 	}
517 	if (index) {
518 		ulint max_n_fields;
519 		ut_ad(!memcmp(&index, &offsets[INDEX_OFFSET], sizeof(index)));
520 		max_n_fields = ut_max(
521 			dict_index_get_n_fields(index),
522 			dict_index_get_n_unique_in_tree(index) + 1);
523 		if (comp && rec) {
524 			switch (rec_get_status(rec)) {
525 			case REC_STATUS_COLUMNS_ADDED:
526 			case REC_STATUS_ORDINARY:
527 				break;
528 			case REC_STATUS_NODE_PTR:
529 				max_n_fields = dict_index_get_n_unique_in_tree(
530 					index) + 1;
531 				break;
532 			case REC_STATUS_INFIMUM:
533 			case REC_STATUS_SUPREMUM:
534 				max_n_fields = 1;
535 				break;
536 			default:
537 				ut_error;
538 			}
539 		}
540 		/* index->n_def == 0 for dummy indexes if !comp */
541 		ut_a(!comp || index->n_def);
542 		ut_a(!index->n_def || i <= max_n_fields);
543 	}
544 	while (i--) {
545 		ulint curr = get_value(rec_offs_base(offsets)[1 + i]);
546 		ut_a(curr <= last);
547 		last = curr;
548 	}
549 	return(TRUE);
550 }
551 #endif /* UNIV_DEBUG */
552 
553 /** Determine the offsets to each field in the record.
554  The offsets are written to a previously allocated array of
555 rec_offs, where rec_offs_n_fields(offsets) has been initialized to the
556 number of fields in the record.	 The rest of the array will be
557 initialized by this function.  rec_offs_base(offsets)[0] will be set
558 to the extra size (if REC_OFFS_COMPACT is set, the record is in the
559 new format; if REC_OFFS_EXTERNAL is set, the record contains externally
560 stored columns), and rec_offs_base(offsets)[1..n_fields] will be set to
561 offsets past the end of fields 0..n_fields, or to the beginning of
562 fields 1..n_fields+1.  When the type of the offset at [i+1]
563 is (SQL_NULL), the field i is NULL. When the type of the offset at [i+1]
564 is (STORED_OFFPAGE), the field i is stored externally.
565 @param[in]	rec	record
566 @param[in]	index	the index that the record belongs in
567 @param[in]	n_core	0, or index->n_core_fields for leaf page
568 @param[in,out]	offsets	array of offsets, with valid rec_offs_n_fields() */
569 static
570 void
rec_init_offsets(const rec_t * rec,const dict_index_t * index,ulint n_core,rec_offs * offsets)571 rec_init_offsets(
572 	const rec_t*		rec,
573 	const dict_index_t*	index,
574 	ulint			n_core,
575 	rec_offs*		offsets)
576 {
577 	ulint	i	= 0;
578 	rec_offs	offs;
579 
580 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
	/* Debug builds record which rec and index the offsets belong to. */
581 	ut_d(memcpy(&offsets[RECORD_OFFSET], &rec, sizeof(rec)));
582 	ut_d(memcpy(&offsets[INDEX_OFFSET], &index, sizeof(index)));
583 	ut_ad(index->n_fields >= n_core);
584 	ut_ad(index->n_core_fields >= n_core);
585 
586 	if (dict_table_is_comp(index->table)) {
		/* New-style record: dispatch on the record status. */
587 		const byte*	nulls;
588 		const byte*	lens;
589 		dict_field_t*	field;
590 		ulint		null_mask;
591 		rec_comp_status_t status = rec_get_status(rec);
592 		ulint		n_node_ptr_field = ULINT_UNDEFINED;
593 
594 		switch (UNIV_EXPECT(status, REC_STATUS_ORDINARY)) {
595 		case REC_STATUS_INFIMUM:
596 		case REC_STATUS_SUPREMUM:
597 			/* the field is 8 bytes long */
598 			rec_offs_base(offsets)[0]
599 				= REC_N_NEW_EXTRA_BYTES | REC_OFFS_COMPACT;
600 			rec_offs_base(offsets)[1] = 8;
601 			return;
602 		case REC_STATUS_NODE_PTR:
603 			ut_ad(!n_core);
			/* Index of the child page number field, which
			follows the key fields of a node pointer. */
604 			n_node_ptr_field
605 				= dict_index_get_n_unique_in_tree_nonleaf(
606 					index);
607 			break;
608 		case REC_STATUS_COLUMNS_ADDED:
609 			ut_ad(index->is_instant());
610 			rec_init_offsets_comp_ordinary(rec, index, offsets,
611 						       n_core,
612 						       NULL,
613 						       REC_LEAF_COLUMNS_ADDED);
614 			return;
615 		case REC_STATUS_ORDINARY:
616 			rec_init_offsets_comp_ordinary(rec, index, offsets,
617 						       n_core,
618 						       NULL,
619 						       REC_LEAF_ORDINARY);
620 			return;
621 		}
622 
		/* Of the statuses handled above, only REC_STATUS_NODE_PTR
		leaves the switch without returning; the code below parses
		a node pointer record. */
623 		/* The n_nullable flags in the clustered index node pointer
624 		records in ROW_FORMAT=COMPACT or ROW_FORMAT=DYNAMIC must
625 		reflect the number of 'core columns'. These flags are
626 		useless garbage, and they are only reserved because of
627 		file format compatibility.
628 		(Clustered index node pointer records only contain the
629 		PRIMARY KEY columns, which are always NOT NULL,
630 		so we should have used n_nullable=0.) */
631 		ut_ad(index->n_core_fields > 0);
632 
		/* nulls and lens are read towards lower addresses: the
		null flags start right below the fixed-size header, and
		the stored lengths follow below the null flag bytes. */
633 		nulls = rec - (REC_N_NEW_EXTRA_BYTES + 1);
634 		lens = nulls - index->n_core_null_bytes;
635 		offs = 0;
636 		null_mask = 1;
637 
638 		/* read the lengths of fields 0..n */
639 		do {
640 			rec_offs len;
641 			if (UNIV_UNLIKELY(i == n_node_ptr_field)) {
				/* The child page number field has the
				fixed size REC_NODE_PTR_SIZE; no null flag
				or stored length is read for it. */
642 				len = offs += REC_NODE_PTR_SIZE;
643 				goto resolved;
644 			}
645 
646 			field = dict_index_get_nth_field(index, i);
647 			if (!(dict_field_get_col(field)->prtype
648 			      & DATA_NOT_NULL)) {
649 				/* nullable field => read the null flag */
650 
651 				if (UNIV_UNLIKELY(!(byte) null_mask)) {
652 					nulls--;
653 					null_mask = 1;
654 				}
655 
656 				if (*nulls & null_mask) {
657 					null_mask <<= 1;
658 					/* No length is stored for NULL fields.
659 					We do not advance offs, and we set
660 					the length to zero and enable the
661 					SQL NULL flag in offsets[]. */
662 					len = combine(offs, SQL_NULL);
663 					goto resolved;
664 				}
665 				null_mask <<= 1;
666 			}
667 
668 			if (UNIV_UNLIKELY(!field->fixed_len)) {
669 				const dict_col_t*	col
670 					= dict_field_get_col(field);
671 				/* Variable-length field: read the length */
672 				len = *lens--;
673 				/* If the maximum length of the field
674 				is up to 255 bytes, the actual length
675 				is always stored in one byte. If the
676 				maximum length is more than 255 bytes,
677 				the actual length is stored in one
678 				byte for 0..127.  The length will be
679 				encoded in two bytes when it is 128 or
680 				more, or when the field is stored
681 				externally. */
682 				if (DATA_BIG_COL(col)) {
683 					if (len & 0x80) {
684 						/* 1exxxxxxx xxxxxxxx */
685 
686 						len <<= 8;
687 						len |= *lens--;
688 
689 						/* B-tree node pointers
690 						must not contain externally
691 						stored columns.  Thus
692 						the "e" flag must be 0. */
693 						ut_a(!(len & 0x4000));
694 						offs += get_value(len);
695 						len = offs;
696 
697 						goto resolved;
698 					}
699 				}
700 
701 				len = offs += len;
702 			} else {
703 				len = offs += field->fixed_len;
704 			}
705 resolved:
706 			rec_offs_base(offsets)[i + 1] = len;
707 		} while (++i < rec_offs_n_fields(offsets));
708 
		/* Entry 0: the number of header bytes between the lowest
		header byte consumed above and the record origin, plus
		the compact-format flag. */
709 		*rec_offs_base(offsets)
710 			= static_cast<rec_offs>(rec - (lens + 1))
711 			  | REC_OFFS_COMPACT;
712 	} else {
713 		/* Old-style record: determine extra size and end offsets */
714 		offs = REC_N_OLD_EXTRA_BYTES;
715 		const ulint n_fields = rec_get_n_fields_old(rec);
		/* n: number of fields that are both stored in the record
		and requested in offsets. */
716 		const ulint n = std::min(n_fields, rec_offs_n_fields(offsets));
		/* any: starts as the extra size and later also collects
		the REC_OFFS_EXTERNAL and REC_OFFS_DEFAULT flags; it is
		stored in entry 0 at the end. */
717 		rec_offs any;
718 
719 		if (rec_get_1byte_offs_flag(rec)) {
			/* One byte of end-offset info per stored field. */
720 			offs += static_cast<rec_offs>(n_fields);
721 			any = offs;
722 			/* Determine offsets to fields */
723 			do {
724 				offs = rec_1_get_field_end_info(rec, i);
725 				if (offs & REC_1BYTE_SQL_NULL_MASK) {
726 					offs &= ~REC_1BYTE_SQL_NULL_MASK;
727 					set_type(offs, SQL_NULL);
728 				}
729 				rec_offs_base(offsets)[1 + i] = offs;
730 			} while (++i < n);
731 		} else {
			/* Two bytes of end-offset info per stored field. */
732 			offs += 2 * static_cast<rec_offs>(n_fields);
733 			any = offs;
734 			/* Determine offsets to fields */
735 			do {
736 				offs = rec_2_get_field_end_info(rec, i);
737 				if (offs & REC_2BYTE_SQL_NULL_MASK) {
738 					offs &= ~REC_2BYTE_SQL_NULL_MASK;
739 					set_type(offs, SQL_NULL);
740 				}
741 				if (offs & REC_2BYTE_EXTERN_MASK) {
742 					offs &= ~REC_2BYTE_EXTERN_MASK;
743 					set_type(offs, STORED_OFFPAGE);
744 					any |= REC_OFFS_EXTERNAL;
745 				}
746 				rec_offs_base(offsets)[1 + i] = offs;
747 			} while (++i < n);
748 		}
749 
		/* More offsets were requested than there are fields stored
		in the record: mark every remaining entry as DEFAULT,
		reusing the entry of the last stored field as the offset. */
750 		if (i < rec_offs_n_fields(offsets)) {
751 			ut_ad(index->is_instant()
752 			      || i + (index->id == DICT_INDEXES_ID)
753 			      == rec_offs_n_fields(offsets));
754 
755 			ut_ad(i != 0);
756 			offs = combine(rec_offs_base(offsets)[i], DEFAULT);
757 
758 			do {
759 				rec_offs_base(offsets)[1 + i] = offs;
760 			} while (++i < rec_offs_n_fields(offsets));
761 
762 			any |= REC_OFFS_DEFAULT;
763 		}
764 
765 		*rec_offs_base(offsets) = any;
766 	}
767 }
768 
769 /** Determine the offsets to each field in an index record.
770 @param[in]	rec		physical record
771 @param[in]	index		the index that the record belongs to
772 @param[in,out]	offsets		array comprising offsets[0] allocated elements,
773 				or an array from rec_get_offsets(), or NULL
774 @param[in]	n_core		0, or index->n_core_fields for leaf page
775 @param[in]	n_fields	maximum number of offsets to compute
776 				(ULINT_UNDEFINED to compute all offsets)
777 @param[in,out]	heap		memory heap
778 @return the new offsets */
779 rec_offs*
rec_get_offsets_func(const rec_t * rec,const dict_index_t * index,rec_offs * offsets,ulint n_core,ulint n_fields,const char * file,unsigned line,mem_heap_t ** heap)780 rec_get_offsets_func(
781 	const rec_t*		rec,
782 	const dict_index_t*	index,
783 	rec_offs*		offsets,
784 	ulint			n_core,
785 	ulint			n_fields,
786 #ifdef UNIV_DEBUG
787 	const char*		file,	/*!< in: file name where called */
788 	unsigned		line,	/*!< in: line number where called */
789 #endif /* UNIV_DEBUG */
790 	mem_heap_t**		heap)	/*!< in/out: memory heap */
791 {
792 	ulint	n;
793 	ulint	size;
794 
795 	ut_ad(index->n_core_fields >= n_core);
796 	ut_ad(index->n_fields >= index->n_core_fields);
797 
798 	if (dict_table_is_comp(index->table)) {
799 		switch (UNIV_EXPECT(rec_get_status(rec),
800 				    REC_STATUS_ORDINARY)) {
801 		case REC_STATUS_COLUMNS_ADDED:
802 		case REC_STATUS_ORDINARY:
803 			ut_ad(n_core);
804 			n = dict_index_get_n_fields(index);
805 			break;
806 		case REC_STATUS_NODE_PTR:
807 			/* Node pointer records consist of the
808 			uniquely identifying fields of the record
809 			followed by a child page number field. */
810 			ut_ad(!n_core);
811 			n = dict_index_get_n_unique_in_tree_nonleaf(index) + 1;
812 			break;
813 		case REC_STATUS_INFIMUM:
814 		case REC_STATUS_SUPREMUM:
815 			/* infimum or supremum record */
816 			ut_ad(rec_get_heap_no_new(rec)
817 			      == ulint(rec_get_status(rec)
818                                        == REC_STATUS_INFIMUM
819                                        ? PAGE_HEAP_NO_INFIMUM
820                                        : PAGE_HEAP_NO_SUPREMUM));
821 			n = 1;
822 			break;
823 		default:
824 			ut_error;
825 			return(NULL);
826 		}
827 	} else {
828 		n = rec_get_n_fields_old(rec);
829 		/* Here, rec can be allocated from the heap (copied
830 		from an index page record), or it can be located in an
831 		index page. If rec is not in an index page, then
832 		page_rec_is_user_rec(rec) and similar predicates
833 		cannot be evaluated. We can still distinguish the
834 		infimum and supremum record based on the heap number. */
835 		const bool is_user_rec = rec_get_heap_no_old(rec)
836 			>= PAGE_HEAP_NO_USER_LOW;
837 		/* The infimum and supremum records carry 1 field. */
838 		ut_ad(is_user_rec || n == 1);
839 		ut_ad(!is_user_rec || n_core || index->is_dummy
840 		      || dict_index_is_ibuf(index)
841 		      || n == n_fields /* dict_stats_analyze_index_level() */
842 		      || n
843 		      == dict_index_get_n_unique_in_tree_nonleaf(index) + 1);
844 		ut_ad(!is_user_rec || !n_core || index->is_dummy
845 		      || dict_index_is_ibuf(index)
846 		      || n == n_fields /* btr_pcur_restore_position() */
847 		      || (n + (index->id == DICT_INDEXES_ID)
848 			  >= n_core && n <= index->n_fields));
849 
850 		if (is_user_rec && n_core && n < index->n_fields) {
851 			ut_ad(!index->is_dummy);
852 			ut_ad(!dict_index_is_ibuf(index));
853 			n = index->n_fields;
854 		}
855 	}
856 
857 	if (UNIV_UNLIKELY(n_fields < n)) {
858 		n = n_fields;
859 	}
860 
861 	/* The offsets header consists of the allocation size at
862 	offsets[0] and the REC_OFFS_HEADER_SIZE bytes. */
863 	size = n + (1 + REC_OFFS_HEADER_SIZE);
864 
865 	if (UNIV_UNLIKELY(!offsets)
866 	    || UNIV_UNLIKELY(rec_offs_get_n_alloc(offsets) < size)) {
867 		if (UNIV_UNLIKELY(!*heap)) {
868 			*heap = mem_heap_create_at(size * sizeof(*offsets),
869 						   file, line);
870 		}
871 		offsets = static_cast<rec_offs*>(
872 			mem_heap_alloc(*heap, size * sizeof(*offsets)));
873 
874 		rec_offs_set_n_alloc(offsets, size);
875 	}
876 
877 	rec_offs_set_n_fields(offsets, n);
878 	rec_init_offsets(rec, index, n_core, offsets);
879 	return(offsets);
880 }
881 
882 /******************************************************//**
883 The following function determines the offsets to each field
884 in the record.  It can reuse a previously allocated array. */
885 void
rec_get_offsets_reverse(const byte * extra,const dict_index_t * index,ulint node_ptr,rec_offs * offsets)886 rec_get_offsets_reverse(
887 /*====================*/
888 	const byte*		extra,	/*!< in: the extra bytes of a
889 					compact record in reverse order,
890 					excluding the fixed-size
891 					REC_N_NEW_EXTRA_BYTES */
892 	const dict_index_t*	index,	/*!< in: record descriptor */
893 	ulint			node_ptr,/*!< in: nonzero=node pointer,
894 					0=leaf node */
895 	rec_offs*		offsets)/*!< in/out: array consisting of
896 					offsets[0] allocated elements */
897 {
898 	ulint		n;
899 	ulint		i;
900 	rec_offs	offs;
901 	rec_offs	any_ext = 0;
902 	const byte*	nulls;
903 	const byte*	lens;
904 	dict_field_t*	field;
905 	ulint		null_mask;
906 	ulint		n_node_ptr_field;
907 
908 	ut_ad(dict_table_is_comp(index->table));
909 	ut_ad(!index->is_instant());
910 
911 	if (UNIV_UNLIKELY(node_ptr != 0)) {
912 		n_node_ptr_field =
913 			dict_index_get_n_unique_in_tree_nonleaf(index);
914 		n = n_node_ptr_field + 1;
915 	} else {
916 		n_node_ptr_field = ULINT_UNDEFINED;
917 		n = dict_index_get_n_fields(index);
918 	}
919 
920 	ut_a(rec_offs_get_n_alloc(offsets) >= n + (1 + REC_OFFS_HEADER_SIZE));
921 	rec_offs_set_n_fields(offsets, n);
922 
923 	nulls = extra;
924 	lens = nulls + UT_BITS_IN_BYTES(index->n_nullable);
925 	i = offs = 0;
926 	null_mask = 1;
927 
928 	/* read the lengths of fields 0..n */
929 	do {
930 		rec_offs len;
931 		if (UNIV_UNLIKELY(i == n_node_ptr_field)) {
932 			len = offs += REC_NODE_PTR_SIZE;
933 			goto resolved;
934 		}
935 
936 		field = dict_index_get_nth_field(index, i);
937 		if (!(dict_field_get_col(field)->prtype & DATA_NOT_NULL)) {
938 			/* nullable field => read the null flag */
939 
940 			if (UNIV_UNLIKELY(!(byte) null_mask)) {
941 				nulls++;
942 				null_mask = 1;
943 			}
944 
945 			if (*nulls & null_mask) {
946 				null_mask <<= 1;
947 				/* No length is stored for NULL fields.
948 				We do not advance offs, and we set
949 				the length to zero and enable the
950 				SQL NULL flag in offsets[]. */
951 				len = combine(offs, SQL_NULL);
952 				goto resolved;
953 			}
954 			null_mask <<= 1;
955 		}
956 
957 		if (UNIV_UNLIKELY(!field->fixed_len)) {
958 			/* Variable-length field: read the length */
959 			const dict_col_t*	col
960 				= dict_field_get_col(field);
961 			len = *lens++;
962 			/* If the maximum length of the field is up
963 			to 255 bytes, the actual length is always
964 			stored in one byte. If the maximum length is
965 			more than 255 bytes, the actual length is
966 			stored in one byte for 0..127.  The length
967 			will be encoded in two bytes when it is 128 or
968 			more, or when the field is stored externally. */
969 			if (DATA_BIG_COL(col)) {
970 				if (len & 0x80) {
971 					/* 1exxxxxxx xxxxxxxx */
972 					len <<= 8;
973 					len |= *lens++;
974 
975 					offs += get_value(len);
976 					if (UNIV_UNLIKELY(len & 0x4000)) {
977 						any_ext = REC_OFFS_EXTERNAL;
978 						len = combine(offs,
979 							      STORED_OFFPAGE);
980 					} else {
981 						len = offs;
982 					}
983 
984 					goto resolved;
985 				}
986 			}
987 
988 			len = offs += len;
989 		} else {
990 			len = offs += static_cast<rec_offs>(field->fixed_len);
991 		}
992 resolved:
993 		rec_offs_base(offsets)[i + 1] = len;
994 	} while (++i < rec_offs_n_fields(offsets));
995 
996 	ut_ad(lens >= extra);
997 	*rec_offs_base(offsets)
998 		= static_cast<rec_offs>(lens - extra + REC_N_NEW_EXTRA_BYTES)
999 		  | REC_OFFS_COMPACT | any_ext;
1000 }
1001 
1002 /************************************************************//**
1003 The following function is used to get the offset to the nth
1004 data field in an old-style record.
1005 @return offset to the field */
1006 ulint
rec_get_nth_field_offs_old(const rec_t * rec,ulint n,ulint * len)1007 rec_get_nth_field_offs_old(
1008 /*=======================*/
1009 	const rec_t*	rec,	/*!< in: record */
1010 	ulint		n,	/*!< in: index of the field */
1011 	ulint*		len)	/*!< out: length of the field;
1012 				UNIV_SQL_NULL if SQL null */
1013 {
1014 	ulint	os;
1015 	ulint	next_os;
1016 
1017 	ut_a(n < rec_get_n_fields_old(rec));
1018 
1019 	if (rec_get_1byte_offs_flag(rec)) {
1020 		os = rec_1_get_field_start_offs(rec, n);
1021 
1022 		next_os = rec_1_get_field_end_info(rec, n);
1023 
1024 		if (next_os & REC_1BYTE_SQL_NULL_MASK) {
1025 			*len = UNIV_SQL_NULL;
1026 
1027 			return(os);
1028 		}
1029 
1030 		next_os = next_os & ~REC_1BYTE_SQL_NULL_MASK;
1031 	} else {
1032 		os = rec_2_get_field_start_offs(rec, n);
1033 
1034 		next_os = rec_2_get_field_end_info(rec, n);
1035 
1036 		if (next_os & REC_2BYTE_SQL_NULL_MASK) {
1037 			*len = UNIV_SQL_NULL;
1038 
1039 			return(os);
1040 		}
1041 
1042 		next_os = next_os & ~(REC_2BYTE_SQL_NULL_MASK
1043 				      | REC_2BYTE_EXTERN_MASK);
1044 	}
1045 
1046 	*len = next_os - os;
1047 
1048 	ut_ad(*len < srv_page_size);
1049 
1050 	return(os);
1051 }
1052 
1053 /**********************************************************//**
1054 Determines the size of a data tuple prefix in ROW_FORMAT=COMPACT.
1055 @return total size */
1056 template<bool redundant_temp>
1057 MY_ATTRIBUTE((warn_unused_result, nonnull(1,2)))
1058 static inline
1059 ulint
rec_get_converted_size_comp_prefix_low(const dict_index_t * index,const dfield_t * fields,ulint n_fields,ulint * extra,rec_comp_status_t status,bool temp)1060 rec_get_converted_size_comp_prefix_low(
1061 /*===================================*/
1062 	const dict_index_t*	index,	/*!< in: record descriptor;
1063 					dict_table_is_comp() is
1064 					assumed to hold, even if
1065 					it does not */
1066 	const dfield_t*		fields,	/*!< in: array of data fields */
1067 	ulint			n_fields,/*!< in: number of data fields */
1068 	ulint*			extra,	/*!< out: extra size */
1069 	rec_comp_status_t	status,	/*!< in: status flags */
1070 	bool			temp)	/*!< in: whether this is a
1071 					temporary file record */
1072 {
1073 	ulint	extra_size = temp ? 0 : REC_N_NEW_EXTRA_BYTES;
1074 	ulint	data_size;
1075 	ulint	i;
1076 	ut_ad(n_fields > 0);
1077 	ut_ad(n_fields <= dict_index_get_n_fields(index));
1078 	ut_d(ulint n_null = index->n_nullable);
1079 	ut_ad(status == REC_STATUS_ORDINARY || status == REC_STATUS_NODE_PTR
1080 	      || status == REC_STATUS_COLUMNS_ADDED);
1081 	unsigned n_core_fields = redundant_temp
1082 		? row_log_get_n_core_fields(index)
1083 		: index->n_core_fields;
1084 
1085 	if (status == REC_STATUS_COLUMNS_ADDED
1086 	    && (!temp || n_fields > n_core_fields)) {
1087 		if (!redundant_temp) { ut_ad(index->is_instant()); }
1088 		ut_ad(UT_BITS_IN_BYTES(n_null) >= index->n_core_null_bytes);
1089 		extra_size += UT_BITS_IN_BYTES(index->get_n_nullable(n_fields))
1090 			+ rec_get_n_add_field_len(n_fields - 1
1091 						  - n_core_fields);
1092 	} else {
1093 		ut_ad(n_fields <= n_core_fields);
1094 		extra_size += index->n_core_null_bytes;
1095 	}
1096 
1097 	data_size = 0;
1098 
1099 	if (temp && dict_table_is_comp(index->table)) {
1100 		/* No need to do adjust fixed_len=0. We only need to
1101 		adjust it for ROW_FORMAT=REDUNDANT. */
1102 		temp = false;
1103 	}
1104 
1105 	/* read the lengths of fields 0..n */
1106 	for (i = 0; i < n_fields; i++) {
1107 		const dict_field_t*	field;
1108 		ulint			len;
1109 		ulint			fixed_len;
1110 		const dict_col_t*	col;
1111 
1112 		field = dict_index_get_nth_field(index, i);
1113 		len = dfield_get_len(&fields[i]);
1114 		col = dict_field_get_col(field);
1115 
1116 #ifdef UNIV_DEBUG
1117 		const dtype_t* type = dfield_get_type(&fields[i]);
1118 		if (dict_index_is_spatial(index)) {
1119 			if (DATA_GEOMETRY_MTYPE(col->mtype) && i == 0) {
1120 				ut_ad(type->prtype & DATA_GIS_MBR);
1121 			} else {
1122 				ut_ad(type->mtype == DATA_SYS_CHILD
1123 				      || dict_col_type_assert_equal(col, type));
1124 			}
1125 		} else {
1126 			ut_ad(dict_col_type_assert_equal(col, type));
1127 		}
1128 #endif
1129 
1130 		/* All NULLable fields must be included in the n_null count. */
1131 		ut_ad((col->prtype & DATA_NOT_NULL) || n_null--);
1132 
1133 		if (dfield_is_null(&fields[i])) {
1134 			/* No length is stored for NULL fields. */
1135 			ut_ad(!(col->prtype & DATA_NOT_NULL));
1136 			continue;
1137 		}
1138 
1139 		ut_ad(len <= col->len || DATA_LARGE_MTYPE(col->mtype)
1140 		      || (col->len == 0 && col->mtype == DATA_VARCHAR));
1141 
1142 		fixed_len = field->fixed_len;
1143 		if (temp && fixed_len
1144 		    && !dict_col_get_fixed_size(col, temp)) {
1145 			fixed_len = 0;
1146 		}
1147 		/* If the maximum length of a variable-length field
1148 		is up to 255 bytes, the actual length is always stored
1149 		in one byte. If the maximum length is more than 255
1150 		bytes, the actual length is stored in one byte for
1151 		0..127.  The length will be encoded in two bytes when
1152 		it is 128 or more, or when the field is stored externally. */
1153 
1154 		if (fixed_len) {
1155 #ifdef UNIV_DEBUG
1156 			ut_ad(len <= fixed_len);
1157 
1158 			if (dict_index_is_spatial(index)) {
1159 				ut_ad(type->mtype == DATA_SYS_CHILD
1160 				      || !col->mbmaxlen
1161 				      || len >= col->mbminlen
1162 				      * fixed_len / col->mbmaxlen);
1163 			} else {
1164 				ut_ad(type->mtype != DATA_SYS_CHILD);
1165 				ut_ad(!col->mbmaxlen
1166 				      || len >= col->mbminlen
1167 				      * fixed_len / col->mbmaxlen);
1168 			}
1169 
1170 			/* dict_index_add_col() should guarantee this */
1171 			ut_ad(!field->prefix_len
1172 			      || fixed_len == field->prefix_len);
1173 #endif /* UNIV_DEBUG */
1174 		} else if (dfield_is_ext(&fields[i])) {
1175 			ut_ad(DATA_BIG_COL(col));
1176 			extra_size += 2;
1177 		} else if (len < 128 || !DATA_BIG_COL(col)) {
1178 			extra_size++;
1179 		} else {
1180 			/* For variable-length columns, we look up the
1181 			maximum length from the column itself.  If this
1182 			is a prefix index column shorter than 256 bytes,
1183 			this will waste one byte. */
1184 			extra_size += 2;
1185 		}
1186 		data_size += len;
1187 	}
1188 
1189 	if (extra) {
1190 		*extra = extra_size;
1191 	}
1192 
1193 	return(extra_size + data_size);
1194 }
1195 
1196 /**********************************************************//**
1197 Determines the size of a data tuple prefix in ROW_FORMAT=COMPACT.
1198 @return total size */
1199 ulint
rec_get_converted_size_comp_prefix(const dict_index_t * index,const dfield_t * fields,ulint n_fields,ulint * extra)1200 rec_get_converted_size_comp_prefix(
1201 /*===============================*/
1202 	const dict_index_t*	index,	/*!< in: record descriptor */
1203 	const dfield_t*		fields,	/*!< in: array of data fields */
1204 	ulint			n_fields,/*!< in: number of data fields */
1205 	ulint*			extra)	/*!< out: extra size */
1206 {
1207 	ut_ad(dict_table_is_comp(index->table));
1208 	return(rec_get_converted_size_comp_prefix_low<false>(
1209 		       index, fields, n_fields, extra,
1210 		       REC_STATUS_ORDINARY, false));
1211 }
1212 
1213 /**********************************************************//**
1214 Determines the size of a data tuple in ROW_FORMAT=COMPACT.
1215 @return total size */
1216 ulint
rec_get_converted_size_comp(const dict_index_t * index,rec_comp_status_t status,const dfield_t * fields,ulint n_fields,ulint * extra)1217 rec_get_converted_size_comp(
1218 /*========================*/
1219 	const dict_index_t*	index,	/*!< in: record descriptor;
1220 					dict_table_is_comp() is
1221 					assumed to hold, even if
1222 					it does not */
1223 	rec_comp_status_t	status,	/*!< in: status bits of the record */
1224 	const dfield_t*		fields,	/*!< in: array of data fields */
1225 	ulint			n_fields,/*!< in: number of data fields */
1226 	ulint*			extra)	/*!< out: extra size */
1227 {
1228 	ut_ad(n_fields > 0);
1229 
1230 	switch (UNIV_EXPECT(status, REC_STATUS_ORDINARY)) {
1231 	case REC_STATUS_ORDINARY:
1232 		if (n_fields > index->n_core_fields) {
1233 			ut_ad(index->is_instant());
1234 			status = REC_STATUS_COLUMNS_ADDED;
1235 		}
1236 		/* fall through */
1237 	case REC_STATUS_COLUMNS_ADDED:
1238 		ut_ad(n_fields >= index->n_core_fields);
1239 		ut_ad(n_fields <= index->n_fields);
1240 		return rec_get_converted_size_comp_prefix_low<false>(
1241 			index, fields, n_fields, extra, status, false);
1242 	case REC_STATUS_NODE_PTR:
1243 		n_fields--;
1244 		ut_ad(n_fields == dict_index_get_n_unique_in_tree_nonleaf(
1245 					index));
1246 		ut_ad(dfield_get_len(&fields[n_fields]) == REC_NODE_PTR_SIZE);
1247 		return REC_NODE_PTR_SIZE /* child page number */
1248 			+ rec_get_converted_size_comp_prefix_low<false>(
1249 				index, fields, n_fields, extra, status, false);
1250 	case REC_STATUS_INFIMUM:
1251 	case REC_STATUS_SUPREMUM:
1252 		/* not supported */
1253 		break;
1254 	}
1255 
1256 	ut_error;
1257 	return(ULINT_UNDEFINED);
1258 }
1259 
1260 /***********************************************************//**
1261 Sets the value of the ith field SQL null bit of an old-style record. */
1262 void
rec_set_nth_field_null_bit(rec_t * rec,ulint i,ibool val)1263 rec_set_nth_field_null_bit(
1264 /*=======================*/
1265 	rec_t*	rec,	/*!< in: record */
1266 	ulint	i,	/*!< in: ith field */
1267 	ibool	val)	/*!< in: value to set */
1268 {
1269 	ulint	info;
1270 
1271 	if (rec_get_1byte_offs_flag(rec)) {
1272 
1273 		info = rec_1_get_field_end_info(rec, i);
1274 
1275 		if (val) {
1276 			info = info | REC_1BYTE_SQL_NULL_MASK;
1277 		} else {
1278 			info = info & ~REC_1BYTE_SQL_NULL_MASK;
1279 		}
1280 
1281 		rec_1_set_field_end_info(rec, i, info);
1282 
1283 		return;
1284 	}
1285 
1286 	info = rec_2_get_field_end_info(rec, i);
1287 
1288 	if (val) {
1289 		info = info | REC_2BYTE_SQL_NULL_MASK;
1290 	} else {
1291 		info = info & ~REC_2BYTE_SQL_NULL_MASK;
1292 	}
1293 
1294 	rec_2_set_field_end_info(rec, i, info);
1295 }
1296 
1297 /***********************************************************//**
1298 Sets an old-style record field to SQL null.
1299 The physical size of the field is not changed. */
1300 void
rec_set_nth_field_sql_null(rec_t * rec,ulint n)1301 rec_set_nth_field_sql_null(
1302 /*=======================*/
1303 	rec_t*	rec,	/*!< in: record */
1304 	ulint	n)	/*!< in: index of the field */
1305 {
1306 	ulint	offset;
1307 
1308 	offset = rec_get_field_start_offs(rec, n);
1309 
1310 	data_write_sql_null(rec + offset, rec_get_nth_field_size(rec, n));
1311 
1312 	rec_set_nth_field_null_bit(rec, n, TRUE);
1313 }
1314 
1315 /*********************************************************//**
1316 Builds an old-style physical record out of a data tuple and
1317 stores it beginning from the start of the given buffer.
1318 @return pointer to the origin of physical record */
1319 static
1320 rec_t*
rec_convert_dtuple_to_rec_old(byte * buf,const dtuple_t * dtuple,ulint n_ext)1321 rec_convert_dtuple_to_rec_old(
1322 /*==========================*/
1323 	byte*		buf,	/*!< in: start address of the physical record */
1324 	const dtuple_t*	dtuple,	/*!< in: data tuple */
1325 	ulint		n_ext)	/*!< in: number of externally stored columns */
1326 {
1327 	const dfield_t*	field;
1328 	ulint		n_fields;
1329 	ulint		data_size;
1330 	rec_t*		rec;
1331 	ulint		end_offset;
1332 	ulint		ored_offset;
1333 	ulint		len;
1334 	ulint		i;
1335 
1336 	ut_ad(buf && dtuple);
1337 	ut_ad(dtuple_validate(dtuple));
1338 	ut_ad(dtuple_check_typed(dtuple));
1339 
1340 	n_fields = dtuple_get_n_fields(dtuple);
1341 	data_size = dtuple_get_data_size(dtuple, 0);
1342 
1343 	ut_ad(n_fields > 0);
1344 
1345 	/* Calculate the offset of the origin in the physical record */
1346 
1347 	rec = buf + rec_get_converted_extra_size(data_size, n_fields, n_ext);
1348 	/* Store the number of fields */
1349 	rec_set_n_fields_old(rec, n_fields);
1350 
1351 	/* Set the info bits of the record */
1352 	rec_set_info_bits_old(rec, dtuple_get_info_bits(dtuple)
1353 			      & REC_INFO_BITS_MASK);
1354 	rec_set_heap_no_old(rec, PAGE_HEAP_NO_USER_LOW);
1355 
1356 	/* Store the data and the offsets */
1357 
1358 	end_offset = 0;
1359 
1360 	if (!n_ext && data_size <= REC_1BYTE_OFFS_LIMIT) {
1361 
1362 		rec_set_1byte_offs_flag(rec, TRUE);
1363 
1364 		for (i = 0; i < n_fields; i++) {
1365 
1366 			field = dtuple_get_nth_field(dtuple, i);
1367 
1368 			if (dfield_is_null(field)) {
1369 				len = dtype_get_sql_null_size(
1370 					dfield_get_type(field), 0);
1371 				data_write_sql_null(rec + end_offset, len);
1372 
1373 				end_offset += len;
1374 				ored_offset = end_offset
1375 					| REC_1BYTE_SQL_NULL_MASK;
1376 			} else {
1377 				/* If the data is not SQL null, store it */
1378 				len = dfield_get_len(field);
1379 
1380 				memcpy(rec + end_offset,
1381 				       dfield_get_data(field), len);
1382 
1383 				end_offset += len;
1384 				ored_offset = end_offset;
1385 			}
1386 
1387 			rec_1_set_field_end_info(rec, i, ored_offset);
1388 		}
1389 	} else {
1390 		rec_set_1byte_offs_flag(rec, FALSE);
1391 
1392 		for (i = 0; i < n_fields; i++) {
1393 
1394 			field = dtuple_get_nth_field(dtuple, i);
1395 
1396 			if (dfield_is_null(field)) {
1397 				len = dtype_get_sql_null_size(
1398 					dfield_get_type(field), 0);
1399 				data_write_sql_null(rec + end_offset, len);
1400 
1401 				end_offset += len;
1402 				ored_offset = end_offset
1403 					| REC_2BYTE_SQL_NULL_MASK;
1404 			} else {
1405 				/* If the data is not SQL null, store it */
1406 				len = dfield_get_len(field);
1407 
1408 				memcpy(rec + end_offset,
1409 				       dfield_get_data(field), len);
1410 
1411 				end_offset += len;
1412 				ored_offset = end_offset;
1413 
1414 				if (dfield_is_ext(field)) {
1415 					ored_offset |= REC_2BYTE_EXTERN_MASK;
1416 				}
1417 			}
1418 
1419 			rec_2_set_field_end_info(rec, i, ored_offset);
1420 		}
1421 	}
1422 
1423 	return(rec);
1424 }
1425 
1426 /** Convert a data tuple into a ROW_FORMAT=COMPACT record.
1427 @param[out]	rec		converted record
1428 @param[in]	index		index
1429 @param[in]	fields		data fields to convert
1430 @param[in]	n_fields	number of data fields
1431 @param[in]	status		rec_get_status(rec)
1432 @param[in]	temp		whether to use the format for temporary files
1433 				in index creation */
1434 template<bool redundant_temp>
1435 static inline
1436 void
rec_convert_dtuple_to_rec_comp(rec_t * rec,const dict_index_t * index,const dfield_t * fields,ulint n_fields,rec_comp_status_t status,bool temp)1437 rec_convert_dtuple_to_rec_comp(
1438 	rec_t*			rec,
1439 	const dict_index_t*	index,
1440 	const dfield_t*		fields,
1441 	ulint			n_fields,
1442 	rec_comp_status_t	status,
1443 	bool			temp)
1444 {
1445 	const dfield_t*	field;
1446 	const dtype_t*	type;
1447 	byte*		end;
1448 	byte*		nulls = temp
1449 		? rec - 1 : rec - (REC_N_NEW_EXTRA_BYTES + 1);
1450 	byte*		UNINIT_VAR(lens);
1451 	ulint		len;
1452 	ulint		i;
1453 	ulint		UNINIT_VAR(n_node_ptr_field);
1454 	ulint		fixed_len;
1455 	ulint		null_mask	= 1;
1456 	const ulint	n_core_fields = redundant_temp
1457 			? row_log_get_n_core_fields(index)
1458 			: index->n_core_fields;
1459 	ut_ad(n_fields > 0);
1460 	ut_ad(temp || dict_table_is_comp(index->table));
1461 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
1462 
1463 	ut_d(ulint n_null = index->n_nullable);
1464 
1465 	switch (status) {
1466 	case REC_STATUS_COLUMNS_ADDED:
1467 		if (!redundant_temp) { ut_ad(index->is_instant()); }
1468 		ut_ad(n_fields > n_core_fields);
1469 		rec_set_n_add_field(nulls, n_fields - 1 - n_core_fields);
1470 		/* fall through */
1471 	case REC_STATUS_ORDINARY:
1472 		ut_ad(n_fields <= dict_index_get_n_fields(index));
1473 		if (!temp) {
1474 			rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
1475 			rec_set_status(rec, n_fields == n_core_fields
1476 				       ? REC_STATUS_ORDINARY
1477 				       : REC_STATUS_COLUMNS_ADDED);
1478 		} if (dict_table_is_comp(index->table)) {
1479 			/* No need to do adjust fixed_len=0. We only
1480 			need to adjust it for ROW_FORMAT=REDUNDANT. */
1481 			temp = false;
1482 		}
1483 
1484 		n_node_ptr_field = ULINT_UNDEFINED;
1485 		lens = nulls - (index->is_instant()
1486 				? UT_BITS_IN_BYTES(index->get_n_nullable(
1487 							   n_fields))
1488 				: UT_BITS_IN_BYTES(
1489 					unsigned(index->n_nullable)));
1490 		break;
1491 	case REC_STATUS_NODE_PTR:
1492 		ut_ad(!temp);
1493 		rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
1494 		rec_set_status(rec, status);
1495 		ut_ad(n_fields
1496 		      == dict_index_get_n_unique_in_tree_nonleaf(index) + 1);
1497 		ut_d(n_null = std::min<unsigned>(index->n_core_null_bytes * 8U,
1498 						 index->n_nullable));
1499 		n_node_ptr_field = n_fields - 1;
1500 		lens = nulls - index->n_core_null_bytes;
1501 		break;
1502 	case REC_STATUS_INFIMUM:
1503 	case REC_STATUS_SUPREMUM:
1504 		ut_error;
1505 		return;
1506 	}
1507 
1508 	end = rec;
1509 	/* clear the SQL-null flags */
1510 	memset(lens + 1, 0, ulint(nulls - lens));
1511 
1512 	/* Store the data and the offsets */
1513 
1514 	for (i = 0; i < n_fields; i++) {
1515 		const dict_field_t*	ifield;
1516 		dict_col_t*		col = NULL;
1517 
1518 		field = &fields[i];
1519 
1520 		type = dfield_get_type(field);
1521 		len = dfield_get_len(field);
1522 
1523 		if (UNIV_UNLIKELY(i == n_node_ptr_field)) {
1524 			ut_ad(dtype_get_prtype(type) & DATA_NOT_NULL);
1525 			ut_ad(len == REC_NODE_PTR_SIZE);
1526 			memcpy(end, dfield_get_data(field), len);
1527 			end += REC_NODE_PTR_SIZE;
1528 			break;
1529 		}
1530 
1531 		if (!(dtype_get_prtype(type) & DATA_NOT_NULL)) {
1532 			/* nullable field */
1533 			ut_ad(n_null--);
1534 
1535 			if (UNIV_UNLIKELY(!(byte) null_mask)) {
1536 				nulls--;
1537 				null_mask = 1;
1538 			}
1539 
1540 			ut_ad(*nulls < null_mask);
1541 
1542 			/* set the null flag if necessary */
1543 			if (dfield_is_null(field)) {
1544 				*nulls |= null_mask;
1545 				null_mask <<= 1;
1546 				continue;
1547 			}
1548 
1549 			null_mask <<= 1;
1550 		}
1551 		/* only nullable fields can be null */
1552 		ut_ad(!dfield_is_null(field));
1553 
1554 		ifield = dict_index_get_nth_field(index, i);
1555 		fixed_len = ifield->fixed_len;
1556 		col = ifield->col;
1557 		if (temp && fixed_len
1558 		    && !dict_col_get_fixed_size(col, temp)) {
1559 			fixed_len = 0;
1560 		}
1561 
1562 		/* If the maximum length of a variable-length field
1563 		is up to 255 bytes, the actual length is always stored
1564 		in one byte. If the maximum length is more than 255
1565 		bytes, the actual length is stored in one byte for
1566 		0..127.  The length will be encoded in two bytes when
1567 		it is 128 or more, or when the field is stored externally. */
1568 		if (fixed_len) {
1569 			ut_ad(len <= fixed_len);
1570 			ut_ad(!col->mbmaxlen
1571 			      || len >= col->mbminlen
1572 			      * fixed_len / col->mbmaxlen);
1573 			ut_ad(!dfield_is_ext(field));
1574 		} else if (dfield_is_ext(field)) {
1575 			ut_ad(DATA_BIG_COL(col));
1576 			ut_ad(len <= REC_ANTELOPE_MAX_INDEX_COL_LEN
1577 			      + BTR_EXTERN_FIELD_REF_SIZE);
1578 			*lens-- = (byte) (len >> 8) | 0xc0;
1579 			*lens-- = (byte) len;
1580 		} else {
1581 			ut_ad(len <= dtype_get_len(type)
1582 			      || DATA_LARGE_MTYPE(dtype_get_mtype(type))
1583 			      || !strcmp(index->name,
1584 					 FTS_INDEX_TABLE_IND_NAME));
1585 			if (len < 128 || !DATA_BIG_LEN_MTYPE(
1586 				dtype_get_len(type), dtype_get_mtype(type))) {
1587 
1588 				*lens-- = (byte) len;
1589 			} else {
1590 				ut_ad(len < 16384);
1591 				*lens-- = (byte) (len >> 8) | 0x80;
1592 				*lens-- = (byte) len;
1593 			}
1594 		}
1595 
1596 		if (len) {
1597 			memcpy(end, dfield_get_data(field), len);
1598 			end += len;
1599 		}
1600 	}
1601 }
1602 
1603 /*********************************************************//**
1604 Builds a new-style physical record out of a data tuple and
1605 stores it beginning from the start of the given buffer.
1606 @return pointer to the origin of physical record */
1607 static
1608 rec_t*
rec_convert_dtuple_to_rec_new(byte * buf,const dict_index_t * index,const dtuple_t * dtuple)1609 rec_convert_dtuple_to_rec_new(
1610 /*==========================*/
1611 	byte*			buf,	/*!< in: start address of
1612 					the physical record */
1613 	const dict_index_t*	index,	/*!< in: record descriptor */
1614 	const dtuple_t*		dtuple)	/*!< in: data tuple */
1615 {
1616 	ut_ad(!(dtuple->info_bits
1617 		& ~(REC_NEW_STATUS_MASK | REC_INFO_DELETED_FLAG
1618 		    | REC_INFO_MIN_REC_FLAG)));
1619 	rec_comp_status_t status = static_cast<rec_comp_status_t>(
1620 		dtuple->info_bits & REC_NEW_STATUS_MASK);
1621 	if (status == REC_STATUS_ORDINARY
1622 	    && dtuple->n_fields > index->n_core_fields) {
1623 		ut_ad(index->is_instant());
1624 		status = REC_STATUS_COLUMNS_ADDED;
1625 	}
1626 
1627 	ulint	extra_size;
1628 
1629 	rec_get_converted_size_comp(
1630 		index, status, dtuple->fields, dtuple->n_fields, &extra_size);
1631 	rec_t* rec = buf + extra_size;
1632 
1633 	rec_convert_dtuple_to_rec_comp<false>(
1634 		rec, index, dtuple->fields, dtuple->n_fields, status, false);
1635 	rec_set_info_bits_new(rec, dtuple->info_bits & ~REC_NEW_STATUS_MASK);
1636 	return(rec);
1637 }
1638 
1639 /*********************************************************//**
1640 Builds a physical record out of a data tuple and
1641 stores it beginning from the start of the given buffer.
1642 @return pointer to the origin of physical record */
1643 rec_t*
rec_convert_dtuple_to_rec(byte * buf,const dict_index_t * index,const dtuple_t * dtuple,ulint n_ext)1644 rec_convert_dtuple_to_rec(
1645 /*======================*/
1646 	byte*			buf,	/*!< in: start address of the
1647 					physical record */
1648 	const dict_index_t*	index,	/*!< in: record descriptor */
1649 	const dtuple_t*		dtuple,	/*!< in: data tuple */
1650 	ulint			n_ext)	/*!< in: number of
1651 					externally stored columns */
1652 {
1653 	rec_t*	rec;
1654 
1655 	ut_ad(buf != NULL);
1656 	ut_ad(index != NULL);
1657 	ut_ad(dtuple != NULL);
1658 	ut_ad(dtuple_validate(dtuple));
1659 	ut_ad(dtuple_check_typed(dtuple));
1660 
1661 	if (dict_table_is_comp(index->table)) {
1662 		rec = rec_convert_dtuple_to_rec_new(buf, index, dtuple);
1663 	} else {
1664 		rec = rec_convert_dtuple_to_rec_old(buf, dtuple, n_ext);
1665 	}
1666 
1667 	return(rec);
1668 }
1669 
1670 /** Determine the size of a data tuple prefix in a temporary file.
1671 @param[in]	index		clustered or secondary index
1672 @param[in]	fields		data fields
1673 @param[in]	n_fields	number of data fields
1674 @param[out]	extra		record header size
1675 @param[in]	status		REC_STATUS_ORDINARY or REC_STATUS_COLUMNS_ADDED
1676 @return	total size, in bytes */
1677 template<bool redundant_temp>
1678 ulint
rec_get_converted_size_temp(const dict_index_t * index,const dfield_t * fields,ulint n_fields,ulint * extra,rec_comp_status_t status)1679 rec_get_converted_size_temp(
1680 	const dict_index_t*	index,
1681 	const dfield_t*		fields,
1682 	ulint			n_fields,
1683 	ulint*			extra,
1684 	rec_comp_status_t	status)
1685 {
1686 	return rec_get_converted_size_comp_prefix_low<redundant_temp>(
1687 		index, fields, n_fields, extra, status, true);
1688 }
1689 
1690 template ulint rec_get_converted_size_temp<false>(
1691 	const dict_index_t*, const dfield_t*, ulint, ulint*,
1692 	rec_comp_status_t);
1693 
1694 template ulint rec_get_converted_size_temp<true>(
1695 	const dict_index_t*, const dfield_t*, ulint, ulint*,
1696 	rec_comp_status_t);
1697 
1698 /** Determine the offset to each field in temporary file.
1699 @param[in]	rec	temporary file record
1700 @param[in]	index	index of that the record belongs to
1701 @param[in,out]	offsets	offsets to the fields; in: rec_offs_n_fields(offsets)
1702 @param[in]	n_core	number of core fields (index->n_core_fields)
1703 @param[in]	def_val	default values for non-core fields
1704 @param[in]	status	REC_STATUS_ORDINARY or REC_STATUS_COLUMNS_ADDED */
1705 void
rec_init_offsets_temp(const rec_t * rec,const dict_index_t * index,rec_offs * offsets,ulint n_core,const dict_col_t::def_t * def_val,rec_comp_status_t status)1706 rec_init_offsets_temp(
1707 	const rec_t*		rec,
1708 	const dict_index_t*	index,
1709 	rec_offs*		offsets,
1710 	ulint			n_core,
1711 	const dict_col_t::def_t*def_val,
1712 	rec_comp_status_t	status)
1713 {
1714 	ut_ad(status == REC_STATUS_ORDINARY
1715 	      || status == REC_STATUS_COLUMNS_ADDED);
1716 	/* The table may have been converted to plain format
1717 	if it was emptied during an ALTER TABLE operation. */
1718 	ut_ad(index->n_core_fields == n_core || !index->is_instant());
1719 	ut_ad(index->n_core_fields >= n_core);
1720 	rec_init_offsets_comp_ordinary(rec, index, offsets, n_core, def_val,
1721 				       status == REC_STATUS_COLUMNS_ADDED
1722 				       ? REC_LEAF_TEMP_COLUMNS_ADDED
1723 				       : REC_LEAF_TEMP);
1724 }
1725 
1726 /** Determine the offset to each field in temporary file.
1727 @param[in]	rec	temporary file record
1728 @param[in]	index	index of that the record belongs to
1729 @param[in,out]	offsets	offsets to the fields; in: rec_offs_n_fields(offsets)
1730 */
1731 void
rec_init_offsets_temp(const rec_t * rec,const dict_index_t * index,rec_offs * offsets)1732 rec_init_offsets_temp(
1733 	const rec_t*		rec,
1734 	const dict_index_t*	index,
1735 	rec_offs*		offsets)
1736 {
1737 	ut_ad(!index->is_instant());
1738 	rec_init_offsets_comp_ordinary(rec, index, offsets,
1739 				       index->n_core_fields, NULL,
1740 				       REC_LEAF_TEMP);
1741 }
1742 
1743 /** Convert a data tuple prefix to the temporary file format.
1744 @param[out]	rec		record in temporary file format
1745 @param[in]	index		clustered or secondary index
1746 @param[in]	fields		data fields
1747 @param[in]	n_fields	number of data fields
1748 @param[in]	status		REC_STATUS_ORDINARY or REC_STATUS_COLUMNS_ADDED
1749 */
1750 template<bool redundant_temp>
1751 void
rec_convert_dtuple_to_temp(rec_t * rec,const dict_index_t * index,const dfield_t * fields,ulint n_fields,rec_comp_status_t status)1752 rec_convert_dtuple_to_temp(
1753 	rec_t*			rec,
1754 	const dict_index_t*	index,
1755 	const dfield_t*		fields,
1756 	ulint			n_fields,
1757 	rec_comp_status_t	status)
1758 {
1759 	rec_convert_dtuple_to_rec_comp<redundant_temp>(
1760 		rec, index, fields, n_fields, status, true);
1761 }
1762 
1763 template void rec_convert_dtuple_to_temp<false>(
1764 	rec_t*, const dict_index_t*, const dfield_t*,
1765 	ulint, rec_comp_status_t);
1766 
1767 template void rec_convert_dtuple_to_temp<true>(
1768 	rec_t*, const dict_index_t*, const dfield_t*,
1769 	ulint, rec_comp_status_t);
1770 
1771 /** Copy the first n fields of a (copy of a) physical record to a data tuple.
1772 The fields are copied into the memory heap.
1773 @param[out]	tuple		data tuple
1774 @param[in]	rec		index record, or a copy thereof
1775 @param[in]	index		index of rec
1776 @param[in]	n_core		index->n_core_fields at the time rec was
1777 				copied, or 0 if non-leaf page record
1778 @param[in]	n_fields	number of fields to copy
1779 @param[in,out]	heap		memory heap */
1780 void
rec_copy_prefix_to_dtuple(dtuple_t * tuple,const rec_t * rec,const dict_index_t * index,ulint n_core,ulint n_fields,mem_heap_t * heap)1781 rec_copy_prefix_to_dtuple(
1782 	dtuple_t*		tuple,
1783 	const rec_t*		rec,
1784 	const dict_index_t*	index,
1785 	ulint			n_core,
1786 	ulint			n_fields,
1787 	mem_heap_t*		heap)
1788 {
1789 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1790 	rec_offs*	offsets	= offsets_;
1791 	rec_offs_init(offsets_);
1792 
1793 	ut_ad(n_core <= index->n_core_fields);
1794 	ut_ad(n_core || n_fields
1795 	      <= dict_index_get_n_unique_in_tree_nonleaf(index) + 1);
1796 
1797 	offsets = rec_get_offsets(rec, index, offsets, n_core,
1798 				  n_fields, &heap);
1799 
1800 	ut_ad(rec_validate(rec, offsets));
1801 	ut_ad(!rec_offs_any_default(offsets));
1802 	ut_ad(dtuple_check_typed(tuple));
1803 
1804 	tuple->info_bits = rec_get_info_bits(rec, rec_offs_comp(offsets));
1805 
1806 	for (ulint i = 0; i < n_fields; i++) {
1807 		dfield_t*	field;
1808 		const byte*	data;
1809 		ulint		len;
1810 
1811 		field = dtuple_get_nth_field(tuple, i);
1812 		data = rec_get_nth_field(rec, offsets, i, &len);
1813 
1814 		if (len != UNIV_SQL_NULL) {
1815 			dfield_set_data(field,
1816 					mem_heap_dup(heap, data, len), len);
1817 			ut_ad(!rec_offs_nth_extern(offsets, i));
1818 		} else {
1819 			dfield_set_null(field);
1820 		}
1821 	}
1822 }
1823 
1824 /**************************************************************//**
1825 Copies the first n fields of an old-style physical record
1826 to a new physical record in a buffer.
1827 @return own: copied record */
1828 static
1829 rec_t*
rec_copy_prefix_to_buf_old(const rec_t * rec,ulint n_fields,ulint area_end,byte ** buf,ulint * buf_size)1830 rec_copy_prefix_to_buf_old(
1831 /*=======================*/
1832 	const rec_t*	rec,		/*!< in: physical record */
1833 	ulint		n_fields,	/*!< in: number of fields to copy */
1834 	ulint		area_end,	/*!< in: end of the prefix data */
1835 	byte**		buf,		/*!< in/out: memory buffer for
1836 					the copied prefix, or NULL */
1837 	ulint*		buf_size)	/*!< in/out: buffer size */
1838 {
1839 	rec_t*	copy_rec;
1840 	ulint	area_start;
1841 	ulint	prefix_len;
1842 
1843 	if (rec_get_1byte_offs_flag(rec)) {
1844 		area_start = REC_N_OLD_EXTRA_BYTES + n_fields;
1845 	} else {
1846 		area_start = REC_N_OLD_EXTRA_BYTES + 2 * n_fields;
1847 	}
1848 
1849 	prefix_len = area_start + area_end;
1850 
1851 	if ((*buf == NULL) || (*buf_size < prefix_len)) {
1852 		ut_free(*buf);
1853 		*buf_size = prefix_len;
1854 		*buf = static_cast<byte*>(ut_malloc_nokey(prefix_len));
1855 	}
1856 
1857 	ut_memcpy(*buf, rec - area_start, prefix_len);
1858 
1859 	copy_rec = *buf + area_start;
1860 
1861 	rec_set_n_fields_old(copy_rec, n_fields);
1862 
1863 	return(copy_rec);
1864 }
1865 
/**************************************************************//**
Copies the first n fields of a physical record to a new physical record in
a buffer.
The buffer *buf is reused when it is non-NULL and *buf_size is large
enough; otherwise it is released with ut_free(), replaced by a new
ut_malloc_nokey() allocation, and *buf_size is updated.
For tables that are not in the compact format, the work is delegated to
rec_copy_prefix_to_buf_old(). For a REC_STATUS_COLUMNS_ADDED record, the
header of the copy is rewritten so that its status is REC_STATUS_ORDINARY.
@return own: copied record */
rec_t*
rec_copy_prefix_to_buf(
/*===================*/
	const rec_t*		rec,		/*!< in: physical record */
	const dict_index_t*	index,		/*!< in: record descriptor */
	ulint			n_fields,	/*!< in: number of fields
						to copy */
	byte**			buf,		/*!< in/out: memory buffer
						for the copied prefix,
						or NULL */
	ulint*			buf_size)	/*!< in/out: buffer size */
{
	ut_ad(n_fields <= index->n_fields || dict_index_is_ibuf(index));
	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
	UNIV_PREFETCH_RW(*buf);

	if (!dict_table_is_comp(index->table)) {
		ut_ad(rec_validate_old(rec));
		return(rec_copy_prefix_to_buf_old(
			       rec, n_fields,
			       rec_get_field_start_offs(rec, n_fields),
			       buf, buf_size));
	}

	/* Number of data bytes (after the record origin) to copy */
	ulint		prefix_len	= 0;
	/* Number of header bytes to leave out of the copy; nonzero only
	for REC_STATUS_COLUMNS_ADDED records */
	ulint		instant_omit	= 0;
	/* The header is read downwards from the record origin: first the
	byte immediately preceding the REC_N_NEW_EXTRA_BYTES fixed header
	(null flags), then, index->n_core_null_bytes lower, the lengths
	of the variable-length fields. */
	const byte*	nulls		= rec - (REC_N_NEW_EXTRA_BYTES + 1);
	const byte*	nullf		= nulls;
	const byte*	lens		= nulls - index->n_core_null_bytes;

	switch (rec_get_status(rec)) {
	default:
		/* infimum or supremum record: no sense to copy anything */
		ut_error;
		return(NULL);
	case REC_STATUS_ORDINARY:
		ut_ad(n_fields <= index->n_core_fields);
		break;
	case REC_STATUS_NODE_PTR:
		/* For R-tree, we need to copy the child page number field. */
		compile_time_assert(DICT_INDEX_SPATIAL_NODEPTR_SIZE == 1);
		if (dict_index_is_spatial(index)) {
			ut_ad(index->n_core_null_bytes == 0);
			ut_ad(n_fields == DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1);
			ut_ad(index->fields[0].col->prtype & DATA_NOT_NULL);
			ut_ad(DATA_BIG_COL(index->fields[0].col));
			/* This is a deficiency of the format introduced
			in MySQL 5.7. The length in the R-tree index should
			always be DATA_MBR_LEN. */
			ut_ad(!index->fields[0].fixed_len);
			ut_ad(*lens == DATA_MBR_LEN);
			lens--;
			prefix_len = DATA_MBR_LEN + REC_NODE_PTR_SIZE;
			n_fields = 0; /* skip the "for" loop below */
			break;
		}
		/* it doesn't make sense to copy the child page number field */
		ut_ad(n_fields
		      <= dict_index_get_n_unique_in_tree_nonleaf(index));
		break;
	case REC_STATUS_COLUMNS_ADDED:
		/* We would have !index->is_instant() when rolling back
		an instant ADD COLUMN operation. */
		ut_ad(index->is_instant() || page_rec_is_metadata(rec));
		nulls++;
		/* NOTE(review): rec_get_n_add_field() presumably moves
		"nulls" down past the 1- or 2-byte count of added fields;
		the assertion on instant_omit below relies on that. */
		const ulint n_rec = ulint(index->n_core_fields) + 1
			+ rec_get_n_add_field(nulls);
		instant_omit = ulint(&rec[-REC_N_NEW_EXTRA_BYTES] - nulls);
		ut_ad(instant_omit == 1 || instant_omit == 2);
		nullf = nulls;
		/* Also omit the null flag bytes that exceed
		index->n_core_null_bytes. */
		const uint nb = UT_BITS_IN_BYTES(index->get_n_nullable(n_rec));
		instant_omit += nb - index->n_core_null_bytes;
		lens = --nulls - nb;
	}

	/* Remember where the field lengths start, for the copy below */
	const byte* const lenf = lens;
	UNIV_PREFETCH_R(lens);

	/* read the lengths of fields 0..n */
	for (ulint i = 0, null_mask = 1; i < n_fields; i++) {
		const dict_field_t*	field;
		const dict_col_t*	col;

		field = dict_index_get_nth_field(index, i);
		col = dict_field_get_col(field);

		if (!(col->prtype & DATA_NOT_NULL)) {
			/* nullable field => read the null flag */
			if (UNIV_UNLIKELY(!(byte) null_mask)) {
				/* all 8 bits of this byte were consumed;
				move on to the next null flag byte */
				nulls--;
				null_mask = 1;
			}

			if (*nulls & null_mask) {
				/* a NULL field contributes neither data
				bytes nor a length byte */
				null_mask <<= 1;
				continue;
			}

			null_mask <<= 1;
		}

		if (field->fixed_len) {
			prefix_len += field->fixed_len;
		} else {
			ulint	len = *lens--;
			/* If the maximum length of the column is up
			to 255 bytes, the actual length is always
			stored in one byte. If the maximum length is
			more than 255 bytes, the actual length is
			stored in one byte for 0..127.  The length
			will be encoded in two bytes when it is 128 or
			more, or when the column is stored externally. */
			if (DATA_BIG_COL(col)) {
				if (len & 0x80) {
					/* 1exxxxxx */
					len &= 0x3f;
					len <<= 8;
					len |= *lens--;
					UNIV_PREFETCH_R(lens);
				}
			}
			prefix_len += len;
		}
	}

	UNIV_PREFETCH_R(rec + prefix_len);

	/* Total size of the copy: the data prefix, plus all header bytes
	from (lens + 1) up to the record origin, minus the omitted bytes */
	ulint size = prefix_len + ulint(rec - (lens + 1)) - instant_omit;

	if (*buf == NULL || *buf_size < size) {
		ut_free(*buf);
		*buf_size = size;
		*buf = static_cast<byte*>(ut_malloc_nokey(size));
	}

	if (instant_omit) {
		/* Copy and convert the record header to a format where
		instant ADD COLUMN has not been used:
		+ lengths of variable-length fields in the prefix
		- omit any null flag bytes for any instantly added columns
		+ index->n_core_null_bytes of null flags
		- omit the n_add_fields header (1 or 2 bytes)
		+ REC_N_NEW_EXTRA_BYTES of fixed header */
		byte* b = *buf;
		/* copy the lengths of the variable-length fields */
		memcpy(b, lens + 1, ulint(lenf - lens));
		b += ulint(lenf - lens);
		/* copy the null flags */
		memcpy(b, nullf - index->n_core_null_bytes,
		       index->n_core_null_bytes);
		/* after this, b points to the origin of the copied record */
		b += index->n_core_null_bytes + REC_N_NEW_EXTRA_BYTES;
		ut_ad(ulint(b - *buf) + prefix_len == size);
		/* copy the fixed-size header and the record prefix */
		memcpy(b - REC_N_NEW_EXTRA_BYTES, rec - REC_N_NEW_EXTRA_BYTES,
		       prefix_len + REC_N_NEW_EXTRA_BYTES);
		ut_ad(rec_get_status(b) == REC_STATUS_COLUMNS_ADDED);
		rec_set_status(b, REC_STATUS_ORDINARY);
		return b;
	} else {
		/* header and data prefix are contiguous: copy in one go */
		memcpy(*buf, lens + 1, size);
		return *buf + (rec - (lens + 1));
	}
}
2033 
2034 /***************************************************************//**
2035 Validates the consistency of an old-style physical record.
2036 @return TRUE if ok */
2037 static
2038 ibool
rec_validate_old(const rec_t * rec)2039 rec_validate_old(
2040 /*=============*/
2041 	const rec_t*	rec)	/*!< in: physical record */
2042 {
2043 	ulint		len;
2044 	ulint		n_fields;
2045 	ulint		len_sum		= 0;
2046 	ulint		i;
2047 
2048 	ut_a(rec);
2049 	n_fields = rec_get_n_fields_old(rec);
2050 
2051 	if ((n_fields == 0) || (n_fields > REC_MAX_N_FIELDS)) {
2052 		ib::error() << "Record has " << n_fields << " fields";
2053 		return(FALSE);
2054 	}
2055 
2056 	for (i = 0; i < n_fields; i++) {
2057 		rec_get_nth_field_offs_old(rec, i, &len);
2058 
2059 		if (!((len < srv_page_size) || (len == UNIV_SQL_NULL))) {
2060 			ib::error() << "Record field " << i << " len " << len;
2061 			return(FALSE);
2062 		}
2063 
2064 		if (len != UNIV_SQL_NULL) {
2065 			len_sum += len;
2066 		} else {
2067 			len_sum += rec_get_nth_field_size(rec, i);
2068 		}
2069 	}
2070 
2071 	if (len_sum != rec_get_data_size_old(rec)) {
2072 		ib::error() << "Record len should be " << len_sum << ", len "
2073 			<< rec_get_data_size_old(rec);
2074 		return(FALSE);
2075 	}
2076 
2077 	return(TRUE);
2078 }
2079 
2080 /***************************************************************//**
2081 Validates the consistency of a physical record.
2082 @return TRUE if ok */
2083 ibool
rec_validate(const rec_t * rec,const rec_offs * offsets)2084 rec_validate(
2085 /*=========*/
2086 	const rec_t*	rec,	/*!< in: physical record */
2087 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
2088 {
2089 	ulint		len;
2090 	ulint		n_fields;
2091 	ulint		len_sum		= 0;
2092 	ulint		i;
2093 
2094 	n_fields = rec_offs_n_fields(offsets);
2095 
2096 	if ((n_fields == 0) || (n_fields > REC_MAX_N_FIELDS)) {
2097 		ib::error() << "Record has " << n_fields << " fields";
2098 		return(FALSE);
2099 	}
2100 
2101 	ut_a(rec_offs_any_flag(offsets, REC_OFFS_COMPACT | REC_OFFS_DEFAULT)
2102 	     || n_fields <= rec_get_n_fields_old(rec));
2103 
2104 	for (i = 0; i < n_fields; i++) {
2105 		rec_get_nth_field_offs(offsets, i, &len);
2106 
2107 		switch (len) {
2108 		default:
2109 			if (len >= srv_page_size) {
2110 				ib::error() << "Record field " << i
2111 					<< " len " << len;
2112 				return(FALSE);
2113 			}
2114 			len_sum += len;
2115 			break;
2116 		case UNIV_SQL_DEFAULT:
2117 			break;
2118 		case UNIV_SQL_NULL:
2119 			if (!rec_offs_comp(offsets)) {
2120 				len_sum += rec_get_nth_field_size(rec, i);
2121 			}
2122 		}
2123 	}
2124 
2125 	if (len_sum != rec_offs_data_size(offsets)) {
2126 		ib::error() << "Record len should be " << len_sum << ", len "
2127 			<< rec_offs_data_size(offsets);
2128 		return(FALSE);
2129 	}
2130 
2131 	if (!rec_offs_comp(offsets)) {
2132 		ut_a(rec_validate_old(rec));
2133 	}
2134 
2135 	return(TRUE);
2136 }
2137 
2138 /***************************************************************//**
2139 Prints an old-style physical record. */
2140 void
rec_print_old(FILE * file,const rec_t * rec)2141 rec_print_old(
2142 /*==========*/
2143 	FILE*		file,	/*!< in: file where to print */
2144 	const rec_t*	rec)	/*!< in: physical record */
2145 {
2146 	const byte*	data;
2147 	ulint		len;
2148 	ulint		n;
2149 	ulint		i;
2150 
2151 	n = rec_get_n_fields_old(rec);
2152 
2153 	fprintf(file, "PHYSICAL RECORD: n_fields " ULINTPF ";"
2154 		" %u-byte offsets; info bits " ULINTPF "\n",
2155 		n,
2156 		rec_get_1byte_offs_flag(rec) ? 1 : 2,
2157 		rec_get_info_bits(rec, FALSE));
2158 
2159 	for (i = 0; i < n; i++) {
2160 
2161 		data = rec_get_nth_field_old(rec, i, &len);
2162 
2163 		fprintf(file, " " ULINTPF ":", i);
2164 
2165 		if (len != UNIV_SQL_NULL) {
2166 			if (len <= 30) {
2167 
2168 				ut_print_buf(file, data, len);
2169 			} else {
2170 				ut_print_buf(file, data, 30);
2171 
2172 				fprintf(file, " (total " ULINTPF " bytes)",
2173 					len);
2174 			}
2175 		} else {
2176 			fprintf(file, " SQL NULL, size " ULINTPF " ",
2177 				rec_get_nth_field_size(rec, i));
2178 		}
2179 
2180 		putc(';', file);
2181 		putc('\n', file);
2182 	}
2183 
2184 	rec_validate_old(rec);
2185 }
2186 
2187 /***************************************************************//**
2188 Prints a physical record in ROW_FORMAT=COMPACT.  Ignores the
2189 record header. */
2190 static
2191 void
rec_print_comp(FILE * file,const rec_t * rec,const rec_offs * offsets)2192 rec_print_comp(
2193 /*===========*/
2194 	FILE*		file,	/*!< in: file where to print */
2195 	const rec_t*	rec,	/*!< in: physical record */
2196 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
2197 {
2198 	ulint	i;
2199 
2200 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
2201           const byte*	UNINIT_VAR(data);
2202 		ulint		len;
2203 
2204 		if (rec_offs_nth_default(offsets, i)) {
2205 			len = UNIV_SQL_DEFAULT;
2206 		} else {
2207 			data = rec_get_nth_field(rec, offsets, i, &len);
2208 		}
2209 
2210 		fprintf(file, " " ULINTPF ":", i);
2211 
2212 		if (len == UNIV_SQL_NULL) {
2213 			fputs(" SQL NULL", file);
2214 		} else if (len == UNIV_SQL_DEFAULT) {
2215 			fputs(" SQL DEFAULT", file);
2216 		} else {
2217 			if (len <= 30) {
2218 
2219 				ut_print_buf(file, data, len);
2220 			} else if (rec_offs_nth_extern(offsets, i)) {
2221 				ut_print_buf(file, data, 30);
2222 				fprintf(file,
2223 					" (total " ULINTPF " bytes, external)",
2224 					len);
2225 				ut_print_buf(file, data + len
2226 					     - BTR_EXTERN_FIELD_REF_SIZE,
2227 					     BTR_EXTERN_FIELD_REF_SIZE);
2228 			} else {
2229 				ut_print_buf(file, data, 30);
2230 
2231 				fprintf(file, " (total " ULINTPF " bytes)",
2232 					len);
2233 			}
2234 		}
2235 		putc(';', file);
2236 		putc('\n', file);
2237 	}
2238 }
2239 
2240 /***************************************************************//**
2241 Prints an old-style spatial index record. */
2242 static
2243 void
rec_print_mbr_old(FILE * file,const rec_t * rec)2244 rec_print_mbr_old(
2245 /*==============*/
2246 	FILE*		file,	/*!< in: file where to print */
2247 	const rec_t*	rec)	/*!< in: physical record */
2248 {
2249 	const byte*	data;
2250 	ulint		len;
2251 	ulint		n;
2252 	ulint		i;
2253 
2254 	ut_ad(rec);
2255 
2256 	n = rec_get_n_fields_old(rec);
2257 
2258 	fprintf(file, "PHYSICAL RECORD: n_fields %lu;"
2259 		" %u-byte offsets; info bits %lu\n",
2260 		(ulong) n,
2261 		rec_get_1byte_offs_flag(rec) ? 1 : 2,
2262 		(ulong) rec_get_info_bits(rec, FALSE));
2263 
2264 	for (i = 0; i < n; i++) {
2265 
2266 		data = rec_get_nth_field_old(rec, i, &len);
2267 
2268 		fprintf(file, " %lu:", (ulong) i);
2269 
2270 		if (len != UNIV_SQL_NULL) {
2271 			if (i == 0) {
2272 				fprintf(file, " MBR:");
2273 				for (; len > 0; len -= sizeof(double)) {
2274 					double	d = mach_double_read(data);
2275 
2276 					if (len != sizeof(double)) {
2277 						fprintf(file, "%.2lf,", d);
2278 					} else {
2279 						fprintf(file, "%.2lf", d);
2280 					}
2281 
2282 					data += sizeof(double);
2283 				}
2284 			} else {
2285 				if (len <= 30) {
2286 
2287 					ut_print_buf(file, data, len);
2288 				} else {
2289 					ut_print_buf(file, data, 30);
2290 
2291 					fprintf(file, " (total %lu bytes)",
2292 						(ulong) len);
2293 				}
2294 			}
2295 		} else {
2296 			fprintf(file, " SQL NULL, size " ULINTPF " ",
2297 				rec_get_nth_field_size(rec, i));
2298 		}
2299 
2300 		putc(';', file);
2301 		putc('\n', file);
2302 	}
2303 
2304 	if (rec_get_deleted_flag(rec, false)) {
2305 		fprintf(file, " Deleted");
2306 	}
2307 
2308 	if (rec_get_info_bits(rec, true) & REC_INFO_MIN_REC_FLAG) {
2309 		fprintf(file, " First rec");
2310 	}
2311 
2312 	rec_validate_old(rec);
2313 }
2314 
2315 /***************************************************************//**
2316 Prints a spatial index record. */
2317 void
rec_print_mbr_rec(FILE * file,const rec_t * rec,const rec_offs * offsets)2318 rec_print_mbr_rec(
2319 /*==============*/
2320 	FILE*		file,	/*!< in: file where to print */
2321 	const rec_t*	rec,	/*!< in: physical record */
2322 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
2323 {
2324 	ut_ad(rec_offs_validate(rec, NULL, offsets));
2325 	ut_ad(!rec_offs_any_default(offsets));
2326 
2327 	if (!rec_offs_comp(offsets)) {
2328 		rec_print_mbr_old(file, rec);
2329 		return;
2330 	}
2331 
2332 	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
2333 		const byte*	data;
2334 		ulint		len;
2335 
2336 		data = rec_get_nth_field(rec, offsets, i, &len);
2337 
2338 		if (i == 0) {
2339 			fprintf(file, " MBR:");
2340 			for (; len > 0; len -= sizeof(double)) {
2341 				double	d = mach_double_read(data);
2342 
2343 				if (len != sizeof(double)) {
2344 					fprintf(file, "%.2lf,", d);
2345 				} else {
2346 					fprintf(file, "%.2lf", d);
2347 				}
2348 
2349 				data += sizeof(double);
2350 			}
2351 		} else {
2352 			fprintf(file, " %lu:", (ulong) i);
2353 
2354 			if (len != UNIV_SQL_NULL) {
2355 				if (len <= 30) {
2356 
2357 					ut_print_buf(file, data, len);
2358 				} else {
2359 					ut_print_buf(file, data, 30);
2360 
2361 					fprintf(file, " (total %lu bytes)",
2362 						(ulong) len);
2363 				}
2364 			} else {
2365 				fputs(" SQL NULL", file);
2366 			}
2367 		}
2368 		putc(';', file);
2369 	}
2370 
2371 	if (rec_get_info_bits(rec, true) & REC_INFO_DELETED_FLAG) {
2372 		fprintf(file, " Deleted");
2373 	}
2374 
2375 	if (rec_get_info_bits(rec, true) & REC_INFO_MIN_REC_FLAG) {
2376 		fprintf(file, " First rec");
2377 	}
2378 
2379 
2380 	rec_validate(rec, offsets);
2381 }
2382 
2383 /***************************************************************//**
2384 Prints a physical record. */
2385 void
rec_print_new(FILE * file,const rec_t * rec,const rec_offs * offsets)2386 rec_print_new(
2387 /*==========*/
2388 	FILE*		file,	/*!< in: file where to print */
2389 	const rec_t*	rec,	/*!< in: physical record */
2390 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
2391 {
2392 	ut_ad(rec_offs_validate(rec, NULL, offsets));
2393 
2394 #ifdef UNIV_DEBUG
2395 	if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))) {
2396 		DBUG_PRINT("info", ("deleted "));
2397 	} else {
2398 		DBUG_PRINT("info", ("not-deleted "));
2399 	}
2400 #endif /* UNIV_DEBUG */
2401 
2402 	if (!rec_offs_comp(offsets)) {
2403 		rec_print_old(file, rec);
2404 		return;
2405 	}
2406 
2407 	fprintf(file, "PHYSICAL RECORD: n_fields " ULINTPF ";"
2408 		" compact format; info bits " ULINTPF "\n",
2409 		rec_offs_n_fields(offsets),
2410 		rec_get_info_bits(rec, TRUE));
2411 
2412 	rec_print_comp(file, rec, offsets);
2413 	rec_validate(rec, offsets);
2414 }
2415 
2416 /***************************************************************//**
2417 Prints a physical record. */
2418 void
rec_print(FILE * file,const rec_t * rec,const dict_index_t * index)2419 rec_print(
2420 /*======*/
2421 	FILE*			file,	/*!< in: file where to print */
2422 	const rec_t*		rec,	/*!< in: physical record */
2423 	const dict_index_t*	index)	/*!< in: record descriptor */
2424 {
2425 	if (!dict_table_is_comp(index->table)) {
2426 		rec_print_old(file, rec);
2427 		return;
2428 	} else {
2429 		mem_heap_t*	heap	= NULL;
2430 		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
2431 		rec_offs_init(offsets_);
2432 
2433 		rec_print_new(file, rec,
2434 			      rec_get_offsets(rec, index, offsets_,
2435 					      page_rec_is_leaf(rec)
2436 					      ? index->n_core_fields : 0,
2437 					      ULINT_UNDEFINED, &heap));
2438 		if (UNIV_LIKELY_NULL(heap)) {
2439 			mem_heap_free(heap);
2440 		}
2441 	}
2442 }
2443 
2444 /** Pretty-print a record.
2445 @param[in,out]	o	output stream
2446 @param[in]	rec	physical record
2447 @param[in]	info	rec_get_info_bits(rec)
2448 @param[in]	offsets	rec_get_offsets(rec) */
2449 void
rec_print(std::ostream & o,const rec_t * rec,ulint info,const rec_offs * offsets)2450 rec_print(
2451 	std::ostream&	o,
2452 	const rec_t*	rec,
2453 	ulint		info,
2454 	const rec_offs*	offsets)
2455 {
2456 	const ulint	comp	= rec_offs_comp(offsets);
2457 	const ulint	n	= rec_offs_n_fields(offsets);
2458 
2459 	ut_ad(rec_offs_validate(rec, NULL, offsets));
2460 
2461 	o << (comp ? "COMPACT RECORD" : "RECORD")
2462 	  << "(info_bits=" << info << ", " << n << " fields): {";
2463 
2464 	for (ulint i = 0; i < n; i++) {
2465 		const byte*	data;
2466 		ulint		len;
2467 
2468 		if (i) {
2469 			o << ',';
2470 		}
2471 
2472 		data = rec_get_nth_field(rec, offsets, i, &len);
2473 
2474 		if (len == UNIV_SQL_DEFAULT) {
2475 			o << "DEFAULT";
2476 			continue;
2477 		}
2478 
2479 		if (len == UNIV_SQL_NULL) {
2480 			o << "NULL";
2481 			continue;
2482 		}
2483 
2484 		if (rec_offs_nth_extern(offsets, i)) {
2485 			ulint	local_len = len - BTR_EXTERN_FIELD_REF_SIZE;
2486 			ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
2487 
2488 			o << '['
2489 			  << local_len
2490 			  << '+' << BTR_EXTERN_FIELD_REF_SIZE << ']';
2491 			ut_print_buf(o, data, local_len);
2492 			ut_print_buf_hex(o, data + local_len,
2493 					 BTR_EXTERN_FIELD_REF_SIZE);
2494 		} else {
2495 			o << '[' << len << ']';
2496 			ut_print_buf(o, data, len);
2497 		}
2498 	}
2499 
2500 	o << "}";
2501 }
2502 
2503 /** Display a record.
2504 @param[in,out]	o	output stream
2505 @param[in]	r	record to display
2506 @return	the output stream */
2507 std::ostream&
operator <<(std::ostream & o,const rec_index_print & r)2508 operator<<(std::ostream& o, const rec_index_print& r)
2509 {
2510 	mem_heap_t*	heap	= NULL;
2511 	rec_offs*	offsets	= rec_get_offsets(
2512 		r.m_rec, r.m_index, NULL, page_rec_is_leaf(r.m_rec)
2513 		? r.m_index->n_core_fields : 0,
2514 		ULINT_UNDEFINED, &heap);
2515 	rec_print(o, r.m_rec,
2516 		  rec_get_info_bits(r.m_rec, rec_offs_comp(offsets)),
2517 		  offsets);
2518 	mem_heap_free(heap);
2519 	return(o);
2520 }
2521 
2522 /** Display a record.
2523 @param[in,out]	o	output stream
2524 @param[in]	r	record to display
2525 @return	the output stream */
2526 std::ostream&
operator <<(std::ostream & o,const rec_offsets_print & r)2527 operator<<(std::ostream& o, const rec_offsets_print& r)
2528 {
2529 	rec_print(o, r.m_rec,
2530 		  rec_get_info_bits(r.m_rec, rec_offs_comp(r.m_offsets)),
2531 		  r.m_offsets);
2532 	return(o);
2533 }
2534 
#ifdef UNIV_DEBUG
/** Read the DB_TRX_ID of a clustered index record.
Only compiled in debug builds.
@param[in]	rec	clustered index record
@param[in]	index	clustered index
@return the value of DB_TRX_ID */
trx_id_t
rec_get_trx_id(
	const rec_t*		rec,
	const dict_index_t*	index)
{
	const ulint	trx_id_pos
		= dict_index_get_sys_col_pos(index, DATA_TRX_ID);

	ut_ad(trx_id_pos <= MAX_REF_PARTS);
	ut_ad(dict_index_is_clust(index));
	ut_ad(trx_id_pos > 0);
	ut_ad(trx_id_pos != ULINT_UNDEFINED);

	/* Offsets are needed only up to and including DB_TRX_ID, so a
	small stack array normally suffices. */
	mem_heap_t*	heap = NULL;
	rec_offs	stack_offsets[REC_OFFS_HEADER_SIZE
				      + MAX_REF_PARTS + 2];
	rec_offs_init(stack_offsets);

	const rec_offs*	offsets = rec_get_offsets(
		rec, index, stack_offsets, index->n_core_fields,
		trx_id_pos + 1, &heap);

	ulint		len;
	const byte*	trx_id_ptr = rec_get_nth_field(rec, offsets,
						       trx_id_pos, &len);

	ut_ad(len == DATA_TRX_ID_LEN);

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(trx_read_trx_id(trx_id_ptr));
}
#endif /* UNIV_DEBUG */
2573 
2574 /** Mark the nth field as externally stored.
2575 @param[in]	offsets		array returned by rec_get_offsets()
2576 @param[in]	n		nth field */
2577 void
rec_offs_make_nth_extern(rec_offs * offsets,const ulint n)2578 rec_offs_make_nth_extern(
2579 	rec_offs*	offsets,
2580 	const ulint	n)
2581 {
2582 	ut_ad(!rec_offs_nth_sql_null(offsets, n));
2583 	set_type(rec_offs_base(offsets)[1 + n], STORED_OFFPAGE);
2584 }
#ifdef WITH_WSREP
# include "ha_prototypes.h"

/** Extract a foreign key value from a physical record into a buffer.
The key fields of index_for are visited in order; for an index that is
not clustered, the last of the dict_index_get_n_unique_in_tree() fields
is left out. For each field, a SQL NULL value is written as a single
byte 1. Otherwise a byte 0 is written first if the corresponding column
of index_ref is nullable, followed by the field value in a form that
depends on new_protocol and on the column type.
@return DB_SUCCESS with *buf_len set to the length of the key, or
DB_ERROR if the key would not fit in *buf_len bytes */
int
wsrep_rec_get_foreign_key(
	byte 		*buf,     /* out: extracted key */
	ulint 		*buf_len, /* in/out: length of buf */
	const rec_t*	rec,	  /* in: physical record */
	dict_index_t*	index_for,  /* in: index in foreign table */
	dict_index_t*	index_ref,  /* in: index in referenced table */
	ibool		new_protocol) /* in: protocol > 1 */
{
	const byte*	data;
	ulint		len;
	ulint		key_len = 0;
	ulint		i;
	uint            key_parts;
	mem_heap_t*	heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	const rec_offs* offsets;

	ut_ad(index_for);
	ut_ad(index_ref);

        rec_offs_init(offsets_);
	offsets = rec_get_offsets(rec, index_for, offsets_,
				  index_for->n_core_fields,
				  ULINT_UNDEFINED, &heap);

	ut_ad(rec_offs_validate(rec, NULL, offsets));

	ut_ad(rec);

	key_parts = dict_index_get_n_unique_in_tree(index_for);
	/* For an index that is not clustered, stop one field early. */
	for (i = 0;
	     i < key_parts &&
	       (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
	     i++) {
		dict_field_t*	  field_f =
			dict_index_get_nth_field(index_for, i);
		const dict_col_t* col_f = dict_field_get_col(field_f);
                dict_field_t*	  field_r =
			dict_index_get_nth_field(index_ref, i);
		const dict_col_t* col_r = dict_field_get_col(field_r);

		ut_ad(!rec_offs_nth_default(offsets, i));
		data = rec_get_nth_field(rec, offsets, i, &len);
		/* Reserve one byte for the NULL indicator in addition
		to the field value. */
		if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
		    *buf_len) {
			fprintf(stderr,
				"WSREP: FK key len exceeded "
				ULINTPF " " ULINTPF " " ULINTPF "\n",
				key_len, len, *buf_len);
			goto err_out;
		}

		if (len == UNIV_SQL_NULL) {
			ut_a(!(col_f->prtype & DATA_NOT_NULL));
			*buf++ = 1;
			key_len++;
		} else if (!new_protocol) {
			if (!(col_r->prtype & DATA_NOT_NULL)) {
				*buf++ = 0;
				key_len++;
			}
			memcpy(buf, data, len);
			/* NOTE(review): in this branch the result of the
			sort function is stored in *buf_len, and neither
			buf nor key_len is advanced by len. This looks
			like legacy behaviour; confirm before changing. */
			*buf_len = wsrep_innobase_mysql_sort(
				(int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
				(uint)dtype_get_charset_coll(col_f->prtype),
				buf, len, *buf_len);
		} else { /* new protocol */
			if (!(col_r->prtype & DATA_NOT_NULL)) {
				*buf++ = 0;
				key_len++;
			}
			switch (col_f->mtype) {
			case DATA_INT: {
				/* Copy the bytes of the value in
				reverse order. */
				byte* ptr = buf+len;
				for (;;) {
					ptr--;
					*ptr = *data;
					if (ptr == buf) {
						break;
					}
					data++;
				}

				/* For a signed column, flip the most
				significant bit of the last byte. */
				if (!(col_f->prtype & DATA_UNSIGNED)) {
					buf[len-1] = (byte) (buf[len-1] ^ 128);
				}

				break;
			}
			case DATA_VARCHAR:
			case DATA_VARMYSQL:
			case DATA_CHAR:
			case DATA_MYSQL:
				/* Copy the actual data */
				ut_memcpy(buf, data, len);
				/* The sort function returns the length
				that is used for this key part. */
				len = wsrep_innobase_mysql_sort(
					(int)
					(col_f->prtype & DATA_MYSQL_TYPE_MASK),
					(uint)
					dtype_get_charset_coll(col_f->prtype),
					buf, len, *buf_len);
				break;
			case DATA_BLOB:
			case DATA_BINARY:
			case DATA_FIXBINARY:
			case DATA_GEOMETRY:
				memcpy(buf, data, len);
				break;

			case DATA_FLOAT:
			{
				float f = mach_float_read(data);
				memcpy(buf, &f, sizeof(float));
			}
			break;
			case DATA_DOUBLE:
			{
				double d = mach_double_read(data);
				memcpy(buf, &d, sizeof(double));
			}
			break;
			default:
				/* Nothing is copied for other types, but
				the key is still advanced by len below. */
				break;
			}

			key_len += len;
			buf 	+= len;
		}
	}

	rec_validate(rec, offsets);

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	*buf_len = key_len;
	return DB_SUCCESS;

 err_out:
	/* Release the offsets heap, if one was allocated. */
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return DB_ERROR;
}
#endif // WITH_WSREP
2735