1 /*****************************************************************************
2 
3 Copyright (c) 2012, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0import.cc
22 Import a tablespace to a running instance.
23 
24 Created 2012-02-08 by Sunny Bains.
25 *******************************************************/
26 
27 #include "row0import.h"
28 #include "btr0pcur.h"
29 #ifdef BTR_CUR_HASH_ADAPT
30 # include "btr0sea.h"
31 #endif
32 #include "que0que.h"
33 #include "dict0boot.h"
34 #include "dict0load.h"
35 #include "ibuf0ibuf.h"
36 #include "pars0pars.h"
37 #include "row0sel.h"
38 #include "row0mysql.h"
39 #include "srv0start.h"
40 #include "row0quiesce.h"
41 #include "fil0pagecompress.h"
42 #include "trx0undo.h"
43 #ifdef HAVE_LZO
44 #include "lzo/lzo1x.h"
45 #endif
46 #ifdef HAVE_SNAPPY
47 #include "snappy-c.h"
48 #endif
49 
50 #include <vector>
51 
52 #ifdef HAVE_MY_AES_H
53 #include <my_aes.h>
54 #endif
55 
/** Number of pages that fit into the 1 MiB buffer that is used for IO.
The argument is parenthesised so that a compound expression such as
IO_BUFFER_SIZE(a + b) divides by the whole expression and not only by
its first operand.
@param n physical page size in bytes
@return number of pages */
#define IO_BUFFER_SIZE(n)	((1024 * 1024) / (n))
60 
61 /** For gathering stats on records during phase I */
62 struct row_stats_t {
63 	ulint		m_n_deleted;		/*!< Number of deleted records
64 						found in the index */
65 
66 	ulint		m_n_purged;		/*!< Number of records purged
67 						optimisatically */
68 
69 	ulint		m_n_rows;		/*!< Number of rows */
70 
71 	ulint		m_n_purge_failed;	/*!< Number of deleted rows
72 						that could not be purged */
73 };
74 
75 /** Index information required by IMPORT. */
76 struct row_index_t {
77 	index_id_t	m_id;			/*!< Index id of the table
78 						in the exporting server */
79 	byte*		m_name;			/*!< Index name */
80 
81 	ulint		m_space;		/*!< Space where it is placed */
82 
83 	ulint		m_page_no;		/*!< Root page number */
84 
85 	ulint		m_type;			/*!< Index type */
86 
87 	ulint		m_trx_id_offset;	/*!< Relevant only for clustered
88 						indexes, offset of transaction
89 						id system column */
90 
91 	ulint		m_n_user_defined_cols;	/*!< User defined columns */
92 
93 	ulint		m_n_uniq;		/*!< Number of columns that can
94 						uniquely identify the row */
95 
96 	ulint		m_n_nullable;		/*!< Number of nullable
97 						columns */
98 
99 	ulint		m_n_fields;		/*!< Total number of fields */
100 
101 	dict_field_t*	m_fields;		/*!< Index fields */
102 
103 	const dict_index_t*
104 			m_srv_index;		/*!< Index instance in the
105 						importing server */
106 
107 	row_stats_t	m_stats;		/*!< Statistics gathered during
108 						the import phase */
109 
110 };
111 
112 /** Meta data required by IMPORT. */
113 struct row_import {
row_importrow_import114 	row_import() UNIV_NOTHROW
115 		:
116 		m_table(NULL),
117 		m_version(0),
118 		m_hostname(NULL),
119 		m_table_name(NULL),
120 		m_autoinc(0),
121 		m_page_size(0, 0, false),
122 		m_flags(0),
123 		m_n_cols(0),
124 		m_cols(NULL),
125 		m_col_names(NULL),
126 		m_n_indexes(0),
127 		m_indexes(NULL),
128 		m_missing(true) { }
129 
130 	~row_import() UNIV_NOTHROW;
131 
132 	/** Find the index entry in in the indexes array.
133 	@param name index name
134 	@return instance if found else 0. */
135 	row_index_t* get_index(const char* name) const UNIV_NOTHROW;
136 
137 	/** Get the number of rows in the index.
138 	@param name index name
139 	@return number of rows (doesn't include delete marked rows). */
140 	ulint	get_n_rows(const char* name) const UNIV_NOTHROW;
141 
142 	/** Find the ordinal value of the column name in the cfg table columns.
143 	@param name of column to look for.
144 	@return ULINT_UNDEFINED if not found. */
145 	ulint find_col(const char* name) const UNIV_NOTHROW;
146 
147 	/** Get the number of rows for which purge failed during the
148 	convert phase.
149 	@param name index name
150 	@return number of rows for which purge failed. */
151 	ulint get_n_purge_failed(const char* name) const UNIV_NOTHROW;
152 
153 	/** Check if the index is clean. ie. no delete-marked records
154 	@param name index name
155 	@return true if index needs to be purged. */
requires_purgerow_import156 	bool requires_purge(const char* name) const UNIV_NOTHROW
157 	{
158 		return(get_n_purge_failed(name) > 0);
159 	}
160 
161 	/** Set the index root <space, pageno> using the index name */
162 	void set_root_by_name() UNIV_NOTHROW;
163 
164 	/** Set the index root <space, pageno> using a heuristic
165 	@return DB_SUCCESS or error code */
166 	dberr_t set_root_by_heuristic() UNIV_NOTHROW;
167 
168 	/** Check if the index schema that was read from the .cfg file
169 	matches the in memory index definition.
170 	Note: It will update row_import_t::m_srv_index to map the meta-data
171 	read from the .cfg file to the server index instance.
172 	@return DB_SUCCESS or error code. */
173 	dberr_t match_index_columns(
174 		THD*			thd,
175 		const dict_index_t*	index) UNIV_NOTHROW;
176 
177 	/** Check if the table schema that was read from the .cfg file
178 	matches the in memory table definition.
179 	@param thd MySQL session variable
180 	@return DB_SUCCESS or error code. */
181 	dberr_t match_table_columns(
182 		THD*			thd) UNIV_NOTHROW;
183 
184 	/** Check if the table (and index) schema that was read from the
185 	.cfg file matches the in memory table definition.
186 	@param thd MySQL session variable
187 	@return DB_SUCCESS or error code. */
188 	dberr_t match_schema(
189 		THD*			thd) UNIV_NOTHROW;
190 
191 	dict_table_t*	m_table;		/*!< Table instance */
192 
193 	ulint		m_version;		/*!< Version of config file */
194 
195 	byte*		m_hostname;		/*!< Hostname where the
196 						tablespace was exported */
197 	byte*		m_table_name;		/*!< Exporting instance table
198 						name */
199 
200 	ib_uint64_t	m_autoinc;		/*!< Next autoinc value */
201 
202 	page_size_t	m_page_size;		/*!< Tablespace page size */
203 
204 	ulint		m_flags;		/*!< Table flags */
205 
206 	ulint		m_n_cols;		/*!< Number of columns in the
207 						meta-data file */
208 
209 	dict_col_t*	m_cols;			/*!< Column data */
210 
211 	byte**		m_col_names;		/*!< Column names, we store the
212 						column naems separately becuase
213 						there is no field to store the
214 						value in dict_col_t */
215 
216 	ulint		m_n_indexes;		/*!< Number of indexes,
217 						including clustered index */
218 
219 	row_index_t*	m_indexes;		/*!< Index meta data */
220 
221 	bool		m_missing;		/*!< true if a .cfg file was
222 						found and was readable */
223 };
224 
225 struct fil_iterator_t {
226 	pfs_os_file_t	file;			/*!< File handle */
227 	const char*	filepath;		/*!< File path name */
228 	os_offset_t	start;			/*!< From where to start */
229 	os_offset_t	end;			/*!< Where to stop */
230 	os_offset_t	file_size;		/*!< File size in bytes */
231 	ulint		n_io_buffers;		/*!< Number of pages to use
232 						for IO */
233 	byte*		io_buffer;		/*!< Buffer to use for IO */
234 	fil_space_crypt_t *crypt_data;		/*!< Crypt data (if encrypted) */
235 	byte*           crypt_io_buffer;        /*!< IO buffer when encrypted */
236 };
237 
238 /** Use the page cursor to iterate over records in a block. */
239 class RecIterator {
240 public:
241 	/** Default constructor */
RecIterator()242 	RecIterator() UNIV_NOTHROW
243 	{
244 		memset(&m_cur, 0x0, sizeof(m_cur));
245 	}
246 
247 	/** Position the cursor on the first user record. */
open(buf_block_t * block)248 	void	open(buf_block_t* block) UNIV_NOTHROW
249 	{
250 		page_cur_set_before_first(block, &m_cur);
251 
252 		if (!end()) {
253 			next();
254 		}
255 	}
256 
257 	/** Move to the next record. */
next()258 	void	next() UNIV_NOTHROW
259 	{
260 		page_cur_move_to_next(&m_cur);
261 	}
262 
263 	/**
264 	@return the current record */
current()265 	rec_t*	current() UNIV_NOTHROW
266 	{
267 		ut_ad(!end());
268 		return(page_cur_get_rec(&m_cur));
269 	}
270 
271 	/**
272 	@return true if cursor is at the end */
end()273 	bool	end() UNIV_NOTHROW
274 	{
275 		return(page_cur_is_after_last(&m_cur) == TRUE);
276 	}
277 
278 	/** Remove the current record
279 	@return true on success */
remove(const dict_index_t * index,page_zip_des_t * page_zip,rec_offs * offsets)280 	bool remove(
281 		const dict_index_t*	index,
282 		page_zip_des_t*		page_zip,
283 		rec_offs*		offsets) UNIV_NOTHROW
284 	{
285 		/* We can't end up with an empty page unless it is root. */
286 		if (page_get_n_recs(m_cur.block->frame) <= 1) {
287 			return(false);
288 		}
289 
290 		return(page_delete_rec(index, &m_cur, page_zip, offsets));
291 	}
292 
293 private:
294 	page_cur_t	m_cur;
295 };
296 
297 /** Class that purges delete marked reocords from indexes, both secondary
298 and cluster. It does a pessimistic delete. This should only be done if we
299 couldn't purge the delete marked reocrds during Phase I. */
300 class IndexPurge {
301 public:
302 	/** Constructor
303 	@param trx the user transaction covering the import tablespace
304 	@param index to be imported
305 	@param space_id space id of the tablespace */
IndexPurge(trx_t * trx,dict_index_t * index)306 	IndexPurge(
307 		trx_t*		trx,
308 		dict_index_t*	index) UNIV_NOTHROW
309 		:
310 		m_trx(trx),
311 		m_index(index),
312 		m_n_rows(0)
313 	{
314 		ib::info() << "Phase II - Purge records from index "
315 			<< index->name;
316 	}
317 
318 	/** Descructor */
~IndexPurge()319 	~IndexPurge() UNIV_NOTHROW { }
320 
321 	/** Purge delete marked records.
322 	@return DB_SUCCESS or error code. */
323 	dberr_t	garbage_collect() UNIV_NOTHROW;
324 
325 	/** The number of records that are not delete marked.
326 	@return total records in the index after purge */
get_n_rows() const327 	ulint	get_n_rows() const UNIV_NOTHROW
328 	{
329 		return(m_n_rows);
330 	}
331 
332 private:
333 	/** Begin import, position the cursor on the first record. */
334 	void	open() UNIV_NOTHROW;
335 
336 	/** Close the persistent curosr and commit the mini-transaction. */
337 	void	close() UNIV_NOTHROW;
338 
339 	/** Position the cursor on the next record.
340 	@return DB_SUCCESS or error code */
341 	dberr_t	next() UNIV_NOTHROW;
342 
343 	/** Store the persistent cursor position and reopen the
344 	B-tree cursor in BTR_MODIFY_TREE mode, because the
345 	tree structure may be changed during a pessimistic delete. */
346 	void	purge_pessimistic_delete() UNIV_NOTHROW;
347 
348 	/** Purge delete-marked records.
349 	@param offsets current row offsets. */
350 	void	purge() UNIV_NOTHROW;
351 
352 protected:
353 	// Disable copying
354 	IndexPurge();
355 	IndexPurge(const IndexPurge&);
356 	IndexPurge &operator=(const IndexPurge&);
357 
358 private:
359 	trx_t*			m_trx;		/*!< User transaction */
360 	mtr_t			m_mtr;		/*!< Mini-transaction */
361 	btr_pcur_t		m_pcur;		/*!< Persistent cursor */
362 	dict_index_t*		m_index;	/*!< Index to be processed */
363 	ulint			m_n_rows;	/*!< Records in index */
364 };
365 
366 /** Functor that is called for each physical page that is read from the
367 tablespace file.  */
368 class AbstractCallback
369 {
370 public:
371 	/** Constructor
372 	@param trx covering transaction */
AbstractCallback(trx_t * trx,ulint space_id)373 	AbstractCallback(trx_t* trx, ulint space_id)
374 		:
375 		m_page_size(0, 0, false),
376 		m_trx(trx),
377 		m_space(space_id),
378 		m_xdes(),
379 		m_xdes_page_no(ULINT_UNDEFINED),
380 		m_space_flags(ULINT_UNDEFINED) UNIV_NOTHROW { }
381 
382 	/** Free any extent descriptor instance */
~AbstractCallback()383 	virtual ~AbstractCallback()
384 	{
385 		UT_DELETE_ARRAY(m_xdes);
386 	}
387 
388 	/** Determine the page size to use for traversing the tablespace
389 	@param file_size size of the tablespace file in bytes
390 	@param block contents of the first page in the tablespace file.
391 	@retval DB_SUCCESS or error code. */
392 	virtual dberr_t init(
393 		os_offset_t		file_size,
394 		const buf_block_t*	block) UNIV_NOTHROW;
395 
396 	/** @return true if compressed table. */
is_compressed_table() const397 	bool is_compressed_table() const UNIV_NOTHROW
398 	{
399 		return(get_page_size().is_compressed());
400 	}
401 
402 	/** @return the tablespace flags */
get_space_flags() const403 	ulint get_space_flags() const
404 	{
405 		return(m_space_flags);
406 	}
407 
408 	/**
409 	Set the name of the physical file and the file handle that is used
410 	to open it for the file that is being iterated over.
411 	@param filename the physical name of the tablespace file
412 	@param file OS file handle */
set_file(const char * filename,pfs_os_file_t file)413 	void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
414 	{
415 		m_file = file;
416 		m_filepath = filename;
417 	}
418 
get_page_size() const419 	const page_size_t& get_page_size() const { return m_page_size; }
420 
filename() const421 	const char* filename() const { return m_filepath; }
422 
423 	/**
424 	Called for every page in the tablespace. If the page was not
425 	updated then its state must be set to BUF_PAGE_NOT_USED. For
426 	compressed tables the page descriptor memory will be at offset:
427 		block->frame + srv_page_size;
428 	@param block block read from file, note it is not from the buffer pool
429 	@retval DB_SUCCESS or error code. */
430 	virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
431 
432 	/** @return the tablespace identifier */
get_space_id() const433 	ulint get_space_id() const { return m_space; }
434 
is_interrupted() const435 	bool is_interrupted() const { return trx_is_interrupted(m_trx); }
436 
437 	/**
438 	Get the data page depending on the table type, compressed or not.
439 	@param block - block read from disk
440 	@retval the buffer frame */
get_frame(const buf_block_t * block)441 	static byte* get_frame(const buf_block_t* block)
442 	{
443 		return block->page.zip.data
444 			? block->page.zip.data : block->frame;
445 	}
446 
447 	/** Invoke the functionality for the callback */
448 	virtual dberr_t run(const fil_iterator_t& iter,
449 			    buf_block_t* block) UNIV_NOTHROW = 0;
450 
451 protected:
452 	/** Get the physical offset of the extent descriptor within the page.
453 	@param page_no page number of the extent descriptor
454 	@param page contents of the page containing the extent descriptor.
455 	@return the start of the xdes array in a page */
xdes(ulint page_no,const page_t * page) const456 	const xdes_t* xdes(
457 		ulint		page_no,
458 		const page_t*	page) const UNIV_NOTHROW
459 	{
460 		ulint	offset;
461 
462 		offset = xdes_calc_descriptor_index(get_page_size(), page_no);
463 
464 		return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
465 	}
466 
467 	/** Set the current page directory (xdes). If the extent descriptor is
468 	marked as free then free the current extent descriptor and set it to
469 	0. This implies that all pages that are covered by this extent
470 	descriptor are also freed.
471 
472 	@param page_no offset of page within the file
473 	@param page page contents
474 	@return DB_SUCCESS or error code. */
set_current_xdes(ulint page_no,const page_t * page)475 	dberr_t	set_current_xdes(
476 		ulint		page_no,
477 		const page_t*	page) UNIV_NOTHROW
478 	{
479 		m_xdes_page_no = page_no;
480 
481 		UT_DELETE_ARRAY(m_xdes);
482 		m_xdes = NULL;
483 
484 		ulint		state;
485 		const xdes_t*	xdesc = page + XDES_ARR_OFFSET;
486 
487 		state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
488 
489 		if (state != XDES_FREE) {
490 
491 			m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t,
492 						    m_page_size.physical());
493 
494 			/* Trigger OOM */
495 			DBUG_EXECUTE_IF(
496 				"ib_import_OOM_13",
497 				UT_DELETE_ARRAY(m_xdes);
498 				m_xdes = NULL;
499 			);
500 
501 			if (m_xdes == NULL) {
502 				return(DB_OUT_OF_MEMORY);
503 			}
504 
505 			memcpy(m_xdes, page, m_page_size.physical());
506 		}
507 
508 		return(DB_SUCCESS);
509 	}
510 
511 	/** Check if the page is marked as free in the extent descriptor.
512 	@param page_no page number to check in the extent descriptor.
513 	@return true if the page is marked as free */
is_free(ulint page_no) const514 	bool is_free(ulint page_no) const UNIV_NOTHROW
515 	{
516 		ut_a(xdes_calc_descriptor_page(get_page_size(), page_no)
517 		     == m_xdes_page_no);
518 
519 		if (m_xdes != 0) {
520 			const xdes_t*	xdesc = xdes(page_no, m_xdes);
521 			ulint		pos = page_no % FSP_EXTENT_SIZE;
522 
523 			return(xdes_get_bit(xdesc, XDES_FREE_BIT, pos));
524 		}
525 
526 		/* If the current xdes was free, the page must be free. */
527 		return(true);
528 	}
529 
530 protected:
531 	/** The tablespace page size. */
532 	page_size_t		m_page_size;
533 
534 	/** File handle to the tablespace */
535 	pfs_os_file_t		m_file;
536 
537 	/** Physical file path. */
538 	const char*		m_filepath;
539 
540 	/** Covering transaction. */
541 	trx_t*			m_trx;
542 
543 	/** Space id of the file being iterated over. */
544 	ulint			m_space;
545 
546 	/** Current size of the space in pages */
547 	ulint			m_size;
548 
549 	/** Current extent descriptor page */
550 	xdes_t*			m_xdes;
551 
552 	/** Physical page offset in the file of the extent descriptor */
553 	ulint			m_xdes_page_no;
554 
555 	/** Flags value read from the header page */
556 	ulint			m_space_flags;
557 };
558 
559 /** Determine the page size to use for traversing the tablespace
560 @param file_size size of the tablespace file in bytes
561 @param block contents of the first page in the tablespace file.
562 @retval DB_SUCCESS or error code. */
563 dberr_t
init(os_offset_t file_size,const buf_block_t * block)564 AbstractCallback::init(
565 	os_offset_t		file_size,
566 	const buf_block_t*	block) UNIV_NOTHROW
567 {
568 	const page_t*		page = block->frame;
569 
570 	m_space_flags = fsp_header_get_flags(page);
571 	if (!fsp_flags_is_valid(m_space_flags, true)) {
572 		ulint cflags = fsp_flags_convert_from_101(m_space_flags);
573 		if (cflags == ULINT_UNDEFINED) {
574 			ib::error() << "Invalid FSP_SPACE_FLAGS="
575 				<< ib::hex(m_space_flags);
576 			return(DB_CORRUPTION);
577 		}
578 		m_space_flags = cflags;
579 	}
580 
581 	/* Clear the DATA_DIR flag, which is basically garbage. */
582 	m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
583 	m_page_size.copy_from(page_size_t(m_space_flags));
584 
585 	if (!is_compressed_table() && !m_page_size.equals_to(univ_page_size)) {
586 
587 		ib::error() << "Page size " << m_page_size.physical()
588 			<< " of ibd file is not the same as the server page"
589 			" size " << srv_page_size;
590 
591 		return(DB_CORRUPTION);
592 
593 	} else if (file_size % m_page_size.physical() != 0) {
594 
595 		ib::error() << "File size " << file_size << " is not a"
596 			" multiple of the page size "
597 			<< m_page_size.physical();
598 
599 		return(DB_CORRUPTION);
600 	}
601 
602 	m_size  = mach_read_from_4(page + FSP_SIZE);
603 	if (m_space == ULINT_UNDEFINED) {
604 		m_space = mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_ID
605 					   + page);
606 	}
607 
608 	return set_current_xdes(0, page);
609 }
610 
611 /**
612 TODO: This can be made parallel trivially by chunking up the file
613 and creating a callback per thread.. Main benefit will be to use
614 multiple CPUs for checksums and compressed tables. We have to do
615 compressed tables block by block right now. Secondly we need to
616 decompress/compress and copy too much of data. These are
617 CPU intensive.
618 
619 Iterate over all the pages in the tablespace.
620 @param iter - Tablespace iterator
621 @param block - block to use for IO
622 @param callback - Callback to inspect and update page contents
623 @retval DB_SUCCESS or error code */
624 static dberr_t fil_iterate(
625 	const fil_iterator_t&	iter,
626 	buf_block_t*		block,
627 	AbstractCallback&	callback);
628 
629 /**
630 Try and determine the index root pages by checking if the next/prev
631 pointers are both FIL_NULL. We need to ensure that skip deleted pages. */
632 struct FetchIndexRootPages : public AbstractCallback {
633 
634 	/** Index information gathered from the .ibd file. */
635 	struct Index {
636 
IndexFetchIndexRootPages::Index637 		Index(index_id_t id, ulint page_no)
638 			:
639 			m_id(id),
640 			m_page_no(page_no) { }
641 
642 		index_id_t	m_id;		/*!< Index id */
643 		ulint		m_page_no;	/*!< Root page number */
644 	};
645 
646 	/** Constructor
647 	@param trx covering (user) transaction
648 	@param table table definition in server .*/
FetchIndexRootPagesFetchIndexRootPages649 	FetchIndexRootPages(const dict_table_t* table, trx_t* trx)
650 		:
651 		AbstractCallback(trx, ULINT_UNDEFINED),
652 		m_table(table), m_index(0, 0) UNIV_NOTHROW { }
653 
654 	/** Destructor */
~FetchIndexRootPagesFetchIndexRootPages655 	virtual ~FetchIndexRootPages() UNIV_NOTHROW { }
656 
657 	/** Fetch the clustered index root page in the tablespace
658 	@param iter	Tablespace iterator
659 	@param block	Block to use for IO
660 	@retval DB_SUCCESS or error code */
661 	dberr_t run(const fil_iterator_t& iter,
662 		    buf_block_t* block) UNIV_NOTHROW;
663 
664 	/** Called for each block as it is read from the file.
665 	@param block block to convert, it is not from the buffer pool.
666 	@retval DB_SUCCESS or error code. */
667 	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
668 
669 	/** Update the import configuration that will be used to import
670 	the tablespace. */
671 	dberr_t build_row_import(row_import* cfg) const UNIV_NOTHROW;
672 
673 	/** Table definition in server. */
674 	const dict_table_t*	m_table;
675 
676 	/** Index information */
677 	Index			m_index;
678 };
679 
680 /** Called for each block as it is read from the file. Check index pages to
681 determine the exact row format. We can't get that from the tablespace
682 header flags alone.
683 
684 @param block block to convert, it is not from the buffer pool.
685 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)686 dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
687 {
688 	if (is_interrupted()) return DB_INTERRUPTED;
689 
690 	const page_t*	page = get_frame(block);
691 
692 	m_index.m_id = btr_page_get_index_id(page);
693 	m_index.m_page_no = block->page.id.page_no();
694 
695 	/* Check that the tablespace flags match the table flags. */
696 	ulint expected = dict_tf_to_fsp_flags(m_table->flags);
697 	if (!fsp_flags_match(expected, m_space_flags)) {
698 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
699 			ER_TABLE_SCHEMA_MISMATCH,
700 			"Expected FSP_SPACE_FLAGS=0x%x, .ibd "
701 			"file contains 0x%x.",
702 			unsigned(expected),
703 			unsigned(m_space_flags));
704 		return(DB_CORRUPTION);
705 	}
706 
707 	return DB_SUCCESS;
708 }
709 
710 /**
711 Update the import configuration that will be used to import the tablespace.
712 @return error code or DB_SUCCESS */
713 dberr_t
build_row_import(row_import * cfg) const714 FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
715 {
716 	ut_a(cfg->m_table == m_table);
717 	cfg->m_page_size.copy_from(m_page_size);
718 	cfg->m_n_indexes = 1;
719 
720 	if (cfg->m_n_indexes == 0) {
721 
722 		ib::error() << "No B+Tree found in tablespace";
723 
724 		return(DB_CORRUPTION);
725 	}
726 
727 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
728 
729 	/* Trigger OOM */
730 	DBUG_EXECUTE_IF(
731 		"ib_import_OOM_11",
732 		UT_DELETE_ARRAY(cfg->m_indexes);
733 		cfg->m_indexes = NULL;
734 	);
735 
736 	if (cfg->m_indexes == NULL) {
737 		return(DB_OUT_OF_MEMORY);
738 	}
739 
740 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
741 
742 	row_index_t*	cfg_index = cfg->m_indexes;
743 
744 	char	name[BUFSIZ];
745 
746 	snprintf(name, sizeof(name), "index" IB_ID_FMT, m_index.m_id);
747 
748 	ulint	len = strlen(name) + 1;
749 
750 	cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
751 
752 	/* Trigger OOM */
753 	DBUG_EXECUTE_IF(
754 		"ib_import_OOM_12",
755 		UT_DELETE_ARRAY(cfg_index->m_name);
756 		cfg_index->m_name = NULL;
757 	);
758 
759 	if (cfg_index->m_name == NULL) {
760 		return(DB_OUT_OF_MEMORY);
761 	}
762 
763 	memcpy(cfg_index->m_name, name, len);
764 
765 	cfg_index->m_id = m_index.m_id;
766 
767 	cfg_index->m_space = m_space;
768 
769 	cfg_index->m_page_no = m_index.m_page_no;
770 
771 	return(DB_SUCCESS);
772 }
773 
774 /* Functor that is called for each physical page that is read from the
775 tablespace file.
776 
777   1. Check each page for corruption.
778 
779   2. Update the space id and LSN on every page
780      * For the header page
781        - Validate the flags
782        - Update the LSN
783 
784   3. On Btree pages
785      * Set the index id
786      * Update the max trx id
787      * In a cluster index, update the system columns
788      * In a cluster index, update the BLOB ptr, set the space id
789      * Purge delete marked records, but only if they can be easily
790        removed from the page
791      * Keep a counter of number of rows, ie. non-delete-marked rows
792      * Keep a counter of number of delete marked rows
793      * Keep a counter of number of purge failure
794      * If a page is stamped with an index id that isn't in the .cfg file
795        we assume it is deleted and the page can be ignored.
796 
797    4. Set the page state to dirty so that it will be written to disk.
798 */
799 class PageConverter : public AbstractCallback {
800 public:
801 	/** Constructor
802 	@param cfg config of table being imported.
803 	@param space_id tablespace identifier
804 	@param trx transaction covering the import */
PageConverter(row_import * cfg,ulint space_id,trx_t * trx)805 	PageConverter(row_import* cfg, ulint space_id, trx_t* trx)
806 		:
807 		AbstractCallback(trx, space_id),
808 		m_cfg(cfg),
809 		m_index(cfg->m_indexes),
810 		m_current_lsn(log_get_lsn()),
811 		m_page_zip_ptr(0),
812 		m_rec_iter(),
813 		m_offsets_(), m_offsets(m_offsets_),
814 		m_heap(0),
815 		m_cluster_index(dict_table_get_first_index(cfg->m_table))
816 	{
817 		ut_ad(m_current_lsn);
818 		rec_offs_init(m_offsets_);
819 	}
820 
~PageConverter()821 	virtual ~PageConverter() UNIV_NOTHROW
822 	{
823 		if (m_heap != 0) {
824 			mem_heap_free(m_heap);
825 		}
826 	}
827 
run(const fil_iterator_t & iter,buf_block_t * block)828 	dberr_t run(const fil_iterator_t& iter, buf_block_t* block) UNIV_NOTHROW
829 	{
830 		return fil_iterate(iter, block, *this);
831 	}
832 
833 	/** Called for each block as it is read from the file.
834 	@param block block to convert, it is not from the buffer pool.
835 	@retval DB_SUCCESS or error code. */
836 	dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
837 private:
838 	/** Update the page, set the space id, max trx id and index id.
839 	@param block block read from file
840 	@param page_type type of the page
841 	@retval DB_SUCCESS or error code */
842 	dberr_t update_page(
843 		buf_block_t*	block,
844 		ulint&		page_type) UNIV_NOTHROW;
845 
846 	/** Update the space, index id, trx id.
847 	@param block block to convert
848 	@return DB_SUCCESS or error code */
849 	dberr_t	update_index_page(buf_block_t*	block) UNIV_NOTHROW;
850 
851 	/** Update the BLOB refrences and write UNDO log entries for
852 	rows that can't be purged optimistically.
853 	@param block block to update
854 	@retval DB_SUCCESS or error code */
855 	dberr_t	update_records(buf_block_t* block) UNIV_NOTHROW;
856 
857 	/** Validate the space flags and update tablespace header page.
858 	@param block block read from file, not from the buffer pool.
859 	@retval DB_SUCCESS or error code */
860 	dberr_t	update_header(buf_block_t* block) UNIV_NOTHROW;
861 
862 	/** Adjust the BLOB reference for a single column that is externally stored
863 	@param rec record to update
864 	@param offsets column offsets for the record
865 	@param i column ordinal value
866 	@return DB_SUCCESS or error code */
867 	dberr_t	adjust_cluster_index_blob_column(
868 		rec_t*		rec,
869 		const rec_offs*	offsets,
870 		ulint		i) UNIV_NOTHROW;
871 
872 	/** Adjusts the BLOB reference in the clustered index row for all
873 	externally stored columns.
874 	@param rec record to update
875 	@param offsets column offsets for the record
876 	@return DB_SUCCESS or error code */
877 	dberr_t	adjust_cluster_index_blob_columns(
878 		rec_t*		rec,
879 		const rec_offs*	offsets) UNIV_NOTHROW;
880 
881 	/** In the clustered index, adjist the BLOB pointers as needed.
882 	Also update the BLOB reference, write the new space id.
883 	@param rec record to update
884 	@param offsets column offsets for the record
885 	@return DB_SUCCESS or error code */
886 	dberr_t	adjust_cluster_index_blob_ref(
887 		rec_t*		rec,
888 		const rec_offs*	offsets) UNIV_NOTHROW;
889 
890 	/** Purge delete-marked records, only if it is possible to do
891 	so without re-organising the B+tree.
892 	@retval true if purged */
893 	bool purge() UNIV_NOTHROW;
894 
895 	/** Adjust the BLOB references and sys fields for the current record.
896 	@param rec record to update
897 	@param offsets column offsets for the record
898 	@return DB_SUCCESS or error code. */
899 	dberr_t	adjust_cluster_record(
900 		rec_t*			rec,
901 		const rec_offs*		offsets) UNIV_NOTHROW;
902 
903 	/** Find an index with the matching id.
904 	@return row_index_t* instance or 0 */
find_index(index_id_t id)905 	row_index_t* find_index(index_id_t id) UNIV_NOTHROW
906 	{
907 		row_index_t*	index = &m_cfg->m_indexes[0];
908 
909 		for (ulint i = 0; i < m_cfg->m_n_indexes; ++i, ++index) {
910 			if (id == index->m_id) {
911 				return(index);
912 			}
913 		}
914 
915 		return(0);
916 
917 	}
918 private:
919 	/** Config for table that is being imported. */
920 	row_import*		m_cfg;
921 
922 	/** Current index whose pages are being imported */
923 	row_index_t*		m_index;
924 
925 	/** Current system LSN */
926 	lsn_t			m_current_lsn;
927 
928 	/** Alias for m_page_zip, only set for compressed pages. */
929 	page_zip_des_t*		m_page_zip_ptr;
930 
931 	/** Iterator over records in a block */
932 	RecIterator		m_rec_iter;
933 
934 	/** Record offset */
935 	rec_offs		m_offsets_[REC_OFFS_NORMAL_SIZE];
936 
937 	/** Pointer to m_offsets_ */
938 	rec_offs*		m_offsets;
939 
940 	/** Memory heap for the record offsets */
941 	mem_heap_t*		m_heap;
942 
943 	/** Cluster index instance */
944 	dict_index_t*		m_cluster_index;
945 };
946 
947 /**
948 row_import destructor. */
~row_import()949 row_import::~row_import() UNIV_NOTHROW
950 {
951 	for (ulint i = 0; m_indexes != 0 && i < m_n_indexes; ++i) {
952 		UT_DELETE_ARRAY(m_indexes[i].m_name);
953 
954 		if (m_indexes[i].m_fields == NULL) {
955 			continue;
956 		}
957 
958 		dict_field_t*	fields = m_indexes[i].m_fields;
959 		ulint		n_fields = m_indexes[i].m_n_fields;
960 
961 		for (ulint j = 0; j < n_fields; ++j) {
962 			UT_DELETE_ARRAY(const_cast<char*>(fields[j].name()));
963 		}
964 
965 		UT_DELETE_ARRAY(fields);
966 	}
967 
968 	for (ulint i = 0; m_col_names != 0 && i < m_n_cols; ++i) {
969 		UT_DELETE_ARRAY(m_col_names[i]);
970 	}
971 
972 	UT_DELETE_ARRAY(m_cols);
973 	UT_DELETE_ARRAY(m_indexes);
974 	UT_DELETE_ARRAY(m_col_names);
975 	UT_DELETE_ARRAY(m_table_name);
976 	UT_DELETE_ARRAY(m_hostname);
977 }
978 
979 /** Find the index entry in in the indexes array.
980 @param name index name
981 @return instance if found else 0. */
982 row_index_t*
get_index(const char * name) const983 row_import::get_index(
984 	const char*	name) const UNIV_NOTHROW
985 {
986 	for (ulint i = 0; i < m_n_indexes; ++i) {
987 		const char*	index_name;
988 		row_index_t*	index = &m_indexes[i];
989 
990 		index_name = reinterpret_cast<const char*>(index->m_name);
991 
992 		if (strcmp(index_name, name) == 0) {
993 
994 			return(index);
995 		}
996 	}
997 
998 	return(0);
999 }
1000 
1001 /** Get the number of rows in the index.
1002 @param name index name
1003 @return number of rows (doesn't include delete marked rows). */
1004 ulint
get_n_rows(const char * name) const1005 row_import::get_n_rows(
1006 	const char*	name) const UNIV_NOTHROW
1007 {
1008 	const row_index_t*	index = get_index(name);
1009 
1010 	ut_a(name != 0);
1011 
1012 	return(index->m_stats.m_n_rows);
1013 }
1014 
1015 /** Get the number of rows for which purge failed uding the convert phase.
1016 @param name index name
1017 @return number of rows for which purge failed. */
1018 ulint
get_n_purge_failed(const char * name) const1019 row_import::get_n_purge_failed(
1020 	const char*	name) const UNIV_NOTHROW
1021 {
1022 	const row_index_t*	index = get_index(name);
1023 
1024 	ut_a(name != 0);
1025 
1026 	return(index->m_stats.m_n_purge_failed);
1027 }
1028 
1029 /** Find the ordinal value of the column name in the cfg table columns.
1030 @param name of column to look for.
1031 @return ULINT_UNDEFINED if not found. */
1032 ulint
find_col(const char * name) const1033 row_import::find_col(
1034 	const char*	name) const UNIV_NOTHROW
1035 {
1036 	for (ulint i = 0; i < m_n_cols; ++i) {
1037 		const char*	col_name;
1038 
1039 		col_name = reinterpret_cast<const char*>(m_col_names[i]);
1040 
1041 		if (strcmp(col_name, name) == 0) {
1042 			return(i);
1043 		}
1044 	}
1045 
1046 	return(ULINT_UNDEFINED);
1047 }
1048 
1049 /**
1050 Check if the index schema that was read from the .cfg file matches the
1051 in memory index definition.
1052 @return DB_SUCCESS or error code. */
1053 dberr_t
match_index_columns(THD * thd,const dict_index_t * index)1054 row_import::match_index_columns(
1055 	THD*			thd,
1056 	const dict_index_t*	index) UNIV_NOTHROW
1057 {
1058 	row_index_t*		cfg_index;
1059 	dberr_t			err = DB_SUCCESS;
1060 
1061 	cfg_index = get_index(index->name);
1062 
1063 	if (cfg_index == 0) {
1064 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1065 			ER_TABLE_SCHEMA_MISMATCH,
1066 			"Index %s not found in tablespace meta-data file.",
1067 			index->name());
1068 
1069 		return(DB_ERROR);
1070 	}
1071 
1072 	if (cfg_index->m_n_fields != index->n_fields) {
1073 
1074 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
1075 			ER_TABLE_SCHEMA_MISMATCH,
1076 			"Index field count %u doesn't match"
1077 			" tablespace metadata file value " ULINTPF,
1078 			index->n_fields, cfg_index->m_n_fields);
1079 
1080 		return(DB_ERROR);
1081 	}
1082 
1083 	cfg_index->m_srv_index = index;
1084 
1085 	const dict_field_t*	field = index->fields;
1086 	const dict_field_t*	cfg_field = cfg_index->m_fields;
1087 
1088 	for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
1089 
1090 		if (strcmp(field->name(), cfg_field->name()) != 0) {
1091 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1092 				ER_TABLE_SCHEMA_MISMATCH,
1093 				"Index field name %s doesn't match"
1094 				" tablespace metadata field name %s"
1095 				" for field position " ULINTPF,
1096 				field->name(), cfg_field->name(), i);
1097 
1098 			err = DB_ERROR;
1099 		}
1100 
1101 		if (cfg_field->prefix_len != field->prefix_len) {
1102 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1103 				ER_TABLE_SCHEMA_MISMATCH,
1104 				"Index %s field %s prefix len %u"
1105 				" doesn't match metadata file value %u",
1106 				index->name(), field->name(),
1107 				field->prefix_len, cfg_field->prefix_len);
1108 
1109 			err = DB_ERROR;
1110 		}
1111 
1112 		if (cfg_field->fixed_len != field->fixed_len) {
1113 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1114 				ER_TABLE_SCHEMA_MISMATCH,
1115 				"Index %s field %s fixed len %u"
1116 				" doesn't match metadata file value %u",
1117 				index->name(), field->name(),
1118 				field->fixed_len,
1119 				cfg_field->fixed_len);
1120 
1121 			err = DB_ERROR;
1122 		}
1123 	}
1124 
1125 	return(err);
1126 }
1127 
1128 /** Check if the table schema that was read from the .cfg file matches the
1129 in memory table definition.
1130 @param thd MySQL session variable
1131 @return DB_SUCCESS or error code. */
1132 dberr_t
match_table_columns(THD * thd)1133 row_import::match_table_columns(
1134 	THD*			thd) UNIV_NOTHROW
1135 {
1136 	dberr_t			err = DB_SUCCESS;
1137 	const dict_col_t*	col = m_table->cols;
1138 
1139 	for (ulint i = 0; i < m_table->n_cols; ++i, ++col) {
1140 
1141 		const char*	col_name;
1142 		ulint		cfg_col_index;
1143 
1144 		col_name = dict_table_get_col_name(
1145 			m_table, dict_col_get_no(col));
1146 
1147 		cfg_col_index = find_col(col_name);
1148 
1149 		if (cfg_col_index == ULINT_UNDEFINED) {
1150 
1151 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1152 				 ER_TABLE_SCHEMA_MISMATCH,
1153 				 "Column %s not found in tablespace.",
1154 				 col_name);
1155 
1156 			err = DB_ERROR;
1157 		} else if (cfg_col_index != col->ind) {
1158 
1159 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
1160 				ER_TABLE_SCHEMA_MISMATCH,
1161 				"Column %s ordinal value mismatch, it's at %u"
1162 				" in the table and " ULINTPF
1163 				" in the tablespace meta-data file",
1164 				col_name, col->ind, cfg_col_index);
1165 
1166 			err = DB_ERROR;
1167 		} else {
1168 			const dict_col_t*	cfg_col;
1169 
1170 			cfg_col = &m_cols[cfg_col_index];
1171 			ut_a(cfg_col->ind == cfg_col_index);
1172 
1173 			if (cfg_col->prtype != col->prtype) {
1174 				ib_errf(thd,
1175 					IB_LOG_LEVEL_ERROR,
1176 					ER_TABLE_SCHEMA_MISMATCH,
1177 					"Column %s precise type mismatch,"
1178 					" it's 0X%X in the table and 0X%X"
1179 					" in the tablespace meta file",
1180 					col_name, col->prtype, cfg_col->prtype);
1181 				err = DB_ERROR;
1182 			}
1183 
1184 			if (cfg_col->mtype != col->mtype) {
1185 				ib_errf(thd,
1186 					IB_LOG_LEVEL_ERROR,
1187 					ER_TABLE_SCHEMA_MISMATCH,
1188 					"Column %s main type mismatch,"
1189 					" it's 0X%X in the table and 0X%X"
1190 					" in the tablespace meta file",
1191 					col_name, col->mtype, cfg_col->mtype);
1192 				err = DB_ERROR;
1193 			}
1194 
1195 			if (cfg_col->len != col->len) {
1196 				ib_errf(thd,
1197 					IB_LOG_LEVEL_ERROR,
1198 					ER_TABLE_SCHEMA_MISMATCH,
1199 					"Column %s length mismatch,"
1200 					" it's %u in the table and %u"
1201 					" in the tablespace meta file",
1202 					col_name, col->len, cfg_col->len);
1203 				err = DB_ERROR;
1204 			}
1205 
1206 			if (cfg_col->mbminlen != col->mbminlen
1207 			    || cfg_col->mbmaxlen != col->mbmaxlen) {
1208 				ib_errf(thd,
1209 					IB_LOG_LEVEL_ERROR,
1210 					ER_TABLE_SCHEMA_MISMATCH,
1211 					"Column %s multi-byte len mismatch,"
1212 					" it's %u-%u in the table and %u-%u"
1213 					" in the tablespace meta file",
1214 					col_name, col->mbminlen, col->mbmaxlen,
1215 					cfg_col->mbminlen, cfg_col->mbmaxlen);
1216 				err = DB_ERROR;
1217 			}
1218 
1219 			if (cfg_col->ind != col->ind) {
1220 				ib_errf(thd,
1221 					IB_LOG_LEVEL_ERROR,
1222 					ER_TABLE_SCHEMA_MISMATCH,
1223 					"Column %s position mismatch,"
1224 					" it's %u in the table and %u"
1225 					" in the tablespace meta file",
1226 					col_name, col->ind, cfg_col->ind);
1227 				err = DB_ERROR;
1228 			}
1229 
1230 			if (cfg_col->ord_part != col->ord_part) {
1231 				ib_errf(thd,
1232 					IB_LOG_LEVEL_ERROR,
1233 					ER_TABLE_SCHEMA_MISMATCH,
1234 					"Column %s ordering mismatch,"
1235 					" it's %u in the table and %u"
1236 					" in the tablespace meta file",
1237 					col_name, col->ord_part,
1238 					cfg_col->ord_part);
1239 				err = DB_ERROR;
1240 			}
1241 
1242 			if (cfg_col->max_prefix != col->max_prefix) {
1243 				ib_errf(thd,
1244 					IB_LOG_LEVEL_ERROR,
1245 					ER_TABLE_SCHEMA_MISMATCH,
1246 					"Column %s max prefix mismatch"
1247 					" it's %u in the table and %u"
1248 					" in the tablespace meta file",
1249 					col_name, col->max_prefix,
1250 					cfg_col->max_prefix);
1251 				err = DB_ERROR;
1252 			}
1253 		}
1254 	}
1255 
1256 	return(err);
1257 }
1258 
1259 /** Check if the table (and index) schema that was read from the .cfg file
1260 matches the in memory table definition.
1261 @param thd MySQL session variable
1262 @return DB_SUCCESS or error code. */
1263 dberr_t
match_schema(THD * thd)1264 row_import::match_schema(
1265 	THD*		thd) UNIV_NOTHROW
1266 {
1267 	/* Do some simple checks. */
1268 
1269 	if (ulint mismatch = (m_table->flags ^ m_flags)
1270 	    & ~DICT_TF_MASK_DATA_DIR) {
1271 		const char* msg;
1272 		if (mismatch & DICT_TF_MASK_ZIP_SSIZE) {
1273 			if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE)
1274 			    && (m_flags & DICT_TF_MASK_ZIP_SSIZE)) {
1275 				switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1276 				case 0U << DICT_TF_POS_ZIP_SSIZE:
1277 					goto uncompressed;
1278 				case 1U << DICT_TF_POS_ZIP_SSIZE:
1279 					msg = "ROW_FORMAT=COMPRESSED"
1280 						" KEY_BLOCK_SIZE=1";
1281 					break;
1282 				case 2U << DICT_TF_POS_ZIP_SSIZE:
1283 					msg = "ROW_FORMAT=COMPRESSED"
1284 						" KEY_BLOCK_SIZE=2";
1285 					break;
1286 				case 3U << DICT_TF_POS_ZIP_SSIZE:
1287 					msg = "ROW_FORMAT=COMPRESSED"
1288 						" KEY_BLOCK_SIZE=4";
1289 					break;
1290 				case 4U << DICT_TF_POS_ZIP_SSIZE:
1291 					msg = "ROW_FORMAT=COMPRESSED"
1292 						" KEY_BLOCK_SIZE=8";
1293 					break;
1294 				case 5U << DICT_TF_POS_ZIP_SSIZE:
1295 					msg = "ROW_FORMAT=COMPRESSED"
1296 						" KEY_BLOCK_SIZE=16";
1297 					break;
1298 				default:
1299 					msg = "strange KEY_BLOCK_SIZE";
1300 				}
1301 			} else if (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
1302 				msg = "ROW_FORMAT=COMPRESSED";
1303 			} else {
1304 				goto uncompressed;
1305 			}
1306 		} else {
1307 uncompressed:
1308 			msg = (m_flags & DICT_TF_MASK_ATOMIC_BLOBS)
1309 				? "ROW_FORMAT=DYNAMIC"
1310 				: (m_flags & DICT_TF_MASK_COMPACT)
1311 				? "ROW_FORMAT=COMPACT"
1312 				: "ROW_FORMAT=REDUNDANT";
1313 		}
1314 
1315 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1316 			"Table flags don't match, server table has 0x%x"
1317 			" and the meta-data file has 0x" ULINTPFx ";"
1318 			" .cfg file uses %s",
1319 			m_table->flags, m_flags, msg);
1320 
1321 		return(DB_ERROR);
1322 	} else if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1323 
1324 		/* If the number of indexes don't match then it is better
1325 		to abort the IMPORT. It is easy for the user to create a
1326 		table matching the IMPORT definition. */
1327 
1328 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
1329 			"Number of indexes don't match, table has " ULINTPF
1330 			" indexes but the tablespace meta-data file has "
1331 			ULINTPF " indexes",
1332 			UT_LIST_GET_LEN(m_table->indexes), m_n_indexes);
1333 
1334 		return(DB_ERROR);
1335 	}
1336 
1337 	dberr_t	err = match_table_columns(thd);
1338 
1339 	if (err != DB_SUCCESS) {
1340 		return(err);
1341 	}
1342 
1343 	/* Check if the index definitions match. */
1344 
1345 	const dict_index_t* index;
1346 
1347 	for (index = UT_LIST_GET_FIRST(m_table->indexes);
1348 	     index != 0;
1349 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1350 
1351 		dberr_t	index_err;
1352 
1353 		index_err = match_index_columns(thd, index);
1354 
1355 		if (index_err != DB_SUCCESS) {
1356 			err = index_err;
1357 		}
1358 	}
1359 
1360 	return(err);
1361 }
1362 
1363 /**
1364 Set the index root <space, pageno>, using index name. */
1365 void
set_root_by_name()1366 row_import::set_root_by_name() UNIV_NOTHROW
1367 {
1368 	row_index_t*	cfg_index = m_indexes;
1369 
1370 	for (ulint i = 0; i < m_n_indexes; ++i, ++cfg_index) {
1371 		dict_index_t*	index;
1372 
1373 		const char*	index_name;
1374 
1375 		index_name = reinterpret_cast<const char*>(cfg_index->m_name);
1376 
1377 		index = dict_table_get_index_on_name(m_table, index_name);
1378 
1379 		/* We've already checked that it exists. */
1380 		ut_a(index != 0);
1381 
1382 		index->page = cfg_index->m_page_no;
1383 	}
1384 }
1385 
1386 /**
1387 Set the index root <space, pageno>, using a heuristic.
1388 @return DB_SUCCESS or error code */
1389 dberr_t
set_root_by_heuristic()1390 row_import::set_root_by_heuristic() UNIV_NOTHROW
1391 {
1392 	row_index_t*	cfg_index = m_indexes;
1393 
1394 	ut_a(m_n_indexes > 0);
1395 
1396 	// TODO: For now use brute force, based on ordinality
1397 
1398 	if (UT_LIST_GET_LEN(m_table->indexes) != m_n_indexes) {
1399 
1400 		ib::warn() << "Table " << m_table->name << " should have "
1401 			<< UT_LIST_GET_LEN(m_table->indexes) << " indexes but"
1402 			" the tablespace has " << m_n_indexes << " indexes";
1403 	}
1404 
1405 	dict_mutex_enter_for_mysql();
1406 
1407 	ulint	i = 0;
1408 	dberr_t	err = DB_SUCCESS;
1409 
1410 	for (dict_index_t* index = UT_LIST_GET_FIRST(m_table->indexes);
1411 	     index != 0;
1412 	     index = UT_LIST_GET_NEXT(indexes, index)) {
1413 
1414 		if (index->type & DICT_FTS) {
1415 			index->type |= DICT_CORRUPT;
1416 			ib::warn() << "Skipping FTS index: " << index->name;
1417 		} else if (i < m_n_indexes) {
1418 
1419 			UT_DELETE_ARRAY(cfg_index[i].m_name);
1420 
1421 			ulint	len = strlen(index->name) + 1;
1422 
1423 			cfg_index[i].m_name = UT_NEW_ARRAY_NOKEY(byte, len);
1424 
1425 			/* Trigger OOM */
1426 			DBUG_EXECUTE_IF(
1427 				"ib_import_OOM_14",
1428 				UT_DELETE_ARRAY(cfg_index[i].m_name);
1429 				cfg_index[i].m_name = NULL;
1430 			);
1431 
1432 			if (cfg_index[i].m_name == NULL) {
1433 				err = DB_OUT_OF_MEMORY;
1434 				break;
1435 			}
1436 
1437 			memcpy(cfg_index[i].m_name, index->name, len);
1438 
1439 			cfg_index[i].m_srv_index = index;
1440 
1441 			index->page = cfg_index[i].m_page_no;
1442 
1443 			++i;
1444 		}
1445 	}
1446 
1447 	dict_mutex_exit_for_mysql();
1448 
1449 	return(err);
1450 }
1451 
1452 /**
1453 Purge delete marked records.
1454 @return DB_SUCCESS or error code. */
1455 dberr_t
garbage_collect()1456 IndexPurge::garbage_collect() UNIV_NOTHROW
1457 {
1458 	dberr_t	err;
1459 	ibool	comp = dict_table_is_comp(m_index->table);
1460 
1461 	/* Open the persistent cursor and start the mini-transaction. */
1462 
1463 	open();
1464 
1465 	while ((err = next()) == DB_SUCCESS) {
1466 
1467 		rec_t*	rec = btr_pcur_get_rec(&m_pcur);
1468 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1469 
1470 		if (!deleted) {
1471 			++m_n_rows;
1472 		} else {
1473 			purge();
1474 		}
1475 	}
1476 
1477 	/* Close the persistent cursor and commit the mini-transaction. */
1478 
1479 	close();
1480 
1481 	return(err == DB_END_OF_INDEX ? DB_SUCCESS : err);
1482 }
1483 
1484 /**
1485 Begin import, position the cursor on the first record. */
1486 void
open()1487 IndexPurge::open() UNIV_NOTHROW
1488 {
1489 	mtr_start(&m_mtr);
1490 
1491 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1492 
1493 	btr_pcur_open_at_index_side(
1494 		true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
1495 	btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
1496 	if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), m_index)) {
1497 		ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
1498 		/* Skip the metadata pseudo-record. */
1499 	} else {
1500 		btr_pcur_move_to_prev_on_page(&m_pcur);
1501 	}
1502 }
1503 
/**
Close the persistent cursor and commit the mini-transaction. */
void
IndexPurge::close() UNIV_NOTHROW
{
	/* Close the cursor first, then commit the mini-transaction that
	is currently active (m_mtr). */
	btr_pcur_close(&m_pcur);
	mtr_commit(&m_mtr);
}
1512 
1513 /**
1514 Position the cursor on the next record.
1515 @return DB_SUCCESS or error code */
1516 dberr_t
next()1517 IndexPurge::next() UNIV_NOTHROW
1518 {
1519 	btr_pcur_move_to_next_on_page(&m_pcur);
1520 
1521 	/* When switching pages, commit the mini-transaction
1522 	in order to release the latch on the old page. */
1523 
1524 	if (!btr_pcur_is_after_last_on_page(&m_pcur)) {
1525 		return(DB_SUCCESS);
1526 	} else if (trx_is_interrupted(m_trx)) {
1527 		/* Check after every page because the check
1528 		is expensive. */
1529 		return(DB_INTERRUPTED);
1530 	}
1531 
1532 	btr_pcur_store_position(&m_pcur, &m_mtr);
1533 
1534 	mtr_commit(&m_mtr);
1535 
1536 	mtr_start(&m_mtr);
1537 
1538 	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);
1539 
1540 	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
1541 	/* The following is based on btr_pcur_move_to_next_user_rec(). */
1542 	m_pcur.old_stored = false;
1543 	ut_ad(m_pcur.latch_mode == BTR_MODIFY_LEAF);
1544 	do {
1545 		if (btr_pcur_is_after_last_on_page(&m_pcur)) {
1546 			if (btr_pcur_is_after_last_in_tree(&m_pcur)) {
1547 				return DB_END_OF_INDEX;
1548 			}
1549 
1550 			buf_block_t* block = btr_pcur_get_block(&m_pcur);
1551 			uint32_t next_page = btr_page_get_next(block->frame);
1552 
1553 			/* MDEV-13542 FIXME: Make these checks part of
1554 			btr_pcur_move_to_next_page(), and introduce a
1555 			return status that will be checked in all callers! */
1556 			switch (next_page) {
1557 			default:
1558 				if (next_page != block->page.id.page_no()) {
1559 					break;
1560 				}
1561 				/* MDEV-20931 FIXME: Check that
1562 				next_page is within the tablespace
1563 				bounds! Also check that it is not a
1564 				change buffer bitmap page. */
1565 				/* fall through */
1566 			case 0:
1567 			case 1:
1568 			case FIL_NULL:
1569 				return DB_CORRUPTION;
1570 			}
1571 
1572 			dict_index_t* index = m_pcur.btr_cur.index;
1573 			buf_block_t* next_block = btr_block_get(
1574 				page_id_t(block->page.id.space(), next_page),
1575 				block->page.size, BTR_MODIFY_LEAF, index,
1576 				&m_mtr);
1577 
1578 			if (UNIV_UNLIKELY(!next_block
1579 					  || !fil_page_index_page_check(
1580 						  next_block->frame)
1581 					  || !!dict_index_is_spatial(index)
1582 					  != (fil_page_get_type(
1583 						      next_block->frame)
1584 					      == FIL_PAGE_RTREE)
1585 					  || page_is_comp(next_block->frame)
1586 					  != page_is_comp(block->frame)
1587 					  || btr_page_get_prev(
1588 						  next_block->frame)
1589 					  != block->page.id.page_no())) {
1590 				return DB_CORRUPTION;
1591 			}
1592 
1593 			btr_leaf_page_release(block, BTR_MODIFY_LEAF, &m_mtr);
1594 
1595 			page_cur_set_before_first(next_block,
1596 						  &m_pcur.btr_cur.page_cur);
1597 
1598 			ut_d(page_check_dir(next_block->frame));
1599 		} else {
1600 			btr_pcur_move_to_next_on_page(&m_pcur);
1601 		}
1602 	} while (!btr_pcur_is_on_user_rec(&m_pcur));
1603 
1604 	return DB_SUCCESS;
1605 }
1606 
/**
Restore the persistent cursor in BTR_MODIFY_TREE mode, because the
tree structure may be changed during a pessimistic delete, delete the
record that the cursor is positioned on, and commit the mini-transaction.
The caller, purge(), stores the cursor position beforehand and starts a
new mini-transaction afterwards. */
void
IndexPurge::purge_pessimistic_delete() UNIV_NOTHROW
{
	dberr_t	err;

	btr_pcur_restore_position(BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
				  &m_pcur, &m_mtr);

	/* Only delete-marked records are expected here. */
	ut_ad(rec_get_deleted_flag(
			btr_pcur_get_rec(&m_pcur),
			dict_table_is_comp(m_index->table)));

	btr_cur_pessimistic_delete(
		&err, FALSE, btr_pcur_get_btr_cur(&m_pcur), 0, false, &m_mtr);

	/* A failure of the delete is treated as fatal. */
	ut_a(err == DB_SUCCESS);

	/* Commit the mini-transaction; the caller reopens the B-tree
	cursor in BTR_MODIFY_LEAF mode. */
	mtr_commit(&m_mtr);
}
1631 
/**
Purge the delete-marked record that the cursor is positioned on, and
leave the cursor restored in BTR_MODIFY_LEAF mode inside a new
mini-transaction. */
void
IndexPurge::purge() UNIV_NOTHROW
{
	/* Remember where the cursor is before it is restored in a
	different latching mode. */
	btr_pcur_store_position(&m_pcur, &m_mtr);

	/* Deletes the record and commits m_mtr. */
	purge_pessimistic_delete();

	mtr_start(&m_mtr);

	mtr_set_log_mode(&m_mtr, MTR_LOG_NO_REDO);

	btr_pcur_restore_position(BTR_MODIFY_LEAF, &m_pcur, &m_mtr);
}
1647 
1648 /** Adjust the BLOB reference for a single column that is externally stored
1649 @param rec record to update
1650 @param offsets column offsets for the record
1651 @param i column ordinal value
1652 @return DB_SUCCESS or error code */
1653 inline
1654 dberr_t
adjust_cluster_index_blob_column(rec_t * rec,const rec_offs * offsets,ulint i)1655 PageConverter::adjust_cluster_index_blob_column(
1656 	rec_t*		rec,
1657 	const rec_offs*	offsets,
1658 	ulint		i) UNIV_NOTHROW
1659 {
1660 	ulint		len;
1661 	byte*		field;
1662 
1663 	field = rec_get_nth_field(rec, offsets, i, &len);
1664 
1665 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_2",
1666 			len = BTR_EXTERN_FIELD_REF_SIZE - 1;);
1667 
1668 	if (len < BTR_EXTERN_FIELD_REF_SIZE) {
1669 
1670 		ib_errf(m_trx->mysql_thd, IB_LOG_LEVEL_ERROR,
1671 			ER_INNODB_INDEX_CORRUPT,
1672 			"Externally stored column(" ULINTPF
1673 			") has a reference length of " ULINTPF
1674 			" in the cluster index %s",
1675 			i, len, m_cluster_index->name());
1676 
1677 		return(DB_CORRUPTION);
1678 	}
1679 
1680 	field += len - (BTR_EXTERN_FIELD_REF_SIZE - BTR_EXTERN_SPACE_ID);
1681 
1682 	mach_write_to_4(field, get_space_id());
1683 
1684 	if (m_page_zip_ptr) {
1685 		page_zip_write_blob_ptr(
1686 			m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
1687 	}
1688 
1689 	return(DB_SUCCESS);
1690 }
1691 
1692 /** Adjusts the BLOB reference in the clustered index row for all externally
1693 stored columns.
1694 @param rec record to update
1695 @param offsets column offsets for the record
1696 @return DB_SUCCESS or error code */
1697 inline
1698 dberr_t
adjust_cluster_index_blob_columns(rec_t * rec,const rec_offs * offsets)1699 PageConverter::adjust_cluster_index_blob_columns(
1700 	rec_t*		rec,
1701 	const rec_offs*	offsets) UNIV_NOTHROW
1702 {
1703 	ut_ad(rec_offs_any_extern(offsets));
1704 
1705 	/* Adjust the space_id in the BLOB pointers. */
1706 
1707 	for (ulint i = 0; i < rec_offs_n_fields(offsets); ++i) {
1708 
1709 		/* Only if the column is stored "externally". */
1710 
1711 		if (rec_offs_nth_extern(offsets, i)) {
1712 			dberr_t	err;
1713 
1714 			err = adjust_cluster_index_blob_column(rec, offsets, i);
1715 
1716 			if (err != DB_SUCCESS) {
1717 				return(err);
1718 			}
1719 		}
1720 	}
1721 
1722 	return(DB_SUCCESS);
1723 }
1724 
1725 /** In the clustered index, adjust BLOB pointers as needed. Also update the
1726 BLOB reference, write the new space id.
1727 @param rec record to update
1728 @param offsets column offsets for the record
1729 @return DB_SUCCESS or error code */
1730 inline
1731 dberr_t
adjust_cluster_index_blob_ref(rec_t * rec,const rec_offs * offsets)1732 PageConverter::adjust_cluster_index_blob_ref(
1733 	rec_t*		rec,
1734 	const rec_offs*	offsets) UNIV_NOTHROW
1735 {
1736 	if (rec_offs_any_extern(offsets)) {
1737 		dberr_t	err;
1738 
1739 		err = adjust_cluster_index_blob_columns(rec, offsets);
1740 
1741 		if (err != DB_SUCCESS) {
1742 			return(err);
1743 		}
1744 	}
1745 
1746 	return(DB_SUCCESS);
1747 }
1748 
1749 /** Purge delete-marked records, only if it is possible to do so without
1750 re-organising the B+tree.
1751 @return true if purge succeeded */
purge()1752 inline bool PageConverter::purge() UNIV_NOTHROW
1753 {
1754 	const dict_index_t*	index = m_index->m_srv_index;
1755 
1756 	/* We can't have a page that is empty and not root. */
1757 	if (m_rec_iter.remove(index, m_page_zip_ptr, m_offsets)) {
1758 
1759 		++m_index->m_stats.m_n_purged;
1760 
1761 		return(true);
1762 	} else {
1763 		++m_index->m_stats.m_n_purge_failed;
1764 	}
1765 
1766 	return(false);
1767 }
1768 
1769 /** Adjust the BLOB references and sys fields for the current record.
1770 @param rec record to update
1771 @param offsets column offsets for the record
1772 @return DB_SUCCESS or error code. */
1773 inline
1774 dberr_t
adjust_cluster_record(rec_t * rec,const rec_offs * offsets)1775 PageConverter::adjust_cluster_record(
1776 	rec_t*			rec,
1777 	const rec_offs*		offsets) UNIV_NOTHROW
1778 {
1779 	dberr_t	err;
1780 
1781 	if ((err = adjust_cluster_index_blob_ref(rec, offsets)) == DB_SUCCESS) {
1782 
1783 		/* Reset DB_TRX_ID and DB_ROLL_PTR.  Normally, these fields
1784 		are only written in conjunction with other changes to the
1785 		record. */
1786 		ulint	trx_id_pos = m_cluster_index->n_uniq
1787 			? m_cluster_index->n_uniq : 1;
1788 		if (m_page_zip_ptr) {
1789 			page_zip_write_trx_id_and_roll_ptr(
1790 				m_page_zip_ptr, rec, m_offsets, trx_id_pos,
1791 				0, roll_ptr_t(1) << ROLL_PTR_INSERT_FLAG_POS,
1792 				NULL);
1793 		} else {
1794 			ulint	len;
1795 			byte*	ptr = rec_get_nth_field(
1796 				rec, m_offsets, trx_id_pos, &len);
1797 			ut_ad(len == DATA_TRX_ID_LEN);
1798 			memcpy(ptr, reset_trx_id, sizeof reset_trx_id);
1799 		}
1800 	}
1801 
1802 	return(err);
1803 }
1804 
1805 /** Update the BLOB refrences and write UNDO log entries for
1806 rows that can't be purged optimistically.
1807 @param block block to update
1808 @retval DB_SUCCESS or error code */
1809 inline
1810 dberr_t
update_records(buf_block_t * block)1811 PageConverter::update_records(
1812 	buf_block_t*	block) UNIV_NOTHROW
1813 {
1814 	ibool	comp = dict_table_is_comp(m_cfg->m_table);
1815 	bool	clust_index = m_index->m_srv_index == m_cluster_index;
1816 
1817 	/* This will also position the cursor on the first user record. */
1818 
1819 	m_rec_iter.open(block);
1820 
1821 	while (!m_rec_iter.end()) {
1822 		rec_t*	rec = m_rec_iter.current();
1823 		ibool	deleted = rec_get_deleted_flag(rec, comp);
1824 
1825 		/* For the clustered index we have to adjust the BLOB
1826 		reference and the system fields irrespective of the
1827 		delete marked flag. The adjustment of delete marked
1828 		cluster records is required for purge to work later. */
1829 
1830 		if (deleted || clust_index) {
1831 			m_offsets = rec_get_offsets(
1832 				rec, m_index->m_srv_index, m_offsets,
1833 				m_index->m_srv_index->n_core_fields,
1834 				ULINT_UNDEFINED, &m_heap);
1835 		}
1836 
1837 		if (clust_index) {
1838 
1839 			dberr_t err = adjust_cluster_record(rec, m_offsets);
1840 
1841 			if (err != DB_SUCCESS) {
1842 				return(err);
1843 			}
1844 		}
1845 
1846 		/* If it is a delete marked record then try an
1847 		optimistic delete. */
1848 
1849 		if (deleted) {
1850 			/* A successful purge will move the cursor to the
1851 			next record. */
1852 
1853 			if (!purge()) {
1854 				m_rec_iter.next();
1855 			}
1856 
1857 			++m_index->m_stats.m_n_deleted;
1858 		} else {
1859 			++m_index->m_stats.m_n_rows;
1860 			m_rec_iter.next();
1861 		}
1862 	}
1863 
1864 	return(DB_SUCCESS);
1865 }
1866 
/** Update an index page: switch the current index if the page belongs to
a different one, write the space id into the segment headers of the root
page, set the index id and PAGE_MAX_TRX_ID, and process the records of
non-empty leaf pages.
@param block block holding the page to update
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_index_page(
	buf_block_t*	block) UNIV_NOTHROW
{
	index_id_t	id;
	buf_frame_t*	page = block->frame;

	if (is_free(block->page.id.page_no())) {
		/* Pages reported by is_free() are left untouched. */
		return(DB_SUCCESS);
	} else if ((id = btr_page_get_index_id(page)) != m_index->m_id) {
		/* The page belongs to another index than the current
		one; look the index up by the id stored in the page. */
		row_index_t*	index = find_index(id);

		if (UNIV_UNLIKELY(!index)) {
			if (m_cfg->m_missing) {
				/* Without a .cfg file an unknown index id
				is not treated as an error. */
				return DB_SUCCESS;
			}

			ib::error() << "Page for tablespace " << m_space
				<< " is index page with id " << id
				<< " but that index is not found from"
				<< " configuration file. Current index name "
				<< m_index->m_name << " and id " <<  m_index->m_id;
			m_index = 0;
			return(DB_CORRUPTION);
		}

		/* Update current index */
		m_index = index;
	}

	/* If the .cfg file is missing and there is an index mismatch
	then ignore the error. */
	if (m_cfg->m_missing && (m_index == 0 || m_index->m_srv_index == 0)) {
		return(DB_SUCCESS);
	}

	/* On the root page of the index, write the space id of the block
	into both file segment headers (leaf and top), and mirror the
	value into the compressed page if there is one. */
	if (m_index && block->page.id.page_no() == m_index->m_page_no) {
		byte *b = FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + FSEG_HDR_SPACE
			+ page;
		mach_write_to_4(b, block->page.id.space());

		memcpy(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + FSEG_HDR_SPACE
		       + page, b, 4);
		if (UNIV_LIKELY_NULL(block->page.zip.data)) {
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_TOP
						     + FSEG_HDR_SPACE], b, 4);
			memcpy(&block->page.zip.data[FIL_PAGE_DATA
						     + PAGE_BTR_SEG_LEAF
						     + FSEG_HDR_SPACE], b, 4);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!is_compressed_table()
	     || page_zip_validate(m_page_zip_ptr, page, m_index->m_srv_index));
#endif /* UNIV_ZIP_DEBUG */

	/* This has to be written to uncompressed index header. Set it to
	the current index id. */
	/* NOTE(review): m_index->m_srv_index is dereferenced here without
	a check when the .cfg file is present; presumably the schema
	matching guarantees it is set - confirm. */
	btr_page_set_index_id(
		page, m_page_zip_ptr, m_index->m_srv_index->id, 0);

	if (dict_index_is_clust(m_index->m_srv_index)) {
		dict_index_t* index = const_cast<dict_index_t*>(
			m_index->m_srv_index);
		if (block->page.id.page_no() == index->page) {
			/* Preserve the PAGE_ROOT_AUTO_INC. */
			if (index->table->supports_instant()) {
				if (btr_cur_instant_root_init(index, page)) {
					return(DB_CORRUPTION);
				}

				/* Provisionally set all instantly
				added columns to be DEFAULT NULL. */
				for (unsigned i = index->n_core_fields;
				     i < index->n_fields; i++) {
					dict_col_t* col = index->fields[i].col;
					col->def_val.len = UNIV_SQL_NULL;
					col->def_val.data = NULL;
				}
			}
		} else {
			/* Clear PAGE_MAX_TRX_ID so that it can be
			used for other purposes in the future. IMPORT
			in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
			would set the field to the transaction ID even
			on clustered index pages. */
			page_set_max_trx_id(block, m_page_zip_ptr, 0, NULL);
		}
	} else {
		/* Set PAGE_MAX_TRX_ID on secondary index leaf pages,
		and clear it on non-leaf pages. */
		page_set_max_trx_id(block, m_page_zip_ptr,
				    page_is_leaf(page) ? m_trx->id : 0, NULL);
	}

	if (page_is_empty(page)) {

		/* Only a root page can be empty. */
		if (page_has_siblings(page)) {
			// TODO: We should relax this and skip secondary
			// indexes. Mark them as corrupt because they can
			// always be rebuilt.
			return(DB_CORRUPTION);
		}

		return(DB_SUCCESS);
	}

	/* Only the records of leaf pages are processed. */
	return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
}
1982 
1983 /** Validate the space flags and update tablespace header page.
1984 @param block block read from file, not from the buffer pool.
1985 @retval DB_SUCCESS or error code */
1986 inline
1987 dberr_t
update_header(buf_block_t * block)1988 PageConverter::update_header(
1989 	buf_block_t*	block) UNIV_NOTHROW
1990 {
1991 	/* Check for valid header */
1992 	switch (fsp_header_get_space_id(get_frame(block))) {
1993 	case 0:
1994 		return(DB_CORRUPTION);
1995 	case ULINT_UNDEFINED:
1996 		ib::warn() << "Space id check in the header failed: ignored";
1997 	}
1998 
1999 	mach_write_to_8(
2000 		get_frame(block) + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
2001 		m_current_lsn);
2002 
2003 	/* Write back the adjusted flags. */
2004 	mach_write_to_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS
2005 			+ get_frame(block), m_space_flags);
2006 
2007 	/* Write space_id to the tablespace header, page 0. */
2008 	mach_write_to_4(
2009 		get_frame(block) + FSP_HEADER_OFFSET + FSP_SPACE_ID,
2010 		get_space_id());
2011 
2012 	/* This is on every page in the tablespace. */
2013 	mach_write_to_4(
2014 		get_frame(block) + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
2015 		get_space_id());
2016 
2017 	return(DB_SUCCESS);
2018 }
2019 
/** Update the page according to its type: the tablespace header page and
index pages get dedicated processing, the other known page types only get
the new space id written into the page header, and unknown types are
reported as corruption.
@param block block read from file
@param page_type set to the type of the page, as read from the page
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_page(
	buf_block_t*	block,
	ulint&		page_type) UNIV_NOTHROW
{
	dberr_t		err = DB_SUCCESS;

	/* A compressed page frame must be present exactly when the
	table is compressed. */
	ut_ad(!block->page.zip.data == !is_compressed_table());

	if (block->page.zip.data) {
		m_page_zip_ptr = &block->page.zip;
	} else {
		ut_ad(!m_page_zip_ptr);
	}

	switch (page_type = fil_page_get_type(get_frame(block))) {
	case FIL_PAGE_TYPE_FSP_HDR:
		ut_a(block->page.id.page_no() == 0);
		/* Work directly on the uncompressed page headers. */
		return(update_header(block));

	case FIL_PAGE_INDEX:
	case FIL_PAGE_RTREE:
		/* We need to decompress the contents into block->frame
		before we can do any thing with Btree pages. */

		if (is_compressed_table() && !buf_zip_decompress(block, TRUE)) {
			return(DB_CORRUPTION);
		}

		/* fall through */
	case FIL_PAGE_TYPE_INSTANT:
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		/* Only update the Btree nodes. */
		return(update_index_page(block));

	case FIL_PAGE_TYPE_SYS:
		/* This is page 0 in the system tablespace. */
		return(DB_CORRUPTION);

	case FIL_PAGE_TYPE_XDES:
		/* Remember the extent descriptor page; its status is
		returned after the space id has been written below. */
		err = set_current_xdes(
			block->page.id.page_no(), get_frame(block));
		/* fall through */
	case FIL_PAGE_INODE:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_ALLOCATED:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:

		/* Work directly on the uncompressed page headers. */
		/* This is on every page in the tablespace. */
		mach_write_to_4(
			get_frame(block)
			+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, get_space_id());

		return(err);
	}

	/* None of the known page types matched. */
	ib::warn() << "Unknown page type (" << page_type << ")";

	return(DB_CORRUPTION);
}
2094 
2095 /** Called for every page in the tablespace. If the page was not
2096 updated then its state must be set to BUF_PAGE_NOT_USED.
2097 @param block block read from file, note it is not from the buffer pool
2098 @retval DB_SUCCESS or error code. */
operator ()(buf_block_t * block)2099 dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
2100 {
2101 	/* If we already had an old page with matching number
2102 	in the buffer pool, evict it now, because
2103 	we no longer evict the pages on DISCARD TABLESPACE. */
2104 	buf_page_get_gen(block->page.id, get_page_size(),
2105 			 RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
2106 			 __FILE__, __LINE__, NULL, NULL);
2107 
2108 	ulint		page_type;
2109 
2110 	dberr_t err = update_page(block, page_type);
2111 	if (err != DB_SUCCESS) return err;
2112 
2113 	if (!block->page.zip.data) {
2114 		buf_flush_init_for_writing(
2115 			NULL, block->frame, NULL, m_current_lsn);
2116 	} else if (fil_page_type_is_index(page_type)) {
2117 		buf_flush_init_for_writing(
2118 			NULL, block->page.zip.data, &block->page.zip,
2119 			m_current_lsn);
2120 	} else {
2121 		/* Calculate and update the checksum of non-index
2122 		pages for ROW_FORMAT=COMPRESSED tables. */
2123 		buf_flush_update_zip_checksum(
2124 			block->page.zip.data, get_page_size().physical(),
2125 			m_current_lsn);
2126 	}
2127 
2128 	return DB_SUCCESS;
2129 }
2130 
2131 /*****************************************************************//**
2132 Clean up after import tablespace failure, this function will acquire
2133 the dictionary latches on behalf of the transaction if the transaction
2134 hasn't already acquired them. */
2135 static	MY_ATTRIBUTE((nonnull))
2136 void
row_import_discard_changes(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2137 row_import_discard_changes(
2138 /*=======================*/
2139 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2140 	trx_t*		trx,		/*!< in/out: transaction for import */
2141 	dberr_t		err)		/*!< in: error code */
2142 {
2143 	dict_table_t*	table = prebuilt->table;
2144 
2145 	ut_a(err != DB_SUCCESS);
2146 
2147 	prebuilt->trx->error_info = NULL;
2148 
2149 	ib::info() << "Discarding tablespace of table "
2150 		<< prebuilt->table->name
2151 		<< ": " << err;
2152 
2153 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
2154 		ut_a(trx->dict_operation_lock_mode == 0);
2155 		row_mysql_lock_data_dictionary(trx);
2156 	}
2157 
2158 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2159 
2160 	/* Since we update the index root page numbers on disk after
2161 	we've done a successful import. The table will not be loadable.
2162 	However, we need to ensure that the in memory root page numbers
2163 	are reset to "NULL". */
2164 
2165 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2166 		index != 0;
2167 		index = UT_LIST_GET_NEXT(indexes, index)) {
2168 
2169 		index->page = FIL_NULL;
2170 	}
2171 
2172 	table->file_unreadable = true;
2173 	if (table->space) {
2174 		fil_close_tablespace(trx, table->space_id);
2175 		table->space = NULL;
2176 	}
2177 }
2178 
2179 /*****************************************************************//**
2180 Clean up after import tablespace. */
2181 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2182 dberr_t
row_import_cleanup(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2183 row_import_cleanup(
2184 /*===============*/
2185 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2186 	trx_t*		trx,		/*!< in/out: transaction for import */
2187 	dberr_t		err)		/*!< in: error code */
2188 {
2189 	ut_a(prebuilt->trx != trx);
2190 
2191 	if (err != DB_SUCCESS) {
2192 		row_import_discard_changes(prebuilt, trx, err);
2193 	}
2194 
2195 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2196 
2197 	DBUG_EXECUTE_IF("ib_import_before_commit_crash", DBUG_SUICIDE(););
2198 
2199 	trx_commit_for_mysql(trx);
2200 
2201 	row_mysql_unlock_data_dictionary(trx);
2202 
2203 	trx->free();
2204 
2205 	prebuilt->trx->op_info = "";
2206 
2207 	DBUG_EXECUTE_IF("ib_import_before_checkpoint_crash", DBUG_SUICIDE(););
2208 
2209 	log_make_checkpoint();
2210 
2211 	return(err);
2212 }
2213 
2214 /*****************************************************************//**
2215 Report error during tablespace import. */
2216 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2217 dberr_t
row_import_error(row_prebuilt_t * prebuilt,trx_t * trx,dberr_t err)2218 row_import_error(
2219 /*=============*/
2220 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt from handler */
2221 	trx_t*		trx,		/*!< in/out: transaction for import */
2222 	dberr_t		err)		/*!< in: error code */
2223 {
2224 	if (!trx_is_interrupted(trx)) {
2225 		char	table_name[MAX_FULL_NAME_LEN + 1];
2226 
2227 		innobase_format_name(
2228 			table_name, sizeof(table_name),
2229 			prebuilt->table->name.m_name);
2230 
2231 		ib_senderrf(
2232 			trx->mysql_thd, IB_LOG_LEVEL_WARN,
2233 			ER_INNODB_IMPORT_ERROR,
2234 			table_name, (ulong) err, ut_strerr(err));
2235 	}
2236 
2237 	return(row_import_cleanup(prebuilt, trx, err));
2238 }
2239 
2240 /*****************************************************************//**
2241 Adjust the root page index node and leaf node segment headers, update
2242 with the new space id. For all the table's secondary indexes.
2243 @return error code */
2244 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2245 dberr_t
row_import_adjust_root_pages_of_secondary_indexes(trx_t * trx,dict_table_t * table,const row_import & cfg)2246 row_import_adjust_root_pages_of_secondary_indexes(
2247 /*==============================================*/
2248 	trx_t*			trx,		/*!< in: transaction used for
2249 						the import */
2250 	dict_table_t*		table,		/*!< in: table the indexes
2251 						belong to */
2252 	const row_import&	cfg)		/*!< Import context */
2253 {
2254 	dict_index_t*		index;
2255 	ulint			n_rows_in_table;
2256 	dberr_t			err = DB_SUCCESS;
2257 
2258 	/* Skip the clustered index. */
2259 	index = dict_table_get_first_index(table);
2260 
2261 	n_rows_in_table = cfg.get_n_rows(index->name);
2262 
2263 	DBUG_EXECUTE_IF("ib_import_sec_rec_count_mismatch_failure",
2264 			n_rows_in_table++;);
2265 
2266 	/* Adjust the root pages of the secondary indexes only. */
2267 	while ((index = dict_table_get_next_index(index)) != NULL) {
2268 		ut_a(!dict_index_is_clust(index));
2269 
2270 		if (!(index->type & DICT_CORRUPT)
2271 		    && index->page != FIL_NULL) {
2272 
2273 			/* Update the Btree segment headers for index node and
2274 			leaf nodes in the root page. Set the new space id. */
2275 
2276 			err = btr_root_adjust_on_import(index);
2277 		} else {
2278 			ib::warn() << "Skip adjustment of root pages for"
2279 				" index " << index->name << ".";
2280 
2281 			err = DB_CORRUPTION;
2282 		}
2283 
2284 		if (err != DB_SUCCESS) {
2285 
2286 			if (index->type & DICT_CLUSTERED) {
2287 				break;
2288 			}
2289 
2290 			ib_errf(trx->mysql_thd,
2291 				IB_LOG_LEVEL_WARN,
2292 				ER_INNODB_INDEX_CORRUPT,
2293 				"Index %s not found or corrupt,"
2294 				" you should recreate this index.",
2295 				index->name());
2296 
2297 			/* Do not bail out, so that the data
2298 			can be recovered. */
2299 
2300 			err = DB_SUCCESS;
2301 			index->type |= DICT_CORRUPT;
2302 			continue;
2303 		}
2304 
2305 		/* If we failed to purge any records in the index then
2306 		do it the hard way.
2307 
2308 		TODO: We can do this in the first pass by generating UNDO log
2309 		records for the failed rows. */
2310 
2311 		if (!cfg.requires_purge(index->name)) {
2312 			continue;
2313 		}
2314 
2315 		IndexPurge   purge(trx, index);
2316 
2317 		trx->op_info = "secondary: purge delete marked records";
2318 
2319 		err = purge.garbage_collect();
2320 
2321 		trx->op_info = "";
2322 
2323 		if (err != DB_SUCCESS) {
2324 			break;
2325 		} else if (purge.get_n_rows() != n_rows_in_table) {
2326 
2327 			ib_errf(trx->mysql_thd,
2328 				IB_LOG_LEVEL_WARN,
2329 				ER_INNODB_INDEX_CORRUPT,
2330 				"Index '%s' contains " ULINTPF " entries, "
2331 				"should be " ULINTPF ", you should recreate "
2332 				"this index.", index->name(),
2333 				purge.get_n_rows(), n_rows_in_table);
2334 
2335 			index->type |= DICT_CORRUPT;
2336 
2337 			/* Do not bail out, so that the data
2338 			can be recovered. */
2339 
2340 			err = DB_SUCCESS;
2341                 }
2342 	}
2343 
2344 	return(err);
2345 }
2346 
2347 /*****************************************************************//**
2348 Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
2349 @return error code */
2350 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2351 dberr_t
row_import_set_sys_max_row_id(row_prebuilt_t * prebuilt,const dict_table_t * table)2352 row_import_set_sys_max_row_id(
2353 /*==========================*/
2354 	row_prebuilt_t*		prebuilt,	/*!< in/out: prebuilt from
2355 						handler */
2356 	const dict_table_t*	table)		/*!< in: table to import */
2357 {
2358 	dberr_t			err;
2359 	const rec_t*		rec;
2360 	mtr_t			mtr;
2361 	btr_pcur_t		pcur;
2362 	row_id_t		row_id	= 0;
2363 	dict_index_t*		index;
2364 
2365 	index = dict_table_get_first_index(table);
2366 	ut_a(dict_index_is_clust(index));
2367 
2368 	mtr_start(&mtr);
2369 
2370 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2371 
2372 	btr_pcur_open_at_index_side(
2373 		false,		// High end
2374 		index,
2375 		BTR_SEARCH_LEAF,
2376 		&pcur,
2377 		true,		// Init cursor
2378 		0,		// Leaf level
2379 		&mtr);
2380 
2381 	btr_pcur_move_to_prev_on_page(&pcur);
2382 	rec = btr_pcur_get_rec(&pcur);
2383 
2384 	/* Check for empty table. */
2385 	if (page_rec_is_infimum(rec)) {
2386 		/* The table is empty. */
2387 		err = DB_SUCCESS;
2388 	} else if (rec_is_metadata(rec, index)) {
2389 		/* The clustered index contains the metadata record only,
2390 		that is, the table is empty. */
2391 		err = DB_SUCCESS;
2392 	} else {
2393 		ulint		len;
2394 		const byte*	field;
2395 		mem_heap_t*	heap = NULL;
2396 		rec_offs	offsets_[1 + REC_OFFS_HEADER_SIZE];
2397 		rec_offs*	offsets;
2398 
2399 		rec_offs_init(offsets_);
2400 
2401 		offsets = rec_get_offsets(
2402 			rec, index, offsets_, index->n_core_fields,
2403 			ULINT_UNDEFINED, &heap);
2404 
2405 		field = rec_get_nth_field(
2406 			rec, offsets,
2407 			dict_index_get_sys_col_pos(index, DATA_ROW_ID),
2408 			&len);
2409 
2410 		if (len == DATA_ROW_ID_LEN) {
2411 			row_id = mach_read_from_6(field);
2412 			err = DB_SUCCESS;
2413 		} else {
2414 			err = DB_CORRUPTION;
2415 		}
2416 
2417 		if (heap != NULL) {
2418 			mem_heap_free(heap);
2419 		}
2420 	}
2421 
2422 	btr_pcur_close(&pcur);
2423 	mtr_commit(&mtr);
2424 
2425 	DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
2426 			err = DB_CORRUPTION;);
2427 
2428 	if (err != DB_SUCCESS) {
2429 		ib_errf(prebuilt->trx->mysql_thd,
2430 			IB_LOG_LEVEL_WARN,
2431 			ER_INNODB_INDEX_CORRUPT,
2432 			"Index `%s` corruption detected, invalid DB_ROW_ID"
2433 			" in index.", index->name());
2434 
2435 		return(err);
2436 
2437 	} else if (row_id > 0) {
2438 
2439 		/* Update the system row id if the imported index row id is
2440 		greater than the max system row id. */
2441 
2442 		mutex_enter(&dict_sys->mutex);
2443 
2444 		if (row_id >= dict_sys->row_id) {
2445 			dict_sys->row_id = row_id + 1;
2446 			dict_hdr_flush_row_id();
2447 		}
2448 
2449 		mutex_exit(&dict_sys->mutex);
2450 	}
2451 
2452 	return(DB_SUCCESS);
2453 }
2454 
2455 /*****************************************************************//**
2456 Read the a string from the meta data file.
2457 @return DB_SUCCESS or error code. */
2458 static
2459 dberr_t
row_import_cfg_read_string(FILE * file,byte * ptr,ulint max_len)2460 row_import_cfg_read_string(
2461 /*=======================*/
2462 	FILE*		file,		/*!< in/out: File to read from */
2463 	byte*		ptr,		/*!< out: string to read */
2464 	ulint		max_len)	/*!< in: maximum length of the output
2465 					buffer in bytes */
2466 {
2467 	DBUG_EXECUTE_IF("ib_import_string_read_error",
2468 			errno = EINVAL; return(DB_IO_ERROR););
2469 
2470 	ulint		len = 0;
2471 
2472 	while (!feof(file)) {
2473 		int	ch = fgetc(file);
2474 
2475 		if (ch == EOF) {
2476 			break;
2477 		} else if (ch != 0) {
2478 			if (len < max_len) {
2479 				ptr[len++] = ch;
2480 			} else {
2481 				break;
2482 			}
2483 		/* max_len includes the NUL byte */
2484 		} else if (len != max_len - 1) {
2485 			break;
2486 		} else {
2487 			ptr[len] = 0;
2488 			return(DB_SUCCESS);
2489 		}
2490 	}
2491 
2492 	errno = EINVAL;
2493 
2494 	return(DB_IO_ERROR);
2495 }
2496 
2497 /*********************************************************************//**
2498 Write the meta data (index user fields) config file.
2499 @return DB_SUCCESS or error code. */
2500 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2501 dberr_t
row_import_cfg_read_index_fields(FILE * file,THD * thd,row_index_t * index)2502 row_import_cfg_read_index_fields(
2503 /*=============================*/
2504 	FILE*			file,	/*!< in: file to write to */
2505 	THD*			thd,	/*!< in/out: session */
2506 	row_index_t*		index)	/*!< Index being read in */
2507 {
2508 	byte			row[sizeof(ib_uint32_t) * 3];
2509 	ulint			n_fields = index->m_n_fields;
2510 
2511 	index->m_fields = UT_NEW_ARRAY_NOKEY(dict_field_t, n_fields);
2512 
2513 	/* Trigger OOM */
2514 	DBUG_EXECUTE_IF(
2515 		"ib_import_OOM_4",
2516 		UT_DELETE_ARRAY(index->m_fields);
2517 		index->m_fields = NULL;
2518 	);
2519 
2520 	if (index->m_fields == NULL) {
2521 		return(DB_OUT_OF_MEMORY);
2522 	}
2523 
2524 	dict_field_t*	field = index->m_fields;
2525 
2526 	for (ulint i = 0; i < n_fields; ++i, ++field) {
2527 		byte*		ptr = row;
2528 
2529 		/* Trigger EOF */
2530 		DBUG_EXECUTE_IF("ib_import_io_read_error_1",
2531 				(void) fseek(file, 0L, SEEK_END););
2532 
2533 		if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2534 
2535 			ib_senderrf(
2536 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2537 				(ulong) errno, strerror(errno),
2538 				"while reading index fields.");
2539 
2540 			return(DB_IO_ERROR);
2541 		}
2542 
2543 		new (field) dict_field_t();
2544 
2545 		field->prefix_len = mach_read_from_4(ptr);
2546 		ptr += sizeof(ib_uint32_t);
2547 
2548 		field->fixed_len = mach_read_from_4(ptr);
2549 		ptr += sizeof(ib_uint32_t);
2550 
2551 		/* Include the NUL byte in the length. */
2552 		ulint	len = mach_read_from_4(ptr);
2553 
2554 		byte*	name = UT_NEW_ARRAY_NOKEY(byte, len);
2555 
2556 		/* Trigger OOM */
2557 		DBUG_EXECUTE_IF(
2558 			"ib_import_OOM_5",
2559 			UT_DELETE_ARRAY(name);
2560 			name = NULL;
2561 		);
2562 
2563 		if (name == NULL) {
2564 			return(DB_OUT_OF_MEMORY);
2565 		}
2566 
2567 		field->name = reinterpret_cast<const char*>(name);
2568 
2569 		dberr_t	err = row_import_cfg_read_string(file, name, len);
2570 
2571 		if (err != DB_SUCCESS) {
2572 
2573 			ib_senderrf(
2574 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2575 				(ulong) errno, strerror(errno),
2576 				"while parsing table name.");
2577 
2578 			return(err);
2579 		}
2580 	}
2581 
2582 	return(DB_SUCCESS);
2583 }
2584 
2585 /*****************************************************************//**
2586 Read the index names and root page numbers of the indexes and set the values.
2587 Row format [root_page_no, len of str, str ... ]
2588 @return DB_SUCCESS or error code. */
2589 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2590 dberr_t
row_import_read_index_data(FILE * file,THD * thd,row_import * cfg)2591 row_import_read_index_data(
2592 /*=======================*/
2593 	FILE*		file,		/*!< in: File to read from */
2594 	THD*		thd,		/*!< in: session */
2595 	row_import*	cfg)		/*!< in/out: meta-data read */
2596 {
2597 	byte*		ptr;
2598 	row_index_t*	cfg_index;
2599 	byte		row[sizeof(index_id_t) + sizeof(ib_uint32_t) * 9];
2600 
2601 	/* FIXME: What is the max value? */
2602 	ut_a(cfg->m_n_indexes > 0);
2603 	ut_a(cfg->m_n_indexes < 1024);
2604 
2605 	cfg->m_indexes = UT_NEW_ARRAY_NOKEY(row_index_t, cfg->m_n_indexes);
2606 
2607 	/* Trigger OOM */
2608 	DBUG_EXECUTE_IF(
2609 		"ib_import_OOM_6",
2610 		UT_DELETE_ARRAY(cfg->m_indexes);
2611 		cfg->m_indexes = NULL;
2612 	);
2613 
2614 	if (cfg->m_indexes == NULL) {
2615 		return(DB_OUT_OF_MEMORY);
2616 	}
2617 
2618 	memset(cfg->m_indexes, 0x0, sizeof(*cfg->m_indexes) * cfg->m_n_indexes);
2619 
2620 	cfg_index = cfg->m_indexes;
2621 
2622 	for (ulint i = 0; i < cfg->m_n_indexes; ++i, ++cfg_index) {
2623 		/* Trigger EOF */
2624 		DBUG_EXECUTE_IF("ib_import_io_read_error_2",
2625 				(void) fseek(file, 0L, SEEK_END););
2626 
2627 		/* Read the index data. */
2628 		size_t	n_bytes = fread(row, 1, sizeof(row), file);
2629 
2630 		/* Trigger EOF */
2631 		DBUG_EXECUTE_IF("ib_import_io_read_error",
2632 				(void) fseek(file, 0L, SEEK_END););
2633 
2634 		if (n_bytes != sizeof(row)) {
2635 			char	msg[BUFSIZ];
2636 
2637 			snprintf(msg, sizeof(msg),
2638 				 "while reading index meta-data, expected "
2639 				 "to read " ULINTPF
2640 				 " bytes but read only " ULINTPF " bytes",
2641 				 sizeof(row), n_bytes);
2642 
2643 			ib_senderrf(
2644 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2645 				(ulong) errno, strerror(errno), msg);
2646 
2647 			ib::error() << "IO Error: " << msg;
2648 
2649 			return(DB_IO_ERROR);
2650 		}
2651 
2652 		ptr = row;
2653 
2654 		cfg_index->m_id = mach_read_from_8(ptr);
2655 		ptr += sizeof(index_id_t);
2656 
2657 		cfg_index->m_space = mach_read_from_4(ptr);
2658 		ptr += sizeof(ib_uint32_t);
2659 
2660 		cfg_index->m_page_no = mach_read_from_4(ptr);
2661 		ptr += sizeof(ib_uint32_t);
2662 
2663 		cfg_index->m_type = mach_read_from_4(ptr);
2664 		ptr += sizeof(ib_uint32_t);
2665 
2666 		cfg_index->m_trx_id_offset = mach_read_from_4(ptr);
2667 		if (cfg_index->m_trx_id_offset != mach_read_from_4(ptr)) {
2668 			ut_ad(0);
2669 			/* Overflow. Pretend that the clustered index
2670 			has a variable-length PRIMARY KEY. */
2671 			cfg_index->m_trx_id_offset = 0;
2672 		}
2673 		ptr += sizeof(ib_uint32_t);
2674 
2675 		cfg_index->m_n_user_defined_cols = mach_read_from_4(ptr);
2676 		ptr += sizeof(ib_uint32_t);
2677 
2678 		cfg_index->m_n_uniq = mach_read_from_4(ptr);
2679 		ptr += sizeof(ib_uint32_t);
2680 
2681 		cfg_index->m_n_nullable = mach_read_from_4(ptr);
2682 		ptr += sizeof(ib_uint32_t);
2683 
2684 		cfg_index->m_n_fields = mach_read_from_4(ptr);
2685 		ptr += sizeof(ib_uint32_t);
2686 
2687 		/* The NUL byte is included in the name length. */
2688 		ulint	len = mach_read_from_4(ptr);
2689 
2690 		if (len > OS_FILE_MAX_PATH) {
2691 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2692 				ER_INNODB_INDEX_CORRUPT,
2693 				"Index name length (" ULINTPF ") is too long, "
2694 				"the meta-data is corrupt", len);
2695 
2696 			return(DB_CORRUPTION);
2697 		}
2698 
2699 		cfg_index->m_name = UT_NEW_ARRAY_NOKEY(byte, len);
2700 
2701 		/* Trigger OOM */
2702 		DBUG_EXECUTE_IF(
2703 			"ib_import_OOM_7",
2704 			UT_DELETE_ARRAY(cfg_index->m_name);
2705 			cfg_index->m_name = NULL;
2706 		);
2707 
2708 		if (cfg_index->m_name == NULL) {
2709 			return(DB_OUT_OF_MEMORY);
2710 		}
2711 
2712 		dberr_t	err;
2713 
2714 		err = row_import_cfg_read_string(file, cfg_index->m_name, len);
2715 
2716 		if (err != DB_SUCCESS) {
2717 
2718 			ib_senderrf(
2719 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2720 				(ulong) errno, strerror(errno),
2721 				"while parsing index name.");
2722 
2723 			return(err);
2724 		}
2725 
2726 		err = row_import_cfg_read_index_fields(file, thd, cfg_index);
2727 
2728 		if (err != DB_SUCCESS) {
2729 			return(err);
2730 		}
2731 
2732 	}
2733 
2734 	return(DB_SUCCESS);
2735 }
2736 
2737 /*****************************************************************//**
2738 Set the index root page number for v1 format.
2739 @return DB_SUCCESS or error code. */
2740 static
2741 dberr_t
row_import_read_indexes(FILE * file,THD * thd,row_import * cfg)2742 row_import_read_indexes(
2743 /*====================*/
2744 	FILE*		file,		/*!< in: File to read from */
2745 	THD*		thd,		/*!< in: session */
2746 	row_import*	cfg)		/*!< in/out: meta-data read */
2747 {
2748 	byte		row[sizeof(ib_uint32_t)];
2749 
2750 	/* Trigger EOF */
2751 	DBUG_EXECUTE_IF("ib_import_io_read_error_3",
2752 			(void) fseek(file, 0L, SEEK_END););
2753 
2754 	/* Read the number of indexes. */
2755 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
2756 		ib_senderrf(
2757 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2758 			(ulong) errno, strerror(errno),
2759 			"while reading number of indexes.");
2760 
2761 		return(DB_IO_ERROR);
2762 	}
2763 
2764 	cfg->m_n_indexes = mach_read_from_4(row);
2765 
2766 	if (cfg->m_n_indexes == 0) {
2767 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2768 			"Number of indexes in meta-data file is 0");
2769 
2770 		return(DB_CORRUPTION);
2771 
2772 	} else if (cfg->m_n_indexes > 1024) {
2773 		// FIXME: What is the upper limit? */
2774 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2775 			"Number of indexes in meta-data file is too high: "
2776 			ULINTPF, cfg->m_n_indexes);
2777 		cfg->m_n_indexes = 0;
2778 
2779 		return(DB_CORRUPTION);
2780 	}
2781 
2782 	return(row_import_read_index_data(file, thd, cfg));
2783 }
2784 
2785 /*********************************************************************//**
2786 Read the meta data (table columns) config file. Deserialise the contents of
2787 dict_col_t structure, along with the column name. */
2788 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2789 dberr_t
row_import_read_columns(FILE * file,THD * thd,row_import * cfg)2790 row_import_read_columns(
2791 /*====================*/
2792 	FILE*			file,	/*!< in: file to write to */
2793 	THD*			thd,	/*!< in/out: session */
2794 	row_import*		cfg)	/*!< in/out: meta-data read */
2795 {
2796 	dict_col_t*		col;
2797 	byte			row[sizeof(ib_uint32_t) * 8];
2798 
2799 	/* FIXME: What should the upper limit be? */
2800 	ut_a(cfg->m_n_cols > 0);
2801 	ut_a(cfg->m_n_cols < 1024);
2802 
2803 	cfg->m_cols = UT_NEW_ARRAY_NOKEY(dict_col_t, cfg->m_n_cols);
2804 
2805 	/* Trigger OOM */
2806 	DBUG_EXECUTE_IF(
2807 		"ib_import_OOM_8",
2808 		UT_DELETE_ARRAY(cfg->m_cols);
2809 		cfg->m_cols = NULL;
2810 	);
2811 
2812 	if (cfg->m_cols == NULL) {
2813 		return(DB_OUT_OF_MEMORY);
2814 	}
2815 
2816 	cfg->m_col_names = UT_NEW_ARRAY_NOKEY(byte*, cfg->m_n_cols);
2817 
2818 	/* Trigger OOM */
2819 	DBUG_EXECUTE_IF(
2820 		"ib_import_OOM_9",
2821 		UT_DELETE_ARRAY(cfg->m_col_names);
2822 		cfg->m_col_names = NULL;
2823 	);
2824 
2825 	if (cfg->m_col_names == NULL) {
2826 		return(DB_OUT_OF_MEMORY);
2827 	}
2828 
2829 	memset(cfg->m_cols, 0x0, sizeof(cfg->m_cols) * cfg->m_n_cols);
2830 	memset(cfg->m_col_names, 0x0, sizeof(cfg->m_col_names) * cfg->m_n_cols);
2831 
2832 	col = cfg->m_cols;
2833 
2834 	for (ulint i = 0; i < cfg->m_n_cols; ++i, ++col) {
2835 		byte*		ptr = row;
2836 
2837 		/* Trigger EOF */
2838 		DBUG_EXECUTE_IF("ib_import_io_read_error_4",
2839 				(void) fseek(file, 0L, SEEK_END););
2840 
2841 		if (fread(row, 1,  sizeof(row), file) != sizeof(row)) {
2842 			ib_senderrf(
2843 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2844 				(ulong) errno, strerror(errno),
2845 				"while reading table column meta-data.");
2846 
2847 			return(DB_IO_ERROR);
2848 		}
2849 
2850 		col->prtype = mach_read_from_4(ptr);
2851 		ptr += sizeof(ib_uint32_t);
2852 
2853 		col->mtype = mach_read_from_4(ptr);
2854 		ptr += sizeof(ib_uint32_t);
2855 
2856 		col->len = mach_read_from_4(ptr);
2857 		ptr += sizeof(ib_uint32_t);
2858 
2859 		ulint mbminmaxlen = mach_read_from_4(ptr);
2860 		col->mbmaxlen = mbminmaxlen / 5;
2861 		col->mbminlen = mbminmaxlen % 5;
2862 		ptr += sizeof(ib_uint32_t);
2863 
2864 		col->ind = mach_read_from_4(ptr);
2865 		ptr += sizeof(ib_uint32_t);
2866 
2867 		col->ord_part = mach_read_from_4(ptr);
2868 		ptr += sizeof(ib_uint32_t);
2869 
2870 		col->max_prefix = mach_read_from_4(ptr);
2871 		ptr += sizeof(ib_uint32_t);
2872 
2873 		/* Read in the column name as [len, byte array]. The len
2874 		includes the NUL byte. */
2875 
2876 		ulint		len = mach_read_from_4(ptr);
2877 
2878 		/* FIXME: What is the maximum column name length? */
2879 		if (len == 0 || len > 128) {
2880 			ib_errf(thd, IB_LOG_LEVEL_ERROR,
2881 				ER_IO_READ_ERROR,
2882 				"Column name length " ULINTPF ", is invalid",
2883 				len);
2884 
2885 			return(DB_CORRUPTION);
2886 		}
2887 
2888 		cfg->m_col_names[i] = UT_NEW_ARRAY_NOKEY(byte, len);
2889 
2890 		/* Trigger OOM */
2891 		DBUG_EXECUTE_IF(
2892 			"ib_import_OOM_10",
2893 			UT_DELETE_ARRAY(cfg->m_col_names[i]);
2894 			cfg->m_col_names[i] = NULL;
2895 		);
2896 
2897 		if (cfg->m_col_names[i] == NULL) {
2898 			return(DB_OUT_OF_MEMORY);
2899 		}
2900 
2901 		dberr_t	err;
2902 
2903 		err = row_import_cfg_read_string(
2904 			file, cfg->m_col_names[i], len);
2905 
2906 		if (err != DB_SUCCESS) {
2907 
2908 			ib_senderrf(
2909 				thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2910 				(ulong) errno, strerror(errno),
2911 				"while parsing table column name.");
2912 
2913 			return(err);
2914 		}
2915 	}
2916 
2917 	return(DB_SUCCESS);
2918 }
2919 
2920 /*****************************************************************//**
2921 Read the contents of the <tablespace>.cfg file.
2922 @return DB_SUCCESS or error code. */
2923 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2924 dberr_t
row_import_read_v1(FILE * file,THD * thd,row_import * cfg)2925 row_import_read_v1(
2926 /*===============*/
2927 	FILE*		file,		/*!< in: File to read from */
2928 	THD*		thd,		/*!< in: session */
2929 	row_import*	cfg)		/*!< out: meta data */
2930 {
2931 	byte		value[sizeof(ib_uint32_t)];
2932 
2933 	/* Trigger EOF */
2934 	DBUG_EXECUTE_IF("ib_import_io_read_error_5",
2935 			(void) fseek(file, 0L, SEEK_END););
2936 
2937 	/* Read the hostname where the tablespace was exported. */
2938 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2939 		ib_senderrf(
2940 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2941 			(ulong) errno, strerror(errno),
2942 			"while reading meta-data export hostname length.");
2943 
2944 		return(DB_IO_ERROR);
2945 	}
2946 
2947 	ulint	len = mach_read_from_4(value);
2948 
2949 	/* NUL byte is part of name length. */
2950 	cfg->m_hostname = UT_NEW_ARRAY_NOKEY(byte, len);
2951 
2952 	/* Trigger OOM */
2953 	DBUG_EXECUTE_IF(
2954 		"ib_import_OOM_1",
2955 		UT_DELETE_ARRAY(cfg->m_hostname);
2956 		cfg->m_hostname = NULL;
2957 	);
2958 
2959 	if (cfg->m_hostname == NULL) {
2960 		return(DB_OUT_OF_MEMORY);
2961 	}
2962 
2963 	dberr_t	err = row_import_cfg_read_string(file, cfg->m_hostname, len);
2964 
2965 	if (err != DB_SUCCESS) {
2966 
2967 		ib_senderrf(
2968 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2969 			(ulong) errno, strerror(errno),
2970 			"while parsing export hostname.");
2971 
2972 		return(err);
2973 	}
2974 
2975 	/* Trigger EOF */
2976 	DBUG_EXECUTE_IF("ib_import_io_read_error_6",
2977 			(void) fseek(file, 0L, SEEK_END););
2978 
2979 	/* Read the table name of tablespace that was exported. */
2980 	if (fread(value, 1, sizeof(value), file) != sizeof(value)) {
2981 		ib_senderrf(
2982 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
2983 			(ulong) errno, strerror(errno),
2984 			"while reading meta-data table name length.");
2985 
2986 		return(DB_IO_ERROR);
2987 	}
2988 
2989 	len = mach_read_from_4(value);
2990 
2991 	/* NUL byte is part of name length. */
2992 	cfg->m_table_name = UT_NEW_ARRAY_NOKEY(byte, len);
2993 
2994 	/* Trigger OOM */
2995 	DBUG_EXECUTE_IF(
2996 		"ib_import_OOM_2",
2997 		UT_DELETE_ARRAY(cfg->m_table_name);
2998 		cfg->m_table_name = NULL;
2999 	);
3000 
3001 	if (cfg->m_table_name == NULL) {
3002 		return(DB_OUT_OF_MEMORY);
3003 	}
3004 
3005 	err = row_import_cfg_read_string(file, cfg->m_table_name, len);
3006 
3007 	if (err != DB_SUCCESS) {
3008 		ib_senderrf(
3009 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3010 			(ulong) errno, strerror(errno),
3011 			"while parsing table name.");
3012 
3013 		return(err);
3014 	}
3015 
3016 	ib::info() << "Importing tablespace for table '" << cfg->m_table_name
3017 		<< "' that was exported from host '" << cfg->m_hostname << "'";
3018 
3019 	byte		row[sizeof(ib_uint32_t) * 3];
3020 
3021 	/* Trigger EOF */
3022 	DBUG_EXECUTE_IF("ib_import_io_read_error_7",
3023 			(void) fseek(file, 0L, SEEK_END););
3024 
3025 	/* Read the autoinc value. */
3026 	if (fread(row, 1, sizeof(ib_uint64_t), file) != sizeof(ib_uint64_t)) {
3027 		ib_senderrf(
3028 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3029 			(ulong) errno, strerror(errno),
3030 			"while reading autoinc value.");
3031 
3032 		return(DB_IO_ERROR);
3033 	}
3034 
3035 	cfg->m_autoinc = mach_read_from_8(row);
3036 
3037 	/* Trigger EOF */
3038 	DBUG_EXECUTE_IF("ib_import_io_read_error_8",
3039 			(void) fseek(file, 0L, SEEK_END););
3040 
3041 	/* Read the tablespace page size. */
3042 	if (fread(row, 1, sizeof(row), file) != sizeof(row)) {
3043 		ib_senderrf(
3044 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3045 			(ulong) errno, strerror(errno),
3046 			"while reading meta-data header.");
3047 
3048 		return(DB_IO_ERROR);
3049 	}
3050 
3051 	byte*		ptr = row;
3052 
3053 	const ulint	logical_page_size = mach_read_from_4(ptr);
3054 	ptr += sizeof(ib_uint32_t);
3055 
3056 	if (logical_page_size != srv_page_size) {
3057 
3058 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
3059 			"Tablespace to be imported has a different"
3060 			" page size than this server. Server page size"
3061 			" is %lu, whereas tablespace page size"
3062 			" is " ULINTPF,
3063 			srv_page_size,
3064 			logical_page_size);
3065 
3066 		return(DB_ERROR);
3067 	}
3068 
3069 	cfg->m_flags = mach_read_from_4(ptr);
3070 	ptr += sizeof(ib_uint32_t);
3071 
3072 	cfg->m_page_size.copy_from(dict_tf_get_page_size(cfg->m_flags));
3073 
3074 	ut_a(logical_page_size == cfg->m_page_size.logical());
3075 
3076 	cfg->m_n_cols = mach_read_from_4(ptr);
3077 
3078 	if (!dict_tf_is_valid(cfg->m_flags)) {
3079 		ib_errf(thd, IB_LOG_LEVEL_ERROR,
3080 			ER_TABLE_SCHEMA_MISMATCH,
3081 			"Invalid table flags: " ULINTPF, cfg->m_flags);
3082 
3083 		return(DB_CORRUPTION);
3084 	}
3085 
3086 	err = row_import_read_columns(file, thd, cfg);
3087 
3088 	if (err == DB_SUCCESS) {
3089 		err = row_import_read_indexes(file, thd, cfg);
3090 	}
3091 
3092 	return(err);
3093 }
3094 
3095 /**
3096 Read the contents of the <tablespace>.cfg file.
3097 @return DB_SUCCESS or error code. */
3098 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3099 dberr_t
row_import_read_meta_data(FILE * file,THD * thd,row_import & cfg)3100 row_import_read_meta_data(
3101 /*======================*/
3102 	FILE*		file,		/*!< in: File to read from */
3103 	THD*		thd,		/*!< in: session */
3104 	row_import&	cfg)		/*!< out: contents of the .cfg file */
3105 {
3106 	byte		row[sizeof(ib_uint32_t)];
3107 
3108 	/* Trigger EOF */
3109 	DBUG_EXECUTE_IF("ib_import_io_read_error_9",
3110 			(void) fseek(file, 0L, SEEK_END););
3111 
3112 	if (fread(&row, 1, sizeof(row), file) != sizeof(row)) {
3113 		ib_senderrf(
3114 			thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3115 			(ulong) errno, strerror(errno),
3116 			"while reading meta-data version.");
3117 
3118 		return(DB_IO_ERROR);
3119 	}
3120 
3121 	cfg.m_version = mach_read_from_4(row);
3122 
3123 	/* Check the version number. */
3124 	switch (cfg.m_version) {
3125 	case IB_EXPORT_CFG_VERSION_V1:
3126 
3127 		return(row_import_read_v1(file, thd, &cfg));
3128 	default:
3129 		ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_IO_READ_ERROR,
3130 			"Unsupported meta-data version number (" ULINTPF "), "
3131 			"file ignored", cfg.m_version);
3132 	}
3133 
3134 	return(DB_ERROR);
3135 }
3136 
3137 /**
3138 Read the contents of the <tablename>.cfg file.
3139 @return DB_SUCCESS or error code. */
3140 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
3141 dberr_t
row_import_read_cfg(dict_table_t * table,THD * thd,row_import & cfg)3142 row_import_read_cfg(
3143 /*================*/
3144 	dict_table_t*	table,	/*!< in: table */
3145 	THD*		thd,	/*!< in: session */
3146 	row_import&	cfg)	/*!< out: contents of the .cfg file */
3147 {
3148 	dberr_t		err;
3149 	char		name[OS_FILE_MAX_PATH];
3150 
3151 	cfg.m_table = table;
3152 
3153 	srv_get_meta_data_filename(table, name, sizeof(name));
3154 
3155 	FILE*	file = fopen(name, "rb");
3156 
3157 	if (file == NULL) {
3158 		char	msg[BUFSIZ];
3159 
3160 		snprintf(msg, sizeof(msg),
3161 			 "Error opening '%s', will attempt to import"
3162 			 " without schema verification", name);
3163 
3164 		ib_senderrf(
3165 			thd, IB_LOG_LEVEL_WARN, ER_IO_READ_ERROR,
3166 			(ulong) errno, strerror(errno), msg);
3167 
3168 		cfg.m_missing = true;
3169 
3170 		err = DB_FAIL;
3171 	} else {
3172 
3173 		cfg.m_missing = false;
3174 
3175 		err = row_import_read_meta_data(file, thd, cfg);
3176 		fclose(file);
3177 	}
3178 
3179 	return(err);
3180 }
3181 
3182 /** Update the root page numbers and tablespace ID of a table.
3183 @param[in,out]	trx	dictionary transaction
3184 @param[in,out]	table	persistent table
3185 @param[in]	reset	whether to reset the fields to FIL_NULL
3186 @return DB_SUCCESS or error code */
3187 dberr_t
row_import_update_index_root(trx_t * trx,dict_table_t * table,bool reset)3188 row_import_update_index_root(trx_t* trx, dict_table_t* table, bool reset)
3189 {
3190 	const dict_index_t*	index;
3191 	que_t*			graph = 0;
3192 	dberr_t			err = DB_SUCCESS;
3193 
3194 	ut_ad(reset || table->space->id == table->space_id);
3195 
3196 	static const char	sql[] = {
3197 		"PROCEDURE UPDATE_INDEX_ROOT() IS\n"
3198 		"BEGIN\n"
3199 		"UPDATE SYS_INDEXES\n"
3200 		"SET SPACE = :space,\n"
3201 		"    PAGE_NO = :page,\n"
3202 		"    TYPE = :type\n"
3203 		"WHERE TABLE_ID = :table_id AND ID = :index_id;\n"
3204 		"END;\n"};
3205 
3206 	table->def_trx_id = trx->id;
3207 
3208 	for (index = dict_table_get_first_index(table);
3209 	     index != 0;
3210 	     index = dict_table_get_next_index(index)) {
3211 
3212 		pars_info_t*	info;
3213 		ib_uint32_t	page;
3214 		ib_uint32_t	space;
3215 		ib_uint32_t	type;
3216 		index_id_t	index_id;
3217 		table_id_t	table_id;
3218 
3219 		info = (graph != 0) ? graph->info : pars_info_create();
3220 
3221 		mach_write_to_4(
3222 			reinterpret_cast<byte*>(&type),
3223 			index->type);
3224 
3225 		mach_write_to_4(
3226 			reinterpret_cast<byte*>(&page),
3227 			reset ? FIL_NULL : index->page);
3228 
3229 		mach_write_to_4(
3230 			reinterpret_cast<byte*>(&space),
3231 			reset ? FIL_NULL : index->table->space_id);
3232 
3233 		mach_write_to_8(
3234 			reinterpret_cast<byte*>(&index_id),
3235 			index->id);
3236 
3237 		mach_write_to_8(
3238 			reinterpret_cast<byte*>(&table_id),
3239 			table->id);
3240 
3241 		/* If we set the corrupt bit during the IMPORT phase then
3242 		we need to update the system tables. */
3243 		pars_info_bind_int4_literal(info, "type", &type);
3244 		pars_info_bind_int4_literal(info, "space", &space);
3245 		pars_info_bind_int4_literal(info, "page", &page);
3246 		pars_info_bind_ull_literal(info, "index_id", &index_id);
3247 		pars_info_bind_ull_literal(info, "table_id", &table_id);
3248 
3249 		if (graph == 0) {
3250 			graph = pars_sql(info, sql);
3251 			ut_a(graph);
3252 			graph->trx = trx;
3253 		}
3254 
3255 		que_thr_t*	thr;
3256 
3257 		graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
3258 
3259 		ut_a(thr = que_fork_start_command(graph));
3260 
3261 		que_run_threads(thr);
3262 
3263 		DBUG_EXECUTE_IF("ib_import_internal_error",
3264 				trx->error_state = DB_ERROR;);
3265 
3266 		err = trx->error_state;
3267 
3268 		if (err != DB_SUCCESS) {
3269 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3270 				ER_INTERNAL_ERROR,
3271 				"While updating the <space, root page"
3272 				" number> of index %s - %s",
3273 				index->name(), ut_strerr(err));
3274 
3275 			break;
3276 		}
3277 	}
3278 
3279 	que_graph_free(graph);
3280 
3281 	return(err);
3282 }
3283 
/** Callback arg for row_import_set_discarded. It carries the requested
flag state into the fetch callback and the resulting flags value back
to row_import_update_discarded_flag(). */
struct discard_t {
	ib_uint32_t	flags2;			/*!< Updated flags value, stored
						by the callback with
						mach_write_to_4() */
	bool		state;			/*!< New state of the flag:
						true sets DICT_TF2_DISCARDED,
						false clears it */
	ulint		n_recs;			/*!< Number of recs processed */
};
3290 
3291 /******************************************************************//**
3292 Fetch callback that sets or unsets the DISCARDED tablespace flag in
3293 SYS_TABLES. The flags is stored in MIX_LEN column.
3294 @return FALSE if all OK */
3295 static
3296 ibool
row_import_set_discarded(void * row,void * user_arg)3297 row_import_set_discarded(
3298 /*=====================*/
3299 	void*		row,			/*!< in: sel_node_t* */
3300 	void*		user_arg)		/*!< in: bool set/unset flag */
3301 {
3302 	sel_node_t*	node = static_cast<sel_node_t*>(row);
3303 	discard_t*	discard = static_cast<discard_t*>(user_arg);
3304 	dfield_t*	dfield = que_node_get_val(node->select_list);
3305 	dtype_t*	type = dfield_get_type(dfield);
3306 	ulint		len = dfield_get_len(dfield);
3307 
3308 	ut_a(dtype_get_mtype(type) == DATA_INT);
3309 	ut_a(len == sizeof(ib_uint32_t));
3310 
3311 	ulint	flags2 = mach_read_from_4(
3312 		static_cast<byte*>(dfield_get_data(dfield)));
3313 
3314 	if (discard->state) {
3315 		flags2 |= DICT_TF2_DISCARDED;
3316 	} else {
3317 		flags2 &= ~DICT_TF2_DISCARDED;
3318 	}
3319 
3320 	mach_write_to_4(reinterpret_cast<byte*>(&discard->flags2), flags2);
3321 
3322 	++discard->n_recs;
3323 
3324 	/* There should be at most one matching record. */
3325 	ut_a(discard->n_recs == 1);
3326 
3327 	return(FALSE);
3328 }
3329 
3330 /** Update the DICT_TF2_DISCARDED flag in SYS_TABLES.MIX_LEN.
3331 @param[in,out]	trx		dictionary transaction
3332 @param[in]	table_id	table identifier
3333 @param[in]	discarded	whether to set or clear the flag
3334 @return DB_SUCCESS or error code */
row_import_update_discarded_flag(trx_t * trx,table_id_t table_id,bool discarded)3335 dberr_t row_import_update_discarded_flag(trx_t* trx, table_id_t table_id,
3336 					 bool discarded)
3337 {
3338 	pars_info_t*		info;
3339 	discard_t		discard;
3340 
3341 	static const char	sql[] =
3342 		"PROCEDURE UPDATE_DISCARDED_FLAG() IS\n"
3343 		"DECLARE FUNCTION my_func;\n"
3344 		"DECLARE CURSOR c IS\n"
3345 		" SELECT MIX_LEN"
3346 		" FROM SYS_TABLES"
3347 		" WHERE ID = :table_id FOR UPDATE;"
3348 		"\n"
3349 		"BEGIN\n"
3350 		"OPEN c;\n"
3351 		"WHILE 1 = 1 LOOP\n"
3352 		"  FETCH c INTO my_func();\n"
3353 		"  IF c % NOTFOUND THEN\n"
3354 		"    EXIT;\n"
3355 		"  END IF;\n"
3356 		"END LOOP;\n"
3357 		"UPDATE SYS_TABLES"
3358 		" SET MIX_LEN = :flags2"
3359 		" WHERE ID = :table_id;\n"
3360 		"CLOSE c;\n"
3361 		"END;\n";
3362 
3363 	discard.n_recs = 0;
3364 	discard.state = discarded;
3365 	discard.flags2 = ULINT32_UNDEFINED;
3366 
3367 	info = pars_info_create();
3368 
3369 	pars_info_add_ull_literal(info, "table_id", table_id);
3370 	pars_info_bind_int4_literal(info, "flags2", &discard.flags2);
3371 
3372 	pars_info_bind_function(
3373 		info, "my_func", row_import_set_discarded, &discard);
3374 
3375 	dberr_t	err = que_eval_sql(info, sql, false, trx);
3376 
3377 	ut_a(discard.n_recs == 1);
3378 	ut_a(discard.flags2 != ULINT32_UNDEFINED);
3379 
3380 	return(err);
3381 }
3382 
3383 /** InnoDB writes page by page when there is page compressed
3384 tablespace involved. It does help to save the disk space when
3385 punch hole is enabled
3386 @param iter 	Tablespace iterator
3387 @param write_request Request to write into the file
3388 @param offset	offset of the file to be written
3389 @param writeptr	buffer to be written
3390 @param n_bytes	number of bytes to be written
3391 @param try_punch_only	Try the range punch only because the
3392 			current range is full of empty pages
3393 @return DB_SUCCESS */
3394 static
fil_import_compress_fwrite(const fil_iterator_t & iter,const IORequest & write_request,os_offset_t offset,const byte * writeptr,ulint n_bytes,bool try_punch_only=false)3395 dberr_t fil_import_compress_fwrite(const fil_iterator_t &iter,
3396                                    const IORequest &write_request,
3397                                    os_offset_t offset,
3398                                    const byte *writeptr,
3399                                    ulint n_bytes,
3400                                    bool try_punch_only=false)
3401 {
3402   dberr_t err= os_file_punch_hole(iter.file, offset, n_bytes);
3403   if (err != DB_SUCCESS || try_punch_only)
3404     return err;
3405 
3406   for (ulint j= 0; j < n_bytes; j+= srv_page_size)
3407   {
3408     /* Read the original data length from block and
3409     safer to read FIL_PAGE_COMPRESSED_SIZE because it
3410     is not encrypted*/
3411     ulint n_write_bytes= srv_page_size;
3412     if (j || offset)
3413     {
3414       n_write_bytes= mach_read_from_2(writeptr + j + FIL_PAGE_DATA);
3415       const unsigned  ptype= mach_read_from_2(writeptr + j + FIL_PAGE_TYPE);
3416       /* Ignore the empty page */
3417       if (ptype == 0 && n_write_bytes == 0)
3418         continue;
3419       n_write_bytes+= FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
3420       if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
3421         n_write_bytes+= FIL_PAGE_COMPRESSION_METHOD_SIZE;
3422     }
3423 
3424     err= os_file_write(write_request, iter.filepath, iter.file,
3425                        writeptr + j, offset + j, n_write_bytes);
3426     if (err != DB_SUCCESS)
3427       break;
3428   }
3429 
3430   return err;
3431 }
3432 
run(const fil_iterator_t & iter,buf_block_t * block)3433 dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
3434 				 buf_block_t* block) UNIV_NOTHROW
3435 {
3436   const ulint size= get_page_size().physical();
3437   const ulint buf_size = srv_page_size
3438 #ifdef HAVE_LZO
3439 		+ LZO1X_1_15_MEM_COMPRESS
3440 #elif defined HAVE_SNAPPY
3441 		+ snappy_max_compressed_length(srv_page_size)
3442 #endif
3443 		;
3444   byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
3445   ut_ad(!srv_read_only_mode);
3446 
3447   if (!page_compress_buf)
3448     return DB_OUT_OF_MEMORY;
3449 
3450   const bool encrypted= iter.crypt_data != NULL &&
3451     iter.crypt_data->should_encrypt();
3452   byte* const readptr= iter.io_buffer;
3453   block->frame= readptr;
3454 
3455   if (block->page.zip.data)
3456     block->page.zip.data= readptr;
3457 
3458   IORequest read_request(IORequest::READ);
3459   read_request.disable_partial_io_warnings();
3460   ulint page_no= 0;
3461   bool page_compressed= false;
3462 
3463   dberr_t err= os_file_read_no_error_handling(
3464     read_request, iter.file, readptr, 3 * size, size, 0);
3465   if (err != DB_SUCCESS)
3466   {
3467     ib::error() << iter.filepath << ": os_file_read() failed";
3468     goto func_exit;
3469   }
3470 
3471   block->page.id.set_page_no(3);
3472   page_no= page_get_page_no(readptr);
3473 
3474   if (page_no != 3)
3475   {
3476 page_corrupted:
3477     ib::warn() << filename() << ": Page 3 at offset "
3478                << 3 * size << " looks corrupted.";
3479     err= DB_CORRUPTION;
3480     goto func_exit;
3481   }
3482 
3483   page_compressed= fil_page_is_compressed_encrypted(readptr) ||
3484     fil_page_is_compressed(readptr);
3485 
3486   if (page_compressed && block->page.zip.data)
3487     goto page_corrupted;
3488 
3489   if (encrypted)
3490   {
3491     if (!fil_space_verify_crypt_checksum(readptr, get_page_size()))
3492       goto page_corrupted;
3493 
3494     if (ENCRYPTION_KEY_NOT_ENCRYPTED ==
3495         mach_read_from_4(readptr + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
3496       goto page_corrupted;
3497 
3498     if ((err = fil_space_decrypt(iter.crypt_data, readptr,
3499 				 get_page_size(), readptr)))
3500       goto func_exit;
3501   }
3502 
3503   if (page_compressed)
3504   {
3505     ulint compress_length = fil_page_decompress(page_compress_buf, readptr);
3506     ut_ad(compress_length != srv_page_size);
3507     if (compress_length == 0)
3508       goto page_corrupted;
3509   }
3510   else if (buf_page_is_corrupted(
3511             false, readptr, get_page_size(), NULL))
3512     goto page_corrupted;
3513 
3514   err = this->operator()(block);
3515 func_exit:
3516   free(page_compress_buf);
3517   return err;
3518 }
3519 
fil_iterate(const fil_iterator_t & iter,buf_block_t * block,AbstractCallback & callback)3520 static dberr_t fil_iterate(
3521 	const fil_iterator_t&	iter,
3522 	buf_block_t*		block,
3523 	AbstractCallback&	callback)
3524 {
3525 	os_offset_t		offset;
3526 	const ulint	 	size = callback.get_page_size().physical();
3527 	ulint			n_bytes = iter.n_io_buffers * size;
3528 
3529 	const ulint buf_size = srv_page_size
3530 #ifdef HAVE_LZO
3531 		+ LZO1X_1_15_MEM_COMPRESS
3532 #elif defined HAVE_SNAPPY
3533 		+ snappy_max_compressed_length(srv_page_size)
3534 #endif
3535 		;
3536 	byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
3537 	ut_ad(!srv_read_only_mode);
3538 
3539 	if (!page_compress_buf) {
3540 		return DB_OUT_OF_MEMORY;
3541 	}
3542 
3543 	/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
3544 	copying for non-index pages. Unfortunately, it is
3545 	required by buf_zip_decompress() */
3546 	dberr_t		err = DB_SUCCESS;
3547 	bool		page_compressed = false;
3548 	bool		punch_hole = true;
3549 	IORequest	write_request(IORequest::WRITE);
3550 
3551 	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
3552 		if (callback.is_interrupted()) {
3553 			err = DB_INTERRUPTED;
3554 			goto func_exit;
3555 		}
3556 
3557 		byte*		io_buffer = iter.io_buffer;
3558 		block->frame = io_buffer;
3559 
3560 		if (block->page.zip.data) {
3561 			/* Zip IO is done in the compressed page buffer. */
3562 			io_buffer = block->page.zip.data;
3563 		}
3564 
3565 		/* We have to read the exact number of bytes. Otherwise the
3566 		InnoDB IO functions croak on failed reads. */
3567 
3568 		n_bytes = ulint(ut_min(os_offset_t(n_bytes),
3569 				       iter.end - offset));
3570 
3571 		ut_ad(n_bytes > 0);
3572 		ut_ad(!(n_bytes % size));
3573 
3574 		const bool encrypted = iter.crypt_data != NULL
3575 			&& iter.crypt_data->should_encrypt();
3576 		/* Use additional crypt io buffer if tablespace is encrypted */
3577 		byte* const readptr = encrypted
3578 			? iter.crypt_io_buffer : io_buffer;
3579 		byte* const writeptr = readptr;
3580 
3581 		IORequest	read_request(IORequest::READ);
3582 		read_request.disable_partial_io_warnings();
3583 
3584 		err = os_file_read_no_error_handling(
3585 			read_request, iter.file, readptr, offset, n_bytes, 0);
3586 		if (err != DB_SUCCESS) {
3587 			ib::error() << iter.filepath
3588 				    << ": os_file_read() failed";
3589 			goto func_exit;
3590 		}
3591 
3592 		bool		updated = false;
3593 		os_offset_t	page_off = offset;
3594 		ulint		n_pages_read = n_bytes / size;
3595 		block->page.id.set_page_no(ulint(page_off / size));
3596 
3597 		for (ulint i = 0; i < n_pages_read;
3598 		     block->page.id.set_page_no(block->page.id.page_no() + 1),
3599 		     ++i, page_off += size, block->frame += size) {
3600 			byte*	src = readptr + i * size;
3601 			const ulint page_no = page_get_page_no(src);
3602 			if (!page_no && block->page.id.page_no()) {
3603 				const ulint* b = reinterpret_cast<const ulint*>
3604 					(src);
3605 				const ulint* const e = b + size / sizeof *b;
3606 				do {
3607 					if (*b++) {
3608 						goto page_corrupted;
3609 					}
3610 				} while (b != e);
3611 
3612 				/* Proceed to the next page,
3613 				because this one is all zero. */
3614 				continue;
3615 			}
3616 
3617 			if (page_no != block->page.id.page_no()) {
3618 page_corrupted:
3619 				ib::warn() << callback.filename()
3620 					   << ": Page " << (offset / size)
3621 					   << " at offset " << offset
3622 					   << " looks corrupted.";
3623 				err = DB_CORRUPTION;
3624 				goto func_exit;
3625 			}
3626 
3627 			page_compressed= fil_page_is_compressed_encrypted(src)
3628 					 || fil_page_is_compressed(src);
3629 
3630 			if (page_compressed && block->page.zip.data) {
3631 				goto page_corrupted;
3632 			}
3633 
3634 			bool decrypted = false;
3635 			byte* dst = io_buffer + i * size;
3636 			bool frame_changed = false;
3637 
3638 			if (!encrypted) {
3639 			} else if (!mach_read_from_4(
3640 					   FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
3641 					   + src)) {
3642 				if (block->page.id.page_no() == 0
3643 				    && block->page.zip.data) {
3644 					block->page.zip.data = src;
3645 					frame_changed = true;
3646 				} else if (!page_compressed
3647 					   && !block->page.zip.data) {
3648 					block->frame = src;
3649 					frame_changed = true;
3650 				} else {
3651 					ut_ad(dst != src);
3652 					memcpy(dst, src, size);
3653 				}
3654 			} else {
3655 				if (!fil_space_verify_crypt_checksum(
3656 					    src, callback.get_page_size())) {
3657 					goto page_corrupted;
3658 				}
3659 
3660 				if ((err = fil_space_decrypt(
3661 					iter.crypt_data, dst,
3662 					callback.get_page_size(), src))) {
3663 					goto func_exit;
3664 				}
3665 
3666 				decrypted = true;
3667 				updated = true;
3668 			}
3669 
3670 			/* If the original page is page_compressed, we need
3671 			to decompress it before adjusting further. */
3672 			if (page_compressed) {
3673 				ulint compress_length = fil_page_decompress(
3674 					page_compress_buf, dst);
3675 				ut_ad(compress_length != srv_page_size);
3676 				if (compress_length == 0) {
3677 					goto page_corrupted;
3678 				}
3679 				updated = true;
3680 			} else if (buf_page_is_corrupted(
3681 					   false,
3682 					   encrypted && !frame_changed
3683 					   ? dst : src,
3684 					   callback.get_page_size(), NULL)) {
3685 				goto page_corrupted;
3686 			}
3687 
3688 			if ((err = callback(block)) != DB_SUCCESS) {
3689 				goto func_exit;
3690 			} else if (!updated) {
3691 				updated = buf_block_get_state(block)
3692 					== BUF_BLOCK_FILE_PAGE;
3693 			}
3694 
3695 			/* If tablespace is encrypted we use additional
3696 			temporary scratch area where pages are read
3697 			for decrypting readptr == crypt_io_buffer != io_buffer.
3698 
3699 			Destination for decryption is a buffer pool block
3700 			block->frame == dst == io_buffer that is updated.
3701 			Pages that did not require decryption even when
3702 			tablespace is marked as encrypted are not copied
3703 			instead block->frame is set to src == readptr.
3704 
3705 			For encryption we again use temporary scratch area
3706 			writeptr != io_buffer == dst
3707 			that is then written to the tablespace
3708 
3709 			(1) For normal tables io_buffer == dst == writeptr
3710 			(2) For only page compressed tables
3711 			io_buffer == dst == writeptr
3712 			(3) For encrypted (and page compressed)
3713 			readptr != io_buffer == dst != writeptr
3714 			*/
3715 
3716 			ut_ad(!encrypted && !page_compressed ?
3717 			      src == dst && dst == writeptr + (i * size):1);
3718 			ut_ad(page_compressed && !encrypted ?
3719 			      src == dst && dst == writeptr + (i * size):1);
3720 			ut_ad(encrypted ?
3721 			      src != dst && dst != writeptr + (i * size):1);
3722 
3723 			/* When tablespace is encrypted or compressed its
3724 			first page (i.e. page 0) is not encrypted or
3725 			compressed and there is no need to copy frame. */
3726 			if (encrypted && block->page.id.page_no() != 0) {
3727 				byte *local_frame = callback.get_frame(block);
3728 				ut_ad((writeptr + (i * size)) != local_frame);
3729 				memcpy((writeptr + (i * size)), local_frame, size);
3730 			}
3731 
3732 			if (frame_changed) {
3733 				if (block->page.zip.data) {
3734 					block->page.zip.data = dst;
3735 				} else {
3736 					block->frame = dst;
3737 				}
3738 			}
3739 
3740 			src =  io_buffer + (i * size);
3741 
3742 			if (page_compressed) {
3743 				updated = true;
3744 				if (ulint len = fil_page_compress(
3745 					    src,
3746 					    page_compress_buf,
3747 					    0,/* FIXME: compression level */
3748 					    512,/* FIXME: proper block size */
3749 					    encrypted)) {
3750 					/* FIXME: remove memcpy() */
3751 					memcpy(src, page_compress_buf, len);
3752 					memset(src + len, 0,
3753 					       srv_page_size - len);
3754 				}
3755 			}
3756 
3757 			/* Encrypt the page if encryption was used. */
3758 			if (encrypted && decrypted) {
3759 				byte *dest = writeptr + i * size;
3760 				byte* tmp = fil_encrypt_buf(
3761 					iter.crypt_data,
3762 					block->page.id.space(),
3763 					block->page.id.page_no(),
3764 					mach_read_from_8(src + FIL_PAGE_LSN),
3765 					src, callback.get_page_size(), dest);
3766 
3767 				if (tmp == src) {
3768 					/* TODO: remove unnecessary memcpy's */
3769 					ut_ad(dest != src);
3770 					memcpy(dest, src, size);
3771 				}
3772 
3773 				updated = true;
3774 			}
3775 		}
3776 
3777 		if (page_compressed && punch_hole) {
3778 			err = fil_import_compress_fwrite(
3779 				iter, write_request, offset, writeptr, n_bytes,
3780 				!updated);
3781 
3782 			if (err != DB_SUCCESS) {
3783 				punch_hole = false;
3784 				if (updated) {
3785 					goto normal_write;
3786 				}
3787 			}
3788 		} else if (updated) {
3789 			/* A page was updated in the set, write back to disk. */
3790 normal_write:
3791 			err = os_file_write(
3792 				write_request, iter.filepath, iter.file,
3793 				writeptr, offset, n_bytes);
3794 
3795 			if (err != DB_SUCCESS) {
3796 				goto func_exit;
3797 			}
3798 		}
3799 	}
3800 
3801 func_exit:
3802 	free(page_compress_buf);
3803 	return err;
3804 }
3805 
3806 /********************************************************************//**
3807 Iterate over all the pages in the tablespace.
3808 @param table - the table definiton in the server
3809 @param n_io_buffers - number of blocks to read and write together
3810 @param callback - functor that will do the page updates
3811 @return	DB_SUCCESS or error code */
3812 static
3813 dberr_t
fil_tablespace_iterate(dict_table_t * table,ulint n_io_buffers,AbstractCallback & callback)3814 fil_tablespace_iterate(
3815 /*===================*/
3816 	dict_table_t*		table,
3817 	ulint			n_io_buffers,
3818 	AbstractCallback&	callback)
3819 {
3820 	dberr_t		err;
3821 	pfs_os_file_t	file;
3822 	char*		filepath;
3823 
3824 	ut_a(n_io_buffers > 0);
3825 	ut_ad(!srv_read_only_mode);
3826 
3827 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
3828 			return(DB_CORRUPTION););
3829 
3830 	/* Make sure the data_dir_path is set. */
3831 	dict_get_and_save_data_dir_path(table, false);
3832 
3833 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3834 		ut_a(table->data_dir_path);
3835 
3836 		filepath = fil_make_filepath(
3837 			table->data_dir_path, table->name.m_name, IBD, true);
3838 	} else {
3839 		filepath = fil_make_filepath(
3840 			NULL, table->name.m_name, IBD, false);
3841 	}
3842 
3843 	if (!filepath) {
3844 		return(DB_OUT_OF_MEMORY);
3845 	} else {
3846 		bool	success;
3847 
3848 		file = os_file_create_simple_no_error_handling(
3849 			innodb_data_file_key, filepath,
3850 			OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
3851 
3852 		if (!success) {
3853 			/* The following call prints an error message */
3854 			os_file_get_last_error(true);
3855 			ib::error() << "Trying to import a tablespace,"
3856 				" but could not open the tablespace file "
3857 				    << filepath;
3858 			ut_free(filepath);
3859 			return DB_TABLESPACE_NOT_FOUND;
3860 		} else {
3861 			err = DB_SUCCESS;
3862 		}
3863 	}
3864 
3865 	callback.set_file(filepath, file);
3866 
3867 	os_offset_t	file_size = os_file_get_size(file);
3868 	ut_a(file_size != (os_offset_t) -1);
3869 
3870 	/* Allocate a page to read in the tablespace header, so that we
3871 	can determine the page size and zip_size (if it is compressed).
3872 	We allocate an extra page in case it is a compressed table. One
3873 	page is to ensure alignement. */
3874 
3875 	void*	page_ptr = ut_malloc_nokey(3U << srv_page_size_shift);
3876 	byte*	page = static_cast<byte*>(ut_align(page_ptr, srv_page_size));
3877 
3878 	buf_block_t* block = reinterpret_cast<buf_block_t*>
3879 		(ut_zalloc_nokey(sizeof *block));
3880 	block->frame = page;
3881 	block->page.id = page_id_t(0, 0);
3882 	block->page.io_fix = BUF_IO_NONE;
3883 	block->page.buf_fix_count = 1;
3884 	block->page.state = BUF_BLOCK_FILE_PAGE;
3885 
3886 	/* Read the first page and determine the page and zip size. */
3887 
3888 	IORequest       request(IORequest::READ);
3889 	request.disable_partial_io_warnings();
3890 
3891 	err = os_file_read_no_error_handling(request, file, page, 0,
3892 					     srv_page_size, 0);
3893 
3894 	if (err == DB_SUCCESS) {
3895 		err = callback.init(file_size, block);
3896 	}
3897 
3898 	if (err == DB_SUCCESS) {
3899 		block->page.id = page_id_t(callback.get_space_id(), 0);
3900 		block->page.size.copy_from(callback.get_page_size());
3901 		if (block->page.size.is_compressed()) {
3902 			page_zip_set_size(&block->page.zip,
3903 					  callback.get_page_size().physical());
3904 			/* ROW_FORMAT=COMPRESSED is not optimised for block IO
3905 			for now. We do the IMPORT page by page. */
3906 			n_io_buffers = 1;
3907 		}
3908 
3909 		fil_iterator_t	iter;
3910 
3911 		/* read (optional) crypt data */
3912 		iter.crypt_data = fil_space_read_crypt_data(
3913 			callback.get_page_size(), page);
3914 
3915 		/* If tablespace is encrypted, it needs extra buffers */
3916 		if (iter.crypt_data && n_io_buffers > 1) {
3917 			/* decrease io buffers so that memory
3918 			consumption will not double */
3919 			n_io_buffers /= 2;
3920 		}
3921 
3922 		iter.file = file;
3923 		iter.start = 0;
3924 		iter.end = file_size;
3925 		iter.filepath = filepath;
3926 		iter.file_size = file_size;
3927 		iter.n_io_buffers = n_io_buffers;
3928 
3929 		/* Add an extra page for compressed page scratch area. */
3930 		void*	io_buffer = ut_malloc_nokey(
3931 			(2 + iter.n_io_buffers) << srv_page_size_shift);
3932 
3933 		iter.io_buffer = static_cast<byte*>(
3934 			ut_align(io_buffer, srv_page_size));
3935 
3936 		void* crypt_io_buffer = NULL;
3937 		if (iter.crypt_data) {
3938 			crypt_io_buffer = ut_malloc_nokey(
3939 				(2 + iter.n_io_buffers)
3940 				<< srv_page_size_shift);
3941 			iter.crypt_io_buffer = static_cast<byte*>(
3942 				ut_align(crypt_io_buffer, srv_page_size));
3943 		}
3944 
3945 		if (block->page.zip.ssize) {
3946 			ut_ad(iter.n_io_buffers == 1);
3947 			block->frame = iter.io_buffer;
3948 			block->page.zip.data = block->frame + srv_page_size;
3949 		}
3950 
3951 		err = callback.run(iter, block);
3952 
3953 		if (iter.crypt_data) {
3954 			fil_space_destroy_crypt_data(&iter.crypt_data);
3955 		}
3956 
3957 		ut_free(crypt_io_buffer);
3958 		ut_free(io_buffer);
3959 	}
3960 
3961 	if (err == DB_SUCCESS) {
3962 		ib::info() << "Sync to disk";
3963 
3964 		if (!os_file_flush(file)) {
3965 			ib::info() << "os_file_flush() failed!";
3966 			err = DB_IO_ERROR;
3967 		} else {
3968 			ib::info() << "Sync to disk - done!";
3969 		}
3970 	}
3971 
3972 	os_file_close(file);
3973 
3974 	ut_free(page_ptr);
3975 	ut_free(filepath);
3976 	ut_free(block);
3977 
3978 	return(err);
3979 }
3980 
3981 /*****************************************************************//**
3982 Imports a tablespace. The space id in the .ibd file must match the space id
3983 of the table in the data dictionary.
3984 @return error code or DB_SUCCESS */
3985 dberr_t
row_import_for_mysql(dict_table_t * table,row_prebuilt_t * prebuilt)3986 row_import_for_mysql(
3987 /*=================*/
3988 	dict_table_t*	table,		/*!< in/out: table */
3989 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL */
3990 {
3991 	dberr_t		err;
3992 	trx_t*		trx;
3993 	ib_uint64_t	autoinc = 0;
3994 	char*		filepath = NULL;
3995 	ulint		space_flags MY_ATTRIBUTE((unused));
3996 
3997 	/* The caller assured that this is not read_only_mode and that no
3998 	temorary tablespace is being imported. */
3999 	ut_ad(!srv_read_only_mode);
4000 	ut_ad(!table->is_temporary());
4001 
4002 	ut_ad(table->space_id);
4003 	ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
4004 	ut_ad(prebuilt->trx);
4005 	ut_ad(!table->is_readable());
4006 
4007 	ibuf_delete_for_discarded_space(table->space_id);
4008 
4009 	trx_start_if_not_started(prebuilt->trx, true);
4010 
4011 	trx = trx_create();
4012 
4013 	/* So that the table is not DROPped during recovery. */
4014 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4015 
4016 	trx_start_if_not_started(trx, true);
4017 
4018 	/* So that we can send error messages to the user. */
4019 	trx->mysql_thd = prebuilt->trx->mysql_thd;
4020 
4021 	/* Ensure that the table will be dropped by trx_rollback_active()
4022 	in case of a crash. */
4023 
4024 	trx->table_id = table->id;
4025 
4026 	/* Assign an undo segment for the transaction, so that the
4027 	transaction will be recovered after a crash. */
4028 
4029 	/* TODO: Do not write any undo log for the IMPORT cleanup. */
4030 	{
4031 		mtr_t mtr;
4032 		mtr.start();
4033 		trx_undo_assign(trx, &err, &mtr);
4034 		mtr.commit();
4035 	}
4036 
4037 	DBUG_EXECUTE_IF("ib_import_undo_assign_failure",
4038 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4039 
4040 	if (err != DB_SUCCESS) {
4041 
4042 		return(row_import_cleanup(prebuilt, trx, err));
4043 
4044 	} else if (trx->rsegs.m_redo.undo == 0) {
4045 
4046 		err = DB_TOO_MANY_CONCURRENT_TRXS;
4047 		return(row_import_cleanup(prebuilt, trx, err));
4048 	}
4049 
4050 	prebuilt->trx->op_info = "read meta-data file";
4051 
4052 	/* Prevent DDL operations while we are checking. */
4053 
4054 	rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);
4055 
4056 	row_import	cfg;
4057 
4058 	err = row_import_read_cfg(table, trx->mysql_thd, cfg);
4059 
4060 	/* Check if the table column definitions match the contents
4061 	of the config file. */
4062 
4063 	if (err == DB_SUCCESS) {
4064 
4065 		/* We have a schema file, try and match it with our
4066 		data dictionary. */
4067 
4068 		err = cfg.match_schema(trx->mysql_thd);
4069 
4070 		/* Update index->page and SYS_INDEXES.PAGE_NO to match the
4071 		B-tree root page numbers in the tablespace. Use the index
4072 		name from the .cfg file to find match. */
4073 
4074 		if (err == DB_SUCCESS) {
4075 			cfg.set_root_by_name();
4076 			autoinc = cfg.m_autoinc;
4077 		}
4078 
4079 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
4080 
4081 		DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
4082 				err = DB_TOO_MANY_CONCURRENT_TRXS;);
4083 
4084 	} else if (cfg.m_missing) {
4085 
4086 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
4087 
4088 		/* We don't have a schema file, we will have to discover
4089 		the index root pages from the .ibd file and skip the schema
4090 		matching step. */
4091 
4092 		ut_a(err == DB_FAIL);
4093 
4094 		cfg.m_page_size.copy_from(univ_page_size);
4095 
4096 		if (UT_LIST_GET_LEN(table->indexes) > 1) {
4097 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4098 				ER_INTERNAL_ERROR,
4099 				"Drop all secondary indexes before importing "
4100 				"table %s when .cfg file is missing.",
4101 				table->name.m_name);
4102 			err = DB_ERROR;
4103 			return row_import_error(prebuilt, trx, err);
4104 		}
4105 
4106 		FetchIndexRootPages	fetchIndexRootPages(table, trx);
4107 
4108 		err = fil_tablespace_iterate(
4109 			table, IO_BUFFER_SIZE(cfg.m_page_size.physical()),
4110 			fetchIndexRootPages);
4111 
4112 		if (err == DB_SUCCESS) {
4113 
4114 			err = fetchIndexRootPages.build_row_import(&cfg);
4115 
4116 			/* Update index->page and SYS_INDEXES.PAGE_NO
4117 			to match the B-tree root page numbers in the
4118 			tablespace. */
4119 
4120 			if (err == DB_SUCCESS) {
4121 				err = cfg.set_root_by_heuristic();
4122 			}
4123 		}
4124 
4125 		space_flags = fetchIndexRootPages.get_space_flags();
4126 
4127 	} else {
4128 		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
4129 	}
4130 
4131 	if (err != DB_SUCCESS) {
4132 		return(row_import_error(prebuilt, trx, err));
4133 	}
4134 
4135 	prebuilt->trx->op_info = "importing tablespace";
4136 
4137 	ib::info() << "Phase I - Update all pages";
4138 
4139 	/* Iterate over all the pages and do the sanity checking and
4140 	the conversion required to import the tablespace. */
4141 
4142 	PageConverter	converter(&cfg, table->space_id, trx);
4143 
4144 	/* Set the IO buffer size in pages. */
4145 
4146 	err = fil_tablespace_iterate(
4147 		table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), converter);
4148 
4149 	DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
4150 			err = DB_TOO_MANY_CONCURRENT_TRXS;);
4151 #ifdef BTR_CUR_HASH_ADAPT
4152 	/* On DISCARD TABLESPACE, we did not drop any adaptive hash
4153 	index entries. If we replaced the discarded tablespace with a
4154 	smaller one here, there could still be some adaptive hash
4155 	index entries that point to cached garbage pages in the buffer
4156 	pool, because PageConverter::operator() only evicted those
4157 	pages that were replaced by the imported pages. We must
4158 	detach any remaining adaptive hash index entries, because the
4159 	adaptive hash index must be a subset of the table contents;
4160 	false positives are not tolerated. */
4161 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes); index;
4162 	     index = UT_LIST_GET_NEXT(indexes, index)) {
4163 		index = index->clone_if_needed();
4164 	}
4165 #endif /* BTR_CUR_HASH_ADAPT */
4166 
4167 	if (err != DB_SUCCESS) {
4168 		char	table_name[MAX_FULL_NAME_LEN + 1];
4169 
4170 		innobase_format_name(
4171 			table_name, sizeof(table_name),
4172 			table->name.m_name);
4173 
4174 		if (err != DB_DECRYPTION_FAILED) {
4175 
4176 			ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4177 				ER_INTERNAL_ERROR,
4178 			"Cannot reset LSNs in table %s : %s",
4179 				table_name, ut_strerr(err));
4180 		}
4181 
4182 		return(row_import_cleanup(prebuilt, trx, err));
4183 	}
4184 
4185 	row_mysql_lock_data_dictionary(trx);
4186 
4187 	/* If the table is stored in a remote tablespace, we need to
4188 	determine that filepath from the link file and system tables.
4189 	Find the space ID in SYS_TABLES since this is an ALTER TABLE. */
4190 	dict_get_and_save_data_dir_path(table, true);
4191 
4192 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4193 		ut_a(table->data_dir_path);
4194 
4195 		filepath = fil_make_filepath(
4196 			table->data_dir_path, table->name.m_name, IBD, true);
4197 	} else {
4198 		filepath = fil_make_filepath(
4199 			NULL, table->name.m_name, IBD, false);
4200 	}
4201 
4202 	DBUG_EXECUTE_IF(
4203 		"ib_import_OOM_15",
4204 		ut_free(filepath);
4205 		filepath = NULL;
4206 	);
4207 
4208 	if (filepath == NULL) {
4209 		row_mysql_unlock_data_dictionary(trx);
4210 		return(row_import_cleanup(prebuilt, trx, DB_OUT_OF_MEMORY));
4211 	}
4212 
4213 	/* Open the tablespace so that we can access via the buffer pool.
4214 	We set the 2nd param (fix_dict = true) here because we already
4215 	have an x-lock on dict_operation_lock and dict_sys->mutex.
4216 	The tablespace is initially opened as a temporary one, because
4217 	we will not be writing any redo log for it before we have invoked
4218 	fil_space_t::set_imported() to declare it a persistent tablespace. */
4219 
4220 	ulint	fsp_flags = dict_tf_to_fsp_flags(table->flags);
4221 
4222 	table->space = fil_ibd_open(
4223 		true, true, FIL_TYPE_IMPORT, table->space_id,
4224 		fsp_flags, table->name, filepath, &err);
4225 
4226 	ut_ad((table->space == NULL) == (err != DB_SUCCESS));
4227 	DBUG_EXECUTE_IF("ib_import_open_tablespace_failure",
4228 			err = DB_TABLESPACE_NOT_FOUND; table->space = NULL;);
4229 
4230 	if (!table->space) {
4231 		row_mysql_unlock_data_dictionary(trx);
4232 
4233 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4234 			ER_GET_ERRMSG,
4235 			err, ut_strerr(err), filepath);
4236 
4237 		ut_free(filepath);
4238 
4239 		return(row_import_cleanup(prebuilt, trx, err));
4240 	}
4241 
4242 	row_mysql_unlock_data_dictionary(trx);
4243 
4244 	ut_free(filepath);
4245 
4246 	err = ibuf_check_bitmap_on_import(trx, table->space);
4247 
4248 	DBUG_EXECUTE_IF("ib_import_check_bitmap_failure", err = DB_CORRUPTION;);
4249 
4250 	if (err != DB_SUCCESS) {
4251 		return(row_import_cleanup(prebuilt, trx, err));
4252 	}
4253 
4254 	/* The first index must always be the clustered index. */
4255 
4256 	dict_index_t*	index = dict_table_get_first_index(table);
4257 
4258 	if (!dict_index_is_clust(index)) {
4259 		return(row_import_error(prebuilt, trx, DB_CORRUPTION));
4260 	}
4261 
4262 	/* Update the Btree segment headers for index node and
4263 	leaf nodes in the root page. Set the new space id. */
4264 
4265 	err = btr_root_adjust_on_import(index);
4266 
4267 	DBUG_EXECUTE_IF("ib_import_cluster_root_adjust_failure",
4268 			err = DB_CORRUPTION;);
4269 
4270 	if (err != DB_SUCCESS) {
4271 		return(row_import_error(prebuilt, trx, err));
4272 	} else if (cfg.requires_purge(index->name)) {
4273 
4274 		/* Purge any delete-marked records that couldn't be
4275 		purged during the page conversion phase from the
4276 		cluster index. */
4277 
4278 		IndexPurge	purge(trx, index);
4279 
4280 		trx->op_info = "cluster: purging delete marked records";
4281 
4282 		err = purge.garbage_collect();
4283 
4284 		trx->op_info = "";
4285 	}
4286 
4287 	DBUG_EXECUTE_IF("ib_import_cluster_failure", err = DB_CORRUPTION;);
4288 
4289 	if (err != DB_SUCCESS) {
4290 		return(row_import_error(prebuilt, trx, err));
4291 	}
4292 
4293 	/* For secondary indexes, purge any records that couldn't be purged
4294 	during the page conversion phase. */
4295 
4296 	err = row_import_adjust_root_pages_of_secondary_indexes(
4297 		trx, table, cfg);
4298 
4299 	DBUG_EXECUTE_IF("ib_import_sec_root_adjust_failure",
4300 			err = DB_CORRUPTION;);
4301 
4302 	if (err != DB_SUCCESS) {
4303 		return(row_import_error(prebuilt, trx, err));
4304 	}
4305 
4306 	/* Ensure that the next available DB_ROW_ID is not smaller than
4307 	any DB_ROW_ID stored in the table. */
4308 
4309 	if (prebuilt->clust_index_was_generated) {
4310 
4311 		err = row_import_set_sys_max_row_id(prebuilt, table);
4312 
4313 		if (err != DB_SUCCESS) {
4314 			return(row_import_error(prebuilt, trx, err));
4315 		}
4316 	}
4317 
4318 	ib::info() << "Phase III - Flush changes to disk";
4319 
4320 	/* Ensure that all pages dirtied during the IMPORT make it to disk.
4321 	The only dirty pages generated should be from the pessimistic purge
4322 	of delete marked records that couldn't be purged in Phase I. */
4323 
4324 	{
4325 		FlushObserver observer(prebuilt->table->space, trx, NULL);
4326 		buf_LRU_flush_or_remove_pages(prebuilt->table->space_id,
4327 					      &observer);
4328 
4329 		if (observer.is_interrupted()) {
4330 			ib::info() << "Phase III - Flush interrupted";
4331 			return(row_import_error(prebuilt, trx,
4332 						DB_INTERRUPTED));
4333 		}
4334 	}
4335 
4336 	ib::info() << "Phase IV - Flush complete";
4337 	prebuilt->table->space->set_imported();
4338 
	/* The dictionary latches will be released in row_import_cleanup()
	after the transaction commit, for both success and error. */
4341 
4342 	row_mysql_lock_data_dictionary(trx);
4343 
4344 	/* Update the root pages of the table's indexes. */
4345 	err = row_import_update_index_root(trx, table, false);
4346 
4347 	if (err != DB_SUCCESS) {
4348 		return(row_import_error(prebuilt, trx, err));
4349 	}
4350 
4351 	err = row_import_update_discarded_flag(trx, table->id, false);
4352 
4353 	if (err != DB_SUCCESS) {
4354 		return(row_import_error(prebuilt, trx, err));
4355 	}
4356 
4357 	table->file_unreadable = false;
4358 	table->flags2 &= ~DICT_TF2_DISCARDED;
4359 
4360 	/* Set autoinc value read from .cfg file, if one was specified.
4361 	Otherwise, keep the PAGE_ROOT_AUTO_INC as is. */
4362 	if (autoinc) {
4363 		ib::info() << table->name << " autoinc value set to "
4364 			<< autoinc;
4365 
4366 		table->autoinc = autoinc--;
4367 		btr_write_autoinc(dict_table_get_first_index(table), autoinc);
4368 	}
4369 
4370 	return(row_import_cleanup(prebuilt, trx, err));
4371 }
4372