1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2013, 2022, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file log/log0recv.cc
22 Recovery
23 
24 Created 9/20/1997 Heikki Tuuri
25 *******************************************************/
26 
27 #include "univ.i"
28 
29 #include <map>
30 #include <string>
31 #include <my_service_manager.h>
32 
33 #include "log0recv.h"
34 
35 #ifdef HAVE_MY_AES_H
36 #include <my_aes.h>
37 #endif
38 
39 #include "log0crypt.h"
40 #include "mem0mem.h"
41 #include "buf0buf.h"
42 #include "buf0dblwr.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0page.h"
47 #include "page0cur.h"
48 #include "trx0undo.h"
49 #include "ibuf0ibuf.h"
50 #include "trx0undo.h"
51 #include "trx0rec.h"
52 #include "fil0fil.h"
53 #include "buf0rea.h"
54 #include "srv0srv.h"
55 #include "srv0start.h"
56 #include "fil0pagecompress.h"
57 
58 /** The recovery system */
59 recv_sys_t	recv_sys;
60 /** TRUE when recv_init_crash_recovery() has been called. */
61 bool	recv_needed_recovery;
62 #ifdef UNIV_DEBUG
63 /** TRUE if writing to the redo log (mtr_commit) is forbidden.
64 Protected by log_sys.mutex. */
65 bool	recv_no_log_write = false;
66 #endif /* UNIV_DEBUG */
67 
68 /** TRUE if buf_page_is_corrupted() should check if the log sequence
69 number (FIL_PAGE_LSN) is in the future.  Initially FALSE, and set by
70 recv_recovery_from_checkpoint_start(). */
71 bool	recv_lsn_checks_on;
72 
73 /** If the following is TRUE, the buffer pool file pages must be invalidated
74 after recovery and no ibuf operations are allowed; this becomes TRUE if
75 the log record hash table becomes too full, and log records must be merged
76 to file pages already before the recovery is finished: in this case no
77 ibuf operations are allowed, as they could modify the pages read in the
78 buffer pool before the pages have been recovered to the up-to-date state.
79 
80 true means that recovery is running and no operations on the log file
81 are allowed yet: the variable name is misleading. */
82 bool	recv_no_ibuf_operations;
83 
84 /** The maximum lsn we see for a page during the recovery process. If this
85 is bigger than the lsn we are able to scan up to, that is an indication that
86 the recovery failed and the database may be corrupt. */
87 static lsn_t	recv_max_page_lsn;
88 
89 /** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */
90 struct log_phys_t : public log_rec_t
91 {
92   /** start LSN of the mini-transaction (not necessarily of this record) */
93   const lsn_t start_lsn;
94 private:
95   /** @return the start of length and data */
startlog_phys_t96   const byte *start() const
97   {
98     return my_assume_aligned<sizeof(size_t)>
99       (reinterpret_cast<const byte*>(&start_lsn + 1));
100   }
101   /** @return the start of length and data */
startlog_phys_t102   byte *start()
103   { return const_cast<byte*>(const_cast<const log_phys_t*>(this)->start()); }
104   /** @return the length of the following record */
lenlog_phys_t105   uint16_t len() const { uint16_t i; memcpy(&i, start(), 2); return i; }
106 
107   /** @return start of the log records */
beginlog_phys_t108   byte *begin() { return start() + 2; }
109   /** @return end of the log records */
endlog_phys_t110   byte *end() { byte *e= begin() + len(); ut_ad(!*e); return e; }
111 public:
112   /** @return start of the log records */
beginlog_phys_t113   const byte *begin() const { return const_cast<log_phys_t*>(this)->begin(); }
114   /** @return end of the log records */
endlog_phys_t115   const byte *end() const { return const_cast<log_phys_t*>(this)->end(); }
116 
117   /** Determine the allocated size of the object.
118   @param len  length of recs, excluding terminating NUL byte
119   @return the total allocation size */
120   static inline size_t alloc_size(size_t len);
121 
122   /** Constructor.
123   @param start_lsn start LSN of the mini-transaction
124   @param lsn  mtr_t::commit_lsn() of the mini-transaction
125   @param recs the first log record for the page in the mini-transaction
126   @param size length of recs, in bytes, excluding terminating NUL byte */
log_phys_tlog_phys_t127   log_phys_t(lsn_t start_lsn, lsn_t lsn, const byte *recs, size_t size) :
128     log_rec_t(lsn), start_lsn(start_lsn)
129   {
130     ut_ad(start_lsn);
131     ut_ad(start_lsn < lsn);
132     const uint16_t len= static_cast<uint16_t>(size);
133     ut_ad(len == size);
134     memcpy(start(), &len, 2);
135     reinterpret_cast<byte*>(memcpy(begin(), recs, size))[size]= 0;
136   }
137 
138   /** Append a record to the log.
139   @param recs  log to append
140   @param size  size of the log, in bytes */
appendlog_phys_t141   void append(const byte *recs, size_t size)
142   {
143     ut_ad(start_lsn < lsn);
144     uint16_t l= len();
145     reinterpret_cast<byte*>(memcpy(end(), recs, size))[size]= 0;
146     l= static_cast<uint16_t>(l + size);
147     memcpy(start(), &l, 2);
148   }
149 
150   /** Apply an UNDO_APPEND record.
151   @see mtr_t::undo_append()
152   @param block   undo log page
153   @param data    undo log record
154   @param len     length of the undo log record
155   @return whether the operation failed (inconcistency was noticed) */
undo_appendlog_phys_t156   static bool undo_append(const buf_block_t &block, const byte *data,
157                           size_t len)
158   {
159     ut_ad(len > 2);
160     byte *free_p= my_assume_aligned<2>
161       (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.frame);
162     const uint16_t free= mach_read_from_2(free_p);
163     if (UNIV_UNLIKELY(free < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE ||
164                       free + len + 6 >= srv_page_size - FIL_PAGE_DATA_END))
165     {
166       ib::error() << "Not applying UNDO_APPEND due to corruption on "
167                   << block.page.id();
168       return true;
169     }
170 
171     byte *p= block.frame + free;
172     mach_write_to_2(free_p, free + 4 + len);
173     memcpy(p, free_p, 2);
174     p+= 2;
175     memcpy(p, data, len);
176     p+= len;
177     mach_write_to_2(p, free);
178     return false;
179   }
180 
181   /** The status of apply() */
182   enum apply_status {
183     /** The page was not affected */
184     APPLIED_NO= 0,
185     /** The page was modified */
186     APPLIED_YES,
187     /** The page was modified, affecting the encryption parameters */
188     APPLIED_TO_ENCRYPTION,
189     /** The page was modified, affecting the tablespace header */
190     APPLIED_TO_FSP_HEADER
191   };
192 
193   /** Apply log to a page frame.
194   @param[in,out] block         buffer block
195   @param[in,out] last_offset   last byte offset, for same_page records
196   @return whether any log was applied to the page */
applylog_phys_t197   apply_status apply(const buf_block_t &block, uint16_t &last_offset) const
198   {
199     const byte * const recs= begin();
200     byte *const frame= block.page.zip.ssize
201       ? block.page.zip.data : block.frame;
202     const size_t size= block.physical_size();
203     apply_status applied= APPLIED_NO;
204 
205     for (const byte *l= recs;;)
206     {
207       const byte b= *l++;
208       if (!b)
209         return applied;
210       ut_ad((b & 0x70) != RESERVED);
211       size_t rlen= b & 0xf;
212       if (!rlen)
213       {
214         const size_t lenlen= mlog_decode_varint_length(*l);
215         const uint32_t addlen= mlog_decode_varint(l);
216         ut_ad(addlen != MLOG_DECODE_ERROR);
217         rlen= addlen + 15 - lenlen;
218         l+= lenlen;
219       }
220       if (!(b & 0x80))
221       {
222         /* Skip the page identifier. It has already been validated. */
223         size_t idlen= mlog_decode_varint_length(*l);
224         ut_ad(idlen <= 5);
225         ut_ad(idlen < rlen);
226         ut_ad(mlog_decode_varint(l) == block.page.id().space());
227         l+= idlen;
228         rlen-= idlen;
229         idlen= mlog_decode_varint_length(*l);
230         ut_ad(idlen <= 5);
231         ut_ad(idlen <= rlen);
232         ut_ad(mlog_decode_varint(l) == block.page.id().page_no());
233         l+= idlen;
234         rlen-= idlen;
235         last_offset= 0;
236       }
237 
238       switch (b & 0x70) {
239       case FREE_PAGE:
240         ut_ad(last_offset == 0);
241         goto next_not_same_page;
242       case INIT_PAGE:
243         if (UNIV_LIKELY(rlen == 0))
244         {
245           memset_aligned<UNIV_ZIP_SIZE_MIN>(frame, 0, size);
246           mach_write_to_4(frame + FIL_PAGE_OFFSET, block.page.id().page_no());
247           memset_aligned<8>(FIL_PAGE_PREV + frame, 0xff, 8);
248           mach_write_to_4(frame + FIL_PAGE_SPACE_ID, block.page.id().space());
249           last_offset= FIL_PAGE_TYPE;
250       next_after_applying:
251           if (applied == APPLIED_NO)
252             applied= APPLIED_YES;
253         }
254         else
255         {
256       record_corrupted:
257           if (!srv_force_recovery)
258           {
259             recv_sys.found_corrupt_log= true;
260             return applied;
261           }
262       next_not_same_page:
263           last_offset= 1; /* the next record must not be same_page  */
264         }
265       next:
266         l+= rlen;
267         continue;
268       }
269 
270       ut_ad(mach_read_from_4(frame + FIL_PAGE_OFFSET) ==
271             block.page.id().page_no());
272       ut_ad(mach_read_from_4(frame + FIL_PAGE_SPACE_ID) ==
273             block.page.id().space());
274       ut_ad(last_offset <= 1 || last_offset > 8);
275       ut_ad(last_offset <= size);
276 
277       switch (b & 0x70) {
278       case OPTION:
279         goto next;
280       case EXTENDED:
281         if (UNIV_UNLIKELY(block.page.id().page_no() < 3 ||
282                           block.page.zip.ssize))
283           goto record_corrupted;
284         static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity");
285         static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
286         if (UNIV_UNLIKELY(!rlen))
287           goto record_corrupted;
288         switch (const byte subtype= *l) {
289           uint8_t ll;
290           size_t prev_rec, hdr_size;
291         default:
292           goto record_corrupted;
293         case INIT_ROW_FORMAT_REDUNDANT:
294         case INIT_ROW_FORMAT_DYNAMIC:
295           if (UNIV_UNLIKELY(rlen != 1))
296             goto record_corrupted;
297           page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT);
298           break;
299         case UNDO_INIT:
300           if (UNIV_UNLIKELY(rlen != 1))
301             goto record_corrupted;
302           trx_undo_page_init(block);
303           break;
304         case UNDO_APPEND:
305           if (UNIV_UNLIKELY(rlen <= 3))
306             goto record_corrupted;
307           if (undo_append(block, ++l, --rlen) && !srv_force_recovery)
308           {
309 page_corrupted:
310             ib::error() << "Set innodb_force_recovery=1 to ignore corruption.";
311             recv_sys.found_corrupt_log= true;
312             return applied;
313           }
314           break;
315         case INSERT_HEAP_REDUNDANT:
316         case INSERT_REUSE_REDUNDANT:
317         case INSERT_HEAP_DYNAMIC:
318         case INSERT_REUSE_DYNAMIC:
319           if (UNIV_UNLIKELY(rlen < 2))
320             goto record_corrupted;
321           rlen--;
322           ll= mlog_decode_varint_length(*++l);
323           if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
324             goto record_corrupted;
325           prev_rec= mlog_decode_varint(l);
326           ut_ad(prev_rec != MLOG_DECODE_ERROR);
327           rlen-= ll;
328           l+= ll;
329           ll= mlog_decode_varint_length(*l);
330           static_assert(INSERT_HEAP_REDUNDANT == 4, "compatibility");
331           static_assert(INSERT_REUSE_REDUNDANT == 5, "compatibility");
332           static_assert(INSERT_HEAP_DYNAMIC == 6, "compatibility");
333           static_assert(INSERT_REUSE_DYNAMIC == 7, "compatibility");
334           if (subtype & 2)
335           {
336             size_t shift= 0;
337             if (subtype & 1)
338             {
339               if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
340                 goto record_corrupted;
341               shift= mlog_decode_varint(l);
342               ut_ad(shift != MLOG_DECODE_ERROR);
343               rlen-= ll;
344               l+= ll;
345               ll= mlog_decode_varint_length(*l);
346             }
347             if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
348               goto record_corrupted;
349             size_t enc_hdr_l= mlog_decode_varint(l);
350             ut_ad(enc_hdr_l != MLOG_DECODE_ERROR);
351             rlen-= ll;
352             l+= ll;
353             ll= mlog_decode_varint_length(*l);
354             if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
355               goto record_corrupted;
356             size_t hdr_c= mlog_decode_varint(l);
357             ut_ad(hdr_c != MLOG_DECODE_ERROR);
358             rlen-= ll;
359             l+= ll;
360             ll= mlog_decode_varint_length(*l);
361             if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
362               goto record_corrupted;
363             size_t data_c= mlog_decode_varint(l);
364             ut_ad(data_c != MLOG_DECODE_ERROR);
365             rlen-= ll;
366             l+= ll;
367             if (page_apply_insert_dynamic(block, subtype & 1, prev_rec,
368                                           shift, enc_hdr_l, hdr_c, data_c,
369                                           l, rlen) && !srv_force_recovery)
370               goto page_corrupted;
371           }
372           else
373           {
374             if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
375               goto record_corrupted;
376             size_t header= mlog_decode_varint(l);
377             ut_ad(header != MLOG_DECODE_ERROR);
378             rlen-= ll;
379             l+= ll;
380             ll= mlog_decode_varint_length(*l);
381             if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
382               goto record_corrupted;
383             size_t hdr_c= mlog_decode_varint(l);
384             ut_ad(hdr_c != MLOG_DECODE_ERROR);
385             rlen-= ll;
386             l+= ll;
387             ll= mlog_decode_varint_length(*l);
388             if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
389               goto record_corrupted;
390             size_t data_c= mlog_decode_varint(l);
391             rlen-= ll;
392             l+= ll;
393             if (page_apply_insert_redundant(block, subtype & 1, prev_rec,
394                                             header, hdr_c, data_c,
395                                             l, rlen) && !srv_force_recovery)
396               goto page_corrupted;
397           }
398           break;
399         case DELETE_ROW_FORMAT_REDUNDANT:
400           if (UNIV_UNLIKELY(rlen < 2 || rlen > 4))
401             goto record_corrupted;
402           rlen--;
403           ll= mlog_decode_varint_length(*++l);
404           if (UNIV_UNLIKELY(ll != rlen))
405             goto record_corrupted;
406           if (page_apply_delete_redundant(block, mlog_decode_varint(l)) &&
407               !srv_force_recovery)
408             goto page_corrupted;
409           break;
410         case DELETE_ROW_FORMAT_DYNAMIC:
411           if (UNIV_UNLIKELY(rlen < 2))
412             goto record_corrupted;
413           rlen--;
414           ll= mlog_decode_varint_length(*++l);
415           if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
416             goto record_corrupted;
417           prev_rec= mlog_decode_varint(l);
418           ut_ad(prev_rec != MLOG_DECODE_ERROR);
419           rlen-= ll;
420           l+= ll;
421           ll= mlog_decode_varint_length(*l);
422           if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
423             goto record_corrupted;
424           hdr_size= mlog_decode_varint(l);
425           ut_ad(hdr_size != MLOG_DECODE_ERROR);
426           rlen-= ll;
427           l+= ll;
428           ll= mlog_decode_varint_length(*l);
429           if (UNIV_UNLIKELY(ll > 3 || ll != rlen))
430             goto record_corrupted;
431           if (page_apply_delete_dynamic(block, prev_rec, hdr_size,
432                                         mlog_decode_varint(l)) &&
433               !srv_force_recovery)
434             goto page_corrupted;
435           break;
436         }
437         last_offset= FIL_PAGE_TYPE;
438         goto next_after_applying;
439       case WRITE:
440       case MEMSET:
441       case MEMMOVE:
442         if (UNIV_UNLIKELY(last_offset == 1))
443           goto record_corrupted;
444         const size_t olen= mlog_decode_varint_length(*l);
445         if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3))
446           goto record_corrupted;
447         const uint32_t offset= mlog_decode_varint(l);
448         ut_ad(offset != MLOG_DECODE_ERROR);
449         static_assert(FIL_PAGE_OFFSET == 4, "compatibility");
450         if (UNIV_UNLIKELY(offset >= size))
451           goto record_corrupted;
452         if (UNIV_UNLIKELY(offset + last_offset < 8 ||
453                           offset + last_offset >= size))
454           goto record_corrupted;
455         last_offset= static_cast<uint16_t>(last_offset + offset);
456         l+= olen;
457         rlen-= olen;
458         size_t llen= rlen;
459         if ((b & 0x70) == WRITE)
460         {
461           if (UNIV_UNLIKELY(rlen + last_offset > size))
462             goto record_corrupted;
463           memcpy(frame + last_offset, l, llen);
464           if (UNIV_LIKELY(block.page.id().page_no()));
465           else if (llen == 11 + MY_AES_BLOCK_SIZE &&
466                    last_offset == FSP_HEADER_OFFSET + MAGIC_SZ +
467                    fsp_header_get_encryption_offset(block.zip_size()))
468             applied= APPLIED_TO_ENCRYPTION;
469           else if (last_offset < FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN + 4 &&
470                    last_offset + llen >= FSP_HEADER_OFFSET + FSP_SIZE)
471             applied= APPLIED_TO_FSP_HEADER;
472         next_after_applying_write:
473           ut_ad(llen + last_offset <= size);
474           last_offset= static_cast<uint16_t>(last_offset + llen);
475           goto next_after_applying;
476         }
477         llen= mlog_decode_varint_length(*l);
478         if (UNIV_UNLIKELY(llen > rlen || llen > 3))
479           goto record_corrupted;
480         const uint32_t len= mlog_decode_varint(l);
481         ut_ad(len != MLOG_DECODE_ERROR);
482         if (UNIV_UNLIKELY(len + last_offset > size))
483           goto record_corrupted;
484         l+= llen;
485         rlen-= llen;
486         llen= len;
487         if ((b & 0x70) == MEMSET)
488         {
489           ut_ad(rlen <= llen);
490           if (UNIV_UNLIKELY(rlen != 1))
491           {
492             size_t s;
493             for (s= 0; s < llen; s+= rlen)
494               memcpy(frame + last_offset + s, l, rlen);
495             memcpy(frame + last_offset + s, l, llen - s);
496           }
497           else
498             memset(frame + last_offset, *l, llen);
499           goto next_after_applying_write;
500         }
501         const size_t slen= mlog_decode_varint_length(*l);
502         if (UNIV_UNLIKELY(slen != rlen || slen > 3))
503           goto record_corrupted;
504         uint32_t s= mlog_decode_varint(l);
505         ut_ad(slen != MLOG_DECODE_ERROR);
506         if (s & 1)
507           s= last_offset - (s >> 1) - 1;
508         else
509           s= last_offset + (s >> 1) + 1;
510         if (UNIV_LIKELY(s >= 8 && s + llen <= size))
511         {
512           memmove(frame + last_offset, frame + s, llen);
513           goto next_after_applying_write;
514         }
515       }
516       goto record_corrupted;
517     }
518   }
519 };
520 
521 
alloc_size(size_t len)522 inline size_t log_phys_t::alloc_size(size_t len)
523 {
524   return len + (1 + 2 + sizeof(log_phys_t));
525 }
526 
527 
528 /** Tablespace item during recovery */
529 struct file_name_t {
530 	/** Tablespace file name (FILE_MODIFY) */
531 	std::string	name;
532 	/** Tablespace object (NULL if not valid or not found) */
533 	fil_space_t*	space = nullptr;
534 
535 	/** Tablespace status. */
536 	enum fil_status {
537 		/** Normal tablespace */
538 		NORMAL,
539 		/** Deleted tablespace */
540 		DELETED,
541 		/** Missing tablespace */
542 		MISSING
543 	};
544 
545 	/** Status of the tablespace */
546 	fil_status	status;
547 
548 	/** FSP_SIZE of tablespace */
549 	uint32_t	size = 0;
550 
551 	/** Freed pages of tablespace */
552 	range_set	freed_ranges;
553 
554 	/** Dummy flags before they have been read from the .ibd file */
555 	static constexpr uint32_t initial_flags = FSP_FLAGS_FCRC32_MASK_MARKER;
556 	/** FSP_SPACE_FLAGS of tablespace */
557 	uint32_t	flags = initial_flags;
558 
559 	/** Constructor */
file_name_tfile_name_t560 	file_name_t(std::string name_, bool deleted)
561 		: name(std::move(name_)), status(deleted ? DELETED: NORMAL) {}
562 
563   /** Add the freed pages */
add_freed_pagefile_name_t564   void add_freed_page(uint32_t page_no) { freed_ranges.add_value(page_no); }
565 
566   /** Remove the freed pages */
remove_freed_pagefile_name_t567   void remove_freed_page(uint32_t page_no)
568   {
569     if (freed_ranges.empty()) return;
570     freed_ranges.remove_value(page_no);
571   }
572 };
573 
574 /** Map of dirty tablespaces during recovery */
575 typedef std::map<
576 	ulint,
577 	file_name_t,
578 	std::less<ulint>,
579 	ut_allocator<std::pair<const ulint, file_name_t> > >	recv_spaces_t;
580 
581 static recv_spaces_t	recv_spaces;
582 
583 /** The last parsed FILE_RENAME records */
584 static std::map<uint32_t,std::string> renamed_spaces;
585 
586 /** Report an operation to create, delete, or rename a file during backup.
587 @param[in]	space_id	tablespace identifier
588 @param[in]	create		whether the file is being created
589 @param[in]	name		file name (not NUL-terminated)
590 @param[in]	len		length of name, in bytes
591 @param[in]	new_name	new file name (NULL if not rename)
592 @param[in]	new_len		length of new_name, in bytes (0 if NULL) */
593 void (*log_file_op)(ulint space_id, bool create,
594 		    const byte* name, ulint len,
595 		    const byte* new_name, ulint new_len);
596 
597 /** Information about initializing page contents during redo log processing.
598 FIXME: Rely on recv_sys.pages! */
599 class mlog_init_t
600 {
601 public:
602 	/** A page initialization operation that was parsed from
603 	the redo log */
604 	struct init {
605 		/** log sequence number of the page initialization */
606 		lsn_t lsn;
607 		/** Whether btr_page_create() avoided a read of the page.
608 
609 		At the end of the last recovery batch, mark_ibuf_exist()
610 		will mark pages for which this flag is set. */
611 		bool created;
612 	};
613 
614 private:
615 	typedef std::map<const page_id_t, init,
616 			 std::less<const page_id_t>,
617 			 ut_allocator<std::pair<const page_id_t, init> > >
618 		map;
619 	/** Map of page initialization operations.
620 	FIXME: Merge this to recv_sys.pages! */
621 	map inits;
622 public:
623 	/** Record that a page will be initialized by the redo log.
624 	@param[in]	page_id		page identifier
625 	@param[in]	lsn		log sequence number
626 	@return whether the state was changed */
add(const page_id_t page_id,lsn_t lsn)627 	bool add(const page_id_t page_id, lsn_t lsn)
628 	{
629 		ut_ad(mutex_own(&recv_sys.mutex));
630 		const init init = { lsn, false };
631 		std::pair<map::iterator, bool> p = inits.insert(
632 			map::value_type(page_id, init));
633 		ut_ad(!p.first->second.created);
634 		if (p.second) return true;
635 		if (p.first->second.lsn >= init.lsn) return false;
636 		p.first->second = init;
637 		return true;
638 	}
639 
640 	/** Get the last stored lsn of the page id and its respective
641 	init/load operation.
642 	@param[in]	page_id	page id
643 	@param[in,out]	init	initialize log or load log
644 	@return the latest page initialization;
645 	not valid after releasing recv_sys.mutex. */
last(page_id_t page_id)646 	init& last(page_id_t page_id)
647 	{
648 		ut_ad(mutex_own(&recv_sys.mutex));
649 		return inits.find(page_id)->second;
650 	}
651 
652 	/** Determine if a page will be initialized or freed after a time.
653 	@param page_id      page identifier
654 	@param lsn          log sequence number
655 	@return whether page_id will be freed or initialized after lsn */
will_avoid_read(page_id_t page_id,lsn_t lsn) const656 	bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
657 	{
658 		ut_ad(mutex_own(&recv_sys.mutex));
659 		auto i= inits.find(page_id);
660 		return i != inits.end() && i->second.lsn > lsn;
661 	}
662 
663 	/** At the end of each recovery batch, reset the 'created' flags. */
reset()664 	void reset()
665 	{
666 		ut_ad(mutex_own(&recv_sys.mutex));
667 		ut_ad(recv_no_ibuf_operations);
668 		for (map::value_type& i : inits) {
669 			i.second.created = false;
670 		}
671 	}
672 
673 	/** On the last recovery batch, mark whether there exist
674 	buffered changes for the pages that were initialized
675 	by buf_page_create() and still reside in the buffer pool.
676 	@param[in,out]	mtr	dummy mini-transaction */
mark_ibuf_exist(mtr_t & mtr)677 	void mark_ibuf_exist(mtr_t& mtr)
678 	{
679 		ut_ad(mutex_own(&recv_sys.mutex));
680 		mtr.start();
681 
682 		for (const map::value_type& i : inits) {
683 			if (!i.second.created) {
684 				continue;
685 			}
686 			if (buf_block_t* block = buf_page_get_low(
687 				    i.first, 0, RW_X_LATCH, nullptr,
688 				    BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
689 				    &mtr, nullptr, false)) {
690 				if (UNIV_LIKELY_NULL(block->page.zip.data)) {
691 					switch (fil_page_get_type(
692 							block->page.zip.data)) {
693 					case FIL_PAGE_INDEX:
694 					case FIL_PAGE_RTREE:
695 						if (page_zip_decompress(
696 							    &block->page.zip,
697 							    block->frame,
698 							    true)) {
699 							break;
700 						}
701 						ib::error() << "corrupted "
702 							    << block->page.id();
703 					}
704 				}
705 				if (recv_no_ibuf_operations) {
706 					mtr.commit();
707 					mtr.start();
708 					continue;
709 				}
710 				mutex_exit(&recv_sys.mutex);
711 				block->page.ibuf_exist = ibuf_page_exists(
712 					block->page.id(), block->zip_size());
713 				mtr.commit();
714 				mtr.start();
715 				mutex_enter(&recv_sys.mutex);
716 			}
717 		}
718 
719 		mtr.commit();
720 	}
721 
722 	/** Clear the data structure */
clear()723 	void clear() { inits.clear(); }
724 };
725 
726 static mlog_init_t mlog_init;
727 
728 /** Process a record that indicates that a tablespace is
729 being shrunk in size.
730 @param page_id	first page identifier that is not in the file
731 @param lsn	log sequence number of the shrink operation */
trim(const page_id_t page_id,lsn_t lsn)732 inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
733 {
734 	DBUG_ENTER("recv_sys_t::trim");
735 	DBUG_LOG("ib_log",
736 		 "discarding log beyond end of tablespace "
737 		 << page_id << " before LSN " << lsn);
738 	ut_ad(mutex_own(&mutex));
739 	for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
740 	     p != pages.end() && p->first.space() == page_id.space();) {
741 		recv_sys_t::map::iterator r = p++;
742 		if (r->second.trim(lsn)) {
743 			pages.erase(r);
744 		}
745 	}
746 	if (fil_space_t* space = fil_space_get(page_id.space())) {
747 		ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
748 		fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
749 		ut_ad(file->is_open());
750 		os_file_truncate(file->name, file->handle,
751 				 os_offset_t{page_id.page_no()}
752 				 << srv_page_size_shift, true);
753 	}
754 	DBUG_VOID_RETURN;
755 }
756 
open_log_files_if_needed()757 void recv_sys_t::open_log_files_if_needed()
758 {
759   if (!recv_sys.files.empty())
760     return;
761 
762   for (auto &&path : get_existing_log_files_paths())
763   {
764     recv_sys.files.emplace_back(std::move(path));
765     ut_a(recv_sys.files.back().open(true) == DB_SUCCESS);
766   }
767 }
768 
read(os_offset_t total_offset,span<byte> buf)769 void recv_sys_t::read(os_offset_t total_offset, span<byte> buf)
770 {
771   open_log_files_if_needed();
772 
773   size_t file_idx= static_cast<size_t>(total_offset / log_sys.log.file_size);
774   os_offset_t offset= total_offset % log_sys.log.file_size;
775   dberr_t err= recv_sys.files[file_idx].read(offset, buf);
776   ut_a(err == DB_SUCCESS);
777 }
778 
files_size()779 inline size_t recv_sys_t::files_size()
780 {
781   open_log_files_if_needed();
782   return files.size();
783 }
784 
785 /** Process a file name from a FILE_* record.
786 @param[in,out]	name		file name
787 @param[in]	len		length of the file name
788 @param[in]	space_id	the tablespace ID
789 @param[in]	deleted		whether this is a FILE_DELETE record */
790 static
791 void
fil_name_process(char * name,ulint len,ulint space_id,bool deleted)792 fil_name_process(char* name, ulint len, ulint space_id, bool deleted)
793 {
794 	if (srv_operation == SRV_OPERATION_BACKUP) {
795 		return;
796 	}
797 
798 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
799 	      || srv_operation == SRV_OPERATION_RESTORE
800 	      || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
801 
802 	/* We will also insert space=NULL into the map, so that
803 	further checks can ensure that a FILE_MODIFY record was
804 	scanned before applying any page records for the space_id. */
805 
806 	os_normalize_path(name);
807 	const file_name_t fname(std::string(name, len), deleted);
808 	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.emplace(
809 		space_id, fname);
810 	ut_ad(p.first->first == space_id);
811 
812 	file_name_t&	f = p.first->second;
813 
814 	if (deleted) {
815 		/* Got FILE_DELETE */
816 
817 		if (!p.second && f.status != file_name_t::DELETED) {
818 			f.status = file_name_t::DELETED;
819 			if (f.space != NULL) {
820 				fil_space_free(space_id, false);
821 				f.space = NULL;
822 			}
823 		}
824 
825 		ut_ad(f.space == NULL);
826 	} else if (p.second // the first FILE_MODIFY or FILE_RENAME
827 		   || f.name != fname.name) {
828 		fil_space_t*	space;
829 
830 		/* Check if the tablespace file exists and contains
831 		the space_id. If not, ignore the file after displaying
832 		a note. Abort if there are multiple files with the
833 		same space_id. */
834 		switch (fil_ibd_load(space_id, name, space)) {
835 		case FIL_LOAD_OK:
836 			ut_ad(space != NULL);
837 
838 			if (!f.space) {
839 				if (f.size
840 				    || f.flags != f.initial_flags) {
841 					fil_space_set_recv_size_and_flags(
842 						space->id, f.size, f.flags);
843 				}
844 
845 				f.space = space;
846 				goto same_space;
847 			} else if (f.space == space) {
848 same_space:
849 				f.name = fname.name;
850 				f.status = file_name_t::NORMAL;
851 			} else {
852 				ib::error() << "Tablespace " << space_id
853 					<< " has been found in two places: '"
854 					<< f.name << "' and '" << name << "'."
855 					" You must delete one of them.";
856 				recv_sys.found_corrupt_fs = true;
857 			}
858 			break;
859 
860 		case FIL_LOAD_ID_CHANGED:
861 			ut_ad(space == NULL);
862 			break;
863 
864 		case FIL_LOAD_NOT_FOUND:
865 			/* No matching tablespace was found; maybe it
866 			was renamed, and we will find a subsequent
867 			FILE_* record. */
868 			ut_ad(space == NULL);
869 
870 			if (srv_force_recovery) {
871 				/* Without innodb_force_recovery,
872 				missing tablespaces will only be
873 				reported in
874 				recv_init_crash_recovery_spaces().
875 				Enable some more diagnostics when
876 				forcing recovery. */
877 
878 				ib::info()
879 					<< "At LSN: " << recv_sys.recovered_lsn
880 					<< ": unable to open file " << name
881 					<< " for tablespace " << space_id;
882 			}
883 			break;
884 
885 		case FIL_LOAD_INVALID:
886 			ut_ad(space == NULL);
887 			if (srv_force_recovery == 0) {
888 				ib::warn() << "We do not continue the crash"
889 					" recovery, because the table may"
890 					" become corrupt if we cannot apply"
891 					" the log records in the InnoDB log to"
892 					" it. To fix the problem and start"
893 					" mysqld:";
894 				ib::info() << "1) If there is a permission"
895 					" problem in the file and mysqld"
896 					" cannot open the file, you should"
897 					" modify the permissions.";
898 				ib::info() << "2) If the tablespace is not"
899 					" needed, or you can restore an older"
900 					" version from a backup, then you can"
901 					" remove the .ibd file, and use"
902 					" --innodb_force_recovery=1 to force"
903 					" startup without this file.";
904 				ib::info() << "3) If the file system or the"
905 					" disk is broken, and you cannot"
906 					" remove the .ibd file, you can set"
907 					" --innodb_force_recovery.";
908 				recv_sys.found_corrupt_fs = true;
909 				break;
910 			}
911 
912 			ib::info() << "innodb_force_recovery was set to "
913 				<< srv_force_recovery << ". Continuing crash"
914 				" recovery even though we cannot access the"
915 				" files for tablespace " << space_id << ".";
916 			break;
917 		}
918 	}
919 }
920 
921 /** Clean up after recv_sys_t::create() */
close()922 void recv_sys_t::close()
923 {
924   ut_ad(this == &recv_sys);
925 
926   if (is_initialised())
927   {
928     dblwr.pages.clear();
929     ut_d(mutex_enter(&mutex));
930     clear();
931     ut_d(mutex_exit(&mutex));
932 
933     if (buf)
934     {
935       ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
936       buf= nullptr;
937     }
938 
939     last_stored_lsn= 0;
940     mutex_free(&mutex);
941   }
942 
943   recv_spaces.clear();
944   renamed_spaces.clear();
945   mlog_init.clear();
946 
947   close_files();
948 }
949 
950 /** Initialize the redo log recovery subsystem. */
create()951 void recv_sys_t::create()
952 {
953 	ut_ad(this == &recv_sys);
954 	ut_ad(!is_initialised());
955 	mutex_create(LATCH_ID_RECV_SYS, &mutex);
956 
957 	apply_log_recs = false;
958 	apply_batch_on = false;
959 
960 	buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE,
961 						    PSI_INSTRUMENT_ME));
962 	len = 0;
963 	parse_start_lsn = 0;
964 	scanned_lsn = 0;
965 	scanned_checkpoint_no = 0;
966 	recovered_offset = 0;
967 	recovered_lsn = 0;
968 	found_corrupt_log = false;
969 	found_corrupt_fs = false;
970 	mlog_checkpoint_lsn = 0;
971 
972 	progress_time = time(NULL);
973 	recv_max_page_lsn = 0;
974 
975 	memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
976 	last_stored_lsn = 1;
977 	UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU);
978 }
979 
980 /** Clear a fully processed set of stored redo log records. */
clear()981 inline void recv_sys_t::clear()
982 {
983   ut_ad(mutex_own(&mutex));
984   apply_log_recs= false;
985   apply_batch_on= false;
986   ut_ad(!after_apply || !UT_LIST_GET_LAST(blocks));
987   pages.clear();
988 
989   for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
990   {
991     buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
992     ut_ad(block->page.state() == BUF_BLOCK_MEMORY);
993     UT_LIST_REMOVE(blocks, block);
994     MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size);
995     buf_block_free(block);
996     block= prev_block;
997   }
998 }
999 
1000 /** Free most recovery data structures. */
debug_free()1001 void recv_sys_t::debug_free()
1002 {
1003   ut_ad(this == &recv_sys);
1004   ut_ad(is_initialised());
1005   mutex_enter(&mutex);
1006 
1007   recovery_on= false;
1008   pages.clear();
1009   ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
1010 
1011   buf= nullptr;
1012 
1013   mutex_exit(&mutex);
1014 }
1015 
/** Allocate memory for a redo log snippet from recv_sys_t::blocks.
Each block packs two counters into block->page.access_time: the upper
16 bits count the allocations living in the block, and the lower 16 bits
hold the ALIGNMENT-aligned offset of the first unused byte of the frame.
The caller must hold recv_sys_t::mutex.
@param len  number of bytes to allocate; nonzero and at most srv_page_size
@return pointer to the allocated bytes, aligned to ALIGNMENT */
inline void *recv_sys_t::alloc(size_t len)
{
  ut_ad(mutex_own(&mutex));
  ut_ad(len);
  ut_ad(len <= srv_page_size);

  /* New blocks are added to the front of the list, so the first block
  is the one most recently allocated from. */
  buf_block_t *block= UT_LIST_GET_FIRST(blocks);
  if (UNIV_UNLIKELY(!block))
  {
create_block:
    /* Start a fresh block: one allocation, occupying the first len
    bytes (rounded up to ALIGNMENT) of the frame. */
    block= buf_block_alloc();
    block->page.access_time= 1U << 16 |
      ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT);
    static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
    UT_LIST_ADD_FIRST(blocks, block);
    /* Memory instrumentation: only the allocated prefix is accessible. */
    MEM_MAKE_ADDRESSABLE(block->frame, len);
    MEM_NOACCESS(block->frame + len, srv_page_size - len);
    return my_assume_aligned<ALIGNMENT>(block->frame);
  }

  size_t free_offset= static_cast<uint16_t>(block->page.access_time);
  ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
  if (UNIV_UNLIKELY(!free_offset))
  {
    /* The 16-bit offset wrapped around to 0: the block is completely
    used, which is only expected with 64KiB pages. */
    ut_ad(srv_page_size == 65536);
    goto create_block;
  }
  ut_ad(free_offset <= srv_page_size);
  free_offset+= len;

  /* The request does not fit in the remainder of this block. */
  if (free_offset > srv_page_size)
    goto create_block;

  /* Count one more allocation and advance the aligned free offset. */
  block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
    ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
  MEM_MAKE_ADDRESSABLE(block->frame + free_offset - len, len);
  return my_assume_aligned<ALIGNMENT>(block->frame + free_offset - len);
}
1054 
1055 
/** Free a redo log snippet.
Finds the buffer pool block whose frame contains the snippet and drops
one allocation from the count kept in the upper 16 bits of
block->page.access_time. When that count reaches zero, the block is
removed from recv_sys_t::blocks and released with buf_block_free().
The caller must hold recv_sys_t::mutex.
@param data buffer returned by alloc() */
inline void recv_sys_t::free(const void *data)
{
  ut_ad(!ut_align_offset(data, ALIGNMENT));
  /* From here on, data refers to the start of the containing frame. */
  data= page_align(data);
  ut_ad(mutex_own(&mutex));

  /* MDEV-14481 FIXME: To prevent race condition with buf_pool.resize(),
  we must acquire and hold the buffer pool mutex here. */
  ut_ad(!buf_pool.resize_in_progress());

  /* Search every chunk for the one whose frames cover the address. */
  auto *chunk= buf_pool.chunks;
  for (auto i= buf_pool.n_chunks; i--; chunk++)
  {
    if (data < chunk->blocks->frame)
      continue;
    /* Index of the frame within the chunk, counted in pages */
    const size_t offs= (reinterpret_cast<const byte*>(data) -
                        chunk->blocks->frame) >> srv_page_size_shift;
    if (offs >= chunk->size)
      continue;
    buf_block_t *block= &chunk->blocks[offs];
    ut_ad(block->frame == data);
    ut_ad(block->page.state() == BUF_BLOCK_MEMORY);
    ut_ad(static_cast<uint16_t>(block->page.access_time - 1) <
          srv_page_size);
    ut_ad(block->page.access_time >= 1U << 16);
    /* Decrement the allocation count; release the block with the last
    allocation. */
    if (!((block->page.access_time -= 1U << 16) >> 16))
    {
      UT_LIST_REMOVE(blocks, block);
      MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size);
      buf_block_free(block);
    }
    return;
  }
  /* The address did not belong to any buffer pool chunk. */
  ut_ad(0);
}
1093 
1094 
/** Read a log segment to log_sys.buf, validating it block by block.
The caller must hold log_sys.mutex, and both LSNs must be multiples of
OS_FILE_LOG_BLOCK_SIZE. Each block is checked for its expected block
number, checksum and data length, and is decrypted if the log is
encrypted. Reading stops at the first block that fails a check; that
block is treated as the end of the log.
@param[in,out]	start_lsn	in: read area start,
out: the last read valid lsn
@param[in]	end_lsn		read area end
@return	whether no invalid blocks (e.g checksum mismatch) were found */
bool log_t::file::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
{
	ulint	len;
	bool success = true;
	mysql_mutex_assert_owner(&log_sys.mutex);
	ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
	ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
	byte* buf = log_sys.buf;
loop:
	/* One iteration reads at most up to the end of the current file. */
	lsn_t source_offset = calc_lsn_offset_old(*start_lsn);

	ut_a(end_lsn - *start_lsn <= ULINT_MAX);
	len = (ulint) (end_lsn - *start_lsn);

	ut_ad(len != 0);

	const bool at_eof = (source_offset % file_size) + len > file_size;
	if (at_eof) {
		/* If the above condition is true then len (which is ulint)
		is > the expression below, so the typecast is ok */
		len = ulint(file_size - (source_offset % file_size));
	}

	log_sys.n_log_ios++;

	ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);

	recv_sys.read(source_offset, {buf, len});

	/* Validate the blocks that were just read, advancing the buffer
	position and *start_lsn one block at a time. */
	for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
		     buf += OS_FILE_LOG_BLOCK_SIZE,
		     (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
		const ulint block_number = log_block_get_hdr_no(buf);

		if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
			/* Garbage or an incompletely written log block.
			We will not report any error, because this can
			happen when InnoDB was killed while it was
			writing redo log. We simply treat this as an
			abrupt end of the redo log. */
fail:
			/* Setting end_lsn to the current position makes
			the check after the loop terminate the read. */
			end_lsn = *start_lsn;
			success = false;
			break;
		}

		ulint crc = log_block_calc_checksum_crc32(buf);
		ulint cksum = log_block_get_checksum(buf);

		/* Debug injection: corrupt the checksum of the first
		block only. */
		DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
				static int block_counter;
				if (block_counter++ == 0) {
					cksum = crc + 1;
				}
			});

		/* Debug injection: corrupt the checksum of every block. */
		DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });

		if (UNIV_UNLIKELY(crc != cksum)) {
			ib::error_or_warn(srv_operation!=SRV_OPERATION_BACKUP)
				<< "Invalid log block checksum. block: "
				<< block_number
				<< " checkpoint no: "
				<< log_block_get_checkpoint_no(buf)
				<< " expected: " << crc
				<< " found: " << cksum;
			goto fail;
		}

		if (is_encrypted()
		    && !log_crypt(buf, *start_lsn,
				  OS_FILE_LOG_BLOCK_SIZE,
				  LOG_DECRYPT)) {
			goto fail;
		}

		/* The data length must cover at least the block header,
		and must either denote a full block or end no later than
		the trailer. */
		ulint dl = log_block_get_data_len(buf);
		if (dl < LOG_BLOCK_HDR_SIZE
		    || (dl != OS_FILE_LOG_BLOCK_SIZE
			&& dl > log_sys.trailer_offset())) {
			recv_sys.found_corrupt_log = true;
			goto fail;
		}
	}

	if (recv_sys.report(time(NULL))) {
		ib::info() << "Read redo log up to LSN=" << *start_lsn;
		service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
			"Read redo log up to LSN=" LSN_PF,
			*start_lsn);
	}

	/* More to read: the request wrapped at the end of a file. */
	if (*start_lsn != end_lsn) {
		goto loop;
	}

	return(success);
}
1198 
1199 
1200 
1201 /********************************************************//**
1202 Copies a log segment from the most up-to-date log group to the other log
1203 groups, so that they all contain the latest log data. Also writes the info
1204 about the latest checkpoint to the groups, and inits the fields in the group
1205 memory structs to up-to-date values. */
1206 static
1207 void
recv_synchronize_groups()1208 recv_synchronize_groups()
1209 {
1210 	const lsn_t recovered_lsn = recv_sys.recovered_lsn;
1211 
1212 	/* Read the last recovered log block to the recovery system buffer:
1213 	the block is always incomplete */
1214 
1215 	lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
1216 					       OS_FILE_LOG_BLOCK_SIZE);
1217 	log_sys.log.read_log_seg(&start_lsn,
1218 				 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
1219 	log_sys.log.set_fields(recovered_lsn);
1220 
1221 	/* Copy the checkpoint info to the log; remember that we have
1222 	incremented checkpoint_no by one, and the info will not be written
1223 	over the max checkpoint info, thus making the preservation of max
1224 	checkpoint info on disk certain */
1225 
1226 	if (!srv_read_only_mode) {
1227 		log_write_checkpoint_info(0);
1228 		mysql_mutex_lock(&log_sys.mutex);
1229 	}
1230 }
1231 
1232 /** Check the consistency of a log header block.
1233 @param[in]	log header block
1234 @return true if ok */
1235 static
1236 bool
recv_check_log_header_checksum(const byte * buf)1237 recv_check_log_header_checksum(
1238 	const byte*	buf)
1239 {
1240 	return(log_block_get_checksum(buf)
1241 	       == log_block_calc_checksum_crc32(buf));
1242 }
1243 
redo_file_sizes_are_correct()1244 static bool redo_file_sizes_are_correct()
1245 {
1246   auto paths= get_existing_log_files_paths();
1247   auto get_size= [](const std::string &path) {
1248     return os_file_get_size(path.c_str()).m_total_size;
1249   };
1250   os_offset_t size= get_size(paths[0]);
1251 
1252   auto it=
1253       std::find_if(paths.begin(), paths.end(), [&](const std::string &path) {
1254         return get_size(path) != size;
1255       });
1256 
1257   if (it == paths.end())
1258     return true;
1259 
1260   ib::error() << "Log file " << *it << " is of different size "
1261               << get_size(*it) << " bytes than other log files " << size
1262               << " bytes!";
1263   return false;
1264 }
1265 
1266 /** Calculate the checksum for a log block using the pre-10.2.2 algorithm. */
log_block_calc_checksum_format_0(const byte * b)1267 inline uint32_t log_block_calc_checksum_format_0(const byte *b)
1268 {
1269   uint32_t sum= 1;
1270   const byte *const end= &b[512 - 4];
1271 
1272   for (uint32_t sh= 0; b < end; )
1273   {
1274     sum&= 0x7FFFFFFFUL;
1275     sum+= uint32_t{*b} << sh++;
1276     sum+= *b++;
1277     if (sh > 24)
1278       sh= 0;
1279   }
1280 
1281   return sum;
1282 }
1283 
/** Determine if a redo log from before MariaDB 10.2.2 is clean.
Picks the valid checkpoint with the highest checkpoint number, reads the
log block that contains the checkpoint LSN, and checks that the log ends
exactly at the checkpoint. On success, the log is marked for upgrading
(srv_log_file_size is set to 0) and the LSN fields of recv_sys and
log_sys are set to the checkpoint LSN.
@return error code
@retval DB_SUCCESS      if the redo log is clean
@retval DB_CORRUPTION   if the redo log is corrupted
@retval DB_ERROR        if the redo log is not empty */
ATTRIBUTE_COLD static dberr_t recv_log_recover_pre_10_2()
{
  uint64_t max_no= 0;
  byte *buf= log_sys.buf;

  ut_ad(log_sys.log.format == 0);

  if (!redo_file_sizes_are_correct())
    return DB_CORRUPTION;

  /** Offset of the first checkpoint checksum */
  constexpr uint CHECKSUM_1= 288;
  /** Offset of the second checkpoint checksum */
  constexpr uint CHECKSUM_2= CHECKSUM_1 + 4;
  /** the checkpoint LSN field */
  constexpr uint CHECKPOINT_LSN= 8;
  /** Most significant bits of the checkpoint offset */
  constexpr uint OFFS_HI= CHECKSUM_2 + 12;
  /** Least significant bits of the checkpoint offset */
  constexpr uint OFFS_LO= 16;

  /* Stays 0 unless a valid checkpoint is found. */
  lsn_t lsn= 0;

  /* Examine both checkpoint slots. */
  for (ulint field= LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
       field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1)
  {
    log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});

    /* Both checksums of the slot must match; otherwise skip it. */
    if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1)) !=
        mach_read_from_4(buf + CHECKSUM_1) ||
        static_cast<uint32_t>(ut_fold_binary(buf + CHECKPOINT_LSN,
                                             CHECKSUM_2 - CHECKPOINT_LSN)) !=
        mach_read_from_4(buf + CHECKSUM_2))
     {
       DBUG_LOG("ib_log", "invalid pre-10.2.2 checkpoint " << field);
       continue;
     }

    if (!log_crypt_101_read_checkpoint(buf))
    {
      ib::error() << "Decrypting checkpoint failed";
      continue;
    }

    /* The checkpoint number is stored at the start of the slot. */
    const uint64_t checkpoint_no= mach_read_from_8(buf);

    DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found",
                          checkpoint_no,
                          mach_read_from_8(buf + CHECKPOINT_LSN)));

    /* Remember the LSN and the file offset of the newest checkpoint. */
    if (checkpoint_no >= max_no)
    {
      max_no= checkpoint_no;
      lsn= mach_read_from_8(buf + CHECKPOINT_LSN);
      log_sys.log.set_lsn(lsn);
      log_sys.log.set_lsn_offset(lsn_t{mach_read_from_4(buf + OFFS_HI)} << 32 |
                                 mach_read_from_4(buf + OFFS_LO));
    }
  }

  if (!lsn)
  {
    ib::error() << "Upgrade after a crash is not supported."
            " This redo log was created before MariaDB 10.2.2,"
            " and we did not find a valid checkpoint."
            " Please follow the instructions at"
            " https://mariadb.com/kb/en/library/upgrading/";
    return DB_ERROR;
  }

  log_sys.set_lsn(lsn);
  log_sys.set_flushed_lsn(lsn);
  const lsn_t source_offset= log_sys.log.calc_lsn_offset_old(lsn);

  static constexpr char NO_UPGRADE_RECOVERY_MSG[]=
    "Upgrade after a crash is not supported."
    " This redo log was created before MariaDB 10.2.2";

  /* Read the 512-byte log block that contains the checkpoint LSN. */
  recv_sys.read(source_offset & ~511, {buf, 512});

  /* If the plain checksum does not match, the block may be encrypted;
  it is corrupted only if decrypting fails as well. */
  if (log_block_calc_checksum_format_0(buf) != log_block_get_checksum(buf) &&
      !log_crypt_101_read_block(buf, lsn))
  {
    ib::error() << NO_UPGRADE_RECOVERY_MSG << ", and it appears corrupted.";
    return DB_CORRUPTION;
  }

  /* The log is clean if the 2-byte field at offset 4 of the block
  (presumably the data length - confirm) equals the position of the
  checkpoint within the block. */
  if (mach_read_from_2(buf + 4) == (source_offset & 511))
  {
    /* Mark the redo log for upgrading. */
    srv_log_file_size= 0;
    recv_sys.parse_start_lsn= recv_sys.recovered_lsn= recv_sys.scanned_lsn=
      recv_sys.mlog_checkpoint_lsn = lsn;
    log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn=
      log_sys.write_lsn= log_sys.current_flush_lsn= lsn;
    log_sys.next_checkpoint_no= 0;
    return DB_SUCCESS;
  }

  /* NOTE(review): the byte at offset 20 + 32 * 9 appears to flag an
  encrypted log, judging by the message below - confirm. */
  if (buf[20 + 32 * 9] == 2)
    ib::error() << "Cannot decrypt log for upgrading."
                   " The encrypted log was created before MariaDB 10.2.2.";
  else
    ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";

  return DB_ERROR;
}
1396 
/** Calculate the offset of a log sequence number
in an old redo log file (during upgrade check).
The computation is relative to the known pair (this->lsn, lsn_offset):
file headers are stripped from lsn_offset, the LSN distance is applied
modulo the total size, and the headers are added back.
@param[in]	lsn	log sequence number
@return byte offset within the log */
inline lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const
{
  /* NOTE(review): presumably capacity() is the header-less size of one
  file and files_size() the number of files - confirm. */
  const lsn_t size= capacity() * recv_sys.files_size();
  lsn_t l= lsn - this->lsn;
  /* lsn precedes this->lsn: the unsigned difference is negative when
  reinterpreted as signed, so step backwards modulo size. */
  if (longlong(l) < 0)
  {
    l= lsn_t(-longlong(l)) % size;
    l= size - l;
  }

  /* Add the header-less form of lsn_offset: one LOG_FILE_HDR_SIZE is
  subtracted for each file up to and including the one that contains
  lsn_offset. */
  l+= lsn_offset - LOG_FILE_HDR_SIZE * (1 + lsn_offset / file_size);
  l%= size;
  /* Add the file headers back to get an offset in the files. */
  return l + LOG_FILE_HDR_SIZE * (1 + l / (file_size - LOG_FILE_HDR_SIZE));
}
1415 
1416 /** Determine if a redo log from MariaDB 10.2.2+, 10.3, or 10.4 is clean.
1417 @return	error code
1418 @retval	DB_SUCCESS	if the redo log is clean
1419 @retval	DB_CORRUPTION	if the redo log is corrupted
1420 @retval	DB_ERROR	if the redo log is not empty */
recv_log_recover_10_4()1421 static dberr_t recv_log_recover_10_4()
1422 {
1423 	const lsn_t	lsn = log_sys.log.get_lsn();
1424 	const lsn_t	source_offset =	log_sys.log.calc_lsn_offset_old(lsn);
1425 	byte*		buf = log_sys.buf;
1426 
1427 	if (!redo_file_sizes_are_correct()) {
1428 		return DB_CORRUPTION;
1429 	}
1430 
1431 	recv_sys.read(source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1),
1432 		      {buf, OS_FILE_LOG_BLOCK_SIZE});
1433 
1434 	ulint crc = log_block_calc_checksum_crc32(buf);
1435 	ulint cksum = log_block_get_checksum(buf);
1436 
1437 	if (UNIV_UNLIKELY(crc != cksum)) {
1438 		ib::error() << "Invalid log block checksum."
1439 			    << " block: "
1440 			    << log_block_get_hdr_no(buf)
1441 			    << " checkpoint no: "
1442 			    << log_block_get_checkpoint_no(buf)
1443 			    << " expected: " << crc
1444 			    << " found: " << cksum;
1445 		return DB_CORRUPTION;
1446 	}
1447 
1448 	if (log_sys.log.is_encrypted()
1449 	    && !log_crypt(buf, lsn & ~511, 512, LOG_DECRYPT)) {
1450 		return DB_ERROR;
1451 	}
1452 
1453 	/* On a clean shutdown, the redo log will be logically empty
1454 	after the checkpoint lsn. */
1455 
1456 	if (log_block_get_data_len(buf)
1457 	    != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
1458 		return DB_ERROR;
1459 	}
1460 
1461 	/* Mark the redo log for upgrading. */
1462 	srv_log_file_size = 0;
1463 	recv_sys.parse_start_lsn = recv_sys.recovered_lsn
1464 		= recv_sys.scanned_lsn
1465 		= recv_sys.mlog_checkpoint_lsn = lsn;
1466 	log_sys.set_lsn(lsn);
1467 	log_sys.set_flushed_lsn(lsn);
1468 	log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
1469 		= log_sys.write_lsn = log_sys.current_flush_lsn = lsn;
1470 	log_sys.next_checkpoint_no = 0;
1471 	return DB_SUCCESS;
1472 }
1473 
/** Find the latest checkpoint in the log header.
Reads the log file header to determine the format, then examines both
checkpoint slots and selects the valid one with the highest checkpoint
number. As a side effect, sets log_sys.log.format and subformat, the
checkpoint LSN and its offset in log_sys.log, and
log_sys.next_checkpoint_no. For the oldest format the work is delegated
to recv_log_recover_pre_10_2(); for other formats older than 10.5 the
log must additionally pass recv_log_recover_10_4().
@param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
@return error code or DB_SUCCESS */
dberr_t
recv_find_max_checkpoint(ulint* max_field)
{
	ib_uint64_t	max_no;
	ib_uint64_t	checkpoint_no;
	ulint		field;
	byte*		buf;

	max_no = 0;
	/* 0 means that no valid checkpoint has been found. */
	*max_field = 0;

	buf = log_sys.checkpoint_buf;

	/* Read the first block of the log, which holds the file header. */
	log_sys.log.read(0, {buf, OS_FILE_LOG_BLOCK_SIZE});
	/* Check the header page checksum. There was no
	checksum in the first redo log format (version 0). */
	log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
	log_sys.log.subformat = log_sys.log.format != log_t::FORMAT_3_23
		? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
		: 0;
	if (log_sys.log.format != log_t::FORMAT_3_23
	    && !recv_check_log_header_checksum(buf)) {
		ib::error() << "Invalid redo log header checksum.";
		return(DB_CORRUPTION);
	}

	/* Keep a copy of the creator string for error messages; buf is
	overwritten when the checkpoint slots are read below. */
	char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];

	/* This copies sizeof creator bytes, one more than the creator
	field; the extra last byte is overwritten right below. */
	memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
	/* Ensure that the string is NUL-terminated. */
	creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;

	switch (log_sys.log.format) {
	case log_t::FORMAT_3_23:
		/* The oldest format has its own checkpoint layout. */
		return recv_log_recover_pre_10_2();
	case log_t::FORMAT_10_2:
	case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_3:
	case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_4:
	case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_5:
	case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED:
		break;
	default:
		ib::error() << "Unsupported redo log format."
			" The redo log was created with " << creator << ".";
		return(DB_ERROR);
	}

	/* Examine both checkpoint slots. */
	for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
		log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});

		const ulint crc32 = log_block_calc_checksum_crc32(buf);
		const ulint cksum = log_block_get_checksum(buf);

		/* A slot with a wrong checksum is silently skipped. */
		if (crc32 != cksum) {
			DBUG_PRINT("ib_log",
				   ("invalid checkpoint,"
				    " at " ULINTPF
				    ", checksum " ULINTPFx
				    " expected " ULINTPFx,
				    field, cksum, crc32));
			continue;
		}

		if (log_sys.is_encrypted()
		    && !log_crypt_read_checkpoint_buf(buf)) {
			ib::error() << "Reading checkpoint"
				" encryption info failed.";
			continue;
		}

		checkpoint_no = mach_read_from_8(
			buf + LOG_CHECKPOINT_NO);

		DBUG_PRINT("ib_log",
			   ("checkpoint " UINT64PF " at " LSN_PF " found",
			    checkpoint_no, mach_read_from_8(
				    buf + LOG_CHECKPOINT_LSN)));

		/* Remember the slot with the highest checkpoint number;
		on a tie, the later slot wins. */
		if (checkpoint_no >= max_no) {
			*max_field = field;
			max_no = checkpoint_no;
			log_sys.log.set_lsn(mach_read_from_8(
				buf + LOG_CHECKPOINT_LSN));
			log_sys.log.set_lsn_offset(mach_read_from_8(
				buf + LOG_CHECKPOINT_OFFSET));
			log_sys.next_checkpoint_no = checkpoint_no;
		}
	}

	if (*max_field == 0) {
		/* Before 10.2.2, we could get here during database
		initialization if we created an LOG_FILE_NAME file that
		was filled with zeroes, and were killed. After
		10.2.2, we would reject such a file already earlier,
		when checking the file header. */
		ib::error() << "No valid checkpoint found"
			" (corrupted redo log)."
			" You can try --innodb-force-recovery=6"
			" as a last resort.";
		return(DB_ERROR);
	}

	switch (log_sys.log.format) {
	case log_t::FORMAT_10_5:
	case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED:
		break;
	default:
		/* An older format is accepted only if the log is clean,
		so that it can be upgraded. */
		if (dberr_t err = recv_log_recover_10_4()) {
			ib::error()
				<< "Upgrade after a crash is not supported."
				" The redo log was created with " << creator
				<< (err == DB_ERROR
				    ? "." : ", and it appears corrupted.");
			return err;
		}
	}

	return(DB_SUCCESS);
}
1600 
1601 /*******************************************************//**
1602 Calculates the new value for lsn when more data is added to the log. */
1603 static
1604 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)1605 recv_calc_lsn_on_data_add(
1606 /*======================*/
1607 	lsn_t		lsn,	/*!< in: old lsn */
1608 	ib_uint64_t	len)	/*!< in: this many bytes of data is
1609 				added, log block headers not included */
1610 {
1611 	unsigned frag_len = static_cast<unsigned>(lsn % OS_FILE_LOG_BLOCK_SIZE)
1612 		- LOG_BLOCK_HDR_SIZE;
1613 	unsigned payload_size = log_sys.payload_size();
1614 	ut_ad(frag_len < payload_size);
1615 	lsn_t lsn_len = len;
1616 	lsn_len += (lsn_len + frag_len) / payload_size
1617 		* (OS_FILE_LOG_BLOCK_SIZE - payload_size);
1618 
1619 	return(lsn + lsn_len);
1620 }
1621 
1622 /** Trim old log records for a page.
1623 @param start_lsn oldest log sequence number to preserve
1624 @return whether all the log for the page was trimmed */
trim(lsn_t start_lsn)1625 inline bool page_recv_t::trim(lsn_t start_lsn)
1626 {
1627   while (log.head)
1628   {
1629     if (log.head->lsn >= start_lsn) return false;
1630     last_offset= 1; /* the next record must not be same_page */
1631     log_rec_t *next= log.head->next;
1632     recv_sys.free(log.head);
1633     log.head= next;
1634   }
1635   log.tail= nullptr;
1636   return true;
1637 }
1638 
1639 
clear()1640 inline void page_recv_t::recs_t::clear()
1641 {
1642   ut_ad(mutex_own(&recv_sys.mutex));
1643   for (const log_rec_t *l= head; l; )
1644   {
1645     const log_rec_t *next= l->next;
1646     recv_sys.free(l);
1647     l= next;
1648   }
1649   head= tail= nullptr;
1650 }
1651 
1652 
1653 /** Ignore any earlier redo log records for this page. */
will_not_read()1654 inline void page_recv_t::will_not_read()
1655 {
1656   ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
1657   state= RECV_WILL_NOT_READ;
1658   log.clear();
1659 }
1660 
1661 
/** Register a redo log snippet for a page.
If the last stored record of the page belongs to the same
mini-transaction (same start_lsn), the snippet is appended to that
record in place when possible; otherwise a new log_phys_t is allocated
with alloc(). The caller must hold recv_sys_t::mutex.
@param it       page iterator
@param start_lsn start LSN of the mini-transaction
@param lsn      @see mtr_t::commit_lsn()
@param l        redo log snippet @see log_t::FORMAT_10_5
@param len      length of l, in bytes */
inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
                            const byte *l, size_t len)
{
  ut_ad(mutex_own(&mutex));
  page_id_t page_id = it->first;
  page_recv_t &recs= it->second;

  /* Bits 0x70 of the first byte identify the record type. */
  switch (*l & 0x70) {
  case FREE_PAGE: case INIT_PAGE:
    /* The page is freed or initialized: discard all earlier records. */
    recs.will_not_read();
    mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */
    /* fall through */
  default:
    /* Try to extend the last record of the page; every 'break' below
    falls back to allocating a new record after the switch. */
    log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last());
    if (!tail)
      break;
    if (tail->start_lsn != start_lsn)
      break;
    ut_ad(tail->lsn == lsn);
    buf_block_t *block= UT_LIST_GET_LAST(blocks);
    ut_ad(block);
    /* Used bytes of the block; a stored 16-bit value of 0 maps to 65536. */
    const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1;
    ut_ad(used >= ALIGNMENT);
    const byte *end= const_cast<const log_phys_t*>(tail)->end();
    /* end and end + len lie within the same ALIGNMENT-sized unit. */
    if (!((reinterpret_cast<size_t>(end + len) ^
           reinterpret_cast<size_t>(end)) & ~(ALIGNMENT - 1)))
    {
      /* Use already allocated 'padding' bytes */
append:
      MEM_MAKE_ADDRESSABLE(end + 1, len);
      /* Append to the preceding record for the page */
      tail->append(l, len);
      return;
    }
    /* NOTE(review): alloc() carves memory from UT_LIST_GET_FIRST(blocks)
    while this code inspects UT_LIST_GET_LAST(blocks), and the second
    clause below holds whenever end is at or below &block->frame[used];
    confirm that the in-place growth path is reachable as intended. */
    if (end <= &block->frame[used - ALIGNMENT] || &block->frame[used] >= end)
      break; /* Not the last allocated record in the page */
    const size_t new_used= static_cast<size_t>(end - block->frame + len + 1);
    ut_ad(new_used > used);
    if (new_used > srv_page_size)
      break;
    /* Keep the allocation count (upper 16 bits) and advance the aligned
    free offset (lower 16 bits). */
    block->page.access_time= (block->page.access_time & ~0U << 16) |
      ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT);
    goto append;
  }
  /* Store the snippet as a new record at the end of the page's list. */
  recs.log.append(new (alloc(log_phys_t::alloc_size(len)))
                  log_phys_t(start_lsn, lsn, l, len));
}
1715 
1716 /** Store/remove the freed pages in fil_name_t of recv_spaces.
1717 @param[in]	page_id		freed or init page_id
1718 @param[in]	freed		TRUE if page is freed */
store_freed_or_init_rec(page_id_t page_id,bool freed)1719 static void store_freed_or_init_rec(page_id_t page_id, bool freed)
1720 {
1721   uint32_t space_id= page_id.space();
1722   uint32_t page_no= page_id.page_no();
1723   if (is_predefined_tablespace(space_id))
1724   {
1725     if (!srv_immediate_scrub_data_uncompressed)
1726       return;
1727     fil_space_t *space;
1728     if (space_id == TRX_SYS_SPACE)
1729       space= fil_system.sys_space;
1730     else
1731       space= fil_space_get(space_id);
1732 
1733     space->free_page(page_no, freed);
1734     return;
1735   }
1736 
1737   recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
1738   if (i != recv_spaces.end() && i->first == space_id)
1739   {
1740     if (freed)
1741       i->second.add_freed_page(page_no);
1742     else
1743       i->second.remove_freed_page(page_no);
1744   }
1745 }
1746 
1747 /** Parse and register one mini-transaction in log_t::FORMAT_10_5.
1748 @param checkpoint_lsn  the log sequence number of the latest checkpoint
1749 @param store           whether to store the records
1750 @param apply           whether to apply file-level log records
1751 @return whether FILE_CHECKPOINT record was seen the first time,
1752 or corruption was noticed */
parse(lsn_t checkpoint_lsn,store_t * store,bool apply)1753 bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
1754 {
1755   mysql_mutex_assert_owner(&log_sys.mutex);
1756   ut_ad(mutex_own(&mutex));
1757   ut_ad(parse_start_lsn);
1758   ut_ad(log_sys.is_physical());
1759 
1760   bool last_phase= (*store == STORE_IF_EXISTS);
1761   const byte *const end= buf + len;
1762 loop:
1763   const byte *const log= buf + recovered_offset;
1764   const lsn_t start_lsn= recovered_lsn;
1765   map::iterator cached_pages_it = pages.end();
1766 
1767   /* Check that the entire mini-transaction is included within the buffer */
1768   const byte *l;
1769   uint32_t rlen;
1770   for (l= log; l < end; l+= rlen)
1771   {
1772     if (!*l)
1773       goto eom_found;
1774     if (UNIV_LIKELY((*l & 0x70) != RESERVED));
1775     else if (srv_force_recovery)
1776       ib::warn() << "Ignoring unknown log record at LSN " << recovered_lsn;
1777     else
1778     {
1779 malformed:
1780       ib::error() << "Malformed log record;"
1781                      " set innodb_force_recovery=1 to ignore.";
1782 corrupted:
1783       const size_t trailing_bytes= std::min<size_t>(100, size_t(end - l));
1784       ib::info() << "Dump from the start of the mini-transaction (LSN="
1785                  << start_lsn << ") to "
1786                  << trailing_bytes << " bytes after the record:";
1787       ut_print_buf(stderr, log, l - log + trailing_bytes);
1788       putc('\n', stderr);
1789       found_corrupt_log= true;
1790       return true;
1791     }
1792     rlen= *l++ & 0xf;
1793     if (l + (rlen ? rlen : 16) >= end)
1794       break;
1795     if (!rlen)
1796     {
1797       rlen= mlog_decode_varint_length(*l);
1798       if (l + rlen >= end)
1799         break;
1800       const uint32_t addlen= mlog_decode_varint(l);
1801       if (UNIV_UNLIKELY(addlen == MLOG_DECODE_ERROR))
1802       {
1803         ib::error() << "Corrupted record length";
1804         goto corrupted;
1805       }
1806       rlen= addlen + 15;
1807     }
1808   }
1809 
1810   /* Not the entire mini-transaction was present. */
1811   return false;
1812 
1813 eom_found:
1814   ut_ad(!*l);
1815   ut_d(const byte *const el= l + 1);
1816 
1817   const lsn_t end_lsn= recv_calc_lsn_on_data_add(start_lsn, l + 1 - log);
1818   if (UNIV_UNLIKELY(end_lsn > scanned_lsn))
1819     /* The log record filled a log block, and we require that also the
1820     next log block should have been scanned in */
1821     return false;
1822 
1823   ut_d(std::set<page_id_t> freed);
1824 #if 0 && defined UNIV_DEBUG /* MDEV-21727 FIXME: enable this */
1825   /* Pages that have been modified in this mini-transaction.
1826   If a mini-transaction writes INIT_PAGE for a page, it should not have
1827   written any log records for the page. Unfortunately, this does not
1828   hold for ROW_FORMAT=COMPRESSED pages, because page_zip_compress()
1829   can be invoked in a pessimistic operation, even after log has
1830   been written for other pages. */
1831   ut_d(std::set<page_id_t> modified);
1832 #endif
1833 
1834   uint32_t space_id= 0, page_no= 0, last_offset= 0;
1835   bool got_page_op= false;
1836   for (l= log; l < end; l+= rlen)
1837   {
1838     const byte *const recs= l;
1839     const byte b= *l++;
1840 
1841     if (!b)
1842       break;
1843     ut_ad(UNIV_LIKELY(b & 0x70) != RESERVED || srv_force_recovery);
1844     rlen= b & 0xf;
1845     ut_ad(l + rlen < end);
1846     ut_ad(rlen || l + 16 < end);
1847     if (!rlen)
1848     {
1849       const uint32_t lenlen= mlog_decode_varint_length(*l);
1850       ut_ad(l + lenlen < end);
1851       const uint32_t addlen= mlog_decode_varint(l);
1852       ut_ad(addlen != MLOG_DECODE_ERROR);
1853       rlen= addlen + 15 - lenlen;
1854       l+= lenlen;
1855     }
1856     ut_ad(l + rlen < end);
1857     uint32_t idlen;
1858     if ((b & 0x80) && got_page_op)
1859     {
1860       /* This record is for the same page as the previous one. */
1861       if (UNIV_UNLIKELY((b & 0x70) <= INIT_PAGE))
1862       {
1863 record_corrupted:
1864         /* FREE_PAGE,INIT_PAGE cannot be with same_page flag */
1865         if (!srv_force_recovery)
1866           goto malformed;
1867         ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn;
1868         last_offset= 1; /* the next record must not be same_page  */
1869         continue;
1870       }
1871       goto same_page;
1872     }
1873     last_offset= 0;
1874     idlen= mlog_decode_varint_length(*l);
1875     if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen))
1876     {
1877 page_id_corrupted:
1878       if (!srv_force_recovery)
1879       {
1880         ib::error() << "Corrupted page identifier at " << recovered_lsn
1881                     << "; set innodb_force_recovery=1 to ignore the record.";
1882         goto corrupted;
1883       }
1884       ib::warn() << "Ignoring corrupted page identifier at LSN "
1885                  << recovered_lsn;
1886       continue;
1887     }
1888     space_id= mlog_decode_varint(l);
1889     if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR))
1890       goto page_id_corrupted;
1891     l+= idlen;
1892     rlen-= idlen;
1893     idlen= mlog_decode_varint_length(*l);
1894     if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen))
1895       goto page_id_corrupted;
1896     page_no= mlog_decode_varint(l);
1897     if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR))
1898       goto page_id_corrupted;
1899     l+= idlen;
1900     rlen-= idlen;
1901     got_page_op = !(b & 0x80);
1902     if (got_page_op && apply && !is_predefined_tablespace(space_id))
1903     {
1904       recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
1905       if (i != recv_spaces.end() && i->first == space_id);
1906       else if (recovered_lsn < mlog_checkpoint_lsn)
1907         /* We have not seen all records between the checkpoint and
1908         FILE_CHECKPOINT. There should be a FILE_DELETE for this
1909         tablespace later. */
1910         recv_spaces.emplace_hint(i, space_id, file_name_t("", false));
1911       else
1912       {
1913         const page_id_t id(space_id, page_no);
1914         if (!srv_force_recovery)
1915         {
1916           ib::error() << "Missing FILE_DELETE or FILE_MODIFY for " << id
1917                       << " at " << recovered_lsn
1918                       << "; set innodb_force_recovery=1 to ignore the record.";
1919           goto corrupted;
1920         }
1921         ib::warn() << "Ignoring record for " << id << " at " << recovered_lsn;
1922         continue;
1923       }
1924     }
1925 same_page:
1926     DBUG_PRINT("ib_log",
1927                ("scan " LSN_PF ": rec %x len %zu page %u:%u",
1928                 recovered_lsn, b, static_cast<size_t>(l + rlen - recs),
1929                 space_id, page_no));
1930 
1931     if (got_page_op)
1932     {
1933       const page_id_t id(space_id, page_no);
1934       ut_d(if ((b & 0x70) == INIT_PAGE) freed.erase(id));
1935       ut_ad(freed.find(id) == freed.end());
1936       switch (b & 0x70) {
1937       case FREE_PAGE:
1938         ut_ad(freed.emplace(id).second);
1939         last_offset= 1; /* the next record must not be same_page  */
1940         goto free_or_init_page;
1941       case INIT_PAGE:
1942         last_offset= FIL_PAGE_TYPE;
1943       free_or_init_page:
1944         store_freed_or_init_rec(id, (b & 0x70) == FREE_PAGE);
1945         if (UNIV_UNLIKELY(rlen != 0))
1946           goto record_corrupted;
1947         break;
1948       case EXTENDED:
1949         if (UNIV_UNLIKELY(!rlen))
1950           goto record_corrupted;
1951         if (rlen == 1 && *l == TRIM_PAGES)
1952         {
1953 #if 0 /* For now, we can only truncate an undo log tablespace */
1954           if (UNIV_UNLIKELY(!space_id || !page_no))
1955             goto record_corrupted;
1956 #else
1957           if (!srv_is_undo_tablespace(space_id) ||
1958               page_no != SRV_UNDO_TABLESPACE_SIZE_IN_PAGES)
1959             goto record_corrupted;
1960           static_assert(UT_ARR_SIZE(truncated_undo_spaces) ==
1961                         TRX_SYS_MAX_UNDO_SPACES, "compatibility");
1962           truncated_undo_spaces[space_id - srv_undo_space_id_start]=
1963             { recovered_lsn, page_no };
1964 #endif
1965           last_offset= 1; /* the next record must not be same_page  */
1966           continue;
1967         }
1968         last_offset= FIL_PAGE_TYPE;
1969         break;
1970       case RESERVED:
1971       case OPTION:
1972         continue;
1973       case WRITE:
1974       case MEMMOVE:
1975       case MEMSET:
1976         if (UNIV_UNLIKELY(rlen == 0 || last_offset == 1))
1977           goto record_corrupted;
1978         const uint32_t olen= mlog_decode_varint_length(*l);
1979         if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3))
1980           goto record_corrupted;
1981         const uint32_t offset= mlog_decode_varint(l);
1982         ut_ad(offset != MLOG_DECODE_ERROR);
1983         static_assert(FIL_PAGE_OFFSET == 4, "compatibility");
1984         if (UNIV_UNLIKELY(offset >= srv_page_size))
1985           goto record_corrupted;
1986         last_offset+= offset;
1987         if (UNIV_UNLIKELY(last_offset < 8 || last_offset >= srv_page_size))
1988           goto record_corrupted;
1989         l+= olen;
1990         rlen-= olen;
1991         if ((b & 0x70) == WRITE)
1992         {
1993           if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size))
1994             goto record_corrupted;
1995           if (UNIV_UNLIKELY(!page_no) && apply)
1996           {
1997             const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE &&
1998               last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4;
1999             const bool has_flags= last_offset <=
2000               FSP_HEADER_OFFSET + FSP_SPACE_FLAGS &&
2001               last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + 4;
2002             if (has_size || has_flags)
2003             {
2004               recv_spaces_t::iterator it= recv_spaces.find(space_id);
2005               const uint32_t size= has_size
2006                 ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SIZE + l -
2007                                    last_offset)
2008                 : 0;
2009               const uint32_t flags= has_flags
2010                 ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + l -
2011                                    last_offset)
2012                 : file_name_t::initial_flags;
2013               if (it == recv_spaces.end())
2014                 ut_ad(!mlog_checkpoint_lsn || space_id == TRX_SYS_SPACE ||
2015                       srv_is_undo_tablespace(space_id));
2016               else if (!it->second.space)
2017               {
2018                 if (has_size)
2019                   it->second.size= size;
2020                 if (has_flags)
2021                   it->second.flags= flags;
2022               }
2023               fil_space_set_recv_size_and_flags(space_id, size, flags);
2024             }
2025           }
2026           last_offset+= rlen;
2027           break;
2028         }
2029         uint32_t llen= mlog_decode_varint_length(*l);
2030         if (UNIV_UNLIKELY(llen > rlen || llen > 3))
2031           goto record_corrupted;
2032         const uint32_t len= mlog_decode_varint(l);
2033         ut_ad(len != MLOG_DECODE_ERROR);
2034         if (UNIV_UNLIKELY(last_offset + len > srv_page_size))
2035           goto record_corrupted;
2036         l+= llen;
2037         rlen-= llen;
2038         llen= len;
2039         if ((b & 0x70) == MEMSET)
2040         {
2041           if (UNIV_UNLIKELY(rlen > llen))
2042             goto record_corrupted;
2043           last_offset+= llen;
2044           break;
2045         }
2046         const uint32_t slen= mlog_decode_varint_length(*l);
2047         if (UNIV_UNLIKELY(slen != rlen || slen > 3))
2048           goto record_corrupted;
2049         uint32_t s= mlog_decode_varint(l);
2050         ut_ad(slen != MLOG_DECODE_ERROR);
2051         if (s & 1)
2052           s= last_offset - (s >> 1) - 1;
2053         else
2054           s= last_offset + (s >> 1) + 1;
2055         if (UNIV_UNLIKELY(s < 8 || s + llen > srv_page_size))
2056           goto record_corrupted;
2057         last_offset+= llen;
2058         break;
2059       }
2060 #if 0 && defined UNIV_DEBUG
2061       switch (b & 0x70) {
2062       case RESERVED:
2063       case OPTION:
2064         ut_ad(0); /* we did "continue" earlier */
2065         break;
2066       case FREE_PAGE:
2067         break;
2068       default:
2069         ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
2070       }
2071 #endif
2072       const bool is_init= (b & 0x70) <= INIT_PAGE;
2073       switch (*store) {
2074       case STORE_IF_EXISTS:
2075         if (fil_space_t *space= fil_space_t::get(space_id))
2076         {
2077           const auto size= space->get_size();
2078           space->release();
2079           if (!size)
2080             continue;
2081         }
2082         else
2083           continue;
2084         /* fall through */
2085       case STORE_YES:
2086         if (!mlog_init.will_avoid_read(id, start_lsn))
2087         {
2088           if (cached_pages_it == pages.end() || cached_pages_it->first != id)
2089             cached_pages_it= pages.emplace(id, page_recv_t()).first;
2090           add(cached_pages_it, start_lsn, end_lsn, recs,
2091               static_cast<size_t>(l + rlen - recs));
2092         }
2093         continue;
2094       case STORE_NO:
2095         if (!is_init)
2096           continue;
2097         mlog_init.add(id, start_lsn);
2098         map::iterator i= pages.find(id);
2099         if (i == pages.end())
2100           continue;
2101         i->second.log.clear();
2102         pages.erase(i);
2103       }
2104     }
2105     else if (rlen)
2106     {
2107       switch (b & 0xf0) {
2108       case FILE_CHECKPOINT:
2109         if (space_id == 0 && page_no == 0 && rlen == 8)
2110         {
2111           const lsn_t lsn= mach_read_from_8(l);
2112 
2113           if (UNIV_UNLIKELY(srv_print_verbose_log == 2))
2114             fprintf(stderr, "FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF "\n",
2115                     lsn, lsn != checkpoint_lsn
2116                     ? "ignored"
2117                     : mlog_checkpoint_lsn ? "reread" : "read",
2118                     recovered_lsn);
2119 
2120           DBUG_PRINT("ib_log", ("FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF,
2121                                 lsn, lsn != checkpoint_lsn
2122                                 ? "ignored"
2123                                 : mlog_checkpoint_lsn ? "reread" : "read",
2124                                 recovered_lsn));
2125 
2126           if (lsn == checkpoint_lsn)
2127           {
2128             /* There can be multiple FILE_CHECKPOINT for the same LSN. */
2129             if (mlog_checkpoint_lsn)
2130               continue;
2131             mlog_checkpoint_lsn= recovered_lsn;
2132             l+= 8;
2133             recovered_offset= l - buf;
2134             return true;
2135           }
2136           continue;
2137         }
2138         /* fall through */
2139       default:
2140         if (!srv_force_recovery)
2141           goto malformed;
2142         ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn;
2143         continue;
2144       case FILE_DELETE:
2145       case FILE_MODIFY:
2146       case FILE_RENAME:
2147         if (UNIV_UNLIKELY(page_no != 0))
2148         {
2149         file_rec_error:
2150           if (!srv_force_recovery)
2151           {
2152             ib::error() << "Corrupted file-level record;"
2153                            " set innodb_force_recovery=1 to ignore.";
2154             goto corrupted;
2155           }
2156 
2157           ib::warn() << "Ignoring corrupted file-level record at LSN "
2158                      << recovered_lsn;
2159           continue;
2160         }
2161         /* fall through */
2162       case FILE_CREATE:
2163         if (UNIV_UNLIKELY(!space_id || page_no))
2164           goto file_rec_error;
2165         /* There is no terminating NUL character. Names must end in .ibd.
2166         For FILE_RENAME, there is a NUL between the two file names. */
2167         const char * const fn= reinterpret_cast<const char*>(l);
2168         const char *fn2= static_cast<const char*>(memchr(fn, 0, rlen));
2169 
2170         if (UNIV_UNLIKELY((fn2 == nullptr) == ((b & 0xf0) == FILE_RENAME)))
2171           goto file_rec_error;
2172 
2173         const char * const fnend= fn2 ? fn2 : fn + rlen;
2174         const char * const fn2end= fn2 ? fn + rlen : nullptr;
2175 
2176         if (fn2)
2177         {
2178           fn2++;
2179           if (memchr(fn2, 0, fn2end - fn2))
2180             goto file_rec_error;
2181           if (fn2end - fn2 < 4 || memcmp(fn2end - 4, DOT_IBD, 4))
2182             goto file_rec_error;
2183         }
2184 
2185         if (is_predefined_tablespace(space_id))
2186           goto file_rec_error;
2187         if (fnend - fn < 4 || memcmp(fnend - 4, DOT_IBD, 4))
2188           goto file_rec_error;
2189 
2190         const char saved_end= fn[rlen];
2191         const_cast<char&>(fn[rlen])= '\0';
2192         fil_name_process(const_cast<char*>(fn), fnend - fn, space_id,
2193                          (b & 0xf0) == FILE_DELETE);
2194         if (fn2)
2195           fil_name_process(const_cast<char*>(fn2), fn2end - fn2, space_id,
2196                            false);
2197         if ((b & 0xf0) < FILE_MODIFY && log_file_op)
2198           log_file_op(space_id, (b & 0xf0) == FILE_CREATE,
2199                       l, static_cast<ulint>(fnend - fn),
2200                       reinterpret_cast<const byte*>(fn2),
2201                       fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
2202         const_cast<char&>(fn[rlen])= saved_end;
2203 
2204         if (fn2 && apply)
2205         {
2206           const size_t len= fn2end - fn2;
2207           auto r= renamed_spaces.emplace(space_id, std::string{fn2, len});
2208           if (!r.second)
2209             r.first->second= std::string{fn2, len};
2210         }
2211         if (UNIV_UNLIKELY(found_corrupt_fs))
2212           return true;
2213       }
2214     }
2215     else
2216       goto malformed;
2217   }
2218 
2219   ut_ad(l == el);
2220   recovered_offset= l - buf;
2221   recovered_lsn= end_lsn;
2222   if (is_memory_exhausted(store) && last_phase)
2223     return false;
2224   goto loop;
2225 }
2226 
2227 /** Apply the hashed log records to the page, if the page lsn is less than the
2228 lsn of a log record.
2229 @param[in,out]	block		buffer pool page
2230 @param[in,out]	mtr		mini-transaction
2231 @param[in,out]	p		recovery address
2232 @param[in,out]	space		tablespace, or NULL if not looked up yet
2233 @param[in,out]	init		page initialization operation, or NULL */
recv_recover_page(buf_block_t * block,mtr_t & mtr,const recv_sys_t::map::iterator & p,fil_space_t * space=NULL,mlog_init_t::init * init=NULL)2234 static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
2235 			      const recv_sys_t::map::iterator& p,
2236 			      fil_space_t* space = NULL,
2237 			      mlog_init_t::init* init = NULL)
2238 {
2239 	ut_ad(mutex_own(&recv_sys.mutex));
2240 	ut_ad(recv_sys.apply_log_recs);
2241 	ut_ad(recv_needed_recovery);
2242 	ut_ad(!init || init->created);
2243 	ut_ad(!init || init->lsn);
2244 	ut_ad(block->page.id() == p->first);
2245 	ut_ad(!p->second.is_being_processed());
2246 	ut_ad(!space || space->id == block->page.id().space());
2247 	ut_ad(log_sys.is_physical());
2248 
2249 	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
2250 		ib::info() << "Applying log to page " << block->page.id();
2251 	}
2252 
2253 	DBUG_PRINT("ib_log", ("Applying log to page %u:%u",
2254 			      block->page.id().space(),
2255 			      block->page.id().page_no()));
2256 
2257 	p->second.state = page_recv_t::RECV_BEING_PROCESSED;
2258 
2259 	mutex_exit(&recv_sys.mutex);
2260 
2261 	byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
2262 		? block->page.zip.data
2263 		: block->frame;
2264 	const lsn_t page_lsn = init
2265 		? 0
2266 		: mach_read_from_8(frame + FIL_PAGE_LSN);
2267 	bool free_page = false;
2268 	lsn_t start_lsn = 0, end_lsn = 0;
2269 	ut_d(lsn_t recv_start_lsn = 0);
2270 	const lsn_t init_lsn = init ? init->lsn : 0;
2271 
2272 	bool skipped_after_init = false;
2273 
2274 	for (const log_rec_t* recv : p->second.log) {
2275 		const log_phys_t* l = static_cast<const log_phys_t*>(recv);
2276 		ut_ad(l->lsn);
2277 		ut_ad(end_lsn <= l->lsn);
2278 		ut_ad(l->lsn <= log_sys.log.scanned_lsn);
2279 
2280 		ut_ad(l->start_lsn);
2281 		ut_ad(recv_start_lsn <= l->start_lsn);
2282 		ut_d(recv_start_lsn = l->start_lsn);
2283 
2284 		if (l->start_lsn < page_lsn) {
2285 			/* This record has already been applied. */
2286 			DBUG_PRINT("ib_log", ("apply skip %u:%u LSN " LSN_PF
2287 					      " < " LSN_PF,
2288 					      block->page.id().space(),
2289 					      block->page.id().page_no(),
2290 					      l->start_lsn, page_lsn));
2291 			skipped_after_init = true;
2292 			end_lsn = l->lsn;
2293 			continue;
2294 		}
2295 
2296 		if (l->start_lsn < init_lsn) {
2297 			DBUG_PRINT("ib_log", ("init skip %u:%u LSN " LSN_PF
2298 					      " < " LSN_PF,
2299 					      block->page.id().space(),
2300 					      block->page.id().page_no(),
2301 					      l->start_lsn, init_lsn));
2302 			skipped_after_init = false;
2303 			end_lsn = l->lsn;
2304 			continue;
2305 		}
2306 
2307 		/* There is no need to check LSN for just initialized pages. */
2308 		if (skipped_after_init) {
2309 			skipped_after_init = false;
2310 			ut_ad(end_lsn == page_lsn);
2311 			if (end_lsn != page_lsn)
2312 				ib::warn()
2313 					<< "The last skipped log record LSN "
2314 					<< end_lsn
2315 					<< " is not equal to page LSN "
2316 					<< page_lsn;
2317 		}
2318 
2319 		end_lsn = l->lsn;
2320 
2321 		if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
2322 			ib::info() << "apply " << l->start_lsn
2323 				   << ": " << block->page.id();
2324 		}
2325 
2326 		DBUG_PRINT("ib_log", ("apply " LSN_PF ": %u:%u",
2327 				      l->start_lsn,
2328 				      block->page.id().space(),
2329 				      block->page.id().page_no()));
2330 
2331 		log_phys_t::apply_status a= l->apply(*block,
2332 						     p->second.last_offset);
2333 
2334 		switch (a) {
2335 		case log_phys_t::APPLIED_NO:
2336 			ut_ad(!mtr.has_modifications());
2337 			free_page = true;
2338 			start_lsn = 0;
2339 			continue;
2340 		case log_phys_t::APPLIED_YES:
2341 			goto set_start_lsn;
2342 		case log_phys_t::APPLIED_TO_FSP_HEADER:
2343 		case log_phys_t::APPLIED_TO_ENCRYPTION:
2344 			break;
2345 		}
2346 
2347 		if (fil_space_t* s = space
2348 		    ? space
2349 		    : fil_space_t::get(block->page.id().space())) {
2350 			switch (a) {
2351 			case log_phys_t::APPLIED_TO_FSP_HEADER:
2352 				s->flags = mach_read_from_4(
2353 					FSP_HEADER_OFFSET
2354 					+ FSP_SPACE_FLAGS + frame);
2355 				s->size_in_header = mach_read_from_4(
2356 					FSP_HEADER_OFFSET + FSP_SIZE
2357 					+ frame);
2358 				s->free_limit = mach_read_from_4(
2359 					FSP_HEADER_OFFSET
2360 					+ FSP_FREE_LIMIT + frame);
2361 				s->free_len = mach_read_from_4(
2362 					FSP_HEADER_OFFSET + FSP_FREE
2363 					+ FLST_LEN + frame);
2364 				break;
2365 			default:
2366 				byte* b= frame
2367 					+ fsp_header_get_encryption_offset(
2368 						block->zip_size())
2369 					+ FSP_HEADER_OFFSET;
2370 				if (memcmp(b, CRYPT_MAGIC, MAGIC_SZ)) {
2371 					break;
2372 				}
2373 				b += MAGIC_SZ;
2374 				if (*b != CRYPT_SCHEME_UNENCRYPTED
2375 				    && *b != CRYPT_SCHEME_1) {
2376 					break;
2377 				}
2378 				if (b[1] != MY_AES_BLOCK_SIZE) {
2379 					break;
2380 				}
2381 				if (b[2 + MY_AES_BLOCK_SIZE + 4 + 4]
2382 				    > FIL_ENCRYPTION_OFF) {
2383 					break;
2384 				}
2385 				fil_crypt_parse(s, b);
2386 			}
2387 
2388 			if (!space) {
2389 				s->release();
2390 			}
2391 		}
2392 
2393 set_start_lsn:
2394 		if (recv_sys.found_corrupt_log && !srv_force_recovery) {
2395 			break;
2396 		}
2397 
2398 		if (!start_lsn) {
2399 			start_lsn = l->start_lsn;
2400 		}
2401 	}
2402 
2403 	if (start_lsn) {
2404 		ut_ad(end_lsn >= start_lsn);
2405 		mach_write_to_8(FIL_PAGE_LSN + frame, end_lsn);
2406 		if (UNIV_LIKELY(frame == block->frame)) {
2407 			mach_write_to_8(srv_page_size
2408 					- FIL_PAGE_END_LSN_OLD_CHKSUM
2409 					+ frame, end_lsn);
2410 		} else {
2411 			buf_zip_decompress(block, false);
2412 		}
2413 
2414 		buf_block_modify_clock_inc(block);
2415 		mysql_mutex_lock(&log_sys.flush_order_mutex);
2416 		buf_flush_note_modification(block, start_lsn, end_lsn);
2417 		mysql_mutex_unlock(&log_sys.flush_order_mutex);
2418 	} else if (free_page && init) {
2419 		/* There have been no operations that modify the page.
2420 		Any buffered changes must not be merged. A subsequent
2421 		buf_page_create() from a user thread should discard
2422 		any buffered changes. */
2423 		init->created = false;
2424 		ut_ad(!mtr.has_modifications());
2425 		block->page.status = buf_page_t::FREED;
2426 	}
2427 
2428 	/* Make sure that committing mtr does not change the modification
2429 	lsn values of page */
2430 
2431 	mtr.discard_modifications();
2432 	mtr.commit();
2433 
2434 	time_t now = time(NULL);
2435 
2436 	mutex_enter(&recv_sys.mutex);
2437 
2438 	if (recv_max_page_lsn < page_lsn) {
2439 		recv_max_page_lsn = page_lsn;
2440 	}
2441 
2442 	ut_ad(p->second.is_being_processed());
2443 	ut_ad(!recv_sys.pages.empty());
2444 
2445 	if (recv_sys.report(now)) {
2446 		const ulint n = recv_sys.pages.size();
2447 		ib::info() << "To recover: " << n << " pages from log";
2448 		service_manager_extend_timeout(
2449 			INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
2450 	}
2451 }
2452 
2453 /** Remove records for a corrupted page.
2454 This function should only be called when innodb_force_recovery is set.
2455 @param page_id  corrupted page identifier */
free_corrupted_page(page_id_t page_id)2456 ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
2457 {
2458   mutex_enter(&mutex);
2459   map::iterator p= pages.find(page_id);
2460   if (p != pages.end())
2461   {
2462     p->second.log.clear();
2463     pages.erase(p);
2464   }
2465   mutex_exit(&mutex);
2466 }
2467 
2468 /** Apply any buffered redo log to a page that was just read from a data file.
2469 @param[in,out]	space	tablespace
2470 @param[in,out]	bpage	buffer pool page */
recv_recover_page(fil_space_t * space,buf_page_t * bpage)2471 void recv_recover_page(fil_space_t* space, buf_page_t* bpage)
2472 {
2473 	mtr_t mtr;
2474 	mtr.start();
2475 	mtr.set_log_mode(MTR_LOG_NO_REDO);
2476 
2477 	ut_ad(bpage->state() == BUF_BLOCK_FILE_PAGE);
2478 	buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
2479 
2480 	/* Move the ownership of the x-latch on the page to
2481 	this OS thread, so that we can acquire a second
2482 	x-latch on it.  This is needed for the operations to
2483 	the page to pass the debug checks. */
2484 	rw_lock_x_lock_move_ownership(&block->lock);
2485 	buf_block_buf_fix_inc(block, __FILE__, __LINE__);
2486 	rw_lock_x_lock(&block->lock);
2487 	mtr.memo_push(block, MTR_MEMO_PAGE_X_FIX);
2488 
2489 	mutex_enter(&recv_sys.mutex);
2490 	if (recv_sys.apply_log_recs) {
2491 		recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id());
2492 		if (p != recv_sys.pages.end()
2493 		    && !p->second.is_being_processed()) {
2494 			recv_recover_page(block, mtr, p, space);
2495 			p->second.log.clear();
2496 			recv_sys.pages.erase(p);
2497 			goto func_exit;
2498 		}
2499 	}
2500 
2501 	mtr.commit();
2502 func_exit:
2503 	mutex_exit(&recv_sys.mutex);
2504 	ut_ad(mtr.has_committed());
2505 }
2506 
2507 /** Read pages for which log needs to be applied.
2508 @param page_id	first page identifier to read
2509 @param i        iterator to recv_sys.pages */
recv_read_in_area(page_id_t page_id,recv_sys_t::map::iterator i)2510 static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
2511 {
2512   uint32_t page_nos[32];
2513   ut_ad(page_id == i->first);
2514   page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U));
2515   const page_id_t up_limit{page_id + 31};
2516   uint32_t* p= page_nos;
2517 
2518   for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
2519   {
2520     if (i->second.state == page_recv_t::RECV_NOT_PROCESSED)
2521     {
2522       i->second.state= page_recv_t::RECV_BEING_READ;
2523       *p++= i->first.page_no();
2524     }
2525   }
2526 
2527   if (p != page_nos)
2528   {
2529     mutex_exit(&recv_sys.mutex);
2530     buf_read_recv_pages(page_id.space(), page_nos, ulint(p - page_nos));
2531     mutex_enter(&recv_sys.mutex);
2532   }
2533 }
2534 
2535 /** Attempt to initialize a page based on redo log records.
2536 @param page_id  page identifier
2537 @param p        iterator pointing to page_id
2538 @param mtr      mini-transaction
2539 @param b        pre-allocated buffer pool block
2540 @return whether the page was successfully initialized */
recover_low(const page_id_t page_id,map::iterator & p,mtr_t & mtr,buf_block_t * b)2541 inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
2542                                             map::iterator &p, mtr_t &mtr,
2543                                             buf_block_t *b)
2544 {
2545   ut_ad(mutex_own(&mutex));
2546   ut_ad(p->first == page_id);
2547   page_recv_t &recs= p->second;
2548   ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
2549   buf_block_t* block= nullptr;
2550   mlog_init_t::init &i= mlog_init.last(page_id);
2551   const lsn_t end_lsn = recs.log.last()->lsn;
2552   if (end_lsn < i.lsn)
2553     DBUG_LOG("ib_log", "skip log for page " << page_id
2554              << " LSN " << end_lsn << " < " << i.lsn);
2555   else if (fil_space_t *space= fil_space_t::get(page_id.space()))
2556   {
2557     mtr.start();
2558     mtr.set_log_mode(MTR_LOG_NO_REDO);
2559     block= buf_page_create(space, page_id.page_no(), space->zip_size(), &mtr,
2560                            b);
2561     if (UNIV_UNLIKELY(block != b))
2562     {
2563       /* The page happened to exist in the buffer pool, or it was just
2564       being read in. Before buf_page_get_with_no_latch() returned to
2565       buf_page_create(), all changes must have been applied to the
2566       page already. */
2567       ut_ad(recv_sys.pages.find(page_id) == recv_sys.pages.end());
2568       mtr.commit();
2569       block= nullptr;
2570     }
2571     else
2572     {
2573       ut_ad(&recs == &recv_sys.pages.find(page_id)->second);
2574       i.created= true;
2575       buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
2576       recv_recover_page(block, mtr, p, space, &i);
2577       ut_ad(mtr.has_committed());
2578       recs.log.clear();
2579       map::iterator r= p++;
2580       recv_sys.pages.erase(r);
2581     }
2582     space->release();
2583   }
2584 
2585   return block;
2586 }
2587 
2588 /** Attempt to initialize a page based on redo log records.
2589 @param page_id  page identifier
2590 @return whether the page was successfully initialized */
recover_low(const page_id_t page_id)2591 buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
2592 {
2593   buf_block_t *free_block= buf_LRU_get_free_block(false);
2594   buf_block_t *block= nullptr;
2595 
2596   mutex_enter(&mutex);
2597   map::iterator p= pages.find(page_id);
2598 
2599   if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
2600   {
2601     mtr_t mtr;
2602     block= recover_low(page_id, p, mtr, free_block);
2603     ut_ad(!block || block == free_block);
2604   }
2605 
2606   mutex_exit(&mutex);
2607   if (UNIV_UNLIKELY(!block))
2608     buf_pool.free_block(free_block);
2609   return block;
2610 }
2611 
2612 /** Thread-safe function which sorts flush_list by oldest_modification */
log_sort_flush_list()2613 static void log_sort_flush_list()
2614 {
2615   mysql_mutex_lock(&buf_pool.flush_list_mutex);
2616 
2617   const size_t size= UT_LIST_GET_LEN(buf_pool.flush_list);
2618   std::unique_ptr<buf_page_t *[]> list(new buf_page_t *[size]);
2619 
2620   size_t idx= 0;
2621   for (buf_page_t *p= UT_LIST_GET_FIRST(buf_pool.flush_list); p;
2622        p= UT_LIST_GET_NEXT(list, p))
2623     list.get()[idx++]= p;
2624 
2625   std::sort(list.get(), list.get() + size,
2626             [](const buf_page_t *lhs, const buf_page_t *rhs) {
2627               return rhs->oldest_modification() < lhs->oldest_modification();
2628             });
2629 
2630   UT_LIST_INIT(buf_pool.flush_list, &buf_page_t::list);
2631 
2632   for (size_t i= 0; i < size; i++)
2633     UT_LIST_ADD_LAST(buf_pool.flush_list, list[i]);
2634 
2635   mysql_mutex_unlock(&buf_pool.flush_list_mutex);
2636 }
2637 
2638 /** Apply buffered log to persistent data pages.
2639 @param last_batch     whether it is possible to write more redo log */
apply(bool last_batch)2640 void recv_sys_t::apply(bool last_batch)
2641 {
2642   ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
2643         srv_operation == SRV_OPERATION_RESTORE ||
2644         srv_operation == SRV_OPERATION_RESTORE_EXPORT);
2645 
2646   mutex_enter(&mutex);
2647 
2648   while (apply_batch_on)
2649   {
2650     bool abort= found_corrupt_log;
2651     mutex_exit(&mutex);
2652 
2653     if (abort)
2654       return;
2655 
2656     os_thread_sleep(500000);
2657     mutex_enter(&mutex);
2658   }
2659 
2660 #ifdef SAFE_MUTEX
2661   DBUG_ASSERT(!last_batch == mysql_mutex_is_owner(&log_sys.mutex));
2662 #endif /* SAFE_MUTEX */
2663 
2664   recv_no_ibuf_operations = !last_batch ||
2665     srv_operation == SRV_OPERATION_RESTORE ||
2666     srv_operation == SRV_OPERATION_RESTORE_EXPORT;
2667 
2668   mtr_t mtr;
2669 
2670   if (!pages.empty())
2671   {
2672     const char *msg= last_batch
2673       ? "Starting final batch to recover "
2674       : "Starting a batch to recover ";
2675     const ulint n= pages.size();
2676     ib::info() << msg << n << " pages from redo log.";
2677     sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log", msg, n);
2678 
2679     apply_log_recs= true;
2680     apply_batch_on= true;
2681 
2682     for (auto id= srv_undo_tablespaces_open; id--;)
2683     {
2684       const trunc& t= truncated_undo_spaces[id];
2685       if (t.lsn)
2686         trim(page_id_t(id + srv_undo_space_id_start, t.pages), t.lsn);
2687     }
2688 
2689     fil_system.extend_to_recv_size();
2690 
2691     buf_block_t *free_block= buf_LRU_get_free_block(false);
2692 
2693     for (map::iterator p= pages.begin(); p != pages.end(); )
2694     {
2695       const page_id_t page_id= p->first;
2696       ut_ad(!p->second.log.empty());
2697 
2698       switch (p->second.state) {
2699       case page_recv_t::RECV_BEING_READ:
2700       case page_recv_t::RECV_BEING_PROCESSED:
2701         p++;
2702         continue;
2703       case page_recv_t::RECV_WILL_NOT_READ:
2704         if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block)))
2705         {
2706           mutex_exit(&mutex);
2707           free_block= buf_LRU_get_free_block(false);
2708           mutex_enter(&mutex);
2709           break;
2710         }
2711         ut_ad(p == pages.end() || p->first > page_id);
2712         continue;
2713       case page_recv_t::RECV_NOT_PROCESSED:
2714         recv_read_in_area(page_id, p);
2715       }
2716       p= pages.lower_bound(page_id);
2717       /* Ensure that progress will be made. */
2718       ut_ad(p == pages.end() || p->first > page_id ||
2719             p->second.state >= page_recv_t::RECV_BEING_READ);
2720     }
2721 
2722     buf_pool.free_block(free_block);
2723 
2724     /* Wait until all the pages have been processed */
2725     while (!pages.empty() || buf_pool.n_pend_reads)
2726     {
2727       const bool abort= found_corrupt_log || found_corrupt_fs;
2728 
2729       if (found_corrupt_fs && !srv_force_recovery)
2730         ib::info() << "Set innodb_force_recovery=1 to ignore corrupted pages.";
2731 
2732       mutex_exit(&mutex);
2733 
2734       if (abort)
2735         return;
2736       os_thread_sleep(500000);
2737       mutex_enter(&mutex);
2738     }
2739   }
2740 
2741   if (last_batch)
2742     /* We skipped this in buf_page_create(). */
2743     mlog_init.mark_ibuf_exist(mtr);
2744   else
2745   {
2746     mlog_init.reset();
2747     mysql_mutex_unlock(&log_sys.mutex);
2748   }
2749 
2750   mysql_mutex_assert_not_owner(&log_sys.mutex);
2751   mutex_exit(&mutex);
2752 
2753   if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
2754       srv_operation != SRV_OPERATION_RESTORE_EXPORT)
2755     log_sort_flush_list();
2756   else
2757   {
2758     /* Instead of flushing, last_batch could sort the buf_pool.flush_list
2759     in ascending order of buf_page_t::oldest_modification. */
2760     buf_flush_sync_batch(recovered_lsn);
2761   }
2762 
2763   if (!last_batch)
2764   {
2765     buf_pool_invalidate();
2766     mysql_mutex_lock(&log_sys.mutex);
2767   }
2768 #if 1 /* Mariabackup FIXME: Remove or adjust rename_table_in_prepare() */
2769   else if (srv_operation != SRV_OPERATION_NORMAL);
2770 #endif
2771   else
2772   {
2773     /* In the last batch, we will apply any rename operations. */
2774     for (auto r : renamed_spaces)
2775     {
2776       const uint32_t id= r.first;
2777       fil_space_t *space= fil_space_t::get(id);
2778       if (!space)
2779         continue;
2780       ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
2781       const char *old= space->chain.start->name;
2782       if (r.second != old)
2783       {
2784         bool exists;
2785         os_file_type_t ftype;
2786         const char *new_name= r.second.c_str();
2787         if (!os_file_status(new_name, &exists, &ftype) || exists)
2788         {
2789           ib::error() << "Cannot replay rename of tablespace " << id
2790                       << " from '" << old << "' to '" << r.second <<
2791                       (exists ? "' because the target file exists" : "'");
2792           found_corrupt_fs= true;
2793         }
2794         else
2795         {
2796           size_t base= r.second.rfind(OS_PATH_SEPARATOR);
2797           ut_ad(base != std::string::npos);
2798           size_t start= r.second.rfind(OS_PATH_SEPARATOR, base - 1);
2799           if (start == std::string::npos)
2800             start= 0;
2801           else
2802             ++start;
2803           /* Keep only databasename/tablename without .ibd suffix */
2804           std::string space_name(r.second, start, r.second.size() - start - 4);
2805           ut_ad(space_name[base - start] == OS_PATH_SEPARATOR);
2806 #if OS_PATH_SEPARATOR != '/'
2807           space_name[base - start]= '/';
2808 #endif
2809           mysql_mutex_lock(&log_sys.mutex);
2810           if (dberr_t err= space->rename(space_name.c_str(), r.second.c_str(),
2811                                          false))
2812           {
2813             ib::error() << "Cannot replay rename of tablespace " << id
2814                         << " to '" << r.second << "': " << err;
2815             found_corrupt_fs= true;
2816           }
2817           mysql_mutex_unlock(&log_sys.mutex);
2818         }
2819       }
2820       space->release();
2821     }
2822     renamed_spaces.clear();
2823   }
2824 
2825   mutex_enter(&mutex);
2826 
2827   ut_d(after_apply= true);
2828   clear();
2829   mutex_exit(&mutex);
2830 }
2831 
2832 /** Check whether the number of read redo log blocks exceeds the maximum.
2833 Store last_stored_lsn if the recovery is not in the last phase.
2834 @param[in,out] store    whether to store page operations
2835 @return whether the memory is exhausted */
is_memory_exhausted(store_t * store)2836 inline bool recv_sys_t::is_memory_exhausted(store_t *store)
2837 {
2838   if (*store == STORE_NO ||
2839       UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages())
2840     return false;
2841   if (*store == STORE_YES)
2842     last_stored_lsn= recovered_lsn;
2843   *store= STORE_NO;
2844   DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
2845                        " last stored offset " ULINTPF "\n",
2846                        recovered_lsn, recovered_offset));
2847   return true;
2848 }
2849 
2850 /** Adds data from a new log block to the parsing buffer of recv_sys if
2851 recv_sys.parse_start_lsn is non-zero.
2852 @param[in]	log_block	log block to add
2853 @param[in]	scanned_lsn	lsn of how far we were able to find
2854 				data in this log block
2855 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)2856 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
2857 {
2858 	ulint	more_len;
2859 	ulint	data_len;
2860 	ulint	start_offset;
2861 	ulint	end_offset;
2862 
2863 	ut_ad(scanned_lsn >= recv_sys.scanned_lsn);
2864 
2865 	if (!recv_sys.parse_start_lsn) {
2866 		/* Cannot start parsing yet because no start point for
2867 		it found */
2868 		return(false);
2869 	}
2870 
2871 	data_len = log_block_get_data_len(log_block);
2872 
2873 	if (recv_sys.parse_start_lsn >= scanned_lsn) {
2874 
2875 		return(false);
2876 
2877 	} else if (recv_sys.scanned_lsn >= scanned_lsn) {
2878 
2879 		return(false);
2880 
2881 	} else if (recv_sys.parse_start_lsn > recv_sys.scanned_lsn) {
2882 		more_len = (ulint) (scanned_lsn - recv_sys.parse_start_lsn);
2883 	} else {
2884 		more_len = (ulint) (scanned_lsn - recv_sys.scanned_lsn);
2885 	}
2886 
2887 	if (more_len == 0) {
2888 		return(false);
2889 	}
2890 
2891 	ut_ad(data_len >= more_len);
2892 
2893 	start_offset = data_len - more_len;
2894 
2895 	if (start_offset < LOG_BLOCK_HDR_SIZE) {
2896 		start_offset = LOG_BLOCK_HDR_SIZE;
2897 	}
2898 
2899 	end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
2900 
2901 	ut_ad(start_offset <= end_offset);
2902 
2903 	if (start_offset < end_offset) {
2904 		memcpy(recv_sys.buf + recv_sys.len,
2905 		       log_block + start_offset, end_offset - start_offset);
2906 
2907 		recv_sys.len += end_offset - start_offset;
2908 
2909 		ut_a(recv_sys.len <= RECV_PARSING_BUF_SIZE);
2910 	}
2911 
2912 	return(true);
2913 }
2914 
2915 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()2916 void recv_sys_justify_left_parsing_buf()
2917 {
2918 	memmove(recv_sys.buf, recv_sys.buf + recv_sys.recovered_offset,
2919 		recv_sys.len - recv_sys.recovered_offset);
2920 
2921 	recv_sys.len -= recv_sys.recovered_offset;
2922 
2923 	recv_sys.recovered_offset = 0;
2924 }
2925 
2926 /** Scan redo log from a buffer and stores new log data to the parsing buffer.
2927 Parse and hash the log records if new data found.
2928 Apply log records automatically when the hash table becomes full.
2929 @param[in,out]	store			whether the records should be
2930 					stored into recv_sys.pages; this is
2931 					reset if just debug checking is
2932 					needed, or when the num_max_blocks in
2933 					recv_sys runs out
2934 @param[in]	log_block		log segment
2935 @param[in]	checkpoint_lsn		latest checkpoint LSN
2936 @param[in]	start_lsn		buffer start LSN
2937 @param[in]	end_lsn			buffer end LSN
2938 @param[in,out]	contiguous_lsn		it is known that all groups contain
2939 					contiguous log data upto this lsn
2940 @param[out]	group_scanned_lsn	scanning succeeded upto this lsn
2941 @return true if not able to scan any more in this log group */
recv_scan_log_recs(store_t * store,const byte * log_block,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t end_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)2942 static bool recv_scan_log_recs(
2943 	store_t*	store,
2944 	const byte*	log_block,
2945 	lsn_t		checkpoint_lsn,
2946 	lsn_t		start_lsn,
2947 	lsn_t		end_lsn,
2948 	lsn_t*		contiguous_lsn,
2949 	lsn_t*		group_scanned_lsn)
2950 {
2951 	lsn_t		scanned_lsn	= start_lsn;
2952 	bool		finished	= false;
2953 	ulint		data_len;
2954 	bool		more_data	= false;
2955 	bool		apply		= recv_sys.mlog_checkpoint_lsn != 0;
2956 	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
2957 	const bool	last_phase = (*store == STORE_IF_EXISTS);
2958 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2959 	ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2960 	ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
2961 	ut_ad(log_sys.is_physical());
2962 
2963 	const byte* const	log_end = log_block
2964 		+ ulint(end_lsn - start_lsn);
2965 	constexpr ulint sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT;
2966 
2967 	do {
2968 		ut_ad(!finished);
2969 
2970 		if (log_block_get_flush_bit(log_block)) {
2971 			/* This block was a start of a log flush operation:
2972 			we know that the previous flush operation must have
2973 			been completed for all log groups before this block
2974 			can have been flushed to any of the groups. Therefore,
2975 			we know that log data is contiguous up to scanned_lsn
2976 			in all non-corrupt log groups. */
2977 
2978 			if (scanned_lsn > *contiguous_lsn) {
2979 				*contiguous_lsn = scanned_lsn;
2980 			}
2981 		}
2982 
2983 		data_len = log_block_get_data_len(log_block);
2984 
2985 		if (scanned_lsn + data_len > recv_sys.scanned_lsn
2986 		    && log_block_get_checkpoint_no(log_block)
2987 		    < recv_sys.scanned_checkpoint_no
2988 		    && (recv_sys.scanned_checkpoint_no
2989 			- log_block_get_checkpoint_no(log_block)
2990 			> 0x80000000UL)) {
2991 
2992 			/* Garbage from a log buffer flush which was made
2993 			before the most recent database recovery */
2994 			finished = true;
2995 			break;
2996 		}
2997 
2998 		if (!recv_sys.parse_start_lsn
2999 		    && (log_block_get_first_rec_group(log_block) > 0)) {
3000 
3001 			/* We found a point from which to start the parsing
3002 			of log records */
3003 
3004 			recv_sys.parse_start_lsn = scanned_lsn
3005 				+ log_block_get_first_rec_group(log_block);
3006 			recv_sys.scanned_lsn = recv_sys.parse_start_lsn;
3007 			recv_sys.recovered_lsn = recv_sys.parse_start_lsn;
3008 		}
3009 
3010 		scanned_lsn += data_len;
3011 
3012 		if (data_len == LOG_BLOCK_HDR_SIZE + sizeof_checkpoint
3013 		    && scanned_lsn == checkpoint_lsn + sizeof_checkpoint
3014 		    && log_block[LOG_BLOCK_HDR_SIZE]
3015 		    == (FILE_CHECKPOINT | (SIZE_OF_FILE_CHECKPOINT - 2))
3016 		    && checkpoint_lsn == mach_read_from_8(
3017 			    (LOG_BLOCK_HDR_SIZE + 1 + 2)
3018 			    + log_block)) {
3019 			/* The redo log is logically empty. */
3020 			ut_ad(recv_sys.mlog_checkpoint_lsn == 0
3021 			      || recv_sys.mlog_checkpoint_lsn
3022 			      == checkpoint_lsn);
3023 			recv_sys.mlog_checkpoint_lsn = checkpoint_lsn;
3024 			DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
3025 					      scanned_lsn));
3026 			finished = true;
3027 			break;
3028 		}
3029 
3030 		if (scanned_lsn > recv_sys.scanned_lsn) {
3031 			ut_ad(!srv_log_file_created);
3032 			if (!recv_needed_recovery) {
3033 				recv_needed_recovery = true;
3034 
3035 				if (srv_read_only_mode) {
3036 					ib::warn() << "innodb_read_only"
3037 						" prevents crash recovery";
3038 					return(true);
3039 				}
3040 
3041 				ib::info() << "Starting crash recovery from"
3042 					" checkpoint LSN=" << checkpoint_lsn
3043 					   << "," << recv_sys.scanned_lsn;
3044 			}
3045 
3046 			/* We were able to find more log data: add it to the
3047 			parsing buffer if parse_start_lsn is already
3048 			non-zero */
3049 
3050 			DBUG_EXECUTE_IF(
3051 				"reduce_recv_parsing_buf",
3052 				recv_parsing_buf_size = RECV_SCAN_SIZE * 2;
3053 				);
3054 
3055 			if (recv_sys.len + 4 * OS_FILE_LOG_BLOCK_SIZE
3056 			    >= recv_parsing_buf_size) {
3057 				ib::error() << "Log parsing buffer overflow."
3058 					" Recovery may have failed!";
3059 
3060 				recv_sys.found_corrupt_log = true;
3061 
3062 				if (!srv_force_recovery) {
3063 					ib::error()
3064 						<< "Set innodb_force_recovery"
3065 						" to ignore this error.";
3066 					return(true);
3067 				}
3068 			} else if (!recv_sys.found_corrupt_log) {
3069 				more_data = recv_sys_add_to_parsing_buf(
3070 					log_block, scanned_lsn);
3071 			}
3072 
3073 			recv_sys.scanned_lsn = scanned_lsn;
3074 			recv_sys.scanned_checkpoint_no
3075 				= log_block_get_checkpoint_no(log_block);
3076 		}
3077 
3078 		/* During last phase of scanning, there can be redo logs
3079 		left in recv_sys.buf to parse & store it in recv_sys.heap */
3080 		if (last_phase
3081 		    && recv_sys.recovered_lsn < recv_sys.scanned_lsn) {
3082 			more_data = true;
3083 		}
3084 
3085 		if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3086 			/* Log data for this group ends here */
3087 			finished = true;
3088 			break;
3089 		} else {
3090 			log_block += OS_FILE_LOG_BLOCK_SIZE;
3091 		}
3092 	} while (log_block < log_end);
3093 
3094 	*group_scanned_lsn = scanned_lsn;
3095 
3096 	mutex_enter(&recv_sys.mutex);
3097 
3098 	if (more_data && !recv_sys.found_corrupt_log) {
3099 		/* Try to parse more log records */
3100 		if (recv_sys.parse(checkpoint_lsn, store, apply)) {
3101 			ut_ad(recv_sys.found_corrupt_log
3102 			      || recv_sys.found_corrupt_fs
3103 			      || recv_sys.mlog_checkpoint_lsn
3104 			      == recv_sys.recovered_lsn);
3105 			finished = true;
3106 			goto func_exit;
3107 		}
3108 
3109 		recv_sys.is_memory_exhausted(store);
3110 
3111 		if (recv_sys.recovered_offset > recv_parsing_buf_size / 4
3112 		    || (recv_sys.recovered_offset
3113 			&& recv_sys.len
3114 			>= recv_parsing_buf_size - RECV_SCAN_SIZE)) {
3115 			/* Move parsing buffer data to the buffer start */
3116 			recv_sys_justify_left_parsing_buf();
3117 		}
3118 
3119 		/* Need to re-parse the redo log which're stored
3120 		in recv_sys.buf */
3121 		if (last_phase && *store == STORE_NO) {
3122 			finished = false;
3123 		}
3124 	}
3125 
3126 func_exit:
3127 	mutex_exit(&recv_sys.mutex);
3128 	return(finished);
3129 }
3130 
3131 /** Scans log from a buffer and stores new log data to the parsing buffer.
3132 Parses and hashes the log records if new data found.
3133 @param[in]	checkpoint_lsn		latest checkpoint log sequence number
3134 @param[in,out]	contiguous_lsn		log sequence number
3135 until which all redo log has been scanned
3136 @param[in]	last_phase		whether changes
3137 can be applied to the tablespaces
3138 @return whether rescan is needed (not everything was stored) */
3139 static
3140 bool
recv_group_scan_log_recs(lsn_t checkpoint_lsn,lsn_t * contiguous_lsn,bool last_phase)3141 recv_group_scan_log_recs(
3142 	lsn_t		checkpoint_lsn,
3143 	lsn_t*		contiguous_lsn,
3144 	bool		last_phase)
3145 {
3146 	DBUG_ENTER("recv_group_scan_log_recs");
3147 	DBUG_ASSERT(!last_phase || recv_sys.mlog_checkpoint_lsn > 0);
3148 
3149 	mutex_enter(&recv_sys.mutex);
3150 	recv_sys.len = 0;
3151 	recv_sys.recovered_offset = 0;
3152 	recv_sys.clear();
3153 	recv_sys.parse_start_lsn = *contiguous_lsn;
3154 	recv_sys.scanned_lsn = *contiguous_lsn;
3155 	recv_sys.recovered_lsn = *contiguous_lsn;
3156 	recv_sys.scanned_checkpoint_no = 0;
3157 	ut_ad(recv_max_page_lsn == 0);
3158 	mutex_exit(&recv_sys.mutex);
3159 
3160 	lsn_t	start_lsn;
3161 	lsn_t	end_lsn;
3162 	store_t	store	= recv_sys.mlog_checkpoint_lsn == 0
3163 		? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
3164 
3165 	log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
3166 		ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
3167 	ut_d(recv_sys.after_apply = last_phase);
3168 
3169 	do {
3170 		if (last_phase && store == STORE_NO) {
3171 			store = STORE_IF_EXISTS;
3172 			recv_sys.apply(false);
3173 			/* Rescan the redo logs from last stored lsn */
3174 			end_lsn = recv_sys.recovered_lsn;
3175 		}
3176 
3177 		start_lsn = ut_uint64_align_down(end_lsn,
3178 						 OS_FILE_LOG_BLOCK_SIZE);
3179 		end_lsn = start_lsn;
3180 		log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
3181 	} while (end_lsn != start_lsn
3182 		 && !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn,
3183 					start_lsn, end_lsn, contiguous_lsn,
3184 					&log_sys.log.scanned_lsn));
3185 
3186 	if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
3187 		DBUG_RETURN(false);
3188 	}
3189 
3190 	DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
3191 			      last_phase ? "rescan" : "scan",
3192 			      log_sys.log.scanned_lsn));
3193 
3194 	DBUG_RETURN(store == STORE_NO);
3195 }
3196 
3197 /** Report a missing tablespace for which page-redo log exists.
3198 @param[in]	err	previous error code
3199 @param[in]	i	tablespace descriptor
3200 @return new error code */
3201 static
3202 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3203 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3204 {
3205 	if (srv_operation == SRV_OPERATION_RESTORE
3206 	    || srv_operation == SRV_OPERATION_RESTORE_EXPORT) {
3207 		if (i->second.name.find(TEMP_TABLE_PATH_PREFIX)
3208 		    != std::string::npos) {
3209 			ib::warn() << "Tablespace " << i->first << " was not"
3210 				" found at " << i->second.name << " when"
3211 				" restoring a (partial?) backup. All redo log"
3212 				" for this file will be ignored!";
3213 		}
3214 		return(err);
3215 	}
3216 
3217 	if (srv_force_recovery == 0) {
3218 		ib::error() << "Tablespace " << i->first << " was not"
3219 			" found at " << i->second.name << ".";
3220 
3221 		if (err == DB_SUCCESS) {
3222 			ib::error() << "Set innodb_force_recovery=1 to"
3223 				" ignore this and to permanently lose"
3224 				" all changes to the tablespace.";
3225 			err = DB_TABLESPACE_NOT_FOUND;
3226 		}
3227 	} else {
3228 		ib::warn() << "Tablespace " << i->first << " was not"
3229 			" found at " << i->second.name << ", and"
3230 			" innodb_force_recovery was set. All redo log"
3231 			" for this tablespace will be ignored!";
3232 	}
3233 
3234 	return(err);
3235 }
3236 
3237 /** Report the missing tablespace and discard the redo logs for the deleted
3238 tablespace.
3239 @param[in]	rescan			rescan of redo logs is needed
3240 					if hash table ran out of memory
3241 @param[out]	missing_tablespace	missing tablespace exists or not
3242 @return error code or DB_SUCCESS. */
3243 static MY_ATTRIBUTE((warn_unused_result))
3244 dberr_t
recv_validate_tablespace(bool rescan,bool & missing_tablespace)3245 recv_validate_tablespace(bool rescan, bool& missing_tablespace)
3246 {
3247 	dberr_t err = DB_SUCCESS;
3248 
3249 	mutex_enter(&recv_sys.mutex);
3250 
3251 	for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
3252 	     p != recv_sys.pages.end();) {
3253 		ut_ad(!p->second.log.empty());
3254 		const ulint space = p->first.space();
3255 		if (is_predefined_tablespace(space)) {
3256 next:
3257 			p++;
3258 			continue;
3259 		}
3260 
3261 		recv_spaces_t::iterator i = recv_spaces.find(space);
3262 		ut_ad(i != recv_spaces.end());
3263 
3264 		switch (i->second.status) {
3265 		case file_name_t::NORMAL:
3266 			goto next;
3267 		case file_name_t::MISSING:
3268 			err = recv_init_missing_space(err, i);
3269 			i->second.status = file_name_t::DELETED;
3270 			/* fall through */
3271 		case file_name_t::DELETED:
3272 			recv_sys_t::map::iterator r = p++;
3273 			r->second.log.clear();
3274 			recv_sys.pages.erase(r);
3275 			continue;
3276 		}
3277 		ut_ad(0);
3278 	}
3279 
3280 	if (err != DB_SUCCESS) {
3281 func_exit:
3282 		mutex_exit(&recv_sys.mutex);
3283 		return(err);
3284 	}
3285 
3286 	/* When rescan is not needed, recv_sys.pages will contain the
3287 	entire redo log. If rescan is needed or innodb_force_recovery
3288 	is set, we can ignore missing tablespaces. */
3289 	for (const recv_spaces_t::value_type& rs : recv_spaces) {
3290 		if (UNIV_LIKELY(rs.second.status != file_name_t::MISSING)) {
3291 			continue;
3292 		}
3293 
3294 		missing_tablespace = true;
3295 
3296 		if (srv_force_recovery > 0) {
3297 			ib::warn() << "Tablespace " << rs.first
3298 				<<" was not found at " << rs.second.name
3299 				<<", and innodb_force_recovery was set."
3300 				<<" All redo log for this tablespace"
3301 				<<" will be ignored!";
3302 			continue;
3303 		}
3304 
3305 		if (!rescan) {
3306 			ib::info() << "Tablespace " << rs.first
3307 				<< " was not found at '"
3308 				<< rs.second.name << "', but there"
3309 				<<" were no modifications either.";
3310 		}
3311 	}
3312 
3313 	if (!rescan || srv_force_recovery > 0) {
3314 		missing_tablespace = false;
3315 	}
3316 
3317 	err = DB_SUCCESS;
3318 	goto func_exit;
3319 }
3320 
3321 /** Check if all tablespaces were found for crash recovery.
3322 @param[in]	rescan			rescan of redo logs is needed
3323 @param[out]	missing_tablespace	missing table exists
3324 @return error code or DB_SUCCESS */
3325 static MY_ATTRIBUTE((warn_unused_result))
3326 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3327 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3328 {
3329 	bool		flag_deleted	= false;
3330 
3331 	ut_ad(!srv_read_only_mode);
3332 	ut_ad(recv_needed_recovery);
3333 
3334 	for (recv_spaces_t::value_type& rs : recv_spaces) {
3335 		ut_ad(!is_predefined_tablespace(rs.first));
3336 		ut_ad(rs.second.status != file_name_t::DELETED
3337 		      || !rs.second.space);
3338 
3339 		if (rs.second.status == file_name_t::DELETED) {
3340 			/* The tablespace was deleted,
3341 			so we can ignore any redo log for it. */
3342 			flag_deleted = true;
3343 		} else if (rs.second.space != NULL) {
3344 			/* The tablespace was found, and there
3345 			are some redo log records for it. */
3346 			fil_names_dirty(rs.second.space);
3347 
3348 			/* Add the freed page ranges in the respective
3349 			tablespace */
3350 			if (!rs.second.freed_ranges.empty()
3351 			    && (srv_immediate_scrub_data_uncompressed
3352 				|| rs.second.space->is_compressed())) {
3353 
3354 				rs.second.space->add_free_ranges(
3355 					std::move(rs.second.freed_ranges));
3356 			}
3357 		} else if (rs.second.name == "") {
3358 			ib::error() << "Missing FILE_CREATE, FILE_DELETE"
3359 				" or FILE_MODIFY before FILE_CHECKPOINT"
3360 				" for tablespace " << rs.first;
3361 			recv_sys.found_corrupt_log = true;
3362 			return(DB_CORRUPTION);
3363 		} else {
3364 			rs.second.status = file_name_t::MISSING;
3365 			flag_deleted = true;
3366 		}
3367 
3368 		ut_ad(rs.second.status == file_name_t::DELETED
3369 		      || rs.second.name != "");
3370 	}
3371 
3372 	if (flag_deleted) {
3373 		return recv_validate_tablespace(rescan, missing_tablespace);
3374 	}
3375 
3376 	return DB_SUCCESS;
3377 }
3378 
3379 /** Start recovering from a redo log checkpoint.
3380 @param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
3381 of first system tablespace page
3382 @return error code or DB_SUCCESS */
3383 dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)3384 recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
3385 {
3386 	ulint		max_cp_field;
3387 	lsn_t		checkpoint_lsn;
3388 	bool		rescan = false;
3389 	ib_uint64_t	checkpoint_no;
3390 	lsn_t		contiguous_lsn;
3391 	byte*		buf;
3392 	dberr_t		err = DB_SUCCESS;
3393 
3394 	ut_ad(srv_operation == SRV_OPERATION_NORMAL
3395 	      || srv_operation == SRV_OPERATION_RESTORE
3396 	      || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
3397 	ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
3398 	ut_ad(UT_LIST_GET_LEN(buf_pool.LRU) == 0);
3399 	ut_ad(UT_LIST_GET_LEN(buf_pool.unzip_LRU) == 0);
3400 	ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));
3401 
3402 	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
3403 
3404 		ib::info() << "innodb_force_recovery=6 skips redo log apply";
3405 
3406 		return(DB_SUCCESS);
3407 	}
3408 
3409 	recv_sys.recovery_on = true;
3410 
3411 	mysql_mutex_lock(&log_sys.mutex);
3412 
3413 	err = recv_find_max_checkpoint(&max_cp_field);
3414 
3415 	if (err != DB_SUCCESS) {
3416 
3417 		recv_sys.recovered_lsn = log_sys.get_lsn();
3418 		mysql_mutex_unlock(&log_sys.mutex);
3419 		return(err);
3420 	}
3421 
3422 	buf = log_sys.checkpoint_buf;
3423 	log_sys.log.read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE});
3424 
3425 	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
3426 	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
3427 
3428 	/* Start reading the log from the checkpoint lsn. The variable
3429 	contiguous_lsn contains an lsn up to which the log is known to
3430 	be contiguously written. */
3431 	recv_sys.mlog_checkpoint_lsn = 0;
3432 
3433 	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
3434 
3435 	const lsn_t	end_lsn = mach_read_from_8(
3436 		buf + LOG_CHECKPOINT_END_LSN);
3437 
3438 	ut_ad(recv_sys.pages.empty());
3439 	contiguous_lsn = checkpoint_lsn;
3440 	switch (log_sys.log.format) {
3441 	case 0:
3442 		mysql_mutex_unlock(&log_sys.mutex);
3443 		return DB_SUCCESS;
3444 	default:
3445 		if (end_lsn == 0) {
3446 			break;
3447 		}
3448 		if (end_lsn >= checkpoint_lsn) {
3449 			contiguous_lsn = end_lsn;
3450 			break;
3451 		}
3452 		recv_sys.found_corrupt_log = true;
3453 		mysql_mutex_unlock(&log_sys.mutex);
3454 		return(DB_ERROR);
3455 	}
3456 
3457 	size_t sizeof_checkpoint;
3458 
3459 	if (!log_sys.is_physical()) {
3460 		sizeof_checkpoint = 9/* size of MLOG_CHECKPOINT */;
3461 		goto completed;
3462 	}
3463 
3464 	/* Look for FILE_CHECKPOINT. */
3465 	recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
3466 	/* The first scan should not have stored or applied any records. */
3467 	ut_ad(recv_sys.pages.empty());
3468 	ut_ad(!recv_sys.found_corrupt_fs);
3469 
3470 	if (srv_read_only_mode && recv_needed_recovery) {
3471 		mysql_mutex_unlock(&log_sys.mutex);
3472 		return(DB_READ_ONLY);
3473 	}
3474 
3475 	if (recv_sys.found_corrupt_log && !srv_force_recovery) {
3476 		mysql_mutex_unlock(&log_sys.mutex);
3477 		ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
3478 		return(DB_ERROR);
3479 	}
3480 
3481 	if (recv_sys.mlog_checkpoint_lsn == 0) {
3482 		lsn_t scan_lsn = log_sys.log.scanned_lsn;
3483 		if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
3484 			mysql_mutex_unlock(&log_sys.mutex);
3485 			ib::error err;
3486 			err << "Missing FILE_CHECKPOINT";
3487 			if (end_lsn) {
3488 				err << " at " << end_lsn;
3489 			}
3490 			err << " between the checkpoint " << checkpoint_lsn
3491 			    << " and the end " << scan_lsn << ".";
3492 			return(DB_ERROR);
3493 		}
3494 
3495 		log_sys.log.scanned_lsn = checkpoint_lsn;
3496 	} else {
3497 		contiguous_lsn = checkpoint_lsn;
3498 		rescan = recv_group_scan_log_recs(
3499 			checkpoint_lsn, &contiguous_lsn, false);
3500 
3501 		if ((recv_sys.found_corrupt_log && !srv_force_recovery)
3502 		    || recv_sys.found_corrupt_fs) {
3503 			mysql_mutex_unlock(&log_sys.mutex);
3504 			return(DB_ERROR);
3505 		}
3506 	}
3507 
3508 	/* NOTE: we always do a 'recovery' at startup, but only if
3509 	there is something wrong we will print a message to the
3510 	user about recovery: */
3511 	sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT;
3512 
3513 completed:
3514 	if (flush_lsn == checkpoint_lsn + sizeof_checkpoint
3515 	    && recv_sys.mlog_checkpoint_lsn == checkpoint_lsn) {
3516 		/* The redo log is logically empty. */
3517 	} else if (checkpoint_lsn != flush_lsn) {
3518 		ut_ad(!srv_log_file_created);
3519 
3520 		if (checkpoint_lsn + sizeof_checkpoint < flush_lsn) {
3521 			ib::warn()
3522 				<< "Are you sure you are using the right "
3523 				<< LOG_FILE_NAME
3524 				<< " to start up the database? Log sequence "
3525 				   "number in the "
3526 				<< LOG_FILE_NAME << " is " << checkpoint_lsn
3527 				<< ", less than the log sequence number in "
3528 				   "the first system tablespace file header, "
3529 				<< flush_lsn << ".";
3530 		}
3531 
3532 		if (!recv_needed_recovery) {
3533 
3534 			ib::info()
3535 				<< "The log sequence number " << flush_lsn
3536 				<< " in the system tablespace does not match"
3537 				   " the log sequence number "
3538 				<< checkpoint_lsn << " in the "
3539 				<< LOG_FILE_NAME << "!";
3540 
3541 			if (srv_read_only_mode) {
3542 				ib::error() << "innodb_read_only"
3543 					" prevents crash recovery";
3544 				mysql_mutex_unlock(&log_sys.mutex);
3545 				return(DB_READ_ONLY);
3546 			}
3547 
3548 			recv_needed_recovery = true;
3549 		}
3550 	}
3551 
3552 	log_sys.set_lsn(recv_sys.recovered_lsn);
3553 	if (UNIV_LIKELY(log_sys.get_flushed_lsn() < recv_sys.recovered_lsn)) {
3554 		/* This may already have been set by create_log_file()
3555 		if no logs existed when the server started up. */
3556 		log_sys.set_flushed_lsn(recv_sys.recovered_lsn);
3557 	}
3558 
3559 	if (recv_needed_recovery) {
3560 		bool missing_tablespace = false;
3561 
3562 		err = recv_init_crash_recovery_spaces(
3563 			rescan, missing_tablespace);
3564 
3565 		if (err != DB_SUCCESS) {
3566 			mysql_mutex_unlock(&log_sys.mutex);
3567 			return(err);
3568 		}
3569 
3570 		/* If there is any missing tablespace and rescan is needed
3571 		then there is a possiblity that hash table will not contain
3572 		all space ids redo logs. Rescan the remaining unstored
3573 		redo logs for the validation of missing tablespace. */
3574 		ut_ad(rescan || !missing_tablespace);
3575 
3576 		while (missing_tablespace) {
3577 			DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
3578 					      "the missing tablespace. Scan "
3579 					      "from last stored LSN " LSN_PF,
3580 					      recv_sys.last_stored_lsn));
3581 
3582 			lsn_t recent_stored_lsn = recv_sys.last_stored_lsn;
3583 			rescan = recv_group_scan_log_recs(
3584 				checkpoint_lsn, &recent_stored_lsn, false);
3585 
3586 			ut_ad(!recv_sys.found_corrupt_fs);
3587 
3588 			missing_tablespace = false;
3589 
3590 			err = recv_sys.found_corrupt_log
3591 				? DB_ERROR
3592 				: recv_validate_tablespace(
3593 					rescan, missing_tablespace);
3594 
3595 			if (err != DB_SUCCESS) {
3596 				mysql_mutex_unlock(&log_sys.mutex);
3597 				return err;
3598 			}
3599 
3600 			rescan = true;
3601 		}
3602 
3603 		recv_sys.parse_start_lsn = checkpoint_lsn;
3604 
3605 		if (srv_operation == SRV_OPERATION_NORMAL) {
3606 			buf_dblwr.recover();
3607 		}
3608 
3609 		ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
3610 
3611 		if (rescan) {
3612 			contiguous_lsn = checkpoint_lsn;
3613 
3614 			recv_group_scan_log_recs(
3615 				checkpoint_lsn, &contiguous_lsn, true);
3616 
3617 			if ((recv_sys.found_corrupt_log
3618 			     && !srv_force_recovery)
3619 			    || recv_sys.found_corrupt_fs) {
3620 				mysql_mutex_unlock(&log_sys.mutex);
3621 				return(DB_ERROR);
3622 			}
3623 		}
3624 	} else {
3625 		ut_ad(!rescan || recv_sys.pages.empty());
3626 	}
3627 
3628 	if (log_sys.is_physical()
3629 	    && (log_sys.log.scanned_lsn < checkpoint_lsn
3630 		|| log_sys.log.scanned_lsn < recv_max_page_lsn)) {
3631 
3632 		ib::error() << "We scanned the log up to "
3633 			<< log_sys.log.scanned_lsn
3634 			<< ". A checkpoint was at " << checkpoint_lsn << " and"
3635 			" the maximum LSN on a database page was "
3636 			<< recv_max_page_lsn << ". It is possible that the"
3637 			" database is now corrupt!";
3638 	}
3639 
3640 	if (recv_sys.recovered_lsn < checkpoint_lsn) {
3641 		mysql_mutex_unlock(&log_sys.mutex);
3642 
3643 		ib::error() << "Recovered only to lsn:"
3644 			    << recv_sys.recovered_lsn
3645 			    << " checkpoint_lsn: " << checkpoint_lsn;
3646 
3647 		return(DB_ERROR);
3648 	}
3649 
3650 	log_sys.next_checkpoint_lsn = checkpoint_lsn;
3651 	log_sys.next_checkpoint_no = checkpoint_no + 1;
3652 
3653 	recv_synchronize_groups();
3654 
3655 	ut_ad(recv_needed_recovery
3656 	      || checkpoint_lsn == recv_sys.recovered_lsn);
3657 
3658 	log_sys.write_lsn = log_sys.get_lsn();
3659 	log_sys.buf_free = log_sys.write_lsn % OS_FILE_LOG_BLOCK_SIZE;
3660 	log_sys.buf_next_to_write = log_sys.buf_free;
3661 
3662 	log_sys.last_checkpoint_lsn = checkpoint_lsn;
3663 
3664 	if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL
3665 	    && (~log_t::FORMAT_ENCRYPTED & log_sys.log.format)
3666 	    == log_t::FORMAT_10_5) {
3667 		/* Write a FILE_CHECKPOINT marker as the first thing,
3668 		before generating any other redo log. This ensures
3669 		that subsequent crash recovery will be possible even
3670 		if the server were killed soon after this. */
3671 		fil_names_clear(log_sys.last_checkpoint_lsn, true);
3672 	}
3673 
3674 	log_sys.next_checkpoint_no = ++checkpoint_no;
3675 
3676 	mutex_enter(&recv_sys.mutex);
3677 
3678 	recv_sys.apply_log_recs = true;
3679 	recv_no_ibuf_operations = false;
3680 	ut_d(recv_no_log_write = srv_operation == SRV_OPERATION_RESTORE
3681 	     || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
3682 
3683 	mutex_exit(&recv_sys.mutex);
3684 
3685 	mysql_mutex_unlock(&log_sys.mutex);
3686 
3687 	recv_lsn_checks_on = true;
3688 
3689 	/* The database is now ready to start almost normal processing of user
3690 	transactions: transaction rollbacks and the application of the log
3691 	records in the hash table can be run in background. */
3692 
3693 	return(DB_SUCCESS);
3694 }
3695 
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)3696 bool recv_dblwr_t::validate_page(const page_id_t page_id,
3697                                  const byte *page,
3698                                  const fil_space_t *space,
3699                                  byte *tmp_buf)
3700 {
3701   if (page_id.page_no() == 0)
3702   {
3703     ulint flags= fsp_header_get_flags(page);
3704     if (!fil_space_t::is_valid_flags(flags, page_id.space()))
3705     {
3706       ulint cflags= fsp_flags_convert_from_101(flags);
3707       if (cflags == ULINT_UNDEFINED)
3708       {
3709         ib::warn() << "Ignoring a doublewrite copy of page " << page_id
3710                    << "due to invalid flags " << ib::hex(flags);
3711         return false;
3712       }
3713 
3714       flags= cflags;
3715     }
3716 
3717     /* Page 0 is never page_compressed or encrypted. */
3718     return !buf_page_is_corrupted(true, page, flags);
3719   }
3720 
3721   ut_ad(tmp_buf);
3722   byte *tmp_frame= tmp_buf;
3723   byte *tmp_page= tmp_buf + srv_page_size;
3724   const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
3725   const bool expect_encrypted= space->crypt_data &&
3726     space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
3727 
3728   if (space->full_crc32())
3729     return !buf_page_is_corrupted(true, page, space->flags);
3730 
3731   if (expect_encrypted &&
3732       mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
3733   {
3734     if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
3735       return false;
3736     if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
3737       return true;
3738     if (space->zip_size())
3739       return false;
3740     memcpy(tmp_page, page, space->physical_size());
3741     if (!fil_space_decrypt(space, tmp_frame, tmp_page))
3742       return false;
3743   }
3744 
3745   switch (page_type) {
3746   case FIL_PAGE_PAGE_COMPRESSED:
3747     memcpy(tmp_page, page, space->physical_size());
3748     /* fall through */
3749   case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
3750     if (space->zip_size())
3751       return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
3752     ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
3753     if (!decomp)
3754       return false; /* decompression failed */
3755     if (decomp == srv_page_size)
3756       return false; /* the page was not compressed (invalid page type) */
3757     return !buf_page_is_corrupted(true, tmp_page, space->flags);
3758   }
3759 
3760   return !buf_page_is_corrupted(true, page, space->flags);
3761 }
3762 
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)3763 byte *recv_dblwr_t::find_page(const page_id_t page_id,
3764                               const fil_space_t *space, byte *tmp_buf)
3765 {
3766   byte *result= NULL;
3767   lsn_t max_lsn= 0;
3768 
3769   for (byte *page : pages)
3770   {
3771     if (page_get_page_no(page) != page_id.page_no() ||
3772         page_get_space_id(page) != page_id.space())
3773       continue;
3774     const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
3775     if (lsn <= max_lsn ||
3776         !validate_page(page_id, page, space, tmp_buf))
3777     {
3778       /* Mark processed for subsequent iterations in buf_dblwr_t::recover() */
3779       memset(page + FIL_PAGE_LSN, 0, 8);
3780       continue;
3781     }
3782     max_lsn= lsn;
3783     result= page;
3784   }
3785 
3786   return result;
3787 }
3788