1 /*****************************************************************************
2 
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23 
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26 
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #include "row0row.h"
44 #ifdef HAVE_LZO
45 #include "lzo/lzo1x.h"
46 #endif
47 #ifdef HAVE_SNAPPY
48 #include "snappy-c.h"
49 #endif
50 
51 #include "scope.h"
52 
53 #include <vector>
54 
55 #ifdef HAVE_MY_AES_H
56 #include <my_aes.h>
57 #endif
58 
59 using st_::span;
60 
/** The size of the buffer to use for IO, expressed in pages: the number of
pages of the given size that fit in 1 MiB (integer division).
@param n physical page size in bytes; it is the divisor, so it must be nonzero
@return number of pages */
#define IO_BUFFER_SIZE(n)	((1024 * 1024) / (n))
65 
/** For gathering stats on records during phase I */
struct row_stats_t {
	ulint		m_n_deleted;		/*!< Number of deleted records
						found in the index */

	ulint		m_n_purged;		/*!< Number of records purged
						optimistically */

	ulint		m_n_rows;		/*!< Number of rows */

	ulint		m_n_purge_failed;	/*!< Number of deleted rows
						that could not be purged */
};
79 
/** Index information required by IMPORT. One instance describes one index;
row_import owns an array of these (row_import::m_indexes). */
struct row_index_t {
	index_id_t	m_id;			/*!< Index id of the table
						in the exporting server */
	byte*		m_name;			/*!< Index name, a
						NUL-terminated string (it is
						compared with strcmp() in
						row_import::get_index()) */

	ulint		m_space;		/*!< Space where it is placed */

	ulint		m_page_no;		/*!< Root page number */

	ulint		m_type;			/*!< Index type */

	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
						indexes, offset of transaction
						id system column */

	ulint		m_n_user_defined_cols;	/*!< User defined columns */

	ulint		m_n_uniq;		/*!< Number of columns that can
						uniquely identify the row */

	ulint		m_n_nullable;		/*!< Number of nullable
						columns */

	ulint		m_n_fields;		/*!< Total number of fields */

	dict_field_t*	m_fields;		/*!< Index fields; array of
						m_n_fields elements, or NULL */

	const dict_index_t*
			m_srv_index;		/*!< Index instance in the
						importing server */

	row_stats_t	m_stats;		/*!< Statistics gathered during
						the import phase */

};
116 
/** Meta data required by IMPORT. */
struct row_import {
	/** Construct an empty configuration: every pointer is NULL, every
	counter is zero and m_missing is true. */
	row_import() UNIV_NOTHROW
		:
		m_table(NULL),
		m_version(0),
		m_hostname(NULL),
		m_table_name(NULL),
		m_autoinc(0),
		m_zip_size(0),
		m_flags(0),
		m_n_cols(0),
		m_cols(NULL),
		m_col_names(NULL),
		m_n_indexes(0),
		m_indexes(NULL),
		m_missing(true) { }

	/** Destructor; releases the arrays owned by this object. */
	~row_import() UNIV_NOTHROW;

	/** Find the index entry in the indexes array.
	@param name index name
	@return instance if found else 0. */
	row_index_t* get_index(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows in the index.
	@param name index name
	@return number of rows (doesn't include delete marked rows). */
	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;

	/** Find the ordinal value of the column name in the cfg table columns.
	@param name of column to look for.
	@return ULINT_UNDEFINED if not found. */
	ulint find_col(const char* name) const UNIV_NOTHROW;

	/** Get the number of rows for which purge failed during the
	convert phase.
	@param name index name
	@return number of rows for which purge failed. */
	ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;

	/** Check whether the index still contains delete-marked records
	that could not be purged, i.e. whether get_n_purge_failed() is
	nonzero for it.
	@param name index name
	@return true if index needs to be purged. */
	bool requires_purge(const char* name) const UNIV_NOTHROW
	{
		return(get_n_purge_failed(name) > 0);
	}

	/** Set the index root <space, pageno> using the index name */
	void set_root_by_name() UNIV_NOTHROW;

	/** Set the index root <space, pageno> using a heuristic
	@return DB_SUCCESS or error code */
	dberr_t set_root_by_heuristic() UNIV_NOTHROW;

	/** Check if the index schema that was read from the .cfg file
	matches the in memory index definition.
	Note: It will update row_index_t::m_srv_index to map the meta-data
	read from the .cfg file to the server index instance.
	@param thd MySQL session variable
	@param index index definition in the importing server
	@return DB_SUCCESS or error code. */
	dberr_t match_index_columns(
		THD*			thd,
		const dict_index_t*	index) UNIV_NOTHROW;

	/** Check if the table schema that was read from the .cfg file
	matches the in memory table definition.
	@param thd MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_table_columns(
		THD*			thd) UNIV_NOTHROW;

	/** Check if the table (and index) schema that was read from the
	.cfg file matches the in memory table definition.
	@param thd MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_schema(
		THD*			thd) UNIV_NOTHROW;

	/** NOTE(review): presumably checks the table flags read from the
	.cfg file (m_flags) against the in memory table definition; the
	definition is not visible here - confirm.
	@param thd MySQL session variable
	@return DB_SUCCESS or error code. */
	dberr_t match_flags(THD *thd) const ;


	dict_table_t*	m_table;		/*!< Table instance */

	ulint		m_version;		/*!< Version of config file */

	byte*		m_hostname;		/*!< Hostname where the
						tablespace was exported */
	byte*		m_table_name;		/*!< Exporting instance table
						name */

	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */

	ulint		m_zip_size;		/*!< ROW_FORMAT=COMPRESSED
						page size, or 0 */

	ulint		m_flags;		/*!< Table flags */

	ulint		m_n_cols;		/*!< Number of columns in the
						meta-data file */

	dict_col_t*	m_cols;			/*!< Column data */

	byte**		m_col_names;		/*!< Column names, we store the
						column names separately because
						there is no field to store the
						value in dict_col_t */

	ulint		m_n_indexes;		/*!< Number of indexes,
						including clustered index */

	row_index_t*	m_indexes;		/*!< Index meta data */

	bool		m_missing;		/*!< Starts out as true.
						NOTE(review): presumably true
						means that NO readable .cfg
						file was found (the name and
						the initial value suggest so)
						- confirm against the code
						that reads the .cfg file */
};
233 
/** Describes the file and the IO buffers used when iterating over the
pages of a tablespace file; passed to fil_iterate() and to
AbstractCallback::run(). */
struct fil_iterator_t {
	pfs_os_file_t	file;			/*!< File handle */
	const char*	filepath;		/*!< File path name */
	os_offset_t	start;			/*!< From where to start */
	os_offset_t	end;			/*!< Where to stop */
	os_offset_t	file_size;		/*!< File size in bytes */
	ulint		n_io_buffers;		/*!< Number of pages to use
						for IO */
	byte*		io_buffer;		/*!< Buffer to use for IO */
	fil_space_crypt_t *crypt_data;		/*!< Crypt data (if encrypted) */
	byte*           crypt_io_buffer;        /*!< IO buffer when encrypted */
};
246 
247 /** Use the page cursor to iterate over records in a block. */
248 class RecIterator {
249 public:
250 	/** Default constructor */
RecIterator()251 	RecIterator() UNIV_NOTHROW
252 	{
253 		memset(&m_cur, 0x0, sizeof(m_cur));
254 		/* Make page_cur_delete_rec() happy. */
255 		m_mtr.start();
256 		m_mtr.set_log_mode(MTR_LOG_NO_REDO);
257 	}
258 
259 	/** Position the cursor on the first user record. */
open(buf_block_t * block)260 	void	open(buf_block_t* block) UNIV_NOTHROW
261 	{
262 		page_cur_set_before_first(block, &m_cur);
263 
264 		if (!end()) {
265 			next();
266 		}
267 	}
268 
269 	/** Move to the next record. */
next()270 	void	next() UNIV_NOTHROW
271 	{
272 		page_cur_move_to_next(&m_cur);
273 	}
274 
275 	/**
276 	@return the current record */
current()277 	rec_t*	current() UNIV_NOTHROW
278 	{
279 		ut_ad(!end());
280 		return(page_cur_get_rec(&m_cur));
281 	}
282 
current_block() const283 	buf_block_t* current_block() const { return m_cur.block; }
284 
285 	/**
286 	@return true if cursor is at the end */
end()287 	bool	end() UNIV_NOTHROW
288 	{
289 		return(page_cur_is_after_last(&m_cur) == TRUE);
290 	}
291 
292 	/** Remove the current record
293 	@return true on success */
remove(const dict_index_t * index,rec_offs * offsets)294 	bool remove(
295 		const dict_index_t*	index,
296 		rec_offs*		offsets) UNIV_NOTHROW
297 	{
298 		ut_ad(page_is_leaf(m_cur.block->frame));
299 		/* We can't end up with an empty page unless it is root. */
300 		if (page_get_n_recs(m_cur.block->frame) <= 1) {
301 			return(false);
302 		}
303 
304 		if (!rec_offs_any_extern(offsets)
305 		    && m_cur.block->page.id().page_no() != index->page
306 		    && ((page_get_data_size(m_cur.block->frame)
307 			 - rec_offs_size(offsets)
308 			 < BTR_CUR_PAGE_COMPRESS_LIMIT(index))
309 			|| !page_has_siblings(m_cur.block->frame)
310 			|| (page_get_n_recs(m_cur.block->frame) < 2))) {
311 			return false;
312 		}
313 
314 #ifdef UNIV_ZIP_DEBUG
315 		page_zip_des_t* page_zip = buf_block_get_page_zip(m_cur.block);
316 		ut_a(!page_zip || page_zip_validate(
317 			     page_zip, m_cur.block->frame, index));
318 #endif /* UNIV_ZIP_DEBUG */
319 
320 		page_cur_delete_rec(&m_cur, index, offsets, &m_mtr);
321 
322 #ifdef UNIV_ZIP_DEBUG
323 		ut_a(!page_zip || page_zip_validate(
324 			     page_zip, m_cur.block->frame, index));
325 #endif /* UNIV_ZIP_DEBUG */
326 
327 		return true;
328 	}
329 
330 private:
331 	page_cur_t	m_cur;
332 public:
333 	mtr_t		m_mtr;
334 };
335 
/** Class that purges delete marked records from indexes, both secondary
and clustered. It does a pessimistic delete. This should only be done if we
couldn't purge the delete marked records during Phase I. */
class IndexPurge {
public:
	/** Constructor; logs an informational "Phase II" message naming
	the index.
	@param trx the user transaction covering the import tablespace
	@param index to be imported */
	IndexPurge(
		trx_t*		trx,
		dict_index_t*	index) UNIV_NOTHROW
		:
		m_trx(trx),
		m_index(index),
		m_n_rows(0)
	{
		ib::info() << "Phase II - Purge records from index "
			<< index->name;
	}

	/** Destructor */
	~IndexPurge() UNIV_NOTHROW { }

	/** Purge delete marked records.
	@return DB_SUCCESS or error code. */
	dberr_t	garbage_collect() UNIV_NOTHROW;

	/** The number of records that are not delete marked.
	@return total records in the index after purge */
	ulint	get_n_rows() const UNIV_NOTHROW
	{
		return(m_n_rows);
	}

private:
	/** Begin import, position the cursor on the first record. */
	void	open() UNIV_NOTHROW;

	/** Close the persistent cursor and commit the mini-transaction. */
	void	close() UNIV_NOTHROW;

	/** Position the cursor on the next record.
	@return DB_SUCCESS or error code */
	dberr_t	next() UNIV_NOTHROW;

	/** Store the persistent cursor position and reopen the
	B-tree cursor in BTR_MODIFY_TREE mode, because the
	tree structure may be changed during a pessimistic delete. */
	void	purge_pessimistic_delete() UNIV_NOTHROW;

	/** Purge delete-marked records. */
	void	purge() UNIV_NOTHROW;

protected:
	// Disable default construction and copying (declarations only)
	IndexPurge();
	IndexPurge(const IndexPurge&);
	IndexPurge &operator=(const IndexPurge&);

private:
	trx_t*			m_trx;		/*!< User transaction */
	mtr_t			m_mtr;		/*!< Mini-transaction */
	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
	dict_index_t*		m_index;	/*!< Index to be processed */
	ulint			m_n_rows;	/*!< Records in index */
};
404 
405 /** Functor that is called for each physical page that is read from the
406 tablespace file.  */
407 class AbstractCallback
408 {
409 public:
410 	/** Constructor
411 	@param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)412 	AbstractCallback(trx_t* trx, ulint space_id)
413 		:
414 		m_zip_size(0),
415 		m_trx(trx),
416 		m_space(space_id),
417 		m_xdes(),
418 		m_xdes_page_no(ULINT_UNDEFINED),
419 		m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
420 
421 	/** Free any extent descriptor instance */
~AbstractCallback()422 	virtual ~AbstractCallback()
423 	{
424 		UT_DELETE_ARRAY(m_xdes);
425 	}
426 
427 	/** Determine the page size to use for traversing the tablespace
428 	@param file_size size of the tablespace file in bytes
429 	@param block contents of the first page in the tablespace file.
430 	@retval DB_SUCCESS or error code. */
431 	virtual dberr_t init(
432 		os_offset_t		file_size,
433 		const buf_block_t*	block) UNIV_NOTHROW;
434 
435 	/** @return true if compressed table. */
is_compressed_table() const436 	bool is_compressed_table() const UNIV_NOTHROW
437 	{
438 		return get_zip_size();
439 	}
440 
441 	/** @return the tablespace flags */
get_space_flags() const442 	ulint get_space_flags() const
443 	{
444 		return(m_space_flags);
445 	}
446 
447 	/**
448 	Set the name of the physical file and the file handle that is used
449 	to open it for the file that is being iterated over.
450 	@param filename the physical name of the tablespace file
451 	@param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)452 	void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
453 	{
454 		m_file = file;
455 		m_filepath = filename;
456 	}
457 
get_zip_size() const458 	ulint get_zip_size() const { return m_zip_size; }
physical_size() const459 	ulint physical_size() const
460 	{
461 		return m_zip_size ? m_zip_size : srv_page_size;
462 	}
463 
filename() const464 	const char* filename() const { return m_filepath; }
465 
466 	/**
467 	Called for every page in the tablespace. If the page was not
468 	updated then its state must be set to BUF_PAGE_NOT_USED. For
469 	compressed tables the page descriptor memory will be at offset:
470 		block->frame + srv_page_size;
471 	@param block block read from file, note it is not from the buffer pool
472 	@retval DB_SUCCESS or error code. */
473 	virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
474 
475 	/** @return the tablespace identifier */
get_space_id() const476 	ulint get_space_id() const { return m_space; }
477 
is_interrupted() const478 	bool is_interrupted() const { return trx_is_interrupted(m_trx); }
479 
480 	/**
481 	Get the data page depending on the table type, compressed or not.
482 	@param block - block read from disk
483 	@retval the buffer frame */
get_frame(const buf_block_t * block)484 	static byte* get_frame(const buf_block_t* block)
485 	{
486 		return block->page.zip.data
487 			? block->page.zip.data : block->frame;
488 	}
489 
490 	/** Invoke the functionality for the callback */
491 	virtual dberr_t run(const fil_iterator_t& iter,
492 			    buf_block_t* block) UNIV_NOTHROW = 0;
493 
494 protected:
495 	/** Get the physical offset of the extent descriptor within the page.
496 	@param page_no page number of the extent descriptor
497 	@param page contents of the page containing the extent descriptor.
498 	@return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const499 	const xdes_t* xdes(
500 		ulint		page_no,
501 		const page_t*	page) const UNIV_NOTHROW
502 	{
503 		ulint	offset;
504 
505 		offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
506 
507 		return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
508 	}
509 
510 	/** Set the current page directory (xdes). If the extent descriptor is
511 	marked as free then free the current extent descriptor and set it to
512 	0. This implies that all pages that are covered by this extent
513 	descriptor are also freed.
514 
515 	@param page_no offset of page within the file
516 	@param page page contents
517 	@return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)518 	dberr_t	set_current_xdes(
519 		ulint		page_no,
520 		const page_t*	page) UNIV_NOTHROW
521 	{
522 		m_xdes_page_no = page_no;
523 
524 		UT_DELETE_ARRAY(m_xdes);
525 		m_xdes = NULL;
526 
527 		if (mach_read_from_4(XDES_ARR_OFFSET + XDES_STATE + page)
528 		    != XDES_FREE) {
529 			const ulint physical_size = m_zip_size
530 				? m_zip_size : srv_page_size;
531 
532 			m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t, physical_size);
533 
534 			/* Trigger OOM */
535 			DBUG_EXECUTE_IF(
536 				"ib_import_OOM_13",
537 				UT_DELETE_ARRAY(m_xdes);
538 				m_xdes = NULL;
539 			);
540 
541 			if (m_xdes == NULL) {
542 				return(DB_OUT_OF_MEMORY);
543 			}
544 
545 			memcpy(m_xdes, page, physical_size);
546 		}
547 
548 		return(DB_SUCCESS);
549 	}
550 
551 	/** Check if the page is marked as free in the extent descriptor.
552 	@param page_no page number to check in the extent descriptor.
553 	@return true if the page is marked as free */
is_free(uint32_t page_no) const554 	bool is_free(uint32_t page_no) const UNIV_NOTHROW
555 	{
556 		ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
557 		     == m_xdes_page_no);
558 
559 		if (m_xdes != 0) {
560 			const xdes_t*	xdesc = xdes(page_no, m_xdes);
561 			ulint		pos = page_no % FSP_EXTENT_SIZE;
562 
563 			return xdes_is_free(xdesc, pos);
564 		}
565 
566 		/* If the current xdes was free, the page must be free. */
567 		return(true);
568 	}
569 
570 protected:
571 	/** The ROW_FORMAT=COMPRESSED page size, or 0. */
572 	ulint			m_zip_size;
573 
574 	/** File handle to the tablespace */
575 	pfs_os_file_t		m_file;
576 
577 	/** Physical file path. */
578 	const char*		m_filepath;
579 
580 	/** Covering transaction. */
581 	trx_t*			m_trx;
582 
583 	/** Space id of the file being iterated over. */
584 	ulint			m_space;
585 
586 	/** Current size of the space in pages */
587 	ulint			m_size;
588 
589 	/** Current extent descriptor page */
590 	xdes_t*			m_xdes;
591 
592 	/** Physical page offset in the file of the extent descriptor */
593 	ulint			m_xdes_page_no;
594 
595 	/** Flags value read from the header page */
596 	ulint			m_space_flags;
597 };
598 
599 /** Determine the page size to use for traversing the tablespace
600 @param file_size size of the tablespace file in bytes
601 @param block contents of the first page in the tablespace file.
602 @retval DB_SUCCESS or error code. */
603 dberr_t
init(os_offset_t file_size,const buf_block_t * block)604 AbstractCallback::init(
605 	os_offset_t		file_size,
606 	const buf_block_t*	block) UNIV_NOTHROW
607 {
608 	const page_t*		page = block->frame;
609 
610 	m_space_flags = fsp_header_get_flags(page);
611 	if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
612 		ulint cflags = fsp_flags_convert_from_101(m_space_flags);
613 		if (cflags == ULINT_UNDEFINED) {
614 			return(DB_CORRUPTION);
615 		}
616 		m_space_flags = cflags;
617 	}
618 
619 	/* Clear the DATA_DIR flag, which is basically garbage. */
620 	m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
621 	m_zip_size = fil_space_t::zip_size(m_space_flags);
622 	const ulint logical_size = fil_space_t::logical_size(m_space_flags);
623 	const ulint physical_size = fil_space_t::physical_size(m_space_flags);
624 
625 	if (logical_size != srv_page_size) {
626 
627 		ib::error() << "Page size " << logical_size
628 			<< " of ibd file is not the same as the server page"
629 			" size " << srv_page_size;
630 
631 		return(DB_CORRUPTION);
632 
633 	} else if (file_size & (physical_size - 1)) {
634 
635 		ib::error() << "File size " << file_size << " is not a"
636 			" multiple of the page size "
637 			<< physical_size;
638 
639 		return(DB_CORRUPTION);
640 	}
641 
642 	m_size  = mach_read_from_4(page + FSP_SIZE);
643 	if (m_space == ULINT_UNDEFINED) {
644 		m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
645 					   + page);
646 	}
647 
648 	return set_current_xdes(0, page);
649 }
650 
/**
Iterate over all the pages in the tablespace.

TODO: This can be made parallel trivially by chunking up the file
and creating a callback per thread. The main benefit will be to use
multiple CPUs for checksums and compressed tables. We have to do
compressed tables block by block right now. Secondly, we need to
decompress/compress and copy too much data. These are
CPU intensive.

@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static dberr_t fil_iterate(
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	AbstractCallback&	callback);
668 
/**
Callback that records the index id and the page number of an index page
(see operator()), from which build_row_import() fills in a row_import with
a single index entry.
NOTE(review): this class was described as determining the index root pages
by checking that the next/prev pointers are both FIL_NULL while skipping
deleted pages; no such check is made in operator(), so confirm against
run() whether that description still applies. */
struct FetchIndexRootPages : public AbstractCallback {

	/** Index information gathered from the .ibd file. */
	struct Index {

		/** Constructor
		@param id index id
		@param page_no root page number */
		Index(index_id_t id, ulint page_no)
			:
			m_id(id),
			m_page_no(page_no) { }

		index_id_t	m_id;		/*!< Index id */
		ulint		m_page_no;	/*!< Root page number */
	};

	/** Constructor; the space id is left as ULINT_UNDEFINED and the
	index information starts out as <0, 0>.
	@param table table definition in server
	@param trx covering (user) transaction */
	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
		:
		AbstractCallback(trx, ULINT_UNDEFINED),
		m_table(table), m_index(0, 0) UNIV_NOTHROW { }

	/** Destructor */
	~FetchIndexRootPages() UNIV_NOTHROW override { }

	/** Fetch the clustered index root page in the tablespace
	@param iter	Tablespace iterator
	@param block	Block to use for IO
	@retval DB_SUCCESS or error code */
	dberr_t run(const fil_iterator_t& iter,
		    buf_block_t* block) UNIV_NOTHROW override;

	/** Called for each block as it is read from the file.
	@param block block to convert, it is not from the buffer pool.
	@retval DB_SUCCESS or error code. */
	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;

	/** Update the import configuration that will be used to import
	the tablespace.
	@param cfg import configuration to fill in
	@return DB_SUCCESS or error code */
	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;

	/** Table definition in server. */
	const dict_table_t*	m_table;

	/** Index information */
	Index			m_index;
};
719 
720 /** Called for each block as it is read from the file. Check index pages to
721 determine the exact row format. We can't get that from the tablespace
722 header flags alone.
723 
724 @param block block to convert, it is not from the buffer pool.
725 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)726 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
727 {
728 	if (is_interrupted()) return DB_INTERRUPTED;
729 
730 	const page_t*	page = get_frame(block);
731 
732 	m_index.m_id = btr_page_get_index_id(page);
733 	m_index.m_page_no = block->page.id().page_no();
734 
735 	/* Check that the tablespace flags match the table flags. */
736 	ulint expected = dict_tf_to_fsp_flags(m_table->flags);
737 	if (!fsp_flags_match(expected, m_space_flags)) {
738 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
739 			ER_TABLE_SCHEMA_MISMATCH,
740 			"Expected FSP_SPACE_FLAGS=0x%x, .ibd "
741 			"file contains 0x%x.",
742 			unsigned(expected),
743 			unsigned(m_space_flags));
744 		return(DB_CORRUPTION);
745 	}
746 
747 	if (!page_is_comp(block->frame) !=
748 	    !dict_table_is_comp(m_table)) {
749 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
750 			ER_TABLE_SCHEMA_MISMATCH,
751 			"ROW_FORMAT mismatch");
752 		return DB_CORRUPTION;
753 	}
754 
755 	return DB_SUCCESS;
756 }
757 
758 /**
759 Update the import configuration that will be used to import the tablespace.
760 @return error code or DB_SUCCESS */
761 dberr_t
build_row_import(row_import * cfg) const762 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
763 {
764 	ut_a(cfg->m_table == m_table);
765 	cfg->m_zip_size = m_zip_size;
766 	cfg->m_n_indexes = 1;
767 
768 	if (cfg->m_n_indexes == 0) {
769 
770 		ib::error() << "No B+Tree found in tablespace";
771 
772 		return(DB_CORRUPTION);
773 	}
774 
775 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
776 
777 	/* Trigger OOM */
778 	DBUG_EXECUTE_IF(
779 		"ib_import_OOM_11",
780 		UT_DELETE_ARRAY(cfg->m_indexes);
781 		cfg->m_indexes = NULL;
782 	);
783 
784 	if (cfg->m_indexes == NULL) {
785 		return(DB_OUT_OF_MEMORY);
786 	}
787 
788 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
789 
790 	row_index_t*	cfg_index = cfg->m_indexes;
791 
792 	char	name[BUFSIZ];
793 
794 	snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
795 
796 	ulint	len = strlen(name) + 1;
797 
798 	cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
799 
800 	/* Trigger OOM */
801 	DBUG_EXECUTE_IF(
802 		"ib_import_OOM_12",
803 		UT_DELETE_ARRAY(cfg_index->m_name);
804 		cfg_index->m_name = NULL;
805 	);
806 
807 	if (cfg_index->m_name == NULL) {
808 		return(DB_OUT_OF_MEMORY);
809 	}
810 
811 	memcpy(cfg_index->m_name, name, len);
812 
813 	cfg_index->m_id = m_index.m_id;
814 
815 	cfg_index->m_space = m_space;
816 
817 	cfg_index->m_page_no = m_index.m_page_no;
818 
819 	return(DB_SUCCESS);
820 }
821 
822 /* Functor that is called for each physical page that is read from the
823 tablespace file.
824 
825   1. Check each page for corruption.
826 
827   2. Update the space id and LSN on every page
828      * For the header page
829        - Validate the flags
830        - Update the LSN
831 
832   3. On Btree pages
833      * Set the index id
834      * Update the max trx id
835      * In a cluster index, update the system columns
836      * In a cluster index, update the BLOB ptr, set the space id
837      * Purge delete marked records, but only if they can be easily
838        removed from the page
839      * Keep a counter of number of rows, ie. non-delete-marked rows
840      * Keep a counter of number of delete marked rows
841      * Keep a counter of number of purge failure
842      * If a page is stamped with an index id that isn't in the .cfg file
843        we assume it is deleted and the page can be ignored.
844 
845    4. Set the page state to dirty so that it will be written to disk.
846 */
847 class PageConverter : public AbstractCallback {
848 public:
849 	/** Constructor
850 	@param cfg config of table being imported.
851 	@param space_id tablespace identifier
852 	@param trx transaction covering the import */
PageConverter(row_import * cfg,ulint space_id,trx_t * trx)853 	PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
854 		:
855 		AbstractCallback(trx, space_id),
856 		m_cfg(cfg),
857 		m_index(cfg->m_indexes),
858 		m_rec_iter(),
859 		m_offsets_(), m_offsets(m_offsets_),
860 		m_heap(0),
861 		m_cluster_index(dict_table_get_first_index(cfg->m_table))
862 	{
863 		rec_offs_init(m_offsets_);
864 	}
865 
~PageConverter()866 	~PageConverter() UNIV_NOTHROW override
867 	{
868 		if (m_heap != 0) {
869 			mem_heap_free(m_heap);
870 		}
871 	}
872 
run(const fil_iterator_t & iter,buf_block_t * block)873 	dberr_t run(const fil_iterator_t& iter,
874 		    buf_block_t* block) UNIV_NOTHROW override
875 	{
876 		return fil_iterate(iter, block, *this);
877 	}
878 
879 	/** Called for each block as it is read from the file.
880 	@param block block to convert, it is not from the buffer pool.
881 	@retval DB_SUCCESS or error code. */
882 	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW override;
883 
884 private:
885 	/** Update the page, set the space id, max trx id and index id.
886 	@param block block read from file
887 	@param page_type type of the page
888 	@retval DB_SUCCESS or error code */
889 	dberr_t update_page(buf_block_t* block, uint16_t& page_type)
890 		UNIV_NOTHROW;
891 
892 	/** Update the space, index id, trx id.
893 	@param block block to convert
894 	@return DB_SUCCESS or error code */
895 	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;
896 
897 	/** Update the BLOB refrences and write UNDO log entries for
898 	rows that can't be purged optimistically.
899 	@param block block to update
900 	@retval DB_SUCCESS or error code */
901 	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;
902 
903 	/** Validate the space flags and update tablespace header page.
904 	@param block block read from file, not from the buffer pool.
905 	@retval DB_SUCCESS or error code */
906 	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;
907 
908 	/** Adjust the BLOB reference for a single column that is externally stored
909 	@param rec record to update
910 	@param offsets column offsets for the record
911 	@param i column ordinal value
912 	@return DB_SUCCESS or error code */
913 	dberr_t	adjust_cluster_index_blob_column(
914 		rec_t*		rec,
915 		const rec_offs*	offsets,
916 		ulint		i) UNIV_NOTHROW;
917 
918 	/** Adjusts the BLOB reference in the clustered index row for all
919 	externally stored columns.
920 	@param rec record to update
921 	@param offsets column offsets for the record
922 	@return DB_SUCCESS or error code */
923 	dberr_t	adjust_cluster_index_blob_columns(
924 		rec_t*		rec,
925 		const rec_offs*	offsets) UNIV_NOTHROW;
926 
927 	/** In the clustered index, adjist the BLOB pointers as needed.
928 	Also update the BLOB reference, write the new space id.
929 	@param rec record to update
930 	@param offsets column offsets for the record
931 	@return DB_SUCCESS or error code */
932 	dberr_t	adjust_cluster_index_blob_ref(
933 		rec_t*		rec,
934 		const rec_offs*	offsets) UNIV_NOTHROW;
935 
936 	/** Purge delete-marked records, only if it is possible to do
937 	so without re-organising the B+tree.
938 	@retval true if purged */
939 	bool purge() UNIV_NOTHROW;
940 
941 	/** Adjust the BLOB references and sys fields for the current record.
942 	@param rec record to update
943 	@param offsets column offsets for the record
944 	@return DB_SUCCESS or error code. */
945 	dberr_t	adjust_cluster_record(
946 		rec_t*			rec,
947 		const rec_offs*		offsets) UNIV_NOTHROW;
948 
949 	/** Find an index with the matching id.
950 	@return row_index_t* instance or 0 */
find_index(index_id_t id)951 	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
952 	{
953 		row_index_t*	index = &m_cfg->m_indexes[0];
954 
955 		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
956 			if (id == index->m_id) {
957 				return(index);
958 			}
959 		}
960 
961 		return(0);
962 
963 	}
964 private:
965 	/** Config for table that is being imported. */
966 	row_import*		m_cfg;
967 
968 	/** Current index whose pages are being imported */
969 	row_index_t*		m_index;
970 
971 	/** Iterator over records in a block */
972 	RecIterator		m_rec_iter;
973 
974 	/** Record offset */
975 	rec_offs		m_offsets_[REC_OFFS_NORMAL_SIZE];
976 
977 	/** Pointer to m_offsets_ */
978 	rec_offs*		m_offsets;
979 
980 	/** Memory heap for the record offsets */
981 	mem_heap_t*		m_heap;
982 
983 	/** Cluster index instance */
984 	dict_index_t*		m_cluster_index;
985 };
986 
987 /**
988 row_import destructor. */
~row_import()989 row_import::~row_import() UNIV_NOTHROW
990 {
991 	for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
992 		UT_DELETE_ARRAY(m_indexes[i].m_name);
993 
994 		if (m_indexes[i].m_fields == NULL) {
995 			continue;
996 		}
997 
998 		dict_field_t*	fields = m_indexes[i].m_fields;
999 		ulint		n_fields = m_indexes[i].m_n_fields;
1000 
1001 		for (ulint j = 0; j < n_fields; ++j) {
1002 			UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
1003 		}
1004 
1005 		UT_DELETE_ARRAY(fields);
1006 	}
1007 
1008 	for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
1009 		UT_DELETE_ARRAY(m_col_names[i]);
1010 	}
1011 
1012 	UT_DELETE_ARRAY(m_cols);
1013 	UT_DELETE_ARRAY(m_indexes);
1014 	UT_DELETE_ARRAY(m_col_names);
1015 	UT_DELETE_ARRAY(m_table_name);
1016 	UT_DELETE_ARRAY(m_hostname);
1017 }
1018 
1019 /** Find the index entry in in the indexes array.
1020 @param name index name
1021 @return instance if found else 0. */
1022 row_index_t*
get_index(const char * name) const1023 row_import::get_index(
1024 	const char*	name) const UNIV_NOTHROW
1025 {
1026 	for (ulint i = 0; i < m_n_indexes; ++i) {
1027 		const char*	index_name;
1028 		row_index_t*	index = &m_indexes[i];
1029 
1030 		index_name = reinterpret_cast<const char*>(index->m_name);
1031 
1032 		if (strcmp(index_name, name) == 0) {
1033 
1034 			return(index);
1035 		}
1036 	}
1037 
1038 	return(0);
1039 }
1040 
1041 /** Get the number of rows in the index.
1042 @param name index name
1043 @return number of rows (doesn't include delete marked rows). */
1044 ulint
get_n_rows(const char * name) const1045 row_import::get_n_rows(
1046 	const char*	name) const UNIV_NOTHROW
1047 {
1048 	const row_index_t*	index = get_index(name);
1049 
1050 	ut_a(name != 0);
1051 
1052 	return(index->m_stats.m_n_rows);
1053 }
1054 
1055 /** Get the number of rows for which purge failed uding the convert phase.
1056 @param name index name
1057 @return number of rows for which purge failed. */
1058 ulint
get_n_purge_failed(const char * name) const1059 row_import::get_n_purge_failed(
1060 	const char*	name) const UNIV_NOTHROW
1061 {
1062 	const row_index_t*	index = get_index(name);
1063 
1064 	ut_a(name != 0);
1065 
1066 	return(index->m_stats.m_n_purge_failed);
1067 }
1068 
1069 /** Find the ordinal value of the column name in the cfg table columns.
1070 @param name of column to look for.
1071 @return ULINT_UNDEFINED if not found. */
1072 ulint
find_col(const char * name) const1073 row_import::find_col(
1074 	const char*	name) const UNIV_NOTHROW
1075 {
1076 	for (ulint i = 0; i < m_n_cols; ++i) {
1077 		const char*	col_name;
1078 
1079 		col_name = reinterpret_cast<const char*>(m_col_names[i]);
1080 
1081 		if (strcmp(col_name, name) == 0) {
1082 			return(i);
1083 		}
1084 	}
1085 
1086 	return(ULINT_UNDEFINED);
1087 }
1088 
1089 /**
1090 Check if the index schema that was read from the .cfg file matches the
1091 in memory index definition.
1092 @return DB_SUCCESS or error code. */
1093 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1094 row_import::match_index_columns(
1095 	THD*			thd,
1096 	const dict_index_t*	index) UNIV_NOTHROW
1097 {
1098 	row_index_t*		cfg_index;
1099 	dberr_t			err = DB_SUCCESS;
1100 
1101 	cfg_index = get_index(index->name);
1102 
1103 	if (cfg_index == 0) {
1104 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1105 			ER_TABLE_SCHEMA_MISMATCH,
1106 			"Index %s not found in tablespace meta-data file.",
1107 			index->name());
1108 
1109 		return(DB_ERROR);
1110 	}
1111 
1112 	if (cfg_index->m_n_fields != index->n_fields) {
1113 
1114 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1115 			ER_TABLE_SCHEMA_MISMATCH,
1116 			"Index field count %u doesn't match"
1117 			" tablespace metadata file value " ULINTPF,
1118 			index->n_fields, cfg_index->m_n_fields);
1119 
1120 		return(DB_ERROR);
1121 	}
1122 
1123 	cfg_index->m_srv_index = index;
1124 
1125 	const dict_field_t*	field = index->fields;
1126 	const dict_field_t*	cfg_field = cfg_index->m_fields;
1127 
1128 	for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1129 
1130 		if (field->name() && cfg_field->name()
1131 		     && strcmp(field->name(), cfg_field->name()) != 0) {
1132 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1133 				ER_TABLE_SCHEMA_MISMATCH,
1134 				"Index field name %s doesn't match"
1135 				" tablespace metadata field name %s"
1136 				" for field position " ULINTPF,
1137 				field->name(), cfg_field->name(), i);
1138 
1139 			err = DB_ERROR;
1140 		}
1141 
1142 		if (cfg_field->prefix_len != field->prefix_len) {
1143 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1144 				ER_TABLE_SCHEMA_MISMATCH,
1145 				"Index %s field %s prefix len %u"
1146 				" doesn't match metadata file value %u",
1147 				index->name(), field->name(),
1148 				field->prefix_len, cfg_field->prefix_len);
1149 
1150 			err = DB_ERROR;
1151 		}
1152 
1153 		if (cfg_field->fixed_len != field->fixed_len) {
1154 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1155 				ER_TABLE_SCHEMA_MISMATCH,
1156 				"Index %s field %s fixed len %u"
1157 				" doesn't match metadata file value %u",
1158 				index->name(), field->name(),
1159 				field->fixed_len,
1160 				cfg_field->fixed_len);
1161 
1162 			err = DB_ERROR;
1163 		}
1164 	}
1165 
1166 	return(err);
1167 }
1168 
1169 /** Check if the table schema that was read from the .cfg file matches the
1170 in memory table definition.
1171 @param thd MySQL session variable
1172 @return DB_SUCCESS or error code. */
1173 dberr_t
match_table_columns(THD * thd)1174 row_import::match_table_columns(
1175 	THD*			thd) UNIV_NOTHROW
1176 {
1177 	dberr_t			err = DB_SUCCESS;
1178 	const dict_col_t*	col = m_table->cols;
1179 
1180 	for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1181 
1182 		const char*	col_name;
1183 		ulint		cfg_col_index;
1184 
1185 		col_name = dict_table_get_col_name(
1186 			m_table, dict_col_get_no(col));
1187 
1188 		cfg_col_index = find_col(col_name);
1189 
1190 		if (cfg_col_index == ULINT_UNDEFINED) {
1191 
1192 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1193 				 ER_TABLE_SCHEMA_MISMATCH,
1194 				 "Column %s not found in tablespace.",
1195 				 col_name);
1196 
1197 			err = DB_ERROR;
1198 		} else if (cfg_col_index != col->ind) {
1199 
1200 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1201 				ER_TABLE_SCHEMA_MISMATCH,
1202 				"Column %s ordinal value mismatch, it's at %u"
1203 				" in the table and " ULINTPF
1204 				" in the tablespace meta-data file",
1205 				col_name, col->ind, cfg_col_index);
1206 
1207 			err = DB_ERROR;
1208 		} else {
1209 			const dict_col_t*	cfg_col;
1210 
1211 			cfg_col = &m_cols[cfg_col_index];
1212 			ut_a(cfg_col->ind == cfg_col_index);
1213 
1214 			if (cfg_col->prtype != col->prtype) {
1215 				ib_errf(thd,
1216 					IB_LOG_LEVEL_ERROR,
1217 					ER_TABLE_SCHEMA_MISMATCH,
1218 					"Column %s precise type mismatch,"
1219 					" it's 0X%X in the table and 0X%X"
1220 					" in the tablespace meta file",
1221 					col_name, col->prtype, cfg_col->prtype);
1222 				err = DB_ERROR;
1223 			}
1224 
1225 			if (cfg_col->mtype != col->mtype) {
1226 				ib_errf(thd,
1227 					IB_LOG_LEVEL_ERROR,
1228 					ER_TABLE_SCHEMA_MISMATCH,
1229 					"Column %s main type mismatch,"
1230 					" it's 0X%X in the table and 0X%X"
1231 					" in the tablespace meta file",
1232 					col_name, col->mtype, cfg_col->mtype);
1233 				err = DB_ERROR;
1234 			}
1235 
1236 			if (cfg_col->len != col->len) {
1237 				ib_errf(thd,
1238 					IB_LOG_LEVEL_ERROR,
1239 					ER_TABLE_SCHEMA_MISMATCH,
1240 					"Column %s length mismatch,"
1241 					" it's %u in the table and %u"
1242 					" in the tablespace meta file",
1243 					col_name, col->len, cfg_col->len);
1244 				err = DB_ERROR;
1245 			}
1246 
1247 			if (cfg_col->mbminlen != col->mbminlen
1248 			    || cfg_col->mbmaxlen != col->mbmaxlen) {
1249 				ib_errf(thd,
1250 					IB_LOG_LEVEL_ERROR,
1251 					ER_TABLE_SCHEMA_MISMATCH,
1252 					"Column %s multi-byte len mismatch,"
1253 					" it's %u-%u in the table and %u-%u"
1254 					" in the tablespace meta file",
1255 					col_name, col->mbminlen, col->mbmaxlen,
1256 					cfg_col->mbminlen, cfg_col->mbmaxlen);
1257 				err = DB_ERROR;
1258 			}
1259 
1260 			if (cfg_col->ind != col->ind) {
1261 				ib_errf(thd,
1262 					IB_LOG_LEVEL_ERROR,
1263 					ER_TABLE_SCHEMA_MISMATCH,
1264 					"Column %s position mismatch,"
1265 					" it's %u in the table and %u"
1266 					" in the tablespace meta file",
1267 					col_name, col->ind, cfg_col->ind);
1268 				err = DB_ERROR;
1269 			}
1270 
1271 			if (cfg_col->ord_part != col->ord_part) {
1272 				ib_errf(thd,
1273 					IB_LOG_LEVEL_ERROR,
1274 					ER_TABLE_SCHEMA_MISMATCH,
1275 					"Column %s ordering mismatch,"
1276 					" it's %u in the table and %u"
1277 					" in the tablespace meta file",
1278 					col_name, col->ord_part,
1279 					cfg_col->ord_part);
1280 				err = DB_ERROR;
1281 			}
1282 
1283 			if (cfg_col->max_prefix != col->max_prefix) {
1284 				ib_errf(thd,
1285 					IB_LOG_LEVEL_ERROR,
1286 					ER_TABLE_SCHEMA_MISMATCH,
1287 					"Column %s max prefix mismatch"
1288 					" it's %u in the table and %u"
1289 					" in the tablespace meta file",
1290 					col_name, col->max_prefix,
1291 					cfg_col->max_prefix);
1292 				err = DB_ERROR;
1293 			}
1294 		}
1295 	}
1296 
1297 	return(err);
1298 }
1299 
match_flags(THD * thd) const1300 dberr_t row_import::match_flags(THD *thd) const
1301 {
1302   ulint mismatch= (m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR;
1303   if (!mismatch)
1304     return DB_SUCCESS;
1305 
1306   const char *msg;
1307   if (mismatch & DICT_TF_MASK_ZIP_SSIZE)
1308   {
1309     if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) &&
1310         (m_flags & DICT_TF_MASK_ZIP_SSIZE))
1311     {
1312       switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1313       case 0U << DICT_TF_POS_ZIP_SSIZE:
1314         goto uncompressed;
1315       case 1U << DICT_TF_POS_ZIP_SSIZE:
1316         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=1";
1317         break;
1318       case 2U << DICT_TF_POS_ZIP_SSIZE:
1319         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=2";
1320         break;
1321       case 3U << DICT_TF_POS_ZIP_SSIZE:
1322         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=4";
1323         break;
1324       case 4U << DICT_TF_POS_ZIP_SSIZE:
1325         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8";
1326         break;
1327       case 5U << DICT_TF_POS_ZIP_SSIZE:
1328         msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=16";
1329         break;
1330       default:
1331         msg= "strange KEY_BLOCK_SIZE";
1332       }
1333     }
1334     else if (m_flags & DICT_TF_MASK_ZIP_SSIZE)
1335       msg= "ROW_FORMAT=COMPRESSED";
1336     else
1337       goto uncompressed;
1338   }
1339   else
1340   {
1341   uncompressed:
1342     msg= (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) ? "ROW_FORMAT=DYNAMIC"
1343          : (m_flags & DICT_TF_MASK_COMPACT)    ? "ROW_FORMAT=COMPACT"
1344                                                : "ROW_FORMAT=REDUNDANT";
1345   }
1346 
1347   ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1348           "Table flags don't match, server table has 0x%x and the meta-data "
1349           "file has 0x%zx; .cfg file uses %s",
1350           m_table->flags, m_flags, msg);
1351 
1352   return DB_ERROR;
1353 }
1354 
1355 /** Check if the table (and index) schema that was read from the .cfg file
1356 matches the in memory table definition.
1357 @param thd MySQL session variable
1358 @return DB_SUCCESS or error code. */
1359 dberr_t
match_schema(THD * thd)1360 row_import::match_schema(
1361 	THD*		thd) UNIV_NOTHROW
1362 {
1363 	/* Do some simple checks. */
1364 
1365 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1366 
1367 		/* If the number of indexes don't match then it is better
1368 		to abort the IMPORT. It is easy for the user to create a
1369 		table matching the IMPORT definition. */
1370 
1371 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1372 			"Number of indexes don't match, table has " ULINTPF
1373 			" indexes but the tablespace meta-data file has "
1374 			ULINTPF " indexes",
1375 			UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1376 
1377 		return(DB_ERROR);
1378 	}
1379 
1380 	dberr_t	err = match_table_columns(thd);
1381 
1382 	if (err != DB_SUCCESS) {
1383 		return(err);
1384 	}
1385 
1386 	/* Check if the index definitions match. */
1387 
1388 	const dict_index_t* index;
1389 
1390 	for (index = UT_LIST_GET_FIRST(m_table->indexes);
1391 	     index != 0;
1392 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1393 
1394 		dberr_t	index_err;
1395 
1396 		index_err = match_index_columns(thd, index);
1397 
1398 		if (index_err != DB_SUCCESS) {
1399 			err = index_err;
1400 		}
1401 	}
1402 
1403 	return(err);
1404 }
1405 
1406 /**
1407 Set the index root <space, pageno>, using index name. */
1408 void
set_root_by_name()1409 row_import::set_root_by_name() UNIV_NOTHROW
1410 {
1411 	row_index_t*	cfg_index = m_indexes;
1412 
1413 	for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1414 		dict_index_t*	index;
1415 
1416 		const char*	index_name;
1417 
1418 		index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1419 
1420 		index = dict_table_get_index_on_name(m_table, index_name);
1421 
1422 		/* We've already checked that it exists. */
1423 		ut_a(index != 0);
1424 
1425 		index->page = static_cast<uint32_t>(cfg_index->m_page_no);
1426 	}
1427 }
1428 
1429 /**
1430 Set the index root <space, pageno>, using a heuristic.
1431 @return DB_SUCCESS or error code */
1432 dberr_t
set_root_by_heuristic()1433 row_import::set_root_by_heuristic() UNIV_NOTHROW
1434 {
1435 	row_index_t*	cfg_index = m_indexes;
1436 
1437 	ut_a(m_n_indexes > 0);
1438 
1439 	// TODO: For now use brute force, based on ordinality
1440 
1441 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1442 
1443 		ib::warn() << "Table " << m_table->name << " should have "
1444 			<< UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1445 			" the tablespace has " << m_n_indexes << " indexes";
1446 	}
1447 
1448 	dict_mutex_enter_for_mysql();
1449 
1450 	ulint	i = 0;
1451 	dberr_t	err = DB_SUCCESS;
1452 
1453 	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1454 	     index != 0;
1455 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1456 
1457 		if (index->type & DICT_FTS) {
1458 			index->type |= DICT_CORRUPT;
1459 			ib::warn() << "Skipping FTS index: " << index->name;
1460 		} else if (i < m_n_indexes) {
1461 
1462 			UT_DELETE_ARRAY(cfg_index[i].m_name);
1463 
1464 			ulint	len = strlen(index->name) + 1;
1465 
1466 			cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1467 
1468 			/* Trigger OOM */
1469 			DBUG_EXECUTE_IF(
1470 				"ib_import_OOM_14",
1471 				UT_DELETE_ARRAY(cfg_index[i].m_name);
1472 				cfg_index[i].m_name = NULL;
1473 			);
1474 
1475 			if (cfg_index[i].m_name == NULL) {
1476 				err = DB_OUT_OF_MEMORY;
1477 				break;
1478 			}
1479 
1480 			memcpy(cfg_index[i].m_name, index->name, len);
1481 
1482 			cfg_index[i].m_srv_index = index;
1483 
1484 			index->page = static_cast<uint32_t>(
1485 				cfg_index[i++].m_page_no);
1486 		}
1487 	}
1488 
1489 	dict_mutex_exit_for_mysql();
1490 
1491 	return(err);
1492 }
1493 
1494 /**
1495 Purge delete marked records.
1496 @return DB_SUCCESS or error code. */
1497 dberr_t
garbage_collect()1498 IndexPurge::garbage_collect() UNIV_NOTHROW
1499 {
1500 	dberr_t	err;
1501 	ibool	comp = dict_table_is_comp(m_index->table);
1502 
1503 	/* Open the persistent cursor and start the mini-transaction. */
1504 
1505 	open();
1506 
1507 	while ((err = next()) == DB_SUCCESS) {
1508 
1509 		rec_t*	rec = btr_pcur_get_rec(&m_pcur);
1510 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1511 
1512 		if (!deleted) {
1513 			++m_n_rows;
1514 		} else {
1515 			purge();
1516 		}
1517 	}
1518 
1519 	/* Close the persistent cursor and commit the mini-transaction. */
1520 
1521 	close();
1522 
1523 	return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1524 }
1525 
1526 /**
1527 Begin import, position the cursor on the first record. */
1528 void
open()1529 IndexPurge::open() UNIV_NOTHROW
1530 {
1531 	mtr_start(&m_mtr);
1532 
1533 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1534 
1535 	btr_pcur_open_at_index_side(
1536 		true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1537 	btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1538 	if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) {
1539 		ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1540 		/* Skip the metadata pseudo-record. */
1541 	} else {
1542 		btr_pcur_move_to_prev_on_page(&m_pcur);
1543 	}
1544 }
1545 
1546 /**
1547 Close the persistent curosr and commit the mini-transaction. */
1548 void
close()1549 IndexPurge::close() UNIV_NOTHROW
1550 {
1551 	btr_pcur_close(&m_pcur);
1552 	mtr_commit(&m_mtr);
1553 }
1554 
1555 /**
1556 Position the cursor on the next record.
1557 @return DB_SUCCESS or error code */
1558 dberr_t
next()1559 IndexPurge::next() UNIV_NOTHROW
1560 {
1561 	btr_pcur_move_to_next_on_page(&m_pcur);
1562 
1563 	/* When switching pages, commit the mini-transaction
1564 	in order to release the latch on the old page. */
1565 
1566 	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1567 		return(DB_SUCCESS);
1568 	} else if (trx_is_interrupted(m_trx)) {
1569 		/* Check after every page because the check
1570 		is expensive. */
1571 		return(DB_INTERRUPTED);
1572 	}
1573 
1574 	btr_pcur_store_position(&m_pcur, &m_mtr);
1575 
1576 	mtr_commit(&m_mtr);
1577 
1578 	mtr_start(&m_mtr);
1579 
1580 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1581 
1582 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1583 	/* The following is based on btr_pcur_move_to_next_user_rec(). */
1584 	m_pcur.old_stored = false;
1585 	ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
1586 	do {
1587 		if (btr_pcur_is_after_last_on_page(&m_pcur)) {
1588 			if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
1589 				return DB_END_OF_INDEX;
1590 			}
1591 
1592 			buf_block_t* block = btr_pcur_get_block(&m_pcur);
1593 			uint32_t next_page = btr_page_get_next(block->frame);
1594 
1595 			/* MDEV-13542 FIXME: Make these checks part of
1596 			btr_pcur_move_to_next_page(), and introduce a
1597 			return status that will be checked in all callers! */
1598 			switch (next_page) {
1599 			default:
1600 				if (next_page != block->page.id().page_no()) {
1601 					break;
1602 				}
1603 				/* MDEV-20931 FIXME: Check that
1604 				next_page is within the tablespace
1605 				bounds! Also check that it is not a
1606 				change buffer bitmap page. */
1607 				/* fall through */
1608 			case 0:
1609 			case 1:
1610 			case FIL_NULL:
1611 				return DB_CORRUPTION;
1612 			}
1613 
1614 			dict_index_t* index = m_pcur.btr_cur.index;
1615 			buf_block_t* next_block = btr_block_get(
1616 				*index, next_page, BTR_MODIFY_LEAF, false,
1617 				&m_mtr);
1618 
1619 			if (UNIV_UNLIKELY(!next_block
1620 					  || !fil_page_index_page_check(
1621 						  next_block->frame)
1622 					  || !!dict_index_is_spatial(index)
1623 					  != (fil_page_get_type(
1624 						      next_block->frame)
1625 					      == FIL_PAGE_RTREE)
1626 					  || page_is_comp(next_block->frame)
1627 					  != page_is_comp(block->frame)
1628 					  || btr_page_get_prev(
1629 						  next_block->frame)
1630 					  != block->page.id().page_no())) {
1631 				return DB_CORRUPTION;
1632 			}
1633 
1634 			btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);
1635 
1636 			page_cur_set_before_first(next_block,
1637 						  &m_pcur.btr_cur.page_cur);
1638 
1639 			ut_d(page_check_dir(next_block->frame));
1640 		} else {
1641 			btr_pcur_move_to_next_on_page(&m_pcur);
1642 		}
1643 	} while (!btr_pcur_is_on_user_rec(&m_pcur));
1644 
1645 	return DB_SUCCESS;
1646 }
1647 
1648 /**
1649 Store the persistent cursor position and reopen the
1650 B-tree cursor in BTR_MODIFY_TREE mode, because the
1651 tree structure may be changed during a pessimistic delete. */
1652 void
purge_pessimistic_delete()1653 IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
1654 {
1655 	dberr_t	err;
1656 
1657 	btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1658 				  &m_pcur, &m_mtr);
1659 
1660 	ut_ad(rec_get_deleted_flag(
1661 			btr_pcur_get_rec(&m_pcur),
1662 			dict_table_is_comp(m_index->table)));
1663 
1664 	btr_cur_pessimistic_delete(
1665 		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);
1666 
1667 	ut_a(err == DB_SUCCESS);
1668 
1669 	/* Reopen the B-tree cursor in BTR_MODIFY_LEAF mode */
1670 	mtr_commit(&m_mtr);
1671 }
1672 
1673 /**
1674 Purge delete-marked records. */
1675 void
purge()1676 IndexPurge::purge() UNIV_NOTHROW
1677 {
1678 	btr_pcur_store_position(&m_pcur, &m_mtr);
1679 
1680 	purge_pessimistic_delete();
1681 
1682 	mtr_start(&m_mtr);
1683 
1684 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1685 
1686 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1687 }
1688 
1689 /** Adjust the BLOB reference for a single column that is externally stored
1690 @param rec record to update
1691 @param offsets column offsets for the record
1692 @param i column ordinal value
1693 @return DB_SUCCESS or error code */
1694 inline
1695 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1696 PageConverter::adjust_cluster_index_blob_column(
1697 	rec_t*		rec,
1698 	const rec_offs*	offsets,
1699 	ulint		i) UNIV_NOTHROW
1700 {
1701 	ulint		len;
1702 	byte*		field;
1703 
1704 	field = rec_get_nth_field(rec, offsets, i, &len);
1705 
1706 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1707 			len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1708 
1709 	if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1710 
1711 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1712 			ER_INNODB_INDEX_CORRUPT,
1713 			"Externally stored column(" ULINTPF
1714 			") has a reference length of " ULINTPF
1715 			" in the cluster index %s",
1716 			i, len, m_cluster_index->name());
1717 
1718 		return(DB_CORRUPTION);
1719 	}
1720 
1721 	field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1722 
1723 	mach_write_to_4(field, get_space_id());
1724 
1725 	if (UNIV_LIKELY_NULL(m_rec_iter.current_block()->page.zip.data)) {
1726 		page_zip_write_blob_ptr(
1727 			m_rec_iter.current_block(), rec, m_cluster_index,
1728 			offsets, i, &m_rec_iter.m_mtr);
1729 	}
1730 
1731 	return(DB_SUCCESS);
1732 }
1733 
1734 /** Adjusts the BLOB reference in the clustered index row for all externally
1735 stored columns.
1736 @param rec record to update
1737 @param offsets column offsets for the record
1738 @return DB_SUCCESS or error code */
1739 inline
1740 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1741 PageConverter::adjust_cluster_index_blob_columns(
1742 	rec_t*		rec,
1743 	const rec_offs*	offsets) UNIV_NOTHROW
1744 {
1745 	ut_ad(rec_offs_any_extern(offsets));
1746 
1747 	/* Adjust the space_id in the BLOB pointers. */
1748 
1749 	for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1750 
1751 		/* Only if the column is stored "externally". */
1752 
1753 		if (rec_offs_nth_extern(offsets, i)) {
1754 			dberr_t	err;
1755 
1756 			err = adjust_cluster_index_blob_column(rec, offsets, i);
1757 
1758 			if (err != DB_SUCCESS) {
1759 				return(err);
1760 			}
1761 		}
1762 	}
1763 
1764 	return(DB_SUCCESS);
1765 }
1766 
1767 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1768 BLOB reference, write the new space id.
1769 @param rec record to update
1770 @param offsets column offsets for the record
1771 @return DB_SUCCESS or error code */
1772 inline
1773 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1774 PageConverter::adjust_cluster_index_blob_ref(
1775 	rec_t*		rec,
1776 	const rec_offs*	offsets) UNIV_NOTHROW
1777 {
1778 	if (rec_offs_any_extern(offsets)) {
1779 		dberr_t	err;
1780 
1781 		err = adjust_cluster_index_blob_columns(rec, offsets);
1782 
1783 		if (err != DB_SUCCESS) {
1784 			return(err);
1785 		}
1786 	}
1787 
1788 	return(DB_SUCCESS);
1789 }
1790 
1791 /** Purge delete-marked records, only if it is possible to do so without
1792 re-organising the B+tree.
1793 @return true if purge succeeded */
purge()1794 inline bool PageConverter::purge() UNIV_NOTHROW
1795 {
1796 	const dict_index_t*	index = m_index->m_srv_index;
1797 
1798 	/* We can't have a page that is empty and not root. */
1799 	if (m_rec_iter.remove(index, m_offsets)) {
1800 
1801 		++m_index->m_stats.m_n_purged;
1802 
1803 		return(true);
1804 	} else {
1805 		++m_index->m_stats.m_n_purge_failed;
1806 	}
1807 
1808 	return(false);
1809 }
1810 
1811 /** Adjust the BLOB references and sys fields for the current record.
1812 @param rec record to update
1813 @param offsets column offsets for the record
1814 @return DB_SUCCESS or error code. */
1815 inline
1816 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1817 PageConverter::adjust_cluster_record(
1818 	rec_t*			rec,
1819 	const rec_offs*		offsets) UNIV_NOTHROW
1820 {
1821 	dberr_t	err;
1822 
1823 	if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1824 
1825 		/* Reset DB_TRX_ID and DB_ROLL_PTR.  Normally, these fields
1826 		are only written in conjunction with other changes to the
1827 		record. */
1828 		ulint	trx_id_pos = m_cluster_index->n_uniq
1829 			? m_cluster_index->n_uniq : 1;
1830 		if (UNIV_LIKELY_NULL(m_rec_iter.current_block()
1831 				     ->page.zip.data)) {
1832 			page_zip_write_trx_id_and_roll_ptr(
1833 				m_rec_iter.current_block(),
1834 				rec, m_offsets, trx_id_pos,
1835 				0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1836 				&m_rec_iter.m_mtr);
1837 		} else {
1838 			ulint	len;
1839 			byte*	ptr = rec_get_nth_field(
1840 				rec, m_offsets, trx_id_pos, &len);
1841 			ut_ad(len == DATA_TRX_ID_LEN);
1842 			memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1843 		}
1844 	}
1845 
1846 	return(err);
1847 }
1848 
1849 /** Update the BLOB refrences and write UNDO log entries for
1850 rows that can't be purged optimistically.
1851 @param block block to update
1852 @retval DB_SUCCESS or error code */
1853 inline
1854 dberr_t
update_records(buf_block_t * block)1855 PageConverter::update_records(
1856 	buf_block_t*	block) UNIV_NOTHROW
1857 {
1858 	ibool	comp = dict_table_is_comp(m_cfg->m_table);
1859 	bool	clust_index = m_index->m_srv_index == m_cluster_index;
1860 
1861 	/* This will also position the cursor on the first user record. */
1862 
1863 	m_rec_iter.open(block);
1864 
1865 	while (!m_rec_iter.end()) {
1866 		rec_t*	rec = m_rec_iter.current();
1867 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1868 
1869 		/* For the clustered index we have to adjust the BLOB
1870 		reference and the system fields irrespective of the
1871 		delete marked flag. The adjustment of delete marked
1872 		cluster records is required for purge to work later. */
1873 
1874 		if (deleted || clust_index) {
1875 			m_offsets = rec_get_offsets(
1876 				rec, m_index->m_srv_index, m_offsets,
1877 				m_index->m_srv_index->n_core_fields,
1878 				ULINT_UNDEFINED, &m_heap);
1879 		}
1880 
1881 		if (clust_index) {
1882 
1883 			dberr_t err = adjust_cluster_record(rec, m_offsets);
1884 
1885 			if (err != DB_SUCCESS) {
1886 				return(err);
1887 			}
1888 		}
1889 
1890 		/* If it is a delete marked record then try an
1891 		optimistic delete. */
1892 
1893 		if (deleted) {
1894 			/* A successful purge will move the cursor to the
1895 			next record. */
1896 
1897 			if (!purge()) {
1898 				m_rec_iter.next();
1899 			}
1900 
1901 			++m_index->m_stats.m_n_deleted;
1902 		} else {
1903 			++m_index->m_stats.m_n_rows;
1904 			m_rec_iter.next();
1905 		}
1906 	}
1907 
1908 	return(DB_SUCCESS);
1909 }
1910 
1911 /** Update the space, index id, trx id.
1912 @return DB_SUCCESS or error code */
1913 inline
1914 dberr_t
update_index_page(buf_block_t * block)1915 PageConverter::update_index_page(
1916 	buf_block_t*	block) UNIV_NOTHROW
1917 {
1918 	const page_id_t page_id(block->page.id());
1919 
1920 	if (is_free(page_id.page_no())) {
1921 		return(DB_SUCCESS);
1922 	}
1923 
1924 	buf_frame_t* page = block->frame;
1925 	const index_id_t id = btr_page_get_index_id(page);
1926 
1927 	if (id != m_index->m_id) {
1928 		row_index_t* index = find_index(id);
1929 
1930 		if (UNIV_UNLIKELY(!index)) {
1931 			if (!m_cfg->m_missing) {
1932 				ib::warn() << "Unknown index id " << id
1933 					   << " on page " << page_id.page_no();
1934 			}
1935 			return DB_SUCCESS;
1936 		}
1937 
1938 		m_index = index;
1939 	}
1940 
1941 	/* If the .cfg file is missing and there is an index mismatch
1942 	then ignore the error. */
1943 	if (m_cfg->m_missing && !m_index->m_srv_index) {
1944 		return(DB_SUCCESS);
1945 	}
1946 
1947 	if (m_index && page_id.page_no() == m_index->m_page_no) {
1948 		byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
1949 			+ page;
1950 		mach_write_to_4(b, page_id.space());
1951 
1952 		memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
1953 		       + page, b, 4);
1954 		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1955 			memcpy(&block->page.zip.data[FIL_PAGE_DATA
1956 						     + PAGE_BTR_SEG_TOP
1957 						     + FSEG_HDR_SPACE], b, 4);
1958 			memcpy(&block->page.zip.data[FIL_PAGE_DATA
1959 						     + PAGE_BTR_SEG_LEAF
1960 						     + FSEG_HDR_SPACE], b, 4);
1961 		}
1962 	}
1963 
1964 #ifdef UNIV_ZIP_DEBUG
1965 	ut_a(!block->page.zip.data || page_zip_validate(&block->page.zip, page,
1966 							m_index->m_srv_index));
1967 #endif /* UNIV_ZIP_DEBUG */
1968 
1969 	/* This has to be written to uncompressed index header. Set it to
1970 	the current index id. */
1971 	mach_write_to_8(page + (PAGE_HEADER + PAGE_INDEX_ID),
1972 			m_index->m_srv_index->id);
1973 	if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1974 		memcpy(&block->page.zip.data[PAGE_HEADER + PAGE_INDEX_ID],
1975 		       &block->frame[PAGE_HEADER + PAGE_INDEX_ID], 8);
1976 	}
1977 
1978 	if (m_index->m_srv_index->is_clust()) {
1979 		if (page_id.page_no() != m_index->m_srv_index->page) {
1980 			goto clear_page_max_trx_id;
1981 		}
1982 	} else if (page_is_leaf(page)) {
1983 		/* Set PAGE_MAX_TRX_ID on secondary index leaf pages. */
1984 		mach_write_to_8(&block->frame[PAGE_HEADER + PAGE_MAX_TRX_ID],
1985 				m_trx->id);
1986 		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1987 			memcpy_aligned<8>(&block->page.zip.data
1988 					  [PAGE_HEADER + PAGE_MAX_TRX_ID],
1989 					  &block->frame
1990 					  [PAGE_HEADER + PAGE_MAX_TRX_ID], 8);
1991 		}
1992 	} else {
1993 clear_page_max_trx_id:
1994 		/* Clear PAGE_MAX_TRX_ID so that it can be
1995 		used for other purposes in the future. IMPORT
1996 		in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
1997 		would set the field to the transaction ID even
1998 		on clustered index pages. */
1999 		memset_aligned<8>(&block->frame[PAGE_HEADER + PAGE_MAX_TRX_ID],
2000 				  0, 8);
2001 		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
2002 			memset_aligned<8>(&block->page.zip.data
2003 					  [PAGE_HEADER + PAGE_MAX_TRX_ID],
2004 					  0, 8);
2005 		}
2006 	}
2007 
2008 	if (page_is_empty(page)) {
2009 
2010 		/* Only a root page can be empty. */
2011 		if (page_has_siblings(page)) {
2012 			// TODO: We should relax this and skip secondary
2013 			// indexes. Mark them as corrupt because they can
2014 			// always be rebuilt.
2015 			return(DB_CORRUPTION);
2016 		}
2017 
2018 		return(DB_SUCCESS);
2019 	}
2020 
2021 	return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
2022 }
2023 
2024 /** Validate the space flags and update tablespace header page.
2025 @param block block read from file, not from the buffer pool.
2026 @retval DB_SUCCESS or error code */
update_header(buf_block_t * block)2027 inline dberr_t PageConverter::update_header(buf_block_t* block) UNIV_NOTHROW
2028 {
2029   byte *frame= get_frame(block);
2030   if (memcmp_aligned<2>(FIL_PAGE_SPACE_ID + frame,
2031                         FSP_HEADER_OFFSET + FSP_SPACE_ID + frame, 4))
2032     ib::warn() << "Space id check in the header failed: ignored";
2033   else if (!mach_read_from_4(FIL_PAGE_SPACE_ID + frame))
2034     return DB_CORRUPTION;
2035 
2036   memset(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0, 8);
2037 
2038   /* Write space_id to the tablespace header, page 0. */
2039   mach_write_to_4(FIL_PAGE_SPACE_ID + frame, get_space_id());
2040   memcpy_aligned<2>(FSP_HEADER_OFFSET + FSP_SPACE_ID + frame,
2041                     FIL_PAGE_SPACE_ID + frame, 4);
2042   /* Write back the adjusted flags. */
2043   mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + frame, m_space_flags);
2044 
2045   return DB_SUCCESS;
2046 }
2047 
2048 /** Update the page, set the space id, max trx id and index id.
2049 @param block block read from file
2050 @retval DB_SUCCESS or error code */
2051 inline
2052 dberr_t
update_page(buf_block_t * block,uint16_t & page_type)2053 PageConverter::update_page(buf_block_t* block, uint16_t& page_type)
2054 	UNIV_NOTHROW
2055 {
2056 	dberr_t		err = DB_SUCCESS;
2057 
2058 	ut_ad(!block->page.zip.data == !is_compressed_table());
2059 
2060 	switch (page_type = fil_page_get_type(get_frame(block))) {
2061 	case FIL_PAGE_TYPE_FSP_HDR:
2062 		ut_a(block->page.id().page_no() == 0);
2063 		/* Work directly on the uncompressed page headers. */
2064 		return(update_header(block));
2065 
2066 	case FIL_PAGE_INDEX:
2067 	case FIL_PAGE_RTREE:
2068 		/* We need to decompress the contents into block->frame
2069 		before we can do any thing with Btree pages. */
2070 
2071 		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
2072 			return(DB_CORRUPTION);
2073 		}
2074 
2075 		/* fall through */
2076 	case FIL_PAGE_TYPE_INSTANT:
2077 		/* This is on every page in the tablespace. */
2078 		mach_write_to_4(
2079 			get_frame(block)
2080 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2081 
2082 		/* Only update the Btree nodes. */
2083 		return(update_index_page(block));
2084 
2085 	case FIL_PAGE_TYPE_SYS:
2086 		/* This is page 0 in the system tablespace. */
2087 		return(DB_CORRUPTION);
2088 
2089 	case FIL_PAGE_TYPE_XDES:
2090 		err = set_current_xdes(
2091 			block->page.id().page_no(), get_frame(block));
2092 		/* fall through */
2093 	case FIL_PAGE_INODE:
2094 	case FIL_PAGE_TYPE_TRX_SYS:
2095 	case FIL_PAGE_IBUF_FREE_LIST:
2096 	case FIL_PAGE_TYPE_ALLOCATED:
2097 	case FIL_PAGE_IBUF_BITMAP:
2098 	case FIL_PAGE_TYPE_BLOB:
2099 	case FIL_PAGE_TYPE_ZBLOB:
2100 	case FIL_PAGE_TYPE_ZBLOB2:
2101 
2102 		/* Work directly on the uncompressed page headers. */
2103 		/* This is on every page in the tablespace. */
2104 		mach_write_to_4(
2105 			get_frame(block)
2106 			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());
2107 
2108 		return(err);
2109 	}
2110 
2111 	ib::warn() << "Unknown page type (" << page_type << ")";
2112 
2113 	return(DB_CORRUPTION);
2114 }
2115 
2116 /** Called for every page in the tablespace. If the page was not
2117 updated then its state must be set to BUF_PAGE_NOT_USED.
2118 @param block block read from file, note it is not from the buffer pool
2119 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2120 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2121 {
2122 	/* If we already had an old page with matching number
2123 	in the buffer pool, evict it now, because
2124 	we no longer evict the pages on DISCARD TABLESPACE. */
2125 	buf_page_get_gen(block->page.id(), get_zip_size(),
2126 			 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2127 			 __FILE__, __LINE__, NULL, NULL);
2128 
2129 	uint16_t page_type;
2130 
2131 	if (dberr_t err = update_page(block, page_type)) {
2132 		return err;
2133 	}
2134 
2135 	const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
2136 	byte* frame = get_frame(block);
2137 	memset_aligned<8>(frame + FIL_PAGE_LSN, 0, 8);
2138 
2139 	if (!block->page.zip.data) {
2140 		buf_flush_init_for_writing(
2141 			NULL, block->frame, NULL, full_crc32);
2142 	} else if (fil_page_type_is_index(page_type)) {
2143 		buf_flush_init_for_writing(
2144 			NULL, block->page.zip.data, &block->page.zip,
2145 			full_crc32);
2146 	} else {
2147 		/* Calculate and update the checksum of non-index
2148 		pages for ROW_FORMAT=COMPRESSED tables. */
2149 		buf_flush_update_zip_checksum(
2150 			block->page.zip.data, block->zip_size());
2151 	}
2152 
2153 	return DB_SUCCESS;
2154 }
2155 
2156 /*****************************************************************//**
2157 Clean up after import tablespace. */
2158 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2159 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2160 row_import_cleanup(
2161 /*===============*/
2162 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2163 	trx_t*		trx,		/*!< in/out: transaction for import */
2164 	dberr_t		err)		/*!< in: error code */
2165 {
2166 	ut_a(prebuilt->trx != trx);
2167 
2168 	if (err != DB_SUCCESS) {
2169 		dict_table_t* table = prebuilt->table;
2170 		table->file_unreadable = true;
2171 		if (table->space) {
2172 			fil_close_tablespace(table->space_id);
2173 			table->space = NULL;
2174 		}
2175 
2176 		prebuilt->trx->error_info = NULL;
2177 
2178 		ib::info() << "Discarding tablespace of table "
2179 			   << table->name << ": " << err;
2180 
2181 		if (!trx->dict_operation_lock_mode) {
2182 			row_mysql_lock_data_dictionary(trx);
2183 		}
2184 
2185 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2186 		     index;
2187 		     index = UT_LIST_GET_NEXT(indexes, index)) {
2188 			index->page = FIL_NULL;
2189 		}
2190 	}
2191 
2192 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2193 
2194 	DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2195 
2196 	trx_commit_for_mysql(trx);
2197 
2198 	row_mysql_unlock_data_dictionary(trx);
2199 
2200 	trx->free();
2201 
2202 	prebuilt->trx->op_info = "";
2203 
2204 	DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2205 
2206 	return(err);
2207 }
2208 
2209 /*****************************************************************//**
2210 Report error during tablespace import. */
2211 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2212 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2213 row_import_error(
2214 /*=============*/
2215 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2216 	trx_t*		trx,		/*!< in/out: transaction for import */
2217 	dberr_t		err)		/*!< in: error code */
2218 {
2219 	if (!trx_is_interrupted(trx)) {
2220 		char	table_name[MAX_FULL_NAME_LEN + 1];
2221 
2222 		innobase_format_name(
2223 			table_name, sizeof(table_name),
2224 			prebuilt->table->name.m_name);
2225 
2226 		ib_senderrf(
2227 			trx->mysql_thd, IB_LOG_LEVEL_WARN,
2228 			ER_INNODB_IMPORT_ERROR,
2229 			table_name, (ulong) err, ut_strerr(err));
2230 	}
2231 
2232 	return(row_import_cleanup(prebuilt, trx, err));
2233 }
2234 
2235 /*****************************************************************//**
2236 Adjust the root page index node and leaf node segment headers, update
2237 with the new space id. For all the table's secondary indexes.
2238 @return error code */
2239 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2240 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2241 row_import_adjust_root_pages_of_secondary_indexes(
2242 /*==============================================*/
2243 	trx_t*			trx,		/*!< in: transaction used for
2244 						the import */
2245 	dict_table_t*		table,		/*!< in: table the indexes
2246 						belong to */
2247 	const row_import&	cfg)		/*!< Import context */
2248 {
2249 	dict_index_t*		index;
2250 	ulint			n_rows_in_table;
2251 	dberr_t			err = DB_SUCCESS;
2252 
2253 	/* Skip the clustered index. */
2254 	index = dict_table_get_first_index(table);
2255 
2256 	n_rows_in_table = cfg.get_n_rows(index->name);
2257 
2258 	DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2259 			n_rows_in_table++;);
2260 
2261 	/* Adjust the root pages of the secondary indexes only. */
2262 	while ((index = dict_table_get_next_index(index)) != NULL) {
2263 		ut_a(!dict_index_is_clust(index));
2264 
2265 		if (!(index->type & DICT_CORRUPT)
2266 		    && index->page != FIL_NULL) {
2267 
2268 			/* Update the Btree segment headers for index node and
2269 			leaf nodes in the root page. Set the new space id. */
2270 
2271 			err = btr_root_adjust_on_import(index);
2272 		} else {
2273 			ib::warn() << "Skip adjustment of root pages for"
2274 				" index " << index->name << ".";
2275 
2276 			err = DB_CORRUPTION;
2277 		}
2278 
2279 		if (err != DB_SUCCESS) {
2280 
2281 			if (index->type & DICT_CLUSTERED) {
2282 				break;
2283 			}
2284 
2285 			ib_errf(trx->mysql_thd,
2286 				IB_LOG_LEVEL_WARN,
2287 				ER_INNODB_INDEX_CORRUPT,
2288 				"Index %s not found or corrupt,"
2289 				" you should recreate this index.",
2290 				index->name());
2291 
2292 			/* Do not bail out, so that the data
2293 			can be recovered. */
2294 
2295 			err = DB_SUCCESS;
2296 			index->type |= DICT_CORRUPT;
2297 			continue;
2298 		}
2299 
2300 		/* If we failed to purge any records in the index then
2301 		do it the hard way.
2302 
2303 		TODO: We can do this in the first pass by generating UNDO log
2304 		records for the failed rows. */
2305 
2306 		if (!cfg.requires_purge(index->name)) {
2307 			continue;
2308 		}
2309 
2310 		IndexPurge   purge(trx, index);
2311 
2312 		trx->op_info = "secondary: purge delete marked records";
2313 
2314 		err = purge.garbage_collect();
2315 
2316 		trx->op_info = "";
2317 
2318 		if (err != DB_SUCCESS) {
2319 			break;
2320 		} else if (purge.get_n_rows() != n_rows_in_table) {
2321 
2322 			ib_errf(trx->mysql_thd,
2323 				IB_LOG_LEVEL_WARN,
2324 				ER_INNODB_INDEX_CORRUPT,
2325 				"Index '%s' contains " ULINTPF " entries, "
2326 				"should be " ULINTPF ", you should recreate "
2327 				"this index.", index->name(),
2328 				purge.get_n_rows(), n_rows_in_table);
2329 
2330 			index->type |= DICT_CORRUPT;
2331 
2332 			/* Do not bail out, so that the data
2333 			can be recovered. */
2334 
2335 			err = DB_SUCCESS;
2336                 }
2337 	}
2338 
2339 	return(err);
2340 }
2341 
2342 /*****************************************************************//**
2343 Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */
2344 MY_ATTRIBUTE((nonnull)) static
2345 void
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2346 row_import_set_sys_max_row_id(
2347 /*==========================*/
2348 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2349 						handler */
2350 	const dict_table_t*	table)		/*!< in: table to import */
2351 {
2352 	const rec_t*		rec;
2353 	mtr_t			mtr;
2354 	btr_pcur_t		pcur;
2355 	row_id_t		row_id	= 0;
2356 	dict_index_t*		index;
2357 
2358 	index = dict_table_get_first_index(table);
2359 	ut_ad(index->is_primary());
2360 	ut_ad(dict_index_is_auto_gen_clust(index));
2361 
2362 	mtr_start(&mtr);
2363 
2364 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2365 
2366 	btr_pcur_open_at_index_side(
2367 		false,		// High end
2368 		index,
2369 		BTR_SEARCH_LEAF,
2370 		&pcur,
2371 		true,		// Init cursor
2372 		0,		// Leaf level
2373 		&mtr);
2374 
2375 	btr_pcur_move_to_prev_on_page(&pcur);
2376 	rec = btr_pcur_get_rec(&pcur);
2377 
2378 	/* Check for empty table. */
2379 	if (page_rec_is_infimum(rec)) {
2380 		/* The table is empty. */
2381 	} else if (rec_is_metadata(rec, *index)) {
2382 		/* The clustered index contains the metadata record only,
2383 		that is, the table is empty. */
2384 	} else {
2385 		row_id = mach_read_from_6(rec);
2386 	}
2387 
2388 	btr_pcur_close(&pcur);
2389 	mtr_commit(&mtr);
2390 
2391 	if (row_id) {
2392 		/* Update the system row id if the imported index row id is
2393 		greater than the max system row id. */
2394 
2395 		mutex_enter(&dict_sys.mutex);
2396 
2397 		if (row_id >= dict_sys.row_id) {
2398 			dict_sys.row_id = row_id + 1;
2399 			dict_hdr_flush_row_id();
2400 		}
2401 
2402 		mutex_exit(&dict_sys.mutex);
2403 	}
2404 }
2405 
2406 /*****************************************************************//**
2407 Read the a string from the meta data file.
2408 @return DB_SUCCESS or error code. */
2409 static
2410 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2411 row_import_cfg_read_string(
2412 /*=======================*/
2413 	FILE*		file,		/*!< in/out: File to read from */
2414 	byte*		ptr,		/*!< out: string to read */
2415 	ulint		max_len)	/*!< in: maximum length of the output
2416 					buffer in bytes */
2417 {
2418 	DBUG_EXECUTE_IF("ib_import_string_read_error",
2419 			errno = EINVAL; return(DB_IO_ERROR););
2420 
2421 	ulint		len = 0;
2422 
2423 	while (!feof(file)) {
2424 		int	ch = fgetc(file);
2425 
2426 		if (ch == EOF) {
2427 			break;
2428 		} else if (ch != 0) {
2429 			if (len < max_len) {
2430 				ptr[len++] = static_cast<byte>(ch);
2431 			} else {
2432 				break;
2433 			}
2434 		/* max_len includes the NUL byte */
2435 		} else if (len != max_len - 1) {
2436 			break;
2437 		} else {
2438 			ptr[len] = 0;
2439 			return(DB_SUCCESS);
2440 		}
2441 	}
2442 
2443 	errno = EINVAL;
2444 
2445 	return(DB_IO_ERROR);
2446 }
2447 
2448 /*********************************************************************//**
2449 Write the meta data (index user fields) config file.
2450 @return DB_SUCCESS or error code. */
2451 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2452 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2453 row_import_cfg_read_index_fields(
2454 /*=============================*/
2455 	FILE*			file,	/*!< in: file to write to */
2456 	THD*			thd,	/*!< in/out: session */
2457 	row_index_t*		index)	/*!< Index being read in */
2458 {
2459 	byte			row[sizeof(ib_uint32_t) * 3];
2460 	ulint			n_fields = index->m_n_fields;
2461 
2462 	index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2463 
2464 	/* Trigger OOM */
2465 	DBUG_EXECUTE_IF(
2466 		"ib_import_OOM_4",
2467 		UT_DELETE_ARRAY(index->m_fields);
2468 		index->m_fields = NULL;
2469 	);
2470 
2471 	if (index->m_fields == NULL) {
2472 		return(DB_OUT_OF_MEMORY);
2473 	}
2474 
2475 	dict_field_t*	field = index->m_fields;
2476 
2477 	for (ulint i = 0; i < n_fields; ++i, ++field) {
2478 		byte*		ptr = row;
2479 
2480 		/* Trigger EOF */
2481 		DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2482 				(void) fseek(file, 0L, SEEK_END););
2483 
2484 		if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2485 
2486 			ib_senderrf(
2487 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2488 				(ulong) errno, strerror(errno),
2489 				"while reading index fields.");
2490 
2491 			return(DB_IO_ERROR);
2492 		}
2493 
2494 		new (field) dict_field_t();
2495 
2496 		field->prefix_len = mach_read_from_4(ptr) & ((1U << 12) - 1);
2497 		ptr += sizeof(ib_uint32_t);
2498 
2499 		field->fixed_len = mach_read_from_4(ptr) & ((1U << 10) - 1);
2500 		ptr += sizeof(ib_uint32_t);
2501 
2502 		/* Include the NUL byte in the length. */
2503 		ulint	len = mach_read_from_4(ptr);
2504 
2505 		byte*	name = UT_NEW_ARRAY_NOKEY(byte, len);
2506 
2507 		/* Trigger OOM */
2508 		DBUG_EXECUTE_IF(
2509 			"ib_import_OOM_5",
2510 			UT_DELETE_ARRAY(name);
2511 			name = NULL;
2512 		);
2513 
2514 		if (name == NULL) {
2515 			return(DB_OUT_OF_MEMORY);
2516 		}
2517 
2518 		field->name = reinterpret_cast<const char*>(name);
2519 
2520 		dberr_t	err = row_import_cfg_read_string(file, name, len);
2521 
2522 		if (err != DB_SUCCESS) {
2523 
2524 			ib_senderrf(
2525 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2526 				(ulong) errno, strerror(errno),
2527 				"while parsing table name.");
2528 
2529 			return(err);
2530 		}
2531 	}
2532 
2533 	return(DB_SUCCESS);
2534 }
2535 
2536 /*****************************************************************//**
2537 Read the index names and root page numbers of the indexes and set the values.
2538 Row format [root_page_no, len of str, str ... ]
2539 @return DB_SUCCESS or error code. */
2540 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2541 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2542 row_import_read_index_data(
2543 /*=======================*/
2544 	FILE*		file,		/*!< in: File to read from */
2545 	THD*		thd,		/*!< in: session */
2546 	row_import*	cfg)		/*!< in/out: meta-data read */
2547 {
2548 	byte*		ptr;
2549 	row_index_t*	cfg_index;
2550 	byte		row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2551 
2552 	/* FIXME: What is the max value? */
2553 	ut_a(cfg->m_n_indexes > 0);
2554 	ut_a(cfg->m_n_indexes < 1024);
2555 
2556 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2557 
2558 	/* Trigger OOM */
2559 	DBUG_EXECUTE_IF(
2560 		"ib_import_OOM_6",
2561 		UT_DELETE_ARRAY(cfg->m_indexes);
2562 		cfg->m_indexes = NULL;
2563 	);
2564 
2565 	if (cfg->m_indexes == NULL) {
2566 		return(DB_OUT_OF_MEMORY);
2567 	}
2568 
2569 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2570 
2571 	cfg_index = cfg->m_indexes;
2572 
2573 	for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2574 		/* Trigger EOF */
2575 		DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2576 				(void) fseek(file, 0L, SEEK_END););
2577 
2578 		/* Read the index data. */
2579 		size_t	n_bytes = fread(row, 1, sizeof(row), file);
2580 
2581 		/* Trigger EOF */
2582 		DBUG_EXECUTE_IF("ib_import_io_read_error",
2583 				(void) fseek(file, 0L, SEEK_END););
2584 
2585 		if (n_bytes != sizeof(row)) {
2586 			char	msg[BUFSIZ];
2587 
2588 			snprintf(msg, sizeof(msg),
2589 				 "while reading index meta-data, expected "
2590 				 "to read " ULINTPF
2591 				 " bytes but read only " ULINTPF " bytes",
2592 				 sizeof(row), n_bytes);
2593 
2594 			ib_senderrf(
2595 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2596 				(ulong) errno, strerror(errno), msg);
2597 
2598 			ib::error() << "IO Error: " << msg;
2599 
2600 			return(DB_IO_ERROR);
2601 		}
2602 
2603 		ptr = row;
2604 
2605 		cfg_index->m_id = mach_read_from_8(ptr);
2606 		ptr += sizeof(index_id_t);
2607 
2608 		cfg_index->m_space = mach_read_from_4(ptr);
2609 		ptr += sizeof(ib_uint32_t);
2610 
2611 		cfg_index->m_page_no = mach_read_from_4(ptr);
2612 		ptr += sizeof(ib_uint32_t);
2613 
2614 		cfg_index->m_type = mach_read_from_4(ptr);
2615 		ptr += sizeof(ib_uint32_t);
2616 
2617 		cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2618 		if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2619 			ut_ad(0);
2620 			/* Overflow. Pretend that the clustered index
2621 			has a variable-length PRIMARY KEY. */
2622 			cfg_index->m_trx_id_offset = 0;
2623 		}
2624 		ptr += sizeof(ib_uint32_t);
2625 
2626 		cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2627 		ptr += sizeof(ib_uint32_t);
2628 
2629 		cfg_index->m_n_uniq = mach_read_from_4(ptr);
2630 		ptr += sizeof(ib_uint32_t);
2631 
2632 		cfg_index->m_n_nullable = mach_read_from_4(ptr);
2633 		ptr += sizeof(ib_uint32_t);
2634 
2635 		cfg_index->m_n_fields = mach_read_from_4(ptr);
2636 		ptr += sizeof(ib_uint32_t);
2637 
2638 		/* The NUL byte is included in the name length. */
2639 		ulint	len = mach_read_from_4(ptr);
2640 
2641 		if (len > OS_FILE_MAX_PATH) {
2642 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2643 				ER_INNODB_INDEX_CORRUPT,
2644 				"Index name length (" ULINTPF ") is too long, "
2645 				"the meta-data is corrupt", len);
2646 
2647 			return(DB_CORRUPTION);
2648 		}
2649 
2650 		cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2651 
2652 		/* Trigger OOM */
2653 		DBUG_EXECUTE_IF(
2654 			"ib_import_OOM_7",
2655 			UT_DELETE_ARRAY(cfg_index->m_name);
2656 			cfg_index->m_name = NULL;
2657 		);
2658 
2659 		if (cfg_index->m_name == NULL) {
2660 			return(DB_OUT_OF_MEMORY);
2661 		}
2662 
2663 		dberr_t	err;
2664 
2665 		err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2666 
2667 		if (err != DB_SUCCESS) {
2668 
2669 			ib_senderrf(
2670 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2671 				(ulong) errno, strerror(errno),
2672 				"while parsing index name.");
2673 
2674 			return(err);
2675 		}
2676 
2677 		err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2678 
2679 		if (err != DB_SUCCESS) {
2680 			return(err);
2681 		}
2682 
2683 	}
2684 
2685 	return(DB_SUCCESS);
2686 }
2687 
2688 /*****************************************************************//**
2689 Set the index root page number for v1 format.
2690 @return DB_SUCCESS or error code. */
2691 static
2692 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2693 row_import_read_indexes(
2694 /*====================*/
2695 	FILE*		file,		/*!< in: File to read from */
2696 	THD*		thd,		/*!< in: session */
2697 	row_import*	cfg)		/*!< in/out: meta-data read */
2698 {
2699 	byte		row[sizeof(ib_uint32_t)];
2700 
2701 	/* Trigger EOF */
2702 	DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2703 			(void) fseek(file, 0L, SEEK_END););
2704 
2705 	/* Read the number of indexes. */
2706 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2707 		ib_senderrf(
2708 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2709 			(ulong) errno, strerror(errno),
2710 			"while reading number of indexes.");
2711 
2712 		return(DB_IO_ERROR);
2713 	}
2714 
2715 	cfg->m_n_indexes = mach_read_from_4(row);
2716 
2717 	if (cfg->m_n_indexes == 0) {
2718 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2719 			"Number of indexes in meta-data file is 0");
2720 
2721 		return(DB_CORRUPTION);
2722 
2723 	} else if (cfg->m_n_indexes > 1024) {
2724 		// FIXME: What is the upper limit? */
2725 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2726 			"Number of indexes in meta-data file is too high: "
2727 			ULINTPF, cfg->m_n_indexes);
2728 		cfg->m_n_indexes = 0;
2729 
2730 		return(DB_CORRUPTION);
2731 	}
2732 
2733 	return(row_import_read_index_data(file, thd, cfg));
2734 }
2735 
2736 /*********************************************************************//**
2737 Read the meta data (table columns) config file. Deserialise the contents of
2738 dict_col_t structure, along with the column name. */
2739 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2740 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2741 row_import_read_columns(
2742 /*====================*/
2743 	FILE*			file,	/*!< in: file to write to */
2744 	THD*			thd,	/*!< in/out: session */
2745 	row_import*		cfg)	/*!< in/out: meta-data read */
2746 {
2747 	dict_col_t*		col;
2748 	byte			row[sizeof(ib_uint32_t) * 8];
2749 
2750 	/* FIXME: What should the upper limit be? */
2751 	ut_a(cfg->m_n_cols > 0);
2752 	ut_a(cfg->m_n_cols < 1024);
2753 
2754 	cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2755 
2756 	/* Trigger OOM */
2757 	DBUG_EXECUTE_IF(
2758 		"ib_import_OOM_8",
2759 		UT_DELETE_ARRAY(cfg->m_cols);
2760 		cfg->m_cols = NULL;
2761 	);
2762 
2763 	if (cfg->m_cols == NULL) {
2764 		return(DB_OUT_OF_MEMORY);
2765 	}
2766 
2767 	cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2768 
2769 	/* Trigger OOM */
2770 	DBUG_EXECUTE_IF(
2771 		"ib_import_OOM_9",
2772 		UT_DELETE_ARRAY(cfg->m_col_names);
2773 		cfg->m_col_names = NULL;
2774 	);
2775 
2776 	if (cfg->m_col_names == NULL) {
2777 		return(DB_OUT_OF_MEMORY);
2778 	}
2779 
2780 	memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2781 	memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2782 
2783 	col = cfg->m_cols;
2784 
2785 	for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2786 		byte*		ptr = row;
2787 
2788 		/* Trigger EOF */
2789 		DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2790 				(void) fseek(file, 0L, SEEK_END););
2791 
2792 		if (fread(row, 1,  sizeof(row), file) != sizeof(row)) {
2793 			ib_senderrf(
2794 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2795 				(ulong) errno, strerror(errno),
2796 				"while reading table column meta-data.");
2797 
2798 			return(DB_IO_ERROR);
2799 		}
2800 
2801 		col->prtype = mach_read_from_4(ptr);
2802 		ptr += sizeof(ib_uint32_t);
2803 
2804 		col->mtype = static_cast<byte>(mach_read_from_4(ptr));
2805 		ptr += sizeof(ib_uint32_t);
2806 
2807 		col->len = static_cast<uint16_t>(mach_read_from_4(ptr));
2808 		ptr += sizeof(ib_uint32_t);
2809 
2810 		uint32_t mbminmaxlen = mach_read_from_4(ptr);
2811 		col->mbmaxlen = (mbminmaxlen / 5) & 7;
2812 		col->mbminlen = (mbminmaxlen % 5) & 7;
2813 		ptr += sizeof(ib_uint32_t);
2814 
2815 		col->ind = mach_read_from_4(ptr) & dict_index_t::MAX_N_FIELDS;
2816 		ptr += sizeof(ib_uint32_t);
2817 
2818 		col->ord_part = mach_read_from_4(ptr) & 1;
2819 		ptr += sizeof(ib_uint32_t);
2820 
2821 		col->max_prefix = mach_read_from_4(ptr) & ((1U << 12) - 1);
2822 		ptr += sizeof(ib_uint32_t);
2823 
2824 		/* Read in the column name as [len, byte array]. The len
2825 		includes the NUL byte. */
2826 
2827 		ulint		len = mach_read_from_4(ptr);
2828 
2829 		/* FIXME: What is the maximum column name length? */
2830 		if (len == 0 || len > 128) {
2831 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2832 				ER_IO_READ_ERROR,
2833 				"Column name length " ULINTPF ", is invalid",
2834 				len);
2835 
2836 			return(DB_CORRUPTION);
2837 		}
2838 
2839 		cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2840 
2841 		/* Trigger OOM */
2842 		DBUG_EXECUTE_IF(
2843 			"ib_import_OOM_10",
2844 			UT_DELETE_ARRAY(cfg->m_col_names[i]);
2845 			cfg->m_col_names[i] = NULL;
2846 		);
2847 
2848 		if (cfg->m_col_names[i] == NULL) {
2849 			return(DB_OUT_OF_MEMORY);
2850 		}
2851 
2852 		dberr_t	err;
2853 
2854 		err = row_import_cfg_read_string(
2855 			file, cfg->m_col_names[i], len);
2856 
2857 		if (err != DB_SUCCESS) {
2858 
2859 			ib_senderrf(
2860 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2861 				(ulong) errno, strerror(errno),
2862 				"while parsing table column name.");
2863 
2864 			return(err);
2865 		}
2866 	}
2867 
2868 	return(DB_SUCCESS);
2869 }
2870 
2871 /*****************************************************************//**
2872 Read the contents of the <tablespace>.cfg file.
2873 @return DB_SUCCESS or error code. */
2874 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2875 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2876 row_import_read_v1(
2877 /*===============*/
2878 	FILE*		file,		/*!< in: File to read from */
2879 	THD*		thd,		/*!< in: session */
2880 	row_import*	cfg)		/*!< out: meta data */
2881 {
2882 	byte		value[sizeof(ib_uint32_t)];
2883 
2884 	/* Trigger EOF */
2885 	DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2886 			(void) fseek(file, 0L, SEEK_END););
2887 
2888 	/* Read the hostname where the tablespace was exported. */
2889 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2890 		ib_senderrf(
2891 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2892 			(ulong) errno, strerror(errno),
2893 			"while reading meta-data export hostname length.");
2894 
2895 		return(DB_IO_ERROR);
2896 	}
2897 
2898 	ulint	len = mach_read_from_4(value);
2899 
2900 	/* NUL byte is part of name length. */
2901 	cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2902 
2903 	/* Trigger OOM */
2904 	DBUG_EXECUTE_IF(
2905 		"ib_import_OOM_1",
2906 		UT_DELETE_ARRAY(cfg->m_hostname);
2907 		cfg->m_hostname = NULL;
2908 	);
2909 
2910 	if (cfg->m_hostname == NULL) {
2911 		return(DB_OUT_OF_MEMORY);
2912 	}
2913 
2914 	dberr_t	err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2915 
2916 	if (err != DB_SUCCESS) {
2917 
2918 		ib_senderrf(
2919 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2920 			(ulong) errno, strerror(errno),
2921 			"while parsing export hostname.");
2922 
2923 		return(err);
2924 	}
2925 
2926 	/* Trigger EOF */
2927 	DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2928 			(void) fseek(file, 0L, SEEK_END););
2929 
2930 	/* Read the table name of tablespace that was exported. */
2931 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2932 		ib_senderrf(
2933 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2934 			(ulong) errno, strerror(errno),
2935 			"while reading meta-data table name length.");
2936 
2937 		return(DB_IO_ERROR);
2938 	}
2939 
2940 	len = mach_read_from_4(value);
2941 
2942 	/* NUL byte is part of name length. */
2943 	cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2944 
2945 	/* Trigger OOM */
2946 	DBUG_EXECUTE_IF(
2947 		"ib_import_OOM_2",
2948 		UT_DELETE_ARRAY(cfg->m_table_name);
2949 		cfg->m_table_name = NULL;
2950 	);
2951 
2952 	if (cfg->m_table_name == NULL) {
2953 		return(DB_OUT_OF_MEMORY);
2954 	}
2955 
2956 	err = row_import_cfg_read_string(file, cfg->m_table_name, len);
2957 
2958 	if (err != DB_SUCCESS) {
2959 		ib_senderrf(
2960 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2961 			(ulong) errno, strerror(errno),
2962 			"while parsing table name.");
2963 
2964 		return(err);
2965 	}
2966 
2967 	ib::info() << "Importing tablespace for table '" << cfg->m_table_name
2968 		<< "' that was exported from host '" << cfg->m_hostname << "'";
2969 
2970 	byte		row[sizeof(ib_uint32_t) * 3];
2971 
2972 	/* Trigger EOF */
2973 	DBUG_EXECUTE_IF("ib_import_io_read_error_7",
2974 			(void) fseek(file, 0L, SEEK_END););
2975 
2976 	/* Read the autoinc value. */
2977 	if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
2978 		ib_senderrf(
2979 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2980 			(ulong) errno, strerror(errno),
2981 			"while reading autoinc value.");
2982 
2983 		return(DB_IO_ERROR);
2984 	}
2985 
2986 	cfg->m_autoinc = mach_read_from_8(row);
2987 
2988 	/* Trigger EOF */
2989 	DBUG_EXECUTE_IF("ib_import_io_read_error_8",
2990 			(void) fseek(file, 0L, SEEK_END););
2991 
2992 	/* Read the tablespace page size. */
2993 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2994 		ib_senderrf(
2995 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2996 			(ulong) errno, strerror(errno),
2997 			"while reading meta-data header.");
2998 
2999 		return(DB_IO_ERROR);
3000 	}
3001 
3002 	byte*		ptr = row;
3003 
3004 	const ulint	logical_page_size = mach_read_from_4(ptr);
3005 	ptr += sizeof(ib_uint32_t);
3006 
3007 	if (logical_page_size != srv_page_size) {
3008 
3009 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3010 			"Tablespace to be imported has a different"
3011 			" page size than this server. Server page size"
3012 			" is %lu, whereas tablespace page size"
3013 			" is " ULINTPF,
3014 			srv_page_size,
3015 			logical_page_size);
3016 
3017 		return(DB_ERROR);
3018 	}
3019 
3020 	cfg->m_flags = mach_read_from_4(ptr);
3021 	ptr += sizeof(ib_uint32_t);
3022 
3023 	cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags);
3024 	cfg->m_n_cols = mach_read_from_4(ptr);
3025 
3026 	if (!dict_tf_is_valid(cfg->m_flags)) {
3027 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
3028 			ER_TABLE_SCHEMA_MISMATCH,
3029 			"Invalid table flags: " ULINTPF, cfg->m_flags);
3030 
3031 		return(DB_CORRUPTION);
3032 	}
3033 
3034 	err = row_import_read_columns(file, thd, cfg);
3035 
3036 	if (err == DB_SUCCESS) {
3037 		err = row_import_read_indexes(file, thd, cfg);
3038 	}
3039 
3040 	return(err);
3041 }
3042 
3043 /**
3044 Read the contents of the <tablespace>.cfg file.
3045 @return DB_SUCCESS or error code. */
3046 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3047 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3048 row_import_read_meta_data(
3049 /*======================*/
3050 	FILE*		file,		/*!< in: File to read from */
3051 	THD*		thd,		/*!< in: session */
3052 	row_import&	cfg)		/*!< out: contents of the .cfg file */
3053 {
3054 	byte		row[sizeof(ib_uint32_t)];
3055 
3056 	/* Trigger EOF */
3057 	DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3058 			(void) fseek(file, 0L, SEEK_END););
3059 
3060 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3061 		ib_senderrf(
3062 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3063 			(ulong) errno, strerror(errno),
3064 			"while reading meta-data version.");
3065 
3066 		return(DB_IO_ERROR);
3067 	}
3068 
3069 	cfg.m_version = mach_read_from_4(row);
3070 
3071 	/* Check the version number. */
3072 	switch (cfg.m_version) {
3073 	case IB_EXPORT_CFG_VERSION_V1:
3074 
3075 		return(row_import_read_v1(file, thd, &cfg));
3076 	default:
3077 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3078 			"Unsupported meta-data version number (" ULINTPF "), "
3079 			"file ignored", cfg.m_version);
3080 	}
3081 
3082 	return(DB_ERROR);
3083 }
3084 
/* Layout of the BLOB part header. handle_instant_metadata() reads these
fields at offset FIL_PAGE_DATA of a FIL_PAGE_TYPE_BLOB page. */
#define BTR_BLOB_HDR_PART_LEN 0 /*!< BLOB part len on this page */
#define BTR_BLOB_HDR_NEXT_PAGE_NO 4 /*!< next BLOB part page no,
                                    FIL_NULL if none */
#define BTR_BLOB_HDR_SIZE 8 /*!< Size of a BLOB part header, in bytes */
3089 
3090 /* decrypt and decompress page if needed */
decrypt_decompress(fil_space_crypt_t * space_crypt,size_t space_flags,span<byte> page,size_t space_id,byte * page_compress_buf)3091 static dberr_t decrypt_decompress(fil_space_crypt_t *space_crypt,
3092                                   size_t space_flags, span<byte> page,
3093                                   size_t space_id, byte *page_compress_buf)
3094 {
3095   auto *data= page.data();
3096 
3097   if (space_crypt && space_crypt->should_encrypt())
3098   {
3099     if (!buf_page_verify_crypt_checksum(data, space_flags))
3100       return DB_CORRUPTION;
3101 
3102     if (dberr_t err= fil_space_decrypt(space_id, space_crypt, data,
3103                                        page.size(), space_flags, data))
3104       return err;
3105   }
3106 
3107   bool page_compressed= false;
3108 
3109   if (fil_space_t::full_crc32(space_flags) &&
3110       fil_space_t::is_compressed(space_flags))
3111     page_compressed= buf_page_is_compressed(data, space_flags);
3112   else
3113   {
3114     switch (fil_page_get_type(data)) {
3115     case FIL_PAGE_PAGE_COMPRESSED:
3116     case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
3117       page_compressed= true;
3118     }
3119   }
3120 
3121   if (page_compressed)
3122   {
3123     auto compress_length=
3124       fil_page_decompress(page_compress_buf, data, space_flags);
3125     ut_ad(compress_length != srv_page_size);
3126 
3127     if (compress_length == 0)
3128       return DB_CORRUPTION;
3129   }
3130 
3131   return DB_SUCCESS;
3132 }
3133 
get_buf_size()3134 static size_t get_buf_size()
3135 {
3136   return srv_page_size
3137 #ifdef HAVE_LZO
3138          + LZO1X_1_15_MEM_COMPRESS
3139 #elif defined HAVE_SNAPPY
3140          + snappy_max_compressed_length(srv_page_size)
3141 #endif
3142       ;
3143 }
3144 
3145 /* find, parse instant metadata, performing variaous checks,
3146 and apply it to dict_table_t
3147 @return DB_SUCCESS or some error */
handle_instant_metadata(dict_table_t * table,const row_import & cfg)3148 static dberr_t handle_instant_metadata(dict_table_t *table,
3149                                        const row_import &cfg)
3150 {
3151   dict_get_and_save_data_dir_path(table, false);
3152 
3153   char *filepath;
3154   if (DICT_TF_HAS_DATA_DIR(table->flags))
3155   {
3156     ut_a(table->data_dir_path);
3157 
3158     filepath=
3159         fil_make_filepath(table->data_dir_path, table->name.m_name, IBD, true);
3160   }
3161   else
3162     filepath= fil_make_filepath(nullptr, table->name.m_name, IBD, false);
3163 
3164   if (!filepath)
3165     return DB_OUT_OF_MEMORY;
3166 
3167   SCOPE_EXIT([filepath]() { ut_free(filepath); });
3168 
3169   bool success;
3170   auto file= os_file_create_simple_no_error_handling(
3171       innodb_data_file_key, filepath, OS_FILE_OPEN, OS_FILE_READ_WRITE, false,
3172       &success);
3173   if (!success)
3174     return DB_IO_ERROR;
3175 
3176   if (os_file_get_size(file) < srv_page_size * 4)
3177     return DB_CORRUPTION;
3178 
3179   SCOPE_EXIT([&file]() { os_file_close(file); });
3180 
3181   std::unique_ptr<byte[], decltype(&aligned_free)> first_page(
3182       static_cast<byte *>(aligned_malloc(srv_page_size, srv_page_size)),
3183       &aligned_free);
3184 
3185   if (dberr_t err= os_file_read_no_error_handling(IORequestReadPartial,
3186                                                   file, first_page.get(), 0,
3187                                                   srv_page_size, nullptr))
3188     return err;
3189 
3190   auto space_flags= fsp_header_get_flags(first_page.get());
3191 
3192   if (!fil_space_t::is_valid_flags(space_flags, true))
3193   {
3194     auto cflags= fsp_flags_convert_from_101(space_flags);
3195     if (cflags == ULINT_UNDEFINED)
3196     {
3197       ib::error() << "Invalid FSP_SPACE_FLAGS=" << ib::hex(space_flags);
3198       return DB_CORRUPTION;
3199     }
3200     space_flags= static_cast<decltype(space_flags)>(cflags);
3201   }
3202 
3203   if (!cfg.m_missing)
3204   {
3205     if (dberr_t err= cfg.match_flags(current_thd))
3206       return err;
3207   }
3208 
3209   const unsigned zip_size= fil_space_t::zip_size(space_flags);
3210   const unsigned physical_size= zip_size ? zip_size : unsigned(srv_page_size);
3211   ut_ad(physical_size <= UNIV_PAGE_SIZE_MAX);
3212   const uint32_t space_id= page_get_space_id(first_page.get());
3213 
3214   auto *space_crypt= fil_space_read_crypt_data(zip_size, first_page.get());
3215   SCOPE_EXIT([&space_crypt]() {
3216     if (space_crypt)
3217       fil_space_destroy_crypt_data(&space_crypt);
3218   });
3219 
3220   std::unique_ptr<byte[], decltype(&aligned_free)> page(
3221       static_cast<byte *>(
3222           aligned_malloc(UNIV_PAGE_SIZE_MAX, UNIV_PAGE_SIZE_MAX)),
3223       &aligned_free);
3224 
3225   if (dberr_t err= os_file_read_no_error_handling(
3226           IORequestReadPartial, file, page.get(), 3 * physical_size,
3227           physical_size, nullptr))
3228     return err;
3229 
3230   std::unique_ptr<byte[]> page_compress_buf(new byte[get_buf_size()]);
3231 
3232   if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3233                                       {page.get(), static_cast<size_t>
3234                                        (physical_size)},
3235                                       space_id, page_compress_buf.get()))
3236     return err;
3237 
3238   if (table->supports_instant())
3239   {
3240     dict_index_t *index= dict_table_get_first_index(table);
3241 
3242     auto tmp1= table->space_id;
3243     table->space_id= page_get_space_id(page.get());
3244     SCOPE_EXIT([tmp1, table]() { table->space_id= tmp1; });
3245 
3246     auto tmp2= index->page;
3247     index->page= page_get_page_no(page.get());
3248     SCOPE_EXIT([tmp2, index]() { index->page= tmp2; });
3249 
3250     if (!page_is_comp(page.get()) != !dict_table_is_comp(table))
3251     {
3252       ib_errf(current_thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3253               "ROW_FORMAT mismatch");
3254       return DB_CORRUPTION;
3255     }
3256 
3257     if (btr_cur_instant_root_init(index, page.get()))
3258       return DB_ERROR;
3259 
3260     ut_ad(index->n_core_null_bytes != dict_index_t::NO_CORE_NULL_BYTES);
3261 
3262     if (fil_page_get_type(page.get()) == FIL_PAGE_INDEX)
3263     {
3264       ut_ad(!index->is_instant());
3265       return DB_SUCCESS;
3266     }
3267 
3268     mem_heap_t *heap= NULL;
3269     SCOPE_EXIT([&heap]() {
3270       if (heap)
3271         mem_heap_free(heap);
3272     });
3273 
3274     while (btr_page_get_level(page.get()) != 0)
3275     {
3276       const rec_t *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3277 
3278       /* Relax the assertion in rec_init_offsets(). */
3279       ut_ad(!index->in_instant_init);
3280       ut_d(index->in_instant_init= true);
3281       rec_offs *offsets=
3282           rec_get_offsets(rec, index, nullptr, 0, ULINT_UNDEFINED, &heap);
3283       ut_d(index->in_instant_init= false);
3284 
3285       uint64_t child_page_no= btr_node_ptr_get_child_page_no(rec, offsets);
3286 
3287       if (dberr_t err=
3288           os_file_read_no_error_handling(IORequestReadPartial, file,
3289                                          page.get(),
3290                                          child_page_no * physical_size,
3291                                          physical_size, nullptr))
3292         return err;
3293 
3294       if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3295                                           {page.get(), static_cast<size_t>
3296                                            (physical_size)}, space_id,
3297                                           page_compress_buf.get()))
3298         return err;
3299     }
3300 
3301     const auto *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
3302     const auto comp= dict_table_is_comp(index->table);
3303     const auto info_bits= rec_get_info_bits(rec, comp);
3304 
3305     if (page_rec_is_supremum(rec) || !(info_bits & REC_INFO_MIN_REC_FLAG))
3306     {
3307       ib::error() << "Table " << index->table->name
3308                   << " is missing instant ALTER metadata";
3309       index->table->corrupted= true;
3310       return DB_CORRUPTION;
3311     }
3312 
3313     if ((info_bits & ~REC_INFO_DELETED_FLAG) != REC_INFO_MIN_REC_FLAG ||
3314         (comp && rec_get_status(rec) != REC_STATUS_INSTANT))
3315     {
3316     incompatible:
3317       ib::error() << "Table " << index->table->name
3318                   << " contains unrecognizable instant ALTER metadata";
3319       index->table->corrupted= true;
3320       return DB_CORRUPTION;
3321     }
3322 
3323     if (info_bits & REC_INFO_DELETED_FLAG)
3324     {
3325       ulint trx_id_offset= index->trx_id_offset;
3326       ut_ad(index->n_uniq);
3327 
3328       if (trx_id_offset)
3329       {
3330       }
3331       else if (index->table->not_redundant())
3332       {
3333 
3334         for (uint i= index->n_uniq; i--;)
3335           trx_id_offset+= index->fields[i].fixed_len;
3336       }
3337       else if (rec_get_1byte_offs_flag(rec))
3338       {
3339         trx_id_offset= rec_1_get_field_end_info(rec, index->n_uniq - 1);
3340         ut_ad(!(trx_id_offset & REC_1BYTE_SQL_NULL_MASK));
3341         trx_id_offset&= ~REC_1BYTE_SQL_NULL_MASK;
3342       }
3343       else
3344       {
3345         trx_id_offset= rec_2_get_field_end_info(rec, index->n_uniq - 1);
3346         ut_ad(!(trx_id_offset & REC_2BYTE_SQL_NULL_MASK));
3347         trx_id_offset&= ~REC_2BYTE_SQL_NULL_MASK;
3348       }
3349 
3350       const byte *ptr=
3351           rec + trx_id_offset + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
3352 
3353       if (mach_read_from_4(ptr + BTR_EXTERN_LEN))
3354         goto incompatible;
3355 
3356       uint len= mach_read_from_4(ptr + BTR_EXTERN_LEN + 4);
3357       if (!len || mach_read_from_4(ptr + BTR_EXTERN_OFFSET) != FIL_PAGE_DATA)
3358         goto incompatible;
3359 
3360       std::unique_ptr<byte[], decltype(&aligned_free)>
3361         second_page(static_cast<byte*>(aligned_malloc(physical_size,
3362                                                       physical_size)),
3363                     &aligned_free);
3364 
3365       if (dberr_t err=
3366           os_file_read_no_error_handling(IORequestReadPartial, file,
3367                                          second_page.get(), physical_size *
3368                                          mach_read_from_4(ptr +
3369                                                           BTR_EXTERN_PAGE_NO),
3370                                          srv_page_size, nullptr))
3371         return err;
3372 
3373       if (dberr_t err= decrypt_decompress(space_crypt, space_flags,
3374                                           {second_page.get(),
3375                                            static_cast<size_t>(physical_size)},
3376                                           space_id, page_compress_buf.get()))
3377         return err;
3378 
3379       if (fil_page_get_type(second_page.get()) != FIL_PAGE_TYPE_BLOB ||
3380           mach_read_from_4(
3381               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_NEXT_PAGE_NO]) !=
3382               FIL_NULL ||
3383           mach_read_from_4(
3384               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_PART_LEN]) != len)
3385         goto incompatible;
3386 
3387       /* The unused part of the BLOB page should be zero-filled. */
3388       for (const byte *
3389                b= second_page.get() + (FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE) +
3390                   len,
3391               *const end= second_page.get() + srv_page_size - BTR_EXTERN_LEN;
3392            b < end;)
3393       {
3394         if (*b++)
3395           goto incompatible;
3396       }
3397 
3398       if (index->table->deserialise_columns(
3399               &second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE], len))
3400         goto incompatible;
3401     }
3402 
3403     rec_offs *offsets= rec_get_offsets(
3404         rec, index, nullptr, index->n_core_fields, ULINT_UNDEFINED, &heap);
3405     if (rec_offs_any_default(offsets))
3406     {
3407     inconsistent:
3408       goto incompatible;
3409     }
3410 
3411     /* In fact, because we only ever append fields to the metadata
3412     record, it is also OK to perform READ UNCOMMITTED and
3413     then ignore any extra fields, provided that
3414     trx_sys.is_registered(DB_TRX_ID). */
3415     if (rec_offs_n_fields(offsets) >
3416             ulint(index->n_fields) + !!index->table->instant &&
3417         !trx_sys.is_registered(current_trx(),
3418                                row_get_rec_trx_id(rec, index, offsets)))
3419       goto inconsistent;
3420 
3421     for (unsigned i= index->n_core_fields; i < index->n_fields; i++)
3422     {
3423       dict_col_t *col= index->fields[i].col;
3424       const unsigned o= i + !!index->table->instant;
3425       ulint len;
3426       const byte *data= rec_get_nth_field(rec, offsets, o, &len);
3427       ut_ad(!col->is_added());
3428       ut_ad(!col->def_val.data);
3429       col->def_val.len= len;
3430       switch (len) {
3431       case UNIV_SQL_NULL:
3432         continue;
3433       case 0:
3434         col->def_val.data= field_ref_zero;
3435         continue;
3436       }
3437       ut_ad(len != UNIV_SQL_DEFAULT);
3438       if (!rec_offs_nth_extern(offsets, o))
3439         col->def_val.data= mem_heap_dup(index->table->heap, data, len);
3440       else if (len < BTR_EXTERN_FIELD_REF_SIZE ||
3441                !memcmp(data + len - BTR_EXTERN_FIELD_REF_SIZE, field_ref_zero,
3442                        BTR_EXTERN_FIELD_REF_SIZE))
3443       {
3444         col->def_val.len= UNIV_SQL_DEFAULT;
3445         goto inconsistent;
3446       }
3447       else
3448       {
3449         col->def_val.data= btr_copy_externally_stored_field(
3450             &col->def_val.len, data, srv_page_size, len, index->table->heap);
3451       }
3452     }
3453   }
3454 
3455   return DB_SUCCESS;
3456 }
3457 
3458 /**
3459 Read the contents of the <tablename>.cfg file.
3460 @return DB_SUCCESS or error code. */
3461 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3462 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3463 row_import_read_cfg(
3464 /*================*/
3465 	dict_table_t*	table,	/*!< in: table */
3466 	THD*		thd,	/*!< in: session */
3467 	row_import&	cfg)	/*!< out: contents of the .cfg file */
3468 {
3469 	dberr_t		err;
3470 	char		name[OS_FILE_MAX_PATH];
3471 
3472 	cfg.m_table = table;
3473 
3474 	srv_get_meta_data_filename(table, name, sizeof(name));
3475 
3476 	FILE*	file = fopen(name, "rb");
3477 
3478 	if (file == NULL) {
3479 		char	msg[BUFSIZ];
3480 
3481 		snprintf(msg, sizeof(msg),
3482 			 "Error opening '%s', will attempt to import"
3483 			 " without schema verification", name);
3484 
3485 		ib_senderrf(
3486 			thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3487 			(ulong) errno, strerror(errno), msg);
3488 
3489 		cfg.m_missing = true;
3490 
3491 		err = DB_FAIL;
3492 	} else {
3493 
3494 		cfg.m_missing = false;
3495 
3496 		err = row_import_read_meta_data(file, thd, cfg);
3497 		fclose(file);
3498 	}
3499 
3500 	return(err);
3501 }
3502 
3503 /** Update the root page numbers and tablespace ID of a table.
3504 @param[in,out]	trx	dictionary transaction
3505 @param[in,out]	table	persistent table
3506 @param[in]	reset	whether to reset the fields to FIL_NULL
3507 @return DB_SUCCESS or error code */
3508 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3509 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3510 {
3511 	const dict_index_t*	index;
3512 	que_t*			graph = 0;
3513 	dberr_t			err = DB_SUCCESS;
3514 
3515 	ut_ad(reset || table->space->id == table->space_id);
3516 
3517 	static const char	sql[] = {
3518 		"PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3519 		"BEGIN\n"
3520 		"UPDATE SYS_INDEXES\n"
3521 		"SET SPACE = :space,\n"
3522 		"    PAGE_NO = :page,\n"
3523 		"    TYPE = :type\n"
3524 		"WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3525 		"END;\n"};
3526 
3527 	table->def_trx_id = trx->id;
3528 
3529 	for (index = dict_table_get_first_index(table);
3530 	     index != 0;
3531 	     index = dict_table_get_next_index(index)) {
3532 
3533 		pars_info_t*	info;
3534 		ib_uint32_t	page;
3535 		ib_uint32_t	space;
3536 		ib_uint32_t	type;
3537 		index_id_t	index_id;
3538 		table_id_t	table_id;
3539 
3540 		info = (graph != 0) ? graph->info : pars_info_create();
3541 
3542 		mach_write_to_4(
3543 			reinterpret_cast<byte*>(&type),
3544 			index->type);
3545 
3546 		mach_write_to_4(
3547 			reinterpret_cast<byte*>(&page),
3548 			reset ? FIL_NULL : index->page);
3549 
3550 		mach_write_to_4(
3551 			reinterpret_cast<byte*>(&space),
3552 			reset ? FIL_NULL : index->table->space_id);
3553 
3554 		mach_write_to_8(
3555 			reinterpret_cast<byte*>(&index_id),
3556 			index->id);
3557 
3558 		mach_write_to_8(
3559 			reinterpret_cast<byte*>(&table_id),
3560 			table->id);
3561 
3562 		/* If we set the corrupt bit during the IMPORT phase then
3563 		we need to update the system tables. */
3564 		pars_info_bind_int4_literal(info, "type", &type);
3565 		pars_info_bind_int4_literal(info, "space", &space);
3566 		pars_info_bind_int4_literal(info, "page", &page);
3567 		pars_info_bind_ull_literal(info, "index_id", &index_id);
3568 		pars_info_bind_ull_literal(info, "table_id", &table_id);
3569 
3570 		if (graph == 0) {
3571 			graph = pars_sql(info, sql);
3572 			ut_a(graph);
3573 			graph->trx = trx;
3574 		}
3575 
3576 		que_thr_t*	thr;
3577 
3578 		graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3579 
3580 		ut_a(thr = que_fork_start_command(graph));
3581 
3582 		que_run_threads(thr);
3583 
3584 		DBUG_EXECUTE_IF("ib_import_internal_error",
3585 				trx->error_state = DB_ERROR;);
3586 
3587 		err = trx->error_state;
3588 
3589 		if (err != DB_SUCCESS) {
3590 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3591 				ER_INTERNAL_ERROR,
3592 				"While updating the <space, root page"
3593 				" number> of index %s - %s",
3594 				index->name(), ut_strerr(err));
3595 
3596 			break;
3597 		}
3598 	}
3599 
3600 	que_graph_free(graph);
3601 
3602 	return(err);
3603 }
3604 
/** Callback arg for row_import_set_discarded.
It is filled in by row_import_update_discarded_flag() before the query
is run, and updated by the fetch callback for the row that is fetched. */
struct discard_t {
	ib_uint32_t	flags2;			/*!< Value read from column,
						with DICT_TF2_DISCARDED
						adjusted; stored with
						mach_write_to_4() */
	bool		state;			/*!< New state of the flag:
						true=set, false=clear */
	ulint		n_recs;			/*!< Number of recs processed */
};
3611 
3612 /******************************************************************//**
3613 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3614 SYS_TABLES. The flags is stored in MIX_LEN column.
3615 @return FALSE if all OK */
3616 static
3617 ibool
row_import_set_discarded(void * row,void * user_arg)3618 row_import_set_discarded(
3619 /*=====================*/
3620 	void*		row,			/*!< in: sel_node_t* */
3621 	void*		user_arg)		/*!< in: bool set/unset flag */
3622 {
3623 	sel_node_t*	node = static_cast<sel_node_t*>(row);
3624 	discard_t*	discard = static_cast<discard_t*>(user_arg);
3625 	dfield_t*	dfield = que_node_get_val(node->select_list);
3626 	dtype_t*	type = dfield_get_type(dfield);
3627 	ulint		len = dfield_get_len(dfield);
3628 
3629 	ut_a(dtype_get_mtype(type) == DATA_INT);
3630 	ut_a(len == sizeof(ib_uint32_t));
3631 
3632 	ulint	flags2 = mach_read_from_4(
3633 		static_cast<byte*>(dfield_get_data(dfield)));
3634 
3635 	if (discard->state) {
3636 		flags2 |= DICT_TF2_DISCARDED;
3637 	} else {
3638 		flags2 &= ~DICT_TF2_DISCARDED;
3639 	}
3640 
3641 	mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3642 
3643 	++discard->n_recs;
3644 
3645 	/* There should be at most one matching record. */
3646 	ut_a(discard->n_recs == 1);
3647 
3648 	return(FALSE);
3649 }
3650 
3651 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3652 @param[in,out]	trx		dictionary transaction
3653 @param[in]	table_id	table identifier
3654 @param[in]	discarded	whether to set or clear the flag
3655 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3656 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3657 					 bool discarded)
3658 {
3659 	pars_info_t*		info;
3660 	discard_t		discard;
3661 
3662 	static const char	sql[] =
3663 		"PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3664 		"DECLARE FUNCTION my_func;\n"
3665 		"DECLARE CURSOR c IS\n"
3666 		" SELECT MIX_LEN"
3667 		" FROM SYS_TABLES"
3668 		" WHERE ID = :table_id FOR UPDATE;"
3669 		"\n"
3670 		"BEGIN\n"
3671 		"OPEN c;\n"
3672 		"WHILE 1 = 1 LOOP\n"
3673 		"  FETCH c INTO my_func();\n"
3674 		"  IF c % NOTFOUND THEN\n"
3675 		"    EXIT;\n"
3676 		"  END IF;\n"
3677 		"END LOOP;\n"
3678 		"UPDATE SYS_TABLES"
3679 		" SET MIX_LEN = :flags2"
3680 		" WHERE ID = :table_id;\n"
3681 		"CLOSE c;\n"
3682 		"END;\n";
3683 
3684 	discard.n_recs = 0;
3685 	discard.state = discarded;
3686 	discard.flags2 = ULINT32_UNDEFINED;
3687 
3688 	info = pars_info_create();
3689 
3690 	pars_info_add_ull_literal(info, "table_id", table_id);
3691 	pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3692 
3693 	pars_info_bind_function(
3694 		info, "my_func", row_import_set_discarded, &discard);
3695 
3696 	dberr_t	err = que_eval_sql(info, sql, false, trx);
3697 
3698 	ut_a(discard.n_recs == 1);
3699 	ut_a(discard.flags2 != ULINT32_UNDEFINED);
3700 
3701 	return(err);
3702 }
3703 
3704 /** InnoDB writes page by page when there is page compressed
3705 tablespace involved. It does help to save the disk space when
3706 punch hole is enabled
3707 @param iter     Tablespace iterator
3708 @param full_crc32    whether the file is in the full_crc32 format
3709 @param offset   offset of the file to be written
3710 @param writeptr buffer to be written
3711 @param n_bytes  number of bytes to be written
3712 @param try_punch_only   Try the range punch only because the
3713                         current range is full of empty pages
3714 @return DB_SUCCESS */
3715 static
fil_import_compress_fwrite(const fil_iterator_t & iter,bool full_crc32,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3716 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3717                                    bool full_crc32,
3718                                    os_offset_t offset,
3719                                    const byte *writeptr,
3720                                    ulint n_bytes,
3721                                    bool try_punch_only= false)
3722 {
3723   if (dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes))
3724     return err;
3725 
3726   if (try_punch_only)
3727     return DB_SUCCESS;
3728 
3729   for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3730   {
3731     /* Read the original data length from block and
3732     safer to read FIL_PAGE_COMPRESSED_SIZE because it
3733     is not encrypted*/
3734     ulint n_write_bytes= srv_page_size;
3735     if (j || offset)
3736     {
3737       n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3738       const unsigned ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3739       /* Ignore the empty page */
3740       if (ptype == 0 && n_write_bytes == 0)
3741         continue;
3742       if (full_crc32)
3743         n_write_bytes= buf_page_full_crc32_size(writeptr + j,
3744                                                 nullptr, nullptr);
3745       else
3746       {
3747         n_write_bytes+= ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
3748           ? FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN
3749           : FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
3750       }
3751     }
3752 
3753     if (dberr_t err= os_file_write(IORequestWrite, iter.filepath, iter.file,
3754                                    writeptr + j, offset + j, n_write_bytes))
3755       return err;
3756   }
3757 
3758   return DB_SUCCESS;
3759 }
3760 
run(const fil_iterator_t & iter,buf_block_t * block)3761 dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
3762                                  buf_block_t* block) UNIV_NOTHROW
3763 {
3764   const unsigned zip_size= fil_space_t::zip_size(m_space_flags);
3765   const unsigned size= zip_size ? zip_size : unsigned(srv_page_size);
3766   byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3767   const bool full_crc32 = fil_space_t::full_crc32(m_space_flags);
3768   bool skip_checksum_check = false;
3769   ut_ad(!srv_read_only_mode);
3770 
3771   if (!page_compress_buf)
3772     return DB_OUT_OF_MEMORY;
3773 
3774   const bool encrypted= iter.crypt_data != NULL &&
3775     iter.crypt_data->should_encrypt();
3776   byte* const readptr= iter.io_buffer;
3777   block->frame= readptr;
3778 
3779   if (block->page.zip.data)
3780     block->page.zip.data= readptr;
3781 
3782   bool page_compressed= false;
3783 
3784   dberr_t err= os_file_read_no_error_handling(
3785     IORequestReadPartial, iter.file, readptr, 3 * size, size, 0);
3786   if (err != DB_SUCCESS)
3787   {
3788     ib::error() << iter.filepath << ": os_file_read() failed";
3789     goto func_exit;
3790   }
3791 
3792   if (page_get_page_no(readptr) != 3)
3793   {
3794 page_corrupted:
3795     ib::warn() << filename() << ": Page 3 at offset "
3796                << 3 * size << " looks corrupted.";
3797     err= DB_CORRUPTION;
3798     goto func_exit;
3799   }
3800 
3801   block->page.id_.set_page_no(3);
3802   if (full_crc32 && fil_space_t::is_compressed(m_space_flags))
3803     page_compressed= buf_page_is_compressed(readptr, m_space_flags);
3804   else
3805   {
3806     switch (fil_page_get_type(readptr)) {
3807     case FIL_PAGE_PAGE_COMPRESSED:
3808     case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
3809       if (block->page.zip.data)
3810         goto page_corrupted;
3811       page_compressed= true;
3812     }
3813   }
3814 
3815   if (encrypted)
3816   {
3817     if (!buf_page_verify_crypt_checksum(readptr, m_space_flags))
3818       goto page_corrupted;
3819 
3820     if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
3821         buf_page_get_key_version(readptr, m_space_flags))
3822       goto page_corrupted;
3823 
3824     if ((err= fil_space_decrypt(get_space_id(), iter.crypt_data, readptr, size,
3825                                 m_space_flags, readptr)))
3826       goto func_exit;
3827   }
3828 
3829   /* For full_crc32 format, skip checksum check
3830   after decryption. */
3831   skip_checksum_check= full_crc32 && encrypted;
3832 
3833   if (page_compressed)
3834   {
3835     ulint compress_length= fil_page_decompress(page_compress_buf,
3836                                                readptr,
3837                                                m_space_flags);
3838     ut_ad(compress_length != srv_page_size);
3839     if (compress_length == 0)
3840       goto page_corrupted;
3841   }
3842   else if (!skip_checksum_check
3843            && buf_page_is_corrupted(false, readptr, m_space_flags))
3844     goto page_corrupted;
3845 
3846   err= this->operator()(block);
3847 func_exit:
3848   free(page_compress_buf);
3849   return err;
3850 }
3851 
fil_iterate(const fil_iterator_t & iter,buf_block_t * block,AbstractCallback & callback)3852 static dberr_t fil_iterate(
3853 	const fil_iterator_t&	iter,
3854 	buf_block_t*		block,
3855 	AbstractCallback&	callback)
3856 {
3857 	os_offset_t		offset;
3858 	const ulint		size = callback.physical_size();
3859 	ulint			n_bytes = iter.n_io_buffers * size;
3860 
3861 	byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
3862 	ut_ad(!srv_read_only_mode);
3863 
3864 	if (!page_compress_buf) {
3865 		return DB_OUT_OF_MEMORY;
3866 	}
3867 
3868 	ulint actual_space_id = 0;
3869 	const bool full_crc32 = fil_space_t::full_crc32(
3870 		callback.get_space_flags());
3871 
3872 	/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
3873 	copying for non-index pages. Unfortunately, it is
3874 	required by buf_zip_decompress() */
3875 	dberr_t		err = DB_SUCCESS;
3876 	bool		page_compressed = false;
3877 	bool		punch_hole = true;
3878 
3879 	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
3880 		if (callback.is_interrupted()) {
3881 			err = DB_INTERRUPTED;
3882 			goto func_exit;
3883 		}
3884 
3885 		byte*		io_buffer = iter.io_buffer;
3886 		block->frame = io_buffer;
3887 
3888 		if (block->page.zip.data) {
3889 			/* Zip IO is done in the compressed page buffer. */
3890 			io_buffer = block->page.zip.data;
3891 		}
3892 
3893 		/* We have to read the exact number of bytes. Otherwise the
3894 		InnoDB IO functions croak on failed reads. */
3895 
3896 		n_bytes = ulint(ut_min(os_offset_t(n_bytes),
3897 				       iter.end - offset));
3898 
3899 		ut_ad(n_bytes > 0);
3900 		ut_ad(!(n_bytes % size));
3901 
3902 		const bool encrypted = iter.crypt_data != NULL
3903 			&& iter.crypt_data->should_encrypt();
3904 		/* Use additional crypt io buffer if tablespace is encrypted */
3905 		byte* const readptr = encrypted
3906 			? iter.crypt_io_buffer : io_buffer;
3907 		byte* const writeptr = readptr;
3908 
3909 		err = os_file_read_no_error_handling(
3910 			IORequestReadPartial,
3911 			iter.file, readptr, offset, n_bytes, 0);
3912 		if (err != DB_SUCCESS) {
3913 			ib::error() << iter.filepath
3914 				    << ": os_file_read() failed";
3915 			goto func_exit;
3916 		}
3917 
3918 		bool		updated = false;
3919 		os_offset_t	page_off = offset;
3920 		ulint		n_pages_read = n_bytes / size;
3921 		/* This block is not attached to buf_pool */
3922 		block->page.id_.set_page_no(uint32_t(page_off / size));
3923 
3924 		for (ulint i = 0; i < n_pages_read;
3925 		     ++block->page.id_,
3926 		     ++i, page_off += size, block->frame += size) {
3927 			byte*	src = readptr + i * size;
3928 			const ulint page_no = page_get_page_no(src);
3929 			if (!page_no && block->page.id().page_no()) {
3930 				if (!buf_is_zeroes(span<const byte>(src,
3931 								    size))) {
3932 					goto page_corrupted;
3933 				}
3934 				/* Proceed to the next page,
3935 				because this one is all zero. */
3936 				continue;
3937 			}
3938 
3939 			if (page_no != block->page.id().page_no()) {
3940 page_corrupted:
3941 				ib::warn() << callback.filename()
3942 					   << ": Page " << (offset / size)
3943 					   << " at offset " << offset
3944 					   << " looks corrupted.";
3945 				err = DB_CORRUPTION;
3946 				goto func_exit;
3947 			}
3948 
3949 			if (block->page.id().page_no() == 0) {
3950 				actual_space_id = mach_read_from_4(
3951 					src + FIL_PAGE_SPACE_ID);
3952 			}
3953 
3954 			const uint16_t type = fil_page_get_type(src);
3955 			page_compressed =
3956 				(full_crc32
3957 				 && fil_space_t::is_compressed(
3958 					callback.get_space_flags())
3959 				 && buf_page_is_compressed(
3960 					src, callback.get_space_flags()))
3961 				|| type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
3962 				|| type == FIL_PAGE_PAGE_COMPRESSED;
3963 
3964 			if (page_compressed && block->page.zip.data) {
3965 				goto page_corrupted;
3966 			}
3967 
3968 			bool decrypted = false;
3969 			byte* dst = io_buffer + i * size;
3970 			bool frame_changed = false;
3971 			uint key_version = buf_page_get_key_version(
3972 				src, callback.get_space_flags());
3973 
3974 			if (!encrypted) {
3975 			} else if (!key_version) {
3976 				if (block->page.id().page_no() == 0
3977 				    && block->page.zip.data) {
3978 					block->page.zip.data = src;
3979 					frame_changed = true;
3980 				} else if (!page_compressed
3981 					   && !block->page.zip.data) {
3982 					block->frame = src;
3983 					frame_changed = true;
3984 				} else {
3985 					ut_ad(dst != src);
3986 					memcpy(dst, src, size);
3987 				}
3988 			} else {
3989 				if (!buf_page_verify_crypt_checksum(
3990 					src, callback.get_space_flags())) {
3991 					goto page_corrupted;
3992 				}
3993 
3994 				if ((err = fil_space_decrypt(
3995 					actual_space_id,
3996 					iter.crypt_data, dst,
3997 					callback.physical_size(),
3998 					callback.get_space_flags(),
3999 					src))) {
4000 					goto func_exit;
4001 				}
4002 
4003 				decrypted = true;
4004 				updated = true;
4005 			}
4006 
4007 			/* For full_crc32 format, skip checksum check
4008 			after decryption. */
4009 			bool skip_checksum_check = full_crc32 && encrypted;
4010 
4011 			/* If the original page is page_compressed, we need
4012 			to decompress it before adjusting further. */
4013 			if (page_compressed) {
4014 				ulint compress_length = fil_page_decompress(
4015 					page_compress_buf, dst,
4016 					callback.get_space_flags());
4017 				ut_ad(compress_length != srv_page_size);
4018 				if (compress_length == 0) {
4019 					goto page_corrupted;
4020 				}
4021 				updated = true;
4022 			} else if (!skip_checksum_check
4023 				   && buf_page_is_corrupted(
4024 					   false,
4025 					   encrypted && !frame_changed
4026 					   ? dst : src,
4027 					   callback.get_space_flags())) {
4028 				goto page_corrupted;
4029 			}
4030 
4031 			if ((err = callback(block)) != DB_SUCCESS) {
4032 				goto func_exit;
4033 			} else if (!updated) {
4034 				updated = block->page.state()
4035 					== BUF_BLOCK_FILE_PAGE;
4036 			}
4037 
4038 			/* If tablespace is encrypted we use additional
4039 			temporary scratch area where pages are read
4040 			for decrypting readptr == crypt_io_buffer != io_buffer.
4041 
4042 			Destination for decryption is a buffer pool block
4043 			block->frame == dst == io_buffer that is updated.
4044 			Pages that did not require decryption even when
4045 			tablespace is marked as encrypted are not copied
4046 			instead block->frame is set to src == readptr.
4047 
4048 			For encryption we again use temporary scratch area
4049 			writeptr != io_buffer == dst
4050 			that is then written to the tablespace
4051 
4052 			(1) For normal tables io_buffer == dst == writeptr
4053 			(2) For only page compressed tables
4054 			io_buffer == dst == writeptr
4055 			(3) For encrypted (and page compressed)
4056 			readptr != io_buffer == dst != writeptr
4057 			*/
4058 
4059 			ut_ad(!encrypted && !page_compressed ?
4060 			      src == dst && dst == writeptr + (i * size):1);
4061 			ut_ad(page_compressed && !encrypted ?
4062 			      src == dst && dst == writeptr + (i * size):1);
4063 			ut_ad(encrypted ?
4064 			      src != dst && dst != writeptr + (i * size):1);
4065 
4066 			/* When tablespace is encrypted or compressed its
4067 			first page (i.e. page 0) is not encrypted or
4068 			compressed and there is no need to copy frame. */
4069 			if (encrypted && block->page.id().page_no() != 0) {
4070 				byte *local_frame = callback.get_frame(block);
4071 				ut_ad((writeptr + (i * size)) != local_frame);
4072 				memcpy((writeptr + (i * size)), local_frame, size);
4073 			}
4074 
4075 			if (frame_changed) {
4076 				if (block->page.zip.data) {
4077 					block->page.zip.data = dst;
4078 				} else {
4079 					block->frame = dst;
4080 				}
4081 			}
4082 
4083 			src =  io_buffer + (i * size);
4084 
4085 			if (page_compressed) {
4086 				updated = true;
4087 				if (ulint len = fil_page_compress(
4088 					    src,
4089 					    page_compress_buf,
4090 					    callback.get_space_flags(),
4091 					    512,/* FIXME: proper block size */
4092 					    encrypted)) {
4093 					/* FIXME: remove memcpy() */
4094 					memcpy(src, page_compress_buf, len);
4095 					memset(src + len, 0,
4096 					       srv_page_size - len);
4097 				}
4098 			}
4099 
4100 			/* Encrypt the page if encryption was used. */
4101 			if (encrypted && decrypted) {
4102 				byte *dest = writeptr + i * size;
4103 
4104 				byte* tmp = fil_encrypt_buf(
4105 					iter.crypt_data,
4106 					block->page.id().space(),
4107 					block->page.id().page_no(),
4108 					src, block->zip_size(), dest,
4109 					full_crc32);
4110 
4111 				if (tmp == src) {
4112 					/* TODO: remove unnecessary memcpy's */
4113 					ut_ad(dest != src);
4114 					memcpy(dest, src, size);
4115 				}
4116 
4117 				updated = true;
4118 			}
4119 
4120 			/* Write checksum for the compressed full crc32 page.*/
4121 			if (full_crc32 && page_compressed) {
4122 				ut_ad(updated);
4123 				byte* dest = writeptr + i * size;
4124 				ut_d(bool comp = false);
4125 				ut_d(bool corrupt = false);
4126 				ulint size = buf_page_full_crc32_size(
4127 					dest,
4128 #ifdef UNIV_DEBUG
4129 					&comp, &corrupt
4130 #else
4131 					NULL, NULL
4132 #endif
4133 				);
4134 				ut_ad(!comp == (size == srv_page_size));
4135 				ut_ad(!corrupt);
4136 				mach_write_to_4(dest + (size - 4),
4137 						ut_crc32(dest, size - 4));
4138 			}
4139 		}
4140 
4141 		if (page_compressed && punch_hole) {
4142 			err = fil_import_compress_fwrite(
4143 				iter, full_crc32, offset, writeptr, n_bytes,
4144 				!updated);
4145 
4146 			if (err != DB_SUCCESS) {
4147 				punch_hole = false;
4148 				if (updated) {
4149 					goto normal_write;
4150 				}
4151 			}
4152 		} else if (updated) {
4153 normal_write:
4154 			/* A page was updated in the set, write it back. */
4155 			err = os_file_write(IORequestWrite,
4156 					    iter.filepath, iter.file,
4157 					    writeptr, offset, n_bytes);
4158 
4159 			if (err != DB_SUCCESS) {
4160 				goto func_exit;
4161 			}
4162 		}
4163 	}
4164 
4165 func_exit:
4166 	free(page_compress_buf);
4167 	return err;
4168 }
4169 
4170 /********************************************************************//**
4171 Iterate over all the pages in the tablespace.
4172 @param table - the table definiton in the server
4173 @param n_io_buffers - number of blocks to read and write together
4174 @param callback - functor that will do the page updates
4175 @return	DB_SUCCESS or error code */
4176 static
4177 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)4178 fil_tablespace_iterate(
4179 /*===================*/
4180 	dict_table_t*		table,
4181 	ulint			n_io_buffers,
4182 	AbstractCallback&	callback)
4183 {
4184 	dberr_t		err;
4185 	pfs_os_file_t	file;
4186 	char*		filepath;
4187 
4188 	ut_a(n_io_buffers > 0);
4189 	ut_ad(!srv_read_only_mode);
4190 
4191 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
4192 			return(DB_CORRUPTION););
4193 
4194 	/* Make sure the data_dir_path is set. */
4195 	dict_get_and_save_data_dir_path(table, false);
4196 
4197 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4198 		ut_a(table->data_dir_path);
4199 
4200 		filepath = fil_make_filepath(
4201 			table->data_dir_path, table->name.m_name, IBD, true);
4202 	} else {
4203 		filepath = fil_make_filepath(
4204 			NULL, table->name.m_name, IBD, false);
4205 	}
4206 
4207 	if (!filepath) {
4208 		return(DB_OUT_OF_MEMORY);
4209 	} else {
4210 		bool	success;
4211 
4212 		file = os_file_create_simple_no_error_handling(
4213 			innodb_data_file_key, filepath,
4214 			OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
4215 
4216 		if (!success) {
4217 			/* The following call prints an error message */
4218 			os_file_get_last_error(true);
4219 			ib::error() << "Trying to import a tablespace,"
4220 				" but could not open the tablespace file "
4221 				    << filepath;
4222 			ut_free(filepath);
4223 			return DB_TABLESPACE_NOT_FOUND;
4224 		} else {
4225 			err = DB_SUCCESS;
4226 		}
4227 	}
4228 
4229 	callback.set_file(filepath, file);
4230 
4231 	os_offset_t	file_size = os_file_get_size(file);
4232 	ut_a(file_size != (os_offset_t) -1);
4233 
4234 	/* Allocate a page to read in the tablespace header, so that we
4235 	can determine the page size and zip_size (if it is compressed).
4236 	We allocate an extra page in case it is a compressed table. */
4237 
4238 	byte*	page = static_cast<byte*>(aligned_malloc(2 * srv_page_size,
4239 							 srv_page_size));
4240 
4241 	buf_block_t* block = reinterpret_cast<buf_block_t*>
4242 		(ut_zalloc_nokey(sizeof *block));
4243 	block->frame = page;
4244         block->page.init(BUF_BLOCK_FILE_PAGE, page_id_t(~0ULL), 1);
4245 
4246 	/* Read the first page and determine the page and zip size. */
4247 
4248 	err = os_file_read_no_error_handling(IORequestReadPartial,
4249 					     file, page, 0, srv_page_size, 0);
4250 
4251 	if (err == DB_SUCCESS) {
4252 		err = callback.init(file_size, block);
4253 	}
4254 
4255 	if (err == DB_SUCCESS) {
4256 		block->page.id_ = page_id_t(callback.get_space_id(), 0);
4257 		if (ulint zip_size = callback.get_zip_size()) {
4258 			page_zip_set_size(&block->page.zip, zip_size);
4259 			/* ROW_FORMAT=COMPRESSED is not optimised for block IO
4260 			for now. We do the IMPORT page by page. */
4261 			n_io_buffers = 1;
4262 		}
4263 
4264 		fil_iterator_t	iter;
4265 
4266 		/* read (optional) crypt data */
4267 		iter.crypt_data = fil_space_read_crypt_data(
4268 			callback.get_zip_size(), page);
4269 
4270 		/* If tablespace is encrypted, it needs extra buffers */
4271 		if (iter.crypt_data && n_io_buffers > 1) {
4272 			/* decrease io buffers so that memory
4273 			consumption will not double */
4274 			n_io_buffers /= 2;
4275 		}
4276 
4277 		iter.file = file;
4278 		iter.start = 0;
4279 		iter.end = file_size;
4280 		iter.filepath = filepath;
4281 		iter.file_size = file_size;
4282 		iter.n_io_buffers = n_io_buffers;
4283 
4284 		/* Add an extra page for compressed page scratch area. */
4285 		iter.io_buffer = static_cast<byte*>(
4286 			aligned_malloc((1 + iter.n_io_buffers)
4287 				       << srv_page_size_shift, srv_page_size));
4288 
4289 		iter.crypt_io_buffer = iter.crypt_data
4290 			? static_cast<byte*>(
4291 				aligned_malloc((1 + iter.n_io_buffers)
4292 					       << srv_page_size_shift,
4293 					       srv_page_size))
4294 			: NULL;
4295 
4296 		if (block->page.zip.ssize) {
4297 			ut_ad(iter.n_io_buffers == 1);
4298 			block->frame = iter.io_buffer;
4299 			block->page.zip.data = block->frame + srv_page_size;
4300 		}
4301 
4302 		err = callback.run(iter, block);
4303 
4304 		if (iter.crypt_data) {
4305 			fil_space_destroy_crypt_data(&iter.crypt_data);
4306 		}
4307 
4308 		aligned_free(iter.crypt_io_buffer);
4309 		aligned_free(iter.io_buffer);
4310 	}
4311 
4312 	if (err == DB_SUCCESS) {
4313 		ib::info() << "Sync to disk";
4314 
4315 		if (!os_file_flush(file)) {
4316 			ib::info() << "os_file_flush() failed!";
4317 			err = DB_IO_ERROR;
4318 		} else {
4319 			ib::info() << "Sync to disk - done!";
4320 		}
4321 	}
4322 
4323 	os_file_close(file);
4324 
4325 	aligned_free(page);
4326 	ut_free(filepath);
4327 	ut_free(block);
4328 
4329 	return(err);
4330 }
4331 
4332 /*****************************************************************//**
4333 Imports a tablespace. The space id in the .ibd file must match the space id
4334 of the table in the data dictionary.
4335 @return error code or DB_SUCCESS */
4336 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)4337 row_import_for_mysql(
4338 /*=================*/
4339 	dict_table_t*	table,		/*!< in/out: table */
4340 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
4341 {
4342 	dberr_t		err;
4343 	trx_t*		trx;
4344 	ib_uint64_t	autoinc = 0;
4345 	char*		filepath = NULL;
4346 
4347 	/* The caller assured that this is not read_only_mode and that no
4348 	temorary tablespace is being imported. */
4349 	ut_ad(!srv_read_only_mode);
4350 	ut_ad(!table->is_temporary());
4351 
4352 	ut_ad(table->space_id);
4353 	ut_ad(table->space_id < SRV_SPACE_ID_UPPER_BOUND);
4354 	ut_ad(prebuilt->trx);
4355 	ut_ad(!table->is_readable());
4356 
4357 	ibuf_delete_for_discarded_space(table->space_id);
4358 
4359 	trx_start_if_not_started(prebuilt->trx, true);
4360 
4361 	trx = trx_create();
4362 
4363 	/* So that the table is not DROPped during recovery. */
4364 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4365 
4366 	trx_start_if_not_started(trx, true);
4367 
4368 	/* So that we can send error messages to the user. */
4369 	trx->mysql_thd = prebuilt->trx->mysql_thd;
4370 
4371 	/* Ensure that the table will be dropped by trx_rollback_active()
4372 	in case of a crash. */
4373 
4374 	trx->table_id = table->id;
4375 
4376 	/* Assign an undo segment for the transaction, so that the
4377 	transaction will be recovered after a crash. */
4378 
4379 	/* TODO: Do not write any undo log for the IMPORT cleanup. */
4380 	{
4381 		mtr_t mtr;
4382 		mtr.start();
4383 		trx_undo_assign(trx, &err, &mtr);
4384 		mtr.commit();
4385 	}
4386 
4387 	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
4388 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4389 
4390 	if (err != DB_SUCCESS) {
4391 
4392 		return(row_import_cleanup(prebuilt, trx, err));
4393 
4394 	} else if (trx->rsegs.m_redo.undo == 0) {
4395 
4396 		err = DB_TOO_MANY_CONCURRENT_TRXS;
4397 		return(row_import_cleanup(prebuilt, trx, err));
4398 	}
4399 
4400 	prebuilt->trx->op_info = "read meta-data file";
4401 
4402 	/* Prevent DDL operations while we are checking. */
4403 
4404 	rw_lock_s_lock(&dict_sys.latch);
4405 
4406 	row_import	cfg;
4407 
4408 	err = row_import_read_cfg(table, trx->mysql_thd, cfg);
4409 
4410 	/* Check if the table column definitions match the contents
4411 	of the config file. */
4412 
4413 	if (err == DB_SUCCESS) {
4414 
4415 		if (dberr_t err = handle_instant_metadata(table, cfg)) {
4416 			rw_lock_s_unlock(&dict_sys.latch);
4417 			return row_import_error(prebuilt, trx, err);
4418 		}
4419 
4420 		/* We have a schema file, try and match it with our
4421 		data dictionary. */
4422 
4423 		err = cfg.match_schema(trx->mysql_thd);
4424 
4425 		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
4426 		B-tree root page numbers in the tablespace. Use the index
4427 		name from the .cfg file to find match. */
4428 
4429 		if (err == DB_SUCCESS) {
4430 			cfg.set_root_by_name();
4431 			autoinc = cfg.m_autoinc;
4432 		}
4433 
4434 		rw_lock_s_unlock(&dict_sys.latch);
4435 
4436 		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
4437 				err = DB_TOO_MANY_CONCURRENT_TRXS;);
4438 
4439 	} else if (cfg.m_missing) {
4440 
4441 		rw_lock_s_unlock(&dict_sys.latch);
4442 
4443 		/* We don't have a schema file, we will have to discover
4444 		the index root pages from the .ibd file and skip the schema
4445 		matching step. */
4446 
4447 		ut_a(err == DB_FAIL);
4448 
4449 		cfg.m_zip_size = 0;
4450 
4451 		if (UT_LIST_GET_LEN(table->indexes) > 1) {
4452 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4453 				ER_INTERNAL_ERROR,
4454 				"Drop all secondary indexes before importing "
4455 				"table %s when .cfg file is missing.",
4456 				table->name.m_name);
4457 			err = DB_ERROR;
4458 			return row_import_error(prebuilt, trx, err);
4459 		}
4460 
4461 		FetchIndexRootPages	fetchIndexRootPages(table, trx);
4462 
4463 		err = fil_tablespace_iterate(
4464 			table, IO_BUFFER_SIZE(srv_page_size),
4465 			fetchIndexRootPages);
4466 
4467 		if (err == DB_SUCCESS) {
4468 
4469 			err = fetchIndexRootPages.build_row_import(&cfg);
4470 
4471 			/* Update index->page and SYS_INDEXES.PAGE_NO
4472 			to match the B-tree root page numbers in the
4473 			tablespace. */
4474 
4475 			if (err == DB_SUCCESS) {
4476 				err = cfg.set_root_by_heuristic();
4477 
4478 				if (err == DB_SUCCESS) {
4479 					if (dberr_t err =
4480 					    handle_instant_metadata(table,
4481 								    cfg)) {
4482 						return row_import_error(
4483 							prebuilt, trx, err);
4484 					}
4485 				}
4486 			}
4487 		}
4488 	} else {
4489 		rw_lock_s_unlock(&dict_sys.latch);
4490 	}
4491 
4492 	if (err != DB_SUCCESS) {
4493 		return(row_import_error(prebuilt, trx, err));
4494 	}
4495 
4496 	prebuilt->trx->op_info = "importing tablespace";
4497 
4498 	ib::info() << "Phase I - Update all pages";
4499 
4500 	/* Iterate over all the pages and do the sanity checking and
4501 	the conversion required to import the tablespace. */
4502 
4503 	PageConverter	converter(&cfg, table->space_id, trx);
4504 
4505 	/* Set the IO buffer size in pages. */
4506 
4507 	err = fil_tablespace_iterate(
4508 		table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size
4509 				      : srv_page_size), converter);
4510 
4511 	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
4512 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4513 #ifdef BTR_CUR_HASH_ADAPT
4514 	/* On DISCARD TABLESPACE, we did not drop any adaptive hash
4515 	index entries. If we replaced the discarded tablespace with a
4516 	smaller one here, there could still be some adaptive hash
4517 	index entries that point to cached garbage pages in the buffer
4518 	pool, because PageConverter::operator() only evicted those
4519 	pages that were replaced by the imported pages. We must
4520 	detach any remaining adaptive hash index entries, because the
4521 	adaptive hash index must be a subset of the table contents;
4522 	false positives are not tolerated. */
4523 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
4524 	     index = UT_LIST_GET_NEXT(indexes, index)) {
4525 		index = index->clone_if_needed();
4526 	}
4527 #endif /* BTR_CUR_HASH_ADAPT */
4528 
4529 	if (err != DB_SUCCESS) {
4530 		char	table_name[MAX_FULL_NAME_LEN + 1];
4531 
4532 		innobase_format_name(
4533 			table_name, sizeof(table_name),
4534 			table->name.m_name);
4535 
4536 		if (err != DB_DECRYPTION_FAILED) {
4537 
4538 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4539 				ER_INTERNAL_ERROR,
4540 			"Cannot reset LSNs in table %s : %s",
4541 				table_name, ut_strerr(err));
4542 		}
4543 
4544 		return(row_import_cleanup(prebuilt, trx, err));
4545 	}
4546 
4547 	row_mysql_lock_data_dictionary(trx);
4548 
4549 	/* If the table is stored in a remote tablespace, we need to
4550 	determine that filepath from the link file and system tables.
4551 	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
4552 	dict_get_and_save_data_dir_path(table, true);
4553 
4554 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4555 		ut_a(table->data_dir_path);
4556 
4557 		filepath = fil_make_filepath(
4558 			table->data_dir_path, table->name.m_name, IBD, true);
4559 	} else {
4560 		filepath = fil_make_filepath(
4561 			NULL, table->name.m_name, IBD, false);
4562 	}
4563 
4564 	DBUG_EXECUTE_IF(
4565 		"ib_import_OOM_15",
4566 		ut_free(filepath);
4567 		filepath = NULL;
4568 	);
4569 
4570 	if (filepath == NULL) {
4571 		row_mysql_unlock_data_dictionary(trx);
4572 		return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
4573 	}
4574 
4575 	/* Open the tablespace so that we can access via the buffer pool.
4576 	We set the 2nd param (fix_dict = true) here because we already
4577 	have an x-lock on dict_sys.latch and dict_sys.mutex.
4578 	The tablespace is initially opened as a temporary one, because
4579 	we will not be writing any redo log for it before we have invoked
4580 	fil_space_t::set_imported() to declare it a persistent tablespace. */
4581 
4582 	ulint	fsp_flags = dict_tf_to_fsp_flags(table->flags);
4583 
4584 	table->space = fil_ibd_open(
4585 		true, true, FIL_TYPE_IMPORT, table->space_id,
4586 		fsp_flags, table->name, filepath, &err);
4587 
4588 	ut_ad((table->space == NULL) == (err != DB_SUCCESS));
4589 	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
4590 			err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);
4591 
4592 	if (!table->space) {
4593 		row_mysql_unlock_data_dictionary(trx);
4594 
4595 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4596 			ER_GET_ERRMSG,
4597 			err, ut_strerr(err), filepath);
4598 
4599 		ut_free(filepath);
4600 
4601 		return(row_import_cleanup(prebuilt, trx, err));
4602 	}
4603 
4604 	row_mysql_unlock_data_dictionary(trx);
4605 
4606 	ut_free(filepath);
4607 
4608 	err = ibuf_check_bitmap_on_import(trx, table->space);
4609 
4610 	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
4611 
4612 	if (err != DB_SUCCESS) {
4613 		return(row_import_cleanup(prebuilt, trx, err));
4614 	}
4615 
4616 	/* The first index must always be the clustered index. */
4617 
4618 	dict_index_t*	index = dict_table_get_first_index(table);
4619 
4620 	if (!dict_index_is_clust(index)) {
4621 		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
4622 	}
4623 
4624 	/* Update the Btree segment headers for index node and
4625 	leaf nodes in the root page. Set the new space id. */
4626 
4627 	err = btr_root_adjust_on_import(index);
4628 
4629 	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
4630 			err = DB_CORRUPTION;);
4631 
4632 	if (err != DB_SUCCESS) {
4633 		return(row_import_error(prebuilt, trx, err));
4634 	} else if (cfg.requires_purge(index->name)) {
4635 
4636 		/* Purge any delete-marked records that couldn't be
4637 		purged during the page conversion phase from the
4638 		cluster index. */
4639 
4640 		IndexPurge	purge(trx, index);
4641 
4642 		trx->op_info = "cluster: purging delete marked records";
4643 
4644 		err = purge.garbage_collect();
4645 
4646 		trx->op_info = "";
4647 	}
4648 
4649 	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
4650 
4651 	if (err != DB_SUCCESS) {
4652 		return(row_import_error(prebuilt, trx, err));
4653 	}
4654 
4655 	/* For secondary indexes, purge any records that couldn't be purged
4656 	during the page conversion phase. */
4657 
4658 	err = row_import_adjust_root_pages_of_secondary_indexes(
4659 		trx, table, cfg);
4660 
4661 	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
4662 			err = DB_CORRUPTION;);
4663 
4664 	if (err != DB_SUCCESS) {
4665 		return(row_import_error(prebuilt, trx, err));
4666 	}
4667 
4668 	/* Ensure that the next available DB_ROW_ID is not smaller than
4669 	any DB_ROW_ID stored in the table. */
4670 
4671 	if (prebuilt->clust_index_was_generated) {
4672 		row_import_set_sys_max_row_id(prebuilt, table);
4673 	}
4674 
4675 	ib::info() << "Phase III - Flush changes to disk";
4676 
4677 	/* Ensure that all pages dirtied during the IMPORT make it to disk.
4678 	The only dirty pages generated should be from the pessimistic purge
4679 	of delete marked records that couldn't be purged in Phase I. */
4680 	while (buf_flush_list_space(prebuilt->table->space));
4681 
4682 	for (ulint count = 0; prebuilt->table->space->referenced(); count++) {
4683 		/* Issue a warning every 10.24 seconds, starting after
4684 		2.56 seconds */
4685 		if ((count & 511) == 128) {
4686 			ib::warn() << "Waiting for flush to complete on "
4687 				   << prebuilt->table->name;
4688 		}
4689 		os_thread_sleep(20000);
4690 	}
4691 
4692 	ib::info() << "Phase IV - Flush complete";
4693 	prebuilt->table->space->set_imported();
4694 
4695 	/* The dictionary latches will be released in in row_import_cleanup()
4696 	after the transaction commit, for both success and error. */
4697 
4698 	row_mysql_lock_data_dictionary(trx);
4699 
4700 	/* Update the root pages of the table's indexes. */
4701 	err = row_import_update_index_root(trx, table, false);
4702 
4703 	if (err != DB_SUCCESS) {
4704 		return(row_import_error(prebuilt, trx, err));
4705 	}
4706 
4707 	err = row_import_update_discarded_flag(trx, table->id, false);
4708 
4709 	if (err != DB_SUCCESS) {
4710 		return(row_import_error(prebuilt, trx, err));
4711 	}
4712 
4713 	table->file_unreadable = false;
4714 	table->flags2 &= ~DICT_TF2_DISCARDED & ((1U << DICT_TF2_BITS) - 1);
4715 
4716 	/* Set autoinc value read from .cfg file, if one was specified.
4717 	Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
4718 	if (autoinc) {
4719 		ib::info() << table->name << " autoinc value set to "
4720 			<< autoinc;
4721 
4722 		table->autoinc = autoinc--;
4723 		btr_write_autoinc(dict_table_get_first_index(table), autoinc);
4724 	}
4725 
4726 	return(row_import_cleanup(prebuilt, trx, err));
4727 }
4728