1 /*****************************************************************************
2
3 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0log.cc
29 Modification log for online index creation and online table rebuild
30
31 Created 2011-05-26 Marko Makela
32 *******************************************************/
33
34 #include "row0log.h"
35
36 #ifdef UNIV_NONINL
37 #include "row0log.ic"
38 #endif
39
40 #include "row0row.h"
41 #include "row0ins.h"
42 #include "row0upd.h"
43 #include "row0merge.h"
44 #include "row0ext.h"
45 #include "data0data.h"
46 #include "que0que.h"
47 #include "srv0mon.h"
48 #include "handler0alter.h"
49 #include "ut0new.h"
50 #include "ut0stage.h"
51 #include "trx0rec.h"
52
53 #include <algorithm>
54 #include <map>
55
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table. */
enum row_tab_op {
	ROW_T_INSERT = 0x41,	/*!< insert a record */
	ROW_T_UPDATE,		/*!< update a record in place */
	ROW_T_DELETE		/*!< delete (purge) a record */
};
66
/** Index record modification operations during online index creation */
enum row_op {
	ROW_OP_INSERT = 0x61,	/*!< insert a record */
	ROW_OP_DELETE		/*!< delete a record */
};
74
/** Size of the modification log entry header, in bytes:
one byte for the operation code plus one byte for extra_size.
A second extra_size byte is added separately when
extra_size >= 0x80 (see row_log_online_op()). */
#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
77
/** Log block for modifications during online ALTER TABLE.
Records are appended to "block"; a record that does not fit in the
remaining space of the current block is staged in "buf" and split
across two consecutive blocks when flushed to the log file. */
struct row_log_buf_t {
	byte*		block;	/*!< file block buffer */
	ut_new_pfx_t	block_pfx; /*!< opaque descriptor of "block". Set
				by ut_allocator::allocate_large() and fed to
				ut_allocator::deallocate_large(). */
	mrec_buf_t	buf;	/*!< staging buffer for accessing a record
				that spans two blocks */
	ulint		blocks; /*!< current position in blocks
				(number of full blocks written to file) */
	ulint		bytes;	/*!< current position within block */
	ulonglong	total;	/*!< logical position, in bytes from
				the start of the row_log_table log;
				0 for row_log_online_op() and
				row_log_apply(). */
};
93
94 /** Tracks BLOB allocation during online ALTER TABLE */
95 class row_log_table_blob_t {
96 public:
97 /** Constructor (declaring a BLOB freed)
98 @param offset_arg row_log_t::tail::total */
99 #ifdef UNIV_DEBUG
row_log_table_blob_t(ulonglong offset_arg)100 row_log_table_blob_t(ulonglong offset_arg) :
101 old_offset (0), free_offset (offset_arg),
102 offset (BLOB_FREED) {}
103 #else /* UNIV_DEBUG */
104 row_log_table_blob_t() :
105 offset (BLOB_FREED) {}
106 #endif /* UNIV_DEBUG */
107
108 /** Declare a BLOB freed again.
109 @param offset_arg row_log_t::tail::total */
110 #ifdef UNIV_DEBUG
blob_free(ulonglong offset_arg)111 void blob_free(ulonglong offset_arg)
112 #else /* UNIV_DEBUG */
113 void blob_free()
114 #endif /* UNIV_DEBUG */
115 {
116 ut_ad(offset < offset_arg);
117 ut_ad(offset != BLOB_FREED);
118 ut_d(old_offset = offset);
119 ut_d(free_offset = offset_arg);
120 offset = BLOB_FREED;
121 }
122 /** Declare a freed BLOB reused.
123 @param offset_arg row_log_t::tail::total */
blob_alloc(ulonglong offset_arg)124 void blob_alloc(ulonglong offset_arg) {
125 ut_ad(free_offset <= offset_arg);
126 ut_d(old_offset = offset);
127 offset = offset_arg;
128 }
129 /** Determine if a BLOB was freed at a given log position
130 @param offset_arg row_log_t::head::total after the log record
131 @return true if freed */
is_freed(ulonglong offset_arg) const132 bool is_freed(ulonglong offset_arg) const {
133 /* This is supposed to be the offset at the end of the
134 current log record. */
135 ut_ad(offset_arg > 0);
136 /* We should never get anywhere close the magic value. */
137 ut_ad(offset_arg < BLOB_FREED);
138 return(offset_arg < offset);
139 }
140 private:
141 /** Magic value for a freed BLOB */
142 static const ulonglong BLOB_FREED = ~0ULL;
143 #ifdef UNIV_DEBUG
144 /** Old offset, in case a page was freed, reused, freed, ... */
145 ulonglong old_offset;
146 /** Offset of last blob_free() */
147 ulonglong free_offset;
148 #endif /* UNIV_DEBUG */
149 /** Byte offset to the log file */
150 ulonglong offset;
151 };
152
/** @brief Map of off-page column page numbers to 0 or log byte offsets.

If there is no mapping for a page number, it is safe to access.
If a page number maps to 0, it is an off-page column that has been freed.
If a page number maps to a nonzero number, the number is a byte offset
into the index->online_log, indicating that the page is safe to access
when applying log records starting from that offset. */
typedef std::map<
	ulint,
	row_log_table_blob_t,
	std::less<ulint>,
	/* use the InnoDB allocator so the map memory is tracked by PFS */
	ut_allocator<std::pair<const ulint, row_log_table_blob_t> > >
	page_no_map;
166
/** @brief Buffer for logging modifications during online index creation

All modifications to an index that is being created will be logged by
row_log_online_op() to this buffer.

All modifications to a table that is being rebuilt will be logged by
row_log_table_delete(), row_log_table_update(), row_log_table_insert()
to this buffer.

When head.blocks == tail.blocks, the reader will access tail.block
directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
	int		fd;	/*!< file descriptor of the temporary log
				file; -1 until row_log_tmpfile() creates it */
	ib_mutex_t	mutex;	/*!< mutex protecting error,
				max_trx and tail */
	page_no_map*	blobs;	/*!< map of page numbers of off-page columns
				that have been freed during table-rebuilding
				ALTER TABLE (row_log_table_*); protected by
				index->lock X-latch only */
	dict_table_t*	table;	/*!< table that is being rebuilt,
				or NULL when this is a secondary
				index that is being created online */
	bool		same_pk;/*!< whether the definition of the PRIMARY KEY
				has remained the same */
	const dtuple_t*	add_cols;
				/*!< default values of added columns, or NULL */
	const ulint*	col_map;/*!< mapping of old column numbers to
				new ones, or NULL if !table */
	dberr_t		error;	/*!< error that occurred during online
				table rebuild */
	trx_id_t	max_trx;/*!< biggest observed trx_id in
				row_log_online_op();
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	tail;	/*!< writer context;
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	head;	/*!< reader context; protected by MDL only;
				modifiable by row_log_apply_ops() */
	ulint		n_old_col;
				/*!< number of non-virtual column in
				old table */
	ulint		n_old_vcol;
				/*!< number of virtual column in old table */
	const char*	path;	/*!< where to create temporary file during
				log operation */
};
215
216
217 /** Create the file or online log if it does not exist.
218 @param[in,out] log online rebuild log
219 @return true if success, false if not */
220 static MY_ATTRIBUTE((warn_unused_result))
221 int
row_log_tmpfile(row_log_t * log)222 row_log_tmpfile(
223 row_log_t* log)
224 {
225 DBUG_ENTER("row_log_tmpfile");
226 if (log->fd < 0) {
227 log->fd = row_merge_file_create_low(log->path);
228 DBUG_EXECUTE_IF("row_log_tmpfile_fail",
229 if (log->fd > 0)
230 row_merge_file_destroy_low(log->fd);
231 log->fd = -1;);
232 if (log->fd >= 0) {
233 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES);
234 }
235 }
236
237 DBUG_RETURN(log->fd);
238 }
239
240 /** Allocate the memory for the log buffer.
241 @param[in,out] log_buf Buffer used for log operation
242 @return TRUE if success, false if not */
243 static MY_ATTRIBUTE((warn_unused_result))
244 bool
row_log_block_allocate(row_log_buf_t & log_buf)245 row_log_block_allocate(
246 row_log_buf_t& log_buf)
247 {
248 DBUG_ENTER("row_log_block_allocate");
249 if (log_buf.block == NULL) {
250 DBUG_EXECUTE_IF(
251 "simulate_row_log_allocation_failure",
252 DBUG_RETURN(false);
253 );
254
255 log_buf.block = ut_allocator<byte>(mem_key_row_log_buf)
256 .allocate_large(srv_sort_buf_size, &log_buf.block_pfx);
257
258 if (log_buf.block == NULL) {
259 DBUG_RETURN(false);
260 }
261 }
262 DBUG_RETURN(true);
263 }
264
265 /** Free the log buffer.
266 @param[in,out] log_buf Buffer used for log operation */
267 static
268 void
row_log_block_free(row_log_buf_t & log_buf)269 row_log_block_free(
270 row_log_buf_t& log_buf)
271 {
272 DBUG_ENTER("row_log_block_free");
273 if (log_buf.block != NULL) {
274 ut_allocator<byte>(mem_key_row_log_buf).deallocate_large(
275 log_buf.block, &log_buf.block_pfx);
276 log_buf.block = NULL;
277 }
278 DBUG_VOID_RETURN;
279 }
280
281 /******************************************************//**
282 Logs an operation to a secondary index that is (or was) being created. */
283 void
row_log_online_op(dict_index_t * index,const dtuple_t * tuple,trx_id_t trx_id)284 row_log_online_op(
285 /*==============*/
286 dict_index_t* index, /*!< in/out: index, S or X latched */
287 const dtuple_t* tuple, /*!< in: index tuple */
288 trx_id_t trx_id) /*!< in: transaction ID for insert,
289 or 0 for delete */
290 {
291 byte* b;
292 ulint extra_size;
293 ulint size;
294 ulint mrec_size;
295 ulint avail_size;
296 row_log_t* log;
297
298 ut_ad(dtuple_validate(tuple));
299 ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
300 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
301 || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
302
303 if (dict_index_is_corrupted(index)) {
304 return;
305 }
306
307 ut_ad(dict_index_is_online_ddl(index));
308
309 /* Compute the size of the record. This differs from
310 row_merge_buf_encode(), because here we do not encode
311 extra_size+1 (and reserve 0 as the end-of-chunk marker). */
312
313 size = rec_get_converted_size_temp(
314 index, tuple->fields, tuple->n_fields, NULL, &extra_size);
315 ut_ad(size >= extra_size);
316 ut_ad(size <= sizeof log->tail.buf);
317
318 mrec_size = ROW_LOG_HEADER_SIZE
319 + (extra_size >= 0x80) + size
320 + (trx_id ? DATA_TRX_ID_LEN : 0);
321
322 log = index->online_log;
323 mutex_enter(&log->mutex);
324
325 if (trx_id > log->max_trx) {
326 log->max_trx = trx_id;
327 }
328
329 if (!row_log_block_allocate(log->tail)) {
330 log->error = DB_OUT_OF_MEMORY;
331 goto err_exit;
332 }
333
334 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
335
336 ut_ad(log->tail.bytes < srv_sort_buf_size);
337 avail_size = srv_sort_buf_size - log->tail.bytes;
338
339 if (mrec_size > avail_size) {
340 b = log->tail.buf;
341 } else {
342 b = log->tail.block + log->tail.bytes;
343 }
344
345 if (trx_id != 0) {
346 *b++ = ROW_OP_INSERT;
347 trx_write_trx_id(b, trx_id);
348 b += DATA_TRX_ID_LEN;
349 } else {
350 *b++ = ROW_OP_DELETE;
351 }
352
353 if (extra_size < 0x80) {
354 *b++ = (byte) extra_size;
355 } else {
356 ut_ad(extra_size < 0x8000);
357 *b++ = (byte) (0x80 | (extra_size >> 8));
358 *b++ = (byte) extra_size;
359 }
360
361 rec_convert_dtuple_to_temp(
362 b + extra_size, index, tuple->fields, tuple->n_fields, NULL);
363 b += size;
364
365 if (mrec_size >= avail_size) {
366 dberr_t err;
367 IORequest request(IORequest::ROW_LOG | IORequest::WRITE);
368 const os_offset_t byte_offset
369 = (os_offset_t) log->tail.blocks
370 * srv_sort_buf_size;
371
372 if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
373 goto write_failed;
374 }
375
376 if (mrec_size == avail_size) {
377 ut_ad(b == &log->tail.block[srv_sort_buf_size]);
378 } else {
379 ut_ad(b == log->tail.buf + mrec_size);
380 memcpy(log->tail.block + log->tail.bytes,
381 log->tail.buf, avail_size);
382 }
383
384 UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
385
386 if (row_log_tmpfile(log) < 0) {
387 log->error = DB_OUT_OF_MEMORY;
388 goto err_exit;
389 }
390
391 err = os_file_write_int_fd(
392 request,
393 "(modification log)",
394 log->fd,
395 log->tail.block, byte_offset, srv_sort_buf_size);
396
397 log->tail.blocks++;
398 if (err != DB_SUCCESS) {
399 write_failed:
400 /* We set the flag directly instead of invoking
401 dict_set_corrupted_index_cache_only(index) here,
402 because the index is not "public" yet. */
403 index->type |= DICT_CORRUPT;
404 }
405 UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
406 memcpy(log->tail.block, log->tail.buf + avail_size,
407 mrec_size - avail_size);
408 log->tail.bytes = mrec_size - avail_size;
409 } else {
410 log->tail.bytes += mrec_size;
411 ut_ad(b == log->tail.block + log->tail.bytes);
412 }
413
414 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
415 err_exit:
416 mutex_exit(&log->mutex);
417 }
418
419 /******************************************************//**
420 Gets the error status of the online index rebuild log.
421 @return DB_SUCCESS or error code */
422 dberr_t
row_log_table_get_error(const dict_index_t * index)423 row_log_table_get_error(
424 /*====================*/
425 const dict_index_t* index) /*!< in: clustered index of a table
426 that is being rebuilt online */
427 {
428 ut_ad(dict_index_is_clust(index));
429 ut_ad(dict_index_is_online_ddl(index));
430 return(index->online_log->error);
431 }
432
433 /******************************************************//**
434 Starts logging an operation to a table that is being rebuilt.
435 @return pointer to log, or NULL if no logging is necessary */
436 static MY_ATTRIBUTE((nonnull, warn_unused_result))
437 byte*
row_log_table_open(row_log_t * log,ulint size,ulint * avail)438 row_log_table_open(
439 /*===============*/
440 row_log_t* log, /*!< in/out: online rebuild log */
441 ulint size, /*!< in: size of log record */
442 ulint* avail) /*!< out: available size for log record */
443 {
444 mutex_enter(&log->mutex);
445
446 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
447
448 if (log->error != DB_SUCCESS) {
449 err_exit:
450 mutex_exit(&log->mutex);
451 return(NULL);
452 }
453
454 if (!row_log_block_allocate(log->tail)) {
455 log->error = DB_OUT_OF_MEMORY;
456 goto err_exit;
457 }
458
459 ut_ad(log->tail.bytes < srv_sort_buf_size);
460 *avail = srv_sort_buf_size - log->tail.bytes;
461
462 if (size > *avail) {
463 return(log->tail.buf);
464 } else {
465 return(log->tail.block + log->tail.bytes);
466 }
467 }
468
469 /******************************************************//**
470 Stops logging an operation to a table that is being rebuilt. */
471 static MY_ATTRIBUTE((nonnull))
472 void
row_log_table_close_func(row_log_t * log,const byte * b,ulint size,ulint avail)473 row_log_table_close_func(
474 /*=====================*/
475 row_log_t* log, /*!< in/out: online rebuild log */
476 #ifdef UNIV_DEBUG
477 const byte* b, /*!< in: end of log record */
478 #endif /* UNIV_DEBUG */
479 ulint size, /*!< in: size of log record */
480 ulint avail) /*!< in: available size for log record */
481 {
482 ut_ad(mutex_own(&log->mutex));
483
484 if (size >= avail) {
485 dberr_t err;
486 IORequest request(IORequest::ROW_LOG | IORequest::WRITE);
487
488 const os_offset_t byte_offset
489 = (os_offset_t) log->tail.blocks
490 * srv_sort_buf_size;
491
492 if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
493 goto write_failed;
494 }
495
496 if (size == avail) {
497 ut_ad(b == &log->tail.block[srv_sort_buf_size]);
498 } else {
499 ut_ad(b == log->tail.buf + size);
500 memcpy(log->tail.block + log->tail.bytes,
501 log->tail.buf, avail);
502 }
503
504 UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
505
506 if (row_log_tmpfile(log) < 0) {
507 log->error = DB_OUT_OF_MEMORY;
508 goto err_exit;
509 }
510
511 err = os_file_write_int_fd(
512 request,
513 "(modification log)",
514 log->fd,
515 log->tail.block, byte_offset, srv_sort_buf_size);
516
517 log->tail.blocks++;
518 if (err != DB_SUCCESS) {
519 write_failed:
520 log->error = DB_ONLINE_LOG_TOO_BIG;
521 }
522 UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
523 memcpy(log->tail.block, log->tail.buf + avail, size - avail);
524 log->tail.bytes = size - avail;
525 } else {
526 log->tail.bytes += size;
527 ut_ad(b == log->tail.block + log->tail.bytes);
528 }
529
530 log->tail.total += size;
531 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
532 err_exit:
533 mutex_exit(&log->mutex);
534 }
535
/** Wrapper for row_log_table_close_func() that drops the debug-only
"b" (end-of-record pointer) argument in non-debug builds. */
#ifdef UNIV_DEBUG
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, b, size, avail)
#else /* UNIV_DEBUG */
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, size, avail)
#endif /* UNIV_DEBUG */
543
544 /** Check whether a virtual column is indexed in the new table being
545 created during alter table
546 @param[in] index cluster index
547 @param[in] v_no virtual column number
548 @return true if it is indexed, else false */
549 bool
row_log_col_is_indexed(const dict_index_t * index,ulint v_no)550 row_log_col_is_indexed(
551 const dict_index_t* index,
552 ulint v_no)
553 {
554 return(dict_table_get_nth_v_col(
555 index->online_log->table, v_no)->m_col.ord_part);
556 }
557
558 /******************************************************//**
559 Logs a delete operation to a table that is being rebuilt.
560 This will be merged in row_log_table_apply_delete(). */
561 void
row_log_table_delete(const rec_t * rec,const dtuple_t * ventry,dict_index_t * index,const ulint * offsets,const byte * sys)562 row_log_table_delete(
563 /*=================*/
564 const rec_t* rec, /*!< in: clustered index leaf page record,
565 page X-latched */
566 const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */
567 dict_index_t* index, /*!< in/out: clustered index, S-latched
568 or X-latched */
569 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
570 const byte* sys) /*!< in: DB_TRX_ID,DB_ROLL_PTR that should
571 be logged, or NULL to use those in rec */
572 {
573 ulint old_pk_extra_size;
574 ulint old_pk_size;
575 ulint ext_size = 0;
576 ulint mrec_size;
577 ulint avail_size;
578 mem_heap_t* heap = NULL;
579 const dtuple_t* old_pk;
580 row_ext_t* ext;
581
582 ut_ad(dict_index_is_clust(index));
583 ut_ad(rec_offs_validate(rec, index, offsets));
584 ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
585 ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
586 ut_ad(rw_lock_own_flagged(
587 &index->lock,
588 RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
589
590 if (dict_index_is_corrupted(index)
591 || !dict_index_is_online_ddl(index)
592 || index->online_log->error != DB_SUCCESS) {
593 return;
594 }
595
596 dict_table_t* new_table = index->online_log->table;
597 dict_index_t* new_index = dict_table_get_first_index(new_table);
598
599 ut_ad(dict_index_is_clust(new_index));
600 ut_ad(!dict_index_is_online_ddl(new_index));
601
602 /* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
603 if (index->online_log->same_pk) {
604 dtuple_t* tuple;
605 ut_ad(new_index->n_uniq == index->n_uniq);
606
607 /* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
608 fields of the record. */
609 heap = mem_heap_create(
610 DATA_TRX_ID_LEN
611 + DTUPLE_EST_ALLOC(new_index->n_uniq + 2));
612 old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 2);
613 dict_index_copy_types(tuple, new_index, tuple->n_fields);
614 dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);
615
616 for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
617 ulint len;
618 const void* field = rec_get_nth_field(
619 rec, offsets, i, &len);
620 dfield_t* dfield = dtuple_get_nth_field(
621 tuple, i);
622 ut_ad(len != UNIV_SQL_NULL);
623 ut_ad(!rec_offs_nth_extern(offsets, i));
624 dfield_set_data(dfield, field, len);
625 }
626
627 if (sys) {
628 dfield_set_data(
629 dtuple_get_nth_field(tuple,
630 new_index->n_uniq),
631 sys, DATA_TRX_ID_LEN);
632 dfield_set_data(
633 dtuple_get_nth_field(tuple,
634 new_index->n_uniq + 1),
635 sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
636 }
637 } else {
638 /* The PRIMARY KEY has changed. Translate the tuple. */
639 old_pk = row_log_table_get_pk(
640 rec, index, offsets, NULL, &heap);
641
642 if (!old_pk) {
643 ut_ad(index->online_log->error != DB_SUCCESS);
644 if (heap) {
645 goto func_exit;
646 }
647 return;
648 }
649 }
650
651 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
652 old_pk, old_pk->n_fields - 2)->len);
653 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
654 old_pk, old_pk->n_fields - 1)->len);
655 old_pk_size = rec_get_converted_size_temp(
656 new_index, old_pk->fields, old_pk->n_fields, NULL,
657 &old_pk_extra_size);
658 ut_ad(old_pk_extra_size < 0x100);
659
660 mrec_size = 6 + old_pk_size;
661
662 /* Log enough prefix of the BLOB unless both the
663 old and new table are in COMPACT or REDUNDANT format,
664 which store the prefix in the clustered index record. */
665 if (rec_offs_any_extern(offsets)
666 && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
667 || dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {
668
669 /* Build a cache of those off-page column prefixes
670 that are referenced by secondary indexes. It can be
671 that none of the off-page columns are needed. */
672 row_build(ROW_COPY_DATA, index, rec,
673 offsets, NULL, NULL, NULL, &ext, heap);
674 if (ext) {
675 /* Log the row_ext_t, ext->ext and ext->buf */
676 ext_size = ext->n_ext * ext->max_len
677 + sizeof(*ext)
678 + ext->n_ext * sizeof(ulint)
679 + (ext->n_ext - 1) * sizeof ext->len;
680 mrec_size += ext_size;
681 }
682 }
683
684 /* Check if we need to log virtual column data */
685 if (ventry->n_v_fields > 0) {
686 ulint v_extra;
687 mrec_size += rec_get_converted_size_temp(
688 new_index, NULL, 0, ventry, &v_extra);
689 }
690
691 if (byte* b = row_log_table_open(index->online_log,
692 mrec_size, &avail_size)) {
693 *b++ = ROW_T_DELETE;
694 *b++ = static_cast<byte>(old_pk_extra_size);
695
696 /* Log the size of external prefix we saved */
697 mach_write_to_4(b, ext_size);
698 b += 4;
699
700 rec_convert_dtuple_to_temp(
701 b + old_pk_extra_size, new_index,
702 old_pk->fields, old_pk->n_fields, NULL);
703
704 b += old_pk_size;
705
706 if (ext_size) {
707 ulint cur_ext_size = sizeof(*ext)
708 + (ext->n_ext - 1) * sizeof ext->len;
709
710 memcpy(b, ext, cur_ext_size);
711 b += cur_ext_size;
712
713 /* Check if we need to col_map to adjust the column
714 number. If columns were added/removed/reordered,
715 adjust the column number. */
716 if (const ulint* col_map =
717 index->online_log->col_map) {
718 for (ulint i = 0; i < ext->n_ext; i++) {
719 const_cast<ulint&>(ext->ext[i]) =
720 col_map[ext->ext[i]];
721 }
722 }
723
724 memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
725 b += ext->n_ext * sizeof(*ext->ext);
726
727 ext_size -= cur_ext_size
728 + ext->n_ext * sizeof(*ext->ext);
729 memcpy(b, ext->buf, ext_size);
730 b += ext_size;
731 }
732
733 /* log virtual columns */
734 if (ventry->n_v_fields > 0) {
735 rec_convert_dtuple_to_temp(
736 b, new_index, NULL, 0, ventry);
737 b += mach_read_from_2(b);
738 }
739
740 row_log_table_close(
741 index->online_log, b, mrec_size, avail_size);
742 }
743
744 func_exit:
745 mem_heap_free(heap);
746 }
747
748 /******************************************************//**
749 Logs an insert or update to a table that is being rebuilt. */
750 static
751 void
row_log_table_low_redundant(const rec_t * rec,const dtuple_t * ventry,const dtuple_t * o_ventry,dict_index_t * index,bool insert,const dtuple_t * old_pk,const dict_index_t * new_index)752 row_log_table_low_redundant(
753 /*========================*/
754 const rec_t* rec, /*!< in: clustered index leaf
755 page record in ROW_FORMAT=REDUNDANT,
756 page X-latched */
757 const dtuple_t* ventry, /*!< in: dtuple holding virtual
758 column info or NULL */
759 const dtuple_t* o_ventry,/*!< in: old dtuple holding virtual
760 column info or NULL */
761 dict_index_t* index, /*!< in/out: clustered index, S-latched
762 or X-latched */
763 bool insert, /*!< in: true if insert,
764 false if update */
765 const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value
766 (if !insert and a PRIMARY KEY
767 is being created) */
768 const dict_index_t* new_index)
769 /*!< in: clustered index of the
770 new table, not latched */
771 {
772 ulint old_pk_size;
773 ulint old_pk_extra_size;
774 ulint size;
775 ulint extra_size;
776 ulint mrec_size;
777 ulint avail_size;
778 mem_heap_t* heap = NULL;
779 dtuple_t* tuple;
780 ulint num_v = ventry ? dtuple_get_n_v_fields(ventry) : 0;
781
782 ut_ad(!page_is_comp(page_align(rec)));
783 ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
784 ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2));
785 ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */
786 ut_ad(dict_index_is_clust(new_index));
787
788 heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
789 tuple = dtuple_create_with_vcol(heap, index->n_fields, num_v);
790 dict_index_copy_types(tuple, index, index->n_fields);
791
792 if (num_v) {
793 dict_table_copy_v_types(tuple, index->table);
794 }
795
796 dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));
797
798 if (rec_get_1byte_offs_flag(rec)) {
799 for (ulint i = 0; i < index->n_fields; i++) {
800 dfield_t* dfield;
801 ulint len;
802 const void* field;
803
804 dfield = dtuple_get_nth_field(tuple, i);
805 field = rec_get_nth_field_old(rec, i, &len);
806
807 dfield_set_data(dfield, field, len);
808 }
809 } else {
810 for (ulint i = 0; i < index->n_fields; i++) {
811 dfield_t* dfield;
812 ulint len;
813 const void* field;
814
815 dfield = dtuple_get_nth_field(tuple, i);
816 field = rec_get_nth_field_old(rec, i, &len);
817
818 dfield_set_data(dfield, field, len);
819
820 if (rec_2_is_field_extern(rec, i)) {
821 dfield_set_ext(dfield);
822 }
823 }
824 }
825
826 size = rec_get_converted_size_temp(
827 index, tuple->fields, tuple->n_fields, ventry, &extra_size);
828
829 mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
830
831 if (num_v) {
832 if (o_ventry) {
833 ulint v_extra = 0;
834 mrec_size += rec_get_converted_size_temp(
835 index, NULL, 0, o_ventry, &v_extra);
836 }
837 } else if (index->table->n_v_cols) {
838 mrec_size += 2;
839 }
840
841 if (insert || index->online_log->same_pk) {
842 ut_ad(!old_pk);
843 old_pk_extra_size = old_pk_size = 0;
844 } else {
845 ut_ad(old_pk);
846 ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
847 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
848 old_pk, old_pk->n_fields - 2)->len);
849 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
850 old_pk, old_pk->n_fields - 1)->len);
851
852 old_pk_size = rec_get_converted_size_temp(
853 new_index, old_pk->fields, old_pk->n_fields,
854 NULL, &old_pk_extra_size);
855 ut_ad(old_pk_extra_size < 0x100);
856 mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
857 }
858
859 if (byte* b = row_log_table_open(index->online_log,
860 mrec_size, &avail_size)) {
861 *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
862
863 if (old_pk_size) {
864 *b++ = static_cast<byte>(old_pk_extra_size);
865
866 rec_convert_dtuple_to_temp(
867 b + old_pk_extra_size, new_index,
868 old_pk->fields, old_pk->n_fields,
869 ventry);
870 b += old_pk_size;
871 }
872
873 if (extra_size < 0x80) {
874 *b++ = static_cast<byte>(extra_size);
875 } else {
876 ut_ad(extra_size < 0x8000);
877 *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
878 *b++ = static_cast<byte>(extra_size);
879 }
880
881 rec_convert_dtuple_to_temp(
882 b + extra_size, index, tuple->fields, tuple->n_fields,
883 ventry);
884 b += size;
885
886 if (num_v) {
887 if (o_ventry) {
888 rec_convert_dtuple_to_temp(
889 b, new_index, NULL, 0, o_ventry);
890 b += mach_read_from_2(b);
891 }
892 } else if (index->table->n_v_cols) {
893 /* The table contains virtual columns, but nothing
894 has changed for them, so just mark a 2 bytes length
895 field */
896 mach_write_to_2(b, 2);
897 b += 2;
898 }
899
900 row_log_table_close(
901 index->online_log, b, mrec_size, avail_size);
902 }
903
904 mem_heap_free(heap);
905 }
906
907 /******************************************************//**
908 Logs an insert or update to a table that is being rebuilt. */
909 static
910 void
row_log_table_low(const rec_t * rec,const dtuple_t * ventry,const dtuple_t * o_ventry,dict_index_t * index,const ulint * offsets,bool insert,const dtuple_t * old_pk)911 row_log_table_low(
912 /*==============*/
913 const rec_t* rec, /*!< in: clustered index leaf page record,
914 page X-latched */
915 const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */
916 const dtuple_t* o_ventry,/*!< in: dtuple holding old virtual column
917 info */
918 dict_index_t* index, /*!< in/out: clustered index, S-latched
919 or X-latched */
920 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
921 bool insert, /*!< in: true if insert, false if update */
922 const dtuple_t* old_pk) /*!< in: old PRIMARY KEY value (if !insert
923 and a PRIMARY KEY is being created) */
924 {
925 ulint omit_size;
926 ulint old_pk_size;
927 ulint old_pk_extra_size;
928 ulint extra_size;
929 ulint mrec_size;
930 ulint avail_size;
931 const dict_index_t* new_index;
932
933 new_index = dict_table_get_first_index(index->online_log->table);
934
935 ut_ad(dict_index_is_clust(index));
936 ut_ad(dict_index_is_clust(new_index));
937 ut_ad(!dict_index_is_online_ddl(new_index));
938 ut_ad(rec_offs_validate(rec, index, offsets));
939 ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
940 ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
941 ut_ad(rw_lock_own_flagged(
942 &index->lock,
943 RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
944 ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
945 ut_ad(page_is_leaf(page_align(rec)));
946 ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
947 /* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix
948 of the clustered index record (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR),
949 with no information on virtual columns */
950 ut_ad(!old_pk || !insert);
951 ut_ad(!old_pk || old_pk->n_v_fields == 0);
952 ut_ad(!o_ventry || !insert);
953 ut_ad(!o_ventry || ventry);
954
955 if (dict_index_is_corrupted(index)
956 || !dict_index_is_online_ddl(index)
957 || index->online_log->error != DB_SUCCESS) {
958 return;
959 }
960
961 if (!rec_offs_comp(offsets)) {
962 row_log_table_low_redundant(
963 rec, ventry, o_ventry, index, insert,
964 old_pk, new_index);
965 return;
966 }
967
968 ut_ad(page_is_comp(page_align(rec)));
969 ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
970
971 omit_size = REC_N_NEW_EXTRA_BYTES;
972
973 extra_size = rec_offs_extra_size(offsets) - omit_size;
974
975 mrec_size = ROW_LOG_HEADER_SIZE
976 + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;
977
978 if (ventry && ventry->n_v_fields > 0) {
979 ulint v_extra = 0;
980 uint64_t rec_size = rec_get_converted_size_temp(
981 new_index, NULL, 0, ventry, &v_extra);
982
983 mrec_size += rec_size;
984
985 /* If there is actually nothing to be logged for new entry,
986 then there must be also nothing to do with old entry.
987 In this case, make it same with the case below, by only keep
988 2 bytes length marker */
989 if (rec_size > 2 && o_ventry != NULL) {
990 mrec_size += rec_get_converted_size_temp(
991 new_index, NULL, 0, o_ventry, &v_extra);
992 }
993 } else if (index->table->n_v_cols) {
994 /* Always leave 2 bytes length marker for virtual column
995 data logging even if there is none of them is indexed if table
996 has virtual columns */
997 mrec_size += 2;
998 }
999
1000 if (insert || index->online_log->same_pk) {
1001 ut_ad(!old_pk);
1002 old_pk_extra_size = old_pk_size = 0;
1003 } else {
1004 ut_ad(old_pk);
1005 ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
1006 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
1007 old_pk, old_pk->n_fields - 2)->len);
1008 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
1009 old_pk, old_pk->n_fields - 1)->len);
1010
1011 old_pk_size = rec_get_converted_size_temp(
1012 new_index, old_pk->fields, old_pk->n_fields,
1013 NULL, &old_pk_extra_size);
1014 ut_ad(old_pk_extra_size < 0x100);
1015 mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
1016 }
1017
1018 if (byte* b = row_log_table_open(index->online_log,
1019 mrec_size, &avail_size)) {
1020 *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
1021
1022 if (old_pk_size) {
1023 *b++ = static_cast<byte>(old_pk_extra_size);
1024
1025 rec_convert_dtuple_to_temp(
1026 b + old_pk_extra_size, new_index,
1027 old_pk->fields, old_pk->n_fields,
1028 NULL);
1029 b += old_pk_size;
1030 }
1031
1032 if (extra_size < 0x80) {
1033 *b++ = static_cast<byte>(extra_size);
1034 } else {
1035 ut_ad(extra_size < 0x8000);
1036 *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
1037 *b++ = static_cast<byte>(extra_size);
1038 }
1039
1040 memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
1041 b += extra_size;
1042 memcpy(b, rec, rec_offs_data_size(offsets));
1043 b += rec_offs_data_size(offsets);
1044
1045 if (ventry && ventry->n_v_fields > 0) {
1046 uint64_t new_v_size;
1047
1048 rec_convert_dtuple_to_temp(
1049 b, new_index, NULL, 0, ventry);
1050 new_v_size = mach_read_from_2(b);
1051 b += new_v_size;
1052
1053 /* Nothing for new entry to be logged,
1054 skip the old one too. */
1055 if (new_v_size != 2 && o_ventry != NULL) {
1056 rec_convert_dtuple_to_temp(
1057 b, new_index, NULL, 0, o_ventry);
1058 b += mach_read_from_2(b);
1059 }
1060 } else if (index->table->n_v_cols) {
1061 /* The table contains virtual columns, but nothing
1062 has changed for them, so just mark a 2 bytes length
1063 field */
1064 mach_write_to_2(b, 2);
1065 b += 2;
1066 }
1067
1068 row_log_table_close(
1069 index->online_log, b, mrec_size, avail_size);
1070 }
1071 }
1072
1073 /******************************************************//**
1074 Logs an update to a table that is being rebuilt.
1075 This will be merged in row_log_table_apply_update(). */
1076 void
row_log_table_update(const rec_t * rec,dict_index_t * index,const ulint * offsets,const dtuple_t * old_pk,const dtuple_t * new_v_row,const dtuple_t * old_v_row)1077 row_log_table_update(
1078 /*=================*/
1079 const rec_t* rec, /*!< in: clustered index leaf page record,
1080 page X-latched */
1081 dict_index_t* index, /*!< in/out: clustered index, S-latched
1082 or X-latched */
1083 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
1084 const dtuple_t* old_pk, /*!< in: row_log_table_get_pk()
1085 before the update */
1086 const dtuple_t* new_v_row,/*!< in: dtuple contains the new virtual
1087 columns */
1088 const dtuple_t* old_v_row)/*!< in: dtuple contains the old virtual
1089 columns */
1090 {
1091 row_log_table_low(rec, new_v_row, old_v_row, index, offsets,
1092 false, old_pk);
1093 }
1094
1095 /** Gets the old table column of a PRIMARY KEY column.
1096 @param table old table (before ALTER TABLE)
1097 @param col_map mapping of old column numbers to new ones
1098 @param col_no column position in the new table
1099 @return old table column, or NULL if this is an added column */
1100 static
1101 const dict_col_t*
row_log_table_get_pk_old_col(const dict_table_t * table,const ulint * col_map,ulint col_no)1102 row_log_table_get_pk_old_col(
1103 /*=========================*/
1104 const dict_table_t* table,
1105 const ulint* col_map,
1106 ulint col_no)
1107 {
1108 for (ulint i = 0; i < table->n_cols; i++) {
1109 if (col_no == col_map[i]) {
1110 return(dict_table_get_nth_col(table, i));
1111 }
1112 }
1113
1114 return(NULL);
1115 }
1116
1117 /** Maps an old table column of a PRIMARY KEY column.
1118 @param[in] col old table column (before ALTER TABLE)
1119 @param[in] ifield clustered index field in the new table (after
1120 ALTER TABLE)
1121 @param[in,out] dfield clustered index tuple field in the new table
1122 @param[in,out] heap memory heap for allocating dfield contents
1123 @param[in] rec clustered index leaf page record in the old
1124 table
1125 @param[in] offsets rec_get_offsets(rec)
1126 @param[in] i rec field corresponding to col
1127 @param[in] page_size page size of the old table
1128 @param[in] max_len maximum length of dfield
1129 @retval DB_INVALID_NULL if a NULL value is encountered
1130 @retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */
1131 static
1132 dberr_t
row_log_table_get_pk_col(const dict_col_t * col,const dict_field_t * ifield,dfield_t * dfield,mem_heap_t * heap,const rec_t * rec,const ulint * offsets,ulint i,const page_size_t & page_size,ulint max_len)1133 row_log_table_get_pk_col(
1134 const dict_col_t* col,
1135 const dict_field_t* ifield,
1136 dfield_t* dfield,
1137 mem_heap_t* heap,
1138 const rec_t* rec,
1139 const ulint* offsets,
1140 ulint i,
1141 const page_size_t& page_size,
1142 ulint max_len)
1143 {
1144 const byte* field;
1145 ulint len;
1146
1147 field = rec_get_nth_field(rec, offsets, i, &len);
1148
1149 if (len == UNIV_SQL_NULL) {
1150 return(DB_INVALID_NULL);
1151 }
1152
1153 if (rec_offs_nth_extern(offsets, i)) {
1154 ulint field_len = ifield->prefix_len;
1155 byte* blob_field;
1156
1157 if (!field_len) {
1158 field_len = ifield->fixed_len;
1159 if (!field_len) {
1160 field_len = max_len + 1;
1161 }
1162 }
1163
1164 blob_field = static_cast<byte*>(
1165 mem_heap_alloc(heap, field_len));
1166
1167 len = btr_copy_externally_stored_field_prefix(
1168 blob_field, field_len, page_size, field, len);
1169 if (len >= max_len + 1) {
1170 return(DB_TOO_BIG_INDEX_COL);
1171 }
1172
1173 dfield_set_data(dfield, blob_field, len);
1174 } else {
1175 dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
1176 }
1177
1178 return(DB_SUCCESS);
1179 }
1180
/******************************************************//**
Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
of a table that is being rebuilt.
@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
or NULL if the PRIMARY KEY definition does not change */
const dtuple_t*
row_log_table_get_pk(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	byte*		sys,	/*!< out: DB_TRX_ID,DB_ROLL_PTR for
				row_log_table_delete(), or NULL */
	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
{
	dtuple_t*	tuple	= NULL;
	row_log_t*	log	= index->online_log;

	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
	ut_ad(rw_lock_own_flagged(
			&index->lock,
			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

	ut_ad(log);
	ut_ad(log->table);

	if (log->same_pk) {
		/* The PRIMARY KEY columns are unchanged. */
		if (sys) {
			/* Store the DB_TRX_ID,DB_ROLL_PTR. */
			ulint	trx_id_offs = index->trx_id_offset;

			if (!trx_id_offs) {
				/* trx_id_offset==0 means the offset is
				not fixed; locate DB_TRX_ID via the
				record offsets instead. */
				ulint	pos = dict_index_get_sys_col_pos(
					index, DATA_TRX_ID);
				ulint	len;
				ut_ad(pos > 0);

				if (!offsets) {
					/* Compute only the first pos+1
					field offsets; that is all we
					need here. */
					offsets = rec_get_offsets(
						rec, index, NULL, pos + 1,
						heap);
				}

				trx_id_offs = rec_get_nth_field_offs(
					offsets, pos, &len);
				ut_ad(len == DATA_TRX_ID_LEN);
			}

			/* DB_ROLL_PTR is stored immediately after
			DB_TRX_ID, so both are copied in one go. */
			memcpy(sys, rec + trx_id_offs,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
		}

		return(NULL);
	}

	/* The PRIMARY KEY is changing: build the old PK search tuple
	in the new table's format, under log->mutex. */
	mutex_enter(&log->mutex);

	/* log->error is protected by log->mutex. */
	if (log->error == DB_SUCCESS) {
		dict_table_t*	new_table	= log->table;
		dict_index_t*	new_index
			= dict_table_get_first_index(new_table);
		const ulint	new_n_uniq
			= dict_index_get_n_unique(new_index);

		if (!*heap) {
			/* Estimate the heap size needed for the offsets
			array (if still to be computed), the minimum
			size of each unique column, and the tuple. */
			ulint	size = 0;

			if (!offsets) {
				size += (1 + REC_OFFS_HEADER_SIZE
					 + index->n_fields)
					* sizeof *offsets;
			}

			for (ulint i = 0; i < new_n_uniq; i++) {
				size += dict_col_get_min_size(
					dict_index_get_nth_col(new_index, i));
			}

			*heap = mem_heap_create(
				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
		}

		if (!offsets) {
			offsets = rec_get_offsets(rec, index, NULL,
						  ULINT_UNDEFINED, heap);
		}

		/* new_n_uniq PK columns plus DB_TRX_ID,DB_ROLL_PTR. */
		tuple = dtuple_create(*heap, new_n_uniq + 2);
		dict_index_copy_types(tuple, new_index, tuple->n_fields);
		dtuple_set_n_fields_cmp(tuple, new_n_uniq);

		const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);

		const page_size_t&	page_size
			= dict_table_page_size(index->table);

		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
			dict_field_t*	ifield;
			dfield_t*	dfield;
			ulint		prtype;
			ulint		mbminmaxlen;

			ifield = dict_index_get_nth_field(new_index, new_i);
			dfield = dtuple_get_nth_field(tuple, new_i);

			const ulint	col_no
				= dict_field_get_col(ifield)->ind;

			if (const dict_col_t* col
			    = row_log_table_get_pk_old_col(
				    index->table, log->col_map, col_no)) {
				/* The PK column exists in the old table:
				copy its value from the record. */
				ulint	i = dict_col_get_clust_pos(col, index);

				if (i == ULINT_UNDEFINED) {
					ut_ad(0);
					log->error = DB_CORRUPTION;
					goto err_exit;
				}

				log->error = row_log_table_get_pk_col(
					col, ifield, dfield, *heap,
					rec, offsets, i, page_size, max_len);

				if (log->error != DB_SUCCESS) {
err_exit:
					/* Return NULL; the caller will
					see log->error under the mutex. */
					tuple = NULL;
					goto func_exit;
				}

				mbminmaxlen = col->mbminmaxlen;
				prtype = col->prtype;
			} else {
				/* No matching column was found in the old
				table, so this must be an added column.
				Copy the default value. */
				ut_ad(log->add_cols);

				dfield_copy(dfield, dtuple_get_nth_field(
						    log->add_cols, col_no));
				mbminmaxlen = dfield->type.mbminmaxlen;
				prtype = dfield->type.prtype;
			}

			ut_ad(!dfield_is_ext(dfield));
			ut_ad(!dfield_is_null(dfield));

			if (ifield->prefix_len) {
				/* Truncate to at most prefix_len
				characters (not bytes) for a column
				prefix index field. */
				ulint	len = dtype_get_at_most_n_mbchars(
					prtype, mbminmaxlen,
					ifield->prefix_len,
					dfield_get_len(dfield),
					static_cast<const char*>(
						dfield_get_data(dfield)));

				ut_ad(len <= dfield_get_len(dfield));
				dfield_set_len(dfield, len);
			}
		}

		const byte* trx_roll = rec
			+ row_get_trx_id_offset(index, offsets);

		/* Copy the fields, because the fields will be updated
		or the record may be moved somewhere else in the B-tree
		as part of the upcoming operation. */
		if (sys) {
			memcpy(sys, trx_roll,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			trx_roll = sys;
		} else {
			trx_roll = static_cast<const byte*>(
				mem_heap_dup(
					*heap, trx_roll,
					DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
		}

		/* Append DB_TRX_ID and DB_ROLL_PTR as the last two
		fields of the tuple. */
		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
				trx_roll, DATA_TRX_ID_LEN);
		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
	}

func_exit:
	mutex_exit(&log->mutex);
	return(tuple);
}
1373
1374 /******************************************************//**
1375 Logs an insert to a table that is being rebuilt.
1376 This will be merged in row_log_table_apply_insert(). */
1377 void
row_log_table_insert(const rec_t * rec,const dtuple_t * ventry,dict_index_t * index,const ulint * offsets)1378 row_log_table_insert(
1379 /*=================*/
1380 const rec_t* rec, /*!< in: clustered index leaf page record,
1381 page X-latched */
1382 const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */
1383 dict_index_t* index, /*!< in/out: clustered index, S-latched
1384 or X-latched */
1385 const ulint* offsets)/*!< in: rec_get_offsets(rec,index) */
1386 {
1387 row_log_table_low(rec, ventry, NULL, index, offsets, true, NULL);
1388 }
1389
/******************************************************//**
Notes that a BLOB is being freed during online ALTER TABLE. */
void
row_log_table_blob_free(
/*====================*/
	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
	ulint		page_no)/*!< in: starting page number of the BLOB */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(rw_lock_own_flagged(
			&index->lock,
			RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
	ut_ad(page_no != FIL_NULL);

	if (index->online_log->error != DB_SUCCESS) {
		/* The online log is already in an error state;
		tracking freed BLOBs would be pointless. */
		return;
	}

	page_no_map*	blobs	= index->online_log->blobs;

	if (blobs == NULL) {
		/* Lazily allocate the map on the first freed BLOB. */
		index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map());
	}

#ifdef UNIV_DEBUG
	const ulonglong	log_pos = index->online_log->tail.total;
#else
	/* In non-debug builds the constructor and blob_free() below
	take no position argument; log_pos expands to nothing. */
# define log_pos /* empty */
#endif /* UNIV_DEBUG */

	const page_no_map::value_type v(page_no,
					row_log_table_blob_t(log_pos));

	std::pair<page_no_map::iterator,bool> p = blobs->insert(v);

	if (!p.second) {
		/* Update the existing mapping. */
		ut_ad(p.first->first == page_no);
		p.first->second.blob_free(log_pos);
	}
#undef log_pos
}
1433
1434 /******************************************************//**
1435 Notes that a BLOB is being allocated during online ALTER TABLE. */
1436 void
row_log_table_blob_alloc(dict_index_t * index,ulint page_no)1437 row_log_table_blob_alloc(
1438 /*=====================*/
1439 dict_index_t* index, /*!< in/out: clustered index, X-latched */
1440 ulint page_no)/*!< in: starting page number of the BLOB */
1441 {
1442 ut_ad(dict_index_is_clust(index));
1443 ut_ad(dict_index_is_online_ddl(index));
1444
1445 ut_ad(rw_lock_own_flagged(
1446 &index->lock,
1447 RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1448
1449 ut_ad(page_no != FIL_NULL);
1450
1451 if (index->online_log->error != DB_SUCCESS) {
1452 return;
1453 }
1454
1455 /* Only track allocations if the same page has been freed
1456 earlier. Double allocation without a free is not allowed. */
1457 if (page_no_map* blobs = index->online_log->blobs) {
1458 page_no_map::iterator p = blobs->find(page_no);
1459
1460 if (p != blobs->end()) {
1461 ut_ad(p->first == page_no);
1462 p->second.blob_alloc(index->online_log->tail.total);
1463 }
1464 }
1465 }
1466
/******************************************************//**
Converts a log record to a table row.
@return converted row, or NULL if the conversion fails */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const dtuple_t*
row_log_table_apply_convert_mrec(
/*=============================*/
	const mrec_t*		mrec,		/*!< in: merge record */
	dict_index_t*		index,		/*!< in: index of mrec */
	const ulint*		offsets,	/*!< in: offsets of mrec */
	const row_log_t*	log,		/*!< in: rebuild context */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec;
						NOTE(review): not referenced
						in this function body */
	dberr_t*		error)		/*!< out: DB_SUCCESS or
						DB_MISSING_HISTORY or
						reason of failure */
{
	dtuple_t*	row;
	ulint		num_v = dict_table_get_n_v_cols(log->table);

	*error = DB_SUCCESS;

	/* This is based on row_build(). */
	if (log->add_cols) {
		/* Start from the default values of the added columns,
		so that fields not present in mrec keep their defaults. */
		row = dtuple_copy(log->add_cols, heap);
		/* dict_table_copy_types() would set the fields to NULL */
		for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
			dict_col_copy_type(
				dict_table_get_nth_col(log->table, i),
				dfield_get_type(dtuple_get_nth_field(row, i)));
		}
	} else {
		row = dtuple_create_with_vcol(
			heap, dict_table_get_n_cols(log->table), num_v);
		dict_table_copy_types(row, log->table);
	}

	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
		const dict_field_t*	ind_field
			= dict_index_get_nth_field(index, i);

		if (ind_field->prefix_len) {
			/* Column prefixes can only occur in key
			fields, which cannot be stored externally. For
			a column prefix, there should also be the full
			field in the clustered index tuple. The row
			tuple comprises full fields, not prefixes. */
			ut_ad(!rec_offs_nth_extern(offsets, i));
			continue;
		}

		const dict_col_t*	col
			= dict_field_get_col(ind_field);

		/* Map the old-table column number to the new table. */
		ulint			col_no
			= log->col_map[dict_col_get_no(col)];

		if (col_no == ULINT_UNDEFINED) {
			/* dropped column */
			continue;
		}

		dfield_t*	dfield
			= dtuple_get_nth_field(row, col_no);

		ulint		len;
		const byte*	data;

		if (rec_offs_nth_extern(offsets, i)) {
			ut_ad(rec_offs_any_extern(offsets));
			rw_lock_x_lock(dict_index_get_lock(index));

			if (const page_no_map* blobs = log->blobs) {
				data = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);

				/* Read the BLOB's first page number out
				of the external field reference. */
				ulint	page_no = mach_read_from_4(
					data + len - (BTR_EXTERN_FIELD_REF_SIZE
						      - BTR_EXTERN_PAGE_NO));
				page_no_map::const_iterator p = blobs->find(
					page_no);
				if (p != blobs->end()
				    && p->second.is_freed(log->head.total)) {
					/* This BLOB has been freed.
					We must not access the row. */
					*error = DB_MISSING_HISTORY;
					dfield_set_data(dfield, data, len);
					dfield_set_ext(dfield);
					goto blob_done;
				}
			}

			data = btr_rec_copy_externally_stored_field(
				mrec, offsets,
				dict_table_page_size(index->table),
				i, &len, heap);
			ut_a(data);
			dfield_set_data(dfield, data, len);
blob_done:
			rw_lock_x_unlock(dict_index_get_lock(index));
		} else {
			data = rec_get_nth_field(mrec, offsets, i, &len);
			dfield_set_data(dfield, data, len);
		}

		if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
		    && col->len != len && !dict_table_is_comp(log->table)) {

			ut_ad(col->len >= len);
			if (dict_table_is_comp(index->table)) {
				/* COMPACT old table, REDUNDANT new
				table: pad the CHAR value with spaces
				(0x20) to the full column length. */
				byte*	buf = (byte*) mem_heap_alloc(heap,
								     col->len);
				memcpy(buf, dfield->data, len);
				memset(buf + len, 0x20, col->len - len);

				dfield_set_data(dfield, buf, col->len);
			} else {
				/* field length mismatch should not happen
				when rebuilding the redundant row format
				table. */
				ut_ad(0);
				*error = DB_CORRUPTION;
				return(NULL);
			}
		}

		/* See if any columns were changed to NULL or NOT NULL. */
		const dict_col_t*	new_col
			= dict_table_get_nth_col(log->table, col_no);
		ut_ad(new_col->mtype == col->mtype);

		/* Assert that prtype matches except for nullability. */
		ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
		ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
			& ~DATA_NOT_NULL));

		if (new_col->prtype == col->prtype) {
			continue;
		}

		if ((new_col->prtype & DATA_NOT_NULL)
		    && dfield_is_null(dfield)) {
			/* We got a NULL value for a NOT NULL column. */
			*error = DB_INVALID_NULL;
			return(NULL);
		}

		/* Adjust the DATA_NOT_NULL flag in the parsed row. */
		dfield_get_type(dfield)->prtype = new_col->prtype;

		ut_ad(dict_col_type_assert_equal(new_col,
						 dfield_get_type(dfield)));
	}

	/* read the virtual column data if any */
	if (num_v) {
		/* Virtual column values are appended after the
		physical record data in the log record. */
		byte* b = const_cast<byte*>(mrec)
			+ rec_offs_data_size(offsets);
		trx_undo_read_v_cols(log->table, b, row, false,
				     &(log->col_map[log->n_old_col]));
	}

	return(row);
}
1632
1633 /******************************************************//**
1634 Replays an insert operation on a table that was rebuilt.
1635 @return DB_SUCCESS or error code */
1636 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1637 dberr_t
row_log_table_apply_insert_low(que_thr_t * thr,const dtuple_t * row,trx_id_t trx_id,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup)1638 row_log_table_apply_insert_low(
1639 /*===========================*/
1640 que_thr_t* thr, /*!< in: query graph */
1641 const dtuple_t* row, /*!< in: table row
1642 in the old table definition */
1643 trx_id_t trx_id, /*!< in: trx_id of the row */
1644 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1645 that can be emptied */
1646 mem_heap_t* heap, /*!< in/out: memory heap */
1647 row_merge_dup_t* dup) /*!< in/out: for reporting
1648 duplicate key errors */
1649 {
1650 dberr_t error;
1651 dtuple_t* entry;
1652 const row_log_t*log = dup->index->online_log;
1653 dict_index_t* index = dict_table_get_first_index(log->table);
1654 ulint n_index = 0;
1655
1656 ut_ad(dtuple_validate(row));
1657 ut_ad(trx_id);
1658
1659 DBUG_PRINT("ib_alter_table",
1660 ("insert table " IB_ID_FMT "(index " IB_ID_FMT "): %s",
1661 index->table->id, index->id,
1662 rec_printer(row).str().c_str()));
1663
1664 static const ulint flags
1665 = (BTR_CREATE_FLAG
1666 | BTR_NO_LOCKING_FLAG
1667 | BTR_NO_UNDO_LOG_FLAG
1668 | BTR_KEEP_SYS_FLAG);
1669
1670 entry = row_build_index_entry(row, NULL, index, heap);
1671
1672 error = row_ins_clust_index_entry_low(
1673 flags, BTR_MODIFY_TREE, index, index->n_uniq,
1674 entry, 0, thr, false);
1675
1676 switch (error) {
1677 case DB_SUCCESS:
1678 break;
1679 case DB_SUCCESS_LOCKED_REC:
1680 /* The row had already been copied to the table. */
1681 return(DB_SUCCESS);
1682 default:
1683 return(error);
1684 }
1685
1686 do {
1687 n_index++;
1688 if (!(index = dict_table_get_next_index(index))) {
1689 break;
1690 }
1691
1692 if (index->type & DICT_FTS) {
1693 continue;
1694 }
1695
1696 entry = row_build_index_entry(row, NULL, index, heap);
1697 error = row_ins_sec_index_entry_low(
1698 flags, BTR_MODIFY_TREE,
1699 index, offsets_heap, heap, entry, trx_id, thr,
1700 false);
1701
1702 /* Report correct index name for duplicate key error. */
1703 if (error == DB_DUPLICATE_KEY) {
1704 thr_get_trx(thr)->error_key_num = n_index;
1705 }
1706
1707 } while (error == DB_SUCCESS);
1708
1709 return(error);
1710 }
1711
1712 /******************************************************//**
1713 Replays an insert operation on a table that was rebuilt.
1714 @return DB_SUCCESS or error code */
1715 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1716 dberr_t
row_log_table_apply_insert(que_thr_t * thr,const mrec_t * mrec,const ulint * offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup,trx_id_t trx_id)1717 row_log_table_apply_insert(
1718 /*=======================*/
1719 que_thr_t* thr, /*!< in: query graph */
1720 const mrec_t* mrec, /*!< in: record to insert */
1721 const ulint* offsets, /*!< in: offsets of mrec */
1722 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1723 that can be emptied */
1724 mem_heap_t* heap, /*!< in/out: memory heap */
1725 row_merge_dup_t* dup, /*!< in/out: for reporting
1726 duplicate key errors */
1727 trx_id_t trx_id) /*!< in: DB_TRX_ID of mrec */
1728 {
1729 const row_log_t*log = dup->index->online_log;
1730 dberr_t error;
1731 const dtuple_t* row = row_log_table_apply_convert_mrec(
1732 mrec, dup->index, offsets, log, heap, trx_id, &error);
1733
1734 switch (error) {
1735 case DB_MISSING_HISTORY:
1736 ut_ad(log->blobs);
1737 /* Because some BLOBs are missing, we know that the
1738 transaction was rolled back later (a rollback of
1739 an insert can free BLOBs).
1740 We can simply skip the insert: the subsequent
1741 ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
1742 be interpreted as ROW_T_INSERT. */
1743 return(DB_SUCCESS);
1744 case DB_SUCCESS:
1745 ut_ad(row != NULL);
1746 break;
1747 default:
1748 ut_ad(0);
1749 case DB_INVALID_NULL:
1750 ut_ad(row == NULL);
1751 return(error);
1752 }
1753
1754 error = row_log_table_apply_insert_low(
1755 thr, row, trx_id, offsets_heap, heap, dup);
1756 if (error != DB_SUCCESS) {
1757 /* Report the erroneous row using the new
1758 version of the table. */
1759 innobase_row_to_mysql(dup->table, log->table, row);
1760 }
1761 return(error);
1762 }
1763
/******************************************************//**
Deletes a record from a table that is being rebuilt.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_log_table_apply_delete_low(
/*===========================*/
	btr_pcur_t*		pcur,		/*!< in/out: B-tree cursor,
						will be trashed */
	const dtuple_t*		ventry,		/*!< in: dtuple holding
						virtual column info */
	const ulint*		offsets,	/*!< in: offsets on pcur */
	const row_ext_t*	save_ext,	/*!< in: saved external field
						info, or NULL */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	mtr_t*			mtr)		/*!< in/out: mini-transaction,
						will be committed */
{
	dberr_t		error;
	row_ext_t*	ext;
	dtuple_t*	row;
	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;

	ut_ad(dict_index_is_clust(index));

	DBUG_PRINT("ib_alter_table",
		   ("delete table " IB_ID_FMT "(index " IB_ID_FMT "): %s",
		    index->table->id, index->id,
		    rec_printer(btr_pcur_get_rec(pcur),
				offsets).str().c_str()));

	if (dict_table_get_next_index(index)) {
		/* Build a row template for purging secondary index entries. */
		row = row_build(
			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
			offsets, NULL, NULL, NULL,
			save_ext ? NULL : &ext, heap);
		if (ventry) {
			dtuple_copy_v_fields(row, ventry);
		}

		if (!save_ext) {
			save_ext = ext;
		}
	} else {
		/* No secondary indexes: the row template is not needed. */
		row = NULL;
	}

	/* Delete from the clustered index first; this also commits
	the caller's mini-transaction. */
	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
				   BTR_CREATE_FLAG, false, mtr);
	mtr_commit(mtr);

	if (error != DB_SUCCESS) {
		return(error);
	}

	/* Purge the matching entry from each secondary index, one
	mini-transaction per index. */
	while ((index = dict_table_get_next_index(index)) != NULL) {
		if (index->type & DICT_FTS) {
			continue;
		}

		const dtuple_t*	entry = row_build_index_entry(
			row, save_ext, index, heap);
		mtr_start(mtr);
		mtr->set_named_space(index->space);
		btr_pcur_open(index, entry, PAGE_CUR_LE,
			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
			      pcur, mtr);
#ifdef UNIV_DEBUG
		switch (btr_pcur_get_btr_cur(pcur)->flag) {
		case BTR_CUR_DELETE_REF:
		case BTR_CUR_DEL_MARK_IBUF:
		case BTR_CUR_DELETE_IBUF:
		case BTR_CUR_INSERT_TO_IBUF:
			/* We did not request buffering. */
			break;
		case BTR_CUR_HASH:
		case BTR_CUR_HASH_FAIL:
		case BTR_CUR_BINARY:
			goto flag_ok;
		}
		ut_ad(0);
flag_ok:
#endif /* UNIV_DEBUG */

		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
			/* All secondary index entries should be
			found, because new_table is being modified by
			this thread only, and all indexes should be
			updated in sync. */
			mtr_commit(mtr);
			return(DB_INDEX_CORRUPT);
		}

		btr_cur_pessimistic_delete(&error, FALSE,
					   btr_pcur_get_btr_cur(pcur),
					   BTR_CREATE_FLAG, false, mtr);
		mtr_commit(mtr);
	}

	return(error);
}
1867
/******************************************************//**
Replays a delete operation on a table that was rebuilt.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
dberr_t
row_log_table_apply_delete(
/*=======================*/
	que_thr_t*		thr,		/*!< in: query graph */
	ulint			trx_id_col,	/*!< in: position of
						DB_TRX_ID in the new
						clustered index */
	const mrec_t*		mrec,		/*!< in: merge record */
	const ulint*		moffsets,	/*!< in: offsets of mrec */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	const row_log_t*	log,		/*!< in: online log */
	const row_ext_t*	save_ext,	/*!< in: saved external field
						info, or NULL */
	ulint			ext_size)	/*!< in: external field size */
{
	dict_table_t*	new_table = log->table;
	dict_index_t*	index = dict_table_get_first_index(new_table);
	dtuple_t*	old_pk;
	mtr_t		mtr;
	btr_pcur_t	pcur;
	ulint*		offsets;
	ulint		num_v = new_table->n_v_cols;

	/* The log record carries PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR. */
	ut_ad(rec_offs_n_fields(moffsets)
	      == dict_index_get_n_unique(index) + 2);
	ut_ad(!rec_offs_any_extern(moffsets));

	/* Convert the row to a search tuple. */
	old_pk = dtuple_create_with_vcol(heap, index->n_uniq, num_v);
	dict_index_copy_types(old_pk, index, index->n_uniq);

	if (num_v) {
		dict_table_copy_v_types(old_pk, index->table);
	}

	for (ulint i = 0; i < index->n_uniq; i++) {
		ulint		len;
		const void*	field;
		field = rec_get_nth_field(mrec, moffsets, i, &len);
		ut_ad(len != UNIV_SQL_NULL);
		dfield_set_data(dtuple_get_nth_field(old_pk, i),
				field, len);
	}

	mtr_start(&mtr);
	mtr.set_named_space(index->space);
	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
		      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
		      &pcur, &mtr);
#ifdef UNIV_DEBUG
	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
	case BTR_CUR_DELETE_REF:
	case BTR_CUR_DEL_MARK_IBUF:
	case BTR_CUR_DELETE_IBUF:
	case BTR_CUR_INSERT_TO_IBUF:
		/* We did not request buffering. */
		break;
	case BTR_CUR_HASH:
	case BTR_CUR_HASH_FAIL:
	case BTR_CUR_BINARY:
		goto flag_ok;
	}
	ut_ad(0);
flag_ok:
#endif /* UNIV_DEBUG */

	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
all_done:
		mtr_commit(&mtr);
		/* The record was not found. All done. */
		/* This should only happen when an earlier
		ROW_T_INSERT was skipped or
		ROW_T_UPDATE was interpreted as ROW_T_DELETE
		due to BLOBs having been freed by rollback. */
		return(DB_SUCCESS);
	}

	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
				  ULINT_UNDEFINED, &offsets_heap);
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */

	/* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */

	{
		ulint		len;
		const byte*	mrec_trx_id
			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);
		const byte*	rec_trx_id
			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					    trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);

		/* DB_ROLL_PTR is stored adjacent to DB_TRX_ID in both
		the log record and the index record, so one 15-byte
		memcmp below covers both. */
		ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
		      == mrec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);
		ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					trx_id_col + 1, &len)
		      == rec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);

		if (memcmp(mrec_trx_id, rec_trx_id,
			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
			/* The ROW_T_DELETE was logged for a different
			PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
			This is possible if a ROW_T_INSERT was skipped
			or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
			because some BLOBs were missing due to
			(1) rolling back the initial insert, or
			(2) purging the BLOB for a later ROW_T_DELETE
			(3) purging 'old values' for a later ROW_T_UPDATE
			or ROW_T_DELETE. */
			ut_ad(!log->same_pk);
			goto all_done;
		}
	}

	if (num_v) {
		/* Virtual column values follow the physical record
		data (and any external field info) in the log record. */
		byte* b = (byte*)mrec + rec_offs_data_size(moffsets)
			+ ext_size;
		trx_undo_read_v_cols(log->table, b, old_pk, false,
				     &(log->col_map[log->n_old_col]));
	}

	return(row_log_table_apply_delete_low(&pcur, old_pk,
					      offsets, save_ext,
					      heap, &mtr));
}
2005
/******************************************************//**
Replays an update operation on a table that was rebuilt.
The logged new row image (mrec, in the old table format) is converted to
the new table format and applied to the rebuilt clustered index, either
as an in-place/pessimistic update, or as delete+insert when the PRIMARY
KEY or externally stored columns are affected. Affected secondary index
entries of the rebuilt table are adjusted as well.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_log_table_apply_update(
/*=======================*/
	que_thr_t*	thr,		/*!< in: query graph */
	ulint		new_trx_id_col,	/*!< in: position of
					DB_TRX_ID in the new
					clustered index */
	const mrec_t*	mrec,		/*!< in: new value */
	const ulint*	offsets,	/*!< in: offsets of mrec */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap
					that can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	trx_id_t	trx_id,		/*!< in: DB_TRX_ID of mrec */
	const dtuple_t*	old_pk)		/*!< in: PRIMARY KEY and
					DB_TRX_ID,DB_ROLL_PTR
					of the old value,
					or PRIMARY KEY if same_pk */
{
	const row_log_t*log	= dup->index->online_log;
	const dtuple_t*	row;
	dict_index_t*	index	= dict_table_get_first_index(log->table);
	mtr_t		mtr;
	btr_pcur_t	pcur;
	dberr_t		error;
	ulint		n_index = 0;

	ut_ad(dtuple_get_n_fields_cmp(old_pk)
	      == dict_index_get_n_unique(index));
	ut_ad(dtuple_get_n_fields(old_pk)
	      == dict_index_get_n_unique(index)
	      + (log->same_pk ? 0 : 2));

	/* Convert the logged record (old table format) into a row in
	the rebuilt table format. On DB_MISSING_HISTORY a usable row is
	still produced; see the switch below. */
	row = row_log_table_apply_convert_mrec(
		mrec, dup->index, offsets, log, heap, trx_id, &error);

	switch (error) {
	case DB_MISSING_HISTORY:
		/* The record contained BLOBs that are now missing. */
		ut_ad(log->blobs);
		/* Whether or not we are updating the PRIMARY KEY, we
		know that there should be a subsequent
		ROW_T_DELETE for rolling back a preceding ROW_T_INSERT,
		overriding this ROW_T_UPDATE record. (*1)

		This allows us to interpret this ROW_T_UPDATE
		as ROW_T_DELETE.

		When applying the subsequent ROW_T_DELETE, no matching
		record will be found. */
		/* Fall through. */
	case DB_SUCCESS:
		ut_ad(row != NULL);
		break;
	default:
		ut_ad(0);
	case DB_INVALID_NULL:
		ut_ad(row == NULL);
		return(error);
	}

	/* Position a persistent cursor on the old PRIMARY KEY value
	in the rebuilt clustered index. */
	mtr_start(&mtr);
	mtr.set_named_space(index->space);
	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
		      BTR_MODIFY_TREE, &pcur, &mtr);
#ifdef UNIV_DEBUG
	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
	case BTR_CUR_DELETE_REF:
	case BTR_CUR_DEL_MARK_IBUF:
	case BTR_CUR_DELETE_IBUF:
	case BTR_CUR_INSERT_TO_IBUF:
		ut_ad(0);/* We did not request buffering. */
		/* fall through (debug-only dead code after ut_ad) */
	case BTR_CUR_HASH:
	case BTR_CUR_HASH_FAIL:
	case BTR_CUR_BINARY:
		break;
	}
#endif /* UNIV_DEBUG */

	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
		/* The record was not found. This should only happen
		when an earlier ROW_T_INSERT or ROW_T_UPDATE was
		diverted because BLOBs were freed when the insert was
		later rolled back. */

		ut_ad(log->blobs);

		if (error == DB_SUCCESS) {
			/* An earlier ROW_T_INSERT could have been
			skipped because of a missing BLOB, like this:

			BEGIN;
			INSERT INTO t SET blob_col='blob value';
			UPDATE t SET blob_col='';
			ROLLBACK;

			This would generate the following records:
			ROW_T_INSERT (referring to 'blob value')
			ROW_T_UPDATE
			ROW_T_UPDATE (referring to 'blob value')
			ROW_T_DELETE
			[ROLLBACK removes the 'blob value']

			The ROW_T_INSERT would have been skipped
			because of a missing BLOB. Now we are
			executing the first ROW_T_UPDATE.
			The second ROW_T_UPDATE (for the ROLLBACK)
			would be interpreted as ROW_T_DELETE, because
			the BLOB would be missing.

			We could probably assume that the transaction
			has been rolled back and simply skip the
			'insert' part of this ROW_T_UPDATE record.
			However, there might be some complex scenario
			that could interfere with such a shortcut.
			So, we will insert the row (and risk
			introducing a bogus duplicate key error
			for the ALTER TABLE), and a subsequent
			ROW_T_UPDATE or ROW_T_DELETE will delete it. */
			mtr_commit(&mtr);
			error = row_log_table_apply_insert_low(
				thr, row, trx_id, offsets_heap, heap, dup);
		} else {
			/* Some BLOBs are missing, so we are interpreting
			this ROW_T_UPDATE as ROW_T_DELETE (see *1).
			Because the record was not found, we do nothing. */
			ut_ad(error == DB_MISSING_HISTORY);
			error = DB_SUCCESS;
func_exit:
			/* Common exit path for callers that still hold
			an active mini-transaction: commit it first. */
			mtr_commit(&mtr);
		}
func_exit_committed:
		/* Exit path for callers whose mini-transaction has
		already been committed (e.g. after delete+insert). */
		ut_ad(mtr.has_committed());

		if (error != DB_SUCCESS) {
			/* Report the erroneous row using the new
			version of the table. */
			innobase_row_to_mysql(dup->table, log->table, row);
		}

		return(error);
	}

	/* Prepare to update (or delete) the record. */
	ulint*		cur_offsets	= rec_get_offsets(
		btr_pcur_get_rec(&pcur),
		index, NULL, ULINT_UNDEFINED, &offsets_heap);

	if (!log->same_pk) {
		/* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
		was buffered. */
		ulint		len;
		const void*	rec_trx_id
			= rec_get_nth_field(btr_pcur_get_rec(&pcur),
					    cur_offsets, index->n_uniq, &len);
		ut_ad(len == DATA_TRX_ID_LEN);
		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len
		      == DATA_TRX_ID_LEN);
		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len
		      == DATA_ROLL_PTR_LEN);
		/* DB_TRX_ID and DB_ROLL_PTR are expected to be stored
		contiguously in old_pk, so one memcmp covers both. */
		ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>(
			      dtuple_get_nth_field(old_pk,
						   index->n_uniq)->data)
		      == dtuple_get_nth_field(old_pk,
					      index->n_uniq + 1)->data);
		if (memcmp(rec_trx_id,
			   dtuple_get_nth_field(old_pk, index->n_uniq)->data,
			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
			/* The ROW_T_UPDATE was logged for a different
			DB_TRX_ID,DB_ROLL_PTR. This is possible if an
			earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
			because some BLOBs were missing due to rolling
			back the initial insert or due to purging
			the old BLOB values of an update. */
			ut_ad(log->blobs);
			if (error != DB_SUCCESS) {
				ut_ad(error == DB_MISSING_HISTORY);
				/* Some BLOBs are missing, so we are
				interpreting this ROW_T_UPDATE as
				ROW_T_DELETE (see *1).
				Because this is a different row,
				we will do nothing. */
				error = DB_SUCCESS;
			} else {
				/* Because the user record is missing due to
				BLOBs that were missing when processing
				an earlier log record, we should
				interpret the ROW_T_UPDATE as ROW_T_INSERT.
				However, there is a different user record
				with the same PRIMARY KEY value already. */
				error = DB_DUPLICATE_KEY;
			}

			goto func_exit;
		}
	}

	if (error != DB_SUCCESS) {
		ut_ad(error == DB_MISSING_HISTORY);
		ut_ad(log->blobs);
		/* Some BLOBs are missing, so we are interpreting
		this ROW_T_UPDATE as ROW_T_DELETE (see *1). */
		error = row_log_table_apply_delete_low(
			&pcur, old_pk, cur_offsets, NULL, heap, &mtr);
		goto func_exit_committed;
	}

	/** It allows to create tuple with virtual column information. */
	dtuple_t*	entry	= row_build_index_entry_low(
		row, NULL, index, heap, ROW_BUILD_FOR_INSERT);
	/* Compute the binary difference between the existing record
	and the new row image; an empty update means nothing to do. */
	upd_t*		update	= row_upd_build_difference_binary(
		index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
		false, NULL, heap, dup->table, &error);
	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (!update->n_fields) {
		/* Nothing to do. */
		goto func_exit;
	}

	const bool	pk_updated
		= upd_get_nth_field(update, 0)->field_no < new_trx_id_col;

	if (pk_updated || rec_offs_any_extern(cur_offsets)) {
		/* If the record contains any externally stored
		columns, perform the update by delete and insert,
		because we will not write any undo log that would
		allow purge to free any orphaned externally stored
		columns. */

		if (pk_updated && log->same_pk) {
			/* The ROW_T_UPDATE log record should only be
			written when the PRIMARY KEY fields of the
			record did not change in the old table.  We
			can only get a change of PRIMARY KEY columns
			in the rebuilt table if the PRIMARY KEY was
			redefined (!same_pk). */
			ut_ad(0);
			error = DB_CORRUPTION;
			goto func_exit;
		}

		error = row_log_table_apply_delete_low(
			&pcur, old_pk, cur_offsets, NULL, heap, &mtr);
		ut_ad(mtr.has_committed());

		if (error == DB_SUCCESS) {
			error = row_log_table_apply_insert_low(
				thr, row, trx_id, offsets_heap, heap, dup);
		}

		goto func_exit_committed;
	}

	dtuple_t*	old_row;
	row_ext_t*	old_ext;

	if (dict_table_get_next_index(index) != NULL) {
		/* Construct the row corresponding to the old value of
		the record. */
		old_row = row_build(
			ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
			cur_offsets, NULL, NULL, NULL, &old_ext, heap);
		ut_ad(old_row);

		DBUG_PRINT("ib_alter_table",
			   ("update table " IB_ID_FMT
			    "(index " IB_ID_FMT "): %s to %s",
			    index->table->id, index->id,
			    rec_printer(old_row).str().c_str(),
			    rec_printer(row).str().c_str()));
	} else {
		old_row = NULL;
		old_ext = NULL;
	}

	big_rec_t*	big_rec;

	/* Apply the update in place in the clustered index; any
	overflowing columns are returned in big_rec and stored
	externally below. */
	error = btr_cur_pessimistic_update(
		BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
		| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
		| BTR_KEEP_POS_FLAG,
		btr_pcur_get_btr_cur(&pcur),
		&cur_offsets, &offsets_heap, heap, &big_rec,
		update, 0, thr, 0, &mtr);

	if (big_rec) {
		if (error == DB_SUCCESS) {
			error = btr_store_big_rec_extern_fields(
				&pcur, update, cur_offsets, big_rec, &mtr,
				BTR_STORE_UPDATE);
		}

		dtuple_big_rec_free(big_rec);
	}

	/* Adjust the secondary indexes of the rebuilt table: for each
	index whose ordering fields changed, delete the old entry and
	insert the new one. */
	bool	vfields_copied = false;
	while ((index = dict_table_get_next_index(index)) != NULL) {

		n_index++;
		if (error != DB_SUCCESS) {
			break;
		}

		if (index->type & DICT_FTS) {
			continue;
		}

		if (!vfields_copied && dict_index_has_virtual(index)) {
			/* Virtual column values were logged with
			old_pk; copy them once into old_row. */
			dtuple_copy_v_fields(old_row, old_pk);
			vfields_copied = true;
		}

		if (!row_upd_changes_ord_field_binary(
			    index, update, thr, old_row, NULL)) {
			continue;
		}

		mtr_commit(&mtr);

		entry = row_build_index_entry(old_row, old_ext, index, heap);
		if (!entry) {
			ut_ad(0);
			return(DB_CORRUPTION);
		}

		mtr_start(&mtr);
		mtr.set_named_space(index->space);

		if (ROW_FOUND != row_search_index_entry(
			    index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
			ut_ad(0);
			error = DB_CORRUPTION;
			break;
		}

		btr_cur_pessimistic_delete(
			&error, FALSE, btr_pcur_get_btr_cur(&pcur),
			BTR_CREATE_FLAG, false, &mtr);

		if (error != DB_SUCCESS) {
			break;
		}

		mtr_commit(&mtr);

		entry = row_build_index_entry(row, NULL, index, heap);
		error = row_ins_sec_index_entry_low(
			BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
			BTR_MODIFY_TREE, index, offsets_heap, heap,
			entry, trx_id, thr, false);

		/* Report correct index name for duplicate key error. */
		if (error == DB_DUPLICATE_KEY) {
			thr_get_trx(thr)->error_key_num = n_index;
		}

		mtr_start(&mtr);
		mtr.set_named_space(index->space);
	}

	goto func_exit;
}
2377
/******************************************************//**
Applies an operation to a table that was rebuilt.
Parses one merge record (ROW_T_INSERT, ROW_T_DELETE or ROW_T_UPDATE)
from the online log buffer, validates that it fits within [mrec,
mrec_end), and dispatches it to the appropriate apply function.
@return NULL on failure (mrec corruption) or when out of data;
pointer to next record on success */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const mrec_t*
row_log_table_apply_op(
/*===================*/
	que_thr_t*	thr,		/*!< in: query graph */
	ulint		trx_id_col,	/*!< in: position of
					DB_TRX_ID in old index */
	ulint		new_trx_id_col,	/*!< in: position of
					DB_TRX_ID in new index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS
					or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap
					that can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap */
	const mrec_t*	mrec,		/*!< in: merge record */
	const mrec_t*	mrec_end,	/*!< in: end of buffer */
	ulint*		offsets)	/*!< in/out: work area
					for parsing mrec */
{
	row_log_t*	log	= dup->index->online_log;
	dict_index_t*	new_index = dict_table_get_first_index(log->table);
	ulint		extra_size;
	const mrec_t*	next_mrec;
	dtuple_t*	old_pk;
	row_ext_t*	ext;
	ulint		ext_size;

	ut_ad(dict_index_is_clust(dup->index));
	ut_ad(dup->index->table != log->table);
	ut_ad(log->head.total <= log->tail.total);

	*error = DB_SUCCESS;

	/* 3 = 1 (op type) + 1 (ext_size) + at least 1 byte payload */
	if (mrec + 3 >= mrec_end) {
		return(NULL);
	}

	const mrec_t* const mrec_start = mrec;

	switch (*mrec++) {
	default:
		ut_ad(0);
		*error = DB_CORRUPTION;
		return(NULL);
	case ROW_T_INSERT:
		extra_size = *mrec++;

		if (extra_size >= 0x80) {
			/* Read another byte of extra_size. */

			extra_size = (extra_size & 0x7f) << 8;
			extra_size |= *mrec++;
		}

		mrec += extra_size;

		if (mrec > mrec_end) {
			return(NULL);
		}

		rec_offs_set_n_fields(offsets, dup->index->n_fields);
		rec_init_offsets_temp(mrec, dup->index, offsets);

		next_mrec = mrec + rec_offs_data_size(offsets);

		/* Virtual column data, if any, follows the record,
		prefixed with its own 2-byte total length. */
		if (log->table->n_v_cols) {
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}
			next_mrec += mach_read_from_2(next_mrec);
		}

		if (next_mrec > mrec_end) {
			return(NULL);
		} else {
			log->head.total += next_mrec - mrec_start;

			ulint		len;
			const byte*	db_trx_id
				= rec_get_nth_field(
					mrec, offsets, trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);
			*error = row_log_table_apply_insert(
				thr, mrec, offsets, offsets_heap,
				heap, dup, trx_read_trx_id(db_trx_id));
		}
		break;

	case ROW_T_DELETE:
		/* 1 (extra_size) + 4 (ext_size) + at least 1 (payload) */
		if (mrec + 6 >= mrec_end) {
			return(NULL);
		}

		extra_size = *mrec++;
		ext_size = mach_read_from_4(mrec);
		mrec += 4;
		ut_ad(mrec < mrec_end);

		/* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
		For fixed-length PRIMARY key columns, it is 0. */
		mrec += extra_size;

		/* The payload is PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR. */
		rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
		rec_init_offsets_temp(mrec, new_index, offsets);
		next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
		if (log->table->n_v_cols) {
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			next_mrec += mach_read_from_2(next_mrec);
		}

		if (next_mrec > mrec_end) {
			return(NULL);
		}

		log->head.total += next_mrec - mrec_start;

		/* If there are external fields, retrieve those logged
		prefix info and reconstruct the row_ext_t */
		if (ext_size) {
			/* We use memcpy to avoid unaligned
			access on some non-x86 platforms.*/
			ext = static_cast<row_ext_t*>(
				mem_heap_dup(heap,
					     mrec + rec_offs_data_size(offsets),
					     ext_size));

			byte*	ext_start = reinterpret_cast<byte*>(ext);

			/* Fix up the internal pointers of the copied
			row_ext_t, which point into its own buffer:
			the len[] array is embedded in the struct,
			followed by the ext[] column-number array and
			the prefix data buffer. */
			ulint	ext_len = sizeof(*ext)
				+ (ext->n_ext - 1) * sizeof ext->len;

			ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
			ext_len += ext->n_ext * sizeof(*ext->ext);

			ext->buf = static_cast<byte*>(ext_start + ext_len);
		} else {
			ext = NULL;
		}

		*error = row_log_table_apply_delete(
			thr, new_trx_id_col,
			mrec, offsets, offsets_heap, heap,
			log, ext, ext_size);
		break;

	case ROW_T_UPDATE:
		/* Logically, the log entry consists of the
		(PRIMARY KEY,DB_TRX_ID) of the old value (converted
		to the new primary key definition) followed by
		the new value in the old table definition. If the
		definition of the columns belonging to PRIMARY KEY
		is not changed, the log will only contain
		DB_TRX_ID,new_row. */
		ulint		num_v = new_index->table->n_v_cols;

		if (dup->index->online_log->same_pk) {
			ut_ad(new_index->n_uniq == dup->index->n_uniq);

			extra_size = *mrec++;

			if (extra_size >= 0x80) {
				/* Read another byte of extra_size. */

				extra_size = (extra_size & 0x7f) << 8;
				extra_size |= *mrec++;
			}

			mrec += extra_size;

			if (mrec > mrec_end) {
				return(NULL);
			}

			rec_offs_set_n_fields(offsets, dup->index->n_fields);
			rec_init_offsets_temp(mrec, dup->index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);

			if (next_mrec > mrec_end) {
				return(NULL);
			}

			old_pk = dtuple_create_with_vcol(
				heap, new_index->n_uniq, num_v);
			dict_index_copy_types(
				old_pk, new_index, old_pk->n_fields);
			if (num_v) {
			        dict_table_copy_v_types(
					old_pk, new_index->table);
			}

			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
			for (ulint i = 0; i < new_index->n_uniq; i++) {
				const void*	field;
				ulint		len;
				dfield_t*	dfield;

				ut_ad(!rec_offs_nth_extern(offsets, i));

				field = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len != UNIV_SQL_NULL);

				dfield = dtuple_get_nth_field(old_pk, i);
				dfield_set_data(dfield, field, len);
			}
		} else {
			/* We assume extra_size < 0x100
			for the PRIMARY KEY prefix. */
			mrec += *mrec + 1;

			if (mrec > mrec_end) {
				return(NULL);
			}

			/* Get offsets for PRIMARY KEY,
			DB_TRX_ID, DB_ROLL_PTR. */
			rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
			rec_init_offsets_temp(mrec, new_index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			/* Copy the PRIMARY KEY fields and
			DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
			old_pk = dtuple_create_with_vcol(
				heap, new_index->n_uniq + 2, num_v);
			dict_index_copy_types(old_pk, new_index,
					      old_pk->n_fields);

			if (num_v) {
			        dict_table_copy_v_types(
					old_pk, new_index->table);
			}

			for (ulint i = 0;
			     i < dict_index_get_n_unique(new_index) + 2;
			     i++) {
				const void*	field;
				ulint		len;
				dfield_t*	dfield;

				ut_ad(!rec_offs_nth_extern(offsets, i));

				field = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len != UNIV_SQL_NULL);

				dfield = dtuple_get_nth_field(old_pk, i);
				dfield_set_data(dfield, field, len);
			}

			mrec = next_mrec;

			/* Fetch the new value of the row as it was
			in the old table definition. */
			extra_size = *mrec++;

			if (extra_size >= 0x80) {
				/* Read another byte of extra_size. */

				extra_size = (extra_size & 0x7f) << 8;
				extra_size |= *mrec++;
			}

			mrec += extra_size;

			if (mrec > mrec_end) {
				return(NULL);
			}

			rec_offs_set_n_fields(offsets, dup->index->n_fields);
			rec_init_offsets_temp(mrec, dup->index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);

			if (next_mrec > mrec_end) {
				return(NULL);
			}
		}

		/* Read virtual column info from log */
		if (num_v) {
			ulint		o_v_size = 0;
			ulint		n_v_size = 0;

			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			/* New virtual column values come first,
			prefixed with their 2-byte total size. */
			n_v_size = mach_read_from_2(next_mrec);
			next_mrec += n_v_size;
			if (next_mrec > mrec_end) {
				return(NULL);
			}

			/* if there is more than 2 bytes length info */
			if (n_v_size > 2) {
				trx_undo_read_v_cols(
					log->table, const_cast<byte*>(
					next_mrec), old_pk, false,
					&(log->col_map[log->n_old_col]));
				o_v_size = mach_read_from_2(next_mrec);
			}

			/* Skip the old virtual column values. */
			next_mrec += o_v_size;
			if (next_mrec > mrec_end) {
				return(NULL);
			}
		}

		ut_ad(next_mrec <= mrec_end);
		log->head.total += next_mrec - mrec_start;
		dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);

		{
			ulint		len;
			const byte*	db_trx_id
				= rec_get_nth_field(
					mrec, offsets, trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);
			*error = row_log_table_apply_update(
				thr, new_trx_id_col,
				mrec, offsets, offsets_heap,
				heap, dup, trx_read_trx_id(db_trx_id), old_pk);
		}

		break;
	}

	ut_ad(log->head.total <= log->tail.total);
	mem_heap_empty(offsets_heap);
	mem_heap_empty(heap);
	return(next_mrec);
}
2726
2727 #ifdef HAVE_PSI_STAGE_INTERFACE
2728 /** Estimate how much an ALTER TABLE progress should be incremented per
2729 one block of log applied.
2730 For the other phases of ALTER TABLE we increment the progress with 1 per
2731 page processed.
2732 @return amount of abstract units to add to work_completed when one block
2733 of log is applied.
2734 */
2735 inline
2736 ulint
row_log_progress_inc_per_block()2737 row_log_progress_inc_per_block()
2738 {
2739 /* We must increment the progress once per page (as in
2740 univ_page_size, usually 16KiB). One block here is srv_sort_buf_size
2741 (usually 1MiB). */
2742 const ulint pages_per_block = std::max(
2743 static_cast<unsigned long>(
2744 srv_sort_buf_size / univ_page_size.physical()),
2745 1UL);
2746
2747 /* Multiply by an artificial factor of 6 to even the pace with
2748 the rest of the ALTER TABLE phases, they process page_size amount
2749 of data faster. */
2750 return(pages_per_block * 6);
2751 }
2752
2753 /** Estimate how much work is to be done by the log apply phase
2754 of an ALTER TABLE for this index.
2755 @param[in] index index whose log to assess
2756 @return work to be done by log-apply in abstract units
2757 */
2758 ulint
row_log_estimate_work(const dict_index_t * index)2759 row_log_estimate_work(
2760 const dict_index_t* index)
2761 {
2762 if (index == NULL || index->online_log == NULL) {
2763 return(0);
2764 }
2765
2766 const row_log_t* l = index->online_log;
2767 const ulint bytes_left =
2768 static_cast<ulint>(l->tail.total - l->head.total);
2769 const ulint blocks_left = bytes_left / srv_sort_buf_size;
2770
2771 return(blocks_left * row_log_progress_inc_per_block());
2772 }
2773 #else /* HAVE_PSI_STAGE_INTERFACE */
/** Dummy version of row_log_progress_inc_per_block() when the
performance schema stage instrumentation is not compiled in:
no progress is reported.
@return 0 */
inline
ulint
row_log_progress_inc_per_block()
{
	return(0);
}
2780 #endif /* HAVE_PSI_STAGE_INTERFACE */
2781
2782 /** Applies operations to a table was rebuilt.
2783 @param[in] thr query graph
2784 @param[in,out] dup for reporting duplicate key errors
2785 @param[in,out] stage performance schema accounting object, used by
2786 ALTER TABLE. If not NULL, then stage->inc() will be called for each block
2787 of log that is applied.
2788 @return DB_SUCCESS, or error code on failure */
2789 static MY_ATTRIBUTE((warn_unused_result))
2790 dberr_t
row_log_table_apply_ops(que_thr_t * thr,row_merge_dup_t * dup,ut_stage_alter_t * stage)2791 row_log_table_apply_ops(
2792 que_thr_t* thr,
2793 row_merge_dup_t* dup,
2794 ut_stage_alter_t* stage)
2795 {
2796 dberr_t error;
2797 const mrec_t* mrec = NULL;
2798 const mrec_t* next_mrec;
2799 const mrec_t* mrec_end = NULL; /* silence bogus warning */
2800 const mrec_t* next_mrec_end;
2801 mem_heap_t* heap;
2802 mem_heap_t* offsets_heap;
2803 ulint* offsets;
2804 bool has_index_lock;
2805 dict_index_t* index = const_cast<dict_index_t*>(
2806 dup->index);
2807 dict_table_t* new_table = index->online_log->table;
2808 dict_index_t* new_index = dict_table_get_first_index(
2809 new_table);
2810 const ulint i = 1 + REC_OFFS_HEADER_SIZE
2811 + ut_max(dict_index_get_n_fields(index),
2812 dict_index_get_n_unique(new_index) + 2);
2813 const ulint trx_id_col = dict_col_get_clust_pos(
2814 dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
2815 const ulint new_trx_id_col = dict_col_get_clust_pos(
2816 dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
2817 trx_t* trx = thr_get_trx(thr);
2818 dberr_t err;
2819
2820 ut_ad(dict_index_is_clust(index));
2821 ut_ad(dict_index_is_online_ddl(index));
2822 ut_ad(trx->mysql_thd);
2823 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
2824 ut_ad(!dict_index_is_online_ddl(new_index));
2825 ut_ad(trx_id_col > 0);
2826 ut_ad(trx_id_col != ULINT_UNDEFINED);
2827 ut_ad(new_trx_id_col > 0);
2828 ut_ad(new_trx_id_col != ULINT_UNDEFINED);
2829
2830 UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
2831
2832 offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
2833 offsets[0] = i;
2834 offsets[1] = dict_index_get_n_fields(index);
2835
2836 heap = mem_heap_create(UNIV_PAGE_SIZE);
2837 offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
2838 has_index_lock = true;
2839
2840 next_block:
2841 ut_ad(has_index_lock);
2842 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
2843 ut_ad(index->online_log->head.bytes == 0);
2844
2845 stage->inc(row_log_progress_inc_per_block());
2846
2847 if (trx_is_interrupted(trx)) {
2848 goto interrupted;
2849 }
2850
2851 if (dict_index_is_corrupted(index)) {
2852 error = DB_INDEX_CORRUPT;
2853 goto func_exit;
2854 }
2855
2856 ut_ad(dict_index_is_online_ddl(index));
2857
2858 error = index->online_log->error;
2859
2860 if (error != DB_SUCCESS) {
2861 goto func_exit;
2862 }
2863
2864 if (UNIV_UNLIKELY(index->online_log->head.blocks
2865 > index->online_log->tail.blocks)) {
2866 unexpected_eof:
2867 ib::error() << "Unexpected end of temporary file for table "
2868 << index->table->name;
2869 corruption:
2870 error = DB_CORRUPTION;
2871 goto func_exit;
2872 }
2873
2874 if (index->online_log->head.blocks
2875 == index->online_log->tail.blocks) {
2876 if (index->online_log->head.blocks) {
2877 #ifdef HAVE_FTRUNCATE
2878 /* Truncate the file in order to save space. */
2879 if (index->online_log->fd > 0
2880 && ftruncate(index->online_log->fd, 0) == -1) {
2881 perror("ftruncate");
2882 }
2883 #endif /* HAVE_FTRUNCATE */
2884 index->online_log->head.blocks
2885 = index->online_log->tail.blocks = 0;
2886 }
2887
2888 next_mrec = index->online_log->tail.block;
2889 next_mrec_end = next_mrec + index->online_log->tail.bytes;
2890
2891 if (next_mrec_end == next_mrec) {
2892 /* End of log reached. */
2893 all_done:
2894 ut_ad(has_index_lock);
2895 ut_ad(index->online_log->head.blocks == 0);
2896 ut_ad(index->online_log->tail.blocks == 0);
2897 index->online_log->head.bytes = 0;
2898 index->online_log->tail.bytes = 0;
2899 error = DB_SUCCESS;
2900 goto func_exit;
2901 }
2902 } else {
2903 os_offset_t ofs;
2904
2905 ofs = (os_offset_t) index->online_log->head.blocks
2906 * srv_sort_buf_size;
2907
2908 ut_ad(has_index_lock);
2909 has_index_lock = false;
2910 rw_lock_x_unlock(dict_index_get_lock(index));
2911
2912 log_free_check();
2913
2914 ut_ad(dict_index_is_online_ddl(index));
2915
2916 if (!row_log_block_allocate(index->online_log->head)) {
2917 error = DB_OUT_OF_MEMORY;
2918 goto func_exit;
2919 }
2920
2921 IORequest request(IORequest::READ | IORequest::ROW_LOG);
2922 err = os_file_read_no_error_handling_int_fd(
2923 request,
2924 index->online_log->fd,
2925 index->online_log->head.block, ofs,
2926 srv_sort_buf_size,
2927 NULL);
2928
2929 if (err != DB_SUCCESS) {
2930 ib::error()
2931 << "Unable to read temporary file"
2932 " for table " << index->table_name;
2933 goto corruption;
2934 }
2935
2936 #ifdef POSIX_FADV_DONTNEED
2937 /* Each block is read exactly once. Free up the file cache. */
2938 posix_fadvise(index->online_log->fd,
2939 ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
2940 #endif /* POSIX_FADV_DONTNEED */
2941
2942 next_mrec = index->online_log->head.block;
2943 next_mrec_end = next_mrec + srv_sort_buf_size;
2944 }
2945
2946 /* This read is not protected by index->online_log->mutex for
2947 performance reasons. We will eventually notice any error that
2948 was flagged by a DML thread. */
2949 error = index->online_log->error;
2950
2951 if (error != DB_SUCCESS) {
2952 goto func_exit;
2953 }
2954
2955 if (mrec) {
2956 /* A partial record was read from the previous block.
2957 Copy the temporary buffer full, as we do not know the
2958 length of the record. Parse subsequent records from
2959 the bigger buffer index->online_log->head.block
2960 or index->online_log->tail.block. */
2961
2962 ut_ad(mrec == index->online_log->head.buf);
2963 ut_ad(mrec_end > mrec);
2964 ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
2965
2966 memcpy((mrec_t*) mrec_end, next_mrec,
2967 (&index->online_log->head.buf)[1] - mrec_end);
2968 mrec = row_log_table_apply_op(
2969 thr, trx_id_col, new_trx_id_col,
2970 dup, &error, offsets_heap, heap,
2971 index->online_log->head.buf,
2972 (&index->online_log->head.buf)[1], offsets);
2973 if (error != DB_SUCCESS) {
2974 goto func_exit;
2975 } else if (UNIV_UNLIKELY(mrec == NULL)) {
2976 /* The record was not reassembled properly. */
2977 goto corruption;
2978 }
2979 /* The record was previously found out to be
2980 truncated. Now that the parse buffer was extended,
2981 it should proceed beyond the old end of the buffer. */
2982 ut_a(mrec > mrec_end);
2983
2984 index->online_log->head.bytes = mrec - mrec_end;
2985 next_mrec += index->online_log->head.bytes;
2986 }
2987
2988 ut_ad(next_mrec <= next_mrec_end);
2989 /* The following loop must not be parsing the temporary
2990 buffer, but head.block or tail.block. */
2991
2992 /* mrec!=NULL means that the next record starts from the
2993 middle of the block */
2994 ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
2995
2996 #ifdef UNIV_DEBUG
2997 if (next_mrec_end == index->online_log->head.block
2998 + srv_sort_buf_size) {
2999 /* If tail.bytes == 0, next_mrec_end can also be at
3000 the end of tail.block. */
3001 if (index->online_log->tail.bytes == 0) {
3002 ut_ad(next_mrec == next_mrec_end);
3003 ut_ad(index->online_log->tail.blocks == 0);
3004 ut_ad(index->online_log->head.blocks == 0);
3005 ut_ad(index->online_log->head.bytes == 0);
3006 } else {
3007 ut_ad(next_mrec == index->online_log->head.block
3008 + index->online_log->head.bytes);
3009 ut_ad(index->online_log->tail.blocks
3010 > index->online_log->head.blocks);
3011 }
3012 } else if (next_mrec_end == index->online_log->tail.block
3013 + index->online_log->tail.bytes) {
3014 ut_ad(next_mrec == index->online_log->tail.block
3015 + index->online_log->head.bytes);
3016 ut_ad(index->online_log->tail.blocks == 0);
3017 ut_ad(index->online_log->head.blocks == 0);
3018 ut_ad(index->online_log->head.bytes
3019 <= index->online_log->tail.bytes);
3020 } else {
3021 ut_error;
3022 }
3023 #endif /* UNIV_DEBUG */
3024
3025 mrec_end = next_mrec_end;
3026
3027 while (!trx_is_interrupted(trx)) {
3028 mrec = next_mrec;
3029 ut_ad(mrec <= mrec_end);
3030
3031 if (mrec == mrec_end) {
3032 /* We are at the end of the log.
3033 Mark the replay all_done. */
3034 if (has_index_lock) {
3035 goto all_done;
3036 }
3037 }
3038
3039 if (!has_index_lock) {
3040 /* We are applying operations from a different
3041 block than the one that is being written to.
3042 We do not hold index->lock in order to
3043 allow other threads to concurrently buffer
3044 modifications. */
3045 ut_ad(mrec >= index->online_log->head.block);
3046 ut_ad(mrec_end == index->online_log->head.block
3047 + srv_sort_buf_size);
3048 ut_ad(index->online_log->head.bytes
3049 < srv_sort_buf_size);
3050
3051 /* Take the opportunity to do a redo log
3052 checkpoint if needed. */
3053 log_free_check();
3054 } else {
3055 /* We are applying operations from the last block.
3056 Do not allow other threads to buffer anything,
3057 so that we can finally catch up and synchronize. */
3058 ut_ad(index->online_log->head.blocks == 0);
3059 ut_ad(index->online_log->tail.blocks == 0);
3060 ut_ad(mrec_end == index->online_log->tail.block
3061 + index->online_log->tail.bytes);
3062 ut_ad(mrec >= index->online_log->tail.block);
3063 }
3064
3065 /* This read is not protected by index->online_log->mutex
3066 for performance reasons. We will eventually notice any
3067 error that was flagged by a DML thread. */
3068 error = index->online_log->error;
3069
3070 if (error != DB_SUCCESS) {
3071 goto func_exit;
3072 }
3073
3074 next_mrec = row_log_table_apply_op(
3075 thr, trx_id_col, new_trx_id_col,
3076 dup, &error, offsets_heap, heap,
3077 mrec, mrec_end, offsets);
3078
3079 if (error != DB_SUCCESS) {
3080 goto func_exit;
3081 } else if (next_mrec == next_mrec_end) {
3082 /* The record happened to end on a block boundary.
3083 Do we have more blocks left? */
3084 if (has_index_lock) {
3085 /* The index will be locked while
3086 applying the last block. */
3087 goto all_done;
3088 }
3089
3090 mrec = NULL;
3091 process_next_block:
3092 rw_lock_x_lock(dict_index_get_lock(index));
3093 has_index_lock = true;
3094
3095 index->online_log->head.bytes = 0;
3096 index->online_log->head.blocks++;
3097 goto next_block;
3098 } else if (next_mrec != NULL) {
3099 ut_ad(next_mrec < next_mrec_end);
3100 index->online_log->head.bytes += next_mrec - mrec;
3101 } else if (has_index_lock) {
3102 /* When mrec is within tail.block, it should
3103 be a complete record, because we are holding
3104 index->lock and thus excluding the writer. */
3105 ut_ad(index->online_log->tail.blocks == 0);
3106 ut_ad(mrec_end == index->online_log->tail.block
3107 + index->online_log->tail.bytes);
3108 ut_ad(0);
3109 goto unexpected_eof;
3110 } else {
3111 memcpy(index->online_log->head.buf, mrec,
3112 mrec_end - mrec);
3113 mrec_end += index->online_log->head.buf - mrec;
3114 mrec = index->online_log->head.buf;
3115 goto process_next_block;
3116 }
3117 }
3118
3119 interrupted:
3120 error = DB_INTERRUPTED;
3121 func_exit:
3122 if (!has_index_lock) {
3123 rw_lock_x_lock(dict_index_get_lock(index));
3124 }
3125
3126 mem_heap_free(offsets_heap);
3127 mem_heap_free(heap);
3128 row_log_block_free(index->online_log->head);
3129 ut_free(offsets);
3130 return(error);
3131 }
3132
3133 /** Apply the row_log_table log to a table upon completing rebuild.
3134 @param[in] thr query graph
3135 @param[in] old_table old table
3136 @param[in,out] table MySQL table (for reporting duplicates)
3137 @param[in,out] stage performance schema accounting object, used by
3138 ALTER TABLE. stage->begin_phase_log_table() will be called initially and then
3139 stage->inc() will be called for each block of log that is applied.
3140 @return DB_SUCCESS, or error code on failure */
3141 dberr_t
row_log_table_apply(que_thr_t * thr,dict_table_t * old_table,struct TABLE * table,ut_stage_alter_t * stage)3142 row_log_table_apply(
3143 que_thr_t* thr,
3144 dict_table_t* old_table,
3145 struct TABLE* table,
3146 ut_stage_alter_t* stage)
3147 {
3148 dberr_t error;
3149 dict_index_t* clust_index;
3150
3151 thr_get_trx(thr)->error_key_num = 0;
3152 DBUG_EXECUTE_IF("innodb_trx_duplicates",
3153 thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;);
3154
3155 stage->begin_phase_log_table();
3156
3157 ut_ad(!rw_lock_own(dict_operation_lock, RW_LOCK_S));
3158 clust_index = dict_table_get_first_index(old_table);
3159
3160 rw_lock_x_lock(dict_index_get_lock(clust_index));
3161
3162 if (!clust_index->online_log) {
3163 ut_ad(dict_index_get_online_status(clust_index)
3164 == ONLINE_INDEX_COMPLETE);
3165 /* This function should not be called unless
3166 rebuilding a table online. Build in some fault
3167 tolerance. */
3168 ut_ad(0);
3169 error = DB_ERROR;
3170 } else {
3171 row_merge_dup_t dup = {
3172 clust_index, table,
3173 clust_index->online_log->col_map, 0
3174 };
3175
3176 error = row_log_table_apply_ops(thr, &dup, stage);
3177
3178 ut_ad(error != DB_SUCCESS
3179 || clust_index->online_log->head.total
3180 == clust_index->online_log->tail.total);
3181 }
3182
3183 rw_lock_x_unlock(dict_index_get_lock(clust_index));
3184 DBUG_EXECUTE_IF("innodb_trx_duplicates",
3185 thr_get_trx(thr)->duplicates = 0;);
3186
3187 return(error);
3188 }
3189
3190 /******************************************************//**
3191 Allocate the row log for an index and flag the index
3192 for online creation.
3193 @retval true if success, false if not */
3194 bool
row_log_allocate(dict_index_t * index,dict_table_t * table,bool same_pk,const dtuple_t * add_cols,const ulint * col_map,const char * path)3195 row_log_allocate(
3196 /*=============*/
3197 dict_index_t* index, /*!< in/out: index */
3198 dict_table_t* table, /*!< in/out: new table being rebuilt,
3199 or NULL when creating a secondary index */
3200 bool same_pk,/*!< in: whether the definition of the
3201 PRIMARY KEY has remained the same */
3202 const dtuple_t* add_cols,
3203 /*!< in: default values of
3204 added columns, or NULL */
3205 const ulint* col_map,/*!< in: mapping of old column
3206 numbers to new ones, or NULL if !table */
3207 const char* path) /*!< in: where to create temporary file */
3208 {
3209 row_log_t* log;
3210 DBUG_ENTER("row_log_allocate");
3211
3212 ut_ad(!dict_index_is_online_ddl(index));
3213 ut_ad(dict_index_is_clust(index) == !!table);
3214 ut_ad(!table || index->table != table);
3215 ut_ad(same_pk || table);
3216 ut_ad(!table || col_map);
3217 ut_ad(!add_cols || col_map);
3218 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3219
3220 log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log));
3221
3222 if (log == NULL) {
3223 DBUG_RETURN(false);
3224 }
3225
3226 log->fd = -1;
3227 mutex_create(LATCH_ID_INDEX_ONLINE_LOG, &log->mutex);
3228
3229 log->blobs = NULL;
3230 log->table = table;
3231 log->same_pk = same_pk;
3232 log->add_cols = add_cols;
3233 log->col_map = col_map;
3234 log->error = DB_SUCCESS;
3235 log->max_trx = 0;
3236 log->tail.blocks = log->tail.bytes = 0;
3237 log->tail.total = 0;
3238 log->tail.block = log->head.block = NULL;
3239 log->head.blocks = log->head.bytes = 0;
3240 log->head.total = 0;
3241 log->path = path;
3242 log->n_old_col = index->table->n_cols;
3243 log->n_old_vcol = index->table->n_v_cols;
3244
3245 dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
3246 index->online_log = log;
3247
3248 /* While we might be holding an exclusive data dictionary lock
3249 here, in row_log_abort_sec() we will not always be holding it. Use
3250 atomic operations in both cases. */
3251 MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
3252
3253 DBUG_RETURN(true);
3254 }
3255
/******************************************************//**
Free the row log for an index that was being created online.
Releases every resource owned by the log: the BLOB tracking map,
both log buffers, the temporary file (if one was created) and the
mutex, then frees the log object itself and clears the caller's
pointer. */
void
row_log_free(
/*=========*/
	row_log_t*&	log)	/*!< in,own: row log */
{
	MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);

	UT_DELETE(log->blobs);
	row_log_block_free(log->tail);
	row_log_block_free(log->head);
	/* log->fd may still be -1 if the log never spilled to disk;
	presumably row_merge_file_destroy_low() tolerates that —
	matches the -1 set in row_log_allocate(). */
	row_merge_file_destroy_low(log->fd);
	mutex_free(&log->mutex);
	ut_free(log);
	/* Reset the caller's reference so a stale pointer cannot be
	used after the log has been freed. */
	log = NULL;
}
3273
3274 /******************************************************//**
3275 Get the latest transaction ID that has invoked row_log_online_op()
3276 during online creation.
3277 @return latest transaction ID, or 0 if nothing was logged */
3278 trx_id_t
row_log_get_max_trx(dict_index_t * index)3279 row_log_get_max_trx(
3280 /*================*/
3281 dict_index_t* index) /*!< in: index, must be locked */
3282 {
3283 ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
3284
3285 ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
3286 && mutex_own(&index->online_log->mutex))
3287 || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3288
3289 return(index->online_log->max_trx);
3290 }
3291
/******************************************************//**
Applies an operation to a secondary index that was being created.
Searches for the logged entry and performs the insert or delete,
escalating from the optimistic (leaf-only) to the pessimistic
(tree-modifying) variant when needed.  Note that on the no-op exits
(goto func_exit without touching *error) the caller's preset value
of *error is left unchanged; the caller initializes it to
DB_SUCCESS before invoking this function. */
static MY_ATTRIBUTE((nonnull))
void
row_log_apply_op_low(
/*=================*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	enum row_op	op,		/*!< in: operation being applied */
	trx_id_t	trx_id,		/*!< in: transaction identifier */
	const dtuple_t*	entry)		/*!< in: row */
{
	mtr_t		mtr;
	btr_cur_t	cursor;
	ulint*		offsets = NULL;

	/* Online index creation log applies to secondary indexes only. */
	ut_ad(!dict_index_is_clust(index));

	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
	      == has_index_lock);

	ut_ad(!dict_index_is_corrupted(index));
	/* Only ROW_OP_DELETE records are logged without a trx_id
	(see row_log_apply_op(), which sets trx_id = 0 for them). */
	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);

	DBUG_PRINT("ib_create_index",
		   ("%s %s index " IB_ID_FMT "," TRX_ID_FMT ": %s",
		    op == ROW_OP_INSERT ? "insert" : "delete",
		    has_index_lock ? "locked" : "unlocked",
		    index->id, trx_id,
		    rec_printer(entry).str().c_str()));

	mtr_start(&mtr);
	mtr.set_named_space(index->space);

	/* We perform the pessimistic variant of the operations if we
	already hold index->lock exclusively. First, search the
	record. The operation may already have been performed,
	depending on when the row in the clustered index was
	scanned. */
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    has_index_lock
				    ? BTR_MODIFY_TREE
				    : BTR_MODIFY_LEAF,
				    &cursor, 0, __FILE__, __LINE__,
				    &mtr);

	ut_ad(dict_index_get_n_unique(index) > 0);
	/* This test is somewhat similar to row_ins_must_modify_rec(),
	but not identical for unique secondary indexes. */
	if (cursor.low_match >= dict_index_get_n_unique(index)
	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
		/* We have a matching record.  It is an exact match
		only if every field (including the PRIMARY KEY
		suffix), not just the unique prefix, matched. */
		bool	exists	= (cursor.low_match
				   == dict_index_get_n_fields(index));
#ifdef UNIV_DEBUG
		rec_t*	rec	= btr_cur_get_rec(&cursor);
		ut_ad(page_rec_is_user_rec(rec));
		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
#endif /* UNIV_DEBUG */

		ut_ad(exists || dict_index_is_unique(index));

		switch (op) {
		case ROW_OP_DELETE:
			if (!exists) {
				/* The existing record matches the
				unique secondary index key, but the
				PRIMARY KEY columns differ. So, this
				exact record does not exist. For
				example, we could detect a duplicate
				key error in some old index before
				logging an ROW_OP_INSERT for our
				index. This ROW_OP_DELETE could have
				been logged for rolling back
				TRX_UNDO_INSERT_REC. */
				goto func_exit;
			}

			/* Try the cheap leaf-page delete first. */
			if (btr_cur_optimistic_delete(
				    &cursor, BTR_CREATE_FLAG, &mtr)) {
				*error = DB_SUCCESS;
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				mtr.set_named_space(index->space);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);

				/* No other thread than the current one
				is allowed to modify the index tree.
				Thus, the record should still exist. */
				ut_ad(cursor.low_match
				      >= dict_index_get_n_fields(index));
				ut_ad(page_rec_is_user_rec(
					      btr_cur_get_rec(&cursor)));
			}

			/* As there are no externally stored fields in
			a secondary index record, the parameter
			rollback=false will be ignored. */

			btr_cur_pessimistic_delete(
				error, FALSE, &cursor,
				BTR_CREATE_FLAG, false, &mtr);
			break;
		case ROW_OP_INSERT:
			if (exists) {
				/* The record already exists. There
				is nothing to be inserted.
				This could happen when processing
				TRX_UNDO_DEL_MARK_REC in statement
				rollback:

				UPDATE of PRIMARY KEY can lead to
				statement rollback if the updated
				value of the PRIMARY KEY already
				exists. In this case, the UPDATE would
				be mapped to DELETE;INSERT, and we
				only wrote undo log for the DELETE
				part. The duplicate key error would be
				triggered before logging the INSERT
				part.

				Theoretically, we could also get a
				similar situation when a DELETE operation
				is blocked by a FOREIGN KEY constraint. */
				goto func_exit;
			}

			if (dtuple_contains_null(entry)) {
				/* The UNIQUE KEY columns match, but
				there is a NULL value in the key, and
				NULL!=NULL. */
				goto insert_the_rec;
			}

			goto duplicate;
		}
	} else {
		switch (op) {
			rec_t*		rec;
			big_rec_t*	big_rec;
		case ROW_OP_DELETE:
			/* The record does not exist. For example, we
			could detect a duplicate key error in some old
			index before logging an ROW_OP_INSERT for our
			index. This ROW_OP_DELETE could be logged for
			rolling back TRX_UNDO_INSERT_REC. */
			goto func_exit;
		case ROW_OP_INSERT:
			if (dict_index_is_unique(index)
			    && (cursor.up_match
				>= dict_index_get_n_unique(index)
			        || cursor.low_match
				>= dict_index_get_n_unique(index))
			    && (!index->n_nullable
				|| !dtuple_contains_null(entry))) {
duplicate:
				/* Duplicate key */
				ut_ad(dict_index_is_unique(index));
				row_merge_dup_report(dup, entry->fields);
				*error = DB_DUPLICATE_KEY;
				goto func_exit;
			}
insert_the_rec:
			/* Insert the record. As we are inserting into
			a secondary index, there cannot be externally
			stored columns (!big_rec). */
			*error = btr_cur_optimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec, 0, NULL, &mtr);
			ut_ad(!big_rec);
			if (*error != DB_FAIL) {
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				mtr.set_named_space(index->space);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);
			}

			/* We already determined that the
			record did not exist. No other thread
			than the current one is allowed to
			modify the index tree. Thus, the
			record should still not exist. */

			*error = btr_cur_pessimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec,
				0, NULL, &mtr);
			ut_ad(!big_rec);
			break;
		}
		mem_heap_empty(offsets_heap);
	}

	if (*error == DB_SUCCESS && trx_id) {
		/* Record the highest transaction ID on the page, so
		that transaction visibility checks remain correct. */
		page_update_max_trx_id(btr_cur_get_block(&cursor),
				       btr_cur_get_page_zip(&cursor),
				       trx_id, &mtr);
	}

func_exit:
	mtr_commit(&mtr);
}
3526
/******************************************************//**
Applies an operation to a secondary index that was being created.
Parses one merge record (operation byte, optional transaction ID,
variable-length extra_size, and the record body) from the log
buffer, converts it into an index entry and applies it via
row_log_apply_op_low().
@return NULL on failure (mrec corruption) or when out of data;
pointer to next record on success */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const mrec_t*
row_log_apply_op(
/*=============*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap for
					allocating data tuples */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	const mrec_t*	mrec,		/*!< in: merge record */
	const mrec_t*	mrec_end,	/*!< in: end of buffer */
	ulint*		offsets)	/*!< in/out: work area for
					rec_init_offsets_temp() */

{
	enum row_op	op;
	ulint		extra_size;
	ulint		data_size;
	ulint		n_ext;
	dtuple_t*	entry;
	trx_id_t	trx_id;

	/* Online index creation is only used for secondary indexes. */
	ut_ad(!dict_index_is_clust(index));

	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
	      == has_index_lock);

	if (dict_index_is_corrupted(index)) {
		*error = DB_INDEX_CORRUPT;
		return(NULL);
	}

	*error = DB_SUCCESS;

	/* A NULL return with *error == DB_SUCCESS means that the
	buffer ends in a truncated record; the caller will reassemble
	it across the block boundary. */
	if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
		return(NULL);
	}

	switch (*mrec) {
	case ROW_OP_INSERT:
		/* An insert additionally carries the transaction ID
		right after the header. */
		if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
			return(NULL);
		}

		op = static_cast<enum row_op>(*mrec++);
		trx_id = trx_read_trx_id(mrec);
		mrec += DATA_TRX_ID_LEN;
		break;
	case ROW_OP_DELETE:
		/* Deletes are logged without a transaction ID. */
		op = static_cast<enum row_op>(*mrec++);
		trx_id = 0;
		break;
	default:
corrupted:
		ut_ad(0);
		*error = DB_CORRUPTION;
		return(NULL);
	}

	/* extra_size uses a 1- or 2-byte encoding: values >= 0x80
	continue into a second byte. */
	extra_size = *mrec++;

	ut_ad(mrec < mrec_end);

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *mrec++;
	}

	mrec += extra_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	rec_init_offsets_temp(mrec, index, offsets);

	if (rec_offs_any_extern(offsets)) {
		/* There should never be any externally stored fields
		in a secondary index, which is what online index
		creation is used for. Therefore, the log file must be
		corrupted. */
		goto corrupted;
	}

	data_size = rec_offs_data_size(offsets);

	mrec += data_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	/* Convert the logged record into a tuple for the B-tree
	operations. */
	entry = row_rec_to_index_entry_low(
		mrec - data_size, index, offsets, &n_ext, heap);
	/* Online index creation is only implemented for secondary
	indexes, which never contain off-page columns. */
	ut_ad(n_ext == 0);

	row_log_apply_op_low(index, dup, error, offsets_heap,
			     has_index_lock, op, trx_id, entry);
	return(mrec);
}
3641
/** Applies operations to a secondary index that was being created.
Consumes the row log block by block: full blocks are read back from
the temporary file and applied without holding index->lock (so DML
can keep logging concurrently), while the final, partially filled
tail block is applied under an exclusive index->lock to catch up and
synchronize.  Records that straddle a block boundary are reassembled
in index->online_log->head.buf.
@param[in]	trx	transaction (for checking if the operation was
interrupted)
@param[in,out]	index	index
@param[in,out]	dup	for reporting duplicate key errors
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL, then stage->inc() will be called for each block
of log that is applied.
@return DB_SUCCESS, or error code on failure */
static
dberr_t
row_log_apply_ops(
	const trx_t*		trx,
	dict_index_t*		index,
	row_merge_dup_t*	dup,
	ut_stage_alter_t*	stage)
{
	dberr_t		error;
	const mrec_t*	mrec	= NULL;
	const mrec_t*	next_mrec;
	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
	const mrec_t*	next_mrec_end;
	mem_heap_t*	offsets_heap;
	mem_heap_t*	heap;
	ulint*		offsets;
	bool		has_index_lock;
	/* Size of the offsets work area: header plus one slot per
	index field plus one. */
	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
		+ dict_index_get_n_fields(index);

	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(!index->is_committed());
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(index->online_log);
	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);

	offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
	offsets[0] = i;
	offsets[1] = dict_index_get_n_fields(index);

	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
	heap = mem_heap_create(UNIV_PAGE_SIZE);
	has_index_lock = true;

next_block:
	/* Each iteration of this label processes one log block.  It
	is always entered with the index X-latched. */
	ut_ad(has_index_lock);
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(index->online_log->head.bytes == 0);

	stage->inc(row_log_progress_inc_per_block());

	if (trx_is_interrupted(trx)) {
		goto interrupted;
	}

	error = index->online_log->error;
	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (dict_index_is_corrupted(index)) {
		error = DB_INDEX_CORRUPT;
		goto func_exit;
	}

	/* The reader (head) can never be ahead of the writer (tail). */
	if (UNIV_UNLIKELY(index->online_log->head.blocks
			  > index->online_log->tail.blocks)) {
unexpected_eof:
		ib::error() << "Unexpected end of temporary file for index "
			<< index->name;
corruption:
		error = DB_CORRUPTION;
		goto func_exit;
	}

	if (index->online_log->head.blocks
	    == index->online_log->tail.blocks) {
		/* We have caught up with the writer; only the
		in-memory tail block remains to be applied. */
		if (index->online_log->head.blocks) {
#ifdef HAVE_FTRUNCATE
			/* Truncate the file in order to save space. */
			if (index->online_log->fd > 0
			    && ftruncate(index->online_log->fd, 0) == -1) {
				perror("ftruncate");
			}
#endif /* HAVE_FTRUNCATE */
			index->online_log->head.blocks
				= index->online_log->tail.blocks = 0;
		}

		next_mrec = index->online_log->tail.block;
		next_mrec_end = next_mrec + index->online_log->tail.bytes;

		if (next_mrec_end == next_mrec) {
			/* End of log reached. */
all_done:
			ut_ad(has_index_lock);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			error = DB_SUCCESS;
			goto func_exit;
		}
	} else {
		os_offset_t	ofs;

		ofs = (os_offset_t) index->online_log->head.blocks
			* srv_sort_buf_size;

		/* Release the latch while applying a full block that
		has already been written out, so that concurrent DML
		can keep appending to the log. */
		ut_ad(has_index_lock);
		has_index_lock = false;
		rw_lock_x_unlock(dict_index_get_lock(index));

		log_free_check();

		if (!row_log_block_allocate(index->online_log->head)) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		IORequest	request(IORequest::READ | IORequest::ROW_LOG);
		dberr_t	err = os_file_read_no_error_handling_int_fd(
			request,
			index->online_log->fd,
			index->online_log->head.block, ofs,
			srv_sort_buf_size,
			NULL);

		if (err != DB_SUCCESS) {
			ib::error()
				<< "Unable to read temporary file"
				" for index " << index->name;
			goto corruption;
		}

#ifdef POSIX_FADV_DONTNEED
		/* Each block is read exactly once. Free up the file cache. */
		posix_fadvise(index->online_log->fd,
			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

		next_mrec = index->online_log->head.block;
		next_mrec_end = next_mrec + srv_sort_buf_size;
	}

	if (mrec) {
		/* A partial record was read from the previous block.
		Copy the temporary buffer full, as we do not know the
		length of the record. Parse subsequent records from
		the bigger buffer index->online_log->head.block
		or index->online_log->tail.block. */

		ut_ad(mrec == index->online_log->head.buf);
		ut_ad(mrec_end > mrec);
		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);

		memcpy((mrec_t*) mrec_end, next_mrec,
		       (&index->online_log->head.buf)[1] - mrec_end);
		mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, index->online_log->head.buf,
			(&index->online_log->head.buf)[1], offsets);
		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (UNIV_UNLIKELY(mrec == NULL)) {
			/* The record was not reassembled properly. */
			goto corruption;
		}
		/* The record was previously found out to be
		truncated. Now that the parse buffer was extended,
		it should proceed beyond the old end of the buffer. */
		ut_a(mrec > mrec_end);

		index->online_log->head.bytes = mrec - mrec_end;
		next_mrec += index->online_log->head.bytes;
	}

	ut_ad(next_mrec <= next_mrec_end);
	/* The following loop must not be parsing the temporary
	buffer, but head.block or tail.block. */

	/* mrec!=NULL means that the next record starts from the
	middle of the block */
	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));

#ifdef UNIV_DEBUG
	if (next_mrec_end == index->online_log->head.block
	    + srv_sort_buf_size) {
		/* If tail.bytes == 0, next_mrec_end can also be at
		the end of tail.block. */
		if (index->online_log->tail.bytes == 0) {
			ut_ad(next_mrec == next_mrec_end);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->head.bytes == 0);
		} else {
			ut_ad(next_mrec == index->online_log->head.block
			      + index->online_log->head.bytes);
			ut_ad(index->online_log->tail.blocks
			      > index->online_log->head.blocks);
		}
	} else if (next_mrec_end == index->online_log->tail.block
		   + index->online_log->tail.bytes) {
		ut_ad(next_mrec == index->online_log->tail.block
		      + index->online_log->head.bytes);
		ut_ad(index->online_log->tail.blocks == 0);
		ut_ad(index->online_log->head.blocks == 0);
		ut_ad(index->online_log->head.bytes
		      <= index->online_log->tail.bytes);
	} else {
		ut_error;
	}
#endif /* UNIV_DEBUG */

	mrec_end = next_mrec_end;

	/* Apply the records of the current block one by one. */
	while (!trx_is_interrupted(trx)) {
		mrec = next_mrec;
		ut_ad(mrec < mrec_end);

		if (!has_index_lock) {
			/* We are applying operations from a different
			block than the one that is being written to.
			We do not hold index->lock in order to
			allow other threads to concurrently buffer
			modifications. */
			ut_ad(mrec >= index->online_log->head.block);
			ut_ad(mrec_end == index->online_log->head.block
			      + srv_sort_buf_size);
			ut_ad(index->online_log->head.bytes
			      < srv_sort_buf_size);

			/* Take the opportunity to do a redo log
			checkpoint if needed. */
			log_free_check();
		} else {
			/* We are applying operations from the last block.
			Do not allow other threads to buffer anything,
			so that we can finally catch up and synchronize. */
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(mrec >= index->online_log->tail.block);
		}

		next_mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, mrec, mrec_end, offsets);

		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (next_mrec == next_mrec_end) {
			/* The record happened to end on a block boundary.
			Do we have more blocks left? */
			if (has_index_lock) {
				/* The index will be locked while
				applying the last block. */
				goto all_done;
			}

			mrec = NULL;
process_next_block:
			rw_lock_x_lock(dict_index_get_lock(index));
			has_index_lock = true;

			index->online_log->head.bytes = 0;
			index->online_log->head.blocks++;
			goto next_block;
		} else if (next_mrec != NULL) {
			ut_ad(next_mrec < next_mrec_end);
			index->online_log->head.bytes += next_mrec - mrec;
		} else if (has_index_lock) {
			/* When mrec is within tail.block, it should
			be a complete record, because we are holding
			index->lock and thus excluding the writer. */
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(0);
			goto unexpected_eof;
		} else {
			/* The last record of this block is truncated.
			Save its prefix in head.buf; it will be
			completed from the start of the next block. */
			memcpy(index->online_log->head.buf, mrec,
			       mrec_end - mrec);
			mrec_end += index->online_log->head.buf - mrec;
			mrec = index->online_log->head.buf;
			goto process_next_block;
		}
	}

interrupted:
	error = DB_INTERRUPTED;
func_exit:
	/* Reacquire the index latch if it was released for applying
	a full block; the caller expects it to be held on return. */
	if (!has_index_lock) {
		rw_lock_x_lock(dict_index_get_lock(index));
	}

	switch (error) {
	case DB_SUCCESS:
		break;
	case DB_INDEX_CORRUPT:
		if (((os_offset_t) index->online_log->tail.blocks + 1)
		    * srv_sort_buf_size >= srv_online_max_size) {
			/* The log file grew too big. */
			error = DB_ONLINE_LOG_TOO_BIG;
		}
		/* fall through */
	default:
		/* We set the flag directly instead of invoking
		dict_set_corrupted_index_cache_only(index) here,
		because the index is not "public" yet. */
		index->type |= DICT_CORRUPT;
	}

	mem_heap_free(heap);
	mem_heap_free(offsets_heap);
	row_log_block_free(index->online_log->head);
	ut_free(offsets);
	return(error);
}
3959
3960 /** Apply the row log to the index upon completing index creation.
3961 @param[in] trx transaction (for checking if the operation was
3962 interrupted)
3963 @param[in,out] index secondary index
3964 @param[in,out] table MySQL table (for reporting duplicates)
3965 @param[in,out] stage performance schema accounting object, used by
3966 ALTER TABLE. stage->begin_phase_log_index() will be called initially and then
3967 stage->inc() will be called for each block of log that is applied.
3968 @return DB_SUCCESS, or error code on failure */
3969 dberr_t
row_log_apply(const trx_t * trx,dict_index_t * index,struct TABLE * table,ut_stage_alter_t * stage)3970 row_log_apply(
3971 const trx_t* trx,
3972 dict_index_t* index,
3973 struct TABLE* table,
3974 ut_stage_alter_t* stage)
3975 {
3976 dberr_t error;
3977 row_log_t* log;
3978 row_merge_dup_t dup = { index, table, NULL, 0 };
3979 DBUG_ENTER("row_log_apply");
3980
3981 ut_ad(dict_index_is_online_ddl(index));
3982 ut_ad(!dict_index_is_clust(index));
3983
3984 stage->begin_phase_log_index();
3985
3986 log_free_check();
3987
3988 rw_lock_x_lock(dict_index_get_lock(index));
3989
3990 if (!dict_table_is_corrupted(index->table)) {
3991 error = row_log_apply_ops(trx, index, &dup, stage);
3992 } else {
3993 error = DB_SUCCESS;
3994 }
3995
3996 if (error != DB_SUCCESS) {
3997 ut_a(!dict_table_is_discarded(index->table));
3998 /* We set the flag directly instead of invoking
3999 dict_set_corrupted_index_cache_only(index) here,
4000 because the index is not "public" yet. */
4001 index->type |= DICT_CORRUPT;
4002 index->table->drop_aborted = TRUE;
4003
4004 dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
4005 } else {
4006 ut_ad(dup.n_dup == 0);
4007 dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
4008 }
4009
4010 log = index->online_log;
4011 index->online_log = NULL;
4012 rw_lock_x_unlock(dict_index_get_lock(index));
4013
4014 row_log_free(log);
4015
4016 DBUG_RETURN(error);
4017 }
4018