1 /*****************************************************************************
2
3 Copyright (c) 1996, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file trx/trx0rec.cc
22 Transaction undo log record
23
24 Created 3/26/1996 Heikki Tuuri
25 *******************************************************/
26
27 #include "trx0rec.h"
28 #include "fsp0fsp.h"
29 #include "mach0data.h"
30 #include "trx0undo.h"
31 #include "mtr0log.h"
32 #include "dict0dict.h"
33 #include "ut0mem.h"
34 #include "row0ext.h"
35 #include "row0upd.h"
36 #include "que0que.h"
37 #include "trx0purge.h"
38 #include "trx0rseg.h"
39 #include "row0row.h"
40 #include "row0mysql.h"
41
42 /** The search tuple corresponding to TRX_UNDO_INSERT_METADATA */
43 const dtuple_t trx_undo_metadata = {
44 REC_INFO_METADATA, 0, 0,
45 NULL, 0, NULL
46 #ifdef UNIV_DEBUG
47 , DATA_TUPLE_MAGIC_N
48 #endif /* UNIV_DEBUG */
49 };
50
51 /*=========== UNDO LOG RECORD CREATION AND DECODING ====================*/
52
53 /** Write redo log of writing an undo log record.
54 @param[in] undo_block undo log page
55 @param[in] old_free start offset of the undo log record
56 @param[in] new_free end offset of the undo log record
57 @param[in,out] mtr mini-transaction */
58 static void trx_undof_page_add_undo_rec_log(const buf_block_t* undo_block,
59 ulint old_free, ulint new_free,
60 mtr_t* mtr)
61 {
62 ut_ad(old_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
63 ut_ad(new_free >= old_free);
64 ut_ad(new_free < srv_page_size);
65 ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
66 + undo_block->frame)
67 == new_free);
68 mtr->set_modified();
69 switch (mtr->get_log_mode()) {
70 case MTR_LOG_NONE:
71 case MTR_LOG_NO_REDO:
72 return;
73 case MTR_LOG_SHORT_INSERTS:
74 ut_ad(0);
75 /* fall through */
76 case MTR_LOG_ALL:
77 break;
78 }
79
80 const uint32_t
81 len = uint32_t(new_free - old_free - 4),
82 reserved = std::min<uint32_t>(11 + 13 + len,
83 mtr->get_log()->MAX_DATA_SIZE);
84 byte* log_ptr = mtr->get_log()->open(reserved);
85 const byte* log_end = log_ptr + reserved;
86 log_ptr = mlog_write_initial_log_record_low(
87 MLOG_UNDO_INSERT,
88 undo_block->page.id.space(), undo_block->page.id.page_no(),
89 log_ptr, mtr);
90 mach_write_to_2(log_ptr, len);
91 if (log_ptr + 2 + len <= log_end) {
92 memcpy(log_ptr + 2, undo_block->frame + old_free + 2, len);
93 mlog_close(mtr, log_ptr + 2 + len);
94 } else {
95 mlog_close(mtr, log_ptr + 2);
96 mtr->get_log()->push(undo_block->frame + old_free + 2, len);
97 }
98 }
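/* Editorial sketch (not authoritative) of the redo record emitted by the
function above. The MLOG_UNDO_INSERT payload is simply the undo record
body, excluding the two 2-byte list pointers that surround it:

  [initial log record: MLOG_UNDO_INSERT, space id, page number]
  [2 bytes]   len = new_free - old_free - 4
  [len bytes] copy of undo_block->frame[old_free + 2 .. old_free + 2 + len)

trx_undo_parse_add_undo_rec() below re-creates the list pointers from the
page's TRX_UNDO_PAGE_FREE field when applying this record. */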
99
100 /** Parse MLOG_UNDO_INSERT.
101 @param[in] ptr log record
102 @param[in] end_ptr end of log record buffer
103 @param[in,out] page page or NULL
104 @return end of log record
105 @retval NULL if the log record is incomplete */
106 byte*
107 trx_undo_parse_add_undo_rec(
108 const byte* ptr,
109 const byte* end_ptr,
110 page_t* page)
111 {
112 ulint len;
113
114 if (end_ptr < ptr + 2) {
115
116 return(NULL);
117 }
118
119 len = mach_read_from_2(ptr);
120 ptr += 2;
121
122 if (end_ptr < ptr + len) {
123
124 return(NULL);
125 }
126
127 if (page) {
128 ulint first_free = mach_read_from_2(page + TRX_UNDO_PAGE_HDR
129 + TRX_UNDO_PAGE_FREE);
130 byte* rec = page + first_free;
131
132 mach_write_to_2(rec, first_free + 4 + len);
133 mach_write_to_2(rec + 2 + len, first_free);
134
135 mach_write_to_2(page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
136 first_free + 4 + len);
137 memcpy(rec + 2, ptr, len);
138 }
139
140 return(const_cast<byte*>(ptr + len));
141 }
142
143 /** Calculate the free space left for extending an undo log record.
144 @param undo_block undo log page
145 @param ptr current end of the undo page
146 @return bytes left */
147 static ulint trx_undo_left(const buf_block_t *undo_block, const byte *ptr)
148 {
149 ut_ad(ptr >= &undo_block->frame[TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE]);
150 /* The 10 is supposed to be an extra safety margin (and needed for
151 compatibility with older versions) */
152 lint left= srv_page_size - (ptr - undo_block->frame) -
153 (10 + FIL_PAGE_DATA_END);
154 ut_ad(left >= 0);
155 return left < 0 ? 0 : static_cast<ulint>(left);
156 }
157
158 /**********************************************************************//**
159 Set the next and previous pointers in the undo page for the undo record
160 that was written to ptr. Update the first free value by the number of bytes
161 written for this undo record.
162 @return offset of the inserted entry on the page if succeeded, 0 if fail */
163 static
164 ulint
165 trx_undo_page_set_next_prev_and_add(
166 /*================================*/
167 buf_block_t* undo_block, /*!< in/out: undo log page */
168 byte* ptr, /*!< in: ptr up to where data has been
169 written on this undo page. */
170 mtr_t* mtr) /*!< in: mtr */
171 {
172 ulint first_free; /*!< offset within undo_page */
173 ulint end_of_rec; /*!< offset within undo_page */
174 byte* ptr_to_first_free;
175 /* pointer within undo_page
176 that points to the next free
177 offset value within undo_page.*/
178
179 ut_ad(ptr > undo_block->frame);
180 ut_ad(ptr < undo_block->frame + srv_page_size);
181
182 if (UNIV_UNLIKELY(trx_undo_left(undo_block, ptr) < 2)) {
183 return(0);
184 }
185
186 ptr_to_first_free = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
187 + undo_block->frame;
188
189 first_free = mach_read_from_2(ptr_to_first_free);
190
191 /* Write offset of the previous undo log record */
192 mach_write_to_2(ptr, first_free);
193 ptr += 2;
194
195 end_of_rec = ulint(ptr - undo_block->frame);
196
197 /* Write offset of the next undo log record */
198 mach_write_to_2(undo_block->frame + first_free, end_of_rec);
199
200 /* Update the offset to first free undo record */
201 mach_write_to_2(ptr_to_first_free, end_of_rec);
202
203 /* Write this log entry to the UNDO log */
204 trx_undof_page_add_undo_rec_log(undo_block, first_free,
205 end_of_rec, mtr);
206
207 return(first_free);
208 }
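/* Editorial sketch of how undo log records are chained on an undo page by
the function above (derived from the code, not a normative layout):

  record start     : 2 bytes  offset of the next record (= end_of_rec)
  record start + 2 : the undo record body written by the caller
  end of body      : 2 bytes  offset of this record's start; the record
                     that follows reads it as its "previous record" pointer

TRX_UNDO_PAGE_FREE is then advanced to end_of_rec. */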
209
210 /** Virtual column undo log version. To distinguish it from a length value
211 in 5.7.8 undo log, it starts with 0xF1 */
212 static const ulint VIRTUAL_COL_UNDO_FORMAT_1 = 0xF1;
213
214 /** Write virtual column index info (index id and column position in index)
215 to the undo log
216 @param[in,out] undo_block undo log page
217 @param[in] table the table
218 @param[in] pos the virtual column position
219 @param[in] ptr undo log record being written
220 @param[in] first_v_col whether this is the first virtual column
221 which could start with a version marker
222 @return new undo log pointer */
223 static
224 byte*
225 trx_undo_log_v_idx(
226 buf_block_t* undo_block,
227 const dict_table_t* table,
228 ulint pos,
229 byte* ptr,
230 bool first_v_col)
231 {
232 ut_ad(pos < table->n_v_def);
233 dict_v_col_t* vcol = dict_table_get_nth_v_col(table, pos);
234
235 ulint n_idx = vcol->v_indexes->size();
236 byte* old_ptr;
237
238 ut_ad(n_idx > 0);
239
240 /* Size to reserve: at most 5 bytes for each index id and position, plus
241 5 bytes for the number of indexes, 2 bytes for writing the total length,
242 and 1 byte for the undo log record format version marker */
243 ulint size = n_idx * (5 + 5) + 5 + 2 + (first_v_col ? 1 : 0);
244
245 if (trx_undo_left(undo_block, ptr) < size) {
246 return(NULL);
247 }
248
249 if (first_v_col) {
250 /* write the version marker */
251 mach_write_to_1(ptr, VIRTUAL_COL_UNDO_FORMAT_1);
252
253 ptr += 1;
254 }
255
256 old_ptr = ptr;
257
258 ptr += 2;
259
260 ptr += mach_write_compressed(ptr, n_idx);
261
262 dict_v_idx_list::iterator it;
263
264 for (it = vcol->v_indexes->begin();
265 it != vcol->v_indexes->end(); ++it) {
266 dict_v_idx_t v_index = *it;
267
268 ptr += mach_write_compressed(
269 ptr, static_cast<ulint>(v_index.index->id));
270
271 ptr += mach_write_compressed(ptr, v_index.nth_field);
272 }
273
274 mach_write_to_2(old_ptr, ulint(ptr - old_ptr));
275
276 return(ptr);
277 }
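/* Rough byte layout written by trx_undo_log_v_idx() above (editorial
summary; each compressed integer occupies 1 to 5 bytes):

  [1 byte 0xF1 = VIRTUAL_COL_UNDO_FORMAT_1, only before the first v-col]
  [2 bytes total length of this block, filled in at the end]
  [compressed n_idx]
  n_idx times: [compressed index id][compressed position in that index]

trx_undo_read_v_idx_low() below consumes exactly this layout. */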
278
279 /** Read virtual column index from undo log, and verify the column is still
280 indexed, and return its position
281 @param[in] table the table
282 @param[in] ptr undo log pointer
283 @param[out] col_pos the column number or ULINT_UNDEFINED
284 if the column is not indexed any more
285 @return remaining part of undo log record after reading these values */
286 static
287 const byte*
288 trx_undo_read_v_idx_low(
289 const dict_table_t* table,
290 const byte* ptr,
291 ulint* col_pos)
292 {
293 ulint len = mach_read_from_2(ptr);
294 const byte* old_ptr = ptr;
295
296 *col_pos = ULINT_UNDEFINED;
297
298 ptr += 2;
299
300 ulint num_idx = mach_read_next_compressed(&ptr);
301
302 ut_ad(num_idx > 0);
303
304 dict_index_t* clust_index = dict_table_get_first_index(table);
305
306 for (ulint i = 0; i < num_idx; i++) {
307 index_id_t id = mach_read_next_compressed(&ptr);
308 ulint pos = mach_read_next_compressed(&ptr);
309 dict_index_t* index = dict_table_get_next_index(clust_index);
310
311 while (index != NULL) {
312 /* Return if we find a matching index.
313 TODO: in the future, it might be worthwhile to add
314 checks on other indexes */
315 if (index->id == id) {
316 const dict_col_t* col = dict_index_get_nth_col(
317 index, pos);
318 ut_ad(col->is_virtual());
319 const dict_v_col_t* vcol = reinterpret_cast<
320 const dict_v_col_t*>(col);
321 *col_pos = vcol->v_pos;
322 return(old_ptr + len);
323 }
324
325 index = dict_table_get_next_index(index);
326 }
327 }
328
329 return(old_ptr + len);
330 }
331
332 /** Read virtual column index from undo log or online log if the log
333 contains such info, and in the undo log case, verify the column is
334 still indexed, and output its position
335 @param[in] table the table
336 @param[in] ptr undo log pointer
337 @param[in] first_v_col if this is the first virtual column, which
338 has the version marker
339 @param[in,out] is_undo_log this function is used to parse both undo log,
340 and online log for virtual columns. So
341 check to see if this is undo log. When
342 first_v_col is true, is_undo_log is output,
343 when first_v_col is false, is_undo_log is input
344 @param[in,out] field_no the column number
345 @return remaining part of undo log record after reading these values */
346 const byte*
347 trx_undo_read_v_idx(
348 const dict_table_t* table,
349 const byte* ptr,
350 bool first_v_col,
351 bool* is_undo_log,
352 ulint* field_no)
353 {
354 /* Version marker only put on the first virtual column */
355 if (first_v_col) {
356 /* Undo log has the virtual undo log marker */
357 *is_undo_log = (mach_read_from_1(ptr)
358 == VIRTUAL_COL_UNDO_FORMAT_1);
359
360 if (*is_undo_log) {
361 ptr += 1;
362 }
363 }
364
365 if (*is_undo_log) {
366 ptr = trx_undo_read_v_idx_low(table, ptr, field_no);
367 } else {
368 *field_no -= REC_MAX_N_FIELDS;
369 }
370
371 return(ptr);
372 }
373
374 /** Reports in the undo log an insert of virtual columns.
375 @param[in] undo_block undo log page
376 @param[in] table the table
377 @param[in] row dtuple contains the virtual columns
378 @param[in,out] ptr log ptr
379 @return true if write goes well, false if out of space */
380 static
381 bool
382 trx_undo_report_insert_virtual(
383 buf_block_t* undo_block,
384 dict_table_t* table,
385 const dtuple_t* row,
386 byte** ptr)
387 {
388 byte* start = *ptr;
389 bool first_v_col = true;
390
391 if (trx_undo_left(undo_block, *ptr) < 2) {
392 return(false);
393 }
394
395 /* Reserve 2 bytes to write the number
396 of bytes the stored fields take in this
397 undo record */
398 *ptr += 2;
399
400 for (ulint col_no = 0; col_no < dict_table_get_n_v_cols(table);
401 col_no++) {
402 const dict_v_col_t* col
403 = dict_table_get_nth_v_col(table, col_no);
404
405 if (col->m_col.ord_part) {
406
407 /* make sure there is enough space to write the length */
408 if (trx_undo_left(undo_block, *ptr) < 5) {
409 return(false);
410 }
411
412 ulint pos = col_no;
413 pos += REC_MAX_N_FIELDS;
414 *ptr += mach_write_compressed(*ptr, pos);
415
416 *ptr = trx_undo_log_v_idx(undo_block, table,
417 col_no, *ptr, first_v_col);
418 first_v_col = false;
419
420 if (*ptr == NULL) {
421 return(false);
422 }
423
424 const dfield_t* vfield = dtuple_get_nth_v_field(
425 row, col->v_pos);
426 switch (ulint flen = vfield->len) {
427 case 0: case UNIV_SQL_NULL:
428 if (trx_undo_left(undo_block, *ptr) < 5) {
429 return(false);
430 }
431
432 *ptr += mach_write_compressed(*ptr, flen);
433 break;
434 default:
435 ulint max_len
436 = dict_max_v_field_len_store_undo(
437 table, col_no);
438
439 if (flen > max_len) {
440 flen = max_len;
441 }
442
443 if (trx_undo_left(undo_block, *ptr)
444 < flen + 5) {
445 return(false);
446 }
447 *ptr += mach_write_compressed(*ptr, flen);
448
449 memcpy(*ptr, vfield->data, flen);
450 *ptr += flen;
451 }
452 }
453 }
454
455 /* Always mark the end of the log with a 2-byte length field */
456 mach_write_to_2(start, ulint(*ptr - start));
457
458 return(true);
459 }
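/* Editorial sketch of the virtual-column block that the function above
appends to an insert undo record (not authoritative):

  [2 bytes total length of the block, written last]
  for each virtual column with ord_part set:
    [compressed (col_no + REC_MAX_N_FIELDS)]   marks a virtual column
    [index info, see trx_undo_log_v_idx()]
    [compressed value length, possibly truncated to
     dict_max_v_field_len_store_undo()][value bytes] */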
460
461 /**********************************************************************//**
462 Reports in the undo log an insert of a clustered index record.
463 @return offset of the inserted entry on the page if succeeded, 0 if fail */
464 static
465 ulint
466 trx_undo_page_report_insert(
467 /*========================*/
468 buf_block_t* undo_block, /*!< in: undo log page */
469 trx_t* trx, /*!< in: transaction */
470 dict_index_t* index, /*!< in: clustered index */
471 const dtuple_t* clust_entry, /*!< in: index entry which will be
472 inserted to the clustered index */
473 mtr_t* mtr) /*!< in: mtr */
474 {
475 ulint first_free;
476 byte* ptr;
477 ulint i;
478
479 ut_ad(dict_index_is_clust(index));
480 /* MariaDB 10.3.1+ in trx_undo_page_init() always initializes
481 TRX_UNDO_PAGE_TYPE as 0, but previous versions wrote
482 TRX_UNDO_INSERT == 1 into insert_undo pages,
483 or TRX_UNDO_UPDATE == 2 into update_undo pages. */
484 ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
485 + undo_block->frame) <= 2);
486
487 first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
488 + undo_block->frame);
489 ptr = undo_block->frame + first_free;
490
491 ut_ad(first_free <= srv_page_size);
492
493 if (trx_undo_left(undo_block, ptr) < 2 + 1 + 11 + 11) {
494 /* Not enough space for writing the general parameters */
495 return(0);
496 }
497
498 /* Reserve 2 bytes for the pointer to the next undo log record */
499 ptr += 2;
500
501 /* Store first some general parameters to the undo log */
502 *ptr++ = TRX_UNDO_INSERT_REC;
503 ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
504 ptr += mach_u64_write_much_compressed(ptr, index->table->id);
505 /*----------------------------------------*/
506 /* Store then the fields required to uniquely determine the record
507 to be inserted in the clustered index */
508 if (UNIV_UNLIKELY(clust_entry->info_bits != 0)) {
509 ut_ad(clust_entry->info_bits == REC_INFO_METADATA);
510 ut_ad(index->is_instant());
511 ut_ad(undo_block->frame[first_free + 2]
512 == TRX_UNDO_INSERT_REC);
513 undo_block->frame[first_free + 2] = TRX_UNDO_INSERT_METADATA;
514 goto done;
515 }
516
517 for (i = 0; i < dict_index_get_n_unique(index); i++) {
518
519 const dfield_t* field = dtuple_get_nth_field(clust_entry, i);
520 ulint flen = dfield_get_len(field);
521
522 if (trx_undo_left(undo_block, ptr) < 5) {
523
524 return(0);
525 }
526
527 ptr += mach_write_compressed(ptr, flen);
528
529 switch (flen) {
530 case 0: case UNIV_SQL_NULL:
531 break;
532 default:
533 if (trx_undo_left(undo_block, ptr) < flen) {
534
535 return(0);
536 }
537
538 memcpy(ptr, dfield_get_data(field), flen);
539 ptr += flen;
540 }
541 }
542
543 if (index->table->n_v_cols) {
544 if (!trx_undo_report_insert_virtual(
545 undo_block, index->table, clust_entry, &ptr)) {
546 return(0);
547 }
548 }
549
550 done:
551 return(trx_undo_page_set_next_prev_and_add(undo_block, ptr, mtr));
552 }
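/* Rough layout of a TRX_UNDO_INSERT_REC record as assembled above
(editorial summary, ignoring the surrounding next/previous pointers):

  [1 byte  TRX_UNDO_INSERT_REC, or TRX_UNDO_INSERT_METADATA for the
           metadata pseudo-record]
  [much-compressed undo_no]
  [much-compressed table id]
  for each of the n_unique fields of the clustered index entry:
    [compressed length or UNIV_SQL_NULL][field data]
  [optional virtual-column block, see trx_undo_report_insert_virtual()] */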
553
554 /**********************************************************************//**
555 Reads from an undo log record the general parameters.
556 @return remaining part of undo log record after reading these values */
557 byte*
558 trx_undo_rec_get_pars(
559 /*==================*/
560 trx_undo_rec_t* undo_rec, /*!< in: undo log record */
561 ulint* type, /*!< out: undo record type:
562 TRX_UNDO_INSERT_REC, ... */
563 ulint* cmpl_info, /*!< out: compiler info, relevant only
564 for update type records */
565 bool* updated_extern, /*!< out: true if we updated an
566 externally stored field */
567 undo_no_t* undo_no, /*!< out: undo log record number */
568 table_id_t* table_id) /*!< out: table id */
569 {
570 const byte* ptr;
571 ulint type_cmpl;
572
573 ptr = undo_rec + 2;
574
575 type_cmpl = mach_read_from_1(ptr);
576 ptr++;
577
578 *updated_extern = !!(type_cmpl & TRX_UNDO_UPD_EXTERN);
579 type_cmpl &= ~TRX_UNDO_UPD_EXTERN;
580
581 *type = type_cmpl & (TRX_UNDO_CMPL_INFO_MULT - 1);
582 *cmpl_info = type_cmpl / TRX_UNDO_CMPL_INFO_MULT;
583
584 *undo_no = mach_read_next_much_compressed(&ptr);
585 *table_id = mach_read_next_much_compressed(&ptr);
586
587 return(const_cast<byte*>(ptr));
588 }
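/* The first byte after the 2-byte "next" pointer packs three values, as
decoded above (editorial restatement):

  updated_extern = type_cmpl & TRX_UNDO_UPD_EXTERN
  type           = type_cmpl & (TRX_UNDO_CMPL_INFO_MULT - 1), after
                   clearing TRX_UNDO_UPD_EXTERN
  cmpl_info      = type_cmpl / TRX_UNDO_CMPL_INFO_MULT

This mirrors type_cmpl = type | cmpl_info * TRX_UNDO_CMPL_INFO_MULT
(optionally OR-ed with TRX_UNDO_UPD_EXTERN) as written in
trx_undo_page_report_modify(). */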
589
590 /** Read from an undo log record a non-virtual column value.
591 @param[in,out] ptr pointer to remaining part of the undo record
592 @param[in,out] field stored field
593 @param[in,out] len length of the field, or UNIV_SQL_NULL
594 @param[in,out] orig_len original length of the locally stored part
595 of an externally stored column, or 0
596 @return remaining part of undo log record after reading these values */
597 byte*
598 trx_undo_rec_get_col_val(
599 const byte* ptr,
600 const byte** field,
601 ulint* len,
602 ulint* orig_len)
603 {
604 *len = mach_read_next_compressed(&ptr);
605 *orig_len = 0;
606
607 switch (*len) {
608 case UNIV_SQL_NULL:
609 *field = NULL;
610 break;
611 case UNIV_EXTERN_STORAGE_FIELD:
612 *orig_len = mach_read_next_compressed(&ptr);
613 *len = mach_read_next_compressed(&ptr);
614 *field = ptr;
615 ptr += *len & ~SPATIAL_STATUS_MASK;
616
617 ut_ad(*orig_len >= BTR_EXTERN_FIELD_REF_SIZE);
618 ut_ad(*len > *orig_len);
619 /* @see dtuple_convert_big_rec() */
620 ut_ad(*len >= BTR_EXTERN_FIELD_REF_SIZE);
621
622 /* we do not have access to index->table here
623 ut_ad(dict_table_has_atomic_blobs(index->table)
624 || *len >= col->max_prefix
625 + BTR_EXTERN_FIELD_REF_SIZE);
626 */
627
628 *len += UNIV_EXTERN_STORAGE_FIELD;
629 break;
630 default:
631 *field = ptr;
632 if (*len >= UNIV_EXTERN_STORAGE_FIELD) {
633 ptr += (*len - UNIV_EXTERN_STORAGE_FIELD)
634 & ~SPATIAL_STATUS_MASK;
635 } else {
636 ptr += *len;
637 }
638 }
639
640 return(const_cast<byte*>(ptr));
641 }
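/* Column value encodings handled above (editorial summary, derived from
the switch statement; not a format specification):

  *len == UNIV_SQL_NULL             : SQL NULL, no data bytes follow
  *len == UNIV_EXTERN_STORAGE_FIELD : marker written by
       trx_undo_page_report_modify_ext() when a longer BLOB prefix was
       fetched: [compressed orig_len][compressed real len][data bytes]
  *len >  UNIV_EXTERN_STORAGE_FIELD : externally stored column logged
       without refetching; the stored length is
       *len - UNIV_EXTERN_STORAGE_FIELD (spatial status bits may be
       OR-ed in, hence the SPATIAL_STATUS_MASK handling)
  otherwise                         : *len bytes of inline data */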
642
643 /*******************************************************************//**
644 Builds a row reference from an undo log record.
645 @return pointer to remaining part of undo record */
646 byte*
647 trx_undo_rec_get_row_ref(
648 /*=====================*/
649 byte* ptr, /*!< in: remaining part of a copy of an undo log
650 record, at the start of the row reference;
651 NOTE that this copy of the undo log record must
652 be preserved as long as the row reference is
653 used, as we do NOT copy the data in the
654 record! */
655 dict_index_t* index, /*!< in: clustered index */
656 const dtuple_t**ref, /*!< out, own: row reference */
657 mem_heap_t* heap) /*!< in: memory heap from which the memory
658 needed is allocated */
659 {
660 ulint ref_len;
661 ulint i;
662
663 ut_ad(index && ptr && ref && heap);
664 ut_a(dict_index_is_clust(index));
665
666 ref_len = dict_index_get_n_unique(index);
667
668 dtuple_t* tuple = dtuple_create(heap, ref_len);
669 *ref = tuple;
670
671 dict_index_copy_types(tuple, index, ref_len);
672
673 for (i = 0; i < ref_len; i++) {
674 const byte* field;
675 ulint len;
676 ulint orig_len;
677
678 dfield_t* dfield = dtuple_get_nth_field(tuple, i);
679
680 ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
681
682 dfield_set_data(dfield, field, len);
683 }
684
685 return(ptr);
686 }
687
688 /*******************************************************************//**
689 Skips a row reference from an undo log record.
690 @return pointer to remaining part of undo record */
691 static
692 byte*
693 trx_undo_rec_skip_row_ref(
694 /*======================*/
695 byte* ptr, /*!< in: remaining part in update undo log
696 record, at the start of the row reference */
697 dict_index_t* index) /*!< in: clustered index */
698 {
699 ulint ref_len;
700 ulint i;
701
702 ut_ad(index && ptr);
703 ut_a(dict_index_is_clust(index));
704
705 ref_len = dict_index_get_n_unique(index);
706
707 for (i = 0; i < ref_len; i++) {
708 const byte* field;
709 ulint len;
710 ulint orig_len;
711
712 ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
713 }
714
715 return(ptr);
716 }
717
718 /** Fetch a prefix of an externally stored column, for writing to the undo
719 log of an update or delete marking of a clustered index record.
720 @param[out] ext_buf buffer to hold the prefix data and BLOB pointer
721 @param[in] prefix_len prefix size to store in the undo log
722 @param[in] page_size page size
723 @param[in] field an externally stored column
724 @param[in,out] len input: length of field; output: used length of
725 ext_buf
726 @return ext_buf */
727 static
728 byte*
729 trx_undo_page_fetch_ext(
730 byte* ext_buf,
731 ulint prefix_len,
732 const page_size_t& page_size,
733 const byte* field,
734 ulint* len)
735 {
736 /* Fetch the BLOB. */
737 ulint ext_len = btr_copy_externally_stored_field_prefix(
738 ext_buf, prefix_len, page_size, field, *len);
739 /* BLOBs should always be nonempty. */
740 ut_a(ext_len);
741 /* Append the BLOB pointer to the prefix. */
742 memcpy(ext_buf + ext_len,
743 field + *len - BTR_EXTERN_FIELD_REF_SIZE,
744 BTR_EXTERN_FIELD_REF_SIZE);
745 *len = ext_len + BTR_EXTERN_FIELD_REF_SIZE;
746 return(ext_buf);
747 }
748
749 /** Writes to the undo log a prefix of an externally stored column.
750 @param[out] ptr undo log position, at least 15 bytes must be
751 available
752 @param[out] ext_buf a buffer of DICT_MAX_FIELD_LEN_BY_FORMAT()
753 size, or NULL when should not fetch a longer
754 prefix
755 @param[in] prefix_len prefix size to store in the undo log
756 @param[in] page_size page size
757 @param[in,out] field the locally stored part of the externally
758 stored column
759 @param[in,out] len length of field, in bytes
760 @param[in] spatial_status whether the column is used by spatial index or
761 regular index
762 @return undo log position */
763 static
764 byte*
765 trx_undo_page_report_modify_ext(
766 byte* ptr,
767 byte* ext_buf,
768 ulint prefix_len,
769 const page_size_t& page_size,
770 const byte** field,
771 ulint* len,
772 spatial_status_t spatial_status)
773 {
774 ulint spatial_len= 0;
775
776 switch (spatial_status) {
777 case SPATIAL_UNKNOWN:
778 case SPATIAL_NONE:
779 break;
780
781 case SPATIAL_MIXED:
782 case SPATIAL_ONLY:
783 spatial_len = DATA_MBR_LEN;
784 break;
785 }
786
787 /* Encode spatial status into length. */
788 spatial_len |= ulint(spatial_status) << SPATIAL_STATUS_SHIFT;
789
790 if (spatial_status == SPATIAL_ONLY) {
791 /* If the column is only used by a spatial index,
792 logging its MBR is enough. */
793 ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
794 + spatial_len);
795
796 return(ptr);
797 }
798
799 if (ext_buf) {
800 ut_a(prefix_len > 0);
801
802 /* If an ordering column is externally stored, we will
803 have to store a longer prefix of the field. In this
804 case, write to the log a marker followed by the
805 original length and the real length of the field. */
806 ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD);
807
808 ptr += mach_write_compressed(ptr, *len);
809
810 *field = trx_undo_page_fetch_ext(ext_buf, prefix_len,
811 page_size, *field, len);
812
813 ptr += mach_write_compressed(ptr, *len + spatial_len);
814 } else {
815 ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
816 + *len + spatial_len);
817 }
818
819 return(ptr);
820 }
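/* The function above writes one of three encodings for an externally
stored column (editorial summary of the branches above):

  SPATIAL_ONLY    : [compressed UNIV_EXTERN_STORAGE_FIELD + DATA_MBR_LEN,
                    with the spatial status encoded in the high bits];
                    only the MBR will be logged by the caller
  ext_buf != NULL : [compressed UNIV_EXTERN_STORAGE_FIELD marker]
                    [compressed original length]
                    [compressed new length = fetched prefix + BLOB pointer
                    (+ spatial bits)]; the caller then copies the data
  ext_buf == NULL : [compressed UNIV_EXTERN_STORAGE_FIELD + *len
                    (+ spatial bits)], i.e. the locally stored part is
                    logged as is */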
821
822 /** Get MBR from a Geometry column stored externally
823 @param[out] mbr MBR to fill
824 @param[in] page_size table page size
825 @param[in] field field containing the geometry data
826 @param[in,out] len length of field, in bytes
827 */
828 static
829 void
830 trx_undo_get_mbr_from_ext(
831 /*======================*/
832 double* mbr,
833 const page_size_t& page_size,
834 const byte* field,
835 ulint* len)
836 {
837 uchar* dptr = NULL;
838 ulint dlen;
839 mem_heap_t* heap = mem_heap_create(100);
840
841 dptr = btr_copy_externally_stored_field(
842 &dlen, field, page_size, *len, heap);
843
844 if (dlen <= GEO_DATA_HEADER_SIZE) {
845 for (uint i = 0; i < SPDIMS; ++i) {
846 mbr[i * 2] = DBL_MAX;
847 mbr[i * 2 + 1] = -DBL_MAX;
848 }
849 } else {
850 rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
851 static_cast<uint>(dlen
852 - GEO_DATA_HEADER_SIZE), SPDIMS, mbr);
853 }
854
855 mem_heap_free(heap);
856 }
857
858 /**********************************************************************//**
859 Reports in the undo log an update or delete marking of a clustered index
860 record.
861 @return byte offset of the inserted undo log entry on the page if
862 succeeded, 0 if fail */
863 static
864 ulint
865 trx_undo_page_report_modify(
866 /*========================*/
867 buf_block_t* undo_block, /*!< in: undo log page */
868 trx_t* trx, /*!< in: transaction */
869 dict_index_t* index, /*!< in: clustered index where update or
870 delete marking is done */
871 const rec_t* rec, /*!< in: clustered index record which
872 has NOT yet been modified */
873 const rec_offs* offsets, /*!< in: rec_get_offsets(rec, index) */
874 const upd_t* update, /*!< in: update vector which tells the
875 columns to be updated; in the case of
876 a delete, this should be set to NULL */
877 ulint cmpl_info, /*!< in: compiler info on secondary
878 index updates */
879 const dtuple_t* row, /*!< in: clustered index row contains
880 virtual column info */
881 mtr_t* mtr) /*!< in: mtr */
882 {
883 ulint first_free;
884 byte* ptr;
885
886 ut_ad(index->is_primary());
887 ut_ad(rec_offs_validate(rec, index, offsets));
888 /* MariaDB 10.3.1+ in trx_undo_page_init() always initializes
889 TRX_UNDO_PAGE_TYPE as 0, but previous versions wrote
890 TRX_UNDO_INSERT == 1 into insert_undo pages,
891 or TRX_UNDO_UPDATE == 2 into update_undo pages. */
892 ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
893 + undo_block->frame) <= 2);
894
895 first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
896 + undo_block->frame);
897 ptr = undo_block->frame + first_free;
898
899 ut_ad(first_free <= srv_page_size);
900
901 if (trx_undo_left(undo_block, ptr) < 50) {
902 /* NOTE: the value 50 must be big enough so that the general
903 fields written below fit on the undo log page */
904 return 0;
905 }
906
907 /* Reserve 2 bytes for the pointer to the next undo log record */
908 ptr += 2;
909
910 dict_table_t* table = index->table;
911 const byte* field;
912 ulint flen;
913 ulint col_no;
914 ulint type_cmpl;
915 byte* type_cmpl_ptr;
916 ulint i;
917 trx_id_t trx_id;
918 ibool ignore_prefix = FALSE;
919 byte ext_buf[REC_VERSION_56_MAX_INDEX_COL_LEN
920 + BTR_EXTERN_FIELD_REF_SIZE];
921 bool first_v_col = true;
922
923 /* Store first some general parameters to the undo log */
924
925 if (!update) {
926 ut_ad(!rec_get_deleted_flag(rec, dict_table_is_comp(table)));
927 type_cmpl = TRX_UNDO_DEL_MARK_REC;
928 } else if (rec_get_deleted_flag(rec, dict_table_is_comp(table))) {
929 /* In delete-marked records, DB_TRX_ID must
930 always refer to an existing update_undo log record. */
931 ut_ad(row_get_rec_trx_id(rec, index, offsets));
932
933 type_cmpl = TRX_UNDO_UPD_DEL_REC;
934 /* We are about to update a delete marked record.
935 We don't typically need the prefix in this case unless
936 the delete marking is done by the same transaction
937 (which we check below). */
938 ignore_prefix = TRUE;
939 } else {
940 type_cmpl = TRX_UNDO_UPD_EXIST_REC;
941 }
942
943 type_cmpl |= cmpl_info * TRX_UNDO_CMPL_INFO_MULT;
944 type_cmpl_ptr = ptr;
945
946 *ptr++ = (byte) type_cmpl;
947 ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
948
949 ptr += mach_u64_write_much_compressed(ptr, table->id);
950
951 /*----------------------------------------*/
952 /* Store the state of the info bits */
953
954 *ptr++ = (byte) rec_get_info_bits(rec, dict_table_is_comp(table));
955
956 /* Store the values of the system columns */
957 field = rec_get_nth_field(rec, offsets,
958 dict_index_get_sys_col_pos(
959 index, DATA_TRX_ID), &flen);
960 ut_ad(flen == DATA_TRX_ID_LEN);
961
962 trx_id = trx_read_trx_id(field);
963
964 /* If it is an update of a delete marked record, then we are
965 allowed to ignore blob prefixes if the delete marking was done
966 by some other trx as it must have committed by now for us to
967 allow an over-write. */
968 if (trx_id == trx->id) {
969 ignore_prefix = false;
970 }
971 ptr += mach_u64_write_compressed(ptr, trx_id);
972
973 field = rec_get_nth_field(rec, offsets,
974 dict_index_get_sys_col_pos(
975 index, DATA_ROLL_PTR), &flen);
976 ut_ad(flen == DATA_ROLL_PTR_LEN);
977 ut_ad(memcmp(field, field_ref_zero, DATA_ROLL_PTR_LEN));
978
979 ptr += mach_u64_write_compressed(ptr, trx_read_roll_ptr(field));
980
981 /*----------------------------------------*/
982 /* Store then the fields required to uniquely determine the
983 record which will be modified in the clustered index */
984
985 for (i = 0; i < dict_index_get_n_unique(index); i++) {
986
987 /* The ordering columns must not be instant added columns. */
988 ut_ad(!rec_offs_nth_default(offsets, i));
989 field = rec_get_nth_field(rec, offsets, i, &flen);
990
991 /* The ordering columns must not be stored externally. */
992 ut_ad(!rec_offs_nth_extern(offsets, i));
993 ut_ad(dict_index_get_nth_col(index, i)->ord_part);
994
995 if (trx_undo_left(undo_block, ptr) < 5) {
996 return(0);
997 }
998
999 ptr += mach_write_compressed(ptr, flen);
1000
1001 if (flen != UNIV_SQL_NULL) {
1002 if (trx_undo_left(undo_block, ptr) < flen) {
1003 return(0);
1004 }
1005
1006 memcpy(ptr, field, flen);
1007 ptr += flen;
1008 }
1009 }
1010
1011 /*----------------------------------------*/
1012 /* Save to the undo log the old values of the columns to be updated. */
1013
1014 if (update) {
1015 if (trx_undo_left(undo_block, ptr) < 5) {
1016 return(0);
1017 }
1018
1019 ulint n_updated = upd_get_n_fields(update);
1020
1021 /* If this is an online update while an inplace alter table
1022 is in progress and the table has virtual columns, we will
1023 need to double check if there are any non-indexed columns
1024 being registered in the update vector in case they will be
1025 indexed in the new table */
1026 if (dict_index_is_online_ddl(index) && table->n_v_cols > 0) {
1027 for (i = 0; i < upd_get_n_fields(update); i++) {
1028 upd_field_t* fld = upd_get_nth_field(
1029 update, i);
1030 ulint pos = fld->field_no;
1031
1032 /* These columns must not have an index
1033 on them */
1034 if (upd_fld_is_virtual_col(fld)
1035 && dict_table_get_nth_v_col(
1036 table, pos)->v_indexes->empty()) {
1037 n_updated--;
1038 }
1039 }
1040 }
1041
1042 ptr += mach_write_compressed(ptr, n_updated);
1043
1044 for (i = 0; i < upd_get_n_fields(update); i++) {
1045 upd_field_t* fld = upd_get_nth_field(update, i);
1046
1047 bool is_virtual = upd_fld_is_virtual_col(fld);
1048 ulint max_v_log_len = 0;
1049
1050 ulint pos = fld->field_no;
1051
1052 /* Write field number to undo log */
1053 if (trx_undo_left(undo_block, ptr) < 5) {
1054 return(0);
1055 }
1056
1057 if (is_virtual) {
1058 /* Skip the non-indexed column during
1059 an online alter table */
1060 if (dict_index_is_online_ddl(index)
1061 && dict_table_get_nth_v_col(
1062 table, pos)->v_indexes->empty()) {
1063 continue;
1064 }
1065
1066 /* add REC_MAX_N_FIELDS to mark that this
1067 is a virtual column */
1068 pos += REC_MAX_N_FIELDS;
1069 }
1070
1071 ptr += mach_write_compressed(ptr, pos);
1072
1073 /* Save the old value of field */
1074 if (is_virtual) {
1075 ut_ad(fld->field_no < table->n_v_def);
1076
1077 ptr = trx_undo_log_v_idx(undo_block, table,
1078 fld->field_no, ptr,
1079 first_v_col);
1080 if (ptr == NULL) {
1081 return(0);
1082 }
1083 first_v_col = false;
1084
1085 max_v_log_len
1086 = dict_max_v_field_len_store_undo(
1087 table, fld->field_no);
1088
1089 field = static_cast<byte*>(
1090 fld->old_v_val->data);
1091 flen = fld->old_v_val->len;
1092
1093 /* Only log sufficient bytes for index
1094 record update */
1095 if (flen != UNIV_SQL_NULL) {
1096 flen = ut_min(
1097 flen, max_v_log_len);
1098 }
1099 } else {
1100 field = rec_get_nth_cfield(
1101 rec, index, offsets, pos, &flen);
1102 }
1103
1104 if (trx_undo_left(undo_block, ptr) < 15) {
1105 return(0);
1106 }
1107
1108 if (!is_virtual && rec_offs_nth_extern(offsets, pos)) {
1109 const dict_col_t* col
1110 = dict_index_get_nth_col(index, pos);
1111 ulint prefix_len
1112 = dict_max_field_len_store_undo(
1113 table, col);
1114
1115 ut_ad(prefix_len + BTR_EXTERN_FIELD_REF_SIZE
1116 <= sizeof ext_buf);
1117
1118 ptr = trx_undo_page_report_modify_ext(
1119 ptr,
1120 col->ord_part
1121 && !ignore_prefix
1122 && flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1123 ? ext_buf : NULL, prefix_len,
1124 dict_table_page_size(table),
1125 &field, &flen, SPATIAL_UNKNOWN);
1126
1127 *type_cmpl_ptr |= TRX_UNDO_UPD_EXTERN;
1128 } else {
1129 ptr += mach_write_compressed(ptr, flen);
1130 }
1131
1132 if (flen != UNIV_SQL_NULL) {
1133 if (trx_undo_left(undo_block, ptr) < flen) {
1134 return(0);
1135 }
1136
1137 memcpy(ptr, field, flen);
1138 ptr += flen;
1139 }
1140
1141 /* Also record the new value of the virtual column */
1142 if (is_virtual) {
1143 field = static_cast<byte*>(fld->new_val.data);
1144 flen = fld->new_val.len;
1145 if (flen != UNIV_SQL_NULL) {
1146 flen = ut_min(
1147 flen, max_v_log_len);
1148 }
1149
1150 if (trx_undo_left(undo_block, ptr) < 15) {
1151 return(0);
1152 }
1153
1154 ptr += mach_write_compressed(ptr, flen);
1155
1156 if (flen != UNIV_SQL_NULL) {
1157 if (trx_undo_left(undo_block, ptr)
1158 < flen) {
1159 return(0);
1160 }
1161
1162 memcpy(ptr, field, flen);
1163 ptr += flen;
1164 }
1165 }
1166 }
1167 }
1168
1169 /* Reset first_v_col, so that the virtual column undo version
1170 marker is written again when we log all the indexed columns */
1171 first_v_col = true;
1172
1173 /*----------------------------------------*/
1174 /* In the case of a delete marking, and also in the case of an update
1175 where any ordering field of any index changes, store the values of all
1176 columns which occur as ordering fields in any index. This info is used
1177 in the purge of old versions where we use it to build and search the
1178 delete marked index records, to look if we can remove them from the
1179 index tree. Note that starting from 4.0.14 also externally stored
1180 fields can be ordering in some index. Starting from 5.2, we no longer
1181 store REC_MAX_INDEX_COL_LEN first bytes to the undo log record,
1182 but we can construct the column prefix fields in the index by
1183 fetching the first page of the BLOB that is pointed to by the
1184 clustered index. This works also in crash recovery, because all pages
1185 (including BLOBs) are recovered before anything is rolled back. */
1186
1187 if (!update || !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
1188 byte* old_ptr = ptr;
1189 double mbr[SPDIMS * 2];
1190 mem_heap_t* row_heap = NULL;
1191
1192 if (trx_undo_left(undo_block, ptr) < 5) {
1193 return(0);
1194 }
1195
1196 /* Reserve 2 bytes to write the number of bytes the stored
1197 fields take in this undo record */
1198
1199 ptr += 2;
1200
1201 for (col_no = 0; col_no < dict_table_get_n_cols(table);
1202 col_no++) {
1203
1204 const dict_col_t* col
1205 = dict_table_get_nth_col(table, col_no);
1206
1207 if (!col->ord_part) {
1208 continue;
1209 }
1210
1211 const ulint pos = dict_index_get_nth_col_pos(
1212 index, col_no, NULL);
1213 /* All non-virtual columns must be present in
1214 the clustered index. */
1215 ut_ad(pos != ULINT_UNDEFINED);
1216
1217 const bool is_ext = rec_offs_nth_extern(offsets, pos);
1218 const spatial_status_t spatial_status = is_ext
1219 ? dict_col_get_spatial_status(col)
1220 : SPATIAL_NONE;
1221
1222 switch (spatial_status) {
1223 case SPATIAL_UNKNOWN:
1224 ut_ad(0);
1225 /* fall through */
1226 case SPATIAL_MIXED:
1227 case SPATIAL_ONLY:
1228 /* Externally stored spatially indexed
1229 columns will be (redundantly) logged
1230 again, because we did not write the
1231 MBR yet, that is, the previous call to
1232 trx_undo_page_report_modify_ext()
1233 was with SPATIAL_UNKNOWN. */
1234 break;
1235 case SPATIAL_NONE:
1236 if (!update) {
1237 /* This is a DELETE operation. */
1238 break;
1239 }
1240 /* Avoid redundantly logging indexed
1241 columns that were updated. */
1242
1243 for (i = 0; i < update->n_fields; i++) {
1244 const ulint field_no
1245 = upd_get_nth_field(update, i)
1246 ->field_no;
1247 if (field_no >= index->n_fields
1248 || dict_index_get_nth_field(
1249 index, field_no)->col
1250 == col) {
1251 goto already_logged;
1252 }
1253 }
1254 }
1255
1256 if (true) {
1257 /* Write field number to undo log */
1258 if (trx_undo_left(undo_block, ptr) < 5 + 15) {
1259 return(0);
1260 }
1261
1262 ptr += mach_write_compressed(ptr, pos);
1263
1264 /* Save the old value of field */
1265 field = rec_get_nth_cfield(
1266 rec, index, offsets, pos, &flen);
1267
1268 if (is_ext) {
1269 const dict_col_t* col =
1270 dict_index_get_nth_col(
1271 index, pos);
1272 ulint prefix_len =
1273 dict_max_field_len_store_undo(
1274 table, col);
1275
1276 ut_a(prefix_len < sizeof ext_buf);
1277
1278 /* If there is a spatial index on it,
1279 log its MBR */
1280 if (spatial_status != SPATIAL_NONE) {
1281 ut_ad(DATA_GEOMETRY_MTYPE(
1282 col->mtype));
1283
1284 trx_undo_get_mbr_from_ext(
1285 mbr,
1286 dict_table_page_size(
1287 table),
1288 field, &flen);
1289 }
1290
1291 ptr = trx_undo_page_report_modify_ext(
1292 ptr,
1293 flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1294 && !ignore_prefix
1295 ? ext_buf : NULL, prefix_len,
1296 dict_table_page_size(table),
1297 &field, &flen,
1298 spatial_status);
1299 } else {
1300 ptr += mach_write_compressed(
1301 ptr, flen);
1302 }
1303
1304 if (flen != UNIV_SQL_NULL
1305 && spatial_status != SPATIAL_ONLY) {
1306 if (trx_undo_left(undo_block, ptr)
1307 < flen) {
1308 return(0);
1309 }
1310
1311 memcpy(ptr, field, flen);
1312 ptr += flen;
1313 }
1314
1315 if (spatial_status != SPATIAL_NONE) {
1316 if (trx_undo_left(undo_block, ptr)
1317 < DATA_MBR_LEN) {
1318 return(0);
1319 }
1320
1321 for (int i = 0; i < SPDIMS * 2;
1322 i++) {
1323 mach_double_write(
1324 ptr, mbr[i]);
1325 ptr += sizeof(double);
1326 }
1327 }
1328 }
1329
1330 already_logged:
1331 continue;
1332 }
1333
1334 for (col_no = 0; col_no < dict_table_get_n_v_cols(table);
1335 col_no++) {
1336 const dict_v_col_t* col
1337 = dict_table_get_nth_v_col(table, col_no);
1338
1339 if (col->m_col.ord_part) {
1340 ulint pos = col_no;
1341 ulint max_v_log_len
1342 = dict_max_v_field_len_store_undo(
1343 table, pos);
1344
1345 /* Write field number to undo log.
1346 Make sure there is enough space in the log */
1347 if (trx_undo_left(undo_block, ptr) < 5) {
1348 return(0);
1349 }
1350
1351 pos += REC_MAX_N_FIELDS;
1352 ptr += mach_write_compressed(ptr, pos);
1353
1354 ut_ad(col_no < table->n_v_def);
1355 ptr = trx_undo_log_v_idx(undo_block, table,
1356 col_no, ptr,
1357 first_v_col);
1358 first_v_col = false;
1359
1360 if (!ptr) {
1361 return(0);
1362 }
1363
1364 const dfield_t* vfield = NULL;
1365
1366 if (update) {
1367 ut_ad(!row);
1368 if (update->old_vrow == NULL) {
1369 flen = UNIV_SQL_NULL;
1370 } else {
1371 vfield = dtuple_get_nth_v_field(
1372 update->old_vrow,
1373 col->v_pos);
1374 }
1375 } else if (row) {
1376 vfield = dtuple_get_nth_v_field(
1377 row, col->v_pos);
1378 } else {
1379 ut_ad(0);
1380 }
1381
1382 if (vfield) {
1383 field = static_cast<byte*>(vfield->data);
1384 flen = vfield->len;
1385 } else {
1386 ut_ad(flen == UNIV_SQL_NULL);
1387 }
1388
1389 if (flen != UNIV_SQL_NULL) {
1390 flen = ut_min(
1391 flen, max_v_log_len);
1392 }
1393
1394 ptr += mach_write_compressed(ptr, flen);
1395
1396 switch (flen) {
1397 case 0: case UNIV_SQL_NULL:
1398 break;
1399 default:
1400 if (trx_undo_left(undo_block, ptr)
1401 < flen) {
1402 return(0);
1403 }
1404
1405 memcpy(ptr, field, flen);
1406 ptr += flen;
1407 }
1408 }
1409 }
1410
1411 mach_write_to_2(old_ptr, ulint(ptr - old_ptr));
1412
1413 if (row_heap) {
1414 mem_heap_free(row_heap);
1415 }
1416 }
1417
1418 /*----------------------------------------*/
1419 /* Write pointers to the previous and the next undo log records */
1420 if (trx_undo_left(undo_block, ptr) < 2) {
1421 return(0);
1422 }
1423
1424 mach_write_to_2(ptr, first_free);
1425 ptr += 2;
1426 const ulint new_free = ulint(ptr - undo_block->frame);
1427 mach_write_to_2(undo_block->frame + first_free, new_free);
1428
1429 mach_write_to_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
1430 + undo_block->frame, new_free);
1431
1432 /* Write to the REDO log about this change in the UNDO log */
1433 trx_undof_page_add_undo_rec_log(undo_block, first_free, new_free, mtr);
1434 return(first_free);
1435 }
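/* Editorial sketch of the update/delete-mark undo record assembled above
(between the 2-byte next/previous pointers; compressed fields take 1-11
bytes each):

  [1 byte  type_cmpl: TRX_UNDO_UPD_EXIST_REC / TRX_UNDO_UPD_DEL_REC /
           TRX_UNDO_DEL_MARK_REC, plus cmpl_info and possibly the
           TRX_UNDO_UPD_EXTERN flag]
  [much-compressed undo_no][much-compressed table id]
  [1 byte info bits][compressed DB_TRX_ID][compressed DB_ROLL_PTR]
  [unique key fields: compressed length + data each]
  [if update: compressed n_updated, then per field:
     compressed field number (+ REC_MAX_N_FIELDS for virtual columns),
     the old value, and for virtual columns also the new value]
  [unless cmpl_info has UPD_NODE_NO_ORD_CHANGE:
     2-byte section length, then all ordering columns and indexed virtual
     columns with their values or MBRs] */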
1436
1437 /**********************************************************************//**
1438 Reads from an undo log update record the system field values of the old
1439 version.
1440 @return remaining part of undo log record after reading these values */
1441 byte*
1442 trx_undo_update_rec_get_sys_cols(
1443 /*=============================*/
1444 const byte* ptr, /*!< in: remaining part of undo
1445 log record after reading
1446 general parameters */
1447 trx_id_t* trx_id, /*!< out: trx id */
1448 roll_ptr_t* roll_ptr, /*!< out: roll ptr */
1449 ulint* info_bits) /*!< out: info bits state */
1450 {
1451 /* Read the state of the info bits */
1452 *info_bits = mach_read_from_1(ptr);
1453 ptr += 1;
1454
1455 /* Read the values of the system columns */
1456
1457 *trx_id = mach_u64_read_next_compressed(&ptr);
1458 *roll_ptr = mach_u64_read_next_compressed(&ptr);
1459
1460 return(const_cast<byte*>(ptr));
1461 }
1462
1463 /*******************************************************************//**
1464 Builds an update vector based on a remaining part of an undo log record.
1465 @return remaining part of the record, NULL if an error is detected, which
1466 means that the record is corrupted */
1467 byte*
1468 trx_undo_update_rec_get_update(
1469 /*===========================*/
1470 const byte* ptr, /*!< in: remaining part in update undo log
1471 record, after reading the row reference
1472 NOTE that this copy of the undo log record must
1473 be preserved as long as the update vector is
1474 used, as we do NOT copy the data in the
1475 record! */
1476 dict_index_t* index, /*!< in: clustered index */
1477 ulint type, /*!< in: TRX_UNDO_UPD_EXIST_REC,
1478 TRX_UNDO_UPD_DEL_REC, or
1479 TRX_UNDO_DEL_MARK_REC; in the last case,
1480 only trx id and roll ptr fields are added to
1481 the update vector */
1482 trx_id_t trx_id, /*!< in: transaction id from this undo record */
1483 roll_ptr_t roll_ptr,/*!< in: roll pointer from this undo record */
1484 ulint info_bits,/*!< in: info bits from this undo record */
1485 mem_heap_t* heap, /*!< in: memory heap from which the memory
1486 needed is allocated */
1487 upd_t** upd) /*!< out, own: update vector */
1488 {
1489 upd_field_t* upd_field;
1490 upd_t* update;
1491 ulint n_fields;
1492 byte* buf;
1493 ulint i;
1494 bool first_v_col = true;
1495 bool is_undo_log = true;
1496 ulint n_skip_field = 0;
1497
1498 ut_a(dict_index_is_clust(index));
1499
1500 if (type != TRX_UNDO_DEL_MARK_REC) {
1501 n_fields = mach_read_next_compressed(&ptr);
1502 } else {
1503 n_fields = 0;
1504 }
1505
1506 update = upd_create(n_fields + 2, heap);
1507
1508 update->info_bits = info_bits;
1509
1510 /* Store first trx id and roll ptr to update vector */
1511
1512 upd_field = upd_get_nth_field(update, n_fields);
1513
1514 buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_TRX_ID_LEN));
1515
1516 mach_write_to_6(buf, trx_id);
1517
1518 upd_field_set_field_no(upd_field,
1519 dict_index_get_sys_col_pos(index, DATA_TRX_ID),
1520 index);
1521 dfield_set_data(&(upd_field->new_val), buf, DATA_TRX_ID_LEN);
1522
1523 upd_field = upd_get_nth_field(update, n_fields + 1);
1524
1525 buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_ROLL_PTR_LEN));
1526
1527 trx_write_roll_ptr(buf, roll_ptr);
1528
1529 upd_field_set_field_no(
1530 upd_field, dict_index_get_sys_col_pos(index, DATA_ROLL_PTR),
1531 index);
1532 dfield_set_data(&(upd_field->new_val), buf, DATA_ROLL_PTR_LEN);
1533
1534 /* Store then the updated ordinary columns to the update vector */
1535
1536 for (i = 0; i < n_fields; i++) {
1537
1538 const byte* field;
1539 ulint len;
1540 ulint field_no;
1541 ulint orig_len;
1542 bool is_virtual;
1543
1544 upd_field = upd_get_nth_field(update, i);
1545 field_no = mach_read_next_compressed(&ptr);
1546
1547 is_virtual = (field_no >= REC_MAX_N_FIELDS);
1548
1549 if (is_virtual) {
1550 /* If new version, we need to check index list to figure
1551 out the correct virtual column position */
1552 ptr = trx_undo_read_v_idx(
1553 index->table, ptr, first_v_col, &is_undo_log,
1554 &field_no);
1555 first_v_col = false;
1556 /* This column could be dropped or no longer indexed */
1557 if (field_no == ULINT_UNDEFINED) {
1558 /* Mark that this is no longer needed */
1559 upd_field->field_no = REC_MAX_N_FIELDS;
1560
1561 ptr = trx_undo_rec_get_col_val(
1562 ptr, &field, &len, &orig_len);
1563 ptr = trx_undo_rec_get_col_val(
1564 ptr, &field, &len, &orig_len);
1565 n_skip_field++;
1566 continue;
1567 }
1568
1569 upd_field_set_v_field_no(upd_field, field_no, index);
1570 } else if (field_no < index->n_fields) {
1571 upd_field_set_field_no(upd_field, field_no, index);
1572 } else if (update->info_bits == REC_INFO_MIN_REC_FLAG
1573 && index->is_instant()) {
1574 /* This must be a rollback of a subsequent
1575 instant ADD COLUMN operation. This will be
1576 detected and handled by btr_cur_trim(). */
1577 upd_field->field_no = field_no;
1578 upd_field->orig_len = 0;
1579 } else {
1580 ib::error() << "Trying to access update undo rec"
1581 " field " << field_no
1582 << " in index " << index->name
1583 << " of table " << index->table->name
1584 << " but index has only "
1585 << dict_index_get_n_fields(index)
1586 << " fields " << BUG_REPORT_MSG
1587 << ". Run also CHECK TABLE "
1588 << index->table->name << "."
1589 " n_fields = " << n_fields << ", i = " << i;
1590
1591 ut_ad(0);
1592 *upd = NULL;
1593 return(NULL);
1594 }
1595
1596 ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1597
1598 upd_field->orig_len = orig_len;
1599
1600 if (len == UNIV_SQL_NULL) {
1601 dfield_set_null(&upd_field->new_val);
1602 } else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1603 dfield_set_data(&upd_field->new_val, field, len);
1604 } else {
1605 len -= UNIV_EXTERN_STORAGE_FIELD;
1606
1607 dfield_set_data(&upd_field->new_val, field, len);
1608 dfield_set_ext(&upd_field->new_val);
1609 }
1610
1611 if (is_virtual) {
1612 upd_field->old_v_val = static_cast<dfield_t*>(
1613 mem_heap_alloc(
1614 heap, sizeof *upd_field->old_v_val));
1615 ptr = trx_undo_rec_get_col_val(
1616 ptr, &field, &len, &orig_len);
1617 if (len == UNIV_SQL_NULL) {
1618 dfield_set_null(upd_field->old_v_val);
1619 } else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1620 dfield_set_data(
1621 upd_field->old_v_val, field, len);
1622 } else {
1623 ut_ad(0);
1624 }
1625 }
1626 }
1627
1628 /* In a rare scenario, we could have skipped virtual columns (because
1629 they were dropped). We will regenerate an update vector and skip them */
1630 if (n_skip_field > 0) {
1631 ulint n = 0;
1632 ut_ad(n_skip_field <= n_fields);
1633
1634 upd_t* new_update = upd_create(
1635 n_fields + 2 - n_skip_field, heap);
1636
1637 for (i = 0; i < n_fields + 2; i++) {
1638 upd_field = upd_get_nth_field(update, i);
1639
1640 if (upd_field->field_no == REC_MAX_N_FIELDS) {
1641 continue;
1642 }
1643
1644 upd_field_t* new_upd_field
1645 = upd_get_nth_field(new_update, n);
1646 *new_upd_field = *upd_field;
1647 n++;
1648 }
1649 ut_ad(n == n_fields + 2 - n_skip_field);
1650 *upd = new_update;
1651 } else {
1652 *upd = update;
1653 }
1654
1655 return(const_cast<byte*>(ptr));
1656 }
1657
1658 /*******************************************************************//**
1659 Builds a partial row from an update undo log record, for purge.
1660 It contains the columns which occur as ordering in any index of the table.
1661 Any missing columns are indicated by col->mtype == DATA_MISSING.
1662 @return pointer to remaining part of undo record */
1663 byte*
1664 trx_undo_rec_get_partial_row(
1665 /*=========================*/
1666 const byte* ptr, /*!< in: remaining part in update undo log
1667 record of a suitable type, at the start of
1668 the stored index columns;
1669 NOTE that this copy of the undo log record must
1670 be preserved as long as the partial row is
1671 used, as we do NOT copy the data in the
1672 record! */
1673 dict_index_t* index, /*!< in: clustered index */
1674 const upd_t* update, /*!< in: updated columns */
1675 dtuple_t** row, /*!< out, own: partial row */
1676 ibool ignore_prefix, /*!< in: flag to indicate if we
1677 expect blob prefixes in undo. Used
1678 only in the assertion. */
1679 mem_heap_t* heap) /*!< in: memory heap from which the memory
1680 needed is allocated */
1681 {
1682 const byte* end_ptr;
1683 bool first_v_col = true;
1684 bool is_undo_log = true;
1685
1686 ut_ad(index->is_primary());
1687
1688 *row = dtuple_create_with_vcol(
1689 heap, dict_table_get_n_cols(index->table),
1690 dict_table_get_n_v_cols(index->table));
1691
1692 /* Mark all columns in the row uninitialized, so that
1693 we can distinguish missing fields from fields that are SQL NULL. */
1694 for (ulint i = 0; i < dict_table_get_n_cols(index->table); i++) {
1695 dfield_get_type(dtuple_get_nth_field(*row, i))
1696 ->mtype = DATA_MISSING;
1697 }
1698
1699 dtuple_init_v_fld(*row);
1700
1701 for (const upd_field_t* uf = update->fields, * const ue
1702 = update->fields + update->n_fields;
1703 uf != ue; uf++) {
1704 if (uf->old_v_val) {
1705 continue;
1706 }
1707 ulint c = dict_index_get_nth_col(index, uf->field_no)->ind;
1708 *dtuple_get_nth_field(*row, c) = uf->new_val;
1709 }
1710
1711 end_ptr = ptr + mach_read_from_2(ptr);
1712 ptr += 2;
1713
1714 while (ptr != end_ptr) {
1715 dfield_t* dfield;
1716 const byte* field;
1717 ulint field_no;
1718 const dict_col_t* col;
1719 ulint col_no;
1720 ulint len;
1721 ulint orig_len;
1722 bool is_virtual;
1723
1724 field_no = mach_read_next_compressed(&ptr);
1725
1726 is_virtual = (field_no >= REC_MAX_N_FIELDS);
1727
1728 if (is_virtual) {
1729 ptr = trx_undo_read_v_idx(
1730 index->table, ptr, first_v_col, &is_undo_log,
1731 &field_no);
1732 first_v_col = false;
1733 }
1734
1735 ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1736
1737 /* This column could be dropped or no longer indexed */
1738 if (field_no == ULINT_UNDEFINED) {
1739 ut_ad(is_virtual);
1740 continue;
1741 }
1742
1743 if (is_virtual) {
1744 dict_v_col_t* vcol = dict_table_get_nth_v_col(
1745 index->table, field_no);
1746 col = &vcol->m_col;
1747 col_no = dict_col_get_no(col);
1748 dfield = dtuple_get_nth_v_field(*row, vcol->v_pos);
1749 dict_col_copy_type(
1750 &vcol->m_col,
1751 dfield_get_type(dfield));
1752 } else {
1753 col = dict_index_get_nth_col(index, field_no);
1754 col_no = dict_col_get_no(col);
1755 dfield = dtuple_get_nth_field(*row, col_no);
1756 ut_ad(dfield->type.mtype == DATA_MISSING
1757 || dict_col_type_assert_equal(col,
1758 &dfield->type));
1759 ut_ad(dfield->type.mtype == DATA_MISSING
1760 || dfield->len == len
1761 || (len != UNIV_SQL_NULL
1762 && len >= UNIV_EXTERN_STORAGE_FIELD));
1763 dict_col_copy_type(
1764 dict_table_get_nth_col(index->table, col_no),
1765 dfield_get_type(dfield));
1766 }
1767
1768 dfield_set_data(dfield, field, len);
1769
1770 if (len != UNIV_SQL_NULL
1771 && len >= UNIV_EXTERN_STORAGE_FIELD) {
1772 spatial_status_t spatial_status;
1773
1774 /* Decode spatial status. */
1775 spatial_status = static_cast<spatial_status_t>(
1776 (len & SPATIAL_STATUS_MASK)
1777 >> SPATIAL_STATUS_SHIFT);
1778 len &= ~SPATIAL_STATUS_MASK;
1779
1780 /* Keep compatible with 5.7.9 format. */
1781 if (spatial_status == SPATIAL_UNKNOWN) {
1782 spatial_status =
1783 dict_col_get_spatial_status(col);
1784 }
1785
1786 switch (spatial_status) {
1787 case SPATIAL_ONLY:
1788 ut_ad(len - UNIV_EXTERN_STORAGE_FIELD
1789 == DATA_MBR_LEN);
1790 dfield_set_len(
1791 dfield,
1792 len - UNIV_EXTERN_STORAGE_FIELD);
1793 break;
1794
1795 case SPATIAL_MIXED:
1796 dfield_set_len(
1797 dfield,
1798 len - UNIV_EXTERN_STORAGE_FIELD
1799 - DATA_MBR_LEN);
1800 break;
1801
1802 case SPATIAL_NONE:
1803 dfield_set_len(
1804 dfield,
1805 len - UNIV_EXTERN_STORAGE_FIELD);
1806 break;
1807
1808 case SPATIAL_UNKNOWN:
1809 ut_ad(0);
1810 break;
1811 }
1812
1813 dfield_set_ext(dfield);
1814 dfield_set_spatial_status(dfield, spatial_status);
1815
1816 /* If the prefix of this column is indexed,
1817 ensure that enough prefix is stored in the
1818 undo log record. */
1819 if (!ignore_prefix && col->ord_part
1820 && spatial_status != SPATIAL_ONLY) {
1821 ut_a(dfield_get_len(dfield)
1822 >= BTR_EXTERN_FIELD_REF_SIZE);
1823 ut_a(dict_table_has_atomic_blobs(index->table)
1824 || dfield_get_len(dfield)
1825 >= REC_ANTELOPE_MAX_INDEX_COL_LEN
1826 + BTR_EXTERN_FIELD_REF_SIZE);
1827 }
1828 }
1829 }
1830
1831 return(const_cast<byte*>(ptr));
1832 }
1833
1834 /** Erase the unused undo log page end.
1835 @param[in,out] undo_page undo log page
1836 @return whether the page contained something */
1837 bool
1838 trx_undo_erase_page_end(page_t* undo_page)
1839 {
1840 ulint first_free;
1841
1842 first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
1843 + TRX_UNDO_PAGE_FREE);
1844 memset(undo_page + first_free, 0,
1845 (srv_page_size - FIL_PAGE_DATA_END) - first_free);
1846
1847 return(first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
1848 }
1849
1850 /** Report a RENAME TABLE operation.
1851 @param[in,out] trx transaction
1852 @param[in] table table that is being renamed
1853 @param[in,out] block undo page
1854 @param[in,out] mtr mini-transaction
1855 @return byte offset of the undo log record
1856 @retval 0 in case of failure */
1857 static
1858 ulint
1859 trx_undo_page_report_rename(trx_t* trx, const dict_table_t* table,
1860 buf_block_t* block, mtr_t* mtr)
1861 {
1862 byte* ptr_first_free = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
1863 + block->frame;
1864 ulint first_free = mach_read_from_2(ptr_first_free);
1865 ut_ad(first_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
1866 ut_ad(first_free <= srv_page_size);
1867 byte* start = block->frame + first_free;
1868 size_t len = strlen(table->name.m_name);
1869 const size_t fixed = 2 + 1 + 11 + 11 + 2;
1870 ut_ad(len <= NAME_LEN * 2 + 1);
1871 /* The -10 is used in trx_undo_left() */
1872 	compile_time_assert((NAME_LEN + 1) * 2 + fixed
1873 + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE
1874 < UNIV_PAGE_SIZE_MIN - 10 - FIL_PAGE_DATA_END);
1875
1876 if (trx_undo_left(block, start) < fixed + len) {
1877 ut_ad(first_free > TRX_UNDO_PAGE_HDR
1878 + TRX_UNDO_PAGE_HDR_SIZE);
1879 return 0;
1880 }
1881
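	/* Layout of the TRX_UNDO_RENAME_TABLE record written below:
	2 bytes pointing to the start of the next record (the new free
	offset), 1 type byte, the undo number and the table id in
	much-compressed format, the old table name (without a
	terminating NUL), and finally 2 bytes pointing back to the
	start of this record. */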
1882 byte* ptr = start + 2;
1883 *ptr++ = TRX_UNDO_RENAME_TABLE;
1884 ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
1885 ptr += mach_u64_write_much_compressed(ptr, table->id);
1886 memcpy(ptr, table->name.m_name, len);
1887 ptr += len;
1888 mach_write_to_2(ptr, first_free);
1889 ptr += 2;
1890 ulint offset = page_offset(ptr);
1891 mach_write_to_2(start, offset);
1892 mach_write_to_2(ptr_first_free, offset);
1893
1894 trx_undof_page_add_undo_rec_log(block, first_free, offset, mtr);
1895 return first_free;
1896 }
1897
1898 /** Report a RENAME TABLE operation.
1899 @param[in,out] trx transaction
1900 @param[in] table table that is being renamed
1901 @return DB_SUCCESS or error code */
1902 dberr_t trx_undo_report_rename(trx_t* trx, const dict_table_t* table)
1903 {
1904 ut_ad(!trx->read_only);
1905 ut_ad(trx->id);
1906 ut_ad(!table->is_temporary());
1907
1908 mtr_t mtr;
1909 dberr_t err;
1910 mtr.start();
1911 if (buf_block_t* block = trx_undo_assign(trx, &err, &mtr)) {
1912 trx_undo_t* undo = trx->rsegs.m_redo.undo;
1913 ut_ad(err == DB_SUCCESS);
1914 ut_ad(undo);
1915 for (ut_d(int loop_count = 0);;) {
1916 ut_ad(loop_count++ < 2);
1917 ut_ad(undo->last_page_no == block->page.id.page_no());
1918
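			/* trx_undo_page_report_rename() returns 0 if
			the record does not fit on the current last
			undo page; in that case, add one page and
			retry. The compile-time assertion in that
			function guarantees that a rename record always
			fits on an empty page, hence at most one retry
			is needed (see loop_count). */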
1919 if (ulint offset = trx_undo_page_report_rename(
1920 trx, table, block, &mtr)) {
1921 undo->top_page_no = undo->last_page_no;
1922 undo->top_offset = offset;
1923 undo->top_undo_no = trx->undo_no++;
1924 undo->guess_block = block;
1925 ut_ad(!undo->empty());
1926
1927 err = DB_SUCCESS;
1928 break;
1929 } else {
1930 mtr.commit();
1931 mtr.start();
1932 block = trx_undo_add_page(undo, &mtr);
1933 if (!block) {
1934 err = DB_OUT_OF_FILE_SPACE;
1935 break;
1936 }
1937 }
1938 }
1939 }
1940
1941 mtr.commit();
1942 return err;
1943 }
1944
1945 /***********************************************************************//**
1946 Writes information to an undo log about an insert, update, or a delete marking
1947 of a clustered index record. This information is used in a rollback of the
1948 transaction and in consistent reads that must look to the history of this
1949 transaction.
1950 @return DB_SUCCESS or error code */
1951 dberr_t
1952 trx_undo_report_row_operation(
1953 /*==========================*/
1954 que_thr_t* thr, /*!< in: query thread */
1955 dict_index_t* index, /*!< in: clustered index */
1956 const dtuple_t* clust_entry, /*!< in: in the case of an insert,
1957 index entry to insert into the
1958 clustered index; in updates,
1959 may contain a clustered index
1960 record tuple that also contains
1961 virtual columns of the table;
1962 otherwise, NULL */
1963 const upd_t* update, /*!< in: in the case of an update,
1964 the update vector, otherwise NULL */
1965 ulint cmpl_info, /*!< in: compiler info on secondary
1966 index updates */
1967 const rec_t* rec, /*!< in: case of an update or delete
1968 marking, the record in the clustered
1969 index; NULL if insert */
1970 const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
1971 roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
1972 undo log record */
1973 {
1974 trx_t* trx;
1975 mtr_t mtr;
1976 #ifdef UNIV_DEBUG
1977 int loop_count = 0;
1978 #endif /* UNIV_DEBUG */
1979
1980 ut_a(dict_index_is_clust(index));
1981 ut_ad(!update || rec);
1982 ut_ad(!rec || rec_offs_validate(rec, index, offsets));
1983 ut_ad(!srv_read_only_mode);
1984
1985 trx = thr_get_trx(thr);
1986 /* This function must not be invoked during rollback
1987 (of a TRX_STATE_PREPARE transaction or otherwise). */
1988 ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
1989 ut_ad(!trx->in_rollback);
1990
1991 mtr.start();
1992 trx_undo_t** pundo;
1993 trx_rseg_t* rseg;
1994 const bool is_temp = index->table->is_temporary();
1995
1996 if (is_temp) {
1997 mtr.set_log_mode(MTR_LOG_NO_REDO);
1998
1999 rseg = trx->get_temp_rseg();
2000 pundo = &trx->rsegs.m_noredo.undo;
2001 } else {
2002 ut_ad(!trx->read_only);
2003 ut_ad(trx->id);
2004 pundo = &trx->rsegs.m_redo.undo;
2005 rseg = trx->rsegs.m_redo.rseg;
2006 }
2007
2008 dberr_t err;
2009 buf_block_t* undo_block = trx_undo_assign_low(trx, rseg, pundo,
2010 &err, &mtr);
2011 trx_undo_t* undo = *pundo;
2012
2013 ut_ad((err == DB_SUCCESS) == (undo_block != NULL));
2014 if (UNIV_UNLIKELY(undo_block == NULL)) {
2015 goto err_exit;
2016 }
2017
2018 ut_ad(undo != NULL);
2019
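	/* Write the undo log record: an insert record if rec == NULL,
	otherwise an update or delete-mark record for the existing
	clustered index record. On success, the page offset of the
	record is returned; 0 means that it did not fit on the page. */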
2020 do {
2021 ulint offset = !rec
2022 ? trx_undo_page_report_insert(
2023 undo_block, trx, index, clust_entry, &mtr)
2024 : trx_undo_page_report_modify(
2025 undo_block, trx, index, rec, offsets, update,
2026 cmpl_info, clust_entry, &mtr);
2027
2028 if (UNIV_UNLIKELY(offset == 0)) {
2029 if (!trx_undo_erase_page_end(undo_block->frame)) {
2030 /* The record did not fit on an empty
2031 undo page. Discard the freshly allocated
2032 page and return an error. */
2033
2034 /* When we remove a page from an undo
2035 log, this is analogous to a
2036 pessimistic insert in a B-tree, and we
2037 must reserve the counterpart of the
2038 tree latch, which is the rseg
2039 mutex. We must commit the mini-transaction
2040 first, because it may be holding lower-level
2041 latches, such as SYNC_FSP and SYNC_FSP_PAGE. */
2042
2043 mtr.commit();
2044 mtr.start();
2045 if (is_temp) {
2046 mtr.set_log_mode(MTR_LOG_NO_REDO);
2047 }
2048
2049 mutex_enter(&rseg->mutex);
2050 trx_undo_free_last_page(undo, &mtr);
2051 mutex_exit(&rseg->mutex);
2052
2053 err = DB_UNDO_RECORD_TOO_BIG;
2054 goto err_exit;
2055 }
2056
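			/* The page already contained other records;
			the partially written record was erased above.
			Commit the mini-transaction and extend the
			undo log with a fresh page below. */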
2057 mtr.commit();
2058 } else {
2059 /* Success */
2060 undo->top_page_no = undo_block->page.id.page_no();
2061 mtr.commit();
2062 undo->top_offset = offset;
2063 undo->top_undo_no = trx->undo_no++;
2064 undo->guess_block = undo_block;
2065 ut_ad(!undo->empty());
2066
2067 if (!is_temp) {
2068 const undo_no_t limit = undo->top_undo_no;
2069 				/* Determine if this is the first time
2070 				that this transaction modifies a
2071 				system-versioned column in this table. */
2072 trx_mod_table_time_t& time
2073 = trx->mod_tables.insert(
2074 trx_mod_tables_t::value_type(
2075 index->table, limit))
2076 .first->second;
2077 ut_ad(time.valid(limit));
2078
2079 if (!time.is_versioned()
2080 && index->table->versioned_by_id()
2081 && (!rec /* INSERT */
2082 || (update
2083 && update->affects_versioned()))) {
2084 time.set_versioned(limit);
2085 }
2086 }
2087
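			/* Encode the location of this undo log record
			(insert flag, rollback segment id, page number
			and page offset) into the DB_ROLL_PTR that will
			be stored in the clustered index record. */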
2088 *roll_ptr = trx_undo_build_roll_ptr(
2089 !rec, rseg->id, undo->top_page_no, offset);
2090 return(DB_SUCCESS);
2091 }
2092
2093 ut_ad(undo_block->page.id.page_no() == undo->last_page_no);
2094
2095 /* We have to extend the undo log by one page */
2096
2097 ut_ad(++loop_count < 2);
2098 mtr.start();
2099
2100 if (is_temp) {
2101 mtr.set_log_mode(MTR_LOG_NO_REDO);
2102 }
2103
2104 undo_block = trx_undo_add_page(undo, &mtr);
2105
2106 DBUG_EXECUTE_IF("ib_err_ins_undo_page_add_failure",
2107 undo_block = NULL;);
2108 } while (UNIV_LIKELY(undo_block != NULL));
2109
2110 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2111 DB_OUT_OF_FILE_SPACE,
2112 //ER_INNODB_UNDO_LOG_FULL,
2113 "No more space left over in %s tablespace for allocating UNDO"
2114 " log pages. Please add new data file to the tablespace or"
2115 " check if filesystem is full or enable auto-extension for"
2116 " the tablespace",
2117 undo->rseg->space == fil_system.sys_space
2118 ? "system" : is_temp ? "temporary" : "undo");
2119
2120 /* Did not succeed: out of space */
2121 err = DB_OUT_OF_FILE_SPACE;
2122
2123 err_exit:
2124 mtr_commit(&mtr);
2125 return(err);
2126 }
2127
2128 /*============== BUILDING PREVIOUS VERSION OF A RECORD ===============*/
2129
2130 /** Copy an undo record to heap.
2131 @param[in] roll_ptr roll pointer to a record that exists
2132 @param[in,out] heap memory heap where copied */
2133 static
2134 trx_undo_rec_t*
2135 trx_undo_get_undo_rec_low(
2136 roll_ptr_t roll_ptr,
2137 mem_heap_t* heap)
2138 {
2139 trx_undo_rec_t* undo_rec;
2140 ulint rseg_id;
2141 ulint page_no;
2142 ulint offset;
2143 const page_t* undo_page;
2144 trx_rseg_t* rseg;
2145 ibool is_insert;
2146 mtr_t mtr;
2147
2148 trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no,
2149 &offset);
2150 ut_ad(page_no > FSP_FIRST_INODE_PAGE_NO);
2151 ut_ad(offset >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
2152 rseg = trx_sys.rseg_array[rseg_id];
2153 ut_ad(rseg->is_persistent());
2154
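	/* S-latch the undo page only for the duration of the copy;
	the record is copied into the caller's heap so that no page
	latch is held on return. */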
2155 mtr_start(&mtr);
2156
2157 undo_page = trx_undo_page_get_s_latched(
2158 page_id_t(rseg->space->id, page_no), &mtr);
2159
2160 undo_rec = trx_undo_rec_copy(undo_page + offset, heap);
2161
2162 mtr_commit(&mtr);
2163
2164 return(undo_rec);
2165 }
2166
2167 /** Copy an undo record to heap.
2168 @param[in] roll_ptr roll pointer to record
2169 @param[in,out] heap memory heap where copied
2170 @param[in] trx_id id of the trx that generated
2171 the roll pointer: it points to an
2172 undo log of this transaction
2173 @param[in] name table name
2174 @param[out] undo_rec own: copy of the record
2175 @retval true if the undo log has been
2176 truncated and we cannot fetch the old version
2177 @retval false if the undo log record is available
2178 NOTE: the caller must have latches on the clustered index page. */
2179 static MY_ATTRIBUTE((warn_unused_result))
2180 bool
2181 trx_undo_get_undo_rec(
2182 roll_ptr_t roll_ptr,
2183 mem_heap_t* heap,
2184 trx_id_t trx_id,
2185 const table_name_t& name,
2186 trx_undo_rec_t** undo_rec)
2187 {
2188 bool missing_history;
2189
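	/* Hold purge_sys.latch in shared mode so that the purge view
	is not updated while we check visibility and copy the record:
	if the changes of trx_id are already visible to the purge view,
	the requested undo log record may have been truncated. */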
2190 rw_lock_s_lock(&purge_sys.latch);
2191
2192 missing_history = purge_sys.view.changes_visible(trx_id, name);
2193 if (!missing_history) {
2194 *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap);
2195 }
2196
2197 rw_lock_s_unlock(&purge_sys.latch);
2198
2199 return(missing_history);
2200 }
2201
2202 #ifdef UNIV_DEBUG
2203 #define ATTRIB_USED_ONLY_IN_DEBUG
2204 #else /* UNIV_DEBUG */
2205 #define ATTRIB_USED_ONLY_IN_DEBUG MY_ATTRIBUTE((unused))
2206 #endif /* UNIV_DEBUG */
2207
2208 /*******************************************************************//**
2209 Build a previous version of a clustered index record. The caller must
2210 hold a latch on the index page of the clustered index record.
2211 @retval true if previous version was built, or if it was an insert
2212 or the table has been rebuilt
2213 @retval false if the previous version is earlier than purge_view,
2214 or being purged, which means that it may have been removed */
2215 bool
2216 trx_undo_prev_version_build(
2217 /*========================*/
2218 const rec_t* index_rec ATTRIB_USED_ONLY_IN_DEBUG,
2219 /*!< in: clustered index record in the
2220 index tree */
2221 mtr_t* index_mtr ATTRIB_USED_ONLY_IN_DEBUG,
2222 /*!< in: mtr which contains the latch to
2223 index_rec page and purge_view */
2224 const rec_t* rec, /*!< in: version of a clustered index record */
2225 dict_index_t* index, /*!< in: clustered index */
2226 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
2227 mem_heap_t* heap, /*!< in: memory heap from which the memory
2228 needed is allocated */
2229 rec_t** old_vers,/*!< out, own: previous version, or NULL if
2230 rec is the first inserted version, or if
2231 history data has been deleted (an error),
2232 or if the purge COULD have removed the version
2233 though it has not yet done so */
2234 	mem_heap_t*	v_heap,	/*!< in: memory heap used to create vrow
2235 				dtuple if it is not yet created. This heap
2236 				differs from "heap" above in that it could be
2237 				prebuilt->old_vers_heap for selection */
2238 dtuple_t** vrow, /*!< out: virtual column info, if any */
2239 	ulint		v_status)
2240 				/*!< in: status flags determining whether this
2241 				function is invoked by the purge thread, and
2242 				whether the "after image" of the undo log
2243 				should be read */
2243 {
2244 trx_undo_rec_t* undo_rec = NULL;
2245 dtuple_t* entry;
2246 trx_id_t rec_trx_id;
2247 ulint type;
2248 undo_no_t undo_no;
2249 table_id_t table_id;
2250 trx_id_t trx_id;
2251 roll_ptr_t roll_ptr;
2252 upd_t* update;
2253 byte* ptr;
2254 ulint info_bits;
2255 ulint cmpl_info;
2256 bool dummy_extern;
2257 byte* buf;
2258
2259 ut_ad(!index->table->is_temporary());
2260 ut_ad(!rw_lock_own(&purge_sys.latch, RW_LOCK_S));
2261 ut_ad(mtr_memo_contains_page_flagged(index_mtr, index_rec,
2262 MTR_MEMO_PAGE_S_FIX
2263 | MTR_MEMO_PAGE_X_FIX));
2264 ut_ad(rec_offs_validate(rec, index, offsets));
2265 ut_a(index->is_primary());
2266
2267 roll_ptr = row_get_rec_roll_ptr(rec, index, offsets);
2268
2269 *old_vers = NULL;
2270
2271 if (trx_undo_roll_ptr_is_insert(roll_ptr)) {
2272 /* The record rec is the first inserted version */
2273 return(true);
2274 }
2275
2276 rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
2277
2278 ut_ad(!index->table->skip_alter_undo);
2279
2280 if (trx_undo_get_undo_rec(
2281 roll_ptr, heap, rec_trx_id, index->table->name,
2282 &undo_rec)) {
2283 if (v_status & TRX_UNDO_PREV_IN_PURGE) {
2284 /* We are fetching the record being purged */
2285 undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap);
2286 } else {
2287 /* The undo record may already have been purged,
2288 during purge or semi-consistent read. */
2289 return(false);
2290 }
2291 }
2292
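	/* Parse the generic header of the undo log record: its type,
	compiler info, whether any field was stored externally, the
	undo number and the table id. */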
2293 ptr = trx_undo_rec_get_pars(undo_rec, &type, &cmpl_info,
2294 &dummy_extern, &undo_no, &table_id);
2295
2296 if (table_id != index->table->id) {
2297 /* The table should have been rebuilt, but purge has
2298 not yet removed the undo log records for the
2299 now-dropped old table (table_id). */
2300 return(true);
2301 }
2302
2303 ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr,
2304 &info_bits);
2305
2306 /* (a) If a clustered index record version is such that the
2307 trx id stamp in it is bigger than purge_sys.view, then the
2308 BLOBs in that version are known to exist (the purge has not
2309 progressed that far);
2310
2311 (b) if the version is the first version such that trx id in it
2312 is less than purge_sys.view, and it is not delete-marked,
2313 then the BLOBs in that version are known to exist (the purge
2314 cannot have purged the BLOBs referenced by that version
2315 yet).
2316
2317 This function does not fetch any BLOBs. The callers might, by
2318 possibly invoking row_ext_create() via row_build(). However,
2319 they should have all needed information in the *old_vers
2320 returned by this function. This is because *old_vers is based
2321 on the transaction undo log records. The function
2322 trx_undo_page_fetch_ext() will write BLOB prefixes to the
2323 transaction undo log that are at least as long as the longest
2324 possible column prefix in a secondary index. Thus, secondary
2325 index entries for *old_vers can be constructed without
2326 dereferencing any BLOB pointers. */
2327
2328 ptr = trx_undo_rec_skip_row_ref(ptr, index);
2329
2330 ptr = trx_undo_update_rec_get_update(ptr, index, type, trx_id,
2331 roll_ptr, info_bits,
2332 heap, &update);
2333 ut_a(ptr);
2334
2335 if (row_upd_changes_field_size_or_external(index, offsets, update)) {
2336 /* We should confirm the existence of disowned external data,
2337 if the previous version record is delete marked. If the trx_id
2338 of the previous record is seen by purge view, we should treat
2339 it as missing history, because the disowned external data
2340 might be purged already.
2341
2342 The inherited external data (BLOBs) can be freed (purged)
2343 after trx_id was committed, provided that no view was started
2344 before trx_id. If the purge view can see the committed
2345 delete-marked record by trx_id, no transactions need to access
2346 the BLOB. */
2347
2348 /* the row_upd_changes_disowned_external(update) call could be
2349 omitted, but the synchronization on purge_sys.latch is likely
2350 more expensive. */
2351
2352 if ((update->info_bits & REC_INFO_DELETED_FLAG)
2353 && row_upd_changes_disowned_external(update)) {
2354 bool missing_extern;
2355
2356 rw_lock_s_lock(&purge_sys.latch);
2357
2358 missing_extern = purge_sys.view.changes_visible(
2359 trx_id, index->table->name);
2360
2361 rw_lock_s_unlock(&purge_sys.latch);
2362
2363 if (missing_extern) {
2364 				/* Treat as a fresh insert, so as not to
2365 				cause an assertion failure at the caller. */
2366 return(true);
2367 }
2368 }
2369
2370 /* We have to set the appropriate extern storage bits in the
2371 old version of the record: the extern bits in rec for those
2372 fields that update does NOT update, as well as the bits for
2373 those fields that update updates to become externally stored
2374 fields. Store the info: */
2375
2376 entry = row_rec_to_index_entry(rec, index, offsets, heap);
2377 /* The page containing the clustered index record
2378 corresponding to entry is latched in mtr. Thus the
2379 following call is safe. */
2380 if (!row_upd_index_replace_new_col_vals(entry, *index, update,
2381 heap)) {
2382 ut_a(v_status & TRX_UNDO_PREV_IN_PURGE);
2383 return false;
2384 }
2385
2386 /* Get number of externally stored columns in updated record */
2387 const ulint n_ext = dtuple_get_n_ext(entry);
2388
2389 buf = static_cast<byte*>(mem_heap_alloc(
2390 heap, rec_get_converted_size(index, entry, n_ext)));
2391
2392 *old_vers = rec_convert_dtuple_to_rec(buf, index,
2393 entry, n_ext);
2394 } else {
2395 buf = static_cast<byte*>(mem_heap_alloc(
2396 heap, rec_offs_size(offsets)));
2397
2398 *old_vers = rec_copy(buf, rec, offsets);
2399 rec_offs_make_valid(*old_vers, index, true, offsets);
2400 row_upd_rec_in_place(*old_vers, index, offsets, update, NULL);
2401 }
2402
2403 	/* Apply the old values stored in the update vector (which are the
2404 	after image of the update) to the dtuple vrow */
2405 if (v_status & TRX_UNDO_GET_OLD_V_VALUE) {
2406 row_upd_replace_vcol((dtuple_t*)*vrow, index->table, update,
2407 false, NULL, NULL);
2408 }
2409
2410 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2411 rec_offs offsets_dbg[REC_OFFS_NORMAL_SIZE];
2412 rec_offs_init(offsets_dbg);
2413 ut_a(!rec_offs_any_null_extern(
2414 *old_vers, rec_get_offsets(*old_vers, index, offsets_dbg,
2415 index->n_core_fields,
2416 ULINT_UNDEFINED, &heap)));
2417 #endif // defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2418
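	/* Reconstruct any virtual column values of the previous version
	from the undo log record, unless the update could not have
	changed any ordering field (UPD_NODE_NO_ORD_CHANGE) or the
	caller did not ask for them (vrow == NULL). */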
2419 if (vrow && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
2420 if (!(*vrow)) {
2421 *vrow = dtuple_create_with_vcol(
2422 v_heap ? v_heap : heap,
2423 dict_table_get_n_cols(index->table),
2424 dict_table_get_n_v_cols(index->table));
2425 dtuple_init_v_fld(*vrow);
2426 }
2427
2428 ut_ad(index->table->n_v_cols);
2429 trx_undo_read_v_cols(index->table, ptr, *vrow,
2430 v_status & TRX_UNDO_PREV_IN_PURGE);
2431 }
2432
2433 return(true);
2434 }
2435
2436 /** Read virtual column value from undo log
2437 @param[in] table the table
2438 @param[in] ptr undo log pointer
2439 @param[in,out] row the dtuple to fill
2440 @param[in] in_purge whether this is called by purge */
2441 void
2442 trx_undo_read_v_cols(
2443 const dict_table_t* table,
2444 const byte* ptr,
2445 dtuple_t* row,
2446 bool in_purge)
2447 {
2448 const byte* end_ptr;
2449 bool first_v_col = true;
2450 bool is_undo_log = true;
2451
2452 end_ptr = ptr + mach_read_from_2(ptr);
2453 ptr += 2;
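	/* The first two bytes give the total length of the stored
	virtual column information; iterate over the column entries
	until end_ptr is reached. */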
2454 while (ptr < end_ptr) {
2455 dfield_t* dfield;
2456 const byte* field;
2457 ulint field_no;
2458 ulint len;
2459 ulint orig_len;
2460 bool is_virtual;
2461
2462 field_no = mach_read_next_compressed(
2463 const_cast<const byte**>(&ptr));
2464
2465 is_virtual = (field_no >= REC_MAX_N_FIELDS);
2466
2467 if (is_virtual) {
2468 ptr = trx_undo_read_v_idx(
2469 table, ptr, first_v_col, &is_undo_log,
2470 &field_no);
2471 first_v_col = false;
2472 }
2473
2474 ptr = trx_undo_rec_get_col_val(
2475 ptr, &field, &len, &orig_len);
2476
2477 		/* The virtual column is no longer indexed or does not exist.
2478 		This check needs to come after trx_undo_rec_get_col_val() so
2479 		that the undo log pointer advances. */
2480 if (field_no == ULINT_UNDEFINED) {
2481 ut_ad(is_virtual);
2482 continue;
2483 }
2484
2485 if (is_virtual) {
2486 dict_v_col_t* vcol = dict_table_get_nth_v_col(
2487 table, field_no);
2488
2489 dfield = dtuple_get_nth_v_field(row, vcol->v_pos);
2490
2491 if (!in_purge
2492 || dfield_get_type(dfield)->mtype == DATA_MISSING) {
2493 dict_col_copy_type(
2494 &vcol->m_col,
2495 dfield_get_type(dfield));
2496 dfield_set_data(dfield, field, len);
2497 }
2498 }
2499 }
2500
2501 ut_ad(ptr == end_ptr);
2502 }
2503