1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file trx/trx0rec.cc
28  Transaction undo log record
29 
30  Created 3/26/1996 Heikki Tuuri
31  *******************************************************/
32 
33 #include "trx0rec.h"
34 
35 #include <sys/types.h>
36 
37 #include "fsp0fsp.h"
38 #include "mach0data.h"
39 #include "mtr0log.h"
40 #include "trx0undo.h"
41 #ifndef UNIV_HOTBACKUP
42 #include "dict0dict.h"
43 #include "fsp0sysspace.h"
44 #include "lob0index.h"
45 #include "lob0inf.h"
46 #include "lob0lob.h"
47 #include "que0que.h"
48 #include "read0read.h"
49 #include "row0ext.h"
50 #include "row0mysql.h"
51 #include "row0row.h"
52 #include "row0upd.h"
53 #include "trx0purge.h"
54 #include "trx0rseg.h"
55 #include "ut0mem.h"
56 
57 #include "my_dbug.h"
58 
59 namespace dd {
60 class Spatial_reference_system;
61 }
62 
63 /*=========== UNDO LOG RECORD CREATION AND DECODING ====================*/
64 
65 /** Writes the mtr log entry of the inserted undo log record on the undo log
66  page. */
67 UNIV_INLINE
trx_undof_page_add_undo_rec_log(page_t * undo_page,ulint old_free,ulint new_free,mtr_t * mtr)68 void trx_undof_page_add_undo_rec_log(
69     page_t *undo_page, /*!< in: undo log page */
70     ulint old_free,    /*!< in: start offset of the inserted entry */
71     ulint new_free,    /*!< in: end offset of the entry */
72     mtr_t *mtr)        /*!< in: mtr */
73 {
74   byte *log_ptr = nullptr;
75   const byte *log_end;
76   ulint len;
77 
78   if (!mlog_open(mtr, 11 + 13 + MLOG_BUF_MARGIN, log_ptr)) {
79     return;
80   }
81 
82   log_end = &log_ptr[11 + 13 + MLOG_BUF_MARGIN];
83   log_ptr = mlog_write_initial_log_record_fast(undo_page, MLOG_UNDO_INSERT,
84                                                log_ptr, mtr);
85   len = new_free - old_free - 4;
86 
87   mach_write_to_2(log_ptr, len);
88   log_ptr += 2;
89 
90   if (log_ptr + len <= log_end) {
91     memcpy(log_ptr, undo_page + old_free + 2, len);
92     mlog_close(mtr, log_ptr + len);
93   } else {
94     mlog_close(mtr, log_ptr);
95     mlog_catenate_string(mtr, undo_page + old_free + 2, len);
96   }
97 }
98 #endif /* !UNIV_HOTBACKUP */
99 
100 /** Parses a redo log record of adding an undo log record.
101  @return end of log record or NULL */
trx_undo_parse_add_undo_rec(byte * ptr,byte * end_ptr,page_t * page)102 byte *trx_undo_parse_add_undo_rec(byte *ptr,     /*!< in: buffer */
103                                   byte *end_ptr, /*!< in: buffer end */
104                                   page_t *page)  /*!< in: page or NULL */
105 {
106   ulint len;
107   byte *rec;
108   ulint first_free;
109 
110   if (end_ptr < ptr + 2) {
111     return (nullptr);
112   }
113 
114   len = mach_read_from_2(ptr);
115   ptr += 2;
116 
117   if (end_ptr < ptr + len) {
118     return (nullptr);
119   }
120 
121   if (page == nullptr) {
122     return (ptr + len);
123   }
124 
125   first_free = mach_read_from_2(page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE);
126   rec = page + first_free;
127 
128   mach_write_to_2(rec, first_free + 4 + len);
129   mach_write_to_2(rec + 2 + len, first_free);
130 
131   mach_write_to_2(page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
132                   first_free + 4 + len);
133   ut_memcpy(rec + 2, ptr, len);
134 
135   return (ptr + len);
136 }
137 
138 #ifndef UNIV_HOTBACKUP
139 /** Calculates the free space left for extending an undo log record.
140  @return bytes left */
141 UNIV_INLINE
trx_undo_left(const page_t * page,const byte * ptr)142 ulint trx_undo_left(const page_t *page, /*!< in: undo log page */
143                     const byte *ptr)    /*!< in: pointer to page */
144 {
145   /* The '- 10' is a safety margin, in case we have some small
146   calculation error below */
147 
148 #ifdef UNIV_DEBUG
149   ut_ad(ptr >= page);
150   size_t diff = ptr - page;
151   size_t max_free = UNIV_PAGE_SIZE - 10 - FIL_PAGE_DATA_END;
152   ut_ad(diff < UNIV_PAGE_SIZE);
153   ut_ad(diff <= max_free);
154 #endif /* UNIV_DEBUG */
155 
156   return (UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END);
157 }
158 
trx_undo_max_free_space()159 size_t trx_undo_max_free_space() {
160   /* Starting from an empty undo page. The following calculation is based
161   on what free space is got from trx_undo_reuse_cached(), trx_undo_create()
162   and trx_undo_left(). Current simplified free_space would be
163   UNIV_PAGE_SIZE - 290. */
164   size_t free_space =
165       UNIV_PAGE_SIZE - (TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE +
166                         TRX_UNDO_LOG_XA_HDR_SIZE + FIL_PAGE_DATA_END + 10);
167 
168   /* Undo number, table id, undo log type and pointer to next.
169   Also refer to the beginning of trx_undo_page_report_insert() */
170   free_space -= (11 + 11 + 1 + 2);
171 
172   /* For simplification, the max record length should be
173   UNIV_PAGE_SIZE - 290 - 25 = UNIV_PAGE_SIZE - 315. */
174 
175   return (free_space);
176 }
177 
178 /** Set the next and previous pointers in the undo page for the undo record
179  that was written to ptr. Update the first free value by the number of bytes
180  written for this undo record.
181  @return offset of the inserted entry on the page if succeeded, 0 if fail */
trx_undo_page_set_next_prev_and_add(page_t * undo_page,byte * ptr,mtr_t * mtr)182 static ulint trx_undo_page_set_next_prev_and_add(
183     page_t *undo_page, /*!< in/out: undo log page */
184     byte *ptr,         /*!< in: ptr up to where data has been
185                        written on this undo page. */
186     mtr_t *mtr)        /*!< in: mtr */
187 {
188   ulint first_free; /*!< offset within undo_page */
189   ulint end_of_rec; /*!< offset within undo_page */
190   byte *ptr_to_first_free;
191   /* pointer within undo_page
192   that points to the next free
193   offset value within undo_page.*/
194 
195   ut_ad(ptr > undo_page);
196   ut_ad(ptr < undo_page + UNIV_PAGE_SIZE);
197 
198   if (UNIV_UNLIKELY(trx_undo_left(undo_page, ptr) < 2)) {
199     return (0);
200   }
201 
202   ptr_to_first_free = undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE;
203 
204   first_free = mach_read_from_2(ptr_to_first_free);
205 
206   /* Write offset of the previous undo log record */
207   mach_write_to_2(ptr, first_free);
208   ptr += 2;
209 
210   end_of_rec = ptr - undo_page;
211 
212   /* Write offset of the next undo log record */
213   mach_write_to_2(undo_page + first_free, end_of_rec);
214 
215   /* Update the offset to first free undo record */
216   mach_write_to_2(ptr_to_first_free, end_of_rec);
217 
218   /* Write this log entry to the UNDO log */
219   trx_undof_page_add_undo_rec_log(undo_page, first_free, end_of_rec, mtr);
220 
221   return (first_free);
222 }
223 
224 /** Virtual column undo log version. To distinguish it from a length value
225 in 5.7.8 undo log, it starts with 0xF1 */
226 static const ulint VIRTUAL_COL_UNDO_FORMAT_1 = 0xF1;
227 
228 /** Decide if the following undo log record is a multi-value virtual column
229 @param[in]      undo_rec        undo log record
230 @return true if this is a multi-value virtual column log, otherwise false */
trx_undo_rec_is_multi_value(const byte * undo_rec)231 bool trx_undo_rec_is_multi_value(const byte *undo_rec) {
232   return (Multi_value_logger::is_multi_value_log(undo_rec));
233 }
234 
235 /** Write virtual column index info (index id and column position in index)
236 to the undo log
237 @param[in,out]	undo_page	undo log page
238 @param[in]	table           the table
239 @param[in]	pos		the virtual column position
240 @param[in]      ptr             undo log record being written
241 @param[in]	first_v_col	whether this is the first virtual column
242                                 which could start with a version marker
243 @return new undo log pointer */
trx_undo_log_v_idx(page_t * undo_page,const dict_table_t * table,ulint pos,byte * ptr,bool first_v_col)244 static byte *trx_undo_log_v_idx(page_t *undo_page, const dict_table_t *table,
245                                 ulint pos, byte *ptr, bool first_v_col) {
246   ut_ad(pos < table->n_v_def);
247   dict_v_col_t *vcol = dict_table_get_nth_v_col(table, pos);
248 
249   ulint n_idx = vcol->v_indexes->size();
250   byte *old_ptr;
251 
252   ut_ad(n_idx > 0);
253 
254   /* Size to reserve, max 5 bytes for each index id and position, plus
255   5 bytes for num of indexes, 2 bytes for write total length.
256   1 byte for undo log record format version marker */
257   ulint size = n_idx * (5 + 5) + 5 + 2 + (first_v_col ? 1 : 0);
258 
259   if (trx_undo_left(undo_page, ptr) < size) {
260     return (nullptr);
261   }
262 
263   if (first_v_col) {
264     /* write the version marker */
265     mach_write_to_1(ptr, VIRTUAL_COL_UNDO_FORMAT_1);
266 
267     ptr += 1;
268   }
269 
270   old_ptr = ptr;
271 
272   ptr += 2;
273 
274   ptr += mach_write_compressed(ptr, n_idx);
275 
276   dict_v_idx_list::iterator it;
277 
278   for (it = vcol->v_indexes->begin(); it != vcol->v_indexes->end(); ++it) {
279     dict_v_idx_t v_index = *it;
280 
281     ptr += mach_write_compressed(ptr, static_cast<ulint>(v_index.index->id));
282 
283     ptr += mach_write_compressed(ptr, v_index.nth_field);
284   }
285 
286   mach_write_to_2(old_ptr, ptr - old_ptr);
287 
288   return (ptr);
289 }
290 
291 /** Read virtual column index from undo log, and verify the column is still
292 indexed, and return its position
293 @param[in]	table		the table
294 @param[in]	ptr		undo log pointer
295 @param[out]	col_pos		the column number or ULINT_UNDEFINED
296                                 if the column is not indexed any more
297 @return remaining part of undo log record after reading these values */
trx_undo_read_v_idx_low(const dict_table_t * table,const byte * ptr,ulint * col_pos)298 static const byte *trx_undo_read_v_idx_low(const dict_table_t *table,
299                                            const byte *ptr, ulint *col_pos) {
300   ulint len = mach_read_from_2(ptr);
301   const byte *old_ptr = ptr;
302 
303   *col_pos = ULINT_UNDEFINED;
304 
305   ptr += 2;
306 
307   ulint num_idx = mach_read_next_compressed(&ptr);
308 
309   ut_ad(num_idx > 0);
310 
311   const dict_index_t *clust_index = table->first_index();
312 
313   for (ulint i = 0; i < num_idx; i++) {
314     space_index_t id = mach_read_next_compressed(&ptr);
315     ulint pos = mach_read_next_compressed(&ptr);
316     const dict_index_t *index = clust_index->next();
317 
318     while (index != nullptr) {
319       /* Return if we find a matching index.
320       TODO: in the future, it might be worth to add
321       checks on other indexes */
322       if (index->id == id) {
323         const dict_col_t *col = index->get_col(pos);
324         ut_ad(col->is_virtual());
325         const dict_v_col_t *vcol = reinterpret_cast<const dict_v_col_t *>(col);
326         *col_pos = vcol->v_pos;
327         return (old_ptr + len);
328       }
329 
330       index = index->next();
331     }
332   }
333 
334   return (old_ptr + len);
335 }
336 
337 /** Read virtual column index from undo log or online log if the log
338 contains such info, and in the undo log case, verify the column is
339 still indexed, and output its position
340 @param[in]	table		the table
341 @param[in]	ptr		undo log pointer
342 @param[in]	first_v_col	if this is the first virtual column, which
343                                 has the version marker
344 @param[in,out]	is_undo_log	this function is used to parse both undo log,
345                                 and online log for virtual columns. So
346                                 check to see if this is undo log. When
347                                 first_v_col is true, is_undo_log is output,
348                                 when first_v_col is false, is_undo_log is input
349 @param[in,out]	field_no	the column number
350 @return remaining part of undo log record after reading these values */
trx_undo_read_v_idx(const dict_table_t * table,const byte * ptr,bool first_v_col,bool * is_undo_log,ulint * field_no)351 const byte *trx_undo_read_v_idx(const dict_table_t *table, const byte *ptr,
352                                 bool first_v_col, bool *is_undo_log,
353                                 ulint *field_no) {
354   /* Version marker only put on the first virtual column */
355   if (first_v_col) {
356     /* Undo log has the virtual undo log marker */
357     *is_undo_log = (mach_read_from_1(ptr) == VIRTUAL_COL_UNDO_FORMAT_1);
358 
359     if (*is_undo_log) {
360       ptr += 1;
361     }
362   }
363 
364   if (*is_undo_log) {
365     ptr = trx_undo_read_v_idx_low(table, ptr, field_no);
366   } else {
367     *field_no -= REC_MAX_N_FIELDS;
368   }
369 
370   return (ptr);
371 }
372 
373 /** Store the multi-value column information for undo log
374 @param[in,out]	undo_page	undo page to store the information
375 @param[in]	vfield		multi-value field information
376 @param[in,out]	ptr		pointer where to store the information
377 @return true if stored successfully, false if space is not enough */
trx_undo_store_multi_value(page_t * undo_page,const dfield_t * vfield,byte ** ptr)378 static bool trx_undo_store_multi_value(page_t *undo_page,
379                                        const dfield_t *vfield, byte **ptr) {
380   Multi_value_logger mv_logger(
381       static_cast<multi_value_data *>(dfield_get_data(vfield)),
382       dfield_get_len(vfield));
383   uint32_t log_len = mv_logger.get_log_len(false);
384 
385   if (trx_undo_left(undo_page, *ptr) < log_len) {
386     return (false);
387   }
388 
389   mv_logger.log(ptr);
390 
391   return (true);
392 }
393 
394 /** Reports in the undo log of an insert of virtual columns.
395 @param[in]	undo_page	undo log page
396 @param[in]	table		the table
397 @param[in]	row		dtuple contains the virtual columns
398 @param[in,out]	ptr		log ptr
399 @return true if write goes well, false if out of space */
trx_undo_report_insert_virtual(page_t * undo_page,dict_table_t * table,const dtuple_t * row,byte ** ptr)400 static bool trx_undo_report_insert_virtual(page_t *undo_page,
401                                            dict_table_t *table,
402                                            const dtuple_t *row, byte **ptr) {
403   byte *start = *ptr;
404   bool first_v_col = true;
405 
406   if (trx_undo_left(undo_page, *ptr) < 2) {
407     return (false);
408   }
409 
410   /* Reserve 2 bytes to write the number
411   of bytes the stored fields take in this
412   undo record */
413   *ptr += 2;
414 
415   for (ulint col_no = 0; col_no < dict_table_get_n_v_cols(table); col_no++) {
416     dfield_t *vfield = nullptr;
417 
418     const dict_v_col_t *col = dict_table_get_nth_v_col(table, col_no);
419 
420     if (col->m_col.ord_part) {
421       /* make sure enought space to write the length */
422       if (trx_undo_left(undo_page, *ptr) < 5) {
423         return (false);
424       }
425 
426       ulint pos = col_no;
427       pos += REC_MAX_N_FIELDS;
428       *ptr += mach_write_compressed(*ptr, pos);
429 
430       *ptr = trx_undo_log_v_idx(undo_page, table, col_no, *ptr, first_v_col);
431       first_v_col = false;
432 
433       if (*ptr == nullptr) {
434         return (false);
435       }
436 
437       vfield = dtuple_get_nth_v_field(row, col->v_pos);
438 
439       ulint flen = vfield->len;
440 
441       if (col->m_col.is_multi_value()) {
442         bool suc = trx_undo_store_multi_value(undo_page, vfield, ptr);
443 
444         if (!suc) {
445           return (false);
446         }
447       } else if (flen != UNIV_SQL_NULL) {
448         ulint max_len = dict_max_v_field_len_store_undo(table, col_no);
449 
450         if (flen > max_len) {
451           flen = max_len;
452         }
453 
454         if (trx_undo_left(undo_page, *ptr) < flen + 5) {
455           return (false);
456         }
457         *ptr += mach_write_compressed(*ptr, flen);
458 
459         ut_memcpy(*ptr, vfield->data, flen);
460         *ptr += flen;
461       } else {
462         if (trx_undo_left(undo_page, *ptr) < 5) {
463           return (false);
464         }
465 
466         *ptr += mach_write_compressed(*ptr, flen);
467       }
468     }
469   }
470 
471   /* Always mark the end of the log with 2 bytes length field */
472   mach_write_to_2(start, *ptr - start);
473 
474   return (true);
475 }
476 
477 /** Reports in the undo log of an insert of a clustered index record.
478  @return offset of the inserted entry on the page if succeed, 0 if fail */
trx_undo_page_report_insert(page_t * undo_page,trx_t * trx,dict_index_t * index,const dtuple_t * clust_entry,mtr_t * mtr)479 static ulint trx_undo_page_report_insert(
480     page_t *undo_page,           /*!< in: undo log page */
481     trx_t *trx,                  /*!< in: transaction */
482     dict_index_t *index,         /*!< in: clustered index */
483     const dtuple_t *clust_entry, /*!< in: index entry which will be
484                                  inserted to the clustered index */
485     mtr_t *mtr)                  /*!< in: mtr */
486 {
487   ulint first_free;
488   byte *ptr;
489   ulint i;
490 
491   ut_ad(index->is_clustered());
492   ut_ad(mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE) ==
493         TRX_UNDO_INSERT);
494 
495   first_free =
496       mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE);
497   ptr = undo_page + first_free;
498 
499   ut_ad(first_free <= UNIV_PAGE_SIZE);
500 
501   if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11) {
502     /* Not enough space for writing the general parameters */
503 
504     return (0);
505   }
506 
507   /* Reserve 2 bytes for the pointer to the next undo log record */
508   ptr += 2;
509 
510   /* Store first some general parameters to the undo log */
511   *ptr++ = TRX_UNDO_INSERT_REC;
512   ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
513   ptr += mach_u64_write_much_compressed(ptr, index->table->id);
514   /*----------------------------------------*/
515   /* Store then the fields required to uniquely determine the record
516   to be inserted in the clustered index */
517 
518   for (i = 0; i < dict_index_get_n_unique(index); i++) {
519     const dfield_t *field = dtuple_get_nth_field(clust_entry, i);
520     ulint flen = dfield_get_len(field);
521 
522     if (trx_undo_left(undo_page, ptr) < 5) {
523       return (0);
524     }
525 
526     ptr += mach_write_compressed(ptr, flen);
527 
528     if (flen != UNIV_SQL_NULL && flen != 0) {
529       if (trx_undo_left(undo_page, ptr) < flen) {
530         return (0);
531       }
532 
533       ut_memcpy(ptr, dfield_get_data(field), flen);
534       ptr += flen;
535     }
536   }
537 
538   if (index->table->n_v_cols) {
539     if (!trx_undo_report_insert_virtual(undo_page, index->table, clust_entry,
540                                         &ptr)) {
541       return (0);
542     }
543   }
544 
545   return (trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr));
546 }
547 
548 /** Reads from an undo log record the general parameters.
549  @return remaining part of undo log record after reading these values */
trx_undo_rec_get_pars(trx_undo_rec_t * undo_rec,ulint * type,ulint * cmpl_info,bool * updated_extern,undo_no_t * undo_no,table_id_t * table_id,type_cmpl_t & type_cmpl)550 byte *trx_undo_rec_get_pars(
551     trx_undo_rec_t *undo_rec, /*!< in: undo log record */
552     ulint *type,              /*!< out: undo record type:
553                               TRX_UNDO_INSERT_REC, ... */
554     ulint *cmpl_info,         /*!< out: compiler info, relevant only
555                               for update type records */
556     bool *updated_extern,     /*!< out: true if we updated an
557                               externally stored fild */
558     undo_no_t *undo_no,       /*!< out: undo log record number */
559     table_id_t *table_id,     /*!< out: table id */
560     type_cmpl_t &type_cmpl)   /*!< out: type compilation info */
561 {
562   const byte *ptr;
563 
564   ptr = undo_rec + 2;
565   ptr = type_cmpl.read(ptr);
566 
567   *updated_extern = type_cmpl.is_lob_updated();
568   *type = type_cmpl.type_info();
569   *cmpl_info = type_cmpl.cmpl_info();
570 
571   if (type_cmpl.is_lob_undo()) {
572     /* Reading the new 1-byte undo record flag. */
573     uint8_t undo_rec_flags = 0x00;
574 
575     undo_rec_flags = mach_read_from_1(ptr);
576     ptr++;
577 
578     ut_a(undo_rec_flags == 0x00);
579   }
580 
581   *undo_no = mach_read_next_much_compressed(&ptr);
582   *table_id = mach_read_next_much_compressed(&ptr);
583 
584   return (const_cast<byte *>(ptr));
585 }
586 
587 /** Reads from an undo log record the table ID
588 @param[in]	undo_rec	Undo log record
589 @return the table ID */
trx_undo_rec_get_table_id(const trx_undo_rec_t * undo_rec)590 table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *undo_rec) {
591   const byte *ptr = undo_rec + 2;
592   uint8_t type_cmpl = mach_read_from_1(ptr);
593 
594   const bool blob_undo = type_cmpl & TRX_UNDO_MODIFY_BLOB;
595 
596   if (blob_undo) {
597     /* The next record offset takes 2 bytes + 1 byte for
598     type_cmpl flag + 1 byte for the new flag. Total 4 bytes.
599     The new flag is currently unused and is available for
600     future use. */
601     ptr = undo_rec + 4;
602   } else {
603     ptr = undo_rec + 3;
604   }
605 
606   /* Skip the UNDO number */
607   mach_read_next_much_compressed(&ptr);
608 
609   /* Read the table ID */
610   return (mach_read_next_much_compressed(&ptr));
611 }
612 
613 /** Read from an undo log record of a multi-value virtual column.
614 @param[in]	ptr	pointer to remaining part of the undo record
615 @param[in,out]	field	stored field, nullptr if the col is no longer
616                         indexed or existing, in the latter case,
617                         this function will only skip the log
618 @param[in,out]	heap	memory heap
619 @return remaining part of undo log record after reading these values */
trx_undo_rec_get_multi_value(const byte * ptr,dfield_t * field,mem_heap_t * heap)620 const byte *trx_undo_rec_get_multi_value(const byte *ptr, dfield_t *field,
621                                          mem_heap_t *heap) {
622   if (field == nullptr) {
623     return (ptr + Multi_value_logger::read_log_len(ptr));
624   }
625 
626   return (Multi_value_logger::read(ptr, field, heap));
627 }
628 
629 /** Read from an undo log record a non-virtual column value.
630 @param[in,out]	ptr		pointer to remaining part of the undo record
631 @param[in,out]	field		stored field
632 @param[in,out]	len		length of the field, or UNIV_SQL_NULL
633 @param[in,out]	orig_len	original length of the locally stored part
634 of an externally stored column, or 0
635 @return remaining part of undo log record after reading these values */
trx_undo_rec_get_col_val(const byte * ptr,const byte ** field,ulint * len,ulint * orig_len)636 byte *trx_undo_rec_get_col_val(const byte *ptr, const byte **field, ulint *len,
637                                ulint *orig_len) {
638   *len = mach_read_next_compressed(&ptr);
639   *orig_len = 0;
640 
641   switch (*len) {
642     case UNIV_SQL_NULL:
643       *field = nullptr;
644       break;
645     case UNIV_EXTERN_STORAGE_FIELD:
646       *orig_len = mach_read_next_compressed(&ptr);
647       *len = mach_read_next_compressed(&ptr);
648       *field = ptr;
649       ptr += *len & ~SPATIAL_STATUS_MASK;
650 
651       ut_ad(*orig_len >= BTR_EXTERN_FIELD_REF_SIZE);
652       ut_ad(*len > *orig_len);
653       /* @see dtuple_convert_big_rec() */
654       ut_ad(*len >= BTR_EXTERN_FIELD_REF_SIZE);
655 
656       /* we do not have access to index->table here
657       ut_ad(dict_table_has_atomic_blobs(index->table)
658             || *len >= col->max_prefix
659             + BTR_EXTERN_FIELD_REF_SIZE);
660       */
661 
662       *len += UNIV_EXTERN_STORAGE_FIELD;
663       break;
664     default:
665       *field = ptr;
666       if (*len >= UNIV_EXTERN_STORAGE_FIELD) {
667         ptr += (*len - UNIV_EXTERN_STORAGE_FIELD) & ~SPATIAL_STATUS_MASK;
668       } else {
669         ptr += *len;
670       }
671   }
672 
673   return (const_cast<byte *>(ptr));
674 }
675 
676 /** Builds a row reference from an undo log record.
677  @return pointer to remaining part of undo record */
trx_undo_rec_get_row_ref(byte * ptr,dict_index_t * index,dtuple_t ** ref,mem_heap_t * heap)678 byte *trx_undo_rec_get_row_ref(
679     byte *ptr,           /*!< in: remaining part of a copy of an undo log
680                          record, at the start of the row reference;
681                          NOTE that this copy of the undo log record must
682                          be preserved as long as the row reference is
683                          used, as we do NOT copy the data in the
684                          record! */
685     dict_index_t *index, /*!< in: clustered index */
686     dtuple_t **ref,      /*!< out, own: row reference */
687     mem_heap_t *heap)    /*!< in: memory heap from which the memory
688                          needed is allocated */
689 {
690   ulint ref_len;
691   ulint i;
692 
693   ut_ad(index && ptr && ref && heap);
694   ut_a(index->is_clustered());
695 
696   ref_len = dict_index_get_n_unique(index);
697 
698   *ref = dtuple_create(heap, ref_len);
699 
700   dict_index_copy_types(*ref, index, ref_len);
701 
702   for (i = 0; i < ref_len; i++) {
703     dfield_t *dfield;
704     const byte *field;
705     ulint len;
706     ulint orig_len;
707 
708     dfield = dtuple_get_nth_field(*ref, i);
709 
710     ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
711 
712     dfield_set_data(dfield, field, len);
713   }
714 
715   return (ptr);
716 }
717 
718 /** Skips a row reference from an undo log record.
719  @return pointer to remaining part of undo record */
trx_undo_rec_skip_row_ref(byte * ptr,const dict_index_t * index)720 static byte *trx_undo_rec_skip_row_ref(
721     byte *ptr,                 /*!< in: remaining part in update undo log
722                                record, at the start of the row reference */
723     const dict_index_t *index) /*!< in: clustered index */
724 {
725   ulint ref_len;
726   ulint i;
727 
728   ut_ad(index && ptr);
729   ut_a(index->is_clustered());
730 
731   ref_len = dict_index_get_n_unique(index);
732 
733   for (i = 0; i < ref_len; i++) {
734     const byte *field;
735     ulint len;
736     ulint orig_len;
737 
738     ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
739   }
740 
741   return (ptr);
742 }
743 
744 #ifdef UNIV_DEBUG
745 #define trx_undo_page_fetch_ext(trx, index, ext_buf, prefix_len, page_size, \
746                                 field, is_sdi, len)                         \
747   trx_undo_page_fetch_ext_func(trx, index, ext_buf, prefix_len, page_size,  \
748                                field, is_sdi, len)
749 
750 #define trx_undo_page_report_modify_ext(trx, index, ptr, ext_buf, prefix_len, \
751                                         page_size, field, len, is_sdi,        \
752                                         spatial_status)                       \
753   trx_undo_page_report_modify_ext_func(trx, index, ptr, ext_buf, prefix_len,  \
754                                        page_size, field, len, is_sdi,         \
755                                        spatial_status)
756 #else /* UNIV_DEBUG */
757 #define trx_undo_page_fetch_ext(trx, index, ext_buf, prefix_len, page_size, \
758                                 field, is_sdi, len)                         \
759   trx_undo_page_fetch_ext_func(trx, index, ext_buf, prefix_len, page_size,  \
760                                field, len)
761 
762 #define trx_undo_page_report_modify_ext(trx, index, ptr, ext_buf, prefix_len, \
763                                         page_size, field, len, is_sdi,        \
764                                         spatial_status)                       \
765   trx_undo_page_report_modify_ext_func(trx, index, ptr, ext_buf, prefix_len,  \
766                                        page_size, field, len, spatial_status)
767 #endif /* UNIV_DEBUG */
768 
769 /** Fetch a prefix of an externally stored column, for writing to the undo
770 log of an update or delete marking of a clustered index record.
771 @param[in]	trx		transaction object
772 @param[in]	index		the clustered index object
773 @param[out]	ext_buf		buffer to hold the prefix data and BLOB pointer
774 @param[in]	prefix_len	prefix size to store in the undo log
775 @param[in]	page_size	page size
776 @param[in]	field		an externally stored column */
777 #ifdef UNIV_DEBUG
778 /**
779 @param[in]	is_sdi		true for SDI indexes */
780 #endif /* UNIV_DEBUG */
781 /**
782 @param[in,out]	len		input: length of field; output: used length of
783 ext_buf
784 @return ext_buf */
trx_undo_page_fetch_ext_func(trx_t * trx,dict_index_t * index,byte * ext_buf,ulint prefix_len,const page_size_t & page_size,const byte * field,bool is_sdi,ulint * len)785 static byte *trx_undo_page_fetch_ext_func(trx_t *trx, dict_index_t *index,
786                                           byte *ext_buf, ulint prefix_len,
787                                           const page_size_t &page_size,
788                                           const byte *field,
789 #ifdef UNIV_DEBUG
790                                           bool is_sdi,
791 #endif /* UNIV_DEBUG */
792                                           ulint *len) {
793   /* Fetch the BLOB. */
794   ulint ext_len = lob::btr_copy_externally_stored_field_prefix(
795       trx, index, ext_buf, prefix_len, page_size, field, is_sdi, *len);
796 
797 #ifdef UNIV_DEBUG
798   if (ext_len == 0) {
799     byte *field_ref = const_cast<byte *>(field) + (*len) - lob::ref_t::SIZE;
800     lob::ref_t ref(field_ref);
801     lob::ref_mem_t ref_mem;
802     ref.parse(ref_mem);
803     lob::print(trx, index, std::cout, ref, true);
804   }
805 #endif /* UNIV_DEBUG */
806 
807   /* BLOBs should always be nonempty. */
808   ut_a(ext_len > 0);
809   /* Append the BLOB pointer to the prefix. */
810   memcpy(ext_buf + ext_len, field + *len - BTR_EXTERN_FIELD_REF_SIZE,
811          BTR_EXTERN_FIELD_REF_SIZE);
812   *len = ext_len + BTR_EXTERN_FIELD_REF_SIZE;
813   return (ext_buf);
814 }
815 
816 /** Writes to the undo log a prefix of an externally stored column.
817 @param[in]	trx		transaction object
818 @param[in]	index		the clustered index object
819 @param[out]	ptr		undo log position, at least 15 bytes must be
820                                 available
821 @param[out]	ext_buf		a buffer of DICT_MAX_FIELD_LEN_BY_FORMAT()
822                                 size, or NULL when should not fetch a longer
823                                 prefix
824 @param[in]	prefix_len	prefix size to store in the undo log
825 @param[in]	page_size	page size
826 @param[in,out]	field		the locally stored part of the externally
827 stored column
828 @param[in,out]	len		length of field, in bytes */
829 #ifdef UNIV_DEBUG
830 /**
831 @param[in]	is_sdi		true for SDI indexes */
832 #endif /* UNIV_DEBUG */
833 /**
834 @param[in]	spatial_status	whether the column is used by spatial index or
835                                 regular index
836 @return undo log position */
trx_undo_page_report_modify_ext_func(trx_t * trx,dict_index_t * index,byte * ptr,byte * ext_buf,ulint prefix_len,const page_size_t & page_size,const byte ** field,ulint * len,bool is_sdi,spatial_status_t spatial_status)837 static byte *trx_undo_page_report_modify_ext_func(
838     trx_t *trx, dict_index_t *index, byte *ptr, byte *ext_buf, ulint prefix_len,
839     const page_size_t &page_size, const byte **field, ulint *len,
840 #ifdef UNIV_DEBUG
841     bool is_sdi,
842 #endif /* UNIV_DEBUG */
843     spatial_status_t spatial_status) {
844   ulint spatial_len = 0;
845 
846   switch (spatial_status) {
847     case SPATIAL_UNKNOWN:
848     case SPATIAL_NONE:
849       break;
850 
851     case SPATIAL_MIXED:
852     case SPATIAL_ONLY:
853       spatial_len = DATA_MBR_LEN;
854       break;
855   }
856 
857   /* Encode spatial status into length. */
858   spatial_len |= spatial_status << SPATIAL_STATUS_SHIFT;
859 
860   if (spatial_status == SPATIAL_ONLY) {
861     /* If the column is only used by gis index, log its
862     MBR is enough.*/
863     ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD + spatial_len);
864 
865     return (ptr);
866   }
867 
868   if (ext_buf) {
869     ut_a(prefix_len > 0);
870 
871     /* If an ordering column is externally stored, we will
872     have to store a longer prefix of the field.  In this
873     case, write to the log a marker followed by the
874     original length and the real length of the field. */
875     ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD);
876 
877     ptr += mach_write_compressed(ptr, *len);
878 
879     *field = trx_undo_page_fetch_ext(trx, index, ext_buf, prefix_len, page_size,
880                                      *field, is_sdi, len);
881 
882     ptr += mach_write_compressed(ptr, *len + spatial_len);
883   } else {
884     ptr += mach_write_compressed(
885         ptr, UNIV_EXTERN_STORAGE_FIELD + *len + spatial_len);
886   }
887 
888   return (ptr);
889 }
890 
891 /** Get MBR from a Geometry column stored externally
892 @param[in]	trx		transaction object
893 @param[in]	index		the clustered index object
894 @param[out]	mbr		MBR to fill
895 @param[in]	page_size	table pagesize
896 @param[in]	field		field contain the geometry data
897 @param[in,out]	len		length of field, in bytes
898 @param[in]	srs		Spatial reference system of R-tree.
899 */
trx_undo_get_mbr_from_ext(trx_t * trx,dict_index_t * index,double * mbr,const page_size_t & page_size,const byte * field,ulint * len,const dd::Spatial_reference_system * srs)900 static void trx_undo_get_mbr_from_ext(trx_t *trx, dict_index_t *index,
901                                       double *mbr, const page_size_t &page_size,
902                                       const byte *field, ulint *len,
903                                       const dd::Spatial_reference_system *srs) {
904   uchar *dptr = nullptr;
905   ulint dlen;
906   mem_heap_t *heap = mem_heap_create(100);
907 
908   dptr = lob::btr_copy_externally_stored_field(
909       trx, index, &dlen, nullptr, field, page_size, *len, false, heap);
910 
911   if (dlen <= GEO_DATA_HEADER_SIZE) {
912     for (uint i = 0; i < SPDIMS; ++i) {
913       mbr[i * 2] = DBL_MAX;
914       mbr[i * 2 + 1] = -DBL_MAX;
915     }
916   } else {
917     get_mbr_from_store(srs, dptr, static_cast<uint>(dlen), SPDIMS, mbr,
918                        nullptr);
919   }
920 
921   mem_heap_free(heap);
922 }
923 
trx_undo_read_blob_update(const byte * undo_ptr,upd_field_t * uf,lob::undo_vers_t * lob_undo)924 static const byte *trx_undo_read_blob_update(const byte *undo_ptr,
925                                              upd_field_t *uf,
926                                              lob::undo_vers_t *lob_undo) {
927   DBUG_TRACE;
928 
929   /* Read one byte of flags. */
930   uint8_t flag = *undo_ptr;
931   ut_a(flag == 0x00);
932   undo_ptr++;
933 
934   const ulint field_no = uf->field_no;
935 
936   /* Read the size of the vector. */
937   ulint N = mach_read_next_compressed(&undo_ptr);
938 
939   if (N == 0) {
940     return undo_ptr;
941   }
942 
943   /* Read the LOB first page number*/
944   uf->lob_first_page_no = mach_read_next_compressed(&undo_ptr);
945   uf->lob_version = mach_read_next_compressed(&undo_ptr);
946   uf->last_trx_id = mach_read_next_compressed(&undo_ptr);
947   uf->last_undo_no = mach_read_next_compressed(&undo_ptr);
948 
949   for (size_t i = 0; i < N; ++i) {
950     Lob_diff lob_diff(uf->heap);
951     lob::undo_seq_t *lob_seq = nullptr;
952     lob::undo_data_t lob_undo_data;
953 
954     if (lob_undo != nullptr) {
955       lob_seq = lob_undo->get_undo_sequence(field_no);
956     }
957 
958     /* Read the offset. */
959     undo_ptr = lob_diff.read_offset(undo_ptr);
960     lob_undo_data.m_offset = lob_diff.m_offset;
961 
962     /* Read the length. */
963     undo_ptr = lob_diff.read_length(undo_ptr);
964 
965     /* Read the old data. */
966     lob_diff.set_old_data(undo_ptr);
967 
968     /* Copy the data only if the lob_undo is not null. */
969     if (lob_seq != nullptr) {
970       undo_ptr = lob_undo_data.copy_old_data(undo_ptr, lob_diff.m_length);
971     } else {
972       undo_ptr += lob_diff.m_length;
973     }
974 
975     lob_undo_data.m_version = uf->lob_version;
976     lob_undo_data.m_page_no = uf->lob_first_page_no;
977 
978     if (lob_seq != nullptr) {
979       lob_seq->m_field_no = field_no;
980       lob_seq->push_back(lob_undo_data);
981     }
982 
983     /* Read the number of LOB index entries modified. */
984     ulint n_entry = mach_read_next_compressed(&undo_ptr);
985 
986     ut_ad(n_entry == 1 || n_entry == 2);
987 
988     for (size_t i = 0; i < n_entry; ++i) {
989       lob_index_diff_t idx_diff;
990 
991       /* Read the modifier trx id of the LOB index entry. */
992       idx_diff.m_modifier_trxid = mach_read_next_compressed(&undo_ptr);
993 
994       /* Write the modifier trx undo_no of the LOB index entry. */
995       idx_diff.m_modifier_undo_no = mach_read_next_compressed(&undo_ptr);
996 
997       lob_diff.m_idx_diffs->push_back(idx_diff);
998     }
999 
1000     uf->push_lob_diff(lob_diff);
1001     DBUG_LOG("lob", lob_diff);
1002   }
1003 
1004   return undo_ptr;
1005 }
1006 
1007 /** Write the partial update information about LOBs to the undo log record.
1008 @param[in]	undo_page	the undo page
1009 @param[in]	index		the clustered index where LOBs are modified.
1010 @param[in]	undo_ptr	the location within undo page where next
1011                                 part of undo record is to be written.
1012 @param[in]	field		the LOB data
1013 @param[in]	flen		length of LOB data in bytes
1014 @param[in]	update		the update vector containing partial update
1015                                 information on LOBs.
1016 @param[in]	fld		the field to which the LOB belongs.
1017 @param[in]	mtr		the mini transaction context.
1018 @return the undo record pointer where new data can be written.
1019 @return nullptr when there is not enough space in undo page. */
trx_undo_report_blob_update(page_t * undo_page,dict_index_t * index,byte * undo_ptr,const byte * field,ulint flen,const upd_t * update,upd_field_t * fld,mtr_t * mtr)1020 static byte *trx_undo_report_blob_update(page_t *undo_page, dict_index_t *index,
1021                                          byte *undo_ptr, const byte *field,
1022                                          ulint flen, const upd_t *update,
1023                                          upd_field_t *fld, mtr_t *mtr) {
1024   DBUG_TRACE;
1025 
1026   /* Access the LOB reference object. */
1027   byte *field_ref = const_cast<byte *>(field) + flen - lob::ref_t::SIZE;
1028 
1029   lob::ref_t ref(field_ref);
1030 
1031   /* Check if enough space for flag and vector length. */
1032   if (trx_undo_left(undo_page, undo_ptr) < 6) {
1033     return nullptr;
1034   }
1035 
1036   /* Write one byte of flags. */
1037   *undo_ptr = 0x00;
1038   undo_ptr++;
1039 
1040   if (fld == nullptr || update == nullptr) {
1041     /* Write the size of the vector as 0. */
1042     undo_ptr += mach_write_compressed(undo_ptr, 0);
1043     return undo_ptr;
1044   }
1045 
1046   /* Find the Binary_diff object */
1047   const Binary_diff_vector *bdiff_v =
1048       update->get_binary_diff_by_field_no(fld->field_no);
1049 
1050   if (bdiff_v == nullptr || !update->is_partially_updated(fld->field_no)) {
1051     /* Write the size of the vector as 0. */
1052     undo_ptr += mach_write_compressed(undo_ptr, 0);
1053     return undo_ptr;
1054   }
1055 
1056   const ulint bytes_changed = upd_t::get_total_modified_bytes(*bdiff_v);
1057 
1058   /* Whether the update to the LOB can be considered as a small change. */
1059   const bool small_change =
1060       (bytes_changed <= lob::ref_t::LOB_SMALL_CHANGE_THRESHOLD);
1061 
1062   if (!small_change) {
1063     /* This is not a small change.  So write the size of the vector as
1064     0 and bailout. */
1065     undo_ptr += mach_write_compressed(undo_ptr, 0);
1066     return undo_ptr;
1067   }
1068 
1069   const page_size_t page_size = dict_table_page_size(index->table);
1070   if (page_size.is_compressed()) {
1071     /* This is compressed LOB. Not yet supporting. */
1072     undo_ptr += mach_write_compressed(undo_ptr, 0);
1073     return undo_ptr;
1074   }
1075 
1076   trx_id_t last_trx_id;
1077   undo_no_t last_undo_no;
1078   ulint lob_version;
1079   page_type_t f_page_type;
1080 
1081   /* Obtain LOB info. */
1082   lob::get_info(ref, index, lob_version, last_trx_id, last_undo_no, f_page_type,
1083                 mtr);
1084 
1085   /* Only the page type FIL_PAGE_TYPE_LOB_FIRST is supported here. */
1086   if (f_page_type != FIL_PAGE_TYPE_LOB_FIRST) {
1087     undo_ptr += mach_write_compressed(undo_ptr, 0);
1088     return undo_ptr;
1089   }
1090 
1091   /* Only for small changes to the BLOB, we do regular undo logging. */
1092   size_t N = bdiff_v->size();
1093 
1094   /* Write the size of the vector. */
1095   undo_ptr += mach_write_compressed(undo_ptr, N);
1096 
1097   if (N == 0) {
1098     return undo_ptr;
1099   }
1100 
1101   /* Check if there is enough space for lob_version, last_trx_id
1102   and last_undo_no. */
1103   if (trx_undo_left(undo_page, undo_ptr) < 20) {
1104     return nullptr;
1105   }
1106 
1107   /* Write the LOB first page number*/
1108   undo_ptr += mach_write_compressed(undo_ptr, ref.page_no());
1109 
1110   /* Write the lob version number */
1111   undo_ptr += mach_write_compressed(undo_ptr, lob_version);
1112 
1113   /* Write the last trx id */
1114   undo_ptr += mach_write_compressed(undo_ptr, last_trx_id);
1115 
1116   /* Write the last undo_no */
1117   undo_ptr += mach_write_compressed(undo_ptr, last_undo_no);
1118 
1119   for (size_t i = 0; i < N; ++i) {
1120     const Binary_diff &bdiff = bdiff_v->at(i);
1121 
1122     if (trx_undo_left(undo_page, undo_ptr) < 10) {
1123       return nullptr;
1124     }
1125 
1126     /* Write the offset. */
1127     undo_ptr += mach_write_compressed(undo_ptr, bdiff.offset());
1128 
1129     /* Write the length. */
1130     undo_ptr += mach_write_compressed(undo_ptr, bdiff.length());
1131 
1132     if (trx_undo_left(undo_page, undo_ptr) < bdiff.length()) {
1133       return nullptr;
1134     }
1135 
1136     /* Write the old data. */
1137     ut_memcpy(undo_ptr, bdiff.old_data(fld->mysql_field), bdiff.length());
1138     undo_ptr += bdiff.length();
1139 
1140     lob::List_iem_t entries;
1141 
1142     /* Find the affected LOB index entries. */
1143     lob::get_affected_index_entries(ref, index, bdiff, entries, mtr);
1144 
1145     ulint n_entry = entries.size();
1146 
1147     ut_ad(n_entry == 1 || n_entry == 2);
1148 
1149     /* Check if there is enough space for n_entry */
1150     if (trx_undo_left(undo_page, undo_ptr) < 5) {
1151       return nullptr;
1152     }
1153 
1154     /* Write the number of LOB index entries modified. */
1155     undo_ptr += mach_write_compressed(undo_ptr, n_entry);
1156 
1157     for (lob::List_iem_t::iterator iter = entries.begin();
1158          iter != entries.end(); ++iter) {
1159       if (trx_undo_left(undo_page, undo_ptr) < 10) {
1160         return nullptr;
1161       }
1162 
1163       /* Write the modifier trx id of the LOB index entry. */
1164       undo_ptr += mach_write_compressed(undo_ptr, iter->m_trx_id_modifier);
1165 
1166       /* Write the modifier trx undo_no of the LOB index entry. */
1167       undo_ptr += mach_write_compressed(undo_ptr, iter->m_undo_no_modifier);
1168     }
1169   }
1170 
1171   return undo_ptr;
1172 }
1173 
1174 /**********************************************************************/ /**
1175  Reports in the undo log of an update or delete marking of a clustered index
1176  record.
1177  @return byte offset of the inserted undo log entry on the page if
1178  succeed, 0 if fail */
trx_undo_page_report_modify(page_t * undo_page,trx_t * trx,dict_index_t * index,const rec_t * rec,const ulint * offsets,const upd_t * update,ulint cmpl_info,const dtuple_t * row,mtr_t * mtr)1179 static ulint trx_undo_page_report_modify(
1180     /*========================*/
1181     page_t *undo_page,    /*!< in: undo log page */
1182     trx_t *trx,           /*!< in: transaction */
1183     dict_index_t *index,  /*!< in: clustered index where update or
1184                           delete marking is done */
1185     const rec_t *rec,     /*!< in: clustered index record which
1186                           has NOT yet been modified */
1187     const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */
1188     const upd_t *update,  /*!< in: update vector which tells the
1189                           columns to be updated; in the case of
1190                           a delete, this should be set to NULL */
1191     ulint cmpl_info,      /*!< in: compiler info on secondary
1192                           index updates */
1193     const dtuple_t *row,  /*!< in: clustered index row contains
1194                           virtual column info */
1195     mtr_t *mtr)           /*!< in: mtr */
1196 {
1197   DBUG_TRACE;
1198 
1199   dict_table_t *table;
1200   ulint first_free;
1201   byte *ptr;
1202   const byte *field;
1203   ulint flen;
1204   ulint col_no;
1205   ulint type_cmpl;
1206   byte *type_cmpl_ptr;
1207   ulint i;
1208   trx_id_t trx_id;
1209   trx_undo_ptr_t *undo_ptr;
1210   ibool ignore_prefix = FALSE;
1211   byte ext_buf[REC_VERSION_56_MAX_INDEX_COL_LEN + BTR_EXTERN_FIELD_REF_SIZE];
1212   bool first_v_col = true;
1213 
1214   ut_a(index->is_clustered());
1215   ut_ad(rec_offs_validate(rec, index, offsets));
1216   ut_ad(mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE) ==
1217         TRX_UNDO_UPDATE);
1218   table = index->table;
1219 
1220   /* If table instance is temporary then select noredo rseg as changes
1221   to undo logs don't need REDO logging given that they are not
1222   restored on restart as corresponding object doesn't exist on restart.*/
1223   undo_ptr =
1224       index->table->is_temporary() ? &trx->rsegs.m_noredo : &trx->rsegs.m_redo;
1225 
1226   first_free =
1227       mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE);
1228   ptr = undo_page + first_free;
1229 
1230   ut_ad(first_free <= UNIV_PAGE_SIZE);
1231 
1232   if (trx_undo_left(undo_page, ptr) < 50) {
1233     /* NOTE: the value 50 must be big enough so that the general
1234     fields written below fit on the undo log page */
1235 
1236     return 0;
1237   }
1238 
1239   /* Reserve 2 bytes for the pointer to the next undo log record */
1240   ptr += 2;
1241 
1242   /* Store first some general parameters to the undo log */
1243 
1244   if (!update) {
1245     ut_ad(!rec_get_deleted_flag(rec, dict_table_is_comp(table)));
1246     type_cmpl = TRX_UNDO_DEL_MARK_REC;
1247   } else if (rec_get_deleted_flag(rec, dict_table_is_comp(table))) {
1248     type_cmpl = TRX_UNDO_UPD_DEL_REC;
1249     /* We are about to update a delete marked record.
1250     We don't typically need the prefix in this case unless
1251     the delete marking is done by the same transaction
1252     (which we check below). */
1253     ignore_prefix = TRUE;
1254   } else {
1255     type_cmpl = TRX_UNDO_UPD_EXIST_REC;
1256   }
1257 
1258   type_cmpl |= cmpl_info * TRX_UNDO_CMPL_INFO_MULT;
1259   type_cmpl_ptr = ptr;
1260 
1261   *ptr++ = (byte)type_cmpl;
1262 
1263   /* Introducing a change in undo log format. */
1264   *type_cmpl_ptr |= TRX_UNDO_MODIFY_BLOB;
1265 
1266   /* Introducing a new 1-byte flag. */
1267   *ptr++ = 0x00;
1268 
1269   ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
1270 
1271   ptr += mach_u64_write_much_compressed(ptr, table->id);
1272 
1273   /*----------------------------------------*/
1274   /* Store the state of the info bits */
1275 
1276   *ptr++ = (byte)rec_get_info_bits(rec, dict_table_is_comp(table));
1277 
1278   /* Store the values of the system columns */
1279   field = rec_get_nth_field(rec, offsets, index->get_sys_col_pos(DATA_TRX_ID),
1280                             &flen);
1281   ut_ad(flen == DATA_TRX_ID_LEN);
1282 
1283   trx_id = trx_read_trx_id(field);
1284 
1285   /* If it is an update of a delete marked record, then we are
1286   allowed to ignore blob prefixes if the delete marking was done
1287   by some other trx as it must have committed by now for us to
1288   allow an over-write. */
1289   if (ignore_prefix) {
1290     ignore_prefix = (trx_id != trx->id);
1291   }
1292   ptr += mach_u64_write_compressed(ptr, trx_id);
1293 
1294   field = rec_get_nth_field(rec, offsets, index->get_sys_col_pos(DATA_ROLL_PTR),
1295                             &flen);
1296   ut_ad(flen == DATA_ROLL_PTR_LEN);
1297 
1298   ptr += mach_u64_write_compressed(ptr, trx_read_roll_ptr(field));
1299 
1300   /*----------------------------------------*/
1301   /* Store then the fields required to uniquely determine the
1302   record which will be modified in the clustered index */
1303 
1304   for (i = 0; i < dict_index_get_n_unique(index); i++) {
1305     field = rec_get_nth_field(rec, offsets, i, &flen);
1306 
1307     /* The ordering columns must not be stored externally. */
1308     ut_ad(!rec_offs_nth_extern(offsets, i));
1309     ut_ad(!rec_offs_nth_default(offsets, i));
1310     ut_ad(index->get_col(i)->ord_part);
1311 
1312     if (trx_undo_left(undo_page, ptr) < 5) {
1313       return 0;
1314     }
1315 
1316     ptr += mach_write_compressed(ptr, flen);
1317 
1318     if (flen != UNIV_SQL_NULL) {
1319       if (trx_undo_left(undo_page, ptr) < flen) {
1320         return 0;
1321       }
1322 
1323       ut_memcpy(ptr, field, flen);
1324       ptr += flen;
1325     }
1326   }
1327 
1328   /*----------------------------------------*/
1329   /* Save to the undo log the old values of the columns to be updated. */
1330 
1331   if (update) {
1332     if (trx_undo_left(undo_page, ptr) < 5) {
1333       return 0;
1334     }
1335 
1336     ulint n_updated = upd_get_n_fields(update);
1337 
1338     /* If this is an online update while an inplace alter table
1339     is in progress and the table has virtual column, we will
1340     need to double check if there are any non-indexed columns
1341     being registered in update vector in case they will be indexed
1342     in new table */
1343     if (dict_index_is_online_ddl(index) && index->table->n_v_cols > 0) {
1344       for (i = 0; i < upd_get_n_fields(update); i++) {
1345         upd_field_t *fld = upd_get_nth_field(update, i);
1346         ulint pos = fld->field_no;
1347 
1348         /* These columns must not have an index
1349         on them */
1350         if (upd_fld_is_virtual_col(fld) &&
1351             dict_table_get_nth_v_col(table, pos)->v_indexes->empty()) {
1352           n_updated--;
1353         }
1354       }
1355     }
1356 
1357     ptr += mach_write_compressed(ptr, n_updated);
1358 
1359     for (i = 0; i < upd_get_n_fields(update); i++) {
1360       upd_field_t *fld = upd_get_nth_field(update, i);
1361 
1362       bool is_virtual = upd_fld_is_virtual_col(fld);
1363       bool is_multi_val = upd_fld_is_multi_value_col(fld);
1364       ulint max_v_log_len = 0;
1365 
1366       ulint pos = fld->field_no;
1367 
1368       /* Write field number to undo log */
1369       if (trx_undo_left(undo_page, ptr) < 5) {
1370         return 0;
1371       }
1372 
1373       if (is_virtual) {
1374         /* Skip the non-indexed column, during
1375         an online alter table */
1376         if (dict_index_is_online_ddl(index) &&
1377             dict_table_get_nth_v_col(table, pos)->v_indexes->empty()) {
1378           continue;
1379         }
1380 
1381         /* add REC_MAX_N_FIELDS to mark this
1382         is a virtual col */
1383         pos += REC_MAX_N_FIELDS;
1384       }
1385 
1386       ptr += mach_write_compressed(ptr, pos);
1387 
1388       /* Save the old value of field */
1389       if (is_virtual) {
1390         ut_ad(fld->field_no < table->n_v_def);
1391 
1392         ptr = trx_undo_log_v_idx(undo_page, table, fld->field_no, ptr,
1393                                  first_v_col);
1394         if (ptr == nullptr) {
1395           return 0;
1396         }
1397         first_v_col = false;
1398 
1399         max_v_log_len = dict_max_v_field_len_store_undo(table, fld->field_no);
1400 
1401         field = static_cast<byte *>(fld->old_v_val->data);
1402         flen = fld->old_v_val->len;
1403 
1404         /* Only log sufficient bytes for index
1405         record update */
1406         if (flen != UNIV_SQL_NULL) {
1407           flen = ut_min(flen, max_v_log_len);
1408         }
1409       } else {
1410         field = rec_get_nth_field_instant(rec, offsets, pos, index, &flen);
1411       }
1412 
1413       if (trx_undo_left(undo_page, ptr) < 15) {
1414         return 0;
1415       }
1416 
1417       if (!is_virtual && rec_offs_nth_extern(offsets, pos)) {
1418         ut_ad(!is_multi_val);
1419         const dict_col_t *col = index->get_col(pos);
1420         ulint prefix_len = dict_max_field_len_store_undo(table, col);
1421 
1422         ut_ad(prefix_len + BTR_EXTERN_FIELD_REF_SIZE <= sizeof ext_buf);
1423 
1424         ptr = trx_undo_page_report_modify_ext(
1425             trx, index, ptr,
1426             col->ord_part && !ignore_prefix &&
1427                     flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1428                 ? ext_buf
1429                 : nullptr,
1430             prefix_len, dict_table_page_size(table), &field, &flen,
1431             dict_table_is_sdi(table->id), SPATIAL_UNKNOWN);
1432 
1433         /* Notify purge that it eventually has to
1434         free the old externally stored field */
1435 
1436         undo_ptr->update_undo->del_marks = TRUE;
1437 
1438         *type_cmpl_ptr |= TRX_UNDO_UPD_EXTERN;
1439       } else if (!is_multi_val) {
1440         ptr += mach_write_compressed(ptr, flen);
1441       }
1442 
1443       if (is_multi_val) {
1444         bool suc = trx_undo_store_multi_value(undo_page, fld->old_v_val, &ptr);
1445         if (!suc) {
1446           return 0;
1447         }
1448       } else if (flen != UNIV_SQL_NULL) {
1449         if (trx_undo_left(undo_page, ptr) < flen) {
1450           return 0;
1451         }
1452 
1453         ut_memcpy(ptr, field, flen);
1454         ptr += flen;
1455 
1456         if (!is_virtual && rec_offs_nth_extern(offsets, pos)) {
1457           ptr = trx_undo_report_blob_update(undo_page, index, ptr, field, flen,
1458                                             update, fld, mtr);
1459 
1460           if (ptr == nullptr) {
1461             return 0;
1462           }
1463         }
1464       }
1465 
1466       /* Also record the new value for virtual column */
1467       if (is_virtual) {
1468         field = static_cast<byte *>(fld->new_val.data);
1469         flen = fld->new_val.len;
1470         if (flen != UNIV_SQL_NULL) {
1471           flen = ut_min(flen, max_v_log_len);
1472         }
1473 
1474         if (trx_undo_left(undo_page, ptr) < 15) {
1475           return 0;
1476         }
1477 
1478         if (is_multi_val) {
1479           bool suc = trx_undo_store_multi_value(undo_page, &fld->new_val, &ptr);
1480           if (!suc) {
1481             return 0;
1482           }
1483         } else {
1484           ptr += mach_write_compressed(ptr, flen);
1485 
1486           if (flen != UNIV_SQL_NULL) {
1487             if (trx_undo_left(undo_page, ptr) < flen) {
1488               return 0;
1489             }
1490 
1491             ut_memcpy(ptr, field, flen);
1492             ptr += flen;
1493           }
1494         }
1495       }
1496     }
1497   }
1498 
1499   /* Reset the first_v_col, so to put the virtual column undo
1500   version marker again, when we log all the indexed columns */
1501   first_v_col = true;
1502 
1503   /*----------------------------------------*/
1504   /* In the case of a delete marking, and also in the case of an update
1505   where any ordering field of any index changes, store the values of all
1506   columns which occur as ordering fields in any index. This info is used
1507   in the purge of old versions where we use it to build and search the
1508   delete marked index records, to look if we can remove them from the
1509   index tree. Note that starting from 4.0.14 also externally stored
1510   fields can be ordering in some index. Starting from 5.2, we no longer
1511   store REC_MAX_INDEX_COL_LEN first bytes to the undo log record,
1512   but we can construct the column prefix fields in the index by
1513   fetching the first page of the BLOB that is pointed to by the
1514   clustered index. This works also in crash recovery, because all pages
1515   (including BLOBs) are recovered before anything is rolled back. */
1516 
1517   if (!update || !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
1518     byte *old_ptr = ptr;
1519     double mbr[SPDIMS * 2];
1520     mem_heap_t *row_heap = nullptr;
1521 
1522     undo_ptr->update_undo->del_marks = TRUE;
1523 
1524     if (trx_undo_left(undo_page, ptr) < 5) {
1525       return 0;
1526     }
1527 
1528     /* Reserve 2 bytes to write the number of bytes the stored
1529     fields take in this undo record */
1530 
1531     ptr += 2;
1532 
1533     for (col_no = 0; col_no < table->get_n_cols(); col_no++) {
1534       const dict_col_t *col = table->get_col(col_no);
1535 
1536       if (col->ord_part) {
1537         ulint pos;
1538         spatial_status_t spatial_status;
1539 
1540         spatial_status = SPATIAL_NONE;
1541 
1542         /* Write field number to undo log */
1543         if (trx_undo_left(undo_page, ptr) < 5 + 15) {
1544           return 0;
1545         }
1546 
1547         pos = index->get_col_pos(col_no);
1548         ptr += mach_write_compressed(ptr, pos);
1549 
1550         /* Save the old value of field */
1551         field = rec_get_nth_field_instant(rec, offsets, pos, index, &flen);
1552 
1553         if (rec_offs_nth_extern(offsets, pos)) {
1554           const dict_col_t *col = index->get_col(pos);
1555           ulint prefix_len = dict_max_field_len_store_undo(table, col);
1556 
1557           ut_a(prefix_len < sizeof ext_buf);
1558 
1559           spatial_status = col->get_spatial_status();
1560 
1561           /* If there is a spatial index on it,
1562           log its MBR */
1563           if (spatial_status != SPATIAL_NONE) {
1564             ut_ad(DATA_GEOMETRY_MTYPE(col->mtype));
1565 
1566             trx_undo_get_mbr_from_ext(trx, index, mbr,
1567                                       dict_table_page_size(table), field, &flen,
1568                                       index->rtr_srs.get());
1569           }
1570 
1571           ptr = trx_undo_page_report_modify_ext(
1572               trx, index, ptr,
1573               flen < REC_ANTELOPE_MAX_INDEX_COL_LEN && !ignore_prefix ? ext_buf
1574                                                                       : nullptr,
1575               prefix_len, dict_table_page_size(table), &field, &flen,
1576               dict_table_is_sdi(table->id), spatial_status);
1577         } else {
1578           ptr += mach_write_compressed(ptr, flen);
1579         }
1580 
1581         if (flen != UNIV_SQL_NULL && spatial_status != SPATIAL_ONLY) {
1582           if (trx_undo_left(undo_page, ptr) < flen) {
1583             return 0;
1584           }
1585 
1586           ut_memcpy(ptr, field, flen);
1587           ptr += flen;
1588         }
1589 
1590         if (spatial_status != SPATIAL_NONE) {
1591           if (trx_undo_left(undo_page, ptr) < DATA_MBR_LEN) {
1592             return 0;
1593           }
1594 
1595           for (int i = 0; i < SPDIMS * 2; i++) {
1596             mach_double_write(ptr, mbr[i]);
1597             ptr += sizeof(double);
1598           }
1599         }
1600       }
1601     }
1602 
1603     for (col_no = 0; col_no < dict_table_get_n_v_cols(table); col_no++) {
1604       dfield_t *vfield = nullptr;
1605 
1606       const dict_v_col_t *col = dict_table_get_nth_v_col(table, col_no);
1607 
1608       if (col->m_col.ord_part) {
1609         ulint pos = col_no;
1610         ulint max_v_log_len = dict_max_v_field_len_store_undo(table, pos);
1611 
1612         /* Write field number to undo log.
1613         Make sure there is enought space in log */
1614         if (trx_undo_left(undo_page, ptr) < 5) {
1615           return 0;
1616         }
1617 
1618         pos += REC_MAX_N_FIELDS;
1619         ptr += mach_write_compressed(ptr, pos);
1620 
1621         ut_ad(col_no < table->n_v_def);
1622         ptr = trx_undo_log_v_idx(undo_page, table, col_no, ptr, first_v_col);
1623         first_v_col = false;
1624 
1625         if (!ptr) {
1626           return 0;
1627         }
1628 
1629         if (update) {
1630           ut_ad(!row);
1631           if (update->old_vrow == nullptr) {
1632             flen = UNIV_SQL_NULL;
1633           } else {
1634             vfield = dtuple_get_nth_v_field(update->old_vrow, col->v_pos);
1635           }
1636         } else if (row) {
1637           vfield = dtuple_get_nth_v_field(row, col->v_pos);
1638         } else {
1639           ut_ad(0);
1640         }
1641 
1642         if (vfield) {
1643           field = static_cast<byte *>(vfield->data);
1644           flen = vfield->len;
1645         } else {
1646           ut_ad(flen == UNIV_SQL_NULL);
1647         }
1648 
1649         /* Prepare to write the field length and field data */
1650         if (flen != UNIV_SQL_NULL) {
1651           flen = ut_min(flen, max_v_log_len);
1652 
1653           if (trx_undo_left(undo_page, ptr) < 5 + flen) {
1654             return 0;
1655           }
1656         } else if (trx_undo_left(undo_page, ptr) < 5) {
1657           return 0;
1658         }
1659 
1660         if (col->m_col.is_multi_value()) {
1661           bool suc = trx_undo_store_multi_value(undo_page, vfield, &ptr);
1662           if (!suc) {
1663             return 0;
1664           }
1665         } else {
1666           ptr += mach_write_compressed(ptr, flen);
1667 
1668           if (flen != UNIV_SQL_NULL) {
1669             ut_memcpy(ptr, field, flen);
1670             ptr += flen;
1671           }
1672         }
1673       }
1674     }
1675 
1676     mach_write_to_2(old_ptr, ptr - old_ptr);
1677 
1678     if (row_heap) {
1679       mem_heap_free(row_heap);
1680     }
1681   }
1682 
1683   /*----------------------------------------*/
1684   /* Write pointers to the previous and the next undo log records */
1685   if (trx_undo_left(undo_page, ptr) < 2) {
1686     return 0;
1687   }
1688 
1689   mach_write_to_2(ptr, first_free);
1690   ptr += 2;
1691   mach_write_to_2(undo_page + first_free, ptr - undo_page);
1692 
1693   mach_write_to_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
1694                   ptr - undo_page);
1695 
1696   /* Write to the REDO log about this change in the UNDO log */
1697 
1698   trx_undof_page_add_undo_rec_log(undo_page, first_free, ptr - undo_page, mtr);
1699   return first_free;
1700 }
1701 
1702 /** Reads from an undo log update record the system field values of the old
1703  version.
1704  @return remaining part of undo log record after reading these values */
trx_undo_update_rec_get_sys_cols(const byte * ptr,trx_id_t * trx_id,roll_ptr_t * roll_ptr,ulint * info_bits)1705 byte *trx_undo_update_rec_get_sys_cols(
1706     const byte *ptr,      /*!< in: remaining part of undo
1707                           log record after reading
1708                           general parameters */
1709     trx_id_t *trx_id,     /*!< out: trx id */
1710     roll_ptr_t *roll_ptr, /*!< out: roll ptr */
1711     ulint *info_bits)     /*!< out: info bits state */
1712 {
1713   /* Read the state of the info bits */
1714   *info_bits = mach_read_from_1(ptr);
1715   ptr += 1;
1716 
1717   /* Read the values of the system columns */
1718 
1719   *trx_id = mach_u64_read_next_compressed(&ptr);
1720   *roll_ptr = mach_u64_read_next_compressed(&ptr);
1721 
1722   return (const_cast<byte *>(ptr));
1723 }
1724 
1725 /** Builds an update vector based on a remaining part of an undo log record.
1726  @return remaining part of the record, NULL if an error detected, which
1727  means that the record is corrupted */
trx_undo_update_rec_get_update(const byte * ptr,const dict_index_t * index,ulint type,trx_id_t trx_id,roll_ptr_t roll_ptr,ulint info_bits,trx_t * trx,mem_heap_t * heap,upd_t ** upd,lob::undo_vers_t * lob_undo,type_cmpl_t & type_cmpl)1728 byte *trx_undo_update_rec_get_update(
1729     const byte *ptr,            /*!< in: remaining part in update undo log
1730                                 record, after reading the row reference
1731                                 NOTE that this copy of the undo log record must
1732                                 be preserved as long as the update vector is
1733                                 used, as we do NOT copy the data in the
1734                                 record! */
1735     const dict_index_t *index,  /*!< in: clustered index */
1736     ulint type,                 /*!< in: TRX_UNDO_UPD_EXIST_REC,
1737                                 TRX_UNDO_UPD_DEL_REC, or
1738                                 TRX_UNDO_DEL_MARK_REC; in the last case,
1739                                 only trx id and roll ptr fields are added to
1740                                 the update vector */
1741     trx_id_t trx_id,            /*!< in: transaction id from this undo record */
1742     roll_ptr_t roll_ptr,        /*!< in: roll pointer from this undo record */
1743     ulint info_bits,            /*!< in: info bits from this undo record */
1744     trx_t *trx,                 /*!< in: transaction */
1745     mem_heap_t *heap,           /*!< in: memory heap from which the memory
1746                                 needed is allocated */
1747     upd_t **upd,                /*!< out, own: update vector */
1748     lob::undo_vers_t *lob_undo, /*!< out: LOB undo information. */
1749     type_cmpl_t &type_cmpl)     /*!< out: type compilation info */
1750 {
1751   DBUG_TRACE;
1752 
1753   upd_field_t *upd_field;
1754   upd_t *update;
1755   ulint n_fields;
1756   byte *buf;
1757   ulint i;
1758   bool first_v_col = true;
1759   bool is_undo_log = true;
1760   ulint n_skip_field = 0;
1761 
1762   ut_a(index->is_clustered());
1763 
1764   if (type != TRX_UNDO_DEL_MARK_REC) {
1765     n_fields = mach_read_next_compressed(&ptr);
1766   } else {
1767     n_fields = 0;
1768   }
1769 
1770   update = upd_create(n_fields + 2, heap);
1771 
1772   update->info_bits = info_bits;
1773 
1774   /* Store first trx id and roll ptr to update vector */
1775 
1776   upd_field = upd_get_nth_field(update, n_fields);
1777 
1778   buf = static_cast<byte *>(mem_heap_alloc(heap, DATA_TRX_ID_LEN));
1779 
1780   trx_write_trx_id(buf, trx_id);
1781 
1782   upd_field_set_field_no(upd_field, index->get_sys_col_pos(DATA_TRX_ID), index,
1783                          trx);
1784   dfield_set_data(&(upd_field->new_val), buf, DATA_TRX_ID_LEN);
1785 
1786   upd_field = upd_get_nth_field(update, n_fields + 1);
1787 
1788   buf = static_cast<byte *>(mem_heap_alloc(heap, DATA_ROLL_PTR_LEN));
1789 
1790   trx_write_roll_ptr(buf, roll_ptr);
1791 
1792   upd_field_set_field_no(upd_field, index->get_sys_col_pos(DATA_ROLL_PTR),
1793                          index, trx);
1794   dfield_set_data(&(upd_field->new_val), buf, DATA_ROLL_PTR_LEN);
1795 
1796   /* Store then the updated ordinary columns to the update vector */
1797 
1798   for (i = 0; i < n_fields; i++) {
1799     const byte *field;
1800     ulint len;
1801     ulint field_no;
1802     ulint orig_len;
1803     bool is_virtual;
1804     dict_v_col_t *vcol = nullptr;
1805 
1806     field_no = mach_read_next_compressed(&ptr);
1807 
1808     is_virtual = (field_no >= REC_MAX_N_FIELDS);
1809 
1810     if (is_virtual) {
1811       /* If new version, we need to check index list to figure
1812       out the correct virtual column position */
1813       ptr = trx_undo_read_v_idx(index->table, ptr, first_v_col, &is_undo_log,
1814                                 &field_no);
1815       first_v_col = false;
1816     } else if (field_no >= dict_index_get_n_fields(index)) {
1817       ib::error(ER_IB_MSG_1184)
1818           << "Trying to access update undo rec"
1819              " field "
1820           << field_no << " in index " << index->name << " of table "
1821           << index->table->name << " but index has only "
1822           << dict_index_get_n_fields(index) << " fields " << BUG_REPORT_MSG
1823           << ". Run also CHECK TABLE " << index->table->name
1824           << "."
1825              " n_fields = "
1826           << n_fields << ", i = " << i << ", ptr " << ptr;
1827 
1828       ut_ad(0);
1829       *upd = nullptr;
1830       return nullptr;
1831     }
1832 
1833     upd_field = upd_get_nth_field(update, i);
1834 
1835     if (is_virtual) {
1836       /* This column could be dropped or no longer indexed */
1837       if (field_no == ULINT_UNDEFINED) {
1838         /* Mark this is no longer needed */
1839         upd_field->field_no = REC_MAX_N_FIELDS;
1840 
1841         if (trx_undo_rec_is_multi_value(ptr)) {
1842           ptr = trx_undo_rec_get_multi_value(ptr, nullptr, heap);
1843           ut_ad(trx_undo_rec_is_multi_value(ptr));
1844           ptr = trx_undo_rec_get_multi_value(ptr, nullptr, heap);
1845         } else {
1846           ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1847           ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1848         }
1849         n_skip_field++;
1850         continue;
1851       } else {
1852         vcol = dict_table_get_nth_v_col(index->table, field_no);
1853       }
1854 
1855       upd_field_set_v_field_no(upd_field, field_no, index);
1856     } else {
1857       upd_field_set_field_no(upd_field, field_no, index, trx);
1858     }
1859 
1860     if (vcol != nullptr && vcol->m_col.is_multi_value()) {
1861       ptr = trx_undo_rec_get_multi_value(ptr, &upd_field->new_val, heap);
1862     } else {
1863       ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1864 
1865       upd_field->orig_len = orig_len;
1866 
1867       if (len == UNIV_SQL_NULL) {
1868         dfield_set_null(&upd_field->new_val);
1869       } else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1870         dfield_set_data(&upd_field->new_val, field, len);
1871       } else {
1872         len -= UNIV_EXTERN_STORAGE_FIELD;
1873 
1874         dfield_set_data(&upd_field->new_val, field, len);
1875         dfield_set_ext(&upd_field->new_val);
1876 
1877         if (type_cmpl.is_lob_undo() && type_cmpl.is_lob_updated()) {
1878           /* Read the partial update on LOB */
1879           ptr = trx_undo_read_blob_update(ptr, upd_field, lob_undo);
1880         }
1881       }
1882     }
1883 
1884     if (is_virtual) {
1885       upd_field->old_v_val = static_cast<dfield_t *>(
1886           mem_heap_zalloc(heap, sizeof *upd_field->old_v_val));
1887 
1888       if (vcol != nullptr && vcol->m_col.is_multi_value()) {
1889         ptr = trx_undo_rec_get_multi_value(ptr, upd_field->old_v_val, heap);
1890       } else {
1891         ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1892         if (len == UNIV_SQL_NULL) {
1893           dfield_set_null(upd_field->old_v_val);
1894         } else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1895           dfield_set_data(upd_field->old_v_val, field, len);
1896         } else {
1897           ut_ad(0);
1898         }
1899       }
1900     }
1901   }
1902 
1903   /* In rare scenario, we could have skipped virtual column (as they
1904   are dropped. We will regenerate a update vector and skip them */
1905   if (n_skip_field > 0) {
1906     ulint n = 0;
1907     ut_ad(n_skip_field <= n_fields);
1908 
1909     upd_t *new_update = upd_create(n_fields + 2 - n_skip_field, heap);
1910 
1911     for (i = 0; i < n_fields + 2; i++) {
1912       upd_field = upd_get_nth_field(update, i);
1913 
1914       if (upd_field->field_no == REC_MAX_N_FIELDS) {
1915         continue;
1916       }
1917 
1918       upd_field_t *new_upd_field = upd_get_nth_field(new_update, n);
1919       *new_upd_field = *upd_field;
1920       n++;
1921     }
1922     ut_ad(n == n_fields + 2 - n_skip_field);
1923     *upd = new_update;
1924   } else {
1925     *upd = update;
1926   }
1927 
1928   return const_cast<byte *>(ptr);
1929 }
1930 
1931 /** Builds a partial row from an update undo log record, for purge.
1932  It contains the columns which occur as ordering in any index of the table.
1933  Any missing columns are indicated by col->mtype == DATA_MISSING.
1934  @return pointer to remaining part of undo record */
trx_undo_rec_get_partial_row(const byte * ptr,dict_index_t * index,dtuple_t ** row,ibool ignore_prefix,mem_heap_t * heap)1935 byte *trx_undo_rec_get_partial_row(
1936     const byte *ptr,     /*!< in: remaining part in update undo log
1937                          record of a suitable type, at the start of
1938                          the stored index columns;
1939                          NOTE that this copy of the undo log record must
1940                          be preserved as long as the partial row is
1941                          used, as we do NOT copy the data in the
1942                          record! */
1943     dict_index_t *index, /*!< in: clustered index */
1944     dtuple_t **row,      /*!< out, own: partial row */
1945     ibool ignore_prefix, /*!< in: flag to indicate if we
1946                   expect blob prefixes in undo. Used
1947                   only in the assertion. */
1948     mem_heap_t *heap)    /*!< in: memory heap from which the memory
1949                          needed is allocated */
1950 {
1951   const byte *end_ptr;
1952   bool first_v_col = true;
1953   bool is_undo_log = true;
1954 
1955   ut_ad(index);
1956   ut_ad(ptr);
1957   ut_ad(row);
1958   ut_ad(heap);
1959   ut_ad(index->is_clustered());
1960 
1961   *row = dtuple_create_with_vcol(heap, index->table->get_n_cols(),
1962                                  dict_table_get_n_v_cols(index->table));
1963 
1964   /* Mark all columns in the row uninitialized, so that
1965   we can distinguish missing fields from fields that are SQL NULL. */
1966   for (ulint i = 0; i < index->table->get_n_cols(); i++) {
1967     dfield_get_type(dtuple_get_nth_field(*row, i))->mtype = DATA_MISSING;
1968     /* In case a multi-value field checking read uninitialized value */
1969     dfield_get_type(dtuple_get_nth_field(*row, i))->prtype = 0;
1970   }
1971 
1972   dtuple_init_v_fld(*row);
1973 
1974   end_ptr = ptr + mach_read_from_2(ptr);
1975   ptr += 2;
1976 
1977   while (ptr != end_ptr) {
1978     dfield_t *dfield = nullptr;
1979     const byte *field;
1980     ulint field_no = ULINT_UNDEFINED;
1981     const dict_col_t *col = nullptr;
1982     ulint col_no;
1983     ulint len;
1984     ulint orig_len;
1985     bool is_virtual;
1986     dict_v_col_t *vcol = nullptr;
1987 
1988     field_no = mach_read_next_compressed(&ptr);
1989 
1990     is_virtual = (field_no >= REC_MAX_N_FIELDS);
1991 
1992     if (is_virtual) {
1993       ptr = trx_undo_read_v_idx(index->table, ptr, first_v_col, &is_undo_log,
1994                                 &field_no);
1995       first_v_col = false;
1996       if (field_no != ULINT_UNDEFINED) {
1997         vcol = dict_table_get_nth_v_col(index->table, field_no);
1998         col = &vcol->m_col;
1999         col_no = dict_col_get_no(col);
2000         dfield = dtuple_get_nth_v_field(*row, vcol->v_pos);
2001         vcol->m_col.copy_type(dfield_get_type(dfield));
2002       }
2003     }
2004 
2005     if ((vcol != nullptr && vcol->m_col.is_multi_value()) ||
2006         trx_undo_rec_is_multi_value(ptr)) {
2007       ut_ad(is_virtual);
2008       ut_ad(vcol != nullptr || field_no == ULINT_UNDEFINED);
2009       ut_ad(dfield != nullptr || field_no == ULINT_UNDEFINED);
2010       ptr = trx_undo_rec_get_multi_value(ptr, dfield, heap);
2011       continue;
2012     } else {
2013       ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
2014     }
2015 
2016     /* This column could be dropped or no longer indexed */
2017     if (field_no == ULINT_UNDEFINED) {
2018       ut_ad(is_virtual);
2019       continue;
2020     }
2021 
2022     if (!is_virtual) {
2023       col = index->get_col(field_no);
2024       col_no = dict_col_get_no(col);
2025       dfield = dtuple_get_nth_field(*row, col_no);
2026       index->table->get_col(col_no)->copy_type(dfield_get_type(dfield));
2027     }
2028 
2029     dfield_set_data(dfield, field, len);
2030 
2031     if (len != UNIV_SQL_NULL && len >= UNIV_EXTERN_STORAGE_FIELD) {
2032       spatial_status_t spatial_status;
2033 
2034       /* Decode spatial status. */
2035       spatial_status = static_cast<spatial_status_t>(
2036           (len & SPATIAL_STATUS_MASK) >> SPATIAL_STATUS_SHIFT);
2037       len &= ~SPATIAL_STATUS_MASK;
2038 
2039       /* Keep compatible with 5.7.9 format. */
2040       if (spatial_status == SPATIAL_UNKNOWN) {
2041         spatial_status = col->get_spatial_status();
2042       }
2043 
2044       switch (spatial_status) {
2045         case SPATIAL_ONLY:
2046           ut_ad(len - UNIV_EXTERN_STORAGE_FIELD == DATA_MBR_LEN);
2047           dfield_set_len(dfield, len - UNIV_EXTERN_STORAGE_FIELD);
2048           break;
2049 
2050         case SPATIAL_MIXED:
2051           dfield_set_len(dfield,
2052                          len - UNIV_EXTERN_STORAGE_FIELD - DATA_MBR_LEN);
2053           break;
2054 
2055         case SPATIAL_NONE:
2056           dfield_set_len(dfield, len - UNIV_EXTERN_STORAGE_FIELD);
2057           break;
2058 
2059         case SPATIAL_UNKNOWN:
2060           ut_ad(0);
2061           break;
2062       }
2063 
2064       dfield_set_ext(dfield);
2065       dfield_set_spatial_status(dfield, spatial_status);
2066 
2067       /* If the prefix of this column is indexed,
2068       ensure that enough prefix is stored in the
2069       undo log record. */
2070       if (!ignore_prefix && col->ord_part && spatial_status != SPATIAL_ONLY) {
2071         ut_a(dfield_get_len(dfield) >= BTR_EXTERN_FIELD_REF_SIZE);
2072         ut_a(dict_table_has_atomic_blobs(index->table) ||
2073              dfield_get_len(dfield) >=
2074                  REC_ANTELOPE_MAX_INDEX_COL_LEN + BTR_EXTERN_FIELD_REF_SIZE);
2075       }
2076     }
2077   }
2078 
2079   return (const_cast<byte *>(ptr));
2080 }
2081 #endif /* !UNIV_HOTBACKUP */
2082 
2083 /** Erases the unused undo log page end.
2084  @return true if the page contained something, false if it was empty */
trx_undo_erase_page_end(page_t * undo_page,mtr_t * mtr)2085 static ibool trx_undo_erase_page_end(
2086     page_t *undo_page, /*!< in/out: undo page whose end to erase */
2087     mtr_t *mtr)        /*!< in/out: mini-transaction */
2088 {
2089   ulint first_free;
2090 
2091   first_free =
2092       mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE);
2093   memset(undo_page + first_free, 0xff,
2094          (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) - first_free);
2095 
2096   mlog_write_initial_log_record(undo_page, MLOG_UNDO_ERASE_END, mtr);
2097   return (first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
2098 }
2099 
2100 /** Parses a redo log record of erasing of an undo page end.
2101  @return end of log record or NULL */
trx_undo_parse_erase_page_end(byte * ptr,byte * end_ptr MY_ATTRIBUTE ((unused)),page_t * page,mtr_t * mtr)2102 byte *trx_undo_parse_erase_page_end(
2103     byte *ptr,                            /*!< in: buffer */
2104     byte *end_ptr MY_ATTRIBUTE((unused)), /*!< in: buffer end */
2105     page_t *page,                         /*!< in: page or NULL */
2106     mtr_t *mtr)                           /*!< in: mtr or NULL */
2107 {
2108   ut_ad(ptr != nullptr);
2109   ut_ad(end_ptr != nullptr);
2110 
2111   if (page == nullptr) {
2112     return (ptr);
2113   }
2114 
2115   trx_undo_erase_page_end(page, mtr);
2116 
2117   return (ptr);
2118 }
2119 
2120 #ifndef UNIV_HOTBACKUP
2121 /** Writes information to an undo log about an insert, update, or a delete
2122  marking of a clustered index record. This information is used in a rollback of
2123  the transaction and in consistent reads that must look to the history of this
2124  transaction.
2125  @return DB_SUCCESS or error code */
trx_undo_report_row_operation(ulint flags,ulint op_type,que_thr_t * thr,dict_index_t * index,const dtuple_t * clust_entry,const upd_t * update,ulint cmpl_info,const rec_t * rec,const ulint * offsets,roll_ptr_t * roll_ptr)2126 dberr_t trx_undo_report_row_operation(
2127     ulint flags,                 /*!< in: if BTR_NO_UNDO_LOG_FLAG bit is
2128                                  set, does nothing */
2129     ulint op_type,               /*!< in: TRX_UNDO_INSERT_OP or
2130                                  TRX_UNDO_MODIFY_OP */
2131     que_thr_t *thr,              /*!< in: query thread */
2132     dict_index_t *index,         /*!< in: clustered index */
2133     const dtuple_t *clust_entry, /*!< in: in the case of an insert,
2134                                  index entry to insert into the
2135                                  clustered index, otherwise NULL */
2136     const upd_t *update,         /*!< in: in the case of an update,
2137                                  the update vector, otherwise NULL */
2138     ulint cmpl_info,             /*!< in: compiler info on secondary
2139                                  index updates */
2140     const rec_t *rec,            /*!< in: in case of an update or delete
2141                                  marking, the record in the clustered
2142                                  index, otherwise NULL */
2143     const ulint *offsets,        /*!< in: rec_get_offsets(rec) */
2144     roll_ptr_t *roll_ptr)        /*!< out: rollback pointer to the
2145                                  inserted undo log record,
2146                                  0 if BTR_NO_UNDO_LOG
2147                                  flag was specified */
2148 {
2149   trx_t *trx;
2150   trx_undo_t *undo;
2151   page_no_t page_no;
2152   buf_block_t *undo_block;
2153   trx_undo_ptr_t *undo_ptr;
2154   mtr_t mtr;
2155   dberr_t err = DB_SUCCESS;
2156 #ifdef UNIV_DEBUG
2157   int loop_count = 0;
2158 #endif /* UNIV_DEBUG */
2159 
2160   ut_a(index->is_clustered());
2161   ut_ad(!rec || rec_offs_validate(rec, index, offsets));
2162 
2163   if (flags & BTR_NO_UNDO_LOG_FLAG) {
2164     *roll_ptr = 0;
2165 
2166     return (DB_SUCCESS);
2167   }
2168 
2169   ut_ad(thr);
2170   ut_ad(!srv_read_only_mode);
2171   ut_ad((op_type != TRX_UNDO_INSERT_OP) || (clust_entry && !update && !rec));
2172 
2173   trx = thr_get_trx(thr);
2174 
2175   bool is_temp_table = index->table->is_temporary();
2176 
2177   /* Temporary tables do not go into INFORMATION_SCHEMA.TABLES,
2178   so do not bother adding it to the list of modified tables by
2179   the transaction - this list is only used for maintaining
2180   INFORMATION_SCHEMA.TABLES.UPDATE_TIME. */
2181   if (!is_temp_table) {
2182     trx->mod_tables.insert(index->table);
2183   }
2184 
2185   /* If trx is read-only then only temp-tables can be written. */
2186   ut_ad(!trx->read_only || is_temp_table);
2187 
2188   /* If this is a temp-table then we assign temporary rseg. */
2189   if (is_temp_table && trx->rsegs.m_noredo.rseg == nullptr) {
2190     trx_assign_rseg_temp(trx);
2191   }
2192 
2193   mtr_start(&mtr);
2194 
2195   if (is_temp_table) {
2196     /* If object is temporary, disable REDO logging that
2197     is done to track changes done to UNDO logs. This is
2198     feasible given that temporary tables and temporary
2199     undo logs are not restored on restart. */
2200     undo_ptr = &trx->rsegs.m_noredo;
2201     mtr.set_log_mode(MTR_LOG_NO_REDO);
2202   } else {
2203     undo_ptr = &trx->rsegs.m_redo;
2204   }
2205 
2206   mutex_enter(&trx->undo_mutex);
2207 
2208 #ifdef UNIV_DEBUG
2209   if (srv_inject_too_many_concurrent_trxs) {
2210     err = DB_TOO_MANY_CONCURRENT_TRXS;
2211     goto err_exit;
2212   }
2213 #endif /* UNIV_DEBUG */
2214 
2215   switch (op_type) {
2216     case TRX_UNDO_INSERT_OP:
2217       undo = undo_ptr->insert_undo;
2218 
2219       if (undo == nullptr) {
2220         err = trx_undo_assign_undo(trx, undo_ptr, TRX_UNDO_INSERT);
2221         undo = undo_ptr->insert_undo;
2222 
2223         if (undo == nullptr) {
2224           /* Did not succeed */
2225           ut_ad(err != DB_SUCCESS);
2226           goto err_exit;
2227         }
2228       }
2229 
2230       ut_ad(err == DB_SUCCESS);
2231       break;
2232     default:
2233       ut_ad(op_type == TRX_UNDO_MODIFY_OP);
2234 
2235       undo = undo_ptr->update_undo;
2236 
2237       if (undo == nullptr) {
2238         err = trx_undo_assign_undo(trx, undo_ptr, TRX_UNDO_UPDATE);
2239         undo = undo_ptr->update_undo;
2240 
2241         if (undo == nullptr) {
2242           /* Did not succeed */
2243           ut_ad(err != DB_SUCCESS);
2244           goto err_exit;
2245         }
2246       }
2247 
2248       ut_ad(err == DB_SUCCESS);
2249       break;
2250   }
2251 
2252   page_no = undo->last_page_no;
2253 
2254   undo_block = buf_page_get_gen(
2255       page_id_t(undo->space, page_no), undo->page_size, RW_X_LATCH,
2256       buf_pool_is_obsolete(undo->withdraw_clock) ? nullptr : undo->guess_block,
2257       Page_fetch::NORMAL, __FILE__, __LINE__, &mtr);
2258 
2259   buf_block_dbg_add_level(undo_block, SYNC_TRX_UNDO_PAGE);
2260 
2261   do {
2262     page_t *undo_page;
2263     ulint offset;
2264 
2265     undo_page = buf_block_get_frame(undo_block);
2266     ut_ad(page_no == undo_block->page.id.page_no());
2267 
2268     switch (op_type) {
2269       case TRX_UNDO_INSERT_OP:
2270         offset = trx_undo_page_report_insert(undo_page, trx, index, clust_entry,
2271                                              &mtr);
2272         break;
2273       default:
2274         ut_ad(op_type == TRX_UNDO_MODIFY_OP);
2275         offset =
2276             trx_undo_page_report_modify(undo_page, trx, index, rec, offsets,
2277                                         update, cmpl_info, clust_entry, &mtr);
2278     }
2279 
2280     if (UNIV_UNLIKELY(offset == 0)) {
2281       /* The record did not fit on the page. We erase the
2282       end segment of the undo log page and write a log
2283       record of it: this is to ensure that in the debug
2284       version the replicate page constructed using the log
2285       records stays identical to the original page */
2286 
2287       if (!trx_undo_erase_page_end(undo_page, &mtr)) {
2288         /* The record did not fit on an empty
2289         undo page. Discard the freshly allocated
2290         page and return an error. */
2291 
2292         /* When we remove a page from an undo
2293         log, this is analogous to a
2294         pessimistic insert in a B-tree, and we
2295         must reserve the counterpart of the
2296         tree latch, which is the rseg
2297         mutex. We must commit the mini-transaction
2298         first, because it may be holding lower-level
2299         latches, such as SYNC_FSP and SYNC_FSP_PAGE. */
2300 
2301         mtr_commit(&mtr);
2302         mtr_start(&mtr);
2303 
2304         if (index->table->is_temporary()) {
2305           mtr.set_log_mode(MTR_LOG_NO_REDO);
2306         }
2307 
2308         undo_ptr->rseg->latch();
2309         trx_undo_free_last_page(trx, undo, &mtr);
2310         undo_ptr->rseg->unlatch();
2311 
2312         err = DB_UNDO_RECORD_TOO_BIG;
2313         goto err_exit;
2314       }
2315 
2316       mtr_commit(&mtr);
2317     } else {
2318       /* Success */
2319       undo->withdraw_clock = buf_withdraw_clock;
2320       mtr_commit(&mtr);
2321 
2322       undo->empty = FALSE;
2323       undo->top_page_no = page_no;
2324       undo->top_offset = offset;
2325       undo->top_undo_no = trx->undo_no;
2326       undo->guess_block = undo_block;
2327 
2328       trx->undo_no++;
2329       trx->undo_rseg_space = undo_ptr->rseg->space_id;
2330 
2331       mutex_exit(&trx->undo_mutex);
2332 
2333       *roll_ptr =
2334           trx_undo_build_roll_ptr(op_type == TRX_UNDO_INSERT_OP,
2335                                   undo_ptr->rseg->space_id, page_no, offset);
2336       return (DB_SUCCESS);
2337     }
2338 
2339     ut_ad(page_no == undo->last_page_no);
2340 
2341     /* We have to extend the undo log by one page */
2342 
2343     ut_ad(++loop_count < 2);
2344 
2345     mtr_start(&mtr);
2346 
2347     if (index->table->is_temporary()) {
2348       mtr.set_log_mode(MTR_LOG_NO_REDO);
2349     }
2350 
2351     /* When we add a page to an undo log, this is analogous to
2352     a pessimistic insert in a B-tree, and we must reserve the
2353     counterpart of the tree latch, which is the rseg mutex. */
2354 
2355     undo_ptr->rseg->latch();
2356     undo_block = trx_undo_add_page(trx, undo, undo_ptr, &mtr);
2357     undo_ptr->rseg->unlatch();
2358 
2359     page_no = undo->last_page_no;
2360 
2361     DBUG_EXECUTE_IF("ib_err_ins_undo_page_add_failure", undo_block = nullptr;);
2362   } while (undo_block != nullptr);
2363 
2364   ib_errf(
2365       trx->mysql_thd, IB_LOG_LEVEL_ERROR, ER_INNODB_UNDO_LOG_FULL,
2366       "No more space left over in %s tablespace for allocating UNDO"
2367       " log pages. Please add new data file to the tablespace or"
2368       " check if filesystem is full or enable auto-extension for"
2369       " the tablespace",
2370       ((undo->space == TRX_SYS_SPACE)
2371            ? "system"
2372            : ((fsp_is_system_temporary(undo->space)) ? "temporary" : "undo")));
2373 
2374   /* Did not succeed: out of space */
2375   err = DB_OUT_OF_FILE_SPACE;
2376 
2377 err_exit:
2378   mutex_exit(&trx->undo_mutex);
2379   mtr_commit(&mtr);
2380   return (err);
2381 }
2382 
2383 /*============== BUILDING PREVIOUS VERSION OF A RECORD ===============*/
2384 
2385 /** Copies an undo record to heap. This function can be called if we know that
2386  the undo log record exists.
2387  @return own: copy of the record */
2388 static MY_ATTRIBUTE((warn_unused_result))
trx_undo_get_undo_rec_low(roll_ptr_t roll_ptr,mem_heap_t * heap,bool is_temp)2389     trx_undo_rec_t *trx_undo_get_undo_rec_low(
2390         roll_ptr_t roll_ptr, /*!< in: roll pointer to record */
2391         mem_heap_t *heap,    /*!< in: memory heap where copied */
2392         bool is_temp)        /*!< in: true if temp undo rec. */
2393 {
2394   trx_undo_rec_t *undo_rec;
2395   ulint rseg_id;
2396   space_id_t space_id;
2397   page_no_t page_no;
2398   ulint offset;
2399   const page_t *undo_page;
2400   ibool is_insert;
2401   mtr_t mtr;
2402 
2403   trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no, &offset);
2404   space_id = trx_rseg_id_to_space_id(rseg_id, is_temp);
2405 
2406   bool found;
2407   const page_size_t &page_size = fil_space_get_page_size(space_id, &found);
2408   ut_ad(found);
2409 
2410   mtr_start(&mtr);
2411 
2412   undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no),
2413                                           page_size, &mtr);
2414 
2415   undo_rec = trx_undo_rec_copy(undo_page, static_cast<uint32_t>(offset), heap);
2416 
2417   mtr_commit(&mtr);
2418 
2419   return (undo_rec);
2420 }
2421 
2422 /** Copies an undo record to heap.
2423  @param[in]	roll_ptr	roll pointer to record
2424  @param[in]	trx_id		id of the trx that generated
2425                                  the roll pointer: it points to an
2426                                  undo log of this transaction
2427  @param[in]	heap		memory heap where copied
2428  @param[in]	is_temp		true if temporary, no-redo rseg.
2429  @param[in]	name		table name
2430  @param[out]	undo_rec	own: copy of the record
2431  @retval true if the undo log has been
2432  truncated and we cannot fetch the old version
2433  @retval false if the undo log record is available
2434  NOTE: the caller must have latches on the clustered index page. */
trx_undo_get_undo_rec(roll_ptr_t roll_ptr,trx_id_t trx_id,mem_heap_t * heap,bool is_temp,const table_name_t & name,trx_undo_rec_t ** undo_rec)2435 static MY_ATTRIBUTE((warn_unused_result)) bool trx_undo_get_undo_rec(
2436     roll_ptr_t roll_ptr, trx_id_t trx_id, mem_heap_t *heap, bool is_temp,
2437     const table_name_t &name, trx_undo_rec_t **undo_rec) {
2438   bool missing_history;
2439 
2440   rw_lock_s_lock(&purge_sys->latch);
2441 
2442   missing_history = purge_sys->view.changes_visible(trx_id, name);
2443   if (!missing_history) {
2444     *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp);
2445   }
2446 
2447   rw_lock_s_unlock(&purge_sys->latch);
2448 
2449   return (missing_history);
2450 }
2451 
2452 #ifdef UNIV_DEBUG
2453 #define ATTRIB_USED_ONLY_IN_DEBUG
2454 #else /* UNIV_DEBUG */
2455 #define ATTRIB_USED_ONLY_IN_DEBUG MY_ATTRIBUTE((unused))
2456 #endif /* UNIV_DEBUG */
2457 
trx_undo_prev_version_build(const rec_t * index_rec ATTRIB_USED_ONLY_IN_DEBUG,mtr_t * index_mtr ATTRIB_USED_ONLY_IN_DEBUG,const rec_t * rec,const dict_index_t * const index,ulint * offsets,mem_heap_t * heap,rec_t ** old_vers,mem_heap_t * v_heap,const dtuple_t ** vrow,ulint v_status,lob::undo_vers_t * lob_undo)2458 bool trx_undo_prev_version_build(
2459     const rec_t *index_rec ATTRIB_USED_ONLY_IN_DEBUG,
2460     mtr_t *index_mtr ATTRIB_USED_ONLY_IN_DEBUG, const rec_t *rec,
2461     const dict_index_t *const index, ulint *offsets, mem_heap_t *heap,
2462     rec_t **old_vers, mem_heap_t *v_heap, const dtuple_t **vrow, ulint v_status,
2463     lob::undo_vers_t *lob_undo) {
2464   DBUG_TRACE;
2465 
2466   trx_undo_rec_t *undo_rec = nullptr;
2467   dtuple_t *entry;
2468   trx_id_t rec_trx_id;
2469   ulint type;
2470   undo_no_t undo_no;
2471   table_id_t table_id;
2472   trx_id_t trx_id;
2473   roll_ptr_t roll_ptr;
2474   upd_t *update = nullptr;
2475   byte *ptr;
2476   ulint info_bits;
2477   ulint cmpl_info;
2478   bool dummy_extern;
2479   byte *buf;
2480 
2481   ut_ad(!rw_lock_own(&purge_sys->latch, RW_LOCK_S));
2482   ut_ad(mtr_memo_contains_page(index_mtr, index_rec, MTR_MEMO_PAGE_S_FIX) ||
2483         mtr_memo_contains_page(index_mtr, index_rec, MTR_MEMO_PAGE_X_FIX));
2484   ut_ad(rec_offs_validate(rec, index, offsets));
2485   ut_a(index->is_clustered());
2486 
2487   roll_ptr = row_get_rec_roll_ptr(rec, index, offsets);
2488 
2489   *old_vers = nullptr;
2490 
2491   if (trx_undo_roll_ptr_is_insert(roll_ptr)) {
2492     /* The record rec is the first inserted version */
2493     return true;
2494   }
2495 
2496   rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
2497 
2498   /* REDO rollback segments are used only for non-temporary objects.
2499   For temporary objects NON-REDO rollback segments are used. */
2500   bool is_temp = index->table->is_temporary();
2501 
2502   ut_ad(!index->table->skip_alter_undo);
2503 
2504   if (trx_undo_get_undo_rec(roll_ptr, rec_trx_id, heap, is_temp,
2505                             index->table->name, &undo_rec)) {
2506     if (v_status & TRX_UNDO_PREV_IN_PURGE) {
2507       /* We are fetching the record being purged */
2508       undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp);
2509     } else {
2510       /* The undo record may already have been purged,
2511       during purge or semi-consistent read. */
2512       return false;
2513     }
2514   }
2515 
2516   type_cmpl_t type_cmpl;
2517   ptr = trx_undo_rec_get_pars(undo_rec, &type, &cmpl_info, &dummy_extern,
2518                               &undo_no, &table_id, type_cmpl);
2519 
2520   if (table_id != index->table->id) {
2521     /* The table should have been rebuilt, but purge has
2522     not yet removed the undo log records for the
2523     now-dropped old table (table_id). */
2524     return true;
2525   }
2526 
2527   ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr, &info_bits);
2528 
2529   /* (a) If a clustered index record version is such that the
2530   trx id stamp in it is bigger than purge_sys->view, then the
2531   BLOBs in that version are known to exist (the purge has not
2532   progressed that far);
2533 
2534   (b) if the version is the first version such that trx id in it
2535   is less than purge_sys->view, and it is not delete-marked,
2536   then the BLOBs in that version are known to exist (the purge
2537   cannot have purged the BLOBs referenced by that version
2538   yet).
2539 
2540   This function does not fetch any BLOBs.  The callers might, by
2541   possibly invoking row_ext_create() via row_build().  However,
2542   they should have all needed information in the *old_vers
2543   returned by this function.  This is because *old_vers is based
2544   on the transaction undo log records.  The function
2545   trx_undo_page_fetch_ext() will write BLOB prefixes to the
2546   transaction undo log that are at least as long as the longest
2547   possible column prefix in a secondary index.  Thus, secondary
2548   index entries for *old_vers can be constructed without
2549   dereferencing any BLOB pointers. */
2550 
2551   ptr = trx_undo_rec_skip_row_ref(ptr, index);
2552 
2553   ptr = trx_undo_update_rec_get_update(ptr, index, type, trx_id, roll_ptr,
2554                                        info_bits, nullptr, heap, &update,
2555                                        lob_undo, type_cmpl);
2556   ut_a(ptr);
2557 
2558   if (row_upd_changes_field_size_or_external(index, offsets, update)) {
2559     /* We should confirm the existence of disowned external data,
2560     if the previous version record is delete marked. If the trx_id
2561     of the previous record is seen by purge view, we should treat
2562     it as missing history, because the disowned external data
2563     might be purged already.
2564 
2565     The inherited external data (BLOBs) can be freed (purged)
2566     after trx_id was committed, provided that no view was started
2567     before trx_id. If the purge view can see the committed
2568     delete-marked record by trx_id, no transactions need to access
2569     the BLOB. */
2570 
2571     /* the row_upd_changes_disowned_external(update) call could be
2572     omitted, but the synchronization on purge_sys->latch is likely
2573     more expensive. */
2574 
2575     if ((update->info_bits & REC_INFO_DELETED_FLAG) &&
2576         row_upd_changes_disowned_external(update)) {
2577       bool missing_extern;
2578 
2579       rw_lock_s_lock(&purge_sys->latch);
2580 
2581       missing_extern =
2582           purge_sys->view.changes_visible(trx_id, index->table->name);
2583 
2584       rw_lock_s_unlock(&purge_sys->latch);
2585 
2586       if (missing_extern) {
2587         /* treat as a fresh insert, not to
2588         cause assertion error at the caller. */
2589         return true;
2590       }
2591     }
2592 
2593     /* We have to set the appropriate extern storage bits in the
2594     old version of the record: the extern bits in rec for those
2595     fields that update does NOT update, as well as the bits for
2596     those fields that update updates to become externally stored
2597     fields. Store the info: */
2598 
2599     entry = row_rec_to_index_entry(rec, index, offsets, heap);
2600     /* The page containing the clustered index record
2601     corresponding to entry is latched in mtr.  Thus the
2602     following call is safe. */
2603     row_upd_index_replace_new_col_vals(entry, index, update, heap);
2604 
2605     buf = static_cast<byte *>(
2606         mem_heap_alloc(heap, rec_get_converted_size(index, entry)));
2607 
2608     *old_vers = rec_convert_dtuple_to_rec(buf, index, entry);
2609   } else {
2610     buf = static_cast<byte *>(mem_heap_alloc(heap, rec_offs_size(offsets)));
2611 
2612     *old_vers = rec_copy(buf, rec, offsets);
2613     rec_offs_make_valid(*old_vers, index, offsets);
2614     row_upd_rec_in_place(*old_vers, index, offsets, update, nullptr);
2615   }
2616 
2617   /* Set the old value (which is the after image of an update) in the
2618   update vector to dtuple vrow */
2619   if (v_status & TRX_UNDO_GET_OLD_V_VALUE) {
2620     row_upd_replace_vcol((dtuple_t *)*vrow, index->table, update, false,
2621                          nullptr, nullptr);
2622   }
2623 
2624 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2625   ut_a(!rec_offs_any_null_extern(
2626       *old_vers,
2627       rec_get_offsets(*old_vers, index, nullptr, ULINT_UNDEFINED, &heap)));
2628 #endif  // defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2629 
2630   /* If vrow is not NULL it means that the caller is interested in the values of
2631   the virtual columns for this version.
2632   If the UPD_NODE_NO_ORD_CHANGE flag is set on cmpl_info, it means that the
2633   change which created this entry in undo log did not affect any column of any
2634   secondary index (in particular: virtual), and thus the values of virtual
2635   columns were not recorded in undo. In such case the caller may assume that the
2636   values of (virtual) columns present in secondary index are exactly the same as
2637   they are in the next (more recent) version.
2638   If on the other hand the UPD_NODE_NO_ORD_CHANGE flag is not set, then we will
2639   make sure that *vrow points to a properly allocated memory and contains the
2640   values of virtual columns for this version recovered from undo log.
2641   This implies that if the caller has provided a non-NULL vrow, and the *vrow is
2642   still NULL after the call, (and old_vers is not NULL) it must be because the
2643   UPD_NODE_NO_ORD_CHANGE flag was set for this version.
2644   This last statement is an important assumption made by the
2645   row_vers_impl_x_locked_low() function. */
2646   if (vrow && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
2647     if (!(*vrow)) {
2648       *vrow = dtuple_create_with_vcol(v_heap ? v_heap : heap,
2649                                       index->table->get_n_cols(),
2650                                       dict_table_get_n_v_cols(index->table));
2651       dtuple_init_v_fld(*vrow);
2652     }
2653 
2654     ut_ad(index->table->n_v_cols);
2655     trx_undo_read_v_cols(index->table, ptr, *vrow,
2656                          v_status & TRX_UNDO_PREV_IN_PURGE, false, nullptr,
2657                          (v_heap != nullptr ? v_heap : heap));
2658   }
2659 
2660   if (update != nullptr) {
2661     update->reset();
2662   }
2663 
2664   return true;
2665 }
2666 
2667 /** Read virtual column value from undo log
2668 @param[in]	table		the table
2669 @param[in]	ptr		undo log pointer
2670 @param[in,out]	row		the row struct to fill
2671 @param[in]	in_purge	called by purge thread
2672 @param[in]	online		true if this is from online DDL log
2673 @param[in]	col_map		online rebuild column map
2674 @param[in,out]	heap		memory heap to keep value when necessary */
trx_undo_read_v_cols(const dict_table_t * table,const byte * ptr,const dtuple_t * row,bool in_purge,bool online,const ulint * col_map,mem_heap_t * heap)2675 void trx_undo_read_v_cols(const dict_table_t *table, const byte *ptr,
2676                           const dtuple_t *row, bool in_purge, bool online,
2677                           const ulint *col_map, mem_heap_t *heap) {
2678   const byte *end_ptr;
2679   bool first_v_col = true;
2680   bool is_undo_log = true;
2681 
2682   end_ptr = ptr + mach_read_from_2(ptr);
2683   ptr += 2;
2684   while (ptr < end_ptr) {
2685     dfield_t *dfield;
2686     dfield_t multi_value_field;
2687     const byte *field;
2688     ulint field_no;
2689     ulint len = 0;
2690     ulint orig_len = 0;
2691     bool is_virtual;
2692     dict_v_col_t *vcol = nullptr;
2693     ulint col_no;
2694 
2695     field_no = mach_read_next_compressed(const_cast<const byte **>(&ptr));
2696 
2697     is_virtual = (field_no >= REC_MAX_N_FIELDS);
2698 
2699     if (is_virtual) {
2700       ptr =
2701           trx_undo_read_v_idx(table, ptr, first_v_col, &is_undo_log, &field_no);
2702       first_v_col = false;
2703     }
2704 
2705     if (!is_virtual || field_no == ULINT_UNDEFINED) {
2706       /* The virtual column is no longer indexed or does not exist.
2707       "continue" needs to put after ptr gets advanced */
2708       if (trx_undo_rec_is_multi_value(ptr)) {
2709         ptr = trx_undo_rec_get_multi_value(ptr, nullptr, heap);
2710       } else {
2711         ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
2712       }
2713       continue;
2714     }
2715 
2716     vcol = dict_table_get_nth_v_col(table, field_no);
2717 
2718     if (!col_map) {
2719       col_no = vcol->v_pos;
2720     } else {
2721       col_no = col_map[vcol->v_pos];
2722     }
2723 
2724     if (col_no == ULINT_UNDEFINED) {
2725       if (trx_undo_rec_is_multi_value(ptr)) {
2726         ptr = trx_undo_rec_get_multi_value(ptr, nullptr, heap);
2727       } else {
2728         ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
2729       }
2730       continue;
2731     }
2732 
2733     dfield = dtuple_get_nth_v_field(row, col_no);
2734 
2735     if (trx_undo_rec_is_multi_value(ptr)) {
2736       ut_ad(vcol->m_col.is_multi_value());
2737       ptr = trx_undo_rec_get_multi_value(ptr, &multi_value_field, heap);
2738     } else {
2739       ut_ad(!vcol->m_col.is_multi_value());
2740       ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
2741     }
2742 
2743     if (!in_purge || dfield_get_type(dfield)->mtype == DATA_MISSING) {
2744       vcol->m_col.copy_type(dfield_get_type(dfield));
2745       if (online && !vcol->m_col.is_multi_value()) {
2746         dfield->adjust_v_data_mysql(vcol, dict_table_is_comp(table), field, len,
2747                                     heap);
2748       } else if (!vcol->m_col.is_multi_value()) {
2749         dfield_set_data(dfield, field, len);
2750       } else {
2751         dfield_copy_data(dfield, &multi_value_field);
2752       }
2753     }
2754   }
2755 
2756   ut_ad(ptr == end_ptr);
2757 }
2758 #endif /* !UNIV_HOTBACKUP */
2759