1 /*****************************************************************************
2
3 Copyright (c) 2012, 2019, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0import.cc
29 Import a tablespace to a running instance.
30
31 Created 2012-02-08 by Sunny Bains.
32 *******************************************************/
33
34 #include "row0import.h"
35
36 #ifdef UNIV_NONINL
37 #include "row0import.ic"
38 #endif
39
40 #include "btr0pcur.h"
41 #include "que0que.h"
42 #include "dict0boot.h"
43 #include "ibuf0ibuf.h"
44 #include "pars0pars.h"
45 #include "row0upd.h"
46 #include "row0sel.h"
47 #include "row0mysql.h"
48 #include "srv0start.h"
49 #include "row0quiesce.h"
50
51 #include <vector>
52
/** The size of the buffer to use for IO, expressed as the number of pages
of size n that fit into 1 MiB (1024 * 1024 bytes).
Note: os_file_read() doesn't expect reads to fail. If you set the buffer
size to be greater than a multiple of the file size then it will assert.
TODO: Fix this limitation of the IO functions.
The argument is parenthesized in the expansion so that a compound
expression such as (a + b) is used as the divisor as a whole.
@param n - page size of the tablespace, must be non-zero.
@retval number of pages */
#define IO_BUFFER_SIZE(n)	((1024 * 1024) / (n))
59
60 /** For gathering stats on records during phase I */
61 struct row_stats_t {
62 ulint m_n_deleted; /*!< Number of deleted records
63 found in the index */
64
65 ulint m_n_purged; /*!< Number of records purged
66 optimisatically */
67
68 ulint m_n_rows; /*!< Number of rows */
69
70 ulint m_n_purge_failed; /*!< Number of deleted rows
71 that could not be purged */
72 };
73
74 /** Index information required by IMPORT. */
75 struct row_index_t {
76 index_id_t m_id; /*!< Index id of the table
77 in the exporting server */
78 byte* m_name; /*!< Index name */
79
80 ulint m_space; /*!< Space where it is placed */
81
82 ulint m_page_no; /*!< Root page number */
83
84 ulint m_type; /*!< Index type */
85
86 ulint m_trx_id_offset; /*!< Relevant only for clustered
87 indexes, offset of transaction
88 id system column */
89
90 ulint m_n_user_defined_cols; /*!< User defined columns */
91
92 ulint m_n_uniq; /*!< Number of columns that can
93 uniquely identify the row */
94
95 ulint m_n_nullable; /*!< Number of nullable
96 columns */
97
98 ulint m_n_fields; /*!< Total number of fields */
99
100 dict_field_t* m_fields; /*!< Index fields */
101
102 const dict_index_t*
103 m_srv_index; /*!< Index instance in the
104 importing server */
105
106 row_stats_t m_stats; /*!< Statistics gathered during
107 the import phase */
108
109 };
110
111 /** Meta data required by IMPORT. */
112 struct row_import {
row_importrow_import113 row_import() UNIV_NOTHROW
114 :
115 m_table(),
116 m_version(),
117 m_hostname(),
118 m_table_name(),
119 m_autoinc(),
120 m_page_size(),
121 m_flags(),
122 m_n_cols(),
123 m_cols(),
124 m_col_names(),
125 m_n_indexes(),
126 m_indexes(),
127 m_missing(true) { }
128
129 ~row_import() UNIV_NOTHROW;
130
131 /**
132 Find the index entry in in the indexes array.
133 @param name - index name
134 @return instance if found else 0. */
135 row_index_t* get_index(const char* name) const UNIV_NOTHROW;
136
137 /**
138 Get the number of rows in the index.
139 @param name - index name
140 @return number of rows (doesn't include delete marked rows). */
141 ulint get_n_rows(const char* name) const UNIV_NOTHROW;
142
143 /**
144 Find the ordinal value of the column name in the cfg table columns.
145 @param name - of column to look for.
146 @return ULINT_UNDEFINED if not found. */
147 ulint find_col(const char* name) const UNIV_NOTHROW;
148
149 /**
150 Get the number of rows for which purge failed during the convert phase.
151 @param name - index name
152 @return number of rows for which purge failed. */
153 ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;
154
155 /**
156 Check if the index is clean. ie. no delete-marked records
157 @param name - index name
158 @return true if index needs to be purged. */
requires_purgerow_import159 bool requires_purge(const char* name) const UNIV_NOTHROW
160 {
161 return(get_n_purge_failed(name) > 0);
162 }
163
164 /**
165 Set the index root <space, pageno> using the index name */
166 void set_root_by_name() UNIV_NOTHROW;
167
168 /**
169 Set the index root <space, pageno> using a heuristic
170 @return DB_SUCCESS or error code */
171 dberr_t set_root_by_heuristic() UNIV_NOTHROW;
172
173 /** Check if the index schema that was read from the .cfg file
174 matches the in memory index definition.
175 Note: It will update row_import_t::m_srv_index to map the meta-data
176 read from the .cfg file to the server index instance.
177 @return DB_SUCCESS or error code. */
178 dberr_t match_index_columns(
179 THD* thd,
180 const dict_index_t* index) UNIV_NOTHROW;
181
182 /**
183 Check if the table schema that was read from the .cfg file matches the
184 in memory table definition.
185 @param thd - MySQL session variable
186 @return DB_SUCCESS or error code. */
187 dberr_t match_table_columns(
188 THD* thd) UNIV_NOTHROW;
189
190 /**
191 Check if the table (and index) schema that was read from the .cfg file
192 matches the in memory table definition.
193 @param thd - MySQL session variable
194 @return DB_SUCCESS or error code. */
195 dberr_t match_schema(
196 THD* thd) UNIV_NOTHROW;
197
198 dict_table_t* m_table; /*!< Table instance */
199
200 ulint m_version; /*!< Version of config file */
201
202 byte* m_hostname; /*!< Hostname where the
203 tablespace was exported */
204 byte* m_table_name; /*!< Exporting instance table
205 name */
206
207 ib_uint64_t m_autoinc; /*!< Next autoinc value */
208
209 ulint m_page_size; /*!< Tablespace page size */
210
211 ulint m_flags; /*!< Table flags */
212
213 ulint m_n_cols; /*!< Number of columns in the
214 meta-data file */
215
216 dict_col_t* m_cols; /*!< Column data */
217
218 byte** m_col_names; /*!< Column names, we store the
219 column naems separately becuase
220 there is no field to store the
221 value in dict_col_t */
222
223 ulint m_n_indexes; /*!< Number of indexes,
224 including clustered index */
225
226 row_index_t* m_indexes; /*!< Index meta data */
227
228 bool m_missing; /*!< true if a .cfg file was
229 found and was readable */
230 };
231
232 /** Use the page cursor to iterate over records in a block. */
233 class RecIterator {
234 public:
235 /**
236 Default constructor */
RecIterator()237 RecIterator() UNIV_NOTHROW
238 {
239 memset(&m_cur, 0x0, sizeof(m_cur));
240 }
241
242 /**
243 Position the cursor on the first user record. */
open(buf_block_t * block)244 void open(buf_block_t* block) UNIV_NOTHROW
245 {
246 page_cur_set_before_first(block, &m_cur);
247
248 if (!end()) {
249 next();
250 }
251 }
252
253 /**
254 Move to the next record. */
next()255 void next() UNIV_NOTHROW
256 {
257 page_cur_move_to_next(&m_cur);
258 }
259
260 /**
261 @return the current record */
current()262 rec_t* current() UNIV_NOTHROW
263 {
264 ut_ad(!end());
265 return(page_cur_get_rec(&m_cur));
266 }
267
268 /**
269 @return true if cursor is at the end */
end()270 bool end() UNIV_NOTHROW
271 {
272 return(page_cur_is_after_last(&m_cur) == TRUE);
273 }
274
275 /** Remove the current record
276 @return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,ulint * offsets)277 bool remove(
278 const dict_index_t* index,
279 page_zip_des_t* page_zip,
280 ulint* offsets) UNIV_NOTHROW
281 {
282 /* We can't end up with an empty page unless it is root. */
283 if (page_get_n_recs(m_cur.block->frame) <= 1) {
284 return(false);
285 }
286
287 return(page_delete_rec(index, &m_cur, page_zip, offsets));
288 }
289
290 private:
291 page_cur_t m_cur;
292 };
293
294 /** Class that purges delete marked reocords from indexes, both secondary
295 and cluster. It does a pessimistic delete. This should only be done if we
296 couldn't purge the delete marked reocrds during Phase I. */
297 class IndexPurge {
298 public:
299 /** Constructor
300 @param trx - the user transaction covering the import tablespace
301 @param index - to be imported
302 @param space_id - space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)303 IndexPurge(
304 trx_t* trx,
305 dict_index_t* index) UNIV_NOTHROW
306 :
307 m_trx(trx),
308 m_index(index),
309 m_n_rows(0)
310 {
311 ib_logf(IB_LOG_LEVEL_INFO,
312 "Phase II - Purge records from index %s",
313 index->name);
314 }
315
316 /** Descructor */
~IndexPurge()317 ~IndexPurge() UNIV_NOTHROW { }
318
319 /** Purge delete marked records.
320 @return DB_SUCCESS or error code. */
321 dberr_t garbage_collect() UNIV_NOTHROW;
322
323 /** The number of records that are not delete marked.
324 @return total records in the index after purge */
get_n_rows() const325 ulint get_n_rows() const UNIV_NOTHROW
326 {
327 return(m_n_rows);
328 }
329
330 private:
331 /**
332 Begin import, position the cursor on the first record. */
333 void open() UNIV_NOTHROW;
334
335 /**
336 Close the persistent curosr and commit the mini-transaction. */
337 void close() UNIV_NOTHROW;
338
339 /**
340 Position the cursor on the next record.
341 @return DB_SUCCESS or error code */
342 dberr_t next() UNIV_NOTHROW;
343
344 /**
345 Store the persistent cursor position and reopen the
346 B-tree cursor in BTR_MODIFY_TREE mode, because the
347 tree structure may be changed during a pessimistic delete. */
348 void purge_pessimistic_delete() UNIV_NOTHROW;
349
350 /**
351 Purge delete-marked records.
352 @param offsets - current row offsets. */
353 void purge() UNIV_NOTHROW;
354
355 protected:
356 // Disable copying
357 IndexPurge();
358 IndexPurge(const IndexPurge&);
359 IndexPurge &operator=(const IndexPurge&);
360
361 private:
362 trx_t* m_trx; /*!< User transaction */
363 mtr_t m_mtr; /*!< Mini-transaction */
364 btr_pcur_t m_pcur; /*!< Persistent cursor */
365 dict_index_t* m_index; /*!< Index to be processed */
366 ulint m_n_rows; /*!< Records in index */
367 };
368
369 /** Functor that is called for each physical page that is read from the
370 tablespace file. */
371 class AbstractCallback : public PageCallback {
372 public:
373 /** Constructor
374 @param trx - covering transaction */
AbstractCallback(trx_t * trx)375 AbstractCallback(trx_t* trx)
376 :
377 m_trx(trx),
378 m_space(ULINT_UNDEFINED),
379 m_xdes(),
380 m_xdes_page_no(ULINT_UNDEFINED),
381 m_space_flags(ULINT_UNDEFINED),
382 m_table_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
383
384 /**
385 Free any extent descriptor instance */
~AbstractCallback()386 virtual ~AbstractCallback()
387 {
388 delete [] m_xdes;
389 }
390
391 /** Determine the page size to use for traversing the tablespace
392 @param file_size - size of the tablespace file in bytes
393 @param block - contents of the first page in the tablespace file.
394 @retval DB_SUCCESS or error code. */
395 virtual dberr_t init(
396 os_offset_t file_size,
397 const buf_block_t* block) UNIV_NOTHROW;
398
399 /** @return true if compressed table. */
is_compressed_table() const400 bool is_compressed_table() const UNIV_NOTHROW
401 {
402 return(get_zip_size() > 0);
403 }
404
405 protected:
406 /**
407 Get the data page depending on the table type, compressed or not.
408 @param block - block read from disk
409 @retval the buffer frame */
get_frame(buf_block_t * block) const410 buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
411 {
412 if (is_compressed_table()) {
413 return(block->page.zip.data);
414 }
415
416 return(buf_block_get_frame(block));
417 }
418
419 /** Check for session interrupt. If required we could
420 even flush to disk here every N pages.
421 @retval DB_SUCCESS or error code */
periodic_check()422 dberr_t periodic_check() UNIV_NOTHROW
423 {
424 if (trx_is_interrupted(m_trx)) {
425 return(DB_INTERRUPTED);
426 }
427
428 return(DB_SUCCESS);
429 }
430
431 /**
432 Get the physical offset of the extent descriptor within the page.
433 @param page_no - page number of the extent descriptor
434 @param page - contents of the page containing the extent descriptor.
435 @return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const436 const xdes_t* xdes(
437 ulint page_no,
438 const page_t* page) const UNIV_NOTHROW
439 {
440 ulint offset;
441
442 offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
443
444 return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
445 }
446
447 /**
448 Set the current page directory (xdes). If the extent descriptor is
449 marked as free then free the current extent descriptor and set it to
450 0. This implies that all pages that are covered by this extent
451 descriptor are also freed.
452
453 @param page_no - offset of page within the file
454 @param page - page contents
455 @return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)456 dberr_t set_current_xdes(
457 ulint page_no,
458 const page_t* page) UNIV_NOTHROW
459 {
460 m_xdes_page_no = page_no;
461
462 delete[] m_xdes;
463
464 m_xdes = 0;
465
466 ulint state;
467 const xdes_t* xdesc = page + XDES_ARR_OFFSET;
468
469 state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
470
471 if (state != XDES_FREE) {
472
473 m_xdes = new(std::nothrow) xdes_t[m_page_size];
474
475 /* Trigger OOM */
476 DBUG_EXECUTE_IF("ib_import_OOM_13",
477 delete [] m_xdes; m_xdes = 0;);
478
479 if (m_xdes == 0) {
480 return(DB_OUT_OF_MEMORY);
481 }
482
483 memcpy(m_xdes, page, m_page_size);
484 }
485
486 return(DB_SUCCESS);
487 }
488
489 /**
490 @return true if it is a root page */
is_root_page(const page_t * page) const491 bool is_root_page(const page_t* page) const UNIV_NOTHROW
492 {
493 ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
494
495 return(mach_read_from_4(page + FIL_PAGE_NEXT) == FIL_NULL
496 && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL);
497 }
498
499 /**
500 Check if the page is marked as free in the extent descriptor.
501 @param page_no - page number to check in the extent descriptor.
502 @return true if the page is marked as free */
is_free(ulint page_no) const503 bool is_free(ulint page_no) const UNIV_NOTHROW
504 {
505 ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
506 == m_xdes_page_no);
507
508 if (m_xdes != 0) {
509 const xdes_t* xdesc = xdes(page_no, m_xdes);
510 ulint pos = page_no % FSP_EXTENT_SIZE;
511
512 return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
513 }
514
515 /* If the current xdes was free, the page must be free. */
516 return(true);
517 }
518
519 protected:
520 /** Covering transaction. */
521 trx_t* m_trx;
522
523 /** Space id of the file being iterated over. */
524 ulint m_space;
525
526 /** Minimum page number for which the free list has not been
527 initialized: the pages >= this limit are, by definition, free;
528 note that in a single-table tablespace where size < 64 pages,
529 this number is 64, i.e., we have initialized the space about
530 the first extent, but have not physically allocted those pages
531 to the file. @see FSP_LIMIT. */
532 ulint m_free_limit;
533
534 /** Current size of the space in pages */
535 ulint m_size;
536
537 /** Current extent descriptor page */
538 xdes_t* m_xdes;
539
540 /** Physical page offset in the file of the extent descriptor */
541 ulint m_xdes_page_no;
542
543 /** Flags value read from the header page */
544 ulint m_space_flags;
545
546 /** Derived from m_space_flags and row format type, the row format
547 type is determined from the page header. */
548 ulint m_table_flags;
549 };
550
551 /** Determine the page size to use for traversing the tablespace
552 @param file_size - size of the tablespace file in bytes
553 @param block - contents of the first page in the tablespace file.
554 @retval DB_SUCCESS or error code. */
555 dberr_t
init(os_offset_t file_size,const buf_block_t * block)556 AbstractCallback::init(
557 os_offset_t file_size,
558 const buf_block_t* block) UNIV_NOTHROW
559 {
560 const page_t* page = block->frame;
561
562 m_space_flags = fsp_header_get_flags(page);
563
564 /* Since we don't know whether it is a compressed table
565 or not, the data is always read into the block->frame. */
566
567 dberr_t err = set_zip_size(block->frame);
568
569 if (err != DB_SUCCESS) {
570 return(DB_CORRUPTION);
571 }
572
573 /* Set the page size used to traverse the tablespace. */
574
575 m_page_size = (is_compressed_table())
576 ? get_zip_size() : fsp_flags_get_page_size(m_space_flags);
577
578 if (m_page_size == 0) {
579 ib_logf(IB_LOG_LEVEL_ERROR, "Page size is 0");
580 return(DB_CORRUPTION);
581 } else if (!is_compressed_table() && m_page_size != UNIV_PAGE_SIZE) {
582
583 ib_logf(IB_LOG_LEVEL_ERROR,
584 "Page size %lu of ibd file is not the same "
585 "as the server page size %lu",
586 m_page_size, UNIV_PAGE_SIZE);
587
588 return(DB_CORRUPTION);
589
590 } else if ((file_size % m_page_size)) {
591
592 ib_logf(IB_LOG_LEVEL_ERROR,
593 "File size " UINT64PF " is not a multiple "
594 "of the page size %lu",
595 (ib_uint64_t) file_size, (ulong) m_page_size);
596
597 return(DB_CORRUPTION);
598 }
599
600 ut_a(m_space == ULINT_UNDEFINED);
601
602 m_size = mach_read_from_4(page + FSP_SIZE);
603 m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
604 m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
605
606 if ((err = set_current_xdes(0, page)) != DB_SUCCESS) {
607 return(err);
608 }
609
610 return(DB_SUCCESS);
611 }
612
613 /**
614 Try and determine the index root pages by checking if the next/prev
615 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
616 struct FetchIndexRootPages : public AbstractCallback {
617
618 /** Index information gathered from the .ibd file. */
619 struct Index {
620
IndexFetchIndexRootPages::Index621 Index(index_id_t id, ulint page_no)
622 :
623 m_id(id),
624 m_page_no(page_no) { }
625
626 index_id_t m_id; /*!< Index id */
627 ulint m_page_no; /*!< Root page number */
628 };
629
630 typedef std::vector<Index> Indexes;
631
632 /** Constructor
633 @param trx - covering (user) transaction
634 @param table - table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages635 FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
636 :
637 AbstractCallback(trx),
638 m_table(table) UNIV_NOTHROW { }
639
640 /** Destructor */
~FetchIndexRootPagesFetchIndexRootPages641 virtual ~FetchIndexRootPages() UNIV_NOTHROW { }
642
643 /**
644 @retval the space id of the tablespace being iterated over */
get_space_idFetchIndexRootPages645 virtual ulint get_space_id() const UNIV_NOTHROW
646 {
647 return(m_space);
648 }
649
650 /**
651 @retval the space flags of the tablespace being iterated over */
get_space_flagsFetchIndexRootPages652 virtual ulint get_space_flags() const UNIV_NOTHROW
653 {
654 return(m_space_flags);
655 }
656
657 /**
658 Check if the .ibd file row format is the same as the table's.
659 @param ibd_table_flags - determined from space and page.
660 @return DB_SUCCESS or error code. */
check_row_formatFetchIndexRootPages661 dberr_t check_row_format(ulint ibd_table_flags) UNIV_NOTHROW
662 {
663 dberr_t err;
664 rec_format_t ibd_rec_format;
665 rec_format_t table_rec_format;
666
667 if (!dict_tf_is_valid(ibd_table_flags)) {
668
669 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
670 ER_TABLE_SCHEMA_MISMATCH,
671 ".ibd file has invlad table flags: %lx",
672 ibd_table_flags);
673
674 return(DB_CORRUPTION);
675 }
676
677 ibd_rec_format = dict_tf_get_rec_format(ibd_table_flags);
678 table_rec_format = dict_tf_get_rec_format(m_table->flags);
679
680 if (table_rec_format != ibd_rec_format) {
681
682 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
683 ER_TABLE_SCHEMA_MISMATCH,
684 "Table has %s row format, .ibd "
685 "file has %s row format.",
686 dict_tf_to_row_format_string(m_table->flags),
687 dict_tf_to_row_format_string(ibd_table_flags));
688
689 err = DB_CORRUPTION;
690 } else {
691 err = DB_SUCCESS;
692 }
693
694 return(err);
695 }
696
697 /**
698 Called for each block as it is read from the file.
699 @param offset - physical offset in the file
700 @param block - block to convert, it is not from the buffer pool.
701 @retval DB_SUCCESS or error code. */
702 virtual dberr_t operator() (
703 os_offset_t offset,
704 buf_block_t* block) UNIV_NOTHROW;
705
706 /** Update the import configuration that will be used to import
707 the tablespace. */
708 dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
709
710 /** Table definition in server. */
711 const dict_table_t* m_table;
712
713 /** Index information */
714 Indexes m_indexes;
715 };
716
717 /**
718 Called for each block as it is read from the file. Check index pages to
719 determine the exact row format. We can't get that from the tablespace
720 header flags alone.
721
722 @param offset - physical offset in the file
723 @param block - block to convert, it is not from the buffer pool.
724 @retval DB_SUCCESS or error code. */
725 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)726 FetchIndexRootPages::operator() (
727 os_offset_t offset,
728 buf_block_t* block) UNIV_NOTHROW
729 {
730 dberr_t err;
731
732 if ((err = periodic_check()) != DB_SUCCESS) {
733 return(err);
734 }
735
736 const page_t* page = get_frame(block);
737
738 ulint page_type = fil_page_get_type(page);
739
740 if (block->page.offset * m_page_size != offset) {
741 ib_logf(IB_LOG_LEVEL_ERROR,
742 "Page offset doesn't match file offset: "
743 "page offset: %lu, file offset: %lu",
744 (ulint) block->page.offset,
745 (ulint) (offset / m_page_size));
746
747 err = DB_CORRUPTION;
748 } else if (page_type == FIL_PAGE_TYPE_XDES) {
749 err = set_current_xdes(block->page.offset, page);
750 } else if (page_type == FIL_PAGE_INDEX
751 && !is_free(block->page.offset)
752 && is_root_page(page)) {
753
754 index_id_t id = btr_page_get_index_id(page);
755 ulint page_no = buf_block_get_page_no(block);
756
757 m_indexes.push_back(Index(id, page_no));
758
759 if (m_indexes.size() == 1) {
760
761 m_table_flags = dict_sys_tables_type_to_tf(
762 m_space_flags,
763 page_is_comp(page) ? DICT_N_COLS_COMPACT : 0);
764
765 err = check_row_format(m_table_flags);
766 }
767 }
768
769 return(err);
770 }
771
772 /**
773 Update the import configuration that will be used to import the tablespace.
774 @return error code or DB_SUCCESS */
775 dberr_t
build_row_import(row_import * cfg) const776 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
777 {
778 Indexes::const_iterator end = m_indexes.end();
779
780 ut_a(cfg->m_table == m_table);
781 cfg->m_page_size = m_page_size;
782 cfg->m_n_indexes = m_indexes.size();
783
784 if (cfg->m_n_indexes == 0) {
785
786 ib_logf(IB_LOG_LEVEL_ERROR, "No B+Tree found in tablespace");
787
788 return(DB_CORRUPTION);
789 }
790
791 cfg->m_indexes = new(std::nothrow) row_index_t[cfg->m_n_indexes];
792
793 /* Trigger OOM */
794 DBUG_EXECUTE_IF("ib_import_OOM_11",
795 delete [] cfg->m_indexes; cfg->m_indexes = 0;);
796
797 if (cfg->m_indexes == 0) {
798 return(DB_OUT_OF_MEMORY);
799 }
800
801 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
802
803 row_index_t* cfg_index = cfg->m_indexes;
804
805 for (Indexes::const_iterator it = m_indexes.begin();
806 it != end;
807 ++it, ++cfg_index) {
808
809 char name[BUFSIZ];
810
811 ut_snprintf(name, sizeof(name), "index" IB_ID_FMT, it->m_id);
812
813 ulint len = strlen(name) + 1;
814
815 cfg_index->m_name = new(std::nothrow) byte[len];
816
817 /* Trigger OOM */
818 DBUG_EXECUTE_IF("ib_import_OOM_12",
819 delete [] cfg_index->m_name;
820 cfg_index->m_name = 0;);
821
822 if (cfg_index->m_name == 0) {
823 return(DB_OUT_OF_MEMORY);
824 }
825
826 memcpy(cfg_index->m_name, name, len);
827
828 cfg_index->m_id = it->m_id;
829
830 cfg_index->m_space = m_space;
831
832 cfg_index->m_page_no = it->m_page_no;
833 }
834
835 return(DB_SUCCESS);
836 }
837
838 /* Functor that is called for each physical page that is read from the
839 tablespace file.
840
841 1. Check each page for corruption.
842
843 2. Update the space id and LSN on every page
844 * For the header page
845 - Validate the flags
846 - Update the LSN
847
848 3. On Btree pages
849 * Set the index id
850 * Update the max trx id
851 * In a cluster index, update the system columns
852 * In a cluster index, update the BLOB ptr, set the space id
853 * Purge delete marked records, but only if they can be easily
854 removed from the page
855 * Keep a counter of number of rows, ie. non-delete-marked rows
856 * Keep a counter of number of delete marked rows
857 * Keep a counter of number of purge failure
858 * If a page is stamped with an index id that isn't in the .cfg file
859 we assume it is deleted and the page can be ignored.
860
861 4. Set the page state to dirty so that it will be written to disk.
862 */
863 class PageConverter : public AbstractCallback {
864 public:
865 /** Constructor
866 * @param cfg - config of table being imported.
867 * @param trx - transaction covering the import */
868 PageConverter(row_import* cfg, trx_t* trx) UNIV_NOTHROW;
869
~PageConverter()870 virtual ~PageConverter() UNIV_NOTHROW
871 {
872 if (m_heap != 0) {
873 mem_heap_free(m_heap);
874 }
875 }
876
877 /**
878 @retval the server space id of the tablespace being iterated over */
get_space_id() const879 virtual ulint get_space_id() const UNIV_NOTHROW
880 {
881 return(m_cfg->m_table->space);
882 }
883
884 /**
885 Called for each block as it is read from the file.
886 @param offset - physical offset in the file
887 @param block - block to convert, it is not from the buffer pool.
888 @retval DB_SUCCESS or error code. */
889 virtual dberr_t operator() (
890 os_offset_t offset,
891 buf_block_t* block) UNIV_NOTHROW;
892 private:
893
894 /** Status returned by PageConverter::validate() */
895 enum import_page_status_t {
896 IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
897 IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
898 IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
899 };
900
901 /**
902 Update the page, set the space id, max trx id and index id.
903 @param block - block read from file
904 @param page_type - type of the page
905 @retval DB_SUCCESS or error code */
906 dberr_t update_page(
907 buf_block_t* block,
908 ulint& page_type) UNIV_NOTHROW;
909
910 #if defined UNIV_DEBUG
911 /**
912 @return true error condition is enabled. */
trigger_corruption()913 bool trigger_corruption() UNIV_NOTHROW
914 {
915 return(false);
916 }
917 #else
918 #define trigger_corruption() (false)
919 #endif /* UNIV_DEBUG */
920
921 /**
922 Update the space, index id, trx id.
923 @param block - block to convert
924 @return DB_SUCCESS or error code */
925 dberr_t update_index_page(buf_block_t* block) UNIV_NOTHROW;
926
927 /** Update the BLOB refrences and write UNDO log entries for
928 rows that can't be purged optimistically.
929 @param block - block to update
930 @retval DB_SUCCESS or error code */
931 dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
932
933 /**
934 Validate the page, check for corruption.
935 @param offset - physical offset within file.
936 @param page - page read from file.
937 @return 0 on success, 1 if all zero, 2 if corrupted */
938 import_page_status_t validate(
939 os_offset_t offset,
940 buf_block_t* page) UNIV_NOTHROW;
941
942 /**
943 Validate the space flags and update tablespace header page.
944 @param block - block read from file, not from the buffer pool.
945 @retval DB_SUCCESS or error code */
946 dberr_t update_header(buf_block_t* block) UNIV_NOTHROW;
947
948 /**
949 Adjust the BLOB reference for a single column that is externally stored
950 @param rec - record to update
951 @param offsets - column offsets for the record
952 @param i - column ordinal value
953 @return DB_SUCCESS or error code */
954 dberr_t adjust_cluster_index_blob_column(
955 rec_t* rec,
956 const ulint* offsets,
957 ulint i) UNIV_NOTHROW;
958
959 /**
960 Adjusts the BLOB reference in the clustered index row for all
961 externally stored columns.
962 @param rec - record to update
963 @param offsets - column offsets for the record
964 @return DB_SUCCESS or error code */
965 dberr_t adjust_cluster_index_blob_columns(
966 rec_t* rec,
967 const ulint* offsets) UNIV_NOTHROW;
968
969 /**
970 In the clustered index, adjist the BLOB pointers as needed.
971 Also update the BLOB reference, write the new space id.
972 @param rec - record to update
973 @param offsets - column offsets for the record
974 @return DB_SUCCESS or error code */
975 dberr_t adjust_cluster_index_blob_ref(
976 rec_t* rec,
977 const ulint* offsets) UNIV_NOTHROW;
978
979 /**
980 Purge delete-marked records, only if it is possible to do
981 so without re-organising the B+tree.
982 @param offsets - current row offsets.
983 @retval true if purged */
984 bool purge(const ulint* offsets) UNIV_NOTHROW;
985
986 /**
987 Adjust the BLOB references and sys fields for the current record.
988 @param index - the index being converted
989 @param rec - record to update
990 @param offsets - column offsets for the record
991 @param deleted - true if row is delete marked
992 @return DB_SUCCESS or error code. */
993 dberr_t adjust_cluster_record(
994 const dict_index_t* index,
995 rec_t* rec,
996 const ulint* offsets,
997 bool deleted) UNIV_NOTHROW;
998
999 /**
1000 Find an index with the matching id.
1001 @return row_index_t* instance or 0 */
find_index(index_id_t id)1002 row_index_t* find_index(index_id_t id) UNIV_NOTHROW
1003 {
1004 row_index_t* index = &m_cfg->m_indexes[0];
1005
1006 for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
1007 if (id == index->m_id) {
1008 return(index);
1009 }
1010 }
1011
1012 return(0);
1013
1014 }
1015 private:
1016 /** Config for table that is being imported. */
1017 row_import* m_cfg;
1018
1019 /** Current index whose pages are being imported */
1020 row_index_t* m_index;
1021
1022 /** Current system LSN */
1023 lsn_t m_current_lsn;
1024
1025 /** Alias for m_page_zip, only set for compressed pages. */
1026 page_zip_des_t* m_page_zip_ptr;
1027
1028 /** Iterator over records in a block */
1029 RecIterator m_rec_iter;
1030
1031 /** Record offset */
1032 ulint m_offsets_[REC_OFFS_NORMAL_SIZE];
1033
1034 /** Pointer to m_offsets_ */
1035 ulint* m_offsets;
1036
1037 /** Memory heap for the record offsets */
1038 mem_heap_t* m_heap;
1039
1040 /** Cluster index instance */
1041 dict_index_t* m_cluster_index;
1042 };
1043
1044 /**
1045 row_import destructor. */
~row_import()1046 row_import::~row_import() UNIV_NOTHROW
1047 {
1048 for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
1049 delete [] m_indexes[i].m_name;
1050
1051 if (m_indexes[i].m_fields == 0) {
1052 continue;
1053 }
1054
1055 dict_field_t* fields = m_indexes[i].m_fields;
1056 ulint n_fields = m_indexes[i].m_n_fields;
1057
1058 for (ulint j = 0; j < n_fields; ++j) {
1059 delete [] fields[j].name;
1060 }
1061
1062 delete [] fields;
1063 }
1064
1065 for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
1066 delete [] m_col_names[i];
1067 }
1068
1069 delete [] m_cols;
1070 delete [] m_indexes;
1071 delete [] m_col_names;
1072 delete [] m_table_name;
1073 delete [] m_hostname;
1074 }
1075
1076 /**
1077 Find the index entry in in the indexes array.
1078 @param name - index name
1079 @return instance if found else 0. */
1080 row_index_t*
get_index(const char * name) const1081 row_import::get_index(
1082 const char* name) const UNIV_NOTHROW
1083 {
1084 for (ulint i = 0; i < m_n_indexes; ++i) {
1085 const char* index_name;
1086 row_index_t* index = &m_indexes[i];
1087
1088 index_name = reinterpret_cast<const char*>(index->m_name);
1089
1090 if (strcmp(index_name, name) == 0) {
1091
1092 return(index);
1093 }
1094 }
1095
1096 return(0);
1097 }
1098
1099 /**
1100 Get the number of rows in the index.
1101 @param name - index name
1102 @return number of rows (doesn't include delete marked rows). */
1103 ulint
get_n_rows(const char * name) const1104 row_import::get_n_rows(
1105 const char* name) const UNIV_NOTHROW
1106 {
1107 const row_index_t* index = get_index(name);
1108
1109 ut_a(name != 0);
1110
1111 return(index->m_stats.m_n_rows);
1112 }
1113
1114 /**
1115 Get the number of rows for which purge failed uding the convert phase.
1116 @param name - index name
1117 @return number of rows for which purge failed. */
1118 ulint
get_n_purge_failed(const char * name) const1119 row_import::get_n_purge_failed(
1120 const char* name) const UNIV_NOTHROW
1121 {
1122 const row_index_t* index = get_index(name);
1123
1124 ut_a(name != 0);
1125
1126 return(index->m_stats.m_n_purge_failed);
1127 }
1128
1129 /**
1130 Find the ordinal value of the column name in the cfg table columns.
1131 @param name - of column to look for.
1132 @return ULINT_UNDEFINED if not found. */
1133 ulint
find_col(const char * name) const1134 row_import::find_col(
1135 const char* name) const UNIV_NOTHROW
1136 {
1137 for (ulint i = 0; i < m_n_cols; ++i) {
1138 const char* col_name;
1139
1140 col_name = reinterpret_cast<const char*>(m_col_names[i]);
1141
1142 if (strcmp(col_name, name) == 0) {
1143 return(i);
1144 }
1145 }
1146
1147 return(ULINT_UNDEFINED);
1148 }
1149
1150 /**
1151 Check if the index schema that was read from the .cfg file matches the
1152 in memory index definition.
1153 @return DB_SUCCESS or error code. */
1154 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1155 row_import::match_index_columns(
1156 THD* thd,
1157 const dict_index_t* index) UNIV_NOTHROW
1158 {
1159 row_index_t* cfg_index;
1160 dberr_t err = DB_SUCCESS;
1161
1162 cfg_index = get_index(index->name);
1163
1164 if (cfg_index == 0) {
1165 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1166 ER_TABLE_SCHEMA_MISMATCH,
1167 "Index %s not found in tablespace meta-data file.",
1168 index->name);
1169
1170 return(DB_ERROR);
1171 }
1172
1173 if (cfg_index->m_n_fields != index->n_fields) {
1174
1175 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1176 ER_TABLE_SCHEMA_MISMATCH,
1177 "Index field count %lu doesn't match"
1178 " tablespace metadata file value %lu",
1179 (ulong) index->n_fields,
1180 (ulong) cfg_index->m_n_fields);
1181
1182 return(DB_ERROR);
1183 }
1184
1185 cfg_index->m_srv_index = index;
1186
1187 const dict_field_t* field = index->fields;
1188 const dict_field_t* cfg_field = cfg_index->m_fields;
1189
1190 for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1191
1192 if (strcmp(field->name, cfg_field->name) != 0) {
1193 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1194 ER_TABLE_SCHEMA_MISMATCH,
1195 "Index field name %s doesn't match"
1196 " tablespace metadata field name %s"
1197 " for field position %lu",
1198 field->name, cfg_field->name, (ulong) i);
1199
1200 err = DB_ERROR;
1201 }
1202
1203 if (cfg_field->prefix_len != field->prefix_len) {
1204 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1205 ER_TABLE_SCHEMA_MISMATCH,
1206 "Index %s field %s prefix len %lu"
1207 " doesn't match metadata file value"
1208 " %lu",
1209 index->name, field->name,
1210 (ulong) field->prefix_len,
1211 (ulong) cfg_field->prefix_len);
1212
1213 err = DB_ERROR;
1214 }
1215
1216 if (cfg_field->fixed_len != field->fixed_len) {
1217 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1218 ER_TABLE_SCHEMA_MISMATCH,
1219 "Index %s field %s fixed len %lu"
1220 " doesn't match metadata file value"
1221 " %lu",
1222 index->name, field->name,
1223 (ulong) field->fixed_len,
1224 (ulong) cfg_field->fixed_len);
1225
1226 err = DB_ERROR;
1227 }
1228 }
1229
1230 return(err);
1231 }
1232
1233 /**
1234 Check if the table schema that was read from the .cfg file matches the
1235 in memory table definition.
1236 @param thd - MySQL session variable
1237 @return DB_SUCCESS or error code. */
1238 dberr_t
match_table_columns(THD * thd)1239 row_import::match_table_columns(
1240 THD* thd) UNIV_NOTHROW
1241 {
1242 dberr_t err = DB_SUCCESS;
1243 const dict_col_t* col = m_table->cols;
1244
1245 for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1246
1247 const char* col_name;
1248 ulint cfg_col_index;
1249
1250 col_name = dict_table_get_col_name(
1251 m_table, dict_col_get_no(col));
1252
1253 cfg_col_index = find_col(col_name);
1254
1255 if (cfg_col_index == ULINT_UNDEFINED) {
1256
1257 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1258 ER_TABLE_SCHEMA_MISMATCH,
1259 "Column %s not found in tablespace.",
1260 col_name);
1261
1262 err = DB_ERROR;
1263 } else if (cfg_col_index != col->ind) {
1264
1265 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1266 ER_TABLE_SCHEMA_MISMATCH,
1267 "Column %s ordinal value mismatch, it's at "
1268 "%lu in the table and %lu in the tablespace "
1269 "meta-data file",
1270 col_name,
1271 (ulong) col->ind, (ulong) cfg_col_index);
1272
1273 err = DB_ERROR;
1274 } else {
1275 const dict_col_t* cfg_col;
1276
1277 cfg_col = &m_cols[cfg_col_index];
1278 ut_a(cfg_col->ind == cfg_col_index);
1279
1280 if (cfg_col->prtype != col->prtype) {
1281 ib_errf(thd,
1282 IB_LOG_LEVEL_ERROR,
1283 ER_TABLE_SCHEMA_MISMATCH,
1284 "Column %s precise type mismatch.",
1285 col_name);
1286 err = DB_ERROR;
1287 }
1288
1289 if (cfg_col->mtype != col->mtype) {
1290 ib_errf(thd,
1291 IB_LOG_LEVEL_ERROR,
1292 ER_TABLE_SCHEMA_MISMATCH,
1293 "Column %s main type mismatch.",
1294 col_name);
1295 err = DB_ERROR;
1296 }
1297
1298 if (cfg_col->len != col->len) {
1299 ib_errf(thd,
1300 IB_LOG_LEVEL_ERROR,
1301 ER_TABLE_SCHEMA_MISMATCH,
1302 "Column %s length mismatch.",
1303 col_name);
1304 err = DB_ERROR;
1305 }
1306
1307 if (cfg_col->mbminmaxlen != col->mbminmaxlen) {
1308 ib_errf(thd,
1309 IB_LOG_LEVEL_ERROR,
1310 ER_TABLE_SCHEMA_MISMATCH,
1311 "Column %s multi-byte len mismatch.",
1312 col_name);
1313 err = DB_ERROR;
1314 }
1315
1316 if (cfg_col->ind != col->ind) {
1317 err = DB_ERROR;
1318 }
1319
1320 if (cfg_col->ord_part != col->ord_part) {
1321 ib_errf(thd,
1322 IB_LOG_LEVEL_ERROR,
1323 ER_TABLE_SCHEMA_MISMATCH,
1324 "Column %s ordering mismatch.",
1325 col_name);
1326 err = DB_ERROR;
1327 }
1328
1329 if (cfg_col->max_prefix != col->max_prefix) {
1330 ib_errf(thd,
1331 IB_LOG_LEVEL_ERROR,
1332 ER_TABLE_SCHEMA_MISMATCH,
1333 "Column %s max prefix mismatch.",
1334 col_name);
1335 err = DB_ERROR;
1336 }
1337 }
1338 }
1339
1340 return(err);
1341 }
1342
1343 /**
1344 Check if the table (and index) schema that was read from the .cfg file
1345 matches the in memory table definition.
1346 @param thd - MySQL session variable
1347 @return DB_SUCCESS or error code. */
1348 dberr_t
match_schema(THD * thd)1349 row_import::match_schema(
1350 THD* thd) UNIV_NOTHROW
1351 {
1352 /* Do some simple checks. */
1353
1354 if (m_flags != m_table->flags) {
1355 if (dict_tf_to_row_format_string(m_flags) !=
1356 dict_tf_to_row_format_string(m_table->flags)) {
1357 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1358 ER_TABLE_SCHEMA_MISMATCH,
1359 "Table flags don't match,"
1360 "server table has %s "
1361 "and the meta-data file has %s",
1362 dict_tf_to_row_format_string(m_table->flags),
1363 dict_tf_to_row_format_string(m_flags));
1364 } else if (DICT_TF_HAS_DATA_DIR(m_flags) !=
1365 DICT_TF_HAS_DATA_DIR(m_table->flags)) {
1366 /* If the meta-data flag is set for data_dir,
1367 but table flag is not set for data_dir or vice versa
1368 then return error. */
1369 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1370 ER_TABLE_SCHEMA_MISMATCH,
1371 "Table location flags do not match. "
1372 "The source table %s a DATA DIRECTORY "
1373 "but the destination table %s.",
1374 (DICT_TF_HAS_DATA_DIR(m_flags) ? "uses"
1375 : "does not use"),
1376 (DICT_TF_HAS_DATA_DIR(m_table->flags) ? "does"
1377 : "does not"));
1378 } else {
1379 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1380 ER_TABLE_SCHEMA_MISMATCH,
1381 "Table flags don't match");
1382 }
1383 return(DB_ERROR);
1384 } else if (m_table->n_cols != m_n_cols) {
1385 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1386 "Number of columns don't match, table has %lu "
1387 "columns but the tablespace meta-data file has "
1388 "%lu columns",
1389 (ulong) m_table->n_cols, (ulong) m_n_cols);
1390
1391 return(DB_ERROR);
1392 } else if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1393
1394 /* If the number of indexes don't match then it is better
1395 to abort the IMPORT. It is easy for the user to create a
1396 table matching the IMPORT definition. */
1397
1398 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1399 "Number of indexes don't match, table has %lu "
1400 "indexes but the tablespace meta-data file has "
1401 "%lu indexes",
1402 (ulong) UT_LIST_GET_LEN(m_table->indexes),
1403 (ulong) m_n_indexes);
1404
1405 return(DB_ERROR);
1406 }
1407
1408 dberr_t err = match_table_columns(thd);
1409
1410 if (err != DB_SUCCESS) {
1411 return(err);
1412 }
1413
1414 /* Check if the index definitions match. */
1415
1416 const dict_index_t* index;
1417
1418 for (index = UT_LIST_GET_FIRST(m_table->indexes);
1419 index != 0;
1420 index = UT_LIST_GET_NEXT(indexes, index)) {
1421
1422 dberr_t index_err;
1423
1424 index_err = match_index_columns(thd, index);
1425
1426 if (index_err != DB_SUCCESS) {
1427 err = index_err;
1428 }
1429 }
1430
1431 return(err);
1432 }
1433
1434 /**
1435 Set the index root <space, pageno>, using index name. */
1436 void
set_root_by_name()1437 row_import::set_root_by_name() UNIV_NOTHROW
1438 {
1439 row_index_t* cfg_index = m_indexes;
1440
1441 for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1442 dict_index_t* index;
1443
1444 const char* index_name;
1445
1446 index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1447
1448 index = dict_table_get_index_on_name(m_table, index_name);
1449
1450 /* We've already checked that it exists. */
1451 ut_a(index != 0);
1452
1453 /* Set the root page number and space id. */
1454 index->space = m_table->space;
1455 index->page = cfg_index->m_page_no;
1456 }
1457 }
1458
1459 /**
1460 Set the index root <space, pageno>, using a heuristic.
1461 @return DB_SUCCESS or error code */
1462 dberr_t
set_root_by_heuristic()1463 row_import::set_root_by_heuristic() UNIV_NOTHROW
1464 {
1465 row_index_t* cfg_index = m_indexes;
1466
1467 ut_a(m_n_indexes > 0);
1468
1469 // TODO: For now use brute force, based on ordinality
1470
1471 if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1472
1473 char table_name[MAX_FULL_NAME_LEN + 1];
1474
1475 innobase_format_name(
1476 table_name, sizeof(table_name), m_table->name, FALSE);
1477
1478 ib_logf(IB_LOG_LEVEL_WARN,
1479 "Table %s should have %lu indexes but the tablespace "
1480 "has %lu indexes",
1481 table_name,
1482 UT_LIST_GET_LEN(m_table->indexes),
1483 m_n_indexes);
1484 }
1485
1486 dict_mutex_enter_for_mysql();
1487
1488 ulint i = 0;
1489 dberr_t err = DB_SUCCESS;
1490
1491 for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1492 index != 0;
1493 index = UT_LIST_GET_NEXT(indexes, index)) {
1494
1495 if (index->type & DICT_FTS) {
1496 index->type |= DICT_CORRUPT;
1497 ib_logf(IB_LOG_LEVEL_WARN,
1498 "Skipping FTS index: %s", index->name);
1499 } else if (i < m_n_indexes) {
1500
1501 delete [] cfg_index[i].m_name;
1502
1503 ulint len = strlen(index->name) + 1;
1504
1505 cfg_index[i].m_name = new(std::nothrow) byte[len];
1506
1507 /* Trigger OOM */
1508 DBUG_EXECUTE_IF("ib_import_OOM_14",
1509 delete[] cfg_index[i].m_name;
1510 cfg_index[i].m_name = 0;);
1511
1512 if (cfg_index[i].m_name == 0) {
1513 err = DB_OUT_OF_MEMORY;
1514 break;
1515 }
1516
1517 memcpy(cfg_index[i].m_name, index->name, len);
1518
1519 cfg_index[i].m_srv_index = index;
1520
1521 index->space = m_table->space;
1522 index->page = cfg_index[i].m_page_no;
1523
1524 ++i;
1525 }
1526 }
1527
1528 dict_mutex_exit_for_mysql();
1529
1530 return(err);
1531 }
1532
1533 /**
1534 Purge delete marked records.
1535 @return DB_SUCCESS or error code. */
1536 dberr_t
garbage_collect()1537 IndexPurge::garbage_collect() UNIV_NOTHROW
1538 {
1539 dberr_t err;
1540 ibool comp = dict_table_is_comp(m_index->table);
1541
1542 /* Open the persistent cursor and start the mini-transaction. */
1543
1544 open();
1545
1546 while ((err = next()) == DB_SUCCESS) {
1547
1548 rec_t* rec = btr_pcur_get_rec(&m_pcur);
1549 ibool deleted = rec_get_deleted_flag(rec, comp);
1550
1551 if (!deleted) {
1552 ++m_n_rows;
1553 } else {
1554 purge();
1555 }
1556 }
1557
1558 /* Close the persistent cursor and commit the mini-transaction. */
1559
1560 close();
1561
1562 return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1563 }
1564
1565 /**
1566 Begin import, position the cursor on the first record. */
1567 void
open()1568 IndexPurge::open() UNIV_NOTHROW
1569 {
1570 mtr_start(&m_mtr);
1571
1572 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1573
1574 btr_pcur_open_at_index_side(
1575 true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1576 }
1577
1578 /**
1579 Close the persistent curosr and commit the mini-transaction. */
1580 void
close()1581 IndexPurge::close() UNIV_NOTHROW
1582 {
1583 btr_pcur_close(&m_pcur);
1584 mtr_commit(&m_mtr);
1585 }
1586
1587 /**
1588 Position the cursor on the next record.
1589 @return DB_SUCCESS or error code */
1590 dberr_t
next()1591 IndexPurge::next() UNIV_NOTHROW
1592 {
1593 btr_pcur_move_to_next_on_page(&m_pcur);
1594
1595 /* When switching pages, commit the mini-transaction
1596 in order to release the latch on the old page. */
1597
1598 if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1599 return(DB_SUCCESS);
1600 } else if (trx_is_interrupted(m_trx)) {
1601 /* Check after every page because the check
1602 is expensive. */
1603 return(DB_INTERRUPTED);
1604 }
1605
1606 btr_pcur_store_position(&m_pcur, &m_mtr);
1607
1608 mtr_commit(&m_mtr);
1609
1610 mtr_start(&m_mtr);
1611
1612 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1613
1614 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1615
1616 if (!btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr)) {
1617
1618 return(DB_END_OF_INDEX);
1619 }
1620
1621 return(DB_SUCCESS);
1622 }
1623
1624 /**
1625 Store the persistent cursor position and reopen the
1626 B-tree cursor in BTR_MODIFY_TREE mode, because the
1627 tree structure may be changed during a pessimistic delete. */
1628 void
purge_pessimistic_delete()1629 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1630 {
1631 dberr_t err;
1632
1633 btr_pcur_restore_position(BTR_MODIFY_TREE, &m_pcur, &m_mtr);
1634
1635 ut_ad(rec_get_deleted_flag(
1636 btr_pcur_get_rec(&m_pcur),
1637 dict_table_is_comp(m_index->table)));
1638
1639 btr_cur_pessimistic_delete(
1640 &err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, RB_NONE, &m_mtr);
1641
1642 ut_a(err == DB_SUCCESS);
1643
1644 /* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1645 mtr_commit(&m_mtr);
1646 }
1647
1648 /**
1649 Purge delete-marked records. */
1650 void
purge()1651 IndexPurge::purge() UNIV_NOTHROW
1652 {
1653 btr_pcur_store_position(&m_pcur, &m_mtr);
1654
1655 purge_pessimistic_delete();
1656
1657 mtr_start(&m_mtr);
1658
1659 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1660
1661 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1662 }
1663
1664 /**
1665 Constructor
1666 * @param cfg - config of table being imported.
1667 * @param trx - transaction covering the import */
PageConverter(row_import * cfg,trx_t * trx)1668 PageConverter::PageConverter(
1669 row_import* cfg,
1670 trx_t* trx)
1671 :
1672 AbstractCallback(trx),
1673 m_cfg(cfg),
1674 m_page_zip_ptr(0),
1675 m_heap(0) UNIV_NOTHROW
1676 {
1677 m_index = m_cfg->m_indexes;
1678
1679 m_current_lsn = log_get_lsn();
1680 ut_a(m_current_lsn > 0);
1681
1682 m_offsets = m_offsets_;
1683 rec_offs_init(m_offsets_);
1684
1685 m_cluster_index = dict_table_get_first_index(m_cfg->m_table);
1686 }
1687
1688 /**
1689 Adjust the BLOB reference for a single column that is externally stored
1690 @param rec - record to update
1691 @param offsets - column offsets for the record
1692 @param i - column ordinal value
1693 @return DB_SUCCESS or error code */
1694 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const ulint * offsets,ulint i)1695 PageConverter::adjust_cluster_index_blob_column(
1696 rec_t* rec,
1697 const ulint* offsets,
1698 ulint i) UNIV_NOTHROW
1699 {
1700 ulint len;
1701 byte* field;
1702
1703 field = rec_get_nth_field(rec, offsets, i, &len);
1704
1705 DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1706 len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1707
1708 if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1709
1710 char index_name[MAX_FULL_NAME_LEN + 1];
1711
1712 innobase_format_name(
1713 index_name, sizeof(index_name),
1714 m_cluster_index->name, TRUE);
1715
1716 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1717 ER_INNODB_INDEX_CORRUPT,
1718 "Externally stored column(%lu) has a reference "
1719 "length of %lu in the cluster index %s",
1720 (ulong) i, (ulong) len, index_name);
1721
1722 return(DB_CORRUPTION);
1723 }
1724
1725 field += BTR_EXTERN_SPACE_ID - BTR_EXTERN_FIELD_REF_SIZE + len;
1726
1727 if (is_compressed_table()) {
1728 mach_write_to_4(field, get_space_id());
1729
1730 page_zip_write_blob_ptr(
1731 m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1732 } else {
1733 mlog_write_ulint(field, get_space_id(), MLOG_4BYTES, 0);
1734 }
1735
1736 return(DB_SUCCESS);
1737 }
1738
1739 /**
1740 Adjusts the BLOB reference in the clustered index row for all externally
1741 stored columns.
1742 @param rec - record to update
1743 @param offsets - column offsets for the record
1744 @return DB_SUCCESS or error code */
1745 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const ulint * offsets)1746 PageConverter::adjust_cluster_index_blob_columns(
1747 rec_t* rec,
1748 const ulint* offsets) UNIV_NOTHROW
1749 {
1750 ut_ad(rec_offs_any_extern(offsets));
1751
1752 /* Adjust the space_id in the BLOB pointers. */
1753
1754 for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1755
1756 /* Only if the column is stored "externally". */
1757
1758 if (rec_offs_nth_extern(offsets, i)) {
1759 dberr_t err;
1760
1761 err = adjust_cluster_index_blob_column(rec, offsets, i);
1762
1763 if (err != DB_SUCCESS) {
1764 return(err);
1765 }
1766 }
1767 }
1768
1769 return(DB_SUCCESS);
1770 }
1771
1772 /**
1773 In the clustered index, adjust BLOB pointers as needed. Also update the
1774 BLOB reference, write the new space id.
1775 @param rec - record to update
1776 @param offsets - column offsets for the record
1777 @return DB_SUCCESS or error code */
1778 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const ulint * offsets)1779 PageConverter::adjust_cluster_index_blob_ref(
1780 rec_t* rec,
1781 const ulint* offsets) UNIV_NOTHROW
1782 {
1783 if (rec_offs_any_extern(offsets)) {
1784 dberr_t err;
1785
1786 err = adjust_cluster_index_blob_columns(rec, offsets);
1787
1788 if (err != DB_SUCCESS) {
1789 return(err);
1790 }
1791 }
1792
1793 return(DB_SUCCESS);
1794 }
1795
1796 /**
1797 Purge delete-marked records, only if it is possible to do so without
1798 re-organising the B+tree.
1799 @param offsets - current row offsets.
1800 @return true if purge succeeded */
1801 bool
purge(const ulint * offsets)1802 PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
1803 {
1804 const dict_index_t* index = m_index->m_srv_index;
1805
1806 /* We can't have a page that is empty and not root. */
1807 if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1808
1809 ++m_index->m_stats.m_n_purged;
1810
1811 return(true);
1812 } else {
1813 ++m_index->m_stats.m_n_purge_failed;
1814 }
1815
1816 return(false);
1817 }
1818
1819 /**
1820 Adjust the BLOB references and sys fields for the current record.
1821 @param rec - record to update
1822 @param offsets - column offsets for the record
1823 @param deleted - true if row is delete marked
1824 @return DB_SUCCESS or error code. */
1825 dberr_t
adjust_cluster_record(const dict_index_t * index,rec_t * rec,const ulint * offsets,bool deleted)1826 PageConverter::adjust_cluster_record(
1827 const dict_index_t* index,
1828 rec_t* rec,
1829 const ulint* offsets,
1830 bool deleted) UNIV_NOTHROW
1831 {
1832 dberr_t err;
1833
1834 if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1835
1836 /* Reset DB_TRX_ID and DB_ROLL_PTR. Normally, these fields
1837 are only written in conjunction with other changes to the
1838 record. */
1839
1840 row_upd_rec_sys_fields(
1841 rec, m_page_zip_ptr, m_cluster_index, m_offsets,
1842 m_trx, 0);
1843 }
1844
1845 return(err);
1846 }
1847
1848 /**
1849 Update the BLOB refrences and write UNDO log entries for
1850 rows that can't be purged optimistically.
1851 @param block - block to update
1852 @retval DB_SUCCESS or error code */
1853 dberr_t
update_records(buf_block_t * block)1854 PageConverter::update_records(
1855 buf_block_t* block) UNIV_NOTHROW
1856 {
1857 ibool comp = dict_table_is_comp(m_cfg->m_table);
1858 bool clust_index = m_index->m_srv_index == m_cluster_index;
1859
1860 /* This will also position the cursor on the first user record. */
1861
1862 m_rec_iter.open(block);
1863
1864 while (!m_rec_iter.end()) {
1865
1866 rec_t* rec = m_rec_iter.current();
1867
1868 ibool deleted = rec_get_deleted_flag(rec, comp);
1869
1870 /* For the clustered index we have to adjust the BLOB
1871 reference and the system fields irrespective of the
1872 delete marked flag. The adjustment of delete marked
1873 cluster records is required for purge to work later. */
1874
1875 if (deleted || clust_index) {
1876 m_offsets = rec_get_offsets(
1877 rec, m_index->m_srv_index, m_offsets,
1878 ULINT_UNDEFINED, &m_heap);
1879 }
1880
1881 if (clust_index) {
1882
1883 dberr_t err = adjust_cluster_record(
1884 m_index->m_srv_index, rec, m_offsets,
1885 deleted);
1886
1887 if (err != DB_SUCCESS) {
1888 return(err);
1889 }
1890 }
1891
1892 /* If it is a delete marked record then try an
1893 optimistic delete. */
1894
1895 if (deleted) {
1896 /* A successful purge will move the cursor to the
1897 next record. */
1898
1899 if (!purge(m_offsets)) {
1900 m_rec_iter.next();
1901 }
1902
1903 ++m_index->m_stats.m_n_deleted;
1904 } else {
1905 ++m_index->m_stats.m_n_rows;
1906 m_rec_iter.next();
1907 }
1908 }
1909
1910 return(DB_SUCCESS);
1911 }
1912
1913 /**
1914 Update the space, index id, trx id.
1915 @return DB_SUCCESS or error code */
1916 dberr_t
update_index_page(buf_block_t * block)1917 PageConverter::update_index_page(
1918 buf_block_t* block) UNIV_NOTHROW
1919 {
1920 index_id_t id;
1921 buf_frame_t* page = block->frame;
1922
1923 if (is_free(buf_block_get_page_no(block))) {
1924 return(DB_SUCCESS);
1925 } else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
1926
1927 row_index_t* index = find_index(id);
1928
1929 if (index == 0) {
1930 m_index = 0;
1931 return(DB_CORRUPTION);
1932 }
1933
1934 /* Update current index */
1935 m_index = index;
1936 }
1937
1938 /* If the .cfg file is missing and there is an index mismatch
1939 then ignore the error. */
1940 if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
1941 return(DB_SUCCESS);
1942 }
1943
1944 #ifdef UNIV_ZIP_DEBUG
1945 ut_a(!is_compressed_table()
1946 || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
1947 #endif /* UNIV_ZIP_DEBUG */
1948
1949 /* This has to be written to uncompressed index header. Set it to
1950 the current index id. */
1951 btr_page_set_index_id(
1952 page, m_page_zip_ptr, m_index->m_srv_index->id, 0);
1953
1954 page_set_max_trx_id(block, m_page_zip_ptr, m_trx->id, 0);
1955
1956 if (page_is_empty(block->frame)) {
1957
1958 /* Only a root page can be empty. */
1959 if (!is_root_page(block->frame)) {
1960 // TODO: We should relax this and skip secondary
1961 // indexes. Mark them as corrupt because they can
1962 // always be rebuilt.
1963 return(DB_CORRUPTION);
1964 }
1965
1966 return(DB_SUCCESS);
1967 }
1968
1969 if (!page_is_leaf(block->frame)) {
1970 return (DB_SUCCESS);
1971 }
1972
1973 return(update_records(block));
1974 }
1975
1976 /**
1977 Validate the space flags and update tablespace header page.
1978 @param block - block read from file, not from the buffer pool.
1979 @retval DB_SUCCESS or error code */
1980 dberr_t
update_header(buf_block_t * block)1981 PageConverter::update_header(
1982 buf_block_t* block) UNIV_NOTHROW
1983 {
1984 /* Check for valid header */
1985 switch(fsp_header_get_space_id(get_frame(block))) {
1986 case 0:
1987 return(DB_CORRUPTION);
1988 case ULINT_UNDEFINED:
1989 ib_logf(IB_LOG_LEVEL_WARN,
1990 "Space id check in the header failed "
1991 "- ignored");
1992 }
1993
1994 ulint space_flags = fsp_header_get_flags(get_frame(block));
1995
1996 if (!fsp_flags_is_valid(space_flags)) {
1997
1998 ib_logf(IB_LOG_LEVEL_ERROR,
1999 "Unsupported tablespace format %lu",
2000 (ulong) space_flags);
2001
2002 return(DB_UNSUPPORTED);
2003 }
2004
2005 mach_write_to_8(
2006 get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN, m_current_lsn);
2007
2008 /* Write space_id to the tablespace header, page 0. */
2009 mach_write_to_4(
2010 get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2011 get_space_id());
2012
2013 /* This is on every page in the tablespace. */
2014 mach_write_to_4(
2015 get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2016 get_space_id());
2017
2018 return(DB_SUCCESS);
2019 }
2020
2021 /**
2022 Update the page, set the space id, max trx id and index id.
2023 @param block - block read from file
2024 @retval DB_SUCCESS or error code */
2025 dberr_t
update_page(buf_block_t * block,ulint & page_type)2026 PageConverter::update_page(
2027 buf_block_t* block,
2028 ulint& page_type) UNIV_NOTHROW
2029 {
2030 dberr_t err = DB_SUCCESS;
2031
2032 switch (page_type = fil_page_get_type(get_frame(block))) {
2033 case FIL_PAGE_TYPE_FSP_HDR:
2034 /* Work directly on the uncompressed page headers. */
2035 ut_a(buf_block_get_page_no(block) == 0);
2036 return(update_header(block));
2037
2038 case FIL_PAGE_INDEX:
2039 /* We need to decompress the contents into block->frame
2040 before we can do any thing with Btree pages. */
2041
2042 if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
2043 return(DB_CORRUPTION);
2044 }
2045
2046 /* This is on every page in the tablespace. */
2047 mach_write_to_4(
2048 get_frame(block)
2049 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2050
2051 /* Only update the Btree nodes. */
2052 return(update_index_page(block));
2053
2054 case FIL_PAGE_TYPE_SYS:
2055 /* This is page 0 in the system tablespace. */
2056 return(DB_CORRUPTION);
2057
2058 case FIL_PAGE_TYPE_XDES:
2059 err = set_current_xdes(
2060 buf_block_get_page_no(block), get_frame(block));
2061 case FIL_PAGE_INODE:
2062 case FIL_PAGE_TYPE_TRX_SYS:
2063 case FIL_PAGE_IBUF_FREE_LIST:
2064 case FIL_PAGE_TYPE_ALLOCATED:
2065 case FIL_PAGE_IBUF_BITMAP:
2066 case FIL_PAGE_TYPE_BLOB:
2067 case FIL_PAGE_TYPE_ZBLOB:
2068 case FIL_PAGE_TYPE_ZBLOB2:
2069
2070 /* Work directly on the uncompressed page headers. */
2071 /* This is on every page in the tablespace. */
2072 mach_write_to_4(
2073 get_frame(block)
2074 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2075
2076 return(err);
2077 }
2078
2079 ib_logf(IB_LOG_LEVEL_WARN, "Unknown page type (%lu)", page_type);
2080
2081 return(DB_CORRUPTION);
2082 }
2083
2084 /**
2085 Validate the page
2086 @param offset - physical offset within file.
2087 @param page - page read from file.
2088 @return status */
2089 PageConverter::import_page_status_t
validate(os_offset_t offset,buf_block_t * block)2090 PageConverter::validate(
2091 os_offset_t offset,
2092 buf_block_t* block) UNIV_NOTHROW
2093 {
2094 buf_frame_t* page = get_frame(block);
2095
2096 /* Check that the page number corresponds to the offset in
2097 the file. Flag as corrupt if it doesn't. Disable the check
2098 for LSN in buf_page_is_corrupted() */
2099
2100 if (buf_page_is_corrupted(false, page, get_zip_size())
2101 || (page_get_page_no(page) != offset / m_page_size
2102 && page_get_page_no(page) != 0)) {
2103
2104 return(IMPORT_PAGE_STATUS_CORRUPTED);
2105
2106 } else if (offset > 0 && page_get_page_no(page) == 0) {
2107 ulint checksum;
2108
2109 checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
2110 if (checksum != 0) {
2111 /* Checksum check passed in buf_page_is_corrupted(). */
2112 ib_logf(IB_LOG_LEVEL_WARN,
2113 "%s: Page %lu checksum %lu should be zero.",
2114 m_filepath, (ulong) (offset / m_page_size),
2115 checksum);
2116 }
2117
2118 const byte* b = page + FIL_PAGE_OFFSET;
2119 const byte* e = page + m_page_size
2120 - FIL_PAGE_END_LSN_OLD_CHKSUM;
2121
2122 /* If the page number is zero and offset > 0 then
2123 the entire page MUST consist of zeroes. If not then
2124 we flag it as corrupt. */
2125
2126 while (b != e) {
2127
2128 if (*b++ && !trigger_corruption()) {
2129 return(IMPORT_PAGE_STATUS_CORRUPTED);
2130 }
2131 }
2132
2133 /* The page is all zero: do nothing. */
2134 return(IMPORT_PAGE_STATUS_ALL_ZERO);
2135 }
2136
2137 return(IMPORT_PAGE_STATUS_OK);
2138 }
2139
2140 /**
2141 Called for every page in the tablespace. If the page was not
2142 updated then its state must be set to BUF_PAGE_NOT_USED.
2143 @param offset - physical offset within the file
2144 @param block - block read from file, note it is not from the buffer pool
2145 @retval DB_SUCCESS or error code. */
2146 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)2147 PageConverter::operator() (
2148 os_offset_t offset,
2149 buf_block_t* block) UNIV_NOTHROW
2150 {
2151 ulint page_type;
2152 dberr_t err = DB_SUCCESS;
2153
2154 if ((err = periodic_check()) != DB_SUCCESS) {
2155 return(err);
2156 }
2157
2158 if (is_compressed_table()) {
2159 m_page_zip_ptr = &block->page.zip;
2160 } else {
2161 ut_ad(m_page_zip_ptr == 0);
2162 }
2163
2164 switch(validate(offset, block)) {
2165 case IMPORT_PAGE_STATUS_OK:
2166
2167 /* We have to decompress the compressed pages before
2168 we can work on them */
2169
2170 if ((err = update_page(block, page_type)) != DB_SUCCESS) {
2171 return(err);
2172 }
2173
2174 /* Note: For compressed pages this function will write to the
2175 zip descriptor and for uncompressed pages it will write to
2176 page (ie. the block->frame). Therefore the caller should write
2177 out the descriptor contents and not block->frame for compressed
2178 pages. */
2179
2180 if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) {
2181
2182 buf_flush_init_for_writing(
2183 !is_compressed_table()
2184 ? block->frame : block->page.zip.data,
2185 !is_compressed_table() ? 0 : m_page_zip_ptr,
2186 m_current_lsn);
2187 } else {
2188 /* Calculate and update the checksum of non-btree
2189 pages for compressed tables explicitly here. */
2190
2191 buf_flush_update_zip_checksum(
2192 get_frame(block), get_zip_size(),
2193 m_current_lsn);
2194 }
2195
2196 break;
2197
2198 case IMPORT_PAGE_STATUS_ALL_ZERO:
2199 /* The page is all zero: leave it as is. */
2200 break;
2201
2202 case IMPORT_PAGE_STATUS_CORRUPTED:
2203
2204 ib_logf(IB_LOG_LEVEL_WARN,
2205 "%s: Page %lu at offset " UINT64PF " looks corrupted.",
2206 m_filepath, (ulong) (offset / m_page_size), offset);
2207
2208 return(DB_CORRUPTION);
2209 }
2210
2211 return(err);
2212 }
2213
2214 /*****************************************************************//**
2215 Clean up after import tablespace failure, this function will acquire
2216 the dictionary latches on behalf of the transaction if the transaction
2217 hasn't already acquired them. */
2218 static MY_ATTRIBUTE((nonnull))
2219 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2220 row_import_discard_changes(
2221 /*=======================*/
2222 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2223 trx_t* trx, /*!< in/out: transaction for import */
2224 dberr_t err) /*!< in: error code */
2225 {
2226 dict_table_t* table = prebuilt->table;
2227
2228 ut_a(err != DB_SUCCESS);
2229
2230 prebuilt->trx->error_info = NULL;
2231
2232 char table_name[MAX_FULL_NAME_LEN + 1];
2233
2234 innobase_format_name(
2235 table_name, sizeof(table_name),
2236 prebuilt->table->name, FALSE);
2237
2238 ib_logf(IB_LOG_LEVEL_INFO,
2239 "Discarding tablespace of table %s: %s",
2240 table_name, ut_strerr(err));
2241
2242 if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2243 ut_a(trx->dict_operation_lock_mode == 0);
2244 row_mysql_lock_data_dictionary(trx);
2245 }
2246
2247 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2248
2249 /* Since we update the index root page numbers on disk after
2250 we've done a successful import. The table will not be loadable.
2251 However, we need to ensure that the in memory root page numbers
2252 are reset to "NULL". */
2253
2254 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2255 index != 0;
2256 index = UT_LIST_GET_NEXT(indexes, index)) {
2257
2258 index->page = FIL_NULL;
2259 index->space = FIL_NULL;
2260 }
2261
2262 table->ibd_file_missing = TRUE;
2263
2264 fil_close_tablespace(trx, table->space);
2265 }
2266
2267 /*****************************************************************//**
2268 Clean up after import tablespace. */
2269 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2270 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2271 row_import_cleanup(
2272 /*===============*/
2273 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2274 trx_t* trx, /*!< in/out: transaction for import */
2275 dberr_t err) /*!< in: error code */
2276 {
2277 ut_a(prebuilt->trx != trx);
2278
2279 if (err != DB_SUCCESS) {
2280 row_import_discard_changes(prebuilt, trx, err);
2281 }
2282
2283 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2284
2285 DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2286
2287 trx_commit_for_mysql(trx);
2288
2289 row_mysql_unlock_data_dictionary(trx);
2290
2291 trx_free_for_mysql(trx);
2292
2293 prebuilt->trx->op_info = "";
2294
2295 DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2296
2297 log_make_checkpoint_at(LSN_MAX, TRUE);
2298
2299 return(err);
2300 }
2301
2302 /*****************************************************************//**
2303 Report error during tablespace import. */
2304 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2305 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2306 row_import_error(
2307 /*=============*/
2308 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2309 trx_t* trx, /*!< in/out: transaction for import */
2310 dberr_t err) /*!< in: error code */
2311 {
2312 if (!trx_is_interrupted(trx)) {
2313 char table_name[MAX_FULL_NAME_LEN + 1];
2314
2315 innobase_format_name(
2316 table_name, sizeof(table_name),
2317 prebuilt->table->name, FALSE);
2318
2319 ib_senderrf(
2320 trx->mysql_thd, IB_LOG_LEVEL_WARN,
2321 ER_INNODB_IMPORT_ERROR,
2322 table_name, (ulong) err, ut_strerr(err));
2323 }
2324
2325 return(row_import_cleanup(prebuilt, trx, err));
2326 }
2327
2328 /*****************************************************************//**
2329 Adjust the root page index node and leaf node segment headers, update
2330 with the new space id. For all the table's secondary indexes.
2331 @return error code */
2332 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2333 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(row_prebuilt_t * prebuilt,trx_t * trx,dict_table_t * table,const row_import & cfg)2334 row_import_adjust_root_pages_of_secondary_indexes(
2335 /*==============================================*/
2336 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
2337 handler */
2338 trx_t* trx, /*!< in: transaction used for
2339 the import */
2340 dict_table_t* table, /*!< in: table the indexes
2341 belong to */
2342 const row_import& cfg) /*!< Import context */
2343 {
2344 dict_index_t* index;
2345 ulint n_rows_in_table;
2346 dberr_t err = DB_SUCCESS;
2347
2348 /* Skip the clustered index. */
2349 index = dict_table_get_first_index(table);
2350
2351 n_rows_in_table = cfg.get_n_rows(index->name);
2352
2353 DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2354 n_rows_in_table++;);
2355
2356 /* Adjust the root pages of the secondary indexes only. */
2357 while ((index = dict_table_get_next_index(index)) != NULL) {
2358 char index_name[MAX_FULL_NAME_LEN + 1];
2359
2360 innobase_format_name(
2361 index_name, sizeof(index_name), index->name, TRUE);
2362
2363 ut_a(!dict_index_is_clust(index));
2364
2365 if (!(index->type & DICT_CORRUPT)
2366 && index->space != FIL_NULL
2367 && index->page != FIL_NULL) {
2368
2369 /* Update the Btree segment headers for index node and
2370 leaf nodes in the root page. Set the new space id. */
2371
2372 err = btr_root_adjust_on_import(index);
2373 } else {
2374 ib_logf(IB_LOG_LEVEL_WARN,
2375 "Skip adjustment of root pages for "
2376 "index %s.", index->name);
2377
2378 err = DB_CORRUPTION;
2379 }
2380
2381 if (err != DB_SUCCESS) {
2382
2383 if (index->type & DICT_CLUSTERED) {
2384 break;
2385 }
2386
2387 ib_errf(trx->mysql_thd,
2388 IB_LOG_LEVEL_WARN,
2389 ER_INNODB_INDEX_CORRUPT,
2390 "Index '%s' not found or corrupt, "
2391 "you should recreate this index.",
2392 index_name);
2393
2394 /* Do not bail out, so that the data
2395 can be recovered. */
2396
2397 err = DB_SUCCESS;
2398 index->type |= DICT_CORRUPT;
2399 continue;
2400 }
2401
2402 /* If we failed to purge any records in the index then
2403 do it the hard way.
2404
2405 TODO: We can do this in the first pass by generating UNDO log
2406 records for the failed rows. */
2407
2408 if (!cfg.requires_purge(index->name)) {
2409 continue;
2410 }
2411
2412 IndexPurge purge(trx, index);
2413
2414 trx->op_info = "secondary: purge delete marked records";
2415
2416 err = purge.garbage_collect();
2417
2418 trx->op_info = "";
2419
2420 if (err != DB_SUCCESS) {
2421 break;
2422 } else if (purge.get_n_rows() != n_rows_in_table) {
2423
2424 ib_errf(trx->mysql_thd,
2425 IB_LOG_LEVEL_WARN,
2426 ER_INNODB_INDEX_CORRUPT,
2427 "Index '%s' contains %lu entries, "
2428 "should be %lu, you should recreate "
2429 "this index.", index_name,
2430 (ulong) purge.get_n_rows(),
2431 (ulong) n_rows_in_table);
2432
2433 index->type |= DICT_CORRUPT;
2434
2435 /* Do not bail out, so that the data
2436 can be recovered. */
2437
2438 err = DB_SUCCESS;
2439 }
2440 }
2441
2442 return(err);
2443 }
2444
2445 /*****************************************************************//**
2446 Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
2447 @return error code */
2448 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2449 dberr_t
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2450 row_import_set_sys_max_row_id(
2451 /*==========================*/
2452 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
2453 handler */
2454 const dict_table_t* table) /*!< in: table to import */
2455 {
2456 dberr_t err;
2457 const rec_t* rec;
2458 mtr_t mtr;
2459 btr_pcur_t pcur;
2460 row_id_t row_id = 0;
2461 dict_index_t* index;
2462
2463 index = dict_table_get_first_index(table);
2464 ut_a(dict_index_is_clust(index));
2465
2466 mtr_start(&mtr);
2467
2468 mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2469
2470 btr_pcur_open_at_index_side(
2471 false, // High end
2472 index,
2473 BTR_SEARCH_LEAF,
2474 &pcur,
2475 true, // Init cursor
2476 0, // Leaf level
2477 &mtr);
2478
2479 btr_pcur_move_to_prev_on_page(&pcur);
2480 rec = btr_pcur_get_rec(&pcur);
2481
2482 /* Check for empty table. */
2483 if (!page_rec_is_infimum(rec)) {
2484 ulint len;
2485 const byte* field;
2486 mem_heap_t* heap = NULL;
2487 ulint offsets_[1 + REC_OFFS_HEADER_SIZE];
2488 ulint* offsets;
2489
2490 rec_offs_init(offsets_);
2491
2492 offsets = rec_get_offsets(
2493 rec, index, offsets_, ULINT_UNDEFINED, &heap);
2494
2495 field = rec_get_nth_field(
2496 rec, offsets,
2497 dict_index_get_sys_col_pos(index, DATA_ROW_ID),
2498 &len);
2499
2500 if (len == DATA_ROW_ID_LEN) {
2501 row_id = mach_read_from_6(field);
2502 err = DB_SUCCESS;
2503 } else {
2504 err = DB_CORRUPTION;
2505 }
2506
2507 if (heap != NULL) {
2508 mem_heap_free(heap);
2509 }
2510 } else {
2511 /* The table is empty. */
2512 err = DB_SUCCESS;
2513 }
2514
2515 btr_pcur_close(&pcur);
2516 mtr_commit(&mtr);
2517
2518 DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
2519 err = DB_CORRUPTION;);
2520
2521 if (err != DB_SUCCESS) {
2522 char index_name[MAX_FULL_NAME_LEN + 1];
2523
2524 innobase_format_name(
2525 index_name, sizeof(index_name), index->name, TRUE);
2526
2527 ib_errf(prebuilt->trx->mysql_thd,
2528 IB_LOG_LEVEL_WARN,
2529 ER_INNODB_INDEX_CORRUPT,
2530 "Index '%s' corruption detected, invalid DB_ROW_ID "
2531 "in index.", index_name);
2532
2533 return(err);
2534
2535 } else if (row_id > 0) {
2536
2537 /* Update the system row id if the imported index row id is
2538 greater than the max system row id. */
2539
2540 mutex_enter(&dict_sys->mutex);
2541
2542 if (row_id >= dict_sys->row_id) {
2543 dict_sys->row_id = row_id + 1;
2544 dict_hdr_flush_row_id();
2545 }
2546
2547 mutex_exit(&dict_sys->mutex);
2548 }
2549
2550 return(DB_SUCCESS);
2551 }
2552
2553 /*****************************************************************//**
2554 Read the a string from the meta data file.
2555 @return DB_SUCCESS or error code. */
2556 static
2557 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2558 row_import_cfg_read_string(
2559 /*=======================*/
2560 FILE* file, /*!< in/out: File to read from */
2561 byte* ptr, /*!< out: string to read */
2562 ulint max_len) /*!< in: maximum length of the output
2563 buffer in bytes */
2564 {
2565 DBUG_EXECUTE_IF("ib_import_string_read_error",
2566 errno = EINVAL; return(DB_IO_ERROR););
2567
2568 ulint len = 0;
2569
2570 while (!feof(file)) {
2571 int ch = fgetc(file);
2572
2573 if (ch == EOF) {
2574 break;
2575 } else if (ch != 0) {
2576 if (len < max_len) {
2577 ptr[len++] = ch;
2578 } else {
2579 break;
2580 }
2581 /* max_len includes the NUL byte */
2582 } else if (len != max_len - 1) {
2583 break;
2584 } else {
2585 ptr[len] = 0;
2586 return(DB_SUCCESS);
2587 }
2588 }
2589
2590 errno = EINVAL;
2591
2592 return(DB_IO_ERROR);
2593 }
2594
2595 /*********************************************************************//**
2596 Write the meta data (index user fields) config file.
2597 @return DB_SUCCESS or error code. */
2598 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2599 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index,row_import * cfg)2600 row_import_cfg_read_index_fields(
2601 /*=============================*/
2602 FILE* file, /*!< in: file to write to */
2603 THD* thd, /*!< in/out: session */
2604 row_index_t* index, /*!< Index being read in */
2605 row_import* cfg) /*!< in/out: meta-data read */
2606 {
2607 byte row[sizeof(ib_uint32_t) * 3];
2608 ulint n_fields = index->m_n_fields;
2609
2610 index->m_fields = new(std::nothrow) dict_field_t[n_fields];
2611
2612 /* Trigger OOM */
2613 DBUG_EXECUTE_IF("ib_import_OOM_4",
2614 delete [] index->m_fields; index->m_fields = 0;);
2615
2616 if (index->m_fields == 0) {
2617 return(DB_OUT_OF_MEMORY);
2618 }
2619
2620 dict_field_t* field = index->m_fields;
2621
2622 memset(field, 0x0, sizeof(*field) * n_fields);
2623
2624 for (ulint i = 0; i < n_fields; ++i, ++field) {
2625 byte* ptr = row;
2626
2627 /* Trigger EOF */
2628 DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2629 (void) fseek(file, 0L, SEEK_END););
2630
2631 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2632
2633 ib_senderrf(
2634 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2635 errno, strerror(errno),
2636 "while reading index fields.");
2637
2638 return(DB_IO_ERROR);
2639 }
2640
2641 field->prefix_len = mach_read_from_4(ptr);
2642 ptr += sizeof(ib_uint32_t);
2643
2644 field->fixed_len = mach_read_from_4(ptr);
2645 ptr += sizeof(ib_uint32_t);
2646
2647 /* Include the NUL byte in the length. */
2648 ulint len = mach_read_from_4(ptr);
2649
2650 byte* name = new(std::nothrow) byte[len];
2651
2652 /* Trigger OOM */
2653 DBUG_EXECUTE_IF("ib_import_OOM_5", delete [] name; name = 0;);
2654
2655 if (name == 0) {
2656 return(DB_OUT_OF_MEMORY);
2657 }
2658
2659 field->name = reinterpret_cast<const char*>(name);
2660
2661 dberr_t err = row_import_cfg_read_string(file, name, len);
2662
2663 if (err != DB_SUCCESS) {
2664
2665 ib_senderrf(
2666 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2667 errno, strerror(errno),
2668 "while parsing table name.");
2669
2670 return(err);
2671 }
2672 }
2673
2674 return(DB_SUCCESS);
2675 }
2676
2677 /*****************************************************************//**
2678 Read the index names and root page numbers of the indexes and set the values.
2679 Row format [root_page_no, len of str, str ... ]
2680 @return DB_SUCCESS or error code. */
2681 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2682 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2683 row_import_read_index_data(
2684 /*=======================*/
2685 FILE* file, /*!< in: File to read from */
2686 THD* thd, /*!< in: session */
2687 row_import* cfg) /*!< in/out: meta-data read */
2688 {
2689 byte* ptr;
2690 row_index_t* cfg_index;
2691 byte row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2692
2693 /* FIXME: What is the max value? */
2694 ut_a(cfg->m_n_indexes > 0);
2695 ut_a(cfg->m_n_indexes < 1024);
2696
2697 cfg->m_indexes = new(std::nothrow) row_index_t[cfg->m_n_indexes];
2698
2699 /* Trigger OOM */
2700 DBUG_EXECUTE_IF("ib_import_OOM_6",
2701 delete [] cfg->m_indexes; cfg->m_indexes = 0;);
2702
2703 if (cfg->m_indexes == 0) {
2704 return(DB_OUT_OF_MEMORY);
2705 }
2706
2707 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2708
2709 cfg_index = cfg->m_indexes;
2710
2711 for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2712 /* Trigger EOF */
2713 DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2714 (void) fseek(file, 0L, SEEK_END););
2715
2716 /* Read the index data. */
2717 size_t n_bytes = fread(row, 1, sizeof(row), file);
2718
2719 /* Trigger EOF */
2720 DBUG_EXECUTE_IF("ib_import_io_read_error",
2721 (void) fseek(file, 0L, SEEK_END););
2722
2723 if (n_bytes != sizeof(row)) {
2724 char msg[BUFSIZ];
2725
2726 ut_snprintf(msg, sizeof(msg),
2727 "while reading index meta-data, expected "
2728 "to read %lu bytes but read only %lu "
2729 "bytes",
2730 (ulong) sizeof(row), (ulong) n_bytes);
2731
2732 ib_senderrf(
2733 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2734 errno, strerror(errno), msg);
2735
2736 ib_logf(IB_LOG_LEVEL_ERROR, "IO Error: %s", msg);
2737
2738 return(DB_IO_ERROR);
2739 }
2740
2741 ptr = row;
2742
2743 cfg_index->m_id = mach_read_from_8(ptr);
2744 ptr += sizeof(index_id_t);
2745
2746 cfg_index->m_space = mach_read_from_4(ptr);
2747 ptr += sizeof(ib_uint32_t);
2748
2749 cfg_index->m_page_no = mach_read_from_4(ptr);
2750 ptr += sizeof(ib_uint32_t);
2751
2752 cfg_index->m_type = mach_read_from_4(ptr);
2753 ptr += sizeof(ib_uint32_t);
2754
2755 cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2756 if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2757 ut_ad(0);
2758 /* Overflow. Pretend that the clustered index
2759 has a variable-length PRIMARY KEY. */
2760 cfg_index->m_trx_id_offset = 0;
2761 }
2762 ptr += sizeof(ib_uint32_t);
2763
2764 cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2765 ptr += sizeof(ib_uint32_t);
2766
2767 cfg_index->m_n_uniq = mach_read_from_4(ptr);
2768 ptr += sizeof(ib_uint32_t);
2769
2770 cfg_index->m_n_nullable = mach_read_from_4(ptr);
2771 ptr += sizeof(ib_uint32_t);
2772
2773 cfg_index->m_n_fields = mach_read_from_4(ptr);
2774 ptr += sizeof(ib_uint32_t);
2775
2776 /* The NUL byte is included in the name length. */
2777 ulint len = mach_read_from_4(ptr);
2778
2779 if (len > OS_FILE_MAX_PATH) {
2780 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2781 ER_INNODB_INDEX_CORRUPT,
2782 "Index name length (%lu) is too long, "
2783 "the meta-data is corrupt", len);
2784
2785 return(DB_CORRUPTION);
2786 }
2787
2788 cfg_index->m_name = new(std::nothrow) byte[len];
2789
2790 /* Trigger OOM */
2791 DBUG_EXECUTE_IF("ib_import_OOM_7",
2792 delete [] cfg_index->m_name;
2793 cfg_index->m_name = 0;);
2794
2795 if (cfg_index->m_name == 0) {
2796 return(DB_OUT_OF_MEMORY);
2797 }
2798
2799 dberr_t err;
2800
2801 err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2802
2803 if (err != DB_SUCCESS) {
2804
2805 ib_senderrf(
2806 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2807 errno, strerror(errno),
2808 "while parsing index name.");
2809
2810 return(err);
2811 }
2812
2813 err = row_import_cfg_read_index_fields(
2814 file, thd, cfg_index, cfg);
2815
2816 if (err != DB_SUCCESS) {
2817 return(err);
2818 }
2819
2820 }
2821
2822 return(DB_SUCCESS);
2823 }
2824
2825 /*****************************************************************//**
2826 Set the index root page number for v1 format.
2827 @return DB_SUCCESS or error code. */
2828 static
2829 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2830 row_import_read_indexes(
2831 /*====================*/
2832 FILE* file, /*!< in: File to read from */
2833 THD* thd, /*!< in: session */
2834 row_import* cfg) /*!< in/out: meta-data read */
2835 {
2836 byte row[sizeof(ib_uint32_t)];
2837
2838 /* Trigger EOF */
2839 DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2840 (void) fseek(file, 0L, SEEK_END););
2841
2842 /* Read the number of indexes. */
2843 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2844 ib_senderrf(
2845 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2846 errno, strerror(errno),
2847 "while reading number of indexes.");
2848
2849 return(DB_IO_ERROR);
2850 }
2851
2852 cfg->m_n_indexes = mach_read_from_4(row);
2853
2854 if (cfg->m_n_indexes == 0) {
2855 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2856 "Number of indexes in meta-data file is 0");
2857
2858 return(DB_CORRUPTION);
2859
2860 } else if (cfg->m_n_indexes > 1024) {
2861 // FIXME: What is the upper limit? */
2862 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2863 "Number of indexes in meta-data file is too high: %lu",
2864 (ulong) cfg->m_n_indexes);
2865 cfg->m_n_indexes = 0;
2866
2867 return(DB_CORRUPTION);
2868 }
2869
2870 return(row_import_read_index_data(file, thd, cfg));
2871 }
2872
2873 /*********************************************************************//**
2874 Read the meta data (table columns) config file. Deserialise the contents of
2875 dict_col_t structure, along with the column name. */
2876 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2877 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2878 row_import_read_columns(
2879 /*====================*/
2880 FILE* file, /*!< in: file to write to */
2881 THD* thd, /*!< in/out: session */
2882 row_import* cfg) /*!< in/out: meta-data read */
2883 {
2884 dict_col_t* col;
2885 byte row[sizeof(ib_uint32_t) * 8];
2886
2887 /* FIXME: What should the upper limit be? */
2888 ut_a(cfg->m_n_cols > 0);
2889 ut_a(cfg->m_n_cols < 1024);
2890
2891 cfg->m_cols = new(std::nothrow) dict_col_t[cfg->m_n_cols];
2892
2893 /* Trigger OOM */
2894 DBUG_EXECUTE_IF("ib_import_OOM_8",
2895 delete [] cfg->m_cols; cfg->m_cols = 0;);
2896
2897 if (cfg->m_cols == 0) {
2898 return(DB_OUT_OF_MEMORY);
2899 }
2900
2901 cfg->m_col_names = new(std::nothrow) byte* [cfg->m_n_cols];
2902
2903 /* Trigger OOM */
2904 DBUG_EXECUTE_IF("ib_import_OOM_9",
2905 delete [] cfg->m_col_names; cfg->m_col_names = 0;);
2906
2907 if (cfg->m_col_names == 0) {
2908 return(DB_OUT_OF_MEMORY);
2909 }
2910
2911 memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2912 memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2913
2914 col = cfg->m_cols;
2915
2916 for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2917 byte* ptr = row;
2918
2919 /* Trigger EOF */
2920 DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2921 (void) fseek(file, 0L, SEEK_END););
2922
2923 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2924 ib_senderrf(
2925 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2926 errno, strerror(errno),
2927 "while reading table column meta-data.");
2928
2929 return(DB_IO_ERROR);
2930 }
2931
2932 col->prtype = mach_read_from_4(ptr);
2933 ptr += sizeof(ib_uint32_t);
2934
2935 col->mtype = mach_read_from_4(ptr);
2936 ptr += sizeof(ib_uint32_t);
2937
2938 col->len = mach_read_from_4(ptr);
2939 ptr += sizeof(ib_uint32_t);
2940
2941 col->mbminmaxlen = mach_read_from_4(ptr);
2942 ptr += sizeof(ib_uint32_t);
2943
2944 col->ind = mach_read_from_4(ptr);
2945 ptr += sizeof(ib_uint32_t);
2946
2947 col->ord_part = mach_read_from_4(ptr);
2948 ptr += sizeof(ib_uint32_t);
2949
2950 col->max_prefix = mach_read_from_4(ptr);
2951 ptr += sizeof(ib_uint32_t);
2952
2953 /* Read in the column name as [len, byte array]. The len
2954 includes the NUL byte. */
2955
2956 ulint len = mach_read_from_4(ptr);
2957
2958 /* FIXME: What is the maximum column name length? */
2959 if (len == 0 || len > 128) {
2960 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2961 ER_IO_READ_ERROR,
2962 "Column name length %lu, is invalid",
2963 (ulong) len);
2964
2965 return(DB_CORRUPTION);
2966 }
2967
2968 cfg->m_col_names[i] = new(std::nothrow) byte[len];
2969
2970 /* Trigger OOM */
2971 DBUG_EXECUTE_IF("ib_import_OOM_10",
2972 delete [] cfg->m_col_names[i];
2973 cfg->m_col_names[i] = 0;);
2974
2975 if (cfg->m_col_names[i] == 0) {
2976 return(DB_OUT_OF_MEMORY);
2977 }
2978
2979 dberr_t err;
2980
2981 err = row_import_cfg_read_string(
2982 file, cfg->m_col_names[i], len);
2983
2984 if (err != DB_SUCCESS) {
2985
2986 ib_senderrf(
2987 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2988 errno, strerror(errno),
2989 "while parsing table column name.");
2990
2991 return(err);
2992 }
2993 }
2994
2995 return(DB_SUCCESS);
2996 }
2997
2998 /*****************************************************************//**
2999 Read the contents of the <tablespace>.cfg file.
3000 @return DB_SUCCESS or error code. */
3001 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3002 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)3003 row_import_read_v1(
3004 /*===============*/
3005 FILE* file, /*!< in: File to read from */
3006 THD* thd, /*!< in: session */
3007 row_import* cfg) /*!< out: meta data */
3008 {
3009 byte value[sizeof(ib_uint32_t)];
3010
3011 /* Trigger EOF */
3012 DBUG_EXECUTE_IF("ib_import_io_read_error_5",
3013 (void) fseek(file, 0L, SEEK_END););
3014
3015 /* Read the hostname where the tablespace was exported. */
3016 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
3017 ib_senderrf(
3018 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3019 errno, strerror(errno),
3020 "while reading meta-data export hostname length.");
3021
3022 return(DB_IO_ERROR);
3023 }
3024
3025 ulint len = mach_read_from_4(value);
3026
3027 /* NUL byte is part of name length. */
3028 cfg->m_hostname = new(std::nothrow) byte[len];
3029
3030 /* Trigger OOM */
3031 DBUG_EXECUTE_IF("ib_import_OOM_1",
3032 delete [] cfg->m_hostname; cfg->m_hostname = 0;);
3033
3034 if (cfg->m_hostname == 0) {
3035 return(DB_OUT_OF_MEMORY);
3036 }
3037
3038 dberr_t err = row_import_cfg_read_string(file, cfg->m_hostname, len);
3039
3040 if (err != DB_SUCCESS) {
3041
3042 ib_senderrf(
3043 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3044 errno, strerror(errno),
3045 "while parsing export hostname.");
3046
3047 return(err);
3048 }
3049
3050 /* Trigger EOF */
3051 DBUG_EXECUTE_IF("ib_import_io_read_error_6",
3052 (void) fseek(file, 0L, SEEK_END););
3053
3054 /* Read the table name of tablespace that was exported. */
3055 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
3056 ib_senderrf(
3057 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3058 errno, strerror(errno),
3059 "while reading meta-data table name length.");
3060
3061 return(DB_IO_ERROR);
3062 }
3063
3064 len = mach_read_from_4(value);
3065
3066 /* NUL byte is part of name length. */
3067 cfg->m_table_name = new(std::nothrow) byte[len];
3068
3069 /* Trigger OOM */
3070 DBUG_EXECUTE_IF("ib_import_OOM_2",
3071 delete [] cfg->m_table_name; cfg->m_table_name = 0;);
3072
3073 if (cfg->m_table_name == 0) {
3074 return(DB_OUT_OF_MEMORY);
3075 }
3076
3077 err = row_import_cfg_read_string(file, cfg->m_table_name, len);
3078
3079 if (err != DB_SUCCESS) {
3080 ib_senderrf(
3081 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3082 errno, strerror(errno),
3083 "while parsing table name.");
3084
3085 return(err);
3086 }
3087
3088 ib_logf(IB_LOG_LEVEL_INFO,
3089 "Importing tablespace for table '%s' that was exported "
3090 "from host '%s'", cfg->m_table_name, cfg->m_hostname);
3091
3092 byte row[sizeof(ib_uint32_t) * 3];
3093
3094 /* Trigger EOF */
3095 DBUG_EXECUTE_IF("ib_import_io_read_error_7",
3096 (void) fseek(file, 0L, SEEK_END););
3097
3098 /* Read the autoinc value. */
3099 if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
3100 ib_senderrf(
3101 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3102 errno, strerror(errno),
3103 "while reading autoinc value.");
3104
3105 return(DB_IO_ERROR);
3106 }
3107
3108 cfg->m_autoinc = mach_read_from_8(row);
3109
3110 /* Trigger EOF */
3111 DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3112 (void) fseek(file, 0L, SEEK_END););
3113
3114 /* Read the tablespace page size. */
3115 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3116 ib_senderrf(
3117 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3118 errno, strerror(errno),
3119 "while reading meta-data header.");
3120
3121 return(DB_IO_ERROR);
3122 }
3123
3124 byte* ptr = row;
3125
3126 cfg->m_page_size = mach_read_from_4(ptr);
3127 ptr += sizeof(ib_uint32_t);
3128
3129 if (cfg->m_page_size != UNIV_PAGE_SIZE) {
3130
3131 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3132 "Tablespace to be imported has a different "
3133 "page size than this server. Server page size "
3134 "is %lu, whereas tablespace page size is %lu",
3135 UNIV_PAGE_SIZE, (ulong) cfg->m_page_size);
3136
3137 return(DB_ERROR);
3138 }
3139
3140 cfg->m_flags = mach_read_from_4(ptr);
3141 ptr += sizeof(ib_uint32_t);
3142
3143 cfg->m_n_cols = mach_read_from_4(ptr);
3144
3145 if (!dict_tf_is_valid(cfg->m_flags)) {
3146
3147 return(DB_CORRUPTION);
3148
3149 } else if ((err = row_import_read_columns(file, thd, cfg))
3150 != DB_SUCCESS) {
3151
3152 return(err);
3153
3154 } else if ((err = row_import_read_indexes(file, thd, cfg))
3155 != DB_SUCCESS) {
3156
3157 return(err);
3158 }
3159
3160 ut_a(err == DB_SUCCESS);
3161 return(err);
3162 }
3163
3164 /**
3165 Read the contents of the <tablespace>.cfg file.
3166 @return DB_SUCCESS or error code. */
3167 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3168 dberr_t
row_import_read_meta_data(dict_table_t * table,FILE * file,THD * thd,row_import & cfg)3169 row_import_read_meta_data(
3170 /*======================*/
3171 dict_table_t* table, /*!< in: table */
3172 FILE* file, /*!< in: File to read from */
3173 THD* thd, /*!< in: session */
3174 row_import& cfg) /*!< out: contents of the .cfg file */
3175 {
3176 byte row[sizeof(ib_uint32_t)];
3177
3178 /* Trigger EOF */
3179 DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3180 (void) fseek(file, 0L, SEEK_END););
3181
3182 if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3183 ib_senderrf(
3184 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3185 errno, strerror(errno),
3186 "while reading meta-data version.");
3187
3188 return(DB_IO_ERROR);
3189 }
3190
3191 cfg.m_version = mach_read_from_4(row);
3192
3193 /* Check the version number. */
3194 switch (cfg.m_version) {
3195 case IB_EXPORT_CFG_VERSION_V1:
3196
3197 return(row_import_read_v1(file, thd, &cfg));
3198 default:
3199 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3200 "Unsupported meta-data version number (%lu), "
3201 "file ignored", (ulong) cfg.m_version);
3202 }
3203
3204 return(DB_ERROR);
3205 }
3206
3207 /**
3208 Read the contents of the <tablename>.cfg file.
3209 @return DB_SUCCESS or error code. */
3210 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3211 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3212 row_import_read_cfg(
3213 /*================*/
3214 dict_table_t* table, /*!< in: table */
3215 THD* thd, /*!< in: session */
3216 row_import& cfg) /*!< out: contents of the .cfg file */
3217 {
3218 dberr_t err;
3219 char name[OS_FILE_MAX_PATH];
3220
3221 cfg.m_table = table;
3222
3223 srv_get_meta_data_filename(table, name, sizeof(name));
3224
3225 FILE* file = fopen(name, "rb");
3226
3227 if (file == NULL) {
3228 char msg[BUFSIZ];
3229
3230 ut_snprintf(msg, sizeof(msg),
3231 "Error opening '%s', will attempt to import "
3232 "without schema verification", name);
3233
3234 ib_senderrf(
3235 thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3236 errno, strerror(errno), msg);
3237
3238 cfg.m_missing = true;
3239
3240 err = DB_FAIL;
3241 } else {
3242
3243 cfg.m_missing = false;
3244
3245 err = row_import_read_meta_data(table, file, thd, cfg);
3246 fclose(file);
3247 }
3248
3249 return(err);
3250 }
3251
3252 /*****************************************************************//**
3253 Update the <space, root page> of a table's indexes from the values
3254 in the data dictionary.
3255 @return DB_SUCCESS or error code */
3256 UNIV_INTERN
3257 dberr_t
row_import_update_index_root(trx_t * trx,const dict_table_t * table,bool reset,bool dict_locked)3258 row_import_update_index_root(
3259 /*=========================*/
3260 trx_t* trx, /*!< in/out: transaction that
3261 covers the update */
3262 const dict_table_t* table, /*!< in: Table for which we want
3263 to set the root page_no */
3264 bool reset, /*!< in: if true then set to
3265 FIL_NUL */
3266 bool dict_locked) /*!< in: Set to true if the
3267 caller already owns the
3268 dict_sys_t:: mutex. */
3269
3270 {
3271 const dict_index_t* index;
3272 que_t* graph = 0;
3273 dberr_t err = DB_SUCCESS;
3274
3275 static const char sql[] = {
3276 "PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3277 "BEGIN\n"
3278 "UPDATE SYS_INDEXES\n"
3279 "SET SPACE = :space,\n"
3280 " PAGE_NO = :page,\n"
3281 " TYPE = :type\n"
3282 "WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3283 "END;\n"};
3284
3285 if (!dict_locked) {
3286 mutex_enter(&dict_sys->mutex);
3287 }
3288
3289 for (index = dict_table_get_first_index(table);
3290 index != 0;
3291 index = dict_table_get_next_index(index)) {
3292
3293 pars_info_t* info;
3294 ib_uint32_t page;
3295 ib_uint32_t space;
3296 ib_uint32_t type;
3297 index_id_t index_id;
3298 table_id_t table_id;
3299
3300 info = (graph != 0) ? graph->info : pars_info_create();
3301
3302 mach_write_to_4(
3303 reinterpret_cast<byte*>(&type),
3304 index->type);
3305
3306 mach_write_to_4(
3307 reinterpret_cast<byte*>(&page),
3308 reset ? FIL_NULL : index->page);
3309
3310 mach_write_to_4(
3311 reinterpret_cast<byte*>(&space),
3312 reset ? FIL_NULL : index->space);
3313
3314 mach_write_to_8(
3315 reinterpret_cast<byte*>(&index_id),
3316 index->id);
3317
3318 mach_write_to_8(
3319 reinterpret_cast<byte*>(&table_id),
3320 table->id);
3321
3322 /* If we set the corrupt bit during the IMPORT phase then
3323 we need to update the system tables. */
3324 pars_info_bind_int4_literal(info, "type", &type);
3325 pars_info_bind_int4_literal(info, "space", &space);
3326 pars_info_bind_int4_literal(info, "page", &page);
3327 pars_info_bind_ull_literal(info, "index_id", &index_id);
3328 pars_info_bind_ull_literal(info, "table_id", &table_id);
3329
3330 if (graph == 0) {
3331 graph = pars_sql(info, sql);
3332 ut_a(graph);
3333 graph->trx = trx;
3334 }
3335
3336 que_thr_t* thr;
3337
3338 graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3339
3340 ut_a(thr = que_fork_start_command(graph));
3341
3342 que_run_threads(thr);
3343
3344 DBUG_EXECUTE_IF("ib_import_internal_error",
3345 trx->error_state = DB_ERROR;);
3346
3347 err = trx->error_state;
3348
3349 if (err != DB_SUCCESS) {
3350 char index_name[MAX_FULL_NAME_LEN + 1];
3351
3352 innobase_format_name(
3353 index_name, sizeof(index_name),
3354 index->name, TRUE);
3355
3356 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3357 ER_INTERNAL_ERROR,
3358 "While updating the <space, root page "
3359 "number> of index %s - %s",
3360 index_name, ut_strerr(err));
3361
3362 break;
3363 }
3364 }
3365
3366 que_graph_free(graph);
3367
3368 if (!dict_locked) {
3369 mutex_exit(&dict_sys->mutex);
3370 }
3371
3372 return(err);
3373 }
3374
/** Callback arg for row_import_set_discarded. The callback fills in
flags2 and n_recs; row_import_update_discarded_flag() binds flags2 to the
:flags2 literal of its UPDATE statement. */
struct discard_t {
	ib_uint32_t	flags2;			/*!< New MIX_LEN value: the
						value read from the column
						with DICT_TF2_DISCARDED set or
						cleared, stored with
						mach_write_to_4() */
	bool		state;			/*!< New state of the flag:
						true = set, false = clear */
	ulint		n_recs;			/*!< Number of recs processed
						by the callback */
};
3381
3382 /******************************************************************//**
3383 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3384 SYS_TABLES. The flags is stored in MIX_LEN column.
3385 @return FALSE if all OK */
3386 static
3387 ibool
row_import_set_discarded(void * row,void * user_arg)3388 row_import_set_discarded(
3389 /*=====================*/
3390 void* row, /*!< in: sel_node_t* */
3391 void* user_arg) /*!< in: bool set/unset flag */
3392 {
3393 sel_node_t* node = static_cast<sel_node_t*>(row);
3394 discard_t* discard = static_cast<discard_t*>(user_arg);
3395 dfield_t* dfield = que_node_get_val(node->select_list);
3396 dtype_t* type = dfield_get_type(dfield);
3397 ulint len = dfield_get_len(dfield);
3398
3399 ut_a(dtype_get_mtype(type) == DATA_INT);
3400 ut_a(len == sizeof(ib_uint32_t));
3401
3402 ulint flags2 = mach_read_from_4(
3403 static_cast<byte*>(dfield_get_data(dfield)));
3404
3405 if (discard->state) {
3406 flags2 |= DICT_TF2_DISCARDED;
3407 } else {
3408 flags2 &= ~DICT_TF2_DISCARDED;
3409 }
3410
3411 mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3412
3413 ++discard->n_recs;
3414
3415 /* There should be at most one matching record. */
3416 ut_a(discard->n_recs == 1);
3417
3418 return(FALSE);
3419 }
3420
3421 /*****************************************************************//**
3422 Update the DICT_TF2_DISCARDED flag in SYS_TABLES.
3423 @return DB_SUCCESS or error code. */
3424 UNIV_INTERN
3425 dberr_t
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded,bool dict_locked)3426 row_import_update_discarded_flag(
3427 /*=============================*/
3428 trx_t* trx, /*!< in/out: transaction that
3429 covers the update */
3430 table_id_t table_id, /*!< in: Table for which we want
3431 to set the root table->flags2 */
3432 bool discarded, /*!< in: set MIX_LEN column bit
3433 to discarded, if true */
3434 bool dict_locked) /*!< in: set to true if the
3435 caller already owns the
3436 dict_sys_t:: mutex. */
3437
3438 {
3439 pars_info_t* info;
3440 discard_t discard;
3441
3442 static const char sql[] =
3443 "PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3444 "DECLARE FUNCTION my_func;\n"
3445 "DECLARE CURSOR c IS\n"
3446 " SELECT MIX_LEN "
3447 " FROM SYS_TABLES "
3448 " WHERE ID = :table_id FOR UPDATE;"
3449 "\n"
3450 "BEGIN\n"
3451 "OPEN c;\n"
3452 "WHILE 1 = 1 LOOP\n"
3453 " FETCH c INTO my_func();\n"
3454 " IF c % NOTFOUND THEN\n"
3455 " EXIT;\n"
3456 " END IF;\n"
3457 "END LOOP;\n"
3458 "UPDATE SYS_TABLES"
3459 " SET MIX_LEN = :flags2"
3460 " WHERE ID = :table_id;\n"
3461 "CLOSE c;\n"
3462 "END;\n";
3463
3464 discard.n_recs = 0;
3465 discard.state = discarded;
3466 discard.flags2 = ULINT32_UNDEFINED;
3467
3468 info = pars_info_create();
3469
3470 pars_info_add_ull_literal(info, "table_id", table_id);
3471 pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3472
3473 pars_info_bind_function(
3474 info, "my_func", row_import_set_discarded, &discard);
3475
3476 dberr_t err = que_eval_sql(info, sql, !dict_locked, trx);
3477
3478 ut_a(discard.n_recs == 1);
3479 ut_a(discard.flags2 != ULINT32_UNDEFINED);
3480
3481 return(err);
3482 }
3483
3484 /*****************************************************************//**
3485 Imports a tablespace. The space id in the .ibd file must match the space id
3486 of the table in the data dictionary.
3487 @return error code or DB_SUCCESS */
3488 UNIV_INTERN
3489 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)3490 row_import_for_mysql(
3491 /*=================*/
3492 dict_table_t* table, /*!< in/out: table */
3493 row_prebuilt_t* prebuilt) /*!< in: prebuilt struct in MySQL */
3494 {
3495 dberr_t err;
3496 trx_t* trx;
3497 ib_uint64_t autoinc = 0;
3498 char table_name[MAX_FULL_NAME_LEN + 1];
3499 char* filepath = NULL;
3500
3501 ut_ad(!srv_read_only_mode);
3502
3503 innobase_format_name(
3504 table_name, sizeof(table_name), table->name, FALSE);
3505
3506 ut_a(table->space);
3507 ut_ad(prebuilt->trx);
3508 ut_a(table->ibd_file_missing);
3509
3510 trx_start_if_not_started(prebuilt->trx);
3511
3512 trx = trx_allocate_for_mysql();
3513
3514 /* So that the table is not DROPped during recovery. */
3515 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3516
3517 trx_start_if_not_started(trx);
3518
3519 /* So that we can send error messages to the user. */
3520 trx->mysql_thd = prebuilt->trx->mysql_thd;
3521
3522 /* Ensure that the table will be dropped by trx_rollback_active()
3523 in case of a crash. */
3524
3525 trx->table_id = table->id;
3526
3527 /* Assign an undo segment for the transaction, so that the
3528 transaction will be recovered after a crash. */
3529
3530 mutex_enter(&trx->undo_mutex);
3531
3532 err = trx_undo_assign_undo(trx, TRX_UNDO_UPDATE);
3533
3534 mutex_exit(&trx->undo_mutex);
3535
3536 DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
3537 err = DB_TOO_MANY_CONCURRENT_TRXS;);
3538
3539 if (err != DB_SUCCESS) {
3540
3541 return(row_import_cleanup(prebuilt, trx, err));
3542
3543 } else if (trx->update_undo == 0) {
3544
3545 err = DB_TOO_MANY_CONCURRENT_TRXS;
3546 return(row_import_cleanup(prebuilt, trx, err));
3547 }
3548
3549 prebuilt->trx->op_info = "read meta-data file";
3550
3551 /* Prevent DDL operations while we are checking. */
3552
3553 rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);
3554
3555 row_import cfg;
3556 ulint space_flags = 0;
3557
3558 memset(&cfg, 0x0, sizeof(cfg));
3559
3560 err = row_import_read_cfg(table, trx->mysql_thd, cfg);
3561
3562 /* Check if the table column definitions match the contents
3563 of the config file. */
3564
3565 if (err == DB_SUCCESS) {
3566
3567 /* We have a schema file, try and match it with the our
3568 data dictionary. */
3569
3570 err = cfg.match_schema(trx->mysql_thd);
3571
3572 /* Update index->page and SYS_INDEXES.PAGE_NO to match the
3573 B-tree root page numbers in the tablespace. Use the index
3574 name from the .cfg file to find match. */
3575
3576 if (err == DB_SUCCESS) {
3577 cfg.set_root_by_name();
3578 autoinc = cfg.m_autoinc;
3579 }
3580
3581 rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3582
3583 DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
3584 err = DB_TOO_MANY_CONCURRENT_TRXS;);
3585
3586 } else if (cfg.m_missing) {
3587
3588 rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3589
3590 /* We don't have a schema file, we will have to discover
3591 the index root pages from the .ibd file and skip the schema
3592 matching step. */
3593
3594 ut_a(err == DB_FAIL);
3595
3596 cfg.m_page_size = UNIV_PAGE_SIZE;
3597
3598 FetchIndexRootPages fetchIndexRootPages(table, trx);
3599
3600 err = fil_tablespace_iterate(
3601 table, IO_BUFFER_SIZE(cfg.m_page_size),
3602 fetchIndexRootPages);
3603
3604 if (err == DB_SUCCESS) {
3605
3606 err = fetchIndexRootPages.build_row_import(&cfg);
3607
3608 /* Update index->page and SYS_INDEXES.PAGE_NO
3609 to match the B-tree root page numbers in the
3610 tablespace. */
3611
3612 if (err == DB_SUCCESS) {
3613 err = cfg.set_root_by_heuristic();
3614 }
3615 }
3616
3617 space_flags = fetchIndexRootPages.get_space_flags();
3618
3619 /* If the fsp flag is set for data_dir, but table flag is not
3620 set for data_dir or vice versa then return error. */
3621 if (err == DB_SUCCESS
3622 && FSP_FLAGS_HAS_DATA_DIR(space_flags) !=
3623 DICT_TF_HAS_DATA_DIR(table->flags)) {
3624 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3625 ER_TABLE_SCHEMA_MISMATCH,
3626 "Table location flags do not match. "
3627 "The source table %s a DATA DIRECTORY "
3628 "but the destination table %s.",
3629 (FSP_FLAGS_HAS_DATA_DIR(space_flags) ? "uses"
3630 : "does not use"),
3631 (DICT_TF_HAS_DATA_DIR(table->flags) ? "does"
3632 : "does not"));
3633 err = DB_ERROR;
3634 return(row_import_error(prebuilt, trx, err));
3635 }
3636
3637 } else {
3638 rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3639 }
3640
3641 if (err != DB_SUCCESS) {
3642 return(row_import_error(prebuilt, trx, err));
3643 }
3644
3645 prebuilt->trx->op_info = "importing tablespace";
3646
3647 ib_logf(IB_LOG_LEVEL_INFO, "Phase I - Update all pages");
3648
3649 /* Iterate over all the pages and do the sanity checking and
3650 the conversion required to import the tablespace. */
3651
3652 PageConverter converter(&cfg, trx);
3653
3654 /* Set the IO buffer size in pages. */
3655
3656 err = fil_tablespace_iterate(
3657 table, IO_BUFFER_SIZE(cfg.m_page_size), converter);
3658
3659 DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
3660 err = DB_TOO_MANY_CONCURRENT_TRXS;);
3661
3662 if (err != DB_SUCCESS) {
3663 char table_name[MAX_FULL_NAME_LEN + 1];
3664
3665 innobase_format_name(
3666 table_name, sizeof(table_name), table->name, FALSE);
3667
3668 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3669 ER_INTERNAL_ERROR,
3670 "Cannot reset LSNs in table '%s' : %s",
3671 table_name, ut_strerr(err));
3672
3673 return(row_import_cleanup(prebuilt, trx, err));
3674 }
3675
3676 row_mysql_lock_data_dictionary(trx);
3677
3678 /* If the table is stored in a remote tablespace, we need to
3679 determine that filepath from the link file and system tables.
3680 Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
3681 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3682 dict_get_and_save_data_dir_path(table, true);
3683 ut_a(table->data_dir_path);
3684
3685 filepath = os_file_make_remote_pathname(
3686 table->data_dir_path, table->name, "ibd");
3687 } else {
3688 filepath = fil_make_ibd_name(table->name, false);
3689 }
3690 ut_a(filepath);
3691
3692 /* Open the tablespace so that we can access via the buffer pool.
3693 We set the 2nd param (fix_dict = true) here because we already
3694 have an x-lock on dict_operation_lock and dict_sys->mutex. */
3695
3696 err = fil_open_single_table_tablespace(
3697 true, true, table->space,
3698 dict_tf_to_fsp_flags(table->flags),
3699 table->name, filepath);
3700
3701 DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
3702 err = DB_TABLESPACE_NOT_FOUND;);
3703
3704 if (err != DB_SUCCESS) {
3705 row_mysql_unlock_data_dictionary(trx);
3706
3707 ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3708 ER_FILE_NOT_FOUND,
3709 filepath, err, ut_strerr(err));
3710
3711 mem_free(filepath);
3712
3713 return(row_import_cleanup(prebuilt, trx, err));
3714 }
3715
3716 row_mysql_unlock_data_dictionary(trx);
3717
3718 mem_free(filepath);
3719
3720 err = ibuf_check_bitmap_on_import(trx, table->space);
3721
3722 DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
3723
3724 if (err != DB_SUCCESS) {
3725 return(row_import_cleanup(prebuilt, trx, err));
3726 }
3727
3728 /* The first index must always be the clustered index. */
3729
3730 dict_index_t* index = dict_table_get_first_index(table);
3731
3732 if (!dict_index_is_clust(index)) {
3733 return(row_import_error(prebuilt, trx, DB_CORRUPTION));
3734 }
3735
3736 /* Update the Btree segment headers for index node and
3737 leaf nodes in the root page. Set the new space id. */
3738
3739 err = btr_root_adjust_on_import(index);
3740
3741 DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
3742 err = DB_CORRUPTION;);
3743
3744 if (err != DB_SUCCESS) {
3745 return(row_import_error(prebuilt, trx, err));
3746 }
3747
3748 if (err != DB_SUCCESS) {
3749 return(row_import_error(prebuilt, trx, err));
3750 } else if (cfg.requires_purge(index->name)) {
3751
3752 /* Purge any delete-marked records that couldn't be
3753 purged during the page conversion phase from the
3754 cluster index. */
3755
3756 IndexPurge purge(trx, index);
3757
3758 trx->op_info = "cluster: purging delete marked records";
3759
3760 err = purge.garbage_collect();
3761
3762 trx->op_info = "";
3763 }
3764
3765 DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
3766
3767 if (err != DB_SUCCESS) {
3768 return(row_import_error(prebuilt, trx, err));
3769 }
3770
3771 /* For secondary indexes, purge any records that couldn't be purged
3772 during the page conversion phase. */
3773
3774 err = row_import_adjust_root_pages_of_secondary_indexes(
3775 prebuilt, trx, table, cfg);
3776
3777 DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
3778 err = DB_CORRUPTION;);
3779
3780 if (err != DB_SUCCESS) {
3781 return(row_import_error(prebuilt, trx, err));
3782 }
3783
3784 /* Ensure that the next available DB_ROW_ID is not smaller than
3785 any DB_ROW_ID stored in the table. */
3786
3787 if (prebuilt->clust_index_was_generated) {
3788
3789 err = row_import_set_sys_max_row_id(prebuilt, table);
3790
3791 if (err != DB_SUCCESS) {
3792 return(row_import_error(prebuilt, trx, err));
3793 }
3794 }
3795
3796 ib_logf(IB_LOG_LEVEL_INFO, "Phase III - Flush changes to disk");
3797
3798 /* Ensure that all pages dirtied during the IMPORT make it to disk.
3799 The only dirty pages generated should be from the pessimistic purge
3800 of delete marked records that couldn't be purged in Phase I. */
3801
3802 buf_LRU_flush_or_remove_pages(
3803 prebuilt->table->space, BUF_REMOVE_FLUSH_WRITE, trx);
3804
3805 if (trx_is_interrupted(trx)) {
3806 ib_logf(IB_LOG_LEVEL_INFO, "Phase III - Flush interrupted");
3807 return(row_import_error(prebuilt, trx, DB_INTERRUPTED));
3808 } else {
3809 ib_logf(IB_LOG_LEVEL_INFO, "Phase IV - Flush complete");
3810 }
3811
3812 /* The dictionary latches will be released in in row_import_cleanup()
3813 after the transaction commit, for both success and error. */
3814
3815 row_mysql_lock_data_dictionary(trx);
3816
3817 /* Update the root pages of the table's indexes. */
3818 err = row_import_update_index_root(trx, table, false, true);
3819
3820 if (err != DB_SUCCESS) {
3821 return(row_import_error(prebuilt, trx, err));
3822 }
3823
3824 /* Update the table's discarded flag, unset it. */
3825 err = row_import_update_discarded_flag(trx, table->id, false, true);
3826
3827 if (err != DB_SUCCESS) {
3828 return(row_import_error(prebuilt, trx, err));
3829 }
3830
3831 table->ibd_file_missing = false;
3832 table->flags2 &= ~DICT_TF2_DISCARDED;
3833
3834 if (autoinc != 0) {
3835 char table_name[MAX_FULL_NAME_LEN + 1];
3836
3837 innobase_format_name(
3838 table_name, sizeof(table_name), table->name, FALSE);
3839
3840 ib_logf(IB_LOG_LEVEL_INFO, "%s autoinc value set to " IB_ID_FMT,
3841 table_name, autoinc);
3842
3843 dict_table_autoinc_lock(table);
3844 dict_table_autoinc_initialize(table, autoinc);
3845 dict_table_autoinc_unlock(table);
3846 }
3847
3848 ut_a(err == DB_SUCCESS);
3849
3850 return(row_import_cleanup(prebuilt, trx, err));
3851 }
3852
3853