1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2013, 2020, MariaDB Corporation.
6 
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10 
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18 
19 *****************************************************************************/
20 
21 /**************************************************//**
22 @file log/log0recv.cc
23 Recovery
24 
25 Created 9/20/1997 Heikki Tuuri
26 *******************************************************/
27 
28 #include "univ.i"
29 
30 #include <map>
31 #include <string>
32 #include <my_service_manager.h>
33 
34 #include "log0recv.h"
35 
36 #ifdef HAVE_MY_AES_H
37 #include <my_aes.h>
38 #endif
39 
40 #include "log0crypt.h"
41 #include "mem0mem.h"
42 #include "buf0buf.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0cur.h"
47 #include "page0zip.h"
48 #include "btr0btr.h"
49 #include "btr0cur.h"
50 #include "ibuf0ibuf.h"
51 #include "trx0undo.h"
52 #include "trx0rec.h"
53 #include "fil0fil.h"
54 #include "row0trunc.h"
55 #include "buf0rea.h"
56 #include "srv0srv.h"
57 #include "srv0start.h"
58 #include "trx0roll.h"
59 #include "row0merge.h"
60 #include "fil0pagecompress.h"
61 
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool.
The size is what remains of MEM_MAX_ALLOC_IN_BUF after subtracting
sizeof(recv_data_t) and REDZONE_SIZE. */
#define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t) - REDZONE_SIZE)

/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA	32

/** The recovery system; allocated by recv_sys_init() and freed
(and reset to NULL) by recv_sys_close() */
recv_sys_t*	recv_sys;
/** true when applying redo log records during crash recovery; false
otherwise.  Note that this is false while a background thread is
rolling back incomplete transactions.
Reset to false by recv_sys_var_init(). */
volatile bool	recv_recovery_on;

/** true when recv_init_crash_recovery() has been called.
Reset to false by recv_sys_var_init(). */
bool	recv_needed_recovery;
#ifdef UNIV_DEBUG
/** true if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys.mutex. Only present in debug builds. */
bool	recv_no_log_write = false;
#endif /* UNIV_DEBUG */

/** true if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future.  Initially false, and set by
recv_recovery_from_checkpoint_start().
Reset to false by recv_sys_var_init(). */
bool	recv_lsn_checks_on;

/** If the following is true, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes true if
the log record hash table becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.

true means that recovery is running and no operations on the log files
are allowed yet: the variable name is misleading.
Reset to false by recv_sys_var_init(). */
bool	recv_no_ibuf_operations;

/** The type of the previous parsed redo log record;
reset to MLOG_SINGLE_REC_FLAG by recv_sys_var_init() */
static mlog_id_t	recv_previous_parsed_rec_type;
/** The offset of the previous parsed redo log record;
reset to 0 by recv_sys_var_init() */
static ulint	recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record;
reset to 0 by recv_sys_var_init() */
static ulint	recv_previous_parsed_rec_is_multi;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt.
Reset to 0 by both recv_sys_init() and recv_sys_var_init(). */
static lsn_t	recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
/* Performance schema thread keys; recv_writer_thread_key is the one
that recv_writer_thread registers itself with. */
mysql_pfs_key_t	trx_rollback_clean_thread_key;
mysql_pfs_key_t	recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Is recv_writer_thread active? The thread itself clears this flag
just before it exits. */
bool	recv_writer_thread_active;

#ifndef	DBUG_OFF
/** Return string name of the redo log record type.
Only declared when debug tracing (DBUG) is compiled in.
@param[in]	type	record log record enum
@return string name of record log record */
static const char* get_mlog_string(mlog_id_t type);
#endif /* !DBUG_OFF */
126 
127 /** Tablespace item during recovery */
128 struct file_name_t {
129 	/** Tablespace file name (MLOG_FILE_NAME) */
130 	std::string	name;
131 	/** Tablespace object (NULL if not valid or not found) */
132 	fil_space_t*	space;
133 
134 	/** Tablespace status. */
135 	enum fil_status {
136 		/** Normal tablespace */
137 		NORMAL,
138 		/** Deleted tablespace */
139 		DELETED,
140 		/** Missing tablespace */
141 		MISSING
142 	};
143 
144 	/** Status of the tablespace */
145 	fil_status	status;
146 
147 	/** FSP_SIZE of tablespace */
148 	ulint		size;
149 
150 	/** the log sequence number of the last observed MLOG_INDEX_LOAD
151 	record for the tablespace */
152 	lsn_t		enable_lsn;
153 
154 	/** Constructor */
file_name_tfile_name_t155 	file_name_t(std::string name_, bool deleted) :
156 		name(name_), space(NULL), status(deleted ? DELETED: NORMAL),
157 		size(0), enable_lsn(0) {}
158 
159 	/** Report a MLOG_INDEX_LOAD operation, meaning that
160 	mlog_init for any earlier LSN must be skipped.
161 	@param lsn	log sequence number of the MLOG_INDEX_LOAD */
mlog_index_loadfile_name_t162 	void mlog_index_load(lsn_t lsn)
163 	{
164 		if (enable_lsn < lsn) enable_lsn = lsn;
165 	}
166 };
167 
/** Map of dirty tablespaces during recovery, keyed by tablespace
identifier. The ordering is that of std::less<ulint>. */
typedef std::map<
	ulint,
	file_name_t,
	std::less<ulint>,
	ut_allocator<std::pair<const ulint, file_name_t> > >	recv_spaces_t;

/** The tablespaces that fil_name_process() has registered;
emptied by recv_sys_close() */
static recv_spaces_t	recv_spaces;
176 
/** States of recv_addr_t, describing how far the processing of the
log records of one page has progressed */
enum recv_addr_state {
	/** not yet processed */
	RECV_NOT_PROCESSED,
	/** not processed; the page will be reinitialized */
	RECV_WILL_NOT_READ,
	/** page is being read */
	RECV_BEING_READ,
	/** log records are being applied on the page */
	RECV_BEING_PROCESSED,
	/** log records have been applied on the page */
	RECV_PROCESSED,
	/** log records have been discarded because the tablespace
	does not exist */
	RECV_DISCARDED
};
193 
/** Hashed page file address struct: one entry per page that has
log records buffered for it */
struct recv_addr_t{
	/** recovery state of the page */
	recv_addr_state	state;
	/** tablespace identifier (32-bit bit-field) */
	unsigned	space:32;
	/** page number (32-bit bit-field) */
	unsigned	page_no:32;
	/** list of log records for this page */
	UT_LIST_BASE_NODE_T(recv_t) rec_list;
	/** hash node in the hash bucket chain; recv_addr_trim() follows
	it to reach the next recv_addr_t of the same bucket */
	hash_node_t	addr_hash;
};
207 
/** Report optimized DDL operation (without redo log),
corresponding to MLOG_INDEX_LOAD.
This is a callback pointer with no initializer here.
@param[in]	space_id	tablespace identifier
*/
void (*log_optimized_ddl_op)(ulint space_id);

/** Report backup-unfriendly TRUNCATE operation (with separate log file),
corresponding to MLOG_TRUNCATE.
This is a callback pointer with no initializer here. */
void (*log_truncate)();

/** Report an operation to create, delete, or rename a file during backup.
This is a callback pointer; fil_name_parse() invokes it only when it
is not NULL.
@param[in]	space_id	tablespace identifier
@param[in]	flags		tablespace flags (NULL if not create)
@param[in]	name		file name (not NUL-terminated)
@param[in]	len		length of name, in bytes
@param[in]	new_name	new file name (NULL if not rename)
@param[in]	new_len		length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, const byte* flags,
		    const byte* name, ulint len,
		    const byte* new_name, ulint new_len);
228 
229 /** Information about initializing page contents during redo log processing */
230 class mlog_init_t
231 {
232 public:
233 	/** A page initialization operation that was parsed from
234 	the redo log */
235 	struct init {
236 		/** log sequence number of the page initialization */
237 		lsn_t lsn;
238 		/** Whether btr_page_create() avoided a read of the page.
239 
240 		At the end of the last recovery batch, ibuf_merge()
241 		will invoke change buffer merge for pages that reside
242 		in the buffer pool. (In the last batch, loading pages
243 		would trigger change buffer merge.) */
244 		bool created;
245 	};
246 
247 private:
248 	typedef std::map<const page_id_t, init,
249 			 std::less<const page_id_t>,
250 			 ut_allocator<std::pair<const page_id_t, init> > >
251 		map;
252 	/** Map of page initialization operations.
253 	FIXME: Merge this to recv_sys->addr_hash! */
254 	map inits;
255 public:
256 	/** Record that a page will be initialized by the redo log.
257 	@param[in]	space		tablespace identifier
258 	@param[in]	page_no		page number
259 	@param[in]	lsn		log sequence number */
add(ulint space,ulint page_no,lsn_t lsn)260 	void add(ulint space, ulint page_no, lsn_t lsn)
261 	{
262 		ut_ad(mutex_own(&recv_sys->mutex));
263 		const init init = { lsn, false };
264 		std::pair<map::iterator, bool> p = inits.insert(
265 			map::value_type(page_id_t(space, page_no), init));
266 		ut_ad(!p.first->second.created);
267 		if (!p.second && p.first->second.lsn < init.lsn) {
268 			p.first->second = init;
269 		}
270 	}
271 
272 	/** Get the last stored lsn of the page id and its respective
273 	init/load operation.
274 	@param[in]	page_id	page id
275 	@param[in,out]	init	initialize log or load log
276 	@return the latest page initialization;
277 	not valid after releasing recv_sys->mutex. */
last(page_id_t page_id)278 	init& last(page_id_t page_id)
279 	{
280 		ut_ad(mutex_own(&recv_sys->mutex));
281 		return inits.find(page_id)->second;
282 	}
283 
284 	/** At the end of each recovery batch, reset the 'created' flags. */
reset()285 	void reset()
286 	{
287 		ut_ad(mutex_own(&recv_sys->mutex));
288 		ut_ad(recv_no_ibuf_operations);
289 		for (map::iterator i= inits.begin(); i != inits.end(); i++) {
290 			i->second.created = false;
291 		}
292 	}
293 
294 	/** On the last recovery batch, merge buffered changes to those
295 	pages that were initialized by buf_page_create() and still reside
296 	in the buffer pool. Stale pages are not allowed in the buffer pool.
297 
298 	Note: When MDEV-14481 implements redo log apply in the
299 	background, we will have to ensure that buf_page_get_gen()
300 	will not deliver stale pages to users (pages on which the
301 	change buffer was not merged yet).  Normally, the change
302 	buffer merge is performed on I/O completion. Maybe, add a
303 	flag to buf_page_t and perform the change buffer merge on
304 	the first actual access?
305 	@param[in,out]	mtr	dummy mini-transaction */
ibuf_merge(mtr_t & mtr)306 	void ibuf_merge(mtr_t& mtr)
307 	{
308 		ut_ad(mutex_own(&recv_sys->mutex));
309 		ut_ad(!recv_no_ibuf_operations);
310 		mtr.start();
311 
312 		for (map::const_iterator i= inits.begin(); i != inits.end();
313 		     i++) {
314 			if (!i->second.created) {
315 				continue;
316 			}
317 			if (buf_block_t* block = buf_page_get_low(
318 				    i->first, univ_page_size, RW_X_LATCH, NULL,
319 				    BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
320 				    &mtr, NULL)) {
321 				mutex_exit(&recv_sys->mutex);
322 				ibuf_merge_or_delete_for_page(
323 					block, i->first, block->page.size);
324 				mtr.commit();
325 				mtr.start();
326 				mutex_enter(&recv_sys->mutex);
327 			}
328 		}
329 
330 		mtr.commit();
331 	}
332 
333 	/** Clear the data structure */
clear()334 	void clear() { inits.clear(); }
335 };
336 
337 static mlog_init_t mlog_init;
338 
339 /** Process a MLOG_CREATE2 record that indicates that a tablespace
340 is being shrunk in size.
341 @param[in]	space_id	tablespace identifier
342 @param[in]	pages		trimmed size of the file, in pages
343 @param[in]	lsn		log sequence number of the operation */
recv_addr_trim(ulint space_id,unsigned pages,lsn_t lsn)344 static void recv_addr_trim(ulint space_id, unsigned pages, lsn_t lsn)
345 {
346 	DBUG_ENTER("recv_addr_trim");
347 	DBUG_LOG("ib_log",
348 		 "discarding log beyond end of tablespace "
349 		 << page_id_t(space_id, pages) << " before LSN " << lsn);
350 	ut_ad(mutex_own(&recv_sys->mutex));
351 	for (ulint i = recv_sys->addr_hash->n_cells; i--; ) {
352 		hash_cell_t* const cell = hash_get_nth_cell(
353 			recv_sys->addr_hash, i);
354 		for (recv_addr_t* addr = static_cast<recv_addr_t*>(cell->node),
355 			     *next;
356 		     addr; addr = next) {
357 			next = static_cast<recv_addr_t*>(addr->addr_hash);
358 
359 			if (addr->space != space_id || addr->page_no < pages) {
360 				continue;
361 			}
362 
363 			for (recv_t* recv = UT_LIST_GET_FIRST(addr->rec_list);
364 			     recv; ) {
365 				recv_t* n = UT_LIST_GET_NEXT(rec_list, recv);
366 				if (recv->start_lsn < lsn) {
367 					DBUG_PRINT("ib_log",
368 						   ("Discarding %s for"
369 						    " page %u:%u at " LSN_PF,
370 						    get_mlog_string(
371 							    recv->type),
372 						    addr->space, addr->page_no,
373 						    recv->start_lsn));
374 					UT_LIST_REMOVE(addr->rec_list, recv);
375 				}
376 				recv = n;
377 			}
378 		}
379 	}
380 	if (fil_space_t* space = fil_space_get(space_id)) {
381 		ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
382 		fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
383 		ut_ad(file->is_open());
384 		os_file_truncate(file->name, file->handle,
385 				 os_offset_t(pages) << srv_page_size_shift,
386 				 true);
387 	}
388 	DBUG_VOID_RETURN;
389 }
390 
391 /** Process a file name from a MLOG_FILE_* record.
392 @param[in,out]	name		file name
393 @param[in]	len		length of the file name
394 @param[in]	space_id	the tablespace ID
395 @param[in]	deleted		whether this is a MLOG_FILE_DELETE record */
396 static
397 void
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)398 fil_name_process(
399 	char*	name,
400 	ulint	len,
401 	ulint	space_id,
402 	bool	deleted)
403 {
404 	if (srv_operation == SRV_OPERATION_BACKUP) {
405 		return;
406 	}
407 
408 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
409 	      || is_mariabackup_restore_or_export());
410 
411 	/* We will also insert space=NULL into the map, so that
412 	further checks can ensure that a MLOG_FILE_NAME record was
413 	scanned before applying any page records for the space_id. */
414 
415 	os_normalize_path(name);
416 	file_name_t	fname(std::string(name, len - 1), deleted);
417 	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
418 		std::make_pair(space_id, fname));
419 	ut_ad(p.first->first == space_id);
420 
421 	file_name_t&	f = p.first->second;
422 
423 	if (deleted) {
424 		/* Got MLOG_FILE_DELETE */
425 
426 		if (!p.second && f.status != file_name_t::DELETED) {
427 			f.status = file_name_t::DELETED;
428 			if (f.space != NULL) {
429 				fil_space_free(space_id, false);
430 				f.space = NULL;
431 			}
432 		}
433 
434 		ut_ad(f.space == NULL);
435 	} else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
436 		   || f.name != fname.name) {
437 		fil_space_t*	space;
438 
439 		/* Check if the tablespace file exists and contains
440 		the space_id. If not, ignore the file after displaying
441 		a note. Abort if there are multiple files with the
442 		same space_id. */
443 		switch (fil_ibd_load(space_id, name, space)) {
444 		case FIL_LOAD_OK:
445 			ut_ad(space != NULL);
446 
447 			if (f.space == NULL || f.space == space) {
448 
449 				if (f.size && f.space == NULL) {
450 					fil_space_set_recv_size(space->id, f.size);
451 				}
452 
453 				f.name = fname.name;
454 				f.space = space;
455 				f.status = file_name_t::NORMAL;
456 			} else {
457 				ib::error() << "Tablespace " << space_id
458 					<< " has been found in two places: '"
459 					<< f.name << "' and '" << name << "'."
460 					" You must delete one of them.";
461 				recv_sys->found_corrupt_fs = true;
462 			}
463 			break;
464 
465 		case FIL_LOAD_ID_CHANGED:
466 			ut_ad(space == NULL);
467 			break;
468 
469 		case FIL_LOAD_NOT_FOUND:
470 			/* No matching tablespace was found; maybe it
471 			was renamed, and we will find a subsequent
472 			MLOG_FILE_* record. */
473 			ut_ad(space == NULL);
474 
475 			if (srv_force_recovery) {
476 				/* Without innodb_force_recovery,
477 				missing tablespaces will only be
478 				reported in
479 				recv_init_crash_recovery_spaces().
480 				Enable some more diagnostics when
481 				forcing recovery. */
482 
483 				ib::info()
484 					<< "At LSN: " << recv_sys->recovered_lsn
485 					<< ": unable to open file " << name
486 					<< " for tablespace " << space_id;
487 			}
488 			break;
489 
490 		case FIL_LOAD_INVALID:
491 			ut_ad(space == NULL);
492 			if (srv_force_recovery == 0) {
493 				ib::warn() << "We do not continue the crash"
494 					" recovery, because the table may"
495 					" become corrupt if we cannot apply"
496 					" the log records in the InnoDB log to"
497 					" it. To fix the problem and start"
498 					" mysqld:";
499 				ib::info() << "1) If there is a permission"
500 					" problem in the file and mysqld"
501 					" cannot open the file, you should"
502 					" modify the permissions.";
503 				ib::info() << "2) If the tablespace is not"
504 					" needed, or you can restore an older"
505 					" version from a backup, then you can"
506 					" remove the .ibd file, and use"
507 					" --innodb_force_recovery=1 to force"
508 					" startup without this file.";
509 				ib::info() << "3) If the file system or the"
510 					" disk is broken, and you cannot"
511 					" remove the .ibd file, you can set"
512 					" --innodb_force_recovery.";
513 				recv_sys->found_corrupt_fs = true;
514 				break;
515 			}
516 
517 			ib::info() << "innodb_force_recovery was set to "
518 				<< srv_force_recovery << ". Continuing crash"
519 				" recovery even though we cannot access the"
520 				" files for tablespace " << space_id << ".";
521 			break;
522 		}
523 	}
524 }
525 
526 /** Parse or process a MLOG_FILE_* record.
527 @param[in]	ptr		redo log record
528 @param[in]	end		end of the redo log buffer
529 @param[in]	space_id	the tablespace ID
530 @param[in]	first_page_no	first page number in the file
531 @param[in]	type		MLOG_FILE_NAME or MLOG_FILE_DELETE
532 or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
533 @param[in]	apply		whether to apply the record
534 @return pointer to next redo log record
535 @retval NULL if this log record was truncated */
536 static
537 byte*
fil_name_parse(byte * ptr,const byte * end,ulint space_id,ulint first_page_no,mlog_id_t type,bool apply)538 fil_name_parse(
539 	byte*		ptr,
540 	const byte*	end,
541 	ulint		space_id,
542 	ulint		first_page_no,
543 	mlog_id_t	type,
544 	bool		apply)
545 {
546 	if (type == MLOG_FILE_CREATE2) {
547 		if (end < ptr + 4) {
548 			return(NULL);
549 		}
550 		ptr += 4;
551 	}
552 
553 	if (end < ptr + 2) {
554 		return(NULL);
555 	}
556 
557 	ulint	len = mach_read_from_2(ptr);
558 	ptr += 2;
559 	if (end < ptr + len) {
560 		return(NULL);
561 	}
562 
563 	/* MLOG_FILE_* records should only be written for
564 	user-created tablespaces. The name must be long enough
565 	and end in .ibd. */
566 	bool corrupt = is_predefined_tablespace(space_id)
567 		|| len < sizeof "/a.ibd\0"
568 		|| (!first_page_no != !memcmp(ptr + len - 5, DOT_IBD, 5));
569 
570 	if (!corrupt && !memchr(ptr, OS_PATH_SEPARATOR, len)) {
571 		if (byte* c = static_cast<byte*>
572 		    (memchr(ptr, OS_PATH_SEPARATOR_ALT, len))) {
573 			ut_ad(c >= ptr);
574 			ut_ad(c < ptr + len);
575 			do {
576 				*c = OS_PATH_SEPARATOR;
577 			} while ((c = static_cast<byte*>
578 				  (memchr(ptr, OS_PATH_SEPARATOR_ALT,
579 					  len - ulint(c - ptr)))) != NULL);
580 		} else {
581 			corrupt = true;
582 		}
583 	}
584 
585 	byte*	end_ptr	= ptr + len;
586 
587 	switch (type) {
588 	default:
589 		ut_ad(0); // the caller checked this
590 		/* fall through */
591 	case MLOG_FILE_NAME:
592 		if (UNIV_UNLIKELY(corrupt)) {
593 			ib::error() << "MLOG_FILE_NAME incorrect:" << ptr;
594 			recv_sys->found_corrupt_log = true;
595 			break;
596 		}
597 
598 		fil_name_process(
599 			reinterpret_cast<char*>(ptr), len, space_id, false);
600 		break;
601 	case MLOG_FILE_DELETE:
602 		if (UNIV_UNLIKELY(corrupt)) {
603 			ib::error() << "MLOG_FILE_DELETE incorrect:" << ptr;
604 			recv_sys->found_corrupt_log = true;
605 			break;
606 		}
607 
608 		fil_name_process(
609 			reinterpret_cast<char*>(ptr), len, space_id, true);
610 		/* fall through */
611 	case MLOG_FILE_CREATE2:
612 		if (first_page_no) {
613 			ut_ad(first_page_no
614 			      == SRV_UNDO_TABLESPACE_SIZE_IN_PAGES);
615 			ut_a(srv_is_undo_tablespace(space_id));
616 			compile_time_assert(
617 				UT_ARR_SIZE(recv_sys->truncated_undo_spaces)
618 				== TRX_SYS_MAX_UNDO_SPACES);
619 			recv_sys_t::trunc& t = recv_sys->truncated_undo_spaces[
620 				space_id - srv_undo_space_id_start];
621 			t.lsn = recv_sys->recovered_lsn;
622 			t.pages = uint32_t(first_page_no);
623 		} else if (log_file_op) {
624 			log_file_op(space_id,
625 				    type == MLOG_FILE_CREATE2 ? ptr - 4 : NULL,
626 				    ptr, len, NULL, 0);
627 		}
628 		break;
629 	case MLOG_FILE_RENAME2:
630 		if (UNIV_UNLIKELY(corrupt)) {
631 			ib::error() << "MLOG_FILE_RENAME2 incorrect:" << ptr;
632 			recv_sys->found_corrupt_log = true;
633 		}
634 
635 		/* The new name follows the old name. */
636 		byte*	new_name = end_ptr + 2;
637 		if (end < new_name) {
638 			return(NULL);
639 		}
640 
641 		ulint	new_len = mach_read_from_2(end_ptr);
642 
643 		if (end < end_ptr + 2 + new_len) {
644 			return(NULL);
645 		}
646 
647 		end_ptr += 2 + new_len;
648 
649 		corrupt = corrupt
650 			|| new_len < sizeof "/a.ibd\0"
651 			|| memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0;
652 
653 		if (!corrupt && !memchr(new_name, OS_PATH_SEPARATOR, new_len)) {
654 			if (byte* c = static_cast<byte*>
655 			    (memchr(new_name, OS_PATH_SEPARATOR_ALT,
656 				    new_len))) {
657 				ut_ad(c >= new_name);
658 				ut_ad(c < new_name + new_len);
659 				do {
660 					*c = OS_PATH_SEPARATOR;
661 				} while ((c = static_cast<byte*>
662 					  (memchr(ptr, OS_PATH_SEPARATOR_ALT,
663 						  new_len
664 						  - ulint(c - new_name))))
665 					 != NULL);
666 			} else {
667 				corrupt = true;
668 			}
669 		}
670 
671 		if (UNIV_UNLIKELY(corrupt)) {
672 			ib::error() << "MLOG_FILE_RENAME2 new_name incorrect:" << ptr
673 				    << " new_name: " << new_name;
674 			recv_sys->found_corrupt_log = true;
675 			break;
676 		}
677 
678 		fil_name_process(
679 			reinterpret_cast<char*>(ptr), len,
680 			space_id, false);
681 		fil_name_process(
682 			reinterpret_cast<char*>(new_name), new_len,
683 			space_id, false);
684 
685 		if (log_file_op) {
686 			log_file_op(space_id, NULL,
687 				    ptr, len, new_name, new_len);
688 		}
689 
690 		if (!apply) {
691 			break;
692 		}
693 		if (!fil_op_replay_rename(
694 			    space_id, first_page_no,
695 			    reinterpret_cast<const char*>(ptr),
696 			    reinterpret_cast<const char*>(new_name))) {
697 			recv_sys->found_corrupt_fs = true;
698 		}
699 	}
700 
701 	return(end_ptr);
702 }
703 
704 /** Clean up after recv_sys_init() */
705 void
recv_sys_close()706 recv_sys_close()
707 {
708 	if (recv_sys != NULL) {
709 		recv_sys->dblwr.pages.clear();
710 
711 		if (recv_sys->addr_hash != NULL) {
712 			hash_table_free(recv_sys->addr_hash);
713 		}
714 
715 		if (recv_sys->heap != NULL) {
716 			mem_heap_free(recv_sys->heap);
717 		}
718 
719 		if (recv_sys->flush_start != NULL) {
720 			os_event_destroy(recv_sys->flush_start);
721 		}
722 
723 		if (recv_sys->flush_end != NULL) {
724 			os_event_destroy(recv_sys->flush_end);
725 		}
726 
727 		if (recv_sys->buf != NULL) {
728 			ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
729 		}
730 
731 		ut_ad(!recv_writer_thread_active);
732 		mutex_free(&recv_sys->writer_mutex);
733 
734 		mutex_free(&recv_sys->mutex);
735 
736 		ut_free(recv_sys);
737 		recv_sys = NULL;
738 	}
739 
740 	recv_spaces.clear();
741 	mlog_init.clear();
742 }
743 
744 /************************************************************
745 Reset the state of the recovery system variables. */
746 void
recv_sys_var_init(void)747 recv_sys_var_init(void)
748 /*===================*/
749 {
750 	recv_recovery_on = false;
751 	recv_needed_recovery = false;
752 	recv_lsn_checks_on = false;
753 	recv_no_ibuf_operations = false;
754 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
755 	recv_previous_parsed_rec_offset	= 0;
756 	recv_previous_parsed_rec_is_multi = 0;
757 	recv_max_page_lsn = 0;
758 }
759 
760 /******************************************************************//**
761 recv_writer thread tasked with flushing dirty pages from the buffer
762 pools.
763 @return a dummy parameter */
764 extern "C"
765 os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)766 DECLARE_THREAD(recv_writer_thread)(
767 /*===============================*/
768 	void*	arg MY_ATTRIBUTE((unused)))
769 			/*!< in: a dummy parameter required by
770 			os_thread_create */
771 {
772 	my_thread_init();
773 	ut_ad(!srv_read_only_mode);
774 
775 #ifdef UNIV_PFS_THREAD
776 	pfs_register_thread(recv_writer_thread_key);
777 #endif /* UNIV_PFS_THREAD */
778 
779 #ifdef UNIV_DEBUG_THREAD_CREATION
780 	ib::info() << "recv_writer thread running, id "
781 		<< os_thread_pf(os_thread_get_curr_id());
782 #endif /* UNIV_DEBUG_THREAD_CREATION */
783 
784 	while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
785 
786 		/* Wait till we get a signal to clean the LRU list.
787 		Bounded by max wait time of 100ms. */
788 		int64_t      sig_count = os_event_reset(buf_flush_event);
789 		os_event_wait_time_low(buf_flush_event, 100000, sig_count);
790 
791 		mutex_enter(&recv_sys->writer_mutex);
792 
793 		if (!recv_recovery_is_on()) {
794 			mutex_exit(&recv_sys->writer_mutex);
795 			break;
796 		}
797 
798 		/* Flush pages from end of LRU if required */
799 		os_event_reset(recv_sys->flush_end);
800 		recv_sys->flush_type = BUF_FLUSH_LRU;
801 		os_event_set(recv_sys->flush_start);
802 		os_event_wait(recv_sys->flush_end);
803 
804 		mutex_exit(&recv_sys->writer_mutex);
805 	}
806 
807 	recv_writer_thread_active = false;
808 
809 	my_thread_end();
810 	/* We count the number of threads in os_thread_exit().
811 	A created thread should always use that to exit and not
812 	use return() to exit. */
813 	os_thread_exit();
814 
815 	OS_THREAD_DUMMY_RETURN;
816 }
817 
818 /** Initialize the redo log recovery subsystem. */
819 void
recv_sys_init()820 recv_sys_init()
821 {
822 	ut_ad(recv_sys == NULL);
823 
824 	recv_sys = static_cast<recv_sys_t*>(ut_zalloc_nokey(sizeof(*recv_sys)));
825 
826 	mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
827 	mutex_create(LATCH_ID_RECV_WRITER, &recv_sys->writer_mutex);
828 
829 	recv_sys->heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
830 
831 	if (!srv_read_only_mode) {
832 		recv_sys->flush_start = os_event_create(0);
833 		recv_sys->flush_end = os_event_create(0);
834 	}
835 
836 	recv_sys->buf = static_cast<byte*>(
837 		ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
838 	recv_sys->buf_size = RECV_PARSING_BUF_SIZE;
839 
840 	recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
841 	recv_sys->progress_time = time(NULL);
842 	recv_max_page_lsn = 0;
843 
844 	/* Call the constructor for recv_sys_t::dblwr member */
845 	new (&recv_sys->dblwr) recv_dblwr_t();
846 }
847 
848 /** Empty a fully processed hash table. */
849 static
850 void
recv_sys_empty_hash()851 recv_sys_empty_hash()
852 {
853 	ut_ad(mutex_own(&(recv_sys->mutex)));
854 	ut_a(recv_sys->n_addrs == 0);
855 
856 	hash_table_free(recv_sys->addr_hash);
857 	mem_heap_empty(recv_sys->heap);
858 
859 	recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
860 }
861 
862 /********************************************************//**
863 Frees the recovery system. */
864 void
recv_sys_debug_free(void)865 recv_sys_debug_free(void)
866 /*=====================*/
867 {
868 	mutex_enter(&(recv_sys->mutex));
869 
870 	hash_table_free(recv_sys->addr_hash);
871 	mem_heap_free(recv_sys->heap);
872 	ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
873 
874 	recv_sys->buf_size = 0;
875 	recv_sys->buf = NULL;
876 	recv_sys->heap = NULL;
877 	recv_sys->addr_hash = NULL;
878 
879 	/* wake page cleaner up to progress */
880 	if (!srv_read_only_mode) {
881 		ut_ad(!recv_recovery_is_on());
882 		ut_ad(!recv_writer_thread_active);
883 		os_event_reset(buf_flush_event);
884 		os_event_set(recv_sys->flush_start);
885 	}
886 
887 	mutex_exit(&(recv_sys->mutex));
888 }
889 
890 /** Read a log segment to log_sys.buf.
891 @param[in,out]	start_lsn	in: read area start,
892 out: the last read valid lsn
893 @param[in]	end_lsn		read area end
894 @return	whether no invalid blocks (e.g checksum mismatch) were found */
read_log_seg(lsn_t * start_lsn,lsn_t end_lsn)895 bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
896 {
897 	ulint	len;
898 	bool success = true;
899 	ut_ad(log_sys.mutex.is_owned());
900 	ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
901 	ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
902 	byte* buf = log_sys.buf;
903 loop:
904 	lsn_t source_offset = calc_lsn_offset(*start_lsn);
905 
906 	ut_a(end_lsn - *start_lsn <= ULINT_MAX);
907 	len = (ulint) (end_lsn - *start_lsn);
908 
909 	ut_ad(len != 0);
910 
911 	const bool at_eof = (source_offset % file_size) + len > file_size;
912 	if (at_eof) {
913 		/* If the above condition is true then len (which is ulint)
914 		is > the expression below, so the typecast is ok */
915 		len = ulint(file_size - (source_offset % file_size));
916 	}
917 
918 	log_sys.n_log_ios++;
919 
920 	MONITOR_INC(MONITOR_LOG_IO);
921 
922 	ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);
923 
924 	const ulint	page_no = ulint(source_offset >> srv_page_size_shift);
925 
926 	fil_io(IORequestLogRead, true,
927 	       page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
928 	       univ_page_size,
929 	       ulint(source_offset & (srv_page_size - 1)),
930 	       len, buf, NULL);
931 
932 	for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
933 		     buf += OS_FILE_LOG_BLOCK_SIZE,
934 		     (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
935 		const ulint block_number = log_block_get_hdr_no(buf);
936 
937 		if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
938 			/* Garbage or an incompletely written log block.
939 			We will not report any error, because this can
940 			happen when InnoDB was killed while it was
941 			writing redo log. We simply treat this as an
942 			abrupt end of the redo log. */
943 fail:
944 			end_lsn = *start_lsn;
945 			success = false;
946 			break;
947 		}
948 
949 		if (innodb_log_checksums || is_encrypted()) {
950 			ulint crc = log_block_calc_checksum_crc32(buf);
951 			ulint cksum = log_block_get_checksum(buf);
952 
953 			DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
954 					 static int block_counter;
955 					 if (block_counter++ == 0) {
956 						 cksum = crc + 1;
957 					 }
958 			 });
959 
960 			DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });
961 
962 			if (crc != cksum) {
963 				ib::error_or_warn(srv_operation != SRV_OPERATION_BACKUP)
964 					    << "Invalid log block checksum."
965 					    << " block: " << block_number
966 					    << " checkpoint no: "
967 					    << log_block_get_checkpoint_no(buf)
968 					    << " expected: " << crc
969 					    << " found: " << cksum;
970 				goto fail;
971 			}
972 
973 			if (is_encrypted()) {
974 				log_crypt(buf, *start_lsn,
975 					  OS_FILE_LOG_BLOCK_SIZE, true);
976 			}
977 		}
978 
979 		ulint dl = log_block_get_data_len(buf);
980 		if (dl < LOG_BLOCK_HDR_SIZE
981 		    || (dl > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE
982 			&& dl != OS_FILE_LOG_BLOCK_SIZE)) {
983 			recv_sys->found_corrupt_log = true;
984 			goto fail;
985 		}
986 	}
987 
988 	if (recv_sys->report(time(NULL))) {
989 		ib::info() << "Read redo log up to LSN=" << *start_lsn;
990 		service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
991 			"Read redo log up to LSN=" LSN_PF,
992 			*start_lsn);
993 	}
994 
995 	if (*start_lsn != end_lsn) {
996 		goto loop;
997 	}
998 
999 	return(success);
1000 }
1001 
1002 
1003 
1004 /********************************************************//**
1005 Copies a log segment from the most up-to-date log group to the other log
1006 groups, so that they all contain the latest log data. Also writes the info
1007 about the latest checkpoint to the groups, and inits the fields in the group
1008 memory structs to up-to-date values. */
1009 static
1010 void
recv_synchronize_groups()1011 recv_synchronize_groups()
1012 {
1013 	const lsn_t recovered_lsn = recv_sys->recovered_lsn;
1014 
1015 	/* Read the last recovered log block to the recovery system buffer:
1016 	the block is always incomplete */
1017 
1018 	lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
1019 					       OS_FILE_LOG_BLOCK_SIZE);
1020 	log_sys.log.read_log_seg(&start_lsn,
1021 				 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
1022 	log_sys.log.set_fields(recovered_lsn);
1023 
1024 	/* Copy the checkpoint info to the log; remember that we have
1025 	incremented checkpoint_no by one, and the info will not be written
1026 	over the max checkpoint info, thus making the preservation of max
1027 	checkpoint info on disk certain */
1028 
1029 	if (!srv_read_only_mode) {
1030 		log_write_checkpoint_info(true, 0);
1031 		log_mutex_enter();
1032 	}
1033 }
1034 
1035 /** Check the consistency of a log header block.
1036 @param[in]	log header block
1037 @return true if ok */
1038 static
1039 bool
recv_check_log_header_checksum(const byte * buf)1040 recv_check_log_header_checksum(
1041 	const byte*	buf)
1042 {
1043 	return(log_block_get_checksum(buf)
1044 	       == log_block_calc_checksum_crc32(buf));
1045 }
1046 
1047 /** Find the latest checkpoint in the format-0 log header.
1048 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1049 @return error code or DB_SUCCESS */
1050 static MY_ATTRIBUTE((warn_unused_result))
1051 dberr_t
recv_find_max_checkpoint_0(ulint * max_field)1052 recv_find_max_checkpoint_0(ulint* max_field)
1053 {
1054 	ib_uint64_t	max_no = 0;
1055 	ib_uint64_t	checkpoint_no;
1056 	byte*		buf	= log_sys.checkpoint_buf;
1057 
1058 	ut_ad(log_sys.log.format == 0);
1059 
1060 	/** Offset of the first checkpoint checksum */
1061 	static const uint CHECKSUM_1 = 288;
1062 	/** Offset of the second checkpoint checksum */
1063 	static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
1064 	/** Most significant bits of the checkpoint offset */
1065 	static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
1066 	/** Least significant bits of the checkpoint offset */
1067 	static const uint OFFSET_LOW32 = 16;
1068 
1069 	bool found = false;
1070 
1071 	for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1072 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1073 		log_header_read(field);
1074 
1075 		if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
1076 		    != mach_read_from_4(buf + CHECKSUM_1)
1077 		    || static_cast<uint32_t>(
1078 			    ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
1079 					   CHECKSUM_2 - LOG_CHECKPOINT_LSN))
1080 		    != mach_read_from_4(buf + CHECKSUM_2)) {
1081 			DBUG_LOG("ib_log",
1082 				 "invalid pre-10.2.2 checkpoint " << field);
1083 			continue;
1084 		}
1085 
1086 		checkpoint_no = mach_read_from_8(
1087 			buf + LOG_CHECKPOINT_NO);
1088 
1089 		if (!log_crypt_101_read_checkpoint(buf)) {
1090 			ib::error() << "Decrypting checkpoint failed";
1091 			continue;
1092 		}
1093 
1094 		DBUG_PRINT("ib_log",
1095 			   ("checkpoint " UINT64PF " at " LSN_PF " found",
1096 			    checkpoint_no,
1097 			    mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
1098 
1099 		if (checkpoint_no >= max_no) {
1100 			found = true;
1101 			*max_field = field;
1102 			max_no = checkpoint_no;
1103 
1104 			log_sys.log.set_lsn(mach_read_from_8(
1105 				buf + LOG_CHECKPOINT_LSN));
1106 			log_sys.log.set_lsn_offset(
1107 				lsn_t(mach_read_from_4(buf + OFFSET_HIGH32))
1108 				<< 32
1109 				| mach_read_from_4(buf + OFFSET_LOW32));
1110 		}
1111 	}
1112 
1113 	if (found) {
1114 		return(DB_SUCCESS);
1115 	}
1116 
1117 	ib::error() << "Upgrade after a crash is not supported."
1118 		" This redo log was created before MariaDB 10.2.2,"
1119 		" and we did not find a valid checkpoint."
1120 		" Please follow the instructions at"
1121 		" https://mariadb.com/kb/en/library/upgrading/";
1122 	return(DB_ERROR);
1123 }
1124 
1125 /** Determine if a pre-MySQL 5.7.9/MariaDB 10.2.2 redo log is clean.
1126 @param[in]	lsn	checkpoint LSN
1127 @param[in]	crypt	whether the log might be encrypted
1128 @return error code
1129 @retval	DB_SUCCESS	if the redo log is clean
1130 @retval DB_ERROR	if the redo log is corrupted or dirty */
recv_log_format_0_recover(lsn_t lsn,bool crypt)1131 static dberr_t recv_log_format_0_recover(lsn_t lsn, bool crypt)
1132 {
1133 	log_mutex_enter();
1134 	const lsn_t	source_offset = log_sys.log.calc_lsn_offset(lsn);
1135 	log_mutex_exit();
1136 	const ulint	page_no = ulint(source_offset >> srv_page_size_shift);
1137 	byte*		buf = log_sys.buf;
1138 
1139 	static const char* NO_UPGRADE_RECOVERY_MSG =
1140 		"Upgrade after a crash is not supported."
1141 		" This redo log was created before MariaDB 10.2.2";
1142 
1143 	fil_io(IORequestLogRead, true,
1144 	       page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1145 	       univ_page_size,
1146 	       ulint((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1147 		     & (srv_page_size - 1)),
1148 	       OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1149 
1150 	if (log_block_calc_checksum_format_0(buf)
1151 	    != log_block_get_checksum(buf)
1152 	    && !log_crypt_101_read_block(buf)) {
1153 		ib::error() << NO_UPGRADE_RECOVERY_MSG
1154 			<< ", and it appears corrupted.";
1155 		return(DB_CORRUPTION);
1156 	}
1157 
1158 	if (log_block_get_data_len(buf)
1159 	    == (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1160 	} else if (crypt) {
1161 		ib::error() << "Cannot decrypt log for upgrading."
1162 			" The encrypted log was created"
1163 			" before MariaDB 10.2.2.";
1164 		return DB_ERROR;
1165 	} else {
1166 		ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";
1167 		return(DB_ERROR);
1168 	}
1169 
1170 	/* Mark the redo log for upgrading. */
1171 	srv_log_file_size = 0;
1172 	recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1173 		= recv_sys->scanned_lsn
1174 		= recv_sys->mlog_checkpoint_lsn = lsn;
1175 	log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1176 		= log_sys.lsn = log_sys.write_lsn
1177 		= log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1178 		= lsn;
1179 	log_sys.next_checkpoint_no = 0;
1180 	return(DB_SUCCESS);
1181 }
1182 
1183 /** Determine if a redo log from MariaDB 10.4 is clean.
1184 @return	error code
1185 @retval	DB_SUCCESS	if the redo log is clean
1186 @retval	DB_CORRUPTION	if the redo log is corrupted
1187 @retval	DB_ERROR	if the redo log is not empty */
recv_log_recover_10_4()1188 static dberr_t recv_log_recover_10_4()
1189 {
1190 	ut_ad(!log_sys.is_encrypted());
1191 	const lsn_t	lsn = log_sys.log.get_lsn();
1192 	const lsn_t	source_offset = log_sys.log.calc_lsn_offset(lsn);
1193 	const ulint	page_no
1194 		= (ulint) (source_offset / univ_page_size.physical());
1195 	byte*		buf = log_sys.buf;
1196 
1197 	fil_io(IORequestLogRead, true,
1198 	       page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
1199 	       univ_page_size,
1200 	       (ulint) ((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
1201 			% univ_page_size.physical()),
1202 	       OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
1203 
1204 	const ulint cksum = log_block_get_checksum(buf);
1205 
1206 	if (cksum != LOG_NO_CHECKSUM_MAGIC
1207 	    && cksum != log_block_calc_checksum_crc32(buf)) {
1208 		return DB_CORRUPTION;
1209 	}
1210 
1211 	/* On a clean shutdown, the redo log will be logically empty
1212 	after the checkpoint lsn. */
1213 
1214 	if (log_block_get_data_len(buf)
1215 	    != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1216 		return DB_ERROR;
1217 	}
1218 
1219 	/* Mark the redo log for downgrading. */
1220 	srv_log_file_size = 0;
1221 	recv_sys->parse_start_lsn = recv_sys->recovered_lsn
1222 		= recv_sys->scanned_lsn
1223 		= recv_sys->mlog_checkpoint_lsn = lsn;
1224 	log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1225 		= log_sys.lsn = log_sys.write_lsn
1226 		= log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
1227 		= lsn;
1228 	log_sys.next_checkpoint_no = 0;
1229 	return DB_SUCCESS;
1230 }
1231 
1232 /** Find the latest checkpoint in the log header.
1233 @param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
1234 @return error code or DB_SUCCESS */
1235 dberr_t
recv_find_max_checkpoint(ulint * max_field)1236 recv_find_max_checkpoint(ulint* max_field)
1237 {
1238 	ib_uint64_t	max_no;
1239 	ib_uint64_t	checkpoint_no;
1240 	ulint		field;
1241 	byte*		buf;
1242 
1243 	max_no = 0;
1244 	*max_field = 0;
1245 
1246 	buf = log_sys.checkpoint_buf;
1247 
1248 	log_header_read(0);
1249 	/* Check the header page checksum. There was no
1250 	checksum in the first redo log format (version 0). */
1251 	log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
1252 	log_sys.log.subformat = log_sys.log.format != LOG_HEADER_FORMAT_3_23
1253 		? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
1254 		: 0;
1255 	if (log_sys.log.format != LOG_HEADER_FORMAT_3_23
1256 	    && !recv_check_log_header_checksum(buf)) {
1257 		ib::error() << "Invalid redo log header checksum.";
1258 		return(DB_CORRUPTION);
1259 	}
1260 
1261 	char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
1262 
1263 	memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
1264 	/* Ensure that the string is NUL-terminated. */
1265 	creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;
1266 
1267 	switch (log_sys.log.format) {
1268 	case LOG_HEADER_FORMAT_3_23:
1269 		return(recv_find_max_checkpoint_0(max_field));
1270 	case LOG_HEADER_FORMAT_10_2:
1271 	case LOG_HEADER_FORMAT_10_2 | LOG_HEADER_FORMAT_ENCRYPTED:
1272 	case LOG_HEADER_FORMAT_CURRENT:
1273 	case LOG_HEADER_FORMAT_CURRENT | LOG_HEADER_FORMAT_ENCRYPTED:
1274 	case LOG_HEADER_FORMAT_10_4:
1275 		/* We can only parse the unencrypted LOG_HEADER_FORMAT_10_4.
1276 		The encrypted format uses a larger redo log block trailer. */
1277 		break;
1278 	default:
1279 		ib::error() << "Unsupported redo log format."
1280 			" The redo log was created with " << creator << ".";
1281 		return(DB_ERROR);
1282 	}
1283 
1284 	for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
1285 	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
1286 
1287 		log_header_read(field);
1288 
1289 		const ulint crc32 = log_block_calc_checksum_crc32(buf);
1290 		const ulint cksum = log_block_get_checksum(buf);
1291 
1292 		if (crc32 != cksum) {
1293 			DBUG_PRINT("ib_log",
1294 				   ("invalid checkpoint,"
1295 				    " at " ULINTPF
1296 				    ", checksum " ULINTPFx
1297 				    " expected " ULINTPFx,
1298 				    field, cksum, crc32));
1299 			continue;
1300 		}
1301 
1302 		if (log_sys.is_encrypted()
1303 		    && !log_crypt_read_checkpoint_buf(buf)) {
1304 			ib::error() << "Reading checkpoint"
1305 				" encryption info failed.";
1306 			continue;
1307 		}
1308 
1309 		checkpoint_no = mach_read_from_8(
1310 			buf + LOG_CHECKPOINT_NO);
1311 
1312 		DBUG_PRINT("ib_log",
1313 			   ("checkpoint " UINT64PF " at " LSN_PF " found",
1314 			    checkpoint_no, mach_read_from_8(
1315 				    buf + LOG_CHECKPOINT_LSN)));
1316 
1317 		if (checkpoint_no >= max_no) {
1318 			*max_field = field;
1319 			max_no = checkpoint_no;
1320 			log_sys.log.set_lsn(mach_read_from_8(
1321 				buf + LOG_CHECKPOINT_LSN));
1322 			log_sys.log.set_lsn_offset(mach_read_from_8(
1323 				buf + LOG_CHECKPOINT_OFFSET));
1324 			log_sys.next_checkpoint_no = checkpoint_no;
1325 		}
1326 	}
1327 
1328 	if (*max_field == 0) {
1329 		/* Before 10.2.2, we could get here during database
1330 		initialization if we created an ib_logfile0 file that
1331 		was filled with zeroes, and were killed. After
1332 		10.2.2, we would reject such a file already earlier,
1333 		when checking the file header. */
1334 		ib::error() << "No valid checkpoint found"
1335 			" (corrupted redo log)."
1336 			" You can try --innodb-force-recovery=6"
1337 			" as a last resort.";
1338 		return(DB_ERROR);
1339 	}
1340 
1341 	if (log_sys.log.format == LOG_HEADER_FORMAT_10_4) {
1342 		dberr_t err = recv_log_recover_10_4();
1343 		if (err != DB_SUCCESS) {
1344 			ib::error()
1345 				<< "Downgrade after a crash is not supported."
1346 				" The redo log was created with " << creator
1347 				<< (err == DB_ERROR
1348 				    ? "." : ", and it appears corrupted.");
1349 		}
1350 		return err;
1351 	}
1352 
1353 	return DB_SUCCESS;
1354 }
1355 
/** Try to parse a single log record body and also applies it if
specified.
The record type selects a type-specific parser. When a block is passed,
the parsers are handed the page frame (and the compressed page
descriptor, if any) of that block; otherwise they are handed NULL
pointers for the page.
@param[in]	type		redo log entry type
@param[in]	ptr		redo log record body
@param[in]	end_ptr		end of buffer
@param[in]	space_id	tablespace identifier
@param[in]	page_no		page number
@param[in]	apply		whether to apply the record
@param[in,out]	block		buffer block, or NULL if
a page log record should not be applied
or if it is a MLOG_FILE_ operation
@param[in,out]	mtr		mini-transaction, or NULL if
a page log record should not be applied
@return log record end, NULL if not a complete record
(NULL is also returned on the paths that set
recv_sys->found_corrupt_log or recv_sys->found_corrupt_fs) */
static
byte*
recv_parse_or_apply_log_rec_body(
	mlog_id_t	type,
	byte*		ptr,
	byte*		end_ptr,
	ulint		space_id,
	ulint		page_no,
	bool		apply,
	buf_block_t*	block,
	mtr_t*		mtr)
{
	/* block and mtr must be both NULL or both non-NULL. */
	ut_ad(!block == !mtr);
	ut_ad(!apply || recv_sys->mlog_checkpoint_lsn != 0);

	/* The record types in this first switch are handled completely
	here and return without reaching the page-level parsing below. */
	switch (type) {
	case MLOG_FILE_NAME:
	case MLOG_FILE_DELETE:
	case MLOG_FILE_CREATE2:
	case MLOG_FILE_RENAME2:
		ut_ad(block == NULL);
		/* Collect the file names when parsing the log,
		before applying any log records. */
		return(fil_name_parse(ptr, end_ptr, space_id, page_no, type,
				      apply));
	case MLOG_INDEX_LOAD:
		/* The 8-byte record body is skipped without being
		interpreted here. */
		if (end_ptr < ptr + 8) {
			return(NULL);
		}
		return(ptr + 8);
	case MLOG_TRUNCATE:
		/* If a log_truncate callback is set, invoke it, flag
		found_corrupt_fs and stop parsing. */
		if (log_truncate) {
			ut_ad(srv_operation != SRV_OPERATION_NORMAL);
			log_truncate();
			recv_sys->found_corrupt_fs = true;
			return NULL;
		}
		return(truncate_t::parse_redo_entry(ptr, end_ptr, space_id));

	default:
		break;
	}

	/* Set by mlog_parse_index() for the record types that carry
	index information; released at the end of this function. */
	dict_index_t*	index	= NULL;
	page_t*		page;
	page_zip_des_t*	page_zip;
#ifdef UNIV_DEBUG
	ulint		page_type;
#endif /* UNIV_DEBUG */

	if (block) {
		/* Applying a page log record. */
		ut_ad(apply);
		page = block->frame;
		page_zip = buf_block_get_page_zip(block);
		ut_d(page_type = fil_page_get_type(page));
	} else if (apply
		   && !is_predefined_tablespace(space_id)
		   && recv_spaces.find(space_id) == recv_spaces.end()) {
		/* The tablespace is not present in recv_spaces. */
		if (recv_sys->recovered_lsn < recv_sys->mlog_checkpoint_lsn) {
			/* We have not seen all records between the
			checkpoint and MLOG_CHECKPOINT. There should be
			a MLOG_FILE_DELETE for this tablespace later. */
			recv_spaces.insert(
				std::make_pair(space_id,
					       file_name_t("", false)));
			goto parse_log;
		}

		ib::error() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE"
			" for redo log record " << type << " (page "
			    << space_id << ":" << page_no << ") at "
			    << recv_sys->recovered_lsn << ".";
		recv_sys->found_corrupt_log = true;
		return(NULL);
	} else {
parse_log:
		/* Parsing a page log record. */
		page = NULL;
		page_zip = NULL;
		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
	}

	/* Remember the start of the record body; the MLOG_4BYTES handling
	below reads the page offset from it after ptr has been advanced. */
	const byte*	old_ptr = ptr;

	switch (type) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
		/* The LSN is checked in recv_parse_log_rec(). */
		break;
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
#ifdef UNIV_DEBUG
		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
		    && end_ptr >= ptr + 2) {
			/* It is OK to set FIL_PAGE_TYPE and certain
			list node fields on an empty page.  Any other
			write is not OK. */

			/* NOTE: There may be bogus assertion failures for
			dict_hdr_create(), trx_rseg_header_create(),
			trx_sys_create_doublewrite_buf(), and
			trx_sysf_create().
			These are only called during database creation. */
			ulint	offs = mach_read_from_2(ptr);

			switch (type) {
			default:
				ut_error;
			case MLOG_2BYTES:
				/* Note that this can fail when the
				redo log been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(offs == FIL_PAGE_TYPE
				      || srv_is_undo_tablespace(space_id)
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			case MLOG_4BYTES:
				/* Note that this can fail when the
				redo log been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(0
				      /* fil_crypt_rotate_page() writes this */
				      || offs == FIL_PAGE_SPACE_ID
				      || srv_is_undo_tablespace(space_id)
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_SPACE
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER/* flst_init */
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			}
		}
#endif /* UNIV_DEBUG */
		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
		/* After a 4-byte write to page 0 has been applied, copy
		the affected tablespace header field from the page to the
		in-memory fil_space_t object. */
		if (ptr != NULL && page != NULL
		    && page_no == 0 && type == MLOG_4BYTES) {
			ulint	offs = mach_read_from_2(old_ptr);
			switch (offs) {
				fil_space_t*	space;
				ulint		val;
			default:
				break;
			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
			case FSP_HEADER_OFFSET + FSP_SIZE:
			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
				space = fil_space_get(space_id);
				ut_a(space != NULL);
				val = mach_read_from_4(page + offs);

				switch (offs) {
				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
					space->flags = val;
					break;
				case FSP_HEADER_OFFSET + FSP_SIZE:
					space->size_in_header = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
					space->free_limit = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
					space->free_len = val;
					ut_ad(val == flst_get_len(
						      page + offs));
					break;
				}
			}
		}
		break;
	/* In the index record cases below, mlog_parse_index() is called
	first and the type-specific parser only runs if it returned a
	non-NULL pointer. When a page is available, its compact-format
	flag must agree with that of the parsed index's table. */
	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_INSERT,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_del_mark_set_clust_rec(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_REC_SEC_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
							 page, page_zip);
		break;
	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
							    page_zip, index);
		}
		break;
	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_DELETE
				     || type == MLOG_COMP_LIST_START_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
							 block, index, mtr);
		}
		break;
	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_COPY_CREATED,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_copy_rec_list_to_created_page(
				ptr, end_ptr, block, index, mtr);
		}
		break;
	case MLOG_PAGE_REORGANIZE:
	case MLOG_COMP_PAGE_REORGANIZE:
	case MLOG_ZIP_PAGE_REORGANIZE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type != MLOG_PAGE_REORGANIZE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_parse_page_reorganize(
				ptr, end_ptr, index,
				type == MLOG_ZIP_PAGE_REORGANIZE,
				block, mtr);
		}
		break;
	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
		/* Allow anything in page_type when creating a page. */
		/* ptr is left unchanged for this record type. */
		ut_a(!page_zip);
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
		break;
	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
		/* ptr is left unchanged for this record type. */
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
				  true);
		break;
	case MLOG_UNDO_INSERT:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_ERASE_END:
		/* ptr is left unchanged; the page is only modified
		when one is available. */
		if (page) {
			ut_ad(page_type == FIL_PAGE_UNDO_LOG);
			trx_undo_erase_page_end(page);
		}
		break;
	case MLOG_UNDO_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = trx_undo_parse_page_init(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_REUSE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header_reuse(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_CREATE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header(ptr, end_ptr, page, mtr);
		break;
	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
		will be followed by MLOG_COMP_REC_DELETE
		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
		in the same mini-transaction. */
		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
		ptr = btr_parse_set_min_rec_mark(
			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
			page, mtr);
		break;
	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_IBUF_BITMAP_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
		break;
	case MLOG_INIT_FILE_PAGE2:
		/* Allow anything in page_type when creating a page. */
		/* ptr is left unchanged for this record type. */
		if (block) fsp_apply_init_file_page(block);
		break;
	case MLOG_WRITE_STRING:
		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
		break;
	case MLOG_ZIP_WRITE_NODE_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_BLOB_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_HEADER:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_header(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_ZIP_PAGE_COMPRESS:
		/* Allow anything in page_type when creating a page. */
		ptr = page_zip_parse_compress(ptr, end_ptr, block);
		break;
	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
		if (NULL != (ptr = mlog_parse_index(
				ptr, end_ptr, TRUE, &index))) {

			ut_a(!page || ((ibool)!!page_is_comp(page)
				== dict_table_is_comp(index->table)));
			ptr = page_zip_parse_compress_no_data(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_ZIP_WRITE_TRX_ID:
		/* This must be a clustered index leaf page. */
		ut_ad(!page || page_type == FIL_PAGE_INDEX);
		ptr = page_zip_parse_write_trx_id(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_FILE_WRITE_CRYPT_DATA:
		/* A parse error reported through err marks the log as
		corrupted. */
		dberr_t err;
		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, &err));

		if (err != DB_SUCCESS) {
			recv_sys->found_corrupt_log = TRUE;
		}
		break;
	default:
		/* Unknown record type: report it and mark the log as
		corrupted. */
		ptr = NULL;
		ib::error() << "Incorrect log record type "
			<< ib::hex(unsigned(type));

		recv_sys->found_corrupt_log = true;
	}

	/* Release the index that mlog_parse_index() returned through
	&index, together with its table. */
	if (index) {
		dict_table_t*	table = index->table;

		dict_mem_index_free(index);
		dict_mem_table_free(table);
	}

	return(ptr);
}
1798 
1799 /*********************************************************************//**
1800 Calculates the fold value of a page file address: used in inserting or
1801 searching for a log record in the hash table.
1802 @return folded value */
1803 UNIV_INLINE
1804 ulint
recv_fold(ulint space,ulint page_no)1805 recv_fold(
1806 /*======*/
1807 	ulint	space,	/*!< in: space */
1808 	ulint	page_no)/*!< in: page number */
1809 {
1810 	return(ut_fold_ulint_pair(space, page_no));
1811 }
1812 
1813 /*********************************************************************//**
1814 Calculates the hash value of a page file address: used in inserting or
1815 searching for a log record in the hash table.
1816 @return folded value */
1817 UNIV_INLINE
1818 ulint
recv_hash(ulint space,ulint page_no)1819 recv_hash(
1820 /*======*/
1821 	ulint	space,	/*!< in: space */
1822 	ulint	page_no)/*!< in: page number */
1823 {
1824 	return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
1825 }
1826 
1827 /*********************************************************************//**
1828 Gets the hashed file address struct for a page.
1829 @return file address struct, NULL if not found from the hash table */
1830 static
1831 recv_addr_t*
recv_get_fil_addr_struct(ulint space,ulint page_no)1832 recv_get_fil_addr_struct(
1833 /*=====================*/
1834 	ulint	space,	/*!< in: space id */
1835 	ulint	page_no)/*!< in: page number */
1836 {
1837 	ut_ad(mutex_own(&recv_sys->mutex));
1838 
1839 	recv_addr_t*	recv_addr;
1840 
1841 	for (recv_addr = static_cast<recv_addr_t*>(
1842 			HASH_GET_FIRST(recv_sys->addr_hash,
1843 				       recv_hash(space, page_no)));
1844 	     recv_addr != 0;
1845 	     recv_addr = static_cast<recv_addr_t*>(
1846 		     HASH_GET_NEXT(addr_hash, recv_addr))) {
1847 
1848 		if (recv_addr->space == space
1849 		    && recv_addr->page_no == page_no) {
1850 
1851 			return(recv_addr);
1852 		}
1853 	}
1854 
1855 	return(NULL);
1856 }
1857 
1858 /*******************************************************************//**
1859 Adds a new log record to the hash table of log records. */
1860 static
1861 void
recv_add_to_hash_table(mlog_id_t type,ulint space,ulint page_no,byte * body,byte * rec_end,lsn_t start_lsn,lsn_t end_lsn)1862 recv_add_to_hash_table(
1863 /*===================*/
1864 	mlog_id_t	type,		/*!< in: log record type */
1865 	ulint		space,		/*!< in: space id */
1866 	ulint		page_no,	/*!< in: page number */
1867 	byte*		body,		/*!< in: log record body */
1868 	byte*		rec_end,	/*!< in: log record end */
1869 	lsn_t		start_lsn,	/*!< in: start lsn of the mtr */
1870 	lsn_t		end_lsn)	/*!< in: end lsn of the mtr */
1871 {
1872 	recv_t*		recv;
1873 	ulint		len;
1874 	recv_data_t*	recv_data;
1875 	recv_data_t**	prev_field;
1876 	recv_addr_t*	recv_addr;
1877 
1878 	ut_ad(type != MLOG_FILE_DELETE);
1879 	ut_ad(type != MLOG_FILE_CREATE2);
1880 	ut_ad(type != MLOG_FILE_RENAME2);
1881 	ut_ad(type != MLOG_FILE_NAME);
1882 	ut_ad(type != MLOG_DUMMY_RECORD);
1883 	ut_ad(type != MLOG_CHECKPOINT);
1884 	ut_ad(type != MLOG_INDEX_LOAD);
1885 	ut_ad(type != MLOG_TRUNCATE);
1886 
1887 	len = ulint(rec_end - body);
1888 
1889 	recv = static_cast<recv_t*>(
1890 		mem_heap_alloc(recv_sys->heap, sizeof(recv_t)));
1891 
1892 	recv->type = type;
1893 	recv->len = ulint(rec_end - body);
1894 	recv->start_lsn = start_lsn;
1895 	recv->end_lsn = end_lsn;
1896 
1897 	recv_addr = recv_get_fil_addr_struct(space, page_no);
1898 
1899 	if (recv_addr == NULL) {
1900 		recv_addr = static_cast<recv_addr_t*>(
1901 			mem_heap_alloc(recv_sys->heap, sizeof(recv_addr_t)));
1902 
1903 		recv_addr->space = space;
1904 		recv_addr->page_no = page_no;
1905 		recv_addr->state = RECV_NOT_PROCESSED;
1906 
1907 		UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
1908 
1909 		HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
1910 			    recv_fold(space, page_no), recv_addr);
1911 		recv_sys->n_addrs++;
1912 	}
1913 
1914 	switch (type) {
1915 	case MLOG_INIT_FILE_PAGE2:
1916 	case MLOG_ZIP_PAGE_COMPRESS:
1917 		/* Ignore any earlier redo log records for this page. */
1918 		ut_ad(recv_addr->state == RECV_NOT_PROCESSED
1919 		      || recv_addr->state == RECV_WILL_NOT_READ);
1920 		recv_addr->state = RECV_WILL_NOT_READ;
1921 		mlog_init.add(space, page_no, start_lsn);
1922 	default:
1923 		break;
1924 	}
1925 
1926 	UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
1927 
1928 	prev_field = &(recv->data);
1929 
1930 	/* Store the log record body in chunks of less than srv_page_size:
1931 	recv_sys->heap grows into the buffer pool, and bigger chunks could not
1932 	be allocated */
1933 
1934 	while (rec_end > body) {
1935 
1936 		len = ulint(rec_end - body);
1937 
1938 		if (len > RECV_DATA_BLOCK_SIZE) {
1939 			len = RECV_DATA_BLOCK_SIZE;
1940 		}
1941 
1942 		recv_data = static_cast<recv_data_t*>(
1943 			mem_heap_alloc(recv_sys->heap,
1944 				       sizeof(recv_data_t) + len));
1945 
1946 		*prev_field = recv_data;
1947 
1948 		memcpy(recv_data + 1, body, len);
1949 
1950 		prev_field = &(recv_data->next);
1951 
1952 		body += len;
1953 	}
1954 
1955 	*prev_field = NULL;
1956 }
1957 
1958 /*********************************************************************//**
1959 Copies the log record body from recv to buf. */
1960 static
1961 void
recv_data_copy_to_buf(byte * buf,recv_t * recv)1962 recv_data_copy_to_buf(
1963 /*==================*/
1964 	byte*	buf,	/*!< in: buffer of length at least recv->len */
1965 	recv_t*	recv)	/*!< in: log record */
1966 {
1967 	recv_data_t*	recv_data;
1968 	ulint		part_len;
1969 	ulint		len;
1970 
1971 	len = recv->len;
1972 	recv_data = recv->data;
1973 
1974 	while (len > 0) {
1975 		if (len > RECV_DATA_BLOCK_SIZE) {
1976 			part_len = RECV_DATA_BLOCK_SIZE;
1977 		} else {
1978 			part_len = len;
1979 		}
1980 
1981 		ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
1982 			  part_len);
1983 		buf += part_len;
1984 		len -= part_len;
1985 
1986 		recv_data = recv_data->next;
1987 	}
1988 }
1989 
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.

The caller must hold recv_sys->mutex. The mutex is released while the
records are being applied and is reacquired before returning.
The mini-transaction is committed by this function, and on return
recv_addr->state is RECV_PROCESSED and recv_sys->n_addrs has been
decremented.
@param[in,out]	block		buffer pool page
@param[in,out]	mtr		mini-transaction
@param[in,out]	recv_addr	recovery address
@param[in]	init_lsn	the initial LSN where to start recovery;
				records that start before it are skipped */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
			      recv_addr_t* recv_addr, lsn_t init_lsn = 0)
{
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(mutex_own(&recv_sys->mutex));
	ut_ad(recv_sys->apply_log_recs);
	ut_ad(recv_needed_recovery);
	ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
	ut_ad(recv_addr->state != RECV_PROCESSED);

	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
		fprintf(stderr, "Applying log to page %u:%u\n",
			recv_addr->space, recv_addr->page_no);
	}

	DBUG_LOG("ib_log", "Applying log to page " << block->page.id);

	/* Mark the entry before releasing recv_sys->mutex. Entries in
	this state are skipped by recv_recover_page(buf_page_t*) and
	recv_apply_hashed_log_recs(). */
	recv_addr->state = RECV_BEING_PROCESSED;
	mutex_exit(&recv_sys->mutex);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

	/* The page may have been modified in the buffer pool.
	FIL_PAGE_LSN would only be updated right before flushing. */
	lsn_t page_lsn = buf_page_get_newest_modification(&block->page);
	if (!page_lsn) {
		page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
	}

	/* start_lsn remains 0 unless at least one record is applied. */
	lsn_t start_lsn = 0, end_lsn = 0;
	fil_space_t* space;

	if (srv_is_tablespace_truncated(recv_addr->space)) {
		/* The table will be truncated after applying
		normal redo log records. */
		goto skip_log;
	}

	space = fil_space_acquire(recv_addr->space);
	if (!space) {
		/* The tablespace could not be acquired:
		nothing is applied. */
		goto skip_log;
	}

	for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
	     recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
		ut_ad(recv->start_lsn);
		end_lsn = recv->end_lsn;
		ut_ad(end_lsn <= log_sys.log.scanned_lsn);

		if (recv->start_lsn < page_lsn) {
			/* Ignore this record, because there are later changes
			for this page. */
			DBUG_LOG("ib_log", "apply skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << page_lsn);
		} else if (recv->start_lsn < init_lsn) {
			/* Ignore this record, because it starts before
			the LSN passed in init_lsn. */
			DBUG_LOG("ib_log", "init skip "
				 << get_mlog_string(recv->type)
				 << " LSN " << recv->start_lsn << " < "
				 << init_lsn);
		} else if (srv_was_tablespace_truncated(space)
			   && recv->start_lsn
			   < truncate_t::get_truncated_tablespace_init_lsn(
				   recv_addr->space)) {
			/* If per-table tablespace was truncated and
			there exist REDO records before truncate that
			are to be applied as part of recovery
			(checkpoint didn't happen since truncate was
			done) skip such records using lsn check as
			they may not stand valid post truncate. */
		} else {
			/* Remember the start LSN of the first record
			that is actually applied. */
			if (!start_lsn) {
				start_lsn = recv->start_lsn;
			}

			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
				fprintf(stderr, "apply " LSN_PF ":"
					" %d len " ULINTPF " page %u:%u\n",
					recv->start_lsn, recv->type, recv->len,
					recv_addr->space, recv_addr->page_no);
			}

			DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
				 << get_mlog_string(recv->type)
				 << " len " << recv->len
				 << " page " << block->page.id);

			byte* buf;

			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				/* We have to copy the record body to
				a separate buffer */
				buf = static_cast<byte*>
					(ut_malloc_nokey(recv->len));
				recv_data_copy_to_buf(buf, recv);
			} else {
				/* The body fits in a single chunk: use the
				payload that follows the chunk header
				directly. */
				buf = reinterpret_cast<byte*>(recv->data)
					+ sizeof *recv->data;
			}

			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				block->page.id.space(),
				block->page.id.page_no(), true, block, &mtr);

			/* Stamp start_lsn + len into the page header and
			trailer, and into the compressed page, if any. */
			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
						end_lsn);
			}

			/* Free the temporary buffer allocated above. */
			if (recv->len > RECV_DATA_BLOCK_SIZE) {
				ut_free(buf);
			}
		}
	}

	space->release();

skip_log:
#ifdef UNIV_ZIP_DEBUG
	ut_ad(!fil_page_index_page_check(page)
	      || !page_zip
	      || page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */

	/* Note the modification only if some record was applied. */
	if (start_lsn) {
		log_flush_order_mutex_enter();
		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
		log_flush_order_mutex_exit();
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();
	mtr.commit();

	/* Read the clock before reacquiring recv_sys->mutex. */
	time_t now = time(NULL);

	mutex_enter(&recv_sys->mutex);

	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
	recv_addr->state = RECV_PROCESSED;

	/* One page fewer remains; report progress if pages remain
	and recv_sys->report() asks for a report. */
	ut_a(recv_sys->n_addrs > 0);
	if (ulint n = --recv_sys->n_addrs) {
		if (recv_sys->report(now)) {
			ib::info() << "To recover: " << n << " pages from log";
			service_manager_extend_timeout(
				INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
		}
	}
}
2163 
2164 /** Reduces recv_sys->n_addrs for the corrupted page.
2165 This function should called when srv_force_recovery > 0.
2166 @param[in]	page_id	page id of the corrupted page */
recv_recover_corrupt_page(page_id_t page_id)2167 void recv_recover_corrupt_page(page_id_t page_id)
2168 {
2169 	ut_ad(srv_force_recovery);
2170 	mutex_enter(&recv_sys->mutex);
2171 
2172 	if (!recv_sys->apply_log_recs) {
2173 	} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2174 			   page_id.space(), page_id.page_no())) {
2175 		switch (recv_addr->state) {
2176 		case RECV_WILL_NOT_READ:
2177 			ut_ad(!"wrong state");
2178 			break;
2179 		case RECV_BEING_PROCESSED:
2180 		case RECV_PROCESSED:
2181 			break;
2182 		default:
2183 			recv_addr->state = RECV_PROCESSED;
2184 			ut_ad(recv_sys->n_addrs);
2185 			recv_sys->n_addrs--;
2186 		}
2187 	}
2188 
2189 	mutex_exit(&recv_sys->mutex);
2190 }
2191 
2192 /** Apply any buffered redo log to a page that was just read from a data file.
2193 @param[in,out]	bpage	buffer pool page */
recv_recover_page(buf_page_t * bpage)2194 void recv_recover_page(buf_page_t* bpage)
2195 {
2196 	mtr_t mtr;
2197 	mtr.start();
2198 	mtr.set_log_mode(MTR_LOG_NONE);
2199 
2200 	ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
2201 	buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
2202 
2203 	/* Move the ownership of the x-latch on the page to
2204 	this OS thread, so that we can acquire a second
2205 	x-latch on it.  This is needed for the operations to
2206 	the page to pass the debug checks. */
2207 	rw_lock_x_lock_move_ownership(&block->lock);
2208 	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2209 	ibool	success = buf_page_get_known_nowait(
2210 		RW_X_LATCH, block, BUF_KEEP_OLD,
2211 		__FILE__, __LINE__, &mtr);
2212 	ut_a(success);
2213 
2214 	mutex_enter(&recv_sys->mutex);
2215 	if (!recv_sys->apply_log_recs) {
2216 	} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2217 			   bpage->id.space(), bpage->id.page_no())) {
2218 		switch (recv_addr->state) {
2219 		case RECV_BEING_PROCESSED:
2220 		case RECV_PROCESSED:
2221 			break;
2222 		default:
2223 			recv_recover_page(block, mtr, recv_addr);
2224 			goto func_exit;
2225 		}
2226 	}
2227 
2228 	mtr.commit();
2229 func_exit:
2230 	mutex_exit(&recv_sys->mutex);
2231 	ut_ad(mtr.has_committed());
2232 }
2233 
2234 /** Reads in pages which have hashed log records, from an area around a given
2235 page number.
2236 @param[in]	page_id	page id */
recv_read_in_area(const page_id_t page_id)2237 static void recv_read_in_area(const page_id_t page_id)
2238 {
2239 	ulint	page_nos[RECV_READ_AHEAD_AREA];
2240 	ulint	page_no = page_id.page_no()
2241 		- (page_id.page_no() % RECV_READ_AHEAD_AREA);
2242 	ulint*	p = page_nos;
2243 
2244 	for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
2245 	     page_no < up_limit; page_no++) {
2246 		recv_addr_t* recv_addr = recv_get_fil_addr_struct(
2247 			page_id.space(), page_no);
2248 		if (recv_addr
2249 		    && recv_addr->state == RECV_NOT_PROCESSED
2250 		    && !buf_page_peek(page_id_t(page_id.space(), page_no))) {
2251 			recv_addr->state = RECV_BEING_READ;
2252 			*p++ = page_no;
2253 		}
2254 	}
2255 
2256 	mutex_exit(&recv_sys->mutex);
2257 	buf_read_recv_pages(FALSE, page_id.space(), page_nos,
2258 			    ulint(p - page_nos));
2259 	mutex_enter(&recv_sys->mutex);
2260 }
2261 
2262 /** This is another low level function for the recovery system
2263 to create a page which has buffered page intialization redo log records.
2264 @param[in]	page_id		page to be created using redo logs
2265 @param[in,out]	recv_addr	Hashed redo logs for the given page id
2266 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id,recv_addr_t * recv_addr)2267 static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id,
2268                                                   recv_addr_t* recv_addr)
2269 {
2270   mtr_t mtr;
2271   mlog_init_t::init &i= mlog_init.last(page_id);
2272   const lsn_t end_lsn= UT_LIST_GET_LAST(recv_addr->rec_list)->end_lsn;
2273 
2274   if (end_lsn < i.lsn)
2275   {
2276     DBUG_LOG("ib_log", "skip log for page "
2277              << page_id
2278              << " LSN " << end_lsn
2279              << " < " << i.lsn);
2280     recv_addr->state= RECV_PROCESSED;
2281 ignore:
2282     ut_a(recv_sys->n_addrs);
2283     recv_sys->n_addrs--;
2284     return NULL;
2285   }
2286 
2287   fil_space_t *space= fil_space_acquire(recv_addr->space);
2288   if (!space)
2289   {
2290     recv_addr->state= RECV_PROCESSED;
2291     goto ignore;
2292   }
2293 
2294   if (space->enable_lsn)
2295   {
2296 init_fail:
2297     space->release();
2298     recv_addr->state= RECV_NOT_PROCESSED;
2299     return NULL;
2300   }
2301 
2302   /* Determine if a tablespace could be for an internal table
2303   for FULLTEXT INDEX. For those tables, no MLOG_INDEX_LOAD record
2304   used to be written when redo logging was disabled. Hence, we
2305   cannot optimize away page reads, because all the redo
2306   log records for initializing and modifying the page in the
2307   past could be older than the page in the data file.
2308 
2309   The check is too broad, causing all
2310   tables whose names start with FTS_ to skip the optimization. */
2311 
2312   if (strstr(space->name, "/FTS_"))
2313     goto init_fail;
2314 
2315   mtr.start();
2316   mtr.set_log_mode(MTR_LOG_NONE);
2317   buf_block_t *block= buf_page_create(page_id, page_size_t(space->flags),
2318                                        &mtr);
2319   if (recv_addr->state == RECV_PROCESSED)
2320     /* The page happened to exist in the buffer pool, or it was
2321     just being read in. Before buf_page_get_with_no_latch() returned,
2322     all changes must have been applied to the page already. */
2323     mtr.commit();
2324   else
2325   {
2326     i.created= true;
2327     buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2328     recv_recover_page(block, mtr, recv_addr, i.lsn);
2329     ut_ad(mtr.has_committed());
2330   }
2331 
2332   space->release();
2333   return block;
2334 }
2335 
2336 /** This is a low level function for the recovery system
2337 to create a page which has buffered intialized redo log records.
2338 @param[in]      page_id page to be created using redo logs
2339 @return whether the page creation successfully */
recv_recovery_create_page_low(const page_id_t page_id)2340 buf_block_t* recv_recovery_create_page_low(const page_id_t page_id)
2341 {
2342   buf_block_t* block= NULL;
2343   mutex_enter(&recv_sys->mutex);
2344   recv_addr_t* recv_addr= recv_get_fil_addr_struct(page_id.space(),
2345                                                    page_id.page_no());
2346   if (recv_addr && recv_addr->state == RECV_WILL_NOT_READ)
2347   {
2348     block= recv_recovery_create_page_low(page_id, recv_addr);
2349   }
2350   mutex_exit(&recv_sys->mutex);
2351   return block;
2352 }
2353 
2354 /** Apply the hash table of stored log records to persistent data pages.
2355 @param[in]	last_batch	whether the change buffer merge will be
2356 				performed as part of the operation */
recv_apply_hashed_log_recs(bool last_batch)2357 void recv_apply_hashed_log_recs(bool last_batch)
2358 {
2359 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
2360 	      || is_mariabackup_restore_or_export());
2361 
2362 	mutex_enter(&recv_sys->mutex);
2363 
2364 	while (recv_sys->apply_batch_on) {
2365 		bool abort = recv_sys->found_corrupt_log;
2366 		mutex_exit(&recv_sys->mutex);
2367 
2368 		if (abort) {
2369 			return;
2370 		}
2371 
2372 		os_thread_sleep(500000);
2373 		mutex_enter(&recv_sys->mutex);
2374 	}
2375 
2376 	ut_ad(!last_batch == log_mutex_own());
2377 
2378 	recv_no_ibuf_operations
2379 		= !last_batch || is_mariabackup_restore_or_export();
2380 
2381 	if (ulint n = recv_sys->n_addrs) {
2382 		const char* msg = last_batch
2383 			? "Starting final batch to recover "
2384 			: "Starting a batch to recover ";
2385 		ib::info() << msg << n << " pages from redo log.";
2386 		sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
2387 			   msg, n);
2388 	}
2389 	recv_sys->apply_log_recs = TRUE;
2390 	recv_sys->apply_batch_on = TRUE;
2391 
2392 	for (ulint id = srv_undo_tablespaces_open; id--; ) {
2393 		recv_sys_t::trunc& t = recv_sys->truncated_undo_spaces[id];
2394 		if (t.lsn) {
2395 			recv_addr_trim(id + srv_undo_space_id_start, t.pages,
2396 				       t.lsn);
2397 		}
2398 	}
2399 
2400 	mtr_t mtr;
2401 
2402 	for (ulint i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {
2403 		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
2404 			     HASH_GET_FIRST(recv_sys->addr_hash, i));
2405 		     recv_addr;
2406 		     recv_addr = static_cast<recv_addr_t*>(
2407 				HASH_GET_NEXT(addr_hash, recv_addr))) {
2408 			if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
2409 ignore:
2410 				ut_a(recv_sys->n_addrs);
2411 				recv_sys->n_addrs--;
2412 				continue;
2413 			}
2414 
2415 			switch (recv_addr->state) {
2416 			case RECV_BEING_READ:
2417 			case RECV_BEING_PROCESSED:
2418 			case RECV_PROCESSED:
2419 				continue;
2420 			case RECV_DISCARDED:
2421 				goto ignore;
2422 			case RECV_NOT_PROCESSED:
2423 			case RECV_WILL_NOT_READ:
2424 				break;
2425 			}
2426 
2427 			if (srv_is_tablespace_truncated(recv_addr->space)) {
2428 				/* Avoid applying REDO log for the tablespace
2429 				that is schedule for TRUNCATE. */
2430 				recv_addr->state = RECV_DISCARDED;
2431 				goto ignore;
2432 			}
2433 
2434 			const page_id_t page_id(recv_addr->space,
2435 						recv_addr->page_no);
2436 
2437 			if (recv_addr->state == RECV_NOT_PROCESSED) {
2438 apply:
2439 				mtr.start();
2440 				mtr.set_log_mode(MTR_LOG_NONE);
2441 				if (buf_block_t* block = buf_page_get_low(
2442 					    page_id, univ_page_size,
2443 					    RW_X_LATCH, NULL,
2444 					    BUF_GET_IF_IN_POOL,
2445 					    __FILE__, __LINE__, &mtr, NULL)) {
2446 					buf_block_dbg_add_level(
2447 						block, SYNC_NO_ORDER_CHECK);
2448 					recv_recover_page(block, mtr,
2449 							  recv_addr);
2450 					ut_ad(mtr.has_committed());
2451 				} else {
2452 					mtr.commit();
2453 					recv_read_in_area(page_id);
2454 				}
2455 			} else if (!recv_recovery_create_page_low(
2456 					page_id, recv_addr)) {
2457 				goto apply;
2458 			}
2459 		}
2460 	}
2461 
2462 	/* Wait until all the pages have been processed */
2463 
2464 	while (recv_sys->n_addrs || buf_get_n_pending_read_ios()) {
2465 		const bool abort = recv_sys->found_corrupt_log
2466 			|| recv_sys->found_corrupt_fs;
2467 
2468 		if (recv_sys->found_corrupt_fs && !srv_force_recovery) {
2469 			ib::info() << "Set innodb_force_recovery=1"
2470 				" to ignore corrupted pages.";
2471 		}
2472 
2473 		mutex_exit(&(recv_sys->mutex));
2474 
2475 		if (abort) {
2476 			return;
2477 		}
2478 
2479 		os_thread_sleep(500000);
2480 
2481 		mutex_enter(&(recv_sys->mutex));
2482 	}
2483 
2484 	if (!last_batch) {
2485 		/* Flush all the file pages to disk and invalidate them in
2486 		the buffer pool */
2487 
2488 		mutex_exit(&(recv_sys->mutex));
2489 		log_mutex_exit();
2490 
2491 		/* Stop the recv_writer thread from issuing any LRU
2492 		flush batches. */
2493 		mutex_enter(&recv_sys->writer_mutex);
2494 
2495 		/* Wait for any currently run batch to end. */
2496 		buf_flush_wait_LRU_batch_end();
2497 
2498 		os_event_reset(recv_sys->flush_end);
2499 		recv_sys->flush_type = BUF_FLUSH_LIST;
2500 		os_event_set(recv_sys->flush_start);
2501 		os_event_wait(recv_sys->flush_end);
2502 
2503 		buf_pool_invalidate();
2504 
2505 		/* Allow batches from recv_writer thread. */
2506 		mutex_exit(&recv_sys->writer_mutex);
2507 
2508 		log_mutex_enter();
2509 		mutex_enter(&(recv_sys->mutex));
2510 		mlog_init.reset();
2511 	} else if (!recv_no_ibuf_operations) {
2512 		/* We skipped this in buf_page_create(). */
2513 		mlog_init.ibuf_merge(mtr);
2514 	}
2515 
2516 	recv_sys->apply_log_recs = FALSE;
2517 	recv_sys->apply_batch_on = FALSE;
2518 
2519 	recv_sys_empty_hash();
2520 
2521 	mutex_exit(&recv_sys->mutex);
2522 }
2523 
2524 /** Tries to parse a single log record.
2525 @param[out]	type		log record type
2526 @param[in]	ptr		pointer to a buffer
2527 @param[in]	end_ptr		end of the buffer
2528 @param[out]	space_id	tablespace identifier
2529 @param[out]	page_no		page number
2530 @param[in]	apply		whether to apply MLOG_FILE_* records
2531 @param[out]	body		start of log record body
2532 @return length of the record, or 0 if the record was not complete */
2533 static
2534 ulint
recv_parse_log_rec(mlog_id_t * type,byte * ptr,byte * end_ptr,ulint * space,ulint * page_no,bool apply,byte ** body)2535 recv_parse_log_rec(
2536 	mlog_id_t*	type,
2537 	byte*		ptr,
2538 	byte*		end_ptr,
2539 	ulint*		space,
2540 	ulint*		page_no,
2541 	bool		apply,
2542 	byte**		body)
2543 {
2544 	byte*	new_ptr;
2545 
2546 	*body = NULL;
2547 
2548 	MEM_UNDEFINED(type, sizeof *type);
2549 	MEM_UNDEFINED(space, sizeof *space);
2550 	MEM_UNDEFINED(page_no, sizeof *page_no);
2551 	MEM_UNDEFINED(body, sizeof *body);
2552 
2553 	if (ptr == end_ptr) {
2554 
2555 		return(0);
2556 	}
2557 
2558 	switch (*ptr) {
2559 #ifdef UNIV_LOG_LSN_DEBUG
2560 	case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
2561 	case MLOG_LSN:
2562 		new_ptr = mlog_parse_initial_log_record(
2563 			ptr, end_ptr, type, space, page_no);
2564 		if (new_ptr != NULL) {
2565 			const lsn_t	lsn = static_cast<lsn_t>(
2566 				*space) << 32 | *page_no;
2567 			ut_a(lsn == recv_sys->recovered_lsn);
2568 		}
2569 
2570 		*type = MLOG_LSN;
2571 		return(new_ptr - ptr);
2572 #endif /* UNIV_LOG_LSN_DEBUG */
2573 	case MLOG_MULTI_REC_END:
2574 	case MLOG_DUMMY_RECORD:
2575 		*type = static_cast<mlog_id_t>(*ptr);
2576 		return(1);
2577 	case MLOG_CHECKPOINT:
2578 		if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
2579 			return(0);
2580 		}
2581 		*type = static_cast<mlog_id_t>(*ptr);
2582 		return(SIZE_OF_MLOG_CHECKPOINT);
2583 	case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
2584 	case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
2585 	case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
2586 		ib::error() << "Incorrect log record type "
2587 			<< ib::hex(unsigned(*ptr));
2588 		recv_sys->found_corrupt_log = true;
2589 		return(0);
2590 	}
2591 
2592 	new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
2593 						page_no);
2594 	*body = new_ptr;
2595 
2596 	if (UNIV_UNLIKELY(!new_ptr)) {
2597 
2598 		return(0);
2599 	}
2600 
2601 	const byte*	old_ptr = new_ptr;
2602 	new_ptr = recv_parse_or_apply_log_rec_body(
2603 		*type, new_ptr, end_ptr, *space, *page_no, apply, NULL, NULL);
2604 
2605 	if (UNIV_UNLIKELY(new_ptr == NULL)) {
2606 		return(0);
2607 	}
2608 
2609 	if (*page_no == 0 && *type == MLOG_4BYTES
2610 	    && apply
2611 	    && mach_read_from_2(old_ptr) == FSP_HEADER_OFFSET + FSP_SIZE) {
2612 		old_ptr += 2;
2613 
2614 		ulint size = mach_parse_compressed(&old_ptr, end_ptr);
2615 
2616 		recv_spaces_t::iterator it = recv_spaces.find(*space);
2617 
2618 		ut_ad(!recv_sys->mlog_checkpoint_lsn
2619 		      || *space == TRX_SYS_SPACE
2620 		      || srv_is_undo_tablespace(*space)
2621 		      || it != recv_spaces.end());
2622 
2623 		if (it != recv_spaces.end() && !it->second.space) {
2624 			it->second.size = size;
2625 		}
2626 
2627 		fil_space_set_recv_size(*space, size);
2628 	}
2629 
2630 	return ulint(new_ptr - ptr);
2631 }
2632 
2633 /*******************************************************//**
2634 Calculates the new value for lsn when more data is added to the log. */
2635 static
2636 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)2637 recv_calc_lsn_on_data_add(
2638 /*======================*/
2639 	lsn_t		lsn,	/*!< in: old lsn */
2640 	ib_uint64_t	len)	/*!< in: this many bytes of data is
2641 				added, log block headers not included */
2642 {
2643 	ulint		frag_len;
2644 	ib_uint64_t	lsn_len;
2645 
2646 	frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
2647 	ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2648 	      - LOG_BLOCK_TRL_SIZE);
2649 	lsn_len = len;
2650 	lsn_len += (lsn_len + frag_len)
2651 		/ (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2652 		   - LOG_BLOCK_TRL_SIZE)
2653 		* (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
2654 
2655 	return(lsn + lsn_len);
2656 }
2657 
2658 /** Prints diagnostic info of corrupt log.
2659 @param[in]	ptr	pointer to corrupt log record
2660 @param[in]	type	type of the log record (could be garbage)
2661 @param[in]	space	tablespace ID (could be garbage)
2662 @param[in]	page_no	page number (could be garbage)
2663 @return whether processing should continue */
2664 ATTRIBUTE_COLD
2665 static
2666 bool
recv_report_corrupt_log(const byte * ptr,int type,ulint space,ulint page_no)2667 recv_report_corrupt_log(
2668 	const byte*	ptr,
2669 	int		type,
2670 	ulint		space,
2671 	ulint		page_no)
2672 {
2673 	ib::error() <<
2674 		"############### CORRUPT LOG RECORD FOUND ##################";
2675 
2676 	const ulint ptr_offset = ulint(ptr - recv_sys->buf);
2677 
2678 	ib::info() << "Log record type " << type << ", page " << space << ":"
2679 		<< page_no << ". Log parsing proceeded successfully up to "
2680 		<< recv_sys->recovered_lsn << ". Previous log record type "
2681 		<< recv_previous_parsed_rec_type
2682 		<< ", is multi "
2683 		<< recv_previous_parsed_rec_is_multi << " Recv offset "
2684 		<< ptr_offset << ", prev "
2685 		<< recv_previous_parsed_rec_offset;
2686 
2687 	ut_ad(ptr <= recv_sys->buf + recv_sys->len);
2688 
2689 	const ulint	limit	= 100;
2690 	const ulint	prev_offset = std::min(recv_previous_parsed_rec_offset,
2691 					       ptr_offset);
2692 	const ulint	before = std::min(prev_offset, limit);
2693 	const ulint	after = std::min(recv_sys->len - ptr_offset, limit);
2694 
2695 	ib::info() << "Hex dump starting " << before << " bytes before and"
2696 		" ending " << after << " bytes after the corrupted record:";
2697 
2698 	const byte* start = recv_sys->buf + prev_offset - before;
2699 
2700 	ut_print_buf(stderr, start, ulint(ptr - start) + after);
2701 	putc('\n', stderr);
2702 
2703 	if (!srv_force_recovery) {
2704 		ib::info() << "Set innodb_force_recovery to ignore this error.";
2705 		return(false);
2706 	}
2707 
2708 	ib::warn() << "The log file may have been corrupt and it is possible"
2709 		" that the log scan did not proceed far enough in recovery!"
2710 		" Please run CHECK TABLE on your InnoDB tables to check"
2711 		" that they are ok! If mysqld crashes after this recovery; "
2712 		<< FORCE_RECOVERY_MSG;
2713 	return(true);
2714 }
2715 
2716 /** Report a MLOG_INDEX_LOAD operation.
2717 @param[in]	space_id	tablespace id
2718 @param[in]	page_no		page number
2719 @param[in]	lsn		log sequence number */
2720 ATTRIBUTE_COLD static void
recv_mlog_index_load(ulint space_id,ulint page_no,lsn_t lsn)2721 recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
2722 {
2723 	recv_spaces_t::iterator it = recv_spaces.find(space_id);
2724 	if (it != recv_spaces.end()) {
2725 		it->second.mlog_index_load(lsn);
2726 	}
2727 
2728 	if (log_optimized_ddl_op) {
2729 		log_optimized_ddl_op(space_id);
2730 	}
2731 }
2732 
2733 /** Check whether read redo log memory exceeds the available memory
2734 of buffer pool. Store last_stored_lsn if it is not in last phase
2735 @param[in]	store		whether to store page operations
2736 @param[in]	available_mem	Available memory in buffer pool to
2737 				read redo logs. */
recv_sys_heap_check(store_t * store,ulint available_mem)2738 static bool recv_sys_heap_check(store_t* store, ulint available_mem)
2739 {
2740   if (*store != STORE_NO
2741       && mem_heap_get_size(recv_sys->heap) >= available_mem)
2742   {
2743     if (*store == STORE_YES)
2744       recv_sys->last_stored_lsn= recv_sys->recovered_lsn;
2745 
2746     *store= STORE_NO;
2747     DBUG_PRINT("ib_log",("Ran out of memory and last "
2748 			 "stored lsn " LSN_PF " last stored offset "
2749 			 ULINTPF "\n",recv_sys->recovered_lsn,
2750 			 recv_sys->recovered_offset));
2751     return true;
2752   }
2753 
2754   return false;
2755 }
2756 
2757 /** Parse log records from a buffer and optionally store them to a
2758 hash table to wait merging to file pages.
2759 @param[in]	checkpoint_lsn		the LSN of the latest checkpoint
2760 @param[in]	store			whether to store page operations
2761 @param[in]	available_mem		memory to read the redo logs
2762 @param[in]	apply			whether to apply the records
2763 @return whether MLOG_CHECKPOINT record was seen the first time,
2764 or corruption was noticed */
recv_parse_log_recs(lsn_t checkpoint_lsn,store_t * store,ulint available_mem,bool apply)2765 bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
2766 			 ulint available_mem, bool apply)
2767 {
2768 	byte*		ptr;
2769 	byte*		end_ptr;
2770 	bool		single_rec;
2771 	ulint		len;
2772 	lsn_t		new_recovered_lsn;
2773 	lsn_t		old_lsn;
2774 	mlog_id_t	type;
2775 	ulint		space;
2776 	ulint		page_no;
2777 	byte*		body;
2778 	const bool	last_phase = (*store == STORE_IF_EXISTS);
2779 
2780 	ut_ad(log_mutex_own());
2781 	ut_ad(mutex_own(&recv_sys->mutex));
2782 	ut_ad(recv_sys->parse_start_lsn != 0);
2783 loop:
2784 	ptr = recv_sys->buf + recv_sys->recovered_offset;
2785 
2786 	end_ptr = recv_sys->buf + recv_sys->len;
2787 
2788 	if (ptr == end_ptr) {
2789 
2790 		return(false);
2791 	}
2792 
2793 	/* Check for memory overflow and ignore the parsing of remaining
2794 	redo log records if InnoDB ran out of memory */
2795 	if (recv_sys_heap_check(store, available_mem) && last_phase) {
2796 		return false;
2797 	}
2798 
2799 	switch (*ptr) {
2800 	case MLOG_CHECKPOINT:
2801 #ifdef UNIV_LOG_LSN_DEBUG
2802 	case MLOG_LSN:
2803 #endif /* UNIV_LOG_LSN_DEBUG */
2804 	case MLOG_DUMMY_RECORD:
2805 		single_rec = true;
2806 		break;
2807 	default:
2808 		single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
2809 	}
2810 
2811 	if (single_rec) {
2812 		/* The mtr did not modify multiple pages */
2813 
2814 		old_lsn = recv_sys->recovered_lsn;
2815 
2816 		/* Try to parse a log record, fetching its type, space id,
2817 		page no, and a pointer to the body of the log record */
2818 
2819 		len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
2820 					 &page_no, apply, &body);
2821 
2822 		if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)) {
2823 			recv_report_corrupt_log(ptr, type, space, page_no);
2824 			return(true);
2825 		}
2826 
2827 		if (UNIV_UNLIKELY(recv_sys->found_corrupt_fs)) {
2828 			return(true);
2829 		}
2830 
2831 		if (len == 0) {
2832 			return(false);
2833 		}
2834 
2835 		new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
2836 
2837 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
2838 			/* The log record filled a log block, and we require
2839 			that also the next log block should have been scanned
2840 			in */
2841 
2842 			return(false);
2843 		}
2844 
2845 		recv_previous_parsed_rec_type = type;
2846 		recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
2847 		recv_previous_parsed_rec_is_multi = 0;
2848 
2849 		recv_sys->recovered_offset += len;
2850 		recv_sys->recovered_lsn = new_recovered_lsn;
2851 
2852 		switch (type) {
2853 			lsn_t	lsn;
2854 		case MLOG_DUMMY_RECORD:
2855 			/* Do nothing */
2856 			break;
2857 		case MLOG_CHECKPOINT:
2858 			compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
2859 			lsn = mach_read_from_8(ptr + 1);
2860 
2861 			if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
2862 				fprintf(stderr,
2863 					"MLOG_CHECKPOINT(" LSN_PF ") %s at "
2864 					LSN_PF "\n", lsn,
2865 					lsn != checkpoint_lsn ? "ignored"
2866 					: recv_sys->mlog_checkpoint_lsn
2867 					? "reread" : "read",
2868 					recv_sys->recovered_lsn);
2869 			}
2870 
2871 			DBUG_PRINT("ib_log",
2872 				   ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
2873 				    LSN_PF,
2874 				    lsn,
2875 				    lsn != checkpoint_lsn ? "ignored"
2876 				    : recv_sys->mlog_checkpoint_lsn
2877 				    ? "reread" : "read",
2878 				    recv_sys->recovered_lsn));
2879 
2880 			if (lsn == checkpoint_lsn) {
2881 				if (recv_sys->mlog_checkpoint_lsn) {
2882 					/* There can be multiple
2883 					MLOG_CHECKPOINT lsn for the
2884 					same checkpoint. */
2885 					break;
2886 				}
2887 				recv_sys->mlog_checkpoint_lsn
2888 					= recv_sys->recovered_lsn;
2889 				return(true);
2890 			}
2891 			break;
2892 #ifdef UNIV_LOG_LSN_DEBUG
2893 		case MLOG_LSN:
2894 			/* Do not add these records to the hash table.
2895 			The page number and space id fields are misused
2896 			for something else. */
2897 			break;
2898 #endif /* UNIV_LOG_LSN_DEBUG */
2899 		default:
2900 			switch (*store) {
2901 			case STORE_NO:
2902 				break;
2903 			case STORE_IF_EXISTS:
2904 				if (fil_space_get_flags(space)
2905 				    == ULINT_UNDEFINED) {
2906 					break;
2907 				}
2908 				/* fall through */
2909 			case STORE_YES:
2910 				recv_add_to_hash_table(
2911 					type, space, page_no, body,
2912 					ptr + len, old_lsn,
2913 					recv_sys->recovered_lsn);
2914 			}
2915 			/* fall through */
2916 		case MLOG_INDEX_LOAD:
2917 			if (type == MLOG_INDEX_LOAD) {
2918 				recv_mlog_index_load(space, page_no, old_lsn);
2919 			}
2920 			/* fall through */
2921 		case MLOG_FILE_NAME:
2922 		case MLOG_FILE_DELETE:
2923 		case MLOG_FILE_CREATE2:
2924 		case MLOG_FILE_RENAME2:
2925 		case MLOG_TRUNCATE:
2926 			/* These were already handled by
2927 			recv_parse_log_rec() and
2928 			recv_parse_or_apply_log_rec_body(). */
2929 			DBUG_PRINT("ib_log",
2930 				("scan " LSN_PF ": log rec %s"
2931 				" len " ULINTPF
2932 				" page " ULINTPF ":" ULINTPF,
2933 				old_lsn, get_mlog_string(type),
2934 				len, space, page_no));
2935 		}
2936 	} else {
2937 		/* Check that all the records associated with the single mtr
2938 		are included within the buffer */
2939 
2940 		ulint	total_len	= 0;
2941 		ulint	n_recs		= 0;
2942 		bool	only_mlog_file	= true;
2943 		ulint	mlog_rec_len	= 0;
2944 
2945 		for (;;) {
2946 			len = recv_parse_log_rec(
2947 				&type, ptr, end_ptr, &space, &page_no,
2948 				false, &body);
2949 
2950 			if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)) {
2951 corrupted_log:
2952 				recv_report_corrupt_log(
2953 					ptr, type, space, page_no);
2954 				return(true);
2955 			}
2956 
2957 			if (ptr == end_ptr) {
2958 			} else if (type == MLOG_CHECKPOINT
2959 				   || (*ptr & MLOG_SINGLE_REC_FLAG)) {
2960 				recv_sys->found_corrupt_log = true;
2961 				goto corrupted_log;
2962 			}
2963 
2964 			if (recv_sys->found_corrupt_fs) {
2965 				return(true);
2966 			}
2967 
2968 			if (len == 0) {
2969 				return(false);
2970 			}
2971 
2972 			recv_previous_parsed_rec_type = type;
2973 			recv_previous_parsed_rec_offset
2974 				= recv_sys->recovered_offset + total_len;
2975 			recv_previous_parsed_rec_is_multi = 1;
2976 
			/* MLOG_FILE_NAME redo log records do not make changes
			to persistent data. As long as only MLOG_FILE_NAME
			records have been seen, advance the parsing buffer
			position by updating recovered_lsn and
			recovered_offset. */
2981 			if (type != MLOG_FILE_NAME && only_mlog_file == true) {
2982 				only_mlog_file = false;
2983 			}
2984 
2985 			if (only_mlog_file) {
2986 				new_recovered_lsn = recv_calc_lsn_on_data_add(
2987 					recv_sys->recovered_lsn, len);
2988 				mlog_rec_len += len;
2989 				recv_sys->recovered_offset += len;
2990 				recv_sys->recovered_lsn = new_recovered_lsn;
2991 			}
2992 
2993 			total_len += len;
2994 			n_recs++;
2995 
2996 			ptr += len;
2997 
2998 			if (type == MLOG_MULTI_REC_END) {
2999 				DBUG_PRINT("ib_log",
3000 					   ("scan " LSN_PF
3001 					    ": multi-log end"
3002 					    " total_len " ULINTPF
3003 					    " n=" ULINTPF,
3004 					    recv_sys->recovered_lsn,
3005 					    total_len, n_recs));
3006 				total_len -= mlog_rec_len;
3007 				break;
3008 			}
3009 
3010 			DBUG_PRINT("ib_log",
3011 				   ("scan " LSN_PF ": multi-log rec %s"
3012 				    " len " ULINTPF
3013 				    " page " ULINTPF ":" ULINTPF,
3014 				    recv_sys->recovered_lsn,
3015 				    get_mlog_string(type), len, space, page_no));
3016 		}
3017 
3018 		new_recovered_lsn = recv_calc_lsn_on_data_add(
3019 			recv_sys->recovered_lsn, total_len);
3020 
3021 		if (new_recovered_lsn > recv_sys->scanned_lsn) {
3022 			/* The log record filled a log block, and we require
3023 			that also the next log block should have been scanned
3024 			in */
3025 
3026 			return(false);
3027 		}
3028 
3029 		/* Add all the records to the hash table */
3030 
3031 		ptr = recv_sys->buf + recv_sys->recovered_offset;
3032 
3033 		for (;;) {
3034 			old_lsn = recv_sys->recovered_lsn;
3035 			/* This will apply MLOG_FILE_ records. We
3036 			had to skip them in the first scan, because we
3037 			did not know if the mini-transaction was
3038 			completely recovered (until MLOG_MULTI_REC_END). */
3039 			len = recv_parse_log_rec(
3040 				&type, ptr, end_ptr, &space, &page_no,
3041 				apply, &body);
3042 
3043 			if (UNIV_UNLIKELY(recv_sys->found_corrupt_log)
3044 			    && !recv_report_corrupt_log(
3045 				    ptr, type, space, page_no)) {
3046 				return(true);
3047 			}
3048 
3049 			if (UNIV_UNLIKELY(recv_sys->found_corrupt_fs)) {
3050 				return(true);
3051 			}
3052 
3053 			ut_a(len != 0);
3054 			ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));
3055 
3056 			recv_sys->recovered_offset += len;
3057 			recv_sys->recovered_lsn
3058 				= recv_calc_lsn_on_data_add(old_lsn, len);
3059 
3060 			switch (type) {
3061 			case MLOG_MULTI_REC_END:
3062 				/* Found the end mark for the records */
3063 				goto loop;
3064 #ifdef UNIV_LOG_LSN_DEBUG
3065 			case MLOG_LSN:
3066 				/* Do not add these records to the hash table.
3067 				The page number and space id fields are misused
3068 				for something else. */
3069 				break;
3070 #endif /* UNIV_LOG_LSN_DEBUG */
3071 			case MLOG_INDEX_LOAD:
3072 				recv_mlog_index_load(space, page_no, old_lsn);
3073 				break;
3074 			case MLOG_FILE_NAME:
3075 			case MLOG_FILE_DELETE:
3076 			case MLOG_FILE_CREATE2:
3077 			case MLOG_FILE_RENAME2:
3078 			case MLOG_TRUNCATE:
3079 				/* These were already handled by
3080 				recv_parse_log_rec() and
3081 				recv_parse_or_apply_log_rec_body(). */
3082 				break;
3083 			default:
3084 				switch (*store) {
3085 				case STORE_NO:
3086 					break;
3087 				case STORE_IF_EXISTS:
3088 					if (fil_space_get_flags(space)
3089 					    == ULINT_UNDEFINED) {
3090 						break;
3091 					}
3092 					/* fall through */
3093 				case STORE_YES:
3094 					recv_add_to_hash_table(
3095 						type, space, page_no,
3096 						body, ptr + len,
3097 						old_lsn,
3098 						new_recovered_lsn);
3099 				}
3100 			}
3101 
3102 			ptr += len;
3103 		}
3104 	}
3105 
3106 	goto loop;
3107 }
3108 
3109 /** Adds data from a new log block to the parsing buffer of recv_sys if
3110 recv_sys->parse_start_lsn is non-zero.
3111 @param[in]	log_block	log block to add
3112 @param[in]	scanned_lsn	lsn of how far we were able to find
3113 				data in this log block
3114 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)3115 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
3116 {
3117 	ulint	more_len;
3118 	ulint	data_len;
3119 	ulint	start_offset;
3120 	ulint	end_offset;
3121 
3122 	ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
3123 
3124 	if (!recv_sys->parse_start_lsn) {
3125 		/* Cannot start parsing yet because no start point for
3126 		it found */
3127 		return(false);
3128 	}
3129 
3130 	data_len = log_block_get_data_len(log_block);
3131 
3132 	if (recv_sys->parse_start_lsn >= scanned_lsn) {
3133 
3134 		return(false);
3135 
3136 	} else if (recv_sys->scanned_lsn >= scanned_lsn) {
3137 
3138 		return(false);
3139 
3140 	} else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
3141 		more_len = (ulint) (scanned_lsn - recv_sys->parse_start_lsn);
3142 	} else {
3143 		more_len = (ulint) (scanned_lsn - recv_sys->scanned_lsn);
3144 	}
3145 
3146 	if (more_len == 0) {
3147 		return(false);
3148 	}
3149 
3150 	ut_ad(data_len >= more_len);
3151 
3152 	start_offset = data_len - more_len;
3153 
3154 	if (start_offset < LOG_BLOCK_HDR_SIZE) {
3155 		start_offset = LOG_BLOCK_HDR_SIZE;
3156 	}
3157 
3158 	end_offset = data_len;
3159 
3160 	if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
3161 		end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
3162 	}
3163 
3164 	ut_ad(start_offset <= end_offset);
3165 
3166 	if (start_offset < end_offset) {
3167 		ut_memcpy(recv_sys->buf + recv_sys->len,
3168 			  log_block + start_offset, end_offset - start_offset);
3169 
3170 		recv_sys->len += end_offset - start_offset;
3171 
3172 		ut_a(recv_sys->len <= RECV_PARSING_BUF_SIZE);
3173 	}
3174 
3175 	return(true);
3176 }
3177 
3178 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()3179 void recv_sys_justify_left_parsing_buf()
3180 {
3181 	memmove(recv_sys->buf,
3182 		recv_sys->buf + recv_sys->recovered_offset,
3183 		recv_sys->len - recv_sys->recovered_offset);
3184 
3185 	recv_sys->len -= recv_sys->recovered_offset;
3186 
3187 	recv_sys->recovered_offset = 0;
3188 }
3189 
3190 /** Scan redo log from a buffer and stores new log data to the parsing buffer.
3191 Parse and hash the log records if new data found.
3192 Apply log records automatically when the hash table becomes full.
3193 @param[in]	available_mem		we let the hash table of recs to
3194 					grow to this size, at the maximum
3195 @param[in,out]	store_to_hash		whether the records should be
3196 					stored to the hash table; this is
3197 					reset if just debug checking is
3198 					needed, or when the available_mem
3199 					runs out
3200 @param[in]	log_block		log segment
3201 @param[in]	checkpoint_lsn		latest checkpoint LSN
3202 @param[in]	start_lsn		buffer start LSN
3203 @param[in]	end_lsn			buffer end LSN
3204 @param[in,out]	contiguous_lsn		it is known that all groups contain
3205 					contiguous log data upto this lsn
3206 @param[out]	group_scanned_lsn	scanning succeeded upto this lsn
3207 @return true if not able to scan any more in this log group */
recv_scan_log_recs(ulint available_mem,store_t * store_to_hash,const byte * log_block,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t end_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)3208 static bool recv_scan_log_recs(
3209 	ulint		available_mem,
3210 	store_t*	store_to_hash,
3211 	const byte*	log_block,
3212 	lsn_t		checkpoint_lsn,
3213 	lsn_t		start_lsn,
3214 	lsn_t		end_lsn,
3215 	lsn_t*		contiguous_lsn,
3216 	lsn_t*		group_scanned_lsn)
3217 {
3218 	lsn_t		scanned_lsn	= start_lsn;
3219 	bool		finished	= false;
3220 	ulint		data_len;
3221 	bool		more_data	= false;
3222 	bool		apply		= recv_sys->mlog_checkpoint_lsn != 0;
3223 	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
3224 	const bool	last_phase = (*store_to_hash == STORE_IF_EXISTS);
3225 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3226 	ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
3227 	ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
3228 
3229 	const byte* const	log_end = log_block
3230 		+ ulint(end_lsn - start_lsn);
3231 	do {
3232 		ut_ad(!finished);
3233 
3234 		if (log_block_get_flush_bit(log_block)) {
3235 			/* This block was a start of a log flush operation:
3236 			we know that the previous flush operation must have
3237 			been completed for all log groups before this block
3238 			can have been flushed to any of the groups. Therefore,
3239 			we know that log data is contiguous up to scanned_lsn
3240 			in all non-corrupt log groups. */
3241 
3242 			if (scanned_lsn > *contiguous_lsn) {
3243 				*contiguous_lsn = scanned_lsn;
3244 			}
3245 		}
3246 
3247 		data_len = log_block_get_data_len(log_block);
3248 
3249 		if (scanned_lsn + data_len > recv_sys->scanned_lsn
3250 		    && log_block_get_checkpoint_no(log_block)
3251 		    < recv_sys->scanned_checkpoint_no
3252 		    && (recv_sys->scanned_checkpoint_no
3253 			- log_block_get_checkpoint_no(log_block)
3254 			> 0x80000000UL)) {
3255 
3256 			/* Garbage from a log buffer flush which was made
3257 			before the most recent database recovery */
3258 			finished = true;
3259 			break;
3260 		}
3261 
3262 		if (!recv_sys->parse_start_lsn
3263 		    && (log_block_get_first_rec_group(log_block) > 0)) {
3264 
3265 			/* We found a point from which to start the parsing
3266 			of log records */
3267 
3268 			recv_sys->parse_start_lsn = scanned_lsn
3269 				+ log_block_get_first_rec_group(log_block);
3270 			recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
3271 			recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
3272 		}
3273 
3274 		scanned_lsn += data_len;
3275 
3276 		if (data_len == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
3277 		    && scanned_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3278 		    && log_block[LOG_BLOCK_HDR_SIZE] == MLOG_CHECKPOINT
3279 		    && checkpoint_lsn == mach_read_from_8(LOG_BLOCK_HDR_SIZE
3280 							  + 1 + log_block)) {
3281 			/* The redo log is logically empty. */
3282 			ut_ad(recv_sys->mlog_checkpoint_lsn == 0
3283 			      || recv_sys->mlog_checkpoint_lsn
3284 			      == checkpoint_lsn);
3285 			recv_sys->mlog_checkpoint_lsn = checkpoint_lsn;
3286 			DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
3287 					      scanned_lsn));
3288 			finished = true;
3289 			break;
3290 		}
3291 
3292 		if (scanned_lsn > recv_sys->scanned_lsn) {
3293 			ut_ad(!srv_log_files_created);
3294 			if (!recv_needed_recovery) {
3295 				recv_needed_recovery = true;
3296 
3297 				if (srv_read_only_mode) {
3298 					ib::warn() << "innodb_read_only"
3299 						" prevents crash recovery";
3300 					return(true);
3301 				}
3302 
3303 				ib::info() << "Starting crash recovery from"
3304 					" checkpoint LSN="
3305 					<< recv_sys->scanned_lsn;
3306 			}
3307 
3308 			/* We were able to find more log data: add it to the
3309 			parsing buffer if parse_start_lsn is already
3310 			non-zero */
3311 
3312 			DBUG_EXECUTE_IF(
3313 				"reduce_recv_parsing_buf",
3314 				recv_parsing_buf_size
3315 					= (70 * 1024);
3316 				);
3317 
3318 			if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE
3319 			    >= recv_parsing_buf_size) {
3320 				ib::error() << "Log parsing buffer overflow."
3321 					" Recovery may have failed!";
3322 
3323 				recv_sys->found_corrupt_log = true;
3324 
3325 				if (!srv_force_recovery) {
3326 					ib::error()
3327 						<< "Set innodb_force_recovery"
3328 						" to ignore this error.";
3329 					return(true);
3330 				}
3331 			} else if (!recv_sys->found_corrupt_log) {
3332 				more_data = recv_sys_add_to_parsing_buf(
3333 					log_block, scanned_lsn);
3334 			}
3335 
3336 			recv_sys->scanned_lsn = scanned_lsn;
3337 			recv_sys->scanned_checkpoint_no
3338 				= log_block_get_checkpoint_no(log_block);
3339 		}
3340 
3341 		/* During last phase of scanning, there can be redo logs
3342 		left in recv_sys->buf to parse & store it in recv_sys->heap */
3343 		if (last_phase
3344 		    && recv_sys->recovered_lsn < recv_sys->scanned_lsn) {
3345 			more_data = true;
3346 		}
3347 
3348 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3349 			/* Log data for this group ends here */
3350 			finished = true;
3351 			break;
3352 		} else {
3353 			log_block += OS_FILE_LOG_BLOCK_SIZE;
3354 		}
3355 	} while (log_block < log_end);
3356 
3357 	*group_scanned_lsn = scanned_lsn;
3358 
3359 	mutex_enter(&recv_sys->mutex);
3360 
3361 	if (more_data && !recv_sys->found_corrupt_log) {
3362 		/* Try to parse more log records */
3363 
3364 		if (recv_parse_log_recs(checkpoint_lsn,
3365 					store_to_hash, available_mem,
3366 					apply)) {
3367 			ut_ad(recv_sys->found_corrupt_log
3368 			      || recv_sys->found_corrupt_fs
3369 			      || recv_sys->mlog_checkpoint_lsn
3370 			      == recv_sys->recovered_lsn);
3371 			finished = true;
3372 			goto func_exit;
3373 		}
3374 
3375 		recv_sys_heap_check(store_to_hash, available_mem);
3376 
3377 		if (recv_sys->recovered_offset > recv_parsing_buf_size / 4) {
3378 			/* Move parsing buffer data to the buffer start */
3379 			recv_sys_justify_left_parsing_buf();
3380 		}
3381 
3382 		/* Need to re-parse the redo log which're stored
3383 		in recv_sys->buf */
3384 		if (last_phase && *store_to_hash == STORE_NO) {
3385 			finished = false;
3386 		}
3387 	}
3388 
3389 func_exit:
3390 	mutex_exit(&recv_sys->mutex);
3391 	return(finished);
3392 }
3393 
3394 /** Scans log from a buffer and stores new log data to the parsing buffer.
3395 Parses and hashes the log records if new data found.
3396 @param[in]	checkpoint_lsn		latest checkpoint log sequence number
3397 @param[in,out]	contiguous_lsn		log sequence number
3398 until which all redo log has been scanned
3399 @param[in]	last_phase		whether changes
3400 can be applied to the tablespaces
3401 @return whether rescan is needed (not everything was stored) */
3402 static
3403 bool
recv_group_scan_log_recs(lsn_t checkpoint_lsn,lsn_t * contiguous_lsn,bool last_phase)3404 recv_group_scan_log_recs(
3405 	lsn_t		checkpoint_lsn,
3406 	lsn_t*		contiguous_lsn,
3407 	bool		last_phase)
3408 {
3409 	DBUG_ENTER("recv_group_scan_log_recs");
3410 	DBUG_ASSERT(!last_phase || recv_sys->mlog_checkpoint_lsn > 0);
3411 
3412 	mutex_enter(&recv_sys->mutex);
3413 	recv_sys->len = 0;
3414 	recv_sys->recovered_offset = 0;
3415 	recv_sys->n_addrs = 0;
3416 	recv_sys_empty_hash();
3417 	srv_start_lsn = *contiguous_lsn;
3418 	recv_sys->parse_start_lsn = *contiguous_lsn;
3419 	recv_sys->scanned_lsn = *contiguous_lsn;
3420 	recv_sys->recovered_lsn = *contiguous_lsn;
3421 	recv_sys->scanned_checkpoint_no = 0;
3422 	recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
3423 	recv_previous_parsed_rec_offset	= 0;
3424 	recv_previous_parsed_rec_is_multi = 0;
3425 	ut_ad(recv_max_page_lsn == 0);
3426 	ut_ad(last_phase || !recv_writer_thread_active);
3427 	mutex_exit(&recv_sys->mutex);
3428 
3429 	lsn_t	start_lsn;
3430 	lsn_t	end_lsn;
3431 	store_t	store_to_hash	= recv_sys->mlog_checkpoint_lsn == 0
3432 		? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
3433 	ulint	available_mem = (buf_pool_get_n_pages() * 2 / 3)
3434 		<< srv_page_size_shift;
3435 
3436 	log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
3437 		ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3438 
3439 	do {
3440 		if (last_phase && store_to_hash == STORE_NO) {
3441 			store_to_hash = STORE_IF_EXISTS;
3442 			/* We must not allow change buffer
3443 			merge here, because it would generate
3444 			redo log records before we have
3445 			finished the redo log scan. */
3446 			recv_apply_hashed_log_recs(false);
3447 			/* Rescan the redo logs from last stored lsn */
3448 			end_lsn = recv_sys->recovered_lsn;
3449 		}
3450 
3451 		start_lsn = ut_uint64_align_down(end_lsn,
3452 						 OS_FILE_LOG_BLOCK_SIZE);
3453 		end_lsn = start_lsn;
3454 		log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
3455 	} while (end_lsn != start_lsn
3456 		 && !recv_scan_log_recs(
3457 			 available_mem, &store_to_hash, log_sys.buf,
3458 			 checkpoint_lsn,
3459 			 start_lsn, end_lsn,
3460 			 contiguous_lsn, &log_sys.log.scanned_lsn));
3461 
3462 	if (recv_sys->found_corrupt_log || recv_sys->found_corrupt_fs) {
3463 		DBUG_RETURN(false);
3464 	}
3465 
3466 	DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
3467 			      last_phase ? "rescan" : "scan",
3468 			      log_sys.log.scanned_lsn));
3469 
3470 	DBUG_RETURN(store_to_hash == STORE_NO);
3471 }
3472 
3473 /** Report a missing tablespace for which page-redo log exists.
3474 @param[in]	err	previous error code
3475 @param[in]	i	tablespace descriptor
3476 @return new error code */
3477 static
3478 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3479 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3480 {
3481 	if (is_mariabackup_restore_or_export()) {
3482 		ib::warn() << "Tablespace " << i->first << " was not"
3483 			" found at " << i->second.name << " when"
3484 			" restoring a (partial?) backup. All redo log"
3485 			" for this file will be ignored!";
3486 		return(err);
3487 	}
3488 
3489 	if (srv_force_recovery == 0) {
3490 		ib::error() << "Tablespace " << i->first << " was not"
3491 			" found at " << i->second.name << ".";
3492 
3493 		if (err == DB_SUCCESS) {
3494 			ib::error() << "Set innodb_force_recovery=1 to"
3495 				" ignore this and to permanently lose"
3496 				" all changes to the tablespace.";
3497 			err = DB_TABLESPACE_NOT_FOUND;
3498 		}
3499 	} else {
3500 		ib::warn() << "Tablespace " << i->first << " was not"
3501 			" found at " << i->second.name << ", and"
3502 			" innodb_force_recovery was set. All redo log"
3503 			" for this tablespace will be ignored!";
3504 	}
3505 
3506 	return(err);
3507 }
3508 
3509 /** Report the missing tablespace and discard the redo logs for the deleted
3510 tablespace.
3511 @param[in]	rescan			rescan of redo logs is needed
3512 					if hash table ran out of memory
3513 @param[out]	missing_tablespace	missing tablespace exists or not
3514 @return error code or DB_SUCCESS. */
3515 static MY_ATTRIBUTE((warn_unused_result))
3516 dberr_t
recv_validate_tablespace(bool rescan,bool & missing_tablespace)3517 recv_validate_tablespace(bool rescan, bool& missing_tablespace)
3518 {
3519 	dberr_t err = DB_SUCCESS;
3520 
3521 	for (ulint h = 0; h < hash_get_n_cells(recv_sys->addr_hash); h++) {
3522 		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
3523 			     HASH_GET_FIRST(recv_sys->addr_hash, h));
3524 		     recv_addr != 0;
3525 		     recv_addr = static_cast<recv_addr_t*>(
3526 			     HASH_GET_NEXT(addr_hash, recv_addr))) {
3527 
3528 			const ulint space = recv_addr->space;
3529 
3530 			if (is_predefined_tablespace(space)) {
3531 				continue;
3532 			}
3533 
3534 			recv_spaces_t::iterator i = recv_spaces.find(space);
3535 			ut_ad(i != recv_spaces.end());
3536 
3537 			switch (i->second.status) {
3538 			case file_name_t::MISSING:
3539 				err = recv_init_missing_space(err, i);
3540 				i->second.status = file_name_t::DELETED;
3541 				/* fall through */
3542 			case file_name_t::DELETED:
3543 				recv_addr->state = RECV_DISCARDED;
3544 				/* fall through */
3545 			case file_name_t::NORMAL:
3546 				continue;
3547 			}
3548 			ut_ad(0);
3549 		}
3550 	}
3551 
3552 	if (err != DB_SUCCESS) {
3553 		return(err);
3554 	}
3555 
3556 	/* When rescan is not needed then recv_sys->addr_hash will have
3557 	all space id belongs to redo log. If rescan is needed and
3558 	innodb_force_recovery > 0 then InnoDB can ignore missing tablespace. */
3559 	for (recv_spaces_t::iterator i = recv_spaces.begin();
3560 	     i != recv_spaces.end(); i++) {
3561 
3562 		if (UNIV_LIKELY(i->second.status != file_name_t::MISSING)) {
3563 			continue;
3564 		}
3565 
3566 		missing_tablespace = true;
3567 
3568 		if (srv_force_recovery > 0) {
3569 			ib::warn() << "Tablespace " << i->first
3570 				<<" was not found at " << i->second.name
3571 				<<", and innodb_force_recovery was set."
3572 				<<" All redo log for this tablespace"
3573 				<<" will be ignored!";
3574 			continue;
3575 		}
3576 
3577 		if (!rescan) {
3578 			ib::info() << "Tablespace " << i->first
3579 				<< " was not found at '"
3580 				<< i->second.name << "', but there"
3581 				<<" were no modifications either.";
3582 		}
3583 	}
3584 
3585 	if (!rescan || srv_force_recovery > 0) {
3586 		missing_tablespace = false;
3587 	}
3588 
3589 	return DB_SUCCESS;
3590 }
3591 
3592 /** Check if all tablespaces were found for crash recovery.
3593 @param[in]	rescan			rescan of redo logs is needed
3594 @param[out]	missing_tablespace	missing table exists
3595 @return error code or DB_SUCCESS */
3596 static MY_ATTRIBUTE((warn_unused_result))
3597 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3598 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3599 {
3600 	bool		flag_deleted	= false;
3601 
3602 	ut_ad(!srv_read_only_mode);
3603 	ut_ad(recv_needed_recovery);
3604 
3605 	for (recv_spaces_t::iterator i = recv_spaces.begin();
3606 	     i != recv_spaces.end(); i++) {
3607 		ut_ad(!is_predefined_tablespace(i->first));
3608 		ut_ad(i->second.status != file_name_t::DELETED || !i->second.space);
3609 
3610 		if (i->second.status == file_name_t::DELETED) {
3611 			/* The tablespace was deleted,
3612 			so we can ignore any redo log for it. */
3613 			flag_deleted = true;
3614 		} else if (i->second.space != NULL) {
3615 			/* The tablespace was found, and there
3616 			are some redo log records for it. */
3617 			fil_names_dirty(i->second.space);
3618 			i->second.space->enable_lsn = i->second.enable_lsn;
3619 		} else if (i->second.name == "") {
3620 			ib::error() << "Missing MLOG_FILE_NAME"
3621 				" or MLOG_FILE_DELETE"
3622 				" before MLOG_CHECKPOINT for tablespace "
3623 				<< i->first;
3624 			recv_sys->found_corrupt_log = true;
3625 			return(DB_CORRUPTION);
3626 		} else {
3627 			i->second.status = file_name_t::MISSING;
3628 			flag_deleted = true;
3629 		}
3630 
3631 		ut_ad(i->second.status == file_name_t::DELETED || i->second.name != "");
3632 	}
3633 
3634 	if (flag_deleted) {
3635 		return recv_validate_tablespace(rescan, missing_tablespace);
3636 	}
3637 
3638 	return DB_SUCCESS;
3639 }
3640 
3641 /** Start recovering from a redo log checkpoint.
3642 @see recv_recovery_from_checkpoint_finish
3643 @param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
3644 of first system tablespace page
3645 @return error code or DB_SUCCESS */
3646 dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)3647 recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
3648 {
3649 	ulint		max_cp_field;
3650 	lsn_t		checkpoint_lsn;
3651 	bool		rescan;
3652 	ib_uint64_t	checkpoint_no;
3653 	lsn_t		contiguous_lsn;
3654 	byte*		buf;
3655 	dberr_t		err = DB_SUCCESS;
3656 
3657 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
3658 	      || is_mariabackup_restore_or_export());
3659 
3660 	/* Initialize red-black tree for fast insertions into the
3661 	flush_list during recovery process. */
3662 	buf_flush_init_flush_rbt();
3663 
3664 	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
3665 
3666 		ib::info() << "innodb_force_recovery=6 skips redo log apply";
3667 
3668 		return(DB_SUCCESS);
3669 	}
3670 
3671 	recv_recovery_on = true;
3672 
3673 	log_mutex_enter();
3674 
3675 	err = recv_find_max_checkpoint(&max_cp_field);
3676 
3677 	if (err != DB_SUCCESS) {
3678 
3679 		srv_start_lsn = recv_sys->recovered_lsn = log_sys.lsn;
3680 		log_mutex_exit();
3681 		return(err);
3682 	}
3683 
3684 	log_header_read(max_cp_field);
3685 
3686 	buf = log_sys.checkpoint_buf;
3687 
3688 	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
3689 	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
3690 
3691 	/* Start reading the log from the checkpoint lsn. The variable
3692 	contiguous_lsn contains an lsn up to which the log is known to
3693 	be contiguously written. */
3694 	recv_sys->mlog_checkpoint_lsn = 0;
3695 
3696 	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
3697 
3698 	const lsn_t	end_lsn = mach_read_from_8(
3699 		buf + LOG_CHECKPOINT_END_LSN);
3700 
3701 	ut_ad(recv_sys->n_addrs == 0);
3702 	contiguous_lsn = checkpoint_lsn;
3703 	switch (log_sys.log.format) {
3704 	case 0:
3705 		log_mutex_exit();
3706 		return recv_log_format_0_recover(checkpoint_lsn,
3707 						 buf[20 + 32 * 9] == 2);
3708 	default:
3709 		if (end_lsn == 0) {
3710 			break;
3711 		}
3712 		if (end_lsn >= checkpoint_lsn) {
3713 			contiguous_lsn = end_lsn;
3714 			break;
3715 		}
3716 		recv_sys->found_corrupt_log = true;
3717 		log_mutex_exit();
3718 		return(DB_ERROR);
3719 	}
3720 
3721 	/* Look for MLOG_CHECKPOINT. */
3722 	recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
3723 	/* The first scan should not have stored or applied any records. */
3724 	ut_ad(recv_sys->n_addrs == 0);
3725 	ut_ad(!recv_sys->found_corrupt_fs);
3726 
3727 	if (srv_read_only_mode && recv_needed_recovery) {
3728 		log_mutex_exit();
3729 		return(DB_READ_ONLY);
3730 	}
3731 
3732 	if (recv_sys->found_corrupt_log && !srv_force_recovery) {
3733 		log_mutex_exit();
3734 		ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
3735 		return(DB_ERROR);
3736 	}
3737 
3738 	if (recv_sys->mlog_checkpoint_lsn == 0) {
3739 		lsn_t scan_lsn = log_sys.log.scanned_lsn;
3740 		if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
3741 			log_mutex_exit();
3742 			ib::error err;
3743 			err << "Missing MLOG_CHECKPOINT";
3744 			if (end_lsn) {
3745 				err << " at " << end_lsn;
3746 			}
3747 			err << " between the checkpoint " << checkpoint_lsn
3748 			    << " and the end " << scan_lsn << ".";
3749 			return(DB_ERROR);
3750 		}
3751 
3752 		log_sys.log.scanned_lsn = checkpoint_lsn;
3753 		rescan = false;
3754 	} else {
3755 		contiguous_lsn = checkpoint_lsn;
3756 		rescan = recv_group_scan_log_recs(
3757 			checkpoint_lsn, &contiguous_lsn, false);
3758 
3759 		if ((recv_sys->found_corrupt_log && !srv_force_recovery)
3760 		    || recv_sys->found_corrupt_fs) {
3761 			log_mutex_exit();
3762 			return(DB_ERROR);
3763 		}
3764 	}
3765 
3766 	/* NOTE: we always do a 'recovery' at startup, but only if
3767 	there is something wrong we will print a message to the
3768 	user about recovery: */
3769 
3770 	if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3771 	    && recv_sys->mlog_checkpoint_lsn == checkpoint_lsn) {
3772 		/* The redo log is logically empty. */
3773 	} else if (checkpoint_lsn != flush_lsn) {
3774 		ut_ad(!srv_log_files_created);
3775 
3776 		if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
3777 			ib::warn() << "Are you sure you are using the"
3778 				" right ib_logfiles to start up the database?"
3779 				" Log sequence number in the ib_logfiles is "
3780 				<< checkpoint_lsn << ", less than the"
3781 				" log sequence number in the first system"
3782 				" tablespace file header, " << flush_lsn << ".";
3783 		}
3784 
3785 		if (!recv_needed_recovery) {
3786 
3787 			ib::info() << "The log sequence number " << flush_lsn
3788 				<< " in the system tablespace does not match"
3789 				" the log sequence number " << checkpoint_lsn
3790 				<< " in the ib_logfiles!";
3791 
3792 			if (srv_read_only_mode) {
3793 				ib::error() << "innodb_read_only"
3794 					" prevents crash recovery";
3795 				log_mutex_exit();
3796 				return(DB_READ_ONLY);
3797 			}
3798 
3799 			recv_needed_recovery = true;
3800 		}
3801 	}
3802 
3803 	log_sys.lsn = recv_sys->recovered_lsn;
3804 
3805 	if (recv_needed_recovery) {
3806 		bool missing_tablespace = false;
3807 
3808 		err = recv_init_crash_recovery_spaces(
3809 			rescan, missing_tablespace);
3810 
3811 		if (err != DB_SUCCESS) {
3812 			log_mutex_exit();
3813 			return(err);
3814 		}
3815 
		/* If there is any missing tablespace and rescan is needed
		then there is a possibility that the hash table will not
		contain the redo log records of all tablespace ids. Rescan
		the remaining unstored redo logs for the validation of the
		missing tablespace. */
3820 		ut_ad(rescan || !missing_tablespace);
3821 
3822 		while (missing_tablespace) {
3823 			DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
3824 					      "the missing tablespace. Scan "
3825 					      "from last stored LSN " LSN_PF,
3826 					      recv_sys->last_stored_lsn));
3827 
3828 			lsn_t recent_stored_lsn = recv_sys->last_stored_lsn;
3829 			rescan = recv_group_scan_log_recs(
3830 				checkpoint_lsn, &recent_stored_lsn, false);
3831 
3832 			ut_ad(!recv_sys->found_corrupt_fs);
3833 
3834 			missing_tablespace = false;
3835 
3836 			err = recv_sys->found_corrupt_log
3837 				? DB_ERROR
3838 				: recv_validate_tablespace(
3839 					rescan, missing_tablespace);
3840 
3841 			if (err != DB_SUCCESS) {
3842 				log_mutex_exit();
3843 				return err;
3844 			}
3845 
3846 			rescan = true;
3847 		}
3848 
3849 		recv_sys->parse_start_lsn = checkpoint_lsn;
3850 
3851 		if (srv_operation == SRV_OPERATION_NORMAL) {
3852 			buf_dblwr_process();
3853 		}
3854 
3855 		ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
3856 
3857 		/* Spawn the background thread to flush dirty pages
3858 		from the buffer pools. */
3859 		recv_writer_thread_active = true;
3860 		os_thread_create(recv_writer_thread, 0, 0);
3861 
3862 		if (rescan) {
3863 			contiguous_lsn = checkpoint_lsn;
3864 
3865 			recv_group_scan_log_recs(
3866 				checkpoint_lsn, &contiguous_lsn, true);
3867 
3868 			if ((recv_sys->found_corrupt_log
3869 			     && !srv_force_recovery)
3870 			    || recv_sys->found_corrupt_fs) {
3871 				log_mutex_exit();
3872 				return(DB_ERROR);
3873 			}
3874 		}
3875 	} else {
3876 		ut_ad(!rescan || recv_sys->n_addrs == 0);
3877 	}
3878 
3879 	if (log_sys.log.scanned_lsn < checkpoint_lsn
3880 	    || log_sys.log.scanned_lsn < recv_max_page_lsn) {
3881 
3882 		ib::error() << "We scanned the log up to "
3883 			<< log_sys.log.scanned_lsn
3884 			<< ". A checkpoint was at " << checkpoint_lsn << " and"
3885 			" the maximum LSN on a database page was "
3886 			<< recv_max_page_lsn << ". It is possible that the"
3887 			" database is now corrupt!";
3888 	}
3889 
3890 	if (recv_sys->recovered_lsn < checkpoint_lsn) {
3891 		log_mutex_exit();
3892 
3893 		ib::error() << "Recovered only to lsn:"
3894 			    << recv_sys->recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;
3895 
3896 		return(DB_ERROR);
3897 	}
3898 
3899 	log_sys.next_checkpoint_lsn = checkpoint_lsn;
3900 	log_sys.next_checkpoint_no = checkpoint_no + 1;
3901 
3902 	recv_synchronize_groups();
3903 
3904 	if (!recv_needed_recovery) {
3905 		ut_a(checkpoint_lsn == recv_sys->recovered_lsn);
3906 	} else {
3907 		srv_start_lsn = recv_sys->recovered_lsn;
3908 	}
3909 
3910 	log_sys.buf_free = ulong(log_sys.lsn % OS_FILE_LOG_BLOCK_SIZE);
3911 	log_sys.buf_next_to_write = log_sys.buf_free;
3912 	log_sys.write_lsn = log_sys.lsn;
3913 
3914 	log_sys.last_checkpoint_lsn = checkpoint_lsn;
3915 
3916 	if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) {
3917 		/* Write a MLOG_CHECKPOINT marker as the first thing,
3918 		before generating any other redo log. This ensures
3919 		that subsequent crash recovery will be possible even
3920 		if the server were killed soon after this. */
3921 		fil_names_clear(log_sys.last_checkpoint_lsn, true);
3922 	}
3923 
3924 	MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
3925 		    log_sys.lsn - log_sys.last_checkpoint_lsn);
3926 
3927 	log_sys.next_checkpoint_no = ++checkpoint_no;
3928 
3929 	mutex_enter(&recv_sys->mutex);
3930 
3931 	recv_sys->apply_log_recs = TRUE;
3932 	recv_no_ibuf_operations = is_mariabackup_restore_or_export();
3933 	ut_d(recv_no_log_write = recv_no_ibuf_operations);
3934 
3935 	mutex_exit(&recv_sys->mutex);
3936 
3937 	log_mutex_exit();
3938 
3939 	recv_lsn_checks_on = true;
3940 
3941 	/* The database is now ready to start almost normal processing of user
3942 	transactions: transaction rollbacks and the application of the log
3943 	records in the hash table can be run in background. */
3944 
3945 	return(DB_SUCCESS);
3946 }
3947 
3948 /** Complete recovery from a checkpoint. */
3949 void
recv_recovery_from_checkpoint_finish(void)3950 recv_recovery_from_checkpoint_finish(void)
3951 {
3952 	/* Make sure that the recv_writer thread is done. This is
3953 	required because it grabs various mutexes and we want to
3954 	ensure that when we enable sync_order_checks there is no
3955 	mutex currently held by any thread. */
3956 	mutex_enter(&recv_sys->writer_mutex);
3957 
3958 	/* Free the resources of the recovery system */
3959 	recv_recovery_on = false;
3960 
3961 	/* By acquring the mutex we ensure that the recv_writer thread
3962 	won't trigger any more LRU batches. Now wait for currently
3963 	in progress batches to finish. */
3964 	buf_flush_wait_LRU_batch_end();
3965 
3966 	mutex_exit(&recv_sys->writer_mutex);
3967 
3968 	ulint count = 0;
3969 	while (recv_writer_thread_active) {
3970 		++count;
3971 		os_thread_sleep(100000);
3972 		if (srv_print_verbose_log && count > 600) {
3973 			ib::info() << "Waiting for recv_writer to"
3974 				" finish flushing of buffer pool";
3975 			count = 0;
3976 		}
3977 	}
3978 
3979 	recv_sys_debug_free();
3980 
3981 	/* Free up the flush_rbt. */
3982 	buf_flush_free_flush_rbt();
3983 }
3984 
3985 /********************************************************//**
3986 Initiates the rollback of active transactions. */
3987 void
recv_recovery_rollback_active(void)3988 recv_recovery_rollback_active(void)
3989 /*===============================*/
3990 {
3991 	ut_ad(!recv_writer_thread_active);
3992 
3993 	/* Switch latching order checks on in sync0debug.cc, if
3994 	--innodb-sync-debug=true (default) */
3995 	ut_d(sync_check_enable());
3996 
3997 	/* We can't start any (DDL) transactions if UNDO logging
3998 	has been disabled, additionally disable ROLLBACK of recovered
3999 	user transactions. */
4000 	if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
4001 	    && !srv_read_only_mode) {
4002 
4003 		/* Drop partially created indexes. */
4004 		row_merge_drop_temp_indexes();
4005 		/* Drop garbage tables. */
4006 		row_mysql_drop_garbage_tables();
4007 
4008 		/* Drop any auxiliary tables that were not dropped when the
4009 		parent table was dropped. This can happen if the parent table
4010 		was dropped but the server crashed before the auxiliary tables
4011 		were dropped. */
4012 		fts_drop_orphaned_tables();
4013 
4014 		/* Rollback the uncommitted transactions which have no user
4015 		session */
4016 
4017 		trx_rollback_is_active = true;
4018 		os_thread_create(trx_rollback_all_recovered, 0, 0);
4019 	}
4020 }
4021 
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)4022 bool recv_dblwr_t::validate_page(const page_id_t page_id,
4023                                  const byte *page,
4024                                  const fil_space_t *space,
4025                                  byte *tmp_buf)
4026 {
4027   if (page_id.page_no() == 0)
4028   {
4029     ulint flags= fsp_header_get_flags(page);
4030     if (!fsp_flags_is_valid(flags, page_id.space()))
4031     {
4032       ulint cflags= fsp_flags_convert_from_101(flags);
4033       if (cflags == ULINT_UNDEFINED)
4034       {
4035         ib::warn() << "Ignoring a doublewrite copy of page " << page_id
4036                    << "due to invalid flags " << ib::hex(flags);
4037         return false;
4038       }
4039 
4040       flags= cflags;
4041     }
4042 
4043     /* Page 0 is never page_compressed or encrypted. */
4044     return !buf_page_is_corrupted(true, page, page_size_t(flags));
4045   }
4046 
4047   ut_ad(tmp_buf);
4048   byte *tmp_frame= tmp_buf;
4049   byte *tmp_page= tmp_buf + srv_page_size;
4050   const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
4051   const page_size_t page_size(space->flags);
4052   const bool expect_encrypted= space->crypt_data &&
4053     space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
4054 
4055   if (expect_encrypted &&
4056       mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
4057   {
4058     if (!fil_space_verify_crypt_checksum(page, page_size))
4059       return false;
4060     if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
4061       return true;
4062     if (page_size.is_compressed())
4063       return false;
4064     memcpy(tmp_page, page, page_size.physical());
4065     if (!fil_space_decrypt(space, tmp_frame, tmp_page))
4066       return false;
4067   }
4068 
4069   switch (page_type) {
4070   case FIL_PAGE_PAGE_COMPRESSED:
4071     memcpy(tmp_page, page, page_size.physical());
4072     /* fall through */
4073   case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
4074     if (page_size.is_compressed())
4075       return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
4076     ulint decomp= fil_page_decompress(tmp_frame, tmp_page);
4077     if (!decomp)
4078       return false; /* decompression failed */
4079     if (decomp == srv_page_size)
4080       return false; /* the page was not compressed (invalid page type) */
4081     return !buf_page_is_corrupted(true, tmp_page, page_size, space);
4082   }
4083 
4084   return !buf_page_is_corrupted(true, page, page_size, space);
4085 }
4086 
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)4087 byte *recv_dblwr_t::find_page(const page_id_t page_id,
4088                               const fil_space_t *space, byte *tmp_buf)
4089 {
4090   byte *result= NULL;
4091   lsn_t max_lsn= 0;
4092 
4093   for (list::const_iterator i = pages.begin(); i != pages.end(); ++i)
4094   {
4095     byte *page= *i;
4096     if (page_get_page_no(page) != page_id.page_no() ||
4097         page_get_space_id(page) != page_id.space())
4098       continue;
4099     const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
4100     if (lsn <= max_lsn ||
4101         !validate_page(page_id, page, space, tmp_buf))
4102     {
4103       /* Mark processed for subsequent iterations in buf_dblwr_process() */
4104       memset(page + FIL_PAGE_LSN, 0, 8);
4105       continue;
4106     }
4107     max_lsn= lsn;
4108     result= page;
4109   }
4110 
4111   return result;
4112 }
4113 
4114 #ifndef DBUG_OFF
4115 /** Return string name of the redo log record type.
4116 @param[in]	type	record log record enum
4117 @return string name of record log record */
get_mlog_string(mlog_id_t type)4118 static const char* get_mlog_string(mlog_id_t type)
4119 {
4120 	switch (type) {
4121 	case MLOG_SINGLE_REC_FLAG:
4122 		return("MLOG_SINGLE_REC_FLAG");
4123 
4124 	case MLOG_1BYTE:
4125 		return("MLOG_1BYTE");
4126 
4127 	case MLOG_2BYTES:
4128 		return("MLOG_2BYTES");
4129 
4130 	case MLOG_4BYTES:
4131 		return("MLOG_4BYTES");
4132 
4133 	case MLOG_8BYTES:
4134 		return("MLOG_8BYTES");
4135 
4136 	case MLOG_REC_INSERT:
4137 		return("MLOG_REC_INSERT");
4138 
4139 	case MLOG_REC_CLUST_DELETE_MARK:
4140 		return("MLOG_REC_CLUST_DELETE_MARK");
4141 
4142 	case MLOG_REC_SEC_DELETE_MARK:
4143 		return("MLOG_REC_SEC_DELETE_MARK");
4144 
4145 	case MLOG_REC_UPDATE_IN_PLACE:
4146 		return("MLOG_REC_UPDATE_IN_PLACE");
4147 
4148 	case MLOG_REC_DELETE:
4149 		return("MLOG_REC_DELETE");
4150 
4151 	case MLOG_LIST_END_DELETE:
4152 		return("MLOG_LIST_END_DELETE");
4153 
4154 	case MLOG_LIST_START_DELETE:
4155 		return("MLOG_LIST_START_DELETE");
4156 
4157 	case MLOG_LIST_END_COPY_CREATED:
4158 		return("MLOG_LIST_END_COPY_CREATED");
4159 
4160 	case MLOG_PAGE_REORGANIZE:
4161 		return("MLOG_PAGE_REORGANIZE");
4162 
4163 	case MLOG_PAGE_CREATE:
4164 		return("MLOG_PAGE_CREATE");
4165 
4166 	case MLOG_UNDO_INSERT:
4167 		return("MLOG_UNDO_INSERT");
4168 
4169 	case MLOG_UNDO_ERASE_END:
4170 		return("MLOG_UNDO_ERASE_END");
4171 
4172 	case MLOG_UNDO_INIT:
4173 		return("MLOG_UNDO_INIT");
4174 
4175 	case MLOG_UNDO_HDR_REUSE:
4176 		return("MLOG_UNDO_HDR_REUSE");
4177 
4178 	case MLOG_UNDO_HDR_CREATE:
4179 		return("MLOG_UNDO_HDR_CREATE");
4180 
4181 	case MLOG_REC_MIN_MARK:
4182 		return("MLOG_REC_MIN_MARK");
4183 
4184 	case MLOG_IBUF_BITMAP_INIT:
4185 		return("MLOG_IBUF_BITMAP_INIT");
4186 
4187 #ifdef UNIV_LOG_LSN_DEBUG
4188 	case MLOG_LSN:
4189 		return("MLOG_LSN");
4190 #endif /* UNIV_LOG_LSN_DEBUG */
4191 
4192 	case MLOG_WRITE_STRING:
4193 		return("MLOG_WRITE_STRING");
4194 
4195 	case MLOG_MULTI_REC_END:
4196 		return("MLOG_MULTI_REC_END");
4197 
4198 	case MLOG_DUMMY_RECORD:
4199 		return("MLOG_DUMMY_RECORD");
4200 
4201 	case MLOG_FILE_DELETE:
4202 		return("MLOG_FILE_DELETE");
4203 
4204 	case MLOG_COMP_REC_MIN_MARK:
4205 		return("MLOG_COMP_REC_MIN_MARK");
4206 
4207 	case MLOG_COMP_PAGE_CREATE:
4208 		return("MLOG_COMP_PAGE_CREATE");
4209 
4210 	case MLOG_COMP_REC_INSERT:
4211 		return("MLOG_COMP_REC_INSERT");
4212 
4213 	case MLOG_COMP_REC_CLUST_DELETE_MARK:
4214 		return("MLOG_COMP_REC_CLUST_DELETE_MARK");
4215 
4216 	case MLOG_COMP_REC_UPDATE_IN_PLACE:
4217 		return("MLOG_COMP_REC_UPDATE_IN_PLACE");
4218 
4219 	case MLOG_COMP_REC_DELETE:
4220 		return("MLOG_COMP_REC_DELETE");
4221 
4222 	case MLOG_COMP_LIST_END_DELETE:
4223 		return("MLOG_COMP_LIST_END_DELETE");
4224 
4225 	case MLOG_COMP_LIST_START_DELETE:
4226 		return("MLOG_COMP_LIST_START_DELETE");
4227 
4228 	case MLOG_COMP_LIST_END_COPY_CREATED:
4229 		return("MLOG_COMP_LIST_END_COPY_CREATED");
4230 
4231 	case MLOG_COMP_PAGE_REORGANIZE:
4232 		return("MLOG_COMP_PAGE_REORGANIZE");
4233 
4234 	case MLOG_FILE_CREATE2:
4235 		return("MLOG_FILE_CREATE2");
4236 
4237 	case MLOG_ZIP_WRITE_NODE_PTR:
4238 		return("MLOG_ZIP_WRITE_NODE_PTR");
4239 
4240 	case MLOG_ZIP_WRITE_BLOB_PTR:
4241 		return("MLOG_ZIP_WRITE_BLOB_PTR");
4242 
4243 	case MLOG_ZIP_WRITE_HEADER:
4244 		return("MLOG_ZIP_WRITE_HEADER");
4245 
4246 	case MLOG_ZIP_PAGE_COMPRESS:
4247 		return("MLOG_ZIP_PAGE_COMPRESS");
4248 
4249 	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
4250 		return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
4251 
4252 	case MLOG_ZIP_PAGE_REORGANIZE:
4253 		return("MLOG_ZIP_PAGE_REORGANIZE");
4254 
4255 	case MLOG_ZIP_WRITE_TRX_ID:
4256 		return("MLOG_ZIP_WRITE_TRX_ID");
4257 
4258 	case MLOG_FILE_RENAME2:
4259 		return("MLOG_FILE_RENAME2");
4260 
4261 	case MLOG_FILE_NAME:
4262 		return("MLOG_FILE_NAME");
4263 
4264 	case MLOG_CHECKPOINT:
4265 		return("MLOG_CHECKPOINT");
4266 
4267 	case MLOG_PAGE_CREATE_RTREE:
4268 		return("MLOG_PAGE_CREATE_RTREE");
4269 
4270 	case MLOG_COMP_PAGE_CREATE_RTREE:
4271 		return("MLOG_COMP_PAGE_CREATE_RTREE");
4272 
4273 	case MLOG_INIT_FILE_PAGE2:
4274 		return("MLOG_INIT_FILE_PAGE2");
4275 
4276 	case MLOG_INDEX_LOAD:
4277 		return("MLOG_INDEX_LOAD");
4278 
4279 	case MLOG_TRUNCATE:
4280 		return("MLOG_TRUNCATE");
4281 
4282 	case MLOG_FILE_WRITE_CRYPT_DATA:
4283 		return("MLOG_FILE_WRITE_CRYPT_DATA");
4284 	}
4285 	DBUG_ASSERT(0);
4286 	return(NULL);
4287 }
4288 #endif /* !DBUG_OFF */
4289