1 /*****************************************************************************
2 
3 Copyright (c) 2012, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0import.cc
29 Import a tablespace to a running instance.
30 
31 Created 2012-02-08 by Sunny Bains.
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "row0import.h"
37 
38 #ifdef UNIV_NONINL
39 #include "row0import.ic"
40 #endif
41 
42 #include "btr0pcur.h"
43 #include "que0que.h"
44 #include "dict0boot.h"
45 #include "ibuf0ibuf.h"
46 #include "pars0pars.h"
47 #include "row0upd.h"
48 #include "row0sel.h"
49 #include "row0mysql.h"
50 #include "srv0start.h"
51 #include "row0quiesce.h"
52 #include "ut0new.h"
53 
54 #include <vector>
55 
56 #include <my_aes.h>
57 
/** The size of the buffer to use for IO, expressed as a number of pages.
Note: os_file_read() doesn't expect reads to fail. If you set the buffer
size to be greater than a multiple of the file size then it will assert.
TODO: Fix this limitation of the IO functions.
@param m total size of the IO buffer, in the same unit as n
(presumably bytes -- confirm at the call sites)
@param n page size of the tablespace.
@retval number of pages, i.e. m divided by n */
#define IO_BUFFER_SIZE(m, n)	((m) / (n))
64 
/** Per-index record counters gathered during phase I of the import. */
struct row_stats_t {
	ulint		m_n_deleted;		/*!< Number of deleted records
						found in the index */

	ulint		m_n_purged;		/*!< Number of records purged
						optimistically */

	ulint		m_n_rows;		/*!< Number of rows */

	ulint		m_n_purge_failed;	/*!< Number of deleted rows
						that could not be purged */
};
78 
/** Index information required by IMPORT. One instance describes a single
index of the table being imported; instances are stored in
row_import::m_indexes. */
struct row_index_t {
	index_id_t	m_id;			/*!< Index id of the table
						in the exporting server */
	byte*		m_name;			/*!< Index name, compared with
						strcmp() and released by the
						row_import destructor */

	ulint		m_space;		/*!< Space where it is placed */

	ulint		m_page_no;		/*!< Root page number */

	ulint		m_type;			/*!< Index type */

	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
						indexes, offset of transaction
						id system column */

	ulint		m_n_user_defined_cols;	/*!< User defined columns */

	ulint		m_n_uniq;		/*!< Number of columns that can
						uniquely identify the row */

	ulint		m_n_nullable;		/*!< Number of nullable
						columns */

	ulint		m_n_fields;		/*!< Total number of fields */

	dict_field_t*	m_fields;		/*!< Index fields, m_n_fields
						entries; released by the
						row_import destructor */

	const dict_index_t*
			m_srv_index;		/*!< Index instance in the
						importing server */

	row_stats_t	m_stats;		/*!< Statistics gathered during
						the import phase */

};
115 
/** Meta data required by IMPORT. Holds the table, column and index
description of the tablespace being imported and owns the heap arrays
hanging off it (they are released in the destructor). */
struct row_import {
	/** Create an empty configuration: all pointers NULL, all counters
	zero, and both "missing" flags set to true. */
	row_import() UNIV_NOTHROW
		:
		m_table(NULL),
		m_version(0),
		m_hostname(NULL),
		m_table_name(NULL),
		m_autoinc(0),
		m_page_size(0, 0, false),
		m_flags(0),
		m_n_cols(0),
		m_cols(NULL),
		m_col_names(NULL),
		m_n_indexes(0),
		m_indexes(NULL),
		m_missing(true),
		m_cfp_missing(true)	{ }

	/** Release the column, index and name arrays owned by this
	instance. */
	~row_import() UNIV_NOTHROW;

	/** Find the index entry in the indexes array.
	@param name index name
	@return instance if found else 0. */
	row_index_t* get_index(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows in the index.
	@param name index name
	@return number of rows (doesn't include delete marked rows). */
	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;

	/** Find the ordinal value of the column name in the cfg table columns.
	@param name of column to look for.
	@return ULINT_UNDEFINED if not found. */
	ulint find_col(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows for which purge failed during the
	convert phase.
	@param name index name
	@return number of rows for which purge failed. */
	ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;

	/** Check whether the index still needs a purge pass, i.e. whether
	get_n_purge_failed() reports at least one row for it.
	@param name index name
	@return true if index needs to be purged. */
	bool requires_purge(const char* name) const UNIV_NOTHROW
	{
		return(get_n_purge_failed(name) > 0);
	}

	/** Set the index root <space, pageno> using the index name */
	void set_root_by_name() UNIV_NOTHROW;

	/** Set the index root <space, pageno> using a heuristic
	@return DB_SUCCESS or error code */
	dberr_t set_root_by_heuristic() UNIV_NOTHROW;

	/** Check if the index schema that was read from the .cfg file
	matches the in memory index definition.
	Note: It will update row_index_t::m_srv_index to map the meta-data
	read from the .cfg file to the server index instance.
	@param thd MySQL session variable
	@param index index definition in the server to compare against
	@return DB_SUCCESS or error code. */
	dberr_t match_index_columns(
		THD*			thd,
		const dict_index_t*	index) UNIV_NOTHROW;

	/** Check if the table schema that was read from the .cfg file
	matches the in memory table definition.
	@param thd MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_table_columns(
		THD*			thd) UNIV_NOTHROW;

	/** Check if the table (and index) schema that was read from the
	.cfg file matches the in memory table definition.
	@param thd MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_schema(
		THD*			thd) UNIV_NOTHROW;

	dict_table_t*	m_table;		/*!< Table instance */

	ulint		m_version;		/*!< Version of config file */

	byte*		m_hostname;		/*!< Hostname where the
						tablespace was exported */
	byte*		m_table_name;		/*!< Exporting instance table
						name */

	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */

	page_size_t	m_page_size;		/*!< Tablespace page size */

	ulint		m_flags;		/*!< Table flags */

	ulint		m_n_cols;		/*!< Number of columns in the
						meta-data file */

	dict_col_t*	m_cols;			/*!< Column data */

	byte**		m_col_names;		/*!< Column names, we store the
						column names separately because
						there is no field to store the
						value in dict_col_t */

	ulint		m_n_indexes;		/*!< Number of indexes,
						including clustered index */

	row_index_t*	m_indexes;		/*!< Index meta data */

	bool		m_missing;		/*!< State of the .cfg file.
						NOTE(review): the constructor
						sets this to true before any
						file is read, so true
						presumably means the .cfg file
						was NOT found or not readable;
						confirm where it is cleared */

	bool		m_cfp_missing;		/*!< State of the .cfp file.
						NOTE(review): initialised to
						true as well, so true
						presumably means the .cfp file
						was NOT found or not readable;
						confirm where it is cleared */
};
232 
233 /** Use the page cursor to iterate over records in a block. */
234 class RecIterator {
235 public:
236 	/** Default constructor */
RecIterator()237 	RecIterator() UNIV_NOTHROW
238 	{
239 		memset(&m_cur, 0x0, sizeof(m_cur));
240 	}
241 
242 	/** Position the cursor on the first user record. */
open(buf_block_t * block)243 	void	open(buf_block_t* block) UNIV_NOTHROW
244 	{
245 		page_cur_set_before_first(block, &m_cur);
246 
247 		if (!end()) {
248 			next();
249 		}
250 	}
251 
252 	/** Move to the next record. */
next()253 	void	next() UNIV_NOTHROW
254 	{
255 		page_cur_move_to_next(&m_cur);
256 	}
257 
258 	/**
259 	@return the current record */
current()260 	rec_t*	current() UNIV_NOTHROW
261 	{
262 		ut_ad(!end());
263 		return(page_cur_get_rec(&m_cur));
264 	}
265 
266 	/**
267 	@return true if cursor is at the end */
end()268 	bool	end() UNIV_NOTHROW
269 	{
270 		return(page_cur_is_after_last(&m_cur) == TRUE);
271 	}
272 
273 	/** Remove the current record
274 	@return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,ulint * offsets)275 	bool remove(
276 		const dict_index_t*	index,
277 		page_zip_des_t*		page_zip,
278 		ulint*			offsets) UNIV_NOTHROW
279 	{
280 		/* We can't end up with an empty page unless it is root. */
281 		if (page_get_n_recs(m_cur.block->frame) <= 1) {
282 			return(false);
283 		}
284 
285 		return(page_delete_rec(index, &m_cur, page_zip, offsets));
286 	}
287 
288 private:
289 	page_cur_t	m_cur;
290 };
291 
/** Class that purges delete marked records from indexes, both secondary
and cluster. It does a pessimistic delete. This should only be done if we
couldn't purge the delete marked records during Phase I. */
class IndexPurge {
public:
	/** Constructor. Logs an informational "Phase II" message naming
	the index.
	@param trx the user transaction covering the import tablespace
	@param index to be imported */
	IndexPurge(
		trx_t*		trx,
		dict_index_t*	index) UNIV_NOTHROW
		:
		m_trx(trx),
		m_index(index),
		m_n_rows(0)
	{
		ib::info() << "Phase II - Purge records from index "
			<< index->name;
	}

	/** Destructor */
	~IndexPurge() UNIV_NOTHROW { }

	/** Purge delete marked records.
	@return DB_SUCCESS or error code. */
	dberr_t	garbage_collect() UNIV_NOTHROW;

	/** The number of records that are not delete marked.
	@return total records in the index after purge */
	ulint	get_n_rows() const UNIV_NOTHROW
	{
		return(m_n_rows);
	}

private:
	/** Begin import, position the cursor on the first record. */
	void	open() UNIV_NOTHROW;

	/** Close the persistent cursor and commit the mini-transaction. */
	void	close() UNIV_NOTHROW;

	/** Position the cursor on the next record.
	@return DB_SUCCESS or error code */
	dberr_t	next() UNIV_NOTHROW;

	/** Store the persistent cursor position and reopen the
	B-tree cursor in BTR_MODIFY_TREE mode, because the
	tree structure may be changed during a pessimistic delete. */
	void	purge_pessimistic_delete() UNIV_NOTHROW;

	/** Purge delete-marked records. Takes no arguments; it works on
	the record the persistent cursor is positioned on (presumably --
	confirm in the definition). */
	void	purge() UNIV_NOTHROW;

protected:
	// Disable copying (and default construction): these are only
	// declared, no definition is visible here.
	IndexPurge();
	IndexPurge(const IndexPurge&);
	IndexPurge &operator=(const IndexPurge&);

private:
	trx_t*			m_trx;		/*!< User transaction */
	mtr_t			m_mtr;		/*!< Mini-transaction */
	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
	dict_index_t*		m_index;	/*!< Index to be processed */
	ulint			m_n_rows;	/*!< Records in index */
};
360 
361 /** Functor that is called for each physical page that is read from the
362 tablespace file.  */
363 class AbstractCallback : public PageCallback {
364 public:
365 	/** Constructor
366 	@param trx covering transaction */
AbstractCallback(trx_t * trx)367 	AbstractCallback(trx_t* trx)
368 		:
369 		m_trx(trx),
370 		m_space(ULINT_UNDEFINED),
371 		m_xdes(),
372 		m_xdes_page_no(ULINT_UNDEFINED),
373 		m_space_flags(ULINT_UNDEFINED),
374 		m_table_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
375 
376 	/** Free any extent descriptor instance */
~AbstractCallback()377 	virtual ~AbstractCallback()
378 	{
379 		UT_DELETE_ARRAY(m_xdes);
380 	}
381 
382 	/** Determine the page size to use for traversing the tablespace
383 	@param file_size size of the tablespace file in bytes
384 	@param block contents of the first page in the tablespace file.
385 	@retval DB_SUCCESS or error code. */
386 	virtual dberr_t init(
387 		os_offset_t		file_size,
388 		const buf_block_t*	block) UNIV_NOTHROW;
389 
390 	/** @return true if compressed table. */
is_compressed_table() const391 	bool is_compressed_table() const UNIV_NOTHROW
392 	{
393 		return(get_page_size().is_compressed());
394 	}
395 
396 protected:
397 	/** Get the data page depending on the table type, compressed or not.
398 	@param block block read from disk
399 	@retval the buffer frame */
get_frame(buf_block_t * block) const400 	buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
401 	{
402 		if (is_compressed_table()) {
403 			return(block->page.zip.data);
404 		}
405 
406 		return(buf_block_get_frame(block));
407 	}
408 
409 	/** Check for session interrupt. If required we could
410 	even flush to disk here every N pages.
411 	@retval DB_SUCCESS or error code */
periodic_check()412 	dberr_t periodic_check() UNIV_NOTHROW
413 	{
414 		if (trx_is_interrupted(m_trx)) {
415 			return(DB_INTERRUPTED);
416 		}
417 
418 		return(DB_SUCCESS);
419 	}
420 
421 	/** Get the physical offset of the extent descriptor within the page.
422 	@param page_no page number of the extent descriptor
423 	@param page contents of the page containing the extent descriptor.
424 	@return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const425 	const xdes_t* xdes(
426 		ulint		page_no,
427 		const page_t*	page) const UNIV_NOTHROW
428 	{
429 		ulint	offset;
430 
431 		offset = xdes_calc_descriptor_index(get_page_size(), page_no);
432 
433 		return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
434 	}
435 
436 	/** Set the current page directory (xdes). If the extent descriptor is
437 	marked as free then free the current extent descriptor and set it to
438 	0. This implies that all pages that are covered by this extent
439 	descriptor are also freed.
440 
441 	@param page_no offset of page within the file
442 	@param page page contents
443 	@return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)444 	dberr_t	set_current_xdes(
445 		ulint		page_no,
446 		const page_t*	page) UNIV_NOTHROW
447 	{
448 		m_xdes_page_no = page_no;
449 
450 		UT_DELETE_ARRAY(m_xdes);
451 		m_xdes = NULL;
452 
453 		ulint		state;
454 		const xdes_t*	xdesc = page + XDES_ARR_OFFSET;
455 
456 		state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
457 
458 		if (state != XDES_FREE) {
459 
460 			m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t,
461 						    m_page_size.physical());
462 
463 			/* Trigger OOM */
464 			DBUG_EXECUTE_IF(
465 				"ib_import_OOM_13",
466 				UT_DELETE_ARRAY(m_xdes);
467 				m_xdes = NULL;
468 			);
469 
470 			if (m_xdes == NULL) {
471 				return(DB_OUT_OF_MEMORY);
472 			}
473 
474 			memcpy(m_xdes, page, m_page_size.physical());
475 		}
476 
477 		return(DB_SUCCESS);
478 	}
479 
480 	/**
481 	@return true if it is a root page */
is_root_page(const page_t * page) const482 	bool is_root_page(const page_t* page) const UNIV_NOTHROW
483 	{
484 		ut_ad(fil_page_index_page_check(page));
485 
486 		return(mach_read_from_4(page + FIL_PAGE_NEXT) == FIL_NULL
487 		       && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL);
488 	}
489 
490 	/** Check if the page is marked as free in the extent descriptor.
491 	@param page_no page number to check in the extent descriptor.
492 	@return true if the page is marked as free */
is_free(ulint page_no) const493 	bool is_free(ulint page_no) const UNIV_NOTHROW
494 	{
495 		ut_a(xdes_calc_descriptor_page(get_page_size(), page_no)
496 		     == m_xdes_page_no);
497 
498 		if (m_xdes != 0) {
499 			const xdes_t*	xdesc = xdes(page_no, m_xdes);
500 			ulint		pos = page_no % FSP_EXTENT_SIZE;
501 
502 			return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
503 		}
504 
505 		/* If the current xdes was free, the page must be free. */
506 		return(true);
507 	}
508 
509 protected:
510 	/** Covering transaction. */
511 	trx_t*			m_trx;
512 
513 	/** Space id of the file being iterated over. */
514 	ulint			m_space;
515 
516 	/** Minimum page number for which the free list has not been
517 	initialized: the pages >= this limit are, by definition, free;
518 	note that in a single-table tablespace where size < 64 pages,
519 	this number is 64, i.e., we have initialized the space about
520 	the first extent, but have not physically allocted those pages
521 	to the file. @see FSP_LIMIT. */
522 	ulint			m_free_limit;
523 
524 	/** Current size of the space in pages */
525 	ulint			m_size;
526 
527 	/** Current extent descriptor page */
528 	xdes_t*			m_xdes;
529 
530 	/** Physical page offset in the file of the extent descriptor */
531 	ulint			m_xdes_page_no;
532 
533 	/** Flags value read from the header page */
534 	ulint			m_space_flags;
535 
536 	/** Derived from m_space_flags and row format type, the row format
537 	type is determined from the page header. */
538 	ulint			m_table_flags;
539 };
540 
541 /** Determine the page size to use for traversing the tablespace
542 @param file_size size of the tablespace file in bytes
543 @param block contents of the first page in the tablespace file.
544 @retval DB_SUCCESS or error code. */
545 dberr_t
init(os_offset_t file_size,const buf_block_t * block)546 AbstractCallback::init(
547 	os_offset_t		file_size,
548 	const buf_block_t*	block) UNIV_NOTHROW
549 {
550 	const page_t*		page = block->frame;
551 
552 	m_space_flags = fsp_header_get_flags(page);
553 
554 	/* Since we don't know whether it is a compressed table
555 	or not, the data is always read into the block->frame. */
556 
557 	set_page_size(block->frame);
558 
559 	/* Set the page size used to traverse the tablespace. */
560 
561 	if (!is_compressed_table() && !m_page_size.equals_to(univ_page_size)) {
562 
563 		ib::error() << "Page size " << m_page_size.physical()
564 			<< " of ibd file is not the same as the server page"
565 			" size " << univ_page_size.physical();
566 
567 		return(DB_CORRUPTION);
568 
569 	} else if (file_size % m_page_size.physical() != 0) {
570 
571 		ib::error() << "File size " << file_size << " is not a"
572 			" multiple of the page size "
573 			<< m_page_size.physical();
574 
575 		return(DB_CORRUPTION);
576 	}
577 
578 	ut_a(m_space == ULINT_UNDEFINED);
579 
580 	m_size  = mach_read_from_4(page + FSP_SIZE);
581 	m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
582 	m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
583 
584 	dberr_t	err = set_current_xdes(0, page);
585 
586 	return(err);
587 }
588 
589 /**
590 Try and determine the index root pages by checking if the next/prev
591 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
592 struct FetchIndexRootPages : public AbstractCallback {
593 
594 	/** Index information gathered from the .ibd file. */
595 	struct Index {
596 
IndexFetchIndexRootPages::Index597 		Index(index_id_t id, ulint page_no)
598 			:
599 			m_id(id),
600 			m_page_no(page_no) { }
601 
602 		index_id_t	m_id;		/*!< Index id */
603 		ulint		m_page_no;	/*!< Root page number */
604 	};
605 
606 	typedef std::vector<Index, ut_allocator<Index> >	Indexes;
607 
608 	/** Constructor
609 	@param trx covering (user) transaction
610 	@param table table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages611 	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
612 		:
613 		AbstractCallback(trx),
614 		m_table(table) UNIV_NOTHROW { }
615 
616 	/** Destructor */
~FetchIndexRootPagesFetchIndexRootPages617 	virtual ~FetchIndexRootPages() UNIV_NOTHROW { }
618 
619 	/**
620 	@retval the space id of the tablespace being iterated over */
get_space_idFetchIndexRootPages621 	virtual ulint get_space_id() const UNIV_NOTHROW
622 	{
623 		return(m_space);
624 	}
625 
626 	/**
627 	@retval the space flags of the tablespace being iterated over */
get_space_flagsFetchIndexRootPages628 	virtual ulint get_space_flags() const UNIV_NOTHROW
629 	{
630 		return(m_space_flags);
631 	}
632 
633 	/** Check if the .ibd file row format is the same as the table's.
634 	@param ibd_table_flags determined from space and page.
635 	@return DB_SUCCESS or error code. */
check_row_formatFetchIndexRootPages636 	dberr_t check_row_format(ulint ibd_table_flags) UNIV_NOTHROW
637 	{
638 		dberr_t		err;
639 		rec_format_t	ibd_rec_format;
640 		rec_format_t	table_rec_format;
641 
642 		if (!dict_tf_is_valid(ibd_table_flags)) {
643 
644 			ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
645 				ER_TABLE_SCHEMA_MISMATCH,
646 				".ibd file has invalid table flags: %lx",
647 				ibd_table_flags);
648 
649 			return(DB_CORRUPTION);
650 		}
651 
652 		ibd_rec_format = dict_tf_get_rec_format(ibd_table_flags);
653 		table_rec_format = dict_tf_get_rec_format(m_table->flags);
654 
655 		if (table_rec_format != ibd_rec_format) {
656 
657 			ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
658 				ER_TABLE_SCHEMA_MISMATCH,
659 				"Table has %s row format, .ibd"
660 				" file has %s row format.",
661 				dict_tf_to_row_format_string(m_table->flags),
662 				dict_tf_to_row_format_string(ibd_table_flags));
663 
664 			err = DB_CORRUPTION;
665 		} else {
666 			err = DB_SUCCESS;
667 		}
668 
669 		return(err);
670 	}
671 
672 	/** Called for each block as it is read from the file.
673 	@param offset physical offset in the file
674 	@param block block to convert, it is not from the buffer pool.
675 	@retval DB_SUCCESS or error code. */
676 	virtual dberr_t operator() (
677 		os_offset_t	offset,
678 		buf_block_t*	block) UNIV_NOTHROW;
679 
680 	/** Update the import configuration that will be used to import
681 	the tablespace. */
682 	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
683 
684 	/** Table definition in server. */
685 	const dict_table_t*	m_table;
686 
687 	/** Index information */
688 	Indexes			m_indexes;
689 };
690 
691 /** Called for each block as it is read from the file. Check index pages to
692 determine the exact row format. We can't get that from the tablespace
693 header flags alone.
694 
695 @param offset physical offset in the file
696 @param block block to convert, it is not from the buffer pool.
697 @retval DB_SUCCESS or error code. */
698 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)699 FetchIndexRootPages::operator() (
700 	os_offset_t	offset,
701 	buf_block_t*	block) UNIV_NOTHROW
702 {
703 	dberr_t		err;
704 
705 	if ((err = periodic_check()) != DB_SUCCESS) {
706 		return(err);
707 	}
708 
709 	const page_t*	page = get_frame(block);
710 
711 	ulint	page_type = fil_page_get_type(page);
712 
713 	if (block->page.id.page_no() * m_page_size.physical() != offset) {
714 
715 		ib::error() << "Page offset doesn't match file offset:"
716 			" page offset: " << block->page.id.page_no()
717 			<< ", file offset: "
718 			<< (offset / m_page_size.physical());
719 
720 		err = DB_CORRUPTION;
721 	} else if (page_type == FIL_PAGE_TYPE_XDES) {
722 		err = set_current_xdes(block->page.id.page_no(), page);
723 	} else if (fil_page_index_page_check(page)
724 		   && !is_free(block->page.id.page_no())
725 		   && is_root_page(page)) {
726 
727 		index_id_t	id = btr_page_get_index_id(page);
728 
729 		m_indexes.push_back(Index(id, block->page.id.page_no()));
730 
731 		if (m_indexes.size() == 1) {
732 
733 			m_table_flags = fsp_flags_to_dict_tf(
734 				m_space_flags,
735 				page_is_comp(page) ? true : false);
736 
737 			err = check_row_format(m_table_flags);
738 		}
739 	}
740 
741 	return(err);
742 }
743 
744 /**
745 Update the import configuration that will be used to import the tablespace.
746 @return error code or DB_SUCCESS */
747 dberr_t
build_row_import(row_import * cfg) const748 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
749 {
750 	Indexes::const_iterator end = m_indexes.end();
751 
752 	ut_a(cfg->m_table == m_table);
753 	cfg->m_page_size.copy_from(m_page_size);
754 	cfg->m_n_indexes = m_indexes.size();
755 
756 	if (cfg->m_n_indexes == 0) {
757 
758 		ib::error() << "No B+Tree found in tablespace";
759 
760 		return(DB_CORRUPTION);
761 	}
762 
763 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
764 
765 	/* Trigger OOM */
766 	DBUG_EXECUTE_IF(
767 		"ib_import_OOM_11",
768 		UT_DELETE_ARRAY(cfg->m_indexes);
769 		cfg->m_indexes = NULL;
770 	);
771 
772 	if (cfg->m_indexes == NULL) {
773 		return(DB_OUT_OF_MEMORY);
774 	}
775 
776 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
777 
778 	row_index_t*	cfg_index = cfg->m_indexes;
779 
780 	for (Indexes::const_iterator it = m_indexes.begin();
781 	     it != end;
782 	     ++it, ++cfg_index) {
783 
784 		char	name[BUFSIZ];
785 
786 		ut_snprintf(name, sizeof(name), "index" IB_ID_FMT, it->m_id);
787 
788 		ulint	len = strlen(name) + 1;
789 
790 		cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
791 
792 		/* Trigger OOM */
793 		DBUG_EXECUTE_IF(
794 			"ib_import_OOM_12",
795 			UT_DELETE_ARRAY(cfg_index->m_name);
796 			cfg_index->m_name = NULL;
797 		);
798 
799 		if (cfg_index->m_name == NULL) {
800 			return(DB_OUT_OF_MEMORY);
801 		}
802 
803 		memcpy(cfg_index->m_name, name, len);
804 
805 		cfg_index->m_id = it->m_id;
806 
807 		cfg_index->m_space = m_space;
808 
809 		cfg_index->m_page_no = it->m_page_no;
810 	}
811 
812 	return(DB_SUCCESS);
813 }
814 
815 /* Functor that is called for each physical page that is read from the
816 tablespace file.
817 
818   1. Check each page for corruption.
819 
820   2. Update the space id and LSN on every page
821      * For the header page
822        - Validate the flags
823        - Update the LSN
824 
825   3. On Btree pages
826      * Set the index id
827      * Update the max trx id
828      * In a cluster index, update the system columns
829      * In a cluster index, update the BLOB ptr, set the space id
830      * Purge delete marked records, but only if they can be easily
831        removed from the page
832      * Keep a counter of number of rows, ie. non-delete-marked rows
833      * Keep a counter of number of delete marked rows
834      * Keep a counter of number of purge failure
835      * If a page is stamped with an index id that isn't in the .cfg file
836        we assume it is deleted and the page can be ignored.
837 
838    4. Set the page state to dirty so that it will be written to disk.
839 */
840 class PageConverter : public AbstractCallback {
841 public:
842 	/** Constructor
843 	@param cfg config of table being imported.
844 	@param trx transaction covering the import */
845 	PageConverter(row_import* cfg, trx_t* trx) UNIV_NOTHROW;
846 
~PageConverter()847 	virtual ~PageConverter() UNIV_NOTHROW
848 	{
849 		if (m_heap != 0) {
850 			mem_heap_free(m_heap);
851 		}
852 	}
853 
854 	/**
855 	@retval the server space id of the tablespace being iterated over */
get_space_id() const856 	virtual ulint get_space_id() const UNIV_NOTHROW
857 	{
858 		return(m_cfg->m_table->space);
859 	}
860 
861 	/**
862 	@retval the space flags of the tablespace being iterated over */
get_space_flags() const863 	virtual ulint get_space_flags() const UNIV_NOTHROW
864 	{
865 		return(m_space_flags);
866 	}
867 
868 	/** Called for each block as it is read from the file.
869 	@param offset physical offset in the file
870 	@param block block to convert, it is not from the buffer pool.
871 	@retval DB_SUCCESS or error code. */
872 	virtual dberr_t operator() (
873 		os_offset_t	offset,
874 		buf_block_t*	block) UNIV_NOTHROW;
875 private:
876 
877 	/** Status returned by PageConverter::validate() */
878 	enum import_page_status_t {
879 		IMPORT_PAGE_STATUS_OK,		/*!< Page is OK */
880 		IMPORT_PAGE_STATUS_ALL_ZERO,	/*!< Page is all zeros */
881 		IMPORT_PAGE_STATUS_CORRUPTED	/*!< Page is corrupted */
882 	};
883 
884 	/** Update the page, set the space id, max trx id and index id.
885 	@param block block read from file
886 	@param page_type type of the page
887 	@retval DB_SUCCESS or error code */
888 	dberr_t update_page(
889 		buf_block_t*	block,
890 		ulint&		page_type) UNIV_NOTHROW;
891 
892 #ifdef UNIV_DEBUG
893 	/**
894 	@return true error condition is enabled. */
trigger_corruption()895 	bool trigger_corruption() UNIV_NOTHROW
896 	{
897 		return(false);
898 	}
899 	#else
900 #define trigger_corruption()	(false)
901 #endif /* UNIV_DEBUG */
902 
903 	/** Update the space, index id, trx id.
904 	@param block block to convert
905 	@return DB_SUCCESS or error code */
906 	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;
907 
908 	/** Update the BLOB refrences and write UNDO log entries for
909 	rows that can't be purged optimistically.
910 	@param block block to update
911 	@retval DB_SUCCESS or error code */
912 	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;
913 
914 	/** Validate the page, check for corruption.
915 	@param offset physical offset within file.
916 	@param page page read from file.
917 	@return 0 on success, 1 if all zero, 2 if corrupted */
918 	import_page_status_t validate(
919 		os_offset_t	offset,
920 		buf_block_t*	page) UNIV_NOTHROW;
921 
922 	/** Validate the space flags and update tablespace header page.
923 	@param block block read from file, not from the buffer pool.
924 	@retval DB_SUCCESS or error code */
925 	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;
926 
927 	/** Adjust the BLOB reference for a single column that is externally stored
928 	@param rec record to update
929 	@param offsets column offsets for the record
930 	@param i column ordinal value
931 	@return DB_SUCCESS or error code */
932 	dberr_t	adjust_cluster_index_blob_column(
933 		rec_t*		rec,
934 		const ulint*	offsets,
935 		ulint		i) UNIV_NOTHROW;
936 
937 	/** Adjusts the BLOB reference in the clustered index row for all
938 	externally stored columns.
939 	@param rec record to update
940 	@param offsets column offsets for the record
941 	@return DB_SUCCESS or error code */
942 	dberr_t	adjust_cluster_index_blob_columns(
943 		rec_t*		rec,
944 		const ulint*	offsets) UNIV_NOTHROW;
945 
946 	/** In the clustered index, adjist the BLOB pointers as needed.
947 	Also update the BLOB reference, write the new space id.
948 	@param rec record to update
949 	@param offsets column offsets for the record
950 	@return DB_SUCCESS or error code */
951 	dberr_t	adjust_cluster_index_blob_ref(
952 		rec_t*		rec,
953 		const ulint*	offsets) UNIV_NOTHROW;
954 
955 	/** Purge delete-marked records, only if it is possible to do
956 	so without re-organising the B+tree.
957 	@param offsets current row offsets.
958 	@retval true if purged */
959 	bool	purge(const ulint* offsets) UNIV_NOTHROW;
960 
961 	/** Adjust the BLOB references and sys fields for the current record.
962 	@param index the index being converted
963 	@param rec record to update
964 	@param offsets column offsets for the record
965 	@param deleted true if row is delete marked
966 	@return DB_SUCCESS or error code. */
967 	dberr_t	adjust_cluster_record(
968 		const dict_index_t*	index,
969 		rec_t*			rec,
970 		const ulint*		offsets,
971 		bool			deleted) UNIV_NOTHROW;
972 
973 	/** Find an index with the matching id.
974 	@return row_index_t* instance or 0 */
find_index(index_id_t id)975 	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
976 	{
977 		row_index_t*	index = &m_cfg->m_indexes[0];
978 
979 		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
980 			if (id == index->m_id) {
981 				return(index);
982 			}
983 		}
984 
985 		return(0);
986 
987 	}
988 private:
989 	/** Config for table that is being imported. */
990 	row_import*		m_cfg;
991 
992 	/** Current index whose pages are being imported */
993 	row_index_t*		m_index;
994 
995 	/** Current system LSN */
996 	lsn_t			m_current_lsn;
997 
998 	/** Alias for m_page_zip, only set for compressed pages. */
999 	page_zip_des_t*		m_page_zip_ptr;
1000 
1001 	/** Iterator over records in a block */
1002 	RecIterator		m_rec_iter;
1003 
1004 	/** Record offset */
1005 	ulint			m_offsets_[REC_OFFS_NORMAL_SIZE];
1006 
1007 	/** Pointer to m_offsets_ */
1008 	ulint*			m_offsets;
1009 
1010 	/** Memory heap for the record offsets */
1011 	mem_heap_t*		m_heap;
1012 
1013 	/** Cluster index instance */
1014 	dict_index_t*		m_cluster_index;
1015 };
1016 
1017 /**
1018 row_import destructor. */
~row_import()1019 row_import::~row_import() UNIV_NOTHROW
1020 {
1021 	for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
1022 		UT_DELETE_ARRAY(m_indexes[i].m_name);
1023 
1024 		if (m_indexes[i].m_fields == NULL) {
1025 			continue;
1026 		}
1027 
1028 		dict_field_t*	fields = m_indexes[i].m_fields;
1029 		ulint		n_fields = m_indexes[i].m_n_fields;
1030 
1031 		for (ulint j = 0; j < n_fields; ++j) {
1032 			UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
1033 		}
1034 
1035 		UT_DELETE_ARRAY(fields);
1036 	}
1037 
1038 	for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
1039 		UT_DELETE_ARRAY(m_col_names[i]);
1040 	}
1041 
1042 	UT_DELETE_ARRAY(m_cols);
1043 	UT_DELETE_ARRAY(m_indexes);
1044 	UT_DELETE_ARRAY(m_col_names);
1045 	UT_DELETE_ARRAY(m_table_name);
1046 	UT_DELETE_ARRAY(m_hostname);
1047 }
1048 
1049 /** Find the index entry in in the indexes array.
1050 @param name index name
1051 @return instance if found else 0. */
1052 row_index_t*
get_index(const char * name) const1053 row_import::get_index(
1054 	const char*	name) const UNIV_NOTHROW
1055 {
1056 	for (ulint i = 0; i < m_n_indexes; ++i) {
1057 		const char*	index_name;
1058 		row_index_t*	index = &m_indexes[i];
1059 
1060 		index_name = reinterpret_cast<const char*>(index->m_name);
1061 
1062 		if (strcmp(index_name, name) == 0) {
1063 
1064 			return(index);
1065 		}
1066 	}
1067 
1068 	return(0);
1069 }
1070 
1071 /** Get the number of rows in the index.
1072 @param name index name
1073 @return number of rows (doesn't include delete marked rows). */
1074 ulint
get_n_rows(const char * name) const1075 row_import::get_n_rows(
1076 	const char*	name) const UNIV_NOTHROW
1077 {
1078 	const row_index_t*	index = get_index(name);
1079 
1080 	ut_a(name != 0);
1081 
1082 	return(index->m_stats.m_n_rows);
1083 }
1084 
1085 /** Get the number of rows for which purge failed uding the convert phase.
1086 @param name index name
1087 @return number of rows for which purge failed. */
1088 ulint
get_n_purge_failed(const char * name) const1089 row_import::get_n_purge_failed(
1090 	const char*	name) const UNIV_NOTHROW
1091 {
1092 	const row_index_t*	index = get_index(name);
1093 
1094 	ut_a(name != 0);
1095 
1096 	return(index->m_stats.m_n_purge_failed);
1097 }
1098 
1099 /** Find the ordinal value of the column name in the cfg table columns.
1100 @param name of column to look for.
1101 @return ULINT_UNDEFINED if not found. */
1102 ulint
find_col(const char * name) const1103 row_import::find_col(
1104 	const char*	name) const UNIV_NOTHROW
1105 {
1106 	for (ulint i = 0; i < m_n_cols; ++i) {
1107 		const char*	col_name;
1108 
1109 		col_name = reinterpret_cast<const char*>(m_col_names[i]);
1110 
1111 		if (strcmp(col_name, name) == 0) {
1112 			return(i);
1113 		}
1114 	}
1115 
1116 	return(ULINT_UNDEFINED);
1117 }
1118 
1119 /**
1120 Check if the index schema that was read from the .cfg file matches the
1121 in memory index definition.
1122 @return DB_SUCCESS or error code. */
1123 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1124 row_import::match_index_columns(
1125 	THD*			thd,
1126 	const dict_index_t*	index) UNIV_NOTHROW
1127 {
1128 	row_index_t*		cfg_index;
1129 	dberr_t			err = DB_SUCCESS;
1130 
1131 	cfg_index = get_index(index->name);
1132 
1133 	if (cfg_index == 0) {
1134 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1135 			ER_TABLE_SCHEMA_MISMATCH,
1136 			"Index %s not found in tablespace meta-data file.",
1137 			index->name());
1138 
1139 		return(DB_ERROR);
1140 	}
1141 
1142 	if (cfg_index->m_n_fields != index->n_fields) {
1143 
1144 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1145 			ER_TABLE_SCHEMA_MISMATCH,
1146 			"Index field count %lu doesn't match"
1147 			" tablespace metadata file value %lu",
1148 			(ulong) index->n_fields,
1149 			(ulong) cfg_index->m_n_fields);
1150 
1151 		return(DB_ERROR);
1152 	}
1153 
1154 	cfg_index->m_srv_index = index;
1155 
1156 	const dict_field_t*	field = index->fields;
1157 	const dict_field_t*	cfg_field = cfg_index->m_fields;
1158 
1159 	for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1160 
1161 		if (strcmp(field->name(), cfg_field->name()) != 0) {
1162 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1163 				ER_TABLE_SCHEMA_MISMATCH,
1164 				"Index field name %s doesn't match"
1165 				" tablespace metadata field name %s"
1166 				" for field position %lu",
1167 				field->name(), cfg_field->name(), (ulong) i);
1168 
1169 			err = DB_ERROR;
1170 		}
1171 
1172 		if (cfg_field->prefix_len != field->prefix_len) {
1173 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1174 				ER_TABLE_SCHEMA_MISMATCH,
1175 				"Index %s field %s prefix len %lu"
1176 				" doesn't match metadata file value"
1177 				" %lu",
1178 				index->name(), field->name(),
1179 				(ulong) field->prefix_len,
1180 				(ulong) cfg_field->prefix_len);
1181 
1182 			err = DB_ERROR;
1183 		}
1184 
1185 		if (cfg_field->fixed_len != field->fixed_len) {
1186 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1187 				ER_TABLE_SCHEMA_MISMATCH,
1188 				"Index %s field %s fixed len %lu"
1189 				" doesn't match metadata file value"
1190 				" %lu",
1191 				index->name(), field->name(),
1192 				(ulong) field->fixed_len,
1193 				(ulong) cfg_field->fixed_len);
1194 
1195 			err = DB_ERROR;
1196 		}
1197 	}
1198 
1199 	return(err);
1200 }
1201 
1202 /** Check if the table schema that was read from the .cfg file matches the
1203 in memory table definition.
1204 @param thd MySQL session variable
1205 @return DB_SUCCESS or error code. */
1206 dberr_t
match_table_columns(THD * thd)1207 row_import::match_table_columns(
1208 	THD*			thd) UNIV_NOTHROW
1209 {
1210 	dberr_t			err = DB_SUCCESS;
1211 	const dict_col_t*	col = m_table->cols;
1212 
1213 	for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1214 
1215 		const char*	col_name;
1216 		ulint		cfg_col_index;
1217 
1218 		col_name = dict_table_get_col_name(
1219 			m_table, dict_col_get_no(col));
1220 
1221 		cfg_col_index = find_col(col_name);
1222 
1223 		if (cfg_col_index == ULINT_UNDEFINED) {
1224 
1225 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1226 				 ER_TABLE_SCHEMA_MISMATCH,
1227 				 "Column %s not found in tablespace.",
1228 				 col_name);
1229 
1230 			err = DB_ERROR;
1231 		} else if (cfg_col_index != col->ind) {
1232 
1233 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1234 				 ER_TABLE_SCHEMA_MISMATCH,
1235 				 "Column %s ordinal value mismatch, it's at"
1236 				 " %lu in the table and %lu in the tablespace"
1237 				 " meta-data file",
1238 				 col_name,
1239 				 (ulong) col->ind, (ulong) cfg_col_index);
1240 
1241 			err = DB_ERROR;
1242 		} else {
1243 			const dict_col_t*	cfg_col;
1244 
1245 			cfg_col = &m_cols[cfg_col_index];
1246 			ut_a(cfg_col->ind == cfg_col_index);
1247 
1248 			if (cfg_col->prtype != col->prtype) {
1249 				ib_errf(thd,
1250 					 IB_LOG_LEVEL_ERROR,
1251 					 ER_TABLE_SCHEMA_MISMATCH,
1252 					 "Column %s precise type mismatch.",
1253 					 col_name);
1254 				err = DB_ERROR;
1255 			}
1256 
1257 			if (cfg_col->mtype != col->mtype) {
1258 				ib_errf(thd,
1259 					 IB_LOG_LEVEL_ERROR,
1260 					 ER_TABLE_SCHEMA_MISMATCH,
1261 					 "Column %s main type mismatch.",
1262 					 col_name);
1263 				err = DB_ERROR;
1264 			}
1265 
1266 			if (cfg_col->len != col->len) {
1267 				ib_errf(thd,
1268 					 IB_LOG_LEVEL_ERROR,
1269 					 ER_TABLE_SCHEMA_MISMATCH,
1270 					 "Column %s length mismatch.",
1271 					 col_name);
1272 				err = DB_ERROR;
1273 			}
1274 
1275 			if (cfg_col->mbminmaxlen != col->mbminmaxlen) {
1276 				ib_errf(thd,
1277 					 IB_LOG_LEVEL_ERROR,
1278 					 ER_TABLE_SCHEMA_MISMATCH,
1279 					 "Column %s multi-byte len mismatch.",
1280 					 col_name);
1281 				err = DB_ERROR;
1282 			}
1283 
1284 			if (cfg_col->ind != col->ind) {
1285 				err = DB_ERROR;
1286 			}
1287 
1288 			if (cfg_col->ord_part != col->ord_part) {
1289 				ib_errf(thd,
1290 					 IB_LOG_LEVEL_ERROR,
1291 					 ER_TABLE_SCHEMA_MISMATCH,
1292 					 "Column %s ordering mismatch.",
1293 					 col_name);
1294 				err = DB_ERROR;
1295 			}
1296 
1297 			if (cfg_col->max_prefix != col->max_prefix) {
1298 				ib_errf(thd,
1299 					 IB_LOG_LEVEL_ERROR,
1300 					 ER_TABLE_SCHEMA_MISMATCH,
1301 					 "Column %s max prefix mismatch.",
1302 					 col_name);
1303 				err = DB_ERROR;
1304 			}
1305 		}
1306 	}
1307 
1308 	return(err);
1309 }
1310 
1311 /** Check if the table (and index) schema that was read from the .cfg file
1312 matches the in memory table definition.
1313 @param thd MySQL session variable
1314 @return DB_SUCCESS or error code. */
1315 dberr_t
match_schema(THD * thd)1316 row_import::match_schema(
1317 	THD*		thd) UNIV_NOTHROW
1318 {
1319 	/* Do some simple checks. */
1320 
1321 	if (m_flags != m_table->flags) {
1322 		if (dict_tf_to_row_format_string(m_flags) !=
1323 				dict_tf_to_row_format_string(m_table->flags)) {
1324 			ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1325 				 "Table flags don't match, server table has %s"
1326 				 " and the meta-data file has %s",
1327 				(const char*)dict_tf_to_row_format_string(m_table->flags),
1328 				(const char*)dict_tf_to_row_format_string(m_flags));
1329 		} else if (DICT_TF_HAS_DATA_DIR(m_flags) !=
1330 				DICT_TF_HAS_DATA_DIR(m_table->flags)) {
1331 			/* If the meta-data flag is set for data_dir,
1332 			but table flag is not set for data_dir or vice versa
1333 			then return error. */
1334 			ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1335 				"Table location flags do not match. "
1336 				"The source table %s a DATA DIRECTORY "
1337 				"but the destination table %s.",
1338 				(DICT_TF_HAS_DATA_DIR(m_flags) ? "uses"
1339 				: "does not use"),
1340 				(DICT_TF_HAS_DATA_DIR(m_table->flags) ? "does"
1341 				: "does not"));
1342 		} else {
1343 			ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1344 				"Table flags don't match");
1345 		}
1346 		return(DB_ERROR);
1347 	} else if (m_table->n_cols != m_n_cols) {
1348 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1349 			 "Number of columns don't match, table has %lu"
1350 			 " columns but the tablespace meta-data file has"
1351 			 " %lu columns",
1352 			 (ulong) m_table->n_cols, (ulong) m_n_cols);
1353 
1354 		return(DB_ERROR);
1355 	} else if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1356 
1357 		/* If the number of indexes don't match then it is better
1358 		to abort the IMPORT. It is easy for the user to create a
1359 		table matching the IMPORT definition. */
1360 
1361 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1362 			 "Number of indexes don't match, table has %lu"
1363 			 " indexes but the tablespace meta-data file has"
1364 			 " %lu indexes",
1365 			 (ulong) UT_LIST_GET_LEN(m_table->indexes),
1366 			 (ulong) m_n_indexes);
1367 
1368 		return(DB_ERROR);
1369 	}
1370 
1371 	dberr_t	err = match_table_columns(thd);
1372 
1373 	if (err != DB_SUCCESS) {
1374 		return(err);
1375 	}
1376 
1377 	/* Check if the index definitions match. */
1378 
1379 	const dict_index_t* index;
1380 
1381 	for (index = UT_LIST_GET_FIRST(m_table->indexes);
1382 	     index != 0;
1383 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1384 
1385 		dberr_t	index_err;
1386 
1387 		index_err = match_index_columns(thd, index);
1388 
1389 		if (index_err != DB_SUCCESS) {
1390 			err = index_err;
1391 		}
1392 	}
1393 
1394 	return(err);
1395 }
1396 
1397 /**
1398 Set the index root <space, pageno>, using index name. */
1399 void
set_root_by_name()1400 row_import::set_root_by_name() UNIV_NOTHROW
1401 {
1402 	row_index_t*	cfg_index = m_indexes;
1403 
1404 	for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1405 		dict_index_t*	index;
1406 
1407 		const char*	index_name;
1408 
1409 		index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1410 
1411 		index = dict_table_get_index_on_name(m_table, index_name);
1412 
1413 		/* We've already checked that it exists. */
1414 		ut_a(index != 0);
1415 
1416 		/* Set the root page number and space id. */
1417 		index->space = m_table->space;
1418 		index->page = cfg_index->m_page_no;
1419 	}
1420 }
1421 
1422 /**
1423 Set the index root <space, pageno>, using a heuristic.
1424 @return DB_SUCCESS or error code */
1425 dberr_t
set_root_by_heuristic()1426 row_import::set_root_by_heuristic() UNIV_NOTHROW
1427 {
1428 	row_index_t*	cfg_index = m_indexes;
1429 
1430 	ut_a(m_n_indexes > 0);
1431 
1432 	// TODO: For now use brute force, based on ordinality
1433 
1434 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1435 
1436 		ib::warn() << "Table " << m_table->name << " should have "
1437 			<< UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1438 			" the tablespace has " << m_n_indexes << " indexes";
1439 	}
1440 
1441 	dict_mutex_enter_for_mysql();
1442 
1443 	ulint	i = 0;
1444 	dberr_t	err = DB_SUCCESS;
1445 
1446 	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1447 	     index != 0;
1448 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1449 
1450 		if (index->type & DICT_FTS) {
1451 			index->type |= DICT_CORRUPT;
1452 			ib::warn() << "Skipping FTS index: " << index->name;
1453 		} else if (i < m_n_indexes) {
1454 
1455 			UT_DELETE_ARRAY(cfg_index[i].m_name);
1456 
1457 			ulint	len = strlen(index->name) + 1;
1458 
1459 			cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1460 
1461 			/* Trigger OOM */
1462 			DBUG_EXECUTE_IF(
1463 				"ib_import_OOM_14",
1464 				UT_DELETE_ARRAY(cfg_index[i].m_name);
1465 				cfg_index[i].m_name = NULL;
1466 			);
1467 
1468 			if (cfg_index[i].m_name == NULL) {
1469 				err = DB_OUT_OF_MEMORY;
1470 				break;
1471 			}
1472 
1473 			memcpy(cfg_index[i].m_name, index->name, len);
1474 
1475 			cfg_index[i].m_srv_index = index;
1476 
1477 			index->space = m_table->space;
1478 			index->page = cfg_index[i].m_page_no;
1479 
1480 			++i;
1481 		}
1482 	}
1483 
1484 	dict_mutex_exit_for_mysql();
1485 
1486 	return(err);
1487 }
1488 
1489 /**
1490 Purge delete marked records.
1491 @return DB_SUCCESS or error code. */
1492 dberr_t
garbage_collect()1493 IndexPurge::garbage_collect() UNIV_NOTHROW
1494 {
1495 	dberr_t	err;
1496 	ibool	comp = dict_table_is_comp(m_index->table);
1497 
1498 	/* Open the persistent cursor and start the mini-transaction. */
1499 
1500 	open();
1501 	import_ctx_t import_ctx = {false};
1502 	m_pcur.import_ctx = &import_ctx;
1503 
1504 	while ((err = next()) == DB_SUCCESS) {
1505 
1506 		rec_t*	rec = btr_pcur_get_rec(&m_pcur);
1507 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1508 
1509 		if (!deleted) {
1510 			++m_n_rows;
1511 		} else {
1512 			purge();
1513 		}
1514 	}
1515 
1516 	/* Close the persistent cursor and commit the mini-transaction. */
1517 
1518 	close();
1519 	if (m_pcur.import_ctx->is_error == true) {
1520 		m_pcur.import_ctx = NULL;
1521 		return DB_TABLE_CORRUPT;
1522 	}
1523 
1524 	m_pcur.import_ctx = NULL;
1525 
1526 	return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1527 }
1528 
1529 /**
1530 Begin import, position the cursor on the first record. */
1531 void
open()1532 IndexPurge::open() UNIV_NOTHROW
1533 {
1534 	mtr_start(&m_mtr);
1535 
1536 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1537 
1538 	btr_pcur_open_at_index_side(
1539 		true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1540 }
1541 
1542 /**
1543 Close the persistent curosr and commit the mini-transaction. */
1544 void
close()1545 IndexPurge::close() UNIV_NOTHROW
1546 {
1547 	btr_pcur_close(&m_pcur);
1548 	mtr_commit(&m_mtr);
1549 }
1550 
1551 /**
1552 Position the cursor on the next record.
1553 @return DB_SUCCESS or error code */
1554 dberr_t
next()1555 IndexPurge::next() UNIV_NOTHROW
1556 {
1557 	btr_pcur_move_to_next_on_page(&m_pcur);
1558 
1559 	/* When switching pages, commit the mini-transaction
1560 	in order to release the latch on the old page. */
1561 
1562 	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1563 		return(DB_SUCCESS);
1564 	} else if (trx_is_interrupted(m_trx)) {
1565 		/* Check after every page because the check
1566 		is expensive. */
1567 		return(DB_INTERRUPTED);
1568 	}
1569 
1570 	btr_pcur_store_position(&m_pcur, &m_mtr);
1571 
1572 	mtr_commit(&m_mtr);
1573 
1574 	mtr_start(&m_mtr);
1575 
1576 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1577 
1578 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1579 
1580 	if (!btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr)) {
1581 
1582 		return(DB_END_OF_INDEX);
1583 	}
1584 
1585 	return (DB_SUCCESS);
1586 }
1587 
1588 /**
1589 Store the persistent cursor position and reopen the
1590 B-tree cursor in BTR_MODIFY_TREE mode, because the
1591 tree structure may be changed during a pessimistic delete. */
1592 void
purge_pessimistic_delete()1593 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1594 {
1595 	dberr_t	err;
1596 
1597 	btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1598 				  &m_pcur, &m_mtr);
1599 
1600 	ut_ad(rec_get_deleted_flag(
1601 			btr_pcur_get_rec(&m_pcur),
1602 			dict_table_is_comp(m_index->table)));
1603 
1604 	btr_cur_pessimistic_delete(
1605 		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);
1606 
1607 	ut_a(err == DB_SUCCESS);
1608 
1609 	/* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1610 	mtr_commit(&m_mtr);
1611 }
1612 
1613 /**
1614 Purge delete-marked records. */
1615 void
purge()1616 IndexPurge::purge() UNIV_NOTHROW
1617 {
1618 	btr_pcur_store_position(&m_pcur, &m_mtr);
1619 
1620 	purge_pessimistic_delete();
1621 
1622 	mtr_start(&m_mtr);
1623 
1624 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1625 
1626 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1627 }
1628 
1629 /** Constructor
1630 @param cfg config of table being imported.
1631 @param trx transaction covering the import */
PageConverter(row_import * cfg,trx_t * trx)1632 PageConverter::PageConverter(
1633 	row_import*	cfg,
1634 	trx_t*		trx)
1635 	:
1636 	AbstractCallback(trx),
1637 	m_cfg(cfg),
1638 	m_page_zip_ptr(0),
1639 	m_heap(0) UNIV_NOTHROW
1640 {
1641 	m_index = m_cfg->m_indexes;
1642 
1643 	m_current_lsn = log_get_lsn();
1644 	ut_a(m_current_lsn > 0);
1645 
1646 	m_offsets = m_offsets_;
1647 	rec_offs_init(m_offsets_);
1648 
1649 	m_cluster_index = dict_table_get_first_index(m_cfg->m_table);
1650 }
1651 
1652 /** Adjust the BLOB reference for a single column that is externally stored
1653 @param rec record to update
1654 @param offsets column offsets for the record
1655 @param i column ordinal value
1656 @return DB_SUCCESS or error code */
1657 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const ulint * offsets,ulint i)1658 PageConverter::adjust_cluster_index_blob_column(
1659 	rec_t*		rec,
1660 	const ulint*	offsets,
1661 	ulint		i) UNIV_NOTHROW
1662 {
1663 	ulint		len;
1664 	byte*		field;
1665 
1666 	field = rec_get_nth_field(rec, offsets, i, &len);
1667 
1668 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1669 			len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1670 
1671 	if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1672 
1673 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1674 			ER_INNODB_INDEX_CORRUPT,
1675 			"Externally stored column(%lu) has a reference"
1676 			" length of %lu in the cluster index %s",
1677 			(ulong) i, (ulong) len, m_cluster_index->name());
1678 
1679 		return(DB_CORRUPTION);
1680 	}
1681 
1682 	field += BTR_EXTERN_SPACE_ID - BTR_EXTERN_FIELD_REF_SIZE + len;
1683 
1684 	if (is_compressed_table()) {
1685 		mach_write_to_4(field, get_space_id());
1686 
1687 		page_zip_write_blob_ptr(
1688 			m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1689 	} else {
1690 		mlog_write_ulint(field, get_space_id(), MLOG_4BYTES, 0);
1691 	}
1692 
1693 	return(DB_SUCCESS);
1694 }
1695 
1696 /** Adjusts the BLOB reference in the clustered index row for all externally
1697 stored columns.
1698 @param rec record to update
1699 @param offsets column offsets for the record
1700 @return DB_SUCCESS or error code */
1701 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const ulint * offsets)1702 PageConverter::adjust_cluster_index_blob_columns(
1703 	rec_t*		rec,
1704 	const ulint*	offsets) UNIV_NOTHROW
1705 {
1706 	ut_ad(rec_offs_any_extern(offsets));
1707 
1708 	/* Adjust the space_id in the BLOB pointers. */
1709 
1710 	for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1711 
1712 		/* Only if the column is stored "externally". */
1713 
1714 		if (rec_offs_nth_extern(offsets, i)) {
1715 			dberr_t	err;
1716 
1717 			err = adjust_cluster_index_blob_column(rec, offsets, i);
1718 
1719 			if (err != DB_SUCCESS) {
1720 				return(err);
1721 			}
1722 		}
1723 	}
1724 
1725 	return(DB_SUCCESS);
1726 }
1727 
1728 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1729 BLOB reference, write the new space id.
1730 @param rec record to update
1731 @param offsets column offsets for the record
1732 @return DB_SUCCESS or error code */
1733 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const ulint * offsets)1734 PageConverter::adjust_cluster_index_blob_ref(
1735 	rec_t*		rec,
1736 	const ulint*	offsets) UNIV_NOTHROW
1737 {
1738 	if (rec_offs_any_extern(offsets)) {
1739 		dberr_t	err;
1740 
1741 		err = adjust_cluster_index_blob_columns(rec, offsets);
1742 
1743 		if (err != DB_SUCCESS) {
1744 			return(err);
1745 		}
1746 	}
1747 
1748 	return(DB_SUCCESS);
1749 }
1750 
1751 /** Purge delete-marked records, only if it is possible to do so without
1752 re-organising the B+tree.
1753 @param offsets current row offsets.
1754 @return true if purge succeeded */
1755 bool
purge(const ulint * offsets)1756 PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
1757 {
1758 	const dict_index_t*	index = m_index->m_srv_index;
1759 
1760 	/* We can't have a page that is empty and not root. */
1761 	if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1762 
1763 		++m_index->m_stats.m_n_purged;
1764 
1765 		return(true);
1766 	} else {
1767 		++m_index->m_stats.m_n_purge_failed;
1768 	}
1769 
1770 	return(false);
1771 }
1772 
1773 /** Adjust the BLOB references and sys fields for the current record.
1774 @param rec record to update
1775 @param offsets column offsets for the record
1776 @param deleted true if row is delete marked
1777 @return DB_SUCCESS or error code. */
1778 dberr_t
adjust_cluster_record(const dict_index_t * index,rec_t * rec,const ulint * offsets,bool deleted)1779 PageConverter::adjust_cluster_record(
1780 	const dict_index_t*	index,
1781 	rec_t*			rec,
1782 	const ulint*		offsets,
1783 	bool			deleted) UNIV_NOTHROW
1784 {
1785 	dberr_t	err;
1786 
1787 	if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1788 
1789 		/* Reset DB_TRX_ID and DB_ROLL_PTR.  Normally, these fields
1790 		are only written in conjunction with other changes to the
1791 		record. */
1792 
1793 		row_upd_rec_sys_fields(
1794 			rec, m_page_zip_ptr, m_cluster_index, m_offsets,
1795 			m_trx, 0);
1796 	}
1797 
1798 	return(err);
1799 }
1800 
1801 /** Update the BLOB refrences and write UNDO log entries for
1802 rows that can't be purged optimistically.
1803 @param block block to update
1804 @retval DB_SUCCESS or error code */
1805 dberr_t
update_records(buf_block_t * block)1806 PageConverter::update_records(
1807 	buf_block_t*	block) UNIV_NOTHROW
1808 {
1809 	ibool	comp = dict_table_is_comp(m_cfg->m_table);
1810 	bool	clust_index = m_index->m_srv_index == m_cluster_index;
1811 
1812 	/* This will also position the cursor on the first user record. */
1813 
1814 	m_rec_iter.open(block);
1815 
1816 	while (!m_rec_iter.end()) {
1817 
1818 		rec_t*	rec = m_rec_iter.current();
1819 
1820 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1821 
1822 		/* For the clustered index we have to adjust the BLOB
1823 		reference and the system fields irrespective of the
1824 		delete marked flag. The adjustment of delete marked
1825 		cluster records is required for purge to work later. */
1826 
1827 		if (deleted || clust_index) {
1828 			m_offsets = rec_get_offsets(
1829 				rec, m_index->m_srv_index, m_offsets,
1830 				ULINT_UNDEFINED, &m_heap);
1831 		}
1832 
1833 		if (clust_index) {
1834 
1835 			dberr_t err = adjust_cluster_record(
1836 				m_index->m_srv_index, rec, m_offsets,
1837 				deleted);
1838 
1839 			if (err != DB_SUCCESS) {
1840 				return(err);
1841 			}
1842 		}
1843 
1844 		/* If it is a delete marked record then try an
1845 		optimistic delete. */
1846 
1847 		if (deleted) {
1848 			/* A successful purge will move the cursor to the
1849 			next record. */
1850 
1851 			if (!purge(m_offsets)) {
1852 				m_rec_iter.next();
1853 			}
1854 
1855 			++m_index->m_stats.m_n_deleted;
1856 		} else {
1857 			++m_index->m_stats.m_n_rows;
1858 			m_rec_iter.next();
1859 		}
1860 	}
1861 
1862 	return(DB_SUCCESS);
1863 }
1864 
1865 /** Update the space, index id, trx id.
1866 @return DB_SUCCESS or error code */
1867 dberr_t
update_index_page(buf_block_t * block)1868 PageConverter::update_index_page(
1869 	buf_block_t*	block) UNIV_NOTHROW
1870 {
1871 	index_id_t	id;
1872 	buf_frame_t*	page = block->frame;
1873 
1874 	if (is_free(block->page.id.page_no())) {
1875 		return(DB_SUCCESS);
1876 	} else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
1877 
1878 		row_index_t*	index = find_index(id);
1879 
1880 		if (index == 0) {
1881 			m_index = 0;
1882 			return(DB_CORRUPTION);
1883 		}
1884 
1885 		/* Update current index */
1886 		m_index = index;
1887 	}
1888 
1889 	/* If the .cfg file is missing and there is an index mismatch
1890 	then ignore the error. */
1891 	if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
1892 		return(DB_SUCCESS);
1893 	}
1894 
1895 #ifdef UNIV_ZIP_DEBUG
1896 	ut_a(!is_compressed_table()
1897 	     || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
1898 #endif /* UNIV_ZIP_DEBUG */
1899 
1900 	/* This has to be written to uncompressed index header. Set it to
1901 	the current index id. */
1902 	btr_page_set_index_id(
1903 		page, m_page_zip_ptr, m_index->m_srv_index->id, 0);
1904 
1905 	page_set_max_trx_id(block, m_page_zip_ptr, m_trx->id, 0);
1906 
1907 	if (page_is_empty(block->frame)) {
1908 
1909 		/* Only a root page can be empty. */
1910 		if (!is_root_page(block->frame)) {
1911 			// TODO: We should relax this and skip secondary
1912 			// indexes. Mark them as corrupt because they can
1913 			// always be rebuilt.
1914 			return(DB_CORRUPTION);
1915 		}
1916 
1917 		return(DB_SUCCESS);
1918 	}
1919 
1920 	if (!page_is_leaf(block->frame)) {
1921 		return (DB_SUCCESS);
1922 	}
1923 
1924 	return(update_records(block));
1925 }
1926 
1927 /** Validate the space flags and update tablespace header page.
1928 @param block block read from file, not from the buffer pool.
1929 @retval DB_SUCCESS or error code */
1930 dberr_t
update_header(buf_block_t * block)1931 PageConverter::update_header(
1932 	buf_block_t*	block) UNIV_NOTHROW
1933 {
1934 	/* Check for valid header */
1935 	switch (fsp_header_get_space_id(get_frame(block))) {
1936 	case 0:
1937 		return(DB_CORRUPTION);
1938 	case ULINT_UNDEFINED:
1939 		ib::warn() << "Space id check in the header failed: ignored";
1940 	}
1941 
1942 	ulint	space_flags = fsp_header_get_flags(get_frame(block));
1943 
1944 	if (!fsp_flags_is_valid(space_flags)) {
1945 
1946 		ib::error() <<  "Unsupported tablespace format "
1947 			<< space_flags;
1948 
1949 		return(DB_UNSUPPORTED);
1950 	}
1951 
1952 	/* Write space_id to the tablespace header, page 0. */
1953 	mach_write_to_4(
1954 		get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
1955 		get_space_id());
1956 
1957 	/* This is on every page in the tablespace. */
1958 	mach_write_to_4(
1959 		get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
1960 		get_space_id());
1961 
1962 	return(DB_SUCCESS);
1963 }
1964 
1965 /** Update the page, set the space id, max trx id and index id.
1966 @param block block read from file
1967 @retval DB_SUCCESS or error code */
1968 dberr_t
update_page(buf_block_t * block,ulint & page_type)1969 PageConverter::update_page(
1970 	buf_block_t*	block,
1971 	ulint&		page_type) UNIV_NOTHROW
1972 {
1973 	dberr_t		err = DB_SUCCESS;
1974 
1975 	switch (page_type = fil_page_get_type(get_frame(block))) {
1976 	case FIL_PAGE_TYPE_FSP_HDR:
1977 		/* Work directly on the uncompressed page headers. */
1978 		ut_a(block->page.id.page_no() == 0);
1979 		return(update_header(block));
1980 
1981 	case FIL_PAGE_INDEX:
1982 	case FIL_PAGE_RTREE:
1983 		/* We need to decompress the contents into block->frame
1984 		before we can do any thing with Btree pages. */
1985 
1986 		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
1987 			return(DB_CORRUPTION);
1988 		}
1989 
1990 		/* This is on every page in the tablespace. */
1991 		mach_write_to_4(
1992 			get_frame(block)
1993 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
1994 
1995 		/* Only update the Btree nodes. */
1996 		return(update_index_page(block));
1997 
1998 	case FIL_PAGE_TYPE_SYS:
1999 		/* This is page 0 in the system tablespace. */
2000 		return(DB_CORRUPTION);
2001 
2002 	case FIL_PAGE_TYPE_XDES:
2003 		err = set_current_xdes(
2004 			block->page.id.page_no(), get_frame(block));
2005 		/* Fall through. */
2006 	case FIL_PAGE_INODE:
2007 	case FIL_PAGE_TYPE_TRX_SYS:
2008 	case FIL_PAGE_IBUF_FREE_LIST:
2009 	case FIL_PAGE_TYPE_ALLOCATED:
2010 	case FIL_PAGE_IBUF_BITMAP:
2011 	case FIL_PAGE_TYPE_BLOB:
2012 	case FIL_PAGE_TYPE_ZBLOB:
2013 	case FIL_PAGE_TYPE_ZBLOB2:
2014 
2015 		/* Work directly on the uncompressed page headers. */
2016 		/* This is on every page in the tablespace. */
2017 		mach_write_to_4(
2018 			get_frame(block)
2019 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2020 
2021 		return(err);
2022 	}
2023 
2024 	ib::warn() << "Unknown page type (" << page_type << ")";
2025 
2026 	return(DB_CORRUPTION);
2027 }
2028 
2029 /** Validate the page
2030 @param offset physical offset within file.
2031 @param page page read from file.
2032 @return status */
2033 PageConverter::import_page_status_t
validate(os_offset_t offset,buf_block_t * block)2034 PageConverter::validate(
2035 	os_offset_t	offset,
2036 	buf_block_t*	block) UNIV_NOTHROW
2037 {
2038 	buf_frame_t*	page = get_frame(block);
2039 
2040 	/* Check that the page number corresponds to the offset in
2041 	the file. Flag as corrupt if it doesn't. Disable the check
2042 	for LSN in buf_page_is_corrupted() */
2043 
2044 	if (buf_page_is_corrupted(
2045 		false, page, get_page_size(),
2046 		fsp_is_checksum_disabled(block->page.id.space()))
2047 	    || (page_get_page_no(page) != offset / m_page_size.physical()
2048 		&& page_get_page_no(page) != 0)) {
2049 
2050 		return(IMPORT_PAGE_STATUS_CORRUPTED);
2051 
2052 	} else if (offset > 0 && page_get_page_no(page) == 0) {
2053 
2054 		/* The page is all zero: do nothing. We already checked
2055 		for all NULs in buf_page_is_corrupted() */
2056 		return(IMPORT_PAGE_STATUS_ALL_ZERO);
2057 	}
2058 
2059 	return(IMPORT_PAGE_STATUS_OK);
2060 }
2061 
2062 /** Called for every page in the tablespace. If the page was not
2063 updated then its state must be set to BUF_PAGE_NOT_USED.
2064 @param offset physical offset within the file
2065 @param block block read from file, note it is not from the buffer pool
2066 @retval DB_SUCCESS or error code. */
2067 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)2068 PageConverter::operator() (
2069 	os_offset_t	offset,
2070 	buf_block_t*	block) UNIV_NOTHROW
2071 {
2072 	ulint		page_type;
2073 	dberr_t		err = DB_SUCCESS;
2074 
2075 	if ((err = periodic_check()) != DB_SUCCESS) {
2076 		return(err);
2077 	}
2078 
2079 	if (is_compressed_table()) {
2080 		m_page_zip_ptr = &block->page.zip;
2081 	} else {
2082 		ut_ad(m_page_zip_ptr == 0);
2083 	}
2084 
2085 	switch (validate(offset, block)) {
2086 	case IMPORT_PAGE_STATUS_OK:
2087 
2088 		/* We have to decompress the compressed pages before
2089 		we can work on them */
2090 
2091 		if ((err = update_page(block, page_type)) != DB_SUCCESS) {
2092 			return(err);
2093 		}
2094 
2095 		/* Note: For compressed pages this function will write to the
2096 		zip descriptor and for uncompressed pages it will write to
2097 		page (ie. the block->frame). Therefore the caller should write
2098 		out the descriptor contents and not block->frame for compressed
2099 		pages. */
2100 
2101 		if (!is_compressed_table()
2102 		    || fil_page_type_is_index(page_type)) {
2103 
2104 			buf_flush_init_for_writing(
2105 				!is_compressed_table() ? block : NULL,
2106 				!is_compressed_table()
2107 				? block->frame : block->page.zip.data,
2108 				!is_compressed_table() ? 0 : m_page_zip_ptr,
2109 				m_current_lsn,
2110 				fsp_is_checksum_disabled(
2111 					block->page.id.space()));
2112 		} else {
2113 			/* Calculate and update the checksum of non-btree
2114 			pages for compressed tables explicitly here. */
2115 
2116 			buf_flush_update_zip_checksum(
2117 				get_frame(block), get_page_size().physical(),
2118 				m_current_lsn);
2119 		}
2120 
2121 		break;
2122 
2123 	case IMPORT_PAGE_STATUS_ALL_ZERO:
2124 		/* The page is all zero: leave it as is. */
2125 		break;
2126 
2127 	case IMPORT_PAGE_STATUS_CORRUPTED:
2128 
2129 		ib::warn() << "Page " << (offset / m_page_size.physical())
2130 			<< " at offset " << offset
2131 			<< " looks corrupted in file " << m_filepath;
2132 
2133 		return(DB_CORRUPTION);
2134 	}
2135 
2136 	return(err);
2137 }
2138 
2139 /*****************************************************************//**
2140 Clean up after import tablespace failure, this function will acquire
2141 the dictionary latches on behalf of the transaction if the transaction
2142 hasn't already acquired them. */
2143 static	MY_ATTRIBUTE((nonnull))
2144 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2145 row_import_discard_changes(
2146 /*=======================*/
2147 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2148 	trx_t*		trx,		/*!< in/out: transaction for import */
2149 	dberr_t		err)		/*!< in: error code */
2150 {
2151 	dict_table_t*	table = prebuilt->table;
2152 
2153 	ut_a(err != DB_SUCCESS);
2154 
2155 	prebuilt->trx->error_index = NULL;
2156 
2157 	ib::info() << "Discarding tablespace of table "
2158 		<< prebuilt->table->name
2159 		<< ": " << ut_strerr(err);
2160 
2161 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2162 		ut_a(trx->dict_operation_lock_mode == 0);
2163 		row_mysql_lock_data_dictionary(trx);
2164 	}
2165 
2166 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2167 
2168 	/* Since we update the index root page numbers on disk after
2169 	we've done a successful import. The table will not be loadable.
2170 	However, we need to ensure that the in memory root page numbers
2171 	are reset to "NULL". */
2172 
2173 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2174 		index != 0;
2175 		index = UT_LIST_GET_NEXT(indexes, index)) {
2176 
2177 		index->page = FIL_NULL;
2178 		index->space = FIL_NULL;
2179 	}
2180 
2181 	table->ibd_file_missing = TRUE;
2182 
2183 	fil_close_tablespace(trx, table->space);
2184 }
2185 
2186 /*****************************************************************//**
2187 Clean up after import tablespace. */
2188 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2189 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2190 row_import_cleanup(
2191 /*===============*/
2192 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2193 	trx_t*		trx,		/*!< in/out: transaction for import */
2194 	dberr_t		err)		/*!< in: error code */
2195 {
2196 	ut_a(prebuilt->trx != trx);
2197 
2198 	if (err != DB_SUCCESS) {
2199 		row_import_discard_changes(prebuilt, trx, err);
2200 	}
2201 
2202 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2203 
2204 	DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2205 
2206 	trx_commit_for_mysql(trx);
2207 
2208 	prebuilt->table->encryption_key = NULL;
2209 	prebuilt->table->encryption_iv = NULL;
2210 
2211 	row_mysql_unlock_data_dictionary(trx);
2212 
2213 	trx_free_for_mysql(trx);
2214 
2215 	prebuilt->trx->op_info = "";
2216 
2217 	DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2218 
2219 	log_make_checkpoint_at(LSN_MAX, TRUE);
2220 
2221 	return(err);
2222 }
2223 
2224 /*****************************************************************//**
2225 Report error during tablespace import. */
2226 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2227 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2228 row_import_error(
2229 /*=============*/
2230 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2231 	trx_t*		trx,		/*!< in/out: transaction for import */
2232 	dberr_t		err)		/*!< in: error code */
2233 {
2234 	if (!trx_is_interrupted(trx)) {
2235 		char	table_name[MAX_FULL_NAME_LEN + 1];
2236 
2237 		innobase_format_name(
2238 			table_name, sizeof(table_name),
2239 			prebuilt->table->name.m_name);
2240 
2241 		ib_senderrf(
2242 			trx->mysql_thd, IB_LOG_LEVEL_WARN,
2243 			ER_INNODB_IMPORT_ERROR,
2244 			table_name, (ulong) err, ut_strerr(err));
2245 	}
2246 
2247 	return(row_import_cleanup(prebuilt, trx, err));
2248 }
2249 
2250 /*****************************************************************//**
2251 Adjust the root page index node and leaf node segment headers, update
2252 with the new space id. For all the table's secondary indexes.
2253 @return error code */
2254 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2255 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(row_prebuilt_t * prebuilt,trx_t * trx,dict_table_t * table,const row_import & cfg)2256 row_import_adjust_root_pages_of_secondary_indexes(
2257 /*==============================================*/
2258 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2259 						handler */
2260 	trx_t*			trx,		/*!< in: transaction used for
2261 						the import */
2262 	dict_table_t*		table,		/*!< in: table the indexes
2263 						belong to */
2264 	const row_import&	cfg)		/*!< Import context */
2265 {
2266 	dict_index_t*		index;
2267 	ulint			n_rows_in_table;
2268 	dberr_t			err = DB_SUCCESS;
2269 
2270 	/* Skip the clustered index. */
2271 	index = dict_table_get_first_index(table);
2272 
2273 	n_rows_in_table = cfg.get_n_rows(index->name);
2274 
2275 	DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2276 			n_rows_in_table++;);
2277 
2278 	/* Adjust the root pages of the secondary indexes only. */
2279 	while ((index = dict_table_get_next_index(index)) != NULL) {
2280 		ut_a(!dict_index_is_clust(index));
2281 
2282 		if (!(index->type & DICT_CORRUPT)
2283 		    && index->space != FIL_NULL
2284 		    && index->page != FIL_NULL) {
2285 
2286 			/* Update the Btree segment headers for index node and
2287 			leaf nodes in the root page. Set the new space id. */
2288 
2289 			err = btr_root_adjust_on_import(index);
2290 		} else {
2291 			ib::warn() << "Skip adjustment of root pages for"
2292 				" index " << index->name << ".";
2293 
2294 			err = DB_CORRUPTION;
2295 		}
2296 
2297 		if (err != DB_SUCCESS) {
2298 
2299 			if (index->type & DICT_CLUSTERED) {
2300 				break;
2301 			}
2302 
2303 			ib_errf(trx->mysql_thd,
2304 				IB_LOG_LEVEL_WARN,
2305 				ER_INNODB_INDEX_CORRUPT,
2306 				"Index %s not found or corrupt,"
2307 				" you should recreate this index.",
2308 				index->name());
2309 
2310 			/* Do not bail out, so that the data
2311 			can be recovered. */
2312 
2313 			err = DB_SUCCESS;
2314 			index->type |= DICT_CORRUPT;
2315 			continue;
2316 		}
2317 
2318 		/* If we failed to purge any records in the index then
2319 		do it the hard way.
2320 
2321 		TODO: We can do this in the first pass by generating UNDO log
2322 		records for the failed rows. */
2323 
2324 		if (!cfg.requires_purge(index->name)) {
2325 			continue;
2326 		}
2327 
2328 		IndexPurge   purge(trx, index);
2329 
2330 		trx->op_info = "secondary: purge delete marked records";
2331 
2332 		err = purge.garbage_collect();
2333 
2334 		trx->op_info = "";
2335 
2336 		if (err != DB_SUCCESS) {
2337 			break;
2338 		} else if (purge.get_n_rows() != n_rows_in_table) {
2339 
2340 			ib_errf(trx->mysql_thd,
2341 				IB_LOG_LEVEL_WARN,
2342 				ER_INNODB_INDEX_CORRUPT,
2343 				"Index %s contains %lu entries,"
2344 				" should be %lu, you should recreate"
2345 				" this index.", index->name(),
2346 				(ulong) purge.get_n_rows(),
2347 				(ulong) n_rows_in_table);
2348 
2349 			index->type |= DICT_CORRUPT;
2350 
2351 			/* Do not bail out, so that the data
2352 			can be recovered. */
2353 
2354 			err = DB_SUCCESS;
2355                 }
2356 	}
2357 
2358 	return(err);
2359 }
2360 
2361 /*****************************************************************//**
2362 Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
2363 @return error code */
2364 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2365 dberr_t
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2366 row_import_set_sys_max_row_id(
2367 /*==========================*/
2368 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2369 						handler */
2370 	const dict_table_t*	table)		/*!< in: table to import */
2371 {
2372 	dberr_t			err;
2373 	const rec_t*		rec;
2374 	mtr_t			mtr;
2375 	btr_pcur_t		pcur;
2376 	row_id_t		row_id	= 0;
2377 	dict_index_t*		index;
2378 
2379 	index = dict_table_get_first_index(table);
2380 	ut_a(dict_index_is_clust(index));
2381 
2382 	mtr_start(&mtr);
2383 
2384 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2385 
2386 	btr_pcur_open_at_index_side(
2387 		false,		// High end
2388 		index,
2389 		BTR_SEARCH_LEAF,
2390 		&pcur,
2391 		true,		// Init cursor
2392 		0,		// Leaf level
2393 		&mtr);
2394 
2395 	btr_pcur_move_to_prev_on_page(&pcur);
2396 	rec = btr_pcur_get_rec(&pcur);
2397 
2398 	/* Check for empty table. */
2399 	if (!page_rec_is_infimum(rec)) {
2400 		ulint		len;
2401 		const byte*	field;
2402 		mem_heap_t*	heap = NULL;
2403 		ulint		offsets_[1 + REC_OFFS_HEADER_SIZE];
2404 		ulint*		offsets;
2405 
2406 		rec_offs_init(offsets_);
2407 
2408 		offsets = rec_get_offsets(
2409 			rec, index, offsets_, ULINT_UNDEFINED, &heap);
2410 
2411 		field = rec_get_nth_field(
2412 			rec, offsets,
2413 			dict_index_get_sys_col_pos(index, DATA_ROW_ID),
2414 			&len);
2415 
2416 		if (len == DATA_ROW_ID_LEN) {
2417 			row_id = mach_read_from_6(field);
2418 			err = DB_SUCCESS;
2419 		} else {
2420 			err = DB_CORRUPTION;
2421 		}
2422 
2423 		if (heap != NULL) {
2424 			mem_heap_free(heap);
2425 		}
2426 	} else {
2427 		/* The table is empty. */
2428 		err = DB_SUCCESS;
2429 	}
2430 
2431 	btr_pcur_close(&pcur);
2432 	mtr_commit(&mtr);
2433 
2434 	DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
2435 			err = DB_CORRUPTION;);
2436 
2437 	if (err != DB_SUCCESS) {
2438 		ib_errf(prebuilt->trx->mysql_thd,
2439 			IB_LOG_LEVEL_WARN,
2440 			ER_INNODB_INDEX_CORRUPT,
2441 			"Index `%s` corruption detected, invalid DB_ROW_ID"
2442 			" in index.", index->name());
2443 
2444 		return(err);
2445 
2446 	} else if (row_id > 0) {
2447 
2448 		/* Update the system row id if the imported index row id is
2449 		greater than the max system row id. */
2450 
2451 		mutex_enter(&dict_sys->mutex);
2452 
2453 		if (row_id >= dict_sys->row_id) {
2454 			dict_sys->row_id = row_id + 1;
2455 			dict_hdr_flush_row_id();
2456 		}
2457 
2458 		mutex_exit(&dict_sys->mutex);
2459 	}
2460 
2461 	return(DB_SUCCESS);
2462 }
2463 
2464 /*****************************************************************//**
2465 Read the a string from the meta data file.
2466 @return DB_SUCCESS or error code. */
2467 static
2468 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2469 row_import_cfg_read_string(
2470 /*=======================*/
2471 	FILE*		file,		/*!< in/out: File to read from */
2472 	byte*		ptr,		/*!< out: string to read */
2473 	ulint		max_len)	/*!< in: maximum length of the output
2474 					buffer in bytes */
2475 {
2476 	DBUG_EXECUTE_IF("ib_import_string_read_error",
2477 			errno = EINVAL; return(DB_IO_ERROR););
2478 
2479 	ulint		len = 0;
2480 
2481 	while (!feof(file)) {
2482 		int	ch = fgetc(file);
2483 
2484 		if (ch == EOF) {
2485 			break;
2486 		} else if (ch != 0) {
2487 			if (len < max_len) {
2488 				ptr[len++] = ch;
2489 			} else {
2490 				break;
2491 			}
2492 		/* max_len includes the NUL byte */
2493 		} else if (len != max_len - 1) {
2494 			break;
2495 		} else {
2496 			ptr[len] = 0;
2497 			return(DB_SUCCESS);
2498 		}
2499 	}
2500 
2501 	errno = EINVAL;
2502 
2503 	return(DB_IO_ERROR);
2504 }
2505 
2506 /*********************************************************************//**
2507 Write the meta data (index user fields) config file.
2508 @return DB_SUCCESS or error code. */
2509 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2510 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index,row_import * cfg)2511 row_import_cfg_read_index_fields(
2512 /*=============================*/
2513 	FILE*			file,	/*!< in: file to write to */
2514 	THD*			thd,	/*!< in/out: session */
2515 	row_index_t*		index,	/*!< Index being read in */
2516 	row_import*		cfg)	/*!< in/out: meta-data read */
2517 {
2518 	byte			row[sizeof(ib_uint32_t) * 3];
2519 	ulint			n_fields = index->m_n_fields;
2520 
2521 	index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2522 
2523 	/* Trigger OOM */
2524 	DBUG_EXECUTE_IF(
2525 		"ib_import_OOM_4",
2526 		UT_DELETE_ARRAY(index->m_fields);
2527 		index->m_fields = NULL;
2528 	);
2529 
2530 	if (index->m_fields == NULL) {
2531 		return(DB_OUT_OF_MEMORY);
2532 	}
2533 
2534 	dict_field_t*	field = index->m_fields;
2535 
2536 	std::uninitialized_fill_n(field, n_fields, dict_field_t());
2537 
2538 	for (ulint i = 0; i < n_fields; ++i, ++field) {
2539 		byte*		ptr = row;
2540 
2541 		/* Trigger EOF */
2542 		DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2543 				(void) fseek(file, 0L, SEEK_END););
2544 
2545 		if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2546 
2547 			ib_senderrf(
2548 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2549 				errno, strerror(errno),
2550 				"while reading index fields.");
2551 
2552 			return(DB_IO_ERROR);
2553 		}
2554 
2555 		field->prefix_len = mach_read_from_4(ptr);
2556 		ptr += sizeof(ib_uint32_t);
2557 
2558 		field->fixed_len = mach_read_from_4(ptr);
2559 		ptr += sizeof(ib_uint32_t);
2560 
2561 		/* Include the NUL byte in the length. */
2562 		ulint	len = mach_read_from_4(ptr);
2563 
2564 		byte*	name = UT_NEW_ARRAY_NOKEY(byte, len);
2565 
2566 		/* Trigger OOM */
2567 		DBUG_EXECUTE_IF(
2568 			"ib_import_OOM_5",
2569 			UT_DELETE_ARRAY(name);
2570 			name = NULL;
2571 		);
2572 
2573 		if (name == NULL) {
2574 			return(DB_OUT_OF_MEMORY);
2575 		}
2576 
2577 		field->name = reinterpret_cast<const char*>(name);
2578 
2579 		dberr_t	err = row_import_cfg_read_string(file, name, len);
2580 
2581 		if (err != DB_SUCCESS) {
2582 
2583 			ib_senderrf(
2584 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2585 				errno, strerror(errno),
2586 				"while parsing table name.");
2587 
2588 			return(err);
2589 		}
2590 	}
2591 
2592 	return(DB_SUCCESS);
2593 }
2594 
2595 /*****************************************************************//**
2596 Read the index names and root page numbers of the indexes and set the values.
2597 Row format [root_page_no, len of str, str ... ]
2598 @return DB_SUCCESS or error code. */
2599 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2600 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2601 row_import_read_index_data(
2602 /*=======================*/
2603 	FILE*		file,		/*!< in: File to read from */
2604 	THD*		thd,		/*!< in: session */
2605 	row_import*	cfg)		/*!< in/out: meta-data read */
2606 {
2607 	byte*		ptr;
2608 	row_index_t*	cfg_index;
2609 	byte		row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2610 
2611 	/* FIXME: What is the max value? */
2612 	ut_a(cfg->m_n_indexes > 0);
2613 	ut_a(cfg->m_n_indexes < 1024);
2614 
2615 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2616 
2617 	/* Trigger OOM */
2618 	DBUG_EXECUTE_IF(
2619 		"ib_import_OOM_6",
2620 		UT_DELETE_ARRAY(cfg->m_indexes);
2621 		cfg->m_indexes = NULL;
2622 	);
2623 
2624 	if (cfg->m_indexes == NULL) {
2625 		return(DB_OUT_OF_MEMORY);
2626 	}
2627 
2628 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2629 
2630 	cfg_index = cfg->m_indexes;
2631 
2632 	for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2633 		/* Trigger EOF */
2634 		DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2635 				(void) fseek(file, 0L, SEEK_END););
2636 
2637 		/* Read the index data. */
2638 		size_t	n_bytes = fread(row, 1, sizeof(row), file);
2639 
2640 		/* Trigger EOF */
2641 		DBUG_EXECUTE_IF("ib_import_io_read_error",
2642 				(void) fseek(file, 0L, SEEK_END););
2643 
2644 		if (n_bytes != sizeof(row)) {
2645 			char	msg[BUFSIZ];
2646 
2647 			ut_snprintf(msg, sizeof(msg),
2648 				    "while reading index meta-data, expected"
2649 				    " to read %lu bytes but read only %lu"
2650 				    " bytes",
2651 				    (ulong) sizeof(row), (ulong) n_bytes);
2652 
2653 			ib_senderrf(
2654 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2655 				errno, strerror(errno), msg);
2656 
2657 			ib::error() << "IO Error: " << msg;
2658 
2659 			return(DB_IO_ERROR);
2660 		}
2661 
2662 		ptr = row;
2663 
2664 		cfg_index->m_id = mach_read_from_8(ptr);
2665 		ptr += sizeof(index_id_t);
2666 
2667 		cfg_index->m_space = mach_read_from_4(ptr);
2668 		ptr += sizeof(ib_uint32_t);
2669 
2670 		cfg_index->m_page_no = mach_read_from_4(ptr);
2671 		ptr += sizeof(ib_uint32_t);
2672 
2673 		cfg_index->m_type = mach_read_from_4(ptr);
2674 		ptr += sizeof(ib_uint32_t);
2675 
2676 		cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2677 		if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2678 			ut_ad(0);
2679 			/* Overflow. Pretend that the clustered index
2680 			has a variable-length PRIMARY KEY. */
2681 			cfg_index->m_trx_id_offset = 0;
2682 		}
2683 		ptr += sizeof(ib_uint32_t);
2684 
2685 		cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2686 		ptr += sizeof(ib_uint32_t);
2687 
2688 		cfg_index->m_n_uniq = mach_read_from_4(ptr);
2689 		ptr += sizeof(ib_uint32_t);
2690 
2691 		cfg_index->m_n_nullable = mach_read_from_4(ptr);
2692 		ptr += sizeof(ib_uint32_t);
2693 
2694 		cfg_index->m_n_fields = mach_read_from_4(ptr);
2695 		ptr += sizeof(ib_uint32_t);
2696 
2697 		/* The NUL byte is included in the name length. */
2698 		ulint	len = mach_read_from_4(ptr);
2699 
2700 		if (len > OS_FILE_MAX_PATH) {
2701 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2702 				ER_INNODB_INDEX_CORRUPT,
2703 				"Index name length (%lu) is too long,"
2704 				" the meta-data is corrupt", len);
2705 
2706 			return(DB_CORRUPTION);
2707 		}
2708 
2709 		cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2710 
2711 		/* Trigger OOM */
2712 		DBUG_EXECUTE_IF(
2713 			"ib_import_OOM_7",
2714 			UT_DELETE_ARRAY(cfg_index->m_name);
2715 			cfg_index->m_name = NULL;
2716 		);
2717 
2718 		if (cfg_index->m_name == NULL) {
2719 			return(DB_OUT_OF_MEMORY);
2720 		}
2721 
2722 		dberr_t	err;
2723 
2724 		err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2725 
2726 		if (err != DB_SUCCESS) {
2727 
2728 			ib_senderrf(
2729 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2730 				errno, strerror(errno),
2731 				"while parsing index name.");
2732 
2733 			return(err);
2734 		}
2735 
2736 		err = row_import_cfg_read_index_fields(
2737 			file, thd, cfg_index, cfg);
2738 
2739 		if (err != DB_SUCCESS) {
2740 			return(err);
2741 		}
2742 
2743 	}
2744 
2745 	return(DB_SUCCESS);
2746 }
2747 
2748 /*****************************************************************//**
2749 Set the index root page number for v1 format.
2750 @return DB_SUCCESS or error code. */
2751 static
2752 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2753 row_import_read_indexes(
2754 /*====================*/
2755 	FILE*		file,		/*!< in: File to read from */
2756 	THD*		thd,		/*!< in: session */
2757 	row_import*	cfg)		/*!< in/out: meta-data read */
2758 {
2759 	byte		row[sizeof(ib_uint32_t)];
2760 
2761 	/* Trigger EOF */
2762 	DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2763 			(void) fseek(file, 0L, SEEK_END););
2764 
2765 	/* Read the number of indexes. */
2766 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2767 		ib_senderrf(
2768 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2769 			errno, strerror(errno),
2770 			"while reading number of indexes.");
2771 
2772 		return(DB_IO_ERROR);
2773 	}
2774 
2775 	cfg->m_n_indexes = mach_read_from_4(row);
2776 
2777 	if (cfg->m_n_indexes == 0) {
2778 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2779 			"Number of indexes in meta-data file is 0");
2780 
2781 		return(DB_CORRUPTION);
2782 
2783 	} else if (cfg->m_n_indexes > 1024) {
2784 		// FIXME: What is the upper limit? */
2785 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2786 			"Number of indexes in meta-data file is too high: %lu",
2787 			(ulong) cfg->m_n_indexes);
2788 		cfg->m_n_indexes = 0;
2789 
2790 		return(DB_CORRUPTION);
2791 	}
2792 
2793 	return(row_import_read_index_data(file, thd, cfg));
2794 }
2795 
2796 /*********************************************************************//**
2797 Read the meta data (table columns) config file. Deserialise the contents of
2798 dict_col_t structure, along with the column name. */
2799 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2800 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2801 row_import_read_columns(
2802 /*====================*/
2803 	FILE*			file,	/*!< in: file to write to */
2804 	THD*			thd,	/*!< in/out: session */
2805 	row_import*		cfg)	/*!< in/out: meta-data read */
2806 {
2807 	dict_col_t*		col;
2808 	byte			row[sizeof(ib_uint32_t) * 8];
2809 
2810 	/* FIXME: What should the upper limit be? */
2811 	ut_a(cfg->m_n_cols > 0);
2812 	ut_a(cfg->m_n_cols < 1024);
2813 
2814 	cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2815 
2816 	/* Trigger OOM */
2817 	DBUG_EXECUTE_IF(
2818 		"ib_import_OOM_8",
2819 		UT_DELETE_ARRAY(cfg->m_cols);
2820 		cfg->m_cols = NULL;
2821 	);
2822 
2823 	if (cfg->m_cols == NULL) {
2824 		return(DB_OUT_OF_MEMORY);
2825 	}
2826 
2827 	cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2828 
2829 	/* Trigger OOM */
2830 	DBUG_EXECUTE_IF(
2831 		"ib_import_OOM_9",
2832 		UT_DELETE_ARRAY(cfg->m_col_names);
2833 		cfg->m_col_names = NULL;
2834 	);
2835 
2836 	if (cfg->m_col_names == NULL) {
2837 		return(DB_OUT_OF_MEMORY);
2838 	}
2839 
2840 	memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2841 	memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2842 
2843 	col = cfg->m_cols;
2844 
2845 	for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2846 		byte*		ptr = row;
2847 
2848 		/* Trigger EOF */
2849 		DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2850 				(void) fseek(file, 0L, SEEK_END););
2851 
2852 		if (fread(row, 1,  sizeof(row), file) != sizeof(row)) {
2853 			ib_senderrf(
2854 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2855 				errno, strerror(errno),
2856 				"while reading table column meta-data.");
2857 
2858 			return(DB_IO_ERROR);
2859 		}
2860 
2861 		col->prtype = mach_read_from_4(ptr);
2862 		ptr += sizeof(ib_uint32_t);
2863 
2864 		col->mtype = mach_read_from_4(ptr);
2865 		ptr += sizeof(ib_uint32_t);
2866 
2867 		col->len = mach_read_from_4(ptr);
2868 		ptr += sizeof(ib_uint32_t);
2869 
2870 		col->mbminmaxlen = mach_read_from_4(ptr);
2871 		ptr += sizeof(ib_uint32_t);
2872 
2873 		col->ind = mach_read_from_4(ptr);
2874 		ptr += sizeof(ib_uint32_t);
2875 
2876 		col->ord_part = mach_read_from_4(ptr);
2877 		ptr += sizeof(ib_uint32_t);
2878 
2879 		col->max_prefix = mach_read_from_4(ptr);
2880 		ptr += sizeof(ib_uint32_t);
2881 
2882 		/* Read in the column name as [len, byte array]. The len
2883 		includes the NUL byte. */
2884 
2885 		ulint		len = mach_read_from_4(ptr);
2886 
2887 		/* FIXME: What is the maximum column name length? */
2888 		if (len == 0 || len > 128) {
2889 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2890 				ER_IO_READ_ERROR,
2891 				"Column name length %lu, is invalid",
2892 				(ulong) len);
2893 
2894 			return(DB_CORRUPTION);
2895 		}
2896 
2897 		cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2898 
2899 		/* Trigger OOM */
2900 		DBUG_EXECUTE_IF(
2901 			"ib_import_OOM_10",
2902 			UT_DELETE_ARRAY(cfg->m_col_names[i]);
2903 			cfg->m_col_names[i] = NULL;
2904 		);
2905 
2906 		if (cfg->m_col_names[i] == NULL) {
2907 			return(DB_OUT_OF_MEMORY);
2908 		}
2909 
2910 		dberr_t	err;
2911 
2912 		err = row_import_cfg_read_string(
2913 			file, cfg->m_col_names[i], len);
2914 
2915 		if (err != DB_SUCCESS) {
2916 
2917 			ib_senderrf(
2918 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2919 				errno, strerror(errno),
2920 				"while parsing table column name.");
2921 
2922 			return(err);
2923 		}
2924 	}
2925 
2926 	return(DB_SUCCESS);
2927 }
2928 
2929 /*****************************************************************//**
2930 Read the contents of the <tablespace>.cfg file.
2931 @return DB_SUCCESS or error code. */
2932 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2933 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2934 row_import_read_v1(
2935 /*===============*/
2936 	FILE*		file,		/*!< in: File to read from */
2937 	THD*		thd,		/*!< in: session */
2938 	row_import*	cfg)		/*!< out: meta data */
2939 {
2940 	byte		value[sizeof(ib_uint32_t)];
2941 
2942 	/* Trigger EOF */
2943 	DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2944 			(void) fseek(file, 0L, SEEK_END););
2945 
2946 	/* Read the hostname where the tablespace was exported. */
2947 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2948 		ib_senderrf(
2949 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2950 			errno, strerror(errno),
2951 			"while reading meta-data export hostname length.");
2952 
2953 		return(DB_IO_ERROR);
2954 	}
2955 
2956 	ulint	len = mach_read_from_4(value);
2957 
2958 	/* NUL byte is part of name length. */
2959 	cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2960 
2961 	/* Trigger OOM */
2962 	DBUG_EXECUTE_IF(
2963 		"ib_import_OOM_1",
2964 		UT_DELETE_ARRAY(cfg->m_hostname);
2965 		cfg->m_hostname = NULL;
2966 	);
2967 
2968 	if (cfg->m_hostname == NULL) {
2969 		return(DB_OUT_OF_MEMORY);
2970 	}
2971 
2972 	dberr_t	err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2973 
2974 	if (err != DB_SUCCESS) {
2975 
2976 		ib_senderrf(
2977 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2978 			errno, strerror(errno),
2979 			"while parsing export hostname.");
2980 
2981 		return(err);
2982 	}
2983 
2984 	/* Trigger EOF */
2985 	DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2986 			(void) fseek(file, 0L, SEEK_END););
2987 
2988 	/* Read the table name of tablespace that was exported. */
2989 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2990 		ib_senderrf(
2991 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2992 			errno, strerror(errno),
2993 			"while reading meta-data table name length.");
2994 
2995 		return(DB_IO_ERROR);
2996 	}
2997 
2998 	len = mach_read_from_4(value);
2999 
3000 	/* NUL byte is part of name length. */
3001 	cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
3002 
3003 	/* Trigger OOM */
3004 	DBUG_EXECUTE_IF(
3005 		"ib_import_OOM_2",
3006 		UT_DELETE_ARRAY(cfg->m_table_name);
3007 		cfg->m_table_name = NULL;
3008 	);
3009 
3010 	if (cfg->m_table_name == NULL) {
3011 		return(DB_OUT_OF_MEMORY);
3012 	}
3013 
3014 	err = row_import_cfg_read_string(file, cfg->m_table_name, len);
3015 
3016 	if (err != DB_SUCCESS) {
3017 		ib_senderrf(
3018 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3019 			errno, strerror(errno),
3020 			"while parsing table name.");
3021 
3022 		return(err);
3023 	}
3024 
3025 	ib::info() << "Importing tablespace for table '" << cfg->m_table_name
3026 		<< "' that was exported from host '" << cfg->m_hostname << "'";
3027 
3028 	byte		row[sizeof(ib_uint32_t) * 3];
3029 
3030 	/* Trigger EOF */
3031 	DBUG_EXECUTE_IF("ib_import_io_read_error_7",
3032 			(void) fseek(file, 0L, SEEK_END););
3033 
3034 	/* Read the autoinc value. */
3035 	if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
3036 		ib_senderrf(
3037 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3038 			errno, strerror(errno),
3039 			"while reading autoinc value.");
3040 
3041 		return(DB_IO_ERROR);
3042 	}
3043 
3044 	cfg->m_autoinc = mach_read_from_8(row);
3045 
3046 	/* Trigger EOF */
3047 	DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3048 			(void) fseek(file, 0L, SEEK_END););
3049 
3050 	/* Read the tablespace page size. */
3051 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3052 		ib_senderrf(
3053 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3054 			errno, strerror(errno),
3055 			"while reading meta-data header.");
3056 
3057 		return(DB_IO_ERROR);
3058 	}
3059 
3060 	byte*		ptr = row;
3061 
3062 	const ulint	logical_page_size = mach_read_from_4(ptr);
3063 	ptr += sizeof(ib_uint32_t);
3064 
3065 	if (logical_page_size != univ_page_size.logical()) {
3066 
3067 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3068 			"Tablespace to be imported has a different"
3069 			" page size than this server. Server page size"
3070 			" is " ULINTPF ", whereas tablespace page size"
3071 			" is " ULINTPF,
3072 			univ_page_size.logical(),
3073 			logical_page_size);
3074 
3075 		return(DB_ERROR);
3076 	}
3077 
3078 	cfg->m_flags = mach_read_from_4(ptr);
3079 	ptr += sizeof(ib_uint32_t);
3080 
3081 	cfg->m_page_size.copy_from(dict_tf_get_page_size(cfg->m_flags));
3082 
3083 	ut_a(logical_page_size == cfg->m_page_size.logical());
3084 
3085 	cfg->m_n_cols = mach_read_from_4(ptr);
3086 
3087 	if (!dict_tf_is_valid(cfg->m_flags)) {
3088 
3089 		return(DB_CORRUPTION);
3090 
3091 	} else if ((err = row_import_read_columns(file, thd, cfg))
3092 		   != DB_SUCCESS) {
3093 
3094 		return(err);
3095 
3096 	} else  if ((err = row_import_read_indexes(file, thd, cfg))
3097 		   != DB_SUCCESS) {
3098 
3099 		return(err);
3100 	}
3101 
3102 	ut_a(err == DB_SUCCESS);
3103 	return(err);
3104 }
3105 
3106 /**
3107 Read the contents of the <tablespace>.cfg file.
3108 @return DB_SUCCESS or error code. */
3109 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3110 dberr_t
row_import_read_meta_data(dict_table_t * table,FILE * file,THD * thd,row_import & cfg)3111 row_import_read_meta_data(
3112 /*======================*/
3113 	dict_table_t*	table,		/*!< in: table */
3114 	FILE*		file,		/*!< in: File to read from */
3115 	THD*		thd,		/*!< in: session */
3116 	row_import&	cfg)		/*!< out: contents of the .cfg file */
3117 {
3118 	byte		row[sizeof(ib_uint32_t)];
3119 
3120 	/* Trigger EOF */
3121 	DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3122 			(void) fseek(file, 0L, SEEK_END););
3123 
3124 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3125 		ib_senderrf(
3126 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3127 			errno, strerror(errno),
3128 			"while reading meta-data version.");
3129 
3130 		return(DB_IO_ERROR);
3131 	}
3132 
3133 	cfg.m_version = mach_read_from_4(row);
3134 
3135 	/* Check the version number. */
3136 	switch (cfg.m_version) {
3137 	case IB_EXPORT_CFG_VERSION_V1:
3138 
3139 		return(row_import_read_v1(file, thd, &cfg));
3140 	default:
3141 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3142 			"Unsupported meta-data version number (%lu),"
3143 			" file ignored", (ulong) cfg.m_version);
3144 	}
3145 
3146 	return(DB_ERROR);
3147 }
3148 
3149 /**
3150 Read the contents of the <tablename>.cfg file.
3151 @return DB_SUCCESS or error code. */
3152 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3153 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3154 row_import_read_cfg(
3155 /*================*/
3156 	dict_table_t*	table,	/*!< in: table */
3157 	THD*		thd,	/*!< in: session */
3158 	row_import&	cfg)	/*!< out: contents of the .cfg file */
3159 {
3160 	dberr_t		err;
3161 	char		name[OS_FILE_MAX_PATH];
3162 
3163 	cfg.m_table = table;
3164 
3165 	srv_get_meta_data_filename(table, name, sizeof(name));
3166 
3167 	FILE*	file = fopen(name, "rb");
3168 
3169 	if (file == NULL) {
3170 		char	msg[BUFSIZ];
3171 
3172 		ut_snprintf(msg, sizeof(msg),
3173 			    "Error opening '%s', will attempt to import"
3174 			    " without schema verification", name);
3175 
3176 		ib_senderrf(
3177 			thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3178 			errno, strerror(errno), msg);
3179 
3180 		cfg.m_missing = true;
3181 
3182 		err = DB_FAIL;
3183 	} else {
3184 
3185 		cfg.m_missing = false;
3186 
3187 		err = row_import_read_meta_data(table, file, thd, cfg);
3188 		fclose(file);
3189 	}
3190 
3191 	return(err);
3192 }
3193 
3194 /** Read the contents of the <tablespace>.cfp file.
3195 @param[in]	table		table
3196 @param[in]	file		file to read from
3197 @param[in]	thd		session
3198 @param[in]	cfp		contents of the .cfp file
3199 @return DB_SUCCESS or error code. */
3200 static
3201 dberr_t
row_import_read_encryption_data(dict_table_t * table,FILE * file,THD * thd)3202 row_import_read_encryption_data(
3203 	dict_table_t*	table,
3204 	FILE*		file,
3205 	THD*		thd)
3206 {
3207 	byte		row[sizeof(ib_uint32_t)];
3208 	ulint		key_size;
3209 	byte		transfer_key[ENCRYPTION_KEY_LEN];
3210 	byte		encryption_key[ENCRYPTION_KEY_LEN];
3211 	byte		encryption_iv[ENCRYPTION_KEY_LEN];
3212 	lint		elen;
3213 
3214 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3215 		ib_senderrf(
3216 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3217 			errno, strerror(errno),
3218 			"while reading encrypton key size.");
3219 
3220 		return(DB_IO_ERROR);
3221 	}
3222 
3223 	key_size = mach_read_from_4(row);
3224 	if (key_size != ENCRYPTION_KEY_LEN) {
3225 		ib_senderrf(
3226 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3227 			errno, strerror(errno),
3228 			"while parsing encryption key size.");
3229 
3230 		return(DB_IO_ERROR);
3231 	}
3232 
3233 	/* Read the transfer key. */
3234 	if (fread(transfer_key, 1, ENCRYPTION_KEY_LEN, file)
3235 		!= ENCRYPTION_KEY_LEN) {
3236 		ib_senderrf(
3237 			thd, IB_LOG_LEVEL_WARN, ER_IO_WRITE_ERROR,
3238 			errno, strerror(errno),
3239 			"while reading tranfer key.");
3240 
3241 		return(DB_IO_ERROR);
3242 	}
3243 
3244 	/* Read the encrypted key. */
3245 	if (fread(encryption_key, 1, ENCRYPTION_KEY_LEN, file)
3246 		!= ENCRYPTION_KEY_LEN) {
3247 		ib_senderrf(
3248 			thd, IB_LOG_LEVEL_WARN, ER_IO_WRITE_ERROR,
3249 			errno, strerror(errno),
3250 			"while reading encryption key.");
3251 
3252 		return(DB_IO_ERROR);
3253 	}
3254 
3255 	/* Read the encrypted iv. */
3256 	if (fread(encryption_iv, 1, ENCRYPTION_KEY_LEN, file)
3257 		!= ENCRYPTION_KEY_LEN) {
3258 		ib_senderrf(
3259 			thd, IB_LOG_LEVEL_WARN, ER_IO_WRITE_ERROR,
3260 			errno, strerror(errno),
3261 			"while reading encryption iv.");
3262 
3263 		return(DB_IO_ERROR);
3264 	}
3265 
3266 	table->encryption_key =
3267 		static_cast<byte*>(mem_heap_alloc(table->heap,
3268 						  ENCRYPTION_KEY_LEN));
3269 
3270 	table->encryption_iv =
3271 		static_cast<byte*>(mem_heap_alloc(table->heap,
3272 						  ENCRYPTION_KEY_LEN));
3273 	/* Decrypt tablespace key and iv. */
3274 	elen = my_aes_decrypt(
3275 		encryption_key,
3276 		ENCRYPTION_KEY_LEN,
3277 		table->encryption_key,
3278 		transfer_key,
3279 		ENCRYPTION_KEY_LEN,
3280 		my_aes_256_ecb, NULL, false);
3281 
3282 	if (elen == MY_AES_BAD_DATA) {
3283 		ib_senderrf(
3284 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3285 			errno, strerror(errno),
3286 			"while decrypt encryption key.");
3287 
3288 		return(DB_IO_ERROR);
3289 	}
3290 
3291 	elen = my_aes_decrypt(
3292 		encryption_iv,
3293 		ENCRYPTION_KEY_LEN,
3294 		table->encryption_iv,
3295 		transfer_key,
3296 		ENCRYPTION_KEY_LEN,
3297 		my_aes_256_ecb, NULL, false);
3298 
3299 	if (elen == MY_AES_BAD_DATA) {
3300 		ib_senderrf(
3301 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3302 			errno, strerror(errno),
3303 			"while decrypt encryption iv.");
3304 
3305 		return(DB_IO_ERROR);
3306 	}
3307 
3308 	return(DB_SUCCESS);
3309 }
3310 
3311 /** Read the contents of the <tablename>.cfp file.
3312 @param[in]	table		table
3313 @param[in]	thd		session
3314 @param[in]	cfp		contents of the .cfp file
3315 @return DB_SUCCESS or error code. */
3316 static
3317 dberr_t
row_import_read_cfp(dict_table_t * table,THD * thd,row_import & import)3318 row_import_read_cfp(
3319 	dict_table_t*	table,
3320 	THD*		thd,
3321 	row_import&	import)
3322 {
3323 	dberr_t		err;
3324 	char		name[OS_FILE_MAX_PATH];
3325 
3326 	/* Clear table encryption information. */
3327 	table->encryption_key = NULL;
3328 	table->encryption_iv  = NULL;
3329 
3330 	srv_get_encryption_data_filename(table, name, sizeof(name));
3331 
3332 	FILE*	file = fopen(name, "rb");
3333 
3334 	if (file == NULL) {
3335 		/* If there's no cfp file, we assume it's not an
3336 		encrpyted table. return directly. */
3337 
3338 		import.m_cfp_missing = true;
3339 
3340 		err = DB_SUCCESS;
3341 	} else {
3342 
3343 		import.m_cfp_missing = false;
3344 
3345 		err = row_import_read_encryption_data(table, file,
3346 						      thd);
3347 		fclose(file);
3348 	}
3349 
3350 	return(err);
3351 }
3352 
3353 /*****************************************************************//**
3354 Update the <space, root page> of a table's indexes from the values
3355 in the data dictionary.
3356 @return DB_SUCCESS or error code */
3357 dberr_t
row_import_update_index_root(trx_t * trx,const dict_table_t * table,bool reset,bool dict_locked)3358 row_import_update_index_root(
3359 /*=========================*/
3360 	trx_t*			trx,		/*!< in/out: transaction that
3361 						covers the update */
3362 	const dict_table_t*	table,		/*!< in: Table for which we want
3363 						to set the root page_no */
3364 	bool			reset,		/*!< in: if true then set to
3365 						FIL_NUL */
3366 	bool			dict_locked)	/*!< in: Set to true if the
3367 						caller already owns the
3368 						dict_sys_t:: mutex. */
3369 
3370 {
3371 	const dict_index_t*	index;
3372 	que_t*			graph = 0;
3373 	dberr_t			err = DB_SUCCESS;
3374 
3375 	static const char	sql[] = {
3376 		"PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3377 		"BEGIN\n"
3378 		"UPDATE SYS_INDEXES\n"
3379 		"SET SPACE = :space,\n"
3380 		"    PAGE_NO = :page,\n"
3381 		"    TYPE = :type\n"
3382 		"WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3383 		"END;\n"};
3384 
3385 	if (!dict_locked) {
3386 		mutex_enter(&dict_sys->mutex);
3387 	}
3388 
3389 	for (index = dict_table_get_first_index(table);
3390 	     index != 0;
3391 	     index = dict_table_get_next_index(index)) {
3392 
3393 		pars_info_t*	info;
3394 		ib_uint32_t	page;
3395 		ib_uint32_t	space;
3396 		ib_uint32_t	type;
3397 		index_id_t	index_id;
3398 		table_id_t	table_id;
3399 
3400 		info = (graph != 0) ? graph->info : pars_info_create();
3401 
3402 		mach_write_to_4(
3403 			reinterpret_cast<byte*>(&type),
3404 			index->type);
3405 
3406 		mach_write_to_4(
3407 			reinterpret_cast<byte*>(&page),
3408 			reset ? FIL_NULL : index->page);
3409 
3410 		mach_write_to_4(
3411 			reinterpret_cast<byte*>(&space),
3412 			reset ? FIL_NULL : index->space);
3413 
3414 		mach_write_to_8(
3415 			reinterpret_cast<byte*>(&index_id),
3416 			index->id);
3417 
3418 		mach_write_to_8(
3419 			reinterpret_cast<byte*>(&table_id),
3420 			table->id);
3421 
3422 		/* If we set the corrupt bit during the IMPORT phase then
3423 		we need to update the system tables. */
3424 		pars_info_bind_int4_literal(info, "type", &type);
3425 		pars_info_bind_int4_literal(info, "space", &space);
3426 		pars_info_bind_int4_literal(info, "page", &page);
3427 		pars_info_bind_ull_literal(info, "index_id", &index_id);
3428 		pars_info_bind_ull_literal(info, "table_id", &table_id);
3429 
3430 		if (graph == 0) {
3431 			graph = pars_sql(info, sql);
3432 			ut_a(graph);
3433 			graph->trx = trx;
3434 		}
3435 
3436 		que_thr_t*	thr;
3437 
3438 		graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3439 
3440 		ut_a(thr = que_fork_start_command(graph));
3441 
3442 		que_run_threads(thr);
3443 
3444 		DBUG_EXECUTE_IF("ib_import_internal_error",
3445 				trx->error_state = DB_ERROR;);
3446 
3447 		err = trx->error_state;
3448 
3449 		if (err != DB_SUCCESS) {
3450 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3451 				ER_INTERNAL_ERROR,
3452 				"While updating the <space, root page"
3453 				" number> of index %s - %s",
3454 				index->name(), ut_strerr(err));
3455 
3456 			break;
3457 		}
3458 	}
3459 
3460 	que_graph_free(graph);
3461 
3462 	if (!dict_locked) {
3463 		mutex_exit(&dict_sys->mutex);
3464 	}
3465 
3466 	return(err);
3467 }
3468 
3469 /** Callback arg for row_import_set_discarded. */
3470 struct discard_t {
3471 	ib_uint32_t	flags2;			/*!< Value read from column */
3472 	bool		state;			/*!< New state of the flag */
3473 	ulint		n_recs;			/*!< Number of recs processed */
3474 };
3475 
3476 /******************************************************************//**
3477 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3478 SYS_TABLES. The flags is stored in MIX_LEN column.
3479 @return FALSE if all OK */
3480 static
3481 ibool
row_import_set_discarded(void * row,void * user_arg)3482 row_import_set_discarded(
3483 /*=====================*/
3484 	void*		row,			/*!< in: sel_node_t* */
3485 	void*		user_arg)		/*!< in: bool set/unset flag */
3486 {
3487 	sel_node_t*	node = static_cast<sel_node_t*>(row);
3488 	discard_t*	discard = static_cast<discard_t*>(user_arg);
3489 	dfield_t*	dfield = que_node_get_val(node->select_list);
3490 	dtype_t*	type = dfield_get_type(dfield);
3491 	ulint		len = dfield_get_len(dfield);
3492 
3493 	ut_a(dtype_get_mtype(type) == DATA_INT);
3494 	ut_a(len == sizeof(ib_uint32_t));
3495 
3496 	ulint	flags2 = mach_read_from_4(
3497 		static_cast<byte*>(dfield_get_data(dfield)));
3498 
3499 	if (discard->state) {
3500 		flags2 |= DICT_TF2_DISCARDED;
3501 	} else {
3502 		flags2 &= ~DICT_TF2_DISCARDED;
3503 	}
3504 
3505 	mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3506 
3507 	++discard->n_recs;
3508 
3509 	/* There should be at most one matching record. */
3510 	ut_a(discard->n_recs == 1);
3511 
3512 	return(FALSE);
3513 }
3514 
3515 /*****************************************************************//**
3516 Update the DICT_TF2_DISCARDED flag in SYS_TABLES.
3517 @return DB_SUCCESS or error code. */
3518 dberr_t
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded,bool dict_locked)3519 row_import_update_discarded_flag(
3520 /*=============================*/
3521 	trx_t*		trx,		/*!< in/out: transaction that
3522 					covers the update */
3523 	table_id_t	table_id,	/*!< in: Table for which we want
3524 					to set the root table->flags2 */
3525 	bool		discarded,	/*!< in: set MIX_LEN column bit
3526 					to discarded, if true */
3527 	bool		dict_locked)	/*!< in: set to true if the
3528 					caller already owns the
3529 					dict_sys_t:: mutex. */
3530 
3531 {
3532 	pars_info_t*		info;
3533 	discard_t		discard;
3534 
3535 	static const char	sql[] =
3536 		"PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3537 		"DECLARE FUNCTION my_func;\n"
3538 		"DECLARE CURSOR c IS\n"
3539 		" SELECT MIX_LEN"
3540 		" FROM SYS_TABLES"
3541 		" WHERE ID = :table_id FOR UPDATE;"
3542 		"\n"
3543 		"BEGIN\n"
3544 		"OPEN c;\n"
3545 		"WHILE 1 = 1 LOOP\n"
3546 		"  FETCH c INTO my_func();\n"
3547 		"  IF c % NOTFOUND THEN\n"
3548 		"    EXIT;\n"
3549 		"  END IF;\n"
3550 		"END LOOP;\n"
3551 		"UPDATE SYS_TABLES"
3552 		" SET MIX_LEN = :flags2"
3553 		" WHERE ID = :table_id;\n"
3554 		"CLOSE c;\n"
3555 		"END;\n";
3556 
3557 	discard.n_recs = 0;
3558 	discard.state = discarded;
3559 	discard.flags2 = ULINT32_UNDEFINED;
3560 
3561 	info = pars_info_create();
3562 
3563 	pars_info_add_ull_literal(info, "table_id", table_id);
3564 	pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3565 
3566 	pars_info_bind_function(
3567 		info, "my_func", row_import_set_discarded, &discard);
3568 
3569 	dberr_t	err = que_eval_sql(info, sql, !dict_locked, trx);
3570 
3571 	ut_a(discard.n_recs == 1);
3572 	ut_a(discard.flags2 != ULINT32_UNDEFINED);
3573 
3574 	return(err);
3575 }
3576 
3577 /*****************************************************************//**
3578 Imports a tablespace. The space id in the .ibd file must match the space id
3579 of the table in the data dictionary.
3580 @return error code or DB_SUCCESS */
3581 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)3582 row_import_for_mysql(
3583 /*=================*/
3584 	dict_table_t*	table,		/*!< in/out: table */
3585 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
3586 {
3587 	dberr_t		err;
3588 	trx_t*		trx;
3589 	ib_uint64_t	autoinc = 0;
3590 	char*		filepath = NULL;
3591 
3592 	/* The caller assured that this is not read_only_mode and that no
3593 	temorary tablespace is being imported. */
3594 	ut_ad(!srv_read_only_mode);
3595 	ut_ad(!dict_table_is_temporary(table));
3596 
3597 	ut_a(table->space);
3598 	ut_ad(prebuilt->trx);
3599 	ut_a(table->ibd_file_missing);
3600 
3601 	ibuf_delete_for_discarded_space(table->space);
3602 
3603 	trx_start_if_not_started(prebuilt->trx, true);
3604 
3605 	trx = trx_allocate_for_mysql();
3606 
3607 	/* So that the table is not DROPped during recovery. */
3608 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3609 
3610 	trx_start_if_not_started(trx, true);
3611 
3612 	/* So that we can send error messages to the user. */
3613 	trx->mysql_thd = prebuilt->trx->mysql_thd;
3614 
3615 	/* Ensure that the table will be dropped by trx_rollback_active()
3616 	in case of a crash. */
3617 
3618 	trx->table_id = table->id;
3619 
3620 	/* Assign an undo segment for the transaction, so that the
3621 	transaction will be recovered after a crash. */
3622 
3623 	mutex_enter(&trx->undo_mutex);
3624 
3625 	/* IMPORT tablespace is blocked for temp-tables and so we don't
3626 	need to assign temporary rollback segment for this trx. */
3627 	err = trx_undo_assign_undo(trx, &trx->rsegs.m_redo, TRX_UNDO_UPDATE);
3628 
3629 	mutex_exit(&trx->undo_mutex);
3630 
3631 	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
3632 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
3633 
3634 	if (err != DB_SUCCESS) {
3635 
3636 		return(row_import_cleanup(prebuilt, trx, err));
3637 
3638 	} else if (trx->rsegs.m_redo.update_undo == 0) {
3639 
3640 		err = DB_TOO_MANY_CONCURRENT_TRXS;
3641 		return(row_import_cleanup(prebuilt, trx, err));
3642 	}
3643 
3644 	prebuilt->trx->op_info = "read meta-data file";
3645 
3646 	/* Prevent DDL operations while we are checking. */
3647 
3648 	rw_lock_s_lock_func(dict_operation_lock, 0, __FILE__, __LINE__);
3649 
3650 	row_import	cfg;
3651 	ulint		space_flags = 0;
3652 
3653 	/* Read CFP file */
3654 	if (dict_table_is_encrypted(table)) {
3655 		/* First try to read CFP file here. */
3656 		err = row_import_read_cfp(table, trx->mysql_thd, cfg);
3657 		ut_ad(cfg.m_cfp_missing || err == DB_SUCCESS);
3658 
3659 		if (err != DB_SUCCESS) {
3660 			rw_lock_s_unlock_gen(dict_operation_lock, 0);
3661 			return (row_import_error(prebuilt, trx, err));
3662 		}
3663 
3664 		/* If table is encrypted, but can't find cfp file,
3665                 return error. */
3666 		if (cfg.m_cfp_missing) {
3667 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3668 				ER_TABLE_SCHEMA_MISMATCH,
3669 				"Table is in an encrypted tablespace, but the"
3670 				" encryption meta-data file cannot be found"
3671 				" while importing.");
3672 			err = DB_ERROR;
3673 			rw_lock_s_unlock_gen(dict_operation_lock, 0);
3674 			return (row_import_error(prebuilt, trx, err));
3675 		} else {
3676 			/* If CFP file is read, encryption_key must have been
3677 			populted. */
3678 			ut_ad(table->encryption_key != NULL &&
3679 				table->encryption_iv != NULL);
3680 		}
3681 	}
3682 
3683 	/* Read CFG file */
3684 	err = row_import_read_cfg(table, trx->mysql_thd, cfg);
3685 
3686 	/* Check if the table column definitions match the contents
3687 	of the config file. */
3688 
3689 	if (err == DB_SUCCESS) {
3690 
3691 		/* We have a schema file, try and match it with our
3692 		data dictionary. */
3693 
3694 		err = cfg.match_schema(trx->mysql_thd);
3695 
3696 		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
3697 		B-tree root page numbers in the tablespace. Use the index
3698 		name from the .cfg file to find match. */
3699 
3700 		if (err == DB_SUCCESS) {
3701 			cfg.set_root_by_name();
3702 			autoinc = cfg.m_autoinc;
3703 		}
3704 
3705 		rw_lock_s_unlock_gen(dict_operation_lock, 0);
3706 
3707 		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
3708 				err = DB_TOO_MANY_CONCURRENT_TRXS;);
3709 
3710 	} else if (cfg.m_missing) {
3711 
3712 		rw_lock_s_unlock_gen(dict_operation_lock, 0);
3713 
3714 		/* We don't have a schema file, we will have to discover
3715 		the index root pages from the .ibd file and skip the schema
3716 		matching step. */
3717 
3718 		ut_a(err == DB_FAIL);
3719 
3720 		cfg.m_page_size.copy_from(univ_page_size);
3721 
3722 		FetchIndexRootPages	fetchIndexRootPages(table, trx);
3723 
3724 		err = fil_tablespace_iterate(
3725 			table, IO_BUFFER_SIZE(
3726 				cfg.m_page_size.physical(),
3727 				cfg.m_page_size.physical()),
3728 			fetchIndexRootPages);
3729 
3730 		if (err == DB_SUCCESS) {
3731 
3732 			err = fetchIndexRootPages.build_row_import(&cfg);
3733 
3734 			/* Update index->page and SYS_INDEXES.PAGE_NO
3735 			to match the B-tree root page numbers in the
3736 			tablespace. */
3737 
3738 			if (err == DB_SUCCESS) {
3739 				err = cfg.set_root_by_heuristic();
3740 			}
3741 		}
3742 
3743 		space_flags = fetchIndexRootPages.get_space_flags();
3744 
3745 		/* If the fsp flag is set for data_dir, but table flag is not
3746 		set for data_dir or vice versa then return error. */
3747 		if (err == DB_SUCCESS
3748 			&& FSP_FLAGS_HAS_DATA_DIR(space_flags) !=
3749 			DICT_TF_HAS_DATA_DIR(table->flags)) {
3750 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3751 				ER_TABLE_SCHEMA_MISMATCH,
3752 				"Table location flags do not match. "
3753 				"The source table %s a DATA DIRECTORY "
3754 				"but the destination table %s.",
3755 				(FSP_FLAGS_HAS_DATA_DIR(space_flags) ? "uses"
3756 				: "does not use"),
3757 				(DICT_TF_HAS_DATA_DIR(table->flags) ? "does"
3758 				: "does not"));
3759 			err = DB_ERROR;
3760 			return(row_import_error(prebuilt, trx, err));
3761 		}
3762 
3763 	} else {
3764 		rw_lock_s_unlock_gen(dict_operation_lock, 0);
3765 	}
3766 
3767 	if (err != DB_SUCCESS) {
3768 		if (err == DB_IO_NO_ENCRYPT_TABLESPACE) {
3769 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3770 				ER_TABLE_SCHEMA_MISMATCH,
3771 				"Encryption attribute in the file does not"
3772 				" match the dictionary.");
3773 		}
3774 		return(row_import_error(prebuilt, trx, err));
3775 	}
3776 
3777 	/* At this point, all required information has been collected for
3778 	IMPORT. */
3779 
3780 	prebuilt->trx->op_info = "importing tablespace";
3781 
3782 	ib::info() << "Phase I - Update all pages";
3783 
3784 	/* Iterate over all the pages and do the sanity checking and
3785 	the conversion required to import the tablespace. */
3786 
3787 	PageConverter	converter(&cfg, trx);
3788 
3789 	/* Set the IO buffer size in pages. */
3790 
3791 	err = fil_tablespace_iterate(
3792 		table, IO_BUFFER_SIZE(
3793 			cfg.m_page_size.physical(),
3794 			cfg.m_page_size.physical()), converter);
3795 
3796 	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
3797 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
3798 
3799 	if (err == DB_IO_NO_ENCRYPT_TABLESPACE) {
3800 		ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3801 			ER_TABLE_SCHEMA_MISMATCH,
3802 			"Encryption attribute in the file does not match the"
3803 			" dictionary.");
3804 
3805 		return(row_import_cleanup(prebuilt, trx, err));
3806 	}
3807 
3808 	if (err != DB_SUCCESS) {
3809 		char	table_name[MAX_FULL_NAME_LEN + 1];
3810 
3811 		innobase_format_name(
3812 			table_name, sizeof(table_name),
3813 			table->name.m_name);
3814 
3815 		ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3816 			ER_INTERNAL_ERROR,
3817 			"Cannot reset LSNs in table %s : %s",
3818 			table_name, ut_strerr(err));
3819 
3820 		return(row_import_cleanup(prebuilt, trx, err));
3821 	}
3822 
3823 	row_mysql_lock_data_dictionary(trx);
3824 
3825 	/* If the table is stored in a remote tablespace, we need to
3826 	determine that filepath from the link file and system tables.
3827 	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
3828 	dict_get_and_save_data_dir_path(table, true);
3829 
3830 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3831 		ut_a(table->data_dir_path);
3832 
3833 		filepath = fil_make_filepath(
3834 			table->data_dir_path, table->name.m_name, IBD, true);
3835 	} else {
3836 		filepath = fil_make_filepath(
3837 			NULL, table->name.m_name, IBD, false);
3838 	}
3839 
3840 	DBUG_EXECUTE_IF(
3841 		"ib_import_OOM_15",
3842 		ut_free(filepath);
3843 		filepath = NULL;
3844 	);
3845 
3846 	if (filepath == NULL) {
3847 		row_mysql_unlock_data_dictionary(trx);
3848 		return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
3849 	}
3850 
3851 	/* Open the tablespace so that we can access via the buffer pool.
3852 	We set the 2nd param (fix_dict = true) here because we already
3853 	have an x-lock on dict_operation_lock and dict_sys->mutex.
3854 	The tablespace is initially opened as a temporary one, because
3855 	we will not be writing any redo log for it before we have invoked
3856 	fil_space_set_imported() to declare it a persistent tablespace. */
3857 
3858 	ulint	fsp_flags = dict_tf_to_fsp_flags(table->flags, false);
3859 	if (table->encryption_key != NULL) {
3860 		fsp_flags |= FSP_FLAGS_MASK_ENCRYPTION;
3861 	}
3862 
3863 	err = fil_ibd_open(
3864 		true, true, FIL_TYPE_IMPORT, table->space,
3865 		fsp_flags, table->name.m_name, filepath);
3866 
3867 	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
3868 			err = DB_TABLESPACE_NOT_FOUND;);
3869 
3870 	if (err != DB_SUCCESS) {
3871 		row_mysql_unlock_data_dictionary(trx);
3872 
3873 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3874 			ER_FILE_NOT_FOUND,
3875 			filepath, err, ut_strerr(err));
3876 
3877 		ut_free(filepath);
3878 
3879 		return(row_import_cleanup(prebuilt, trx, err));
3880 	}
3881 
3882 	/* For encrypted table, set encryption information. */
3883 	if (dict_table_is_encrypted(table)) {
3884 
3885 		err = fil_set_encryption(table->space,
3886 					 Encryption::AES,
3887 					 table->encryption_key,
3888 					 table->encryption_iv);
3889 	}
3890 
3891 	row_mysql_unlock_data_dictionary(trx);
3892 
3893 	ut_free(filepath);
3894 
3895 	err = ibuf_check_bitmap_on_import(trx, table->space);
3896 
3897 	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
3898 
3899 	if (err != DB_SUCCESS) {
3900 		return(row_import_cleanup(prebuilt, trx, err));
3901 	}
3902 
3903 	/* The first index must always be the clustered index. */
3904 
3905 	dict_index_t*	index = dict_table_get_first_index(table);
3906 
3907 	if (!dict_index_is_clust(index)) {
3908 		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
3909 	}
3910 
3911 	/* Update the Btree segment headers for index node and
3912 	leaf nodes in the root page. Set the new space id. */
3913 
3914 	err = btr_root_adjust_on_import(index);
3915 
3916 	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
3917 			err = DB_CORRUPTION;);
3918 
3919 	if (err != DB_SUCCESS) {
3920 		return(row_import_error(prebuilt, trx, err));
3921 	}
3922 	DBUG_EXECUTE_IF("ib_import_page_corrupt",
3923 			row_index_t* i_index = cfg.get_index(index->name);
3924 			++i_index->m_stats.m_n_purge_failed;);
3925 	if (err != DB_SUCCESS) {
3926 		return(row_import_error(prebuilt, trx, err));
3927 	} else if (cfg.requires_purge(index->name)) {
3928 
3929 		/* Purge any delete-marked records that couldn't be
3930 		purged during the page conversion phase from the
3931 		cluster index. */
3932 
3933 		IndexPurge	purge(trx, index);
3934 
3935 		trx->op_info = "cluster: purging delete marked records";
3936 
3937 		err = purge.garbage_collect();
3938 
3939 		trx->op_info = "";
3940 	}
3941 
3942 	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
3943 
3944 	if (err != DB_SUCCESS) {
3945 		return(row_import_error(prebuilt, trx, err));
3946 	}
3947 
3948 	/* For secondary indexes, purge any records that couldn't be purged
3949 	during the page conversion phase. */
3950 
3951 	err = row_import_adjust_root_pages_of_secondary_indexes(
3952 		prebuilt, trx, table, cfg);
3953 
3954 	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
3955 			err = DB_CORRUPTION;);
3956 
3957 	if (err != DB_SUCCESS) {
3958 		return(row_import_error(prebuilt, trx, err));
3959 	}
3960 
3961 	/* Ensure that the next available DB_ROW_ID is not smaller than
3962 	any DB_ROW_ID stored in the table. */
3963 
3964 	if (prebuilt->clust_index_was_generated) {
3965 
3966 		err = row_import_set_sys_max_row_id(prebuilt, table);
3967 
3968 		if (err != DB_SUCCESS) {
3969 			return(row_import_error(prebuilt, trx, err));
3970 		}
3971 	}
3972 
3973 	ib::info() << "Phase III - Flush changes to disk";
3974 
3975 	/* Ensure that all pages dirtied during the IMPORT make it to disk.
3976 	The only dirty pages generated should be from the pessimistic purge
3977 	of delete marked records that couldn't be purged in Phase I. */
3978 
3979 	buf_LRU_flush_or_remove_pages(
3980 		prebuilt->table->space, BUF_REMOVE_FLUSH_WRITE,	trx);
3981 
3982 	if (trx_is_interrupted(trx)) {
3983 		ib::info() << "Phase III - Flush interrupted";
3984 		return(row_import_error(prebuilt, trx, DB_INTERRUPTED));
3985 	}
3986 
3987 	ib::info() << "Phase IV - Flush complete";
3988 	fil_space_set_imported(prebuilt->table->space);
3989 
3990 	if (dict_table_is_encrypted(table)) {
3991 		fil_space_t*	space;
3992 		mtr_t		mtr;
3993 		byte		encrypt_info[ENCRYPTION_INFO_SIZE_V2];
3994 
3995 		mtr_start(&mtr);
3996 
3997 		mtr.set_named_space(table->space);
3998 		space = mtr_x_lock_space(table->space, &mtr);
3999 
4000 		memset(encrypt_info, 0, ENCRYPTION_INFO_SIZE_V2);
4001 
4002 		if (!fsp_header_rotate_encryption(space,
4003 						  encrypt_info,
4004 						  &mtr)) {
4005 			mtr_commit(&mtr);
4006 			return(row_import_cleanup(prebuilt, trx, DB_ERROR));
4007 		}
4008 
4009 		mtr_commit(&mtr);
4010 	}
4011 
4012 	/* The dictionary latches will be released in in row_import_cleanup()
4013 	after the transaction commit, for both success and error. */
4014 
4015 	row_mysql_lock_data_dictionary(trx);
4016 
4017 	/* Update the root pages of the table's indexes. */
4018 	err = row_import_update_index_root(trx, table, false, true);
4019 
4020 	if (err != DB_SUCCESS) {
4021 		return(row_import_error(prebuilt, trx, err));
4022 	}
4023 
4024 	/* Update the table's discarded flag, unset it. */
4025 	err = row_import_update_discarded_flag(trx, table->id, false, true);
4026 
4027 	if (err != DB_SUCCESS) {
4028 		return(row_import_error(prebuilt, trx, err));
4029 	}
4030 
4031 	table->ibd_file_missing = false;
4032 	table->flags2 &= ~DICT_TF2_DISCARDED;
4033 
4034 	/* Set autoinc value read from cfg file. The value is set to zero
4035 	if the cfg file is missing and is initialized later from table
4036 	column value. */
4037 	ib::info() << table->name << " autoinc value set to "
4038 		<< autoinc;
4039 
4040 	dict_table_autoinc_lock(table);
4041 	dict_table_autoinc_initialize(table, autoinc);
4042 	dict_table_autoinc_unlock(table);
4043 
4044 	ut_a(err == DB_SUCCESS);
4045 
4046 	return(row_import_cleanup(prebuilt, trx, err));
4047 }
4048 
4049