1 /*****************************************************************************
2
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #include "row0row.h"
44 #ifdef HAVE_LZO
45 #include "lzo/lzo1x.h"
46 #endif
47 #ifdef HAVE_SNAPPY
48 #include "snappy-c.h"
49 #endif
50
51 #include "scope.h"
52
53 #include <vector>
54
55 #ifdef HAVE_MY_AES_H
56 #include <my_aes.h>
57 #endif
58
59 using st_::span;
60
/** The size of the buffer to use for IO, expressed in pages: the number
of pages of n bytes that fit into 1024 * 1024 bytes.
@param n physical page size
@return number of pages */
#define IO_BUFFER_SIZE(n) ((1024 * 1024) / (n))
65
/** For gathering stats on records during phase I */
struct row_stats_t {
	ulint		m_n_deleted;	/*!< Number of deleted records
					found in the index */

	ulint		m_n_purged;	/*!< Number of records purged
					optimistically */

	ulint		m_n_rows;	/*!< Number of rows */

	ulint		m_n_purge_failed; /*!< Number of deleted rows
					that could not be purged */
};
79
/** Index information required by IMPORT. */
struct row_index_t {
	index_id_t	m_id;		/*!< Index id of the table
					in the exporting server */
	byte*		m_name;		/*!< Index name; compared with
					strcmp() by row_import::get_index(),
					so it must be NUL-terminated */

	ulint		m_space;	/*!< Space where it is placed */

	ulint		m_page_no;	/*!< Root page number */

	ulint		m_type;		/*!< Index type */

	ulint		m_trx_id_offset; /*!< Relevant only for clustered
					indexes, offset of transaction
					id system column */

	ulint		m_n_user_defined_cols; /*!< User defined columns */

	ulint		m_n_uniq;	/*!< Number of columns that can
					uniquely identify the row */

	ulint		m_n_nullable;	/*!< Number of nullable
					columns */

	ulint		m_n_fields;	/*!< Total number of fields,
					i.e. the length of m_fields */

	dict_field_t*	m_fields;	/*!< Index fields; the array and
					each field name are released by
					row_import::~row_import() */

	const dict_index_t*
			m_srv_index;	/*!< Index instance in the
					importing server; assigned by
					row_import::match_index_columns() */

	row_stats_t	m_stats;	/*!< Statistics gathered during
					the import phase */

};
116
117 /** Meta data required by IMPORT. */
118 struct row_import {
row_importrow_import119 row_import() UNIV_NOTHROW
120 :
121 m_table(NULL),
122 m_version(0),
123 m_hostname(NULL),
124 m_table_name(NULL),
125 m_autoinc(0),
126 m_zip_size(0),
127 m_flags(0),
128 m_n_cols(0),
129 m_cols(NULL),
130 m_col_names(NULL),
131 m_n_indexes(0),
132 m_indexes(NULL),
133 m_missing(true) { }
134
135 ~row_import() UNIV_NOTHROW;
136
137 /** Find the index entry in in the indexes array.
138 @param name index name
139 @return instance if found else 0. */
140 row_index_t* get_index(const char* name) const UNIV_NOTHROW;
141
142 /** Get the number of rows in the index.
143 @param name index name
144 @return number of rows (doesn't include delete marked rows). */
145 ulint get_n_rows(const char* name) const UNIV_NOTHROW;
146
147 /** Find the ordinal value of the column name in the cfg table columns.
148 @param name of column to look for.
149 @return ULINT_UNDEFINED if not found. */
150 ulint find_col(const char* name) const UNIV_NOTHROW;
151
152 /** Get the number of rows for which purge failed during the
153 convert phase.
154 @param name index name
155 @return number of rows for which purge failed. */
156 ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;
157
158 /** Check if the index is clean. ie. no delete-marked records
159 @param name index name
160 @return true if index needs to be purged. */
requires_purgerow_import161 bool requires_purge(const char* name) const UNIV_NOTHROW
162 {
163 return(get_n_purge_failed(name) > 0);
164 }
165
166 /** Set the index root <space, pageno> using the index name */
167 void set_root_by_name() UNIV_NOTHROW;
168
169 /** Set the index root <space, pageno> using a heuristic
170 @return DB_SUCCESS or error code */
171 dberr_t set_root_by_heuristic() UNIV_NOTHROW;
172
173 /** Check if the index schema that was read from the .cfg file
174 matches the in memory index definition.
175 Note: It will update row_import_t::m_srv_index to map the meta-data
176 read from the .cfg file to the server index instance.
177 @return DB_SUCCESS or error code. */
178 dberr_t match_index_columns(
179 THD* thd,
180 const dict_index_t* index) UNIV_NOTHROW;
181
182 /** Check if the table schema that was read from the .cfg file
183 matches the in memory table definition.
184 @param thd MySQL session variable
185 @return DB_SUCCESS or error code. */
186 dberr_t match_table_columns(
187 THD* thd) UNIV_NOTHROW;
188
189 /** Check if the table (and index) schema that was read from the
190 .cfg file matches the in memory table definition.
191 @param thd MySQL session variable
192 @return DB_SUCCESS or error code. */
193 dberr_t match_schema(
194 THD* thd) UNIV_NOTHROW;
195
196 dberr_t match_flags(THD *thd) const ;
197
198
199 dict_table_t* m_table; /*!< Table instance */
200
201 ulint m_version; /*!< Version of config file */
202
203 byte* m_hostname; /*!< Hostname where the
204 tablespace was exported */
205 byte* m_table_name; /*!< Exporting instance table
206 name */
207
208 ib_uint64_t m_autoinc; /*!< Next autoinc value */
209
210 ulint m_zip_size; /*!< ROW_FORMAT=COMPRESSED
211 page size, or 0 */
212
213 ulint m_flags; /*!< Table flags */
214
215 ulint m_n_cols; /*!< Number of columns in the
216 meta-data file */
217
218 dict_col_t* m_cols; /*!< Column data */
219
220 byte** m_col_names; /*!< Column names, we store the
221 column naems separately becuase
222 there is no field to store the
223 value in dict_col_t */
224
225 ulint m_n_indexes; /*!< Number of indexes,
226 including clustered index */
227
228 row_index_t* m_indexes; /*!< Index meta data */
229
230 bool m_missing; /*!< true if a .cfg file was
231 found and was readable */
232 };
233
/** Description of the file and of the IO buffers that is passed to
fil_iterate() and to AbstractCallback::run(). */
struct fil_iterator_t {
	pfs_os_file_t	file;			/*!< File handle */
	const char*	filepath;		/*!< File path name */
	os_offset_t	start;			/*!< From where to start */
	os_offset_t	end;			/*!< Where to stop */
	os_offset_t	file_size;		/*!< File size in bytes */
	ulint		n_io_buffers;		/*!< Number of pages to use
						for IO */
	byte*		io_buffer;		/*!< Buffer to use for IO */
	fil_space_crypt_t *crypt_data;		/*!< Crypt data (if encrypted) */
	byte*           crypt_io_buffer;        /*!< IO buffer when encrypted */
};
246
247 /** Use the page cursor to iterate over records in a block. */
248 class RecIterator {
249 public:
250 /** Default constructor */
RecIterator()251 RecIterator() UNIV_NOTHROW
252 {
253 memset(&m_cur, 0x0, sizeof(m_cur));
254 }
255
256 /** Position the cursor on the first user record. */
open(buf_block_t * block)257 void open(buf_block_t* block) UNIV_NOTHROW
258 {
259 page_cur_set_before_first(block, &m_cur);
260
261 if (!end()) {
262 next();
263 }
264 }
265
266 /** Move to the next record. */
next()267 void next() UNIV_NOTHROW
268 {
269 page_cur_move_to_next(&m_cur);
270 }
271
272 /**
273 @return the current record */
current()274 rec_t* current() UNIV_NOTHROW
275 {
276 ut_ad(!end());
277 return(page_cur_get_rec(&m_cur));
278 }
279
280 /**
281 @return true if cursor is at the end */
end()282 bool end() UNIV_NOTHROW
283 {
284 return(page_cur_is_after_last(&m_cur) == TRUE);
285 }
286
287 /** Remove the current record
288 @return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,rec_offs * offsets)289 bool remove(
290 const dict_index_t* index,
291 page_zip_des_t* page_zip,
292 rec_offs* offsets) UNIV_NOTHROW
293 {
294 /* We can't end up with an empty page unless it is root. */
295 if (page_get_n_recs(m_cur.block->frame) <= 1) {
296 return(false);
297 }
298
299 return(page_delete_rec(index, &m_cur, page_zip, offsets));
300 }
301
302 private:
303 page_cur_t m_cur;
304 };
305
306 /** Class that purges delete marked reocords from indexes, both secondary
307 and cluster. It does a pessimistic delete. This should only be done if we
308 couldn't purge the delete marked reocrds during Phase I. */
309 class IndexPurge {
310 public:
311 /** Constructor
312 @param trx the user transaction covering the import tablespace
313 @param index to be imported
314 @param space_id space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)315 IndexPurge(
316 trx_t* trx,
317 dict_index_t* index) UNIV_NOTHROW
318 :
319 m_trx(trx),
320 m_index(index),
321 m_n_rows(0)
322 {
323 ib::info() << "Phase II - Purge records from index "
324 << index->name;
325 }
326
327 /** Descructor */
~IndexPurge()328 ~IndexPurge() UNIV_NOTHROW { }
329
330 /** Purge delete marked records.
331 @return DB_SUCCESS or error code. */
332 dberr_t garbage_collect() UNIV_NOTHROW;
333
334 /** The number of records that are not delete marked.
335 @return total records in the index after purge */
get_n_rows() const336 ulint get_n_rows() const UNIV_NOTHROW
337 {
338 return(m_n_rows);
339 }
340
341 private:
342 /** Begin import, position the cursor on the first record. */
343 void open() UNIV_NOTHROW;
344
345 /** Close the persistent curosr and commit the mini-transaction. */
346 void close() UNIV_NOTHROW;
347
348 /** Position the cursor on the next record.
349 @return DB_SUCCESS or error code */
350 dberr_t next() UNIV_NOTHROW;
351
352 /** Store the persistent cursor position and reopen the
353 B-tree cursor in BTR_MODIFY_TREE mode, because the
354 tree structure may be changed during a pessimistic delete. */
355 void purge_pessimistic_delete() UNIV_NOTHROW;
356
357 /** Purge delete-marked records.
358 @param offsets current row offsets. */
359 void purge() UNIV_NOTHROW;
360
361 protected:
362 // Disable copying
363 IndexPurge();
364 IndexPurge(const IndexPurge&);
365 IndexPurge &operator=(const IndexPurge&);
366
367 private:
368 trx_t* m_trx; /*!< User transaction */
369 mtr_t m_mtr; /*!< Mini-transaction */
370 btr_pcur_t m_pcur; /*!< Persistent cursor */
371 dict_index_t* m_index; /*!< Index to be processed */
372 ulint m_n_rows; /*!< Records in index */
373 };
374
375 /** Functor that is called for each physical page that is read from the
376 tablespace file. */
377 class AbstractCallback
378 {
379 public:
380 /** Constructor
381 @param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)382 AbstractCallback(trx_t* trx, ulint space_id)
383 :
384 m_zip_size(0),
385 m_trx(trx),
386 m_space(space_id),
387 m_xdes(),
388 m_xdes_page_no(ULINT_UNDEFINED),
389 m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
390
391 /** Free any extent descriptor instance */
~AbstractCallback()392 virtual ~AbstractCallback()
393 {
394 UT_DELETE_ARRAY(m_xdes);
395 }
396
397 /** Determine the page size to use for traversing the tablespace
398 @param file_size size of the tablespace file in bytes
399 @param block contents of the first page in the tablespace file.
400 @retval DB_SUCCESS or error code. */
401 virtual dberr_t init(
402 os_offset_t file_size,
403 const buf_block_t* block) UNIV_NOTHROW;
404
405 /** @return true if compressed table. */
is_compressed_table() const406 bool is_compressed_table() const UNIV_NOTHROW
407 {
408 return get_zip_size();
409 }
410
411 /** @return the tablespace flags */
get_space_flags() const412 ulint get_space_flags() const
413 {
414 return(m_space_flags);
415 }
416
417 /**
418 Set the name of the physical file and the file handle that is used
419 to open it for the file that is being iterated over.
420 @param filename the physical name of the tablespace file
421 @param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)422 void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
423 {
424 m_file = file;
425 m_filepath = filename;
426 }
427
get_zip_size() const428 ulint get_zip_size() const { return m_zip_size; }
physical_size() const429 ulint physical_size() const
430 {
431 return m_zip_size ? m_zip_size : srv_page_size;
432 }
433
filename() const434 const char* filename() const { return m_filepath; }
435
436 /**
437 Called for every page in the tablespace. If the page was not
438 updated then its state must be set to BUF_PAGE_NOT_USED. For
439 compressed tables the page descriptor memory will be at offset:
440 block->frame + srv_page_size;
441 @param block block read from file, note it is not from the buffer pool
442 @retval DB_SUCCESS or error code. */
443 virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
444
445 /** @return the tablespace identifier */
get_space_id() const446 ulint get_space_id() const { return m_space; }
447
is_interrupted() const448 bool is_interrupted() const { return trx_is_interrupted(m_trx); }
449
450 /**
451 Get the data page depending on the table type, compressed or not.
452 @param block - block read from disk
453 @retval the buffer frame */
get_frame(const buf_block_t * block)454 static byte* get_frame(const buf_block_t* block)
455 {
456 return block->page.zip.data
457 ? block->page.zip.data : block->frame;
458 }
459
460 /** Invoke the functionality for the callback */
461 virtual dberr_t run(const fil_iterator_t& iter,
462 buf_block_t* block) UNIV_NOTHROW = 0;
463
464 protected:
465 /** Get the physical offset of the extent descriptor within the page.
466 @param page_no page number of the extent descriptor
467 @param page contents of the page containing the extent descriptor.
468 @return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const469 const xdes_t* xdes(
470 ulint page_no,
471 const page_t* page) const UNIV_NOTHROW
472 {
473 ulint offset;
474
475 offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
476
477 return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
478 }
479
480 /** Set the current page directory (xdes). If the extent descriptor is
481 marked as free then free the current extent descriptor and set it to
482 0. This implies that all pages that are covered by this extent
483 descriptor are also freed.
484
485 @param page_no offset of page within the file
486 @param page page contents
487 @return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)488 dberr_t set_current_xdes(
489 ulint page_no,
490 const page_t* page) UNIV_NOTHROW
491 {
492 m_xdes_page_no = page_no;
493
494 UT_DELETE_ARRAY(m_xdes);
495 m_xdes = NULL;
496
497 if (mach_read_from_4(XDES_ARR_OFFSET + XDES_STATE + page)
498 != XDES_FREE) {
499 const ulint physical_size = m_zip_size
500 ? m_zip_size : srv_page_size;
501
502 m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t, physical_size);
503
504 /* Trigger OOM */
505 DBUG_EXECUTE_IF(
506 "ib_import_OOM_13",
507 UT_DELETE_ARRAY(m_xdes);
508 m_xdes = NULL;
509 );
510
511 if (m_xdes == NULL) {
512 return(DB_OUT_OF_MEMORY);
513 }
514
515 memcpy(m_xdes, page, physical_size);
516 }
517
518 return(DB_SUCCESS);
519 }
520
521 /** Check if the page is marked as free in the extent descriptor.
522 @param page_no page number to check in the extent descriptor.
523 @return true if the page is marked as free */
is_free(ulint page_no) const524 bool is_free(ulint page_no) const UNIV_NOTHROW
525 {
526 ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
527 == m_xdes_page_no);
528
529 if (m_xdes != 0) {
530 const xdes_t* xdesc = xdes(page_no, m_xdes);
531 ulint pos = page_no % FSP_EXTENT_SIZE;
532
533 return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
534 }
535
536 /* If the current xdes was free, the page must be free. */
537 return(true);
538 }
539
540 protected:
541 /** The ROW_FORMAT=COMPRESSED page size, or 0. */
542 ulint m_zip_size;
543
544 /** File handle to the tablespace */
545 pfs_os_file_t m_file;
546
547 /** Physical file path. */
548 const char* m_filepath;
549
550 /** Covering transaction. */
551 trx_t* m_trx;
552
553 /** Space id of the file being iterated over. */
554 ulint m_space;
555
556 /** Current size of the space in pages */
557 ulint m_size;
558
559 /** Current extent descriptor page */
560 xdes_t* m_xdes;
561
562 /** Physical page offset in the file of the extent descriptor */
563 ulint m_xdes_page_no;
564
565 /** Flags value read from the header page */
566 ulint m_space_flags;
567 };
568
569 /** Determine the page size to use for traversing the tablespace
570 @param file_size size of the tablespace file in bytes
571 @param block contents of the first page in the tablespace file.
572 @retval DB_SUCCESS or error code. */
573 dberr_t
init(os_offset_t file_size,const buf_block_t * block)574 AbstractCallback::init(
575 os_offset_t file_size,
576 const buf_block_t* block) UNIV_NOTHROW
577 {
578 const page_t* page = block->frame;
579
580 m_space_flags = fsp_header_get_flags(page);
581 if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
582 ulint cflags = fsp_flags_convert_from_101(m_space_flags);
583 if (cflags == ULINT_UNDEFINED) {
584 return(DB_CORRUPTION);
585 }
586 m_space_flags = cflags;
587 }
588
589 /* Clear the DATA_DIR flag, which is basically garbage. */
590 m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
591 m_zip_size = fil_space_t::zip_size(m_space_flags);
592 const ulint logical_size = fil_space_t::logical_size(m_space_flags);
593 const ulint physical_size = fil_space_t::physical_size(m_space_flags);
594
595 if (logical_size != srv_page_size) {
596
597 ib::error() << "Page size " << logical_size
598 << " of ibd file is not the same as the server page"
599 " size " << srv_page_size;
600
601 return(DB_CORRUPTION);
602
603 } else if (file_size & (physical_size - 1)) {
604
605 ib::error() << "File size " << file_size << " is not a"
606 " multiple of the page size "
607 << physical_size;
608
609 return(DB_CORRUPTION);
610 }
611
612 m_size = mach_read_from_4(page + FSP_SIZE);
613 if (m_space == ULINT_UNDEFINED) {
614 m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
615 + page);
616 }
617
618 return set_current_xdes(0, page);
619 }
620
/**
Iterate over all the pages in the tablespace.

TODO: This can be made parallel trivially by chunking up the file
and creating a callback per thread. The main benefit would be to use
multiple CPUs for checksums and compressed tables. We have to do
compressed tables block by block right now. Secondly, we need to
decompress/compress and copy too much data. These are
CPU intensive.

@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static dberr_t fil_iterate(
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	AbstractCallback&	callback);
638
/**
Callback that remembers the index id and the page number of a page handed
to operator(), and that can turn this into a single-index row_import via
build_row_import().
NOTE(review): this class used to be described as finding index root pages
by checking that the next/prev pointers are both FIL_NULL while skipping
deleted pages; operator() as defined in this file does not perform such a
check - confirm against run(). */
struct FetchIndexRootPages : public AbstractCallback {

	/** Index information gathered from the .ibd file. */
	struct Index {

		/** Constructor
		@param id	index id
		@param page_no	root page number */
		Index(index_id_t id, ulint page_no)
			:
			m_id(id),
			m_page_no(page_no) { }

		index_id_t	m_id;		/*!< Index id */
		ulint		m_page_no;	/*!< Root page number */
	};

	/** Constructor. The space id is passed to the base class as
	ULINT_UNDEFINED, so AbstractCallback::init() takes it from the
	first page of the file.
	@param table	table definition in server
	@param trx	covering (user) transaction */
	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
		:
		AbstractCallback(trx, ULINT_UNDEFINED),
		m_table(table), m_index(0, 0) UNIV_NOTHROW { }

	/** Destructor */
	~FetchIndexRootPages() UNIV_NOTHROW override { }

	/** Fetch the clustered index root page in the tablespace
	@param iter	Tablespace iterator
	@param block	Block to use for IO
	@retval DB_SUCCESS or error code */
	dberr_t run(const fil_iterator_t& iter,
		    buf_block_t* block) UNIV_NOTHROW override;

	/** Called for each block as it is read from the file.
	@param block	block to convert, it is not from the buffer pool.
	@retval DB_SUCCESS or error code. */
	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;

	/** Update the import configuration that will be used to import
	the tablespace.
	@param cfg	import configuration to fill in
	@return DB_SUCCESS or error code */
	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;

	/** Table definition in server. */
	const dict_table_t*	m_table;

	/** Index information */
	Index			m_index;
};
689
690 /** Called for each block as it is read from the file. Check index pages to
691 determine the exact row format. We can't get that from the tablespace
692 header flags alone.
693
694 @param block block to convert, it is not from the buffer pool.
695 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)696 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
697 {
698 if (is_interrupted()) return DB_INTERRUPTED;
699
700 const page_t* page = get_frame(block);
701
702 m_index.m_id = btr_page_get_index_id(page);
703 m_index.m_page_no = block->page.id.page_no();
704
705 /* Check that the tablespace flags match the table flags. */
706 ulint expected = dict_tf_to_fsp_flags(m_table->flags);
707 if (!fsp_flags_match(expected, m_space_flags)) {
708 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
709 ER_TABLE_SCHEMA_MISMATCH,
710 "Expected FSP_SPACE_FLAGS=0x%x, .ibd "
711 "file contains 0x%x.",
712 unsigned(expected),
713 unsigned(m_space_flags));
714 return(DB_CORRUPTION);
715 }
716
717 if (!page_is_comp(block->frame) !=
718 !dict_table_is_comp(m_table)) {
719 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
720 ER_TABLE_SCHEMA_MISMATCH,
721 "ROW_FORMAT mismatch");
722 return DB_CORRUPTION;
723 }
724
725 return DB_SUCCESS;
726 }
727
728 /**
729 Update the import configuration that will be used to import the tablespace.
730 @return error code or DB_SUCCESS */
731 dberr_t
build_row_import(row_import * cfg) const732 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
733 {
734 ut_a(cfg->m_table == m_table);
735 cfg->m_zip_size = m_zip_size;
736 cfg->m_n_indexes = 1;
737
738 if (cfg->m_n_indexes == 0) {
739
740 ib::error() << "No B+Tree found in tablespace";
741
742 return(DB_CORRUPTION);
743 }
744
745 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
746
747 /* Trigger OOM */
748 DBUG_EXECUTE_IF(
749 "ib_import_OOM_11",
750 UT_DELETE_ARRAY(cfg->m_indexes);
751 cfg->m_indexes = NULL;
752 );
753
754 if (cfg->m_indexes == NULL) {
755 return(DB_OUT_OF_MEMORY);
756 }
757
758 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
759
760 row_index_t* cfg_index = cfg->m_indexes;
761
762 char name[BUFSIZ];
763
764 snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
765
766 ulint len = strlen(name) + 1;
767
768 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
769
770 /* Trigger OOM */
771 DBUG_EXECUTE_IF(
772 "ib_import_OOM_12",
773 UT_DELETE_ARRAY(cfg_index->m_name);
774 cfg_index->m_name = NULL;
775 );
776
777 if (cfg_index->m_name == NULL) {
778 return(DB_OUT_OF_MEMORY);
779 }
780
781 memcpy(cfg_index->m_name, name, len);
782
783 cfg_index->m_id = m_index.m_id;
784
785 cfg_index->m_space = m_space;
786
787 cfg_index->m_page_no = m_index.m_page_no;
788
789 return(DB_SUCCESS);
790 }
791
792 /* Functor that is called for each physical page that is read from the
793 tablespace file.
794
795 1. Check each page for corruption.
796
797 2. Update the space id and LSN on every page
798 * For the header page
799 - Validate the flags
800 - Update the LSN
801
802 3. On Btree pages
803 * Set the index id
804 * Update the max trx id
805 * In a cluster index, update the system columns
806 * In a cluster index, update the BLOB ptr, set the space id
807 * Purge delete marked records, but only if they can be easily
808 removed from the page
809 * Keep a counter of number of rows, ie. non-delete-marked rows
810 * Keep a counter of number of delete marked rows
811 * Keep a counter of number of purge failure
812 * If a page is stamped with an index id that isn't in the .cfg file
813 we assume it is deleted and the page can be ignored.
814
815 4. Set the page state to dirty so that it will be written to disk.
816 */
817 class PageConverter : public AbstractCallback {
818 public:
819 /** Constructor
820 @param cfg config of table being imported.
821 @param space_id tablespace identifier
822 @param trx transaction covering the import */
PageConverter(row_import * cfg,ulint space_id,trx_t * trx)823 PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
824 :
825 AbstractCallback(trx, space_id),
826 m_cfg(cfg),
827 m_index(cfg->m_indexes),
828 m_current_lsn(log_get_lsn()),
829 m_page_zip_ptr(0),
830 m_rec_iter(),
831 m_offsets_(), m_offsets(m_offsets_),
832 m_heap(0),
833 m_cluster_index(dict_table_get_first_index(cfg->m_table))
834 {
835 ut_ad(m_current_lsn);
836 rec_offs_init(m_offsets_);
837 }
838
~PageConverter()839 ~PageConverter() UNIV_NOTHROW override
840 {
841 if (m_heap != 0) {
842 mem_heap_free(m_heap);
843 }
844 }
845
run(const fil_iterator_t & iter,buf_block_t * block)846 dberr_t run(const fil_iterator_t& iter,
847 buf_block_t* block) UNIV_NOTHROW override
848 {
849 return fil_iterate(iter, block, *this);
850 }
851
852 /** Called for each block as it is read from the file.
853 @param block block to convert, it is not from the buffer pool.
854 @retval DB_SUCCESS or error code. */
855 dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
856
857 private:
858 /** Update the page, set the space id, max trx id and index id.
859 @param block block read from file
860 @param page_type type of the page
861 @retval DB_SUCCESS or error code */
862 dberr_t update_page(
863 buf_block_t* block,
864 ulint& page_type) UNIV_NOTHROW;
865
866 /** Update the space, index id, trx id.
867 @param block block to convert
868 @return DB_SUCCESS or error code */
869 dberr_t update_index_page(buf_block_t* block) UNIV_NOTHROW;
870
871 /** Update the BLOB refrences and write UNDO log entries for
872 rows that can't be purged optimistically.
873 @param block block to update
874 @retval DB_SUCCESS or error code */
875 dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
876
877 /** Validate the space flags and update tablespace header page.
878 @param block block read from file, not from the buffer pool.
879 @retval DB_SUCCESS or error code */
880 dberr_t update_header(buf_block_t* block) UNIV_NOTHROW;
881
882 /** Adjust the BLOB reference for a single column that is externally stored
883 @param rec record to update
884 @param offsets column offsets for the record
885 @param i column ordinal value
886 @return DB_SUCCESS or error code */
887 dberr_t adjust_cluster_index_blob_column(
888 rec_t* rec,
889 const rec_offs* offsets,
890 ulint i) UNIV_NOTHROW;
891
892 /** Adjusts the BLOB reference in the clustered index row for all
893 externally stored columns.
894 @param rec record to update
895 @param offsets column offsets for the record
896 @return DB_SUCCESS or error code */
897 dberr_t adjust_cluster_index_blob_columns(
898 rec_t* rec,
899 const rec_offs* offsets) UNIV_NOTHROW;
900
901 /** In the clustered index, adjist the BLOB pointers as needed.
902 Also update the BLOB reference, write the new space id.
903 @param rec record to update
904 @param offsets column offsets for the record
905 @return DB_SUCCESS or error code */
906 dberr_t adjust_cluster_index_blob_ref(
907 rec_t* rec,
908 const rec_offs* offsets) UNIV_NOTHROW;
909
910 /** Purge delete-marked records, only if it is possible to do
911 so without re-organising the B+tree.
912 @retval true if purged */
913 bool purge() UNIV_NOTHROW;
914
915 /** Adjust the BLOB references and sys fields for the current record.
916 @param rec record to update
917 @param offsets column offsets for the record
918 @return DB_SUCCESS or error code. */
919 dberr_t adjust_cluster_record(
920 rec_t* rec,
921 const rec_offs* offsets) UNIV_NOTHROW;
922
923 /** Find an index with the matching id.
924 @return row_index_t* instance or 0 */
find_index(index_id_t id)925 row_index_t* find_index(index_id_t id) UNIV_NOTHROW
926 {
927 row_index_t* index = &m_cfg->m_indexes[0];
928
929 for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
930 if (id == index->m_id) {
931 return(index);
932 }
933 }
934
935 return(0);
936
937 }
938 private:
939 /** Config for table that is being imported. */
940 row_import* m_cfg;
941
942 /** Current index whose pages are being imported */
943 row_index_t* m_index;
944
945 /** Current system LSN */
946 lsn_t m_current_lsn;
947
948 /** Alias for m_page_zip, only set for compressed pages. */
949 page_zip_des_t* m_page_zip_ptr;
950
951 /** Iterator over records in a block */
952 RecIterator m_rec_iter;
953
954 /** Record offset */
955 rec_offs m_offsets_[REC_OFFS_NORMAL_SIZE];
956
957 /** Pointer to m_offsets_ */
958 rec_offs* m_offsets;
959
960 /** Memory heap for the record offsets */
961 mem_heap_t* m_heap;
962
963 /** Cluster index instance */
964 dict_index_t* m_cluster_index;
965 };
966
967 /**
968 row_import destructor. */
~row_import()969 row_import::~row_import() UNIV_NOTHROW
970 {
971 for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
972 UT_DELETE_ARRAY(m_indexes[i].m_name);
973
974 if (m_indexes[i].m_fields == NULL) {
975 continue;
976 }
977
978 dict_field_t* fields = m_indexes[i].m_fields;
979 ulint n_fields = m_indexes[i].m_n_fields;
980
981 for (ulint j = 0; j < n_fields; ++j) {
982 UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
983 }
984
985 UT_DELETE_ARRAY(fields);
986 }
987
988 for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
989 UT_DELETE_ARRAY(m_col_names[i]);
990 }
991
992 UT_DELETE_ARRAY(m_cols);
993 UT_DELETE_ARRAY(m_indexes);
994 UT_DELETE_ARRAY(m_col_names);
995 UT_DELETE_ARRAY(m_table_name);
996 UT_DELETE_ARRAY(m_hostname);
997 }
998
999 /** Find the index entry in in the indexes array.
1000 @param name index name
1001 @return instance if found else 0. */
1002 row_index_t*
get_index(const char * name) const1003 row_import::get_index(
1004 const char* name) const UNIV_NOTHROW
1005 {
1006 for (ulint i = 0; i < m_n_indexes; ++i) {
1007 const char* index_name;
1008 row_index_t* index = &m_indexes[i];
1009
1010 index_name = reinterpret_cast<const char*>(index->m_name);
1011
1012 if (strcmp(index_name, name) == 0) {
1013
1014 return(index);
1015 }
1016 }
1017
1018 return(0);
1019 }
1020
1021 /** Get the number of rows in the index.
1022 @param name index name
1023 @return number of rows (doesn't include delete marked rows). */
1024 ulint
get_n_rows(const char * name) const1025 row_import::get_n_rows(
1026 const char* name) const UNIV_NOTHROW
1027 {
1028 const row_index_t* index = get_index(name);
1029
1030 ut_a(name != 0);
1031
1032 return(index->m_stats.m_n_rows);
1033 }
1034
1035 /** Get the number of rows for which purge failed uding the convert phase.
1036 @param name index name
1037 @return number of rows for which purge failed. */
1038 ulint
get_n_purge_failed(const char * name) const1039 row_import::get_n_purge_failed(
1040 const char* name) const UNIV_NOTHROW
1041 {
1042 const row_index_t* index = get_index(name);
1043
1044 ut_a(name != 0);
1045
1046 return(index->m_stats.m_n_purge_failed);
1047 }
1048
1049 /** Find the ordinal value of the column name in the cfg table columns.
1050 @param name of column to look for.
1051 @return ULINT_UNDEFINED if not found. */
1052 ulint
find_col(const char * name) const1053 row_import::find_col(
1054 const char* name) const UNIV_NOTHROW
1055 {
1056 for (ulint i = 0; i < m_n_cols; ++i) {
1057 const char* col_name;
1058
1059 col_name = reinterpret_cast<const char*>(m_col_names[i]);
1060
1061 if (strcmp(col_name, name) == 0) {
1062 return(i);
1063 }
1064 }
1065
1066 return(ULINT_UNDEFINED);
1067 }
1068
1069 /**
1070 Check if the index schema that was read from the .cfg file matches the
1071 in memory index definition.
1072 @return DB_SUCCESS or error code. */
1073 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1074 row_import::match_index_columns(
1075 THD* thd,
1076 const dict_index_t* index) UNIV_NOTHROW
1077 {
1078 row_index_t* cfg_index;
1079 dberr_t err = DB_SUCCESS;
1080
1081 cfg_index = get_index(index->name);
1082
1083 if (cfg_index == 0) {
1084 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1085 ER_TABLE_SCHEMA_MISMATCH,
1086 "Index %s not found in tablespace meta-data file.",
1087 index->name());
1088
1089 return(DB_ERROR);
1090 }
1091
1092 if (cfg_index->m_n_fields != index->n_fields) {
1093
1094 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1095 ER_TABLE_SCHEMA_MISMATCH,
1096 "Index field count %u doesn't match"
1097 " tablespace metadata file value " ULINTPF,
1098 index->n_fields, cfg_index->m_n_fields);
1099
1100 return(DB_ERROR);
1101 }
1102
1103 cfg_index->m_srv_index = index;
1104
1105 const dict_field_t* field = index->fields;
1106 const dict_field_t* cfg_field = cfg_index->m_fields;
1107
1108 for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1109
1110 if (field->name() && cfg_field->name()
1111 && strcmp(field->name(), cfg_field->name()) != 0) {
1112 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1113 ER_TABLE_SCHEMA_MISMATCH,
1114 "Index field name %s doesn't match"
1115 " tablespace metadata field name %s"
1116 " for field position " ULINTPF,
1117 field->name(), cfg_field->name(), i);
1118
1119 err = DB_ERROR;
1120 }
1121
1122 if (cfg_field->prefix_len != field->prefix_len) {
1123 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1124 ER_TABLE_SCHEMA_MISMATCH,
1125 "Index %s field %s prefix len %u"
1126 " doesn't match metadata file value %u",
1127 index->name(), field->name(),
1128 field->prefix_len, cfg_field->prefix_len);
1129
1130 err = DB_ERROR;
1131 }
1132
1133 if (cfg_field->fixed_len != field->fixed_len) {
1134 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1135 ER_TABLE_SCHEMA_MISMATCH,
1136 "Index %s field %s fixed len %u"
1137 " doesn't match metadata file value %u",
1138 index->name(), field->name(),
1139 field->fixed_len,
1140 cfg_field->fixed_len);
1141
1142 err = DB_ERROR;
1143 }
1144 }
1145
1146 return(err);
1147 }
1148
1149 /** Check if the table schema that was read from the .cfg file matches the
1150 in memory table definition.
1151 @param thd MySQL session variable
1152 @return DB_SUCCESS or error code. */
1153 dberr_t
match_table_columns(THD * thd)1154 row_import::match_table_columns(
1155 THD* thd) UNIV_NOTHROW
1156 {
1157 dberr_t err = DB_SUCCESS;
1158 const dict_col_t* col = m_table->cols;
1159
1160 for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1161
1162 const char* col_name;
1163 ulint cfg_col_index;
1164
1165 col_name = dict_table_get_col_name(
1166 m_table, dict_col_get_no(col));
1167
1168 cfg_col_index = find_col(col_name);
1169
1170 if (cfg_col_index == ULINT_UNDEFINED) {
1171
1172 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1173 ER_TABLE_SCHEMA_MISMATCH,
1174 "Column %s not found in tablespace.",
1175 col_name);
1176
1177 err = DB_ERROR;
1178 } else if (cfg_col_index != col->ind) {
1179
1180 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1181 ER_TABLE_SCHEMA_MISMATCH,
1182 "Column %s ordinal value mismatch, it's at %u"
1183 " in the table and " ULINTPF
1184 " in the tablespace meta-data file",
1185 col_name, col->ind, cfg_col_index);
1186
1187 err = DB_ERROR;
1188 } else {
1189 const dict_col_t* cfg_col;
1190
1191 cfg_col = &m_cols[cfg_col_index];
1192 ut_a(cfg_col->ind == cfg_col_index);
1193
1194 if (cfg_col->prtype != col->prtype) {
1195 ib_errf(thd,
1196 IB_LOG_LEVEL_ERROR,
1197 ER_TABLE_SCHEMA_MISMATCH,
1198 "Column %s precise type mismatch,"
1199 " it's 0X%X in the table and 0X%X"
1200 " in the tablespace meta file",
1201 col_name, col->prtype, cfg_col->prtype);
1202 err = DB_ERROR;
1203 }
1204
1205 if (cfg_col->mtype != col->mtype) {
1206 ib_errf(thd,
1207 IB_LOG_LEVEL_ERROR,
1208 ER_TABLE_SCHEMA_MISMATCH,
1209 "Column %s main type mismatch,"
1210 " it's 0X%X in the table and 0X%X"
1211 " in the tablespace meta file",
1212 col_name, col->mtype, cfg_col->mtype);
1213 err = DB_ERROR;
1214 }
1215
1216 if (cfg_col->len != col->len) {
1217 ib_errf(thd,
1218 IB_LOG_LEVEL_ERROR,
1219 ER_TABLE_SCHEMA_MISMATCH,
1220 "Column %s length mismatch,"
1221 " it's %u in the table and %u"
1222 " in the tablespace meta file",
1223 col_name, col->len, cfg_col->len);
1224 err = DB_ERROR;
1225 }
1226
1227 if (cfg_col->mbminlen != col->mbminlen
1228 || cfg_col->mbmaxlen != col->mbmaxlen) {
1229 ib_errf(thd,
1230 IB_LOG_LEVEL_ERROR,
1231 ER_TABLE_SCHEMA_MISMATCH,
1232 "Column %s multi-byte len mismatch,"
1233 " it's %u-%u in the table and %u-%u"
1234 " in the tablespace meta file",
1235 col_name, col->mbminlen, col->mbmaxlen,
1236 cfg_col->mbminlen, cfg_col->mbmaxlen);
1237 err = DB_ERROR;
1238 }
1239
1240 if (cfg_col->ind != col->ind) {
1241 ib_errf(thd,
1242 IB_LOG_LEVEL_ERROR,
1243 ER_TABLE_SCHEMA_MISMATCH,
1244 "Column %s position mismatch,"
1245 " it's %u in the table and %u"
1246 " in the tablespace meta file",
1247 col_name, col->ind, cfg_col->ind);
1248 err = DB_ERROR;
1249 }
1250
1251 if (cfg_col->ord_part != col->ord_part) {
1252 ib_errf(thd,
1253 IB_LOG_LEVEL_ERROR,
1254 ER_TABLE_SCHEMA_MISMATCH,
1255 "Column %s ordering mismatch,"
1256 " it's %u in the table and %u"
1257 " in the tablespace meta file",
1258 col_name, col->ord_part,
1259 cfg_col->ord_part);
1260 err = DB_ERROR;
1261 }
1262
1263 if (cfg_col->max_prefix != col->max_prefix) {
1264 ib_errf(thd,
1265 IB_LOG_LEVEL_ERROR,
1266 ER_TABLE_SCHEMA_MISMATCH,
1267 "Column %s max prefix mismatch"
1268 " it's %u in the table and %u"
1269 " in the tablespace meta file",
1270 col_name, col->max_prefix,
1271 cfg_col->max_prefix);
1272 err = DB_ERROR;
1273 }
1274 }
1275 }
1276
1277 return(err);
1278 }
1279
match_flags(THD * thd) const1280 dberr_t row_import::match_flags(THD *thd) const
1281 {
1282 ulint mismatch= (m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR;
1283 if (!mismatch)
1284 return DB_SUCCESS;
1285
1286 const char *msg;
1287 if (mismatch & DICT_TF_MASK_ZIP_SSIZE)
1288 {
1289 if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) &&
1290 (m_flags & DICT_TF_MASK_ZIP_SSIZE))
1291 {
1292 switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1293 case 0U << DICT_TF_POS_ZIP_SSIZE:
1294 goto uncompressed;
1295 case 1U << DICT_TF_POS_ZIP_SSIZE:
1296 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=1";
1297 break;
1298 case 2U << DICT_TF_POS_ZIP_SSIZE:
1299 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=2";
1300 break;
1301 case 3U << DICT_TF_POS_ZIP_SSIZE:
1302 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=4";
1303 break;
1304 case 4U << DICT_TF_POS_ZIP_SSIZE:
1305 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8";
1306 break;
1307 case 5U << DICT_TF_POS_ZIP_SSIZE:
1308 msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=16";
1309 break;
1310 default:
1311 msg= "strange KEY_BLOCK_SIZE";
1312 }
1313 }
1314 else if (m_flags & DICT_TF_MASK_ZIP_SSIZE)
1315 msg= "ROW_FORMAT=COMPRESSED";
1316 else
1317 goto uncompressed;
1318 }
1319 else
1320 {
1321 uncompressed:
1322 msg= (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) ? "ROW_FORMAT=DYNAMIC"
1323 : (m_flags & DICT_TF_MASK_COMPACT) ? "ROW_FORMAT=COMPACT"
1324 : "ROW_FORMAT=REDUNDANT";
1325 }
1326
1327 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1328 "Table flags don't match, server table has 0x%x and the meta-data "
1329 "file has 0x%zx; .cfg file uses %s",
1330 m_table->flags, m_flags, msg);
1331
1332 return DB_ERROR;
1333 }
1334
1335 /** Check if the table (and index) schema that was read from the .cfg file
1336 matches the in memory table definition.
1337 @param thd MySQL session variable
1338 @return DB_SUCCESS or error code. */
1339 dberr_t
match_schema(THD * thd)1340 row_import::match_schema(
1341 THD* thd) UNIV_NOTHROW
1342 {
1343 /* Do some simple checks. */
1344
1345 if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1346
1347 /* If the number of indexes don't match then it is better
1348 to abort the IMPORT. It is easy for the user to create a
1349 table matching the IMPORT definition. */
1350
1351 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1352 "Number of indexes don't match, table has " ULINTPF
1353 " indexes but the tablespace meta-data file has "
1354 ULINTPF " indexes",
1355 UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1356
1357 return(DB_ERROR);
1358 }
1359
1360 dberr_t err = match_table_columns(thd);
1361
1362 if (err != DB_SUCCESS) {
1363 return(err);
1364 }
1365
1366 /* Check if the index definitions match. */
1367
1368 const dict_index_t* index;
1369
1370 for (index = UT_LIST_GET_FIRST(m_table->indexes);
1371 index != 0;
1372 index = UT_LIST_GET_NEXT(indexes, index)) {
1373
1374 dberr_t index_err;
1375
1376 index_err = match_index_columns(thd, index);
1377
1378 if (index_err != DB_SUCCESS) {
1379 err = index_err;
1380 }
1381 }
1382
1383 return(err);
1384 }
1385
1386 /**
1387 Set the index root <space, pageno>, using index name. */
1388 void
set_root_by_name()1389 row_import::set_root_by_name() UNIV_NOTHROW
1390 {
1391 row_index_t* cfg_index = m_indexes;
1392
1393 for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1394 dict_index_t* index;
1395
1396 const char* index_name;
1397
1398 index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1399
1400 index = dict_table_get_index_on_name(m_table, index_name);
1401
1402 /* We've already checked that it exists. */
1403 ut_a(index != 0);
1404
1405 index->page = cfg_index->m_page_no;
1406 }
1407 }
1408
/**
Set the index root <space, pageno>, using a heuristic: the non-FTS indexes
of the in-memory table are paired with the entries of the indexes array in
list order. Each paired entry is renamed after the in-memory index, linked
to it through m_srv_index, and its page number becomes the index root.
@return DB_SUCCESS or DB_OUT_OF_MEMORY */
dberr_t
row_import::set_root_by_heuristic() UNIV_NOTHROW
{
	row_index_t*	cfg_index = m_indexes;

	ut_a(m_n_indexes > 0);

	// TODO: For now use brute force, based on ordinality

	/* A count mismatch is only reported as a warning; the pairing
	below still runs. */
	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {

		ib::warn() << "Table " << m_table->name << " should have "
			<< UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
			" the tablespace has " << m_n_indexes << " indexes";
	}

	dict_mutex_enter_for_mysql();

	/* Next unused entry in cfg_index[]. */
	ulint	i = 0;
	dberr_t	err = DB_SUCCESS;

	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
	     index != 0;
	     index = UT_LIST_GET_NEXT(indexes, index)) {

		if (index->type & DICT_FTS) {
			/* FTS indexes are flagged as corrupted and do not
			consume a cfg_index[] entry. */
			index->type |= DICT_CORRUPT;
			ib::warn() << "Skipping FTS index: " << index->name;
		} else if (i < m_n_indexes) {
			/* Table indexes beyond m_n_indexes are left
			untouched. */

			/* Replace the stored name with a copy of the
			in-memory index name. */
			UT_DELETE_ARRAY(cfg_index[i].m_name);

			/* Length including the terminating NUL. */
			ulint	len = strlen(index->name) + 1;

			cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);

			/* Trigger OOM */
			DBUG_EXECUTE_IF(
				"ib_import_OOM_14",
				UT_DELETE_ARRAY(cfg_index[i].m_name);
				cfg_index[i].m_name = NULL;
			);

			if (cfg_index[i].m_name == NULL) {
				/* Leave the loop so that the mutex is
				released below before returning. */
				err = DB_OUT_OF_MEMORY;
				break;
			}

			memcpy(cfg_index[i].m_name, index->name, len);

			cfg_index[i].m_srv_index = index;

			index->page = cfg_index[i].m_page_no;

			++i;
		}
	}

	dict_mutex_exit_for_mysql();

	return(err);
}
1474
1475 /**
1476 Purge delete marked records.
1477 @return DB_SUCCESS or error code. */
1478 dberr_t
garbage_collect()1479 IndexPurge::garbage_collect() UNIV_NOTHROW
1480 {
1481 dberr_t err;
1482 ibool comp = dict_table_is_comp(m_index->table);
1483
1484 /* Open the persistent cursor and start the mini-transaction. */
1485
1486 open();
1487
1488 while ((err = next()) == DB_SUCCESS) {
1489
1490 rec_t* rec = btr_pcur_get_rec(&m_pcur);
1491 ibool deleted = rec_get_deleted_flag(rec, comp);
1492
1493 if (!deleted) {
1494 ++m_n_rows;
1495 } else {
1496 purge();
1497 }
1498 }
1499
1500 /* Close the persistent cursor and commit the mini-transaction. */
1501
1502 close();
1503
1504 return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1505 }
1506
1507 /**
1508 Begin import, position the cursor on the first record. */
1509 void
open()1510 IndexPurge::open() UNIV_NOTHROW
1511 {
1512 mtr_start(&m_mtr);
1513
1514 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1515
1516 btr_pcur_open_at_index_side(
1517 true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1518 btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1519 if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) {
1520 ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1521 /* Skip the metadata pseudo-record. */
1522 } else {
1523 btr_pcur_move_to_prev_on_page(&m_pcur);
1524 }
1525 }
1526
/**
Close the persistent cursor and commit the mini-transaction. */
void
IndexPurge::close() UNIV_NOTHROW
{
	btr_pcur_close(&m_pcur);
	mtr_commit(&m_mtr);
}
1535
/**
Position the cursor on the next record. Within a page this is a plain
step. At the end of a page the mini-transaction is committed and
restarted, and the cursor is moved forward, validating each page it
moves onto, until it is on a user record.
@return DB_SUCCESS if the cursor is on a record,
DB_END_OF_INDEX at the end of the index,
DB_INTERRUPTED if the transaction was interrupted,
DB_CORRUPTION if the next-page link or the next page is invalid */
dberr_t
IndexPurge::next() UNIV_NOTHROW
{
	btr_pcur_move_to_next_on_page(&m_pcur);

	/* When switching pages, commit the mini-transaction
	in order to release the latch on the old page. */

	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
		return(DB_SUCCESS);
	} else if (trx_is_interrupted(m_trx)) {
		/* Check after every page because the check
		is expensive. */
		return(DB_INTERRUPTED);
	}

	/* Save the position, restart the mini-transaction with redo
	logging disabled, and restore the position. */
	btr_pcur_store_position(&m_pcur, &m_mtr);

	mtr_commit(&m_mtr);

	mtr_start(&m_mtr);

	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);

	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
	/* The following is based on btr_pcur_move_to_next_user_rec(). */
	m_pcur.old_stored = false;
	ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
	do {
		if (btr_pcur_is_after_last_on_page(&m_pcur)) {
			if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
				return DB_END_OF_INDEX;
			}

			buf_block_t* block = btr_pcur_get_block(&m_pcur);
			uint32_t next_page = btr_page_get_next(block->frame);

			/* MDEV-13542 FIXME: Make these checks part of
			btr_pcur_move_to_next_page(), and introduce a
			return status that will be checked in all callers! */
			/* Reject a next-page link of 0, 1 or FIL_NULL,
			and a link that points back to the current page. */
			switch (next_page) {
			default:
				if (next_page != block->page.id.page_no()) {
					break;
				}
				/* MDEV-20931 FIXME: Check that
				next_page is within the tablespace
				bounds! Also check that it is not a
				change buffer bitmap page. */
				/* fall through */
			case 0:
			case 1:
			case FIL_NULL:
				return DB_CORRUPTION;
			}

			dict_index_t* index = m_pcur.btr_cur.index;
			buf_block_t* next_block = btr_block_get(
				page_id_t(block->page.id.space(), next_page),
				block->zip_size(), BTR_MODIFY_LEAF, index,
				&m_mtr);

			/* The next page must exist, be an index page whose
			R-tree type agrees with the index, use the same
			compact flag as the current page, and link back to
			the current page. */
			if (UNIV_UNLIKELY(!next_block
					  || !fil_page_index_page_check(
						  next_block->frame)
					  || !!dict_index_is_spatial(index)
					  != (fil_page_get_type(
						      next_block->frame)
					      == FIL_PAGE_RTREE)
					  || page_is_comp(next_block->frame)
					  != page_is_comp(block->frame)
					  || btr_page_get_prev(
						  next_block->frame)
					  != block->page.id.page_no())) {
				return DB_CORRUPTION;
			}

			/* Release the old page only after the next page
			has been fetched and validated. */
			btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);

			page_cur_set_before_first(next_block,
						  &m_pcur.btr_cur.page_cur);

			ut_d(page_check_dir(next_block->frame));
		} else {
			btr_pcur_move_to_next_on_page(&m_pcur);
		}
	} while (!btr_pcur_is_on_user_rec(&m_pcur));

	return DB_SUCCESS;
}
1629
/**
Restore the persistent cursor position in BTR_MODIFY_TREE mode, because
the tree structure may be changed during a pessimistic delete, delete the
record under the cursor, and commit the mini-transaction. The delete is
asserted to succeed. */
void
IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
{
	dberr_t	err;

	btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
				  &m_pcur, &m_mtr);

	/* Only delete-marked records are expected here. */
	ut_ad(rec_get_deleted_flag(
			btr_pcur_get_rec(&m_pcur),
			dict_table_is_comp(m_index->table)));

	btr_cur_pessimistic_delete(
		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);

	ut_a(err == DB_SUCCESS);

	/* Commit the mini-transaction. This function does not start a
	new one or reopen the cursor. */
	mtr_commit(&m_mtr);
}
1654
/**
Purge the delete-marked record under the cursor: store the cursor
position, delete the record pessimistically, then start a new
mini-transaction and restore the cursor in BTR_MODIFY_LEAF mode. */
void
IndexPurge::purge() UNIV_NOTHROW
{
	btr_pcur_store_position(&m_pcur, &m_mtr);

	/* This call commits m_mtr before it returns. */
	purge_pessimistic_delete();

	mtr_start(&m_mtr);

	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);

	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
}
1670
1671 /** Adjust the BLOB reference for a single column that is externally stored
1672 @param rec record to update
1673 @param offsets column offsets for the record
1674 @param i column ordinal value
1675 @return DB_SUCCESS or error code */
1676 inline
1677 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1678 PageConverter::adjust_cluster_index_blob_column(
1679 rec_t* rec,
1680 const rec_offs* offsets,
1681 ulint i) UNIV_NOTHROW
1682 {
1683 ulint len;
1684 byte* field;
1685
1686 field = rec_get_nth_field(rec, offsets, i, &len);
1687
1688 DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1689 len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1690
1691 if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1692
1693 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1694 ER_INNODB_INDEX_CORRUPT,
1695 "Externally stored column(" ULINTPF
1696 ") has a reference length of " ULINTPF
1697 " in the cluster index %s",
1698 i, len, m_cluster_index->name());
1699
1700 return(DB_CORRUPTION);
1701 }
1702
1703 field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1704
1705 mach_write_to_4(field, get_space_id());
1706
1707 if (m_page_zip_ptr) {
1708 page_zip_write_blob_ptr(
1709 m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1710 }
1711
1712 return(DB_SUCCESS);
1713 }
1714
1715 /** Adjusts the BLOB reference in the clustered index row for all externally
1716 stored columns.
1717 @param rec record to update
1718 @param offsets column offsets for the record
1719 @return DB_SUCCESS or error code */
1720 inline
1721 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1722 PageConverter::adjust_cluster_index_blob_columns(
1723 rec_t* rec,
1724 const rec_offs* offsets) UNIV_NOTHROW
1725 {
1726 ut_ad(rec_offs_any_extern(offsets));
1727
1728 /* Adjust the space_id in the BLOB pointers. */
1729
1730 for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1731
1732 /* Only if the column is stored "externally". */
1733
1734 if (rec_offs_nth_extern(offsets, i)) {
1735 dberr_t err;
1736
1737 err = adjust_cluster_index_blob_column(rec, offsets, i);
1738
1739 if (err != DB_SUCCESS) {
1740 return(err);
1741 }
1742 }
1743 }
1744
1745 return(DB_SUCCESS);
1746 }
1747
1748 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1749 BLOB reference, write the new space id.
1750 @param rec record to update
1751 @param offsets column offsets for the record
1752 @return DB_SUCCESS or error code */
1753 inline
1754 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1755 PageConverter::adjust_cluster_index_blob_ref(
1756 rec_t* rec,
1757 const rec_offs* offsets) UNIV_NOTHROW
1758 {
1759 if (rec_offs_any_extern(offsets)) {
1760 dberr_t err;
1761
1762 err = adjust_cluster_index_blob_columns(rec, offsets);
1763
1764 if (err != DB_SUCCESS) {
1765 return(err);
1766 }
1767 }
1768
1769 return(DB_SUCCESS);
1770 }
1771
1772 /** Purge delete-marked records, only if it is possible to do so without
1773 re-organising the B+tree.
1774 @return true if purge succeeded */
purge()1775 inline bool PageConverter::purge() UNIV_NOTHROW
1776 {
1777 const dict_index_t* index = m_index->m_srv_index;
1778
1779 /* We can't have a page that is empty and not root. */
1780 if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1781
1782 ++m_index->m_stats.m_n_purged;
1783
1784 return(true);
1785 } else {
1786 ++m_index->m_stats.m_n_purge_failed;
1787 }
1788
1789 return(false);
1790 }
1791
1792 /** Adjust the BLOB references and sys fields for the current record.
1793 @param rec record to update
1794 @param offsets column offsets for the record
1795 @return DB_SUCCESS or error code. */
1796 inline
1797 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1798 PageConverter::adjust_cluster_record(
1799 rec_t* rec,
1800 const rec_offs* offsets) UNIV_NOTHROW
1801 {
1802 dberr_t err;
1803
1804 if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1805
1806 /* Reset DB_TRX_ID and DB_ROLL_PTR. Normally, these fields
1807 are only written in conjunction with other changes to the
1808 record. */
1809 ulint trx_id_pos = m_cluster_index->n_uniq
1810 ? m_cluster_index->n_uniq : 1;
1811 if (m_page_zip_ptr) {
1812 page_zip_write_trx_id_and_roll_ptr(
1813 m_page_zip_ptr, rec, m_offsets, trx_id_pos,
1814 0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1815 NULL);
1816 } else {
1817 ulint len;
1818 byte* ptr = rec_get_nth_field(
1819 rec, m_offsets, trx_id_pos, &len);
1820 ut_ad(len == DATA_TRX_ID_LEN);
1821 memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1822 }
1823 }
1824
1825 return(err);
1826 }
1827
1828 /** Update the BLOB refrences and write UNDO log entries for
1829 rows that can't be purged optimistically.
1830 @param block block to update
1831 @retval DB_SUCCESS or error code */
1832 inline
1833 dberr_t
update_records(buf_block_t * block)1834 PageConverter::update_records(
1835 buf_block_t* block) UNIV_NOTHROW
1836 {
1837 ibool comp = dict_table_is_comp(m_cfg->m_table);
1838 bool clust_index = m_index->m_srv_index == m_cluster_index;
1839
1840 /* This will also position the cursor on the first user record. */
1841
1842 m_rec_iter.open(block);
1843
1844 while (!m_rec_iter.end()) {
1845 rec_t* rec = m_rec_iter.current();
1846 ibool deleted = rec_get_deleted_flag(rec, comp);
1847
1848 /* For the clustered index we have to adjust the BLOB
1849 reference and the system fields irrespective of the
1850 delete marked flag. The adjustment of delete marked
1851 cluster records is required for purge to work later. */
1852
1853 if (deleted || clust_index) {
1854 m_offsets = rec_get_offsets(
1855 rec, m_index->m_srv_index, m_offsets,
1856 m_index->m_srv_index->n_core_fields,
1857 ULINT_UNDEFINED, &m_heap);
1858 }
1859
1860 if (clust_index) {
1861
1862 dberr_t err = adjust_cluster_record(rec, m_offsets);
1863
1864 if (err != DB_SUCCESS) {
1865 return(err);
1866 }
1867 }
1868
1869 /* If it is a delete marked record then try an
1870 optimistic delete. */
1871
1872 if (deleted) {
1873 /* A successful purge will move the cursor to the
1874 next record. */
1875
1876 if (!purge()) {
1877 m_rec_iter.next();
1878 }
1879
1880 ++m_index->m_stats.m_n_deleted;
1881 } else {
1882 ++m_index->m_stats.m_n_rows;
1883 m_rec_iter.next();
1884 }
1885 }
1886
1887 return(DB_SUCCESS);
1888 }
1889
/** Update the space id, index id and PAGE_MAX_TRX_ID of an index page,
then update its records if it is a non-empty leaf page. Pages that
is_free() reports as free are left untouched.
@param block	index page to update
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_index_page(
	buf_block_t*	block) UNIV_NOTHROW
{
	index_id_t	id;
	buf_frame_t*	page = block->frame;

	if (is_free(block->page.id.page_no())) {
		return(DB_SUCCESS);
	} else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
		/* The page belongs to a different index than the current
		one: look that index up and switch to it. */
		row_index_t*	index = find_index(id);

		if (UNIV_UNLIKELY(!index)) {
			if (m_cfg->m_missing) {
				return DB_SUCCESS;
			}

			ib::error() << "Page for tablespace " << m_space
				<< " is index page with id " << id
				<< " but that index is not found from"
				<< " configuration file. Current index name "
				<< m_index->m_name << " and id " << m_index->m_id;
			m_index = 0;
			return(DB_CORRUPTION);
		}

		/* Update current index */
		m_index = index;
	}

	/* If the .cfg file is missing and there is an index mismatch
	then ignore the error. */
	if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
		return(DB_SUCCESS);
	}

	/* NOTE(review): from here on m_index and m_index->m_srv_index are
	dereferenced without a check unless noted; presumably both are
	non-null whenever a .cfg file was read - confirm against callers. */

	if (m_index && block->page.id.page_no() == m_index->m_page_no) {
		/* On the page with the index's m_page_no, write the space
		id into both file segment headers (leaf and top). */
		byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
			+ page;
		mach_write_to_4(b, block->page.id.space());

		memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
		       + page, b, 4);
		/* Mirror both fields into the compressed page, if any. */
		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_TOP
						     + FSEG_HDR_SPACE], b, 4);
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_LEAF
						     + FSEG_HDR_SPACE], b, 4);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!is_compressed_table()
	     || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
#endif /* UNIV_ZIP_DEBUG */

	/* This has to be written to uncompressed index header. Set it to
	the current index id. */
	btr_page_set_index_id(
		page, m_page_zip_ptr, m_index->m_srv_index->id, 0);

	if (dict_index_is_clust(m_index->m_srv_index)) {
		dict_index_t* index = const_cast<dict_index_t*>(
			m_index->m_srv_index);
		if (block->page.id.page_no() != index->page) {
			/* Clear PAGE_MAX_TRX_ID so that it can be
			used for other purposes in the future. IMPORT
			in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
			would set the field to the transaction ID even
			on clustered index pages. */
			page_set_max_trx_id(block, m_page_zip_ptr, 0, NULL);
		}
	} else {
		/* Set PAGE_MAX_TRX_ID on secondary index leaf pages,
		and clear it on non-leaf pages. */
		page_set_max_trx_id(block, m_page_zip_ptr,
				    page_is_leaf(page) ? m_trx->id : 0, NULL);
	}

	if (page_is_empty(page)) {

		/* Only a root page can be empty. */
		if (page_has_siblings(page)) {
			// TODO: We should relax this and skip secondary
			// indexes. Mark them as corrupt because they can
			// always be rebuilt.
			return(DB_CORRUPTION);
		}

		return(DB_SUCCESS);
	}

	/* Records are only updated on leaf pages. */
	return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
}
1989
1990 /** Validate the space flags and update tablespace header page.
1991 @param block block read from file, not from the buffer pool.
1992 @retval DB_SUCCESS or error code */
1993 inline
1994 dberr_t
update_header(buf_block_t * block)1995 PageConverter::update_header(
1996 buf_block_t* block) UNIV_NOTHROW
1997 {
1998 /* Check for valid header */
1999 switch (fsp_header_get_space_id(get_frame(block))) {
2000 case 0:
2001 return(DB_CORRUPTION);
2002 case ULINT_UNDEFINED:
2003 ib::warn() << "Space id check in the header failed: ignored";
2004 }
2005
2006 mach_write_to_8(
2007 get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
2008 m_current_lsn);
2009
2010 /* Write back the adjusted flags. */
2011 mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS
2012 + get_frame(block), m_space_flags);
2013
2014 /* Write space_id to the tablespace header, page 0. */
2015 mach_write_to_4(
2016 get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2017 get_space_id());
2018
2019 /* This is on every page in the tablespace. */
2020 mach_write_to_4(
2021 get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2022 get_space_id());
2023
2024 return(DB_SUCCESS);
2025 }
2026
/** Update the page according to its type: the tablespace header page,
index pages and the remaining known page types are each handled
separately; unknown page types are rejected.
@param block	block read from file
@param page_type	out: the page type read from the page
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_page(
	buf_block_t*	block,
	ulint&		page_type) UNIV_NOTHROW
{
	dberr_t		err = DB_SUCCESS;

	ut_ad(!block->page.zip.data == !is_compressed_table());

	/* Remember the compressed page descriptor, if there is one. */
	if (block->page.zip.data) {
		m_page_zip_ptr = &block->page.zip;
	} else {
		ut_ad(!m_page_zip_ptr);
	}

	switch (page_type = fil_page_get_type(get_frame(block))) {
	case FIL_PAGE_TYPE_FSP_HDR:
		ut_a(block->page.id.page_no() == 0);
		/* Work directly on the uncompressed page headers. */
		return(update_header(block));

	case FIL_PAGE_INDEX:
	case FIL_PAGE_RTREE:
		/* We need to decompress the contents into block->frame
		before we can do any thing with Btree pages. */

		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
			return(DB_CORRUPTION);
		}

		/* fall through */
	case FIL_PAGE_TYPE_INSTANT:
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		/* Only update the Btree nodes. */
		return(update_index_page(block));

	case FIL_PAGE_TYPE_SYS:
		/* This is page 0 in the system tablespace. */
		return(DB_CORRUPTION);

	case FIL_PAGE_TYPE_XDES:
		/* The result is returned only after the space id has
		been written below. */
		err = set_current_xdes(
			block->page.id.page_no(), get_frame(block));
		/* fall through */
	case FIL_PAGE_INODE:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_ALLOCATED:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:

		/* Work directly on the uncompressed page headers. */
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		return(err);
	}

	/* Any page type not listed above is treated as corruption. */
	ib::warn() << "Unknown page type (" << page_type << ")";

	return(DB_CORRUPTION);
}
2101
2102 /** Called for every page in the tablespace. If the page was not
2103 updated then its state must be set to BUF_PAGE_NOT_USED.
2104 @param block block read from file, note it is not from the buffer pool
2105 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2106 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2107 {
2108 /* If we already had an old page with matching number
2109 in the buffer pool, evict it now, because
2110 we no longer evict the pages on DISCARD TABLESPACE. */
2111 buf_page_get_gen(block->page.id, get_zip_size(),
2112 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2113 __FILE__, __LINE__, NULL, NULL);
2114
2115 ulint page_type;
2116
2117 if (dberr_t err = update_page(block, page_type)) {
2118 return err;
2119 }
2120
2121 const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
2122
2123 if (!block->page.zip.data) {
2124 buf_flush_init_for_writing(
2125 NULL, block->frame, NULL, m_current_lsn, full_crc32);
2126 } else if (fil_page_type_is_index(page_type)) {
2127 buf_flush_init_for_writing(
2128 NULL, block->page.zip.data, &block->page.zip,
2129 m_current_lsn, full_crc32);
2130 } else {
2131 /* Calculate and update the checksum of non-index
2132 pages for ROW_FORMAT=COMPRESSED tables. */
2133 buf_flush_update_zip_checksum(
2134 block->page.zip.data, block->zip_size(),
2135 m_current_lsn);
2136 }
2137
2138 return DB_SUCCESS;
2139 }
2140
2141 /*****************************************************************//**
2142 Clean up after import tablespace failure, this function will acquire
2143 the dictionary latches on behalf of the transaction if the transaction
2144 hasn't already acquired them. */
2145 static MY_ATTRIBUTE((nonnull))
2146 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2147 row_import_discard_changes(
2148 /*=======================*/
2149 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2150 trx_t* trx, /*!< in/out: transaction for import */
2151 dberr_t err) /*!< in: error code */
2152 {
2153 dict_table_t* table = prebuilt->table;
2154
2155 ut_a(err != DB_SUCCESS);
2156
2157 prebuilt->trx->error_info = NULL;
2158
2159 ib::info() << "Discarding tablespace of table "
2160 << prebuilt->table->name
2161 << ": " << err;
2162
2163 if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2164 ut_a(trx->dict_operation_lock_mode == 0);
2165 row_mysql_lock_data_dictionary(trx);
2166 }
2167
2168 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2169
2170 /* Since we update the index root page numbers on disk after
2171 we've done a successful import. The table will not be loadable.
2172 However, we need to ensure that the in memory root page numbers
2173 are reset to "NULL". */
2174
2175 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2176 index != 0;
2177 index = UT_LIST_GET_NEXT(indexes, index)) {
2178
2179 index->page = FIL_NULL;
2180 }
2181
2182 table->file_unreadable = true;
2183 if (table->space) {
2184 fil_close_tablespace(trx, table->space_id);
2185 table->space = NULL;
2186 }
2187 }
2188
2189 /*****************************************************************//**
2190 Clean up after import tablespace. */
2191 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2192 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2193 row_import_cleanup(
2194 /*===============*/
2195 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2196 trx_t* trx, /*!< in/out: transaction for import */
2197 dberr_t err) /*!< in: error code */
2198 {
2199 ut_a(prebuilt->trx != trx);
2200
2201 if (err != DB_SUCCESS) {
2202 row_import_discard_changes(prebuilt, trx, err);
2203 }
2204
2205 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2206
2207 DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2208
2209 trx_commit_for_mysql(trx);
2210
2211 row_mysql_unlock_data_dictionary(trx);
2212
2213 trx->free();
2214
2215 prebuilt->trx->op_info = "";
2216
2217 DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2218
2219 log_make_checkpoint();
2220
2221 return(err);
2222 }
2223
2224 /*****************************************************************//**
2225 Report error during tablespace import. */
2226 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2227 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2228 row_import_error(
2229 /*=============*/
2230 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2231 trx_t* trx, /*!< in/out: transaction for import */
2232 dberr_t err) /*!< in: error code */
2233 {
2234 if (!trx_is_interrupted(trx)) {
2235 char table_name[MAX_FULL_NAME_LEN + 1];
2236
2237 innobase_format_name(
2238 table_name, sizeof(table_name),
2239 prebuilt->table->name.m_name);
2240
2241 ib_senderrf(
2242 trx->mysql_thd, IB_LOG_LEVEL_WARN,
2243 ER_INNODB_IMPORT_ERROR,
2244 table_name, (ulong) err, ut_strerr(err));
2245 }
2246
2247 return(row_import_cleanup(prebuilt, trx, err));
2248 }
2249
2250 /*****************************************************************//**
2251 Adjust the root page index node and leaf node segment headers, update
2252 with the new space id. For all the table's secondary indexes.
2253 @return error code */
2254 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2255 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2256 row_import_adjust_root_pages_of_secondary_indexes(
2257 /*==============================================*/
2258 trx_t* trx, /*!< in: transaction used for
2259 the import */
2260 dict_table_t* table, /*!< in: table the indexes
2261 belong to */
2262 const row_import& cfg) /*!< Import context */
2263 {
2264 dict_index_t* index;
2265 ulint n_rows_in_table;
2266 dberr_t err = DB_SUCCESS;
2267
2268 /* Skip the clustered index. */
2269 index = dict_table_get_first_index(table);
2270
2271 n_rows_in_table = cfg.get_n_rows(index->name);
2272
2273 DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2274 n_rows_in_table++;);
2275
2276 /* Adjust the root pages of the secondary indexes only. */
2277 while ((index = dict_table_get_next_index(index)) != NULL) {
2278 ut_a(!dict_index_is_clust(index));
2279
2280 if (!(index->type & DICT_CORRUPT)
2281 && index->page != FIL_NULL) {
2282
2283 /* Update the Btree segment headers for index node and
2284 leaf nodes in the root page. Set the new space id. */
2285
2286 err = btr_root_adjust_on_import(index);
2287 } else {
2288 ib::warn() << "Skip adjustment of root pages for"
2289 " index " << index->name << ".";
2290
2291 err = DB_CORRUPTION;
2292 }
2293
2294 if (err != DB_SUCCESS) {
2295
2296 if (index->type & DICT_CLUSTERED) {
2297 break;
2298 }
2299
2300 ib_errf(trx->mysql_thd,
2301 IB_LOG_LEVEL_WARN,
2302 ER_INNODB_INDEX_CORRUPT,
2303 "Index %s not found or corrupt,"
2304 " you should recreate this index.",
2305 index->name());
2306
2307 /* Do not bail out, so that the data
2308 can be recovered. */
2309
2310 err = DB_SUCCESS;
2311 index->type |= DICT_CORRUPT;
2312 continue;
2313 }
2314
2315 /* If we failed to purge any records in the index then
2316 do it the hard way.
2317
2318 TODO: We can do this in the first pass by generating UNDO log
2319 records for the failed rows. */
2320
2321 if (!cfg.requires_purge(index->name)) {
2322 continue;
2323 }
2324
2325 IndexPurge purge(trx, index);
2326
2327 trx->op_info = "secondary: purge delete marked records";
2328
2329 err = purge.garbage_collect();
2330
2331 trx->op_info = "";
2332
2333 if (err != DB_SUCCESS) {
2334 break;
2335 } else if (purge.get_n_rows() != n_rows_in_table) {
2336
2337 ib_errf(trx->mysql_thd,
2338 IB_LOG_LEVEL_WARN,
2339 ER_INNODB_INDEX_CORRUPT,
2340 "Index '%s' contains " ULINTPF " entries, "
2341 "should be " ULINTPF ", you should recreate "
2342 "this index.", index->name(),
2343 purge.get_n_rows(), n_rows_in_table);
2344
2345 index->type |= DICT_CORRUPT;
2346
2347 /* Do not bail out, so that the data
2348 can be recovered. */
2349
2350 err = DB_SUCCESS;
2351 }
2352 }
2353
2354 return(err);
2355 }
2356
2357 /*****************************************************************//**
2358 Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */
2359 MY_ATTRIBUTE((nonnull)) static
2360 void
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2361 row_import_set_sys_max_row_id(
2362 /*==========================*/
2363 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
2364 handler */
2365 const dict_table_t* table) /*!< in: table to import */
2366 {
2367 const rec_t* rec;
2368 mtr_t mtr;
2369 btr_pcur_t pcur;
2370 row_id_t row_id = 0;
2371 dict_index_t* index;
2372
2373 index = dict_table_get_first_index(table);
2374 ut_ad(index->is_primary());
2375 ut_ad(dict_index_is_auto_gen_clust(index));
2376
2377 mtr_start(&mtr);
2378
2379 mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2380
2381 btr_pcur_open_at_index_side(
2382 false, // High end
2383 index,
2384 BTR_SEARCH_LEAF,
2385 &pcur,
2386 true, // Init cursor
2387 0, // Leaf level
2388 &mtr);
2389
2390 btr_pcur_move_to_prev_on_page(&pcur);
2391 rec = btr_pcur_get_rec(&pcur);
2392
2393 /* Check for empty table. */
2394 if (page_rec_is_infimum(rec)) {
2395 /* The table is empty. */
2396 } else if (rec_is_metadata(rec, *index)) {
2397 /* The clustered index contains the metadata record only,
2398 that is, the table is empty. */
2399 } else {
2400 row_id = mach_read_from_6(rec);
2401 }
2402
2403 btr_pcur_close(&pcur);
2404 mtr_commit(&mtr);
2405
2406 if (row_id) {
2407 /* Update the system row id if the imported index row id is
2408 greater than the max system row id. */
2409
2410 mutex_enter(&dict_sys.mutex);
2411
2412 if (row_id >= dict_sys.row_id) {
2413 dict_sys.row_id = row_id + 1;
2414 dict_hdr_flush_row_id();
2415 }
2416
2417 mutex_exit(&dict_sys.mutex);
2418 }
2419 }
2420
2421 /*****************************************************************//**
2422 Read the a string from the meta data file.
2423 @return DB_SUCCESS or error code. */
2424 static
2425 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2426 row_import_cfg_read_string(
2427 /*=======================*/
2428 FILE* file, /*!< in/out: File to read from */
2429 byte* ptr, /*!< out: string to read */
2430 ulint max_len) /*!< in: maximum length of the output
2431 buffer in bytes */
2432 {
2433 DBUG_EXECUTE_IF("ib_import_string_read_error",
2434 errno = EINVAL; return(DB_IO_ERROR););
2435
2436 ulint len = 0;
2437
2438 while (!feof(file)) {
2439 int ch = fgetc(file);
2440
2441 if (ch == EOF) {
2442 break;
2443 } else if (ch != 0) {
2444 if (len < max_len) {
2445 ptr[len++] = ch;
2446 } else {
2447 break;
2448 }
2449 /* max_len includes the NUL byte */
2450 } else if (len != max_len - 1) {
2451 break;
2452 } else {
2453 ptr[len] = 0;
2454 return(DB_SUCCESS);
2455 }
2456 }
2457
2458 errno = EINVAL;
2459
2460 return(DB_IO_ERROR);
2461 }
2462
2463 /*********************************************************************//**
2464 Write the meta data (index user fields) config file.
2465 @return DB_SUCCESS or error code. */
2466 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2467 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2468 row_import_cfg_read_index_fields(
2469 /*=============================*/
2470 FILE* file, /*!< in: file to write to */
2471 THD* thd, /*!< in/out: session */
2472 row_index_t* index) /*!< Index being read in */
2473 {
2474 byte row[sizeof(ib_uint32_t) * 3];
2475 ulint n_fields = index->m_n_fields;
2476
2477 index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2478
2479 /* Trigger OOM */
2480 DBUG_EXECUTE_IF(
2481 "ib_import_OOM_4",
2482 UT_DELETE_ARRAY(index->m_fields);
2483 index->m_fields = NULL;
2484 );
2485
2486 if (index->m_fields == NULL) {
2487 return(DB_OUT_OF_MEMORY);
2488 }
2489
2490 dict_field_t* field = index->m_fields;
2491
2492 for (ulint i = 0; i < n_fields; ++i, ++field) {
2493 byte* ptr = row;
2494
2495 /* Trigger EOF */
2496 DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2497 (void) fseek(file, 0L, SEEK_END););
2498
2499 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2500
2501 ib_senderrf(
2502 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2503 (ulong) errno, strerror(errno),
2504 "while reading index fields.");
2505
2506 return(DB_IO_ERROR);
2507 }
2508
2509 new (field) dict_field_t();
2510
2511 field->prefix_len = mach_read_from_4(ptr);
2512 ptr += sizeof(ib_uint32_t);
2513
2514 field->fixed_len = mach_read_from_4(ptr);
2515 ptr += sizeof(ib_uint32_t);
2516
2517 /* Include the NUL byte in the length. */
2518 ulint len = mach_read_from_4(ptr);
2519
2520 byte* name = UT_NEW_ARRAY_NOKEY(byte, len);
2521
2522 /* Trigger OOM */
2523 DBUG_EXECUTE_IF(
2524 "ib_import_OOM_5",
2525 UT_DELETE_ARRAY(name);
2526 name = NULL;
2527 );
2528
2529 if (name == NULL) {
2530 return(DB_OUT_OF_MEMORY);
2531 }
2532
2533 field->name = reinterpret_cast<const char*>(name);
2534
2535 dberr_t err = row_import_cfg_read_string(file, name, len);
2536
2537 if (err != DB_SUCCESS) {
2538
2539 ib_senderrf(
2540 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2541 (ulong) errno, strerror(errno),
2542 "while parsing table name.");
2543
2544 return(err);
2545 }
2546 }
2547
2548 return(DB_SUCCESS);
2549 }
2550
2551 /*****************************************************************//**
2552 Read the index names and root page numbers of the indexes and set the values.
2553 Row format [root_page_no, len of str, str ... ]
2554 @return DB_SUCCESS or error code. */
2555 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2556 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2557 row_import_read_index_data(
2558 /*=======================*/
2559 FILE* file, /*!< in: File to read from */
2560 THD* thd, /*!< in: session */
2561 row_import* cfg) /*!< in/out: meta-data read */
2562 {
2563 byte* ptr;
2564 row_index_t* cfg_index;
2565 byte row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2566
2567 /* FIXME: What is the max value? */
2568 ut_a(cfg->m_n_indexes > 0);
2569 ut_a(cfg->m_n_indexes < 1024);
2570
2571 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2572
2573 /* Trigger OOM */
2574 DBUG_EXECUTE_IF(
2575 "ib_import_OOM_6",
2576 UT_DELETE_ARRAY(cfg->m_indexes);
2577 cfg->m_indexes = NULL;
2578 );
2579
2580 if (cfg->m_indexes == NULL) {
2581 return(DB_OUT_OF_MEMORY);
2582 }
2583
2584 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2585
2586 cfg_index = cfg->m_indexes;
2587
2588 for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2589 /* Trigger EOF */
2590 DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2591 (void) fseek(file, 0L, SEEK_END););
2592
2593 /* Read the index data. */
2594 size_t n_bytes = fread(row, 1, sizeof(row), file);
2595
2596 /* Trigger EOF */
2597 DBUG_EXECUTE_IF("ib_import_io_read_error",
2598 (void) fseek(file, 0L, SEEK_END););
2599
2600 if (n_bytes != sizeof(row)) {
2601 char msg[BUFSIZ];
2602
2603 snprintf(msg, sizeof(msg),
2604 "while reading index meta-data, expected "
2605 "to read " ULINTPF
2606 " bytes but read only " ULINTPF " bytes",
2607 sizeof(row), n_bytes);
2608
2609 ib_senderrf(
2610 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2611 (ulong) errno, strerror(errno), msg);
2612
2613 ib::error() << "IO Error: " << msg;
2614
2615 return(DB_IO_ERROR);
2616 }
2617
2618 ptr = row;
2619
2620 cfg_index->m_id = mach_read_from_8(ptr);
2621 ptr += sizeof(index_id_t);
2622
2623 cfg_index->m_space = mach_read_from_4(ptr);
2624 ptr += sizeof(ib_uint32_t);
2625
2626 cfg_index->m_page_no = mach_read_from_4(ptr);
2627 ptr += sizeof(ib_uint32_t);
2628
2629 cfg_index->m_type = mach_read_from_4(ptr);
2630 ptr += sizeof(ib_uint32_t);
2631
2632 cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2633 if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2634 ut_ad(0);
2635 /* Overflow. Pretend that the clustered index
2636 has a variable-length PRIMARY KEY. */
2637 cfg_index->m_trx_id_offset = 0;
2638 }
2639 ptr += sizeof(ib_uint32_t);
2640
2641 cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2642 ptr += sizeof(ib_uint32_t);
2643
2644 cfg_index->m_n_uniq = mach_read_from_4(ptr);
2645 ptr += sizeof(ib_uint32_t);
2646
2647 cfg_index->m_n_nullable = mach_read_from_4(ptr);
2648 ptr += sizeof(ib_uint32_t);
2649
2650 cfg_index->m_n_fields = mach_read_from_4(ptr);
2651 ptr += sizeof(ib_uint32_t);
2652
2653 /* The NUL byte is included in the name length. */
2654 ulint len = mach_read_from_4(ptr);
2655
2656 if (len > OS_FILE_MAX_PATH) {
2657 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2658 ER_INNODB_INDEX_CORRUPT,
2659 "Index name length (" ULINTPF ") is too long, "
2660 "the meta-data is corrupt", len);
2661
2662 return(DB_CORRUPTION);
2663 }
2664
2665 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2666
2667 /* Trigger OOM */
2668 DBUG_EXECUTE_IF(
2669 "ib_import_OOM_7",
2670 UT_DELETE_ARRAY(cfg_index->m_name);
2671 cfg_index->m_name = NULL;
2672 );
2673
2674 if (cfg_index->m_name == NULL) {
2675 return(DB_OUT_OF_MEMORY);
2676 }
2677
2678 dberr_t err;
2679
2680 err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2681
2682 if (err != DB_SUCCESS) {
2683
2684 ib_senderrf(
2685 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2686 (ulong) errno, strerror(errno),
2687 "while parsing index name.");
2688
2689 return(err);
2690 }
2691
2692 err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2693
2694 if (err != DB_SUCCESS) {
2695 return(err);
2696 }
2697
2698 }
2699
2700 return(DB_SUCCESS);
2701 }
2702
2703 /*****************************************************************//**
2704 Set the index root page number for v1 format.
2705 @return DB_SUCCESS or error code. */
2706 static
2707 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2708 row_import_read_indexes(
2709 /*====================*/
2710 FILE* file, /*!< in: File to read from */
2711 THD* thd, /*!< in: session */
2712 row_import* cfg) /*!< in/out: meta-data read */
2713 {
2714 byte row[sizeof(ib_uint32_t)];
2715
2716 /* Trigger EOF */
2717 DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2718 (void) fseek(file, 0L, SEEK_END););
2719
2720 /* Read the number of indexes. */
2721 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2722 ib_senderrf(
2723 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2724 (ulong) errno, strerror(errno),
2725 "while reading number of indexes.");
2726
2727 return(DB_IO_ERROR);
2728 }
2729
2730 cfg->m_n_indexes = mach_read_from_4(row);
2731
2732 if (cfg->m_n_indexes == 0) {
2733 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2734 "Number of indexes in meta-data file is 0");
2735
2736 return(DB_CORRUPTION);
2737
2738 } else if (cfg->m_n_indexes > 1024) {
2739 // FIXME: What is the upper limit? */
2740 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2741 "Number of indexes in meta-data file is too high: "
2742 ULINTPF, cfg->m_n_indexes);
2743 cfg->m_n_indexes = 0;
2744
2745 return(DB_CORRUPTION);
2746 }
2747
2748 return(row_import_read_index_data(file, thd, cfg));
2749 }
2750
2751 /*********************************************************************//**
2752 Read the meta data (table columns) config file. Deserialise the contents of
2753 dict_col_t structure, along with the column name. */
2754 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2755 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2756 row_import_read_columns(
2757 /*====================*/
2758 FILE* file, /*!< in: file to write to */
2759 THD* thd, /*!< in/out: session */
2760 row_import* cfg) /*!< in/out: meta-data read */
2761 {
2762 dict_col_t* col;
2763 byte row[sizeof(ib_uint32_t) * 8];
2764
2765 /* FIXME: What should the upper limit be? */
2766 ut_a(cfg->m_n_cols > 0);
2767 ut_a(cfg->m_n_cols < 1024);
2768
2769 cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2770
2771 /* Trigger OOM */
2772 DBUG_EXECUTE_IF(
2773 "ib_import_OOM_8",
2774 UT_DELETE_ARRAY(cfg->m_cols);
2775 cfg->m_cols = NULL;
2776 );
2777
2778 if (cfg->m_cols == NULL) {
2779 return(DB_OUT_OF_MEMORY);
2780 }
2781
2782 cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2783
2784 /* Trigger OOM */
2785 DBUG_EXECUTE_IF(
2786 "ib_import_OOM_9",
2787 UT_DELETE_ARRAY(cfg->m_col_names);
2788 cfg->m_col_names = NULL;
2789 );
2790
2791 if (cfg->m_col_names == NULL) {
2792 return(DB_OUT_OF_MEMORY);
2793 }
2794
2795 memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2796 memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2797
2798 col = cfg->m_cols;
2799
2800 for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2801 byte* ptr = row;
2802
2803 /* Trigger EOF */
2804 DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2805 (void) fseek(file, 0L, SEEK_END););
2806
2807 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2808 ib_senderrf(
2809 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2810 (ulong) errno, strerror(errno),
2811 "while reading table column meta-data.");
2812
2813 return(DB_IO_ERROR);
2814 }
2815
2816 col->prtype = mach_read_from_4(ptr);
2817 ptr += sizeof(ib_uint32_t);
2818
2819 col->mtype = mach_read_from_4(ptr);
2820 ptr += sizeof(ib_uint32_t);
2821
2822 col->len = mach_read_from_4(ptr);
2823 ptr += sizeof(ib_uint32_t);
2824
2825 ulint mbminmaxlen = mach_read_from_4(ptr);
2826 col->mbmaxlen = mbminmaxlen / 5;
2827 col->mbminlen = mbminmaxlen % 5;
2828 ptr += sizeof(ib_uint32_t);
2829
2830 col->ind = mach_read_from_4(ptr);
2831 ptr += sizeof(ib_uint32_t);
2832
2833 col->ord_part = mach_read_from_4(ptr);
2834 ptr += sizeof(ib_uint32_t);
2835
2836 col->max_prefix = mach_read_from_4(ptr);
2837 ptr += sizeof(ib_uint32_t);
2838
2839 /* Read in the column name as [len, byte array]. The len
2840 includes the NUL byte. */
2841
2842 ulint len = mach_read_from_4(ptr);
2843
2844 /* FIXME: What is the maximum column name length? */
2845 if (len == 0 || len > 128) {
2846 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2847 ER_IO_READ_ERROR,
2848 "Column name length " ULINTPF ", is invalid",
2849 len);
2850
2851 return(DB_CORRUPTION);
2852 }
2853
2854 cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2855
2856 /* Trigger OOM */
2857 DBUG_EXECUTE_IF(
2858 "ib_import_OOM_10",
2859 UT_DELETE_ARRAY(cfg->m_col_names[i]);
2860 cfg->m_col_names[i] = NULL;
2861 );
2862
2863 if (cfg->m_col_names[i] == NULL) {
2864 return(DB_OUT_OF_MEMORY);
2865 }
2866
2867 dberr_t err;
2868
2869 err = row_import_cfg_read_string(
2870 file, cfg->m_col_names[i], len);
2871
2872 if (err != DB_SUCCESS) {
2873
2874 ib_senderrf(
2875 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2876 (ulong) errno, strerror(errno),
2877 "while parsing table column name.");
2878
2879 return(err);
2880 }
2881 }
2882
2883 return(DB_SUCCESS);
2884 }
2885
2886 /*****************************************************************//**
2887 Read the contents of the <tablespace>.cfg file.
2888 @return DB_SUCCESS or error code. */
2889 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2890 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2891 row_import_read_v1(
2892 /*===============*/
2893 FILE* file, /*!< in: File to read from */
2894 THD* thd, /*!< in: session */
2895 row_import* cfg) /*!< out: meta data */
2896 {
2897 byte value[sizeof(ib_uint32_t)];
2898
2899 /* Trigger EOF */
2900 DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2901 (void) fseek(file, 0L, SEEK_END););
2902
2903 /* Read the hostname where the tablespace was exported. */
2904 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2905 ib_senderrf(
2906 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2907 (ulong) errno, strerror(errno),
2908 "while reading meta-data export hostname length.");
2909
2910 return(DB_IO_ERROR);
2911 }
2912
2913 ulint len = mach_read_from_4(value);
2914
2915 /* NUL byte is part of name length. */
2916 cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2917
2918 /* Trigger OOM */
2919 DBUG_EXECUTE_IF(
2920 "ib_import_OOM_1",
2921 UT_DELETE_ARRAY(cfg->m_hostname);
2922 cfg->m_hostname = NULL;
2923 );
2924
2925 if (cfg->m_hostname == NULL) {
2926 return(DB_OUT_OF_MEMORY);
2927 }
2928
2929 dberr_t err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2930
2931 if (err != DB_SUCCESS) {
2932
2933 ib_senderrf(
2934 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2935 (ulong) errno, strerror(errno),
2936 "while parsing export hostname.");
2937
2938 return(err);
2939 }
2940
2941 /* Trigger EOF */
2942 DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2943 (void) fseek(file, 0L, SEEK_END););
2944
2945 /* Read the table name of tablespace that was exported. */
2946 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2947 ib_senderrf(
2948 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2949 (ulong) errno, strerror(errno),
2950 "while reading meta-data table name length.");
2951
2952 return(DB_IO_ERROR);
2953 }
2954
2955 len = mach_read_from_4(value);
2956
2957 /* NUL byte is part of name length. */
2958 cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2959
2960 /* Trigger OOM */
2961 DBUG_EXECUTE_IF(
2962 "ib_import_OOM_2",
2963 UT_DELETE_ARRAY(cfg->m_table_name);
2964 cfg->m_table_name = NULL;
2965 );
2966
2967 if (cfg->m_table_name == NULL) {
2968 return(DB_OUT_OF_MEMORY);
2969 }
2970
2971 err = row_import_cfg_read_string(file, cfg->m_table_name, len);
2972
2973 if (err != DB_SUCCESS) {
2974 ib_senderrf(
2975 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2976 (ulong) errno, strerror(errno),
2977 "while parsing table name.");
2978
2979 return(err);
2980 }
2981
2982 ib::info() << "Importing tablespace for table '" << cfg->m_table_name
2983 << "' that was exported from host '" << cfg->m_hostname << "'";
2984
2985 byte row[sizeof(ib_uint32_t) * 3];
2986
2987 /* Trigger EOF */
2988 DBUG_EXECUTE_IF("ib_import_io_read_error_7",
2989 (void) fseek(file, 0L, SEEK_END););
2990
2991 /* Read the autoinc value. */
2992 if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
2993 ib_senderrf(
2994 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2995 (ulong) errno, strerror(errno),
2996 "while reading autoinc value.");
2997
2998 return(DB_IO_ERROR);
2999 }
3000
3001 cfg->m_autoinc = mach_read_from_8(row);
3002
3003 /* Trigger EOF */
3004 DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3005 (void) fseek(file, 0L, SEEK_END););
3006
3007 /* Read the tablespace page size. */
3008 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3009 ib_senderrf(
3010 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3011 (ulong) errno, strerror(errno),
3012 "while reading meta-data header.");
3013
3014 return(DB_IO_ERROR);
3015 }
3016
3017 byte* ptr = row;
3018
3019 const ulint logical_page_size = mach_read_from_4(ptr);
3020 ptr += sizeof(ib_uint32_t);
3021
3022 if (logical_page_size != srv_page_size) {
3023
3024 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3025 "Tablespace to be imported has a different"
3026 " page size than this server. Server page size"
3027 " is %lu, whereas tablespace page size"
3028 " is " ULINTPF,
3029 srv_page_size,
3030 logical_page_size);
3031
3032 return(DB_ERROR);
3033 }
3034
3035 cfg->m_flags = mach_read_from_4(ptr);
3036 ptr += sizeof(ib_uint32_t);
3037
3038 cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags);
3039 cfg->m_n_cols = mach_read_from_4(ptr);
3040
3041 if (!dict_tf_is_valid(cfg->m_flags)) {
3042 ib_errf(thd, IB_LOG_LEVEL_ERROR,
3043 ER_TABLE_SCHEMA_MISMATCH,
3044 "Invalid table flags: " ULINTPF, cfg->m_flags);
3045
3046 return(DB_CORRUPTION);
3047 }
3048
3049 err = row_import_read_columns(file, thd, cfg);
3050
3051 if (err == DB_SUCCESS) {
3052 err = row_import_read_indexes(file, thd, cfg);
3053 }
3054
3055 return(err);
3056 }
3057
3058 /**
3059 Read the contents of the <tablespace>.cfg file.
3060 @return DB_SUCCESS or error code. */
3061 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3062 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3063 row_import_read_meta_data(
3064 /*======================*/
3065 FILE* file, /*!< in: File to read from */
3066 THD* thd, /*!< in: session */
3067 row_import& cfg) /*!< out: contents of the .cfg file */
3068 {
3069 byte row[sizeof(ib_uint32_t)];
3070
3071 /* Trigger EOF */
3072 DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3073 (void) fseek(file, 0L, SEEK_END););
3074
3075 if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3076 ib_senderrf(
3077 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3078 (ulong) errno, strerror(errno),
3079 "while reading meta-data version.");
3080
3081 return(DB_IO_ERROR);
3082 }
3083
3084 cfg.m_version = mach_read_from_4(row);
3085
3086 /* Check the version number. */
3087 switch (cfg.m_version) {
3088 case IB_EXPORT_CFG_VERSION_V1:
3089
3090 return(row_import_read_v1(file, thd, &cfg));
3091 default:
3092 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3093 "Unsupported meta-data version number (" ULINTPF "), "
3094 "file ignored", cfg.m_version);
3095 }
3096
3097 return(DB_ERROR);
3098 }
3099
/* Fields of a BLOB part header. NOTE(review): the values look like byte
offsets from the start of the header, followed by its total size; confirm
against the BLOB code that defines the on-page format. */
#define BTR_BLOB_HDR_PART_LEN		0	/*!< BLOB part len on this page */
#define BTR_BLOB_HDR_NEXT_PAGE_NO	4	/*!< next BLOB part page no,
						FIL_NULL if none */
#define BTR_BLOB_HDR_SIZE		8	/*!< Size of a BLOB part header, in bytes */
3104
3105 /* decrypt and decompress page if needed */
decrypt_decompress(fil_space_crypt_t * space_crypt,size_t space_flags,span<byte> page,size_t space_id,byte * page_compress_buf)3106 static dberr_t decrypt_decompress(fil_space_crypt_t *space_crypt,
3107 size_t space_flags, span<byte> page,
3108 size_t space_id, byte *page_compress_buf)
3109 {
3110 auto *data= page.data();
3111
3112 if (space_crypt && space_crypt->should_encrypt())
3113 {
3114 if (!buf_page_verify_crypt_checksum(data, space_flags))
3115 return DB_CORRUPTION;
3116
3117 if (dberr_t err= fil_space_decrypt(space_id, space_crypt, data,
3118 page.size(), space_flags, data))
3119 return err;
3120 }
3121 else if (fil_page_is_compressed_encrypted(data))
3122 return DB_CORRUPTION;
3123
3124 const bool is_full_crc32_compressed=
3125 fil_space_t::is_full_crc32_compressed(space_flags);
3126
3127 const bool page_actually_compressed=
3128 (is_full_crc32_compressed &&
3129 buf_page_is_compressed(data, space_flags)) ||
3130 fil_page_is_compressed_encrypted(data) || fil_page_is_compressed(data);
3131
3132 if (page_actually_compressed)
3133 {
3134 if (!is_full_crc32_compressed && !fil_space_t::is_compressed(space_flags))
3135 return DB_CORRUPTION;
3136
3137 auto compress_length=
3138 fil_page_decompress(page_compress_buf, data, space_flags);
3139 ut_ad(compress_length != srv_page_size);
3140
3141 if (compress_length == 0)
3142 return DB_CORRUPTION;
3143 }
3144
3145 return DB_SUCCESS;
3146 }
3147
get_buf_size()3148 static size_t get_buf_size()
3149 {
3150 return srv_page_size
3151 #ifdef HAVE_LZO
3152 + LZO1X_1_15_MEM_COMPRESS
3153 #elif defined HAVE_SNAPPY
3154 + snappy_max_compressed_length(srv_page_size)
3155 #endif
3156 ;
3157 }
3158
/* Find and parse the instant metadata, performing various checks,
and apply it to dict_table_t.
@return DB_SUCCESS or some error */
handle_instant_metadata(dict_table_t * table,const row_import & cfg)3162 static dberr_t handle_instant_metadata(dict_table_t *table,
3163 const row_import &cfg)
3164 {
3165 dict_get_and_save_data_dir_path(table, false);
3166
3167 char *filepath;
3168 if (DICT_TF_HAS_DATA_DIR(table->flags))
3169 {
3170 ut_a(table->data_dir_path);
3171
3172 filepath=
3173 fil_make_filepath(table->data_dir_path, table->name.m_name, IBD, true);
3174 }
3175 else
3176 filepath= fil_make_filepath(nullptr, table->name.m_name, IBD, false);
3177
3178 if (!filepath)
3179 return DB_OUT_OF_MEMORY;
3180
3181 SCOPE_EXIT([filepath]() { ut_free(filepath); });
3182
3183 bool success;
3184 auto file= os_file_create_simple_no_error_handling(
3185 innodb_data_file_key, filepath, OS_FILE_OPEN, OS_FILE_READ_WRITE, false,
3186 &success);
3187 if (!success)
3188 return DB_IO_ERROR;
3189
3190 if (os_file_get_size(file) < srv_page_size * 4)
3191 return DB_CORRUPTION;
3192
3193 SCOPE_EXIT([&file]() { os_file_close(file); });
3194
3195 std::unique_ptr<byte[], decltype(&aligned_free)> first_page(
3196 static_cast<byte *>(aligned_malloc(srv_page_size, srv_page_size)),
3197 &aligned_free);
3198
3199 if (dberr_t err= os_file_read_no_error_handling(IORequest(IORequest::READ),
3200 file, first_page.get(), 0,
3201 srv_page_size, nullptr))
3202 return err;
3203
3204 auto space_flags= fsp_header_get_flags(first_page.get());
3205
3206 if (!fil_space_t::is_valid_flags(space_flags, true))
3207 {
3208 auto cflags= fsp_flags_convert_from_101(space_flags);
3209 if (cflags == ULINT_UNDEFINED)
3210 {
3211 ib::error() << "Invalid FSP_SPACE_FLAGS=" << ib::hex(space_flags);
3212 return DB_CORRUPTION;
3213 }
3214 space_flags= cflags;
3215 }
3216
3217 if (!cfg.m_missing)
3218 {
3219 if (dberr_t err= cfg.match_flags(current_thd))
3220 return err;
3221 }
3222
3223 const unsigned zip_size= fil_space_t::zip_size(space_flags);
3224 const unsigned physical_size= zip_size ? zip_size : unsigned(srv_page_size);
3225 ut_ad(physical_size <= UNIV_PAGE_SIZE_MAX);
3226 const uint32_t space_id= page_get_space_id(first_page.get());
3227
3228 auto *space_crypt= fil_space_read_crypt_data(zip_size, first_page.get());
3229 SCOPE_EXIT([&space_crypt]() {
3230 if (space_crypt)
3231 fil_space_destroy_crypt_data(&space_crypt);
3232 });
3233
3234 std::unique_ptr<byte[], decltype(&aligned_free)> page(
3235 static_cast<byte *>(
3236 aligned_malloc(UNIV_PAGE_SIZE_MAX, UNIV_PAGE_SIZE_MAX)),
3237 &aligned_free);
3238
3239 if (dberr_t err= os_file_read_no_error_handling(
3240 IORequest(IORequest::READ), file, page.get(), 3 * physical_size,
3241 physical_size, nullptr))
3242 return err;
3243
3244 std::unique_ptr<byte[]> page_compress_buf(new byte[get_buf_size()]);
3245
3246 if (dberr_t err=
3247 decrypt_decompress(space_crypt, space_flags,
3248 {page.get(), static_cast<size_t>(physical_size)},
3249 space_id, page_compress_buf.get()))
3250 return err;
3251
3252 if (table->supports_instant())
3253 {
3254 dict_index_t *index= dict_table_get_first_index(table);
3255
3256 auto tmp1= table->space_id;
3257 table->space_id= page_get_space_id(page.get());
3258 SCOPE_EXIT([tmp1, table]() { table->space_id= tmp1; });
3259
3260 auto tmp2= index->page;
3261 index->page= page_get_page_no(page.get());
3262 SCOPE_EXIT([tmp2, index]() { index->page= tmp2; });
3263
3264 if (!page_is_comp(page.get()) != !dict_table_is_comp(table))
3265 {
3266 ib_errf(current_thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3267 "ROW_FORMAT mismatch");
3268 return DB_CORRUPTION;
3269 }
3270
3271 if (btr_cur_instant_root_init(index, page.get()))
3272 return DB_ERROR;
3273
3274 ut_ad(index->n_core_null_bytes != dict_index_t::NO_CORE_NULL_BYTES);
3275
3276 if (fil_page_get_type(page.get()) == FIL_PAGE_INDEX)
3277 {
3278 ut_ad(!index->is_instant());
3279 return DB_SUCCESS;
3280 }
3281
3282 mem_heap_t *heap= NULL;
3283 SCOPE_EXIT([&heap]() {
3284 if (heap)
3285 mem_heap_free(heap);
3286 });
3287
3288 while (btr_page_get_level(page.get()) != 0)
3289 {
3290 const rec_t *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3291
3292 /* Relax the assertion in rec_init_offsets(). */
3293 ut_ad(!index->in_instant_init);
3294 ut_d(index->in_instant_init= true);
3295 rec_offs *offsets=
3296 rec_get_offsets(rec, index, nullptr, 0, ULINT_UNDEFINED, &heap);
3297 ut_d(index->in_instant_init= false);
3298
3299 uint64_t child_page_no= btr_node_ptr_get_child_page_no(rec, offsets);
3300
3301 if (dberr_t err= os_file_read_no_error_handling(
3302 IORequest(IORequest::READ), file, page.get(),
3303 child_page_no * physical_size, physical_size, nullptr))
3304 return err;
3305
3306 if (dberr_t err= decrypt_decompress(
3307 space_crypt, space_flags,
3308 {page.get(), static_cast<size_t>(physical_size)}, space_id,
3309 page_compress_buf.get()))
3310 return err;
3311 }
3312
3313 const auto *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3314 const auto comp= dict_table_is_comp(index->table);
3315 const auto info_bits= rec_get_info_bits(rec, comp);
3316
3317 if (page_rec_is_supremum(rec) || !(info_bits & REC_INFO_MIN_REC_FLAG))
3318 {
3319 ib::error() << "Table " << index->table->name
3320 << " is missing instant ALTER metadata";
3321 index->table->corrupted= true;
3322 return DB_CORRUPTION;
3323 }
3324
3325 if ((info_bits & ~REC_INFO_DELETED_FLAG) != REC_INFO_MIN_REC_FLAG ||
3326 (comp && rec_get_status(rec) != REC_STATUS_INSTANT))
3327 {
3328 incompatible:
3329 ib::error() << "Table " << index->table->name
3330 << " contains unrecognizable instant ALTER metadata";
3331 index->table->corrupted= true;
3332 return DB_CORRUPTION;
3333 }
3334
3335 if (info_bits & REC_INFO_DELETED_FLAG)
3336 {
3337 ulint trx_id_offset= index->trx_id_offset;
3338 ut_ad(index->n_uniq);
3339
3340 if (trx_id_offset)
3341 {
3342 }
3343 else if (index->table->not_redundant())
3344 {
3345
3346 for (uint i= index->n_uniq; i--;)
3347 trx_id_offset+= index->fields[i].fixed_len;
3348 }
3349 else if (rec_get_1byte_offs_flag(rec))
3350 {
3351 trx_id_offset= rec_1_get_field_end_info(rec, index->n_uniq - 1);
3352 ut_ad(!(trx_id_offset & REC_1BYTE_SQL_NULL_MASK));
3353 trx_id_offset&= ~REC_1BYTE_SQL_NULL_MASK;
3354 }
3355 else
3356 {
3357 trx_id_offset= rec_2_get_field_end_info(rec, index->n_uniq - 1);
3358 ut_ad(!(trx_id_offset & REC_2BYTE_SQL_NULL_MASK));
3359 trx_id_offset&= ~REC_2BYTE_SQL_NULL_MASK;
3360 }
3361
3362 const byte *ptr=
3363 rec + trx_id_offset + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
3364
3365 if (mach_read_from_4(ptr + BTR_EXTERN_LEN))
3366 goto incompatible;
3367
3368 uint len= mach_read_from_4(ptr + BTR_EXTERN_LEN + 4);
3369 if (!len || mach_read_from_4(ptr + BTR_EXTERN_OFFSET) != FIL_PAGE_DATA)
3370 goto incompatible;
3371
3372 std::unique_ptr<byte[], decltype(&aligned_free)> second_page(
3373 static_cast<byte *>(aligned_malloc(physical_size, physical_size)),
3374 &aligned_free);
3375
3376 if (dberr_t err= os_file_read_no_error_handling(
3377 IORequest(IORequest::READ), file, second_page.get(),
3378 mach_read_from_4(ptr + BTR_EXTERN_PAGE_NO) * physical_size,
3379 srv_page_size, nullptr))
3380 return err;
3381
3382 if (dberr_t err= decrypt_decompress(
3383 space_crypt, space_flags,
3384 {second_page.get(), static_cast<size_t>(physical_size)},
3385 space_id, page_compress_buf.get()))
3386 return err;
3387
3388 if (fil_page_get_type(second_page.get()) != FIL_PAGE_TYPE_BLOB ||
3389 mach_read_from_4(
3390 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_NEXT_PAGE_NO]) !=
3391 FIL_NULL ||
3392 mach_read_from_4(
3393 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_PART_LEN]) != len)
3394 goto incompatible;
3395
3396 /* The unused part of the BLOB page should be zero-filled. */
3397 for (const byte *
3398 b= second_page.get() + (FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE) +
3399 len,
3400 *const end= second_page.get() + srv_page_size - BTR_EXTERN_LEN;
3401 b < end;)
3402 {
3403 if (*b++)
3404 goto incompatible;
3405 }
3406
3407 if (index->table->deserialise_columns(
3408 &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE], len))
3409 goto incompatible;
3410 }
3411
3412 rec_offs *offsets= rec_get_offsets(
3413 rec, index, nullptr, index->n_core_fields, ULINT_UNDEFINED, &heap);
3414 if (rec_offs_any_default(offsets))
3415 {
3416 inconsistent:
3417 goto incompatible;
3418 }
3419
3420 /* In fact, because we only ever append fields to the metadata
3421 record, it is also OK to perform READ UNCOMMITTED and
3422 then ignore any extra fields, provided that
3423 trx_sys.is_registered(DB_TRX_ID). */
3424 if (rec_offs_n_fields(offsets) >
3425 ulint(index->n_fields) + !!index->table->instant &&
3426 !trx_sys.is_registered(current_trx(),
3427 row_get_rec_trx_id(rec, index, offsets)))
3428 goto inconsistent;
3429
3430 for (unsigned i= index->n_core_fields; i < index->n_fields; i++)
3431 {
3432 dict_col_t *col= index->fields[i].col;
3433 const unsigned o= i + !!index->table->instant;
3434 ulint len;
3435 const byte *data= rec_get_nth_field(rec, offsets, o, &len);
3436 ut_ad(!col->is_added());
3437 ut_ad(!col->def_val.data);
3438 col->def_val.len= len;
3439 switch (len) {
3440 case UNIV_SQL_NULL:
3441 continue;
3442 case 0:
3443 col->def_val.data= field_ref_zero;
3444 continue;
3445 }
3446 ut_ad(len != UNIV_SQL_DEFAULT);
3447 if (!rec_offs_nth_extern(offsets, o))
3448 col->def_val.data= mem_heap_dup(index->table->heap, data, len);
3449 else if (len < BTR_EXTERN_FIELD_REF_SIZE ||
3450 !memcmp(data + len - BTR_EXTERN_FIELD_REF_SIZE, field_ref_zero,
3451 BTR_EXTERN_FIELD_REF_SIZE))
3452 {
3453 col->def_val.len= UNIV_SQL_DEFAULT;
3454 goto inconsistent;
3455 }
3456 else
3457 {
3458 col->def_val.data= btr_copy_externally_stored_field(
3459 &col->def_val.len, data, srv_page_size, len, index->table->heap);
3460 }
3461 }
3462 }
3463
3464 return DB_SUCCESS;
3465 }
3466
3467 /**
3468 Read the contents of the <tablename>.cfg file.
3469 @return DB_SUCCESS or error code. */
3470 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3471 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3472 row_import_read_cfg(
3473 /*================*/
3474 dict_table_t* table, /*!< in: table */
3475 THD* thd, /*!< in: session */
3476 row_import& cfg) /*!< out: contents of the .cfg file */
3477 {
3478 dberr_t err;
3479 char name[OS_FILE_MAX_PATH];
3480
3481 cfg.m_table = table;
3482
3483 srv_get_meta_data_filename(table, name, sizeof(name));
3484
3485 FILE* file = fopen(name, "rb");
3486
3487 if (file == NULL) {
3488 char msg[BUFSIZ];
3489
3490 snprintf(msg, sizeof(msg),
3491 "Error opening '%s', will attempt to import"
3492 " without schema verification", name);
3493
3494 ib_senderrf(
3495 thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3496 (ulong) errno, strerror(errno), msg);
3497
3498 cfg.m_missing = true;
3499
3500 err = DB_FAIL;
3501 } else {
3502
3503 cfg.m_missing = false;
3504
3505 err = row_import_read_meta_data(file, thd, cfg);
3506 fclose(file);
3507 }
3508
3509 return(err);
3510 }
3511
3512 /** Update the root page numbers and tablespace ID of a table.
3513 @param[in,out] trx dictionary transaction
3514 @param[in,out] table persistent table
3515 @param[in] reset whether to reset the fields to FIL_NULL
3516 @return DB_SUCCESS or error code */
3517 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3518 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3519 {
3520 const dict_index_t* index;
3521 que_t* graph = 0;
3522 dberr_t err = DB_SUCCESS;
3523
3524 ut_ad(reset || table->space->id == table->space_id);
3525
3526 static const char sql[] = {
3527 "PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3528 "BEGIN\n"
3529 "UPDATE SYS_INDEXES\n"
3530 "SET SPACE = :space,\n"
3531 " PAGE_NO = :page,\n"
3532 " TYPE = :type\n"
3533 "WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3534 "END;\n"};
3535
3536 table->def_trx_id = trx->id;
3537
3538 for (index = dict_table_get_first_index(table);
3539 index != 0;
3540 index = dict_table_get_next_index(index)) {
3541
3542 pars_info_t* info;
3543 ib_uint32_t page;
3544 ib_uint32_t space;
3545 ib_uint32_t type;
3546 index_id_t index_id;
3547 table_id_t table_id;
3548
3549 info = (graph != 0) ? graph->info : pars_info_create();
3550
3551 mach_write_to_4(
3552 reinterpret_cast<byte*>(&type),
3553 index->type);
3554
3555 mach_write_to_4(
3556 reinterpret_cast<byte*>(&page),
3557 reset ? FIL_NULL : index->page);
3558
3559 mach_write_to_4(
3560 reinterpret_cast<byte*>(&space),
3561 reset ? FIL_NULL : index->table->space_id);
3562
3563 mach_write_to_8(
3564 reinterpret_cast<byte*>(&index_id),
3565 index->id);
3566
3567 mach_write_to_8(
3568 reinterpret_cast<byte*>(&table_id),
3569 table->id);
3570
3571 /* If we set the corrupt bit during the IMPORT phase then
3572 we need to update the system tables. */
3573 pars_info_bind_int4_literal(info, "type", &type);
3574 pars_info_bind_int4_literal(info, "space", &space);
3575 pars_info_bind_int4_literal(info, "page", &page);
3576 pars_info_bind_ull_literal(info, "index_id", &index_id);
3577 pars_info_bind_ull_literal(info, "table_id", &table_id);
3578
3579 if (graph == 0) {
3580 graph = pars_sql(info, sql);
3581 ut_a(graph);
3582 graph->trx = trx;
3583 }
3584
3585 que_thr_t* thr;
3586
3587 graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3588
3589 ut_a(thr = que_fork_start_command(graph));
3590
3591 que_run_threads(thr);
3592
3593 DBUG_EXECUTE_IF("ib_import_internal_error",
3594 trx->error_state = DB_ERROR;);
3595
3596 err = trx->error_state;
3597
3598 if (err != DB_SUCCESS) {
3599 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3600 ER_INTERNAL_ERROR,
3601 "While updating the <space, root page"
3602 " number> of index %s - %s",
3603 index->name(), ut_strerr(err));
3604
3605 break;
3606 }
3607 }
3608
3609 que_graph_free(graph);
3610
3611 return(err);
3612 }
3613
/** Callback arg for row_import_set_discarded.
Shared between row_import_update_discarded_flag(), which initialises it
and binds flags2 as the :flags2 literal, and the fetch callback, which
fills in flags2 and counts the fetched rows. */
struct discard_t {
	ib_uint32_t	flags2;			/*!< New MIX_LEN value, derived
						from the value read from the
						column and stored with
						mach_write_to_4() */
	bool		state;			/*!< New state of the flag */
	ulint		n_recs;			/*!< Number of recs processed */
};
3620
3621 /******************************************************************//**
3622 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3623 SYS_TABLES. The flags is stored in MIX_LEN column.
3624 @return FALSE if all OK */
3625 static
3626 ibool
row_import_set_discarded(void * row,void * user_arg)3627 row_import_set_discarded(
3628 /*=====================*/
3629 void* row, /*!< in: sel_node_t* */
3630 void* user_arg) /*!< in: bool set/unset flag */
3631 {
3632 sel_node_t* node = static_cast<sel_node_t*>(row);
3633 discard_t* discard = static_cast<discard_t*>(user_arg);
3634 dfield_t* dfield = que_node_get_val(node->select_list);
3635 dtype_t* type = dfield_get_type(dfield);
3636 ulint len = dfield_get_len(dfield);
3637
3638 ut_a(dtype_get_mtype(type) == DATA_INT);
3639 ut_a(len == sizeof(ib_uint32_t));
3640
3641 ulint flags2 = mach_read_from_4(
3642 static_cast<byte*>(dfield_get_data(dfield)));
3643
3644 if (discard->state) {
3645 flags2 |= DICT_TF2_DISCARDED;
3646 } else {
3647 flags2 &= ~DICT_TF2_DISCARDED;
3648 }
3649
3650 mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3651
3652 ++discard->n_recs;
3653
3654 /* There should be at most one matching record. */
3655 ut_a(discard->n_recs == 1);
3656
3657 return(FALSE);
3658 }
3659
3660 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3661 @param[in,out] trx dictionary transaction
3662 @param[in] table_id table identifier
3663 @param[in] discarded whether to set or clear the flag
3664 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3665 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3666 bool discarded)
3667 {
3668 pars_info_t* info;
3669 discard_t discard;
3670
3671 static const char sql[] =
3672 "PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3673 "DECLARE FUNCTION my_func;\n"
3674 "DECLARE CURSOR c IS\n"
3675 " SELECT MIX_LEN"
3676 " FROM SYS_TABLES"
3677 " WHERE ID = :table_id FOR UPDATE;"
3678 "\n"
3679 "BEGIN\n"
3680 "OPEN c;\n"
3681 "WHILE 1 = 1 LOOP\n"
3682 " FETCH c INTO my_func();\n"
3683 " IF c % NOTFOUND THEN\n"
3684 " EXIT;\n"
3685 " END IF;\n"
3686 "END LOOP;\n"
3687 "UPDATE SYS_TABLES"
3688 " SET MIX_LEN = :flags2"
3689 " WHERE ID = :table_id;\n"
3690 "CLOSE c;\n"
3691 "END;\n";
3692
3693 discard.n_recs = 0;
3694 discard.state = discarded;
3695 discard.flags2 = ULINT32_UNDEFINED;
3696
3697 info = pars_info_create();
3698
3699 pars_info_add_ull_literal(info, "table_id", table_id);
3700 pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3701
3702 pars_info_bind_function(
3703 info, "my_func", row_import_set_discarded, &discard);
3704
3705 dberr_t err = que_eval_sql(info, sql, false, trx);
3706
3707 ut_a(discard.n_recs == 1);
3708 ut_a(discard.flags2 != ULINT32_UNDEFINED);
3709
3710 return(err);
3711 }
3712
3713 /** InnoDB writes page by page when there is page compressed
3714 tablespace involved. It does help to save the disk space when
3715 punch hole is enabled
3716 @param iter Tablespace iterator
3717 @param full_crc32 whether the file is in the full_crc32 format
3718 @param write_request Request to write into the file
3719 @param offset offset of the file to be written
3720 @param writeptr buffer to be written
3721 @param n_bytes number of bytes to be written
3722 @param try_punch_only Try the range punch only because the
3723 current range is full of empty pages
3724 @return DB_SUCCESS */
3725 static
fil_import_compress_fwrite(const fil_iterator_t & iter,bool full_crc32,const IORequest & write_request,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3726 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3727 bool full_crc32,
3728 const IORequest &write_request,
3729 os_offset_t offset,
3730 const byte *writeptr,
3731 ulint n_bytes,
3732 bool try_punch_only= false)
3733 {
3734 if (dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes))
3735 return err;
3736
3737 if (try_punch_only)
3738 return DB_SUCCESS;
3739
3740 for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3741 {
3742 /* Read the original data length from block and
3743 safer to read FIL_PAGE_COMPRESSED_SIZE because it
3744 is not encrypted*/
3745 ulint n_write_bytes= srv_page_size;
3746 if (j || offset)
3747 {
3748 n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3749 const unsigned ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3750 /* Ignore the empty page */
3751 if (ptype == 0 && n_write_bytes == 0)
3752 continue;
3753 if (full_crc32)
3754 n_write_bytes= buf_page_full_crc32_size(writeptr + j,
3755 nullptr, nullptr);
3756 else
3757 {
3758 n_write_bytes+= ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
3759 ? FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN
3760 : FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
3761 }
3762 }
3763
3764 if (dberr_t err= os_file_write(write_request, iter.filepath, iter.file,
3765 writeptr + j, offset + j, n_write_bytes))
3766 return err;
3767 }
3768
3769 return DB_SUCCESS;
3770 }
3771
run(const fil_iterator_t & iter,buf_block_t * block)3772 dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
3773 buf_block_t* block) UNIV_NOTHROW
3774 {
3775 const unsigned zip_size= fil_space_t::zip_size(m_space_flags);
3776 const unsigned size= zip_size ? zip_size : unsigned(srv_page_size);
3777 byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3778 const bool full_crc32 = fil_space_t::full_crc32(m_space_flags);
3779 bool skip_checksum_check = false;
3780 ut_ad(!srv_read_only_mode);
3781
3782 if (!page_compress_buf)
3783 return DB_OUT_OF_MEMORY;
3784
3785 const bool encrypted= iter.crypt_data != NULL &&
3786 iter.crypt_data->should_encrypt();
3787 byte* const readptr= iter.io_buffer;
3788 block->frame= readptr;
3789
3790 if (block->page.zip.data)
3791 block->page.zip.data= readptr;
3792
3793 IORequest read_request(IORequest::READ);
3794 read_request.disable_partial_io_warnings();
3795 ulint page_no= 0;
3796 bool page_compressed= false;
3797
3798 dberr_t err= os_file_read_no_error_handling(
3799 read_request, iter.file, readptr, 3 * size, size, 0);
3800 if (err != DB_SUCCESS)
3801 {
3802 ib::error() << iter.filepath << ": os_file_read() failed";
3803 goto func_exit;
3804 }
3805
3806 block->page.id.set_page_no(3);
3807 page_no= page_get_page_no(readptr);
3808
3809 if (page_no != 3)
3810 {
3811 page_corrupted:
3812 ib::warn() << filename() << ": Page 3 at offset "
3813 << 3 * size << " looks corrupted.";
3814 err= DB_CORRUPTION;
3815 goto func_exit;
3816 }
3817
3818 page_compressed=
3819 (full_crc32 && fil_space_t::is_compressed(m_space_flags) &&
3820 buf_page_is_compressed(readptr, m_space_flags)) ||
3821 (fil_page_is_compressed_encrypted(readptr) ||
3822 fil_page_is_compressed(readptr));
3823
3824 if (page_compressed && block->page.zip.data)
3825 goto page_corrupted;
3826
3827 if (encrypted)
3828 {
3829 if (!buf_page_verify_crypt_checksum(readptr, m_space_flags))
3830 goto page_corrupted;
3831
3832 if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
3833 buf_page_get_key_version(readptr, m_space_flags))
3834 goto page_corrupted;
3835
3836 if ((err= fil_space_decrypt(get_space_id(), iter.crypt_data, readptr, size,
3837 m_space_flags, readptr)))
3838 goto func_exit;
3839 }
3840
3841 /* For full_crc32 format, skip checksum check
3842 after decryption. */
3843 skip_checksum_check= full_crc32 && encrypted;
3844
3845 if (page_compressed)
3846 {
3847 ulint compress_length= fil_page_decompress(page_compress_buf,
3848 readptr,
3849 m_space_flags);
3850 ut_ad(compress_length != srv_page_size);
3851 if (compress_length == 0)
3852 goto page_corrupted;
3853 }
3854 else if (!skip_checksum_check
3855 && buf_page_is_corrupted(false, readptr, m_space_flags))
3856 goto page_corrupted;
3857
3858 err= this->operator()(block);
3859 func_exit:
3860 free(page_compress_buf);
3861 return err;
3862 }
3863
fil_iterate(const fil_iterator_t & iter,buf_block_t * block,AbstractCallback & callback)3864 static dberr_t fil_iterate(
3865 const fil_iterator_t& iter,
3866 buf_block_t* block,
3867 AbstractCallback& callback)
3868 {
3869 os_offset_t offset;
3870 const ulint size = callback.physical_size();
3871 ulint n_bytes = iter.n_io_buffers * size;
3872
3873 byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3874 ut_ad(!srv_read_only_mode);
3875
3876 if (!page_compress_buf) {
3877 return DB_OUT_OF_MEMORY;
3878 }
3879
3880 ulint actual_space_id = 0;
3881 const bool full_crc32 = fil_space_t::full_crc32(
3882 callback.get_space_flags());
3883
3884 /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
3885 copying for non-index pages. Unfortunately, it is
3886 required by buf_zip_decompress() */
3887 dberr_t err = DB_SUCCESS;
3888 bool page_compressed = false;
3889 bool punch_hole = true;
3890 const IORequest write_request(IORequest::WRITE);
3891
3892 for (offset = iter.start; offset < iter.end; offset += n_bytes) {
3893 if (callback.is_interrupted()) {
3894 err = DB_INTERRUPTED;
3895 goto func_exit;
3896 }
3897
3898 byte* io_buffer = iter.io_buffer;
3899 block->frame = io_buffer;
3900
3901 if (block->page.zip.data) {
3902 /* Zip IO is done in the compressed page buffer. */
3903 io_buffer = block->page.zip.data;
3904 }
3905
3906 /* We have to read the exact number of bytes. Otherwise the
3907 InnoDB IO functions croak on failed reads. */
3908
3909 n_bytes = ulint(ut_min(os_offset_t(n_bytes),
3910 iter.end - offset));
3911
3912 ut_ad(n_bytes > 0);
3913 ut_ad(!(n_bytes % size));
3914
3915 const bool encrypted = iter.crypt_data != NULL
3916 && iter.crypt_data->should_encrypt();
3917 /* Use additional crypt io buffer if tablespace is encrypted */
3918 byte* const readptr = encrypted
3919 ? iter.crypt_io_buffer : io_buffer;
3920 byte* const writeptr = readptr;
3921
3922 IORequest read_request(IORequest::READ);
3923 read_request.disable_partial_io_warnings();
3924
3925 err = os_file_read_no_error_handling(
3926 read_request, iter.file, readptr, offset, n_bytes, 0);
3927 if (err != DB_SUCCESS) {
3928 ib::error() << iter.filepath
3929 << ": os_file_read() failed";
3930 goto func_exit;
3931 }
3932
3933 bool updated = false;
3934 os_offset_t page_off = offset;
3935 ulint n_pages_read = n_bytes / size;
3936 block->page.id.set_page_no(ulint(page_off / size));
3937
3938 for (ulint i = 0; i < n_pages_read;
3939 block->page.id.set_page_no(block->page.id.page_no() + 1),
3940 ++i, page_off += size, block->frame += size) {
3941 byte* src = readptr + i * size;
3942 const ulint page_no = page_get_page_no(src);
3943 if (!page_no && block->page.id.page_no()) {
3944 if (!buf_is_zeroes(span<const byte>(src,
3945 size))) {
3946 goto page_corrupted;
3947 }
3948 /* Proceed to the next page,
3949 because this one is all zero. */
3950 continue;
3951 }
3952
3953 if (page_no != block->page.id.page_no()) {
3954 page_corrupted:
3955 ib::warn() << callback.filename()
3956 << ": Page " << (offset / size)
3957 << " at offset " << offset
3958 << " looks corrupted.";
3959 err = DB_CORRUPTION;
3960 goto func_exit;
3961 }
3962
3963 if (block->page.id.page_no() == 0) {
3964 actual_space_id = mach_read_from_4(
3965 src + FIL_PAGE_SPACE_ID);
3966 }
3967
3968 page_compressed =
3969 (full_crc32
3970 && fil_space_t::is_compressed(
3971 callback.get_space_flags())
3972 && buf_page_is_compressed(
3973 src, callback.get_space_flags()))
3974 || (fil_page_is_compressed_encrypted(src)
3975 || fil_page_is_compressed(src));
3976
3977 if (page_compressed && block->page.zip.data) {
3978 goto page_corrupted;
3979 }
3980
3981 bool decrypted = false;
3982 byte* dst = io_buffer + i * size;
3983 bool frame_changed = false;
3984 uint key_version = buf_page_get_key_version(
3985 src, callback.get_space_flags());
3986
3987 if (!encrypted) {
3988 } else if (!key_version) {
3989 if (block->page.id.page_no() == 0
3990 && block->page.zip.data) {
3991 block->page.zip.data = src;
3992 frame_changed = true;
3993 } else if (!page_compressed
3994 && !block->page.zip.data) {
3995 block->frame = src;
3996 frame_changed = true;
3997 } else {
3998 ut_ad(dst != src);
3999 memcpy(dst, src, size);
4000 }
4001 } else {
4002 if (!buf_page_verify_crypt_checksum(
4003 src, callback.get_space_flags())) {
4004 goto page_corrupted;
4005 }
4006
4007 if ((err = fil_space_decrypt(
4008 actual_space_id,
4009 iter.crypt_data, dst,
4010 callback.physical_size(),
4011 callback.get_space_flags(),
4012 src))) {
4013 goto func_exit;
4014 }
4015
4016 decrypted = true;
4017 updated = true;
4018 }
4019
4020 /* For full_crc32 format, skip checksum check
4021 after decryption. */
4022 bool skip_checksum_check = full_crc32 && encrypted;
4023
4024 /* If the original page is page_compressed, we need
4025 to decompress it before adjusting further. */
4026 if (page_compressed) {
4027 ulint compress_length = fil_page_decompress(
4028 page_compress_buf, dst,
4029 callback.get_space_flags());
4030 ut_ad(compress_length != srv_page_size);
4031 if (compress_length == 0) {
4032 goto page_corrupted;
4033 }
4034 updated = true;
4035 } else if (!skip_checksum_check
4036 && buf_page_is_corrupted(
4037 false,
4038 encrypted && !frame_changed
4039 ? dst : src,
4040 callback.get_space_flags())) {
4041 goto page_corrupted;
4042 }
4043
4044 if ((err = callback(block)) != DB_SUCCESS) {
4045 goto func_exit;
4046 } else if (!updated) {
4047 updated = buf_block_get_state(block)
4048 == BUF_BLOCK_FILE_PAGE;
4049 }
4050
4051 /* If tablespace is encrypted we use additional
4052 temporary scratch area where pages are read
4053 for decrypting readptr == crypt_io_buffer != io_buffer.
4054
4055 Destination for decryption is a buffer pool block
4056 block->frame == dst == io_buffer that is updated.
4057 Pages that did not require decryption even when
4058 tablespace is marked as encrypted are not copied
4059 instead block->frame is set to src == readptr.
4060
4061 For encryption we again use temporary scratch area
4062 writeptr != io_buffer == dst
4063 that is then written to the tablespace
4064
4065 (1) For normal tables io_buffer == dst == writeptr
4066 (2) For only page compressed tables
4067 io_buffer == dst == writeptr
4068 (3) For encrypted (and page compressed)
4069 readptr != io_buffer == dst != writeptr
4070 */
4071
4072 ut_ad(!encrypted && !page_compressed ?
4073 src == dst && dst == writeptr + (i * size):1);
4074 ut_ad(page_compressed && !encrypted ?
4075 src == dst && dst == writeptr + (i * size):1);
4076 ut_ad(encrypted ?
4077 src != dst && dst != writeptr + (i * size):1);
4078
4079 /* When tablespace is encrypted or compressed its
4080 first page (i.e. page 0) is not encrypted or
4081 compressed and there is no need to copy frame. */
4082 if (encrypted && block->page.id.page_no() != 0) {
4083 byte *local_frame = callback.get_frame(block);
4084 ut_ad((writeptr + (i * size)) != local_frame);
4085 memcpy((writeptr + (i * size)), local_frame, size);
4086 }
4087
4088 if (frame_changed) {
4089 if (block->page.zip.data) {
4090 block->page.zip.data = dst;
4091 } else {
4092 block->frame = dst;
4093 }
4094 }
4095
4096 src = io_buffer + (i * size);
4097
4098 if (page_compressed) {
4099 updated = true;
4100 if (ulint len = fil_page_compress(
4101 src,
4102 page_compress_buf,
4103 callback.get_space_flags(),
4104 512,/* FIXME: proper block size */
4105 encrypted)) {
4106 /* FIXME: remove memcpy() */
4107 memcpy(src, page_compress_buf, len);
4108 memset(src + len, 0,
4109 srv_page_size - len);
4110 }
4111 }
4112
4113 /* Encrypt the page if encryption was used. */
4114 if (encrypted && decrypted) {
4115 byte *dest = writeptr + i * size;
4116
4117 byte* tmp = fil_encrypt_buf(
4118 iter.crypt_data,
4119 block->page.id.space(),
4120 block->page.id.page_no(),
4121 mach_read_from_8(src + FIL_PAGE_LSN),
4122 src, block->zip_size(), dest,
4123 full_crc32);
4124
4125 if (tmp == src) {
4126 /* TODO: remove unnecessary memcpy's */
4127 ut_ad(dest != src);
4128 memcpy(dest, src, size);
4129 }
4130
4131 updated = true;
4132 }
4133
4134 /* Write checksum for the compressed full crc32 page.*/
4135 if (full_crc32 && page_compressed) {
4136 ut_ad(updated);
4137 byte* dest = writeptr + i * size;
4138 ut_d(bool comp = false);
4139 ut_d(bool corrupt = false);
4140 ulint size = buf_page_full_crc32_size(
4141 dest,
4142 #ifdef UNIV_DEBUG
4143 &comp, &corrupt
4144 #else
4145 NULL, NULL
4146 #endif
4147 );
4148 ut_ad(!comp == (size == srv_page_size));
4149 ut_ad(!corrupt);
4150 mach_write_to_4(dest + (size - 4),
4151 ut_crc32(dest, size - 4));
4152 }
4153 }
4154
4155 if (page_compressed && punch_hole) {
4156 err = fil_import_compress_fwrite(
4157 iter, full_crc32, write_request, offset,
4158 writeptr, n_bytes, !updated);
4159
4160 if (err != DB_SUCCESS) {
4161 punch_hole = false;
4162 if (updated) {
4163 goto normal_write;
4164 }
4165 }
4166 } else if (updated) {
4167 /* A page was updated in the set, write back to disk. */
4168 normal_write:
4169 err = os_file_write(
4170 write_request, iter.filepath, iter.file,
4171 writeptr, offset, n_bytes);
4172
4173 if (err != DB_SUCCESS) {
4174 goto func_exit;
4175 }
4176 }
4177 }
4178
4179 func_exit:
4180 free(page_compress_buf);
4181 return err;
4182 }
4183
4184 /********************************************************************//**
4185 Iterate over all the pages in the tablespace.
4186 @param table - the table definiton in the server
4187 @param n_io_buffers - number of blocks to read and write together
4188 @param callback - functor that will do the page updates
4189 @return DB_SUCCESS or error code */
4190 static
4191 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)4192 fil_tablespace_iterate(
4193 /*===================*/
4194 dict_table_t* table,
4195 ulint n_io_buffers,
4196 AbstractCallback& callback)
4197 {
4198 dberr_t err;
4199 pfs_os_file_t file;
4200 char* filepath;
4201
4202 ut_a(n_io_buffers > 0);
4203 ut_ad(!srv_read_only_mode);
4204
4205 DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
4206 return(DB_CORRUPTION););
4207
4208 /* Make sure the data_dir_path is set. */
4209 dict_get_and_save_data_dir_path(table, false);
4210
4211 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4212 ut_a(table->data_dir_path);
4213
4214 filepath = fil_make_filepath(
4215 table->data_dir_path, table->name.m_name, IBD, true);
4216 } else {
4217 filepath = fil_make_filepath(
4218 NULL, table->name.m_name, IBD, false);
4219 }
4220
4221 if (!filepath) {
4222 return(DB_OUT_OF_MEMORY);
4223 } else {
4224 bool success;
4225
4226 file = os_file_create_simple_no_error_handling(
4227 innodb_data_file_key, filepath,
4228 OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
4229
4230 if (!success) {
4231 /* The following call prints an error message */
4232 os_file_get_last_error(true);
4233 ib::error() << "Trying to import a tablespace,"
4234 " but could not open the tablespace file "
4235 << filepath;
4236 ut_free(filepath);
4237 return DB_TABLESPACE_NOT_FOUND;
4238 } else {
4239 err = DB_SUCCESS;
4240 }
4241 }
4242
4243 callback.set_file(filepath, file);
4244
4245 os_offset_t file_size = os_file_get_size(file);
4246 ut_a(file_size != (os_offset_t) -1);
4247
4248 /* Allocate a page to read in the tablespace header, so that we
4249 can determine the page size and zip_size (if it is compressed).
4250 We allocate an extra page in case it is a compressed table. One
4251 page is to ensure alignement. */
4252
4253 void* page_ptr = ut_malloc_nokey(3U << srv_page_size_shift);
4254 byte* page = static_cast<byte*>(ut_align(page_ptr, srv_page_size));
4255
4256 buf_block_t* block = reinterpret_cast<buf_block_t*>
4257 (ut_zalloc_nokey(sizeof *block));
4258 block->frame = page;
4259 block->page.id = page_id_t(0, 0);
4260 block->page.io_fix = BUF_IO_NONE;
4261 block->page.buf_fix_count = 1;
4262 block->page.state = BUF_BLOCK_FILE_PAGE;
4263
4264 /* Read the first page and determine the page and zip size. */
4265
4266 IORequest request(IORequest::READ);
4267 request.disable_partial_io_warnings();
4268
4269 err = os_file_read_no_error_handling(request, file, page, 0,
4270 srv_page_size, 0);
4271
4272 if (err == DB_SUCCESS) {
4273 err = callback.init(file_size, block);
4274 }
4275
4276 if (err == DB_SUCCESS) {
4277 block->page.id = page_id_t(callback.get_space_id(), 0);
4278 if (ulint zip_size = callback.get_zip_size()) {
4279 page_zip_set_size(&block->page.zip, zip_size);
4280 /* ROW_FORMAT=COMPRESSED is not optimised for block IO
4281 for now. We do the IMPORT page by page. */
4282 n_io_buffers = 1;
4283 }
4284
4285 fil_iterator_t iter;
4286
4287 /* read (optional) crypt data */
4288 iter.crypt_data = fil_space_read_crypt_data(
4289 callback.get_zip_size(), page);
4290
4291 /* If tablespace is encrypted, it needs extra buffers */
4292 if (iter.crypt_data && n_io_buffers > 1) {
4293 /* decrease io buffers so that memory
4294 consumption will not double */
4295 n_io_buffers /= 2;
4296 }
4297
4298 iter.file = file;
4299 iter.start = 0;
4300 iter.end = file_size;
4301 iter.filepath = filepath;
4302 iter.file_size = file_size;
4303 iter.n_io_buffers = n_io_buffers;
4304
4305 /* Add an extra page for compressed page scratch area. */
4306 void* io_buffer = ut_malloc_nokey(
4307 (2 + iter.n_io_buffers) << srv_page_size_shift);
4308
4309 iter.io_buffer = static_cast<byte*>(
4310 ut_align(io_buffer, srv_page_size));
4311
4312 void* crypt_io_buffer = NULL;
4313 if (iter.crypt_data) {
4314 crypt_io_buffer = ut_malloc_nokey(
4315 (2 + iter.n_io_buffers)
4316 << srv_page_size_shift);
4317 iter.crypt_io_buffer = static_cast<byte*>(
4318 ut_align(crypt_io_buffer, srv_page_size));
4319 }
4320
4321 if (block->page.zip.ssize) {
4322 ut_ad(iter.n_io_buffers == 1);
4323 block->frame = iter.io_buffer;
4324 block->page.zip.data = block->frame + srv_page_size;
4325 }
4326
4327 err = callback.run(iter, block);
4328
4329 if (iter.crypt_data) {
4330 fil_space_destroy_crypt_data(&iter.crypt_data);
4331 }
4332
4333 ut_free(crypt_io_buffer);
4334 ut_free(io_buffer);
4335 }
4336
4337 if (err == DB_SUCCESS) {
4338 ib::info() << "Sync to disk";
4339
4340 if (!os_file_flush(file)) {
4341 ib::info() << "os_file_flush() failed!";
4342 err = DB_IO_ERROR;
4343 } else {
4344 ib::info() << "Sync to disk - done!";
4345 }
4346 }
4347
4348 os_file_close(file);
4349
4350 ut_free(page_ptr);
4351 ut_free(filepath);
4352 ut_free(block);
4353
4354 return(err);
4355 }
4356
4357 /*****************************************************************//**
4358 Imports a tablespace. The space id in the .ibd file must match the space id
4359 of the table in the data dictionary.
4360 @return error code or DB_SUCCESS */
4361 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)4362 row_import_for_mysql(
4363 /*=================*/
4364 dict_table_t* table, /*!< in/out: table */
4365 row_prebuilt_t* prebuilt) /*!< in: prebuilt struct in MySQL */
4366 {
4367 dberr_t err;
4368 trx_t* trx;
4369 ib_uint64_t autoinc = 0;
4370 char* filepath = NULL;
4371 ulint space_flags MY_ATTRIBUTE((unused));
4372
4373 /* The caller assured that this is not read_only_mode and that no
4374 temorary tablespace is being imported. */
4375 ut_ad(!srv_read_only_mode);
4376 ut_ad(!table->is_temporary());
4377
4378 ut_ad(table->space_id);
4379 ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
4380 ut_ad(prebuilt->trx);
4381 ut_ad(!table->is_readable());
4382
4383 ibuf_delete_for_discarded_space(table->space_id);
4384
4385 trx_start_if_not_started(prebuilt->trx, true);
4386
4387 trx = trx_create();
4388
4389 /* So that the table is not DROPped during recovery. */
4390 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4391
4392 trx_start_if_not_started(trx, true);
4393
4394 /* So that we can send error messages to the user. */
4395 trx->mysql_thd = prebuilt->trx->mysql_thd;
4396
4397 /* Ensure that the table will be dropped by trx_rollback_active()
4398 in case of a crash. */
4399
4400 trx->table_id = table->id;
4401
4402 /* Assign an undo segment for the transaction, so that the
4403 transaction will be recovered after a crash. */
4404
4405 /* TODO: Do not write any undo log for the IMPORT cleanup. */
4406 {
4407 mtr_t mtr;
4408 mtr.start();
4409 trx_undo_assign(trx, &err, &mtr);
4410 mtr.commit();
4411 }
4412
4413 DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
4414 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4415
4416 if (err != DB_SUCCESS) {
4417
4418 return(row_import_cleanup(prebuilt, trx, err));
4419
4420 } else if (trx->rsegs.m_redo.undo == 0) {
4421
4422 err = DB_TOO_MANY_CONCURRENT_TRXS;
4423 return(row_import_cleanup(prebuilt, trx, err));
4424 }
4425
4426 prebuilt->trx->op_info = "read meta-data file";
4427
4428 /* Prevent DDL operations while we are checking. */
4429
4430 rw_lock_s_lock(&dict_sys.latch);
4431
4432 row_import cfg;
4433
4434 err = row_import_read_cfg(table, trx->mysql_thd, cfg);
4435
4436 /* Check if the table column definitions match the contents
4437 of the config file. */
4438
4439 if (err == DB_SUCCESS) {
4440
4441 if (dberr_t err = handle_instant_metadata(table, cfg)) {
4442 rw_lock_s_unlock(&dict_sys.latch);
4443 return row_import_error(prebuilt, trx, err);
4444 }
4445
4446 /* We have a schema file, try and match it with our
4447 data dictionary. */
4448
4449 err = cfg.match_schema(trx->mysql_thd);
4450
4451 /* Update index->page and SYS_INDEXES.PAGE_NO to match the
4452 B-tree root page numbers in the tablespace. Use the index
4453 name from the .cfg file to find match. */
4454
4455 if (err == DB_SUCCESS) {
4456 cfg.set_root_by_name();
4457 autoinc = cfg.m_autoinc;
4458 }
4459
4460 rw_lock_s_unlock(&dict_sys.latch);
4461
4462 DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
4463 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4464
4465 } else if (cfg.m_missing) {
4466
4467 rw_lock_s_unlock(&dict_sys.latch);
4468
4469 /* We don't have a schema file, we will have to discover
4470 the index root pages from the .ibd file and skip the schema
4471 matching step. */
4472
4473 ut_a(err == DB_FAIL);
4474
4475 cfg.m_zip_size = 0;
4476
4477 if (UT_LIST_GET_LEN(table->indexes) > 1) {
4478 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4479 ER_INTERNAL_ERROR,
4480 "Drop all secondary indexes before importing "
4481 "table %s when .cfg file is missing.",
4482 table->name.m_name);
4483 err = DB_ERROR;
4484 return row_import_error(prebuilt, trx, err);
4485 }
4486
4487 FetchIndexRootPages fetchIndexRootPages(table, trx);
4488
4489 err = fil_tablespace_iterate(
4490 table, IO_BUFFER_SIZE(srv_page_size),
4491 fetchIndexRootPages);
4492
4493 if (err == DB_SUCCESS) {
4494
4495 err = fetchIndexRootPages.build_row_import(&cfg);
4496
4497 /* Update index->page and SYS_INDEXES.PAGE_NO
4498 to match the B-tree root page numbers in the
4499 tablespace. */
4500
4501 if (err == DB_SUCCESS) {
4502 err = cfg.set_root_by_heuristic();
4503
4504 if (err == DB_SUCCESS) {
4505 if (dberr_t err =
4506 handle_instant_metadata(table,
4507 cfg)) {
4508 return row_import_error(
4509 prebuilt, trx, err);
4510 }
4511 }
4512 }
4513 }
4514
4515 space_flags = fetchIndexRootPages.get_space_flags();
4516
4517 } else {
4518 rw_lock_s_unlock(&dict_sys.latch);
4519 }
4520
4521 if (err != DB_SUCCESS) {
4522 return(row_import_error(prebuilt, trx, err));
4523 }
4524
4525 prebuilt->trx->op_info = "importing tablespace";
4526
4527 ib::info() << "Phase I - Update all pages";
4528
4529 /* Iterate over all the pages and do the sanity checking and
4530 the conversion required to import the tablespace. */
4531
4532 PageConverter converter(&cfg, table->space_id, trx);
4533
4534 /* Set the IO buffer size in pages. */
4535
4536 err = fil_tablespace_iterate(
4537 table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size
4538 : srv_page_size), converter);
4539
4540 DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
4541 err = DB_TOO_MANY_CONCURRENT_TRXS;);
4542 #ifdef BTR_CUR_HASH_ADAPT
4543 /* On DISCARD TABLESPACE, we did not drop any adaptive hash
4544 index entries. If we replaced the discarded tablespace with a
4545 smaller one here, there could still be some adaptive hash
4546 index entries that point to cached garbage pages in the buffer
4547 pool, because PageConverter::operator() only evicted those
4548 pages that were replaced by the imported pages. We must
4549 detach any remaining adaptive hash index entries, because the
4550 adaptive hash index must be a subset of the table contents;
4551 false positives are not tolerated. */
4552 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
4553 index = UT_LIST_GET_NEXT(indexes, index)) {
4554 index = index->clone_if_needed();
4555 }
4556 #endif /* BTR_CUR_HASH_ADAPT */
4557
4558 if (err != DB_SUCCESS) {
4559 char table_name[MAX_FULL_NAME_LEN + 1];
4560
4561 innobase_format_name(
4562 table_name, sizeof(table_name),
4563 table->name.m_name);
4564
4565 if (err != DB_DECRYPTION_FAILED) {
4566
4567 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4568 ER_INTERNAL_ERROR,
4569 "Cannot reset LSNs in table %s : %s",
4570 table_name, ut_strerr(err));
4571 }
4572
4573 return(row_import_cleanup(prebuilt, trx, err));
4574 }
4575
4576 row_mysql_lock_data_dictionary(trx);
4577
4578 /* If the table is stored in a remote tablespace, we need to
4579 determine that filepath from the link file and system tables.
4580 Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
4581 dict_get_and_save_data_dir_path(table, true);
4582
4583 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4584 ut_a(table->data_dir_path);
4585
4586 filepath = fil_make_filepath(
4587 table->data_dir_path, table->name.m_name, IBD, true);
4588 } else {
4589 filepath = fil_make_filepath(
4590 NULL, table->name.m_name, IBD, false);
4591 }
4592
4593 DBUG_EXECUTE_IF(
4594 "ib_import_OOM_15",
4595 ut_free(filepath);
4596 filepath = NULL;
4597 );
4598
4599 if (filepath == NULL) {
4600 row_mysql_unlock_data_dictionary(trx);
4601 return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
4602 }
4603
4604 /* Open the tablespace so that we can access via the buffer pool.
4605 We set the 2nd param (fix_dict = true) here because we already
4606 have an x-lock on dict_sys.latch and dict_sys.mutex.
4607 The tablespace is initially opened as a temporary one, because
4608 we will not be writing any redo log for it before we have invoked
4609 fil_space_t::set_imported() to declare it a persistent tablespace. */
4610
4611 ulint fsp_flags = dict_tf_to_fsp_flags(table->flags);
4612
4613 table->space = fil_ibd_open(
4614 true, true, FIL_TYPE_IMPORT, table->space_id,
4615 fsp_flags, table->name, filepath, &err);
4616
4617 ut_ad((table->space == NULL) == (err != DB_SUCCESS));
4618 DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
4619 err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);
4620
4621 if (!table->space) {
4622 row_mysql_unlock_data_dictionary(trx);
4623
4624 ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4625 ER_GET_ERRMSG,
4626 err, ut_strerr(err), filepath);
4627
4628 ut_free(filepath);
4629
4630 return(row_import_cleanup(prebuilt, trx, err));
4631 }
4632
4633 row_mysql_unlock_data_dictionary(trx);
4634
4635 ut_free(filepath);
4636
4637 err = ibuf_check_bitmap_on_import(trx, table->space);
4638
4639 DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
4640
4641 if (err != DB_SUCCESS) {
4642 return(row_import_cleanup(prebuilt, trx, err));
4643 }
4644
4645 /* The first index must always be the clustered index. */
4646
4647 dict_index_t* index = dict_table_get_first_index(table);
4648
4649 if (!dict_index_is_clust(index)) {
4650 return(row_import_error(prebuilt, trx, DB_CORRUPTION));
4651 }
4652
4653 /* Update the Btree segment headers for index node and
4654 leaf nodes in the root page. Set the new space id. */
4655
4656 err = btr_root_adjust_on_import(index);
4657
4658 DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
4659 err = DB_CORRUPTION;);
4660
4661 if (err != DB_SUCCESS) {
4662 return(row_import_error(prebuilt, trx, err));
4663 } else if (cfg.requires_purge(index->name)) {
4664
4665 /* Purge any delete-marked records that couldn't be
4666 purged during the page conversion phase from the
4667 cluster index. */
4668
4669 IndexPurge purge(trx, index);
4670
4671 trx->op_info = "cluster: purging delete marked records";
4672
4673 err = purge.garbage_collect();
4674
4675 trx->op_info = "";
4676 }
4677
4678 DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
4679
4680 if (err != DB_SUCCESS) {
4681 return(row_import_error(prebuilt, trx, err));
4682 }
4683
4684 /* For secondary indexes, purge any records that couldn't be purged
4685 during the page conversion phase. */
4686
4687 err = row_import_adjust_root_pages_of_secondary_indexes(
4688 trx, table, cfg);
4689
4690 DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
4691 err = DB_CORRUPTION;);
4692
4693 if (err != DB_SUCCESS) {
4694 return(row_import_error(prebuilt, trx, err));
4695 }
4696
4697 /* Ensure that the next available DB_ROW_ID is not smaller than
4698 any DB_ROW_ID stored in the table. */
4699
4700 if (prebuilt->clust_index_was_generated) {
4701 row_import_set_sys_max_row_id(prebuilt, table);
4702 }
4703
4704 ib::info() << "Phase III - Flush changes to disk";
4705
4706 /* Ensure that all pages dirtied during the IMPORT make it to disk.
4707 The only dirty pages generated should be from the pessimistic purge
4708 of delete marked records that couldn't be purged in Phase I. */
4709
4710 {
4711 FlushObserver observer(prebuilt->table->space, trx, NULL);
4712 buf_LRU_flush_or_remove_pages(prebuilt->table->space_id,
4713 &observer);
4714
4715 if (observer.is_interrupted()) {
4716 ib::info() << "Phase III - Flush interrupted";
4717 return(row_import_error(prebuilt, trx,
4718 DB_INTERRUPTED));
4719 }
4720 }
4721
4722 ib::info() << "Phase IV - Flush complete";
4723 prebuilt->table->space->set_imported();
4724
4725 /* The dictionary latches will be released in in row_import_cleanup()
4726 after the transaction commit, for both success and error. */
4727
4728 row_mysql_lock_data_dictionary(trx);
4729
4730 /* Update the root pages of the table's indexes. */
4731 err = row_import_update_index_root(trx, table, false);
4732
4733 if (err != DB_SUCCESS) {
4734 return(row_import_error(prebuilt, trx, err));
4735 }
4736
4737 err = row_import_update_discarded_flag(trx, table->id, false);
4738
4739 if (err != DB_SUCCESS) {
4740 return(row_import_error(prebuilt, trx, err));
4741 }
4742
4743 table->file_unreadable = false;
4744 table->flags2 &= ~DICT_TF2_DISCARDED;
4745
4746 /* Set autoinc value read from .cfg file, if one was specified.
4747 Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
4748 if (autoinc) {
4749 ib::info() << table->name << " autoinc value set to "
4750 << autoinc;
4751
4752 table->autoinc = autoinc--;
4753 btr_write_autoinc(dict_table_get_first_index(table), autoinc);
4754 }
4755
4756 return(row_import_cleanup(prebuilt, trx, err));
4757 }
4758