1 /*****************************************************************************
2 
3 Copyright (c) 2012, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0import.cc
29 Import a tablespace to a running instance.
30 
31 Created 2012-02-08 by Sunny Bains.
32 *******************************************************/
33 
34 #include "row0import.h"
35 
36 #ifdef UNIV_NONINL
37 #include "row0import.ic"
38 #endif
39 
40 #include "btr0pcur.h"
41 #include "que0que.h"
42 #include "dict0boot.h"
43 #include "ibuf0ibuf.h"
44 #include "pars0pars.h"
45 #include "row0upd.h"
46 #include "row0sel.h"
47 #include "row0mysql.h"
48 #include "srv0start.h"
49 #include "row0quiesce.h"
50 
51 #include <vector>
52 
53 /** The size of the buffer to use for IO. Note: os_file_read() doesn't expect
54 reads to fail. If you set the buffer size to be greater than a multiple of the
55 file size then it will assert. TODO: Fix this limitation of the IO functions.
56 @param n - page size of the tablespace.
57 @retval number of pages */
58 #define IO_BUFFER_SIZE(n)	((1024 * 1024) / n)
59 
60 /** For gathering stats on records during phase I */
61 struct row_stats_t {
62 	ulint		m_n_deleted;		/*!< Number of deleted records
63 						found in the index */
64 
65 	ulint		m_n_purged;		/*!< Number of records purged
66 						optimisatically */
67 
68 	ulint		m_n_rows;		/*!< Number of rows */
69 
70 	ulint		m_n_purge_failed;	/*!< Number of deleted rows
71 						that could not be purged */
72 };
73 
74 /** Index information required by IMPORT. */
75 struct row_index_t {
76 	index_id_t	m_id;			/*!< Index id of the table
77 						in the exporting server */
78 	byte*		m_name;			/*!< Index name */
79 
80 	ulint		m_space;		/*!< Space where it is placed */
81 
82 	ulint		m_page_no;		/*!< Root page number */
83 
84 	ulint		m_type;			/*!< Index type */
85 
86 	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
87 						indexes, offset of transaction
88 						id system column */
89 
90 	ulint		m_n_user_defined_cols;	/*!< User defined columns */
91 
92 	ulint		m_n_uniq;		/*!< Number of columns that can
93 						uniquely identify the row */
94 
95 	ulint		m_n_nullable;		/*!< Number of nullable
96 						columns */
97 
98 	ulint		m_n_fields;		/*!< Total number of fields */
99 
100 	dict_field_t*	m_fields;		/*!< Index fields */
101 
102 	const dict_index_t*
103 			m_srv_index;		/*!< Index instance in the
104 						importing server */
105 
106 	row_stats_t	m_stats;		/*!< Statistics gathered during
107 						the import phase */
108 
109 };
110 
111 /** Meta data required by IMPORT. */
112 struct row_import {
row_importrow_import113 	row_import() UNIV_NOTHROW
114 		:
115 		m_table(),
116 		m_version(),
117 		m_hostname(),
118 		m_table_name(),
119 		m_autoinc(),
120 		m_page_size(),
121 		m_flags(),
122 		m_n_cols(),
123 		m_cols(),
124 		m_col_names(),
125 		m_n_indexes(),
126 		m_indexes(),
127 		m_missing(true) { }
128 
129 	~row_import() UNIV_NOTHROW;
130 
131 	/**
132 	Find the index entry in in the indexes array.
133 	@param name - index name
134 	@return instance if found else 0. */
135 	row_index_t* get_index(const char* name) const UNIV_NOTHROW;
136 
137 	/**
138 	Get the number of rows in the index.
139 	@param name - index name
140 	@return number of rows (doesn't include delete marked rows). */
141 	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;
142 
143 	/**
144 	Find the ordinal value of the column name in the cfg table columns.
145 	@param name - of column to look for.
146 	@return ULINT_UNDEFINED if not found. */
147 	ulint find_col(const char* name) const UNIV_NOTHROW;
148 
149 	/**
150 	Get the number of rows for which purge failed during the convert phase.
151 	@param name - index name
152 	@return number of rows for which purge failed. */
153 	ulint	get_n_purge_failed(const char* name) const UNIV_NOTHROW;
154 
155 	/**
156 	Check if the index is clean. ie. no delete-marked records
157 	@param name - index name
158 	@return true if index needs to be purged. */
requires_purgerow_import159 	bool requires_purge(const char* name) const UNIV_NOTHROW
160 	{
161 		return(get_n_purge_failed(name) > 0);
162 	}
163 
164 	/**
165 	Set the index root <space, pageno> using the index name */
166 	void set_root_by_name() UNIV_NOTHROW;
167 
168 	/**
169 	Set the index root <space, pageno> using a heuristic
170 	@return DB_SUCCESS or error code */
171 	dberr_t set_root_by_heuristic() UNIV_NOTHROW;
172 
173 	/** Check if the index schema that was read from the .cfg file
174 	matches the in memory index definition.
175 	Note: It will update row_import_t::m_srv_index to map the meta-data
176 	read from the .cfg file to the server index instance.
177 	@return DB_SUCCESS or error code. */
178 	dberr_t match_index_columns(
179 		THD*			thd,
180 		const dict_index_t*	index) UNIV_NOTHROW;
181 
182 	/**
183 	Check if the table schema that was read from the .cfg file matches the
184 	in memory table definition.
185 	@param thd - MySQL session variable
186 	@return DB_SUCCESS or error code. */
187 	dberr_t match_table_columns(
188 		THD*			thd) UNIV_NOTHROW;
189 
190 	/**
191 	Check if the table (and index) schema that was read from the .cfg file
192 	matches the in memory table definition.
193 	@param thd - MySQL session variable
194 	@return DB_SUCCESS or error code. */
195 	dberr_t match_schema(
196 		THD*			thd) UNIV_NOTHROW;
197 
198 	dict_table_t*	m_table;		/*!< Table instance */
199 
200 	ulint		m_version;		/*!< Version of config file */
201 
202 	byte*		m_hostname;		/*!< Hostname where the
203 						tablespace was exported */
204 	byte*		m_table_name;		/*!< Exporting instance table
205 						name */
206 
207 	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */
208 
209 	ulint		m_page_size;		/*!< Tablespace page size */
210 
211 	ulint		m_flags;		/*!< Table flags */
212 
213 	ulint		m_n_cols;		/*!< Number of columns in the
214 						meta-data file */
215 
216 	dict_col_t*	m_cols;			/*!< Column data */
217 
218 	byte**		m_col_names;		/*!< Column names, we store the
219 						column naems separately becuase
220 						there is no field to store the
221 						value in dict_col_t */
222 
223 	ulint		m_n_indexes;		/*!< Number of indexes,
224 						including clustered index */
225 
226 	row_index_t*	m_indexes;		/*!< Index meta data */
227 
228 	bool		m_missing;		/*!< true if a .cfg file was
229 						found and was readable */
230 };
231 
232 /** Use the page cursor to iterate over records in a block. */
233 class RecIterator {
234 public:
235 	/**
236 	Default constructor */
RecIterator()237 	RecIterator() UNIV_NOTHROW
238 	{
239 		memset(&m_cur, 0x0, sizeof(m_cur));
240 	}
241 
242 	/**
243 	Position the cursor on the first user record. */
open(buf_block_t * block)244 	void	open(buf_block_t* block) UNIV_NOTHROW
245 	{
246 		page_cur_set_before_first(block, &m_cur);
247 
248 		if (!end()) {
249 			next();
250 		}
251 	}
252 
253 	/**
254 	Move to the next record. */
next()255 	void	next() UNIV_NOTHROW
256 	{
257 		page_cur_move_to_next(&m_cur);
258 	}
259 
260 	/**
261 	@return the current record */
current()262 	rec_t*	current() UNIV_NOTHROW
263 	{
264 		ut_ad(!end());
265 		return(page_cur_get_rec(&m_cur));
266 	}
267 
268 	/**
269 	@return true if cursor is at the end */
end()270 	bool	end() UNIV_NOTHROW
271 	{
272 		return(page_cur_is_after_last(&m_cur) == TRUE);
273 	}
274 
275 	/** Remove the current record
276 	@return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,ulint * offsets)277 	bool remove(
278 		const dict_index_t*	index,
279 		page_zip_des_t*		page_zip,
280 		ulint*			offsets) UNIV_NOTHROW
281 	{
282 		/* We can't end up with an empty page unless it is root. */
283 		if (page_get_n_recs(m_cur.block->frame) <= 1) {
284 			return(false);
285 		}
286 
287 		return(page_delete_rec(index, &m_cur, page_zip, offsets));
288 	}
289 
290 private:
291 	page_cur_t	m_cur;
292 };
293 
294 /** Class that purges delete marked reocords from indexes, both secondary
295 and cluster. It does a pessimistic delete. This should only be done if we
296 couldn't purge the delete marked reocrds during Phase I. */
297 class IndexPurge {
298 public:
299 	/** Constructor
300 	@param trx - the user transaction covering the import tablespace
301 	@param index - to be imported
302 	@param space_id - space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)303 	IndexPurge(
304 		trx_t*		trx,
305 		dict_index_t*	index) UNIV_NOTHROW
306 		:
307 		m_trx(trx),
308 		m_index(index),
309 		m_n_rows(0)
310 	{
311 		ib_logf(IB_LOG_LEVEL_INFO,
312 			"Phase II - Purge records from index %s",
313 			index->name);
314 	}
315 
316 	/** Descructor */
~IndexPurge()317 	~IndexPurge() UNIV_NOTHROW { }
318 
319 	/** Purge delete marked records.
320 	@return DB_SUCCESS or error code. */
321 	dberr_t	garbage_collect() UNIV_NOTHROW;
322 
323 	/** The number of records that are not delete marked.
324 	@return total records in the index after purge */
get_n_rows() const325 	ulint	get_n_rows() const UNIV_NOTHROW
326 	{
327 		return(m_n_rows);
328 	}
329 
330 private:
331 	/**
332 	Begin import, position the cursor on the first record. */
333 	void	open() UNIV_NOTHROW;
334 
335 	/**
336 	Close the persistent curosr and commit the mini-transaction. */
337 	void	close() UNIV_NOTHROW;
338 
339 	/**
340 	Position the cursor on the next record.
341 	@return DB_SUCCESS or error code */
342 	dberr_t	next() UNIV_NOTHROW;
343 
344 	/**
345 	Store the persistent cursor position and reopen the
346 	B-tree cursor in BTR_MODIFY_TREE mode, because the
347 	tree structure may be changed during a pessimistic delete. */
348 	void	purge_pessimistic_delete() UNIV_NOTHROW;
349 
350 	/**
351 	Purge delete-marked records.
352 	@param offsets - current row offsets. */
353 	void	purge() UNIV_NOTHROW;
354 
355 protected:
356 	// Disable copying
357 	IndexPurge();
358 	IndexPurge(const IndexPurge&);
359 	IndexPurge &operator=(const IndexPurge&);
360 
361 private:
362 	trx_t*			m_trx;		/*!< User transaction */
363 	mtr_t			m_mtr;		/*!< Mini-transaction */
364 	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
365 	dict_index_t*		m_index;	/*!< Index to be processed */
366 	ulint			m_n_rows;	/*!< Records in index */
367 };
368 
369 /** Functor that is called for each physical page that is read from the
370 tablespace file.  */
371 class AbstractCallback : public PageCallback {
372 public:
373 	/** Constructor
374 	@param trx - covering transaction */
AbstractCallback(trx_t * trx)375 	AbstractCallback(trx_t* trx)
376 		:
377 		m_trx(trx),
378 		m_space(ULINT_UNDEFINED),
379 		m_xdes(),
380 		m_xdes_page_no(ULINT_UNDEFINED),
381 		m_space_flags(ULINT_UNDEFINED),
382 		m_table_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
383 
384 	/**
385 	Free any extent descriptor instance */
~AbstractCallback()386 	virtual ~AbstractCallback()
387 	{
388 		delete [] m_xdes;
389 	}
390 
391 	/** Determine the page size to use for traversing the tablespace
392 	@param file_size - size of the tablespace file in bytes
393 	@param block - contents of the first page in the tablespace file.
394 	@retval DB_SUCCESS or error code. */
395 	virtual dberr_t init(
396 		os_offset_t		file_size,
397 		const buf_block_t*	block) UNIV_NOTHROW;
398 
399 	/** @return true if compressed table. */
is_compressed_table() const400 	bool is_compressed_table() const UNIV_NOTHROW
401 	{
402 		return(get_zip_size() > 0);
403 	}
404 
405 protected:
406 	/**
407 	Get the data page depending on the table type, compressed or not.
408 	@param block - block read from disk
409 	@retval the buffer frame */
get_frame(buf_block_t * block) const410 	buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
411 	{
412 		if (is_compressed_table()) {
413 			return(block->page.zip.data);
414 		}
415 
416 		return(buf_block_get_frame(block));
417 	}
418 
419 	/** Check for session interrupt. If required we could
420 	even flush to disk here every N pages.
421 	@retval DB_SUCCESS or error code */
periodic_check()422 	dberr_t periodic_check() UNIV_NOTHROW
423 	{
424 		if (trx_is_interrupted(m_trx)) {
425 			return(DB_INTERRUPTED);
426 		}
427 
428 		return(DB_SUCCESS);
429 	}
430 
431 	/**
432 	Get the physical offset of the extent descriptor within the page.
433 	@param page_no - page number of the extent descriptor
434 	@param page - contents of the page containing the extent descriptor.
435 	@return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const436 	const xdes_t* xdes(
437 		ulint		page_no,
438 		const page_t*	page) const UNIV_NOTHROW
439 	{
440 		ulint	offset;
441 
442 		offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
443 
444 		return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
445 	}
446 
447 	/**
448 	Set the current page directory (xdes). If the extent descriptor is
449 	marked as free then free the current extent descriptor and set it to
450 	0. This implies that all pages that are covered by this extent
451 	descriptor are also freed.
452 
453 	@param page_no - offset of page within the file
454 	@param page - page contents
455 	@return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)456 	dberr_t	set_current_xdes(
457 		ulint		page_no,
458 		const page_t*	page) UNIV_NOTHROW
459 	{
460 		m_xdes_page_no = page_no;
461 
462 		delete[] m_xdes;
463 
464 		m_xdes = 0;
465 
466 		ulint		state;
467 		const xdes_t*	xdesc = page + XDES_ARR_OFFSET;
468 
469 		state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
470 
471 		if (state != XDES_FREE) {
472 
473 			m_xdes = new(std::nothrow) xdes_t[m_page_size];
474 
475 			/* Trigger OOM */
476 			DBUG_EXECUTE_IF("ib_import_OOM_13",
477 					delete [] m_xdes; m_xdes = 0;);
478 
479 			if (m_xdes == 0) {
480 				return(DB_OUT_OF_MEMORY);
481 			}
482 
483 			memcpy(m_xdes, page, m_page_size);
484 		}
485 
486 		return(DB_SUCCESS);
487 	}
488 
489 	/**
490 	@return true if it is a root page */
is_root_page(const page_t * page) const491 	bool is_root_page(const page_t* page) const UNIV_NOTHROW
492 	{
493 		ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
494 
495 		return(mach_read_from_4(page + FIL_PAGE_NEXT) == FIL_NULL
496 		       && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL);
497 	}
498 
499 	/**
500 	Check if the page is marked as free in the extent descriptor.
501 	@param page_no - page number to check in the extent descriptor.
502 	@return true if the page is marked as free */
is_free(ulint page_no) const503 	bool is_free(ulint page_no) const UNIV_NOTHROW
504 	{
505 		ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
506 		     == m_xdes_page_no);
507 
508 		if (m_xdes != 0) {
509 			const xdes_t*	xdesc = xdes(page_no, m_xdes);
510 			ulint		pos = page_no % FSP_EXTENT_SIZE;
511 
512 			return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
513 		}
514 
515 		/* If the current xdes was free, the page must be free. */
516 		return(true);
517 	}
518 
519 protected:
520 	/** Covering transaction. */
521 	trx_t*			m_trx;
522 
523 	/** Space id of the file being iterated over. */
524 	ulint			m_space;
525 
526 	/** Minimum page number for which the free list has not been
527 	initialized: the pages >= this limit are, by definition, free;
528 	note that in a single-table tablespace where size < 64 pages,
529 	this number is 64, i.e., we have initialized the space about
530 	the first extent, but have not physically allocted those pages
531 	to the file. @see FSP_LIMIT. */
532 	ulint			m_free_limit;
533 
534 	/** Current size of the space in pages */
535 	ulint			m_size;
536 
537 	/** Current extent descriptor page */
538 	xdes_t*			m_xdes;
539 
540 	/** Physical page offset in the file of the extent descriptor */
541 	ulint			m_xdes_page_no;
542 
543 	/** Flags value read from the header page */
544 	ulint			m_space_flags;
545 
546 	/** Derived from m_space_flags and row format type, the row format
547 	type is determined from the page header. */
548 	ulint			m_table_flags;
549 };
550 
551 /** Determine the page size to use for traversing the tablespace
552 @param file_size - size of the tablespace file in bytes
553 @param block - contents of the first page in the tablespace file.
554 @retval DB_SUCCESS or error code. */
555 dberr_t
init(os_offset_t file_size,const buf_block_t * block)556 AbstractCallback::init(
557 	os_offset_t		file_size,
558 	const buf_block_t*	block) UNIV_NOTHROW
559 {
560 	const page_t*		page = block->frame;
561 
562 	m_space_flags = fsp_header_get_flags(page);
563 
564 	/* Since we don't know whether it is a compressed table
565 	or not, the data is always read into the block->frame. */
566 
567 	dberr_t	err = set_zip_size(block->frame);
568 
569 	if (err != DB_SUCCESS) {
570 		return(DB_CORRUPTION);
571 	}
572 
573 	/* Set the page size used to traverse the tablespace. */
574 
575 	m_page_size = (is_compressed_table())
576 		? get_zip_size() : fsp_flags_get_page_size(m_space_flags);
577 
578 	if (m_page_size == 0) {
579 		ib_logf(IB_LOG_LEVEL_ERROR, "Page size is 0");
580 		return(DB_CORRUPTION);
581 	} else if (!is_compressed_table() && m_page_size != UNIV_PAGE_SIZE) {
582 
583 		ib_logf(IB_LOG_LEVEL_ERROR,
584 			"Page size %lu of ibd file is not the same "
585 			"as the server page size %lu",
586 			m_page_size, UNIV_PAGE_SIZE);
587 
588 		return(DB_CORRUPTION);
589 
590 	} else if ((file_size % m_page_size)) {
591 
592 		ib_logf(IB_LOG_LEVEL_ERROR,
593 			"File size " UINT64PF " is not a multiple "
594 			"of the page size %lu",
595 			(ib_uint64_t) file_size, (ulong) m_page_size);
596 
597 		return(DB_CORRUPTION);
598 	}
599 
600 	ut_a(m_space == ULINT_UNDEFINED);
601 
602 	m_size  = mach_read_from_4(page + FSP_SIZE);
603 	m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
604 	m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
605 
606 	if ((err = set_current_xdes(0, page)) != DB_SUCCESS) {
607 		return(err);
608 	}
609 
610 	return(DB_SUCCESS);
611 }
612 
613 /**
614 Try and determine the index root pages by checking if the next/prev
615 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
616 struct FetchIndexRootPages : public AbstractCallback {
617 
618 	/** Index information gathered from the .ibd file. */
619 	struct Index {
620 
IndexFetchIndexRootPages::Index621 		Index(index_id_t id, ulint page_no)
622 			:
623 			m_id(id),
624 			m_page_no(page_no) { }
625 
626 		index_id_t	m_id;		/*!< Index id */
627 		ulint		m_page_no;	/*!< Root page number */
628 	};
629 
630 	typedef std::vector<Index> Indexes;
631 
632 	/** Constructor
633 	@param trx - covering (user) transaction
634 	@param table - table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages635 	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
636 		:
637 		AbstractCallback(trx),
638 		m_table(table) UNIV_NOTHROW { }
639 
640 	/** Destructor */
~FetchIndexRootPagesFetchIndexRootPages641 	virtual ~FetchIndexRootPages() UNIV_NOTHROW { }
642 
643 	/**
644 	@retval the space id of the tablespace being iterated over */
get_space_idFetchIndexRootPages645 	virtual ulint get_space_id() const UNIV_NOTHROW
646 	{
647 		return(m_space);
648 	}
649 
650 	/**
651 	@retval the space flags of the tablespace being iterated over */
get_space_flagsFetchIndexRootPages652 	virtual ulint get_space_flags() const UNIV_NOTHROW
653 	{
654 		return(m_space_flags);
655 	}
656 
657 	/**
658 	Check if the .ibd file row format is the same as the table's.
659 	@param ibd_table_flags - determined from space and page.
660 	@return DB_SUCCESS or error code. */
check_row_formatFetchIndexRootPages661 	dberr_t check_row_format(ulint ibd_table_flags) UNIV_NOTHROW
662 	{
663 		dberr_t		err;
664 		rec_format_t	ibd_rec_format;
665 		rec_format_t	table_rec_format;
666 
667 		if (!dict_tf_is_valid(ibd_table_flags)) {
668 
669 			ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
670 				ER_TABLE_SCHEMA_MISMATCH,
671 				".ibd file has invlad table flags: %lx",
672 				ibd_table_flags);
673 
674 			return(DB_CORRUPTION);
675 		}
676 
677 		ibd_rec_format = dict_tf_get_rec_format(ibd_table_flags);
678 		table_rec_format = dict_tf_get_rec_format(m_table->flags);
679 
680 		if (table_rec_format != ibd_rec_format) {
681 
682 			ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
683 				ER_TABLE_SCHEMA_MISMATCH,
684 				"Table has %s row format, .ibd "
685 				"file has %s row format.",
686 				dict_tf_to_row_format_string(m_table->flags),
687 				dict_tf_to_row_format_string(ibd_table_flags));
688 
689 			err = DB_CORRUPTION;
690 		} else {
691 			err = DB_SUCCESS;
692 		}
693 
694 		return(err);
695 	}
696 
697 	/**
698 	Called for each block as it is read from the file.
699 	@param offset - physical offset in the file
700 	@param block - block to convert, it is not from the buffer pool.
701 	@retval DB_SUCCESS or error code. */
702 	virtual dberr_t operator() (
703 		os_offset_t	offset,
704 		buf_block_t*	block) UNIV_NOTHROW;
705 
706 	/** Update the import configuration that will be used to import
707 	the tablespace. */
708 	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
709 
710 	/** Table definition in server. */
711 	const dict_table_t*	m_table;
712 
713 	/** Index information */
714 	Indexes			m_indexes;
715 };
716 
717 /**
718 Called for each block as it is read from the file. Check index pages to
719 determine the exact row format. We can't get that from the tablespace
720 header flags alone.
721 
722 @param offset - physical offset in the file
723 @param block - block to convert, it is not from the buffer pool.
724 @retval DB_SUCCESS or error code. */
725 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)726 FetchIndexRootPages::operator() (
727 	os_offset_t	offset,
728 	buf_block_t*	block) UNIV_NOTHROW
729 {
730 	dberr_t		err;
731 
732 	if ((err = periodic_check()) != DB_SUCCESS) {
733 		return(err);
734 	}
735 
736 	const page_t*	page = get_frame(block);
737 
738 	ulint	page_type = fil_page_get_type(page);
739 
740 	if (block->page.offset * m_page_size != offset) {
741 		ib_logf(IB_LOG_LEVEL_ERROR,
742 			"Page offset doesn't match file offset: "
743 			"page offset: %lu, file offset: %lu",
744 			(ulint) block->page.offset,
745 			(ulint) (offset / m_page_size));
746 
747 		err = DB_CORRUPTION;
748 	} else if (page_type == FIL_PAGE_TYPE_XDES) {
749 		err = set_current_xdes(block->page.offset, page);
750 	} else if (page_type == FIL_PAGE_INDEX
751 		   && !is_free(block->page.offset)
752 		   && is_root_page(page)) {
753 
754 		index_id_t	id = btr_page_get_index_id(page);
755 		ulint		page_no = buf_block_get_page_no(block);
756 
757 		m_indexes.push_back(Index(id, page_no));
758 
759 		if (m_indexes.size() == 1) {
760 
761 			m_table_flags = dict_sys_tables_type_to_tf(
762 				m_space_flags,
763 				page_is_comp(page) ? DICT_N_COLS_COMPACT : 0);
764 
765 			err = check_row_format(m_table_flags);
766 		}
767 	}
768 
769 	return(err);
770 }
771 
772 /**
773 Update the import configuration that will be used to import the tablespace.
774 @return error code or DB_SUCCESS */
775 dberr_t
build_row_import(row_import * cfg) const776 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
777 {
778 	Indexes::const_iterator end = m_indexes.end();
779 
780 	ut_a(cfg->m_table == m_table);
781 	cfg->m_page_size = m_page_size;
782 	cfg->m_n_indexes = m_indexes.size();
783 
784 	if (cfg->m_n_indexes == 0) {
785 
786 		ib_logf(IB_LOG_LEVEL_ERROR, "No B+Tree found in tablespace");
787 
788 		return(DB_CORRUPTION);
789 	}
790 
791 	cfg->m_indexes = new(std::nothrow) row_index_t[cfg->m_n_indexes];
792 
793 	/* Trigger OOM */
794 	DBUG_EXECUTE_IF("ib_import_OOM_11",
795 			delete [] cfg->m_indexes; cfg->m_indexes = 0;);
796 
797 	if (cfg->m_indexes == 0) {
798 		return(DB_OUT_OF_MEMORY);
799 	}
800 
801 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
802 
803 	row_index_t*	cfg_index = cfg->m_indexes;
804 
805 	for (Indexes::const_iterator it = m_indexes.begin();
806 	     it != end;
807 	     ++it, ++cfg_index) {
808 
809 		char	name[BUFSIZ];
810 
811 		ut_snprintf(name, sizeof(name), "index" IB_ID_FMT, it->m_id);
812 
813 		ulint	len = strlen(name) + 1;
814 
815 		cfg_index->m_name = new(std::nothrow) byte[len];
816 
817 		/* Trigger OOM */
818 		DBUG_EXECUTE_IF("ib_import_OOM_12",
819 				delete [] cfg_index->m_name;
820 				cfg_index->m_name = 0;);
821 
822 		if (cfg_index->m_name == 0) {
823 			return(DB_OUT_OF_MEMORY);
824 		}
825 
826 		memcpy(cfg_index->m_name, name, len);
827 
828 		cfg_index->m_id = it->m_id;
829 
830 		cfg_index->m_space = m_space;
831 
832 		cfg_index->m_page_no = it->m_page_no;
833 	}
834 
835 	return(DB_SUCCESS);
836 }
837 
838 /* Functor that is called for each physical page that is read from the
839 tablespace file.
840 
841   1. Check each page for corruption.
842 
843   2. Update the space id and LSN on every page
844      * For the header page
845        - Validate the flags
846        - Update the LSN
847 
848   3. On Btree pages
849      * Set the index id
850      * Update the max trx id
851      * In a cluster index, update the system columns
852      * In a cluster index, update the BLOB ptr, set the space id
853      * Purge delete marked records, but only if they can be easily
854        removed from the page
855      * Keep a counter of number of rows, ie. non-delete-marked rows
856      * Keep a counter of number of delete marked rows
857      * Keep a counter of number of purge failure
858      * If a page is stamped with an index id that isn't in the .cfg file
859        we assume it is deleted and the page can be ignored.
860 
861    4. Set the page state to dirty so that it will be written to disk.
862 */
863 class PageConverter : public AbstractCallback {
864 public:
865 	/** Constructor
866 	* @param cfg - config of table being imported.
867 	* @param trx - transaction covering the import */
868 	PageConverter(row_import* cfg, trx_t* trx) UNIV_NOTHROW;
869 
~PageConverter()870 	virtual ~PageConverter() UNIV_NOTHROW
871 	{
872 		if (m_heap != 0) {
873 			mem_heap_free(m_heap);
874 		}
875 	}
876 
877 	/**
878 	@retval the server space id of the tablespace being iterated over */
get_space_id() const879 	virtual ulint get_space_id() const UNIV_NOTHROW
880 	{
881 		return(m_cfg->m_table->space);
882 	}
883 
884 	/**
885 	Called for each block as it is read from the file.
886 	@param offset - physical offset in the file
887 	@param block - block to convert, it is not from the buffer pool.
888 	@retval DB_SUCCESS or error code. */
889 	virtual dberr_t operator() (
890 		os_offset_t	offset,
891 		buf_block_t*	block) UNIV_NOTHROW;
892 private:
893 
894 	/** Status returned by PageConverter::validate() */
895 	enum import_page_status_t {
896 		IMPORT_PAGE_STATUS_OK,		/*!< Page is OK */
897 		IMPORT_PAGE_STATUS_ALL_ZERO,	/*!< Page is all zeros */
898 		IMPORT_PAGE_STATUS_CORRUPTED	/*!< Page is corrupted */
899 	};
900 
901 	/**
902 	Update the page, set the space id, max trx id and index id.
903 	@param block - block read from file
904 	@param page_type - type of the page
905 	@retval DB_SUCCESS or error code */
906 	dberr_t update_page(
907 		buf_block_t*	block,
908 		ulint&		page_type) UNIV_NOTHROW;
909 
910 #if defined UNIV_DEBUG
911 	/**
912 	@return true error condition is enabled. */
trigger_corruption()913 	bool trigger_corruption() UNIV_NOTHROW
914 	{
915 		return(false);
916 	}
917 	#else
918 #define trigger_corruption()	(false)
919 #endif /* UNIV_DEBUG */
920 
921 	/**
922 	Update the space, index id, trx id.
923 	@param block - block to convert
924 	@return DB_SUCCESS or error code */
925 	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;
926 
927 	/** Update the BLOB refrences and write UNDO log entries for
928 	rows that can't be purged optimistically.
929 	@param block - block to update
930 	@retval DB_SUCCESS or error code */
931 	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;
932 
933 	/**
934 	Validate the page, check for corruption.
935 	@param offset - physical offset within file.
936 	@param page - page read from file.
937 	@return 0 on success, 1 if all zero, 2 if corrupted */
938 	import_page_status_t validate(
939 		os_offset_t	offset,
940 		buf_block_t*	page) UNIV_NOTHROW;
941 
942 	/**
943 	Validate the space flags and update tablespace header page.
944 	@param block - block read from file, not from the buffer pool.
945 	@retval DB_SUCCESS or error code */
946 	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;
947 
948 	/**
949 	Adjust the BLOB reference for a single column that is externally stored
950 	@param rec - record to update
951 	@param offsets - column offsets for the record
952 	@param i - column ordinal value
953 	@return DB_SUCCESS or error code */
954 	dberr_t	adjust_cluster_index_blob_column(
955 		rec_t*		rec,
956 		const ulint*	offsets,
957 		ulint		i) UNIV_NOTHROW;
958 
959 	/**
960 	Adjusts the BLOB reference in the clustered index row for all
961 	externally stored columns.
962 	@param rec - record to update
963 	@param offsets - column offsets for the record
964 	@return DB_SUCCESS or error code */
965 	dberr_t	adjust_cluster_index_blob_columns(
966 		rec_t*		rec,
967 		const ulint*	offsets) UNIV_NOTHROW;
968 
969 	/**
970 	In the clustered index, adjist the BLOB pointers as needed.
971 	Also update the BLOB reference, write the new space id.
972 	@param rec - record to update
973 	@param offsets - column offsets for the record
974 	@return DB_SUCCESS or error code */
975 	dberr_t	adjust_cluster_index_blob_ref(
976 		rec_t*		rec,
977 		const ulint*	offsets) UNIV_NOTHROW;
978 
979 	/**
980 	Purge delete-marked records, only if it is possible to do
981 	so without re-organising the B+tree.
982 	@param offsets - current row offsets.
983 	@retval true if purged */
984 	bool	purge(const ulint* offsets) UNIV_NOTHROW;
985 
986 	/**
987 	Adjust the BLOB references and sys fields for the current record.
988 	@param index - the index being converted
989 	@param rec - record to update
990 	@param offsets - column offsets for the record
991 	@param deleted - true if row is delete marked
992 	@return DB_SUCCESS or error code. */
993 	dberr_t	adjust_cluster_record(
994 		const dict_index_t*	index,
995 		rec_t*			rec,
996 		const ulint*		offsets,
997 		bool			deleted) UNIV_NOTHROW;
998 
999 	/**
1000 	Find an index with the matching id.
1001 	@return row_index_t* instance or 0 */
find_index(index_id_t id)1002 	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
1003 	{
1004 		row_index_t*	index = &m_cfg->m_indexes[0];
1005 
1006 		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
1007 			if (id == index->m_id) {
1008 				return(index);
1009 			}
1010 		}
1011 
1012 		return(0);
1013 
1014 	}
1015 private:
1016 	/** Config for table that is being imported. */
1017 	row_import*		m_cfg;
1018 
1019 	/** Current index whose pages are being imported */
1020 	row_index_t*		m_index;
1021 
1022 	/** Current system LSN */
1023 	lsn_t			m_current_lsn;
1024 
1025 	/** Alias for m_page_zip, only set for compressed pages. */
1026 	page_zip_des_t*		m_page_zip_ptr;
1027 
1028 	/** Iterator over records in a block */
1029 	RecIterator		m_rec_iter;
1030 
1031 	/** Record offset */
1032 	ulint			m_offsets_[REC_OFFS_NORMAL_SIZE];
1033 
1034 	/** Pointer to m_offsets_ */
1035 	ulint*			m_offsets;
1036 
1037 	/** Memory heap for the record offsets */
1038 	mem_heap_t*		m_heap;
1039 
1040 	/** Cluster index instance */
1041 	dict_index_t*		m_cluster_index;
1042 };
1043 
1044 /**
1045 row_import destructor. */
~row_import()1046 row_import::~row_import() UNIV_NOTHROW
1047 {
1048 	for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
1049 		delete [] m_indexes[i].m_name;
1050 
1051 		if (m_indexes[i].m_fields == 0) {
1052 			continue;
1053 		}
1054 
1055 		dict_field_t*	fields = m_indexes[i].m_fields;
1056 		ulint		n_fields = m_indexes[i].m_n_fields;
1057 
1058 		for (ulint j = 0; j < n_fields; ++j) {
1059 			delete [] fields[j].name;
1060 		}
1061 
1062 		delete [] fields;
1063 	}
1064 
1065 	for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
1066 		delete [] m_col_names[i];
1067 	}
1068 
1069 	delete [] m_cols;
1070 	delete [] m_indexes;
1071 	delete [] m_col_names;
1072 	delete [] m_table_name;
1073 	delete [] m_hostname;
1074 }
1075 
1076 /**
1077 Find the index entry in in the indexes array.
1078 @param name - index name
1079 @return instance if found else 0. */
1080 row_index_t*
get_index(const char * name) const1081 row_import::get_index(
1082 	const char*	name) const UNIV_NOTHROW
1083 {
1084 	for (ulint i = 0; i < m_n_indexes; ++i) {
1085 		const char*	index_name;
1086 		row_index_t*	index = &m_indexes[i];
1087 
1088 		index_name = reinterpret_cast<const char*>(index->m_name);
1089 
1090 		if (strcmp(index_name, name) == 0) {
1091 
1092 			return(index);
1093 		}
1094 	}
1095 
1096 	return(0);
1097 }
1098 
1099 /**
1100 Get the number of rows in the index.
1101 @param name - index name
1102 @return number of rows (doesn't include delete marked rows). */
1103 ulint
get_n_rows(const char * name) const1104 row_import::get_n_rows(
1105 	const char*	name) const UNIV_NOTHROW
1106 {
1107 	const row_index_t*	index = get_index(name);
1108 
1109 	ut_a(name != 0);
1110 
1111 	return(index->m_stats.m_n_rows);
1112 }
1113 
1114 /**
1115 Get the number of rows for which purge failed uding the convert phase.
1116 @param name - index name
1117 @return number of rows for which purge failed. */
1118 ulint
get_n_purge_failed(const char * name) const1119 row_import::get_n_purge_failed(
1120 	const char*	name) const UNIV_NOTHROW
1121 {
1122 	const row_index_t*	index = get_index(name);
1123 
1124 	ut_a(name != 0);
1125 
1126 	return(index->m_stats.m_n_purge_failed);
1127 }
1128 
1129 /**
1130 Find the ordinal value of the column name in the cfg table columns.
1131 @param name - of column to look for.
1132 @return ULINT_UNDEFINED if not found. */
1133 ulint
find_col(const char * name) const1134 row_import::find_col(
1135 	const char*	name) const UNIV_NOTHROW
1136 {
1137 	for (ulint i = 0; i < m_n_cols; ++i) {
1138 		const char*	col_name;
1139 
1140 		col_name = reinterpret_cast<const char*>(m_col_names[i]);
1141 
1142 		if (strcmp(col_name, name) == 0) {
1143 			return(i);
1144 		}
1145 	}
1146 
1147 	return(ULINT_UNDEFINED);
1148 }
1149 
1150 /**
1151 Check if the index schema that was read from the .cfg file matches the
1152 in memory index definition.
1153 @return DB_SUCCESS or error code. */
1154 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1155 row_import::match_index_columns(
1156 	THD*			thd,
1157 	const dict_index_t*	index) UNIV_NOTHROW
1158 {
1159 	row_index_t*		cfg_index;
1160 	dberr_t			err = DB_SUCCESS;
1161 
1162 	cfg_index = get_index(index->name);
1163 
1164 	if (cfg_index == 0) {
1165 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1166 			 ER_TABLE_SCHEMA_MISMATCH,
1167 			 "Index %s not found in tablespace meta-data file.",
1168 			 index->name);
1169 
1170 		return(DB_ERROR);
1171 	}
1172 
1173 	if (cfg_index->m_n_fields != index->n_fields) {
1174 
1175 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1176 			 ER_TABLE_SCHEMA_MISMATCH,
1177 			 "Index field count %lu doesn't match"
1178 			 " tablespace metadata file value %lu",
1179 			 (ulong) index->n_fields,
1180 			 (ulong) cfg_index->m_n_fields);
1181 
1182 		return(DB_ERROR);
1183 	}
1184 
1185 	cfg_index->m_srv_index = index;
1186 
1187 	const dict_field_t*	field = index->fields;
1188 	const dict_field_t*	cfg_field = cfg_index->m_fields;
1189 
1190 	for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1191 
1192 		if (strcmp(field->name, cfg_field->name) != 0) {
1193 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1194 				 ER_TABLE_SCHEMA_MISMATCH,
1195 				 "Index field name %s doesn't match"
1196 				 " tablespace metadata field name %s"
1197 				 " for field position %lu",
1198 				 field->name, cfg_field->name, (ulong) i);
1199 
1200 			err = DB_ERROR;
1201 		}
1202 
1203 		if (cfg_field->prefix_len != field->prefix_len) {
1204 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1205 				 ER_TABLE_SCHEMA_MISMATCH,
1206 				 "Index %s field %s prefix len %lu"
1207 				 " doesn't match metadata file value"
1208 				 " %lu",
1209 				 index->name, field->name,
1210 				 (ulong) field->prefix_len,
1211 				 (ulong) cfg_field->prefix_len);
1212 
1213 			err = DB_ERROR;
1214 		}
1215 
1216 		if (cfg_field->fixed_len != field->fixed_len) {
1217 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1218 				 ER_TABLE_SCHEMA_MISMATCH,
1219 				 "Index %s field %s fixed len %lu"
1220 				 " doesn't match metadata file value"
1221 				 " %lu",
1222 				 index->name, field->name,
1223 				 (ulong) field->fixed_len,
1224 				 (ulong) cfg_field->fixed_len);
1225 
1226 			err = DB_ERROR;
1227 		}
1228 	}
1229 
1230 	return(err);
1231 }
1232 
1233 /**
1234 Check if the table schema that was read from the .cfg file matches the
1235 in memory table definition.
1236 @param thd - MySQL session variable
1237 @return DB_SUCCESS or error code. */
1238 dberr_t
match_table_columns(THD * thd)1239 row_import::match_table_columns(
1240 	THD*			thd) UNIV_NOTHROW
1241 {
1242 	dberr_t			err = DB_SUCCESS;
1243 	const dict_col_t*	col = m_table->cols;
1244 
1245 	for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1246 
1247 		const char*	col_name;
1248 		ulint		cfg_col_index;
1249 
1250 		col_name = dict_table_get_col_name(
1251 			m_table, dict_col_get_no(col));
1252 
1253 		cfg_col_index = find_col(col_name);
1254 
1255 		if (cfg_col_index == ULINT_UNDEFINED) {
1256 
1257 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1258 				 ER_TABLE_SCHEMA_MISMATCH,
1259 				 "Column %s not found in tablespace.",
1260 				 col_name);
1261 
1262 			err = DB_ERROR;
1263 		} else if (cfg_col_index != col->ind) {
1264 
1265 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1266 				 ER_TABLE_SCHEMA_MISMATCH,
1267 				 "Column %s ordinal value mismatch, it's at "
1268 				 "%lu in the table and %lu in the tablespace "
1269 				 "meta-data file",
1270 				 col_name,
1271 				 (ulong) col->ind, (ulong) cfg_col_index);
1272 
1273 			err = DB_ERROR;
1274 		} else {
1275 			const dict_col_t*	cfg_col;
1276 
1277 			cfg_col = &m_cols[cfg_col_index];
1278 			ut_a(cfg_col->ind == cfg_col_index);
1279 
1280 			if (cfg_col->prtype != col->prtype) {
1281 				ib_errf(thd,
1282 					 IB_LOG_LEVEL_ERROR,
1283 					 ER_TABLE_SCHEMA_MISMATCH,
1284 					 "Column %s precise type mismatch.",
1285 					 col_name);
1286 				err = DB_ERROR;
1287 			}
1288 
1289 			if (cfg_col->mtype != col->mtype) {
1290 				ib_errf(thd,
1291 					 IB_LOG_LEVEL_ERROR,
1292 					 ER_TABLE_SCHEMA_MISMATCH,
1293 					 "Column %s main type mismatch.",
1294 					 col_name);
1295 				err = DB_ERROR;
1296 			}
1297 
1298 			if (cfg_col->len != col->len) {
1299 				ib_errf(thd,
1300 					 IB_LOG_LEVEL_ERROR,
1301 					 ER_TABLE_SCHEMA_MISMATCH,
1302 					 "Column %s length mismatch.",
1303 					 col_name);
1304 				err = DB_ERROR;
1305 			}
1306 
1307 			if (cfg_col->mbminmaxlen != col->mbminmaxlen) {
1308 				ib_errf(thd,
1309 					 IB_LOG_LEVEL_ERROR,
1310 					 ER_TABLE_SCHEMA_MISMATCH,
1311 					 "Column %s multi-byte len mismatch.",
1312 					 col_name);
1313 				err = DB_ERROR;
1314 			}
1315 
1316 			if (cfg_col->ind != col->ind) {
1317 				err = DB_ERROR;
1318 			}
1319 
1320 			if (cfg_col->ord_part != col->ord_part) {
1321 				ib_errf(thd,
1322 					 IB_LOG_LEVEL_ERROR,
1323 					 ER_TABLE_SCHEMA_MISMATCH,
1324 					 "Column %s ordering mismatch.",
1325 					 col_name);
1326 				err = DB_ERROR;
1327 			}
1328 
1329 			if (cfg_col->max_prefix != col->max_prefix) {
1330 				ib_errf(thd,
1331 					 IB_LOG_LEVEL_ERROR,
1332 					 ER_TABLE_SCHEMA_MISMATCH,
1333 					 "Column %s max prefix mismatch.",
1334 					 col_name);
1335 				err = DB_ERROR;
1336 			}
1337 		}
1338 	}
1339 
1340 	return(err);
1341 }
1342 
1343 /**
1344 Check if the table (and index) schema that was read from the .cfg file
1345 matches the in memory table definition.
1346 @param thd - MySQL session variable
1347 @return DB_SUCCESS or error code. */
1348 dberr_t
match_schema(THD * thd)1349 row_import::match_schema(
1350 	THD*		thd) UNIV_NOTHROW
1351 {
1352 	/* Do some simple checks. */
1353 
1354 	if (m_flags != m_table->flags) {
1355 		if (dict_tf_to_row_format_string(m_flags) !=
1356 				dict_tf_to_row_format_string(m_table->flags)) {
1357 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1358 				ER_TABLE_SCHEMA_MISMATCH,
1359 				"Table flags don't match,"
1360 				"server table has %s "
1361 				"and the meta-data file has %s",
1362 				dict_tf_to_row_format_string(m_table->flags),
1363 				dict_tf_to_row_format_string(m_flags));
1364 		} else if (DICT_TF_HAS_DATA_DIR(m_flags) !=
1365 			DICT_TF_HAS_DATA_DIR(m_table->flags)) {
1366 			/* If the meta-data flag is set for data_dir,
1367 			but table flag is not set for data_dir or vice versa
1368 			then return error. */
1369 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1370 				ER_TABLE_SCHEMA_MISMATCH,
1371 				"Table location flags do not match. "
1372 				"The source table %s a DATA DIRECTORY "
1373 				"but the destination table %s.",
1374 				(DICT_TF_HAS_DATA_DIR(m_flags) ? "uses"
1375 				: "does not use"),
1376 				(DICT_TF_HAS_DATA_DIR(m_table->flags) ? "does"
1377 				: "does not"));
1378 		} else {
1379 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1380 				ER_TABLE_SCHEMA_MISMATCH,
1381 				"Table flags don't match");
1382 		}
1383 		return(DB_ERROR);
1384 	} else if (m_table->n_cols != m_n_cols) {
1385 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1386 			 "Number of columns don't match, table has %lu "
1387 			 "columns but the tablespace meta-data file has "
1388 			 "%lu columns",
1389 			 (ulong) m_table->n_cols, (ulong) m_n_cols);
1390 
1391 		return(DB_ERROR);
1392 	} else if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1393 
1394 		/* If the number of indexes don't match then it is better
1395 		to abort the IMPORT. It is easy for the user to create a
1396 		table matching the IMPORT definition. */
1397 
1398 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1399 			 "Number of indexes don't match, table has %lu "
1400 			 "indexes but the tablespace meta-data file has "
1401 			 "%lu indexes",
1402 			 (ulong) UT_LIST_GET_LEN(m_table->indexes),
1403 			 (ulong) m_n_indexes);
1404 
1405 		return(DB_ERROR);
1406 	}
1407 
1408 	dberr_t	err = match_table_columns(thd);
1409 
1410 	if (err != DB_SUCCESS) {
1411 		return(err);
1412 	}
1413 
1414 	/* Check if the index definitions match. */
1415 
1416 	const dict_index_t* index;
1417 
1418 	for (index = UT_LIST_GET_FIRST(m_table->indexes);
1419 	     index != 0;
1420 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1421 
1422 		dberr_t	index_err;
1423 
1424 		index_err = match_index_columns(thd, index);
1425 
1426 		if (index_err != DB_SUCCESS) {
1427 			err = index_err;
1428 		}
1429 	}
1430 
1431 	return(err);
1432 }
1433 
1434 /**
1435 Set the index root <space, pageno>, using index name. */
1436 void
set_root_by_name()1437 row_import::set_root_by_name() UNIV_NOTHROW
1438 {
1439 	row_index_t*	cfg_index = m_indexes;
1440 
1441 	for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1442 		dict_index_t*	index;
1443 
1444 		const char*	index_name;
1445 
1446 		index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1447 
1448 		index = dict_table_get_index_on_name(m_table, index_name);
1449 
1450 		/* We've already checked that it exists. */
1451 		ut_a(index != 0);
1452 
1453 		/* Set the root page number and space id. */
1454 		index->space = m_table->space;
1455 		index->page = cfg_index->m_page_no;
1456 	}
1457 }
1458 
1459 /**
1460 Set the index root <space, pageno>, using a heuristic.
1461 @return DB_SUCCESS or error code */
1462 dberr_t
set_root_by_heuristic()1463 row_import::set_root_by_heuristic() UNIV_NOTHROW
1464 {
1465 	row_index_t*	cfg_index = m_indexes;
1466 
1467 	ut_a(m_n_indexes > 0);
1468 
1469 	// TODO: For now use brute force, based on ordinality
1470 
1471 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1472 
1473 		char	table_name[MAX_FULL_NAME_LEN + 1];
1474 
1475 		innobase_format_name(
1476 			table_name, sizeof(table_name), m_table->name, FALSE);
1477 
1478 		ib_logf(IB_LOG_LEVEL_WARN,
1479 			"Table %s should have %lu indexes but the tablespace "
1480 			"has %lu indexes",
1481 			table_name,
1482 			UT_LIST_GET_LEN(m_table->indexes),
1483 			m_n_indexes);
1484 	}
1485 
1486 	dict_mutex_enter_for_mysql();
1487 
1488 	ulint	i = 0;
1489 	dberr_t	err = DB_SUCCESS;
1490 
1491 	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1492 	     index != 0;
1493 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1494 
1495 		if (index->type & DICT_FTS) {
1496 			index->type |= DICT_CORRUPT;
1497 			ib_logf(IB_LOG_LEVEL_WARN,
1498 				"Skipping FTS index: %s", index->name);
1499 		} else if (i < m_n_indexes) {
1500 
1501 			delete [] cfg_index[i].m_name;
1502 
1503 			ulint	len = strlen(index->name) + 1;
1504 
1505 			cfg_index[i].m_name = new(std::nothrow) byte[len];
1506 
1507 			/* Trigger OOM */
1508 			DBUG_EXECUTE_IF("ib_import_OOM_14",
1509 					delete[] cfg_index[i].m_name;
1510 					cfg_index[i].m_name = 0;);
1511 
1512 			if (cfg_index[i].m_name == 0) {
1513 				err = DB_OUT_OF_MEMORY;
1514 				break;
1515 			}
1516 
1517 			memcpy(cfg_index[i].m_name, index->name, len);
1518 
1519 			cfg_index[i].m_srv_index = index;
1520 
1521 			index->space = m_table->space;
1522 			index->page = cfg_index[i].m_page_no;
1523 
1524 			++i;
1525 		}
1526 	}
1527 
1528 	dict_mutex_exit_for_mysql();
1529 
1530 	return(err);
1531 }
1532 
1533 /**
1534 Purge delete marked records.
1535 @return DB_SUCCESS or error code. */
1536 dberr_t
garbage_collect()1537 IndexPurge::garbage_collect() UNIV_NOTHROW
1538 {
1539 	dberr_t	err;
1540 	ibool	comp = dict_table_is_comp(m_index->table);
1541 
1542 	/* Open the persistent cursor and start the mini-transaction. */
1543 
1544 	open();
1545 
1546 	while ((err = next()) == DB_SUCCESS) {
1547 
1548 		rec_t*	rec = btr_pcur_get_rec(&m_pcur);
1549 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1550 
1551 		if (!deleted) {
1552 			++m_n_rows;
1553 		} else {
1554 			purge();
1555 		}
1556 	}
1557 
1558 	/* Close the persistent cursor and commit the mini-transaction. */
1559 
1560 	close();
1561 
1562 	return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1563 }
1564 
1565 /**
1566 Begin import, position the cursor on the first record. */
1567 void
open()1568 IndexPurge::open() UNIV_NOTHROW
1569 {
1570 	mtr_start(&m_mtr);
1571 
1572 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1573 
1574 	btr_pcur_open_at_index_side(
1575 		true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1576 }
1577 
1578 /**
1579 Close the persistent curosr and commit the mini-transaction. */
1580 void
close()1581 IndexPurge::close() UNIV_NOTHROW
1582 {
1583 	btr_pcur_close(&m_pcur);
1584 	mtr_commit(&m_mtr);
1585 }
1586 
1587 /**
1588 Position the cursor on the next record.
1589 @return DB_SUCCESS or error code */
1590 dberr_t
next()1591 IndexPurge::next() UNIV_NOTHROW
1592 {
1593 	btr_pcur_move_to_next_on_page(&m_pcur);
1594 
1595 	/* When switching pages, commit the mini-transaction
1596 	in order to release the latch on the old page. */
1597 
1598 	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1599 		return(DB_SUCCESS);
1600 	} else if (trx_is_interrupted(m_trx)) {
1601 		/* Check after every page because the check
1602 		is expensive. */
1603 		return(DB_INTERRUPTED);
1604 	}
1605 
1606 	btr_pcur_store_position(&m_pcur, &m_mtr);
1607 
1608 	mtr_commit(&m_mtr);
1609 
1610 	mtr_start(&m_mtr);
1611 
1612 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1613 
1614 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1615 
1616 	if (!btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr)) {
1617 
1618 		return(DB_END_OF_INDEX);
1619 	}
1620 
1621 	return(DB_SUCCESS);
1622 }
1623 
1624 /**
1625 Store the persistent cursor position and reopen the
1626 B-tree cursor in BTR_MODIFY_TREE mode, because the
1627 tree structure may be changed during a pessimistic delete. */
1628 void
purge_pessimistic_delete()1629 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1630 {
1631 	dberr_t	err;
1632 
1633 	btr_pcur_restore_position(BTR_MODIFY_TREE, &m_pcur, &m_mtr);
1634 
1635 	ut_ad(rec_get_deleted_flag(
1636 			btr_pcur_get_rec(&m_pcur),
1637 			dict_table_is_comp(m_index->table)));
1638 
1639 	btr_cur_pessimistic_delete(
1640 		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, RB_NONE, &m_mtr);
1641 
1642 	ut_a(err == DB_SUCCESS);
1643 
1644 	/* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1645 	mtr_commit(&m_mtr);
1646 }
1647 
1648 /**
1649 Purge delete-marked records. */
1650 void
purge()1651 IndexPurge::purge() UNIV_NOTHROW
1652 {
1653 	btr_pcur_store_position(&m_pcur, &m_mtr);
1654 
1655 	purge_pessimistic_delete();
1656 
1657 	mtr_start(&m_mtr);
1658 
1659 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1660 
1661 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1662 }
1663 
1664 /**
1665 Constructor
1666 * @param cfg - config of table being imported.
1667 * @param trx - transaction covering the import */
PageConverter(row_import * cfg,trx_t * trx)1668 PageConverter::PageConverter(
1669 	row_import*	cfg,
1670 	trx_t*		trx)
1671 	:
1672 	AbstractCallback(trx),
1673 	m_cfg(cfg),
1674 	m_page_zip_ptr(0),
1675 	m_heap(0) UNIV_NOTHROW
1676 {
1677 	m_index = m_cfg->m_indexes;
1678 
1679 	m_current_lsn = log_get_lsn();
1680 	ut_a(m_current_lsn > 0);
1681 
1682 	m_offsets = m_offsets_;
1683 	rec_offs_init(m_offsets_);
1684 
1685 	m_cluster_index = dict_table_get_first_index(m_cfg->m_table);
1686 }
1687 
1688 /**
1689 Adjust the BLOB reference for a single column that is externally stored
1690 @param rec - record to update
1691 @param offsets - column offsets for the record
1692 @param i - column ordinal value
1693 @return DB_SUCCESS or error code */
1694 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const ulint * offsets,ulint i)1695 PageConverter::adjust_cluster_index_blob_column(
1696 	rec_t*		rec,
1697 	const ulint*	offsets,
1698 	ulint		i) UNIV_NOTHROW
1699 {
1700 	ulint		len;
1701 	byte*		field;
1702 
1703 	field = rec_get_nth_field(rec, offsets, i, &len);
1704 
1705 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1706 			len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1707 
1708 	if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1709 
1710 		char index_name[MAX_FULL_NAME_LEN + 1];
1711 
1712 		innobase_format_name(
1713 			index_name, sizeof(index_name),
1714 			m_cluster_index->name, TRUE);
1715 
1716 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1717 			ER_INNODB_INDEX_CORRUPT,
1718 			"Externally stored column(%lu) has a reference "
1719 			"length of %lu in the cluster index %s",
1720 			(ulong) i, (ulong) len, index_name);
1721 
1722 		return(DB_CORRUPTION);
1723 	}
1724 
1725 	field += BTR_EXTERN_SPACE_ID - BTR_EXTERN_FIELD_REF_SIZE + len;
1726 
1727 	if (is_compressed_table()) {
1728 		mach_write_to_4(field, get_space_id());
1729 
1730 		page_zip_write_blob_ptr(
1731 			m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1732 	} else {
1733 		mlog_write_ulint(field, get_space_id(), MLOG_4BYTES, 0);
1734 	}
1735 
1736 	return(DB_SUCCESS);
1737 }
1738 
1739 /**
1740 Adjusts the BLOB reference in the clustered index row for all externally
1741 stored columns.
1742 @param rec - record to update
1743 @param offsets - column offsets for the record
1744 @return DB_SUCCESS or error code */
1745 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const ulint * offsets)1746 PageConverter::adjust_cluster_index_blob_columns(
1747 	rec_t*		rec,
1748 	const ulint*	offsets) UNIV_NOTHROW
1749 {
1750 	ut_ad(rec_offs_any_extern(offsets));
1751 
1752 	/* Adjust the space_id in the BLOB pointers. */
1753 
1754 	for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1755 
1756 		/* Only if the column is stored "externally". */
1757 
1758 		if (rec_offs_nth_extern(offsets, i)) {
1759 			dberr_t	err;
1760 
1761 			err = adjust_cluster_index_blob_column(rec, offsets, i);
1762 
1763 			if (err != DB_SUCCESS) {
1764 				return(err);
1765 			}
1766 		}
1767 	}
1768 
1769 	return(DB_SUCCESS);
1770 }
1771 
1772 /**
1773 In the clustered index, adjust BLOB pointers as needed. Also update the
1774 BLOB reference, write the new space id.
1775 @param rec - record to update
1776 @param offsets - column offsets for the record
1777 @return DB_SUCCESS or error code */
1778 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const ulint * offsets)1779 PageConverter::adjust_cluster_index_blob_ref(
1780 	rec_t*		rec,
1781 	const ulint*	offsets) UNIV_NOTHROW
1782 {
1783 	if (rec_offs_any_extern(offsets)) {
1784 		dberr_t	err;
1785 
1786 		err = adjust_cluster_index_blob_columns(rec, offsets);
1787 
1788 		if (err != DB_SUCCESS) {
1789 			return(err);
1790 		}
1791 	}
1792 
1793 	return(DB_SUCCESS);
1794 }
1795 
1796 /**
1797 Purge delete-marked records, only if it is possible to do so without
1798 re-organising the B+tree.
1799 @param offsets - current row offsets.
1800 @return true if purge succeeded */
1801 bool
purge(const ulint * offsets)1802 PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
1803 {
1804 	const dict_index_t*	index = m_index->m_srv_index;
1805 
1806 	/* We can't have a page that is empty and not root. */
1807 	if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1808 
1809 		++m_index->m_stats.m_n_purged;
1810 
1811 		return(true);
1812 	} else {
1813 		++m_index->m_stats.m_n_purge_failed;
1814 	}
1815 
1816 	return(false);
1817 }
1818 
1819 /**
1820 Adjust the BLOB references and sys fields for the current record.
1821 @param rec - record to update
1822 @param offsets - column offsets for the record
1823 @param deleted - true if row is delete marked
1824 @return DB_SUCCESS or error code. */
1825 dberr_t
adjust_cluster_record(const dict_index_t * index,rec_t * rec,const ulint * offsets,bool deleted)1826 PageConverter::adjust_cluster_record(
1827 	const dict_index_t*	index,
1828 	rec_t*			rec,
1829 	const ulint*		offsets,
1830 	bool			deleted) UNIV_NOTHROW
1831 {
1832 	dberr_t	err;
1833 
1834 	if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1835 
1836 		/* Reset DB_TRX_ID and DB_ROLL_PTR.  Normally, these fields
1837 		are only written in conjunction with other changes to the
1838 		record. */
1839 
1840 		row_upd_rec_sys_fields(
1841 			rec, m_page_zip_ptr, m_cluster_index, m_offsets,
1842 			m_trx, 0);
1843 	}
1844 
1845 	return(err);
1846 }
1847 
1848 /**
1849 Update the BLOB refrences and write UNDO log entries for
1850 rows that can't be purged optimistically.
1851 @param block - block to update
1852 @retval DB_SUCCESS or error code */
1853 dberr_t
update_records(buf_block_t * block)1854 PageConverter::update_records(
1855 	buf_block_t*	block) UNIV_NOTHROW
1856 {
1857 	ibool	comp = dict_table_is_comp(m_cfg->m_table);
1858 	bool	clust_index = m_index->m_srv_index == m_cluster_index;
1859 
1860 	/* This will also position the cursor on the first user record. */
1861 
1862 	m_rec_iter.open(block);
1863 
1864 	while (!m_rec_iter.end()) {
1865 
1866 		rec_t*	rec = m_rec_iter.current();
1867 
1868 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1869 
1870 		/* For the clustered index we have to adjust the BLOB
1871 		reference and the system fields irrespective of the
1872 		delete marked flag. The adjustment of delete marked
1873 		cluster records is required for purge to work later. */
1874 
1875 		if (deleted || clust_index) {
1876 			m_offsets = rec_get_offsets(
1877 				rec, m_index->m_srv_index, m_offsets,
1878 				ULINT_UNDEFINED, &m_heap);
1879 		}
1880 
1881 		if (clust_index) {
1882 
1883 			dberr_t err = adjust_cluster_record(
1884 				m_index->m_srv_index, rec, m_offsets,
1885 				deleted);
1886 
1887 			if (err != DB_SUCCESS) {
1888 				return(err);
1889 			}
1890 		}
1891 
1892 		/* If it is a delete marked record then try an
1893 		optimistic delete. */
1894 
1895 		if (deleted) {
1896 			/* A successful purge will move the cursor to the
1897 			next record. */
1898 
1899 			if (!purge(m_offsets)) {
1900 				m_rec_iter.next();
1901 			}
1902 
1903 			++m_index->m_stats.m_n_deleted;
1904 		} else {
1905 			++m_index->m_stats.m_n_rows;
1906 			m_rec_iter.next();
1907 		}
1908 	}
1909 
1910 	return(DB_SUCCESS);
1911 }
1912 
1913 /**
1914 Update the space, index id, trx id.
1915 @return DB_SUCCESS or error code */
1916 dberr_t
update_index_page(buf_block_t * block)1917 PageConverter::update_index_page(
1918 	buf_block_t*	block) UNIV_NOTHROW
1919 {
1920 	index_id_t	id;
1921 	buf_frame_t*	page = block->frame;
1922 
1923 	if (is_free(buf_block_get_page_no(block))) {
1924 		return(DB_SUCCESS);
1925 	} else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
1926 
1927 		row_index_t*	index = find_index(id);
1928 
1929 		if (index == 0) {
1930 			m_index = 0;
1931 			return(DB_CORRUPTION);
1932 		}
1933 
1934 		/* Update current index */
1935 		m_index = index;
1936 	}
1937 
1938 	/* If the .cfg file is missing and there is an index mismatch
1939 	then ignore the error. */
1940 	if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
1941 		return(DB_SUCCESS);
1942 	}
1943 
1944 #ifdef UNIV_ZIP_DEBUG
1945 	ut_a(!is_compressed_table()
1946 	     || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
1947 #endif /* UNIV_ZIP_DEBUG */
1948 
1949 	/* This has to be written to uncompressed index header. Set it to
1950 	the current index id. */
1951 	btr_page_set_index_id(
1952 		page, m_page_zip_ptr, m_index->m_srv_index->id, 0);
1953 
1954 	page_set_max_trx_id(block, m_page_zip_ptr, m_trx->id, 0);
1955 
1956 	if (page_is_empty(block->frame)) {
1957 
1958 		/* Only a root page can be empty. */
1959 		if (!is_root_page(block->frame)) {
1960 			// TODO: We should relax this and skip secondary
1961 			// indexes. Mark them as corrupt because they can
1962 			// always be rebuilt.
1963 			return(DB_CORRUPTION);
1964 		}
1965 
1966 		return(DB_SUCCESS);
1967 	}
1968 
1969 	if (!page_is_leaf(block->frame)) {
1970 		return (DB_SUCCESS);
1971 	}
1972 
1973 	return(update_records(block));
1974 }
1975 
1976 /**
1977 Validate the space flags and update tablespace header page.
1978 @param block - block read from file, not from the buffer pool.
1979 @retval DB_SUCCESS or error code */
1980 dberr_t
update_header(buf_block_t * block)1981 PageConverter::update_header(
1982 	buf_block_t*	block) UNIV_NOTHROW
1983 {
1984 	/* Check for valid header */
1985 	switch(fsp_header_get_space_id(get_frame(block))) {
1986 	case 0:
1987 		return(DB_CORRUPTION);
1988 	case ULINT_UNDEFINED:
1989 		ib_logf(IB_LOG_LEVEL_WARN,
1990 			"Space id check in the header failed "
1991 			"- ignored");
1992 	}
1993 
1994 	ulint		space_flags = fsp_header_get_flags(get_frame(block));
1995 
1996 	if (!fsp_flags_is_valid(space_flags)) {
1997 
1998 		ib_logf(IB_LOG_LEVEL_ERROR,
1999 			"Unsupported tablespace format %lu",
2000 			(ulong) space_flags);
2001 
2002 		return(DB_UNSUPPORTED);
2003 	}
2004 
2005 	mach_write_to_8(
2006 		get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN, m_current_lsn);
2007 
2008 	/* Write space_id to the tablespace header, page 0. */
2009 	mach_write_to_4(
2010 		get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2011 		get_space_id());
2012 
2013 	/* This is on every page in the tablespace. */
2014 	mach_write_to_4(
2015 		get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2016 		get_space_id());
2017 
2018 	return(DB_SUCCESS);
2019 }
2020 
2021 /**
2022 Update the page, set the space id, max trx id and index id.
2023 @param block - block read from file
2024 @retval DB_SUCCESS or error code */
2025 dberr_t
update_page(buf_block_t * block,ulint & page_type)2026 PageConverter::update_page(
2027 	buf_block_t*	block,
2028 	ulint&		page_type) UNIV_NOTHROW
2029 {
2030 	dberr_t		err = DB_SUCCESS;
2031 
2032 	switch (page_type = fil_page_get_type(get_frame(block))) {
2033 	case FIL_PAGE_TYPE_FSP_HDR:
2034 		/* Work directly on the uncompressed page headers. */
2035 		ut_a(buf_block_get_page_no(block) == 0);
2036 		return(update_header(block));
2037 
2038 	case FIL_PAGE_INDEX:
2039 		/* We need to decompress the contents into block->frame
2040 		before we can do any thing with Btree pages. */
2041 
2042 		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
2043 			return(DB_CORRUPTION);
2044 		}
2045 
2046 		/* This is on every page in the tablespace. */
2047 		mach_write_to_4(
2048 			get_frame(block)
2049 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2050 
2051 		/* Only update the Btree nodes. */
2052 		return(update_index_page(block));
2053 
2054 	case FIL_PAGE_TYPE_SYS:
2055 		/* This is page 0 in the system tablespace. */
2056 		return(DB_CORRUPTION);
2057 
2058 	case FIL_PAGE_TYPE_XDES:
2059 		err = set_current_xdes(
2060 			buf_block_get_page_no(block), get_frame(block));
2061 	case FIL_PAGE_INODE:
2062 	case FIL_PAGE_TYPE_TRX_SYS:
2063 	case FIL_PAGE_IBUF_FREE_LIST:
2064 	case FIL_PAGE_TYPE_ALLOCATED:
2065 	case FIL_PAGE_IBUF_BITMAP:
2066 	case FIL_PAGE_TYPE_BLOB:
2067 	case FIL_PAGE_TYPE_ZBLOB:
2068 	case FIL_PAGE_TYPE_ZBLOB2:
2069 
2070 		/* Work directly on the uncompressed page headers. */
2071 		/* This is on every page in the tablespace. */
2072 		mach_write_to_4(
2073 			get_frame(block)
2074 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2075 
2076 		return(err);
2077 	}
2078 
2079 	ib_logf(IB_LOG_LEVEL_WARN, "Unknown page type (%lu)", page_type);
2080 
2081 	return(DB_CORRUPTION);
2082 }
2083 
2084 /**
2085 Validate the page
2086 @param offset - physical offset within file.
2087 @param page - page read from file.
2088 @return status */
2089 PageConverter::import_page_status_t
validate(os_offset_t offset,buf_block_t * block)2090 PageConverter::validate(
2091 	os_offset_t	offset,
2092 	buf_block_t*	block) UNIV_NOTHROW
2093 {
2094 	buf_frame_t*	page = get_frame(block);
2095 
2096 	/* Check that the page number corresponds to the offset in
2097 	the file. Flag as corrupt if it doesn't. Disable the check
2098 	for LSN in buf_page_is_corrupted() */
2099 
2100 	if (buf_page_is_corrupted(false, page, get_zip_size())
2101 	    || (page_get_page_no(page) != offset / m_page_size
2102 		&& page_get_page_no(page) != 0)) {
2103 
2104 		return(IMPORT_PAGE_STATUS_CORRUPTED);
2105 
2106 	} else if (offset > 0 && page_get_page_no(page) == 0) {
2107 		ulint		checksum;
2108 
2109 		checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
2110 		if (checksum != 0) {
2111 			/* Checksum check passed in buf_page_is_corrupted(). */
2112 			ib_logf(IB_LOG_LEVEL_WARN,
2113 				"%s: Page %lu checksum %lu should be zero.",
2114 				m_filepath, (ulong) (offset / m_page_size),
2115 				checksum);
2116 		}
2117 
2118 		const byte*	b = page + FIL_PAGE_OFFSET;
2119 		const byte*	e = page + m_page_size
2120 				    - FIL_PAGE_END_LSN_OLD_CHKSUM;
2121 
2122 		/* If the page number is zero and offset > 0 then
2123 		the entire page MUST consist of zeroes. If not then
2124 		we flag it as corrupt. */
2125 
2126 		while (b != e) {
2127 
2128 			if (*b++ && !trigger_corruption()) {
2129 				return(IMPORT_PAGE_STATUS_CORRUPTED);
2130 			}
2131 		}
2132 
2133 		/* The page is all zero: do nothing. */
2134 		return(IMPORT_PAGE_STATUS_ALL_ZERO);
2135 	}
2136 
2137 	return(IMPORT_PAGE_STATUS_OK);
2138 }
2139 
2140 /**
2141 Called for every page in the tablespace. If the page was not
2142 updated then its state must be set to BUF_PAGE_NOT_USED.
2143 @param offset - physical offset within the file
2144 @param block - block read from file, note it is not from the buffer pool
2145 @retval DB_SUCCESS or error code. */
2146 dberr_t
operator ()(os_offset_t offset,buf_block_t * block)2147 PageConverter::operator() (
2148 	os_offset_t	offset,
2149 	buf_block_t*	block) UNIV_NOTHROW
2150 {
2151 	ulint		page_type;
2152 	dberr_t		err = DB_SUCCESS;
2153 
2154 	if ((err = periodic_check()) != DB_SUCCESS) {
2155 		return(err);
2156 	}
2157 
2158 	if (is_compressed_table()) {
2159 		m_page_zip_ptr = &block->page.zip;
2160 	} else {
2161 		ut_ad(m_page_zip_ptr == 0);
2162 	}
2163 
2164 	switch(validate(offset, block)) {
2165 	case IMPORT_PAGE_STATUS_OK:
2166 
2167 		/* We have to decompress the compressed pages before
2168 		we can work on them */
2169 
2170 		if ((err = update_page(block, page_type)) != DB_SUCCESS) {
2171 			return(err);
2172 		}
2173 
2174 		/* Note: For compressed pages this function will write to the
2175 		zip descriptor and for uncompressed pages it will write to
2176 		page (ie. the block->frame). Therefore the caller should write
2177 		out the descriptor contents and not block->frame for compressed
2178 		pages. */
2179 
2180 		if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) {
2181 
2182 			buf_flush_init_for_writing(
2183 				!is_compressed_table()
2184 				? block->frame : block->page.zip.data,
2185 				!is_compressed_table() ? 0 : m_page_zip_ptr,
2186 				m_current_lsn);
2187 		} else {
2188 			/* Calculate and update the checksum of non-btree
2189 			pages for compressed tables explicitly here. */
2190 
2191 			buf_flush_update_zip_checksum(
2192 				get_frame(block), get_zip_size(),
2193 				m_current_lsn);
2194 		}
2195 
2196 		break;
2197 
2198 	case IMPORT_PAGE_STATUS_ALL_ZERO:
2199 		/* The page is all zero: leave it as is. */
2200 		break;
2201 
2202 	case IMPORT_PAGE_STATUS_CORRUPTED:
2203 
2204 		ib_logf(IB_LOG_LEVEL_WARN,
2205 			"%s: Page %lu at offset " UINT64PF " looks corrupted.",
2206 			m_filepath, (ulong) (offset / m_page_size), offset);
2207 
2208 		return(DB_CORRUPTION);
2209 	}
2210 
2211 	return(err);
2212 }
2213 
2214 /*****************************************************************//**
2215 Clean up after import tablespace failure, this function will acquire
2216 the dictionary latches on behalf of the transaction if the transaction
2217 hasn't already acquired them. */
2218 static	MY_ATTRIBUTE((nonnull))
2219 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2220 row_import_discard_changes(
2221 /*=======================*/
2222 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2223 	trx_t*		trx,		/*!< in/out: transaction for import */
2224 	dberr_t		err)		/*!< in: error code */
2225 {
2226 	dict_table_t*	table = prebuilt->table;
2227 
2228 	ut_a(err != DB_SUCCESS);
2229 
2230 	prebuilt->trx->error_info = NULL;
2231 
2232 	char	table_name[MAX_FULL_NAME_LEN + 1];
2233 
2234 	innobase_format_name(
2235 		table_name, sizeof(table_name),
2236 		prebuilt->table->name, FALSE);
2237 
2238 	ib_logf(IB_LOG_LEVEL_INFO,
2239 		"Discarding tablespace of table %s: %s",
2240 		table_name, ut_strerr(err));
2241 
2242 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2243 		ut_a(trx->dict_operation_lock_mode == 0);
2244 		row_mysql_lock_data_dictionary(trx);
2245 	}
2246 
2247 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2248 
2249 	/* Since we update the index root page numbers on disk after
2250 	we've done a successful import. The table will not be loadable.
2251 	However, we need to ensure that the in memory root page numbers
2252 	are reset to "NULL". */
2253 
2254 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2255 		index != 0;
2256 		index = UT_LIST_GET_NEXT(indexes, index)) {
2257 
2258 		index->page = FIL_NULL;
2259 		index->space = FIL_NULL;
2260 	}
2261 
2262 	table->ibd_file_missing = TRUE;
2263 
2264 	fil_close_tablespace(trx, table->space);
2265 }
2266 
2267 /*****************************************************************//**
2268 Clean up after import tablespace. */
2269 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2270 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2271 row_import_cleanup(
2272 /*===============*/
2273 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2274 	trx_t*		trx,		/*!< in/out: transaction for import */
2275 	dberr_t		err)		/*!< in: error code */
2276 {
2277 	ut_a(prebuilt->trx != trx);
2278 
2279 	if (err != DB_SUCCESS) {
2280 		row_import_discard_changes(prebuilt, trx, err);
2281 	}
2282 
2283 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2284 
2285 	DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2286 
2287 	trx_commit_for_mysql(trx);
2288 
2289 	row_mysql_unlock_data_dictionary(trx);
2290 
2291 	trx_free_for_mysql(trx);
2292 
2293 	prebuilt->trx->op_info = "";
2294 
2295 	DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2296 
2297 	log_make_checkpoint_at(LSN_MAX, TRUE);
2298 
2299 	return(err);
2300 }
2301 
2302 /*****************************************************************//**
2303 Report error during tablespace import. */
2304 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2305 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2306 row_import_error(
2307 /*=============*/
2308 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2309 	trx_t*		trx,		/*!< in/out: transaction for import */
2310 	dberr_t		err)		/*!< in: error code */
2311 {
2312 	if (!trx_is_interrupted(trx)) {
2313 		char	table_name[MAX_FULL_NAME_LEN + 1];
2314 
2315 		innobase_format_name(
2316 			table_name, sizeof(table_name),
2317 			prebuilt->table->name, FALSE);
2318 
2319 		ib_senderrf(
2320 			trx->mysql_thd, IB_LOG_LEVEL_WARN,
2321 			ER_INNODB_IMPORT_ERROR,
2322 			table_name, (ulong) err, ut_strerr(err));
2323 	}
2324 
2325 	return(row_import_cleanup(prebuilt, trx, err));
2326 }
2327 
2328 /*****************************************************************//**
2329 Adjust the root page index node and leaf node segment headers, update
2330 with the new space id. For all the table's secondary indexes.
2331 @return error code */
2332 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2333 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(row_prebuilt_t * prebuilt,trx_t * trx,dict_table_t * table,const row_import & cfg)2334 row_import_adjust_root_pages_of_secondary_indexes(
2335 /*==============================================*/
2336 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2337 						handler */
2338 	trx_t*			trx,		/*!< in: transaction used for
2339 						the import */
2340 	dict_table_t*		table,		/*!< in: table the indexes
2341 						belong to */
2342 	const row_import&	cfg)		/*!< Import context */
2343 {
2344 	dict_index_t*		index;
2345 	ulint			n_rows_in_table;
2346 	dberr_t			err = DB_SUCCESS;
2347 
2348 	/* Skip the clustered index. */
2349 	index = dict_table_get_first_index(table);
2350 
2351 	n_rows_in_table = cfg.get_n_rows(index->name);
2352 
2353 	DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2354 			n_rows_in_table++;);
2355 
2356 	/* Adjust the root pages of the secondary indexes only. */
2357 	while ((index = dict_table_get_next_index(index)) != NULL) {
2358 		char		index_name[MAX_FULL_NAME_LEN + 1];
2359 
2360 		innobase_format_name(
2361 			index_name, sizeof(index_name), index->name, TRUE);
2362 
2363 		ut_a(!dict_index_is_clust(index));
2364 
2365 		if (!(index->type & DICT_CORRUPT)
2366 		    && index->space != FIL_NULL
2367 		    && index->page != FIL_NULL) {
2368 
2369 			/* Update the Btree segment headers for index node and
2370 			leaf nodes in the root page. Set the new space id. */
2371 
2372 			err = btr_root_adjust_on_import(index);
2373 		} else {
2374 			ib_logf(IB_LOG_LEVEL_WARN,
2375 				"Skip adjustment of root pages for "
2376 				"index %s.", index->name);
2377 
2378 			err = DB_CORRUPTION;
2379 		}
2380 
2381 		if (err != DB_SUCCESS) {
2382 
2383 			if (index->type & DICT_CLUSTERED) {
2384 				break;
2385 			}
2386 
2387 			ib_errf(trx->mysql_thd,
2388 				IB_LOG_LEVEL_WARN,
2389 				ER_INNODB_INDEX_CORRUPT,
2390 				"Index '%s' not found or corrupt, "
2391 				"you should recreate this index.",
2392 				index_name);
2393 
2394 			/* Do not bail out, so that the data
2395 			can be recovered. */
2396 
2397 			err = DB_SUCCESS;
2398 			index->type |= DICT_CORRUPT;
2399 			continue;
2400 		}
2401 
2402 		/* If we failed to purge any records in the index then
2403 		do it the hard way.
2404 
2405 		TODO: We can do this in the first pass by generating UNDO log
2406 		records for the failed rows. */
2407 
2408 		if (!cfg.requires_purge(index->name)) {
2409 			continue;
2410 		}
2411 
2412 		IndexPurge   purge(trx, index);
2413 
2414 		trx->op_info = "secondary: purge delete marked records";
2415 
2416 		err = purge.garbage_collect();
2417 
2418 		trx->op_info = "";
2419 
2420 		if (err != DB_SUCCESS) {
2421 			break;
2422 		} else if (purge.get_n_rows() != n_rows_in_table) {
2423 
2424 			ib_errf(trx->mysql_thd,
2425 				IB_LOG_LEVEL_WARN,
2426 				ER_INNODB_INDEX_CORRUPT,
2427 				"Index '%s' contains %lu entries, "
2428 				"should be %lu, you should recreate "
2429 				"this index.", index_name,
2430 				(ulong) purge.get_n_rows(),
2431 				(ulong) n_rows_in_table);
2432 
2433 			index->type |= DICT_CORRUPT;
2434 
2435 			/* Do not bail out, so that the data
2436 			can be recovered. */
2437 
2438 			err = DB_SUCCESS;
2439                 }
2440 	}
2441 
2442 	return(err);
2443 }
2444 
2445 /*****************************************************************//**
2446 Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
2447 @return error code */
2448 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2449 dberr_t
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2450 row_import_set_sys_max_row_id(
2451 /*==========================*/
2452 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2453 						handler */
2454 	const dict_table_t*	table)		/*!< in: table to import */
2455 {
2456 	dberr_t			err;
2457 	const rec_t*		rec;
2458 	mtr_t			mtr;
2459 	btr_pcur_t		pcur;
2460 	row_id_t		row_id	= 0;
2461 	dict_index_t*		index;
2462 
2463 	index = dict_table_get_first_index(table);
2464 	ut_a(dict_index_is_clust(index));
2465 
2466 	mtr_start(&mtr);
2467 
2468 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2469 
2470 	btr_pcur_open_at_index_side(
2471 		false,		// High end
2472 		index,
2473 		BTR_SEARCH_LEAF,
2474 		&pcur,
2475 		true,		// Init cursor
2476 		0,		// Leaf level
2477 		&mtr);
2478 
2479 	btr_pcur_move_to_prev_on_page(&pcur);
2480 	rec = btr_pcur_get_rec(&pcur);
2481 
2482 	/* Check for empty table. */
2483 	if (!page_rec_is_infimum(rec)) {
2484 		ulint		len;
2485 		const byte*	field;
2486 		mem_heap_t*	heap = NULL;
2487 		ulint		offsets_[1 + REC_OFFS_HEADER_SIZE];
2488 		ulint*		offsets;
2489 
2490 		rec_offs_init(offsets_);
2491 
2492 		offsets = rec_get_offsets(
2493 			rec, index, offsets_, ULINT_UNDEFINED, &heap);
2494 
2495 		field = rec_get_nth_field(
2496 			rec, offsets,
2497 			dict_index_get_sys_col_pos(index, DATA_ROW_ID),
2498 			&len);
2499 
2500 		if (len == DATA_ROW_ID_LEN) {
2501 			row_id = mach_read_from_6(field);
2502 			err = DB_SUCCESS;
2503 		} else {
2504 			err = DB_CORRUPTION;
2505 		}
2506 
2507 		if (heap != NULL) {
2508 			mem_heap_free(heap);
2509 		}
2510 	} else {
2511 		/* The table is empty. */
2512 		err = DB_SUCCESS;
2513 	}
2514 
2515 	btr_pcur_close(&pcur);
2516 	mtr_commit(&mtr);
2517 
2518 	DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
2519 			err = DB_CORRUPTION;);
2520 
2521 	if (err != DB_SUCCESS) {
2522 		char		index_name[MAX_FULL_NAME_LEN + 1];
2523 
2524 		innobase_format_name(
2525 			index_name, sizeof(index_name), index->name, TRUE);
2526 
2527 		ib_errf(prebuilt->trx->mysql_thd,
2528 			IB_LOG_LEVEL_WARN,
2529 			ER_INNODB_INDEX_CORRUPT,
2530 			"Index '%s' corruption detected, invalid DB_ROW_ID "
2531 			"in index.", index_name);
2532 
2533 		return(err);
2534 
2535 	} else if (row_id > 0) {
2536 
2537 		/* Update the system row id if the imported index row id is
2538 		greater than the max system row id. */
2539 
2540 		mutex_enter(&dict_sys->mutex);
2541 
2542 		if (row_id >= dict_sys->row_id) {
2543 			dict_sys->row_id = row_id + 1;
2544 			dict_hdr_flush_row_id();
2545 		}
2546 
2547 		mutex_exit(&dict_sys->mutex);
2548 	}
2549 
2550 	return(DB_SUCCESS);
2551 }
2552 
2553 /*****************************************************************//**
2554 Read the a string from the meta data file.
2555 @return DB_SUCCESS or error code. */
2556 static
2557 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2558 row_import_cfg_read_string(
2559 /*=======================*/
2560 	FILE*		file,		/*!< in/out: File to read from */
2561 	byte*		ptr,		/*!< out: string to read */
2562 	ulint		max_len)	/*!< in: maximum length of the output
2563 					buffer in bytes */
2564 {
2565 	DBUG_EXECUTE_IF("ib_import_string_read_error",
2566 			errno = EINVAL; return(DB_IO_ERROR););
2567 
2568 	ulint		len = 0;
2569 
2570 	while (!feof(file)) {
2571 		int	ch = fgetc(file);
2572 
2573 		if (ch == EOF) {
2574 			break;
2575 		} else if (ch != 0) {
2576 			if (len < max_len) {
2577 				ptr[len++] = ch;
2578 			} else {
2579 				break;
2580 			}
2581 		/* max_len includes the NUL byte */
2582 		} else if (len != max_len - 1) {
2583 			break;
2584 		} else {
2585 			ptr[len] = 0;
2586 			return(DB_SUCCESS);
2587 		}
2588 	}
2589 
2590 	errno = EINVAL;
2591 
2592 	return(DB_IO_ERROR);
2593 }
2594 
2595 /*********************************************************************//**
2596 Write the meta data (index user fields) config file.
2597 @return DB_SUCCESS or error code. */
2598 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2599 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index,row_import * cfg)2600 row_import_cfg_read_index_fields(
2601 /*=============================*/
2602 	FILE*			file,	/*!< in: file to write to */
2603 	THD*			thd,	/*!< in/out: session */
2604 	row_index_t*		index,	/*!< Index being read in */
2605 	row_import*		cfg)	/*!< in/out: meta-data read */
2606 {
2607 	byte			row[sizeof(ib_uint32_t) * 3];
2608 	ulint			n_fields = index->m_n_fields;
2609 
2610 	index->m_fields = new(std::nothrow) dict_field_t[n_fields];
2611 
2612 	/* Trigger OOM */
2613 	DBUG_EXECUTE_IF("ib_import_OOM_4",
2614 			delete [] index->m_fields; index->m_fields = 0;);
2615 
2616 	if (index->m_fields == 0) {
2617 		return(DB_OUT_OF_MEMORY);
2618 	}
2619 
2620 	dict_field_t*	field = index->m_fields;
2621 
2622 	memset(field, 0x0, sizeof(*field) * n_fields);
2623 
2624 	for (ulint i = 0; i < n_fields; ++i, ++field) {
2625 		byte*		ptr = row;
2626 
2627 		/* Trigger EOF */
2628 		DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2629 				(void) fseek(file, 0L, SEEK_END););
2630 
2631 		if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2632 
2633 			ib_senderrf(
2634 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2635 				errno, strerror(errno),
2636 				"while reading index fields.");
2637 
2638 			return(DB_IO_ERROR);
2639 		}
2640 
2641 		field->prefix_len = mach_read_from_4(ptr);
2642 		ptr += sizeof(ib_uint32_t);
2643 
2644 		field->fixed_len = mach_read_from_4(ptr);
2645 		ptr += sizeof(ib_uint32_t);
2646 
2647 		/* Include the NUL byte in the length. */
2648 		ulint	len = mach_read_from_4(ptr);
2649 
2650 		byte*	name = new(std::nothrow) byte[len];
2651 
2652 		/* Trigger OOM */
2653 		DBUG_EXECUTE_IF("ib_import_OOM_5", delete [] name; name = 0;);
2654 
2655 		if (name == 0) {
2656 			return(DB_OUT_OF_MEMORY);
2657 		}
2658 
2659 		field->name = reinterpret_cast<const char*>(name);
2660 
2661 		dberr_t	err = row_import_cfg_read_string(file, name, len);
2662 
2663 		if (err != DB_SUCCESS) {
2664 
2665 			ib_senderrf(
2666 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2667 				errno, strerror(errno),
2668 				"while parsing table name.");
2669 
2670 			return(err);
2671 		}
2672 	}
2673 
2674 	return(DB_SUCCESS);
2675 }
2676 
2677 /*****************************************************************//**
2678 Read the index names and root page numbers of the indexes and set the values.
2679 Row format [root_page_no, len of str, str ... ]
2680 @return DB_SUCCESS or error code. */
2681 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2682 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2683 row_import_read_index_data(
2684 /*=======================*/
2685 	FILE*		file,		/*!< in: File to read from */
2686 	THD*		thd,		/*!< in: session */
2687 	row_import*	cfg)		/*!< in/out: meta-data read */
2688 {
2689 	byte*		ptr;
2690 	row_index_t*	cfg_index;
2691 	byte		row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2692 
2693 	/* FIXME: What is the max value? */
2694 	ut_a(cfg->m_n_indexes > 0);
2695 	ut_a(cfg->m_n_indexes < 1024);
2696 
2697 	cfg->m_indexes = new(std::nothrow) row_index_t[cfg->m_n_indexes];
2698 
2699 	/* Trigger OOM */
2700 	DBUG_EXECUTE_IF("ib_import_OOM_6",
2701 			delete [] cfg->m_indexes; cfg->m_indexes = 0;);
2702 
2703 	if (cfg->m_indexes == 0) {
2704 		return(DB_OUT_OF_MEMORY);
2705 	}
2706 
2707 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2708 
2709 	cfg_index = cfg->m_indexes;
2710 
2711 	for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2712 		/* Trigger EOF */
2713 		DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2714 				(void) fseek(file, 0L, SEEK_END););
2715 
2716 		/* Read the index data. */
2717 		size_t	n_bytes = fread(row, 1, sizeof(row), file);
2718 
2719 		/* Trigger EOF */
2720 		DBUG_EXECUTE_IF("ib_import_io_read_error",
2721 				(void) fseek(file, 0L, SEEK_END););
2722 
2723 		if (n_bytes != sizeof(row)) {
2724 			char	msg[BUFSIZ];
2725 
2726 			ut_snprintf(msg, sizeof(msg),
2727 				    "while reading index meta-data, expected "
2728 				    "to read %lu bytes but read only %lu "
2729 				    "bytes",
2730 				    (ulong) sizeof(row), (ulong) n_bytes);
2731 
2732 			ib_senderrf(
2733 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2734 				errno, strerror(errno), msg);
2735 
2736 			ib_logf(IB_LOG_LEVEL_ERROR, "IO Error: %s", msg);
2737 
2738 			return(DB_IO_ERROR);
2739 		}
2740 
2741 		ptr = row;
2742 
2743 		cfg_index->m_id = mach_read_from_8(ptr);
2744 		ptr += sizeof(index_id_t);
2745 
2746 		cfg_index->m_space = mach_read_from_4(ptr);
2747 		ptr += sizeof(ib_uint32_t);
2748 
2749 		cfg_index->m_page_no = mach_read_from_4(ptr);
2750 		ptr += sizeof(ib_uint32_t);
2751 
2752 		cfg_index->m_type = mach_read_from_4(ptr);
2753 		ptr += sizeof(ib_uint32_t);
2754 
2755 		cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2756 		if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2757 			ut_ad(0);
2758 			/* Overflow. Pretend that the clustered index
2759 			has a variable-length PRIMARY KEY. */
2760 			cfg_index->m_trx_id_offset = 0;
2761 		}
2762 		ptr += sizeof(ib_uint32_t);
2763 
2764 		cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2765 		ptr += sizeof(ib_uint32_t);
2766 
2767 		cfg_index->m_n_uniq = mach_read_from_4(ptr);
2768 		ptr += sizeof(ib_uint32_t);
2769 
2770 		cfg_index->m_n_nullable = mach_read_from_4(ptr);
2771 		ptr += sizeof(ib_uint32_t);
2772 
2773 		cfg_index->m_n_fields = mach_read_from_4(ptr);
2774 		ptr += sizeof(ib_uint32_t);
2775 
2776 		/* The NUL byte is included in the name length. */
2777 		ulint	len = mach_read_from_4(ptr);
2778 
2779 		if (len > OS_FILE_MAX_PATH) {
2780 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2781 				ER_INNODB_INDEX_CORRUPT,
2782 				"Index name length (%lu) is too long, "
2783 				"the meta-data is corrupt", len);
2784 
2785 			return(DB_CORRUPTION);
2786 		}
2787 
2788 		cfg_index->m_name = new(std::nothrow) byte[len];
2789 
2790 		/* Trigger OOM */
2791 		DBUG_EXECUTE_IF("ib_import_OOM_7",
2792 				delete [] cfg_index->m_name;
2793 				cfg_index->m_name = 0;);
2794 
2795 		if (cfg_index->m_name == 0) {
2796 			return(DB_OUT_OF_MEMORY);
2797 		}
2798 
2799 		dberr_t	err;
2800 
2801 		err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2802 
2803 		if (err != DB_SUCCESS) {
2804 
2805 			ib_senderrf(
2806 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2807 				errno, strerror(errno),
2808 				"while parsing index name.");
2809 
2810 			return(err);
2811 		}
2812 
2813 		err = row_import_cfg_read_index_fields(
2814 			file, thd, cfg_index, cfg);
2815 
2816 		if (err != DB_SUCCESS) {
2817 			return(err);
2818 		}
2819 
2820 	}
2821 
2822 	return(DB_SUCCESS);
2823 }
2824 
2825 /*****************************************************************//**
2826 Set the index root page number for v1 format.
2827 @return DB_SUCCESS or error code. */
2828 static
2829 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2830 row_import_read_indexes(
2831 /*====================*/
2832 	FILE*		file,		/*!< in: File to read from */
2833 	THD*		thd,		/*!< in: session */
2834 	row_import*	cfg)		/*!< in/out: meta-data read */
2835 {
2836 	byte		row[sizeof(ib_uint32_t)];
2837 
2838 	/* Trigger EOF */
2839 	DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2840 			(void) fseek(file, 0L, SEEK_END););
2841 
2842 	/* Read the number of indexes. */
2843 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2844 		ib_senderrf(
2845 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2846 			errno, strerror(errno),
2847 			"while reading number of indexes.");
2848 
2849 		return(DB_IO_ERROR);
2850 	}
2851 
2852 	cfg->m_n_indexes = mach_read_from_4(row);
2853 
2854 	if (cfg->m_n_indexes == 0) {
2855 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2856 			"Number of indexes in meta-data file is 0");
2857 
2858 		return(DB_CORRUPTION);
2859 
2860 	} else if (cfg->m_n_indexes > 1024) {
2861 		// FIXME: What is the upper limit? */
2862 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2863 			"Number of indexes in meta-data file is too high: %lu",
2864 			(ulong) cfg->m_n_indexes);
2865 		cfg->m_n_indexes = 0;
2866 
2867 		return(DB_CORRUPTION);
2868 	}
2869 
2870 	return(row_import_read_index_data(file, thd, cfg));
2871 }
2872 
2873 /*********************************************************************//**
2874 Read the meta data (table columns) config file. Deserialise the contents of
2875 dict_col_t structure, along with the column name. */
2876 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2877 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2878 row_import_read_columns(
2879 /*====================*/
2880 	FILE*			file,	/*!< in: file to write to */
2881 	THD*			thd,	/*!< in/out: session */
2882 	row_import*		cfg)	/*!< in/out: meta-data read */
2883 {
2884 	dict_col_t*		col;
2885 	byte			row[sizeof(ib_uint32_t) * 8];
2886 
2887 	/* FIXME: What should the upper limit be? */
2888 	ut_a(cfg->m_n_cols > 0);
2889 	ut_a(cfg->m_n_cols < 1024);
2890 
2891 	cfg->m_cols = new(std::nothrow) dict_col_t[cfg->m_n_cols];
2892 
2893 	/* Trigger OOM */
2894 	DBUG_EXECUTE_IF("ib_import_OOM_8",
2895 			delete [] cfg->m_cols; cfg->m_cols = 0;);
2896 
2897 	if (cfg->m_cols == 0) {
2898 		return(DB_OUT_OF_MEMORY);
2899 	}
2900 
2901 	cfg->m_col_names = new(std::nothrow) byte* [cfg->m_n_cols];
2902 
2903 	/* Trigger OOM */
2904 	DBUG_EXECUTE_IF("ib_import_OOM_9",
2905 			delete [] cfg->m_col_names; cfg->m_col_names = 0;);
2906 
2907 	if (cfg->m_col_names == 0) {
2908 		return(DB_OUT_OF_MEMORY);
2909 	}
2910 
2911 	memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2912 	memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2913 
2914 	col = cfg->m_cols;
2915 
2916 	for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2917 		byte*		ptr = row;
2918 
2919 		/* Trigger EOF */
2920 		DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2921 				(void) fseek(file, 0L, SEEK_END););
2922 
2923 		if (fread(row, 1,  sizeof(row), file) != sizeof(row)) {
2924 			ib_senderrf(
2925 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2926 				errno, strerror(errno),
2927 				"while reading table column meta-data.");
2928 
2929 			return(DB_IO_ERROR);
2930 		}
2931 
2932 		col->prtype = mach_read_from_4(ptr);
2933 		ptr += sizeof(ib_uint32_t);
2934 
2935 		col->mtype = mach_read_from_4(ptr);
2936 		ptr += sizeof(ib_uint32_t);
2937 
2938 		col->len = mach_read_from_4(ptr);
2939 		ptr += sizeof(ib_uint32_t);
2940 
2941 		col->mbminmaxlen = mach_read_from_4(ptr);
2942 		ptr += sizeof(ib_uint32_t);
2943 
2944 		col->ind = mach_read_from_4(ptr);
2945 		ptr += sizeof(ib_uint32_t);
2946 
2947 		col->ord_part = mach_read_from_4(ptr);
2948 		ptr += sizeof(ib_uint32_t);
2949 
2950 		col->max_prefix = mach_read_from_4(ptr);
2951 		ptr += sizeof(ib_uint32_t);
2952 
2953 		/* Read in the column name as [len, byte array]. The len
2954 		includes the NUL byte. */
2955 
2956 		ulint		len = mach_read_from_4(ptr);
2957 
2958 		/* FIXME: What is the maximum column name length? */
2959 		if (len == 0 || len > 128) {
2960 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2961 				ER_IO_READ_ERROR,
2962 				"Column name length %lu, is invalid",
2963 				(ulong) len);
2964 
2965 			return(DB_CORRUPTION);
2966 		}
2967 
2968 		cfg->m_col_names[i] = new(std::nothrow) byte[len];
2969 
2970 		/* Trigger OOM */
2971 		DBUG_EXECUTE_IF("ib_import_OOM_10",
2972 				delete [] cfg->m_col_names[i];
2973 				cfg->m_col_names[i] = 0;);
2974 
2975 		if (cfg->m_col_names[i] == 0) {
2976 			return(DB_OUT_OF_MEMORY);
2977 		}
2978 
2979 		dberr_t	err;
2980 
2981 		err = row_import_cfg_read_string(
2982 			file, cfg->m_col_names[i], len);
2983 
2984 		if (err != DB_SUCCESS) {
2985 
2986 			ib_senderrf(
2987 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2988 				errno, strerror(errno),
2989 				"while parsing table column name.");
2990 
2991 			return(err);
2992 		}
2993 	}
2994 
2995 	return(DB_SUCCESS);
2996 }
2997 
2998 /*****************************************************************//**
2999 Read the contents of the <tablespace>.cfg file.
3000 @return DB_SUCCESS or error code. */
3001 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3002 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)3003 row_import_read_v1(
3004 /*===============*/
3005 	FILE*		file,		/*!< in: File to read from */
3006 	THD*		thd,		/*!< in: session */
3007 	row_import*	cfg)		/*!< out: meta data */
3008 {
3009 	byte		value[sizeof(ib_uint32_t)];
3010 
3011 	/* Trigger EOF */
3012 	DBUG_EXECUTE_IF("ib_import_io_read_error_5",
3013 			(void) fseek(file, 0L, SEEK_END););
3014 
3015 	/* Read the hostname where the tablespace was exported. */
3016 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
3017 		ib_senderrf(
3018 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3019 			errno, strerror(errno),
3020 			"while reading meta-data export hostname length.");
3021 
3022 		return(DB_IO_ERROR);
3023 	}
3024 
3025 	ulint	len = mach_read_from_4(value);
3026 
3027 	/* NUL byte is part of name length. */
3028 	cfg->m_hostname = new(std::nothrow) byte[len];
3029 
3030 	/* Trigger OOM */
3031 	DBUG_EXECUTE_IF("ib_import_OOM_1",
3032 			delete [] cfg->m_hostname; cfg->m_hostname = 0;);
3033 
3034 	if (cfg->m_hostname == 0) {
3035 		return(DB_OUT_OF_MEMORY);
3036 	}
3037 
3038 	dberr_t	err = row_import_cfg_read_string(file, cfg->m_hostname, len);
3039 
3040 	if (err != DB_SUCCESS) {
3041 
3042 		ib_senderrf(
3043 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3044 			errno, strerror(errno),
3045 			"while parsing export hostname.");
3046 
3047 		return(err);
3048 	}
3049 
3050 	/* Trigger EOF */
3051 	DBUG_EXECUTE_IF("ib_import_io_read_error_6",
3052 			(void) fseek(file, 0L, SEEK_END););
3053 
3054 	/* Read the table name of tablespace that was exported. */
3055 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
3056 		ib_senderrf(
3057 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3058 			errno, strerror(errno),
3059 			"while reading meta-data table name length.");
3060 
3061 		return(DB_IO_ERROR);
3062 	}
3063 
3064 	len = mach_read_from_4(value);
3065 
3066 	/* NUL byte is part of name length. */
3067 	cfg->m_table_name = new(std::nothrow) byte[len];
3068 
3069 	/* Trigger OOM */
3070 	DBUG_EXECUTE_IF("ib_import_OOM_2",
3071 			delete [] cfg->m_table_name; cfg->m_table_name = 0;);
3072 
3073 	if (cfg->m_table_name == 0) {
3074 		return(DB_OUT_OF_MEMORY);
3075 	}
3076 
3077 	err = row_import_cfg_read_string(file, cfg->m_table_name, len);
3078 
3079 	if (err != DB_SUCCESS) {
3080 		ib_senderrf(
3081 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3082 			errno, strerror(errno),
3083 			"while parsing table name.");
3084 
3085 		return(err);
3086 	}
3087 
3088 	ib_logf(IB_LOG_LEVEL_INFO,
3089 		"Importing tablespace for table '%s' that was exported "
3090 		"from host '%s'", cfg->m_table_name, cfg->m_hostname);
3091 
3092 	byte		row[sizeof(ib_uint32_t) * 3];
3093 
3094 	/* Trigger EOF */
3095 	DBUG_EXECUTE_IF("ib_import_io_read_error_7",
3096 			(void) fseek(file, 0L, SEEK_END););
3097 
3098 	/* Read the autoinc value. */
3099 	if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
3100 		ib_senderrf(
3101 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3102 			errno, strerror(errno),
3103 			"while reading autoinc value.");
3104 
3105 		return(DB_IO_ERROR);
3106 	}
3107 
3108 	cfg->m_autoinc = mach_read_from_8(row);
3109 
3110 	/* Trigger EOF */
3111 	DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3112 			(void) fseek(file, 0L, SEEK_END););
3113 
3114 	/* Read the tablespace page size. */
3115 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3116 		ib_senderrf(
3117 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3118 			errno, strerror(errno),
3119 			"while reading meta-data header.");
3120 
3121 		return(DB_IO_ERROR);
3122 	}
3123 
3124 	byte*		ptr = row;
3125 
3126 	cfg->m_page_size = mach_read_from_4(ptr);
3127 	ptr += sizeof(ib_uint32_t);
3128 
3129 	if (cfg->m_page_size != UNIV_PAGE_SIZE) {
3130 
3131 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3132 			"Tablespace to be imported has a different "
3133 			"page size than this server. Server page size "
3134 			"is %lu, whereas tablespace page size is %lu",
3135 			UNIV_PAGE_SIZE, (ulong) cfg->m_page_size);
3136 
3137 		return(DB_ERROR);
3138 	}
3139 
3140 	cfg->m_flags = mach_read_from_4(ptr);
3141 	ptr += sizeof(ib_uint32_t);
3142 
3143 	cfg->m_n_cols = mach_read_from_4(ptr);
3144 
3145 	if (!dict_tf_is_valid(cfg->m_flags)) {
3146 
3147 		return(DB_CORRUPTION);
3148 
3149 	} else if ((err = row_import_read_columns(file, thd, cfg))
3150 		   != DB_SUCCESS) {
3151 
3152 		return(err);
3153 
3154 	} else  if ((err = row_import_read_indexes(file, thd, cfg))
3155 		   != DB_SUCCESS) {
3156 
3157 		return(err);
3158 	}
3159 
3160 	ut_a(err == DB_SUCCESS);
3161 	return(err);
3162 }
3163 
3164 /**
3165 Read the contents of the <tablespace>.cfg file.
3166 @return DB_SUCCESS or error code. */
3167 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3168 dberr_t
row_import_read_meta_data(dict_table_t * table,FILE * file,THD * thd,row_import & cfg)3169 row_import_read_meta_data(
3170 /*======================*/
3171 	dict_table_t*	table,		/*!< in: table */
3172 	FILE*		file,		/*!< in: File to read from */
3173 	THD*		thd,		/*!< in: session */
3174 	row_import&	cfg)		/*!< out: contents of the .cfg file */
3175 {
3176 	byte		row[sizeof(ib_uint32_t)];
3177 
3178 	/* Trigger EOF */
3179 	DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3180 			(void) fseek(file, 0L, SEEK_END););
3181 
3182 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3183 		ib_senderrf(
3184 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3185 			errno, strerror(errno),
3186 			"while reading meta-data version.");
3187 
3188 		return(DB_IO_ERROR);
3189 	}
3190 
3191 	cfg.m_version = mach_read_from_4(row);
3192 
3193 	/* Check the version number. */
3194 	switch (cfg.m_version) {
3195 	case IB_EXPORT_CFG_VERSION_V1:
3196 
3197 		return(row_import_read_v1(file, thd, &cfg));
3198 	default:
3199 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3200 			"Unsupported meta-data version number (%lu), "
3201 			"file ignored", (ulong) cfg.m_version);
3202 	}
3203 
3204 	return(DB_ERROR);
3205 }
3206 
3207 /**
3208 Read the contents of the <tablename>.cfg file.
3209 @return DB_SUCCESS or error code. */
3210 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3211 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3212 row_import_read_cfg(
3213 /*================*/
3214 	dict_table_t*	table,	/*!< in: table */
3215 	THD*		thd,	/*!< in: session */
3216 	row_import&	cfg)	/*!< out: contents of the .cfg file */
3217 {
3218 	dberr_t		err;
3219 	char		name[OS_FILE_MAX_PATH];
3220 
3221 	cfg.m_table = table;
3222 
3223 	srv_get_meta_data_filename(table, name, sizeof(name));
3224 
3225 	FILE*	file = fopen(name, "rb");
3226 
3227 	if (file == NULL) {
3228 		char	msg[BUFSIZ];
3229 
3230 		ut_snprintf(msg, sizeof(msg),
3231 			    "Error opening '%s', will attempt to import "
3232 			    "without schema verification", name);
3233 
3234 		ib_senderrf(
3235 			thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3236 			errno, strerror(errno), msg);
3237 
3238 		cfg.m_missing = true;
3239 
3240 		err = DB_FAIL;
3241 	} else {
3242 
3243 		cfg.m_missing = false;
3244 
3245 		err = row_import_read_meta_data(table, file, thd, cfg);
3246 		fclose(file);
3247 	}
3248 
3249 	return(err);
3250 }
3251 
3252 /*****************************************************************//**
3253 Update the <space, root page> of a table's indexes from the values
3254 in the data dictionary.
3255 @return DB_SUCCESS or error code */
3256 UNIV_INTERN
3257 dberr_t
row_import_update_index_root(trx_t * trx,const dict_table_t * table,bool reset,bool dict_locked)3258 row_import_update_index_root(
3259 /*=========================*/
3260 	trx_t*			trx,		/*!< in/out: transaction that
3261 						covers the update */
3262 	const dict_table_t*	table,		/*!< in: Table for which we want
3263 						to set the root page_no */
3264 	bool			reset,		/*!< in: if true then set to
3265 						FIL_NUL */
3266 	bool			dict_locked)	/*!< in: Set to true if the
3267 						caller already owns the
3268 						dict_sys_t:: mutex. */
3269 
3270 {
3271 	const dict_index_t*	index;
3272 	que_t*			graph = 0;
3273 	dberr_t			err = DB_SUCCESS;
3274 
3275 	static const char	sql[] = {
3276 		"PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3277 		"BEGIN\n"
3278 		"UPDATE SYS_INDEXES\n"
3279 		"SET SPACE = :space,\n"
3280 		"    PAGE_NO = :page,\n"
3281 		"    TYPE = :type\n"
3282 		"WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3283 		"END;\n"};
3284 
3285 	if (!dict_locked) {
3286 		mutex_enter(&dict_sys->mutex);
3287 	}
3288 
3289 	for (index = dict_table_get_first_index(table);
3290 	     index != 0;
3291 	     index = dict_table_get_next_index(index)) {
3292 
3293 		pars_info_t*	info;
3294 		ib_uint32_t	page;
3295 		ib_uint32_t	space;
3296 		ib_uint32_t	type;
3297 		index_id_t	index_id;
3298 		table_id_t	table_id;
3299 
3300 		info = (graph != 0) ? graph->info : pars_info_create();
3301 
3302 		mach_write_to_4(
3303 			reinterpret_cast<byte*>(&type),
3304 			index->type);
3305 
3306 		mach_write_to_4(
3307 			reinterpret_cast<byte*>(&page),
3308 			reset ? FIL_NULL : index->page);
3309 
3310 		mach_write_to_4(
3311 			reinterpret_cast<byte*>(&space),
3312 			reset ? FIL_NULL : index->space);
3313 
3314 		mach_write_to_8(
3315 			reinterpret_cast<byte*>(&index_id),
3316 			index->id);
3317 
3318 		mach_write_to_8(
3319 			reinterpret_cast<byte*>(&table_id),
3320 			table->id);
3321 
3322 		/* If we set the corrupt bit during the IMPORT phase then
3323 		we need to update the system tables. */
3324 		pars_info_bind_int4_literal(info, "type", &type);
3325 		pars_info_bind_int4_literal(info, "space", &space);
3326 		pars_info_bind_int4_literal(info, "page", &page);
3327 		pars_info_bind_ull_literal(info, "index_id", &index_id);
3328 		pars_info_bind_ull_literal(info, "table_id", &table_id);
3329 
3330 		if (graph == 0) {
3331 			graph = pars_sql(info, sql);
3332 			ut_a(graph);
3333 			graph->trx = trx;
3334 		}
3335 
3336 		que_thr_t*	thr;
3337 
3338 		graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3339 
3340 		ut_a(thr = que_fork_start_command(graph));
3341 
3342 		que_run_threads(thr);
3343 
3344 		DBUG_EXECUTE_IF("ib_import_internal_error",
3345 				trx->error_state = DB_ERROR;);
3346 
3347 		err = trx->error_state;
3348 
3349 		if (err != DB_SUCCESS) {
3350 			char		index_name[MAX_FULL_NAME_LEN + 1];
3351 
3352 			innobase_format_name(
3353 				index_name, sizeof(index_name),
3354 				index->name, TRUE);
3355 
3356 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3357 				ER_INTERNAL_ERROR,
3358 				"While updating the <space, root page "
3359 				"number> of index %s - %s",
3360 				index_name, ut_strerr(err));
3361 
3362 			break;
3363 		}
3364 	}
3365 
3366 	que_graph_free(graph);
3367 
3368 	if (!dict_locked) {
3369 		mutex_exit(&dict_sys->mutex);
3370 	}
3371 
3372 	return(err);
3373 }
3374 
3375 /** Callback arg for row_import_set_discarded. */
3376 struct discard_t {
3377 	ib_uint32_t	flags2;			/*!< Value read from column */
3378 	bool		state;			/*!< New state of the flag */
3379 	ulint		n_recs;			/*!< Number of recs processed */
3380 };
3381 
3382 /******************************************************************//**
3383 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3384 SYS_TABLES. The flags is stored in MIX_LEN column.
3385 @return FALSE if all OK */
3386 static
3387 ibool
row_import_set_discarded(void * row,void * user_arg)3388 row_import_set_discarded(
3389 /*=====================*/
3390 	void*		row,			/*!< in: sel_node_t* */
3391 	void*		user_arg)		/*!< in: bool set/unset flag */
3392 {
3393 	sel_node_t*	node = static_cast<sel_node_t*>(row);
3394 	discard_t*	discard = static_cast<discard_t*>(user_arg);
3395 	dfield_t*	dfield = que_node_get_val(node->select_list);
3396 	dtype_t*	type = dfield_get_type(dfield);
3397 	ulint		len = dfield_get_len(dfield);
3398 
3399 	ut_a(dtype_get_mtype(type) == DATA_INT);
3400 	ut_a(len == sizeof(ib_uint32_t));
3401 
3402 	ulint	flags2 = mach_read_from_4(
3403 		static_cast<byte*>(dfield_get_data(dfield)));
3404 
3405 	if (discard->state) {
3406 		flags2 |= DICT_TF2_DISCARDED;
3407 	} else {
3408 		flags2 &= ~DICT_TF2_DISCARDED;
3409 	}
3410 
3411 	mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3412 
3413 	++discard->n_recs;
3414 
3415 	/* There should be at most one matching record. */
3416 	ut_a(discard->n_recs == 1);
3417 
3418 	return(FALSE);
3419 }
3420 
3421 /*****************************************************************//**
3422 Update the DICT_TF2_DISCARDED flag in SYS_TABLES.
3423 @return DB_SUCCESS or error code. */
3424 UNIV_INTERN
3425 dberr_t
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded,bool dict_locked)3426 row_import_update_discarded_flag(
3427 /*=============================*/
3428 	trx_t*		trx,		/*!< in/out: transaction that
3429 					covers the update */
3430 	table_id_t	table_id,	/*!< in: Table for which we want
3431 					to set the root table->flags2 */
3432 	bool		discarded,	/*!< in: set MIX_LEN column bit
3433 					to discarded, if true */
3434 	bool		dict_locked)	/*!< in: set to true if the
3435 					caller already owns the
3436 					dict_sys_t:: mutex. */
3437 
3438 {
3439 	pars_info_t*		info;
3440 	discard_t		discard;
3441 
3442 	static const char	sql[] =
3443 		"PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3444 		"DECLARE FUNCTION my_func;\n"
3445 		"DECLARE CURSOR c IS\n"
3446 		" SELECT MIX_LEN "
3447 		" FROM SYS_TABLES "
3448 		" WHERE ID = :table_id FOR UPDATE;"
3449 		"\n"
3450 		"BEGIN\n"
3451 		"OPEN c;\n"
3452 		"WHILE 1 = 1 LOOP\n"
3453 		"  FETCH c INTO my_func();\n"
3454 		"  IF c % NOTFOUND THEN\n"
3455 		"    EXIT;\n"
3456 		"  END IF;\n"
3457 		"END LOOP;\n"
3458 		"UPDATE SYS_TABLES"
3459 		" SET MIX_LEN = :flags2"
3460 		" WHERE ID = :table_id;\n"
3461 		"CLOSE c;\n"
3462 		"END;\n";
3463 
3464 	discard.n_recs = 0;
3465 	discard.state = discarded;
3466 	discard.flags2 = ULINT32_UNDEFINED;
3467 
3468 	info = pars_info_create();
3469 
3470 	pars_info_add_ull_literal(info, "table_id", table_id);
3471 	pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3472 
3473 	pars_info_bind_function(
3474 		info, "my_func", row_import_set_discarded, &discard);
3475 
3476 	dberr_t	err = que_eval_sql(info, sql, !dict_locked, trx);
3477 
3478 	ut_a(discard.n_recs == 1);
3479 	ut_a(discard.flags2 != ULINT32_UNDEFINED);
3480 
3481 	return(err);
3482 }
3483 
3484 /*****************************************************************//**
3485 Imports a tablespace. The space id in the .ibd file must match the space id
3486 of the table in the data dictionary.
3487 @return	error code or DB_SUCCESS */
3488 UNIV_INTERN
3489 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)3490 row_import_for_mysql(
3491 /*=================*/
3492 	dict_table_t*	table,		/*!< in/out: table */
3493 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
3494 {
3495 	dberr_t		err;
3496 	trx_t*		trx;
3497 	ib_uint64_t	autoinc = 0;
3498 	char		table_name[MAX_FULL_NAME_LEN + 1];
3499 	char*		filepath = NULL;
3500 
3501 	ut_ad(!srv_read_only_mode);
3502 
3503 	innobase_format_name(
3504 		table_name, sizeof(table_name), table->name, FALSE);
3505 
3506 	ut_a(table->space);
3507 	ut_ad(prebuilt->trx);
3508 	ut_a(table->ibd_file_missing);
3509 
3510 	trx_start_if_not_started(prebuilt->trx);
3511 
3512 	trx = trx_allocate_for_mysql();
3513 
3514 	/* So that the table is not DROPped during recovery. */
3515 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3516 
3517 	trx_start_if_not_started(trx);
3518 
3519 	/* So that we can send error messages to the user. */
3520 	trx->mysql_thd = prebuilt->trx->mysql_thd;
3521 
3522 	/* Ensure that the table will be dropped by trx_rollback_active()
3523 	in case of a crash. */
3524 
3525 	trx->table_id = table->id;
3526 
3527 	/* Assign an undo segment for the transaction, so that the
3528 	transaction will be recovered after a crash. */
3529 
3530 	mutex_enter(&trx->undo_mutex);
3531 
3532 	err = trx_undo_assign_undo(trx, TRX_UNDO_UPDATE);
3533 
3534 	mutex_exit(&trx->undo_mutex);
3535 
3536 	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
3537 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
3538 
3539 	if (err != DB_SUCCESS) {
3540 
3541 		return(row_import_cleanup(prebuilt, trx, err));
3542 
3543 	} else if (trx->update_undo == 0) {
3544 
3545 		err = DB_TOO_MANY_CONCURRENT_TRXS;
3546 		return(row_import_cleanup(prebuilt, trx, err));
3547 	}
3548 
3549 	prebuilt->trx->op_info = "read meta-data file";
3550 
3551 	/* Prevent DDL operations while we are checking. */
3552 
3553 	rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);
3554 
3555 	row_import	cfg;
3556 	ulint		space_flags = 0;
3557 
3558 	memset(&cfg, 0x0, sizeof(cfg));
3559 
3560 	err = row_import_read_cfg(table, trx->mysql_thd, cfg);
3561 
3562 	/* Check if the table column definitions match the contents
3563 	of the config file. */
3564 
3565 	if (err == DB_SUCCESS) {
3566 
3567 		/* We have a schema file, try and match it with the our
3568 		data dictionary. */
3569 
3570 		err = cfg.match_schema(trx->mysql_thd);
3571 
3572 		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
3573 		B-tree root page numbers in the tablespace. Use the index
3574 		name from the .cfg file to find match. */
3575 
3576 		if (err == DB_SUCCESS) {
3577 			cfg.set_root_by_name();
3578 			autoinc = cfg.m_autoinc;
3579 		}
3580 
3581 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3582 
3583 		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
3584 				err = DB_TOO_MANY_CONCURRENT_TRXS;);
3585 
3586 	} else if (cfg.m_missing) {
3587 
3588 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3589 
3590 		/* We don't have a schema file, we will have to discover
3591 		the index root pages from the .ibd file and skip the schema
3592 		matching step. */
3593 
3594 		ut_a(err == DB_FAIL);
3595 
3596 		cfg.m_page_size = UNIV_PAGE_SIZE;
3597 
3598 		FetchIndexRootPages	fetchIndexRootPages(table, trx);
3599 
3600 		err = fil_tablespace_iterate(
3601 			table, IO_BUFFER_SIZE(cfg.m_page_size),
3602 			fetchIndexRootPages);
3603 
3604 		if (err == DB_SUCCESS) {
3605 
3606 			err = fetchIndexRootPages.build_row_import(&cfg);
3607 
3608 			/* Update index->page and SYS_INDEXES.PAGE_NO
3609 			to match the B-tree root page numbers in the
3610 			tablespace. */
3611 
3612 			if (err == DB_SUCCESS) {
3613 				err = cfg.set_root_by_heuristic();
3614 			}
3615 		}
3616 
3617 		space_flags = fetchIndexRootPages.get_space_flags();
3618 
3619 		/* If the fsp flag is set for data_dir, but table flag is not
3620 		set for data_dir or vice versa then return error. */
3621 		if (err == DB_SUCCESS
3622 			&& FSP_FLAGS_HAS_DATA_DIR(space_flags) !=
3623 			DICT_TF_HAS_DATA_DIR(table->flags)) {
3624 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3625 				ER_TABLE_SCHEMA_MISMATCH,
3626 				"Table location flags do not match. "
3627 				"The source table %s a DATA DIRECTORY "
3628 				"but the destination table %s.",
3629 				(FSP_FLAGS_HAS_DATA_DIR(space_flags) ? "uses"
3630 				: "does not use"),
3631 				(DICT_TF_HAS_DATA_DIR(table->flags) ? "does"
3632 				: "does not"));
3633 			err = DB_ERROR;
3634 			return(row_import_error(prebuilt, trx, err));
3635 		}
3636 
3637 	} else {
3638 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
3639 	}
3640 
3641 	if (err != DB_SUCCESS) {
3642 		return(row_import_error(prebuilt, trx, err));
3643 	}
3644 
3645 	prebuilt->trx->op_info = "importing tablespace";
3646 
3647 	ib_logf(IB_LOG_LEVEL_INFO, "Phase I - Update all pages");
3648 
3649 	/* Iterate over all the pages and do the sanity checking and
3650 	the conversion required to import the tablespace. */
3651 
3652 	PageConverter	converter(&cfg, trx);
3653 
3654 	/* Set the IO buffer size in pages. */
3655 
3656 	err = fil_tablespace_iterate(
3657 		table, IO_BUFFER_SIZE(cfg.m_page_size), converter);
3658 
3659 	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
3660 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
3661 
3662 	if (err != DB_SUCCESS) {
3663 		char	table_name[MAX_FULL_NAME_LEN + 1];
3664 
3665 		innobase_format_name(
3666 			table_name, sizeof(table_name), table->name, FALSE);
3667 
3668 		ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3669 			ER_INTERNAL_ERROR,
3670 			"Cannot reset LSNs in table '%s' : %s",
3671 			table_name, ut_strerr(err));
3672 
3673 		return(row_import_cleanup(prebuilt, trx, err));
3674 	}
3675 
3676 	row_mysql_lock_data_dictionary(trx);
3677 
3678 	/* If the table is stored in a remote tablespace, we need to
3679 	determine that filepath from the link file and system tables.
3680 	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
3681 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3682 		dict_get_and_save_data_dir_path(table, true);
3683 		ut_a(table->data_dir_path);
3684 
3685 		filepath = os_file_make_remote_pathname(
3686 			table->data_dir_path, table->name, "ibd");
3687 	} else {
3688 		filepath = fil_make_ibd_name(table->name, false);
3689 	}
3690 	ut_a(filepath);
3691 
3692 	/* Open the tablespace so that we can access via the buffer pool.
3693 	We set the 2nd param (fix_dict = true) here because we already
3694 	have an x-lock on dict_operation_lock and dict_sys->mutex. */
3695 
3696 	err = fil_open_single_table_tablespace(
3697 		true, true, table->space,
3698 		dict_tf_to_fsp_flags(table->flags),
3699 		table->name, filepath);
3700 
3701 	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
3702 			err = DB_TABLESPACE_NOT_FOUND;);
3703 
3704 	if (err != DB_SUCCESS) {
3705 		row_mysql_unlock_data_dictionary(trx);
3706 
3707 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3708 			ER_FILE_NOT_FOUND,
3709 			filepath, err, ut_strerr(err));
3710 
3711 		mem_free(filepath);
3712 
3713 		return(row_import_cleanup(prebuilt, trx, err));
3714 	}
3715 
3716 	row_mysql_unlock_data_dictionary(trx);
3717 
3718 	mem_free(filepath);
3719 
3720 	err = ibuf_check_bitmap_on_import(trx, table->space);
3721 
3722 	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
3723 
3724 	if (err != DB_SUCCESS) {
3725 		return(row_import_cleanup(prebuilt, trx, err));
3726 	}
3727 
3728 	/* The first index must always be the clustered index. */
3729 
3730 	dict_index_t*	index = dict_table_get_first_index(table);
3731 
3732 	if (!dict_index_is_clust(index)) {
3733 		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
3734 	}
3735 
3736 	/* Update the Btree segment headers for index node and
3737 	leaf nodes in the root page. Set the new space id. */
3738 
3739 	err = btr_root_adjust_on_import(index);
3740 
3741 	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
3742 			err = DB_CORRUPTION;);
3743 
3744 	if (err != DB_SUCCESS) {
3745 		return(row_import_error(prebuilt, trx, err));
3746 	}
3747 
3748 	if (err != DB_SUCCESS) {
3749 		return(row_import_error(prebuilt, trx, err));
3750 	} else if (cfg.requires_purge(index->name)) {
3751 
3752 		/* Purge any delete-marked records that couldn't be
3753 		purged during the page conversion phase from the
3754 		cluster index. */
3755 
3756 		IndexPurge	purge(trx, index);
3757 
3758 		trx->op_info = "cluster: purging delete marked records";
3759 
3760 		err = purge.garbage_collect();
3761 
3762 		trx->op_info = "";
3763 	}
3764 
3765 	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
3766 
3767 	if (err != DB_SUCCESS) {
3768 		return(row_import_error(prebuilt, trx, err));
3769 	}
3770 
3771 	/* For secondary indexes, purge any records that couldn't be purged
3772 	during the page conversion phase. */
3773 
3774 	err = row_import_adjust_root_pages_of_secondary_indexes(
3775 		prebuilt, trx, table, cfg);
3776 
3777 	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
3778 			err = DB_CORRUPTION;);
3779 
3780 	if (err != DB_SUCCESS) {
3781 		return(row_import_error(prebuilt, trx, err));
3782 	}
3783 
3784 	/* Ensure that the next available DB_ROW_ID is not smaller than
3785 	any DB_ROW_ID stored in the table. */
3786 
3787 	if (prebuilt->clust_index_was_generated) {
3788 
3789 		err = row_import_set_sys_max_row_id(prebuilt, table);
3790 
3791 		if (err != DB_SUCCESS) {
3792 			return(row_import_error(prebuilt, trx, err));
3793 		}
3794 	}
3795 
3796 	ib_logf(IB_LOG_LEVEL_INFO, "Phase III - Flush changes to disk");
3797 
3798 	/* Ensure that all pages dirtied during the IMPORT make it to disk.
3799 	The only dirty pages generated should be from the pessimistic purge
3800 	of delete marked records that couldn't be purged in Phase I. */
3801 
3802 	buf_LRU_flush_or_remove_pages(
3803 		prebuilt->table->space, BUF_REMOVE_FLUSH_WRITE, trx);
3804 
3805 	if (trx_is_interrupted(trx)) {
3806 		ib_logf(IB_LOG_LEVEL_INFO, "Phase III - Flush interrupted");
3807 		return(row_import_error(prebuilt, trx, DB_INTERRUPTED));
3808 	} else {
3809 		ib_logf(IB_LOG_LEVEL_INFO, "Phase IV - Flush complete");
3810 	}
3811 
3812 	/* The dictionary latches will be released in in row_import_cleanup()
3813 	after the transaction commit, for both success and error. */
3814 
3815 	row_mysql_lock_data_dictionary(trx);
3816 
3817 	/* Update the root pages of the table's indexes. */
3818 	err = row_import_update_index_root(trx, table, false, true);
3819 
3820 	if (err != DB_SUCCESS) {
3821 		return(row_import_error(prebuilt, trx, err));
3822 	}
3823 
3824 	/* Update the table's discarded flag, unset it. */
3825 	err = row_import_update_discarded_flag(trx, table->id, false, true);
3826 
3827 	if (err != DB_SUCCESS) {
3828 		return(row_import_error(prebuilt, trx, err));
3829 	}
3830 
3831 	table->ibd_file_missing = false;
3832 	table->flags2 &= ~DICT_TF2_DISCARDED;
3833 
3834 	if (autoinc != 0) {
3835 		char	table_name[MAX_FULL_NAME_LEN + 1];
3836 
3837 		innobase_format_name(
3838 			table_name, sizeof(table_name), table->name, FALSE);
3839 
3840 		ib_logf(IB_LOG_LEVEL_INFO, "%s autoinc value set to " IB_ID_FMT,
3841 			table_name, autoinc);
3842 
3843 		dict_table_autoinc_lock(table);
3844 		dict_table_autoinc_initialize(table, autoinc);
3845 		dict_table_autoinc_unlock(table);
3846 	}
3847 
3848 	ut_a(err == DB_SUCCESS);
3849 
3850 	return(row_import_cleanup(prebuilt, trx, err));
3851 }
3852 
3853