1 /*****************************************************************************
2 
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23 
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26 
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #include "row0row.h"
44 #ifdef HAVE_LZO
45 #include "lzo/lzo1x.h"
46 #endif
47 #ifdef HAVE_SNAPPY
48 #include "snappy-c.h"
49 #endif
50 
51 #include "scope.h"
52 
53 #include <vector>
54 
55 #ifdef HAVE_MY_AES_H
56 #include <my_aes.h>
57 #endif
58 
59 using st_::span;
60 
/** Number of pages of the given size that fit in the (1024 * 1024)
sized buffer used for IO.
@param n physical page size
@return number of pages */
#define IO_BUFFER_SIZE(n)	((1024 * 1024) / (n))
65 
/** Record statistics gathered per index during phase I of the import.
One instance is embedded in every row_index_t. */
struct row_stats_t {
	ulint		m_n_deleted;		/*!< Number of deleted records
						found in the index */

	ulint		m_n_purged;		/*!< Number of records purged
						optimistically */

	ulint		m_n_rows;		/*!< Number of rows */

	ulint		m_n_purge_failed;	/*!< Number of deleted rows
						that could not be purged */
};
79 
/** Index information required by IMPORT.
Arrays of this struct are zero-filled with memset() after allocation
(see FetchIndexRootPages::build_row_import()), and m_name / m_fields are
released with UT_DELETE_ARRAY() by row_import::~row_import(). */
struct row_index_t {
	index_id_t	m_id;			/*!< Index id of the table
						in the exporting server */
	byte*		m_name;			/*!< Index name; a
						NUL-terminated string that is
						compared with strcmp() */

	ulint		m_space;		/*!< Space where it is placed */

	ulint		m_page_no;		/*!< Root page number */

	ulint		m_type;			/*!< Index type */

	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
						indexes, offset of transaction
						id system column */

	ulint		m_n_user_defined_cols;	/*!< User defined columns */

	ulint		m_n_uniq;		/*!< Number of columns that can
						uniquely identify the row */

	ulint		m_n_nullable;		/*!< Number of nullable
						columns */

	ulint		m_n_fields;		/*!< Total number of fields;
						the length of m_fields */

	dict_field_t*	m_fields;		/*!< Index fields, or NULL if
						none were allocated */

	const dict_index_t*
			m_srv_index;		/*!< Index instance in the
						importing server */

	row_stats_t	m_stats;		/*!< Statistics gathered during
						the import phase */

};
116 
117 /** Meta data required by IMPORT. */
118 struct row_import {
row_importrow_import119 	row_import() UNIV_NOTHROW
120 		:
121 		m_table(NULL),
122 		m_version(0),
123 		m_hostname(NULL),
124 		m_table_name(NULL),
125 		m_autoinc(0),
126 		m_zip_size(0),
127 		m_flags(0),
128 		m_n_cols(0),
129 		m_cols(NULL),
130 		m_col_names(NULL),
131 		m_n_indexes(0),
132 		m_indexes(NULL),
133 		m_missing(true) { }
134 
135 	~row_import() UNIV_NOTHROW;
136 
137 	/** Find the index entry in in the indexes array.
138 	@param name index name
139 	@return instance if found else 0. */
140 	row_index_t* get_index(const char* name) const UNIV_NOTHROW;
141 
142 	/** Get the number of rows in the index.
143 	@param name index name
144 	@return number of rows (doesn't include delete marked rows). */
145 	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;
146 
147 	/** Find the ordinal value of the column name in the cfg table columns.
148 	@param name of column to look for.
149 	@return ULINT_UNDEFINED if not found. */
150 	ulint find_col(const char* name) const UNIV_NOTHROW;
151 
152 	/** Get the number of rows for which purge failed during the
153 	convert phase.
154 	@param name index name
155 	@return number of rows for which purge failed. */
156 	ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;
157 
158 	/** Check if the index is clean. ie. no delete-marked records
159 	@param name index name
160 	@return true if index needs to be purged. */
requires_purgerow_import161 	bool requires_purge(const char* name) const UNIV_NOTHROW
162 	{
163 		return(get_n_purge_failed(name) > 0);
164 	}
165 
166 	/** Set the index root <space, pageno> using the index name */
167 	void set_root_by_name() UNIV_NOTHROW;
168 
169 	/** Set the index root <space, pageno> using a heuristic
170 	@return DB_SUCCESS or error code */
171 	dberr_t set_root_by_heuristic() UNIV_NOTHROW;
172 
173 	/** Check if the index schema that was read from the .cfg file
174 	matches the in memory index definition.
175 	Note: It will update row_import_t::m_srv_index to map the meta-data
176 	read from the .cfg file to the server index instance.
177 	@return DB_SUCCESS or error code. */
178 	dberr_t match_index_columns(
179 		THD*			thd,
180 		const dict_index_t*	index) UNIV_NOTHROW;
181 
182 	/** Check if the table schema that was read from the .cfg file
183 	matches the in memory table definition.
184 	@param thd MySQL session variable
185 	@return DB_SUCCESS or error code. */
186 	dberr_t match_table_columns(
187 		THD*			thd) UNIV_NOTHROW;
188 
189 	/** Check if the table (and index) schema that was read from the
190 	.cfg file matches the in memory table definition.
191 	@param thd MySQL session variable
192 	@return DB_SUCCESS or error code. */
193 	dberr_t match_schema(
194 		THD*			thd) UNIV_NOTHROW;
195 
196 	dberr_t match_flags(THD *thd) const ;
197 
198 
199 	dict_table_t*	m_table;		/*!< Table instance */
200 
201 	ulint		m_version;		/*!< Version of config file */
202 
203 	byte*		m_hostname;		/*!< Hostname where the
204 						tablespace was exported */
205 	byte*		m_table_name;		/*!< Exporting instance table
206 						name */
207 
208 	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */
209 
210 	ulint		m_zip_size;		/*!< ROW_FORMAT=COMPRESSED
211 						page size, or 0 */
212 
213 	ulint		m_flags;		/*!< Table flags */
214 
215 	ulint		m_n_cols;		/*!< Number of columns in the
216 						meta-data file */
217 
218 	dict_col_t*	m_cols;			/*!< Column data */
219 
220 	byte**		m_col_names;		/*!< Column names, we store the
221 						column naems separately becuase
222 						there is no field to store the
223 						value in dict_col_t */
224 
225 	ulint		m_n_indexes;		/*!< Number of indexes,
226 						including clustered index */
227 
228 	row_index_t*	m_indexes;		/*!< Index meta data */
229 
230 	bool		m_missing;		/*!< true if a .cfg file was
231 						found and was readable */
232 };
233 
/** Description of the file and IO buffers used for one pass over a
tablespace file; passed to fil_iterate() and AbstractCallback::run(). */
struct fil_iterator_t {
	pfs_os_file_t	file;			/*!< File handle */
	const char*	filepath;		/*!< File path name */
	os_offset_t	start;			/*!< From where to start */
	os_offset_t	end;			/*!< Where to stop */
	os_offset_t	file_size;		/*!< File size in bytes */
	ulint		n_io_buffers;		/*!< Number of pages to use
						for IO */
	byte*		io_buffer;		/*!< Buffer to use for IO */
	fil_space_crypt_t *crypt_data;		/*!< Crypt data (if encrypted) */
	byte*           crypt_io_buffer;        /*!< IO buffer when encrypted */
};
246 
247 /** Use the page cursor to iterate over records in a block. */
248 class RecIterator {
249 public:
250 	/** Default constructor */
RecIterator()251 	RecIterator() UNIV_NOTHROW
252 	{
253 		memset(&m_cur, 0x0, sizeof(m_cur));
254 	}
255 
256 	/** Position the cursor on the first user record. */
open(buf_block_t * block)257 	void	open(buf_block_t* block) UNIV_NOTHROW
258 	{
259 		page_cur_set_before_first(block, &m_cur);
260 
261 		if (!end()) {
262 			next();
263 		}
264 	}
265 
266 	/** Move to the next record. */
next()267 	void	next() UNIV_NOTHROW
268 	{
269 		page_cur_move_to_next(&m_cur);
270 	}
271 
272 	/**
273 	@return the current record */
current()274 	rec_t*	current() UNIV_NOTHROW
275 	{
276 		ut_ad(!end());
277 		return(page_cur_get_rec(&m_cur));
278 	}
279 
280 	/**
281 	@return true if cursor is at the end */
end()282 	bool	end() UNIV_NOTHROW
283 	{
284 		return(page_cur_is_after_last(&m_cur) == TRUE);
285 	}
286 
287 	/** Remove the current record
288 	@return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,rec_offs * offsets)289 	bool remove(
290 		const dict_index_t*	index,
291 		page_zip_des_t*		page_zip,
292 		rec_offs*		offsets) UNIV_NOTHROW
293 	{
294 		/* We can't end up with an empty page unless it is root. */
295 		if (page_get_n_recs(m_cur.block->frame) <= 1) {
296 			return(false);
297 		}
298 
299 		return(page_delete_rec(index, &m_cur, page_zip, offsets));
300 	}
301 
302 private:
303 	page_cur_t	m_cur;
304 };
305 
306 /** Class that purges delete marked reocords from indexes, both secondary
307 and cluster. It does a pessimistic delete. This should only be done if we
308 couldn't purge the delete marked reocrds during Phase I. */
309 class IndexPurge {
310 public:
311 	/** Constructor
312 	@param trx the user transaction covering the import tablespace
313 	@param index to be imported
314 	@param space_id space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)315 	IndexPurge(
316 		trx_t*		trx,
317 		dict_index_t*	index) UNIV_NOTHROW
318 		:
319 		m_trx(trx),
320 		m_index(index),
321 		m_n_rows(0)
322 	{
323 		ib::info() << "Phase II - Purge records from index "
324 			<< index->name;
325 	}
326 
327 	/** Descructor */
~IndexPurge()328 	~IndexPurge() UNIV_NOTHROW { }
329 
330 	/** Purge delete marked records.
331 	@return DB_SUCCESS or error code. */
332 	dberr_t	garbage_collect() UNIV_NOTHROW;
333 
334 	/** The number of records that are not delete marked.
335 	@return total records in the index after purge */
get_n_rows() const336 	ulint	get_n_rows() const UNIV_NOTHROW
337 	{
338 		return(m_n_rows);
339 	}
340 
341 private:
342 	/** Begin import, position the cursor on the first record. */
343 	void	open() UNIV_NOTHROW;
344 
345 	/** Close the persistent curosr and commit the mini-transaction. */
346 	void	close() UNIV_NOTHROW;
347 
348 	/** Position the cursor on the next record.
349 	@return DB_SUCCESS or error code */
350 	dberr_t	next() UNIV_NOTHROW;
351 
352 	/** Store the persistent cursor position and reopen the
353 	B-tree cursor in BTR_MODIFY_TREE mode, because the
354 	tree structure may be changed during a pessimistic delete. */
355 	void	purge_pessimistic_delete() UNIV_NOTHROW;
356 
357 	/** Purge delete-marked records.
358 	@param offsets current row offsets. */
359 	void	purge() UNIV_NOTHROW;
360 
361 protected:
362 	// Disable copying
363 	IndexPurge();
364 	IndexPurge(const IndexPurge&);
365 	IndexPurge &operator=(const IndexPurge&);
366 
367 private:
368 	trx_t*			m_trx;		/*!< User transaction */
369 	mtr_t			m_mtr;		/*!< Mini-transaction */
370 	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
371 	dict_index_t*		m_index;	/*!< Index to be processed */
372 	ulint			m_n_rows;	/*!< Records in index */
373 };
374 
375 /** Functor that is called for each physical page that is read from the
376 tablespace file.  */
377 class AbstractCallback
378 {
379 public:
380 	/** Constructor
381 	@param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)382 	AbstractCallback(trx_t* trx, ulint space_id)
383 		:
384 		m_zip_size(0),
385 		m_trx(trx),
386 		m_space(space_id),
387 		m_xdes(),
388 		m_xdes_page_no(ULINT_UNDEFINED),
389 		m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
390 
391 	/** Free any extent descriptor instance */
~AbstractCallback()392 	virtual ~AbstractCallback()
393 	{
394 		UT_DELETE_ARRAY(m_xdes);
395 	}
396 
397 	/** Determine the page size to use for traversing the tablespace
398 	@param file_size size of the tablespace file in bytes
399 	@param block contents of the first page in the tablespace file.
400 	@retval DB_SUCCESS or error code. */
401 	virtual dberr_t init(
402 		os_offset_t		file_size,
403 		const buf_block_t*	block) UNIV_NOTHROW;
404 
405 	/** @return true if compressed table. */
is_compressed_table() const406 	bool is_compressed_table() const UNIV_NOTHROW
407 	{
408 		return get_zip_size();
409 	}
410 
411 	/** @return the tablespace flags */
get_space_flags() const412 	ulint get_space_flags() const
413 	{
414 		return(m_space_flags);
415 	}
416 
417 	/**
418 	Set the name of the physical file and the file handle that is used
419 	to open it for the file that is being iterated over.
420 	@param filename the physical name of the tablespace file
421 	@param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)422 	void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
423 	{
424 		m_file = file;
425 		m_filepath = filename;
426 	}
427 
get_zip_size() const428 	ulint get_zip_size() const { return m_zip_size; }
physical_size() const429 	ulint physical_size() const
430 	{
431 		return m_zip_size ? m_zip_size : srv_page_size;
432 	}
433 
filename() const434 	const char* filename() const { return m_filepath; }
435 
436 	/**
437 	Called for every page in the tablespace. If the page was not
438 	updated then its state must be set to BUF_PAGE_NOT_USED. For
439 	compressed tables the page descriptor memory will be at offset:
440 		block->frame + srv_page_size;
441 	@param block block read from file, note it is not from the buffer pool
442 	@retval DB_SUCCESS or error code. */
443 	virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
444 
445 	/** @return the tablespace identifier */
get_space_id() const446 	ulint get_space_id() const { return m_space; }
447 
is_interrupted() const448 	bool is_interrupted() const { return trx_is_interrupted(m_trx); }
449 
450 	/**
451 	Get the data page depending on the table type, compressed or not.
452 	@param block - block read from disk
453 	@retval the buffer frame */
get_frame(const buf_block_t * block)454 	static byte* get_frame(const buf_block_t* block)
455 	{
456 		return block->page.zip.data
457 			? block->page.zip.data : block->frame;
458 	}
459 
460 	/** Invoke the functionality for the callback */
461 	virtual dberr_t run(const fil_iterator_t& iter,
462 			    buf_block_t* block) UNIV_NOTHROW = 0;
463 
464 protected:
465 	/** Get the physical offset of the extent descriptor within the page.
466 	@param page_no page number of the extent descriptor
467 	@param page contents of the page containing the extent descriptor.
468 	@return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const469 	const xdes_t* xdes(
470 		ulint		page_no,
471 		const page_t*	page) const UNIV_NOTHROW
472 	{
473 		ulint	offset;
474 
475 		offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
476 
477 		return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
478 	}
479 
480 	/** Set the current page directory (xdes). If the extent descriptor is
481 	marked as free then free the current extent descriptor and set it to
482 	0. This implies that all pages that are covered by this extent
483 	descriptor are also freed.
484 
485 	@param page_no offset of page within the file
486 	@param page page contents
487 	@return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)488 	dberr_t	set_current_xdes(
489 		ulint		page_no,
490 		const page_t*	page) UNIV_NOTHROW
491 	{
492 		m_xdes_page_no = page_no;
493 
494 		UT_DELETE_ARRAY(m_xdes);
495 		m_xdes = NULL;
496 
497 		if (mach_read_from_4(XDES_ARR_OFFSET + XDES_STATE + page)
498 		    != XDES_FREE) {
499 			const ulint physical_size = m_zip_size
500 				? m_zip_size : srv_page_size;
501 
502 			m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t, physical_size);
503 
504 			/* Trigger OOM */
505 			DBUG_EXECUTE_IF(
506 				"ib_import_OOM_13",
507 				UT_DELETE_ARRAY(m_xdes);
508 				m_xdes = NULL;
509 			);
510 
511 			if (m_xdes == NULL) {
512 				return(DB_OUT_OF_MEMORY);
513 			}
514 
515 			memcpy(m_xdes, page, physical_size);
516 		}
517 
518 		return(DB_SUCCESS);
519 	}
520 
521 	/** Check if the page is marked as free in the extent descriptor.
522 	@param page_no page number to check in the extent descriptor.
523 	@return true if the page is marked as free */
is_free(ulint page_no) const524 	bool is_free(ulint page_no) const UNIV_NOTHROW
525 	{
526 		ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
527 		     == m_xdes_page_no);
528 
529 		if (m_xdes != 0) {
530 			const xdes_t*	xdesc = xdes(page_no, m_xdes);
531 			ulint		pos = page_no % FSP_EXTENT_SIZE;
532 
533 			return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
534 		}
535 
536 		/* If the current xdes was free, the page must be free. */
537 		return(true);
538 	}
539 
540 protected:
541 	/** The ROW_FORMAT=COMPRESSED page size, or 0. */
542 	ulint			m_zip_size;
543 
544 	/** File handle to the tablespace */
545 	pfs_os_file_t		m_file;
546 
547 	/** Physical file path. */
548 	const char*		m_filepath;
549 
550 	/** Covering transaction. */
551 	trx_t*			m_trx;
552 
553 	/** Space id of the file being iterated over. */
554 	ulint			m_space;
555 
556 	/** Current size of the space in pages */
557 	ulint			m_size;
558 
559 	/** Current extent descriptor page */
560 	xdes_t*			m_xdes;
561 
562 	/** Physical page offset in the file of the extent descriptor */
563 	ulint			m_xdes_page_no;
564 
565 	/** Flags value read from the header page */
566 	ulint			m_space_flags;
567 };
568 
569 /** Determine the page size to use for traversing the tablespace
570 @param file_size size of the tablespace file in bytes
571 @param block contents of the first page in the tablespace file.
572 @retval DB_SUCCESS or error code. */
573 dberr_t
init(os_offset_t file_size,const buf_block_t * block)574 AbstractCallback::init(
575 	os_offset_t		file_size,
576 	const buf_block_t*	block) UNIV_NOTHROW
577 {
578 	const page_t*		page = block->frame;
579 
580 	m_space_flags = fsp_header_get_flags(page);
581 	if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
582 		ulint cflags = fsp_flags_convert_from_101(m_space_flags);
583 		if (cflags == ULINT_UNDEFINED) {
584 			return(DB_CORRUPTION);
585 		}
586 		m_space_flags = cflags;
587 	}
588 
589 	/* Clear the DATA_DIR flag, which is basically garbage. */
590 	m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
591 	m_zip_size = fil_space_t::zip_size(m_space_flags);
592 	const ulint logical_size = fil_space_t::logical_size(m_space_flags);
593 	const ulint physical_size = fil_space_t::physical_size(m_space_flags);
594 
595 	if (logical_size != srv_page_size) {
596 
597 		ib::error() << "Page size " << logical_size
598 			<< " of ibd file is not the same as the server page"
599 			" size " << srv_page_size;
600 
601 		return(DB_CORRUPTION);
602 
603 	} else if (file_size & (physical_size - 1)) {
604 
605 		ib::error() << "File size " << file_size << " is not a"
606 			" multiple of the page size "
607 			<< physical_size;
608 
609 		return(DB_CORRUPTION);
610 	}
611 
612 	m_size  = mach_read_from_4(page + FSP_SIZE);
613 	if (m_space == ULINT_UNDEFINED) {
614 		m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
615 					   + page);
616 	}
617 
618 	return set_current_xdes(0, page);
619 }
620 
/**
Iterate over all the pages in the tablespace.

TODO: This can be made parallel trivially by chunking up the file
and creating a callback per thread. The main benefit will be to use
multiple CPUs for checksums and compressed tables. We have to do
compressed tables block by block right now. Secondly, we need to
decompress/compress and copy too much data. These are
CPU intensive.

@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static dberr_t fil_iterate(
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	AbstractCallback&	callback);
638 
639 /**
640 Try and determine the index root pages by checking if the next/prev
641 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
642 struct FetchIndexRootPages : public AbstractCallback {
643 
644 	/** Index information gathered from the .ibd file. */
645 	struct Index {
646 
IndexFetchIndexRootPages::Index647 		Index(index_id_t id, ulint page_no)
648 			:
649 			m_id(id),
650 			m_page_no(page_no) { }
651 
652 		index_id_t	m_id;		/*!< Index id */
653 		ulint		m_page_no;	/*!< Root page number */
654 	};
655 
656 	/** Constructor
657 	@param trx covering (user) transaction
658 	@param table table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages659 	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
660 		:
661 		AbstractCallback(trx, ULINT_UNDEFINED),
662 		m_table(table), m_index(0, 0) UNIV_NOTHROW { }
663 
664 	/** Destructor */
~FetchIndexRootPagesFetchIndexRootPages665 	~FetchIndexRootPages() UNIV_NOTHROW override { }
666 
667 	/** Fetch the clustered index root page in the tablespace
668 	@param iter	Tablespace iterator
669 	@param block	Block to use for IO
670 	@retval DB_SUCCESS or error code */
671 	dberr_t run(const fil_iterator_t& iter,
672 		    buf_block_t* block) UNIV_NOTHROW override;
673 
674 	/** Called for each block as it is read from the file.
675 	@param block block to convert, it is not from the buffer pool.
676 	@retval DB_SUCCESS or error code. */
677 	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
678 
679 	/** Update the import configuration that will be used to import
680 	the tablespace. */
681 	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
682 
683 	/** Table definition in server. */
684 	const dict_table_t*	m_table;
685 
686 	/** Index information */
687 	Index			m_index;
688 };
689 
690 /** Called for each block as it is read from the file. Check index pages to
691 determine the exact row format. We can't get that from the tablespace
692 header flags alone.
693 
694 @param block block to convert, it is not from the buffer pool.
695 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)696 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
697 {
698 	if (is_interrupted()) return DB_INTERRUPTED;
699 
700 	const page_t*	page = get_frame(block);
701 
702 	m_index.m_id = btr_page_get_index_id(page);
703 	m_index.m_page_no = block->page.id.page_no();
704 
705 	/* Check that the tablespace flags match the table flags. */
706 	ulint expected = dict_tf_to_fsp_flags(m_table->flags);
707 	if (!fsp_flags_match(expected, m_space_flags)) {
708 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
709 			ER_TABLE_SCHEMA_MISMATCH,
710 			"Expected FSP_SPACE_FLAGS=0x%x, .ibd "
711 			"file contains 0x%x.",
712 			unsigned(expected),
713 			unsigned(m_space_flags));
714 		return(DB_CORRUPTION);
715 	}
716 
717 	if (!page_is_comp(block->frame) !=
718 	    !dict_table_is_comp(m_table)) {
719 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
720 			ER_TABLE_SCHEMA_MISMATCH,
721 			"ROW_FORMAT mismatch");
722 		return DB_CORRUPTION;
723 	}
724 
725 	return DB_SUCCESS;
726 }
727 
728 /**
729 Update the import configuration that will be used to import the tablespace.
730 @return error code or DB_SUCCESS */
731 dberr_t
build_row_import(row_import * cfg) const732 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
733 {
734 	ut_a(cfg->m_table == m_table);
735 	cfg->m_zip_size = m_zip_size;
736 	cfg->m_n_indexes = 1;
737 
738 	if (cfg->m_n_indexes == 0) {
739 
740 		ib::error() << "No B+Tree found in tablespace";
741 
742 		return(DB_CORRUPTION);
743 	}
744 
745 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
746 
747 	/* Trigger OOM */
748 	DBUG_EXECUTE_IF(
749 		"ib_import_OOM_11",
750 		UT_DELETE_ARRAY(cfg->m_indexes);
751 		cfg->m_indexes = NULL;
752 	);
753 
754 	if (cfg->m_indexes == NULL) {
755 		return(DB_OUT_OF_MEMORY);
756 	}
757 
758 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
759 
760 	row_index_t*	cfg_index = cfg->m_indexes;
761 
762 	char	name[BUFSIZ];
763 
764 	snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
765 
766 	ulint	len = strlen(name) + 1;
767 
768 	cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
769 
770 	/* Trigger OOM */
771 	DBUG_EXECUTE_IF(
772 		"ib_import_OOM_12",
773 		UT_DELETE_ARRAY(cfg_index->m_name);
774 		cfg_index->m_name = NULL;
775 	);
776 
777 	if (cfg_index->m_name == NULL) {
778 		return(DB_OUT_OF_MEMORY);
779 	}
780 
781 	memcpy(cfg_index->m_name, name, len);
782 
783 	cfg_index->m_id = m_index.m_id;
784 
785 	cfg_index->m_space = m_space;
786 
787 	cfg_index->m_page_no = m_index.m_page_no;
788 
789 	return(DB_SUCCESS);
790 }
791 
792 /* Functor that is called for each physical page that is read from the
793 tablespace file.
794 
795   1. Check each page for corruption.
796 
797   2. Update the space id and LSN on every page
798      * For the header page
799        - Validate the flags
800        - Update the LSN
801 
802   3. On Btree pages
803      * Set the index id
804      * Update the max trx id
805      * In a cluster index, update the system columns
806      * In a cluster index, update the BLOB ptr, set the space id
807      * Purge delete marked records, but only if they can be easily
808        removed from the page
809      * Keep a counter of number of rows, ie. non-delete-marked rows
810      * Keep a counter of number of delete marked rows
811      * Keep a counter of number of purge failure
812      * If a page is stamped with an index id that isn't in the .cfg file
813        we assume it is deleted and the page can be ignored.
814 
815    4. Set the page state to dirty so that it will be written to disk.
816 */
817 class PageConverter : public AbstractCallback {
818 public:
819 	/** Constructor
820 	@param cfg config of table being imported.
821 	@param space_id tablespace identifier
822 	@param trx transaction covering the import */
PageConverter(row_import * cfg,ulint space_id,trx_t * trx)823 	PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
824 		:
825 		AbstractCallback(trx, space_id),
826 		m_cfg(cfg),
827 		m_index(cfg->m_indexes),
828 		m_current_lsn(log_get_lsn()),
829 		m_page_zip_ptr(0),
830 		m_rec_iter(),
831 		m_offsets_(), m_offsets(m_offsets_),
832 		m_heap(0),
833 		m_cluster_index(dict_table_get_first_index(cfg->m_table))
834 	{
835 		ut_ad(m_current_lsn);
836 		rec_offs_init(m_offsets_);
837 	}
838 
~PageConverter()839 	~PageConverter() UNIV_NOTHROW override
840 	{
841 		if (m_heap != 0) {
842 			mem_heap_free(m_heap);
843 		}
844 	}
845 
run(const fil_iterator_t & iter,buf_block_t * block)846 	dberr_t run(const fil_iterator_t& iter,
847 		    buf_block_t* block) UNIV_NOTHROW override
848 	{
849 		return fil_iterate(iter, block, *this);
850 	}
851 
852 	/** Called for each block as it is read from the file.
853 	@param block block to convert, it is not from the buffer pool.
854 	@retval DB_SUCCESS or error code. */
855 	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
856 
857 private:
858 	/** Update the page, set the space id, max trx id and index id.
859 	@param block block read from file
860 	@param page_type type of the page
861 	@retval DB_SUCCESS or error code */
862 	dberr_t update_page(
863 		buf_block_t*	block,
864 		ulint&		page_type) UNIV_NOTHROW;
865 
866 	/** Update the space, index id, trx id.
867 	@param block block to convert
868 	@return DB_SUCCESS or error code */
869 	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;
870 
871 	/** Update the BLOB refrences and write UNDO log entries for
872 	rows that can't be purged optimistically.
873 	@param block block to update
874 	@retval DB_SUCCESS or error code */
875 	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;
876 
877 	/** Validate the space flags and update tablespace header page.
878 	@param block block read from file, not from the buffer pool.
879 	@retval DB_SUCCESS or error code */
880 	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;
881 
882 	/** Adjust the BLOB reference for a single column that is externally stored
883 	@param rec record to update
884 	@param offsets column offsets for the record
885 	@param i column ordinal value
886 	@return DB_SUCCESS or error code */
887 	dberr_t	adjust_cluster_index_blob_column(
888 		rec_t*		rec,
889 		const rec_offs*	offsets,
890 		ulint		i) UNIV_NOTHROW;
891 
892 	/** Adjusts the BLOB reference in the clustered index row for all
893 	externally stored columns.
894 	@param rec record to update
895 	@param offsets column offsets for the record
896 	@return DB_SUCCESS or error code */
897 	dberr_t	adjust_cluster_index_blob_columns(
898 		rec_t*		rec,
899 		const rec_offs*	offsets) UNIV_NOTHROW;
900 
901 	/** In the clustered index, adjist the BLOB pointers as needed.
902 	Also update the BLOB reference, write the new space id.
903 	@param rec record to update
904 	@param offsets column offsets for the record
905 	@return DB_SUCCESS or error code */
906 	dberr_t	adjust_cluster_index_blob_ref(
907 		rec_t*		rec,
908 		const rec_offs*	offsets) UNIV_NOTHROW;
909 
910 	/** Purge delete-marked records, only if it is possible to do
911 	so without re-organising the B+tree.
912 	@retval true if purged */
913 	bool purge() UNIV_NOTHROW;
914 
915 	/** Adjust the BLOB references and sys fields for the current record.
916 	@param rec record to update
917 	@param offsets column offsets for the record
918 	@return DB_SUCCESS or error code. */
919 	dberr_t	adjust_cluster_record(
920 		rec_t*			rec,
921 		const rec_offs*		offsets) UNIV_NOTHROW;
922 
923 	/** Find an index with the matching id.
924 	@return row_index_t* instance or 0 */
find_index(index_id_t id)925 	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
926 	{
927 		row_index_t*	index = &m_cfg->m_indexes[0];
928 
929 		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
930 			if (id == index->m_id) {
931 				return(index);
932 			}
933 		}
934 
935 		return(0);
936 
937 	}
938 private:
939 	/** Config for table that is being imported. */
940 	row_import*		m_cfg;
941 
942 	/** Current index whose pages are being imported */
943 	row_index_t*		m_index;
944 
945 	/** Current system LSN */
946 	lsn_t			m_current_lsn;
947 
948 	/** Alias for m_page_zip, only set for compressed pages. */
949 	page_zip_des_t*		m_page_zip_ptr;
950 
951 	/** Iterator over records in a block */
952 	RecIterator		m_rec_iter;
953 
954 	/** Record offset */
955 	rec_offs		m_offsets_[REC_OFFS_NORMAL_SIZE];
956 
957 	/** Pointer to m_offsets_ */
958 	rec_offs*		m_offsets;
959 
960 	/** Memory heap for the record offsets */
961 	mem_heap_t*		m_heap;
962 
963 	/** Cluster index instance */
964 	dict_index_t*		m_cluster_index;
965 };
966 
967 /**
968 row_import destructor. */
~row_import()969 row_import::~row_import() UNIV_NOTHROW
970 {
971 	for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
972 		UT_DELETE_ARRAY(m_indexes[i].m_name);
973 
974 		if (m_indexes[i].m_fields == NULL) {
975 			continue;
976 		}
977 
978 		dict_field_t*	fields = m_indexes[i].m_fields;
979 		ulint		n_fields = m_indexes[i].m_n_fields;
980 
981 		for (ulint j = 0; j < n_fields; ++j) {
982 			UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
983 		}
984 
985 		UT_DELETE_ARRAY(fields);
986 	}
987 
988 	for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
989 		UT_DELETE_ARRAY(m_col_names[i]);
990 	}
991 
992 	UT_DELETE_ARRAY(m_cols);
993 	UT_DELETE_ARRAY(m_indexes);
994 	UT_DELETE_ARRAY(m_col_names);
995 	UT_DELETE_ARRAY(m_table_name);
996 	UT_DELETE_ARRAY(m_hostname);
997 }
998 
999 /** Find the index entry in in the indexes array.
1000 @param name index name
1001 @return instance if found else 0. */
1002 row_index_t*
get_index(const char * name) const1003 row_import::get_index(
1004 	const char*	name) const UNIV_NOTHROW
1005 {
1006 	for (ulint i = 0; i < m_n_indexes; ++i) {
1007 		const char*	index_name;
1008 		row_index_t*	index = &m_indexes[i];
1009 
1010 		index_name = reinterpret_cast<const char*>(index->m_name);
1011 
1012 		if (strcmp(index_name, name) == 0) {
1013 
1014 			return(index);
1015 		}
1016 	}
1017 
1018 	return(0);
1019 }
1020 
1021 /** Get the number of rows in the index.
1022 @param name index name
1023 @return number of rows (doesn't include delete marked rows). */
1024 ulint
get_n_rows(const char * name) const1025 row_import::get_n_rows(
1026 	const char*	name) const UNIV_NOTHROW
1027 {
1028 	const row_index_t*	index = get_index(name);
1029 
1030 	ut_a(name != 0);
1031 
1032 	return(index->m_stats.m_n_rows);
1033 }
1034 
1035 /** Get the number of rows for which purge failed uding the convert phase.
1036 @param name index name
1037 @return number of rows for which purge failed. */
1038 ulint
get_n_purge_failed(const char * name) const1039 row_import::get_n_purge_failed(
1040 	const char*	name) const UNIV_NOTHROW
1041 {
1042 	const row_index_t*	index = get_index(name);
1043 
1044 	ut_a(name != 0);
1045 
1046 	return(index->m_stats.m_n_purge_failed);
1047 }
1048 
1049 /** Find the ordinal value of the column name in the cfg table columns.
1050 @param name of column to look for.
1051 @return ULINT_UNDEFINED if not found. */
1052 ulint
find_col(const char * name) const1053 row_import::find_col(
1054 	const char*	name) const UNIV_NOTHROW
1055 {
1056 	for (ulint i = 0; i < m_n_cols; ++i) {
1057 		const char*	col_name;
1058 
1059 		col_name = reinterpret_cast<const char*>(m_col_names[i]);
1060 
1061 		if (strcmp(col_name, name) == 0) {
1062 			return(i);
1063 		}
1064 	}
1065 
1066 	return(ULINT_UNDEFINED);
1067 }
1068 
1069 /**
1070 Check if the index schema that was read from the .cfg file matches the
1071 in memory index definition.
1072 @return DB_SUCCESS or error code. */
1073 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1074 row_import::match_index_columns(
1075 	THD*			thd,
1076 	const dict_index_t*	index) UNIV_NOTHROW
1077 {
1078 	row_index_t*		cfg_index;
1079 	dberr_t			err = DB_SUCCESS;
1080 
1081 	cfg_index = get_index(index->name);
1082 
1083 	if (cfg_index == 0) {
1084 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1085 			ER_TABLE_SCHEMA_MISMATCH,
1086 			"Index %s not found in tablespace meta-data file.",
1087 			index->name());
1088 
1089 		return(DB_ERROR);
1090 	}
1091 
1092 	if (cfg_index->m_n_fields != index->n_fields) {
1093 
1094 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1095 			ER_TABLE_SCHEMA_MISMATCH,
1096 			"Index field count %u doesn't match"
1097 			" tablespace metadata file value " ULINTPF,
1098 			index->n_fields, cfg_index->m_n_fields);
1099 
1100 		return(DB_ERROR);
1101 	}
1102 
1103 	cfg_index->m_srv_index = index;
1104 
1105 	const dict_field_t*	field = index->fields;
1106 	const dict_field_t*	cfg_field = cfg_index->m_fields;
1107 
1108 	for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1109 
1110 		if (field->name() && cfg_field->name()
1111 		     && strcmp(field->name(), cfg_field->name()) != 0) {
1112 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1113 				ER_TABLE_SCHEMA_MISMATCH,
1114 				"Index field name %s doesn't match"
1115 				" tablespace metadata field name %s"
1116 				" for field position " ULINTPF,
1117 				field->name(), cfg_field->name(), i);
1118 
1119 			err = DB_ERROR;
1120 		}
1121 
1122 		if (cfg_field->prefix_len != field->prefix_len) {
1123 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1124 				ER_TABLE_SCHEMA_MISMATCH,
1125 				"Index %s field %s prefix len %u"
1126 				" doesn't match metadata file value %u",
1127 				index->name(), field->name(),
1128 				field->prefix_len, cfg_field->prefix_len);
1129 
1130 			err = DB_ERROR;
1131 		}
1132 
1133 		if (cfg_field->fixed_len != field->fixed_len) {
1134 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1135 				ER_TABLE_SCHEMA_MISMATCH,
1136 				"Index %s field %s fixed len %u"
1137 				" doesn't match metadata file value %u",
1138 				index->name(), field->name(),
1139 				field->fixed_len,
1140 				cfg_field->fixed_len);
1141 
1142 			err = DB_ERROR;
1143 		}
1144 	}
1145 
1146 	return(err);
1147 }
1148 
1149 /** Check if the table schema that was read from the .cfg file matches the
1150 in memory table definition.
1151 @param thd MySQL session variable
1152 @return DB_SUCCESS or error code. */
1153 dberr_t
match_table_columns(THD * thd)1154 row_import::match_table_columns(
1155 	THD*			thd) UNIV_NOTHROW
1156 {
1157 	dberr_t			err = DB_SUCCESS;
1158 	const dict_col_t*	col = m_table->cols;
1159 
1160 	for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1161 
1162 		const char*	col_name;
1163 		ulint		cfg_col_index;
1164 
1165 		col_name = dict_table_get_col_name(
1166 			m_table, dict_col_get_no(col));
1167 
1168 		cfg_col_index = find_col(col_name);
1169 
1170 		if (cfg_col_index == ULINT_UNDEFINED) {
1171 
1172 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1173 				 ER_TABLE_SCHEMA_MISMATCH,
1174 				 "Column %s not found in tablespace.",
1175 				 col_name);
1176 
1177 			err = DB_ERROR;
1178 		} else if (cfg_col_index != col->ind) {
1179 
1180 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1181 				ER_TABLE_SCHEMA_MISMATCH,
1182 				"Column %s ordinal value mismatch, it's at %u"
1183 				" in the table and " ULINTPF
1184 				" in the tablespace meta-data file",
1185 				col_name, col->ind, cfg_col_index);
1186 
1187 			err = DB_ERROR;
1188 		} else {
1189 			const dict_col_t*	cfg_col;
1190 
1191 			cfg_col = &m_cols[cfg_col_index];
1192 			ut_a(cfg_col->ind == cfg_col_index);
1193 
1194 			if (cfg_col->prtype != col->prtype) {
1195 				ib_errf(thd,
1196 					IB_LOG_LEVEL_ERROR,
1197 					ER_TABLE_SCHEMA_MISMATCH,
1198 					"Column %s precise type mismatch,"
1199 					" it's 0X%X in the table and 0X%X"
1200 					" in the tablespace meta file",
1201 					col_name, col->prtype, cfg_col->prtype);
1202 				err = DB_ERROR;
1203 			}
1204 
1205 			if (cfg_col->mtype != col->mtype) {
1206 				ib_errf(thd,
1207 					IB_LOG_LEVEL_ERROR,
1208 					ER_TABLE_SCHEMA_MISMATCH,
1209 					"Column %s main type mismatch,"
1210 					" it's 0X%X in the table and 0X%X"
1211 					" in the tablespace meta file",
1212 					col_name, col->mtype, cfg_col->mtype);
1213 				err = DB_ERROR;
1214 			}
1215 
1216 			if (cfg_col->len != col->len) {
1217 				ib_errf(thd,
1218 					IB_LOG_LEVEL_ERROR,
1219 					ER_TABLE_SCHEMA_MISMATCH,
1220 					"Column %s length mismatch,"
1221 					" it's %u in the table and %u"
1222 					" in the tablespace meta file",
1223 					col_name, col->len, cfg_col->len);
1224 				err = DB_ERROR;
1225 			}
1226 
1227 			if (cfg_col->mbminlen != col->mbminlen
1228 			    || cfg_col->mbmaxlen != col->mbmaxlen) {
1229 				ib_errf(thd,
1230 					IB_LOG_LEVEL_ERROR,
1231 					ER_TABLE_SCHEMA_MISMATCH,
1232 					"Column %s multi-byte len mismatch,"
1233 					" it's %u-%u in the table and %u-%u"
1234 					" in the tablespace meta file",
1235 					col_name, col->mbminlen, col->mbmaxlen,
1236 					cfg_col->mbminlen, cfg_col->mbmaxlen);
1237 				err = DB_ERROR;
1238 			}
1239 
1240 			if (cfg_col->ind != col->ind) {
1241 				ib_errf(thd,
1242 					IB_LOG_LEVEL_ERROR,
1243 					ER_TABLE_SCHEMA_MISMATCH,
1244 					"Column %s position mismatch,"
1245 					" it's %u in the table and %u"
1246 					" in the tablespace meta file",
1247 					col_name, col->ind, cfg_col->ind);
1248 				err = DB_ERROR;
1249 			}
1250 
1251 			if (cfg_col->ord_part != col->ord_part) {
1252 				ib_errf(thd,
1253 					IB_LOG_LEVEL_ERROR,
1254 					ER_TABLE_SCHEMA_MISMATCH,
1255 					"Column %s ordering mismatch,"
1256 					" it's %u in the table and %u"
1257 					" in the tablespace meta file",
1258 					col_name, col->ord_part,
1259 					cfg_col->ord_part);
1260 				err = DB_ERROR;
1261 			}
1262 
1263 			if (cfg_col->max_prefix != col->max_prefix) {
1264 				ib_errf(thd,
1265 					IB_LOG_LEVEL_ERROR,
1266 					ER_TABLE_SCHEMA_MISMATCH,
1267 					"Column %s max prefix mismatch"
1268 					" it's %u in the table and %u"
1269 					" in the tablespace meta file",
1270 					col_name, col->max_prefix,
1271 					cfg_col->max_prefix);
1272 				err = DB_ERROR;
1273 			}
1274 		}
1275 	}
1276 
1277 	return(err);
1278 }
1279 
match_flags(THD * thd) const1280 dberr_t row_import::match_flags(THD *thd) const
1281 {
1282   ulint mismatch= (m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR;
1283   if (!mismatch)
1284     return DB_SUCCESS;
1285 
1286   const char *msg;
1287   if (mismatch & DICT_TF_MASK_ZIP_SSIZE)
1288   {
1289     if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) &&
1290         (m_flags & DICT_TF_MASK_ZIP_SSIZE))
1291     {
1292       switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1293       case 0U << DICT_TF_POS_ZIP_SSIZE:
1294         goto uncompressed;
1295       case 1U << DICT_TF_POS_ZIP_SSIZE:
1296         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=1";
1297         break;
1298       case 2U << DICT_TF_POS_ZIP_SSIZE:
1299         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=2";
1300         break;
1301       case 3U << DICT_TF_POS_ZIP_SSIZE:
1302         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=4";
1303         break;
1304       case 4U << DICT_TF_POS_ZIP_SSIZE:
1305         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8";
1306         break;
1307       case 5U << DICT_TF_POS_ZIP_SSIZE:
1308         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=16";
1309         break;
1310       default:
1311         msg= "strange KEY_BLOCK_SIZE";
1312       }
1313     }
1314     else if (m_flags & DICT_TF_MASK_ZIP_SSIZE)
1315       msg= "ROW_FORMAT=COMPRESSED";
1316     else
1317       goto uncompressed;
1318   }
1319   else
1320   {
1321   uncompressed:
1322     msg= (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) ? "ROW_FORMAT=DYNAMIC"
1323          : (m_flags & DICT_TF_MASK_COMPACT)    ? "ROW_FORMAT=COMPACT"
1324                                                : "ROW_FORMAT=REDUNDANT";
1325   }
1326 
1327   ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1328           "Table flags don't match, server table has 0x%x and the meta-data "
1329           "file has 0x%zx; .cfg file uses %s",
1330           m_table->flags, m_flags, msg);
1331 
1332   return DB_ERROR;
1333 }
1334 
1335 /** Check if the table (and index) schema that was read from the .cfg file
1336 matches the in memory table definition.
1337 @param thd MySQL session variable
1338 @return DB_SUCCESS or error code. */
1339 dberr_t
match_schema(THD * thd)1340 row_import::match_schema(
1341 	THD*		thd) UNIV_NOTHROW
1342 {
1343 	/* Do some simple checks. */
1344 
1345 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1346 
1347 		/* If the number of indexes don't match then it is better
1348 		to abort the IMPORT. It is easy for the user to create a
1349 		table matching the IMPORT definition. */
1350 
1351 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1352 			"Number of indexes don't match, table has " ULINTPF
1353 			" indexes but the tablespace meta-data file has "
1354 			ULINTPF " indexes",
1355 			UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1356 
1357 		return(DB_ERROR);
1358 	}
1359 
1360 	dberr_t	err = match_table_columns(thd);
1361 
1362 	if (err != DB_SUCCESS) {
1363 		return(err);
1364 	}
1365 
1366 	/* Check if the index definitions match. */
1367 
1368 	const dict_index_t* index;
1369 
1370 	for (index = UT_LIST_GET_FIRST(m_table->indexes);
1371 	     index != 0;
1372 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1373 
1374 		dberr_t	index_err;
1375 
1376 		index_err = match_index_columns(thd, index);
1377 
1378 		if (index_err != DB_SUCCESS) {
1379 			err = index_err;
1380 		}
1381 	}
1382 
1383 	return(err);
1384 }
1385 
1386 /**
1387 Set the index root <space, pageno>, using index name. */
1388 void
set_root_by_name()1389 row_import::set_root_by_name() UNIV_NOTHROW
1390 {
1391 	row_index_t*	cfg_index = m_indexes;
1392 
1393 	for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1394 		dict_index_t*	index;
1395 
1396 		const char*	index_name;
1397 
1398 		index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1399 
1400 		index = dict_table_get_index_on_name(m_table, index_name);
1401 
1402 		/* We've already checked that it exists. */
1403 		ut_a(index != 0);
1404 
1405 		index->page = cfg_index->m_page_no;
1406 	}
1407 }
1408 
1409 /**
1410 Set the index root <space, pageno>, using a heuristic.
1411 @return DB_SUCCESS or error code */
1412 dberr_t
set_root_by_heuristic()1413 row_import::set_root_by_heuristic() UNIV_NOTHROW
1414 {
1415 	row_index_t*	cfg_index = m_indexes;
1416 
1417 	ut_a(m_n_indexes > 0);
1418 
1419 	// TODO: For now use brute force, based on ordinality
1420 
1421 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1422 
1423 		ib::warn() << "Table " << m_table->name << " should have "
1424 			<< UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1425 			" the tablespace has " << m_n_indexes << " indexes";
1426 	}
1427 
1428 	dict_mutex_enter_for_mysql();
1429 
1430 	ulint	i = 0;
1431 	dberr_t	err = DB_SUCCESS;
1432 
1433 	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1434 	     index != 0;
1435 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1436 
1437 		if (index->type & DICT_FTS) {
1438 			index->type |= DICT_CORRUPT;
1439 			ib::warn() << "Skipping FTS index: " << index->name;
1440 		} else if (i < m_n_indexes) {
1441 
1442 			UT_DELETE_ARRAY(cfg_index[i].m_name);
1443 
1444 			ulint	len = strlen(index->name) + 1;
1445 
1446 			cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1447 
1448 			/* Trigger OOM */
1449 			DBUG_EXECUTE_IF(
1450 				"ib_import_OOM_14",
1451 				UT_DELETE_ARRAY(cfg_index[i].m_name);
1452 				cfg_index[i].m_name = NULL;
1453 			);
1454 
1455 			if (cfg_index[i].m_name == NULL) {
1456 				err = DB_OUT_OF_MEMORY;
1457 				break;
1458 			}
1459 
1460 			memcpy(cfg_index[i].m_name, index->name, len);
1461 
1462 			cfg_index[i].m_srv_index = index;
1463 
1464 			index->page = cfg_index[i].m_page_no;
1465 
1466 			++i;
1467 		}
1468 	}
1469 
1470 	dict_mutex_exit_for_mysql();
1471 
1472 	return(err);
1473 }
1474 
1475 /**
1476 Purge delete marked records.
1477 @return DB_SUCCESS or error code. */
1478 dberr_t
garbage_collect()1479 IndexPurge::garbage_collect() UNIV_NOTHROW
1480 {
1481 	dberr_t	err;
1482 	ibool	comp = dict_table_is_comp(m_index->table);
1483 
1484 	/* Open the persistent cursor and start the mini-transaction. */
1485 
1486 	open();
1487 
1488 	while ((err = next()) == DB_SUCCESS) {
1489 
1490 		rec_t*	rec = btr_pcur_get_rec(&m_pcur);
1491 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1492 
1493 		if (!deleted) {
1494 			++m_n_rows;
1495 		} else {
1496 			purge();
1497 		}
1498 	}
1499 
1500 	/* Close the persistent cursor and commit the mini-transaction. */
1501 
1502 	close();
1503 
1504 	return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1505 }
1506 
1507 /**
1508 Begin import, position the cursor on the first record. */
1509 void
open()1510 IndexPurge::open() UNIV_NOTHROW
1511 {
1512 	mtr_start(&m_mtr);
1513 
1514 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1515 
1516 	btr_pcur_open_at_index_side(
1517 		true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1518 	btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1519 	if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) {
1520 		ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1521 		/* Skip the metadata pseudo-record. */
1522 	} else {
1523 		btr_pcur_move_to_prev_on_page(&m_pcur);
1524 	}
1525 }
1526 
/**
Close the persistent cursor and commit the mini-transaction.
The cursor is closed first, then m_mtr is committed. */
void
IndexPurge::close() UNIV_NOTHROW
{
	btr_pcur_close(&m_pcur);
	mtr_commit(&m_mtr);
}
1535 
/**
Position the cursor on the next record.
While the cursor stays on the current page this is a plain step forward.
At the end of a page the mini-transaction is committed and restarted
(in MTR_LOG_NO_REDO mode), the cursor position is restored, and the
cursor is advanced, across pages if necessary, until it is on a user
record. The link to the next page is validated before it is followed.
@return DB_SUCCESS when positioned on a record
@retval DB_END_OF_INDEX when the cursor is after the last record in the tree
@retval DB_INTERRUPTED when trx_is_interrupted() reports an interrupt
at a page boundary
@retval DB_CORRUPTION when the next-page link or the next page is invalid */
dberr_t
IndexPurge::next() UNIV_NOTHROW
{
	btr_pcur_move_to_next_on_page(&m_pcur);

	/* When switching pages, commit the mini-transaction
	in order to release the latch on the old page. */

	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
		return(DB_SUCCESS);
	} else if (trx_is_interrupted(m_trx)) {
		/* Check after every page because the check
		is expensive. */
		return(DB_INTERRUPTED);
	}

	/* Remember where we are, then restart the mini-transaction and
	return to the remembered position. */
	btr_pcur_store_position(&m_pcur, &m_mtr);

	mtr_commit(&m_mtr);

	mtr_start(&m_mtr);

	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);

	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
	/* The following is based on btr_pcur_move_to_next_user_rec(). */
	m_pcur.old_stored = false;
	ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
	do {
		if (btr_pcur_is_after_last_on_page(&m_pcur)) {
			if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
				return DB_END_OF_INDEX;
			}

			buf_block_t* block = btr_pcur_get_block(&m_pcur);
			uint32_t next_page = btr_page_get_next(block->frame);

			/* MDEV-13542 FIXME: Make these checks part of
			btr_pcur_move_to_next_page(), and introduce a
			return status that will be checked in all callers! */
			/* Reject page numbers 0, 1 and FIL_NULL, and a
			page whose next link points to itself. */
			switch (next_page) {
			default:
				if (next_page != block->page.id.page_no()) {
					break;
				}
				/* MDEV-20931 FIXME: Check that
				next_page is within the tablespace
				bounds! Also check that it is not a
				change buffer bitmap page. */
				/* fall through */
			case 0:
			case 1:
			case FIL_NULL:
				return DB_CORRUPTION;
			}

			dict_index_t* index = m_pcur.btr_cur.index;
			buf_block_t* next_block = btr_block_get(
				page_id_t(block->page.id.space(), next_page),
				block->zip_size(), BTR_MODIFY_LEAF, index,
				&m_mtr);

			/* The next page must exist, be an index page whose
			R-tree page type agrees with the index being spatial,
			use the same compact flag as the current page, and
			link back to the current page. */
			if (UNIV_UNLIKELY(!next_block
					  || !fil_page_index_page_check(
						  next_block->frame)
					  || !!dict_index_is_spatial(index)
					  != (fil_page_get_type(
						      next_block->frame)
					      == FIL_PAGE_RTREE)
					  || page_is_comp(next_block->frame)
					  != page_is_comp(block->frame)
					  || btr_page_get_prev(
						  next_block->frame)
					  != block->page.id.page_no())) {
				return DB_CORRUPTION;
			}

			/* Release the old leaf page and continue from
			before the first record of the next page. */
			btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);

			page_cur_set_before_first(next_block,
						  &m_pcur.btr_cur.page_cur);

			ut_d(page_check_dir(next_block->frame));
		} else {
			btr_pcur_move_to_next_on_page(&m_pcur);
		}
	} while (!btr_pcur_is_on_user_rec(&m_pcur));

	return DB_SUCCESS;
}
1629 
/**
Restore the stored persistent cursor position in BTR_MODIFY_TREE mode
(with BTR_LATCH_FOR_DELETE), because the tree structure may be changed
during a pessimistic delete, delete the record under the cursor and
commit the mini-transaction. The position must have been stored by the
caller; a failing delete trips an assertion. */
void
IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
{
	dberr_t	err;

	btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
				  &m_pcur, &m_mtr);

	/* Only delete-marked records are expected here. */
	ut_ad(rec_get_deleted_flag(
			btr_pcur_get_rec(&m_pcur),
			dict_table_is_comp(m_index->table)));

	btr_cur_pessimistic_delete(
		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);

	ut_a(err == DB_SUCCESS);

	/* Commit the mini-transaction. The caller, purge(), starts a new
	one and restores the cursor in BTR_MODIFY_LEAF mode. */
	mtr_commit(&m_mtr);
}
1654 
/**
Purge the delete-marked record under the cursor.
The cursor position is stored, the record is removed by
purge_pessimistic_delete() (which also commits the mini-transaction),
and a new mini-transaction in MTR_LOG_NO_REDO mode is started in which
the stored position is restored in BTR_MODIFY_LEAF mode. */
void
IndexPurge::purge() UNIV_NOTHROW
{
	/* Save the position so that it can be restored under the
	different latch modes used below. */
	btr_pcur_store_position(&m_pcur, &m_mtr);

	purge_pessimistic_delete();

	/* purge_pessimistic_delete() committed m_mtr; start it again. */
	mtr_start(&m_mtr);

	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);

	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
}
1670 
1671 /** Adjust the BLOB reference for a single column that is externally stored
1672 @param rec record to update
1673 @param offsets column offsets for the record
1674 @param i column ordinal value
1675 @return DB_SUCCESS or error code */
1676 inline
1677 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1678 PageConverter::adjust_cluster_index_blob_column(
1679 	rec_t*		rec,
1680 	const rec_offs*	offsets,
1681 	ulint		i) UNIV_NOTHROW
1682 {
1683 	ulint		len;
1684 	byte*		field;
1685 
1686 	field = rec_get_nth_field(rec, offsets, i, &len);
1687 
1688 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1689 			len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1690 
1691 	if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1692 
1693 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1694 			ER_INNODB_INDEX_CORRUPT,
1695 			"Externally stored column(" ULINTPF
1696 			") has a reference length of " ULINTPF
1697 			" in the cluster index %s",
1698 			i, len, m_cluster_index->name());
1699 
1700 		return(DB_CORRUPTION);
1701 	}
1702 
1703 	field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1704 
1705 	mach_write_to_4(field, get_space_id());
1706 
1707 	if (m_page_zip_ptr) {
1708 		page_zip_write_blob_ptr(
1709 			m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1710 	}
1711 
1712 	return(DB_SUCCESS);
1713 }
1714 
1715 /** Adjusts the BLOB reference in the clustered index row for all externally
1716 stored columns.
1717 @param rec record to update
1718 @param offsets column offsets for the record
1719 @return DB_SUCCESS or error code */
1720 inline
1721 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1722 PageConverter::adjust_cluster_index_blob_columns(
1723 	rec_t*		rec,
1724 	const rec_offs*	offsets) UNIV_NOTHROW
1725 {
1726 	ut_ad(rec_offs_any_extern(offsets));
1727 
1728 	/* Adjust the space_id in the BLOB pointers. */
1729 
1730 	for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1731 
1732 		/* Only if the column is stored "externally". */
1733 
1734 		if (rec_offs_nth_extern(offsets, i)) {
1735 			dberr_t	err;
1736 
1737 			err = adjust_cluster_index_blob_column(rec, offsets, i);
1738 
1739 			if (err != DB_SUCCESS) {
1740 				return(err);
1741 			}
1742 		}
1743 	}
1744 
1745 	return(DB_SUCCESS);
1746 }
1747 
1748 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1749 BLOB reference, write the new space id.
1750 @param rec record to update
1751 @param offsets column offsets for the record
1752 @return DB_SUCCESS or error code */
1753 inline
1754 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1755 PageConverter::adjust_cluster_index_blob_ref(
1756 	rec_t*		rec,
1757 	const rec_offs*	offsets) UNIV_NOTHROW
1758 {
1759 	if (rec_offs_any_extern(offsets)) {
1760 		dberr_t	err;
1761 
1762 		err = adjust_cluster_index_blob_columns(rec, offsets);
1763 
1764 		if (err != DB_SUCCESS) {
1765 			return(err);
1766 		}
1767 	}
1768 
1769 	return(DB_SUCCESS);
1770 }
1771 
1772 /** Purge delete-marked records, only if it is possible to do so without
1773 re-organising the B+tree.
1774 @return true if purge succeeded */
purge()1775 inline bool PageConverter::purge() UNIV_NOTHROW
1776 {
1777 	const dict_index_t*	index = m_index->m_srv_index;
1778 
1779 	/* We can't have a page that is empty and not root. */
1780 	if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1781 
1782 		++m_index->m_stats.m_n_purged;
1783 
1784 		return(true);
1785 	} else {
1786 		++m_index->m_stats.m_n_purge_failed;
1787 	}
1788 
1789 	return(false);
1790 }
1791 
1792 /** Adjust the BLOB references and sys fields for the current record.
1793 @param rec record to update
1794 @param offsets column offsets for the record
1795 @return DB_SUCCESS or error code. */
1796 inline
1797 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1798 PageConverter::adjust_cluster_record(
1799 	rec_t*			rec,
1800 	const rec_offs*		offsets) UNIV_NOTHROW
1801 {
1802 	dberr_t	err;
1803 
1804 	if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1805 
1806 		/* Reset DB_TRX_ID and DB_ROLL_PTR.  Normally, these fields
1807 		are only written in conjunction with other changes to the
1808 		record. */
1809 		ulint	trx_id_pos = m_cluster_index->n_uniq
1810 			? m_cluster_index->n_uniq : 1;
1811 		if (m_page_zip_ptr) {
1812 			page_zip_write_trx_id_and_roll_ptr(
1813 				m_page_zip_ptr, rec, m_offsets, trx_id_pos,
1814 				0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1815 				NULL);
1816 		} else {
1817 			ulint	len;
1818 			byte*	ptr = rec_get_nth_field(
1819 				rec, m_offsets, trx_id_pos, &len);
1820 			ut_ad(len == DATA_TRX_ID_LEN);
1821 			memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1822 		}
1823 	}
1824 
1825 	return(err);
1826 }
1827 
1828 /** Update the BLOB refrences and write UNDO log entries for
1829 rows that can't be purged optimistically.
1830 @param block block to update
1831 @retval DB_SUCCESS or error code */
1832 inline
1833 dberr_t
update_records(buf_block_t * block)1834 PageConverter::update_records(
1835 	buf_block_t*	block) UNIV_NOTHROW
1836 {
1837 	ibool	comp = dict_table_is_comp(m_cfg->m_table);
1838 	bool	clust_index = m_index->m_srv_index == m_cluster_index;
1839 
1840 	/* This will also position the cursor on the first user record. */
1841 
1842 	m_rec_iter.open(block);
1843 
1844 	while (!m_rec_iter.end()) {
1845 		rec_t*	rec = m_rec_iter.current();
1846 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1847 
1848 		/* For the clustered index we have to adjust the BLOB
1849 		reference and the system fields irrespective of the
1850 		delete marked flag. The adjustment of delete marked
1851 		cluster records is required for purge to work later. */
1852 
1853 		if (deleted || clust_index) {
1854 			m_offsets = rec_get_offsets(
1855 				rec, m_index->m_srv_index, m_offsets,
1856 				m_index->m_srv_index->n_core_fields,
1857 				ULINT_UNDEFINED, &m_heap);
1858 		}
1859 
1860 		if (clust_index) {
1861 
1862 			dberr_t err = adjust_cluster_record(rec, m_offsets);
1863 
1864 			if (err != DB_SUCCESS) {
1865 				return(err);
1866 			}
1867 		}
1868 
1869 		/* If it is a delete marked record then try an
1870 		optimistic delete. */
1871 
1872 		if (deleted) {
1873 			/* A successful purge will move the cursor to the
1874 			next record. */
1875 
1876 			if (!purge()) {
1877 				m_rec_iter.next();
1878 			}
1879 
1880 			++m_index->m_stats.m_n_deleted;
1881 		} else {
1882 			++m_index->m_stats.m_n_rows;
1883 			m_rec_iter.next();
1884 		}
1885 	}
1886 
1887 	return(DB_SUCCESS);
1888 }
1889 
/** Update the space, index id, trx id of an index page.
Pages for which is_free() holds are left untouched. When the index id
stored in the page differs from the current index, m_index is switched
to the index with that id. The page then gets the server-side index id
and an adjusted PAGE_MAX_TRX_ID, and the records of a non-empty leaf
page are processed by update_records().
@param block block holding the page to update
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_index_page(
	buf_block_t*	block) UNIV_NOTHROW
{
	index_id_t	id;
	buf_frame_t*	page = block->frame;

	if (is_free(block->page.id.page_no())) {
		return(DB_SUCCESS);
	} else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
		/* The page belongs to a different index than the one
		processed so far; look it up by the id stored in the page. */
		row_index_t*	index = find_index(id);

		if (UNIV_UNLIKELY(!index)) {
			/* Without a .cfg file an unknown index id is
			tolerated and the page is skipped. */
			if (m_cfg->m_missing) {
				return DB_SUCCESS;
			}

			ib::error() << "Page for tablespace " << m_space
				<< " is index page with id " << id
				<< " but that index is not found from"
				<< " configuration file. Current index name "
				<< m_index->m_name << " and id " <<  m_index->m_id;
			m_index = 0;
			return(DB_CORRUPTION);
		}

		/* Update current index */
		m_index = index;
	}

	/* If the .cfg file is missing and there is an index mismatch
	then ignore the error. */
	if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
		return(DB_SUCCESS);
	}

	/* On the page whose number is recorded in m_page_no for this index,
	write the space id into the FSEG_HDR_SPACE field of both the leaf
	and the top segment headers, and mirror the same bytes into the
	compressed page if there is one. */
	if (m_index && block->page.id.page_no() == m_index->m_page_no) {
		byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
			+ page;
		mach_write_to_4(b, block->page.id.space());

		memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
		       + page, b, 4);
		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_TOP
						     + FSEG_HDR_SPACE], b, 4);
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_LEAF
						     + FSEG_HDR_SPACE], b, 4);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!is_compressed_table()
	     || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
#endif /* UNIV_ZIP_DEBUG */

	/* This has to be written to uncompressed index header. Set it to
	the current index id. */
	btr_page_set_index_id(
		page, m_page_zip_ptr, m_index->m_srv_index->id, 0);

	if (dict_index_is_clust(m_index->m_srv_index)) {
		dict_index_t* index = const_cast<dict_index_t*>(
			m_index->m_srv_index);
		if (block->page.id.page_no() != index->page) {
			/* Clear PAGE_MAX_TRX_ID so that it can be
			used for other purposes in the future. IMPORT
			in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
			would set the field to the transaction ID even
			on clustered index pages. */
			page_set_max_trx_id(block, m_page_zip_ptr, 0, NULL);
		}
	} else {
		/* Set PAGE_MAX_TRX_ID on secondary index leaf pages,
		and clear it on non-leaf pages. */
		page_set_max_trx_id(block, m_page_zip_ptr,
				    page_is_leaf(page) ? m_trx->id : 0, NULL);
	}

	if (page_is_empty(page)) {

		/* Only a root page can be empty. */
		if (page_has_siblings(page)) {
			// TODO: We should relax this and skip secondary
			// indexes. Mark them as corrupt because they can
			// always be rebuilt.
			return(DB_CORRUPTION);
		}

		return(DB_SUCCESS);
	}

	/* Records are only processed on leaf pages. */
	return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
}
1989 
1990 /** Validate the space flags and update tablespace header page.
1991 @param block block read from file, not from the buffer pool.
1992 @retval DB_SUCCESS or error code */
1993 inline
1994 dberr_t
update_header(buf_block_t * block)1995 PageConverter::update_header(
1996 	buf_block_t*	block) UNIV_NOTHROW
1997 {
1998 	/* Check for valid header */
1999 	switch (fsp_header_get_space_id(get_frame(block))) {
2000 	case 0:
2001 		return(DB_CORRUPTION);
2002 	case ULINT_UNDEFINED:
2003 		ib::warn() << "Space id check in the header failed: ignored";
2004 	}
2005 
2006 	mach_write_to_8(
2007 		get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
2008 		m_current_lsn);
2009 
2010 	/* Write back the adjusted flags. */
2011 	mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS
2012 			+ get_frame(block), m_space_flags);
2013 
2014 	/* Write space_id to the tablespace header, page 0. */
2015 	mach_write_to_4(
2016 		get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2017 		get_space_id());
2018 
2019 	/* This is on every page in the tablespace. */
2020 	mach_write_to_4(
2021 		get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2022 		get_space_id());
2023 
2024 	return(DB_SUCCESS);
2025 }
2026 
/** Update the page, set the space id, max trx id and index id.
The work done depends on the page type read from the frame: the
tablespace header is rewritten by update_header(), B-tree pages are
handled by update_index_page(), and the other known page types only get
the space id in their page header replaced. A FIL_PAGE_TYPE_SYS page or
an unknown page type is treated as corruption.
@param block block read from file
@param page_type set to the page type read from the frame of block
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_page(
	buf_block_t*	block,
	ulint&		page_type) UNIV_NOTHROW
{
	dberr_t		err = DB_SUCCESS;

	ut_ad(!block->page.zip.data == !is_compressed_table());

	/* Point m_page_zip_ptr at the block's compressed page descriptor
	when the block has compressed data. */
	if (block->page.zip.data) {
		m_page_zip_ptr = &block->page.zip;
	} else {
		ut_ad(!m_page_zip_ptr);
	}

	switch (page_type = fil_page_get_type(get_frame(block))) {
	case FIL_PAGE_TYPE_FSP_HDR:
		ut_a(block->page.id.page_no() == 0);
		/* Work directly on the uncompressed page headers. */
		return(update_header(block));

	case FIL_PAGE_INDEX:
	case FIL_PAGE_RTREE:
		/* We need to decompress the contents into block->frame
		before we can do any thing with Btree pages. */

		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
			return(DB_CORRUPTION);
		}

		/* fall through */
	case FIL_PAGE_TYPE_INSTANT:
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		/* Only update the Btree nodes. */
		return(update_index_page(block));

	case FIL_PAGE_TYPE_SYS:
		/* This is page 0 in the system tablespace. */
		return(DB_CORRUPTION);

	case FIL_PAGE_TYPE_XDES:
		/* The status of set_current_xdes() is returned after the
		space id has been written below. */
		err = set_current_xdes(
			block->page.id.page_no(), get_frame(block));
		/* fall through */
	case FIL_PAGE_INODE:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_ALLOCATED:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:

		/* Work directly on the uncompressed page headers. */
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		return(err);
	}

	/* None of the known page types matched. */
	ib::warn() << "Unknown page type (" << page_type << ")";

	return(DB_CORRUPTION);
}
2101 
2102 /** Called for every page in the tablespace. If the page was not
2103 updated then its state must be set to BUF_PAGE_NOT_USED.
2104 @param block block read from file, note it is not from the buffer pool
2105 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2106 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2107 {
2108 	/* If we already had an old page with matching number
2109 	in the buffer pool, evict it now, because
2110 	we no longer evict the pages on DISCARD TABLESPACE. */
2111 	buf_page_get_gen(block->page.id, get_zip_size(),
2112 			 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2113 			 __FILE__, __LINE__, NULL, NULL);
2114 
2115 	ulint		page_type;
2116 
2117 	if (dberr_t err = update_page(block, page_type)) {
2118 		return err;
2119 	}
2120 
2121 	const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
2122 
2123 	if (!block->page.zip.data) {
2124 		buf_flush_init_for_writing(
2125 			NULL, block->frame, NULL, m_current_lsn, full_crc32);
2126 	} else if (fil_page_type_is_index(page_type)) {
2127 		buf_flush_init_for_writing(
2128 			NULL, block->page.zip.data, &block->page.zip,
2129 			m_current_lsn, full_crc32);
2130 	} else {
2131 		/* Calculate and update the checksum of non-index
2132 		pages for ROW_FORMAT=COMPRESSED tables. */
2133 		buf_flush_update_zip_checksum(
2134 			block->page.zip.data, block->zip_size(),
2135 			m_current_lsn);
2136 	}
2137 
2138 	return DB_SUCCESS;
2139 }
2140 
2141 /*****************************************************************//**
2142 Clean up after import tablespace failure, this function will acquire
2143 the dictionary latches on behalf of the transaction if the transaction
2144 hasn't already acquired them. */
2145 static	MY_ATTRIBUTE((nonnull))
2146 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2147 row_import_discard_changes(
2148 /*=======================*/
2149 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2150 	trx_t*		trx,		/*!< in/out: transaction for import */
2151 	dberr_t		err)		/*!< in: error code */
2152 {
2153 	dict_table_t*	table = prebuilt->table;
2154 
2155 	ut_a(err != DB_SUCCESS);
2156 
2157 	prebuilt->trx->error_info = NULL;
2158 
2159 	ib::info() << "Discarding tablespace of table "
2160 		<< prebuilt->table->name
2161 		<< ": " << err;
2162 
2163 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2164 		ut_a(trx->dict_operation_lock_mode == 0);
2165 		row_mysql_lock_data_dictionary(trx);
2166 	}
2167 
2168 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2169 
2170 	/* Since we update the index root page numbers on disk after
2171 	we've done a successful import. The table will not be loadable.
2172 	However, we need to ensure that the in memory root page numbers
2173 	are reset to "NULL". */
2174 
2175 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2176 		index != 0;
2177 		index = UT_LIST_GET_NEXT(indexes, index)) {
2178 
2179 		index->page = FIL_NULL;
2180 	}
2181 
2182 	table->file_unreadable = true;
2183 	if (table->space) {
2184 		fil_close_tablespace(trx, table->space_id);
2185 		table->space = NULL;
2186 	}
2187 }
2188 
2189 /*****************************************************************//**
2190 Clean up after import tablespace. */
2191 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2192 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2193 row_import_cleanup(
2194 /*===============*/
2195 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2196 	trx_t*		trx,		/*!< in/out: transaction for import */
2197 	dberr_t		err)		/*!< in: error code */
2198 {
2199 	ut_a(prebuilt->trx != trx);
2200 
2201 	if (err != DB_SUCCESS) {
2202 		row_import_discard_changes(prebuilt, trx, err);
2203 	}
2204 
2205 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2206 
2207 	DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2208 
2209 	trx_commit_for_mysql(trx);
2210 
2211 	row_mysql_unlock_data_dictionary(trx);
2212 
2213 	trx->free();
2214 
2215 	prebuilt->trx->op_info = "";
2216 
2217 	DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2218 
2219 	log_make_checkpoint();
2220 
2221 	return(err);
2222 }
2223 
2224 /*****************************************************************//**
2225 Report error during tablespace import. */
2226 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2227 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2228 row_import_error(
2229 /*=============*/
2230 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2231 	trx_t*		trx,		/*!< in/out: transaction for import */
2232 	dberr_t		err)		/*!< in: error code */
2233 {
2234 	if (!trx_is_interrupted(trx)) {
2235 		char	table_name[MAX_FULL_NAME_LEN + 1];
2236 
2237 		innobase_format_name(
2238 			table_name, sizeof(table_name),
2239 			prebuilt->table->name.m_name);
2240 
2241 		ib_senderrf(
2242 			trx->mysql_thd, IB_LOG_LEVEL_WARN,
2243 			ER_INNODB_IMPORT_ERROR,
2244 			table_name, (ulong) err, ut_strerr(err));
2245 	}
2246 
2247 	return(row_import_cleanup(prebuilt, trx, err));
2248 }
2249 
2250 /*****************************************************************//**
2251 Adjust the root page index node and leaf node segment headers, update
2252 with the new space id. For all the table's secondary indexes.
2253 @return error code */
2254 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2255 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2256 row_import_adjust_root_pages_of_secondary_indexes(
2257 /*==============================================*/
2258 	trx_t*			trx,		/*!< in: transaction used for
2259 						the import */
2260 	dict_table_t*		table,		/*!< in: table the indexes
2261 						belong to */
2262 	const row_import&	cfg)		/*!< Import context */
2263 {
2264 	dict_index_t*		index;
2265 	ulint			n_rows_in_table;
2266 	dberr_t			err = DB_SUCCESS;
2267 
2268 	/* Skip the clustered index. */
2269 	index = dict_table_get_first_index(table);
2270 
2271 	n_rows_in_table = cfg.get_n_rows(index->name);
2272 
2273 	DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2274 			n_rows_in_table++;);
2275 
2276 	/* Adjust the root pages of the secondary indexes only. */
2277 	while ((index = dict_table_get_next_index(index)) != NULL) {
2278 		ut_a(!dict_index_is_clust(index));
2279 
2280 		if (!(index->type & DICT_CORRUPT)
2281 		    && index->page != FIL_NULL) {
2282 
2283 			/* Update the Btree segment headers for index node and
2284 			leaf nodes in the root page. Set the new space id. */
2285 
2286 			err = btr_root_adjust_on_import(index);
2287 		} else {
2288 			ib::warn() << "Skip adjustment of root pages for"
2289 				" index " << index->name << ".";
2290 
2291 			err = DB_CORRUPTION;
2292 		}
2293 
2294 		if (err != DB_SUCCESS) {
2295 
2296 			if (index->type & DICT_CLUSTERED) {
2297 				break;
2298 			}
2299 
2300 			ib_errf(trx->mysql_thd,
2301 				IB_LOG_LEVEL_WARN,
2302 				ER_INNODB_INDEX_CORRUPT,
2303 				"Index %s not found or corrupt,"
2304 				" you should recreate this index.",
2305 				index->name());
2306 
2307 			/* Do not bail out, so that the data
2308 			can be recovered. */
2309 
2310 			err = DB_SUCCESS;
2311 			index->type |= DICT_CORRUPT;
2312 			continue;
2313 		}
2314 
2315 		/* If we failed to purge any records in the index then
2316 		do it the hard way.
2317 
2318 		TODO: We can do this in the first pass by generating UNDO log
2319 		records for the failed rows. */
2320 
2321 		if (!cfg.requires_purge(index->name)) {
2322 			continue;
2323 		}
2324 
2325 		IndexPurge   purge(trx, index);
2326 
2327 		trx->op_info = "secondary: purge delete marked records";
2328 
2329 		err = purge.garbage_collect();
2330 
2331 		trx->op_info = "";
2332 
2333 		if (err != DB_SUCCESS) {
2334 			break;
2335 		} else if (purge.get_n_rows() != n_rows_in_table) {
2336 
2337 			ib_errf(trx->mysql_thd,
2338 				IB_LOG_LEVEL_WARN,
2339 				ER_INNODB_INDEX_CORRUPT,
2340 				"Index '%s' contains " ULINTPF " entries, "
2341 				"should be " ULINTPF ", you should recreate "
2342 				"this index.", index->name(),
2343 				purge.get_n_rows(), n_rows_in_table);
2344 
2345 			index->type |= DICT_CORRUPT;
2346 
2347 			/* Do not bail out, so that the data
2348 			can be recovered. */
2349 
2350 			err = DB_SUCCESS;
2351                 }
2352 	}
2353 
2354 	return(err);
2355 }
2356 
2357 /*****************************************************************//**
2358 Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */
2359 MY_ATTRIBUTE((nonnull)) static
2360 void
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2361 row_import_set_sys_max_row_id(
2362 /*==========================*/
2363 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2364 						handler */
2365 	const dict_table_t*	table)		/*!< in: table to import */
2366 {
2367 	const rec_t*		rec;
2368 	mtr_t			mtr;
2369 	btr_pcur_t		pcur;
2370 	row_id_t		row_id	= 0;
2371 	dict_index_t*		index;
2372 
2373 	index = dict_table_get_first_index(table);
2374 	ut_ad(index->is_primary());
2375 	ut_ad(dict_index_is_auto_gen_clust(index));
2376 
2377 	mtr_start(&mtr);
2378 
2379 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2380 
2381 	btr_pcur_open_at_index_side(
2382 		false,		// High end
2383 		index,
2384 		BTR_SEARCH_LEAF,
2385 		&pcur,
2386 		true,		// Init cursor
2387 		0,		// Leaf level
2388 		&mtr);
2389 
2390 	btr_pcur_move_to_prev_on_page(&pcur);
2391 	rec = btr_pcur_get_rec(&pcur);
2392 
2393 	/* Check for empty table. */
2394 	if (page_rec_is_infimum(rec)) {
2395 		/* The table is empty. */
2396 	} else if (rec_is_metadata(rec, *index)) {
2397 		/* The clustered index contains the metadata record only,
2398 		that is, the table is empty. */
2399 	} else {
2400 		row_id = mach_read_from_6(rec);
2401 	}
2402 
2403 	btr_pcur_close(&pcur);
2404 	mtr_commit(&mtr);
2405 
2406 	if (row_id) {
2407 		/* Update the system row id if the imported index row id is
2408 		greater than the max system row id. */
2409 
2410 		mutex_enter(&dict_sys.mutex);
2411 
2412 		if (row_id >= dict_sys.row_id) {
2413 			dict_sys.row_id = row_id + 1;
2414 			dict_hdr_flush_row_id();
2415 		}
2416 
2417 		mutex_exit(&dict_sys.mutex);
2418 	}
2419 }
2420 
2421 /*****************************************************************//**
2422 Read the a string from the meta data file.
2423 @return DB_SUCCESS or error code. */
2424 static
2425 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2426 row_import_cfg_read_string(
2427 /*=======================*/
2428 	FILE*		file,		/*!< in/out: File to read from */
2429 	byte*		ptr,		/*!< out: string to read */
2430 	ulint		max_len)	/*!< in: maximum length of the output
2431 					buffer in bytes */
2432 {
2433 	DBUG_EXECUTE_IF("ib_import_string_read_error",
2434 			errno = EINVAL; return(DB_IO_ERROR););
2435 
2436 	ulint		len = 0;
2437 
2438 	while (!feof(file)) {
2439 		int	ch = fgetc(file);
2440 
2441 		if (ch == EOF) {
2442 			break;
2443 		} else if (ch != 0) {
2444 			if (len < max_len) {
2445 				ptr[len++] = ch;
2446 			} else {
2447 				break;
2448 			}
2449 		/* max_len includes the NUL byte */
2450 		} else if (len != max_len - 1) {
2451 			break;
2452 		} else {
2453 			ptr[len] = 0;
2454 			return(DB_SUCCESS);
2455 		}
2456 	}
2457 
2458 	errno = EINVAL;
2459 
2460 	return(DB_IO_ERROR);
2461 }
2462 
2463 /*********************************************************************//**
2464 Write the meta data (index user fields) config file.
2465 @return DB_SUCCESS or error code. */
2466 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2467 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2468 row_import_cfg_read_index_fields(
2469 /*=============================*/
2470 	FILE*			file,	/*!< in: file to write to */
2471 	THD*			thd,	/*!< in/out: session */
2472 	row_index_t*		index)	/*!< Index being read in */
2473 {
2474 	byte			row[sizeof(ib_uint32_t) * 3];
2475 	ulint			n_fields = index->m_n_fields;
2476 
2477 	index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2478 
2479 	/* Trigger OOM */
2480 	DBUG_EXECUTE_IF(
2481 		"ib_import_OOM_4",
2482 		UT_DELETE_ARRAY(index->m_fields);
2483 		index->m_fields = NULL;
2484 	);
2485 
2486 	if (index->m_fields == NULL) {
2487 		return(DB_OUT_OF_MEMORY);
2488 	}
2489 
2490 	dict_field_t*	field = index->m_fields;
2491 
2492 	for (ulint i = 0; i < n_fields; ++i, ++field) {
2493 		byte*		ptr = row;
2494 
2495 		/* Trigger EOF */
2496 		DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2497 				(void) fseek(file, 0L, SEEK_END););
2498 
2499 		if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2500 
2501 			ib_senderrf(
2502 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2503 				(ulong) errno, strerror(errno),
2504 				"while reading index fields.");
2505 
2506 			return(DB_IO_ERROR);
2507 		}
2508 
2509 		new (field) dict_field_t();
2510 
2511 		field->prefix_len = mach_read_from_4(ptr);
2512 		ptr += sizeof(ib_uint32_t);
2513 
2514 		field->fixed_len = mach_read_from_4(ptr);
2515 		ptr += sizeof(ib_uint32_t);
2516 
2517 		/* Include the NUL byte in the length. */
2518 		ulint	len = mach_read_from_4(ptr);
2519 
2520 		byte*	name = UT_NEW_ARRAY_NOKEY(byte, len);
2521 
2522 		/* Trigger OOM */
2523 		DBUG_EXECUTE_IF(
2524 			"ib_import_OOM_5",
2525 			UT_DELETE_ARRAY(name);
2526 			name = NULL;
2527 		);
2528 
2529 		if (name == NULL) {
2530 			return(DB_OUT_OF_MEMORY);
2531 		}
2532 
2533 		field->name = reinterpret_cast<const char*>(name);
2534 
2535 		dberr_t	err = row_import_cfg_read_string(file, name, len);
2536 
2537 		if (err != DB_SUCCESS) {
2538 
2539 			ib_senderrf(
2540 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2541 				(ulong) errno, strerror(errno),
2542 				"while parsing table name.");
2543 
2544 			return(err);
2545 		}
2546 	}
2547 
2548 	return(DB_SUCCESS);
2549 }
2550 
2551 /*****************************************************************//**
2552 Read the index names and root page numbers of the indexes and set the values.
2553 Row format [root_page_no, len of str, str ... ]
2554 @return DB_SUCCESS or error code. */
2555 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2556 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2557 row_import_read_index_data(
2558 /*=======================*/
2559 	FILE*		file,		/*!< in: File to read from */
2560 	THD*		thd,		/*!< in: session */
2561 	row_import*	cfg)		/*!< in/out: meta-data read */
2562 {
2563 	byte*		ptr;
2564 	row_index_t*	cfg_index;
2565 	byte		row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2566 
2567 	/* FIXME: What is the max value? */
2568 	ut_a(cfg->m_n_indexes > 0);
2569 	ut_a(cfg->m_n_indexes < 1024);
2570 
2571 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2572 
2573 	/* Trigger OOM */
2574 	DBUG_EXECUTE_IF(
2575 		"ib_import_OOM_6",
2576 		UT_DELETE_ARRAY(cfg->m_indexes);
2577 		cfg->m_indexes = NULL;
2578 	);
2579 
2580 	if (cfg->m_indexes == NULL) {
2581 		return(DB_OUT_OF_MEMORY);
2582 	}
2583 
2584 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2585 
2586 	cfg_index = cfg->m_indexes;
2587 
2588 	for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2589 		/* Trigger EOF */
2590 		DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2591 				(void) fseek(file, 0L, SEEK_END););
2592 
2593 		/* Read the index data. */
2594 		size_t	n_bytes = fread(row, 1, sizeof(row), file);
2595 
2596 		/* Trigger EOF */
2597 		DBUG_EXECUTE_IF("ib_import_io_read_error",
2598 				(void) fseek(file, 0L, SEEK_END););
2599 
2600 		if (n_bytes != sizeof(row)) {
2601 			char	msg[BUFSIZ];
2602 
2603 			snprintf(msg, sizeof(msg),
2604 				 "while reading index meta-data, expected "
2605 				 "to read " ULINTPF
2606 				 " bytes but read only " ULINTPF " bytes",
2607 				 sizeof(row), n_bytes);
2608 
2609 			ib_senderrf(
2610 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2611 				(ulong) errno, strerror(errno), msg);
2612 
2613 			ib::error() << "IO Error: " << msg;
2614 
2615 			return(DB_IO_ERROR);
2616 		}
2617 
2618 		ptr = row;
2619 
2620 		cfg_index->m_id = mach_read_from_8(ptr);
2621 		ptr += sizeof(index_id_t);
2622 
2623 		cfg_index->m_space = mach_read_from_4(ptr);
2624 		ptr += sizeof(ib_uint32_t);
2625 
2626 		cfg_index->m_page_no = mach_read_from_4(ptr);
2627 		ptr += sizeof(ib_uint32_t);
2628 
2629 		cfg_index->m_type = mach_read_from_4(ptr);
2630 		ptr += sizeof(ib_uint32_t);
2631 
2632 		cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2633 		if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2634 			ut_ad(0);
2635 			/* Overflow. Pretend that the clustered index
2636 			has a variable-length PRIMARY KEY. */
2637 			cfg_index->m_trx_id_offset = 0;
2638 		}
2639 		ptr += sizeof(ib_uint32_t);
2640 
2641 		cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2642 		ptr += sizeof(ib_uint32_t);
2643 
2644 		cfg_index->m_n_uniq = mach_read_from_4(ptr);
2645 		ptr += sizeof(ib_uint32_t);
2646 
2647 		cfg_index->m_n_nullable = mach_read_from_4(ptr);
2648 		ptr += sizeof(ib_uint32_t);
2649 
2650 		cfg_index->m_n_fields = mach_read_from_4(ptr);
2651 		ptr += sizeof(ib_uint32_t);
2652 
2653 		/* The NUL byte is included in the name length. */
2654 		ulint	len = mach_read_from_4(ptr);
2655 
2656 		if (len > OS_FILE_MAX_PATH) {
2657 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2658 				ER_INNODB_INDEX_CORRUPT,
2659 				"Index name length (" ULINTPF ") is too long, "
2660 				"the meta-data is corrupt", len);
2661 
2662 			return(DB_CORRUPTION);
2663 		}
2664 
2665 		cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2666 
2667 		/* Trigger OOM */
2668 		DBUG_EXECUTE_IF(
2669 			"ib_import_OOM_7",
2670 			UT_DELETE_ARRAY(cfg_index->m_name);
2671 			cfg_index->m_name = NULL;
2672 		);
2673 
2674 		if (cfg_index->m_name == NULL) {
2675 			return(DB_OUT_OF_MEMORY);
2676 		}
2677 
2678 		dberr_t	err;
2679 
2680 		err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2681 
2682 		if (err != DB_SUCCESS) {
2683 
2684 			ib_senderrf(
2685 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2686 				(ulong) errno, strerror(errno),
2687 				"while parsing index name.");
2688 
2689 			return(err);
2690 		}
2691 
2692 		err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2693 
2694 		if (err != DB_SUCCESS) {
2695 			return(err);
2696 		}
2697 
2698 	}
2699 
2700 	return(DB_SUCCESS);
2701 }
2702 
2703 /*****************************************************************//**
2704 Set the index root page number for v1 format.
2705 @return DB_SUCCESS or error code. */
2706 static
2707 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2708 row_import_read_indexes(
2709 /*====================*/
2710 	FILE*		file,		/*!< in: File to read from */
2711 	THD*		thd,		/*!< in: session */
2712 	row_import*	cfg)		/*!< in/out: meta-data read */
2713 {
2714 	byte		row[sizeof(ib_uint32_t)];
2715 
2716 	/* Trigger EOF */
2717 	DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2718 			(void) fseek(file, 0L, SEEK_END););
2719 
2720 	/* Read the number of indexes. */
2721 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2722 		ib_senderrf(
2723 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2724 			(ulong) errno, strerror(errno),
2725 			"while reading number of indexes.");
2726 
2727 		return(DB_IO_ERROR);
2728 	}
2729 
2730 	cfg->m_n_indexes = mach_read_from_4(row);
2731 
2732 	if (cfg->m_n_indexes == 0) {
2733 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2734 			"Number of indexes in meta-data file is 0");
2735 
2736 		return(DB_CORRUPTION);
2737 
2738 	} else if (cfg->m_n_indexes > 1024) {
2739 		// FIXME: What is the upper limit? */
2740 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2741 			"Number of indexes in meta-data file is too high: "
2742 			ULINTPF, cfg->m_n_indexes);
2743 		cfg->m_n_indexes = 0;
2744 
2745 		return(DB_CORRUPTION);
2746 	}
2747 
2748 	return(row_import_read_index_data(file, thd, cfg));
2749 }
2750 
2751 /*********************************************************************//**
2752 Read the meta data (table columns) config file. Deserialise the contents of
2753 dict_col_t structure, along with the column name. */
2754 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2755 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2756 row_import_read_columns(
2757 /*====================*/
2758 	FILE*			file,	/*!< in: file to write to */
2759 	THD*			thd,	/*!< in/out: session */
2760 	row_import*		cfg)	/*!< in/out: meta-data read */
2761 {
2762 	dict_col_t*		col;
2763 	byte			row[sizeof(ib_uint32_t) * 8];
2764 
2765 	/* FIXME: What should the upper limit be? */
2766 	ut_a(cfg->m_n_cols > 0);
2767 	ut_a(cfg->m_n_cols < 1024);
2768 
2769 	cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2770 
2771 	/* Trigger OOM */
2772 	DBUG_EXECUTE_IF(
2773 		"ib_import_OOM_8",
2774 		UT_DELETE_ARRAY(cfg->m_cols);
2775 		cfg->m_cols = NULL;
2776 	);
2777 
2778 	if (cfg->m_cols == NULL) {
2779 		return(DB_OUT_OF_MEMORY);
2780 	}
2781 
2782 	cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2783 
2784 	/* Trigger OOM */
2785 	DBUG_EXECUTE_IF(
2786 		"ib_import_OOM_9",
2787 		UT_DELETE_ARRAY(cfg->m_col_names);
2788 		cfg->m_col_names = NULL;
2789 	);
2790 
2791 	if (cfg->m_col_names == NULL) {
2792 		return(DB_OUT_OF_MEMORY);
2793 	}
2794 
2795 	memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2796 	memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2797 
2798 	col = cfg->m_cols;
2799 
2800 	for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2801 		byte*		ptr = row;
2802 
2803 		/* Trigger EOF */
2804 		DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2805 				(void) fseek(file, 0L, SEEK_END););
2806 
2807 		if (fread(row, 1,  sizeof(row), file) != sizeof(row)) {
2808 			ib_senderrf(
2809 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2810 				(ulong) errno, strerror(errno),
2811 				"while reading table column meta-data.");
2812 
2813 			return(DB_IO_ERROR);
2814 		}
2815 
2816 		col->prtype = mach_read_from_4(ptr);
2817 		ptr += sizeof(ib_uint32_t);
2818 
2819 		col->mtype = mach_read_from_4(ptr);
2820 		ptr += sizeof(ib_uint32_t);
2821 
2822 		col->len = mach_read_from_4(ptr);
2823 		ptr += sizeof(ib_uint32_t);
2824 
2825 		ulint mbminmaxlen = mach_read_from_4(ptr);
2826 		col->mbmaxlen = mbminmaxlen / 5;
2827 		col->mbminlen = mbminmaxlen % 5;
2828 		ptr += sizeof(ib_uint32_t);
2829 
2830 		col->ind = mach_read_from_4(ptr);
2831 		ptr += sizeof(ib_uint32_t);
2832 
2833 		col->ord_part = mach_read_from_4(ptr);
2834 		ptr += sizeof(ib_uint32_t);
2835 
2836 		col->max_prefix = mach_read_from_4(ptr);
2837 		ptr += sizeof(ib_uint32_t);
2838 
2839 		/* Read in the column name as [len, byte array]. The len
2840 		includes the NUL byte. */
2841 
2842 		ulint		len = mach_read_from_4(ptr);
2843 
2844 		/* FIXME: What is the maximum column name length? */
2845 		if (len == 0 || len > 128) {
2846 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2847 				ER_IO_READ_ERROR,
2848 				"Column name length " ULINTPF ", is invalid",
2849 				len);
2850 
2851 			return(DB_CORRUPTION);
2852 		}
2853 
2854 		cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2855 
2856 		/* Trigger OOM */
2857 		DBUG_EXECUTE_IF(
2858 			"ib_import_OOM_10",
2859 			UT_DELETE_ARRAY(cfg->m_col_names[i]);
2860 			cfg->m_col_names[i] = NULL;
2861 		);
2862 
2863 		if (cfg->m_col_names[i] == NULL) {
2864 			return(DB_OUT_OF_MEMORY);
2865 		}
2866 
2867 		dberr_t	err;
2868 
2869 		err = row_import_cfg_read_string(
2870 			file, cfg->m_col_names[i], len);
2871 
2872 		if (err != DB_SUCCESS) {
2873 
2874 			ib_senderrf(
2875 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2876 				(ulong) errno, strerror(errno),
2877 				"while parsing table column name.");
2878 
2879 			return(err);
2880 		}
2881 	}
2882 
2883 	return(DB_SUCCESS);
2884 }
2885 
2886 /*****************************************************************//**
2887 Read the contents of the <tablespace>.cfg file.
2888 @return DB_SUCCESS or error code. */
2889 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2890 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2891 row_import_read_v1(
2892 /*===============*/
2893 	FILE*		file,		/*!< in: File to read from */
2894 	THD*		thd,		/*!< in: session */
2895 	row_import*	cfg)		/*!< out: meta data */
2896 {
2897 	byte		value[sizeof(ib_uint32_t)];
2898 
2899 	/* Trigger EOF */
2900 	DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2901 			(void) fseek(file, 0L, SEEK_END););
2902 
2903 	/* Read the hostname where the tablespace was exported. */
2904 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2905 		ib_senderrf(
2906 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2907 			(ulong) errno, strerror(errno),
2908 			"while reading meta-data export hostname length.");
2909 
2910 		return(DB_IO_ERROR);
2911 	}
2912 
2913 	ulint	len = mach_read_from_4(value);
2914 
2915 	/* NUL byte is part of name length. */
2916 	cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2917 
2918 	/* Trigger OOM */
2919 	DBUG_EXECUTE_IF(
2920 		"ib_import_OOM_1",
2921 		UT_DELETE_ARRAY(cfg->m_hostname);
2922 		cfg->m_hostname = NULL;
2923 	);
2924 
2925 	if (cfg->m_hostname == NULL) {
2926 		return(DB_OUT_OF_MEMORY);
2927 	}
2928 
2929 	dberr_t	err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2930 
2931 	if (err != DB_SUCCESS) {
2932 
2933 		ib_senderrf(
2934 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2935 			(ulong) errno, strerror(errno),
2936 			"while parsing export hostname.");
2937 
2938 		return(err);
2939 	}
2940 
2941 	/* Trigger EOF */
2942 	DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2943 			(void) fseek(file, 0L, SEEK_END););
2944 
2945 	/* Read the table name of tablespace that was exported. */
2946 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2947 		ib_senderrf(
2948 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2949 			(ulong) errno, strerror(errno),
2950 			"while reading meta-data table name length.");
2951 
2952 		return(DB_IO_ERROR);
2953 	}
2954 
2955 	len = mach_read_from_4(value);
2956 
2957 	/* NUL byte is part of name length. */
2958 	cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2959 
2960 	/* Trigger OOM */
2961 	DBUG_EXECUTE_IF(
2962 		"ib_import_OOM_2",
2963 		UT_DELETE_ARRAY(cfg->m_table_name);
2964 		cfg->m_table_name = NULL;
2965 	);
2966 
2967 	if (cfg->m_table_name == NULL) {
2968 		return(DB_OUT_OF_MEMORY);
2969 	}
2970 
2971 	err = row_import_cfg_read_string(file, cfg->m_table_name, len);
2972 
2973 	if (err != DB_SUCCESS) {
2974 		ib_senderrf(
2975 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2976 			(ulong) errno, strerror(errno),
2977 			"while parsing table name.");
2978 
2979 		return(err);
2980 	}
2981 
2982 	ib::info() << "Importing tablespace for table '" << cfg->m_table_name
2983 		<< "' that was exported from host '" << cfg->m_hostname << "'";
2984 
2985 	byte		row[sizeof(ib_uint32_t) * 3];
2986 
2987 	/* Trigger EOF */
2988 	DBUG_EXECUTE_IF("ib_import_io_read_error_7",
2989 			(void) fseek(file, 0L, SEEK_END););
2990 
2991 	/* Read the autoinc value. */
2992 	if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
2993 		ib_senderrf(
2994 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2995 			(ulong) errno, strerror(errno),
2996 			"while reading autoinc value.");
2997 
2998 		return(DB_IO_ERROR);
2999 	}
3000 
3001 	cfg->m_autoinc = mach_read_from_8(row);
3002 
3003 	/* Trigger EOF */
3004 	DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3005 			(void) fseek(file, 0L, SEEK_END););
3006 
3007 	/* Read the tablespace page size. */
3008 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3009 		ib_senderrf(
3010 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3011 			(ulong) errno, strerror(errno),
3012 			"while reading meta-data header.");
3013 
3014 		return(DB_IO_ERROR);
3015 	}
3016 
3017 	byte*		ptr = row;
3018 
3019 	const ulint	logical_page_size = mach_read_from_4(ptr);
3020 	ptr += sizeof(ib_uint32_t);
3021 
3022 	if (logical_page_size != srv_page_size) {
3023 
3024 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3025 			"Tablespace to be imported has a different"
3026 			" page size than this server. Server page size"
3027 			" is %lu, whereas tablespace page size"
3028 			" is " ULINTPF,
3029 			srv_page_size,
3030 			logical_page_size);
3031 
3032 		return(DB_ERROR);
3033 	}
3034 
3035 	cfg->m_flags = mach_read_from_4(ptr);
3036 	ptr += sizeof(ib_uint32_t);
3037 
3038 	cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags);
3039 	cfg->m_n_cols = mach_read_from_4(ptr);
3040 
3041 	if (!dict_tf_is_valid(cfg->m_flags)) {
3042 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
3043 			ER_TABLE_SCHEMA_MISMATCH,
3044 			"Invalid table flags: " ULINTPF, cfg->m_flags);
3045 
3046 		return(DB_CORRUPTION);
3047 	}
3048 
3049 	err = row_import_read_columns(file, thd, cfg);
3050 
3051 	if (err == DB_SUCCESS) {
3052 		err = row_import_read_indexes(file, thd, cfg);
3053 	}
3054 
3055 	return(err);
3056 }
3057 
3058 /**
3059 Read the contents of the <tablespace>.cfg file.
3060 @return DB_SUCCESS or error code. */
3061 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3062 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3063 row_import_read_meta_data(
3064 /*======================*/
3065 	FILE*		file,		/*!< in: File to read from */
3066 	THD*		thd,		/*!< in: session */
3067 	row_import&	cfg)		/*!< out: contents of the .cfg file */
3068 {
3069 	byte		row[sizeof(ib_uint32_t)];
3070 
3071 	/* Trigger EOF */
3072 	DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3073 			(void) fseek(file, 0L, SEEK_END););
3074 
3075 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3076 		ib_senderrf(
3077 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3078 			(ulong) errno, strerror(errno),
3079 			"while reading meta-data version.");
3080 
3081 		return(DB_IO_ERROR);
3082 	}
3083 
3084 	cfg.m_version = mach_read_from_4(row);
3085 
3086 	/* Check the version number. */
3087 	switch (cfg.m_version) {
3088 	case IB_EXPORT_CFG_VERSION_V1:
3089 
3090 		return(row_import_read_v1(file, thd, &cfg));
3091 	default:
3092 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3093 			"Unsupported meta-data version number (" ULINTPF "), "
3094 			"file ignored", cfg.m_version);
3095 	}
3096 
3097 	return(DB_ERROR);
3098 }
3099 
/** Offset, within the BLOB part header, of the BLOB part length
on this page */
#define BTR_BLOB_HDR_PART_LEN 0
/** Offset, within the BLOB part header, of the next BLOB part page
number (FIL_NULL if none) */
#define BTR_BLOB_HDR_NEXT_PAGE_NO 4
/** Size of a BLOB part header, in bytes */
#define BTR_BLOB_HDR_SIZE 8
3104 
3105 /* decrypt and decompress page if needed */
decrypt_decompress(fil_space_crypt_t * space_crypt,size_t space_flags,span<byte> page,size_t space_id,byte * page_compress_buf)3106 static dberr_t decrypt_decompress(fil_space_crypt_t *space_crypt,
3107                                   size_t space_flags, span<byte> page,
3108                                   size_t space_id, byte *page_compress_buf)
3109 {
3110   auto *data= page.data();
3111 
3112   if (space_crypt && space_crypt->should_encrypt())
3113   {
3114     if (!buf_page_verify_crypt_checksum(data, space_flags))
3115       return DB_CORRUPTION;
3116 
3117     if (dberr_t err= fil_space_decrypt(space_id, space_crypt, data,
3118                                        page.size(), space_flags, data))
3119       return err;
3120   }
3121   else if (fil_page_is_compressed_encrypted(data))
3122     return DB_CORRUPTION;
3123 
3124   const bool is_full_crc32_compressed=
3125       fil_space_t::is_full_crc32_compressed(space_flags);
3126 
3127   const bool page_actually_compressed=
3128       (is_full_crc32_compressed &&
3129        buf_page_is_compressed(data, space_flags)) ||
3130       fil_page_is_compressed_encrypted(data) || fil_page_is_compressed(data);
3131 
3132   if (page_actually_compressed)
3133   {
3134     if (!is_full_crc32_compressed && !fil_space_t::is_compressed(space_flags))
3135       return DB_CORRUPTION;
3136 
3137     auto compress_length=
3138         fil_page_decompress(page_compress_buf, data, space_flags);
3139     ut_ad(compress_length != srv_page_size);
3140 
3141     if (compress_length == 0)
3142       return DB_CORRUPTION;
3143   }
3144 
3145   return DB_SUCCESS;
3146 }
3147 
get_buf_size()3148 static size_t get_buf_size()
3149 {
3150   return srv_page_size
3151 #ifdef HAVE_LZO
3152          + LZO1X_1_15_MEM_COMPRESS
3153 #elif defined HAVE_SNAPPY
3154          + snappy_max_compressed_length(srv_page_size)
3155 #endif
3156       ;
3157 }
3158 
3159 /* find, parse instant metadata, performing variaous checks,
3160 and apply it to dict_table_t
3161 @return DB_SUCCESS or some error */
handle_instant_metadata(dict_table_t * table,const row_import & cfg)3162 static dberr_t handle_instant_metadata(dict_table_t *table,
3163                                        const row_import &cfg)
3164 {
3165   dict_get_and_save_data_dir_path(table, false);
3166 
3167   char *filepath;
3168   if (DICT_TF_HAS_DATA_DIR(table->flags))
3169   {
3170     ut_a(table->data_dir_path);
3171 
3172     filepath=
3173         fil_make_filepath(table->data_dir_path, table->name.m_name, IBD, true);
3174   }
3175   else
3176     filepath= fil_make_filepath(nullptr, table->name.m_name, IBD, false);
3177 
3178   if (!filepath)
3179     return DB_OUT_OF_MEMORY;
3180 
3181   SCOPE_EXIT([filepath]() { ut_free(filepath); });
3182 
3183   bool success;
3184   auto file= os_file_create_simple_no_error_handling(
3185       innodb_data_file_key, filepath, OS_FILE_OPEN, OS_FILE_READ_WRITE, false,
3186       &success);
3187   if (!success)
3188     return DB_IO_ERROR;
3189 
3190   if (os_file_get_size(file) < srv_page_size * 4)
3191     return DB_CORRUPTION;
3192 
3193   SCOPE_EXIT([&file]() { os_file_close(file); });
3194 
3195   std::unique_ptr<byte[], decltype(&aligned_free)> first_page(
3196       static_cast<byte *>(aligned_malloc(srv_page_size, srv_page_size)),
3197       &aligned_free);
3198 
3199   if (dberr_t err= os_file_read_no_error_handling(IORequest(IORequest::READ),
3200                                                   file, first_page.get(), 0,
3201                                                   srv_page_size, nullptr))
3202     return err;
3203 
3204   auto space_flags= fsp_header_get_flags(first_page.get());
3205 
3206   if (!fil_space_t::is_valid_flags(space_flags, true))
3207   {
3208     auto cflags= fsp_flags_convert_from_101(space_flags);
3209     if (cflags == ULINT_UNDEFINED)
3210     {
3211       ib::error() << "Invalid FSP_SPACE_FLAGS=" << ib::hex(space_flags);
3212       return DB_CORRUPTION;
3213     }
3214     space_flags= cflags;
3215   }
3216 
3217   if (!cfg.m_missing)
3218   {
3219     if (dberr_t err= cfg.match_flags(current_thd))
3220       return err;
3221   }
3222 
3223   const unsigned zip_size= fil_space_t::zip_size(space_flags);
3224   const unsigned physical_size= zip_size ? zip_size : unsigned(srv_page_size);
3225   ut_ad(physical_size <= UNIV_PAGE_SIZE_MAX);
3226   const uint32_t space_id= page_get_space_id(first_page.get());
3227 
3228   auto *space_crypt= fil_space_read_crypt_data(zip_size, first_page.get());
3229   SCOPE_EXIT([&space_crypt]() {
3230     if (space_crypt)
3231       fil_space_destroy_crypt_data(&space_crypt);
3232   });
3233 
3234   std::unique_ptr<byte[], decltype(&aligned_free)> page(
3235       static_cast<byte *>(
3236           aligned_malloc(UNIV_PAGE_SIZE_MAX, UNIV_PAGE_SIZE_MAX)),
3237       &aligned_free);
3238 
3239   if (dberr_t err= os_file_read_no_error_handling(
3240           IORequest(IORequest::READ), file, page.get(), 3 * physical_size,
3241           physical_size, nullptr))
3242     return err;
3243 
3244   std::unique_ptr<byte[]> page_compress_buf(new byte[get_buf_size()]);
3245 
3246   if (dberr_t err=
3247           decrypt_decompress(space_crypt, space_flags,
3248                              {page.get(), static_cast<size_t>(physical_size)},
3249                              space_id, page_compress_buf.get()))
3250     return err;
3251 
3252   if (table->supports_instant())
3253   {
3254     dict_index_t *index= dict_table_get_first_index(table);
3255 
3256     auto tmp1= table->space_id;
3257     table->space_id= page_get_space_id(page.get());
3258     SCOPE_EXIT([tmp1, table]() { table->space_id= tmp1; });
3259 
3260     auto tmp2= index->page;
3261     index->page= page_get_page_no(page.get());
3262     SCOPE_EXIT([tmp2, index]() { index->page= tmp2; });
3263 
3264     if (!page_is_comp(page.get()) != !dict_table_is_comp(table))
3265     {
3266       ib_errf(current_thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3267               "ROW_FORMAT mismatch");
3268       return DB_CORRUPTION;
3269     }
3270 
3271     if (btr_cur_instant_root_init(index, page.get()))
3272       return DB_ERROR;
3273 
3274     ut_ad(index->n_core_null_bytes != dict_index_t::NO_CORE_NULL_BYTES);
3275 
3276     if (fil_page_get_type(page.get()) == FIL_PAGE_INDEX)
3277     {
3278       ut_ad(!index->is_instant());
3279       return DB_SUCCESS;
3280     }
3281 
3282     mem_heap_t *heap= NULL;
3283     SCOPE_EXIT([&heap]() {
3284       if (heap)
3285         mem_heap_free(heap);
3286     });
3287 
3288     while (btr_page_get_level(page.get()) != 0)
3289     {
3290       const rec_t *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3291 
3292       /* Relax the assertion in rec_init_offsets(). */
3293       ut_ad(!index->in_instant_init);
3294       ut_d(index->in_instant_init= true);
3295       rec_offs *offsets=
3296           rec_get_offsets(rec, index, nullptr, 0, ULINT_UNDEFINED, &heap);
3297       ut_d(index->in_instant_init= false);
3298 
3299       uint64_t child_page_no= btr_node_ptr_get_child_page_no(rec, offsets);
3300 
3301       if (dberr_t err= os_file_read_no_error_handling(
3302               IORequest(IORequest::READ), file, page.get(),
3303               child_page_no * physical_size, physical_size, nullptr))
3304         return err;
3305 
3306       if (dberr_t err= decrypt_decompress(
3307               space_crypt, space_flags,
3308               {page.get(), static_cast<size_t>(physical_size)}, space_id,
3309               page_compress_buf.get()))
3310         return err;
3311     }
3312 
3313     const auto *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3314     const auto comp= dict_table_is_comp(index->table);
3315     const auto info_bits= rec_get_info_bits(rec, comp);
3316 
3317     if (page_rec_is_supremum(rec) || !(info_bits & REC_INFO_MIN_REC_FLAG))
3318     {
3319       ib::error() << "Table " << index->table->name
3320                   << " is missing instant ALTER metadata";
3321       index->table->corrupted= true;
3322       return DB_CORRUPTION;
3323     }
3324 
3325     if ((info_bits & ~REC_INFO_DELETED_FLAG) != REC_INFO_MIN_REC_FLAG ||
3326         (comp && rec_get_status(rec) != REC_STATUS_INSTANT))
3327     {
3328     incompatible:
3329       ib::error() << "Table " << index->table->name
3330                   << " contains unrecognizable instant ALTER metadata";
3331       index->table->corrupted= true;
3332       return DB_CORRUPTION;
3333     }
3334 
3335     if (info_bits & REC_INFO_DELETED_FLAG)
3336     {
3337       ulint trx_id_offset= index->trx_id_offset;
3338       ut_ad(index->n_uniq);
3339 
3340       if (trx_id_offset)
3341       {
3342       }
3343       else if (index->table->not_redundant())
3344       {
3345 
3346         for (uint i= index->n_uniq; i--;)
3347           trx_id_offset+= index->fields[i].fixed_len;
3348       }
3349       else if (rec_get_1byte_offs_flag(rec))
3350       {
3351         trx_id_offset= rec_1_get_field_end_info(rec, index->n_uniq - 1);
3352         ut_ad(!(trx_id_offset & REC_1BYTE_SQL_NULL_MASK));
3353         trx_id_offset&= ~REC_1BYTE_SQL_NULL_MASK;
3354       }
3355       else
3356       {
3357         trx_id_offset= rec_2_get_field_end_info(rec, index->n_uniq - 1);
3358         ut_ad(!(trx_id_offset & REC_2BYTE_SQL_NULL_MASK));
3359         trx_id_offset&= ~REC_2BYTE_SQL_NULL_MASK;
3360       }
3361 
3362       const byte *ptr=
3363           rec + trx_id_offset + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
3364 
3365       if (mach_read_from_4(ptr + BTR_EXTERN_LEN))
3366         goto incompatible;
3367 
3368       uint len= mach_read_from_4(ptr + BTR_EXTERN_LEN + 4);
3369       if (!len || mach_read_from_4(ptr + BTR_EXTERN_OFFSET) != FIL_PAGE_DATA)
3370         goto incompatible;
3371 
3372       std::unique_ptr<byte[], decltype(&aligned_free)> second_page(
3373           static_cast<byte *>(aligned_malloc(physical_size, physical_size)),
3374           &aligned_free);
3375 
3376       if (dberr_t err= os_file_read_no_error_handling(
3377               IORequest(IORequest::READ), file, second_page.get(),
3378               mach_read_from_4(ptr + BTR_EXTERN_PAGE_NO) * physical_size,
3379               srv_page_size, nullptr))
3380         return err;
3381 
3382       if (dberr_t err= decrypt_decompress(
3383               space_crypt, space_flags,
3384               {second_page.get(), static_cast<size_t>(physical_size)},
3385               space_id, page_compress_buf.get()))
3386         return err;
3387 
3388       if (fil_page_get_type(second_page.get()) != FIL_PAGE_TYPE_BLOB ||
3389           mach_read_from_4(
3390               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_NEXT_PAGE_NO]) !=
3391               FIL_NULL ||
3392           mach_read_from_4(
3393               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_PART_LEN]) != len)
3394         goto incompatible;
3395 
3396       /* The unused part of the BLOB page should be zero-filled. */
3397       for (const byte *
3398                b= second_page.get() + (FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE) +
3399                   len,
3400               *const end= second_page.get() + srv_page_size - BTR_EXTERN_LEN;
3401            b < end;)
3402       {
3403         if (*b++)
3404           goto incompatible;
3405       }
3406 
3407       if (index->table->deserialise_columns(
3408               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE], len))
3409         goto incompatible;
3410     }
3411 
3412     rec_offs *offsets= rec_get_offsets(
3413         rec, index, nullptr, index->n_core_fields, ULINT_UNDEFINED, &heap);
3414     if (rec_offs_any_default(offsets))
3415     {
3416     inconsistent:
3417       goto incompatible;
3418     }
3419 
3420     /* In fact, because we only ever append fields to the metadata
3421     record, it is also OK to perform READ UNCOMMITTED and
3422     then ignore any extra fields, provided that
3423     trx_sys.is_registered(DB_TRX_ID). */
3424     if (rec_offs_n_fields(offsets) >
3425             ulint(index->n_fields) + !!index->table->instant &&
3426         !trx_sys.is_registered(current_trx(),
3427                                row_get_rec_trx_id(rec, index, offsets)))
3428       goto inconsistent;
3429 
3430     for (unsigned i= index->n_core_fields; i < index->n_fields; i++)
3431     {
3432       dict_col_t *col= index->fields[i].col;
3433       const unsigned o= i + !!index->table->instant;
3434       ulint len;
3435       const byte *data= rec_get_nth_field(rec, offsets, o, &len);
3436       ut_ad(!col->is_added());
3437       ut_ad(!col->def_val.data);
3438       col->def_val.len= len;
3439       switch (len) {
3440       case UNIV_SQL_NULL:
3441         continue;
3442       case 0:
3443         col->def_val.data= field_ref_zero;
3444         continue;
3445       }
3446       ut_ad(len != UNIV_SQL_DEFAULT);
3447       if (!rec_offs_nth_extern(offsets, o))
3448         col->def_val.data= mem_heap_dup(index->table->heap, data, len);
3449       else if (len < BTR_EXTERN_FIELD_REF_SIZE ||
3450                !memcmp(data + len - BTR_EXTERN_FIELD_REF_SIZE, field_ref_zero,
3451                        BTR_EXTERN_FIELD_REF_SIZE))
3452       {
3453         col->def_val.len= UNIV_SQL_DEFAULT;
3454         goto inconsistent;
3455       }
3456       else
3457       {
3458         col->def_val.data= btr_copy_externally_stored_field(
3459             &col->def_val.len, data, srv_page_size, len, index->table->heap);
3460       }
3461     }
3462   }
3463 
3464   return DB_SUCCESS;
3465 }
3466 
3467 /**
3468 Read the contents of the <tablename>.cfg file.
3469 @return DB_SUCCESS or error code. */
3470 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3471 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3472 row_import_read_cfg(
3473 /*================*/
3474 	dict_table_t*	table,	/*!< in: table */
3475 	THD*		thd,	/*!< in: session */
3476 	row_import&	cfg)	/*!< out: contents of the .cfg file */
3477 {
3478 	dberr_t		err;
3479 	char		name[OS_FILE_MAX_PATH];
3480 
3481 	cfg.m_table = table;
3482 
3483 	srv_get_meta_data_filename(table, name, sizeof(name));
3484 
3485 	FILE*	file = fopen(name, "rb");
3486 
3487 	if (file == NULL) {
3488 		char	msg[BUFSIZ];
3489 
3490 		snprintf(msg, sizeof(msg),
3491 			 "Error opening '%s', will attempt to import"
3492 			 " without schema verification", name);
3493 
3494 		ib_senderrf(
3495 			thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3496 			(ulong) errno, strerror(errno), msg);
3497 
3498 		cfg.m_missing = true;
3499 
3500 		err = DB_FAIL;
3501 	} else {
3502 
3503 		cfg.m_missing = false;
3504 
3505 		err = row_import_read_meta_data(file, thd, cfg);
3506 		fclose(file);
3507 	}
3508 
3509 	return(err);
3510 }
3511 
3512 /** Update the root page numbers and tablespace ID of a table.
3513 @param[in,out]	trx	dictionary transaction
3514 @param[in,out]	table	persistent table
3515 @param[in]	reset	whether to reset the fields to FIL_NULL
3516 @return DB_SUCCESS or error code */
3517 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3518 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3519 {
3520 	const dict_index_t*	index;
3521 	que_t*			graph = 0;
3522 	dberr_t			err = DB_SUCCESS;
3523 
3524 	ut_ad(reset || table->space->id == table->space_id);
3525 
3526 	static const char	sql[] = {
3527 		"PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3528 		"BEGIN\n"
3529 		"UPDATE SYS_INDEXES\n"
3530 		"SET SPACE = :space,\n"
3531 		"    PAGE_NO = :page,\n"
3532 		"    TYPE = :type\n"
3533 		"WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3534 		"END;\n"};
3535 
3536 	table->def_trx_id = trx->id;
3537 
3538 	for (index = dict_table_get_first_index(table);
3539 	     index != 0;
3540 	     index = dict_table_get_next_index(index)) {
3541 
3542 		pars_info_t*	info;
3543 		ib_uint32_t	page;
3544 		ib_uint32_t	space;
3545 		ib_uint32_t	type;
3546 		index_id_t	index_id;
3547 		table_id_t	table_id;
3548 
3549 		info = (graph != 0) ? graph->info : pars_info_create();
3550 
3551 		mach_write_to_4(
3552 			reinterpret_cast<byte*>(&type),
3553 			index->type);
3554 
3555 		mach_write_to_4(
3556 			reinterpret_cast<byte*>(&page),
3557 			reset ? FIL_NULL : index->page);
3558 
3559 		mach_write_to_4(
3560 			reinterpret_cast<byte*>(&space),
3561 			reset ? FIL_NULL : index->table->space_id);
3562 
3563 		mach_write_to_8(
3564 			reinterpret_cast<byte*>(&index_id),
3565 			index->id);
3566 
3567 		mach_write_to_8(
3568 			reinterpret_cast<byte*>(&table_id),
3569 			table->id);
3570 
3571 		/* If we set the corrupt bit during the IMPORT phase then
3572 		we need to update the system tables. */
3573 		pars_info_bind_int4_literal(info, "type", &type);
3574 		pars_info_bind_int4_literal(info, "space", &space);
3575 		pars_info_bind_int4_literal(info, "page", &page);
3576 		pars_info_bind_ull_literal(info, "index_id", &index_id);
3577 		pars_info_bind_ull_literal(info, "table_id", &table_id);
3578 
3579 		if (graph == 0) {
3580 			graph = pars_sql(info, sql);
3581 			ut_a(graph);
3582 			graph->trx = trx;
3583 		}
3584 
3585 		que_thr_t*	thr;
3586 
3587 		graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3588 
3589 		ut_a(thr = que_fork_start_command(graph));
3590 
3591 		que_run_threads(thr);
3592 
3593 		DBUG_EXECUTE_IF("ib_import_internal_error",
3594 				trx->error_state = DB_ERROR;);
3595 
3596 		err = trx->error_state;
3597 
3598 		if (err != DB_SUCCESS) {
3599 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3600 				ER_INTERNAL_ERROR,
3601 				"While updating the <space, root page"
3602 				" number> of index %s - %s",
3603 				index->name(), ut_strerr(err));
3604 
3605 			break;
3606 		}
3607 	}
3608 
3609 	que_graph_free(graph);
3610 
3611 	return(err);
3612 }
3613 
3614 /** Callback arg for row_import_set_discarded. */
3615 struct discard_t {
3616 	ib_uint32_t	flags2;			/*!< Value read from column */
3617 	bool		state;			/*!< New state of the flag */
3618 	ulint		n_recs;			/*!< Number of recs processed */
3619 };
3620 
3621 /******************************************************************//**
3622 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3623 SYS_TABLES. The flags is stored in MIX_LEN column.
3624 @return FALSE if all OK */
3625 static
3626 ibool
row_import_set_discarded(void * row,void * user_arg)3627 row_import_set_discarded(
3628 /*=====================*/
3629 	void*		row,			/*!< in: sel_node_t* */
3630 	void*		user_arg)		/*!< in: bool set/unset flag */
3631 {
3632 	sel_node_t*	node = static_cast<sel_node_t*>(row);
3633 	discard_t*	discard = static_cast<discard_t*>(user_arg);
3634 	dfield_t*	dfield = que_node_get_val(node->select_list);
3635 	dtype_t*	type = dfield_get_type(dfield);
3636 	ulint		len = dfield_get_len(dfield);
3637 
3638 	ut_a(dtype_get_mtype(type) == DATA_INT);
3639 	ut_a(len == sizeof(ib_uint32_t));
3640 
3641 	ulint	flags2 = mach_read_from_4(
3642 		static_cast<byte*>(dfield_get_data(dfield)));
3643 
3644 	if (discard->state) {
3645 		flags2 |= DICT_TF2_DISCARDED;
3646 	} else {
3647 		flags2 &= ~DICT_TF2_DISCARDED;
3648 	}
3649 
3650 	mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3651 
3652 	++discard->n_recs;
3653 
3654 	/* There should be at most one matching record. */
3655 	ut_a(discard->n_recs == 1);
3656 
3657 	return(FALSE);
3658 }
3659 
3660 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3661 @param[in,out]	trx		dictionary transaction
3662 @param[in]	table_id	table identifier
3663 @param[in]	discarded	whether to set or clear the flag
3664 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3665 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3666 					 bool discarded)
3667 {
3668 	pars_info_t*		info;
3669 	discard_t		discard;
3670 
3671 	static const char	sql[] =
3672 		"PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3673 		"DECLARE FUNCTION my_func;\n"
3674 		"DECLARE CURSOR c IS\n"
3675 		" SELECT MIX_LEN"
3676 		" FROM SYS_TABLES"
3677 		" WHERE ID = :table_id FOR UPDATE;"
3678 		"\n"
3679 		"BEGIN\n"
3680 		"OPEN c;\n"
3681 		"WHILE 1 = 1 LOOP\n"
3682 		"  FETCH c INTO my_func();\n"
3683 		"  IF c % NOTFOUND THEN\n"
3684 		"    EXIT;\n"
3685 		"  END IF;\n"
3686 		"END LOOP;\n"
3687 		"UPDATE SYS_TABLES"
3688 		" SET MIX_LEN = :flags2"
3689 		" WHERE ID = :table_id;\n"
3690 		"CLOSE c;\n"
3691 		"END;\n";
3692 
3693 	discard.n_recs = 0;
3694 	discard.state = discarded;
3695 	discard.flags2 = ULINT32_UNDEFINED;
3696 
3697 	info = pars_info_create();
3698 
3699 	pars_info_add_ull_literal(info, "table_id", table_id);
3700 	pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3701 
3702 	pars_info_bind_function(
3703 		info, "my_func", row_import_set_discarded, &discard);
3704 
3705 	dberr_t	err = que_eval_sql(info, sql, false, trx);
3706 
3707 	ut_a(discard.n_recs == 1);
3708 	ut_a(discard.flags2 != ULINT32_UNDEFINED);
3709 
3710 	return(err);
3711 }
3712 
3713 /** InnoDB writes page by page when there is page compressed
3714 tablespace involved. It does help to save the disk space when
3715 punch hole is enabled
3716 @param iter     Tablespace iterator
3717 @param full_crc32    whether the file is in the full_crc32 format
3718 @param write_request Request to write into the file
3719 @param offset   offset of the file to be written
3720 @param writeptr buffer to be written
3721 @param n_bytes  number of bytes to be written
3722 @param try_punch_only   Try the range punch only because the
3723                         current range is full of empty pages
3724 @return DB_SUCCESS */
3725 static
fil_import_compress_fwrite(const fil_iterator_t & iter,bool full_crc32,const IORequest & write_request,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3726 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3727                                    bool full_crc32,
3728                                    const IORequest &write_request,
3729                                    os_offset_t offset,
3730                                    const byte *writeptr,
3731                                    ulint n_bytes,
3732                                    bool try_punch_only= false)
3733 {
3734   if (dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes))
3735     return err;
3736 
3737   if (try_punch_only)
3738     return DB_SUCCESS;
3739 
3740   for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3741   {
3742     /* Read the original data length from block and
3743     safer to read FIL_PAGE_COMPRESSED_SIZE because it
3744     is not encrypted*/
3745     ulint n_write_bytes= srv_page_size;
3746     if (j || offset)
3747     {
3748       n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3749       const unsigned ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3750       /* Ignore the empty page */
3751       if (ptype == 0 && n_write_bytes == 0)
3752         continue;
3753       if (full_crc32)
3754         n_write_bytes= buf_page_full_crc32_size(writeptr + j,
3755                                                 nullptr, nullptr);
3756       else
3757       {
3758         n_write_bytes+= ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
3759           ? FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN
3760           : FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
3761       }
3762     }
3763 
3764     if (dberr_t err= os_file_write(write_request, iter.filepath, iter.file,
3765                                    writeptr + j, offset + j, n_write_bytes))
3766       return err;
3767   }
3768 
3769   return DB_SUCCESS;
3770 }
3771 
run(const fil_iterator_t & iter,buf_block_t * block)3772 dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
3773                                  buf_block_t* block) UNIV_NOTHROW
3774 {
3775   const unsigned zip_size= fil_space_t::zip_size(m_space_flags);
3776   const unsigned size= zip_size ? zip_size : unsigned(srv_page_size);
3777   byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3778   const bool full_crc32 = fil_space_t::full_crc32(m_space_flags);
3779   bool skip_checksum_check = false;
3780   ut_ad(!srv_read_only_mode);
3781 
3782   if (!page_compress_buf)
3783     return DB_OUT_OF_MEMORY;
3784 
3785   const bool encrypted= iter.crypt_data != NULL &&
3786     iter.crypt_data->should_encrypt();
3787   byte* const readptr= iter.io_buffer;
3788   block->frame= readptr;
3789 
3790   if (block->page.zip.data)
3791     block->page.zip.data= readptr;
3792 
3793   IORequest read_request(IORequest::READ);
3794   read_request.disable_partial_io_warnings();
3795   ulint page_no= 0;
3796   bool page_compressed= false;
3797 
3798   dberr_t err= os_file_read_no_error_handling(
3799     read_request, iter.file, readptr, 3 * size, size, 0);
3800   if (err != DB_SUCCESS)
3801   {
3802     ib::error() << iter.filepath << ": os_file_read() failed";
3803     goto func_exit;
3804   }
3805 
3806   block->page.id.set_page_no(3);
3807   page_no= page_get_page_no(readptr);
3808 
3809   if (page_no != 3)
3810   {
3811 page_corrupted:
3812     ib::warn() << filename() << ": Page 3 at offset "
3813                << 3 * size << " looks corrupted.";
3814     err= DB_CORRUPTION;
3815     goto func_exit;
3816   }
3817 
3818   page_compressed=
3819     (full_crc32 && fil_space_t::is_compressed(m_space_flags) &&
3820      buf_page_is_compressed(readptr, m_space_flags)) ||
3821     (fil_page_is_compressed_encrypted(readptr) ||
3822      fil_page_is_compressed(readptr));
3823 
3824   if (page_compressed && block->page.zip.data)
3825     goto page_corrupted;
3826 
3827   if (encrypted)
3828   {
3829     if (!buf_page_verify_crypt_checksum(readptr, m_space_flags))
3830       goto page_corrupted;
3831 
3832     if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
3833         buf_page_get_key_version(readptr, m_space_flags))
3834       goto page_corrupted;
3835 
3836     if ((err= fil_space_decrypt(get_space_id(), iter.crypt_data, readptr, size,
3837                                 m_space_flags, readptr)))
3838       goto func_exit;
3839   }
3840 
3841   /* For full_crc32 format, skip checksum check
3842   after decryption. */
3843   skip_checksum_check= full_crc32 && encrypted;
3844 
3845   if (page_compressed)
3846   {
3847     ulint compress_length= fil_page_decompress(page_compress_buf,
3848                                                readptr,
3849                                                m_space_flags);
3850     ut_ad(compress_length != srv_page_size);
3851     if (compress_length == 0)
3852       goto page_corrupted;
3853   }
3854   else if (!skip_checksum_check
3855            && buf_page_is_corrupted(false, readptr, m_space_flags))
3856     goto page_corrupted;
3857 
3858   err= this->operator()(block);
3859 func_exit:
3860   free(page_compress_buf);
3861   return err;
3862 }
3863 
fil_iterate(const fil_iterator_t & iter,buf_block_t * block,AbstractCallback & callback)3864 static dberr_t fil_iterate(
3865 	const fil_iterator_t&	iter,
3866 	buf_block_t*		block,
3867 	AbstractCallback&	callback)
3868 {
3869 	os_offset_t		offset;
3870 	const ulint		size = callback.physical_size();
3871 	ulint			n_bytes = iter.n_io_buffers * size;
3872 
3873 	byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3874 	ut_ad(!srv_read_only_mode);
3875 
3876 	if (!page_compress_buf) {
3877 		return DB_OUT_OF_MEMORY;
3878 	}
3879 
3880 	ulint actual_space_id = 0;
3881 	const bool full_crc32 = fil_space_t::full_crc32(
3882 		callback.get_space_flags());
3883 
3884 	/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
3885 	copying for non-index pages. Unfortunately, it is
3886 	required by buf_zip_decompress() */
3887 	dberr_t		err = DB_SUCCESS;
3888 	bool		page_compressed = false;
3889 	bool		punch_hole = true;
3890 	const IORequest	write_request(IORequest::WRITE);
3891 
3892 	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
3893 		if (callback.is_interrupted()) {
3894 			err = DB_INTERRUPTED;
3895 			goto func_exit;
3896 		}
3897 
3898 		byte*		io_buffer = iter.io_buffer;
3899 		block->frame = io_buffer;
3900 
3901 		if (block->page.zip.data) {
3902 			/* Zip IO is done in the compressed page buffer. */
3903 			io_buffer = block->page.zip.data;
3904 		}
3905 
3906 		/* We have to read the exact number of bytes. Otherwise the
3907 		InnoDB IO functions croak on failed reads. */
3908 
3909 		n_bytes = ulint(ut_min(os_offset_t(n_bytes),
3910 				       iter.end - offset));
3911 
3912 		ut_ad(n_bytes > 0);
3913 		ut_ad(!(n_bytes % size));
3914 
3915 		const bool encrypted = iter.crypt_data != NULL
3916 			&& iter.crypt_data->should_encrypt();
3917 		/* Use additional crypt io buffer if tablespace is encrypted */
3918 		byte* const readptr = encrypted
3919 			? iter.crypt_io_buffer : io_buffer;
3920 		byte* const writeptr = readptr;
3921 
3922 		IORequest	read_request(IORequest::READ);
3923 		read_request.disable_partial_io_warnings();
3924 
3925 		err = os_file_read_no_error_handling(
3926 			read_request, iter.file, readptr, offset, n_bytes, 0);
3927 		if (err != DB_SUCCESS) {
3928 			ib::error() << iter.filepath
3929 				    << ": os_file_read() failed";
3930 			goto func_exit;
3931 		}
3932 
3933 		bool		updated = false;
3934 		os_offset_t	page_off = offset;
3935 		ulint		n_pages_read = n_bytes / size;
3936 		block->page.id.set_page_no(ulint(page_off / size));
3937 
3938 		for (ulint i = 0; i < n_pages_read;
3939 		     block->page.id.set_page_no(block->page.id.page_no() + 1),
3940 		     ++i, page_off += size, block->frame += size) {
3941 			byte*	src = readptr + i * size;
3942 			const ulint page_no = page_get_page_no(src);
3943 			if (!page_no && block->page.id.page_no()) {
3944 				if (!buf_is_zeroes(span<const byte>(src,
3945 								    size))) {
3946 					goto page_corrupted;
3947 				}
3948 				/* Proceed to the next page,
3949 				because this one is all zero. */
3950 				continue;
3951 			}
3952 
3953 			if (page_no != block->page.id.page_no()) {
3954 page_corrupted:
3955 				ib::warn() << callback.filename()
3956 					   << ": Page " << (offset / size)
3957 					   << " at offset " << offset
3958 					   << " looks corrupted.";
3959 				err = DB_CORRUPTION;
3960 				goto func_exit;
3961 			}
3962 
3963 			if (block->page.id.page_no() == 0) {
3964 				actual_space_id = mach_read_from_4(
3965 					src + FIL_PAGE_SPACE_ID);
3966 			}
3967 
3968 			page_compressed =
3969 				(full_crc32
3970 				 && fil_space_t::is_compressed(
3971 					callback.get_space_flags())
3972 				 && buf_page_is_compressed(
3973 					src, callback.get_space_flags()))
3974 				|| (fil_page_is_compressed_encrypted(src)
3975 				    || fil_page_is_compressed(src));
3976 
3977 			if (page_compressed && block->page.zip.data) {
3978 				goto page_corrupted;
3979 			}
3980 
3981 			bool decrypted = false;
3982 			byte* dst = io_buffer + i * size;
3983 			bool frame_changed = false;
3984 			uint key_version = buf_page_get_key_version(
3985 				src, callback.get_space_flags());
3986 
3987 			if (!encrypted) {
3988 			} else if (!key_version) {
3989 				if (block->page.id.page_no() == 0
3990 				    && block->page.zip.data) {
3991 					block->page.zip.data = src;
3992 					frame_changed = true;
3993 				} else if (!page_compressed
3994 					   && !block->page.zip.data) {
3995 					block->frame = src;
3996 					frame_changed = true;
3997 				} else {
3998 					ut_ad(dst != src);
3999 					memcpy(dst, src, size);
4000 				}
4001 			} else {
4002 				if (!buf_page_verify_crypt_checksum(
4003 					src, callback.get_space_flags())) {
4004 					goto page_corrupted;
4005 				}
4006 
4007 				if ((err = fil_space_decrypt(
4008 					actual_space_id,
4009 					iter.crypt_data, dst,
4010 					callback.physical_size(),
4011 					callback.get_space_flags(),
4012 					src))) {
4013 					goto func_exit;
4014 				}
4015 
4016 				decrypted = true;
4017 				updated = true;
4018 			}
4019 
4020 			/* For full_crc32 format, skip checksum check
4021 			after decryption. */
4022 			bool skip_checksum_check = full_crc32 && encrypted;
4023 
4024 			/* If the original page is page_compressed, we need
4025 			to decompress it before adjusting further. */
4026 			if (page_compressed) {
4027 				ulint compress_length = fil_page_decompress(
4028 					page_compress_buf, dst,
4029 					callback.get_space_flags());
4030 				ut_ad(compress_length != srv_page_size);
4031 				if (compress_length == 0) {
4032 					goto page_corrupted;
4033 				}
4034 				updated = true;
4035 			} else if (!skip_checksum_check
4036 				   && buf_page_is_corrupted(
4037 					   false,
4038 					   encrypted && !frame_changed
4039 					   ? dst : src,
4040 					   callback.get_space_flags())) {
4041 				goto page_corrupted;
4042 			}
4043 
4044 			if ((err = callback(block)) != DB_SUCCESS) {
4045 				goto func_exit;
4046 			} else if (!updated) {
4047 				updated = buf_block_get_state(block)
4048 					== BUF_BLOCK_FILE_PAGE;
4049 			}
4050 
4051 			/* If tablespace is encrypted we use additional
4052 			temporary scratch area where pages are read
4053 			for decrypting readptr == crypt_io_buffer != io_buffer.
4054 
4055 			Destination for decryption is a buffer pool block
4056 			block->frame == dst == io_buffer that is updated.
4057 			Pages that did not require decryption even when
4058 			tablespace is marked as encrypted are not copied
4059 			instead block->frame is set to src == readptr.
4060 
4061 			For encryption we again use temporary scratch area
4062 			writeptr != io_buffer == dst
4063 			that is then written to the tablespace
4064 
4065 			(1) For normal tables io_buffer == dst == writeptr
4066 			(2) For only page compressed tables
4067 			io_buffer == dst == writeptr
4068 			(3) For encrypted (and page compressed)
4069 			readptr != io_buffer == dst != writeptr
4070 			*/
4071 
4072 			ut_ad(!encrypted && !page_compressed ?
4073 			      src == dst && dst == writeptr + (i * size):1);
4074 			ut_ad(page_compressed && !encrypted ?
4075 			      src == dst && dst == writeptr + (i * size):1);
4076 			ut_ad(encrypted ?
4077 			      src != dst && dst != writeptr + (i * size):1);
4078 
4079 			/* When tablespace is encrypted or compressed its
4080 			first page (i.e. page 0) is not encrypted or
4081 			compressed and there is no need to copy frame. */
4082 			if (encrypted && block->page.id.page_no() != 0) {
4083 				byte *local_frame = callback.get_frame(block);
4084 				ut_ad((writeptr + (i * size)) != local_frame);
4085 				memcpy((writeptr + (i * size)), local_frame, size);
4086 			}
4087 
4088 			if (frame_changed) {
4089 				if (block->page.zip.data) {
4090 					block->page.zip.data = dst;
4091 				} else {
4092 					block->frame = dst;
4093 				}
4094 			}
4095 
4096 			src =  io_buffer + (i * size);
4097 
4098 			if (page_compressed) {
4099 				updated = true;
4100 				if (ulint len = fil_page_compress(
4101 					    src,
4102 					    page_compress_buf,
4103 					    callback.get_space_flags(),
4104 					    512,/* FIXME: proper block size */
4105 					    encrypted)) {
4106 					/* FIXME: remove memcpy() */
4107 					memcpy(src, page_compress_buf, len);
4108 					memset(src + len, 0,
4109 					       srv_page_size - len);
4110 				}
4111 			}
4112 
4113 			/* Encrypt the page if encryption was used. */
4114 			if (encrypted && decrypted) {
4115 				byte *dest = writeptr + i * size;
4116 
4117 				byte* tmp = fil_encrypt_buf(
4118 					iter.crypt_data,
4119 					block->page.id.space(),
4120 					block->page.id.page_no(),
4121 					mach_read_from_8(src + FIL_PAGE_LSN),
4122 					src, block->zip_size(), dest,
4123 					full_crc32);
4124 
4125 				if (tmp == src) {
4126 					/* TODO: remove unnecessary memcpy's */
4127 					ut_ad(dest != src);
4128 					memcpy(dest, src, size);
4129 				}
4130 
4131 				updated = true;
4132 			}
4133 
4134 			/* Write checksum for the compressed full crc32 page.*/
4135 			if (full_crc32 && page_compressed) {
4136 				ut_ad(updated);
4137 				byte* dest = writeptr + i * size;
4138 				ut_d(bool comp = false);
4139 				ut_d(bool corrupt = false);
4140 				ulint size = buf_page_full_crc32_size(
4141 					dest,
4142 #ifdef UNIV_DEBUG
4143 					&comp, &corrupt
4144 #else
4145 					NULL, NULL
4146 #endif
4147 				);
4148 				ut_ad(!comp == (size == srv_page_size));
4149 				ut_ad(!corrupt);
4150 				mach_write_to_4(dest + (size - 4),
4151 						ut_crc32(dest, size - 4));
4152 			}
4153 		}
4154 
4155 		if (page_compressed && punch_hole) {
4156 			err = fil_import_compress_fwrite(
4157 				iter, full_crc32, write_request, offset,
4158 				writeptr, n_bytes, !updated);
4159 
4160 			if (err != DB_SUCCESS) {
4161 				punch_hole = false;
4162 				if (updated) {
4163 					goto normal_write;
4164 				}
4165 			}
4166 		} else if (updated) {
4167 			/* A page was updated in the set, write back to disk. */
4168 normal_write:
4169 			err = os_file_write(
4170 				write_request, iter.filepath, iter.file,
4171 				writeptr, offset, n_bytes);
4172 
4173 			if (err != DB_SUCCESS) {
4174 				goto func_exit;
4175 			}
4176 		}
4177 	}
4178 
4179 func_exit:
4180 	free(page_compress_buf);
4181 	return err;
4182 }
4183 
4184 /********************************************************************//**
4185 Iterate over all the pages in the tablespace.
4186 @param table - the table definiton in the server
4187 @param n_io_buffers - number of blocks to read and write together
4188 @param callback - functor that will do the page updates
4189 @return	DB_SUCCESS or error code */
4190 static
4191 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)4192 fil_tablespace_iterate(
4193 /*===================*/
4194 	dict_table_t*		table,
4195 	ulint			n_io_buffers,
4196 	AbstractCallback&	callback)
4197 {
4198 	dberr_t		err;
4199 	pfs_os_file_t	file;
4200 	char*		filepath;
4201 
4202 	ut_a(n_io_buffers > 0);
4203 	ut_ad(!srv_read_only_mode);
4204 
4205 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
4206 			return(DB_CORRUPTION););
4207 
4208 	/* Make sure the data_dir_path is set. */
4209 	dict_get_and_save_data_dir_path(table, false);
4210 
4211 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4212 		ut_a(table->data_dir_path);
4213 
4214 		filepath = fil_make_filepath(
4215 			table->data_dir_path, table->name.m_name, IBD, true);
4216 	} else {
4217 		filepath = fil_make_filepath(
4218 			NULL, table->name.m_name, IBD, false);
4219 	}
4220 
4221 	if (!filepath) {
4222 		return(DB_OUT_OF_MEMORY);
4223 	} else {
4224 		bool	success;
4225 
4226 		file = os_file_create_simple_no_error_handling(
4227 			innodb_data_file_key, filepath,
4228 			OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
4229 
4230 		if (!success) {
4231 			/* The following call prints an error message */
4232 			os_file_get_last_error(true);
4233 			ib::error() << "Trying to import a tablespace,"
4234 				" but could not open the tablespace file "
4235 				    << filepath;
4236 			ut_free(filepath);
4237 			return DB_TABLESPACE_NOT_FOUND;
4238 		} else {
4239 			err = DB_SUCCESS;
4240 		}
4241 	}
4242 
4243 	callback.set_file(filepath, file);
4244 
4245 	os_offset_t	file_size = os_file_get_size(file);
4246 	ut_a(file_size != (os_offset_t) -1);
4247 
4248 	/* Allocate a page to read in the tablespace header, so that we
4249 	can determine the page size and zip_size (if it is compressed).
4250 	We allocate an extra page in case it is a compressed table. One
4251 	page is to ensure alignement. */
4252 
4253 	void*	page_ptr = ut_malloc_nokey(3U << srv_page_size_shift);
4254 	byte*	page = static_cast<byte*>(ut_align(page_ptr, srv_page_size));
4255 
4256 	buf_block_t* block = reinterpret_cast<buf_block_t*>
4257 		(ut_zalloc_nokey(sizeof *block));
4258 	block->frame = page;
4259 	block->page.id = page_id_t(0, 0);
4260 	block->page.io_fix = BUF_IO_NONE;
4261 	block->page.buf_fix_count = 1;
4262 	block->page.state = BUF_BLOCK_FILE_PAGE;
4263 
4264 	/* Read the first page and determine the page and zip size. */
4265 
4266 	IORequest       request(IORequest::READ);
4267 	request.disable_partial_io_warnings();
4268 
4269 	err = os_file_read_no_error_handling(request, file, page, 0,
4270 					     srv_page_size, 0);
4271 
4272 	if (err == DB_SUCCESS) {
4273 		err = callback.init(file_size, block);
4274 	}
4275 
4276 	if (err == DB_SUCCESS) {
4277 		block->page.id = page_id_t(callback.get_space_id(), 0);
4278 		if (ulint zip_size = callback.get_zip_size()) {
4279 			page_zip_set_size(&block->page.zip, zip_size);
4280 			/* ROW_FORMAT=COMPRESSED is not optimised for block IO
4281 			for now. We do the IMPORT page by page. */
4282 			n_io_buffers = 1;
4283 		}
4284 
4285 		fil_iterator_t	iter;
4286 
4287 		/* read (optional) crypt data */
4288 		iter.crypt_data = fil_space_read_crypt_data(
4289 			callback.get_zip_size(), page);
4290 
4291 		/* If tablespace is encrypted, it needs extra buffers */
4292 		if (iter.crypt_data && n_io_buffers > 1) {
4293 			/* decrease io buffers so that memory
4294 			consumption will not double */
4295 			n_io_buffers /= 2;
4296 		}
4297 
4298 		iter.file = file;
4299 		iter.start = 0;
4300 		iter.end = file_size;
4301 		iter.filepath = filepath;
4302 		iter.file_size = file_size;
4303 		iter.n_io_buffers = n_io_buffers;
4304 
4305 		/* Add an extra page for compressed page scratch area. */
4306 		void*	io_buffer = ut_malloc_nokey(
4307 			(2 + iter.n_io_buffers) << srv_page_size_shift);
4308 
4309 		iter.io_buffer = static_cast<byte*>(
4310 			ut_align(io_buffer, srv_page_size));
4311 
4312 		void* crypt_io_buffer = NULL;
4313 		if (iter.crypt_data) {
4314 			crypt_io_buffer = ut_malloc_nokey(
4315 				(2 + iter.n_io_buffers)
4316 				<< srv_page_size_shift);
4317 			iter.crypt_io_buffer = static_cast<byte*>(
4318 				ut_align(crypt_io_buffer, srv_page_size));
4319 		}
4320 
4321 		if (block->page.zip.ssize) {
4322 			ut_ad(iter.n_io_buffers == 1);
4323 			block->frame = iter.io_buffer;
4324 			block->page.zip.data = block->frame + srv_page_size;
4325 		}
4326 
4327 		err = callback.run(iter, block);
4328 
4329 		if (iter.crypt_data) {
4330 			fil_space_destroy_crypt_data(&iter.crypt_data);
4331 		}
4332 
4333 		ut_free(crypt_io_buffer);
4334 		ut_free(io_buffer);
4335 	}
4336 
4337 	if (err == DB_SUCCESS) {
4338 		ib::info() << "Sync to disk";
4339 
4340 		if (!os_file_flush(file)) {
4341 			ib::info() << "os_file_flush() failed!";
4342 			err = DB_IO_ERROR;
4343 		} else {
4344 			ib::info() << "Sync to disk - done!";
4345 		}
4346 	}
4347 
4348 	os_file_close(file);
4349 
4350 	ut_free(page_ptr);
4351 	ut_free(filepath);
4352 	ut_free(block);
4353 
4354 	return(err);
4355 }
4356 
4357 /*****************************************************************//**
4358 Imports a tablespace. The space id in the .ibd file must match the space id
4359 of the table in the data dictionary.
4360 @return error code or DB_SUCCESS */
4361 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)4362 row_import_for_mysql(
4363 /*=================*/
4364 	dict_table_t*	table,		/*!< in/out: table */
4365 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
4366 {
4367 	dberr_t		err;
4368 	trx_t*		trx;
4369 	ib_uint64_t	autoinc = 0;
4370 	char*		filepath = NULL;
4371 	ulint		space_flags MY_ATTRIBUTE((unused));
4372 
4373 	/* The caller assured that this is not read_only_mode and that no
4374 	temorary tablespace is being imported. */
4375 	ut_ad(!srv_read_only_mode);
4376 	ut_ad(!table->is_temporary());
4377 
4378 	ut_ad(table->space_id);
4379 	ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
4380 	ut_ad(prebuilt->trx);
4381 	ut_ad(!table->is_readable());
4382 
4383 	ibuf_delete_for_discarded_space(table->space_id);
4384 
4385 	trx_start_if_not_started(prebuilt->trx, true);
4386 
4387 	trx = trx_create();
4388 
4389 	/* So that the table is not DROPped during recovery. */
4390 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4391 
4392 	trx_start_if_not_started(trx, true);
4393 
4394 	/* So that we can send error messages to the user. */
4395 	trx->mysql_thd = prebuilt->trx->mysql_thd;
4396 
4397 	/* Ensure that the table will be dropped by trx_rollback_active()
4398 	in case of a crash. */
4399 
4400 	trx->table_id = table->id;
4401 
4402 	/* Assign an undo segment for the transaction, so that the
4403 	transaction will be recovered after a crash. */
4404 
4405 	/* TODO: Do not write any undo log for the IMPORT cleanup. */
4406 	{
4407 		mtr_t mtr;
4408 		mtr.start();
4409 		trx_undo_assign(trx, &err, &mtr);
4410 		mtr.commit();
4411 	}
4412 
4413 	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
4414 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4415 
4416 	if (err != DB_SUCCESS) {
4417 
4418 		return(row_import_cleanup(prebuilt, trx, err));
4419 
4420 	} else if (trx->rsegs.m_redo.undo == 0) {
4421 
4422 		err = DB_TOO_MANY_CONCURRENT_TRXS;
4423 		return(row_import_cleanup(prebuilt, trx, err));
4424 	}
4425 
4426 	prebuilt->trx->op_info = "read meta-data file";
4427 
4428 	/* Prevent DDL operations while we are checking. */
4429 
4430 	rw_lock_s_lock(&dict_sys.latch);
4431 
4432 	row_import	cfg;
4433 
4434 	err = row_import_read_cfg(table, trx->mysql_thd, cfg);
4435 
4436 	/* Check if the table column definitions match the contents
4437 	of the config file. */
4438 
4439 	if (err == DB_SUCCESS) {
4440 
4441 		if (dberr_t err = handle_instant_metadata(table, cfg)) {
4442 			rw_lock_s_unlock(&dict_sys.latch);
4443 			return row_import_error(prebuilt, trx, err);
4444 		}
4445 
4446 		/* We have a schema file, try and match it with our
4447 		data dictionary. */
4448 
4449 		err = cfg.match_schema(trx->mysql_thd);
4450 
4451 		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
4452 		B-tree root page numbers in the tablespace. Use the index
4453 		name from the .cfg file to find match. */
4454 
4455 		if (err == DB_SUCCESS) {
4456 			cfg.set_root_by_name();
4457 			autoinc = cfg.m_autoinc;
4458 		}
4459 
4460 		rw_lock_s_unlock(&dict_sys.latch);
4461 
4462 		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
4463 				err = DB_TOO_MANY_CONCURRENT_TRXS;);
4464 
4465 	} else if (cfg.m_missing) {
4466 
4467 		rw_lock_s_unlock(&dict_sys.latch);
4468 
4469 		/* We don't have a schema file, we will have to discover
4470 		the index root pages from the .ibd file and skip the schema
4471 		matching step. */
4472 
4473 		ut_a(err == DB_FAIL);
4474 
4475 		cfg.m_zip_size = 0;
4476 
4477 		if (UT_LIST_GET_LEN(table->indexes) > 1) {
4478 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4479 				ER_INTERNAL_ERROR,
4480 				"Drop all secondary indexes before importing "
4481 				"table %s when .cfg file is missing.",
4482 				table->name.m_name);
4483 			err = DB_ERROR;
4484 			return row_import_error(prebuilt, trx, err);
4485 		}
4486 
4487 		FetchIndexRootPages	fetchIndexRootPages(table, trx);
4488 
4489 		err = fil_tablespace_iterate(
4490 			table, IO_BUFFER_SIZE(srv_page_size),
4491 			fetchIndexRootPages);
4492 
4493 		if (err == DB_SUCCESS) {
4494 
4495 			err = fetchIndexRootPages.build_row_import(&cfg);
4496 
4497 			/* Update index->page and SYS_INDEXES.PAGE_NO
4498 			to match the B-tree root page numbers in the
4499 			tablespace. */
4500 
4501 			if (err == DB_SUCCESS) {
4502 				err = cfg.set_root_by_heuristic();
4503 
4504 				if (err == DB_SUCCESS) {
4505 					if (dberr_t err =
4506 					    handle_instant_metadata(table,
4507 								    cfg)) {
4508 						return row_import_error(
4509 							prebuilt, trx, err);
4510 					}
4511 				}
4512 			}
4513 		}
4514 
4515 		space_flags = fetchIndexRootPages.get_space_flags();
4516 
4517 	} else {
4518 		rw_lock_s_unlock(&dict_sys.latch);
4519 	}
4520 
4521 	if (err != DB_SUCCESS) {
4522 		return(row_import_error(prebuilt, trx, err));
4523 	}
4524 
4525 	prebuilt->trx->op_info = "importing tablespace";
4526 
4527 	ib::info() << "Phase I - Update all pages";
4528 
4529 	/* Iterate over all the pages and do the sanity checking and
4530 	the conversion required to import the tablespace. */
4531 
4532 	PageConverter	converter(&cfg, table->space_id, trx);
4533 
4534 	/* Set the IO buffer size in pages. */
4535 
4536 	err = fil_tablespace_iterate(
4537 		table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size
4538 				      : srv_page_size), converter);
4539 
4540 	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
4541 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4542 #ifdef BTR_CUR_HASH_ADAPT
4543 	/* On DISCARD TABLESPACE, we did not drop any adaptive hash
4544 	index entries. If we replaced the discarded tablespace with a
4545 	smaller one here, there could still be some adaptive hash
4546 	index entries that point to cached garbage pages in the buffer
4547 	pool, because PageConverter::operator() only evicted those
4548 	pages that were replaced by the imported pages. We must
4549 	detach any remaining adaptive hash index entries, because the
4550 	adaptive hash index must be a subset of the table contents;
4551 	false positives are not tolerated. */
4552 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
4553 	     index = UT_LIST_GET_NEXT(indexes, index)) {
4554 		index = index->clone_if_needed();
4555 	}
4556 #endif /* BTR_CUR_HASH_ADAPT */
4557 
4558 	if (err != DB_SUCCESS) {
4559 		char	table_name[MAX_FULL_NAME_LEN + 1];
4560 
4561 		innobase_format_name(
4562 			table_name, sizeof(table_name),
4563 			table->name.m_name);
4564 
4565 		if (err != DB_DECRYPTION_FAILED) {
4566 
4567 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4568 				ER_INTERNAL_ERROR,
4569 			"Cannot reset LSNs in table %s : %s",
4570 				table_name, ut_strerr(err));
4571 		}
4572 
4573 		return(row_import_cleanup(prebuilt, trx, err));
4574 	}
4575 
4576 	row_mysql_lock_data_dictionary(trx);
4577 
4578 	/* If the table is stored in a remote tablespace, we need to
4579 	determine that filepath from the link file and system tables.
4580 	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
4581 	dict_get_and_save_data_dir_path(table, true);
4582 
4583 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4584 		ut_a(table->data_dir_path);
4585 
4586 		filepath = fil_make_filepath(
4587 			table->data_dir_path, table->name.m_name, IBD, true);
4588 	} else {
4589 		filepath = fil_make_filepath(
4590 			NULL, table->name.m_name, IBD, false);
4591 	}
4592 
4593 	DBUG_EXECUTE_IF(
4594 		"ib_import_OOM_15",
4595 		ut_free(filepath);
4596 		filepath = NULL;
4597 	);
4598 
4599 	if (filepath == NULL) {
4600 		row_mysql_unlock_data_dictionary(trx);
4601 		return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
4602 	}
4603 
4604 	/* Open the tablespace so that we can access via the buffer pool.
4605 	We set the 2nd param (fix_dict = true) here because we already
4606 	have an x-lock on dict_sys.latch and dict_sys.mutex.
4607 	The tablespace is initially opened as a temporary one, because
4608 	we will not be writing any redo log for it before we have invoked
4609 	fil_space_t::set_imported() to declare it a persistent tablespace. */
4610 
4611 	ulint	fsp_flags = dict_tf_to_fsp_flags(table->flags);
4612 
4613 	table->space = fil_ibd_open(
4614 		true, true, FIL_TYPE_IMPORT, table->space_id,
4615 		fsp_flags, table->name, filepath, &err);
4616 
4617 	ut_ad((table->space == NULL) == (err != DB_SUCCESS));
4618 	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
4619 			err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);
4620 
4621 	if (!table->space) {
4622 		row_mysql_unlock_data_dictionary(trx);
4623 
4624 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4625 			ER_GET_ERRMSG,
4626 			err, ut_strerr(err), filepath);
4627 
4628 		ut_free(filepath);
4629 
4630 		return(row_import_cleanup(prebuilt, trx, err));
4631 	}
4632 
4633 	row_mysql_unlock_data_dictionary(trx);
4634 
4635 	ut_free(filepath);
4636 
4637 	err = ibuf_check_bitmap_on_import(trx, table->space);
4638 
4639 	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
4640 
4641 	if (err != DB_SUCCESS) {
4642 		return(row_import_cleanup(prebuilt, trx, err));
4643 	}
4644 
4645 	/* The first index must always be the clustered index. */
4646 
4647 	dict_index_t*	index = dict_table_get_first_index(table);
4648 
4649 	if (!dict_index_is_clust(index)) {
4650 		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
4651 	}
4652 
4653 	/* Update the Btree segment headers for index node and
4654 	leaf nodes in the root page. Set the new space id. */
4655 
4656 	err = btr_root_adjust_on_import(index);
4657 
4658 	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
4659 			err = DB_CORRUPTION;);
4660 
4661 	if (err != DB_SUCCESS) {
4662 		return(row_import_error(prebuilt, trx, err));
4663 	} else if (cfg.requires_purge(index->name)) {
4664 
4665 		/* Purge any delete-marked records that couldn't be
4666 		purged during the page conversion phase from the
4667 		cluster index. */
4668 
4669 		IndexPurge	purge(trx, index);
4670 
4671 		trx->op_info = "cluster: purging delete marked records";
4672 
4673 		err = purge.garbage_collect();
4674 
4675 		trx->op_info = "";
4676 	}
4677 
4678 	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
4679 
4680 	if (err != DB_SUCCESS) {
4681 		return(row_import_error(prebuilt, trx, err));
4682 	}
4683 
4684 	/* For secondary indexes, purge any records that couldn't be purged
4685 	during the page conversion phase. */
4686 
4687 	err = row_import_adjust_root_pages_of_secondary_indexes(
4688 		trx, table, cfg);
4689 
4690 	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
4691 			err = DB_CORRUPTION;);
4692 
4693 	if (err != DB_SUCCESS) {
4694 		return(row_import_error(prebuilt, trx, err));
4695 	}
4696 
4697 	/* Ensure that the next available DB_ROW_ID is not smaller than
4698 	any DB_ROW_ID stored in the table. */
4699 
4700 	if (prebuilt->clust_index_was_generated) {
4701 		row_import_set_sys_max_row_id(prebuilt, table);
4702 	}
4703 
4704 	ib::info() << "Phase III - Flush changes to disk";
4705 
4706 	/* Ensure that all pages dirtied during the IMPORT make it to disk.
4707 	The only dirty pages generated should be from the pessimistic purge
4708 	of delete marked records that couldn't be purged in Phase I. */
4709 
4710 	{
4711 		FlushObserver observer(prebuilt->table->space, trx, NULL);
4712 		buf_LRU_flush_or_remove_pages(prebuilt->table->space_id,
4713 					      &observer);
4714 
4715 		if (observer.is_interrupted()) {
4716 			ib::info() << "Phase III - Flush interrupted";
4717 			return(row_import_error(prebuilt, trx,
4718 						DB_INTERRUPTED));
4719 		}
4720 	}
4721 
4722 	ib::info() << "Phase IV - Flush complete";
4723 	prebuilt->table->space->set_imported();
4724 
4725 	/* The dictionary latches will be released in in row_import_cleanup()
4726 	after the transaction commit, for both success and error. */
4727 
4728 	row_mysql_lock_data_dictionary(trx);
4729 
4730 	/* Update the root pages of the table's indexes. */
4731 	err = row_import_update_index_root(trx, table, false);
4732 
4733 	if (err != DB_SUCCESS) {
4734 		return(row_import_error(prebuilt, trx, err));
4735 	}
4736 
4737 	err = row_import_update_discarded_flag(trx, table->id, false);
4738 
4739 	if (err != DB_SUCCESS) {
4740 		return(row_import_error(prebuilt, trx, err));
4741 	}
4742 
4743 	table->file_unreadable = false;
4744 	table->flags2 &= ~DICT_TF2_DISCARDED;
4745 
4746 	/* Set autoinc value read from .cfg file, if one was specified.
4747 	Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
4748 	if (autoinc) {
4749 		ib::info() << table->name << " autoinc value set to "
4750 			<< autoinc;
4751 
4752 		table->autoinc = autoinc--;
4753 		btr_write_autoinc(dict_table_get_first_index(table), autoinc);
4754 	}
4755 
4756 	return(row_import_cleanup(prebuilt, trx, err));
4757 }
4758