1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2013, 2020, MariaDB Corporation.
6 
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10 
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18 
19 *****************************************************************************/
20 
21 /**************************************************//**
22 @file log/log0recv.cc
23 Recovery
24 
25 Created 9/20/1997 Heikki Tuuri
26 *******************************************************/
27 
28 #include "univ.i"
29 
30 #include <map>
31 #include <string>
32 #include <my_service_manager.h>
33 
34 #include "log0recv.h"
35 
36 #ifdef HAVE_MY_AES_H
37 #include <my_aes.h>
38 #endif
39 
40 #include "log0crypt.h"
41 #include "mem0mem.h"
42 #include "buf0buf.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0cur.h"
47 #include "page0zip.h"
48 #include "btr0btr.h"
49 #include "btr0cur.h"
50 #include "ibuf0ibuf.h"
51 #include "trx0undo.h"
52 #include "trx0rec.h"
53 #include "fil0fil.h"
54 #include "buf0rea.h"
55 #include "srv0srv.h"
56 #include "srv0start.h"
57 #include "trx0roll.h"
58 #include "row0merge.h"
59 #include "fil0pagecompress.h"
60 
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t) - REDZONE_SIZE)

/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA	32

/** The recovery system. The recv_sys_t member functions in this file
assert that they are invoked on this object. */
recv_sys_t	recv_sys;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise.  Note that this is FALSE while a background thread is
rolling back incomplete transactions. */
volatile bool	recv_recovery_on;

/** TRUE when recv_init_crash_recovery() has been called. */
bool	recv_needed_recovery;
#ifdef UNIV_DEBUG
/** TRUE if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys.mutex. Only defined in debug builds. */
bool	recv_no_log_write = false;
#endif /* UNIV_DEBUG */

/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future.  Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
bool	recv_lsn_checks_on;

/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.

TRUE means that recovery is running and no operations on the log files
are allowed yet: the variable name is misleading.

NOTE(review): the two paragraphs above describe different meanings;
confirm which one is current. In this file, mlog_init_t::reset() asserts
that the flag is set and mlog_init_t::ibuf_merge() asserts that it is
clear. */
bool	recv_no_ibuf_operations;

/** The type of the previous parsed redo log record */
static mlog_id_t	recv_previous_parsed_rec_type;
/** The offset of the previous parsed redo log record */
static ulint	recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record */
static ulint	recv_previous_parsed_rec_is_multi;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t	recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
/** Performance schema key of the transaction rollback thread
(presumably; the thread itself is not defined in this part of the file) */
mysql_pfs_key_t	trx_rollback_clean_thread_key;
/** Performance schema key that recv_writer_thread registers itself with */
mysql_pfs_key_t	recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Is recv_writer_thread active? The thread clears this flag just
before it exits. */
bool	recv_writer_thread_active;

#ifndef	DBUG_OFF
/** Return string name of the redo log record type.
Only declared when DBUG is enabled.
@param[in]	type	redo log record type
@return string name of the redo log record type */
static const char* get_mlog_string(mlog_id_t type);
#endif /* !DBUG_OFF */
125 
126 /** Tablespace item during recovery */
127 struct file_name_t {
128 	/** Tablespace file name (MLOG_FILE_NAME) */
129 	std::string	name;
130 	/** Tablespace object (NULL if not valid or not found) */
131 	fil_space_t*	space;
132 
133 	/** Tablespace status. */
134 	enum fil_status {
135 		/** Normal tablespace */
136 		NORMAL,
137 		/** Deleted tablespace */
138 		DELETED,
139 		/** Missing tablespace */
140 		MISSING
141 	};
142 
143 	/** Status of the tablespace */
144 	fil_status	status;
145 
146 	/** FSP_SIZE of tablespace */
147 	ulint		size = 0;
148 
149 	/** the log sequence number of the last observed MLOG_INDEX_LOAD
150 	record for the tablespace */
151 	lsn_t		enable_lsn = 0;
152 
153 	/** Dummy flags before they have been read from the .ibd file */
154 	static constexpr uint32_t initial_flags = FSP_FLAGS_FCRC32_MASK_MARKER;
155 	/** FSP_SPACE_FLAGS of tablespace */
156 	uint32_t	flags = initial_flags;
157 
158 	/** Constructor */
file_name_tfile_name_t159 	file_name_t(std::string name_, bool deleted)
160 		: name(std::move(name_)), space(NULL),
161 		status(deleted ? DELETED: NORMAL) {}
162 
163 	/** Report a MLOG_INDEX_LOAD operation, meaning that
164 	mlog_init for any earlier LSN must be skipped.
165 	@param lsn	log sequence number of the MLOG_INDEX_LOAD */
mlog_index_loadfile_name_t166 	void mlog_index_load(lsn_t lsn)
167 	{
168 		if (enable_lsn < lsn) enable_lsn = lsn;
169 	}
170 };
171 
172 /** Map of dirty tablespaces during recovery */
173 typedef std::map<
174 	ulint,
175 	file_name_t,
176 	std::less<ulint>,
177 	ut_allocator<std::pair<const ulint, file_name_t> > >	recv_spaces_t;
178 
179 static recv_spaces_t	recv_spaces;
180 
/** States of recv_addr_t: the progress of applying the stored redo log
records to one page */
enum recv_addr_state {
	/** not yet processed */
	RECV_NOT_PROCESSED,
	/** not processed; the page will be reinitialized */
	RECV_WILL_NOT_READ,
	/** page is being read */
	RECV_BEING_READ,
	/** log records are being applied on the page */
	RECV_BEING_PROCESSED,
	/** log records have been applied on the page */
	RECV_PROCESSED,
	/** log records have been discarded because the tablespace
	does not exist */
	RECV_DISCARDED
};
197 
/** Hashed page file address struct: the redo log records stored for
one page, as an element of a recv_sys.addr_hash bucket chain */
struct recv_addr_t{
	/** recovery state of the page */
	recv_addr_state	state;
	/** tablespace identifier */
	unsigned	space:32;
	/** page number */
	unsigned	page_no:32;
	/** list of log records for this page */
	UT_LIST_BASE_NODE_T(recv_t) rec_list;
	/** hash node in the hash bucket chain; recv_addr_trim() follows it
	to reach the next recv_addr_t in the same bucket */
	hash_node_t	addr_hash;
};
211 
/** Report optimized DDL operation (without redo log),
corresponding to MLOG_INDEX_LOAD.
This is a callback pointer; nothing in this part of the file assigns it.
@param[in]	space_id	tablespace identifier
*/
void (*log_optimized_ddl_op)(ulint space_id);

/** Report an operation to create, delete, or rename a file during backup.
This is a callback pointer; fil_name_parse() invokes it only when it
is not NULL.
@param[in]	space_id	tablespace identifier
@param[in]	flags		tablespace flags (NULL if not create)
@param[in]	name		file name (not NUL-terminated)
@param[in]	len		length of name, in bytes
@param[in]	new_name	new file name (NULL if not rename)
@param[in]	new_len		length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, const byte* flags,
		    const byte* name, ulint len,
		    const byte* new_name, ulint new_len);
228 
229 /** Information about initializing page contents during redo log processing */
230 class mlog_init_t
231 {
232 public:
233 	/** A page initialization operation that was parsed from
234 	the redo log */
235 	struct init {
236 		/** log sequence number of the page initialization */
237 		lsn_t lsn;
238 		/** Whether btr_page_create() avoided a read of the page.
239 
240 		At the end of the last recovery batch, ibuf_merge()
241 		will invoke change buffer merge for pages that reside
242 		in the buffer pool. (In the last batch, loading pages
243 		would trigger change buffer merge.) */
244 		bool created;
245 	};
246 
247 private:
248 	typedef std::map<const page_id_t, init,
249 			 std::less<const page_id_t>,
250 			 ut_allocator<std::pair<const page_id_t, init> > >
251 		map;
252 	/** Map of page initialization operations.
253 	FIXME: Merge this to recv_sys.addr_hash! */
254 	map inits;
255 public:
256 	/** Record that a page will be initialized by the redo log.
257 	@param[in]	space		tablespace identifier
258 	@param[in]	page_no		page number
259 	@param[in]	lsn		log sequence number */
add(ulint space,ulint page_no,lsn_t lsn)260 	void add(ulint space, ulint page_no, lsn_t lsn)
261 	{
262 		ut_ad(mutex_own(&recv_sys.mutex));
263 		const init init = { lsn, false };
264 		std::pair<map::iterator, bool> p = inits.insert(
265 			map::value_type(page_id_t(space, page_no), init));
266 		ut_ad(!p.first->second.created);
267 		if (!p.second && p.first->second.lsn < init.lsn) {
268 			p.first->second = init;
269 		}
270 	}
271 
272 	/** Get the last stored lsn of the page id and its respective
273 	init/load operation.
274 	@param[in]	page_id	page id
275 	@param[in,out]	init	initialize log or load log
276 	@return the latest page initialization;
277 	not valid after releasing recv_sys.mutex. */
last(page_id_t page_id)278 	init& last(page_id_t page_id)
279 	{
280 		ut_ad(mutex_own(&recv_sys.mutex));
281 		return inits.find(page_id)->second;
282 	}
283 
284 	/** At the end of each recovery batch, reset the 'created' flags. */
reset()285 	void reset()
286 	{
287 		ut_ad(mutex_own(&recv_sys.mutex));
288 		ut_ad(recv_no_ibuf_operations);
289 		for (map::value_type& i : inits) {
290 			i.second.created = false;
291 		}
292 	}
293 
294 	/** On the last recovery batch, merge buffered changes to those
295 	pages that were initialized by buf_page_create() and still reside
296 	in the buffer pool. Stale pages are not allowed in the buffer pool.
297 
298 	Note: When MDEV-14481 implements redo log apply in the
299 	background, we will have to ensure that buf_page_get_gen()
300 	will not deliver stale pages to users (pages on which the
301 	change buffer was not merged yet).  Normally, the change
302 	buffer merge is performed on I/O completion. Maybe, add a
303 	flag to buf_page_t and perform the change buffer merge on
304 	the first actual access?
305 	@param[in,out]	mtr	dummy mini-transaction */
ibuf_merge(mtr_t & mtr)306 	void ibuf_merge(mtr_t& mtr)
307 	{
308 		ut_ad(mutex_own(&recv_sys.mutex));
309 		ut_ad(!recv_no_ibuf_operations);
310 		mtr.start();
311 
312 		for (const map::value_type& i : inits) {
313 			if (!i.second.created) {
314 				continue;
315 			}
316 			if (buf_block_t* block = buf_page_get_low(
317 				    i.first, 0, RW_X_LATCH, NULL,
318 				    BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
319 				    &mtr, NULL)) {
320 				mutex_exit(&recv_sys.mutex);
321 				ibuf_merge_or_delete_for_page(
322 					block, i.first,
323 					block->zip_size());
324 				mtr.commit();
325 				mtr.start();
326 				mutex_enter(&recv_sys.mutex);
327 			}
328 		}
329 
330 		mtr.commit();
331 	}
332 
333 	/** Clear the data structure */
clear()334 	void clear() { inits.clear(); }
335 };
336 
337 static mlog_init_t mlog_init;
338 
339 /** Process a MLOG_CREATE2 record that indicates that a tablespace
340 is being shrunk in size.
341 @param[in]	space_id	tablespace identifier
342 @param[in]	pages		trimmed size of the file, in pages
343 @param[in]	lsn		log sequence number of the operation */
recv_addr_trim(ulint space_id,unsigned pages,lsn_t lsn)344 static void recv_addr_trim(ulint space_id, unsigned pages, lsn_t lsn)
345 {
346 	DBUG_ENTER("recv_addr_trim");
347 	DBUG_LOG("ib_log",
348 		 "discarding log beyond end of tablespace "
349 		 << page_id_t(space_id, pages) << " before LSN " << lsn);
350 	ut_ad(mutex_own(&recv_sys.mutex));
351 	for (ulint i = recv_sys.addr_hash->n_cells; i--; ) {
352 		hash_cell_t* const cell = hash_get_nth_cell(
353 			recv_sys.addr_hash, i);
354 		for (recv_addr_t* addr = static_cast<recv_addr_t*>(cell->node),
355 			     *next;
356 		     addr; addr = next) {
357 			next = static_cast<recv_addr_t*>(addr->addr_hash);
358 
359 			if (addr->space != space_id || addr->page_no < pages) {
360 				continue;
361 			}
362 
363 			for (recv_t* recv = UT_LIST_GET_FIRST(addr->rec_list);
364 			     recv; ) {
365 				recv_t* n = UT_LIST_GET_NEXT(rec_list, recv);
366 				if (recv->start_lsn < lsn) {
367 					DBUG_PRINT("ib_log",
368 						   ("Discarding %s for"
369 						    " page %u:%u at " LSN_PF,
370 						    get_mlog_string(
371 							    recv->type),
372 						    addr->space, addr->page_no,
373 						    recv->start_lsn));
374 					UT_LIST_REMOVE(addr->rec_list, recv);
375 				}
376 				recv = n;
377 			}
378 		}
379 	}
380 	if (fil_space_t* space = fil_space_get(space_id)) {
381 		ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
382 		fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
383 		ut_ad(file->is_open());
384 		os_file_truncate(file->name, file->handle,
385 				 os_offset_t(pages) << srv_page_size_shift,
386 				 true);
387 	}
388 	DBUG_VOID_RETURN;
389 }
390 
391 /** Process a file name from a MLOG_FILE_* record.
392 @param[in,out]	name		file name
393 @param[in]	len		length of the file name
394 @param[in]	space_id	the tablespace ID
395 @param[in]	deleted		whether this is a MLOG_FILE_DELETE record */
396 static
397 void
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)398 fil_name_process(
399 	char*	name,
400 	ulint	len,
401 	ulint	space_id,
402 	bool	deleted)
403 {
404 	if (srv_operation == SRV_OPERATION_BACKUP) {
405 		return;
406 	}
407 
408 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
409 	      || is_mariabackup_restore_or_export());
410 
411 	/* We will also insert space=NULL into the map, so that
412 	further checks can ensure that a MLOG_FILE_NAME record was
413 	scanned before applying any page records for the space_id. */
414 
415 	os_normalize_path(name);
416 	file_name_t	fname(std::string(name, len - 1), deleted);
417 	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
418 		std::make_pair(space_id, fname));
419 	ut_ad(p.first->first == space_id);
420 
421 	file_name_t&	f = p.first->second;
422 
423 	if (deleted) {
424 		/* Got MLOG_FILE_DELETE */
425 
426 		if (!p.second && f.status != file_name_t::DELETED) {
427 			f.status = file_name_t::DELETED;
428 			if (f.space != NULL) {
429 				fil_space_free(space_id, false);
430 				f.space = NULL;
431 			}
432 		}
433 
434 		ut_ad(f.space == NULL);
435 	} else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
436 		   || f.name != fname.name) {
437 		fil_space_t*	space;
438 
439 		/* Check if the tablespace file exists and contains
440 		the space_id. If not, ignore the file after displaying
441 		a note. Abort if there are multiple files with the
442 		same space_id. */
443 		switch (fil_ibd_load(space_id, name, space)) {
444 		case FIL_LOAD_OK:
445 			ut_ad(space != NULL);
446 
447 			if (!f.space) {
448 				if (f.size
449 				    || f.flags != f.initial_flags) {
450 					fil_space_set_recv_size_and_flags(
451 						space->id, f.size, f.flags);
452 				}
453 
454 				f.space = space;
455 				goto same_space;
456 			} else if (f.space == space) {
457 same_space:
458 				f.name = fname.name;
459 				f.status = file_name_t::NORMAL;
460 			} else {
461 				ib::error() << "Tablespace " << space_id
462 					<< " has been found in two places: '"
463 					<< f.name << "' and '" << name << "'."
464 					" You must delete one of them.";
465 				recv_sys.found_corrupt_fs = true;
466 			}
467 			break;
468 
469 		case FIL_LOAD_ID_CHANGED:
470 			ut_ad(space == NULL);
471 			break;
472 
473 		case FIL_LOAD_NOT_FOUND:
474 			/* No matching tablespace was found; maybe it
475 			was renamed, and we will find a subsequent
476 			MLOG_FILE_* record. */
477 			ut_ad(space == NULL);
478 
479 			if (srv_force_recovery) {
480 				/* Without innodb_force_recovery,
481 				missing tablespaces will only be
482 				reported in
483 				recv_init_crash_recovery_spaces().
484 				Enable some more diagnostics when
485 				forcing recovery. */
486 
487 				ib::info()
488 					<< "At LSN: " << recv_sys.recovered_lsn
489 					<< ": unable to open file " << name
490 					<< " for tablespace " << space_id;
491 			}
492 			break;
493 
494 		case FIL_LOAD_INVALID:
495 			ut_ad(space == NULL);
496 			if (srv_force_recovery == 0) {
497 				ib::warn() << "We do not continue the crash"
498 					" recovery, because the table may"
499 					" become corrupt if we cannot apply"
500 					" the log records in the InnoDB log to"
501 					" it. To fix the problem and start"
502 					" mysqld:";
503 				ib::info() << "1) If there is a permission"
504 					" problem in the file and mysqld"
505 					" cannot open the file, you should"
506 					" modify the permissions.";
507 				ib::info() << "2) If the tablespace is not"
508 					" needed, or you can restore an older"
509 					" version from a backup, then you can"
510 					" remove the .ibd file, and use"
511 					" --innodb_force_recovery=1 to force"
512 					" startup without this file.";
513 				ib::info() << "3) If the file system or the"
514 					" disk is broken, and you cannot"
515 					" remove the .ibd file, you can set"
516 					" --innodb_force_recovery.";
517 				recv_sys.found_corrupt_fs = true;
518 				break;
519 			}
520 
521 			ib::info() << "innodb_force_recovery was set to "
522 				<< srv_force_recovery << ". Continuing crash"
523 				" recovery even though we cannot access the"
524 				" files for tablespace " << space_id << ".";
525 			break;
526 		}
527 	}
528 }
529 
530 /** Parse or process a MLOG_FILE_* record.
531 @param[in]	ptr		redo log record
532 @param[in]	end		end of the redo log buffer
533 @param[in]	page_id		first page number in the file
534 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
535 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
536 @param[in]	apply		whether to apply the record
537 @return pointer to next redo log record
538 @retval NULL if this log record was truncated */
539 static
540 byte*
fil_name_parse(byte * ptr,const byte * end,const page_id_t page_id,mlog_id_t type,bool apply)541 fil_name_parse(
542 	byte*		ptr,
543 	const byte*	end,
544 	const page_id_t	page_id,
545 	mlog_id_t	type,
546 	bool		apply)
547 {
548 	if (type == MLOG_FILE_CREATE2) {
549 		if (end < ptr + 4) {
550 			return(NULL);
551 		}
552 		ptr += 4;
553 	}
554 
555 	if (end < ptr + 2) {
556 		return(NULL);
557 	}
558 
559 	ulint	len = mach_read_from_2(ptr);
560 	ptr += 2;
561 	if (end < ptr + len) {
562 		return(NULL);
563 	}
564 
565 	/* MLOG_FILE_* records should only be written for
566 	user-created tablespaces. The name must be long enough
567 	and end in .ibd. */
568 	bool corrupt = is_predefined_tablespace(page_id.space())
569 		|| len < sizeof "/a.ibd\0"
570 		|| (!page_id.page_no() != !memcmp(ptr + len - 5, DOT_IBD, 5));
571 
572 	if (!corrupt && !memchr(ptr, OS_PATH_SEPARATOR, len)) {
573 		if (byte* c = static_cast<byte*>
574 		    (memchr(ptr, OS_PATH_SEPARATOR_ALT, len))) {
575 			ut_ad(c >= ptr);
576 			ut_ad(c < ptr + len);
577 			do {
578 				*c = OS_PATH_SEPARATOR;
579 			} while ((c = static_cast<byte*>
580 				  (memchr(ptr, OS_PATH_SEPARATOR_ALT,
581 					  len - ulint(c - ptr)))) != NULL);
582 		} else {
583 			corrupt = true;
584 		}
585 	}
586 
587 	byte*	end_ptr	= ptr + len;
588 
589 	switch (type) {
590 	default:
591 		ut_ad(0); // the caller checked this
592 		/* fall through */
593 	case MLOG_FILE_NAME:
594 		if (UNIV_UNLIKELY(corrupt)) {
595 			ib::error() << "MLOG_FILE_NAME incorrect:" << ptr;
596 			recv_sys.found_corrupt_log = true;
597 			break;
598 		}
599 
600 		fil_name_process(
601 			reinterpret_cast<char*>(ptr), len, page_id.space(),
602 			false);
603 		break;
604 	case MLOG_FILE_DELETE:
605 		if (UNIV_UNLIKELY(corrupt)) {
606 			ib::error() << "MLOG_FILE_DELETE incorrect:" << ptr;
607 			recv_sys.found_corrupt_log = true;
608 			break;
609 		}
610 
611 		fil_name_process(reinterpret_cast<char*>(ptr), len,
612 				 page_id.space(), true);
613 		/* fall through */
614 	case MLOG_FILE_CREATE2:
615 		if (page_id.page_no()) {
616 			ut_ad(page_id.page_no()
617 			      == SRV_UNDO_TABLESPACE_SIZE_IN_PAGES);
618 			ut_a(srv_is_undo_tablespace(page_id.space()));
619 			compile_time_assert(
620 				UT_ARR_SIZE(recv_sys.truncated_undo_spaces)
621 				== TRX_SYS_MAX_UNDO_SPACES);
622 			recv_sys_t::trunc& t = recv_sys.truncated_undo_spaces[
623 				page_id.space() - srv_undo_space_id_start];
624 			t.lsn = recv_sys.recovered_lsn;
625 			t.pages = uint32_t(page_id.page_no());
626 		} else if (log_file_op) {
627 			log_file_op(page_id.space(),
628 				    type == MLOG_FILE_CREATE2 ? ptr - 4 : NULL,
629 				    ptr, len, NULL, 0);
630 		}
631 		break;
632 	case MLOG_FILE_RENAME2:
633 		if (UNIV_UNLIKELY(corrupt)) {
634 			ib::error() << "MLOG_FILE_RENAME2 incorrect:" << ptr;
635 			recv_sys.found_corrupt_log = true;
636 		}
637 
638 		/* The new name follows the old name. */
639 		byte*	new_name = end_ptr + 2;
640 		if (end < new_name) {
641 			return(NULL);
642 		}
643 
644 		ulint	new_len = mach_read_from_2(end_ptr);
645 
646 		if (end < end_ptr + 2 + new_len) {
647 			return(NULL);
648 		}
649 
650 		end_ptr += 2 + new_len;
651 
652 		corrupt = corrupt
653 			|| new_len < sizeof "/a.ibd\0"
654 			|| memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0;
655 
656 		if (!corrupt && !memchr(new_name, OS_PATH_SEPARATOR, new_len)) {
657 			if (byte* c = static_cast<byte*>
658 			    (memchr(new_name, OS_PATH_SEPARATOR_ALT,
659 				    new_len))) {
660 				ut_ad(c >= new_name);
661 				ut_ad(c < new_name + new_len);
662 				do {
663 					*c = OS_PATH_SEPARATOR;
664 				} while ((c = static_cast<byte*>
665 					  (memchr(ptr, OS_PATH_SEPARATOR_ALT,
666 						  new_len
667 						  - ulint(c - new_name))))
668 					 != NULL);
669 			} else {
670 				corrupt = true;
671 			}
672 		}
673 
674 		if (UNIV_UNLIKELY(corrupt)) {
675 			ib::error() << "MLOG_FILE_RENAME2 new_name incorrect:" << ptr
676 				    << " new_name: " << new_name;
677 			recv_sys.found_corrupt_log = true;
678 			break;
679 		}
680 
681 		fil_name_process(
682 			reinterpret_cast<char*>(ptr), len,
683 			page_id.space(), false);
684 		fil_name_process(
685 			reinterpret_cast<char*>(new_name), new_len,
686 			page_id.space(), false);
687 
688 		if (log_file_op) {
689 			log_file_op(page_id.space(), NULL,
690 				    ptr, len, new_name, new_len);
691 		}
692 
693 		if (!apply) {
694 			break;
695 		}
696 		if (!fil_op_replay_rename(
697 			    page_id.space(), page_id.page_no(),
698 			    reinterpret_cast<const char*>(ptr),
699 			    reinterpret_cast<const char*>(new_name))) {
700 			recv_sys.found_corrupt_fs = true;
701 		}
702 	}
703 
704 	return(end_ptr);
705 }
706 
707 /** Clean up after recv_sys_t::create() */
close()708 void recv_sys_t::close()
709 {
710 	ut_ad(this == &recv_sys);
711 	ut_ad(!recv_writer_thread_active);
712 
713 	if (is_initialised()) {
714 		dblwr.pages.clear();
715 
716 		if (addr_hash) {
717 			hash_table_free(addr_hash);
718 			addr_hash = NULL;
719 		}
720 
721 		if (heap) {
722 			mem_heap_free(heap);
723 			heap = NULL;
724 		}
725 
726 		if (flush_start) {
727 			os_event_destroy(flush_start);
728 		}
729 
730 		if (flush_end) {
731 			os_event_destroy(flush_end);
732 		}
733 
734 		if (buf) {
735 			ut_free_dodump(buf, buf_size);
736 			buf = NULL;
737 		}
738 
739 		buf_size = 0;
740 		mutex_free(&writer_mutex);
741 		mutex_free(&mutex);
742 	}
743 
744 	recv_spaces.clear();
745 	mlog_init.clear();
746 }
747 
748 /************************************************************
749 Reset the state of the recovery system variables. */
750 void
recv_sys_var_init(void)751 recv_sys_var_init(void)
752 /*===================*/
753 {
754 	recv_recovery_on = false;
755 	recv_needed_recovery = false;
756 	recv_lsn_checks_on = false;
757 	recv_no_ibuf_operations = false;
758 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
759 	recv_previous_parsed_rec_offset	= 0;
760 	recv_previous_parsed_rec_is_multi = 0;
761 	recv_max_page_lsn = 0;
762 }
763 
764 /******************************************************************//**
765 recv_writer thread tasked with flushing dirty pages from the buffer
766 pools.
767 @return a dummy parameter */
768 extern "C"
769 os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)770 DECLARE_THREAD(recv_writer_thread)(
771 /*===============================*/
772 	void*	arg MY_ATTRIBUTE((unused)))
773 			/*!< in: a dummy parameter required by
774 			os_thread_create */
775 {
776 	my_thread_init();
777 	ut_ad(!srv_read_only_mode);
778 
779 #ifdef UNIV_PFS_THREAD
780 	pfs_register_thread(recv_writer_thread_key);
781 #endif /* UNIV_PFS_THREAD */
782 
783 #ifdef UNIV_DEBUG_THREAD_CREATION
784 	ib::info() << "recv_writer thread running, id "
785 		<< os_thread_pf(os_thread_get_curr_id());
786 #endif /* UNIV_DEBUG_THREAD_CREATION */
787 
788 	while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
789 
790 		/* Wait till we get a signal to clean the LRU list.
791 		Bounded by max wait time of 100ms. */
792 		int64_t      sig_count = os_event_reset(buf_flush_event);
793 		os_event_wait_time_low(buf_flush_event, 100000, sig_count);
794 
795 		mutex_enter(&recv_sys.writer_mutex);
796 
797 		if (!recv_recovery_is_on()) {
798 			mutex_exit(&recv_sys.writer_mutex);
799 			break;
800 		}
801 
802 		/* Flush pages from end of LRU if required */
803 		os_event_reset(recv_sys.flush_end);
804 		recv_sys.flush_type = BUF_FLUSH_LRU;
805 		os_event_set(recv_sys.flush_start);
806 		os_event_wait(recv_sys.flush_end);
807 
808 		mutex_exit(&recv_sys.writer_mutex);
809 	}
810 
811 	recv_writer_thread_active = false;
812 
813 	my_thread_end();
814 	/* We count the number of threads in os_thread_exit().
815 	A created thread should always use that to exit and not
816 	use return() to exit. */
817 	os_thread_exit();
818 
819 	OS_THREAD_DUMMY_RETURN;
820 }
821 
822 /** Initialize the redo log recovery subsystem. */
create()823 void recv_sys_t::create()
824 {
825 	ut_ad(this == &recv_sys);
826 	ut_ad(!is_initialised());
827 	ut_ad(!flush_start);
828 	ut_ad(!flush_end);
829 	mutex_create(LATCH_ID_RECV_SYS, &mutex);
830 	mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
831 
832 	heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
833 
834 	if (!srv_read_only_mode) {
835 		flush_start = os_event_create(0);
836 		flush_end = os_event_create(0);
837 	}
838 
839 	flush_type = BUF_FLUSH_LRU;
840 	apply_log_recs = false;
841 	apply_batch_on = false;
842 
843 	buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
844 	buf_size = RECV_PARSING_BUF_SIZE;
845 	len = 0;
846 	parse_start_lsn = 0;
847 	scanned_lsn = 0;
848 	scanned_checkpoint_no = 0;
849 	recovered_offset = 0;
850 	recovered_lsn = 0;
851 	found_corrupt_log = false;
852 	found_corrupt_fs = false;
853 	mlog_checkpoint_lsn = 0;
854 
855 	addr_hash = hash_create(buf_pool_get_curr_size() / 512);
856 	n_addrs = 0;
857 	progress_time = time(NULL);
858 	recv_max_page_lsn = 0;
859 
860 	memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
861 	last_stored_lsn = 0;
862 }
863 
864 /** Empty a fully processed set of stored redo log records. */
empty()865 inline void recv_sys_t::empty()
866 {
867 	ut_ad(mutex_own(&mutex));
868 	ut_a(n_addrs == 0);
869 
870 	hash_table_free(addr_hash);
871 	mem_heap_empty(heap);
872 
873 	addr_hash = hash_create(buf_pool_get_curr_size() / 512);
874 }
875 
876 /** Free most recovery data structures. */
debug_free()877 void recv_sys_t::debug_free()
878 {
879 	ut_ad(this == &recv_sys);
880 	ut_ad(is_initialised());
881 	mutex_enter(&mutex);
882 
883 	hash_table_free(addr_hash);
884 	mem_heap_free(heap);
885 	ut_free_dodump(buf, buf_size);
886 
887 	buf = NULL;
888 	heap = NULL;
889 	addr_hash = NULL;
890 
891 	/* wake page cleaner up to progress */
892 	if (!srv_read_only_mode) {
893 		ut_ad(!recv_recovery_is_on());
894 		ut_ad(!recv_writer_thread_active);
895 		os_event_reset(buf_flush_event);
896 		os_event_set(flush_start);
897 	}
898 
899 	mutex_exit(&mutex);
900 }
901 
/** Read a log segment to log_sys.buf.
The range is read in pieces that do not extend past a multiple of
file_size. Every OS_FILE_LOG_BLOCK_SIZE block that was read is checked:
its block number, its checksum (when innodb_log_checksums is set or the
log is encrypted), its decryption (when the log is encrypted) and its
data length. Reading stops at the first block that fails a check.
The caller must hold log_sys.mutex, and both LSNs must be multiples
of OS_FILE_LOG_BLOCK_SIZE.
@param[in,out]	start_lsn	in: read area start,
out: the last read valid lsn
@param[in]	end_lsn		read area end
@return	whether no invalid blocks (e.g checksum mismatch) were found */
bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
{
	ulint	len;
	bool success = true;
	ut_ad(log_sys.mutex.is_owned());
	ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
	ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
	/* Write position in log_sys.buf; it keeps advancing across
	iterations of the outer loop. */
	byte* buf = log_sys.buf;
loop:
	/* One iteration reads and checks one piece of the range. */
	lsn_t source_offset = calc_lsn_offset(*start_lsn);

	ut_a(end_lsn - *start_lsn <= ULINT_MAX);
	len = (ulint) (end_lsn - *start_lsn);

	ut_ad(len != 0);

	const bool at_eof = (source_offset % file_size) + len > file_size;
	if (at_eof) {
		/* If the above condition is true then len (which is ulint)
		is > the expression below, so the typecast is ok */
		len = ulint(file_size - (source_offset % file_size));
	}

	log_sys.n_log_ios++;

	MONITOR_INC(MONITOR_LOG_IO);

	ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);

	const ulint	page_no = ulint(source_offset >> srv_page_size_shift);

	/* The offset is passed as a page number plus the byte offset
	within that page. */
	fil_io(IORequestLogRead, true,
	       page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
	       0,
	       ulint(source_offset & (srv_page_size - 1)),
	       len, buf, NULL);

	/* Check the blocks one by one; *start_lsn advances past every
	block that passed all checks. */
	for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
		     buf += OS_FILE_LOG_BLOCK_SIZE,
		     (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
		const ulint block_number = log_block_get_hdr_no(buf);

		if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
			/* Garbage or an incompletely written log block.
			We will not report any error, because this can
			happen when InnoDB was killed while it was
			writing redo log. We simply treat this as an
			abrupt end of the redo log. */
fail:
			/* Also the target of the goto statements below.
			Making end_lsn equal to *start_lsn ends the outer
			loop after the break. */
			end_lsn = *start_lsn;
			success = false;
			break;
		}

		if (innodb_log_checksums || is_encrypted()) {
			ulint crc = log_block_calc_checksum_crc32(buf);
			ulint cksum = log_block_get_checksum(buf);

			/* Debug injection: force a mismatch on the first
			block that gets here. */
			DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
					 static int block_counter;
					 if (block_counter++ == 0) {
						 cksum = crc + 1;
					 }
			 });

			/* Debug injection: force a mismatch on every block. */
			DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });

			if (crc != cksum) {
				ib::error_or_warn(srv_operation != SRV_OPERATION_BACKUP)
					    << "Invalid log block checksum."
					    << " block: " << block_number
					    << " checkpoint no: "
					    << log_block_get_checkpoint_no(buf)
					    << " expected: " << crc
					    << " found: " << cksum;
				goto fail;
			}

			if (is_encrypted()
			    && !log_crypt(buf, *start_lsn,
					  OS_FILE_LOG_BLOCK_SIZE,
					  LOG_DECRYPT)) {
				goto fail;
			}
		}

		/* The data length must cover at least the block header,
		and it must either be the full block size or not exceed
		log_sys.trailer_offset(). */
		ulint dl = log_block_get_data_len(buf);
		if (dl < LOG_BLOCK_HDR_SIZE
		    || (dl != OS_FILE_LOG_BLOCK_SIZE
			&& dl > log_sys.trailer_offset())) {
			recv_sys.found_corrupt_log = true;
			goto fail;
		}
	}

	/* Progress reporting, whenever recv_sys.report() asks for it */
	if (recv_sys.report(time(NULL))) {
		ib::info() << "Read redo log up to LSN=" << *start_lsn;
		service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
			"Read redo log up to LSN=" LSN_PF,
			*start_lsn);
	}

	/* More to read: either the piece was cut short by at_eof, or
	not everything fit in one piece. */
	if (*start_lsn != end_lsn) {
		goto loop;
	}

	return(success);
}
1015 
1016 
1017 
1018 /********************************************************//**
1019 Copies a log segment from the most up-to-date log group to the other log
1020 groups, so that they all contain the latest log data. Also writes the info
1021 about the latest checkpoint to the groups, and inits the fields in the group
1022 memory structs to up-to-date values. */
1023 static
1024 void
recv_synchronize_groups()1025 recv_synchronize_groups()
1026 {
1027 	const lsn_t recovered_lsn = recv_sys.recovered_lsn;
1028 
1029 	/* Read the last recovered log block to the recovery system buffer:
1030 	the block is always incomplete */
1031 
1032 	lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
1033 					       OS_FILE_LOG_BLOCK_SIZE);
1034 	log_sys.log.read_log_seg(&start_lsn,
1035 				 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
1036 	log_sys.log.set_fields(recovered_lsn);
1037 
1038 	/* Copy the checkpoint info to the log; remember that we have
1039 	incremented checkpoint_no by one, and the info will not be written
1040 	over the max checkpoint info, thus making the preservation of max
1041 	checkpoint info on disk certain */
1042 
1043 	if (!srv_read_only_mode) {
1044 		log_write_checkpoint_info(true, 0);
1045 		log_mutex_enter();
1046 	}
1047 }
1048 
1049 /** Check the consistency of a log header block.
1050 @param[in]	log header block
1051 @return true if ok */
1052 static
1053 bool
recv_check_log_header_checksum(const byte * buf)1054 recv_check_log_header_checksum(
1055 	const byte*	buf)
1056 {
1057 	return(log_block_get_checksum(buf)
1058 	       == log_block_calc_checksum_crc32(buf));
1059 }
1060 
1061 /** Find the latest checkpoint in the format-0 log header.
1062 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1063 @return error code or DB_SUCCESS */
1064 static MY_ATTRIBUTE((warn_unused_result))
1065 dberr_t
recv_find_max_checkpoint_0(ulint * max_field)1066 recv_find_max_checkpoint_0(ulint* max_field)
1067 {
1068 	ib_uint64_t	max_no = 0;
1069 	ib_uint64_t	checkpoint_no;
1070 	byte*		buf	= log_sys.checkpoint_buf;
1071 
1072 	ut_ad(log_sys.log.format == 0);
1073 
1074 	/** Offset of the first checkpoint checksum */
1075 	static const uint CHECKSUM_1 = 288;
1076 	/** Offset of the second checkpoint checksum */
1077 	static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1078 	/** Most significant bits of the checkpoint offset */
1079 	static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1080 	/** Least significant bits of the checkpoint offset */
1081 	static const uint OFFSET_LOW32 = 16;
1082 
1083 	bool found = false;
1084 
1085 	for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1086 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1087 		log_header_read(field);
1088 
1089 		if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1090 		    != mach_read_from_4(buf + CHECKSUM_1)
1091 		    || static_cast<uint32_t>(
1092 			    ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1093 					   CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1094 		    != mach_read_from_4(buf + CHECKSUM_2)) {
1095 			DBUG_LOG("ib_log",
1096 				 "invalid pre-10.2.2 checkpoint " << field);
1097 			continue;
1098 		}
1099 
1100 		checkpoint_no = mach_read_from_8(
1101 			buf + LOG_CHECKPOINT_NO);
1102 
1103 		if (!log_crypt_101_read_checkpoint(buf)) {
1104 			ib::error() << "Decrypting checkpoint failed";
1105 			continue;
1106 		}
1107 
1108 		DBUG_PRINT("ib_log",
1109 			   ("checkpoint " UINT64PF " at " LSN_PF " found",
1110 			    checkpoint_no,
1111 			    mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
1112 
1113 		if (checkpoint_no >= max_no) {
1114 			found = true;
1115 			*max_field = field;
1116 			max_no = checkpoint_no;
1117 
1118 			log_sys.log.set_lsn(mach_read_from_8(
1119 				buf + LOG_CHECKPOINT_LSN));
1120 			log_sys.log.set_lsn_offset(
1121 				lsn_t(mach_read_from_4(buf + OFFSET_HIGH32))
1122 				<< 32
1123 				| mach_read_from_4(buf + OFFSET_LOW32));
1124 		}
1125 	}
1126 
1127 	if (found) {
1128 		return(DB_SUCCESS);
1129 	}
1130 
1131 	ib::error() << "Upgrade after a crash is not supported."
1132 		" This redo log was created before MariaDB 10.2.2,"
1133 		" and we did not find a valid checkpoint."
1134 		" Please follow the instructions at"
1135 		" https://mariadb.com/kb/en/library/upgrading/";
1136 	return(DB_ERROR);
1137 }
1138 
1139 /** Determine if a pre-MySQL 5.7.9/MariaDB 10.2.2 redo log is clean.
1140 @param[in]	lsn	checkpoint LSN
1141 @param[in]	crypt	whether the log might be encrypted
1142 @return error code
1143 @retval	DB_SUCCESS	if the redo log is clean
1144 @retval DB_ERROR	if the redo log is corrupted or dirty */
recv_log_format_0_recover(lsn_t lsn,bool crypt)1145 static dberr_t recv_log_format_0_recover(lsn_t lsn, bool crypt)
1146 {
1147 	log_mutex_enter();
1148 	const lsn_t	source_offset = log_sys.log.calc_lsn_offset(lsn);
1149 	log_mutex_exit();
1150 	const ulint	page_no = ulint(source_offset >> srv_page_size_shift);
1151 	byte*		buf = log_sys.buf;
1152 
1153 	static const char* NO_UPGRADE_RECOVERY_MSG =
1154 		"Upgrade after a crash is not supported."
1155 		" This redo log was created before MariaDB 10.2.2";
1156 
1157 	fil_io(IORequestLogRead, true,
1158 	       page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1159 	       0,
1160 	       ulint((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1161 		     & (srv_page_size - 1)),
1162 	       OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1163 
1164 	if (log_block_calc_checksum_format_0(buf)
1165 	    != log_block_get_checksum(buf)
1166 	    && !log_crypt_101_read_block(buf)) {
1167 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1168 			<< ", and it appears corrupted.";
1169 		return(DB_CORRUPTION);
1170 	}
1171 
1172 	if (log_block_get_data_len(buf)
1173 	    == (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1174 	} else if (crypt) {
1175 		ib::error() << "Cannot decrypt log for upgrading."
1176 			" The encrypted log was created"
1177 			" before MariaDB 10.2.2.";
1178 		return DB_ERROR;
1179 	} else {
1180 		ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";
1181 		return(DB_ERROR);
1182 	}
1183 
1184 	/* Mark the redo log for upgrading. */
1185 	srv_log_file_size = 0;
1186 	recv_sys.parse_start_lsn = recv_sys.recovered_lsn
1187 		= recv_sys.scanned_lsn
1188 		= recv_sys.mlog_checkpoint_lsn = lsn;
1189 	log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1190 		= log_sys.lsn = log_sys.write_lsn
1191 		= log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1192 		= lsn;
1193 	log_sys.next_checkpoint_no = 0;
1194 	return(DB_SUCCESS);
1195 }
1196 
1197 /** Find the latest checkpoint in the log header.
1198 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1199 @return error code or DB_SUCCESS */
1200 dberr_t
recv_find_max_checkpoint(ulint * max_field)1201 recv_find_max_checkpoint(ulint* max_field)
1202 {
1203 	ib_uint64_t	max_no;
1204 	ib_uint64_t	checkpoint_no;
1205 	ulint		field;
1206 	byte*		buf;
1207 
1208 	max_no = 0;
1209 	*max_field = 0;
1210 
1211 	buf = log_sys.checkpoint_buf;
1212 
1213 	log_header_read(0);
1214 	/* Check the header page checksum. There was no
1215 	checksum in the first redo log format (version 0). */
1216 	log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1217 	log_sys.log.subformat = log_sys.log.format != log_t::FORMAT_3_23
1218 		? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
1219 		: 0;
1220 	if (log_sys.log.format != log_t::FORMAT_3_23
1221 	    && !recv_check_log_header_checksum(buf)) {
1222 		ib::error() << "Invalid redo log header checksum.";
1223 		return(DB_CORRUPTION);
1224 	}
1225 
1226 	char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
1227 
1228 	memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
1229 	/* Ensure that the string is NUL-terminated. */
1230 	creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;
1231 
1232 	switch (log_sys.log.format) {
1233 	case log_t::FORMAT_3_23:
1234 		return(recv_find_max_checkpoint_0(max_field));
1235 	case log_t::FORMAT_10_2:
1236 	case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
1237 	case log_t::FORMAT_10_3:
1238 	case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
1239 	case log_t::FORMAT_10_4:
1240 	case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
1241 		break;
1242 	default:
1243 		ib::error() << "Unsupported redo log format."
1244 			" The redo log was created with " << creator << ".";
1245 		return(DB_ERROR);
1246 	}
1247 
1248 	for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1249 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1250 
1251 		log_header_read(field);
1252 
1253 		const ulint crc32 = log_block_calc_checksum_crc32(buf);
1254 		const ulint cksum = log_block_get_checksum(buf);
1255 
1256 		if (crc32 != cksum) {
1257 			DBUG_PRINT("ib_log",
1258 				   ("invalid checkpoint,"
1259 				    " at " ULINTPF
1260 				    ", checksum " ULINTPFx
1261 				    " expected " ULINTPFx,
1262 				    field, cksum, crc32));
1263 			continue;
1264 		}
1265 
1266 		if (log_sys.is_encrypted()
1267 		    && !log_crypt_read_checkpoint_buf(buf)) {
1268 			ib::error() << "Reading checkpoint"
1269 				" encryption info failed.";
1270 			continue;
1271 		}
1272 
1273 		checkpoint_no = mach_read_from_8(
1274 			buf + LOG_CHECKPOINT_NO);
1275 
1276 		DBUG_PRINT("ib_log",
1277 			   ("checkpoint " UINT64PF " at " LSN_PF " found",
1278 			    checkpoint_no, mach_read_from_8(
1279 				    buf + LOG_CHECKPOINT_LSN)));
1280 
1281 		if (checkpoint_no >= max_no) {
1282 			*max_field = field;
1283 			max_no = checkpoint_no;
1284 			log_sys.log.set_lsn(mach_read_from_8(
1285 				buf + LOG_CHECKPOINT_LSN));
1286 			log_sys.log.set_lsn_offset(mach_read_from_8(
1287 				buf + LOG_CHECKPOINT_OFFSET));
1288 			log_sys.next_checkpoint_no = checkpoint_no;
1289 		}
1290 	}
1291 
1292 	if (*max_field == 0) {
1293 		/* Before 10.2.2, we could get here during database
1294 		initialization if we created an ib_logfile0 file that
1295 		was filled with zeroes, and were killed. After
1296 		10.2.2, we would reject such a file already earlier,
1297 		when checking the file header. */
1298 		ib::error() << "No valid checkpoint found"
1299 			" (corrupted redo log)."
1300 			" You can try --innodb-force-recovery=6"
1301 			" as a last resort.";
1302 		return(DB_ERROR);
1303 	}
1304 
1305 	return(DB_SUCCESS);
1306 }
1307 
/** Try to parse a single log record body and also applies it if
specified.

File-level records (MLOG_FILE_*), MLOG_INDEX_LOAD and MLOG_TRUNCATE are
handled up front and return immediately. Every other record type is
dispatched to its type-specific parser. When block is non-NULL the
parsers are handed the page frame and the compressed page descriptor of
that block; otherwise page and page_zip are NULL.
@param[in]	type		redo log entry type
@param[in]	ptr		redo log record body
@param[in]	end_ptr		end of buffer
@param[in]	page_id		page identifier
@param[in]	apply		whether to apply the record
@param[in,out]	block		buffer block, or NULL if
a page log record should not be applied
or if it is a MLOG_FILE_ operation
@param[in,out]	mtr		mini-transaction, or NULL if
a page log record should not be applied
@return log record end, NULL if not a complete record; NULL is also
returned when this function sets recv_sys.found_corrupt_log because of
MLOG_TRUNCATE, a missing tablespace name record, or an unknown type */
static
byte*
recv_parse_or_apply_log_rec_body(
	mlog_id_t	type,
	byte*		ptr,
	byte*		end_ptr,
	const page_id_t	page_id,
	bool		apply,
	buf_block_t*	block,
	mtr_t*		mtr)
{
	/* block and mtr must be passed together or not at all. */
	ut_ad(!block == !mtr);
	ut_ad(!apply || recv_sys.mlog_checkpoint_lsn);

	/* Record types that are not tied to a page frame. */
	switch (type) {
	case MLOG_FILE_NAME:
	case MLOG_FILE_DELETE:
	case MLOG_FILE_CREATE2:
	case MLOG_FILE_RENAME2:
		ut_ad(block == NULL);
		/* Collect the file names when parsing the log,
		before applying any log records. */
		return fil_name_parse(ptr, end_ptr, page_id, type, apply);
	case MLOG_INDEX_LOAD:
		/* The body is 8 bytes long; it is only skipped here. */
		if (end_ptr < ptr + 8) {
			return(NULL);
		}
		return(ptr + 8);
	case MLOG_TRUNCATE:
		ib::error() << "Cannot crash-upgrade from "
			"old-style TRUNCATE TABLE";
		recv_sys.found_corrupt_log = true;
		return NULL;
	default:
		break;
	}

	/* Set only through mlog_parse_index() below; released at the
	end of this function. */
	dict_index_t*	index	= NULL;
	page_t*		page;
	page_zip_des_t*	page_zip;
#ifdef UNIV_DEBUG
	/* Page type used by the debug assertions in the dispatch below. */
	ulint		page_type;
#endif /* UNIV_DEBUG */

	if (block) {
		/* Applying a page log record. */
		ut_ad(apply);
		page = block->frame;
		page_zip = buf_block_get_page_zip(block);
		ut_d(page_type = fil_page_get_type(page));
	} else if (apply
		   && !is_predefined_tablespace(page_id.space())
		   && recv_spaces.find(page_id.space()) == recv_spaces.end()) {
		/* The tablespace is neither predefined nor present in
		recv_spaces. */
		if (recv_sys.recovered_lsn < recv_sys.mlog_checkpoint_lsn) {
			/* We have not seen all records between the
			checkpoint and MLOG_CHECKPOINT. There should be
			a MLOG_FILE_DELETE for this tablespace later. */
			recv_spaces.insert(
				std::make_pair(page_id.space(),
					       file_name_t("", false)));
			goto parse_log;
		}

		ib::error() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE"
			" for redo log record " << type << page_id << " at "
			    << recv_sys.recovered_lsn << ".";
		recv_sys.found_corrupt_log = true;
		return(NULL);
	} else {
parse_log:
		/* Parsing a page log record. */
		page = NULL;
		page_zip = NULL;
		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
	}

	/* Start of the record body; the MLOG_4BYTES handling below
	re-reads the offset from here after ptr has been advanced. */
	const byte*	old_ptr = ptr;

	switch (type) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
		/* The LSN is checked in recv_parse_log_rec(). */
		break;
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
	case MLOG_MEMSET:
#ifdef UNIV_DEBUG
		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
		    && end_ptr >= ptr + 2) {
			/* It is OK to set FIL_PAGE_TYPE and certain
			list node fields on an empty page.  Any other
			write is not OK. */

			/* NOTE: There may be bogus assertion failures for
			dict_hdr_create(), trx_rseg_header_create(),
			trx_sys_create_doublewrite_buf(), and
			trx_sysf_create().
			These are only called during database creation. */
			ulint	offs = mach_read_from_2(ptr);

			switch (type) {
			default:
				ut_error;
			case MLOG_2BYTES:
				/* Note that this can fail when the
				redo log has been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(offs == FIL_PAGE_TYPE
				      || srv_is_undo_tablespace(
					      page_id.space())
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			case MLOG_4BYTES:
				/* Note that this can fail when the
				redo log has been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(0
				      /* fil_crypt_rotate_page() writes this */
				      || offs == FIL_PAGE_SPACE_ID
				      || srv_is_undo_tablespace(
					      page_id.space())
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_SPACE
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER/* flst_init */
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			}
		}
#endif /* UNIV_DEBUG */
		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
		/* A 4-byte write that was applied to page 0 of a
		tablespace may have changed a header field that is also
		cached in fil_space_t; copy the new value there. */
		if (ptr != NULL && page != NULL
		    && page_id.page_no() == 0 && type == MLOG_4BYTES) {
			ulint	offs = mach_read_from_2(old_ptr);
			switch (offs) {
				/* Declared ahead of the first label so
				that the cases below can share them. */
				fil_space_t*	space;
				ulint		val;
			default:
				break;
			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
			case FSP_HEADER_OFFSET + FSP_SIZE:
			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
				space = fil_space_get(page_id.space());
				ut_a(space != NULL);
				/* Read back the value that was just
				written to the page. */
				val = mach_read_from_4(page + offs);

				switch (offs) {
				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
					space->flags = val;
					break;
				case FSP_HEADER_OFFSET + FSP_SIZE:
					space->size_in_header = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
					space->free_limit = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
					space->free_len = val;
					ut_ad(val == flst_get_len(
						      page + offs));
					break;
				}
			}
		}
		break;
	/* In the index record cases below, mlog_parse_index() runs
	first and the type-specific parser is invoked only if it
	returned non-NULL. */
	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_INSERT,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_del_mark_set_clust_rec(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_REC_SEC_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
							 page, page_zip);
		break;
	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
							    page_zip, index);
		}
		break;
	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_DELETE
				     || type == MLOG_COMP_LIST_START_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
							 block, index, mtr);
		}
		break;
	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_COPY_CREATED,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_copy_rec_list_to_created_page(
				ptr, end_ptr, block, index, mtr);
		}
		break;
	case MLOG_PAGE_REORGANIZE:
	case MLOG_COMP_PAGE_REORGANIZE:
	case MLOG_ZIP_PAGE_REORGANIZE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type != MLOG_PAGE_REORGANIZE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_parse_page_reorganize(
				ptr, end_ptr, index,
				type == MLOG_ZIP_PAGE_REORGANIZE,
				block, mtr);
		}
		break;
	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
		/* Allow anything in page_type when creating a page. */
		/* ptr is left unchanged by this case and the next one. */
		ut_a(!page_zip);
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
		break;
	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
				  true);
		break;
	case MLOG_UNDO_INSERT:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_ERASE_END:
		/* ptr is left unchanged by this case. */
		if (page) {
			ut_ad(page_type == FIL_PAGE_UNDO_LOG);
			trx_undo_erase_page_end(page);
		}
		break;
	case MLOG_UNDO_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = trx_undo_parse_page_init(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_REUSE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header_reuse(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_CREATE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header(ptr, end_ptr, page, mtr);
		break;
	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
		will be followed by MLOG_COMP_REC_DELETE
		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
		in the same mini-transaction. */
		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
		ptr = btr_parse_set_min_rec_mark(
			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
			page, mtr);
		break;
	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_IBUF_BITMAP_INIT:
		/* Allow anything in page_type when creating a page. */
		/* ptr is left unchanged by this case and the next two. */
		if (block) ibuf_bitmap_init_apply(block);
		break;
	case MLOG_INIT_FILE_PAGE2:
		/* Allow anything in page_type when creating a page. */
		if (block) fsp_apply_init_file_page(block);
		break;
	case MLOG_INIT_FREE_PAGE:
		/* The page can be zero-filled and its previous
		contents can be ignored. We do not write or apply
		this record yet. */
		break;
	case MLOG_WRITE_STRING:
		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
		break;
	case MLOG_ZIP_WRITE_NODE_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_BLOB_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_HEADER:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_header(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_ZIP_PAGE_COMPRESS:
		/* Allow anything in page_type when creating a page. */
		ptr = page_zip_parse_compress(ptr, end_ptr, block);
		break;
	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
		if (NULL != (ptr = mlog_parse_index(
				ptr, end_ptr, TRUE, &index))) {

			ut_a(!page || ((ibool)!!page_is_comp(page)
				== dict_table_is_comp(index->table)));
			ptr = page_zip_parse_compress_no_data(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_ZIP_WRITE_TRX_ID:
		/* This must be a clustered index leaf page. */
		ut_ad(!page || page_type == FIL_PAGE_INDEX);
		ptr = page_zip_parse_write_trx_id(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_FILE_WRITE_CRYPT_DATA:
		dberr_t err;
		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, &err));

		/* A failure is flagged as corruption; ptr is still
		whatever the parser returned. */
		if (err != DB_SUCCESS) {
			recv_sys.found_corrupt_log = TRUE;
		}
		break;
	default:
		/* Unknown record type: report it and flag the log as
		corrupted. */
		ptr = NULL;
		ib::error() << "Incorrect log record type "
			<< ib::hex(unsigned(type));

		recv_sys.found_corrupt_log = true;
	}

	/* Release the index that mlog_parse_index() stored in "index",
	together with its table. */
	if (index) {
		dict_table_t*	table = index->table;

		dict_mem_index_free(index);
		dict_mem_table_free(table);
	}

	return(ptr);
}
1750 
1751 /*********************************************************************//**
1752 Calculates the fold value of a page file address: used in inserting or
1753 searching for a log record in the hash table.
1754 @return folded value */
1755 UNIV_INLINE
1756 ulint
recv_fold(ulint space,ulint page_no)1757 recv_fold(
1758 /*======*/
1759 	ulint	space,	/*!< in: space */
1760 	ulint	page_no)/*!< in: page number */
1761 {
1762 	return(ut_fold_ulint_pair(space, page_no));
1763 }
1764 
1765 /*********************************************************************//**
1766 Calculates the hash value of a page file address: used in inserting or
1767 searching for a log record in the hash table.
1768 @return folded value */
1769 UNIV_INLINE
1770 ulint
recv_hash(ulint space,ulint page_no)1771 recv_hash(
1772 /*======*/
1773 	ulint	space,	/*!< in: space */
1774 	ulint	page_no)/*!< in: page number */
1775 {
1776 	return(hash_calc_hash(recv_fold(space, page_no), recv_sys.addr_hash));
1777 }
1778 
1779 /*********************************************************************//**
1780 Gets the hashed file address struct for a page.
1781 @return file address struct, NULL if not found from the hash table */
1782 static
1783 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)1784 recv_get_fil_addr_struct(
1785 /*=====================*/
1786 	ulint	space,	/*!< in: space id */
1787 	ulint	page_no)/*!< in: page number */
1788 {
1789 	ut_ad(mutex_own(&recv_sys.mutex));
1790 
1791 	recv_addr_t*	recv_addr;
1792 
1793 	for (recv_addr = static_cast<recv_addr_t*>(
1794 			HASH_GET_FIRST(recv_sys.addr_hash,
1795 				       recv_hash(space, page_no)));
1796 	     recv_addr != 0;
1797 	     recv_addr = static_cast<recv_addr_t*>(
1798 		     HASH_GET_NEXT(addr_hash, recv_addr))) {
1799 
1800 		if (recv_addr->space == space
1801 		    && recv_addr->page_no == page_no) {
1802 
1803 			return(recv_addr);
1804 		}
1805 	}
1806 
1807 	return(NULL);
1808 }
1809 
1810 /** Store a redo log record for applying.
1811 @param type	record type
1812 @param space	tablespace identifier
1813 @param page_no	page number
1814 @param body	record body
1815 @param rec_end	end of record
1816 @param lsn	start LSN of the mini-transaction
1817 @param end_lsn	end LSN of the mini-transaction */
add(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t lsn,lsn_t end_lsn)1818 inline void recv_sys_t::add(mlog_id_t type, ulint space, ulint page_no,
1819 			    byte* body, byte* rec_end, lsn_t lsn,
1820 			    lsn_t end_lsn)
1821 {
1822 	ut_ad(type != MLOG_FILE_DELETE);
1823 	ut_ad(type != MLOG_FILE_CREATE2);
1824 	ut_ad(type != MLOG_FILE_RENAME2);
1825 	ut_ad(type != MLOG_FILE_NAME);
1826 	ut_ad(type != MLOG_DUMMY_RECORD);
1827 	ut_ad(type != MLOG_CHECKPOINT);
1828 	ut_ad(type != MLOG_INDEX_LOAD);
1829 	ut_ad(type != MLOG_TRUNCATE);
1830 
1831 	recv_t* recv= static_cast<recv_t*>(mem_heap_alloc(heap, sizeof *recv));
1832 
1833 	recv->type = type;
1834 	recv->len = ulint(rec_end - body);
1835 	recv->start_lsn = lsn;
1836 	recv->end_lsn = end_lsn;
1837 
1838 	recv_addr_t* recv_addr = recv_get_fil_addr_struct(space, page_no);
1839 
1840 	if (recv_addr == NULL) {
1841 		recv_addr = static_cast<recv_addr_t*>(
1842 			mem_heap_alloc(heap, sizeof(recv_addr_t)));
1843 
1844 		recv_addr->space = space;
1845 		recv_addr->page_no = page_no;
1846 		recv_addr->state = RECV_NOT_PROCESSED;
1847 
1848 		UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
1849 
1850 		HASH_INSERT(recv_addr_t, addr_hash, addr_hash,
1851 			    recv_fold(space, page_no), recv_addr);
1852 		n_addrs++;
1853 	}
1854 
1855 	switch (type) {
1856 	case MLOG_INIT_FILE_PAGE2:
1857 	case MLOG_ZIP_PAGE_COMPRESS:
1858 	case MLOG_INIT_FREE_PAGE:
1859 		/* Ignore any earlier redo log records for this page. */
1860 		ut_ad(recv_addr->state == RECV_NOT_PROCESSED
1861 		      || recv_addr->state == RECV_WILL_NOT_READ);
1862 		recv_addr->state = RECV_WILL_NOT_READ;
1863 		mlog_init.add(space, page_no, lsn);
1864 	default:
1865 		break;
1866 	}
1867 
1868 	UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
1869 
1870 	recv_data_t** prev_field = &recv->data;
1871 
1872 	/* Store the log record body in chunks of less than srv_page_size:
1873 	heap grows into the buffer pool, and bigger chunks could not
1874 	be allocated */
1875 
1876 	while (rec_end > body) {
1877 		ulint rec_len = ulint(rec_end - body);
1878 
1879 		if (rec_len > RECV_DATA_BLOCK_SIZE) {
1880 			rec_len = RECV_DATA_BLOCK_SIZE;
1881 		}
1882 
1883 		recv_data_t* recv_data = static_cast<recv_data_t*>(
1884 			mem_heap_alloc(heap, sizeof(recv_data_t) + rec_len));
1885 
1886 		*prev_field = recv_data;
1887 
1888 		memcpy(recv_data + 1, body, rec_len);
1889 
1890 		prev_field = &recv_data->next;
1891 
1892 		body += rec_len;
1893 	}
1894 
1895 	*prev_field = NULL;
1896 }
1897 
1898 /*********************************************************************//**
1899 Copies the log record body from recv to buf. */
1900 static
1901 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)1902 recv_data_copy_to_buf(
1903 /*==================*/
1904 	byte*	buf,	/*!< in: buffer of length at least recv->len */
1905 	recv_t*	recv)	/*!< in: log record */
1906 {
1907 	recv_data_t*	recv_data;
1908 	ulint		part_len;
1909 	ulint		len;
1910 
1911 	len = recv->len;
1912 	recv_data = recv->data;
1913 
1914 	while (len > 0) {
1915 		if (len > RECV_DATA_BLOCK_SIZE) {
1916 			part_len = RECV_DATA_BLOCK_SIZE;
1917 		} else {
1918 			part_len = len;
1919 		}
1920 
1921 		ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
1922 			  part_len);
1923 		buf += part_len;
1924 		len -= part_len;
1925 
1926 		recv_data = recv_data->next;
1927 	}
1928 }
1929 
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.

The caller must hold recv_sys.mutex. The mutex is released while the
records are applied and is re-acquired before returning. The
mini-transaction is committed before returning, after discarding its
modifications. On return, recv_addr->state is RECV_PROCESSED and
recv_sys.n_addrs has been decremented.
@param[in,out]	block		buffer pool page
@param[in,out]	mtr		mini-transaction
@param[in,out]	recv_addr	recovery address
@param[in,out]	init		page initialization operation, or NULL */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
			      recv_addr_t* recv_addr,
			      mlog_init_t::init* init = NULL)
{
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(mutex_own(&recv_sys.mutex));
	ut_ad(recv_sys.apply_log_recs);
	ut_ad(recv_needed_recovery);
	ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
	ut_ad(recv_addr->state != RECV_PROCESSED);
	ut_ad(!init || init->created);
	ut_ad(!init || init->lsn);

	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
		fprintf(stderr, "Applying log to page %u:%u\n",
			recv_addr->space, recv_addr->page_no);
	}

	DBUG_LOG("ib_log", "Applying log to page " << block->page.id);

	/* Mark the address as being processed before releasing the mutex;
	the callers visible in this file skip addresses in this state. */
	recv_addr->state = RECV_BEING_PROCESSED;
	mutex_exit(&recv_sys.mutex);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

	/* The page may have been modified in the buffer pool.
	FIL_PAGE_LSN would only be updated right before flushing. */
	lsn_t page_lsn = buf_page_get_newest_modification(&block->page);
	if (!page_lsn) {
		page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
	}

	/* free_page: whether an MLOG_INIT_FREE_PAGE record was applied.
	start_lsn: start LSN of the first applied record that is not
	MLOG_INIT_FREE_PAGE, or 0 if there was none.
	end_lsn: end LSN associated with the last visited record. */
	bool free_page = false;
	lsn_t start_lsn = 0, end_lsn = 0;
	const lsn_t init_lsn = init ? init->lsn : 0;

	for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
	     recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
		ut_ad(recv->start_lsn);
		end_lsn = recv->end_lsn;
		ut_ad(end_lsn <= log_sys.log.scanned_lsn);

		if (recv->start_lsn < page_lsn) {
			/* Ignore this record, because there are later changes
			for this page. */
			DBUG_LOG("ib_log", "apply skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << page_lsn);
		} else if (recv->start_lsn < init_lsn) {
			/* Ignore this record, because it precedes the page
			initialization operation that was passed in. */
			DBUG_LOG("ib_log", "init skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << init_lsn);
		} else {
			if (recv->type == MLOG_INIT_FREE_PAGE) {
				/* This does not really modify the page. */
				free_page = true;
			} else if (!start_lsn) {
				start_lsn = recv->start_lsn;
			}

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr, "apply " LSN_PF ":"
					" %d len " ULINTPF " page %u:%u\n",
					recv->start_lsn, recv->type, recv->len,
					recv_addr->space, recv_addr->page_no);
			}

			DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
				 << get_mlog_string(recv->type)
				 << " len " << recv->len
				 << " page " << block->page.id);

			byte* buf;

			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				/* We have to copy the record body to
				a separate buffer */
				buf = static_cast<byte*>
					(ut_malloc_nokey(recv->len));
				recv_data_copy_to_buf(buf, recv);
			} else {
				/* The body fits in the first chunk, right
				after the chunk header; use it in place. */
				buf = reinterpret_cast<byte*>(recv->data)
					+ sizeof *recv->data;
			}

			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				block->page.id, true, block, &mtr);

			/* Stamp the LSN into the page header and trailer
			(and into the compressed page, if there is one).
			NOTE(review): this replaces the recv->end_lsn value
			assigned at the top of the iteration with
			start_lsn + len; presumably intentional - confirm. */
			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
						end_lsn);
			}

			/* Free the temporary copy made above. */
			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				ut_free(buf);
			}
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_ad(!fil_page_index_page_check(page)
	      || !page_zip
	      || page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */

	if (start_lsn) {
		/* At least one record other than MLOG_INIT_FREE_PAGE
		was applied: note the modification for flushing. */
		log_flush_order_mutex_enter();
		buf_flush_note_modification(block, start_lsn, end_lsn, NULL);
		log_flush_order_mutex_exit();
	} else if (free_page && init) {
		/* There have been no operations other than
		MLOG_INIT_FREE_PAGE.
		Any buffered changes must not be merged. A subsequent
		buf_page_create() from a user thread should discard
		any buffered changes. */
		init->created = false;
		ut_ad(!mtr.has_modifications());
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();
	mtr.commit();

	/* Read the clock before re-acquiring the mutex; the value is only
	used for the progress report below. */
	time_t now = time(NULL);

	mutex_enter(&recv_sys.mutex);

	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
	recv_addr->state = RECV_PROCESSED;

	/* One page fewer to recover; report progress if recv_sys.report()
	says so and pages remain. */
	ut_a(recv_sys.n_addrs > 0);
	if (ulint n = --recv_sys.n_addrs) {
		if (recv_sys.report(now)) {
			ib::info() << "To recover: " << n << " pages from log";
			service_manager_extend_timeout(
				INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
		}
	}
}
2092 
2093 /** Reduces recv_sys.n_addrs for the corrupted page.
2094 This function should called when srv_force_recovery > 0.
2095 @param[in]	page_id	page id of the corrupted page */
recv_recover_corrupt_page(page_id_t page_id)2096 void recv_recover_corrupt_page(page_id_t page_id)
2097 {
2098 	ut_ad(srv_force_recovery);
2099 	mutex_enter(&recv_sys.mutex);
2100 
2101 	if (!recv_sys.apply_log_recs) {
2102 	} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2103 			   page_id.space(), page_id.page_no())) {
2104 		switch (recv_addr->state) {
2105 		case RECV_WILL_NOT_READ:
2106 			ut_ad(!"wrong state");
2107 			break;
2108 		case RECV_BEING_PROCESSED:
2109 		case RECV_PROCESSED:
2110 			break;
2111 		default:
2112 			recv_addr->state = RECV_PROCESSED;
2113 			ut_ad(recv_sys.n_addrs);
2114 			recv_sys.n_addrs--;
2115 		}
2116 	}
2117 
2118 	mutex_exit(&recv_sys.mutex);
2119 }
2120 
2121 /** Apply any buffered redo log to a page that was just read from a data file.
2122 @param[in,out]	bpage	buffer pool page */
recv_recover_page(buf_page_t * bpage)2123 void recv_recover_page(buf_page_t* bpage)
2124 {
2125 	mtr_t mtr;
2126 	mtr.start();
2127 	mtr.set_log_mode(MTR_LOG_NONE);
2128 
2129 	ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
2130 	buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
2131 
2132 	/* Move the ownership of the x-latch on the page to
2133 	this OS thread, so that we can acquire a second
2134 	x-latch on it.  This is needed for the operations to
2135 	the page to pass the debug checks. */
2136 	rw_lock_x_lock_move_ownership(&block->lock);
2137 	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2138 	ibool	success = buf_page_get_known_nowait(
2139 		RW_X_LATCH, block, BUF_KEEP_OLD,
2140 		__FILE__, __LINE__, &mtr);
2141 	ut_a(success);
2142 
2143 	mutex_enter(&recv_sys.mutex);
2144 	if (!recv_sys.apply_log_recs) {
2145 	} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2146 			   bpage->id.space(), bpage->id.page_no())) {
2147 		switch (recv_addr->state) {
2148 		case RECV_BEING_PROCESSED:
2149 		case RECV_PROCESSED:
2150 			break;
2151 		default:
2152 			recv_recover_page(block, mtr, recv_addr);
2153 			goto func_exit;
2154 		}
2155 	}
2156 
2157 	mtr.commit();
2158 func_exit:
2159 	mutex_exit(&recv_sys.mutex);
2160 	ut_ad(mtr.has_committed());
2161 }
2162 
2163 /** Reads in pages which have hashed log records, from an area around a given
2164 page number.
2165 @param[in]	page_id	page id */
recv_read_in_area(const page_id_t page_id)2166 static void recv_read_in_area(const page_id_t page_id)
2167 {
2168 	ulint	page_nos[RECV_READ_AHEAD_AREA];
2169 	ulint	page_no = page_id.page_no()
2170 		- (page_id.page_no() % RECV_READ_AHEAD_AREA);
2171 	ulint*	p = page_nos;
2172 
2173 	for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
2174 	     page_no < up_limit; page_no++) {
2175 		recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2176 			page_id.space(), page_no);
2177 		if (recv_addr
2178 		    && recv_addr->state == RECV_NOT_PROCESSED
2179 		    && !buf_page_peek(page_id_t(page_id.space(), page_no))) {
2180 			recv_addr->state = RECV_BEING_READ;
2181 			*p++ = page_no;
2182 		}
2183 	}
2184 
2185 	mutex_exit(&recv_sys.mutex);
2186 	buf_read_recv_pages(FALSE, page_id.space(), page_nos,
2187 			    ulint(p - page_nos));
2188 	mutex_enter(&recv_sys.mutex);
2189 }
2190 
2191 /** This is another low level function for the recovery system
2192 to create a page which has buffered page intialization redo log records.
2193 @param[in]	page_id		page to be created using redo logs
2194 @param[in,out]	recv_addr	Hashed redo logs for the given page id
2195 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id,recv_addr_t * recv_addr)2196 static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id,
2197                                                   recv_addr_t* recv_addr)
2198 {
2199   mtr_t mtr;
2200   mlog_init_t::init &i= mlog_init.last(page_id);
2201   const lsn_t end_lsn= UT_LIST_GET_LAST(recv_addr->rec_list)->end_lsn;
2202 
2203   if (end_lsn < i.lsn)
2204   {
2205     DBUG_LOG("ib_log", "skip log for page "
2206              << page_id
2207              << " LSN " << end_lsn
2208              << " < " << i.lsn);
2209     recv_addr->state= RECV_PROCESSED;
2210 ignore:
2211     ut_a(recv_sys.n_addrs);
2212     recv_sys.n_addrs--;
2213     return NULL;
2214   }
2215 
2216   fil_space_t *space= fil_space_acquire_for_io(recv_addr->space);
2217   if (!space)
2218   {
2219     recv_addr->state= RECV_PROCESSED;
2220     goto ignore;
2221   }
2222 
2223   if (space->enable_lsn)
2224   {
2225 init_fail:
2226     space->release_for_io();
2227     recv_addr->state= RECV_NOT_PROCESSED;
2228     return NULL;
2229   }
2230 
2231   /* Determine if a tablespace could be for an internal table
2232   for FULLTEXT INDEX. For those tables, no MLOG_INDEX_LOAD record
2233   used to be written when redo logging was disabled. Hence, we
2234   cannot optimize away page reads, because all the redo
2235   log records for initializing and modifying the page in the
2236   past could be older than the page in the data file.
2237 
2238   The check is too broad, causing all
2239   tables whose names start with FTS_ to skip the optimization. */
2240 
2241   if (strstr(space->name, "/FTS_"))
2242     goto init_fail;
2243 
2244   mtr.start();
2245   mtr.set_log_mode(MTR_LOG_NONE);
2246   buf_block_t *block= buf_page_create(page_id, space->zip_size(), &mtr);
2247   if (recv_addr->state == RECV_PROCESSED)
2248     /* The page happened to exist in the buffer pool, or it was
2249     just being read in. Before buf_page_get_with_no_latch() returned,
2250     all changes must have been applied to the page already. */
2251     mtr.commit();
2252   else
2253   {
2254     i.created= true;
2255     buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2256     recv_recover_page(block, mtr, recv_addr, &i);
2257     ut_ad(mtr.has_committed());
2258   }
2259 
2260   space->release_for_io();
2261   return block;
2262 }
2263 
2264 /** This is a low level function for the recovery system
2265 to create a page which has buffered intialized redo log records.
2266 @param[in]      page_id page to be created using redo logs
2267 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id)2268 buf_block_t* recv_recovery_create_page_low(const page_id_t page_id)
2269 {
2270   buf_block_t* block= nullptr;
2271   mutex_enter(&recv_sys.mutex);
2272   recv_addr_t* recv_addr= recv_get_fil_addr_struct(page_id.space(),
2273                                                    page_id.page_no());
2274   if (recv_addr && recv_addr->state == RECV_WILL_NOT_READ)
2275     block= recv_recovery_create_page_low(page_id, recv_addr);
2276   mutex_exit(&recv_sys.mutex);
2277   return block;
2278 }
2279 
/** Apply the hash table of stored log records to persistent data pages.

The function first waits until no other batch is running, then visits
every address in recv_sys.addr_hash, then waits until recv_sys.n_addrs
has dropped to 0 and no buffer pool reads are pending. It returns early
(without clearing recv_sys.apply_batch_on if it was set by this call)
when corruption has been flagged.
@param[in]	last_batch	whether the change buffer merge will be
				performed as part of the operation */
void recv_apply_hashed_log_recs(bool last_batch)
{
	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || is_mariabackup_restore_or_export());

	mutex_enter(&recv_sys.mutex);

	/* Wait for a batch that is already in progress to finish,
	polling every 0.5 seconds. Give up if the log was found corrupt. */
	while (recv_sys.apply_batch_on) {
		bool abort = recv_sys.found_corrupt_log;
		mutex_exit(&recv_sys.mutex);

		if (abort) {
			return;
		}

		os_thread_sleep(500000);
		mutex_enter(&recv_sys.mutex);
	}

	/* The log mutex is expected to be held by the caller exactly
	when this is not the last batch. */
	ut_ad(!last_batch == log_mutex_own());

	recv_no_ibuf_operations
		= !last_batch || is_mariabackup_restore_or_export();

	if (ulint n = recv_sys.n_addrs) {
		if (!log_sys.log.subformat && !srv_force_recovery
		    && srv_undo_tablespaces_open) {
			ib::error() << "Recovery of separately logged"
				" TRUNCATE operations is no longer supported."
				" Set innodb_force_recovery=1"
				" if no *trunc.log files exist";
			recv_sys.found_corrupt_log = true;
			mutex_exit(&recv_sys.mutex);
			return;
		}

		const char* msg = last_batch
			? "Starting final batch to recover "
			: "Starting a batch to recover ";
		ib::info() << msg << n << " pages from redo log.";
		sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
			   msg, n);
	}
	recv_sys.apply_log_recs = true;
	recv_sys.apply_batch_on = true;

	/* Trim the buffered records of undo tablespaces for which a
	truncation LSN has been recorded. */
	for (ulint id = srv_undo_tablespaces_open; id--; ) {
		recv_sys_t::trunc& t = recv_sys.truncated_undo_spaces[id];
		if (t.lsn) {
			recv_addr_trim(id + srv_undo_space_id_start, t.pages,
				       t.lsn);
		}
	}

	mtr_t mtr;

	/* Visit every page address that has buffered log records. */
	for (ulint i = 0; i < hash_get_n_cells(recv_sys.addr_hash); i++) {
		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
			     HASH_GET_FIRST(recv_sys.addr_hash, i));
		     recv_addr;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {
			if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
				/* Nothing to apply for this address:
				just stop counting it. */
ignore:
				ut_a(recv_sys.n_addrs);
				recv_sys.n_addrs--;
				continue;
			}

			switch (recv_addr->state) {
			case RECV_BEING_READ:
			case RECV_BEING_PROCESSED:
			case RECV_PROCESSED:
				/* Already handled, or being handled
				elsewhere. */
				continue;
			case RECV_DISCARDED:
				goto ignore;
			case RECV_NOT_PROCESSED:
			case RECV_WILL_NOT_READ:
				break;
			}

			const page_id_t page_id(recv_addr->space,
						recv_addr->page_no);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
apply:
				/* Apply the records right away if the page
				is in the buffer pool; otherwise request a
				read of it and its neighbours. */
				mtr.start();
				mtr.set_log_mode(MTR_LOG_NONE);
				if (buf_block_t* block = buf_page_get_low(
					    page_id, 0, RW_X_LATCH, NULL,
					    BUF_GET_IF_IN_POOL,
					    __FILE__, __LINE__, &mtr, NULL)) {
					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);
					recv_recover_page(block, mtr,
							  recv_addr);
					ut_ad(mtr.has_committed());
				} else {
					mtr.commit();
					recv_read_in_area(page_id);
				}
			} else if (!recv_recovery_create_page_low(
					page_id, recv_addr)) {
				/* RECV_WILL_NOT_READ: the page could not be
				created from the log records alone.
				NOTE(review): NULL is returned both when the
				state was reset to RECV_NOT_PROCESSED and
				when the records were skipped (state
				RECV_PROCESSED, n_addrs already decremented).
				Both cases reach "apply"; confirm that this
				is intended for the latter. */
				goto apply;
			}
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys.n_addrs || buf_get_n_pending_read_ios()) {
		const bool abort = recv_sys.found_corrupt_log
			|| recv_sys.found_corrupt_fs;

		if (recv_sys.found_corrupt_fs && !srv_force_recovery) {
			ib::info() << "Set innodb_force_recovery=1"
				" to ignore corrupted pages.";
		}

		mutex_exit(&(recv_sys.mutex));

		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys.mutex));
	}

	if (!last_batch) {
		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		mutex_exit(&(recv_sys.mutex));
		log_mutex_exit();

		/* Stop the recv_writer thread from issuing any LRU
		flush batches. */
		mutex_enter(&recv_sys.writer_mutex);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		/* Request a BUF_FLUSH_LIST flush via the flush_start event
		and wait for the flush_end event. */
		os_event_reset(recv_sys.flush_end);
		recv_sys.flush_type = BUF_FLUSH_LIST;
		os_event_set(recv_sys.flush_start);
		os_event_wait(recv_sys.flush_end);

		buf_pool_invalidate();

		/* Allow batches from recv_writer thread. */
		mutex_exit(&recv_sys.writer_mutex);

		/* Re-acquire the mutexes in the order in which they were
		held on entry: log mutex first, then recv_sys.mutex. */
		log_mutex_enter();
		mutex_enter(&(recv_sys.mutex));
		mlog_init.reset();
	} else if (!recv_no_ibuf_operations) {
		/* We skipped this in buf_page_create(). */
		mlog_init.ibuf_merge(mtr);
	}

	recv_sys.apply_log_recs = false;
	recv_sys.apply_batch_on = false;

	recv_sys.empty();

	mutex_exit(&recv_sys.mutex);
}
2452 
2453 /** Parse the redo log to set the space recovery size and flags
2454 @param[in]	ptr	pointer to parsing redo buffer
2455 @param[in]	end_ptr	end of the parsing redo buffer
2456 @param[in]	space	tablespace id */
2457 static
recv_parse_set_size_and_flags(const byte * ptr,byte * end_ptr,ulint space)2458 void recv_parse_set_size_and_flags(const byte *ptr, byte *end_ptr,
2459                                    ulint space)
2460 {
2461   switch (const uint16_t offset= mach_read_from_2(ptr))
2462   {
2463   default:
2464     break;
2465   case FSP_HEADER_OFFSET + FSP_SIZE:
2466   case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
2467     ptr += 2;
2468     ulint val= mach_parse_compressed(&ptr, end_ptr);
2469     recv_spaces_t::iterator it= recv_spaces.find(space);
2470 
2471     ut_ad(!recv_sys.mlog_checkpoint_lsn || space == TRX_SYS_SPACE ||
2472           srv_is_undo_tablespace(space) || it != recv_spaces.end());
2473 
2474     if (offset == FSP_HEADER_OFFSET + FSP_SIZE)
2475       fil_space_set_recv_size_and_flags(
2476          space, val, FSP_FLAGS_FCRC32_MASK_MARKER);
2477     else
2478       fil_space_set_recv_size_and_flags(
2479          space, 0, static_cast<uint32_t>(val));
2480 
2481     if (it == recv_spaces.end() || it->second.space)
2482       return;
2483 
2484     if (offset == FSP_HEADER_OFFSET + FSP_SIZE)
2485       it->second.size= val;
2486     else
2487       it->second.flags= static_cast<uint32_t>(val);
2488   }
2489 }
2490 
/** Tries to parse a single log record.
@param[out]	type		log record type
@param[in]	ptr		pointer to a buffer
@param[in]	end_ptr		end of the buffer
@param[out]	space		tablespace identifier
@param[out]	page_no		page number
@param[in]	apply		whether to apply MLOG_FILE_* records
@param[out]	body		start of log record body
@return length of the record, or 0 if the record was not complete;
0 is also returned after setting recv_sys.found_corrupt_log
for an invalid record type byte */
static
ulint
recv_parse_log_rec(
	mlog_id_t*	type,
	byte*		ptr,
	byte*		end_ptr,
	ulint*		space,
	ulint*		page_no,
	bool		apply,
	byte**		body)
{
	byte*	new_ptr;

	*body = NULL;

	/* Declare the output parameters undefined for memory checkers,
	so that reading them before they are assigned can be detected. */
	MEM_UNDEFINED(type, sizeof *type);
	MEM_UNDEFINED(space, sizeof *space);
	MEM_UNDEFINED(page_no, sizeof *page_no);
	MEM_UNDEFINED(body, sizeof *body);

	if (ptr == end_ptr) {

		return(0);
	}

	/* Records that carry no tablespace id or page number
	are handled first. */
	switch (*ptr) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
	case MLOG_LSN:
		new_ptr = mlog_parse_initial_log_record(
			ptr, end_ptr, type, space, page_no);
		if (new_ptr != NULL) {
			/* The space id and page number fields carry
			the upper and lower 32 bits of the LSN. */
			const lsn_t	lsn = static_cast<lsn_t>(
				*space) << 32 | *page_no;
			ut_a(lsn == recv_sys.recovered_lsn);
		}

		*type = MLOG_LSN;
		/* NOTE(review): new_ptr may be NULL here (it is checked
		just above), in which case this difference is not a
		valid record length - confirm. */
		return(new_ptr - ptr);
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_MULTI_REC_END:
	case MLOG_DUMMY_RECORD:
		/* These records consist of the type byte only. */
		*type = static_cast<mlog_id_t>(*ptr);
		return(1);
	case MLOG_CHECKPOINT:
		if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
			return(0);
		}
		*type = static_cast<mlog_id_t>(*ptr);
		return(SIZE_OF_MLOG_CHECKPOINT);
	case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
	case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
	case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
		/* These types must not be combined with
		MLOG_SINGLE_REC_FLAG. */
		ib::error() << "Incorrect log record type "
			<< ib::hex(unsigned(*ptr));
		recv_sys.found_corrupt_log = true;
		return(0);
	}

	/* Parse the type, tablespace id and page number. */
	new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
						page_no);
	*body = new_ptr;

	if (UNIV_UNLIKELY(!new_ptr)) {

		return(0);
	}

	/* Parse the record body; no block or mini-transaction is passed. */
	const byte*	old_ptr = new_ptr;
	new_ptr = recv_parse_or_apply_log_rec_body(
		*type, new_ptr, end_ptr, page_id_t(*space, *page_no), apply,
		NULL, NULL);

	if (UNIV_UNLIKELY(new_ptr == NULL)) {
		return(0);
	}

	/* A 4-byte write to page 0 may update the tablespace size
	or flags in the tablespace header. */
	if (*page_no == 0 && *type == MLOG_4BYTES && apply) {
		recv_parse_set_size_and_flags(old_ptr, end_ptr, *space);
	}

	return ulint(new_ptr - ptr);
}
2583 
2584 /*******************************************************//**
2585 Calculates the new value for lsn when more data is added to the log. */
2586 static
2587 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)2588 recv_calc_lsn_on_data_add(
2589 /*======================*/
2590 	lsn_t		lsn,	/*!< in: old lsn */
2591 	ib_uint64_t	len)	/*!< in: this many bytes of data is
2592 				added, log block headers not included */
2593 {
2594 	unsigned frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
2595 	unsigned payload_size = log_sys.payload_size();
2596 	ut_ad(frag_len < payload_size);
2597 	lsn_t lsn_len = len;
2598 	lsn_len += (lsn_len + frag_len) / payload_size
2599 		* (OS_FILE_LOG_BLOCK_SIZE - payload_size);
2600 
2601 	return(lsn + lsn_len);
2602 }
2603 
2604 /** Prints diagnostic info of corrupt log.
2605 @param[in]	ptr	pointer to corrupt log record
2606 @param[in]	type	type of the log record (could be garbage)
2607 @param[in]	space	tablespace ID (could be garbage)
2608 @param[in]	page_no	page number (could be garbage)
2609 @return whether processing should continue */
2610 ATTRIBUTE_COLD
2611 static
2612 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)2613 recv_report_corrupt_log(
2614 	const byte*	ptr,
2615 	int		type,
2616 	ulint		space,
2617 	ulint		page_no)
2618 {
2619 	ib::error() <<
2620 		"############### CORRUPT LOG RECORD FOUND ##################";
2621 
2622 	const ulint ptr_offset = ulint(ptr - recv_sys.buf);
2623 
2624 	ib::info() << "Log record type " << type << ", page " << space << ":"
2625 		<< page_no << ". Log parsing proceeded successfully up to "
2626 		<< recv_sys.recovered_lsn << ". Previous log record type "
2627 		<< recv_previous_parsed_rec_type << ", is multi "
2628 		<< recv_previous_parsed_rec_is_multi << " Recv offset "
2629 		<< ptr_offset << ", prev "
2630 		<< recv_previous_parsed_rec_offset;
2631 
2632 	ut_ad(ptr <= recv_sys.buf + recv_sys.len);
2633 
2634 	const ulint	limit	= 100;
2635 	const ulint	prev_offset = std::min(recv_previous_parsed_rec_offset,
2636 					       ptr_offset);
2637 	const ulint	before = std::min(prev_offset, limit);
2638 	const ulint	after = std::min(recv_sys.len - ptr_offset, limit);
2639 
2640 	ib::info() << "Hex dump starting " << before << " bytes before and"
2641 		" ending " << after << " bytes after the corrupted record:";
2642 
2643 	const byte* start = recv_sys.buf + prev_offset - before;
2644 
2645 	ut_print_buf(stderr, start, ulint(ptr - start) + after);
2646 	putc('\n', stderr);
2647 
2648 	if (!srv_force_recovery) {
2649 		ib::info() << "Set innodb_force_recovery to ignore this error.";
2650 		return(false);
2651 	}
2652 
2653 	ib::warn() << "The log file may have been corrupt and it is possible"
2654 		" that the log scan did not proceed far enough in recovery!"
2655 		" Please run CHECK TABLE on your InnoDB tables to check"
2656 		" that they are ok! If mysqld crashes after this recovery; "
2657 		<< FORCE_RECOVERY_MSG;
2658 	return(true);
2659 }
2660 
2661 /** Report a MLOG_INDEX_LOAD operation.
2662 @param[in]	space_id	tablespace id
2663 @param[in]	page_no		page number
2664 @param[in]	lsn		log sequence number */
2665 ATTRIBUTE_COLD static void
recv_mlog_index_load(ulint space_id,ulint page_no,lsn_t lsn)2666 recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
2667 {
2668 	recv_spaces_t::iterator it = recv_spaces.find(space_id);
2669 	if (it != recv_spaces.end()) {
2670 		it->second.mlog_index_load(lsn);
2671 	}
2672 
2673 	if (log_optimized_ddl_op) {
2674 		log_optimized_ddl_op(space_id);
2675 	}
2676 }
2677 
2678 /** Check whether read redo log memory exceeds the available memory
2679 of buffer pool. Store last_stored_lsn if it is not in last phase
2680 @param[in]	store		whether to store page operations
2681 @param[in]	available_mem	Available memory in buffer pool to
2682 				read redo logs. */
recv_sys_heap_check(store_t * store,ulint available_mem)2683 static bool recv_sys_heap_check(store_t* store, ulint available_mem)
2684 {
2685   if (*store != STORE_NO && mem_heap_get_size(recv_sys.heap) >= available_mem)
2686   {
2687     if (*store == STORE_YES)
2688       recv_sys.last_stored_lsn= recv_sys.recovered_lsn;
2689 
2690     *store= STORE_NO;
2691     DBUG_PRINT("ib_log",("Ran out of memory and last "
2692 			 "stored lsn " LSN_PF " last stored offset "
2693 			 ULINTPF "\n",
2694 			 recv_sys.recovered_lsn, recv_sys.recovered_offset));
2695     return true;
2696   }
2697 
2698   return false;
2699 }
2700 
2701 /** Parse log records from a buffer and optionally store them to a
2702 hash table to wait merging to file pages.
2703 @param[in]	checkpoint_lsn		the LSN of the latest checkpoint
2704 @param[in]	store			whether to store page operations
2705 @param[in]	available_mem		memory to read the redo logs
2706 @param[in]	apply			whether to apply the records
2707 @return whether MLOG_CHECKPOINT record was seen the first time,
2708 or corruption was noticed */
recv_parse_log_recs(lsn_t checkpoint_lsn,store_t * store,ulint available_mem,bool apply)2709 bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
2710 			 ulint available_mem, bool apply)
2711 {
2712 	byte*		ptr;
2713 	byte*		end_ptr;
2714 	bool		single_rec;
2715 	ulint		len;
2716 	lsn_t		new_recovered_lsn;
2717 	lsn_t		old_lsn;
2718 	mlog_id_t	type;
2719 	ulint		space;
2720 	ulint		page_no;
2721 	byte*		body;
2722 	const bool	last_phase = (*store == STORE_IF_EXISTS);
2723 
2724 	ut_ad(log_mutex_own());
2725 	ut_ad(mutex_own(&recv_sys.mutex));
2726 	ut_ad(recv_sys.parse_start_lsn != 0);
2727 loop:
2728 	ptr = recv_sys.buf + recv_sys.recovered_offset;
2729 
2730 	end_ptr = recv_sys.buf + recv_sys.len;
2731 
2732 	if (ptr == end_ptr) {
2733 
2734 		return(false);
2735 	}
2736 
2737 	/* Check for memory overflow and ignore the parsing of remaining
2738 	redo log records if InnoDB ran out of memory */
2739 	if (recv_sys_heap_check(store, available_mem) && last_phase) {
2740 		return false;
2741 	}
2742 
2743 	switch (*ptr) {
2744 	case MLOG_CHECKPOINT:
2745 #ifdef UNIV_LOG_LSN_DEBUG
2746 	case MLOG_LSN:
2747 #endif /* UNIV_LOG_LSN_DEBUG */
2748 	case MLOG_DUMMY_RECORD:
2749 		single_rec = true;
2750 		break;
2751 	default:
2752 		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
2753 	}
2754 
2755 	if (single_rec) {
2756 		/* The mtr did not modify multiple pages */
2757 
2758 		old_lsn = recv_sys.recovered_lsn;
2759 
2760 		/* Try to parse a log record, fetching its type, space id,
2761 		page no, and a pointer to the body of the log record */
2762 
2763 		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
2764 					 &page_no, apply, &body);
2765 
2766 		if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)) {
2767 			recv_report_corrupt_log(ptr, type, space, page_no);
2768 			return(true);
2769 		}
2770 
2771 		if (UNIV_UNLIKELY(recv_sys.found_corrupt_fs)) {
2772 			return(true);
2773 		}
2774 
2775 		if (len == 0) {
2776 			return(false);
2777 		}
2778 
2779 		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
2780 
2781 		if (new_recovered_lsn > recv_sys.scanned_lsn) {
2782 			/* The log record filled a log block, and we require
2783 			that also the next log block should have been scanned
2784 			in */
2785 
2786 			return(false);
2787 		}
2788 
2789 		recv_previous_parsed_rec_type = type;
2790 		recv_previous_parsed_rec_offset = recv_sys.recovered_offset;
2791 		recv_previous_parsed_rec_is_multi = 0;
2792 
2793 		recv_sys.recovered_offset += len;
2794 		recv_sys.recovered_lsn = new_recovered_lsn;
2795 
2796 		switch (type) {
2797 			lsn_t	lsn;
2798 		case MLOG_DUMMY_RECORD:
2799 			/* Do nothing */
2800 			break;
2801 		case MLOG_CHECKPOINT:
2802 			compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
2803 			lsn = mach_read_from_8(ptr + 1);
2804 
2805 			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
2806 				fprintf(stderr,
2807 					"MLOG_CHECKPOINT(" LSN_PF ") %s at "
2808 					LSN_PF "\n", lsn,
2809 					lsn != checkpoint_lsn ? "ignored"
2810 					: recv_sys.mlog_checkpoint_lsn
2811 					? "reread" : "read",
2812 					recv_sys.recovered_lsn);
2813 			}
2814 
2815 			DBUG_PRINT("ib_log",
2816 				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
2817 				    LSN_PF,
2818 				    lsn,
2819 				    lsn != checkpoint_lsn ? "ignored"
2820 				    : recv_sys.mlog_checkpoint_lsn
2821 				    ? "reread" : "read",
2822 				    recv_sys.recovered_lsn));
2823 
2824 			if (lsn == checkpoint_lsn) {
2825 				if (recv_sys.mlog_checkpoint_lsn) {
2826 					/* There can be multiple
2827 					MLOG_CHECKPOINT lsn for the
2828 					same checkpoint. */
2829 					break;
2830 				}
2831 				recv_sys.mlog_checkpoint_lsn
2832 					= recv_sys.recovered_lsn;
2833 				return(true);
2834 			}
2835 			break;
2836 #ifdef UNIV_LOG_LSN_DEBUG
2837 		case MLOG_LSN:
2838 			/* Do not add these records to the hash table.
2839 			The page number and space id fields are misused
2840 			for something else. */
2841 			break;
2842 #endif /* UNIV_LOG_LSN_DEBUG */
2843 		default:
2844 			switch (*store) {
2845 			case STORE_NO:
2846 				break;
2847 			case STORE_IF_EXISTS:
2848 				if (fil_space_get_flags(space)
2849 				    == ULINT_UNDEFINED) {
2850 					break;
2851 				}
2852 				/* fall through */
2853 			case STORE_YES:
2854 				recv_sys.add(
2855 					type, space, page_no, body,
2856 					ptr + len, old_lsn,
2857 					recv_sys.recovered_lsn);
2858 			}
2859 			/* fall through */
2860 		case MLOG_INDEX_LOAD:
2861 			if (type == MLOG_INDEX_LOAD) {
2862 				recv_mlog_index_load(space, page_no, old_lsn);
2863 			}
2864 			/* fall through */
2865 		case MLOG_FILE_NAME:
2866 		case MLOG_FILE_DELETE:
2867 		case MLOG_FILE_CREATE2:
2868 		case MLOG_FILE_RENAME2:
2869 		case MLOG_TRUNCATE:
2870 			/* These were already handled by
2871 			recv_parse_log_rec() and
2872 			recv_parse_or_apply_log_rec_body(). */
2873 			DBUG_PRINT("ib_log",
2874 				("scan " LSN_PF ": log rec %s"
2875 				" len " ULINTPF
2876 				" page " ULINTPF ":" ULINTPF,
2877 				old_lsn, get_mlog_string(type),
2878 				len, space, page_no));
2879 		}
2880 	} else {
2881 		/* Check that all the records associated with the single mtr
2882 		are included within the buffer */
2883 
2884 		ulint	total_len	= 0;
2885 		ulint	n_recs		= 0;
2886 		bool	only_mlog_file	= true;
2887 		ulint	mlog_rec_len	= 0;
2888 
2889 		for (;;) {
2890 			len = recv_parse_log_rec(
2891 				&type, ptr, end_ptr, &space, &page_no,
2892 				false, &body);
2893 
2894 			if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)) {
2895 corrupted_log:
2896 				recv_report_corrupt_log(
2897 					ptr, type, space, page_no);
2898 				return(true);
2899 			}
2900 
2901 			if (ptr == end_ptr) {
2902 			} else if (type == MLOG_CHECKPOINT
2903 				   || (*ptr & MLOG_SINGLE_REC_FLAG)) {
2904 				recv_sys.found_corrupt_log = true;
2905 				goto corrupted_log;
2906 			}
2907 
2908 			if (recv_sys.found_corrupt_fs) {
2909 				return(true);
2910 			}
2911 
2912 			if (len == 0) {
2913 				return(false);
2914 			}
2915 
2916 			recv_previous_parsed_rec_type = type;
2917 			recv_previous_parsed_rec_offset
2918 				= recv_sys.recovered_offset + total_len;
2919 			recv_previous_parsed_rec_is_multi = 1;
2920 
			/* MLOG_FILE_NAME redo log records do not make changes
			to persistent data. If only MLOG_FILE_NAME redo log
			records exist, then reset the parsing buffer pointer
			by changing recovered_lsn and recovered_offset. */
2925 			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
2926 				only_mlog_file = false;
2927 			}
2928 
2929 			if (only_mlog_file) {
2930 				new_recovered_lsn = recv_calc_lsn_on_data_add(
2931 					recv_sys.recovered_lsn, len);
2932 				mlog_rec_len += len;
2933 				recv_sys.recovered_offset += len;
2934 				recv_sys.recovered_lsn = new_recovered_lsn;
2935 			}
2936 
2937 			total_len += len;
2938 			n_recs++;
2939 
2940 			ptr += len;
2941 
2942 			if (type == MLOG_MULTI_REC_END) {
2943 				DBUG_PRINT("ib_log",
2944 					   ("scan " LSN_PF
2945 					    ": multi-log end"
2946 					    " total_len " ULINTPF
2947 					    " n=" ULINTPF,
2948 					    recv_sys.recovered_lsn,
2949 					    total_len, n_recs));
2950 				total_len -= mlog_rec_len;
2951 				break;
2952 			}
2953 
2954 			DBUG_PRINT("ib_log",
2955 				   ("scan " LSN_PF ": multi-log rec %s"
2956 				    " len " ULINTPF
2957 				    " page " ULINTPF ":" ULINTPF,
2958 				    recv_sys.recovered_lsn,
2959 				    get_mlog_string(type), len, space, page_no));
2960 		}
2961 
2962 		new_recovered_lsn = recv_calc_lsn_on_data_add(
2963 			recv_sys.recovered_lsn, total_len);
2964 
2965 		if (new_recovered_lsn > recv_sys.scanned_lsn) {
2966 			/* The log record filled a log block, and we require
2967 			that also the next log block should have been scanned
2968 			in */
2969 
2970 			return(false);
2971 		}
2972 
2973 		/* Add all the records to the hash table */
2974 
2975 		ptr = recv_sys.buf + recv_sys.recovered_offset;
2976 
2977 		for (;;) {
2978 			old_lsn = recv_sys.recovered_lsn;
2979 			/* This will apply MLOG_FILE_ records. We
2980 			had to skip them in the first scan, because we
2981 			did not know if the mini-transaction was
2982 			completely recovered (until MLOG_MULTI_REC_END). */
2983 			len = recv_parse_log_rec(
2984 				&type, ptr, end_ptr, &space, &page_no,
2985 				apply, &body);
2986 
2987 			if (UNIV_UNLIKELY(recv_sys.found_corrupt_log)
2988 			    && !recv_report_corrupt_log(
2989 				    ptr, type, space, page_no)) {
2990 				return(true);
2991 			}
2992 
2993 			if (UNIV_UNLIKELY(recv_sys.found_corrupt_fs)) {
2994 				return(true);
2995 			}
2996 
2997 			ut_a(len != 0);
2998 			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));
2999 
3000 			recv_sys.recovered_offset += len;
3001 			recv_sys.recovered_lsn
3002 				= recv_calc_lsn_on_data_add(old_lsn, len);
3003 
3004 			switch (type) {
3005 			case MLOG_MULTI_REC_END:
3006 				/* Found the end mark for the records */
3007 				goto loop;
3008 #ifdef UNIV_LOG_LSN_DEBUG
3009 			case MLOG_LSN:
3010 				/* Do not add these records to the hash table.
3011 				The page number and space id fields are misused
3012 				for something else. */
3013 				break;
3014 #endif /* UNIV_LOG_LSN_DEBUG */
3015 			case MLOG_INDEX_LOAD:
3016 				recv_mlog_index_load(space, page_no, old_lsn);
3017 				break;
3018 			case MLOG_FILE_NAME:
3019 			case MLOG_FILE_DELETE:
3020 			case MLOG_FILE_CREATE2:
3021 			case MLOG_FILE_RENAME2:
3022 			case MLOG_TRUNCATE:
3023 				/* These were already handled by
3024 				recv_parse_log_rec() and
3025 				recv_parse_or_apply_log_rec_body(). */
3026 				break;
3027 			default:
3028 				switch (*store) {
3029 				case STORE_NO:
3030 					break;
3031 				case STORE_IF_EXISTS:
3032 					if (fil_space_get_flags(space)
3033 					    == ULINT_UNDEFINED) {
3034 						break;
3035 					}
3036 					/* fall through */
3037 				case STORE_YES:
3038 					recv_sys.add(
3039 						type, space, page_no,
3040 						body, ptr + len,
3041 						old_lsn,
3042 						new_recovered_lsn);
3043 				}
3044 			}
3045 
3046 			ptr += len;
3047 		}
3048 	}
3049 
3050 	goto loop;
3051 }
3052 
3053 /** Adds data from a new log block to the parsing buffer of recv_sys if
3054 recv_sys.parse_start_lsn is non-zero.
3055 @param[in]	log_block	log block to add
3056 @param[in]	scanned_lsn	lsn of how far we were able to find
3057 				data in this log block
3058 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3059 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
3060 {
3061 	ulint	more_len;
3062 	ulint	data_len;
3063 	ulint	start_offset;
3064 	ulint	end_offset;
3065 
3066 	ut_ad(scanned_lsn >= recv_sys.scanned_lsn);
3067 
3068 	if (!recv_sys.parse_start_lsn) {
3069 		/* Cannot start parsing yet because no start point for
3070 		it found */
3071 		return(false);
3072 	}
3073 
3074 	data_len = log_block_get_data_len(log_block);
3075 
3076 	if (recv_sys.parse_start_lsn >= scanned_lsn) {
3077 
3078 		return(false);
3079 
3080 	} else if (recv_sys.scanned_lsn >= scanned_lsn) {
3081 
3082 		return(false);
3083 
3084 	} else if (recv_sys.parse_start_lsn > recv_sys.scanned_lsn) {
3085 		more_len = (ulint) (scanned_lsn - recv_sys.parse_start_lsn);
3086 	} else {
3087 		more_len = (ulint) (scanned_lsn - recv_sys.scanned_lsn);
3088 	}
3089 
3090 	if (more_len == 0) {
3091 		return(false);
3092 	}
3093 
3094 	ut_ad(data_len >= more_len);
3095 
3096 	start_offset = data_len - more_len;
3097 
3098 	if (start_offset < LOG_BLOCK_HDR_SIZE) {
3099 		start_offset = LOG_BLOCK_HDR_SIZE;
3100 	}
3101 
3102 	end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
3103 
3104 	ut_ad(start_offset <= end_offset);
3105 
3106 	if (start_offset < end_offset) {
3107 		ut_memcpy(recv_sys.buf + recv_sys.len,
3108 			  log_block + start_offset, end_offset - start_offset);
3109 
3110 		recv_sys.len += end_offset - start_offset;
3111 
3112 		ut_a(recv_sys.len <= RECV_PARSING_BUF_SIZE);
3113 	}
3114 
3115 	return(true);
3116 }
3117 
3118 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()3119 void recv_sys_justify_left_parsing_buf()
3120 {
3121 	memmove(recv_sys.buf, recv_sys.buf + recv_sys.recovered_offset,
3122 		recv_sys.len - recv_sys.recovered_offset);
3123 
3124 	recv_sys.len -= recv_sys.recovered_offset;
3125 
3126 	recv_sys.recovered_offset = 0;
3127 }
3128 
3129 /** Scan redo log from a buffer and stores new log data to the parsing buffer.
3130 Parse and hash the log records if new data found.
3131 Apply log records automatically when the hash table becomes full.
3132 @param[in]	available_mem		we let the hash table of recs to
3133 					grow to this size, at the maximum
3134 @param[in,out]	store_to_hash		whether the records should be
3135 					stored to the hash table; this is
3136 					reset if just debug checking is
3137 					needed, or when the available_mem
3138 					runs out
3139 @param[in]	log_block		log segment
3140 @param[in]	checkpoint_lsn		latest checkpoint LSN
3141 @param[in]	start_lsn		buffer start LSN
3142 @param[in]	end_lsn			buffer end LSN
3143 @param[in,out]	contiguous_lsn		it is known that all groups contain
3144 					contiguous log data upto this lsn
3145 @param[out]	group_scanned_lsn	scanning succeeded upto this lsn
3146 @return true if not able to scan any more in this log group */
recv_scan_log_recs(ulint available_mem,store_t * store_to_hash,const byte * log_block,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t end_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)3147 static bool recv_scan_log_recs(
3148 	ulint		available_mem,
3149 	store_t*	store_to_hash,
3150 	const byte*	log_block,
3151 	lsn_t		checkpoint_lsn,
3152 	lsn_t		start_lsn,
3153 	lsn_t		end_lsn,
3154 	lsn_t*		contiguous_lsn,
3155 	lsn_t*		group_scanned_lsn)
3156 {
3157 	lsn_t		scanned_lsn	= start_lsn;
3158 	bool		finished	= false;
3159 	ulint		data_len;
3160 	bool		more_data	= false;
3161 	bool		apply		= recv_sys.mlog_checkpoint_lsn != 0;
3162 	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
3163 	const bool	last_phase = (*store_to_hash == STORE_IF_EXISTS);
3164 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3165 	ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3166 	ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
3167 
3168 	const byte* const	log_end = log_block
3169 		+ ulint(end_lsn - start_lsn);
3170 	do {
3171 		ut_ad(!finished);
3172 
3173 		if (log_block_get_flush_bit(log_block)) {
3174 			/* This block was a start of a log flush operation:
3175 			we know that the previous flush operation must have
3176 			been completed for all log groups before this block
3177 			can have been flushed to any of the groups. Therefore,
3178 			we know that log data is contiguous up to scanned_lsn
3179 			in all non-corrupt log groups. */
3180 
3181 			if (scanned_lsn > *contiguous_lsn) {
3182 				*contiguous_lsn = scanned_lsn;
3183 			}
3184 		}
3185 
3186 		data_len = log_block_get_data_len(log_block);
3187 
3188 		if (scanned_lsn + data_len > recv_sys.scanned_lsn
3189 		    && log_block_get_checkpoint_no(log_block)
3190 		    < recv_sys.scanned_checkpoint_no
3191 		    && (recv_sys.scanned_checkpoint_no
3192 			- log_block_get_checkpoint_no(log_block)
3193 			> 0x80000000UL)) {
3194 
3195 			/* Garbage from a log buffer flush which was made
3196 			before the most recent database recovery */
3197 			finished = true;
3198 			break;
3199 		}
3200 
3201 		if (!recv_sys.parse_start_lsn
3202 		    && (log_block_get_first_rec_group(log_block) > 0)) {
3203 
3204 			/* We found a point from which to start the parsing
3205 			of log records */
3206 
3207 			recv_sys.parse_start_lsn = scanned_lsn
3208 				+ log_block_get_first_rec_group(log_block);
3209 			recv_sys.scanned_lsn = recv_sys.parse_start_lsn;
3210 			recv_sys.recovered_lsn = recv_sys.parse_start_lsn;
3211 		}
3212 
3213 		scanned_lsn += data_len;
3214 
3215 		if (data_len == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
3216 		    && scanned_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3217 		    && log_block[LOG_BLOCK_HDR_SIZE] == MLOG_CHECKPOINT
3218 		    && checkpoint_lsn == mach_read_from_8(LOG_BLOCK_HDR_SIZE
3219 							  + 1 + log_block)) {
3220 			/* The redo log is logically empty. */
3221 			ut_ad(recv_sys.mlog_checkpoint_lsn == 0
3222 			      || recv_sys.mlog_checkpoint_lsn
3223 			      == checkpoint_lsn);
3224 			recv_sys.mlog_checkpoint_lsn = checkpoint_lsn;
3225 			DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
3226 					      scanned_lsn));
3227 			finished = true;
3228 			break;
3229 		}
3230 
3231 		if (scanned_lsn > recv_sys.scanned_lsn) {
3232 			ut_ad(!srv_log_files_created);
3233 			if (!recv_needed_recovery) {
3234 				recv_needed_recovery = true;
3235 
3236 				if (srv_read_only_mode) {
3237 					ib::warn() << "innodb_read_only"
3238 						" prevents crash recovery";
3239 					return(true);
3240 				}
3241 
3242 				ib::info() << "Starting crash recovery from"
3243 					" checkpoint LSN="
3244 					<< recv_sys.scanned_lsn;
3245 			}
3246 
3247 			/* We were able to find more log data: add it to the
3248 			parsing buffer if parse_start_lsn is already
3249 			non-zero */
3250 
3251 			DBUG_EXECUTE_IF(
3252 				"reduce_recv_parsing_buf",
3253 				recv_parsing_buf_size
3254 					= (70 * 1024);
3255 				);
3256 
3257 			if (recv_sys.len + 4 * OS_FILE_LOG_BLOCK_SIZE
3258 			    >= recv_parsing_buf_size) {
3259 				ib::error() << "Log parsing buffer overflow."
3260 					" Recovery may have failed!";
3261 
3262 				recv_sys.found_corrupt_log = true;
3263 
3264 				if (!srv_force_recovery) {
3265 					ib::error()
3266 						<< "Set innodb_force_recovery"
3267 						" to ignore this error.";
3268 					return(true);
3269 				}
3270 			} else if (!recv_sys.found_corrupt_log) {
3271 				more_data = recv_sys_add_to_parsing_buf(
3272 					log_block, scanned_lsn);
3273 			}
3274 
3275 			recv_sys.scanned_lsn = scanned_lsn;
3276 			recv_sys.scanned_checkpoint_no
3277 				= log_block_get_checkpoint_no(log_block);
3278 		}
3279 
3280 		/* During last phase of scanning, there can be redo logs
3281 		left in recv_sys.buf to parse & store it in recv_sys.heap */
3282 		if (last_phase
3283 		    && recv_sys.recovered_lsn < recv_sys.scanned_lsn) {
3284 			more_data = true;
3285 		}
3286 
3287 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3288 			/* Log data for this group ends here */
3289 			finished = true;
3290 			break;
3291 		} else {
3292 			log_block += OS_FILE_LOG_BLOCK_SIZE;
3293 		}
3294 	} while (log_block < log_end);
3295 
3296 	*group_scanned_lsn = scanned_lsn;
3297 
3298 	mutex_enter(&recv_sys.mutex);
3299 
3300 	if (more_data && !recv_sys.found_corrupt_log) {
3301 		/* Try to parse more log records */
3302 
3303 		if (recv_parse_log_recs(checkpoint_lsn,
3304 					store_to_hash, available_mem,
3305 					apply)) {
3306 			ut_ad(recv_sys.found_corrupt_log
3307 			      || recv_sys.found_corrupt_fs
3308 			      || recv_sys.mlog_checkpoint_lsn
3309 			      == recv_sys.recovered_lsn);
3310 			finished = true;
3311 			goto func_exit;
3312 		}
3313 
3314 		recv_sys_heap_check(store_to_hash, available_mem);
3315 
3316 		if (recv_sys.recovered_offset > recv_parsing_buf_size / 4) {
3317 			/* Move parsing buffer data to the buffer start */
3318 			recv_sys_justify_left_parsing_buf();
3319 		}
3320 
3321 		/* Need to re-parse the redo log which're stored
3322 		in recv_sys.buf */
3323 		if (last_phase && *store_to_hash == STORE_NO) {
3324 			finished = false;
3325 		}
3326 	}
3327 
3328 func_exit:
3329 	mutex_exit(&recv_sys.mutex);
3330 	return(finished);
3331 }
3332 
3333 /** Scans log from a buffer and stores new log data to the parsing buffer.
3334 Parses and hashes the log records if new data found.
3335 @param[in]	checkpoint_lsn		latest checkpoint log sequence number
3336 @param[in,out]	contiguous_lsn		log sequence number
3337 until which all redo log has been scanned
3338 @param[in]	last_phase		whether changes
3339 can be applied to the tablespaces
3340 @return whether rescan is needed (not everything was stored) */
3341 static
3342 bool
recv_group_scan_log_recs(lsn_t checkpoint_lsn,lsn_t * contiguous_lsn,bool last_phase)3343 recv_group_scan_log_recs(
3344 	lsn_t		checkpoint_lsn,
3345 	lsn_t*		contiguous_lsn,
3346 	bool		last_phase)
3347 {
3348 	DBUG_ENTER("recv_group_scan_log_recs");
3349 	DBUG_ASSERT(!last_phase || recv_sys.mlog_checkpoint_lsn > 0);
3350 
3351 	mutex_enter(&recv_sys.mutex);
3352 	recv_sys.len = 0;
3353 	recv_sys.recovered_offset = 0;
3354 	recv_sys.n_addrs = 0;
3355 	recv_sys.empty();
3356 	srv_start_lsn = *contiguous_lsn;
3357 	recv_sys.parse_start_lsn = *contiguous_lsn;
3358 	recv_sys.scanned_lsn = *contiguous_lsn;
3359 	recv_sys.recovered_lsn = *contiguous_lsn;
3360 	recv_sys.scanned_checkpoint_no = 0;
3361 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3362 	recv_previous_parsed_rec_offset	= 0;
3363 	recv_previous_parsed_rec_is_multi = 0;
3364 	ut_ad(recv_max_page_lsn == 0);
3365 	ut_ad(last_phase || !recv_writer_thread_active);
3366 	mutex_exit(&recv_sys.mutex);
3367 
3368 	lsn_t	start_lsn;
3369 	lsn_t	end_lsn;
3370 	store_t	store_to_hash	= recv_sys.mlog_checkpoint_lsn == 0
3371 		? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
3372 	ulint	available_mem = (buf_pool_get_n_pages() * 2 / 3)
3373 		<< srv_page_size_shift;
3374 
3375 	log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
3376 		ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3377 
3378 	do {
3379 		if (last_phase && store_to_hash == STORE_NO) {
3380 			store_to_hash = STORE_IF_EXISTS;
3381 			/* We must not allow change buffer
3382 			merge here, because it would generate
3383 			redo log records before we have
3384 			finished the redo log scan. */
3385 			recv_apply_hashed_log_recs(false);
3386 			/* Rescan the redo logs from last stored lsn */
3387 			end_lsn = recv_sys.recovered_lsn;
3388 		}
3389 
3390 		start_lsn = ut_uint64_align_down(end_lsn,
3391 						 OS_FILE_LOG_BLOCK_SIZE);
3392 		end_lsn = start_lsn;
3393 		log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
3394 	} while (end_lsn != start_lsn
3395 		 && !recv_scan_log_recs(
3396 			 available_mem, &store_to_hash, log_sys.buf,
3397 			 checkpoint_lsn,
3398 			 start_lsn, end_lsn,
3399 			 contiguous_lsn, &log_sys.log.scanned_lsn));
3400 
3401 	if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
3402 		DBUG_RETURN(false);
3403 	}
3404 
3405 	DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
3406 			      last_phase ? "rescan" : "scan",
3407 			      log_sys.log.scanned_lsn));
3408 
3409 	DBUG_RETURN(store_to_hash == STORE_NO);
3410 }
3411 
3412 /** Report a missing tablespace for which page-redo log exists.
3413 @param[in]	err	previous error code
3414 @param[in]	i	tablespace descriptor
3415 @return new error code */
3416 static
3417 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3418 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3419 {
3420 	if (is_mariabackup_restore_or_export()) {
3421 		if (i->second.name.find(TEMP_TABLE_PATH_PREFIX)
3422 		    != std::string::npos) {
3423 			ib::warn() << "Tablespace " << i->first << " was not"
3424 				" found at " << i->second.name << " when"
3425 				" restoring a (partial?) backup. All redo log"
3426 				" for this file will be ignored!";
3427 		}
3428 		return(err);
3429 	}
3430 
3431 	if (srv_force_recovery == 0) {
3432 		ib::error() << "Tablespace " << i->first << " was not"
3433 			" found at " << i->second.name << ".";
3434 
3435 		if (err == DB_SUCCESS) {
3436 			ib::error() << "Set innodb_force_recovery=1 to"
3437 				" ignore this and to permanently lose"
3438 				" all changes to the tablespace.";
3439 			err = DB_TABLESPACE_NOT_FOUND;
3440 		}
3441 	} else {
3442 		ib::warn() << "Tablespace " << i->first << " was not"
3443 			" found at " << i->second.name << ", and"
3444 			" innodb_force_recovery was set. All redo log"
3445 			" for this tablespace will be ignored!";
3446 	}
3447 
3448 	return(err);
3449 }
3450 
3451 /** Report the missing tablespace and discard the redo logs for the deleted
3452 tablespace.
3453 @param[in]	rescan			rescan of redo logs is needed
3454 					if hash table ran out of memory
3455 @param[out]	missing_tablespace	missing tablespace exists or not
3456 @return error code or DB_SUCCESS. */
3457 static MY_ATTRIBUTE((warn_unused_result))
3458 dberr_t
recv_validate_tablespace(bool rescan,bool & missing_tablespace)3459 recv_validate_tablespace(bool rescan, bool& missing_tablespace)
3460 {
3461 	dberr_t err = DB_SUCCESS;
3462 
3463 	for (ulint h = 0; h < hash_get_n_cells(recv_sys.addr_hash); h++) {
3464 		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
3465 			     HASH_GET_FIRST(recv_sys.addr_hash, h));
3466 		     recv_addr != 0;
3467 		     recv_addr = static_cast<recv_addr_t*>(
3468 			     HASH_GET_NEXT(addr_hash, recv_addr))) {
3469 
3470 			const ulint space = recv_addr->space;
3471 
3472 			if (is_predefined_tablespace(space)) {
3473 				continue;
3474 			}
3475 
3476 			recv_spaces_t::iterator i = recv_spaces.find(space);
3477 			ut_ad(i != recv_spaces.end());
3478 
3479 			switch (i->second.status) {
3480 			case file_name_t::MISSING:
3481 				err = recv_init_missing_space(err, i);
3482 				i->second.status = file_name_t::DELETED;
3483 				/* fall through */
3484 			case file_name_t::DELETED:
3485 				recv_addr->state = RECV_DISCARDED;
3486 				/* fall through */
3487 			case file_name_t::NORMAL:
3488 				continue;
3489 			}
3490 			ut_ad(0);
3491 		}
3492 	}
3493 
3494 	if (err != DB_SUCCESS) {
3495 		return(err);
3496 	}
3497 
3498 	/* When rescan is not needed, recv_sys.addr_hash will contain the
3499 	entire redo log. If rescan is needed or innodb_force_recovery
3500 	is set, we can ignore missing tablespaces. */
3501 	for (const recv_spaces_t::value_type& rs : recv_spaces) {
3502 		if (UNIV_LIKELY(rs.second.status != file_name_t::MISSING)) {
3503 			continue;
3504 		}
3505 
3506 		missing_tablespace = true;
3507 
3508 		if (srv_force_recovery > 0) {
3509 			ib::warn() << "Tablespace " << rs.first
3510 				<<" was not found at " << rs.second.name
3511 				<<", and innodb_force_recovery was set."
3512 				<<" All redo log for this tablespace"
3513 				<<" will be ignored!";
3514 			continue;
3515 		}
3516 
3517 		if (!rescan) {
3518 			ib::info() << "Tablespace " << rs.first
3519 				<< " was not found at '"
3520 				<< rs.second.name << "', but there"
3521 				<<" were no modifications either.";
3522 		}
3523 	}
3524 
3525 	if (!rescan || srv_force_recovery > 0) {
3526 		missing_tablespace = false;
3527 	}
3528 
3529 	return DB_SUCCESS;
3530 }
3531 
3532 /** Check if all tablespaces were found for crash recovery.
3533 @param[in]	rescan			rescan of redo logs is needed
3534 @param[out]	missing_tablespace	missing table exists
3535 @return error code or DB_SUCCESS */
3536 static MY_ATTRIBUTE((warn_unused_result))
3537 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3538 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3539 {
3540 	bool		flag_deleted	= false;
3541 
3542 	ut_ad(!srv_read_only_mode);
3543 	ut_ad(recv_needed_recovery);
3544 
3545 	for (recv_spaces_t::value_type& rs : recv_spaces) {
3546 		ut_ad(!is_predefined_tablespace(rs.first));
3547 		ut_ad(rs.second.status != file_name_t::DELETED
3548 		      || !rs.second.space);
3549 
3550 		if (rs.second.status == file_name_t::DELETED) {
3551 			/* The tablespace was deleted,
3552 			so we can ignore any redo log for it. */
3553 			flag_deleted = true;
3554 		} else if (rs.second.space != NULL) {
3555 			/* The tablespace was found, and there
3556 			are some redo log records for it. */
3557 			fil_names_dirty(rs.second.space);
3558 			rs.second.space->enable_lsn = rs.second.enable_lsn;
3559 		} else if (rs.second.name == "") {
3560 			ib::error() << "Missing MLOG_FILE_NAME"
3561 				" or MLOG_FILE_DELETE"
3562 				" before MLOG_CHECKPOINT for tablespace "
3563 				<< rs.first;
3564 			recv_sys.found_corrupt_log = true;
3565 			return(DB_CORRUPTION);
3566 		} else {
3567 			rs.second.status = file_name_t::MISSING;
3568 			flag_deleted = true;
3569 		}
3570 
3571 		ut_ad(rs.second.status == file_name_t::DELETED
3572 		      || rs.second.name != "");
3573 	}
3574 
3575 	if (flag_deleted) {
3576 		return recv_validate_tablespace(rescan, missing_tablespace);
3577 	}
3578 
3579 	return DB_SUCCESS;
3580 }
3581 
/** Start recovering from a redo log checkpoint.
Finds the latest checkpoint, scans the redo log from it (first to find
MLOG_CHECKPOINT, then to parse and store the records), validates the
tablespaces if recovery is needed, and initializes the LSN and
checkpoint fields of log_sys from the result.
The log mutex is acquired in this function and released on every
return path that follows the acquisition.
@see recv_recovery_from_checkpoint_finish
@param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
{
	ulint		max_cp_field;
	lsn_t		checkpoint_lsn;
	bool		rescan;
	ib_uint64_t	checkpoint_no;
	lsn_t		contiguous_lsn;
	byte*		buf;
	dberr_t		err = DB_SUCCESS;

	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || is_mariabackup_restore_or_export());

	/* Initialize red-black tree for fast insertions into the
	flush_list during recovery process. */
	buf_flush_init_flush_rbt();

	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {

		ib::info() << "innodb_force_recovery=6 skips redo log apply";

		return(DB_SUCCESS);
	}

	recv_recovery_on = true;

	log_mutex_enter();

	err = recv_find_max_checkpoint(&max_cp_field);

	if (err != DB_SUCCESS) {

		/* No usable checkpoint: fall back to the current
		log_sys.lsn and report the error to the caller. */
		srv_start_lsn = recv_sys.recovered_lsn = log_sys.lsn;
		log_mutex_exit();
		return(err);
	}

	log_header_read(max_cp_field);

	buf = log_sys.checkpoint_buf;

	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);

	/* Start reading the log from the checkpoint lsn. The variable
	contiguous_lsn contains an lsn up to which the log is known to
	be contiguously written. */
	recv_sys.mlog_checkpoint_lsn = 0;

	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);

	const lsn_t	end_lsn = mach_read_from_8(
		buf + LOG_CHECKPOINT_END_LSN);

	ut_ad(recv_sys.n_addrs == 0);
	contiguous_lsn = checkpoint_lsn;
	switch (log_sys.log.format) {
	case 0:
		/* Format 0 is handled entirely by
		recv_log_format_0_recover().
		NOTE(review): the meaning of the byte at offset
		20 + 32 * 9 of the checkpoint buffer is not visible
		here; confirm against the format 0 layout. */
		log_mutex_exit();
		return recv_log_format_0_recover(checkpoint_lsn,
						 buf[20 + 32 * 9] == 2);
	default:
		/* A nonzero LOG_CHECKPOINT_END_LSN is used as the scan
		start if it is not before the checkpoint LSN; a value
		before the checkpoint LSN is treated as corruption. */
		if (end_lsn == 0) {
			break;
		}
		if (end_lsn >= checkpoint_lsn) {
			contiguous_lsn = end_lsn;
			break;
		}
		recv_sys.found_corrupt_log = true;
		log_mutex_exit();
		return(DB_ERROR);
	}

	/* Look for MLOG_CHECKPOINT. */
	recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
	/* The first scan should not have stored or applied any records. */
	ut_ad(recv_sys.n_addrs == 0);
	ut_ad(!recv_sys.found_corrupt_fs);

	if (srv_read_only_mode && recv_needed_recovery) {
		log_mutex_exit();
		return(DB_READ_ONLY);
	}

	if (recv_sys.found_corrupt_log && !srv_force_recovery) {
		log_mutex_exit();
		ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
		return(DB_ERROR);
	}

	if (recv_sys.mlog_checkpoint_lsn == 0) {
		/* MLOG_CHECKPOINT was not found. This is only accepted
		in read-only mode, or if the scan did not advance beyond
		the checkpoint LSN. */
		lsn_t scan_lsn = log_sys.log.scanned_lsn;
		if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
			log_mutex_exit();
			/* Note: this local shadows the dberr_t err
			declared at the start of the function. */
			ib::error err;
			err << "Missing MLOG_CHECKPOINT";
			if (end_lsn) {
				err << " at " << end_lsn;
			}
			err << " between the checkpoint " << checkpoint_lsn
			    << " and the end " << scan_lsn << ".";
			return(DB_ERROR);
		}

		log_sys.log.scanned_lsn = checkpoint_lsn;
		rescan = false;
	} else {
		/* MLOG_CHECKPOINT was found: scan again from the
		checkpoint LSN. Because recv_sys.mlog_checkpoint_lsn is
		now nonzero, recv_group_scan_log_recs() will store the
		records this time. */
		contiguous_lsn = checkpoint_lsn;
		rescan = recv_group_scan_log_recs(
			checkpoint_lsn, &contiguous_lsn, false);

		if ((recv_sys.found_corrupt_log && !srv_force_recovery)
		    || recv_sys.found_corrupt_fs) {
			log_mutex_exit();
			return(DB_ERROR);
		}
	}

	/* NOTE: we always do a 'recovery' at startup, but only if
	there is something wrong we will print a message to the
	user about recovery: */

	if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
	    && recv_sys.mlog_checkpoint_lsn == checkpoint_lsn) {
		/* The redo log is logically empty. */
	} else if (checkpoint_lsn != flush_lsn) {
		ut_ad(!srv_log_files_created);

		if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
			ib::warn() << "Are you sure you are using the"
				" right ib_logfiles to start up the database?"
				" Log sequence number in the ib_logfiles is "
				<< checkpoint_lsn << ", less than the"
				" log sequence number in the first system"
				" tablespace file header, " << flush_lsn << ".";
		}

		if (!recv_needed_recovery) {

			ib::info() << "The log sequence number " << flush_lsn
				<< " in the system tablespace does not match"
				" the log sequence number " << checkpoint_lsn
				<< " in the ib_logfiles!";

			if (srv_read_only_mode) {
				ib::error() << "innodb_read_only"
					" prevents crash recovery";
				log_mutex_exit();
				return(DB_READ_ONLY);
			}

			recv_needed_recovery = true;
		}
	}

	log_sys.lsn = recv_sys.recovered_lsn;

	if (recv_needed_recovery) {
		bool missing_tablespace = false;

		err = recv_init_crash_recovery_spaces(
			rescan, missing_tablespace);

		if (err != DB_SUCCESS) {
			log_mutex_exit();
			return(err);
		}

		/* If there is any missing tablespace and rescan is needed
		then there is a possibility that hash table will not contain
		all space ids redo logs. Rescan the remaining unstored
		redo logs for the validation of missing tablespace. */
		ut_ad(rescan || !missing_tablespace);

		while (missing_tablespace) {
			DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
					      "the missing tablespace. Scan "
					      "from last stored LSN " LSN_PF,
					      recv_sys.last_stored_lsn));

			lsn_t recent_stored_lsn = recv_sys.last_stored_lsn;
			rescan = recv_group_scan_log_recs(
				checkpoint_lsn, &recent_stored_lsn, false);

			ut_ad(!recv_sys.found_corrupt_fs);

			/* recv_validate_tablespace() sets this again if
			a missing tablespace remains and yet another
			rescan is needed. */
			missing_tablespace = false;

			err = recv_sys.found_corrupt_log
				? DB_ERROR
				: recv_validate_tablespace(
					rescan, missing_tablespace);

			if (err != DB_SUCCESS) {
				log_mutex_exit();
				return err;
			}

			/* Having entered this loop, the final rescan
			from the checkpoint (below) will be performed. */
			rescan = true;
		}

		recv_sys.parse_start_lsn = checkpoint_lsn;

		if (srv_operation == SRV_OPERATION_NORMAL) {
			buf_dblwr_process();
		}

		ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);

		/* Spawn the background thread to flush dirty pages
		from the buffer pools. */
		recv_writer_thread_active = true;
		os_thread_create(recv_writer_thread, 0, 0);

		if (rescan) {
			/* Not everything was stored in the earlier scan:
			scan once more from the checkpoint, this time as
			the last phase. */
			contiguous_lsn = checkpoint_lsn;

			recv_group_scan_log_recs(
				checkpoint_lsn, &contiguous_lsn, true);

			if ((recv_sys.found_corrupt_log
			     && !srv_force_recovery)
			    || recv_sys.found_corrupt_fs) {
				log_mutex_exit();
				return(DB_ERROR);
			}
		}
	} else {
		ut_ad(!rescan || recv_sys.n_addrs == 0);
	}

	if (log_sys.log.scanned_lsn < checkpoint_lsn
	    || log_sys.log.scanned_lsn < recv_max_page_lsn) {

		/* This is only reported; recovery continues. */
		ib::error() << "We scanned the log up to "
			<< log_sys.log.scanned_lsn
			<< ". A checkpoint was at " << checkpoint_lsn << " and"
			" the maximum LSN on a database page was "
			<< recv_max_page_lsn << ". It is possible that the"
			" database is now corrupt!";
	}

	if (recv_sys.recovered_lsn < checkpoint_lsn) {
		log_mutex_exit();

		ib::error() << "Recovered only to lsn:"
			    << recv_sys.recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;

		return(DB_ERROR);
	}

	log_sys.next_checkpoint_lsn = checkpoint_lsn;
	log_sys.next_checkpoint_no = checkpoint_no + 1;

	recv_synchronize_groups();

	if (!recv_needed_recovery) {
		ut_a(checkpoint_lsn == recv_sys.recovered_lsn);
	} else {
		srv_start_lsn = recv_sys.recovered_lsn;
	}

	/* Initialize the log buffer position and the write LSN
	from the recovered LSN (log_sys.lsn was set above). */
	log_sys.buf_free = ulong(log_sys.lsn % OS_FILE_LOG_BLOCK_SIZE);
	log_sys.buf_next_to_write = log_sys.buf_free;
	log_sys.write_lsn = log_sys.lsn;

	log_sys.last_checkpoint_lsn = checkpoint_lsn;

	if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) {
		/* Write a MLOG_CHECKPOINT marker as the first thing,
		before generating any other redo log. This ensures
		that subsequent crash recovery will be possible even
		if the server were killed soon after this. */
		fil_names_clear(log_sys.last_checkpoint_lsn, true);
	}

	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
		    log_sys.lsn - log_sys.last_checkpoint_lsn);

	/* This assigns the same value (the checkpoint number that was
	read, plus 1) as the assignment to log_sys.next_checkpoint_no
	further above; it also increments the local checkpoint_no. */
	log_sys.next_checkpoint_no = ++checkpoint_no;

	mutex_enter(&recv_sys.mutex);

	recv_sys.apply_log_recs = true;
	recv_no_ibuf_operations = is_mariabackup_restore_or_export();
	ut_d(recv_no_log_write = recv_no_ibuf_operations);

	mutex_exit(&recv_sys.mutex);

	log_mutex_exit();

	recv_lsn_checks_on = true;

	/* The database is now ready to start almost normal processing of user
	transactions: transaction rollbacks and the application of the log
	records in the hash table can be run in background. */

	return(DB_SUCCESS);
}
3888 
3889 /** Complete recovery from a checkpoint. */
3890 void
recv_recovery_from_checkpoint_finish(void)3891 recv_recovery_from_checkpoint_finish(void)
3892 {
3893 	/* Make sure that the recv_writer thread is done. This is
3894 	required because it grabs various mutexes and we want to
3895 	ensure that when we enable sync_order_checks there is no
3896 	mutex currently held by any thread. */
3897 	mutex_enter(&recv_sys.writer_mutex);
3898 
3899 	/* Free the resources of the recovery system */
3900 	recv_recovery_on = false;
3901 
3902 	/* By acquring the mutex we ensure that the recv_writer thread
3903 	won't trigger any more LRU batches. Now wait for currently
3904 	in progress batches to finish. */
3905 	buf_flush_wait_LRU_batch_end();
3906 
3907 	mutex_exit(&recv_sys.writer_mutex);
3908 
3909 	ulint count = 0;
3910 	while (recv_writer_thread_active) {
3911 		++count;
3912 		os_thread_sleep(100000);
3913 		if (srv_print_verbose_log && count > 600) {
3914 			ib::info() << "Waiting for recv_writer to"
3915 				" finish flushing of buffer pool";
3916 			count = 0;
3917 		}
3918 	}
3919 
3920 	recv_sys.debug_free();
3921 
3922 	/* Free up the flush_rbt. */
3923 	buf_flush_free_flush_rbt();
3924 }
3925 
3926 /********************************************************//**
3927 Initiates the rollback of active transactions. */
3928 void
recv_recovery_rollback_active(void)3929 recv_recovery_rollback_active(void)
3930 /*===============================*/
3931 {
3932 	ut_ad(!recv_writer_thread_active);
3933 
3934 	/* Switch latching order checks on in sync0debug.cc, if
3935 	--innodb-sync-debug=true (default) */
3936 	ut_d(sync_check_enable());
3937 
3938 	/* We can't start any (DDL) transactions if UNDO logging
3939 	has been disabled, additionally disable ROLLBACK of recovered
3940 	user transactions. */
3941 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
3942 	    && !srv_read_only_mode) {
3943 
3944 		/* Drop partially created indexes. */
3945 		row_merge_drop_temp_indexes();
3946 		/* Drop garbage tables. */
3947 		row_mysql_drop_garbage_tables();
3948 
3949 		/* Drop any auxiliary tables that were not dropped when the
3950 		parent table was dropped. This can happen if the parent table
3951 		was dropped but the server crashed before the auxiliary tables
3952 		were dropped. */
3953 		fts_drop_orphaned_tables();
3954 
3955 		/* Rollback the uncommitted transactions which have no user
3956 		session */
3957 
3958 		trx_rollback_is_active = true;
3959 		os_thread_create(trx_rollback_all_recovered, 0, 0);
3960 	}
3961 }
3962 
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)3963 bool recv_dblwr_t::validate_page(const page_id_t page_id,
3964                                  const byte *page,
3965                                  const fil_space_t *space,
3966                                  byte *tmp_buf)
3967 {
3968   if (page_id.page_no() == 0)
3969   {
3970     ulint flags= fsp_header_get_flags(page);
3971     if (!fil_space_t::is_valid_flags(flags, page_id.space()))
3972     {
3973       ulint cflags= fsp_flags_convert_from_101(flags);
3974       if (cflags == ULINT_UNDEFINED)
3975       {
3976         ib::warn() << "Ignoring a doublewrite copy of page " << page_id
3977                    << "due to invalid flags " << ib::hex(flags);
3978         return false;
3979       }
3980 
3981       flags= cflags;
3982     }
3983 
3984     /* Page 0 is never page_compressed or encrypted. */
3985     return !buf_page_is_corrupted(true, page, flags);
3986   }
3987 
3988   ut_ad(tmp_buf);
3989   byte *tmp_frame= tmp_buf;
3990   byte *tmp_page= tmp_buf + srv_page_size;
3991   const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
3992   const bool expect_encrypted= space->crypt_data &&
3993     space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
3994 
3995   if (space->full_crc32())
3996     return !buf_page_is_corrupted(true, page, space->flags);
3997 
3998   if (expect_encrypted &&
3999       mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
4000   {
4001     if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
4002       return false;
4003     if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
4004       return true;
4005     if (space->zip_size())
4006       return false;
4007     memcpy(tmp_page, page, space->physical_size());
4008     if (!fil_space_decrypt(space, tmp_frame, tmp_page))
4009       return false;
4010   }
4011 
4012   switch (page_type) {
4013   case FIL_PAGE_PAGE_COMPRESSED:
4014     memcpy(tmp_page, page, space->physical_size());
4015     /* fall through */
4016   case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
4017     if (space->zip_size())
4018       return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
4019     ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
4020     if (!decomp)
4021       return false; /* decompression failed */
4022     if (decomp == srv_page_size)
4023       return false; /* the page was not compressed (invalid page type) */
4024     return !buf_page_is_corrupted(true, tmp_page, space->flags);
4025   }
4026 
4027   return !buf_page_is_corrupted(true, page, space->flags);
4028 }
4029 
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)4030 byte *recv_dblwr_t::find_page(const page_id_t page_id,
4031                               const fil_space_t *space, byte *tmp_buf)
4032 {
4033   byte *result= NULL;
4034   lsn_t max_lsn= 0;
4035 
4036   for (byte *page : pages)
4037   {
4038     if (page_get_page_no(page) != page_id.page_no() ||
4039         page_get_space_id(page) != page_id.space())
4040       continue;
4041     const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
4042     if (lsn <= max_lsn ||
4043         !validate_page(page_id, page, space, tmp_buf))
4044     {
4045       /* Mark processed for subsequent iterations in buf_dblwr_process() */
4046       memset(page + FIL_PAGE_LSN, 0, 8);
4047       continue;
4048     }
4049     max_lsn= lsn;
4050     result= page;
4051   }
4052 
4053   return result;
4054 }
4055 
4056 #ifndef DBUG_OFF
4057 /** Return string name of the redo log record type.
4058 @param[in]	type	record log record enum
4059 @return string name of record log record */
get_mlog_string(mlog_id_t type)4060 static const char* get_mlog_string(mlog_id_t type)
4061 {
4062 	switch (type) {
4063 	case MLOG_SINGLE_REC_FLAG:
4064 		return("MLOG_SINGLE_REC_FLAG");
4065 
4066 	case MLOG_1BYTE:
4067 		return("MLOG_1BYTE");
4068 
4069 	case MLOG_2BYTES:
4070 		return("MLOG_2BYTES");
4071 
4072 	case MLOG_4BYTES:
4073 		return("MLOG_4BYTES");
4074 
4075 	case MLOG_8BYTES:
4076 		return("MLOG_8BYTES");
4077 
4078 	case MLOG_REC_INSERT:
4079 		return("MLOG_REC_INSERT");
4080 
4081 	case MLOG_REC_CLUST_DELETE_MARK:
4082 		return("MLOG_REC_CLUST_DELETE_MARK");
4083 
4084 	case MLOG_REC_SEC_DELETE_MARK:
4085 		return("MLOG_REC_SEC_DELETE_MARK");
4086 
4087 	case MLOG_REC_UPDATE_IN_PLACE:
4088 		return("MLOG_REC_UPDATE_IN_PLACE");
4089 
4090 	case MLOG_REC_DELETE:
4091 		return("MLOG_REC_DELETE");
4092 
4093 	case MLOG_LIST_END_DELETE:
4094 		return("MLOG_LIST_END_DELETE");
4095 
4096 	case MLOG_LIST_START_DELETE:
4097 		return("MLOG_LIST_START_DELETE");
4098 
4099 	case MLOG_LIST_END_COPY_CREATED:
4100 		return("MLOG_LIST_END_COPY_CREATED");
4101 
4102 	case MLOG_PAGE_REORGANIZE:
4103 		return("MLOG_PAGE_REORGANIZE");
4104 
4105 	case MLOG_PAGE_CREATE:
4106 		return("MLOG_PAGE_CREATE");
4107 
4108 	case MLOG_UNDO_INSERT:
4109 		return("MLOG_UNDO_INSERT");
4110 
4111 	case MLOG_UNDO_ERASE_END:
4112 		return("MLOG_UNDO_ERASE_END");
4113 
4114 	case MLOG_UNDO_INIT:
4115 		return("MLOG_UNDO_INIT");
4116 
4117 	case MLOG_UNDO_HDR_REUSE:
4118 		return("MLOG_UNDO_HDR_REUSE");
4119 
4120 	case MLOG_UNDO_HDR_CREATE:
4121 		return("MLOG_UNDO_HDR_CREATE");
4122 
4123 	case MLOG_REC_MIN_MARK:
4124 		return("MLOG_REC_MIN_MARK");
4125 
4126 	case MLOG_IBUF_BITMAP_INIT:
4127 		return("MLOG_IBUF_BITMAP_INIT");
4128 
4129 #ifdef UNIV_LOG_LSN_DEBUG
4130 	case MLOG_LSN:
4131 		return("MLOG_LSN");
4132 #endif /* UNIV_LOG_LSN_DEBUG */
4133 
4134 	case MLOG_WRITE_STRING:
4135 		return("MLOG_WRITE_STRING");
4136 
4137 	case MLOG_MULTI_REC_END:
4138 		return("MLOG_MULTI_REC_END");
4139 
4140 	case MLOG_DUMMY_RECORD:
4141 		return("MLOG_DUMMY_RECORD");
4142 
4143 	case MLOG_FILE_DELETE:
4144 		return("MLOG_FILE_DELETE");
4145 
4146 	case MLOG_COMP_REC_MIN_MARK:
4147 		return("MLOG_COMP_REC_MIN_MARK");
4148 
4149 	case MLOG_COMP_PAGE_CREATE:
4150 		return("MLOG_COMP_PAGE_CREATE");
4151 
4152 	case MLOG_COMP_REC_INSERT:
4153 		return("MLOG_COMP_REC_INSERT");
4154 
4155 	case MLOG_COMP_REC_CLUST_DELETE_MARK:
4156 		return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4157 
4158 	case MLOG_COMP_REC_UPDATE_IN_PLACE:
4159 		return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4160 
4161 	case MLOG_COMP_REC_DELETE:
4162 		return("MLOG_COMP_REC_DELETE");
4163 
4164 	case MLOG_COMP_LIST_END_DELETE:
4165 		return("MLOG_COMP_LIST_END_DELETE");
4166 
4167 	case MLOG_COMP_LIST_START_DELETE:
4168 		return("MLOG_COMP_LIST_START_DELETE");
4169 
4170 	case MLOG_COMP_LIST_END_COPY_CREATED:
4171 		return("MLOG_COMP_LIST_END_COPY_CREATED");
4172 
4173 	case MLOG_COMP_PAGE_REORGANIZE:
4174 		return("MLOG_COMP_PAGE_REORGANIZE");
4175 
4176 	case MLOG_FILE_CREATE2:
4177 		return("MLOG_FILE_CREATE2");
4178 
4179 	case MLOG_ZIP_WRITE_NODE_PTR:
4180 		return("MLOG_ZIP_WRITE_NODE_PTR");
4181 
4182 	case MLOG_ZIP_WRITE_BLOB_PTR:
4183 		return("MLOG_ZIP_WRITE_BLOB_PTR");
4184 
4185 	case MLOG_ZIP_WRITE_HEADER:
4186 		return("MLOG_ZIP_WRITE_HEADER");
4187 
4188 	case MLOG_ZIP_PAGE_COMPRESS:
4189 		return("MLOG_ZIP_PAGE_COMPRESS");
4190 
4191 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4192 		return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4193 
4194 	case MLOG_ZIP_PAGE_REORGANIZE:
4195 		return("MLOG_ZIP_PAGE_REORGANIZE");
4196 
4197 	case MLOG_ZIP_WRITE_TRX_ID:
4198 		return("MLOG_ZIP_WRITE_TRX_ID");
4199 
4200 	case MLOG_FILE_RENAME2:
4201 		return("MLOG_FILE_RENAME2");
4202 
4203 	case MLOG_FILE_NAME:
4204 		return("MLOG_FILE_NAME");
4205 
4206 	case MLOG_CHECKPOINT:
4207 		return("MLOG_CHECKPOINT");
4208 
4209 	case MLOG_PAGE_CREATE_RTREE:
4210 		return("MLOG_PAGE_CREATE_RTREE");
4211 
4212 	case MLOG_COMP_PAGE_CREATE_RTREE:
4213 		return("MLOG_COMP_PAGE_CREATE_RTREE");
4214 
4215 	case MLOG_INIT_FILE_PAGE2:
4216 		return("MLOG_INIT_FILE_PAGE2");
4217 
4218 	case MLOG_INDEX_LOAD:
4219 		return("MLOG_INDEX_LOAD");
4220 
4221 	case MLOG_TRUNCATE:
4222 		return("MLOG_TRUNCATE");
4223 
4224 	case MLOG_MEMSET:
4225 		return("MLOG_MEMSET");
4226 
4227 	case MLOG_INIT_FREE_PAGE:
4228 		return("MLOG_INIT_FREE_PAGE");
4229 
4230 	case MLOG_FILE_WRITE_CRYPT_DATA:
4231 		return("MLOG_FILE_WRITE_CRYPT_DATA");
4232 	}
4233 	DBUG_ASSERT(0);
4234 	return(NULL);
4235 }
4236 #endif /* !DBUG_OFF */
4237