1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2016, Percona Inc. All Rights Reserved.
6 
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License, version 2.0,
9 as published by the Free Software Foundation.
10 
11 This program is also distributed with certain software (including
12 but not limited to OpenSSL) that is licensed under separate terms,
13 as designated in a particular file or component or in included license
14 documentation.  The authors of MySQL hereby grant you an additional
15 permission to link the program and your derivative works with the
16 separately licensed software that they have included with MySQL.
17 
18 This program is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 GNU General Public License, version 2.0, for more details.
22 
23 You should have received a copy of the GNU General Public License along with
24 this program; if not, write to the Free Software Foundation, Inc.,
25 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
26 
27 *****************************************************************************/
28 
29 /**************************************************//**
30 @file log/log0recv.cc
31 Recovery
32 
33 Created 9/20/1997 Heikki Tuuri
34 *******************************************************/
35 
36 #include "ha_prototypes.h"
37 
38 #include <vector>
39 #include <map>
40 #include <string>
41 
42 #include "log0recv.h"
43 
44 #ifdef UNIV_NONINL
45 #include "log0recv.ic"
46 #endif
47 
48 #include <my_aes.h>
49 
50 #include "mem0mem.h"
51 #include "buf0buf.h"
52 #include "buf0flu.h"
53 #include "mtr0mtr.h"
54 #include "mtr0log.h"
55 #include "page0cur.h"
56 #include "page0zip.h"
57 #include "btr0btr.h"
58 #include "btr0cur.h"
59 #include "ibuf0ibuf.h"
60 #include "trx0undo.h"
61 #include "trx0rec.h"
62 #include "fil0fil.h"
63 #include "fsp0sysspace.h"
64 #include "ut0new.h"
65 #include "row0trunc.h"
66 #ifndef UNIV_HOTBACKUP
67 # include "buf0rea.h"
68 # include "srv0srv.h"
69 # include "srv0start.h"
70 # include "trx0roll.h"
71 # include "row0merge.h"
72 #else /* !UNIV_HOTBACKUP */
73 /** This is set to false if the backup was originally taken with the
74 mysqlbackup --include regexp option: then we do not want to create tables in
75 directories which were not included */
76 bool	recv_replay_file_ops	= true;
77 #include "fut0lst.h"
78 #endif /* !UNIV_HOTBACKUP */
79 
80 
81 #include "fil0crypt.h"
82 
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))

/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA	32

/** The recovery system; allocated by recv_sys_create() and released by
recv_sys_close() or recv_sys_mem_free() */
recv_sys_t*	recv_sys = NULL;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise.  Note that this is FALSE while a background thread is
rolling back incomplete transactions. */
volatile bool	recv_recovery_on;

#ifndef UNIV_HOTBACKUP
/** TRUE when recv_init_crash_recovery() has been called. */
bool	recv_needed_recovery;
#else
/* In the hot backup build this flag is a compile-time constant. */
# define recv_needed_recovery			false
/* In the hot backup build the buffer pool size is a fixed 5 MiB. */
# define buf_pool_get_curr_size() (5 * 1024 * 1024)
#endif /* !UNIV_HOTBACKUP */
# ifdef UNIV_DEBUG
/** TRUE if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys->mutex. */
bool	recv_no_log_write = false;
# endif /* UNIV_DEBUG */

/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future.  Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
bool	recv_lsn_checks_on;

/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.

TRUE means that recovery is running and no operations on the log files
are allowed yet: the variable name is misleading. */
#ifndef UNIV_HOTBACKUP
bool	recv_no_ibuf_operations;
/** TRUE when the redo log is being backed up
(a compile-time constant in the server build) */
# define recv_is_making_a_backup		false
/** TRUE when recovering from a backed up redo log file
(a compile-time constant in the server build) */
# define recv_is_from_backup			false
#else /* !UNIV_HOTBACKUP */
/** true if the backup is an offline backup.
NOTE(review): the variable name suggests the opposite (an online redo log
copy) -- confirm which meaning is intended. */
volatile bool is_online_redo_copy = true;
/** The last flushed LSN, read at the start of the backup */
volatile lsn_t backup_redo_log_flushed_lsn;

/** TRUE when the redo log is being backed up */
bool	recv_is_making_a_backup	= false;
/** TRUE when recovering from a backed up redo log file */
bool	recv_is_from_backup	= false;
/* Identical to the definition made earlier for the hot backup build;
a token-for-token identical macro redefinition is permitted. */
# define buf_pool_get_curr_size() (5 * 1024 * 1024)
#endif /* !UNIV_HOTBACKUP */
/** The following counter is used to decide when to print info on
log scan */
static ulint	recv_scan_print_counter;

/** The type of the previous parsed redo log record */
static mlog_id_t	recv_previous_parsed_rec_type;
/** The offset of the previous parsed redo log record */
static ulint	recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record */
static ulint	recv_previous_parsed_rec_is_multi;

/** This many frames must be left free in the buffer pool when we scan
the log and store the scanned log records in the buffer pool: we will
use these free frames to read in pages when we start applying the
log records to the database.
recv_sys_var_init() assigns the default value (256). If the actual size
of the buffer pool is at least 10 MB, recv_sys_init() sets this value
to 512. */
ulint	recv_n_pool_free_frames;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
lsn_t	recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
/** Performance schema key; presumably for the thread that rolls back
incomplete transactions (inferred from the name -- confirm) */
mysql_pfs_key_t	trx_rollback_clean_thread_key;
#endif /* UNIV_PFS_THREAD */
169 
170 #ifndef	NDEBUG
/** Return string name of the redo log record type.
@param[in]	type	redo log record type
@return string name of the redo log record type */
174 const char*
175 get_mlog_string(mlog_id_t type);
176 #endif /* !NDEBUG */
177 
178 /* prototypes */
179 
180 #ifndef UNIV_HOTBACKUP
181 /*******************************************************//**
182 Initialize crash recovery environment. Can be called iff
183 recv_needed_recovery == false. */
184 static
185 void
186 recv_init_crash_recovery(void);
187 /*===========================*/
188 #endif /* !UNIV_HOTBACKUP */
189 
190 /** Tablespace item during recovery */
191 struct file_name_t {
192 	/** Tablespace file name (MLOG_FILE_NAME) */
193 	std::string	name;
194 	/** Tablespace object (NULL if not valid or not found) */
195 	fil_space_t*	space;
196 	/** Whether the tablespace has been deleted */
197 	bool		deleted;
198 
199 	/** Constructor */
file_name_tfile_name_t200 	file_name_t(std::string name_, bool deleted_) :
201 		name(name_), space(NULL), deleted (deleted_) {}
202 };
203 
/** Map of dirty tablespaces during recovery, keyed by tablespace ID.
Each value holds the file name taken from the MLOG_FILE_* records, the
tablespace object (if one was opened) and the deleted flag. */
typedef std::map<
	ulint,
	file_name_t,
	std::less<ulint>,
	ut_allocator<std::pair<const ulint, file_name_t> > >	recv_spaces_t;

/** The tablespaces named by the MLOG_FILE_* records processed so far.
Entries are added while those records are processed; recv_sys_close()
empties the map. */
static recv_spaces_t	recv_spaces;
212 
213 /** Process a file name from a MLOG_FILE_* record.
214 @param[in,out]	name		file name
215 @param[in]	len		length of the file name
216 @param[in]	space_id	the tablespace ID
217 @param[in]	deleted		whether this is a MLOG_FILE_DELETE record
218 @retval true if able to process file successfully.
219 @retval false if unable to process the file */
220 static
221 bool
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)222 fil_name_process(
223 	char*	name,
224 	ulint	len,
225 	ulint	space_id,
226 	bool	deleted)
227 {
228 	bool	processed = true;
229 
230 	/* The first condition is true during normal server operation, the
231 	second one during server startup after
232 	recv_recovery_from_checkpoint_start has completed. */
233 	if (!recv_recovery_is_on() || recv_lsn_checks_on)
234 	{
235 		/* We are being called from online log tracking, file name
236 		processing is a no-op, and specifically do not cause any DD
237 		changes. */
238 		return(processed);
239 	}
240 
241 	/* We will also insert space=NULL into the map, so that
242 	further checks can ensure that a MLOG_FILE_NAME record was
243 	scanned before applying any page records for the space_id. */
244 
245 	os_normalize_path(name);
246 	file_name_t	fname(std::string(name, len - 1), deleted);
247 	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
248 		std::make_pair(space_id, fname));
249 	ut_ad(p.first->first == space_id);
250 
251 	file_name_t&	f = p.first->second;
252 
253 	if (deleted) {
254 		/* Got MLOG_FILE_DELETE */
255 
256 		if (!p.second && !f.deleted) {
257 			f.deleted = true;
258 			if (f.space != NULL) {
259 				fil_space_free(space_id, false);
260 				f.space = NULL;
261 			}
262 		}
263 
264 		ut_ad(f.space == NULL);
265 	} else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
266 		   || f.name != fname.name) {
267 		fil_space_t*	space;
268 
269 		/* Check if the tablespace file exists and contains
270 		the space_id. If not, ignore the file after displaying
271 		a note. Abort if there are multiple files with the
272 		same space_id. */
273 		switch (fil_ibd_load(space_id, name, space)) {
274 		case FIL_LOAD_OK:
275 			ut_ad(space != NULL);
276 
277 			/* For encrypted tablespace, set key and iv. */
278 			if (FSP_FLAGS_GET_ENCRYPTION(space->flags)
279 			    && recv_sys->encryption_list != NULL) {
280 
281 				ut_ad(space->crypt_data == NULL);
282 				dberr_t				err;
283 				encryption_list_t::iterator	it;
284 
285 				for (it = recv_sys->encryption_list->begin();
286 				     it != recv_sys->encryption_list->end();
287 				     it++) {
288 					if (it->space_id == space->id) {
289 						err = fil_set_encryption(
290 							space->id,
291 							Encryption::AES,
292 							it->key,
293 							it->iv);
294 						if (err != DB_SUCCESS) {
295 							ib::error()
296 								<< "Can't set"
297 								" encryption"
298 								" information"
299 								" for"
300 								" tablespace"
301 								<< space->name
302 								<< "!";
303 						}
304 						ut_free(it->key);
305 						ut_free(it->iv);
306 						it->key = NULL;
307 						it->iv = NULL;
308 						it->space_id = 0;
309 					}
310 				}
311 			}
312 
313 			if (f.space == NULL || f.space == space) {
314 				f.name = fname.name;
315 				f.space = space;
316 				f.deleted = false;
317 			} else {
318 				ib::error() << "Tablespace " << space_id
319 					<< " has been found in two places: '"
320 					<< f.name << "' and '" << name << "'."
321 					" You must delete one of them.";
322 				recv_sys->found_corrupt_fs = true;
323 				processed = false;
324 			}
325 			break;
326 
327 		case FIL_LOAD_ID_CHANGED:
328 			ut_ad(space == NULL);
329 			break;
330 
331 		case FIL_LOAD_NOT_FOUND:
332 			/* No matching tablespace was found; maybe it
333 			was renamed, and we will find a subsequent
334 			MLOG_FILE_* record. */
335 			ut_ad(space == NULL);
336 
337 			if (srv_force_recovery) {
338 				/* Without innodb_force_recovery,
339 				missing tablespaces will only be
340 				reported in
341 				recv_init_crash_recovery_spaces().
342 				Enable some more diagnostics when
343 				forcing recovery. */
344 
345 				ib::info()
346 					<< "At LSN: " << recv_sys->recovered_lsn
347 					<< ": unable to open file " << name
348 					<< " for tablespace " << space_id;
349 			}
350 			break;
351 
352 		case FIL_LOAD_INVALID:
353 			ut_ad(space == NULL);
354 			if (srv_force_recovery == 0) {
355 #ifndef UNIV_HOTBACKUP
356 				ib::warn() << "We do not continue the crash"
357 					" recovery, because the table may"
358 					" become corrupt if we cannot apply"
359 					" the log records in the InnoDB log to"
360 					" it. To fix the problem and start"
361 					" mysqld:";
362 				ib::info() << "1) If there is a permission"
363 					" problem in the file and mysqld"
364 					" cannot open the file, you should"
365 					" modify the permissions.";
366 				ib::info() << "2) If the tablespace is not"
367 					" needed, or you can restore an older"
368 					" version from a backup, then you can"
369 					" remove the .ibd file, and use"
370 					" --innodb_force_recovery=1 to force"
371 					" startup without this file.";
372 				ib::info() << "3) If the file system or the"
373 					" disk is broken, and you cannot"
374 					" remove the .ibd file, you can set"
375 					" --innodb_force_recovery.";
376 				recv_sys->found_corrupt_fs = true;
377 #else
378 				ib::warn() << "We do not continue the apply-log"
379 					" operation because the tablespace may"
380 					" become corrupt if we cannot apply"
381 					" the log records in the redo log"
382 					" records to it.";
383 #endif /* !UNIV_BACKUP  */
384 				processed = false;
385 				break;
386 			}
387 
388 			ib::info() << "innodb_force_recovery was set to "
389 				<< srv_force_recovery << ". Continuing crash"
390 				" recovery even though we cannot access the"
391 				" files for tablespace " << space_id << ".";
392 			break;
393 		}
394 	}
395 	return(processed);
396 }
397 
398 #ifndef UNIV_HOTBACKUP
399 /** Parse or process a MLOG_FILE_* record.
400 @param[in]	ptr		redo log record
401 @param[in]	end		end of the redo log buffer
402 @param[in]	space_id	the tablespace ID
403 @param[in]	first_page_no	first page number in the file
404 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
405 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
406 @param[in]	apply		whether to apply the record
407 @return pointer to next redo log record
408 @retval NULL if this log record was truncated */
409 static
410 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type,bool apply)411 fil_name_parse(
412 	byte*		ptr,
413 	const byte*	end,
414 	ulint		space_id,
415 	ulint		first_page_no,
416 	mlog_id_t	type,
417 	bool		apply)
418 {
419 	if (type == MLOG_FILE_CREATE2) {
420 		if (end < ptr + 4) {
421 			return(NULL);
422 		}
423 		ptr += 4;
424 	}
425 
426 	if (end < ptr + 2) {
427 		return(NULL);
428 	}
429 
430 	ulint	len = mach_read_from_2(ptr);
431 	ptr += 2;
432 	if (end < ptr + len) {
433 		return(NULL);
434 	}
435 
436 	/* MLOG_FILE_* records should only be written for
437 	user-created tablespaces. The name must be long enough
438 	and end in .ibd. */
439 	bool corrupt = is_predefined_tablespace(space_id)
440 		|| first_page_no != 0 // TODO: multi-file user tablespaces
441 		|| len < sizeof "/a.ibd\0"
442 		|| memcmp(ptr + len - 5, DOT_IBD, 5) != 0
443 		|| memchr(ptr, OS_PATH_SEPARATOR, len) == NULL;
444 
445 	byte*	end_ptr	= ptr + len;
446 
447 	switch (type) {
448 	default:
449 		ut_ad(0); // the caller checked this
450 	case MLOG_FILE_NAME:
451 		if (corrupt) {
452 			recv_sys->set_corrupt_log();
453 			break;
454 		}
455 
456 		fil_name_process(
457 			reinterpret_cast<char*>(ptr), len, space_id, false);
458 		break;
459 	case MLOG_FILE_DELETE:
460 		if (corrupt) {
461 			recv_sys->set_corrupt_log();
462 			break;
463 		}
464 
465 		fil_name_process(
466 			reinterpret_cast<char*>(ptr), len, space_id, true);
467 
468 		break;
469 	case MLOG_FILE_CREATE2:
470 		break;
471 	case MLOG_FILE_RENAME2:
472 		if (corrupt) {
473 			recv_sys->set_corrupt_log();
474 		}
475 
476 		/* The new name follows the old name. */
477 		byte*	new_name = end_ptr + 2;
478 		if (end < new_name) {
479 			return(NULL);
480 		}
481 
482 		ulint	new_len = mach_read_from_2(end_ptr);
483 
484 		if (end < end_ptr + 2 + new_len) {
485 			return(NULL);
486 		}
487 
488 		end_ptr += 2 + new_len;
489 
490 		corrupt = corrupt
491 			|| new_len < sizeof "/a.ibd\0"
492 			|| memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0
493 			|| !memchr(new_name, OS_PATH_SEPARATOR, new_len);
494 
495 		if (corrupt) {
496 			recv_sys->set_corrupt_log();
497 			break;
498 		}
499 
500 		fil_name_process(
501 			reinterpret_cast<char*>(ptr), len,
502 			space_id, false);
503 		fil_name_process(
504 			reinterpret_cast<char*>(new_name), new_len,
505 			space_id, false);
506 
507 		if (!apply) {
508 			break;
509 		}
510 		if (!fil_op_replay_rename(
511 			    space_id, first_page_no,
512 			    reinterpret_cast<const char*>(ptr),
513 			    reinterpret_cast<const char*>(new_name))) {
514 			recv_sys->found_corrupt_fs = true;
515 		}
516 	}
517 
518 	return(end_ptr);
519 }
520 #else /* !UNIV_HOTBACKUP */
521 /** Parse a file name retrieved from a MLOG_FILE_* record,
522 and return the absolute file path corresponds to backup dir
523 as well as in the form of database/tablespace
524 @param[in]	file_name		path emitted by the redo log
525 @param[out]	absolute_path	absolute path of tablespace
526 corresponds to backup dir
527 @param[out]	tablespace_name	name in the form of database/table */
528 static
529 void
make_abs_file_path(const std::string & name,std::string & absolute_path,std::string & tablespace_name)530 make_abs_file_path(
531 	const std::string&	name,
532 	std::string&		absolute_path,
533 	std::string&		tablespace_name)
534 {
535 	std::string file_name = name;
536 	std::string path = fil_path_to_mysql_datadir;
537 	size_t pos = std::string::npos;
538 
539 	if (is_absolute_path(file_name.c_str())) {
540 
541 		pos = file_name.rfind(OS_PATH_SEPARATOR);
542 		std::string temp_name = file_name.substr(0, pos);
543 		pos = temp_name.rfind(OS_PATH_SEPARATOR);
544 		++pos;
545 		file_name = file_name.substr(pos, file_name.length());
546 		path += OS_PATH_SEPARATOR + file_name;
547 	} else {
548 		pos = file_name.find(OS_PATH_SEPARATOR);
549 		++pos;
550 		file_name = file_name.substr(pos, file_name.length());
551 		path += OS_PATH_SEPARATOR + file_name;
552 	}
553 
554 	absolute_path = path;
555 
556 	/* remove the .ibd extension */
557 	pos = file_name.rfind(".ibd");
558 	if (pos != std::string::npos)
559 		tablespace_name = file_name.substr(0, pos);
560 
561 	/* space->name uses '/', not OS_PATH_SEPARATOR,
562 	update the seperator */
563 	if (OS_PATH_SEPARATOR != '/') {
564 		pos = tablespace_name.find(OS_PATH_SEPARATOR);
565 		while (pos != std::string::npos) {
566 			tablespace_name[pos] = '/';
567 			pos = tablespace_name.find(OS_PATH_SEPARATOR);
568 		}
569 	}
570 
571 }
572 
573 /** Wrapper around fil_name_process()
574 @param[in]	name		absolute path of tablespace file
575 @param[in]	space_id	the tablespace ID
576 @retval		true		if able to process file successfully.
577 @retval		false		if unable to process the file */
578 bool
fil_name_process(const char * name,ulint space_id)579 fil_name_process(
580 	const char*	name,
581 	ulint	space_id)
582 {
583 	size_t length = strlen(name);
584 	++length;
585 
586 	char* file_name = static_cast<char*>(ut_malloc_nokey(length));
587 	strncpy(file_name, name,length);
588 
589 	bool processed = fil_name_process(file_name, length, space_id, false);
590 
591 	ut_free(file_name);
592 	return(processed);
593 }
594 
595 /** Parse or process a MLOG_FILE_* record.
596 @param[in]	ptr		redo log record
597 @param[in]	end		end of the redo log buffer
598 @param[in]	space_id	the tablespace ID
599 @param[in]	first_page_no	first page number in the file
600 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
601 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
602 @param[in]	apply		whether to apply the record
603 @retval	pointer to next redo log record
604 @retval	NULL if this log record was truncated */
605 static
606 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type,bool apply)607 fil_name_parse(
608 	byte*		ptr,
609 	const byte*	end,
610 	ulint		space_id,
611 	ulint		first_page_no,
612 	mlog_id_t	type,
613 	bool		apply)
614 {
615 
616 	ulint flags = mach_read_from_4(ptr);
617 
618 	if (type == MLOG_FILE_CREATE2) {
619 		if (end < ptr + 4) {
620 			return(NULL);
621 		}
622 		ptr += 4;
623 	}
624 
625 	if (end < ptr + 2) {
626 		return(NULL);
627 	}
628 
629 	ulint	len = mach_read_from_2(ptr);
630 	ptr += 2;
631 	if (end < ptr + len) {
632 		return(NULL);
633 	}
634 
635 	os_normalize_path(reinterpret_cast<char*>(ptr));
636 
637 	/* MLOG_FILE_* records should only be written for
638 	user-created tablespaces. The name must be long enough
639 	and end in .ibd. */
640 	bool corrupt = is_predefined_tablespace(space_id)
641 		|| first_page_no != 0 // TODO: multi-file user tablespaces
642 		|| len < sizeof "/a.ibd\0"
643 		|| memcmp(ptr + len - 5, DOT_IBD, 5) != 0
644 		|| memchr(ptr, OS_PATH_SEPARATOR, len) == NULL;
645 
646 	byte*	end_ptr = ptr + len;
647 
648 	if (corrupt) {
649 		recv_sys->set_corrupt_log();
650 		return(end_ptr);
651 	}
652 
653 	std::string abs_file_path, tablespace_name;
654 	char* name = reinterpret_cast<char*>(ptr);
655 	char* new_name = NULL;
656 	recv_spaces_t::iterator itr;
657 
658 	make_abs_file_path(name, abs_file_path, tablespace_name);
659 
660 	if (!recv_is_making_a_backup) {
661 
662 		name = static_cast<char*>(ut_malloc_nokey(
663 			(abs_file_path.length() + 1)));
664 		strcpy(name, abs_file_path.c_str());
665 		len = strlen(name) + 1;
666 	}
667 	switch (type) {
668 	default:
669 		ut_ad(0); // the caller checked this
670 	case MLOG_FILE_NAME:
671 		/* Don't validate tablespaces while copying redo logs
672 		because backup process might keep some tablespace handles
673 		open in server datadir.
674 		Maintain "map of dirty tablespaces" so that assumptions
675 		for other redo log records are not broken even for dirty
676 		tablespaces during apply log */
677 		if (!recv_is_making_a_backup) {
678 			recv_spaces.insert(std::make_pair(space_id,
679 						file_name_t(abs_file_path,
680 						false)));
681 		}
682 		break;
683 	case MLOG_FILE_DELETE:
684 		/* Don't validate tablespaces while copying redo logs
685 		because backup process might keep some tablespace handles
686 		open in server datadir. */
687 		if (recv_is_making_a_backup)
688 			break;
689 
690 		fil_name_process(
691 			name, len, space_id, true);
692 
693 		if (apply && recv_replay_file_ops
694 			&& fil_space_get(space_id)) {
695 			dberr_t	err = fil_delete_tablespace(
696 				space_id, BUF_REMOVE_FLUSH_NO_WRITE);
697 			ut_a(err == DB_SUCCESS);
698 		}
699 
700 		break;
701 	case MLOG_FILE_CREATE2:
702 		if (recv_is_making_a_backup
703 		    || (!recv_replay_file_ops)
704 		    || (is_intermediate_file(abs_file_path.c_str()))
705 		    || (fil_space_get(space_id))
706 		    || (fil_space_get_id_by_name(
707 				tablespace_name.c_str()) != ULINT_UNDEFINED)) {
708 			/* Don't create table while :-
709 			1. scanning the redo logs during backup
710 			2. apply-log on a partial backup
711 			3. if it is intermediate file
712 			4. tablespace is already loaded in memory */
713 		} else {
714 			itr = recv_spaces.find(space_id);
715 			if (itr == recv_spaces.end()
716 				|| (itr->second.name != abs_file_path)) {
717 
718 				dberr_t ret = fil_ibd_create(
719 					space_id, tablespace_name.c_str(),
720 					abs_file_path.c_str(),
721 					flags, FIL_IBD_FILE_INITIAL_SIZE,
722 					FIL_ENCRYPTION_DEFAULT,
723 					0);
724 
725 				if (ret != DB_SUCCESS) {
726 					ib::fatal() << "Could not create the"
727 						<< " tablespace : "
728 						<< abs_file_path
729 						<< " with space Id : "
730 						<< space_id;
731 				}
732 			}
733 		}
734 		break;
735 	case MLOG_FILE_RENAME2:
736 		/* The new name follows the old name. */
737 		byte*	new_table_name = end_ptr + 2;
738 		if (end < new_table_name) {
739 			return(NULL);
740 		}
741 
742 		ulint	new_len = mach_read_from_2(end_ptr);
743 
744 		if (end < end_ptr + 2 + new_len) {
745 			return(NULL);
746 		}
747 
748 		end_ptr += 2 + new_len;
749 
750 		char* new_table = reinterpret_cast<char*>(new_table_name);
751 		os_normalize_path(new_table);
752 
753 		corrupt = corrupt
754 			|| new_len < sizeof "/a.ibd\0"
755 			|| memcmp(new_table_name + new_len - 5, DOT_IBD, 5) != 0
756 			|| !memchr(new_table_name, OS_PATH_SEPARATOR, new_len);
757 
758 		if (corrupt) {
759 			recv_sys->set_corrupt_log();
760 			break;
761 		}
762 
763 		if (recv_is_making_a_backup
764 		    || (!recv_replay_file_ops)
765 		    || (is_intermediate_file(name))
766 		    || (is_intermediate_file(new_table))) {
767 			/* Don't rename table while :-
768 			1. scanning the redo logs during backup
769 			2. apply-log on a partial backup
770 			3. The new name is already used.
771 			4. A tablespace is not open in memory with the old name.
772 			This will prevent unintended renames during recovery. */
773 			break;
774 		} else {
775 			make_abs_file_path(new_table, abs_file_path,
776 					   tablespace_name);
777 
778 			new_name = static_cast<char*>(ut_malloc_nokey(
779 				(abs_file_path.length() + 1)));
780 			strcpy(new_name, abs_file_path.c_str());
781 			new_len = strlen(new_name) + 1;
782 		}
783 
784 		fil_name_process(name, len, space_id, false);
785 		fil_name_process( new_name, new_len, space_id, false);
786 
787 		if (!fil_op_replay_rename(
788 			space_id, first_page_no,
789 			name,
790 			new_name)) {
791 			recv_sys->found_corrupt_fs = true;
792 		}
793 	}
794 
795 	if (!recv_is_making_a_backup) {
796 		ut_free(name);
797 		ut_free(new_name);
798 	}
799 	return(end_ptr);
800 }
801 #endif /* UNIV_HOTBACKUP */
802 
803 /********************************************************//**
804 Creates the recovery system. */
805 void
recv_sys_create(void)806 recv_sys_create(void)
807 /*=================*/
808 {
809 	if (recv_sys != NULL) {
810 
811 		return;
812 	}
813 
814 	recv_sys = static_cast<recv_sys_t*>(ut_zalloc_nokey(sizeof(*recv_sys)));
815 
816 	mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
817 
818 	recv_sys->heap = NULL;
819 	recv_sys->addr_hash = NULL;
820 }
821 
822 /********************************************************//**
823 Release recovery system mutexes. */
824 void
recv_sys_close(void)825 recv_sys_close(void)
826 /*================*/
827 {
828 	if (recv_sys != NULL) {
829 		if (recv_sys->addr_hash != NULL) {
830 			hash_table_free(recv_sys->addr_hash);
831 		}
832 
833 		if (recv_sys->heap != NULL) {
834 			mem_heap_free(recv_sys->heap);
835 		}
836 #ifndef UNIV_HOTBACKUP
837 		if (recv_sys->flush_start != NULL) {
838 			os_event_destroy(recv_sys->flush_start);
839 		}
840 
841 		if (recv_sys->flush_end != NULL) {
842 			os_event_destroy(recv_sys->flush_end);
843 		}
844 #endif /* !UNIV_HOTBACKUP */
845 		ut_free(recv_sys->buf);
846 		ut_free(recv_sys->last_block_buf_start);
847 
848 		/* Call the destructor for recv_sys_t::dblwr member */
849 		recv_sys->dblwr.~recv_dblwr_t();
850 
851 		mutex_free(&recv_sys->mutex);
852 
853 		ut_free(recv_sys);
854 		recv_sys = NULL;
855 	}
856 
857 	recv_spaces.clear();
858 }
859 
860 /********************************************************//**
861 Frees the recovery system memory. */
862 void
recv_sys_mem_free(void)863 recv_sys_mem_free(void)
864 /*===================*/
865 {
866 	if (recv_sys != NULL) {
867 		if (recv_sys->addr_hash != NULL) {
868 			hash_table_free(recv_sys->addr_hash);
869 		}
870 
871 		if (recv_sys->heap != NULL) {
872 			mem_heap_free(recv_sys->heap);
873 		}
874 #ifndef UNIV_HOTBACKUP
875 		if (recv_sys->flush_start != NULL) {
876 			os_event_destroy(recv_sys->flush_start);
877 		}
878 
879 		if (recv_sys->flush_end != NULL) {
880 			os_event_destroy(recv_sys->flush_end);
881 		}
882 #endif /* !UNIV_HOTBACKUP */
883 		ut_free(recv_sys->buf);
884 		ut_free(recv_sys->last_block_buf_start);
885 
886 		/* Call the destructor for recv_sys_t::dblwr member */
887 		recv_sys->dblwr.~recv_dblwr_t();
888 
889 		ut_free(recv_sys);
890 		recv_sys = NULL;
891 	}
892 }
893 
894 #ifndef UNIV_HOTBACKUP
895 /************************************************************
896 Reset the state of the recovery system variables. */
897 void
recv_sys_var_init(void)898 recv_sys_var_init(void)
899 /*===================*/
900 {
901 	recv_recovery_on = false;
902 	recv_needed_recovery = false;
903 	recv_lsn_checks_on = false;
904 	recv_no_ibuf_operations = false;
905 	recv_scan_print_counter	= 0;
906 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
907 	recv_previous_parsed_rec_offset	= 0;
908 	recv_previous_parsed_rec_is_multi = 0;
909 	recv_n_pool_free_frames	= 256;
910 	recv_max_page_lsn = 0;
911 }
912 #endif /* !UNIV_HOTBACKUP */
913 
914 /************************************************************
915 Inits the recovery system for a recovery operation. */
916 void
recv_sys_init(ulint available_memory)917 recv_sys_init(
918 /*==========*/
919 	ulint	available_memory)	/*!< in: available memory in bytes */
920 {
921 	if (recv_sys->heap != NULL) {
922 
923 		return;
924 	}
925 
926 #ifndef UNIV_HOTBACKUP
927 	mutex_enter(&(recv_sys->mutex));
928 
929 	recv_sys->heap = mem_heap_create_typed(256,
930 					MEM_HEAP_FOR_RECV_SYS);
931 
932 	if (!srv_read_only_mode) {
933 		recv_sys->flush_start = os_event_create(0);
934 		recv_sys->flush_end = os_event_create(0);
935 	}
936 #else /* !UNIV_HOTBACKUP */
937 	recv_sys->heap = mem_heap_create(256);
938 	recv_is_from_backup = true;
939 #endif /* !UNIV_HOTBACKUP */
940 
941 	/* Set appropriate value of recv_n_pool_free_frames. */
942 	if (buf_pool_get_curr_size() >= (10 * 1024 * 1024)) {
943 		/* Buffer pool of size greater than 10 MB. */
944 		recv_n_pool_free_frames = 512;
945 	}
946 
947 	recv_sys->buf = static_cast<byte*>(
948 		ut_malloc_nokey(RECV_PARSING_BUF_SIZE));
949 	recv_sys->len = 0;
950 	recv_sys->recovered_offset = 0;
951 
952 	recv_sys->addr_hash = hash_create(available_memory / 512);
953 	recv_sys->n_addrs = 0;
954 
955 	recv_sys->apply_log_recs = FALSE;
956 	recv_sys->apply_batch_on = FALSE;
957 
958 	recv_sys->last_block_buf_start = static_cast<byte*>(
959 		ut_malloc_nokey(OS_FILE_LOG_BLOCK_SIZE
960 				+ MAX_SRV_LOG_WRITE_AHEAD_SIZE));
961 
962 	recv_sys->last_block = static_cast<byte*>(ut_align(
963 		recv_sys->last_block_buf_start, MAX_SRV_LOG_WRITE_AHEAD_SIZE));
964 
965 	recv_sys->found_corrupt_log = false;
966 	recv_sys->found_corrupt_fs = false;
967 	recv_sys->mlog_checkpoint_lsn = 0;
968 
969 	recv_max_page_lsn = 0;
970 
971 	/* Call the constructor for recv_sys_t::dblwr member */
972 	new (&recv_sys->dblwr) recv_dblwr_t();
973 
974 	recv_sys->encryption_list = NULL;
975 	mutex_exit(&(recv_sys->mutex));
976 }
977 
978 /********************************************************//**
979 Empties the hash table when it has been fully processed. */
980 static
981 void
recv_sys_empty_hash(void)982 recv_sys_empty_hash(void)
983 /*=====================*/
984 {
985 	ut_ad(mutex_own(&(recv_sys->mutex)));
986 
987 	if (recv_sys->n_addrs != 0) {
988 		ib::fatal() << recv_sys->n_addrs << " pages with log records"
989 			" were left unprocessed!";
990 	}
991 
992 	hash_table_free(recv_sys->addr_hash);
993 	mem_heap_empty(recv_sys->heap);
994 
995 	recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
996 }
997 
998 #ifndef UNIV_HOTBACKUP
999 
1000 /********************************************************//**
1001 Frees the recovery system. */
1002 void
recv_sys_debug_free(void)1003 recv_sys_debug_free(void)
1004 /*=====================*/
1005 {
1006 	mutex_enter(&(recv_sys->mutex));
1007 
1008 	hash_table_free(recv_sys->addr_hash);
1009 	mem_heap_free(recv_sys->heap);
1010 	ut_free(recv_sys->buf);
1011 	ut_free(recv_sys->last_block_buf_start);
1012 
1013 	recv_sys->buf = NULL;
1014 	recv_sys->heap = NULL;
1015 	recv_sys->addr_hash = NULL;
1016 	recv_sys->last_block_buf_start = NULL;
1017 
1018 	/* wake page cleaner up to progress */
1019 	if (!srv_read_only_mode) {
1020 		ut_ad(!recv_recovery_on);
1021 		os_event_reset(buf_flush_event);
1022 		os_event_set(recv_sys->flush_start);
1023 	}
1024 
1025 	if (recv_sys->encryption_list != NULL) {
1026 		encryption_list_t::iterator	it;
1027 
1028 		for (it = recv_sys->encryption_list->begin();
1029 		     it != recv_sys->encryption_list->end();
1030 		     it++) {
1031 			if (it->key != NULL) {
1032 				ut_free(it->key);
1033 				it->key = NULL;
1034 			}
1035 			if (it->iv != NULL) {
1036 				ut_free(it->iv);
1037 				it->iv = NULL;
1038 			}
1039 		}
1040 
1041 		recv_sys->encryption_list->swap(*recv_sys->encryption_list);
1042 
1043 		UT_DELETE(recv_sys->encryption_list);
1044 		recv_sys->encryption_list = NULL;
1045 	}
1046 
1047 	mutex_exit(&(recv_sys->mutex));
1048 }
1049 
1050 /********************************************************//**
1051 Copies a log segment from the most up-to-date log group to the other log
1052 groups, so that they all contain the latest log data. Also writes the info
1053 about the latest checkpoint to the groups, and inits the fields in the group
1054 memory structs to up-to-date values. */
1055 static
1056 void
recv_synchronize_groups(void)1057 recv_synchronize_groups(void)
1058 /*=========================*/
1059 {
1060 	lsn_t		start_lsn;
1061 	lsn_t		end_lsn;
1062 	lsn_t		recovered_lsn;
1063 
1064 	recovered_lsn = recv_sys->recovered_lsn;
1065 
1066 	/* Read the last recovered log block to the recovery system buffer:
1067 	the block is always incomplete */
1068 
1069 	start_lsn = ut_uint64_align_down(recovered_lsn,
1070 					 OS_FILE_LOG_BLOCK_SIZE);
1071 	end_lsn = ut_uint64_align_up(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
1072 
1073 	ut_a(start_lsn != end_lsn);
1074 
1075 	log_group_read_log_seg(recv_sys->last_block,
1076 			       UT_LIST_GET_FIRST(log_sys->log_groups),
1077 			       start_lsn, end_lsn, false);
1078 
1079 	for (log_group_t* group = UT_LIST_GET_FIRST(log_sys->log_groups);
1080 	     group;
1081 	     group = UT_LIST_GET_NEXT(log_groups, group)) {
1082 		/* Update the fields in the group struct to correspond to
1083 		recovered_lsn */
1084 
1085 		log_group_set_fields(group, recovered_lsn);
1086 	}
1087 
1088 	/* Copy the checkpoint info to the log; remember that we have
1089 	incremented checkpoint_no by one, and the info will not be written
1090 	over the max checkpoint info, thus making the preservation of max
1091 	checkpoint info on disk certain */
1092 
1093 	log_write_checkpoint_info(true);
1094 	log_mutex_enter();
1095 }
1096 #endif /* !UNIV_HOTBACKUP */
1097 
1098 /** Check the consistency of a log header block.
1099 @param[in]	log header block
1100 @return true if ok */
1101 static
1102 bool
recv_check_log_header_checksum(const byte * buf)1103 recv_check_log_header_checksum(
1104 	const byte*	buf)
1105 {
1106 	return(log_block_get_checksum(buf)
1107 	       == log_block_calc_checksum_crc32(buf));
1108 }
1109 
1110 #ifndef UNIV_HOTBACKUP
1111 /** Find the latest checkpoint in the format-0 log header.
1112 @param[out]	max_group	log group, or NULL
1113 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1114 @return error code or DB_SUCCESS */
1115 static MY_ATTRIBUTE((warn_unused_result))
1116 dberr_t
recv_find_max_checkpoint_0(log_group_t ** max_group,ulint * max_field)1117 recv_find_max_checkpoint_0(
1118 	log_group_t**	max_group,
1119 	ulint*		max_field)
1120 {
1121 	log_group_t*	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1122 	ib_uint64_t	max_no = 0;
1123 	ib_uint64_t	checkpoint_no;
1124 	byte*		buf	= log_sys->checkpoint_buf;
1125 
1126 	ut_ad(group->format == 0);
1127 	ut_ad(UT_LIST_GET_NEXT(log_groups, group) == NULL);
1128 
1129 	/** Offset of the first checkpoint checksum */
1130 	static const uint CHECKSUM_1 = 288;
1131 	/** Offset of the second checkpoint checksum */
1132 	static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1133 	/** Most significant bits of the checkpoint offset */
1134 	static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1135 	/** Least significant bits of the checkpoint offset */
1136 	static const uint OFFSET_LOW32 = 16;
1137 
1138 	for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1139 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1140 		log_group_header_read(group, field);
1141 
1142 		if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1143 		    != mach_read_from_4(buf + CHECKSUM_1)
1144 		    || static_cast<uint32_t>(
1145 			    ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1146 					   CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1147 		    != mach_read_from_4(buf + CHECKSUM_2)) {
1148 			DBUG_PRINT("ib_log",
1149 				   ("invalid pre-5.7.9 checkpoint " ULINTPF,
1150 				    field));
1151 			continue;
1152 		}
1153 
1154 		group->state = LOG_GROUP_OK;
1155 
1156 		group->lsn = mach_read_from_8(
1157 			buf + LOG_CHECKPOINT_LSN);
1158 		group->lsn_offset = static_cast<ib_uint64_t>(
1159 			mach_read_from_4(buf + OFFSET_HIGH32)) << 32
1160 			| mach_read_from_4(buf + OFFSET_LOW32);
1161 		checkpoint_no = mach_read_from_8(
1162 			buf + LOG_CHECKPOINT_NO);
1163 
1164 		DBUG_PRINT("ib_log",
1165 			   ("checkpoint " UINT64PF " at " LSN_PF
1166 			    " found in group " ULINTPF,
1167 			    checkpoint_no, group->lsn, group->id));
1168 
1169 		if (checkpoint_no >= max_no) {
1170 			*max_group = group;
1171 			*max_field = field;
1172 			max_no = checkpoint_no;
1173 		}
1174 	}
1175 
1176 	if (*max_group != NULL) {
1177 		return(DB_SUCCESS);
1178 	}
1179 
1180 	ib::error() << "Upgrade after a crash is not supported."
1181 		" This redo log was created before MySQL 5.7.9,"
1182 		" and we did not find a valid checkpoint."
1183 		" Please follow the instructions at"
1184 		" " REFMAN "upgrading.html";
1185 	return(DB_ERROR);
1186 }
1187 
1188 /** Determine if a pre-5.7.9 redo log is clean.
1189 @param[in]	lsn	checkpoint LSN
1190 @return error code
1191 @retval	DB_SUCCESS	if the redo log is clean
1192 @retval DB_ERROR	if the redo log is corrupted or dirty */
1193 static
1194 dberr_t
recv_log_format_0_recover(lsn_t lsn)1195 recv_log_format_0_recover(lsn_t lsn)
1196 {
1197 	log_mutex_enter();
1198 	log_group_t*	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1199 	const lsn_t	source_offset
1200 		= log_group_calc_lsn_offset(lsn, group);
1201 	log_mutex_exit();
1202 	const ulint	page_no
1203 		= (ulint) (source_offset / univ_page_size.physical());
1204 	byte*		buf = log_sys->buf;
1205 
1206 	static const char* NO_UPGRADE_RECOVERY_MSG =
1207 		"Upgrade after a crash is not supported."
1208 		" This redo log was created before MySQL 5.7.9";
1209 	static const char* NO_UPGRADE_RTFM_MSG =
1210 		". Please follow the instructions at "
1211 		REFMAN "upgrading.html";
1212 
1213 	fil_io(IORequestLogRead, true,
1214 	       page_id_t(group->space_id, page_no),
1215 	       univ_page_size,
1216 	       (ulint) ((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1217 			% univ_page_size.physical()),
1218 	       OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1219 
1220 	if (log_block_calc_checksum_format_0(buf)
1221 	    != log_block_get_checksum(buf)) {
1222 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1223 			<< ", and it appears corrupted"
1224 			<< NO_UPGRADE_RTFM_MSG;
1225 		return(DB_CORRUPTION);
1226 	}
1227 
1228 	if (log_block_get_data_len(buf)
1229 	    != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1230 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1231 			<< NO_UPGRADE_RTFM_MSG;
1232 		return(DB_ERROR);
1233 	}
1234 
1235 	/* Mark the redo log for upgrading. */
1236 	srv_log_file_size = 0;
1237 	recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1238 		= recv_sys->scanned_lsn
1239 		= recv_sys->mlog_checkpoint_lsn = lsn;
1240 	log_sys->last_checkpoint_lsn = log_sys->next_checkpoint_lsn
1241 		= log_sys->lsn = log_sys->write_lsn
1242 		= log_sys->current_flush_lsn = log_sys->flushed_to_disk_lsn
1243 		= lsn;
1244 	log_sys->next_checkpoint_no = 0;
1245 	return(DB_SUCCESS);
1246 }
1247 
1248 /** Find the latest checkpoint in the log header.
1249 @param[out]	max_group	log group, or NULL
1250 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1251 @return error code or DB_SUCCESS */
1252 static MY_ATTRIBUTE((warn_unused_result))
1253 dberr_t
recv_find_max_checkpoint(log_group_t ** max_group,ulint * max_field)1254 recv_find_max_checkpoint(
1255 	log_group_t**	max_group,
1256 	ulint*		max_field)
1257 {
1258 	log_group_t*	group;
1259 	ib_uint64_t	max_no;
1260 	ib_uint64_t	checkpoint_no;
1261 	ulint		field;
1262 	byte*		buf;
1263 
1264 	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1265 
1266 	max_no = 0;
1267 	*max_group = NULL;
1268 	*max_field = 0;
1269 
1270 	buf = log_sys->checkpoint_buf;
1271 
1272 	while (group) {
1273 		group->state = LOG_GROUP_CORRUPTED;
1274 
1275 		log_group_header_read(group, 0);
1276 		/* Check the header page checksum. There was no
1277 		checksum in the first redo log format (version 0). */
1278 		group->format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1279 		if (group->format != 0
1280 		    && !recv_check_log_header_checksum(buf)) {
1281 			ib::error() << "Invalid redo log header checksum.";
1282 			return(DB_CORRUPTION);
1283 		}
1284 
1285 		switch (group->format) {
1286 		case 0:
1287 			return(recv_find_max_checkpoint_0(
1288 				       max_group, max_field));
1289 		case LOG_HEADER_FORMAT_CURRENT:
1290 			break;
1291 		default:
1292 			/* Ensure that the string is NUL-terminated. */
1293 			buf[LOG_HEADER_CREATOR_END] = 0;
1294 			ib::error() << "Unsupported redo log format."
1295 				" The redo log was created"
1296 				" with " << buf + LOG_HEADER_CREATOR <<
1297 				". Please follow the instructions at "
1298 				REFMAN "upgrading-downgrading.html";
1299 			/* Do not issue a message about a possibility
1300 			to cleanly shut down the newer server version
1301 			and to remove the redo logs, because the
1302 			format of the system data structures may
1303 			radically change after MySQL 5.7. */
1304 			return(DB_ERROR);
1305 		}
1306 
1307 		for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1308 		     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1309 
1310 			log_group_header_read(group, field);
1311 
1312 			if (!recv_check_log_header_checksum(buf)) {
1313 				DBUG_PRINT("ib_log",
1314 					   ("invalid checkpoint,"
1315 					    " group " ULINTPF " at " ULINTPF
1316 					    ", checksum %x",
1317 					    group->id, field,
1318 					    (unsigned) log_block_get_checksum(
1319 						    buf)));
1320 				continue;
1321 			}
1322 
1323 			group->state = LOG_GROUP_OK;
1324 
1325 			group->lsn = mach_read_from_8(
1326 				buf + LOG_CHECKPOINT_LSN);
1327 			group->lsn_offset = mach_read_from_8(
1328 				buf + LOG_CHECKPOINT_OFFSET);
1329 			checkpoint_no = mach_read_from_8(
1330 				buf + LOG_CHECKPOINT_NO);
1331 
1332 			DBUG_PRINT("ib_log",
1333 				   ("checkpoint " UINT64PF " at " LSN_PF
1334 				    " found in group " ULINTPF,
1335 				    checkpoint_no, group->lsn, group->id));
1336 
1337 			if (checkpoint_no >= max_no) {
1338 				*max_group = group;
1339 				*max_field = field;
1340 				max_no = checkpoint_no;
1341 			}
1342 		}
1343 
1344 		group = UT_LIST_GET_NEXT(log_groups, group);
1345 	}
1346 
1347 	if (*max_group == NULL) {
1348 		/* Before 5.7.9, we could get here during database
1349 		initialization if we created an ib_logfile0 file that
1350 		was filled with zeroes, and were killed. After
1351 		5.7.9, we would reject such a file already earlier,
1352 		when checking the file header. */
1353 		ib::error() << "No valid checkpoint found"
1354 			" (corrupted redo log)."
1355 			" You can try --innodb-force-recovery=6"
1356 			" as a last resort.";
1357 		return(DB_ERROR);
1358 	}
1359 
1360 	return(DB_SUCCESS);
1361 }
1362 #else /* !UNIV_HOTBACKUP */
1363 /*******************************************************************//**
1364 Reads the checkpoint info needed in hot backup.
1365 @return TRUE if success */
1366 ibool
recv_read_checkpoint_info_for_backup(const byte * hdr,lsn_t * lsn,lsn_t * offset,lsn_t * cp_no,lsn_t * first_header_lsn)1367 recv_read_checkpoint_info_for_backup(
1368 /*=================================*/
1369 	const byte*	hdr,	/*!< in: buffer containing the log group
1370 				header */
1371 	lsn_t*		lsn,	/*!< out: checkpoint lsn */
1372 	lsn_t*		offset,	/*!< out: checkpoint offset in the log group */
1373 	lsn_t*		cp_no,	/*!< out: checkpoint number */
1374 	lsn_t*		first_header_lsn)
1375 				/*!< out: lsn of of the start of the
1376 				first log file */
1377 {
1378 	ulint		max_cp		= 0;
1379 	ib_uint64_t	max_cp_no	= 0;
1380 	const byte*	cp_buf;
1381 
1382 	cp_buf = hdr + LOG_CHECKPOINT_1;
1383 
1384 	if (recv_check_log_header_checksum(cp_buf)) {
1385 		max_cp_no = mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO);
1386 		max_cp = LOG_CHECKPOINT_1;
1387 	}
1388 
1389 	cp_buf = hdr + LOG_CHECKPOINT_2;
1390 
1391 	if (recv_check_log_header_checksum(cp_buf)) {
1392 		if (mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO) > max_cp_no) {
1393 			max_cp = LOG_CHECKPOINT_2;
1394 		}
1395 	}
1396 
1397 	if (max_cp == 0) {
1398 		return(FALSE);
1399 	}
1400 
1401 	cp_buf = hdr + max_cp;
1402 
1403 	*lsn = mach_read_from_8(cp_buf + LOG_CHECKPOINT_LSN);
1404 	*offset = mach_read_from_8(
1405 		cp_buf + LOG_CHECKPOINT_OFFSET);
1406 
1407 	*cp_no = mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO);
1408 
1409 	*first_header_lsn = mach_read_from_8(hdr + LOG_HEADER_START_LSN);
1410 
1411 	return(TRUE);
1412 }
1413 #endif /* !UNIV_HOTBACKUP */
1414 
1415 /** Check the 4-byte checksum to the trailer checksum field of a log
1416 block.
1417 @param[in]	log block
1418 @return whether the checksum matches */
1419 bool
log_block_checksum_is_ok(const byte * block)1420 log_block_checksum_is_ok(
1421 	const byte*	block)	/*!< in: pointer to a log block */
1422 {
1423 	return(!innodb_log_checksums
1424 	       || log_block_get_checksum(block)
1425 	       == log_block_calc_checksum(block));
1426 }
1427 
1428 #ifdef UNIV_HOTBACKUP
1429 /*******************************************************************//**
1430 Scans the log segment and n_bytes_scanned is set to the length of valid
1431 log scanned. */
1432 void
recv_scan_log_seg_for_backup(byte * buf,ulint buf_len,lsn_t * scanned_lsn,ulint * scanned_checkpoint_no,ulint * n_bytes_scanned)1433 recv_scan_log_seg_for_backup(
1434 /*=========================*/
1435 	byte*		buf,		/*!< in: buffer containing log data */
1436 	ulint		buf_len,	/*!< in: data length in that buffer */
1437 	lsn_t*		scanned_lsn,	/*!< in/out: lsn of buffer start,
1438 					we return scanned lsn */
1439 	ulint*		scanned_checkpoint_no,
1440 					/*!< in/out: 4 lowest bytes of the
1441 					highest scanned checkpoint number so
1442 					far */
1443 	ulint*		n_bytes_scanned)/*!< out: how much we were able to
1444 					scan, smaller than buf_len if log
1445 					data ended here */
1446 {
1447 	ulint	data_len;
1448 	byte*	log_block;
1449 	ulint	no;
1450 
1451 	*n_bytes_scanned = 0;
1452 
1453 	for (log_block = buf; log_block < buf + buf_len;
1454 	     log_block += OS_FILE_LOG_BLOCK_SIZE) {
1455 
1456 		no = log_block_get_hdr_no(log_block);
1457 
1458 #if 0
1459 		fprintf(stderr, "Log block header no %lu\n", no);
1460 #endif
1461 
1462 		if (no != log_block_convert_lsn_to_no(*scanned_lsn)
1463 		    || !log_block_checksum_is_ok(log_block)) {
1464 #if 0
1465 			fprintf(stderr,
1466 				"Log block n:o %lu, scanned lsn n:o %lu\n",
1467 				no, log_block_convert_lsn_to_no(*scanned_lsn));
1468 #endif
1469 			/* Garbage or an incompletely written log block */
1470 
1471 			log_block += OS_FILE_LOG_BLOCK_SIZE;
1472 #if 0
1473 			fprintf(stderr,
1474 				"Next log block n:o %lu\n",
1475 				log_block_get_hdr_no(log_block));
1476 #endif
1477 			break;
1478 		}
1479 
1480 		if (*scanned_checkpoint_no > 0
1481 		    && log_block_get_checkpoint_no(log_block)
1482 		    < *scanned_checkpoint_no
1483 		    && *scanned_checkpoint_no
1484 		    - log_block_get_checkpoint_no(log_block)
1485 		    > 0x80000000UL) {
1486 
1487 			/* Garbage from a log buffer flush which was made
1488 			before the most recent database recovery */
1489 #if 0
1490 			fprintf(stderr,
1491 				"Scanned cp n:o %lu, block cp n:o %lu\n",
1492 				*scanned_checkpoint_no,
1493 				log_block_get_checkpoint_no(log_block));
1494 #endif
1495 			break;
1496 		}
1497 
1498 		data_len = log_block_get_data_len(log_block);
1499 
1500 		*scanned_checkpoint_no
1501 			= log_block_get_checkpoint_no(log_block);
1502 		*scanned_lsn += data_len;
1503 
1504 		*n_bytes_scanned += data_len;
1505 
1506 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
1507 			/* Log data ends here */
1508 
1509 #if 0
1510 			fprintf(stderr, "Log block data len %lu\n",
1511 				data_len);
1512 #endif
1513 			break;
1514 		}
1515 	}
1516 }
1517 #endif /* UNIV_HOTBACKUP */
1518 
1519 /** Parse or process a write encryption info record.
1520 @param[in]	ptr		redo log record
1521 @param[in]	end		end of the redo log buffer
1522 @param[in]	space_id	the tablespace ID
1523 @return log record end, NULL if not a complete record */
1524 static
1525 byte*
fil_write_encryption_parse(byte * ptr,const byte * end,ulint space_id,ulint len)1526 fil_write_encryption_parse(
1527 	byte*		ptr,
1528 	const byte*	end,
1529 	ulint		space_id,
1530 	ulint           len)
1531 {
1532 	fil_space_t*	space;
1533 	byte*		key = NULL;
1534 	byte*		iv = NULL;
1535 	bool		is_new = false;
1536 
1537 	space = fil_space_get(space_id);
1538 	if (space == NULL) {
1539 		encryption_list_t::iterator	it;
1540 
1541 		if (recv_sys->encryption_list == NULL) {
1542 			recv_sys->encryption_list =
1543 				UT_NEW_NOKEY(encryption_list_t());
1544 		}
1545 
1546 		for (it = recv_sys->encryption_list->begin();
1547 		     it != recv_sys->encryption_list->end();
1548 		     it++) {
1549 			if (it->space_id == space_id) {
1550 				key = it->key;
1551 				iv = it->iv;
1552 			}
1553 		}
1554 
1555 		if (key == NULL) {
1556 			key = static_cast<byte*>(ut_malloc_nokey(
1557 					ENCRYPTION_KEY_LEN));
1558 			iv = static_cast<byte*>(ut_malloc_nokey(
1559 					ENCRYPTION_KEY_LEN));
1560 			is_new = true;
1561 		}
1562 	} else {
1563 		key = space->encryption_key;
1564 		iv = space->encryption_iv;
1565 	}
1566 
1567 	if  ((len != ENCRYPTION_INFO_SIZE_V1
1568 		&& len != ENCRYPTION_INFO_SIZE_V2)) {
1569 		recv_sys->set_corrupt_log();
1570 		return(NULL);
1571 	}
1572 
1573 #ifdef	UNIV_ENCRYPT_DEBUG
1574 	if (space) {
1575 		fprintf(stderr, "Got %lu from redo log:", space->id);
1576 	}
1577 #endif
1578 	if (!fsp_header_decode_encryption_info(key,
1579 					       iv,
1580 					       ptr)) {
1581 		recv_sys->set_corrupt_log();
1582 		ib::warn() << "Encryption information"
1583 			<< " in the redo log of space "
1584 			<< space_id << " is invalid";
1585 	}
1586 
1587 	ut_ad(len == ENCRYPTION_INFO_SIZE_V1
1588 	      || len == ENCRYPTION_INFO_SIZE_V2);
1589 
1590 	ptr += len;
1591 
1592 	if (space == NULL) {
1593 		if (is_new) {
1594 			recv_encryption_t info;
1595 
1596 			/* Add key and iv to list */
1597 			info.space_id = space_id;
1598 			info.key = key;
1599 			info.iv = iv;
1600 
1601 			recv_sys->encryption_list->push_back(info);
1602 		}
1603 	} else {
1604 		ut_ad(FSP_FLAGS_GET_ENCRYPTION(space->flags));
1605 
1606 		space->encryption_type = Encryption::AES;
1607 		space->encryption_klen = ENCRYPTION_KEY_LEN;
1608 	}
1609 
1610 	return(ptr);
1611 }
1612 
1613 /** Try to parse a single log record body and also applies it if
1614 specified.
1615 @param[in]	type		redo log entry type
1616 @param[in]	ptr		redo log record body
1617 @param[in]	end_ptr		end of buffer
1618 @param[in]	space_id	tablespace identifier
1619 @param[in]	page_no		page number
1620 @param[in]	apply		whether to apply the record
1621 @param[in,out]	block		buffer block, or NULL if
1622 a page log record should not be applied
1623 or if it is a MLOG_FILE_ operation
1624 @param[in,out]	mtr		mini-transaction, or NULL if
1625 a page log record should not be applied
1626 @return log record end, NULL if not a complete record */
1627 static
1628 byte*
recv_parse_or_apply_log_rec_body(mlog_id_t type,byte * ptr,byte * end_ptr,ulint space_id,ulint page_no,bool apply,buf_block_t * block,mtr_t * mtr)1629 recv_parse_or_apply_log_rec_body(
1630 	mlog_id_t	type,
1631 	byte*		ptr,
1632 	byte*		end_ptr,
1633 	ulint		space_id,
1634 	ulint		page_no,
1635 	bool		apply,
1636 	buf_block_t*	block,
1637 	mtr_t*		mtr)
1638 {
1639 	ut_ad(!block == !mtr);
1640 
1641 	switch (type) {
1642 	case MLOG_FILE_NAME:
1643 	case MLOG_FILE_DELETE:
1644 	case MLOG_FILE_CREATE2:
1645 	case MLOG_FILE_RENAME2:
1646 		ut_ad(block == NULL);
1647 		/* Collect the file names when parsing the log,
1648 		before applying any log records. */
1649 		return(fil_name_parse(ptr, end_ptr, space_id, page_no, type,
1650 				      apply));
1651 	case MLOG_INDEX_LOAD:
1652 #ifdef UNIV_HOTBACKUP
1653 		/* While scaning redo logs during  backup phase a
1654 		MLOG_INDEX_LOAD type redo log record indicates a DDL
1655 		(create index, alter table...)is performed with
1656 		'algorithm=inplace'. This redo log indicates that
1657 
1658 		1. The DDL was started after MEB started backing up, in which
1659 		case MEB will not be able to take a consistent backup and should
1660 		fail. or
1661 		2. There is a possibility of this record existing in the REDO
1662 		even after the completion of the index create operation. This is
1663 		because of InnoDB does  not checkpointing after the flushing the
1664 		index pages.
1665 
1666 		If MEB gets the last_redo_flush_lsn and that is less than the
1667 		lsn of the current record MEB fails the backup process.
1668 		Error out in case of online backup and emit a warning in case
1669 		of offline backup and continue.
1670 		*/
1671 		if (!recv_recovery_on) {
1672 			if (is_online_redo_copy) {
1673 				if (backup_redo_log_flushed_lsn
1674 				    < recv_sys->recovered_lsn) {
1675 					ib::trace() << "Last flushed lsn: "
1676 						<< backup_redo_log_flushed_lsn
1677 						<< " load_index lsn "
1678 						<< recv_sys->recovered_lsn;
1679 
1680 					if (backup_redo_log_flushed_lsn == 0)
1681 						ib::error() << "MEB was not "
1682 							"able to determine the"
1683 							"InnoDB Engine Status";
1684 
1685 					ib::fatal() << "An optimized(without"
1686 						" redo logging) DDLoperation"
1687 						" has been performed. All"
1688 						" modified pages may not have"
1689 						" been flushed to the disk yet."
1690 						" \n    MEB will not be able"
1691 						" take a consistent backup."
1692 						" Retry the backup operation";
1693 				}
1694 				/** else the index is flushed to disk before
1695 				backup started hence no error */
1696 			} else {
1697 				/* offline backup */
1698 				ib::trace() << "Last flushed lsn: "
1699 					<< backup_redo_log_flushed_lsn
1700 					<< " load_index lsn "
1701 					<< recv_sys->recovered_lsn;
1702 
1703 				ib::warn() << "An optimized(without redo"
1704 					" logging) DDL operation has been"
1705 					" performed. All modified pages may not"
1706 					" have been flushed to the disk yet."
1707 					" \n    This offline backup may not"
1708 					" be consistent";
1709 			}
1710 		}
1711 #endif /* UNIV_HOTBACKUP */
1712 		if (end_ptr < ptr + 8) {
1713 			return(NULL);
1714 		}
1715 		return(ptr + 8);
1716 	case MLOG_TRUNCATE:
1717 		return(truncate_t::parse_redo_entry(ptr, end_ptr, space_id));
1718         case MLOG_WRITE_STRING:
1719 		/* For encrypted tablespace, we need to get the
1720 		encryption key information before the page 0 is recovered.
1721 	        Otherwise, redo will not find the key to decrypt
1722 		the data pages. */
1723                 if (page_no == 0 && !apply) {
1724 			byte* ptr_copy = ptr;
1725 			ulint offset = mach_read_from_2(ptr_copy);
1726 			ptr_copy += 2;
1727 			ulint len = mach_read_from_2(ptr_copy);
1728 			ptr_copy += 2;
1729 			if (end_ptr < ptr_copy + len)
1730 				return NULL;
1731 
1732 			if (memcmp(ptr_copy, ENCRYPTION_KEY_MAGIC_V1,
1733 				ENCRYPTION_MAGIC_SIZE) == 0 ||
1734 			    memcmp(ptr_copy, ENCRYPTION_KEY_MAGIC_V2,
1735 				ENCRYPTION_MAGIC_SIZE) == 0 ||
1736 			    memcmp(ptr_copy, ENCRYPTION_KEY_MAGIC_V3,
1737 				ENCRYPTION_MAGIC_SIZE) == 0) {
1738 
1739 				if (offset >= UNIV_PAGE_SIZE
1740 				    || len + offset > UNIV_PAGE_SIZE) {
1741 					recv_sys->set_corrupt_log();
1742 					return NULL;
1743 				}
1744 
1745 				return(fil_write_encryption_parse(ptr_copy,
1746 								  end_ptr,
1747 								  space_id,
1748 								  len));
1749 			} else if (memcmp(ptr_copy, ENCRYPTION_KEY_MAGIC_PS_V1,
1750 				   ENCRYPTION_MAGIC_SIZE) == 0) {
1751 				return(fil_parse_write_crypt_data(ptr_copy,
1752 								  end_ptr,
1753 								  block,
1754 								  len));
1755 			}
1756 		}
1757 		break;
1758 
1759 	default:
1760 		break;
1761 	}
1762 
1763 	dict_index_t*	index	= NULL;
1764 	page_t*		page;
1765 	page_zip_des_t*	page_zip;
1766 #ifdef UNIV_DEBUG
1767 	ulint		page_type;
1768 #endif /* UNIV_DEBUG */
1769 
1770 	if (block) {
1771 		/* Applying a page log record. */
1772 		page = block->frame;
1773 		page_zip = buf_block_get_page_zip(block);
1774 		ut_d(page_type = fil_page_get_type(page));
1775 	} else {
1776 		/* Parsing a page log record. */
1777 		page = NULL;
1778 		page_zip = NULL;
1779 		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
1780 	}
1781 
1782 	const byte*	old_ptr = ptr;
1783 
1784 	switch (type) {
1785 #ifdef UNIV_LOG_LSN_DEBUG
1786 	case MLOG_LSN:
1787 		/* The LSN is checked in recv_parse_log_rec(). */
1788 		break;
1789 #endif /* UNIV_LOG_LSN_DEBUG */
1790 	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
1791 #ifdef UNIV_DEBUG
1792 		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
1793 		    && end_ptr >= ptr + 2) {
1794 			/* It is OK to set FIL_PAGE_TYPE and certain
1795 			list node fields on an empty page.  Any other
1796 			write is not OK. */
1797 
1798 			/* NOTE: There may be bogus assertion failures for
1799 			dict_hdr_create(), trx_rseg_header_create(),
1800 			trx_sys_create_doublewrite_buf(), and
1801 			trx_sysf_create().
1802 			These are only called during database creation. */
1803 			ulint	offs = mach_read_from_2(ptr);
1804 
1805 			switch (type) {
1806 			default:
1807 				ut_error;
1808 			case MLOG_2BYTES:
1809 				/* Note that this can fail when the
1810 				redo log been written with something
1811 				older than InnoDB Plugin 1.0.4. */
1812 				ut_ad(offs == FIL_PAGE_TYPE
1813 				      || offs == IBUF_TREE_SEG_HEADER
1814 				      + IBUF_HEADER + FSEG_HDR_OFFSET
1815 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1816 				      + PAGE_HEADER + FIL_ADDR_BYTE
1817 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1818 				      + PAGE_HEADER + FIL_ADDR_BYTE
1819 				      + FIL_ADDR_SIZE
1820 				      || offs == PAGE_BTR_SEG_LEAF
1821 				      + PAGE_HEADER + FSEG_HDR_OFFSET
1822 				      || offs == PAGE_BTR_SEG_TOP
1823 				      + PAGE_HEADER + FSEG_HDR_OFFSET
1824 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1825 				      + PAGE_HEADER + FIL_ADDR_BYTE
1826 				      + 0 /*FLST_PREV*/
1827 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1828 				      + PAGE_HEADER + FIL_ADDR_BYTE
1829 				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
1830 				break;
1831 			case MLOG_4BYTES:
1832 				/* Note that this can fail when the
1833 				redo log been written with something
1834 				older than InnoDB Plugin 1.0.4. */
1835 				ut_ad(0
1836 				      /* fil_crypt_rotate_page() writes this */
1837 				      || offs == FIL_PAGE_SPACE_ID
1838 				      || offs == IBUF_TREE_SEG_HEADER
1839 				      + IBUF_HEADER + FSEG_HDR_SPACE
1840 				      || offs == IBUF_TREE_SEG_HEADER
1841 				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
1842 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1843 				      + PAGE_HEADER/* flst_init */
1844 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1845 				      + PAGE_HEADER + FIL_ADDR_PAGE
1846 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1847 				      + PAGE_HEADER + FIL_ADDR_PAGE
1848 				      + FIL_ADDR_SIZE
1849 				      || offs == PAGE_BTR_SEG_LEAF
1850 				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
1851 				      || offs == PAGE_BTR_SEG_LEAF
1852 				      + PAGE_HEADER + FSEG_HDR_SPACE
1853 				      || offs == PAGE_BTR_SEG_TOP
1854 				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
1855 				      || offs == PAGE_BTR_SEG_TOP
1856 				      + PAGE_HEADER + FSEG_HDR_SPACE
1857 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1858 				      + PAGE_HEADER + FIL_ADDR_PAGE
1859 				      + 0 /*FLST_PREV*/
1860 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1861 				      + PAGE_HEADER + FIL_ADDR_PAGE
1862 				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
1863 				break;
1864 			}
1865 		}
1866 #endif /* UNIV_DEBUG */
1867 		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
1868 		if (ptr != NULL && page != NULL
1869 		    && page_no == 0 && type == MLOG_4BYTES) {
1870 			ulint	offs = mach_read_from_2(old_ptr);
1871 			switch (offs) {
1872 				fil_space_t*	space;
1873 				ulint		val;
1874 			default:
1875 				break;
1876 			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1877 			case FSP_HEADER_OFFSET + FSP_SIZE:
1878 			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1879 			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1880 				space = fil_space_get(space_id);
1881 				ut_a(space != NULL);
1882 				val = mach_read_from_4(page + offs);
1883 
1884 				switch (offs) {
1885 				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1886 					space->flags = val;
1887 					break;
1888 				case FSP_HEADER_OFFSET + FSP_SIZE:
1889 					space->size_in_header = val;
1890 					break;
1891 				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1892 					space->free_limit = val;
1893 					break;
1894 				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1895 					space->free_len = val;
1896 					ut_ad(val == flst_get_len(
1897 						      page + offs));
1898 					break;
1899 				}
1900 			}
1901 		}
1902 		break;
1903 	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
1904 		ut_ad(!page || fil_page_type_is_index(page_type));
1905 
1906 		if (NULL != (ptr = mlog_parse_index(
1907 				     ptr, end_ptr,
1908 				     type == MLOG_COMP_REC_INSERT,
1909 				     &index))) {
1910 			ut_a(!page
1911 			     || (ibool)!!page_is_comp(page)
1912 			     == dict_table_is_comp(index->table));
1913 			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
1914 							block, index, mtr);
1915 		}
1916 		break;
1917 	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
1918 		ut_ad(!page || fil_page_type_is_index(page_type));
1919 
1920 		if (NULL != (ptr = mlog_parse_index(
1921 				     ptr, end_ptr,
1922 				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
1923 				     &index))) {
1924 			ut_a(!page
1925 			     || (ibool)!!page_is_comp(page)
1926 			     == dict_table_is_comp(index->table));
1927 			ptr = btr_cur_parse_del_mark_set_clust_rec(
1928 				ptr, end_ptr, page, page_zip, index);
1929 		}
1930 		break;
1931 	case MLOG_COMP_REC_SEC_DELETE_MARK:
1932 		ut_ad(!page || fil_page_type_is_index(page_type));
1933 		/* This log record type is obsolete, but we process it for
1934 		backward compatibility with MySQL 5.0.3 and 5.0.4. */
1935 		ut_a(!page || page_is_comp(page));
1936 		ut_a(!page_zip);
1937 		ptr = mlog_parse_index(ptr, end_ptr, TRUE, &index);
1938 		if (!ptr) {
1939 			break;
1940 		}
1941 		/* Fall through */
1942 	case MLOG_REC_SEC_DELETE_MARK:
1943 		ut_ad(!page || fil_page_type_is_index(page_type));
1944 		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
1945 							 page, page_zip);
1946 		break;
1947 	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
1948 		ut_ad(!page || fil_page_type_is_index(page_type));
1949 
1950 		if (NULL != (ptr = mlog_parse_index(
1951 				     ptr, end_ptr,
1952 				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
1953 				     &index))) {
1954 			ut_a(!page
1955 			     || (ibool)!!page_is_comp(page)
1956 			     == dict_table_is_comp(index->table));
1957 			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
1958 							    page_zip, index);
1959 		}
1960 		break;
1961 	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
1962 	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
1963 		ut_ad(!page || fil_page_type_is_index(page_type));
1964 
1965 		if (NULL != (ptr = mlog_parse_index(
1966 				     ptr, end_ptr,
1967 				     type == MLOG_COMP_LIST_END_DELETE
1968 				     || type == MLOG_COMP_LIST_START_DELETE,
1969 				     &index))) {
1970 			ut_a(!page
1971 			     || (ibool)!!page_is_comp(page)
1972 			     == dict_table_is_comp(index->table));
1973 			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
1974 							 block, index, mtr);
1975 		}
1976 		break;
1977 	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
1978 		ut_ad(!page || fil_page_type_is_index(page_type));
1979 
1980 		if (NULL != (ptr = mlog_parse_index(
1981 				     ptr, end_ptr,
1982 				     type == MLOG_COMP_LIST_END_COPY_CREATED,
1983 				     &index))) {
1984 			ut_a(!page
1985 			     || (ibool)!!page_is_comp(page)
1986 			     == dict_table_is_comp(index->table));
1987 			ptr = page_parse_copy_rec_list_to_created_page(
1988 				ptr, end_ptr, block, index, mtr);
1989 		}
1990 		break;
1991 	case MLOG_PAGE_REORGANIZE:
1992 	case MLOG_COMP_PAGE_REORGANIZE:
1993 	case MLOG_ZIP_PAGE_REORGANIZE:
1994 		ut_ad(!page || fil_page_type_is_index(page_type));
1995 
1996 		if (NULL != (ptr = mlog_parse_index(
1997 				     ptr, end_ptr,
1998 				     type != MLOG_PAGE_REORGANIZE,
1999 				     &index))) {
2000 			ut_a(!page
2001 			     || (ibool)!!page_is_comp(page)
2002 			     == dict_table_is_comp(index->table));
2003 			ptr = btr_parse_page_reorganize(
2004 				ptr, end_ptr, index,
2005 				type == MLOG_ZIP_PAGE_REORGANIZE,
2006 				block, mtr);
2007 		}
2008 		break;
2009 	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
2010 		/* Allow anything in page_type when creating a page. */
2011 		ut_a(!page_zip);
2012 		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
2013 		break;
2014 	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
2015 		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
2016 				  true);
2017 		break;
2018 	case MLOG_UNDO_INSERT:
2019 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2020 		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
2021 		break;
2022 	case MLOG_UNDO_ERASE_END:
2023 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2024 		ptr = trx_undo_parse_erase_page_end(ptr, end_ptr, page, mtr);
2025 		break;
2026 	case MLOG_UNDO_INIT:
2027 		/* Allow anything in page_type when creating a page. */
2028 		ptr = trx_undo_parse_page_init(ptr, end_ptr, page, mtr);
2029 		break;
2030 	case MLOG_UNDO_HDR_DISCARD:
2031 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2032 		ptr = trx_undo_parse_discard_latest(ptr, end_ptr, page, mtr);
2033 		break;
2034 	case MLOG_UNDO_HDR_CREATE:
2035 	case MLOG_UNDO_HDR_REUSE:
2036 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2037 		ptr = trx_undo_parse_page_header(type, ptr, end_ptr,
2038 						 page, mtr);
2039 		break;
2040 	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
2041 		ut_ad(!page || fil_page_type_is_index(page_type));
2042 		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
2043 		will be followed by MLOG_COMP_REC_DELETE
2044 		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
2045 		in the same mini-transaction. */
2046 		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
2047 		ptr = btr_parse_set_min_rec_mark(
2048 			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
2049 			page, mtr);
2050 		break;
2051 	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
2052 		ut_ad(!page || fil_page_type_is_index(page_type));
2053 
2054 		if (NULL != (ptr = mlog_parse_index(
2055 				     ptr, end_ptr,
2056 				     type == MLOG_COMP_REC_DELETE,
2057 				     &index))) {
2058 			ut_a(!page
2059 			     || (ibool)!!page_is_comp(page)
2060 			     == dict_table_is_comp(index->table));
2061 			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
2062 							block, index, mtr);
2063 		}
2064 		break;
2065 	case MLOG_IBUF_BITMAP_INIT:
2066 		/* Allow anything in page_type when creating a page. */
2067 		ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
2068 		break;
2069 	case MLOG_INIT_FILE_PAGE:
2070 	case MLOG_INIT_FILE_PAGE2:
2071 		/* Allow anything in page_type when creating a page. */
2072 		ptr = fsp_parse_init_file_page(ptr, end_ptr, block);
2073 		break;
2074 	case MLOG_WRITE_STRING:
2075 		ut_ad(!page || page_type != FIL_PAGE_TYPE_ALLOCATED
2076 		      || page_no == 0);
2077 		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
2078 		break;
2079 	case MLOG_ZIP_WRITE_NODE_PTR:
2080 		ut_ad(!page || fil_page_type_is_index(page_type));
2081 		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
2082 						    page, page_zip);
2083 		break;
2084 	case MLOG_ZIP_WRITE_BLOB_PTR:
2085 		ut_ad(!page || fil_page_type_is_index(page_type));
2086 		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
2087 						    page, page_zip);
2088 		break;
2089 	case MLOG_ZIP_WRITE_HEADER:
2090 		ut_ad(!page || fil_page_type_is_index(page_type));
2091 		ptr = page_zip_parse_write_header(ptr, end_ptr,
2092 						  page, page_zip);
2093 		break;
2094 	case MLOG_ZIP_PAGE_COMPRESS:
2095 		/* Allow anything in page_type when creating a page. */
2096 		ptr = page_zip_parse_compress(ptr, end_ptr,
2097 					      page, page_zip);
2098 		break;
2099 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
2100 		if (NULL != (ptr = mlog_parse_index(
2101 				ptr, end_ptr, TRUE, &index))) {
2102 
2103 			ut_a(!page || ((ibool)!!page_is_comp(page)
2104 				== dict_table_is_comp(index->table)));
2105 			ptr = page_zip_parse_compress_no_data(
2106 				ptr, end_ptr, page, page_zip, index);
2107 		}
2108 		break;
2109 	default:
2110 		ptr = NULL;
2111 		recv_sys->set_corrupt_log();
2112 	}
2113 
2114 	if (index) {
2115 		dict_table_t*	table = index->table;
2116 
2117 		dict_mem_index_free(index);
2118 		dict_mem_table_free(table);
2119 	}
2120 
2121 	return(ptr);
2122 }
2123 
2124 /*********************************************************************//**
2125 Calculates the fold value of a page file address: used in inserting or
2126 searching for a log record in the hash table.
2127 @return folded value */
2128 UNIV_INLINE
2129 ulint
recv_fold(ulint space,ulint page_no)2130 recv_fold(
2131 /*======*/
2132 	ulint	space,	/*!< in: space */
2133 	ulint	page_no)/*!< in: page number */
2134 {
2135 	return(ut_fold_ulint_pair(space, page_no));
2136 }
2137 
2138 /*********************************************************************//**
2139 Calculates the hash value of a page file address: used in inserting or
2140 searching for a log record in the hash table.
2141 @return folded value */
2142 UNIV_INLINE
2143 ulint
recv_hash(ulint space,ulint page_no)2144 recv_hash(
2145 /*======*/
2146 	ulint	space,	/*!< in: space */
2147 	ulint	page_no)/*!< in: page number */
2148 {
2149 	return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
2150 }
2151 
2152 /*********************************************************************//**
2153 Gets the hashed file address struct for a page.
2154 @return file address struct, NULL if not found from the hash table */
2155 
2156 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)2157 recv_get_fil_addr_struct(
2158 /*=====================*/
2159 	ulint	space,	/*!< in: space id */
2160 	ulint	page_no)/*!< in: page number */
2161 {
2162 	recv_addr_t*	recv_addr;
2163 
2164 	for (recv_addr = static_cast<recv_addr_t*>(
2165 			HASH_GET_FIRST(recv_sys->addr_hash,
2166 				       recv_hash(space, page_no)));
2167 	     recv_addr != 0;
2168 	     recv_addr = static_cast<recv_addr_t*>(
2169 		     HASH_GET_NEXT(addr_hash, recv_addr))) {
2170 
2171 		if (recv_addr->space == space
2172 		    && recv_addr->page_no == page_no) {
2173 
2174 			return(recv_addr);
2175 		}
2176 	}
2177 
2178 	return(NULL);
2179 }
2180 
2181 /*******************************************************************//**
2182 Adds a new log record to the hash table of log records. */
2183 static
2184 void
recv_add_to_hash_table(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t start_lsn,lsn_t end_lsn)2185 recv_add_to_hash_table(
2186 /*===================*/
2187 	mlog_id_t	type,		/*!< in: log record type */
2188 	ulint		space,		/*!< in: space id */
2189 	ulint		page_no,	/*!< in: page number */
2190 	byte*		body,		/*!< in: log record body */
2191 	byte*		rec_end,	/*!< in: log record end */
2192 	lsn_t		start_lsn,	/*!< in: start lsn of the mtr */
2193 	lsn_t		end_lsn)	/*!< in: end lsn of the mtr */
2194 {
2195 	recv_t*		recv;
2196 	ulint		len;
2197 	recv_data_t*	recv_data;
2198 	recv_data_t**	prev_field;
2199 	recv_addr_t*	recv_addr;
2200 
2201 	ut_ad(type != MLOG_FILE_DELETE);
2202 	ut_ad(type != MLOG_FILE_CREATE2);
2203 	ut_ad(type != MLOG_FILE_RENAME2);
2204 	ut_ad(type != MLOG_FILE_NAME);
2205 	ut_ad(type != MLOG_DUMMY_RECORD);
2206 	ut_ad(type != MLOG_CHECKPOINT);
2207 	ut_ad(type != MLOG_INDEX_LOAD);
2208 	ut_ad(type != MLOG_TRUNCATE);
2209 
2210 	len = rec_end - body;
2211 
2212 	recv = static_cast<recv_t*>(
2213 		mem_heap_alloc(recv_sys->heap, sizeof(recv_t)));
2214 
2215 	recv->type = type;
2216 	recv->len = rec_end - body;
2217 	recv->start_lsn = start_lsn;
2218 	recv->end_lsn = end_lsn;
2219 
2220 	recv_addr = recv_get_fil_addr_struct(space, page_no);
2221 
2222 	if (recv_addr == NULL) {
2223 		recv_addr = static_cast<recv_addr_t*>(
2224 			mem_heap_alloc(recv_sys->heap, sizeof(recv_addr_t)));
2225 
2226 		recv_addr->space = space;
2227 		recv_addr->page_no = page_no;
2228 		recv_addr->state = RECV_NOT_PROCESSED;
2229 
2230 		UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
2231 
2232 		HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
2233 			    recv_fold(space, page_no), recv_addr);
2234 		recv_sys->n_addrs++;
2235 #if 0
2236 		fprintf(stderr, "Inserting log rec for space %lu, page %lu\n",
2237 			space, page_no);
2238 #endif
2239 	}
2240 
2241 	UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
2242 
2243 	prev_field = &(recv->data);
2244 
2245 	/* Store the log record body in chunks of less than UNIV_PAGE_SIZE:
2246 	recv_sys->heap grows into the buffer pool, and bigger chunks could not
2247 	be allocated */
2248 
2249 	while (rec_end > body) {
2250 
2251 		len = rec_end - body;
2252 
2253 		if (len > RECV_DATA_BLOCK_SIZE) {
2254 			len = RECV_DATA_BLOCK_SIZE;
2255 		}
2256 
2257 		recv_data = static_cast<recv_data_t*>(
2258 			mem_heap_alloc(recv_sys->heap,
2259 				       sizeof(recv_data_t) + len));
2260 
2261 		*prev_field = recv_data;
2262 
2263 		memcpy(recv_data + 1, body, len);
2264 
2265 		prev_field = &(recv_data->next);
2266 
2267 		body += len;
2268 	}
2269 
2270 	*prev_field = NULL;
2271 }
2272 
2273 /*********************************************************************//**
2274 Copies the log record body from recv to buf. */
2275 static
2276 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)2277 recv_data_copy_to_buf(
2278 /*==================*/
2279 	byte*	buf,	/*!< in: buffer of length at least recv->len */
2280 	recv_t*	recv)	/*!< in: log record */
2281 {
2282 	recv_data_t*	recv_data;
2283 	ulint		part_len;
2284 	ulint		len;
2285 
2286 	len = recv->len;
2287 	recv_data = recv->data;
2288 
2289 	while (len > 0) {
2290 		if (len > RECV_DATA_BLOCK_SIZE) {
2291 			part_len = RECV_DATA_BLOCK_SIZE;
2292 		} else {
2293 			part_len = len;
2294 		}
2295 
2296 		ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
2297 			  part_len);
2298 		buf += part_len;
2299 		len -= part_len;
2300 
2301 		recv_data = recv_data->next;
2302 	}
2303 }
2304 
/************************************************************************//**
Applies the hashed log records to the page, if the page lsn is less than the
lsn of a log record. This can be called when a buffer page has just been
read in, or also for a page already in the buffer pool.

The function returns without touching the page when
recv_sys->apply_log_recs is FALSE, when no recv_addr_t is hashed for the
page, or when the hashed entry is RECV_BEING_PROCESSED, RECV_PROCESSED, or
RECV_BEING_READ while just_read_in is not set. Otherwise the entry is marked
RECV_BEING_PROCESSED, every eligible record in its rec_list is applied via
recv_parse_or_apply_log_rec_body(), the LSN fields of the page are stamped
after each applied record, and finally the entry is marked RECV_PROCESSED
and recv_sys->n_addrs is decremented. */
void
recv_recover_page_func(
/*===================*/
#ifndef UNIV_HOTBACKUP
	ibool		just_read_in,
				/*!< in: TRUE if the i/o handler calls
				this for a freshly read page */
#endif /* !UNIV_HOTBACKUP */
	buf_block_t*	block)	/*!< in/out: buffer block */
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	recv_addr_t*	recv_addr;
	recv_t*		recv;
	byte*		buf;
	lsn_t		start_lsn;
	lsn_t		end_lsn;
	lsn_t		page_lsn;
	lsn_t		page_newest_lsn;
	ibool		modification_to_page;
	mtr_t		mtr;

	mutex_enter(&(recv_sys->mutex));

	if (recv_sys->apply_log_recs == FALSE) {

		/* Log records should not be applied now */

		mutex_exit(&(recv_sys->mutex));

		return;
	}

	recv_addr = recv_get_fil_addr_struct(block->page.id.space(),
					     block->page.id.page_no());

	/* Nothing to do if the page has no hashed records, or if another
	caller already owns or has finished the work for this entry.
	NOTE(review): just_read_in is referenced below although the
	parameter is only declared when UNIV_HOTBACKUP is not defined --
	confirm this condition is never compiled in a UNIV_HOTBACKUP build. */
	if ((recv_addr == NULL)
		/* bugfix: http://bugs.mysql.com/bug.php?id=44140 */
	    || (recv_addr->state == RECV_BEING_READ && !just_read_in)
	    || (recv_addr->state == RECV_BEING_PROCESSED)
	    || (recv_addr->state == RECV_PROCESSED)) {
		ut_ad(recv_addr == NULL || recv_needed_recovery);

		mutex_exit(&(recv_sys->mutex));

		return;
	}

#ifndef UNIV_HOTBACKUP
	ut_ad(recv_needed_recovery);

	DBUG_PRINT("ib_log",
		   ("Applying log to page %u:%u",
		    recv_addr->space, recv_addr->page_no));
#endif /* !UNIV_HOTBACKUP */

	/* Claim the entry while still holding recv_sys->mutex; the state
	check above makes other callers return early from now on. */
	recv_addr->state = RECV_BEING_PROCESSED;

	mutex_exit(&(recv_sys->mutex));

	/* The mini-transaction used for applying runs in MTR_LOG_NONE
	mode. */
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, MTR_LOG_NONE);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

#ifndef UNIV_HOTBACKUP
	if (just_read_in) {
		/* Move the ownership of the x-latch on the page to
		this OS thread, so that we can acquire a second
		x-latch on it.  This is needed for the operations to
		the page to pass the debug checks. */

		rw_lock_x_lock_move_ownership(&block->lock);
	}

	ibool	success = buf_page_get_known_nowait(
		RW_X_LATCH, block, BUF_KEEP_OLD,
		__FILE__, __LINE__, &mtr);
	ut_a(success);

	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
#endif /* !UNIV_HOTBACKUP */

	/* Read the newest modification lsn from the page */
	page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);

#ifndef UNIV_HOTBACKUP
	/* It may be that the page has been modified in the buffer
	pool: read the newest modification lsn there */

	page_newest_lsn = buf_page_get_newest_modification(&block->page);

	if (page_newest_lsn) {

		page_lsn = page_newest_lsn;
	}
#else /* !UNIV_HOTBACKUP */
	/* In recovery from a backup we do not really use the buffer pool */
	page_newest_lsn = 0;
#endif /* !UNIV_HOTBACKUP */

	modification_to_page = FALSE;
	start_lsn = end_lsn = 0;

	recv = UT_LIST_GET_FIRST(recv_addr->rec_list);

	/* Walk the records hashed for this page in list order. */
	while (recv) {
		/* Function-level end_lsn: after the loop it holds the
		end_lsn of the last record visited, whether or not that
		record was applied. */
		end_lsn = recv->end_lsn;

		ut_ad(end_lsn
		      <= UT_LIST_GET_FIRST(log_sys->log_groups)->scanned_lsn);

		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			/* We have to copy the record body to a separate
			buffer */

			buf = static_cast<byte*>(ut_malloc_nokey(recv->len));

			recv_data_copy_to_buf(buf, recv);
		} else {
			/* The body fits in the first chunk: use the bytes
			stored right after the chunk header in place. */
			buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
		}

		if (recv->type == MLOG_INIT_FILE_PAGE) {
			/* Restart the LSN comparison from the buffer pool
			value and zero the LSN fields of the page (and of
			the compressed copy, if any). */
			page_lsn = page_newest_lsn;

			memset(FIL_PAGE_LSN + page, 0, 8);
			memset(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM
			       + page, 0, 8);

			if (page_zip) {
				memset(FIL_PAGE_LSN + page_zip->data, 0, 8);
			}
		}

		/* If per-table tablespace was truncated and there exist REDO
		records before truncate that are to be applied as part of
		recovery (checkpoint didn't happen since truncate was done)
		skip such records using lsn check as they may not stand valid
		post truncate.
		LSN at start of truncate is recorded and any redo record
		with LSN less than recorded LSN is skipped.
		Note: We can't skip complete recv_addr as same page may have
		valid REDO records post truncate those needs to be applied. */
		bool	skip_recv = false;
		if (srv_was_tablespace_truncated(fil_space_get(recv_addr->space))) {
			lsn_t	init_lsn =
				truncate_t::get_truncated_tablespace_init_lsn(
				recv_addr->space);
			skip_recv = (recv->start_lsn < init_lsn);
		}

		/* Ignore applying the redo logs for tablespace that is
		truncated. Post recovery there is fixup action that will
		restore the tablespace back to normal state.
		Applying redo at this stage can result in error given that
		redo will have action recorded on page before tablespace
		was re-inited and that would lead to an error while applying
		such action. */
		if (recv->start_lsn >= page_lsn
		    && !srv_is_tablespace_truncated(recv_addr->space)
		    && !skip_recv) {

			/* NOTE: this local shadows the function-level
			end_lsn. It is the value stamped into the page
			(start_lsn + len of this record); the outer
			variable keeps recv->end_lsn. */
			lsn_t	end_lsn;

			if (!modification_to_page) {

				/* Remember the start LSN of the first
				record that is actually applied. */
				modification_to_page = TRUE;
				start_lsn = recv->start_lsn;
			}

			DBUG_PRINT("ib_log",
				   ("apply " LSN_PF ":"
				    " %s len " ULINTPF " page %u:%u",
				    recv->start_lsn,
				    get_mlog_string(recv->type), recv->len,
				    recv_addr->space,
				    recv_addr->page_no));

			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				recv_addr->space, recv_addr->page_no,
				true, block, &mtr);

			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(UNIV_PAGE_SIZE
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN
						+ page_zip->data, end_lsn);
			}
		}

		/* Release the temporary contiguous copy; the condition
		mirrors the one that guarded the allocation above. */
		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			ut_free(buf);
		}

		recv = UT_LIST_GET_NEXT(rec_list, recv);
	}

#ifdef UNIV_ZIP_DEBUG
	if (fil_page_index_page_check(page)) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);

		ut_a(!page_zip
		     || page_zip_validate_low(page_zip, page, NULL, FALSE));
	}
#endif /* UNIV_ZIP_DEBUG */

#ifndef UNIV_HOTBACKUP
	if (modification_to_page) {
		ut_a(block);

		/* Report the applied LSN range: start of the first applied
		record, end_lsn of the last record in the list. */
		log_flush_order_mutex_enter();
		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
		log_flush_order_mutex_exit();
	}
#else /* !UNIV_HOTBACKUP */
	start_lsn = start_lsn; /* Silence compiler */
#endif /* !UNIV_HOTBACKUP */

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();

	mtr_commit(&mtr);

	mutex_enter(&(recv_sys->mutex));

	/* page_lsn is the value established before/while scanning the
	records (page header, buffer pool newest modification, or the
	MLOG_INIT_FILE_PAGE reset), not the LSN stamped by the loop. */
	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	recv_addr->state = RECV_PROCESSED;

	ut_a(recv_sys->n_addrs);
	recv_sys->n_addrs--;

	mutex_exit(&(recv_sys->mutex));

}
2555 
2556 #ifndef UNIV_HOTBACKUP
2557 /** Reads in pages which have hashed log records, from an area around a given
2558 page number.
2559 @param[in]	page_id	page id
2560 @return number of pages found */
2561 static
2562 ulint
recv_read_in_area(const page_id_t & page_id)2563 recv_read_in_area(
2564 	const page_id_t&	page_id)
2565 {
2566 	recv_addr_t* recv_addr;
2567 	ulint	page_nos[RECV_READ_AHEAD_AREA];
2568 	ulint	low_limit;
2569 	ulint	n;
2570 
2571 	low_limit = page_id.page_no()
2572 		- (page_id.page_no() % RECV_READ_AHEAD_AREA);
2573 
2574 	n = 0;
2575 
2576 	for (ulint page_no = low_limit;
2577 	     page_no < low_limit + RECV_READ_AHEAD_AREA;
2578 	     page_no++) {
2579 
2580 		recv_addr = recv_get_fil_addr_struct(page_id.space(), page_no);
2581 
2582 		const page_id_t	cur_page_id(page_id.space(), page_no);
2583 
2584 		if (recv_addr && !buf_page_peek(cur_page_id)) {
2585 
2586 			mutex_enter(&(recv_sys->mutex));
2587 
2588 			if (recv_addr->state == RECV_NOT_PROCESSED) {
2589 				recv_addr->state = RECV_BEING_READ;
2590 
2591 				page_nos[n] = page_no;
2592 
2593 				n++;
2594 			}
2595 
2596 			mutex_exit(&(recv_sys->mutex));
2597 		}
2598 	}
2599 
2600 	buf_read_recv_pages(FALSE, page_id.space(), page_nos, n);
2601 	/*
2602 	fprintf(stderr, "Recv pages at %lu n %lu\n", page_nos[0], n);
2603 	*/
2604 	return(n);
2605 }
2606 
/*******************************************************************//**
Empties the hash table of stored log records, applying them to appropriate
pages.

If another apply batch is running (recv_sys->apply_batch_on), the function
polls every 500000 microseconds until that batch ends, or returns at once
when recv_sys->found_corrupt_log is set. It then walks every cell of
recv_sys->addr_hash: entries of tablespaces reported by
srv_is_tablespace_truncated() are marked RECV_DISCARDED, entries still
RECV_NOT_PROCESSED are either recovered directly (page already in the
buffer pool according to buf_page_peek()) or scheduled for reading through
recv_read_in_area(). Afterwards it waits until recv_sys->n_addrs drops to
zero, optionally flushes and invalidates the buffer pool, and empties the
hash table. */
void
recv_apply_hashed_log_recs(
/*=======================*/
	ibool	allow_ibuf)	/*!< in: if TRUE, also ibuf operations are
				allowed during the application; if FALSE,
				no ibuf operations are allowed, and after
				the application all file pages are flushed to
				disk and invalidated in buffer pool: this
				alternative means that no new log records
				can be generated during the application;
				the caller must in this case own the log
				mutex */
{
	recv_addr_t* recv_addr;
	ulint	i;
	ibool	has_printed	= FALSE;
	mtr_t	mtr;
loop:
	mutex_enter(&(recv_sys->mutex));

	if (recv_sys->apply_batch_on) {
		/* Another batch is in progress: sample the corruption flag
		while holding the mutex, then either give up or retry after
		a sleep. */
		bool abort = recv_sys->found_corrupt_log;
		mutex_exit(&(recv_sys->mutex));

		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		goto loop;
	}

	/* The log mutex must be owned exactly when ibuf operations are
	not allowed. */
	ut_ad(!allow_ibuf == log_mutex_own());

	if (!allow_ibuf) {
		recv_no_ibuf_operations = true;
	}

	recv_sys->apply_log_recs = TRUE;
	recv_sys->apply_batch_on = TRUE;

	for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {

		for (recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_FIRST(recv_sys->addr_hash, i));
		     recv_addr != 0;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {

			if (srv_is_tablespace_truncated(recv_addr->space)) {
				/* Avoid applying REDO log for the tablespace
				that is schedule for TRUNCATE. */
				ut_a(recv_sys->n_addrs);
				recv_addr->state = RECV_DISCARDED;
				recv_sys->n_addrs--;
				continue;
			}

			/* Entries discarded earlier still count in n_addrs;
			drop them from the count here. */
			if (recv_addr->state == RECV_DISCARDED) {
				ut_a(recv_sys->n_addrs);
				recv_sys->n_addrs--;
				continue;
			}

			const page_id_t		page_id(recv_addr->space,
							recv_addr->page_no);
			bool			found;
			const page_size_t&	page_size
				= fil_space_get_page_size(recv_addr->space,
							  &found);

			ut_ad(found);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
				if (!has_printed) {
					ib::info() << "Starting an apply batch"
						" of log records"
						" to the database...";
					fputs("InnoDB: Progress in percent: ",
					      stderr);
					has_printed = TRUE;
				}

				/* recv_sys->mutex is released while the page
				is recovered or scheduled for reading, and
				re-acquired before the chain walk continues. */
				mutex_exit(&(recv_sys->mutex));

				if (buf_page_peek(page_id)) {
					buf_block_t*	block;

					mtr_start(&mtr);

					block = buf_page_get(
						page_id, page_size,
						RW_X_LATCH, &mtr);

					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);

					recv_recover_page(FALSE, block);
					mtr_commit(&mtr);
				} else {
					recv_read_in_area(page_id);
				}

				mutex_enter(&(recv_sys->mutex));
			}
		}

		/* Print the percentage only when it changes between this
		cell and the next one. */
		if (has_printed
		    && (i * 100) / hash_get_n_cells(recv_sys->addr_hash)
		    != ((i + 1) * 100)
		    / hash_get_n_cells(recv_sys->addr_hash)) {

			fprintf(stderr, "%lu ", (ulong)
				((i * 100)
				 / hash_get_n_cells(recv_sys->addr_hash)));
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys->n_addrs != 0) {
                bool abort = recv_sys->found_corrupt_log;

		mutex_exit(&(recv_sys->mutex));

		/* NOTE(review): this early return leaves
		recv_sys->apply_log_recs and recv_sys->apply_batch_on set to
		TRUE (and recv_no_ibuf_operations set when !allow_ibuf), and
		the hash table is not emptied -- confirm callers expect this
		on a corrupt log. */
		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys->mutex));
	}

	if (has_printed) {

		fprintf(stderr, "\n");
	}

	if (!allow_ibuf) {

		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		/* Both recv_sys->mutex and the log mutex are released for
		the duration of the flush and re-acquired afterwards. */
		ut_d(recv_no_log_write = true);
		mutex_exit(&(recv_sys->mutex));
		log_mutex_exit();

		/* Signal flush_start and block until flush_end is set. */
		os_event_reset(recv_sys->flush_end);
		os_event_set(recv_sys->flush_start);
		os_event_wait(recv_sys->flush_end);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		buf_pool_invalidate();

		log_mutex_enter();
		mutex_enter(&(recv_sys->mutex));
		ut_d(recv_no_log_write = false);

		recv_no_ibuf_operations = false;
	}

	recv_sys->apply_log_recs = FALSE;
	recv_sys->apply_batch_on = FALSE;

	recv_sys_empty_hash();

	if (has_printed) {
		ib::info() << "Apply batch completed";
	}

	mutex_exit(&(recv_sys->mutex));
}
2786 #else /* !UNIV_HOTBACKUP */
2787 /*******************************************************************//**
2788 Applies log records in the hash table to a backup. */
2789 void
recv_apply_log_recs_for_backup(void)2790 recv_apply_log_recs_for_backup(void)
2791 /*================================*/
2792 {
2793 	recv_addr_t*	recv_addr;
2794 	ulint		n_hash_cells;
2795 	buf_block_t*	block;
2796 	bool		success;
2797 	ulint		error;
2798 	ulint		i;
2799 	fil_space_t*	space = NULL;
2800 	page_id_t	page_id;
2801 	recv_sys->apply_log_recs = TRUE;
2802 	recv_sys->apply_batch_on = TRUE;
2803 
2804 	block = back_block1;
2805 
2806 	ib::info() << "Starting an apply batch of log records to the"
2807 		" database...\n";
2808 
2809 	fputs("InnoDB: Progress in percent: ", stderr);
2810 
2811 	n_hash_cells = hash_get_n_cells(recv_sys->addr_hash);
2812 
2813 	for (i = 0; i < n_hash_cells; i++) {
2814 		/* The address hash table is externally chained */
2815 		recv_addr = static_cast<recv_addr_t*>(hash_get_nth_cell(
2816 					recv_sys->addr_hash, i)->node);
2817 
2818 		while (recv_addr != NULL) {
2819 
2820 			ib::trace() << "recv_addr {State: " << recv_addr->state
2821 				<< ", Space id: " << recv_addr->space
2822 				<< "Page no: " << recv_addr->page_no
2823 				<< ". index i: " << i << "\n";
2824 
2825 			bool			found;
2826 			const page_size_t&	page_size
2827 				= fil_space_get_page_size(recv_addr->space,
2828 							  &found);
2829 
2830 			if (!found) {
2831 #if 0
2832 				fprintf(stderr,
2833 					"InnoDB: Warning: cannot apply"
2834 					" log record to"
2835 					" tablespace %lu page %lu,\n"
2836 					"InnoDB: because tablespace with"
2837 					" that id does not exist.\n",
2838 					recv_addr->space, recv_addr->page_no);
2839 #endif
2840 				recv_addr->state = RECV_DISCARDED;
2841 
2842 				ut_a(recv_sys->n_addrs);
2843 				recv_sys->n_addrs--;
2844 
2845 				goto skip_this_recv_addr;
2846 			}
2847 
2848 			/* We simulate a page read made by the buffer pool, to
2849 			make sure the recovery apparatus works ok. We must init
2850 			the block. */
2851 
2852 			buf_page_init_for_backup_restore(
2853 				page_id_t(recv_addr->space, recv_addr->page_no),
2854 				page_size, block);
2855 
2856 			/* Extend the tablespace's last file if the page_no
2857 			does not fall inside its bounds; we assume the last
2858 			file is auto-extending, and mysqlbackup copied the file
2859 			when it still was smaller */
2860 			fil_space_t*	space
2861 				= fil_space_get(recv_addr->space);
2862 
2863 			success = fil_space_extend(
2864 				space, recv_addr->page_no + 1);
2865 			if (!success) {
2866 				ib::fatal() << "Cannot extend tablespace "
2867 					<< recv_addr->space << " to hold "
2868 					<< recv_addr->page_no << " pages";
2869 			}
2870 
2871 			/* Read the page from the tablespace file using the
2872 			fil0fil.cc routines */
2873 
2874 			const page_id_t	page_id(recv_addr->space,
2875 						recv_addr->page_no);
2876 
2877 			if (page_size.is_compressed()) {
2878 
2879 				error = fil_io(
2880 					IORequestRead, true,
2881 					page_id,
2882 					page_size, 0, page_size.physical(),
2883 					block->page.zip.data, NULL);
2884 
2885 				if (error == DB_SUCCESS
2886 				    && !buf_zip_decompress(block, TRUE)) {
2887 					ut_error;
2888 				}
2889 			} else {
2890 
2891 				error = fil_io(
2892 					IORequestRead, true,
2893 					page_id, page_size, 0,
2894 					page_size.logical(),
2895 					block->frame, NULL);
2896 			}
2897 
2898 			if (error != DB_SUCCESS) {
2899 				ib::fatal() << "Cannot read from tablespace "
2900 					<< recv_addr->space << " page number "
2901 					<< recv_addr->page_no;
2902 			}
2903 
2904 			/* Apply the log records to this page */
2905 			recv_recover_page(FALSE, block);
2906 
2907 			/* Write the page back to the tablespace file using the
2908 			fil0fil.cc routines */
2909 
2910 			buf_flush_init_for_writing(
2911 				block, block->frame,
2912 				buf_block_get_page_zip(block),
2913 				mach_read_from_8(block->frame + FIL_PAGE_LSN),
2914 				fsp_is_checksum_disabled(
2915 					block->page.id.space()));
2916 
2917 			if (page_size.is_compressed()) {
2918 
2919 				error = fil_io(
2920 					IORequestWrite, true, page_id,
2921 					page_size, 0, page_size.physical(),
2922 					block->page.zip.data, NULL);
2923 			} else {
2924 				error = fil_io(
2925 					IORequestWrite, true, page_id,
2926 					page_size, 0, page_size.logical(),
2927 					block->frame, NULL);
2928 			}
2929 skip_this_recv_addr:
2930 			recv_addr = static_cast<recv_addr_t*>(HASH_GET_NEXT(
2931 					addr_hash, recv_addr));
2932 		}
2933 
2934 		if ((100 * i) / n_hash_cells
2935 		    != (100 * (i + 1)) / n_hash_cells) {
2936 			fprintf(stderr, "%lu ",
2937 				(ulong) ((100 * i) / n_hash_cells));
2938 			fflush(stderr);
2939 		}
2940 	}
2941 	/* write logs in next line */
2942 	fprintf(stderr, "\n");
2943 	recv_sys->apply_log_recs = FALSE;
2944 	recv_sys->apply_batch_on = FALSE;
2945 	recv_sys_empty_hash();
2946 }
2947 #endif /* !UNIV_HOTBACKUP */
2948 
2949 /** Tries to parse a single log record.
2950 @param[out]	type		log record type
2951 @param[in]	ptr		pointer to a buffer
2952 @param[in]	end_ptr		end of the buffer
2953 @param[out]	space_id	tablespace identifier
2954 @param[out]	page_no		page number
2955 @param[in]	apply		whether to apply MLOG_FILE_* records
2956 @param[out]	body		start of log record body
2957 @return length of the record, or 0 if the record was not complete */
2958 
2959 ulint
recv_parse_log_rec(mlog_id_t * type,byte * ptr,byte * end_ptr,ulint * space,ulint * page_no,bool apply,byte ** body)2960 recv_parse_log_rec(
2961 	mlog_id_t*	type,
2962 	byte*		ptr,
2963 	byte*		end_ptr,
2964 	ulint*		space,
2965 	ulint*		page_no,
2966 	bool		apply,
2967 	byte**		body)
2968 {
2969 	byte*	new_ptr;
2970 
2971 	*body = NULL;
2972 
2973 	UNIV_MEM_INVALID(type, sizeof *type);
2974 	UNIV_MEM_INVALID(space, sizeof *space);
2975 	UNIV_MEM_INVALID(page_no, sizeof *page_no);
2976 	UNIV_MEM_INVALID(body, sizeof *body);
2977 
2978 	if (ptr == end_ptr) {
2979 
2980 		return(0);
2981 	}
2982 
2983 	switch (*ptr) {
2984 #ifdef UNIV_LOG_LSN_DEBUG
2985 	case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
2986 	case MLOG_LSN:
2987 		new_ptr = mlog_parse_initial_log_record(
2988 			ptr, end_ptr, type, space, page_no);
2989 		if (new_ptr != NULL) {
2990 			const lsn_t	lsn = static_cast<lsn_t>(
2991 				*space) << 32 | *page_no;
2992 			ut_a(lsn == recv_sys->recovered_lsn);
2993 		}
2994 
2995 		*type = MLOG_LSN;
2996 		return(new_ptr - ptr);
2997 #endif /* UNIV_LOG_LSN_DEBUG */
2998 	case MLOG_MULTI_REC_END:
2999 	case MLOG_DUMMY_RECORD:
3000 		*type = static_cast<mlog_id_t>(*ptr);
3001 		return(1);
3002 	case MLOG_CHECKPOINT:
3003 		if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
3004 			return(0);
3005 		}
3006 		*type = static_cast<mlog_id_t>(*ptr);
3007 		return(SIZE_OF_MLOG_CHECKPOINT);
3008 	case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
3009 	case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
3010 	case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
3011 		recv_sys->set_corrupt_log();
3012 		return(0);
3013 	}
3014 
3015 	new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
3016 						page_no);
3017 	*body = new_ptr;
3018 
3019 	if (UNIV_UNLIKELY(!new_ptr)) {
3020 
3021 		return(0);
3022 	}
3023 
3024 	new_ptr = recv_parse_or_apply_log_rec_body(
3025 		*type, new_ptr, end_ptr, *space, *page_no, apply, NULL, NULL);
3026 
3027 	if (UNIV_UNLIKELY(new_ptr == NULL)) {
3028 
3029 		return(0);
3030 	}
3031 
3032 	return(new_ptr - ptr);
3033 }
3034 
3035 /*******************************************************//**
3036 Calculates the new value for lsn when more data is added to the log. */
3037 
3038 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)3039 recv_calc_lsn_on_data_add(
3040 /*======================*/
3041 	lsn_t		lsn,	/*!< in: old lsn */
3042 	ib_uint64_t	len)	/*!< in: this many bytes of data is
3043 				added, log block headers not included */
3044 {
3045 	ulint		frag_len;
3046 	ib_uint64_t	lsn_len;
3047 
3048 	frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
3049 	ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
3050 	      - LOG_BLOCK_TRL_SIZE);
3051 	lsn_len = len;
3052 	lsn_len += (lsn_len + frag_len)
3053 		/ (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
3054 		   - LOG_BLOCK_TRL_SIZE)
3055 		* (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
3056 
3057 	return(lsn + lsn_len);
3058 }
3059 
3060 /** Prints diagnostic info of corrupt log.
3061 @param[in]	ptr	pointer to corrupt log record
3062 @param[in]	type	type of the log record (could be garbage)
3063 @param[in]	space	tablespace ID (could be garbage)
3064 @param[in]	page_no	page number (could be garbage)
3065 @return whether processing should continue */
3066 static
3067 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)3068 recv_report_corrupt_log(
3069 	const byte*	ptr,
3070 	int		type,
3071 	ulint		space,
3072 	ulint		page_no)
3073 {
3074 	ib::error() <<
3075 		"############### CORRUPT LOG RECORD FOUND ##################";
3076 
3077 	ib::info() << "Log record type " << type << ", page " << space << ":"
3078 		<< page_no << ". Log parsing proceeded successfully up to "
3079 		<< recv_sys->recovered_lsn << ". Previous log record type "
3080 		<< recv_previous_parsed_rec_type << ", is multi "
3081 		<< recv_previous_parsed_rec_is_multi << " Recv offset "
3082 		<< (ptr - recv_sys->buf) << ", prev "
3083 		<< recv_previous_parsed_rec_offset;
3084 
3085 	ut_ad(ptr <= recv_sys->buf + recv_sys->len);
3086 
3087 	const ulint	limit	= 100;
3088 	const ulint	before
3089 		= std::min(recv_previous_parsed_rec_offset, limit);
3090 	const ulint	after
3091 		= std::min(recv_sys->len - (ptr - recv_sys->buf), limit);
3092 
3093 	ib::info() << "Hex dump starting " << before << " bytes before and"
3094 		" ending " << after << " bytes after the corrupted record:";
3095 
3096 	ut_print_buf(stderr,
3097 		     recv_sys->buf
3098 		     + recv_previous_parsed_rec_offset - before,
3099 		     ptr - recv_sys->buf + before + after
3100 		     - recv_previous_parsed_rec_offset);
3101 	putc('\n', stderr);
3102 
3103 #ifndef UNIV_HOTBACKUP
3104 	if (!srv_force_recovery) {
3105 		ib::info() << "Set innodb_force_recovery to ignore this error.";
3106 		return(false);
3107 	}
3108 #endif /* !UNIV_HOTBACKUP */
3109 
3110 	ib::warn() << "The log file may have been corrupt and it is possible"
3111 		" that the log scan did not proceed far enough in recovery!"
3112 		" Please run CHECK TABLE on your InnoDB tables to check"
3113 		" that they are ok! If mysqld crashes after this recovery; "
3114 		<< FORCE_RECOVERY_MSG;
3115 	return(true);
3116 }
3117 
/** Policy for storing parsed redo log records to the hash table */
enum store_t {
	STORE_NO,	/*!< do not store redo log records */
	STORE_YES,	/*!< store redo log records */
	STORE_IF_EXISTS	/*!< store redo log records only if the
			tablespace exists */
};
3127 
3128 /** Parse log records from a buffer and optionally store them to a
3129 hash table to wait merging to file pages.
3130 @param[in]	checkpoint_lsn	the LSN of the latest checkpoint
3131 @param[in]	store		whether to store page operations
3132 @return whether MLOG_CHECKPOINT record was seen the first time,
3133 or corruption was noticed */
3134 static MY_ATTRIBUTE((warn_unused_result))
3135 bool
recv_parse_log_recs(lsn_t checkpoint_lsn,store_t store)3136 recv_parse_log_recs(
3137 	lsn_t		checkpoint_lsn,
3138 	store_t		store)
3139 {
3140 	byte*		ptr;
3141 	byte*		end_ptr;
3142 	bool		single_rec;
3143 	ulint		len;
3144 	lsn_t		new_recovered_lsn;
3145 	lsn_t		old_lsn;
3146 	mlog_id_t	type;
3147 	ulint		space;
3148 	ulint		page_no;
3149 	byte*		body;
3150 
3151 	ut_ad(log_mutex_own());
3152 	ut_ad(recv_sys->parse_start_lsn != 0);
3153 loop:
3154 	ptr = recv_sys->buf + recv_sys->recovered_offset;
3155 
3156 	end_ptr = recv_sys->buf + recv_sys->len;
3157 
3158 	if (ptr == end_ptr) {
3159 
3160 		return(false);
3161 	}
3162 
3163 	switch (*ptr) {
3164 	case MLOG_CHECKPOINT:
3165 #ifdef UNIV_LOG_LSN_DEBUG
3166 	case MLOG_LSN:
3167 #endif /* UNIV_LOG_LSN_DEBUG */
3168 	case MLOG_DUMMY_RECORD:
3169 		single_rec = true;
3170 		break;
3171 	default:
3172 		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
3173 	}
3174 
3175 	if (single_rec) {
3176 		/* The mtr did not modify multiple pages */
3177 
3178 		old_lsn = recv_sys->recovered_lsn;
3179 
3180 		/* Try to parse a log record, fetching its type, space id,
3181 		page no, and a pointer to the body of the log record */
3182 
3183 		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
3184 					 &page_no, true, &body);
3185 
3186 		if (len == 0) {
3187 			return(false);
3188 		}
3189 
3190 		if (recv_sys->found_corrupt_log) {
3191 			recv_report_corrupt_log(
3192 				ptr, type, space, page_no);
3193 			return(true);
3194 		}
3195 
3196 		if (recv_sys->found_corrupt_fs) {
3197 			return(true);
3198 		}
3199 
3200 		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
3201 
3202 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
3203 			/* The log record filled a log block, and we require
3204 			that also the next log block should have been scanned
3205 			in */
3206 
3207 			return(false);
3208 		}
3209 
3210 		recv_previous_parsed_rec_type = type;
3211 		recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
3212 		recv_previous_parsed_rec_is_multi = 0;
3213 
3214 		recv_sys->recovered_offset += len;
3215 		recv_sys->recovered_lsn = new_recovered_lsn;
3216 
3217 		switch (type) {
3218 			lsn_t	lsn;
3219 		case MLOG_DUMMY_RECORD:
3220 			/* Do nothing */
3221 			break;
3222 		case MLOG_CHECKPOINT:
3223 #if SIZE_OF_MLOG_CHECKPOINT != 1 + 8
3224 # error SIZE_OF_MLOG_CHECKPOINT != 1 + 8
3225 #endif
3226 			lsn = mach_read_from_8(ptr + 1);
3227 
3228 			DBUG_PRINT("ib_log",
3229 				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
3230 				    LSN_PF,
3231 				    lsn,
3232 				    lsn != checkpoint_lsn ? "ignored"
3233 				    : recv_sys->mlog_checkpoint_lsn
3234 				    ? "reread" : "read",
3235 				    recv_sys->recovered_lsn));
3236 
3237 			if (lsn == checkpoint_lsn) {
3238 				if (recv_sys->mlog_checkpoint_lsn) {
3239 					/* At recv_reset_logs() we may
3240 					write a duplicate MLOG_CHECKPOINT
3241 					for the same checkpoint LSN. Thus
3242 					recv_sys->mlog_checkpoint_lsn
3243 					can differ from the current LSN. */
3244 					ut_ad(recv_sys->mlog_checkpoint_lsn
3245 					      <= recv_sys->recovered_lsn);
3246 					break;
3247 				}
3248 				recv_sys->mlog_checkpoint_lsn
3249 					= recv_sys->recovered_lsn;
3250 			}
3251 			break;
3252 		case MLOG_FILE_NAME:
3253 		case MLOG_FILE_DELETE:
3254 		case MLOG_FILE_CREATE2:
3255 		case MLOG_FILE_RENAME2:
3256 		case MLOG_TRUNCATE:
3257 			/* These were already handled by
3258 			recv_parse_log_rec() and
3259 			recv_parse_or_apply_log_rec_body(). */
3260 			break;
3261 #ifdef UNIV_LOG_LSN_DEBUG
3262 		case MLOG_LSN:
3263 			/* Do not add these records to the hash table.
3264 			The page number and space id fields are misused
3265 			for something else. */
3266 			break;
3267 #endif /* UNIV_LOG_LSN_DEBUG */
3268 		default:
3269 			switch (store) {
3270 			case STORE_NO:
3271 				break;
3272 			case STORE_IF_EXISTS:
3273 				if (fil_space_get_flags(space)
3274 				    == ULINT_UNDEFINED) {
3275 					break;
3276 				}
3277 				/* fall through */
3278 			case STORE_YES:
3279 				recv_add_to_hash_table(
3280 					type, space, page_no, body,
3281 					ptr + len, old_lsn,
3282 					recv_sys->recovered_lsn);
3283 			}
3284 			/* fall through */
3285 		case MLOG_INDEX_LOAD:
3286 			DBUG_PRINT("ib_log",
3287 				("scan " LSN_PF ": log rec %s"
3288 				" len " ULINTPF
3289 				" page " ULINTPF ":" ULINTPF,
3290 				old_lsn, get_mlog_string(type),
3291 				len, space, page_no));
3292 		}
3293 	} else {
3294 		/* Check that all the records associated with the single mtr
3295 		are included within the buffer */
3296 
3297 		ulint	total_len	= 0;
3298 		ulint	n_recs		= 0;
3299 		bool	only_mlog_file	= true;
3300 		ulint	mlog_rec_len	= 0;
3301 
3302 		for (;;) {
3303 			len = recv_parse_log_rec(
3304 				&type, ptr, end_ptr, &space, &page_no,
3305 				false, &body);
3306 
3307 			if (len == 0) {
3308 				return(false);
3309 			}
3310 
3311 			if (recv_sys->found_corrupt_log
3312 			    || type == MLOG_CHECKPOINT
3313 			    || (*ptr & MLOG_SINGLE_REC_FLAG)) {
3314 				recv_sys->set_corrupt_log();
3315 				recv_report_corrupt_log(
3316 					ptr, type, space, page_no);
3317 				return(true);
3318 			}
3319 
3320 			if (recv_sys->found_corrupt_fs) {
3321 				return(true);
3322 			}
3323 
3324 			recv_previous_parsed_rec_type = type;
3325 			recv_previous_parsed_rec_offset
3326 				= recv_sys->recovered_offset + total_len;
3327 			recv_previous_parsed_rec_is_multi = 1;
3328 
3329 			/* MLOG_FILE_NAME redo log records doesn't make changes
3330 			to persistent data. If only MLOG_FILE_NAME redo
3331 			log record exists then reset the parsing buffer pointer
3332 			by changing recovered_lsn and recovered_offset. */
3333 			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
3334 				only_mlog_file = false;
3335 			}
3336 
3337 			if (only_mlog_file) {
3338 				new_recovered_lsn = recv_calc_lsn_on_data_add(
3339 					recv_sys->recovered_lsn, len);
3340 				mlog_rec_len += len;
3341 				recv_sys->recovered_offset += len;
3342 				recv_sys->recovered_lsn = new_recovered_lsn;
3343 			}
3344 
3345 			total_len += len;
3346 			n_recs++;
3347 
3348 			ptr += len;
3349 
3350 			if (type == MLOG_MULTI_REC_END) {
3351 				DBUG_PRINT("ib_log",
3352 					   ("scan " LSN_PF
3353 					    ": multi-log end"
3354 					    " total_len " ULINTPF
3355 					    " n=" ULINTPF,
3356 					    recv_sys->recovered_lsn,
3357 					    total_len, n_recs));
3358 				total_len -= mlog_rec_len;
3359 				break;
3360 			}
3361 
3362 			DBUG_PRINT("ib_log",
3363 				   ("scan " LSN_PF ": multi-log rec %s"
3364 				    " len " ULINTPF
3365 				    " page " ULINTPF ":" ULINTPF,
3366 				    recv_sys->recovered_lsn,
3367 				    get_mlog_string(type), len, space, page_no));
3368 		}
3369 
3370 		new_recovered_lsn = recv_calc_lsn_on_data_add(
3371 			recv_sys->recovered_lsn, total_len);
3372 
3373 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
3374 			/* The log record filled a log block, and we require
3375 			that also the next log block should have been scanned
3376 			in */
3377 
3378 			return(false);
3379 		}
3380 
3381 		/* Add all the records to the hash table */
3382 
3383 		ptr = recv_sys->buf + recv_sys->recovered_offset;
3384 
3385 		for (;;) {
3386 			old_lsn = recv_sys->recovered_lsn;
3387 			/* This will apply MLOG_FILE_ records. We
3388 			had to skip them in the first scan, because we
3389 			did not know if the mini-transaction was
3390 			completely recovered (until MLOG_MULTI_REC_END). */
3391 			len = recv_parse_log_rec(
3392 				&type, ptr, end_ptr, &space, &page_no,
3393 				true, &body);
3394 
3395 			if (recv_sys->found_corrupt_log
3396 			    && !recv_report_corrupt_log(
3397 				    ptr, type, space, page_no)) {
3398 				return(true);
3399 			}
3400 
3401 			if (recv_sys->found_corrupt_fs) {
3402 				return(true);
3403 			}
3404 
3405 			ut_a(len != 0);
3406 			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));
3407 
3408 			recv_sys->recovered_offset += len;
3409 			recv_sys->recovered_lsn
3410 				= recv_calc_lsn_on_data_add(old_lsn, len);
3411 
3412 			switch (type) {
3413 			case MLOG_MULTI_REC_END:
3414 				/* Found the end mark for the records */
3415 				goto loop;
3416 #ifdef UNIV_LOG_LSN_DEBUG
3417 			case MLOG_LSN:
3418 				/* Do not add these records to the hash table.
3419 				The page number and space id fields are misused
3420 				for something else. */
3421 				break;
3422 #endif /* UNIV_LOG_LSN_DEBUG */
3423 			case MLOG_FILE_NAME:
3424 			case MLOG_FILE_DELETE:
3425 			case MLOG_FILE_CREATE2:
3426 			case MLOG_FILE_RENAME2:
3427 			case MLOG_INDEX_LOAD:
3428 			case MLOG_TRUNCATE:
3429 				/* These were already handled by
3430 				recv_parse_log_rec() and
3431 				recv_parse_or_apply_log_rec_body(). */
3432 				break;
3433 			default:
3434 				switch (store) {
3435 				case STORE_NO:
3436 					break;
3437 				case STORE_IF_EXISTS:
3438 					if (fil_space_get_flags(space)
3439 					    == ULINT_UNDEFINED) {
3440 						break;
3441 					}
3442 					/* fall through */
3443 				case STORE_YES:
3444 					recv_add_to_hash_table(
3445 						type, space, page_no,
3446 						body, ptr + len,
3447 						old_lsn,
3448 						new_recovered_lsn);
3449 				}
3450 			}
3451 
3452 			ptr += len;
3453 		}
3454 	}
3455 
3456 	goto loop;
3457 }
3458 
3459 /*******************************************************//**
3460 Adds data from a new log block to the parsing buffer of recv_sys if
3461 recv_sys->parse_start_lsn is non-zero.
3462 @return true if more data added */
3463 static
3464 bool
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3465 recv_sys_add_to_parsing_buf(
3466 /*========================*/
3467 	const byte*	log_block,	/*!< in: log block */
3468 	lsn_t		scanned_lsn)	/*!< in: lsn of how far we were able
3469 					to find data in this log block */
3470 {
3471 	ulint	more_len;
3472 	ulint	data_len;
3473 	ulint	start_offset;
3474 	ulint	end_offset;
3475 
3476 	ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
3477 
3478 	if (!recv_sys->parse_start_lsn) {
3479 		/* Cannot start parsing yet because no start point for
3480 		it found */
3481 
3482 		return(false);
3483 	}
3484 
3485 	data_len = log_block_get_data_len(log_block);
3486 
3487 	if (recv_sys->parse_start_lsn >= scanned_lsn) {
3488 
3489 		return(false);
3490 
3491 	} else if (recv_sys->scanned_lsn >= scanned_lsn) {
3492 
3493 		return(false);
3494 
3495 	} else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
3496 		more_len = (ulint) (scanned_lsn - recv_sys->parse_start_lsn);
3497 	} else {
3498 		more_len = (ulint) (scanned_lsn - recv_sys->scanned_lsn);
3499 	}
3500 
3501 	if (more_len == 0) {
3502 
3503 		return(false);
3504 	}
3505 
3506 	ut_ad(data_len >= more_len);
3507 
3508 	start_offset = data_len - more_len;
3509 
3510 	if (start_offset < LOG_BLOCK_HDR_SIZE) {
3511 		start_offset = LOG_BLOCK_HDR_SIZE;
3512 	}
3513 
3514 	end_offset = data_len;
3515 
3516 	if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
3517 		end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
3518 	}
3519 
3520 	ut_ad(start_offset <= end_offset);
3521 
3522 	if (start_offset < end_offset) {
3523 		ut_memcpy(recv_sys->buf + recv_sys->len,
3524 			  log_block + start_offset, end_offset - start_offset);
3525 
3526 		recv_sys->len += end_offset - start_offset;
3527 
3528 		ut_a(recv_sys->len <= RECV_PARSING_BUF_SIZE);
3529 	}
3530 
3531 	return(true);
3532 }
3533 
3534 /*******************************************************//**
3535 Moves the parsing buffer data left to the buffer start. */
3536 static
3537 void
recv_sys_justify_left_parsing_buf(void)3538 recv_sys_justify_left_parsing_buf(void)
3539 /*===================================*/
3540 {
3541 	ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset,
3542 		   recv_sys->len - recv_sys->recovered_offset);
3543 
3544 	recv_sys->len -= recv_sys->recovered_offset;
3545 
3546 	recv_sys->recovered_offset = 0;
3547 }
3548 
3549 /*******************************************************//**
3550 Scans log from a buffer and stores new log data to the parsing buffer.
3551 Parses and hashes the log records if new data found.  Unless
3552 UNIV_HOTBACKUP is defined, this function will apply log records
3553 automatically when the hash table becomes full.
3554 @return true if not able to scan any more in this log group */
3555 static
3556 bool
recv_scan_log_recs(ulint available_memory,store_t * store_to_hash,const byte * buf,ulint len,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)3557 recv_scan_log_recs(
3558 /*===============*/
3559 	ulint		available_memory,/*!< in: we let the hash table of recs
3560 					to grow to this size, at the maximum */
3561 	store_t*	store_to_hash,	/*!< in,out: whether the records should be
3562 					stored to the hash table; this is reset
3563 					if just debug checking is needed, or
3564 					when the available_memory runs out */
3565 	const byte*	buf,		/*!< in: buffer containing a log
3566 					segment or garbage */
3567 	ulint		len,		/*!< in: buffer length */
3568 	lsn_t		checkpoint_lsn,	/*!< in: latest checkpoint LSN */
3569 	lsn_t		start_lsn,	/*!< in: buffer start lsn */
3570 	lsn_t*		contiguous_lsn,	/*!< in/out: it is known that all log
3571 					groups contain contiguous log data up
3572 					to this lsn */
3573 	lsn_t*		group_scanned_lsn)/*!< out: scanning succeeded up to
3574 					this lsn */
3575 {
3576 	const byte*	log_block	= buf;
3577 	ulint		no;
3578 	lsn_t		scanned_lsn	= start_lsn;
3579 	bool		finished	= false;
3580 	ulint		data_len;
3581 	bool		more_data	= false;
3582 	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
3583 
3584 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3585 	ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
3586 	ut_ad(len >= OS_FILE_LOG_BLOCK_SIZE);
3587 
3588 	do {
3589 		ut_ad(!finished);
3590 		no = log_block_get_hdr_no(log_block);
3591 		ulint expected_no = log_block_convert_lsn_to_no(scanned_lsn);
3592 		if (no != expected_no) {
3593 			/* Garbage or an incompletely written log block.
3594 
3595 			We will not report any error, because this can
3596 			happen when InnoDB was killed while it was
3597 			writing redo log. We simply treat this as an
3598 			abrupt end of the redo log. */
3599 			finished = true;
3600 			break;
3601 		}
3602 
3603 		if (!log_block_checksum_is_ok(log_block)) {
3604 			ib::error() << "Log block " << no <<
3605 				" at lsn " << scanned_lsn << " has valid"
3606 				" header, but checksum field contains "
3607 				<< log_block_get_checksum(log_block)
3608 				<< ", should be "
3609 				<< log_block_calc_checksum(log_block);
3610 			/* Garbage or an incompletely written log block.
3611 
3612 			This could be the result of killing the server
3613 			while it was writing this log block. We treat
3614 			this as an abrupt end of the redo log. */
3615 			finished = true;
3616 			break;
3617 		}
3618 
3619 		if (log_block_get_flush_bit(log_block)) {
3620 			/* This block was a start of a log flush operation:
3621 			we know that the previous flush operation must have
3622 			been completed for all log groups before this block
3623 			can have been flushed to any of the groups. Therefore,
3624 			we know that log data is contiguous up to scanned_lsn
3625 			in all non-corrupt log groups. */
3626 
3627 			if (scanned_lsn > *contiguous_lsn) {
3628 				*contiguous_lsn = scanned_lsn;
3629 			}
3630 		}
3631 
3632 		data_len = log_block_get_data_len(log_block);
3633 
3634 		if (scanned_lsn + data_len > recv_sys->scanned_lsn
3635 		    && log_block_get_checkpoint_no(log_block)
3636 		    < recv_sys->scanned_checkpoint_no
3637 		    && (recv_sys->scanned_checkpoint_no
3638 			- log_block_get_checkpoint_no(log_block)
3639 			> 0x80000000UL)) {
3640 
3641 			/* Garbage from a log buffer flush which was made
3642 			before the most recent database recovery */
3643 			finished = true;
3644 			break;
3645 		}
3646 
3647 		if (!recv_sys->parse_start_lsn
3648 		    && (log_block_get_first_rec_group(log_block) > 0)) {
3649 
3650 			/* We found a point from which to start the parsing
3651 			of log records */
3652 
3653 			recv_sys->parse_start_lsn = scanned_lsn
3654 				+ log_block_get_first_rec_group(log_block);
3655 			recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
3656 			recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
3657 		}
3658 
3659 		scanned_lsn += data_len;
3660 
3661 		if (scanned_lsn > recv_sys->scanned_lsn) {
3662 
3663 			/* We have found more entries. If this scan is
3664 			of startup type, we must initiate crash recovery
3665 			environment before parsing these log records. */
3666 
3667 #ifndef UNIV_HOTBACKUP
3668 			if (!recv_needed_recovery) {
3669 
3670 				if (!srv_read_only_mode) {
3671 					ib::info() << "Log scan progressed"
3672 						" past the checkpoint lsn "
3673 						<< recv_sys->scanned_lsn;
3674 
3675 					recv_init_crash_recovery();
3676 				} else {
3677 
3678 					ib::warn() << "Recovery skipped,"
3679 						" --innodb-read-only set!";
3680 
3681 					return(true);
3682 				}
3683 			}
3684 #endif /* !UNIV_HOTBACKUP */
3685 
3686 			/* We were able to find more log data: add it to the
3687 			parsing buffer if parse_start_lsn is already
3688 			non-zero */
3689 
3690 			DBUG_EXECUTE_IF(
3691 				"reduce_recv_parsing_buf",
3692 				recv_parsing_buf_size
3693 					= (70 * 1024);
3694 				);
3695 
3696 			if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE
3697 			    >= recv_parsing_buf_size) {
3698 				ib::error() << "Log parsing buffer overflow."
3699 					" Recovery may have failed!";
3700 
3701 				recv_sys->set_corrupt_log();
3702 
3703 #ifndef UNIV_HOTBACKUP
3704 				if (!srv_force_recovery) {
3705 					ib::error()
3706 						<< "Set innodb_force_recovery"
3707 						" to ignore this error.";
3708 					return(true);
3709 				}
3710 #endif /* !UNIV_HOTBACKUP */
3711 
3712 			} else if (!recv_sys->found_corrupt_log) {
3713 				more_data = recv_sys_add_to_parsing_buf(
3714 					log_block, scanned_lsn);
3715 			}
3716 
3717 			recv_sys->scanned_lsn = scanned_lsn;
3718 			recv_sys->scanned_checkpoint_no
3719 				= log_block_get_checkpoint_no(log_block);
3720 		}
3721 
3722 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3723 			/* Log data for this group ends here */
3724 			finished = true;
3725 			break;
3726 		} else {
3727 			log_block += OS_FILE_LOG_BLOCK_SIZE;
3728 		}
3729 	} while (log_block < buf + len);
3730 
3731 	*group_scanned_lsn = scanned_lsn;
3732 
3733 	if (recv_needed_recovery
3734 	    || (recv_is_from_backup && !recv_is_making_a_backup)) {
3735 		recv_scan_print_counter++;
3736 
3737 		if (finished || (recv_scan_print_counter % 80 == 0)) {
3738 
3739 			ib::info() << "Doing recovery: scanned up to"
3740 				" log sequence number " << scanned_lsn;
3741 		}
3742 	}
3743 
3744 	if (more_data && !recv_sys->found_corrupt_log) {
3745 		/* Try to parse more log records */
3746 
3747 		if (recv_parse_log_recs(checkpoint_lsn,
3748 					*store_to_hash)) {
3749 			ut_ad(recv_sys->found_corrupt_log
3750 			      || recv_sys->found_corrupt_fs
3751 			      || recv_sys->mlog_checkpoint_lsn
3752 			      == recv_sys->recovered_lsn);
3753 			return(true);
3754 		}
3755 
3756 		if (*store_to_hash != STORE_NO
3757 		    && mem_heap_get_size(recv_sys->heap) > available_memory) {
3758 			*store_to_hash = STORE_NO;
3759 		}
3760 
3761 		if (recv_sys->recovered_offset > recv_parsing_buf_size / 4) {
3762 			/* Move parsing buffer data to the buffer start */
3763 
3764 			recv_sys_justify_left_parsing_buf();
3765 		}
3766 	}
3767 
3768 	return(finished);
3769 }
3770 
3771 #ifndef UNIV_HOTBACKUP
3772 /** Scans log from a buffer and stores new log data to the parsing buffer.
3773 Parses and hashes the log records if new data found.
3774 @param[in,out]	group			log group
3775 @param[in,out]	contiguous_lsn		log sequence number
3776 until which all redo log has been scanned
3777 @param[in]	last_phase		whether changes
3778 can be applied to the tablespaces
3779 @return whether rescan is needed (not everything was stored) */
3780 static
3781 bool
recv_group_scan_log_recs(log_group_t * group,lsn_t * contiguous_lsn,bool last_phase)3782 recv_group_scan_log_recs(
3783 	log_group_t*	group,
3784 	lsn_t*		contiguous_lsn,
3785 	bool		last_phase)
3786 {
3787 	DBUG_ENTER("recv_group_scan_log_recs");
3788 	assert(!last_phase || recv_sys->mlog_checkpoint_lsn > 0);
3789 
3790 	mutex_enter(&recv_sys->mutex);
3791 	recv_sys->len = 0;
3792 	recv_sys->recovered_offset = 0;
3793 	recv_sys->n_addrs = 0;
3794 	recv_sys_empty_hash();
3795 	srv_start_lsn = *contiguous_lsn;
3796 	recv_sys->parse_start_lsn = *contiguous_lsn;
3797 	recv_sys->scanned_lsn = *contiguous_lsn;
3798 	recv_sys->recovered_lsn = *contiguous_lsn;
3799 	recv_sys->scanned_checkpoint_no = 0;
3800 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3801 	recv_previous_parsed_rec_offset	= 0;
3802 	recv_previous_parsed_rec_is_multi = 0;
3803 	ut_ad(recv_max_page_lsn == 0);
3804 	mutex_exit(&recv_sys->mutex);
3805 
3806 	lsn_t	checkpoint_lsn	= *contiguous_lsn;
3807 	lsn_t	start_lsn;
3808 	lsn_t	end_lsn;
3809 	store_t	store_to_hash	= last_phase ? STORE_IF_EXISTS : STORE_YES;
3810 	ulint	available_mem	= UNIV_PAGE_SIZE
3811 		* (buf_pool_get_n_pages()
3812 		   - (recv_n_pool_free_frames * srv_buf_pool_instances));
3813 
3814 	end_lsn = *contiguous_lsn = ut_uint64_align_down(
3815 		*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3816 
3817 	do {
3818 		if (last_phase && store_to_hash == STORE_NO) {
3819 			store_to_hash = STORE_IF_EXISTS;
3820 			/* We must not allow change buffer
3821 			merge here, because it would generate
3822 			redo log records before we have
3823 			finished the redo log scan. */
3824 			recv_apply_hashed_log_recs(FALSE);
3825 		}
3826 
3827 		start_lsn = end_lsn;
3828 		end_lsn += RECV_SCAN_SIZE;
3829 
3830 		log_group_read_log_seg(
3831 			log_sys->buf, group, start_lsn, end_lsn, false);
3832 	} while (!recv_scan_log_recs(
3833 			 available_mem, &store_to_hash, log_sys->buf,
3834 			 RECV_SCAN_SIZE,
3835 			 checkpoint_lsn,
3836 			 start_lsn, contiguous_lsn, &group->scanned_lsn));
3837 
3838 	if (recv_sys->found_corrupt_log || recv_sys->found_corrupt_fs) {
3839 		DBUG_RETURN(false);
3840 	}
3841 
3842 	DBUG_PRINT("ib_log", ("%s " LSN_PF
3843 			      " completed for log group " ULINTPF,
3844 			      last_phase ? "rescan" : "scan",
3845 			      group->scanned_lsn, group->id));
3846 
3847 	DBUG_RETURN(store_to_hash == STORE_NO);
3848 }
3849 
3850 /*******************************************************//**
3851 Initialize crash recovery environment. Can be called iff
3852 recv_needed_recovery == false. */
3853 static
3854 void
recv_init_crash_recovery(void)3855 recv_init_crash_recovery(void)
3856 {
3857 	ut_ad(!srv_read_only_mode);
3858 	ut_a(!recv_needed_recovery);
3859 
3860 	recv_needed_recovery = true;
3861 }
3862 
3863 /** Report a missing tablespace for which page-redo log exists.
3864 @param[in]	err	previous error code
3865 @param[in]	i	tablespace descriptor
3866 @return new error code */
3867 static
3868 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3869 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3870 {
3871 	if (srv_force_recovery == 0) {
3872 		ib::error() << "Tablespace " << i->first << " was not"
3873 			" found at " << i->second.name << ".";
3874 
3875 		if (err == DB_SUCCESS) {
3876 			ib::error() << "Set innodb_force_recovery=1 to"
3877 				" ignore this and to permanently lose"
3878 				" all changes to the tablespace.";
3879 			err = DB_TABLESPACE_NOT_FOUND;
3880 		}
3881 	} else {
3882 		ib::warn() << "Tablespace " << i->first << " was not"
3883 			" found at " << i->second.name << ", and"
3884 			" innodb_force_recovery was set. All redo log"
3885 			" for this tablespace will be ignored!";
3886 	}
3887 
3888 	return(err);
3889 }
3890 
3891 /** Report a missing mlog_file_name or mlog_file_delete record for
3892 the tablespace.
3893 @param[in]	recv_addr	Hashed page file address. */
3894 static
3895 void
recv_init_missing_mlog(recv_addr_t * recv_addr)3896 recv_init_missing_mlog(
3897 	recv_addr_t*	recv_addr)
3898 {
3899 	ulint	space_id = recv_addr->space;
3900 	ulint	page_no = recv_addr->page_no;
3901 	ulint	type = UT_LIST_GET_FIRST(recv_addr->rec_list)->type;
3902 	ulint	start_lsn = UT_LIST_GET_FIRST(recv_addr->rec_list)->start_lsn;
3903 
3904 	ib::fatal() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE "
3905 		"for redo log record " << type << " (page "
3906 		<< space_id << ":" << page_no << ") at "
3907 		<< start_lsn;
3908 }
3909 
3910 /** Check if all tablespaces were found for crash recovery.
3911 @return error code or DB_SUCCESS */
3912 static MY_ATTRIBUTE((warn_unused_result))
3913 dberr_t
recv_init_crash_recovery_spaces(void)3914 recv_init_crash_recovery_spaces(void)
3915 {
3916 	typedef std::set<ulint>	space_set_t;
3917 	bool		flag_deleted	= false;
3918 	space_set_t	missing_spaces;
3919 
3920 	ut_ad(!srv_read_only_mode);
3921 	ut_ad(recv_needed_recovery);
3922 
3923 	ib::info() << "Database was not shutdown normally!";
3924 	ib::info() << "Starting crash recovery.";
3925 
3926 	for (recv_spaces_t::iterator i = recv_spaces.begin();
3927 	     i != recv_spaces.end(); i++) {
3928 		ut_ad(!is_predefined_tablespace(i->first));
3929 
3930 		if (i->second.deleted) {
3931 			/* The tablespace was deleted,
3932 			so we can ignore any redo log for it. */
3933 			flag_deleted = true;
3934 		} else if (i->second.space != NULL) {
3935 			/* The tablespace was found, and there
3936 			are some redo log records for it. */
3937 			fil_names_dirty(i->second.space);
3938 		} else {
3939 			missing_spaces.insert(i->first);
3940 			flag_deleted = true;
3941 		}
3942 	}
3943 
3944 	if (flag_deleted) {
3945 		dberr_t err = DB_SUCCESS;
3946 
3947 		for (ulint h = 0;
3948 		     h < hash_get_n_cells(recv_sys->addr_hash);
3949 		     h++) {
3950 			for (recv_addr_t* recv_addr
3951 				     = static_cast<recv_addr_t*>(
3952 					     HASH_GET_FIRST(
3953 						     recv_sys->addr_hash, h));
3954 			     recv_addr != 0;
3955 			     recv_addr = static_cast<recv_addr_t*>(
3956 				     HASH_GET_NEXT(addr_hash, recv_addr))) {
3957 				const ulint space = recv_addr->space;
3958 
3959 				if (is_predefined_tablespace(space)) {
3960 					continue;
3961 				}
3962 
3963 				recv_spaces_t::iterator i
3964 					= recv_spaces.find(space);
3965 
3966 				if (i == recv_spaces.end()) {
3967 					recv_init_missing_mlog(recv_addr);
3968 					recv_addr->state = RECV_DISCARDED;
3969 					continue;
3970 				}
3971 
3972 				if (i->second.deleted) {
3973 					ut_ad(missing_spaces.find(space)
3974 					      == missing_spaces.end());
3975 					recv_addr->state = RECV_DISCARDED;
3976 					continue;
3977 				}
3978 
3979 				space_set_t::iterator m = missing_spaces.find(
3980 					space);
3981 
3982 				if (m != missing_spaces.end()) {
3983 					missing_spaces.erase(m);
3984 					err = recv_init_missing_space(err, i);
3985 					recv_addr->state = RECV_DISCARDED;
3986 					/* All further redo log for this
3987 					tablespace should be removed. */
3988 					i->second.deleted = true;
3989 				}
3990 			}
3991 		}
3992 
3993 		if (err != DB_SUCCESS) {
3994 			return(err);
3995 		}
3996 	}
3997 
3998 	for (space_set_t::const_iterator m = missing_spaces.begin();
3999 	     m != missing_spaces.end(); m++) {
4000 		recv_spaces_t::iterator i = recv_spaces.find(*m);
4001 		ut_ad(i != recv_spaces.end());
4002 
4003 		ib::info() << "Tablespace " << i->first
4004 			<< " was not found at '" << i->second.name
4005 			<< "', but there were no modifications either.";
4006 	}
4007 
4008 	buf_dblwr_process();
4009 
4010 	return(DB_SUCCESS);
4011 }
4012 
/** Start recovering from a redo log checkpoint.
Finds the latest checkpoint, scans the redo log from the checkpoint LSN,
initializes crash recovery when needed, and then sets up the log_sys
fields (lsn, buffer position, checkpoint info) from the recovered state.
The log mutex is acquired inside this function and released on every
return path that follows the acquisition.
@see recv_recovery_from_checkpoint_finish
@param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t
recv_recovery_from_checkpoint_start(
	lsn_t	flush_lsn)
{
	log_group_t*	group;
	log_group_t*	max_cp_group;
	ulint		max_cp_field;
	lsn_t		checkpoint_lsn;
	bool		rescan;
	ib_uint64_t	checkpoint_no;
	lsn_t		contiguous_lsn;
	byte*		buf;
	byte*		log_hdr_buf;
	dberr_t		err;

	/* Initialize red-black tree for fast insertions into the
	flush_list during recovery process. */
	buf_flush_init_flush_rbt();

	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {

		ib::info() << "The user has set SRV_FORCE_NO_LOG_REDO on,"
			" skipping log redo";

		srv_init_log_online();

		return(DB_SUCCESS);
	}

	recv_recovery_on = true;

	/* From here on the log mutex is held; each return below
	releases it first. */
	log_mutex_enter();

	/* Look for the latest checkpoint from any of the log groups */

	err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field);

	if (err != DB_SUCCESS) {

		log_mutex_exit();

		return(err);
	}

	log_group_header_read(max_cp_group, max_cp_field);

	/* The checkpoint LSN and number are taken from
	log_sys->checkpoint_buf after the header read above. */
	buf = log_sys->checkpoint_buf;

	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);

	/* Read the first log file header to print a note if this is
	a recovery from a restored InnoDB Hot Backup */

	const page_id_t	page_id(max_cp_group->space_id, 0);

	/* Over-allocate by MAX_SRV_LOG_WRITE_AHEAD_SIZE so that the
	buffer actually used for I/O can be aligned to that boundary.
	The unaligned pointer is the one that must be freed. */
	byte* log_hdr_buf_unalign = static_cast<byte*>(ut_malloc_nokey(
				LOG_FILE_HDR_SIZE
				+ MAX_SRV_LOG_WRITE_AHEAD_SIZE));
	log_hdr_buf = static_cast<byte*>(ut_align(
				log_hdr_buf_unalign,
				MAX_SRV_LOG_WRITE_AHEAD_SIZE));

	fil_io(IORequestLogRead, true, page_id, univ_page_size, 0,
	       LOG_FILE_HDR_SIZE, log_hdr_buf, max_cp_group);

	/* A creator label starting with "ibbackup" marks a log file
	produced by mysqlbackup. */
	if (0 == ut_memcmp(log_hdr_buf + LOG_HEADER_CREATOR,
			   (byte*)"ibbackup", (sizeof "ibbackup") - 1)) {

		if (srv_read_only_mode) {
			log_mutex_exit();

			ib::error() << "Cannot restore from mysqlbackup,"
				" InnoDB running in read-only mode!";

			ut_free(log_hdr_buf_unalign);
			return(DB_ERROR);
		}

		/* This log file was created by mysqlbackup --restore: print
		a note to the user about it */

		ib::info() << "The log file was created by mysqlbackup"
			" --apply-log at "
			<< log_hdr_buf + LOG_HEADER_CREATOR
			<< ". The following crash recovery is part of a"
			" normal restore.";

		/* Replace the label. */
		ut_ad(LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR
		      >= sizeof LOG_HEADER_CREATOR_CURRENT);
		memset(log_hdr_buf + LOG_HEADER_CREATOR, 0,
		       LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR);
		strcpy(reinterpret_cast<char*>(log_hdr_buf)
		       + LOG_HEADER_CREATOR, LOG_HEADER_CREATOR_CURRENT);

		/* Write to the log file to wipe over the label.
		Only the first OS_FILE_LOG_BLOCK_SIZE bytes of the header
		buffer are written back. */
		fil_io(IORequestLogWrite, true, page_id,
		       univ_page_size, 0, OS_FILE_LOG_BLOCK_SIZE, log_hdr_buf,
		       max_cp_group);
	}

	ut_free(log_hdr_buf_unalign);

	/* Start reading the log groups from the checkpoint lsn up. The
	variable contiguous_lsn contains an lsn up to which the log is
	known to be contiguously written to all log groups. */

	/* Reset before scanning; a value of 0 after the scan is used
	below to detect that no MLOG_CHECKPOINT was found. */
	recv_sys->mlog_checkpoint_lsn = 0;

	ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);

	ut_ad(UT_LIST_GET_LEN(log_sys->log_groups) == 1);
	group = UT_LIST_GET_FIRST(log_sys->log_groups);

	ut_ad(recv_sys->n_addrs == 0);
	contiguous_lsn = checkpoint_lsn;
	switch (group->format) {
	case 0:
		/* Format 0 is handled entirely by
		recv_log_format_0_recover(), without the log mutex. */
		log_mutex_exit();
		err = recv_log_format_0_recover(checkpoint_lsn);
		if (err == DB_SUCCESS) {
			buf_parallel_dblwr_finish_recovery();
			buf_parallel_dblwr_delete();
		}
		return(err);
	case LOG_HEADER_FORMAT_CURRENT:
		break;
	default:
		/* Any other format value is flagged as a corrupt log. */
		ut_ad(0);
		recv_sys->set_corrupt_log();
		log_mutex_exit();
		return(DB_ERROR);
	}

	/* First scan of the redo log, starting from the checkpoint lsn.
	The return value tells whether a second scan is needed
	(see the use of rescan below). */
	rescan = recv_group_scan_log_recs(group, &contiguous_lsn, false);


	/* A corrupt log is tolerated only when srv_force_recovery is
	nonzero; a corrupt file system never is. */
	if ((recv_sys->found_corrupt_log && !srv_force_recovery)
	    || recv_sys->found_corrupt_fs) {
		log_mutex_exit();
		return(DB_ERROR);
	}

	if (recv_sys->mlog_checkpoint_lsn == 0) {
		/* The scan did not set mlog_checkpoint_lsn. If the scan
		nevertheless advanced past the checkpoint, the log is
		reported as unusable. */
		if (!srv_read_only_mode
		    && group->scanned_lsn != checkpoint_lsn) {
			ib::error() << "Ignoring the redo log due to missing"
				" MLOG_CHECKPOINT between the checkpoint "
				<< checkpoint_lsn << " and the end "
				<< group->scanned_lsn << ".";
			/* NOTE(review): the function already returned at
			the top when srv_force_recovery
			>= SRV_FORCE_NO_LOG_REDO, so this condition looks
			always true here unless the setting can change
			concurrently -- confirm. */
			if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
				log_mutex_exit();
				return(DB_ERROR);
			}
		}

		group->scanned_lsn = checkpoint_lsn;
		rescan = false;
	}

	/* NOTE: we always do a 'recovery' at startup, but only if
	there is something wrong we will print a message to the
	user about recovery: */

	if (checkpoint_lsn != flush_lsn) {

		if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
			ib::warn() << " Are you sure you are using the"
				" right ib_logfiles to start up the database?"
				" Log sequence number in the ib_logfiles is "
				<< checkpoint_lsn << ", less than the"
				" log sequence number in the first system"
				" tablespace file header, " << flush_lsn << ".";
		}

		if (!recv_needed_recovery) {

			ib::info() << "The log sequence number " << flush_lsn
				<< " in the system tablespace does not match"
				" the log sequence number " << checkpoint_lsn
				<< " in the ib_logfiles!";

			if (srv_read_only_mode) {
				ib::error() << "Can't initiate database"
					" recovery, running in read-only-mode.";
				log_mutex_exit();
				return(DB_READ_ONLY);
			}

			recv_init_crash_recovery();
		}
	}

	log_sys->lsn = recv_sys->recovered_lsn;

	if (recv_needed_recovery) {
		/* Check the tablespaces referenced by the scanned log
		before any second scan. */
		err = recv_init_crash_recovery_spaces();

		if (err != DB_SUCCESS) {
			log_mutex_exit();
			return(err);
		}

		if (rescan) {
			/* Second scan, again from the checkpoint lsn, this
			time with the last argument set to true. */
			contiguous_lsn = checkpoint_lsn;
			recv_group_scan_log_recs(group, &contiguous_lsn, true);

			if ((recv_sys->found_corrupt_log
			     && !srv_force_recovery)
			    || recv_sys->found_corrupt_fs) {
				log_mutex_exit();
				return(DB_ERROR);
			}
		}
	} else {
		buf_parallel_dblwr_finish_recovery();
		buf_parallel_dblwr_delete();
		ut_ad(!rescan || recv_sys->n_addrs == 0);
	}

	/* We currently have only one log group */

	/* This condition is only reported; execution continues. */
	if (group->scanned_lsn < checkpoint_lsn
	    || group->scanned_lsn < recv_max_page_lsn) {

		ib::error() << "We scanned the log up to " << group->scanned_lsn
			<< ". A checkpoint was at " << checkpoint_lsn << " and"
			" the maximum LSN on a database page was "
			<< recv_max_page_lsn << ". It is possible that the"
			" database is now corrupt!";
	}

	if (recv_sys->recovered_lsn < checkpoint_lsn) {
		log_mutex_exit();

		/* No harm in trying to do RO access. */
		if (!srv_read_only_mode) {
			ut_error;
		}

		return(DB_ERROR);
	}

	/* Synchronize the uncorrupted log groups to the most up-to-date log
	group; we also copy checkpoint info to groups */

	log_sys->next_checkpoint_lsn = checkpoint_lsn;
	log_sys->next_checkpoint_no = checkpoint_no + 1;

	recv_synchronize_groups();

	if (!recv_needed_recovery) {
		ut_a(checkpoint_lsn == recv_sys->recovered_lsn);
	} else {
		srv_start_lsn = recv_sys->recovered_lsn;
	}

	/* Seed the log buffer with the last recovered block and place
	the write position at the offset of log_sys->lsn within it. */
	ut_memcpy(log_sys->buf, recv_sys->last_block, OS_FILE_LOG_BLOCK_SIZE);

	log_sys->buf_free = (ulint) log_sys->lsn % OS_FILE_LOG_BLOCK_SIZE;
	log_sys->buf_next_to_write = log_sys->buf_free;
	log_sys->write_lsn = log_sys->lsn;

	log_sys->last_checkpoint_lsn = checkpoint_lsn;

	/* srv_init_log_online() is called without the log mutex. */
	log_mutex_exit();

	srv_init_log_online();

	log_mutex_enter();

	if (!srv_read_only_mode) {
		/* Write a MLOG_CHECKPOINT marker as the first thing,
		before generating any other redo log. */
		fil_names_clear(log_sys->last_checkpoint_lsn, true);
	}

	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
		    log_sys->lsn - log_sys->last_checkpoint_lsn);

	/* NOTE(review): same value as assigned before
	recv_synchronize_groups(); presumably re-applied in case it was
	modified in between -- confirm. */
	log_sys->next_checkpoint_no = checkpoint_no + 1;

	mutex_enter(&recv_sys->mutex);

	recv_sys->apply_log_recs = TRUE;

	mutex_exit(&recv_sys->mutex);

	log_mutex_exit();

	recv_lsn_checks_on = true;

	/* The database is now ready to start almost normal processing of user
	transactions: transaction rollbacks and the application of the log
	records in the hash table can be run in background. */

	return(DB_SUCCESS);
}
4319 
4320 /** Complete recovery from a checkpoint. */
4321 void
recv_recovery_from_checkpoint_finish(void)4322 recv_recovery_from_checkpoint_finish(void)
4323 {
4324 	/* Free the resources of the recovery system */
4325 	recv_recovery_on = false;
4326 
4327 	buf_flush_wait_LRU_batch_end();
4328 
4329 	recv_sys_debug_free();
4330 
4331 	/* Free up the flush_rbt. */
4332 	buf_flush_free_flush_rbt();
4333 
4334 	/* Validate a few system page types that were left uninitialized
4335 	by older versions of MySQL. */
4336 	mtr_t		mtr;
4337 	buf_block_t*	block;
4338 	mtr.start();
4339 	mtr.set_sys_modified();
4340 	/* Bitmap page types will be reset in buf_dblwr_check_block()
4341 	without redo logging. */
4342 	block = buf_page_get(
4343 		page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
4344 		univ_page_size, RW_X_LATCH, &mtr);
4345 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4346 	/* Already MySQL 3.23.53 initialized FSP_IBUF_TREE_ROOT_PAGE_NO
4347 	to FIL_PAGE_INDEX. No need to reset that one. */
4348 	block = buf_page_get(
4349 		page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
4350 		univ_page_size, RW_X_LATCH, &mtr);
4351 	fil_block_check_type(block, FIL_PAGE_TYPE_TRX_SYS, &mtr);
4352 	block = buf_page_get(
4353 		page_id_t(TRX_SYS_SPACE, FSP_FIRST_RSEG_PAGE_NO),
4354 		univ_page_size, RW_X_LATCH, &mtr);
4355 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4356 	block = buf_page_get(
4357 		page_id_t(TRX_SYS_SPACE, FSP_DICT_HDR_PAGE_NO),
4358 		univ_page_size, RW_X_LATCH, &mtr);
4359 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4360 	mtr.commit();
4361 
4362 	/* Roll back any recovered data dictionary transactions, so
4363 	that the data dictionary tables will be free of any locks.
4364 	The data dictionary latch should guarantee that there is at
4365 	most one data dictionary transaction active at a time. */
4366 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO) {
4367 		trx_rollback_or_clean_recovered(FALSE);
4368 	}
4369 }
4370 
4371 /********************************************************//**
4372 Initiates the rollback of active transactions. */
4373 void
recv_recovery_rollback_active(void)4374 recv_recovery_rollback_active(void)
4375 /*===============================*/
4376 {
4377 	/* Switch latching order checks on in sync0debug.cc, if
4378 	--innodb-sync-debug=true (default) */
4379 	ut_d(sync_check_enable());
4380 
4381 	/* We can't start any (DDL) transactions if UNDO logging
4382 	has been disabled, additionally disable ROLLBACK of recovered
4383 	user transactions. */
4384 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
4385 	    && !srv_read_only_mode) {
4386 
4387 		/* Drop partially created indexes. */
4388 		row_merge_drop_temp_indexes();
4389 		/* Drop temporary tables. */
4390 		row_mysql_drop_temp_tables();
4391 
4392 		/* Drop any auxiliary tables that were not dropped when the
4393 		parent table was dropped. This can happen if the parent table
4394 		was dropped but the server crashed before the auxiliary tables
4395 		were dropped. */
4396 		fts_drop_orphaned_tables();
4397 
4398 		/* Rollback the uncommitted transactions which have no user
4399 		session */
4400 
4401 		trx_rollback_or_clean_is_active = true;
4402 		os_thread_create(trx_rollback_or_clean_all_recovered, 0, 0);
4403 	}
4404 }
4405 
4406 /******************************************************//**
4407 Resets the logs. The contents of log files will be lost! */
4408 void
recv_reset_logs(lsn_t lsn)4409 recv_reset_logs(
4410 /*============*/
4411 	lsn_t		lsn)		/*!< in: reset to this lsn
4412 					rounded up to be divisible by
4413 					OS_FILE_LOG_BLOCK_SIZE, after
4414 					which we add
4415 					LOG_BLOCK_HDR_SIZE */
4416 {
4417 	log_group_t*	group;
4418 
4419 	ut_ad(log_mutex_own());
4420 
4421 	log_sys->lsn = ut_uint64_align_up(lsn, OS_FILE_LOG_BLOCK_SIZE);
4422 
4423 	group = UT_LIST_GET_FIRST(log_sys->log_groups);
4424 
4425 	while (group) {
4426 		group->lsn = log_sys->lsn;
4427 		group->lsn_offset = LOG_FILE_HDR_SIZE;
4428 		group = UT_LIST_GET_NEXT(log_groups, group);
4429 	}
4430 
4431 	log_sys->buf_next_to_write = 0;
4432 	log_sys->write_lsn = log_sys->lsn;
4433 
4434 	log_sys->next_checkpoint_no = 0;
4435 	log_sys->last_checkpoint_lsn = 0;
4436 
4437 	log_sys->tracked_lsn = log_sys->lsn;
4438 
4439 	log_block_init(log_sys->buf, log_sys->lsn);
4440 	log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE);
4441 
4442 	log_sys->buf_free = LOG_BLOCK_HDR_SIZE;
4443 	log_sys->lsn += LOG_BLOCK_HDR_SIZE;
4444 
4445 	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
4446 		    (log_sys->lsn - log_sys->last_checkpoint_lsn));
4447 
4448 	log_mutex_exit();
4449 
4450 	/* Reset the checkpoint fields in logs */
4451 
4452 	log_make_checkpoint_at(LSN_MAX, TRUE);
4453 
4454 	log_mutex_enter();
4455 }
4456 #endif /* !UNIV_HOTBACKUP */
4457 
4458 #ifdef UNIV_HOTBACKUP
4459 /******************************************************//**
4460 Creates new log files after a backup has been restored. */
4461 void
recv_reset_log_files_for_backup(const char * log_dir,ulint n_log_files,lsn_t log_file_size,lsn_t lsn)4462 recv_reset_log_files_for_backup(
4463 /*============================*/
4464 	const char*	log_dir,	/*!< in: log file directory path */
4465 	ulint		n_log_files,	/*!< in: number of log files */
4466 	lsn_t		log_file_size,	/*!< in: log file size */
4467 	lsn_t		lsn)		/*!< in: new start lsn, must be
4468 					divisible by OS_FILE_LOG_BLOCK_SIZE */
4469 {
4470 	os_file_t	log_file;
4471 	bool		success;
4472 	byte*		buf;
4473 	ulint		i;
4474 	ulint		log_dir_len;
4475 	char		name[5000];
4476 
4477 	log_dir_len = strlen(log_dir);
4478 	/* full path name of ib_logfile consists of log dir path + basename
4479 	+ number. This must fit in the name buffer.
4480 	*/
4481 	ut_a(log_dir_len + strlen(ib_logfile_basename) + 11  < sizeof(name));
4482 
4483 	buf = (byte*)ut_zalloc_nokey(LOG_FILE_HDR_SIZE +
4484 				     OS_FILE_LOG_BLOCK_SIZE);
4485 
4486 	for (i = 0; i < n_log_files; i++) {
4487 
4488 		sprintf(name, "%s%s%lu", log_dir,
4489 			ib_logfile_basename, (ulong) i);
4490 
4491 		log_file = os_file_create_simple(innodb_log_file_key,
4492 						 name, OS_FILE_CREATE,
4493 						 OS_FILE_READ_WRITE,
4494 						 srv_read_only_mode, &success);
4495 		if (!success) {
4496 			ib::fatal() << "Cannot create " << name << ". Check that"
4497 				" the file does not exist yet.";
4498 		}
4499 
4500 		ib::info() << "Setting log file size to " << log_file_size;
4501 
4502 		success = os_file_set_size(
4503 			name, log_file, log_file_size, srv_read_only_mode);
4504 
4505 		if (!success) {
4506 			ib::fatal() << "Cannot set " << name << " size to "
4507 				<< (long long unsigned)log_file_size;
4508 		}
4509 
4510 		os_file_flush(log_file);
4511 		os_file_close(log_file);
4512 	}
4513 
4514 	/* We pretend there is a checkpoint at lsn + LOG_BLOCK_HDR_SIZE */
4515 
4516 	log_reset_first_header_and_checkpoint(buf, lsn);
4517 
4518 	log_block_init(buf + LOG_FILE_HDR_SIZE, lsn);
4519 	log_block_set_first_rec_group(buf + LOG_FILE_HDR_SIZE,
4520 				      LOG_BLOCK_HDR_SIZE);
4521 	log_block_set_checksum(buf + LOG_FILE_HDR_SIZE,
4522 	log_block_calc_checksum_crc32(buf + LOG_FILE_HDR_SIZE));
4523 
4524 	log_block_set_checksum(buf, log_block_calc_checksum_crc32(buf));
4525 	sprintf(name, "%s%s%lu", log_dir, ib_logfile_basename, (ulong)0);
4526 
4527 	log_file = os_file_create_simple(innodb_log_file_key,
4528 					 name, OS_FILE_OPEN,
4529 					 OS_FILE_READ_WRITE,
4530 					 srv_read_only_mode, &success);
4531 	if (!success) {
4532 		ib::fatal() << "Cannot open " << name << ".";
4533 	}
4534 
4535 	IORequest	request(IORequest::WRITE);
4536 
4537 	dberr_t	err = os_file_write(
4538 		request, name, log_file, buf, 0,
4539 		LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE);
4540 
4541 	ut_a(err == DB_SUCCESS);
4542 
4543 	os_file_flush(log_file);
4544 	os_file_close(log_file);
4545 
4546 	ut_free(buf);
4547 }
4548 #endif /* UNIV_HOTBACKUP */
4549 
4550 /** Find a doublewrite copy of a page.
4551 @param[in]	space_id	tablespace identifier
4552 @param[in]	page_no		page number
4553 @return	page frame
4554 @retval NULL if no page was found */
4555 
4556 const byte*
find_page(ulint space_id,ulint page_no)4557 recv_dblwr_t::find_page(ulint space_id, ulint page_no)
4558 {
4559 	typedef std::vector<const byte*, ut_allocator<const byte*> >
4560 		matches_t;
4561 
4562 	matches_t	matches;
4563 	const byte*	result = 0;
4564 
4565 	for (list::iterator i = pages.begin(); i != pages.end(); ++i) {
4566 		if (page_get_space_id(*i) == space_id
4567 		    && page_get_page_no(*i) == page_no) {
4568 			matches.push_back(*i);
4569 		}
4570 	}
4571 
4572 	if (matches.size() == 1) {
4573 		result = matches[0];
4574 	} else if (matches.size() > 1) {
4575 
4576 		lsn_t max_lsn	= 0;
4577 		lsn_t page_lsn	= 0;
4578 
4579 		for (matches_t::iterator i = matches.begin();
4580 		     i != matches.end();
4581 		     ++i) {
4582 
4583 			page_lsn = mach_read_from_8(*i + FIL_PAGE_LSN);
4584 
4585 			if (page_lsn > max_lsn) {
4586 				max_lsn = page_lsn;
4587 				result = *i;
4588 			}
4589 		}
4590 	}
4591 
4592 	return(result);
4593 }
4594 
4595 /** Decrypt double write buffer pages if system tablespace is
4596 encrypted. This function process only pages from sys_pages list.
4597 Other pages from parallel doublewrite buffer will be decrypted after
4598 tablespace objects are loaded. */
4599 void
decrypt_sys_dblwr_pages()4600 recv_dblwr_t::decrypt_sys_dblwr_pages()
4601 {
4602 	fil_space_t*	space = fil_space_get(TRX_SYS_SPACE);
4603 
4604 	ut_ad(FSP_FLAGS_GET_ENCRYPTION(space->flags));
4605 
4606 	IORequest	decrypt_request;
4607 
4608 	decrypt_request.encryption_key(
4609 			space->encryption_key,
4610 			space->encryption_klen,
4611 			false,
4612 			space->encryption_iv,
4613                         0, 0, NULL, NULL);
4614 
4615 	decrypt_request.encryption_algorithm(
4616 		Encryption::AES);
4617 
4618 	Encryption	encryption(
4619 		decrypt_request.encryption_algorithm());
4620 
4621 	for (list::iterator i = sys_pages.begin(); i != sys_pages.end(); ++i) {
4622 		byte*	page = *i;
4623 
4624 		/* System tablespace encryption key will be used to decrypt the
4625 		page, not the tablespace key of the page. These pages are encrypted
4626 		with system tablespace encryption key. */
4627 		dberr_t	err = encryption.decrypt(
4628 			decrypt_request,
4629 			page, univ_page_size.physical(), NULL,
4630 			univ_page_size.physical());
4631 		ut_a(err == DB_SUCCESS);
4632 	}
4633 }
4634 
4635 #ifndef NDEBUG
4636 /** Return string name of the redo log record type.
4637 @param[in]	type	record log record enum
4638 @return string name of record log record */
4639 const char*
get_mlog_string(mlog_id_t type)4640 get_mlog_string(mlog_id_t type)
4641 {
4642 	switch (type) {
4643 	case MLOG_SINGLE_REC_FLAG:
4644 		return("MLOG_SINGLE_REC_FLAG");
4645 
4646 	case MLOG_1BYTE:
4647 		return("MLOG_1BYTE");
4648 
4649 	case MLOG_2BYTES:
4650 		return("MLOG_2BYTES");
4651 
4652 	case MLOG_4BYTES:
4653 		return("MLOG_4BYTES");
4654 
4655 	case MLOG_8BYTES:
4656 		return("MLOG_8BYTES");
4657 
4658 	case MLOG_REC_INSERT:
4659 		return("MLOG_REC_INSERT");
4660 
4661 	case MLOG_REC_CLUST_DELETE_MARK:
4662 		return("MLOG_REC_CLUST_DELETE_MARK");
4663 
4664 	case MLOG_REC_SEC_DELETE_MARK:
4665 		return("MLOG_REC_SEC_DELETE_MARK");
4666 
4667 	case MLOG_REC_UPDATE_IN_PLACE:
4668 		return("MLOG_REC_UPDATE_IN_PLACE");
4669 
4670 	case MLOG_REC_DELETE:
4671 		return("MLOG_REC_DELETE");
4672 
4673 	case MLOG_LIST_END_DELETE:
4674 		return("MLOG_LIST_END_DELETE");
4675 
4676 	case MLOG_LIST_START_DELETE:
4677 		return("MLOG_LIST_START_DELETE");
4678 
4679 	case MLOG_LIST_END_COPY_CREATED:
4680 		return("MLOG_LIST_END_COPY_CREATED");
4681 
4682 	case MLOG_PAGE_REORGANIZE:
4683 		return("MLOG_PAGE_REORGANIZE");
4684 
4685 	case MLOG_PAGE_CREATE:
4686 		return("MLOG_PAGE_CREATE");
4687 
4688 	case MLOG_UNDO_INSERT:
4689 		return("MLOG_UNDO_INSERT");
4690 
4691 	case MLOG_UNDO_ERASE_END:
4692 		return("MLOG_UNDO_ERASE_END");
4693 
4694 	case MLOG_UNDO_INIT:
4695 		return("MLOG_UNDO_INIT");
4696 
4697 	case MLOG_UNDO_HDR_DISCARD:
4698 		return("MLOG_UNDO_HDR_DISCARD");
4699 
4700 	case MLOG_UNDO_HDR_REUSE:
4701 		return("MLOG_UNDO_HDR_REUSE");
4702 
4703 	case MLOG_UNDO_HDR_CREATE:
4704 		return("MLOG_UNDO_HDR_CREATE");
4705 
4706 	case MLOG_REC_MIN_MARK:
4707 		return("MLOG_REC_MIN_MARK");
4708 
4709 	case MLOG_IBUF_BITMAP_INIT:
4710 		return("MLOG_IBUF_BITMAP_INIT");
4711 
4712 #ifdef UNIV_LOG_LSN_DEBUG
4713 	case MLOG_LSN:
4714 		return("MLOG_LSN");
4715 #endif /* UNIV_LOG_LSN_DEBUG */
4716 
4717 	case MLOG_INIT_FILE_PAGE:
4718 		return("MLOG_INIT_FILE_PAGE");
4719 
4720 	case MLOG_WRITE_STRING:
4721 		return("MLOG_WRITE_STRING");
4722 
4723 	case MLOG_MULTI_REC_END:
4724 		return("MLOG_MULTI_REC_END");
4725 
4726 	case MLOG_DUMMY_RECORD:
4727 		return("MLOG_DUMMY_RECORD");
4728 
4729 	case MLOG_FILE_DELETE:
4730 		return("MLOG_FILE_DELETE");
4731 
4732 	case MLOG_COMP_REC_MIN_MARK:
4733 		return("MLOG_COMP_REC_MIN_MARK");
4734 
4735 	case MLOG_COMP_PAGE_CREATE:
4736 		return("MLOG_COMP_PAGE_CREATE");
4737 
4738 	case MLOG_COMP_REC_INSERT:
4739 		return("MLOG_COMP_REC_INSERT");
4740 
4741 	case MLOG_COMP_REC_CLUST_DELETE_MARK:
4742 		return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4743 
4744 	case MLOG_COMP_REC_SEC_DELETE_MARK:
4745 		return("MLOG_COMP_REC_SEC_DELETE_MARK");
4746 
4747 	case MLOG_COMP_REC_UPDATE_IN_PLACE:
4748 		return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4749 
4750 	case MLOG_COMP_REC_DELETE:
4751 		return("MLOG_COMP_REC_DELETE");
4752 
4753 	case MLOG_COMP_LIST_END_DELETE:
4754 		return("MLOG_COMP_LIST_END_DELETE");
4755 
4756 	case MLOG_COMP_LIST_START_DELETE:
4757 		return("MLOG_COMP_LIST_START_DELETE");
4758 
4759 	case MLOG_COMP_LIST_END_COPY_CREATED:
4760 		return("MLOG_COMP_LIST_END_COPY_CREATED");
4761 
4762 	case MLOG_COMP_PAGE_REORGANIZE:
4763 		return("MLOG_COMP_PAGE_REORGANIZE");
4764 
4765 	case MLOG_FILE_CREATE2:
4766 		return("MLOG_FILE_CREATE2");
4767 
4768 	case MLOG_ZIP_WRITE_NODE_PTR:
4769 		return("MLOG_ZIP_WRITE_NODE_PTR");
4770 
4771 	case MLOG_ZIP_WRITE_BLOB_PTR:
4772 		return("MLOG_ZIP_WRITE_BLOB_PTR");
4773 
4774 	case MLOG_ZIP_WRITE_HEADER:
4775 		return("MLOG_ZIP_WRITE_HEADER");
4776 
4777 	case MLOG_ZIP_PAGE_COMPRESS:
4778 		return("MLOG_ZIP_PAGE_COMPRESS");
4779 
4780 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4781 		return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4782 
4783 	case MLOG_ZIP_PAGE_REORGANIZE:
4784 		return("MLOG_ZIP_PAGE_REORGANIZE");
4785 
4786 	case MLOG_FILE_RENAME2:
4787 		return("MLOG_FILE_RENAME2");
4788 
4789 	case MLOG_FILE_NAME:
4790 		return("MLOG_FILE_NAME");
4791 
4792 	case MLOG_CHECKPOINT:
4793 		return("MLOG_CHECKPOINT");
4794 
4795 	case MLOG_PAGE_CREATE_RTREE:
4796 		return("MLOG_PAGE_CREATE_RTREE");
4797 
4798 	case MLOG_COMP_PAGE_CREATE_RTREE:
4799 		return("MLOG_COMP_PAGE_CREATE_RTREE");
4800 
4801 	case MLOG_INIT_FILE_PAGE2:
4802 		return("MLOG_INIT_FILE_PAGE2");
4803 
4804 	case MLOG_INDEX_LOAD:
4805 		return("MLOG_INDEX_LOAD");
4806 
4807 	case MLOG_TRUNCATE:
4808 		return("MLOG_TRUNCATE");
4809 	}
4810 	assert(0);
4811 	return(NULL);
4812 }
4813 #endif /* !NDEBUG */
4814