1 /*****************************************************************************
2
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #ifdef HAVE_LZO
44 #include "lzo/lzo1x.h"
45 #endif
46 #ifdef HAVE_SNAPPY
47 #include "snappy-c.h"
48 #endif
49
50 #include <vector>
51
52 #ifdef HAVE_MY_AES_H
53 #include <my_aes.h>
54 #endif
55
/** The size of the buffer to use for IO: the number of pages of the
given physical size that fit in 1 MiB.
The argument is parenthesized so that an expression such as
IO_BUFFER_SIZE(a + b) divides by the whole expression.
@param n	physical page size in bytes (must be non-zero)
@return number of pages */
#define IO_BUFFER_SIZE(n)	((1024 * 1024) / (n))
60
/** For gathering stats on records during phase I */
struct row_stats_t {
	ulint		m_n_deleted;		/*!< Number of deleted records
						found in the index */

	ulint		m_n_purged;		/*!< Number of records purged
						optimistically */

	ulint		m_n_rows;		/*!< Number of rows */

	ulint		m_n_purge_failed;	/*!< Number of deleted rows
						that could not be purged */
};
74
/** Index information required by IMPORT.
Instances are plain data: FetchIndexRootPages::build_row_import()
zero-fills a freshly allocated array of them with memset(). */
struct row_index_t {
	index_id_t	m_id;			/*!< Index id of the table
						in the exporting server */
	byte*		m_name;			/*!< Index name; released with
						UT_DELETE_ARRAY by
						row_import::~row_import() */

	ulint		m_space;		/*!< Space where it is placed */

	ulint		m_page_no;		/*!< Root page number */

	ulint		m_type;			/*!< Index type */

	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
						indexes, offset of transaction
						id system column */

	ulint		m_n_user_defined_cols;	/*!< User defined columns */

	ulint		m_n_uniq;		/*!< Number of columns that can
						uniquely identify the row */

	ulint		m_n_nullable;		/*!< Number of nullable
						columns */

	ulint		m_n_fields;		/*!< Total number of fields */

	dict_field_t*	m_fields;		/*!< Index fields; the array and
						each field name are released by
						row_import::~row_import() */

	const dict_index_t*
			m_srv_index;		/*!< Index instance in the
						importing server */

	row_stats_t	m_stats;		/*!< Statistics gathered during
						the import phase */

};
111
/** Meta data required by IMPORT. */
struct row_import {
	/** Constructor. Every pointer starts as NULL and every counter
	as 0; m_missing starts as true. */
	row_import() UNIV_NOTHROW
		:
		m_table(NULL),
		m_version(0),
		m_hostname(NULL),
		m_table_name(NULL),
		m_autoinc(0),
		m_page_size(0, 0, false),
		m_flags(0),
		m_n_cols(0),
		m_cols(NULL),
		m_col_names(NULL),
		m_n_indexes(0),
		m_indexes(NULL),
		m_missing(true) { }

	/** Destructor. Releases the index, field and column meta-data
	arrays as well as the table name and host name. */
	~row_import() UNIV_NOTHROW;

	/** Find the index entry in the indexes array.
	@param name	index name
	@return instance if found else 0. */
	row_index_t* get_index(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows in the index.
	@param name	index name
	@return number of rows (doesn't include delete marked rows). */
	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;

	/** Find the ordinal value of the column name in the cfg table columns.
	@param name	of column to look for.
	@return ULINT_UNDEFINED if not found. */
	ulint find_col(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows for which purge failed during the
	convert phase.
	@param name	index name
	@return number of rows for which purge failed. */
	ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;

	/** Check whether purge failed for any delete-marked record of
	the index, that is, whether get_n_purge_failed(name) > 0.
	@param name	index name
	@return true if index needs to be purged. */
	bool requires_purge(const char* name) const UNIV_NOTHROW
	{
		return(get_n_purge_failed(name) > 0);
	}

	/** Set the index root <space, pageno> using the index name */
	void set_root_by_name() UNIV_NOTHROW;

	/** Set the index root <space, pageno> using a heuristic
	@return DB_SUCCESS or error code */
	dberr_t set_root_by_heuristic() UNIV_NOTHROW;

	/** Check if the index schema that was read from the .cfg file
	matches the in memory index definition.
	Note: It will update row_index_t::m_srv_index to map the meta-data
	read from the .cfg file to the server index instance.
	@param thd	MySQL session variable
	@param index	index definition in the importing server
	@return DB_SUCCESS or error code. */
	dberr_t match_index_columns(
		THD*			thd,
		const dict_index_t*	index) UNIV_NOTHROW;

	/** Check if the table schema that was read from the .cfg file
	matches the in memory table definition.
	@param thd	MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_table_columns(
		THD*			thd) UNIV_NOTHROW;

	/** Check if the table (and index) schema that was read from the
	.cfg file matches the in memory table definition.
	@param thd	MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_schema(
		THD*			thd) UNIV_NOTHROW;

	dict_table_t*	m_table;		/*!< Table instance */

	ulint		m_version;		/*!< Version of config file */

	byte*		m_hostname;		/*!< Hostname where the
						tablespace was exported */
	byte*		m_table_name;		/*!< Exporting instance table
						name */

	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */

	page_size_t	m_page_size;		/*!< Tablespace page size */

	ulint		m_flags;		/*!< Table flags */

	ulint		m_n_cols;		/*!< Number of columns in the
						meta-data file */

	dict_col_t*	m_cols;			/*!< Column data */

	byte**		m_col_names;		/*!< Column names, we store the
						column names separately because
						there is no field to store the
						value in dict_col_t */

	ulint		m_n_indexes;		/*!< Number of indexes,
						including clustered index */

	row_index_t*	m_indexes;		/*!< Index meta data */

	bool		m_missing;		/*!< Initialized to true by the
						constructor.
						NOTE(review): the name suggests
						that true means the .cfg file
						is missing or unreadable, while
						the previous comment here said
						"true if a .cfg file was found
						and was readable" - confirm
						against the .cfg reader */
};
224
/** State handed to fil_iterate() and to AbstractCallback::run():
the file being traversed, the byte range to cover and the IO buffers. */
struct fil_iterator_t {
	pfs_os_file_t	file;			/*!< File handle */
	const char*	filepath;		/*!< File path name */
	os_offset_t	start;			/*!< From where to start */
	os_offset_t	end;			/*!< Where to stop */
	os_offset_t	file_size;		/*!< File size in bytes */
	ulint		n_io_buffers;		/*!< Number of pages to use
						for IO */
	byte*		io_buffer;		/*!< Buffer to use for IO */
	fil_space_crypt_t *crypt_data;		/*!< Crypt data (if encrypted) */
	byte*           crypt_io_buffer;        /*!< IO buffer when encrypted */
};
237
238 /** Use the page cursor to iterate over records in a block. */
239 class RecIterator {
240 public:
241 /** Default constructor */
RecIterator()242 RecIterator() UNIV_NOTHROW
243 {
244 memset(&m_cur, 0x0, sizeof(m_cur));
245 }
246
247 /** Position the cursor on the first user record. */
open(buf_block_t * block)248 void open(buf_block_t* block) UNIV_NOTHROW
249 {
250 page_cur_set_before_first(block, &m_cur);
251
252 if (!end()) {
253 next();
254 }
255 }
256
257 /** Move to the next record. */
next()258 void next() UNIV_NOTHROW
259 {
260 page_cur_move_to_next(&m_cur);
261 }
262
263 /**
264 @return the current record */
current()265 rec_t* current() UNIV_NOTHROW
266 {
267 ut_ad(!end());
268 return(page_cur_get_rec(&m_cur));
269 }
270
271 /**
272 @return true if cursor is at the end */
end()273 bool end() UNIV_NOTHROW
274 {
275 return(page_cur_is_after_last(&m_cur) == TRUE);
276 }
277
278 /** Remove the current record
279 @return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,rec_offs * offsets)280 bool remove(
281 const dict_index_t* index,
282 page_zip_des_t* page_zip,
283 rec_offs* offsets) UNIV_NOTHROW
284 {
285 /* We can't end up with an empty page unless it is root. */
286 if (page_get_n_recs(m_cur.block->frame) <= 1) {
287 return(false);
288 }
289
290 return(page_delete_rec(index, &m_cur, page_zip, offsets));
291 }
292
293 private:
294 page_cur_t m_cur;
295 };
296
/** Class that purges delete marked records from indexes, both secondary
and cluster. It does a pessimistic delete. This should only be done if we
couldn't purge the delete marked records during Phase I. */
class IndexPurge {
public:
	/** Constructor. Logs an informational "Phase II" message that
	names the index.
	@param trx	the user transaction covering the import tablespace
	@param index	to be imported */
	IndexPurge(
		trx_t*		trx,
		dict_index_t*	index) UNIV_NOTHROW
		:
		m_trx(trx),
		m_index(index),
		m_n_rows(0)
	{
		ib::info() << "Phase II - Purge records from index "
			<< index->name;
	}

	/** Destructor */
	~IndexPurge() UNIV_NOTHROW { }

	/** Purge delete marked records.
	@return DB_SUCCESS or error code. */
	dberr_t	garbage_collect() UNIV_NOTHROW;

	/** The number of records that are not delete marked.
	@return total records in the index after purge */
	ulint	get_n_rows() const UNIV_NOTHROW
	{
		return(m_n_rows);
	}

private:
	/** Begin import, position the cursor on the first record. */
	void	open() UNIV_NOTHROW;

	/** Close the persistent cursor and commit the mini-transaction. */
	void	close() UNIV_NOTHROW;

	/** Position the cursor on the next record.
	@return DB_SUCCESS or error code */
	dberr_t	next() UNIV_NOTHROW;

	/** Store the persistent cursor position and reopen the
	B-tree cursor in BTR_MODIFY_TREE mode, because the
	tree structure may be changed during a pessimistic delete. */
	void	purge_pessimistic_delete() UNIV_NOTHROW;

	/** Purge delete-marked records. */
	void	purge() UNIV_NOTHROW;

protected:
	// Disable default construction and copying: these are declared
	// here but no definition is visible in this part of the file.
	IndexPurge();
	IndexPurge(const IndexPurge&);
	IndexPurge &operator=(const IndexPurge&);

private:
	trx_t*			m_trx;		/*!< User transaction */
	mtr_t			m_mtr;		/*!< Mini-transaction */
	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
	dict_index_t*		m_index;	/*!< Index to be processed */
	ulint			m_n_rows;	/*!< Records in index */
};
365
366 /** Functor that is called for each physical page that is read from the
367 tablespace file. */
368 class AbstractCallback
369 {
370 public:
371 /** Constructor
372 @param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)373 AbstractCallback(trx_t* trx, ulint space_id)
374 :
375 m_page_size(0, 0, false),
376 m_trx(trx),
377 m_space(space_id),
378 m_xdes(),
379 m_xdes_page_no(ULINT_UNDEFINED),
380 m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
381
382 /** Free any extent descriptor instance */
~AbstractCallback()383 virtual ~AbstractCallback()
384 {
385 UT_DELETE_ARRAY(m_xdes);
386 }
387
388 /** Determine the page size to use for traversing the tablespace
389 @param file_size size of the tablespace file in bytes
390 @param block contents of the first page in the tablespace file.
391 @retval DB_SUCCESS or error code. */
392 virtual dberr_t init(
393 os_offset_t file_size,
394 const buf_block_t* block) UNIV_NOTHROW;
395
396 /** @return true if compressed table. */
is_compressed_table() const397 bool is_compressed_table() const UNIV_NOTHROW
398 {
399 return(get_page_size().is_compressed());
400 }
401
402 /** @return the tablespace flags */
get_space_flags() const403 ulint get_space_flags() const
404 {
405 return(m_space_flags);
406 }
407
408 /**
409 Set the name of the physical file and the file handle that is used
410 to open it for the file that is being iterated over.
411 @param filename the physical name of the tablespace file
412 @param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)413 void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
414 {
415 m_file = file;
416 m_filepath = filename;
417 }
418
get_page_size() const419 const page_size_t& get_page_size() const { return m_page_size; }
420
filename() const421 const char* filename() const { return m_filepath; }
422
423 /**
424 Called for every page in the tablespace. If the page was not
425 updated then its state must be set to BUF_PAGE_NOT_USED. For
426 compressed tables the page descriptor memory will be at offset:
427 block->frame + srv_page_size;
428 @param block block read from file, note it is not from the buffer pool
429 @retval DB_SUCCESS or error code. */
430 virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
431
432 /** @return the tablespace identifier */
get_space_id() const433 ulint get_space_id() const { return m_space; }
434
is_interrupted() const435 bool is_interrupted() const { return trx_is_interrupted(m_trx); }
436
437 /**
438 Get the data page depending on the table type, compressed or not.
439 @param block - block read from disk
440 @retval the buffer frame */
get_frame(const buf_block_t * block)441 static byte* get_frame(const buf_block_t* block)
442 {
443 return block->page.zip.data
444 ? block->page.zip.data : block->frame;
445 }
446
447 /** Invoke the functionality for the callback */
448 virtual dberr_t run(const fil_iterator_t& iter,
449 buf_block_t* block) UNIV_NOTHROW = 0;
450
451 protected:
452 /** Get the physical offset of the extent descriptor within the page.
453 @param page_no page number of the extent descriptor
454 @param page contents of the page containing the extent descriptor.
455 @return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const456 const xdes_t* xdes(
457 ulint page_no,
458 const page_t* page) const UNIV_NOTHROW
459 {
460 ulint offset;
461
462 offset = xdes_calc_descriptor_index(get_page_size(), page_no);
463
464 return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
465 }
466
467 /** Set the current page directory (xdes). If the extent descriptor is
468 marked as free then free the current extent descriptor and set it to
469 0. This implies that all pages that are covered by this extent
470 descriptor are also freed.
471
472 @param page_no offset of page within the file
473 @param page page contents
474 @return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)475 dberr_t set_current_xdes(
476 ulint page_no,
477 const page_t* page) UNIV_NOTHROW
478 {
479 m_xdes_page_no = page_no;
480
481 UT_DELETE_ARRAY(m_xdes);
482 m_xdes = NULL;
483
484 ulint state;
485 const xdes_t* xdesc = page + XDES_ARR_OFFSET;
486
487 state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
488
489 if (state != XDES_FREE) {
490
491 m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t,
492 m_page_size.physical());
493
494 /* Trigger OOM */
495 DBUG_EXECUTE_IF(
496 "ib_import_OOM_13",
497 UT_DELETE_ARRAY(m_xdes);
498 m_xdes = NULL;
499 );
500
501 if (m_xdes == NULL) {
502 return(DB_OUT_OF_MEMORY);
503 }
504
505 memcpy(m_xdes, page, m_page_size.physical());
506 }
507
508 return(DB_SUCCESS);
509 }
510
511 /** Check if the page is marked as free in the extent descriptor.
512 @param page_no page number to check in the extent descriptor.
513 @return true if the page is marked as free */
is_free(ulint page_no) const514 bool is_free(ulint page_no) const UNIV_NOTHROW
515 {
516 ut_a(xdes_calc_descriptor_page(get_page_size(), page_no)
517 == m_xdes_page_no);
518
519 if (m_xdes != 0) {
520 const xdes_t* xdesc = xdes(page_no, m_xdes);
521 ulint pos = page_no % FSP_EXTENT_SIZE;
522
523 return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
524 }
525
526 /* If the current xdes was free, the page must be free. */
527 return(true);
528 }
529
530 protected:
531 /** The tablespace page size. */
532 page_size_t m_page_size;
533
534 /** File handle to the tablespace */
535 pfs_os_file_t m_file;
536
537 /** Physical file path. */
538 const char* m_filepath;
539
540 /** Covering transaction. */
541 trx_t* m_trx;
542
543 /** Space id of the file being iterated over. */
544 ulint m_space;
545
546 /** Current size of the space in pages */
547 ulint m_size;
548
549 /** Current extent descriptor page */
550 xdes_t* m_xdes;
551
552 /** Physical page offset in the file of the extent descriptor */
553 ulint m_xdes_page_no;
554
555 /** Flags value read from the header page */
556 ulint m_space_flags;
557 };
558
559 /** Determine the page size to use for traversing the tablespace
560 @param file_size size of the tablespace file in bytes
561 @param block contents of the first page in the tablespace file.
562 @retval DB_SUCCESS or error code. */
563 dberr_t
init(os_offset_t file_size,const buf_block_t * block)564 AbstractCallback::init(
565 os_offset_t file_size,
566 const buf_block_t* block) UNIV_NOTHROW
567 {
568 const page_t* page = block->frame;
569
570 m_space_flags = fsp_header_get_flags(page);
571 if (!fsp_flags_is_valid(m_space_flags, true)) {
572 ulint cflags = fsp_flags_convert_from_101(m_space_flags);
573 if (cflags == ULINT_UNDEFINED) {
574 ib::error() << "Invalid FSP_SPACE_FLAGS="
575 << ib::hex(m_space_flags);
576 return(DB_CORRUPTION);
577 }
578 m_space_flags = cflags;
579 }
580
581 /* Clear the DATA_DIR flag, which is basically garbage. */
582 m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
583 m_page_size.copy_from(page_size_t(m_space_flags));
584
585 if (!is_compressed_table() && !m_page_size.equals_to(univ_page_size)) {
586
587 ib::error() << "Page size " << m_page_size.physical()
588 << " of ibd file is not the same as the server page"
589 " size " << srv_page_size;
590
591 return(DB_CORRUPTION);
592
593 } else if (file_size % m_page_size.physical() != 0) {
594
595 ib::error() << "File size " << file_size << " is not a"
596 " multiple of the page size "
597 << m_page_size.physical();
598
599 return(DB_CORRUPTION);
600 }
601
602 m_size = mach_read_from_4(page + FSP_SIZE);
603 if (m_space == ULINT_UNDEFINED) {
604 m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
605 + page);
606 }
607
608 return set_current_xdes(0, page);
609 }
610
/**
TODO: This can be made parallel trivially by chunking up the file
and creating a callback per thread. The main benefit will be to use
multiple CPUs for checksums and compressed tables. We have to do
compressed tables block by block right now. Secondly, we need to
decompress/compress and copy too much data. These are
CPU intensive.

Iterate over all the pages in the tablespace.
@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static dberr_t fil_iterate(
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	AbstractCallback&	callback);
628
/**
Callback used to discover index root information directly from the
.ibd file. For each block it is invoked on, operator() records the
index id and the page number of that block in m_index and checks that
the tablespace flags match the table flags.
NOTE(review): the previous description said that root pages are found
by checking that the next/prev pointers are both FIL_NULL while skipping
deleted pages; the operator() in this file does not perform that check.
Which pages are visited is decided by run(), whose definition is not
part of this declaration - confirm. */
struct FetchIndexRootPages : public AbstractCallback {

	/** Index information gathered from the .ibd file. */
	struct Index {

		/** Constructor
		@param id	index id
		@param page_no	root page number */
		Index(index_id_t id, ulint page_no)
			:
			m_id(id),
			m_page_no(page_no) { }

		index_id_t	m_id;		/*!< Index id */
		ulint		m_page_no;	/*!< Root page number */
	};

	/** Constructor. The tablespace id is passed to the base class
	as ULINT_UNDEFINED, so AbstractCallback::init() reads it from
	the header page.
	@param table	table definition in server
	@param trx	covering (user) transaction */
	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
		:
		AbstractCallback(trx, ULINT_UNDEFINED),
		m_table(table), m_index(0, 0) UNIV_NOTHROW { }

	/** Destructor */
	virtual ~FetchIndexRootPages() UNIV_NOTHROW { }

	/** Fetch the clustered index root page in the tablespace
	@param iter	Tablespace iterator
	@param block	Block to use for IO
	@retval DB_SUCCESS or error code */
	dberr_t run(const fil_iterator_t& iter,
		    buf_block_t* block) UNIV_NOTHROW;

	/** Called for each block as it is read from the file.
	@param block	block to convert, it is not from the buffer pool.
	@retval DB_SUCCESS or error code. */
	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;

	/** Update the import configuration that will be used to import
	the tablespace.
	@param cfg	import configuration to fill in
	@return DB_SUCCESS or error code */
	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;

	/** Table definition in server. */
	const dict_table_t*	m_table;

	/** Index information */
	Index			m_index;
};
679
680 /** Called for each block as it is read from the file. Check index pages to
681 determine the exact row format. We can't get that from the tablespace
682 header flags alone.
683
684 @param block block to convert, it is not from the buffer pool.
685 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)686 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
687 {
688 if (is_interrupted()) return DB_INTERRUPTED;
689
690 const page_t* page = get_frame(block);
691
692 m_index.m_id = btr_page_get_index_id(page);
693 m_index.m_page_no = block->page.id.page_no();
694
695 /* Check that the tablespace flags match the table flags. */
696 ulint expected = dict_tf_to_fsp_flags(m_table->flags);
697 if (!fsp_flags_match(expected, m_space_flags)) {
698 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
699 ER_TABLE_SCHEMA_MISMATCH,
700 "Expected FSP_SPACE_FLAGS=0x%x, .ibd "
701 "file contains 0x%x.",
702 unsigned(expected),
703 unsigned(m_space_flags));
704 return(DB_CORRUPTION);
705 }
706
707 return DB_SUCCESS;
708 }
709
710 /**
711 Update the import configuration that will be used to import the tablespace.
712 @return error code or DB_SUCCESS */
713 dberr_t
build_row_import(row_import * cfg) const714 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
715 {
716 ut_a(cfg->m_table == m_table);
717 cfg->m_page_size.copy_from(m_page_size);
718 cfg->m_n_indexes = 1;
719
720 if (cfg->m_n_indexes == 0) {
721
722 ib::error() << "No B+Tree found in tablespace";
723
724 return(DB_CORRUPTION);
725 }
726
727 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
728
729 /* Trigger OOM */
730 DBUG_EXECUTE_IF(
731 "ib_import_OOM_11",
732 UT_DELETE_ARRAY(cfg->m_indexes);
733 cfg->m_indexes = NULL;
734 );
735
736 if (cfg->m_indexes == NULL) {
737 return(DB_OUT_OF_MEMORY);
738 }
739
740 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
741
742 row_index_t* cfg_index = cfg->m_indexes;
743
744 char name[BUFSIZ];
745
746 snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
747
748 ulint len = strlen(name) + 1;
749
750 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
751
752 /* Trigger OOM */
753 DBUG_EXECUTE_IF(
754 "ib_import_OOM_12",
755 UT_DELETE_ARRAY(cfg_index->m_name);
756 cfg_index->m_name = NULL;
757 );
758
759 if (cfg_index->m_name == NULL) {
760 return(DB_OUT_OF_MEMORY);
761 }
762
763 memcpy(cfg_index->m_name, name, len);
764
765 cfg_index->m_id = m_index.m_id;
766
767 cfg_index->m_space = m_space;
768
769 cfg_index->m_page_no = m_index.m_page_no;
770
771 return(DB_SUCCESS);
772 }
773
/* Functor that is called for each physical page that is read from the
tablespace file.

  1. Check each page for corruption.

  2. Update the space id and LSN on every page
     * For the header page
       - Validate the flags
       - Update the LSN

  3. On Btree pages
     * Set the index id
     * Update the max trx id
     * In a cluster index, update the system columns
     * In a cluster index, update the BLOB ptr, set the space id
     * Purge delete marked records, but only if they can be easily
       removed from the page
     * Keep a counter of number of rows, ie. non-delete-marked rows
     * Keep a counter of number of delete marked rows
     * Keep a counter of number of purge failure
     * If a page is stamped with an index id that isn't in the .cfg file
       we assume it is deleted and the page can be ignored.

   4. Set the page state to dirty so that it will be written to disk.
*/
class PageConverter : public AbstractCallback {
public:
	/** Constructor. Captures the current system LSN and the first
	(clustered) index of the table being imported.
	@param cfg	config of table being imported.
	@param space_id	tablespace identifier
	@param trx	transaction covering the import */
	PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
		:
		AbstractCallback(trx, space_id),
		m_cfg(cfg),
		m_index(cfg->m_indexes),
		m_current_lsn(log_get_lsn()),
		m_page_zip_ptr(0),
		m_rec_iter(),
		m_offsets_(), m_offsets(m_offsets_),
		m_heap(0),
		m_cluster_index(dict_table_get_first_index(cfg->m_table))
	{
		ut_ad(m_current_lsn);
		rec_offs_init(m_offsets_);
	}

	/** Destructor. Frees the record offsets heap, if one was
	created. */
	virtual ~PageConverter() UNIV_NOTHROW
	{
		if (m_heap != 0) {
			mem_heap_free(m_heap);
		}
	}

	/** Iterate over the tablespace with this object as the callback.
	@param iter	tablespace iterator
	@param block	block to use for IO
	@return DB_SUCCESS or error code */
	dberr_t run(const fil_iterator_t& iter, buf_block_t* block) UNIV_NOTHROW
	{
		return fil_iterate(iter, block, *this);
	}

	/** Called for each block as it is read from the file.
	@param block	block to convert, it is not from the buffer pool.
	@retval DB_SUCCESS or error code. */
	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
private:
	/** Update the page, set the space id, max trx id and index id.
	@param block	block read from file
	@param page_type	type of the page
	@retval DB_SUCCESS or error code */
	dberr_t update_page(
		buf_block_t*	block,
		ulint&		page_type) UNIV_NOTHROW;

	/** Update the space, index id, trx id.
	@param block	block to convert
	@return DB_SUCCESS or error code */
	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;

	/** Update the BLOB references and write UNDO log entries for
	rows that can't be purged optimistically.
	@param block	block to update
	@retval DB_SUCCESS or error code */
	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;

	/** Validate the space flags and update tablespace header page.
	@param block	block read from file, not from the buffer pool.
	@retval DB_SUCCESS or error code */
	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;

	/** Adjust the BLOB reference for a single column that is externally stored
	@param rec	record to update
	@param offsets	column offsets for the record
	@param i	column ordinal value
	@return DB_SUCCESS or error code */
	dberr_t	adjust_cluster_index_blob_column(
		rec_t*		rec,
		const rec_offs*	offsets,
		ulint		i) UNIV_NOTHROW;

	/** Adjusts the BLOB reference in the clustered index row for all
	externally stored columns.
	@param rec	record to update
	@param offsets	column offsets for the record
	@return DB_SUCCESS or error code */
	dberr_t	adjust_cluster_index_blob_columns(
		rec_t*		rec,
		const rec_offs*	offsets) UNIV_NOTHROW;

	/** In the clustered index, adjust the BLOB pointers as needed.
	Also update the BLOB reference, write the new space id.
	@param rec	record to update
	@param offsets	column offsets for the record
	@return DB_SUCCESS or error code */
	dberr_t	adjust_cluster_index_blob_ref(
		rec_t*		rec,
		const rec_offs*	offsets) UNIV_NOTHROW;

	/** Purge delete-marked records, only if it is possible to do
	so without re-organising the B+tree.
	@retval true if purged */
	bool	purge() UNIV_NOTHROW;

	/** Adjust the BLOB references and sys fields for the current record.
	@param rec	record to update
	@param offsets	column offsets for the record
	@return DB_SUCCESS or error code. */
	dberr_t	adjust_cluster_record(
		rec_t*			rec,
		const rec_offs*		offsets) UNIV_NOTHROW;

	/** Find an index with the matching id by a linear scan of the
	indexes listed in the import configuration.
	@param id	index id to look for
	@return row_index_t* instance or 0 */
	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
	{
		row_index_t*	index = &m_cfg->m_indexes[0];

		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
			if (id == index->m_id) {
				return(index);
			}
		}

		return(0);

	}
private:
	/** Config for table that is being imported. */
	row_import*		m_cfg;

	/** Current index whose pages are being imported */
	row_index_t*		m_index;

	/** Current system LSN */
	lsn_t			m_current_lsn;

	/** Alias for m_page_zip, only set for compressed pages. */
	page_zip_des_t*		m_page_zip_ptr;

	/** Iterator over records in a block */
	RecIterator		m_rec_iter;

	/** Record offset */
	rec_offs		m_offsets_[REC_OFFS_NORMAL_SIZE];

	/** Pointer to m_offsets_ */
	rec_offs*		m_offsets;

	/** Memory heap for the record offsets */
	mem_heap_t*		m_heap;

	/** Cluster index instance */
	dict_index_t*		m_cluster_index;
};
946
947 /**
948 row_import destructor. */
~row_import()949 row_import::~row_import() UNIV_NOTHROW
950 {
951 for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
952 UT_DELETE_ARRAY(m_indexes[i].m_name);
953
954 if (m_indexes[i].m_fields == NULL) {
955 continue;
956 }
957
958 dict_field_t* fields = m_indexes[i].m_fields;
959 ulint n_fields = m_indexes[i].m_n_fields;
960
961 for (ulint j = 0; j < n_fields; ++j) {
962 UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
963 }
964
965 UT_DELETE_ARRAY(fields);
966 }
967
968 for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
969 UT_DELETE_ARRAY(m_col_names[i]);
970 }
971
972 UT_DELETE_ARRAY(m_cols);
973 UT_DELETE_ARRAY(m_indexes);
974 UT_DELETE_ARRAY(m_col_names);
975 UT_DELETE_ARRAY(m_table_name);
976 UT_DELETE_ARRAY(m_hostname);
977 }
978
979 /** Find the index entry in in the indexes array.
980 @param name index name
981 @return instance if found else 0. */
982 row_index_t*
get_index(const char * name) const983 row_import::get_index(
984 const char* name) const UNIV_NOTHROW
985 {
986 for (ulint i = 0; i < m_n_indexes; ++i) {
987 const char* index_name;
988 row_index_t* index = &m_indexes[i];
989
990 index_name = reinterpret_cast<const char*>(index->m_name);
991
992 if (strcmp(index_name, name) == 0) {
993
994 return(index);
995 }
996 }
997
998 return(0);
999 }
1000
1001 /** Get the number of rows in the index.
1002 @param name index name
1003 @return number of rows (doesn't include delete marked rows). */
1004 ulint
get_n_rows(const char * name) const1005 row_import::get_n_rows(
1006 const char* name) const UNIV_NOTHROW
1007 {
1008 const row_index_t* index = get_index(name);
1009
1010 ut_a(name != 0);
1011
1012 return(index->m_stats.m_n_rows);
1013 }
1014
1015 /** Get the number of rows for which purge failed uding the convert phase.
1016 @param name index name
1017 @return number of rows for which purge failed. */
1018 ulint
get_n_purge_failed(const char * name) const1019 row_import::get_n_purge_failed(
1020 const char* name) const UNIV_NOTHROW
1021 {
1022 const row_index_t* index = get_index(name);
1023
1024 ut_a(name != 0);
1025
1026 return(index->m_stats.m_n_purge_failed);
1027 }
1028
1029 /** Find the ordinal value of the column name in the cfg table columns.
1030 @param name of column to look for.
1031 @return ULINT_UNDEFINED if not found. */
1032 ulint
find_col(const char * name) const1033 row_import::find_col(
1034 const char* name) const UNIV_NOTHROW
1035 {
1036 for (ulint i = 0; i < m_n_cols; ++i) {
1037 const char* col_name;
1038
1039 col_name = reinterpret_cast<const char*>(m_col_names[i]);
1040
1041 if (strcmp(col_name, name) == 0) {
1042 return(i);
1043 }
1044 }
1045
1046 return(ULINT_UNDEFINED);
1047 }
1048
1049 /**
1050 Check if the index schema that was read from the .cfg file matches the
1051 in memory index definition.
1052 @return DB_SUCCESS or error code. */
1053 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1054 row_import::match_index_columns(
1055 THD* thd,
1056 const dict_index_t* index) UNIV_NOTHROW
1057 {
1058 row_index_t* cfg_index;
1059 dberr_t err = DB_SUCCESS;
1060
1061 cfg_index = get_index(index->name);
1062
1063 if (cfg_index == 0) {
1064 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1065 ER_TABLE_SCHEMA_MISMATCH,
1066 "Index %s not found in tablespace meta-data file.",
1067 index->name());
1068
1069 return(DB_ERROR);
1070 }
1071
1072 if (cfg_index->m_n_fields != index->n_fields) {
1073
1074 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1075 ER_TABLE_SCHEMA_MISMATCH,
1076 "Index field count %u doesn't match"
1077 " tablespace metadata file value " ULINTPF,
1078 index->n_fields, cfg_index->m_n_fields);
1079
1080 return(DB_ERROR);
1081 }
1082
1083 cfg_index->m_srv_index = index;
1084
1085 const dict_field_t* field = index->fields;
1086 const dict_field_t* cfg_field = cfg_index->m_fields;
1087
1088 for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1089
1090 if (strcmp(field->name(), cfg_field->name()) != 0) {
1091 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1092 ER_TABLE_SCHEMA_MISMATCH,
1093 "Index field name %s doesn't match"
1094 " tablespace metadata field name %s"
1095 " for field position " ULINTPF,
1096 field->name(), cfg_field->name(), i);
1097
1098 err = DB_ERROR;
1099 }
1100
1101 if (cfg_field->prefix_len != field->prefix_len) {
1102 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1103 ER_TABLE_SCHEMA_MISMATCH,
1104 "Index %s field %s prefix len %u"
1105 " doesn't match metadata file value %u",
1106 index->name(), field->name(),
1107 field->prefix_len, cfg_field->prefix_len);
1108
1109 err = DB_ERROR;
1110 }
1111
1112 if (cfg_field->fixed_len != field->fixed_len) {
1113 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1114 ER_TABLE_SCHEMA_MISMATCH,
1115 "Index %s field %s fixed len %u"
1116 " doesn't match metadata file value %u",
1117 index->name(), field->name(),
1118 field->fixed_len,
1119 cfg_field->fixed_len);
1120
1121 err = DB_ERROR;
1122 }
1123 }
1124
1125 return(err);
1126 }
1127
1128 /** Check if the table schema that was read from the .cfg file matches the
1129 in memory table definition.
1130 @param thd MySQL session variable
1131 @return DB_SUCCESS or error code. */
1132 dberr_t
match_table_columns(THD * thd)1133 row_import::match_table_columns(
1134 THD* thd) UNIV_NOTHROW
1135 {
1136 dberr_t err = DB_SUCCESS;
1137 const dict_col_t* col = m_table->cols;
1138
1139 for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1140
1141 const char* col_name;
1142 ulint cfg_col_index;
1143
1144 col_name = dict_table_get_col_name(
1145 m_table, dict_col_get_no(col));
1146
1147 cfg_col_index = find_col(col_name);
1148
1149 if (cfg_col_index == ULINT_UNDEFINED) {
1150
1151 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1152 ER_TABLE_SCHEMA_MISMATCH,
1153 "Column %s not found in tablespace.",
1154 col_name);
1155
1156 err = DB_ERROR;
1157 } else if (cfg_col_index != col->ind) {
1158
1159 ib_errf(thd, IB_LOG_LEVEL_ERROR,
1160 ER_TABLE_SCHEMA_MISMATCH,
1161 "Column %s ordinal value mismatch, it's at %u"
1162 " in the table and " ULINTPF
1163 " in the tablespace meta-data file",
1164 col_name, col->ind, cfg_col_index);
1165
1166 err = DB_ERROR;
1167 } else {
1168 const dict_col_t* cfg_col;
1169
1170 cfg_col = &m_cols[cfg_col_index];
1171 ut_a(cfg_col->ind == cfg_col_index);
1172
1173 if (cfg_col->prtype != col->prtype) {
1174 ib_errf(thd,
1175 IB_LOG_LEVEL_ERROR,
1176 ER_TABLE_SCHEMA_MISMATCH,
1177 "Column %s precise type mismatch,"
1178 " it's 0X%X in the table and 0X%X"
1179 " in the tablespace meta file",
1180 col_name, col->prtype, cfg_col->prtype);
1181 err = DB_ERROR;
1182 }
1183
1184 if (cfg_col->mtype != col->mtype) {
1185 ib_errf(thd,
1186 IB_LOG_LEVEL_ERROR,
1187 ER_TABLE_SCHEMA_MISMATCH,
1188 "Column %s main type mismatch,"
1189 " it's 0X%X in the table and 0X%X"
1190 " in the tablespace meta file",
1191 col_name, col->mtype, cfg_col->mtype);
1192 err = DB_ERROR;
1193 }
1194
1195 if (cfg_col->len != col->len) {
1196 ib_errf(thd,
1197 IB_LOG_LEVEL_ERROR,
1198 ER_TABLE_SCHEMA_MISMATCH,
1199 "Column %s length mismatch,"
1200 " it's %u in the table and %u"
1201 " in the tablespace meta file",
1202 col_name, col->len, cfg_col->len);
1203 err = DB_ERROR;
1204 }
1205
1206 if (cfg_col->mbminlen != col->mbminlen
1207 || cfg_col->mbmaxlen != col->mbmaxlen) {
1208 ib_errf(thd,
1209 IB_LOG_LEVEL_ERROR,
1210 ER_TABLE_SCHEMA_MISMATCH,
1211 "Column %s multi-byte len mismatch,"
1212 " it's %u-%u in the table and %u-%u"
1213 " in the tablespace meta file",
1214 col_name, col->mbminlen, col->mbmaxlen,
1215 cfg_col->mbminlen, cfg_col->mbmaxlen);
1216 err = DB_ERROR;
1217 }
1218
1219 if (cfg_col->ind != col->ind) {
1220 ib_errf(thd,
1221 IB_LOG_LEVEL_ERROR,
1222 ER_TABLE_SCHEMA_MISMATCH,
1223 "Column %s position mismatch,"
1224 " it's %u in the table and %u"
1225 " in the tablespace meta file",
1226 col_name, col->ind, cfg_col->ind);
1227 err = DB_ERROR;
1228 }
1229
1230 if (cfg_col->ord_part != col->ord_part) {
1231 ib_errf(thd,
1232 IB_LOG_LEVEL_ERROR,
1233 ER_TABLE_SCHEMA_MISMATCH,
1234 "Column %s ordering mismatch,"
1235 " it's %u in the table and %u"
1236 " in the tablespace meta file",
1237 col_name, col->ord_part,
1238 cfg_col->ord_part);
1239 err = DB_ERROR;
1240 }
1241
1242 if (cfg_col->max_prefix != col->max_prefix) {
1243 ib_errf(thd,
1244 IB_LOG_LEVEL_ERROR,
1245 ER_TABLE_SCHEMA_MISMATCH,
1246 "Column %s max prefix mismatch"
1247 " it's %u in the table and %u"
1248 " in the tablespace meta file",
1249 col_name, col->max_prefix,
1250 cfg_col->max_prefix);
1251 err = DB_ERROR;
1252 }
1253 }
1254 }
1255
1256 return(err);
1257 }
1258
1259 /** Check if the table (and index) schema that was read from the .cfg file
1260 matches the in memory table definition.
1261 @param thd MySQL session variable
1262 @return DB_SUCCESS or error code. */
1263 dberr_t
match_schema(THD * thd)1264 row_import::match_schema(
1265 THD* thd) UNIV_NOTHROW
1266 {
1267 /* Do some simple checks. */
1268
1269 if (ulint mismatch = (m_table->flags ^ m_flags)
1270 & ~DICT_TF_MASK_DATA_DIR) {
1271 const char* msg;
1272 if (mismatch & DICT_TF_MASK_ZIP_SSIZE) {
1273 if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE)
1274 && (m_flags & DICT_TF_MASK_ZIP_SSIZE)) {
1275 switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1276 case 0U << DICT_TF_POS_ZIP_SSIZE:
1277 goto uncompressed;
1278 case 1U << DICT_TF_POS_ZIP_SSIZE:
1279 msg = "ROW_FORMAT=COMPRESSED"
1280 " KEY_BLOCK_SIZE=1";
1281 break;
1282 case 2U << DICT_TF_POS_ZIP_SSIZE:
1283 msg = "ROW_FORMAT=COMPRESSED"
1284 " KEY_BLOCK_SIZE=2";
1285 break;
1286 case 3U << DICT_TF_POS_ZIP_SSIZE:
1287 msg = "ROW_FORMAT=COMPRESSED"
1288 " KEY_BLOCK_SIZE=4";
1289 break;
1290 case 4U << DICT_TF_POS_ZIP_SSIZE:
1291 msg = "ROW_FORMAT=COMPRESSED"
1292 " KEY_BLOCK_SIZE=8";
1293 break;
1294 case 5U << DICT_TF_POS_ZIP_SSIZE:
1295 msg = "ROW_FORMAT=COMPRESSED"
1296 " KEY_BLOCK_SIZE=16";
1297 break;
1298 default:
1299 msg = "strange KEY_BLOCK_SIZE";
1300 }
1301 } else if (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1302 msg = "ROW_FORMAT=COMPRESSED";
1303 } else {
1304 goto uncompressed;
1305 }
1306 } else {
1307 uncompressed:
1308 msg = (m_flags & DICT_TF_MASK_ATOMIC_BLOBS)
1309 ? "ROW_FORMAT=DYNAMIC"
1310 : (m_flags & DICT_TF_MASK_COMPACT)
1311 ? "ROW_FORMAT=COMPACT"
1312 : "ROW_FORMAT=REDUNDANT";
1313 }
1314
1315 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1316 "Table flags don't match, server table has 0x%x"
1317 " and the meta-data file has 0x" ULINTPFx ";"
1318 " .cfg file uses %s",
1319 m_table->flags, m_flags, msg);
1320
1321 return(DB_ERROR);
1322 } else if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1323
1324 /* If the number of indexes don't match then it is better
1325 to abort the IMPORT. It is easy for the user to create a
1326 table matching the IMPORT definition. */
1327
1328 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1329 "Number of indexes don't match, table has " ULINTPF
1330 " indexes but the tablespace meta-data file has "
1331 ULINTPF " indexes",
1332 UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1333
1334 return(DB_ERROR);
1335 }
1336
1337 dberr_t err = match_table_columns(thd);
1338
1339 if (err != DB_SUCCESS) {
1340 return(err);
1341 }
1342
1343 /* Check if the index definitions match. */
1344
1345 const dict_index_t* index;
1346
1347 for (index = UT_LIST_GET_FIRST(m_table->indexes);
1348 index != 0;
1349 index = UT_LIST_GET_NEXT(indexes, index)) {
1350
1351 dberr_t index_err;
1352
1353 index_err = match_index_columns(thd, index);
1354
1355 if (index_err != DB_SUCCESS) {
1356 err = index_err;
1357 }
1358 }
1359
1360 return(err);
1361 }
1362
1363 /**
1364 Set the index root <space, pageno>, using index name. */
1365 void
set_root_by_name()1366 row_import::set_root_by_name() UNIV_NOTHROW
1367 {
1368 row_index_t* cfg_index = m_indexes;
1369
1370 for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1371 dict_index_t* index;
1372
1373 const char* index_name;
1374
1375 index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1376
1377 index = dict_table_get_index_on_name(m_table, index_name);
1378
1379 /* We've already checked that it exists. */
1380 ut_a(index != 0);
1381
1382 index->page = cfg_index->m_page_no;
1383 }
1384 }
1385
1386 /**
1387 Set the index root <space, pageno>, using a heuristic.
1388 @return DB_SUCCESS or error code */
1389 dberr_t
set_root_by_heuristic()1390 row_import::set_root_by_heuristic() UNIV_NOTHROW
1391 {
1392 row_index_t* cfg_index = m_indexes;
1393
1394 ut_a(m_n_indexes > 0);
1395
1396 // TODO: For now use brute force, based on ordinality
1397
1398 if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1399
1400 ib::warn() << "Table " << m_table->name << " should have "
1401 << UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1402 " the tablespace has " << m_n_indexes << " indexes";
1403 }
1404
1405 dict_mutex_enter_for_mysql();
1406
1407 ulint i = 0;
1408 dberr_t err = DB_SUCCESS;
1409
1410 for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1411 index != 0;
1412 index = UT_LIST_GET_NEXT(indexes, index)) {
1413
1414 if (index->type & DICT_FTS) {
1415 index->type |= DICT_CORRUPT;
1416 ib::warn() << "Skipping FTS index: " << index->name;
1417 } else if (i < m_n_indexes) {
1418
1419 UT_DELETE_ARRAY(cfg_index[i].m_name);
1420
1421 ulint len = strlen(index->name) + 1;
1422
1423 cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1424
1425 /* Trigger OOM */
1426 DBUG_EXECUTE_IF(
1427 "ib_import_OOM_14",
1428 UT_DELETE_ARRAY(cfg_index[i].m_name);
1429 cfg_index[i].m_name = NULL;
1430 );
1431
1432 if (cfg_index[i].m_name == NULL) {
1433 err = DB_OUT_OF_MEMORY;
1434 break;
1435 }
1436
1437 memcpy(cfg_index[i].m_name, index->name, len);
1438
1439 cfg_index[i].m_srv_index = index;
1440
1441 index->page = cfg_index[i].m_page_no;
1442
1443 ++i;
1444 }
1445 }
1446
1447 dict_mutex_exit_for_mysql();
1448
1449 return(err);
1450 }
1451
1452 /**
1453 Purge delete marked records.
1454 @return DB_SUCCESS or error code. */
1455 dberr_t
garbage_collect()1456 IndexPurge::garbage_collect() UNIV_NOTHROW
1457 {
1458 dberr_t err;
1459 ibool comp = dict_table_is_comp(m_index->table);
1460
1461 /* Open the persistent cursor and start the mini-transaction. */
1462
1463 open();
1464
1465 while ((err = next()) == DB_SUCCESS) {
1466
1467 rec_t* rec = btr_pcur_get_rec(&m_pcur);
1468 ibool deleted = rec_get_deleted_flag(rec, comp);
1469
1470 if (!deleted) {
1471 ++m_n_rows;
1472 } else {
1473 purge();
1474 }
1475 }
1476
1477 /* Close the persistent cursor and commit the mini-transaction. */
1478
1479 close();
1480
1481 return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1482 }
1483
1484 /**
1485 Begin import, position the cursor on the first record. */
1486 void
open()1487 IndexPurge::open() UNIV_NOTHROW
1488 {
1489 mtr_start(&m_mtr);
1490
1491 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1492
1493 btr_pcur_open_at_index_side(
1494 true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1495 btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1496 if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), m_index)) {
1497 ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1498 /* Skip the metadata pseudo-record. */
1499 } else {
1500 btr_pcur_move_to_prev_on_page(&m_pcur);
1501 }
1502 }
1503
1504 /**
1505 Close the persistent curosr and commit the mini-transaction. */
1506 void
close()1507 IndexPurge::close() UNIV_NOTHROW
1508 {
1509 btr_pcur_close(&m_pcur);
1510 mtr_commit(&m_mtr);
1511 }
1512
1513 /**
1514 Position the cursor on the next record.
1515 @return DB_SUCCESS or error code */
1516 dberr_t
next()1517 IndexPurge::next() UNIV_NOTHROW
1518 {
1519 btr_pcur_move_to_next_on_page(&m_pcur);
1520
1521 /* When switching pages, commit the mini-transaction
1522 in order to release the latch on the old page. */
1523
1524 if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1525 return(DB_SUCCESS);
1526 } else if (trx_is_interrupted(m_trx)) {
1527 /* Check after every page because the check
1528 is expensive. */
1529 return(DB_INTERRUPTED);
1530 }
1531
1532 btr_pcur_store_position(&m_pcur, &m_mtr);
1533
1534 mtr_commit(&m_mtr);
1535
1536 mtr_start(&m_mtr);
1537
1538 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1539
1540 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1541 /* The following is based on btr_pcur_move_to_next_user_rec(). */
1542 m_pcur.old_stored = false;
1543 ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
1544 do {
1545 if (btr_pcur_is_after_last_on_page(&m_pcur)) {
1546 if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
1547 return DB_END_OF_INDEX;
1548 }
1549
1550 buf_block_t* block = btr_pcur_get_block(&m_pcur);
1551 uint32_t next_page = btr_page_get_next(block->frame);
1552
1553 /* MDEV-13542 FIXME: Make these checks part of
1554 btr_pcur_move_to_next_page(), and introduce a
1555 return status that will be checked in all callers! */
1556 switch (next_page) {
1557 default:
1558 if (next_page != block->page.id.page_no()) {
1559 break;
1560 }
1561 /* MDEV-20931 FIXME: Check that
1562 next_page is within the tablespace
1563 bounds! Also check that it is not a
1564 change buffer bitmap page. */
1565 /* fall through */
1566 case 0:
1567 case 1:
1568 case FIL_NULL:
1569 return DB_CORRUPTION;
1570 }
1571
1572 dict_index_t* index = m_pcur.btr_cur.index;
1573 buf_block_t* next_block = btr_block_get(
1574 page_id_t(block->page.id.space(), next_page),
1575 block->page.size, BTR_MODIFY_LEAF, index,
1576 &m_mtr);
1577
1578 if (UNIV_UNLIKELY(!next_block
1579 || !fil_page_index_page_check(
1580 next_block->frame)
1581 || !!dict_index_is_spatial(index)
1582 != (fil_page_get_type(
1583 next_block->frame)
1584 == FIL_PAGE_RTREE)
1585 || page_is_comp(next_block->frame)
1586 != page_is_comp(block->frame)
1587 || btr_page_get_prev(
1588 next_block->frame)
1589 != block->page.id.page_no())) {
1590 return DB_CORRUPTION;
1591 }
1592
1593 btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);
1594
1595 page_cur_set_before_first(next_block,
1596 &m_pcur.btr_cur.page_cur);
1597
1598 ut_d(page_check_dir(next_block->frame));
1599 } else {
1600 btr_pcur_move_to_next_on_page(&m_pcur);
1601 }
1602 } while (!btr_pcur_is_on_user_rec(&m_pcur));
1603
1604 return DB_SUCCESS;
1605 }
1606
1607 /**
1608 Store the persistent cursor position and reopen the
1609 B-tree cursor in BTR_MODIFY_TREE mode, because the
1610 tree structure may be changed during a pessimistic delete. */
1611 void
purge_pessimistic_delete()1612 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1613 {
1614 dberr_t err;
1615
1616 btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1617 &m_pcur, &m_mtr);
1618
1619 ut_ad(rec_get_deleted_flag(
1620 btr_pcur_get_rec(&m_pcur),
1621 dict_table_is_comp(m_index->table)));
1622
1623 btr_cur_pessimistic_delete(
1624 &err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);
1625
1626 ut_a(err == DB_SUCCESS);
1627
1628 /* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1629 mtr_commit(&m_mtr);
1630 }
1631
1632 /**
1633 Purge delete-marked records. */
1634 void
purge()1635 IndexPurge::purge() UNIV_NOTHROW
1636 {
1637 btr_pcur_store_position(&m_pcur, &m_mtr);
1638
1639 purge_pessimistic_delete();
1640
1641 mtr_start(&m_mtr);
1642
1643 mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1644
1645 btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1646 }
1647
1648 /** Adjust the BLOB reference for a single column that is externally stored
1649 @param rec record to update
1650 @param offsets column offsets for the record
1651 @param i column ordinal value
1652 @return DB_SUCCESS or error code */
1653 inline
1654 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1655 PageConverter::adjust_cluster_index_blob_column(
1656 rec_t* rec,
1657 const rec_offs* offsets,
1658 ulint i) UNIV_NOTHROW
1659 {
1660 ulint len;
1661 byte* field;
1662
1663 field = rec_get_nth_field(rec, offsets, i, &len);
1664
1665 DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1666 len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1667
1668 if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1669
1670 ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1671 ER_INNODB_INDEX_CORRUPT,
1672 "Externally stored column(" ULINTPF
1673 ") has a reference length of " ULINTPF
1674 " in the cluster index %s",
1675 i, len, m_cluster_index->name());
1676
1677 return(DB_CORRUPTION);
1678 }
1679
1680 field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1681
1682 mach_write_to_4(field, get_space_id());
1683
1684 if (m_page_zip_ptr) {
1685 page_zip_write_blob_ptr(
1686 m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1687 }
1688
1689 return(DB_SUCCESS);
1690 }
1691
1692 /** Adjusts the BLOB reference in the clustered index row for all externally
1693 stored columns.
1694 @param rec record to update
1695 @param offsets column offsets for the record
1696 @return DB_SUCCESS or error code */
1697 inline
1698 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1699 PageConverter::adjust_cluster_index_blob_columns(
1700 rec_t* rec,
1701 const rec_offs* offsets) UNIV_NOTHROW
1702 {
1703 ut_ad(rec_offs_any_extern(offsets));
1704
1705 /* Adjust the space_id in the BLOB pointers. */
1706
1707 for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1708
1709 /* Only if the column is stored "externally". */
1710
1711 if (rec_offs_nth_extern(offsets, i)) {
1712 dberr_t err;
1713
1714 err = adjust_cluster_index_blob_column(rec, offsets, i);
1715
1716 if (err != DB_SUCCESS) {
1717 return(err);
1718 }
1719 }
1720 }
1721
1722 return(DB_SUCCESS);
1723 }
1724
1725 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1726 BLOB reference, write the new space id.
1727 @param rec record to update
1728 @param offsets column offsets for the record
1729 @return DB_SUCCESS or error code */
1730 inline
1731 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1732 PageConverter::adjust_cluster_index_blob_ref(
1733 rec_t* rec,
1734 const rec_offs* offsets) UNIV_NOTHROW
1735 {
1736 if (rec_offs_any_extern(offsets)) {
1737 dberr_t err;
1738
1739 err = adjust_cluster_index_blob_columns(rec, offsets);
1740
1741 if (err != DB_SUCCESS) {
1742 return(err);
1743 }
1744 }
1745
1746 return(DB_SUCCESS);
1747 }
1748
1749 /** Purge delete-marked records, only if it is possible to do so without
1750 re-organising the B+tree.
1751 @return true if purge succeeded */
purge()1752 inline bool PageConverter::purge() UNIV_NOTHROW
1753 {
1754 const dict_index_t* index = m_index->m_srv_index;
1755
1756 /* We can't have a page that is empty and not root. */
1757 if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1758
1759 ++m_index->m_stats.m_n_purged;
1760
1761 return(true);
1762 } else {
1763 ++m_index->m_stats.m_n_purge_failed;
1764 }
1765
1766 return(false);
1767 }
1768
1769 /** Adjust the BLOB references and sys fields for the current record.
1770 @param rec record to update
1771 @param offsets column offsets for the record
1772 @return DB_SUCCESS or error code. */
1773 inline
1774 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1775 PageConverter::adjust_cluster_record(
1776 rec_t* rec,
1777 const rec_offs* offsets) UNIV_NOTHROW
1778 {
1779 dberr_t err;
1780
1781 if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1782
1783 /* Reset DB_TRX_ID and DB_ROLL_PTR. Normally, these fields
1784 are only written in conjunction with other changes to the
1785 record. */
1786 ulint trx_id_pos = m_cluster_index->n_uniq
1787 ? m_cluster_index->n_uniq : 1;
1788 if (m_page_zip_ptr) {
1789 page_zip_write_trx_id_and_roll_ptr(
1790 m_page_zip_ptr, rec, m_offsets, trx_id_pos,
1791 0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1792 NULL);
1793 } else {
1794 ulint len;
1795 byte* ptr = rec_get_nth_field(
1796 rec, m_offsets, trx_id_pos, &len);
1797 ut_ad(len == DATA_TRX_ID_LEN);
1798 memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1799 }
1800 }
1801
1802 return(err);
1803 }
1804
1805 /** Update the BLOB refrences and write UNDO log entries for
1806 rows that can't be purged optimistically.
1807 @param block block to update
1808 @retval DB_SUCCESS or error code */
1809 inline
1810 dberr_t
update_records(buf_block_t * block)1811 PageConverter::update_records(
1812 buf_block_t* block) UNIV_NOTHROW
1813 {
1814 ibool comp = dict_table_is_comp(m_cfg->m_table);
1815 bool clust_index = m_index->m_srv_index == m_cluster_index;
1816
1817 /* This will also position the cursor on the first user record. */
1818
1819 m_rec_iter.open(block);
1820
1821 while (!m_rec_iter.end()) {
1822 rec_t* rec = m_rec_iter.current();
1823 ibool deleted = rec_get_deleted_flag(rec, comp);
1824
1825 /* For the clustered index we have to adjust the BLOB
1826 reference and the system fields irrespective of the
1827 delete marked flag. The adjustment of delete marked
1828 cluster records is required for purge to work later. */
1829
1830 if (deleted || clust_index) {
1831 m_offsets = rec_get_offsets(
1832 rec, m_index->m_srv_index, m_offsets,
1833 m_index->m_srv_index->n_core_fields,
1834 ULINT_UNDEFINED, &m_heap);
1835 }
1836
1837 if (clust_index) {
1838
1839 dberr_t err = adjust_cluster_record(rec, m_offsets);
1840
1841 if (err != DB_SUCCESS) {
1842 return(err);
1843 }
1844 }
1845
1846 /* If it is a delete marked record then try an
1847 optimistic delete. */
1848
1849 if (deleted) {
1850 /* A successful purge will move the cursor to the
1851 next record. */
1852
1853 if (!purge()) {
1854 m_rec_iter.next();
1855 }
1856
1857 ++m_index->m_stats.m_n_deleted;
1858 } else {
1859 ++m_index->m_stats.m_n_rows;
1860 m_rec_iter.next();
1861 }
1862 }
1863
1864 return(DB_SUCCESS);
1865 }
1866
1867 /** Update the space, index id, trx id.
1868 @return DB_SUCCESS or error code */
1869 inline
1870 dberr_t
update_index_page(buf_block_t * block)1871 PageConverter::update_index_page(
1872 buf_block_t* block) UNIV_NOTHROW
1873 {
1874 index_id_t id;
1875 buf_frame_t* page = block->frame;
1876
1877 if (is_free(block->page.id.page_no())) {
1878 return(DB_SUCCESS);
1879 } else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
1880 row_index_t* index = find_index(id);
1881
1882 if (UNIV_UNLIKELY(!index)) {
1883 if (m_cfg->m_missing) {
1884 return DB_SUCCESS;
1885 }
1886
1887 ib::error() << "Page for tablespace " << m_space
1888 << " is index page with id " << id
1889 << " but that index is not found from"
1890 << " configuration file. Current index name "
1891 << m_index->m_name << " and id " << m_index->m_id;
1892 m_index = 0;
1893 return(DB_CORRUPTION);
1894 }
1895
1896 /* Update current index */
1897 m_index = index;
1898 }
1899
1900 /* If the .cfg file is missing and there is an index mismatch
1901 then ignore the error. */
1902 if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
1903 return(DB_SUCCESS);
1904 }
1905
1906 if (m_index && block->page.id.page_no() == m_index->m_page_no) {
1907 byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
1908 + page;
1909 mach_write_to_4(b, block->page.id.space());
1910
1911 memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
1912 + page, b, 4);
1913 if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1914 memcpy(&block->page.zip.data[FIL_PAGE_DATA
1915 + PAGE_BTR_SEG_TOP
1916 + FSEG_HDR_SPACE], b, 4);
1917 memcpy(&block->page.zip.data[FIL_PAGE_DATA
1918 + PAGE_BTR_SEG_LEAF
1919 + FSEG_HDR_SPACE], b, 4);
1920 }
1921 }
1922
1923 #ifdef UNIV_ZIP_DEBUG
1924 ut_a(!is_compressed_table()
1925 || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
1926 #endif /* UNIV_ZIP_DEBUG */
1927
1928 /* This has to be written to uncompressed index header. Set it to
1929 the current index id. */
1930 btr_page_set_index_id(
1931 page, m_page_zip_ptr, m_index->m_srv_index->id, 0);
1932
1933 if (dict_index_is_clust(m_index->m_srv_index)) {
1934 dict_index_t* index = const_cast<dict_index_t*>(
1935 m_index->m_srv_index);
1936 if (block->page.id.page_no() == index->page) {
1937 /* Preserve the PAGE_ROOT_AUTO_INC. */
1938 if (index->table->supports_instant()) {
1939 if (btr_cur_instant_root_init(index, page)) {
1940 return(DB_CORRUPTION);
1941 }
1942
1943 /* Provisionally set all instantly
1944 added columns to be DEFAULT NULL. */
1945 for (unsigned i = index->n_core_fields;
1946 i < index->n_fields; i++) {
1947 dict_col_t* col = index->fields[i].col;
1948 col->def_val.len = UNIV_SQL_NULL;
1949 col->def_val.data = NULL;
1950 }
1951 }
1952 } else {
1953 /* Clear PAGE_MAX_TRX_ID so that it can be
1954 used for other purposes in the future. IMPORT
1955 in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
1956 would set the field to the transaction ID even
1957 on clustered index pages. */
1958 page_set_max_trx_id(block, m_page_zip_ptr, 0, NULL);
1959 }
1960 } else {
1961 /* Set PAGE_MAX_TRX_ID on secondary index leaf pages,
1962 and clear it on non-leaf pages. */
1963 page_set_max_trx_id(block, m_page_zip_ptr,
1964 page_is_leaf(page) ? m_trx->id : 0, NULL);
1965 }
1966
1967 if (page_is_empty(page)) {
1968
1969 /* Only a root page can be empty. */
1970 if (page_has_siblings(page)) {
1971 // TODO: We should relax this and skip secondary
1972 // indexes. Mark them as corrupt because they can
1973 // always be rebuilt.
1974 return(DB_CORRUPTION);
1975 }
1976
1977 return(DB_SUCCESS);
1978 }
1979
1980 return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
1981 }
1982
1983 /** Validate the space flags and update tablespace header page.
1984 @param block block read from file, not from the buffer pool.
1985 @retval DB_SUCCESS or error code */
1986 inline
1987 dberr_t
update_header(buf_block_t * block)1988 PageConverter::update_header(
1989 buf_block_t* block) UNIV_NOTHROW
1990 {
1991 /* Check for valid header */
1992 switch (fsp_header_get_space_id(get_frame(block))) {
1993 case 0:
1994 return(DB_CORRUPTION);
1995 case ULINT_UNDEFINED:
1996 ib::warn() << "Space id check in the header failed: ignored";
1997 }
1998
1999 mach_write_to_8(
2000 get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
2001 m_current_lsn);
2002
2003 /* Write back the adjusted flags. */
2004 mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS
2005 + get_frame(block), m_space_flags);
2006
2007 /* Write space_id to the tablespace header, page 0. */
2008 mach_write_to_4(
2009 get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2010 get_space_id());
2011
2012 /* This is on every page in the tablespace. */
2013 mach_write_to_4(
2014 get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2015 get_space_id());
2016
2017 return(DB_SUCCESS);
2018 }
2019
2020 /** Update the page, set the space id, max trx id and index id.
2021 @param block block read from file
2022 @retval DB_SUCCESS or error code */
2023 inline
2024 dberr_t
update_page(buf_block_t * block,ulint & page_type)2025 PageConverter::update_page(
2026 buf_block_t* block,
2027 ulint& page_type) UNIV_NOTHROW
2028 {
2029 dberr_t err = DB_SUCCESS;
2030
2031 ut_ad(!block->page.zip.data == !is_compressed_table());
2032
2033 if (block->page.zip.data) {
2034 m_page_zip_ptr = &block->page.zip;
2035 } else {
2036 ut_ad(!m_page_zip_ptr);
2037 }
2038
2039 switch (page_type = fil_page_get_type(get_frame(block))) {
2040 case FIL_PAGE_TYPE_FSP_HDR:
2041 ut_a(block->page.id.page_no() == 0);
2042 /* Work directly on the uncompressed page headers. */
2043 return(update_header(block));
2044
2045 case FIL_PAGE_INDEX:
2046 case FIL_PAGE_RTREE:
2047 /* We need to decompress the contents into block->frame
2048 before we can do any thing with Btree pages. */
2049
2050 if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
2051 return(DB_CORRUPTION);
2052 }
2053
2054 /* fall through */
2055 case FIL_PAGE_TYPE_INSTANT:
2056 /* This is on every page in the tablespace. */
2057 mach_write_to_4(
2058 get_frame(block)
2059 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2060
2061 /* Only update the Btree nodes. */
2062 return(update_index_page(block));
2063
2064 case FIL_PAGE_TYPE_SYS:
2065 /* This is page 0 in the system tablespace. */
2066 return(DB_CORRUPTION);
2067
2068 case FIL_PAGE_TYPE_XDES:
2069 err = set_current_xdes(
2070 block->page.id.page_no(), get_frame(block));
2071 /* fall through */
2072 case FIL_PAGE_INODE:
2073 case FIL_PAGE_TYPE_TRX_SYS:
2074 case FIL_PAGE_IBUF_FREE_LIST:
2075 case FIL_PAGE_TYPE_ALLOCATED:
2076 case FIL_PAGE_IBUF_BITMAP:
2077 case FIL_PAGE_TYPE_BLOB:
2078 case FIL_PAGE_TYPE_ZBLOB:
2079 case FIL_PAGE_TYPE_ZBLOB2:
2080
2081 /* Work directly on the uncompressed page headers. */
2082 /* This is on every page in the tablespace. */
2083 mach_write_to_4(
2084 get_frame(block)
2085 + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2086
2087 return(err);
2088 }
2089
2090 ib::warn() << "Unknown page type (" << page_type << ")";
2091
2092 return(DB_CORRUPTION);
2093 }
2094
2095 /** Called for every page in the tablespace. If the page was not
2096 updated then its state must be set to BUF_PAGE_NOT_USED.
2097 @param block block read from file, note it is not from the buffer pool
2098 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2099 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2100 {
2101 /* If we already had an old page with matching number
2102 in the buffer pool, evict it now, because
2103 we no longer evict the pages on DISCARD TABLESPACE. */
2104 buf_page_get_gen(block->page.id, get_page_size(),
2105 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2106 __FILE__, __LINE__, NULL, NULL);
2107
2108 ulint page_type;
2109
2110 dberr_t err = update_page(block, page_type);
2111 if (err != DB_SUCCESS) return err;
2112
2113 if (!block->page.zip.data) {
2114 buf_flush_init_for_writing(
2115 NULL, block->frame, NULL, m_current_lsn);
2116 } else if (fil_page_type_is_index(page_type)) {
2117 buf_flush_init_for_writing(
2118 NULL, block->page.zip.data, &block->page.zip,
2119 m_current_lsn);
2120 } else {
2121 /* Calculate and update the checksum of non-index
2122 pages for ROW_FORMAT=COMPRESSED tables. */
2123 buf_flush_update_zip_checksum(
2124 block->page.zip.data, get_page_size().physical(),
2125 m_current_lsn);
2126 }
2127
2128 return DB_SUCCESS;
2129 }
2130
2131 /*****************************************************************//**
2132 Clean up after import tablespace failure, this function will acquire
2133 the dictionary latches on behalf of the transaction if the transaction
2134 hasn't already acquired them. */
2135 static MY_ATTRIBUTE((nonnull))
2136 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2137 row_import_discard_changes(
2138 /*=======================*/
2139 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2140 trx_t* trx, /*!< in/out: transaction for import */
2141 dberr_t err) /*!< in: error code */
2142 {
2143 dict_table_t* table = prebuilt->table;
2144
2145 ut_a(err != DB_SUCCESS);
2146
2147 prebuilt->trx->error_info = NULL;
2148
2149 ib::info() << "Discarding tablespace of table "
2150 << prebuilt->table->name
2151 << ": " << err;
2152
2153 if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2154 ut_a(trx->dict_operation_lock_mode == 0);
2155 row_mysql_lock_data_dictionary(trx);
2156 }
2157
2158 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2159
2160 /* Since we update the index root page numbers on disk after
2161 we've done a successful import. The table will not be loadable.
2162 However, we need to ensure that the in memory root page numbers
2163 are reset to "NULL". */
2164
2165 for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2166 index != 0;
2167 index = UT_LIST_GET_NEXT(indexes, index)) {
2168
2169 index->page = FIL_NULL;
2170 }
2171
2172 table->file_unreadable = true;
2173 if (table->space) {
2174 fil_close_tablespace(trx, table->space_id);
2175 table->space = NULL;
2176 }
2177 }
2178
2179 /*****************************************************************//**
2180 Clean up after import tablespace. */
2181 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2182 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2183 row_import_cleanup(
2184 /*===============*/
2185 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2186 trx_t* trx, /*!< in/out: transaction for import */
2187 dberr_t err) /*!< in: error code */
2188 {
2189 ut_a(prebuilt->trx != trx);
2190
2191 if (err != DB_SUCCESS) {
2192 row_import_discard_changes(prebuilt, trx, err);
2193 }
2194
2195 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2196
2197 DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2198
2199 trx_commit_for_mysql(trx);
2200
2201 row_mysql_unlock_data_dictionary(trx);
2202
2203 trx->free();
2204
2205 prebuilt->trx->op_info = "";
2206
2207 DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2208
2209 log_make_checkpoint();
2210
2211 return(err);
2212 }
2213
2214 /*****************************************************************//**
2215 Report error during tablespace import. */
2216 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2217 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2218 row_import_error(
2219 /*=============*/
2220 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */
2221 trx_t* trx, /*!< in/out: transaction for import */
2222 dberr_t err) /*!< in: error code */
2223 {
2224 if (!trx_is_interrupted(trx)) {
2225 char table_name[MAX_FULL_NAME_LEN + 1];
2226
2227 innobase_format_name(
2228 table_name, sizeof(table_name),
2229 prebuilt->table->name.m_name);
2230
2231 ib_senderrf(
2232 trx->mysql_thd, IB_LOG_LEVEL_WARN,
2233 ER_INNODB_IMPORT_ERROR,
2234 table_name, (ulong) err, ut_strerr(err));
2235 }
2236
2237 return(row_import_cleanup(prebuilt, trx, err));
2238 }
2239
2240 /*****************************************************************//**
2241 Adjust the root page index node and leaf node segment headers, update
2242 with the new space id. For all the table's secondary indexes.
2243 @return error code */
2244 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2245 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2246 row_import_adjust_root_pages_of_secondary_indexes(
2247 /*==============================================*/
2248 trx_t* trx, /*!< in: transaction used for
2249 the import */
2250 dict_table_t* table, /*!< in: table the indexes
2251 belong to */
2252 const row_import& cfg) /*!< Import context */
2253 {
2254 dict_index_t* index;
2255 ulint n_rows_in_table;
2256 dberr_t err = DB_SUCCESS;
2257
2258 /* Skip the clustered index. */
2259 index = dict_table_get_first_index(table);
2260
2261 n_rows_in_table = cfg.get_n_rows(index->name);
2262
2263 DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2264 n_rows_in_table++;);
2265
2266 /* Adjust the root pages of the secondary indexes only. */
2267 while ((index = dict_table_get_next_index(index)) != NULL) {
2268 ut_a(!dict_index_is_clust(index));
2269
2270 if (!(index->type & DICT_CORRUPT)
2271 && index->page != FIL_NULL) {
2272
2273 /* Update the Btree segment headers for index node and
2274 leaf nodes in the root page. Set the new space id. */
2275
2276 err = btr_root_adjust_on_import(index);
2277 } else {
2278 ib::warn() << "Skip adjustment of root pages for"
2279 " index " << index->name << ".";
2280
2281 err = DB_CORRUPTION;
2282 }
2283
2284 if (err != DB_SUCCESS) {
2285
2286 if (index->type & DICT_CLUSTERED) {
2287 break;
2288 }
2289
2290 ib_errf(trx->mysql_thd,
2291 IB_LOG_LEVEL_WARN,
2292 ER_INNODB_INDEX_CORRUPT,
2293 "Index %s not found or corrupt,"
2294 " you should recreate this index.",
2295 index->name());
2296
2297 /* Do not bail out, so that the data
2298 can be recovered. */
2299
2300 err = DB_SUCCESS;
2301 index->type |= DICT_CORRUPT;
2302 continue;
2303 }
2304
2305 /* If we failed to purge any records in the index then
2306 do it the hard way.
2307
2308 TODO: We can do this in the first pass by generating UNDO log
2309 records for the failed rows. */
2310
2311 if (!cfg.requires_purge(index->name)) {
2312 continue;
2313 }
2314
2315 IndexPurge purge(trx, index);
2316
2317 trx->op_info = "secondary: purge delete marked records";
2318
2319 err = purge.garbage_collect();
2320
2321 trx->op_info = "";
2322
2323 if (err != DB_SUCCESS) {
2324 break;
2325 } else if (purge.get_n_rows() != n_rows_in_table) {
2326
2327 ib_errf(trx->mysql_thd,
2328 IB_LOG_LEVEL_WARN,
2329 ER_INNODB_INDEX_CORRUPT,
2330 "Index '%s' contains " ULINTPF " entries, "
2331 "should be " ULINTPF ", you should recreate "
2332 "this index.", index->name(),
2333 purge.get_n_rows(), n_rows_in_table);
2334
2335 index->type |= DICT_CORRUPT;
2336
2337 /* Do not bail out, so that the data
2338 can be recovered. */
2339
2340 err = DB_SUCCESS;
2341 }
2342 }
2343
2344 return(err);
2345 }
2346
2347 /*****************************************************************//**
2348 Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
2349 @return error code */
2350 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2351 dberr_t
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2352 row_import_set_sys_max_row_id(
2353 /*==========================*/
2354 row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
2355 handler */
2356 const dict_table_t* table) /*!< in: table to import */
2357 {
2358 dberr_t err;
2359 const rec_t* rec;
2360 mtr_t mtr;
2361 btr_pcur_t pcur;
2362 row_id_t row_id = 0;
2363 dict_index_t* index;
2364
2365 index = dict_table_get_first_index(table);
2366 ut_a(dict_index_is_clust(index));
2367
2368 mtr_start(&mtr);
2369
2370 mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2371
2372 btr_pcur_open_at_index_side(
2373 false, // High end
2374 index,
2375 BTR_SEARCH_LEAF,
2376 &pcur,
2377 true, // Init cursor
2378 0, // Leaf level
2379 &mtr);
2380
2381 btr_pcur_move_to_prev_on_page(&pcur);
2382 rec = btr_pcur_get_rec(&pcur);
2383
2384 /* Check for empty table. */
2385 if (page_rec_is_infimum(rec)) {
2386 /* The table is empty. */
2387 err = DB_SUCCESS;
2388 } else if (rec_is_metadata(rec, index)) {
2389 /* The clustered index contains the metadata record only,
2390 that is, the table is empty. */
2391 err = DB_SUCCESS;
2392 } else {
2393 ulint len;
2394 const byte* field;
2395 mem_heap_t* heap = NULL;
2396 rec_offs offsets_[1 + REC_OFFS_HEADER_SIZE];
2397 rec_offs* offsets;
2398
2399 rec_offs_init(offsets_);
2400
2401 offsets = rec_get_offsets(
2402 rec, index, offsets_, index->n_core_fields,
2403 ULINT_UNDEFINED, &heap);
2404
2405 field = rec_get_nth_field(
2406 rec, offsets,
2407 dict_index_get_sys_col_pos(index, DATA_ROW_ID),
2408 &len);
2409
2410 if (len == DATA_ROW_ID_LEN) {
2411 row_id = mach_read_from_6(field);
2412 err = DB_SUCCESS;
2413 } else {
2414 err = DB_CORRUPTION;
2415 }
2416
2417 if (heap != NULL) {
2418 mem_heap_free(heap);
2419 }
2420 }
2421
2422 btr_pcur_close(&pcur);
2423 mtr_commit(&mtr);
2424
2425 DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
2426 err = DB_CORRUPTION;);
2427
2428 if (err != DB_SUCCESS) {
2429 ib_errf(prebuilt->trx->mysql_thd,
2430 IB_LOG_LEVEL_WARN,
2431 ER_INNODB_INDEX_CORRUPT,
2432 "Index `%s` corruption detected, invalid DB_ROW_ID"
2433 " in index.", index->name());
2434
2435 return(err);
2436
2437 } else if (row_id > 0) {
2438
2439 /* Update the system row id if the imported index row id is
2440 greater than the max system row id. */
2441
2442 mutex_enter(&dict_sys->mutex);
2443
2444 if (row_id >= dict_sys->row_id) {
2445 dict_sys->row_id = row_id + 1;
2446 dict_hdr_flush_row_id();
2447 }
2448
2449 mutex_exit(&dict_sys->mutex);
2450 }
2451
2452 return(DB_SUCCESS);
2453 }
2454
2455 /*****************************************************************//**
2456 Read the a string from the meta data file.
2457 @return DB_SUCCESS or error code. */
2458 static
2459 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2460 row_import_cfg_read_string(
2461 /*=======================*/
2462 FILE* file, /*!< in/out: File to read from */
2463 byte* ptr, /*!< out: string to read */
2464 ulint max_len) /*!< in: maximum length of the output
2465 buffer in bytes */
2466 {
2467 DBUG_EXECUTE_IF("ib_import_string_read_error",
2468 errno = EINVAL; return(DB_IO_ERROR););
2469
2470 ulint len = 0;
2471
2472 while (!feof(file)) {
2473 int ch = fgetc(file);
2474
2475 if (ch == EOF) {
2476 break;
2477 } else if (ch != 0) {
2478 if (len < max_len) {
2479 ptr[len++] = ch;
2480 } else {
2481 break;
2482 }
2483 /* max_len includes the NUL byte */
2484 } else if (len != max_len - 1) {
2485 break;
2486 } else {
2487 ptr[len] = 0;
2488 return(DB_SUCCESS);
2489 }
2490 }
2491
2492 errno = EINVAL;
2493
2494 return(DB_IO_ERROR);
2495 }
2496
2497 /*********************************************************************//**
2498 Write the meta data (index user fields) config file.
2499 @return DB_SUCCESS or error code. */
2500 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2501 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2502 row_import_cfg_read_index_fields(
2503 /*=============================*/
2504 FILE* file, /*!< in: file to write to */
2505 THD* thd, /*!< in/out: session */
2506 row_index_t* index) /*!< Index being read in */
2507 {
2508 byte row[sizeof(ib_uint32_t) * 3];
2509 ulint n_fields = index->m_n_fields;
2510
2511 index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2512
2513 /* Trigger OOM */
2514 DBUG_EXECUTE_IF(
2515 "ib_import_OOM_4",
2516 UT_DELETE_ARRAY(index->m_fields);
2517 index->m_fields = NULL;
2518 );
2519
2520 if (index->m_fields == NULL) {
2521 return(DB_OUT_OF_MEMORY);
2522 }
2523
2524 dict_field_t* field = index->m_fields;
2525
2526 for (ulint i = 0; i < n_fields; ++i, ++field) {
2527 byte* ptr = row;
2528
2529 /* Trigger EOF */
2530 DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2531 (void) fseek(file, 0L, SEEK_END););
2532
2533 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2534
2535 ib_senderrf(
2536 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2537 (ulong) errno, strerror(errno),
2538 "while reading index fields.");
2539
2540 return(DB_IO_ERROR);
2541 }
2542
2543 new (field) dict_field_t();
2544
2545 field->prefix_len = mach_read_from_4(ptr);
2546 ptr += sizeof(ib_uint32_t);
2547
2548 field->fixed_len = mach_read_from_4(ptr);
2549 ptr += sizeof(ib_uint32_t);
2550
2551 /* Include the NUL byte in the length. */
2552 ulint len = mach_read_from_4(ptr);
2553
2554 byte* name = UT_NEW_ARRAY_NOKEY(byte, len);
2555
2556 /* Trigger OOM */
2557 DBUG_EXECUTE_IF(
2558 "ib_import_OOM_5",
2559 UT_DELETE_ARRAY(name);
2560 name = NULL;
2561 );
2562
2563 if (name == NULL) {
2564 return(DB_OUT_OF_MEMORY);
2565 }
2566
2567 field->name = reinterpret_cast<const char*>(name);
2568
2569 dberr_t err = row_import_cfg_read_string(file, name, len);
2570
2571 if (err != DB_SUCCESS) {
2572
2573 ib_senderrf(
2574 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2575 (ulong) errno, strerror(errno),
2576 "while parsing table name.");
2577
2578 return(err);
2579 }
2580 }
2581
2582 return(DB_SUCCESS);
2583 }
2584
2585 /*****************************************************************//**
2586 Read the index names and root page numbers of the indexes and set the values.
2587 Row format [root_page_no, len of str, str ... ]
2588 @return DB_SUCCESS or error code. */
2589 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2590 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2591 row_import_read_index_data(
2592 /*=======================*/
2593 FILE* file, /*!< in: File to read from */
2594 THD* thd, /*!< in: session */
2595 row_import* cfg) /*!< in/out: meta-data read */
2596 {
2597 byte* ptr;
2598 row_index_t* cfg_index;
2599 byte row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2600
2601 /* FIXME: What is the max value? */
2602 ut_a(cfg->m_n_indexes > 0);
2603 ut_a(cfg->m_n_indexes < 1024);
2604
2605 cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2606
2607 /* Trigger OOM */
2608 DBUG_EXECUTE_IF(
2609 "ib_import_OOM_6",
2610 UT_DELETE_ARRAY(cfg->m_indexes);
2611 cfg->m_indexes = NULL;
2612 );
2613
2614 if (cfg->m_indexes == NULL) {
2615 return(DB_OUT_OF_MEMORY);
2616 }
2617
2618 memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2619
2620 cfg_index = cfg->m_indexes;
2621
2622 for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2623 /* Trigger EOF */
2624 DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2625 (void) fseek(file, 0L, SEEK_END););
2626
2627 /* Read the index data. */
2628 size_t n_bytes = fread(row, 1, sizeof(row), file);
2629
2630 /* Trigger EOF */
2631 DBUG_EXECUTE_IF("ib_import_io_read_error",
2632 (void) fseek(file, 0L, SEEK_END););
2633
2634 if (n_bytes != sizeof(row)) {
2635 char msg[BUFSIZ];
2636
2637 snprintf(msg, sizeof(msg),
2638 "while reading index meta-data, expected "
2639 "to read " ULINTPF
2640 " bytes but read only " ULINTPF " bytes",
2641 sizeof(row), n_bytes);
2642
2643 ib_senderrf(
2644 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2645 (ulong) errno, strerror(errno), msg);
2646
2647 ib::error() << "IO Error: " << msg;
2648
2649 return(DB_IO_ERROR);
2650 }
2651
2652 ptr = row;
2653
2654 cfg_index->m_id = mach_read_from_8(ptr);
2655 ptr += sizeof(index_id_t);
2656
2657 cfg_index->m_space = mach_read_from_4(ptr);
2658 ptr += sizeof(ib_uint32_t);
2659
2660 cfg_index->m_page_no = mach_read_from_4(ptr);
2661 ptr += sizeof(ib_uint32_t);
2662
2663 cfg_index->m_type = mach_read_from_4(ptr);
2664 ptr += sizeof(ib_uint32_t);
2665
2666 cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2667 if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2668 ut_ad(0);
2669 /* Overflow. Pretend that the clustered index
2670 has a variable-length PRIMARY KEY. */
2671 cfg_index->m_trx_id_offset = 0;
2672 }
2673 ptr += sizeof(ib_uint32_t);
2674
2675 cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2676 ptr += sizeof(ib_uint32_t);
2677
2678 cfg_index->m_n_uniq = mach_read_from_4(ptr);
2679 ptr += sizeof(ib_uint32_t);
2680
2681 cfg_index->m_n_nullable = mach_read_from_4(ptr);
2682 ptr += sizeof(ib_uint32_t);
2683
2684 cfg_index->m_n_fields = mach_read_from_4(ptr);
2685 ptr += sizeof(ib_uint32_t);
2686
2687 /* The NUL byte is included in the name length. */
2688 ulint len = mach_read_from_4(ptr);
2689
2690 if (len > OS_FILE_MAX_PATH) {
2691 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2692 ER_INNODB_INDEX_CORRUPT,
2693 "Index name length (" ULINTPF ") is too long, "
2694 "the meta-data is corrupt", len);
2695
2696 return(DB_CORRUPTION);
2697 }
2698
2699 cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2700
2701 /* Trigger OOM */
2702 DBUG_EXECUTE_IF(
2703 "ib_import_OOM_7",
2704 UT_DELETE_ARRAY(cfg_index->m_name);
2705 cfg_index->m_name = NULL;
2706 );
2707
2708 if (cfg_index->m_name == NULL) {
2709 return(DB_OUT_OF_MEMORY);
2710 }
2711
2712 dberr_t err;
2713
2714 err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2715
2716 if (err != DB_SUCCESS) {
2717
2718 ib_senderrf(
2719 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2720 (ulong) errno, strerror(errno),
2721 "while parsing index name.");
2722
2723 return(err);
2724 }
2725
2726 err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2727
2728 if (err != DB_SUCCESS) {
2729 return(err);
2730 }
2731
2732 }
2733
2734 return(DB_SUCCESS);
2735 }
2736
2737 /*****************************************************************//**
2738 Set the index root page number for v1 format.
2739 @return DB_SUCCESS or error code. */
2740 static
2741 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2742 row_import_read_indexes(
2743 /*====================*/
2744 FILE* file, /*!< in: File to read from */
2745 THD* thd, /*!< in: session */
2746 row_import* cfg) /*!< in/out: meta-data read */
2747 {
2748 byte row[sizeof(ib_uint32_t)];
2749
2750 /* Trigger EOF */
2751 DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2752 (void) fseek(file, 0L, SEEK_END););
2753
2754 /* Read the number of indexes. */
2755 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2756 ib_senderrf(
2757 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2758 (ulong) errno, strerror(errno),
2759 "while reading number of indexes.");
2760
2761 return(DB_IO_ERROR);
2762 }
2763
2764 cfg->m_n_indexes = mach_read_from_4(row);
2765
2766 if (cfg->m_n_indexes == 0) {
2767 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2768 "Number of indexes in meta-data file is 0");
2769
2770 return(DB_CORRUPTION);
2771
2772 } else if (cfg->m_n_indexes > 1024) {
2773 // FIXME: What is the upper limit? */
2774 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2775 "Number of indexes in meta-data file is too high: "
2776 ULINTPF, cfg->m_n_indexes);
2777 cfg->m_n_indexes = 0;
2778
2779 return(DB_CORRUPTION);
2780 }
2781
2782 return(row_import_read_index_data(file, thd, cfg));
2783 }
2784
2785 /*********************************************************************//**
2786 Read the meta data (table columns) config file. Deserialise the contents of
2787 dict_col_t structure, along with the column name. */
2788 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2789 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2790 row_import_read_columns(
2791 /*====================*/
2792 FILE* file, /*!< in: file to write to */
2793 THD* thd, /*!< in/out: session */
2794 row_import* cfg) /*!< in/out: meta-data read */
2795 {
2796 dict_col_t* col;
2797 byte row[sizeof(ib_uint32_t) * 8];
2798
2799 /* FIXME: What should the upper limit be? */
2800 ut_a(cfg->m_n_cols > 0);
2801 ut_a(cfg->m_n_cols < 1024);
2802
2803 cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2804
2805 /* Trigger OOM */
2806 DBUG_EXECUTE_IF(
2807 "ib_import_OOM_8",
2808 UT_DELETE_ARRAY(cfg->m_cols);
2809 cfg->m_cols = NULL;
2810 );
2811
2812 if (cfg->m_cols == NULL) {
2813 return(DB_OUT_OF_MEMORY);
2814 }
2815
2816 cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2817
2818 /* Trigger OOM */
2819 DBUG_EXECUTE_IF(
2820 "ib_import_OOM_9",
2821 UT_DELETE_ARRAY(cfg->m_col_names);
2822 cfg->m_col_names = NULL;
2823 );
2824
2825 if (cfg->m_col_names == NULL) {
2826 return(DB_OUT_OF_MEMORY);
2827 }
2828
2829 memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2830 memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2831
2832 col = cfg->m_cols;
2833
2834 for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2835 byte* ptr = row;
2836
2837 /* Trigger EOF */
2838 DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2839 (void) fseek(file, 0L, SEEK_END););
2840
2841 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2842 ib_senderrf(
2843 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2844 (ulong) errno, strerror(errno),
2845 "while reading table column meta-data.");
2846
2847 return(DB_IO_ERROR);
2848 }
2849
2850 col->prtype = mach_read_from_4(ptr);
2851 ptr += sizeof(ib_uint32_t);
2852
2853 col->mtype = mach_read_from_4(ptr);
2854 ptr += sizeof(ib_uint32_t);
2855
2856 col->len = mach_read_from_4(ptr);
2857 ptr += sizeof(ib_uint32_t);
2858
2859 ulint mbminmaxlen = mach_read_from_4(ptr);
2860 col->mbmaxlen = mbminmaxlen / 5;
2861 col->mbminlen = mbminmaxlen % 5;
2862 ptr += sizeof(ib_uint32_t);
2863
2864 col->ind = mach_read_from_4(ptr);
2865 ptr += sizeof(ib_uint32_t);
2866
2867 col->ord_part = mach_read_from_4(ptr);
2868 ptr += sizeof(ib_uint32_t);
2869
2870 col->max_prefix = mach_read_from_4(ptr);
2871 ptr += sizeof(ib_uint32_t);
2872
2873 /* Read in the column name as [len, byte array]. The len
2874 includes the NUL byte. */
2875
2876 ulint len = mach_read_from_4(ptr);
2877
2878 /* FIXME: What is the maximum column name length? */
2879 if (len == 0 || len > 128) {
2880 ib_errf(thd, IB_LOG_LEVEL_ERROR,
2881 ER_IO_READ_ERROR,
2882 "Column name length " ULINTPF ", is invalid",
2883 len);
2884
2885 return(DB_CORRUPTION);
2886 }
2887
2888 cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2889
2890 /* Trigger OOM */
2891 DBUG_EXECUTE_IF(
2892 "ib_import_OOM_10",
2893 UT_DELETE_ARRAY(cfg->m_col_names[i]);
2894 cfg->m_col_names[i] = NULL;
2895 );
2896
2897 if (cfg->m_col_names[i] == NULL) {
2898 return(DB_OUT_OF_MEMORY);
2899 }
2900
2901 dberr_t err;
2902
2903 err = row_import_cfg_read_string(
2904 file, cfg->m_col_names[i], len);
2905
2906 if (err != DB_SUCCESS) {
2907
2908 ib_senderrf(
2909 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2910 (ulong) errno, strerror(errno),
2911 "while parsing table column name.");
2912
2913 return(err);
2914 }
2915 }
2916
2917 return(DB_SUCCESS);
2918 }
2919
2920 /*****************************************************************//**
2921 Read the contents of the <tablespace>.cfg file.
2922 @return DB_SUCCESS or error code. */
2923 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2924 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2925 row_import_read_v1(
2926 /*===============*/
2927 FILE* file, /*!< in: File to read from */
2928 THD* thd, /*!< in: session */
2929 row_import* cfg) /*!< out: meta data */
2930 {
2931 byte value[sizeof(ib_uint32_t)];
2932
2933 /* Trigger EOF */
2934 DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2935 (void) fseek(file, 0L, SEEK_END););
2936
2937 /* Read the hostname where the tablespace was exported. */
2938 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2939 ib_senderrf(
2940 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2941 (ulong) errno, strerror(errno),
2942 "while reading meta-data export hostname length.");
2943
2944 return(DB_IO_ERROR);
2945 }
2946
2947 ulint len = mach_read_from_4(value);
2948
2949 /* NUL byte is part of name length. */
2950 cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2951
2952 /* Trigger OOM */
2953 DBUG_EXECUTE_IF(
2954 "ib_import_OOM_1",
2955 UT_DELETE_ARRAY(cfg->m_hostname);
2956 cfg->m_hostname = NULL;
2957 );
2958
2959 if (cfg->m_hostname == NULL) {
2960 return(DB_OUT_OF_MEMORY);
2961 }
2962
2963 dberr_t err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2964
2965 if (err != DB_SUCCESS) {
2966
2967 ib_senderrf(
2968 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2969 (ulong) errno, strerror(errno),
2970 "while parsing export hostname.");
2971
2972 return(err);
2973 }
2974
2975 /* Trigger EOF */
2976 DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2977 (void) fseek(file, 0L, SEEK_END););
2978
2979 /* Read the table name of tablespace that was exported. */
2980 if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2981 ib_senderrf(
2982 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2983 (ulong) errno, strerror(errno),
2984 "while reading meta-data table name length.");
2985
2986 return(DB_IO_ERROR);
2987 }
2988
2989 len = mach_read_from_4(value);
2990
2991 /* NUL byte is part of name length. */
2992 cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2993
2994 /* Trigger OOM */
2995 DBUG_EXECUTE_IF(
2996 "ib_import_OOM_2",
2997 UT_DELETE_ARRAY(cfg->m_table_name);
2998 cfg->m_table_name = NULL;
2999 );
3000
3001 if (cfg->m_table_name == NULL) {
3002 return(DB_OUT_OF_MEMORY);
3003 }
3004
3005 err = row_import_cfg_read_string(file, cfg->m_table_name, len);
3006
3007 if (err != DB_SUCCESS) {
3008 ib_senderrf(
3009 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3010 (ulong) errno, strerror(errno),
3011 "while parsing table name.");
3012
3013 return(err);
3014 }
3015
3016 ib::info() << "Importing tablespace for table '" << cfg->m_table_name
3017 << "' that was exported from host '" << cfg->m_hostname << "'";
3018
3019 byte row[sizeof(ib_uint32_t) * 3];
3020
3021 /* Trigger EOF */
3022 DBUG_EXECUTE_IF("ib_import_io_read_error_7",
3023 (void) fseek(file, 0L, SEEK_END););
3024
3025 /* Read the autoinc value. */
3026 if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
3027 ib_senderrf(
3028 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3029 (ulong) errno, strerror(errno),
3030 "while reading autoinc value.");
3031
3032 return(DB_IO_ERROR);
3033 }
3034
3035 cfg->m_autoinc = mach_read_from_8(row);
3036
3037 /* Trigger EOF */
3038 DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3039 (void) fseek(file, 0L, SEEK_END););
3040
3041 /* Read the tablespace page size. */
3042 if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3043 ib_senderrf(
3044 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3045 (ulong) errno, strerror(errno),
3046 "while reading meta-data header.");
3047
3048 return(DB_IO_ERROR);
3049 }
3050
3051 byte* ptr = row;
3052
3053 const ulint logical_page_size = mach_read_from_4(ptr);
3054 ptr += sizeof(ib_uint32_t);
3055
3056 if (logical_page_size != srv_page_size) {
3057
3058 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3059 "Tablespace to be imported has a different"
3060 " page size than this server. Server page size"
3061 " is %lu, whereas tablespace page size"
3062 " is " ULINTPF,
3063 srv_page_size,
3064 logical_page_size);
3065
3066 return(DB_ERROR);
3067 }
3068
3069 cfg->m_flags = mach_read_from_4(ptr);
3070 ptr += sizeof(ib_uint32_t);
3071
3072 cfg->m_page_size.copy_from(dict_tf_get_page_size(cfg->m_flags));
3073
3074 ut_a(logical_page_size == cfg->m_page_size.logical());
3075
3076 cfg->m_n_cols = mach_read_from_4(ptr);
3077
3078 if (!dict_tf_is_valid(cfg->m_flags)) {
3079 ib_errf(thd, IB_LOG_LEVEL_ERROR,
3080 ER_TABLE_SCHEMA_MISMATCH,
3081 "Invalid table flags: " ULINTPF, cfg->m_flags);
3082
3083 return(DB_CORRUPTION);
3084 }
3085
3086 err = row_import_read_columns(file, thd, cfg);
3087
3088 if (err == DB_SUCCESS) {
3089 err = row_import_read_indexes(file, thd, cfg);
3090 }
3091
3092 return(err);
3093 }
3094
3095 /**
3096 Read the contents of the <tablespace>.cfg file.
3097 @return DB_SUCCESS or error code. */
3098 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3099 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3100 row_import_read_meta_data(
3101 /*======================*/
3102 FILE* file, /*!< in: File to read from */
3103 THD* thd, /*!< in: session */
3104 row_import& cfg) /*!< out: contents of the .cfg file */
3105 {
3106 byte row[sizeof(ib_uint32_t)];
3107
3108 /* Trigger EOF */
3109 DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3110 (void) fseek(file, 0L, SEEK_END););
3111
3112 if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3113 ib_senderrf(
3114 thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3115 (ulong) errno, strerror(errno),
3116 "while reading meta-data version.");
3117
3118 return(DB_IO_ERROR);
3119 }
3120
3121 cfg.m_version = mach_read_from_4(row);
3122
3123 /* Check the version number. */
3124 switch (cfg.m_version) {
3125 case IB_EXPORT_CFG_VERSION_V1:
3126
3127 return(row_import_read_v1(file, thd, &cfg));
3128 default:
3129 ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3130 "Unsupported meta-data version number (" ULINTPF "), "
3131 "file ignored", cfg.m_version);
3132 }
3133
3134 return(DB_ERROR);
3135 }
3136
3137 /**
3138 Read the contents of the <tablename>.cfg file.
3139 @return DB_SUCCESS or error code. */
3140 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3141 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3142 row_import_read_cfg(
3143 /*================*/
3144 dict_table_t* table, /*!< in: table */
3145 THD* thd, /*!< in: session */
3146 row_import& cfg) /*!< out: contents of the .cfg file */
3147 {
3148 dberr_t err;
3149 char name[OS_FILE_MAX_PATH];
3150
3151 cfg.m_table = table;
3152
3153 srv_get_meta_data_filename(table, name, sizeof(name));
3154
3155 FILE* file = fopen(name, "rb");
3156
3157 if (file == NULL) {
3158 char msg[BUFSIZ];
3159
3160 snprintf(msg, sizeof(msg),
3161 "Error opening '%s', will attempt to import"
3162 " without schema verification", name);
3163
3164 ib_senderrf(
3165 thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3166 (ulong) errno, strerror(errno), msg);
3167
3168 cfg.m_missing = true;
3169
3170 err = DB_FAIL;
3171 } else {
3172
3173 cfg.m_missing = false;
3174
3175 err = row_import_read_meta_data(file, thd, cfg);
3176 fclose(file);
3177 }
3178
3179 return(err);
3180 }
3181
3182 /** Update the root page numbers and tablespace ID of a table.
3183 @param[in,out] trx dictionary transaction
3184 @param[in,out] table persistent table
3185 @param[in] reset whether to reset the fields to FIL_NULL
3186 @return DB_SUCCESS or error code */
3187 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3188 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3189 {
3190 const dict_index_t* index;
3191 que_t* graph = 0;
3192 dberr_t err = DB_SUCCESS;
3193
3194 ut_ad(reset || table->space->id == table->space_id);
3195
3196 static const char sql[] = {
3197 "PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3198 "BEGIN\n"
3199 "UPDATE SYS_INDEXES\n"
3200 "SET SPACE = :space,\n"
3201 " PAGE_NO = :page,\n"
3202 " TYPE = :type\n"
3203 "WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3204 "END;\n"};
3205
3206 table->def_trx_id = trx->id;
3207
3208 for (index = dict_table_get_first_index(table);
3209 index != 0;
3210 index = dict_table_get_next_index(index)) {
3211
3212 pars_info_t* info;
3213 ib_uint32_t page;
3214 ib_uint32_t space;
3215 ib_uint32_t type;
3216 index_id_t index_id;
3217 table_id_t table_id;
3218
3219 info = (graph != 0) ? graph->info : pars_info_create();
3220
3221 mach_write_to_4(
3222 reinterpret_cast<byte*>(&type),
3223 index->type);
3224
3225 mach_write_to_4(
3226 reinterpret_cast<byte*>(&page),
3227 reset ? FIL_NULL : index->page);
3228
3229 mach_write_to_4(
3230 reinterpret_cast<byte*>(&space),
3231 reset ? FIL_NULL : index->table->space_id);
3232
3233 mach_write_to_8(
3234 reinterpret_cast<byte*>(&index_id),
3235 index->id);
3236
3237 mach_write_to_8(
3238 reinterpret_cast<byte*>(&table_id),
3239 table->id);
3240
3241 /* If we set the corrupt bit during the IMPORT phase then
3242 we need to update the system tables. */
3243 pars_info_bind_int4_literal(info, "type", &type);
3244 pars_info_bind_int4_literal(info, "space", &space);
3245 pars_info_bind_int4_literal(info, "page", &page);
3246 pars_info_bind_ull_literal(info, "index_id", &index_id);
3247 pars_info_bind_ull_literal(info, "table_id", &table_id);
3248
3249 if (graph == 0) {
3250 graph = pars_sql(info, sql);
3251 ut_a(graph);
3252 graph->trx = trx;
3253 }
3254
3255 que_thr_t* thr;
3256
3257 graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3258
3259 ut_a(thr = que_fork_start_command(graph));
3260
3261 que_run_threads(thr);
3262
3263 DBUG_EXECUTE_IF("ib_import_internal_error",
3264 trx->error_state = DB_ERROR;);
3265
3266 err = trx->error_state;
3267
3268 if (err != DB_SUCCESS) {
3269 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3270 ER_INTERNAL_ERROR,
3271 "While updating the <space, root page"
3272 " number> of index %s - %s",
3273 index->name(), ut_strerr(err));
3274
3275 break;
3276 }
3277 }
3278
3279 que_graph_free(graph);
3280
3281 return(err);
3282 }
3283
/** Callback argument shared between row_import_update_discarded_flag()
and its fetch callback row_import_set_discarded(). */
struct discard_t {
	ib_uint32_t	flags2;			/*!< MIX_LEN value with
						DICT_TF2_DISCARDED set or
						cleared, as stored by
						mach_write_to_4() in the
						callback; bound to the
						:flags2 literal of the
						UPDATE statement */
	bool		state;			/*!< New state of the flag:
						true sets the bit, false
						clears it */
	ulint		n_recs;			/*!< Number of recs processed
						(incremented once per
						callback invocation) */
};
3290
3291 /******************************************************************//**
3292 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3293 SYS_TABLES. The flags is stored in MIX_LEN column.
3294 @return FALSE if all OK */
3295 static
3296 ibool
row_import_set_discarded(void * row,void * user_arg)3297 row_import_set_discarded(
3298 /*=====================*/
3299 void* row, /*!< in: sel_node_t* */
3300 void* user_arg) /*!< in: bool set/unset flag */
3301 {
3302 sel_node_t* node = static_cast<sel_node_t*>(row);
3303 discard_t* discard = static_cast<discard_t*>(user_arg);
3304 dfield_t* dfield = que_node_get_val(node->select_list);
3305 dtype_t* type = dfield_get_type(dfield);
3306 ulint len = dfield_get_len(dfield);
3307
3308 ut_a(dtype_get_mtype(type) == DATA_INT);
3309 ut_a(len == sizeof(ib_uint32_t));
3310
3311 ulint flags2 = mach_read_from_4(
3312 static_cast<byte*>(dfield_get_data(dfield)));
3313
3314 if (discard->state) {
3315 flags2 |= DICT_TF2_DISCARDED;
3316 } else {
3317 flags2 &= ~DICT_TF2_DISCARDED;
3318 }
3319
3320 mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3321
3322 ++discard->n_recs;
3323
3324 /* There should be at most one matching record. */
3325 ut_a(discard->n_recs == 1);
3326
3327 return(FALSE);
3328 }
3329
3330 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3331 @param[in,out] trx dictionary transaction
3332 @param[in] table_id table identifier
3333 @param[in] discarded whether to set or clear the flag
3334 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3335 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3336 bool discarded)
3337 {
3338 pars_info_t* info;
3339 discard_t discard;
3340
3341 static const char sql[] =
3342 "PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3343 "DECLARE FUNCTION my_func;\n"
3344 "DECLARE CURSOR c IS\n"
3345 " SELECT MIX_LEN"
3346 " FROM SYS_TABLES"
3347 " WHERE ID = :table_id FOR UPDATE;"
3348 "\n"
3349 "BEGIN\n"
3350 "OPEN c;\n"
3351 "WHILE 1 = 1 LOOP\n"
3352 " FETCH c INTO my_func();\n"
3353 " IF c % NOTFOUND THEN\n"
3354 " EXIT;\n"
3355 " END IF;\n"
3356 "END LOOP;\n"
3357 "UPDATE SYS_TABLES"
3358 " SET MIX_LEN = :flags2"
3359 " WHERE ID = :table_id;\n"
3360 "CLOSE c;\n"
3361 "END;\n";
3362
3363 discard.n_recs = 0;
3364 discard.state = discarded;
3365 discard.flags2 = ULINT32_UNDEFINED;
3366
3367 info = pars_info_create();
3368
3369 pars_info_add_ull_literal(info, "table_id", table_id);
3370 pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3371
3372 pars_info_bind_function(
3373 info, "my_func", row_import_set_discarded, &discard);
3374
3375 dberr_t err = que_eval_sql(info, sql, false, trx);
3376
3377 ut_a(discard.n_recs == 1);
3378 ut_a(discard.flags2 != ULINT32_UNDEFINED);
3379
3380 return(err);
3381 }
3382
3383 /** InnoDB writes page by page when there is page compressed
3384 tablespace involved. It does help to save the disk space when
3385 punch hole is enabled
3386 @param iter Tablespace iterator
3387 @param write_request Request to write into the file
3388 @param offset offset of the file to be written
3389 @param writeptr buffer to be written
3390 @param n_bytes number of bytes to be written
3391 @param try_punch_only Try the range punch only because the
3392 current range is full of empty pages
3393 @return DB_SUCCESS */
3394 static
fil_import_compress_fwrite(const fil_iterator_t & iter,const IORequest & write_request,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3395 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3396 const IORequest &write_request,
3397 os_offset_t offset,
3398 const byte *writeptr,
3399 ulint n_bytes,
3400 bool try_punch_only=false)
3401 {
3402 dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes);
3403 if (err != DB_SUCCESS || try_punch_only)
3404 return err;
3405
3406 for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3407 {
3408 /* Read the original data length from block and
3409 safer to read FIL_PAGE_COMPRESSED_SIZE because it
3410 is not encrypted*/
3411 ulint n_write_bytes= srv_page_size;
3412 if (j || offset)
3413 {
3414 n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3415 const unsigned ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3416 /* Ignore the empty page */
3417 if (ptype == 0 && n_write_bytes == 0)
3418 continue;
3419 n_write_bytes+= FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
3420 if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
3421 n_write_bytes+= FIL_PAGE_COMPRESSION_METHOD_SIZE;
3422 }
3423
3424 err= os_file_write(write_request, iter.filepath, iter.file,
3425 writeptr + j, offset + j, n_write_bytes);
3426 if (err != DB_SUCCESS)
3427 break;
3428 }
3429
3430 return err;
3431 }
3432
run(const fil_iterator_t & iter,buf_block_t * block)3433 dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
3434 buf_block_t* block) UNIV_NOTHROW
3435 {
3436 const ulint size= get_page_size().physical();
3437 const ulint buf_size = srv_page_size
3438 #ifdef HAVE_LZO
3439 + LZO1X_1_15_MEM_COMPRESS
3440 #elif defined HAVE_SNAPPY
3441 + snappy_max_compressed_length(srv_page_size)
3442 #endif
3443 ;
3444 byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
3445 ut_ad(!srv_read_only_mode);
3446
3447 if (!page_compress_buf)
3448 return DB_OUT_OF_MEMORY;
3449
3450 const bool encrypted= iter.crypt_data != NULL &&
3451 iter.crypt_data->should_encrypt();
3452 byte* const readptr= iter.io_buffer;
3453 block->frame= readptr;
3454
3455 if (block->page.zip.data)
3456 block->page.zip.data= readptr;
3457
3458 IORequest read_request(IORequest::READ);
3459 read_request.disable_partial_io_warnings();
3460 ulint page_no= 0;
3461 bool page_compressed= false;
3462
3463 dberr_t err= os_file_read_no_error_handling(
3464 read_request, iter.file, readptr, 3 * size, size, 0);
3465 if (err != DB_SUCCESS)
3466 {
3467 ib::error() << iter.filepath << ": os_file_read() failed";
3468 goto func_exit;
3469 }
3470
3471 block->page.id.set_page_no(3);
3472 page_no= page_get_page_no(readptr);
3473
3474 if (page_no != 3)
3475 {
3476 page_corrupted:
3477 ib::warn() << filename() << ": Page 3 at offset "
3478 << 3 * size << " looks corrupted.";
3479 err= DB_CORRUPTION;
3480 goto func_exit;
3481 }
3482
3483 page_compressed= fil_page_is_compressed_encrypted(readptr) ||
3484 fil_page_is_compressed(readptr);
3485
3486 if (page_compressed && block->page.zip.data)
3487 goto page_corrupted;
3488
3489 if (encrypted)
3490 {
3491 if (!fil_space_verify_crypt_checksum(readptr, get_page_size()))
3492 goto page_corrupted;
3493
3494 if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
3495 mach_read_from_4(readptr + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
3496 goto page_corrupted;
3497
3498 if ((err = fil_space_decrypt(iter.crypt_data, readptr,
3499 get_page_size(), readptr)))
3500 goto func_exit;
3501 }
3502
3503 if (page_compressed)
3504 {
3505 ulint compress_length = fil_page_decompress(page_compress_buf, readptr);
3506 ut_ad(compress_length != srv_page_size);
3507 if (compress_length == 0)
3508 goto page_corrupted;
3509 }
3510 else if (buf_page_is_corrupted(
3511 false, readptr, get_page_size(), NULL))
3512 goto page_corrupted;
3513
3514 err = this->operator()(block);
3515 func_exit:
3516 free(page_compress_buf);
3517 return err;
3518 }
3519
fil_iterate(const fil_iterator_t & iter,buf_block_t * block,AbstractCallback & callback)3520 static dberr_t fil_iterate(
3521 const fil_iterator_t& iter,
3522 buf_block_t* block,
3523 AbstractCallback& callback)
3524 {
3525 os_offset_t offset;
3526 const ulint size = callback.get_page_size().physical();
3527 ulint n_bytes = iter.n_io_buffers * size;
3528
3529 const ulint buf_size = srv_page_size
3530 #ifdef HAVE_LZO
3531 + LZO1X_1_15_MEM_COMPRESS
3532 #elif defined HAVE_SNAPPY
3533 + snappy_max_compressed_length(srv_page_size)
3534 #endif
3535 ;
3536 byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
3537 ut_ad(!srv_read_only_mode);
3538
3539 if (!page_compress_buf) {
3540 return DB_OUT_OF_MEMORY;
3541 }
3542
3543 /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
3544 copying for non-index pages. Unfortunately, it is
3545 required by buf_zip_decompress() */
3546 dberr_t err = DB_SUCCESS;
3547 bool page_compressed = false;
3548 bool punch_hole = true;
3549 IORequest write_request(IORequest::WRITE);
3550
3551 for (offset = iter.start; offset < iter.end; offset += n_bytes) {
3552 if (callback.is_interrupted()) {
3553 err = DB_INTERRUPTED;
3554 goto func_exit;
3555 }
3556
3557 byte* io_buffer = iter.io_buffer;
3558 block->frame = io_buffer;
3559
3560 if (block->page.zip.data) {
3561 /* Zip IO is done in the compressed page buffer. */
3562 io_buffer = block->page.zip.data;
3563 }
3564
3565 /* We have to read the exact number of bytes. Otherwise the
3566 InnoDB IO functions croak on failed reads. */
3567
3568 n_bytes = ulint(ut_min(os_offset_t(n_bytes),
3569 iter.end - offset));
3570
3571 ut_ad(n_bytes > 0);
3572 ut_ad(!(n_bytes % size));
3573
3574 const bool encrypted = iter.crypt_data != NULL
3575 && iter.crypt_data->should_encrypt();
3576 /* Use additional crypt io buffer if tablespace is encrypted */
3577 byte* const readptr = encrypted
3578 ? iter.crypt_io_buffer : io_buffer;
3579 byte* const writeptr = readptr;
3580
3581 IORequest read_request(IORequest::READ);
3582 read_request.disable_partial_io_warnings();
3583
3584 err = os_file_read_no_error_handling(
3585 read_request, iter.file, readptr, offset, n_bytes, 0);
3586 if (err != DB_SUCCESS) {
3587 ib::error() << iter.filepath
3588 << ": os_file_read() failed";
3589 goto func_exit;
3590 }
3591
3592 bool updated = false;
3593 os_offset_t page_off = offset;
3594 ulint n_pages_read = n_bytes / size;
3595 block->page.id.set_page_no(ulint(page_off / size));
3596
3597 for (ulint i = 0; i < n_pages_read;
3598 block->page.id.set_page_no(block->page.id.page_no() + 1),
3599 ++i, page_off += size, block->frame += size) {
3600 byte* src = readptr + i * size;
3601 const ulint page_no = page_get_page_no(src);
3602 if (!page_no && block->page.id.page_no()) {
3603 const ulint* b = reinterpret_cast<const ulint*>
3604 (src);
3605 const ulint* const e = b + size / sizeof *b;
3606 do {
3607 if (*b++) {
3608 goto page_corrupted;
3609 }
3610 } while (b != e);
3611
3612 /* Proceed to the next page,
3613 because this one is all zero. */
3614 continue;
3615 }
3616
3617 if (page_no != block->page.id.page_no()) {
3618 page_corrupted:
3619 ib::warn() << callback.filename()
3620 << ": Page " << (offset / size)
3621 << " at offset " << offset
3622 << " looks corrupted.";
3623 err = DB_CORRUPTION;
3624 goto func_exit;
3625 }
3626
3627 page_compressed= fil_page_is_compressed_encrypted(src)
3628 || fil_page_is_compressed(src);
3629
3630 if (page_compressed && block->page.zip.data) {
3631 goto page_corrupted;
3632 }
3633
3634 bool decrypted = false;
3635 byte* dst = io_buffer + i * size;
3636 bool frame_changed = false;
3637
3638 if (!encrypted) {
3639 } else if (!mach_read_from_4(
3640 FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
3641 + src)) {
3642 if (block->page.id.page_no() == 0
3643 && block->page.zip.data) {
3644 block->page.zip.data = src;
3645 frame_changed = true;
3646 } else if (!page_compressed
3647 && !block->page.zip.data) {
3648 block->frame = src;
3649 frame_changed = true;
3650 } else {
3651 ut_ad(dst != src);
3652 memcpy(dst, src, size);
3653 }
3654 } else {
3655 if (!fil_space_verify_crypt_checksum(
3656 src, callback.get_page_size())) {
3657 goto page_corrupted;
3658 }
3659
3660 if ((err = fil_space_decrypt(
3661 iter.crypt_data, dst,
3662 callback.get_page_size(), src))) {
3663 goto func_exit;
3664 }
3665
3666 decrypted = true;
3667 updated = true;
3668 }
3669
3670 /* If the original page is page_compressed, we need
3671 to decompress it before adjusting further. */
3672 if (page_compressed) {
3673 ulint compress_length = fil_page_decompress(
3674 page_compress_buf, dst);
3675 ut_ad(compress_length != srv_page_size);
3676 if (compress_length == 0) {
3677 goto page_corrupted;
3678 }
3679 updated = true;
3680 } else if (buf_page_is_corrupted(
3681 false,
3682 encrypted && !frame_changed
3683 ? dst : src,
3684 callback.get_page_size(), NULL)) {
3685 goto page_corrupted;
3686 }
3687
3688 if ((err = callback(block)) != DB_SUCCESS) {
3689 goto func_exit;
3690 } else if (!updated) {
3691 updated = buf_block_get_state(block)
3692 == BUF_BLOCK_FILE_PAGE;
3693 }
3694
3695 /* If tablespace is encrypted we use additional
3696 temporary scratch area where pages are read
3697 for decrypting readptr == crypt_io_buffer != io_buffer.
3698
3699 Destination for decryption is a buffer pool block
3700 block->frame == dst == io_buffer that is updated.
3701 Pages that did not require decryption even when
3702 tablespace is marked as encrypted are not copied
3703 instead block->frame is set to src == readptr.
3704
3705 For encryption we again use temporary scratch area
3706 writeptr != io_buffer == dst
3707 that is then written to the tablespace
3708
3709 (1) For normal tables io_buffer == dst == writeptr
3710 (2) For only page compressed tables
3711 io_buffer == dst == writeptr
3712 (3) For encrypted (and page compressed)
3713 readptr != io_buffer == dst != writeptr
3714 */
3715
3716 ut_ad(!encrypted && !page_compressed ?
3717 src == dst && dst == writeptr + (i * size):1);
3718 ut_ad(page_compressed && !encrypted ?
3719 src == dst && dst == writeptr + (i * size):1);
3720 ut_ad(encrypted ?
3721 src != dst && dst != writeptr + (i * size):1);
3722
3723 /* When tablespace is encrypted or compressed its
3724 first page (i.e. page 0) is not encrypted or
3725 compressed and there is no need to copy frame. */
3726 if (encrypted && block->page.id.page_no() != 0) {
3727 byte *local_frame = callback.get_frame(block);
3728 ut_ad((writeptr + (i * size)) != local_frame);
3729 memcpy((writeptr + (i * size)), local_frame, size);
3730 }
3731
3732 if (frame_changed) {
3733 if (block->page.zip.data) {
3734 block->page.zip.data = dst;
3735 } else {
3736 block->frame = dst;
3737 }
3738 }
3739
3740 src = io_buffer + (i * size);
3741
3742 if (page_compressed) {
3743 updated = true;
3744 if (ulint len = fil_page_compress(
3745 src,
3746 page_compress_buf,
3747 0,/* FIXME: compression level */
3748 512,/* FIXME: proper block size */
3749 encrypted)) {
3750 /* FIXME: remove memcpy() */
3751 memcpy(src, page_compress_buf, len);
3752 memset(src + len, 0,
3753 srv_page_size - len);
3754 }
3755 }
3756
3757 /* Encrypt the page if encryption was used. */
3758 if (encrypted && decrypted) {
3759 byte *dest = writeptr + i * size;
3760 byte* tmp = fil_encrypt_buf(
3761 iter.crypt_data,
3762 block->page.id.space(),
3763 block->page.id.page_no(),
3764 mach_read_from_8(src + FIL_PAGE_LSN),
3765 src, callback.get_page_size(), dest);
3766
3767 if (tmp == src) {
3768 /* TODO: remove unnecessary memcpy's */
3769 ut_ad(dest != src);
3770 memcpy(dest, src, size);
3771 }
3772
3773 updated = true;
3774 }
3775 }
3776
3777 if (page_compressed && punch_hole) {
3778 err = fil_import_compress_fwrite(
3779 iter, write_request, offset, writeptr, n_bytes,
3780 !updated);
3781
3782 if (err != DB_SUCCESS) {
3783 punch_hole = false;
3784 if (updated) {
3785 goto normal_write;
3786 }
3787 }
3788 } else if (updated) {
3789 /* A page was updated in the set, write back to disk. */
3790 normal_write:
3791 err = os_file_write(
3792 write_request, iter.filepath, iter.file,
3793 writeptr, offset, n_bytes);
3794
3795 if (err != DB_SUCCESS) {
3796 goto func_exit;
3797 }
3798 }
3799 }
3800
3801 func_exit:
3802 free(page_compress_buf);
3803 return err;
3804 }
3805
3806 /********************************************************************//**
3807 Iterate over all the pages in the tablespace.
3808 @param table - the table definiton in the server
3809 @param n_io_buffers - number of blocks to read and write together
3810 @param callback - functor that will do the page updates
3811 @return DB_SUCCESS or error code */
3812 static
3813 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)3814 fil_tablespace_iterate(
3815 /*===================*/
3816 dict_table_t* table,
3817 ulint n_io_buffers,
3818 AbstractCallback& callback)
3819 {
3820 dberr_t err;
3821 pfs_os_file_t file;
3822 char* filepath;
3823
3824 ut_a(n_io_buffers > 0);
3825 ut_ad(!srv_read_only_mode);
3826
3827 DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
3828 return(DB_CORRUPTION););
3829
3830 /* Make sure the data_dir_path is set. */
3831 dict_get_and_save_data_dir_path(table, false);
3832
3833 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3834 ut_a(table->data_dir_path);
3835
3836 filepath = fil_make_filepath(
3837 table->data_dir_path, table->name.m_name, IBD, true);
3838 } else {
3839 filepath = fil_make_filepath(
3840 NULL, table->name.m_name, IBD, false);
3841 }
3842
3843 if (!filepath) {
3844 return(DB_OUT_OF_MEMORY);
3845 } else {
3846 bool success;
3847
3848 file = os_file_create_simple_no_error_handling(
3849 innodb_data_file_key, filepath,
3850 OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
3851
3852 if (!success) {
3853 /* The following call prints an error message */
3854 os_file_get_last_error(true);
3855 ib::error() << "Trying to import a tablespace,"
3856 " but could not open the tablespace file "
3857 << filepath;
3858 ut_free(filepath);
3859 return DB_TABLESPACE_NOT_FOUND;
3860 } else {
3861 err = DB_SUCCESS;
3862 }
3863 }
3864
3865 callback.set_file(filepath, file);
3866
3867 os_offset_t file_size = os_file_get_size(file);
3868 ut_a(file_size != (os_offset_t) -1);
3869
3870 /* Allocate a page to read in the tablespace header, so that we
3871 can determine the page size and zip_size (if it is compressed).
3872 We allocate an extra page in case it is a compressed table. One
3873 page is to ensure alignement. */
3874
3875 void* page_ptr = ut_malloc_nokey(3U << srv_page_size_shift);
3876 byte* page = static_cast<byte*>(ut_align(page_ptr, srv_page_size));
3877
3878 buf_block_t* block = reinterpret_cast<buf_block_t*>
3879 (ut_zalloc_nokey(sizeof *block));
3880 block->frame = page;
3881 block->page.id = page_id_t(0, 0);
3882 block->page.io_fix = BUF_IO_NONE;
3883 block->page.buf_fix_count = 1;
3884 block->page.state = BUF_BLOCK_FILE_PAGE;
3885
3886 /* Read the first page and determine the page and zip size. */
3887
3888 IORequest request(IORequest::READ);
3889 request.disable_partial_io_warnings();
3890
3891 err = os_file_read_no_error_handling(request, file, page, 0,
3892 srv_page_size, 0);
3893
3894 if (err == DB_SUCCESS) {
3895 err = callback.init(file_size, block);
3896 }
3897
3898 if (err == DB_SUCCESS) {
3899 block->page.id = page_id_t(callback.get_space_id(), 0);
3900 block->page.size.copy_from(callback.get_page_size());
3901 if (block->page.size.is_compressed()) {
3902 page_zip_set_size(&block->page.zip,
3903 callback.get_page_size().physical());
3904 /* ROW_FORMAT=COMPRESSED is not optimised for block IO
3905 for now. We do the IMPORT page by page. */
3906 n_io_buffers = 1;
3907 }
3908
3909 fil_iterator_t iter;
3910
3911 /* read (optional) crypt data */
3912 iter.crypt_data = fil_space_read_crypt_data(
3913 callback.get_page_size(), page);
3914
3915 /* If tablespace is encrypted, it needs extra buffers */
3916 if (iter.crypt_data && n_io_buffers > 1) {
3917 /* decrease io buffers so that memory
3918 consumption will not double */
3919 n_io_buffers /= 2;
3920 }
3921
3922 iter.file = file;
3923 iter.start = 0;
3924 iter.end = file_size;
3925 iter.filepath = filepath;
3926 iter.file_size = file_size;
3927 iter.n_io_buffers = n_io_buffers;
3928
3929 /* Add an extra page for compressed page scratch area. */
3930 void* io_buffer = ut_malloc_nokey(
3931 (2 + iter.n_io_buffers) << srv_page_size_shift);
3932
3933 iter.io_buffer = static_cast<byte*>(
3934 ut_align(io_buffer, srv_page_size));
3935
3936 void* crypt_io_buffer = NULL;
3937 if (iter.crypt_data) {
3938 crypt_io_buffer = ut_malloc_nokey(
3939 (2 + iter.n_io_buffers)
3940 << srv_page_size_shift);
3941 iter.crypt_io_buffer = static_cast<byte*>(
3942 ut_align(crypt_io_buffer, srv_page_size));
3943 }
3944
3945 if (block->page.zip.ssize) {
3946 ut_ad(iter.n_io_buffers == 1);
3947 block->frame = iter.io_buffer;
3948 block->page.zip.data = block->frame + srv_page_size;
3949 }
3950
3951 err = callback.run(iter, block);
3952
3953 if (iter.crypt_data) {
3954 fil_space_destroy_crypt_data(&iter.crypt_data);
3955 }
3956
3957 ut_free(crypt_io_buffer);
3958 ut_free(io_buffer);
3959 }
3960
3961 if (err == DB_SUCCESS) {
3962 ib::info() << "Sync to disk";
3963
3964 if (!os_file_flush(file)) {
3965 ib::info() << "os_file_flush() failed!";
3966 err = DB_IO_ERROR;
3967 } else {
3968 ib::info() << "Sync to disk - done!";
3969 }
3970 }
3971
3972 os_file_close(file);
3973
3974 ut_free(page_ptr);
3975 ut_free(filepath);
3976 ut_free(block);
3977
3978 return(err);
3979 }
3980
/*****************************************************************//**
Imports a tablespace. The space id in the .ibd file must match the space id
of the table in the data dictionary.

Outline of the steps performed below:
- start a separate dictionary transaction and assign it an undo log;
- read the .cfg meta-data file and match it against the table, or, if
  the file is missing, discover the index root page from the .ibd file;
- "Phase I": convert all pages of the file with PageConverter;
- open the tablespace, check the change buffer bitmap, adjust the index
  roots and purge delete-marked records where required;
- "Phase III"/"Phase IV": flush the pages of the tablespace and mark the
  tablespace as imported;
- update SYS_INDEXES and SYS_TABLES and, if the .cfg file supplied one,
  the auto-increment value.
Every exit path goes through row_import_cleanup() or row_import_error().
@return error code or DB_SUCCESS */
dberr_t
row_import_for_mysql(
/*=================*/
	dict_table_t*	table,		/*!< in/out: table */
	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
{
	dberr_t		err;
	trx_t*		trx;
	ib_uint64_t	autoinc = 0;
	char*		filepath = NULL;
	/* Assigned in the missing-.cfg branch below, but not read anywhere
	else in this function; hence the "unused" attribute. */
	ulint		space_flags MY_ATTRIBUTE((unused));

	/* The caller assured that this is not read_only_mode and that no
	temporary tablespace is being imported. */
	ut_ad(!srv_read_only_mode);
	ut_ad(!table->is_temporary());

	ut_ad(table->space_id);
	ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
	ut_ad(prebuilt->trx);
	ut_ad(!table->is_readable());

	ibuf_delete_for_discarded_space(table->space_id);

	trx_start_if_not_started(prebuilt->trx, true);

	/* The import itself runs in a transaction of its own. */
	trx = trx_create();

	/* So that the table is not DROPped during recovery. */
	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);

	trx_start_if_not_started(trx, true);

	/* So that we can send error messages to the user. */
	trx->mysql_thd = prebuilt->trx->mysql_thd;

	/* Ensure that the table will be dropped by trx_rollback_active()
	in case of a crash. */

	trx->table_id = table->id;

	/* Assign an undo segment for the transaction, so that the
	transaction will be recovered after a crash. */

	/* TODO: Do not write any undo log for the IMPORT cleanup. */
	{
		mtr_t mtr;
		mtr.start();
		trx_undo_assign(trx, &err, &mtr);
		mtr.commit();
	}

	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
			err = DB_TOO_MANY_CONCURRENT_TRXS;);

	if (err != DB_SUCCESS) {

		return(row_import_cleanup(prebuilt, trx, err));

	} else if (trx->rsegs.m_redo.undo == 0) {

		/* No undo log was assigned even though no error was
		reported. */
		err = DB_TOO_MANY_CONCURRENT_TRXS;
		return(row_import_cleanup(prebuilt, trx, err));
	}

	prebuilt->trx->op_info = "read meta-data file";

	/* Prevent DDL operations while we are checking. */

	rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);

	row_import	cfg;

	err = row_import_read_cfg(table, trx->mysql_thd, cfg);

	/* Check if the table column definitions match the contents
	of the config file. */

	/* Each of the three branches below releases the S-latch on
	dict_operation_lock that was acquired above. */
	if (err == DB_SUCCESS) {

		/* We have a schema file, try and match it with our
		data dictionary. */

		err = cfg.match_schema(trx->mysql_thd);

		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
		B-tree root page numbers in the tablespace. Use the index
		name from the .cfg file to find match. */

		if (err == DB_SUCCESS) {
			cfg.set_root_by_name();
			autoinc = cfg.m_autoinc;
		}

		rw_lock_s_unlock_gen(&dict_operation_lock, 0);

		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
				err = DB_TOO_MANY_CONCURRENT_TRXS;);

	} else if (cfg.m_missing) {

		rw_lock_s_unlock_gen(&dict_operation_lock, 0);

		/* We don't have a schema file, we will have to discover
		the index root pages from the .ibd file and skip the schema
		matching step. */

		ut_a(err == DB_FAIL);

		cfg.m_page_size.copy_from(univ_page_size);

		/* Without a .cfg file, only a table with a single index
		can be imported. */
		if (UT_LIST_GET_LEN(table->indexes) > 1) {
			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
				ER_INTERNAL_ERROR,
				"Drop all secondary indexes before importing "
				"table %s when .cfg file is missing.",
				table->name.m_name);
			err = DB_ERROR;
			return row_import_error(prebuilt, trx, err);
		}

		FetchIndexRootPages	fetchIndexRootPages(table, trx);

		err = fil_tablespace_iterate(
			table, IO_BUFFER_SIZE(cfg.m_page_size.physical()),
			fetchIndexRootPages);

		if (err == DB_SUCCESS) {

			err = fetchIndexRootPages.build_row_import(&cfg);

			/* Update index->page and SYS_INDEXES.PAGE_NO
			to match the B-tree root page numbers in the
			tablespace. */

			if (err == DB_SUCCESS) {
				err = cfg.set_root_by_heuristic();
			}
		}

		space_flags = fetchIndexRootPages.get_space_flags();

	} else {
		/* Reading the .cfg file failed for a reason other than
		the file being missing; err is handled below. */
		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
	}

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	}

	prebuilt->trx->op_info = "importing tablespace";

	ib::info() << "Phase I - Update all pages";

	/* Iterate over all the pages and do the sanity checking and
	the conversion required to import the tablespace. */

	PageConverter	converter(&cfg, table->space_id, trx);

	/* Set the IO buffer size in pages. */

	err = fil_tablespace_iterate(
		table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), converter);

	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
			err = DB_TOO_MANY_CONCURRENT_TRXS;);
#ifdef BTR_CUR_HASH_ADAPT
	/* On DISCARD TABLESPACE, we did not drop any adaptive hash
	index entries. If we replaced the discarded tablespace with a
	smaller one here, there could still be some adaptive hash
	index entries that point to cached garbage pages in the buffer
	pool, because PageConverter::operator() only evicted those
	pages that were replaced by the imported pages. We must
	detach any remaining adaptive hash index entries, because the
	adaptive hash index must be a subset of the table contents;
	false positives are not tolerated. */
	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
	     index = UT_LIST_GET_NEXT(indexes, index)) {
		/* The loop continues from whatever index object
		clone_if_needed() returned. */
		index = index->clone_if_needed();
	}
#endif /* BTR_CUR_HASH_ADAPT */

	if (err != DB_SUCCESS) {
		char	table_name[MAX_FULL_NAME_LEN + 1];

		innobase_format_name(
			table_name, sizeof(table_name),
			table->name.m_name);

		/* No message is sent here for DB_DECRYPTION_FAILED. */
		if (err != DB_DECRYPTION_FAILED) {

			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
				ER_INTERNAL_ERROR,
				"Cannot reset LSNs in table %s : %s",
				table_name, ut_strerr(err));
		}

		return(row_import_cleanup(prebuilt, trx, err));
	}

	row_mysql_lock_data_dictionary(trx);

	/* If the table is stored in a remote tablespace, we need to
	determine that filepath from the link file and system tables.
	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
	dict_get_and_save_data_dir_path(table, true);

	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
		ut_a(table->data_dir_path);

		filepath = fil_make_filepath(
			table->data_dir_path, table->name.m_name, IBD, true);
	} else {
		filepath = fil_make_filepath(
			NULL, table->name.m_name, IBD, false);
	}

	DBUG_EXECUTE_IF(
		"ib_import_OOM_15",
		ut_free(filepath);
		filepath = NULL;
	);

	if (filepath == NULL) {
		row_mysql_unlock_data_dictionary(trx);
		return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
	}

	/* Open the tablespace so that we can access via the buffer pool.
	We set the 2nd param (fix_dict = true) here because we already
	have an x-lock on dict_operation_lock and dict_sys->mutex.
	The tablespace is initially opened as a temporary one, because
	we will not be writing any redo log for it before we have invoked
	fil_space_t::set_imported() to declare it a persistent tablespace. */

	ulint	fsp_flags = dict_tf_to_fsp_flags(table->flags);

	table->space = fil_ibd_open(
		true, true, FIL_TYPE_IMPORT, table->space_id,
		fsp_flags, table->name, filepath, &err);

	ut_ad((table->space == NULL) == (err != DB_SUCCESS));
	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
			err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);

	if (!table->space) {
		row_mysql_unlock_data_dictionary(trx);

		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
			ER_GET_ERRMSG,
			err, ut_strerr(err), filepath);

		ut_free(filepath);

		return(row_import_cleanup(prebuilt, trx, err));
	}

	row_mysql_unlock_data_dictionary(trx);

	ut_free(filepath);

	err = ibuf_check_bitmap_on_import(trx, table->space);

	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);

	if (err != DB_SUCCESS) {
		return(row_import_cleanup(prebuilt, trx, err));
	}

	/* The first index must always be the clustered index. */

	dict_index_t*	index = dict_table_get_first_index(table);

	if (!dict_index_is_clust(index)) {
		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
	}

	/* Update the Btree segment headers for index node and
	leaf nodes in the root page. Set the new space id. */

	err = btr_root_adjust_on_import(index);

	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
			err = DB_CORRUPTION;);

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	} else if (cfg.requires_purge(index->name)) {

		/* Purge any delete-marked records that couldn't be
		purged during the page conversion phase from the
		cluster index. */

		IndexPurge	purge(trx, index);

		trx->op_info = "cluster: purging delete marked records";

		err = purge.garbage_collect();

		trx->op_info = "";
	}

	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	}

	/* For secondary indexes, purge any records that couldn't be purged
	during the page conversion phase. */

	err = row_import_adjust_root_pages_of_secondary_indexes(
		trx, table, cfg);

	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
			err = DB_CORRUPTION;);

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	}

	/* Ensure that the next available DB_ROW_ID is not smaller than
	any DB_ROW_ID stored in the table. */

	if (prebuilt->clust_index_was_generated) {

		err = row_import_set_sys_max_row_id(prebuilt, table);

		if (err != DB_SUCCESS) {
			return(row_import_error(prebuilt, trx, err));
		}
	}

	/* Note: this function logs no "Phase II" message. */
	ib::info() << "Phase III - Flush changes to disk";

	/* Ensure that all pages dirtied during the IMPORT make it to disk.
	The only dirty pages generated should be from the pessimistic purge
	of delete marked records that couldn't be purged in Phase I. */

	{
		FlushObserver observer(prebuilt->table->space, trx, NULL);
		buf_LRU_flush_or_remove_pages(prebuilt->table->space_id,
					      &observer);

		if (observer.is_interrupted()) {
			ib::info() << "Phase III - Flush interrupted";
			return(row_import_error(prebuilt, trx,
						DB_INTERRUPTED));
		}
	}

	ib::info() << "Phase IV - Flush complete";
	prebuilt->table->space->set_imported();

	/* The dictionary latches will be released in row_import_cleanup()
	after the transaction commit, for both success and error. */

	row_mysql_lock_data_dictionary(trx);

	/* Update the root pages of the table's indexes. */
	err = row_import_update_index_root(trx, table, false);

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	}

	/* Clear DICT_TF2_DISCARDED in SYS_TABLES.MIX_LEN. */
	err = row_import_update_discarded_flag(trx, table->id, false);

	if (err != DB_SUCCESS) {
		return(row_import_error(prebuilt, trx, err));
	}

	/* Reflect the same change in the in-memory table object. */
	table->file_unreadable = false;
	table->flags2 &= ~DICT_TF2_DISCARDED;

	/* Set autoinc value read from .cfg file, if one was specified.
	Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
	if (autoinc) {
		ib::info() << table->name << " autoinc value set to "
			<< autoinc;

		/* table->autoinc receives the value from the .cfg file;
		the value passed to btr_write_autoinc() is one less. */
		table->autoinc = autoinc--;
		btr_write_autoinc(dict_table_get_first_index(table), autoinc);
	}

	return(row_import_cleanup(prebuilt, trx, err));
}
4372