1 /*****************************************************************************
2
3 Copyright (c) 2011, 2018, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0log.cc
29 Modification log for online index creation and online table rebuild
30
31 Created 2011-05-26 Marko Makela
32 *******************************************************/
33
34 #include "row0log.h"
35
36 #ifdef UNIV_NONINL
37 #include "row0log.ic"
38 #endif
39
40 #include "row0row.h"
41 #include "row0ins.h"
42 #include "row0upd.h"
43 #include "row0merge.h"
44 #include "row0ext.h"
45 #include "data0data.h"
46 #include "que0que.h"
47 #include "handler0alter.h"
48
49 #include<map>
50
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table.
The opcodes start at 0x41 ('A'); a nonzero base is used presumably so
that 0 can serve as the end-of-chunk marker (compare the comment about
reserving 0 in row_log_online_op()). */
enum row_tab_op {
	/** Insert a record */
	ROW_T_INSERT = 0x41,
	/** Update a record in place */
	ROW_T_UPDATE,
	/** Delete (purge) a record */
	ROW_T_DELETE
};
61
/** Index record modification operations during online index creation.
The opcodes start at 0x61 ('a'), a nonzero range distinct from the
row_tab_op opcodes above. */
enum row_op {
	/** Insert a record */
	ROW_OP_INSERT = 0x61,
	/** Delete a record */
	ROW_OP_DELETE
};
69
#ifdef UNIV_DEBUG
/** Write information about the applied record to the error log */
# define ROW_LOG_APPLY_PRINT
#endif /* UNIV_DEBUG */

#ifdef ROW_LOG_APPLY_PRINT
/** When set (false by default; it can be toggled in a debugger),
write information about the applied record to the error log */
static bool row_log_apply_print;
#endif /* ROW_LOG_APPLY_PRINT */

/** Size of the modification log entry header, in bytes:
one byte for the operation code and one byte of extra_size.
A second extra_size byte (for extra_size >= 0x80) is accounted
separately where records are sized. */
#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
82
/** Log block for modifications during online ALTER TABLE */
struct row_log_buf_t {
	byte*		block;	/*!< file block buffer; NULL until
				row_log_block_allocate() is called */
	mrec_buf_t	buf;	/*!< buffer for assembling a record
				that spans two blocks */
	ulint		blocks; /*!< current position in blocks */
	ulint		bytes;	/*!< current position within block */
	ulonglong	total;	/*!< logical position, in bytes from
				the start of the row_log_table log;
				0 for row_log_online_op() and
				row_log_apply(). */
	ulint		size;	/*!< allocated size of block;
				os_mem_alloc_large() receives its
				address and may adjust the requested
				srv_sort_buf_size */
};
96
/** Tracks BLOB allocation during online ALTER TABLE */
class row_log_table_blob_t {
public:
	/** Constructor (declaring a BLOB freed)
	@param offset_arg row_log_t::tail::total */
#ifdef UNIV_DEBUG
	row_log_table_blob_t(ulonglong offset_arg) :
		old_offset (0), free_offset (offset_arg),
		offset (BLOB_FREED) {}
#else /* UNIV_DEBUG */
	row_log_table_blob_t() :
		offset (BLOB_FREED) {}
#endif /* UNIV_DEBUG */

	/** Declare a BLOB freed again.
	@param offset_arg row_log_t::tail::total */
#ifdef UNIV_DEBUG
	void blob_free(ulonglong offset_arg)
#else /* UNIV_DEBUG */
	void blob_free()
#endif /* UNIV_DEBUG */
	{
		ut_ad(offset < offset_arg);
		ut_ad(offset != BLOB_FREED);
		/* Remember the history in debug builds only. */
		ut_d(old_offset = offset);
		ut_d(free_offset = offset_arg);
		offset = BLOB_FREED;
	}
	/** Declare a freed BLOB reused.
	@param offset_arg row_log_t::tail::total */
	void blob_alloc(ulonglong offset_arg) {
		ut_ad(free_offset <= offset_arg);
		ut_d(old_offset = offset);
		offset = offset_arg;
	}
	/** Determine if a BLOB was freed at a given log position
	@param offset_arg row_log_t::head::total after the log record
	@return true if freed */
	bool is_freed(ulonglong offset_arg) const {
		/* This is supposed to be the offset at the end of the
		current log record. */
		ut_ad(offset_arg > 0);
		/* We should never get anywhere close the magic value. */
		ut_ad(offset_arg < BLOB_FREED);
		/* While offset == BLOB_FREED (~0ULL), this is always
		true; after blob_alloc(), it is true only for log
		records written before the reallocation. */
		return(offset_arg < offset);
	}
private:
	/** Magic value for a freed BLOB */
	static const ulonglong BLOB_FREED = ~0ULL;
#ifdef UNIV_DEBUG
	/** Old offset, in case a page was freed, reused, freed, ... */
	ulonglong	old_offset;
	/** Offset of last blob_free() */
	ulonglong	free_offset;
#endif /* UNIV_DEBUG */
	/** Byte offset to the log file */
	ulonglong	offset;
};
155
/** @brief Map of off-page column page numbers to their BLOB status.

If there is no mapping for a page number, the page is safe to access.
Otherwise, the mapped row_log_table_blob_t records whether the BLOB
page has been freed and, if it was later reused, the byte offset into
index->online_log from which the page is again safe to access when
applying log records (see row_log_table_blob_t::is_freed()). */
typedef std::map<ulint, row_log_table_blob_t> page_no_map;
164
/** @brief Buffer for logging modifications during online index creation

All modifications to an index that is being created will be logged by
row_log_online_op() to this buffer.

All modifications to a table that is being rebuilt will be logged by
row_log_table_delete(), row_log_table_update(), row_log_table_insert()
to this buffer.

When head.blocks == tail.blocks, the reader will access tail.block
directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
	int		fd;	/*!< file descriptor of the temporary
				log file, or a negative value while it
				has not been created yet (see
				row_log_tmpfile()) */
	ib_mutex_t	mutex;	/*!< mutex protecting error,
				max_trx and tail */
	page_no_map*	blobs;	/*!< map of page numbers of off-page columns
				that have been freed during table-rebuilding
				ALTER TABLE (row_log_table_*); protected by
				index->lock X-latch only */
	dict_table_t*	table;	/*!< table that is being rebuilt,
				or NULL when this is a secondary
				index that is being created online */
	bool		same_pk;/*!< whether the definition of the PRIMARY KEY
				has remained the same */
	const dtuple_t*	add_cols;
				/*!< default values of added columns, or NULL */
	const ulint*	col_map;/*!< mapping of old column numbers to
				new ones, or NULL if !table */
	dberr_t		error;	/*!< error that occurred during online
				table rebuild */
	trx_id_t	max_trx;/*!< biggest observed trx_id in
				row_log_online_op();
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	tail;	/*!< writer context;
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	head;	/*!< reader context; protected by MDL only;
				modifiable by row_log_apply_ops() */
	const char*	path;	/*!< where to create temporary file during
				log operation */
};
208
209 /** Create the file or online log if it does not exist.
210 @param[in,out] log online rebuild log
211 @return file descriptor. */
212 static MY_ATTRIBUTE((warn_unused_result))
213 int
row_log_tmpfile(row_log_t * log)214 row_log_tmpfile(
215 row_log_t* log)
216 {
217 DBUG_ENTER("row_log_tmpfile");
218 if (log->fd < 0) {
219 log->fd = row_merge_file_create_low(log->path);
220 }
221
222 DBUG_RETURN(log->fd);
223 }
224
225 /** Allocate the memory for the log buffer.
226 @param[in,out] log_buf Buffer used for log operation
227 @return TRUE if success, false if not */
228 static MY_ATTRIBUTE((warn_unused_result))
229 bool
row_log_block_allocate(row_log_buf_t & log_buf)230 row_log_block_allocate(
231 row_log_buf_t& log_buf)
232 {
233 DBUG_ENTER("row_log_block_allocate");
234 if (log_buf.block == NULL) {
235 log_buf.size = srv_sort_buf_size;
236 log_buf.block = (byte*) os_mem_alloc_large(&log_buf.size, false);
237 DBUG_EXECUTE_IF("simulate_row_log_allocation_failure",
238 if (log_buf.block)
239 os_mem_free_large(log_buf.block, log_buf.size);
240 log_buf.block = NULL;);
241 if (!log_buf.block) {
242 DBUG_RETURN(false);
243 }
244 }
245 DBUG_RETURN(true);
246 }
247
248 /** Free the log buffer.
249 @param[in,out] log_buf Buffer used for log operation */
250 static
251 void
row_log_block_free(row_log_buf_t & log_buf)252 row_log_block_free(
253 row_log_buf_t& log_buf)
254 {
255 DBUG_ENTER("row_log_block_free");
256 if (log_buf.block != NULL) {
257 os_mem_free_large(log_buf.block, log_buf.size);
258 log_buf.block = NULL;
259 }
260 DBUG_VOID_RETURN;
261 }
262
/******************************************************//**
Logs an operation to a secondary index that is (or was) being created.
The record is appended to index->online_log->tail; when a full block
(srv_sort_buf_size bytes) has accumulated, it is flushed to the
temporary log file. */
UNIV_INTERN
void
row_log_online_op(
/*==============*/
	dict_index_t*	index,	/*!< in/out: index, S or X latched */
	const dtuple_t*	tuple,	/*!< in: index tuple */
	trx_id_t	trx_id)	/*!< in: transaction ID for insert,
				or 0 for delete */
{
	byte*		b;
	ulint		extra_size;
	ulint		size;
	ulint		mrec_size;
	ulint		avail_size;
	row_log_t*	log;

	ut_ad(dtuple_validate(tuple));
	ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
	      || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */

	if (dict_index_is_corrupted(index)) {
		return;
	}

	ut_ad(dict_index_is_online_ddl(index));

	/* Compute the size of the record. This differs from
	row_merge_buf_encode(), because here we do not encode
	extra_size+1 (and reserve 0 as the end-of-chunk marker). */

	size = rec_get_converted_size_temp(
		index, tuple->fields, tuple->n_fields, &extra_size);
	ut_ad(size >= extra_size);
	ut_ad(size <= sizeof log->tail.buf);

	/* Record layout: op byte, DB_TRX_ID (inserts only),
	1 or 2 bytes of extra_size, then the converted record. */
	mrec_size = ROW_LOG_HEADER_SIZE
		+ (extra_size >= 0x80) + size
		+ (trx_id ? DATA_TRX_ID_LEN : 0);

	log = index->online_log;
	mutex_enter(&log->mutex);

	if (trx_id > log->max_trx) {
		log->max_trx = trx_id;
	}

	if (!row_log_block_allocate(log->tail)) {
		log->error = DB_OUT_OF_MEMORY;
		goto err_exit;
	}

	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);

	ut_ad(log->tail.bytes < srv_sort_buf_size);
	avail_size = srv_sort_buf_size - log->tail.bytes;

	if (mrec_size > avail_size) {
		/* The record would span a block boundary: assemble it
		in the spill-over buffer and split it below. */
		b = log->tail.buf;
	} else {
		b = log->tail.block + log->tail.bytes;
	}

	if (trx_id != 0) {
		*b++ = ROW_OP_INSERT;
		trx_write_trx_id(b, trx_id);
		b += DATA_TRX_ID_LEN;
	} else {
		*b++ = ROW_OP_DELETE;
	}

	if (extra_size < 0x80) {
		*b++ = (byte) extra_size;
	} else {
		/* Two-byte encoding: high bit set in the first byte. */
		ut_ad(extra_size < 0x8000);
		*b++ = (byte) (0x80 | (extra_size >> 8));
		*b++ = (byte) extra_size;
	}

	rec_convert_dtuple_to_temp(
		b + extra_size, index, tuple->fields, tuple->n_fields);
	b += size;

	if (mrec_size >= avail_size) {
		/* The block is now full (or exactly filled):
		flush it to the temporary log file. */
		const os_offset_t	byte_offset
			= (os_offset_t) log->tail.blocks
			* srv_sort_buf_size;
		ibool			ret;

		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
			goto write_failed;
		}

		if (mrec_size == avail_size) {
			/* The record exactly filled the block;
			nothing was staged in the spill buffer. */
			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
		} else {
			/* Copy the leading part of the staged record
			into the remaining space of the block. */
			ut_ad(b == log->tail.buf + mrec_size);
			memcpy(log->tail.block + log->tail.bytes,
			       log->tail.buf, avail_size);
		}
		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);

		if (row_log_tmpfile(log) < 0) {
			log->error = DB_OUT_OF_MEMORY;
			goto err_exit;
		}

		ret = os_file_write_int_fd(
			"(modification log)",
			log->fd,
			log->tail.block, byte_offset, srv_sort_buf_size);
		log->tail.blocks++;
		if (!ret) {
write_failed:
			/* We set the flag directly instead of invoking
			dict_set_corrupted_index_cache_only(index) here,
			because the index is not "public" yet. */
			index->type |= DICT_CORRUPT;
		}
		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
		/* Carry the trailing part of the record over to the
		start of the (now logically empty) block. */
		memcpy(log->tail.block, log->tail.buf + avail_size,
		       mrec_size - avail_size);
		log->tail.bytes = mrec_size - avail_size;
	} else {
		log->tail.bytes += mrec_size;
		ut_ad(b == log->tail.block + log->tail.bytes);
	}

	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
	mutex_exit(&log->mutex);
}
399
400 /******************************************************//**
401 Gets the error status of the online index rebuild log.
402 @return DB_SUCCESS or error code */
403 UNIV_INTERN
404 dberr_t
row_log_table_get_error(const dict_index_t * index)405 row_log_table_get_error(
406 /*====================*/
407 const dict_index_t* index) /*!< in: clustered index of a table
408 that is being rebuilt online */
409 {
410 ut_ad(dict_index_is_clust(index));
411 ut_ad(dict_index_is_online_ddl(index));
412 return(index->online_log->error);
413 }
414
/******************************************************//**
Starts logging an operation to a table that is being rebuilt.
On success, log->mutex remains held; the caller must write the record
at the returned position and release the mutex via
row_log_table_close().  On a NULL return, the mutex has already been
released.
@return pointer to log, or NULL if no logging is necessary */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
byte*
row_log_table_open(
/*===============*/
	row_log_t*	log,	/*!< in/out: online rebuild log */
	ulint		size,	/*!< in: size of log record */
	ulint*		avail)	/*!< out: available size for log record */
{
	mutex_enter(&log->mutex);

	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);

	if (log->error != DB_SUCCESS) {
err_exit:
		mutex_exit(&log->mutex);
		return(NULL);
	}

	if (!row_log_block_allocate(log->tail)) {
		log->error = DB_OUT_OF_MEMORY;
		goto err_exit;
	}

	ut_ad(log->tail.bytes < srv_sort_buf_size);
	*avail = srv_sort_buf_size - log->tail.bytes;

	if (size > *avail) {
		/* The record spans a block boundary: assemble it in
		the spill-over buffer; row_log_table_close() will copy
		it into the block(s). */
		return(log->tail.buf);
	} else {
		return(log->tail.block + log->tail.bytes);
	}
}
450
/******************************************************//**
Stops logging an operation to a table that is being rebuilt.
Expects log->mutex to be held (it was acquired by
row_log_table_open()) and releases it before returning. */
static MY_ATTRIBUTE((nonnull))
void
row_log_table_close_func(
/*=====================*/
	row_log_t*	log,	/*!< in/out: online rebuild log */
#ifdef UNIV_DEBUG
	const byte*	b,	/*!< in: end of log record */
#endif /* UNIV_DEBUG */
	ulint		size,	/*!< in: size of log record */
	ulint		avail)	/*!< in: available size for log record */
{
	ut_ad(mutex_own(&log->mutex));

	if (size >= avail) {
		/* The block is now full (or exactly filled):
		flush it to the temporary log file. */
		const os_offset_t	byte_offset
			= (os_offset_t) log->tail.blocks
			* srv_sort_buf_size;
		ibool			ret;

		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
			goto write_failed;
		}

		if (size == avail) {
			/* The record exactly filled the block. */
			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
		} else {
			/* The record was assembled in the spill-over
			buffer; copy its leading part into the block. */
			ut_ad(b == log->tail.buf + size);
			memcpy(log->tail.block + log->tail.bytes,
			       log->tail.buf, avail);
		}
		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);

		if (row_log_tmpfile(log) < 0) {
			log->error = DB_OUT_OF_MEMORY;
			goto err_exit;
		}

		ret = os_file_write_int_fd(
			"(modification log)",
			log->fd,
			log->tail.block, byte_offset, srv_sort_buf_size);
		log->tail.blocks++;
		if (!ret) {
write_failed:
			log->error = DB_ONLINE_LOG_TOO_BIG;
		}
		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
		/* Carry the trailing part of the record over to the
		start of the (now logically empty) block. */
		memcpy(log->tail.block, log->tail.buf + avail, size - avail);
		log->tail.bytes = size - avail;
	} else {
		log->tail.bytes += size;
		ut_ad(b == log->tail.block + log->tail.bytes);
	}

	log->tail.total += size;
	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
	mutex_exit(&log->mutex);
}

#ifdef UNIV_DEBUG
/* In debug builds, also pass the record end pointer so the
function can validate the write position. */
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, b, size, avail)
#else /* UNIV_DEBUG */
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, size, avail)
#endif /* UNIV_DEBUG */
520
/******************************************************//**
Logs a delete operation to a table that is being rebuilt.
This will be merged in row_log_table_apply_delete(). */
UNIV_INTERN
void
row_log_table_delete(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	const byte*	sys)	/*!< in: DB_TRX_ID,DB_ROLL_PTR that should
				be logged, or NULL to use those in rec */
{
	ulint		old_pk_extra_size;
	ulint		old_pk_size;
	ulint		ext_size = 0;
	ulint		mrec_size;
	ulint		avail_size;
	mem_heap_t*	heap		= NULL;
	const dtuple_t*	old_pk;
	row_ext_t*	ext;

	ut_ad(dict_index_is_clust(index));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
	      || rw_lock_own(&index->lock, RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */

	if (dict_index_is_corrupted(index)
	    || !dict_index_is_online_ddl(index)
	    || index->online_log->error != DB_SUCCESS) {
		return;
	}

	dict_table_t* new_table = index->online_log->table;
	dict_index_t* new_index = dict_table_get_first_index(new_table);

	ut_ad(dict_index_is_clust(new_index));
	ut_ad(!dict_index_is_online_ddl(new_index));

	/* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
	if (index->online_log->same_pk) {
		dtuple_t* tuple;
		ut_ad(new_index->n_uniq == index->n_uniq);

		/* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
		fields of the record. */
		heap = mem_heap_create(
			DATA_TRX_ID_LEN
			+ DTUPLE_EST_ALLOC(new_index->n_uniq + 2));
		old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 2);
		dict_index_copy_types(tuple, new_index, tuple->n_fields);
		dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);

		for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
			ulint		len;
			const void*	field	= rec_get_nth_field(
				rec, offsets, i, &len);
			dfield_t*	dfield	= dtuple_get_nth_field(
				tuple, i);
			ut_ad(len != UNIV_SQL_NULL);
			ut_ad(!rec_offs_nth_extern(offsets, i));
			dfield_set_data(dfield, field, len);
		}

		if (sys) {
			/* Log the caller-supplied DB_TRX_ID,DB_ROLL_PTR
			instead of the values stored in the record. */
			dfield_set_data(
				dtuple_get_nth_field(tuple,
						     new_index->n_uniq),
				sys, DATA_TRX_ID_LEN);
			dfield_set_data(
				dtuple_get_nth_field(tuple,
						     new_index->n_uniq + 1),
				sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
		}
	} else {
		/* The PRIMARY KEY has changed. Translate the tuple. */
		old_pk = row_log_table_get_pk(
			rec, index, offsets, NULL, &heap);

		if (!old_pk) {
			ut_ad(index->online_log->error != DB_SUCCESS);
			if (heap) {
				goto func_exit;
			}
			return;
		}
	}

	ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
		      old_pk, old_pk->n_fields - 2)->len);
	ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
		      old_pk, old_pk->n_fields - 1)->len);
	old_pk_size = rec_get_converted_size_temp(
		new_index, old_pk->fields, old_pk->n_fields,
		&old_pk_extra_size);
	ut_ad(old_pk_extra_size < 0x100);

	/* 1 (ROW_T_DELETE) + 1 (old_pk_extra_size) + 4 (ext_size)
	+ the converted old PRIMARY KEY record */
	mrec_size = 6 + old_pk_size;

	/* Log enough prefix of the BLOB unless both the
	old and new table are in COMPACT or REDUNDANT format,
	which store the prefix in the clustered index record. */
	if (rec_offs_any_extern(offsets)
	    && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
		|| dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {

		/* Build a cache of those off-page column prefixes
		that are referenced by secondary indexes. It can be
		that none of the off-page columns are needed. */
		row_build(ROW_COPY_DATA, index, rec,
			  offsets, NULL, NULL, NULL, &ext, heap);
		if (ext) {
			/* Log the row_ext_t, ext->ext and ext->buf */
			ext_size = ext->n_ext * ext->max_len
				+ sizeof(*ext)
				+ ext->n_ext * sizeof(ulint)
				+ (ext->n_ext - 1) * sizeof ext->len;
			mrec_size += ext_size;
		}
	}

	if (byte* b = row_log_table_open(index->online_log,
					 mrec_size, &avail_size)) {
		*b++ = ROW_T_DELETE;
		*b++ = static_cast<byte>(old_pk_extra_size);

		/* Log the size of external prefix we saved */
		mach_write_to_4(b, ext_size);
		b += 4;

		rec_convert_dtuple_to_temp(
			b + old_pk_extra_size, new_index,
			old_pk->fields, old_pk->n_fields);

		b += old_pk_size;

		if (ext_size) {
			ulint	cur_ext_size = sizeof(*ext)
				+ (ext->n_ext - 1) * sizeof ext->len;

			memcpy(b, ext, cur_ext_size);
			b += cur_ext_size;

			/* Check if we need to col_map to adjust the column
			number. If columns were added/removed/reordered,
			adjust the column number. */
			if (const ulint* col_map =
				index->online_log->col_map) {
				for (ulint i = 0; i < ext->n_ext; i++) {
					const_cast<ulint&>(ext->ext[i]) =
						col_map[ext->ext[i]];
				}
			}

			memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
			b += ext->n_ext * sizeof(*ext->ext);

			/* The remainder is the prefix data (ext->buf). */
			ext_size -= cur_ext_size
				+ ext->n_ext * sizeof(*ext->ext);
			memcpy(b, ext->buf, ext_size);
			b += ext_size;
		}

		row_log_table_close(
			index->online_log, b, mrec_size, avail_size);
	}

func_exit:
	mem_heap_free(heap);
}
697
/******************************************************//**
Logs an insert or update to a table that is being rebuilt,
for a clustered index record in ROW_FORMAT=REDUNDANT.  The record
is first converted to a dtuple and re-encoded, instead of being
copied verbatim as in row_log_table_low(). */
static
void
row_log_table_low_redundant(
/*========================*/
	const rec_t*		rec,	/*!< in: clustered index leaf
					page record in ROW_FORMAT=REDUNDANT,
					page X-latched */
	dict_index_t*		index,	/*!< in/out: clustered index, S-latched
					or X-latched */
	bool			insert,	/*!< in: true if insert,
					false if update */
	const dtuple_t*		old_pk,	/*!< in: old PRIMARY KEY value
					(if !insert and a PRIMARY KEY
					is being created) */
	const dict_index_t*	new_index)
					/*!< in: clustered index of the
					new table, not latched */
{
	ulint		old_pk_size;
	ulint		old_pk_extra_size;
	ulint		size;
	ulint		extra_size;
	ulint		mrec_size;
	ulint		avail_size;
	mem_heap_t*	heap		= NULL;
	dtuple_t*	tuple;

	ut_ad(!page_is_comp(page_align(rec)));
	ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
	ut_ad(dict_tf_is_valid(index->table->flags));
	ut_ad(!dict_table_is_comp(index->table));  /* redundant row format */
	ut_ad(dict_index_is_clust(new_index));

	heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
	tuple = dtuple_create(heap, index->n_fields);
	dict_index_copy_types(tuple, index, index->n_fields);
	dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));

	if (rec_get_1byte_offs_flag(rec)) {
		/* NOTE(review): this branch does not check for
		externally stored columns; presumably the extern flag
		only exists in the 2-byte offsets encoding (compare
		rec_2_is_field_extern() in the other branch) — confirm
		against rem0rec. */
		for (ulint i = 0; i < index->n_fields; i++) {
			dfield_t*	dfield;
			ulint		len;
			const void*	field;

			dfield = dtuple_get_nth_field(tuple, i);
			field = rec_get_nth_field_old(rec, i, &len);

			dfield_set_data(dfield, field, len);
		}
	} else {
		for (ulint i = 0; i < index->n_fields; i++) {
			dfield_t*	dfield;
			ulint		len;
			const void*	field;

			dfield = dtuple_get_nth_field(tuple, i);
			field = rec_get_nth_field_old(rec, i, &len);

			dfield_set_data(dfield, field, len);

			if (rec_2_is_field_extern(rec, i)) {
				dfield_set_ext(dfield);
			}
		}
	}

	size = rec_get_converted_size_temp(
		index, tuple->fields, tuple->n_fields, &extra_size);

	mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);

	if (insert || index->online_log->same_pk) {
		/* No old PRIMARY KEY needs to be logged. */
		ut_ad(!old_pk);
		old_pk_extra_size = old_pk_size = 0;
	} else {
		ut_ad(old_pk);
		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 2)->len);
		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 1)->len);

		old_pk_size = rec_get_converted_size_temp(
			new_index, old_pk->fields, old_pk->n_fields,
			&old_pk_extra_size);
		ut_ad(old_pk_extra_size < 0x100);
		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
	}

	if (byte* b = row_log_table_open(index->online_log,
					 mrec_size, &avail_size)) {
		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;

		if (old_pk_size) {
			*b++ = static_cast<byte>(old_pk_extra_size);

			rec_convert_dtuple_to_temp(
				b + old_pk_extra_size, new_index,
				old_pk->fields, old_pk->n_fields);
			b += old_pk_size;
		}

		if (extra_size < 0x80) {
			*b++ = static_cast<byte>(extra_size);
		} else {
			/* Two-byte encoding of extra_size. */
			ut_ad(extra_size < 0x8000);
			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
			*b++ = static_cast<byte>(extra_size);
		}

		rec_convert_dtuple_to_temp(
			b + extra_size, index, tuple->fields, tuple->n_fields);
		b += size;

		row_log_table_close(
			index->online_log, b, mrec_size, avail_size);
	}

	mem_heap_free(heap);
}
820
/******************************************************//**
Logs an insert or update to a table that is being rebuilt. */
static MY_ATTRIBUTE((nonnull(1,2,3)))
void
row_log_table_low(
/*==============*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	bool		insert,	/*!< in: true if insert, false if update */
	const dtuple_t*	old_pk)	/*!< in: old PRIMARY KEY value (if !insert
				and a PRIMARY KEY is being created) */
{
	ulint			omit_size;
	ulint			old_pk_size;
	ulint			old_pk_extra_size;
	ulint			extra_size;
	ulint			mrec_size;
	ulint			avail_size;
	const dict_index_t*	new_index = dict_table_get_first_index(
		index->online_log->table);
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_clust(new_index));
	ut_ad(!dict_index_is_online_ddl(new_index));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
	      || rw_lock_own(&index->lock, RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
	ut_ad(page_is_leaf(page_align(rec)));
	ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));

	if (dict_index_is_corrupted(index)
	    || !dict_index_is_online_ddl(index)
	    || index->online_log->error != DB_SUCCESS) {
		return;
	}

	if (!rec_offs_comp(offsets)) {
		/* ROW_FORMAT=REDUNDANT records are re-encoded via a
		dtuple instead of being copied verbatim below. */
		row_log_table_low_redundant(
			rec, index, insert, old_pk, new_index);
		return;
	}

	ut_ad(page_is_comp(page_align(rec)));
	ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);

	/* The fixed record header is not logged. */
	omit_size = REC_N_NEW_EXTRA_BYTES;

	extra_size = rec_offs_extra_size(offsets) - omit_size;

	mrec_size = ROW_LOG_HEADER_SIZE
		+ (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;

	if (insert || index->online_log->same_pk) {
		/* No old PRIMARY KEY needs to be logged. */
		ut_ad(!old_pk);
		old_pk_extra_size = old_pk_size = 0;
	} else {
		ut_ad(old_pk);
		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 2)->len);
		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 1)->len);

		old_pk_size = rec_get_converted_size_temp(
			new_index, old_pk->fields, old_pk->n_fields,
			&old_pk_extra_size);
		ut_ad(old_pk_extra_size < 0x100);
		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
	}

	if (byte* b = row_log_table_open(index->online_log,
					 mrec_size, &avail_size)) {
		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;

		if (old_pk_size) {
			*b++ = static_cast<byte>(old_pk_extra_size);

			rec_convert_dtuple_to_temp(
				b + old_pk_extra_size, new_index,
				old_pk->fields, old_pk->n_fields);
			b += old_pk_size;
		}

		if (extra_size < 0x80) {
			*b++ = static_cast<byte>(extra_size);
		} else {
			/* Two-byte encoding of extra_size. */
			ut_ad(extra_size < 0x8000);
			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
			*b++ = static_cast<byte>(extra_size);
		}

		/* Copy the record verbatim, minus the omitted
		REC_N_NEW_EXTRA_BYTES of fixed header. */
		memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
		b += extra_size;
		memcpy(b, rec, rec_offs_data_size(offsets));
		b += rec_offs_data_size(offsets);

		row_log_table_close(
			index->online_log, b, mrec_size, avail_size);
	}
}
928
/******************************************************//**
Logs an update to a table that is being rebuilt.
This will be merged in row_log_table_apply_update(). */
UNIV_INTERN
void
row_log_table_update(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	const dtuple_t*	old_pk)	/*!< in: row_log_table_get_pk()
				before the update */
{
	/* Delegate to the common insert/update logging routine. */
	row_log_table_low(rec, index, offsets, false, old_pk);
}
946
947 /** Gets the old table column of a PRIMARY KEY column.
948 @param table old table (before ALTER TABLE)
949 @param col_map mapping of old column numbers to new ones
950 @param col_no column position in the new table
951 @return old table column, or NULL if this is an added column */
952 static
953 const dict_col_t*
row_log_table_get_pk_old_col(const dict_table_t * table,const ulint * col_map,ulint col_no)954 row_log_table_get_pk_old_col(
955 /*=========================*/
956 const dict_table_t* table,
957 const ulint* col_map,
958 ulint col_no)
959 {
960 for (ulint i = 0; i < table->n_cols; i++) {
961 if (col_no == col_map[i]) {
962 return(dict_table_get_nth_col(table, i));
963 }
964 }
965
966 return(NULL);
967 }
968
969 /** Maps an old table column of a PRIMARY KEY column.
970 @param col old table column (before ALTER TABLE)
971 @param ifield clustered index field in the new table (after ALTER TABLE)
972 @param dfield clustered index tuple field in the new table
973 @param heap memory heap for allocating dfield contents
974 @param rec clustered index leaf page record in the old table
975 @param offsets rec_get_offsets(rec)
976 @param i rec field corresponding to col
977 @param zip_size compressed page size of the old table, or 0 for uncompressed
978 @param max_len maximum length of dfield
979 @retval DB_INVALID_NULL if a NULL value is encountered
980 @retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */
981 static
982 dberr_t
row_log_table_get_pk_col(const dict_col_t * col,const dict_field_t * ifield,dfield_t * dfield,mem_heap_t * heap,const rec_t * rec,const ulint * offsets,ulint i,ulint zip_size,ulint max_len)983 row_log_table_get_pk_col(
984 /*=====================*/
985 const dict_col_t* col,
986 const dict_field_t* ifield,
987 dfield_t* dfield,
988 mem_heap_t* heap,
989 const rec_t* rec,
990 const ulint* offsets,
991 ulint i,
992 ulint zip_size,
993 ulint max_len)
994 {
995 const byte* field;
996 ulint len;
997
998 ut_ad(ut_is_2pow(zip_size));
999
1000 field = rec_get_nth_field(rec, offsets, i, &len);
1001
1002 if (len == UNIV_SQL_NULL) {
1003 return(DB_INVALID_NULL);
1004 }
1005
1006 if (rec_offs_nth_extern(offsets, i)) {
1007 ulint field_len = ifield->prefix_len;
1008 byte* blob_field;
1009
1010 if (!field_len) {
1011 field_len = ifield->fixed_len;
1012 if (!field_len) {
1013 field_len = max_len + 1;
1014 }
1015 }
1016
1017 blob_field = static_cast<byte*>(
1018 mem_heap_alloc(heap, field_len));
1019
1020 len = btr_copy_externally_stored_field_prefix(
1021 blob_field, field_len, zip_size, field, len);
1022 if (len >= max_len + 1) {
1023 return(DB_TOO_BIG_INDEX_COL);
1024 }
1025
1026 dfield_set_data(dfield, blob_field, len);
1027 } else {
1028 dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
1029 }
1030
1031 return(DB_SUCCESS);
1032 }
1033
/******************************************************//**
Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
of a table that is being rebuilt.
@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
or NULL if the PRIMARY KEY definition does not change */
UNIV_INTERN
const dtuple_t*
row_log_table_get_pk(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	byte*		sys,	/*!< out: DB_TRX_ID,DB_ROLL_PTR for
				row_log_table_delete(), or NULL */
	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
{
	dtuple_t*	tuple	= NULL;
	row_log_t*	log	= index->online_log;

	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
	      || rw_lock_own(&index->lock, RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */

	ut_ad(log);
	ut_ad(log->table);

	if (log->same_pk) {
		/* The PRIMARY KEY columns are unchanged: no search
		tuple is needed.  Only report DB_TRX_ID,DB_ROLL_PTR
		when the caller asked for them via sys. */
		if (sys) {
			/* Store the DB_TRX_ID,DB_ROLL_PTR. */
			ulint	trx_id_offs = index->trx_id_offset;

			if (!trx_id_offs) {
				/* The offset is not fixed (some key
				field before DB_TRX_ID is
				variable-length); compute it from the
				record offsets. */
				ulint	pos = dict_index_get_sys_col_pos(
					index, DATA_TRX_ID);
				ulint	len;
				ut_ad(pos > 0);

				if (!offsets) {
					offsets = rec_get_offsets(
						rec, index, NULL, pos + 1,
						heap);
				}

				trx_id_offs = rec_get_nth_field_offs(
					offsets, pos, &len);
				ut_ad(len == DATA_TRX_ID_LEN);
			}

			/* DB_ROLL_PTR is stored right after DB_TRX_ID. */
			memcpy(sys, rec + trx_id_offs,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
		}

		return(NULL);
	}

	mutex_enter(&log->mutex);

	/* log->error is protected by log->mutex. */
	if (log->error == DB_SUCCESS) {
		dict_table_t*	new_table	= log->table;
		dict_index_t*	new_index
			= dict_table_get_first_index(new_table);
		const ulint	new_n_uniq
			= dict_index_get_n_unique(new_index);

		if (!*heap) {
			/* Estimate the allocation size up front:
			possible offsets array plus the minimum size
			of every new PK column value. */
			ulint	size = 0;

			if (!offsets) {
				size += (1 + REC_OFFS_HEADER_SIZE
					 + index->n_fields)
					* sizeof *offsets;
			}

			for (ulint i = 0; i < new_n_uniq; i++) {
				size += dict_col_get_min_size(
					dict_index_get_nth_col(new_index, i));
			}

			*heap = mem_heap_create(
				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
		}

		if (!offsets) {
			offsets = rec_get_offsets(rec, index, NULL,
						  ULINT_UNDEFINED, heap);
		}

		/* PK columns plus DB_TRX_ID and DB_ROLL_PTR. */
		tuple = dtuple_create(*heap, new_n_uniq + 2);
		dict_index_copy_types(tuple, new_index, tuple->n_fields);
		dtuple_set_n_fields_cmp(tuple, new_n_uniq);

		const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
		const ulint zip_size = dict_table_zip_size(index->table);

		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
			dict_field_t*	ifield;
			dfield_t*	dfield;
			ulint		prtype;
			ulint		mbminmaxlen;

			ifield = dict_index_get_nth_field(new_index, new_i);
			dfield = dtuple_get_nth_field(tuple, new_i);

			const ulint	col_no
				= dict_field_get_col(ifield)->ind;

			if (const dict_col_t* col
			    = row_log_table_get_pk_old_col(
				    index->table, log->col_map, col_no)) {
				/* The PK column existed in the old
				table: copy its value from rec. */
				ulint	i = dict_col_get_clust_pos(col, index);

				if (i == ULINT_UNDEFINED) {
					ut_ad(0);
					log->error = DB_CORRUPTION;
					goto err_exit;
				}

				log->error = row_log_table_get_pk_col(
					col, ifield, dfield, *heap,
					rec, offsets, i, zip_size, max_len);

				if (log->error != DB_SUCCESS) {
err_exit:
					tuple = NULL;
					goto func_exit;
				}

				mbminmaxlen = col->mbminmaxlen;
				prtype = col->prtype;
			} else {
				/* No matching column was found in the old
				table, so this must be an added column.
				Copy the default value. */
				ut_ad(log->add_cols);

				dfield_copy(dfield, dtuple_get_nth_field(
						    log->add_cols, col_no));
				mbminmaxlen = dfield->type.mbminmaxlen;
				prtype = dfield->type.prtype;
			}

			ut_ad(!dfield_is_ext(dfield));
			ut_ad(!dfield_is_null(dfield));

			if (ifield->prefix_len) {
				/* Truncate the value to the column
				prefix, respecting multi-byte
				character boundaries. */
				ulint	len = dtype_get_at_most_n_mbchars(
					prtype, mbminmaxlen,
					ifield->prefix_len,
					dfield_get_len(dfield),
					static_cast<const char*>(
						dfield_get_data(dfield)));

				ut_ad(len <= dfield_get_len(dfield));
				dfield_set_len(dfield, len);
			}
		}

		const byte* trx_roll = rec
			+ row_get_trx_id_offset(index, offsets);

		/* Copy the fields, because the fields will be updated
		or the record may be moved somewhere else in the B-tree
		as part of the upcoming operation. */
		if (sys) {
			memcpy(sys, trx_roll,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			trx_roll = sys;
		} else {
			trx_roll = static_cast<const byte*>(
				mem_heap_dup(
					*heap, trx_roll,
					DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
		}

		/* Append DB_TRX_ID and DB_ROLL_PTR as the last two
		fields of the tuple (they are not compared; see
		dtuple_set_n_fields_cmp() above). */
		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
				trx_roll, DATA_TRX_ID_LEN);
		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
	}

func_exit:
	mutex_exit(&log->mutex);
	return(tuple);
}
1226
/******************************************************//**
Logs an insert to a table that is being rebuilt.
Thin wrapper around row_log_table_low(); the logged record
will be merged in row_log_table_apply_insert(). */
UNIV_INTERN
void
row_log_table_insert(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets)/*!< in: rec_get_offsets(rec,index) */
{
	/* insert=true; no old PK is needed for an insert. */
	row_log_table_low(rec, index, offsets, true, NULL);
}
1242
/******************************************************//**
Notes that a BLOB is being freed during online ALTER TABLE.
Records the BLOB page in index->online_log->blobs so that
row_log_table_apply_convert_mrec() can detect missing history. */
UNIV_INTERN
void
row_log_table_blob_free(
/*====================*/
	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
	ulint		page_no)/*!< in: starting page number of the BLOB */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(page_no != FIL_NULL);

	/* Once the log is in an error state, it will not be applied;
	there is no point in tracking BLOB pages any more. */
	if (index->online_log->error != DB_SUCCESS) {
		return;
	}

	page_no_map*	blobs	= index->online_log->blobs;

	if (!blobs) {
		/* Lazily create the map on the first freed BLOB. */
		index->online_log->blobs = blobs = new page_no_map();
	}

#ifdef UNIV_DEBUG
	/* Current end of the log; only used for debug bookkeeping
	inside row_log_table_blob_t. */
	const ulonglong	log_pos = index->online_log->tail.total;
#else
	/* In release builds, row_log_table_blob_t() and blob_free()
	take no argument; make log_pos expand to nothing. */
# define log_pos /* empty */
#endif /* UNIV_DEBUG */

	const page_no_map::value_type v(page_no,
					row_log_table_blob_t(log_pos));

	std::pair<page_no_map::iterator,bool> p = blobs->insert(v);

	if (!p.second) {
		/* Update the existing mapping. */
		ut_ad(p.first->first == page_no);
		p.first->second.blob_free(log_pos);
	}
#undef log_pos
}
1287
1288 /******************************************************//**
1289 Notes that a BLOB is being allocated during online ALTER TABLE. */
1290 UNIV_INTERN
1291 void
row_log_table_blob_alloc(dict_index_t * index,ulint page_no)1292 row_log_table_blob_alloc(
1293 /*=====================*/
1294 dict_index_t* index, /*!< in/out: clustered index, X-latched */
1295 ulint page_no)/*!< in: starting page number of the BLOB */
1296 {
1297 ut_ad(dict_index_is_clust(index));
1298 ut_ad(dict_index_is_online_ddl(index));
1299 #ifdef UNIV_SYNC_DEBUG
1300 ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
1301 #endif /* UNIV_SYNC_DEBUG */
1302 ut_ad(page_no != FIL_NULL);
1303
1304 if (index->online_log->error != DB_SUCCESS) {
1305 return;
1306 }
1307
1308 /* Only track allocations if the same page has been freed
1309 earlier. Double allocation without a free is not allowed. */
1310 if (page_no_map* blobs = index->online_log->blobs) {
1311 page_no_map::iterator p = blobs->find(page_no);
1312
1313 if (p != blobs->end()) {
1314 ut_ad(p->first == page_no);
1315 p->second.blob_alloc(index->online_log->tail.total);
1316 }
1317 }
1318 }
1319
/******************************************************//**
Converts a log record to a table row.
Builds a dtuple in the new table's format from a merge record of the
old clustered index, mapping columns through log->col_map and filling
added columns from log->add_cols.
@return converted row, or NULL if the conversion fails */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const dtuple_t*
row_log_table_apply_convert_mrec(
/*=============================*/
	const mrec_t*		mrec,		/*!< in: merge record */
	dict_index_t*		index,		/*!< in: index of mrec */
	const ulint*		offsets,	/*!< in: offsets of mrec */
	const row_log_t*	log,		/*!< in: rebuild context */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec */
	dberr_t*		error)		/*!< out: DB_SUCCESS or
						DB_MISSING_HISTORY or
						reason of failure */
{
	dtuple_t*	row;

	*error = DB_SUCCESS;

	/* This is based on row_build(). */
	if (log->add_cols) {
		/* Start from the default values of the added columns,
		so that columns missing from mrec keep their defaults. */
		row = dtuple_copy(log->add_cols, heap);
		/* dict_table_copy_types() would set the fields to NULL */
		for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
			dict_col_copy_type(
				dict_table_get_nth_col(log->table, i),
				dfield_get_type(dtuple_get_nth_field(row, i)));
		}
	} else {
		row = dtuple_create(heap, dict_table_get_n_cols(log->table));
		dict_table_copy_types(row, log->table);
	}

	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
		const dict_field_t*	ind_field
			= dict_index_get_nth_field(index, i);

		if (ind_field->prefix_len) {
			/* Column prefixes can only occur in key
			fields, which cannot be stored externally. For
			a column prefix, there should also be the full
			field in the clustered index tuple. The row
			tuple comprises full fields, not prefixes. */
			ut_ad(!rec_offs_nth_extern(offsets, i));
			continue;
		}

		const dict_col_t*	col
			= dict_field_get_col(ind_field);
		ulint			col_no
			= log->col_map[dict_col_get_no(col)];

		if (col_no == ULINT_UNDEFINED) {
			/* dropped column */
			continue;
		}

		dfield_t*	dfield
			= dtuple_get_nth_field(row, col_no);
		ulint		len;
		const byte*	data;

		if (rec_offs_nth_extern(offsets, i)) {
			ut_ad(rec_offs_any_extern(offsets));
			/* Hold the index lock while dereferencing the
			BLOB, so that it cannot be freed concurrently. */
			rw_lock_x_lock(dict_index_get_lock(index));

			if (const page_no_map* blobs = log->blobs) {
				data = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);

				/* Read the BLOB's starting page number
				from the external field reference. */
				ulint	page_no = mach_read_from_4(
					data + len - (BTR_EXTERN_FIELD_REF_SIZE
						      - BTR_EXTERN_PAGE_NO));
				page_no_map::const_iterator p = blobs->find(
					page_no);
				if (p != blobs->end()
				    && p->second.is_freed(log->head.total)) {
					/* This BLOB has been freed.
					We must not access the row. */
					*error = DB_MISSING_HISTORY;
					dfield_set_data(dfield, data, len);
					dfield_set_ext(dfield);
					goto blob_done;
				}
			}

			data = btr_rec_copy_externally_stored_field(
				mrec, offsets,
				dict_table_zip_size(index->table),
				i, &len, heap);
			ut_a(data);
			dfield_set_data(dfield, data, len);
blob_done:
			rw_lock_x_unlock(dict_index_get_lock(index));
		} else {
			data = rec_get_nth_field(mrec, offsets, i, &len);
			dfield_set_data(dfield, data, len);
		}

		if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
		    && col->len != len && !dict_table_is_comp(log->table)) {

			ut_ad(col->len >= len);
			if (dict_table_is_comp(index->table)) {
				/* Pad a COMPACT-format value with
				spaces (0x20) to the full column width
				of the REDUNDANT-format new table. */
				byte*	buf = (byte*) mem_heap_alloc(heap,
								     col->len);
				memcpy(buf, dfield->data, len);
				memset(buf + len, 0x20, col->len - len);

				dfield_set_data(dfield, buf, col->len);
			} else {
				/* field length mismatch should not happen
				when rebuilding the redundant row format
				table. */
				ut_ad(0);
				*error = DB_CORRUPTION;
				return(NULL);
			}
		}

		/* See if any columns were changed to NULL or NOT NULL. */
		const dict_col_t*	new_col
			= dict_table_get_nth_col(log->table, col_no);
		ut_ad(new_col->mtype == col->mtype);

		/* Assert that prtype matches except for nullability. */
		ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
		ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
			& ~DATA_NOT_NULL));

		if (new_col->prtype == col->prtype) {
			continue;
		}

		if ((new_col->prtype & DATA_NOT_NULL)
		    && dfield_is_null(dfield)) {
			/* We got a NULL value for a NOT NULL column. */
			*error = DB_INVALID_NULL;
			return(NULL);
		}

		/* Adjust the DATA_NOT_NULL flag in the parsed row. */
		dfield_get_type(dfield)->prtype = new_col->prtype;

		ut_ad(dict_col_type_assert_equal(new_col,
						 dfield_get_type(dfield)));
	}

	return(row);
}
1473
/******************************************************//**
Replays an insert operation on a table that was rebuilt.
Inserts the row into the new clustered index and then into every
secondary index (except fulltext) of the new table.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_log_table_apply_insert_low(
/*===========================*/
	que_thr_t*	thr,		/*!< in: query graph */
	const dtuple_t*	row,		/*!< in: table row
					in the old table definition */
	trx_id_t	trx_id,		/*!< in: trx_id of the row */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap
					that can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap */
	row_merge_dup_t*dup)		/*!< in/out: for reporting
					duplicate key errors */
{
	dberr_t		error;
	dtuple_t*	entry;
	const row_log_t*log	= dup->index->online_log;
	dict_index_t*	index	= dict_table_get_first_index(log->table);
	/* Running index ordinal, used to report which index raised a
	duplicate key error. */
	ulint		n_index = 0;

	ut_ad(dtuple_validate(row));
	ut_ad(trx_id);

#ifdef ROW_LOG_APPLY_PRINT
	if (row_log_apply_print) {
		fprintf(stderr, "table apply insert "
			IB_ID_FMT " " IB_ID_FMT "\n",
			index->table->id, index->id);
		dtuple_print(stderr, row);
	}
#endif /* ROW_LOG_APPLY_PRINT */

	/* No locking or undo logging: the new table is not visible to
	other transactions yet, and DB_TRX_ID,DB_ROLL_PTR are kept. */
	static const ulint	flags
		= (BTR_CREATE_FLAG
		   | BTR_NO_LOCKING_FLAG
		   | BTR_NO_UNDO_LOG_FLAG
		   | BTR_KEEP_SYS_FLAG);

	entry = row_build_index_entry(row, NULL, index, heap);

	error = row_ins_clust_index_entry_low(
		flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr);

	switch (error) {
	case DB_SUCCESS:
		break;
	case DB_SUCCESS_LOCKED_REC:
		/* The row had already been copied to the table. */
		return(DB_SUCCESS);
	default:
		return(error);
	}

	/* Insert the corresponding entries into all remaining
	(secondary) indexes, stopping at the first error. */
	do {
		n_index++;

		if (!(index = dict_table_get_next_index(index))) {
			break;
		}

		if (index->type & DICT_FTS) {
			/* Fulltext indexes are built separately. */
			continue;
		}

		entry = row_build_index_entry(row, NULL, index, heap);
		error = row_ins_sec_index_entry_low(
			flags, BTR_MODIFY_TREE,
			index, offsets_heap, heap, entry, trx_id, thr);

		/* Report correct index name for duplicate key error. */
		if (error == DB_DUPLICATE_KEY) {
			thr_get_trx(thr)->error_key_num = n_index;
		}

	} while (error == DB_SUCCESS);

	return(error);
}
1555
/******************************************************//**
Replays an insert operation on a table that was rebuilt.
Converts the logged merge record to a row in the new table's format
and delegates the actual insert to row_log_table_apply_insert_low().
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_log_table_apply_insert(
/*=======================*/
	que_thr_t*		thr,		/*!< in: query graph */
	const mrec_t*		mrec,		/*!< in: record to insert */
	const ulint*		offsets,	/*!< in: offsets of mrec */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	row_merge_dup_t*	dup,		/*!< in/out: for reporting
						duplicate key errors */
	trx_id_t		trx_id)		/*!< in: DB_TRX_ID of mrec */
{
	const row_log_t*log	= dup->index->online_log;
	dberr_t		error;
	const dtuple_t*	row	= row_log_table_apply_convert_mrec(
		mrec, dup->index, offsets, log, heap, trx_id, &error);

	switch (error) {
	case DB_MISSING_HISTORY:
		ut_ad(log->blobs);
		/* Because some BLOBs are missing, we know that the
		transaction was rolled back later (a rollback of
		an insert can free BLOBs).
		We can simply skip the insert: the subsequent
		ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
		be interpreted as ROW_T_INSERT. */
		return(DB_SUCCESS);
	case DB_SUCCESS:
		ut_ad(row != NULL);
		break;
	default:
		ut_ad(0);
		/* fall through: any other error also yields row == NULL */
	case DB_INVALID_NULL:
		ut_ad(row == NULL);
		return(error);
	}

	error = row_log_table_apply_insert_low(
		thr, row, trx_id, offsets_heap, heap, dup);
	if (error != DB_SUCCESS) {
		/* Report the erroneous row using the new
		version of the table. */
		innobase_row_to_mysql(dup->table, log->table, row);
	}
	return(error);
}
1607
/******************************************************//**
Deletes a record from a table that is being rebuilt.
Removes the clustered index record that pcur is positioned on, and
then the matching entries from all secondary (non-fulltext) indexes.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull(1, 2, 4, 5), warn_unused_result))
dberr_t
row_log_table_apply_delete_low(
/*===========================*/
	btr_pcur_t*		pcur,		/*!< in/out: B-tree cursor,
						will be trashed */
	const ulint*		offsets,	/*!< in: offsets on pcur */
	const row_ext_t*	save_ext,	/*!< in: saved external field
						info, or NULL */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	mtr_t*			mtr)		/*!< in/out: mini-transaction,
						will be committed */
{
	dberr_t		error;
	row_ext_t*	ext;
	dtuple_t*	row;
	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;

	ut_ad(dict_index_is_clust(index));

#ifdef ROW_LOG_APPLY_PRINT
	if (row_log_apply_print) {
		fprintf(stderr, "table apply delete "
			IB_ID_FMT " " IB_ID_FMT "\n",
			index->table->id, index->id);
		rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets);
	}
#endif /* ROW_LOG_APPLY_PRINT */
	if (dict_table_get_next_index(index)) {
		/* Build a row template for purging secondary index entries.
		ROW_COPY_DATA is used because the clustered index record
		will be deleted below. */
		row = row_build(
			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
			offsets, NULL, NULL, NULL,
			save_ext ? NULL : &ext, heap);
		if (!save_ext) {
			save_ext = ext;
		}
	} else {
		/* There are no secondary indexes; no row template
		is needed. */
		row = NULL;
	}

	/* Delete the clustered index record; this commits the mtr
	whether or not the delete succeeds. */
	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
				   BTR_CREATE_FLAG, RB_NONE, mtr);
	mtr_commit(mtr);

	if (error != DB_SUCCESS) {
		return(error);
	}

	while ((index = dict_table_get_next_index(index)) != NULL) {
		if (index->type & DICT_FTS) {
			/* Fulltext indexes are maintained separately. */
			continue;
		}

		const dtuple_t*	entry = row_build_index_entry(
			row, save_ext, index, heap);
		mtr_start(mtr);
		btr_pcur_open(index, entry, PAGE_CUR_LE,
			      BTR_MODIFY_TREE, pcur, mtr);
#ifdef UNIV_DEBUG
		switch (btr_pcur_get_btr_cur(pcur)->flag) {
		case BTR_CUR_DELETE_REF:
		case BTR_CUR_DEL_MARK_IBUF:
		case BTR_CUR_DELETE_IBUF:
		case BTR_CUR_INSERT_TO_IBUF:
			/* We did not request buffering. */
			break;
		case BTR_CUR_HASH:
		case BTR_CUR_HASH_FAIL:
		case BTR_CUR_BINARY:
			goto flag_ok;
		}
		ut_ad(0);
flag_ok:
#endif /* UNIV_DEBUG */

		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
			/* All secondary index entries should be
			found, because new_table is being modified by
			this thread only, and all indexes should be
			updated in sync. */
			mtr_commit(mtr);
			return(DB_INDEX_CORRUPT);
		}

		btr_cur_pessimistic_delete(&error, FALSE,
					   btr_pcur_get_btr_cur(pcur),
					   BTR_CREATE_FLAG, RB_NONE, mtr);
		mtr_commit(mtr);
	}

	return(error);
}
1705
/******************************************************//**
Replays a delete operation on a table that was rebuilt.
Reconstructs the old PRIMARY KEY search tuple from the merge record,
positions a cursor on the matching clustered index record of the new
table, and deletes it (and its secondary index entries) only if the
logged DB_TRX_ID,DB_ROLL_PTR still match the record.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
dberr_t
row_log_table_apply_delete(
/*=======================*/
	que_thr_t*		thr,		/*!< in: query graph */
	ulint			trx_id_col,	/*!< in: position of
						DB_TRX_ID in the new
						clustered index */
	const mrec_t*		mrec,		/*!< in: merge record */
	const ulint*		moffsets,	/*!< in: offsets of mrec */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	const row_log_t*	log,		/*!< in: online log */
	const row_ext_t*	save_ext)	/*!< in: saved external field
						info, or NULL */
{
	dict_table_t*	new_table = log->table;
	dict_index_t*	index = dict_table_get_first_index(new_table);
	dtuple_t*	old_pk;
	mtr_t		mtr;
	btr_pcur_t	pcur;
	ulint*		offsets;

	/* The merge record contains the PK columns followed by
	DB_TRX_ID and DB_ROLL_PTR, with no externally stored fields. */
	ut_ad(rec_offs_n_fields(moffsets)
	      == dict_index_get_n_unique(index) + 2);
	ut_ad(!rec_offs_any_extern(moffsets));

	/* Convert the row to a search tuple. */
	old_pk = dtuple_create(heap, index->n_uniq);
	dict_index_copy_types(old_pk, index, index->n_uniq);

	for (ulint i = 0; i < index->n_uniq; i++) {
		ulint		len;
		const void*	field;
		field = rec_get_nth_field(mrec, moffsets, i, &len);
		ut_ad(len != UNIV_SQL_NULL);
		dfield_set_data(dtuple_get_nth_field(old_pk, i),
				field, len);
	}

	mtr_start(&mtr);
	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
		      BTR_MODIFY_TREE, &pcur, &mtr);
#ifdef UNIV_DEBUG
	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
	case BTR_CUR_DELETE_REF:
	case BTR_CUR_DEL_MARK_IBUF:
	case BTR_CUR_DELETE_IBUF:
	case BTR_CUR_INSERT_TO_IBUF:
		/* We did not request buffering. */
		break;
	case BTR_CUR_HASH:
	case BTR_CUR_HASH_FAIL:
	case BTR_CUR_BINARY:
		goto flag_ok;
	}
	ut_ad(0);
flag_ok:
#endif /* UNIV_DEBUG */

	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
all_done:
		mtr_commit(&mtr);
		/* The record was not found. All done. */
		/* This should only happen when an earlier
		ROW_T_INSERT was skipped or
		ROW_T_UPDATE was interpreted as ROW_T_DELETE
		due to BLOBs having been freed by rollback. */
		return(DB_SUCCESS);
	}

	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
				  ULINT_UNDEFINED, &offsets_heap);
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */

	/* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */

	{
		ulint		len;
		const byte*	mrec_trx_id
			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);
		const byte*	rec_trx_id
			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					    trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);

		/* DB_ROLL_PTR must immediately follow DB_TRX_ID in
		both the merge record and the index record, so that a
		single memcmp below can compare both fields. */
		ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
		      == mrec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);
		ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					trx_id_col + 1, &len)
		      == rec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);

		if (memcmp(mrec_trx_id, rec_trx_id,
			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
			/* The ROW_T_DELETE was logged for a different
			PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
			This is possible if a ROW_T_INSERT was skipped
			or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
			because some BLOBs were missing due to
			(1) rolling back the initial insert, or
			(2) purging the BLOB for a later ROW_T_DELETE
			(3) purging 'old values' for a later ROW_T_UPDATE
			or ROW_T_DELETE. */
			ut_ad(!log->same_pk);
			goto all_done;
		}
	}

	return(row_log_table_apply_delete_low(&pcur, offsets, save_ext,
					      heap, &mtr));
}
1827
1828 /******************************************************//**
1829 Replays an update operation on a table that was rebuilt.
1830 @return DB_SUCCESS or error code */
1831 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1832 dberr_t
row_log_table_apply_update(que_thr_t * thr,ulint new_trx_id_col,const mrec_t * mrec,const ulint * offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup,trx_id_t trx_id,const dtuple_t * old_pk)1833 row_log_table_apply_update(
1834 /*=======================*/
1835 que_thr_t* thr, /*!< in: query graph */
1836 ulint new_trx_id_col, /*!< in: position of
1837 DB_TRX_ID in the new
1838 clustered index */
1839 const mrec_t* mrec, /*!< in: new value */
1840 const ulint* offsets, /*!< in: offsets of mrec */
1841 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1842 that can be emptied */
1843 mem_heap_t* heap, /*!< in/out: memory heap */
1844 row_merge_dup_t* dup, /*!< in/out: for reporting
1845 duplicate key errors */
1846 trx_id_t trx_id, /*!< in: DB_TRX_ID of mrec */
1847 const dtuple_t* old_pk) /*!< in: PRIMARY KEY and
1848 DB_TRX_ID,DB_ROLL_PTR
1849 of the old value,
1850 or PRIMARY KEY if same_pk */
1851 {
1852 const row_log_t*log = dup->index->online_log;
1853 const dtuple_t* row;
1854 dict_index_t* index = dict_table_get_first_index(log->table);
1855 mtr_t mtr;
1856 btr_pcur_t pcur;
1857 dberr_t error;
1858 ulint n_index = 0;
1859
1860 ut_ad(dtuple_get_n_fields_cmp(old_pk)
1861 == dict_index_get_n_unique(index));
1862 ut_ad(dtuple_get_n_fields(old_pk)
1863 == dict_index_get_n_unique(index)
1864 + (log->same_pk ? 0 : 2));
1865
1866 row = row_log_table_apply_convert_mrec(
1867 mrec, dup->index, offsets, log, heap, trx_id, &error);
1868
1869 switch (error) {
1870 case DB_MISSING_HISTORY:
1871 /* The record contained BLOBs that are now missing. */
1872 ut_ad(log->blobs);
1873 /* Whether or not we are updating the PRIMARY KEY, we
1874 know that there should be a subsequent
1875 ROW_T_DELETE for rolling back a preceding ROW_T_INSERT,
1876 overriding this ROW_T_UPDATE record. (*1)
1877
1878 This allows us to interpret this ROW_T_UPDATE
1879 as ROW_T_DELETE.
1880
1881 When applying the subsequent ROW_T_DELETE, no matching
1882 record will be found. */
1883 return(DB_SUCCESS);
1884 case DB_SUCCESS:
1885 ut_ad(row != NULL);
1886 break;
1887 default:
1888 ut_ad(0);
1889 case DB_INVALID_NULL:
1890 ut_ad(row == NULL);
1891 return(error);
1892 }
1893
1894 mtr_start(&mtr);
1895 btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1896 BTR_MODIFY_TREE, &pcur, &mtr);
1897 #ifdef UNIV_DEBUG
1898 switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1899 case BTR_CUR_DELETE_REF:
1900 case BTR_CUR_DEL_MARK_IBUF:
1901 case BTR_CUR_DELETE_IBUF:
1902 case BTR_CUR_INSERT_TO_IBUF:
1903 ut_ad(0);/* We did not request buffering. */
1904 case BTR_CUR_HASH:
1905 case BTR_CUR_HASH_FAIL:
1906 case BTR_CUR_BINARY:
1907 break;
1908 }
1909 #endif /* UNIV_DEBUG */
1910
1911 if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1912 || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1913 /* The record was not found. This should only happen
1914 when an earlier ROW_T_INSERT or ROW_T_UPDATE was
1915 diverted because BLOBs were freed when the insert was
1916 later rolled back. */
1917
1918 ut_ad(log->blobs);
1919
1920 if (error == DB_SUCCESS) {
1921 /* An earlier ROW_T_INSERT could have been
1922 skipped because of a missing BLOB, like this:
1923
1924 BEGIN;
1925 INSERT INTO t SET blob_col='blob value';
1926 UPDATE t SET blob_col='';
1927 ROLLBACK;
1928
1929 This would generate the following records:
1930 ROW_T_INSERT (referring to 'blob value')
1931 ROW_T_UPDATE
1932 ROW_T_UPDATE (referring to 'blob value')
1933 ROW_T_DELETE
1934 [ROLLBACK removes the 'blob value']
1935
1936 The ROW_T_INSERT would have been skipped
1937 because of a missing BLOB. Now we are
1938 executing the first ROW_T_UPDATE.
1939 The second ROW_T_UPDATE (for the ROLLBACK)
1940 would be interpreted as ROW_T_DELETE, because
1941 the BLOB would be missing.
1942
1943 We could probably assume that the transaction
1944 has been rolled back and simply skip the
1945 'insert' part of this ROW_T_UPDATE record.
1946 However, there might be some complex scenario
1947 that could interfere with such a shortcut.
1948 So, we will insert the row (and risk
1949 introducing a bogus duplicate key error
1950 for the ALTER TABLE), and a subsequent
1951 ROW_T_UPDATE or ROW_T_DELETE will delete it. */
1952 mtr_commit(&mtr);
1953 error = row_log_table_apply_insert_low(
1954 thr, row, trx_id, offsets_heap, heap, dup);
1955 } else {
1956 /* Some BLOBs are missing, so we are interpreting
1957 this ROW_T_UPDATE as ROW_T_DELETE (see *1).
1958 Because the record was not found, we do nothing. */
1959 ut_ad(error == DB_MISSING_HISTORY);
1960 error = DB_SUCCESS;
1961 func_exit:
1962 mtr_commit(&mtr);
1963 }
1964 func_exit_committed:
1965 ut_ad(mtr.state == MTR_COMMITTED);
1966
1967 if (error != DB_SUCCESS) {
1968 /* Report the erroneous row using the new
1969 version of the table. */
1970 innobase_row_to_mysql(dup->table, log->table, row);
1971 }
1972
1973 return(error);
1974 }
1975
1976 /* Prepare to update (or delete) the record. */
1977 ulint* cur_offsets = rec_get_offsets(
1978 btr_pcur_get_rec(&pcur),
1979 index, NULL, ULINT_UNDEFINED, &offsets_heap);
1980
1981 if (!log->same_pk) {
1982 /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
1983 was buffered. */
1984 ulint len;
1985 const void* rec_trx_id
1986 = rec_get_nth_field(btr_pcur_get_rec(&pcur),
1987 cur_offsets, index->n_uniq, &len);
1988 ut_ad(len == DATA_TRX_ID_LEN);
1989 ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len
1990 == DATA_TRX_ID_LEN);
1991 ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len
1992 == DATA_ROLL_PTR_LEN);
1993 ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>(
1994 dtuple_get_nth_field(old_pk,
1995 index->n_uniq)->data)
1996 == dtuple_get_nth_field(old_pk,
1997 index->n_uniq + 1)->data);
1998 if (memcmp(rec_trx_id,
1999 dtuple_get_nth_field(old_pk, index->n_uniq)->data,
2000 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
2001 /* The ROW_T_UPDATE was logged for a different
2002 DB_TRX_ID,DB_ROLL_PTR. This is possible if an
2003 earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
2004 because some BLOBs were missing due to rolling
2005 back the initial insert or due to purging
2006 the old BLOB values of an update. */
2007 ut_ad(log->blobs);
2008 if (error != DB_SUCCESS) {
2009 ut_ad(error == DB_MISSING_HISTORY);
2010 /* Some BLOBs are missing, so we are
2011 interpreting this ROW_T_UPDATE as
2012 ROW_T_DELETE (see *1).
2013 Because this is a different row,
2014 we will do nothing. */
2015 error = DB_SUCCESS;
2016 } else {
2017 /* Because the user record is missing due to
2018 BLOBs that were missing when processing
2019 an earlier log record, we should
2020 interpret the ROW_T_UPDATE as ROW_T_INSERT.
2021 However, there is a different user record
2022 with the same PRIMARY KEY value already. */
2023 error = DB_DUPLICATE_KEY;
2024 }
2025
2026 goto func_exit;
2027 }
2028 }
2029
2030 if (error != DB_SUCCESS) {
2031 ut_ad(error == DB_MISSING_HISTORY);
2032 ut_ad(log->blobs);
2033 /* Some BLOBs are missing, so we are interpreting
2034 this ROW_T_UPDATE as ROW_T_DELETE (see *1). */
2035 error = row_log_table_apply_delete_low(
2036 &pcur, cur_offsets, NULL, heap, &mtr);
2037 goto func_exit_committed;
2038 }
2039
2040 dtuple_t* entry = row_build_index_entry(
2041 row, NULL, index, heap);
2042 const upd_t* update = row_upd_build_difference_binary(
2043 index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
2044 false, NULL, heap);
2045
2046 if (!update->n_fields) {
2047 /* Nothing to do. */
2048 goto func_exit;
2049 }
2050
2051 const bool pk_updated
2052 = upd_get_nth_field(update, 0)->field_no < new_trx_id_col;
2053
2054 if (pk_updated || rec_offs_any_extern(cur_offsets)) {
2055 /* If the record contains any externally stored
2056 columns, perform the update by delete and insert,
2057 because we will not write any undo log that would
2058 allow purge to free any orphaned externally stored
2059 columns. */
2060
2061 if (pk_updated && log->same_pk) {
2062 /* The ROW_T_UPDATE log record should only be
2063 written when the PRIMARY KEY fields of the
2064 record did not change in the old table. We
2065 can only get a change of PRIMARY KEY columns
2066 in the rebuilt table if the PRIMARY KEY was
2067 redefined (!same_pk). */
2068 ut_ad(0);
2069 error = DB_CORRUPTION;
2070 goto func_exit;
2071 }
2072
2073 error = row_log_table_apply_delete_low(
2074 &pcur, cur_offsets, NULL, heap, &mtr);
2075 ut_ad(mtr.state == MTR_COMMITTED);
2076
2077 if (error == DB_SUCCESS) {
2078 error = row_log_table_apply_insert_low(
2079 thr, row, trx_id, offsets_heap, heap, dup);
2080 }
2081
2082 goto func_exit_committed;
2083 }
2084
2085 dtuple_t* old_row;
2086 row_ext_t* old_ext;
2087
2088 if (dict_table_get_next_index(index)) {
2089 /* Construct the row corresponding to the old value of
2090 the record. */
2091 old_row = row_build(
2092 ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
2093 cur_offsets, NULL, NULL, NULL, &old_ext, heap);
2094 ut_ad(old_row);
2095 #ifdef ROW_LOG_APPLY_PRINT
2096 if (row_log_apply_print) {
2097 fprintf(stderr, "table apply update "
2098 IB_ID_FMT " " IB_ID_FMT "\n",
2099 index->table->id, index->id);
2100 dtuple_print(stderr, old_row);
2101 dtuple_print(stderr, row);
2102 }
2103 #endif /* ROW_LOG_APPLY_PRINT */
2104 } else {
2105 old_row = NULL;
2106 old_ext = NULL;
2107 }
2108
2109 big_rec_t* big_rec;
2110
2111 error = btr_cur_pessimistic_update(
2112 BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2113 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
2114 | BTR_KEEP_POS_FLAG,
2115 btr_pcur_get_btr_cur(&pcur),
2116 &cur_offsets, &offsets_heap, heap, &big_rec,
2117 update, 0, thr, 0, &mtr);
2118
2119 if (big_rec) {
2120 if (error == DB_SUCCESS) {
2121 error = btr_store_big_rec_extern_fields(
2122 index, btr_pcur_get_block(&pcur),
2123 btr_pcur_get_rec(&pcur), cur_offsets,
2124 big_rec, &mtr, BTR_STORE_UPDATE);
2125 }
2126
2127 dtuple_big_rec_free(big_rec);
2128 }
2129
2130 while ((index = dict_table_get_next_index(index)) != NULL) {
2131 if (error != DB_SUCCESS) {
2132 break;
2133 }
2134
2135 n_index++;
2136
2137 if (index->type & DICT_FTS) {
2138 continue;
2139 }
2140
2141 if (!row_upd_changes_ord_field_binary(
2142 index, update, thr, old_row, NULL)) {
2143 continue;
2144 }
2145
2146 mtr_commit(&mtr);
2147
2148 entry = row_build_index_entry(old_row, old_ext, index, heap);
2149 if (!entry) {
2150 ut_ad(0);
2151 return(DB_CORRUPTION);
2152 }
2153
2154 mtr_start(&mtr);
2155
2156 if (ROW_FOUND != row_search_index_entry(
2157 index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
2158 ut_ad(0);
2159 error = DB_CORRUPTION;
2160 break;
2161 }
2162
2163 btr_cur_pessimistic_delete(
2164 &error, FALSE, btr_pcur_get_btr_cur(&pcur),
2165 BTR_CREATE_FLAG, RB_NONE, &mtr);
2166
2167 if (error != DB_SUCCESS) {
2168 break;
2169 }
2170
2171 mtr_commit(&mtr);
2172
2173 entry = row_build_index_entry(row, NULL, index, heap);
2174 error = row_ins_sec_index_entry_low(
2175 BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2176 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
2177 BTR_MODIFY_TREE, index, offsets_heap, heap,
2178 entry, trx_id, thr);
2179
2180 /* Report correct index name for duplicate key error. */
2181 if (error == DB_DUPLICATE_KEY) {
2182 thr_get_trx(thr)->error_key_num = n_index;
2183 }
2184
2185 mtr_start(&mtr);
2186 }
2187
2188 goto func_exit;
2189 }
2190
2191 /******************************************************//**
2192 Applies an operation to a table that was rebuilt.
2193 @return NULL on failure (mrec corruption) or when out of data;
2194 pointer to next record on success */
2195 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2196 const mrec_t*
row_log_table_apply_op(que_thr_t * thr,ulint trx_id_col,ulint new_trx_id_col,row_merge_dup_t * dup,dberr_t * error,mem_heap_t * offsets_heap,mem_heap_t * heap,const mrec_t * mrec,const mrec_t * mrec_end,ulint * offsets)2197 row_log_table_apply_op(
2198 /*===================*/
2199 que_thr_t* thr, /*!< in: query graph */
2200 ulint trx_id_col, /*!< in: position of
2201 DB_TRX_ID in old index */
2202 ulint new_trx_id_col, /*!< in: position of
2203 DB_TRX_ID in new index */
2204 row_merge_dup_t* dup, /*!< in/out: for reporting
2205 duplicate key errors */
2206 dberr_t* error, /*!< out: DB_SUCCESS
2207 or error code */
2208 mem_heap_t* offsets_heap, /*!< in/out: memory heap
2209 that can be emptied */
2210 mem_heap_t* heap, /*!< in/out: memory heap */
2211 const mrec_t* mrec, /*!< in: merge record */
2212 const mrec_t* mrec_end, /*!< in: end of buffer */
2213 ulint* offsets) /*!< in/out: work area
2214 for parsing mrec */
2215 {
2216 row_log_t* log = dup->index->online_log;
2217 dict_index_t* new_index = dict_table_get_first_index(log->table);
2218 ulint extra_size;
2219 const mrec_t* next_mrec;
2220 dtuple_t* old_pk;
2221 row_ext_t* ext;
2222 ulint ext_size;
2223
2224 ut_ad(dict_index_is_clust(dup->index));
2225 ut_ad(dup->index->table != log->table);
2226 ut_ad(log->head.total <= log->tail.total);
2227
2228 *error = DB_SUCCESS;
2229
2230 /* 3 = 1 (op type) + 1 (ext_size) + at least 1 byte payload */
2231 if (mrec + 3 >= mrec_end) {
2232 return(NULL);
2233 }
2234
2235 const mrec_t* const mrec_start = mrec;
2236
2237 switch (*mrec++) {
2238 default:
2239 ut_ad(0);
2240 *error = DB_CORRUPTION;
2241 return(NULL);
2242 case ROW_T_INSERT:
2243 extra_size = *mrec++;
2244
2245 if (extra_size >= 0x80) {
2246 /* Read another byte of extra_size. */
2247
2248 extra_size = (extra_size & 0x7f) << 8;
2249 extra_size |= *mrec++;
2250 }
2251
2252 mrec += extra_size;
2253
2254 if (mrec > mrec_end) {
2255 return(NULL);
2256 }
2257
2258 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2259 rec_init_offsets_temp(mrec, dup->index, offsets);
2260
2261 next_mrec = mrec + rec_offs_data_size(offsets);
2262
2263 if (next_mrec > mrec_end) {
2264 return(NULL);
2265 } else {
2266 log->head.total += next_mrec - mrec_start;
2267
2268 ulint len;
2269 const byte* db_trx_id
2270 = rec_get_nth_field(
2271 mrec, offsets, trx_id_col, &len);
2272 ut_ad(len == DATA_TRX_ID_LEN);
2273 *error = row_log_table_apply_insert(
2274 thr, mrec, offsets, offsets_heap,
2275 heap, dup, trx_read_trx_id(db_trx_id));
2276 }
2277 break;
2278
2279 case ROW_T_DELETE:
2280 /* 1 (extra_size) + 4 (ext_size) + at least 1 (payload) */
2281 if (mrec + 6 >= mrec_end) {
2282 return(NULL);
2283 }
2284
2285 extra_size = *mrec++;
2286 ext_size = mach_read_from_4(mrec);
2287 mrec += 4;
2288 ut_ad(mrec < mrec_end);
2289
2290 /* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
2291 For fixed-length PRIMARY key columns, it is 0. */
2292 mrec += extra_size;
2293
2294 rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
2295 rec_init_offsets_temp(mrec, new_index, offsets);
2296 next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
2297 if (next_mrec > mrec_end) {
2298 return(NULL);
2299 }
2300
2301 log->head.total += next_mrec - mrec_start;
2302
2303 /* If there are external fields, retrieve those logged
2304 prefix info and reconstruct the row_ext_t */
2305 if (ext_size) {
2306 /* We use memcpy to avoid unaligned
2307 access on some non-x86 platforms.*/
2308 ext = static_cast<row_ext_t*>(
2309 mem_heap_dup(heap,
2310 mrec + rec_offs_data_size(offsets),
2311 ext_size));
2312
2313 byte* ext_start = reinterpret_cast<byte*>(ext);
2314
2315 ulint ext_len = sizeof(*ext)
2316 + (ext->n_ext - 1) * sizeof ext->len;
2317
2318 ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
2319 ext_len += ext->n_ext * sizeof(*ext->ext);
2320
2321 ext->buf = static_cast<byte*>(ext_start + ext_len);
2322 } else {
2323 ext = NULL;
2324 }
2325
2326 *error = row_log_table_apply_delete(
2327 thr, new_trx_id_col,
2328 mrec, offsets, offsets_heap, heap,
2329 log, ext);
2330 break;
2331
2332 case ROW_T_UPDATE:
2333 /* Logically, the log entry consists of the
2334 (PRIMARY KEY,DB_TRX_ID) of the old value (converted
2335 to the new primary key definition) followed by
2336 the new value in the old table definition. If the
2337 definition of the columns belonging to PRIMARY KEY
2338 is not changed, the log will only contain
2339 DB_TRX_ID,new_row. */
2340
2341 if (dup->index->online_log->same_pk) {
2342 ut_ad(new_index->n_uniq == dup->index->n_uniq);
2343
2344 extra_size = *mrec++;
2345
2346 if (extra_size >= 0x80) {
2347 /* Read another byte of extra_size. */
2348
2349 extra_size = (extra_size & 0x7f) << 8;
2350 extra_size |= *mrec++;
2351 }
2352
2353 mrec += extra_size;
2354
2355 if (mrec > mrec_end) {
2356 return(NULL);
2357 }
2358
2359 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2360 rec_init_offsets_temp(mrec, dup->index, offsets);
2361
2362 next_mrec = mrec + rec_offs_data_size(offsets);
2363
2364 if (next_mrec > mrec_end) {
2365 return(NULL);
2366 }
2367
2368 old_pk = dtuple_create(heap, new_index->n_uniq);
2369 dict_index_copy_types(
2370 old_pk, new_index, old_pk->n_fields);
2371
2372 /* Copy the PRIMARY KEY fields from mrec to old_pk. */
2373 for (ulint i = 0; i < new_index->n_uniq; i++) {
2374 const void* field;
2375 ulint len;
2376 dfield_t* dfield;
2377
2378 ut_ad(!rec_offs_nth_extern(offsets, i));
2379
2380 field = rec_get_nth_field(
2381 mrec, offsets, i, &len);
2382 ut_ad(len != UNIV_SQL_NULL);
2383
2384 dfield = dtuple_get_nth_field(old_pk, i);
2385 dfield_set_data(dfield, field, len);
2386 }
2387 } else {
2388 /* We assume extra_size < 0x100
2389 for the PRIMARY KEY prefix. */
2390 mrec += *mrec + 1;
2391
2392 if (mrec > mrec_end) {
2393 return(NULL);
2394 }
2395
2396 /* Get offsets for PRIMARY KEY,
2397 DB_TRX_ID, DB_ROLL_PTR. */
2398 rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
2399 rec_init_offsets_temp(mrec, new_index, offsets);
2400
2401 next_mrec = mrec + rec_offs_data_size(offsets);
2402 if (next_mrec + 2 > mrec_end) {
2403 return(NULL);
2404 }
2405
2406 /* Copy the PRIMARY KEY fields and
2407 DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
2408 old_pk = dtuple_create(heap, new_index->n_uniq + 2);
2409 dict_index_copy_types(old_pk, new_index,
2410 old_pk->n_fields);
2411
2412 for (ulint i = 0;
2413 i < dict_index_get_n_unique(new_index) + 2;
2414 i++) {
2415 const void* field;
2416 ulint len;
2417 dfield_t* dfield;
2418
2419 ut_ad(!rec_offs_nth_extern(offsets, i));
2420
2421 field = rec_get_nth_field(
2422 mrec, offsets, i, &len);
2423 ut_ad(len != UNIV_SQL_NULL);
2424
2425 dfield = dtuple_get_nth_field(old_pk, i);
2426 dfield_set_data(dfield, field, len);
2427 }
2428
2429 mrec = next_mrec;
2430
2431 /* Fetch the new value of the row as it was
2432 in the old table definition. */
2433 extra_size = *mrec++;
2434
2435 if (extra_size >= 0x80) {
2436 /* Read another byte of extra_size. */
2437
2438 extra_size = (extra_size & 0x7f) << 8;
2439 extra_size |= *mrec++;
2440 }
2441
2442 mrec += extra_size;
2443
2444 if (mrec > mrec_end) {
2445 return(NULL);
2446 }
2447
2448 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2449 rec_init_offsets_temp(mrec, dup->index, offsets);
2450
2451 next_mrec = mrec + rec_offs_data_size(offsets);
2452
2453 if (next_mrec > mrec_end) {
2454 return(NULL);
2455 }
2456 }
2457
2458 ut_ad(next_mrec <= mrec_end);
2459 log->head.total += next_mrec - mrec_start;
2460 dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
2461
2462 {
2463 ulint len;
2464 const byte* db_trx_id
2465 = rec_get_nth_field(
2466 mrec, offsets, trx_id_col, &len);
2467 ut_ad(len == DATA_TRX_ID_LEN);
2468 *error = row_log_table_apply_update(
2469 thr, new_trx_id_col,
2470 mrec, offsets, offsets_heap,
2471 heap, dup, trx_read_trx_id(db_trx_id), old_pk);
2472 }
2473
2474 break;
2475 }
2476
2477 ut_ad(log->head.total <= log->tail.total);
2478 mem_heap_empty(offsets_heap);
2479 mem_heap_empty(heap);
2480 return(next_mrec);
2481 }
2482
2483 /******************************************************//**
2484 Applies operations to a table was rebuilt.
2485 @return DB_SUCCESS, or error code on failure */
2486 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2487 dberr_t
row_log_table_apply_ops(que_thr_t * thr,row_merge_dup_t * dup)2488 row_log_table_apply_ops(
2489 /*====================*/
2490 que_thr_t* thr, /*!< in: query graph */
2491 row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key
2492 errors */
2493 {
2494 dberr_t error;
2495 const mrec_t* mrec = NULL;
2496 const mrec_t* next_mrec;
2497 const mrec_t* mrec_end = NULL; /* silence bogus warning */
2498 const mrec_t* next_mrec_end;
2499 mem_heap_t* heap;
2500 mem_heap_t* offsets_heap;
2501 ulint* offsets;
2502 bool has_index_lock;
2503 dict_index_t* index = const_cast<dict_index_t*>(
2504 dup->index);
2505 dict_table_t* new_table = index->online_log->table;
2506 dict_index_t* new_index = dict_table_get_first_index(
2507 new_table);
2508 const ulint i = 1 + REC_OFFS_HEADER_SIZE
2509 + ut_max(dict_index_get_n_fields(index),
2510 dict_index_get_n_unique(new_index) + 2);
2511 const ulint trx_id_col = dict_col_get_clust_pos(
2512 dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
2513 const ulint new_trx_id_col = dict_col_get_clust_pos(
2514 dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
2515 trx_t* trx = thr_get_trx(thr);
2516
2517 ut_ad(dict_index_is_clust(index));
2518 ut_ad(dict_index_is_online_ddl(index));
2519 ut_ad(trx->mysql_thd);
2520 #ifdef UNIV_SYNC_DEBUG
2521 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2522 #endif /* UNIV_SYNC_DEBUG */
2523 ut_ad(!dict_index_is_online_ddl(new_index));
2524 ut_ad(trx_id_col > 0);
2525 ut_ad(trx_id_col != ULINT_UNDEFINED);
2526 ut_ad(new_trx_id_col > 0);
2527 ut_ad(new_trx_id_col != ULINT_UNDEFINED);
2528
2529 UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
2530
2531 offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
2532 offsets[0] = i;
2533 offsets[1] = dict_index_get_n_fields(index);
2534
2535 heap = mem_heap_create(UNIV_PAGE_SIZE);
2536 offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
2537 has_index_lock = true;
2538
2539 next_block:
2540 ut_ad(has_index_lock);
2541 #ifdef UNIV_SYNC_DEBUG
2542 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2543 #endif /* UNIV_SYNC_DEBUG */
2544 ut_ad(index->online_log->head.bytes == 0);
2545
2546 if (trx_is_interrupted(trx)) {
2547 goto interrupted;
2548 }
2549
2550 if (dict_index_is_corrupted(index)) {
2551 error = DB_INDEX_CORRUPT;
2552 goto func_exit;
2553 }
2554
2555 ut_ad(dict_index_is_online_ddl(index));
2556
2557 error = index->online_log->error;
2558
2559 if (error != DB_SUCCESS) {
2560 goto func_exit;
2561 }
2562
2563 if (UNIV_UNLIKELY(index->online_log->head.blocks
2564 > index->online_log->tail.blocks)) {
2565 unexpected_eof:
2566 fprintf(stderr, "InnoDB: unexpected end of temporary file"
2567 " for table %s\n", index->table_name);
2568 corruption:
2569 error = DB_CORRUPTION;
2570 goto func_exit;
2571 }
2572
2573 if (index->online_log->head.blocks
2574 == index->online_log->tail.blocks) {
2575 if (index->online_log->head.blocks) {
2576 #ifdef HAVE_FTRUNCATE
2577 /* Truncate the file in order to save space. */
2578 if (index->online_log->fd != -1
2579 && ftruncate(index->online_log->fd, 0) == -1) {
2580 perror("ftruncate");
2581 }
2582 #endif /* HAVE_FTRUNCATE */
2583 index->online_log->head.blocks
2584 = index->online_log->tail.blocks = 0;
2585 }
2586
2587 next_mrec = index->online_log->tail.block;
2588 next_mrec_end = next_mrec + index->online_log->tail.bytes;
2589
2590 if (next_mrec_end == next_mrec) {
2591 /* End of log reached. */
2592 all_done:
2593 ut_ad(has_index_lock);
2594 ut_ad(index->online_log->head.blocks == 0);
2595 ut_ad(index->online_log->tail.blocks == 0);
2596 index->online_log->head.bytes = 0;
2597 index->online_log->tail.bytes = 0;
2598 error = DB_SUCCESS;
2599 goto func_exit;
2600 }
2601 } else {
2602 os_offset_t ofs;
2603 ibool success;
2604
2605 ofs = (os_offset_t) index->online_log->head.blocks
2606 * srv_sort_buf_size;
2607
2608 ut_ad(has_index_lock);
2609 has_index_lock = false;
2610 rw_lock_x_unlock(dict_index_get_lock(index));
2611
2612 log_free_check();
2613
2614 ut_ad(dict_index_is_online_ddl(index));
2615
2616 if (!row_log_block_allocate(index->online_log->head)) {
2617 error = DB_OUT_OF_MEMORY;
2618 goto func_exit;
2619 }
2620
2621 success = os_file_read_no_error_handling_int_fd(
2622 index->online_log->fd,
2623 index->online_log->head.block, ofs,
2624 srv_sort_buf_size);
2625 if (!success) {
2626 fprintf(stderr, "InnoDB: unable to read temporary file"
2627 " for table %s\n", index->table_name);
2628 goto corruption;
2629 }
2630
2631 #ifdef POSIX_FADV_DONTNEED
2632 /* Each block is read exactly once. Free up the file cache. */
2633 posix_fadvise(index->online_log->fd,
2634 ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
2635 #endif /* POSIX_FADV_DONTNEED */
2636 #if 0 //def FALLOC_FL_PUNCH_HOLE
2637 /* Try to deallocate the space for the file on disk.
2638 This should work on ext4 on Linux 2.6.39 and later,
2639 and be ignored when the operation is unsupported. */
2640 fallocate(index->online_log->fd,
2641 FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
2642 ofs, srv_buf_size);
2643 #endif /* FALLOC_FL_PUNCH_HOLE */
2644
2645 next_mrec = index->online_log->head.block;
2646 next_mrec_end = next_mrec + srv_sort_buf_size;
2647 }
2648
2649 /* This read is not protected by index->online_log->mutex for
2650 performance reasons. We will eventually notice any error that
2651 was flagged by a DML thread. */
2652 error = index->online_log->error;
2653
2654 if (error != DB_SUCCESS) {
2655 goto func_exit;
2656 }
2657
2658 if (mrec) {
2659 /* A partial record was read from the previous block.
2660 Copy the temporary buffer full, as we do not know the
2661 length of the record. Parse subsequent records from
2662 the bigger buffer index->online_log->head.block
2663 or index->online_log->tail.block. */
2664
2665 ut_ad(mrec == index->online_log->head.buf);
2666 ut_ad(mrec_end > mrec);
2667 ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
2668
2669 memcpy((mrec_t*) mrec_end, next_mrec,
2670 (&index->online_log->head.buf)[1] - mrec_end);
2671 mrec = row_log_table_apply_op(
2672 thr, trx_id_col, new_trx_id_col,
2673 dup, &error, offsets_heap, heap,
2674 index->online_log->head.buf,
2675 (&index->online_log->head.buf)[1], offsets);
2676 if (error != DB_SUCCESS) {
2677 goto func_exit;
2678 } else if (UNIV_UNLIKELY(mrec == NULL)) {
2679 /* The record was not reassembled properly. */
2680 goto corruption;
2681 }
2682 /* The record was previously found out to be
2683 truncated. Now that the parse buffer was extended,
2684 it should proceed beyond the old end of the buffer. */
2685 ut_a(mrec > mrec_end);
2686
2687 index->online_log->head.bytes = mrec - mrec_end;
2688 next_mrec += index->online_log->head.bytes;
2689 }
2690
2691 ut_ad(next_mrec <= next_mrec_end);
2692 /* The following loop must not be parsing the temporary
2693 buffer, but head.block or tail.block. */
2694
2695 /* mrec!=NULL means that the next record starts from the
2696 middle of the block */
2697 ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
2698
2699 #ifdef UNIV_DEBUG
2700 if (next_mrec_end == index->online_log->head.block
2701 + srv_sort_buf_size) {
2702 /* If tail.bytes == 0, next_mrec_end can also be at
2703 the end of tail.block. */
2704 if (index->online_log->tail.bytes == 0) {
2705 ut_ad(next_mrec == next_mrec_end);
2706 ut_ad(index->online_log->tail.blocks == 0);
2707 ut_ad(index->online_log->head.blocks == 0);
2708 ut_ad(index->online_log->head.bytes == 0);
2709 } else {
2710 ut_ad(next_mrec == index->online_log->head.block
2711 + index->online_log->head.bytes);
2712 ut_ad(index->online_log->tail.blocks
2713 > index->online_log->head.blocks);
2714 }
2715 } else if (next_mrec_end == index->online_log->tail.block
2716 + index->online_log->tail.bytes) {
2717 ut_ad(next_mrec == index->online_log->tail.block
2718 + index->online_log->head.bytes);
2719 ut_ad(index->online_log->tail.blocks == 0);
2720 ut_ad(index->online_log->head.blocks == 0);
2721 ut_ad(index->online_log->head.bytes
2722 <= index->online_log->tail.bytes);
2723 } else {
2724 ut_error;
2725 }
2726 #endif /* UNIV_DEBUG */
2727
2728 mrec_end = next_mrec_end;
2729
2730 while (!trx_is_interrupted(trx)) {
2731 mrec = next_mrec;
2732 ut_ad(mrec <= mrec_end);
2733
2734 if (mrec == mrec_end) {
2735 /* We are at the end of the log.
2736 Mark the replay all_done. */
2737 if (has_index_lock) {
2738 goto all_done;
2739 }
2740 }
2741
2742 if (!has_index_lock) {
2743 /* We are applying operations from a different
2744 block than the one that is being written to.
2745 We do not hold index->lock in order to
2746 allow other threads to concurrently buffer
2747 modifications. */
2748 ut_ad(mrec >= index->online_log->head.block);
2749 ut_ad(mrec_end == index->online_log->head.block
2750 + srv_sort_buf_size);
2751 ut_ad(index->online_log->head.bytes
2752 < srv_sort_buf_size);
2753
2754 /* Take the opportunity to do a redo log
2755 checkpoint if needed. */
2756 log_free_check();
2757 } else {
2758 /* We are applying operations from the last block.
2759 Do not allow other threads to buffer anything,
2760 so that we can finally catch up and synchronize. */
2761 ut_ad(index->online_log->head.blocks == 0);
2762 ut_ad(index->online_log->tail.blocks == 0);
2763 ut_ad(mrec_end == index->online_log->tail.block
2764 + index->online_log->tail.bytes);
2765 ut_ad(mrec >= index->online_log->tail.block);
2766 }
2767
2768 /* This read is not protected by index->online_log->mutex
2769 for performance reasons. We will eventually notice any
2770 error that was flagged by a DML thread. */
2771 error = index->online_log->error;
2772
2773 if (error != DB_SUCCESS) {
2774 goto func_exit;
2775 }
2776
2777 next_mrec = row_log_table_apply_op(
2778 thr, trx_id_col, new_trx_id_col,
2779 dup, &error, offsets_heap, heap,
2780 mrec, mrec_end, offsets);
2781
2782 if (error != DB_SUCCESS) {
2783 goto func_exit;
2784 } else if (next_mrec == next_mrec_end) {
2785 /* The record happened to end on a block boundary.
2786 Do we have more blocks left? */
2787 if (has_index_lock) {
2788 /* The index will be locked while
2789 applying the last block. */
2790 goto all_done;
2791 }
2792
2793 mrec = NULL;
2794 process_next_block:
2795 rw_lock_x_lock(dict_index_get_lock(index));
2796 has_index_lock = true;
2797
2798 index->online_log->head.bytes = 0;
2799 index->online_log->head.blocks++;
2800 goto next_block;
2801 } else if (next_mrec != NULL) {
2802 ut_ad(next_mrec < next_mrec_end);
2803 index->online_log->head.bytes += next_mrec - mrec;
2804 } else if (has_index_lock) {
2805 /* When mrec is within tail.block, it should
2806 be a complete record, because we are holding
2807 index->lock and thus excluding the writer. */
2808 ut_ad(index->online_log->tail.blocks == 0);
2809 ut_ad(mrec_end == index->online_log->tail.block
2810 + index->online_log->tail.bytes);
2811 ut_ad(0);
2812 goto unexpected_eof;
2813 } else {
2814 memcpy(index->online_log->head.buf, mrec,
2815 mrec_end - mrec);
2816 mrec_end += index->online_log->head.buf - mrec;
2817 mrec = index->online_log->head.buf;
2818 goto process_next_block;
2819 }
2820 }
2821
2822 interrupted:
2823 error = DB_INTERRUPTED;
2824 func_exit:
2825 if (!has_index_lock) {
2826 rw_lock_x_lock(dict_index_get_lock(index));
2827 }
2828
2829 mem_heap_free(offsets_heap);
2830 mem_heap_free(heap);
2831 row_log_block_free(index->online_log->head);
2832 ut_free(offsets);
2833 return(error);
2834 }
2835
2836 /******************************************************//**
2837 Apply the row_log_table log to a table upon completing rebuild.
2838 @return DB_SUCCESS, or error code on failure */
2839 UNIV_INTERN
2840 dberr_t
row_log_table_apply(que_thr_t * thr,dict_table_t * old_table,struct TABLE * table)2841 row_log_table_apply(
2842 /*================*/
2843 que_thr_t* thr, /*!< in: query graph */
2844 dict_table_t* old_table,
2845 /*!< in: old table */
2846 struct TABLE* table) /*!< in/out: MySQL table
2847 (for reporting duplicates) */
2848 {
2849 dberr_t error;
2850 dict_index_t* clust_index;
2851
2852 thr_get_trx(thr)->error_key_num = 0;
2853
2854 #ifdef UNIV_SYNC_DEBUG
2855 ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
2856 #endif /* UNIV_SYNC_DEBUG */
2857 clust_index = dict_table_get_first_index(old_table);
2858
2859 rw_lock_x_lock(dict_index_get_lock(clust_index));
2860
2861 if (!clust_index->online_log) {
2862 ut_ad(dict_index_get_online_status(clust_index)
2863 == ONLINE_INDEX_COMPLETE);
2864 /* This function should not be called unless
2865 rebuilding a table online. Build in some fault
2866 tolerance. */
2867 ut_ad(0);
2868 error = DB_ERROR;
2869 } else {
2870 row_merge_dup_t dup = {
2871 clust_index, table,
2872 clust_index->online_log->col_map, 0
2873 };
2874
2875 error = row_log_table_apply_ops(thr, &dup);
2876
2877 ut_ad(error != DB_SUCCESS
2878 || clust_index->online_log->head.total
2879 == clust_index->online_log->tail.total);
2880 }
2881
2882 rw_lock_x_unlock(dict_index_get_lock(clust_index));
2883 return(error);
2884 }
2885
2886 /******************************************************//**
2887 Allocate the row log for an index and flag the index
2888 for online creation.
2889 @retval true if success, false if not */
2890 UNIV_INTERN
2891 bool
row_log_allocate(dict_index_t * index,dict_table_t * table,bool same_pk,const dtuple_t * add_cols,const ulint * col_map,const char * path)2892 row_log_allocate(
2893 /*=============*/
2894 dict_index_t* index, /*!< in/out: index */
2895 dict_table_t* table, /*!< in/out: new table being rebuilt,
2896 or NULL when creating a secondary index */
2897 bool same_pk,/*!< in: whether the definition of the
2898 PRIMARY KEY has remained the same */
2899 const dtuple_t* add_cols,
2900 /*!< in: default values of
2901 added columns, or NULL */
2902 const ulint* col_map,/*!< in: mapping of old column
2903 numbers to new ones, or NULL if !table */
2904 const char* path) /*!< in: where to create temporary file */
2905 {
2906 row_log_t* log;
2907 DBUG_ENTER("row_log_allocate");
2908
2909 ut_ad(!dict_index_is_online_ddl(index));
2910 ut_ad(dict_index_is_clust(index) == !!table);
2911 ut_ad(!table || index->table != table);
2912 ut_ad(same_pk || table);
2913 ut_ad(!table || col_map);
2914 ut_ad(!add_cols || col_map);
2915 #ifdef UNIV_SYNC_DEBUG
2916 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2917 #endif /* UNIV_SYNC_DEBUG */
2918 log = (row_log_t*) ut_malloc(sizeof *log);
2919 if (!log) {
2920 DBUG_RETURN(false);
2921 }
2922
2923 log->fd = -1;
2924 mutex_create(index_online_log_key, &log->mutex,
2925 SYNC_INDEX_ONLINE_LOG);
2926 log->blobs = NULL;
2927 log->table = table;
2928 log->same_pk = same_pk;
2929 log->add_cols = add_cols;
2930 log->col_map = col_map;
2931 log->error = DB_SUCCESS;
2932 log->max_trx = 0;
2933 log->tail.blocks = log->tail.bytes = 0;
2934 log->tail.total = 0;
2935 log->tail.block = log->head.block = NULL;
2936 log->head.blocks = log->head.bytes = 0;
2937 log->head.total = 0;
2938 log->path = path;
2939 dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
2940 index->online_log = log;
2941
2942 /* While we might be holding an exclusive data dictionary lock
2943 here, in row_log_abort_sec() we will not always be holding it. Use
2944 atomic operations in both cases. */
2945 MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
2946
2947 DBUG_RETURN(true);
2948 }
2949
2950 /******************************************************//**
2951 Free the row log for an index that was being created online. */
2952 UNIV_INTERN
2953 void
row_log_free(row_log_t * & log)2954 row_log_free(
2955 /*=========*/
2956 row_log_t*& log) /*!< in,own: row log */
2957 {
2958 MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
2959
2960 delete log->blobs;
2961 row_log_block_free(log->tail);
2962 row_log_block_free(log->head);
2963 row_merge_file_destroy_low(log->fd);
2964 mutex_free(&log->mutex);
2965 ut_free(log);
2966 log = 0;
2967 }
2968
2969 /******************************************************//**
2970 Get the latest transaction ID that has invoked row_log_online_op()
2971 during online creation.
2972 @return latest transaction ID, or 0 if nothing was logged */
2973 UNIV_INTERN
2974 trx_id_t
row_log_get_max_trx(dict_index_t * index)2975 row_log_get_max_trx(
2976 /*================*/
2977 dict_index_t* index) /*!< in: index, must be locked */
2978 {
2979 ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
2980 #ifdef UNIV_SYNC_DEBUG
2981 ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
2982 && mutex_own(&index->online_log->mutex))
2983 || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2984 #endif /* UNIV_SYNC_DEBUG */
2985 return(index->online_log->max_trx);
2986 }
2987
/******************************************************//**
Applies a single logged operation (insert or delete) to a secondary
index that was being created online. Searches for the entry in the
index and performs the operation, first optimistically and, if that
fails, pessimistically after re-latching the tree. Duplicate key
errors are reported via dup; other errors via *error. */
static MY_ATTRIBUTE((nonnull))
void
row_log_apply_op_low(
/*=================*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	enum row_op	op,		/*!< in: operation being applied */
	trx_id_t	trx_id,		/*!< in: transaction identifier */
	const dtuple_t*	entry)		/*!< in: row */
{
	mtr_t		mtr;
	btr_cur_t	cursor;
	ulint*		offsets = NULL;

	/* Online index creation is only used for secondary indexes. */
	ut_ad(!dict_index_is_clust(index));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
	      == has_index_lock);
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(!dict_index_is_corrupted(index));
	/* Only ROW_OP_DELETE records are logged without a
	transaction id (see the parsing in row_log_apply_op()). */
	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);

	mtr_start(&mtr);

	/* We perform the pessimistic variant of the operations if we
	already hold index->lock exclusively. First, search the
	record. The operation may already have been performed,
	depending on when the row in the clustered index was
	scanned. */
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    has_index_lock
				    ? BTR_MODIFY_TREE
				    : BTR_MODIFY_LEAF,
				    &cursor, 0, __FILE__, __LINE__,
				    &mtr);

	ut_ad(dict_index_get_n_unique(index) > 0);
	/* This test is somewhat similar to row_ins_must_modify_rec(),
	but not identical for unique secondary indexes. */
	if (cursor.low_match >= dict_index_get_n_unique(index)
	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
		/* We have a matching record.  A match on all fields
		means this exact entry exists; a match on only the
		unique prefix can happen in a unique index when the
		appended PRIMARY KEY fields differ. */
		bool	exists	= (cursor.low_match
				   == dict_index_get_n_fields(index));
#ifdef UNIV_DEBUG
		rec_t*	rec	= btr_cur_get_rec(&cursor);
		ut_ad(page_rec_is_user_rec(rec));
		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
#endif /* UNIV_DEBUG */

		ut_ad(exists || dict_index_is_unique(index));

		switch (op) {
		case ROW_OP_DELETE:
			if (!exists) {
				/* The existing record matches the
				unique secondary index key, but the
				PRIMARY KEY columns differ. So, this
				exact record does not exist. For
				example, we could detect a duplicate
				key error in some old index before
				logging an ROW_OP_INSERT for our
				index. This ROW_OP_DELETE could have
				been logged for rolling back
				TRX_UNDO_INSERT_REC. */
				goto func_exit;
			}

			/* Try the cheap single-page delete first. */
			if (btr_cur_optimistic_delete(
				    &cursor, BTR_CREATE_FLAG, &mtr)) {
				*error = DB_SUCCESS;
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);

				/* No other thread than the current one
				is allowed to modify the index tree.
				Thus, the record should still exist. */
				ut_ad(cursor.low_match
				      >= dict_index_get_n_fields(index));
				ut_ad(page_rec_is_user_rec(
					      btr_cur_get_rec(&cursor)));
			}

			/* As there are no externally stored fields in
			a secondary index record, the parameter
			rb_ctx = RB_NONE will be ignored. */

			btr_cur_pessimistic_delete(
				error, FALSE, &cursor,
				BTR_CREATE_FLAG, RB_NONE, &mtr);
			break;
		case ROW_OP_INSERT:
			if (exists) {
				/* The record already exists. There
				is nothing to be inserted.
				This could happen when processing
				TRX_UNDO_DEL_MARK_REC in statement
				rollback:

				UPDATE of PRIMARY KEY can lead to
				statement rollback if the updated
				value of the PRIMARY KEY already
				exists. In this case, the UPDATE would
				be mapped to DELETE;INSERT, and we
				only wrote undo log for the DELETE
				part. The duplicate key error would be
				triggered before logging the INSERT
				part.

				Theoretically, we could also get a
				similar situation when a DELETE operation
				is blocked by a FOREIGN KEY constraint. */
				goto func_exit;
			}

			if (dtuple_contains_null(entry)) {
				/* The UNIQUE KEY columns match, but
				there is a NULL value in the key, and
				NULL!=NULL. */
				goto insert_the_rec;
			}

			goto duplicate;
		}
	} else {
		switch (op) {
			rec_t*		rec;
			big_rec_t*	big_rec;
		case ROW_OP_DELETE:
			/* The record does not exist. For example, we
			could detect a duplicate key error in some old
			index before logging an ROW_OP_INSERT for our
			index. This ROW_OP_DELETE could be logged for
			rolling back TRX_UNDO_INSERT_REC. */
			goto func_exit;
		case ROW_OP_INSERT:
			if (dict_index_is_unique(index)
			    && (cursor.up_match
				>= dict_index_get_n_unique(index)
				|| cursor.low_match
				>= dict_index_get_n_unique(index))
			    && (!index->n_nullable
				|| !dtuple_contains_null(entry))) {
duplicate:
				/* Duplicate key */
				ut_ad(dict_index_is_unique(index));
				row_merge_dup_report(dup, entry->fields);
				*error = DB_DUPLICATE_KEY;
				goto func_exit;
			}
insert_the_rec:
			/* Insert the record. As we are inserting into
			a secondary index, there cannot be externally
			stored columns (!big_rec). */
			*error = btr_cur_optimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec, 0, NULL, &mtr);
			ut_ad(!big_rec);
			/* DB_FAIL means the page was full; retry
			pessimistically below.  Any other status is
			final. */
			if (*error != DB_FAIL) {
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);
			}

			/* We already determined that the
			record did not exist. No other thread
			than the current one is allowed to
			modify the index tree. Thus, the
			record should still not exist. */

			*error = btr_cur_pessimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec,
				0, NULL, &mtr);
			ut_ad(!big_rec);
			break;
		}
		mem_heap_empty(offsets_heap);
	}

	if (*error == DB_SUCCESS && trx_id) {
		/* Record the latest modifying transaction on the page,
		as a regular insert would. */
		page_update_max_trx_id(btr_cur_get_block(&cursor),
				       btr_cur_get_page_zip(&cursor),
				       trx_id, &mtr);
	}

func_exit:
	mtr_commit(&mtr);
}
3212
/******************************************************//**
Parses one merge record from the row log and applies it to a
secondary index that was being created.  The record layout parsed
here is: one operation byte; for ROW_OP_INSERT, a DATA_TRX_ID_LEN
transaction id; a 1- or 2-byte extra_size; then extra_size bytes of
record header followed by the record data.
@return NULL on failure (mrec corruption) or when out of data;
pointer to next record on success */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const mrec_t*
row_log_apply_op(
/*=============*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap for
					allocating data tuples */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	const mrec_t*	mrec,		/*!< in: merge record */
	const mrec_t*	mrec_end,	/*!< in: end of buffer */
	ulint*		offsets)	/*!< in/out: work area for
					rec_init_offsets_temp() */

{
	enum row_op	op;
	ulint		extra_size;
	ulint		data_size;
	ulint		n_ext;
	dtuple_t*	entry;
	trx_id_t	trx_id;

	/* Online index creation is only used for secondary indexes. */
	ut_ad(!dict_index_is_clust(index));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
	      == has_index_lock);
#endif /* UNIV_SYNC_DEBUG */

	if (dict_index_is_corrupted(index)) {
		*error = DB_INDEX_CORRUPT;
		return(NULL);
	}

	*error = DB_SUCCESS;

	/* NOTE: returning NULL with *error == DB_SUCCESS means the
	record is truncated at mrec_end; the caller reassembles it
	from the next block.  NULL with *error set means corruption. */
	if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
		return(NULL);
	}

	switch (*mrec) {
	case ROW_OP_INSERT:
		if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
			return(NULL);
		}

		op = static_cast<enum row_op>(*mrec++);
		trx_id = trx_read_trx_id(mrec);
		mrec += DATA_TRX_ID_LEN;
		break;
	case ROW_OP_DELETE:
		/* Deletes are logged without a transaction id. */
		op = static_cast<enum row_op>(*mrec++);
		trx_id = 0;
		break;
	default:
corrupted:
		ut_ad(0);
		*error = DB_CORRUPTION;
		return(NULL);
	}

	extra_size = *mrec++;

	ut_ad(mrec < mrec_end);

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *mrec++;
	}

	mrec += extra_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	/* mrec now points at the record origin (end of the extra
	bytes, start of the data bytes). */
	rec_init_offsets_temp(mrec, index, offsets);

	if (rec_offs_any_extern(offsets)) {
		/* There should never be any externally stored fields
		in a secondary index, which is what online index
		creation is used for. Therefore, the log file must be
		corrupted. */
		goto corrupted;
	}

	data_size = rec_offs_data_size(offsets);

	mrec += data_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	entry = row_rec_to_index_entry_low(
		mrec - data_size, index, offsets, &n_ext, heap);
	/* Online index creation is only implemented for secondary
	indexes, which never contain off-page columns. */
	ut_ad(n_ext == 0);
#ifdef ROW_LOG_APPLY_PRINT
	if (row_log_apply_print) {
		fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ",
			index->id, trx_id,
			unsigned (op), unsigned (has_index_lock));
		for (const byte* m = mrec - data_size; m < mrec; m++) {
			fprintf(stderr, "%02x", *m);
		}
		putc('\n', stderr);
	}
#endif /* ROW_LOG_APPLY_PRINT */
	row_log_apply_op_low(index, dup, error, offsets_heap,
			     has_index_lock, op, trx_id, entry);
	return(mrec);
}
3338
/******************************************************//**
Applies operations to a secondary index that was being created.
Drains the row log block by block: blocks already flushed to the
temporary file are read back and applied without holding
index->lock (so concurrent DML can keep appending to the log),
while the final in-memory tail block is applied with the lock held
exclusively so that the log can be drained completely.  A record
that straddles a block boundary is reassembled in head.buf.
@return DB_SUCCESS, or error code on failure */
static MY_ATTRIBUTE((nonnull))
dberr_t
row_log_apply_ops(
/*==============*/
	trx_t*		trx,	/*!< in: transaction (for checking if
				the operation was interrupted) */
	dict_index_t*	index,	/*!< in/out: index */
	row_merge_dup_t*dup)	/*!< in/out: for reporting duplicate key
				errors */
{
	dberr_t		error;
	const mrec_t*	mrec	= NULL;
	const mrec_t*	next_mrec;
	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
	const mrec_t*	next_mrec_end;
	mem_heap_t*	offsets_heap;
	mem_heap_t*	heap;
	ulint*		offsets;
	bool		has_index_lock;
	/* Size of the rec_get_offsets() work area for one record. */
	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
		+ dict_index_get_n_fields(index);

	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(*index->name == TEMP_INDEX_PREFIX);
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(index->online_log);
	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);

	offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
	offsets[0] = i;
	offsets[1] = dict_index_get_n_fields(index);

	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
	heap = mem_heap_create(UNIV_PAGE_SIZE);
	has_index_lock = true;

next_block:
	/* Invariant: index->lock is held exclusively whenever we are
	(re)positioning to the start of a block. */
	ut_ad(has_index_lock);
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(index->online_log->head.bytes == 0);

	if (trx_is_interrupted(trx)) {
		goto interrupted;
	}

	/* An error noticed by the log writer (e.g. log too big)
	aborts the apply. */
	error = index->online_log->error;
	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (dict_index_is_corrupted(index)) {
		error = DB_INDEX_CORRUPT;
		goto func_exit;
	}

	if (UNIV_UNLIKELY(index->online_log->head.blocks
			  > index->online_log->tail.blocks)) {
unexpected_eof:
		fprintf(stderr, "InnoDB: unexpected end of temporary file"
			" for index %s\n", index->name + 1);
corruption:
		error = DB_CORRUPTION;
		goto func_exit;
	}

	if (index->online_log->head.blocks
	    == index->online_log->tail.blocks) {
		/* We have caught up with the writer: only the
		in-memory tail block remains to be applied. */
		if (index->online_log->head.blocks) {
#ifdef HAVE_FTRUNCATE
			/* Truncate the file in order to save space. */
			if (index->online_log->fd != -1
			    && ftruncate(index->online_log->fd, 0) == -1) {
				perror("ftruncate");
			}
#endif /* HAVE_FTRUNCATE */
			index->online_log->head.blocks
				= index->online_log->tail.blocks = 0;
		}

		next_mrec = index->online_log->tail.block;
		next_mrec_end = next_mrec + index->online_log->tail.bytes;

		if (next_mrec_end == next_mrec) {
			/* End of log reached. */
all_done:
			ut_ad(has_index_lock);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			error = DB_SUCCESS;
			goto func_exit;
		}
	} else {
		os_offset_t	ofs;
		ibool		success;

		ofs = (os_offset_t) index->online_log->head.blocks
			* srv_sort_buf_size;

		/* Release the lock while reading a full block from
		the temporary file, so that concurrent DML can keep
		logging into the tail block. */
		ut_ad(has_index_lock);
		has_index_lock = false;
		rw_lock_x_unlock(dict_index_get_lock(index));

		log_free_check();

		if (!row_log_block_allocate(index->online_log->head)) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		success = os_file_read_no_error_handling_int_fd(
			index->online_log->fd,
			index->online_log->head.block, ofs,
			srv_sort_buf_size);

		if (!success) {
			fprintf(stderr, "InnoDB: unable to read temporary file"
				" for index %s\n", index->name + 1);
			goto corruption;
		}

#ifdef POSIX_FADV_DONTNEED
		/* Each block is read exactly once. Free up the file cache. */
		posix_fadvise(index->online_log->fd,
			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */
#if 0 //def FALLOC_FL_PUNCH_HOLE
		/* Try to deallocate the space for the file on disk.
		This should work on ext4 on Linux 2.6.39 and later,
		and be ignored when the operation is unsupported. */
		fallocate(index->online_log->fd,
			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
			  ofs, srv_buf_size);
#endif /* FALLOC_FL_PUNCH_HOLE */

		next_mrec = index->online_log->head.block;
		next_mrec_end = next_mrec + srv_sort_buf_size;
	}

	if (mrec) {
		/* A partial record was read from the previous block.
		Copy the temporary buffer full, as we do not know the
		length of the record. Parse subsequent records from
		the bigger buffer index->online_log->head.block
		or index->online_log->tail.block. */

		ut_ad(mrec == index->online_log->head.buf);
		ut_ad(mrec_end > mrec);
		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);

		memcpy((mrec_t*) mrec_end, next_mrec,
		       (&index->online_log->head.buf)[1] - mrec_end);
		mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, index->online_log->head.buf,
			(&index->online_log->head.buf)[1], offsets);
		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (UNIV_UNLIKELY(mrec == NULL)) {
			/* The record was not reassembled properly. */
			goto corruption;
		}
		/* The record was previously found out to be
		truncated. Now that the parse buffer was extended,
		it should proceed beyond the old end of the buffer. */
		ut_a(mrec > mrec_end);

		/* Skip the bytes of the reassembled record that were
		taken from the start of the new block. */
		index->online_log->head.bytes = mrec - mrec_end;
		next_mrec += index->online_log->head.bytes;
	}

	ut_ad(next_mrec <= next_mrec_end);
	/* The following loop must not be parsing the temporary
	buffer, but head.block or tail.block. */

	/* mrec!=NULL means that the next record starts from the
	middle of the block */
	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));

#ifdef UNIV_DEBUG
	if (next_mrec_end == index->online_log->head.block
	    + srv_sort_buf_size) {
		/* If tail.bytes == 0, next_mrec_end can also be at
		the end of tail.block. */
		if (index->online_log->tail.bytes == 0) {
			ut_ad(next_mrec == next_mrec_end);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->head.bytes == 0);
		} else {
			ut_ad(next_mrec == index->online_log->head.block
			      + index->online_log->head.bytes);
			ut_ad(index->online_log->tail.blocks
			      > index->online_log->head.blocks);
		}
	} else if (next_mrec_end == index->online_log->tail.block
		   + index->online_log->tail.bytes) {
		ut_ad(next_mrec == index->online_log->tail.block
		      + index->online_log->head.bytes);
		ut_ad(index->online_log->tail.blocks == 0);
		ut_ad(index->online_log->head.blocks == 0);
		ut_ad(index->online_log->head.bytes
		      <= index->online_log->tail.bytes);
	} else {
		ut_error;
	}
#endif /* UNIV_DEBUG */

	mrec_end = next_mrec_end;

	/* Apply records one by one until the block is exhausted, a
	record turns out to be truncated, or an error occurs. */
	while (!trx_is_interrupted(trx)) {
		mrec = next_mrec;
		ut_ad(mrec < mrec_end);

		if (!has_index_lock) {
			/* We are applying operations from a different
			block than the one that is being written to.
			We do not hold index->lock in order to
			allow other threads to concurrently buffer
			modifications. */
			ut_ad(mrec >= index->online_log->head.block);
			ut_ad(mrec_end == index->online_log->head.block
			      + srv_sort_buf_size);
			ut_ad(index->online_log->head.bytes
			      < srv_sort_buf_size);

			/* Take the opportunity to do a redo log
			checkpoint if needed. */
			log_free_check();
		} else {
			/* We are applying operations from the last block.
			Do not allow other threads to buffer anything,
			so that we can finally catch up and synchronize. */
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(mrec >= index->online_log->tail.block);
		}

		next_mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, mrec, mrec_end, offsets);

		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (next_mrec == next_mrec_end) {
			/* The record happened to end on a block boundary.
			Do we have more blocks left? */
			if (has_index_lock) {
				/* The index will be locked while
				applying the last block. */
				goto all_done;
			}

			mrec = NULL;
process_next_block:
			rw_lock_x_lock(dict_index_get_lock(index));
			has_index_lock = true;

			index->online_log->head.bytes = 0;
			index->online_log->head.blocks++;
			goto next_block;
		} else if (next_mrec != NULL) {
			ut_ad(next_mrec < next_mrec_end);
			index->online_log->head.bytes += next_mrec - mrec;
		} else if (has_index_lock) {
			/* When mrec is within tail.block, it should
			be a complete record, because we are holding
			index->lock and thus excluding the writer. */
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(0);
			goto unexpected_eof;
		} else {
			/* The record is truncated at the end of this
			block.  Stash the partial bytes in head.buf and
			reassemble after reading the next block. */
			memcpy(index->online_log->head.buf, mrec,
			       mrec_end - mrec);
			mrec_end += index->online_log->head.buf - mrec;
			mrec = index->online_log->head.buf;
			goto process_next_block;
		}
	}

interrupted:
	error = DB_INTERRUPTED;
func_exit:
	/* Reacquire index->lock; the caller expects it held. */
	if (!has_index_lock) {
		rw_lock_x_lock(dict_index_get_lock(index));
	}

	switch (error) {
	case DB_SUCCESS:
		break;
	case DB_INDEX_CORRUPT:
		if (((os_offset_t) index->online_log->tail.blocks + 1)
		    * srv_sort_buf_size >= srv_online_max_size) {
			/* The log file grew too big. */
			error = DB_ONLINE_LOG_TOO_BIG;
		}
		/* fall through */
	default:
		/* We set the flag directly instead of invoking
		dict_set_corrupted_index_cache_only(index) here,
		because the index is not "public" yet. */
		index->type |= DICT_CORRUPT;
	}

	mem_heap_free(heap);
	mem_heap_free(offsets_heap);
	row_log_block_free(index->online_log->head);
	ut_free(offsets);
	return(error);
}
3659
3660 /******************************************************//**
3661 Apply the row log to the index upon completing index creation.
3662 @return DB_SUCCESS, or error code on failure */
3663 UNIV_INTERN
3664 dberr_t
row_log_apply(trx_t * trx,dict_index_t * index,struct TABLE * table)3665 row_log_apply(
3666 /*==========*/
3667 trx_t* trx, /*!< in: transaction (for checking if
3668 the operation was interrupted) */
3669 dict_index_t* index, /*!< in/out: secondary index */
3670 struct TABLE* table) /*!< in/out: MySQL table
3671 (for reporting duplicates) */
3672 {
3673 dberr_t error;
3674 row_log_t* log;
3675 row_merge_dup_t dup = { index, table, NULL, 0 };
3676 DBUG_ENTER("row_log_apply");
3677
3678 ut_ad(dict_index_is_online_ddl(index));
3679 ut_ad(!dict_index_is_clust(index));
3680
3681 log_free_check();
3682
3683 rw_lock_x_lock(dict_index_get_lock(index));
3684
3685 if (!dict_table_is_corrupted(index->table)) {
3686 error = row_log_apply_ops(trx, index, &dup);
3687 } else {
3688 error = DB_SUCCESS;
3689 }
3690
3691 if (error != DB_SUCCESS) {
3692 ut_a(!dict_table_is_discarded(index->table));
3693 /* We set the flag directly instead of invoking
3694 dict_set_corrupted_index_cache_only(index) here,
3695 because the index is not "public" yet. */
3696 index->type |= DICT_CORRUPT;
3697 index->table->drop_aborted = TRUE;
3698
3699 dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
3700 } else {
3701 ut_ad(dup.n_dup == 0);
3702 dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
3703 }
3704
3705 log = index->online_log;
3706 index->online_log = NULL;
3707 /* We could remove the TEMP_INDEX_PREFIX and update the data
3708 dictionary to say that this index is complete, if we had
3709 access to the .frm file here. If the server crashes before
3710 all requested indexes have been created, this completed index
3711 will be dropped. */
3712 rw_lock_x_unlock(dict_index_get_lock(index));
3713
3714 row_log_free(log);
3715
3716 DBUG_RETURN(error);
3717 }
3718