1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2012, Facebook Inc.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9 
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation.  The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16 
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 GNU General Public License, version 2.0, for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25 
26 *****************************************************************************/
27 
28 /**************************************************//**
29 @file log/log0recv.cc
30 Recovery
31 
32 Created 9/20/1997 Heikki Tuuri
33 *******************************************************/
34 
35 #include "ha_prototypes.h"
36 
37 #include <vector>
38 #include <map>
39 #include <string>
40 
41 #include "log0recv.h"
42 
43 #ifdef UNIV_NONINL
44 #include "log0recv.ic"
45 #endif
46 
47 #include <my_aes.h>
48 
49 #include "mem0mem.h"
50 #include "buf0buf.h"
51 #include "buf0flu.h"
52 #include "mtr0mtr.h"
53 #include "mtr0log.h"
54 #include "page0cur.h"
55 #include "page0zip.h"
56 #include "btr0btr.h"
57 #include "btr0cur.h"
58 #include "ibuf0ibuf.h"
59 #include "trx0undo.h"
60 #include "trx0rec.h"
61 #include "fil0fil.h"
62 #include "fsp0sysspace.h"
63 #include "ut0new.h"
64 #include "row0trunc.h"
65 #ifndef UNIV_HOTBACKUP
66 # include "buf0rea.h"
67 # include "srv0srv.h"
68 # include "srv0start.h"
69 # include "trx0roll.h"
70 # include "row0merge.h"
71 #else /* !UNIV_HOTBACKUP */
72 /** This is set to false if the backup was originally taken with the
73 mysqlbackup --include regexp option: then we do not want to create tables in
74 directories which were not included */
75 bool	recv_replay_file_ops	= true;
76 #include "fut0lst.h"
77 #endif /* !UNIV_HOTBACKUP */
78 
79 /** Log records are stored in the hash table in chunks at most of this size;
80 this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
81 #define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))
82 
83 /** Read-ahead area in applying log records to file pages */
84 #define RECV_READ_AHEAD_AREA	32
85 
86 /** The recovery system */
87 recv_sys_t*	recv_sys = NULL;
88 /** TRUE when applying redo log records during crash recovery; FALSE
89 otherwise.  Note that this is FALSE while a background thread is
90 rolling back incomplete transactions. */
91 volatile bool	recv_recovery_on;
92 
93 #ifndef UNIV_HOTBACKUP
94 /** TRUE when recv_init_crash_recovery() has been called. */
95 bool	recv_needed_recovery;
96 #else
97 # define recv_needed_recovery			false
98 # define buf_pool_get_curr_size() (5 * 1024 * 1024)
99 #endif /* !UNIV_HOTBACKUP */
100 # ifdef UNIV_DEBUG
101 /** TRUE if writing to the redo log (mtr_commit) is forbidden.
102 Protected by log_sys->mutex. */
103 bool	recv_no_log_write = false;
104 # endif /* UNIV_DEBUG */
105 
106 /** TRUE if buf_page_is_corrupted() should check if the log sequence
107 number (FIL_PAGE_LSN) is in the future.  Initially FALSE, and set by
108 recv_recovery_from_checkpoint_start(). */
109 bool	recv_lsn_checks_on;
110 
111 /** If the following is TRUE, the buffer pool file pages must be invalidated
112 after recovery and no ibuf operations are allowed; this becomes TRUE if
113 the log record hash table becomes too full, and log records must be merged
114 to file pages already before the recovery is finished: in this case no
115 ibuf operations are allowed, as they could modify the pages read in the
116 buffer pool before the pages have been recovered to the up-to-date state.
117 
118 TRUE means that recovery is running and no operations on the log files
119 are allowed yet: the variable name is misleading. */
120 #ifndef UNIV_HOTBACKUP
121 bool	recv_no_ibuf_operations;
122 /** TRUE when the redo log is being backed up */
123 # define recv_is_making_a_backup		false
124 /** TRUE when recovering from a backed up redo log file */
125 # define recv_is_from_backup			false
126 #else /* !UNIV_HOTBACKUP */
127 /** true if the backup is an offline backup */
128 volatile bool is_online_redo_copy = true;
/** The last flushed lsn read at the start of backup */
130 volatile lsn_t backup_redo_log_flushed_lsn;
131 
132 /** TRUE when the redo log is being backed up */
133 bool	recv_is_making_a_backup	= false;
134 /** TRUE when recovering from a backed up redo log file */
135 bool	recv_is_from_backup	= false;
136 # define buf_pool_get_curr_size() (5 * 1024 * 1024)
137 #endif /* !UNIV_HOTBACKUP */
138 /** The following counter is used to decide when to print info on
139 log scan */
140 static ulint	recv_scan_print_counter;
141 
142 /** The type of the previous parsed redo log record */
143 static mlog_id_t	recv_previous_parsed_rec_type;
144 /** The offset of the previous parsed redo log record */
145 static ulint	recv_previous_parsed_rec_offset;
146 /** The 'multi' flag of the previous parsed redo log record */
147 static ulint	recv_previous_parsed_rec_is_multi;
148 
149 /** This many frames must be left free in the buffer pool when we scan
150 the log and store the scanned log records in the buffer pool: we will
151 use these free frames to read in pages when we start applying the
152 log records to the database.
153 This is the default value. If the actual size of the buffer pool is
154 larger than 10 MB we'll set this value to 512. */
155 ulint	recv_n_pool_free_frames;
156 
157 /** The maximum lsn we see for a page during the recovery process. If this
158 is bigger than the lsn we are able to scan up to, that is an indication that
159 the recovery failed and the database may be corrupt. */
160 lsn_t	recv_max_page_lsn;
161 
162 #ifdef UNIV_PFS_THREAD
163 mysql_pfs_key_t	trx_rollback_clean_thread_key;
164 #endif /* UNIV_PFS_THREAD */
165 
166 #ifndef UNIV_HOTBACKUP
167 # ifdef UNIV_PFS_THREAD
168 mysql_pfs_key_t	recv_writer_thread_key;
169 # endif /* UNIV_PFS_THREAD */
170 
171 /** Flag indicating if recv_writer thread is active. */
172 volatile bool	recv_writer_thread_active = false;
173 #endif /* !UNIV_HOTBACKUP */
174 
175 #ifndef	NDEBUG
176 /** Return string name of the redo log record type.
177 @param[in]	type	record log record enum
178 @return string name of record log record */
179 const char*
180 get_mlog_string(mlog_id_t type);
181 #endif /* !NDEBUG */
182 
183 /* prototypes */
184 
185 #ifndef UNIV_HOTBACKUP
186 /*******************************************************//**
187 Initialize crash recovery environment. Can be called iff
188 recv_needed_recovery == false. */
189 static
190 void
191 recv_init_crash_recovery(void);
192 /*===========================*/
193 #endif /* !UNIV_HOTBACKUP */
194 
195 /** Tablespace item during recovery */
196 struct file_name_t {
197 	/** Tablespace file name (MLOG_FILE_NAME) */
198 	std::string	name;
199 	/** Tablespace object (NULL if not valid or not found) */
200 	fil_space_t*	space;
201 	/** Whether the tablespace has been deleted */
202 	bool		deleted;
203 
204 	/** Constructor */
file_name_tfile_name_t205 	file_name_t(std::string name_, bool deleted_) :
206 		name(name_), space(NULL), deleted (deleted_) {}
207 };
208 
/** Map of dirty tablespaces during recovery.
The key is the tablespace ID; the value records the file name, the
tablespace object (if any) and the deleted flag for that ID.
Elements are allocated through ut_allocator. */
typedef std::map<
	ulint,
	file_name_t,
	std::less<ulint>,
	ut_allocator<std::pair<const ulint, file_name_t> > >	recv_spaces_t;

/** The map instance; entries are added while MLOG_FILE_* records are
processed and the map is emptied in recv_sys_close(). */
static recv_spaces_t	recv_spaces;
217 
218 /** Process a file name from a MLOG_FILE_* record.
219 @param[in,out]	name		file name
220 @param[in]	len		length of the file name
221 @param[in]	space_id	the tablespace ID
222 @param[in]	deleted		whether this is a MLOG_FILE_DELETE record
223 @retval true if able to process file successfully.
224 @retval false if unable to process the file */
225 static
226 bool
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)227 fil_name_process(
228 	char*	name,
229 	ulint	len,
230 	ulint	space_id,
231 	bool	deleted)
232 {
233 	bool	processed = true;
234 
235 	/* We will also insert space=NULL into the map, so that
236 	further checks can ensure that a MLOG_FILE_NAME record was
237 	scanned before applying any page records for the space_id. */
238 
239 	os_normalize_path(name);
240 	file_name_t	fname(std::string(name, len - 1), deleted);
241 	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
242 		std::make_pair(space_id, fname));
243 	ut_ad(p.first->first == space_id);
244 
245 	file_name_t&	f = p.first->second;
246 
247 	if (deleted) {
248 		/* Got MLOG_FILE_DELETE */
249 
250 		if (!p.second && !f.deleted) {
251 			f.deleted = true;
252 			if (f.space != NULL) {
253 				fil_space_free(space_id, false);
254 				f.space = NULL;
255 			}
256 		}
257 
258 		ut_ad(f.space == NULL);
259 	} else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
260 		   || f.name != fname.name) {
261 		fil_space_t*	space;
262 
263 		/* Check if the tablespace file exists and contains
264 		the space_id. If not, ignore the file after displaying
265 		a note. Abort if there are multiple files with the
266 		same space_id. */
267 		switch (fil_ibd_load(space_id, name, space)) {
268 		case FIL_LOAD_OK:
269 			ut_ad(space != NULL);
270 
271 			/* For encrypted tablespace, set key and iv. */
272 			if (FSP_FLAGS_GET_ENCRYPTION(space->flags)
273 			    && recv_sys->encryption_list != NULL) {
274 				dberr_t				err;
275 				encryption_list_t::iterator	it;
276 
277 				for (it = recv_sys->encryption_list->begin();
278 				     it != recv_sys->encryption_list->end();
279 				     it++) {
280 					if (it->space_id == space->id) {
281 						err = fil_set_encryption(
282 							space->id,
283 							Encryption::AES,
284 							it->key,
285 							it->iv);
286 						if (err != DB_SUCCESS) {
287 							ib::error()
288 								<< "Can't set"
289 								" encryption"
290 								" information"
291 								" for"
292 								" tablespace"
293 								<< space->name
294 								<< "!";
295 						}
296 						ut_free(it->key);
297 						ut_free(it->iv);
298 						it->key = NULL;
299 						it->iv = NULL;
300 						it->space_id = 0;
301 					}
302 				}
303 			}
304 
305 			if (f.space == NULL || f.space == space) {
306 				f.name = fname.name;
307 				f.space = space;
308 				f.deleted = false;
309 			} else {
310 				ib::error() << "Tablespace " << space_id
311 					<< " has been found in two places: '"
312 					<< f.name << "' and '" << name << "'."
313 					" You must delete one of them.";
314 				recv_sys->found_corrupt_fs = true;
315 				processed = false;
316 			}
317 			break;
318 
319 		case FIL_LOAD_ID_CHANGED:
320 			ut_ad(space == NULL);
321 			break;
322 
323 		case FIL_LOAD_NOT_FOUND:
324 			/* No matching tablespace was found; maybe it
325 			was renamed, and we will find a subsequent
326 			MLOG_FILE_* record. */
327 			ut_ad(space == NULL);
328 
329 			if (srv_force_recovery) {
330 				/* Without innodb_force_recovery,
331 				missing tablespaces will only be
332 				reported in
333 				recv_init_crash_recovery_spaces().
334 				Enable some more diagnostics when
335 				forcing recovery. */
336 
337 				ib::info()
338 					<< "At LSN: " << recv_sys->recovered_lsn
339 					<< ": unable to open file " << name
340 					<< " for tablespace " << space_id;
341 			}
342 			break;
343 
344 		case FIL_LOAD_INVALID:
345 			ut_ad(space == NULL);
346 			if (srv_force_recovery == 0) {
347 #ifndef UNIV_HOTBACKUP
348 				ib::warn() << "We do not continue the crash"
349 					" recovery, because the table may"
350 					" become corrupt if we cannot apply"
351 					" the log records in the InnoDB log to"
352 					" it. To fix the problem and start"
353 					" mysqld:";
354 				ib::info() << "1) If there is a permission"
355 					" problem in the file and mysqld"
356 					" cannot open the file, you should"
357 					" modify the permissions.";
358 				ib::info() << "2) If the tablespace is not"
359 					" needed, or you can restore an older"
360 					" version from a backup, then you can"
361 					" remove the .ibd file, and use"
362 					" --innodb_force_recovery=1 to force"
363 					" startup without this file.";
364 				ib::info() << "3) If the file system or the"
365 					" disk is broken, and you cannot"
366 					" remove the .ibd file, you can set"
367 					" --innodb_force_recovery.";
368 				recv_sys->found_corrupt_fs = true;
369 #else
370 				ib::warn() << "We do not continue the apply-log"
371 					" operation because the tablespace may"
372 					" become corrupt if we cannot apply"
373 					" the log records in the redo log"
374 					" records to it.";
375 #endif /* !UNIV_BACKUP  */
376 				processed = false;
377 				break;
378 			}
379 
380 			ib::info() << "innodb_force_recovery was set to "
381 				<< srv_force_recovery << ". Continuing crash"
382 				" recovery even though we cannot access the"
383 				" files for tablespace " << space_id << ".";
384 			break;
385 		}
386 	}
387 	return(processed);
388 }
389 
390 #ifndef UNIV_HOTBACKUP
391 /** Parse or process a MLOG_FILE_* record.
392 @param[in]	ptr		redo log record
393 @param[in]	end		end of the redo log buffer
394 @param[in]	space_id	the tablespace ID
395 @param[in]	first_page_no	first page number in the file
396 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
397 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
398 @return pointer to next redo log record
399 @retval NULL if this log record was truncated */
400 static
401 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type)402 fil_name_parse(
403 	byte*		ptr,
404 	const byte*	end,
405 	ulint		space_id,
406 	ulint		first_page_no,
407 	mlog_id_t	type)
408 {
409 	if (type == MLOG_FILE_CREATE2) {
410 		if (end < ptr + 4) {
411 			return(NULL);
412 		}
413 		ptr += 4;
414 	}
415 
416 	if (end < ptr + 2) {
417 		return(NULL);
418 	}
419 
420 	ulint	len = mach_read_from_2(ptr);
421 	ptr += 2;
422 	if (end < ptr + len) {
423 		return(NULL);
424 	}
425 
426 	/* MLOG_FILE_* records should only be written for
427 	user-created tablespaces. The name must be long enough
428 	and end in .ibd. */
429 	bool corrupt = is_predefined_tablespace(space_id)
430 		|| first_page_no != 0 // TODO: multi-file user tablespaces
431 		|| len < sizeof "/a.ibd\0"
432 		|| memcmp(ptr + len - 5, DOT_IBD, 5) != 0
433 		|| memchr(ptr, OS_PATH_SEPARATOR, len) == NULL;
434 
435 	byte*	end_ptr	= ptr + len;
436 
437 	switch (type) {
438 	default:
439 		ut_ad(0); // the caller checked this
440 	case MLOG_FILE_NAME:
441 		if (corrupt) {
442 			recv_sys->found_corrupt_log = true;
443 			break;
444 		}
445 
446 		fil_name_process(
447 			reinterpret_cast<char*>(ptr), len, space_id, false);
448 		break;
449 	case MLOG_FILE_DELETE:
450 		if (corrupt) {
451 			recv_sys->found_corrupt_log = true;
452 			break;
453 		}
454 
455 		fil_name_process(
456 			reinterpret_cast<char*>(ptr), len, space_id, true);
457 
458 		break;
459 	case MLOG_FILE_CREATE2:
460 		break;
461 	case MLOG_FILE_RENAME2:
462 		if (corrupt) {
463 			recv_sys->found_corrupt_log = true;
464 		}
465 
466 		/* The new name follows the old name. */
467 		byte*	new_name = end_ptr + 2;
468 		if (end < new_name) {
469 			return(NULL);
470 		}
471 
472 		ulint	new_len = mach_read_from_2(end_ptr);
473 
474 		if (end < end_ptr + 2 + new_len) {
475 			return(NULL);
476 		}
477 
478 		end_ptr += 2 + new_len;
479 
480 		corrupt = corrupt
481 			|| new_len < sizeof "/a.ibd\0"
482 			|| memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0
483 			|| !memchr(new_name, OS_PATH_SEPARATOR, new_len);
484 
485 		if (corrupt) {
486 			recv_sys->found_corrupt_log = true;
487 			break;
488 		}
489 
490 		fil_name_process(
491 			reinterpret_cast<char*>(ptr), len,
492 			space_id, false);
493 		fil_name_process(
494 			reinterpret_cast<char*>(new_name), new_len,
495 			space_id, false);
496 
497 		if (!fil_op_replay_rename(
498 			    space_id, first_page_no,
499 			    reinterpret_cast<const char*>(ptr),
500 			    reinterpret_cast<const char*>(new_name))) {
501 			recv_sys->found_corrupt_fs = true;
502 		}
503 	}
504 
505 	return(end_ptr);
506 }
507 #else /* !UNIV_HOTBACKUP */
508 /** Parse a file name retrieved from a MLOG_FILE_* record,
509 and return the absolute file path corresponds to backup dir
510 as well as in the form of database/tablespace
511 @param[in]	file_name		path emitted by the redo log
512 @param[out]	absolute_path	absolute path of tablespace
513 corresponds to backup dir
514 @param[out]	tablespace_name	name in the form of database/table */
515 static
516 void
make_abs_file_path(const std::string & name,std::string & absolute_path,std::string & tablespace_name)517 make_abs_file_path(
518 	const std::string&	name,
519 	std::string&		absolute_path,
520 	std::string&		tablespace_name)
521 {
522 	std::string file_name = name;
523 	std::string path = fil_path_to_mysql_datadir;
524 	size_t pos = std::string::npos;
525 
526 	if (is_absolute_path(file_name.c_str())) {
527 
528 		pos = file_name.rfind(OS_PATH_SEPARATOR);
529 		std::string temp_name = file_name.substr(0, pos);
530 		pos = temp_name.rfind(OS_PATH_SEPARATOR);
531 		++pos;
532 		file_name = file_name.substr(pos, file_name.length());
533 		path += OS_PATH_SEPARATOR + file_name;
534 	} else {
535 		pos = file_name.find(OS_PATH_SEPARATOR);
536 		++pos;
537 		file_name = file_name.substr(pos, file_name.length());
538 		path += OS_PATH_SEPARATOR + file_name;
539 	}
540 
541 	absolute_path = path;
542 
543 	/* remove the .ibd extension */
544 	pos = file_name.rfind(".ibd");
545 	if (pos != std::string::npos)
546 		tablespace_name = file_name.substr(0, pos);
547 
548 	/* space->name uses '/', not OS_PATH_SEPARATOR,
549 	update the seperator */
550 	if (OS_PATH_SEPARATOR != '/') {
551 		pos = tablespace_name.find(OS_PATH_SEPARATOR);
552 		while (pos != std::string::npos) {
553 			tablespace_name[pos] = '/';
554 			pos = tablespace_name.find(OS_PATH_SEPARATOR);
555 		}
556 	}
557 
558 }
559 
560 /** Wrapper around fil_name_process()
561 @param[in]	name		absolute path of tablespace file
562 @param[in]	space_id	the tablespace ID
563 @retval		true		if able to process file successfully.
564 @retval		false		if unable to process the file */
565 bool
fil_name_process(const char * name,ulint space_id)566 fil_name_process(
567 	const char*	name,
568 	ulint	space_id)
569 {
570 	size_t length = strlen(name);
571 	++length;
572 
573 	char* file_name = static_cast<char*>(ut_malloc_nokey(length));
574 	strncpy(file_name, name,length);
575 
576 	bool processed = fil_name_process(file_name, length, space_id, false);
577 
578 	ut_free(file_name);
579 	return(processed);
580 }
581 
582 /** Parse or process a MLOG_FILE_* record.
583 @param[in]	ptr		redo log record
584 @param[in]	end		end of the redo log buffer
585 @param[in]	space_id	the tablespace ID
586 @param[in]	first_page_no	first page number in the file
587 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
588 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
589 @retval	pointer to next redo log record
590 @retval	NULL if this log record was truncated */
591 static
592 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type)593 fil_name_parse(
594 	byte*		ptr,
595 	const byte*	end,
596 	ulint		space_id,
597 	ulint		first_page_no,
598 	mlog_id_t	type)
599 {
600 
601 	ulint flags = mach_read_from_4(ptr);
602 
603 	if (type == MLOG_FILE_CREATE2) {
604 		if (end < ptr + 4) {
605 			return(NULL);
606 		}
607 		ptr += 4;
608 	}
609 
610 	if (end < ptr + 2) {
611 		return(NULL);
612 	}
613 
614 	ulint	len = mach_read_from_2(ptr);
615 	ptr += 2;
616 	if (end < ptr + len) {
617 		return(NULL);
618 	}
619 
620 	os_normalize_path(reinterpret_cast<char*>(ptr));
621 
622 	/* MLOG_FILE_* records should only be written for
623 	user-created tablespaces. The name must be long enough
624 	and end in .ibd. */
625 	bool corrupt = is_predefined_tablespace(space_id)
626 		|| first_page_no != 0 // TODO: multi-file user tablespaces
627 		|| len < sizeof "/a.ibd\0"
628 		|| memcmp(ptr + len - 5, DOT_IBD, 5) != 0
629 		|| memchr(ptr, OS_PATH_SEPARATOR, len) == NULL;
630 
631 	byte*	end_ptr = ptr + len;
632 
633 	if (corrupt) {
634 		recv_sys->found_corrupt_log = true;
635 		return(end_ptr);
636 	}
637 
638 	std::string abs_file_path, tablespace_name;
639 	char* name = reinterpret_cast<char*>(ptr);
640 	char* new_name = NULL;
641 	recv_spaces_t::iterator itr;
642 
643 	make_abs_file_path(name, abs_file_path, tablespace_name);
644 
645 	if (!recv_is_making_a_backup) {
646 
647 		name = static_cast<char*>(ut_malloc_nokey(
648 			(abs_file_path.length() + 1)));
649 		strcpy(name, abs_file_path.c_str());
650 		len = strlen(name) + 1;
651 	}
652 	switch (type) {
653 	default:
654 		ut_ad(0); // the caller checked this
655 	case MLOG_FILE_NAME:
656 		/* Don't validate tablespaces while copying redo logs
657 		because backup process might keep some tablespace handles
658 		open in server datadir.
659 		Maintain "map of dirty tablespaces" so that assumptions
660 		for other redo log records are not broken even for dirty
661 		tablespaces during apply log */
662 		if (!recv_is_making_a_backup) {
663 			recv_spaces.insert(std::make_pair(space_id,
664 						file_name_t(abs_file_path,
665 						false)));
666 		}
667 		break;
668 	case MLOG_FILE_DELETE:
669 		/* Don't validate tablespaces while copying redo logs
670 		because backup process might keep some tablespace handles
671 		open in server datadir. */
672 		if (recv_is_making_a_backup)
673 			break;
674 
675 		fil_name_process(
676 			name, len, space_id, true);
677 
678 		if (recv_replay_file_ops
679 			&& fil_space_get(space_id)) {
680 			dberr_t	err = fil_delete_tablespace(
681 				space_id, BUF_REMOVE_FLUSH_NO_WRITE);
682 			ut_a(err == DB_SUCCESS);
683 		}
684 
685 		break;
686 	case MLOG_FILE_CREATE2:
687 		if (recv_is_making_a_backup
688 		    || (!recv_replay_file_ops)
689 		    || (is_intermediate_file(abs_file_path.c_str()))
690 		    || (fil_space_get(space_id))
691 		    || (fil_space_get_id_by_name(
692 				tablespace_name.c_str()) != ULINT_UNDEFINED)) {
693 			/* Don't create table while :-
694 			1. scanning the redo logs during backup
695 			2. apply-log on a partial backup
696 			3. if it is intermediate file
697 			4. tablespace is already loaded in memory */
698 		} else {
699 			itr = recv_spaces.find(space_id);
700 			if (itr == recv_spaces.end()
701 				|| (itr->second.name != abs_file_path)) {
702 
703 				dberr_t ret = fil_ibd_create(
704 					space_id, tablespace_name.c_str(),
705 					abs_file_path.c_str(),
706 					flags, FIL_IBD_FILE_INITIAL_SIZE);
707 
708 				if (ret != DB_SUCCESS) {
709 					ib::fatal() << "Could not create the"
710 						<< " tablespace : "
711 						<< abs_file_path
712 						<< " with space Id : "
713 						<< space_id;
714 				}
715 			}
716 		}
717 		break;
718 	case MLOG_FILE_RENAME2:
719 		/* The new name follows the old name. */
720 		byte*	new_table_name = end_ptr + 2;
721 		if (end < new_table_name) {
722 			return(NULL);
723 		}
724 
725 		ulint	new_len = mach_read_from_2(end_ptr);
726 
727 		if (end < end_ptr + 2 + new_len) {
728 			return(NULL);
729 		}
730 
731 		end_ptr += 2 + new_len;
732 
733 		char* new_table = reinterpret_cast<char*>(new_table_name);
734 		os_normalize_path(new_table);
735 
736 		corrupt = corrupt
737 			|| new_len < sizeof "/a.ibd\0"
738 			|| memcmp(new_table_name + new_len - 5, DOT_IBD, 5) != 0
739 			|| !memchr(new_table_name, OS_PATH_SEPARATOR, new_len);
740 
741 		if (corrupt) {
742 			recv_sys->found_corrupt_log = true;
743 			break;
744 		}
745 
746 		if (recv_is_making_a_backup
747 		    || (!recv_replay_file_ops)
748 		    || (is_intermediate_file(name))
749 		    || (is_intermediate_file(new_table))) {
750 			/* Don't rename table while :-
751 			1. scanning the redo logs during backup
752 			2. apply-log on a partial backup
753 			3. The new name is already used.
754 			4. A tablespace is not open in memory with the old name.
755 			This will prevent unintended renames during recovery. */
756 			break;
757 		} else {
758 			make_abs_file_path(new_table, abs_file_path,
759 					   tablespace_name);
760 
761 			new_name = static_cast<char*>(ut_malloc_nokey(
762 				(abs_file_path.length() + 1)));
763 			strcpy(new_name, abs_file_path.c_str());
764 			new_len = strlen(new_name) + 1;
765 		}
766 
767 		fil_name_process(name, len, space_id, false);
768 		fil_name_process( new_name, new_len, space_id, false);
769 
770 		if (!fil_op_replay_rename(
771 			space_id, first_page_no,
772 			name,
773 			new_name)) {
774 			recv_sys->found_corrupt_fs = true;
775 		}
776 	}
777 
778 	if (!recv_is_making_a_backup) {
779 		ut_free(name);
780 		ut_free(new_name);
781 	}
782 	return(end_ptr);
783 }
784 #endif /* UNIV_HOTBACKUP */
785 
786 /********************************************************//**
787 Creates the recovery system. */
788 void
recv_sys_create(void)789 recv_sys_create(void)
790 /*=================*/
791 {
792 	if (recv_sys != NULL) {
793 
794 		return;
795 	}
796 
797 	recv_sys = static_cast<recv_sys_t*>(ut_zalloc_nokey(sizeof(*recv_sys)));
798 
799 	mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
800 	mutex_create(LATCH_ID_RECV_WRITER, &recv_sys->writer_mutex);
801 
802 	recv_sys->heap = NULL;
803 	recv_sys->addr_hash = NULL;
804 }
805 
806 /********************************************************//**
807 Release recovery system mutexes. */
808 void
recv_sys_close(void)809 recv_sys_close(void)
810 /*================*/
811 {
812 	if (recv_sys != NULL) {
813 		if (recv_sys->addr_hash != NULL) {
814 			hash_table_free(recv_sys->addr_hash);
815 		}
816 
817 		if (recv_sys->heap != NULL) {
818 			mem_heap_free(recv_sys->heap);
819 		}
820 #ifndef UNIV_HOTBACKUP
821 		if (recv_sys->flush_start != NULL) {
822 			os_event_destroy(recv_sys->flush_start);
823 		}
824 
825 		if (recv_sys->flush_end != NULL) {
826 			os_event_destroy(recv_sys->flush_end);
827 		}
828 #endif /* !UNIV_HOTBACKUP */
829 		ut_free(recv_sys->buf);
830 		ut_free(recv_sys->last_block_buf_start);
831 
832 #ifndef UNIV_HOTBACKUP
833 		ut_ad(!recv_writer_thread_active);
834 		mutex_free(&recv_sys->writer_mutex);
835 #endif /* !UNIV_HOTBACKUP */
836 
837 		mutex_free(&recv_sys->mutex);
838 
839 		ut_free(recv_sys);
840 		recv_sys = NULL;
841 	}
842 
843 	recv_spaces.clear();
844 }
845 
846 /********************************************************//**
847 Frees the recovery system memory. */
848 void
recv_sys_mem_free(void)849 recv_sys_mem_free(void)
850 /*===================*/
851 {
852 	if (recv_sys != NULL) {
853 		if (recv_sys->addr_hash != NULL) {
854 			hash_table_free(recv_sys->addr_hash);
855 		}
856 
857 		if (recv_sys->heap != NULL) {
858 			mem_heap_free(recv_sys->heap);
859 		}
860 #ifndef UNIV_HOTBACKUP
861 		if (recv_sys->flush_start != NULL) {
862 			os_event_destroy(recv_sys->flush_start);
863 		}
864 
865 		if (recv_sys->flush_end != NULL) {
866 			os_event_destroy(recv_sys->flush_end);
867 		}
868 #endif /* !UNIV_HOTBACKUP */
869 		ut_free(recv_sys->buf);
870 		ut_free(recv_sys->last_block_buf_start);
871 		ut_free(recv_sys);
872 		recv_sys = NULL;
873 	}
874 }
875 
876 #ifndef UNIV_HOTBACKUP
877 /************************************************************
878 Reset the state of the recovery system variables. */
879 void
recv_sys_var_init(void)880 recv_sys_var_init(void)
881 /*===================*/
882 {
883 	recv_recovery_on = false;
884 	recv_needed_recovery = false;
885 	recv_lsn_checks_on = false;
886 	recv_no_ibuf_operations = false;
887 	recv_scan_print_counter	= 0;
888 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
889 	recv_previous_parsed_rec_offset	= 0;
890 	recv_previous_parsed_rec_is_multi = 0;
891 	recv_n_pool_free_frames	= 256;
892 	recv_max_page_lsn = 0;
893 }
894 
895 /******************************************************************//**
896 recv_writer thread tasked with flushing dirty pages from the buffer
897 pools.
898 @return a dummy parameter */
899 extern "C"
900 os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)901 DECLARE_THREAD(recv_writer_thread)(
902 /*===============================*/
903 	void*	arg MY_ATTRIBUTE((unused)))
904 			/*!< in: a dummy parameter required by
905 			os_thread_create */
906 {
907 	my_thread_init();
908 	ut_ad(!srv_read_only_mode);
909 
910 #ifdef UNIV_PFS_THREAD
911 	pfs_register_thread(recv_writer_thread_key);
912 #endif /* UNIV_PFS_THREAD */
913 
914 #ifdef UNIV_DEBUG_THREAD_CREATION
915 	ib::info() << "recv_writer thread running, id "
916 		<< os_thread_pf(os_thread_get_curr_id());
917 #endif /* UNIV_DEBUG_THREAD_CREATION */
918 
919 	recv_writer_thread_active = true;
920 
921 	while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
922 
923 		os_thread_sleep(100000);
924 
925 		mutex_enter(&recv_sys->writer_mutex);
926 
927 		if (!recv_recovery_on) {
928 			mutex_exit(&recv_sys->writer_mutex);
929 			break;
930 		}
931 
932 		/* Flush pages from end of LRU if required */
933 		os_event_reset(recv_sys->flush_end);
934 		recv_sys->flush_type = BUF_FLUSH_LRU;
935 		os_event_set(recv_sys->flush_start);
936 		os_event_wait(recv_sys->flush_end);
937 
938 		mutex_exit(&recv_sys->writer_mutex);
939 	}
940 
941 	recv_writer_thread_active = false;
942 
943 	my_thread_end();
944 	/* We count the number of threads in os_thread_exit().
945 	A created thread should always use that to exit and not
946 	use return() to exit. */
947 	os_thread_exit();
948 
949 	OS_THREAD_DUMMY_RETURN;
950 }
951 #endif /* !UNIV_HOTBACKUP */
952 
953 /************************************************************
954 Inits the recovery system for a recovery operation. */
955 void
recv_sys_init(ulint available_memory)956 recv_sys_init(
957 /*==========*/
958 	ulint	available_memory)	/*!< in: available memory in bytes */
959 {
960 	if (recv_sys->heap != NULL) {
961 
962 		return;
963 	}
964 
965 #ifndef UNIV_HOTBACKUP
966 	mutex_enter(&(recv_sys->mutex));
967 
968 	recv_sys->heap = mem_heap_create_typed(256,
969 					MEM_HEAP_FOR_RECV_SYS);
970 
971 	if (!srv_read_only_mode) {
972 		recv_sys->flush_start = os_event_create(0);
973 		recv_sys->flush_end = os_event_create(0);
974 	}
975 #else /* !UNIV_HOTBACKUP */
976 	recv_sys->heap = mem_heap_create(256);
977 	recv_is_from_backup = true;
978 #endif /* !UNIV_HOTBACKUP */
979 
980 	/* Set appropriate value of recv_n_pool_free_frames. */
981 	if (buf_pool_get_curr_size() >= (10 * 1024 * 1024)) {
982 		/* Buffer pool of size greater than 10 MB. */
983 		recv_n_pool_free_frames = 512;
984 	}
985 
986 	recv_sys->buf = static_cast<byte*>(
987 		ut_malloc_nokey(RECV_PARSING_BUF_SIZE));
988 	recv_sys->len = 0;
989 	recv_sys->recovered_offset = 0;
990 
991 	recv_sys->addr_hash = hash_create(available_memory / 512);
992 	recv_sys->n_addrs = 0;
993 
994 	recv_sys->apply_log_recs = FALSE;
995 	recv_sys->apply_batch_on = FALSE;
996 
997 	recv_sys->last_block_buf_start = static_cast<byte*>(
998 		ut_malloc_nokey(2 * OS_FILE_LOG_BLOCK_SIZE));
999 
1000 	recv_sys->last_block = static_cast<byte*>(ut_align(
1001 		recv_sys->last_block_buf_start, OS_FILE_LOG_BLOCK_SIZE));
1002 
1003 	recv_sys->found_corrupt_log = false;
1004 	recv_sys->found_corrupt_fs = false;
1005 	recv_sys->mlog_checkpoint_lsn = 0;
1006 
1007 	recv_max_page_lsn = 0;
1008 
1009 	/* Call the constructor for recv_sys_t::dblwr member */
1010 	new (&recv_sys->dblwr) recv_dblwr_t();
1011 
1012 	recv_sys->encryption_list = NULL;
1013 	mutex_exit(&(recv_sys->mutex));
1014 }
1015 
1016 /********************************************************//**
1017 Empties the hash table when it has been fully processed. */
1018 static
1019 void
recv_sys_empty_hash(void)1020 recv_sys_empty_hash(void)
1021 /*=====================*/
1022 {
1023 	ut_ad(mutex_own(&(recv_sys->mutex)));
1024 
1025 	if (recv_sys->n_addrs != 0) {
1026 		ib::fatal() << recv_sys->n_addrs << " pages with log records"
1027 			" were left unprocessed!";
1028 	}
1029 
1030 	hash_table_free(recv_sys->addr_hash);
1031 	mem_heap_empty(recv_sys->heap);
1032 
1033 	recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
1034 }
1035 
1036 #ifndef UNIV_HOTBACKUP
1037 
1038 /********************************************************//**
1039 Frees the recovery system. */
1040 void
recv_sys_debug_free(void)1041 recv_sys_debug_free(void)
1042 /*=====================*/
1043 {
1044 	mutex_enter(&(recv_sys->mutex));
1045 
1046 	hash_table_free(recv_sys->addr_hash);
1047 	mem_heap_free(recv_sys->heap);
1048 	ut_free(recv_sys->buf);
1049 	ut_free(recv_sys->last_block_buf_start);
1050 
1051 	recv_sys->buf = NULL;
1052 	recv_sys->heap = NULL;
1053 	recv_sys->addr_hash = NULL;
1054 	recv_sys->last_block_buf_start = NULL;
1055 
1056 	/* wake page cleaner up to progress */
1057 	if (!srv_read_only_mode) {
1058 		ut_ad(!recv_recovery_on);
1059 		ut_ad(!recv_writer_thread_active);
1060 		os_event_reset(buf_flush_event);
1061 		os_event_set(recv_sys->flush_start);
1062 	}
1063 
1064 	if (recv_sys->encryption_list != NULL) {
1065 		encryption_list_t::iterator	it;
1066 
1067 		for (it = recv_sys->encryption_list->begin();
1068 		     it != recv_sys->encryption_list->end();
1069 		     it++) {
1070 			if (it->key != NULL) {
1071 				ut_free(it->key);
1072 				it->key = NULL;
1073 			}
1074 			if (it->iv != NULL) {
1075 				ut_free(it->iv);
1076 				it->iv = NULL;
1077 			}
1078 		}
1079 
1080 		recv_sys->encryption_list->swap(*recv_sys->encryption_list);
1081 
1082 		UT_DELETE(recv_sys->encryption_list);
1083 		recv_sys->encryption_list = NULL;
1084 	}
1085 
1086 	mutex_exit(&(recv_sys->mutex));
1087 }
1088 
1089 /********************************************************//**
1090 Copies a log segment from the most up-to-date log group to the other log
1091 groups, so that they all contain the latest log data. Also writes the info
1092 about the latest checkpoint to the groups, and inits the fields in the group
1093 memory structs to up-to-date values. */
1094 static
1095 void
recv_synchronize_groups(void)1096 recv_synchronize_groups(void)
1097 /*=========================*/
1098 {
1099 	lsn_t		start_lsn;
1100 	lsn_t		end_lsn;
1101 	lsn_t		recovered_lsn;
1102 
1103 	recovered_lsn = recv_sys->recovered_lsn;
1104 
1105 	/* Read the last recovered log block to the recovery system buffer:
1106 	the block is always incomplete */
1107 
1108 	start_lsn = ut_uint64_align_down(recovered_lsn,
1109 					 OS_FILE_LOG_BLOCK_SIZE);
1110 	end_lsn = ut_uint64_align_up(recovered_lsn, OS_FILE_LOG_BLOCK_SIZE);
1111 
1112 	ut_a(start_lsn != end_lsn);
1113 
1114 	log_group_read_log_seg(recv_sys->last_block,
1115 			       UT_LIST_GET_FIRST(log_sys->log_groups),
1116 			       start_lsn, end_lsn);
1117 
1118 	for (log_group_t* group = UT_LIST_GET_FIRST(log_sys->log_groups);
1119 	     group;
1120 	     group = UT_LIST_GET_NEXT(log_groups, group)) {
1121 		/* Update the fields in the group struct to correspond to
1122 		recovered_lsn */
1123 
1124 		log_group_set_fields(group, recovered_lsn);
1125 	}
1126 
1127 	/* Copy the checkpoint info to the log; remember that we have
1128 	incremented checkpoint_no by one, and the info will not be written
1129 	over the max checkpoint info, thus making the preservation of max
1130 	checkpoint info on disk certain */
1131 
1132 	log_write_checkpoint_info(true);
1133 	log_mutex_enter();
1134 }
1135 #endif /* !UNIV_HOTBACKUP */
1136 
1137 /** Check the consistency of a log header block.
1138 @param[in]	log header block
1139 @return true if ok */
1140 static
1141 bool
recv_check_log_header_checksum(const byte * buf)1142 recv_check_log_header_checksum(
1143 	const byte*	buf)
1144 {
1145 	return(log_block_get_checksum(buf)
1146 	       == log_block_calc_checksum_crc32(buf));
1147 }
1148 
1149 #ifndef UNIV_HOTBACKUP
1150 /** Find the latest checkpoint in the format-0 log header.
1151 @param[out]	max_group	log group, or NULL
1152 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1153 @return error code or DB_SUCCESS */
1154 static MY_ATTRIBUTE((warn_unused_result))
1155 dberr_t
recv_find_max_checkpoint_0(log_group_t ** max_group,ulint * max_field)1156 recv_find_max_checkpoint_0(
1157 	log_group_t**	max_group,
1158 	ulint*		max_field)
1159 {
1160 	log_group_t*	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1161 	ib_uint64_t	max_no = 0;
1162 	ib_uint64_t	checkpoint_no;
1163 	byte*		buf	= log_sys->checkpoint_buf;
1164 
1165 	ut_ad(group->format == 0);
1166 	ut_ad(UT_LIST_GET_NEXT(log_groups, group) == NULL);
1167 
1168 	/** Offset of the first checkpoint checksum */
1169 	static const uint CHECKSUM_1 = 288;
1170 	/** Offset of the second checkpoint checksum */
1171 	static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1172 	/** Most significant bits of the checkpoint offset */
1173 	static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1174 	/** Least significant bits of the checkpoint offset */
1175 	static const uint OFFSET_LOW32 = 16;
1176 
1177 	for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1178 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1179 		log_group_header_read(group, field);
1180 
1181 		if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1182 		    != mach_read_from_4(buf + CHECKSUM_1)
1183 		    || static_cast<uint32_t>(
1184 			    ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1185 					   CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1186 		    != mach_read_from_4(buf + CHECKSUM_2)) {
1187 			DBUG_PRINT("ib_log",
1188 				   ("invalid pre-5.7.9 checkpoint " ULINTPF,
1189 				    field));
1190 			continue;
1191 		}
1192 
1193 		group->state = LOG_GROUP_OK;
1194 
1195 		group->lsn = mach_read_from_8(
1196 			buf + LOG_CHECKPOINT_LSN);
1197 		group->lsn_offset = static_cast<ib_uint64_t>(
1198 			mach_read_from_4(buf + OFFSET_HIGH32)) << 32
1199 			| mach_read_from_4(buf + OFFSET_LOW32);
1200 		checkpoint_no = mach_read_from_8(
1201 			buf + LOG_CHECKPOINT_NO);
1202 
1203 		DBUG_PRINT("ib_log",
1204 			   ("checkpoint " UINT64PF " at " LSN_PF
1205 			    " found in group " ULINTPF,
1206 			    checkpoint_no, group->lsn, group->id));
1207 
1208 		if (checkpoint_no >= max_no) {
1209 			*max_group = group;
1210 			*max_field = field;
1211 			max_no = checkpoint_no;
1212 		}
1213 	}
1214 
1215 	if (*max_group != NULL) {
1216 		return(DB_SUCCESS);
1217 	}
1218 
1219 	ib::error() << "Upgrade after a crash is not supported."
1220 		" This redo log was created before MySQL 5.7.9,"
1221 		" and we did not find a valid checkpoint."
1222 		" Please follow the instructions at"
1223 		" " REFMAN "upgrading.html";
1224 	return(DB_ERROR);
1225 }
1226 
1227 /** Determine if a pre-5.7.9 redo log is clean.
1228 @param[in]	lsn	checkpoint LSN
1229 @return error code
1230 @retval	DB_SUCCESS	if the redo log is clean
1231 @retval DB_ERROR	if the redo log is corrupted or dirty */
1232 static
1233 dberr_t
recv_log_format_0_recover(lsn_t lsn)1234 recv_log_format_0_recover(lsn_t lsn)
1235 {
1236 	log_mutex_enter();
1237 	log_group_t*	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1238 	const lsn_t	source_offset
1239 		= log_group_calc_lsn_offset(lsn, group);
1240 	log_mutex_exit();
1241 	const ulint	page_no
1242 		= (ulint) (source_offset / univ_page_size.physical());
1243 	byte*		buf = log_sys->buf;
1244 
1245 	static const char* NO_UPGRADE_RECOVERY_MSG =
1246 		"Upgrade after a crash is not supported."
1247 		" This redo log was created before MySQL 5.7.9";
1248 	static const char* NO_UPGRADE_RTFM_MSG =
1249 		". Please follow the instructions at "
1250 		REFMAN "upgrading.html";
1251 
1252 	fil_io(IORequestLogRead, true,
1253 	       page_id_t(group->space_id, page_no),
1254 	       univ_page_size,
1255 	       (ulint) ((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1256 			% univ_page_size.physical()),
1257 	       OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1258 
1259 	if (log_block_calc_checksum_format_0(buf)
1260 	    != log_block_get_checksum(buf)) {
1261 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1262 			<< ", and it appears corrupted"
1263 			<< NO_UPGRADE_RTFM_MSG;
1264 		return(DB_CORRUPTION);
1265 	}
1266 
1267 	if (log_block_get_data_len(buf)
1268 	    != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1269 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1270 			<< NO_UPGRADE_RTFM_MSG;
1271 		return(DB_ERROR);
1272 	}
1273 
1274 	/* Mark the redo log for upgrading. */
1275 	srv_log_file_size = 0;
1276 	recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1277 		= recv_sys->scanned_lsn
1278 		= recv_sys->mlog_checkpoint_lsn = lsn;
1279 	log_sys->last_checkpoint_lsn = log_sys->next_checkpoint_lsn
1280 		= log_sys->lsn = log_sys->write_lsn
1281 		= log_sys->current_flush_lsn = log_sys->flushed_to_disk_lsn
1282 		= lsn;
1283 	log_sys->next_checkpoint_no = 0;
1284 	return(DB_SUCCESS);
1285 }
1286 
1287 /** Find the latest checkpoint in the log header.
1288 @param[out]	max_group	log group, or NULL
1289 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1290 @return error code or DB_SUCCESS */
1291 static MY_ATTRIBUTE((warn_unused_result))
1292 dberr_t
recv_find_max_checkpoint(log_group_t ** max_group,ulint * max_field)1293 recv_find_max_checkpoint(
1294 	log_group_t**	max_group,
1295 	ulint*		max_field)
1296 {
1297 	log_group_t*	group;
1298 	ib_uint64_t	max_no;
1299 	ib_uint64_t	checkpoint_no;
1300 	ulint		field;
1301 	byte*		buf;
1302 
1303 	group = UT_LIST_GET_FIRST(log_sys->log_groups);
1304 
1305 	max_no = 0;
1306 	*max_group = NULL;
1307 	*max_field = 0;
1308 
1309 	buf = log_sys->checkpoint_buf;
1310 
1311 	while (group) {
1312 		group->state = LOG_GROUP_CORRUPTED;
1313 
1314 		log_group_header_read(group, 0);
1315 		/* Check the header page checksum. There was no
1316 		checksum in the first redo log format (version 0). */
1317 		group->format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1318 		if (group->format != 0
1319 		    && !recv_check_log_header_checksum(buf)) {
1320 			ib::error() << "Invalid redo log header checksum.";
1321 			return(DB_CORRUPTION);
1322 		}
1323 
1324 		switch (group->format) {
1325 		case 0:
1326 			return(recv_find_max_checkpoint_0(
1327 				       max_group, max_field));
1328 		case LOG_HEADER_FORMAT_CURRENT:
1329 			break;
1330 		default:
1331 			/* Ensure that the string is NUL-terminated. */
1332 			buf[LOG_HEADER_CREATOR_END] = 0;
1333 			ib::error() << "Unsupported redo log format."
1334 				" The redo log was created"
1335 				" with " << buf + LOG_HEADER_CREATOR <<
1336 				". Please follow the instructions at "
1337 				REFMAN "upgrading-downgrading.html";
1338 			/* Do not issue a message about a possibility
1339 			to cleanly shut down the newer server version
1340 			and to remove the redo logs, because the
1341 			format of the system data structures may
1342 			radically change after MySQL 5.7. */
1343 			return(DB_ERROR);
1344 		}
1345 
1346 		for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1347 		     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1348 
1349 			log_group_header_read(group, field);
1350 
1351 			if (!recv_check_log_header_checksum(buf)) {
1352 				DBUG_PRINT("ib_log",
1353 					   ("invalid checkpoint,"
1354 					    " group " ULINTPF " at " ULINTPF
1355 					    ", checksum %x",
1356 					    group->id, field,
1357 					    (unsigned) log_block_get_checksum(
1358 						    buf)));
1359 				continue;
1360 			}
1361 
1362 			group->state = LOG_GROUP_OK;
1363 
1364 			group->lsn = mach_read_from_8(
1365 				buf + LOG_CHECKPOINT_LSN);
1366 			group->lsn_offset = mach_read_from_8(
1367 				buf + LOG_CHECKPOINT_OFFSET);
1368 			checkpoint_no = mach_read_from_8(
1369 				buf + LOG_CHECKPOINT_NO);
1370 
1371 			DBUG_PRINT("ib_log",
1372 				   ("checkpoint " UINT64PF " at " LSN_PF
1373 				    " found in group " ULINTPF,
1374 				    checkpoint_no, group->lsn, group->id));
1375 
1376 			if (checkpoint_no >= max_no) {
1377 				*max_group = group;
1378 				*max_field = field;
1379 				max_no = checkpoint_no;
1380 			}
1381 		}
1382 
1383 		group = UT_LIST_GET_NEXT(log_groups, group);
1384 	}
1385 
1386 	if (*max_group == NULL) {
1387 		/* Before 5.7.9, we could get here during database
1388 		initialization if we created an ib_logfile0 file that
1389 		was filled with zeroes, and were killed. After
1390 		5.7.9, we would reject such a file already earlier,
1391 		when checking the file header. */
1392 		ib::error() << "No valid checkpoint found"
1393 			" (corrupted redo log)."
1394 			" You can try --innodb-force-recovery=6"
1395 			" as a last resort.";
1396 		return(DB_ERROR);
1397 	}
1398 
1399 	return(DB_SUCCESS);
1400 }
1401 #else /* !UNIV_HOTBACKUP */
1402 /*******************************************************************//**
1403 Reads the checkpoint info needed in hot backup.
1404 @return TRUE if success */
1405 ibool
recv_read_checkpoint_info_for_backup(const byte * hdr,lsn_t * lsn,lsn_t * offset,lsn_t * cp_no,lsn_t * first_header_lsn)1406 recv_read_checkpoint_info_for_backup(
1407 /*=================================*/
1408 	const byte*	hdr,	/*!< in: buffer containing the log group
1409 				header */
1410 	lsn_t*		lsn,	/*!< out: checkpoint lsn */
1411 	lsn_t*		offset,	/*!< out: checkpoint offset in the log group */
1412 	lsn_t*		cp_no,	/*!< out: checkpoint number */
1413 	lsn_t*		first_header_lsn)
1414 				/*!< out: lsn of of the start of the
1415 				first log file */
1416 {
1417 	ulint		max_cp		= 0;
1418 	ib_uint64_t	max_cp_no	= 0;
1419 	const byte*	cp_buf;
1420 
1421 	cp_buf = hdr + LOG_CHECKPOINT_1;
1422 
1423 	if (recv_check_log_header_checksum(cp_buf)) {
1424 		max_cp_no = mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO);
1425 		max_cp = LOG_CHECKPOINT_1;
1426 	}
1427 
1428 	cp_buf = hdr + LOG_CHECKPOINT_2;
1429 
1430 	if (recv_check_log_header_checksum(cp_buf)) {
1431 		if (mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO) > max_cp_no) {
1432 			max_cp = LOG_CHECKPOINT_2;
1433 		}
1434 	}
1435 
1436 	if (max_cp == 0) {
1437 		return(FALSE);
1438 	}
1439 
1440 	cp_buf = hdr + max_cp;
1441 
1442 	*lsn = mach_read_from_8(cp_buf + LOG_CHECKPOINT_LSN);
1443 	*offset = mach_read_from_8(
1444 		cp_buf + LOG_CHECKPOINT_OFFSET);
1445 
1446 	*cp_no = mach_read_from_8(cp_buf + LOG_CHECKPOINT_NO);
1447 
1448 	*first_header_lsn = mach_read_from_8(hdr + LOG_HEADER_START_LSN);
1449 
1450 	return(TRUE);
1451 }
1452 #endif /* !UNIV_HOTBACKUP */
1453 
1454 /** Check the 4-byte checksum to the trailer checksum field of a log
1455 block.
1456 @param[in]	log block
1457 @return whether the checksum matches */
1458 static
1459 bool
log_block_checksum_is_ok(const byte * block)1460 log_block_checksum_is_ok(
1461 	const byte*	block)	/*!< in: pointer to a log block */
1462 {
1463 	return(!innodb_log_checksums
1464 	       || log_block_get_checksum(block)
1465 	       == log_block_calc_checksum(block));
1466 }
1467 
1468 #ifdef UNIV_HOTBACKUP
1469 /*******************************************************************//**
1470 Scans the log segment and n_bytes_scanned is set to the length of valid
1471 log scanned. */
1472 void
recv_scan_log_seg_for_backup(byte * buf,ulint buf_len,lsn_t * scanned_lsn,ulint * scanned_checkpoint_no,ulint * n_bytes_scanned)1473 recv_scan_log_seg_for_backup(
1474 /*=========================*/
1475 	byte*		buf,		/*!< in: buffer containing log data */
1476 	ulint		buf_len,	/*!< in: data length in that buffer */
1477 	lsn_t*		scanned_lsn,	/*!< in/out: lsn of buffer start,
1478 					we return scanned lsn */
1479 	ulint*		scanned_checkpoint_no,
1480 					/*!< in/out: 4 lowest bytes of the
1481 					highest scanned checkpoint number so
1482 					far */
1483 	ulint*		n_bytes_scanned)/*!< out: how much we were able to
1484 					scan, smaller than buf_len if log
1485 					data ended here */
1486 {
1487 	ulint	data_len;
1488 	byte*	log_block;
1489 	ulint	no;
1490 
1491 	*n_bytes_scanned = 0;
1492 
1493 	for (log_block = buf; log_block < buf + buf_len;
1494 	     log_block += OS_FILE_LOG_BLOCK_SIZE) {
1495 
1496 		no = log_block_get_hdr_no(log_block);
1497 
1498 #if 0
1499 		fprintf(stderr, "Log block header no %lu\n", no);
1500 #endif
1501 
1502 		if (no != log_block_convert_lsn_to_no(*scanned_lsn)
1503 		    || !log_block_checksum_is_ok(log_block)) {
1504 #if 0
1505 			fprintf(stderr,
1506 				"Log block n:o %lu, scanned lsn n:o %lu\n",
1507 				no, log_block_convert_lsn_to_no(*scanned_lsn));
1508 #endif
1509 			/* Garbage or an incompletely written log block */
1510 
1511 			log_block += OS_FILE_LOG_BLOCK_SIZE;
1512 #if 0
1513 			fprintf(stderr,
1514 				"Next log block n:o %lu\n",
1515 				log_block_get_hdr_no(log_block));
1516 #endif
1517 			break;
1518 		}
1519 
1520 		if (*scanned_checkpoint_no > 0
1521 		    && log_block_get_checkpoint_no(log_block)
1522 		    < *scanned_checkpoint_no
1523 		    && *scanned_checkpoint_no
1524 		    - log_block_get_checkpoint_no(log_block)
1525 		    > 0x80000000UL) {
1526 
1527 			/* Garbage from a log buffer flush which was made
1528 			before the most recent database recovery */
1529 #if 0
1530 			fprintf(stderr,
1531 				"Scanned cp n:o %lu, block cp n:o %lu\n",
1532 				*scanned_checkpoint_no,
1533 				log_block_get_checkpoint_no(log_block));
1534 #endif
1535 			break;
1536 		}
1537 
1538 		data_len = log_block_get_data_len(log_block);
1539 
1540 		*scanned_checkpoint_no
1541 			= log_block_get_checkpoint_no(log_block);
1542 		*scanned_lsn += data_len;
1543 
1544 		*n_bytes_scanned += data_len;
1545 
1546 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
1547 			/* Log data ends here */
1548 
1549 #if 0
1550 			fprintf(stderr, "Log block data len %lu\n",
1551 				data_len);
1552 #endif
1553 			break;
1554 		}
1555 	}
1556 }
1557 #endif /* UNIV_HOTBACKUP */
1558 
1559 /** Parse or process a write encryption info record.
1560 @param[in]	ptr		redo log record
1561 @param[in]	end		end of the redo log buffer
1562 @param[in]	space_id	the tablespace ID
1563 @return log record end, NULL if not a complete record */
1564 static
1565 byte*
fil_write_encryption_parse(byte * ptr,const byte * end,ulint space_id)1566 fil_write_encryption_parse(
1567 	byte*		ptr,
1568 	const byte*	end,
1569 	ulint		space_id)
1570 {
1571 	fil_space_t*	space;
1572 	ulint		offset;
1573 	ulint		len;
1574 	byte*		key = NULL;
1575 	byte*		iv = NULL;
1576 	bool		is_new = false;
1577 
1578 	space = fil_space_get(space_id);
1579 	if (space == NULL) {
1580 		encryption_list_t::iterator	it;
1581 
1582 		if (recv_sys->encryption_list == NULL) {
1583 			recv_sys->encryption_list =
1584 				UT_NEW_NOKEY(encryption_list_t());
1585 		}
1586 
1587 		for (it = recv_sys->encryption_list->begin();
1588 		     it != recv_sys->encryption_list->end();
1589 		     it++) {
1590 			if (it->space_id == space_id) {
1591 				key = it->key;
1592 				iv = it->iv;
1593 			}
1594 		}
1595 
1596 		if (key == NULL) {
1597 			key = static_cast<byte*>(ut_malloc_nokey(
1598 					ENCRYPTION_KEY_LEN));
1599 			iv = static_cast<byte*>(ut_malloc_nokey(
1600 					ENCRYPTION_KEY_LEN));
1601 			is_new = true;
1602 		}
1603 	} else {
1604 		key = space->encryption_key;
1605 		iv = space->encryption_iv;
1606 	}
1607 
1608 	offset = mach_read_from_2(ptr);
1609 	ptr += 2;
1610 	len = mach_read_from_2(ptr);
1611 
1612 	ptr += 2;
1613 	if (end < ptr + len) {
1614 		return(NULL);
1615 	}
1616 
1617 	if (offset >= UNIV_PAGE_SIZE
1618 	    || len + offset > UNIV_PAGE_SIZE
1619 	    || (len != ENCRYPTION_INFO_SIZE_V1
1620 		&& len != ENCRYPTION_INFO_SIZE_V2)) {
1621 		recv_sys->found_corrupt_log = TRUE;
1622 		return(NULL);
1623 	}
1624 
1625 #ifdef	UNIV_ENCRYPT_DEBUG
1626 	if (space) {
1627 		fprintf(stderr, "Got %lu from redo log:", space->id);
1628 	}
1629 #endif
1630 	if (!fsp_header_decode_encryption_info(key,
1631 					       iv,
1632 					       ptr)) {
1633 		recv_sys->found_corrupt_log = TRUE;
1634 		ib::warn() << "Encryption information"
1635 			<< " in the redo log of space "
1636 			<< space_id << " is invalid";
1637 	}
1638 
1639 	ut_ad(len == ENCRYPTION_INFO_SIZE_V1
1640 	      || len == ENCRYPTION_INFO_SIZE_V2);
1641 
1642 	ptr += len;
1643 
1644 	if (space == NULL) {
1645 		if (is_new) {
1646 			recv_encryption_t info;
1647 
1648 			/* Add key and iv to list */
1649 			info.space_id = space_id;
1650 			info.key = key;
1651 			info.iv = iv;
1652 
1653 			recv_sys->encryption_list->push_back(info);
1654 		}
1655 	} else {
1656 		ut_ad(FSP_FLAGS_GET_ENCRYPTION(space->flags));
1657 
1658 		space->encryption_type = Encryption::AES;
1659 		space->encryption_klen = ENCRYPTION_KEY_LEN;
1660 	}
1661 
1662 	return(ptr);
1663 }
1664 
1665 /** Try to parse a single log record body and also applies it if
1666 specified.
1667 @param[in]	type		redo log entry type
1668 @param[in]	ptr		redo log record body
1669 @param[in]	end_ptr		end of buffer
1670 @param[in]	space_id	tablespace identifier
1671 @param[in]	page_no		page number
1672 @param[in,out]	block		buffer block, or NULL if
1673 a page log record should not be applied
1674 or if it is a MLOG_FILE_ operation
1675 @param[in,out]	mtr		mini-transaction, or NULL if
1676 a page log record should not be applied
1677 @return log record end, NULL if not a complete record */
1678 static
1679 byte*
recv_parse_or_apply_log_rec_body(mlog_id_t type,byte * ptr,byte * end_ptr,ulint space_id,ulint page_no,buf_block_t * block,mtr_t * mtr)1680 recv_parse_or_apply_log_rec_body(
1681 	mlog_id_t	type,
1682 	byte*		ptr,
1683 	byte*		end_ptr,
1684 	ulint		space_id,
1685 	ulint		page_no,
1686 	buf_block_t*	block,
1687 	mtr_t*		mtr)
1688 {
1689 	ut_ad(!block == !mtr);
1690 
1691 	switch (type) {
1692 	case MLOG_FILE_NAME:
1693 	case MLOG_FILE_DELETE:
1694 	case MLOG_FILE_CREATE2:
1695 	case MLOG_FILE_RENAME2:
1696 		ut_ad(block == NULL);
1697 		/* Collect the file names when parsing the log,
1698 		before applying any log records. */
1699 		return(fil_name_parse(ptr, end_ptr, space_id, page_no, type));
1700 	case MLOG_INDEX_LOAD:
1701 #ifdef UNIV_HOTBACKUP
1702 		/* While scaning redo logs during  backup phase a
1703 		MLOG_INDEX_LOAD type redo log record indicates a DDL
1704 		(create index, alter table...)is performed with
1705 		'algorithm=inplace'. This redo log indicates that
1706 
1707 		1. The DDL was started after MEB started backing up, in which
1708 		case MEB will not be able to take a consistent backup and should
1709 		fail. or
1710 		2. There is a possibility of this record existing in the REDO
1711 		even after the completion of the index create operation. This is
1712 		because of InnoDB does  not checkpointing after the flushing the
1713 		index pages.
1714 
1715 		If MEB gets the last_redo_flush_lsn and that is less than the
1716 		lsn of the current record MEB fails the backup process.
1717 		Error out in case of online backup and emit a warning in case
1718 		of offline backup and continue.
1719 		*/
1720 		if (!recv_recovery_on) {
1721 			if (is_online_redo_copy) {
1722 				if (backup_redo_log_flushed_lsn
1723 				    < recv_sys->recovered_lsn) {
1724 					ib::trace() << "Last flushed lsn: "
1725 						<< backup_redo_log_flushed_lsn
1726 						<< " load_index lsn "
1727 						<< recv_sys->recovered_lsn;
1728 
1729 					if (backup_redo_log_flushed_lsn == 0)
1730 						ib::error() << "MEB was not "
1731 							"able to determine the"
1732 							"InnoDB Engine Status";
1733 
1734 					ib::fatal() << "An optimized(without"
1735 						" redo logging) DDLoperation"
1736 						" has been performed. All"
1737 						" modified pages may not have"
1738 						" been flushed to the disk yet."
1739 						" \n    MEB will not be able"
1740 						" take a consistent backup."
1741 						" Retry the backup operation";
1742 				}
1743 				/** else the index is flushed to disk before
1744 				backup started hence no error */
1745 			} else {
1746 				/* offline backup */
1747 				ib::trace() << "Last flushed lsn: "
1748 					<< backup_redo_log_flushed_lsn
1749 					<< " load_index lsn "
1750 					<< recv_sys->recovered_lsn;
1751 
1752 				ib::warn() << "An optimized(without redo"
1753 					" logging) DDL operation has been"
1754 					" performed. All modified pages may not"
1755 					" have been flushed to the disk yet."
1756 					" \n    This offline backup may not"
1757 					" be consistent";
1758 			}
1759 		}
1760 #endif /* UNIV_HOTBACKUP */
1761 		if (end_ptr < ptr + 8) {
1762 			return(NULL);
1763 		}
1764 		return(ptr + 8);
1765 	case MLOG_TRUNCATE:
1766 		return(truncate_t::parse_redo_entry(ptr, end_ptr, space_id));
1767 	case MLOG_WRITE_STRING:
1768 		/* For encrypted tablespace, we need to get the
1769 		encryption key information before the page 0 is recovered.
1770 	        Otherwise, redo will not find the key to decrypt
1771 		the data pages. */
1772 		if (page_no == 0 && !is_system_tablespace(space_id)) {
1773 			return(fil_write_encryption_parse(ptr,
1774 							  end_ptr,
1775 							  space_id));
1776 		}
1777 		break;
1778 
1779 	default:
1780 		break;
1781 	}
1782 
1783 	dict_index_t*	index	= NULL;
1784 	page_t*		page;
1785 	page_zip_des_t*	page_zip;
1786 #ifdef UNIV_DEBUG
1787 	ulint		page_type;
1788 #endif /* UNIV_DEBUG */
1789 
1790 	if (block) {
1791 		/* Applying a page log record. */
1792 		page = block->frame;
1793 		page_zip = buf_block_get_page_zip(block);
1794 		ut_d(page_type = fil_page_get_type(page));
1795 	} else {
1796 		/* Parsing a page log record. */
1797 		page = NULL;
1798 		page_zip = NULL;
1799 		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
1800 	}
1801 
1802 	const byte*	old_ptr = ptr;
1803 
1804 	switch (type) {
1805 #ifdef UNIV_LOG_LSN_DEBUG
1806 	case MLOG_LSN:
1807 		/* The LSN is checked in recv_parse_log_rec(). */
1808 		break;
1809 #endif /* UNIV_LOG_LSN_DEBUG */
1810 	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
1811 #ifdef UNIV_DEBUG
1812 		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
1813 		    && end_ptr >= ptr + 2) {
1814 			/* It is OK to set FIL_PAGE_TYPE and certain
1815 			list node fields on an empty page.  Any other
1816 			write is not OK. */
1817 
1818 			/* NOTE: There may be bogus assertion failures for
1819 			dict_hdr_create(), trx_rseg_header_create(),
1820 			trx_sys_create_doublewrite_buf(), and
1821 			trx_sysf_create().
1822 			These are only called during database creation. */
1823 			ulint	offs = mach_read_from_2(ptr);
1824 
1825 			switch (type) {
1826 			default:
1827 				ut_error;
1828 			case MLOG_2BYTES:
1829 				/* Note that this can fail when the
1830 				redo log been written with something
1831 				older than InnoDB Plugin 1.0.4. */
1832 				ut_ad(offs == FIL_PAGE_TYPE
1833 				      || offs == IBUF_TREE_SEG_HEADER
1834 				      + IBUF_HEADER + FSEG_HDR_OFFSET
1835 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1836 				      + PAGE_HEADER + FIL_ADDR_BYTE
1837 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1838 				      + PAGE_HEADER + FIL_ADDR_BYTE
1839 				      + FIL_ADDR_SIZE
1840 				      || offs == PAGE_BTR_SEG_LEAF
1841 				      + PAGE_HEADER + FSEG_HDR_OFFSET
1842 				      || offs == PAGE_BTR_SEG_TOP
1843 				      + PAGE_HEADER + FSEG_HDR_OFFSET
1844 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1845 				      + PAGE_HEADER + FIL_ADDR_BYTE
1846 				      + 0 /*FLST_PREV*/
1847 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1848 				      + PAGE_HEADER + FIL_ADDR_BYTE
1849 				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
1850 				break;
1851 			case MLOG_4BYTES:
1852 				/* Note that this can fail when the
1853 				redo log been written with something
1854 				older than InnoDB Plugin 1.0.4. */
1855 				ut_ad(0
1856 				      || offs == IBUF_TREE_SEG_HEADER
1857 				      + IBUF_HEADER + FSEG_HDR_SPACE
1858 				      || offs == IBUF_TREE_SEG_HEADER
1859 				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
1860 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1861 				      + PAGE_HEADER/* flst_init */
1862 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1863 				      + PAGE_HEADER + FIL_ADDR_PAGE
1864 				      || offs == PAGE_BTR_IBUF_FREE_LIST
1865 				      + PAGE_HEADER + FIL_ADDR_PAGE
1866 				      + FIL_ADDR_SIZE
1867 				      || offs == PAGE_BTR_SEG_LEAF
1868 				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
1869 				      || offs == PAGE_BTR_SEG_LEAF
1870 				      + PAGE_HEADER + FSEG_HDR_SPACE
1871 				      || offs == PAGE_BTR_SEG_TOP
1872 				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
1873 				      || offs == PAGE_BTR_SEG_TOP
1874 				      + PAGE_HEADER + FSEG_HDR_SPACE
1875 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1876 				      + PAGE_HEADER + FIL_ADDR_PAGE
1877 				      + 0 /*FLST_PREV*/
1878 				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
1879 				      + PAGE_HEADER + FIL_ADDR_PAGE
1880 				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
1881 				break;
1882 			}
1883 		}
1884 #endif /* UNIV_DEBUG */
1885 		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
1886 		if (ptr != NULL && page != NULL
1887 		    && page_no == 0 && type == MLOG_4BYTES) {
1888 			ulint	offs = mach_read_from_2(old_ptr);
1889 			switch (offs) {
1890 				fil_space_t*	space;
1891 				ulint		val;
1892 			default:
1893 				break;
1894 			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1895 			case FSP_HEADER_OFFSET + FSP_SIZE:
1896 			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1897 			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1898 				space = fil_space_get(space_id);
1899 				ut_a(space != NULL);
1900 				val = mach_read_from_4(page + offs);
1901 
1902 				switch (offs) {
1903 				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
1904 					space->flags = val;
1905 					break;
1906 				case FSP_HEADER_OFFSET + FSP_SIZE:
1907 					space->size_in_header = val;
1908 					break;
1909 				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
1910 					space->free_limit = val;
1911 					break;
1912 				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
1913 					space->free_len = val;
1914 					ut_ad(val == flst_get_len(
1915 						      page + offs));
1916 					break;
1917 				}
1918 			}
1919 		}
1920 		break;
1921 	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
1922 		ut_ad(!page || fil_page_type_is_index(page_type));
1923 
1924 		if (NULL != (ptr = mlog_parse_index(
1925 				     ptr, end_ptr,
1926 				     type == MLOG_COMP_REC_INSERT,
1927 				     &index))) {
1928 			ut_a(!page
1929 			     || (ibool)!!page_is_comp(page)
1930 			     == dict_table_is_comp(index->table));
1931 			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
1932 							block, index, mtr);
1933 		}
1934 		break;
1935 	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
1936 		ut_ad(!page || fil_page_type_is_index(page_type));
1937 
1938 		if (NULL != (ptr = mlog_parse_index(
1939 				     ptr, end_ptr,
1940 				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
1941 				     &index))) {
1942 			ut_a(!page
1943 			     || (ibool)!!page_is_comp(page)
1944 			     == dict_table_is_comp(index->table));
1945 			ptr = btr_cur_parse_del_mark_set_clust_rec(
1946 				ptr, end_ptr, page, page_zip, index);
1947 		}
1948 		break;
1949 	case MLOG_COMP_REC_SEC_DELETE_MARK:
1950 		ut_ad(!page || fil_page_type_is_index(page_type));
1951 		/* This log record type is obsolete, but we process it for
1952 		backward compatibility with MySQL 5.0.3 and 5.0.4. */
1953 		ut_a(!page || page_is_comp(page));
1954 		ut_a(!page_zip);
1955 		ptr = mlog_parse_index(ptr, end_ptr, TRUE, &index);
1956 		if (!ptr) {
1957 			break;
1958 		}
1959 		/* Fall through */
1960 	case MLOG_REC_SEC_DELETE_MARK:
1961 		ut_ad(!page || fil_page_type_is_index(page_type));
1962 		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
1963 							 page, page_zip);
1964 		break;
1965 	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
1966 		ut_ad(!page || fil_page_type_is_index(page_type));
1967 
1968 		if (NULL != (ptr = mlog_parse_index(
1969 				     ptr, end_ptr,
1970 				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
1971 				     &index))) {
1972 			ut_a(!page
1973 			     || (ibool)!!page_is_comp(page)
1974 			     == dict_table_is_comp(index->table));
1975 			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
1976 							    page_zip, index);
1977 		}
1978 		break;
1979 	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
1980 	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
1981 		ut_ad(!page || fil_page_type_is_index(page_type));
1982 
1983 		if (NULL != (ptr = mlog_parse_index(
1984 				     ptr, end_ptr,
1985 				     type == MLOG_COMP_LIST_END_DELETE
1986 				     || type == MLOG_COMP_LIST_START_DELETE,
1987 				     &index))) {
1988 			ut_a(!page
1989 			     || (ibool)!!page_is_comp(page)
1990 			     == dict_table_is_comp(index->table));
1991 			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
1992 							 block, index, mtr);
1993 		}
1994 		break;
1995 	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
1996 		ut_ad(!page || fil_page_type_is_index(page_type));
1997 
1998 		if (NULL != (ptr = mlog_parse_index(
1999 				     ptr, end_ptr,
2000 				     type == MLOG_COMP_LIST_END_COPY_CREATED,
2001 				     &index))) {
2002 			ut_a(!page
2003 			     || (ibool)!!page_is_comp(page)
2004 			     == dict_table_is_comp(index->table));
2005 			ptr = page_parse_copy_rec_list_to_created_page(
2006 				ptr, end_ptr, block, index, mtr);
2007 		}
2008 		break;
2009 	case MLOG_PAGE_REORGANIZE:
2010 	case MLOG_COMP_PAGE_REORGANIZE:
2011 	case MLOG_ZIP_PAGE_REORGANIZE:
2012 		ut_ad(!page || fil_page_type_is_index(page_type));
2013 
2014 		if (NULL != (ptr = mlog_parse_index(
2015 				     ptr, end_ptr,
2016 				     type != MLOG_PAGE_REORGANIZE,
2017 				     &index))) {
2018 			ut_a(!page
2019 			     || (ibool)!!page_is_comp(page)
2020 			     == dict_table_is_comp(index->table));
2021 			ptr = btr_parse_page_reorganize(
2022 				ptr, end_ptr, index,
2023 				type == MLOG_ZIP_PAGE_REORGANIZE,
2024 				block, mtr);
2025 		}
2026 		break;
2027 	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
2028 		/* Allow anything in page_type when creating a page. */
2029 		ut_a(!page_zip);
2030 		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
2031 		break;
2032 	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
2033 		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
2034 				  true);
2035 		break;
2036 	case MLOG_UNDO_INSERT:
2037 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2038 		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
2039 		break;
2040 	case MLOG_UNDO_ERASE_END:
2041 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2042 		ptr = trx_undo_parse_erase_page_end(ptr, end_ptr, page, mtr);
2043 		break;
2044 	case MLOG_UNDO_INIT:
2045 		/* Allow anything in page_type when creating a page. */
2046 		ptr = trx_undo_parse_page_init(ptr, end_ptr, page, mtr);
2047 		break;
2048 	case MLOG_UNDO_HDR_DISCARD:
2049 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2050 		ptr = trx_undo_parse_discard_latest(ptr, end_ptr, page, mtr);
2051 		break;
2052 	case MLOG_UNDO_HDR_CREATE:
2053 	case MLOG_UNDO_HDR_REUSE:
2054 		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
2055 		ptr = trx_undo_parse_page_header(type, ptr, end_ptr,
2056 						 page, mtr);
2057 		break;
2058 	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
2059 		ut_ad(!page || fil_page_type_is_index(page_type));
2060 		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
2061 		will be followed by MLOG_COMP_REC_DELETE
2062 		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
2063 		in the same mini-transaction. */
2064 		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
2065 		ptr = btr_parse_set_min_rec_mark(
2066 			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
2067 			page, mtr);
2068 		break;
2069 	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
2070 		ut_ad(!page || fil_page_type_is_index(page_type));
2071 
2072 		if (NULL != (ptr = mlog_parse_index(
2073 				     ptr, end_ptr,
2074 				     type == MLOG_COMP_REC_DELETE,
2075 				     &index))) {
2076 			ut_a(!page
2077 			     || (ibool)!!page_is_comp(page)
2078 			     == dict_table_is_comp(index->table));
2079 			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
2080 							block, index, mtr);
2081 		}
2082 		break;
2083 	case MLOG_IBUF_BITMAP_INIT:
2084 		/* Allow anything in page_type when creating a page. */
2085 		ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
2086 		break;
2087 	case MLOG_INIT_FILE_PAGE:
2088 	case MLOG_INIT_FILE_PAGE2:
2089 		/* Allow anything in page_type when creating a page. */
2090 		ptr = fsp_parse_init_file_page(ptr, end_ptr, block);
2091 		break;
2092 	case MLOG_WRITE_STRING:
2093 		ut_ad(!page || page_type != FIL_PAGE_TYPE_ALLOCATED
2094 		      || page_no == 0);
2095 		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
2096 		break;
2097 	case MLOG_ZIP_WRITE_NODE_PTR:
2098 		ut_ad(!page || fil_page_type_is_index(page_type));
2099 		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
2100 						    page, page_zip);
2101 		break;
2102 	case MLOG_ZIP_WRITE_BLOB_PTR:
2103 		ut_ad(!page || fil_page_type_is_index(page_type));
2104 		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
2105 						    page, page_zip);
2106 		break;
2107 	case MLOG_ZIP_WRITE_HEADER:
2108 		ut_ad(!page || fil_page_type_is_index(page_type));
2109 		ptr = page_zip_parse_write_header(ptr, end_ptr,
2110 						  page, page_zip);
2111 		break;
2112 	case MLOG_ZIP_PAGE_COMPRESS:
2113 		/* Allow anything in page_type when creating a page. */
2114 		ptr = page_zip_parse_compress(ptr, end_ptr,
2115 					      page, page_zip);
2116 		break;
2117 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
2118 		if (NULL != (ptr = mlog_parse_index(
2119 				ptr, end_ptr, TRUE, &index))) {
2120 
2121 			ut_a(!page || ((ibool)!!page_is_comp(page)
2122 				== dict_table_is_comp(index->table)));
2123 			ptr = page_zip_parse_compress_no_data(
2124 				ptr, end_ptr, page, page_zip, index);
2125 		}
2126 		break;
2127 	default:
2128 		ptr = NULL;
2129 		recv_sys->found_corrupt_log = true;
2130 	}
2131 
2132 	if (index) {
2133 		dict_table_t*	table = index->table;
2134 
2135 		dict_mem_index_free(index);
2136 		dict_mem_table_free(table);
2137 	}
2138 
2139 	return(ptr);
2140 }
2141 
2142 /*********************************************************************//**
2143 Calculates the fold value of a page file address: used in inserting or
2144 searching for a log record in the hash table.
2145 @return folded value */
2146 UNIV_INLINE
2147 ulint
recv_fold(ulint space,ulint page_no)2148 recv_fold(
2149 /*======*/
2150 	ulint	space,	/*!< in: space */
2151 	ulint	page_no)/*!< in: page number */
2152 {
2153 	return(ut_fold_ulint_pair(space, page_no));
2154 }
2155 
2156 /*********************************************************************//**
2157 Calculates the hash value of a page file address: used in inserting or
2158 searching for a log record in the hash table.
2159 @return folded value */
2160 UNIV_INLINE
2161 ulint
recv_hash(ulint space,ulint page_no)2162 recv_hash(
2163 /*======*/
2164 	ulint	space,	/*!< in: space */
2165 	ulint	page_no)/*!< in: page number */
2166 {
2167 	return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
2168 }
2169 
2170 /*********************************************************************//**
2171 Gets the hashed file address struct for a page.
2172 @return file address struct, NULL if not found from the hash table */
2173 static
2174 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)2175 recv_get_fil_addr_struct(
2176 /*=====================*/
2177 	ulint	space,	/*!< in: space id */
2178 	ulint	page_no)/*!< in: page number */
2179 {
2180 	recv_addr_t*	recv_addr;
2181 
2182 	for (recv_addr = static_cast<recv_addr_t*>(
2183 			HASH_GET_FIRST(recv_sys->addr_hash,
2184 				       recv_hash(space, page_no)));
2185 	     recv_addr != 0;
2186 	     recv_addr = static_cast<recv_addr_t*>(
2187 		     HASH_GET_NEXT(addr_hash, recv_addr))) {
2188 
2189 		if (recv_addr->space == space
2190 		    && recv_addr->page_no == page_no) {
2191 
2192 			return(recv_addr);
2193 		}
2194 	}
2195 
2196 	return(NULL);
2197 }
2198 
2199 /*******************************************************************//**
2200 Adds a new log record to the hash table of log records. */
2201 static
2202 void
recv_add_to_hash_table(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t start_lsn,lsn_t end_lsn)2203 recv_add_to_hash_table(
2204 /*===================*/
2205 	mlog_id_t	type,		/*!< in: log record type */
2206 	ulint		space,		/*!< in: space id */
2207 	ulint		page_no,	/*!< in: page number */
2208 	byte*		body,		/*!< in: log record body */
2209 	byte*		rec_end,	/*!< in: log record end */
2210 	lsn_t		start_lsn,	/*!< in: start lsn of the mtr */
2211 	lsn_t		end_lsn)	/*!< in: end lsn of the mtr */
2212 {
2213 	recv_t*		recv;
2214 	ulint		len;
2215 	recv_data_t*	recv_data;
2216 	recv_data_t**	prev_field;
2217 	recv_addr_t*	recv_addr;
2218 
2219 	ut_ad(type != MLOG_FILE_DELETE);
2220 	ut_ad(type != MLOG_FILE_CREATE2);
2221 	ut_ad(type != MLOG_FILE_RENAME2);
2222 	ut_ad(type != MLOG_FILE_NAME);
2223 	ut_ad(type != MLOG_DUMMY_RECORD);
2224 	ut_ad(type != MLOG_CHECKPOINT);
2225 	ut_ad(type != MLOG_INDEX_LOAD);
2226 	ut_ad(type != MLOG_TRUNCATE);
2227 
2228 	len = rec_end - body;
2229 
2230 	recv = static_cast<recv_t*>(
2231 		mem_heap_alloc(recv_sys->heap, sizeof(recv_t)));
2232 
2233 	recv->type = type;
2234 	recv->len = rec_end - body;
2235 	recv->start_lsn = start_lsn;
2236 	recv->end_lsn = end_lsn;
2237 
2238 	recv_addr = recv_get_fil_addr_struct(space, page_no);
2239 
2240 	if (recv_addr == NULL) {
2241 		recv_addr = static_cast<recv_addr_t*>(
2242 			mem_heap_alloc(recv_sys->heap, sizeof(recv_addr_t)));
2243 
2244 		recv_addr->space = space;
2245 		recv_addr->page_no = page_no;
2246 		recv_addr->state = RECV_NOT_PROCESSED;
2247 
2248 		UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
2249 
2250 		HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
2251 			    recv_fold(space, page_no), recv_addr);
2252 		recv_sys->n_addrs++;
2253 #if 0
2254 		fprintf(stderr, "Inserting log rec for space %lu, page %lu\n",
2255 			space, page_no);
2256 #endif
2257 	}
2258 
2259 	UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
2260 
2261 	prev_field = &(recv->data);
2262 
2263 	/* Store the log record body in chunks of less than UNIV_PAGE_SIZE:
2264 	recv_sys->heap grows into the buffer pool, and bigger chunks could not
2265 	be allocated */
2266 
2267 	while (rec_end > body) {
2268 
2269 		len = rec_end - body;
2270 
2271 		if (len > RECV_DATA_BLOCK_SIZE) {
2272 			len = RECV_DATA_BLOCK_SIZE;
2273 		}
2274 
2275 		recv_data = static_cast<recv_data_t*>(
2276 			mem_heap_alloc(recv_sys->heap,
2277 				       sizeof(recv_data_t) + len));
2278 
2279 		*prev_field = recv_data;
2280 
2281 		memcpy(recv_data + 1, body, len);
2282 
2283 		prev_field = &(recv_data->next);
2284 
2285 		body += len;
2286 	}
2287 
2288 	*prev_field = NULL;
2289 }
2290 
2291 /*********************************************************************//**
2292 Copies the log record body from recv to buf. */
2293 static
2294 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)2295 recv_data_copy_to_buf(
2296 /*==================*/
2297 	byte*	buf,	/*!< in: buffer of length at least recv->len */
2298 	recv_t*	recv)	/*!< in: log record */
2299 {
2300 	recv_data_t*	recv_data;
2301 	ulint		part_len;
2302 	ulint		len;
2303 
2304 	len = recv->len;
2305 	recv_data = recv->data;
2306 
2307 	while (len > 0) {
2308 		if (len > RECV_DATA_BLOCK_SIZE) {
2309 			part_len = RECV_DATA_BLOCK_SIZE;
2310 		} else {
2311 			part_len = len;
2312 		}
2313 
2314 		ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
2315 			  part_len);
2316 		buf += part_len;
2317 		len -= part_len;
2318 
2319 		recv_data = recv_data->next;
2320 	}
2321 }
2322 
/************************************************************************//**
Applies the hashed log records to the page, if the page lsn is less than the
lsn of a log record. This can be called when a buffer page has just been
read in, or also for a page already in the buffer pool.

Outline of what the function does:
- under recv_sys->mutex, returns at once if recv_sys->apply_log_recs is
  FALSE, if the page has no entry in the hash table, or if the entry is
  already in state RECV_BEING_PROCESSED or RECV_PROCESSED; otherwise marks
  the entry RECV_BEING_PROCESSED and releases the mutex;
- walks the record list of the entry inside a mini-transaction that runs in
  MTR_LOG_NONE mode, and applies every record whose start lsn is not older
  than the page lsn, unless the record is skipped because of the tablespace
  truncate checks;
- stamps the page lsn fields after each applied record;
- finally, under recv_sys->mutex again, marks the entry RECV_PROCESSED and
  decrements recv_sys->n_addrs. */
void
recv_recover_page_func(
/*===================*/
#ifndef UNIV_HOTBACKUP
	ibool		just_read_in,
				/*!< in: TRUE if the i/o handler calls
				this for a freshly read page */
#endif /* !UNIV_HOTBACKUP */
	buf_block_t*	block)	/*!< in/out: buffer block */
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	recv_addr_t*	recv_addr;
	recv_t*		recv;
	byte*		buf;
	lsn_t		start_lsn;
	lsn_t		end_lsn;
	lsn_t		page_lsn;
	lsn_t		page_newest_lsn;
	ibool		modification_to_page;
	mtr_t		mtr;

	mutex_enter(&(recv_sys->mutex));

	if (recv_sys->apply_log_recs == FALSE) {

		/* Log records should not be applied now */

		mutex_exit(&(recv_sys->mutex));

		return;
	}

	recv_addr = recv_get_fil_addr_struct(block->page.id.space(),
					     block->page.id.page_no());

	/* Nothing to do if there are no records for this page, or if the
	page is being handled or has already been handled. */
	if ((recv_addr == NULL)
	    || (recv_addr->state == RECV_BEING_PROCESSED)
	    || (recv_addr->state == RECV_PROCESSED)) {
		ut_ad(recv_addr == NULL || recv_needed_recovery);

		mutex_exit(&(recv_sys->mutex));

		return;
	}

#ifndef UNIV_HOTBACKUP
	ut_ad(recv_needed_recovery);

	DBUG_PRINT("ib_log",
		   ("Applying log to page %u:%u",
		    recv_addr->space, recv_addr->page_no));
#endif /* !UNIV_HOTBACKUP */

	/* Claim the page while still holding the mutex; the early return
	above makes any later caller for the same page back off. */
	recv_addr->state = RECV_BEING_PROCESSED;

	mutex_exit(&(recv_sys->mutex));

	/* The mini-transaction is only a vehicle for applying the records:
	it must not generate redo log of its own. */
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, MTR_LOG_NONE);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

#ifndef UNIV_HOTBACKUP
	if (just_read_in) {
		/* Move the ownership of the x-latch on the page to
		this OS thread, so that we can acquire a second
		x-latch on it.  This is needed for the operations to
		the page to pass the debug checks. */

		rw_lock_x_lock_move_ownership(&block->lock);
	}

	ibool	success = buf_page_get_known_nowait(
		RW_X_LATCH, block, BUF_KEEP_OLD,
		__FILE__, __LINE__, &mtr);
	ut_a(success);

	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
#endif /* !UNIV_HOTBACKUP */

	/* Read the newest modification lsn from the page */
	page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);

#ifndef UNIV_HOTBACKUP
	/* It may be that the page has been modified in the buffer
	pool: read the newest modification lsn there */

	page_newest_lsn = buf_page_get_newest_modification(&block->page);

	if (page_newest_lsn) {

		page_lsn = page_newest_lsn;
	}
#else /* !UNIV_HOTBACKUP */
	/* In recovery from a backup we do not really use the buffer pool */
	page_newest_lsn = 0;
#endif /* !UNIV_HOTBACKUP */

	modification_to_page = FALSE;
	start_lsn = end_lsn = 0;

	recv = UT_LIST_GET_FIRST(recv_addr->rec_list);

	while (recv) {
		/* Function-level end_lsn: tracks the end lsn of the last
		record visited, applied or not; it is what gets passed to
		buf_flush_recv_note_modification() after the loop. */
		end_lsn = recv->end_lsn;

		ut_ad(end_lsn
		      <= UT_LIST_GET_FIRST(log_sys->log_groups)->scanned_lsn);

		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			/* We have to copy the record body to a separate
			buffer */

			buf = static_cast<byte*>(ut_malloc_nokey(recv->len));

			recv_data_copy_to_buf(buf, recv);
		} else {
			/* The whole body fits in the first chunk: use it in
			place, it starts right after the chunk header. */
			buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
		}

		if (recv->type == MLOG_INIT_FILE_PAGE) {
			/* The page is being initialized: forget the lsn that
			was read from the frame and zero the lsn fields in
			the page. */
			page_lsn = page_newest_lsn;

			memset(FIL_PAGE_LSN + page, 0, 8);
			memset(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM
			       + page, 0, 8);

			if (page_zip) {
				memset(FIL_PAGE_LSN + page_zip->data, 0, 8);
			}
		}

		/* If per-table tablespace was truncated and there exist REDO
		records before truncate that are to be applied as part of
		recovery (checkpoint didn't happen since truncate was done)
		skip such records using lsn check as they may not stand valid
		post truncate.
		LSN at start of truncate is recorded and any redo record
		with LSN less than recorded LSN is skipped.
		Note: We can't skip complete recv_addr as same page may have
		valid REDO records post truncate those needs to be applied. */
		bool	skip_recv = false;
		if (srv_was_tablespace_truncated(fil_space_get(recv_addr->space))) {
			lsn_t	init_lsn =
				truncate_t::get_truncated_tablespace_init_lsn(
				recv_addr->space);
			skip_recv = (recv->start_lsn < init_lsn);
		}

		/* Ignore applying the redo logs for tablespace that is
		truncated. Post recovery there is fixup action that will
		restore the tablespace back to normal state.
		Applying redo at this stage can result in error given that
		redo will have action recorded on page before tablespace
		was re-inited and that would lead to an error while applying
		such action. */
		if (recv->start_lsn >= page_lsn
		    && !srv_is_tablespace_truncated(recv_addr->space)
		    && !skip_recv) {

			/* NOTE: this declaration shadows the function-level
			end_lsn. The value computed below (start_lsn + len)
			is only used for stamping the page; the outer end_lsn
			keeps recv->end_lsn. */
			lsn_t	end_lsn;

			if (!modification_to_page) {

				/* Remember the start lsn of the first record
				that is actually applied. */
				modification_to_page = TRUE;
				start_lsn = recv->start_lsn;
			}

			DBUG_PRINT("ib_log",
				   ("apply " LSN_PF ":"
				    " %s len " ULINTPF " page %u:%u",
				    recv->start_lsn,
				    get_mlog_string(recv->type), recv->len,
				    recv_addr->space,
				    recv_addr->page_no));

			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				recv_addr->space, recv_addr->page_no,
				block, &mtr);

			/* Stamp the lsn fields at the start and at the end
			of the page, and in the compressed page if any. */
			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(UNIV_PAGE_SIZE
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN
						+ page_zip->data, end_lsn);
			}
		}

		/* Release the temporary buffer; the condition mirrors the
		one under which it was allocated above. */
		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			ut_free(buf);
		}

		recv = UT_LIST_GET_NEXT(rec_list, recv);
	}

#ifdef UNIV_ZIP_DEBUG
	if (fil_page_index_page_check(page)) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);

		ut_a(!page_zip
		     || page_zip_validate_low(page_zip, page, NULL, FALSE));
	}
#endif /* UNIV_ZIP_DEBUG */

#ifndef UNIV_HOTBACKUP
	if (modification_to_page) {
		ut_a(block);

		/* Report the lsn range of the applied changes for this
		block, holding the log flush order mutex. */
		log_flush_order_mutex_enter();
		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
		log_flush_order_mutex_exit();
	}
#else /* !UNIV_HOTBACKUP */
	start_lsn = start_lsn; /* Silence compiler */
#endif /* !UNIV_HOTBACKUP */

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();

	mtr_commit(&mtr);

	mutex_enter(&(recv_sys->mutex));

	/* Note that page_lsn here is the lsn determined before the records
	were applied (or reset by an MLOG_INIT_FILE_PAGE record), not the
	value stamped into the page above. */
	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	recv_addr->state = RECV_PROCESSED;

	/* One page address fewer is waiting to be processed. */
	ut_a(recv_sys->n_addrs);
	recv_sys->n_addrs--;

	mutex_exit(&(recv_sys->mutex));

}
2571 
2572 #ifndef UNIV_HOTBACKUP
2573 /** Reads in pages which have hashed log records, from an area around a given
2574 page number.
2575 @param[in]	page_id	page id
2576 @return number of pages found */
2577 static
2578 ulint
recv_read_in_area(const page_id_t & page_id)2579 recv_read_in_area(
2580 	const page_id_t&	page_id)
2581 {
2582 	recv_addr_t* recv_addr;
2583 	ulint	page_nos[RECV_READ_AHEAD_AREA];
2584 	ulint	low_limit;
2585 	ulint	n;
2586 
2587 	low_limit = page_id.page_no()
2588 		- (page_id.page_no() % RECV_READ_AHEAD_AREA);
2589 
2590 	n = 0;
2591 
2592 	for (ulint page_no = low_limit;
2593 	     page_no < low_limit + RECV_READ_AHEAD_AREA;
2594 	     page_no++) {
2595 
2596 		recv_addr = recv_get_fil_addr_struct(page_id.space(), page_no);
2597 
2598 		const page_id_t	cur_page_id(page_id.space(), page_no);
2599 
2600 		if (recv_addr && !buf_page_peek(cur_page_id)) {
2601 
2602 			mutex_enter(&(recv_sys->mutex));
2603 
2604 			if (recv_addr->state == RECV_NOT_PROCESSED) {
2605 				recv_addr->state = RECV_BEING_READ;
2606 
2607 				page_nos[n] = page_no;
2608 
2609 				n++;
2610 			}
2611 
2612 			mutex_exit(&(recv_sys->mutex));
2613 		}
2614 	}
2615 
2616 	buf_read_recv_pages(FALSE, page_id.space(), page_nos, n);
2617 	/*
2618 	fprintf(stderr, "Recv pages at %lu n %lu\n", page_nos[0], n);
2619 	*/
2620 	return(n);
2621 }
2622 
/*******************************************************************//**
Empties the hash table of stored log records, applying them to appropriate
pages.

Only one apply batch runs at a time: if recv_sys->apply_batch_on is already
set, the function sleeps and retries. It then visits every cell of
recv_sys->addr_hash; for each page address still in state
RECV_NOT_PROCESSED it either applies the records directly (page found by
buf_page_peek()) or requests a read of the surrounding area. After the scan
it waits until recv_sys->n_addrs drops to zero, optionally flushes and
invalidates the buffer pool (when allow_ibuf is FALSE), and finally empties
the hash table. */
void
recv_apply_hashed_log_recs(
/*=======================*/
	ibool	allow_ibuf)	/*!< in: if TRUE, also ibuf operations are
				allowed during the application; if FALSE,
				no ibuf operations are allowed, and after
				the application all file pages are flushed to
				disk and invalidated in buffer pool: this
				alternative means that no new log records
				can be generated during the application;
				the caller must in this case own the log
				mutex */
{
	recv_addr_t* recv_addr;
	ulint	i;
	ibool	has_printed	= FALSE;
	mtr_t	mtr;
loop:
	mutex_enter(&(recv_sys->mutex));

	if (recv_sys->apply_batch_on) {

		/* Another batch is in progress: release the mutex, sleep,
		and check again. */
		mutex_exit(&(recv_sys->mutex));

		os_thread_sleep(500000);

		goto loop;
	}

	/* The log mutex must be owned exactly when ibuf operations are
	not allowed. */
	ut_ad(!allow_ibuf == log_mutex_own());

	if (!allow_ibuf) {
		recv_no_ibuf_operations = true;
	}

	recv_sys->apply_log_recs = TRUE;
	recv_sys->apply_batch_on = TRUE;

	for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {

		for (recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_FIRST(recv_sys->addr_hash, i));
		     recv_addr != 0;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {

			if (srv_is_tablespace_truncated(recv_addr->space)) {
				/* Avoid applying REDO log for the tablespace
				that is scheduled for TRUNCATE. */
				ut_a(recv_sys->n_addrs);
				recv_addr->state = RECV_DISCARDED;
				recv_sys->n_addrs--;
				continue;
			}

			if (recv_addr->state == RECV_DISCARDED) {
				/* Discarded entries are only removed from
				the count of addresses awaiting processing. */
				ut_a(recv_sys->n_addrs);
				recv_sys->n_addrs--;
				continue;
			}

			const page_id_t		page_id(recv_addr->space,
							recv_addr->page_no);
			bool			found;
			const page_size_t&	page_size
				= fil_space_get_page_size(recv_addr->space,
							  &found);

			ut_ad(found);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
				if (!has_printed) {
					ib::info() << "Starting an apply batch"
						" of log records"
						" to the database...";
					fputs("InnoDB: Progress in percent: ",
					      stderr);
					has_printed = TRUE;
				}

				/* recv_sys->mutex is released while the page
				is handled and re-acquired afterwards. */
				mutex_exit(&(recv_sys->mutex));

				if (buf_page_peek(page_id)) {
					buf_block_t*	block;

					mtr_start(&mtr);

					block = buf_page_get(
						page_id, page_size,
						RW_X_LATCH, &mtr);

					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);

					recv_recover_page(FALSE, block);
					mtr_commit(&mtr);
				} else {
					/* Not seen in the buffer pool:
					request a read of this page and of
					its neighbours that have log
					records. */
					recv_read_in_area(page_id);
				}

				mutex_enter(&(recv_sys->mutex));
			}
		}

		/* Print the progress each time the integer percentage of
		visited hash cells changes. */
		if (has_printed
		    && (i * 100) / hash_get_n_cells(recv_sys->addr_hash)
		    != ((i + 1) * 100)
		    / hash_get_n_cells(recv_sys->addr_hash)) {

			fprintf(stderr, "%lu ", (ulong)
				((i * 100)
				 / hash_get_n_cells(recv_sys->addr_hash)));
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys->n_addrs != 0) {

		mutex_exit(&(recv_sys->mutex));

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys->mutex));
	}

	if (has_printed) {

		fprintf(stderr, "\n");
	}

	if (!allow_ibuf) {

		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		ut_d(recv_no_log_write = true);
		/* Both recv_sys->mutex and the log mutex are released for
		the duration of the flush and re-acquired below. */
		mutex_exit(&(recv_sys->mutex));
		log_mutex_exit();

		/* Stop the recv_writer thread from issuing any LRU
		flush batches. */
		mutex_enter(&recv_sys->writer_mutex);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		/* Request a BUF_FLUSH_LIST flush by signalling flush_start,
		then wait for flush_end to be signalled. */
		os_event_reset(recv_sys->flush_end);
		recv_sys->flush_type = BUF_FLUSH_LIST;
		os_event_set(recv_sys->flush_start);
		os_event_wait(recv_sys->flush_end);

		buf_pool_invalidate();

		/* Allow batches from recv_writer thread. */
		mutex_exit(&recv_sys->writer_mutex);

		log_mutex_enter();
		mutex_enter(&(recv_sys->mutex));
		ut_d(recv_no_log_write = false);

		recv_no_ibuf_operations = false;
	}

	/* The batch is over: stop applying records and let the next batch
	start. */
	recv_sys->apply_log_recs = FALSE;
	recv_sys->apply_batch_on = FALSE;

	recv_sys_empty_hash();

	if (has_printed) {
		ib::info() << "Apply batch completed";
	}

	mutex_exit(&(recv_sys->mutex));
}
2801 #else /* !UNIV_HOTBACKUP */
2802 /*******************************************************************//**
2803 Applies log records in the hash table to a backup. */
2804 void
recv_apply_log_recs_for_backup(void)2805 recv_apply_log_recs_for_backup(void)
2806 /*================================*/
2807 {
2808 	recv_addr_t*	recv_addr;
2809 	ulint		n_hash_cells;
2810 	buf_block_t*	block;
2811 	bool		success;
2812 	ulint		error;
2813 	ulint		i;
2814 	fil_space_t*	space = NULL;
2815 	page_id_t	page_id;
2816 	recv_sys->apply_log_recs = TRUE;
2817 	recv_sys->apply_batch_on = TRUE;
2818 
2819 	block = back_block1;
2820 
2821 	ib::info() << "Starting an apply batch of log records to the"
2822 		" database...\n";
2823 
2824 	fputs("InnoDB: Progress in percent: ", stderr);
2825 
2826 	n_hash_cells = hash_get_n_cells(recv_sys->addr_hash);
2827 
2828 	for (i = 0; i < n_hash_cells; i++) {
2829 		/* The address hash table is externally chained */
2830 		recv_addr = static_cast<recv_addr_t*>(hash_get_nth_cell(
2831 					recv_sys->addr_hash, i)->node);
2832 
2833 		while (recv_addr != NULL) {
2834 
2835 			ib::trace() << "recv_addr {State: " << recv_addr->state
2836 				<< ", Space id: " << recv_addr->space
2837 				<< "Page no: " << recv_addr->page_no
2838 				<< ". index i: " << i << "\n";
2839 
2840 			bool			found;
2841 			const page_size_t&	page_size
2842 				= fil_space_get_page_size(recv_addr->space,
2843 							  &found);
2844 
2845 			if (!found) {
2846 #if 0
2847 				fprintf(stderr,
2848 					"InnoDB: Warning: cannot apply"
2849 					" log record to"
2850 					" tablespace %lu page %lu,\n"
2851 					"InnoDB: because tablespace with"
2852 					" that id does not exist.\n",
2853 					recv_addr->space, recv_addr->page_no);
2854 #endif
2855 				recv_addr->state = RECV_DISCARDED;
2856 
2857 				ut_a(recv_sys->n_addrs);
2858 				recv_sys->n_addrs--;
2859 
2860 				goto skip_this_recv_addr;
2861 			}
2862 
2863 			/* We simulate a page read made by the buffer pool, to
2864 			make sure the recovery apparatus works ok. We must init
2865 			the block. */
2866 
2867 			buf_page_init_for_backup_restore(
2868 				page_id_t(recv_addr->space, recv_addr->page_no),
2869 				page_size, block);
2870 
2871 			/* Extend the tablespace's last file if the page_no
2872 			does not fall inside its bounds; we assume the last
2873 			file is auto-extending, and mysqlbackup copied the file
2874 			when it still was smaller */
2875 			fil_space_t*	space
2876 				= fil_space_get(recv_addr->space);
2877 
2878 			success = fil_space_extend(
2879 				space, recv_addr->page_no + 1);
2880 			if (!success) {
2881 				ib::fatal() << "Cannot extend tablespace "
2882 					<< recv_addr->space << " to hold "
2883 					<< recv_addr->page_no << " pages";
2884 			}
2885 
2886 			/* Read the page from the tablespace file using the
2887 			fil0fil.cc routines */
2888 
2889 			const page_id_t	page_id(recv_addr->space,
2890 						recv_addr->page_no);
2891 
2892 			if (page_size.is_compressed()) {
2893 
2894 				error = fil_io(
2895 					IORequestRead, true,
2896 					page_id,
2897 					page_size, 0, page_size.physical(),
2898 					block->page.zip.data, NULL);
2899 
2900 				if (error == DB_SUCCESS
2901 				    && !buf_zip_decompress(block, TRUE)) {
2902 					ut_error;
2903 				}
2904 			} else {
2905 
2906 				error = fil_io(
2907 					IORequestRead, true,
2908 					page_id, page_size, 0,
2909 					page_size.logical(),
2910 					block->frame, NULL);
2911 			}
2912 
2913 			if (error != DB_SUCCESS) {
2914 				ib::fatal() << "Cannot read from tablespace "
2915 					<< recv_addr->space << " page number "
2916 					<< recv_addr->page_no;
2917 			}
2918 
2919 			/* Apply the log records to this page */
2920 			recv_recover_page(FALSE, block);
2921 
2922 			/* Write the page back to the tablespace file using the
2923 			fil0fil.cc routines */
2924 
2925 			buf_flush_init_for_writing(
2926 				block, block->frame,
2927 				buf_block_get_page_zip(block),
2928 				mach_read_from_8(block->frame + FIL_PAGE_LSN),
2929 				fsp_is_checksum_disabled(
2930 					block->page.id.space()));
2931 
2932 			if (page_size.is_compressed()) {
2933 
2934 				error = fil_io(
2935 					IORequestWrite, true, page_id,
2936 					page_size, 0, page_size.physical(),
2937 					block->page.zip.data, NULL);
2938 			} else {
2939 				error = fil_io(
2940 					IORequestWrite, true, page_id,
2941 					page_size, 0, page_size.logical(),
2942 					block->frame, NULL);
2943 			}
2944 skip_this_recv_addr:
2945 			recv_addr = static_cast<recv_addr_t*>(HASH_GET_NEXT(
2946 					addr_hash, recv_addr));
2947 		}
2948 
2949 		if ((100 * i) / n_hash_cells
2950 		    != (100 * (i + 1)) / n_hash_cells) {
2951 			fprintf(stderr, "%lu ",
2952 				(ulong) ((100 * i) / n_hash_cells));
2953 			fflush(stderr);
2954 		}
2955 	}
2956 	/* write logs in next line */
2957 	fprintf(stderr, "\n");
2958 	recv_sys->apply_log_recs = FALSE;
2959 	recv_sys->apply_batch_on = FALSE;
2960 	recv_sys_empty_hash();
2961 }
2962 #endif /* !UNIV_HOTBACKUP */
2963 
2964 /** Tries to parse a single log record.
2965 @param[out]	type		log record type
2966 @param[in]	ptr		pointer to a buffer
2967 @param[in]	end_ptr		end of the buffer
2968 @param[out]	space_id	tablespace identifier
2969 @param[out]	page_no		page number
2970 @param[in]	apply		whether to apply MLOG_FILE_* records
2971 @param[out]	body		start of log record body
2972 @return length of the record, or 0 if the record was not complete */
2973 static
2974 ulint
recv_parse_log_rec(mlog_id_t * type,byte * ptr,byte * end_ptr,ulint * space,ulint * page_no,bool apply,byte ** body)2975 recv_parse_log_rec(
2976 	mlog_id_t*	type,
2977 	byte*		ptr,
2978 	byte*		end_ptr,
2979 	ulint*		space,
2980 	ulint*		page_no,
2981 	bool		apply,
2982 	byte**		body)
2983 {
2984 	byte*	new_ptr;
2985 
2986 	*body = NULL;
2987 
2988 	UNIV_MEM_INVALID(type, sizeof *type);
2989 	UNIV_MEM_INVALID(space, sizeof *space);
2990 	UNIV_MEM_INVALID(page_no, sizeof *page_no);
2991 	UNIV_MEM_INVALID(body, sizeof *body);
2992 
2993 	if (ptr == end_ptr) {
2994 
2995 		return(0);
2996 	}
2997 
2998 	switch (*ptr) {
2999 #ifdef UNIV_LOG_LSN_DEBUG
3000 	case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
3001 	case MLOG_LSN:
3002 		new_ptr = mlog_parse_initial_log_record(
3003 			ptr, end_ptr, type, space, page_no);
3004 		if (new_ptr != NULL) {
3005 			const lsn_t	lsn = static_cast<lsn_t>(
3006 				*space) << 32 | *page_no;
3007 			ut_a(lsn == recv_sys->recovered_lsn);
3008 		}
3009 
3010 		*type = MLOG_LSN;
3011 		return(new_ptr - ptr);
3012 #endif /* UNIV_LOG_LSN_DEBUG */
3013 	case MLOG_MULTI_REC_END:
3014 	case MLOG_DUMMY_RECORD:
3015 		*type = static_cast<mlog_id_t>(*ptr);
3016 		return(1);
3017 	case MLOG_CHECKPOINT:
3018 		if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
3019 			return(0);
3020 		}
3021 		*type = static_cast<mlog_id_t>(*ptr);
3022 		return(SIZE_OF_MLOG_CHECKPOINT);
3023 	case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
3024 	case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
3025 	case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
3026 		recv_sys->found_corrupt_log = true;
3027 		return(0);
3028 	}
3029 
3030 	new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
3031 						page_no);
3032 	*body = new_ptr;
3033 
3034 	if (UNIV_UNLIKELY(!new_ptr)) {
3035 
3036 		return(0);
3037 	}
3038 
3039 	new_ptr = recv_parse_or_apply_log_rec_body(
3040 		*type, new_ptr, end_ptr, *space, *page_no, NULL, NULL);
3041 
3042 	if (UNIV_UNLIKELY(new_ptr == NULL)) {
3043 
3044 		return(0);
3045 	}
3046 
3047 	return(new_ptr - ptr);
3048 }
3049 
3050 /*******************************************************//**
3051 Calculates the new value for lsn when more data is added to the log. */
3052 static
3053 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)3054 recv_calc_lsn_on_data_add(
3055 /*======================*/
3056 	lsn_t		lsn,	/*!< in: old lsn */
3057 	ib_uint64_t	len)	/*!< in: this many bytes of data is
3058 				added, log block headers not included */
3059 {
3060 	ulint		frag_len;
3061 	ib_uint64_t	lsn_len;
3062 
3063 	frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
3064 	ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
3065 	      - LOG_BLOCK_TRL_SIZE);
3066 	lsn_len = len;
3067 	lsn_len += (lsn_len + frag_len)
3068 		/ (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
3069 		   - LOG_BLOCK_TRL_SIZE)
3070 		* (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
3071 
3072 	return(lsn + lsn_len);
3073 }
3074 
3075 /** Prints diagnostic info of corrupt log.
3076 @param[in]	ptr	pointer to corrupt log record
3077 @param[in]	type	type of the log record (could be garbage)
3078 @param[in]	space	tablespace ID (could be garbage)
3079 @param[in]	page_no	page number (could be garbage)
3080 @return whether processing should continue */
3081 static
3082 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)3083 recv_report_corrupt_log(
3084 	const byte*	ptr,
3085 	int		type,
3086 	ulint		space,
3087 	ulint		page_no)
3088 {
3089 	ib::error() <<
3090 		"############### CORRUPT LOG RECORD FOUND ##################";
3091 
3092 	ib::info() << "Log record type " << type << ", page " << space << ":"
3093 		<< page_no << ". Log parsing proceeded successfully up to "
3094 		<< recv_sys->recovered_lsn << ". Previous log record type "
3095 		<< recv_previous_parsed_rec_type << ", is multi "
3096 		<< recv_previous_parsed_rec_is_multi << " Recv offset "
3097 		<< (ptr - recv_sys->buf) << ", prev "
3098 		<< recv_previous_parsed_rec_offset;
3099 
3100 	ut_ad(ptr <= recv_sys->buf + recv_sys->len);
3101 
3102 	const ulint	limit	= 100;
3103 	const ulint	before
3104 		= std::min(recv_previous_parsed_rec_offset, limit);
3105 	const ulint	after
3106 		= std::min(recv_sys->len - (ptr - recv_sys->buf), limit);
3107 
3108 	ib::info() << "Hex dump starting " << before << " bytes before and"
3109 		" ending " << after << " bytes after the corrupted record:";
3110 
3111 	ut_print_buf(stderr,
3112 		     recv_sys->buf
3113 		     + recv_previous_parsed_rec_offset - before,
3114 		     ptr - recv_sys->buf + before + after
3115 		     - recv_previous_parsed_rec_offset);
3116 	putc('\n', stderr);
3117 
3118 #ifndef UNIV_HOTBACKUP
3119 	if (!srv_force_recovery) {
3120 		ib::info() << "Set innodb_force_recovery to ignore this error.";
3121 		return(false);
3122 	}
3123 #endif /* !UNIV_HOTBACKUP */
3124 
3125 	ib::warn() << "The log file may have been corrupt and it is possible"
3126 		" that the log scan did not proceed far enough in recovery!"
3127 		" Please run CHECK TABLE on your InnoDB tables to check"
3128 		" that they are ok! If mysqld crashes after this recovery; "
3129 		<< FORCE_RECOVERY_MSG;
3130 	return(true);
3131 }
3132 
/** Policy for storing parsed redo log records to the hash table */
enum store_t {
	/** Parse only; do not store redo log records. */
	STORE_NO		= 0,
	/** Store every redo log record. */
	STORE_YES		= 1,
	/** Store a redo log record only if its tablespace exists. */
	STORE_IF_EXISTS		= 2
};
3142 
3143 /** Parse log records from a buffer and optionally store them to a
3144 hash table to wait merging to file pages.
3145 @param[in]	checkpoint_lsn	the LSN of the latest checkpoint
3146 @param[in]	store		whether to store page operations
3147 @return whether MLOG_CHECKPOINT record was seen the first time,
3148 or corruption was noticed */
3149 static MY_ATTRIBUTE((warn_unused_result))
3150 bool
recv_parse_log_recs(lsn_t checkpoint_lsn,store_t store)3151 recv_parse_log_recs(
3152 	lsn_t		checkpoint_lsn,
3153 	store_t		store)
3154 {
3155 	byte*		ptr;
3156 	byte*		end_ptr;
3157 	bool		single_rec;
3158 	ulint		len;
3159 	lsn_t		new_recovered_lsn;
3160 	lsn_t		old_lsn;
3161 	mlog_id_t	type;
3162 	ulint		space;
3163 	ulint		page_no;
3164 	byte*		body;
3165 
3166 	ut_ad(log_mutex_own());
3167 	ut_ad(recv_sys->parse_start_lsn != 0);
3168 loop:
3169 	ptr = recv_sys->buf + recv_sys->recovered_offset;
3170 
3171 	end_ptr = recv_sys->buf + recv_sys->len;
3172 
3173 	if (ptr == end_ptr) {
3174 
3175 		return(false);
3176 	}
3177 
3178 	switch (*ptr) {
3179 	case MLOG_CHECKPOINT:
3180 #ifdef UNIV_LOG_LSN_DEBUG
3181 	case MLOG_LSN:
3182 #endif /* UNIV_LOG_LSN_DEBUG */
3183 	case MLOG_DUMMY_RECORD:
3184 		single_rec = true;
3185 		break;
3186 	default:
3187 		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
3188 	}
3189 
3190 	if (single_rec) {
3191 		/* The mtr did not modify multiple pages */
3192 
3193 		old_lsn = recv_sys->recovered_lsn;
3194 
3195 		/* Try to parse a log record, fetching its type, space id,
3196 		page no, and a pointer to the body of the log record */
3197 
3198 		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
3199 					 &page_no, true, &body);
3200 
3201 		if (len == 0) {
3202 			return(false);
3203 		}
3204 
3205 		if (recv_sys->found_corrupt_log) {
3206 			recv_report_corrupt_log(
3207 				ptr, type, space, page_no);
3208 			return(true);
3209 		}
3210 
3211 		if (recv_sys->found_corrupt_fs) {
3212 			return(true);
3213 		}
3214 
3215 		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
3216 
3217 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
3218 			/* The log record filled a log block, and we require
3219 			that also the next log block should have been scanned
3220 			in */
3221 
3222 			return(false);
3223 		}
3224 
3225 		recv_previous_parsed_rec_type = type;
3226 		recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
3227 		recv_previous_parsed_rec_is_multi = 0;
3228 
3229 		recv_sys->recovered_offset += len;
3230 		recv_sys->recovered_lsn = new_recovered_lsn;
3231 
3232 		switch (type) {
3233 			lsn_t	lsn;
3234 		case MLOG_DUMMY_RECORD:
3235 			/* Do nothing */
3236 			break;
3237 		case MLOG_CHECKPOINT:
3238 #if SIZE_OF_MLOG_CHECKPOINT != 1 + 8
3239 # error SIZE_OF_MLOG_CHECKPOINT != 1 + 8
3240 #endif
3241 			lsn = mach_read_from_8(ptr + 1);
3242 
3243 			DBUG_PRINT("ib_log",
3244 				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
3245 				    LSN_PF,
3246 				    lsn,
3247 				    lsn != checkpoint_lsn ? "ignored"
3248 				    : recv_sys->mlog_checkpoint_lsn
3249 				    ? "reread" : "read",
3250 				    recv_sys->recovered_lsn));
3251 
3252 			if (lsn == checkpoint_lsn) {
3253 				if (recv_sys->mlog_checkpoint_lsn) {
3254 					/* At recv_reset_logs() we may
3255 					write a duplicate MLOG_CHECKPOINT
3256 					for the same checkpoint LSN. Thus
3257 					recv_sys->mlog_checkpoint_lsn
3258 					can differ from the current LSN. */
3259 					ut_ad(recv_sys->mlog_checkpoint_lsn
3260 					      <= recv_sys->recovered_lsn);
3261 					break;
3262 				}
3263 				recv_sys->mlog_checkpoint_lsn
3264 					= recv_sys->recovered_lsn;
3265 			}
3266 			break;
3267 		case MLOG_FILE_NAME:
3268 		case MLOG_FILE_DELETE:
3269 		case MLOG_FILE_CREATE2:
3270 		case MLOG_FILE_RENAME2:
3271 		case MLOG_TRUNCATE:
3272 			/* These were already handled by
3273 			recv_parse_log_rec() and
3274 			recv_parse_or_apply_log_rec_body(). */
3275 			break;
3276 #ifdef UNIV_LOG_LSN_DEBUG
3277 		case MLOG_LSN:
3278 			/* Do not add these records to the hash table.
3279 			The page number and space id fields are misused
3280 			for something else. */
3281 			break;
3282 #endif /* UNIV_LOG_LSN_DEBUG */
3283 		default:
3284 			switch (store) {
3285 			case STORE_NO:
3286 				break;
3287 			case STORE_IF_EXISTS:
3288 				if (fil_space_get_flags(space)
3289 				    == ULINT_UNDEFINED) {
3290 					break;
3291 				}
3292 				/* fall through */
3293 			case STORE_YES:
3294 				recv_add_to_hash_table(
3295 					type, space, page_no, body,
3296 					ptr + len, old_lsn,
3297 					recv_sys->recovered_lsn);
3298 			}
3299 			/* fall through */
3300 		case MLOG_INDEX_LOAD:
3301 			DBUG_PRINT("ib_log",
3302 				("scan " LSN_PF ": log rec %s"
3303 				" len " ULINTPF
3304 				" page " ULINTPF ":" ULINTPF,
3305 				old_lsn, get_mlog_string(type),
3306 				len, space, page_no));
3307 		}
3308 	} else {
3309 		/* Check that all the records associated with the single mtr
3310 		are included within the buffer */
3311 
3312 		ulint	total_len	= 0;
3313 		ulint	n_recs		= 0;
3314 		bool	only_mlog_file	= true;
3315 		ulint	mlog_rec_len	= 0;
3316 
3317 		for (;;) {
3318 			len = recv_parse_log_rec(
3319 				&type, ptr, end_ptr, &space, &page_no,
3320 				false, &body);
3321 
3322 			if (len == 0) {
3323 				return(false);
3324 			}
3325 
3326 			if (recv_sys->found_corrupt_log
3327 			    || type == MLOG_CHECKPOINT
3328 			    || (*ptr & MLOG_SINGLE_REC_FLAG)) {
3329 				recv_sys->found_corrupt_log = true;
3330 				recv_report_corrupt_log(
3331 					ptr, type, space, page_no);
3332 				return(true);
3333 			}
3334 
3335 			if (recv_sys->found_corrupt_fs) {
3336 				return(true);
3337 			}
3338 
3339 			recv_previous_parsed_rec_type = type;
3340 			recv_previous_parsed_rec_offset
3341 				= recv_sys->recovered_offset + total_len;
3342 			recv_previous_parsed_rec_is_multi = 1;
3343 
3344 			/* MLOG_FILE_NAME redo log records doesn't make changes
3345 			to persistent data. If only MLOG_FILE_NAME redo
3346 			log record exists then reset the parsing buffer pointer
3347 			by changing recovered_lsn and recovered_offset. */
3348 			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
3349 				only_mlog_file = false;
3350 			}
3351 
3352 			if (only_mlog_file) {
3353 				new_recovered_lsn = recv_calc_lsn_on_data_add(
3354 					recv_sys->recovered_lsn, len);
3355 				mlog_rec_len += len;
3356 				recv_sys->recovered_offset += len;
3357 				recv_sys->recovered_lsn = new_recovered_lsn;
3358 			}
3359 
3360 			total_len += len;
3361 			n_recs++;
3362 
3363 			ptr += len;
3364 
3365 			if (type == MLOG_MULTI_REC_END) {
3366 				DBUG_PRINT("ib_log",
3367 					   ("scan " LSN_PF
3368 					    ": multi-log end"
3369 					    " total_len " ULINTPF
3370 					    " n=" ULINTPF,
3371 					    recv_sys->recovered_lsn,
3372 					    total_len, n_recs));
3373 				total_len -= mlog_rec_len;
3374 				break;
3375 			}
3376 
3377 			DBUG_PRINT("ib_log",
3378 				   ("scan " LSN_PF ": multi-log rec %s"
3379 				    " len " ULINTPF
3380 				    " page " ULINTPF ":" ULINTPF,
3381 				    recv_sys->recovered_lsn,
3382 				    get_mlog_string(type), len, space, page_no));
3383 		}
3384 
3385 		new_recovered_lsn = recv_calc_lsn_on_data_add(
3386 			recv_sys->recovered_lsn, total_len);
3387 
3388 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
3389 			/* The log record filled a log block, and we require
3390 			that also the next log block should have been scanned
3391 			in */
3392 
3393 			return(false);
3394 		}
3395 
3396 		/* Add all the records to the hash table */
3397 
3398 		ptr = recv_sys->buf + recv_sys->recovered_offset;
3399 
3400 		for (;;) {
3401 			old_lsn = recv_sys->recovered_lsn;
3402 			/* This will apply MLOG_FILE_ records. We
3403 			had to skip them in the first scan, because we
3404 			did not know if the mini-transaction was
3405 			completely recovered (until MLOG_MULTI_REC_END). */
3406 			len = recv_parse_log_rec(
3407 				&type, ptr, end_ptr, &space, &page_no,
3408 				true, &body);
3409 
3410 			if (recv_sys->found_corrupt_log
3411 			    && !recv_report_corrupt_log(
3412 				    ptr, type, space, page_no)) {
3413 				return(true);
3414 			}
3415 
3416 			if (recv_sys->found_corrupt_fs) {
3417 				return(true);
3418 			}
3419 
3420 			ut_a(len != 0);
3421 			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));
3422 
3423 			recv_sys->recovered_offset += len;
3424 			recv_sys->recovered_lsn
3425 				= recv_calc_lsn_on_data_add(old_lsn, len);
3426 
3427 			switch (type) {
3428 			case MLOG_MULTI_REC_END:
3429 				/* Found the end mark for the records */
3430 				goto loop;
3431 #ifdef UNIV_LOG_LSN_DEBUG
3432 			case MLOG_LSN:
3433 				/* Do not add these records to the hash table.
3434 				The page number and space id fields are misused
3435 				for something else. */
3436 				break;
3437 #endif /* UNIV_LOG_LSN_DEBUG */
3438 			case MLOG_FILE_NAME:
3439 			case MLOG_FILE_DELETE:
3440 			case MLOG_FILE_CREATE2:
3441 			case MLOG_FILE_RENAME2:
3442 			case MLOG_INDEX_LOAD:
3443 			case MLOG_TRUNCATE:
3444 				/* These were already handled by
3445 				recv_parse_log_rec() and
3446 				recv_parse_or_apply_log_rec_body(). */
3447 				break;
3448 			default:
3449 				switch (store) {
3450 				case STORE_NO:
3451 					break;
3452 				case STORE_IF_EXISTS:
3453 					if (fil_space_get_flags(space)
3454 					    == ULINT_UNDEFINED) {
3455 						break;
3456 					}
3457 					/* fall through */
3458 				case STORE_YES:
3459 					recv_add_to_hash_table(
3460 						type, space, page_no,
3461 						body, ptr + len,
3462 						old_lsn,
3463 						new_recovered_lsn);
3464 				}
3465 			}
3466 
3467 			ptr += len;
3468 		}
3469 	}
3470 
3471 	goto loop;
3472 }
3473 
3474 /*******************************************************//**
3475 Adds data from a new log block to the parsing buffer of recv_sys if
3476 recv_sys->parse_start_lsn is non-zero.
3477 @return true if more data added */
3478 static
3479 bool
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3480 recv_sys_add_to_parsing_buf(
3481 /*========================*/
3482 	const byte*	log_block,	/*!< in: log block */
3483 	lsn_t		scanned_lsn)	/*!< in: lsn of how far we were able
3484 					to find data in this log block */
3485 {
3486 	ulint	more_len;
3487 	ulint	data_len;
3488 	ulint	start_offset;
3489 	ulint	end_offset;
3490 
3491 	ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
3492 
3493 	if (!recv_sys->parse_start_lsn) {
3494 		/* Cannot start parsing yet because no start point for
3495 		it found */
3496 
3497 		return(false);
3498 	}
3499 
3500 	data_len = log_block_get_data_len(log_block);
3501 
3502 	if (recv_sys->parse_start_lsn >= scanned_lsn) {
3503 
3504 		return(false);
3505 
3506 	} else if (recv_sys->scanned_lsn >= scanned_lsn) {
3507 
3508 		return(false);
3509 
3510 	} else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
3511 		more_len = (ulint) (scanned_lsn - recv_sys->parse_start_lsn);
3512 	} else {
3513 		more_len = (ulint) (scanned_lsn - recv_sys->scanned_lsn);
3514 	}
3515 
3516 	if (more_len == 0) {
3517 
3518 		return(false);
3519 	}
3520 
3521 	ut_ad(data_len >= more_len);
3522 
3523 	start_offset = data_len - more_len;
3524 
3525 	if (start_offset < LOG_BLOCK_HDR_SIZE) {
3526 		start_offset = LOG_BLOCK_HDR_SIZE;
3527 	}
3528 
3529 	end_offset = data_len;
3530 
3531 	if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
3532 		end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
3533 	}
3534 
3535 	ut_ad(start_offset <= end_offset);
3536 
3537 	if (start_offset < end_offset) {
3538 		ut_memcpy(recv_sys->buf + recv_sys->len,
3539 			  log_block + start_offset, end_offset - start_offset);
3540 
3541 		recv_sys->len += end_offset - start_offset;
3542 
3543 		ut_a(recv_sys->len <= RECV_PARSING_BUF_SIZE);
3544 	}
3545 
3546 	return(true);
3547 }
3548 
3549 /*******************************************************//**
3550 Moves the parsing buffer data left to the buffer start. */
3551 static
3552 void
recv_sys_justify_left_parsing_buf(void)3553 recv_sys_justify_left_parsing_buf(void)
3554 /*===================================*/
3555 {
3556 	ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset,
3557 		   recv_sys->len - recv_sys->recovered_offset);
3558 
3559 	recv_sys->len -= recv_sys->recovered_offset;
3560 
3561 	recv_sys->recovered_offset = 0;
3562 }
3563 
3564 /*******************************************************//**
3565 Scans log from a buffer and stores new log data to the parsing buffer.
3566 Parses and hashes the log records if new data found.  Unless
3567 UNIV_HOTBACKUP is defined, this function will apply log records
3568 automatically when the hash table becomes full.
3569 @return true if not able to scan any more in this log group */
3570 static
3571 bool
recv_scan_log_recs(ulint available_memory,store_t * store_to_hash,const byte * buf,ulint len,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)3572 recv_scan_log_recs(
3573 /*===============*/
3574 	ulint		available_memory,/*!< in: we let the hash table of recs
3575 					to grow to this size, at the maximum */
3576 	store_t*	store_to_hash,	/*!< in,out: whether the records should be
3577 					stored to the hash table; this is reset
3578 					if just debug checking is needed, or
3579 					when the available_memory runs out */
3580 	const byte*	buf,		/*!< in: buffer containing a log
3581 					segment or garbage */
3582 	ulint		len,		/*!< in: buffer length */
3583 	lsn_t		checkpoint_lsn,	/*!< in: latest checkpoint LSN */
3584 	lsn_t		start_lsn,	/*!< in: buffer start lsn */
3585 	lsn_t*		contiguous_lsn,	/*!< in/out: it is known that all log
3586 					groups contain contiguous log data up
3587 					to this lsn */
3588 	lsn_t*		group_scanned_lsn)/*!< out: scanning succeeded up to
3589 					this lsn */
3590 {
3591 	const byte*	log_block	= buf;
3592 	ulint		no;
3593 	lsn_t		scanned_lsn	= start_lsn;
3594 	bool		finished	= false;
3595 	ulint		data_len;
3596 	bool		more_data	= false;
3597 	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
3598 
3599 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3600 	ut_ad(len % OS_FILE_LOG_BLOCK_SIZE == 0);
3601 	ut_ad(len >= OS_FILE_LOG_BLOCK_SIZE);
3602 
3603 	do {
3604 		ut_ad(!finished);
3605 		no = log_block_get_hdr_no(log_block);
3606 		ulint expected_no = log_block_convert_lsn_to_no(scanned_lsn);
3607 		if (no != expected_no) {
3608 			/* Garbage or an incompletely written log block.
3609 
3610 			We will not report any error, because this can
3611 			happen when InnoDB was killed while it was
3612 			writing redo log. We simply treat this as an
3613 			abrupt end of the redo log. */
3614 			finished = true;
3615 			break;
3616 		}
3617 
3618 		if (!log_block_checksum_is_ok(log_block)) {
3619 			ib::error() << "Log block " << no <<
3620 				" at lsn " << scanned_lsn << " has valid"
3621 				" header, but checksum field contains "
3622 				<< log_block_get_checksum(log_block)
3623 				<< ", should be "
3624 				<< log_block_calc_checksum(log_block);
3625 			/* Garbage or an incompletely written log block.
3626 
3627 			This could be the result of killing the server
3628 			while it was writing this log block. We treat
3629 			this as an abrupt end of the redo log. */
3630 			finished = true;
3631 			break;
3632 		}
3633 
3634 		if (log_block_get_flush_bit(log_block)) {
3635 			/* This block was a start of a log flush operation:
3636 			we know that the previous flush operation must have
3637 			been completed for all log groups before this block
3638 			can have been flushed to any of the groups. Therefore,
3639 			we know that log data is contiguous up to scanned_lsn
3640 			in all non-corrupt log groups. */
3641 
3642 			if (scanned_lsn > *contiguous_lsn) {
3643 				*contiguous_lsn = scanned_lsn;
3644 			}
3645 		}
3646 
3647 		data_len = log_block_get_data_len(log_block);
3648 
3649 		if (scanned_lsn + data_len > recv_sys->scanned_lsn
3650 		    && log_block_get_checkpoint_no(log_block)
3651 		    < recv_sys->scanned_checkpoint_no
3652 		    && (recv_sys->scanned_checkpoint_no
3653 			- log_block_get_checkpoint_no(log_block)
3654 			> 0x80000000UL)) {
3655 
3656 			/* Garbage from a log buffer flush which was made
3657 			before the most recent database recovery */
3658 			finished = true;
3659 			break;
3660 		}
3661 
3662 		if (!recv_sys->parse_start_lsn
3663 		    && (log_block_get_first_rec_group(log_block) > 0)) {
3664 
3665 			/* We found a point from which to start the parsing
3666 			of log records */
3667 
3668 			recv_sys->parse_start_lsn = scanned_lsn
3669 				+ log_block_get_first_rec_group(log_block);
3670 			recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
3671 			recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
3672 		}
3673 
3674 		scanned_lsn += data_len;
3675 
3676 		if (scanned_lsn > recv_sys->scanned_lsn) {
3677 
3678 			/* We have found more entries. If this scan is
3679 			of startup type, we must initiate crash recovery
3680 			environment before parsing these log records. */
3681 
3682 #ifndef UNIV_HOTBACKUP
3683 			if (!recv_needed_recovery) {
3684 
3685 				if (!srv_read_only_mode) {
3686 					ib::info() << "Log scan progressed"
3687 						" past the checkpoint lsn "
3688 						<< recv_sys->scanned_lsn;
3689 
3690 					recv_init_crash_recovery();
3691 				} else {
3692 
3693 					ib::warn() << "Recovery skipped,"
3694 						" --innodb-read-only set!";
3695 
3696 					return(true);
3697 				}
3698 			}
3699 #endif /* !UNIV_HOTBACKUP */
3700 
3701 			/* We were able to find more log data: add it to the
3702 			parsing buffer if parse_start_lsn is already
3703 			non-zero */
3704 
3705 			DBUG_EXECUTE_IF(
3706 				"reduce_recv_parsing_buf",
3707 				recv_parsing_buf_size
3708 					= (70 * 1024);
3709 				);
3710 
3711 			if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE
3712 			    >= recv_parsing_buf_size) {
3713 				ib::error() << "Log parsing buffer overflow."
3714 					" Recovery may have failed!";
3715 
3716 				recv_sys->found_corrupt_log = true;
3717 
3718 #ifndef UNIV_HOTBACKUP
3719 				if (!srv_force_recovery) {
3720 					ib::error()
3721 						<< "Set innodb_force_recovery"
3722 						" to ignore this error.";
3723 					return(true);
3724 				}
3725 #endif /* !UNIV_HOTBACKUP */
3726 
3727 			} else if (!recv_sys->found_corrupt_log) {
3728 				more_data = recv_sys_add_to_parsing_buf(
3729 					log_block, scanned_lsn);
3730 			}
3731 
3732 			recv_sys->scanned_lsn = scanned_lsn;
3733 			recv_sys->scanned_checkpoint_no
3734 				= log_block_get_checkpoint_no(log_block);
3735 		}
3736 
3737 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3738 			/* Log data for this group ends here */
3739 			finished = true;
3740 			break;
3741 		} else {
3742 			log_block += OS_FILE_LOG_BLOCK_SIZE;
3743 		}
3744 	} while (log_block < buf + len);
3745 
3746 	*group_scanned_lsn = scanned_lsn;
3747 
3748 	if (recv_needed_recovery
3749 	    || (recv_is_from_backup && !recv_is_making_a_backup)) {
3750 		recv_scan_print_counter++;
3751 
3752 		if (finished || (recv_scan_print_counter % 80 == 0)) {
3753 
3754 			ib::info() << "Doing recovery: scanned up to"
3755 				" log sequence number " << scanned_lsn;
3756 		}
3757 	}
3758 
3759 	if (more_data && !recv_sys->found_corrupt_log) {
3760 		/* Try to parse more log records */
3761 
3762 		if (recv_parse_log_recs(checkpoint_lsn,
3763 					*store_to_hash)) {
3764 			ut_ad(recv_sys->found_corrupt_log
3765 			      || recv_sys->found_corrupt_fs
3766 			      || recv_sys->mlog_checkpoint_lsn
3767 			      == recv_sys->recovered_lsn);
3768 			return(true);
3769 		}
3770 
3771 		if (*store_to_hash != STORE_NO
3772 		    && mem_heap_get_size(recv_sys->heap) > available_memory) {
3773 			*store_to_hash = STORE_NO;
3774 		}
3775 
3776 		if (recv_sys->recovered_offset > recv_parsing_buf_size / 4) {
3777 			/* Move parsing buffer data to the buffer start */
3778 
3779 			recv_sys_justify_left_parsing_buf();
3780 		}
3781 	}
3782 
3783 	return(finished);
3784 }
3785 
3786 #ifndef UNIV_HOTBACKUP
3787 /** Scans log from a buffer and stores new log data to the parsing buffer.
3788 Parses and hashes the log records if new data found.
3789 @param[in,out]	group			log group
3790 @param[in,out]	contiguous_lsn		log sequence number
3791 until which all redo log has been scanned
3792 @param[in]	last_phase		whether changes
3793 can be applied to the tablespaces
3794 @return whether rescan is needed (not everything was stored) */
3795 static
3796 bool
recv_group_scan_log_recs(log_group_t * group,lsn_t * contiguous_lsn,bool last_phase)3797 recv_group_scan_log_recs(
3798 	log_group_t*	group,
3799 	lsn_t*		contiguous_lsn,
3800 	bool		last_phase)
3801 {
3802 	DBUG_ENTER("recv_group_scan_log_recs");
3803 	assert(!last_phase || recv_sys->mlog_checkpoint_lsn > 0);
3804 
3805 	mutex_enter(&recv_sys->mutex);
3806 	recv_sys->len = 0;
3807 	recv_sys->recovered_offset = 0;
3808 	recv_sys->n_addrs = 0;
3809 	recv_sys_empty_hash();
3810 	srv_start_lsn = *contiguous_lsn;
3811 	recv_sys->parse_start_lsn = *contiguous_lsn;
3812 	recv_sys->scanned_lsn = *contiguous_lsn;
3813 	recv_sys->recovered_lsn = *contiguous_lsn;
3814 	recv_sys->scanned_checkpoint_no = 0;
3815 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3816 	recv_previous_parsed_rec_offset	= 0;
3817 	recv_previous_parsed_rec_is_multi = 0;
3818 	ut_ad(recv_max_page_lsn == 0);
3819 	ut_ad(last_phase || !recv_writer_thread_active);
3820 	mutex_exit(&recv_sys->mutex);
3821 
3822 	lsn_t	checkpoint_lsn	= *contiguous_lsn;
3823 	lsn_t	start_lsn;
3824 	lsn_t	end_lsn;
3825 	store_t	store_to_hash	= last_phase ? STORE_IF_EXISTS : STORE_YES;
3826 	ulint	available_mem	= UNIV_PAGE_SIZE
3827 		* (buf_pool_get_n_pages()
3828 		   - (recv_n_pool_free_frames * srv_buf_pool_instances));
3829 
3830 	end_lsn = *contiguous_lsn = ut_uint64_align_down(
3831 		*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3832 
3833 	do {
3834 		if (last_phase && store_to_hash == STORE_NO) {
3835 			store_to_hash = STORE_IF_EXISTS;
3836 			/* We must not allow change buffer
3837 			merge here, because it would generate
3838 			redo log records before we have
3839 			finished the redo log scan. */
3840 			recv_apply_hashed_log_recs(FALSE);
3841 		}
3842 
3843 		start_lsn = end_lsn;
3844 		end_lsn += RECV_SCAN_SIZE;
3845 
3846 		log_group_read_log_seg(
3847 			log_sys->buf, group, start_lsn, end_lsn);
3848 	} while (!recv_scan_log_recs(
3849 			 available_mem, &store_to_hash, log_sys->buf,
3850 			 RECV_SCAN_SIZE,
3851 			 checkpoint_lsn,
3852 			 start_lsn, contiguous_lsn, &group->scanned_lsn));
3853 
3854 	if (recv_sys->found_corrupt_log || recv_sys->found_corrupt_fs) {
3855 		DBUG_RETURN(false);
3856 	}
3857 
3858 	DBUG_PRINT("ib_log", ("%s " LSN_PF
3859 			      " completed for log group " ULINTPF,
3860 			      last_phase ? "rescan" : "scan",
3861 			      group->scanned_lsn, group->id));
3862 
3863 	DBUG_RETURN(store_to_hash == STORE_NO);
3864 }
3865 
3866 /*******************************************************//**
3867 Initialize crash recovery environment. Can be called iff
3868 recv_needed_recovery == false. */
3869 static
3870 void
recv_init_crash_recovery(void)3871 recv_init_crash_recovery(void)
3872 {
3873 	ut_ad(!srv_read_only_mode);
3874 	ut_a(!recv_needed_recovery);
3875 
3876 	recv_needed_recovery = true;
3877 }
3878 
3879 /** Report a missing tablespace for which page-redo log exists.
3880 @param[in]	err	previous error code
3881 @param[in]	i	tablespace descriptor
3882 @return new error code */
3883 static
3884 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3885 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3886 {
3887 	if (srv_force_recovery == 0) {
3888 		ib::error() << "Tablespace " << i->first << " was not"
3889 			" found at " << i->second.name << ".";
3890 
3891 		if (err == DB_SUCCESS) {
3892 			ib::error() << "Set innodb_force_recovery=1 to"
3893 				" ignore this and to permanently lose"
3894 				" all changes to the tablespace.";
3895 			err = DB_TABLESPACE_NOT_FOUND;
3896 		}
3897 	} else {
3898 		ib::warn() << "Tablespace " << i->first << " was not"
3899 			" found at " << i->second.name << ", and"
3900 			" innodb_force_recovery was set. All redo log"
3901 			" for this tablespace will be ignored!";
3902 	}
3903 
3904 	return(err);
3905 }
3906 
3907 /** Report a missing mlog_file_name or mlog_file_delete record for
3908 the tablespace.
3909 @param[in]	recv_addr	Hashed page file address. */
3910 static
3911 void
recv_init_missing_mlog(recv_addr_t * recv_addr)3912 recv_init_missing_mlog(
3913 	recv_addr_t*	recv_addr)
3914 {
3915 	ulint	space_id = recv_addr->space;
3916 	ulint	page_no = recv_addr->page_no;
3917 	ulint	type = UT_LIST_GET_FIRST(recv_addr->rec_list)->type;
3918 	ulint	start_lsn = UT_LIST_GET_FIRST(recv_addr->rec_list)->start_lsn;
3919 
3920 	ib::fatal() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE "
3921 		"for redo log record " << type << " (page "
3922 		<< space_id << ":" << page_no << ") at "
3923 		<< start_lsn;
3924 }
3925 
3926 /** Check if all tablespaces were found for crash recovery.
3927 @return error code or DB_SUCCESS */
3928 static MY_ATTRIBUTE((warn_unused_result))
3929 dberr_t
recv_init_crash_recovery_spaces(void)3930 recv_init_crash_recovery_spaces(void)
3931 {
3932 	typedef std::set<ulint>	space_set_t;
3933 	bool		flag_deleted	= false;
3934 	space_set_t	missing_spaces;
3935 
3936 	ut_ad(!srv_read_only_mode);
3937 	ut_ad(recv_needed_recovery);
3938 
3939 	ib::info() << "Database was not shutdown normally!";
3940 	ib::info() << "Starting crash recovery.";
3941 
3942 	for (recv_spaces_t::iterator i = recv_spaces.begin();
3943 	     i != recv_spaces.end(); i++) {
3944 		ut_ad(!is_predefined_tablespace(i->first));
3945 
3946 		if (i->second.deleted) {
3947 			/* The tablespace was deleted,
3948 			so we can ignore any redo log for it. */
3949 			flag_deleted = true;
3950 		} else if (i->second.space != NULL) {
3951 			/* The tablespace was found, and there
3952 			are some redo log records for it. */
3953 			fil_names_dirty(i->second.space);
3954 		} else {
3955 			missing_spaces.insert(i->first);
3956 			flag_deleted = true;
3957 		}
3958 	}
3959 
3960 	if (flag_deleted) {
3961 		dberr_t err = DB_SUCCESS;
3962 
3963 		for (ulint h = 0;
3964 		     h < hash_get_n_cells(recv_sys->addr_hash);
3965 		     h++) {
3966 			for (recv_addr_t* recv_addr
3967 				     = static_cast<recv_addr_t*>(
3968 					     HASH_GET_FIRST(
3969 						     recv_sys->addr_hash, h));
3970 			     recv_addr != 0;
3971 			     recv_addr = static_cast<recv_addr_t*>(
3972 				     HASH_GET_NEXT(addr_hash, recv_addr))) {
3973 				const ulint space = recv_addr->space;
3974 
3975 				if (is_predefined_tablespace(space)) {
3976 					continue;
3977 				}
3978 
3979 				recv_spaces_t::iterator i
3980 					= recv_spaces.find(space);
3981 
3982 				if (i == recv_spaces.end()) {
3983 					recv_init_missing_mlog(recv_addr);
3984 					recv_addr->state = RECV_DISCARDED;
3985 					continue;
3986 				}
3987 
3988 				if (i->second.deleted) {
3989 					ut_ad(missing_spaces.find(space)
3990 					      == missing_spaces.end());
3991 					recv_addr->state = RECV_DISCARDED;
3992 					continue;
3993 				}
3994 
3995 				space_set_t::iterator m = missing_spaces.find(
3996 					space);
3997 
3998 				if (m != missing_spaces.end()) {
3999 					missing_spaces.erase(m);
4000 					err = recv_init_missing_space(err, i);
4001 					recv_addr->state = RECV_DISCARDED;
4002 					/* All further redo log for this
4003 					tablespace should be removed. */
4004 					i->second.deleted = true;
4005 				}
4006 			}
4007 		}
4008 
4009 		if (err != DB_SUCCESS) {
4010 			return(err);
4011 		}
4012 	}
4013 
4014 	for (space_set_t::const_iterator m = missing_spaces.begin();
4015 	     m != missing_spaces.end(); m++) {
4016 		recv_spaces_t::iterator i = recv_spaces.find(*m);
4017 		ut_ad(i != recv_spaces.end());
4018 
4019 		ib::info() << "Tablespace " << i->first
4020 			<< " was not found at '" << i->second.name
4021 			<< "', but there were no modifications either.";
4022 	}
4023 
4024 	buf_dblwr_process();
4025 
4026 	if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
4027 		/* Spawn the background thread to flush dirty pages
4028 		from the buffer pools. */
4029 		os_thread_create(recv_writer_thread, 0, 0);
4030 	}
4031 
4032 	return(DB_SUCCESS);
4033 }
4034 
4035 /** Start recovering from a redo log checkpoint.
4036 @see recv_recovery_from_checkpoint_finish
4037 @param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
4038 of first system tablespace page
4039 @return error code or DB_SUCCESS */
4040 dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)4041 recv_recovery_from_checkpoint_start(
4042 	lsn_t	flush_lsn)
4043 {
4044 	log_group_t*	group;
4045 	log_group_t*	max_cp_group;
4046 	ulint		max_cp_field;
4047 	lsn_t		checkpoint_lsn;
4048 	bool		rescan;
4049 	ib_uint64_t	checkpoint_no;
4050 	lsn_t		contiguous_lsn;
4051 	byte*		buf;
4052 	byte		log_hdr_buf[LOG_FILE_HDR_SIZE];
4053 	dberr_t		err;
4054 
4055 	/* Initialize red-black tree for fast insertions into the
4056 	flush_list during recovery process. */
4057 	buf_flush_init_flush_rbt();
4058 
4059 	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
4060 
4061 		ib::info() << "The user has set SRV_FORCE_NO_LOG_REDO on,"
4062 			" skipping log redo";
4063 
4064 		return(DB_SUCCESS);
4065 	}
4066 
4067 	recv_recovery_on = true;
4068 
4069 	log_mutex_enter();
4070 
4071 	/* Look for the latest checkpoint from any of the log groups */
4072 
4073 	err = recv_find_max_checkpoint(&max_cp_group, &max_cp_field);
4074 
4075 	if (err != DB_SUCCESS) {
4076 
4077 		log_mutex_exit();
4078 
4079 		return(err);
4080 	}
4081 
4082 	log_group_header_read(max_cp_group, max_cp_field);
4083 
4084 	buf = log_sys->checkpoint_buf;
4085 
4086 	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
4087 	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
4088 
4089 	/* Read the first log file header to print a note if this is
4090 	a recovery from a restored InnoDB Hot Backup */
4091 
4092 	const page_id_t	page_id(max_cp_group->space_id, 0);
4093 
4094 	fil_io(IORequestLogRead, true, page_id, univ_page_size, 0,
4095 	       LOG_FILE_HDR_SIZE, log_hdr_buf, max_cp_group);
4096 
4097 	if (0 == ut_memcmp(log_hdr_buf + LOG_HEADER_CREATOR,
4098 			   (byte*)"ibbackup", (sizeof "ibbackup") - 1)) {
4099 
4100 		if (srv_read_only_mode) {
4101 			log_mutex_exit();
4102 
4103 			ib::error() << "Cannot restore from mysqlbackup,"
4104 				" InnoDB running in read-only mode!";
4105 
4106 			return(DB_ERROR);
4107 		}
4108 
4109 		/* This log file was created by mysqlbackup --restore: print
4110 		a note to the user about it */
4111 
4112 		ib::info() << "The log file was created by mysqlbackup"
4113 			" --apply-log at "
4114 			<< log_hdr_buf + LOG_HEADER_CREATOR
4115 			<< ". The following crash recovery is part of a"
4116 			" normal restore.";
4117 
4118 		/* Replace the label. */
4119 		ut_ad(LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR
4120 		      >= sizeof LOG_HEADER_CREATOR_CURRENT);
4121 		memset(log_hdr_buf + LOG_HEADER_CREATOR, 0,
4122 		       LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR);
4123 		strcpy(reinterpret_cast<char*>(log_hdr_buf)
4124 		       + LOG_HEADER_CREATOR, LOG_HEADER_CREATOR_CURRENT);
4125 
4126 		/* Write to the log file to wipe over the label */
4127 		fil_io(IORequestLogWrite, true, page_id,
4128 		       univ_page_size, 0, OS_FILE_LOG_BLOCK_SIZE, log_hdr_buf,
4129 		       max_cp_group);
4130 	}
4131 
4132 	/* Start reading the log groups from the checkpoint lsn up. The
4133 	variable contiguous_lsn contains an lsn up to which the log is
4134 	known to be contiguously written to all log groups. */
4135 
4136 	recv_sys->mlog_checkpoint_lsn = 0;
4137 
4138 	ut_ad(RECV_SCAN_SIZE <= log_sys->buf_size);
4139 
4140 	ut_ad(UT_LIST_GET_LEN(log_sys->log_groups) == 1);
4141 	group = UT_LIST_GET_FIRST(log_sys->log_groups);
4142 
4143 	ut_ad(recv_sys->n_addrs == 0);
4144 	contiguous_lsn = checkpoint_lsn;
4145 	switch (group->format) {
4146 	case 0:
4147 		log_mutex_exit();
4148 		return(recv_log_format_0_recover(checkpoint_lsn));
4149 	case LOG_HEADER_FORMAT_CURRENT:
4150 		break;
4151 	default:
4152 		ut_ad(0);
4153 		recv_sys->found_corrupt_log = true;
4154 		log_mutex_exit();
4155 		return(DB_ERROR);
4156 	}
4157 
4158 	/** Scan the redo log from checkpoint lsn and redo log to
4159 	the hash table. */
4160 	rescan = recv_group_scan_log_recs(group, &contiguous_lsn, false);
4161 
4162 
4163 	if ((recv_sys->found_corrupt_log && !srv_force_recovery)
4164 	    || recv_sys->found_corrupt_fs) {
4165 		log_mutex_exit();
4166 		return(DB_ERROR);
4167 	}
4168 
4169 	if (recv_sys->mlog_checkpoint_lsn == 0) {
4170 		if (!srv_read_only_mode
4171 		    && group->scanned_lsn != checkpoint_lsn) {
4172 			ib::error() << "Ignoring the redo log due to missing"
4173 				" MLOG_CHECKPOINT between the checkpoint "
4174 				<< checkpoint_lsn << " and the end "
4175 				<< group->scanned_lsn << ".";
4176 			if (srv_force_recovery < SRV_FORCE_NO_LOG_REDO) {
4177 				log_mutex_exit();
4178 				return(DB_ERROR);
4179 			}
4180 		}
4181 
4182 		group->scanned_lsn = checkpoint_lsn;
4183 		rescan = false;
4184 	}
4185 
4186 	/* NOTE: we always do a 'recovery' at startup, but only if
4187 	there is something wrong we will print a message to the
4188 	user about recovery: */
4189 
4190 	if (checkpoint_lsn != flush_lsn) {
4191 
4192 		if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
4193 			ib::warn() << " Are you sure you are using the"
4194 				" right ib_logfiles to start up the database?"
4195 				" Log sequence number in the ib_logfiles is "
4196 				<< checkpoint_lsn << ", less than the"
4197 				" log sequence number in the first system"
4198 				" tablespace file header, " << flush_lsn << ".";
4199 		}
4200 
4201 		if (!recv_needed_recovery) {
4202 
4203 			ib::info() << "The log sequence number " << flush_lsn
4204 				<< " in the system tablespace does not match"
4205 				" the log sequence number " << checkpoint_lsn
4206 				<< " in the ib_logfiles!";
4207 
4208 			if (srv_read_only_mode) {
4209 				ib::error() << "Can't initiate database"
4210 					" recovery, running in read-only-mode.";
4211 				log_mutex_exit();
4212 				return(DB_READ_ONLY);
4213 			}
4214 
4215 			recv_init_crash_recovery();
4216 		}
4217 	}
4218 
4219 	log_sys->lsn = recv_sys->recovered_lsn;
4220 
4221 	if (recv_needed_recovery) {
4222 		err = recv_init_crash_recovery_spaces();
4223 
4224 		if (err != DB_SUCCESS) {
4225 			log_mutex_exit();
4226 			return(err);
4227 		}
4228 
4229 		if (rescan) {
4230 			contiguous_lsn = checkpoint_lsn;
4231 			recv_group_scan_log_recs(group, &contiguous_lsn, true);
4232 
4233 			if ((recv_sys->found_corrupt_log
4234 			     && !srv_force_recovery)
4235 			    || recv_sys->found_corrupt_fs) {
4236 				log_mutex_exit();
4237 				return(DB_ERROR);
4238 			}
4239 		}
4240 	} else {
4241 		ut_ad(!rescan || recv_sys->n_addrs == 0);
4242 	}
4243 
4244 	/* We currently have only one log group */
4245 
4246 	if (group->scanned_lsn < checkpoint_lsn
4247 	    || group->scanned_lsn < recv_max_page_lsn) {
4248 
4249 		ib::error() << "We scanned the log up to " << group->scanned_lsn
4250 			<< ". A checkpoint was at " << checkpoint_lsn << " and"
4251 			" the maximum LSN on a database page was "
4252 			<< recv_max_page_lsn << ". It is possible that the"
4253 			" database is now corrupt!";
4254 	}
4255 
4256 	if (recv_sys->recovered_lsn < checkpoint_lsn) {
4257 		log_mutex_exit();
4258 
4259 		/* No harm in trying to do RO access. */
4260 		if (!srv_read_only_mode) {
4261 			ut_error;
4262 		}
4263 
4264 		return(DB_ERROR);
4265 	}
4266 
4267 	/* Synchronize the uncorrupted log groups to the most up-to-date log
4268 	group; we also copy checkpoint info to groups */
4269 
4270 	log_sys->next_checkpoint_lsn = checkpoint_lsn;
4271 	log_sys->next_checkpoint_no = checkpoint_no + 1;
4272 
4273 	recv_synchronize_groups();
4274 
4275 	if (!recv_needed_recovery) {
4276 		ut_a(checkpoint_lsn == recv_sys->recovered_lsn);
4277 	} else {
4278 		srv_start_lsn = recv_sys->recovered_lsn;
4279 	}
4280 
4281 	ut_memcpy(log_sys->buf, recv_sys->last_block, OS_FILE_LOG_BLOCK_SIZE);
4282 
4283 	log_sys->buf_free = (ulint) log_sys->lsn % OS_FILE_LOG_BLOCK_SIZE;
4284 	log_sys->buf_next_to_write = log_sys->buf_free;
4285 	log_sys->write_lsn = log_sys->lsn;
4286 
4287 	log_sys->last_checkpoint_lsn = checkpoint_lsn;
4288 
4289 	if (!srv_read_only_mode) {
4290 		/* Write a MLOG_CHECKPOINT marker as the first thing,
4291 		before generating any other redo log. */
4292 		fil_names_clear(log_sys->last_checkpoint_lsn, true);
4293 	}
4294 
4295 	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
4296 		    log_sys->lsn - log_sys->last_checkpoint_lsn);
4297 
4298 	log_sys->next_checkpoint_no = checkpoint_no + 1;
4299 
4300 	mutex_enter(&recv_sys->mutex);
4301 
4302 	recv_sys->apply_log_recs = TRUE;
4303 
4304 	mutex_exit(&recv_sys->mutex);
4305 
4306 	log_mutex_exit();
4307 
4308 	recv_lsn_checks_on = true;
4309 
4310 	/* The database is now ready to start almost normal processing of user
4311 	transactions: transaction rollbacks and the application of the log
4312 	records in the hash table can be run in background. */
4313 
4314 	return(DB_SUCCESS);
4315 }
4316 
4317 /** Complete recovery from a checkpoint. */
4318 void
recv_recovery_from_checkpoint_finish(void)4319 recv_recovery_from_checkpoint_finish(void)
4320 {
4321 	/* Make sure that the recv_writer thread is done. This is
4322 	required because it grabs various mutexes and we want to
4323 	ensure that when we enable sync_order_checks there is no
4324 	mutex currently held by any thread. */
4325 	mutex_enter(&recv_sys->writer_mutex);
4326 
4327 	/* Free the resources of the recovery system */
4328 	recv_recovery_on = false;
4329 
4330 	/* By acquring the mutex we ensure that the recv_writer thread
4331 	won't trigger any more LRU batches. Now wait for currently
4332 	in progress batches to finish. */
4333 	buf_flush_wait_LRU_batch_end();
4334 
4335 	mutex_exit(&recv_sys->writer_mutex);
4336 
4337 	ulint count = 0;
4338 	while (recv_writer_thread_active) {
4339 		++count;
4340 		os_thread_sleep(100000);
4341 		if (srv_print_verbose_log && count > 600) {
4342 			ib::info() << "Waiting for recv_writer to"
4343 				" finish flushing of buffer pool";
4344 			count = 0;
4345 		}
4346 	}
4347 
4348 	recv_sys_debug_free();
4349 
4350 	/* Free up the flush_rbt. */
4351 	buf_flush_free_flush_rbt();
4352 
4353 	/* Validate a few system page types that were left uninitialized
4354 	by older versions of MySQL. */
4355 	mtr_t		mtr;
4356 	buf_block_t*	block;
4357 	mtr.start();
4358 	mtr.set_sys_modified();
4359 	/* Bitmap page types will be reset in buf_dblwr_check_block()
4360 	without redo logging. */
4361 	block = buf_page_get(
4362 		page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
4363 		univ_page_size, RW_X_LATCH, &mtr);
4364 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4365 	/* Already MySQL 3.23.53 initialized FSP_IBUF_TREE_ROOT_PAGE_NO
4366 	to FIL_PAGE_INDEX. No need to reset that one. */
4367 	block = buf_page_get(
4368 		page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
4369 		univ_page_size, RW_X_LATCH, &mtr);
4370 	fil_block_check_type(block, FIL_PAGE_TYPE_TRX_SYS, &mtr);
4371 	block = buf_page_get(
4372 		page_id_t(TRX_SYS_SPACE, FSP_FIRST_RSEG_PAGE_NO),
4373 		univ_page_size, RW_X_LATCH, &mtr);
4374 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4375 	block = buf_page_get(
4376 		page_id_t(TRX_SYS_SPACE, FSP_DICT_HDR_PAGE_NO),
4377 		univ_page_size, RW_X_LATCH, &mtr);
4378 	fil_block_check_type(block, FIL_PAGE_TYPE_SYS, &mtr);
4379 	mtr.commit();
4380 
4381 	/* Roll back any recovered data dictionary transactions, so
4382 	that the data dictionary tables will be free of any locks.
4383 	The data dictionary latch should guarantee that there is at
4384 	most one data dictionary transaction active at a time. */
4385 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO) {
4386 		trx_rollback_or_clean_recovered(FALSE);
4387 	}
4388 }
4389 
4390 /********************************************************//**
4391 Initiates the rollback of active transactions. */
4392 void
recv_recovery_rollback_active(void)4393 recv_recovery_rollback_active(void)
4394 /*===============================*/
4395 {
4396 	ut_ad(!recv_writer_thread_active);
4397 
4398 	/* Switch latching order checks on in sync0debug.cc, if
4399 	--innodb-sync-debug=true (default) */
4400 	ut_d(sync_check_enable());
4401 
4402 	/* We can't start any (DDL) transactions if UNDO logging
4403 	has been disabled, additionally disable ROLLBACK of recovered
4404 	user transactions. */
4405 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
4406 	    && !srv_read_only_mode) {
4407 
4408 		/* Drop partially created indexes. */
4409 		row_merge_drop_temp_indexes();
4410 		/* Drop temporary tables. */
4411 		row_mysql_drop_temp_tables();
4412 
4413 		/* Drop any auxiliary tables that were not dropped when the
4414 		parent table was dropped. This can happen if the parent table
4415 		was dropped but the server crashed before the auxiliary tables
4416 		were dropped. */
4417 		fts_drop_orphaned_tables();
4418 
4419 		/* Rollback the uncommitted transactions which have no user
4420 		session */
4421 
4422 		trx_rollback_or_clean_is_active = true;
4423 		os_thread_create(trx_rollback_or_clean_all_recovered, 0, 0);
4424 	}
4425 }
4426 
4427 /******************************************************//**
4428 Resets the logs. The contents of log files will be lost! */
4429 void
recv_reset_logs(lsn_t lsn)4430 recv_reset_logs(
4431 /*============*/
4432 	lsn_t		lsn)		/*!< in: reset to this lsn
4433 					rounded up to be divisible by
4434 					OS_FILE_LOG_BLOCK_SIZE, after
4435 					which we add
4436 					LOG_BLOCK_HDR_SIZE */
4437 {
4438 	log_group_t*	group;
4439 
4440 	ut_ad(log_mutex_own());
4441 
4442 	log_sys->lsn = ut_uint64_align_up(lsn, OS_FILE_LOG_BLOCK_SIZE);
4443 
4444 	group = UT_LIST_GET_FIRST(log_sys->log_groups);
4445 
4446 	while (group) {
4447 		group->lsn = log_sys->lsn;
4448 		group->lsn_offset = LOG_FILE_HDR_SIZE;
4449 		group = UT_LIST_GET_NEXT(log_groups, group);
4450 	}
4451 
4452 	log_sys->buf_next_to_write = 0;
4453 	log_sys->write_lsn = log_sys->lsn;
4454 
4455 	log_sys->next_checkpoint_no = 0;
4456 	log_sys->last_checkpoint_lsn = 0;
4457 
4458 	log_block_init(log_sys->buf, log_sys->lsn);
4459 	log_block_set_first_rec_group(log_sys->buf, LOG_BLOCK_HDR_SIZE);
4460 
4461 	log_sys->buf_free = LOG_BLOCK_HDR_SIZE;
4462 	log_sys->lsn += LOG_BLOCK_HDR_SIZE;
4463 
4464 	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
4465 		    (log_sys->lsn - log_sys->last_checkpoint_lsn));
4466 
4467 	log_mutex_exit();
4468 
4469 	/* Reset the checkpoint fields in logs */
4470 
4471 	log_make_checkpoint_at(LSN_MAX, TRUE);
4472 
4473 	log_mutex_enter();
4474 }
4475 #endif /* !UNIV_HOTBACKUP */
4476 
4477 #ifdef UNIV_HOTBACKUP
4478 /******************************************************//**
4479 Creates new log files after a backup has been restored. */
4480 void
recv_reset_log_files_for_backup(const char * log_dir,ulint n_log_files,lsn_t log_file_size,lsn_t lsn)4481 recv_reset_log_files_for_backup(
4482 /*============================*/
4483 	const char*	log_dir,	/*!< in: log file directory path */
4484 	ulint		n_log_files,	/*!< in: number of log files */
4485 	lsn_t		log_file_size,	/*!< in: log file size */
4486 	lsn_t		lsn)		/*!< in: new start lsn, must be
4487 					divisible by OS_FILE_LOG_BLOCK_SIZE */
4488 {
4489 	os_file_t	log_file;
4490 	bool		success;
4491 	byte*		buf;
4492 	ulint		i;
4493 	ulint		log_dir_len;
4494 	char		name[5000];
4495 
4496 	log_dir_len = strlen(log_dir);
4497 	/* full path name of ib_logfile consists of log dir path + basename
4498 	+ number. This must fit in the name buffer.
4499 	*/
4500 	ut_a(log_dir_len + strlen(ib_logfile_basename) + 11  < sizeof(name));
4501 
4502 	buf = (byte*)ut_zalloc_nokey(LOG_FILE_HDR_SIZE +
4503 				     OS_FILE_LOG_BLOCK_SIZE);
4504 
4505 	for (i = 0; i < n_log_files; i++) {
4506 
4507 		sprintf(name, "%s%s%lu", log_dir,
4508 			ib_logfile_basename, (ulong) i);
4509 
4510 		log_file = os_file_create_simple(innodb_log_file_key,
4511 						 name, OS_FILE_CREATE,
4512 						 OS_FILE_READ_WRITE,
4513 						 srv_read_only_mode, &success);
4514 		if (!success) {
4515 			ib::fatal() << "Cannot create " << name << ". Check that"
4516 				" the file does not exist yet.";
4517 		}
4518 
4519 		ib::info() << "Setting log file size to " << log_file_size;
4520 
4521 		success = os_file_set_size(
4522 			name, log_file, log_file_size, srv_read_only_mode);
4523 
4524 		if (!success) {
4525 			ib::fatal() << "Cannot set " << name << " size to "
4526 				<< (long long unsigned)log_file_size;
4527 		}
4528 
4529 		os_file_flush(log_file);
4530 		os_file_close(log_file);
4531 	}
4532 
4533 	/* We pretend there is a checkpoint at lsn + LOG_BLOCK_HDR_SIZE */
4534 
4535 	log_reset_first_header_and_checkpoint(buf, lsn);
4536 
4537 	log_block_init(buf + LOG_FILE_HDR_SIZE, lsn);
4538 	log_block_set_first_rec_group(buf + LOG_FILE_HDR_SIZE,
4539 				      LOG_BLOCK_HDR_SIZE);
4540 	log_block_set_checksum(buf + LOG_FILE_HDR_SIZE,
4541 	log_block_calc_checksum_crc32(buf + LOG_FILE_HDR_SIZE));
4542 
4543 	log_block_set_checksum(buf, log_block_calc_checksum_crc32(buf));
4544 	sprintf(name, "%s%s%lu", log_dir, ib_logfile_basename, (ulong)0);
4545 
4546 	log_file = os_file_create_simple(innodb_log_file_key,
4547 					 name, OS_FILE_OPEN,
4548 					 OS_FILE_READ_WRITE,
4549 					 srv_read_only_mode, &success);
4550 	if (!success) {
4551 		ib::fatal() << "Cannot open " << name << ".";
4552 	}
4553 
4554 	IORequest	request(IORequest::WRITE);
4555 
4556 	dberr_t	err = os_file_write(
4557 		request, name, log_file, buf, 0,
4558 		LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE);
4559 
4560 	ut_a(err == DB_SUCCESS);
4561 
4562 	os_file_flush(log_file);
4563 	os_file_close(log_file);
4564 
4565 	ut_free(buf);
4566 }
4567 #endif /* UNIV_HOTBACKUP */
4568 
4569 /** Find a doublewrite copy of a page.
4570 @param[in]	space_id	tablespace identifier
4571 @param[in]	page_no		page number
4572 @return	page frame
4573 @retval NULL if no page was found */
4574 
4575 const byte*
find_page(ulint space_id,ulint page_no)4576 recv_dblwr_t::find_page(ulint space_id, ulint page_no)
4577 {
4578 	typedef std::vector<const byte*, ut_allocator<const byte*> >
4579 		matches_t;
4580 
4581 	matches_t	matches;
4582 	const byte*	result = 0;
4583 
4584 	for (list::iterator i = pages.begin(); i != pages.end(); ++i) {
4585 		if (page_get_space_id(*i) == space_id
4586 		    && page_get_page_no(*i) == page_no) {
4587 			matches.push_back(*i);
4588 		}
4589 	}
4590 
4591 	if (matches.size() == 1) {
4592 		result = matches[0];
4593 	} else if (matches.size() > 1) {
4594 
4595 		lsn_t max_lsn	= 0;
4596 		lsn_t page_lsn	= 0;
4597 
4598 		for (matches_t::iterator i = matches.begin();
4599 		     i != matches.end();
4600 		     ++i) {
4601 
4602 			page_lsn = mach_read_from_8(*i + FIL_PAGE_LSN);
4603 
4604 			if (page_lsn > max_lsn) {
4605 				max_lsn = page_lsn;
4606 				result = *i;
4607 			}
4608 		}
4609 	}
4610 
4611 	return(result);
4612 }
4613 
4614 #ifndef NDEBUG
4615 /** Return string name of the redo log record type.
4616 @param[in]	type	record log record enum
4617 @return string name of record log record */
4618 const char*
get_mlog_string(mlog_id_t type)4619 get_mlog_string(mlog_id_t type)
4620 {
4621 	switch (type) {
4622 	case MLOG_SINGLE_REC_FLAG:
4623 		return("MLOG_SINGLE_REC_FLAG");
4624 
4625 	case MLOG_1BYTE:
4626 		return("MLOG_1BYTE");
4627 
4628 	case MLOG_2BYTES:
4629 		return("MLOG_2BYTES");
4630 
4631 	case MLOG_4BYTES:
4632 		return("MLOG_4BYTES");
4633 
4634 	case MLOG_8BYTES:
4635 		return("MLOG_8BYTES");
4636 
4637 	case MLOG_REC_INSERT:
4638 		return("MLOG_REC_INSERT");
4639 
4640 	case MLOG_REC_CLUST_DELETE_MARK:
4641 		return("MLOG_REC_CLUST_DELETE_MARK");
4642 
4643 	case MLOG_REC_SEC_DELETE_MARK:
4644 		return("MLOG_REC_SEC_DELETE_MARK");
4645 
4646 	case MLOG_REC_UPDATE_IN_PLACE:
4647 		return("MLOG_REC_UPDATE_IN_PLACE");
4648 
4649 	case MLOG_REC_DELETE:
4650 		return("MLOG_REC_DELETE");
4651 
4652 	case MLOG_LIST_END_DELETE:
4653 		return("MLOG_LIST_END_DELETE");
4654 
4655 	case MLOG_LIST_START_DELETE:
4656 		return("MLOG_LIST_START_DELETE");
4657 
4658 	case MLOG_LIST_END_COPY_CREATED:
4659 		return("MLOG_LIST_END_COPY_CREATED");
4660 
4661 	case MLOG_PAGE_REORGANIZE:
4662 		return("MLOG_PAGE_REORGANIZE");
4663 
4664 	case MLOG_PAGE_CREATE:
4665 		return("MLOG_PAGE_CREATE");
4666 
4667 	case MLOG_UNDO_INSERT:
4668 		return("MLOG_UNDO_INSERT");
4669 
4670 	case MLOG_UNDO_ERASE_END:
4671 		return("MLOG_UNDO_ERASE_END");
4672 
4673 	case MLOG_UNDO_INIT:
4674 		return("MLOG_UNDO_INIT");
4675 
4676 	case MLOG_UNDO_HDR_DISCARD:
4677 		return("MLOG_UNDO_HDR_DISCARD");
4678 
4679 	case MLOG_UNDO_HDR_REUSE:
4680 		return("MLOG_UNDO_HDR_REUSE");
4681 
4682 	case MLOG_UNDO_HDR_CREATE:
4683 		return("MLOG_UNDO_HDR_CREATE");
4684 
4685 	case MLOG_REC_MIN_MARK:
4686 		return("MLOG_REC_MIN_MARK");
4687 
4688 	case MLOG_IBUF_BITMAP_INIT:
4689 		return("MLOG_IBUF_BITMAP_INIT");
4690 
4691 #ifdef UNIV_LOG_LSN_DEBUG
4692 	case MLOG_LSN:
4693 		return("MLOG_LSN");
4694 #endif /* UNIV_LOG_LSN_DEBUG */
4695 
4696 	case MLOG_INIT_FILE_PAGE:
4697 		return("MLOG_INIT_FILE_PAGE");
4698 
4699 	case MLOG_WRITE_STRING:
4700 		return("MLOG_WRITE_STRING");
4701 
4702 	case MLOG_MULTI_REC_END:
4703 		return("MLOG_MULTI_REC_END");
4704 
4705 	case MLOG_DUMMY_RECORD:
4706 		return("MLOG_DUMMY_RECORD");
4707 
4708 	case MLOG_FILE_DELETE:
4709 		return("MLOG_FILE_DELETE");
4710 
4711 	case MLOG_COMP_REC_MIN_MARK:
4712 		return("MLOG_COMP_REC_MIN_MARK");
4713 
4714 	case MLOG_COMP_PAGE_CREATE:
4715 		return("MLOG_COMP_PAGE_CREATE");
4716 
4717 	case MLOG_COMP_REC_INSERT:
4718 		return("MLOG_COMP_REC_INSERT");
4719 
4720 	case MLOG_COMP_REC_CLUST_DELETE_MARK:
4721 		return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4722 
4723 	case MLOG_COMP_REC_SEC_DELETE_MARK:
4724 		return("MLOG_COMP_REC_SEC_DELETE_MARK");
4725 
4726 	case MLOG_COMP_REC_UPDATE_IN_PLACE:
4727 		return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4728 
4729 	case MLOG_COMP_REC_DELETE:
4730 		return("MLOG_COMP_REC_DELETE");
4731 
4732 	case MLOG_COMP_LIST_END_DELETE:
4733 		return("MLOG_COMP_LIST_END_DELETE");
4734 
4735 	case MLOG_COMP_LIST_START_DELETE:
4736 		return("MLOG_COMP_LIST_START_DELETE");
4737 
4738 	case MLOG_COMP_LIST_END_COPY_CREATED:
4739 		return("MLOG_COMP_LIST_END_COPY_CREATED");
4740 
4741 	case MLOG_COMP_PAGE_REORGANIZE:
4742 		return("MLOG_COMP_PAGE_REORGANIZE");
4743 
4744 	case MLOG_FILE_CREATE2:
4745 		return("MLOG_FILE_CREATE2");
4746 
4747 	case MLOG_ZIP_WRITE_NODE_PTR:
4748 		return("MLOG_ZIP_WRITE_NODE_PTR");
4749 
4750 	case MLOG_ZIP_WRITE_BLOB_PTR:
4751 		return("MLOG_ZIP_WRITE_BLOB_PTR");
4752 
4753 	case MLOG_ZIP_WRITE_HEADER:
4754 		return("MLOG_ZIP_WRITE_HEADER");
4755 
4756 	case MLOG_ZIP_PAGE_COMPRESS:
4757 		return("MLOG_ZIP_PAGE_COMPRESS");
4758 
4759 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4760 		return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4761 
4762 	case MLOG_ZIP_PAGE_REORGANIZE:
4763 		return("MLOG_ZIP_PAGE_REORGANIZE");
4764 
4765 	case MLOG_FILE_RENAME2:
4766 		return("MLOG_FILE_RENAME2");
4767 
4768 	case MLOG_FILE_NAME:
4769 		return("MLOG_FILE_NAME");
4770 
4771 	case MLOG_CHECKPOINT:
4772 		return("MLOG_CHECKPOINT");
4773 
4774 	case MLOG_PAGE_CREATE_RTREE:
4775 		return("MLOG_PAGE_CREATE_RTREE");
4776 
4777 	case MLOG_COMP_PAGE_CREATE_RTREE:
4778 		return("MLOG_COMP_PAGE_CREATE_RTREE");
4779 
4780 	case MLOG_INIT_FILE_PAGE2:
4781 		return("MLOG_INIT_FILE_PAGE2");
4782 
4783 	case MLOG_INDEX_LOAD:
4784 		return("MLOG_INDEX_LOAD");
4785 
4786 	case MLOG_TRUNCATE:
4787 		return("MLOG_TRUNCATE");
4788 	}
4789 	assert(0);
4790 	return(NULL);
4791 }
4792 #endif /* !NDEBUG */
4793