1 /*****************************************************************************
2
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #include "row0row.h"
44 #ifdef HAVE_LZO
45 #include "lzo/lzo1x.h"
46 #endif
47 #ifdef HAVE_SNAPPY
48 #include "snappy-c.h"
49 #endif
50
51 #include "scope.h"
52
53 #include <vector>
54
55 #ifdef HAVE_MY_AES_H
56 #include <my_aes.h>
57 #endif
58
59 using st_::span;
60
/** Number of pages that fit into the 1 MiB buffer that is used for IO.
@param n	physical page size in bytes
@return number of pages of size n that fit into one mebibyte */
#define IO_BUFFER_SIZE(n)	((1 << 20) / (n))
65
66 /** For gathering stats on records during phase I */
67 struct row_stats_t {
68 ulint m_n_deleted; /*!< Number of deleted records
69 found in the index */
70
71 ulint m_n_purged; /*!< Number of records purged
72 optimisatically */
73
74 ulint m_n_rows; /*!< Number of rows */
75
76 ulint m_n_purge_failed; /*!< Number of deleted rows
77 that could not be purged */
78 };
79
80 /** Index information required by IMPORT. */
81 struct row_index_t {
82 index_id_t m_id; /*!< Index id of the table
83 in the exporting server */
84 byte* m_name; /*!< Index name */
85
86 ulint m_space; /*!< Space where it is placed */
87
88 ulint m_page_no; /*!< Root page number */
89
90 ulint m_type; /*!< Index type */
91
92 ulint m_trx_id_offset; /*!< Relevant only for clustered
93 indexes, offset of transaction
94 id system column */
95
96 ulint m_n_user_defined_cols; /*!< User defined columns */
97
98 ulint m_n_uniq; /*!< Number of columns that can
99 uniquely identify the row */
100
101 ulint m_n_nullable; /*!< Number of nullable
102 columns */
103
104 ulint m_n_fields; /*!< Total number of fields */
105
106 dict_field_t* m_fields; /*!< Index fields */
107
108 const dict_index_t*
109 m_srv_index; /*!< Index instance in the
110 importing server */
111
112 row_stats_t m_stats; /*!< Statistics gathered during
113 the import phase */
114
115 };
116
117 /** Meta data required by IMPORT. */
118 struct row_import {
row_importrow_import119 row_import() UNIV_NOTHROW
120 :
121 m_table(NULL),
122 m_version(0),
123 m_hostname(NULL),
124 m_table_name(NULL),
125 m_autoinc(0),
126 m_zip_size(0),
127 m_flags(0),
128 m_n_cols(0),
129 m_cols(NULL),
130 m_col_names(NULL),
131 m_n_indexes(0),
132 m_indexes(NULL),
133 m_missing(true) { }
134
135 ~row_import() UNIV_NOTHROW;
136
137 /** Find the index entry in in the indexes array.
138 @param name index name
139 @return instance if found else 0. */
140 row_index_t* get_index(const char* name) const UNIV_NOTHROW;
141
142 /** Get the number of rows in the index.
143 @param name index name
144 @return number of rows (doesn't include delete marked rows). */
145 ulint get_n_rows(const char* name) const UNIV_NOTHROW;
146
147 /** Find the ordinal value of the column name in the cfg table columns.
148 @param name of column to look for.
149 @return ULINT_UNDEFINED if not found. */
150 ulint find_col(const char* name) const UNIV_NOTHROW;
151
152 /** Get the number of rows for which purge failed during the
153 convert phase.
154 @param name index name
155 @return number of rows for which purge failed. */
156 ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;
157
158 /** Check if the index is clean. ie. no delete-marked records
159 @param name index name
160 @return true if index needs to be purged. */
requires_purgerow_import161 bool requires_purge(const char* name) const UNIV_NOTHROW
162 {
163 return(get_n_purge_failed(name) > 0);
164 }
165
166 /** Set the index root <space, pageno> using the index name */
167 void set_root_by_name() UNIV_NOTHROW;
168
169 /** Set the index root <space, pageno> using a heuristic
170 @return DB_SUCCESS or error code */
171 dberr_t set_root_by_heuristic() UNIV_NOTHROW;
172
173 /** Check if the index schema that was read from the .cfg file
174 matches the in memory index definition.
175 Note: It will update row_import_t::m_srv_index to map the meta-data
176 read from the .cfg file to the server index instance.
177 @return DB_SUCCESS or error code. */
178 dberr_t match_index_columns(
179 THD* thd,
180 const dict_index_t* index) UNIV_NOTHROW;
181
182 /** Check if the table schema that was read from the .cfg file
183 matches the in memory table definition.
184 @param thd MySQL session variable
185 @return DB_SUCCESS or error code. */
186 dberr_t match_table_columns(
187 THD* thd) UNIV_NOTHROW;
188
189 /** Check if the table (and index) schema that was read from the
190 .cfg file matches the in memory table definition.
191 @param thd MySQL session variable
192 @return DB_SUCCESS or error code. */
193 dberr_t match_schema(
194 THD* thd) UNIV_NOTHROW;
195
196 dberr_t match_flags(THD *thd) const ;
197
198
199 dict_table_t* m_table; /*!< Table instance */
200
201 ulint m_version; /*!< Version of config file */
202
203 byte* m_hostname; /*!< Hostname where the
204 tablespace was exported */
205 byte* m_table_name; /*!< Exporting instance table
206 name */
207
208 ib_uint64_t m_autoinc; /*!< Next autoinc value */
209
210 ulint m_zip_size; /*!< ROW_FORMAT=COMPRESSED
211 page size, or 0 */
212
213 ulint m_flags; /*!< Table flags */
214
215 ulint m_n_cols; /*!< Number of columns in the
216 meta-data file */
217
218 dict_col_t* m_cols; /*!< Column data */
219
220 byte** m_col_names; /*!< Column names, we store the
221 column naems separately becuase
222 there is no field to store the
223 value in dict_col_t */
224
225 ulint m_n_indexes; /*!< Number of indexes,
226 including clustered index */
227
228 row_index_t* m_indexes; /*!< Index meta data */
229
230 bool m_missing; /*!< true if a .cfg file was
231 found and was readable */
232 };
233
234 struct fil_iterator_t {
235 pfs_os_file_t file; /*!< File handle */
236 const char* filepath; /*!< File path name */
237 os_offset_t start; /*!< From where to start */
238 os_offset_t end; /*!< Where to stop */
239 os_offset_t file_size; /*!< File size in bytes */
240 ulint n_io_buffers; /*!< Number of pages to use
241 for IO */
242 byte* io_buffer; /*!< Buffer to use for IO */
243 fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */
244 byte* crypt_io_buffer; /*!< IO buffer when encrypted */
245 };
246
247 /** Use the page cursor to iterate over records in a block. */
248 class RecIterator {
249 public:
250 /** Default constructor */
RecIterator()251 RecIterator() UNIV_NOTHROW
252 {
253 memset(&m_cur, 0x0, sizeof(m_cur));
254 /* Make page_cur_delete_rec() happy. */
255 m_mtr.start();
256 m_mtr.set_log_mode(MTR_LOG_NO_REDO);
257 }
258
259 /** Position the cursor on the first user record. */
open(buf_block_t * block)260 void open(buf_block_t* block) UNIV_NOTHROW
261 {
262 page_cur_set_before_first(block, &m_cur);
263
264 if (!end()) {
265 next();
266 }
267 }
268
269 /** Move to the next record. */
next()270 void next() UNIV_NOTHROW
271 {
272 page_cur_move_to_next(&m_cur);
273 }
274
275 /**
276 @return the current record */
current()277 rec_t* current() UNIV_NOTHROW
278 {
279 ut_ad(!end());
280 return(page_cur_get_rec(&m_cur));
281 }
282
current_block() const283 buf_block_t* current_block() const { return m_cur.block; }
284
285 /**
286 @return true if cursor is at the end */
end()287 bool end() UNIV_NOTHROW
288 {
289 return(page_cur_is_after_last(&m_cur) == TRUE);
290 }
291
292 /** Remove the current record
293 @return true on success */
remove(const dict_index_t * index,rec_offs * offsets)294 bool remove(
295 const dict_index_t* index,
296 rec_offs* offsets) UNIV_NOTHROW
297 {
298 ut_ad(page_is_leaf(m_cur.block->frame));
299 /* We can't end up with an empty page unless it is root. */
300 if (page_get_n_recs(m_cur.block->frame) <= 1) {
301 return(false);
302 }
303
304 if (!rec_offs_any_extern(offsets)
305 && m_cur.block->page.id().page_no() != index->page
306 && ((page_get_data_size(m_cur.block->frame)
307 - rec_offs_size(offsets)
308 < BTR_CUR_PAGE_COMPRESS_LIMIT(index))
309 || !page_has_siblings(m_cur.block->frame)
310 || (page_get_n_recs(m_cur.block->frame) < 2))) {
311 return false;
312 }
313
314 #ifdef UNIV_ZIP_DEBUG
315 page_zip_des_t* page_zip = buf_block_get_page_zip(m_cur.block);
316 ut_a(!page_zip || page_zip_validate(
317 page_zip, m_cur.block->frame, index));
318 #endif /* UNIV_ZIP_DEBUG */
319
320 page_cur_delete_rec(&m_cur, index, offsets, &m_mtr);
321
322 #ifdef UNIV_ZIP_DEBUG
323 ut_a(!page_zip || page_zip_validate(
324 page_zip, m_cur.block->frame, index));
325 #endif /* UNIV_ZIP_DEBUG */
326
327 return true;
328 }
329
330 private:
331 page_cur_t m_cur;
332 public:
333 mtr_t m_mtr;
334 };
335
336 /** Class that purges delete marked records from indexes, both secondary
337 and cluster. It does a pessimistic delete. This should only be done if we
338 couldn't purge the delete marked reocrds during Phase I. */
339 class IndexPurge {
340 public:
341 /** Constructor
342 @param trx the user transaction covering the import tablespace
343 @param index to be imported
344 @param space_id space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)345 IndexPurge(
346 trx_t* trx,
347 dict_index_t* index) UNIV_NOTHROW
348 :
349 m_trx(trx),
350 m_index(index),
351 m_n_rows(0)
352 {
353 ib::info() << "Phase II - Purge records from index "
354 << index->name;
355 }
356
357 /** Descructor */
~IndexPurge()358 ~IndexPurge() UNIV_NOTHROW { }
359
360 /** Purge delete marked records.
361 @return DB_SUCCESS or error code. */
362 dberr_t garbage_collect() UNIV_NOTHROW;
363
364 /** The number of records that are not delete marked.
365 @return total records in the index after purge */
get_n_rows() const366 ulint get_n_rows() const UNIV_NOTHROW
367 {
368 return(m_n_rows);
369 }
370
371 private:
372 /** Begin import, position the cursor on the first record. */
373 void open() UNIV_NOTHROW;
374
375 /** Close the persistent curosr and commit the mini-transaction. */
376 void close() UNIV_NOTHROW;
377
378 /** Position the cursor on the next record.
379 @return DB_SUCCESS or error code */
380 dberr_t next() UNIV_NOTHROW;
381
382 /** Store the persistent cursor position and reopen the
383 B-tree cursor in BTR_MODIFY_TREE mode, because the
384 tree structure may be changed during a pessimistic delete. */
385 void purge_pessimistic_delete() UNIV_NOTHROW;
386
387 /** Purge delete-marked records.
388 @param offsets current row offsets. */
389 void purge() UNIV_NOTHROW;
390
391 protected:
392 // Disable copying
393 IndexPurge();
394 IndexPurge(const IndexPurge&);
395 IndexPurge &operator=(const IndexPurge&);
396
397 private:
398 trx_t* m_trx; /*!< User transaction */
399 mtr_t m_mtr; /*!< Mini-transaction */
400 btr_pcur_t m_pcur; /*!< Persistent cursor */
401 dict_index_t* m_index; /*!< Index to be processed */
402 ulint m_n_rows; /*!< Records in index */
403 };
404
405 /** Functor that is called for each physical page that is read from the
406 tablespace file. */
407 class AbstractCallback
408 {
409 public:
410 /** Constructor
411 @param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)412 AbstractCallback(trx_t* trx, ulint space_id)
413 :
414 m_zip_size(0),
415 m_trx(trx),
416 m_space(space_id),
417 m_xdes(),
418 m_xdes_page_no(ULINT_UNDEFINED),
419 m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
420
421 /** Free any extent descriptor instance */
~AbstractCallback()422 virtual ~AbstractCallback()
423 {
424 UT_DELETE_ARRAY(m_xdes);
425 }
426
427 /** Determine the page size to use for traversing the tablespace
428 @param file_size size of the tablespace file in bytes
429 @param block contents of the first page in the tablespace file.
430 @retval DB_SUCCESS or error code. */
431 virtual dberr_t init(
432 os_offset_t file_size,
433 const buf_block_t* block) UNIV_NOTHROW;
434
435 /** @return true if compressed table. */
is_compressed_table() const436 bool is_compressed_table() const UNIV_NOTHROW
437 {
438 return get_zip_size();
439 }
440
441 /** @return the tablespace flags */
get_space_flags() const442 ulint get_space_flags() const
443 {
444 return(m_space_flags);
445 }
446
447 /**
448 Set the name of the physical file and the file handle that is used
449 to open it for the file that is being iterated over.
450 @param filename the physical name of the tablespace file
451 @param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)452 void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
453 {
454 m_file = file;
455 m_filepath = filename;
456 }
457
get_zip_size() const458 ulint get_zip_size() const { return m_zip_size; }
physical_size() const459 ulint physical_size() const
460 {
461 return m_zip_size ? m_zip_size : srv_page_size;
462 }
463
filename() const464 const char* filename() const { return m_filepath; }
465
466 /**
467 Called for every page in the tablespace. If the page was not
468 updated then its state must be set to BUF_PAGE_NOT_USED. For
469 compressed tables the page descriptor memory will be at offset:
470 block->frame + srv_page_size;
471 @param block block read from file, note it is not from the buffer pool
472 @retval DB_SUCCESS or error code. */
473 virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
474
475 /** @return the tablespace identifier */
get_space_id() const476 ulint get_space_id() const { return m_space; }
477
is_interrupted() const478 bool is_interrupted() const { return trx_is_interrupted(m_trx); }
479
480 /**
481 Get the data page depending on the table type, compressed or not.
482 @param block - block read from disk
483 @retval the buffer frame */
get_frame(const buf_block_t * block)484 static byte* get_frame(const buf_block_t* block)
485 {
486 return block->page.zip.data
487 ? block->page.zip.data : block->frame;
488 }
489
490 /** Invoke the functionality for the callback */
491 virtual dberr_t run(const fil_iterator_t& iter,
492 buf_block_t* block) UNIV_NOTHROW = 0;
493
494 protected:
495 /** Get the physical offset of the extent descriptor within the page.
496 @param page_no page number of the extent descriptor
497 @param page contents of the page containing the extent descriptor.
498 @return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const499 const xdes_t* xdes(
500 ulint page_no,
501 const page_t* page) const UNIV_NOTHROW
502 {
503 ulint offset;
504
505 offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
506
507 return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
508 }
509
510 /** Set the current page directory (xdes). If the extent descriptor is
511 marked as free then free the current extent descriptor and set it to
512 0. This implies that all pages that are covered by this extent
513 descriptor are also freed.
514
515 @param page_no offset of page within the file
516 @param page page contents
517 @return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)518 dberr_t set_current_xdes(
519 ulint page_no,
520 const page_t* page) UNIV_NOTHROW
521 {
522 m_xdes_page_no = page_no;
523
524 UT_DELETE_ARRAY(m_xdes);
525 m_xdes = NULL;
526
527 if (mach_read_from_4(XDES_ARR_OFFSET + XDES_STATE + page)
528 != XDES_FREE) {
529 const ulint physical_size = m_zip_size
530 ? m_zip_size : srv_page_size;
531
532 m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t, physical_size);
533
534 /* Trigger OOM */
535 DBUG_EXECUTE_IF(
536 "ib_import_OOM_13",
537 UT_DELETE_ARRAY(m_xdes);
538 m_xdes = NULL;
539 );
540
541 if (m_xdes == NULL) {
542 return(DB_OUT_OF_MEMORY);
543 }
544
545 memcpy(m_xdes, page, physical_size);
546 }
547
548 return(DB_SUCCESS);
549 }
550
551 /** Check if the page is marked as free in the extent descriptor.
552 @param page_no page number to check in the extent descriptor.
553 @return true if the page is marked as free */
is_free(uint32_t page_no) const554 bool is_free(uint32_t page_no) const UNIV_NOTHROW
555 {
556 ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
557 == m_xdes_page_no);
558
559 if (m_xdes != 0) {
560 const xdes_t* xdesc = xdes(page_no, m_xdes);
561 ulint pos = page_no % FSP_EXTENT_SIZE;
562
563 return xdes_is_free(xdesc, pos);
564 }
565
566 /* If the current xdes was free, the page must be free. */
567 return(true);
568 }
569
570 protected:
571 /** The ROW_FORMAT=COMPRESSED page size, or 0. */
572 ulint m_zip_size;
573
574 /** File handle to the tablespace */
575 pfs_os_file_t m_file;
576
577 /** Physical file path. */
578 const char* m_filepath;
579
580 /** Covering transaction. */
581 trx_t* m_trx;
582
583 /** Space id of the file being iterated over. */
584 ulint m_space;
585
586 /** Current size of the space in pages */
587 ulint m_size;
588
589 /** Current extent descriptor page */
590 xdes_t* m_xdes;
591
592 /** Physical page offset in the file of the extent descriptor */
593 ulint m_xdes_page_no;
594
595 /** Flags value read from the header page */
596 ulint m_space_flags;
597 };
598
599 /** Determine the page size to use for traversing the tablespace
600 @param file_size size of the tablespace file in bytes
601 @param block contents of the first page in the tablespace file.
602 @retval DB_SUCCESS or error code. */
603 dberr_t
init(os_offset_t file_size,const buf_block_t * block)604 AbstractCallback::init(
605 os_offset_t file_size,
606 const buf_block_t* block) UNIV_NOTHROW
607 {
608 const page_t* page = block->frame;
609
610 m_space_flags = fsp_header_get_flags(page);
611 if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
612 ulint cflags = fsp_flags_convert_from_101(m_space_flags);
613 if (cflags == ULINT_UNDEFINED) {
614 return(DB_CORRUPTION);
615 }
616 m_space_flags = cflags;
617 }
618
619 /* Clear the DATA_DIR flag, which is basically garbage. */
620 m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
621 m_zip_size = fil_space_t::zip_size(m_space_flags);
622 const ulint logical_size = fil_space_t::logical_size(m_space_flags);
623 const ulint physical_size = fil_space_t::physical_size(m_space_flags);
624
625 if (logical_size != srv_page_size) {
626
627 ib::error() << "Page size " << logical_size
628 << " of ibd file is not the same as the server page"
629 " size " << srv_page_size;
630
631 return(DB_CORRUPTION);
632
633 } else if (file_size & (physical_size - 1)) {
634
635 ib::error() << "File size " << file_size << " is not a"
636 " multiple of the page size "
637 << physical_size;
638
639 return(DB_CORRUPTION);
640 }
641
642 m_size = mach_read_from_4(page + FSP_SIZE);
643 if (m_space == ULINT_UNDEFINED) {
644 m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
645 + page);
646 }
647
648 return set_current_xdes(0, page);
649 }
650
651 /**
652 TODO: This can be made parallel trivially by chunking up the file
653 and creating a callback per thread.. Main benefit will be to use
654 multiple CPUs for checksums and compressed tables. We have to do
655 compressed tables block by block right now. Secondly we need to
656 decompress/compress and copy too much of data. These are
657 CPU intensive.
658
659 Iterate over all the pages in the tablespace.
660 @param iter - Tablespace iterator
661 @param block - block to use for IO
662 @param callback - Callback to inspect and update page contents
663 @retval DB_SUCCESS or error code */
664 static dberr_t fil_iterate(
665 const fil_iterator_t& iter,
666 buf_block_t* block,
667 AbstractCallback& callback);
668
669 /**
670 Try and determine the index root pages by checking if the next/prev
671 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
672 struct FetchIndexRootPages : public AbstractCallback {
673
674 /** Index information gathered from the .ibd file. */
675 struct Index {
676
IndexFetchIndexRootPages::Index677 Index(index_id_t id, ulint page_no)
678 :
679 m_id(id),
680 m_page_no(page_no) { }
681
682 index_id_t m_id; /*!< Index id */
683 ulint m_page_no; /*!< Root page number */
684 };
685
686 /** Constructor
687 @param trx covering (user) transaction
688 @param table table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages689 FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
690 :
691 AbstractCallback(trx, ULINT_UNDEFINED),
692 m_table(table), m_index(0, 0) UNIV_NOTHROW { }
693
694 /** Destructor */
~FetchIndexRootPagesFetchIndexRootPages695 ~FetchIndexRootPages() UNIV_NOTHROW override { }
696
697 /** Fetch the clustered index root page in the tablespace
698 @param iter Tablespace iterator
699 @param block Block to use for IO
700 @retval DB_SUCCESS or error code */
701 dberr_t run(const fil_iterator_t& iter,
702 buf_block_t* block) UNIV_NOTHROW override;
703
704 /** Called for each block as it is read from the file.
705 @param block block to convert, it is not from the buffer pool.
706 @retval DB_SUCCESS or error code. */
707 dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
708
709 /** Update the import configuration that will be used to import
710 the tablespace. */
711 dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
712
713 /** Table definition in server. */
714 const dict_table_t* m_table;
715
716 /** Index information */
717 Index m_index;
718 };
719
720 /** Called for each block as it is read from the file. Check index pages to
721 determine the exact row format. We can't get that from the tablespace
722 header flags alone.
723
724 @param block block to convert, it is not from the buffer pool.
725 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)726 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
727 {
728 if (is_interrupted()) return DB_INTERRUPTED;
729
730 const page_t* page = get_frame(block);
731
732 m_index.m_id = btr_page_get_index_id(page);
733 m_index.m_page_no = block->page.id().page_no();
734
735 /* Check that the tablespace flags match the table flags. */
736 ulint expected = dict_tf_to_fsp_flags(m_table->flags);
737 if (!fsp_flags_match(expected, m_space_flags)) {
738 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
739 ER_TABLE_SCHEMA_MISMATCH,
740 "Expected FSP_SPACE_FLAGS=0x%x, .ibd "
741 "file contains 0x%x.",
742 unsigned(expected),
743 unsigned(m_space_flags));
744 return(DB_CORRUPTION);
745 }
746
747 if (!page_is_comp(block->frame) !=
748 !dict_table_is_comp(m_table)) {
749 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
750 ER_TABLE_SCHEMA_MISMATCH,
751 "ROW_FORMAT mismatch");
752 return DB_CORRUPTION;
753 }
754
755 return DB_SUCCESS;
756 }
757
758 /**
759 Update the import configuration that will be used to import the tablespace.
760 @return error code or DB_SUCCESS */
761 dberr_t
build_row_import(row_import * cfg) const762 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
763 {
764 ut_a(cfg->m_table == m_table);
765 cfg->m_zip_size = m_zip_size;
766 cfg->m_n_indexes = 1;
767
768 if (cfg->m_n_indexes == 0) {
769
770 ib::error() << "No B+Tree found in tablespace";
771
772 return(DB_CORRUPTION);
773 }
774
775 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
776
777 /* Trigger OOM */
778 DBUG_EXECUTE_IF(
779 "ib_import_OOM_11",
780 UT_DELETE_ARRAY(cfg->m_indexes);
781 cfg->m_indexes = NULL;
782 );
783
784 if (cfg->m_indexes == NULL) {
785 return(DB_OUT_OF_MEMORY);
786 }
787
788 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
789
790 row_index_t* cfg_index = cfg->m_indexes;
791
792 char name[BUFSIZ];
793
794 snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
795
796 ulint len = strlen(name) + 1;
797
798 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
799
800 /* Trigger OOM */
801 DBUG_EXECUTE_IF(
802 "ib_import_OOM_12",
803 UT_DELETE_ARRAY(cfg_index->m_name);
804 cfg_index->m_name = NULL;
805 );
806
807 if (cfg_index->m_name == NULL) {
808 return(DB_OUT_OF_MEMORY);
809 }
810
811 memcpy(cfg_index->m_name, name, len);
812
813 cfg_index->m_id = m_index.m_id;
814
815 cfg_index->m_space = m_space;
816
817 cfg_index->m_page_no = m_index.m_page_no;
818
819 return(DB_SUCCESS);
820 }
821
822 /* Functor that is called for each physical page that is read from the
823 tablespace file.
824
825 1. Check each page for corruption.
826
827 2. Update the space id and LSN on every page
828 * For the header page
829 - Validate the flags
830 - Update the LSN
831
832 3. On Btree pages
833 * Set the index id
834 * Update the max trx id
835 * In a cluster index, update the system columns
836 * In a cluster index, update the BLOB ptr, set the space id
837 * Purge delete marked records, but only if they can be easily
838 removed from the page
839 * Keep a counter of number of rows, ie. non-delete-marked rows
840 * Keep a counter of number of delete marked rows
841 * Keep a counter of number of purge failure
842 * If a page is stamped with an index id that isn't in the .cfg file
843 we assume it is deleted and the page can be ignored.
844
845 4. Set the page state to dirty so that it will be written to disk.
846 */
847 class PageConverter : public AbstractCallback {
848 public:
849 /** Constructor
850 @param cfg config of table being imported.
851 @param space_id tablespace identifier
852 @param trx transaction covering the import */
PageConverter(row_import * cfg,ulint space_id,trx_t * trx)853 PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
854 :
855 AbstractCallback(trx, space_id),
856 m_cfg(cfg),
857 m_index(cfg->m_indexes),
858 m_rec_iter(),
859 m_offsets_(), m_offsets(m_offsets_),
860 m_heap(0),
861 m_cluster_index(dict_table_get_first_index(cfg->m_table))
862 {
863 rec_offs_init(m_offsets_);
864 }
865
~PageConverter()866 ~PageConverter() UNIV_NOTHROW override
867 {
868 if (m_heap != 0) {
869 mem_heap_free(m_heap);
870 }
871 }
872
run(const fil_iterator_t & iter,buf_block_t * block)873 dberr_t run(const fil_iterator_t& iter,
874 buf_block_t* block) UNIV_NOTHROW override
875 {
876 return fil_iterate(iter, block, *this);
877 }
878
879 /** Called for each block as it is read from the file.
880 @param block block to convert, it is not from the buffer pool.
881 @retval DB_SUCCESS or error code. */
882 dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
883
884 private:
885 /** Update the page, set the space id, max trx id and index id.
886 @param block block read from file
887 @param page_type type of the page
888 @retval DB_SUCCESS or error code */
889 dberr_t update_page(buf_block_t* block, uint16_t& page_type)
890 UNIV_NOTHROW;
891
892 /** Update the space, index id, trx id.
893 @param block block to convert
894 @return DB_SUCCESS or error code */
895 dberr_t update_index_page(buf_block_t* block) UNIV_NOTHROW;
896
897 /** Update the BLOB refrences and write UNDO log entries for
898 rows that can't be purged optimistically.
899 @param block block to update
900 @retval DB_SUCCESS or error code */
901 dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
902
903 /** Validate the space flags and update tablespace header page.
904 @param block block read from file, not from the buffer pool.
905 @retval DB_SUCCESS or error code */
906 dberr_t update_header(buf_block_t* block) UNIV_NOTHROW;
907
908 /** Adjust the BLOB reference for a single column that is externally stored
909 @param rec record to update
910 @param offsets column offsets for the record
911 @param i column ordinal value
912 @return DB_SUCCESS or error code */
913 dberr_t adjust_cluster_index_blob_column(
914 rec_t* rec,
915 const rec_offs* offsets,
916 ulint i) UNIV_NOTHROW;
917
918 /** Adjusts the BLOB reference in the clustered index row for all
919 externally stored columns.
920 @param rec record to update
921 @param offsets column offsets for the record
922 @return DB_SUCCESS or error code */
923 dberr_t adjust_cluster_index_blob_columns(
924 rec_t* rec,
925 const rec_offs* offsets) UNIV_NOTHROW;
926
927 /** In the clustered index, adjist the BLOB pointers as needed.
928 Also update the BLOB reference, write the new space id.
929 @param rec record to update
930 @param offsets column offsets for the record
931 @return DB_SUCCESS or error code */
932 dberr_t adjust_cluster_index_blob_ref(
933 rec_t* rec,
934 const rec_offs* offsets) UNIV_NOTHROW;
935
936 /** Purge delete-marked records, only if it is possible to do
937 so without re-organising the B+tree.
938 @retval true if purged */
939 bool purge() UNIV_NOTHROW;
940
941 /** Adjust the BLOB references and sys fields for the current record.
942 @param rec record to update
943 @param offsets column offsets for the record
944 @return DB_SUCCESS or error code. */
945 dberr_t adjust_cluster_record(
946 rec_t* rec,
947 const rec_offs* offsets) UNIV_NOTHROW;
948
949 /** Find an index with the matching id.
950 @return row_index_t* instance or 0 */
find_index(index_id_t id)951 row_index_t* find_index(index_id_t id) UNIV_NOTHROW
952 {
953 row_index_t* index = &m_cfg->m_indexes[0];
954
955 for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
956 if (id == index->m_id) {
957 return(index);
958 }
959 }
960
961 return(0);
962
963 }
964 private:
965 /** Config for table that is being imported. */
966 row_import* m_cfg;
967
968 /** Current index whose pages are being imported */
969 row_index_t* m_index;
970
971 /** Iterator over records in a block */
972 RecIterator m_rec_iter;
973
974 /** Record offset */
975 rec_offs m_offsets_[REC_OFFS_NORMAL_SIZE];
976
977 /** Pointer to m_offsets_ */
978 rec_offs* m_offsets;
979
980 /** Memory heap for the record offsets */
981 mem_heap_t* m_heap;
982
983 /** Cluster index instance */
984 dict_index_t* m_cluster_index;
985 };
986
987 /**
988 row_import destructor. */
~row_import()989 row_import::~row_import() UNIV_NOTHROW
990 {
991 for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
992 UT_DELETE_ARRAY(m_indexes[i].m_name);
993
994 if (m_indexes[i].m_fields == NULL) {
995 continue;
996 }
997
998 dict_field_t* fields = m_indexes[i].m_fields;
999 ulint n_fields = m_indexes[i].m_n_fields;
1000
1001 for (ulint j = 0; j < n_fields; ++j) {
1002 UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
1003 }
1004
1005 UT_DELETE_ARRAY(fields);
1006 }
1007
1008 for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
1009 UT_DELETE_ARRAY(m_col_names[i]);
1010 }
1011
1012 UT_DELETE_ARRAY(m_cols);
1013 UT_DELETE_ARRAY(m_indexes);
1014 UT_DELETE_ARRAY(m_col_names);
1015 UT_DELETE_ARRAY(m_table_name);
1016 UT_DELETE_ARRAY(m_hostname);
1017 }
1018
1019 /** Find the index entry in in the indexes array.
1020 @param name index name
1021 @return instance if found else 0. */
1022 row_index_t*
get_index(const char * name) const1023 row_import::get_index(
1024 const char* name) const UNIV_NOTHROW
1025 {
1026 for (ulint i = 0; i < m_n_indexes; ++i) {
1027 const char* index_name;
1028 row_index_t* index = &m_indexes[i];
1029
1030 index_name = reinterpret_cast<const char*>(index->m_name);
1031
1032 if (strcmp(index_name, name) == 0) {
1033
1034 return(index);
1035 }
1036 }
1037
1038 return(0);
1039 }
1040
1041 /** Get the number of rows in the index.
1042 @param name index name
1043 @return number of rows (doesn't include delete marked rows). */
1044 ulint
get_n_rows(const char * name) const1045 row_import::get_n_rows(
1046 const char* name) const UNIV_NOTHROW
1047 {
1048 const row_index_t* index = get_index(name);
1049
1050 ut_a(name != 0);
1051
1052 return(index->m_stats.m_n_rows);
1053 }
1054
1055 /** Get the number of rows for which purge failed uding the convert phase.
1056 @param name index name
1057 @return number of rows for which purge failed. */
1058 ulint
get_n_purge_failed(const char * name) const1059 row_import::get_n_purge_failed(
1060 const char* name) const UNIV_NOTHROW
1061 {
1062 const row_index_t* index = get_index(name);
1063
1064 ut_a(name != 0);
1065
1066 return(index->m_stats.m_n_purge_failed);
1067 }
1068
1069 /** Find the ordinal value of the column name in the cfg table columns.
1070 @param name of column to look for.
1071 @return ULINT_UNDEFINED if not found. */
1072 ulint
find_col(const char * name) const1073 row_import::find_col(
1074 const char* name) const UNIV_NOTHROW
1075 {
1076 for (ulint i = 0; i < m_n_cols; ++i) {
1077 const char* col_name;
1078
1079 col_name = reinterpret_cast<const char*>(m_col_names[i]);
1080
1081 if (strcmp(col_name, name) == 0) {
1082 return(i);
1083 }
1084 }
1085
1086 return(ULINT_UNDEFINED);
1087 }
1088
1089 /**
1090 Check if the index schema that was read from the .cfg file matches the
1091 in memory index definition.
1092 @return DB_SUCCESS or error code. */
1093 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1094 row_import::match_index_columns(
1095 THD* thd,
1096 const dict_index_t* index) UNIV_NOTHROW
1097 {
1098 row_index_t* cfg_index;
1099 dberr_t err = DB_SUCCESS;
1100
1101 cfg_index = get_index(index->name);
1102
1103 if (cfg_index == 0) {
1104 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1105 ER_TABLE_SCHEMA_MISMATCH,
1106 "Index %s not found in tablespace meta-data file.",
1107 index->name());
1108
1109 return(DB_ERROR);
1110 }
1111
1112 if (cfg_index->m_n_fields != index->n_fields) {
1113
1114 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1115 ER_TABLE_SCHEMA_MISMATCH,
1116 "Index field count %u doesn't match"
1117 " tablespace metadata file value " ULINTPF,
1118 index->n_fields, cfg_index->m_n_fields);
1119
1120 return(DB_ERROR);
1121 }
1122
1123 cfg_index->m_srv_index = index;
1124
1125 const dict_field_t* field = index->fields;
1126 const dict_field_t* cfg_field = cfg_index->m_fields;
1127
1128 for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1129
1130 if (field->name() && cfg_field->name()
1131 && strcmp(field->name(), cfg_field->name()) != 0) {
1132 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1133 ER_TABLE_SCHEMA_MISMATCH,
1134 "Index field name %s doesn't match"
1135 " tablespace metadata field name %s"
1136 " for field position " ULINTPF,
1137 field->name(), cfg_field->name(), i);
1138
1139 err = DB_ERROR;
1140 }
1141
1142 if (cfg_field->prefix_len != field->prefix_len) {
1143 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1144 ER_TABLE_SCHEMA_MISMATCH,
1145 "Index %s field %s prefix len %u"
1146 " doesn't match metadata file value %u",
1147 index->name(), field->name(),
1148 field->prefix_len, cfg_field->prefix_len);
1149
1150 err = DB_ERROR;
1151 }
1152
1153 if (cfg_field->fixed_len != field->fixed_len) {
1154 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1155 ER_TABLE_SCHEMA_MISMATCH,
1156 "Index %s field %s fixed len %u"
1157 " doesn't match metadata file value %u",
1158 index->name(), field->name(),
1159 field->fixed_len,
1160 cfg_field->fixed_len);
1161
1162 err = DB_ERROR;
1163 }
1164 }
1165
1166 return(err);
1167 }
1168
1169 /** Check if the table schema that was read from the .cfg file matches the
1170 in memory table definition.
1171 @param thd MySQL session variable
1172 @return DB_SUCCESS or error code. */
1173 dberr_t
match_table_columns(THD * thd)1174 row_import::match_table_columns(
1175 THD* thd) UNIV_NOTHROW
1176 {
1177 dberr_t err = DB_SUCCESS;
1178 const dict_col_t* col = m_table->cols;
1179
1180 for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1181
1182 const char* col_name;
1183 ulint cfg_col_index;
1184
1185 col_name = dict_table_get_col_name(
1186 m_table, dict_col_get_no(col));
1187
1188 cfg_col_index = find_col(col_name);
1189
1190 if (cfg_col_index == ULINT_UNDEFINED) {
1191
1192 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1193 ER_TABLE_SCHEMA_MISMATCH,
1194 "Column %s not found in tablespace.",
1195 col_name);
1196
1197 err = DB_ERROR;
1198 } else if (cfg_col_index != col->ind) {
1199
1200 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1201 ER_TABLE_SCHEMA_MISMATCH,
1202 "Column %s ordinal value mismatch, it's at %u"
1203 " in the table and " ULINTPF
1204 " in the tablespace meta-data file",
1205 col_name, col->ind, cfg_col_index);
1206
1207 err = DB_ERROR;
1208 } else {
1209 const dict_col_t* cfg_col;
1210
1211 cfg_col = &m_cols[cfg_col_index];
1212 ut_a(cfg_col->ind == cfg_col_index);
1213
1214 if (cfg_col->prtype != col->prtype) {
1215 ib_errf(thd,
1216 IB_LOG_LEVEL_ERROR,
1217 ER_TABLE_SCHEMA_MISMATCH,
1218 "Column %s precise type mismatch,"
1219 " it's 0X%X in the table and 0X%X"
1220 " in the tablespace meta file",
1221 col_name, col->prtype, cfg_col->prtype);
1222 err = DB_ERROR;
1223 }
1224
1225 if (cfg_col->mtype != col->mtype) {
1226 ib_errf(thd,
1227 IB_LOG_LEVEL_ERROR,
1228 ER_TABLE_SCHEMA_MISMATCH,
1229 "Column %s main type mismatch,"
1230 " it's 0X%X in the table and 0X%X"
1231 " in the tablespace meta file",
1232 col_name, col->mtype, cfg_col->mtype);
1233 err = DB_ERROR;
1234 }
1235
1236 if (cfg_col->len != col->len) {
1237 ib_errf(thd,
1238 IB_LOG_LEVEL_ERROR,
1239 ER_TABLE_SCHEMA_MISMATCH,
1240 "Column %s length mismatch,"
1241 " it's %u in the table and %u"
1242 " in the tablespace meta file",
1243 col_name, col->len, cfg_col->len);
1244 err = DB_ERROR;
1245 }
1246
1247 if (cfg_col->mbminlen != col->mbminlen
1248 || cfg_col->mbmaxlen != col->mbmaxlen) {
1249 ib_errf(thd,
1250 IB_LOG_LEVEL_ERROR,
1251 ER_TABLE_SCHEMA_MISMATCH,
1252 "Column %s multi-byte len mismatch,"
1253 " it's %u-%u in the table and %u-%u"
1254 " in the tablespace meta file",
1255 col_name, col->mbminlen, col->mbmaxlen,
1256 cfg_col->mbminlen, cfg_col->mbmaxlen);
1257 err = DB_ERROR;
1258 }
1259
1260 if (cfg_col->ind != col->ind) {
1261 ib_errf(thd,
1262 IB_LOG_LEVEL_ERROR,
1263 ER_TABLE_SCHEMA_MISMATCH,
1264 "Column %s position mismatch,"
1265 " it's %u in the table and %u"
1266 " in the tablespace meta file",
1267 col_name, col->ind, cfg_col->ind);
1268 err = DB_ERROR;
1269 }
1270
1271 if (cfg_col->ord_part != col->ord_part) {
1272 ib_errf(thd,
1273 IB_LOG_LEVEL_ERROR,
1274 ER_TABLE_SCHEMA_MISMATCH,
1275 "Column %s ordering mismatch,"
1276 " it's %u in the table and %u"
1277 " in the tablespace meta file",
1278 col_name, col->ord_part,
1279 cfg_col->ord_part);
1280 err = DB_ERROR;
1281 }
1282
1283 if (cfg_col->max_prefix != col->max_prefix) {
1284 ib_errf(thd,
1285 IB_LOG_LEVEL_ERROR,
1286 ER_TABLE_SCHEMA_MISMATCH,
1287 "Column %s max prefix mismatch"
1288 " it's %u in the table and %u"
1289 " in the tablespace meta file",
1290 col_name, col->max_prefix,
1291 cfg_col->max_prefix);
1292 err = DB_ERROR;
1293 }
1294 }
1295 }
1296
1297 return(err);
1298 }
1299
match_flags(THD * thd) const1300 dberr_t row_import::match_flags(THD *thd) const
1301 {
1302 ulint mismatch= (m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR;
1303 if (!mismatch)
1304 return DB_SUCCESS;
1305
1306 const char *msg;
1307 if (mismatch & DICT_TF_MASK_ZIP_SSIZE)
1308 {
1309 if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) &&
1310 (m_flags & DICT_TF_MASK_ZIP_SSIZE))
1311 {
1312 switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1313 case 0U << DICT_TF_POS_ZIP_SSIZE:
1314 goto uncompressed;
1315 case 1U << DICT_TF_POS_ZIP_SSIZE:
1316 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=1";
1317 break;
1318 case 2U << DICT_TF_POS_ZIP_SSIZE:
1319 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=2";
1320 break;
1321 case 3U << DICT_TF_POS_ZIP_SSIZE:
1322 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=4";
1323 break;
1324 case 4U << DICT_TF_POS_ZIP_SSIZE:
1325 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8";
1326 break;
1327 case 5U << DICT_TF_POS_ZIP_SSIZE:
1328 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=16";
1329 break;
1330 default:
1331 msg= "strange KEY_BLOCK_SIZE";
1332 }
1333 }
1334 else if (m_flags & DICT_TF_MASK_ZIP_SSIZE)
1335 msg= "ROW_FORMAT=COMPRESSED";
1336 else
1337 goto uncompressed;
1338 }
1339 else
1340 {
1341 uncompressed:
1342 msg= (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) ? "ROW_FORMAT=DYNAMIC"
1343 : (m_flags & DICT_TF_MASK_COMPACT) ? "ROW_FORMAT=COMPACT"
1344 : "ROW_FORMAT=REDUNDANT";
1345 }
1346
1347 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1348 "Table flags don't match, server table has 0x%x and the meta-data "
1349 "file has 0x%zx; .cfg file uses %s",
1350 m_table->flags, m_flags, msg);
1351
1352 return DB_ERROR;
1353 }
1354
1355 /** Check if the table (and index) schema that was read from the .cfg file
1356 matches the in memory table definition.
1357 @param thd MySQL session variable
1358 @return DB_SUCCESS or error code. */
1359 dberr_t
match_schema(THD * thd)1360 row_import::match_schema(
1361 THD* thd) UNIV_NOTHROW
1362 {
1363 /* Do some simple checks. */
1364
1365 if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1366
1367 /* If the number of indexes don't match then it is better
1368 to abort the IMPORT. It is easy for the user to create a
1369 table matching the IMPORT definition. */
1370
1371 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1372 "Number of indexes don't match, table has " ULINTPF
1373 " indexes but the tablespace meta-data file has "
1374 ULINTPF " indexes",
1375 UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1376
1377 return(DB_ERROR);
1378 }
1379
1380 dberr_t err = match_table_columns(thd);
1381
1382 if (err != DB_SUCCESS) {
1383 return(err);
1384 }
1385
1386 /* Check if the index definitions match. */
1387
1388 const dict_index_t* index;
1389
1390 for (index = UT_LIST_GET_FIRST(m_table->indexes);
1391 index != 0;
1392 index = UT_LIST_GET_NEXT(indexes, index)) {
1393
1394 dberr_t index_err;
1395
1396 index_err = match_index_columns(thd, index);
1397
1398 if (index_err != DB_SUCCESS) {
1399 err = index_err;
1400 }
1401 }
1402
1403 return(err);
1404 }
1405
1406 /**
1407 Set the index root <space, pageno>, using index name. */
1408 void
set_root_by_name()1409 row_import::set_root_by_name() UNIV_NOTHROW
1410 {
1411 row_index_t* cfg_index = m_indexes;
1412
1413 for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1414 dict_index_t* index;
1415
1416 const char* index_name;
1417
1418 index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1419
1420 index = dict_table_get_index_on_name(m_table, index_name);
1421
1422 /* We've already checked that it exists. */
1423 ut_a(index != 0);
1424
1425 index->page = static_cast<uint32_t>(cfg_index->m_page_no);
1426 }
1427 }
1428
1429 /**
1430 Set the index root <space, pageno>, using a heuristic.
1431 @return DB_SUCCESS or error code */
1432 dberr_t
set_root_by_heuristic()1433 row_import::set_root_by_heuristic() UNIV_NOTHROW
1434 {
1435 row_index_t* cfg_index = m_indexes;
1436
1437 ut_a(m_n_indexes > 0);
1438
1439 // TODO: For now use brute force, based on ordinality
1440
1441 if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1442
1443 ib::warn() << "Table " << m_table->name << " should have "
1444 << UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1445 " the tablespace has " << m_n_indexes << " indexes";
1446 }
1447
1448 dict_mutex_enter_for_mysql();
1449
1450 ulint i = 0;
1451 dberr_t err = DB_SUCCESS;
1452
1453 for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1454 index != 0;
1455 index = UT_LIST_GET_NEXT(indexes, index)) {
1456
1457 if (index->type & DICT_FTS) {
1458 index->type |= DICT_CORRUPT;
1459 ib::warn() << "Skipping FTS index: " << index->name;
1460 } else if (i < m_n_indexes) {
1461
1462 UT_DELETE_ARRAY(cfg_index[i].m_name);
1463
1464 ulint len = strlen(index->name) + 1;
1465
1466 cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1467
1468 /* Trigger OOM */
1469 DBUG_EXECUTE_IF(
1470 "ib_import_OOM_14",
1471 UT_DELETE_ARRAY(cfg_index[i].m_name);
1472 cfg_index[i].m_name = NULL;
1473 );
1474
1475 if (cfg_index[i].m_name == NULL) {
1476 err = DB_OUT_OF_MEMORY;
1477 break;
1478 }
1479
1480 memcpy(cfg_index[i].m_name, index->name, len);
1481
1482 cfg_index[i].m_srv_index = index;
1483
1484 index->page = static_cast<uint32_t>(
1485 cfg_index[i++].m_page_no);
1486 }
1487 }
1488
1489 dict_mutex_exit_for_mysql();
1490
1491 return(err);
1492 }
1493
1494 /**
1495 Purge delete marked records.
1496 @return DB_SUCCESS or error code. */
1497 dberr_t
garbage_collect()1498 IndexPurge::garbage_collect() UNIV_NOTHROW
1499 {
1500 dberr_t err;
1501 ibool comp = dict_table_is_comp(m_index->table);
1502
1503 /* Open the persistent cursor and start the mini-transaction. */
1504
1505 open();
1506
1507 while ((err = next()) == DB_SUCCESS) {
1508
1509 rec_t* rec = btr_pcur_get_rec(&m_pcur);
1510 ibool deleted = rec_get_deleted_flag(rec, comp);
1511
1512 if (!deleted) {
1513 ++m_n_rows;
1514 } else {
1515 purge();
1516 }
1517 }
1518
1519 /* Close the persistent cursor and commit the mini-transaction. */
1520
1521 close();
1522
1523 return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1524 }
1525
1526 /**
1527 Begin import, position the cursor on the first record. */
1528 void
open()1529 IndexPurge::open() UNIV_NOTHROW
1530 {
1531 mtr_start(&m_mtr);
1532
1533 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1534
1535 btr_pcur_open_at_index_side(
1536 true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1537 btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1538 if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) {
1539 ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1540 /* Skip the metadata pseudo-record. */
1541 } else {
1542 btr_pcur_move_to_prev_on_page(&m_pcur);
1543 }
1544 }
1545
1546 /**
1547 Close the persistent curosr and commit the mini-transaction. */
1548 void
close()1549 IndexPurge::close() UNIV_NOTHROW
1550 {
1551 btr_pcur_close(&m_pcur);
1552 mtr_commit(&m_mtr);
1553 }
1554
1555 /**
1556 Position the cursor on the next record.
1557 @return DB_SUCCESS or error code */
1558 dberr_t
next()1559 IndexPurge::next() UNIV_NOTHROW
1560 {
1561 btr_pcur_move_to_next_on_page(&m_pcur);
1562
1563 /* When switching pages, commit the mini-transaction
1564 in order to release the latch on the old page. */
1565
1566 if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1567 return(DB_SUCCESS);
1568 } else if (trx_is_interrupted(m_trx)) {
1569 /* Check after every page because the check
1570 is expensive. */
1571 return(DB_INTERRUPTED);
1572 }
1573
1574 btr_pcur_store_position(&m_pcur, &m_mtr);
1575
1576 mtr_commit(&m_mtr);
1577
1578 mtr_start(&m_mtr);
1579
1580 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1581
1582 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1583 /* The following is based on btr_pcur_move_to_next_user_rec(). */
1584 m_pcur.old_stored = false;
1585 ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
1586 do {
1587 if (btr_pcur_is_after_last_on_page(&m_pcur)) {
1588 if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
1589 return DB_END_OF_INDEX;
1590 }
1591
1592 buf_block_t* block = btr_pcur_get_block(&m_pcur);
1593 uint32_t next_page = btr_page_get_next(block->frame);
1594
1595 /* MDEV-13542 FIXME: Make these checks part of
1596 btr_pcur_move_to_next_page(), and introduce a
1597 return status that will be checked in all callers! */
1598 switch (next_page) {
1599 default:
1600 if (next_page != block->page.id().page_no()) {
1601 break;
1602 }
1603 /* MDEV-20931 FIXME: Check that
1604 next_page is within the tablespace
1605 bounds! Also check that it is not a
1606 change buffer bitmap page. */
1607 /* fall through */
1608 case 0:
1609 case 1:
1610 case FIL_NULL:
1611 return DB_CORRUPTION;
1612 }
1613
1614 dict_index_t* index = m_pcur.btr_cur.index;
1615 buf_block_t* next_block = btr_block_get(
1616 *index, next_page, BTR_MODIFY_LEAF, false,
1617 &m_mtr);
1618
1619 if (UNIV_UNLIKELY(!next_block
1620 || !fil_page_index_page_check(
1621 next_block->frame)
1622 || !!dict_index_is_spatial(index)
1623 != (fil_page_get_type(
1624 next_block->frame)
1625 == FIL_PAGE_RTREE)
1626 || page_is_comp(next_block->frame)
1627 != page_is_comp(block->frame)
1628 || btr_page_get_prev(
1629 next_block->frame)
1630 != block->page.id().page_no())) {
1631 return DB_CORRUPTION;
1632 }
1633
1634 btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);
1635
1636 page_cur_set_before_first(next_block,
1637 &m_pcur.btr_cur.page_cur);
1638
1639 ut_d(page_check_dir(next_block->frame));
1640 } else {
1641 btr_pcur_move_to_next_on_page(&m_pcur);
1642 }
1643 } while (!btr_pcur_is_on_user_rec(&m_pcur));
1644
1645 return DB_SUCCESS;
1646 }
1647
1648 /**
1649 Store the persistent cursor position and reopen the
1650 B-tree cursor in BTR_MODIFY_TREE mode, because the
1651 tree structure may be changed during a pessimistic delete. */
1652 void
purge_pessimistic_delete()1653 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1654 {
1655 dberr_t err;
1656
1657 btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1658 &m_pcur, &m_mtr);
1659
1660 ut_ad(rec_get_deleted_flag(
1661 btr_pcur_get_rec(&m_pcur),
1662 dict_table_is_comp(m_index->table)));
1663
1664 btr_cur_pessimistic_delete(
1665 &err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);
1666
1667 ut_a(err == DB_SUCCESS);
1668
1669 /* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1670 mtr_commit(&m_mtr);
1671 }
1672
1673 /**
1674 Purge delete-marked records. */
1675 void
purge()1676 IndexPurge::purge() UNIV_NOTHROW
1677 {
1678 btr_pcur_store_position(&m_pcur, &m_mtr);
1679
1680 purge_pessimistic_delete();
1681
1682 mtr_start(&m_mtr);
1683
1684 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1685
1686 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1687 }
1688
1689 /** Adjust the BLOB reference for a single column that is externally stored
1690 @param rec record to update
1691 @param offsets column offsets for the record
1692 @param i column ordinal value
1693 @return DB_SUCCESS or error code */
1694 inline
1695 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1696 PageConverter::adjust_cluster_index_blob_column(
1697 rec_t* rec,
1698 const rec_offs* offsets,
1699 ulint i) UNIV_NOTHROW
1700 {
1701 ulint len;
1702 byte* field;
1703
1704 field = rec_get_nth_field(rec, offsets, i, &len);
1705
1706 DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1707 len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1708
1709 if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1710
1711 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1712 ER_INNODB_INDEX_CORRUPT,
1713 "Externally stored column(" ULINTPF
1714 ") has a reference length of " ULINTPF
1715 " in the cluster index %s",
1716 i, len, m_cluster_index->name());
1717
1718 return(DB_CORRUPTION);
1719 }
1720
1721 field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1722
1723 mach_write_to_4(field, get_space_id());
1724
1725 if (UNIV_LIKELY_NULL(m_rec_iter.current_block()->page.zip.data)) {
1726 page_zip_write_blob_ptr(
1727 m_rec_iter.current_block(), rec, m_cluster_index,
1728 offsets, i, &m_rec_iter.m_mtr);
1729 }
1730
1731 return(DB_SUCCESS);
1732 }
1733
1734 /** Adjusts the BLOB reference in the clustered index row for all externally
1735 stored columns.
1736 @param rec record to update
1737 @param offsets column offsets for the record
1738 @return DB_SUCCESS or error code */
1739 inline
1740 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1741 PageConverter::adjust_cluster_index_blob_columns(
1742 rec_t* rec,
1743 const rec_offs* offsets) UNIV_NOTHROW
1744 {
1745 ut_ad(rec_offs_any_extern(offsets));
1746
1747 /* Adjust the space_id in the BLOB pointers. */
1748
1749 for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1750
1751 /* Only if the column is stored "externally". */
1752
1753 if (rec_offs_nth_extern(offsets, i)) {
1754 dberr_t err;
1755
1756 err = adjust_cluster_index_blob_column(rec, offsets, i);
1757
1758 if (err != DB_SUCCESS) {
1759 return(err);
1760 }
1761 }
1762 }
1763
1764 return(DB_SUCCESS);
1765 }
1766
1767 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1768 BLOB reference, write the new space id.
1769 @param rec record to update
1770 @param offsets column offsets for the record
1771 @return DB_SUCCESS or error code */
1772 inline
1773 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1774 PageConverter::adjust_cluster_index_blob_ref(
1775 rec_t* rec,
1776 const rec_offs* offsets) UNIV_NOTHROW
1777 {
1778 if (rec_offs_any_extern(offsets)) {
1779 dberr_t err;
1780
1781 err = adjust_cluster_index_blob_columns(rec, offsets);
1782
1783 if (err != DB_SUCCESS) {
1784 return(err);
1785 }
1786 }
1787
1788 return(DB_SUCCESS);
1789 }
1790
1791 /** Purge delete-marked records, only if it is possible to do so without
1792 re-organising the B+tree.
1793 @return true if purge succeeded */
purge()1794 inline bool PageConverter::purge() UNIV_NOTHROW
1795 {
1796 const dict_index_t* index = m_index->m_srv_index;
1797
1798 /* We can't have a page that is empty and not root. */
1799 if (m_rec_iter.remove(index, m_offsets)) {
1800
1801 ++m_index->m_stats.m_n_purged;
1802
1803 return(true);
1804 } else {
1805 ++m_index->m_stats.m_n_purge_failed;
1806 }
1807
1808 return(false);
1809 }
1810
1811 /** Adjust the BLOB references and sys fields for the current record.
1812 @param rec record to update
1813 @param offsets column offsets for the record
1814 @return DB_SUCCESS or error code. */
1815 inline
1816 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1817 PageConverter::adjust_cluster_record(
1818 rec_t* rec,
1819 const rec_offs* offsets) UNIV_NOTHROW
1820 {
1821 dberr_t err;
1822
1823 if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1824
1825 /* Reset DB_TRX_ID and DB_ROLL_PTR. Normally, these fields
1826 are only written in conjunction with other changes to the
1827 record. */
1828 ulint trx_id_pos = m_cluster_index->n_uniq
1829 ? m_cluster_index->n_uniq : 1;
1830 if (UNIV_LIKELY_NULL(m_rec_iter.current_block()
1831 ->page.zip.data)) {
1832 page_zip_write_trx_id_and_roll_ptr(
1833 m_rec_iter.current_block(),
1834 rec, m_offsets, trx_id_pos,
1835 0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1836 &m_rec_iter.m_mtr);
1837 } else {
1838 ulint len;
1839 byte* ptr = rec_get_nth_field(
1840 rec, m_offsets, trx_id_pos, &len);
1841 ut_ad(len == DATA_TRX_ID_LEN);
1842 memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1843 }
1844 }
1845
1846 return(err);
1847 }
1848
1849 /** Update the BLOB refrences and write UNDO log entries for
1850 rows that can't be purged optimistically.
1851 @param block block to update
1852 @retval DB_SUCCESS or error code */
1853 inline
1854 dberr_t
update_records(buf_block_t * block)1855 PageConverter::update_records(
1856 buf_block_t* block) UNIV_NOTHROW
1857 {
1858 ibool comp = dict_table_is_comp(m_cfg->m_table);
1859 bool clust_index = m_index->m_srv_index == m_cluster_index;
1860
1861 /* This will also position the cursor on the first user record. */
1862
1863 m_rec_iter.open(block);
1864
1865 while (!m_rec_iter.end()) {
1866 rec_t* rec = m_rec_iter.current();
1867 ibool deleted = rec_get_deleted_flag(rec, comp);
1868
1869 /* For the clustered index we have to adjust the BLOB
1870 reference and the system fields irrespective of the
1871 delete marked flag. The adjustment of delete marked
1872 cluster records is required for purge to work later. */
1873
1874 if (deleted || clust_index) {
1875 m_offsets = rec_get_offsets(
1876 rec, m_index->m_srv_index, m_offsets,
1877 m_index->m_srv_index->n_core_fields,
1878 ULINT_UNDEFINED, &m_heap);
1879 }
1880
1881 if (clust_index) {
1882
1883 dberr_t err = adjust_cluster_record(rec, m_offsets);
1884
1885 if (err != DB_SUCCESS) {
1886 return(err);
1887 }
1888 }
1889
1890 /* If it is a delete marked record then try an
1891 optimistic delete. */
1892
1893 if (deleted) {
1894 /* A successful purge will move the cursor to the
1895 next record. */
1896
1897 if (!purge()) {
1898 m_rec_iter.next();
1899 }
1900
1901 ++m_index->m_stats.m_n_deleted;
1902 } else {
1903 ++m_index->m_stats.m_n_rows;
1904 m_rec_iter.next();
1905 }
1906 }
1907
1908 return(DB_SUCCESS);
1909 }
1910
1911 /** Update the space, index id, trx id.
1912 @return DB_SUCCESS or error code */
1913 inline
1914 dberr_t
update_index_page(buf_block_t * block)1915 PageConverter::update_index_page(
1916 buf_block_t* block) UNIV_NOTHROW
1917 {
1918 const page_id_t page_id(block->page.id());
1919
1920 if (is_free(page_id.page_no())) {
1921 return(DB_SUCCESS);
1922 }
1923
1924 buf_frame_t* page = block->frame;
1925 const index_id_t id = btr_page_get_index_id(page);
1926
1927 if (id != m_index->m_id) {
1928 row_index_t* index = find_index(id);
1929
1930 if (UNIV_UNLIKELY(!index)) {
1931 if (!m_cfg->m_missing) {
1932 ib::warn() << "Unknown index id " << id
1933 << " on page " << page_id.page_no();
1934 }
1935 return DB_SUCCESS;
1936 }
1937
1938 m_index = index;
1939 }
1940
1941 /* If the .cfg file is missing and there is an index mismatch
1942 then ignore the error. */
1943 if (m_cfg->m_missing && !m_index->m_srv_index) {
1944 return(DB_SUCCESS);
1945 }
1946
1947 if (m_index && page_id.page_no() == m_index->m_page_no) {
1948 byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
1949 + page;
1950 mach_write_to_4(b, page_id.space());
1951
1952 memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
1953 + page, b, 4);
1954 if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1955 memcpy(&block->page.zip.data[FIL_PAGE_DATA
1956 + PAGE_BTR_SEG_TOP
1957 + FSEG_HDR_SPACE], b, 4);
1958 memcpy(&block->page.zip.data[FIL_PAGE_DATA
1959 + PAGE_BTR_SEG_LEAF
1960 + FSEG_HDR_SPACE], b, 4);
1961 }
1962 }
1963
1964 #ifdef UNIV_ZIP_DEBUG
1965 ut_a(!block->page.zip.data || page_zip_validate(&block->page.zip, page,
1966 m_index->m_srv_index));
1967 #endif /* UNIV_ZIP_DEBUG */
1968
1969 /* This has to be written to uncompressed index header. Set it to
1970 the current index id. */
1971 mach_write_to_8(page + (PAGE_HEADER + PAGE_INDEX_ID),
1972 m_index->m_srv_index->id);
1973 if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1974 memcpy(&block->page.zip.data[PAGE_HEADER + PAGE_INDEX_ID],
1975 &block->frame[PAGE_HEADER + PAGE_INDEX_ID], 8);
1976 }
1977
1978 if (m_index->m_srv_index->is_clust()) {
1979 if (page_id.page_no() != m_index->m_srv_index->page) {
1980 goto clear_page_max_trx_id;
1981 }
1982 } else if (page_is_leaf(page)) {
1983 /* Set PAGE_MAX_TRX_ID on secondary index leaf pages. */
1984 mach_write_to_8(&block->frame[PAGE_HEADER + PAGE_MAX_TRX_ID],
1985 m_trx->id);
1986 if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1987 memcpy_aligned<8>(&block->page.zip.data
1988 [PAGE_HEADER + PAGE_MAX_TRX_ID],
1989 &block->frame
1990 [PAGE_HEADER + PAGE_MAX_TRX_ID], 8);
1991 }
1992 } else {
1993 clear_page_max_trx_id:
1994 /* Clear PAGE_MAX_TRX_ID so that it can be
1995 used for other purposes in the future. IMPORT
1996 in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
1997 would set the field to the transaction ID even
1998 on clustered index pages. */
1999 memset_aligned<8>(&block->frame[PAGE_HEADER + PAGE_MAX_TRX_ID],
2000 0, 8);
2001 if (UNIV_LIKELY_NULL(block->page.zip.data)) {
2002 memset_aligned<8>(&block->page.zip.data
2003 [PAGE_HEADER + PAGE_MAX_TRX_ID],
2004 0, 8);
2005 }
2006 }
2007
2008 if (page_is_empty(page)) {
2009
2010 /* Only a root page can be empty. */
2011 if (page_has_siblings(page)) {
2012 // TODO: We should relax this and skip secondary
2013 // indexes. Mark them as corrupt because they can
2014 // always be rebuilt.
2015 return(DB_CORRUPTION);
2016 }
2017
2018 return(DB_SUCCESS);
2019 }
2020
2021 return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
2022 }
2023
2024 /** Validate the space flags and update tablespace header page.
2025 @param block block read from file, not from the buffer pool.
2026 @retval DB_SUCCESS or error code */
update_header(buf_block_t * block)2027 inline dberr_t PageConverter::update_header(buf_block_t* block) UNIV_NOTHROW
2028 {
2029 byte *frame= get_frame(block);
2030 if (memcmp_aligned<2>(FIL_PAGE_SPACE_ID + frame,
2031 FSP_HEADER_OFFSET + FSP_SPACE_ID + frame, 4))
2032 ib::warn() << "Space id check in the header failed: ignored";
2033 else if (!mach_read_from_4(FIL_PAGE_SPACE_ID + frame))
2034 return DB_CORRUPTION;
2035
2036 memset(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0, 8);
2037
2038 /* Write space_id to the tablespace header, page 0. */
2039 mach_write_to_4(FIL_PAGE_SPACE_ID + frame, get_space_id());
2040 memcpy_aligned<2>(FSP_HEADER_OFFSET + FSP_SPACE_ID + frame,
2041 FIL_PAGE_SPACE_ID + frame, 4);
2042 /* Write back the adjusted flags. */
2043 mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + frame, m_space_flags);
2044
2045 return DB_SUCCESS;
2046 }
2047
2048 /** Update the page, set the space id, max trx id and index id.
2049 @param block block read from file
2050 @retval DB_SUCCESS or error code */
2051 inline
2052 dberr_t
update_page(buf_block_t * block,uint16_t & page_type)2053 PageConverter::update_page(buf_block_t* block, uint16_t& page_type)
2054 UNIV_NOTHROW
2055 {
2056 dberr_t err = DB_SUCCESS;
2057
2058 ut_ad(!block->page.zip.data == !is_compressed_table());
2059
2060 switch (page_type = fil_page_get_type(get_frame(block))) {
2061 case FIL_PAGE_TYPE_FSP_HDR:
2062 ut_a(block->page.id().page_no() == 0);
2063 /* Work directly on the uncompressed page headers. */
2064 return(update_header(block));
2065
2066 case FIL_PAGE_INDEX:
2067 case FIL_PAGE_RTREE:
2068 /* We need to decompress the contents into block->frame
2069 before we can do any thing with Btree pages. */
2070
2071 if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
2072 return(DB_CORRUPTION);
2073 }
2074
2075 /* fall through */
2076 case FIL_PAGE_TYPE_INSTANT:
2077 /* This is on every page in the tablespace. */
2078 mach_write_to_4(
2079 get_frame(block)
2080 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2081
2082 /* Only update the Btree nodes. */
2083 return(update_index_page(block));
2084
2085 case FIL_PAGE_TYPE_SYS:
2086 /* This is page 0 in the system tablespace. */
2087 return(DB_CORRUPTION);
2088
2089 case FIL_PAGE_TYPE_XDES:
2090 err = set_current_xdes(
2091 block->page.id().page_no(), get_frame(block));
2092 /* fall through */
2093 case FIL_PAGE_INODE:
2094 case FIL_PAGE_TYPE_TRX_SYS:
2095 case FIL_PAGE_IBUF_FREE_LIST:
2096 case FIL_PAGE_TYPE_ALLOCATED:
2097 case FIL_PAGE_IBUF_BITMAP:
2098 case FIL_PAGE_TYPE_BLOB:
2099 case FIL_PAGE_TYPE_ZBLOB:
2100 case FIL_PAGE_TYPE_ZBLOB2:
2101
2102 /* Work directly on the uncompressed page headers. */
2103 /* This is on every page in the tablespace. */
2104 mach_write_to_4(
2105 get_frame(block)
2106 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2107
2108 return(err);
2109 }
2110
2111 ib::warn() << "Unknown page type (" << page_type << ")";
2112
2113 return(DB_CORRUPTION);
2114 }
2115
2116 /** Called for every page in the tablespace. If the page was not
2117 updated then its state must be set to BUF_PAGE_NOT_USED.
2118 @param block block read from file, note it is not from the buffer pool
2119 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2120 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2121 {
2122 /* If we already had an old page with matching number
2123 in the buffer pool, evict it now, because
2124 we no longer evict the pages on DISCARD TABLESPACE. */
2125 buf_page_get_gen(block->page.id(), get_zip_size(),
2126 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2127 __FILE__, __LINE__, NULL, NULL);
2128
2129 uint16_t page_type;
2130
2131 if (dberr_t err = update_page(block, page_type)) {
2132 return err;
2133 }
2134
2135 const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
2136 byte* frame = get_frame(block);
2137 memset_aligned<8>(frame + FIL_PAGE_LSN, 0, 8);
2138
2139 if (!block->page.zip.data) {
2140 buf_flush_init_for_writing(
2141 NULL, block->frame, NULL, full_crc32);
2142 } else if (fil_page_type_is_index(page_type)) {
2143 buf_flush_init_for_writing(
2144 NULL, block->page.zip.data, &block->page.zip,
2145 full_crc32);
2146 } else {
2147 /* Calculate and update the checksum of non-index
2148 pages for ROW_FORMAT=COMPRESSED tables. */
2149 buf_flush_update_zip_checksum(
2150 block->page.zip.data, block->zip_size());
2151 }
2152
2153 return DB_SUCCESS;
2154 }
2155
2156 /*****************************************************************//**
2157 Clean up after import tablespace. */
2158 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2159 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2160 row_import_cleanup(
2161 /*===============*/
2162 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2163 trx_t* trx, /*!< in/out: transaction for import */
2164 dberr_t err) /*!< in: error code */
2165 {
2166 ut_a(prebuilt->trx != trx);
2167
2168 if (err != DB_SUCCESS) {
2169 dict_table_t* table = prebuilt->table;
2170 table->file_unreadable = true;
2171 if (table->space) {
2172 fil_close_tablespace(table->space_id);
2173 table->space = NULL;
2174 }
2175
2176 prebuilt->trx->error_info = NULL;
2177
2178 ib::info() << "Discarding tablespace of table "
2179 << table->name << ": " << err;
2180
2181 if (!trx->dict_operation_lock_mode) {
2182 row_mysql_lock_data_dictionary(trx);
2183 }
2184
2185 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2186 index;
2187 index = UT_LIST_GET_NEXT(indexes, index)) {
2188 index->page = FIL_NULL;
2189 }
2190 }
2191
2192 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2193
2194 DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2195
2196 trx_commit_for_mysql(trx);
2197
2198 row_mysql_unlock_data_dictionary(trx);
2199
2200 trx->free();
2201
2202 prebuilt->trx->op_info = "";
2203
2204 DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2205
2206 return(err);
2207 }
2208
2209 /*****************************************************************//**
2210 Report error during tablespace import. */
2211 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2212 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2213 row_import_error(
2214 /*=============*/
2215 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2216 trx_t* trx, /*!< in/out: transaction for import */
2217 dberr_t err) /*!< in: error code */
2218 {
2219 if (!trx_is_interrupted(trx)) {
2220 char table_name[MAX_FULL_NAME_LEN + 1];
2221
2222 innobase_format_name(
2223 table_name, sizeof(table_name),
2224 prebuilt->table->name.m_name);
2225
2226 ib_senderrf(
2227 trx->mysql_thd, IB_LOG_LEVEL_WARN,
2228 ER_INNODB_IMPORT_ERROR,
2229 table_name, (ulong) err, ut_strerr(err));
2230 }
2231
2232 return(row_import_cleanup(prebuilt, trx, err));
2233 }
2234
2235 /*****************************************************************//**
2236 Adjust the root page index node and leaf node segment headers, update
2237 with the new space id. For all the table's secondary indexes.
2238 @return error code */
2239 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2240 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2241 row_import_adjust_root_pages_of_secondary_indexes(
2242 /*==============================================*/
2243 trx_t* trx, /*!< in: transaction used for
2244 the import */
2245 dict_table_t* table, /*!< in: table the indexes
2246 belong to */
2247 const row_import& cfg) /*!< Import context */
2248 {
2249 dict_index_t* index;
2250 ulint n_rows_in_table;
2251 dberr_t err = DB_SUCCESS;
2252
2253 /* Skip the clustered index. */
2254 index = dict_table_get_first_index(table);
2255
2256 n_rows_in_table = cfg.get_n_rows(index->name);
2257
2258 DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2259 n_rows_in_table++;);
2260
2261 /* Adjust the root pages of the secondary indexes only. */
2262 while ((index = dict_table_get_next_index(index)) != NULL) {
2263 ut_a(!dict_index_is_clust(index));
2264
2265 if (!(index->type & DICT_CORRUPT)
2266 && index->page != FIL_NULL) {
2267
2268 /* Update the Btree segment headers for index node and
2269 leaf nodes in the root page. Set the new space id. */
2270
2271 err = btr_root_adjust_on_import(index);
2272 } else {
2273 ib::warn() << "Skip adjustment of root pages for"
2274 " index " << index->name << ".";
2275
2276 err = DB_CORRUPTION;
2277 }
2278
2279 if (err != DB_SUCCESS) {
2280
2281 if (index->type & DICT_CLUSTERED) {
2282 break;
2283 }
2284
2285 ib_errf(trx->mysql_thd,
2286 IB_LOG_LEVEL_WARN,
2287 ER_INNODB_INDEX_CORRUPT,
2288 "Index %s not found or corrupt,"
2289 " you should recreate this index.",
2290 index->name());
2291
2292 /* Do not bail out, so that the data
2293 can be recovered. */
2294
2295 err = DB_SUCCESS;
2296 index->type |= DICT_CORRUPT;
2297 continue;
2298 }
2299
2300 /* If we failed to purge any records in the index then
2301 do it the hard way.
2302
2303 TODO: We can do this in the first pass by generating UNDO log
2304 records for the failed rows. */
2305
2306 if (!cfg.requires_purge(index->name)) {
2307 continue;
2308 }
2309
2310 IndexPurge purge(trx, index);
2311
2312 trx->op_info = "secondary: purge delete marked records";
2313
2314 err = purge.garbage_collect();
2315
2316 trx->op_info = "";
2317
2318 if (err != DB_SUCCESS) {
2319 break;
2320 } else if (purge.get_n_rows() != n_rows_in_table) {
2321
2322 ib_errf(trx->mysql_thd,
2323 IB_LOG_LEVEL_WARN,
2324 ER_INNODB_INDEX_CORRUPT,
2325 "Index '%s' contains " ULINTPF " entries, "
2326 "should be " ULINTPF ", you should recreate "
2327 "this index.", index->name(),
2328 purge.get_n_rows(), n_rows_in_table);
2329
2330 index->type |= DICT_CORRUPT;
2331
2332 /* Do not bail out, so that the data
2333 can be recovered. */
2334
2335 err = DB_SUCCESS;
2336 }
2337 }
2338
2339 return(err);
2340 }
2341
2342 /*****************************************************************//**
2343 Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */
2344 MY_ATTRIBUTE((nonnull)) static
2345 void
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2346 row_import_set_sys_max_row_id(
2347 /*==========================*/
2348 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
2349 handler */
2350 const dict_table_t* table) /*!< in: table to import */
2351 {
2352 const rec_t* rec;
2353 mtr_t mtr;
2354 btr_pcur_t pcur;
2355 row_id_t row_id = 0;
2356 dict_index_t* index;
2357
2358 index = dict_table_get_first_index(table);
2359 ut_ad(index->is_primary());
2360 ut_ad(dict_index_is_auto_gen_clust(index));
2361
2362 mtr_start(&mtr);
2363
2364 mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2365
2366 btr_pcur_open_at_index_side(
2367 false, // High end
2368 index,
2369 BTR_SEARCH_LEAF,
2370 &pcur,
2371 true, // Init cursor
2372 0, // Leaf level
2373 &mtr);
2374
2375 btr_pcur_move_to_prev_on_page(&pcur);
2376 rec = btr_pcur_get_rec(&pcur);
2377
2378 /* Check for empty table. */
2379 if (page_rec_is_infimum(rec)) {
2380 /* The table is empty. */
2381 } else if (rec_is_metadata(rec, *index)) {
2382 /* The clustered index contains the metadata record only,
2383 that is, the table is empty. */
2384 } else {
2385 row_id = mach_read_from_6(rec);
2386 }
2387
2388 btr_pcur_close(&pcur);
2389 mtr_commit(&mtr);
2390
2391 if (row_id) {
2392 /* Update the system row id if the imported index row id is
2393 greater than the max system row id. */
2394
2395 mutex_enter(&dict_sys.mutex);
2396
2397 if (row_id >= dict_sys.row_id) {
2398 dict_sys.row_id = row_id + 1;
2399 dict_hdr_flush_row_id();
2400 }
2401
2402 mutex_exit(&dict_sys.mutex);
2403 }
2404 }
2405
2406 /*****************************************************************//**
2407 Read the a string from the meta data file.
2408 @return DB_SUCCESS or error code. */
2409 static
2410 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2411 row_import_cfg_read_string(
2412 /*=======================*/
2413 FILE* file, /*!< in/out: File to read from */
2414 byte* ptr, /*!< out: string to read */
2415 ulint max_len) /*!< in: maximum length of the output
2416 buffer in bytes */
2417 {
2418 DBUG_EXECUTE_IF("ib_import_string_read_error",
2419 errno = EINVAL; return(DB_IO_ERROR););
2420
2421 ulint len = 0;
2422
2423 while (!feof(file)) {
2424 int ch = fgetc(file);
2425
2426 if (ch == EOF) {
2427 break;
2428 } else if (ch != 0) {
2429 if (len < max_len) {
2430 ptr[len++] = static_cast<byte>(ch);
2431 } else {
2432 break;
2433 }
2434 /* max_len includes the NUL byte */
2435 } else if (len != max_len - 1) {
2436 break;
2437 } else {
2438 ptr[len] = 0;
2439 return(DB_SUCCESS);
2440 }
2441 }
2442
2443 errno = EINVAL;
2444
2445 return(DB_IO_ERROR);
2446 }
2447
2448 /*********************************************************************//**
2449 Write the meta data (index user fields) config file.
2450 @return DB_SUCCESS or error code. */
2451 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2452 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2453 row_import_cfg_read_index_fields(
2454 /*=============================*/
2455 FILE* file, /*!< in: file to write to */
2456 THD* thd, /*!< in/out: session */
2457 row_index_t* index) /*!< Index being read in */
2458 {
2459 byte row[sizeof(ib_uint32_t) * 3];
2460 ulint n_fields = index->m_n_fields;
2461
2462 index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2463
2464 /* Trigger OOM */
2465 DBUG_EXECUTE_IF(
2466 "ib_import_OOM_4",
2467 UT_DELETE_ARRAY(index->m_fields);
2468 index->m_fields = NULL;
2469 );
2470
2471 if (index->m_fields == NULL) {
2472 return(DB_OUT_OF_MEMORY);
2473 }
2474
2475 dict_field_t* field = index->m_fields;
2476
2477 for (ulint i = 0; i < n_fields; ++i, ++field) {
2478 byte* ptr = row;
2479
2480 /* Trigger EOF */
2481 DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2482 (void) fseek(file, 0L, SEEK_END););
2483
2484 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2485
2486 ib_senderrf(
2487 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2488 (ulong) errno, strerror(errno),
2489 "while reading index fields.");
2490
2491 return(DB_IO_ERROR);
2492 }
2493
2494 new (field) dict_field_t();
2495
2496 field->prefix_len = mach_read_from_4(ptr) & ((1U << 12) - 1);
2497 ptr += sizeof(ib_uint32_t);
2498
2499 field->fixed_len = mach_read_from_4(ptr) & ((1U << 10) - 1);
2500 ptr += sizeof(ib_uint32_t);
2501
2502 /* Include the NUL byte in the length. */
2503 ulint len = mach_read_from_4(ptr);
2504
2505 byte* name = UT_NEW_ARRAY_NOKEY(byte, len);
2506
2507 /* Trigger OOM */
2508 DBUG_EXECUTE_IF(
2509 "ib_import_OOM_5",
2510 UT_DELETE_ARRAY(name);
2511 name = NULL;
2512 );
2513
2514 if (name == NULL) {
2515 return(DB_OUT_OF_MEMORY);
2516 }
2517
2518 field->name = reinterpret_cast<const char*>(name);
2519
2520 dberr_t err = row_import_cfg_read_string(file, name, len);
2521
2522 if (err != DB_SUCCESS) {
2523
2524 ib_senderrf(
2525 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2526 (ulong) errno, strerror(errno),
2527 "while parsing table name.");
2528
2529 return(err);
2530 }
2531 }
2532
2533 return(DB_SUCCESS);
2534 }
2535
2536 /*****************************************************************//**
2537 Read the index names and root page numbers of the indexes and set the values.
2538 Row format [root_page_no, len of str, str ... ]
2539 @return DB_SUCCESS or error code. */
2540 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2541 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2542 row_import_read_index_data(
2543 /*=======================*/
2544 FILE* file, /*!< in: File to read from */
2545 THD* thd, /*!< in: session */
2546 row_import* cfg) /*!< in/out: meta-data read */
2547 {
2548 byte* ptr;
2549 row_index_t* cfg_index;
2550 byte row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2551
2552 /* FIXME: What is the max value? */
2553 ut_a(cfg->m_n_indexes > 0);
2554 ut_a(cfg->m_n_indexes < 1024);
2555
2556 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2557
2558 /* Trigger OOM */
2559 DBUG_EXECUTE_IF(
2560 "ib_import_OOM_6",
2561 UT_DELETE_ARRAY(cfg->m_indexes);
2562 cfg->m_indexes = NULL;
2563 );
2564
2565 if (cfg->m_indexes == NULL) {
2566 return(DB_OUT_OF_MEMORY);
2567 }
2568
2569 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2570
2571 cfg_index = cfg->m_indexes;
2572
2573 for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2574 /* Trigger EOF */
2575 DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2576 (void) fseek(file, 0L, SEEK_END););
2577
2578 /* Read the index data. */
2579 size_t n_bytes = fread(row, 1, sizeof(row), file);
2580
2581 /* Trigger EOF */
2582 DBUG_EXECUTE_IF("ib_import_io_read_error",
2583 (void) fseek(file, 0L, SEEK_END););
2584
2585 if (n_bytes != sizeof(row)) {
2586 char msg[BUFSIZ];
2587
2588 snprintf(msg, sizeof(msg),
2589 "while reading index meta-data, expected "
2590 "to read " ULINTPF
2591 " bytes but read only " ULINTPF " bytes",
2592 sizeof(row), n_bytes);
2593
2594 ib_senderrf(
2595 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2596 (ulong) errno, strerror(errno), msg);
2597
2598 ib::error() << "IO Error: " << msg;
2599
2600 return(DB_IO_ERROR);
2601 }
2602
2603 ptr = row;
2604
2605 cfg_index->m_id = mach_read_from_8(ptr);
2606 ptr += sizeof(index_id_t);
2607
2608 cfg_index->m_space = mach_read_from_4(ptr);
2609 ptr += sizeof(ib_uint32_t);
2610
2611 cfg_index->m_page_no = mach_read_from_4(ptr);
2612 ptr += sizeof(ib_uint32_t);
2613
2614 cfg_index->m_type = mach_read_from_4(ptr);
2615 ptr += sizeof(ib_uint32_t);
2616
2617 cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2618 if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2619 ut_ad(0);
2620 /* Overflow. Pretend that the clustered index
2621 has a variable-length PRIMARY KEY. */
2622 cfg_index->m_trx_id_offset = 0;
2623 }
2624 ptr += sizeof(ib_uint32_t);
2625
2626 cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2627 ptr += sizeof(ib_uint32_t);
2628
2629 cfg_index->m_n_uniq = mach_read_from_4(ptr);
2630 ptr += sizeof(ib_uint32_t);
2631
2632 cfg_index->m_n_nullable = mach_read_from_4(ptr);
2633 ptr += sizeof(ib_uint32_t);
2634
2635 cfg_index->m_n_fields = mach_read_from_4(ptr);
2636 ptr += sizeof(ib_uint32_t);
2637
2638 /* The NUL byte is included in the name length. */
2639 ulint len = mach_read_from_4(ptr);
2640
2641 if (len > OS_FILE_MAX_PATH) {
2642 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2643 ER_INNODB_INDEX_CORRUPT,
2644 "Index name length (" ULINTPF ") is too long, "
2645 "the meta-data is corrupt", len);
2646
2647 return(DB_CORRUPTION);
2648 }
2649
2650 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2651
2652 /* Trigger OOM */
2653 DBUG_EXECUTE_IF(
2654 "ib_import_OOM_7",
2655 UT_DELETE_ARRAY(cfg_index->m_name);
2656 cfg_index->m_name = NULL;
2657 );
2658
2659 if (cfg_index->m_name == NULL) {
2660 return(DB_OUT_OF_MEMORY);
2661 }
2662
2663 dberr_t err;
2664
2665 err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2666
2667 if (err != DB_SUCCESS) {
2668
2669 ib_senderrf(
2670 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2671 (ulong) errno, strerror(errno),
2672 "while parsing index name.");
2673
2674 return(err);
2675 }
2676
2677 err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2678
2679 if (err != DB_SUCCESS) {
2680 return(err);
2681 }
2682
2683 }
2684
2685 return(DB_SUCCESS);
2686 }
2687
2688 /*****************************************************************//**
2689 Set the index root page number for v1 format.
2690 @return DB_SUCCESS or error code. */
2691 static
2692 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2693 row_import_read_indexes(
2694 /*====================*/
2695 FILE* file, /*!< in: File to read from */
2696 THD* thd, /*!< in: session */
2697 row_import* cfg) /*!< in/out: meta-data read */
2698 {
2699 byte row[sizeof(ib_uint32_t)];
2700
2701 /* Trigger EOF */
2702 DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2703 (void) fseek(file, 0L, SEEK_END););
2704
2705 /* Read the number of indexes. */
2706 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2707 ib_senderrf(
2708 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2709 (ulong) errno, strerror(errno),
2710 "while reading number of indexes.");
2711
2712 return(DB_IO_ERROR);
2713 }
2714
2715 cfg->m_n_indexes = mach_read_from_4(row);
2716
2717 if (cfg->m_n_indexes == 0) {
2718 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2719 "Number of indexes in meta-data file is 0");
2720
2721 return(DB_CORRUPTION);
2722
2723 } else if (cfg->m_n_indexes > 1024) {
2724 // FIXME: What is the upper limit? */
2725 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2726 "Number of indexes in meta-data file is too high: "
2727 ULINTPF, cfg->m_n_indexes);
2728 cfg->m_n_indexes = 0;
2729
2730 return(DB_CORRUPTION);
2731 }
2732
2733 return(row_import_read_index_data(file, thd, cfg));
2734 }
2735
2736 /*********************************************************************//**
2737 Read the meta data (table columns) config file. Deserialise the contents of
2738 dict_col_t structure, along with the column name. */
2739 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2740 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2741 row_import_read_columns(
2742 /*====================*/
2743 FILE* file, /*!< in: file to write to */
2744 THD* thd, /*!< in/out: session */
2745 row_import* cfg) /*!< in/out: meta-data read */
2746 {
2747 dict_col_t* col;
2748 byte row[sizeof(ib_uint32_t) * 8];
2749
2750 /* FIXME: What should the upper limit be? */
2751 ut_a(cfg->m_n_cols > 0);
2752 ut_a(cfg->m_n_cols < 1024);
2753
2754 cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2755
2756 /* Trigger OOM */
2757 DBUG_EXECUTE_IF(
2758 "ib_import_OOM_8",
2759 UT_DELETE_ARRAY(cfg->m_cols);
2760 cfg->m_cols = NULL;
2761 );
2762
2763 if (cfg->m_cols == NULL) {
2764 return(DB_OUT_OF_MEMORY);
2765 }
2766
2767 cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2768
2769 /* Trigger OOM */
2770 DBUG_EXECUTE_IF(
2771 "ib_import_OOM_9",
2772 UT_DELETE_ARRAY(cfg->m_col_names);
2773 cfg->m_col_names = NULL;
2774 );
2775
2776 if (cfg->m_col_names == NULL) {
2777 return(DB_OUT_OF_MEMORY);
2778 }
2779
2780 memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2781 memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2782
2783 col = cfg->m_cols;
2784
2785 for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2786 byte* ptr = row;
2787
2788 /* Trigger EOF */
2789 DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2790 (void) fseek(file, 0L, SEEK_END););
2791
2792 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2793 ib_senderrf(
2794 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2795 (ulong) errno, strerror(errno),
2796 "while reading table column meta-data.");
2797
2798 return(DB_IO_ERROR);
2799 }
2800
2801 col->prtype = mach_read_from_4(ptr);
2802 ptr += sizeof(ib_uint32_t);
2803
2804 col->mtype = static_cast<byte>(mach_read_from_4(ptr));
2805 ptr += sizeof(ib_uint32_t);
2806
2807 col->len = static_cast<uint16_t>(mach_read_from_4(ptr));
2808 ptr += sizeof(ib_uint32_t);
2809
2810 uint32_t mbminmaxlen = mach_read_from_4(ptr);
2811 col->mbmaxlen = (mbminmaxlen / 5) & 7;
2812 col->mbminlen = (mbminmaxlen % 5) & 7;
2813 ptr += sizeof(ib_uint32_t);
2814
2815 col->ind = mach_read_from_4(ptr) & dict_index_t::MAX_N_FIELDS;
2816 ptr += sizeof(ib_uint32_t);
2817
2818 col->ord_part = mach_read_from_4(ptr) & 1;
2819 ptr += sizeof(ib_uint32_t);
2820
2821 col->max_prefix = mach_read_from_4(ptr) & ((1U << 12) - 1);
2822 ptr += sizeof(ib_uint32_t);
2823
2824 /* Read in the column name as [len, byte array]. The len
2825 includes the NUL byte. */
2826
2827 ulint len = mach_read_from_4(ptr);
2828
2829 /* FIXME: What is the maximum column name length? */
2830 if (len == 0 || len > 128) {
2831 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2832 ER_IO_READ_ERROR,
2833 "Column name length " ULINTPF ", is invalid",
2834 len);
2835
2836 return(DB_CORRUPTION);
2837 }
2838
2839 cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2840
2841 /* Trigger OOM */
2842 DBUG_EXECUTE_IF(
2843 "ib_import_OOM_10",
2844 UT_DELETE_ARRAY(cfg->m_col_names[i]);
2845 cfg->m_col_names[i] = NULL;
2846 );
2847
2848 if (cfg->m_col_names[i] == NULL) {
2849 return(DB_OUT_OF_MEMORY);
2850 }
2851
2852 dberr_t err;
2853
2854 err = row_import_cfg_read_string(
2855 file, cfg->m_col_names[i], len);
2856
2857 if (err != DB_SUCCESS) {
2858
2859 ib_senderrf(
2860 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2861 (ulong) errno, strerror(errno),
2862 "while parsing table column name.");
2863
2864 return(err);
2865 }
2866 }
2867
2868 return(DB_SUCCESS);
2869 }
2870
2871 /*****************************************************************//**
2872 Read the contents of the <tablespace>.cfg file.
2873 @return DB_SUCCESS or error code. */
2874 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2875 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2876 row_import_read_v1(
2877 /*===============*/
2878 FILE* file, /*!< in: File to read from */
2879 THD* thd, /*!< in: session */
2880 row_import* cfg) /*!< out: meta data */
2881 {
2882 byte value[sizeof(ib_uint32_t)];
2883
2884 /* Trigger EOF */
2885 DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2886 (void) fseek(file, 0L, SEEK_END););
2887
2888 /* Read the hostname where the tablespace was exported. */
2889 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2890 ib_senderrf(
2891 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2892 (ulong) errno, strerror(errno),
2893 "while reading meta-data export hostname length.");
2894
2895 return(DB_IO_ERROR);
2896 }
2897
2898 ulint len = mach_read_from_4(value);
2899
2900 /* NUL byte is part of name length. */
2901 cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2902
2903 /* Trigger OOM */
2904 DBUG_EXECUTE_IF(
2905 "ib_import_OOM_1",
2906 UT_DELETE_ARRAY(cfg->m_hostname);
2907 cfg->m_hostname = NULL;
2908 );
2909
2910 if (cfg->m_hostname == NULL) {
2911 return(DB_OUT_OF_MEMORY);
2912 }
2913
2914 dberr_t err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2915
2916 if (err != DB_SUCCESS) {
2917
2918 ib_senderrf(
2919 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2920 (ulong) errno, strerror(errno),
2921 "while parsing export hostname.");
2922
2923 return(err);
2924 }
2925
2926 /* Trigger EOF */
2927 DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2928 (void) fseek(file, 0L, SEEK_END););
2929
2930 /* Read the table name of tablespace that was exported. */
2931 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2932 ib_senderrf(
2933 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2934 (ulong) errno, strerror(errno),
2935 "while reading meta-data table name length.");
2936
2937 return(DB_IO_ERROR);
2938 }
2939
2940 len = mach_read_from_4(value);
2941
2942 /* NUL byte is part of name length. */
2943 cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2944
2945 /* Trigger OOM */
2946 DBUG_EXECUTE_IF(
2947 "ib_import_OOM_2",
2948 UT_DELETE_ARRAY(cfg->m_table_name);
2949 cfg->m_table_name = NULL;
2950 );
2951
2952 if (cfg->m_table_name == NULL) {
2953 return(DB_OUT_OF_MEMORY);
2954 }
2955
2956 err = row_import_cfg_read_string(file, cfg->m_table_name, len);
2957
2958 if (err != DB_SUCCESS) {
2959 ib_senderrf(
2960 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2961 (ulong) errno, strerror(errno),
2962 "while parsing table name.");
2963
2964 return(err);
2965 }
2966
2967 ib::info() << "Importing tablespace for table '" << cfg->m_table_name
2968 << "' that was exported from host '" << cfg->m_hostname << "'";
2969
2970 byte row[sizeof(ib_uint32_t) * 3];
2971
2972 /* Trigger EOF */
2973 DBUG_EXECUTE_IF("ib_import_io_read_error_7",
2974 (void) fseek(file, 0L, SEEK_END););
2975
2976 /* Read the autoinc value. */
2977 if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
2978 ib_senderrf(
2979 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2980 (ulong) errno, strerror(errno),
2981 "while reading autoinc value.");
2982
2983 return(DB_IO_ERROR);
2984 }
2985
2986 cfg->m_autoinc = mach_read_from_8(row);
2987
2988 /* Trigger EOF */
2989 DBUG_EXECUTE_IF("ib_import_io_read_error_8",
2990 (void) fseek(file, 0L, SEEK_END););
2991
2992 /* Read the tablespace page size. */
2993 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2994 ib_senderrf(
2995 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2996 (ulong) errno, strerror(errno),
2997 "while reading meta-data header.");
2998
2999 return(DB_IO_ERROR);
3000 }
3001
3002 byte* ptr = row;
3003
3004 const ulint logical_page_size = mach_read_from_4(ptr);
3005 ptr += sizeof(ib_uint32_t);
3006
3007 if (logical_page_size != srv_page_size) {
3008
3009 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3010 "Tablespace to be imported has a different"
3011 " page size than this server. Server page size"
3012 " is %lu, whereas tablespace page size"
3013 " is " ULINTPF,
3014 srv_page_size,
3015 logical_page_size);
3016
3017 return(DB_ERROR);
3018 }
3019
3020 cfg->m_flags = mach_read_from_4(ptr);
3021 ptr += sizeof(ib_uint32_t);
3022
3023 cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags);
3024 cfg->m_n_cols = mach_read_from_4(ptr);
3025
3026 if (!dict_tf_is_valid(cfg->m_flags)) {
3027 ib_errf(thd, IB_LOG_LEVEL_ERROR,
3028 ER_TABLE_SCHEMA_MISMATCH,
3029 "Invalid table flags: " ULINTPF, cfg->m_flags);
3030
3031 return(DB_CORRUPTION);
3032 }
3033
3034 err = row_import_read_columns(file, thd, cfg);
3035
3036 if (err == DB_SUCCESS) {
3037 err = row_import_read_indexes(file, thd, cfg);
3038 }
3039
3040 return(err);
3041 }
3042
3043 /**
3044 Read the contents of the <tablespace>.cfg file.
3045 @return DB_SUCCESS or error code. */
3046 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3047 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3048 row_import_read_meta_data(
3049 /*======================*/
3050 FILE* file, /*!< in: File to read from */
3051 THD* thd, /*!< in: session */
3052 row_import& cfg) /*!< out: contents of the .cfg file */
3053 {
3054 byte row[sizeof(ib_uint32_t)];
3055
3056 /* Trigger EOF */
3057 DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3058 (void) fseek(file, 0L, SEEK_END););
3059
3060 if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3061 ib_senderrf(
3062 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3063 (ulong) errno, strerror(errno),
3064 "while reading meta-data version.");
3065
3066 return(DB_IO_ERROR);
3067 }
3068
3069 cfg.m_version = mach_read_from_4(row);
3070
3071 /* Check the version number. */
3072 switch (cfg.m_version) {
3073 case IB_EXPORT_CFG_VERSION_V1:
3074
3075 return(row_import_read_v1(file, thd, &cfg));
3076 default:
3077 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3078 "Unsupported meta-data version number (" ULINTPF "), "
3079 "file ignored", cfg.m_version);
3080 }
3081
3082 return(DB_ERROR);
3083 }
3084
/* Constants describing the header of a BLOB part.
NOTE(review): the first two are presumably byte offsets of the fields
within that header, and these look like local copies of definitions
that live elsewhere in InnoDB - confirm that they stay in sync. */
#define BTR_BLOB_HDR_PART_LEN		0	/*!< BLOB part len on this page */
#define BTR_BLOB_HDR_NEXT_PAGE_NO	4	/*!< next BLOB part page no,
						FIL_NULL if none */
#define BTR_BLOB_HDR_SIZE		8	/*!< Size of a BLOB part header, in bytes */
3089
3090 /* decrypt and decompress page if needed */
decrypt_decompress(fil_space_crypt_t * space_crypt,size_t space_flags,span<byte> page,size_t space_id,byte * page_compress_buf)3091 static dberr_t decrypt_decompress(fil_space_crypt_t *space_crypt,
3092 size_t space_flags, span<byte> page,
3093 size_t space_id, byte *page_compress_buf)
3094 {
3095 auto *data= page.data();
3096
3097 if (space_crypt && space_crypt->should_encrypt())
3098 {
3099 if (!buf_page_verify_crypt_checksum(data, space_flags))
3100 return DB_CORRUPTION;
3101
3102 if (dberr_t err= fil_space_decrypt(space_id, space_crypt, data,
3103 page.size(), space_flags, data))
3104 return err;
3105 }
3106
3107 bool page_compressed= false;
3108
3109 if (fil_space_t::full_crc32(space_flags) &&
3110 fil_space_t::is_compressed(space_flags))
3111 page_compressed= buf_page_is_compressed(data, space_flags);
3112 else
3113 {
3114 switch (fil_page_get_type(data)) {
3115 case FIL_PAGE_PAGE_COMPRESSED:
3116 case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
3117 page_compressed= true;
3118 }
3119 }
3120
3121 if (page_compressed)
3122 {
3123 auto compress_length=
3124 fil_page_decompress(page_compress_buf, data, space_flags);
3125 ut_ad(compress_length != srv_page_size);
3126
3127 if (compress_length == 0)
3128 return DB_CORRUPTION;
3129 }
3130
3131 return DB_SUCCESS;
3132 }
3133
get_buf_size()3134 static size_t get_buf_size()
3135 {
3136 return srv_page_size
3137 #ifdef HAVE_LZO
3138 + LZO1X_1_15_MEM_COMPRESS
3139 #elif defined HAVE_SNAPPY
3140 + snappy_max_compressed_length(srv_page_size)
3141 #endif
3142 ;
3143 }
3144
/* Find and parse the instant ALTER metadata, performing various checks,
and apply it to dict_table_t.
@return DB_SUCCESS or some error */
handle_instant_metadata(dict_table_t * table,const row_import & cfg)3148 static dberr_t handle_instant_metadata(dict_table_t *table,
3149 const row_import &cfg)
3150 {
3151 dict_get_and_save_data_dir_path(table, false);
3152
3153 char *filepath;
3154 if (DICT_TF_HAS_DATA_DIR(table->flags))
3155 {
3156 ut_a(table->data_dir_path);
3157
3158 filepath=
3159 fil_make_filepath(table->data_dir_path, table->name.m_name, IBD, true);
3160 }
3161 else
3162 filepath= fil_make_filepath(nullptr, table->name.m_name, IBD, false);
3163
3164 if (!filepath)
3165 return DB_OUT_OF_MEMORY;
3166
3167 SCOPE_EXIT([filepath]() { ut_free(filepath); });
3168
3169 bool success;
3170 auto file= os_file_create_simple_no_error_handling(
3171 innodb_data_file_key, filepath, OS_FILE_OPEN, OS_FILE_READ_WRITE, false,
3172 &success);
3173 if (!success)
3174 return DB_IO_ERROR;
3175
3176 if (os_file_get_size(file) < srv_page_size * 4)
3177 return DB_CORRUPTION;
3178
3179 SCOPE_EXIT([&file]() { os_file_close(file); });
3180
3181 std::unique_ptr<byte[], decltype(&aligned_free)> first_page(
3182 static_cast<byte *>(aligned_malloc(srv_page_size, srv_page_size)),
3183 &aligned_free);
3184
3185 if (dberr_t err= os_file_read_no_error_handling(IORequestReadPartial,
3186 file, first_page.get(), 0,
3187 srv_page_size, nullptr))
3188 return err;
3189
3190 auto space_flags= fsp_header_get_flags(first_page.get());
3191
3192 if (!fil_space_t::is_valid_flags(space_flags, true))
3193 {
3194 auto cflags= fsp_flags_convert_from_101(space_flags);
3195 if (cflags == ULINT_UNDEFINED)
3196 {
3197 ib::error() << "Invalid FSP_SPACE_FLAGS=" << ib::hex(space_flags);
3198 return DB_CORRUPTION;
3199 }
3200 space_flags= static_cast<decltype(space_flags)>(cflags);
3201 }
3202
3203 if (!cfg.m_missing)
3204 {
3205 if (dberr_t err= cfg.match_flags(current_thd))
3206 return err;
3207 }
3208
3209 const unsigned zip_size= fil_space_t::zip_size(space_flags);
3210 const unsigned physical_size= zip_size ? zip_size : unsigned(srv_page_size);
3211 ut_ad(physical_size <= UNIV_PAGE_SIZE_MAX);
3212 const uint32_t space_id= page_get_space_id(first_page.get());
3213
3214 auto *space_crypt= fil_space_read_crypt_data(zip_size, first_page.get());
3215 SCOPE_EXIT([&space_crypt]() {
3216 if (space_crypt)
3217 fil_space_destroy_crypt_data(&space_crypt);
3218 });
3219
3220 std::unique_ptr<byte[], decltype(&aligned_free)> page(
3221 static_cast<byte *>(
3222 aligned_malloc(UNIV_PAGE_SIZE_MAX, UNIV_PAGE_SIZE_MAX)),
3223 &aligned_free);
3224
3225 if (dberr_t err= os_file_read_no_error_handling(
3226 IORequestReadPartial, file, page.get(), 3 * physical_size,
3227 physical_size, nullptr))
3228 return err;
3229
3230 std::unique_ptr<byte[]> page_compress_buf(new byte[get_buf_size()]);
3231
3232 if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3233 {page.get(), static_cast<size_t>
3234 (physical_size)},
3235 space_id, page_compress_buf.get()))
3236 return err;
3237
3238 if (table->supports_instant())
3239 {
3240 dict_index_t *index= dict_table_get_first_index(table);
3241
3242 auto tmp1= table->space_id;
3243 table->space_id= page_get_space_id(page.get());
3244 SCOPE_EXIT([tmp1, table]() { table->space_id= tmp1; });
3245
3246 auto tmp2= index->page;
3247 index->page= page_get_page_no(page.get());
3248 SCOPE_EXIT([tmp2, index]() { index->page= tmp2; });
3249
3250 if (!page_is_comp(page.get()) != !dict_table_is_comp(table))
3251 {
3252 ib_errf(current_thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3253 "ROW_FORMAT mismatch");
3254 return DB_CORRUPTION;
3255 }
3256
3257 if (btr_cur_instant_root_init(index, page.get()))
3258 return DB_ERROR;
3259
3260 ut_ad(index->n_core_null_bytes != dict_index_t::NO_CORE_NULL_BYTES);
3261
3262 if (fil_page_get_type(page.get()) == FIL_PAGE_INDEX)
3263 {
3264 ut_ad(!index->is_instant());
3265 return DB_SUCCESS;
3266 }
3267
3268 mem_heap_t *heap= NULL;
3269 SCOPE_EXIT([&heap]() {
3270 if (heap)
3271 mem_heap_free(heap);
3272 });
3273
3274 while (btr_page_get_level(page.get()) != 0)
3275 {
3276 const rec_t *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3277
3278 /* Relax the assertion in rec_init_offsets(). */
3279 ut_ad(!index->in_instant_init);
3280 ut_d(index->in_instant_init= true);
3281 rec_offs *offsets=
3282 rec_get_offsets(rec, index, nullptr, 0, ULINT_UNDEFINED, &heap);
3283 ut_d(index->in_instant_init= false);
3284
3285 uint64_t child_page_no= btr_node_ptr_get_child_page_no(rec, offsets);
3286
3287 if (dberr_t err=
3288 os_file_read_no_error_handling(IORequestReadPartial, file,
3289 page.get(),
3290 child_page_no * physical_size,
3291 physical_size, nullptr))
3292 return err;
3293
3294 if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3295 {page.get(), static_cast<size_t>
3296 (physical_size)}, space_id,
3297 page_compress_buf.get()))
3298 return err;
3299 }
3300
3301 const auto *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3302 const auto comp= dict_table_is_comp(index->table);
3303 const auto info_bits= rec_get_info_bits(rec, comp);
3304
3305 if (page_rec_is_supremum(rec) || !(info_bits & REC_INFO_MIN_REC_FLAG))
3306 {
3307 ib::error() << "Table " << index->table->name
3308 << " is missing instant ALTER metadata";
3309 index->table->corrupted= true;
3310 return DB_CORRUPTION;
3311 }
3312
3313 if ((info_bits & ~REC_INFO_DELETED_FLAG) != REC_INFO_MIN_REC_FLAG ||
3314 (comp && rec_get_status(rec) != REC_STATUS_INSTANT))
3315 {
3316 incompatible:
3317 ib::error() << "Table " << index->table->name
3318 << " contains unrecognizable instant ALTER metadata";
3319 index->table->corrupted= true;
3320 return DB_CORRUPTION;
3321 }
3322
3323 if (info_bits & REC_INFO_DELETED_FLAG)
3324 {
3325 ulint trx_id_offset= index->trx_id_offset;
3326 ut_ad(index->n_uniq);
3327
3328 if (trx_id_offset)
3329 {
3330 }
3331 else if (index->table->not_redundant())
3332 {
3333
3334 for (uint i= index->n_uniq; i--;)
3335 trx_id_offset+= index->fields[i].fixed_len;
3336 }
3337 else if (rec_get_1byte_offs_flag(rec))
3338 {
3339 trx_id_offset= rec_1_get_field_end_info(rec, index->n_uniq - 1);
3340 ut_ad(!(trx_id_offset & REC_1BYTE_SQL_NULL_MASK));
3341 trx_id_offset&= ~REC_1BYTE_SQL_NULL_MASK;
3342 }
3343 else
3344 {
3345 trx_id_offset= rec_2_get_field_end_info(rec, index->n_uniq - 1);
3346 ut_ad(!(trx_id_offset & REC_2BYTE_SQL_NULL_MASK));
3347 trx_id_offset&= ~REC_2BYTE_SQL_NULL_MASK;
3348 }
3349
3350 const byte *ptr=
3351 rec + trx_id_offset + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
3352
3353 if (mach_read_from_4(ptr + BTR_EXTERN_LEN))
3354 goto incompatible;
3355
3356 uint len= mach_read_from_4(ptr + BTR_EXTERN_LEN + 4);
3357 if (!len || mach_read_from_4(ptr + BTR_EXTERN_OFFSET) != FIL_PAGE_DATA)
3358 goto incompatible;
3359
3360 std::unique_ptr<byte[], decltype(&aligned_free)>
3361 second_page(static_cast<byte*>(aligned_malloc(physical_size,
3362 physical_size)),
3363 &aligned_free);
3364
3365 if (dberr_t err=
3366 os_file_read_no_error_handling(IORequestReadPartial, file,
3367 second_page.get(), physical_size *
3368 mach_read_from_4(ptr +
3369 BTR_EXTERN_PAGE_NO),
3370 srv_page_size, nullptr))
3371 return err;
3372
3373 if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3374 {second_page.get(),
3375 static_cast<size_t>(physical_size)},
3376 space_id, page_compress_buf.get()))
3377 return err;
3378
3379 if (fil_page_get_type(second_page.get()) != FIL_PAGE_TYPE_BLOB ||
3380 mach_read_from_4(
3381 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_NEXT_PAGE_NO]) !=
3382 FIL_NULL ||
3383 mach_read_from_4(
3384 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_PART_LEN]) != len)
3385 goto incompatible;
3386
3387 /* The unused part of the BLOB page should be zero-filled. */
3388 for (const byte *
3389 b= second_page.get() + (FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE) +
3390 len,
3391 *const end= second_page.get() + srv_page_size - BTR_EXTERN_LEN;
3392 b < end;)
3393 {
3394 if (*b++)
3395 goto incompatible;
3396 }
3397
3398 if (index->table->deserialise_columns(
3399 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE], len))
3400 goto incompatible;
3401 }
3402
3403 rec_offs *offsets= rec_get_offsets(
3404 rec, index, nullptr, index->n_core_fields, ULINT_UNDEFINED, &heap);
3405 if (rec_offs_any_default(offsets))
3406 {
3407 inconsistent:
3408 goto incompatible;
3409 }
3410
3411 /* In fact, because we only ever append fields to the metadata
3412 record, it is also OK to perform READ UNCOMMITTED and
3413 then ignore any extra fields, provided that
3414 trx_sys.is_registered(DB_TRX_ID). */
3415 if (rec_offs_n_fields(offsets) >
3416 ulint(index->n_fields) + !!index->table->instant &&
3417 !trx_sys.is_registered(current_trx(),
3418 row_get_rec_trx_id(rec, index, offsets)))
3419 goto inconsistent;
3420
3421 for (unsigned i= index->n_core_fields; i < index->n_fields; i++)
3422 {
3423 dict_col_t *col= index->fields[i].col;
3424 const unsigned o= i + !!index->table->instant;
3425 ulint len;
3426 const byte *data= rec_get_nth_field(rec, offsets, o, &len);
3427 ut_ad(!col->is_added());
3428 ut_ad(!col->def_val.data);
3429 col->def_val.len= len;
3430 switch (len) {
3431 case UNIV_SQL_NULL:
3432 continue;
3433 case 0:
3434 col->def_val.data= field_ref_zero;
3435 continue;
3436 }
3437 ut_ad(len != UNIV_SQL_DEFAULT);
3438 if (!rec_offs_nth_extern(offsets, o))
3439 col->def_val.data= mem_heap_dup(index->table->heap, data, len);
3440 else if (len < BTR_EXTERN_FIELD_REF_SIZE ||
3441 !memcmp(data + len - BTR_EXTERN_FIELD_REF_SIZE, field_ref_zero,
3442 BTR_EXTERN_FIELD_REF_SIZE))
3443 {
3444 col->def_val.len= UNIV_SQL_DEFAULT;
3445 goto inconsistent;
3446 }
3447 else
3448 {
3449 col->def_val.data= btr_copy_externally_stored_field(
3450 &col->def_val.len, data, srv_page_size, len, index->table->heap);
3451 }
3452 }
3453 }
3454
3455 return DB_SUCCESS;
3456 }
3457
3458 /**
3459 Read the contents of the <tablename>.cfg file.
3460 @return DB_SUCCESS or error code. */
3461 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3462 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3463 row_import_read_cfg(
3464 /*================*/
3465 dict_table_t* table, /*!< in: table */
3466 THD* thd, /*!< in: session */
3467 row_import& cfg) /*!< out: contents of the .cfg file */
3468 {
3469 dberr_t err;
3470 char name[OS_FILE_MAX_PATH];
3471
3472 cfg.m_table = table;
3473
3474 srv_get_meta_data_filename(table, name, sizeof(name));
3475
3476 FILE* file = fopen(name, "rb");
3477
3478 if (file == NULL) {
3479 char msg[BUFSIZ];
3480
3481 snprintf(msg, sizeof(msg),
3482 "Error opening '%s', will attempt to import"
3483 " without schema verification", name);
3484
3485 ib_senderrf(
3486 thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3487 (ulong) errno, strerror(errno), msg);
3488
3489 cfg.m_missing = true;
3490
3491 err = DB_FAIL;
3492 } else {
3493
3494 cfg.m_missing = false;
3495
3496 err = row_import_read_meta_data(file, thd, cfg);
3497 fclose(file);
3498 }
3499
3500 return(err);
3501 }
3502
3503 /** Update the root page numbers and tablespace ID of a table.
3504 @param[in,out] trx dictionary transaction
3505 @param[in,out] table persistent table
3506 @param[in] reset whether to reset the fields to FIL_NULL
3507 @return DB_SUCCESS or error code */
3508 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3509 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3510 {
3511 const dict_index_t* index;
3512 que_t* graph = 0;
3513 dberr_t err = DB_SUCCESS;
3514
3515 ut_ad(reset || table->space->id == table->space_id);
3516
3517 static const char sql[] = {
3518 "PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3519 "BEGIN\n"
3520 "UPDATE SYS_INDEXES\n"
3521 "SET SPACE = :space,\n"
3522 " PAGE_NO = :page,\n"
3523 " TYPE = :type\n"
3524 "WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3525 "END;\n"};
3526
3527 table->def_trx_id = trx->id;
3528
3529 for (index = dict_table_get_first_index(table);
3530 index != 0;
3531 index = dict_table_get_next_index(index)) {
3532
3533 pars_info_t* info;
3534 ib_uint32_t page;
3535 ib_uint32_t space;
3536 ib_uint32_t type;
3537 index_id_t index_id;
3538 table_id_t table_id;
3539
3540 info = (graph != 0) ? graph->info : pars_info_create();
3541
3542 mach_write_to_4(
3543 reinterpret_cast<byte*>(&type),
3544 index->type);
3545
3546 mach_write_to_4(
3547 reinterpret_cast<byte*>(&page),
3548 reset ? FIL_NULL : index->page);
3549
3550 mach_write_to_4(
3551 reinterpret_cast<byte*>(&space),
3552 reset ? FIL_NULL : index->table->space_id);
3553
3554 mach_write_to_8(
3555 reinterpret_cast<byte*>(&index_id),
3556 index->id);
3557
3558 mach_write_to_8(
3559 reinterpret_cast<byte*>(&table_id),
3560 table->id);
3561
3562 /* If we set the corrupt bit during the IMPORT phase then
3563 we need to update the system tables. */
3564 pars_info_bind_int4_literal(info, "type", &type);
3565 pars_info_bind_int4_literal(info, "space", &space);
3566 pars_info_bind_int4_literal(info, "page", &page);
3567 pars_info_bind_ull_literal(info, "index_id", &index_id);
3568 pars_info_bind_ull_literal(info, "table_id", &table_id);
3569
3570 if (graph == 0) {
3571 graph = pars_sql(info, sql);
3572 ut_a(graph);
3573 graph->trx = trx;
3574 }
3575
3576 que_thr_t* thr;
3577
3578 graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3579
3580 ut_a(thr = que_fork_start_command(graph));
3581
3582 que_run_threads(thr);
3583
3584 DBUG_EXECUTE_IF("ib_import_internal_error",
3585 trx->error_state = DB_ERROR;);
3586
3587 err = trx->error_state;
3588
3589 if (err != DB_SUCCESS) {
3590 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3591 ER_INTERNAL_ERROR,
3592 "While updating the <space, root page"
3593 " number> of index %s - %s",
3594 index->name(), ut_strerr(err));
3595
3596 break;
3597 }
3598 }
3599
3600 que_graph_free(graph);
3601
3602 return(err);
3603 }
3604
/** Callback arg for row_import_set_discarded.
Filled in by row_import_update_discarded_flag() before the query runs
and updated by the fetch callback for the matching SYS_TABLES record. */
struct discard_t {
	ib_uint32_t	flags2;		/*!< MIX_LEN value with the
					DICT_TF2_DISCARDED bit adjusted,
					stored with mach_write_to_4();
					bound to the :flags2 literal.
					ULINT32_UNDEFINED until the
					callback has run */
	bool		state;		/*!< New state of the flag:
					true to set DICT_TF2_DISCARDED,
					false to clear it */
	ulint		n_recs;		/*!< Number of recs processed
					by the callback */
};
3611
3612 /******************************************************************//**
3613 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3614 SYS_TABLES. The flags is stored in MIX_LEN column.
3615 @return FALSE if all OK */
3616 static
3617 ibool
row_import_set_discarded(void * row,void * user_arg)3618 row_import_set_discarded(
3619 /*=====================*/
3620 void* row, /*!< in: sel_node_t* */
3621 void* user_arg) /*!< in: bool set/unset flag */
3622 {
3623 sel_node_t* node = static_cast<sel_node_t*>(row);
3624 discard_t* discard = static_cast<discard_t*>(user_arg);
3625 dfield_t* dfield = que_node_get_val(node->select_list);
3626 dtype_t* type = dfield_get_type(dfield);
3627 ulint len = dfield_get_len(dfield);
3628
3629 ut_a(dtype_get_mtype(type) == DATA_INT);
3630 ut_a(len == sizeof(ib_uint32_t));
3631
3632 ulint flags2 = mach_read_from_4(
3633 static_cast<byte*>(dfield_get_data(dfield)));
3634
3635 if (discard->state) {
3636 flags2 |= DICT_TF2_DISCARDED;
3637 } else {
3638 flags2 &= ~DICT_TF2_DISCARDED;
3639 }
3640
3641 mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3642
3643 ++discard->n_recs;
3644
3645 /* There should be at most one matching record. */
3646 ut_a(discard->n_recs == 1);
3647
3648 return(FALSE);
3649 }
3650
3651 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3652 @param[in,out] trx dictionary transaction
3653 @param[in] table_id table identifier
3654 @param[in] discarded whether to set or clear the flag
3655 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3656 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3657 bool discarded)
3658 {
3659 pars_info_t* info;
3660 discard_t discard;
3661
3662 static const char sql[] =
3663 "PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3664 "DECLARE FUNCTION my_func;\n"
3665 "DECLARE CURSOR c IS\n"
3666 " SELECT MIX_LEN"
3667 " FROM SYS_TABLES"
3668 " WHERE ID = :table_id FOR UPDATE;"
3669 "\n"
3670 "BEGIN\n"
3671 "OPEN c;\n"
3672 "WHILE 1 = 1 LOOP\n"
3673 " FETCH c INTO my_func();\n"
3674 " IF c % NOTFOUND THEN\n"
3675 " EXIT;\n"
3676 " END IF;\n"
3677 "END LOOP;\n"
3678 "UPDATE SYS_TABLES"
3679 " SET MIX_LEN = :flags2"
3680 " WHERE ID = :table_id;\n"
3681 "CLOSE c;\n"
3682 "END;\n";
3683
3684 discard.n_recs = 0;
3685 discard.state = discarded;
3686 discard.flags2 = ULINT32_UNDEFINED;
3687
3688 info = pars_info_create();
3689
3690 pars_info_add_ull_literal(info, "table_id", table_id);
3691 pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3692
3693 pars_info_bind_function(
3694 info, "my_func", row_import_set_discarded, &discard);
3695
3696 dberr_t err = que_eval_sql(info, sql, false, trx);
3697
3698 ut_a(discard.n_recs == 1);
3699 ut_a(discard.flags2 != ULINT32_UNDEFINED);
3700
3701 return(err);
3702 }
3703
3704 /** InnoDB writes page by page when there is page compressed
3705 tablespace involved. It does help to save the disk space when
3706 punch hole is enabled
3707 @param iter Tablespace iterator
3708 @param full_crc32 whether the file is in the full_crc32 format
3709 @param offset offset of the file to be written
3710 @param writeptr buffer to be written
3711 @param n_bytes number of bytes to be written
3712 @param try_punch_only Try the range punch only because the
3713 current range is full of empty pages
3714 @return DB_SUCCESS */
3715 static
fil_import_compress_fwrite(const fil_iterator_t & iter,bool full_crc32,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3716 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3717 bool full_crc32,
3718 os_offset_t offset,
3719 const byte *writeptr,
3720 ulint n_bytes,
3721 bool try_punch_only= false)
3722 {
3723 if (dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes))
3724 return err;
3725
3726 if (try_punch_only)
3727 return DB_SUCCESS;
3728
3729 for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3730 {
3731 /* Read the original data length from block and
3732 safer to read FIL_PAGE_COMPRESSED_SIZE because it
3733 is not encrypted*/
3734 ulint n_write_bytes= srv_page_size;
3735 if (j || offset)
3736 {
3737 n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3738 const unsigned ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3739 /* Ignore the empty page */
3740 if (ptype == 0 && n_write_bytes == 0)
3741 continue;
3742 if (full_crc32)
3743 n_write_bytes= buf_page_full_crc32_size(writeptr + j,
3744 nullptr, nullptr);
3745 else
3746 {
3747 n_write_bytes+= ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
3748 ? FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN
3749 : FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
3750 }
3751 }
3752
3753 if (dberr_t err= os_file_write(IORequestWrite, iter.filepath, iter.file,
3754 writeptr + j, offset + j, n_write_bytes))
3755 return err;
3756 }
3757
3758 return DB_SUCCESS;
3759 }
3760
/** Read page 3 of the tablespace file into iter.io_buffer, validate it,
decrypt and decompress it when needed, and pass the block to operator().
@param iter   tablespace iterator (file, file path, I/O buffer, crypt data)
@param block  block descriptor; its frame (and zip.data, if set) is
              pointed to iter.io_buffer
@return DB_SUCCESS or error code (DB_OUT_OF_MEMORY, DB_CORRUPTION, or the
error from reading, decrypting or operator()) */
dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
                                 buf_block_t* block) UNIV_NOTHROW
{
  const unsigned zip_size= fil_space_t::zip_size(m_space_flags);
  /* Physical page size in the file */
  const unsigned size= zip_size ? zip_size : unsigned(srv_page_size);
  /* Scratch buffer for fil_page_decompress(); released at func_exit */
  byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
  const bool full_crc32 = fil_space_t::full_crc32(m_space_flags);
  bool skip_checksum_check = false;
  ut_ad(!srv_read_only_mode);

  if (!page_compress_buf)
    return DB_OUT_OF_MEMORY;

  const bool encrypted= iter.crypt_data != NULL &&
    iter.crypt_data->should_encrypt();
  byte* const readptr= iter.io_buffer;
  block->frame= readptr;

  if (block->page.zip.data)
    block->page.zip.data= readptr;

  bool page_compressed= false;

  /* Read only the page at page number 3 */
  dberr_t err= os_file_read_no_error_handling(
    IORequestReadPartial, iter.file, readptr, 3 * size, size, 0);
  if (err != DB_SUCCESS)
  {
    ib::error() << iter.filepath << ": os_file_read() failed";
    goto func_exit;
  }

  /* The page must carry its own page number */
  if (page_get_page_no(readptr) != 3)
  {
page_corrupted:
    ib::warn() << filename() << ": Page 3 at offset "
               << 3 * size << " looks corrupted.";
    err= DB_CORRUPTION;
    goto func_exit;
  }

  block->page.id_.set_page_no(3);
  /* Determine whether the page is stored in page_compressed form */
  if (full_crc32 && fil_space_t::is_compressed(m_space_flags))
    page_compressed= buf_page_is_compressed(readptr, m_space_flags);
  else
  {
    switch (fil_page_get_type(readptr)) {
    case FIL_PAGE_PAGE_COMPRESSED:
    case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
      /* A page_compressed page type together with a compressed
      page buffer (zip.data) is treated as corruption */
      if (block->page.zip.data)
        goto page_corrupted;
      page_compressed= true;
    }
  }

  if (encrypted)
  {
    if (!buf_page_verify_crypt_checksum(readptr, m_space_flags))
      goto page_corrupted;

    /* In an encrypted tablespace, this page must have a key version */
    if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
        buf_page_get_key_version(readptr, m_space_flags))
      goto page_corrupted;

    /* Decrypt in place */
    if ((err= fil_space_decrypt(get_space_id(), iter.crypt_data, readptr, size,
                                m_space_flags, readptr)))
      goto func_exit;
  }

  /* For full_crc32 format, skip checksum check
  after decryption. */
  skip_checksum_check= full_crc32 && encrypted;

  if (page_compressed)
  {
    /* A return value of 0 from fil_page_decompress() is handled
    as corruption */
    ulint compress_length= fil_page_decompress(page_compress_buf,
                                               readptr,
                                               m_space_flags);
    ut_ad(compress_length != srv_page_size);
    if (compress_length == 0)
      goto page_corrupted;
  }
  else if (!skip_checksum_check
           && buf_page_is_corrupted(false, readptr, m_space_flags))
    goto page_corrupted;

  /* Hand the validated page to the page callback */
  err= this->operator()(block);
func_exit:
  free(page_compress_buf);
  return err;
}
3851
/** Iterate over the pages of a tablespace file in batches.
Each batch of up to iter.n_io_buffers pages is read from the file; every
page is validated, decrypted and decompressed when needed, passed to the
callback, then compressed and encrypted again when needed. A batch is
written back if any of its pages was updated.
@param iter      tablespace iterator (file, range, buffers, crypt data)
@param block     block descriptor that is pointed at each page in turn
@param callback  functor that is invoked on each page
@return DB_SUCCESS or error code (DB_OUT_OF_MEMORY, DB_INTERRUPTED,
DB_CORRUPTION, or the error from I/O, decryption or the callback) */
static dberr_t fil_iterate(
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	AbstractCallback&	callback)
{
	os_offset_t		offset;
	/* Physical page size in the file */
	const ulint		size = callback.physical_size();
	/* Number of bytes to process in one batch */
	ulint			n_bytes = iter.n_io_buffers * size;

	/* Scratch buffer for page (de)compression; freed at func_exit */
	byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
	ut_ad(!srv_read_only_mode);

	if (!page_compress_buf) {
		return DB_OUT_OF_MEMORY;
	}

	/* Tablespace ID as read from page 0 of the file */
	ulint actual_space_id = 0;
	const bool full_crc32 = fil_space_t::full_crc32(
		callback.get_space_flags());

	/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
	copying for non-index pages. Unfortunately, it is
	required by buf_zip_decompress() */
	dberr_t		err = DB_SUCCESS;
	bool		page_compressed = false;
	/* Cleared after the first failure of the hole-punching write */
	bool		punch_hole = true;

	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
		if (callback.is_interrupted()) {
			err = DB_INTERRUPTED;
			goto func_exit;
		}

		byte*		io_buffer = iter.io_buffer;
		block->frame = io_buffer;

		if (block->page.zip.data) {
			/* Zip IO is done in the compressed page buffer. */
			io_buffer = block->page.zip.data;
		}

		/* We have to read the exact number of bytes. Otherwise the
		InnoDB IO functions croak on failed reads. */

		n_bytes = ulint(ut_min(os_offset_t(n_bytes),
				       iter.end - offset));

		ut_ad(n_bytes > 0);
		ut_ad(!(n_bytes % size));

		const bool encrypted = iter.crypt_data != NULL
			&& iter.crypt_data->should_encrypt();
		/* Use additional crypt io buffer if tablespace is encrypted */
		byte* const readptr = encrypted
			? iter.crypt_io_buffer : io_buffer;
		byte* const writeptr = readptr;

		err = os_file_read_no_error_handling(
			IORequestReadPartial,
			iter.file, readptr, offset, n_bytes, 0);
		if (err != DB_SUCCESS) {
			ib::error() << iter.filepath
				    << ": os_file_read() failed";
			goto func_exit;
		}

		/* Whether any page of this batch needs to be written back */
		bool		updated = false;
		os_offset_t	page_off = offset;
		ulint		n_pages_read = n_bytes / size;
		/* This block is not attached to buf_pool */
		block->page.id_.set_page_no(uint32_t(page_off / size));

		for (ulint i = 0; i < n_pages_read;
		     ++block->page.id_,
		     ++i, page_off += size, block->frame += size) {
			byte*	src = readptr + i * size;
			const ulint page_no = page_get_page_no(src);
			/* A zero page number on any page other than
			page 0 is accepted only if the whole page is
			zero-filled. */
			if (!page_no && block->page.id().page_no()) {
				if (!buf_is_zeroes(span<const byte>(src,
								    size))) {
					goto page_corrupted;
				}
				/* Proceed to the next page,
				because this one is all zero. */
				continue;
			}

			if (page_no != block->page.id().page_no()) {
page_corrupted:
				/* NOTE(review): the message is built from
				offset, the start of the current batch, not
				from page_off; confirm whether reporting the
				first page of the batch is intended. */
				ib::warn() << callback.filename()
					   << ": Page " << (offset / size)
					   << " at offset " << offset
					   << " looks corrupted.";
				err = DB_CORRUPTION;
				goto func_exit;
			}

			if (block->page.id().page_no() == 0) {
				actual_space_id = mach_read_from_4(
					src + FIL_PAGE_SPACE_ID);
			}

			const uint16_t type = fil_page_get_type(src);
			page_compressed =
				(full_crc32
				 && fil_space_t::is_compressed(
					callback.get_space_flags())
				 && buf_page_is_compressed(
					src, callback.get_space_flags()))
				|| type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
				|| type == FIL_PAGE_PAGE_COMPRESSED;

			/* A page_compressed page together with a compressed
			page buffer (zip.data) is treated as corruption. */
			if (page_compressed && block->page.zip.data) {
				goto page_corrupted;
			}

			bool decrypted = false;
			byte* dst = io_buffer + i * size;
			/* Whether block->frame or zip.data was temporarily
			redirected to src; restored after the callback */
			bool frame_changed = false;
			uint key_version = buf_page_get_key_version(
				src, callback.get_space_flags());

			if (!encrypted) {
			} else if (!key_version) {
				/* The tablespace is encrypted but this page
				is not: use it directly from src, or copy it
				to dst. */
				if (block->page.id().page_no() == 0
				    && block->page.zip.data) {
					block->page.zip.data = src;
					frame_changed = true;
				} else if (!page_compressed
					   && !block->page.zip.data) {
					block->frame = src;
					frame_changed = true;
				} else {
					ut_ad(dst != src);
					memcpy(dst, src, size);
				}
			} else {
				if (!buf_page_verify_crypt_checksum(
					    src, callback.get_space_flags())) {
					goto page_corrupted;
				}

				/* Decrypt from src into dst */
				if ((err = fil_space_decrypt(
					     actual_space_id,
					     iter.crypt_data, dst,
					     callback.physical_size(),
					     callback.get_space_flags(),
					     src))) {
					goto func_exit;
				}

				decrypted = true;
				updated = true;
			}

			/* For full_crc32 format, skip checksum check
			after decryption. */
			bool skip_checksum_check = full_crc32 && encrypted;

			/* If the original page is page_compressed, we need
			to decompress it before adjusting further. */
			if (page_compressed) {
				ulint compress_length = fil_page_decompress(
					page_compress_buf, dst,
					callback.get_space_flags());
				ut_ad(compress_length != srv_page_size);
				if (compress_length == 0) {
					goto page_corrupted;
				}
				updated = true;
			} else if (!skip_checksum_check
				   && buf_page_is_corrupted(
					   false,
					   encrypted && !frame_changed
					   ? dst : src,
					   callback.get_space_flags())) {
				goto page_corrupted;
			}

			/* Let the callback adjust the page. */
			if ((err = callback(block)) != DB_SUCCESS) {
				goto func_exit;
			} else if (!updated) {
				updated = block->page.state()
					== BUF_BLOCK_FILE_PAGE;
			}

			/* If tablespace is encrypted we use additional
			temporary scratch area where pages are read
			for decrypting readptr == crypt_io_buffer != io_buffer.

			Destination for decryption is a buffer pool block
			block->frame == dst == io_buffer that is updated.
			Pages that did not require decryption even when
			tablespace is marked as encrypted are not copied
			instead block->frame is set to src == readptr.

			For encryption we again use temporary scratch area
			writeptr != io_buffer == dst
			that is then written to the tablespace

			(1) For normal tables io_buffer == dst == writeptr
			(2) For only page compressed tables
			io_buffer == dst == writeptr
			(3) For encrypted (and page compressed)
			readptr != io_buffer == dst != writeptr
			*/

			ut_ad(!encrypted && !page_compressed ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(page_compressed && !encrypted ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(encrypted ?
			      src != dst && dst != writeptr + (i * size):1);

			/* When tablespace is encrypted or compressed its
			first page (i.e. page 0) is not encrypted or
			compressed and there is no need to copy frame. */
			if (encrypted && block->page.id().page_no() != 0) {
				byte *local_frame = callback.get_frame(block);
				ut_ad((writeptr + (i * size)) != local_frame);
				memcpy((writeptr + (i * size)), local_frame, size);
			}

			/* Undo the temporary redirection to src. */
			if (frame_changed) {
				if (block->page.zip.data) {
					block->page.zip.data = dst;
				} else {
					block->frame = dst;
				}
			}

			/* From here on, src refers to the page in io_buffer. */
			src = io_buffer + (i * size);

			if (page_compressed) {
				updated = true;
				/* If fil_page_compress() returns 0, the
				page is left as it is. */
				if (ulint len = fil_page_compress(
					    src,
					    page_compress_buf,
					    callback.get_space_flags(),
					    512,/* FIXME: proper block size */
					    encrypted)) {
					/* FIXME: remove memcpy() */
					memcpy(src, page_compress_buf, len);
					memset(src + len, 0,
					       srv_page_size - len);
				}
			}

			/* Encrypt the page if encryption was used. */
			if (encrypted && decrypted) {
				byte *dest = writeptr + i * size;

				byte* tmp = fil_encrypt_buf(
					iter.crypt_data,
					block->page.id().space(),
					block->page.id().page_no(),
					src, block->zip_size(), dest,
					full_crc32);

				/* fil_encrypt_buf() returned the input
				buffer instead of dest, so copy the page
				to the write buffer ourselves. */
				if (tmp == src) {
					/* TODO: remove unnecessary memcpy's */
					ut_ad(dest != src);
					memcpy(dest, src, size);
				}

				updated = true;
			}

			/* Write checksum for the compressed full crc32 page.*/
			if (full_crc32 && page_compressed) {
				ut_ad(updated);
				byte* dest = writeptr + i * size;
				ut_d(bool comp = false);
				ut_d(bool corrupt = false);
				/* Note: this local size shadows the
				physical page size declared above. */
				ulint size = buf_page_full_crc32_size(
					dest,
#ifdef UNIV_DEBUG
					&comp, &corrupt
#else
					NULL, NULL
#endif
				);
				ut_ad(!comp == (size == srv_page_size));
				ut_ad(!corrupt);
				/* The CRC-32 occupies the last 4 bytes. */
				mach_write_to_4(dest + (size - 4),
						ut_crc32(dest, size - 4));
			}
		}

		/* page_compressed here reflects the last page that was
		processed in this batch. */
		if (page_compressed && punch_hole) {
			err = fil_import_compress_fwrite(
				iter, full_crc32, offset, writeptr, n_bytes,
				!updated);

			if (err != DB_SUCCESS) {
				/* Stop trying to punch holes; fall back to
				a normal write if something was updated. */
				punch_hole = false;
				if (updated) {
					goto normal_write;
				}
			}
		} else if (updated) {
normal_write:
			/* A page was updated in the set, write it back. */
			err = os_file_write(IORequestWrite,
					    iter.filepath, iter.file,
					    writeptr, offset, n_bytes);

			if (err != DB_SUCCESS) {
				goto func_exit;
			}
		}
	}

func_exit:
	free(page_compress_buf);
	return err;
}
4169
4170 /********************************************************************//**
4171 Iterate over all the pages in the tablespace.
4172 @param table - the table definiton in the server
4173 @param n_io_buffers - number of blocks to read and write together
4174 @param callback - functor that will do the page updates
4175 @return DB_SUCCESS or error code */
4176 static
4177 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)4178 fil_tablespace_iterate(
4179 /*===================*/
4180 dict_table_t* table,
4181 ulint n_io_buffers,
4182 AbstractCallback& callback)
4183 {
4184 dberr_t err;
4185 pfs_os_file_t file;
4186 char* filepath;
4187
4188 ut_a(n_io_buffers > 0);
4189 ut_ad(!srv_read_only_mode);
4190
4191 DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
4192 return(DB_CORRUPTION););
4193
4194 /* Make sure the data_dir_path is set. */
4195 dict_get_and_save_data_dir_path(table, false);
4196
4197 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4198 ut_a(table->data_dir_path);
4199
4200 filepath = fil_make_filepath(
4201 table->data_dir_path, table->name.m_name, IBD, true);
4202 } else {
4203 filepath = fil_make_filepath(
4204 NULL, table->name.m_name, IBD, false);
4205 }
4206
4207 if (!filepath) {
4208 return(DB_OUT_OF_MEMORY);
4209 } else {
4210 bool success;
4211
4212 file = os_file_create_simple_no_error_handling(
4213 innodb_data_file_key, filepath,
4214 OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
4215
4216 if (!success) {
4217 /* The following call prints an error message */
4218 os_file_get_last_error(true);
4219 ib::error() << "Trying to import a tablespace,"
4220 " but could not open the tablespace file "
4221 << filepath;
4222 ut_free(filepath);
4223 return DB_TABLESPACE_NOT_FOUND;
4224 } else {
4225 err = DB_SUCCESS;
4226 }
4227 }
4228
4229 callback.set_file(filepath, file);
4230
4231 os_offset_t file_size = os_file_get_size(file);
4232 ut_a(file_size != (os_offset_t) -1);
4233
4234 /* Allocate a page to read in the tablespace header, so that we
4235 can determine the page size and zip_size (if it is compressed).
4236 We allocate an extra page in case it is a compressed table. */
4237
4238 byte* page = static_cast<byte*>(aligned_malloc(2 * srv_page_size,
4239 srv_page_size));
4240
4241 buf_block_t* block = reinterpret_cast<buf_block_t*>
4242 (ut_zalloc_nokey(sizeof *block));
4243 block->frame = page;
4244 block->page.init(BUF_BLOCK_FILE_PAGE, page_id_t(~0ULL), 1);
4245
4246 /* Read the first page and determine the page and zip size. */
4247
4248 err = os_file_read_no_error_handling(IORequestReadPartial,
4249 file, page, 0, srv_page_size, 0);
4250
4251 if (err == DB_SUCCESS) {
4252 err = callback.init(file_size, block);
4253 }
4254
4255 if (err == DB_SUCCESS) {
4256 block->page.id_ = page_id_t(callback.get_space_id(), 0);
4257 if (ulint zip_size = callback.get_zip_size()) {
4258 page_zip_set_size(&block->page.zip, zip_size);
4259 /* ROW_FORMAT=COMPRESSED is not optimised for block IO
4260 for now. We do the IMPORT page by page. */
4261 n_io_buffers = 1;
4262 }
4263
4264 fil_iterator_t iter;
4265
4266 /* read (optional) crypt data */
4267 iter.crypt_data = fil_space_read_crypt_data(
4268 callback.get_zip_size(), page);
4269
4270 /* If tablespace is encrypted, it needs extra buffers */
4271 if (iter.crypt_data && n_io_buffers > 1) {
4272 /* decrease io buffers so that memory
4273 consumption will not double */
4274 n_io_buffers /= 2;
4275 }
4276
4277 iter.file = file;
4278 iter.start = 0;
4279 iter.end = file_size;
4280 iter.filepath = filepath;
4281 iter.file_size = file_size;
4282 iter.n_io_buffers = n_io_buffers;
4283
4284 /* Add an extra page for compressed page scratch area. */
4285 iter.io_buffer = static_cast<byte*>(
4286 aligned_malloc((1 + iter.n_io_buffers)
4287 << srv_page_size_shift, srv_page_size));
4288
4289 iter.crypt_io_buffer = iter.crypt_data
4290 ? static_cast<byte*>(
4291 aligned_malloc((1 + iter.n_io_buffers)
4292 << srv_page_size_shift,
4293 srv_page_size))
4294 : NULL;
4295
4296 if (block->page.zip.ssize) {
4297 ut_ad(iter.n_io_buffers == 1);
4298 block->frame = iter.io_buffer;
4299 block->page.zip.data = block->frame + srv_page_size;
4300 }
4301
4302 err = callback.run(iter, block);
4303
4304 if (iter.crypt_data) {
4305 fil_space_destroy_crypt_data(&iter.crypt_data);
4306 }
4307
4308 aligned_free(iter.crypt_io_buffer);
4309 aligned_free(iter.io_buffer);
4310 }
4311
4312 if (err == DB_SUCCESS) {
4313 ib::info() << "Sync to disk";
4314
4315 if (!os_file_flush(file)) {
4316 ib::info() << "os_file_flush() failed!";
4317 err = DB_IO_ERROR;
4318 } else {
4319 ib::info() << "Sync to disk - done!";
4320 }
4321 }
4322
4323 os_file_close(file);
4324
4325 aligned_free(page);
4326 ut_free(filepath);
4327 ut_free(block);
4328
4329 return(err);
4330 }
4331
4332 /*****************************************************************//**
4333 Imports a tablespace. The space id in the .ibd file must match the space id
4334 of the table in the data dictionary.
4335 @return error code or DB_SUCCESS */
4336 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)4337 row_import_for_mysql(
4338 /*=================*/
4339 dict_table_t* table, /*!< in/out: table */
4340 row_prebuilt_t* prebuilt) /*!< in: prebuilt struct in MySQL */
4341 {
4342 dberr_t err;
4343 trx_t* trx;
4344 ib_uint64_t autoinc = 0;
4345 char* filepath = NULL;
4346
4347 /* The caller assured that this is not read_only_mode and that no
4348 temorary tablespace is being imported. */
4349 ut_ad(!srv_read_only_mode);
4350 ut_ad(!table->is_temporary());
4351
4352 ut_ad(table->space_id);
4353 ut_ad(table->space_id < SRV_SPACE_ID_UPPER_BOUND);
4354 ut_ad(prebuilt->trx);
4355 ut_ad(!table->is_readable());
4356
4357 ibuf_delete_for_discarded_space(table->space_id);
4358
4359 trx_start_if_not_started(prebuilt->trx, true);
4360
4361 trx = trx_create();
4362
4363 /* So that the table is not DROPped during recovery. */
4364 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4365
4366 trx_start_if_not_started(trx, true);
4367
4368 /* So that we can send error messages to the user. */
4369 trx->mysql_thd = prebuilt->trx->mysql_thd;
4370
4371 /* Ensure that the table will be dropped by trx_rollback_active()
4372 in case of a crash. */
4373
4374 trx->table_id = table->id;
4375
4376 /* Assign an undo segment for the transaction, so that the
4377 transaction will be recovered after a crash. */
4378
4379 /* TODO: Do not write any undo log for the IMPORT cleanup. */
4380 {
4381 mtr_t mtr;
4382 mtr.start();
4383 trx_undo_assign(trx, &err, &mtr);
4384 mtr.commit();
4385 }
4386
4387 DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
4388 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4389
4390 if (err != DB_SUCCESS) {
4391
4392 return(row_import_cleanup(prebuilt, trx, err));
4393
4394 } else if (trx->rsegs.m_redo.undo == 0) {
4395
4396 err = DB_TOO_MANY_CONCURRENT_TRXS;
4397 return(row_import_cleanup(prebuilt, trx, err));
4398 }
4399
4400 prebuilt->trx->op_info = "read meta-data file";
4401
4402 /* Prevent DDL operations while we are checking. */
4403
4404 rw_lock_s_lock(&dict_sys.latch);
4405
4406 row_import cfg;
4407
4408 err = row_import_read_cfg(table, trx->mysql_thd, cfg);
4409
4410 /* Check if the table column definitions match the contents
4411 of the config file. */
4412
4413 if (err == DB_SUCCESS) {
4414
4415 if (dberr_t err = handle_instant_metadata(table, cfg)) {
4416 rw_lock_s_unlock(&dict_sys.latch);
4417 return row_import_error(prebuilt, trx, err);
4418 }
4419
4420 /* We have a schema file, try and match it with our
4421 data dictionary. */
4422
4423 err = cfg.match_schema(trx->mysql_thd);
4424
4425 /* Update index->page and SYS_INDEXES.PAGE_NO to match the
4426 B-tree root page numbers in the tablespace. Use the index
4427 name from the .cfg file to find match. */
4428
4429 if (err == DB_SUCCESS) {
4430 cfg.set_root_by_name();
4431 autoinc = cfg.m_autoinc;
4432 }
4433
4434 rw_lock_s_unlock(&dict_sys.latch);
4435
4436 DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
4437 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4438
4439 } else if (cfg.m_missing) {
4440
4441 rw_lock_s_unlock(&dict_sys.latch);
4442
4443 /* We don't have a schema file, we will have to discover
4444 the index root pages from the .ibd file and skip the schema
4445 matching step. */
4446
4447 ut_a(err == DB_FAIL);
4448
4449 cfg.m_zip_size = 0;
4450
4451 if (UT_LIST_GET_LEN(table->indexes) > 1) {
4452 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4453 ER_INTERNAL_ERROR,
4454 "Drop all secondary indexes before importing "
4455 "table %s when .cfg file is missing.",
4456 table->name.m_name);
4457 err = DB_ERROR;
4458 return row_import_error(prebuilt, trx, err);
4459 }
4460
4461 FetchIndexRootPages fetchIndexRootPages(table, trx);
4462
4463 err = fil_tablespace_iterate(
4464 table, IO_BUFFER_SIZE(srv_page_size),
4465 fetchIndexRootPages);
4466
4467 if (err == DB_SUCCESS) {
4468
4469 err = fetchIndexRootPages.build_row_import(&cfg);
4470
4471 /* Update index->page and SYS_INDEXES.PAGE_NO
4472 to match the B-tree root page numbers in the
4473 tablespace. */
4474
4475 if (err == DB_SUCCESS) {
4476 err = cfg.set_root_by_heuristic();
4477
4478 if (err == DB_SUCCESS) {
4479 if (dberr_t err =
4480 handle_instant_metadata(table,
4481 cfg)) {
4482 return row_import_error(
4483 prebuilt, trx, err);
4484 }
4485 }
4486 }
4487 }
4488 } else {
4489 rw_lock_s_unlock(&dict_sys.latch);
4490 }
4491
4492 if (err != DB_SUCCESS) {
4493 return(row_import_error(prebuilt, trx, err));
4494 }
4495
4496 prebuilt->trx->op_info = "importing tablespace";
4497
4498 ib::info() << "Phase I - Update all pages";
4499
4500 /* Iterate over all the pages and do the sanity checking and
4501 the conversion required to import the tablespace. */
4502
4503 PageConverter converter(&cfg, table->space_id, trx);
4504
4505 /* Set the IO buffer size in pages. */
4506
4507 err = fil_tablespace_iterate(
4508 table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size
4509 : srv_page_size), converter);
4510
4511 DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
4512 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4513 #ifdef BTR_CUR_HASH_ADAPT
4514 /* On DISCARD TABLESPACE, we did not drop any adaptive hash
4515 index entries. If we replaced the discarded tablespace with a
4516 smaller one here, there could still be some adaptive hash
4517 index entries that point to cached garbage pages in the buffer
4518 pool, because PageConverter::operator() only evicted those
4519 pages that were replaced by the imported pages. We must
4520 detach any remaining adaptive hash index entries, because the
4521 adaptive hash index must be a subset of the table contents;
4522 false positives are not tolerated. */
4523 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
4524 index = UT_LIST_GET_NEXT(indexes, index)) {
4525 index = index->clone_if_needed();
4526 }
4527 #endif /* BTR_CUR_HASH_ADAPT */
4528
4529 if (err != DB_SUCCESS) {
4530 char table_name[MAX_FULL_NAME_LEN + 1];
4531
4532 innobase_format_name(
4533 table_name, sizeof(table_name),
4534 table->name.m_name);
4535
4536 if (err != DB_DECRYPTION_FAILED) {
4537
4538 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4539 ER_INTERNAL_ERROR,
4540 "Cannot reset LSNs in table %s : %s",
4541 table_name, ut_strerr(err));
4542 }
4543
4544 return(row_import_cleanup(prebuilt, trx, err));
4545 }
4546
4547 row_mysql_lock_data_dictionary(trx);
4548
4549 /* If the table is stored in a remote tablespace, we need to
4550 determine that filepath from the link file and system tables.
4551 Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
4552 dict_get_and_save_data_dir_path(table, true);
4553
4554 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4555 ut_a(table->data_dir_path);
4556
4557 filepath = fil_make_filepath(
4558 table->data_dir_path, table->name.m_name, IBD, true);
4559 } else {
4560 filepath = fil_make_filepath(
4561 NULL, table->name.m_name, IBD, false);
4562 }
4563
4564 DBUG_EXECUTE_IF(
4565 "ib_import_OOM_15",
4566 ut_free(filepath);
4567 filepath = NULL;
4568 );
4569
4570 if (filepath == NULL) {
4571 row_mysql_unlock_data_dictionary(trx);
4572 return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
4573 }
4574
4575 /* Open the tablespace so that we can access via the buffer pool.
4576 We set the 2nd param (fix_dict = true) here because we already
4577 have an x-lock on dict_sys.latch and dict_sys.mutex.
4578 The tablespace is initially opened as a temporary one, because
4579 we will not be writing any redo log for it before we have invoked
4580 fil_space_t::set_imported() to declare it a persistent tablespace. */
4581
4582 ulint fsp_flags = dict_tf_to_fsp_flags(table->flags);
4583
4584 table->space = fil_ibd_open(
4585 true, true, FIL_TYPE_IMPORT, table->space_id,
4586 fsp_flags, table->name, filepath, &err);
4587
4588 ut_ad((table->space == NULL) == (err != DB_SUCCESS));
4589 DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
4590 err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);
4591
4592 if (!table->space) {
4593 row_mysql_unlock_data_dictionary(trx);
4594
4595 ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4596 ER_GET_ERRMSG,
4597 err, ut_strerr(err), filepath);
4598
4599 ut_free(filepath);
4600
4601 return(row_import_cleanup(prebuilt, trx, err));
4602 }
4603
4604 row_mysql_unlock_data_dictionary(trx);
4605
4606 ut_free(filepath);
4607
4608 err = ibuf_check_bitmap_on_import(trx, table->space);
4609
4610 DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
4611
4612 if (err != DB_SUCCESS) {
4613 return(row_import_cleanup(prebuilt, trx, err));
4614 }
4615
4616 /* The first index must always be the clustered index. */
4617
4618 dict_index_t* index = dict_table_get_first_index(table);
4619
4620 if (!dict_index_is_clust(index)) {
4621 return(row_import_error(prebuilt, trx, DB_CORRUPTION));
4622 }
4623
4624 /* Update the Btree segment headers for index node and
4625 leaf nodes in the root page. Set the new space id. */
4626
4627 err = btr_root_adjust_on_import(index);
4628
4629 DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
4630 err = DB_CORRUPTION;);
4631
4632 if (err != DB_SUCCESS) {
4633 return(row_import_error(prebuilt, trx, err));
4634 } else if (cfg.requires_purge(index->name)) {
4635
4636 /* Purge any delete-marked records that couldn't be
4637 purged during the page conversion phase from the
4638 cluster index. */
4639
4640 IndexPurge purge(trx, index);
4641
4642 trx->op_info = "cluster: purging delete marked records";
4643
4644 err = purge.garbage_collect();
4645
4646 trx->op_info = "";
4647 }
4648
4649 DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
4650
4651 if (err != DB_SUCCESS) {
4652 return(row_import_error(prebuilt, trx, err));
4653 }
4654
4655 /* For secondary indexes, purge any records that couldn't be purged
4656 during the page conversion phase. */
4657
4658 err = row_import_adjust_root_pages_of_secondary_indexes(
4659 trx, table, cfg);
4660
4661 DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
4662 err = DB_CORRUPTION;);
4663
4664 if (err != DB_SUCCESS) {
4665 return(row_import_error(prebuilt, trx, err));
4666 }
4667
4668 /* Ensure that the next available DB_ROW_ID is not smaller than
4669 any DB_ROW_ID stored in the table. */
4670
4671 if (prebuilt->clust_index_was_generated) {
4672 row_import_set_sys_max_row_id(prebuilt, table);
4673 }
4674
4675 ib::info() << "Phase III - Flush changes to disk";
4676
4677 /* Ensure that all pages dirtied during the IMPORT make it to disk.
4678 The only dirty pages generated should be from the pessimistic purge
4679 of delete marked records that couldn't be purged in Phase I. */
4680 while (buf_flush_list_space(prebuilt->table->space));
4681
4682 for (ulint count = 0; prebuilt->table->space->referenced(); count++) {
4683 /* Issue a warning every 10.24 seconds, starting after
4684 2.56 seconds */
4685 if ((count & 511) == 128) {
4686 ib::warn() << "Waiting for flush to complete on "
4687 << prebuilt->table->name;
4688 }
4689 os_thread_sleep(20000);
4690 }
4691
4692 ib::info() << "Phase IV - Flush complete";
4693 prebuilt->table->space->set_imported();
4694
4695 /* The dictionary latches will be released in in row_import_cleanup()
4696 after the transaction commit, for both success and error. */
4697
4698 row_mysql_lock_data_dictionary(trx);
4699
4700 /* Update the root pages of the table's indexes. */
4701 err = row_import_update_index_root(trx, table, false);
4702
4703 if (err != DB_SUCCESS) {
4704 return(row_import_error(prebuilt, trx, err));
4705 }
4706
4707 err = row_import_update_discarded_flag(trx, table->id, false);
4708
4709 if (err != DB_SUCCESS) {
4710 return(row_import_error(prebuilt, trx, err));
4711 }
4712
4713 table->file_unreadable = false;
4714 table->flags2 &= ~DICT_TF2_DISCARDED & ((1U << DICT_TF2_BITS) - 1);
4715
4716 /* Set autoinc value read from .cfg file, if one was specified.
4717 Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
4718 if (autoinc) {
4719 ib::info() << table->name << " autoinc value set to "
4720 << autoinc;
4721
4722 table->autoinc = autoinc--;
4723 btr_write_autoinc(dict_table_get_first_index(table), autoinc);
4724 }
4725
4726 return(row_import_cleanup(prebuilt, trx, err));
4727 }
4728